Skip to content

Commit

Permalink
Merge pull request #2063 from GSA/API-1328_Logging_Formatter_With_Scrub
Browse files Browse the repository at this point in the history
API-1328 - Using a custom formatter to scrub PII from all log records.
  • Loading branch information
ccostino authored Oct 30, 2024
2 parents 64b979c + 278541f commit 1933dad
Show file tree
Hide file tree
Showing 2 changed files with 976 additions and 893 deletions.
67 changes: 37 additions & 30 deletions notifications_utils/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re
import sys
from itertools import product
from typing import Any, override

from flask import g, request
from flask.ctx import has_app_context, has_request_context
Expand All @@ -17,6 +18,40 @@

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

_phone_regex = re.compile("(?:\\+ *)?\\d[\\d\\- ]{7,}\\d")
_email_regex = re.compile(r"[\w\.-]+@[\w\.-]+") # ['[email protected]', '[email protected]']


def _scrub(msg: Any) -> Any:
# Sometimes just an exception object is passed in for the message, skip those.
if not isinstance(msg, str):
return msg
phones = _phone_regex.findall(msg)
phones = [phone.replace("-", "").replace(" ", "") for phone in phones]
for phone in phones:
msg = msg.replace(phone, "1XXXXXXXXXX")

emails = _email_regex.findall(msg)
for email in emails:
# do something with each found email string
masked_email = "XXXXX@XXXXXXX"
msg = msg.replace(email, masked_email)

return msg


class PIIFilter(logging.Filter):
    """Logging filter that masks PII in a record's message before it is handled."""

    @override
    def filter(self, record: logging.LogRecord) -> logging.LogRecord:
        """Replace ``record.msg`` with its scrubbed form and pass the record on.

        Args:
            record: The log record being filtered; it is modified in place.

        Returns:
            The same record, so it is never dropped by this filter.
        """
        scrubbed_message = _scrub(record.msg)
        record.msg = scrubbed_message
        return record


class PIIFormatter(logging.Formatter):
    """Formatter that masks PII in a record's message before rendering it.

    NOTE(review): only ``record.msg`` is passed through ``_scrub``; values
    supplied via ``record.args`` are not scrubbed here.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Scrub ``record.msg`` in place, then defer to the base formatter.

        Args:
            record: The log record to render; its ``msg`` is overwritten with
                the scrubbed value.

        Returns:
            The formatted log line produced by ``logging.Formatter.format``.
        """
        record.msg = _scrub(record.msg)
        rendered = super().format(record)
        return rendered


def init_app(app):
app.config.setdefault("NOTIFY_LOG_LEVEL", "INFO")
Expand Down Expand Up @@ -130,35 +165,7 @@ def process_log_record(self, log_record):
log_record["logType"] = "application"
try:
log_record["message"] = log_record["message"].format(**log_record)
log_record["message"] = _scrub(log_record["message"]) # PII Scrubbing
except (KeyError, IndexError) as e:
logger.exception("failed to format log message: {} not found".format(e))
logger.exception(f"failed to format log message: {e} not found")
return log_record


class PIIFilter(logging.Filter):
    """Logging filter that masks phone numbers and email addresses in messages."""

    # Compiled once at class creation instead of on every scrub() call.
    # Phone-like span: optional "+", then at least nine characters that start
    # and end with a digit with only digits, dashes or spaces in between.
    _PHONE_PATTERN = re.compile(r"(?:\+ *)?\d[\d\- ]{7,}\d")
    # Email-like span: word characters, dots or dashes around an "@".
    _EMAIL_PATTERN = re.compile(r"[\w\.-]+@[\w\.-]+")

    def scrub(self, msg):
        """Return ``msg`` with phone numbers and email addresses masked.

        Eventually we want to scrub all messages in all logs; ultimately this
        will probably get refactored into a 'SafeLogger' subclass or similar.

        Args:
            msg: The log message. Non-string values (for example an exception
                object passed as the message) are returned unchanged.

        Returns:
            The message with phone-like spans replaced by ``1XXXXXXXXXX`` and
            email-like spans replaced by ``XXXXX@XXXXXXX``.
        """
        # Sometimes just an exception object is passed in for the message, skip those.
        if not isinstance(msg, str):
            return msg

        # Substitute on the matches themselves: stripping dashes/spaces from a
        # match and then searching for the stripped text never finds numbers
        # written like "202-555-0123", so they would be logged unmasked.
        msg = self._PHONE_PATTERN.sub("1XXXXXXXXXX", msg)

        # One substitution pass masks every address in full, even when one
        # address is a suffix of another in the same message.
        return self._EMAIL_PATTERN.sub("XXXXX@XXXXXXX", msg)

    def filter(self, record):
        """Scrub ``record.msg`` in place and return the record so it is kept."""
        record.msg = self.scrub(record.msg)
        return record
Loading

0 comments on commit 1933dad

Please sign in to comment.