任务切分DataX完成一个单一的数据同步作业,称为Job,这个Job会做拆分阶段,一个一个拆分成Tasks,每个Task会有一个Reader和Writer任务。拆分成多个任务是为了并发执行。这个并发数就是之前的通道数。我们假设job在split阶段有100个task,channel数为5,那么实际channel数为100和5中的最小值,即5。并发执行的task数不是简单的100/5=20,但是先把任务放到TaskGroup里面,然后把并发分配给TaskGroup。例如每个TaskGroup的并发数为5,通过core.container.taskGroup.channel设置,则TaskGroup的个数为Channel总数/TaskGroup的个数,所有TaskGroup的个数为20/5=4.由于有100个Task,一共有4个TaskGroup,每个TaskGroup有100/4=25个Task。启动任务的第一件事是启动线程池。这个线程池有一个固定的数量,就是taskGroups的数量。每个线程启动一个TaskGroupContainer,每个TaskGroupContainer包含JobId、taskGroupId、channelClazz、configuration等信息。当TaskGroupContainer启动时,它会在taskQueue集合中存储25个任务。另外,会创建一个大小为5的runTasks集合来存放Task执行器TaskExecutor。Task执行器的创建依赖于taskQueue集合。每次从taskQueue集合中取出一个任务创建一个Task执行器并启动,都需要从taskQueue集合中移除该任务。由于每个TaskGroup的并发数为5,因此最多同时有5个Task执行器,即25个任务,其中5个先执行,其中一个或多个执行完后再执行剩余的任务。直到taskQueue集合为空,每一个Task执行器都执行完毕,那么这25个任务才会被执行。Taskexecutor每个Task对应一个Task执行器。Task执行器包括任务的运行配置、taskId、channel、读写线程。Reader线程由ReaderRunner启动,包含插件(如StreamReader$Task)、RecordSender等属性。writer线程由WriterRunner启动,WriterRunner包含插件(如StreamWriter$Task)、RecordReceiver等属性。Task执行器启动时,首先启动WriterRunner线程,然后是ReaderRunner线程。WriterRunner和ReaderRunner的执行顺序如下,左边是Reader,右边是Writer。主要部分是Reader的startRead和Writer的startWriter。首先Reader读取数据,然后封装在Record中,Record中包含数据集Column,byteSize,以及需要的内存memorySize。Column就是我们读取的每一条数据,包括数据的类型,数据的内容等,读取的数据发送到Channel。目前默认是基于内存的MemoryChannel,也可以自行扩展。MemoryChannel维护着Record的阻塞队列队列。当队列中有数据存入时,Writer的读操作就会被唤醒。由于Channel是共享的,Writer会从Channel的队列中读取Reader存储的数据进行业务操作。
