Stephen Brown

Using NLP and Pattern Matching to Detect, Assess, and Redact PII in Logs -  Part 2

How to detect, assess, and redact PII in your logs using Elasticsearch, NLP and Pattern Matching


Introduction:

The prevalence of high-entropy logs in distributed systems has significantly raised the risk of PII (Personally Identifiable Information) seeping into our logs, which can result in security and compliance issues. This 2-part blog delves into the crucial task of identifying and managing this issue using the Elastic Stack. We will explore using NLP (Natural Language Processing) and Pattern matching to detect, assess, and, where feasible, redact PII from logs being ingested into Elasticsearch.

In Part 1 of this blog, we covered the following:

  • Review the techniques and tools we have available to manage PII in our logs
  • Understand the roles of NLP / NER in PII detection
  • Build a composable processing pipeline to detect and assess PII
  • Sample logs and run them through the NER Model
  • Assess the results of the NER Model

In Part 2 of this blog, we will cover the following:

  • Apply the redact regex pattern processor and assess the results
  • Create Alerts using ESQL
  • Apply field-level security to control access to the un-redacted data
  • Production considerations and scaling
  • How to run these processes on incoming or historical data

Reminder of the overall flow we will construct over the 2 blogs:

All code for this exercise can be found at: https://github.com/bvader/elastic-pii.

Part 1 Prerequisites

This blog picks up where Part 1 of this blog left off. You must have the NER model, ingest pipelines, and dashboard from Part 1 installed and working.

  • Loaded and configured NER Model
  • Installed all the composable ingest pipelines from Part 1 of the blog
  • Installed dashboard

You can access the complete solution for Blog 1 here. Don't forget to load the dashboard, found here.

Applying the Redact Processor

Next, we will apply the redact processor. The redact processor is a simple regex-based processor that takes a list of regex patterns, looks for them in a field, and replaces them with literals when found. The redact processor is reasonably performant and can run at scale; we will discuss this in detail in the production scaling section at the end.

Elasticsearch comes packaged with a number of useful predefined patterns that can be conveniently referenced by the redact processor. If one does not suit your needs, create a new pattern with a custom definition. The redact processor replaces every occurrence of a match; if there are multiple matches, they will all be replaced with the pattern name.

In the code below, we leverage some of the predefined patterns and construct several custom patterns.

        "patterns": [
          "%{EMAILADDRESS:EMAIL_REGEX}",      << Predefined
          "%{IP:IP_ADDRESS_REGEX}",           << Predefined
          "%{CREDIT_CARD:CREDIT_CARD_REGEX}", << Custom
          "%{SSN:SSN_REGEX}",                 << Custom
          "%{PHONE:PHONE_REGEX}"              << Custom
        ]

We also replaced the PII with easily identifiable patterns we can use for assessment.

In addition, it is important to note that since the redact processor is a simple regex find and replace, it can be used against many "secrets" patterns, not just PII. There are many references for regex and secrets patterns, so you can reuse this capability to detect secrets in your logs.
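Because the redact processor is just named-pattern find and replace, you can prototype its behavior outside Elasticsearch before wiring patterns into a pipeline. Here is a minimal Python sketch of the same idea; the email and SSN regexes below are illustrative stand-ins, not the Grok definitions Elasticsearch actually uses:

```python
import re

# Illustrative patterns only; Elasticsearch's predefined Grok patterns differ.
PATTERNS = {
    "EMAIL_REGEX": r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}",
    "SSN_REGEX": r"\d{3}-\d{2}-\d{4}",
}

def redact(message: str, prefix: str = "<REDACTPROC-", suffix: str = ">") -> str:
    """Replace every match of each named pattern with its pattern name,
    mirroring the redact processor's prefix/suffix convention."""
    for name, pattern in PATTERNS.items():
        message = re.sub(pattern, f"{prefix}{name}{suffix}", message)
    return message

print(redact("User jane@example.com with SSN 123-45-6789 logged in"))
# -> "User <REDACTPROC-EMAIL_REGEX> with SSN <REDACTPROC-SSN_REGEX> logged in"
```

The same loop works for secrets patterns: add an entry per secret type and every occurrence is replaced with its pattern name.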

The code can be found here for the following two sections of code.

redact processor pipeline code - click to open/close
# Add the PII redact processor pipeline
DELETE _ingest/pipeline/logs-pii-redact-processor
PUT _ingest/pipeline/logs-pii-redact-processor
{
  "processors": [
    {
      "set": {
        "field": "redact.proc.successful",
        "value": true
      }
    },
    {
      "set": {
        "field": "redact.proc.found",
        "value": false
      }
    },
    {
      "set": {
        "if": "ctx?.redact?.message == null",
        "field": "redact.message",
        "copy_from": "message"
      }
    },
    {
      "redact": {
        "field": "redact.message",
        "prefix": "<REDACTPROC-",
        "suffix": ">",
        "patterns": [
          "%{EMAILADDRESS:EMAIL_REGEX}",
          "%{IP:IP_ADDRESS_REGEX}",
          "%{CREDIT_CARD:CREDIT_CARD_REGEX}",
          "%{SSN:SSN_REGEX}",
          "%{PHONE:PHONE_REGEX}"
        ],
        "pattern_definitions": {
          "CREDIT_CARD": """\d{4}[ -]\d{4}[ -]\d{4}[ -]\d{4}""",
          "SSN": """\d{3}-\d{2}-\d{4}""",
          "PHONE": """(\+\d{1,2}\s?)?1?\-?\.?\s?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}"""
        },
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "failure",
              "value": "REDACT_PROCESSOR_FAILED",
              "override": false
            }
          },
          {
            "set": {
              "field": "redact.proc.successful",
              "value": false
            }
          }
        ]
      }
    },
    {
      "set": {
        "if": "ctx?.redact?.message != null && ctx.redact.message.contains('REDACTPROC')",
        "field": "redact.proc.found",
        "value": true
      }
    },
    {
      "set": {
        "if": "ctx?.redact?.pii?.found == null",
        "field": "redact.pii.found",
        "value": false
      }
    },
    {
      "set": {
        "if": "ctx?.redact?.proc?.found == true",
        "field": "redact.pii.found",
        "value": true
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "failure",
        "value": "GENERAL_FAILURE",
        "override": false
      }
    }
  ]
}
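Before relying on the custom pattern_definitions above, it is worth sanity-checking them against known-good samples. The quick Python check below reuses the same regex bodies (Python and Java regex syntax agree for these patterns; the sample values are fabricated test data):

```python
import re

# The same custom pattern bodies used in the redact processor above.
CREDIT_CARD = r"\d{4}[ -]\d{4}[ -]\d{4}[ -]\d{4}"
SSN = r"\d{3}-\d{2}-\d{4}"
PHONE = r"(\+\d{1,2}\s?)?1?\-?\.?\s?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}"

samples = {
    CREDIT_CARD: "4111 1111 1111 1111",  # fabricated card number
    SSN: "123-45-6789",                  # fabricated SSN
    PHONE: "+1 (555) 123-4567",          # fabricated phone number
}

for pattern, sample in samples.items():
    assert re.search(pattern, sample), f"{pattern!r} failed on {sample!r}"
print("all custom patterns matched their samples")
```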

And now, we will add the logs-pii-redact-processor pipeline to the overall process-pii pipeline.
process-pii pipeline code - click to open/close
# Updated process-pii pipeline that now calls the NER and redact processor pipelines
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
  "processors": [
    {
      "set": {
        "description": "Set true if enabling sampling, otherwise false",
        "field": "sample.enabled",
        "value": true
      }
    },
    {
      "set": {
        "description": "Sampling rate: 0 = none, 10000 = all (allows 0.01% precision)",
        "field": "sample.sample_rate",
        "value": 1000
      }
    },
    {
      "set": {
        "description": "Set to false if you want to drop unsampled data, handy for reindexing historical data",
        "field": "sample.keep_unsampled",
        "value": true
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == true",
        "name": "logs-sampler",
        "ignore_failure": true
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
        "name": "logs-ner-pii-processor"
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == false || (ctx.sample.enabled == true &&  ctx.sample.sampled == true)",
        "name": "logs-pii-redact-processor"
      }
    }
  ]
}

Reload the data as described in the Reloading the logs section. If you have not generated the logs the first time, follow the instructions in the Data Loading Appendix.

Go to Discover and enter the following into the KQL bar:

sample.sampled : true and redact.message: REDACTPROC

Then add the redact.message field to the table, and you should see something like this.

And if you did not already load the dashboard from Part 1, load it now; it can be found here. Import it via Kibana -> Stack Management -> Saved Objects -> Import.

It should look something like this now. Note that the REGEX portions of the dashboard are now active.

Checkpoint

At this point, we have the following capabilities:

  • Sample incoming logs and apply PII redaction
  • Detect and assess PII with NER/NLP and pattern matching
  • Assess the amount, type, and quality of the PII detections

This is a great point to stop if you are just running all this once to see how it works, but we have a few more steps to make this useful in production systems.

  • Clean up the working and unredacted data
  • Update the Dashboard to work with the cleaned-up data
  • Apply Role Based Access Control to protect the raw unredacted data
  • Create Alerts
  • Production and Scaling Considerations
  • How to run these processes on incoming or historical data

Applying to Production Systems

Cleanup working data and update the dashboard

And now we will add the cleanup code to the overall process-pii pipeline.

In short, we set a flag redact.enable: true that directs the pipeline to move the unredacted message field to raw.message and to move the redacted message field redact.message to the message field. We will "protect" raw.message in the following section.

NOTE: Of course you can change this behavior if you want to completely delete the unredacted data. In this exercise we will keep it and protect it.

In addition, we set redact.cleanup: true to clean up the NLP working data.

These fields allow a lot of control over what data you decide to keep and analyze.
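The field moves described above can be simulated on a plain document to see the end state. This is a Python simulation of the pipeline's rename/remove steps for illustration, not Elasticsearch code:

```python
def apply_redaction_moves(doc: dict, redact_enable: bool = True, cleanup: bool = True) -> dict:
    """Simulate the rename/remove processors: on a PII hit, message -> raw.message
    and redact.message -> message; optionally drop the NLP working data under "ml"."""
    redact = doc.get("redact", {})
    if redact.get("pii", {}).get("found") and redact_enable:
        doc["raw"] = {"message": doc.pop("message")}
        doc["message"] = redact.pop("message")
    if cleanup:
        doc.pop("ml", None)  # remove NER working data
    return doc

doc = {
    "message": "User jane@example.com logged in",
    "redact": {"message": "User <REDACTPROC-EMAIL_REGEX> logged in", "pii": {"found": True}},
    "ml": {"ner": "...working data..."},
}
doc = apply_redaction_moves(doc)
# doc["message"] is now the redacted text, the original sits under raw.message,
# and the "ml" working data is gone
```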

The code can be found here for the following two sections of code.

process-pii pipeline code - click to open/close
# Updated process-pii pipeline that now calls the NER and redact processor pipelines and cleans up
DELETE _ingest/pipeline/process-pii
PUT _ingest/pipeline/process-pii
{
  "processors": [
    {
      "set": {
        "description": "Set true if enabling sampling, otherwise false",
        "field": "sample.enabled",
        "value": true
      }
    },
    {
      "set": {
        "description": "Sampling rate: 0 = none, 10000 = all (allows 0.01% precision)",
        "field": "sample.sample_rate",
        "value": 1000
      }
    },
    {
      "set": {
        "description": "Set to false if you want to drop unsampled data, handy for reindexing historical data",
        "field": "sample.keep_unsampled",
        "value": true
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == true",
        "name": "logs-sampler",
        "ignore_failure": true
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == false || (ctx.sample.enabled == true && ctx.sample.sampled == true)",
        "name": "logs-ner-pii-processor"
      }
    },
    {
      "pipeline": {
        "if": "ctx.sample.enabled == false || (ctx.sample.enabled == true &&  ctx.sample.sampled == true)",
        "name": "logs-pii-redact-processor"
      }
    },
    {
      "set": {
        "description": "Set to true to actually redact, false will run processors but leave original",
        "field": "redact.enable",
        "value": true
      }
    },
    {
      "rename": {
        "if": "ctx?.redact?.pii?.found == true && ctx?.redact?.enable == true",
        "field": "message",
        "target_field": "raw.message"
      }
    },
    {
      "rename": {
        "if": "ctx?.redact?.pii?.found == true && ctx?.redact?.enable == true",
        "field": "redact.message",
        "target_field": "message"
      }
    },
    {
      "set": {
        "description": "Set to true to clean up working data",
        "field": "redact.cleanup",
        "value": true
      }
    },
    {
      "remove": {
        "if": "ctx?.redact?.cleanup == true",
        "field": [
          "ml"
        ],
        "ignore_failure": true
      }
    }
  ]
}

Reload the data as described in the Reloading the logs section.

Go to Discover and enter the following into the KQL bar:

sample.sampled : true and redact.pii.found: true

Then add the following fields to the table: message, raw.message, redact.ner.found, redact.proc.found, and redact.pii.found.

You should see something like this

We have everything we need to move forward with protecting the PII and Alerting on it.

Load up the new dashboard that works on the cleaned-up data

To load the dashboard, go to Kibana -> Stack Management -> Saved Objects and import the pii-dashboard-part-2.ndjson file, which can be found here.

The new dashboard should look like this. Note: It uses different fields under the covers since we have cleaned up the underlying data.

You should see something like this

Apply Role Based Access Control to protect the raw unredacted data

Elasticsearch natively supports role-based access control, including field- and document-level security; this dramatically reduces the operational and maintenance complexity required to secure our application.

We will create a role that does not allow access to the raw.message field, then create a user and assign that user the role. With that role, the user will only be able to see the redacted message, which is now in the message field, and will not be able to access the protected raw.message field.

NOTE: Since we only sampled 10% of the data in this exercise, the non-sampled message fields are not moved to raw.message, so they are still viewable, but this shows the capability you can apply in a production system.

The code can be found here for the following section of code.

RBAC protect-pii role and user code - click to open/close
# Create role with no access to the raw.message field
GET _security/role/protect-pii
DELETE _security/role/protect-pii
PUT _security/role/protect-pii
{
 "cluster": [],
 "indices": [
   {
     "names": [
       "logs-*"
     ],
     "privileges": [
       "read",
       "view_index_metadata"
     ],
     "field_security": {
       "grant": [
         "*"
       ],
       "except": [
         "raw.message"
       ]
     },
     "allow_restricted_indices": false
   }
 ],
 "applications": [
   {
     "application": "kibana-.kibana",
     "privileges": [
       "all"
     ],
     "resources": [
       "*"
     ]
   }
 ],
 "run_as": [],
 "metadata": {},
 "transient_metadata": {
   "enabled": true
 }
}

# Create user stephen with protect-pii role
GET _security/user/stephen
DELETE /_security/user/stephen
POST /_security/user/stephen
{
 "password" : "mypassword",
 "roles" : [ "protect-pii" ],
 "full_name" : "Stephen Brown"
}

Now log into a separate window with the new user stephen, which has the protect-pii role. Go to Discover, put redact.pii.found : true in the KQL bar, and add the message field to the table. Also, notice that raw.message is not available.

You should see something like this

Create an Alert when PII Detected

Now, with the pipeline processing in place, creating an alert when PII is detected is easy. If needed, review Alerting in Kibana in detail first.

NOTE: Reload the data if needed to have recent data.

First, we will create a simple ES|QL query in Discover.

The code can be found here.

FROM logs-pii-default
| WHERE redact.pii.found == true
| STATS pii_count = count(*)
| WHERE pii_count > 0

When you run this you should see something like this.

Now click the Alerts menu, select Create search threshold rule, and we will create an alert to notify us when PII is found.

Select a time field: @timestamp
Set the time window: 5 minutes

Assuming you loaded the data recently, when you run Test, you should see something like:

pii_count : 343
Alerts generated
query matched

Add an action when the alert is Active.

For each alert:

On status changes
Run when:
Query matched

Elasticsearch query rule {{rule.name}} is active:

- PII Found: true
- PII Count: {{#context.hits}} {{_source.pii_count}}{{/context.hits}}
- Conditions Met: {{context.conditions}} over {{rule.params.timeWindowSize}}{{rule.params.timeWindowUnit}}
- Timestamp: {{context.date}}
- Link: {{context.link}}

Add an Action for when the Alert is Recovered.

For each alert:

On status changes
Run when:
Recovered

Elasticsearch query rule {{rule.name}} is Recovered:

- PII Found: false
- Conditions Not Met: {{context.conditions}} over {{rule.params.timeWindowSize}}{{rule.params.timeWindowUnit}}
- Timestamp: {{context.date}}
- Link: {{context.link}}

When everything is set up, it should look like this. Then click Save.



You should get an Active alert that looks like this if you have recent data. I sent mine to Slack.

Elasticsearch query rule pii-found-esql is active:
- PII Found: true
- PII Count:  374
- Conditions Met: Query matched documents over 5m
- Timestamp: 2024-10-15T02:44:52.795Z
- Link: https://mydeployment123.aws.found.io:9243/app/management/insightsAndAlerting/triggersActions/rule/7d6faecf-964e-46da-aaba-8a2f89f33989

And then if you wait you will get a Recovered alert that looks like this.

Elasticsearch query rule pii-found-esql is Recovered:
- PII Found: false
- Conditions Not Met: Query did NOT match documents over 5m
- Timestamp: 2024-10-15T02:49:04.815Z
- Link: https://mydeployment123.kb.us-west-1.aws.found.io:9243/app/management/insightsAndAlerting/triggersActions/rule/7d6faecf-964e-46da-aaba-8a2f89f33989

Production Scaling

NER Scaling

As we mentioned in Part 1 of this blog, NER / NLP models are CPU-intensive and expensive to run at scale; thus, we employed a sampling technique to understand the risk in our logs without sending the full log volume through the NER model.

Please review the setup and configuration of the NER model from Part 1 of the blog.

We chose the base BERT NER model bert-base-NER for our PII case.

To scale ingest, we will focus on scaling the allocations for the deployed model. More information on this topic is available here. The number of allocations must be less than the available allocated processors (cores, not vCPUs) per node.

The metrics below are related to the model and configuration from Part 1 of the blog.

  • 4 Allocations to allow for more parallel ingestion
  • 1 Thread per Allocation
  • 0 Bytes Cache, as we expect a low cache hit rate. Note: if there are many repeated logs, the cache can help, but with timestamps and other variations, the cache will not help and can even slow down the process
  • 8192 Queue
GET _ml/trained_models/dslim__bert-base-ner/_stats
.....
           "node": {
              "0m4tq7tMRC2H5p5eeZoQig": {
.....
                "attributes": {
                  "xpack.installed": "true",
                  "region": "us-west-1",
                  "ml.allocated_processors": "5", << HERE 
.....
            },
            "inference_count": 5040,
            "average_inference_time_ms": 138.44285714285715, << HERE 
            "average_inference_time_ms_excluding_cache_hits": 138.44285714285715,
            "inference_cache_hit_count": 0,
.....
            "threads_per_allocation": 1,
            "number_of_allocations": 4,  <<< HERE
            "peak_throughput_per_minute": 1550,
            "throughput_last_minute": 1373,
            "average_inference_time_ms_last_minute": 137.55280407865988,
            "inference_cache_hit_count_last_minute": 0
          }
        ]
      }
    }

There are 3 key pieces of information above:

  • "ml.allocated_processors": "5"
    The number of physical cores / processors available

  • "number_of_allocations": 4
    The number of allocations, which is at most 1 per physical core. Note: we could have used 5 allocations, but we only allocated 4 for this exercise

  • "average_inference_time_ms": 138.44285714285715
    The average inference time per document.

The throughput math for Inferences per Minute (IPM) per allocation (1 allocation per physical core) is straightforward, since an inference uses a single core and a single thread.

Then the Inferences per Min per Allocation is simply:

IPM per allocation = 60,000 ms (in a minute) / 138ms per inference = 435

Which then lines up with the Total Inferences per Minute:

Total IPM = 435 IPM / allocation * 4 Allocations = ~1740

Suppose we want to do 10,000 IPM; how many allocations (cores) would we need?

Allocations = 10,000 IPM / 435 IPM per allocation = 23 allocations (cores, rounded up)

Or perhaps logs are coming in at 5000 EPS and you want to do 1% Sampling.

IPM = 5000 EPS * 60sec * 0.01 sampling = 3000 IPM sampled

Then

Number of Allocations = 3000 IPM / 435 IPM per allocation = 7 allocations (cores, rounded up)

Want it faster? It turns out there is a more lightweight NER model, distilbert-NER, that is faster, but the tradeoff is slightly less accuracy.

Running the logs through this model results in an inference time nearly twice as fast!

"average_inference_time_ms": 66.0263959390863

Here is some quick math:

IPM per allocation = 60,000 ms (in a minute) / 66 ms per inference = 909

Suppose we want to do 25,000 IPM; how many allocations (cores) would we need?

Allocations = 25,000 IPM / 909 IPM per allocation = 28 allocations (cores, rounded up)

Now you can apply this math to determine the correct sampling and NER scaling to support your logging use case.
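The worked examples above can be wrapped in a couple of helper functions. The 138 ms and 66 ms figures come from the model stats shown earlier; everything else is arithmetic:

```python
import math

def ipm_per_allocation(avg_inference_ms: float) -> float:
    """Inferences per minute one allocation (one core, one thread) can sustain."""
    return 60_000 / avg_inference_ms

def allocations_needed(target_ipm: float, avg_inference_ms: float) -> int:
    """Cores (allocations) required to sustain a target inferences-per-minute rate."""
    return math.ceil(target_ipm * avg_inference_ms / 60_000)

def sampled_ipm(eps: float, sample_fraction: float) -> float:
    """Convert an events-per-second rate at a sampling fraction to inferences per minute."""
    return eps * 60 * sample_fraction

# bert-base-NER at ~138 ms per inference
print(round(ipm_per_allocation(138)))                    # ~435 IPM per allocation
print(allocations_needed(10_000, 138))                   # 23 allocations for 10,000 IPM
print(allocations_needed(sampled_ipm(5000, 0.01), 138))  # 7 allocations for 5000 EPS at 1% sampling

# distilbert-NER at ~66 ms per inference
print(allocations_needed(25_000, 66))                    # 28 allocations for 25,000 IPM
```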

Redact Processor Scaling

In short, the redact processor should scale to production loads as long as you are using appropriately sized and configured nodes and have well-constructed regex patterns.

Assessing incoming logs

If you want to test on incoming log data in a data stream, all you need to do is change the conditional in the logs@custom pipeline to apply process-pii to the dataset you want. You can use any conditional that fits your use case.

Note: Just make sure that you have accounted for the proper scaling of the NER and redact processors, as described above in Production Scaling.

    {
      "pipeline": {
        "description" : "Call the process_pii pipeline on the correct dataset",
        "if": "ctx?.data_stream?.dataset == 'pii'", <<< HERE
        "name": "process-pii"
      }
    }

So if, for example, your logs are coming into logs-mycustomapp-default, you would just change the conditional to

        "if": "ctx?.data_stream?.dataset == 'mycustomapp'",

Assessing historical data

If you have a historical (already ingested) data stream or index, you can run the assessment over it using the _reindex API.

Note: Just make sure that you have accounted for the proper scaling of the NER and redact processors, as described above in Production Scaling.

There are a couple of extra steps: The code can be found here.

  1. First, we can set the parameters to ONLY keep the sampled data, as there is no reason to make a copy of all the unsampled data. In the process-pii pipeline, there is a setting, sample.keep_unsampled, which we can set to false, which will then keep only the sampled data
    {
      "set": {
        "description": "Set to false if you want to drop unsampled data, handy for reindexing historical data",
        "field": "sample.keep_unsampled",
        "value": false <<< SET TO false
      }
    },
  2. Second, we will create a pipeline that will reroute the data to the correct data stream to run through all the PII assessment/detection pipelines. It also sets the correct dataset and namespace
DELETE _ingest/pipeline/sendtopii
PUT _ingest/pipeline/sendtopii
{
  "processors": [
    {
      "set": {
        "field": "data_stream.dataset",
        "value": "pii"
      }
    },
    {
      "set": {
        "field": "data_stream.namespace",
        "value": "default"
      }
    },
    {
      "reroute" : 
      {
        "dataset" : "{{data_stream.dataset}}",
        "namespace": "{{data_stream.namespace}}"
      }
    }
  ]
}
  3. Finally, we can run a _reindex to select the data we want to test/assess. It is recommended to review the _reindex documentation before trying this. First, select the source data stream you want to assess; in this example, it is the logs-generic-default logs data stream. Note: I also added a range filter to select a specific time range. There is a bit of a "trick" we need to use since we are re-routing the data to the data stream logs-pii-default. To do this, we just set "index": "logs-tmp-default" in the _reindex, as the correct data stream will be set in the pipeline. We must do that because reroute is a noop if it is called from/to the same data stream.
POST _reindex?wait_for_completion=false
{
  "source": {
    "index": "logs-generic-default",
    "query": {
      "bool": {
        "filter": [
          {
            "range": {
              "@timestamp": {
                "gte": "now-1h/h",
                "lt": "now"
              }
            }
          }
        ]
      }
    }
  },
  "dest": {
    "op_type": "create",
    "index": "logs-tmp-default",
    "pipeline": "sendtopii"
  }
}

Summary

At this point, you have the tools and processes needed to assess, detect, analyze, alert on, and protect PII in your logs.

The end state solution can be found here.

In Part 1 of this blog, we accomplished the following.

  • Reviewed the techniques and tools we have available for PII detection and assessment
  • Reviewed NLP / NER role in PII detection and assessment
  • Built the necessary composable ingest pipelines to sample logs and run them through the NER Model
  • Reviewed the NER results and are ready to move to the second blog

In Part 2 of this blog, we covered the following:

  • Redact PII using NER and redact processor
  • Apply field-level security to control access to the un-redacted data
  • Enhance the dashboards and alerts
  • Production considerations and scaling
  • How to run these processes on incoming or historical data

So get to work and reduce risk in your logs!

Data Loading Appendix

Code

The data loading code can be found here:

https://github.com/bvader/elastic-pii

$ git clone https://github.com/bvader/elastic-pii.git

Creating and Loading the Sample Data Set

$ cd elastic-pii
$ cd python
$ python -m venv .env
$ source .env/bin/activate
$ pip install elasticsearch
$ pip install Faker

Run the log generator

$ python generate_random_logs.py

If you do not change any parameters, this will create 10000 random logs in a file named pii.log with a mix of logs that contain and do not contain PII.

Edit

load_logs.py
and set the following

# The Elastic User 
ELASTIC_USER = "elastic"

# Password for the 'elastic' user generated by Elasticsearch
ELASTIC_PASSWORD = "askdjfhasldfkjhasdf"

# Found in the 'Manage Deployment' page
ELASTIC_CLOUD_ID = "deployment:sadfjhasfdlkjsdhf3VuZC5pbzo0NDMkYjA0NmQ0YjFiYzg5NDM3ZDgxM2YxM2RhZjQ3OGE3MzIkZGJmNTE0OGEwODEzNGEwN2E3M2YwYjcyZjljYTliZWQ="

Then run the following command.

$ python load_logs.py

Reloading the logs

Note: To reload the logs, you can simply re-run the above command. You can run the command multiple times during this exercise and the logs will be reloaded (actually, loaded again). The new logs will not collide with previous runs, as there will be a unique run.id for each run, displayed at the end of the loading process.

$ python load_logs.py
