Files
backend/src/dexorder/ohlc.py
2024-01-23 01:31:47 -04:00

251 lines
9.6 KiB
Python

import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional, NamedTuple
from cachetools import LFUCache
from dexorder import dec, config, from_isotime, minutely
from dexorder.base.chain import current_chain
from dexorder.blockstate import BlockDict
log = logging.getLogger(__name__)

# Every bar width that is maintained for a tracked symbol, in ascending order.
OHLC_PERIODS = [
    *(timedelta(minutes=m) for m in (1, 3, 5, 10, 15, 30)),
    *(timedelta(hours=h) for h in (1, 2, 4, 8, 12)),
    *(timedelta(days=d) for d in (1, 2, 3, 7)),
]

# Origin from which all bar boundaries are counted (see ohlc_start_time).
# 2009-01-04 is a Sunday: the day after the Bitcoin genesis block of 2009-01-03.
OHLC_DATE_ROOT = datetime(2009, 1, 4, tzinfo=timezone.utc)

# OHLC's are stored as [time, open, high, low, close] string values.  If there was no data during the interval,
# then open, high, and low are None but the close value is carried over from the previous interval.
OHLC = list[str]  # typedef
def opt_dec(v):
    """ converts v with dec(), except that None is passed through unchanged """
    if v is None:
        return None
    return dec(v)
def dt(v):
    """ returns v itself when it is already a datetime, otherwise parses it with from_isotime() """
    if isinstance(v, datetime):
        return v
    return from_isotime(v)
@dataclass
class NativeOHLC:
    """
    One bar held as native values (datetime / dec) rather than the string form used by OHLC lists.
    open, high and low may be None; close is always set.
    """
    start: datetime
    open: Optional[dec]
    high: Optional[dec]
    low: Optional[dec]
    close: dec

    @staticmethod
    def from_ohlc(ohlc: OHLC) -> 'NativeOHLC':
        """ builds a NativeOHLC from a [time, open, high, low, close] list, converting each element """
        # one converter per position: time, open, high, low, close
        converters = (dt, opt_dec, opt_dec, opt_dec, dec)
        return NativeOHLC(*(convert(raw) for raw, convert in zip(ohlc, converters)))

    @property
    def ohlc(self) -> OHLC:
        """ this bar as a [time, open, high, low, close] list; time has minute resolution and None values stay None """
        def text(value):
            return None if value is None else str(value)

        return [
            self.start.isoformat(timespec='minutes'),
            text(self.open),
            text(self.high),
            text(self.low),
            str(self.close),
        ]
def ohlc_name(period: timedelta) -> str:
    """
    returns the short name of a bar period: whole minutes as '<n>m' below one hour, whole hours as '<n>H'
    below one day, exactly seven days as '1W', and anything else as whole days '<n>D'
    """
    if period < timedelta(hours=1):
        return f'{period // timedelta(minutes=1)}m'
    if period < timedelta(days=1):
        return f'{period // timedelta(hours=1)}H'
    if period == timedelta(days=7):
        return f'{period // timedelta(days=7)}W'
    return f'{period // timedelta(days=1)}D'
def period_from_name(name: str) -> timedelta:
    """
    parses a period name such as '15m', '4H', '3D' or '1W' back into a timedelta.
    The last character is the unit and everything before it is the integer count.
    Raises KeyError for an unknown unit and ValueError for a non-integer count.
    """
    count, unit = int(name[:-1]), name[-1:]
    units = {
        'm': timedelta(minutes=1),
        'H': timedelta(hours=1),
        'D': timedelta(days=1),
        'W': timedelta(days=7),
    }
    return count * units[unit]
def ohlc_start_time(time: datetime, period: timedelta) -> datetime:
    """
    returns the start time of the ohlc containing time, such that start_time <= time < start_time + period

    Bars are aligned to whole multiples of period counted from OHLC_DATE_ROOT.
    Raises ZeroDivisionError if period is zero.
    """
    # timedelta // timedelta is an exact integer floor division, so the bar index never passes through float
    # arithmetic and the period is not truncated to whole seconds
    period_count = (time - OHLC_DATE_ROOT) // period
    return OHLC_DATE_ROOT + period_count * period
def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[dec]) -> list[OHLC]:
    """
    returns an ordered list of OHLC's that have been created/modified by the new time/price
    if price is None, then bars are advanced based on the time but no new price is added to the series.

    prev is the latest known bar. Every bar that ended at or before time is emitted as finalized, and the
    bar containing time is always the last element of the result. time must not precede the start of prev.
    """
    log.debug(f'\tupdating {prev} with {minutely(time)} {price}')
    bar = NativeOHLC.from_ohlc(prev)
    assert time >= bar.start
    changed = []

    # roll forward: each bar that has already ended is finalized, and an empty successor that carries the
    # close forward takes its place
    bar_end = bar.start + period
    while time >= bar_end:
        changed.append(bar.ohlc)
        bar = NativeOHLC(bar_end, None, None, None, bar.close)
        bar_end = bar.start + period
    log.debug(f'\tresult after finalization: {changed}')

    # fold the new price into the bar that contains time
    if price is not None:
        if bar.open is None:
            # first price seen in this bar
            bar.open = bar.high = bar.low = price
        else:
            bar.high = max(bar.high, price)
            bar.low = min(bar.low, price)
        bar.close = price

    changed.append(bar.ohlc)
    log.debug(f'\tappended current bar: {bar.ohlc}')
    log.debug(f'\tupdate result: {changed}')
    return changed
class OHLCKey(NamedTuple):
    """ identifies one bar series: a symbol together with a bar period """
    symbol: str  # passed as the pool address when keys are published (see pub_ohlc)
    period: timedelta  # width of each bar in the series
class OHLCRepository:
    """
    Keeps the most recent bars of every tracked (symbol, period) series in the global recent_ohlcs BlockDict
    and writes finalized bars to JSON chunk files under a base directory.
    """

    def __init__(self, base_dir: str = None):
        """ can't actually make more than one of these because there's a global recent_ohlcs BlockDict """
        if base_dir is None:
            base_dir = config.ohlc_dir
        self.dir = base_dir
        # chunk file path -> list of bars loaded from / written to that file
        self.cache = LFUCache(len(OHLC_PERIODS) * 128)  # enough for the top 128 pools

    @staticmethod
    def add_symbol(symbol: str, period: timedelta = None):
        """
        registers symbol for price capture, either for the one given period or, when period is None, for every
        period in OHLC_PERIODS.  Series that already have an entry are left untouched.
        """
        periods = OHLC_PERIODS if period is None else [period]
        for p in periods:
            if (symbol, p) not in recent_ohlcs:
                recent_ohlcs[(symbol, p)] = []  # setting an empty value will initiate price capture

    def update_all(self, symbol: str, time: datetime, price: dec, *, create: bool = True):
        """ applies update() to symbol for every period in OHLC_PERIODS """
        for period in OHLC_PERIODS:
            self.update(symbol, period, time, price, create=create)

    def update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) -> Optional[list[OHLC]]:
        """
        if price is None, then bars are advanced based on the time but no new price is added to the series.

        Returns the new list of recent bars (the current open bar is last), or None when nothing was recorded:
        either there is no price to start a series with, or create is False and the series has no entry at all.
        Bars that were finalized by this update are written to storage.
        """
        logname = f'{symbol} {ohlc_name(period)}'
        log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
        key = (symbol, period)
        bars: Optional[list[OHLC]] = recent_ohlcs.get(key)
        if not bars:
            # A series registered through add_symbol() has an empty list, which counts as explicitly set up and
            # is started regardless of `create`.
            # NOTE(review): assumes recent_ohlcs.get() returns None for a key that was never set -- confirm
            if price is None or (bars is None and create is False):
                return None  # do not track symbols which have not been explicity set up
            p = str(price)
            updated = [OHLC((minutely(ohlc_start_time(time, period)), p, p, p, p))]
            finalized = []
            log.debug(f'\tcreated new bars {updated}')
        else:
            updated = update_ohlc(bars[-1], period, time, price)
            finalized = updated[:-1]  # everything before the current open bar was closed by this update
            if not finalized and len(bars) > 1:
                # Only the open bar changed.  bars[-1] is the stale copy of that same open bar, so the
                # previous finalized bar is bars[-2]; keep it so the recents still hold two bars.
                updated = [bars[-2], updated[0]]
        log.debug(f'\tnew recents: {updated}')
        recent_ohlcs.setitem(key, updated)
        if finalized:
            log.debug(f'\tsaving finalized bars: {finalized}')
            self.save_all(symbol, period, finalized)  # save any finalized bars to storage
        return updated

    def save_all(self, symbol: str, period: timedelta, ohlc_list: list[OHLC]) -> None:
        """ saves each bar of ohlc_list in order """
        for ohlc in ohlc_list:
            self.save(symbol, period, ohlc)

    def save(self, symbol: str, period: timedelta, ohlc: OHLC) -> None:
        """
        stores one bar in its chunk file, either appending it directly after the chunk's last bar or replacing
        the bar that has the same start time.  Asserts that the bar lines up with the existing chunk contents.
        """
        time = dt(ohlc[0])
        chunk = self.get_chunk(symbol, period, time)
        if not chunk:
            # Append in place: get_chunk() hands out the cached list object, and rebinding to a new list would
            # leave an empty list in the cache, making the next save overwrite the file with a single bar.
            chunk.append(ohlc)
        else:
            start = from_isotime(chunk[0][0])
            index = (time - start) // period
            # a negative index would silently wrap around to the end of the list
            assert 0 <= index <= len(chunk)
            if index == len(chunk):
                assert from_isotime(chunk[-1][0]) + period == time
                chunk.append(ohlc)
            else:
                assert from_isotime(chunk[index][0]) == time
                chunk[index] = ohlc
        self.save_chunk(symbol, period, chunk)

    def get_chunk(self, symbol: str, period: timedelta, start_time: datetime) -> list[OHLC]:
        """
        returns the list of bars of the chunk file that covers start_time, loading it into the cache on a miss.
        The returned list is the cached object itself; an empty list means no bars have been stored yet.
        """
        path = self.chunk_path(symbol, period, start_time)
        found = self.cache.get(path)
        if found is None:
            found = self.load_chunk(path)
            if found is None:
                found = []
            self.cache[path] = found
        return found

    @staticmethod
    def load_chunk(path: str) -> Optional[list[OHLC]]:
        """ reads the JSON chunk file at path, returning an empty list when the file does not exist """
        try:
            with open(path, 'r') as file:
                return json.load(file)
        except FileNotFoundError:
            return []

    def save_chunk(self, symbol: str, period: timedelta, chunk: list[OHLC]):
        """ writes chunk as JSON to the file selected by its first bar; an empty chunk is not written """
        if not chunk:
            return
        path = self.chunk_path(symbol, period, from_isotime(chunk[0][0]))
        try:
            file = open(path, 'w')
        except FileNotFoundError:
            # the directory does not exist yet: create it and retry once
            os.makedirs(os.path.dirname(path), exist_ok=True)
            file = open(path, 'w')
        with file:
            json.dump(chunk, file)

    def chunk_path(self, symbol: str, period: timedelta, time: datetime) -> str:
        """ returns the path of the chunk file holding the bar of the given period that contains time """
        start = ohlc_start_time(time, period)
        name = ohlc_name(period)
        if period < timedelta(hours=1):
            # <1H data has a file per day
            filename = f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.json'
        elif period < timedelta(days=7):
            # <1W data has a file per month
            filename = f'{start.year}/{symbol}-{name}-{start:%Y%m}.json'
        else:
            # long periods are a single file for all of history
            filename = f'{symbol}-{name}.json'
        return f'{self.dir}/{symbol}/{name}/' + filename
