当前位置: 首页 > 科技观察

如何使用KafkaConnect创建一个开源数据管道来处理实时数据?

时间:2023-03-16 00:13:03 科技观察

【.com速译】KafkaConnect是一个特别强大的开源数据流工具;有了它,将Kafka与其他数据技术一起使用非常容易。作为一种分布式技术,KafkaConnect提供了特别高的可用性和独立于Kafka集群的弹性扩展。KafkaConnect使用source或sink连接器将数据发送到Kafka主题和从Kafka主题发送出去,并且无需代码即可与各种非Kafka技术集成。图1.可靠的开源Kafka连接器可用于许多流行的数据技术,您也有机会编写自己的连接器。本文描述了一个真实世界的数据用例,说明如何使用KafkaConnect将来自Kafka的实时流数据与Elasticsearch(以实现为Kafka记录编制索引的可扩展搜索)和Kibana(以可视化这些结果)相集成。图2的灵感来自CDCCOVID-19数据跟踪器,用于展示Kafka和KafkaConnect优势的用例。基于Kafka的跟踪器使用多种协议从多个位置以多种格式收集实时COVID-19检测数据,并将这些事件处理成易于使用的可视化。追踪器还具有必要的数据治理机制,以确保结果快速到达且值得信赖。我着手寻找一个同样复杂和引人注目的用例——但理想情况下,它不像COVID-19那样令人担忧。最终,我发现了一个有趣的领域:moontide,包括公开可用的流式RESTAPI和简单JSON格式的丰富数据。农历潮汐数据潮汐遵循农历日,以24小时50分钟为周期;在此期间,地球完全旋转到轨道卫星下方的同一点。每个农历日都有两次高潮和两次低潮,这是由月球引力引起的:图3.来自美国国家海洋和大气管理局美国国家海洋和大气管理局(NOAA)提供了一个可以从潮汐中轻松访问的RESTAPI世界各地的车站。获取详细的传感器数据。图4例如,以下REST调用指定潮汐站ID、数据类型(我选择海平面)和数据(平均海平面),并请求以公制单位表示的最新结果:https://api.tidesandcurrents。noaa.gov/api/prod/datagetter?date=latest&station=8724580&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json此调用返回一个JSON结果,其中包含潮汐站的经度、纬度、时间和水位。注意,你必须记住你调用的是什么,才能知道返回结果的数据类型、数据和单位!{"metadata":{"id":"8724580","name":"KeyWest","lat":"24.5508","lon":"-81.8081"},"数据":[{"t":"2020-09-2404:18","v":"0.597","s":"0.005""f":"1,0,0,0","q":"p"}]}启动数据管道(使用REST源连接器)要开始创建KafkaConnect流式数据管道,我们首先要准备Kafka集群和KafkaConnect集群。图5接下来,我们介绍一个REST连接器,例如这个可用的开源连接器。我们会将其部署到AWSS3存储桶(如果需要,请按照这些说明进行操作)。然后我们将要求KafkaConnect集群使用S3存储桶,同步它以在集群中可见,配置连接器,最后让它运行。这种“BYOC”(自带连接器)方法可确保您有无数种方法来找到满足您特定要求的连接器。图6以下示例演示了使用“curl”命令配置完全开源的KafkaConnect部署以使用RESTAPI。请注意,您需要更改URL、名称和密码以匹配您自己的部分:curlhttps://connectorClusterIP:8083/connectors-k-uname:password-XPOST-H'Content-Type:application/json'-d'{“名称”:“source_rest_tide_1”,“配置”:{“key.converter”:“org.apache.kafka.connect.storage.StringConverter”,“value.converter”:“org.apache.kafka.connect.storage。StringConverter","connector.class":"com.tm.kafka.connect.rest.RestSourceConnector","tasks.max":"1","re??st.source.poll.interval.ms":"600000","rest.source.method":"GET","re??st.source.url":"https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8454000&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json","re??st.source.headers":"Content-Type:application/json,Accept:application/json","rest.source.topic.selector":"com.tm.kafka.connect.rest.selector.SimpleTopicSelector","re??st.source.destination.topics":"tides-topic"}}这段代码创建的连接器任务开始用RESTAPI每隔10分钟轮询一次,并将结果写入“潮汐主题”Kafka主题通过随机选择五个潮汐传感器以这种方式收集数据,潮汐数据现在通过五种配置和五个连接填充Tide图7结束管道(使用Elasticsearchsinkconnector)为了把这个潮汐数据放在某个地方,我们将在管道的末端引入Elasticsearch集群和Kibana。我们将配置一个开源的Elasticsearchsinkconnector发送到ElasticsearchData.Figure8Thefollowingexampleconfigurationusesthesinkname,class,Elasticsearchindex,andourKafkatopic.如果索引尚不存在,则会使用默认映射创建一个。curlhttps://connectorClusterIP:8083/connectors-k-uname:password-XPOST-H'Content-Type:application/json'-d'{"name":"elastic-sink-tides","config":{"connector.class":"com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector","tasks.max":3,"topics":"tides","connect.elastic.hosts":"ip","连接。elastic.port":9201,"connect.elastic.kcql":"INSERTINTOtides-indexSELECT*FROMtides-topic","connect.elastic.use.http.username":"elasticName","connect.elastic.use.http.password":"elasticPassword"}}'管道现在可以运行了。但是,由于默认的索引映射,进入潮汐索引的所有潮汐数据都是一个字符串。图9需要自定义映射才能准确绘制我们的时间序列数据。我们将为下面的潮汐指数创建此自定义映射,使用自定义日期的JSON“t”字段,“v”作为双精度值,“name”作为表示聚合的键。curl-uelasticName:elasticPassword”elasticURL:9201/tides-index”-XPUT-H'Content-Type:application/json'-d'{"mappings":{"properties":{"data":{"properties":{"t":{"type":"date","format":"yyyy-MM-ddHH:mm"},"v":{"type":"double"},"f":{"type":"text"},"q":{"type":"text"},"s":{"type":"text"}}},"metadata":{"properties":{"id":{"type":"text"},"lat":{"type":"text"},"long":{"type":"text"},"name":{"type":"关键字"}}}}}}'每次更改Elasticsearch索引映射时,通常需要Elasticsearch进行“重新索引”(删除索引并重新索引所有数据)。数据可以从现有的Kafka接收器连接重播,就像我们在这个用例中所做的那样,或者使用Elasticsearch重新索引操作来获取数据。使用Kibana可视化数据为了可视化潮汐数据,我们首先使用Kibana创建一个索引模式,将“t”配置为时间过滤器字段。然后,我们将创建一个可视化,选择折线图类型。最后,我们将配置图表设置,使y轴显示30分钟内的平均潮位,x轴显示随时间变化的数据。结果是下图显示了管道采集数据的五个样本潮汐站的潮汐变化:图10结果我们可以从可视化中清楚地看到潮汐的周期性,每个农历日有两次高潮。图11中更令人惊讶的是,每个全球潮汐站的高潮和低潮之间的间隔并不相同。这不仅受到月球的影响,还受到太阳、当地地理、天气和气候变化的影响。这个示例KafkaConnect管道利用Kafka、Elasticsearch和Kibana来帮助展示可视化的好处:它们通常可以揭示原始数据无法揭示的信息!原标题:如何使用KafkaConnect创建处理实时数据的开源数据管道,作者:PaulBrebner【翻译稿件,合作站点转载请注明原译者和出处.com】