编者按:文章介绍了EasyScheduler的架构设计以及各个组件的工作原理。如果你在工作中遇到需要大数据工作流的场景,可以尝试使用这种架构。由于文章比较长,建议先收藏书签再阅读。EasyScheduler大数据工作流调度系统已经开源,下载地址:https://github.com/analysys/在讲解调度系统的架构之前,我们先了解一下调度系统的常用术语。名词解释DAG:全称DirectedAcyclicGraph,简称DAG。工作流中的Task任务以有向无环图的形式组装起来,从入度为0的节点开始进行拓扑遍历,直到没有后继节点。例如流程定义如下图所示:通过拖拽任务节点,建立任务节点关联形成的可视化DAG流程实例:流程实例是流程定义的实例化,任务实例可以通过手动启动生成orscheduled调度:一个任务实例就是一个流程定义中间任务节点的实例化标识了具体的任务执行状态任务类型:目前支持SHELL、SQL、SUB_PROCESS、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT,并计划支持动态插件扩展。注意:子SUB_PROCESS也是一个单独的进程定义,是一种可以独立启动和执行的调度方式:系统支持定时调度和基于cron表达式的手动调度。命令类型支持:启动工作流,从当前节点开始执行,恢复容错工作流,恢复暂停进程,从失败节点开始执行,补充,调度,重新运行,暂停,停止,恢复等待线程。其中恢复容错工作流和恢复等待线程两种命令类型由调度内部控制使用,外部无法调用定时调度:系统采用quartz分布式调度器,支持生成同时cron表达式的可视化依赖:系统不仅支持DAG简单的前驱节点和后继节点之间的依赖,还提供了任务依赖节点,支持进程间自定义任务依赖优先级:支持进程实例和任务实例的优先级,如果流程实例和任务实例的优先级不设置,默认先进先出邮件告警:支持SQL任务查询结果邮件告警,流程实例运行结果邮件告警,容错告警通知失败策略:对于并行运行的任务,如果任意一个任务失败,提供两种失败策略模式,continuemeans不考虑并行运行任务的状态,直到进程失败结束。结束的意思是一旦发现失败的任务,同时杀死正在运行的并行任务,进程失败结束。补码:对历史数据进行补码,支持区间并行和串行补码方式。概念上,MasterServer主要负责DAG任务切分,任务提交监控,同时监控其他MasterServers和WorkerServers的健康状态。MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点的变化进行容错处理。该服务主要包括:DistributedQuartz分布式调度组件,主要负责定时任务的启停操作。当quartz启动任务时,Master内部会有一个线程池负责任务的后续操作。MasterSchedulerThread是一个定时扫描的扫描线程。数据库中的命令表根据不同的命令类型进行不同的业务操作。MasterExecThread主要负责DAG任务切分,任务提交监听,以及各种命令类型的逻辑处理。MasterTaskExecThread主要负责任务持久化。WorkerServerWorkerServer同样采用分布式无线中心化的设计理念,WorkerServer主要负责任务执行和提供日志服务。当WorkerServer服务启动时,它向Zookeeper注册临时节点并维护心跳。该服务包括:FetchTaskThread主要负责不断从TaskQueue中接收任务,并根据不同的任务类型调用TaskScheduleThread对应的执行器。LoggerServer是一个RPC服务,提供查看、刷新、下载日志碎片等功能。ZooKeeperZooKeeper服务。系统中的MasterServer和WorkerServer节点使用ZooKeeper进行集群管理和容错。此外,系统还基于ZooKeeper进行事件监听和分布式锁。我们也实现了基于Redis的队列,但是我们希望EasyScheduler依赖的组件越少越好,所以我们最终去掉了Redis的实现。TaskQueue提供任务队列的操作,目前是基于Zookeeper实现的。由于队列中存储的信息较少,因此无需担心队列中的数据过多。事实上,我们对***数据存储队列进行了压力测试,对系统稳定性和性能没有任何影响。Alert提供告警相关接口,主要包括两类告警数据的存储、查询、通知功能。其中通知功能有两个:邮件通知和**SNMP(尚未实现)**。APIAPI接口层主要负责处理来自前端UI层的请求。服务统一??提供RESTfulapi对外提供请求服务。该接口包括工作流的创建、定义、查询、修改、发布、下线、手动启动、停止、暂停、恢复、从此节点执行等。UI系统的前端页面提供了系统的各种可视化操作界面。有关详细信息,请参阅**系统用户手册**部分。架构设计思想DecentralizationvsCentralization集中化的设计理念比较简单。分布式集群中的节点根据角色分为两种角色:Master的角色主要负责任务分发和监控Slave的健康状态,可以动态均衡任务给Slave,让Slave节点将不要处于“忙死”或“闲死”的状态。Worker的作用主要是负责任务的执行和维护Master的心跳,以便Master给Slave分配任务。中心化思想设计的问题:一旦Master出现问题,group就会没有leader,整个cluster就会崩溃。为了解决这个问题,大多数Master/Slave架构模型都采用主备Master的设计方案,可以是热备也可以是冷备,也可以是自动切换也可以是手动切换,越来越多的新系统开始有能够自动选举和切换Master以提高系统可用性。还有一个问题是,如果Scheduler在Master上,虽然可以支持一个DAG中的不同任务运行在不同的机器上,但是会造成Master的过载。如果Scheduler在Slave上,则一个DAG中的所有任务只能在某台机器上提交。当并行任务较多时,Slave的压力可能会更大。去中心化在去中心化设计中,通常没有Master/Slave的概念,所有角色都是一样的,地位是平等的。全球互联网是一个典型的去中心化分布式系统,任何节点设备连接到网络下机,只会影响小范围的功能。去中心化设计的核心设计是整个分布式系统中没有区别于其他节点的“管理者”,因此不存在单点故障。但是,由于没有“管理”节点,每个节点都需要与其他节点进行通信以获取必要的机器信息,而分布式系统不可靠的通信大大增加了上述功能的实现难度。事实上,真正去中心化的分布式系统很少见。相反,动态集中式分布式系统正在出现。在这种架构下,集群中的管理者是动态选择的,而不是预先设定的,当集群发生故障时,集群的节点会自发地召开“会议”,选举新的“管理者”来主持工作。最典型的案例就是Go语言实现的ZooKeeper和Etcd。EasyScheduler的去中心化是Master/Worker注册在Zookeeper中,Master集群和Worker集群是无中心的,利用Zookeeper分布式锁选举Master或Worker其中之一作为“管理者”来执行任务。分布式锁实践EasyScheduler使用ZooKeeper分布式锁实现只有一个Master同时执行Scheduler,或者只有一个Worker执行任务的提交。获取分布式锁的核心流程算法如下:EasyScheduler中Scheduler线程分布式锁实现流程图:线程不足,循环等待问题如果一个DAG中没有子进程,如果Command中的数据项数较多超过线程池设置的阈值,则直接Process等待或失败。如果在一个大DAG中嵌套了很多子进程,下图会产生“死等待”状态:上图中,MainFlowThread等待SubFlowThread1结束,SubFlowThread1等待SubFlowThread2结束,SubFlowThread2等待SubFlowThread3结束,而SubFlowThread3在线程池中等待新的线程,整个DAG进程无法结束,所以里面的线程无法释放。这样就形成了子-父进程循环等待状态。这个时候除非启动一个新的Master增加线程来打破这样的“死锁”,否则调度集群将不再可用。开一个新的Master来打破僵局似乎有点不尽如人意,所以我们提出了以下三种方案来降低这种风险:计算所有Master的线程总和,然后计算每个DAG需要的线程数,即,在DAG流程执行前做预计算。因为是多Master线程池,所以无法实时获取线程总数。判断单个Master线程池。如果线程池满了,就让线程直接挂掉。添加资源不足的命令类型。如果线程池不足,主进程就会被挂起。这样线程池就有了新的线程,可以重新唤醒因资源不足而挂起的进程的执行。注意:MasterScheduler线程在得到Command时以FIFO方式执行。所以我们选择了第三种方式来解决线程不足的问题。容错设计容错分为服务宕机容错和任务重试。服务宕机容错分为Master容错和Worker容错。宕机容错服务容错设计依赖于ZooKeeper的Watcher机制。实现原理如图:其中Master监控其他Master和Worker的目录。如果检测到移除事件,流程实例或任务实例将根据具体的业务逻辑进行容错。Master容错流程图:ZooKeeperMaster容错完成后,会被EasyScheduler中的Scheduler线程重新调度,遍历DAG寻找“正在运行”和“提交成功”的任务,并监控其任务实例的状态对于“正在运行”的任务,是“提交成功”的任务需要判断TaskQueue中是否已经存在,如果存在则监听任务实例的状态,如果不存在则重新提交任务实例。Worker容错流程图:一旦MasterScheduler线程发现任务实例处于“需要容错”状态,就会接管任务并重新提交。注意:由于“网络抖动”,节点可能会在短时间内失去与ZooKeeper的心跳,导致节点的remove事件。对于这种情况,我们采用最简单的方法,即一旦节点与ZooKeeper的连接超时,则直接停止Master或Worker服务。任务失败重试这里首先要区分任务失败重试、进程失败恢复、进程失败重跑的概念:任务失败重试是任务层面的,由调度系统自动执行。比如一个Shell任务设置重试次数为3次,那么当Shell任务运行失败后,最多会尝试运行该进程3次。进程故障恢复是在进程级别,是手动执行的。恢复只能从故障节点或当前节点执行。运行也是进程级的,手动完成,重新运行从start节点开始。接下来,我们将工作流中的任务节点分为两种类型。一种是业务节点,对应一个实际的脚本或处理语句,如Shell节点、MR节点、Spark节点、依赖节点等。还有一种是逻辑节点,不做实际的脚本或语句处理,但仅对整个流程流程进行逻辑处理,例如子流程部分。每个业务节点都可以配置失败的重试次数。当任务节点失败时,会自动重试,直到成功或超过配置的重试次数。逻辑节点不支持失败重试。但是逻辑节点中的任务支持重试。如果工作流中的任务未能达到最大重试次数,工作流将失败并停止,失败的工作流可以手动重新运行或进程恢复。任务优先级设计在早期的调度设计中,如果没有优先级设计,如果采用公平调度设计,先提交的任务可能会和后提交的任务同时完成,无法设置优先级流程或任务,所以我们重新设计了这个。目前我们的设计是这样的:不同流程实例的优先级高于同一个流程实例。同一进程内任务的优先级高于同一进程内任务的优先级。任务按从高到低的顺序处理。流程定义的优先级是考虑到一些流程需要在其他流程之前被处理。这可以在进程启动或计划启动时配置。总共有5个级别,分别是HIGHEST、HIGH、MEDIUM、LOW和LOWEST。如下图所示,具体实现是根据任务实例的json解析优先级,然后在ZooKeeper任务队列中保存进程实例priority_processinstanceid_taskpriority_taskid信息。从任务队列中获取时,通过字符串比较,可以得出最需要执行的任务的优先级也分为5个级别,分别是HIGHEST、HIGH、MEDIUM、LOW和LOWEST。如下图所示,Logback和gRPC实现了日志访问。由于Web(UI)和Worker不一定在同一台机器上,查看日志不能像查询本地文件一样。有两种选择:将日志放在ES搜索引擎上通过gRPC通信获取远程日志信息考虑到EasyScheduler尽可能轻便,选择gRPC实现远程访问日志信息。总结本文从调度入手,初步介绍了大数据分布式工作流调度系统——EasyScheduler的架构原理和实现思路。
