backfill fixes

This commit is contained in:
Tim
2024-01-25 00:59:59 -04:00
parent 878679c9d5
commit 96d54360b6
10 changed files with 111 additions and 33 deletions

View File

@@ -41,7 +41,8 @@ class Fork:
def for_height(self, height):
""" returns a new Fork object for an older block along this fork. used for root promotion. """
assert( height <= self.height )
if height > self.height :
raise ValueError
if height <= self.height - len(self.ancestry):
return None
return Fork(self.ancestry[self.height-height:], height=height)

View File

@@ -3,7 +3,6 @@ import sys
from asyncio import CancelledError
from dexorder import blockchain, config
from dexorder.base.ohlc import recent_ohlcs
from dexorder.bin.executable import execute
from dexorder.blockstate.blockdata import BlockData
from dexorder.blockstate.db_state import DbState
@@ -13,16 +12,22 @@ from dexorder.database import db
from dexorder.event_handler import handle_uniswap_swap
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.memcache import memcache
from dexorder.ohlc import recent_ohlcs, ohlc_finalize, ohlcs
from dexorder.runner import BlockStateRunner
log = logging.getLogger('dexorder')
# noinspection DuplicatedCode
async def main():
# noinspection DuplicatedCode
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
log.setLevel(logging.DEBUG)
parse_args()
if not config.ohlc_dir:
config.ohlc_dir = './ohlc'
log.warning('Defaulting ohlc_dir to ./ohlc')
ohlcs.dir = config.ohlc_dir
await blockchain.connect()
redis_state = None
state = None
@@ -31,7 +36,6 @@ async def main():
redis_state = RedisState([recent_ohlcs]) # NOTE: ONLY the ohlc's are pushed to Redis. We do not want to touch anything else.
if db:
db.connect(url=config.datadb_url) # our main database is the data db
# noinspection DuplicatedCode
db_state = DbState(BlockData.by_opt('db'))
with db.session:
state = db_state.load()
@@ -46,6 +50,8 @@ async def main():
# noinspection PyTypeChecker
runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap'))
# noinspection PyTypeChecker
runner.on_promotion.append(ohlc_finalize)
if db:
# noinspection PyUnboundLocalVariable,PyTypeChecker
runner.on_promotion.append(db_state.save)

View File

@@ -2,7 +2,7 @@ import logging
import sys
from asyncio import CancelledError
from dexorder import db, blockchain
from dexorder import db, blockchain, config
from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute
from dexorder.blockstate.blockdata import BlockData
@@ -15,11 +15,12 @@ from dexorder.event_handler import init_order_triggers, init, dump_log, handle_v
process_active_tranches, process_execution_requests, check_ohlc_rollover
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.memcache import memcache
from dexorder.ohlc import ohlc_finalize
from dexorder.runner import BlockStateRunner
from dexorder.transaction import handle_transaction_receipts, create_transactions, send_transactions
log = logging.getLogger('dexorder')
LOG_ALL_EVENTS = True # for debug todo config
LOG_ALL_EVENTS = False # for debug todo config
#
@@ -70,6 +71,7 @@ def setup_logevent_triggers(runner):
runner.postprocess_cbs.append(send_transactions)
# noinspection DuplicatedCode
async def main():
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
log.setLevel(logging.DEBUG)
@@ -94,10 +96,13 @@ async def main():
runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None)
setup_logevent_triggers(runner)
if config.ohlc_dir:
# noinspection PyTypeChecker
runner.on_promotion.append(ohlc_finalize)
if db:
# noinspection PyTypeChecker
runner.on_state_init.append(init_order_triggers)
# noinspection PyUnboundLocalVariable
# noinspection PyUnboundLocalVariable,PyTypeChecker
runner.on_promotion.append(db_state.save)
if redis_state:
# noinspection PyTypeChecker

View File

@@ -104,7 +104,7 @@ class DbState(SeriesCollection):
for row in db.session.query(SeriesDict).where(SeriesDict.chain == chain_id, SeriesDict.series == data.series2str(series)):
key = data.str2key(row.key)
value = data.str2value(row.value)
log.debug(f'load {series} {key} {value}')
# log.debug(f'load {series} {key} {value}')
var[key] = value
completed_block.set(root_block)
log.debug(f'loaded db state from block {root_block}')

View File

@@ -6,6 +6,7 @@ from dexorder import DELETE
@dataclass
class DiffEntry:
""" DiffEntry is the "value" part of a key-value pair, but DiffEntry also has metadata about the block in which the value was set """
value: Union[Any, DELETE]
height: int
hash: bytes
@@ -13,6 +14,7 @@ class DiffEntry:
@dataclass
class DiffItem:
""" DiffItem is a simple series-key-value triple """
series: Any
key: Any
value: Any
@@ -22,6 +24,7 @@ class DiffItem:
@dataclass
class DiffEntryItem:
""" DiffEntryItem is a DiffItem that has a DiffEntry as its extended value, instead of storing just the primary value directly """
series: Any
key: Any
entry: DiffEntry

