Flink1.12SQL实时写数据到Redis本文转载请联系肌农公众号。插件名称:flink-connector-redis插件地址:https://github.com/jeff-zou/flink-connector-redis.git项目介绍基于bahir-flink二次开发,支持SQL到直接定义写入redis,用户通过DDL指定需要保存的字段。使用方法:在命令行执行mvnpackage-DskipTests=true后,将生成的包flink-connector-redis_2.12-1.11.1.jar导入到flinklib中,无需其他设置。重构介绍:与上一版本相比,简化了参数设置,思路更清晰。之前版本中字段的值会根据主键等条件自动生成。这需要用户了解相关规则,有一定的学习成本,容易埋点,重构字段的值由用户在DDL中指定,如下:'key-column'='username','value-column'='passport','//直接指定字段名取消了主键的限制,使用起来更方便。如果有多个字段组合成一个键或值,用户需要使用DML中的concat_ws进行组装。它不再是一个在后台用不可见字符组装它的插件。使用示例:1.SQL方法示例代码路径:src/test/java/org.apache.flink.streaming.connectors.redis.table.SQLInsertTest.javaset示例,相当于redis命令:settesttest11StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettingssenvironmentSettings=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env,environmentSettings);Stringddl="createtablesink_redis(usernameVARCHAR,passport'connect'orwith'('connect,"+"'host'='10.11.80.147','port'='7001','redis-mode'='single','password'='******','key-column'='username','value-column'='passport','command'='set')";tEnv.executeSql(ddl);Stringsql="insertintosink_redisselect*from(values('test','test11'))";TableResulttableResult=tEnv.executeSql(sql);tableResult.getJobClient().get().getJobExecutionResult().get();2.DataStream示例代码路径:src/test/java/org.apache.flink。流媒体.connecttors.redis.datastream.DataStreamInsertTest.javahset示例,相对于redis指令:hsettommath150Configurationconfiguration=newConfiguration();configuration.setString(RedisOptions.KEY_COLUMN,"name");configuration.setString(RedisOptions.FIELD_COLUMN,"subject");//对应hash的字段、sortedset的scoreconfiguration.setString(RedisOptions.VALUE_COLUMN,"score");configuration.setString(REDIS_MODE,REDIS_CLUSTER);configuration.setString(REDIS_COMMAND,RedisCommand.HSET.name());RedisMapperredisMapper=RedisHandlerServices.findRedisHandler(RedisMapperHandler.class,configuration.toMap()).createRedisMapper(配置);StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();GenericRowDatagenericRowData=newGenericRowData(3);genericRowData.setField(0,"tom");genericRowData.setField(1)"math");genericRowData.setField(2,"150");DataStream
