From be91e75d716c06bd651a266930ba7dc9b9351595 Mon Sep 17 00:00:00 2001 From: tim Date: Mon, 15 Jan 2024 23:12:13 -0400 Subject: [PATCH] refactored runner callback setup into main as prep for separate ohlc process --- bin/df.sh | 2 +- src/dexorder/base/ohlc.py | 85 ++++++++++++++++++++++++++++++++++ src/dexorder/bin/main.py | 60 +++++++++++++++++++++++- src/dexorder/event_handler.py | 57 ++--------------------- src/dexorder/order/triggers.py | 1 - src/dexorder/runner.py | 8 ++-- src/dexorder/transaction.py | 13 ------ 7 files changed, 152 insertions(+), 74 deletions(-) create mode 100644 src/dexorder/base/ohlc.py diff --git a/bin/df.sh b/bin/df.sh index 180132b..143d53c 100755 --- a/bin/df.sh +++ b/bin/df.sh @@ -1,2 +1,2 @@ #!/bin/bash -docker run --network=host --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly:latest --maxmemory 1G --hz=1 --dbfilename '' "$@" +docker run --network=host --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly:latest --proactor_threads 1 --maxmemory 256MiB --hz=1 --dbfilename '' "$@" diff --git a/src/dexorder/base/ohlc.py b/src/dexorder/base/ohlc.py new file mode 100644 index 0000000..c135de1 --- /dev/null +++ b/src/dexorder/base/ohlc.py @@ -0,0 +1,85 @@ +import logging +from dataclasses import dataclass +from datetime import datetime, timedelta + +from dexorder import dec + +log = logging.getLogger(__name__) + + +OHLC_PERIODS = [ + timedelta(minutes=1), timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30), + timedelta(hours=1), timedelta(hours=2), timedelta(hours=4), timedelta(hours=8), timedelta(hours=12), + timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7) +] + +OHLC_DATE_ROOT = datetime(2009, 1, 4) # Sunday before Bitcoin Genesis + +def ohlc_name(period: timedelta) -> str: + return f'{period//timedelta(minutes=1)}m' if period < timedelta(hours=1) \ + else f'{period//timedelta(hours=1)}H' if period < timedelta(days=1) \ + else f'{period//timedelta(days=7)}W' if period == timedelta(days=7) \ + else f'{period//timedelta(days=1)}D' + + +def ohlc_start_time(time, period: timedelta): + """ returns the start time of the ohlc containing time, such that start_time <= time < start_time + period """ + period_sec = int(period.total_seconds()) + period_count = (time - OHLC_DATE_ROOT).total_seconds() // period_sec + return OHLC_DATE_ROOT + timedelta(seconds=period_sec * period_count) + + +@dataclass(frozen=True) +class OHLC: + start_time: datetime # first datetime included in this range + period: timedelta # the interval covered by this range, starting from start_time + + # if no swaps happen during the interval, heights are set to prev_ohlc.last_height + first_height: int = None # blockchain height of the first trade in this range. + last_height: int = None # last_height == first_height if there's zero or one trades during this interval + + # if no swaps happen during the interval, prices are set to prev_ohlc.close + open: dec = None + high: dec = None + low: dec = None + close: dec = None + + has_data: bool = False # True iff any trade has happened this period + + def update(self, height: int, time: datetime, price: dec) -> list['OHLC',...:'OHLC']: + """ returns an ordered list of OHLC's that have been created/modified by the new price """ + assert time >= self.start_time + result = [] + cur = self + start = self.start_time + while True: + end = start + self.period + if time < end: + break + result.append(cur) + start = end + cur = OHLC(start, self.period, cur.last_height, cur.last_height, cur.close, cur.close, cur.close, cur.close) + if not cur.has_data: + cur = OHLC(cur.start_time, self.period, height, height, price, price, price, price, True) + else: + cur = OHLC(cur.start_time, self.period, cur.first_height, height, cur.open, max(cur.high,price), min(cur.low,price), price, True) + result.append(cur) + return result + +class OHLCRepository: + def __init__(self, base_dir: str): + self.dir = base_dir + + def chunk_path(self, symbol: str, period: timedelta, time: datetime) -> str: + start = ohlc_start_time(time, period) + name = ohlc_name(period) + return f'{self.dir}/{symbol}/{name}/' + ( + f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.json' if period < timedelta(hours=1) else # <1H data has a file per day + f'{start.year}/{symbol}-{name}-{start:%Y%m}.json' if period < timedelta(days=7) else # <1W data has a file per month + f'{symbol}-{name}.json' # long periods are a single file for all of history + ) + +if __name__ == '__main__': + r = OHLCRepository('') + for p in OHLC_PERIODS: + print(f'{ohlc_name(p)}\t{r.chunk_path("symbol",p, datetime.utcnow())}') diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 1deeefa..c06fb0a 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -3,16 +3,70 @@ import sys from asyncio import CancelledError from dexorder import db, blockchain +from dexorder.base.chain import current_chain from dexorder.bin.executable import execute from dexorder.blockstate.blockdata import BlockData from dexorder.blockstate.db_state import DbState from dexorder.configuration import parse_args -from dexorder.event_handler import init_order_triggers +from dexorder.contract import get_contract_event +from dexorder.contract.dexorder import get_factory_contract, get_dexorder_contract +from dexorder.event_handler import init_order_triggers, init, dump_log, handle_vault_created, handle_order_placed, handle_transfer, handle_uniswap_swap, \ + handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_dexorderexecutions, activate_time_triggers, activate_price_triggers, \ + process_active_tranches, process_execution_requests from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.memcache import memcache from dexorder.runner import BlockStateRunner +from dexorder.transaction import handle_transaction_receipts, create_transactions, send_transactions log = logging.getLogger('dexorder') +LOG_ALL_EVENTS = True # for debug todo config + + +# +# These event callbacks are basically the run loop for every block +# + +def setup_logevent_triggers(runner): + # the triggers for each log events are triggered in the order of event registry, so the + # code ordering here is also the trigger order: e.g. we process all vault creation events + # before any order creations + + if LOG_ALL_EVENTS: + log.debug('all events:') + runner.add_event_trigger(dump_log, None, {}) + + factory = get_factory_contract() + if factory is None: + log.warning(f'No Factory for {current_chain.get()}') + vault_created = get_contract_event('Factory', 'VaultCreated') + else: + vault_created = factory.events.VaultCreated() + + dexorder = get_dexorder_contract() + if dexorder is None: + log.warning(f'No Dexorder for {current_chain.get()}') + executions = get_contract_event('Dexorder', 'DexorderExecutions') + else: + executions = dexorder.events.DexorderExecutions() + + runner.add_event_trigger(init) + runner.add_event_trigger(handle_vault_created, vault_created) + runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced')) + runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer')) + runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) + runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled')) + runner.add_event_trigger(handle_order_canceled, get_contract_event('OrderLib', 'DexorderSwapCanceled')) + runner.add_event_trigger(handle_order_cancel_all, get_contract_event('OrderLib', 'DexorderCancelAll')) + runner.add_event_trigger(handle_transaction_receipts) + runner.add_event_trigger(handle_dexorderexecutions, executions) + + # these callbacks run after the ones above on each block, plus these also run every second + runner.add_postprocess_trigger(activate_time_triggers) + runner.add_postprocess_trigger(activate_price_triggers) + runner.add_postprocess_trigger(process_active_tranches) + runner.add_postprocess_trigger(process_execution_requests) + runner.add_postprocess_trigger(create_transactions) + runner.add_postprocess_trigger(send_transactions) async def main(): @@ -31,19 +85,21 @@ async def main(): with db.session: state = db_state.load() if state is not None: - if redis_state: await redis_state.init(state) log.info(f'loaded state from db for root block {state.root_block}') runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None) + setup_logevent_triggers(runner) if db: + # noinspection PyTypeChecker runner.on_state_init.append(init_order_triggers) # noinspection PyUnboundLocalVariable runner.on_promotion.append(db_state.save) if redis_state: # noinspection PyTypeChecker runner.on_head_update.append(redis_state.save) + try: await runner.run() except CancelledError: diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index e3ae3da..9ad3cc8 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -24,57 +24,6 @@ from dexorder.util.async_util import maywait log = logging.getLogger(__name__) -LOG_ALL_EVENTS = True # for debug - - -def setup_logevent_triggers(runner): - runner.events.clear() - - # the triggers for each log events are triggered in the order of event registry, so the - # code ordering here is also the trigger order: e.g. we process all vault creation events - # before any order creations - - if LOG_ALL_EVENTS: - log.debug('all events:') - runner.add_event_trigger(dump_log, None, {}) - - factory = get_factory_contract() - if factory is None: - log.warning(f'No Factory for {current_chain.get()}') - vault_created = get_contract_event('Factory', 'VaultCreated') - else: - vault_created = factory.events.VaultCreated() - - dexorder = get_dexorder_contract() - if dexorder is None: - log.warning(f'No Dexorder for {current_chain.get()}') - executions = get_contract_event('Dexorder', 'DexorderExecutions') - else: - executions = dexorder.events.DexorderExecutions() - - # - # THIS IS BASICALLY THE MAIN RUN LOOP FOR EVERY BLOCK - # - - runner.add_event_trigger(init) - runner.add_event_trigger(handle_vault_created, vault_created) - runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced')) - runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer')) - runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) - runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled')) - runner.add_event_trigger(handle_order_canceled, get_contract_event('OrderLib', 'DexorderSwapCanceled')) - runner.add_event_trigger(handle_order_cancel_all, get_contract_event('OrderLib', 'DexorderCancelAll')) - runner.add_event_trigger(handle_transaction_receipts) - runner.add_event_trigger(handle_dexorderexecutions, executions) - - # these callbacks run after the ones above on each block, plus these also run every second - runner.add_postprocess_trigger(activate_time_triggers) - runner.add_postprocess_trigger(activate_price_triggers) - runner.add_postprocess_trigger(process_active_tranches) - runner.add_postprocess_trigger(process_execution_requests) - runner.add_postprocess_trigger(create_transactions) - runner.add_postprocess_trigger(send_transactions) - def dump_log(eventlog): log.debug(f'\t{eventlog}') @@ -265,7 +214,7 @@ def process_active_tranches(): for tk, proof in active_tranches.items(): old_req = execution_requests.get(tk) height = current_block.get().height - if old_req is None or old_req.height <= height: # <= used so proof is updated with more recent values + if old_req is None or old_req.height <= height: # '<=' is used so proof is updated with more recent values log.info(f'execution request for {tk}') execution_requests[tk] = ExecutionRequest(height, proof) @@ -277,7 +226,9 @@ async def process_execution_requests(): tk: TrancheKey er: ExecutionRequest pending = inflight_execution_requests.get(tk) - if pending is None or height-pending >= 30: # todo execution timeout => retry ; should we use timestamps? configure per-chain. + if pending is None or height-pending >= 30: + # todo execution timeout => retry ; should we use timestamps? configure per-chain. + # todo check balances execs[tk] = er else: log.debug(f'tranche {tk} is pending execution') diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 42024a9..8f8ac13 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -26,7 +26,6 @@ new_price_triggers:dict[str, set[PriceTrigger]] = defaultdict(set) # when price unconstrained_price_triggers: BlockSet[PriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at') # tranches which have passed all constraints and should be executed execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # generated by the active tranches - # todo should this really be blockdata? inflight_execution_requests:BlockDict[TrancheKey, int] = BlockDict('ei') # value is block height when the request was sent diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index c6ad7fb..e2e0d6e 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -16,7 +16,6 @@ from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate.diff import DiffEntryItem from dexorder.database.model import Block from dexorder.database.model.block import current_block, latest_block -from dexorder.event_handler import setup_logevent_triggers from dexorder.util import hexstr, topic from dexorder.util.async_util import maywait @@ -64,9 +63,11 @@ class BlockStateRunner: self.events.append((callback, event, log_filter)) def add_postprocess_trigger(self, callback: Callable[[dict], None]): + # noinspection PyTypeChecker self.postprocess_cbs.append(callback) async def run(self): + # this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is ether websockets or polling return await (self.run_polling() if config.polling > 0 else self.run_ws()) async def run_ws(self): @@ -102,7 +103,7 @@ class BlockStateRunner: chain = Blockchain.for_id(chain_id) current_chain.set(chain) - setup_logevent_triggers(self) + # this run() process discovers new heads and puts them on a queue for the worker to process _worker_task = asyncio.create_task(self.worker()) while self.running: @@ -147,10 +148,9 @@ class BlockStateRunner: chain = Blockchain.for_id(chain_id) current_chain.set(chain) - setup_logevent_triggers(self) _worker_task = asyncio.create_task(self.worker()) - prev_blockhash = None + prev_blockhash = None while self.running: try: # polling mode is used primarily because Hardhat fails to deliver newHeads events after about an hour diff --git a/src/dexorder/transaction.py b/src/dexorder/transaction.py index f21ed34..8f645b5 100644 --- a/src/dexorder/transaction.py +++ b/src/dexorder/transaction.py @@ -1,13 +1,10 @@ import logging from abc import abstractmethod -from asyncio import Queue -from contextvars import ContextVar from uuid import uuid4 from web3.exceptions import TransactionNotFound from dexorder import db, current_w3 -from dexorder.base import TransactionDict from dexorder.base.chain import current_chain from dexorder.base.order import TransactionRequest from dexorder.contract.contract_proxy import ContractTransaction @@ -34,16 +31,6 @@ class TransactionHandler: async def complete_transaction(self, job: TransactionJob) -> None: ... -class TransactionSender: - def __init__(self): - self.queue = Queue[TransactionDict]() - def run(self): - while True: - pass # todo - -current_transaction_sender: ContextVar[TransactionSender] = ContextVar('current_transaction_sender') - - def submit_transaction_request(tr: TransactionRequest): job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_block.get().height, state=TransactionJobState.Requested, request=tr) db.session.add(job)