Compare commits

...

33 Commits

Author SHA1 Message Date
tim
4936150c3b bugfixes; startall works 2025-12-09 2025-12-09 15:11:58 -04:00
tim
88057607d5 put app back on app.dexorder.com and corp site on dexorder.com with www redirecting to apex 2025-05-19 15:19:20 -04:00
tim
36d0a863c6 remove spammy debug logs 2025-05-07 16:02:37 -04:00
tim
89ce46793e dotcom 2025-05-06 13:56:05 -04:00
tim
2bcf5d043c redis pipeline overflow fix 2025-04-23 15:20:00 -04:00
tim
71942d5b8f memcache init doesn't use transaction 2025-04-23 14:13:58 -04:00
tim
ef44973646 sharedata 2025-04-23 12:51:14 -04:00
tim
ce55609297 examine open orders 2025-04-07 01:32:19 -04:00
tim
a27300b5e4 info log for websocket connection drops 2025-04-03 18:15:16 -04:00
tim
f3faaa3dd6 tranchestatus tostring touchup 2025-04-01 14:20:58 -04:00
tim
0bb670b356 redis initial state push fix 2025-04-01 13:52:49 -04:00
tim
52b406ba17 ohlc retained length fix 2025-04-01 13:52:39 -04:00
tim
3d0342d19d price line metrics fix 2025-04-01 13:52:29 -04:00
tim
dbf960bae9 initial TrancheState fix 2025-04-01 13:52:21 -04:00
tim
d49f142fe3 redis pipeline autoflush after 10000 entries 2025-04-01 10:54:25 -04:00
tim
34fa439b3c USD marks 2025-03-29 15:27:13 -04:00
tim
41a1e2d9fe MIN_SLIPPAGE epsilon leeway 2025-03-28 20:05:52 -04:00
tim
66229e67bb bugfix for 0 slippage market orders 2025-03-26 23:48:43 -04:00
tim
31b6ddd314 initial redis state load doesn't use pipeline now, because it overflowed. 2025-03-26 23:25:10 -04:00
tim
07c6423fd5 USDC/USDC.e naming update 2025-03-26 17:17:54 -04:00
tim
4740687167 account release bugfix 2025-03-19 21:05:19 -04:00
tim
a06eeeb10d bugfix 2025-03-19 17:31:34 -04:00
tim
4492d23c47 better "addrmeta is None" fix 2025-03-16 21:17:19 -04:00
tim
1c0c2f0e63 "address_meta None" fix 2025-03-15 06:26:01 -04:00
tim
f3bdfdf97b trigger fixes 2025-03-10 21:09:40 -04:00
tim
be8c8bf019 order pprint touchup 2025-03-10 14:31:55 -04:00
tim
ecf1d21d5f bin/examine.py; readonly state; debug logs for Underfunded 2025-03-10 14:18:40 -04:00
tim
b7ed91d1c0 start of kraken accounting (unfinished) 2025-03-07 19:00:42 -04:00
tim
646449e456 underfunded state 2025-03-03 21:43:17 -04:00
tim
1bcf73de22 execute refactor for extraconf; accounting fixes 2025-02-28 01:04:12 -04:00
tim
af0f35eba5 execute refactor for extraconf; accounting fixes 2025-02-28 01:02:36 -04:00
tim
e868ea5a4b composable cli config 2025-02-27 17:51:07 -04:00
tim
c132f40164 transfer accounting fix 2025-02-27 14:23:07 -04:00
45 changed files with 832 additions and 233 deletions

View File

@@ -28,7 +28,7 @@ def upgrade() -> None:
sa.Column('time', sa.DateTime(), nullable=False),
sa.Column('account', sa.String(), nullable=False),
sa.Column('category', sa.Enum('Transfer', 'Income', 'Expense', 'Trade', 'Special', name='accountingcategory'), nullable=False),
sa.Column('subcategory', sa.Enum('OrderFee', 'GasFee', 'FillFee', 'VaultCreation', 'Execution', 'FeeAdjustment', 'InitialBalance', name='accountingsubcategory'), nullable=True),
sa.Column('subcategory', sa.Enum('OrderFee', 'GasFee', 'FillFee', 'Admin', 'TransactionGas', 'VaultCreation', 'Execution', 'FeeAdjustment', 'InitialBalance', name='accountingsubcategory'), nullable=True),
sa.Column('token', sa.String(), nullable=False),
sa.Column('amount', dexorder.database.column_types.DecimalNumeric(), nullable=False),
sa.Column('value', dexorder.database.column_types.DecimalNumeric(), nullable=True),

View File

@@ -0,0 +1,30 @@
"""sharedata
Revision ID: e47d1bca4b3d
Revises: 509010f13e8b
Create Date: 2025-04-23 11:23:10.809341
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = 'e47d1bca4b3d'
down_revision: Union[str, None] = '509010f13e8b'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the ``sharedata`` table: an auto-increment integer id plus a JSONB payload."""
    # JSONB comes from sqlalchemy.dialects.postgresql, so this migration is
    # Postgres-specific.
    op.create_table('sharedata',
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
        sa.PrimaryKeyConstraint('id')
    )


def downgrade() -> None:
    """Reverse upgrade() by dropping the ``sharedata`` table."""
    op.drop_table('sharedata')

15
bin/examine Executable file
View File

@@ -0,0 +1,15 @@
#!/bin/bash
# bin/examine — run the dexorder examiner against the in-cluster Postgres.
#
# Opens a kubectl port-forward from local port 5431 to postgres-0:5432 in the
# background, runs the examiner pointed at the forwarded port, then tears the
# forward down.  Any extra command-line arguments are passed through to
# dexorder.bin.examine via "$@".
kubectl port-forward postgres-0 5431:5432 &
PF_PID=$!

# Stop the background port-forward and wait for it to exit.
shutdown () {
kill $PF_PID
wait
}

# Clean up the port-forward if the script is interrupted.
trap shutdown INT TERM

# NOTE(review): nothing waits for the port-forward to become ready before the
# client connects; a brief readiness check or connect retry may be needed.
PYTHONPATH=src python -m dexorder.bin.examine rpc_url=arbitrum_dxod db_url=postgres://dexorder@localhost:5431/dexorder "$@"
shutdown

File diff suppressed because one or more lines are too long

View File

@@ -1,21 +1,24 @@
aiohappyeyeballs==2.4.3
aiohttp==3.11.12
aiohttp==3.11.13
aiosignal==1.3.1
alembic==1.14.1
alembic==1.15.1
annotated-types==0.7.0
antlr4-python3-runtime==4.9.3
asn1crypto==1.5.1
async-lru==2.0.4
attrs==23.2.0
bip-utils==2.9.3
bitarray==3.0.0
cachetools==5.5.1
bitarray==3.1.1
cachetools==5.5.2
cattrs==24.1.2
cbor2==5.6.4
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.4.1
ckzg==1.0.2
click==8.1.8
coincurve==20.0.0
coremltools==8.2
crcmod==1.7
cytoolz==0.12.3
defaultlist==1.0.0
@@ -31,39 +34,73 @@ eth-rlp==1.0.1
eth-typing==4.4.0
eth-utils==4.1.1
eth_abi==5.2.0
filelock==3.17.0
frozenlist==1.4.1
fsspec==2025.2.0
google-auth==2.35.0
greenlet==3.0.3
hexbytes==0.3.1
hiredis==3.0.0
idna==3.7
imageio==2.37.0
importlib_resources==6.5.2
Jinja2==3.1.6
joblib==1.4.2
jsonschema==4.21.1
jsonschema-specifications==2023.12.1
kraken==5.3.0
kubernetes==31.0.0
lazy_loader==0.4
lightning==2.4.0
lightning-utilities==0.14.0
lru-dict==1.2.0
lxml==5.3.1
Mako==1.3.3
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
mpmath==1.3.0
msgpack-python==0.5.6
multidict==6.0.5
numpy==2.2.2
networkx==3.4.2
numpy==2.0.2
nvidia-cublas-cu12==12.1.3.1
nvidia-cuda-cupti-cu12==12.1.105
nvidia-cuda-nvrtc-cu12==12.1.105
nvidia-cuda-runtime-cu12==12.1.105
nvidia-cudnn-cu12==9.1.0.70
nvidia-cufft-cu12==11.0.2.54
nvidia-curand-cu12==10.3.2.106
nvidia-cusolver-cu12==11.4.5.107
nvidia-cusparse-cu12==12.1.0.106
nvidia-nccl-cu12==2.20.5
nvidia-nvjitlink-cu12==12.8.93
nvidia-nvtx-cu12==12.1.105
oauthlib==3.2.2
omegaconf==2.3.0
orjson==3.10.15
packaging==24.2
pagerduty==1.0.0
parsimonious==0.10.0
pillow==11.1.0
prometheus_client==0.21.1
propcache==0.2.0
protobuf==5.26.1
psycopg2-binary==2.9.10
py-sr25519-bindings==0.2.0
pyaml==25.1.0
pyarrow==19.0.1
pyasn1==0.6.1
pyasn1_modules==0.4.1
pycparser==2.22
pycryptodome==3.20.0
pydantic==2.9.2
pydantic_core==2.23.4
Pygments==2.19.1
PyNaCl==1.5.0
python-bidi==0.6.6
python-dateutil==2.9.0.post0
pytorch-lightning==2.5.0.post0
pytz==2025.1
pyunormalize==15.1.0
PyYAML==6.0.1
@@ -72,18 +109,32 @@ referencing==0.35.0
regex==2024.4.28
requests==2.32.3
requests-oauthlib==2.0.0
rich==13.9.4
rlp==4.0.1
rpds-py==0.18.0
rsa==4.9
scikit-image==0.24.0
scikit-learn==1.5.2
scipy==1.13.1
setuptools==75.8.2
shapely==2.0.7
six==1.16.0
socket.io-emitter==0.1.5.1
sortedcontainers==2.4.0
SQLAlchemy==2.0.38
sympy==1.13.3
threadpoolctl==3.5.0
tifffile==2025.2.18
toolz==0.12.1
torch==2.4.1
torchmetrics==1.6.2
torchvision==0.19.1
tqdm==4.67.1
triton==3.0.0
types-requests==2.32.0.20240914
typing_extensions==4.12.2
urllib3==2.2.1
web3==6.20.3
web3==6.20.4
websocket-client==1.8.0
websockets==14.2
websockets==13.1
yarl==1.17.2

