当前位置: 首页 > 后端技术 > Java

开源微服务编排框架:NetflixConductor

时间:2023-04-02 01:56:11 Java

简介:本文主要介绍netflixconductor的基本概念和主要运行机制。作者|夜阳源|阿里科技公众号本文主要介绍netflixconductor的基本概念和主要运行机制。1.简介netflixconductor是一个基于JAVA语言编写的开源流程引擎,用于构建基于微服务的流程。它具有以下特点:它允许创建复杂的业务流程,流程中的每个独立任务都由一个微服务来实现。创建基于JSONDSL的工作流来编排任务的执行。工作流在执行过程中是可见和可追溯的。提供suspend、resume、restart等多种控制模型。提供了一种最大限度地重用微服务的简单方法。具有扩展到百万进程并发运行的服务能力。客户端和服务端的分离是通过队列服务实现的。支持HTTP或其他RPC协议进行数据传输2基本概念1TaskTask是最小的执行单元,承载一段执行逻辑,比如发送HTTP请求。SystemTask:由conductor服务执行,这些任务的执行与引擎在同一个JVM中。WorkerTask:由worker服务执行,执行与引擎隔离。worker通过队列获取任务后,执行并更新结果状态给引擎。Worker的实现是跨语言的,使用Http协议与Server通信。Conductor提供了几个内置的SystemTasks:FunctionalTask:HTTP:发送http请求JSON_JQ_TRANSFORM:jq命令执行,一般用户json转换,见jq官方文档KAFKA_PUBLISH:发布kafka消息流程控制Task:SWITCH(原Decision):条件判断Branch,代码中类似于switchcaseFORK:开始并行分支,用于调度并行任务JOIN:汇总并行分支,用于汇总并行任务DO_WHILE:循环,代码中类似于dowhileWAIT:一直运行直到外部时间触发更新节点状态,可用于等待外部操作SUB_WORKFLOW:子流程,执行其他流程TERMINATE:结束流程,以指定输出提前结束流程,可与SWITCH节点配合使用,类似于早期代码中的return语句CustomTask:对于SystemTask,Conductor提供了WorkflowSystemTask抽象类,可以自定义和扩展。对于WorkerTask,可以通过conductor的客户端Worker接口实现执行逻辑。2WorkflowWorkflow由一系列需要执行的任务组成,conductor使用json来描述任务的流转关系。除了基本的顺序流程外,借助内置的SWITCH、FORK、JOIN、DO_WIHLE、TERMINATE任务,还可以实现分支、并行、循环、提前结束等流程控制。3Input&OutputTask的输入是一个映射,它被实例化为工作流的一部分或某些其他任务的输出。允许来自工作流或其他任务的输入/输出作为后续执行任务的输入。Task有自己的输入和输出,都是jsonobject类型。任务可以使用${taskxxx.output}引用其他任务的输入和输出。参考语法是json-path。除了${taskxxx.output}最基本的值解析方式外,还支持其他复杂的操作,比如过滤等,具体见json-path语法。Workflow启动时可以传入流程的输入数据,Task可以通过${workflow.input}引用。Task实现了原子操作和流程控制操作的处理,Workflow定义了Task的流转关系,Task是指Workflow或其他Task的输入输出。通过这些机制,conductor实现了流程的JSONDSL描述。三个整体架构主要分为几个部分:Orchestrator:负责进程的流动和调度;管理/执行服务:提供流程和任务管理更新等操作;TaskQueues:任务队列,Orchestrator解析出来的待执行任务会放在队列中;Worker:任务执行worker,从TaskQueues中获取任务,并通过ExecutionService更新任务状态和结果数据;Database:元数据&运行时数据库,用于保存运行时的Workflow、Task等状态信息,以及流程任务定义的原始信息;Index:索引数据库,用于存储执行历史;四种运行模型1任务状态转换SCHEDULED:待调度,任务放入队列,未轮询执行状态IN_PROGRESS:正在执行,轮询执行但尚未完成时的状态COMPLETED:执行完成FAILED:执行失败CANCELLED:中止时出现该状态,一般有两种情况:1、手动终止进程时,正在运行的任务会置为该状态;2。多个分叉分支。当一个分支的任务失败时,其他分支中正在运行的任务将设置为该状态;2、任务队列中任务的执行(同步系统任务除外)会先加入到任务队列中,是典型的生产者消费者模式。任务队列是具有延迟和优先级功能的队列;每种类型的任务都是一个单独的队列。另外,如果配置了domain和isolationGroup,会拆分成多个队列,实现执行隔离;决策者服务是生产者,根据流程配置和当前执行情况分析可执行任务,将其加入队列;任务执行者(SystemTaskWorker,Worker)为消费者,长轮询对应的队列,从队列中获取任务执行;队列接口是可插拔的,conductor提供了Dynomite、MySQL和PostgreSQL的实现。3核心功能实现机制conductor调度的核心是decider服务,它根据进程当前运行状态解析出需要执行的任务列表,并将任务入队给worker执行。decision的主要流程简化如下。详细代码见WorkflowExecutor.java的decide方法:其中,调度任务处理流程简化如下。详细代码见WorkflowExecutor.java的scheduleTask方法:decide的触发时机最重要的触发时机:当新开始执行时,当操作系统任务执行完成时,会触发decide操作。当Worker任务通过ExecutionService更新任务状态时,会触发decision操作。控制节点的实现机制1)Task&TaskMapper对于每个Task,都有Task和TaskMapper两部分:Task:任务的执行逻辑代码,其作用是任务的执行task,通过任务的定义和配置,返回实际需要执行的任务列表,当前实例的执行状态等信息对于任务,TaskMapper返回Task本身,补充一些执行实例的状态信息.但是对于控制节点,会有不同的逻辑。2)条件分支(SWITCH)的实现机制SWITCH用于根据条件判断执行不同的分支。实际上,这个节点的任务不执行任何操作。TaskMapper根据分支条件判断出要走的分支后,返回对应分支的第一个任务。SwitchTaskMapper.javagetMappedTasks方法关键代码://需要调度的任务列表,最后返回结果ListtasksToBeScheduled=newLinkedList<>();//evalResult是分支条件变量(case)的值//decisionCases是一个Map结构,key是分支的case值,value是对应分支的任务定义列表(会有多个taskdefinitionsinthebranch)//根据分支变量的实际值,获取对应分支的任务定义列表ListselectedTasks=taskToSchedule.getDecisionCases().get(evalResult);//默认逻辑:如果获取不到对应的分支或者分支为空,使用默认分支()){//获取分支的第一个(下标0)任务,返回给decider服务进行调度(decider会将任务加入队列,交给worker执行)WorkflowTaskselectedTask=selectedTasks.get(0);//调用deciderService的getTasksToBeScheduled方法,在该方法中从TaskMapper中获取MappedTasks。这里使用递归调用解析嵌套的TaskListcaseTasks=taskMapperContext.getDeciderService().getTasksToBeScheduled(workflowInstance,selectedTask,retryCount,taskMapperContext.getRetryTaskId());tasksToBeScheduled.addAll(caseTasks);switchTask.getInputData().put("hasChildren","true");}returntasksToBeScheduled;3)并行(FORK)实现机制FORK用于开启多个并行分支。实际上这个节点的任务并没有做任何操作,TaskMapper返回的是所有并行分支的第一个任务。ForkJoinTaskMapper.javagetMappedTasks关键代码://待调度的Task列表,最后返回结果ListtasksToBeScheduled=newLinkedList<>();//配置中的所有fork分支List>forkTasks=任务计划。getForkTasks();for(Listwfts:forkTasks){//为每个分支取第一个任务WorkflowTaskwft=wfts.get(0);//调用deciderService的getTasksToBeScheduled方法,在TaskMapper调用getMappedTasks的这个方法中获取。这里使用递归调用解析嵌套的TaskListtasks2=taskMapperContext.getDeciderService().getTasksToBeScheduled(workflowInstance,wft,retryCount);tasksToBeScheduled.addAll(tasks2);}returntasksToBeScheduled;分支(SWITCH)和并行(FORK)节点本身没有执行逻辑。它们通过TaskMapper返回实际要执行的Task,然后交给DeciderService处理。重试的实现机制重试及其延迟时间的设置都是借助任务队列的功能实现的。重试:将任务重新添加到任务队列中重试延迟时间:设置添加到任务队列时的延迟时间。延迟时间过后,可以在队列中轮询任务,实现五完整性保证机制。由于调度过程中可能出现机器重启、网络异常、JVM崩溃等偶然情况,会导致决策进程意外终止、进程执行不完整、进程一直在运行(实际上不再预定),或显示其他状态错误等。1WorkflowReconciler针对这种情况,conductor有一个WorkflowReconciler,它会周期性的尝试判断所有正在运行的进程,修复进程执行的一致性。另外,它还有一个作用就是验证进程超时。2decideQueue那么WorkflowReconciler是如何获取当前正在运行的进程的呢?答案是decideQueue。decideQueue和任务队列一样,也是一个带有延时功能的队列,里面存放的是正在执行的进程的实例id。当任务开始执行时(包括新开始执行、重试执行、恢复执行、重新运行执行等),实例id会被推送到decideQueue;当执行结束(成功、失败)时,实例id将从decideQueue中删除。3ExecutionLockServiceWorkflowReconciler会周期性尝试决定所有正在运行的进程进行超时判断,保持进程一致性。但是,流程本身的正常执行也会触发一个决定。如果同一个执行同时触发两个决策,可能会造成状态混乱、执行卡顿等问题。Conductor使用锁来解决这个问题。它提供了三种实现:单机LocalOnlyLock(基于semaphore)、redis分布式锁(基于redission)、zookeeper分布式锁。decide方法一开始会尝试获取锁,获取失败直接返回。锁用于确保决策不会在同一个流程实例上并发执行。if(!executionLockService.acquireLock(workflowId)){returnfalse;}由于锁是可配置的,所以可能会导致一种误解:单机不需要配置锁。其实单机也是需要配置锁的,因为WorkflowReconciler会和流程的正常执行发生冲突,偶尔会造成流程状态混乱。原文链接本文为阿里云原创内容,未经许可不得转载。