From 7dcafc323142078b62b392ee4ca8bd2e60ecd11d Mon Sep 17 00:00:00 2001 From: Caroline6312 Date: Wed, 15 May 2024 10:23:00 -0700 Subject: [PATCH] Add unit tests --- tests/test_identity_map_client.py | 172 +++++++++++++++++++++++++++ uid2_client/identity_map_input.py | 20 ++-- uid2_client/identity_map_response.py | 2 +- uid2_client/input_util.py | 13 ++ 4 files changed, 197 insertions(+), 10 deletions(-) create mode 100644 tests/test_identity_map_client.py diff --git a/tests/test_identity_map_client.py b/tests/test_identity_map_client.py new file mode 100644 index 0000000..4f054a4 --- /dev/null +++ b/tests/test_identity_map_client.py @@ -0,0 +1,172 @@ +import os +import unittest +from urllib.error import URLError, HTTPError + +from uid2_client import IdentityMapClient, IdentityMapInput, normalize_and_hash_email, normalize_and_hash_phone + + +class IdentityMapIntegrationTests(unittest.TestCase): + UID2_BASE_URL = None + UID2_API_KEY = None + UID2_SECRET_KEY = None + + identity_map_client = None + + @classmethod + def setUpClass(cls): + cls.UID2_BASE_URL = os.getenv("UID2_BASE_URL") + cls.UID2_API_KEY = os.getenv("UID2_API_KEY") + cls.UID2_SECRET_KEY = os.getenv("UID2_SECRET_KEY") + + print(cls.UID2_BASE_URL, cls.UID2_API_KEY, cls.UID2_SECRET_KEY) + + if cls.UID2_BASE_URL and cls.UID2_API_KEY and cls.UID2_SECRET_KEY: + cls.identity_map_client = IdentityMapClient(cls.UID2_BASE_URL, cls.UID2_API_KEY, cls.UID2_SECRET_KEY) + else: + raise Exception("set the required UID2_BASE_URL/UID2_API_KEY/UID2_SECRET_KEY environment variables first") + + def test_identity_map_emails(self): + identity_map_input = IdentityMapInput.from_emails( + ["hopefully-not-opted-out@example.com", "somethingelse@example.com", "optout@example.com"]) + response = self.identity_map_client.generate_identity_map(identity_map_input) + self.assert_mapped(response, "hopefully-not-opted-out@example.com") + self.assert_mapped(response, "somethingelse@example.com") + + self.assert_unmapped(response, "optout", "optout@example.com") + + def test_identity_map_nothing_unmapped(self): + identity_map_input = IdentityMapInput.from_emails( + ["hopefully-not-opted-out@example.com", "somethingelse@example.com"]) + response = self.identity_map_client.generate_identity_map(identity_map_input) + self.assert_mapped(response, "hopefully-not-opted-out@example.com") + self.assert_mapped(response, "somethingelse@example.com") + + def test_identity_map_nothing_mapped(self): + identity_map_input = IdentityMapInput.from_emails(["optout@example.com"]) + response = self.identity_map_client.generate_identity_map(identity_map_input) + self.assert_unmapped(response, "optout", "optout@example.com") + + def test_identity_map_invalid_email(self): + self.assertRaises(ValueError, IdentityMapInput.from_emails, + ["email@example.com", "this is not an email"]) + + def test_identity_map_invalid_phone(self): + self.assertRaises(ValueError, IdentityMapInput.from_phones, + ["+12345678901", "this is not a phone number"]) + + def test_identity_map_invalid_hashed_email(self): + identity_map_input = IdentityMapInput.from_hashed_emails(["this is not a hashed email"]) + response = self.identity_map_client.generate_identity_map(identity_map_input) + self.assert_unmapped(response, "invalid identifier", "this is not a hashed email") + + def test_identity_map_invalid_hashed_phone(self): + identity_map_input = IdentityMapInput.from_hashed_emails(["this is not a hashed phone"]) + response = self.identity_map_client.generate_identity_map(identity_map_input) + self.assert_unmapped(response, "invalid identifier", "this is not a hashed phone") + + def test_identity_map_hashed_emails(self): + hashed_email1 = normalize_and_hash_email("hopefully-not-opted-out@example.com") + hashed_email2 = normalize_and_hash_email("somethingelse@example.com") + hashed_opted_out_email = normalize_and_hash_email("optout@example.com") + identity_map_input = IdentityMapInput.from_hashed_emails([hashed_email1, hashed_email2, hashed_opted_out_email]) + + response = self.identity_map_client.generate_identity_map(identity_map_input) + + self.assert_mapped(response, hashed_email1) + self.assert_mapped(response, hashed_email2) + + self.assert_unmapped(response, "optout", hashed_opted_out_email) + + def test_identity_map_duplicate_emails(self): + identity_map_input = IdentityMapInput.from_emails( + ["JANE.SAOIRSE@gmail.com", "Jane.Saoirse@gmail.com", "JaneSaoirse+UID2@gmail.com", "janesaoirse@gmail.com", + "JANE.SAOIRSE@gmail.com"]) + response = self.identity_map_client.generate_identity_map(identity_map_input) + + mapped_identities = response.mapped_identities + self.assertEqual(4, len(mapped_identities)) + + raw_uid = mapped_identities.get("JANE.SAOIRSE@gmail.com").get_raw_id() + self.assertEqual(raw_uid, mapped_identities.get("Jane.Saoirse@gmail.com").get_raw_id()) + self.assertEqual(raw_uid, mapped_identities.get("JaneSaoirse+UID2@gmail.com").get_raw_id()) + self.assertEqual(raw_uid, mapped_identities.get("janesaoirse@gmail.com").get_raw_id()) + + def test_identity_map_duplicate_hashed_emails(self): + hashed_email = normalize_and_hash_email("hopefully-not-opted-out@example.com") + duplicate_hashed_email = hashed_email + hashed_opted_out_email = normalize_and_hash_email("optout@example.com") + duplicate_hashed_opted_out_email = hashed_opted_out_email + + identity_map_input = IdentityMapInput.from_hashed_emails( + [hashed_email, duplicate_hashed_email, hashed_opted_out_email, duplicate_hashed_opted_out_email]) + response = self.identity_map_client.generate_identity_map(identity_map_input) + + self.assert_mapped(response, hashed_email) + self.assert_mapped(response, duplicate_hashed_email) + + self.assert_unmapped(response, "optout", hashed_opted_out_email) + self.assert_unmapped(response, "optout", duplicate_hashed_opted_out_email) + + '''def test_identity_map_empty_input(self): + identity_map_input = IdentityMapInput.from_emails([]) + response = self.identity_map_client.generate_identity_map(identity_map_input) + self.assertIsNone(response.mapped_identities) + self.assertIsNone(response.unmapped_identities)''' + + def test_identity_map_phones(self): + identity_map_input = IdentityMapInput.from_phones(["+12345678901", "+98765432109", "+00000000000"]) + response = self.identity_map_client.generate_identity_map(identity_map_input) + self.assert_mapped(response, "+12345678901") + self.assert_mapped(response, "+98765432109") + + self.assert_unmapped(response, "optout", "+00000000000") + + def test_identity_map_hashed_phones(self): + hashed_phone1 = normalize_and_hash_phone("+12345678901") + hashed_phone2 = normalize_and_hash_phone("+98765432109") + hashed_opted_out_phone = normalize_and_hash_phone("+00000000000") + identity_map_input = IdentityMapInput.from_hashed_phones([hashed_phone1, hashed_phone2, hashed_opted_out_phone]) + response = self.identity_map_client.generate_identity_map(identity_map_input) + self.assert_mapped(response, hashed_phone1) + self.assert_mapped(response, hashed_phone2) + + self.assert_unmapped(response, "optout", hashed_opted_out_phone) + + def test_identity_map_bad_url(self): + identity_map_input = IdentityMapInput.from_emails( + ["hopefully-not-opted-out@example.com", "somethingelse@example.com", "optout@example.com"]) + client = IdentityMapClient("https://operator-bad-url.uidapi.com", os.getenv("UID2_API_KEY"), os.getenv("UID2_SECRET_KEY")) + self.assertRaises(URLError, client.generate_identity_map, identity_map_input) + + def test_identity_map_bad_api_key(self): + identity_map_input = IdentityMapInput.from_emails( + ["hopefully-not-opted-out@example.com", "somethingelse@example.com", "optout@example.com"]) + client = IdentityMapClient(os.getenv("UID2_BASE_URL"), "bad-api-key", os.getenv("UID2_SECRET_KEY")) + self.assertRaises(HTTPError, client.generate_identity_map,identity_map_input) + + def test_identity_map_bad_secret(self): + identity_map_input = IdentityMapInput.from_emails( + ["hopefully-not-opted-out@example.com", "somethingelse@example.com", "optout@example.com"]) + client = IdentityMapClient(os.getenv("UID2_BASE_URL"), os.getenv("UID2_API_KEY"), "wJ0hP19QU4hmpB64Y3fV2dAed8t/mupw3sjN5jNRFzg=") + self.assertRaises(HTTPError, client.generate_identity_map, + identity_map_input) + + def assert_mapped(self, response, ddi): + mapped_identity = response.mapped_identities.get(ddi) + self.assertIsNotNone(mapped_identity) + self.assertIsNotNone(mapped_identity.get_raw_id()) + self.assertIsNotNone(mapped_identity.get_bucket_id()) + + unmapped_identity = response.unmapped_identities.get(ddi) + self.assertIsNone(unmapped_identity) + + def assert_unmapped(self, response, reason, ddi): + unmapped_identity = response.unmapped_identities.get(ddi) + self.assertEqual(reason, unmapped_identity.get_reason()) + + mapped_identity = response.mapped_identities.get(ddi) + self.assertIsNone(mapped_identity) + + +if __name__ == '__main__': + unittest.main() diff --git a/uid2_client/identity_map_input.py b/uid2_client/identity_map_input.py index 0cb7b5f..e401a7d 100644 --- a/uid2_client/identity_map_input.py +++ b/uid2_client/identity_map_input.py @@ -1,6 +1,7 @@ import json -from uid2_client import IdentityType, normalize_email_string, get_base64_encoded_hash, is_phone_number_normalized +from uid2_client import IdentityType, normalize_email_string, get_base64_encoded_hash, is_phone_number_normalized, \ + normalize_and_hash_email, normalize_and_hash_phone class IdentityMapInput: @@ -15,20 +16,15 @@ def __init__(self, identity_type, emails_or_phones, already_hashed): if already_hashed: self.hashed_normalized_emails.append(email) else: - normalized_email = normalize_email_string(email) - if normalized_email is None: - raise ValueError("invalid email address") - hashed_normalized_email = get_base64_encoded_hash(normalized_email) - self.hashed_normalized_emails.append(hashed_normalized_email) + hashed_normalized_email = normalize_and_hash_email(email) self._add_hashed_to_raw_dii_mapping(hashed_normalized_email, email) + self.hashed_normalized_emails.append(hashed_normalized_email) else: # phone for phone in emails_or_phones: if already_hashed: self.hashed_normalized_phones.append(phone) else: - if not is_phone_number_normalized(phone): - raise ValueError("phone number is not normalized: " + phone) - hashed_normalized_phone = get_base64_encoded_hash(phone) + hashed_normalized_phone = normalize_and_hash_phone(phone) self._add_hashed_to_raw_dii_mapping(hashed_normalized_phone, phone) self.hashed_normalized_phones.append(hashed_normalized_phone) @@ -51,6 +47,12 @@ def from_hashed_phones(hashed_phones): def _add_hashed_to_raw_dii_mapping(self, hashed_dii, raw_dii): self.hashed_dii_to_raw_diis.setdefault(hashed_dii, []).append(raw_dii) + def get_raw_diis(self, identifier): + if len(self.hashed_dii_to_raw_diis) <= 0: + return [identifier] + else: + return self.hashed_dii_to_raw_diis[identifier] + def get_identity_map_input_as_json_string(self): json_object = { "email_hash": self.hashed_normalized_emails, diff --git a/uid2_client/identity_map_response.py b/uid2_client/identity_map_response.py index cab7670..d7eaf4e 100644 --- a/uid2_client/identity_map_response.py +++ b/uid2_client/identity_map_response.py @@ -32,7 +32,7 @@ def _get_body_as_json(json_response): @staticmethod def _get_raw_diis(identity, identity_map_input): identifier = identity["identifier"] - return identity_map_input.hashed_dii_to_raw_diis[identifier] + return identity_map_input.get_raw_diis(identifier) def is_success(self): return self.status == "success" diff --git a/uid2_client/input_util.py b/uid2_client/input_util.py index b54c4a6..6d85f63 100644 --- a/uid2_client/input_util.py +++ b/uid2_client/input_util.py @@ -106,3 +106,16 @@ def get_base64_encoded_hash(input): def get_sha256_bytes(input): return hashlib.sha256(input.encode()).digest() + + +def normalize_and_hash_email(email): + normalized_email = normalize_email_string(email) + if normalized_email is None: + raise ValueError("invalid email address: " + email) + return get_base64_encoded_hash(normalized_email) + + +def normalize_and_hash_phone(phone): + if not is_phone_number_normalized(phone): + raise ValueError("phone number is not normalized: " + phone) + return get_base64_encoded_hash(phone)