整合多个输入和输出插件

编辑

整合多个输入和输出插件

编辑

您需要管理的信息通常来自多个不同的来源,并且用例可能需要将您的数据发送到多个目的地。您的 Logstash 管道可以使用多个输入和输出插件来处理这些需求。

在本节中,您将创建一个 Logstash 管道,该管道从 Twitter Feed 和 Filebeat 客户端接收输入,然后将信息发送到 Elasticsearch 集群,并将信息直接写入文件。

从 Twitter Feed 读取数据

编辑

要添加 Twitter Feed,您可以使用 twitter 输入插件。要配置此插件,您需要一些信息

  • 消费者密钥,用于唯一标识您的 Twitter 应用。
  • 消费者密钥秘钥,充当您 Twitter 应用的密码。
  • 一个或多个关键词,用于在传入的 Feed 中搜索。示例中使用“cloud”作为关键词,但您可以使用任何您想要的关键词。
  • OAuth 令牌,用于标识使用此应用的 Twitter 帐户。
  • OAuth 令牌秘钥,充当 Twitter 帐户的密码。

访问 https://dev.twitter.com/apps 设置 Twitter 帐户并生成您的消费者密钥和秘钥,以及您的访问令牌和秘钥。如果您不确定如何生成这些密钥,请参阅 twitter 输入插件的文档。

就像您之前在处理 使用 Logstash 解析日志 时所做的那样,创建一个配置文件(名为 second-pipeline.conf),其中包含配置管道的框架。如果需要,您可以重用之前创建的文件,但请确保在运行 Logstash 时传入正确的配置文件名。

将以下几行添加到 second-pipeline.conf 文件的 input 部分,用您的值替换此处显示的占位符值

    twitter {
        consumer_key => "enter_your_consumer_key_here"
        consumer_secret => "enter_your_secret_here"
        keywords => ["cloud"]
        oauth_token => "enter_your_access_token_here"
        oauth_token_secret => "enter_your_access_token_secret_here"
    }

配置 Filebeat 将日志行发送到 Logstash

编辑

正如您之前在 配置 Filebeat 将日志行发送到 Logstash 中了解到的那样,Filebeat 客户端是一个轻量级、资源友好的工具,它从服务器上的文件收集日志,并将这些日志转发到您的 Logstash 实例进行处理。

安装 Filebeat 后,您需要对其进行配置。打开位于 Filebeat 安装目录中的 filebeat.yml 文件,并将内容替换为以下几行。确保 paths 指向您的 syslog

filebeat.inputs:
- type: log
  paths:
    - /var/log/*.log 
  fields:
    type: syslog 
output.logstash:
  hosts: ["localhost:5044"]

Filebeat 处理的文件或文件的绝对路径。

向事件添加名为 type 的字段,其值为 syslog

保存您的更改。

为了简化配置,您不会像在实际场景中那样指定 TLS/SSL 设置。

通过将以下几行添加到 second-pipeline.conf 文件的 input 部分,配置您的 Logstash 实例以使用 Filebeat 输入插件

    beats {
        port => "5044"
    }

将 Logstash 数据写入文件

编辑

您可以使用 file 输出插件配置您的 Logstash 管道以直接将数据写入文件。

通过将以下几行添加到 second-pipeline.conf 文件的 output 部分,配置您的 Logstash 实例以使用 file 输出插件

    file {
        path => "/path/to/target/file"
    }

写入多个 Elasticsearch 节点

编辑

写入多个 Elasticsearch 节点可以减轻给定 Elasticsearch 节点的资源需求,并且在特定节点不可用时提供冗余的集群入口点。

要配置您的 Logstash 实例以写入多个 Elasticsearch 节点,请编辑 second-pipeline.conf 文件的 output 部分,使其读取

output {
    elasticsearch {
        hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
    }
}

在 host 行中使用 Elasticsearch 集群中三个非主节点的 IP 地址。当 hosts 参数列出多个 IP 地址时,Logstash 会在地址列表中进行负载均衡请求。另请注意,Elasticsearch 的默认端口为 9200,可以在上面的配置中省略。

测试管道
编辑

此时,您的 second-pipeline.conf 文件如下所示

input {
    twitter {
        consumer_key => "enter_your_consumer_key_here"
        consumer_secret => "enter_your_secret_here"
        keywords => ["cloud"]
        oauth_token => "enter_your_access_token_here"
        oauth_token_secret => "enter_your_access_token_secret_here"
    }
    beats {
        port => "5044"
    }
}
output {
    elasticsearch {
        hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
    }
    file {
        path => "/path/to/target/file"
    }
}

Logstash 正在使用您配置的 Twitter Feed 消费数据,从 Filebeat 接收数据,并将这些信息索引到 Elasticsearch 集群的三个节点中,并写入文件。

在数据源计算机上,使用以下命令运行 Filebeat

sudo ./filebeat -e -c filebeat.yml -d "publish"

Filebeat 将尝试在 5044 端口上连接。在 Logstash 启动并具有活动的 Beats 插件之前,该端口上不会有任何响应,因此,您看到的关于无法在该端口上连接的任何消息目前都是正常的。

要验证您的配置,请运行以下命令

bin/logstash -f second-pipeline.conf --config.test_and_exit

--config.test_and_exit 选项会解析您的配置文件并报告任何错误。当配置文件通过配置测试后,使用以下命令启动 Logstash

bin/logstash -f second-pipeline.conf

使用 grep 实用程序在目标文件中搜索以验证信息是否存在

grep syslog /path/to/target/file

运行 Elasticsearch 查询以在 Elasticsearch 集群中查找相同的信息

curl -XGET 'localhost:9200/logstash-$DATE/_search?pretty&q=fields.type:syslog'

将 $DATE 替换为当前日期,格式为 YYYY.MM.DD。

要查看来自 Twitter Feed 的数据,请尝试此查询

curl -XGET 'https://127.0.0.1:9200/logstash-$DATE/_search?pretty&q=client:iphone'

同样,请记住将 $DATE 替换为当前日期,格式为 YYYY.MM.DD。