View File

@@ -30,3 +30,4 @@ aiohttp
charset-normalizer
pytz
prometheus_client
krakenex

View File

@@ -35,14 +35,16 @@ class _Token:
def __repr__(self): return self.__token_name
def __str__(self): return self.__token_name
class _FalseToken (_Token):
class _FalseyToken (_Token):
def __bool__(self): return False
NARG = _FalseToken('NARG')
DELETE = _FalseToken('DELETE') # used as a value token to indicate removal of the key
NARG = _FalseyToken('NARG')
DELETE = _FalseyToken('DELETE') # used as a value token to indicate removal of the key
ADDRESS_0 = '0x0000000000000000000000000000000000000000'
NATIVE_TOKEN = '0x0000000000000000000000000000000000000001' # We use 0x01 to indicate the use of native ETH wherever a token address is normally required
USD_FIAT = '0x0000000000000000000000000000000000000055' # We use 0x55 (ASCII 'U') to indicate the use of fiat USD
CHAIN_ID_OFFCHAIN = -1
WEI = 1
GWEI = 1_000_000_000
ETH = 1_000_000_000_000_000_000
@@ -57,7 +59,7 @@ _cwd() # do this first so that config has the right current working directory
# ordering here is important!
from .base.chain import Blockchain # the singletons are loaded into the dexorder.blockchain.* namespace
from .util import async_yield
from .util import async_yield, json
from .base.fixed import Fixed2, FixedDecimals, Dec18
from .configuration import config
from .base.account import Account

View File

@@ -0,0 +1 @@
from .accounting import *

View File

