diff --git a/bin/examine b/bin/examine new file mode 100755 index 0000000..ec96f31 --- /dev/null +++ b/bin/examine @@ -0,0 +1,15 @@ +#!/bin/bash + +kubectl port-forward postgres-0 5431:5432 & +PF_PID=$! + +shutdown () { + kill $PF_PID + wait +} + +trap shutdown INT TERM + +PYTHONPATH=src python -m dexorder.bin.examine rpc_url=arbitrum_dxod db_url=postgres://dexorder@localhost:5431/dexorder "$@" + +shutdown diff --git a/src/dexorder/__init__.py b/src/dexorder/__init__.py index da60f96..e253e2d 100644 --- a/src/dexorder/__init__.py +++ b/src/dexorder/__init__.py @@ -59,7 +59,7 @@ _cwd() # do this first so that config has the right current working directory # ordering here is important! from .base.chain import Blockchain # the singletons are loaded into the dexorder.blockchain.* namespace -from .util import async_yield +from .util import async_yield, json from .base.fixed import Fixed2, FixedDecimals, Dec18 from .configuration import config from .base.account import Account diff --git a/src/dexorder/base/orderlib.py b/src/dexorder/base/orderlib.py index 2e14ba3..38a5d92 100644 --- a/src/dexorder/base/orderlib.py +++ b/src/dexorder/base/orderlib.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from enum import Enum from typing import Optional -from dexorder import timestamp +from dexorder import timestamp, from_timestamp from dexorder.util import hexbytes from dexorder.util.convert import decode_IEEE754 @@ -250,6 +250,26 @@ class ElaboratedSwapOrderStatus: def copy(self): return copy.deepcopy(self) + def __str__(self): + msg = f''' +SwapOrder + status: {self.state.name} + in: {self.order.tokenIn} + out: {self.order.tokenOut} + exchange: {self.order.route.exchange.name, self.order.route.fee} + amount: {"input" if self.order.amountIsInput else "output"} {self.filledIn if self.order.amountIsInput else self.filledOut}/{self.order.amount}{" to owner" if self.order.outputDirectlyToOwner else ""} + minFill: {self.order.minFillAmount} + inverted: {self.order.inverted} + tranches: +''' + for i in range(len(self.trancheStatus)): + tranche = self.order.tranches[i] + ts = self.trancheStatus[i] + msg += f' {tranche}\n' + for fill in ts.fills: + msg += f' {fill}\n' + return msg + NO_OCO = 18446744073709551615 # max uint64 @@ -344,7 +364,7 @@ class Tranche: ) def __str__(self): - msg = f'{self.fraction/MAX_FRACTION:.1%} {"start+" if self.startTimeIsRelative else ""}{self.startTime} to {"start+" if self.startTimeIsRelative else ""}{self.endTime}' + msg = f'{self.fraction/MAX_FRACTION:.1%} {"start+" if self.startTimeIsRelative else ""}{from_timestamp(self.startTime)} to {"start+" if self.startTimeIsRelative else ""}{from_timestamp(self.endTime)}' if self.marketOrder: # for marketOrders, minLine.intercept is the slippage msg += f' market order slippage {self.minLine.intercept:.2%}' diff --git a/src/dexorder/bin/examine.py b/src/dexorder/bin/examine.py new file mode 100644 index 0000000..751e86c --- /dev/null +++ b/src/dexorder/bin/examine.py @@ -0,0 +1,77 @@ +import argparse +import logging + +from dexorder import db, blockchain +from dexorder.base.order import OrderKey +from dexorder.blocks import current_block, get_block +from dexorder.blockstate import current_blockstate +from dexorder.blockstate.blockdata import BlockData +from dexorder.blockstate.db_state import DbState +from dexorder.contract.dexorder import VaultContract +from dexorder.order.orderstate import Order +from dexorder.tokens import adjust_decimals +from dexorder.vault_blockdata import vault_balances, pretty_balances +from dexorder.bin.executable import execute + +log = logging.getLogger(__name__) + + +def command_vault_argparse(subparsers): + parser = subparsers.add_parser('vault', help='show the vault\'s balances and orders') + parser.add_argument('address', help='address of the vault') + parser.add_argument('--all', help='show all orders including closed ones', action='store_true') + # parser.add_argument('--json', help='output in JSON format', action='store_true') + +async def command_vault(args): + balances = vault_balances.get(args.address, {}) + print(f'Vault {args.address} v{await VaultContract(args.address).version()}') + print(f'Balances:') + print(pretty_balances({k: (await adjust_decimals(k, v)) for k, v in balances.items()})) + print(f'Orders:') + i = 0 + while True: + key = OrderKey(args.address, i) + try: + order = Order.of(key) + except KeyError: + break + if args.all or order.is_open: + print(await order.pprint()) + i += 1 + + # for key in Order.open_orders: + # order = Order.of(key) + # if args.json: + # print(json.dumps(order.status.dump())) + # else: + # print() + # print(order) + +async def main(args: list): + parser = argparse.ArgumentParser() + parser.add_argument('--chain-id', default=None) + subparsers = parser.add_subparsers(dest='command') + for name in globals(): + if name.startswith('command_') and name.endswith('_argparse'): + globals()[name](subparsers) + parsed = parser.parse_args(args) + print(parsed) + try: + subcommand = globals()[f'command_{parsed.command}'] + except KeyError: + parser.print_help() + exit(1) + await blockchain.connect() + db.connect() + db_state = DbState(BlockData.by_opt('db')) + with db.transaction(): + state = await db_state.load() + state.readonly = True + current_blockstate.set(state) + block = await get_block(state.root_hash) + current_block.set(block) + await subcommand(parsed) + + +if __name__ == '__main__': + execute(main, parse_args=True) diff --git a/src/dexorder/bin/executable.py b/src/dexorder/bin/executable.py index a8ecbc7..a85fdfa 100644 --- a/src/dexorder/bin/executable.py +++ b/src/dexorder/bin/executable.py @@ -33,12 +33,12 @@ def split_args(): omegaconf_args = [] regular_args = [] for arg in sys.argv[1:]: - if '=' in arg: + if '=' in arg and not arg.startswith('--'): key, value = arg.split('=', 1) if hasattr(Config, key): omegaconf_args.append(arg) - else: - regular_args.append(arg) + continue + regular_args.append(arg) return omegaconf_args, regular_args @@ -67,12 +67,10 @@ def execute(main:Callable[...,Coroutine[Any,Any,Any]], shutdown=None, *, parse_l log.info('Logging configured to default') xconf = None if parse_args: - if callable(parse_args) or isinstance(parse_args, type): - omegaconf_args, regular_args = split_args() - else: - omegaconf_args = None # NOTE: there is special command-line argument handling in config/load.py to get a config filename. # The -c/--config flag MUST BE FIRST if present. + # The rest of the arguments are split by format into key=value for omegaconf and anything else is "regular args" + omegaconf_args, regular_args = split_args() configuration.parse_args(omegaconf_args) # must check for `type` before `callable`, because types are also callables if isinstance(parse_args, type): @@ -81,6 +79,9 @@ def execute(main:Callable[...,Coroutine[Any,Any,Any]], shutdown=None, *, parse_l elif callable(parse_args): # noinspection PyUnboundLocalVariable xconf = parse_args(regular_args) + else: + # just pass the regular args to main + xconf = regular_args init_alerts() diff --git a/src/dexorder/blockstate/blockdata.py b/src/dexorder/blockstate/blockdata.py index ccb83cf..db62b1e 100644 --- a/src/dexorder/blockstate/blockdata.py +++ b/src/dexorder/blockstate/blockdata.py @@ -64,7 +64,7 @@ class BlockData (Generic[T]): if self.lazy_getitem: lazy = self.lazy_getitem(self, item) if lazy is not NARG: - state.set(state.root_fork, self.series, item, lazy) + state.set(state.root_fork, self.series, item, lazy, readonly_override=True) result = lazy if result is NARG: raise KeyError diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py index ea972a4..625d89c 100644 --- a/src/dexorder/blockstate/state.py +++ b/src/dexorder/blockstate/state.py @@ -53,7 +53,10 @@ class BlockState: with a diff height of the root branch or older is always part of the finalized blockchain. """ + class ReadOnlyError(Exception): ... + def __init__(self): + self.readonly = False self._root_branch: Optional[Branch] = None self._root_fork: Optional[Fork] = None self.height: int = 0 # highest branch seen @@ -80,6 +83,8 @@ class BlockState: @root_branch.setter def root_branch(self, value: Branch): + if self.readonly: + raise self.ReadOnlyError() self._root_branch = value self._root_fork = Fork([value]) @@ -92,6 +97,8 @@ class BlockState: return self._root_branch.head def init_root_block(self, root_block: Block) -> Fork: + if self.readonly: + raise self.ReadOnlyError() assert self.root_branch is None return self.add_branch(Branch.from_block(root_block)) @@ -113,6 +120,8 @@ class BlockState: should only be set to False when it is assured that the branch may be joined by height alone, because the branch join is known to be at a live-blockchain-finalized height. """ + if self.readonly: + raise self.ReadOnlyError() assert branch.id not in self.branches_by_id if self.root_branch is None: @@ -155,6 +164,8 @@ class BlockState: def remove_branch(self, branch: Branch, *, remove_series_diffs=True): + if self.readonly: + raise self.ReadOnlyError() if branch.height == self.height and len(self.branches_by_height[branch.height]) == 1: # this is the only branch at this height: compute the new lower height other_heights = [b.height for b in self.branches_by_id.values() if b is not branch] @@ -210,7 +221,9 @@ class BlockState: return DELETE - def set(self, fork: Fork, series, key, value, overwrite=True): + def set(self, fork: Fork, series, key, value, overwrite=True, *, readonly_override=False): + if not readonly_override and self.readonly: + raise self.ReadOnlyError() # first look for an existing value branch = fork.branch diffs = self.diffs_by_series.get(series,{}).get(key) @@ -236,6 +249,8 @@ class BlockState: return old_value def unload(self, fork: Optional[Fork], series, key): + if self.readonly: + raise self.ReadOnlyError() self.unloads[fork.branch_id].append((series, key)) def iteritems(self, fork: Optional[Fork], series): @@ -285,6 +300,8 @@ class BlockState: Returns the set of diffs for the promoted fork. """ + if self.readonly: + raise self.ReadOnlyError() found_root = False promotion_branches = [] for branch in reversed(fork.branches): @@ -350,6 +367,7 @@ class FinalizedBlockState: """ def __init__(self): + self.readonly = False self.data = {} self.by_hash = {} @@ -361,6 +379,8 @@ class FinalizedBlockState: def set(self, _fork: Optional[Fork], series, key, value, overwrite=True): assert overwrite + if self.readonly: + raise BlockState.ReadOnlyError() self.data.setdefault(series, {})[key] = value def iteritems(self, _fork: Optional[Fork], series): @@ -373,6 +393,8 @@ class FinalizedBlockState: return self.data.get(series,{}).values() def delete_series(self, _fork: Optional[Fork], series: str): + if self.readonly: + raise BlockState.ReadOnlyError() del self.data[series] diff --git a/src/dexorder/configuration/load.py b/src/dexorder/configuration/load.py index c0814b7..34b21cd 100644 --- a/src/dexorder/configuration/load.py +++ b/src/dexorder/configuration/load.py @@ -8,7 +8,7 @@ from omegaconf.errors import OmegaConfBaseException from .schema import Config -schema = OmegaConf.structured(Config()) +schema = OmegaConf.structured(Config(), flags={'struct': False}) _config_file = 'dexorder.toml' diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index 992220b..d5c2551 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -16,6 +16,7 @@ class Config: ws_url: Optional[str] = 'ws://localhost:8545' rpc_urls: Optional[dict[str,str]] = field(default_factory=dict) db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder' + db_readonly: bool = False dump_sql: bool = False redis_url: Optional[str] = 'redis://localhost:6379' diff --git a/src/dexorder/contract/decimals.py b/src/dexorder/contract/decimals.py deleted file mode 100644 index 9bc7802..0000000 --- a/src/dexorder/contract/decimals.py +++ /dev/null @@ -1,24 +0,0 @@ -import logging - -from dexorder import db -from dexorder.contract import ERC20, CONTRACT_ERRORS - -log = logging.getLogger(__name__) - - -async def token_decimals(addr): - key = f'td|{addr}' - try: - return db.kv[key] - except KeyError: - # noinspection PyBroadException - try: - decimals = await ERC20(addr).decimals() - except CONTRACT_ERRORS: - log.debug(f'token {addr} has no decimals()') - decimals = 0 - except Exception: - log.debug(f'could not get token decimals for {addr}') - return None - db.kv[key] = decimals - return decimals diff --git a/src/dexorder/database/__init__.py b/src/dexorder/database/__init__.py index bfc42f0..dd319ba 100644 --- a/src/dexorder/database/__init__.py +++ b/src/dexorder/database/__init__.py @@ -3,7 +3,7 @@ import logging from contextvars import ContextVar import sqlalchemy -from sqlalchemy import Engine +from sqlalchemy import Engine, event from sqlalchemy.orm import Session, SessionTransaction from .migrate import migrate_database @@ -99,7 +99,7 @@ class Db: _session.set(None) # noinspection PyShadowingNames - def connect(self, url=None, migrate=True, reconnect=False, dump_sql=None): + def connect(self, url=None, migrate=True, reconnect=False, dump_sql=None, readonly:bool=None): if _engine.get() is not None and not reconnect: return None if url is None: @@ -114,6 +114,19 @@ class Db: if dump_sql is None: dump_sql = config.dump_sql engine = sqlalchemy.create_engine(url, echo=dump_sql, json_serializer=json.dumps, json_deserializer=json.loads) + + if readonly is None: + readonly = config.db_readonly + if readonly: + @event.listens_for(engine, "connect") + def set_readonly(dbapi_connection, _connection_record): + cursor = dbapi_connection.cursor() + try: + cursor.execute("SET default_transaction_read_only = on;") + log.info('database connection set to READ ONLY') + finally: + cursor.close() + if migrate: migrate_database(url) with engine.connect() as connection: diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index eba8dee..c833950 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -58,7 +58,7 @@ async def handle_order_placed(event: EventData): log.debug(f'raw order status {obj}') order = Order.create(addr, index, event['transactionHash'], obj) await activate_order(order) - log.debug(f'new order {order.key}{order}') + log.debug(f'new order {order.key} {await order.pprint()}') async def handle_swap_filled(event: EventData): diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index 2e1da82..803f022 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -3,13 +3,14 @@ import logging from dataclasses import dataclass from typing import overload -from dexorder import DELETE, db, order_log +from dexorder import DELETE, db, order_log, from_timestamp from dexorder.base.chain import current_chain from dexorder.base.order import OrderKey, TrancheKey from dexorder.base.orderlib import SwapOrderState, ElaboratedSwapOrderStatus, Fill from dexorder.blockstate import BlockDict, BlockSet from dexorder.database.model.orderindex import OrderIndex from dexorder.routing import pool_address +from dexorder.tokens import adjust_decimals from dexorder.util import json from dexorder.vault_blockdata import vault_owners @@ -287,6 +288,29 @@ class Order: Order.vault_recently_closed_orders.listremove(key.vault, key.order_index) + def __str__(self): + return str(self.key) + + + async def pprint(self): + amount_token = self.order.tokenIn if self.order.amountIsInput else self.order.tokenOut + msg = f''' + SwapOrder {self.key} + status: {self.state.name} + placed: {from_timestamp(self.status.startTime)} + in: {self.order.tokenIn} + out: {self.order.tokenOut} + exchange: {self.order.route.exchange.name, self.order.route.fee} + amount: {"input" if self.order.amountIsInput else "output"} {await adjust_decimals(amount_token, self.filled):f}/{await adjust_decimals(amount_token, self.amount):f}{" to owner" if self.order.outputDirectlyToOwner else ""} + minFill: {await adjust_decimals(amount_token, self.min_fill_amount):f} + inverted: {self.order.inverted} + tranches: + ''' + for i in range(len(self.order.tranches)): + tranche = self.order.tranches[i] + msg += f' {tranche} filled {await adjust_decimals(amount_token, self.tranche_filled(i))}\n' + return msg + # ORDER STATE # various blockstate fields hold different aspects of an order's state. @@ -318,8 +342,6 @@ class Order: 'of', db=True, redis=True, pub=pub_order_fills, str2key=OrderKey.str2key, value2str=lambda v: json.dumps(v.dump()), str2value=lambda s:OrderFilled.load(json.loads(s))) - def __str__(self): - return str(self.order) # "active" means the order wants to be executed now. this is not BlockData because it's cleared every block active_orders: dict[OrderKey,Order] = {} diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index e7858b0..5543a29 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -226,20 +226,20 @@ async def has_funds(tk: TrancheKey): async def input_amount_is_sufficient(order, token_balance): - # log.debug(f'input is sufficient? {order.min_fill_amount}') + log.debug(f'input is sufficient? {order.min_fill_amount}') if order.amount_is_input: - # log.debug(f'amount is input: {token_balance} >= {order.min_fill_amount}') + log.debug(f'amount is input: {token_balance} >= {order.min_fill_amount}') return token_balance >= order.min_fill_amount # amount is an output amount, so we need to know the price price = pool_prices.get(order.pool_address) - # log.debug(f'amount is output amount. price={price}') + log.debug(f'amount is output amount. price={price}') if price is None: return token_balance > 0 # we don't know the price so we allow any nonzero amount to be sufficient pool = await get_pool(order.pool_address) price *= dec(10) ** -pool['decimals'] inverted = order.order.tokenIn != pool['base'] minimum = dec(order.min_fill_amount)*price if inverted else dec(order.min_fill_amount)/price - # log.debug(f'order minimum amount is {order.min_fill_amount} '+ ("input" if order.amount_is_input else f"output @ {price} = {minimum} ")+f'< {token_balance} balance') + log.debug(f'order minimum amount is {order.min_fill_amount} '+ ("input" if order.amount_is_input else f"output @ {price} = {minimum} ")+f'< {token_balance} balance') return token_balance >= minimum @@ -261,7 +261,7 @@ class BalanceTrigger (Trigger): async def update(self, balance): self.value = await input_amount_is_sufficient(self.order, balance) - # log.debug(f'update balance {balance} was sufficient? {self.value}') + log.debug(f'update balance {balance} was sufficient? {self.value} {self.order.key}') def remove(self): try: diff --git a/src/dexorder/vault_blockdata.py b/src/dexorder/vault_blockdata.py index c82cdec..1ea3952 100644 --- a/src/dexorder/vault_blockdata.py +++ b/src/dexorder/vault_blockdata.py @@ -2,12 +2,12 @@ import asyncio import functools import logging -from dexorder import current_pub +from dexorder import current_pub, dec from dexorder.base.chain import current_chain from dexorder.blockstate import BlockDict from dexorder.contract import ERC20, CONTRACT_ERRORS from dexorder.contract.dexorder import VaultContract, vault_address -from dexorder.util import json +from dexorder.util import json, align_decimal log = logging.getLogger(__name__) @@ -102,3 +102,6 @@ async def refresh_vault_balances(vault, *tokens): result[t] = a return result vault_balances.modify(vault, functools.partial(_adjust, vault, tokens, amounts)) + +def pretty_balances(b: dict[str,dec], padding=8) -> str: + return '\n'.join(f'{k:>} {align_decimal(v,padding)}' for k,v in b.items())