当前位置: 首页 > Web前端 > HTML

日志服务FlinkConnector《支持Exactly Once》

时间:2023-04-02 22:12:03 HTML

摘要:Flink日志连接器是阿里云日志服务推出的一款连接Flink的工具。它由消费者和生产者两部分组成。消费者用于从日志服务中读取数据。支持exactlyonce语义。生产者用于向日志服务写入数据。Connector隐藏了一些日志服务的概念,比如分片的拆分和合并。用户在使用时只需要关注自己的业务逻辑即可。阿里云日志服务是实时数据的一站式服务。用户只需要专注于分析。在此过程中,数据采集、对接各种存储计算、数据索引和查询等琐碎的工作都可以交给日志服务。日志服务最基本的功能是LogHub,支持实时的数据采集和消费。实时消费家族除了SparkStreaming、Storm、StreamCompute(Blink除外)之外,又增加了Flink。FlinkConnectorFlinklogconnector是阿里云日志服务提供的一个连接Flink的工具。它包括消费者(Consumer)和生产者(Producer)两部分。Consumer用于从日志服务中读取数据,支持exactlyonce语义,支持shard负载均衡。生产者用于向日志服务写入数据。使用connector时需要在项目中添加maven依赖:org.apache.flinkflink-streaming-java_2.111.3.2com.aliyun.openservicesflink-log-connector0.1.3com.google.protobufprotobuf-java2.5.0com.aliyun.openservicesaliyun-log0.6.10com.aliyun.openserviceslog-loghub-producer0.1.8代码:请参考日志服务文档Github的使用。正确创建Logstore如果使用子账号访问,请确认LogStore的RAM策略设置正确。参见授权RAM子用户访问日志服务资源。1.LogConsumer在Connector中,FlinkLogConsumer类提供了订阅日志服务中某个LogStore的能力,实现了exactlyonce语义。使用时,用户无需关心LogStore中分片数量的变化,消费者自动感知。Flink中的每个子任务负责消费LogStore中的部分分片。如果LogStore中的分片被拆分或合并,子任务消耗的分片也会随之变化。1.1配置启动参数PropertiesconfigProps=newProperties();//设置访问日志服务的域名configProps.put(ConfigConstants.LOG_ENDPOINT,"cn-hangzhou.log.aliyuncs.com");//设置访问权限akconfigProps.put(ConfigConstants.LOG_ACCESSSKEYID,"");configProps.put(ConfigConstants.LOG_ACCESSKEY,"");//设置日志服务项目configProps.put(ConfigConstants.LOG_PROJECT,"ali-cn-hangzhou-sls-admin");//设置服务的日志LogStoreconfigProps.put(ConfigConstants.LOG_LOGSTORE,"sls_consumergroup_log");//设置消费者日志服务的起始位置configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,Consts.LOG_END_CURSOR);//设置日志服务的消息反序列化方法RawLogGroupListDeserializerdeserializer=newRawLogGroupListDeserializer();finalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();DataStreamlogTestStream=env.addSource(newFlinkLogConsumer(deserializer,configProps));上面是一个简单的消费者例子,我们使用java。util.Properties作为配置工具使用,所有的Consumer配置都可以在ConfigConstants中找到。注意flinkstream的subtask数量和日志服务LogStore中的shard数量是独立的。如果分片数大于子任务数,则每个子任务不会重复消费多个分片。如果小于这个值,一些子任务将被闲置消耗,等待新的分片生成。1.2设置消费起始位置Flinklogconsumer支持设置shard消费起始位置。通过设置属性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,您可以自定义消费从分片的开始和结束或特定时间开始。具体取值如下:consts.LOG_BEGIN_CURSOR:表示从分片头部开始消费,即从分片中最旧的数据开始消费。consts.LOG_END_CURSOR:表示从分片末尾开始消费,即分片中最新的数据。UnixTimestamp:一串整数值,表示从1970-01-01到现在的秒数,表示在分片中消费这个时间点之后的数据。三种取值示例如下:configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,Consts.LOG_BEGIN_CURSOR);configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,Consts.LOG_END_CURSOR);configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,"1512439000");1.3监控:消费进度(可选)Flink日志消费者支持设置消费进度监控。所谓消费进度,就是获取每个分片的实时消费位置。该位置由时间戳表示。详细概念请参考文档消费组-查看状态,【消费组-监控告警】(https://help.aliyun.com/docum...configProps.put(ConfigConstants.LOG_CONSUMERGROUP,"你的消费组名称");注意上面的代码是可选的,如果设置了,consumer会先创建一个consumerGroup,如果已经存在,什么都不做,consumer中的快照会自动同步到日志服务的consumerGroup中,用户可以查看consumer的消费进度在日志服务的控制台。1.4容灾和exactlyonce语义支持当开启Flink的checkpointing功能后,Flinklogconsumer会周期性保存每个shard的消费进度。当作业失败时,Flink会恢复日志消费者,保存最新的Checkpoint启动消费。写checkpoint的周期定义了发生故障时回溯多少数据,即重新消费。代码如下:finalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();//Enableflinkexactlyoncesemanticsenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//每5s保存一次checkpointenv.enableCheckpointing(5000);更多关于FlinkCheckpoints的内容可以参考Flink官方文档Checkpoints。1.5补充材料:关联API和权限设置Flink日志消费者使用的阿里云日志服务接口如下:GetCursorOrData用于从shard中拉取数据。请注意,频繁调用该接口可能会导致数据超出日志服务的分片配额。您可以使用ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH来控制接口调用的时间间隔和每次调用拉取的日志条数。shard的配额可以参考文章【shard介绍】(https://help.aliyun.com/document_detail/28976.html).configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS,"100");configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH,"100");ListShards用于获取logStore中的所有分片列表,获取分片状态等,如果你的分片经常分裂合并,可以通过调整接口的调用周期及时发现分片的变化。//设置为每30秒调用一次ListShardsconfigProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS,"30000");CreateConsumerGroup只有设置了消费进度监控才会调用这个接口。作用是创建一个consumerGroup,用于同步checkpoints。ConsumerGroupUpdateCheckPoint该接口允许用户将Flink的快照同步到日志服务的consumerGroup。子用户需要授权以下RAM策略才能使用Flink日志消费者:日志生产者FlinkLogProducer用于向阿里云日志服务写入数据。注意,生产者只支持Flinkat-least-once语义,这意味着在作业失败时,写入日志服务的数据可能会重复,但不会丢失。使用示例如下,我们将模拟生成的字符串写入日志服务://将数据序列化为日志服务类的数据格式SimpleLogSerializerimplementsLogSerializationSchema{publicRawLogGroupserialize(Stringelement){RawLogGrouprlg=新的原始日志组();RawLogrl=newRawLog();rl.setTime((int)(System.currentTimeMillis()/1000));rl.addContent("消息",元素);rlg.addLog(rl);返回rlg;}}publicclassProducerSample{publicstaticStringsEndpoint="cn-hangzhou.log.aliyuncs.com";公共静态字符串sAccessKeyId="";公共静态字符串sAccessKey="";publicstaticStringsProject="ali-cn-hangzhou-sls-admin";publicstaticStringsLogstore="test-flink-producer";privatestaticfinalLoggerLOG=LoggerFactory.getLogger(ConsumerSample.class);publicstaticvoidmain(String[]args)throwsException{finalParameterToolparams=ParameterTool.fromArgs(args);最终StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(参数);env.setParallelism(3);DataStreamsimpleStringStream=env.addSource(newEventsGenerator());属性configProps=newProperties();//设置访问日志服务的域名configProps.put(ConfigConstants.LOG_ENDPOINT,sEndpoint);//设置访问日志服务的akconfigProps.put(ConfigConstants.LOG_ACCESSSKEYID,sAccessKeyId);configProps.put(ConfigConstants.LOG_ACCESSKEY,sAccessKey);//设置写入日志的日志服务项目configProps.put(ConfigConstants.LOG_PROJECT,sProject);//设置写日志的日志服务logStoreconfigProps.put(ConfigConstants.LOG_LOGSTORE,sLogstore);FlinkLogProducerlogProducer=newFlinkLogProducer(newSimpleLogSerializer(),configProps);simpleStringStream.addSink(logProducer);env.execute("flink日志生产者");}//模拟日志生产public静态类EventsGenerator实现SourceFunction{privatebooleanrunning=true;@Overridepublicvoidrun(SourceContextctx)throwsException{longseq=0;while(running){Thread.sleep(10);((seq++)+"-"+RandomStringUtils.randomAlphabetic(12));}}@Overridepublicvoidcancel(){running=false;}}}2.1初始化Producer初始化主要需要做两件事:初始化配置参数Properties,这一步和Consumer类似。生产者有一些自定义参数。一般来说,可以使用默认值。特殊场景可以考虑自定义://用于发送数据的io线程数,默认为8ConfigConstants.LOG_SENDER_IO_THREAD_COUNT//这个值定义了日志数据缓存和发送的时间,默认为3000ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS//缓存发送包的日志条数,默认4096ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE//缓存发送包的大小,默认3MbConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE//作业可以使用的内存总大小,默认为100MbConfigConstants.LOG_MEM_POOL_BYTES以上参数不是必选参数,用户可以不设置直接使用默认值重载LogSerializationSchema来定义将数据序列化到RawLogGroup中的方法。RawLogGroup是日志的集合。各字段含义请参考文档【日志数据模型】(https://help.aliyun.com/document_detail/29054.html)。如果用户需要使用日志服务的shardHashKey功能,指定要写入某个分片的数据,可以使用LogPartitioner生成数据的hashKey。使用示例如下:FlinkLogProducerlogProducer=newFlinkLogProducer(newSimpleLogSerializer(),configProps);logProducer.setCustomPartitioner(newLogPartitioner(){//生成32位哈希值publicStringgetHashKey(字符串元素){try{MessageDigestmd=MessageDigest.getInstance("MD5");md.update(element.getBytes());Stringhash=newBigInteger(1,md.digest()).toString(16);而(hash.length()<32)hash=“0”+hash;returnhash;}catch(nosuchalgorithmexceptione){}|请注意,LogPartitioner是可选的。如果不设置,数据会随机写入某个shard。2.2权限设置:RAMPolicyProducer依赖日志服务API写入数据,如下:log:PostLogStoreLogslog:ListShardsRAM子用户使用Producer时,需要对以上两个API进行授权: