Flink fails to trigger watermarks when reading data from Kafka

The virus outbreak over Spring Festival kept me stuck at home, so I took the chance to write a per-second aggregation job in Flink. The logic is trivial: consume data from Kafka, then run select count(*) from xxx group by xxx to emit one data point per second. The code is very short:

```java

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//The parallelism must be less than or equal to the number of Kafka partitions, otherwise the watermark will never be triggered
bsEnv.setParallelism(1);
//get input from kafka
String[] fieldNamesOriginal = new String[]{"i","t","op"};
TypeInformation[] typesOriginal = new TypeInformation[]{Types.STRING(), Types.JAVA_BIG_DEC(), Types.STRING()};
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxxx.com:9200");
properties.setProperty("group.id", "xxxxxxx");
//Commit offsets via Kafka's auto-commit instead of relying on Flink's checkpointing
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
FlinkKafkaConsumer011<Row> kafkaConsumer011 = new FlinkKafkaConsumer011<>("xxxxxxxx",
        new JsonRowDeserializationSchema(new RowTypeInfo(typesOriginal, fieldNamesOriginal)), properties);
kafkaConsumer011.setStartFromLatest();
DataStream<Row> streamSource = bsEnv
        .addSource(kafkaConsumer011)
        .uid("from_kafka_topic_is");
TypeInformation[] returnTypeTmp = new TypeInformation[]{Types.STRING(), Types.SQL_TIME(), Types.STRING()};
DataStream<Row> dataStreamWithTime = streamSource.map(new MapFunction<Row, Row>() {
    @Override
    public Row map(Row value) throws Exception {
        //SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Row row = new Row(3);
        BigDecimal time = (BigDecimal)value.getField(1);
        row.setField(0, value.getField(0));
        row.setField(1, new Time(time.longValue()));
        row.setField(2, value.getField(2));
        return row;
    }
})
        .returns(new RowTypeInfo(returnTypeTmp, fieldNamesOriginal))
        .uid("add_time")
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(org.apache.flink.streaming.api.windowing.time.Time.seconds(2)) {
            @Override
            public long extractTimestamp(Row element) {
                return ((Time)element.getField(1)).getTime();
            }
        });
bsTableEnv.registerDataStream("_source_table", dataStreamWithTime, "i,t,op, rowtime.rowtime");
bsTableEnv.connect(new Kafka()
        .version("0.11")
        .topic("s-log")
        .property("bootstrap.servers", "abc.com:9200")
        .startFromEarliest())
        .withFormat(new Json().jsonSchema("{\"type\":\"object\",\"properties\":{\"index_name\":{\"type\":\"string\"},\"event_time_log\":{\"type\":\"string\"},\"op_type\":{\"type\":\"string\"},\"count_result\":{\"type\":\"number\"}}}"))
        .withSchema(new Schema()
                .field("index_name", Types.STRING())
                .field("event_time_log", Types.STRING())
                .field("op_type", Types.STRING())
                .field("count_result", Types.LONG()))
        .inAppendMode()
        .registerTableSink("operator_log_result");
bsTableEnv.sqlUpdate("insert into operator_log_result " +
        "select  i as index_name,  DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '1' SECOND), 'yyyy-MM-dd HH:mm:ss') as event_time_log, op as op_type, count(op) as count_result " +
        "from _source_table group by TUMBLE(rowtime, INTERVAL '1' SECOND) , i, op");
bsEnv.execute("data-metrics");
```

The code above is as simple as it gets. The input data looks something like this:

```json

{"t":1580896327330,"st":"DIRECT","i":"hitch_passenger_order_2020_01_22_23_50_40","op":"es_sink","id":",guid:15756341902141200002900","ut":",order_update_time:2020-02-05 17:52:07.190435","msg":"","rId":"0.1.1.1"}

{"t":1580896327831,"st":"DIRECT","i":"hitch_passenger_order_2020_01_22_23_50_40","op":"es_sink","id":",guid:15756341902141200002900","ut":",order_update_time:2020-02-05 17:52:07.687468","msg":"","rId":"0.1.1.1"}

{"t":1580896328131,"st":"DIRECT","i":"hitch_passenger_order_2020_01_22_23_50_40","op":"es_sink","id":",guid:15756341902141200002900","ut":",order_update_time:2020-02-05 17:52:08.047474","msg":"","rId":"0.1.1.1"}

 

However, no matter what I tried, the job simply would not output the expected counts. Yet the moment I set the parallelism to 1 via setParallelism, the statistics came out correctly. Strange — why would this depend on the parallelism at all?

So I turned on step-through debugging locally and found that the problem lies in the StatusWatermarkValve class, whose relevant method looks like this:

```java

private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
   long newMinWatermark = Long.MAX_VALUE;
   boolean hasAlignedChannels = false;

   // determine new overall watermark by considering only watermark-aligned channels across all channels
   for (InputChannelStatus channelStatus : channelStatuses) {
      if (channelStatus.isWatermarkAligned) {
         hasAlignedChannels = true;
         newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
      }
   }

   // we acknowledge and output the new overall watermark if it really is aggregated
   // from some remaining aligned channel, and is also larger than the last output watermark
   if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
      lastOutputWatermark = newMinWatermark;
      outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
   }
}
```

Stepping through this method in the debugger, the normal case matched my expectations: every aligned channel's watermark had advanced, so once the window end time was reached the window fired.

In the failing case, with multiple parallel subtasks, the picture in the debugger was different.

The watermark reported by the other subtasks' channels was a negative value, i.e. still the initial Long.MIN_VALUE. Most likely this is because the source parallelism exceeded the number of partitions in the topic (hence the comment in the code above): those subtasks were assigned no partition, received no data, and so never emitted a watermark.

 

What does Flink do in this situation? Going back to the code above, it takes the minimum watermark across all input channels as the new overall watermark, which in this case is that smallest possible long value.

The comparison newMinWatermark > lastOutputWatermark then fails, because the minimum still equals the initial value, so no watermark is forwarded downstream and the window is never triggered.
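
To make the failure concrete, here is a minimal sketch (not the Flink source, just the same min-and-compare logic) of what happens when one channel has real data while the other subtasks are still at their initial watermark:

```java
public class WatermarkValveSketch {
    public static void main(String[] args) {
        long lastOutputWatermark = Long.MIN_VALUE;

        // Channel 0 belongs to a subtask that consumed records and advanced its
        // watermark; channels 1 and 2 belong to subtasks that never received data.
        long[] channelWatermarks = {1580896327330L, Long.MIN_VALUE, Long.MIN_VALUE};

        long newMinWatermark = Long.MAX_VALUE;
        for (long watermark : channelWatermarks) {
            newMinWatermark = Math.min(watermark, newMinWatermark);
        }

        // newMinWatermark is Long.MIN_VALUE, which is not greater than
        // lastOutputWatermark, so no watermark is forwarded downstream and the
        // one-second tumbling window never fires.
        System.out.println(newMinWatermark > lastOutputWatermark); // prints false
    }
}
```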

I later came across an article describing the same problem: https://www.jianshu.com/p/753e8cf803bb

 

Personally I think this part of Flink's behaviour is not very reasonable, but at least the problem that had been bugging me for days finally has an answer.
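
For completeness, two ways I would try to avoid this, sketched under my own assumptions rather than taken from the article above: either keep the source parallelism at or below the topic's partition count, as the comment in the job code already hints, or, on Flink 1.11 and later, declare the source idle-aware so that silent subtasks are excluded from the watermark alignment. The class and method names below are only illustrative:

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

public class IdleSourceWorkaround {
    // Assumption: Flink 1.11+, where WatermarkStrategy and withIdleness exist.
    // A subtask that produces no records for 5 seconds is marked idle and no longer
    // holds back the minimum watermark in StatusWatermarkValve, so the active
    // channels can still advance the event time and fire the windows.
    static DataStream<Row> withIdleAwareWatermarks(DataStream<Row> dataStreamWithTime) {
        return dataStreamWithTime.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Row>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(
                                (row, previousTimestamp) ->
                                        ((java.sql.Time) row.getField(1)).getTime())
                        .withIdleness(Duration.ofSeconds(5)));
    }
}
```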