@@ -34,19 +34,19 @@ class ReconciliationException(Exception):
pass
def accounting_lock():
"""
This must be called before accounting_*() calls are made.
"""
db.session.execute(text("LOCK TABLE account, accounting, reconciliation IN EXCLUSIVE MODE"))
async def initialize_accounting():
def initialize_accounting():
global accounting_initialized
if not accounting_initialized:
load_accounts_cache()
accounting_initialized = True
async def initialize_accounting_runner():
global accounting_initialized
if not accounting_initialized:
accounting_lock()
await _initialize_mark_to_market() # set up mark-to-market first, so accounts can value their initial balances
await _initialize_accounts()
load_accounts_cache()
accounting_initialized = True
log.info(f'accounting initialized\n\tstablecoins: {config.stablecoins}\n\tquotecoins: {config.quotecoins}\n\tnativecoin: {config.nativecoin}')
@@ -64,17 +64,23 @@ async def _initialize_accounts():
async def _initialize_accounts_2():
fm = await FeeManager.get()
of_account = _ensure_account(fm.order_fee_account_addr, AccountKind.OrderFee)
gf_account = _ensure_account(fm.gas_fee_account_addr, AccountKind.GasFee)
ff_account = _ensure_account(fm.fill_fee_account_addr, AccountKind.FillFee)
exe_accounts = [_ensure_account(account.address, AccountKind.Execution) for account in Account.all()]
of_account = ensure_account(fm.order_fee_account_addr, AccountKind.OrderFee)
gf_account = ensure_account(fm.gas_fee_account_addr, AccountKind.GasFee)
ff_account = ensure_account(fm.fill_fee_account_addr, AccountKind.FillFee)
exe_accounts = [ensure_account(account.address, AccountKind.Execution) for account in Account.all()]
if current_chain.get().id in [1337, 31337]:
log.debug('adjusting debug account balances')
await asyncio.gather(
*map(adjust_balance, (of_account, gf_account, ff_account, *exe_accounts))
)
for db_account in db.session.execute(select(DbAccount)).scalars():
def load_accounts_cache(*, chain=None):
if chain is None:
chain = current_chain.get()
for db_account in db.session.execute(select(DbAccount).where(DbAccount.chain==chain)).scalars():
_tracked_addrs.add(db_account.address)
log.info(f'tracking account {db_account.chain.id} {db_account.address}')
async def _initialize_mark_to_market():
@@ -124,13 +130,14 @@ async def _initialize_mark_to_market():
add_mark_pool(addr, pool['base'], pool['quote'], pool['fee'])
def _ensure_account(addr: str, kind: AccountKind) -> DbAccount:
chain = current_chain.get()
def ensure_account(addr: str, kind: AccountKind, *, chain=None) -> DbAccount:
if chain is None:
chain = current_chain.get()
found = db.session.get(DbAccount, (chain, addr))
if found:
if found.kind != kind:
log.warning(f'Account {addr} has wrong kind {found.kind} != {kind}')
found.kind = kind
# found.kind = kind
db.session.add(found)
_tracked_addrs.add(found.address)
else:
@@ -160,20 +167,21 @@ async def accounting_transfer(receipt: TransactionReceiptDict, token: str,
block_hash = hexstr(receipt['blockHash'])
tx_id = hexstr(receipt['transactionHash'])
await asyncio.gather(
add_accounting_row( sender, block_hash, tx_id, AccountingCategory.Transfer, None,
token, -amount, receiver, adjust_decimals=adjust_decimals),
add_accounting_row( receiver, block_hash, tx_id, AccountingCategory.Transfer, None,
token, amount, sender, adjust_decimals=adjust_decimals),
accounting_transaction_gas(receipt),
add_accounting_entry_m2m(sender, block_hash, tx_id, AccountingCategory.Transfer, None,
token, -amount, receiver, adjust_decimals=adjust_decimals),
add_accounting_entry_m2m(receiver, block_hash, tx_id, AccountingCategory.Transfer, None,
token, amount, sender, adjust_decimals=adjust_decimals),
)
async def accounting_transaction_gas(receipt: TransactionReceiptDict, subcategory: AccountingSubcategory):
async def accounting_transaction_gas(receipt: TransactionReceiptDict, subcategory: AccountingSubcategory = AccountingSubcategory.TransactionGas):
""" Accounts for the gas spent on the given transaction """
amount = dec(receipt['gasUsed']) * dec(receipt['effectiveGasPrice'])
await add_accounting_row( receipt['from'],
hexstr(receipt['blockHash']), hexstr(receipt['transactionHash']),
AccountingCategory.Expense, subcategory, NATIVE_TOKEN, -amount
)
await add_accounting_entry_m2m(receipt['from'],
hexstr(receipt['blockHash']), hexstr(receipt['transactionHash']),
AccountingCategory.Expense, subcategory, NATIVE_TOKEN, -amount
)
async def accounting_placement(order_placed: EventData):
@@ -186,10 +194,10 @@ async def accounting_placement(order_placed: EventData):
log.warning(f'Rogue DexorderPlacedEvent in tx {hexstr(tx_id)}')
return
fm = await FeeManager.get()
await add_accounting_row( fm.order_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.OrderFee, NATIVE_TOKEN, order_fee)
await add_accounting_row( fm.gas_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.GasFee, NATIVE_TOKEN, gas_fee)
await add_accounting_entry_m2m(fm.order_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.OrderFee, NATIVE_TOKEN, order_fee)
await add_accounting_entry_m2m(fm.gas_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.GasFee, NATIVE_TOKEN, gas_fee)
async def accounting_fill(fill: EventData, out_token: str) -> dec:
@@ -200,14 +208,14 @@ async def accounting_fill(fill: EventData, out_token: str) -> dec:
tx_id = hexstr(fill['transactionHash'])
fee = int(fill['args']['fillFee'])
fm = await FeeManager.get()
return await add_accounting_row(fm.fill_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.FillFee, out_token, fee)
return await add_accounting_entry_m2m(fm.fill_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.FillFee, out_token, fee)
async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Optional[str], category, subcategory, token, amount, note=None,
*, adjust_decimals=True) -> dec:
async def add_accounting_entry_m2m(account: str, block_hash: Optional[str], tx_id: Optional[str], category, subcategory, token, amount, note=None,
*, adjust_decimals=True) -> dec:
"""
Returns the mark-to-market USD value of the transaction.
Returns the mark-to-market USD value of the entry.
"""
if amount == 0:
return dec(0)
@@ -221,6 +229,13 @@ async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Opt
value = mark_to_market(token, amount)
log.debug(f'accounting row {time} {account} {category} {subcategory} {token} {amount} ${value}')
chain_id = current_chain.get().id
add_accounting_entry(chain_id, account, time, category, subcategory, token, amount, value, tx_id, note)
return value
def add_accounting_entry(chain_id, account, time, category, subcategory, token, amount, value=None, tx_id=None, note=None):
if not is_tracked_address(account):
return
db.session.add(Accounting(account=account,
time=time, category=category, subcategory=subcategory,
token=token, amount=amount, value=value, note=note,
@@ -229,15 +244,17 @@ async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Opt
account_db = db.session.get(DbAccount, (current_chain.get(), account))
new_amount = account_db.balances.get(token, dec(0)) + amount
if new_amount < 0:
log.error(f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}')
log.error(
f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}')
account_db.balances[token] = new_amount
db.session.add(account_db) # deep changes would not be detected by the ORM
return value
db.session.flush()
async def adjust_balance(account: DbAccount, token=NATIVE_TOKEN, subcategory=AccountingSubcategory.InitialBalance, note=None):
true_balance = await get_balance(account.address, token)
amount = true_balance - account.balances.get(token, dec(0))
await add_accounting_row(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note, adjust_decimals=False)
await add_accounting_entry_m2m(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note, adjust_decimals=False)
async def accounting_reconcile(account: DbAccount, block_id: Optional[str] = None, last_accounting_row_id: Optional[int] = None):

View File

@@ -0,0 +1,65 @@
import logging
import tempfile
from dataclasses import dataclass
from typing import Optional
import krakenex
from dexorder import timestamp
from dexorder.bin.executable import execute
log = logging.getLogger(__name__)
kraken_api_key=r'HqPHnGsAHunFtaP8YZTFsyh+LauVrcgFHi/US+RseR/4DiT+NG/JpONV'
kraken_api_secret=r'4hvdMdaN5TlNlyk2PShdRCsOE/T4sFzeBrR7ZjC+LUGuAXhBehY8vvWDZSUSyna2OFeOJ9GntPvyXOhrpx70Bg=='
kraken = krakenex.API()
# start and end should be timestamps or datetimes. inclusiveness is [start,end) as usual
def kraken_get_ledger(start=None, end=None):
entries = []
offset=1 # 1-based ffs
if start:
start = timestamp(start) - 1 # kraken start is EXCLUSIVE for some reason
if end:
end = timestamp(end) - 1 # kraken end is INCLUSIVE. :/
while True:
kl = kraken.query_private('Ledgers', {'start':start, 'end':end, 'ofs':offset})
print(repr(kl))
break
if kl.empty:
break
for t in kl.itertuples():
print(t)
# noinspection PyShadowingBuiltins
offset += len(kl)
return entries
@dataclass
class KrakenConfig:
kraken_api_key: Optional[str] = None
kraken_api_secret: Optional[str] = None
kraken_start: Optional[str]= None # timestamp or date
kraken_end: Optional[str] = None # timestamp or date
async def main(kconfig: KrakenConfig):
load_kraken_key(kconfig)
kraken_get_ledger()
def load_kraken_key(kconfig):
temp = tempfile.NamedTemporaryFile()
if not kconfig.kraken_api_key or not kconfig.kraken_api_secret:
log.error("Must set kraken_api_key= and kraken_api_secret= on the command line")
exit(1)
temp.write(kconfig.kraken_api_key.encode())
temp.write(b'\n')
temp.write(kconfig.kraken_api_secret.encode())
temp.write(b'\n')
kraken.load_key(temp.name)
if __name__ == '__main__':
execute(main, parse_args=KrakenConfig)

View File

@@ -19,7 +19,9 @@ class AddressMetadata (TypedDict):
def save_addrmeta(address: str, meta: AddressMetadata):
if meta['type'] == 'Token':
if meta is None:
pass
elif meta['type'] == 'Token':
meta: OldTokenDict
updated = Token.load(meta)
token = db.session.get(Token, (current_chain.get().id, address))

View File

@@ -42,10 +42,12 @@ class Account (LocalAccount):
# log.debug(f'available accounts: {Account._pool.qsize()}')
try:
async with asyncio.timeout(1):
result = await Account._pool.get()
result: "Account" = await Account._pool.get()
except asyncio.TimeoutError:
log.error('waiting for an available account')
result = await Account._pool.get()
# mark as out of pool
result._in_pool = False
metric.account_available.set(Account._pool.qsize())
return result
@@ -59,17 +61,20 @@ class Account (LocalAccount):
if Account._main_account is None:
Account._main_account = account
Account._pool.put_nowait(account)
account._in_pool = True # this account is now in the pool
Account._all.append(account)
metric.account_available.set(Account._pool.qsize())
metric.account_total.set(len(Account._all))
log.info(f'Account pool {[a.address for a in Account._all]}')
def __init__(self, local_account: LocalAccount): # todo chain_id?
super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code
def __init__(self, local_account: LocalAccount): # todo chain_id?
super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code
self.chain_id = current_chain.get().id
self.signing_middleware = construct_sign_and_send_raw_middleware(self)
self._nonce: Optional[int] = None
self.tx_id: Optional[str] = None # current transaction id
# release() idempotency tracking
self._in_pool: bool = False
async def next_nonce(self):
if self._nonce is None:
@@ -86,8 +91,21 @@ class Account (LocalAccount):
return current_w3.get().eth.get_balance(self.address)
def release(self):
metric.account_available.set(Account._pool.qsize() + 1)
"""
Return this Account to the pool.
Idempotent: calling release() multiple times without a new acquire()
will only enqueue the account once.
"""
# If we're already in the pool, do nothing.
if self._in_pool:
# Optional debug log; comment out if too noisy.
# log.debug(f'Account {self.address} already in pool; ignoring extra release()')
return
Account._pool.put_nowait(self)
self._in_pool = True
metric.account_available.set(Account._pool.qsize())
def __str__(self):
return self.address

View File

@@ -4,7 +4,7 @@ from dataclasses import dataclass
from enum import Enum
from typing import Optional
from dexorder import timestamp
from dexorder import timestamp, from_timestamp
from dexorder.util import hexbytes
from dexorder.util.convert import decode_IEEE754
@@ -250,6 +250,26 @@ class ElaboratedSwapOrderStatus:
def copy(self):
return copy.deepcopy(self)
def __str__(self):
msg = f'''
SwapOrder
status: {self.state.name}
in: {self.order.tokenIn}
out: {self.order.tokenOut}
exchange: {self.order.route.exchange.name, self.order.route.fee}
amount: {"input" if self.order.amountIsInput else "output"} {self.filledIn if self.order.amountIsInput else self.filledOut}/{self.order.amount}{" to owner" if self.order.outputDirectlyToOwner else ""}
minFill: {self.order.minFillAmount}
inverted: {self.order.inverted}
tranches:
'''
for i in range(len(self.trancheStatus)):
tranche = self.order.tranches[i]
ts = self.trancheStatus[i]
msg += f' {tranche}\n'
for fill in ts.fills:
msg += f' {fill}\n'
return msg
NO_OCO = 18446744073709551615 # max uint64
@@ -263,6 +283,9 @@ DISTANT_FUTURE = 4294967295 # max uint32
MAX_FRACTION = 65535 # max uint16
MIN_SLIPPAGE = 0.0001 # one bip
MIN_SLIPPAGE_EPSILON = 0.000000000003
@dataclass
class Tranche:
@@ -344,7 +367,7 @@ class Tranche:
)
def __str__(self):
msg = f'{self.fraction/MAX_FRACTION:.1%} {"start+" if self.startTimeIsRelative else ""}{self.startTime} to {"start+" if self.startTimeIsRelative else ""}{self.endTime}'
msg = f'{self.fraction/MAX_FRACTION:.1%} {"start+" if self.startTimeIsRelative else ""}{from_timestamp(self.startTime)} to {"start+" if self.startTimeIsRelative else ""}{from_timestamp(self.endTime)}'
if self.marketOrder:
# for marketOrders, minLine.intercept is the slippage
msg += f' market order slippage {self.minLine.intercept:.2%}'
@@ -352,11 +375,11 @@ class Tranche:
if self.minLine.intercept or self.minLine.slope:
msg += f' >{self.minLine.intercept:.5g}'
if self.minLine.slope:
msg += f'{self.minLine.slope:+.5g}/s({self.minLine.value():5g})'
msg += f'{self.minLine.slope:+.5g}/s={self.minLine.value():5g}'
if self.maxLine.intercept or self.maxLine.slope:
msg += f' <{self.maxLine.intercept:.5g}'
if self.maxLine.slope:
msg += f'{self.maxLine.slope:+.5g}/s({self.maxLine.value():5g})'
msg += f'{self.maxLine.slope:+.5g}/s={self.maxLine.value():5g}'
if self.rateLimitPeriod:
msg += f' {self.rateLimitFraction/MAX_FRACTION:.1%} every {self.rateLimitPeriod/60:.0} minutes'
return msg

View File

@@ -74,4 +74,4 @@ async def main():
if __name__ == '__main__':
execute(main())
execute(main)

View File

@@ -37,4 +37,4 @@ if __name__ == '__main__':
time = parse_date(sys.argv[1], ignoretz=True).replace(tzinfo=timezone.utc)
seconds_per_block = float(sys.argv[2])
sys.argv = [sys.argv[0], *sys.argv[3:]]
execute(main())
execute(main)

View File

@@ -0,0 +1,95 @@
import argparse
import logging
from dexorder import db, blockchain
from dexorder.base.order import OrderKey
from dexorder.blocks import current_block, get_block
from dexorder.blockstate import current_blockstate
from dexorder.blockstate.blockdata import BlockData
from dexorder.blockstate.db_state import DbState
from dexorder.blockstate.fork import current_fork
from dexorder.contract.dexorder import VaultContract
from dexorder.order.orderstate import Order
from dexorder.tokens import adjust_decimals
from dexorder.util import json
from dexorder.vault_blockdata import vault_balances, pretty_balances
from dexorder.bin.executable import execute
log = logging.getLogger(__name__)
async def dump_orders(orders, args):
if args.json:
print(json.dumps([order.status.dump() for order in orders]))
else:
first = True
for order in orders:
if first:
first = False
else:
print()
print(await order.pprint())
def command_vault_argparse(subparsers):
parser = subparsers.add_parser('vault', help='show the vault\'s balances and orders')
parser.add_argument('address', help='address of the vault')
parser.add_argument('--all', help='show all orders including closed ones', action='store_true')
parser.add_argument('--json', help='output in JSON format', action='store_true')
async def command_vault(args):
balances = vault_balances.get(args.address, {})
print(f'Vault {args.address} v{await VaultContract(args.address).version()}')
print(f'Balances:')
print(pretty_balances({k: (await adjust_decimals(k, v)) for k, v in balances.items()}))
print(f'Orders:')
i = 0
orders = []
while True:
key = OrderKey(args.address, i)
try:
order = Order.of(key)
except KeyError:
break
if args.all or order.is_open:
orders.append(order)
i += 1
await dump_orders(orders, args)
def command_open_argparse(subparsers):
parser = subparsers.add_parser('open', help='show all open orders')
parser.add_argument('--json', help='output in JSON format', action='store_true')
async def command_open(args):
await dump_orders([Order.of(key) for key in Order.open_orders], args)
async def main(args: list):
parser = argparse.ArgumentParser()
parser.add_argument('--chain-id', default=None)
subparsers = parser.add_subparsers(dest='command')
for name in globals():
if name.startswith('command_') and name.endswith('_argparse'):
globals()[name](subparsers)
parsed = parser.parse_args(args)
print(parsed)
try:
subcommand = globals()[f'command_{parsed.command}']
except KeyError:
parser.print_help()
exit(1)
await blockchain.connect()
db.connect()
db_state = DbState(BlockData.by_opt('db'))
with db.transaction():
state = await db_state.load()
# state.readonly = True
current_blockstate.set(state)
block = await get_block(state.root_hash)
current_block.set(block)
current_fork.set(state.root_fork)
await subcommand(parsed)
if __name__ == '__main__':
execute(main, parse_args=True)

View File

@@ -7,10 +7,13 @@ import tomllib
from asyncio import CancelledError
from signal import Signals
from traceback import print_exception
from typing import Coroutine
from typing import Coroutine, Callable, Union, Any
from omegaconf import OmegaConf
from dexorder import configuration, config
from dexorder.alert import init_alerts
from dexorder.configuration.schema import Config
from dexorder.metric.metric_startup import start_metrics_server
if __name__ == '__main__':
@@ -25,7 +28,27 @@ async def _shutdown_coro(_sig, _loop):
if task is not this_task:
task.cancel()
def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=True):
def split_args():
omegaconf_args = []
regular_args = []
for arg in sys.argv[1:]:
if '=' in arg and not arg.startswith('--'):
key, value = arg.split('=', 1)
if hasattr(Config, key):
omegaconf_args.append(arg)
continue
regular_args.append(arg)
return omegaconf_args, regular_args
def execute(main:Callable[...,Coroutine[Any,Any,Any]], shutdown=None, *, parse_logging=True,
parse_args: Union[Callable[[list[str]],Any], type, bool]=True):
"""
if parse_args is a function, then the command-line arguments are given to OmegaConf first, and any args parsed by
OmegaConf are stripped from the args list. The remaining args are then passed to parse_args(args)
if parse_args is a type, then the type is used to parse the extra command-line arguments using OmegaConf.
"""
# config
configured = False
if parse_logging:
@@ -42,10 +65,23 @@ def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=Tru
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
log.setLevel(logging.DEBUG)
log.info('Logging configured to default')
xconf = None
if parse_args:
# NOTE: there is special command-line argument handling in config/load.py to get a config filename.
# The -c/--config flag MUST BE FIRST if present.
configuration.parse_args()
# The rest of the arguments are split by format into key=value for omegaconf and anything else is "regular args"
omegaconf_args, regular_args = split_args()
configuration.parse_args(omegaconf_args)
# must check for `type` before `callable`, because types are also callables
if isinstance(parse_args, type):
# noinspection PyUnboundLocalVariable
xconf = OmegaConf.merge(OmegaConf.structured(parse_args), OmegaConf.from_cli(regular_args))
elif callable(parse_args):
# noinspection PyUnboundLocalVariable
xconf = parse_args(regular_args)
else:
# just pass the regular args to main
xconf = regular_args
init_alerts()
@@ -59,7 +95,14 @@ def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=Tru
loop.add_signal_handler(s, lambda sig=s: asyncio.create_task(_shutdown_coro(sig, loop), name=f'{s.name} handler'))
# main
task = loop.create_task(main, name='main')
num_args = len(inspect.signature(main).parameters)
if num_args == 0:
coro = main()
elif num_args == 1:
coro = main(xconf)
else:
raise Exception(f'main() must accept 0 or 1 arguments, not {num_args}')
task = loop.create_task(coro, name='main')
try:
loop.run_until_complete(task)
except CancelledError:

