# Source: backend/src/dexorder/final_ohlc.py
# (repository viewer listing: 320 lines, 12 KiB, Python)

import logging
import os
from datetime import timedelta, datetime, timezone
from typing import Optional
from cachetools import LFUCache
from dexorder import dec, timestamp, config, from_timestamp
from dexorder.base.chain import current_chain
from dexorder.ohlc import OHLC_PERIODS, period_name, ohlc_start_time
log = logging.getLogger(__name__)
# This is a parallel OHLC system that processes only known-final data, which allows it to be much more
# efficient than the leading-edge, reorg-able OHLCs.
#
# See dexorder/doc/ohlc.md
#
class OHLCFilePath:
    """
    Relative path of the CSV file that holds the OHLC data of one symbol/period around a given time.

    The shorter the period, the more files the series is split across:
      * period < 15 minutes -- one file per day
      * period < 8 hours    -- one file per month
      * period < 7 days     -- one file per year
      * anything longer     -- a single file for the whole series

    Attributes:
      symbol, period  -- exactly as given to the constructor
      filepath        -- '<symbol>/<period name>/...' path of the CSV file, relative to some base directory
      start, end      -- datetimes (tagged UTC) bounding the file's bucket; both None for the single-file case
      file_interval   -- timedelta which, added to start, lands inside the following bucket; None for single-file

    Instances hash and compare by ``filepath`` and also compare equal to the plain path string.
    """

    def __init__(self, symbol: str, period: timedelta, time: datetime):
        # NOTE: the calendar fields of `time` are used as they are and tagged as UTC; no timezone conversion is done.
        self.symbol = symbol
        self.period = period
        name = period_name(period)
        utc = timezone.utc
        directory = f'{symbol}/{name}/'

        if period >= timedelta(days=7):
            # long periods keep everything in a single file, so there is no bucket to describe
            self.start = None
            self.end = None
            self.file_interval = None
            self.filepath = directory + f'{symbol}-{name}.csv'
            return

        if period < timedelta(minutes=15):
            # daily files are the finest granularity
            start = datetime(time.year, time.month, time.day, tzinfo=utc)
            self.start = start
            self.end = start + timedelta(days=1)
            self.file_interval = timedelta(days=1)
            self.filepath = directory + f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.csv'
        elif period < timedelta(hours=8):
            # monthly files
            start = datetime(time.year, time.month, 1, tzinfo=utc)
            self.start = start
            if time.month == 12:
                # December rolls over into January of the following year
                self.end = datetime(time.year + 1, 1, 1, tzinfo=utc)
            else:
                self.end = datetime(time.year, time.month + 1, 1, tzinfo=utc)
            # Overshooting the month length is fine: the next path is re-aligned to the start of its own month.
            self.file_interval = timedelta(days=32)
            self.filepath = directory + f'{start.year}/{start.month}/{symbol}-{name}-{start:%Y%m}.csv'
        else:
            # yearly files
            start = datetime(time.year, 1, 1, tzinfo=utc)
            self.start = start
            self.end = datetime(time.year + 1, 1, 1, tzinfo=utc)
            self.file_interval = timedelta(days=366)
            # NOTE(review): the yearly file name carries a month component (%Y%m). It is kept as-is because
            # it determines where existing data lives on disk.
            self.filepath = directory + f'{start.year}/{symbol}-{name}-{start:%Y%m}.csv'

    def next(self):
        """
        Return the path of the file that follows this one in time.

        Not usable for single-file paths, whose start and file_interval are None.
        """
        inside_next_bucket = self.start + self.file_interval
        return OHLCFilePath(self.symbol, self.period, inside_next_bucket)

    def __repr__(self):
        return self.filepath

    def __hash__(self):
        return hash(self.filepath)

    def __eq__(self, other):
        # equal to the bare path string, or to another OHLCFilePath with the same path
        return other == self.filepath or (isinstance(other, OHLCFilePath) and self.filepath == other.filepath)
