当前位置: 首页 > 网络应用技术

[源代码分析] TensorFlow分布式colusterCoordinator

时间:2023-03-06 16:42:00 网络应用技术

  在本文中,我们主要研究参数如何分发计算,这是ClusterCoordinator运行的方式。这是TF分布式的最后一篇文章。

  本系列中的其他文章如下:

  [翻译] TensorFlow分布式论文大规模机器学习异质分布

  [翻译] TensorFlow分布式论文“在Tensorflow中的控制流的实现”

  [源代码分析] TensorFlow分布式环境(1)---总体体系结构

  [源代码分析] TensorFlow分布式环境(2)---主静态逻辑

  [源代码分析] TensorFlow分布式环境(3)---工人静态逻辑

  [源代码分析] TensorFlow分布式环境(4)--- WorkerCache

  [源代码分析] TensorFlow分布式环境(5)---会话

  [源代码分析] TensorFlow分布式环境(6)---主动力学逻辑

  [源代码分析] TensorFlow分布式环境(7)---工人动态逻辑

  [源代码分析] TensorFlow分布式环境(8)---通信机制

  [翻译]使用TensorFlow进行分布式培训

  [源代码分析] TensorFlow分布式分布式strategy的基本文章

  [源代码分析] TensorFlow的分布式变量

  [源代码分析] TensorFlow分布式MirroredStrategy

  [源代码分析] TensorFlow分布式参数Verstrategy V1

  [源代码分析] TensorFlow分布式参数Verstrategy V2

  TensorFlow 2建议使用中央协调的体系结构进行参数服务器培训。每个工作人员和参数服务器运行tf.distribution.server。基于此,协调员任务负责创建资源,调度函数以及对工人和参数服务器进行协调培训。协调员使用tf.distribution.experiment.coordinator.clustercoordinator.clusterCoordinator来协调群集并使用tf.distribution.distribution.experimenter.parameterserstraterstraterategygy.parameterstraterstergygy为了定义参数服务器上的变量和工人的计算。

  colusterCoordinator是安排和协调远程函数的对象。该类用于为远程TensorFlow服务器创建容忍错误的资源和调度函数。该类不支持独立使用,应与TF.Distribution策略一起使用。旨在与之合作的旨在合作。colusterCoordinator类目前仅适用于tf.distribution.experental.parameterserserserstrategy。

  使用ParameterServersestergy之后,可以使用所有计算,用户可以使用tf.distribution.experental.coordinator.clustercoordinter来创建资源并将培训步骤分配给远程工人。

  首先,让我们创建一个clusterCoordinator对象并传递策略对象。

  其次,创建一个属于每个工人的数据集和一个迭代器。GPU。

  最后一步是使用ColusterCoordinator.schedule分配给远程工人。

  以下是远程值的结果。

  用户还可以启动所有步骤(步骤)并在等待完成时做某事。

  根据以前的代码,我们总结了问题如下:

  接下来,我们尝试通过分析代码回答这些问题。

  ColusterCoordinator的主要思想如下。

  clusterCoordinator定义了特定的特定。我们可以看到它主要使用_Strategy成员变量配置,该变量生成_Cluster成员变量。

  ColusterCoordinator对象提供的最重要的API是时间表,它将为Worker分配tf.功能以执行异步。细节如下:

  失败和容错的策略如下:

  时间表的具体定义如下,并且将以FN作为参数之一引入数据迭代器。

  联接方法的作用是阻止直到执行所有计划功能。具体特征如下:

  完成的方法返回是否已执行所有分布功能。如果任何先前分布式的函数都会导致错误,则完成'将失败。

  获取将获得远程值的结果。

  除了计划远程功能外,ClusterCoordinator还帮助所有工人创建数据集,并在从失败中恢复的工作者可以通过调用DataSet_fn在Worker设备上创建数据集时重建这些数据集。使用示例如下:如下:

  上面的代码使用create_per_worker_dataset在Worker上创建数据集。这些数据集由DataSet_FN生成,并返回了代表这些数据集的集合。ITER将返回此类数据集中的tf.distribution.experental.coordinator.perworkervalues。它是迭代器的集合,并将迭代器放置在每个工人上。

  应该注意的是,迭代器的“ perworkervalues”上直接调用“ next”。迭代设备应传递给tf.distribution.experental.coordinator.clordcoordinter.schedule作为参数。该函数由工人执行,将接收与工人相对应的单个迭代器。此功能可以调用迭代器的下一个方法。

  目前,时间表方法假设工人是相同的,因此假设不同工人的数据集是相同的,除非包括数据集,否则未设置任何随机种子。在这种情况下,将是不同的。因为这样做建议,建议重复数据集无限并安排有限的步骤,而不是依靠OutofrangeRangeRangeRrangeRornorRorrrrrrrro。

  get_per_worker_dataset返回perworkerdatasetfromdataset或perworkerdatasetfromdatasetFunction。

  PerWorkerDistributedDaset表示由数据集建立的工人使用的分布式数据集。

  PerWorkerDistributedDaset表示由数据集方法建立的工人使用的分布式数据集。

  iTer中有:

  _CREATE_PER_WORKER_RESOURCES调用每个工人的方法,以使每个工人获取数据。

  PerWorkerValues是一个容纳值列表的容器,每个工人对应于一个值。每个值都位于相应的工人上。当它用作tf.distribution.experental.clustercoordinaster.schegs或argschegs时,工人的特定值将传递给工人的功能。

  创建tf.distribution.experiment.coordinator.perworkervalues的唯一途径是在clusterCoordinator.create_perter_dataset.t返回的分布式数据集上调用ITER。Perworkervalues。

  获得数据的逻辑如下:

  集群是业务执行人。

  簇是工人集群。在初始方法中,将进行以下处理:

  应该在这里注意到如何忽略由于工人的瞬时连接错误的故障。

  此类提供的最重要的API是“时间表”/“加入”。“计划” API没有阻止。它将“ tf.function”插入队列中,并立即返回“远程值”。

  特定逻辑如下。虚线表示数据集已传递。这里的队列是six.moves导入队列引入的队列。我们将在_CoordinedClosurequeue中看到它。

  或从官方文档图中,当前的左圆圈已完成。

  停止代码如下,特别是调用队列的处理方法。

  闭合的作用是封装任务并提供其他功能。

  CLOSURE的execute_on负责运行,特别是在指定的设备上执行self._function,即用户定义的函数。请注意,context.executor_scope(worker.executor)使用上下文。

  self._function是用户定义的函数。我们给出另一个方法示例。可以看出,您可以使用straging.run将培训方法分配给远程工人进行培训。

  用户可以设置闭合,该关闭设置为在返回值中设置。

  ResourceClosure是一个衍生类类,可以用远程值包装。实际上,ResourceClosure实际上是使用的。

  _CoordinedClosurequeue是任务所在的队列。

  观点和获取方法负责插入和删除。

  put_back负责重新输入闭合到队列。

  方法等待将等待所有关闭结束。

  Mark_Faird和Done是末端和异常的一组组合。

  停止和_cancel_all_closures负责暂停关闭。

  工人是该功能的执行者。

  工人的定义如下,并且启动了一个线程run_process_queue。

  new_executor调用tfe_newexecutor。

  tfe_newexecutor在tensorflow/c/c_api_experiment.cc中定义,生成tfe_executor。

  TFE_EXECUTOR定义如下。执行人类是会话执行器的抽象。在TF2中,还有EageExecutor。

  _process_queue方法将从队列中取出封闭并运行任务。

  7.2.1等待_process_queue首先致电_maybe_dlay等待环境变量配置。

  7.2.2处理task_process_queue然后将调用_process_closusion运行闭合。

  让我们看一下如何读取工人的数据。如前所述,Create_Resource将调用create_resource,为每个工人建立自己的资源。

  _register_Resource将向工人注册每个工人的资源。

  逻辑如下所示,虚线表示数据流。用户通过PUT方法将闭合到队列中,并且Worker通过PUT方法从队列中获得了闭合执行。

  一系列方法(例如停止)负责停止。

  在这一点上,我们尚未与策略正式相关。让我们以另一个示例查看。在这里,我们会发现,在传递给协调员的方法中,我们将称为“策略”。

  处理故障的总体策略大致如下:

  当工人失败时,特定的逻辑如下:

  当参数服务器失败时,计划,加入或完成时,还可以触发tf.errors.unavailablerror.ine,除了重置故障参数服务器外,用户还应重新启动协调器以将其重新连接到Worker和parameter Server,创建变量并加载检查点。如果协调器失败,则用户将其重置后,程序将自动连接到工作人员和参数服务器,并从检查点继续。由于协调器本身可能不可用。建议使用一些工具不失去培训进度:

  如果成功执行函数,您可以成功获得远程值。这是因为当前执行函数后,返回值将立即复制到协调器中。如果在复制过程中任何工人失败,则将在另一个工人上检索该函数。因此,如果您想优化性能,则可以安排一个没有返回值的函数。

  一旦协调员找到了一个错误,例如unavailableRror或来自参数服务器的其他应用程序错误,例如来自tf.debugging.check_numerics的无效符号,它将取消所有待处理和排队的功能,然后造成错误。。

  导致错误后,协调员不会导致相同的错误或取消函数的任何错误。

  CrosCoordinator假设所有函数误差都是致命的。基于此假设,其错误报告逻辑是:

  WorkerPreemptionHandler是失败过程的主要模块。定义如下:

  8.6.1当在集群中生成配置时,将配置WorkerPreemptionHandler。

  8.6.2等待关闭,您将用Wait_on_failure包裹一层以处理错误。

  WAIR_ON_FAILURE方法的WorkerPreementHandler如下:如下:

  _validate_preemption_failure定义如下:

  8.6.3处理程序WorkerPreemptionHandler有一个背景线程_PReemption_handler_thread。

  _preemption_handler将执行必要的错误。

  根据以前的代码,我们总结了问题如下:

  ★★★★★★思考生活和技术★★★★★★

  微信公共帐户:罗西的思想

  TensorFlow源代码分析

  TensorFlow分布式培训

  TensorFlow分布式原理理解

  TensorFlow体系结构和设计:概述

  Tensorflow交叉 - 设备通信

  Tensorflow章节|Tensorflow 2.x分布式培训概述

  原始:https://juejin.cn/post/7100031504658464804