Compare commits

...

24 Commits

Author SHA1 Message Date
tim 4936150c3b bugfixes; startall works 2025-12-09 15:11:58 -04:00
tim 88057607d5 put app back on app.dexorder.com and corp site on dexorder.com with www redirecting to apex 2025-05-19 15:19:20 -04:00
tim 36d0a863c6 remove spammy debug logs 2025-05-07 16:02:37 -04:00
tim 89ce46793e dotcom 2025-05-06 13:56:05 -04:00
tim 2bcf5d043c redis pipeline overflow fix 2025-04-23 15:20:00 -04:00
tim 71942d5b8f memcache init doesn't use transaction 2025-04-23 14:13:58 -04:00
tim ef44973646 sharedata 2025-04-23 12:51:14 -04:00
tim ce55609297 examine open orders 2025-04-07 01:32:19 -04:00
tim a27300b5e4 info log for websocket connection drops 2025-04-03 18:15:16 -04:00
tim f3faaa3dd6 tranchestatus tostring touchup 2025-04-01 14:20:58 -04:00
tim 0bb670b356 redis initial state push fix 2025-04-01 13:52:49 -04:00
tim 52b406ba17 ohlc retained length fix 2025-04-01 13:52:39 -04:00
tim 3d0342d19d price line metrics fix 2025-04-01 13:52:29 -04:00
tim dbf960bae9 initial TrancheState fix 2025-04-01 13:52:21 -04:00
tim d49f142fe3 redis pipeline autoflush after 10000 entries 2025-04-01 10:54:25 -04:00
tim 34fa439b3c USD marks 2025-03-29 15:27:13 -04:00
tim 41a1e2d9fe MIN_SLIPPAGE epsilon leeway 2025-03-28 20:05:52 -04:00
tim 66229e67bb bugfix for 0 slippage market orders 2025-03-26 23:48:43 -04:00
tim 31b6ddd314 initial redis state load doesn't use pipeline now, because it overflowed. 2025-03-26 23:25:10 -04:00
tim 07c6423fd5 USDC/USDC.e naming update 2025-03-26 17:17:54 -04:00
tim 4740687167 account release bugfix 2025-03-19 21:05:19 -04:00
tim a06eeeb10d bugfix 2025-03-19 17:31:34 -04:00
tim 4492d23c47 better "addrmeta is None" fix 2025-03-16 21:17:19 -04:00
tim 1c0c2f0e63 "address_meta None" fix 2025-03-15 06:26:01 -04:00
22 changed files with 329 additions and 110 deletions

View File

@@ -0,0 +1,30 @@
"""sharedata

Revision ID: e47d1bca4b3d
Revises: 509010f13e8b
Create Date: 2025-04-23 11:23:10.809341

"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = 'e47d1bca4b3d'
down_revision: Union[str, None] = '509010f13e8b'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.create_table('sharedata',
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
        sa.PrimaryKeyConstraint('id')
    )


def downgrade() -> None:
    op.drop_table('sharedata')

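The migration applies with the standard Alembic workflow. A minimal programmatic sketch, equivalent to running `alembic upgrade head`; the alembic.ini location is an assumption, not something this diff shows:

from alembic import command
from alembic.config import Config

# Assumption: an alembic.ini at the repository root points at the dexorder database.
cfg = Config('alembic.ini')
command.upgrade(cfg, 'head')     # runs upgrade() above, creating the sharedata table
# command.downgrade(cfg, '-1')   # would run downgrade(), dropping it again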
File diff suppressed because one or more lines are too long

View File

@@ -19,7 +19,9 @@ class AddressMetadata (TypedDict):
 def save_addrmeta(address: str, meta: AddressMetadata):
-    if meta['type'] == 'Token':
+    if meta is None:
+        pass
+    elif meta['type'] == 'Token':
         meta: OldTokenDict
         updated = Token.load(meta)
         token = db.session.get(Token, (current_chain.get().id, address))

View File

@@ -42,10 +42,12 @@ class Account (LocalAccount):
         # log.debug(f'available accounts: {Account._pool.qsize()}')
         try:
             async with asyncio.timeout(1):
-                result = await Account._pool.get()
+                result: "Account" = await Account._pool.get()
         except asyncio.TimeoutError:
             log.error('waiting for an available account')
             result = await Account._pool.get()
+        # mark as out of pool
+        result._in_pool = False
         metric.account_available.set(Account._pool.qsize())
         return result
@@ -59,17 +61,20 @@
         if Account._main_account is None:
             Account._main_account = account
         Account._pool.put_nowait(account)
+        account._in_pool = True  # this account is now in the pool
         Account._all.append(account)
         metric.account_available.set(Account._pool.qsize())
         metric.account_total.set(len(Account._all))
         log.info(f'Account pool {[a.address for a in Account._all]}')

     def __init__(self, local_account: LocalAccount):  # todo chain_id?
         super().__init__(local_account._key_obj, local_account._publicapi)  # from digging into the source code
         self.chain_id = current_chain.get().id
         self.signing_middleware = construct_sign_and_send_raw_middleware(self)
         self._nonce: Optional[int] = None
         self.tx_id: Optional[str] = None  # current transaction id
+        # release() idempotency tracking
+        self._in_pool: bool = False

     async def next_nonce(self):
         if self._nonce is None:
@@ -86,8 +91,21 @@
         return current_w3.get().eth.get_balance(self.address)

     def release(self):
-        metric.account_available.set(Account._pool.qsize() + 1)
+        """
+        Return this Account to the pool.
+        Idempotent: calling release() multiple times without a new acquire()
+        will only enqueue the account once.
+        """
+        # If we're already in the pool, do nothing.
+        if self._in_pool:
+            # Optional debug log; comment out if too noisy.
+            # log.debug(f'Account {self.address} already in pool; ignoring extra release()')
+            return
         Account._pool.put_nowait(self)
+        self._in_pool = True
+        metric.account_available.set(Account._pool.qsize())

     def __str__(self):
         return self.address

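The new `_in_pool` flag makes release() idempotent, which matters because both ContractTransaction.wait() and the transact wrapper's finally block (later in this diff) may release the same account. A self-contained sketch of the pattern; PooledResource is an illustrative stand-in, not the dexorder class:

import asyncio

class PooledResource:
    """Toy stand-in for Account: release() must be safe to call twice."""
    def __init__(self, pool: asyncio.Queue):
        self._pool = pool
        self._in_pool = False

    def release(self):
        if self._in_pool:        # already enqueued; a second release() is a no-op
            return
        self._pool.put_nowait(self)
        self._in_pool = True

async def main():
    pool: asyncio.Queue = asyncio.Queue()
    res = PooledResource(pool)
    res.release()
    res.release()                # duplicate release from an error path
    assert pool.qsize() == 1     # the resource is enqueued only once

asyncio.run(main())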
View File

@@ -283,6 +283,9 @@ DISTANT_FUTURE = 4294967295  # max uint32
 MAX_FRACTION = 65535  # max uint16

+MIN_SLIPPAGE = 0.0001  # one bip
+MIN_SLIPPAGE_EPSILON = 0.000000000003
+
 @dataclass
 class Tranche:
@@ -372,11 +375,11 @@ class Tranche:
         if self.minLine.intercept or self.minLine.slope:
             msg += f' >{self.minLine.intercept:.5g}'
             if self.minLine.slope:
-                msg += f'{self.minLine.slope:+.5g}/s({self.minLine.value():5g})'
+                msg += f'{self.minLine.slope:+.5g}/s={self.minLine.value():5g}'
         if self.maxLine.intercept or self.maxLine.slope:
             msg += f' <{self.maxLine.intercept:.5g}'
             if self.maxLine.slope:
-                msg += f'{self.maxLine.slope:+.5g}/s({self.maxLine.value():5g})'
+                msg += f'{self.maxLine.slope:+.5g}/s={self.maxLine.value():5g}'
         if self.rateLimitPeriod:
             msg += f' {self.rateLimitFraction/MAX_FRACTION:.1%} every {self.rateLimitPeriod/60:.0} minutes'
         return msg

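MIN_SLIPPAGE_EPSILON exists to give the strict comparison used by the trigger code later in this diff (slippage < MIN_SLIPPAGE - MIN_SLIPPAGE_EPSILON) some leeway against float rounding. A small sketch with an illustrative 1e-12 drift:

MIN_SLIPPAGE = 0.0001                  # one bip, as above
MIN_SLIPPAGE_EPSILON = 0.000000000003  # 3e-12

# A slippage meant to be exactly one bip can land slightly below it after
# decimal-to-binary conversion and arithmetic; 1e-12 is an illustrative drift.
slippage = MIN_SLIPPAGE - 1e-12

assert slippage < MIN_SLIPPAGE                               # a strict check would flag this as an error
assert not (slippage < MIN_SLIPPAGE - MIN_SLIPPAGE_EPSILON)  # the epsilon leeway accepts it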
View File

@@ -7,20 +7,33 @@ from dexorder.blocks import current_block, get_block
 from dexorder.blockstate import current_blockstate
 from dexorder.blockstate.blockdata import BlockData
 from dexorder.blockstate.db_state import DbState
+from dexorder.blockstate.fork import current_fork
 from dexorder.contract.dexorder import VaultContract
 from dexorder.order.orderstate import Order
 from dexorder.tokens import adjust_decimals
+from dexorder.util import json
 from dexorder.vault_blockdata import vault_balances, pretty_balances
 from dexorder.bin.executable import execute

 log = logging.getLogger(__name__)

+async def dump_orders(orders, args):
+    if args.json:
+        print(json.dumps([order.status.dump() for order in orders]))
+    else:
+        first = True
+        for order in orders:
+            if first:
+                first = False
+            else:
+                print()
+            print(await order.pprint())

 def command_vault_argparse(subparsers):
     parser = subparsers.add_parser('vault', help='show the vault\'s balances and orders')
     parser.add_argument('address', help='address of the vault')
     parser.add_argument('--all', help='show all orders including closed ones', action='store_true')
-    # parser.add_argument('--json', help='output in JSON format', action='store_true')
+    parser.add_argument('--json', help='output in JSON format', action='store_true')

 async def command_vault(args):
     balances = vault_balances.get(args.address, {})
@@ -29,6 +42,7 @@ async def command_vault(args):
     print(pretty_balances({k: (await adjust_decimals(k, v)) for k, v in balances.items()}))
     print(f'Orders:')
     i = 0
+    orders = []
     while True:
         key = OrderKey(args.address, i)
         try:
@@ -36,16 +50,19 @@
         except KeyError:
             break
         if args.all or order.is_open:
-            print(await order.pprint())
+            orders.append(order)
         i += 1
+    await dump_orders(orders, args)
-    # for key in Order.open_orders:
-    #     order = Order.of(key)
-    #     if args.json:
-    #         print(json.dumps(order.status.dump()))
-    #     else:
-    #         print()
-    #         print(order)

+def command_open_argparse(subparsers):
+    parser = subparsers.add_parser('open', help='show all open orders')
+    parser.add_argument('--json', help='output in JSON format', action='store_true')

+async def command_open(args):
+    await dump_orders([Order.of(key) for key in Order.open_orders], args)
@@ -66,10 +83,11 @@ async def main(args: list):
     db_state = DbState(BlockData.by_opt('db'))
     with db.transaction():
         state = await db_state.load()
-        state.readonly = True
+        # state.readonly = True
         current_blockstate.set(state)
         block = await get_block(state.root_hash)
         current_block.set(block)
+        current_fork.set(state.root_fork)
     await subcommand(parsed)

View File

@@ -15,6 +15,7 @@ from dexorder.contract.dexorder import get_dexorder_contract
 from dexorder.event_handler import (init, dump_log, handle_vault_created, handle_order_placed,
                                     handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all,
                                     handle_uniswap_swaps, handle_vault_impl_changed, update_metrics)
+from dexorder.marks import publish_marks
 from dexorder.memcache import memcache
 from dexorder.memcache.memcache_state import RedisState, publish_all
 from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches
@@ -83,6 +84,7 @@ def setup_logevent_triggers(runner):
     # runner.add_callback(adjust_gas)
     runner.add_callback(cleanup_jobs)
+    runner.add_callback(publish_marks)
     runner.add_callback(update_metrics)
@@ -115,7 +117,7 @@ async def main():
     if redis_state:
         # load initial state
         log.info('initializing redis with root state')
-        await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id])
+        await redis_state.init(state, state.root_fork)
     await initialize_accounting_runner()

View File

@@ -95,7 +95,7 @@ async def write_metadata( pools, mirror_pools ):
     pool_dicts = [get_pool(addr) for (addr,_inverted) in mirror_pools]
     pool_dicts = await asyncio.gather(*pool_dicts)
     for data, addr, (_,inverted) in zip(pool_dicts, pools, mirror_pools):
-        data['x'] = dict(data=dict(uri=f'https://app.dexorder.trade/ohlc/', chain=42161, symbol=addr, inverted=inverted))
+        data['x'] = dict(data=dict(uri=f'https://app.dexorder.com/ohlc/', chain=42161, symbol=addr, inverted=inverted))
     tokens = set(p['base'] for p in pool_dicts)
     tokens.update(p['quote'] for p in pool_dicts)
     tokens = await asyncio.gather(*[get_token(t) for t in tokens])
@@ -190,6 +190,7 @@ async def main():
     while True:
         wake_up = now() + delay
         # log.debug(f'querying {pool}')
+        tx = None
        try:
             price = await get_pool_price(pool)
             if price != last_prices.get(pool):
@@ -200,7 +201,10 @@ async def main():
                 addr, inverted = mirror_pools[pool]
                 log.debug(f'Mirrored {addr} {price}')
         except Exception as x:
-            log.debug(f'Could not update {pool}: {x}')
+            log.debug(f'Could not update {pool}: {x} {tx}')
+            if tx is not None:
+                tx.account.reset_nonce()
+                tx.account.release()
             continue
         try:
             pool = next(pool_iter)

View File

@@ -42,6 +42,8 @@ class Config:
     fee_leeway = 0.1  # do not adjust fees if they are within this proportion
     min_gas: str = '0'
+    mark_publish_seconds: float = 60  # publish mark prices every this number of seconds

     # Order slashing
     slash_kill_count: int = 5
     slash_delay_base: float = 60  # one minute

View File

@@ -33,7 +33,8 @@ class ContractTransaction:
     async def wait(self) -> TxReceipt:
         if self.receipt is None:
             self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id)
-            self.account.release()
+            if self.account is not None:
+                self.account.release()
         return self.receipt

     async def sign(self, account: Account):
@@ -81,14 +82,17 @@ def transact_wrapper(addr, name, func):
         account = await Account.acquire()
         if account is None:
             raise ValueError(f'No account to sign transaction {addr}.{name}()')
-        await ct.sign(account)
         try:
-            tx_id = await current_w3.get().eth.send_raw_transaction(ct.data)
-            assert tx_id == ct.id_bytes
-            return ct
-        except Web3Exception as e:
-            e.args += addr, name
-            raise e
+            await ct.sign(account)
+            try:
+                tx_id = await current_w3.get().eth.send_raw_transaction(ct.data)
+                assert tx_id == ct.id_bytes
+                return ct
+            except Web3Exception as e:
+                e.args += addr, name
+                raise e
+        finally:
+            account.release()
     return f
@@ -150,10 +154,14 @@ class ContractProxy:
     def __getattr__(self, item):
         if item == 'constructor':
             found = self.contract.constructor
-        elif item in self.contract.functions:
-            found = self.contract.functions[item]
         else:
-            raise AttributeError(item)
+            funcs = self.contract.functions
+            # In web3.py v6+, contract functions are exposed as attributes, not via __getitem__.
+            # Using getattr ensures we obtain the callable factory for the function; indexing may return None.
+            # Additionally, guard against unexpected None to fail fast with a clear error.
+            found = getattr(funcs, item, None)
+            if not callable(found):
+                raise AttributeError(f"Function '{item}' not found on contract {self._interface_name} at {self.address}")
         return self._wrapper(self.address, item, found)

     def __repr__(self):

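The __getattr__ rewrite above swaps indexing for attribute lookup plus an explicit callable check. A toy sketch of that proxy pattern; Funcs and Proxy are illustrative stand-ins, not the web3 or dexorder classes:

class Funcs:
    def transfer(self, to, amount):
        return f'transfer({to}, {amount})'

class Proxy:
    def __init__(self, funcs):
        self._funcs = funcs

    def __getattr__(self, item):
        # getattr-with-default plus a callable check fails fast on unknown names
        found = getattr(self._funcs, item, None)
        if not callable(found):
            raise AttributeError(item)
        return found

p = Proxy(Funcs())
print(p.transfer('0xabc', 1))   # resolves to the underlying callable
try:
    p.nonexistent
except AttributeError as e:
    print('missing:', e)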
View File

@@ -10,3 +10,4 @@ from .ofac import OFAC, OFACAlerts
 from .accounting import Accounting, DbAccount
 from .vaultcreationrequest import VaultCreationRequest
 from .tos import TOSAcceptance
+from .sharedata import ShareData

View File

@@ -0,0 +1,12 @@
import logging

from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column

from dexorder.database.model import Base

log = logging.getLogger(__name__)


class ShareData (Base):
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    data: Mapped[dict] = mapped_column(JSONB)

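A hedged usage sketch for the new model, assuming the db.transaction()/db.session conventions visible elsewhere in this diff; the import path and payload keys are assumptions:

from dexorder import db
from dexorder.database.model import ShareData  # assumed re-export path

with db.transaction():
    row = ShareData(data={'kind': 'example', 'payload': {'hello': 'world'}})
    db.session.add(row)
# after the transaction commits, row.id holds the autoincremented key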
View File

@@ -220,7 +220,7 @@ async def update_metrics():
     metric.vaults.set(vault_owners.upper_len())
     metric.open_orders.set(Order.open_orders.upper_len())
     metric.triggers_time.set(len(TimeTrigger.all))
-    metric.triggers_line.set(len(PriceLineTrigger.triggers_set))
+    metric.triggers_line.set(sum(len(s) for s in PriceLineTrigger.by_pool.values()))

     # slow updates
     global slow_metric_update

src/dexorder/marks.py Normal file
View File

@@ -0,0 +1,44 @@
"""
"marks" are mark-to-market USD values of a selected set of tokens called quote tokens. Publishing a set of USD marks
for the quote tokens allows almost any token to be marked to USD via one hop.
"""
import logging
import time

from dexorder import dec, NATIVE_TOKEN, config
from dexorder.base.chain import current_chain
from dexorder.blockstate import BlockDict
from dexorder.pools import quotes, mark_to_market

log = logging.getLogger(__name__)


def pub_marks(_s, k, v):
    chain_id = current_chain.get().id
    return str(chain_id), 'marks.usd', (chain_id, k, str(v))


marks: BlockDict[str, dec] = BlockDict('mark.usd', db=False, redis=True, pub=pub_marks, value2str=str)


class RateLimiter:
    def __init__(self, rate: float):
        self.rate = rate
        self.last_update = 0.0

    def ready(self):
        now = time.monotonic()
        if now - self.last_update < self.rate:
            return False
        self.last_update = now
        return True


mark_publish_rate = RateLimiter(config.mark_publish_seconds)


def publish_marks():
    if mark_publish_rate.ready():
        for token_addr in [NATIVE_TOKEN] + quotes:
            # overwrite=False checks the previous value and does not generate a diff if the values match. This prevents
            # excessive updates to Redis
            value = mark_to_market(token_addr)
            if value is not None:
                marks.setitem(token_addr, value, overwrite=False)

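A standalone usage sketch of the RateLimiter above (copied verbatim), with an illustrative 0.5-second interval instead of config.mark_publish_seconds:

import time

class RateLimiter:                      # copied from the file above
    def __init__(self, rate: float):
        self.rate = rate
        self.last_update = 0.0
    def ready(self):
        now = time.monotonic()
        if now - self.last_update < self.rate:
            return False
        self.last_update = now
        return True

limiter = RateLimiter(0.5)              # illustrative 0.5s interval
ticks = 0
start = time.monotonic()
while time.monotonic() - start < 1.6:
    if limiter.ready():                 # True at most once per interval
        ticks += 1
    time.sleep(0.05)
print(ticks)                            # ~4: one immediately, then every ~0.5s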
View File

@@ -1,3 +1,4 @@
+import itertools
 import logging
 from contextlib import asynccontextmanager
 from contextvars import ContextVar
@@ -10,16 +11,70 @@ from dexorder import config
 log = logging.getLogger(__name__)

+BATCH_SIZE = 1_000
+
+class PipelineProxy:
+    def __init__(self, pipe: Pipeline):
+        self.pipe = pipe
+        self.ops = 0
+
+    async def push(self, num=1):
+        self.ops += num
+        if self.ops >= BATCH_SIZE:
+            self.ops = 0
+            await self.pipe.execute()
+
+    async def sadd(self, series, *keys):
+        while keys:
+            most = min(BATCH_SIZE-self.ops, len(keys))
+            assert most > 0
+            send = keys[:most]
+            keys = keys[most:]
+            await self.pipe.sadd(series, *send)
+            await self.push(len(send))
+
+    async def srem(self, series, *keys):
+        while keys:
+            most = min(BATCH_SIZE-self.ops, len(keys))
+            assert most > 0
+            send = keys[:most]
+            keys = keys[most:]
+            await self.pipe.srem(series, *send)
+            await self.push(len(send))
+
+    async def hset(self, series, *, mapping):
+        items = list(mapping.items())
+        while items:
+            most = min(BATCH_SIZE-self.ops, len(items))
+            assert most > 0
+            send = items[:most]
+            items = items[most:]
+            await self.pipe.hset(series, mapping={k:v for k,v in send})
+            await self.push(len(send))
+
+    async def hdel(self, series, *keys):
+        while keys:
+            most = min(BATCH_SIZE-self.ops, len(keys))
+            assert most > 0
+            send = keys[:most]
+            keys = keys[most:]
+            await self.pipe.hdel(series, *send)
+            await self.push(len(send))
+
+    def __getattr__(self, item):
+        return getattr(self.pipe, item)

 class Memcache:
     @staticmethod
     @asynccontextmanager
-    async def batch():
+    async def batch(transaction=True):
         old_redis: Redis = current_redis.get()
-        pipe: Pipeline = old_redis.pipeline()
+        pipe = old_redis.pipeline(transaction=transaction)
+        # noinspection PyTypeChecker
         current_redis.set(pipe)
         try:
-            yield pipe
+            yield PipelineProxy(pipe)
             await pipe.execute()
         finally:
             current_redis.set(old_redis)

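PipelineProxy's chunking caps how many commands sit in the pipeline before an execute() flush, which is the point of the "redis pipeline overflow fix" commit above. A self-contained sketch of the same chunking logic against a stub pipeline, so it runs without a Redis server; StubPipeline and hset_chunked are illustrative:

import asyncio

BATCH_SIZE = 1_000

class StubPipeline:
    """Stand-in for a redis.asyncio Pipeline: counts queued commands and flushes."""
    def __init__(self):
        self.queued = 0
        self.flushes = 0
    async def hset(self, series, mapping):
        self.queued += len(mapping)
    async def execute(self):
        self.flushes += 1
        self.queued = 0

async def hset_chunked(pipe, series, mapping, ops=0):
    # Same idea as PipelineProxy.hset: never let more than BATCH_SIZE queued
    # operations accumulate before an execute() flush.
    items = list(mapping.items())
    while items:
        most = min(BATCH_SIZE - ops, len(items))
        send, items = items[:most], items[most:]
        await pipe.hset(series, mapping=dict(send))
        ops += len(send)
        if ops >= BATCH_SIZE:
            ops = 0
            await pipe.execute()
    return ops

async def main():
    pipe = StubPipeline()
    leftover = await hset_chunked(pipe, '42161|example', {f'k{i}': i for i in range(2500)})
    print(pipe.flushes, leftover)   # 2 full flushes, 500 ops left for the final execute()

asyncio.run(main())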
View File

@@ -12,7 +12,7 @@ from dexorder.blockstate.blockdata import SeriesCollection, BlockData
 from dexorder.blockstate.diff import DiffEntryItem
 from dexorder.blockstate.fork import Fork
 from dexorder.blockstate.state import compress_diffs
-from dexorder.memcache import current_redis, memcache
+from dexorder.memcache import current_redis, memcache, PipelineProxy
 from dexorder.util import hexstr
 from dexorder.util.async_util import maywait
 from dexorder.util.json import json_encoder
@@ -40,11 +40,11 @@ class RedisState (SeriesCollection):
         for series in self.datas.keys():
             for k, v in state.iteritems(fork, series):
                 diffs.append(DiffItem(series, k, v))
-        await self.save(fork, diffs)
+        await self.save(fork, diffs, use_transaction=False, skip_pubs=True)  # use_transaction=False if the data is too big

     # noinspection PyAsyncCall
-    async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
+    async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]], *, use_transaction=True, skip_pubs=False):
         # the diffs must be already compressed such that there is only one action per key
         chain = current_chain.get()
         chain_id = chain.id
@@ -91,22 +91,23 @@ class RedisState (SeriesCollection):
                 hsets[series][key] = value
             else:
                 raise NotImplementedError
-        async with memcache.batch() as r:
-            r: Pipeline
+        async with memcache.batch(use_transaction) as r:
+            r: PipelineProxy
             for series, keys in sadds.items():
-                r.sadd(series, *keys)
+                await r.sadd(series, *keys)
             for series, keys in sdels.items():
-                r.srem(series, *keys)
+                await r.srem(series, *keys)
             for series, kvs in hsets.items():
-                r.hset(series, mapping=kvs)
+                await r.hset(series, mapping=kvs)
             for series, keys in hdels.items():
-                r.hdel(series, *keys)
+                await r.hdel(series, *keys)
             block_series = f'{chain_id}|head'
             headstr = hexstr(fork.head)
             r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
             pubs.append((str(chain_id), 'head', [fork.height, headstr]))
         # separate batch for pubs
-        if pubs:
+        if pubs and not skip_pubs:
             await publish_all(pubs)

View File

@@ -359,7 +359,7 @@ class OHLCRepository:
         if price is None, then bars are advanced based on the time but no new price is added to the series.
         """
         if OHLC_LIMIT_POOLS_DEBUG is not None and (symbol,period) not in OHLC_LIMIT_POOLS_DEBUG:
-            return
+            return None
         # logname = f'{symbol} {period_name(period)}'
         # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
         if price is not None:
@@ -371,33 +371,31 @@ class OHLCRepository:
         # log.debug(f'got recent {historical}')
         if not historical:
             if create is False or price is None:
-                return  # do not track symbols which have not been explicitly set up
+                return None  # do not track symbols which have not been explicitly set up
+            historical = []
             updated = [NativeOHLC(ohlc_start_time(time, period), price, price, price, price)]
             # log.debug(f'\tcreated new bars {updated}')
         else:
             updated = update_ohlc(historical[-1], period, time, price)
-        # drop any historical bars that are older than we need
-        # oldest_needed = cover the root block time plus one period prior
-        root_branch = current_blockstate.get().root_branch
-        root_hash = root_branch.head
-        if root_hash is not None:
-            root_timestamp = await get_block_timestamp(root_hash)
-            oldest_needed = from_timestamp(root_timestamp) - period
-            # noinspection PyTypeChecker
-            trim = (oldest_needed - historical[0].start) // period
-            if trim > 0:
-                historical = historical[trim:]
-        # now overlap the updated data on top of the historical data
+        # overlap the updated OHLC's on top of the historical ones
+        if not historical or not updated:
+            updated = historical + updated
+        else:
             last_bar = historical[-1].start
             first_updated = updated[0].start
             overlap = (first_updated - last_bar) // period + 1
             updated = historical[:-overlap] + updated if overlap > 0 else historical + updated
-        # log.debug(f'\tnew recents: {updated}')
+        # drop any bars that are older than we need
+        # oldest_needed = cover the root block time plus one period prior
+        root_branch = current_blockstate.get().root_branch
+        root_hash = root_branch.head
+        if root_hash is not None:
+            root_timestamp = await get_block_timestamp(root_hash)
+            oldest_needed = from_timestamp(root_timestamp) - period
+            # noinspection PyTypeChecker
+            trim = (oldest_needed - updated[0].start) // period
+            if trim > 0:
+                updated = updated[trim:]
+        # if len(updated) > 3:
+        #     log.debug(f'\tnew recents ({len(updated)}): {updated}')
         recent_ohlcs.setitem(key, updated)
         return updated

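A worked example of the overlap and trim arithmetic above, using bare start times in seconds instead of NativeOHLC bars; all numbers are illustrative:

period = 60
historical = [0, 60, 120, 180]       # start times of stored bars
updated = [180, 240]                 # update_ohlc refreshed bar 180 and opened bar 240

overlap = (updated[0] - historical[-1]) // period + 1   # (180-180)//60 + 1 = 1
merged = historical[:-overlap] + updated                # the refreshed bar replaces the stale copy

# Trim to the oldest bar still needed: root block time minus one period.
root_time = 210
oldest_needed = root_time - period                      # 150
trim = (oldest_needed - merged[0]) // period            # (150-0)//60 = 2
if trim > 0:
    merged = merged[trim:]                              # drop bars 0 and 60
print(merged)                                           # [120, 180, 240]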
View File

@@ -6,7 +6,7 @@ from uuid import UUID
 from web3.exceptions import ContractPanicError, ContractLogicError
 from web3.types import EventData

-from dexorder import db, metric
+from dexorder import db, metric, config
 from dexorder.accounting import accounting_transaction_gas
 from dexorder.base import TransactionReceiptDict, TransactionRequest, transaction_request_deserializers
 from dexorder.base.order import TrancheKey, OrderKey
@@ -121,6 +121,11 @@ async def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
         if trig is not None:
             trig.touch()

+    def delay(secs=None):
+        trig = get_trigger()
+        if trig is not None:
+            trig.deactivate(secs if secs is not None else config.slippage_control_delay)
+
     if error is None:
         metric.executions.inc()
     else:
@@ -162,6 +167,7 @@ async def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
         retry()
     elif error == 'RL':
         log.debug(f'tranche {tk} execution failed due to "RL" rate limit')
+        delay()
         retry()
     elif error == 'TE':
         log.debug(f'tranche {tk} execution failed due to "TE" too early')

View File

@@ -295,17 +295,17 @@ class Order:
     async def pprint(self):
         amount_token = self.order.tokenIn if self.order.amountIsInput else self.order.tokenOut
         msg = f'''
 SwapOrder {self.key}
     status: {self.state.name}
     placed: {from_timestamp(self.status.startTime)}
     in: {self.order.tokenIn}
     out: {self.order.tokenOut}
     exchange: {self.order.route.exchange.name, self.order.route.fee}
     amount: {"input" if self.order.amountIsInput else "output"} {await adjust_decimals(amount_token, self.filled):f}/{await adjust_decimals(amount_token, self.amount):f}{" to owner" if self.order.outputDirectlyToOwner else ""}
     minFill: {await adjust_decimals(amount_token, self.min_fill_amount):f}
     inverted: {self.order.inverted}
     tranches:
 '''
         for i in range(len(self.order.tranches)):
             tranche = self.order.tranches[i]
             msg += f'  {tranche}'

View File

@@ -9,7 +9,8 @@ from typing import Optional, Sequence, Union
 import numpy as np
 from sortedcontainers import SortedList

-from dexorder.base.orderlib import SwapOrderState, PriceProof, DISTANT_FUTURE, DISTANT_PAST, Line
+from dexorder.base.orderlib import SwapOrderState, PriceProof, DISTANT_FUTURE, DISTANT_PAST, Line, MIN_SLIPPAGE, \
+    MIN_SLIPPAGE_EPSILON
 from dexorder.blockstate import BlockDict
 from .orderstate import Order
 from .. import dec, order_log, timestamp, from_timestamp, config
@@ -56,13 +57,13 @@ class OrderTriggers:
         self.order = order
         self.triggers = triggers
         OrderTriggers.instances[order.key] = self
-        log.debug(f'created OrderTriggers for {order.key}')
+        # log.debug(f'created OrderTriggers for {order.key}')

     def disable(self):
         for t in self.triggers:
             t.disable()
         del OrderTriggers.instances[self.order.key]
-        log.debug(f'disabled OrderTriggers for {self.order.key}')
+        # log.debug(f'disabled OrderTriggers for {self.order.key}')

     @property
     def closed(self):
@@ -72,6 +73,10 @@ class OrderTriggers:
     def open(self):
         return not self.closed

+    @property
+    def error(self):
+        return any(t.error for t in self.triggers)
+
     def check_complete(self):
         if self.closed:
             final_state = SwapOrderState.Filled if self.order.remaining == 0 or self.order.remaining < self.order.min_fill_amount else SwapOrderState.Expired
@@ -218,7 +223,7 @@ async def has_funds(tk: TrancheKey):
     # log.debug(f'balances {balances}')
     token_addr = order.status.order.tokenIn
     token_balance = balances.get(token_addr)
-    log.debug(f'amount of {token_addr} = {token_balance}')
+    # log.debug(f'amount of {token_addr} = {token_balance}')
     if token_balance is None:
         # unknown balance
         token_balance = balances[token_addr] = await ERC20(token_addr).balanceOf(tk.vault)
@@ -388,7 +393,7 @@ class PriceLineTrigger (Trigger):
         if inverted:
             price_now = 1/price_now
         activated = value_now < price_now if is_min else value_now > price_now
-        log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
+        # log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
         trigger_type = Trigger.TriggerType.MinLine if is_min else Trigger.TriggerType.MaxLine
         super().__init__(trigger_type, tk, activated)
         self.inverted = inverted
@@ -503,7 +508,8 @@ async def activate_order(order: Order):
     triggers = await OrderTriggers.create(order)
     if triggers.closed:
         log.debug(f'order {order.key} was immediately closed')
-        final_state = SwapOrderState.Filled if order.remaining == 0 or order.remaining < order.min_fill_amount \
+        final_state = SwapOrderState.Error if triggers.error \
+            else SwapOrderState.Filled if order.remaining == 0 or order.remaining < order.min_fill_amount \
             else SwapOrderState.Expired
         order.complete(final_state)
@@ -564,13 +570,14 @@ class TrancheTrigger:
         tranche_remaining = tranche.fraction_of(order.amount) - order.tranche_filled(self.tk.tranche_index)
         self.status = \
+            TrancheState.Error if self.market_order and self.slippage < MIN_SLIPPAGE - MIN_SLIPPAGE_EPSILON else \
             TrancheState.Filled if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount else \
             TrancheState.Expired if self.expiration_trigger is not None and not self.expiration_trigger else \
-            TrancheState.Early if self.activation_trigger is None and not self.activation_trigger else \
+            TrancheState.Early if self.activation_trigger is not None and not self.activation_trigger else \
             TrancheState.Active
         _dirty.add(tk)
         TrancheTrigger.all[tk] = self
-        log.debug(f'Tranche {tk} initial status {self.status} {self}')
+        # log.debug(f'Tranche {tk} initial status {self.status} {self}')
@@ -614,11 +621,11 @@ class TrancheTrigger:
             self.order_trigger.expire_tranche(self.tk.tranche_index)

     def expire(self):
-        self.disable()
         if self.closed:
             return
         order_log.debug(f'tranche expired {self.tk}')
         self.status = TrancheState.Expired
+        self.disable()

     def kill(self):
         order_log.warning(f'tranche KILLED {self.tk}')
@@ -643,11 +650,13 @@ class TrancheTrigger:
     def deactivate_until(self, until):
         # Temporarily deactivate the tranche due to a rate limit. Use disable() to permanently halt the trigger.
-        log.debug(f'deactivating tranche {self.tk} until {from_timestamp(until)}')
+        now = current_clock.get().timestamp
+        if until < now:
+            return
         if self.activation_trigger is None:
             self.activation_trigger = TimeTrigger.create(True, self.tk, until)
         else:
-            self.activation_trigger.time = until
+            self.activation_trigger.time = max(until, self.activation_trigger.time)
         try:
             del active_tranches[self.tk]
         except KeyError:
@@ -689,6 +698,10 @@ class TrancheTrigger:
     def open(self):
         return not self.closed

+    @property
+    def error(self):
+        return self.status == TrancheState.Error
+
     def __str__(self):
         trigs = []
         if self.balance_trigger is not None:

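The initial-status assignment above is a single chained conditional expression, so ordering encodes priority: the new Error check outranks Filled, which outranks Expired, and so on. A toy illustration with stand-in predicates:

def classify(error, filled, expired, early):
    return ('Error' if error else
            'Filled' if filled else
            'Expired' if expired else
            'Early' if early else
            'Active')

assert classify(True, True, False, False) == 'Error'    # Error outranks Filled
assert classify(False, False, True, True) == 'Expired'  # first true predicate wins
assert classify(False, False, False, False) == 'Active'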
View File

@@ -148,7 +148,7 @@ class MarkPool:

 mark_pools: dict[str, MarkPool] = {}
-quotes = []  # ordered list of preferred quote tokens
+quotes = []  # ordered list of preferred quote token addresses

 def add_mark_pool(addr: str, base: str, quote: str, fee: int):
@@ -200,7 +200,7 @@ async def mark_to_market_adj_dec(token: str, amount: dec, adjust_decimals=True)
     return mark_to_market(token, amount)

-def mark_to_market(token: str, amount: dec) -> Optional[dec]:
+def mark_to_market(token: str, amount: dec = dec(1)) -> Optional[dec]:
     """
     amount must already be adjusted for decimals
     """

View File

@@ -5,7 +5,6 @@ from datetime import timedelta
 from typing import Any, Iterable, Callable, Optional

 from eth_bloom import BloomFilter
-# noinspection PyPackageRequirements
 from websockets.exceptions import ConnectionClosedError

 from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp, metric
@@ -81,8 +80,7 @@ class BlockStateRunner(BlockProgressor):
             async with w3ws as w3ws:
                 log.debug('connecting to ws provider')
                 await w3ws.provider.connect()
-                subscription = await w3ws.eth.subscribe('newHeads')  # the return value of this call is not consistent between anvil/hardhat/rpc.
-                # log.debug(f'subscribed to newHeads {subscription}')
+                await w3ws.eth.subscribe('newHeads')  # the return value of this call is not consistent between anvil/hardhat/rpc.
                 while self.running:
                     async for message in w3ws.ws.process_subscriptions():
                         block = Block(chain_id, message['result'])
@@ -94,11 +92,15 @@ class BlockStateRunner(BlockProgressor):
                         if not self.running:
                             break
                         await async_yield()
-        except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
+        except (TimeoutError, asyncio.TimeoutError) as e:
             log.debug(f'runner timeout {e}')
+        except ConnectionClosedError as e:
+            log.info(f'websocket connection closed {e}')
         except ConnectionRefusedError:
             log.warning(f'Could not connect to websocket {config.ws_url}')
             await asyncio.sleep(1)
+        except StopAsyncIteration:
+            log.info(f'websocket stream ended')
         except Exception:
             log.exception(f'Unhandled exception during run_ws()')
         finally:
@@ -397,21 +399,21 @@ class BlockStateRunner(BlockProgressor):
             # propagate to the DB or Redis.
             # TIME TICKS ARE DISABLED FOR THIS REASON
             return
-            current_fork.set(fork)
-            session = db.session
-            session.begin()
-            try:
-                for callback, on_timer in self.callbacks:
-                    if on_timer:
-                        # noinspection PyCallingNonCallable
-                        await maywait(callback())
-            except BaseException:
-                session.rollback()
-                raise
-            else:
-                session.commit()
-            finally:
-                db.close_session()
+            # current_fork.set(fork)
+            # session = db.session
+            # session.begin()
+            # try:
+            #     for callback, on_timer in self.callbacks:
+            #         if on_timer:
+            #             # noinspection PyCallingNonCallable
+            #             await maywait(callback())
+            # except BaseException:
+            #     session.rollback()
+            #     raise
+            # else:
+            #     session.commit()
+            # finally:
+            #     db.close_session()

     async def do_state_init_cbs(self):
async def do_state_init_cbs(self): async def do_state_init_cbs(self):