transaction manager separation; get_block() caching

This commit is contained in:
Tim
2024-03-27 12:05:11 -04:00
parent da5f921953
commit f22f8bf017
12 changed files with 81 additions and 48 deletions

bin/RESET_DB.sh — new executable file (2 lines changed)
View File

@@ -0,0 +1,2 @@
#!/bin/bash
alembic downgrade base && alembic upgrade head

View File

@@ -9,7 +9,7 @@ from dexorder import from_timestamp, blockchain, config
from dexorder.addrmeta import address_metadata from dexorder.addrmeta import address_metadata
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute from dexorder.bin.executable import execute
from dexorder.blocktime import get_block_timestamp from dexorder.blocks import get_block_timestamp
from dexorder.configuration import parse_args from dexorder.configuration import parse_args
from dexorder.contract import get_contract_event from dexorder.contract import get_contract_event
from dexorder.database.model.block import current_block, latest_block from dexorder.database.model.block import current_block, latest_block

View File

@@ -66,8 +66,6 @@ def setup_logevent_triggers(runner):
runner.postprocess_cbs.append(activate_price_triggers) runner.postprocess_cbs.append(activate_price_triggers)
runner.postprocess_cbs.append(process_active_tranches) runner.postprocess_cbs.append(process_active_tranches)
runner.postprocess_cbs.append(process_execution_requests) runner.postprocess_cbs.append(process_execution_requests)
runner.postprocess_cbs.append(create_transactions)
runner.postprocess_cbs.append(send_transactions)
# noinspection DuplicatedCode # noinspection DuplicatedCode

View File

@@ -3,19 +3,26 @@ import logging
from async_lru import alru_cache from async_lru import alru_cache
from dexorder import current_w3 from dexorder import current_w3
from dexorder.base.chain import current_chain
from dexorder.blockstate import current_blockstate from dexorder.blockstate import current_blockstate
from dexorder.database.model import Block
from dexorder.util import hexint from dexorder.util import hexint
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@alru_cache(maxsize=1024)
async def get_block_timestamp(blockhash) -> int: async def get_block_timestamp(blockhash) -> int:
block = await get_block(blockhash)
return block.timestamp
@alru_cache(maxsize=128)
async def get_block(blockhash) -> Block:
# first look in the state
try: try:
return current_blockstate.get().by_hash[blockhash].timestamp return current_blockstate.get().by_hash[blockhash]
except (LookupError, KeyError): except (LookupError, KeyError):
pass pass
# otherwise query
response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False]) response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
raw = hexint(response['result']['timestamp']) return Block.from_data(current_chain.get().chain_id, response['result'])
# noinspection PyTypeChecker
return raw if type(raw) is int else hexint(raw)

View File

@@ -71,13 +71,27 @@ class Db:
def session(self) -> Session: def session(self) -> Session:
s = _session.get() s = _session.get()
if s is None: if s is None:
engine = _engine.get() s = self.make_session()
if engine is None:
raise RuntimeError('Cannot create session: no database engine set. Use dexorder.db.connect() first')
s = Session(engine, expire_on_commit=False, autoflush=False, autocommit=False)
_session.set(s)
return s return s
@staticmethod
def make_session(**kwargs) -> Session:
engine = _engine.get()
if engine is None:
raise RuntimeError('Cannot create session: no database engine set. Use dexorder.db.connect() first')
kwargs.setdefault('expire_on_commit', False)
s = Session(engine, **kwargs)
_session.set(s)
return s
@staticmethod
def close_session():
s = _session.get()
if s is not None:
s.close()
# noinspection PyTypeChecker
_session.set(None)
# noinspection PyShadowingNames # noinspection PyShadowingNames
def connect(self, url=None, migrate=True, reconnect=False, dump_sql=None): def connect(self, url=None, migrate=True, reconnect=False, dump_sql=None):
if _engine.get() is not None and not reconnect: if _engine.get() is not None and not reconnect:

View File

@@ -11,8 +11,9 @@ class Block(Base):
@staticmethod @staticmethod
def from_data(chain_id:int, data:dict): def from_data(chain_id:int, data:dict):
return Block(chain=chain_id, height=data['number'], hash=hexstr(data['hash']), """ Builds a Block using the response data from an RPC server """
parent=hexstr(data['parentHash']), data=data) return Block(chain=chain_id, height=int(data['number'],0),
hash=hexstr(data['hash']), parent=hexstr(data['parentHash']), data=data)
chain: Mapped[int] = mapped_column(primary_key=True) chain: Mapped[int] = mapped_column(primary_key=True)
height: Mapped[int] = mapped_column(primary_key=True) height: Mapped[int] = mapped_column(primary_key=True)

View File

