Skip to content

Commit

Permalink
feat(upload-notes): add datetimes to each DocRef header
Browse files Browse the repository at this point in the history
Also:
- Order the DocRefs in an encounter by datetime
- Switch from freezegun to time-machine

Unrelatedly:
- Switch from deprecated cgi module to email module for mimetype
  parsing.

Fixes #298
  • Loading branch information
mikix committed Mar 14, 2024
1 parent e551495 commit 9800ef1
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 41 deletions.
2 changes: 1 addition & 1 deletion cumulus_etl/fhir/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Support for talking to FHIR servers & handling the FHIR spec"""

from .fhir_client import FhirClient, create_fhir_client_for_cli
from .fhir_utils import download_reference, get_docref_note, ref_resource, unref_resource
from .fhir_utils import download_reference, get_docref_note, parse_datetime, ref_resource, unref_resource
46 changes: 42 additions & 4 deletions cumulus_etl/fhir/fhir_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""FHIR utility methods"""

import base64
import cgi
import datetime
import email.message
import re

import inscriptis
Expand Down Expand Up @@ -71,6 +72,43 @@ def unref_resource(ref: dict | None) -> (str | None, str):
return ref.get("type"), tokens[0]


######################################################################################################################
#
# Field parsing
#
######################################################################################################################


def parse_datetime(value: str | None) -> datetime.datetime | None:
"""
Converts FHIR instant/dateTime/date types into a Python format.
- This tries to be very graceful - any errors will result in a None return.
- Missing month/day fields are treated as the earliest possible date (i.e. '1')
CAUTION: Returned datetime might be naive - which makes more sense for dates without a time.
The spec says any field with hours/minutes SHALL have a timezone.
But fields that are just dates SHALL NOT have a timezone.
"""
if not value:
return None

try:
# Handle partial dates like "1980-12" (which spec allows, but fromisoformat can't handle)
pieces = value.split("-")
if len(pieces) == 1:
return datetime.datetime(int(pieces[0]), 1, 1) # note: naive datetime
elif len(pieces) == 2:
return datetime.datetime(int(pieces[0]), int(pieces[1]), 1) # note: naive datetime

# Until we depend on Python 3.11+, manually handle Z
value = value.replace("Z", "+00:00")

return datetime.datetime.fromisoformat(value)
except ValueError:
return None


######################################################################################################################
#
# Resource downloading
Expand Down Expand Up @@ -104,9 +142,9 @@ async def download_reference(client: FhirClient, reference: str) -> dict | None:

def _parse_content_type(content_type: str) -> (str, str):
"""Returns (mimetype, encoding)"""
# TODO: switch to message.Message parsing, since cgi is deprecated
mimetype, params = cgi.parse_header(content_type)
return mimetype, params.get("charset", "utf8")
msg = email.message.EmailMessage()
msg["content-type"] = content_type
return msg.get_content_type(), msg.get_content_charset("utf8")


def _mimetype_priority(mimetype: str) -> int:
Expand Down
24 changes: 24 additions & 0 deletions cumulus_etl/upload_notes/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import argparse
import asyncio
import datetime
import sys
from collections.abc import Collection

Expand Down Expand Up @@ -53,6 +54,15 @@ async def gather_docrefs(
)


def datetime_from_docref(docref: dict) -> datetime.datetime | None:
"""Returns the date of a docref - preferring `context.period.start`, then `date`"""
if start := fhir.parse_datetime(docref.get("context", {}).get("period", {}).get("start")):
return start
if date := fhir.parse_datetime(docref.get("date")):
return date
return None


async def read_notes_from_ndjson(
client: fhir.FhirClient, dirname: str, codebook: deid.Codebook
) -> list[LabelStudioNote]:
Expand Down Expand Up @@ -95,6 +105,7 @@ async def read_notes_from_ndjson(
doc_spans=doc_spans,
title=title,
text=text,
date=datetime_from_docref(docrefs[i]),
)
)