View File

@@ -62,4 +62,4 @@ async def main():
if __name__ == '__main__':
execute(main())
execute(main)

View File

@@ -2,7 +2,7 @@ import logging
from asyncio import CancelledError
from dexorder import db, blockchain
from dexorder.accounting import initialize_accounting
from dexorder.accounting import initialize_accounting_runner
from dexorder.alert import infoAlert
from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute
@@ -15,6 +15,7 @@ from dexorder.contract.dexorder import get_dexorder_contract
from dexorder.event_handler import (init, dump_log, handle_vault_created, handle_order_placed,
handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all,
handle_uniswap_swaps, handle_vault_impl_changed, update_metrics)
from dexorder.marks import publish_marks
from dexorder.memcache import memcache
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches
@@ -83,6 +84,7 @@ def setup_logevent_triggers(runner):
# runner.add_callback(adjust_gas)
runner.add_callback(cleanup_jobs)
runner.add_callback(publish_marks)
runner.add_callback(update_metrics)
@@ -115,9 +117,9 @@ async def main():
if redis_state:
# load initial state
log.info('initializing redis with root state')
await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id])
await redis_state.init(state, state.root_fork)
await initialize_accounting()
await initialize_accounting_runner()
runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None)
setup_logevent_triggers(runner)
@@ -138,4 +140,4 @@ async def main():
if __name__ == '__main__':
execute(main())
execute(main)

View File

@@ -95,7 +95,7 @@ async def write_metadata( pools, mirror_pools ):
pool_dicts = [get_pool(addr) for (addr,_inverted) in mirror_pools]
pool_dicts = await asyncio.gather(*pool_dicts)
for data, addr, (_,inverted) in zip(pool_dicts, pools, mirror_pools):
data['x'] = dict(data=dict(uri=f'https://app.dexorder.trade/ohlc/', chain=42161, symbol=addr, inverted=inverted))
data['x'] = dict(data=dict(uri=f'https://app.dexorder.com/ohlc/', chain=42161, symbol=addr, inverted=inverted))
tokens = set(p['base'] for p in pool_dicts)
tokens.update(p['quote'] for p in pool_dicts)
tokens = await asyncio.gather(*[get_token(t) for t in tokens])
@@ -190,6 +190,7 @@ async def main():
while True:
wake_up = now() + delay
# log.debug(f'querying {pool}')
tx = None
try:
price = await get_pool_price(pool)
if price != last_prices.get(pool):
@@ -200,7 +201,10 @@ async def main():
addr, inverted = mirror_pools[pool]
log.debug(f'Mirrored {addr} {price}')
except Exception as x:
log.debug(f'Could not update {pool}: {x}')
log.debug(f'Could not update {pool}: {x} {tx}')
if tx is not None:
tx.account.reset_nonce()
tx.account.release()
continue
try:
pool = next(pool_iter)
@@ -216,4 +220,4 @@ async def main():
if __name__ == '__main__':
execute(main())
execute(main)

View File

@@ -1,9 +1,9 @@
import logging
from sqlalchemy import select
from sqlalchemy import select, text
from dexorder import db, blockchain
from dexorder.accounting import accounting_reconcile, accounting_lock
from dexorder.accounting import accounting_reconcile
from dexorder.bin.executable import execute
from dexorder.blocks import fetch_latest_block, current_block
from dexorder.database.model import DbAccount
@@ -15,7 +15,7 @@ async def main():
db.connect()
block = await fetch_latest_block()
current_block.set(block)
accounting_lock()
db.session.execute(text("LOCK TABLE account, accounting, reconciliation IN EXCLUSIVE MODE"))
try:
accounts = db.session.execute(select(DbAccount)).scalars().all()
for account in accounts:
@@ -28,5 +28,4 @@ async def main():
if __name__ == '__main__':
execute(main())
execute(main)

View File

@@ -1,14 +1,23 @@
import logging
from dataclasses import dataclass
from dexorder import blockchain, db
from dexorder import blockchain, db, dec
from dexorder.bin.executable import execute
log = logging.getLogger(__name__)
async def main():
await blockchain.connect()
db.connect()
@dataclass
class RefillConfig:
    """Command-line configuration for the refill script.

    Parsed from CLI arguments by ``execute(main, parse_args=RefillConfig)``.
    """
    # Target balance (ETH) to top accounts up to; `dec` is the project's Decimal alias.
    refill_level: dec
    # Addresses of the accounts to refill.
    refill_accounts: list[str]
async def main(refill_config: RefillConfig):
    """Entry point for the account-refill script.

    Currently only logs the configured refill level and target accounts;
    the actual refill logic is not present in this version.
    """
    # NOTE(review): blockchain/db connection setup is disabled below — confirm
    # whether it is still required once the refill logic is implemented.
    # await blockchain.connect()
    # db.connect()
    log.info(f'Refilling to {refill_config.refill_level:.18f} ETH')
    log.info(f'Refilling accounts: {refill_config.refill_accounts}')
if __name__ == '__main__':
execute(main())
execute(main, parse_args=RefillConfig)

View File

@@ -22,6 +22,11 @@ from dexorder.util import hexbytes
log = logging.getLogger(__name__)
def blocktime():
    """Timestamp of the most recent block seen in real-time.

    NOT the timestamp of the block currently being worked on: this reads the
    latest observed head for the current chain from the ``latest_block`` map.
    """
    return latest_block[current_chain.get().id].timestamp
async def get_block_timestamp(block_id: Union[bytes,int]) -> int:
block = await get_block(block_id)
if block is None:

View File

@@ -64,7 +64,7 @@ class BlockData (Generic[T]):
if self.lazy_getitem:
lazy = self.lazy_getitem(self, item)
if lazy is not NARG:
state.set(state.root_fork, self.series, item, lazy)
state.set(state.root_fork, self.series, item, lazy, readonly_override=True)
result = lazy
if result is NARG:
raise KeyError

