-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadd-users-to-okta-group
executable file
·171 lines (143 loc) · 5.38 KB
/
add-users-to-okta-group
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env python
"""
Configurable via environment variables:
OKTA_API_SUBDOMAIN: an okta url subdomain like "example" for the url below
OKTA_API_URL: a url like https://example.okta.com/api/1
OKTA_API_KEY: a token from your Okta admin site at the path /admin/access/api/tokens
If a subdomain is provided it takes priority over a provided url.
This script then takes a minimum of two arguments:
the first being the name of a group and all
remaining arguments are usernames or emails of
users to add to that group.
"""
import os
import sys
from pprint import pprint
import requests
OKTA_API_SUBDOMAIN = os.environ.get('OKTA_API_SUBDOMAIN')
if OKTA_API_SUBDOMAIN:
OKTA_API_URL = f"https://{OKTA_API_SUBDOMAIN}.okta.com"
else:
OKTA_API_URL = os.environ.get('OKTA_API_URL')
OKTA_API_URL = OKTA_API_URL + '/api/v1'
OKTA_API_KEY = os.environ.get('OKTA_API_KEY')
OKTA_API_HEADERS = {
'Accept': 'application/json',
'Content-Type': 'application/json',
'Authorization': f"SSWS {OKTA_API_KEY}"
}
OKTA_GROUP_ALREADY_EXISTS_MESSAGE = \
'name: An object with this field already' \
'exists in the current organization'
USER_IDS = {}
GROUP_IDS = {}
GROUP_USERS = {}
class UnauthorizedError(Exception):
pass
def okta_api_get_request(path: str) -> requests.models.Response:
response = requests.get(
OKTA_API_URL + path,
headers=OKTA_API_HEADERS
)
if response.status_code == 401:
raise UnauthorizedError
return response
def get_okta_group_id(name: str) -> str:
if name in GROUP_IDS:
return GROUP_IDS[name]
response = okta_api_get_request(f"/groups?q={name}")
response_json = response.json()
group_id = ''
if response.status_code == 200:
if len(response_json) == 1:
group_id = response_json[0]['id']
else:
for group in response_json:
if group['profile']['name'] == name:
group_id = group['id']
if not group_id:
print(f"No group with name {name} found", file=sys.stderr)
GROUP_IDS[name] = group_id
return group_id
def add_member_to_okta_group(member: str, group_name: str) -> bool:
user_id = get_user_id(member)
group_id = get_okta_group_id(group_name)
if not user_id or not group_id:
return False
response = requests.put(
OKTA_API_URL + f"/groups/{group_id}/users/{user_id}",
headers=OKTA_API_HEADERS
)
if response.status_code == 204:
print(f"Added {member} to group {group_name}", file=sys.stderr)
return True
elif response.status_code == 401:
raise UnauthorizedError
else:
print(f"Adding {member} to group {group_name} did not return 204",
file=sys.stderr)
pprint(response, stream=sys.stderr)
return False
def check_if_user_in_okta_group(user: str, group_name: str) -> bool:
group_id = get_okta_group_id(group_name)
user_id = get_user_id(user)
if not group_id:
return False
users = get_okta_group_users(group_id)
return user_id in [m['id'] for m in users]
def get_okta_group_users(group_id: str) -> list:
if id in GROUP_USERS:
return GROUP_USERS[id]
response = okta_api_get_request(f"/groups/{group_id}/users?limit=200")
if response.status_code != 200:
print('Failed to get group membership.', file=sys.stderr)
return False
users = response.json()
while 'rel="next"' in response.headers['Link']:
response = okta_api_get_request(f"/groups/{group_id}/users?limit=200&after={users[-1]['id']}")
if response.status_code != 200:
print('Failed to get group membership.', file=sys.stderr)
return False
users += response.json()
GROUP_USERS[id] = users
return users
def get_user_id(query: str) -> str:
if query in USER_IDS:
return USER_IDS[query]
response = okta_api_get_request(f"/users?q={query}")
found_user_count = len(response.json())
user_id = ''
if found_user_count == 1:
user_id = response.json()[0]['id']
elif found_user_count < 1:
user, domain = query.split('@')
if user.endswith('-ext'):
print(f"No users found matching {query}", file=sys.stderr)
else:
ext_query = '@'.join([user + '-ext', domain])
ext_user_search_api_response = okta_api_get_request(f"/users?q={ext_query}")
found_user_count = len(ext_user_search_api_response.json())
if found_user_count == 1:
user_id = ext_user_search_api_response.json()[0]['id']
else:
print(f"No users found matching {query} or {ext_query}",
file=sys.stderr)
elif found_user_count > 1:
print(f"Found multiple users for {query}", file=sys.stderr)
USER_IDS[query] = user_id
return user_id
def main(arguments: list) -> None:
group_name = arguments[0]
for user in arguments[1:]:
if not check_if_user_in_okta_group(user, group_name):
add_member_to_okta_group(user, group_name)
else:
print(f"{user} is already a member of {group_name}", file=sys.stderr)
if __name__ == '__main__':
if not (OKTA_API_URL or OKTA_API_KEY or len(sys.argv) < 2):
print(__doc__)
sys.exit(2)
try:
main(sys.argv[1:])
except UnauthorizedError:
print('Request was unauthorized. Please check your Okta token and try again.', file=sys.stderr)