Logstash 配置示例
编辑Logstash 配置示例
编辑这些示例说明了如何配置 Logstash 来过滤事件、处理 Apache 日志和 syslog 消息,以及如何使用条件来控制哪些事件由过滤器或输出处理。
如果您需要有关构建 grok 模式的帮助,请尝试使用 Grok 调试器。
配置过滤器
编辑过滤器是一种内联处理机制,可灵活地对数据进行切片和处理以满足您的需求。让我们来看看一些实际应用的过滤器。以下配置文件设置了 grok
和 date
过滤器。
input { stdin { } } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
使用此配置运行 Logstash
bin/logstash -f logstash-filter.conf
现在,将以下行粘贴到您的终端并按 Enter 键,以便它将被 stdin 输入处理
127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"
您应该看到返回到 stdout 的内容,如下所示
{ "message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"", "@timestamp" => "2013-12-11T08:01:45.000Z", "@version" => "1", "host" => "cadenza", "clientip" => "127.0.0.1", "ident" => "-", "auth" => "-", "timestamp" => "11/Dec/2013:00:01:45 -0800", "verb" => "GET", "request" => "/xampp/status.php", "httpversion" => "1.1", "response" => "200", "bytes" => "3891", "referrer" => "\"http://cadenza/xampp/navi.php\"", "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"" }
如您所见,Logstash(在 grok
过滤器的帮助下)能够解析日志行(恰好是 Apache "组合日志" 格式),并将其分解为许多不同的离散信息位。一旦您开始查询和分析我们的日志数据,这将非常有用。例如,您将能够轻松地运行有关 HTTP 响应代码、IP 地址、引用等的报告。Logstash 开箱即用包含相当多的 grok 模式,因此如果您需要解析常见的日志格式,很可能已经有人为您完成了这项工作。有关更多信息,请参阅 GitHub 上的 Logstash grok 模式列表。
此示例中使用的另一个过滤器是 date
过滤器。此过滤器解析出一个时间戳,并将其用作事件的时间戳(无论您何时摄取日志数据)。您会注意到,此示例中的 @timestamp
字段设置为 2013 年 12 月 11 日,即使 Logstash 在此之后某个时间摄取事件。这在回填日志时非常方便。它可以让您告诉 Logstash“使用此值作为此事件的时间戳”。
处理 Apache 日志
编辑让我们做一些真正 有用 的事情:处理 apache2 访问日志文件!我们将从本地主机上的文件中读取输入,并使用 条件 来根据我们的需求处理事件。首先,创建一个名为类似 logstash-apache.conf 的文件,内容如下(您可以更改日志的文件路径以适合您的需求)
input { file { path => "/tmp/access_log" start_position => "beginning" } } filter { if [path] =~ "access" { mutate { replace => { "type" => "apache_access" } } grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
然后,创建您上面配置的输入文件(在本示例中为 "/tmp/access_log"),其中包含以下日志条目(或使用您自己的 Web 服务器中的一些)
71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3" 134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)" 98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"
现在,使用 -f 标志运行 Logstash 以传入配置文件
bin/logstash -f logstash-apache.conf
现在您应该在 Elasticsearch 中看到您的 Apache 日志数据!Logstash 打开并读取指定的输入文件,处理它遇到的每个事件。记录到此文件的任何其他行也将被捕获,由 Logstash 作为事件处理,并存储在 Elasticsearch 中。此外,它们还会被存储,并将 "type" 字段设置为 "apache_access"(这是通过输入配置中的 type ⇒ "apache_access" 行完成的)。
在此配置中,Logstash 仅监视 apache access_log,但通过更改上述配置中的一行,可以轻松地同时监视 access_log 和 error_log(实际上,是任何匹配 *log
的文件)
input { file { path => "/tmp/*_log" ...
当您重新启动 Logstash 时,它将处理错误日志和访问日志。但是,如果您检查您的数据(可能使用 elasticsearch-kopf),您会看到 access_log 被分解为离散的字段,但 error_log 没有。这是因为我们使用 grok
过滤器来匹配标准的组合 Apache 日志格式,并自动将数据拆分为单独的字段。如果我们能够根据行的格式来控制如何解析行,岂不是更好 吗?嗯,我们可以……
请注意,Logstash 没有重新处理已经在 access_log 文件中看到的事件。从文件读取时,Logstash 会保存其位置,并且仅处理添加的新行。太棒了!
使用条件
编辑您可以使用条件来控制哪些事件由过滤器或输出处理。例如,您可以根据事件出现在哪个文件中(access_log、error_log 和其他以 "log" 结尾的随机文件)来标记每个事件。
input { file { path => "/tmp/*_log" } } filter { if [path] =~ "access" { mutate { replace => { type => "apache_access" } } grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ] } } else if [path] =~ "error" { mutate { replace => { type => "apache_error" } } } else { mutate { replace => { type => "random_logs" } } } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
此示例使用 type
字段标记所有事件,但实际上不解析 error
或 random
文件。错误日志的类型太多了,它们应该如何标记实际上取决于您正在处理的日志。
同样,您可以使用条件将事件定向到特定的输出。例如,您可以
- 警报 Nagios 任何状态为 5xx 的 Apache 事件
- 将任何 4xx 状态记录到 Elasticsearch
- 通过 statsd 记录所有状态代码命中
要告诉 Nagios 任何状态代码为 5xx 的 HTTP 事件,您首先需要检查 type
字段的值。如果它是 Apache,则可以检查 status
字段是否包含 5xx 错误。如果是,则将其发送到 Nagios。如果它不是 5xx 错误,请检查 status
字段是否包含 4xx 错误。如果是,则将其发送到 Elasticsearch。最后,无论 status
字段包含什么,都将所有 Apache 状态代码发送到 statsd
output { if [type] == "apache" { if [status] =~ /^5\d\d/ { nagios { ... } } else if [status] =~ /^4\d\d/ { elasticsearch { ... } } statsd { increment => "apache.%{status}" } } }
处理 Syslog 消息
编辑Syslog 是 Logstash 最常见的用例之一,并且它可以很好地处理它(只要日志行大致符合 RFC3164)。Syslog 是事实上的 UNIX 网络日志记录标准,它将消息从客户端计算机发送到本地文件,或通过 rsyslog 发送到集中式日志服务器。对于此示例,您不需要正常工作的 syslog 实例;我们将从命令行模拟它,以便您了解会发生什么。
首先,让我们为 Logstash + syslog 创建一个简单的配置文件,名为 logstash-syslog.conf。
input { tcp { port => 5000 type => syslog } udp { port => 5000 type => syslog } } filter { if [type] == "syslog" { grok { match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" } add_field => [ "received_at", "%{@timestamp}" ] add_field => [ "received_from", "%{host}" ] } date { match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
使用此新配置运行 Logstash
bin/logstash -f logstash-syslog.conf
通常,客户端计算机将连接到端口 5000 上的 Logstash 实例并发送其消息。对于此示例,我们将只是 telnet 到 Logstash 并输入日志行(类似于我们之前在 STDIN 中输入日志行的方式)。打开另一个 shell 窗口以与 Logstash syslog 输入进行交互,并输入以下命令
telnet localhost 5000
复制并粘贴以下行作为示例。(可以随意尝试您自己的行,但请记住,如果 grok
过滤器不适合您的数据,它们可能无法解析)。
Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154] Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log) Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.
现在,您应该在原始 shell 中看到 Logstash 在处理和解析消息时的输出!
{ "message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)", "@timestamp" => "2013-12-23T22:30:01.000Z", "@version" => "1", "type" => "syslog", "host" => "0:0:0:0:0:0:0:1:52617", "syslog_timestamp" => "Dec 23 14:30:01", "syslog_hostname" => "louis", "syslog_program" => "CRON", "syslog_pid" => "619", "syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)", "received_at" => "2013-12-23 22:49:22 UTC", "received_from" => "0:0:0:0:0:0:0:1:52617", "syslog_severity_code" => 5, "syslog_facility_code" => 1, "syslog_facility" => "user-level", "syslog_severity" => "notice" }