class OHLCFile:
    """
    A single OHLC CSV file kept open for incremental writes, together with the in-memory state of its last row.

    Each row on disk is ``timestamp,price[,price...]`` and grows as prices arrive for a bar:
      * 2 columns: time, open                    (one price seen)
      * 3 columns: time, open, close             (two prices seen)
      * 5 columns: time, open, high, low, close  (three or more prices seen)

    State:
      file         -- the open binary file object, or None until the first update()
      _cur         -- the last row as loaded from / last flushed to disk, or None
      _pending     -- the new value of the last row which has not been flushed yet, or None
      _final_rows  -- closed rows waiting to be written by the next flush()

    Instances are shared through the class-level LFU ``cache``; obtain them with OHLCFile.get().

    NOTE(review): an instance evicted from ``cache`` is closed right away even if it still holds unflushed
    changes and is referenced elsewhere; a later flush() of that instance would fail on the closed file.
    """

    class OHLCFileCache(LFUCache[tuple[str, OHLCFilePath], 'OHLCFile']):
        """ LFU cache of open files which closes a file when it is evicted. """

        def popitem(self):
            key, item = super().popitem()
            item.close()
            # honor the mapping contract: popitem() returns the removed pair
            return key, item

    cache = OHLCFileCache(len(OHLC_PERIODS) * config.chunk_cache_size)

    @staticmethod
    def get(base_dir: str, filepath: OHLCFilePath):
        """ Return the cached OHLCFile for (base_dir, filepath), creating it on first use. """
        key = base_dir, filepath
        if key not in OHLCFile.cache:
            OHLCFile.cache[key] = OHLCFile(base_dir, filepath)
        return OHLCFile.cache[key]

    def __init__(self, base_dir: str, filepath: OHLCFilePath):
        self.base_dir = base_dir
        self.filepath = filepath
        self.file = None  # opened lazily by update()
        self._cur = None
        self._pending = None
        self._final_rows = []

    @property
    def filename(self):
        """ Full path of the CSV file: base_dir joined with the relative OHLCFilePath. """
        return os.path.join(self.base_dir, self.filepath.filepath)

    @property
    def timestamp(self):
        """ Timestamp (first column) of the current row. Raises ValueError if there is no current row. """
        try:
            return self.cur[0]
        except TypeError:
            raise ValueError('cur is None') from None

    @property
    def price(self):
        """ Latest price (last column) of the current row. Raises ValueError if there is no current row. """
        try:
            return self.cur[-1]
        except TypeError:
            raise ValueError('cur is None') from None

    @property
    def cur(self):
        """ The current (last) row: the pending value if there is one, otherwise the value on disk. """
        return self._pending if self._pending is not None else self._cur

    @cur.setter
    def cur(self, value):
        # changes are staged in _pending and only become _cur when flushed
        self._pending = value

    def update(self, time: datetime, price: dec):
        """
        Merge one price into the bar containing `time`, opening/loading the file on first use.

        A price for a later bar closes the current row (queued in _final_rows) and starts a new one.
        Nothing is written to disk until flush().
        """
        ts = timestamp(ohlc_start_time(time, self.filepath.period))
        if self.file is None:
            self._load(ts)
        if self.cur is None:
            # nothing yet. simple time+price
            self.cur = ts, price
        elif self.cur[0] < ts:
            # the current bar was an old timestamp. Advance bars.
            self._final_rows.append(self.cur)
            # new bar time+price
            self.cur = ts, price
        elif len(self.cur) == 2:
            # second price of the bar: time, open, close
            self.cur = *self.cur, price
        elif len(self.cur) == 3:
            # third price of the bar: expand to full OHLC
            t, o, c = self.cur
            self.cur = t, o, max(o, c, price), min(o, c, price), price
        else:
            t, o, h, low, c = self.cur
            self.cur = t, o, max(h, low, price), min(h, low, price), price

    @staticmethod
    def row_bytes(row):
        """ Encode a row as one ASCII CSV line: the timestamp via str(), every price in fixed-point ('f') format. """
        return (','.join([str(row[0])] + [f'{c:f}' for c in row[1:]]) + '\n').encode('ascii')

    def flush(self):
        """
        Write all closed rows and the current row to the file.

        Afterwards the file cursor sits at the start of the current row, so that the next flush overwrites
        that row with its newer contents.
        """
        # first we write the "final" rows which means rows that have been closed and will get no more data.
        for row in self._final_rows:
            self.file.write(OHLCFile.row_bytes(row))
        # They are on disk now. Without clearing, every later flush would write them again (duplicating rows).
        self._final_rows.clear()
        # apply any pending changes to the current row
        if self._pending is not None:
            self._cur = self._pending
            self._pending = None
        # write the current row
        if self._cur is not None:
            data = OHLCFile.row_bytes(self._cur)
            start_of_current_row = self.file.tell()
            self.file.write(data)
            # The row we just replaced may have been longer; cut off whatever is left of it.
            self.file.truncate()
            # rewind our file cursor to the beginning of the current row
            self.file.seek(start_of_current_row)
        log.debug(f'flushing {self.filename}')
        self.file.flush()

    def _load(self, earliest_change):
        """
        Open the file (creating it and its directories if needed) and position the write cursor.

        earliest_change is the integer timestamp of the first bar about to be modified. The first row whose
        timestamp is >= earliest_change, and everything after it, is removed from the file. self._cur is set
        to the last row examined (that row, or the final row of the file), and the cursor is left at its start.
        For a new or empty file self._cur stays None.
        """
        try:
            self.file = open(self.filename, 'br+')  # br+ is binary read+write
        except FileNotFoundError:
            # no existing file
            os.makedirs(os.path.dirname(self.filename), exist_ok=True)
            self.file = open(self.filename, 'bw')
            return
        # load existing file
        last_row = None
        offset = 0    # byte offset of the start of the last data row seen
        position = 0  # byte offset of the start of the line being examined
        for line in self.file.readlines():  # all files should be small and can load at once
            line_start = position
            position += len(line)
            fields = line.decode('ascii').strip().split(',')
            if not fields[0]:  # empty line: skipped, but its bytes still count towards the offsets
                continue
            last_row = fields
            offset = line_start
            if int(fields[0]) >= earliest_change:
                # we are going to replace this row so stop walking forward and truncate the rest of the file
                self.file.truncate(offset)
                break
        if last_row is not None:
            # convert to int timestamp and dec prices
            self._cur = [int(last_row[0]), *(dec(p) for p in last_row[1:])]
            # set the file's write pointer to the start of the final row
            self.file.seek(offset)
        # self._cur is now either None (empty file) or a natively-typed list representing the last row

    def close(self):
        """ Close the underlying file if it was ever opened. """
        if self.file is not None:
            self.file.close()
