当前位置: 首页 > 科技观察

Zabbix与ELK集成实现安全日志数据的实时监控与告警

时间:2023-03-12 06:34:22 科技观察

1ELK与ZABBIX有什么关系?ELK你应该不陌生,zabbix你应该不陌生,所以如果你把ELK和zabbix放在一起,你可能会有什么疑问?把这两个放在一起的目的是什么?详细听我说。ELK是一个日志收集套件。它实际上由三个软件组成:Elasticsearch、Logstash和Kibana。通过ELK,您可以收集系统日志和网站。日志、应用系统日志等日志数据也可以进行过滤清洗,然后集中存储,供实时检索分析。这是ELK的基本功能。但是有时候,我们希望在收集日志的时候,能够及时提取出日志中的异常信息(warning、error、failure等),因为日志中的异常信息意味着操作系统和应用程序可能出现了故障。如果能够及时将日志中的故障信息告知运维人员,那么运维人员就可以第一时间排查和处理故障,从而避免很多故障的发生。那么如何将ELK收集的日志数据中的异常信息及时告知运维人员呢?这就需要使用zabbix。ELK(更准确的说应该是logstash)可以实时读取日志的内容,还可以过滤日志信息。通过ELK的读取过滤功能,可以过滤掉日志中一些异常的关键字(error、failed、OutOff、Warning),然后通过logstash的zabbix插件过滤错误日志信息发送给zabbix,然后zabbix收到这个数据后,结合自己的机制,然后发起告警动作,从而实现了zabbix实时告警日志异常的功能。2Logstash和zabbix插件的使用Logstash支持多种输出介质,如syslog、HTTP、TCP、elasticsearch、kafka等,有时我们希望在采集的日志中输出一些错误信息并warn,我们uselogstash-output-zabbix是一个插件,这个插件可以整合Logstash和zabbix,即对Logstash收集的数据进行过滤,将带有错误标记的日志输出到zabbix,最后通过zabbix的告警机制。logstash-output-zabbix是社区维护的插件。Logstash默认没有安装它,但是安装起来很容易。直接在logstash中运行如下命令即可:[root@elk-masterbin]#/usr/share/logstash/bin/logstash-plugininstalllogstash-output-zabbix其中,/usr/share/logstash/bin/是Logstashyum的默认安装目录。如果是源码安装,需要自己修改。另外还有很多logstash-plugin的命令我们来看看这个用法:2.1Listcurrentlyinstalledplugins列出所有安装的插件[root@elk-master~]#/usr/share/logstash/bin/logstash-pluginlist列出已安装的插件和版本信息[root@elk-master~]#/usr/share/logstash/bin/logstash-pluginlist--verbose列出所有已安装的包含namefragment的插件[root@elk-master~]#/usr/share/logstash/bin/logstash-pluginlist"*namefragment*"列出特定组的所有已安装插件(输入、过滤器、编解码器、输出)[root@elk-master~]#/usr/share/logstash/bin/logstash-plugin--groupoutput2.2安装插件安装插件,例如安装kafka插件,执行如下命令:[root@elk-master~]#/usr/share/logstash/bin/logstash-plugininstalllogstash-output-kafka要使用此命令安装插件,您的计算机需要能够访问互联网。这种插件安装方法检索托管在公共存储库(RubyGems.org)上的插件,然后将它们下载到本地计算机并在Logstash安装之上自动安装它们2.3更新插件每个插件都有自己的发布周期和版本更新,这些更新通常独立于Logstash发布周期。因此,有时需要单独更新插件,可以使用update子命令获取最新版本的插件来更新所有已安装的插件[root@elk-master~]#/usr/share/logstash/bin/logstash-pluginupdate只更新指定的[root@elk-master~]#/usr/share/logstash/bin/logstash-pluginupdatelogstash-output-kafka2.4删除插件如果需要从Logstash插件中删除插件,可以执行如下命令:[root@elk-master~]#/usr/share/logstash/bin/logstash-pluginremovelogstash-output-kafka这样就删除了logstash-output-kafka插件3logstash-output-使用logstash-output-zabbix安装zabbix插件后,在logstash配置文件中即可使用。下面是logstash-output-zabbix的使用示例:zabbix{zabbix_host=>"[@metadata][zabbix_host]"zabbix_key=>"[@metadata][zabbix_key]"zabbix_server_host=>"x.x.x.x"zabbix_server_port=>"xxxx"zabbix_value="xxxx"}其中:zabbix_host:表示Zabbix主机名字段的名称,可以是单个字段,也可以是@metadata字段的子字段,必须设置,没有默认值。zabbix_key:表示zabbixitemkey的值,即zabbix中的item。该字段可以是单个字段,也可以是@metadata字段的子字段,没有默认值。zabbix_server_host:表示Zabbix服务器的IP或可解析的主机名。默认值为“localhost”,需要设置为zabbixserver所在地址。zabbix_server_port:表示Zabbixserver开启的监听端口,默认值为10051。zabbix_value:表示要发送给zabbixitem监控项的值对应的字段名。默认值为“message”,即“message”字段的内容通过zabbix_key发送给上面定义的zabbixitem监控项。当然也可以指定一个特定字段的内容发送到zabbixitem监控项。4将logstash与zabbix集成这里我们使用logstash来收集日志,然后读取日志,最后选择关键字进行过滤,调用zabbix报警流程,看看如何配置logstash实现zabbix报警。先说明一下我们的应用需求:通过对系统日志文件的监控,然后过滤日志信息中的一些关键字,比如ERROR,Failed,WARNING等,过滤掉日志中的这些信息,然后发送给zabbix,最后使用zabbix的告警功能实现系统日志中以上关键字的告警。对于过滤关键字和告警,不同的业务系统可能有不同的关键字。例如对于http系统,可能需要过滤500、403、503等错误码。对于java相关的系统,可能需要过滤OutOfMemoryError、PermGen、Javaheap等关键字。在一些业务系统的日志输出中,可能会有一些自定义的错误信息,所以这些也需要作为过滤关键字。4.1配置logstash事件配置文件下一步是创建一个logstash事件配置文件。这里,配置文件分为三部分来介绍,首先是输入部分,内容如下:input{file{path="/var/log/secure"type="system"start_position="beginning"}}input部分是从/var/log/secure文件中读取数据,start_position表示从secure文件开头读取内容。接着是过滤部分,内容如下:{POSINT:message_pid}\])?:%{GREEDYDATA:message_content}"}#这里将message字段的数据通过grok进行分字段,将message字段分为5个子字段。其中message_content字段会在输出中使用}mutate{add_field=>["[zabbix_key]","oslogs"]#新增字段,字段名为zabbix_key,值为oslogsadd_field=>[[]zabbix_host]","Zabbixserver"]#新增字段,字段名为zabbix_host,其值可以直接在此处定义,也可以通过引用字段变量获取。这里的%{host}是日志数据的主机名,需要和zabbixweb中的“主机名”保持一致remove_field=>["@version","message"]#这里是删除不需要的字段}date{#这里是将日志输出中的日期字段进行转换,其中message_timestamp字段默认为时间日期字段输出,并将该字段的值传递给@timestamp字段match=>["message_timestamp","MMMdHH:mm:ss","MMMddHH:mm:ss","ISO8601"]}}filter部分是重点,这部分重点在message_timestamp字段,message_content字段最后输出部分,内容如下:output{elasticsearch{#这部分是将日志发送到es,在kibana中展示。如果没有部署,可以去掉index=>"oslogs-%{+YYYY.MM.dd}"hosts=>["192.168.73.133:9200"]user=>"elastic"password=>"Goldwind@2019"sniffing=>false}if[message_content]=~/(ERR|error|ERROR|Failed)/{#定义在message_content字段中,需要过滤的关键字信息,即message_content字段中给出的关键字,然后把这个信息发给zabbixzabbix{zabbix_host=>"[zabbix_host]"#这个zabbix_host会得到上面的filter部分定义字段变量的值%{host}zabbix_key=>"[zabbix_key]"#这个zabbix_key会得到上面filter部分给出的值zabbix_server_host=>"192.168.73.133"#这是指定zabbix的IP地址serverzabbix_server_port=>"10051"#这个是指定zabbixserver的监听端口zabbix_value=>"message_content"#这个很重要,指定传递给zabbix监控项(oslogs)的值,??zabbix_value的默认值是“message”字段,因为上面我们删除了“message”字段,所以这里需要重新指定。根据上面过滤部分中“message”字段的内容划分,我们指定为“message_content”字段。实际上,“message_content”字段的输出就是服务器上的具体内容。日志内容}}#stdout{codec=>ruby??debug}}#这里是开启调试模式。初次配置时,建议开启,这样过滤后的日志信息直接输出到屏幕上,方便调试。调试成功后,可以将以上三部分关闭合并成一个文件file_to_zabbix.conf,然后启动logstash服务:[root@logstashserver~]#cd/usr/local/logstash[root@logstashserverlogstash]#nohupbin/logstash-fconfig/file_to_zabbix.conf--path.data/tmp/&这里--path.data是指定logstash进程的数据存放目录,用于在一台服务器上启动多个logstash进程。4.2Zabbix平台配置日志告警登录zabbixweb平台,选择配置——>模板——>创建模板,命名为logstash-output-zabbix,如下图:然后,在该模块下创建应用集,点击应用集————>创建应用设置,如下图:然后,在该模块下创建监控项,点击监控项--->创建监控项,如下图:至此,zabbix监控的日志数据配置logstash完成这里我们以客户192.168.73.135主机为例,即在192.168.73.135主机上监控系统日志数据,发现日志异常时发出告警创建模板并监控项,然后将此模板链接到192.168.73.135主机,选择“配置”-“主机”,然后选择192.168.73.135主机,选择“模板”选项卡,将上面创建的模板添加到192.168。73.135主机,如下图:这样,监控项c上面说的在192.168.73.135主机上会自动生效。让我们模拟一次失败。在任意一台主机上通过ssh登录192.168.73.135主机,然后输入错误的密码让系统的/var/log/secure文件产生错误日志,再看一下。logstash可以过滤吗,可以发到zabbix吗?首先让系统文件/var/log/secure生成类似下面的内容:[root@k8s-node03~]#tail/var/log/secureDec2522:57:44k8s-node03sshd[49803]:Failed来自192.168.73.1端口60256ssh2Dec2522:57:48k8s-node03sshd[49803]的root密码:错误:收到来自192.168.73.1端口60256:13的断开连接。[preauth]Dec2522:57:48k8s-node03sshd[49803]:从192.168.73.1端口60256断开[preauth]Dec2523:02:40k8s-node03sshd[49908]:反向映射检查bogon[192.168.73.1]的getaddrinfo失败-可能的闯入尝试!Dec2523:02:43k8s-node03unix_chkpwd[49911]:用户(root)密码检查失败Dec2523:02:43k8s-node03sshd[49908]:pam_unix(sshd:auth):身份验证失败;logname=uid=0euid=0tty=sshruser=rhost=192.168.73.1user=rootDec2523:02:43k8s-node03sshd[49908]:pam_succeed_if(sshd:auth):requirement"uid>=1000"not遇到用户“root”Dec2523:02:44k8s-node03sshd[49908]:来自192.168.73.1端口60757的root密码失败ssh2Dec2523:02:46k8s-node03sshd[49908]:错误:收到断开连接来自192.168.73.1端口60757:13:用户取消认证。[preauth]Dec2523:02:46k8s-node03sshd[49908]:Disconnectedfrom192.168.73.1port60757[preauth]里面有我们要过滤的关键字Failed,所以logstash会过滤掉这个内容发给扎比克斯。然后,登录zabbixweb平台,点击监控——>最新数据,如果zabbix能收到日志,可以看到下图中的最新数据:点击历史记录可以查看详细内容,如图下图:可以看到红框中的内容就是logstash中定义的message_content字段的内容,至此,zabbix已经可以接收到logstash发送过来的数据,但是要实现报警,需要在zabbix中创建一个触发器,进入配置--->模板,选择logstash-output-zabbixtemplate,然后点击上面的trigger,继续点击右上角的createtrigger,如下图:这里要注意创建trigger时表达式的写法,这里的trigger条件是:如果logstash发送进来的数据会发出告警,也就是说如果接收到的数据长度大于0,就会发出告警。触发器配置完成后,如果配置正常,会发出告警。DingTalk用于图文告警,告警内容如下图:Kibana上查看安全日志总结首先我们看一下思路:我们的架构基本没变,还是filebat收集日志并将它们推送到kibana消息队列中,然后Logstash拉取日志数据,最后处理后传输出去;它只是一个传输输出到zabbix;该功能的核心贡献者是Logsatsh插件(logstash-output-zabbix);这里需要注意的是:filebeat采集端的IP一定要和zabbix监听主机的IP对应,否则日志是过不来的~分享一个小技巧:通过这个命令,可以测试key值在zabbix上定义;下面的输出就正常了~,如果failed不为零,说明失败#在server客户端使用zabbix_sender发送测试到zabbix[root@localhostzabbix_sender]#/usr/local/zabbix/bin/zabbix_sender-s192.168.73.135-z192.168.73.133-k“oslogs”-o1来自服务器的信息:“已处理:1;失败:0;总计:1;花费的秒数:0.000081”已发送:1;跳过:0;总数:1详细解释:-s:指定本地代理-z:指定zabbixserver-k:指定key值