分析和解决ElasticSearch数据同步丢失问题

背景

给普惠顺风车和打车使用的匹配引擎,他数据同步使用的是搜索dump引擎,dump引擎用flink程序消费往es引擎写数据。临近930大促却频繁的发生数据丢失的情况,经过分析发现flink里的部分worker发生了错误,而且错误之后就一直报错,再也无法恢复。这个问题之前就小概率发生,但是一般发生之后也没有什么问题,自己就恢复了,所以一直没有在意。但是最近发生的越来越频繁,又是临近930,又涉及到了普惠的底层最核心的数据服务层,一旦在930当天发生,就会导致顺风车和打车交易跌0的故障,直接影响目标达成,因此不得不重点关注起来。
我们首先做的一件事,是把这个情况监控起来,由于日志是在flink的TaskManager的日志里,只能通过手动查看是非常被动的,在询问了数据平台的同事之后,了解到这些日志是已经收集了(不过被某些规则过滤了)。然后针对这个错误做了一个监控和告警,一旦有发生,能第一时间知道,然后人肉的介入处理。

分析

光靠告警+人肉的处理,终归不是办法,人总不能24小时在电脑旁边,还是得分析问题的原因,然后解决他。
先看看报错如下,I/O reactor status: STOPPED,有了报错关键字,就比较容易寻找答案了。

2022-09-13 07:39:58,919 ERROR COMMON_ERROR                                       [] - action [update {[hitch_economy_hitch_economy_car_driver_2022_09_10_11_18_24][_doc][1459871599], doc_as_upsert[true], doc[index {[null][_doc][null], source[{"engine_type":1,"driver_status":"121 131 141 152 161 112 221 232 211","user_new_id":1459871599,"open_status":20}]}], scripted_upsert[false], detect_noop[true]}] sink to es failed [Request cannot be executed; I/O reactor status: STOPPED]
java.lang.IllegalStateException: Request cannot be executed; I/O reactor status: STOPPED
    at org.apache.http.util.Asserts.check(Asserts.java:46) ~[16590.jar:?]
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.ensureRunning(CloseableHttpAsyncClientBase.java:90) ~[16590.jar:?]
    at org.apache.http.impl.nio.client.InternalHttpAsyncClient.execute(InternalHttpAsyncClient.java:123) ~[16590.jar:?]
    at org.elasticsearch.client.RestClient.lambda$performRequestAsync$0(RestClient.java:327) ~[16590.jar:?]
    at org.elasticsearch.client.Cancellable.runIfNotCancelled(Cancellable.java:81) ~[16590.jar:?]
    at org.elasticsearch.client.RestClient.performRequestAsync(RestClient.java:325) ~[16590.jar:?]
    at org.elasticsearch.client.RestClient.performRequestAsync(RestClient.java:314) ~[16590.jar:?]
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequestAsync(RestHighLevelClient.java:1653) ~[16590.jar:?]
    at org.elasticsearch.client.RestHighLevelClient.performRequestAsync(RestHighLevelClient.java:1614) ~[16590.jar:?]
    at org.elasticsearch.client.RestHighLevelClient.performRequestAsyncAndParseEntity(RestHighLevelClient.java:1580) ~[16590.jar:?]
    at org.elasticsearch.client.RestHighLevelClient.bulkAsync(RestHighLevelClient.java:509) ~[16590.jar:?]
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUniversalApiCallBridge.lambda$createBulkProcessorBuilder$0(ElasticsearchUniversalApiCallBridge.java:86) ~[16590.jar:?]
    at org.elasticsearch.action.bulk.Retry$RetryHandler.execute(Retry.java:205) [16590.jar:?]
    at org.elasticsearch.action.bulk.Retry.withBackoff(Retry.java:59) [16590.jar:?]
    at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:62) [16590.jar:?]
    at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) [16590.jar:?]
    at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) [16590.jar:?]
    at org.elasticsearch.action.bulk.BulkProcessor.access$400(BulkProcessor.java:54) [16590.jar:?]
    at org.elasticsearch.action.bulk.BulkProcessor$Flush.run(BulkProcessor.java:504) [16590.jar:?]
    at org.elasticsearch.threadpool.Scheduler$ReschedulingRunnable.doRun(Scheduler.java:223) [16590.jar:?]
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [16590.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_201]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_201]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_201]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_201]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]

