4.1简介Storm可以保证spout发出的每条消息都得到完整的处理。本章将描述Storm系统如何实现这一目标,并详细说明开发人员应如何使用Storm的这些机制来实现可靠的数据处理。4.2理解消息被完全处理从一个spout发送的一条消息(元组)可能会导致成百上千条消息基于这条消息被创建。我们想一下流式“单词统计”的例子:storm任务每次从数据源(Kestrel队列)中读取一个完整的英文句子;把这个句子分解成独立的词,***,实时输出每个句子的词和出现的次数。在这个例子中,从spout发送的每条消息(每个英文句子)都会触发创建许多消息,从句子中分离出来的单词就是创建的新消息。这些消息形成一个树结构,我们称之为“元组树”,它看起来像图1:在图1示例元组树中,Storm在什么情况下认为从spout发送的消息已被完全处理??答案是同时满足以下条件:元组树不再生长树中的任何消息被标记为“已处理”如果在指定时间内,从消息派生的元组树还没有完全成功处理,则此消息被视为未完全处理。这个超时值可以通过任务级参数Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS进行配置,默认超时值为30秒。4.3消息生命周期如果消息处理完或者没有处理完,Storm将如何进行下一步的操作?为了弄清楚这个问题,我们来研究一下从spout发出的消息的生命周期。下面是spout应该实现的接口列表:首先,Storm使用spout实例的nextTuple()方法从spout请求消息(元组)。收到请求后,spout使用open方法中提供的SpoutOutputCollector将一条或多条消息发送到其输出流。每次发送消息时,Spout都会为消息提供一个消息ID,用于标识消息。假设我们从kestrel队列中读取一条消息,Spout会使用kestrel队列为这条消息设置的ID作为这条消息的消息ID。向SpoutOutputCollector发送消息的格式如下:接下来,这些消息会被发送给bolts进行后续的业务处理,Storm会跟踪这个消息产生的新消息。当检测到消息派生的元组树已经处理完毕,Storm会调用Spout中的ack方法,将消息的messageID作为参数传入。同理,如果某条消息处理超时,会调用这条消息对应的Spout的fail方法,调用时会传入这条消息的messageID作为参数。注意:一条消息只会被发送它的spout调用为ack或fail。如果系统中的一个spout由多个任务运行,则该消息将仅由创建它的spout任务应答(确认或失败),而永远不会被其他spout任务应答。我们继续以从kestrel队列中读取消息为例来说明spout在高可靠性下需要做什么(假设spout的名字是KestrelSpout)。下面简单介绍一下kestrel消息队列:当KestrelSpout从kestrel队列中读取到一条消息时,就意味着它“打开”了队列中的一条消息。这意味着消息并没有真正从队列中删除,而是将消息设置为“待处理”状态,等待客户端的响应,直到收到响应后消息才会真正从队列中删除.处于“待定”状态的消息将不会被其他客户端看到。此外,如果客户端意外断开连接,则该客户端“打开”的所有消息都将重新排队。当一条消息被“打开”时,kestrel队列也会为该消息提供一个唯一的标识符。KestrelSpout使用此唯一标识符作为此元组的messageID。稍后调用ack或fail时,KestrelSpout会将ack或fail连同messageID一起发送到kestrel队列,kestrel会将消息真正从队列中删除或放回队列中。#p#4.4可靠相关API为了使用Storm提供的可靠处理特性,我们需要做两件事:每当在元组树中创建一个新节点时,我们需要显式通知Storm;,我们需要告诉Storm元组树的改变状态。通过以上两步,storm可以检测到一个tupletree何时处理完毕,并调用相应的ack或fail方法。Storm提供了简单明了的方法来完成以上两个步骤。将新节点添加到元组树中指定的节点称为锚定。锚定在我们发送消息的同时发生。为了更容易说明问题,我们以下面的代码为例。此示例的螺栓将包含整个句子的消息分解为一系列子消息,每个子消息包含一个单词。每条消息都以这种方式锚定:输入消息作为emit方法的第一个参数。因为词消息是锚定在输入消息上的,所以输入消息是spout发送的元组树的根节点。如果任何一个词消息处理失败,将重新发送派生元组树的spout消息。相反,让我们看看Storm在使用下面的方法发出消息时是如何处理的:如果以这种方式发送消息,消息将不会被锚定。如果此元组树中的消息处理失败,则不会重新发送派生此元组树的根消息。根据任务的容错级别,有时发送非锚定消息是合适的。输出消息可以锚定到一个或多个输入消息,这在进行连接或聚合时很有用。多锚消息处理失败将导致与其关联的多个spout消息被重发。多个锚点是通过在emit方法中指定多个输入消息来实现的:多个锚点会将锚定的消息添加到多个元组树中。注意:多次绑定可能会破坏传统的树结构,从而形成DAGs(有向无环图),如图2所示:图2由多个anchor组成的菱形结构Storm的实现可以像树一样处理DAG。锚指示如何将消息添加到指定的元组树。高可靠处理API的下一部分将向您描述当我们处理完元组树中的单个消息时我们应该做什么。这些是通过OutputCollector的ack和fail方法实现的。回过头来看SplitSentence的例子,可以发现,当所有的单词消息发送完毕后,代表句子的输入消息就会被应答(acked)。处理的每条消息都必须指示成功或失败(确认或失败)。Storm使用内存来跟踪每条消息的处理,如果处理完的消息没有被回复,内存迟早会被耗尽!>很多bolt都遵循一个特定的处理流程:读取消息,发送从中派生的子消息,并在执行结束时回复这条消息。一般的过滤器(filter)或者简单的处理函数都是这样的应用。Storm有一个BasicBolt接口,封装了上述过程。示例SplitSentence可以使用BasicBolt重写:这样代码比以前稍微简单一些,但功能是一样的。发送给BasicOutputCollector的消息会自动锚定到输入消息上,执行完成后会自动回复输入消息。在许多情况下,消息需要延迟确认,例如聚合或连接。只有从一组输入消息中获得结果后,所有先前的输入消息才会得到答复。并且大多数时候聚合和连接都是输出消息的多个锚点。然而,这些功能都不是IBasicBolt可以处理的。#p#4.5tupletreeStorm的高效实现系统中有一组特殊的任务叫做“ackers”,负责跟踪DAG(DirectedAcyclicGraph)中的每条消息。每当它发现DAG已被完全处理时,它就会向创建此根消息的spout任务发送一个信号。通过配置参数Config.TOPOLOGY_ACKERS可以设置拓扑中acker任务的并行度。acker任务的默认并行度为1,当系统中有大量消息时,应适当提高acker任务的并行度。为了理解Storm的可靠性处理机制,我们从研究消息的生命周期和元组树的管理入手。创建消息时(无论是在spout还是在bolt中),系统都会分配一个64位随机值作为消息的id。acker使用这些随机id来跟踪从spout消息派生的元组树。每条消息都知道它所在的元组树对应的根消息的id。每当bolt生成新消息时,元组树中根消息对应的messageId被复制到消息中。当此消息被确认时,它会将有关元组树中更改的信息发送给跟踪树的acker。比如他会告诉acker:这条消息已经被处理了,但是我又衍生出一些新的消息,请帮忙追踪。例如,假设消息D和E是从消息C派生的,这演示了当消息C被应答时元组树如何变化。因为在从树中删除C的同时将D和E添加到元组树中,所以不会过早地认为元组树已完全处理。让我们仔细看看Storm是如何跟踪元组树的。如前所述,系统中可以有任意数量的acker,那么它如何知道在创建或回复消息时应该通知哪个acker?系统使用hash算法根据spout消息的messageId来判断是哪个acker在跟踪这个消息派生的tupletree。因为每条消息都知道其对应的根消息的messageId,所以它知道应该与哪个acker通信。当一个spout发送消息时,它会通知对应的acker一个新的根消息已经生成,acker会创建一个新的tupletree。当acker发现这棵树处理完了,就通知对应的spout任务。如何跟踪元组?系统中有几万条消息,如果我们为spout发送的每一条消息都建一棵树,内存会很快耗尽。因此,必须采用不同的策略来跟踪每条消息。得益于新的跟踪算法,Storm只需要固定数量的内存(大约20字节)来跟踪一棵树。这个算法是Storm正确运行的核心,也是Storm***的突破口。acker任务持有spout消息id到一对值的映射。第一个值是spout的任务ID。通过这个id,acker知道消息处理完成后通知哪个spout任务。第二个值是一个64位的数字,我们称之为“ackval”,它是树中所有消息的随机id的异或结果。ackval代表整棵树的状态。不管树有多大,只需要这个固定大小的数字就可以跟踪整棵树。创建并回复消息时,将发送相同的消息ID进行XOR。每当acker发现一棵ackval为0的树时,它就知道这棵树已被完全处理。因为消息的随机ID是一个64bit的值,所以在处理树之前ackval被设置为0的概率很小。假设你每秒发送10,000条消息,从概率上讲,至少需要50,000,000年才有可能出现错误。即便如此,如果消息确实无法处理,只会导致数据丢失!4.6选择合适的可靠性级别Acker任务是轻量级的,因此在拓扑中不需要太多的Acker。您可以通过StormUI观察acker任务的吞吐量。如果看起来吞吐量不够,则意味着您需要添加额外的ackers。如果您不需要处理每条消息(您允许在处理过程中丢失一些信息),您可以关闭消息的可靠处理以获得更好的性能。关闭对消息的可靠处理意味着系统中的消息数量减半(每条消息都不需要确认)。另外,关闭消息的可靠处理可以减少消息的大小(不需要为每个元组记录它的根id),从而节省带宽。处理消息的可靠处理机制有3种方式:设置参数Config.TOPOLOGY_ACKERS为0,通过该方法,当Spout发送消息时,会立即调用其ack方法;第二种方式是Spout发送一条消息,不指定这条消息的messageID。当您需要关闭特定消息的可靠性时,可以使用此方法;***,如果不关心某个消息派生出的子消息的可靠性,那么这个消息派生出的子消息在发送时就不要锚定,即emit中不指定输入消息方法。由于这些后代消息未锚定在任何元组树中,因此它们的失败不会导致任何spout重新发送消息。4.7集群各级容错到现在为止,你已经了解了Storm的可靠性机制,知道了如何选择不同的可靠性级别来满足你的需要。下面我们来研究一下Storm是如何在各种情况下保证数据不丢失的。3.7.1Task-levelfailurebolt任务崩溃导致的消息没有得到回复。此时,acker中与这个bolt任务关联的所有消息都会超时失败,并调用相应spout的fail方法。acker任务失败。如果acker任务本身失败,它在失败之前持有的所有消息都将因超时而失败。Spout的fail方法将被调用。Spout任务失败。在这种情况下,连接到Spout任务的外部设备(例如MQ)负责消息的完整性。例如,当客户端出现异常时,kestrel队列会将所有处于pending状态的消息放回队列中。4.7.2任务槽(slot)失败worker失败。每个worker包含多个bolt(或spout)任务。主管负责监督这些任务。当一个worker出现故障时,supervisor会尝试在本地重启它。主管失败了。supervisor是无状态的,所以supervisor失效不会影响当前运行的任务,只要及时重启即可。supervisor不自举,需要外部监控及时重启。光轮失败。Nimbus是无状态的,所以nimbus失效不会影响当前正在运行的任务(当nimbus失效时,无法提交新的任务),只要及时重启即可。nimbus不自举,需要外部监控才能及时重启。4.7.3.Clusternode(machine)failure风暴集群中的节点故障。此时nimbus会将本机上所有正在运行的任务转移到其他可用的机器上。zookeeper集群中的节点故障。Zookeeper保证只有不到一半的机器还能正常运行,故障机器能及时修复。4.8小结本章介绍了storm集群如何实现可靠的数据处理。借助创新的元组树跟踪技术,Storm通过数据响应机制高效保证数据不丢失。除nimbus外,storm集群中没有单点,任何节点都可以发生故障而不会丢失数据。nimbus被设计成无状态的,只要能及时重启,就不会影响正在运行的任务。原文链接:http://blog.linezing.com/2013/01/storm%E5%85%A5%E9%97%A8%E6%95%99%E7%A8%8B-%E7%AC%AC%E5%9B%9B%E7
