-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdash_aggregations.py
195 lines (173 loc) · 6.35 KB
/
dash_aggregations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import pickle
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Tuple

import pandas as pd

from core_content_metric import compute_core_content_metric
from google_analytics_ops import get_traffic_metrics
from karmametric import compute_karma_metric
from utils import get_valid_users, get_valid_posts, get_valid_comments, get_valid_votes, get_valid_views, print_and_log, timed, get_config_field
# Base directory for pipeline input/output, read from the project config file.
BASE_PATH = get_config_field('PATHS','base')
# Configurables
# Minimum number of posts an account must have viewed to count in the
# "Accounts Created" panel (see generate_specs).
minimum_post_views_for_users = 1
# Minimum number of upvotes a post needs to count in the post-based panels.
post_required_upvotes = 1
# Abstraction for specifying each plot. All needed fields for generate_timeseries
@dataclass
class PlotSpec:
    """Specification for one dashboard timeseries panel.

    Carries the source DataFrame plus the aggregation/display parameters
    that resample_timeseries and the dashboard renderer need.
    """
    title: str           # panel title; also used as the timeseries-dict key
    data: pd.DataFrame   # source rows to be resampled
    color: str           # line color for the rendered plot
    date_column: str     # column in `data` to resample on
    period: str = 'D'    # pandas resample frequency
    # default_factory gives each instance its own list; the original used a
    # tuple default, contradicting the List[int] annotation.
    moving_averages: List[int] = field(default_factory=lambda: [1, 7, 28])
    agg_func: str = 'size'
    agg_column: str = 'dummy'  # 'dummy' pairs with agg_func='size' (row counts)
    # default_factory evaluates per-instance; the original evaluated
    # datetime.today() once at import time, so defaults went stale in a
    # long-running process.
    start_date: datetime = field(default_factory=lambda: datetime.today().date() - timedelta(28))
    end_date: datetime = field(default_factory=lambda: datetime.today().date())
    size: Tuple[int, int] = (800, 400)  # plot width, height in pixels
    remove_last_periods: int = 1        # trim trailing (incomplete) periods
# This structure contains the "business logic" of each plot and uses them to load data and generate actual plot_spec objects that can generate plots.
def generate_specs(collections):
    """Build the PlotSpec list for every dashboard panel.

    Args:
        collections: dict of DataFrames keyed by collection name — the code
            below reads collections['views'] directly and passes the whole
            dict to the metric/filter helpers. Presumably loaded from the
            site database; exact schema defined by those helpers.

    Returns:
        list of PlotSpec, one per dashboard panel, in display order.
    """
    allVotes, _, _ = compute_karma_metric(collections)  # calculate karma metric
    traffic_metrics = get_traffic_metrics()  # Google Analytics traffic data
    core_content_metric = compute_core_content_metric(collections).reset_index()
    plot_specs = [
        PlotSpec(
            title='Karma Metric',
            data=allVotes,
            date_column='votedAt',
            agg_func='sum',
            agg_column='effect',
            color='red',
        ),
        PlotSpec(
            title='Core Content Metric',
            data=core_content_metric,
            # after reset_index() the former index becomes column 'index',
            # and the metric values sit in the integer-named column 0
            date_column='index',
            agg_func='sum',
            agg_column=0,
            color='purple'
        ),
        PlotSpec(
            title='Accounts Created, {}+ posts_viewed'.format(minimum_post_views_for_users),
            data=get_valid_users(collections, required_minimum_posts_views=minimum_post_views_for_users),
            date_column='true_earliest',
            color='grey',
        ),
        PlotSpec(
            title='Num Logged-In Users',
            # views restricted to users that pass get_valid_users filtering
            data=collections['views'][collections['views']['userId'].isin(get_valid_users(collections)['_id'])],
            date_column='createdAt',
            color='black',
            agg_func='nunique',
            agg_column='userId',
        ),
        PlotSpec(
            # NOTE(review): title says "2+" but post_required_upvotes is 1 —
            # confirm whether the label or the constant is wrong.
            title='Num Posts with 2+ Upvotes',
            data=get_valid_posts(collections, required_upvotes=post_required_upvotes),
            date_column='postedAt',
            color='blue',
        ),
        PlotSpec(
            title='Num Unique Posters',
            data=get_valid_posts(collections, required_upvotes=post_required_upvotes),
            date_column='postedAt',
            color='darkblue',
            agg_func='nunique',
            agg_column='userId'
        ),
        PlotSpec(
            title='Num Comments',
            data=get_valid_comments(collections),
            date_column='postedAt',
            color='green'
        ),
        PlotSpec(
            title='Num Unique Commenters',
            data=get_valid_comments(collections),
            date_column='postedAt',
            color='darkgreen',
            agg_func='nunique',
            agg_column='userId'
        ),
        PlotSpec(
            title='Num Votes (excluding self-votes)',
            data=get_valid_votes(collections),
            date_column='votedAt',
            color='orange',
        ),
        PlotSpec(
            title='Num Unique Voters',
            data=get_valid_votes(collections),
            date_column='votedAt',
            color='darkorange',
            agg_func='nunique',
            agg_column='userId'
        ),
        PlotSpec(
            title='Num Logged-In Post Views',
            # de-duplicate repeat views of the same post by the same user
            # within the same clock hour
            data=(get_valid_views(collections)
                  .assign(hour=lambda x: x['createdAt'].dt.round('H'))
                  .drop_duplicates(subset=['userId', 'documentId', 'hour'])),
            date_column='createdAt',
            color='red',
        ),
        PlotSpec(
            title='GA Users',
            data=traffic_metrics,
            date_column='date',
            agg_func='sum',
            agg_column='ga:users',  # Google Analytics metric column name
            color='teal'
        )
    ]
    return plot_specs