初步方案

经过一通搜索,发现这个是ES Client的官方bug,并且有很多人遇到,而到目前为止,es官方并未能很好的解决。https://github.com/elastic/elasticsearch/issues/49124 这里有非常热烈的讨论,感兴趣的可以仔细爬下E文,有人说是httpclient的bug,要升级httpclient到5.0,也有人说是因为handler没处理好,并给出了示例代码,于是我们直接拷贝和使用了示例代码

RestClientBuilder builder = RestClient.builder(...);

CustomHttpClientConfigCallback configurationCallback = new CustomHttpClientConfigCallback();
builder.setHttpClientConfigCallback(configurationCallback);

public static class CustomHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
  @Override
  public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {

    // Add custom exception handler.
    // - https://hc.apache.org/httpcomponents-core-ga/tutorial/html/nio.html#d5e601
    // - This always handles the exception and just logs a warning.
    try {
      DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
      ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
        @Override
        public boolean handle(IOException e) {
          logger.warn("System may be unstable: IOReactor encountered a checked exception : " + e.getMessage(), e);
          return true; // Return true to note this exception as handled, it will not be re-thrown
        }

        @Override
        public boolean handle(RuntimeException e) {
          logger.warn("System may be unstable: IOReactor encountered a runtime exception : " + e.getMessage(), e);
          return true; // Return true to note this exception as handled, it will not be re-thrown
        }
      });

      httpClientBuilder.setConnectionManager(new PoolingNHttpClientConnectionManager(ioReactor));
    } catch (IOReactorException e) {
      throw new RuntimeException(e);
    }

    return httpClientBuilder;
  }
}

本来以为解决了,很安心的睡觉去了,但是在0909中秋节大促的当天,又发生了同样问题,所幸0909并不是打车的高峰(0910为打车高峰),并且我们有备用索引,通过切换索引,避免了故障的发生。

进一步的分析

看起来问题没有这么简单,于是再次仔细阅读官方的讨论,也去爬了一下httpclient的讨论https://lists.apache.org/thread/msdv250tl1kg6snxs89mjdng7qzj3r48 ,大概理解了来龙去脉。
问题原因是es client实际上使用的是httpclient发送http请求到es集群进行通信,因为各种原因(可能是网络抖动,可能是集群抖动)发生了IO异常,这个时候就会抛出异常,如果这个异常不被处理,就会抛给上层,造成上述不可恢复的问题。
那官方为什么不去处理这个异常呢?es官方也说了,他面对的情况太多了,无法顺利的拿到The IOReactor object,这个对象是由 connection manager 来管理的,用户构造他们的方式多种多样,是他们无法控制的,因此他也没办法修复这个,这就有了我们的第一个方案,就是我们自己设置这个异常的处理器。

为什么升级httpclient可以部分解决?

那么又关乎Apache Httpclient有什么关系?为什么有人说升级了httpclient的版本就解决问题了? 那是因为httpcilent的5.0之前是需要用户自己来处理异常,5.0之后他自己设置了一个默认的异常处理器,所以即使你不用设置,也会有个默认的来兜底。当然,老外也吐槽了,Apache httpclient的文档混乱,连一个例子也没有

image.png

老外也很暴躁,也有键盘侠? 上来就吐槽es的开发没有仔细阅读文档,导致产生了bug的
image.png)

为什么我们的代码不生效?

经过我们寻找,发现了这段报错
回过头来,我们再看看我们的代码,可以发现,他处理了两种异常IOException, RunTimeException,而我们这个是由于类冲突,报出来的是NoSuchMethodError,自然也就没办法交给handler来处理了,他不是我们代码里所捕获的异常,因为也解决不了我们的问题了

ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
                                    @Override
                                    public boolean handle(IOException e) {
                                        log.warn("System may be unstable: IOReactor encountered a checked exception : "
                                                + e.getMessage(), e);
                                        return true; // Return true to note this exception as handled, it will not be re-thrown
                                    }

                                    @Override
                                    public boolean handle(RuntimeException e) {
                                        log.warn("System may be unstable: IOReactor encountered a runtime exception : "
                                                + e.getMessage(), e);
                                        return true; // Return true to note this exception as handled, it will not be re-thrown
                                    }
                                });

