import hashlib

import pandas as pd

# size units, in bytes
MB = 1024 * 1024
GB = 1024 * MB
TB = 1024 * GB
PB = 1024 * TB
def load_data(sites, periods, kinds, skipFiles=None):
    """Load per-site, per-month, per-kind access records from HDF5 files,
    print per-dataset and merged summaries, and return the merged DataFrame
    sorted by transfer_start. Entries whose filename (the index) contains any
    of the skipFiles substrings are dropped from the merged result."""
    if skipFiles is None:  # avoid a mutable default argument
        skipFiles = []
    all_data = pd.DataFrame()
    counts = []
    for site in sites:
        for month in periods:
            for kind in kinds:
                site_data = pd.read_hdf('../data/' + month + '/' + site + '_' + kind + '_' + month + '.h5',
                                        key=site, mode='r')
                site_data['site'] = 'xc_' + site
                nfiles = site_data.filesize.count()
                print(site, month, kind, nfiles)
                ufiles = site_data.index.unique().shape[0]
                totsize = site_data.filesize.sum() / PB
                avgfilesize = site_data.filesize.mean() / GB
                all_data = pd.concat([all_data, site_data])
                counts.append([site, month, kind, nfiles, ufiles, totsize, avgfilesize])
    df = pd.DataFrame(counts, columns=['site', 'month', 'kind', 'files', 'unique files',
                                       'total size [PB]', 'avg. filesize [GB]'])
    print(df)
    if len(counts) == 1:
        # a single (site, month, kind) combination: nothing to merge
        return all_data
    print('---------- merged data -----------')
    print(all_data.shape[0], 'files\t', all_data.index.unique().shape[0], 'unique\t',
          all_data.filesize.sum() / PB, 'PB\t', all_data.filesize.mean() / GB, 'GB avg. file size')
    all_data = all_data.sort_values('transfer_start')
    if len(skipFiles) == 0:
        return all_data
    for rem in skipFiles:
        print('removing: ', rem)
        all_data = all_data[~all_data.index.str.contains(rem)]
    print('---------- after removing files not to cache -----------')
    print(all_data.shape[0], 'files\t', all_data.index.unique().shape[0], 'unique\t',
          all_data.filesize.sum() / PB, 'PB\t', all_data.filesize.mean() / GB, 'GB avg. file size')
    return all_data
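
# A minimal usage sketch (the site/period/kind values below are hypothetical;
# they have to match HDF5 files that actually exist under ../data/<month>/):
#
#   data = load_data(sites=['siteA'], periods=['2018-01'], kinds=['prod'],
#                    skipFiles=['.log'])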

class XCacheServer(object):

    def __init__(self, size=TB, lwm=0.85, hwm=0.95):
        """A single cache server with low/high watermark cleanup.

        size is in bytes; lwm and hwm are fractions of size."""
        self.size = size
        self.lwm_bytes = size * lwm
        self.hwm_bytes = size * hwm
        self.lwm = lwm
        self.hwm = hwm
        self.cleanups = 0
        # filename -> [filesize, accesses, last access time]
        self.files = {}
        self.used = 0

    def add_request(self, fn, fs, ts):
        """Register one request; return True on a cache hit, False on a miss."""
        if fn in self.files:
            self.files[fn][1] += 1
            self.files[fn][2] = ts
            return True
        if self.used + fs > self.hwm_bytes:
            self.clean()
        self.files[fn] = [fs, 1, ts]
        self.used += fs
        return False

    def clean(self):
        """Evict least recently accessed files until roughly
        (hwm - lwm) * size bytes have been freed."""
        # print("cleaning...")
        self.cleanups += 1
        df = pd.DataFrame.from_dict(self.files, orient='index')
        df.columns = ['filesize', 'accesses', 'access_time']
        # access_time is the last time a file was accessed; oldest first
        df.sort_values(['access_time'], ascending=[True], inplace=True)
        df['cum_sum'] = df.filesize.cumsum()
        # print('files in cache:', df.shape[0], end=' ')
        # flush the oldest files whose combined size fits in the hwm-lwm band
        df = df[df.cum_sum < (self.hwm_bytes - self.lwm_bytes)]
        # print('files to flush:', df.shape[0])
        for fn in df.index.values:
            cr = self.files.pop(fn)
            self.used -= cr[0]

    def get_stats(self):
        """Return [cleanups, mean filesize, mean accesses, mean file age],
        where age is measured relative to the most recent access."""
        df = pd.DataFrame.from_dict(self.files, orient='index')
        df.columns = ['filesize', 'accesses', 'access_time']
        return [self.cleanups, df.filesize.mean(), df.accesses.mean(),
                df.access_time.max() - df.access_time.mean()]
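
# A quick sanity-check sketch of the watermark logic (hypothetical values, not
# part of the original file; the cache is made deliberately tiny so a cleanup
# triggers after a few inserts):
#
#   srv = XCacheServer(size=10 * MB, lwm=0.5, hwm=0.8)
#   for i in range(5):                       # 5 x 2 MB exceeds the 8 MB hwm
#       srv.add_request('file_%d' % i, 2 * MB, ts=i)
#   print(srv.cleanups, srv.used / MB)       # -> 1 8.0 (oldest file evicted)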

class XCacheSite(object):

    def __init__(self, name, upstream='Origin', servers=1, size=TB, lwm=0.85, hwm=0.95):
        """A group of cache servers at one site. size is the per-server cache
        size in bytes; requests are sharded over servers by filename hash."""
        self.name = name
        self.upstream = upstream
        self.nservers = servers
        self.server_size = size
        self.size = servers * size
        self.lwm = lwm
        self.hwm = hwm
        self.hits = 0
        self.requests = 0
        self.data_from_cache = 0
        self.data_asked_for = 0
        self.servers = []
        self.init()

    def init(self):
        for s in range(self.nservers):
            # each server gets the per-server size, not the site total
            self.servers.append(XCacheServer(self.server_size, self.lwm, self.hwm))

    def add_request(self, fn, fs, ts):
        self.requests += 1
        self.data_asked_for += fs
        if self.name == 'Origin':
            # the origin always has the data, so every request is a hit
            self.hits += 1
            self.data_from_cache += fs
            return True
        # determine the server by hashing the filename
        server = int(hashlib.md5(fn.encode('utf-8')).hexdigest(), 16) % self.nservers
        found = self.servers[server].add_request(fn, fs, ts)
        if found:
            self.hits += 1
            self.data_from_cache += fs
        return found

    def get_servers_stats(self):
        data = []
        for s in self.servers:
            data.append(s.get_stats())
        df = pd.DataFrame(data)
        df.columns = ['cleanups', 'avg. filesize', 'avg. accesses', 'avg. age']
        df['site'] = self.name
        return df
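
# A minimal end-to-end sketch (hypothetical; assumes `data` is the DataFrame
# returned by load_data(), indexed by filename and carrying the 'filesize'
# and 'transfer_start' columns used above):
#
#   site = XCacheSite('xc_demo', servers=4, size=10 * TB)
#   for fn, row in data.iterrows():
#       site.add_request(fn, row['filesize'], row['transfer_start'])
#   print('hit rate:', site.hits / site.requests)
#   print('data from cache [PB]:', site.data_from_cache / PB)
#   print(site.get_servers_stats())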
# ---------------------------------------------------------------------------
# Leftover methods from an earlier version of this code, kept commented out:
# they reference XCache.all_accesses, self.cache, self.total_hits, and
# matplotlib (plt), none of which exist in the current file.
# def plot_cache_state(self):
# """ most important plots. """
# df = pd.DataFrame.from_dict(self.cache, orient='index')
# df.columns = ['filesize', 'accesses', 'first access', 'last access']
# plt.figure(figsize=(18, 6))
# plt.suptitle(self.name, fontsize=18)
# plt.subplot(131)
# plt.xlabel('filesize [MB]')
# plt.ylabel('count')
# plt.yscale('log', nonposy='clip')
# plt.xscale('log', nonposy='clip')
# plt.hist(df['filesize'], 200)
# plt.subplot(132)
# plt.xlabel('accesses (files in cache)')
# plt.ylabel('count')
# # plt.yscale('log', nonposy='clip')
# plt.hist(df.accesses, 100, log=True)
# plt.subplot(133)
# plt.xlabel('accesses (all files)')
# plt.ylabel('count')
# # plt.yscale('log', nonposy='clip')
# per_file_counts = XCache.all_accesses.groupby(['filename']).size().reset_index(name='counts')
# plt.hist(per_file_counts.counts, 100, log=True)
# # plt.show()
# plt.savefig(self.name + '.png')
# # show cache utilization vs time
# # show cache hit rate vs time
# # show age of the oldest file in cache vs time
# # show filesize distribution
# # show filesize vs accesses heat map
# def get_cache_stats(self):
# """ just a summary print """
# df = pd.DataFrame.from_dict(self.cache, orient='index')
# df.columns = ['filesize', 'accesses', 'access_time']
# res = {
# 'total accesses': XCache.all_accesses.shape[0],
# 'cache hits': self.total_hits,
# 'delivered data': XCache.all_accesses.filesize.sum() / XCache.TB,
# 'delivered from cache': self.data_from_cache / XCache.TB,
# 'cleanups': self.cleanups,
# 'files in cache': df.shape[0],
# 'avg. accesses of cached files': df['accesses'].mean(),
# 'avg. cached file size': df['filesize'].mean() / XCache.MB,
# }
# return res
# def store_result(self):
# '''storing results into the file'''
# df = pd.DataFrame.from_dict(self.get_cache_stats(), orient='index')
# df.columns = [self.get_name()]
# df.to_hdf(self.get_name() + '_results.h5', key=self.name, mode='w')