From f22f8bf0172cf82797283b74c4b058c83baa10bc Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 27 Mar 2024 12:05:11 -0400 Subject: [PATCH] transaction manager separation; get_block() caching --- bin/RESET_DB.sh | 2 + src/dexorder/bin/finaldata.py | 2 +- src/dexorder/bin/main.py | 2 - src/dexorder/{blocktime.py => blocks.py} | 17 ++++++--- src/dexorder/database/__init__.py | 24 +++++++++--- src/dexorder/database/model/block.py | 5 ++- src/dexorder/event_handler.py | 13 +++---- src/dexorder/order/triggers.py | 2 +- src/dexorder/pools.py | 2 +- src/dexorder/runner.py | 47 ++++++++++++++---------- src/dexorder/transaction.py | 7 ++++ src/dexorder/walker.py | 6 +-- 12 files changed, 81 insertions(+), 48 deletions(-) create mode 100755 bin/RESET_DB.sh rename src/dexorder/{blocktime.py => blocks.py} (50%) diff --git a/bin/RESET_DB.sh b/bin/RESET_DB.sh new file mode 100755 index 0000000..e1d08e3 --- /dev/null +++ b/bin/RESET_DB.sh @@ -0,0 +1,2 @@ +#!/bin/bash +alembic downgrade base && alembic upgrade head diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py index 38f5bb6..60bd5cc 100644 --- a/src/dexorder/bin/finaldata.py +++ b/src/dexorder/bin/finaldata.py @@ -9,7 +9,7 @@ from dexorder import from_timestamp, blockchain, config from dexorder.addrmeta import address_metadata from dexorder.base.chain import current_chain from dexorder.bin.executable import execute -from dexorder.blocktime import get_block_timestamp +from dexorder.blocks import get_block_timestamp from dexorder.configuration import parse_args from dexorder.contract import get_contract_event from dexorder.database.model.block import current_block, latest_block diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 7ee06e9..84a56c0 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -66,8 +66,6 @@ def setup_logevent_triggers(runner): runner.postprocess_cbs.append(activate_price_triggers) runner.postprocess_cbs.append(process_active_tranches) runner.postprocess_cbs.append(process_execution_requests) - runner.postprocess_cbs.append(create_transactions) - runner.postprocess_cbs.append(send_transactions) # noinspection DuplicatedCode diff --git a/src/dexorder/blocktime.py b/src/dexorder/blocks.py similarity index 50% rename from src/dexorder/blocktime.py rename to src/dexorder/blocks.py index cff9806..01172a4 100644 --- a/src/dexorder/blocktime.py +++ b/src/dexorder/blocks.py @@ -3,19 +3,26 @@ import logging from async_lru import alru_cache from dexorder import current_w3 +from dexorder.base.chain import current_chain from dexorder.blockstate import current_blockstate +from dexorder.database.model import Block from dexorder.util import hexint log = logging.getLogger(__name__) -@alru_cache(maxsize=1024) async def get_block_timestamp(blockhash) -> int: + block = await get_block(blockhash) + return block.timestamp + + +@alru_cache(maxsize=128) +async def get_block(blockhash) -> Block: + # first look in the state try: - return current_blockstate.get().by_hash[blockhash].timestamp + return current_blockstate.get().by_hash[blockhash] except (LookupError, KeyError): pass + # otherwise query response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False]) - raw = hexint(response['result']['timestamp']) - # noinspection PyTypeChecker - return raw if type(raw) is int else hexint(raw) + return Block.from_data(current_chain.get().chain_id, response['result']) diff --git a/src/dexorder/database/__init__.py b/src/dexorder/database/__init__.py index 1aff459..b1c3c3d 100644 --- a/src/dexorder/database/__init__.py +++ b/src/dexorder/database/__init__.py @@ -71,13 +71,27 @@ class Db: def session(self) -> Session: s = _session.get() if s is None: - engine = _engine.get() - if engine is None: - raise RuntimeError('Cannot create session: no database engine set. Use dexorder.db.connect() first') - s = Session(engine, expire_on_commit=False, autoflush=False, autocommit=False) - _session.set(s) + s = self.make_session() return s + @staticmethod + def make_session(**kwargs) -> Session: + engine = _engine.get() + if engine is None: + raise RuntimeError('Cannot create session: no database engine set. Use dexorder.db.connect() first') + kwargs.setdefault('expire_on_commit', False) + s = Session(engine, **kwargs) + _session.set(s) + return s + + @staticmethod + def close_session(): + s = _session.get() + if s is not None: + s.close() + # noinspection PyTypeChecker + _session.set(None) + # noinspection PyShadowingNames def connect(self, url=None, migrate=True, reconnect=False, dump_sql=None): if _engine.get() is not None and not reconnect: diff --git a/src/dexorder/database/model/block.py b/src/dexorder/database/model/block.py index 3cf7d00..a858ca2 100644 --- a/src/dexorder/database/model/block.py +++ b/src/dexorder/database/model/block.py @@ -11,8 +11,9 @@ class Block(Base): @staticmethod def from_data(chain_id:int, data:dict): - return Block(chain=chain_id, height=data['number'], hash=hexstr(data['hash']), - parent=hexstr(data['parentHash']), data=data) + """ Builds a Block using the response data from an RPC server """ + return Block(chain=chain_id, height=int(data['number'],0), + hash=hexstr(data['hash']), parent=hexstr(data['parentHash']), data=data) chain: Mapped[int] = mapped_column(primary_key=True) height: Mapped[int] = mapped_column(primary_key=True) diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 7d29b8a..89217e8 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -8,7 +8,7 @@ from web3.types import EventData from dexorder import current_pub, db, from_timestamp, minutely from dexorder.base.chain import current_chain, current_clock from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey -from dexorder.blocktime import get_block_timestamp +from dexorder.blocks import get_block_timestamp from dexorder.ohlc import ohlcs, recent_ohlcs from dexorder.transaction import submit_transaction_request from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data @@ -287,6 +287,10 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str): log.error(f'Could not get order {OrderKey(req.vault, req.order_index)}') return tk = TrancheKey(req.vault, req.order_index, req.tranche_index) + try: + del execution_requests[tk] + except KeyError: + pass if error != '': log.debug(f'execution request for tranche {tk} had error "{error}"') if error == '': @@ -313,13 +317,6 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str): # todo dont keep trying else: log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"') - try: - er = execution_requests[tk] - except KeyError: - pass - else: - if er.height < current_block.get().height: - del execution_requests[tk] last_ohlc_rollover = 0 diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index e560ad2..4a7564d 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -168,7 +168,7 @@ class TrancheTrigger: cur = pool_prices[addr] if cur is not None: if self.pool_price_multiplier is None: - self.pool_price_multiplier = dec(10) ** dec(-pool['decimals']) + self.pool_price_multiplier = dec(10) ** dec(pool['decimals']) log.debug(f'adjusted cur price from {cur} => {cur*self.pool_price_multiplier}') cur *= self.pool_price_multiplier if cur is None or not self.has_line_constraint or all(await asyncio.gather( diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index 163ba78..98135d1 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -12,7 +12,7 @@ from dexorder.base.chain import current_chain from dexorder.base.orderlib import Exchange from dexorder.blockstate import BlockDict from dexorder.blockstate.blockdata import K, V -from dexorder.blocktime import get_block_timestamp +from dexorder.blocks import get_block_timestamp from dexorder.database.model.pool import PoolDict from dexorder.metadata import is_generating_metadata from dexorder.tokens import get_token diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 5228633..3d9ca3c 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -12,11 +12,13 @@ from dexorder import Blockchain, db, current_pub, async_yield, current_w3, confi from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.base.fork import current_fork, Fork, DisjointFork from dexorder.blockchain.connection import create_w3_ws, create_w3 +from dexorder.blocks import get_block from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate.diff import DiffEntryItem from dexorder.database.model import Block from dexorder.database.model.block import current_block, latest_block from dexorder.progressor import BlockProgressor +from dexorder.transaction import create_and_send_transactions from dexorder.util import hexstr, hexint, hexbytes from dexorder.util.async_util import maywait, Maywaitable from dexorder.util.shutdown import fatal @@ -113,7 +115,7 @@ class BlockStateRunner(BlockProgressor): async for message in w3ws.ws.process_subscriptions(): head = message['result'] log.debug(f'detected new block {head["number"]} {hexstr(head["hash"])}') - await self.add_head(head["hash"]) + await self.add_head(head) if not self.running: break await async_yield() @@ -186,13 +188,9 @@ class BlockStateRunner(BlockProgressor): blockhash = block_data['hash'] parent = block_data['parentHash'] height = block_data['number'] + head = Block(chain=chain.chain_id, height=height, hash=blockhash, parent=parent, data=block_data) except TypeError: - blockhash = head - response = await w3.provider.make_request('eth_getBlockByHash', [blockhash, False]) - block_data:dict = response['result'] - parent = bytes.fromhex(block_data['parentHash'][2:]) - height = int(block_data['number'], 0) - head = Block(chain=chain.chain_id, height=height, hash=blockhash, parent=parent, data=block_data) + head = await get_block(head) latest_block.set(head) if self.state or config.backfill: @@ -260,6 +258,7 @@ class BlockStateRunner(BlockProgressor): log.debug(f'handle_head {block.height} {hexstr(block.hash)}') session = None batches = [] + pubs = [] try: if self.state is not None and block.hash in self.state.by_hash: log.debug(f'block {block.hash} was already processed') @@ -306,10 +305,9 @@ class BlockStateRunner(BlockProgressor): # set up for callbacks current_block.set(block) current_fork.set(fork) - session = db.session + session = db.make_session(autocommit=False) session.begin() session.add(block) - pubs = [] current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) # used by handle_vault_created if not self.state_initialized: await self.do_state_init_cbs() @@ -352,11 +350,6 @@ class BlockStateRunner(BlockProgressor): # todo try/except for known retryable errors # noinspection PyCallingNonCallable await maywait(callback(self.state.root_block, diff_items)) - - # publish messages - if pubs and self.publish_all: - # noinspection PyCallingNonCallable - await maywait(self.publish_all(pubs)) except: # legitimately catch EVERYTHING because we re-raise log.debug('rolling back session') if session is not None: @@ -374,11 +367,28 @@ class BlockStateRunner(BlockProgressor): raise else: if session is not None: - session.commit() + db.session.commit() + + # manage transactions in a separate database session + # todo separate out the transaction manager completely from runner + try: + await create_and_send_transactions() + except: + db.session.rollback() + raise + else: + db.session.commit() + finally: + db.close_session() + + # publish messages + if pubs and self.publish_all: + # noinspection PyCallingNonCallable + await maywait(self.publish_all(pubs)) + log.info(f'completed block {block}') finally: - if session is not None: - session.close() + db.close_session() async def handle_time_tick(self, block): @@ -399,8 +409,7 @@ class BlockStateRunner(BlockProgressor): else: session.commit() finally: - if session is not None: - session.close() + session.close() async def do_state_init_cbs(self): if self.state_initialized: diff --git a/src/dexorder/transaction.py b/src/dexorder/transaction.py index 8f645b5..6579af5 100644 --- a/src/dexorder/transaction.py +++ b/src/dexorder/transaction.py @@ -37,6 +37,12 @@ def submit_transaction_request(tr: TransactionRequest): return job +async def create_and_send_transactions(): + """ called by the Runner after the events have all been processed and the db committed """ + await create_transactions() + await send_transactions() + + async def create_transactions(): for job in db.session.query(TransactionJob).filter( TransactionJob.chain == current_chain.get(), @@ -76,6 +82,7 @@ async def send_transactions(): sent = await w3.eth.send_raw_transaction(job.tx.data) assert sent == job.tx.id job.state = TransactionJobState.Sent + db.session.add(job) async def handle_transaction_receipts(): diff --git a/src/dexorder/walker.py b/src/dexorder/walker.py index ee5b9fb..1210b1c 100644 --- a/src/dexorder/walker.py +++ b/src/dexorder/walker.py @@ -79,10 +79,8 @@ class BlockWalker (BlockProgressor): while processed_height < promotion_height: cur_height = min(promotion_height, processed_height+batch_size-1) block_data = await w3.eth.get_block(cur_height) - height = block_data['number'] - assert height == cur_height - block = Block(chain=chain.chain_id, height=cur_height, hash=(block_data['hash']), - parent=(block_data['parentHash']), data=block_data) + block = Block.from_data(chain_id, block_data) + assert block.height == cur_height current_block.set(block) await self.handle(processed_height, cur_height, chain=chain, w3=w3) if self.flush_delay is None or \