为什么最近发生的特别频繁

在搜索之后,发现有人遇到了同样的问题,并且还稳定的复现了https://www.cnblogs.com/quwenli/p/15050215.html,顺着这个思路,我们再看看我们代码,发现在捕获异常的同时,还夹带了一些"私货",不知道从哪拷来的,不过看看也还挺合理,连接数的设置比默认值要好。但是这里面的关键点在于keep alive的时间,720s是远远大于我们服务端(slb的nginx层配置)60s,这就导致了频繁的发生服务端主动断开了keep alive,而客户端还在傻傻的等待,以为自己还是好的这种情况。

  httpClientBuilder.setMaxConnTotal(100);
  httpClientBuilder.setMaxConnPerRoute(50);
  List<Header> headers = new ArrayList<>(2);
  headers.add(new BasicHeader("Connection", "keep-alive"));
  headers.add(new BasicHeader("Keep-Alive", "720"));

解决NoSuchMethodError

继续翻看es client部分的代码,在org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput这个方法里,按照代码所写他会判断连接的状态,无论什么状态,他都能正确的处理,问题在那个无参的ConnectionClosedException(),IDEA里本地看到的没什么问题,但是把打出来的fat jar包解压之后发现,打在jar包里的Class是一个旧版本的只有一个有参构造函数的ConnectionClosedException,这个肯定就是类冲突了。

  if (handler != null) {
                    if (state.isValid()) {
                        handler.inputTerminated();
                    } else {
                        handler.failed(new ConnectionClosedException());
                    }
                }

解决冲突的过程就不细说了,总之是非常奇怪,通过maven dependency:tree以及IDEA自带的Dependency Analyzer都没看到冲突,而且看到的版本都是正确的高版本ConnectionClosedException,可以从tree里看到依赖的版本明明是4.4.13,在也没有其他的地方有引入这个依赖,但是打到fat jar里却是4.4.6,都不知道这个4.4.6的版本哪来的。

由于这个类在httpcore包里,因此我把httpcore单独的申明出来,再到fat jar里解压出来确认打进去的是正确的版本之后,再继续测试

总结

到现在我们知道了问题所在了,由于keep alive的时间设置不合理,导致es的服务端断开了连接,而client由于类冲突,在解决Closed问题的途中,抛出了NoSuchMethodError,而这个错误又不是handler所能处理的,因此这个 I/O reactor的错误就一直报下去。
因为这个连接已经坏了,却不能正常的关闭,每次拿到这个连接去发送请求的时候都会发生错误。
我们遇到的情况,不是官方讨论里的IOException的情况,而是自己的类冲突带来的非正常关闭。正因为每个人遇到的情况都不同,所以官方讨论的代码有人能解决,有人却无法解决。
遇到问题,我们还是要深入去理解和分析,理解里面的原因,才能找到问题,别人的答案可能不一定适合你

解决和验证

我们尝试了两个方案,既然是keep alive设置不合理导致的连接断开,那么我们把keep alive设置的少于服务端的时长避免断开,不就可以解决问题了。
另外一个方面,由于类冲突导致的close情况无法正常处理,这个是本质,把类冲突解决之后,close就能正常处理,即使发生了各种原因的关闭(可能不止 keep alive这种情况),程序也能正常处理
同时,我们的handler也需要添加,防止网络抖动抛出的IOException带来的问题。
这几天一直关注修改后的代码运行情况,直到看到这条日志,能证明我们代码上去之后,又发生了连接关闭事件,不同的是,这次程序能够正确的处理连接关闭的问题,并且继续运行下去,不会出错。

建议

由于我们用的是相同的依赖,都是用的数据平台释放给我们的父依赖关系,所以看到生产还有大量的报错,并且一定也发生了数据丢失,至于数据丢失是否重要,对业务有没有影响,这个不知道是谁的业务,快去检查下数据丢失吧
只要是flink往es写数据的,用的是这个依赖包的依赖关系的,应该都有同样的问题,去看看taskmanager.err日志里看看有没有NoSuchMethoError的报错,应该都能参照我们的方案解决

参考文档

https://github.com/elastic/elasticsearch/issues/49124
https://lists.apache.org/thread/msdv250tl1kg6snxs89mjdng7qzj3r48
https://www.cnblogs.com/quwenli/p/15050215.html

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注