refactored runner callback setup into main as prep for separate ohlc process

This commit is contained in:
tim
2024-01-15 23:12:13 -04:00
parent 7a4f4c485f
commit be91e75d71
7 changed files with 152 additions and 74 deletions

View File

@@ -1,2 +1,2 @@
#!/bin/bash
docker run --network=host --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly:latest --maxmemory 1G --hz=1 --dbfilename '' "$@"
docker run --network=host --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly:latest --proactor_threads 1 --maxmemory 256MiB --hz=1 --dbfilename '' "$@"

85
src/dexorder/base/ohlc.py Normal file
View File

@@ -0,0 +1,85 @@
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from dexorder import dec
log = logging.getLogger(__name__)
OHLC_PERIODS = [
timedelta(minutes=1), timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30),
timedelta(hours=1), timedelta(hours=2), timedelta(hours=4), timedelta(hours=8), timedelta(hours=12),
timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7)
]
OHLC_DATE_ROOT = datetime(2009, 1, 4) # Sunday before Bitcoin Genesis
def ohlc_name(period: timedelta) -> str:
return f'{period//timedelta(minutes=1)}m' if period < timedelta(hours=1) \
else f'{period//timedelta(hours=1)}H' if period < timedelta(days=1) \
else f'{period//timedelta(days=7)}W' if period == timedelta(days=7) \
else f'{period//timedelta(days=1)}D'
def ohlc_start_time(time, period: timedelta):
""" returns the start time of the ohlc containing time, such that start_time <= time < start_time + period """
period_sec = int(period.total_seconds())
period_count = (time - OHLC_DATE_ROOT).total_seconds() // period_sec
return OHLC_DATE_ROOT + timedelta(seconds=period_sec * period_count)
@dataclass(frozen=True)
class OHLC:
start_time: datetime # first datetime included in this range
period: timedelta # the interval covered by this range, starting from start_time
# if no swaps happen during the interval, heights are set to prev_ohlc.last_height
first_height: int = None # blockchain height of the first trade in this range.
last_height: int = None # last_height == first_height if there's zero or one trades during this interval
# if no swaps happen during the interval, prices are set to prev_ohlc.close
open: dec = None
high: dec = None
low: dec = None
close: dec = None
has_data: bool = False # True iff any trade has happened this period
def update(self, height: int, time: datetime, price: dec) -> list['OHLC',...:'OHLC']:
""" returns an ordered list of OHLC's that have been created/modified by the new price """
assert time >= self.start_time
result = []
cur = self
start = self.start_time
while True:
end = start + self.period
if time < end:
break
result.append(cur)
start = end
cur = OHLC(start, self.period, cur.last_height, cur.last_height, cur.close, cur.close, cur.close, cur.close)
if not cur.has_data:
cur = OHLC(cur.start_time, self.period, height, height, price, price, price, price, True)
else:
cur = OHLC(cur.start_time, self.period, cur.first_height, height, cur.open, max(cur.high,price), min(cur.low,price), price, True)
result.append(cur)
return result
class OHLCRepository:
def __init__(self, base_dir: str):
self.dir = base_dir
def chunk_path(self, symbol: str, period: timedelta, time: datetime) -> str:
start = ohlc_start_time(time, period)
name = ohlc_name(period)
return f'{self.dir}/{symbol}/{name}/' + (
f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.json' if period < timedelta(hours=1) else # <1H data has a file per day
f'{start.year}/{symbol}-{name}-{start:%Y%m}.json' if period < timedelta(days=7) else # <1W data has a file per month
f'{symbol}-{name}.json' # long periods are a single file for all of history
)
if __name__ == '__main__':
r = OHLCRepository('')
for p in OHLC_PERIODS:
print(f'{ohlc_name(p)}\t{r.chunk_path("symbol",p, datetime.utcnow())}')

View File

