New ElasticSearch()…xxx...registerTableSink();
ElasticSearch是一个ConnectorDescriptor: 最关键的就是一个toConnectorProperties就是一个配置的map,传给factory使用,他其实是一个配置收集器.
TableFactoryUtils : 生成一个TableFactory的工具类,
findAndCreateTableSink( ) 通过一个描述来生成一个工厂类.//描述里有必要的配置.
先找到合适的工厂类,然后创建一个TableSink.
第一步,A先找到合适的工厂类,通过properties里的配置来寻找,
TableFactoryService.find(TableSinkFactory.class, properties)
findSingleInternal()里的两个代码是关键代码
A.1 : List<TableFactory> tableFactories = discoverFactories(classLoader); 通过classLoader(可以传入, 否则使用默认)找到所有的Factory.
A.2: 然后开始过滤, List<T> filtered = filter(tableFactories, factoryClass, properties)
A.2.1 : 首先检查类型, TableSinkFactory.class filterByFactoryClass(factoryClass,properties,foundFactories)
A.2.2 : 然后检查必须的配置是不是设置了, List<T> contextFactories = filterByContext(factoryClass,properties,foundFactories,classFactories);
其中必备的配置是在factory里配置和定义的,必须不能为空.这个是有TableFactory的requiredContext()方法定义的.
A.2.3 : 经过以上两轮的过滤,然后再继续检查,依然是TableFactoryService.java,这里面的
private static <T extends TableFactory> List<T> filterBySupportedProperties(
Class<T> factoryClass,
Map<String, String> properties,
List<TableFactory> foundFactories,
List<T> classFactories) 方法
这里主要是TableFactory里的两个方法定义的属性
Map<String, String> requiredContext();
List<String> supportedProperties();
然后和用户Descriptor(如 ElasticSearch)里设置的属性匹配,其中有字符忽略大小写,星号匹配等细节,不再研究.
然后这就找到了工厂类,接下来要生成TableSink了。例如 ElasticSearch找到的就是Elasticsearch6UpsertTableSinkFactory.
第二步,生成TableSink.在ElasticSearch中,他是直接的new了一个Elasticsearch6UpsertTableSink.而这个对象又是一个StreamTableSink,这样就生成了一个StreamTableSink, 这个sink可以交给flink直接使用了。在ElasticSearch这个TableSink里,最关键的是实现了consumeDataStream,这个决定了sink的写逻辑.注意区分的是一个是StreamTableSink是给Table接口使用的,另一个是DataStreamSink是给DataStream使用的。这里是连接Table和Stream接口的地方。所以说,Table的API其实还是基于Stream的API的。
生成了Elasticsearch的TableSink之后,可以看到到这个类的用途是为了向es写入数据,他的作用是生成一个createSinkFunction。
createSinkFunction是具体的连接es,向es写入数据的逻辑,具体的实现是ElasticsearchUpsertSinkFunction的process的方法。是由ElasticsearchSinkBase发起的调用。
在写入ElasticSearch的时候,如何让_id使用数据里的指定的field的值?
在ES2.0之后,mapping里设置_id就被移除了这个特性。目的是为了让业务方不需要关心es的实现细节。在es connector里,他会根据你的table的primary key来生成这个_id的值。一般情况下,在append模式下,因为只是insert,所以他都是随机的。只有在update的时候,才需要根据primary key查找到元素并且进行更新。具体的实现在processUpsert方法。
private void processUpsert(Row row, RequestIndexer indexer) {
final byte[] document = serializationSchema.serialize(row);
if (keyFieldIndices.length == 0) {
final IndexRequest indexRequest = requestFactory.createIndexRequest(
index,
docType,
contentType,
document);
indexer.add(indexRequest);
} else {
final String key = createKey(row);
final UpdateRequest updateRequest = requestFactory.createUpdateRequest(
index,
docType,
key,
contentType,
document);
indexer.add(updateRequest);
}
}
这个方法的关键是keyFieldIndices这个属性。如果这个属性没值,那么久不指定key,让他自动生成_id,否则就从元数据里拿到这个字段值,再拼接成_id.而这个值是由方法
public void setKeyFields(String[] keyNames) {} 来设置的。这个就是tablesink接口里定义的方法。这个方法可不是给业务方调用的,这个是flink框架调用的。所以你显示的调用这个方法也没用,他flink最终会覆盖掉你的配置。
/**
* Configures the unique key fields of the {@linkTable} to write.
* The method is called after {@linkTableSink#configure(String[], TypeInformation[])}.
*
*<p>The keys array might be empty, if the table consists of a single (updated) record.
* If the table does not have a key and is append-only, the keys attribute is null.
*
*@paramkeysthe field names of the table's keys, an empty array if the table has a single
* row, and null if the table is append-only and has no key.
*/
voidsetKeyFields(String[] keys);
说明在这里。说的很清楚了,如果是append-only的模式,传入的keys就为空,否则就用table 的 primary key. 所以你可以定义一个table,指定主键。
找了一圈发现并没有定义table 的 primary的语法,翻看flink的源码发现他是自动判断的,具体的是根据你的sql语句来判断的。
UpdatingPlanChecker.scala是一个scala的不太好看明白,具体的在.private class UniqueKeyExtractor {} 这个类里面。他的寻找方式是,如果你是group by 语句,那么group by 就是你的key.如果你是join,那就看你左边的表和你右边的表来分别递归计算。
// Output of join must have keys if left and right both contain key(s).
// Key groups from both side will be merged by join equi-predicates
val lInNames: Seq[String] = j.getLeft.getRowType.getFieldNames
val rInNames: Seq[String] = j.getRight.getRowType.getFieldNames
val joinNames = j.getRowType.getFieldNames
// if right field names equal to left field names, calcite will rename right
// field names. For example, T1(pk, a) join T2(pk, b), calcite will rename T2(pk, b)
总之就是,只有在聚合计算的时候才会有key, 也就是group by 后面的内容。那这不就扯了吗? ElasticSearch connector 不就只支持插入操作了吗?
Ps: flink后续会支持在table schema里定义primary key, 已经提交了pr,https://github.com/apache/flink/pull/8736 可惜现在还没有。pat ~