当前位置: 首页 > Linux

日志系统EFK后续:monitor告警监控

时间:2023-04-06 22:57:24 Linux

上一篇日志系统EFK后续:fluent-bit服务独立完成fluent-bit收集,fluentd转发到kafka,再到elasticsearch的全过程。后面会提到服务器的日志要同步到fluent-bit所在的环境。这个可以通过rsync做增量同步来实现,具体就不记录了。现在主要记录kafka中告警日志监控和发送消息通知的过程。fluentd配置过滤器错误级别日志@typerecord_transformerenable_rubytag${record["fbKey"].split('/')[3]}remove_keysfbKey@typecopy@typerewrite_tag_filterkeylevelpattern/^ERROR$/tagerror.fb.dapeng@typekafka_bufferedbrokerskafka服务器ip:9092topic_keyefkbuffer_type文件buffer_path/tmp/bufferflush_interval5sdefault_topicefkoutput_data_typejsoncompression_codecgzipmax_send_retries3required_acks-1discard_kafka_delivery_failedtrue@typekafka_bufferedbrokerskafka服务器ip:9092topic_keyefk_errorbuffer_typefilebuffer_path/tmp/buffer_errorflush_interval5sdefault_topicefk_erroroutput_data_typejsoncompression_codecgzipmax_send_retries3required_acks-1discard_kafka_delivery_failedtrue多copy:将事件每个输出,相当于matchrewrite_tag_filter:根据规则为匹配的事件重写tag,发送一个新的tag的消息,从上到下重新处理一遍,所以注意重写的tag不能和当前匹配匹配,否则会陷入死循环...这里匹配tag为fb.dapeng的消息:匹配level为ERROR的消息并将tag重写为error.fb.dapeng,直接发送消息到kafka的topic:efk重写了在1中标记为error.fb。对于大鹏消息,将消息发送到kafka的主题:efk_error,这样elasticsearch只消费kafka的主题:efk,包含所有级别的日志信息,而对于告警监控monitor,只消费kafka的主题:efk_error,都是ERROR级别log注意:需要安装rewrite_tag_filter插件,修改fluentd的Dockerfile并重建镜像FROMfluent/fluentd:v1.2#Addesplugin,kafkaplugin,rewrite_tag_filterpluginRUNfluent-geminstallfluent-plugin-elasticsearchRUNfluent-gem安装fluent-plugin-kafkaRUNfluent-gem安装fluent-plugin-rewrite-tag-filterCMDexecfluentd-c/fluentd/etc/${FLUENTD_CONF}-p/fluentd/plugins$FLUENTD_OPTmonitor项目消费kafka消息kafka消费处理逻辑如下:首先解析消息json,取出sessionTid(标记服务调用链,同一个调用操作,sessionTid相同),对于同一个sessionTid,只记录第一个报错信息(sessionTid缓存10s,然后清除,一般同一个调用报错不会间隔10s)钉钉群消息通知@Component@Slf4jpublicclassEfkAlarmConsumer{@ResourceEfkAlarmAppefkAlarmApp;privatefinalMapcache=newConcurrentHashMap<>();私人最终定时器定时器=新定时器();@KafkaListener(topics={"efk_error"},groupId="efk_error_consumer")publicvoidprocessEfkAlarm(ConsumerRecordrecord){Stringjson=record.value();日志l=resolveLog(json);if(null==l){log.error("非法信息:{}",json);}else{log.debug("接收消息日志:{}",l);进程日志(l);}}privatevoidprocessLog(Logl){finalStringtid=l.getSessionTid();Longt=cache.get(tid);if(t==null){cache.put(tid,System.currentTimeMillis());//10s之后tid的数据清除timer.schedule(newTimerTask(){@Overridepublicvoidrun(){cache.remove(tid);}},10000);StringcurrIndex=String.format("dapeng_log_index-%s",newSimpleDateFormat("yyyy.MM.dd").format(newDate()));//发钉钉...Stringtext=String.format("%s",l.getMessage());Stringtitle=String.format("[%s]%s:%s[%s]",efkAlarmApp.getDingTag(),l.getLogtime(),l.getTag(),l.getHostname());字符串url=String.format(AppConst.ESHEAD_LINK_URI,currIndex,l.getSessionTid());DingService.send(efkAlarmApp.getDingWebHook(),msg(text,title,url));}}privateLogresolveLog(Stringjson){Logl=null;尝试{l=JSON.parseObject(json,Log.class);}catch(Throwablee){log.error("消息转换异常",e);}返回l;}privateStringmsg(Stringtext,Stringtitle,Stringurl){returnString.format("{\n"+"\"msgtype\":\"link\",\n"+"\"link\":{\n"+"\"text\":\"%s\",\n"+"\"title\":\"%s\",\n"+"\"picUrl\":\"\",\n"+"\"messageUrl\":\"%s\"\n"+"}\n"+"}",text,title,url);}}消息链接跳转链接格式ESHEAD_LINK_URI:publicclassAppConst{publicstaticfinalStringES_BASE_URI="elasticsearch服务器访问地址";publicstaticfinalStringESHEAD_BASE_URI="elasticsearch-head网页访问地址";publicstaticfinalStringESHEAD_LINK_URI=ESHEAD_BASE_URI+"?curr_index=%s&sessionTid=%s&base_uri="+ES_BASE_URI="+ES_BASE_URI;}修改elasticsearch-head项目中的index.html