-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathperformance.py
107 lines (98 loc) · 3.28 KB
/
performance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""
This module calculates different performance metrics for the
backtests and saves them in a summary folder
"""
import os
import json
import tempfile
import concurrent.futures
import pandas as pd
import numpy as np
import pyfolio as pf
from fastbt.rapid import metrics
# Google Cloud Storage support is optional: when the client library is
# missing we set a flag and fall back gracefully so local runs still work.
try:
    from google.cloud import storage
    GOOGLE_CLOUD = True  # library importable; shared client created once here
    client = storage.Client()
except ImportError:
    print('Google Cloud library not installed')
    GOOGLE_CLOUD = False  # checked by get_data_from_file before any GCS call
def get_benchmark(filename='indices.csv', parse_dates=None, start='2012'):
    """
    Load benchmark index prices and compute daily percentage returns.

    filename
        CSV file with a date column and OHLC columns
    parse_dates
        columns to parse as dates, as a list; defaults to ``['Date']``
        (``None`` sentinel avoids the mutable-default-argument pitfall)
    start
        earliest index label to keep, as a pandas partial-string slice;
        defaults to ``'2012'``, matching the previous hard-coded cutoff
    returns a DataFrame indexed by date with lowercase columns and a
    ``chg`` column holding the close-to-close percentage change
    """
    if parse_dates is None:
        parse_dates = ['Date']
    df = pd.read_csv(filename, parse_dates=parse_dates)
    # normalize column names so downstream code can rely on lowercase
    df.rename(lambda x: x.lower(), axis='columns', inplace=True)
    df = df.set_index('date').sort_index()
    df['chg'] = df.close.pct_change(1)
    return df.loc[start:]
def all_metrics(result, benchmark):
    """
    Compute the full set of performance metrics for one backtest.

    result
        backtest results in the expected format - results generated
        by rapid
    benchmark
        DataFrame with a ``chg`` column of benchmark returns
    returns a dict combining pyfolio performance stats, fastbt simple
    metrics, open==high / open==low profit totals, and per-year P&L
    """
    daily_pnl = result.groupby('timestamp').net_profit.sum()
    # pyfolio stats on daily returns, assuming 100k capital per day
    summary = pf.timeseries.perf_stats(
        daily_pnl / 100000, factor_returns=benchmark.chg).to_dict()
    summary.update(metrics(result))
    # total profit on days where the open printed at the extreme of the range
    summary['open_high'] = result.query('open==high').net_profit.sum()
    summary['open_low'] = result.query('open==low').net_profit.sum()
    yearly = daily_pnl.groupby(lambda ts: ts.year).sum()
    summary.update(yearly.to_dict())
    return summary
def runner(filename, output_file, benchmark, counter):
    """
    Worker for concurrent execution: load one backtest result file,
    compute its metrics, and dump them to a JSON file.

    filename
        HDF file holding one backtest's results
    output_file
        path of the JSON file to write the metrics into
    benchmark
        benchmark DataFrame forwarded to :func:`all_metrics`
    counter
        sequence number, printed for progress tracking
    """
    frame = pd.read_hdf(filename)
    stats = all_metrics(frame, benchmark)
    print(counter, output_file)
    with open(output_file, 'w') as handle:
        json.dump(stats, handle)
def get_data_from_file(idnum, bucket_name='temp_btzoo', directory='results'):
    """
    Fetch a backtest result file from Google Cloud Storage.

    idnum
        id of the backtest
    bucket_name
        name of the bucket
    directory
        directory from which files would be fetched
    returns the content of the HDF file as loaded by ``pd.read_hdf``,
    or ``None`` when the Google Cloud client library is unavailable
    """
    if not GOOGLE_CLOUD:
        print('Google Cloud library not installed')
        return None
    bucket = client.bucket(bucket_name)
    blob = bucket.blob('{}/{}.h5'.format(directory, idnum))
    # Context manager ensures the temporary file is closed and removed;
    # the original leaked the NamedTemporaryFile handle on every call.
    with tempfile.NamedTemporaryFile() as fp:
        blob.download_to_filename(fp.name)
        return pd.read_hdf(fp.name)
def main():
    """
    Walk ``DIR`` for HDF result files and compute performance metrics for
    each backtest, writing one JSON summary per file into ``OUTPUT_DIR``.

    Relies on the module-level ``DIR`` and ``OUTPUT_DIR`` globals that are
    set inside the ``__main__`` guard.
    """
    # NOTE(review): the original had a stray ``return`` as the first
    # statement, which made the entire body dead code; removed.
    benchmark = get_benchmark()
    counter = 0
    # One thread pool shared by every submission. Threads fit this
    # I/O-bound task; the original created (and joined) a fresh executor
    # per file inside the loop, which serialized the work.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for root, _, files in os.walk(DIR):
            for file in files:
                if file.endswith('h5'):
                    filename = os.path.join(root, file)
                    output_filename = os.path.join(
                        OUTPUT_DIR, file.split('.')[0]) + '.json'
                    counter += 1
                    executor.submit(runner, filename, output_filename,
                                    benchmark, counter)
if __name__ == "__main__":
    # Input directory holding the per-backtest HDF result files
    DIR = '/media/machine/4E1EA2D152455460/temp/btzoo_results/results'
    # Output directory where the per-backtest JSON summaries are written
    OUTPUT_DIR = '/media/machine/4E1EA2D152455460/temp/btzoo_results/summary'
    main()