View File

@@ -53,7 +53,10 @@ class BlockState:
with a diff height of the root branch or older is always part of the finalized blockchain.
"""
class ReadOnlyError(Exception): ...
def __init__(self):
self.readonly = False
self._root_branch: Optional[Branch] = None
self._root_fork: Optional[Fork] = None
self.height: int = 0 # highest branch seen
@@ -80,6 +83,8 @@ class BlockState:
@root_branch.setter
def root_branch(self, value: Branch):
if self.readonly:
raise self.ReadOnlyError()
self._root_branch = value
self._root_fork = Fork([value])
@@ -92,6 +97,8 @@ class BlockState:
return self._root_branch.head
def init_root_block(self, root_block: Block) -> Fork:
if self.readonly:
raise self.ReadOnlyError()
assert self.root_branch is None
return self.add_branch(Branch.from_block(root_block))
@@ -113,6 +120,8 @@ class BlockState:
should only be set to False when it is assured that the branch may be joined by height alone, because
the branch join is known to be at a live-blockchain-finalized height.
"""
if self.readonly:
raise self.ReadOnlyError()
assert branch.id not in self.branches_by_id
if self.root_branch is None:
@@ -155,6 +164,8 @@ class BlockState:
def remove_branch(self, branch: Branch, *, remove_series_diffs=True):
if self.readonly:
raise self.ReadOnlyError()
if branch.height == self.height and len(self.branches_by_height[branch.height]) == 1:
# this is the only branch at this height: compute the new lower height
other_heights = [b.height for b in self.branches_by_id.values() if b is not branch]
@@ -210,7 +221,9 @@ class BlockState:
return DELETE
def set(self, fork: Fork, series, key, value, overwrite=True):
def set(self, fork: Fork, series, key, value, overwrite=True, *, readonly_override=False):
if not readonly_override and self.readonly:
raise self.ReadOnlyError()
# first look for an existing value
branch = fork.branch
diffs = self.diffs_by_series.get(series,{}).get(key)
@@ -236,6 +249,8 @@ class BlockState:
return old_value
def unload(self, fork: Optional[Fork], series, key):
if self.readonly:
raise self.ReadOnlyError()
self.unloads[fork.branch_id].append((series, key))
def iteritems(self, fork: Optional[Fork], series):
@@ -285,6 +300,8 @@ class BlockState:
Returns the set of diffs for the promoted fork.
"""
if self.readonly:
raise self.ReadOnlyError()
found_root = False
promotion_branches = []
for branch in reversed(fork.branches):
@@ -350,6 +367,7 @@ class FinalizedBlockState:
"""
def __init__(self):
self.readonly = False
self.data = {}
self.by_hash = {}
@@ -361,6 +379,8 @@ class FinalizedBlockState:
def set(self, _fork: Optional[Fork], series, key, value, overwrite=True):
assert overwrite
if self.readonly:
raise BlockState.ReadOnlyError()
self.data.setdefault(series, {})[key] = value
def iteritems(self, _fork: Optional[Fork], series):
@@ -373,6 +393,8 @@ class FinalizedBlockState:
return self.data.get(series,{}).values()
def delete_series(self, _fork: Optional[Fork], series: str):
if self.readonly:
raise BlockState.ReadOnlyError()
del self.data[series]

View File

@@ -8,7 +8,7 @@ from omegaconf.errors import OmegaConfBaseException
from .schema import Config
schema = OmegaConf.structured(Config())
schema = OmegaConf.structured(Config(), flags={'struct': False})
_config_file = 'dexorder.toml'

View File

@@ -16,6 +16,7 @@ class Config:
ws_url: Optional[str] = 'ws://localhost:8545'
rpc_urls: Optional[dict[str,str]] = field(default_factory=dict)
db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder'
db_readonly: bool = False
dump_sql: bool = False
redis_url: Optional[str] = 'redis://localhost:6379'
@@ -41,12 +42,17 @@ class Config:
fee_leeway = 0.1 # do not adjust fees if they are within this proportion
min_gas: str = '0'
mark_publish_seconds: float = 60 # publish mark prices every this number of seconds
# Order slashing
slash_kill_count: int = 5
slash_delay_base: float = 60 # one minute
slash_delay_mul: float = 2 # double the delay each time
slash_delay_max: int = 15 * 60
# Tranches are paused for this long after they trigger a slippage control
slippage_control_delay: float = 10 # matches the 10-second TWAP used by our uniswap router
walker_name: str = 'default'
walker_flush_interval: float = 300
walker_stop: Optional[int] = None # block number of the last block the walker should process
@@ -60,6 +66,3 @@ class Config:
stablecoins: list[str] = field(default_factory=list) # primary stablecoins which are marked to $1
quotecoins: list[str] = field(default_factory=list) # quote tokens like WETH that have stablecoin markets
nativecoin: Optional[str] = None # used for accounting of native values. e.g. address of WETH
# account: target_balance
refill: dict[str,str] = field(default_factory=dict)

View File

@@ -33,7 +33,8 @@ class ContractTransaction:
async def wait(self) -> TxReceipt:
if self.receipt is None:
self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id)
self.account.release()
if self.account is not None:
self.account.release()
return self.receipt
async def sign(self, account: Account):
@@ -81,14 +82,17 @@ def transact_wrapper(addr, name, func):
account = await Account.acquire()
if account is None:
raise ValueError(f'No account to sign transaction {addr}.{name}()')
await ct.sign(account)
try:
tx_id = await current_w3.get().eth.send_raw_transaction(ct.data)
assert tx_id == ct.id_bytes
return ct
except Web3Exception as e:
e.args += addr, name
raise e
await ct.sign(account)
try:
tx_id = await current_w3.get().eth.send_raw_transaction(ct.data)
assert tx_id == ct.id_bytes
return ct
except Web3Exception as e:
e.args += addr, name
raise e
finally:
account.release()
return f
@@ -150,10 +154,14 @@ class ContractProxy:
def __getattr__(self, item):
if item == 'constructor':
found = self.contract.constructor
elif item in self.contract.functions:
found = self.contract.functions[item]
else:
raise AttributeError(item)
funcs = self.contract.functions
# In web3.py v6+, contract functions are exposed as attributes, not via __getitem__.
# Using getattr ensures we obtain the callable factory for the function; indexing may return None.
# Additionally, guard against unexpected None to fail fast with a clear error.
found = getattr(funcs, item, None)
if not callable(found):
raise AttributeError(f"Function '{item}' not found on contract {self._interface_name} at {self.address}")
return self._wrapper(self.address, item, found)
def __repr__(self):

View File

@@ -1,24 +0,0 @@
import logging
from dexorder import db
from dexorder.contract import ERC20, CONTRACT_ERRORS
log = logging.getLogger(__name__)
async def token_decimals(addr):
    """Return the ERC-20 ``decimals()`` value for the token at *addr*.

    Successful lookups (including the 0 fallback) are cached in the db
    key-value store under ``'td|<addr>'``. Returns None when the lookup fails
    for an unexpected reason; that failure is NOT cached, so it will be
    retried on the next call.
    """
    key = f'td|{addr}'
    try:
        # fast path: previously cached value
        return db.kv[key]
    except KeyError:
        # noinspection PyBroadException
        try:
            decimals = await ERC20(addr).decimals()
        except CONTRACT_ERRORS:
            # contract exists but exposes no decimals(); treat as 0 and cache it
            log.warning(f'token {addr} has no decimals()')
            decimals = 0
        except Exception:
            # transient/unknown failure: do not cache, caller gets None
            log.debug(f'could not get token decimals for {addr}')
            return None
        db.kv[key] = decimals
        return decimals

View File

@@ -3,7 +3,7 @@ import logging
from contextvars import ContextVar
import sqlalchemy
from sqlalchemy import Engine
from sqlalchemy import Engine, event
from sqlalchemy.orm import Session, SessionTransaction
from .migrate import migrate_database
@@ -99,7 +99,7 @@ class Db:
_session.set(None)
# noinspection PyShadowingNames
def connect(self, url=None, migrate=True, reconnect=False, dump_sql=None):
def connect(self, url=None, migrate=True, reconnect=False, dump_sql=None, readonly:bool=None):
if _engine.get() is not None and not reconnect:
return None
if url is None:
@@ -114,6 +114,19 @@ class Db:
if dump_sql is None:
dump_sql = config.dump_sql
engine = sqlalchemy.create_engine(url, echo=dump_sql, json_serializer=json.dumps, json_deserializer=json.loads)
if readonly is None:
readonly = config.db_readonly
if readonly:
@event.listens_for(engine, "connect")
def set_readonly(dbapi_connection, _connection_record):
cursor = dbapi_connection.cursor()
try:
cursor.execute("SET default_transaction_read_only = on;")
log.info('database connection set to READ ONLY')
finally:
cursor.close()
if migrate:
migrate_database(url)
with engine.connect() as connection:

View File

@@ -10,3 +10,4 @@ from .ofac import OFAC, OFACAlerts
from .accounting import Accounting, DbAccount
from .vaultcreationrequest import VaultCreationRequest
from .tos import TOSAcceptance
from .sharedata import ShareData

View File