Expand Down Expand Up @@ -159,13 +170,26 @@ def group_notes_by_encounter(notes: Collection[LabelStudioNote]) -> list[LabelSt
grouped_doc_mappings = {}
grouped_doc_spans = {}

# Sort notes by date (putting Nones last)
enc_notes = sorted(enc_notes, key=lambda x: (x.date or datetime.datetime.max).timestamp())

for note in enc_notes:
grouped_doc_mappings.update(note.doc_mappings)

if not note.date:
date_string = "Unknown time"
elif note.date.tzinfo:
# aware datetime, with hours/minutes (using original timezone, not local)
date_string = f"{note.date:%x %X}" # locale-based date + time
else:
# non-aware datetime, only show the date (fhir spec says times must have timezones)
date_string = f"{note.date:%x}" # locale-based date

if grouped_text:
grouped_text += "\n\n\n"
grouped_text += "########################################\n########################################\n"
grouped_text += f"{note.title}\n"
grouped_text += f"{date_string}\n"
grouped_text += "########################################\n########################################\n\n\n"
offset = len(grouped_text)
grouped_text += note.text
Expand Down
2 changes: 2 additions & 0 deletions cumulus_etl/upload_notes/labelstudio.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""LabelStudio document annotation"""

import dataclasses
import datetime
from collections.abc import Collection, Iterable

import ctakesclient.typesystem
Expand All @@ -25,6 +26,7 @@ class LabelStudioNote:
enc_id: str # real Encounter ID
anon_id: str # anonymized Encounter ID
text: str = "" # text of the note, sent to Label Studio
date: datetime.datetime | None = None # date of the note

# A title is only used when combining notes into one big encounter note. It's not sent to Label Studio.
title: str = ""
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ line-length = 120
tests = [
"coverage",
"ddt",
"freezegun",
"moto[server,s3] >= 5.0",
"pytest",
"respx",
"time-machine",
]
dev = [
"black == 23.11.0",
Expand Down
2 changes: 1 addition & 1 deletion tests/fhir/test_fhir_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def setUp(self):
"iss": self.client_id,
"sub": self.client_id,
"aud": self.token_url,
"exp": int(time.time()) + 299, # aided by freezegun not changing time under us
"exp": int(time.time()) + 299, # aided by time-machine not changing time under us
"jti": "1234",
},
)
Expand Down
27 changes: 27 additions & 0 deletions tests/fhir/test_fhir_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Tests for fhir_utils.py"""

import base64
import datetime
import shutil
from unittest import mock

Expand Down Expand Up @@ -45,6 +47,31 @@ def test_ref_resource(self, resource_type, resource_id, expected):
self.assertEqual({"reference": expected}, fhir.ref_resource(resource_type, resource_id))


@ddt.ddt
class TestDateParsing(utils.AsyncTestCase):
"""Tests for the parse_datetime method"""

@ddt.data(
(None, None),
("", None),
("abc", None),
("abc-de", None),
("abc-de-fg", None),
("2018", datetime.datetime(2018, 1, 1)), # naive
("2021-07", datetime.datetime(2021, 7, 1)), # naive
("1992-11-06", datetime.datetime(1992, 11, 6)), # naive
(
"1992-11-06T13:28:17.239+02:00",
datetime.datetime(1992, 11, 6, 13, 28, 17, 239000, tzinfo=datetime.timezone(datetime.timedelta(hours=2))),
),
("1992-11-06T13:28:17.239Z", datetime.datetime(1992, 11, 6, 13, 28, 17, 239000, tzinfo=datetime.timezone.utc)),
)
@ddt.unpack
def test_parse_datetime(self, input_value, expected_value):
parsed = fhir.parse_datetime(input_value)
self.assertEqual(expected_value, parsed)


@ddt.ddt
class TestDocrefNotesUtils(utils.AsyncTestCase):
"""Tests for the utility methods dealing with document reference clinical notes"""
Expand Down
99 changes: 69 additions & 30 deletions tests/upload_notes/test_upload_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,19 @@ async def run_upload_notes(
await cli.main(args)

@staticmethod
def make_docref(doc_id: str, text: str = None, content: list[dict] = None, enc_id: str = None) -> dict:
def make_docref(
doc_id: str,
text: str = None,
content: list[dict] = None,
enc_id: str = None,
date: str = None,
period_start: str = None,
) -> dict:
docref = {
"resourceType": "DocumentReference",
"id": doc_id,
}

if content is None:
text = text or "What's up doc?"
content = [
Expand All @@ -119,14 +131,18 @@ def make_docref(doc_id: str, text: str = None, content: list[dict] = None, enc_i
},
}
]
docref["content"] = content

enc_id = enc_id or f"enc-{doc_id}"
return {
"resourceType": "DocumentReference",
"id": doc_id,
"content": content,
"context": {"encounter": [{"reference": f"Encounter/{enc_id}"}]},
}
docref["context"] = {"encounter": [{"reference": f"Encounter/{enc_id}"}]}

if date:
docref["date"] = date

if period_start:
docref["context"]["period"] = {"start": period_start}

return docref

@staticmethod
def mock_search_url(respx_mock: respx.MockRouter, patient: str, doc_ids: Iterable[str]) -> None:
Expand Down Expand Up @@ -170,13 +186,14 @@ def get_pushed_ids(self) -> set[str]:
return set(itertools.chain.from_iterable(n.doc_mappings.keys() for n in notes))

@staticmethod
def wrap_note(title: str, text: str, first: bool = True) -> str:
def wrap_note(title: str, text: str, first: bool = True, date: str | None = None) -> str:
"""Format a note in the expected output format, with header"""
finalized = ""
if not first:
finalized += "\n\n\n"
finalized += "########################################\n########################################\n"
finalized += f"{title}\n"
finalized += f"{date or 'Unknown time'}\n"
finalized += "########################################\n########################################\n\n\n"
finalized += text.strip()
return finalized
Expand Down Expand Up @@ -300,15 +317,15 @@ async def test_successful_push_to_label_studio(self):
)
self.assertEqual(
[
self.wrap_note("Admission MD", "Notes for fever"),
self.wrap_note("Admission MD", "Notes! for fever"),
self.wrap_note("Admission MD", "Notes for fever", date="06/23/21"),
self.wrap_note("Admission MD", "Notes! for fever", date="06/24/21"),
],
[t.text for t in tasks],
)
self.assertEqual(
{
"begin": 103,
"end": 106,
"begin": 112,
"end": 115,
"text": "for",
"polarity": 0,
"conceptAttributes": [
Expand Down Expand Up @@ -369,18 +386,40 @@ async def test_philter_label(self):
task = tasks[0]

# High span numbers because we insert some header text
self.assertEqual({93: 97, 98: 103}, task.philter_map)
self.assertEqual({106: 110, 111: 116}, task.philter_map)

async def test_grouped_datetime(self):
with tempfile.TemporaryDirectory() as tmpdir:
with common.NdjsonWriter(f"{tmpdir}/DocumentReference.ndjson") as writer:
writer.write(TestUploadNotes.make_docref("D1", enc_id="E1", text="DocRef 1"))
writer.write(
TestUploadNotes.make_docref("D2", enc_id="E1", text="DocRef 2", date="2018-01-03T13:10:10+01:00")
)
writer.write(
TestUploadNotes.make_docref(
"D3", enc_id="E1", text="DocRef 3", date="2018-01-03T13:10:20Z", period_start="2018"
)
)
await self.run_upload_notes(input_path=tmpdir, philter="disable")

@respx.mock(assert_all_mocked=False)
async def test_combined_encounter_offsets(self, respx_mock):
# use server notes just for ease of making fake ones
self.mock_read_url(respx_mock, "D1", enc_id="43")
self.mock_read_url(respx_mock, "D2", enc_id="43")
respx_mock.post(os.environ["URL_CTAKES_REST"]).pass_through() # ignore cTAKES
notes = self.ls_client.push_tasks.call_args[0][0]
self.assertEqual(1, len(notes))
note = notes[0]

with tempfile.NamedTemporaryFile() as file:
self.write_real_docrefs(file.name, ["D1", "D2"])
await self.run_upload_notes(input_path="https://localhost", docrefs=file.name)
# The order will be oldest->newest (None placed last)
self.assertEqual(
self.wrap_note("Document", "DocRef 3", date="01/01/18")
+ self.wrap_note("Document", "DocRef 2", date="01/03/18 13:10:10", first=False)
+ self.wrap_note("Document", "DocRef 1", first=False),
note.text,
)

async def test_grouped_encounter_offsets(self):
with tempfile.TemporaryDirectory() as tmpdir:
with common.NdjsonWriter(f"{tmpdir}/DocumentReference.ndjson") as writer:
writer.write(TestUploadNotes.make_docref("D1", enc_id="43"))
writer.write(TestUploadNotes.make_docref("D2", enc_id="43"))
await self.run_upload_notes(input_path=tmpdir)

notes = self.ls_client.push_tasks.call_args[0][0]
self.assertEqual(1, len(notes))
Expand All @@ -390,19 +429,19 @@ async def test_combined_encounter_offsets(self, respx_mock):
self.assertEqual({"D1": ANON_D1, "D2": ANON_D2}, note.doc_mappings)

# Did we mark the internal docref spans correctly?
first_span = (93, 107)
second_span = (285, 299)
first_span = (106, 120)
second_span = (311, 325)
self.assertEqual("What's up doc?", note.text[first_span[0] : first_span[1]])
self.assertEqual("What's up doc?", note.text[second_span[0] : second_span[1]])
self.assertEqual({"D1": first_span, "D2": second_span}, note.doc_spans)

# Did we edit cTAKES results correctly?
match1a = (93, 99)
match1b = (100, 102)
match1c = (103, 107)
match2a = (285, 291)
match2b = (292, 294)
match2c = (295, 299)
match1a = (106, 112)
match1b = (113, 115)
match1c = (116, 120)
match2a = (311, 317)
match2b = (318, 320)
match2c = (321, 325)
self.assertEqual("What's", note.text[match1a[0] : match1a[1]])
self.assertEqual("up", note.text[match1b[0] : match1b[1]])
self.assertEqual("doc?", note.text[match1c[0] : match1c[1]])
Expand Down
7 changes: 3 additions & 4 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,20 @@
import unittest
from unittest import mock

import freezegun
import httpx
import respx
import time_machine

from cumulus_etl.formats.deltalake import DeltaLakeFormat

# Pass a non-UTC time to freezegun to help notice any bad timezone handling.
# Pass a non-UTC time to time-machine to help notice any bad timezone handling.
# But only bother exposing the UTC version to other test code, since that's what will be most useful/common.
_FROZEN_TIME = datetime.datetime(2021, 9, 15, 1, 23, 45, tzinfo=datetime.timezone(datetime.timedelta(hours=4)))
FROZEN_TIME_UTC = _FROZEN_TIME.astimezone(datetime.timezone.utc)


# Several tests involve timestamps in some form, so just pick a standard time for all tests.
# We ignore socketserver because it checks the result of time() when evaluating timeouts.
@freezegun.freeze_time(_FROZEN_TIME, ignore=["socketserver"])
@time_machine.travel(_FROZEN_TIME, tick=False)
class AsyncTestCase(unittest.IsolatedAsyncioTestCase):
"""
Test case to hold some common code (suitable for async *OR* sync tests)
Expand Down

0 comments on commit 9800ef1

Please sign in to comment.