Stephen Brown

使用 NLP 和模式匹配检测、评估和编辑日志中的 PII - 第 1 部分

如何使用 Elasticsearch 和 NLP 检测和评估日志中的 PII

26 分钟阅读
Using NLP and Pattern Matching to Detect, Assess, and Redact PII in Logs - Part 1

简介

分布式系统中高熵日志的普遍存在,大大增加了 PII(个人身份信息)泄露到日志中的风险,这可能会导致安全和合规性问题。这篇分为两部分的博客深入探讨了使用 Elastic Stack 识别和管理此问题的关键任务。我们将探讨使用 NLP(自然语言处理)和模式匹配来检测、评估以及在可行的情况下编辑正在提取到 Elasticsearch 中的日志中的 PII。

在这篇博客的第 1 部分中,我们将介绍以下内容:

  • 回顾我们可用于管理日志中 PII 的技术和工具
  • 了解 NLP / NER 在 PII 检测中的作用
  • 构建可组合的处理管道来检测和评估 PII
  • 采样日志并通过 NER 模型运行
  • 评估 NER 模型的结果

在这篇博客的第 2 部分中,我们将介绍以下内容:

  • 使用 NER 和编辑处理器编辑 PII
  • 应用字段级安全性来控制对未编辑数据的访问
  • 增强仪表板和警报
  • 生产注意事项和扩展
  • 如何在传入或历史数据上运行这些进程

这是我们在 2 篇博客中构建的整体流程

本练习的所有代码都可以在以下位置找到:https://github.com/bvader/elastic-pii

工具和技术

在本练习中,我们将使用四种通用功能。

  • 命名实体识别检测 (NER)
  • 模式匹配检测
  • 日志采样
  • 作为可组合处理的提取管道

命名实体识别 (NER) 检测

NER 是自然语言处理 (NLP) 的一个子任务,它涉及将非结构化文本中的命名实体识别并分类为预定义的类别,例如:

  • 人物:个人姓名,包括名人、政治家和历史人物。
  • 组织:公司、机构和组织的名称。
  • 地点:地理位置,包括城市、国家和地标。
  • 事件:事件名称,包括会议、会面和节日。

对于我们的 PII 用例,我们将选择可以从 Hugging Face 下载并作为训练模型加载到 Elasticsearch 中的基本 BERT NER 模型 bert-base-NER

重要提示:NER/NLP 模型是 CPU 密集型的,并且大规模运行成本高昂;因此,我们将采用一种采样技术来了解日志中的风险,而无需将完整的日志量发送到 NER 模型中。我们将在博客的第 2 部分中讨论 NER 模型的性能和扩展。

模式匹配检测

除了使用 NER 之外,正则表达式模式匹配是根据常见模式检测和编辑 PII 的强大工具。Elasticsearch 编辑处理器专为此用例而构建。

日志采样

考虑到 NER 的性能影响,以及我们可能会将大量日志提取到 Elasticsearch 中这一事实,对传入日志进行采样是有意义的。我们将构建一个简单的日志采样器来实现这一点。

作为可组合处理的提取管道

我们将创建多个管道,每个管道专注于特定功能,并创建一个主提取管道来协调整个过程。

构建处理流程

日志采样 + 可组合提取管道

我们要做的第一件事是设置一个采样器来采样我们的日志。此提取管道只需采用 0(无日志)到 10000(所有日志)之间的采样率,该采样率允许低至约 0.01% 的采样率,并使用以下标记已采样的日志:

sample.sampled: true
。对日志的进一步处理将由以下值驱动:
sample.sampled
sample.sample_rate
可以在此处设置,也可以从编排管道中“传入”。

该命令应从 Kibana -> 开发工具运行

可以在此处找到以下三个代码部分的代码。

