retry after known errors; BlockClock fix
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
import logging
|
||||
import math
|
||||
from contextvars import ContextVar
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import dexorder
|
||||
|
||||
@@ -60,15 +62,23 @@ class Clock:
|
||||
|
||||
|
||||
class BlockClock (Clock):
|
||||
def __init__(self, block_timestamp=0, adjustment=None):
|
||||
self.block_timestamp = block_timestamp if block_timestamp != 0 else dexorder.timestamp()
|
||||
self.adjustment = 0 if block_timestamp == 0 \
|
||||
else adjustment if adjustment is not None \
|
||||
else block_timestamp - dexorder.timestamp()
|
||||
"""
|
||||
This clock estimates the blockchain timestamp by recording the most recent difference from the real time.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.adjustment = 0.
|
||||
|
||||
# noinspection PyAttributeOutsideInit
|
||||
def update(self, block_timestamp: int):
|
||||
now = datetime.now().timestamp()
|
||||
self.adjustment = block_timestamp - now
|
||||
# logging.getLogger(__name__).debug(f'blocktime {datetime.fromtimestamp(block_timestamp,tz=timezone.utc)} {block_timestamp} - now {now} = {self.adjustment}')
|
||||
|
||||
@property
|
||||
def timestamp(self):
|
||||
return math.ceil(dexorder.timestamp() + self.adjustment)
|
||||
# todo add the mean block time to produce the expected timestamp of the next block
|
||||
return round(datetime.now().timestamp() + self.adjustment)
|
||||
|
||||
|
||||
class SystemClock (Clock):
|
||||
|
||||
@@ -79,6 +79,13 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
|
||||
if trig is not None:
|
||||
trig.slash()
|
||||
|
||||
def retry():
|
||||
trig = get_trigger()
|
||||
if trig is None:
|
||||
log.warning(f'Trying to touch a nonexistent trigger for tranche {tk}')
|
||||
else:
|
||||
trig.touch()
|
||||
|
||||
#
|
||||
# execute() error handling
|
||||
#
|
||||
@@ -88,6 +95,7 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
|
||||
# Insufficient Input Amount
|
||||
token = order.order.tokenIn
|
||||
log.debug(f'insufficient funds {tk.vault} {token} ')
|
||||
retry()
|
||||
elif error == 'SPL':
|
||||
# todo tight slippage can cause excessive executions as the backend repeatedly retries the remainder. The symptom is error 'SPL'.
|
||||
# Square-root price limit from Uniswap means we asked for a limit price that isn't met. This is a fault of
|
||||
@@ -109,12 +117,13 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
|
||||
# from UniswapV3 SwapRouter when not even 1 satoshi of output was gained
|
||||
log.debug('warning: de minimis liquidity in pool')
|
||||
slash()
|
||||
retry()
|
||||
elif error == 'RL':
|
||||
log.debug(f'tranche {tk} execution failed due to "RL" rate limit')
|
||||
pass
|
||||
retry()
|
||||
elif error == 'TE':
|
||||
log.debug(f'tranche {tk} execution failed due to "TE" too early')
|
||||
pass
|
||||
retry()
|
||||
elif error == 'TL':
|
||||
log.debug(f'tranche {tk} execution failed due to "TL" too late')
|
||||
try:
|
||||
@@ -123,10 +132,10 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
|
||||
pass
|
||||
elif error == 'LL':
|
||||
log.debug(f'tranche {tk} execution failed due to "LL" lower limit')
|
||||
pass
|
||||
retry()
|
||||
elif error == 'LU':
|
||||
log.debug(f'tranche {tk} execution failed due to "LU" upper limit')
|
||||
pass
|
||||
retry()
|
||||
elif error == 'OVR':
|
||||
log.warning(f'tranche {tk} execution failed due to "OVR" overfilled')
|
||||
# this should never happen. Shut down the entire order.
|
||||
|
||||
@@ -97,6 +97,7 @@ def start_trigger_updates():
|
||||
"""
|
||||
Called near the beginning of block handling to initialize any per-block trigger data structures
|
||||
"""
|
||||
# log.debug(f'current clock is {current_clock.get().timestamp} vs {timestamp()} => Δ{current_clock.get().timestamp-timestamp():.1f}s')
|
||||
TimeTrigger.update_all(current_clock.get().timestamp)
|
||||
PriceLineTrigger.clear_data()
|
||||
|
||||
@@ -514,13 +515,16 @@ class TrancheTrigger:
|
||||
self.slash_count = 0 # reset slash count
|
||||
return filled
|
||||
|
||||
def touch(self):
|
||||
_dirty.add(self.tk)
|
||||
|
||||
def check_expire(self):
|
||||
# if the expiration constraint has become False then the tranche can never execute again
|
||||
if self.expiration_trigger is not None and self.expiration_trigger:
|
||||
OrderTriggers.instances[self.tk.order_key].expire_tranche(self.tk.tranche_index)
|
||||
|
||||
def expire(self):
|
||||
if self.status == TrancheState.Expired:
|
||||
if self.closed:
|
||||
return
|
||||
order_log.debug(f'tranche expired {self.tk}')
|
||||
self.status = TrancheState.Expired
|
||||
|
||||
@@ -8,7 +8,7 @@ from eth_bloom import BloomFilter
|
||||
# noinspection PyPackageRequirements
|
||||
from websockets.exceptions import ConnectionClosedError
|
||||
|
||||
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now
|
||||
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp
|
||||
from dexorder.base.block import Block, latest_block
|
||||
from dexorder.base.chain import current_chain, current_clock, BlockClock
|
||||
from dexorder.blockchain.connection import create_w3_ws, create_w3
|
||||
@@ -61,6 +61,7 @@ class BlockStateRunner(BlockProgressor):
|
||||
self.running = False
|
||||
|
||||
async def run(self):
|
||||
current_clock.set(BlockClock())
|
||||
# this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is ether websockets or polling
|
||||
self.running = True
|
||||
# this run() process discovers new heads and puts them on a queue for the worker to process
|
||||
@@ -84,10 +85,11 @@ class BlockStateRunner(BlockProgressor):
|
||||
while self.running:
|
||||
async for message in w3ws.ws.process_subscriptions():
|
||||
block = Block(chain_id, message['result'])
|
||||
cache_block(block)
|
||||
latest_block[chain_id] = block
|
||||
self.set_latest_block(block)
|
||||
self.new_head_event.set()
|
||||
log.debug(f'detected new head {block}')
|
||||
if abs(block.timestamp-timestamp()) > 3:
|
||||
log.warning(f'Blockchain {chain_id} time is off by {block.timestamp-timestamp():.1f}s')
|
||||
if not self.running:
|
||||
break
|
||||
await async_yield()
|
||||
@@ -160,7 +162,7 @@ class BlockStateRunner(BlockProgressor):
|
||||
self.state is None or self.state.root_branch is None or self.state.height == block.height):
|
||||
return prev_blockhash
|
||||
log.debug(f'polled new head {block}')
|
||||
latest_block[chain.id] = block
|
||||
self.set_latest_block(block)
|
||||
# prefetch the head's ancestors
|
||||
if self.state is not None and self.state.root_branch is not None:
|
||||
if self.state.root_branch.height >= block.height - chain.confirms * 2:
|
||||
@@ -215,7 +217,6 @@ class BlockStateRunner(BlockProgressor):
|
||||
w3 = current_w3.get()
|
||||
chain = current_chain.get()
|
||||
assert chain.id == await w3.eth.chain_id
|
||||
current_clock.set(BlockClock())
|
||||
while self.running:
|
||||
try:
|
||||
await asyncio.wait_for(self.new_head_event.wait(), timeout=1) # todo configure
|
||||
@@ -388,3 +389,10 @@ class BlockStateRunner(BlockProgressor):
|
||||
# noinspection PyCallingNonCallable
|
||||
await maywait(cb())
|
||||
self.state_initialized = True
|
||||
|
||||
@staticmethod
|
||||
def set_latest_block(block):
|
||||
cache_block(block)
|
||||
latest_block[block.chain_id] = block
|
||||
current_clock.get().update(block.timestamp)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user