finaldata debugged

This commit is contained in:
Tim
2024-02-18 23:12:52 -04:00
parent 6b3e464574
commit ee2ef4c7ac
7 changed files with 232 additions and 64 deletions

View File

@@ -1,18 +1,28 @@
import asyncio
import logging
from datetime import datetime
import sys
from datetime import datetime, timedelta
from web3.types import EventData
from dexorder import dec
from dexorder import dec, from_timestamp, blockchain, config
from dexorder.bin.executable import execute
from dexorder.blockstate import current_blockstate, BlockState
from dexorder.blocktime import get_block_timestamp
from dexorder.configuration import parse_args
from dexorder.contract import get_contract_event
from dexorder.database.model.block import current_block
from dexorder.database.model.pool import PoolDict
from dexorder.ohlc import LightOHLCRepository
from dexorder.pools import get_uniswap_data
from dexorder.util import hexstr
from dexorder.util.shutdown import fatal
from dexorder.walker import BlockWalker
log = logging.getLogger(__name__)
log = logging.getLogger('dexorder')
ohlcs = LightOHLCRepository()
async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
@@ -22,21 +32,26 @@ async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
# now execute the swaps synchronously
for swap in swaps:
pool, time, price = await get_uniswap_data(swap)
await handle_uniswap_swap(pool, time, price)
ohlcs.light_update_all(pool['address'], time, price)
async def handle_uniswap_swap(pool: PoolDict, time: datetime, price: dec):
pass
def finalize_callback(block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
def flush_callback():
# start = now()
log.info("finalizing OHLC's")
ohlc_save(block, diffs)
# log.info("finalizing OHLC's")
# log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds')
log.info(f'backfill completed through block {block.height} {from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}')
block = current_block.get()
log.info(f'backfill completed through block {block.height} '
f'{from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}')
ohlcs.flush()
async def main():
walker = BlockWalker()
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
log.setLevel(logging.DEBUG)
parse_args()
if config.ohlc_dir is None:
fatal('an ohlc_dir must be configured')
await blockchain.connect()
walker = BlockWalker(flush_callback, timedelta(seconds=5)) # todo flush every minute
walker.add_event_trigger(handle_backfill_uniswap_swaps,
get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)
await walker.run()

View File

@@ -259,4 +259,52 @@ class BlockState:
return compress_diffs(difflist)
# noinspection PyMethodMayBeStatic
class FinalizedBlockState:
    """
    A plain nested-dict store that does not behave like a BlockState at all. Backfill uses it when working with
    finalized data, which can never be reorganized and therefore does not need BlockData.

    Values are kept in ``self.data[series][key]`` and blocks given to add_block() are indexed in ``self.by_hash``.
    Every ``_fork`` argument is accepted and ignored.
    """

    def __init__(self):
        self.data = {}      # series -> {key: value}
        self.by_hash = {}   # block hash -> block

    def add_block(self, block: Block) -> Optional[Fork]:
        """ Index the block under its hash and return a Fork holding only that block. """
        self.by_hash[block.hash] = block
        return self.fork(block)

    def delete_block(self, block: Union[Block, Fork, bytes]):
        """ Forget a block, given either a raw hash (bytes) or an object with a ``.hash``. Unknown hashes are ignored. """
        key = block if isinstance(block, bytes) else block.hash
        self.by_hash.pop(key, None)

    def fork(self, block: Block):
        """ Build a Fork from this block's hash alone, at this block's height. """
        return Fork([block.hash], height=block.height)

    def _series(self, series):
        # Read-only lookup: an unknown series reads as an empty dict and is NOT created in self.data.
        return self.data.get(series, {})

    def get(self, _fork: Optional[Fork], series, key, default=NARG):
        """
        Return the value stored under (series, key), or ``default`` when it is missing.

        :raises KeyError: if the result is the NARG sentinel, i.e. the key is missing and no default was given.
        """
        found = self._series(series).get(key, default)
        if found is NARG:
            raise KeyError(key)
        return found

    def set(self, _fork: Optional[Fork], series, key, value, overwrite=True):
        """ Store value under (series, key), creating the series on first use. Only overwrite=True is supported. """
        assert overwrite
        if series not in self.data:
            self.data[series] = {}
        self.data[series][key] = value

    def iteritems(self, _fork: Optional[Fork], series):
        """ Return the (key, value) view of a series; empty if the series does not exist. """
        return self._series(series).items()

    def iterkeys(self, _fork: Optional[Fork], series):
        """ Return the key view of a series; empty if the series does not exist. """
        return self._series(series).keys()

    def itervalues(self, _fork: Optional[Fork], series):
        """ Return the value view of a series; empty if the series does not exist. """
        return self._series(series).values()

    def delete_series(self, _fork: Optional[Fork], series: str):
        """ Drop a whole series. Raises KeyError if the series does not exist. """
        del self.data[series]
current_blockstate = ContextVar[BlockState]('current_blockstate')

View File

@@ -11,9 +11,10 @@ log = logging.getLogger(__name__)
@alru_cache(maxsize=1024)
async def get_block_timestamp(blockhash) -> int:
block = current_blockstate.get().by_hash.get(blockhash)
if block is not None:
return block.timestamp
try:
return current_blockstate.get().by_hash[blockhash]
except (LookupError, KeyError):
pass
response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
raw = hexint(response['result']['timestamp'])
# noinspection PyTypeChecker

View File

@@ -1,8 +1,9 @@
import json, os, logging
import json
import os
from .. import current_w3 as _current_w3
from .abi import abis
from .contract_proxy import ContractProxy
from .. import current_w3
def get_contract_data(name):
@@ -14,7 +15,7 @@ def get_contract_data(name):
def get_contract_event(contract_name:str, event_name:str):
return getattr(_current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)()
return getattr(current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)()
class ERC20 (ContractProxy):

View File

@@ -174,10 +174,22 @@ class Chunk:
bars = json.load(file)
self.bars = [NativeOHLC.from_ohlc(ohlc) for ohlc in bars]
except FileNotFoundError:
self.bars = []
self.bars: list[NativeOHLC] = []
def update(self, native: NativeOHLC):
def bar_at(self, time: datetime) -> Optional[NativeOHLC]:
    """
    Look up the bar in this chunk that starts at ``time``.

    The position is computed from the first bar's start and the chunk period, so the bars are assumed to be
    contiguous and ``time`` is assumed to be aligned to a bar start.

    :param time: start time of the wanted bar.
    :return: the bar starting at ``time``, or None if the chunk is empty or ``time`` lies before the first bar or
             after the last bar.
    """
    if not self.bars:
        return None
    index = (time - self.bars[0].start) // self.period
    # A time earlier than the first bar gives a negative index, which Python would silently resolve from the END
    # of the list. Treat it as "no such bar", the same as an index past the end.
    if index < 0 or index >= len(self.bars):
        return None
    bar = self.bars[index]
    # sanity check of the contiguity / alignment assumption described above
    assert bar.start == time
    return bar
def update(self, native: NativeOHLC, *, backfill=False):
if not self.bars:
self.bars = [native]
return
@@ -185,9 +197,18 @@ class Chunk:
index = (native.start - start) // self.period
# log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}')
if index > len(self.bars):
log.error(f'chunk gap: {index} > {len(self.bars)} {self.symbol} {period_name(self.period)} {native}' + ''.join(
f'\n\t{c}' for c in self.bars))
exit(1)
if not backfill:
log.error(f'chunk gap: {index} > {len(self.bars)} {self.symbol} {period_name(self.period)} {native}' + ''.join(
f'\n\t{c}' for c in self.bars))
exit(1)
else:
last_bar = self.bars[-1]
last_price = last_bar.close
time = last_bar.start
for _ in range(index-len(self.bars)):
time += self.period
self.bars.append(NativeOHLC(time, None, None, None, last_price))
assert index == len(self.bars)
if index == len(self.bars):
assert self.bars[-1].start + self.period == native.start
self.bars.append(native)
@@ -218,12 +239,19 @@ class Chunk:
class OHLCRepository:
def __init__(self, base_dir: str = None):
""" can't actually make more than one of these because there's a global recent_ohlcs BlockDict """
if base_dir is None:
base_dir = config.ohlc_dir
self.dir = base_dir
self._dir = base_dir
self.cache = LFUCache(len(OHLC_PERIODS) * 1024)
self.dirty_chunks = set()
@property
def dir(self):
    """
    The base directory of this repository.

    Uses the base_dir stored in ``self._dir`` when there is one; otherwise falls back to ``config.ohlc_dir`` at
    access time and remembers the result. NOTE(review): presumably this is lazy so that a repository can be
    constructed before the configuration has been parsed -- confirm.

    :raises ValueError: if neither a base_dir nor ``config.ohlc_dir`` is set.
    """
    current = self._dir
    if current is not None:
        return current
    # nothing stored yet: take the configured directory and cache it on the instance
    current = self._dir = config.ohlc_dir
    if current is None:
        raise ValueError('OHLCRepository needs an ohlc_dir configured')
    return current
@staticmethod
def add_symbol(symbol: str, period: timedelta = None):
if period is not None:
@@ -235,6 +263,7 @@ class OHLCRepository:
recent_ohlcs[(symbol, period)] = []
def update_all(self, symbol: str, time: datetime, price: dec, *, create: bool = True):
    """
    Apply one price observation to every period in OHLC_PERIODS by calling update() once per period.

    The update_all() and update() methods generate bars for the recent_ohlcs BlockDict.
    """
    apply_update = self.update
    for ohlc_period in OHLC_PERIODS:
        apply_update(symbol, ohlc_period, time, price, create=create)
@@ -279,7 +308,10 @@ class OHLCRepository:
return updated
def save_all(self, symbol: str, period: timedelta, ohlc_list: list[NativeOHLC]) -> None:
""" saves all OHLC's in the list """
"""
The save_all() and save() methods interact with Chunks (disk files) not the BlockState.
saves all OHLC's in the list
"""
for ohlc in ohlc_list:
self.save(symbol, period, ohlc)
@@ -311,6 +343,39 @@ class OHLCRepository:
self.dirty_chunks.clear()
class LightOHLCRepository (OHLCRepository):
    """
    Repository variant used for backfill, when all data is known to be final. Instead of going through the
    BlockState it keeps a simple table of current NativeOHLC candles and writes bars straight into Chunks.
    """

    def __init__(self, base_dir: str = None):
        super().__init__(base_dir)
        # current bars, keyed by (symbol, period)
        self.current: dict[tuple[str, timedelta], NativeOHLC] = {}
        # NOTE(review): nothing in the visible code adds to or reads dirty_bars -- confirm it is still needed
        self.dirty_bars = set()

    def light_update_all(self, symbol: str, time: datetime, price: dec):
        """ Apply one price observation to every period in OHLC_PERIODS via light_update(). """
        for ohlc_period in OHLC_PERIODS:
            self.light_update(symbol, ohlc_period, time, price)

    def light_update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None,
                     *, backfill=True):
        """
        Apply one price observation to the bar of the given symbol and period, marking touched chunks dirty.

        :param symbol: key of the series being updated.
        :param period: bar length.
        :param time: time of the observation; the bar start is derived from it with ohlc_start_time().
        :param price: observed price, passed to update_ohlc() or used for all four values of a brand-new bar.
        :param backfill: forwarded to Chunk.update().
        """
        bar_start = ohlc_start_time(time, period)
        chunk = self.get_chunk(symbol, period, bar_start)
        key = (symbol, period)
        known = self.current.get(key)
        if known is None:
            # nothing tracked yet for this series: see whether the chunk already holds a bar at this start
            known = chunk.bar_at(bar_start)
            self.current[key] = known
        if known is None:
            # no bar to extend, so open a new one with open = high = low = close = price
            chunk.update(NativeOHLC(bar_start, price, price, price, price), backfill=backfill)
            self.dirty_chunks.add(chunk)
            return
        # NOTE(review): self.current[key] is only ever assigned from chunk.bar_at() above; the bars returned by
        # update_ohlc() are not stored back into self.current -- confirm that is intended.
        for bar in update_ohlc(known, period, time, price):
            # each returned bar is written to the chunk looked up by that bar's own start
            target = self.get_chunk(symbol, period, bar.start)
            target.update(bar, backfill=backfill)
            self.dirty_chunks.add(target)
def pub_ohlc(_series:str, key: OHLCKey, bars: list[NativeOHLC]):
pool_addr, period = key
chain_id = current_chain.get().chain_id

View File

@@ -13,7 +13,7 @@ from dexorder.blockstate.blockdata import K, V
from dexorder.blocktime import get_block_timestamp
from dexorder.database.model.pool import PoolDict
from dexorder.tokens import get_token
from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address, log
from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address
log = logging.getLogger(__name__)
@@ -38,7 +38,8 @@ async def load_pool(address: str) -> PoolDict:
decimals = token0['decimals'] - token1['decimals']
found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value,
base=t0, quote=t1, fee=fee, decimals=decimals)
log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} .{decimals} {address}')
log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} '
f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}')
else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass
log.debug(f'new Unknown pool at {address}')
except ContractLogicError:

View File

@@ -1,13 +1,20 @@
import asyncio
import logging
from asyncio import Queue
from datetime import timedelta
from typing import Union, Callable
from websockets import ConnectionClosedError
from dexorder import Blockchain, config, db
from dexorder import Blockchain, config, db, now, current_w3
from dexorder.base.chain import current_chain
from dexorder.blockchain.connection import create_w3
from dexorder.blockstate import current_blockstate
from dexorder.blockstate.state import FinalizedBlockState
from dexorder.database.model import Block
from dexorder.database.model.block import current_block
from dexorder.progressor import BlockProgressor
from dexorder.util.async_util import Maywaitable
log = logging.getLogger(__name__)
@@ -16,44 +23,87 @@ class BlockWalker (BlockProgressor):
"""
Walker is similar to the Runner in that it progresses through the blockchain, but unlike the Runner, the Walker only
processes blocks after they have reached finalization age. It does not create or maintain any BlockState.
Walker additionally has a delayed finalization mechanism to allow infrequent flushing of accumulated data.
"""
def __init__(self):
def __init__(self,
flush_callback:Callable[[],Maywaitable[None]] = None,
flush_delay: Union[None, int, timedelta] = None):
"""
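:param flush_callback: called with no arguments on every flush, just before the processed height is written to the db and the session is committed.
:param flush_delay: either a number of blocks or a time interval. A flush happens once at least this many blocks or this much time has passed since the previous flush. If None, a flush happens after every processed batch.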
"""
super().__init__()
self.queue: Queue = Queue()
self.running = False
self.flush_callback = flush_callback
self.flush_delay = flush_delay
self.flush_type = None if flush_delay is None else 'time' if isinstance(flush_delay, timedelta) else 'blocks'
async def run(self):
self.running = True
db.connect()
w3 = create_w3()
chain_id = await w3.eth.chain_id
chain = Blockchain.for_id(chain_id)
current_chain.set(chain)
w3 = current_w3.get()
chain = current_chain.get()
chain_id = chain.chain_id
confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1
batch_size = config.batch_size if config.batch_size is not None else chain.batch_size
current_blockstate.set(FinalizedBlockState())
kv_key = f'walker_height|{chain_id}'
processed_height = db.kv.get(kv_key, config.backfill)
with db.session:
processed_height = db.kv.get(kv_key, config.backfill)
log.info(f'walker starting at block {processed_height}')
last_flush = processed_height if self.flush_type == 'blocks' else now() if self.flush_type == 'time' else None
prev_height = None
session = db.session
session.begin()
while self.running:
try:
block = await w3.eth.get_block('latest')
latest_height = block['number']
if latest_height > prev_height:
latest_rawblock = await w3.eth.get_block('latest')
latest_height = latest_rawblock['number']
if prev_height is None or latest_height > prev_height:
prev_height = latest_height
log.debug(f'polled new block {latest_height}')
promotion_height = latest_height - confirm_offset
while processed_height <= promotion_height:
stop = min(promotion_height, processed_height+batch_size-1)
await self.handle(processed_height, stop, chain)
processed_height = db.kv[kv_key] = stop
cur_height = min(promotion_height, processed_height+batch_size-1)
block_data = await w3.eth.get_block(cur_height)
height = block_data['number']
assert height == cur_height
block = Block(chain=chain.chain_id, height=cur_height, hash=(block_data['hash']),
parent=(block_data['parentHash']), data=block_data)
current_block.set(block)
await self.handle(processed_height, cur_height, chain=chain, w3=w3)
if self.flush_delay is None or \
self.flush_type=='blocks' and last_flush + self.flush_delay <= processed_height or \
self.flush_type=='time' and last_flush + self.flush_delay <= now():
if self.flush_callback is not None:
self.flush_callback()
# flush height to db
db.kv[kv_key] = cur_height
if self.flush_type=='blocks':
last_flush = cur_height
elif self.flush_type=='time':
last_flush = now()
# this is the only way to commit any data to the db. everything else is a rollback.
db.session.commit()
db.session.begin()
processed_height = cur_height
if not self.running:
break
await asyncio.sleep(config.polling)
except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
log.debug(f'walker timeout {e}')
finally:
# anything that wasnt committed yet by the flush timer is discarded
# noinspection PyBroadException
try:
db.session.rollback()
except Exception:
pass
# noinspection PyBroadException
try:
db.session.close()
except Exception:
pass
# noinspection PyBroadException
try:
# noinspection PyUnresolvedReferences
@@ -62,25 +112,12 @@ class BlockWalker (BlockProgressor):
pass
async def handle(self, from_height, to_height, chain=None):
async def handle(self, from_height, to_height, *, chain=None, w3=None):
log.info(f'processing blocks {from_height} - {to_height}')
if chain is None:
chain = current_chain.get()
session = None
try:
batches = await self.get_backfill_batches(from_height, to_height)
session = db.session
session.begin()
await self.invoke_callbacks(batches, chain)
except: # legitimately catch EVERYTHING because we re-raise
log.debug('rolling back session')
if session is not None:
session.rollback()
raise
else:
if session is not None:
session.commit()
log.info(f'completed through block {to_height}')
finally:
if session is not None:
session.close()
if w3 is None:
w3 = current_w3.get()
batches = await self.get_backfill_batches(from_height, to_height, w3=w3)
await self.invoke_callbacks(batches, chain)
log.info(f'completed through block {to_height}')