日志采样器管道代码 - 单击以打开/关闭
# logs-sampler pipeline - part 1
DELETE _ingest/pipeline/logs-sampler
PUT _ingest/pipeline/logs-sampler
{
  "processors": [
    {
      "set": {
        "description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
        "if": "ctx.sample.sample_rate == null",
        "field": "sample.sample_rate",
        "value": 10000
      }
    },
    {
      "set": {
        "description": "Determine if keeping unsampled docs",
        "if": "ctx.sample.keep_unsampled == null",
        "field": "sample.keep_unsampled",
        "value": true
      }
    },
    {
      "set": {
        "field": "sample.sampled",
        "value": false
      }
    },
    {
      "script": {
        "source": """ Random r = new Random();
        ctx.sample.random = r.nextInt(params.max); """,
        "params": {
          "max": 10000
        }
      }
    },
    {
      "set": {
        "if": "ctx.sample.random <= ctx.sample.sample_rate",
        "field": "sample.sampled",
        "value": true
      }
    },
    {
      "drop": {
         "description": "Drop unsampled document if applicable",
        "if": "ctx.sample.keep_unsampled == false && ctx.sample.sampled == false"
      }
    }
  ]
}

现在,让我们测试一下日志采样器。我们将构建可组合管道的第一部分。我们将把日志发送到 logs-generic-default 数据流。考虑到这一点,我们将创建

logs@custom
提取管道,该管道将使用日志数据流框架自动调用以进行自定义。我们将添加一个额外的抽象级别,以便您可以将此 PII 处理应用于其他数据流。

接下来,我们将创建

process-pii
管道。这是核心处理管道,我们将在其中编排 PII 处理组件管道。在第一步中,我们将简单地应用采样逻辑。请注意,我们将采样率设置为 100,这相当于 10% 的日志。

process-pii 管道代码 - 单击以打开/关闭
# Process PII pipeline - part 1
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
  "processors": [
    {
      "set": {
        "description": "Set true if enabling sampling, otherwise false",
        "field": "sample.enabled",
        "value": true
      }
    },
    {
      "set": {
        "description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
        "field": "sample.sample_rate",
        "value": 1000
      }
    },
    {
      "set": {
        "description": "Set to false if you want to drop unsampled data, handy for reindexing hostorical data",
        "field": "sample.keep_unsampled",
        "value": true
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == true",
        "name": "logs-sampler",
        "ignore_failure": true
      }
    }
  ]
}

最后,我们创建日志

logs@custom
,它将简单地调用我们的
process-pii
管道,该管道基于正确的
data_stream.dataset

logs@custom 管道代码 - 单击以打开/关闭
# logs@custom pipeline - part 1
DELETE _ingest/pipeline/logs@custom
PUT _ingest/pipeline/logs@custom
{
  "processors": [
    {
      "set": {
        "field": "pipelinetoplevel",
        "value": "logs@custom"
      }
    },
        {
      "set": {
        "field": "pipelinetoplevelinfo",
        "value": "{{{data_stream.dataset}}}"
      }
    },
    {
      "pipeline": {
        "description" : "Call the process_pii pipeline on the correct dataset",
        "if": "ctx?.data_stream?.dataset == 'pii'", 
        "name": "process-pii"
      }
    }
  ]
}

现在,让我们测试一下采样是否正在工作。

按照此处描述的方式加载数据数据加载附录。让我们先使用示例数据,稍后在本博客结尾处我们将讨论如何使用传入或历史日志进行测试。

如果您查看可观察性 -> 日志 -> 日志浏览器,并使用 KQL 过滤器

data_stream.dataset : pii
并按 sample.sampled 分类,您应该看到分解约为 10%

此时,我们有一个“采样”日志的可组合提取管道。作为奖励,您也可以将此日志采样器用于您的任何其他用例。

加载、配置和执行 NER 管道

加载 NER 模型

您将需要一个机器学习节点来运行 NER 模型。在本练习中,我们正在使用在 AWS 上弹性云托管部署,其采用的是CPU 优化 (ARM)架构。NER 推理将在机器学习 AWS c5d 节点上运行。将来会有 GPU 选项,但今天我们将坚持使用 CPU 架构。

