backfill debugged

Tim
2024-01-26 00:29:56 -04:00
parent 64384c3d3a
commit 455df0f038
7 changed files with 61 additions and 55 deletions

View File

@@ -1,9 +1,9 @@
 import logging
 import sys
 from asyncio import CancelledError
-from typing import Iterable, Union
+from typing import Union, Reversible
-from dexorder import blockchain, config
+from dexorder import blockchain, config, from_timestamp, now
 from dexorder.bin.executable import execute
 from dexorder.blockstate import DiffItem
 from dexorder.blockstate.blockdata import BlockData
@@ -13,17 +13,22 @@ from dexorder.configuration import parse_args
 from dexorder.contract import get_contract_event
 from dexorder.database import db
 from dexorder.database.model import Block
-from dexorder.event_handler import handle_uniswap_swap
+from dexorder.event_handler import handle_uniswap_swap, check_ohlc_rollover
 from dexorder.memcache.memcache_state import RedisState, publish_all
 from dexorder.memcache import memcache
-from dexorder.ohlc import recent_ohlcs, ohlc_finalize, ohlcs
+from dexorder.ohlc import recent_ohlcs, ohlc_save, ohlcs
 from dexorder.runner import BlockStateRunner
 from dexorder.util import hexstr

 log = logging.getLogger('dexorder')

-def finalize_callback(block: Block, _diffs: Iterable[Union[DiffItem, DiffEntryItem]]):
-    log.info(f'backfill completed through block {block.height} {block.timestamp:%Y-%m-%d %H:%M:%S} {block.hash}')
+def finalize_callback(block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
+    start = now()
+    log.info("finalizing OHLC's...")
+    ohlc_save(block, diffs)
+    log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds')
+    log.info(f'backfill completed through block {block.height} {from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}')

 # noinspection DuplicatedCode
@@ -34,8 +39,7 @@ async def main():
     parse_args()
     if not config.ohlc_dir:
         config.ohlc_dir = './ohlc'
         log.warning('Defaulting ohlc_dir to ./ohlc')
-        ohlcs.dir = config.ohlc_dir
+    ohlcs.dir = config.ohlc_dir
     await blockchain.connect()
     redis_state = None
     state = None
@@ -56,12 +60,11 @@
     runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None, timer_period=0)
     runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap'))
-    runner.on_promotion.append(ohlc_finalize)
+    runner.postprocess_cbs.append(check_ohlc_rollover)
+    runner.on_promotion.append(finalize_callback)
     if db:
         # noinspection PyUnboundLocalVariable
         runner.on_promotion.append(db_state.save)
-        runner.on_promotion.append(finalize_callback)
     if redis_state:
         runner.on_head_update.append(redis_state.save)
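Since on_promotion is a plain list iterated in order (see the promotion loop in the runner diff at the bottom of this commit), moving the append means the OHLC flush now runs before the database snapshot. A toy sketch of that ordering:

    # toy model: list callbacks fire in append order
    order = []
    on_promotion = [lambda: order.append('finalize_callback'),   # appended first now
                    lambda: order.append('db_state.save')]
    for callback in on_promotion:
        callback()
    assert order == ['finalize_callback', 'db_state.save']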

View File

@@ -15,7 +15,7 @@ from dexorder.event_handler import init_order_triggers, init, dump_log, handle_v
     process_active_tranches, process_execution_requests, check_ohlc_rollover
 from dexorder.memcache.memcache_state import RedisState, publish_all
 from dexorder.memcache import memcache
-from dexorder.ohlc import ohlc_finalize
+from dexorder.ohlc import ohlc_save
 from dexorder.runner import BlockStateRunner
 from dexorder.transaction import handle_transaction_receipts, create_transactions, send_transactions
@@ -97,7 +97,7 @@ async def main():
     runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None)
     setup_logevent_triggers(runner)
     if config.ohlc_dir:
-        runner.on_promotion.append(ohlc_finalize)
+        runner.on_promotion.append(ohlc_save)
     if db:
         runner.on_state_init.append(init_order_triggers)
         # noinspection PyUnboundLocalVariable

View File

@@ -10,7 +10,7 @@ from typing import Optional
 @dataclass
 class Config:
     rpc_url: str = 'http://localhost:8545'
-    ws_url: str = 'ws://localhost:8545'
+    ws_url: Optional[str] = 'ws://localhost:8545'
     rpc_urls: Optional[dict[str,str]] = field(default_factory=dict)
     db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder'
     datadb_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorderdata'
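Making ws_url Optional pairs with the run() change in the runner diff below: with no websocket endpoint configured, the runner falls back to polling. A minimal sketch of that dispatch predicate, with illustrative stand-in values:

    polling, ws_url = 0, None           # stand-ins for config.polling and config.ws_url
    assert polling > 0 or not ws_url    # True: run_polling() is chosen as the fallback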

View File

@@ -1,7 +1,6 @@
 import asyncio
 import functools
 import logging
-from datetime import datetime
 from uuid import UUID
 from web3.types import EventData

View File

@@ -9,18 +9,17 @@ from cachetools import LFUCache
 from dexorder import dec, config, from_isotime, minutely, from_timestamp
 from dexorder.base.chain import current_chain
-from dexorder.blockstate import BlockDict, DiffItem
+from dexorder.blockstate import BlockDict, DiffItem, current_blockstate
 from dexorder.blockstate.diff import DiffEntryItem
 from dexorder.database.model import Block
-from dexorder.database.model.block import current_block

 log = logging.getLogger(__name__)

 OHLC_PERIODS = [
     timedelta(minutes=1),
-    # timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30),
-    # timedelta(hours=1), timedelta(hours=2), timedelta(hours=4), timedelta(hours=8), timedelta(hours=12),
-    # timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7)
+    timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30),
+    timedelta(hours=1), timedelta(hours=2), timedelta(hours=4), timedelta(hours=8), timedelta(hours=12),
+    timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7)
 ]

 OHLC_DATE_ROOT = datetime(2009, 1, 4, tzinfo=timezone.utc)  # Sunday before Bitcoin Genesis
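OHLC_DATE_ROOT anchors bar boundaries. ohlc_start_time itself is not shown in this diff; a sketch under the assumption that it floors timestamps to a whole number of periods elapsed since the root, which would keep weekly bars Sunday-aligned:

    from datetime import datetime, timedelta, timezone

    OHLC_DATE_ROOT = datetime(2009, 1, 4, tzinfo=timezone.utc)

    def ohlc_start_time(time: datetime, period: timedelta) -> datetime:
        # assumed behavior: floor to a whole number of periods since the root
        return OHLC_DATE_ROOT + ((time - OHLC_DATE_ROOT) // period) * period

    start = ohlc_start_time(datetime(2024, 1, 26, 0, 29, tzinfo=timezone.utc), timedelta(days=7))
    assert start.weekday() == 6   # weekly bars stay aligned to Sundays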
@@ -51,7 +50,7 @@ class NativeOHLC:
     @property
     def ohlc(self) -> OHLC:
         return [
-            self.start.isoformat(timespec='minutes'),
+            minutely(self.start),
             None if self.open is None else str(self.open),
             None if self.high is None else str(self.high),
             None if self.low is None else str(self.low),
@@ -59,8 +58,7 @@
         ]

-def ohlc_name(period: timedelta) -> str:
+def period_name(period: timedelta) -> str:
     return f'{period // timedelta(minutes=1)}m' if period < timedelta(hours=1) \
         else f'{period // timedelta(hours=1)}H' if period < timedelta(days=1) \
         else f'{period // timedelta(days=7)}W' if period == timedelta(days=7) \
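A runnable sketch of the naming scheme; the branches match the code above, except the final day-scale branch, which is truncated in the diff and assumed here:

    from datetime import timedelta

    def period_name(period: timedelta) -> str:
        return f'{period // timedelta(minutes=1)}m' if period < timedelta(hours=1) \
            else f'{period // timedelta(hours=1)}H' if period < timedelta(days=1) \
            else f'{period // timedelta(days=7)}W' if period == timedelta(days=7) \
            else f'{period // timedelta(days=1)}D'   # final branch assumed, truncated above

    assert period_name(timedelta(minutes=5)) == '5m'
    assert period_name(timedelta(hours=4)) == '4H'
    assert period_name(timedelta(days=7)) == '1W'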
@@ -86,7 +84,7 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d
     returns an ordered list of OHLC's that have been created/modified by the new time/price
     if price is None, then bars are advanced based on the time but no new price is added to the series.
     """
-    log.debug(f'\tupdating {prev} with {minutely(time)} {price}')
+    # log.debug(f'\tupdating {prev} with {minutely(time)} {price}')
     cur = NativeOHLC.from_ohlc(prev)
     assert time >= cur.start
     result = []
@@ -97,7 +95,7 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d
             break
         result.append(cur.ohlc)
         cur = NativeOHLC(end, None, None, None, cur.close)
-    log.debug(f'\ttime advancements: {result}')
+    # log.debug(f'\ttime advancements: {result}')
     # if we are setting a price, update the current bar
     if price is not None:
         if cur.open is None:
@@ -109,8 +107,8 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d
             cur.low = min(cur.low, price)
         cur.close = price
         result.append(cur.ohlc)
-        log.debug(f'\tappended current bar: {cur.ohlc}')
-    log.debug(f'\tupdate result: {result}')
+        # log.debug(f'\tappended current bar: {cur.ohlc}')
+    # log.debug(f'\tupdate result: {result}')
     return result

 class OHLCKey (NamedTuple):
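An illustrative walk-through of update_ohlc's contract; the [start, open, high, low, close] list layout comes from NativeOHLC.ohlc above, and the expected output is stated in comments rather than executed:

    from datetime import datetime, timedelta, timezone

    prev = ['2024-01-26T00:00', '10', '12', '9', '11']    # a 1m bar, prices as strings
    t = datetime(2024, 1, 26, 0, 2, tzinfo=timezone.utc)  # two minutes after the bar start
    # update_ohlc(prev, timedelta(minutes=1), t, price) would be expected to return:
    #   the 00:00 bar itself, appended as the loop advances past it,
    #   a gap bar for 00:01 with empty O/H/L that carries close='11', and
    #   a fresh 00:02 bar opened at the new price,
    # matching the advance-then-append loop and the open-is-None branch above.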
@@ -145,22 +143,23 @@ class OHLCRepository:
         """
         if price is None, then bars are advanced based on the time but no new price is added to the series.
         """
-        logname = f'{symbol} {ohlc_name(period)}'
-        log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
+        logname = f'{symbol} {period_name(period)}'
+        # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
         key = (symbol, period)
         # bars is a list of "recent" OHLC's stored as blockdata. we try to keep the recent array long enough to extend before the root block time
         historical: Optional[list[OHLC]] = recent_ohlcs.get(key)
         # log.debug(f'got recent {historical}')
         if not historical:
             if create is False or price is None:
                 return  # do not track symbols which have not been explicitly set up
             p = str(price)
             historical = []
             updated = [OHLC((minutely(ohlc_start_time(time, period)), p, p, p, p))]
-            log.debug(f'\tcreated new bars {updated}')
+            # log.debug(f'\tcreated new bars {updated}')
         else:
             updated = update_ohlc(historical[-1], period, time, price)
             # drop any historical bars that are older than we need
-            oldest_needed = from_timestamp(current_block.get().timestamp) - period  # cover the root block time plus one period prior
+            oldest_needed = from_timestamp(current_blockstate.get().root_block.timestamp) - period  # cover the root block time plus one period prior
             trim = (oldest_needed - from_isotime(historical[0][0])) // period
             if trim > 0:
                 historical = historical[trim:]
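A worked example of the trim arithmetic above, with illustrative values:

    from datetime import datetime, timedelta, timezone

    period = timedelta(minutes=5)
    root_time = datetime(2024, 1, 26, 12, 0, tzinfo=timezone.utc)   # root block time
    first_bar = datetime(2024, 1, 26, 11, 30, tzinfo=timezone.utc)  # oldest retained bar
    oldest_needed = root_time - period                 # 11:55, one period before the root
    trim = (oldest_needed - first_bar) // period       # 25 min // 5 min == 5
    assert trim == 5   # drop the 11:30-11:50 bars; 11:55 onward still covers the root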
@@ -173,15 +172,16 @@
             first_updated = from_isotime(updated[0][0])
             overlap = (first_updated - last_bar) // period + 1
             updated = historical[:-overlap] + updated if overlap > 0 else historical + updated
-        log.debug(f'\tnew recents: {updated}')
+        # log.debug(f'\tnew recents: {updated}')
         recent_ohlcs.setitem(key, updated)
         return updated

     def save_all(self, symbol: str, period: timedelta, ohlc_list: list[OHLC]) -> None:
         for ohlc in ohlc_list:
-            self.save(symbol, period, ohlc)  # we need to act sequentially so we don't have conflicting access to chunks
+            self.save(symbol, period, ohlc)  # we need to act synchronously so we don't have conflicting access to chunks

     def save(self, symbol: str, period: timedelta, ohlc: OHLC) -> None:
+        # log.debug(f'saving ohlc {symbol} {period_name(period)} {ohlc}')
         time = dt(ohlc[0])
         chunk = self.get_chunk(symbol, period, time)
         if not chunk:
@@ -189,8 +189,10 @@
         else:
             start = from_isotime(chunk[0][0])
             index = (time - start) // period
-            log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}')
-            assert index <= len(chunk)
+            # log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}')
+            if index > len(chunk):
+                log.error(f'chunk gap: {index} > {len(chunk)} {symbol} {period_name(period)} {ohlc}' + ''.join(f'\n\t{c}' for c in chunk))
+                exit(1)
         if index == len(chunk):
             assert from_isotime(chunk[-1][0]) + period == time
             chunk.append(ohlc)
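The index arithmetic above in a small runnable example; the chunk contents are hypothetical, and the index < len case is not shown in this diff (it presumably overwrites an existing bar in place):

    from datetime import datetime, timedelta, timezone

    start = datetime(2024, 1, 26, 12, 0, tzinfo=timezone.utc)   # chunk[0] bar time
    period = timedelta(minutes=5)
    chunk_len = 3                                               # bars at 12:00, 12:05, 12:10
    time = datetime(2024, 1, 26, 12, 15, tzinfo=timezone.utc)   # incoming bar
    index = (time - start) // period
    assert index == chunk_len   # contiguous: append
    # index > chunk_len is the new 'chunk gap' error branch above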
@@ -221,21 +223,22 @@
         if not chunk:
             return
         path = self.chunk_path(symbol, period, from_isotime(chunk[0][0]))
-        try:
-            with open(path, 'w') as file:
-                json.dump(chunk, file)
-            return
-        except FileNotFoundError:
-            os.makedirs(os.path.dirname(path), exist_ok=True)
-            with open(path, 'w') as file:
-                json.dump(chunk, file)
+        for _ in range(2):
+            try:
+                with open(path, 'w') as file:
+                    json.dump(chunk, file)
+                self.cache[path] = chunk
+                return
+            except FileNotFoundError:
+                os.makedirs(os.path.dirname(path), exist_ok=True)
+        raise IOError(f'Could not write chunk {path}')

     def chunk_path(self, symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str:
         if chain_id is None:
             chain_id = current_chain.get().chain_id
         start = ohlc_start_time(time, period)
-        name = ohlc_name(period)
+        name = period_name(period)
         return f'{self.dir}/{chain_id}/{symbol}/{name}/' + (
             f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.json' if period < timedelta(hours=1) else  # <1H data has a file per day
             f'{start.year}/{symbol}-{name}-{start:%Y%m}.json' if period < timedelta(days=7) else  # <1W data has a file per month
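Illustrative output of the path template above, using hypothetical dir, chain and pool values and the per-day branch:

    from datetime import datetime, timezone

    base_dir, chain_id, symbol, name = './ohlc', 42161, '0xPOOL', '5m'   # hypothetical
    start = datetime(2024, 1, 26, tzinfo=timezone.utc)
    path = f'{base_dir}/{chain_id}/{symbol}/{name}/{start.year}/{symbol}-{name}-{start:%Y%m%d}.json'
    assert path == './ohlc/42161/0xPOOL/5m/2024/0xPOOL-5m-20240126.json'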
@@ -247,19 +250,19 @@ def pub_ohlc(_series:str, key: OHLCKey, bars: list[OHLC]):
     pool_addr, period = key
     chain_id = current_chain.get().chain_id
     return (
-        f'{chain_id}|{pool_addr}|{ohlc_name(period)}',  # channel name is like 0x...|1m
+        f'{chain_id}|{pool_addr}|{period_name(period)}',  # channel name is like 0x...|1m
         'ohlcs',
         (chain_id, pool_addr, bars)
     )

 def ohlc_key_to_str(k):
-    return f'{k[0]}|{ohlc_name(k[1])}'
+    return f'{k[0]}|{period_name(k[1])}'

 def ohlc_str_to_key(s):
     pool, period_name = s.split('|')
     return pool, period_from_name(period_name)

-def ohlc_finalize(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
+def ohlc_save(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
     """
     used as a finalization callback from BlockState data.
     """

View File

@@ -21,9 +21,10 @@ TimeTrigger = Callable[[int], None] # func(timestamp)
 time_triggers:BlockSet[TimeTrigger] = BlockSet('tt')
 PriceTrigger = Callable[[dec], Union[Awaitable[None],None]] # [async] func(pool_price)
+UnconstrainedPriceTrigger = Callable[[Optional[dec]], Union[Awaitable[None],None]] # [async] func(pool_price)
 price_triggers:dict[str, BlockSet[PriceTrigger]] = defaultdictk(lambda addr:BlockSet(f'pt|{addr}')) # different BlockSet per pool address
 new_price_triggers:dict[str, set[PriceTrigger]] = defaultdict(set) # when price triggers are first set, they must be tested against the current price even if it didn't change this block
-unconstrained_price_triggers: BlockSet[PriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled
+unconstrained_price_triggers: BlockSet[UnconstrainedPriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled
 active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at') # tranches which have passed all constraints and should be executed
 execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # generated by the active tranches
 # todo should this really be blockdata?
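For reference, callbacks conforming to the two aliases above; dec is dexorder's decimal type, stubbed here with Decimal for illustration:

    from decimal import Decimal as dec   # stand-in for dexorder's dec type
    from typing import Optional

    def on_price(pool_price: dec) -> None:                       # a PriceTrigger
        ...

    async def on_any_price(pool_price: Optional[dec]) -> None:   # an UnconstrainedPriceTrigger
        # price may be None because these tranches carry no price constraint
        ...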

View File

@@ -98,7 +98,7 @@
         if self.state:
             self.max_height_seen = max(self.max_height_seen, self.state.root_block.height)
         self.running = True
-        return await (self.run_polling() if config.polling > 0 else self.run_ws())
+        return await (self.run_polling() if config.polling > 0 or not config.ws_url else self.run_ws())

     async def run_ws(self):
         w3ws = await create_w3_ws()
@@ -124,8 +124,8 @@
                 if not self.running:
                     break
                 await async_yield()
-        except (ConnectionClosedError, TimeoutError):
-            pass
+        except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
+            log.debug(f'runner timeout {e}')
         finally:
             # noinspection PyBroadException
             try:
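Why both spellings are caught: before Python 3.11, asyncio.TimeoutError was a class distinct from the built-in TimeoutError, so listing only one would miss the other; from 3.11 on they are the same object and the extra name is harmless:

    import asyncio
    # True on Python 3.11+ (asyncio.TimeoutError became an alias of the
    # built-in), False on earlier versions, where both must be caught
    print(asyncio.TimeoutError is TimeoutError)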
@@ -168,8 +168,8 @@
                 if not self.running:
                     break
                 await asyncio.sleep(config.polling)
-        except ConnectionClosedError:
-            pass
+        except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
+            log.debug(f'runner timeout {e}')
         finally:
             # noinspection PyBroadException
             try:
@@ -258,7 +258,7 @@
     async def handle_head(self, chain, block, w3):
         print(f'logger {log} {log.name} level {log.level} {logging.DEBUG} {logging.FATAL}')
-        log.debug(f'handle_head {block.height} {block.hash}')
+        log.debug(f'handle_head {block.height} {hexstr(block.hash)}')
         session = None
         batches = []
         try:
@@ -365,7 +365,7 @@
             else:
                 new_root_fork = fork.for_height(promotion_height)
             if new_root_fork:
-                log.debug(f'promoting root {new_root_fork.height} {new_root_fork.hash}')
+                log.debug(f'promoting root {new_root_fork.height} {hexstr(new_root_fork.hash)}')
                 diff_items = self.state.promote_root(new_root_fork)
                 for callback in self.on_promotion:
                     # todo try/except for known retryable errors
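A sketch of the callback shape this promotion loop expects; finalize_callback, ohlc_save and db_state.save earlier in this commit all follow it:

    def my_promotion_callback(block, diffs):
        # called once per root promotion with the promoted Block and the
        # diff items returned by state.promote_root()
        ...

    # registration, as in the backfill main() above:
    # runner.on_promotion.append(my_promotion_callback)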