Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
camelpac committed Jun 19, 2021
1 parent df46fee commit 4de3c2c
Showing 1 changed file with 13 additions and 25 deletions.
38 changes: 13 additions & 25 deletions alpaca_backtrader_api/alpacastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import pandas as pd

import backtrader as bt
from alpaca_trade_api.entity import Aggs
from backtrader.metabase import MetaParams
from backtrader.utils.py3 import queue, with_metaclass

Expand Down Expand Up @@ -167,6 +166,7 @@ async def on_trade(self, msg):

class MetaSingleton(MetaParams):
'''Metaclass to make a metaclassed class a singleton'''

def __init__(cls, name, bases, dct):
super(MetaSingleton, cls).__init__(name, bases, dct)
cls._singleton = None
Expand Down Expand Up @@ -285,11 +285,11 @@ def get_notifications(self):

# Alpaca supported granularities
_GRANULARITIES = {
(bt.TimeFrame.Minutes, 1): '1Min',
(bt.TimeFrame.Minutes, 5): '5Min',
(bt.TimeFrame.Minutes, 1): '1Min',
(bt.TimeFrame.Minutes, 5): '5Min',
(bt.TimeFrame.Minutes, 15): '15Min',
(bt.TimeFrame.Minutes, 60): '1H',
(bt.TimeFrame.Days, 1): '1D',
(bt.TimeFrame.Days, 1): '1D',
}

def get_positions(self):
Expand Down Expand Up @@ -489,6 +489,7 @@ def get_aggs_from_alpaca(self,
but we need to manipulate it to be able to work with it
smoothly
"""

def _granularity_to_timeframe(granularity):
if granularity in [Granularity.Minute, Granularity.Ticks]:
timeframe = TimeFrame.Minute
Expand Down Expand Up @@ -573,19 +574,6 @@ def _resample(df):
else:
return df

# def _back_to_aggs(df):
# response = []
# for i, v in df.iterrows():
# response.append({
# "o": v.open,
# "h": v.high,
# "l": v.low,
# "c": v.close,
# "v": v.volume,
# "t": i.timestamp() * 1000,
# })
# return Aggs({"results": response})

if not start:
timeframe = _granularity_to_timeframe(granularity)
start = end - timedelta(days=1)
Expand All @@ -608,11 +596,11 @@ def _resample(df):
def streaming_prices(self,
dataname, timeframe, tmout=None, data_feed='iex'):
q = queue.Queue()
kwargs = {'q': q,
'dataname': dataname,
kwargs = {'q': q,
'dataname': dataname,
'timeframe': timeframe,
'data_feed': data_feed,
'tmout': tmout}
'tmout': tmout}
t = threading.Thread(target=self._t_streaming_prices, kwargs=kwargs)
t.daemon = True
t.start()
Expand All @@ -629,7 +617,6 @@ def _t_streaming_prices(self, dataname, timeframe, q, tmout, data_feed):
else:
method = StreamingMethod.MinuteAgg


streamer = Streamer(q,
api_key=self.p.key_id,
api_secret=self.p.secret_key,
Expand All @@ -648,9 +635,9 @@ def get_value(self):
return self._value

_ORDEREXECS = {
bt.Order.Market: 'market',
bt.Order.Limit: 'limit',
bt.Order.Stop: 'stop',
bt.Order.Market: 'market',
bt.Order.Limit: 'limit',
bt.Order.Stop: 'stop',
bt.Order.StopLimit: 'stop_limit',
bt.Order.StopTrail: 'trailing_stop',
}
Expand Down Expand Up @@ -760,6 +747,7 @@ def _check_if_transaction_occurred(order_id):
if trans is None:
break
self._process_transaction(order_id, trans)

while True:
try:
if self.q_ordercreate.empty():
Expand Down Expand Up @@ -847,7 +835,7 @@ def _transaction(self, trans):
self._transpend[oid].append(trans)
self._process_transaction(oid, trans)

_X_ORDER_FILLED = ('partially_filled', 'filled', )
_X_ORDER_FILLED = ('partially_filled', 'filled',)

def _process_transaction(self, oid, trans):
try:
Expand Down

0 comments on commit 4de3c2c

Please sign in to comment.