首先说到Linux操作审计需求,我们最想做的是还原在线服务器被手动(误)操作时执行的命令行,以及它的关联上下文。这个需求场景其实和一般的业务日志采集是一致的。如果比较简单,可以直接通过history将内容发送到syslog,如果比较复杂,可以使用auditd或者ebpf来捕捉内核层面的行为。不过本文无意解释上述方案的原理,只是从角度上完成80%的日常运行审计(80%的数据来源?不知道,大概是28原理)运维新手。由于文章的标题是用Shell做的,可见今天的话题和Bash不无关系。一句话总结今天的主题:使用自定义Bash源增加日志审计功能,将用户操作发送到rsyslog进行聚合,最后在elasticsearch中做日志存储和查询。Linux部分准备一些必要的工具rsyslog:Linux自带的日志处理服务,兼容syslog语法jq:shell下处理json数据的小工具logger:可以将日志输入syslog的工具这些小工具在除了jq,大多数操作系统发行版都自带,如果没有,也可以直接用操作系统自带的包管理工具安装。ash.audit.sh,并将其选中贝到/etc/profile.d/目录下if["${SHELL##*/}"!="bash"];然后returnfiif["${AUDIT_READY}"="yes"];然后returnfideclare-rxHISTFILE="$HOME/.bash_historydeclare-rxHISTSIZE=500000declare-rxHISTFILESIZE=500000declare-rxHISTCONTROL=""declare-rxHISTIGNORE=""declare-rxHISTCMDdeclare-rxAUDIT_READY="yes"shopt-shistappendshopt-scmdhistshopt-shistverifyifshopt-qlogin_shell&&[-t0];thenstty-ixonfiifgroups|grep-qroot;然后declare-xTMOUT=86400#chattr+a"$HISTFILE"fideclare-aLOGIN_INFO=($(who-mu|awk'{print$1,$2,$6}'))declare-rxAUDIT_LOGINUSER="${LOGIN_INFO[0]}"declare-rxAUDIT_LOGINPID="${LOGIN_INFO[2]}"declare-rxAUDIT_USER="$USER"声明-rxAUDIT_PID="$$"声明-rxAUDIT_TTY="${LOGIN_INFO[1]}"声明-rxAUDIT_SSH="$([-n"$SSH_CONNECTION"]&&echo"$SSH_CONNECTION"|awk'{print$1":"$2"->"$3":"$4}')"declare-rxAUDIT_STR="$AUDIT_LOGINUSER$AUDIT_LOGINPID$AUDIT_TTY$AUDIT_SSH"declare-rxAUDIT_TAG=$(echo-n$AUDIT_STR|sha1sum|cut-c1-12)declare-xAUDIT_LASTHISTLINE=""set+ofunctraceshopt-sextglobfunctionAUDIT_DEBUG(){if[-z"$AUDIT_LASTHISTLINE"];thenlocalAUDIT_CMD="$(fc-l-1-1)"AUDIT_LASTHISTLINE="${AUDIT_CMD%%+([^0-9])*}"elseAUDIT_LASTHISTLINE="$AUDIT_HISTLINE"filocalAUDIT_CMD="$(历史1)"AUDIT_HISTLINE="${AUDIT_CMD%%+([^0-9])*}"if["${AUDIT_HISTLINE:-0}"-ne"${AUDIT_LASTHISTLINE:-0}"]||["${AUDIT_HISTLINE:-0}"-eq"1"];然后MESSAGE=$(jq-c-n\--argpwd"$PWD"\--argcmd"${AUDIT_CMD##*()?(+([0-9])?(\*)+())}"\--arg用户"$AUDIT_LOGINUSER"\--arg成为"$AUDIT_USER"\--argpid"$AUDIT_PID"\--arg信息"${AUDIT_STR}"\'{cmd:$cmd,user:$user,become:$become,pid:$pid,pwd:$pwd,info:$info}')logger-plocal6.info-t"$AUDIT_TAG""@cee:$MESSAGE"fi}functionAUDIT_EXIT(){localAUDIT_STATUS="$?”if[-n"$AUDIT_TTY"];thenMESSAGE_CLOSED=$(jq-c-n\--argaction"sessionclosed"\--arguser"$AUDIT_LOGINUSER"\--argbecome"$AUDIT_USER"\--argpid"$AUDIT_PID"\--arginfo"${AUDIT_STR}"\'{user:$user,become:$become,pid:$pid,action:$action,info:$info}')logger-plocal6.info-t"$AUDIT_TAG""@cee:$MESSAGE_CLOSED"fiexit"$AUDIT_STATUS"}declare-frx+tAUDIT_DEBUGdeclare-frx+tAUDIT_EXITif[-n"$AUDIT_TTY"];thenMESSAGE_OPENED=$(jq-c-n\--arg操作“会话打开”\--arg用户“$AUDIT_LOGINUSER”\--arg成为“$AUDIT_USER”\--argpid“$AUDIT_PID”\--arg信息“${AUDIT_STR}"\'{user:$user,become:$become,pid:$pid,action:$action,info:$info}')logger-plocal6.info-t"$AUDIT_TAG""@cee:$MESSAGE_OPENED"fideclare-rxPROMPT_COMMAND="[-n"$AUDIT_DONE"]&&echo'';AUDIT_DONE=;陷阱'AUDIT_DEBUG&&AUDIT_完成=1;trapDEBUG'DEBUGdeclare-rxBASH_COMMANDdeclare-rxSHELLOPTrapAUDIT_EXITEXIT简单解释一下这个脚本,它大致定义了shell的历史条目,登录超时,审计日志格式和发送配置。/etc/rsyslog.d/40-audit.conf文件,用于将本地local6级系统日志发送到远程rsyslog服务进行集中处理$RepeatedMsgReductionofflocal6.info@<>:514&stop配置完成后,不要'不要忘记重启下载rsyslog服务!数据部分顾名思义,数据部分用于接收和处理客户端发送的操作系统日志。这里我们使用了两个服务,rsyslog和elasticsearch。准备rsyslog-elasticsearch为了rsyslog向elasticsearch发送日志,我们必须安装它的es模块#Ubuntuapt-getinstall-yrsyslog-elasticsearchrsyslog-mmjsonparse#CentOSyuminstallrsyslog-elasticsearchrsyslog-mmjsonparse准备ElasticSearch服务,方便部署,本文直接使用docker快速拉起一个ES服务dockerrun-d--nameelasticsearch-p9200:9200-p9300:9300-e"discovery.type=single-node"elasticsearch:7.3.1配置rsyslog服务器,创建一个文件/etc/rsyslog.d/40-audit-server.conf来定义日志写入策略。$RepeatedMsgReductionoff$ModLoadimudp$UDPServerRun514module(load="mmjsonparse")#用于解析CEE增强的syslog消息模块(load="omelasticsearch")#用于输出到Elasticsearch#trytoparseastructuredlog#thisisforindexnamesto就像:rsyslog-YYYY.MM.DDtemplate(name="rsyslog-index"type="string"string="bashaudit-%$YEAR%.%$MONTH%.%$DAY%")#这是为了格式化我们的带有@timestamptemplate(name="json-syslog"type="list"){constant(value="{")constant(value="\"@timestamp\":\"")property(name="timegenerated"dateFormat="rfc3339"date.inUTC="on")常量(value="\",\"host\":\"")属性(name="fromhost-ip")常量(value="\",\"严重性\":\"")属性(name="syslogseverity-text")常量(value="\",\"facility\":\"")属性(name="syslogfacility-text")常量(value="\",\"program\":\"")property(name="programname")constant(value="\",\"tag\":\"")property(name="syslogtag"format="json")constant(value="\",")property(name="$!all-json"position.from="2")#右大括号在all中-json}if($syslogfacility-text=='local6'and$syslogseverity-text=='info')然后{action(type="mmjsonparse")action(type="omelasticsearch"template="json-syslog"searchIndex="rsyslog-index"dynSearchIndex="on"server="