本练习将使用单个具有 8 GB RAM 和 4.2 个 vCPU(最高可达 8.4 个 vCPU)的 c5d

有关上载、配置和部署模型的完整说明,请参阅有关如何将 NLP 训练模型导入 Elasticsearch 的官方文档。

获取模型的最快方法是使用 Eland Docker 方法。

以下命令会将模型加载到 Elasticsearch 中,但不会启动它。我们将在下一步执行此操作。

docker run -it --rm --network host docker.elastic.co/eland/eland \
  eland_import_hub_model \
  --url https://mydeployment.es.us-west-1.aws.found.io:443/ \
  -u elastic -p password \
  --hub-model-id dslim/bert-base-NER --task-type ner

部署和启动 NER 模型

一般来说,为了提高提取性能,请通过向部署添加更多分配来提高吞吐量。为了提高搜索速度,请增加每个分配的线程数。

为了扩展提取,我们将专注于扩展已部署模型的分配。有关此主题的更多信息,请此处提供。分配数量必须小于每个节点可用的已分配处理器(核心,而不是 vCPU)。

要部署和启动 NER 模型。我们将使用启动训练模型部署 API 来执行此操作

我们将配置以下内容

  • 4 个分配以允许更多并行提取
  • 每个分配 1 个线程
  • 0 个 Byes 缓存,因为我们预计缓存命中率较低
  • 8192 队列
# Start the model with 4 Allocators x 1 Thread, no cache, and 8192 queue
POST _ml/trained_models/dslim__bert-base-ner/deployment/_start?cache_size=0b&number_of_allocations=4&threads_per_allocation=1&queue_capacity=8192

您应该会收到类似这样的响应。

{
  "assignment": {
    "task_parameters": {
      "model_id": "dslim__bert-base-ner",
      "deployment_id": "dslim__bert-base-ner",
      "model_bytes": 430974836,
      "threads_per_allocation": 1,
      "number_of_allocations": 4,
      "queue_capacity": 8192,
      "cache_size": "0",
      "priority": "normal",
      "per_deployment_memory_bytes": 430914596,
      "per_allocation_memory_bytes": 629366952
    },
...
    "assignment_state": "started",
    "start_time": "2024-09-23T21:39:18.476066615Z",
    "max_assigned_allocations": 4
  }
}

NER 模型已部署并启动,可以使用。

以下提取管道通过 inference 处理器实现 NER 模型。

这里有大量的代码,但现在只有两项是相关的。其余代码是条件逻辑,用于驱动一些将在未来更仔细地研究的其他特定行为。

  1. 推理处理器通过 ID 调用先前加载的 NER 模型,并将待分析的文本传递给它。在本例中,待分析的文本是消息字段,也就是我们要传递给 NER 模型进行 PII 分析的 text_field。

  2. 脚本处理器循环遍历消息字段,并使用 NER 模型生成的数据,将识别出的 PII 替换为已编辑的占位符。这看起来比实际情况复杂,因为它只是简单地循环遍历 ML 预测数组,并在消息字符串中使用常量替换它们,并将结果存储在新字段中。

    redact.message
    。我们将在接下来的步骤中更仔细地研究这一点。

以下三个代码段的代码可以在这里找到

NER PII 管道

