diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index 37d26fc..108d06c 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -1,5 +1,7 @@ +import logging import math from contextvars import ContextVar +from datetime import datetime, timezone import dexorder @@ -60,15 +62,23 @@ class Clock: class BlockClock (Clock): - def __init__(self, block_timestamp=0, adjustment=None): - self.block_timestamp = block_timestamp if block_timestamp != 0 else dexorder.timestamp() - self.adjustment = 0 if block_timestamp == 0 \ - else adjustment if adjustment is not None \ - else block_timestamp - dexorder.timestamp() + """ + This clock estimates the blockchain timestamp by recording the most recent difference from the real time. + """ + + def __init__(self): + self.adjustment = 0. + + # noinspection PyAttributeOutsideInit + def update(self, block_timestamp: int): + now = datetime.now().timestamp() + self.adjustment = block_timestamp - now + # logging.getLogger(__name__).debug(f'blocktime {datetime.fromtimestamp(block_timestamp,tz=timezone.utc)} {block_timestamp} - now {now} = {self.adjustment}') @property def timestamp(self): - return math.ceil(dexorder.timestamp() + self.adjustment) + # todo add the mean block time to produce the expected timestamp of the next block + return round(datetime.now().timestamp() + self.adjustment) class SystemClock (Clock): diff --git a/src/dexorder/order/executionhandler.py b/src/dexorder/order/executionhandler.py index 9eebd8e..56aee41 100644 --- a/src/dexorder/order/executionhandler.py +++ b/src/dexorder/order/executionhandler.py @@ -79,6 +79,13 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None): if trig is not None: trig.slash() + def retry(): + trig = get_trigger() + if trig is None: + log.warning(f'Trying to touch a nonexistent trigger for tranche {tk}') + else: + trig.touch() + # # execute() error handling # @@ -88,6 +95,7 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None): # Insufficient Input Amount token = order.order.tokenIn log.debug(f'insufficient funds {tk.vault} {token} ') + retry() elif error == 'SPL': # todo tight slippage can cause excessive executions as the backend repeatedly retries the remainder. The symptom is error 'SPL'. # Square-root price limit from Uniswap means we asked for a limit price that isn't met. This is a fault of @@ -109,12 +117,13 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None): # from UniswapV3 SwapRouter when not even 1 satoshi of output was gained log.debug('warning: de minimis liquidity in pool') slash() + retry() elif error == 'RL': log.debug(f'tranche {tk} execution failed due to "RL" rate limit') - pass + retry() elif error == 'TE': log.debug(f'tranche {tk} execution failed due to "TE" too early') - pass + retry() elif error == 'TL': log.debug(f'tranche {tk} execution failed due to "TL" too late') try: @@ -123,10 +132,10 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None): pass elif error == 'LL': log.debug(f'tranche {tk} execution failed due to "LL" lower limit') - pass + retry() elif error == 'LU': log.debug(f'tranche {tk} execution failed due to "LU" upper limit') - pass + retry() elif error == 'OVR': log.warning(f'tranche {tk} execution failed due to "OVR" overfilled') # this should never happen. Shut down the entire order. diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 6a4ec67..24e4575 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -97,6 +97,7 @@ def start_trigger_updates(): """ Called near the beginning of block handling to initialize any per-block trigger data structures """ + # log.debug(f'current clock is {current_clock.get().timestamp} vs {timestamp()} => Δ{current_clock.get().timestamp-timestamp():.1f}s') TimeTrigger.update_all(current_clock.get().timestamp) PriceLineTrigger.clear_data() @@ -514,13 +515,16 @@ class TrancheTrigger: self.slash_count = 0 # reset slash count return filled + def touch(self): + _dirty.add(self.tk) + def check_expire(self): # if the expiration constraint has become False then the tranche can never execute again if self.expiration_trigger is not None and self.expiration_trigger: OrderTriggers.instances[self.tk.order_key].expire_tranche(self.tk.tranche_index) def expire(self): - if self.status == TrancheState.Expired: + if self.closed: return order_log.debug(f'tranche expired {self.tk}') self.status = TrancheState.Expired diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 44edd4d..726f43d 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -8,7 +8,7 @@ from eth_bloom import BloomFilter # noinspection PyPackageRequirements from websockets.exceptions import ConnectionClosedError -from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now +from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp from dexorder.base.block import Block, latest_block from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.blockchain.connection import create_w3_ws, create_w3 @@ -61,6 +61,7 @@ class BlockStateRunner(BlockProgressor): self.running = False async def run(self): + current_clock.set(BlockClock()) # this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is ether websockets or polling self.running = True # this run() process discovers new heads and puts them on a queue for the worker to process @@ -84,10 +85,11 @@ class BlockStateRunner(BlockProgressor): while self.running: async for message in w3ws.ws.process_subscriptions(): block = Block(chain_id, message['result']) - cache_block(block) - latest_block[chain_id] = block + self.set_latest_block(block) self.new_head_event.set() log.debug(f'detected new head {block}') + if abs(block.timestamp-timestamp()) > 3: + log.warning(f'Blockchain {chain_id} time is off by {block.timestamp-timestamp():.1f}s') if not self.running: break await async_yield() @@ -160,7 +162,7 @@ class BlockStateRunner(BlockProgressor): self.state is None or self.state.root_branch is None or self.state.height == block.height): return prev_blockhash log.debug(f'polled new head {block}') - latest_block[chain.id] = block + self.set_latest_block(block) # prefetch the head's ancestors if self.state is not None and self.state.root_branch is not None: if self.state.root_branch.height >= block.height - chain.confirms * 2: @@ -215,7 +217,6 @@ class BlockStateRunner(BlockProgressor): w3 = current_w3.get() chain = current_chain.get() assert chain.id == await w3.eth.chain_id - current_clock.set(BlockClock()) while self.running: try: await asyncio.wait_for(self.new_head_event.wait(), timeout=1) # todo configure @@ -388,3 +389,10 @@ class BlockStateRunner(BlockProgressor): # noinspection PyCallingNonCallable await maywait(cb()) self.state_initialized = True + + @staticmethod + def set_latest_block(block): + cache_block(block) + latest_block[block.chain_id] = block + current_clock.get().update(block.timestamp) +