ThinkSSL🔒 一键申购 5分钟快速签发 30天无理由退款 购买更放心 广告
[TOC] ## logstash模块组成 Logstash由三个组件构造成,分别是input、filter以及output。三个组件的工作流为: | 组件 | 功能 | | --- | --- | | input | 收集数据 | | filter | 处理数据 | | output | 输出数据 | filter过滤器插件非必须,但filter插件才能体现logtash的强大。 ~~~yml input{   输入插件 } filter{   过滤器插件 } outer{   输出插件 } ~~~ add\_field、remove\_field、add\_tag、remove\_tag 是所有 Logstash 插件都有。相关使用方法看字段名就可以知道。 ### 常用启动参数 |参数 |说明 |举例| | --- | --- | --- | |-e |用命令行里的配置参数启动实例 |./bin/logstash -e ‘input {stdin {}} output {stdout {}}’| |-f |指定启动实例的配置文件 |./bin/logstash -f config/test.conf| |-t |测试配置文件的正确性 |./bin/logstash-f config/test.conf -t| |-l |指定日志文件名称 |./bin/logstash-f config/test.conf -l logs/test.log| |-w |指定filter线程数量(5) |./bin/logstash-f config/test.conf -w 8| ### 数据类型 | 类型 | 示例 | | --- | --- | | bool | debug => true | | bytes | my\_bytes => "113" # 113 bytes | | string | host => "hostname" | | number | port => 214 | | array | match =>\[ "/var/log/messages", "/var/log/\*.log" \] | | hash | options => {key1 => "value1",key2 => "value2" } | ### 条件判断 logshash支持if...else的条件判断,支持下面的操作符: 等性:==, !=, <, >, <=, >= 正则:=~, !~ 包含:in, not in 布尔:and, or, nand, xor 取反:! ## input模块 `input`[插件官方详解:](https://www.elastic.co/guide/en/logstash/current/input-plugins.html) ### input是怎么样接收日志的 logstash使用一个名为filewatch的ruby gem库来监听文件变化,读取进度记录到一个叫.sincedb的数据库文件中。 这个文件的默认路径在`plugins/inputs/file`下面。 ### 从file-stdin-beat-redis读取示例 logstash的input模块可以从各种输入类型获取数据,如文件(file),日志(log),数据库(mysql,reids)等,各个输入类型涉及的内部关键字不一样,下面以常用的几个输入类型讲解 **从文件`file`输入** ~~~ input{ file{ path => ["/var/log/messages"]   } } #其他选项 path 必须的选项,文件路径,可以多个,中括号包围 exclude 排除不想被监听的文件 close_older 已经监听的文件多久未更新就关闭监听(3600秒 1小时) ignore_older 检查文件列表时,若有文件的最后修改时间超过(84600秒 一天)就忽略 discover_interval 多久去查一次被监听的path下是否有新文件(15秒) sincedb_path 自定义 sincedb 文件到其他位置。 sincedb_write_interval 每隔多久写一次 sincedb 文件(15秒) stat_interval 被监听文件的检查频率(1秒) start_position 从什么位置开始读取文件数据,值"end(默认)/beginning" ~~~ **标准输入`stdin{}`** ```sh input{ stdin{ type => "stdin" } } ``` **filebeat输入** ~~~ input { beats { hosts => "10.0.0.11" port => "5044" } } ~~~ **redis输入** ~~~ input { redis { data_type => "list" key => "filebeat-1011" host => "10.0.0.11" port => 6379 password => 'abcd1234e' db => "2" threads => 5 codec => "json" } } ~~~ ### input通用参数 下列6个参数,是所有input插件都支持的参数 ```sh codec 输入编解码器,在数据输入之前解码数据(line) add_field 向输入数据中添加一个K/V字段 enable_metric 禁用或启用日志记录 id 设置stdin的ID,不设置随机生成,多个时stdin有用 tags 给数据数据添加标签,方便后续处理 type 向此输入的数据添加一个字段,主要用于过滤器 ``` `codec`参数,在input/output中都有,接下来专门讲解 ## CODEC编码插件 编码插件(codec)可以在logstash输入或输出时处理不同类型的数据,同时,还可以更好更方便的与其他自定义格式的数据产品共存,比如:fluent、netflow、collectd等通用数据格式的其他产品。 因此,logstash不只是一个input-->filter-->output的数据流,而且是一个input-->decode-->filter-->encode-->output的数据流。 常用的codec插件有plain,json,multiline等 ### **plain插件:** plain是最简单的编码插件,你输入什么信息,就返回什么信息 ~~~sh input { stdin { codec => plain {} } } ~~~ ### **json插件:** 有时候logstash采集的日志是JSON格式,那我们可以在input字段加入codec => json来进行解析,这样就可以根据具体内容生成字段,方便分析和储存。 如果想让logstash输出为json格式,可以在output字段加入codec=>json。 如果数据为json格式,可直接使用该插件,从而省掉filter/grok的配置,降低过滤器的cpu消耗 ~~~sh input { stdin { codec => json } } ~~~ ### **multiline插件:** 用于合并多行数据 java类程序,一条有用的数据可能输出在很多行,分析日志时得把这些日志按一行处理,multiline插件用于解决此类问题。 例:tomcat的日志catalina.out有很多调试的日志,日志都以时间戳格式"20-Apr-2016 11:29:28.535"开头,那么我们可以配置如下: ~~~sh input { file{ path => "xxx/tomcat/ogs/catalina.out" tags => ["api-core"] codec => multiline { pattern => "^\d{2}\:\d{2}\:\d{2}\.\d{3}" auto_flush_interval => 10 negate => true what => "previous" } stat_interval => "1" } } ~~~ pattern 为正则表达式匹配 negate true表示不匹配正则表达式。默认false what 如果匹配,事件属于previous(上一个)或者next(下一个)事件 auto_flush_interval 多少秒没有新数据,之前积累的多行数据转换为一个事件 >以上的配置解释:不匹配pattern时间戳格式开头的行,都归属到上一个事件中,等待10秒没有新数据产生,那么最后一个时间戳格式后的所有行数据就是最后一个事件。 ## output模块 [output官方插件地址:](https://www.elastic.co/guide/en/logstash/6.8/output-plugins.html) Logstash的output模块,相比于input模块来说是一个输出模块,output模块集成了大量的输出插件,可以输出到指定文件,也可输出到指定的网络端口,当然也可以输出数据到ES. ### output通用参数 | 参数名 | 参数说明 | | --- | --- | | workers | 设置输出线程数量 | | codec | 输出编解码器,对输出的数据编码(line) | | enable_metric | 禁用或启用日志记录| ### 输出到标准输出(stdout) 标准输出默认使用`rubydebug`,还可以使用`json` ~~~bash #使用rubydebug output { stdout { codec => rubydebug } } #使用json output { stdout { codec => json } } ~~~ ### 输出到redis ```sh output { redis{ codec => plain data_type => list db => 3 host => ["10.0.0.11:6379"] key => xxx password => xxx port => 6379 timeout => 5 } } ``` ### 输出到Elasticsearch 到es的输出参数有很多,但重要的就是hosts和index,其他参数可以参考官方文档: [es参数](https://www.elastic.co/guide/en/logstash/6.8/plugins-outputs-elasticsearch.html) ```sh output { elasticsearch { hosts => ["10.0.0.12:9200"] index => logstash-%{+YYYY.MM.dd} } } ``` ### 增加条件的判断的output 在logstash进行output时,可以根据type和tag值不同,做不同的条件判断,输出不同的数据,如: ```sh output{ if "api-01" in [tags] { elasticsearch { hosts => ["10.0.0.12:9200"] index => "pro-api-01-%{+YYYY.MM.dd}" } } if "api-02" in [tags] { elasticsearch{ hosts => ["10.0.0.12:9200"] index => "pro-api-02-%{+YYYY.MM.dd}" } } } ```