简介
分布式系统中高熵日志的普遍存在显着增加了 PII(个人身份信息)泄露到我们日志中的风险,这可能导致安全和合规问题。这篇分为两部分的博客深入探讨了使用 Elastic Stack 识别和管理此问题的关键任务。我们将探索使用 NLP(自然语言处理)和模式匹配来检测、评估以及在可行的情况下编辑摄取到 Elasticsearch 中的日志中的 PII。
在这篇博客的第一部分中,我们介绍了以下内容
- 回顾我们可用于管理日志中 PII 的技术和工具
- 了解 NLP / NER 在 PII 检测中的作用
- 构建可组合的处理管道以检测和评估 PII
- 对日志进行采样并通过 NER 模型运行它们
- 评估 NER 模型的结果
在本博客的第二部分中,我们将介绍以下内容
- 应用编辑正则表达式模式处理器并评估结果
- 使用 ESQL 创建警报
- 应用字段级安全性来控制对未编辑数据的访问
- 生产注意事项和扩展
- 如何在传入或历史数据上运行这些进程
提醒一下我们在 2 篇博客中构建的整体流程
本练习的所有代码都可以在以下位置找到:https://github.com/bvader/elastic-pii。
第一部分先决条件
本博客延续了本博客的第一部分的内容。您必须安装并运行第一部分的 NER 模型、摄取管道和仪表板。
- 加载并配置了 NER 模型
- 安装了博客第一部分中的所有可组合摄取管道
- 安装了仪表板
您可以在此处访问博客 1 的完整解决方案。不要忘记加载仪表板,此处可以找到。
应用编辑处理器
处理器。该Elasticsearch 打包了许多有用的预定义模式,这些模式可以通过
在下面的代码中,我们利用了一些预定义的模式以及构建了几个自定义模式。
"patterns": [
"%{EMAILADDRESS:EMAIL_REGEX}", << Predefined
"%{IP:IP_ADDRESS_REGEX}", << Predefined
"%{CREDIT_CARD:CREDIT_CARD_REGEX}", << Custom
"%{SSN:SSN_REGEX}", << Custom
"%{PHONE:PHONE_REGEX}" << Custom
]
我们还将 PII 替换为易于识别的模式,可用于评估。
此外,需要注意的是,由于编辑处理器是一个简单的正则表达式查找和替换,因此它可用于许多“秘密”模式,而不仅仅是 PII。有许多关于正则表达式和秘密模式的参考资料,因此您可以重复使用此功能来检测日志中的秘密。
此处可以找到以下两部分代码的代码。
编辑处理器管道代码 - 单击以打开/关闭
# Add the PII redact processor pipeline
DELETE _ingest/pipeline/logs-pii-redact-processor
PUT _ingest/pipeline/logs-pii-redact-processor
{
"processors": [
{
"set": {
"field": "redact.proc.successful",
"value": true
}
},
{
"set": {
"field": "redact.proc.found",
"value": false
}
},
{
"set": {
"if": "ctx?.redact?.message == null",
"field": "redact.message",
"copy_from": "message"
}
},
{
"redact": {
"field": "redact.message",
"prefix": "<REDACTPROC-",
"suffix": ">",
"patterns": [
"%{EMAILADDRESS:EMAIL_REGEX}",
"%{IP:IP_ADDRESS_REGEX}",
"%{CREDIT_CARD:CREDIT_CARD_REGEX}",
"%{SSN:SSN_REGEX}",
"%{PHONE:PHONE_REGEX}"
],
"pattern_definitions": {
"CREDIT_CARD": """\d{4}[ -]\d{4}[ -]\d{4}[ -]\d{4}""",
"SSN": """\d{3}-\d{2}-\d{4}""",
"PHONE": """(\+\d{1,2}\s?)?1?\-?\.?\s?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}"""
},
"on_failure": [
{
"set": {
"description": "Set 'error.message'",
"field": "failure",
"value": "REDACT_PROCESSOR_FAILED",
"override": false
}
},
{
"set": {
"field": "redact.proc.successful",
"value": false
}
}
]
}
},
{
"set": {
"if": "ctx?.redact?.message.contains('REDACTPROC')",
"field": "redact.proc.found",
"value": true
}
},
{
"set": {
"if": "ctx?.redact?.pii?.found == null",
"field": "redact.pii.found",
"value": false
}
},
{
"set": {
"if": "ctx?.redact?.proc?.found == true",
"field": "redact.pii.found",
"value": true
}
}
],
"on_failure": [
{
"set": {
"field": "failure",
"value": "GENERAL_FAILURE",
"override": false
}
}
]
}
现在,我们将添加
编辑处理器管道代码 - 单击以打开/关闭
# Updated Process PII pipeline that now call the NER and Redact Processor pipeline
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
"processors": [
{
"set": {
"description": "Set true if enabling sampling, otherwise false",
"field": "sample.enabled",
"value": true
}
},
{
"set": {
"description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
"field": "sample.sample_rate",
"value": 1000
}
},
{
"set": {
"description": "Set to false if you want to drop unsampled data, handy for reindexing hostorical data",
"field": "sample.keep_unsampled",
"value": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == true",
"name": "logs-sampler",
"ignore_failure": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-ner-pii-processor"
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-pii-redact-processor"
}
}
]
}
按照重新加载日志中的说明重新加载数据。如果您第一次没有生成日志,请按照数据加载附录中的说明进行操作
转到“发现”并在 KQL 栏中输入以下内容
如果您还没有加载博客第一部分的仪表板,请加载它,它可以在此处找到,使用 Kibana -> Stack Management -> Saved Objects -> Import。
现在它应该看起来像这样。请注意,仪表板的 REGEX 部分现在处于活动状态。
检查点
此时,我们具有以下功能
- 能够对传入日志进行采样并应用此 PII 编辑
- 使用 NER/NLP 和模式匹配检测和评估 PII
- 评估 PII 检测的数量、类型和质量
如果您只是运行所有这些操作一次以查看其工作原理,那么这是一个很好的停止点,但是我们还需要采取一些步骤才能使其在生产系统中发挥作用。
- 清理工作和未编辑的数据
- 更新仪表板以使用清理后的数据
- 应用基于角色的访问控制来保护原始未编辑的数据
- 创建警报
- 生产和扩展注意事项
- 如何在传入或历史数据上运行这些进程
应用于生产系统
清理工作数据并更新仪表板
现在我们将清理代码添加到整体
简而言之,我们设置一个标志
注意:当然,如果您想完全删除未编辑的数据,可以更改此行为。在本练习中,我们将保留它并保护它。
此外,我们设置
这些字段允许对您决定保留和分析的数据进行大量控制。
此处可以找到以下两部分代码的代码。
编辑处理器管道代码 - 单击以打开/关闭
# Updated Process PII pipeline that now call the NER and Redact Processor pipeline and cleans up
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
"processors": [
{
"set": {
"description": "Set true if enabling sampling, otherwise false",
"field": "sample.enabled",
"value": true
}
},
{
"set": {
"description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
"field": "sample.sample_rate",
"value": 1000
}
},
{
"set": {
"description": "Set to false if you want to drop unsampled data, handy for reindexing hostorical data",
"field": "sample.keep_unsampled",
"value": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == true",
"name": "logs-sampler",
"ignore_failure": true
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-ner-pii-processor"
}
},
{
"pipeline": {
"if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
"name": "logs-pii-redact-processor"
}
},
{
"set": {
"description": "Set to true to actually redact, false will run processors but leave original",
"field": "redact.enable",
"value": true
}
},
{
"rename": {
"if": "ctx?.redact?.pii?.found == true && ctx?.redact?.enable == true",
"field": "message",
"target_field": "raw.message"
}
},
{
"rename": {
"if": "ctx?.redact?.pii?.found == true && ctx?.redact?.enable == true",
"field": "redact.message",
"target_field": "message"
}
},
{
"set": {
"description": "Set to true to actually to clean up working data",
"field": "redact.cleanup",
"value": true
}
},
{
"remove": {
"if": "ctx?.redact?.cleanup == true",
"field": [
"ml"
],
"ignore_failure": true
}
}
]
}
按照重新加载日志中的说明重新加载数据。
转到“发现”并在 KQL 栏中输入以下内容
您应该会看到类似这样的内容
我们拥有保护 PII 和发出警报所需的一切。
加载适用于清理后数据的新仪表板
要加载仪表板,请转到 Kibana -> Stack Management -> Saved Objects 并导入
新仪表板应如下所示。注意:它在幕后使用不同的字段,因为我们已经清理了底层数据。
您应该会看到类似这样的内容
应用基于角色的访问控制来保护原始未编辑的数据
Elasticsearch 原生支持基于角色的访问控制,包括字段级和文档级访问控制;它显着降低了保护应用程序所需的操作和维护复杂性。
我们将创建一个不允许访问
注意:由于在本练习中我们只对 10% 的数据进行了采样,因此未采样的
此处可以找到以下部分代码的代码。
RBAC protect-pii 角色和用户代码 - 单击以打开/关闭
# Create role with no access to the raw.message field
GET _security/role/protect-pii
DELETE _security/role/protect-pii
PUT _security/role/protect-pii
{
"cluster": [],
"indices": [
{
"names": [
"logs-*"
],
"privileges": [
"read",
"view_index_metadata"
],
"field_security": {
"grant": [
"*"
],
"except": [
"raw.message"
]
},
"allow_restricted_indices": false
}
],
"applications": [
{
"application": "kibana-.kibana",
"privileges": [
"all"
],
"resources": [
"*"
]
}
],
"run_as": [],
"metadata": {},
"transient_metadata": {
"enabled": true
}
}
# Create user stephen with protect-pii role
GET _security/user/stephen
DELETE /_security/user/stephen
POST /_security/user/stephen
{
"password" : "mypassword",
"roles" : [ "protect-pii" ],
"full_name" : "Stephen Brown"
}
现在使用新用户在单独的窗口中登录
您应该会看到类似这样的内容
当检测到 PII 时创建警报
现在,通过管道的处理,当检测到 PII 时创建警报非常容易。 如果需要,请详细查看 Kibana 中的警报
注意:如果需要,重新加载数据以获取最新数据。
首先,我们将在 Discover 中创建一个简单的 ES|QL 查询。
FROM logs-pii-default
| WHERE redact.pii.found == true
| STATS pii_count = count(*)
| WHERE pii_count > 0
当您运行此代码时,您应该看到类似这样的内容。
现在单击“警报”菜单并选择
选择时间字段:@timestamp 设置时间窗口:5 分钟
假设您最近加载了数据,当您运行 测试 时,它应该执行类似以下的操作
pii_count
当警报处于活动状态时添加一个操作。
对于每个警报
Elasticsearch query rule {{rule.name}} is active:
- PII Found: true
- PII Count: {{#context.hits}} {{_source.pii_count}}{{/context.hits}}
- Conditions Met: {{context.conditions}} over {{rule.params.timeWindowSize}}{{rule.params.timeWindowUnit}}
- Timestamp: {{context.date}}
- Link: {{context.link}}
为警报恢复时添加一个操作。
对于每个警报
Elasticsearch query rule {{rule.name}} is Recovered:
- PII Found: false
- Conditions Not Met: {{context.conditions}} over {{rule.params.timeWindowSize}}{{rule.params.timeWindowUnit}}
- Timestamp: {{context.date}}
- Link: {{context.link}}
当所有设置完成后,它应该看起来像这样,然后
如果您有最新的数据,您应该会收到如下所示的活动警报。 我将我的警报发送到了 Slack。
Elasticsearch query rule pii-found-esql is active:
- PII Found: true
- PII Count: 374
- Conditions Met: Query matched documents over 5m
- Timestamp: 2024-10-15T02:44:52.795Z
- Link: https://mydeployment123.aws.found.io:9243/app/management/insightsAndAlerting/triggersActions/rule/7d6faecf-964e-46da-aaba-8a2f89f33989
然后,如果您等待,您将收到如下所示的“已恢复”警报。
Elasticsearch query rule pii-found-esql is Recovered:
- PII Found: false
- Conditions Not Met: Query did NOT match documents over 5m
- Timestamp: 2024-10-15T02:49:04.815Z
- Link: https://mydeployment123.kb.us-west-1.aws.found.io:9243/app/management/insightsAndAlerting/triggersActions/rule/7d6faecf-964e-46da-aaba-8a2f89f33989
生产环境扩展
NER 扩展
正如我们在本博客的 第一部分 中提到的,NER/NLP 模型是 CPU 密集型的,并且大规模运行成本高昂;因此,我们采用了一种采样技术来了解日志中的风险,而无需将全部日志量通过 NER 模型发送。
请查看博客第一部分中有关 NER 模型的设置和配置。
对于我们的 PII 用例,我们选择了基础 BERT NER 模型 bert-base-NER。
为了扩展摄取,我们将重点关注扩展已部署模型的分配。 有关此主题的更多信息,请参见此处。 分配的数量必须小于每个节点可用的已分配处理器(核心,而不是 vCPU)。
以下指标与博客第一部分中的模型和配置有关。
- 4 个分配,以允许更多并行摄取
- 每个分配 1 个线程
- 0 字节缓存,因为我们预计缓存命中率较低 注意 如果有很多重复日志,缓存可能会有所帮助,但是对于时间戳和其他变化,缓存将不会有帮助,甚至会减慢处理速度
- 8192 队列
GET _ml/trained_models/dslim__bert-base-ner/_stats
.....
"node": {
"0m4tq7tMRC2H5p5eeZoQig": {
.....
"attributes": {
"xpack.installed": "true",
"region": "us-west-1",
"ml.allocated_processors": "5", << HERE
.....
},
"inference_count": 5040,
"average_inference_time_ms": 138.44285714285715, << HERE
"average_inference_time_ms_excluding_cache_hits": 138.44285714285715,
"inference_cache_hit_count": 0,
.....
"threads_per_allocation": 1,
"number_of_allocations": 4, <<< HERE
"peak_throughput_per_minute": 1550,
"throughput_last_minute": 1373,
"average_inference_time_ms_last_minute": 137.55280407865988,
"inference_cache_hit_count_last_minute": 0
}
]
}
}
以上有 3 个关键信息
-
“ml.allocated_processors”:“5”可用的物理内核/处理器数量
-
“number_of_allocations”: 4分配的数量,每个物理内核最多 1 个。 注意:我们可以使用 5 个分配,但在此练习中我们仅分配了 4 个
-
“average_inference_time_ms”: 138.44285714285715每个文档的平均推理时间。
对于每个分配(每个物理内核 1 个分配),每分钟推理次数 (IPM) 的吞吐量计算非常简单,因为推理使用单个内核和单个线程。
然后,每个分配的每分钟推理次数 (IPM) 简单地为
然后与每分钟总推理次数一致
假设我们想做 10,000 个 IPM,我需要多少个分配(核心)?
或者,也许日志以 5000 EPS 的速度传入,您想进行 1% 的采样。
然后
想要更快!事实证明,存在更轻量级的 NER 模型 distilbert-NER 模型,该模型速度更快,但缺点是准确性稍差。
通过此模型运行日志会导致推理时间快近两倍!
以下是一些快速计算
假设我们想做 25,000 个 IPM,我需要多少个分配(核心)?
现在,您可以应用此计算来确定正确的采样和 NER 扩展,以支持您的日志记录用例。
编辑处理器扩展
简而言之,
评估传入日志
如果要测试数据流中的传入日志数据。 您需要做的就是更改
注意:请确保您已经考虑了 NER 和 Redact 处理器的适当扩展,如上面的 生产环境扩展中所述
{
"pipeline": {
"description" : "Call the process_pii pipeline on the correct dataset",
"if": "ctx?.data_stream?.dataset == 'pii'", <<< HERE
"name": "process-pii"
}
}
因此,例如,如果您的日志进入
"if": "ctx?.data_stream?.dataset == 'mycustomapp'",
评估历史数据
如果您有历史(已摄取)数据流或索引,则可以使用
注意:请确保您已经考虑了 NER 和 Redact 处理器的适当扩展,如上面的 生产环境扩展中所述
还有一些额外的步骤:代码可以在这里找到。
- 首先,我们可以将参数设置为仅保留采样数据,因为没有理由复制所有未采样的数据。 在process-pii管道中,有一个设置sample.keep_unsampled,我们可以将其设置为false,然后将仅保留采样数据
{
"set": {
"description": "Set to false if you want to drop unsampled data, handy for reindexing hostorical data",
"field": "sample.keep_unsampled",
"value": false <<< SET TO false
}
},
- 其次,我们将创建一个管道,该管道会将数据重新路由到正确的数据流,以运行所有 PII 评估/检测管道。 它还设置正确的数据集和命名空间
DELETE _ingest/pipeline/sendtopii
PUT _ingest/pipeline/sendtopii
{
"processors": [
{
"set": {
"field": "data_stream.dataset",
"value": "pii"
}
},
{
"set": {
"field": "data_stream.namespace",
"value": "default"
}
},
{
"reroute" :
{
"dataset" : "{{data_stream.dataset}}",
"namespace": "{{data_stream.namespace}}"
}
}
]
}
- 最后,我们可以运行_reindex以选择我们要测试/评估的数据。 建议在尝试此操作之前查看 _reindex 文档。 首先,选择要评估的源数据流,在此示例中,它是logs-generic-default日志数据流。 注意:我还添加了一个范围过滤器以选择特定的时间范围。 这里有一个我们需要的“技巧”,因为我们将数据重新路由到数据流logs-pii-default。 为此,我们只需在中将“index”: “logs-tmp-default”_reindex设置为正确的管道中将设置的数据流。 我们必须这样做,因为reroute是一个noop如果从/向同一数据流调用,则为
POST _reindex?wait_for_completion=false
{
"source": {
"index": "logs-generic-default",
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"gte": "now-1h/h",
"lt": "now"
}
}
}
]
}
}
},
"dest": {
"op_type": "create",
"index": "logs-tmp-default",
"pipeline": "sendtopii"
}
}
总结
至此,您拥有了评估、检测、分析、发出警报和保护日志中 PII 所需的工具和流程。
在 本博客的第一部分中,我们完成了以下操作。
- 回顾了我们可用于 PII 检测和评估的技术和工具
- 回顾了 NLP/NER 在 PII 检测和评估中的作用
- 构建了必要的可组合摄取管道,以对日志进行采样并通过 NER 模型运行
- 查看了 NER 结果,并准备好进入第二篇博客
在本博客的 第二部分 中,我们介绍了以下内容
- 使用 NER 和编辑处理器编辑 PII
- 应用字段级安全性来控制对未编辑数据的访问
- 增强仪表板和警报
- 生产注意事项和扩展
- 如何在传入或历史数据上运行这些进程
因此,开始工作并降低日志中的风险!
数据加载附录
代码
数据加载代码可以在这里找到
https://github.com/bvader/elastic-pii
$ git clone https://github.com/bvader/elastic-pii.git
创建和加载示例数据集
$ cd elastic-pii
$ cd python
$ python -m venv .env
$ source .env/bin/activate
$ pip install elasticsearch
$ pip install Faker
运行日志生成器
$ python generate_random_logs.py
如果您不更改任何参数,这将创建一个名为 pii.log 的文件中 10000 个随机日志,其中包含混合了包含和不包含 PII 的日志。
编辑
# The Elastic User
ELASTIC_USER = "elastic"
# Password for the 'elastic' user generated by Elasticsearch
ELASTIC_PASSWORD = "askdjfhasldfkjhasdf"
# Found in the 'Manage Deployment' page
ELASTIC_CLOUD_ID = "deployment:sadfjhasfdlkjsdhf3VuZC5pbzo0NDMkYjA0NmQ0YjFiYzg5NDM3ZDgxM2YxM2RhZjQ3OGE3MzIkZGJmNTE0OGEwODEzNGEwN2E3M2YwYjcyZjljYTliZWQ="
然后运行以下命令。
$ python load_logs.py
重新加载日志
注意要重新加载日志,您可以简单地重新运行上述命令。 在此练习期间,您可以多次运行该命令,日志将被重新加载(实际上是再次加载)。 新日志不会与之前的运行冲突,因为每次运行都会有一个唯一的
$ python load_logs.py