Stuck at home over the Spring Festival because of the virus, I took the chance to write a Flink job for second-level statistics. The logic is simple: consume data from Kafka, then run a `select count(*) from xxx group by xxx` to emit one metric point per second. The program code is straightforward:
```java
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// the parallelism must be <= the number of Kafka partitions, otherwise watermarks will not advance
bsEnv.setParallelism(1);

// get input from kafka
String[] fieldNamesOriginal = new String[]{"i", "t", "op"};
TypeInformation[] typesOriginal = new TypeInformation[]{Types.STRING(), Types.JAVA_BIG_DEC(), Types.STRING()};

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxxx.com:9200");
properties.setProperty("group.id", "xxxxxxx");
// commit offsets automatically, instead of relying on Flink's checkpointing
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");

FlinkKafkaConsumer011<Row> kafkaConsumer011 = new FlinkKafkaConsumer011<>(
        "xxxxxxxx",
        new JsonRowDeserializationSchema(new RowTypeInfo(typesOriginal, fieldNamesOriginal)),
        properties);
kafkaConsumer011.setStartFromLatest();

DataStream<Row> streamSource = bsEnv
        .addSource(kafkaConsumer011)
        .uid("from_kafka_topic_is");

TypeInformation[] returnTypeTmp = new TypeInformation[]{Types.STRING(), Types.SQL_TIME(), Types.STRING()};
DataStream<Row> dataStreamWithTime = streamSource
        .map(new MapFunction<Row, Row>() {
            @Override
            public Row map(Row value) throws Exception {
                // convert the epoch-millisecond field "t" into a java.sql.Time
                Row row = new Row(3);
                BigDecimal time = (BigDecimal) value.getField(1);
                row.setField(0, value.getField(0));
                row.setField(1, new Time(time.longValue()));
                row.setField(2, value.getField(2));
                return row;
            }
        })
        .returns(new RowTypeInfo(returnTypeTmp, fieldNamesOriginal))
        .uid("add_time")
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Row>(
                        org.apache.flink.streaming.api.windowing.time.Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(Row element) {
                        return ((Time) element.getField(1)).getTime();
                    }
                });

bsTableEnv.registerDataStream("_source_table", dataStreamWithTime, "i,t,op, rowtime.rowtime");

bsTableEnv.connect(new Kafka()
            .version("0.11")
            .topic("s-log")
            .property("bootstrap.servers", "abc.com:9200")
            .startFromEarliest())
        .withFormat(new Json().jsonSchema(
            "{\"type\":\"object\",\"properties\":{\"index_name\":{\"type\":\"string\"},\"event_time_log\":{\"type\":\"string\"},\"op_type\":{\"type\":\"string\"},\"count_result\":{\"type\":\"number\"}}}"))
        .withSchema(new Schema()
            .field("index_name", Types.STRING())
            .field("event_time_log", Types.STRING())
            .field("op_type", Types.STRING())
            .field("count_result", Types.LONG()))
        .inAppendMode()
        .registerTableSink("operator_log_result");

bsTableEnv.sqlUpdate("insert into operator_log_result "
        + "select i as index_name, DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '1' SECOND), 'yyyy-MM-dd HH:mm:ss') as event_time_log, op as op_type, count(op) as count_result "
        + "from _source_table group by TUMBLE(rowtime, INTERVAL '1' SECOND), i, op");

bsEnv.execute("data-metrics");
```
That is the whole program. The input data looks like this:
```json
{"t":1580896327330,"st":"DIRECT","i":"hitch_passenger_order_2020_01_22_23_50_40","op":"es_sink","id":",guid:15756341902141200002900","ut":",order_update_time:2020-02-05 17:52:07.190435","msg":"","rId":"0.1.1.1"}
{"t":1580896327831,"st":"DIRECT","i":"hitch_passenger_order_2020_01_22_23_50_40","op":"es_sink","id":",guid:15756341902141200002900","ut":",order_update_time:2020-02-05 17:52:07.687468","msg":"","rId":"0.1.1.1"}
{"t":1580896328131,"st":"DIRECT","i":"hitch_passenger_order_2020_01_22_23_50_40","op":"es_sink","id":",guid:15756341902141200002900","ut":",order_update_time:2020-02-05 17:52:08.047474","msg":"","rId":"0.1.1.1"}
And yet, no matter what, the job would not output the expected statistics. But as soon as I set setParallelism to 1, the counts came out correctly. Very strange: why would this depend on the parallelism?
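As the comment in the code above hints, the workaround is to keep the parallelism no higher than the number of Kafka partitions. Rather than hard-coding 1, the partition count can be looked up first. The following is a minimal sketch of my own (the broker address and topic name are the same placeholders as above, not real values), using the plain Kafka client:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ParallelismFromPartitions {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "xxxx.com:9200"); // placeholder broker, as in the job above
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());

        // query the partition count of the source topic, then cap the Flink job parallelism at it,
        // so that every source subtask owns at least one partition and its watermark can advance
        try (KafkaConsumer<String, String> probe = new KafkaConsumer<>(props)) {
            int partitions = probe.partitionsFor("xxxxxxxx").size(); // placeholder topic name
            System.out.println("partitions = " + partitions);
            // bsEnv.setParallelism(partitions);  // instead of hard-coding 1
        }
    }
}
```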
So I attached a debugger locally and traced the execution, and found that the problem lies in the StatusWatermarkValve class. The relevant method is written like this:
```java
private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
    long newMinWatermark = Long.MAX_VALUE;
    boolean hasAlignedChannels = false;

    // determine new overall watermark by considering only watermark-aligned channels across all channels
    for (InputChannelStatus channelStatus : channelStatuses) {
        if (channelStatus.isWatermarkAligned) {
            hasAlignedChannels = true;
            newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
        }
    }

    // we acknowledge and output the new overall watermark if it really is aggregated
    // from some remaining aligned channel, and is also larger than the last output watermark
    if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
        lastOutputWatermark = newMinWatermark;
        outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
    }
}
```
Stepping through this method in the debugger, in the normal case the channel watermarks all advance as expected: once the window end time is reached, the window fires.

In the broken case, with multiple parallel subtasks, the picture is different.

You can see that the watermark read from the other subtasks is a negative value, i.e. the initial value (Long.MIN_VALUE).

What does Flink do then? According to the code above, it takes the minimum watermark across all parallel input channels as the new overall watermark, which here is that smallest possible long value.

When that minimum is compared against the last output watermark it is equal to the default value, so the condition `newMinWatermark > lastOutputWatermark` is not satisfied, no new watermark is emitted downstream, and the window never fires.
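To make the failure mode concrete, here is a tiny standalone illustration of my own (not Flink source) of the same min-and-compare logic, assuming one busy channel and one channel that never received any data:

```java
public class StuckWatermarkDemo {
    public static void main(String[] args) {
        // one subtask has data and a live watermark; the other never received anything,
        // so its channel watermark is still the initial Long.MIN_VALUE
        long[] channelWatermarks = {1580896328131L, Long.MIN_VALUE};
        long lastOutputWatermark = Long.MIN_VALUE; // the valve's initial value

        long newMinWatermark = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            newMinWatermark = Math.min(newMinWatermark, wm);
        }

        // the minimum stays at Long.MIN_VALUE, which is not greater than lastOutputWatermark,
        // so no watermark is forwarded and the 1-second tumbling windows never fire
        System.out.println("new min watermark = " + newMinWatermark);
        System.out.println("emit? " + (newMinWatermark > lastOutputWatermark)); // false
    }
}
```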
I later came across an article describing the same behavior.

Article reference: https://www.jianshu.com/p/753e8cf803bb
My feeling is that this behavior of Flink's code is unreasonable. In any case, a problem that had puzzled me for days finally has an answer.
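For what it's worth, one mitigation often suggested for this class of problem, which I have not verified against this exact job, is to assign the timestamp/watermark extractor directly on the Kafka consumer (FlinkKafkaConsumerBase exposes assignTimestampsAndWatermarks) instead of on a downstream map, so that watermarks are generated per Kafka partition inside the source. A rough sketch of how the job above could be changed, reusing kafkaConsumer011 and bsEnv from the earlier code:

```java
// assign the extractor on the consumer itself; field 1 ("t") is still the raw BigDecimal
// epoch-millisecond value at this point, because this runs before the map operator
kafkaConsumer011.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Row>(
                org.apache.flink.streaming.api.windowing.time.Time.seconds(2)) {
            @Override
            public long extractTimestamp(Row element) {
                return ((java.math.BigDecimal) element.getField(1)).longValue();
            }
        });

DataStream<Row> streamSource = bsEnv
        .addSource(kafkaConsumer011)
        .uid("from_kafka_topic_is");
```

Whether this alone resolves the idle-subtask case described here would still need to be tested; keeping the parallelism at or below the partition count remains the workaround I actually confirmed.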