def pub_ohlc(_series: str, key: OHLCKey, bars: list[OHLC]):
    """
    builds the (channel, event, payload) triple used to publish bars for one series.
    The chain id is taken from current_chain; _series is not used.
    """
    pool_addr, period = key
    chain_id = current_chain.get().chain_id
    channel = f'{chain_id}|{pool_addr}|{ohlc_name(period)}'  # channel name is like <chain_id>|0x...|1m
    payload = (chain_id, pool_addr, bars)
    return channel, 'ohlcs', payload
def ohlc_key_to_str(k):
    """ serializes a (symbol, period) key as '<symbol>|<period name>' """
    symbol, period = k[0], k[1]
    return f'{symbol}|{ohlc_name(period)}'
def ohlc_str_to_key(s):
    """ parses '<symbol>|<period name>' back into a (symbol, period) pair; s must contain exactly one '|' """
    parts = s.split('|')
    pool, period_name = parts
    return pool, period_from_name(period_name)
# The most recent OHLC's are stored as block data.  We store a list of at least the two latest bars, which provides
# clients with the latest finalized bar as well as the current open bar.
recent_ohlcs: BlockDict[OHLCKey, list[OHLC]] = BlockDict(
    'ohlc',
    db=True,
    redis=True,
    pub=pub_ohlc,
    key2str=ohlc_key_to_str,
    str2key=ohlc_str_to_key,
    series2key=lambda x: x,
    series2str=lambda x: x,
)

# the repository instance for this module; built with the default base directory from config
ohlcs = OHLCRepository()