@@ -1,7 +1,7 @@
import logging
from datetime import datetime
from decimal import Decimal as dec
from enum import Enum
from enum import Enum, auto
from sqlalchemy import ForeignKeyConstraint
from sqlalchemy.ext.mutable import MutableDict
@@ -17,35 +17,37 @@ log = logging.getLogger(__name__)
class AccountingCategory (Enum):
Transfer = 0
Income = 1
Expense = 2
Trade = 3
Special = 4
Transfer = auto()
Income = auto()
Expense = auto()
Trade = auto()
Special = auto()
class AccountingSubcategory (Enum):
# Income
OrderFee = 0
GasFee = 1
FillFee = 2
OrderFee = auto()
GasFee = auto()
FillFee = auto()
# Expense
VaultCreation = 3
Execution = 4
FeeAdjustment = 5 # includes adjusting fee limits
Admin = auto() # contract deployments and upgrades, changing adjuster address, etc.
TransactionGas = auto()
VaultCreation = auto()
Execution = auto()
FeeAdjustment = auto() # includes adjusting fee limits
# Transfer
# Transfers have no subcategories, but the note field will be the address of the other account. Both a debit and a
# credit entry will be created, one for each account participating in the transfer.
# Special Codes
InitialBalance = 5
InitialBalance = auto()
class Accounting (Base):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
time: Mapped[datetime] = mapped_column(default=now(), index=True)
chain_id: Mapped[int] = mapped_column(index=True)
chain_id: Mapped[int] = mapped_column(index=True) # chain_id
account: Mapped[str] = mapped_column(index=True)
category: Mapped[AccountingCategory] = mapped_column(index=True)
subcategory: Mapped[Optional[AccountingSubcategory]] = mapped_column(index=True)

View File

@@ -0,0 +1,12 @@
import logging
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from dexorder.database.model import Base
log = logging.getLogger(__name__)
class ShareData (Base):
    """A single opaque JSON document row.

    Only structure is visible here: an autoincrement primary key and a JSONB
    payload. The payload's schema is defined by callers.
    """
    # surrogate primary key
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    # arbitrary JSON payload, stored as PostgreSQL JSONB
    data: Mapped[dict] = mapped_column(JSONB)

View File

@@ -4,8 +4,7 @@ import logging
from web3.types import EventData
from dexorder import db, metric, current_w3, timestamp
from dexorder.accounting import accounting_fill, accounting_placement, accounting_transfer, is_tracked_address, \
accounting_lock
from dexorder.accounting import accounting_fill, accounting_placement
from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import SwapOrderState
@@ -32,7 +31,6 @@ def dump_log(eventlog):
def init():
new_pool_prices.clear()
start_trigger_updates()
accounting_lock()
async def handle_order_placed(event: EventData):
@@ -60,7 +58,7 @@ async def handle_order_placed(event: EventData):
log.debug(f'raw order status {obj}')
order = Order.create(addr, index, event['transactionHash'], obj)
await activate_order(order)
log.debug(f'new order {order.key}{order}')
log.debug(f'new order {order.key} {await order.pprint()}')
async def handle_swap_filled(event: EventData):
@@ -139,10 +137,11 @@ async def handle_transfer(transfer: EventData):
vault = None
if vault is not None:
await adjust_balance(vault, token_address, amount)
await update_balance_triggers(vault, token_address, amount)
if is_tracked_address(to_address):
# noinspection PyTypeChecker
await accounting_transfer(transfer, token_address, from_address, to_address, amount, adjust_decimals=True)
await update_balance_triggers(vault, token_address)
# This would double-count fill fees. Instead, we book the transfer when sending money to the account as part of a refill.
# if is_tracked_address(to_address):
# # noinspection PyTypeChecker
# await accounting_transfer(transfer, token_address, from_address, to_address, amount, adjust_decimals=True)
async def handle_uniswap_swaps(swaps: list[EventData]):
# asynchronously prefetch the block timestamps we'll need
@@ -221,7 +220,7 @@ async def update_metrics():
metric.vaults.set(vault_owners.upper_len())
metric.open_orders.set(Order.open_orders.upper_len())
metric.triggers_time.set(len(TimeTrigger.all))
metric.triggers_line.set(len(PriceLineTrigger.triggers_set))
metric.triggers_line.set(sum(len(s) for s in PriceLineTrigger.by_pool.values()))
# slow updates
global slow_metric_update

44
src/dexorder/marks.py Normal file
View File

@@ -0,0 +1,44 @@
"""
"marks" are mark-to-market USD values of a selected set of tokens called quote tokens. Publishing a set of USD marks
for the quote tokens allows almost any token to be marked to USD via one hop.
"""
import logging
import time
from dexorder import dec, NATIVE_TOKEN, config
from dexorder.base.chain import current_chain
from dexorder.blockstate import BlockDict
from dexorder.pools import quotes, mark_to_market
log = logging.getLogger(__name__)
def pub_marks(_s,k,v):
    """Format a USD-mark update for publication.

    Used as the ``pub`` callback of the ``marks`` BlockDict. Returns a triple
    of (chain-id-as-string, 'marks.usd', (chain_id, token_address, value-as-string)).
    """
    # NOTE(review): the BlockDict series is named 'mark.usd' (singular) while
    # this publish topic is 'marks.usd' — confirm the mismatch is intentional.
    chain_id = current_chain.get().id
    return str(chain_id), 'marks.usd', (chain_id, k, str(v))
marks: BlockDict[str, dec] = BlockDict('mark.usd', db=False, redis=True, pub=pub_marks, value2str=str)
class RateLimiter:
    """Simple monotonic-clock rate limiter.

    ``ready()`` grants at most one True per ``rate`` seconds. ``last_update``
    starts at 0.0, so the very first call is only granted once ``rate``
    seconds of monotonic time have elapsed since the clock's epoch (which is
    effectively immediately for small rates).
    """

    def __init__(self, rate: float):
        # minimum number of seconds between granted calls
        self.rate = rate
        # monotonic timestamp of the most recent granted call
        self.last_update = 0.0

    def ready(self):
        """Return True and consume the slot when the interval has elapsed."""
        current = time.monotonic()
        elapsed = current - self.last_update
        if elapsed >= self.rate:
            self.last_update = current
            return True
        return False
mark_publish_rate = RateLimiter(config.mark_publish_seconds)
def publish_marks():
    """Publish USD mark prices for the native token and all quote tokens.

    Rate-limited by ``mark_publish_rate`` (config.mark_publish_seconds);
    tokens whose mark cannot be computed (None) are skipped.
    """
    if mark_publish_rate.ready():
        for token_addr in [NATIVE_TOKEN]+quotes:
            # overwrite=False checks the previous value and does not generate a diff if the values match. This prevents
            # excessive updates to Redis
            value = mark_to_market(token_addr)
            if value is not None:
                marks.setitem(token_addr, value, overwrite=False)

View File

@@ -1,3 +1,4 @@
import itertools
import logging
from contextlib import asynccontextmanager
from contextvars import ContextVar
@@ -10,16 +11,70 @@ from dexorder import config
log = logging.getLogger(__name__)
BATCH_SIZE = 1_000
class PipelineProxy:
    """Wraps a Redis Pipeline and auto-flushes after every BATCH_SIZE commands.

    Bulk sadd/srem/hset/hdel calls are split into chunks so that no flush
    boundary is overrun; all other attributes fall through to the wrapped
    pipeline unchanged.
    """

    def __init__(self, pipe: Pipeline):
        self.pipe = pipe
        # commands queued on the pipeline since the last execute()
        self.ops = 0

    async def push(self, num=1):
        """Record *num* newly queued commands; execute once BATCH_SIZE is reached."""
        self.ops += num
        if self.ops >= BATCH_SIZE:
            self.ops = 0
            await self.pipe.execute()

    def _batches(self, items):
        # Lazily slice *items* to fit the space left in the current batch.
        # Laziness matters: self.ops changes between yields (push() may flush),
        # so the available room is recomputed for every chunk.
        pending = list(items)
        while pending:
            room = min(BATCH_SIZE - self.ops, len(pending))
            assert room > 0
            yield pending[:room]
            pending = pending[room:]

    async def sadd(self, series, *keys):
        for chunk in self._batches(keys):
            await self.pipe.sadd(series, *chunk)
            await self.push(len(chunk))

    async def srem(self, series, *keys):
        for chunk in self._batches(keys):
            await self.pipe.srem(series, *chunk)
            await self.push(len(chunk))

    async def hset(self, series, *, mapping):
        for chunk in self._batches(mapping.items()):
            await self.pipe.hset(series, mapping=dict(chunk))
            await self.push(len(chunk))

    async def hdel(self, series, *keys):
        for chunk in self._batches(keys):
            await self.pipe.hdel(series, *chunk)
            await self.push(len(chunk))

    def __getattr__(self, item):
        # delegate everything else to the underlying pipeline
        return getattr(self.pipe, item)
class Memcache:
@staticmethod
@asynccontextmanager
async def batch():
async def batch(transaction=True):
old_redis: Redis = current_redis.get()
pipe: Pipeline = old_redis.pipeline()
pipe = old_redis.pipeline(transaction=transaction)
# noinspection PyTypeChecker
current_redis.set(pipe)
try:
yield pipe
yield PipelineProxy(pipe)
await pipe.execute()
finally:
current_redis.set(old_redis)

View File

@@ -12,7 +12,7 @@ from dexorder.blockstate.blockdata import SeriesCollection, BlockData
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.fork import Fork
from dexorder.blockstate.state import compress_diffs
from dexorder.memcache import current_redis, memcache
from dexorder.memcache import current_redis, memcache, PipelineProxy
from dexorder.util import hexstr
from dexorder.util.async_util import maywait
from dexorder.util.json import json_encoder
@@ -40,11 +40,11 @@ class RedisState (SeriesCollection):
for series in self.datas.keys():
for k, v in state.iteritems(fork, series):
diffs.append(DiffItem(series, k, v))
await self.save(fork, diffs)
await self.save(fork, diffs, use_transaction=False, skip_pubs=True) # use_transaction=False if the data is too big
# noinspection PyAsyncCall
async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]], *, use_transaction=True, skip_pubs=False):
# the diffs must be already compressed such that there is only one action per key
chain = current_chain.get()
chain_id = chain.id
@@ -91,22 +91,23 @@ class RedisState (SeriesCollection):
hsets[series][key] = value
else:
raise NotImplementedError
async with memcache.batch() as r:
r: Pipeline
async with memcache.batch(use_transaction) as r:
r: PipelineProxy
for series, keys in sadds.items():
r.sadd(series, *keys)
await r.sadd(series, *keys)
for series, keys in sdels.items():
r.srem(series, *keys)
await r.srem(series, *keys)
for series, kvs in hsets.items():
r.hset(series, mapping=kvs)
await r.hset(series, mapping=kvs)
for series, keys in hdels.items():
r.hdel(series, *keys)
await r.hdel(series, *keys)
block_series = f'{chain_id}|head'
headstr = hexstr(fork.head)
r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
pubs.append((str(chain_id), 'head', [fork.height, headstr]))
# separate batch for pubs
if pubs:
if pubs and not skip_pubs:
await publish_all(pubs)

