本文转载自微信公众号《一言》,作者一言。转载本文请联系一言公众号。在我们的数据平台产品中,为了简化开发,对Flink做了一层封装,定义了Job和Flow的抽象。一个job其实就是Flink的一个job。每个作业可以定义多个流程。Flow可以理解为Flink的一个数据流。使用作业传递的StreamExecutionEnvironment,可以将包括source和sink在内的多个算子添加到流中。可以使用自定义的@JobFlow注解来配置Job和Flow的关系,这样在执行抽象AbstractJob的run()方法时,可以通过反射获取到该Job下的所有Flow,并运行每一个流可以遍历()方法。在Flow的run()方法中,实际上是根据StreamExecutionEnvironment执行多个算子。为了保证计算的稳定性,Flink提供了不同的重启策略。比如我们设置重启策略为失败率(failure-rate),如果执行任务的错误数达到失败率配置要求,Flink的Worker节点的TaskManager就会重启。如果超过重启次数,任务管理器将停止运行。失败的原因可能有很多,比如Flink集群环境导致的资源不足、网络通信失败等故障,但也有可能是我们写的job在处理流式数据的时候,由于处理不当导致业务异常抛出数据处理,使得Flink认为是失败的。为了减少因为业务原因抛出异常导致TaskManager不必要的重启,需要对我们编写的Flink程序的异常处理机制进行指定。由于Flink的Job是封装的,所以从一开始我就考虑一劳永逸的解决业务异常的问题,即在AbstractJob的run()方法中,捕获我们自定义的业务异常,并将错误信息记录在日志中.异常“吃”,避免因抛出异常导致执行失败,导致TaskManager重启,如:publicabstractclassAbstractFlowimplementsFlow{publicvoidrun(){try{runBare();}catch(DomainExceptionex){//...}}protectedabstractvoidrunBare();}不知道这个处理机制根本抓不到业务异常!为什么?这就要从Flink的分布式机制说起。Client要在Flink集群上执行任务,需要将作业提交给Flink集群的Master节点。Master的Dispatcher接收到Job并启动JobManager。通过分析Job的逻辑视图,了解Job的资源需求,然后向ResourceManager(Standalone模式,如果是YARN,由YARN管理和调度资源)申请这个Job需要的资源。.JobManager将Job的逻辑视图转换为物理视图,将计算任务分发部署到Flink集群的TaskManager中。整个执行过程如下图所示:我们封装的一个flow,在物理上看,其实就是一个job,也就是前面提到的计算任务。一个作业可以包含多个操作员。如果相邻的算子之间没有数据洗牌,并且并行度相同,那么它们将被合并成一条算子链。每个算子或算子链形成一个JobVertex,在执行过程中被视为一个任务(Task)。根据并行度的设置,每个任务包含具有并行度数的子任务(SubTask)。这些子任务是作业调度的最小逻辑单元,对应进程资源中的一个线程。在Flink中,它是一个Slot(如果不考虑Slot共享的话)。假设Flink环境的并行度设置为1,job的前两个算子满足合并算子链的要求,并行度设置为2;之后通过keyBy()等操作符完成数据Shuffle,然后合并到同一个Sink。那么它们之间的关系如下图所示:显然,Flink集群在执行作业时,会对作业进行划分,并将划分后的子任务分发到TaskManager中的各个slot中。一个TaskManager就是一个JVM,一个Slot就是进程中的一个线程。答案是不言而喻的。AbstractFlow之所以无法捕捉到各个operator在执行任务时抛出的业务异常,是因为它们根本不是在一个JVM上执行的,也不在同一个线程中运行。这是分布式开发和本地开发的本质区别。如果不了解Flink的执行原理,可能会疑惑为什么Java的异常处理机制不起作用。在做分布式开发的时候,如果还照搬本地开发的经验,可能真的要撞脑撞血才能看清真相。因此,正确的做法是在每个算子的实现中捕获个别异常,即保证每个算子本身是健壮的,从而尽可能保证作业的健壮性。当然,分布式开发与本地开发的本质区别不仅限于此。例如,分布式开发需要跨进程调用的序列化,对数据一致性的不同要求,以及对异步通信机制和阻塞调用的理解。成员有不同的体验。归根结底,了解分布式开发或分布式系统的底层原理,可以让我们尽快看清真相,避免在不知情的情况下被转移到坑里。