class OHLCFileSeries:
    """
    All OHLC files of one symbol (one OHLCFile per entry in OHLC_PERIODS) plus the symbol's quote file.

    The quote file ``<base_dir>/<symbol>/quote.csv`` holds a single line ``series_start,last_time,last_price``.
    The series start is written once when the file is created; the time and price after it are overwritten
    in place on every flush().

    Instances are shared per (base_dir, symbol); obtain them with OHLCFileSeries.get().
    """

    instances = {}

    @staticmethod
    def get(base_dir: str, symbol: str):
        """ Return the shared series for (base_dir, symbol), creating it on first use. """
        key = base_dir, symbol
        if key not in OHLCFileSeries.instances:
            OHLCFileSeries.instances[key] = OHLCFileSeries(base_dir, symbol)
        return OHLCFileSeries.instances[key]

    def __init__(self, base_dir: str, symbol: str):
        self.base_dir = base_dir
        self.symbol = symbol
        self.series_start: Optional[datetime] = None  # timestamp of the first datum in the series
        self.write_offset: Optional[int] = None  # byte offset in the quote file where the latest quote starts
        self.last_flush: Optional[tuple[datetime, dec]] = None  # quote most recently written to the quote file
        self.quote_file = None  # opened lazily by update()
        self.dirty_files = set()  # OHLCFiles changed since the last flush()
        self.quote: Optional[tuple[datetime, dec]] = None  # most recent (time, price) given to update()

    def update(self, time: datetime, price: dec):
        """
        Record `price` at `time` in the OHLC file of every period.

        Files lying between the previous quote and `time` which would otherwise receive no data are
        opened with the previous price carried forward. Nothing reaches the disk until flush().
        """
        if self.quote_file is None:
            self._open_quote_file(time)
        #
        # forward-fill OHLC files that would otherwise be empty/skipped
        #
        # Start from the newest quote we know: the previous update if there was one, otherwise the quote
        # loaded from disk. Starting from last_flush on every call would walk the same files again on each
        # update between two flushes and merge the stale price into bars that already hold newer data.
        carried = self.quote if self.quote is not None else self.last_flush
        if carried is not None:
            prev_time, prev_price = carried
            for period in OHLC_PERIODS:
                # get the path to the file that received the previous quote
                path = OHLCFilePath(self.symbol, period, prev_time)
                # single-file paths have end=None and never need forward-filling
                while path.end and path.end < time:
                    path = path.next()
                    # initialize the new file using the carried-forward price
                    file = OHLCFile.get(self.base_dir, path)
                    file.update(file.filepath.start, prev_price)  # set file opening price
                    self.dirty_files.add(file)
        self.quote = time, price
        if self.series_start is None:
            self.series_start = time
        for period in OHLC_PERIODS:
            file = OHLCFile.get(self.base_dir, OHLCFilePath(self.symbol, period, time))
            file.update(time, price)
            self.dirty_files.add(file)

    def _open_quote_file(self, time: datetime):
        """ Open or create the quote file, load its contents, and leave the write cursor after the series start. """
        quote_filename = os.path.join(self.base_dir, self.symbol, 'quote.csv')
        try:
            self.quote_file = open(quote_filename, 'br+')
            # load existing quote file
            line = self.quote_file.read().decode('ascii')
        except FileNotFoundError:
            os.makedirs(os.path.dirname(quote_filename), exist_ok=True)
            self.quote_file = open(quote_filename, 'bw')
            line = None
        if line:
            start, old_time, old_price = line.split(',')
            self.series_start = from_timestamp(int(start))
            # position the write cursor at the start of the second column so we can write the latest quote quickly
            self.write_offset = len(start) + 1  # after the start time bytes and comma
            self.quote_file.seek(self.write_offset)
            self.last_flush = from_timestamp(int(old_time)), dec(old_price)
        else:
            # initialize new quote file with our own series_start time
            header = (str(timestamp(time)) + ',').encode('ascii')
            self.quote_file.write(header)
            # Quotes go right after the header. An offset of 0 here would make the second flush overwrite
            # the series start.
            self.write_offset = len(header)
            self.last_flush = None

    def flush(self):
        """ Write every dirty OHLC file and the latest quote to disk. Logs a warning if nothing was updated. """
        if self.quote is None:
            log.warning('OHLCFileSeries was flushed without having any updated data.')
            return
        time, price = self.quote
        ts = timestamp(time)
        #
        # flush dirty OHLC files
        #
        for file in self.dirty_files:
            file.flush()
        self.dirty_files.clear()
        #
        # flush quote file
        #
        self.quote_file.write(f'{ts},{price:f}'.encode('ascii'))
        # the quote written previously may have been longer; drop its leftover bytes
        self.quote_file.truncate()
        self.quote_file.flush()
        # rewind so that the next flush overwrites this quote
        self.quote_file.seek(self.write_offset)
        # remember the quote now stored on disk
        self.last_flush = self.quote
class FinalOHLCRepository:
    """
    Used for backfill when all data is known to be final.

    update() routes each price to the OHLCFileSeries of its symbol, stored under a per-chain subdirectory
    of config.ohlc_dir, and remembers the series as dirty. flush() flushes every dirty series.
    """

    def __init__(self):
        assert config.ohlc_dir
        # series which received an update since the last flush()
        self.dirty_series = set()

    def update(self, symbol: str, time: datetime, price: Optional[dec]):
        """ Pass (time, price) on to the series of `symbol` on the current chain and mark that series dirty. """
        chain_id = current_chain.get().id
        # every chain gets its own directory, named by the chain id
        chain_dir = os.path.join(config.ohlc_dir, str(chain_id))
        touched = OHLCFileSeries.get(chain_dir, symbol)
        touched.update(time, price)
        self.dirty_series.add(touched)

    def flush(self) -> None:
        """ Flush every series updated since the previous flush, then forget them. """
        pending = self.dirty_series
        for series in pending:
            series.flush()
        pending.clear()