@@ -3,16 +3,70 @@ import sys
from asyncio import CancelledError
from dexorder import db, blockchain
from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute
from dexorder.blockstate.blockdata import BlockData
from dexorder.blockstate.db_state import DbState
from dexorder.configuration import parse_args
from dexorder.event_handler import init_order_triggers
from dexorder.contract import get_contract_event
from dexorder.contract.dexorder import get_factory_contract, get_dexorder_contract
from dexorder.event_handler import init_order_triggers, init, dump_log, handle_vault_created, handle_order_placed, handle_transfer, handle_uniswap_swap, \
handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_dexorderexecutions, activate_time_triggers, activate_price_triggers, \
process_active_tranches, process_execution_requests
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.memcache import memcache
from dexorder.runner import BlockStateRunner
from dexorder.transaction import handle_transaction_receipts, create_transactions, send_transactions
log = logging.getLogger('dexorder')
LOG_ALL_EVENTS = True # for debug todo config
#
# These event callbacks are basically the run loop for every block
#
def setup_logevent_triggers(runner):
# the triggers for each log events are triggered in the order of event registry, so the
# code ordering here is also the trigger order: e.g. we process all vault creation events
# before any order creations
if LOG_ALL_EVENTS:
log.debug('all events:')
runner.add_event_trigger(dump_log, None, {})
factory = get_factory_contract()
if factory is None:
log.warning(f'No Factory for {current_chain.get()}')
vault_created = get_contract_event('Factory', 'VaultCreated')
else:
vault_created = factory.events.VaultCreated()
dexorder = get_dexorder_contract()
if dexorder is None:
log.warning(f'No Dexorder for {current_chain.get()}')
executions = get_contract_event('Dexorder', 'DexorderExecutions')
else:
executions = dexorder.events.DexorderExecutions()
runner.add_event_trigger(init)
runner.add_event_trigger(handle_vault_created, vault_created)
runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced'))
runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer'))
runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap'))
runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled'))
runner.add_event_trigger(handle_order_canceled, get_contract_event('OrderLib', 'DexorderSwapCanceled'))
runner.add_event_trigger(handle_order_cancel_all, get_contract_event('OrderLib', 'DexorderCancelAll'))
runner.add_event_trigger(handle_transaction_receipts)
runner.add_event_trigger(handle_dexorderexecutions, executions)
# these callbacks run after the ones above on each block, plus these also run every second
runner.add_postprocess_trigger(activate_time_triggers)
runner.add_postprocess_trigger(activate_price_triggers)
runner.add_postprocess_trigger(process_active_tranches)
runner.add_postprocess_trigger(process_execution_requests)
runner.add_postprocess_trigger(create_transactions)
runner.add_postprocess_trigger(send_transactions)
async def main():
@@ -31,19 +85,21 @@ async def main():
with db.session:
state = db_state.load()
if state is not None:
if redis_state:
await redis_state.init(state)
log.info(f'loaded state from db for root block {state.root_block}')
runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None)
setup_logevent_triggers(runner)
if db:
# noinspection PyTypeChecker
runner.on_state_init.append(init_order_triggers)
# noinspection PyUnboundLocalVariable
runner.on_promotion.append(db_state.save)
if redis_state:
# noinspection PyTypeChecker
runner.on_head_update.append(redis_state.save)
try:
await runner.run()
except CancelledError:

View File