logs-ner-pii-processor 管道代码 - 单击以打开/关闭
# NER Pipeline
DELETE _ingest/pipeline/logs-ner-pii-processor
PUT _ingest/pipeline/logs-ner-pii-processor
{
  "processors": [
    {
      "set": {
        "description": "Set to true to actually redact, false will run processors but leave original",
        "field": "redact.enable",
        "value": true
      }
    },
    {
      "set": {
        "description": "Set to true to keep ml results for debugging",
        "field": "redact.ner.keep_result",
        "value": true
      }
    },
    {
      "set": {
        "description": "Set to PER, LOC, ORG to skip, or NONE to not drop any replacement",
        "field": "redact.ner.skip_entity",
        "value": "NONE"
      }
    },
    {
      "set": {
        "description": "Set to PER, LOC, ORG to skip, or NONE to not drop any replacement",
        "field": "redact.ner.minimum_score",
        "value": 0
      }
    },
    {
      "set": {
        "if": "ctx?.redact?.message == null",
        "field": "redact.message",
        "copy_from": "message"
      }
    },
    {
      "set": {
        "field": "redact.ner.successful",
        "value": true
      }
    },
    {
      "set": {
        "field": "redact.ner.found",
        "value": false
      }
    },
    {
      "inference": {
        "model_id": "dslim__bert-base-ner",
        "field_map": {
          "message": "text_field"
        },
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "failure",
              "value": "REDACT_NER_FAILED"
            }
          },
          {
            "set": {
              "field": "redact.ner.successful",
              "value": false
            }
          }
        ]
      }
    },
    {
      "script": {
        "if": "ctx.failure_ner != 'REDACT_NER_FAILED'",
        "lang": "painless",
        "source": """String msg = ctx['message'];
          for (item in ctx['ml']['inference']['entities']) {
          	if ((item['class_name'] != ctx.redact.ner.skip_entity) && 
          	  (item['class_probability'] >= ctx.redact.ner.minimum_score)) {  
          		  msg = msg.replace(item['entity'], '<' + 
          		  'REDACTNER-'+ item['class_name'] + '_NER>')
          	}
          }
          ctx.redact.message = msg""",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "failure",
              "value": "REDACT_REPLACEMENT_SCRIPT_FAILED",
              "override": false
            }
          },
          {
            "set": {
              "field": "redact.successful",
              "value": false
            }
          }
        ]
      }
    },
    
    {
      "set": {
        "if": "ctx?.ml?.inference?.entities.size() > 0", 
        "field": "redact.ner.found",
        "value": true,
        "ignore_failure": true
      }
    },
    {
      "set": {
        "if": "ctx?.redact?.pii?.found == null",
        "field": "redact.pii.found",
        "value": false
      }
    },
    {
      "set": {
        "if": "ctx?.redact?.ner?.found == true",
        "field": "redact.pii.found",
        "value": true
      }
    },
    {
      "remove": {
        "if": "ctx.redact.ner.keep_result != true",
        "field": [
          "ml"
        ],
        "ignore_missing": true,
        "ignore_failure": true
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "failure",
        "value": "GENERAL_FAILURE",
        "override": false
      }
    }
  ]
}

更新后的 PII 处理器管道,现在调用 NER 管道

process-pii 管道代码 - 单击以打开/关闭
# Updated Process PII pipeline that now call the NER pipeline
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
  "processors": [
    {
      "set": {
        "description": "Set true if enabling sampling, otherwise false",
        "field": "sample.enabled",
        "value": true
      }
    },
    {
      "set": {
        "description": "Set Sampling Rate 0 None 10000 all allows for 0.01% precision",
        "field": "sample.sample_rate",
        "value": 1000
      }
    },
    {
      "set": {
        "description": "Set to false if you want to drop unsampled data, handy for reindexing hostorical data",
        "field": "sample.keep_unsampled",
        "value": true
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == true",
        "name": "logs-sampler",
        "ignore_failure": true
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
        "name": "logs-ner-pii-processor"
      }
    }
  ]
}

现在按照重新加载日志中的描述重新加载数据。

结果

让我们看看启用 NER 处理后的结果。在带有 KQL 查询栏的日志浏览器中,执行以下查询

data_stream.dataset : pii and ml.inference.entities.class_name : ("PER" and "LOC" and "ORG" )

日志浏览器应如下所示,打开顶部的消息以查看详细信息。

NER 模型结果

让我们仔细看看这些字段的含义。

字段

ml.inference.entities.class_name

示例值
[PER, PER, LOC, ORG, ORG]

描述: NER 模型识别出的命名实体类别的数组。

字段

ml.inference.entities.class_probability

示例值
[0.999, 0.972, 0.896, 0.506, 0.595]

