当前位置: 首页 > 后端技术 > Java

PulsarTieredStorage--S3Demo

时间:2023-04-02 02:08:46 Java

本文将首先介绍Pulsar的分层存储。然后通过一个例子,演示如何在分层存储中使用AmazonS3作为二级存储。简介Pulsar中的每个主题都由一个有序的Segment列表组成。Pulsar只会在其中写入最后一段。除了最后一段,之前的所有段都已经封存,不能再往里面添加数据。这是Pulsar的基于段的存储结构。Pulsar基于段的存储架构支持在一个主题的空间大小内存储整个存储集群。但是,随着要保存的Partition的数据逐渐增多,存储变得昂贵。解决此问题的一种策略是使用分层存储。使用分层存储的一种场景是用户希望将一个比较大的topic的数据保存比较长的一段时间。比如一个topic里面包含了所有客户的点击记录,我们可以用这个topic里面的数据来训练推荐系统。如果保留这个主题的数据,当训练算法改变时,我们可以重新使用这些历史数据来训练新的推荐系统。Pulsar的分层存储特性允许将较旧的积压数据卸载到二级存储进行长期存储,从而释放BookKeeper中的空间并降低存储成本。使用分层存储,我们可以自动将一个Topic中的旧数据段(Segment)从BookKeeper中移动到更便宜的二级存储中。对于客户来说,整个过程是透明和无缝的。当Offload被手动或自动触发时,Broker会把主存中的Segment一个一个地移动到副存中。Pulsar目前支持使用S3、Google-Cloud-Storage作为二级存储。用户可以灵活配置在BookKeeper(一级存储)中保留的Topic的大小,以及数据移动到二级存储后多长时间(默认4小时)从BookKeeper中删除。我们将在本文中以S3为例来演示如何使用Pulsar的这一特性。OperationsWithS3本例使用的是Pulsar-2.1.1版本,主要包括3个步骤:在AmazonS3(https://s3.console.aws.amazon...)中创建一个bucket;下载Pulsar包,为Pulsar的Broker配置Offload,以Standalone模式启动Pulsar;使用Pulsar的producer生成数据,触发Offload,验证。1、在S3中创建Bucket登录AWS控制台,选择S3服务:创建一个bucket,点击“Createbucket”,填写Bucket的名称,然后点击next,点击确认。经过以上操作,可以看到已经创建了一个新的Bucket。确认在这台机器上配置了aws访问。2、下载Pulsar包和配置下载最新的PulsarBinary文件(apache-pulsar-x.x.x-bin.tar.gz),解压,准备修改配置文件conf/standalone.conf。修改配置文件conf/standalone.conf中的Offload选项,设置第一步创建的bucket:managedLedgerOffloadDriver=S3s3ManagedLedgerOffloadBucket=offload-test-awss3ManagedLedgerOffloadRegion=us-east-1修改每个Segment大小配置,使得每个细分更小,更容易产生新的细分。在终端中以standalone模式启动Pulsar:运行命令bin/pulsarstandalone3.在Pulsar中生成数据,触发Offload,在终端中验证是否启动了一个consumer,确保要生成的数据不会被丢弃,因为有没有消费者:bin/pulsar-clientconsume-s“my-sub-name”my-topic-for-offload打开一个新的终端窗口,两次运行以下命令生成2000条消息,并确保其中有两个段主题,以便第一个A段可以移动到S3。bin/pulsar-clientproducemy-topic-for-offload--messages"hellopulsarthisisthecontentforeachmessage"-n1000使用命令行手动触发Offload:bin/pulsar-admintopicsoffload--size-threshold10Kpublic/default/my-topic-for-offload使用命令等待卸载成功:bin/pulsar-admintopicsoffload-statuspublic/default/my-topic-for-offload验证第一个Segment已经被movedtoS3:在s3的终端可以看到ledger-31已经入库了。使用命令行也可以看到这个topic有两个segment,第一个segment的offloadedstatus--ledger-31为true。以上步骤就是使用S3作为二级存储的整个过程。如果想了解更多关于分层存储的知识,请参考Pulsar官网:http://pulsar.apache.org/docs...http://pulsar.apache.org/docs...