Logstash入门什么是Logstash?Logstash是一个开源的数据流工具,主要做三件事:从数据源中拉取数据,对数据进行过滤和转换,将处理后的数据写入目标例如:监控某个目录下的日志文件,读取文件内容,处理数据,并写入influxdb。从kafka消费消息,处理数据,写入elasticsearch。为什么要使用Logstash?方便又轻松。假设你需要从kafka消费数据,然后写入elasticsearch。自己编码的话,还得对接kafka和elasticsearch的API。如果你使用Logstash,这部分你不需要自己实现,因为Logstash已经帮你封装好了。对应的plugin,只需要写一个这样的配置文件:input{kafka{#kafkaconsumerconfiguration}}filter{#dataprocessingconfiguration}output{elasticsearch{#elasticsearchoutputconfiguration}}然后运行logstash。Logstash提供了两百多个打包的plugin插件,分为三类:inputplugin:从哪里拉取数据filterplugin:如何处理数据outputplugin:从哪里写入数据使用logstash,只需要写一个配置文件,在配置文件中选择组合这些插件,可以轻松实现数据从输入源到输出源的实时流转。安装logstash,请参数:官方文档第一个例子假设你已经安装了logstash,并且在PATH环境变量中添加了可执行文件的路径。下面开始我们的第一个例子,编写pipeline.conf文件,内容为:input{stdin{}}filter{}output{stdout{}}这个配置文件的意思是:inputinput是stdin(标准输入)filter是空(即没有数据处理)输出为stdout(标准输出)执行命令:logstash-fpipeline.conf等待logstash启动,输入helloworld回车,会看到如下输出:{"message"=>"helloworld","@version"=>"1","@timestamp"=>2020-11-01T08:25:10.987Z,"host"=>"local"}我们输入的内容已经存在在消息字段中。当您键入其他内容时,您会看到类似的输出。至此,我们的第一个例子就完成了,按照配置文件中的定义,Logstash从stdin标准输入中读取数据,不对源数据做任何处理,然后输出到stdout标准输出。具体名词和字段event:数据在logstash中被打包成一个event事件,从input流向filter,再流向output。@timestamp:标记事件发生时间的特殊字段。@version:一个特殊字段,标记事件的版本号。message:源数据内容。@metadata:元数据,key/value形式,是否有数据取决于具体的插件,比如kafka的input插件会记录一些元数据,比如topic,consumer_group,partition,offset等.在@metadata中。tags:记录标签的字符串数组。字段引用在配置文件中,字段内容可以以[field]的形式引用,如果是字符串,可以以%{[field]}的形式引用。示例:input{kafka{#kafka配置}}filter{#参考log_level字段内容进行判断if[log_level]=="debug"{}}output{elasticsearch{#%{+yyyy.MM.dd}来自@timestampindex=>"log-%{+yyyy.MM.dd}"document_type=>"_doc"document_id=>"%{[@metadata][kafka][key]}"hosts=>["127.0.0.1:9200"]}}Plugin插件列表用好Logstash的第一步是熟悉plugin插件。只有熟悉了这些插件,才能快速高效地搭建数据管道。输入插件输入插件定义数据源,logstash从中提取数据。beats:从ElasticBeats框架接收数据。示例:input{beats{port=>5044}}dead_letter_queue:从Logstash自带的死信队列中拉取数据。目前死信队列只支持在输出为elasticsearch时记录写入为400或404的数据。示例:input{dead_letter_queue{path=>"/var/logstash/data/dead_letter_queue"start_timestamp=>"2017-04-04T23:40:37"}}elasticsearch:从elasticsearch读取搜索查询的结果。示例:input{elasticsearch{hosts=>"localhost"query=>'{"query":{"match":{"statuscode":200}}}'}}exec:定期执行shell命令并捕获其输出。示例:input{exec{command=>"ls"interval=>30}}file:从文件流式传输内容。示例:input{file{path=>["/var/log/*.log","/var/log/message"]start_position=>"beginning"}}generator:生成随机数据。示例:input{generator{count=>3lines=>["line1","line2","line3"]}}github:从githubwebhooks读取数据。graphite:接受graphite的度量指标数据。heartbeat:生成心跳信息。这样做的一般目的是测试Logstash的性能和可用性。http:Logstash接受http请求作为数据。http_poller:Logstash发起http请求,读取响应数据。示例:输入{http_poller{urls=>{test1=>"http://localhost:9200"test2=>{method=>getuser=>"AzureDiamond"password=>"hunter2"url=>"http://localhost:9200/_cluster/health"headers=>{Accept=>"application/json"}}}request_timeout=>60schedule=>{cron=>"*****UTC"}codec=>"json"metadata_target=>"http_poller_metadata"}}imap:从IMAP服务器读取邮件。jdbc:通过JDBC接口从数据库导入数据。示例:输入{jdbc{jdbc_driver_library=>"mysql-connector-java-5.1.36-bin.jar"jdbc_driver_class=>"com.mysql.jdbc.Driver"jdbc_connection_string=>"jdbc:mysql://localhost:3306/mydb"jdbc_user=>"mysql"parameters=>{"favorite_artist"=>"Beethoven"}schedule=>"*****"语句=>"SELECT*fromsongswhereartist=:favorite_artist"}}kafka:消费卡夫卡中的消息。示例:输入{kafka{bootstrap_servers=>"127.0.0.1:9092"group_id=>"consumer_group"topics=>["kafka_topic"]enable_auto_commit=>trueauto_commit_interval_ms=>5000auto_offset_reset=>"latest"decorate_events=>trueisovel=>>"read_uncommitted"max_poll_records=>1000}}rabbitmq:从RabbitMQ队列中拉取数据。redis:从redis读取数据。stdin:从标准输入读取数据。syslog:读取系统日志数据。tcp:通过TCP套接字读取数据。udp:通过udp读取数据。unix:通过UNIXsocket读取数据。websocket:通过websocket协议读取数据。OutputpluginOutputplugin定义了数据的输出位置,即logstash写入数据的位置。csv:将数据写入csv文件。elasticsearch:写入Elasticsearch。电子邮件:发送电子邮件。exec:执行命令。文件:写入磁盘文件。石墨:写入石墨。http:发送http请求。influxdb:写入InfluxDB。kafka:写给卡夫卡。mongodb:写入MongoDB。opentsdb:写入OpenTSDB。rabbitmq:写入RabbitMQ。redis:使用RPUSH写入Redis队列。sink:丢弃数据而不将其写入任何地方。syslog:将数据发送到syslog服务器。tcp:发送TCP套接字。udp:发送UDP。webhdfs:通过webhdfsRESTAPI写入HDFS。websocket:推送websocket消息。过滤器插件过滤器插件定义了如何处理数据。聚合:聚合数据。alter:修改数据。bytes:将存储大小的字符串表示形式(例如“123MB”或“5.6gb”)解析为以字节为单位的数值。cidr:检查IP地址是否在指定范围内。例子:filter{cidr{add_tag=>["testnet"]address=>["%{src_ip}","%{dst_ip}"]network=>["192.0.2.0/24"]}}cipher:用于数据加密或解密。克隆:克隆事件事件。csv:解析CSV格式的数据。date:解析字段中的日期数据。例如,匹配输入的时间戳字段,然后替换@timestamp:filter{date{match=>["timestamp","dd/MMM/yyyy:HH:mm:ssZZ"]target=>"@timestamp"}}dissect:使用%{}的形式拆分字符串,提取具体内容,比较常用。具体语法见解剖文档。drop:删除此事件。示例:filter{if[loglevel]=="debug"{drop{}}}elapsed:通过记录开始和结束时间来跟踪事件的经过时间。elasticsearch:在elasticsearch中搜索,将数据复制到当前事件中。environment:将环境变量中的数据存储到@metadata字段中。extractnumbers:提取字符串中找到的所有数字。fingerprint:根据一个或多个字段的内容创建哈希值并将其存储在新字段中。geoip:使用绑定的GeoLite2数据库添加IP地址的地理位置信息。这个插件非常有用。可以根据IP地址获取对应的国家、省份、城市、经纬度等地理位置数据。比如通过clent_ip字段获取对应的地理位置信息:>{“geo_country_name”=>“%{[geo][country_name]}”“geo_region_name”=>“%{[geo][region_name]}”“geo_city_name”=>“%{[geo][city_name]}”"geo_location"=>"%{[geo][latitude]},%{[geo][longitude]}"}remove_field=>["geo"]}}grok:使用正则表达式处理字符串,比较常用,具体语法见grok文档。http:与外部网络服务/RESTAPI集成。i18n:从字段中删除特殊字符。java_uuid:生成一个UUID。jdbc_static:从远程数据库读取数据,然后丰富事件。jdbc_streaming:执行SQL查询并将结果存储在指定字段中。json:解析json字符串生成字段和值。例子:filter{json{skip_on_invalid_json=>truesource=>"message"}}如果输入的message字段是json字符串如"{"a":1,"b":2}",那么它会增加之后parsing两个字段,字段名为a和b。kv:以key=value的形式解析数据。memcached:与外部memcached集成。metrics:logstash在内存中聚合指标数据。mutate:对该字段进行一些常规更改。示例:filter{mutate{split=>["hostname","."]add_field=>{"shortHostname"=>"%{hostname[0]}"}}mutate{rename=>["shortHostname","hostname"]}}prune:通过黑白名单的方式删除多余字段。示例:filter{prune{blacklist_names=>["method","(referrer|status)","${some}_field"]}}ruby:执行ruby代码。例子,解析http://example.com/abc?q=haha形式的查询参数q的值string:filter{ruby??{code=>"require'cgi'req=event.get('request_uri').split('?')query=''ifreq.length>1query=req[1]qh=CGI::parse(query)event.set('search_q',qh['q'][0])end"}}在ruby代码中,字段的get和set是通过event.get()和event.set()方法进行操作的。sleep:睡眠指定的时间。split:拆分字段。throttle:限流,限制事件数量。translate:根据指定的字典文件翻译数据。示例:filter{translate{field=>"[http_status]"destination=>"[http_status_description]"dictionary=>{"100"=>"Continue""101"=>"SwitchingProtocols""200"=>"OK""500"=>"ServerError"}fallback=>"I'mateapot"}}truncate:修剪字段内容中超出长度的部分。urldecode:解码urlencoded内容。useragent:解析user-agent的内容,得到设备、操作系统、版本等信息。例子:filter{#ua_device:device#ua_name:browser#ua_os:operatingsystemuseragent{lru_cache_size=>1000source=>"user_agent"target=>"ua"add_field=>{"ua_device"=>"%{[ua][device]}""ua_name"=>"%{[ua][name]}""ua_os"=>"%{[ua][os_name]}"}remove_field=>["ua"]}}uuid:生成UUID。xml:解析XML格式的数据。结束语除了本文提到的那些之外,还有很多用于Logstash的插件。如果想详细了解各个插件的使用方法,请参考官方文档。得益于Logstash的插件体系,你只需要写一个配置文件声明使用哪些插件,就可以轻松搭建数据管道。
