OHLC chunk caching and broad NativeOHLC use

This commit is contained in:
Tim
2024-02-14 20:56:19 -04:00
parent a14a26d38d
commit 1f9b778ac0
3 changed files with 185 additions and 109 deletions

View File

@@ -198,7 +198,10 @@ class BlockState:
else: else:
updated_keys.add((d.series, d.key)) updated_keys.add((d.series, d.key))
del self.diffs_by_hash[dead.hash] del self.diffs_by_hash[dead.hash]
del self.ancestors[dead.hash] try:
del self.ancestors[dead.hash]
except KeyError:
pass # todo is this bad?
# prune diffs_by_series by removing old series diffs that have been superceded by new diffs # prune diffs_by_series by removing old series diffs that have been superceded by new diffs
for s, k in updated_keys: for s, k in updated_keys:

View File

@@ -1,7 +1,6 @@
import json import json
import logging import logging
import os import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Optional, NamedTuple, Reversible, Union from typing import Optional, NamedTuple, Reversible, Union
@@ -35,27 +34,63 @@ def opt_dec(v):
def dt(v): def dt(v):
return v if isinstance(v, datetime) else from_timestamp(v) return v if isinstance(v, datetime) else from_timestamp(v)
@dataclass
class NativeOHLC: class NativeOHLC:
@staticmethod @staticmethod
def from_ohlc(ohlc: OHLC) -> 'NativeOHLC': def from_ohlc(ohlc: OHLC) -> 'NativeOHLC':
return NativeOHLC(*[cast(value) for value, cast in zip(ohlc,(dt, opt_dec, opt_dec, opt_dec, dec))]) return NativeOHLC(*[cast(value) for value, cast in zip(ohlc,(dt, opt_dec, opt_dec, opt_dec, dec))], ohlc=ohlc)
# noinspection PyShadowingBuiltins
def __init__(self, start: datetime, open: Optional[dec], high: Optional[dec], low: Optional[dec], close: dec,
*, ohlc: list=None):
self.start = start
self.open = open
self.high = high
self.low = low
self.close = close
self._ohlc = ohlc # JSON-able representation
self._json: str = None if ohlc is None else json.dumps(ohlc)
def update(self, price):
pstr = str(price)
if self.open is None:
self.open = price
self.high = price
self.low = price
if self._ohlc:
self._ohlc[1] = pstr
self._ohlc[2] = pstr
self._ohlc[3] = pstr
else:
self.high = max(self.high, price)
self.low = min(self.low, price)
if self._ohlc:
self._ohlc[1] = str(self.high)
self._ohlc[2] = str(self.low)
self.close = price
if self._ohlc:
self._ohlc[3] = pstr
self._json = None
start: datetime
open: Optional[dec]
high: Optional[dec]
low: Optional[dec]
close: dec
@property @property
def ohlc(self) -> OHLC: def ohlc(self) -> OHLC:
return [ if self._ohlc is None:
timestamp(self.start), self._ohlc = [
None if self.open is None else str(self.open), timestamp(self.start),
None if self.high is None else str(self.high), None if self.open is None else str(self.open),
None if self.low is None else str(self.low), None if self.high is None else str(self.high),
str(self.close) None if self.low is None else str(self.low),
] str(self.close)
]
return self._ohlc
@property
def json(self) -> str:
if self._json is None:
self._json = json.dumps(self.ohlc)
return self._json
def period_name(period: timedelta) -> str: def period_name(period: timedelta) -> str:
@@ -79,13 +114,13 @@ def ohlc_start_time(time, period: timedelta):
return OHLC_DATE_ROOT + timedelta(seconds=period_sec * period_count) return OHLC_DATE_ROOT + timedelta(seconds=period_sec * period_count)
def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[dec]) -> list[OHLC]: def update_ohlc(prev: NativeOHLC, period: timedelta, time: datetime, price: Optional[dec]) -> list[NativeOHLC]:
""" """
returns an ordered list of OHLC's that have been created/modified by the new time/price returns an ordered list of OHLC's that have been created/modified by the new time/price
if price is None, then bars are advanced based on the time but no new price is added to the series. if price is None, then bars are advanced based on the time but no new price is added to the series.
""" """
# log.debug(f'\tupdating {prev} with {minutely(time)} {price}') # log.debug(f'\tupdating {prev} with {minutely(time)} {price}')
cur = NativeOHLC.from_ohlc(prev) cur = prev
assert time >= cur.start assert time >= cur.start
result = [] result = []
# advance time and finalize any past OHLC's into the result array # advance time and finalize any past OHLC's into the result array
@@ -93,20 +128,13 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d
end = cur.start + period end = cur.start + period
if time < end: if time < end:
break break
result.append(cur.ohlc) result.append(cur)
cur = NativeOHLC(end, None, None, None, cur.close) cur = NativeOHLC(end, None, None, None, cur.close)
# log.debug(f'\ttime advancements: {result}') # log.debug(f'\ttime advancements: {result}')
# if we are setting a price, update the current bar # if we are setting a price, update the current bar
if price is not None: if price is not None:
if cur.open is None: cur.update(price)
cur.open = price result.append(cur)
cur.high = price
cur.low = price
else:
cur.high = max(cur.high, price)
cur.low = min(cur.low, price)
cur.close = price
result.append(cur.ohlc)
# log.debug(f'\tappended current bar: {cur.ohlc}') # log.debug(f'\tappended current bar: {cur.ohlc}')
# log.debug(f'\tupdate result: {result}') # log.debug(f'\tupdate result: {result}')
return result return result
@@ -116,13 +144,85 @@ class OHLCKey (NamedTuple):
period: timedelta period: timedelta
def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str:
if chain_id is None:
chain_id = current_chain.get().chain_id
start = ohlc_start_time(time, period)
name = period_name(period)
return f'{chain_id}/{symbol}/{name}/' + (
f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.json' if period < timedelta(hours=1) else # <1H data has a file per day
f'{start.year}/{symbol}-{name}-{start:%Y%m}.json' if period < timedelta(days=7) else # <1W data has a file per month
f'{symbol}-{name}.json' # long periods are a single file for all of history
)
class Chunk:
""" Chunks map to files of OHLC's on disk """
def __init__(self, repo_dir: str, symbol: str, period: timedelta, time: datetime,
*, bars: Optional[list[NativeOHLC]] = None, chain_id: int = None):
self.repo_dir = repo_dir
self.symbol = symbol
self.period = period
self.chain_id = chain_id if chain_id is not None else current_chain.get().chain_id
self.path = chunk_path(symbol, period, time, chain_id=chain_id)
self.fullpath = os.path.join(repo_dir, self.path)
if bars is not None:
self.bars = bars
else:
try:
with open(self.fullpath, 'r') as file:
bars = json.load(file)
self.bars = [NativeOHLC.from_ohlc(ohlc) for ohlc in bars]
except FileNotFoundError:
self.bars = []
def update(self, native: NativeOHLC):
if not self.bars:
self.bars = [native]
return
start = self.bars[0].start
index = (native.start - start) // self.period
# log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}')
if index > len(self.bars):
log.error(f'chunk gap: {index} > {len(self.bars)} {self.symbol} {period_name(self.period)} {native}' + ''.join(
f'\n\t{c}' for c in self.bars))
exit(1)
if index == len(self.bars):
assert self.bars[-1].start + self.period == native.start
self.bars.append(native)
else:
assert self.bars[index].start == native.start
self.bars[index] = native
def save(self):
for _ in range(2):
try:
with open(self.fullpath, 'w') as file:
json.dump([n.ohlc for n in self.bars], file)
return
except FileNotFoundError:
os.makedirs(os.path.dirname(self.fullpath), exist_ok=True)
raise IOError(f'Could not write chunk {self.fullpath}')
def __eq__(self, other):
return self.path == other.path
def __hash__(self):
return hash(self.path)
class OHLCRepository: class OHLCRepository:
def __init__(self, base_dir: str = None): def __init__(self, base_dir: str = None):
""" can't actually make more than one of these because there's a global recent_ohlcs BlockDict """ """ can't actually make more than one of these because there's a global recent_ohlcs BlockDict """
if base_dir is None: if base_dir is None:
base_dir = config.ohlc_dir base_dir = config.ohlc_dir
self.dir = base_dir self.dir = base_dir
self.cache = LFUCache(len(OHLC_PERIODS) * 128) # enough for the top 128 pools self.cache = LFUCache(len(OHLC_PERIODS) * 1024)
self.dirty_chunks = set()
@staticmethod @staticmethod
def add_symbol(symbol: str, period: timedelta = None): def add_symbol(symbol: str, period: timedelta = None):
@@ -139,28 +239,30 @@ class OHLCRepository:
self.update(symbol, period, time, price, create=create) self.update(symbol, period, time, price, create=create)
@staticmethod @staticmethod
def update(symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) -> Optional[list[OHLC]]: def update(symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) \
-> Optional[list[NativeOHLC]]:
""" """
if price is None, then bars are advanced based on the time but no new price is added to the series. if price is None, then bars are advanced based on the time but no new price is added to the series.
""" """
logname = f'{symbol} {period_name(period)}' logname = f'{symbol} {period_name(period)}'
# log.debug(f'Updating OHLC {logname} {minutely(time)} {price}') # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
key = (symbol, period) key = symbol, period
# bars is a list of "recent" OHLC's stored as blockdata. we try to keep the recent array long enough to extend before the root block time # recent_ohlcs holds a list of "recent" NativeOHLC's stored as blockdata. we try to keep the recent array long
historical: Optional[list[OHLC]] = recent_ohlcs.get(key) # enough to extend prior the root block time
historical: Optional[list[NativeOHLC]] = recent_ohlcs.get(key)
# log.debug(f'got recent {historical}') # log.debug(f'got recent {historical}')
if not historical: if not historical:
if create is False or price is None: if create is False or price is None:
return # do not track symbols which have not been explicity set up return # do not track symbols which have not been explicity set up
p = str(price)
historical = [] historical = []
updated = [OHLC((timestamp(ohlc_start_time(time, period)), p, p, p, p))] updated = [NativeOHLC(ohlc_start_time(time, period), price, price, price, price)]
# log.debug(f'\tcreated new bars {updated}') # log.debug(f'\tcreated new bars {updated}')
else: else:
updated = update_ohlc(historical[-1], period, time, price) updated = update_ohlc(historical[-1], period, time, price)
# drop any historical bars that are older than we need # drop any historical bars that are older than we need
oldest_needed = from_timestamp(current_blockstate.get().root_block.timestamp) - period # cover the root block time plus one period prior # oldest_needed = cover the root block time plus one period prior
trim = (oldest_needed - from_timestamp(historical[0][0])) // period oldest_needed = from_timestamp(current_blockstate.get().root_block.timestamp) - period
trim = (oldest_needed - historical[0].start) // period
if trim > 0: if trim > 0:
historical = historical[trim:] historical = historical[trim:]
@@ -168,118 +270,89 @@ class OHLCRepository:
if not historical or not updated: if not historical or not updated:
updated = historical + updated updated = historical + updated
else: else:
last_bar = from_timestamp(historical[-1][0]) last_bar = historical[-1].start
first_updated = from_timestamp(updated[0][0]) first_updated = updated[0].start
overlap = (first_updated - last_bar) // period + 1 overlap = (first_updated - last_bar) // period + 1
updated = historical[:-overlap] + updated if overlap > 0 else historical + updated updated = historical[:-overlap] + updated if overlap > 0 else historical + updated
# log.debug(f'\tnew recents: {updated}') # log.debug(f'\tnew recents: {updated}')
recent_ohlcs.setitem(key, updated) recent_ohlcs.setitem(key, updated)
return updated return updated
def save_all(self, symbol: str, period: timedelta, ohlc_list: list[OHLC]) -> None: def save_all(self, symbol: str, period: timedelta, ohlc_list: list[NativeOHLC]) -> None:
""" saves all OHLC's in the list """
for ohlc in ohlc_list: for ohlc in ohlc_list:
self.save(symbol, period, ohlc) # we need to act synchronously so we don't have conflicting access to chunks self.save(symbol, period, ohlc)
def save(self, symbol: str, period: timedelta, ohlc: OHLC) -> None: def save(self, symbol: str, period: timedelta, native: NativeOHLC) -> None:
"""
pushes the native OHLC to a Chunk but DOES NOT WRITE TO DISK YET. you must call flush() to write all
dirty chunks to disk after a batch of OHLC's has been saved.
"""
# log.debug(f'saving ohlc {symbol} {period_name(period)} {ohlc}') # log.debug(f'saving ohlc {symbol} {period_name(period)} {ohlc}')
time = dt(ohlc[0]) chunk = self.get_chunk(symbol, period, native.start)
chunk = self.get_chunk(symbol, period, time)
if not chunk: if not chunk:
chunk = [ohlc] chunk = self.cache[chunk.path] = Chunk(self.dir, symbol, period, native.start, bars=[native])
else: else:
start = from_timestamp(chunk[0][0]) chunk.update(native)
index = (time - start) // period self.dirty_chunks.add(chunk)
# log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}')
if index > len(chunk):
log.error(f'chunk gap: {index} > {len(chunk)} {symbol} {period_name(period)} {ohlc}'+''.join(f'\n\t{c}' for c in chunk))
exit(1)
if index == len(chunk):
assert from_timestamp(chunk[-1][0]) + period == time
chunk.append(ohlc)
else:
assert from_timestamp(chunk[index][0]) == time
chunk[index] = ohlc
self.save_chunk(symbol, period, chunk)
def get_chunk(self, symbol: str, period: timedelta, start_time: datetime) -> list[OHLC]: def get_chunk(self, symbol: str, period: timedelta, time: datetime) -> Chunk:
path = self.chunk_path(symbol, period, start_time) start_time = ohlc_start_time(time, period)
found = self.cache.get(path) chain_id = current_chain.get().chain_id
key = chunk_path(symbol, period, time, chain_id=chain_id)
found = self.cache.get(key)
if found is None: if found is None:
found = self.load_chunk(path) found = self.cache[key] = Chunk(self.dir, symbol, period, start_time, chain_id=chain_id)
if found is None:
found = []
self.cache[path] = found
return found return found
@staticmethod def flush(self) -> None:
def load_chunk(path: str) -> Optional[list[OHLC]]: for chunk in self.dirty_chunks:
try: chunk.save()
with open(path, 'r') as file: self.dirty_chunks.clear()
return json.load(file)
except FileNotFoundError:
return []
except:
log.error(f'exception loading chunk {path}')
raise
def save_chunk(self, symbol: str, period: timedelta, chunk: list[OHLC]):
if not chunk:
return
path = self.chunk_path(symbol, period, from_timestamp(chunk[0][0]))
for _ in range(2):
try:
with open(path, 'w') as file:
json.dump(chunk, file)
self.cache[path] = chunk
return
except FileNotFoundError:
os.makedirs(os.path.dirname(path), exist_ok=True)
raise IOError(f'Could not write chunk {path}')
def chunk_path(self, symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str: def pub_ohlc(_series:str, key: OHLCKey, bars: list[NativeOHLC]):
if chain_id is None:
chain_id = current_chain.get().chain_id
start = ohlc_start_time(time, period)
name = period_name(period)
return f'{self.dir}/{chain_id}/{symbol}/{name}/' + (
f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.json' if period < timedelta(hours=1) else # <1H data has a file per day
f'{start.year}/{symbol}-{name}-{start:%Y%m}.json' if period < timedelta(days=7) else # <1W data has a file per month
f'{symbol}-{name}.json' # long periods are a single file for all of history
)
def pub_ohlc(_series:str, key: OHLCKey, bars: list[OHLC]):
pool_addr, period = key pool_addr, period = key
chain_id = current_chain.get().chain_id chain_id = current_chain.get().chain_id
return ( return (
f'{chain_id}|{pool_addr}|{period_name(period)}', # channel name is like 0x...|1m f'{chain_id}|{pool_addr}|{period_name(period)}', # channel name is like 0x...|1m
'ohlcs', 'ohlcs',
(chain_id, pool_addr, bars) (chain_id, pool_addr, [b.ohlc for b in bars])
) )
def ohlc_key_to_str(k): def ohlc_key_to_str(k):
return f'{k[0]}|{period_name(k[1])}' return f'{k[0]}|{period_name(k[1])}'
def ohlc_str_to_key(s): def ohlc_str_to_key(s):
pool, period_name = s.split('|') pool, period = s.split('|')
return pool, period_from_name(period_name) return pool, period_from_name(period)
def ohlc_save(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): def ohlc_save(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
""" """
used as a finalization callback from BlockState data. used as a finalization callback from BlockState data.
""" """
dirty = False
for diff in diffs: for diff in diffs:
if diff.series == 'ohlc': if diff.series == 'ohlc':
symbol, period = diff.key symbol, period = diff.key
ohlcs.save_all(symbol, period, diff.value) ohlcs.save_all(symbol, period, diff.value)
dirty = True
if dirty:
ohlcs.flush()
def ohlc_value_to_string(bars: list[NativeOHLC]):
return '['+','.join(b.json for b in bars)+']'
def ohlc_value_from_string(s: str):
return [NativeOHLC.from_ohlc(ohlc) for ohlc in json.loads(s)]
# The most recent OHLC's are stored as block data. We store a list of at least the two latest bars, which provides clients with # The most recent OHLC's are stored as block data. We store a list of at least the two latest bars, which provides clients with
# the latest finalized bar as well as the current open bar. # the latest finalized bar as well as the current open bar.
recent_ohlcs: BlockDict[OHLCKey, list[OHLC]] = BlockDict('ohlc', db=True, redis=True, pub=pub_ohlc, recent_ohlcs: BlockDict[OHLCKey, list[NativeOHLC]] = BlockDict(
key2str=ohlc_key_to_str, str2key=ohlc_str_to_key, 'ohlc', db=True, redis=True, pub=pub_ohlc,
series2key=lambda x:x, series2str=lambda x:x) key2str=ohlc_key_to_str, str2key=ohlc_str_to_key, series2key=lambda x: x, series2str=lambda x: x,
value2str=ohlc_value_to_string, str2value=ohlc_value_from_string
)
ohlcs = OHLCRepository() ohlcs = OHLCRepository()

View File

@@ -73,7 +73,7 @@ async def uniswap_price(pool: PoolDict, sqrt_price=None) -> dec:
pool_dec = pool['decimals'] pool_dec = pool['decimals']
price = dec(sqrt_price*sqrt_price) / 2 ** (96 * 2) price = dec(sqrt_price*sqrt_price) / 2 ** (96 * 2)
result = price * dec(10) ** dec(pool_dec) result = price * dec(10) ** dec(pool_dec)
log.debug(f'pool sqrtX96 {sqrt_price} with {pool_dec} decimals = {result}') # log.debug(f'pool sqrtX96 {sqrt_price} with {pool_dec} decimals = {result}')
return result return result