-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
28ab011
commit 7dcafc3
Showing
4 changed files
with
197 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
import os | ||
import unittest | ||
from urllib.error import URLError, HTTPError | ||
|
||
from uid2_client import IdentityMapClient, IdentityMapInput, normalize_and_hash_email, normalize_and_hash_phone | ||
|
||
|
||
class IdentityMapIntegrationTests(unittest.TestCase): | ||
UID2_BASE_URL = None | ||
UID2_API_KEY = None | ||
UID2_SECRET_KEY = None | ||
|
||
identity_map_client = None | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
cls.UID2_BASE_URL = os.getenv("UID2_BASE_URL") | ||
cls.UID2_API_KEY = os.getenv("UID2_API_KEY") | ||
cls.UID2_SECRET_KEY = os.getenv("UID2_SECRET_KEY") | ||
|
||
print(cls.UID2_BASE_URL, cls.UID2_API_KEY, cls.UID2_SECRET_KEY) | ||
|
||
if cls.UID2_BASE_URL and cls.UID2_API_KEY and cls.UID2_SECRET_KEY: | ||
cls.identity_map_client = IdentityMapClient(cls.UID2_BASE_URL, cls.UID2_API_KEY, cls.UID2_SECRET_KEY) | ||
else: | ||
raise Exception("set the required UID2_BASE_URL/UID2_API_KEY/UID2_SECRET_KEY environment variables first") | ||
|
||
def test_identity_map_emails(self): | ||
identity_map_input = IdentityMapInput.from_emails( | ||
["[email protected]", "[email protected]", "[email protected]"]) | ||
response = self.identity_map_client.generate_identity_map(identity_map_input) | ||
self.assert_mapped(response, "[email protected]") | ||
self.assert_mapped(response, "[email protected]") | ||
|
||
self.assert_unmapped(response, "optout", "[email protected]") | ||
|
||
def test_identity_map_nothing_unmapped(self): | ||
identity_map_input = IdentityMapInput.from_emails( | ||
["[email protected]", "[email protected]"]) | ||
response = self.identity_map_client.generate_identity_map(identity_map_input) | ||
self.assert_mapped(response, "[email protected]") | ||
self.assert_mapped(response, "[email protected]") | ||
|
||
def test_identity_map_nothing_mapped(self): | ||
identity_map_input = IdentityMapInput.from_emails(["[email protected]"]) | ||
response = self.identity_map_client.generate_identity_map(identity_map_input) | ||
self.assert_unmapped(response, "optout", "[email protected]") | ||
|
||
def test_identity_map_invalid_email(self): | ||
self.assertRaises(ValueError, IdentityMapInput.from_emails, | ||
["[email protected]", "this is not an email"]) | ||
|
||
def test_identity_map_invalid_phone(self): | ||
self.assertRaises(ValueError, IdentityMapInput.from_phones, | ||
["+12345678901", "this is not a phone number"]) | ||
|
||
def test_identity_map_invalid_hashed_email(self): | ||
identity_map_input = IdentityMapInput.from_hashed_emails(["this is not a hashed email"]) | ||
response = self.identity_map_client.generate_identity_map(identity_map_input) | ||
self.assert_unmapped(response, "invalid identifier", "this is not a hashed email") | ||
|
||
def test_identity_map_invalid_hashed_phone(self): | ||
identity_map_input = IdentityMapInput.from_hashed_emails(["this is not a hashed phone"]) | ||
response = self.identity_map_client.generate_identity_map(identity_map_input) | ||
self.assert_unmapped(response, "invalid identifier", "this is not a hashed phone") | ||
|
||
def test_identity_map_hashed_emails(self): | ||
hashed_email1 = normalize_and_hash_email("[email protected]") | ||
hashed_email2 = normalize_and_hash_email("[email protected]") | ||
hashed_opted_out_email = normalize_and_hash_email("[email protected]") | ||
identity_map_input = IdentityMapInput.from_hashed_emails([hashed_email1, hashed_email2, hashed_opted_out_email]) | ||
|
||
response = self.identity_map_client.generate_identity_map(identity_map_input) | ||
|
||
self.assert_mapped(response, hashed_email1) | ||
self.assert_mapped(response, hashed_email2) | ||
|
||
self.assert_unmapped(response, "optout", hashed_opted_out_email) | ||
|
||
def test_identity_map_duplicate_emails(self): | ||
identity_map_input = IdentityMapInput.from_emails( | ||
["[email protected]", "[email protected]", "[email protected]", "[email protected]", | ||
"[email protected]"]) | ||
response = self.identity_map_client.generate_identity_map(identity_map_input) | ||
|
||
mapped_identities = response.mapped_identities | ||
self.assertEqual(4, len(mapped_identities)) | ||
|
||
raw_uid = mapped_identities.get("[email protected]").get_raw_id() | ||
self.assertEqual(raw_uid, mapped_identities.get("[email protected]").get_raw_id()) | ||
self.assertEqual(raw_uid, mapped_identities.get("[email protected]").get_raw_id()) | ||
self.assertEqual(raw_uid, mapped_identities.get("[email protected]").get_raw_id()) | ||
|
||
def test_identity_map_duplicate_hashed_emails(self): | ||
hashed_email = normalize_and_hash_email("[email protected]") | ||
duplicate_hashed_email = hashed_email | ||
hashed_opted_out_email = normalize_and_hash_email("[email protected]") | ||
duplicate_hashed_opted_out_email = hashed_opted_out_email | ||
|
||
identity_map_input = IdentityMapInput.from_hashed_emails( | ||
[hashed_email, duplicate_hashed_email, hashed_opted_out_email, duplicate_hashed_opted_out_email]) | ||
response = self.identity_map_client.generate_identity_map(identity_map_input) | ||
|
||
self.assert_mapped(response, hashed_email) | ||
self.assert_mapped(response, duplicate_hashed_email) | ||
|
||
self.assert_unmapped(response, "optout", hashed_opted_out_email) | ||
self.assert_unmapped(response, "optout", duplicate_hashed_opted_out_email) | ||
|
||
'''def test_identity_map_empty_input(self): | ||
identity_map_input = IdentityMapInput.from_emails([]) | ||
response = self.identity_map_client.generate_identity_map(identity_map_input) | ||
self.assertIsNone(response.mapped_identities) | ||
self.assertIsNone(response.unmapped_identities)''' | ||
|
||
def test_identity_map_phones(self): | ||
identity_map_input = IdentityMapInput.from_phones(["+12345678901", "+98765432109", "+00000000000"]) | ||
response = self.identity_map_client.generate_identity_map(identity_map_input) | ||
self.assert_mapped(response, "+12345678901") | ||
self.assert_mapped(response, "+98765432109") | ||
|
||
self.assert_unmapped(response, "optout", "+00000000000") | ||
|
||
def test_identity_map_hashed_phones(self): | ||
hashed_phone1 = normalize_and_hash_phone("+12345678901") | ||
hashed_phone2 = normalize_and_hash_phone("+98765432109") | ||
hashed_opted_out_phone = normalize_and_hash_phone("+00000000000") | ||
identity_map_input = IdentityMapInput.from_hashed_phones([hashed_phone1, hashed_phone2, hashed_opted_out_phone]) | ||
response = self.identity_map_client.generate_identity_map(identity_map_input) | ||
self.assert_mapped(response, hashed_phone1) | ||
self.assert_mapped(response, hashed_phone2) | ||
|
||
self.assert_unmapped(response, "optout", hashed_opted_out_phone) | ||
|
||
def test_identity_map_bad_url(self): | ||
identity_map_input = IdentityMapInput.from_emails( | ||
["[email protected]", "[email protected]", "[email protected]"]) | ||
client = IdentityMapClient("https://operator-bad-url.uidapi.com", os.getenv("UID2_API_KEY"), os.getenv("UID2_SECRET_KEY")) | ||
self.assertRaises(URLError, client.generate_identity_map, identity_map_input) | ||
|
||
def test_identity_map_bad_api_key(self): | ||
identity_map_input = IdentityMapInput.from_emails( | ||
["[email protected]", "[email protected]", "[email protected]"]) | ||
client = IdentityMapClient(os.getenv("UID2_BASE_URL"), "bad-api-key", os.getenv("UID2_SECRET_KEY")) | ||
self.assertRaises(HTTPError, client.generate_identity_map,identity_map_input) | ||
|
||
def test_identity_map_bad_secret(self): | ||
identity_map_input = IdentityMapInput.from_emails( | ||
["[email protected]", "[email protected]", "[email protected]"]) | ||
client = IdentityMapClient(os.getenv("UID2_BASE_URL"), os.getenv("UID2_API_KEY"), "wJ0hP19QU4hmpB64Y3fV2dAed8t/mupw3sjN5jNRFzg=") | ||
self.assertRaises(HTTPError, client.generate_identity_map, | ||
identity_map_input) | ||
|
||
def assert_mapped(self, response, ddi): | ||
mapped_identity = response.mapped_identities.get(ddi) | ||
self.assertIsNotNone(mapped_identity) | ||
self.assertIsNotNone(mapped_identity.get_raw_id()) | ||
self.assertIsNotNone(mapped_identity.get_bucket_id()) | ||
|
||
unmapped_identity = response.unmapped_identities.get(ddi) | ||
self.assertIsNone(unmapped_identity) | ||
|
||
def assert_unmapped(self, response, reason, ddi): | ||
unmapped_identity = response.unmapped_identities.get(ddi) | ||
self.assertEqual(reason, unmapped_identity.get_reason()) | ||
|
||
mapped_identity = response.mapped_identities.get(ddi) | ||
self.assertIsNone(mapped_identity) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters