Compare commits

..

2 Commits

Author SHA1 Message Date
tim
ce55609297 examine open orders 2025-04-07 01:32:19 -04:00
tim
a27300b5e4 info log for websocket connection drops 2025-04-03 18:15:16 -04:00
2 changed files with 46 additions and 28 deletions

View File

@@ -11,17 +11,29 @@ from dexorder.blockstate.fork import current_fork
from dexorder.contract.dexorder import VaultContract from dexorder.contract.dexorder import VaultContract
from dexorder.order.orderstate import Order from dexorder.order.orderstate import Order
from dexorder.tokens import adjust_decimals from dexorder.tokens import adjust_decimals
from dexorder.util import json
from dexorder.vault_blockdata import vault_balances, pretty_balances from dexorder.vault_blockdata import vault_balances, pretty_balances
from dexorder.bin.executable import execute from dexorder.bin.executable import execute
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
async def dump_orders(orders, args):
if args.json:
print(json.dumps([order.status.dump() for order in orders]))
else:
first = True
for order in orders:
if first:
first = False
else:
print()
print(await order.pprint())
def command_vault_argparse(subparsers): def command_vault_argparse(subparsers):
parser = subparsers.add_parser('vault', help='show the vault\'s balances and orders') parser = subparsers.add_parser('vault', help='show the vault\'s balances and orders')
parser.add_argument('address', help='address of the vault') parser.add_argument('address', help='address of the vault')
parser.add_argument('--all', help='show all orders including closed ones', action='store_true') parser.add_argument('--all', help='show all orders including closed ones', action='store_true')
# parser.add_argument('--json', help='output in JSON format', action='store_true') parser.add_argument('--json', help='output in JSON format', action='store_true')
async def command_vault(args): async def command_vault(args):
balances = vault_balances.get(args.address, {}) balances = vault_balances.get(args.address, {})
@@ -30,6 +42,7 @@ async def command_vault(args):
print(pretty_balances({k: (await adjust_decimals(k, v)) for k, v in balances.items()})) print(pretty_balances({k: (await adjust_decimals(k, v)) for k, v in balances.items()}))
print(f'Orders:') print(f'Orders:')
i = 0 i = 0
orders = []
while True: while True:
key = OrderKey(args.address, i) key = OrderKey(args.address, i)
try: try:
@@ -37,16 +50,19 @@ async def command_vault(args):
except KeyError: except KeyError:
break break
if args.all or order.is_open: if args.all or order.is_open:
print(await order.pprint()) orders.append(order)
i += 1 i += 1
await dump_orders(orders, args)
def command_open_argparse(subparsers):
parser = subparsers.add_parser('open', help='show all open orders')
parser.add_argument('--json', help='output in JSON format', action='store_true')
async def command_open(args):
await dump_orders([Order.of(key) for key in Order.open_orders], args)
# for key in Order.open_orders:
# order = Order.of(key)
# if args.json:
# print(json.dumps(order.status.dump()))
# else:
# print()
# print(order)
async def main(args: list): async def main(args: list):
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()

View File

@@ -5,7 +5,6 @@ from datetime import timedelta
from typing import Any, Iterable, Callable, Optional from typing import Any, Iterable, Callable, Optional
from eth_bloom import BloomFilter from eth_bloom import BloomFilter
# noinspection PyPackageRequirements
from websockets.exceptions import ConnectionClosedError from websockets.exceptions import ConnectionClosedError
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp, metric from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp, metric
@@ -81,8 +80,7 @@ class BlockStateRunner(BlockProgressor):
async with w3ws as w3ws: async with w3ws as w3ws:
log.debug('connecting to ws provider') log.debug('connecting to ws provider')
await w3ws.provider.connect() await w3ws.provider.connect()
subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc. await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc.
# log.debug(f'subscribed to newHeads {subscription}')
while self.running: while self.running:
async for message in w3ws.ws.process_subscriptions(): async for message in w3ws.ws.process_subscriptions():
block = Block(chain_id, message['result']) block = Block(chain_id, message['result'])
@@ -94,11 +92,15 @@ class BlockStateRunner(BlockProgressor):
if not self.running: if not self.running:
break break
await async_yield() await async_yield()
except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e: except (TimeoutError, asyncio.TimeoutError) as e:
log.debug(f'runner timeout {e}') log.debug(f'runner timeout {e}')
except ConnectionClosedError as e:
log.info(f'websocket connection closed {e}')
except ConnectionRefusedError: except ConnectionRefusedError:
log.warning(f'Could not connect to websocket {config.ws_url}') log.warning(f'Could not connect to websocket {config.ws_url}')
await asyncio.sleep(1) await asyncio.sleep(1)
except StopAsyncIteration:
log.info(f'websocket stream ended')
except Exception: except Exception:
log.exception(f'Unhandled exception during run_ws()') log.exception(f'Unhandled exception during run_ws()')
finally: finally:
@@ -397,21 +399,21 @@ class BlockStateRunner(BlockProgressor):
# propragate to the DB or Redis. # propragate to the DB or Redis.
# TIME TICKS ARE DISABLED FOR THIS REASON # TIME TICKS ARE DISABLED FOR THIS REASON
return return
current_fork.set(fork) # current_fork.set(fork)
session = db.session # session = db.session
session.begin() # session.begin()
try: # try:
for callback, on_timer in self.callbacks: # for callback, on_timer in self.callbacks:
if on_timer: # if on_timer:
# noinspection PyCallingNonCallable # # noinspection PyCallingNonCallable
await maywait(callback()) # await maywait(callback())
except BaseException: # except BaseException:
session.rollback() # session.rollback()
raise # raise
else: # else:
session.commit() # session.commit()
finally: # finally:
db.close_session() # db.close_session()
async def do_state_init_cbs(self): async def do_state_init_cbs(self):