A problem with Flink using a timestamp as the starting offset when consuming from Kafka

The code is as follows:
private static void test2() throws Exception {
    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
    StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test" + System.currentTimeMillis());

    FlinkKafkaConsumer011<String> kafkaConsumer011 = new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), properties);
    kafkaConsumer011.setStartFromTimestamp(System.currentTimeMillis());

    DataStream<String> stream = fsEnv.addSource(kafkaConsumer011);
    stream.print();
    fsEnv.execute("test");
}
The symptom: no messages sent to Kafka were received. This was strange, because with kafka-console-consumer.sh I could see that messages were actually being produced, yet the Flink job received nothing.

Further testing: if I moved the timestamp back by a large margin, everything worked normally.
Also, if messages happened to be produced right at the chosen timestamp, it worked normally as well.
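For reference, here is a minimal sketch of how I could make sure a record exists at or after the chosen timestamp (my own reproduction aid against the same localhost:9092 broker and "test" topic; produceOne is a hypothetical helper, not part of the original job):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper: send one message into the "test" topic so that a record
// exists at or after the timestamp used by setStartFromTimestamp().
private static void produceOne() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        producer.send(new ProducerRecord<>("test", "key", "hello at " + System.currentTimeMillis()));
        producer.flush();
    }
}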
Comparing the two cases shows the difference between getting data and not getting data. Below is the log output when no data could be fetched:
2019-10-21 23:06:58.970 [Thread-8] INFO  o.a.f.s.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 creating fetcher with offsets {}.
2019-10-21 23:06:58.970 [Thread-10] INFO  o.a.f.s.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 3 creating fetcher with offsets {}.
2019-10-21 23:06:58.970 [Thread-7] INFO  o.a.f.s.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 1 creating fetcher with offsets {}.
2019-10-21 23:07:01.100 [Source: Custom Source -> Sink: Print to Std. Out (3/4)] INFO  o.a.f.s.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 2 initially has no partitions to read from.
2019-10-21 23:07:01.108 [Thread-9] INFO  o.a.f.s.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 2 creating fetcher with offsets {}.
And below is the log output when data could be fetched; you can see that one subtask successfully obtained the offset for its partition:
2019-10-21 23:14:11.871 [Thread-7] INFO  o.a.f.s.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 2 creating fetcher with offsets {KafkaTopicPartition{topic='test', partition=0}=35}.
So the cause comes down to whether consumer.offsetsForTimes, the Kafka consumer API that Flink calls, returns an offset. If it does not, then in the run method of KafkaConsumerThread the following code blocks until a new partition arrives before continuing. Since no new partition will ever arrive, the consumer is stuck here:
try {
   if (hasAssignedPartitions) {
      newPartitions = unassignedPartitionsQueue.pollBatch();
   }
   else {
      // if no assigned partitions block until we get at least one
      // instead of hot spinning this loop. We rely on a fact that
      // unassignedPartitionsQueue will be closed on a shutdown, so
      // we don't block indefinitely
      newPartitions = unassignedPartitionsQueue.getBatchBlocking();
   }
   if (newPartitions != null) {
      reassignPartitions(newPartitions);
   }
} catch (AbortedReassignmentException e) {
   continue;
}
So why does Kafka's offsetsForTimes API not return a result? I wrote a small test consumer to probe it directly:
private static void testConsumer() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    TopicPartition topicPartition = new TopicPartition("test", 0);
    long current = System.currentTimeMillis();
    for (int i=0; i<10000; i++) {
        Map<TopicPartition, Long> map = new HashMap<>();
        map.put(topicPartition, current - i*1000);
        Map<TopicPartition, OffsetAndTimestamp> result  = consumer.offsetsForTimes(map);
        System.out.println("===================test : " + i + "==================");
        System.out.println(new Gson().toJson(result));
    }
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
The code above shows that if data is produced after the given timestamp, the offset is returned correctly; otherwise nothing is returned.
In other words, if the last message was produced at 18:00, a timestamp earlier than 18:00 gets the offset correctly, while a timestamp later than 18:00 gets nothing.
To summarize, the problem lies in how Flink and Kafka work together. Kafka's behavior is: if there are no messages after the given timestamp, it returns no offset at all (in my opinion it would be more reasonable to return last offset + 1). So what does Flink do when no offset comes back? It simply stops consuming, because the result does not match what it expects and it no longer knows where to start reading. Starting from offset + 1 on its own might go wrong, so it just does not read at all.
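One possible workaround, given this behavior, is to resolve the starting offsets ourselves before submitting the job: query offsetsForTimes with a plain KafkaConsumer, and for partitions where Kafka returns nothing, fall back to endOffsets (exactly the "last offset + 1" I would have expected), then hand the result to Flink via setStartFromSpecificOffsets. The sketch below is my own idea under those assumptions, not something taken from the Flink documentation; offsetsForTimestampOrEnd is a hypothetical helper name.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical helper: compute a start offset per partition from a timestamp,
// falling back to the end offset ("last offset + 1") when offsetsForTimes()
// has nothing to return for that partition.
private static Map<KafkaTopicPartition, Long> offsetsForTimestampOrEnd(
        Properties props, String topic, long timestamp) {
    try (KafkaConsumer<String, String> consumer =
                 new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
        List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toList());

        Map<TopicPartition, Long> query = new HashMap<>();
        partitions.forEach(tp -> query.put(tp, timestamp));

        Map<TopicPartition, OffsetAndTimestamp> byTime = consumer.offsetsForTimes(query);
        Map<TopicPartition, Long> end = consumer.endOffsets(partitions);

        Map<KafkaTopicPartition, Long> startOffsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            OffsetAndTimestamp oat = byTime.get(tp);
            // null means "no record with timestamp >= the query" -> start at the end offset.
            long offset = (oat != null) ? oat.offset() : end.get(tp);
            startOffsets.put(new KafkaTopicPartition(tp.topic(), tp.partition()), offset);
        }
        return startOffsets;
    }
}

In test2 this would replace the setStartFromTimestamp call:
kafkaConsumer011.setStartFromSpecificOffsets(offsetsForTimestampOrEnd(properties, "test", System.currentTimeMillis()));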