@@ -24,57 +24,6 @@ from dexorder.util.async_util import maywait
log = logging.getLogger(__name__)
LOG_ALL_EVENTS = True # for debug
def setup_logevent_triggers(runner):
runner.events.clear()
# the triggers for each log events are triggered in the order of event registry, so the
# code ordering here is also the trigger order: e.g. we process all vault creation events
# before any order creations
if LOG_ALL_EVENTS:
log.debug('all events:')
runner.add_event_trigger(dump_log, None, {})
factory = get_factory_contract()
if factory is None:
log.warning(f'No Factory for {current_chain.get()}')
vault_created = get_contract_event('Factory', 'VaultCreated')
else:
vault_created = factory.events.VaultCreated()
dexorder = get_dexorder_contract()
if dexorder is None:
log.warning(f'No Dexorder for {current_chain.get()}')
executions = get_contract_event('Dexorder', 'DexorderExecutions')
else:
executions = dexorder.events.DexorderExecutions()
#
# THIS IS BASICALLY THE MAIN RUN LOOP FOR EVERY BLOCK
#
runner.add_event_trigger(init)
runner.add_event_trigger(handle_vault_created, vault_created)
runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced'))
runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer'))
runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap'))
runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled'))
runner.add_event_trigger(handle_order_canceled, get_contract_event('OrderLib', 'DexorderSwapCanceled'))
runner.add_event_trigger(handle_order_cancel_all, get_contract_event('OrderLib', 'DexorderCancelAll'))
runner.add_event_trigger(handle_transaction_receipts)
runner.add_event_trigger(handle_dexorderexecutions, executions)
# these callbacks run after the ones above on each block, plus these also run every second
runner.add_postprocess_trigger(activate_time_triggers)
runner.add_postprocess_trigger(activate_price_triggers)
runner.add_postprocess_trigger(process_active_tranches)
runner.add_postprocess_trigger(process_execution_requests)
runner.add_postprocess_trigger(create_transactions)
runner.add_postprocess_trigger(send_transactions)
def dump_log(eventlog):
log.debug(f'\t{eventlog}')
@@ -265,7 +214,7 @@ def process_active_tranches():
for tk, proof in active_tranches.items():
old_req = execution_requests.get(tk)
height = current_block.get().height
if old_req is None or old_req.height <= height: # <= used so proof is updated with more recent values
if old_req is None or old_req.height <= height: # '<=' is used so proof is updated with more recent values
log.info(f'execution request for {tk}')
execution_requests[tk] = ExecutionRequest(height, proof)
@@ -277,7 +226,9 @@ async def process_execution_requests():
tk: TrancheKey
er: ExecutionRequest
pending = inflight_execution_requests.get(tk)
if pending is None or height-pending >= 30: # todo execution timeout => retry ; should we use timestamps? configure per-chain.
if pending is None or height-pending >= 30:
# todo execution timeout => retry ; should we use timestamps? configure per-chain.
# todo check balances
execs[tk] = er
else:
log.debug(f'tranche {tk} is pending execution')

View File

@@ -26,7 +26,6 @@ new_price_triggers:dict[str, set[PriceTrigger]] = defaultdict(set) # when price
unconstrained_price_triggers: BlockSet[PriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled
active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at') # tranches which have passed all constraints and should be executed
execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # generated by the active tranches
# todo should this really be blockdata?
inflight_execution_requests:BlockDict[TrancheKey, int] = BlockDict('ei') # value is block height when the request was sent

View File

@@ -16,7 +16,6 @@ from dexorder.blockstate import BlockState, current_blockstate
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.database.model import Block
from dexorder.database.model.block import current_block, latest_block
from dexorder.event_handler import setup_logevent_triggers
from dexorder.util import hexstr, topic
from dexorder.util.async_util import maywait
@@ -64,9 +63,11 @@ class BlockStateRunner:
self.events.append((callback, event, log_filter))
def add_postprocess_trigger(self, callback: Callable[[dict], None]):
# noinspection PyTypeChecker
self.postprocess_cbs.append(callback)
async def run(self):
# this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is ether websockets or polling
return await (self.run_polling() if config.polling > 0 else self.run_ws())
async def run_ws(self):
@@ -102,7 +103,7 @@ class BlockStateRunner:
chain = Blockchain.for_id(chain_id)
current_chain.set(chain)
setup_logevent_triggers(self)
# this run() process discovers new heads and puts them on a queue for the worker to process
_worker_task = asyncio.create_task(self.worker())
while self.running:
@@ -147,10 +148,9 @@ class BlockStateRunner:
chain = Blockchain.for_id(chain_id)
current_chain.set(chain)
setup_logevent_triggers(self)
_worker_task = asyncio.create_task(self.worker())
prev_blockhash = None
prev_blockhash = None
while self.running:
try:
# polling mode is used primarily because Hardhat fails to deliver newHeads events after about an hour

View File

@@ -1,13 +1,10 @@
import logging
from abc import abstractmethod
from asyncio import Queue
from contextvars import ContextVar
from uuid import uuid4
from web3.exceptions import TransactionNotFound
from dexorder import db, current_w3
from dexorder.base import TransactionDict
from dexorder.base.chain import current_chain
from dexorder.base.order import TransactionRequest
from dexorder.contract.contract_proxy import ContractTransaction
@@ -34,16 +31,6 @@ class TransactionHandler:
async def complete_transaction(self, job: TransactionJob) -> None: ...
class TransactionSender:
def __init__(self):
self.queue = Queue[TransactionDict]()
def run(self):
while True:
pass # todo
current_transaction_sender: ContextVar[TransactionSender] = ContextVar('current_transaction_sender')
def submit_transaction_request(tr: TransactionRequest):
job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_block.get().height, state=TransactionJobState.Requested, request=tr)
db.session.add(job)