flink相关 – 读入kafka数据源

业务代码如下 :
fsTableEnv.connect(
       newKafka()
                .version("0.11")
                .topic("xxxx")
                .property("bootstrap.servers”,”xxxxx:xxxx")
                .property("group.id","test4444")
                .startFromEarliest()//测试需要
)
        .withSchema(newSchema().schema(newTableSchema(fields, types)))
        .withFormat(newJson().schema(newRowTypeInfo(types, fields)))
        .inAppendMode()
        .registerTableSource(“testTable”);
问题是:读出来的数据总是null
DataStream<Row> table = fsTableEnv.toAppendStream(fsTableEnv.sqlQuery("select * from testTable"),Row.class);
table.print();

Kafka.java 这个类依然是一个配置收集器,就是为了收集一堆配置,然后给后面的工厂方法使用。依然通过FactoryService找到了Kafka011TableSourceSinkFactory这个工厂类。
Kafka011TableSourceSinkFactory : 作用是为了生成Kafka011TableSource这个TableSource。
Kafka011TableSource: 这个类的主要功效还是一个TableSource除了定义了TableSchema和ReturnType,最主要的是完成了他的核心功能,getDataStream()方法
FlinkKafkaConsumerBase : 由上一步的getDataStream方法,其实是为了生成这个类。这个类是一个SourceFunction,其中弹出信息的方法是run方法,我们继续看他的run方法
最后跟踪定位到convert那个地方,才发现,原来解析的时候,他使用的是完整的json schema,而不是仅仅是data里的field字段,例如你传给kafka的json schema是包含了data平级的那些字段,table,operation等。而我传过去的是data里的field字段,当然就错了。
{
    "schema": “44444",
    "table": "safs",
    "operation": "INSERT",
    "data": {
        "start_time": "2019-10-10 23:00:00",
        "end_point": "0101000020E61000008BDEA9807B545E4062D9CC21A9313F40",
        "start_point": "0101000020E61000002250FD8348575E40C284D1AC6C1F3F40",
        "user_new_id": 1200001917,
        "end_adcode": null,
        "end_time": null,
        "distance": null,
        "seat_count": 4
    },
    "operateTime": 1570602631926
}
跟踪调试,发现具体的是在Kafka09Fetcher的runFetchLoop方法里有一段,final T value = deserializer.deserialize(record);解析出来就出错了,就空了。继续跟踪调试,发现是在 JsonRowDeserializationSchema类的
@Override
public Row deserialize(byte[] message) throws IOException {
   try {
      final JsonNode root = objectMapper.readTree(message);
      return (Row) runtimeConverter.convert(objectMapper, root);
   } catch (Throwable t) {
      throw new IOException("Failed to deserialize JSON object.", t);
   }
}
这段代码里,root解析出来是好的,然后他开始convert.
private DeserializationRuntimeConverter assembleRowConverter(
   String[] fieldNames,
   List<DeserializationRuntimeConverter> fieldConverters) {
   return (mapper, jsonNode) -> {
      ObjectNode node = (ObjectNode) jsonNode;
      int arity = fieldNames.length;
      Row row = new Row(arity);
      for (int i = 0; i < arity; i++) {
         String fieldName = fieldNames[i];
         JsonNode field = node.get(fieldName);
         Object convertField = convertField(mapper, fieldConverters.get(i), fieldName, field);
         row.setField(i, convertField);
      }
      return row;
   };
}
其中有这么一段JsonNode field = node.get(fieldName);他拿着你的fieldName去json数据里get,因为少了一个data层,所以就出错了。
这个kafka的连接器的依赖也挺奇葩的 。
0.11版本依赖0.10.再依赖0.9……..让人很迷惑.

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注