企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] 以下示例说明如何配置LogStash来过滤事件,对Apache日志和syslog进行处理,并且使用条件式来控制被filter或output处理的事件. > <font color=#1E90FF size=4>TIP</font>:如果你需要构建grok匹配的帮助,试试我们的[Grok Debugger](https://www.elastic.co/guide/en/kibana/6.5/xpack-grokdebugger.html).Grok Debugger是基础授权下的X-Pack的一个特性所以是可以免费使用的. ### 配置Filters Filters是一种在线处理机制,可以灵活地对数据进行切片和切割,以满足您的需求.让我们看一些正在使用的filters.下面的配置文件设置了`grok`和`date`filters. ```ruby input { stdin { } } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } } ``` 使用此配置文件运行Logstash: ```shell bin/logstash -f logstash-filter.conf ``` 现在,将下面的内容粘贴到你的终端上,并按下回车stdin input进行处理: ```ruby 127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0" ``` 你会看到类似下面的输出: ``` { "message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"", "@timestamp" => "2013-12-11T08:01:45.000Z", "@version" => "1", "host" => "cadenza", "clientip" => "127.0.0.1", "ident" => "-", "auth" => "-", "timestamp" => "11/Dec/2013:00:01:45 -0800", "verb" => "GET", "request" => "/xampp/status.php", "httpversion" => "1.1", "response" => "200", "bytes" => "3891", "referrer" => "\"http://cadenza/xampp/navi.php\"", "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"" } ``` 如你所见,Logstash(在`grok`filter的帮助下)可以解析日志行(日志刚好是Apache "combined log"格式的)并将其分解为分散的信息位.这是非常有用的,一旦你开始查询和分析我们的日志数据.比如,你可以更加轻松的管理关于HTTP响应码,IP地址,referres等信息. Logstash中包含了相当多的开箱即用的grok匹配.因此,如果你需要解析一个通用的日志格式,很有可能有人已经为你做了这项工作。更多信息,可以查看我们在GitHub上的[Logstash grok patterns](https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns)列表. 另一个在本示例中使用的filter是`date`filter.这个filter解析出一个timestamp(时间戳)并将其作为事件的时间戳(不论你何时获取的你的日志数据).你可能注意到了在这个示例中`@timestamp`字段被设置为2013.12.11尽管Logstash是在这之后的某个事件点接收的这个事件. ### 处理Apache日志 让我们做一些真正有用的事情:处理Apache2的访问日志文件!我们将从本地读取一个文件到input,并且使用[条件式](https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#conditionals)来根据我们的需要处理事件.首先,创建一个名字类似于logstash-apache.conf的文件,内容如下(你可以根据自己的情况修改日志文件的路径): ```ruby input { file { path => "/tmp/access_log" start_position => "beginning" } } filter { if [path] =~ "access" { mutate { replace => { "type" => "apache_access" } } grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } } ``` 然后,创建你在上述配置中的input文件(示例中为"/tmp/access_log")并使用下面的日志内容(或者你可以使用来自自己网站的内容): ```js 71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3" 134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)" 98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" ``` 现在,用-f选项来使用指定的配置文件启动Logstash ```shell bin/logstash -f logstash-apache.conf ``` 现在你应该可以在Elasticsearch中看到你的Apache日志数据.Logstash打开并读取指定的input文件,处理每一个遇到的事件.所有向此文件追加的行也会被捕获,被Logstash作为事件处理,然后存储在Elasticsearch中.一个额外的好处是"type"字段被设置为"apache_access"(这是由 type => "apache_access"这一行决定的). 这个配置文件中,Logstash只监视了Apache访问日志,不过你可以很轻松的监控访问日志和错误日志(事实上,所有可以匹配`*log`的文件),通过修改上述配置中的一行: ```ruby input { file { path => "/tmp/*_log" } } ``` 当你重启Logstash,它将同时处理错误日志和访问日志.不过,如果你检查你的数据(使用Elasticsearch-kopf之类的),你可能会发现access_log被解析成了多个字段,但是error_log不是.这是因为我们使用的`grok`filter只匹配了标准的apache日志格式,并自动将其分割成不同的字段.如果我们能根据每一行的格式来控制其解析方式不是更好吗?当然,我们可以... 注意Logstash不会重新处理已经看到过的access_log文件中的时间.当从一个文件读取的时候,Logstash会保存其位置并且只处理新增的行.Neat! ### 使用条件式 你可以使用条件式来控制被filter或output处理的事件.比如,你可以根据事件出现在的文件对其进行标记(access_log, error_log, 或者随便其他以 "log"结尾的文件). ```ruby input { file { path => "/tmp/*_log" } } filter { if [path] =~ "access" { mutate { replace => { type => "apache_access" } } grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } else if [path] =~ "error" { mutate { replace => { type => "apache_error" } } } else { mutate { replace => { type => "random_logs" } } } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } } ``` 这个示例使用`type`字段标记了所有事件,但并没有真正解析`error`或`random`文件.错误日志的类型如此之多,如何给它们标记取决于你使用的日志文件. 同样的,你可以使用条件式来将时间重定向到指定的outputs.比如,你可以: + 将apache状态码为5xx的时间发送给nagios报警 + 将状态码4xx的记录到Elasticsearch + 将所有的信息记录到statsd 要通知nagios关于http 5xx的事件,首先需要检查`type`字段.如果是apache,就看它的状态码是否包含5xx错误.如果是,发送给nagios.如果不是5xx错误,检查是够包含4xx错误,如果是,发送给Elasticsearch.最后将所有的apache状态码发送给statsd. ```ruby output { if [type] == "apache" { if [status] =~ /^5\d\d/ { nagios { ... } } else if [status] =~ /^4\d\d/ { elasticsearch { ... } } statsd { increment => "apache.%{status}" } } } ``` ### 处理Syslog信息 处理Syslog是Logstash最常见的用例之一.并且处理的非常好(只要日志大致符合RFC3164).Syslog是事实上的UNIX网络日志记录标准,从客户端机器向本地文件发送消息,或者通过rsyslog发送给集中式的日志服务器.对于这个例子,你不需要一个syslog实例,我们将从命令行中模拟它,以便您能够了解所发生的事情。 首先,我们为Logstash+syslog创建一个简单的配置文件,叫做logstash-syslog.conf. ```ruby input { tcp { port => 5000 type => syslog } udp { port => 5000 type => syslog } } filter { if [type] == "syslog" { grok { match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" } add_field => [ "received_at", "%{@timestamp}" ] add_field => [ "received_from", "%{host}" ] } date { match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } } ``` 使用这个新的配置文件来运行Logstash ```shell bin/logstash -f logstash-syslog.conf ``` 通常,客户端机器将连接到端口5000上的Logstash实例并发送其消息。在这个例子中,我们telnet到Logstash并输入一个日志行(类似于我们之前在STDIN中输入日志行).打开另一个shell窗口,与Logstash Syslog输入进行交互,并输入以下命令: ```shell telnet localhost 5000 ``` 粘贴下面的示例内容(你可以使用自己的内容,但要注意,`grok`filter可能不能正确解析你的数据) ```ruby Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154] Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log) Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'. ``` 现在你应该可以在你之前的Logstash窗口看到处理并解析过的信息. ```ruby { "message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)", "@timestamp" => "2013-12-23T22:30:01.000Z", "@version" => "1", "type" => "syslog", "host" => "0:0:0:0:0:0:0:1:52617", "syslog_timestamp" => "Dec 23 14:30:01", "syslog_hostname" => "louis", "syslog_program" => "CRON", "syslog_pid" => "619", "syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)", "received_at" => "2013-12-23 22:49:22 UTC", "received_from" => "0:0:0:0:0:0:0:1:52617", "syslog_severity_code" => 5, "syslog_facility_code" => 1, "syslog_facility" => "user-level", "syslog_severity" => "notice" } ```