描述: class_probability 是介于 0 和 1 之间的值,表示给定数据点属于特定类别的可能性。数值越高,数据点属于命名类别的可能性就越高。这很重要,因为在下一篇博客中,我们可以决定要使用哪个阈值来发出警报和进行编辑。' 你可以在此示例中看到它识别了一个
LOC
作为
ORG
,我们可以通过设置阈值来过滤掉/找到它们。

字段

ml.inference.entities.entity

示例值
[Paul Buck, Steven Glens, South Amyborough, ME, Costco]

描述: 识别出的实体数组,其位置与
class_name
class_probability
.

字段

对齐。

示例值
ml.inference.predicted_value

[2024-09-23T14:32:14.608207-07:00Z] log.level=INFO: Payment successful for order #4594 (user: [Paul Buck](PER&Paul+Buck), [email protected]). Phone: 726-632-0527x520, Address: 3713 [Steven Glens](PER&Steven+Glens), [South Amyborough](LOC&South+Amyborough), [ME](ORG&ME) 93580, Ordered from: [Costco](ORG&Costco)

描述: 模型的预测值。

PII 评估仪表板

让我们快速浏览一下为评估 PII 数据而构建的仪表板。

要加载仪表板,请转到 Kibana -> 堆栈管理 -> 已保存对象,然后导入
pii-dashboard-part-1.ndjson

文件,该文件可以在这里找到

https://github.com/bvader/elastic-pii/blob/main/elastic/blog-part-1/pii-dashboard-part-1.ndjson

有关 Kibana 已保存对象的更完整说明,请参见此处

加载仪表板后,导航到它并选择正确的时间范围,您应该会看到如下所示的内容。它显示了诸如采样率、带有 NER 的日志百分比、NER 分数趋势等指标。我们将在本博客的第二部分中研究评估和操作。

总结和下一步

  • 在本博客的第一部分中,我们完成了以下工作。
  • 回顾了我们可用于 PII 检测和评估的技术和工具
  • 回顾了 NLP / NER 在 PII 检测和评估中的作用
  • 构建了必要的组合摄取管道,以对日志进行采样并通过 NER 模型运行它们

回顾了 NER 结果,并准备好进入第二篇博客

  • 在即将发布的本博客的第二部分中,我们将介绍以下内容
  • 应用字段级安全性来控制对未编辑数据的访问
  • 增强仪表板和警报
  • 生产注意事项和扩展
  • 如何在传入或历史数据上运行这些进程

使用 NER 和 redact 处理器编辑 PII

数据加载附录

代码

数据加载代码可以在这里找到

$ git clone https://github.com/bvader/elastic-pii.git

https://github.com/bvader/elastic-pii

$ cd elastic-pii
$ cd python
$ python -m venv .env
$ source .env/bin/activate
$ pip install elasticsearch
$ pip install Faker

创建和加载示例数据集

$ python generate_random_logs.py

运行日志生成器

如果您不更改任何参数,这将创建一个名为 pii.log 的文件中包含 10000 个随机日志,其中包含带有和不带有 PII 的混合日志。

编辑
load_logs.py

# The Elastic User 
ELASTIC_USER = "elastic"

# Password for the 'elastic' user generated by Elasticsearch
ELASTIC_PASSWORD = "askdjfhasldfkjhasdf"

# Found in the 'Manage Deployment' page
ELASTIC_CLOUD_ID = "deployment:sadfjhasfdlkjsdhf3VuZC5pbzo0NDMkYjA0NmQ0YjFiYzg5NDM3ZDgxM2YxM2RhZjQ3OGE3MzIkZGJmNTE0OGEwODEzNGEwN2E3M2YwYjcyZjljYTliZWQ="

并设置以下内容

$ python load_logs.py

然后运行以下命令。

重新加载日志

注意 要重新加载日志,您只需重新运行上面的命令即可。在此练习期间,您可以多次运行该命令,并且日志将被重新加载(实际上是再次加载)。新日志不会与之前的运行冲突,因为每次运行都会有一个唯一的
run.id

$ python load_logs.py

显示更多

分享此文章