View File

@@ -359,7 +359,7 @@ class OHLCRepository:
if price is None, then bars are advanced based on the time but no new price is added to the series.
"""
if OHLC_LIMIT_POOLS_DEBUG is not None and (symbol,period) not in OHLC_LIMIT_POOLS_DEBUG:
return
return None
# logname = f'{symbol} {period_name(period)}'
# log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
if price is not None:
@@ -371,33 +371,31 @@ class OHLCRepository:
# log.debug(f'got recent {historical}')
if not historical:
if create is False or price is None:
return # do not track symbols which have not been explicity set up
historical = []
return None # do not track symbols which have not been explicitly set up
updated = [NativeOHLC(ohlc_start_time(time, period), price, price, price, price)]
# log.debug(f'\tcreated new bars {updated}')
else:
updated = update_ohlc(historical[-1], period, time, price)
# drop any historical bars that are older than we need
# oldest_needed = cover the root block time plus one period prior
root_branch = current_blockstate.get().root_branch
root_hash = root_branch.head
if root_hash is not None:
root_timestamp = await get_block_timestamp(root_hash)
oldest_needed = from_timestamp(root_timestamp) - period
# noinspection PyTypeChecker
trim = (oldest_needed - historical[0].start) // period
if trim > 0:
historical = historical[trim:]
# now overlap the updated data on top of the historical data
if not historical or not updated:
updated = historical + updated
else:
# overlap the updated OHLC's on top of the historical ones
last_bar = historical[-1].start
first_updated = updated[0].start
overlap = (first_updated - last_bar) // period + 1
updated = historical[:-overlap] + updated if overlap > 0 else historical + updated
# log.debug(f'\tnew recents: {updated}')
# drop any bars that are older than we need
# oldest_needed = cover the root block time plus one period prior
root_branch = current_blockstate.get().root_branch
root_hash = root_branch.head
if root_hash is not None:
root_timestamp = await get_block_timestamp(root_hash)
oldest_needed = from_timestamp(root_timestamp) - period
# noinspection PyTypeChecker
trim = (oldest_needed - updated[0].start) // period
if trim > 0:
updated = updated[trim:]
# if len(updated) > 3:
# log.debug(f'\tnew recents ({len(updated)}): {updated}')
recent_ohlcs.setitem(key, updated)
return updated

View File

@@ -6,7 +6,7 @@ from uuid import UUID
from web3.exceptions import ContractPanicError, ContractLogicError
from web3.types import EventData
from dexorder import db, metric
from dexorder import db, metric, config
from dexorder.accounting import accounting_transaction_gas
from dexorder.base import TransactionReceiptDict, TransactionRequest, transaction_request_deserializers
from dexorder.base.order import TrancheKey, OrderKey
@@ -121,6 +121,11 @@ async def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
if trig is not None:
trig.touch()
def delay(secs=None):
trig = get_trigger()
if trig is not None:
trig.deactivate(secs if secs is not None else config.slippage_control_delay)
if error is None:
metric.executions.inc()
else:
@@ -162,6 +167,7 @@ async def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
retry()
elif error == 'RL':
log.debug(f'tranche {tk} execution failed due to "RL" rate limit')
delay()
retry()
elif error == 'TE':
log.debug(f'tranche {tk} execution failed due to "TE" too early')

View File

@@ -3,13 +3,14 @@ import logging
from dataclasses import dataclass
from typing import overload
from dexorder import DELETE, db, order_log
from dexorder import DELETE, db, order_log, from_timestamp
from dexorder.base.chain import current_chain
from dexorder.base.order import OrderKey, TrancheKey
from dexorder.base.orderlib import SwapOrderState, ElaboratedSwapOrderStatus, Fill
from dexorder.blockstate import BlockDict, BlockSet
from dexorder.database.model.orderindex import OrderIndex
from dexorder.routing import pool_address
from dexorder.tokens import adjust_decimals
from dexorder.util import json
from dexorder.vault_blockdata import vault_owners
@@ -127,7 +128,7 @@ class Order:
key = a if b is None else OrderKey(a, b)
assert key not in Order.instances
self.key = key
self.status: ElaboratedSwapOrderStatus = Order.order_statuses[key].copy()
self._status: ElaboratedSwapOrderStatus = Order.order_statuses[key].copy()
self.pool_address: str = pool_address(self.status.order)
self.tranche_keys = [TrancheKey(key.vault, key.order_index, i) for i in range(len(self.status.trancheStatus))]
# flattenings of various static data
@@ -138,6 +139,14 @@ class Order:
self.tranche_amounts = [t.fraction_of(self.amount) for t in self.order.tranches]
Order.instances[self.key] = self
@property
def status(self):
return self._status
@status.setter
def status(self, v):
self._status = Order.order_statuses[self.key] = v
@property
def state(self):
return self.status.state
@@ -279,6 +288,33 @@ class Order:
Order.vault_recently_closed_orders.listremove(key.vault, key.order_index)
def __str__(self):
return str(self.key)
async def pprint(self):
    """Build a human-readable multi-line summary of this order.

    Async because amounts are converted through ``adjust_decimals()``, which
    may need to look up token metadata. Returns the formatted string.
    """
    # amounts are denominated in tokenIn or tokenOut depending on amountIsInput
    amount_token = self.order.tokenIn if self.order.amountIsInput else self.order.tokenOut
    msg = f'''
SwapOrder {self.key}
status: {self.state.name}
placed: {from_timestamp(self.status.startTime)}
in: {self.order.tokenIn}
out: {self.order.tokenOut}
exchange: {self.order.route.exchange.name, self.order.route.fee}
amount: {"input" if self.order.amountIsInput else "output"} {await adjust_decimals(amount_token, self.filled):f}/{await adjust_decimals(amount_token, self.amount):f}{" to owner" if self.order.outputDirectlyToOwner else ""}
minFill: {await adjust_decimals(amount_token, self.min_fill_amount):f}
inverted: {self.order.inverted}
tranches:
'''
    # one line per tranche, with its filled amount when nonzero
    for i in range(len(self.order.tranches)):
        tranche = self.order.tranches[i]
        msg += f' {tranche}'
        filled_amount = self.tranche_filled(i)
        if filled_amount:
            msg += f' filled {await adjust_decimals(amount_token, filled_amount)}'
        msg += '\n'
    return msg
# ORDER STATE
# various blockstate fields hold different aspects of an order's state.
@@ -310,8 +346,6 @@ class Order:
'of', db=True, redis=True, pub=pub_order_fills,
str2key=OrderKey.str2key, value2str=lambda v: json.dumps(v.dump()), str2value=lambda s:OrderFilled.load(json.loads(s)))
def __str__(self):
return str(self.order)
# "active" means the order wants to be executed now. this is not BlockData because it's cleared every block
active_orders: dict[OrderKey,Order] = {}

View File

@@ -2,13 +2,15 @@ import asyncio
import logging
from abc import abstractmethod
from collections import defaultdict
from datetime import timedelta
from enum import Enum, auto
from typing import Optional, Sequence
from typing import Optional, Sequence, Union
import numpy as np
from sortedcontainers import SortedList
from dexorder.base.orderlib import SwapOrderState, PriceProof, DISTANT_FUTURE, DISTANT_PAST, Line
from dexorder.base.orderlib import SwapOrderState, PriceProof, DISTANT_FUTURE, DISTANT_PAST, Line, MIN_SLIPPAGE, \
MIN_SLIPPAGE_EPSILON
from dexorder.blockstate import BlockDict
from .orderstate import Order
from .. import dec, order_log, timestamp, from_timestamp, config
@@ -55,13 +57,13 @@ class OrderTriggers:
self.order = order
self.triggers = triggers
OrderTriggers.instances[order.key] = self
log.debug(f'created OrderTriggers for {order.key}')
# log.debug(f'created OrderTriggers for {order.key}')
def disable(self):
for t in self.triggers:
t.disable()
del OrderTriggers.instances[self.order.key]
log.debug(f'disabled OrderTriggers for {self.order.key}')
# log.debug(f'disabled OrderTriggers for {self.order.key}')
@property
def closed(self):
@@ -71,6 +73,10 @@ class OrderTriggers:
def open(self):
return not self.closed
@property
def error(self):
return any(t.error for t in self.triggers)
def check_complete(self):
if self.closed:
final_state = SwapOrderState.Filled if self.order.remaining == 0 or self.order.remaining < self.order.min_fill_amount else SwapOrderState.Expired
@@ -99,7 +105,8 @@ def start_trigger_updates():
PriceLineTrigger.clear_data()
async def update_balance_triggers(vault: str, token: str, balance: int):
async def update_balance_triggers(vault: str, token: str):
balance = vault_balances.get(vault, {}).get(token)
updates = [bt.update(balance) for bt in BalanceTrigger.by_vault_token.get((vault, token), [])]
await asyncio.gather(*updates)
@@ -210,13 +217,13 @@ class Trigger:
async def has_funds(tk: TrancheKey):
log.debug(f'has funds? {tk.vault}')
# log.debug(f'has funds? {tk.vault}')
order = Order.of(tk)
balances = vault_balances.get(tk.vault, {})
log.debug(f'balances {balances}')
# log.debug(f'balances {balances}')
token_addr = order.status.order.tokenIn
token_balance = balances.get(token_addr)
log.debug(f'amount of {token_addr} = {token_balance}')
# log.debug(f'amount of {token_addr} = {token_balance}')
if token_balance is None:
# unknown balance
token_balance = balances[token_addr] = await ERC20(token_addr).balanceOf(tk.vault)
@@ -256,11 +263,12 @@ class BalanceTrigger (Trigger):
self.order = Order.of(self.tk)
self.vault_token = self.tk.vault, self.order.status.order.tokenIn
BalanceTrigger.by_vault_token[self.vault_token].add(self)
self._value_changed()
# log.debug(f'initializing Balance Trigger {id(self)} {tk} {value} {self.value}')
async def update(self, balance):
self.value = await input_amount_is_sufficient(self.order, balance)
# log.debug(f'update balance {balance} was sufficient? {self.value}')
# log.debug(f'update balance {balance} was sufficient? {self.value} {self.order.key}')
def remove(self):
try:
@@ -268,6 +276,17 @@ class BalanceTrigger (Trigger):
except (KeyError, ValueError):
pass
def _value_changed(self):
ok = self.value
order = Order.of(self.tk)
old_state = order.status.state
if not ok and old_state == SwapOrderState.Open:
order.status = order.status.copy()
order.status.state = SwapOrderState.Underfunded
elif ok and old_state == SwapOrderState.Underfunded:
order.status = order.status.copy()
order.status.state = SwapOrderState.Open
class TimeTrigger (Trigger):
@@ -304,8 +323,8 @@ class TimeTrigger (Trigger):
if time == self._time:
return
self._time = time
self.remove()
self.update_active(time_now)
in_future = time_now >= time
self.value = in_future is self.is_start
def update_active(self, time_now: int = None, time: int = None):
if time_now is None:
@@ -374,7 +393,7 @@ class PriceLineTrigger (Trigger):
if inverted:
price_now = 1/price_now
activated = value_now < price_now if is_min else value_now > price_now
log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
# log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
trigger_type = Trigger.TriggerType.MinLine if is_min else Trigger.TriggerType.MaxLine
super().__init__(trigger_type, tk, activated)
self.inverted = inverted
@@ -489,7 +508,8 @@ async def activate_order(order: Order):
triggers = await OrderTriggers.create(order)
if triggers.closed:
log.debug(f'order {order.key} was immediately closed')
final_state = SwapOrderState.Filled if order.remaining == 0 or order.remaining < order.min_fill_amount \
final_state = SwapOrderState.Error if triggers.error \
else SwapOrderState.Filled if order.remaining == 0 or order.remaining < order.min_fill_amount \
else SwapOrderState.Expired
order.complete(final_state)
@@ -550,13 +570,14 @@ class TrancheTrigger:
tranche_remaining = tranche.fraction_of(order.amount) - order.tranche_filled(self.tk.tranche_index)
self.status = \
TrancheState.Error if self.market_order and self.slippage < MIN_SLIPPAGE - MIN_SLIPPAGE_EPSILON else \
TrancheState.Filled if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount else \
TrancheState.Expired if self.expiration_trigger is not None and not self.expiration_trigger else \
TrancheState.Early if self.activation_trigger is None and not self.activation_trigger else \
TrancheState.Early if self.activation_trigger is not None and not self.activation_trigger else \
TrancheState.Active
_dirty.add(tk)
TrancheTrigger.all[tk] = self
log.debug(f'Tranche {tk} initial status {self.status} {self}')
# log.debug(f'Tranche {tk} initial status {self.status} {self}')
@property
@@ -587,7 +608,8 @@ class TrancheTrigger:
else:
order_log.debug(f'tranche part-filled {self.tk} in:{_amount_in} out:{_amount_out} remaining:{remaining}')
if self.market_order:
self.expire()
order_log.debug(f'tranche {self.tk} delayed {config.slippage_control_delay} seconds due to slippage control')
self.deactivate(config.slippage_control_delay)
self.slash_count = 0 # reset slash count
def touch(self):
@@ -599,11 +621,11 @@ class TrancheTrigger:
self.order_trigger.expire_tranche(self.tk.tranche_index)
def expire(self):
self.disable()
if self.closed:
return
order_log.debug(f'tranche expired {self.tk}')
self.status = TrancheState.Expired
self.disable()
def kill(self):
order_log.warning(f'tranche KILLED {self.tk}')
@@ -619,15 +641,26 @@ class TrancheTrigger:
self.kill()
else:
delay = round(config.slash_delay_base * config.slash_delay_mul ** (self.slash_count-1))
self.deactivate(timestamp()+delay)
self.deactivate(delay)
def deactivate(self, until):
def deactivate(self, interval: Union[timedelta, int, float]):
# todo this timestamp should be consistent with the trigger time which is blockchain
now = current_clock.get().timestamp
self.deactivate_until(now + (interval.total_seconds() if isinstance(interval, timedelta) else interval))
def deactivate_until(self, until):
# Temporarily deactivate the tranche due to a rate limit. Use disable() to permanently halt the trigger.
log.debug(f'deactivating tranche {self.tk} until {from_timestamp(until)}')
now = current_clock.get().timestamp
if until < now:
return
if self.activation_trigger is None:
self.activation_trigger = TimeTrigger.create(True, self.tk, until)
else:
self.activation_trigger.time = until
self.activation_trigger.time = max(until, self.activation_trigger.time)
try:
del active_tranches[self.tk]
except KeyError:
pass
def disable(self):
# permanently stop this trigger and deconstruct
@@ -665,6 +698,10 @@ class TrancheTrigger:
def open(self):
return not self.closed
@property
def error(self):
return self.status == TrancheState.Error
def __str__(self):
trigs = []
if self.balance_trigger is not None:

View File

@@ -148,7 +148,7 @@ class MarkPool:
mark_pools: dict[str, MarkPool] = {}
quotes = [] # ordered list of preferred quote tokens
quotes = [] # ordered list of preferred quote token addresses
def add_mark_pool(addr: str, base: str, quote: str, fee: int):
@@ -200,7 +200,7 @@ async def mark_to_market_adj_dec(token: str, amount: dec, adjust_decimals=True)
return mark_to_market(token, amount)
def mark_to_market(token: str, amount: dec) -> Optional[dec]:
def mark_to_market(token: str, amount: dec = dec(1)) -> Optional[dec]:
"""
amount must already be adjusted for decimals
"""

View File

@@ -5,7 +5,6 @@ from datetime import timedelta
from typing import Any, Iterable, Callable, Optional
from eth_bloom import BloomFilter
# noinspection PyPackageRequirements
from websockets.exceptions import ConnectionClosedError
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp, metric
@@ -81,8 +80,7 @@ class BlockStateRunner(BlockProgressor):
async with w3ws as w3ws:
log.debug('connecting to ws provider')
await w3ws.provider.connect()
subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc.
# log.debug(f'subscribed to newHeads {subscription}')
await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc.
while self.running:
async for message in w3ws.ws.process_subscriptions():
block = Block(chain_id, message['result'])
@@ -94,11 +92,15 @@ class BlockStateRunner(BlockProgressor):
if not self.running:
break
await async_yield()
except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
except (TimeoutError, asyncio.TimeoutError) as e:
log.debug(f'runner timeout {e}')
except ConnectionClosedError as e:
log.info(f'websocket connection closed {e}')
except ConnectionRefusedError:
log.warning(f'Could not connect to websocket {config.ws_url}')
await asyncio.sleep(1)
except StopAsyncIteration:
log.info(f'websocket stream ended')
except Exception:
log.exception(f'Unhandled exception during run_ws()')
finally:
@@ -397,21 +399,21 @@ class BlockStateRunner(BlockProgressor):
# propragate to the DB or Redis.
# TIME TICKS ARE DISABLED FOR THIS REASON
return
current_fork.set(fork)
session = db.session
session.begin()
try:
for callback, on_timer in self.callbacks:
if on_timer:
# noinspection PyCallingNonCallable
await maywait(callback())
except BaseException:
session.rollback()
raise
else:
session.commit()
finally:
db.close_session()
# current_fork.set(fork)
# session = db.session
# session.begin()
# try:
# for callback, on_timer in self.callbacks:
# if on_timer:
# # noinspection PyCallingNonCallable
# await maywait(callback())
# except BaseException:
# session.rollback()
# raise
# else:
# session.commit()
# finally:
# db.close_session()
async def do_state_init_cbs(self):

View File

@@ -91,8 +91,9 @@ async def load_token(address: str) -> Optional[OldTokenDict]:
try:
decimals = await dec_prom
except CONTRACT_ERRORS:
log.warning(f'token {address} has no decimals()')
log.info(f'token {address} has no decimals()')
decimals = 0
return None # we do not support coins that don't specify decimals.
approved = False # never approve new coins
chain_id = current_chain.get().id
symbol = await symbol_prom

View File

@@ -2,12 +2,12 @@ import asyncio
import functools
import logging
from dexorder import current_pub
from dexorder import current_pub, dec
from dexorder.base.chain import current_chain
from dexorder.blockstate import BlockDict
from dexorder.contract import ERC20, CONTRACT_ERRORS
from dexorder.contract.dexorder import VaultContract, vault_address
from dexorder.util import json
from dexorder.util import json, align_decimal
log = logging.getLogger(__name__)
@@ -102,3 +102,6 @@ async def refresh_vault_balances(vault, *tokens):
result[t] = a
return result
vault_balances.modify(vault, functools.partial(_adjust, vault, tokens, amounts))
def pretty_balances(b: dict[str,dec], padding=8) -> str:
return '\n'.join(f'{k:>} {align_decimal(v,padding)}' for k,v in b.items())