def resample_timeseries(title, data, date_column, agg_column, agg_func, period, moving_average=1,
                        remove_last_periods=1):
    """Resample `data` by date into one named, smoothed timeseries.

    Sets `date_column` as the index, resamples at `period` (pandas frequency
    string), aggregates `agg_column` with `agg_func`, then applies a
    `moving_average`-period rolling mean.

    Args:
        title: name for the resulting value column (and the plot).
        data: source DataFrame; `date_column` must be datetime-like.
        date_column: column to resample on.
        agg_column: column to aggregate ('dummy' + agg_func='size' counts rows).
        agg_func: pandas aggregation name, e.g. 'size', 'sum', 'nunique'.
        period: pandas resample frequency, e.g. 'D', 'W', 'M'.
        moving_average: rolling window in periods (1 = no smoothing).
        remove_last_periods: trailing periods to drop (the last period is
            usually incomplete); 0 keeps everything.

    Returns:
        DataFrame with the date column and one value column named `title`,
        values rounded to 1 decimal.
    """
    result = (data
              .set_index(date_column)
              .assign(dummy=1)  # constant column so agg_func='size' has something to count
              .resample(period)[agg_column]
              .agg(agg_func)
              .rolling(moving_average)
              .mean()
              .to_frame(title)
              .round(1)
              .reset_index())
    # Guard: iloc[:-0] would return an EMPTY frame, so only trim when > 0.
    if remove_last_periods > 0:
        result = result.iloc[:-remove_last_periods]
    return result
@timed
def generate_timeseries_dict(plot_specs, periods=('D', 'W', 'M'), moving_averages=(1, 4, 7, 28)):
    """Precompute a resampled timeseries for every (spec, period, window) combo.

    Args:
        plot_specs: iterable of PlotSpec objects.
        periods: pandas resample frequency strings (day/week/month by default).
        moving_averages: rolling-window sizes, in periods.

    Returns:
        dict mapping (title, period, moving_average) -> DataFrame produced by
        resample_timeseries.

    Note: tuple defaults replace the original mutable-list defaults
    (shared-mutable-default footgun). The per-spec `moving_averages` field is
    not consulted here — the function-level argument wins; presumably
    intentional, verify against the dashboard renderer.
    """
    return {
        (spec.title, period, window): resample_timeseries(
            title=spec.title,
            data=spec.data,
            date_column=spec.date_column,
            agg_column=spec.agg_column,
            agg_func=spec.agg_func,
            period=period,
            moving_average=window,
        )
        for spec in plot_specs
        for period in periods
        for window in moving_averages
    }
@timed
def run_dash_aggregations_pipeline(collections, date_str):
    """Generate all dashboard timeseries and pickle them to the processed dir.

    Args:
        collections: dict of DataFrames passed through to generate_specs.
        date_str: date string naming the output subdirectory (must exist).

    Side effects:
        Writes plot_specs.p and timeseries_dict.p under
        BASE_PATH + 'processed/<date_str>'.
    """
    plot_specs = generate_specs(collections)
    # Strip the (potentially large) source DataFrames before pickling: the
    # dashboard only needs the display metadata; the computed timeseries are
    # stored separately below.
    plot_specs_small = [
        PlotSpec(
            title=spec.title,
            data=pd.DataFrame(),
            date_column=spec.date_column,
            agg_func=spec.agg_func,
            agg_column=spec.agg_column,
            color=spec.color,
        ) for spec in plot_specs
    ]
    timeseries_dict = generate_timeseries_dict(plot_specs)
    directory = BASE_PATH + '{folder}/{date}'.format(folder='processed', date=date_str)
    # Context managers ensure the files are closed even if pickling fails.
    # (Original also had the two log messages swapped relative to the dumps.)
    print_and_log('Writing plot_specs to disk.')
    with open(directory + '/plot_specs.p', 'wb') as f:
        pickle.dump(plot_specs_small, f, protocol=pickle.HIGHEST_PROTOCOL)
    print_and_log('Writing timeseries_dict to disk.')
    with open(directory + '/timeseries_dict.p', 'wb') as f:
        pickle.dump(timeseries_dict, f, protocol=pickle.HIGHEST_PROTOCOL)
    print_and_log('Writing timeseries_dict and plot_specs to disk completed')