-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
184 additions
and
139 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,13 @@ | ||
import base64 | ||
import contextlib | ||
import io | ||
import json | ||
import time | ||
from unittest import mock | ||
from urllib.error import HTTPError | ||
|
||
import chalice | ||
import jwt | ||
import pytest | ||
import yaml | ||
from botocore.exceptions import ClientError | ||
|
@@ -41,6 +44,25 @@ def user_profile(): | |
) | ||
|
||
|
||
@pytest.fixture | ||
def bearer_token(private_key): | ||
return jwt.encode( | ||
{ | ||
"type": "User", | ||
"uid": "test_user", | ||
"exp": int(time.time()) + 50_000, | ||
"iat": int(time.time()), | ||
"iss": "https://uat.urs.earthdata.nasa.gov", | ||
"identity_provider": "edl_uat", | ||
"acr": "edl", | ||
"assurance_level": 3, | ||
}, | ||
private_key, | ||
headers={"origin": "Earthdata Login", "sig": "edlfakepubkey_uat"}, | ||
algorithm="RS256", | ||
) | ||
|
||
|
||
@pytest.fixture | ||
def _clear_caches(): | ||
app.get_bc_config_client.cache.clear() | ||
|
@@ -134,11 +156,9 @@ def test_request_authorizer_no_headers(current_request, mock_get_urs_url): | |
assert authorizer.get_success_response_headers() == {} | ||
|
||
|
||
@mock.patch(f"{MODULE}.get_user_from_token", autospec=True) | ||
@mock.patch(f"{MODULE}.get_new_token_and_profile", autospec=True) | ||
@mock.patch(f"{MODULE}.get_profile_with_jwt_bearer", autospec=True) | ||
def test_request_authorizer_bearer_header( | ||
mock_get_new_token_and_profile, | ||
mock_get_user_from_token, | ||
mock_get_profile_with_jwt_bearer, | ||
_clear_caches, | ||
current_request, | ||
): | ||
|
@@ -147,20 +167,12 @@ def test_request_authorizer_bearer_header( | |
"x-origin-request-id": "origin_request_id" | ||
} | ||
mock_user_profile = mock.Mock() | ||
mock_get_new_token_and_profile.return_value = mock_user_profile | ||
mock_get_user_from_token.return_value = "user_name" | ||
mock_get_profile_with_jwt_bearer.return_value = mock_user_profile | ||
|
||
authorizer = app.RequestAuthorizer() | ||
|
||
assert authorizer.get_profile() == mock_user_profile | ||
mock_get_new_token_and_profile.assert_called_once_with( | ||
"user_name", | ||
True, | ||
aux_headers={ | ||
"x-request-id": "request_1234", | ||
"x-origin-request-id": "origin_request_id" | ||
} | ||
) | ||
mock_get_profile_with_jwt_bearer.assert_called_once_with("token") | ||
|
||
|
||
@mock.patch(f"{MODULE}.do_auth_and_return", autospec=True) | ||
|
@@ -182,17 +194,22 @@ def test_request_authorizer_basic_header( | |
assert authorizer.get_error_response() == mock_response | ||
|
||
|
||
@mock.patch(f"{MODULE}.get_user_from_token", autospec=True) | ||
@mock.patch(f"{MODULE}.get_profile_with_jwt_bearer", autospec=True) | ||
def test_request_authorizer_bearer_header_eula_error( | ||
mock_get_user_from_token, | ||
mock_get_profile_with_jwt_bearer, | ||
_clear_caches, | ||
current_request, | ||
): | ||
current_request.headers = {"Authorization": "Bearer token"} | ||
mock_get_user_from_token.side_effect = EulaException( | ||
mock_get_profile_with_jwt_bearer.side_effect = EulaException( | ||
HTTPError("", 403, "Forbidden", {}, io.StringIO()), | ||
{}, | ||
"", | ||
{ | ||
"status_code": 403, | ||
"error": "invalid_token", | ||
"error_description": "EULA Acceptance Failure", | ||
"resolution_url": "http://example", | ||
}, | ||
None, | ||
) | ||
|
||
authorizer = app.RequestAuthorizer() | ||
|
@@ -204,9 +221,9 @@ def test_request_authorizer_bearer_header_eula_error( | |
assert response.headers == {} | ||
|
||
|
||
@mock.patch(f"{MODULE}.get_user_from_token", autospec=True) | ||
@mock.patch(f"{MODULE}.get_profile_with_jwt_bearer", autospec=True) | ||
def test_request_authorizer_bearer_header_eula_error_browser( | ||
mock_get_user_from_token, | ||
mock_get_profile_with_jwt_bearer, | ||
mock_make_html_response, | ||
_clear_caches, | ||
current_request, | ||
|
@@ -220,7 +237,7 @@ def test_request_authorizer_bearer_header_eula_error_browser( | |
"error_description": "EULA Acceptance Failure", | ||
"resolution_url": "http://resolution_url", | ||
} | ||
mock_get_user_from_token.side_effect = EulaException( | ||
mock_get_profile_with_jwt_bearer.side_effect = EulaException( | ||
HTTPError("", 403, "Forbidden", {}, None), | ||
msg, | ||
None, | ||
|
@@ -249,13 +266,11 @@ def test_request_authorizer_bearer_header_eula_error_browser( | |
assert response.headers == {"Content-Type": "text/html"} | ||
|
||
|
||
@mock.patch(f"{MODULE}.get_user_from_token", autospec=True) | ||
@mock.patch(f"{MODULE}.get_new_token_and_profile", autospec=True) | ||
@mock.patch(f"{MODULE}.get_profile_with_jwt_bearer", autospec=True) | ||
@mock.patch(f"{MODULE}.do_auth_and_return", autospec=True) | ||
def test_request_authorizer_bearer_header_no_profile( | ||
def test_request_authorizer_bearer_header_other_error( | ||
mock_do_auth_and_return, | ||
mock_get_new_token_and_profile, | ||
mock_get_user_from_token, | ||
mock_get_profile_with_jwt_bearer, | ||
_clear_caches, | ||
current_request, | ||
): | ||
|
@@ -265,29 +280,21 @@ def test_request_authorizer_bearer_header_no_profile( | |
} | ||
mock_response = mock.Mock() | ||
mock_do_auth_and_return.return_value = mock_response | ||
mock_get_new_token_and_profile.return_value = None | ||
mock_get_user_from_token.return_value = "user_name" | ||
mock_get_profile_with_jwt_bearer.side_effect = Exception("test exception") | ||
|
||
authorizer = app.RequestAuthorizer() | ||
|
||
assert authorizer.get_profile() is None | ||
assert authorizer.get_error_response() == mock_response | ||
mock_get_new_token_and_profile.assert_called_once_with( | ||
"user_name", | ||
True, | ||
aux_headers={ | ||
"x-request-id": "request_1234", | ||
"x-origin-request-id": "origin_request_id" | ||
} | ||
) | ||
mock_get_profile_with_jwt_bearer.assert_called_once_with("token") | ||
mock_do_auth_and_return.assert_called_once_with(current_request.context) | ||
|
||
|
||
@mock.patch(f"{MODULE}.get_user_from_token", autospec=True) | ||
@mock.patch(f"{MODULE}.get_profile_with_jwt_bearer", autospec=True) | ||
@mock.patch(f"{MODULE}.do_auth_and_return", autospec=True) | ||
def test_request_authorizer_bearer_header_no_user_id( | ||
mock_do_auth_and_return, | ||
mock_get_user_from_token, | ||
mock_get_profile_with_jwt_bearer, | ||
_clear_caches, | ||
current_request, | ||
): | ||
|
@@ -297,7 +304,7 @@ def test_request_authorizer_bearer_header_no_user_id( | |
} | ||
mock_response = mock.Mock() | ||
mock_do_auth_and_return.return_value = mock_response | ||
mock_get_user_from_token.side_effect = EdlException(KeyError("uid"), {}, None) | ||
mock_get_profile_with_jwt_bearer.side_effect = KeyError("uid") | ||
|
||
authorizer = app.RequestAuthorizer() | ||
|
||
|
@@ -341,37 +348,67 @@ def test_check_for_browser(): | |
assert app.check_for_browser({"user-agent": "Not a valid user agent"}) is False | ||
|
||
|
||
def test_get_user_from_token(mock_request, mock_get_urs_creds, current_request): | ||
def test_get_profile_with_jwt_bearer( | ||
mock_request, | ||
mock_get_urs_creds, | ||
bearer_token, | ||
current_request, | ||
): | ||
del current_request | ||
|
||
payload = '{"uid": "user_name"}' | ||
payload = json.dumps({ | ||
"uid": "test_user", | ||
"user_groups": [], | ||
"first_name": "John", | ||
"last_name": "Smith", | ||
"email_address": "[email protected]", | ||
}) | ||
mock_response = mock.MagicMock() | ||
with mock_response as mock_f: | ||
mock_f.read.return_value = payload | ||
mock_response.code = 200 | ||
mock_request.urlopen.return_value = mock_response | ||
|
||
assert app.get_user_from_token("token") == "user_name" | ||
profile = app.get_profile_with_jwt_bearer(bearer_token) | ||
assert profile.user_id == "test_user" | ||
assert profile.token == bearer_token | ||
mock_get_urs_creds.assert_called_once() | ||
|
||
|
||
def test_get_user_from_token_eula_error(mock_request, mock_get_urs_creds, current_request): | ||
def test_get_profile_with_jwt_bearer_eula_error( | ||
mock_request, | ||
mock_get_urs_creds, | ||
bearer_token, | ||
current_request, | ||
): | ||
del current_request | ||
|
||
payload = """{ | ||
"status_code": 403, | ||
"error": "invalid_token", | ||
"error_description": "EULA Acceptance Failure", | ||
"resolution_url": "http://uat.urs.earthdata.nasa.gov/approve_app?client_id=asdf" | ||
} | ||
""" | ||
mock_request.urlopen.side_effect = HTTPError("", 403, "Forbidden", {}, io.StringIO(payload)) | ||
mock_request.urlopen.side_effect = HTTPError( | ||
"", | ||
403, | ||
"Forbidden", | ||
{}, | ||
io.StringIO(payload), | ||
) | ||
|
||
with pytest.raises(EulaException): | ||
app.get_user_from_token("token") | ||
app.get_profile_with_jwt_bearer(bearer_token) | ||
mock_get_urs_creds.assert_called_once() | ||
|
||
|
||
def test_get_user_from_token_other_error(mock_request, mock_get_urs_creds, current_request): | ||
def test_get_profile_with_jwt_bearer_other_error( | ||
mock_request, | ||
mock_get_urs_creds, | ||
bearer_token, | ||
current_request, | ||
): | ||
del current_request | ||
|
||
payload = """{ | ||
|
@@ -380,21 +417,39 @@ def test_get_user_from_token_other_error(mock_request, mock_get_urs_creds, curre | |
"error_description": "some error description" | ||
} | ||
""" | ||
mock_request.urlopen.side_effect = HTTPError("", 401, "Bad Request", {}, io.StringIO(payload)) | ||
mock_request.urlopen.side_effect = HTTPError( | ||
"", | ||
401, | ||
"Bad Request", | ||
{}, | ||
io.StringIO(payload), | ||
) | ||
|
||
with pytest.raises(EdlException): | ||
assert app.get_user_from_token("token") | ||
assert app.get_profile_with_jwt_bearer(bearer_token) | ||
mock_get_urs_creds.assert_called_once() | ||
|
||
|
||
@pytest.mark.parametrize("code", (200, 403, 500)) | ||
def test_get_user_from_token_json_error(mock_request, mock_get_urs_creds, current_request, code): | ||
def test_get_profile_with_jwt_bearer_json_error( | ||
mock_request, | ||
mock_get_urs_creds, | ||
bearer_token, | ||
current_request, | ||
code, | ||
): | ||
del current_request | ||
|
||
mock_request.urlopen.side_effect = HTTPError("", code, "Message", {}, io.StringIO("not valid json")) | ||
mock_request.urlopen.side_effect = HTTPError( | ||
"", | ||
code, | ||
"Message", | ||
{}, | ||
io.StringIO("not valid json"), | ||
) | ||
|
||
with pytest.raises(EdlException): | ||
assert app.get_user_from_token("token") | ||
assert app.get_profile_with_jwt_bearer(bearer_token) is not None | ||
mock_get_urs_creds.assert_called_once() | ||
|
||
|
||
|
@@ -1553,27 +1608,28 @@ def test_dynamic_url_directory( | |
@mock.patch(f"{MODULE}.get_yaml_file", autospec=True) | ||
@mock.patch(f"{MODULE}.get_api_request_uuid", autospec=True) | ||
@mock.patch(f"{MODULE}.try_download_from_bucket", autospec=True) | ||
@mock.patch(f"{MODULE}.get_user_from_token", autospec=True) | ||
@mock.patch(f"{MODULE}.get_new_token_and_profile", autospec=True) | ||
@mock.patch(f"{MODULE}.get_profile_with_jwt_bearer", autospec=True) | ||
@mock.patch(f"{MODULE}.JwtManager.get_profile_from_headers", autospec=True) | ||
@mock.patch(f"{MODULE}.JwtManager.get_header_to_set_auth_cookie", autospec=True) | ||
@mock.patch(f"{MODULE}.JWT_COOKIE_NAME", "asf-cookie") | ||
@mock.patch(f"{MODULE}.b_map", None) | ||
def test_dynamic_url_bearer_auth( | ||
mock_get_header_to_set_auth_cookie, | ||
mock_get_profile_from_headers, | ||
mock_get_new_token_and_profile, | ||
mock_get_user_from_token, | ||
mock_get_profile_with_jwt_bearer, | ||
mock_try_download_from_bucket, | ||
mock_get_api_request_uuid, | ||
mock_get_yaml_file, | ||
data_path, | ||
user_profile, | ||
current_request, | ||
): | ||
mock_try_download_from_bucket.return_value = chalice.Response(body="Mock response", headers={}, status_code=200) | ||
mock_get_new_token_and_profile.return_value = user_profile | ||
mock_get_user_from_token.return_value = user_profile.user_id | ||
mock_try_download_from_bucket.return_value = chalice.Response( | ||
body="Mock response", | ||
headers={}, | ||
status_code=200, | ||
) | ||
mock_get_profile_with_jwt_bearer.return_value = user_profile | ||
mock_get_header_to_set_auth_cookie.return_value = {"SET-COOKIE": "cookie"} | ||
with open(data_path / "bucket_map_example.yaml") as f: | ||
mock_get_yaml_file.return_value = yaml.full_load(f) | ||
|
Oops, something went wrong.