diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py index 1e0a2d9..2dff20b 100644 --- a/src/dexorder/blockstate/state.py +++ b/src/dexorder/blockstate/state.py @@ -198,7 +198,10 @@ class BlockState: else: updated_keys.add((d.series, d.key)) del self.diffs_by_hash[dead.hash] - del self.ancestors[dead.hash] + try: + del self.ancestors[dead.hash] + except KeyError: + pass # todo is this bad? # prune diffs_by_series by removing old series diffs that have been superceded by new diffs for s, k in updated_keys: diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index f7f088a..9c937f6 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -1,7 +1,6 @@ import json import logging import os -from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import Optional, NamedTuple, Reversible, Union @@ -35,27 +34,63 @@ def opt_dec(v): def dt(v): return v if isinstance(v, datetime) else from_timestamp(v) -@dataclass class NativeOHLC: @staticmethod def from_ohlc(ohlc: OHLC) -> 'NativeOHLC': - return NativeOHLC(*[cast(value) for value, cast in zip(ohlc,(dt, opt_dec, opt_dec, opt_dec, dec))]) + return NativeOHLC(*[cast(value) for value, cast in zip(ohlc,(dt, opt_dec, opt_dec, opt_dec, dec))], ohlc=ohlc) + + # noinspection PyShadowingBuiltins + def __init__(self, start: datetime, open: Optional[dec], high: Optional[dec], low: Optional[dec], close: dec, + *, ohlc: list=None): + self.start = start + self.open = open + self.high = high + self.low = low + self.close = close + self._ohlc = ohlc # JSON-able representation + self._json: str = None if ohlc is None else json.dumps(ohlc) + + + def update(self, price): + pstr = str(price) + if self.open is None: + self.open = price + self.high = price + self.low = price + if self._ohlc: + self._ohlc[1] = pstr + self._ohlc[2] = pstr + self._ohlc[3] = pstr + else: + self.high = max(self.high, price) + self.low = min(self.low, price) + if self._ohlc: + self._ohlc[1] = str(self.high) + self._ohlc[2] = str(self.low) + self.close = price + if self._ohlc: + self._ohlc[3] = pstr + self._json = None - start: datetime - open: Optional[dec] - high: Optional[dec] - low: Optional[dec] - close: dec @property def ohlc(self) -> OHLC: - return [ - timestamp(self.start), - None if self.open is None else str(self.open), - None if self.high is None else str(self.high), - None if self.low is None else str(self.low), - str(self.close) - ] + if self._ohlc is None: + self._ohlc = [ + timestamp(self.start), + None if self.open is None else str(self.open), + None if self.high is None else str(self.high), + None if self.low is None else str(self.low), + str(self.close) + ] + return self._ohlc + + + @property + def json(self) -> str: + if self._json is None: + self._json = json.dumps(self.ohlc) + return self._json def period_name(period: timedelta) -> str: @@ -79,13 +114,13 @@ def ohlc_start_time(time, period: timedelta): return OHLC_DATE_ROOT + timedelta(seconds=period_sec * period_count) -def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[dec]) -> list[OHLC]: +def update_ohlc(prev: NativeOHLC, period: timedelta, time: datetime, price: Optional[dec]) -> list[NativeOHLC]: """ returns an ordered list of OHLC's that have been created/modified by the new time/price if price is None, then bars are advanced based on the time but no new price is added to the series. """ # log.debug(f'\tupdating {prev} with {minutely(time)} {price}') - cur = NativeOHLC.from_ohlc(prev) + cur = prev assert time >= cur.start result = [] # advance time and finalize any past OHLC's into the result array @@ -93,20 +128,13 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d end = cur.start + period if time < end: break - result.append(cur.ohlc) + result.append(cur) cur = NativeOHLC(end, None, None, None, cur.close) # log.debug(f'\ttime advancements: {result}') # if we are setting a price, update the current bar if price is not None: - if cur.open is None: - cur.open = price - cur.high = price - cur.low = price - else: - cur.high = max(cur.high, price) - cur.low = min(cur.low, price) - cur.close = price - result.append(cur.ohlc) + cur.update(price) + result.append(cur) # log.debug(f'\tappended current bar: {cur.ohlc}') # log.debug(f'\tupdate result: {result}') return result @@ -116,13 +144,85 @@ class OHLCKey (NamedTuple): period: timedelta +def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str: + if chain_id is None: + chain_id = current_chain.get().chain_id + start = ohlc_start_time(time, period) + name = period_name(period) + return f'{chain_id}/{symbol}/{name}/' + ( + f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.json' if period < timedelta(hours=1) else # <1H data has a file per day + f'{start.year}/{symbol}-{name}-{start:%Y%m}.json' if period < timedelta(days=7) else # <1W data has a file per month + f'{symbol}-{name}.json' # long periods are a single file for all of history + ) + + +class Chunk: + """ Chunks map to files of OHLC's on disk """ + def __init__(self, repo_dir: str, symbol: str, period: timedelta, time: datetime, + *, bars: Optional[list[NativeOHLC]] = None, chain_id: int = None): + self.repo_dir = repo_dir + self.symbol = symbol + self.period = period + self.chain_id = chain_id if chain_id is not None else current_chain.get().chain_id + self.path = chunk_path(symbol, period, time, chain_id=chain_id) + self.fullpath = os.path.join(repo_dir, self.path) + if bars is not None: + self.bars = bars + else: + try: + with open(self.fullpath, 'r') as file: + bars = json.load(file) + self.bars = [NativeOHLC.from_ohlc(ohlc) for ohlc in bars] + except FileNotFoundError: + self.bars = [] + + + def update(self, native: NativeOHLC): + if not self.bars: + self.bars = [native] + return + start = self.bars[0].start + index = (native.start - start) // self.period + # log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}') + if index > len(self.bars): + log.error(f'chunk gap: {index} > {len(self.bars)} {self.symbol} {period_name(self.period)} {native}' + ''.join( + f'\n\t{c}' for c in self.bars)) + exit(1) + if index == len(self.bars): + assert self.bars[-1].start + self.period == native.start + self.bars.append(native) + else: + assert self.bars[index].start == native.start + self.bars[index] = native + + + def save(self): + for _ in range(2): + try: + with open(self.fullpath, 'w') as file: + json.dump([n.ohlc for n in self.bars], file) + return + except FileNotFoundError: + os.makedirs(os.path.dirname(self.fullpath), exist_ok=True) + raise IOError(f'Could not write chunk {self.fullpath}') + + + def __eq__(self, other): + return self.path == other.path + + + def __hash__(self): + return hash(self.path) + + class OHLCRepository: def __init__(self, base_dir: str = None): """ can't actually make more than one of these because there's a global recent_ohlcs BlockDict """ if base_dir is None: base_dir = config.ohlc_dir self.dir = base_dir - self.cache = LFUCache(len(OHLC_PERIODS) * 128) # enough for the top 128 pools + self.cache = LFUCache(len(OHLC_PERIODS) * 1024) + self.dirty_chunks = set() @staticmethod def add_symbol(symbol: str, period: timedelta = None): @@ -139,28 +239,30 @@ class OHLCRepository: self.update(symbol, period, time, price, create=create) @staticmethod - def update(symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) -> Optional[list[OHLC]]: + def update(symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) \ + -> Optional[list[NativeOHLC]]: """ if price is None, then bars are advanced based on the time but no new price is added to the series. """ logname = f'{symbol} {period_name(period)}' # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}') - key = (symbol, period) - # bars is a list of "recent" OHLC's stored as blockdata. we try to keep the recent array long enough to extend before the root block time - historical: Optional[list[OHLC]] = recent_ohlcs.get(key) + key = symbol, period + # recent_ohlcs holds a list of "recent" NativeOHLC's stored as blockdata. we try to keep the recent array long + # enough to extend prior the root block time + historical: Optional[list[NativeOHLC]] = recent_ohlcs.get(key) # log.debug(f'got recent {historical}') if not historical: if create is False or price is None: return # do not track symbols which have not been explicity set up - p = str(price) historical = [] - updated = [OHLC((timestamp(ohlc_start_time(time, period)), p, p, p, p))] + updated = [NativeOHLC(ohlc_start_time(time, period), price, price, price, price)] # log.debug(f'\tcreated new bars {updated}') else: updated = update_ohlc(historical[-1], period, time, price) # drop any historical bars that are older than we need - oldest_needed = from_timestamp(current_blockstate.get().root_block.timestamp) - period # cover the root block time plus one period prior - trim = (oldest_needed - from_timestamp(historical[0][0])) // period + # oldest_needed = cover the root block time plus one period prior + oldest_needed = from_timestamp(current_blockstate.get().root_block.timestamp) - period + trim = (oldest_needed - historical[0].start) // period if trim > 0: historical = historical[trim:] @@ -168,118 +270,89 @@ class OHLCRepository: if not historical or not updated: updated = historical + updated else: - last_bar = from_timestamp(historical[-1][0]) - first_updated = from_timestamp(updated[0][0]) + last_bar = historical[-1].start + first_updated = updated[0].start overlap = (first_updated - last_bar) // period + 1 updated = historical[:-overlap] + updated if overlap > 0 else historical + updated # log.debug(f'\tnew recents: {updated}') recent_ohlcs.setitem(key, updated) return updated - def save_all(self, symbol: str, period: timedelta, ohlc_list: list[OHLC]) -> None: + def save_all(self, symbol: str, period: timedelta, ohlc_list: list[NativeOHLC]) -> None: + """ saves all OHLC's in the list """ for ohlc in ohlc_list: - self.save(symbol, period, ohlc) # we need to act synchronously so we don't have conflicting access to chunks + self.save(symbol, period, ohlc) - def save(self, symbol: str, period: timedelta, ohlc: OHLC) -> None: + def save(self, symbol: str, period: timedelta, native: NativeOHLC) -> None: + """ + pushes the native OHLC to a Chunk but DOES NOT WRITE TO DISK YET. you must call flush() to write all + dirty chunks to disk after a batch of OHLC's has been saved. + """ # log.debug(f'saving ohlc {symbol} {period_name(period)} {ohlc}') - time = dt(ohlc[0]) - chunk = self.get_chunk(symbol, period, time) + chunk = self.get_chunk(symbol, period, native.start) if not chunk: - chunk = [ohlc] + chunk = self.cache[chunk.path] = Chunk(self.dir, symbol, period, native.start, bars=[native]) else: - start = from_timestamp(chunk[0][0]) - index = (time - start) // period - # log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}') - if index > len(chunk): - log.error(f'chunk gap: {index} > {len(chunk)} {symbol} {period_name(period)} {ohlc}'+''.join(f'\n\t{c}' for c in chunk)) - exit(1) - if index == len(chunk): - assert from_timestamp(chunk[-1][0]) + period == time - chunk.append(ohlc) - else: - assert from_timestamp(chunk[index][0]) == time - chunk[index] = ohlc - self.save_chunk(symbol, period, chunk) + chunk.update(native) + self.dirty_chunks.add(chunk) - def get_chunk(self, symbol: str, period: timedelta, start_time: datetime) -> list[OHLC]: - path = self.chunk_path(symbol, period, start_time) - found = self.cache.get(path) + def get_chunk(self, symbol: str, period: timedelta, time: datetime) -> Chunk: + start_time = ohlc_start_time(time, period) + chain_id = current_chain.get().chain_id + key = chunk_path(symbol, period, time, chain_id=chain_id) + found = self.cache.get(key) if found is None: - found = self.load_chunk(path) - if found is None: - found = [] - self.cache[path] = found + found = self.cache[key] = Chunk(self.dir, symbol, period, start_time, chain_id=chain_id) return found - @staticmethod - def load_chunk(path: str) -> Optional[list[OHLC]]: - try: - with open(path, 'r') as file: - return json.load(file) - except FileNotFoundError: - return [] - except: - log.error(f'exception loading chunk {path}') - raise - - def save_chunk(self, symbol: str, period: timedelta, chunk: list[OHLC]): - if not chunk: - return - path = self.chunk_path(symbol, period, from_timestamp(chunk[0][0])) - for _ in range(2): - try: - with open(path, 'w') as file: - json.dump(chunk, file) - self.cache[path] = chunk - return - except FileNotFoundError: - os.makedirs(os.path.dirname(path), exist_ok=True) - raise IOError(f'Could not write chunk {path}') + def flush(self) -> None: + for chunk in self.dirty_chunks: + chunk.save() + self.dirty_chunks.clear() - def chunk_path(self, symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str: - if chain_id is None: - chain_id = current_chain.get().chain_id - start = ohlc_start_time(time, period) - name = period_name(period) - return f'{self.dir}/{chain_id}/{symbol}/{name}/' + ( - f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.json' if period < timedelta(hours=1) else # <1H data has a file per day - f'{start.year}/{symbol}-{name}-{start:%Y%m}.json' if period < timedelta(days=7) else # <1W data has a file per month - f'{symbol}-{name}.json' # long periods are a single file for all of history - ) - - -def pub_ohlc(_series:str, key: OHLCKey, bars: list[OHLC]): +def pub_ohlc(_series:str, key: OHLCKey, bars: list[NativeOHLC]): pool_addr, period = key chain_id = current_chain.get().chain_id return ( f'{chain_id}|{pool_addr}|{period_name(period)}', # channel name is like 0x...|1m 'ohlcs', - (chain_id, pool_addr, bars) + (chain_id, pool_addr, [b.ohlc for b in bars]) ) def ohlc_key_to_str(k): return f'{k[0]}|{period_name(k[1])}' def ohlc_str_to_key(s): - pool, period_name = s.split('|') - return pool, period_from_name(period_name) + pool, period = s.split('|') + return pool, period_from_name(period) def ohlc_save(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): """ used as a finalization callback from BlockState data. """ + dirty = False for diff in diffs: if diff.series == 'ohlc': symbol, period = diff.key ohlcs.save_all(symbol, period, diff.value) + dirty = True + if dirty: + ohlcs.flush() +def ohlc_value_to_string(bars: list[NativeOHLC]): + return '['+','.join(b.json for b in bars)+']' + +def ohlc_value_from_string(s: str): + return [NativeOHLC.from_ohlc(ohlc) for ohlc in json.loads(s)] # The most recent OHLC's are stored as block data. We store a list of at least the two latest bars, which provides clients with # the latest finalized bar as well as the current open bar. -recent_ohlcs: BlockDict[OHLCKey, list[OHLC]] = BlockDict('ohlc', db=True, redis=True, pub=pub_ohlc, - key2str=ohlc_key_to_str, str2key=ohlc_str_to_key, - series2key=lambda x:x, series2str=lambda x:x) +recent_ohlcs: BlockDict[OHLCKey, list[NativeOHLC]] = BlockDict( + 'ohlc', db=True, redis=True, pub=pub_ohlc, + key2str=ohlc_key_to_str, str2key=ohlc_str_to_key, series2key=lambda x: x, series2str=lambda x: x, + value2str=ohlc_value_to_string, str2value=ohlc_value_from_string +) ohlcs = OHLCRepository() diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index 767f5fd..a7c7a3d 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -73,7 +73,7 @@ async def uniswap_price(pool: PoolDict, sqrt_price=None) -> dec: pool_dec = pool['decimals'] price = dec(sqrt_price*sqrt_price) / 2 ** (96 * 2) result = price * dec(10) ** dec(pool_dec) - log.debug(f'pool sqrtX96 {sqrt_price} with {pool_dec} decimals = {result}') + # log.debug(f'pool sqrtX96 {sqrt_price} with {pool_dec} decimals = {result}') return result