@@ -8,7 +8,7 @@ from web3.types import EventData
from dexorder import current_pub, db, from_timestamp, minutely from dexorder import current_pub, db, from_timestamp, minutely
from dexorder.base.chain import current_chain, current_clock from dexorder.base.chain import current_chain, current_clock
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey
from dexorder.blocktime import get_block_timestamp from dexorder.blocks import get_block_timestamp
from dexorder.ohlc import ohlcs, recent_ohlcs from dexorder.ohlc import ohlcs, recent_ohlcs
from dexorder.transaction import submit_transaction_request from dexorder.transaction import submit_transaction_request
from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data
@@ -287,6 +287,10 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str):
log.error(f'Could not get order {OrderKey(req.vault, req.order_index)}') log.error(f'Could not get order {OrderKey(req.vault, req.order_index)}')
return return
tk = TrancheKey(req.vault, req.order_index, req.tranche_index) tk = TrancheKey(req.vault, req.order_index, req.tranche_index)
try:
del execution_requests[tk]
except KeyError:
pass
if error != '': if error != '':
log.debug(f'execution request for tranche {tk} had error "{error}"') log.debug(f'execution request for tranche {tk} had error "{error}"')
if error == '': if error == '':
@@ -313,13 +317,6 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str):
# todo dont keep trying # todo dont keep trying
else: else:
log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"') log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"')
try:
er = execution_requests[tk]
except KeyError:
pass
else:
if er.height < current_block.get().height:
del execution_requests[tk]
last_ohlc_rollover = 0 last_ohlc_rollover = 0

View File

