forked from keithrozario/potassium40
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathathena_functions.py
118 lines (96 loc) · 4.16 KB
/
athena_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import boto3
import logging
def check_execution_status(execution_id, client):
"""
checks the execution status of a Athena Query
loops until a result is available
returns result
"""
state = 'RUNNING'
while state in ['RUNNING', 'QUEUED']:
result = client.get_query_execution(QueryExecutionId=execution_id)
state = result['QueryExecution']['Status']['State']
if state == 'SUCCEEDED':
location = result['QueryExecution']['ResultConfiguration']['OutputLocation']
else:
location = result['QueryExecution']['Status']['StateChangeReason']
return state, location
def create_athena_db(bucket_name, region):
"""
creates and Athena database and table
Database name hardcoded to p40
Table name hardcoded to robots
"""
logger = logging.getLogger('__main__')
client = boto3.client('athena', region_name=region)
workgroup = 'primary'
db_name = 'p40'
drop_db_query = f"DROP DATABASE IF EXISTS {db_name} CASCADE"
create_db_query = f"CREATE DATABASE IF NOT EXISTS {db_name} LOCATION 's3://{bucket_name}'"
create_table_query = '''
CREATE EXTERNAL TABLE IF NOT EXISTS <db_name>.robots (
`domain` string,
`robots.txt` string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://<bucket_name>/robots/'
TBLPROPERTIES ('has_encrypted_data'='false');
'''.replace('<bucket_name>', bucket_name).replace('<db_name>', db_name)
queries = [drop_db_query, create_db_query, create_table_query]
logger.info('Creating Athena Database and Tables')
for k, query in enumerate(queries):
response = client.start_query_execution(QueryString=query,
ResultConfiguration={'OutputLocation': f"s3://{bucket_name}/athena"},
WorkGroup=workgroup)
result, location = check_execution_status(response['QueryExecutionId'], client)
if result != 'SUCCEEDED':
logger.error(f"Failed Executing {query}")
break
else:
pass
logger.info("Database Created, proceeding to query...")
def query_robots(bucket_name, region):
"""
Queries table and returns location where result file is available
database and table name hardcoded to p40
"""
query = 'select * from p40.robots ORDER BY domain'
workgroup = 'primary'
client = boto3.client('athena', region_name=region)
logger = logging.getLogger('__main__')
response = client.start_query_execution(QueryString=query,
ResultConfiguration={'OutputLocation': f"s3://{bucket_name}/athena"},
QueryExecutionContext={'Database': 'p40'},
WorkGroup=workgroup)
result, location = check_execution_status(response['QueryExecutionId'], client)
if result != 'SUCCEEDED':
logger.info(f"Failed Executing {query}")
else:
logger.info(f"Succeeded in Querying bucket. Result Location: {location}")
return location
def delete_athena_db(bucket_name, region):
"""
creates and Athena database and table
Database name hardcoded to p40
Table name hardcoded to robots
"""
logger = logging.getLogger('__main__')
client = boto3.client('athena', region_name=region)
workgroup = 'primary'
db_name = 'p40'
drop_db_query = f"DROP DATABASE IF EXISTS {db_name} CASCADE"
queries = [drop_db_query]
logger.info('Creating Athena Database and Tables')
for k, query in enumerate(queries):
response = client.start_query_execution(QueryString=query,
ResultConfiguration={'OutputLocation': f"s3://{bucket_name}/athena"},
WorkGroup=workgroup)
result, location = check_execution_status(response['QueryExecutionId'], client)
if result != 'SUCCEEDED':
logger.error(f"Failed Executing {query}")
break
else:
pass
logger.info("Athena Database Deleted, proceeding to query...")