简介
分布式系统中高熵日志的普遍存在,大大增加了 PII(个人身份信息)泄露到日志中的风险,这可能会导致安全和合规性问题。这篇分为两部分的博客深入探讨了使用 Elastic Stack 识别和管理此问题的关键任务。我们将探讨使用 NLP(自然语言处理)和模式匹配来检测、评估以及在可行的情况下编辑正在提取到 Elasticsearch 中的日志中的 PII。
在这篇博客的第 1 部分中,我们将介绍以下内容:
- 回顾我们可用于管理日志中 PII 的技术和工具
- 了解 NLP / NER 在 PII 检测中的作用
- 构建可组合的处理管道来检测和评估 PII
- 采样日志并通过 NER 模型运行
- 评估 NER 模型的结果
在这篇博客的第 2 部分中,我们将介绍以下内容:
- 使用 NER 和编辑处理器编辑 PII
- 应用字段级安全性来控制对未编辑数据的访问
- 增强仪表板和警报
- 生产注意事项和扩展
- 如何在传入或历史数据上运行这些进程
这是我们在 2 篇博客中构建的整体流程
本练习的所有代码都可以在以下位置找到:https://github.com/bvader/elastic-pii。
工具和技术
在本练习中,我们将使用四种通用功能。
- 命名实体识别检测 (NER)
- 模式匹配检测
- 日志采样
- 作为可组合处理的提取管道
命名实体识别 (NER) 检测
NER 是自然语言处理 (NLP) 的一个子任务,它涉及将非结构化文本中的命名实体识别并分类为预定义的类别,例如:
- 人物:个人姓名,包括名人、政治家和历史人物。
- 组织:公司、机构和组织的名称。
- 地点:地理位置,包括城市、国家和地标。
- 事件:事件名称,包括会议、会面和节日。
对于我们的 PII 用例,我们将选择可以从 Hugging Face 下载并作为训练模型加载到 Elasticsearch 中的基本 BERT NER 模型 bert-base-NER。
重要提示:NER/NLP 模型是 CPU 密集型的,并且大规模运行成本高昂;因此,我们将采用一种采样技术来了解日志中的风险,而无需将完整的日志量发送到 NER 模型中。我们将在博客的第 2 部分中讨论 NER 模型的性能和扩展。
模式匹配检测
除了使用 NER 之外,正则表达式模式匹配是根据常见模式检测和编辑 PII 的强大工具。Elasticsearch 编辑处理器专为此用例而构建。
日志采样
考虑到 NER 的性能影响,以及我们可能会将大量日志提取到 Elasticsearch 中这一事实,对传入日志进行采样是有意义的。我们将构建一个简单的日志采样器来实现这一点。
作为可组合处理的提取管道
我们将创建多个管道,每个管道专注于特定功能,并创建一个主提取管道来协调整个过程。
构建处理流程
日志采样 + 可组合提取管道
我们要做的第一件事是设置一个采样器来采样我们的日志。此提取管道只需采用 0(无日志)到 10000(所有日志)之间的采样率,该采样率允许低至约 0.01% 的采样率,并使用以下标记已采样的日志:
该命令应从 Kibana -> 开发工具运行
可以在此处找到以下三个代码部分的代码。
日志采样器管道代码 - 单击以打开/关闭
# logs-sampler pipeline - part 1
DELETE _ingest/pipeline/logs-sampler
PUT _ingest/pipeline/logs-sampler
{
"processors": [
{
"set": {
"description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
"if": "ctx.sample.sample_rate == null",
"field": "sample.sample_rate",
"value": 10000
}
},
{
"set": {
"description": "Determine if keeping unsampled docs",
"if": "ctx.sample.keep_unsampled == null",
"field": "sample.keep_unsampled",
"value": true
}
},
{
"set": {
"field": "sample.sampled",
"value": false
}
},
{
"script": {
"source": """ Random r = new Random();
ctx.sample.random = r.nextInt(params.max); """,
"params": {
"max": 10000
}
}
},
{
"set": {
"if": "ctx.sample.random <= ctx.sample.sample_rate",
"field": "sample.sampled",
"value": true
}
},
{
"drop": {
"description": "Drop unsampled document if applicable",
"if": "ctx.sample.keep_unsampled == false && ctx.sample.sampled == false"
}
}
]
}
现在,让我们测试一下日志采样器。我们将构建可组合管道的第一部分。我们将把日志发送到 logs-generic-default 数据流。考虑到这一点,我们将创建
接下来,我们将创建
process-pii 管道代码 - 单击以打开/关闭
# Process PII pipeline - part 1
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
"processors": [
{
"set": {
"description": "Set true if enabling sampling, otherwise false",
"field": "sample.enabled",
"value": true
}
},
{
"set": {
"description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
"field": "sample.sample_rate",
"value": 1000
}
},
{
"set": {
"description": "Set to false if you want to drop unsampled data, handy for reindexing hostorical data",
"field": "sample.keep_unsampled",
"value": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == true",
"name": "logs-sampler",
"ignore_failure": true
}
}
]
}
最后,我们创建日志
logs@custom 管道代码 - 单击以打开/关闭
# logs@custom pipeline - part 1
DELETE _ingest/pipeline/logs@custom
PUT _ingest/pipeline/logs@custom
{
"processors": [
{
"set": {
"field": "pipelinetoplevel",
"value": "logs@custom"
}
},
{
"set": {
"field": "pipelinetoplevelinfo",
"value": "{{{data_stream.dataset}}}"
}
},
{
"pipeline": {
"description" : "Call the process_pii pipeline on the correct dataset",
"if": "ctx?.data_stream?.dataset == 'pii'",
"name": "process-pii"
}
}
]
}
现在,让我们测试一下采样是否正在工作。
按照此处描述的方式加载数据数据加载附录。让我们先使用示例数据,稍后在本博客结尾处我们将讨论如何使用传入或历史日志进行测试。
如果您查看可观察性 -> 日志 -> 日志浏览器,并使用 KQL 过滤器
此时,我们有一个“采样”日志的可组合提取管道。作为奖励,您也可以将此日志采样器用于您的任何其他用例。
加载、配置和执行 NER 管道
加载 NER 模型
您将需要一个机器学习节点来运行 NER 模型。在本练习中,我们正在使用在 AWS 上弹性云托管部署,其采用的是CPU 优化 (ARM)架构。NER 推理将在机器学习 AWS c5d 节点上运行。将来会有 GPU 选项,但今天我们将坚持使用 CPU 架构。
本练习将使用单个具有 8 GB RAM 和 4.2 个 vCPU(最高可达 8.4 个 vCPU)的 c5d
有关上载、配置和部署模型的完整说明,请参阅有关如何将 NLP 训练模型导入 Elasticsearch 的官方文档。
获取模型的最快方法是使用 Eland Docker 方法。
以下命令会将模型加载到 Elasticsearch 中,但不会启动它。我们将在下一步执行此操作。
docker run -it --rm --network host docker.elastic.co/eland/eland \
eland_import_hub_model \
--url https://mydeployment.es.us-west-1.aws.found.io:443/ \
-u elastic -p password \
--hub-model-id dslim/bert-base-NER --task-type ner
部署和启动 NER 模型
一般来说,为了提高提取性能,请通过向部署添加更多分配来提高吞吐量。为了提高搜索速度,请增加每个分配的线程数。
为了扩展提取,我们将专注于扩展已部署模型的分配。有关此主题的更多信息,请此处提供。分配数量必须小于每个节点可用的已分配处理器(核心,而不是 vCPU)。
要部署和启动 NER 模型。我们将使用启动训练模型部署 API 来执行此操作
我们将配置以下内容
- 4 个分配以允许更多并行提取
- 每个分配 1 个线程
- 0 个 Byes 缓存,因为我们预计缓存命中率较低
- 8192 队列
# Start the model with 4 Allocators x 1 Thread, no cache, and 8192 queue
POST _ml/trained_models/dslim__bert-base-ner/deployment/_start?cache_size=0b&number_of_allocations=4&threads_per_allocation=1&queue_capacity=8192
您应该会收到类似这样的响应。
{
"assignment": {
"task_parameters": {
"model_id": "dslim__bert-base-ner",
"deployment_id": "dslim__bert-base-ner",
"model_bytes": 430974836,
"threads_per_allocation": 1,
"number_of_allocations": 4,
"queue_capacity": 8192,
"cache_size": "0",
"priority": "normal",
"per_deployment_memory_bytes": 430914596,
"per_allocation_memory_bytes": 629366952
},
...
"assignment_state": "started",
"start_time": "2024-09-23T21:39:18.476066615Z",
"max_assigned_allocations": 4
}
}
NER 模型已部署并启动,可以使用。
以下提取管道通过 inference 处理器实现 NER 模型。
这里有大量的代码,但现在只有两项是相关的。其余代码是条件逻辑,用于驱动一些将在未来更仔细地研究的其他特定行为。
-
推理处理器通过 ID 调用先前加载的 NER 模型,并将待分析的文本传递给它。在本例中,待分析的文本是消息字段,也就是我们要传递给 NER 模型进行 PII 分析的 text_field。
-
脚本处理器循环遍历消息字段,并使用 NER 模型生成的数据,将识别出的 PII 替换为已编辑的占位符。这看起来比实际情况复杂,因为它只是简单地循环遍历 ML 预测数组,并在消息字符串中使用常量替换它们,并将结果存储在新字段中。
redact.message。我们将在接下来的步骤中更仔细地研究这一点。
NER PII 管道
logs-ner-pii-processor 管道代码 - 单击以打开/关闭
# NER Pipeline
DELETE _ingest/pipeline/logs-ner-pii-processor
PUT _ingest/pipeline/logs-ner-pii-processor
{
"processors": [
{
"set": {
"description": "Set to true to actually redact, false will run processors but leave original",
"field": "redact.enable",
"value": true
}
},
{
"set": {
"description": "Set to true to keep ml results for debugging",
"field": "redact.ner.keep_result",
"value": true
}
},
{
"set": {
"description": "Set to PER, LOC, ORG to skip, or NONE to not drop any replacement",
"field": "redact.ner.skip_entity",
"value": "NONE"
}
},
{
"set": {
"description": "Set to PER, LOC, ORG to skip, or NONE to not drop any replacement",
"field": "redact.ner.minimum_score",
"value": 0
}
},
{
"set": {
"if": "ctx?.redact?.message == null",
"field": "redact.message",
"copy_from": "message"
}
},
{
"set": {
"field": "redact.ner.successful",
"value": true
}
},
{
"set": {
"field": "redact.ner.found",
"value": false
}
},
{
"inference": {
"model_id": "dslim__bert-base-ner",
"field_map": {
"message": "text_field"
},
"on_failure": [
{
"set": {
"description": "Set 'error.message'",
"field": "failure",
"value": "REDACT_NER_FAILED"
}
},
{
"set": {
"field": "redact.ner.successful",
"value": false
}
}
]
}
},
{
"script": {
"if": "ctx.failure_ner != 'REDACT_NER_FAILED'",
"lang": "painless",
"source": """String msg = ctx['message'];
for (item in ctx['ml']['inference']['entities']) {
if ((item['class_name'] != ctx.redact.ner.skip_entity) &&
(item['class_probability'] >= ctx.redact.ner.minimum_score)) {
msg = msg.replace(item['entity'], '<' +
'REDACTNER-'+ item['class_name'] + '_NER>')
}
}
ctx.redact.message = msg""",
"on_failure": [
{
"set": {
"description": "Set 'error.message'",
"field": "failure",
"value": "REDACT_REPLACEMENT_SCRIPT_FAILED",
"override": false
}
},
{
"set": {
"field": "redact.successful",
"value": false
}
}
]
}
},
{
"set": {
"if": "ctx?.ml?.inference?.entities.size() > 0",
"field": "redact.ner.found",
"value": true,
"ignore_failure": true
}
},
{
"set": {
"if": "ctx?.redact?.pii?.found == null",
"field": "redact.pii.found",
"value": false
}
},
{
"set": {
"if": "ctx?.redact?.ner?.found == true",
"field": "redact.pii.found",
"value": true
}
},
{
"remove": {
"if": "ctx.redact.ner.keep_result != true",
"field": [
"ml"
],
"ignore_missing": true,
"ignore_failure": true
}
}
],
"on_failure": [
{
"set": {
"field": "failure",
"value": "GENERAL_FAILURE",
"override": false
}
}
]
}
更新后的 PII 处理器管道,现在调用 NER 管道
process-pii 管道代码 - 单击以打开/关闭
# Updated Process PII pipeline that now call the NER pipeline
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
"processors": [
{
"set": {
"description": "Set true if enabling sampling, otherwise false",
"field": "sample.enabled",
"value": true
}
},
{
"set": {
"description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
"field": "sample.sample_rate",
"value": 1000
}
},
{
"set": {
"description": "Set to false if you want to drop unsampled data, handy for reindexing hostorical data",
"field": "sample.keep_unsampled",
"value": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == true",
"name": "logs-sampler",
"ignore_failure": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-ner-pii-processor"
}
}
]
}
现在按照重新加载日志中的描述重新加载数据。
结果
让我们看看启用 NER 处理后的结果。在带有 KQL 查询栏的日志浏览器中,执行以下查询
日志浏览器应如下所示,打开顶部的消息以查看详细信息。
NER 模型结果
让我们仔细看看这些字段的含义。
字段
示例值
描述: NER 模型识别出的命名实体类别的数组。
字段
示例值
描述: class_probability 是介于 0 和 1 之间的值,表示给定数据点属于特定类别的可能性。数值越高,数据点属于命名类别的可能性就越高。这很重要,因为在下一篇博客中,我们可以决定要使用哪个阈值来发出警报和进行编辑。' 你可以在此示例中看到它识别了一个
字段
示例值
描述: 识别出的实体数组,其位置与
字段
示例值
[2024-09-23T14:32:14.608207-07:00Z] log.level=INFO: Payment successful for order #4594 (user: [Paul Buck](PER&Paul+Buck), [email protected]). Phone: 726-632-0527x520, Address: 3713 [Steven Glens](PER&Steven+Glens), [South Amyborough](LOC&South+Amyborough), [ME](ORG&ME) 93580, Ordered from: [Costco](ORG&Costco)
描述: 模型的预测值。
PII 评估仪表板
让我们快速浏览一下为评估 PII 数据而构建的仪表板。
https://github.com/bvader/elastic-pii/blob/main/elastic/blog-part-1/pii-dashboard-part-1.ndjson
有关 Kibana 已保存对象的更完整说明,请参见此处。
加载仪表板后,导航到它并选择正确的时间范围,您应该会看到如下所示的内容。它显示了诸如采样率、带有 NER 的日志百分比、NER 分数趋势等指标。我们将在本博客的第二部分中研究评估和操作。
总结和下一步
- 在本博客的第一部分中,我们完成了以下工作。
- 回顾了我们可用于 PII 检测和评估的技术和工具
- 回顾了 NLP / NER 在 PII 检测和评估中的作用
- 构建了必要的组合摄取管道,以对日志进行采样并通过 NER 模型运行它们
回顾了 NER 结果,并准备好进入第二篇博客
- 在即将发布的本博客的第二部分中,我们将介绍以下内容
- 应用字段级安全性来控制对未编辑数据的访问
- 增强仪表板和警报
- 生产注意事项和扩展
- 如何在传入或历史数据上运行这些进程
使用 NER 和 redact 处理器编辑 PII
数据加载附录
代码
$ git clone https://github.com/bvader/elastic-pii.git
https://github.com/bvader/elastic-pii
$ cd elastic-pii
$ cd python
$ python -m venv .env
$ source .env/bin/activate
$ pip install elasticsearch
$ pip install Faker
创建和加载示例数据集
$ python generate_random_logs.py
运行日志生成器
如果您不更改任何参数,这将创建一个名为 pii.log 的文件中包含 10000 个随机日志,其中包含带有和不带有 PII 的混合日志。
# The Elastic User
ELASTIC_USER = "elastic"
# Password for the 'elastic' user generated by Elasticsearch
ELASTIC_PASSWORD = "askdjfhasldfkjhasdf"
# Found in the 'Manage Deployment' page
ELASTIC_CLOUD_ID = "deployment:sadfjhasfdlkjsdhf3VuZC5pbzo0NDMkYjA0NmQ0YjFiYzg5NDM3ZDgxM2YxM2RhZjQ3OGE3MzIkZGJmNTE0OGEwODEzNGEwN2E3M2YwYjcyZjljYTliZWQ="
并设置以下内容
$ python load_logs.py
然后运行以下命令。
重新加载日志
$ python load_logs.py