将多个输入和输出插件组合在一起

编辑

将多个输入和输出插件组合在一起

编辑

您需要管理的信息通常来自多个不同的来源,并且用例可能需要将数据发送到多个目的地。您的 Logstash 管道可以使用多个输入和输出插件来处理这些需求。

在本节中,您将创建一个 Logstash 管道,该管道从 Twitter 源和 Filebeat 客户端获取输入,然后将信息发送到 Elasticsearch 集群,并将信息直接写入文件。

从 Twitter 源读取

编辑

要添加 Twitter 源,您可以使用 twitter 输入插件。要配置插件,您需要以下几条信息

  • 一个消费者密钥,用于唯一标识您的 Twitter 应用程序。
  • 一个消费者密钥,用作您的 Twitter 应用程序的密码。
  • 在传入源中搜索的一个或多个关键字。示例显示使用“cloud”作为关键字,但您可以使用任何您想要的关键字。
  • 一个 oauth 令牌,用于标识使用此应用程序的 Twitter 帐户。
  • 一个 oauth 令牌密钥,用作 Twitter 帐户的密码。

访问 https://dev.twitter.com/apps 以设置 Twitter 帐户并生成您的消费者密钥和密钥,以及您的访问令牌和密钥。如果您不确定如何生成这些密钥,请参阅 twitter 输入插件的文档。

就像您之前在 使用 Logstash 解析日志 中所做的一样,创建一个配置文件(名为 second-pipeline.conf),其中包含配置管道的框架。如果需要,您可以重用您之前创建的文件,但请确保在运行 Logstash 时传入正确的配置文件名。

将以下行添加到 second-pipeline.conf 文件的 input 部分,并替换此处显示的占位符值

    twitter {
        consumer_key => "enter_your_consumer_key_here"
        consumer_secret => "enter_your_secret_here"
        keywords => ["cloud"]
        oauth_token => "enter_your_access_token_here"
        oauth_token_secret => "enter_your_access_token_secret_here"
    }

配置 Filebeat 将日志行发送到 Logstash

编辑

正如您在 配置 Filebeat 将日志行发送到 Logstash 中所了解到的,Filebeat 客户端是一个轻量级、资源友好的工具,可以从服务器上的文件中收集日志,并将这些日志转发到您的 Logstash 实例进行处理。

安装 Filebeat 后,您需要对其进行配置。打开位于 Filebeat 安装目录中的 filebeat.yml 文件,并将内容替换为以下行。确保 paths 指向您的 syslog

filebeat.inputs:
- type: log
  paths:
    - /var/log/*.log 
  fields:
    type: syslog 
output.logstash:
  hosts: ["localhost:5044"]

Filebeat 处理的文件或文件的绝对路径。

向事件添加一个名为 type 且值为 syslog 的字段。

保存您的更改。

为了保持配置简单,您不会像在真实场景中那样指定 TLS/SSL 设置。

通过将以下行添加到 second-pipeline.conf 文件的 input 部分,配置您的 Logstash 实例以使用 Filebeat 输入插件

    beats {
        port => "5044"
    }

将 Logstash 数据写入文件

编辑

您可以配置 Logstash 管道以使用 file 输出插件直接将数据写入文件。

通过将以下行添加到 second-pipeline.conf 文件的 output 部分,配置您的 Logstash 实例以使用 file 输出插件

    file {
        path => "/path/to/target/file"
    }

写入多个 Elasticsearch 节点

编辑

写入多个 Elasticsearch 节点可以减轻给定 Elasticsearch 节点的资源需求,并在特定节点不可用时提供冗余的集群入口点。

要配置您的 Logstash 实例以写入多个 Elasticsearch 节点,请编辑 second-pipeline.conf 文件的 output 部分,使其读取

output {
    elasticsearch {
        hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
    }
}

在主机行中使用 Elasticsearch 集群中三个非主节点的 IP 地址。当 hosts 参数列出多个 IP 地址时,Logstash 会在地址列表中进行负载均衡。另请注意,Elasticsearch 的默认端口是 9200,并且可以在上面的配置中省略。

测试管道
编辑

此时,您的 second-pipeline.conf 文件如下所示

input {
    twitter {
        consumer_key => "enter_your_consumer_key_here"
        consumer_secret => "enter_your_secret_here"
        keywords => ["cloud"]
        oauth_token => "enter_your_access_token_here"
        oauth_token_secret => "enter_your_access_token_secret_here"
    }
    beats {
        port => "5044"
    }
}
output {
    elasticsearch {
        hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
    }
    file {
        path => "/path/to/target/file"
    }
}

Logstash 正在使用您配置的 Twitter 源中的数据,接收来自 Filebeat 的数据,并将此信息索引到 Elasticsearch 集群中的三个节点,并写入文件。

在数据源机器上,运行带有以下命令的 Filebeat

sudo ./filebeat -e -c filebeat.yml -d "publish"

Filebeat 将尝试在端口 5044 上进行连接。在 Logstash 使用活动的 Beats 插件启动之前,该端口上不会有任何响应,因此您看到的任何关于无法在该端口上连接的消息都是正常的。

要验证您的配置,请运行以下命令

bin/logstash -f second-pipeline.conf --config.test_and_exit

--config.test_and_exit 选项解析您的配置文件并报告任何错误。当配置文件通过配置测试时,使用以下命令启动 Logstash

bin/logstash -f second-pipeline.conf

使用 grep 实用程序在目标文件中搜索以验证信息是否存在

grep syslog /path/to/target/file

运行 Elasticsearch 查询以在 Elasticsearch 集群中查找相同的信息

curl -XGET 'localhost:9200/logstash-$DATE/_search?pretty&q=fields.type:syslog'

将 $DATE 替换为当前日期,格式为 YYYY.MM.DD。

要查看来自 Twitter 源的数据,请尝试此查询

curl -XGET 'https://127.0.0.1:9200/logstash-$DATE/_search?pretty&q=client:iphone'

同样,请记住将 $DATE 替换为当前日期,格式为 YYYY.MM.DD。