当前位置: 首页 > 后端技术 > Java

如何远程调试自定义开发的Flume应用

时间:2023-04-02 10:09:14 Java

1.前言Flume是最流行的大数据采集组件之一。具有分布式/高可靠/高可用等优点,但与Flink/Spark/Kafka等大数据组件相比,对本地调试的支持度不高。如果不掌握Flume的远程调试要领,就只能不停地记录、部署、记录、部署这种低效的工作,对程序员来说无异于一种折磨。那么今天就和大家一起探讨一下Flume的远程调试方法。2.环境准备Flink官网下载上传服务器并解压。开发自定义Source,这里做一个简单的读取mysql表数据的demo,部分代码如下:packageorg.bigwinner.flume.sources;importorg.apache.flume.Context;importorg.apache.flume.Event;导入org.apache.flume.EventDeliveryException;导入org.apache.flume.PollableSource;导入org.apache.flume.conf.Configurable;导入org.apache.flume.event.EventBuilder;导入org.apache.flume.source。AbstractSource;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importjava.sql.*;/***@author:ITBigLion*@date:2021/8/139:11PM*@version:1.0。0*@description:CustomSource--从MySQL表读取数据*/publicclassMysqlSourceextendsAbstractSourceimplementsPollableSource,Configurable{privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(MysqlSource.class);私有字符串mysqlUrl;私有字符串mysqlUser私有字符串mysqlPassword;私有字符串mysqlTable;私有字符串mysqlDriver;私有连接conn=null;公共状态过程()抛出EventDeliveryException{Stringsql="select*from"+mysqlTable;尝试{PreparedStatement语句=conn.prepareStatement(sql);结果集resultSet=statement.executeQuery();while(resultSet.next()){字符串id=resultSet.getString(1);字符串uuid=resultSet.getString(2);Stringiccid=resultSet.getString(3);byte[]eventBytes=newStringBuilder().append(id).append("--").append(uuid).append("--").append(iccid).toString().getBytes();事件事件=EventBuilder.withBody(eventBytes);getChannelProcessor().processEvent(事件);}}catch(SQLExceptionthrowables){throwables.printStackTrace();}返回Status.READY;}publiclonggetBackOffSleepIncrement(){返回0;}publiclonggetMaxBackOffSleepInterval(){返回0;}@Override/**Flume生命周期开始,可以做一些初始化工作*/publicvoidstart(){LOGGER.info("Mysqlsourcestart...");尝试{Class.forName(mysqlDriver);conn=DriverManager.getConnection(mysqlUrl,mysqlUser,mysqlPassword);}catch(ClassNotFoundExceptione){LOGGER.error("驱动类未找到!");}catch(SQLExceptionthrowables){LOGGER.error("gettheconnectionerror:{}",throwables);}}@Override/**Flume的生命周期结束,可以在结束前做一些保存等工作*/publicvoidstop(){LOGGER.info("Mysqlsourcestop...");if(conn!=null){try{conn.close();}catch(SQLExceptionthrowables){LOGGER.error("连接关闭异常:{}",throwables);}}super.stop();}/**Flume配置文件读取方法*/publicvoidconfigure(Contextcontext){mysqlUrl=context.getString("mysql.url","");mysqlUser=context.getString("mysql.user","");mysqlPassword=context.getString("mysql.password","");mysqlTable=context.getString("mysql.table","");LOGGER.info("mysql_driver:{}-->mysql_url:{}-->mysql_user:{}-->mysql_password:{}-->mysql_table:{}",mysqlDriver,mysqlUrl,mysqlUser,mysqlPassword,mysqlTable);}}编辑flumeagent配置文件,上传到flumeconf目录下a1.sources=s1a1.sinks=k1a1.channels=c1################################源################################自定义MySQL源类a1.sources.s1.type=org.bigwinner.flume.sources.MysqlSourcea1.sources.s1.mysql.driver=com.mysql.jdbc.Drivera1.sources.s1.mysql.url=jdbc:mysql://lsl001:3306/redis_tempa1.sources.s1.mysql.user=superboya1.sources.s1.mysql.password=iamsuperboya1.sources.s1.mysql.table=redis_temp###############################Channel################################配置文件通道数据管道a1.channels.c1.type=file#最小要求空间a1.channels.c1.minimumRequiredSpace=3145728#最大文件大小a1.channels.c1.maxFileSize=2146435071#flume事件指针检查点备份目录a1.channels.c1.checkpointDir=/opt/soft/flume/flume/data/checkpoint#file-channelpairevent备份到本地文件目录a1.channels.c1.dataDirs=/opt/soft/flume/data/file-channel-mysql/data#文件管道中的数据容量,单位a1.channels的个数。c1.capacity=200#文件管道中的事务数据容量,单位个数a1.channels.c1.transactionCapacity=100#Checkpoint备份flume时间指针间隔a1.channels.c1.checkpointInterval=60000###############################下沉################################本次测试的重点是Source,所以sink可以使用null,表示不会输出到任何地方chengchengkuan依赖包,包括所有依赖并上传到flume3的lib目录.环境配置服务器环境配置修改flume-ng启动命令文件:vim/opt/soft/flume/bin/flume-ng,修改如下Configuration://portdefault8000JAVA_OPTS="-Xmx500m-Xdebug-Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=y"如果正在使用flume-env.sh文件,需要注释掉flume-env.sh的JAVA_OPTS配置:vim/opt/soft/flume/conf/flume-env.sh,不用的话可以忽略。本地IDE(本例基于Idea)环境配置编辑配置界面,添加remote配置remote4.验证并启动flume代理,结果如下图,说明配置没有问题:start调试程序查看是否正常调试:从上面我们看到程序正确进入了断点,并且查询到了mysql记录。五、总结以上就是今天给大家分享的Flume远程调试方法。不会的请赶快练习,提高效率,珍惜自己!案例代码参考:flume_demo