@@ -168,7 +168,7 @@ class TrancheTrigger:
cur = pool_prices[addr] cur = pool_prices[addr]
if cur is not None: if cur is not None:
if self.pool_price_multiplier is None: if self.pool_price_multiplier is None:
self.pool_price_multiplier = dec(10) ** dec(-pool['decimals']) self.pool_price_multiplier = dec(10) ** dec(pool['decimals'])
log.debug(f'adjusted cur price from {cur} => {cur*self.pool_price_multiplier}') log.debug(f'adjusted cur price from {cur} => {cur*self.pool_price_multiplier}')
cur *= self.pool_price_multiplier cur *= self.pool_price_multiplier
if cur is None or not self.has_line_constraint or all(await asyncio.gather( if cur is None or not self.has_line_constraint or all(await asyncio.gather(

View File

@@ -12,7 +12,7 @@ from dexorder.base.chain import current_chain
from dexorder.base.orderlib import Exchange from dexorder.base.orderlib import Exchange
from dexorder.blockstate import BlockDict from dexorder.blockstate import BlockDict
from dexorder.blockstate.blockdata import K, V from dexorder.blockstate.blockdata import K, V
from dexorder.blocktime import get_block_timestamp from dexorder.blocks import get_block_timestamp
from dexorder.database.model.pool import PoolDict from dexorder.database.model.pool import PoolDict
from dexorder.metadata import is_generating_metadata from dexorder.metadata import is_generating_metadata
from dexorder.tokens import get_token from dexorder.tokens import get_token

View File

@@ -12,11 +12,13 @@ from dexorder import Blockchain, db, current_pub, async_yield, current_w3, confi
from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.base.chain import current_chain, current_clock, BlockClock
from dexorder.base.fork import current_fork, Fork, DisjointFork from dexorder.base.fork import current_fork, Fork, DisjointFork
from dexorder.blockchain.connection import create_w3_ws, create_w3 from dexorder.blockchain.connection import create_w3_ws, create_w3
from dexorder.blocks import get_block
from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate import BlockState, current_blockstate
from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.diff import DiffEntryItem
from dexorder.database.model import Block from dexorder.database.model import Block
from dexorder.database.model.block import current_block, latest_block from dexorder.database.model.block import current_block, latest_block
from dexorder.progressor import BlockProgressor from dexorder.progressor import BlockProgressor
from dexorder.transaction import create_and_send_transactions
from dexorder.util import hexstr, hexint, hexbytes from dexorder.util import hexstr, hexint, hexbytes
from dexorder.util.async_util import maywait, Maywaitable from dexorder.util.async_util import maywait, Maywaitable
from dexorder.util.shutdown import fatal from dexorder.util.shutdown import fatal
@@ -113,7 +115,7 @@ class BlockStateRunner(BlockProgressor):
async for message in w3ws.ws.process_subscriptions(): async for message in w3ws.ws.process_subscriptions():
head = message['result'] head = message['result']
log.debug(f'detected new block {head["number"]} {hexstr(head["hash"])}') log.debug(f'detected new block {head["number"]} {hexstr(head["hash"])}')
await self.add_head(head["hash"]) await self.add_head(head)
if not self.running: if not self.running:
break break
await async_yield() await async_yield()
@@ -186,13 +188,9 @@ class BlockStateRunner(BlockProgressor):
blockhash = block_data['hash'] blockhash = block_data['hash']
parent = block_data['parentHash'] parent = block_data['parentHash']
height = block_data['number'] height = block_data['number']
head = Block(chain=chain.chain_id, height=height, hash=blockhash, parent=parent, data=block_data)
except TypeError: except TypeError:
blockhash = head head = await get_block(head)
response = await w3.provider.make_request('eth_getBlockByHash', [blockhash, False])
block_data:dict = response['result']
parent = bytes.fromhex(block_data['parentHash'][2:])
height = int(block_data['number'], 0)
head = Block(chain=chain.chain_id, height=height, hash=blockhash, parent=parent, data=block_data)
latest_block.set(head) latest_block.set(head)
if self.state or config.backfill: if self.state or config.backfill:
@@ -260,6 +258,7 @@ class BlockStateRunner(BlockProgressor):
log.debug(f'handle_head {block.height} {hexstr(block.hash)}') log.debug(f'handle_head {block.height} {hexstr(block.hash)}')
session = None session = None
batches = [] batches = []
pubs = []
try: try:
if self.state is not None and block.hash in self.state.by_hash: if self.state is not None and block.hash in self.state.by_hash:
log.debug(f'block {block.hash} was already processed') log.debug(f'block {block.hash} was already processed')
@@ -306,10 +305,9 @@ class BlockStateRunner(BlockProgressor):
# set up for callbacks # set up for callbacks
current_block.set(block) current_block.set(block)
current_fork.set(fork) current_fork.set(fork)
session = db.session session = db.make_session(autocommit=False)
session.begin() session.begin()
session.add(block) session.add(block)
pubs = []
current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) # used by handle_vault_created current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) # used by handle_vault_created
if not self.state_initialized: if not self.state_initialized:
await self.do_state_init_cbs() await self.do_state_init_cbs()
@@ -352,11 +350,6 @@ class BlockStateRunner(BlockProgressor):
# todo try/except for known retryable errors # todo try/except for known retryable errors
# noinspection PyCallingNonCallable # noinspection PyCallingNonCallable
await maywait(callback(self.state.root_block, diff_items)) await maywait(callback(self.state.root_block, diff_items))
# publish messages
if pubs and self.publish_all:
# noinspection PyCallingNonCallable
await maywait(self.publish_all(pubs))
except: # legitimately catch EVERYTHING because we re-raise except: # legitimately catch EVERYTHING because we re-raise
log.debug('rolling back session') log.debug('rolling back session')
if session is not None: if session is not None:
@@ -374,11 +367,28 @@ class BlockStateRunner(BlockProgressor):
raise raise
else: else:
if session is not None: if session is not None:
session.commit() db.session.commit()
# manage transactions in a separate database session
# todo separate out the transaction manager completely from runner
try:
await create_and_send_transactions()
except:
db.session.rollback()
raise
else:
db.session.commit()
finally:
db.close_session()
# publish messages
if pubs and self.publish_all:
# noinspection PyCallingNonCallable
await maywait(self.publish_all(pubs))
log.info(f'completed block {block}') log.info(f'completed block {block}')
finally: finally:
if session is not None: db.close_session()
session.close()
async def handle_time_tick(self, block): async def handle_time_tick(self, block):
@@ -399,8 +409,7 @@ class BlockStateRunner(BlockProgressor):
else: else:
session.commit() session.commit()
finally: finally:
if session is not None: session.close()
session.close()
async def do_state_init_cbs(self): async def do_state_init_cbs(self):
if self.state_initialized: if self.state_initialized:

View File

@@ -37,6 +37,12 @@ def submit_transaction_request(tr: TransactionRequest):
return job return job
async def create_and_send_transactions():
""" called by the Runner after the events have all been processed and the db committed """
await create_transactions()
await send_transactions()
async def create_transactions(): async def create_transactions():
for job in db.session.query(TransactionJob).filter( for job in db.session.query(TransactionJob).filter(
TransactionJob.chain == current_chain.get(), TransactionJob.chain == current_chain.get(),
@@ -76,6 +82,7 @@ async def send_transactions():
sent = await w3.eth.send_raw_transaction(job.tx.data) sent = await w3.eth.send_raw_transaction(job.tx.data)
assert sent == job.tx.id assert sent == job.tx.id
job.state = TransactionJobState.Sent job.state = TransactionJobState.Sent
db.session.add(job)
async def handle_transaction_receipts(): async def handle_transaction_receipts():

View File

@@ -79,10 +79,8 @@ class BlockWalker (BlockProgressor):
while processed_height < promotion_height: while processed_height < promotion_height:
cur_height = min(promotion_height, processed_height+batch_size-1) cur_height = min(promotion_height, processed_height+batch_size-1)
block_data = await w3.eth.get_block(cur_height) block_data = await w3.eth.get_block(cur_height)
height = block_data['number'] block = Block.from_data(chain_id, block_data)
assert height == cur_height assert block.height == cur_height
block = Block(chain=chain.chain_id, height=cur_height, hash=(block_data['hash']),
parent=(block_data['parentHash']), data=block_data)
current_block.set(block) current_block.set(block)
await self.handle(processed_height, cur_height, chain=chain, w3=w3) await self.handle(processed_height, cur_height, chain=chain, w3=w3)
if self.flush_delay is None or \ if self.flush_delay is None or \