diff --git a/src/dexorder/bin/backfill_ohlc.py b/src/dexorder/bin/backfill_ohlc.py index bfdc99b..8d517db 100644 --- a/src/dexorder/bin/backfill_ohlc.py +++ b/src/dexorder/bin/backfill_ohlc.py @@ -1,9 +1,9 @@ import logging import sys from asyncio import CancelledError -from typing import Iterable, Union +from typing import Union, Reversible -from dexorder import blockchain, config +from dexorder import blockchain, config, from_timestamp, now from dexorder.bin.executable import execute from dexorder.blockstate import DiffItem from dexorder.blockstate.blockdata import BlockData @@ -13,17 +13,22 @@ from dexorder.configuration import parse_args from dexorder.contract import get_contract_event from dexorder.database import db from dexorder.database.model import Block -from dexorder.event_handler import handle_uniswap_swap +from dexorder.event_handler import handle_uniswap_swap, check_ohlc_rollover from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.memcache import memcache -from dexorder.ohlc import recent_ohlcs, ohlc_finalize, ohlcs +from dexorder.ohlc import recent_ohlcs, ohlc_save, ohlcs from dexorder.runner import BlockStateRunner +from dexorder.util import hexstr log = logging.getLogger('dexorder') -def finalize_callback(block: Block, _diffs: Iterable[Union[DiffItem, DiffEntryItem]]): - log.info(f'backfill completed through block {block.height} {block.timestamp:%Y-%m-%d %H:%M:%S} {block.hash}') +def finalize_callback(block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): + start = now() + log.info("finalizing OHLC's...") + ohlc_save(block, diffs) + log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds') + log.info(f'backfill completed through block {block.height} {from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}') # noinspection DuplicatedCode @@ -34,8 +39,7 @@ async def main(): parse_args() if not config.ohlc_dir: config.ohlc_dir = './ohlc' - log.warning('Defaulting ohlc_dir to ./ohlc') - ohlcs.dir = config.ohlc_dir + ohlcs.dir = config.ohlc_dir await blockchain.connect() redis_state = None state = None @@ -56,12 +60,11 @@ async def main(): runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None, timer_period=0) runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) - - runner.on_promotion.append(ohlc_finalize) + runner.postprocess_cbs.append(check_ohlc_rollover) + runner.on_promotion.append(finalize_callback) if db: # noinspection PyUnboundLocalVariable runner.on_promotion.append(db_state.save) - runner.on_promotion.append(finalize_callback) if redis_state: runner.on_head_update.append(redis_state.save) diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index b3a141b..526406a 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -15,7 +15,7 @@ from dexorder.event_handler import init_order_triggers, init, dump_log, handle_v process_active_tranches, process_execution_requests, check_ohlc_rollover from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.memcache import memcache -from dexorder.ohlc import ohlc_finalize +from dexorder.ohlc import ohlc_save from dexorder.runner import BlockStateRunner from dexorder.transaction import handle_transaction_receipts, create_transactions, send_transactions @@ -97,7 +97,7 @@ async def main(): runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None) setup_logevent_triggers(runner) if config.ohlc_dir: - runner.on_promotion.append(ohlc_finalize) + runner.on_promotion.append(ohlc_save) if db: runner.on_state_init.append(init_order_triggers) # noinspection PyUnboundLocalVariable diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index e28cbbe..3f43849 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -10,7 +10,7 @@ from typing import Optional @dataclass class Config: rpc_url: str = 'http://localhost:8545' - ws_url: str = 'ws://localhost:8545' + ws_url: Optional[str] = 'ws://localhost:8545' rpc_urls: Optional[dict[str,str]] = field(default_factory=dict) db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder' datadb_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorderdata' diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 42014c5..c0bcf6c 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -1,7 +1,6 @@ import asyncio import functools import logging -from datetime import datetime from uuid import UUID from web3.types import EventData diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index 456befe..5640441 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -9,18 +9,17 @@ from cachetools import LFUCache from dexorder import dec, config, from_isotime, minutely, from_timestamp from dexorder.base.chain import current_chain -from dexorder.blockstate import BlockDict, DiffItem +from dexorder.blockstate import BlockDict, DiffItem, current_blockstate from dexorder.blockstate.diff import DiffEntryItem from dexorder.database.model import Block -from dexorder.database.model.block import current_block log = logging.getLogger(__name__) OHLC_PERIODS = [ timedelta(minutes=1), - # timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30), - # timedelta(hours=1), timedelta(hours=2), timedelta(hours=4), timedelta(hours=8), timedelta(hours=12), - # timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7) + timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30), + timedelta(hours=1), timedelta(hours=2), timedelta(hours=4), timedelta(hours=8), timedelta(hours=12), + timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7) ] OHLC_DATE_ROOT = datetime(2009, 1, 4, tzinfo=timezone.utc) # Sunday before Bitcoin Genesis @@ -51,7 +50,7 @@ class NativeOHLC: @property def ohlc(self) -> OHLC: return [ - self.start.isoformat(timespec='minutes'), + minutely(self.start), None if self.open is None else str(self.open), None if self.high is None else str(self.high), None if self.low is None else str(self.low), @@ -59,8 +58,7 @@ class NativeOHLC: ] - -def ohlc_name(period: timedelta) -> str: +def period_name(period: timedelta) -> str: return f'{period // timedelta(minutes=1)}m' if period < timedelta(hours=1) \ else f'{period // timedelta(hours=1)}H' if period < timedelta(days=1) \ else f'{period // timedelta(days=7)}W' if period == timedelta(days=7) \ @@ -86,7 +84,7 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d returns an ordered list of OHLC's that have been created/modified by the new time/price if price is None, then bars are advanced based on the time but no new price is added to the series. """ - log.debug(f'\tupdating {prev} with {minutely(time)} {price}') + # log.debug(f'\tupdating {prev} with {minutely(time)} {price}') cur = NativeOHLC.from_ohlc(prev) assert time >= cur.start result = [] @@ -97,7 +95,7 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d break result.append(cur.ohlc) cur = NativeOHLC(end, None, None, None, cur.close) - log.debug(f'\ttime advancements: {result}') + # log.debug(f'\ttime advancements: {result}') # if we are setting a price, update the current bar if price is not None: if cur.open is None: @@ -109,8 +107,8 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d cur.low = min(cur.low, price) cur.close = price result.append(cur.ohlc) - log.debug(f'\tappended current bar: {cur.ohlc}') - log.debug(f'\tupdate result: {result}') + # log.debug(f'\tappended current bar: {cur.ohlc}') + # log.debug(f'\tupdate result: {result}') return result class OHLCKey (NamedTuple): @@ -145,22 +143,23 @@ class OHLCRepository: """ if price is None, then bars are advanced based on the time but no new price is added to the series. """ - logname = f'{symbol} {ohlc_name(period)}' - log.debug(f'Updating OHLC {logname} {minutely(time)} {price}') + logname = f'{symbol} {period_name(period)}' + # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}') key = (symbol, period) # bars is a list of "recent" OHLC's stored as blockdata. we try to keep the recent array long enough to extend before the root block time historical: Optional[list[OHLC]] = recent_ohlcs.get(key) + # log.debug(f'got recent {historical}') if not historical: if create is False or price is None: return # do not track symbols which have not been explicity set up p = str(price) historical = [] updated = [OHLC((minutely(ohlc_start_time(time, period)), p, p, p, p))] - log.debug(f'\tcreated new bars {updated}') + # log.debug(f'\tcreated new bars {updated}') else: updated = update_ohlc(historical[-1], period, time, price) # drop any historical bars that are older than we need - oldest_needed = from_timestamp(current_block.get().timestamp) - period # cover the root block time plus one period prior + oldest_needed = from_timestamp(current_blockstate.get().root_block.timestamp) - period # cover the root block time plus one period prior trim = (oldest_needed - from_isotime(historical[0][0])) // period if trim > 0: historical = historical[trim:] @@ -173,15 +172,16 @@ class OHLCRepository: first_updated = from_isotime(updated[0][0]) overlap = (first_updated - last_bar) // period + 1 updated = historical[:-overlap] + updated if overlap > 0 else historical + updated - log.debug(f'\tnew recents: {updated}') + # log.debug(f'\tnew recents: {updated}') recent_ohlcs.setitem(key, updated) return updated def save_all(self, symbol: str, period: timedelta, ohlc_list: list[OHLC]) -> None: for ohlc in ohlc_list: - self.save(symbol, period, ohlc) # we need to act sequentially so we don't have conflicting access to chunks + self.save(symbol, period, ohlc) # we need to act synchronously so we don't have conflicting access to chunks def save(self, symbol: str, period: timedelta, ohlc: OHLC) -> None: + # log.debug(f'saving ohlc {symbol} {period_name(period)} {ohlc}') time = dt(ohlc[0]) chunk = self.get_chunk(symbol, period, time) if not chunk: @@ -189,8 +189,10 @@ class OHLCRepository: else: start = from_isotime(chunk[0][0]) index = (time - start) // period - log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}') - assert index <= len(chunk) + # log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}') + if index > len(chunk): + log.error(f'chunk gap: {index} > {len(chunk)} {symbol} {period_name(period)} {ohlc}'+''.join(f'\n\t{c}' for c in chunk)) + exit(1) if index == len(chunk): assert from_isotime(chunk[-1][0]) + period == time chunk.append(ohlc) @@ -221,21 +223,22 @@ class OHLCRepository: if not chunk: return path = self.chunk_path(symbol, period, from_isotime(chunk[0][0])) - try: - with open(path, 'w') as file: - json.dump(chunk, file) - return - except FileNotFoundError: - os.makedirs(os.path.dirname(path), exist_ok=True) - with open(path, 'w') as file: - json.dump(chunk, file) + for _ in range(2): + try: + with open(path, 'w') as file: + json.dump(chunk, file) + self.cache[path] = chunk + return + except FileNotFoundError: + os.makedirs(os.path.dirname(path), exist_ok=True) + raise IOError(f'Could not write chunk {path}') def chunk_path(self, symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str: if chain_id is None: chain_id = current_chain.get().chain_id start = ohlc_start_time(time, period) - name = ohlc_name(period) + name = period_name(period) return f'{self.dir}/{chain_id}/{symbol}/{name}/' + ( f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.json' if period < timedelta(hours=1) else # <1H data has a file per day f'{start.year}/{symbol}-{name}-{start:%Y%m}.json' if period < timedelta(days=7) else # <1W data has a file per month @@ -247,19 +250,19 @@ def pub_ohlc(_series:str, key: OHLCKey, bars: list[OHLC]): pool_addr, period = key chain_id = current_chain.get().chain_id return ( - f'{chain_id}|{pool_addr}|{ohlc_name(period)}', # channel name is like 0x...|1m + f'{chain_id}|{pool_addr}|{period_name(period)}', # channel name is like 0x...|1m 'ohlcs', (chain_id, pool_addr, bars) ) def ohlc_key_to_str(k): - return f'{k[0]}|{ohlc_name(k[1])}' + return f'{k[0]}|{period_name(k[1])}' def ohlc_str_to_key(s): pool, period_name = s.split('|') return pool, period_from_name(period_name) -def ohlc_finalize(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): +def ohlc_save(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): """ used as a finalization callback from BlockState data. """ diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 5a7c285..782f65e 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -21,9 +21,10 @@ TimeTrigger = Callable[[int], None] # func(timestamp) time_triggers:BlockSet[TimeTrigger] = BlockSet('tt') PriceTrigger = Callable[[dec], Union[Awaitable[None],None]] # [async] func(pool_price) +UnconstrainedPriceTrigger = Callable[[Optional[dec]], Union[Awaitable[None],None]] # [async] func(pool_price) price_triggers:dict[str, BlockSet[PriceTrigger]] = defaultdictk(lambda addr:BlockSet(f'pt|{addr}')) # different BlockSet per pool address new_price_triggers:dict[str, set[PriceTrigger]] = defaultdict(set) # when price triggers are first set, they must be tested against the current price even if it didnt change this block -unconstrained_price_triggers: BlockSet[PriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled +unconstrained_price_triggers: BlockSet[UnconstrainedPriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at') # tranches which have passed all constraints and should be executed execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # generated by the active tranches # todo should this really be blockdata? diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 02428de..6166c51 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -98,7 +98,7 @@ class BlockStateRunner: if self.state: self.max_height_seen = max(self.max_height_seen, self.state.root_block.height) self.running = True - return await (self.run_polling() if config.polling > 0 else self.run_ws()) + return await (self.run_polling() if config.polling > 0 or not config.ws_url else self.run_ws()) async def run_ws(self): w3ws = await create_w3_ws() @@ -124,8 +124,8 @@ class BlockStateRunner: if not self.running: break await async_yield() - except (ConnectionClosedError, TimeoutError): - pass + except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e: + log.debug(f'runner timeout {e}') finally: # noinspection PyBroadException try: @@ -168,8 +168,8 @@ class BlockStateRunner: if not self.running: break await asyncio.sleep(config.polling) - except ConnectionClosedError: - pass + except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e: + log.debug(f'runner timeout {e}') finally: # noinspection PyBroadException try: @@ -258,7 +258,7 @@ class BlockStateRunner: async def handle_head(self, chain, block, w3): print(f'logger {log} {log.name} level {log.level} {logging.DEBUG} {logging.FATAL}') - log.debug(f'handle_head {block.height} {block.hash}') + log.debug(f'handle_head {block.height} {hexstr(block.hash)}') session = None batches = [] try: @@ -365,7 +365,7 @@ class BlockStateRunner: else: new_root_fork = fork.for_height(promotion_height) if new_root_fork: - log.debug(f'promoting root {new_root_fork.height} {new_root_fork.hash}') + log.debug(f'promoting root {new_root_fork.height} {hexstr(new_root_fork.hash)}') diff_items = self.state.promote_root(new_root_fork) for callback in self.on_promotion: # todo try/except for known retryable errors