将多个输入和输出插件拼接在一起

edit

将多个输入和输出插件拼接在一起edit

您需要管理的信息通常来自多个不同的来源,并且用例可能需要多个数据目的地。您的 Logstash 管道可以使用多个输入和输出插件来处理这些需求。

在本节中,您将创建一个 Logstash 管道,该管道从 Twitter 提要和 Filebeat 客户端获取输入,然后将信息发送到 Elasticsearch 集群,并将信息直接写入文件。

从 Twitter 提要读取edit

要添加 Twitter 提要,您需要使用 twitter 输入插件。要配置插件,您需要几部分信息

  • 一个消费者密钥,它唯一标识您的 Twitter 应用程序。
  • 一个消费者密钥,它充当您 Twitter 应用程序的密码。
  • 一个或多个关键字,用于在传入提要中搜索。示例显示使用“cloud”作为关键字,但您可以使用任何您想要的关键字。
  • 一个oauth 令牌,它标识使用此应用程序的 Twitter 帐户。
  • 一个oauth 令牌密钥,它充当 Twitter 帐户的密码。

访问 https://dev.twitter.com/apps 设置 Twitter 帐户并生成您的消费者密钥和密钥,以及您的访问令牌和密钥。如果您不确定如何生成这些密钥,请参阅 twitter 输入插件的文档。

就像您之前在 使用 Logstash 解析日志 中所做的那样,创建一个配置文件(称为 second-pipeline.conf),其中包含配置管道的骨架。如果您愿意,您可以重用您之前创建的文件,但请确保在运行 Logstash 时传入正确的配置文件名。

将以下行添加到 second-pipeline.conf 文件的 input 部分,将您的值替换为此处显示的占位符值

    twitter {
        consumer_key => "enter_your_consumer_key_here"
        consumer_secret => "enter_your_secret_here"
        keywords => ["cloud"]
        oauth_token => "enter_your_access_token_here"
        oauth_token_secret => "enter_your_access_token_secret_here"
    }

配置 Filebeat 将日志行发送到 Logstashedit

正如您之前在 配置 Filebeat 将日志行发送到 Logstash 中了解到的那样,Filebeat 客户端是一个轻量级、资源友好的工具,它从服务器上的文件收集日志并将这些日志转发到您的 Logstash 实例进行处理。

安装 Filebeat 后,您需要对其进行配置。打开位于 Filebeat 安装目录中的 filebeat.yml 文件,并将内容替换为以下行。确保 paths 指向您的 syslog

filebeat.inputs:
- type: log
  paths:
    - /var/log/*.log 
  fields:
    type: syslog 
output.logstash:
  hosts: ["localhost:5044"]

Filebeat 处理的文件或文件的绝对路径。

向事件添加一个名为 type 的字段,其值为 syslog

保存您的更改。

为了保持配置简单,您不会像在现实世界场景中那样指定 TLS/SSL 设置。

通过将以下行添加到 second-pipeline.conf 文件的 input 部分,配置您的 Logstash 实例以使用 Filebeat 输入插件

    beats {
        port => "5044"
    }

将 Logstash 数据写入文件edit

您可以使用 file 输出插件配置您的 Logstash 管道以将数据直接写入文件。

通过将以下行添加到 second-pipeline.conf 文件的 output 部分,配置您的 Logstash 实例以使用 file 输出插件

    file {
        path => "/path/to/target/file"
    }

写入多个 Elasticsearch 节点edit

写入多个 Elasticsearch 节点可以减轻给定 Elasticsearch 节点的资源需求,并在特定节点不可用时提供冗余的集群入口点。

要配置您的 Logstash 实例以写入多个 Elasticsearch 节点,请编辑 second-pipeline.conf 文件的 output 部分以读取

output {
    elasticsearch {
        hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
    }
}

在 host 行中使用 Elasticsearch 集群中三个非主节点的 IP 地址。当 hosts 参数列出多个 IP 地址时,Logstash 会在地址列表中负载均衡请求。还要注意,Elasticsearch 的默认端口是 9200,可以在上面的配置中省略。

测试管道edit

此时,您的 second-pipeline.conf 文件如下所示

input {
    twitter {
        consumer_key => "enter_your_consumer_key_here"
        consumer_secret => "enter_your_secret_here"
        keywords => ["cloud"]
        oauth_token => "enter_your_access_token_here"
        oauth_token_secret => "enter_your_access_token_secret_here"
    }
    beats {
        port => "5044"
    }
}
output {
    elasticsearch {
        hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
    }
    file {
        path => "/path/to/target/file"
    }
}

Logstash 正在从您配置的 Twitter 提要中获取数据,从 Filebeat 接收数据,并将这些信息索引到 Elasticsearch 集群中的三个节点,以及写入文件。

在数据源机器上,使用以下命令运行 Filebeat

sudo ./filebeat -e -c filebeat.yml -d "publish"

Filebeat 将尝试连接到端口 5044。在 Logstash 使用活动 Beats 插件启动之前,该端口上不会有任何响应,因此您看到的任何关于无法连接到该端口的消息对于现在来说都是正常的。

要验证您的配置,请运行以下命令

bin/logstash -f second-pipeline.conf --config.test_and_exit

--config.test_and_exit 选项将解析您的配置文件并报告任何错误。当配置文件通过配置测试时,使用以下命令启动 Logstash

bin/logstash -f second-pipeline.conf

使用 grep 实用程序在目标文件中搜索以验证信息是否存在

grep syslog /path/to/target/file

运行 Elasticsearch 查询以在 Elasticsearch 集群中查找相同的信息

curl -XGET 'localhost:9200/logstash-$DATE/_search?pretty&q=fields.type:syslog'

将 $DATE 替换为当前日期,格式为 YYYY.MM.DD。

要查看来自 Twitter 提要的数据,请尝试以下查询

curl -XGET 'https://127.0.0.1:9200/logstash-$DATE/_search?pretty&q=client:iphone'

同样,请记住将 $DATE 替换为当前日期,格式为 YYYY.MM.DD。