View File

@@ -12,11 +12,11 @@ class Config:
rpc_url: str = 'http://localhost:8545'
ws_url: str = 'ws://localhost:8545'
rpc_urls: Optional[dict[str,str]] = field(default_factory=dict)
db_url: str = 'postgresql://dexorder:redroxed@localhost/dexorder'
datadb_url: str = 'postgresql://dexorder:redroxed@localhost/dexorderdata'
ohlc_dir: str = './ohlc'
db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder'
datadb_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorderdata'
ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk
dump_sql: bool = False
redis_url: str = 'redis://localhost:6379'
redis_url: Optional[str] = 'redis://localhost:6379'
parallel_logevent_queries: bool = True
polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead

View File

@@ -4,7 +4,7 @@ from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from dexorder.database.model import Base
from dexorder.util import hexint
from dexorder.util import hexint, Field
class Block(Base):
@@ -25,5 +25,5 @@ class Block(Base):
current_block = ContextVar[Block]('Block.cur') # block for the current thread
latest_block = ContextVar[Block]('Block.latest') # most recent discovered but may not be processed yet
latest_block = Field[Block]() # most recent discovered block but maybe not the currently processing one
completed_block = ContextVar[Block]('Block.completed') # most recent fully-processed block

View File

@@ -3,20 +3,24 @@ import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional, NamedTuple
from typing import Optional, NamedTuple, Reversible, Union
from cachetools import LFUCache
from dexorder import dec, config, from_isotime, minutely
from dexorder import dec, config, from_isotime, minutely, from_timestamp
from dexorder.base.chain import current_chain
from dexorder.blockstate import BlockDict
from dexorder.blockstate import BlockDict, DiffItem
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.database.model import Block
from dexorder.database.model.block import current_block
log = logging.getLogger(__name__)
OHLC_PERIODS = [
timedelta(minutes=1), timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30),
timedelta(hours=1), timedelta(hours=2), timedelta(hours=4), timedelta(hours=8), timedelta(hours=12),
timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7)
timedelta(minutes=1),
# timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30),
# timedelta(hours=1), timedelta(hours=2), timedelta(hours=4), timedelta(hours=8), timedelta(hours=12),
# timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7)
]
OHLC_DATE_ROOT = datetime(2009, 1, 4, tzinfo=timezone.utc) # Sunday before Bitcoin Genesis
@@ -136,13 +140,15 @@ class OHLCRepository:
for period in OHLC_PERIODS:
self.update(symbol, period, time, price, create=create)
def update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) -> Optional[list[OHLC]]:
@staticmethod
def update(symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) -> Optional[list[OHLC]]:
"""
if price is None, then bars are advanced based on the time but no new price is added to the series.
"""
logname = f'{symbol} {ohlc_name(period)}'
log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
key = (symbol, period)
# bars is a list of "recent" OHLC's stored as blockdata. we try to keep the recent array long enough to extend before the root block time
bars: Optional[list[OHLC]] = recent_ohlcs.get(key)
if not bars:
if create is False or price is None:
@@ -152,18 +158,27 @@ class OHLCRepository:
log.debug(f'\tcreated new bars {updated}')
else:
updated = update_ohlc(bars[-1], period, time, price)
if len(updated) == 1:
updated = [bars[-1], updated[0]] # return the previous finalized bar along with the updated current bar
# we need to retain enough recent history to at least cover the root block time, plus one previous finalized block
# first we construct the longest possible sequence
if not bars or not updated:
updated = (bars or []) + (updated or [])
else:
last_bar = from_isotime(bars[-1][0])
first_updated = from_isotime(updated[0][0])
overlap = (first_updated - last_bar) // period
updated = bars[:-overlap] + updated if overlap > 0 else bars + updated
# now we drop history that is older than we need
oldest_needed = from_timestamp(current_block.get().timestamp) - period # cover the root block time plus one period prior
trim = (oldest_needed - from_isotime(updated[0][0])) // period
if trim > 0:
updated = updated[trim:]
log.debug(f'\tnew recents: {updated}')
recent_ohlcs.setitem(key, updated)
if len(updated) > 1:
log.debug(f'\tsaving finalized bars: {updated[:-1]}')
self.save_all(symbol, period, updated[:-1]) # save any finalized bars to storage
return updated
def save_all(self, symbol: str, period: timedelta, ohlc_list: list[OHLC]) -> None:
for ohlc in ohlc_list:
self.save(symbol, period, ohlc)
self.save(symbol, period, ohlc) # we need to act sequentially so we don't have conflicting access to chunks
def save(self, symbol: str, period: timedelta, ohlc: OHLC) -> None:
time = dt(ohlc[0])
@@ -173,6 +188,7 @@ class OHLCRepository:
else:
start = from_isotime(chunk[0][0])
index = (time - start) // period
log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}')
assert index <= len(chunk)
if index == len(chunk):
assert from_isotime(chunk[-1][0]) + period == time
@@ -214,10 +230,12 @@ class OHLCRepository:
json.dump(chunk, file)
def chunk_path(self, symbol: str, period: timedelta, time: datetime) -> str:
def chunk_path(self, symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str:
if chain_id is None:
chain_id = current_chain.get().chain_id
start = ohlc_start_time(time, period)
name = ohlc_name(period)
return f'{self.dir}/{symbol}/{name}/' + (
return f'{self.dir}/{chain_id}/{symbol}/{name}/' + (
f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.json' if period < timedelta(hours=1) else # <1H data has a file per day
f'{start.year}/{symbol}-{name}-{start:%Y%m}.json' if period < timedelta(days=7) else # <1W data has a file per month
f'{symbol}-{name}.json' # long periods are a single file for all of history
@@ -240,6 +258,16 @@ def ohlc_str_to_key(s):
pool, period_name = s.split('|')
return pool, period_from_name(period_name)
def ohlc_finalize(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
"""
used as a finalization callback from BlockState data.
"""
for diff in diffs:
if diff.series == 'ohlc':
symbol, period = diff.key
ohlcs.save_all(symbol, period, diff.value)
# The most recent OHLC's are stored as block data. We store a list of at least the two latest bars, which provides clients with
# the latest finalized bar as well as the current open bar.
recent_ohlcs: BlockDict[OHLCKey, list[OHLC]] = BlockDict('ohlc', db=True, redis=True, pub=pub_ohlc,

View File

@@ -200,6 +200,7 @@ class BlockStateRunner:
parent = bytes.fromhex(block_data['parentHash'][2:])
height = int(block_data['number'], 0)
head = Block(chain=chain.chain_id, height=height, hash=blockhash, parent=parent, data=block_data)
latest_block.set(head)
if self.state or config.backfill:
# backfill batches
@@ -264,7 +265,6 @@ class BlockStateRunner:
if self.state is not None and block.hash in self.state.by_hash:
log.debug(f'block {block.hash} was already processed')
return
latest_block.set(block)
if self.state is None:
# initialize
self.state = BlockState(block)
@@ -343,18 +343,38 @@ class BlockStateRunner:
# isn't updated by the new fork is still queried from the root state to overwrite any stale data from the abandoned branch.
diff_items = self.state.diffs_by_hash[block.hash]
for callback in self.on_head_update:
# noinspection PyCallingNonCallable
await maywait(callback(block, diff_items))
# check for root promotion
promotion_height = fork.height - chain.confirms
if not fork.disjoint and promotion_height > self.state.root_block.height and (
new_root_fork := fork.for_height(promotion_height)):
promotion_height = latest_block.get().height - chain.confirms
new_root_fork = None
if fork.disjoint:
# individually check the fork's head and ancestor
if fork.height <= promotion_height:
new_root_fork = fork
else:
state = current_blockstate.get()
parent_block = state.by_hash[fork.parent]
if parent_block.height <= promotion_height:
new_root_fork = state.fork(parent_block)
else:
# non-disjoint, contiguous fork
if fork.height <= promotion_height:
new_root_fork = fork
else:
new_root_fork = fork.for_height(promotion_height)
if new_root_fork:
log.debug(f'promoting root {new_root_fork.height} {new_root_fork.hash}')
diff_items = self.state.promote_root(new_root_fork)
for callback in self.on_promotion:
# todo try/except for known retryable errors
# noinspection PyCallingNonCallable
await maywait(callback(self.state.root_block, diff_items))
# publish messages
if pubs and self.publish_all:
# noinspection PyCallingNonCallable
await maywait(self.publish_all(pubs))
except: # legitimately catch EVERYTHING because we re-raise
log.debug('rolling back session')
@@ -364,6 +384,7 @@ class BlockStateRunner:
self.state.delete_block(block.hash)
if config.parallel_logevent_queries:
for get_logs, *_ in batches:
# noinspection PyBroadException
try:
await get_logs
except Exception:
@@ -388,6 +409,7 @@ class BlockStateRunner:
session.begin()
try:
for callback in self.postprocess_cbs:
# noinspection PyCallingNonCallable
await maywait(callback())
except:
session.rollback()
@@ -402,5 +424,6 @@ class BlockStateRunner:
if self.state_initialized:
return
for cb in self.on_state_init:
# noinspection PyCallingNonCallable
await maywait(cb())
self.state_initialized = True

View File

@@ -1,5 +1,5 @@
import re
from typing import Callable, TypeVar, Generic, Union
from typing import Callable, TypeVar, Generic, Union, Any
from eth_utils import keccak
from hexbytes import HexBytes
@@ -71,3 +71,15 @@ class defaultdictk (Generic[K,V], dict[K,V]):
except KeyError:
default = self[item] = self.default_factory(item)
return default
T = TypeVar('T')
class Field (Generic[T]):
def __init__(self, value: T = None):
self._value = value
def get(self) -> T:
return self._value
def set(self, value: T):
self._value = value