Compare commits
24 Commits
f3bdfdf97b
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4936150c3b | ||
| 88057607d5 | |||
| 36d0a863c6 | |||
| 89ce46793e | |||
| 2bcf5d043c | |||
| 71942d5b8f | |||
| ef44973646 | |||
| ce55609297 | |||
| a27300b5e4 | |||
| f3faaa3dd6 | |||
| 0bb670b356 | |||
| 52b406ba17 | |||
| 3d0342d19d | |||
| dbf960bae9 | |||
| d49f142fe3 | |||
| 34fa439b3c | |||
| 41a1e2d9fe | |||
| 66229e67bb | |||
| 31b6ddd314 | |||
| 07c6423fd5 | |||
| 4740687167 | |||
| a06eeeb10d | |||
| 4492d23c47 | |||
| 1c0c2f0e63 |
30
alembic/versions/e47d1bca4b3d_sharedata.py
Normal file
30
alembic/versions/e47d1bca4b3d_sharedata.py
Normal file
@@ -0,0 +1,30 @@
|
||||
"""sharedata
|
||||
|
||||
Revision ID: e47d1bca4b3d
|
||||
Revises: 509010f13e8b
|
||||
Create Date: 2025-04-23 11:23:10.809341
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = 'e47d1bca4b3d'
|
||||
down_revision: Union[str, None] = '509010f13e8b'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.create_table('sharedata',
|
||||
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
|
||||
sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.drop_table('sharedata')
|
||||
File diff suppressed because one or more lines are too long
@@ -19,7 +19,9 @@ class AddressMetadata (TypedDict):
|
||||
|
||||
|
||||
def save_addrmeta(address: str, meta: AddressMetadata):
|
||||
if meta['type'] == 'Token':
|
||||
if meta is None:
|
||||
pass
|
||||
elif meta['type'] == 'Token':
|
||||
meta: OldTokenDict
|
||||
updated = Token.load(meta)
|
||||
token = db.session.get(Token, (current_chain.get().id, address))
|
||||
|
||||
@@ -42,10 +42,12 @@ class Account (LocalAccount):
|
||||
# log.debug(f'available accounts: {Account._pool.qsize()}')
|
||||
try:
|
||||
async with asyncio.timeout(1):
|
||||
result = await Account._pool.get()
|
||||
result: "Account" = await Account._pool.get()
|
||||
except asyncio.TimeoutError:
|
||||
log.error('waiting for an available account')
|
||||
result = await Account._pool.get()
|
||||
# mark as out of pool
|
||||
result._in_pool = False
|
||||
metric.account_available.set(Account._pool.qsize())
|
||||
return result
|
||||
|
||||
@@ -59,17 +61,20 @@ class Account (LocalAccount):
|
||||
if Account._main_account is None:
|
||||
Account._main_account = account
|
||||
Account._pool.put_nowait(account)
|
||||
account._in_pool = True # this account is now in the pool
|
||||
Account._all.append(account)
|
||||
metric.account_available.set(Account._pool.qsize())
|
||||
metric.account_total.set(len(Account._all))
|
||||
log.info(f'Account pool {[a.address for a in Account._all]}')
|
||||
|
||||
def __init__(self, local_account: LocalAccount): # todo chain_id?
|
||||
super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code
|
||||
def __init__(self, local_account: LocalAccount): # todo chain_id?
|
||||
super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code
|
||||
self.chain_id = current_chain.get().id
|
||||
self.signing_middleware = construct_sign_and_send_raw_middleware(self)
|
||||
self._nonce: Optional[int] = None
|
||||
self.tx_id: Optional[str] = None # current transaction id
|
||||
# release() idempotency tracking
|
||||
self._in_pool: bool = False
|
||||
|
||||
async def next_nonce(self):
|
||||
if self._nonce is None:
|
||||
@@ -86,8 +91,21 @@ class Account (LocalAccount):
|
||||
return current_w3.get().eth.get_balance(self.address)
|
||||
|
||||
def release(self):
|
||||
metric.account_available.set(Account._pool.qsize() + 1)
|
||||
"""
|
||||
Return this Account to the pool.
|
||||
|
||||
Idempotent: calling release() multiple times without a new acquire()
|
||||
will only enqueue the account once.
|
||||
"""
|
||||
# If we're already in the pool, do nothing.
|
||||
if self._in_pool:
|
||||
# Optional debug log; comment out if too noisy.
|
||||
# log.debug(f'Account {self.address} already in pool; ignoring extra release()')
|
||||
return
|
||||
|
||||
Account._pool.put_nowait(self)
|
||||
self._in_pool = True
|
||||
metric.account_available.set(Account._pool.qsize())
|
||||
|
||||
def __str__(self):
|
||||
return self.address
|
||||
|
||||
@@ -283,6 +283,9 @@ DISTANT_FUTURE = 4294967295 # max uint32
|
||||
|
||||
MAX_FRACTION = 65535 # max uint16
|
||||
|
||||
MIN_SLIPPAGE = 0.0001 # one bip
|
||||
MIN_SLIPPAGE_EPSILON = 0.000000000003
|
||||
|
||||
|
||||
@dataclass
|
||||
class Tranche:
|
||||
@@ -372,11 +375,11 @@ class Tranche:
|
||||
if self.minLine.intercept or self.minLine.slope:
|
||||
msg += f' >{self.minLine.intercept:.5g}'
|
||||
if self.minLine.slope:
|
||||
msg += f'{self.minLine.slope:+.5g}/s({self.minLine.value():5g})'
|
||||
msg += f'{self.minLine.slope:+.5g}/s={self.minLine.value():5g}'
|
||||
if self.maxLine.intercept or self.maxLine.slope:
|
||||
msg += f' <{self.maxLine.intercept:.5g}'
|
||||
if self.maxLine.slope:
|
||||
msg += f'{self.maxLine.slope:+.5g}/s({self.maxLine.value():5g})'
|
||||
msg += f'{self.maxLine.slope:+.5g}/s={self.maxLine.value():5g}'
|
||||
if self.rateLimitPeriod:
|
||||
msg += f' {self.rateLimitFraction/MAX_FRACTION:.1%} every {self.rateLimitPeriod/60:.0} minutes'
|
||||
return msg
|
||||
|
||||
@@ -7,20 +7,33 @@ from dexorder.blocks import current_block, get_block
|
||||
from dexorder.blockstate import current_blockstate
|
||||
from dexorder.blockstate.blockdata import BlockData
|
||||
from dexorder.blockstate.db_state import DbState
|
||||
from dexorder.blockstate.fork import current_fork
|
||||
from dexorder.contract.dexorder import VaultContract
|
||||
from dexorder.order.orderstate import Order
|
||||
from dexorder.tokens import adjust_decimals
|
||||
from dexorder.util import json
|
||||
from dexorder.vault_blockdata import vault_balances, pretty_balances
|
||||
from dexorder.bin.executable import execute
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
async def dump_orders(orders, args):
|
||||
if args.json:
|
||||
print(json.dumps([order.status.dump() for order in orders]))
|
||||
else:
|
||||
first = True
|
||||
for order in orders:
|
||||
if first:
|
||||
first = False
|
||||
else:
|
||||
print()
|
||||
print(await order.pprint())
|
||||
|
||||
def command_vault_argparse(subparsers):
|
||||
parser = subparsers.add_parser('vault', help='show the vault\'s balances and orders')
|
||||
parser.add_argument('address', help='address of the vault')
|
||||
parser.add_argument('--all', help='show all orders including closed ones', action='store_true')
|
||||
# parser.add_argument('--json', help='output in JSON format', action='store_true')
|
||||
parser.add_argument('--json', help='output in JSON format', action='store_true')
|
||||
|
||||
async def command_vault(args):
|
||||
balances = vault_balances.get(args.address, {})
|
||||
@@ -29,6 +42,7 @@ async def command_vault(args):
|
||||
print(pretty_balances({k: (await adjust_decimals(k, v)) for k, v in balances.items()}))
|
||||
print(f'Orders:')
|
||||
i = 0
|
||||
orders = []
|
||||
while True:
|
||||
key = OrderKey(args.address, i)
|
||||
try:
|
||||
@@ -36,16 +50,19 @@ async def command_vault(args):
|
||||
except KeyError:
|
||||
break
|
||||
if args.all or order.is_open:
|
||||
print(await order.pprint())
|
||||
orders.append(order)
|
||||
i += 1
|
||||
await dump_orders(orders, args)
|
||||
|
||||
|
||||
def command_open_argparse(subparsers):
|
||||
parser = subparsers.add_parser('open', help='show all open orders')
|
||||
parser.add_argument('--json', help='output in JSON format', action='store_true')
|
||||
|
||||
|
||||
async def command_open(args):
|
||||
await dump_orders([Order.of(key) for key in Order.open_orders], args)
|
||||
|
||||
# for key in Order.open_orders:
|
||||
# order = Order.of(key)
|
||||
# if args.json:
|
||||
# print(json.dumps(order.status.dump()))
|
||||
# else:
|
||||
# print()
|
||||
# print(order)
|
||||
|
||||
async def main(args: list):
|
||||
parser = argparse.ArgumentParser()
|
||||
@@ -66,10 +83,11 @@ async def main(args: list):
|
||||
db_state = DbState(BlockData.by_opt('db'))
|
||||
with db.transaction():
|
||||
state = await db_state.load()
|
||||
state.readonly = True
|
||||
# state.readonly = True
|
||||
current_blockstate.set(state)
|
||||
block = await get_block(state.root_hash)
|
||||
current_block.set(block)
|
||||
current_fork.set(state.root_fork)
|
||||
await subcommand(parsed)
|
||||
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ from dexorder.contract.dexorder import get_dexorder_contract
|
||||
from dexorder.event_handler import (init, dump_log, handle_vault_created, handle_order_placed,
|
||||
handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all,
|
||||
handle_uniswap_swaps, handle_vault_impl_changed, update_metrics)
|
||||
from dexorder.marks import publish_marks
|
||||
from dexorder.memcache import memcache
|
||||
from dexorder.memcache.memcache_state import RedisState, publish_all
|
||||
from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches
|
||||
@@ -83,6 +84,7 @@ def setup_logevent_triggers(runner):
|
||||
# runner.add_callback(adjust_gas)
|
||||
|
||||
runner.add_callback(cleanup_jobs)
|
||||
runner.add_callback(publish_marks)
|
||||
runner.add_callback(update_metrics)
|
||||
|
||||
|
||||
@@ -115,7 +117,7 @@ async def main():
|
||||
if redis_state:
|
||||
# load initial state
|
||||
log.info('initializing redis with root state')
|
||||
await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id])
|
||||
await redis_state.init(state, state.root_fork)
|
||||
|
||||
await initialize_accounting_runner()
|
||||
|
||||
|
||||
@@ -95,7 +95,7 @@ async def write_metadata( pools, mirror_pools ):
|
||||
pool_dicts = [get_pool(addr) for (addr,_inverted) in mirror_pools]
|
||||
pool_dicts = await asyncio.gather(*pool_dicts)
|
||||
for data, addr, (_,inverted) in zip(pool_dicts, pools, mirror_pools):
|
||||
data['x'] = dict(data=dict(uri=f'https://app.dexorder.trade/ohlc/', chain=42161, symbol=addr, inverted=inverted))
|
||||
data['x'] = dict(data=dict(uri=f'https://app.dexorder.com/ohlc/', chain=42161, symbol=addr, inverted=inverted))
|
||||
tokens = set(p['base'] for p in pool_dicts)
|
||||
tokens.update(p['quote'] for p in pool_dicts)
|
||||
tokens = await asyncio.gather(*[get_token(t) for t in tokens])
|
||||
@@ -190,6 +190,7 @@ async def main():
|
||||
while True:
|
||||
wake_up = now() + delay
|
||||
# log.debug(f'querying {pool}')
|
||||
tx = None
|
||||
try:
|
||||
price = await get_pool_price(pool)
|
||||
if price != last_prices.get(pool):
|
||||
@@ -200,7 +201,10 @@ async def main():
|
||||
addr, inverted = mirror_pools[pool]
|
||||
log.debug(f'Mirrored {addr} {price}')
|
||||
except Exception as x:
|
||||
log.debug(f'Could not update {pool}: {x}')
|
||||
log.debug(f'Could not update {pool}: {x} {tx}')
|
||||
if tx is not None:
|
||||
tx.account.reset_nonce()
|
||||
tx.account.release()
|
||||
continue
|
||||
try:
|
||||
pool = next(pool_iter)
|
||||
|
||||
@@ -42,6 +42,8 @@ class Config:
|
||||
fee_leeway = 0.1 # do not adjust fees if they are within this proportion
|
||||
min_gas: str = '0'
|
||||
|
||||
mark_publish_seconds: float = 60 # publish mark prices every this number of seconds
|
||||
|
||||
# Order slashing
|
||||
slash_kill_count: int = 5
|
||||
slash_delay_base: float = 60 # one minute
|
||||
|
||||
@@ -33,7 +33,8 @@ class ContractTransaction:
|
||||
async def wait(self) -> TxReceipt:
|
||||
if self.receipt is None:
|
||||
self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id)
|
||||
self.account.release()
|
||||
if self.account is not None:
|
||||
self.account.release()
|
||||
return self.receipt
|
||||
|
||||
async def sign(self, account: Account):
|
||||
@@ -81,14 +82,17 @@ def transact_wrapper(addr, name, func):
|
||||
account = await Account.acquire()
|
||||
if account is None:
|
||||
raise ValueError(f'No account to sign transaction {addr}.{name}()')
|
||||
await ct.sign(account)
|
||||
try:
|
||||
tx_id = await current_w3.get().eth.send_raw_transaction(ct.data)
|
||||
assert tx_id == ct.id_bytes
|
||||
return ct
|
||||
except Web3Exception as e:
|
||||
e.args += addr, name
|
||||
raise e
|
||||
await ct.sign(account)
|
||||
try:
|
||||
tx_id = await current_w3.get().eth.send_raw_transaction(ct.data)
|
||||
assert tx_id == ct.id_bytes
|
||||
return ct
|
||||
except Web3Exception as e:
|
||||
e.args += addr, name
|
||||
raise e
|
||||
finally:
|
||||
account.release()
|
||||
return f
|
||||
|
||||
|
||||
@@ -150,10 +154,14 @@ class ContractProxy:
|
||||
def __getattr__(self, item):
|
||||
if item == 'constructor':
|
||||
found = self.contract.constructor
|
||||
elif item in self.contract.functions:
|
||||
found = self.contract.functions[item]
|
||||
else:
|
||||
raise AttributeError(item)
|
||||
funcs = self.contract.functions
|
||||
# In web3.py v6+, contract functions are exposed as attributes, not via __getitem__.
|
||||
# Using getattr ensures we obtain the callable factory for the function; indexing may return None.
|
||||
# Additionally, guard against unexpected None to fail fast with a clear error.
|
||||
found = getattr(funcs, item, None)
|
||||
if not callable(found):
|
||||
raise AttributeError(f"Function '{item}' not found on contract {self._interface_name} at {self.address}")
|
||||
return self._wrapper(self.address, item, found)
|
||||
|
||||
def __repr__(self):
|
||||
|
||||
@@ -10,3 +10,4 @@ from .ofac import OFAC, OFACAlerts
|
||||
from .accounting import Accounting, DbAccount
|
||||
from .vaultcreationrequest import VaultCreationRequest
|
||||
from .tos import TOSAcceptance
|
||||
from .sharedata import ShareData
|
||||
|
||||
12
src/dexorder/database/model/sharedata.py
Normal file
12
src/dexorder/database/model/sharedata.py
Normal file
@@ -0,0 +1,12 @@
|
||||
import logging
|
||||
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from dexorder.database.model import Base
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
class ShareData (Base):
|
||||
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
|
||||
data: Mapped[dict] = mapped_column(JSONB)
|
||||
@@ -220,7 +220,7 @@ async def update_metrics():
|
||||
metric.vaults.set(vault_owners.upper_len())
|
||||
metric.open_orders.set(Order.open_orders.upper_len())
|
||||
metric.triggers_time.set(len(TimeTrigger.all))
|
||||
metric.triggers_line.set(len(PriceLineTrigger.triggers_set))
|
||||
metric.triggers_line.set(sum(len(s) for s in PriceLineTrigger.by_pool.values()))
|
||||
|
||||
# slow updates
|
||||
global slow_metric_update
|
||||
|
||||
44
src/dexorder/marks.py
Normal file
44
src/dexorder/marks.py
Normal file
@@ -0,0 +1,44 @@
|
||||
"""
|
||||
"marks" are mark-to-market USD values of a selected set of tokens called quote tokens. Publishing a set of USD marks
|
||||
for the quote tokens allows almost any token to be marked to USD via one hop.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
|
||||
from dexorder import dec, NATIVE_TOKEN, config
|
||||
from dexorder.base.chain import current_chain
|
||||
from dexorder.blockstate import BlockDict
|
||||
from dexorder.pools import quotes, mark_to_market
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
def pub_marks(_s,k,v):
|
||||
chain_id = current_chain.get().id
|
||||
return str(chain_id), 'marks.usd', (chain_id, k, str(v))
|
||||
|
||||
|
||||
marks: BlockDict[str, dec] = BlockDict('mark.usd', db=False, redis=True, pub=pub_marks, value2str=str)
|
||||
|
||||
class RateLimiter:
|
||||
def __init__(self, rate: float):
|
||||
self.rate = rate
|
||||
self.last_update = 0.0
|
||||
|
||||
def ready(self):
|
||||
now = time.monotonic()
|
||||
if now - self.last_update < self.rate:
|
||||
return False
|
||||
self.last_update = now
|
||||
return True
|
||||
|
||||
mark_publish_rate = RateLimiter(config.mark_publish_seconds)
|
||||
|
||||
def publish_marks():
|
||||
if mark_publish_rate.ready():
|
||||
for token_addr in [NATIVE_TOKEN]+quotes:
|
||||
# overwrite=False checks the previous value and does not generate a diff if the values match. This prevents
|
||||
# excessive updates to Redis
|
||||
value = mark_to_market(token_addr)
|
||||
if value is not None:
|
||||
marks.setitem(token_addr, value, overwrite=False)
|
||||
@@ -1,3 +1,4 @@
|
||||
import itertools
|
||||
import logging
|
||||
from contextlib import asynccontextmanager
|
||||
from contextvars import ContextVar
|
||||
@@ -10,16 +11,70 @@ from dexorder import config
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
BATCH_SIZE = 1_000
|
||||
|
||||
class PipelineProxy:
|
||||
def __init__(self, pipe: Pipeline):
|
||||
self.pipe = pipe
|
||||
self.ops = 0
|
||||
|
||||
async def push(self, num=1):
|
||||
self.ops += num
|
||||
if self.ops >= BATCH_SIZE:
|
||||
self.ops = 0
|
||||
await self.pipe.execute()
|
||||
|
||||
async def sadd(self, series, *keys):
|
||||
while keys:
|
||||
most = min(BATCH_SIZE-self.ops, len(keys))
|
||||
assert most > 0
|
||||
send = keys[:most]
|
||||
keys = keys[most:]
|
||||
await self.pipe.sadd(series, *send)
|
||||
await self.push(len(send))
|
||||
|
||||
async def srem(self, series, *keys):
|
||||
while keys:
|
||||
most = min(BATCH_SIZE-self.ops, len(keys))
|
||||
assert most > 0
|
||||
send = keys[:most]
|
||||
keys = keys[most:]
|
||||
await self.pipe.srem(series, *send)
|
||||
await self.push(len(send))
|
||||
|
||||
async def hset(self, series, *, mapping):
|
||||
items = list(mapping.items())
|
||||
while items:
|
||||
most = min(BATCH_SIZE-self.ops, len(items))
|
||||
assert most > 0
|
||||
send = items[:most]
|
||||
items = items[most:]
|
||||
await self.pipe.hset(series, mapping={k:v for k,v in send})
|
||||
await self.push(len(send))
|
||||
|
||||
async def hdel(self, series, *keys):
|
||||
while keys:
|
||||
most = min(BATCH_SIZE-self.ops, len(keys))
|
||||
assert most > 0
|
||||
send = keys[:most]
|
||||
keys = keys[most:]
|
||||
await self.pipe.hdel(series, *send)
|
||||
await self.push(len(send))
|
||||
|
||||
def __getattr__(self, item):
|
||||
return getattr(self.pipe, item)
|
||||
|
||||
|
||||
class Memcache:
|
||||
@staticmethod
|
||||
@asynccontextmanager
|
||||
async def batch():
|
||||
async def batch(transaction=True):
|
||||
old_redis: Redis = current_redis.get()
|
||||
pipe: Pipeline = old_redis.pipeline()
|
||||
pipe = old_redis.pipeline(transaction=transaction)
|
||||
# noinspection PyTypeChecker
|
||||
current_redis.set(pipe)
|
||||
try:
|
||||
yield pipe
|
||||
yield PipelineProxy(pipe)
|
||||
await pipe.execute()
|
||||
finally:
|
||||
current_redis.set(old_redis)
|
||||
|
||||
@@ -12,7 +12,7 @@ from dexorder.blockstate.blockdata import SeriesCollection, BlockData
|
||||
from dexorder.blockstate.diff import DiffEntryItem
|
||||
from dexorder.blockstate.fork import Fork
|
||||
from dexorder.blockstate.state import compress_diffs
|
||||
from dexorder.memcache import current_redis, memcache
|
||||
from dexorder.memcache import current_redis, memcache, PipelineProxy
|
||||
from dexorder.util import hexstr
|
||||
from dexorder.util.async_util import maywait
|
||||
from dexorder.util.json import json_encoder
|
||||
@@ -40,11 +40,11 @@ class RedisState (SeriesCollection):
|
||||
for series in self.datas.keys():
|
||||
for k, v in state.iteritems(fork, series):
|
||||
diffs.append(DiffItem(series, k, v))
|
||||
await self.save(fork, diffs)
|
||||
await self.save(fork, diffs, use_transaction=False, skip_pubs=True) # use_transaction=False if the data is too big
|
||||
|
||||
|
||||
# noinspection PyAsyncCall
|
||||
async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
|
||||
async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]], *, use_transaction=True, skip_pubs=False):
|
||||
# the diffs must be already compressed such that there is only one action per key
|
||||
chain = current_chain.get()
|
||||
chain_id = chain.id
|
||||
@@ -91,22 +91,23 @@ class RedisState (SeriesCollection):
|
||||
hsets[series][key] = value
|
||||
else:
|
||||
raise NotImplementedError
|
||||
async with memcache.batch() as r:
|
||||
r: Pipeline
|
||||
|
||||
async with memcache.batch(use_transaction) as r:
|
||||
r: PipelineProxy
|
||||
for series, keys in sadds.items():
|
||||
r.sadd(series, *keys)
|
||||
await r.sadd(series, *keys)
|
||||
for series, keys in sdels.items():
|
||||
r.srem(series, *keys)
|
||||
await r.srem(series, *keys)
|
||||
for series, kvs in hsets.items():
|
||||
r.hset(series, mapping=kvs)
|
||||
await r.hset(series, mapping=kvs)
|
||||
for series, keys in hdels.items():
|
||||
r.hdel(series, *keys)
|
||||
await r.hdel(series, *keys)
|
||||
block_series = f'{chain_id}|head'
|
||||
headstr = hexstr(fork.head)
|
||||
r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
|
||||
pubs.append((str(chain_id), 'head', [fork.height, headstr]))
|
||||
# separate batch for pubs
|
||||
if pubs:
|
||||
if pubs and not skip_pubs:
|
||||
await publish_all(pubs)
|
||||
|
||||
|
||||
|
||||
@@ -359,7 +359,7 @@ class OHLCRepository:
|
||||
if price is None, then bars are advanced based on the time but no new price is added to the series.
|
||||
"""
|
||||
if OHLC_LIMIT_POOLS_DEBUG is not None and (symbol,period) not in OHLC_LIMIT_POOLS_DEBUG:
|
||||
return
|
||||
return None
|
||||
# logname = f'{symbol} {period_name(period)}'
|
||||
# log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
|
||||
if price is not None:
|
||||
@@ -371,33 +371,31 @@ class OHLCRepository:
|
||||
# log.debug(f'got recent {historical}')
|
||||
if not historical:
|
||||
if create is False or price is None:
|
||||
return # do not track symbols which have not been explicity set up
|
||||
historical = []
|
||||
return None # do not track symbols which have not been explicity set up
|
||||
updated = [NativeOHLC(ohlc_start_time(time, period), price, price, price, price)]
|
||||
# log.debug(f'\tcreated new bars {updated}')
|
||||
else:
|
||||
updated = update_ohlc(historical[-1], period, time, price)
|
||||
# drop any historical bars that are older than we need
|
||||
# oldest_needed = cover the root block time plus one period prior
|
||||
root_branch = current_blockstate.get().root_branch
|
||||
root_hash = root_branch.head
|
||||
if root_hash is not None:
|
||||
root_timestamp = await get_block_timestamp(root_hash)
|
||||
oldest_needed = from_timestamp(root_timestamp) - period
|
||||
# noinspection PyTypeChecker
|
||||
trim = (oldest_needed - historical[0].start) // period
|
||||
if trim > 0:
|
||||
historical = historical[trim:]
|
||||
|
||||
# now overlap the updated data on top of the historical data
|
||||
if not historical or not updated:
|
||||
updated = historical + updated
|
||||
else:
|
||||
# overlap the updated OHLC's on top of the historical ones
|
||||
last_bar = historical[-1].start
|
||||
first_updated = updated[0].start
|
||||
overlap = (first_updated - last_bar) // period + 1
|
||||
updated = historical[:-overlap] + updated if overlap > 0 else historical + updated
|
||||
# log.debug(f'\tnew recents: {updated}')
|
||||
|
||||
# drop any bars that are older than we need
|
||||
# oldest_needed = cover the root block time plus one period prior
|
||||
root_branch = current_blockstate.get().root_branch
|
||||
root_hash = root_branch.head
|
||||
if root_hash is not None:
|
||||
root_timestamp = await get_block_timestamp(root_hash)
|
||||
oldest_needed = from_timestamp(root_timestamp) - period
|
||||
# noinspection PyTypeChecker
|
||||
trim = (oldest_needed - updated[0].start) // period
|
||||
if trim > 0:
|
||||
updated = updated[trim:]
|
||||
|
||||
# if len(updated) > 3:
|
||||
# log.debug(f'\tnew recents ({len(updated)}): {updated}')
|
||||
recent_ohlcs.setitem(key, updated)
|
||||
return updated
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ from uuid import UUID
|
||||
from web3.exceptions import ContractPanicError, ContractLogicError
|
||||
from web3.types import EventData
|
||||
|
||||
from dexorder import db, metric
|
||||
from dexorder import db, metric, config
|
||||
from dexorder.accounting import accounting_transaction_gas
|
||||
from dexorder.base import TransactionReceiptDict, TransactionRequest, transaction_request_deserializers
|
||||
from dexorder.base.order import TrancheKey, OrderKey
|
||||
@@ -121,6 +121,11 @@ async def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
|
||||
if trig is not None:
|
||||
trig.touch()
|
||||
|
||||
def delay(secs=None):
|
||||
trig = get_trigger()
|
||||
if trig is not None:
|
||||
trig.deactivate(secs if secs is not None else config.slippage_control_delay)
|
||||
|
||||
if error is None:
|
||||
metric.executions.inc()
|
||||
else:
|
||||
@@ -162,6 +167,7 @@ async def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
|
||||
retry()
|
||||
elif error == 'RL':
|
||||
log.debug(f'tranche {tk} execution failed due to "RL" rate limit')
|
||||
delay()
|
||||
retry()
|
||||
elif error == 'TE':
|
||||
log.debug(f'tranche {tk} execution failed due to "TE" too early')
|
||||
|
||||
@@ -295,17 +295,17 @@ class Order:
|
||||
async def pprint(self):
|
||||
amount_token = self.order.tokenIn if self.order.amountIsInput else self.order.tokenOut
|
||||
msg = f'''
|
||||
SwapOrder {self.key}
|
||||
status: {self.state.name}
|
||||
placed: {from_timestamp(self.status.startTime)}
|
||||
in: {self.order.tokenIn}
|
||||
out: {self.order.tokenOut}
|
||||
exchange: {self.order.route.exchange.name, self.order.route.fee}
|
||||
amount: {"input" if self.order.amountIsInput else "output"} {await adjust_decimals(amount_token, self.filled):f}/{await adjust_decimals(amount_token, self.amount):f}{" to owner" if self.order.outputDirectlyToOwner else ""}
|
||||
minFill: {await adjust_decimals(amount_token, self.min_fill_amount):f}
|
||||
inverted: {self.order.inverted}
|
||||
tranches:
|
||||
'''
|
||||
SwapOrder {self.key}
|
||||
status: {self.state.name}
|
||||
placed: {from_timestamp(self.status.startTime)}
|
||||
in: {self.order.tokenIn}
|
||||
out: {self.order.tokenOut}
|
||||
exchange: {self.order.route.exchange.name, self.order.route.fee}
|
||||
amount: {"input" if self.order.amountIsInput else "output"} {await adjust_decimals(amount_token, self.filled):f}/{await adjust_decimals(amount_token, self.amount):f}{" to owner" if self.order.outputDirectlyToOwner else ""}
|
||||
minFill: {await adjust_decimals(amount_token, self.min_fill_amount):f}
|
||||
inverted: {self.order.inverted}
|
||||
tranches:
|
||||
'''
|
||||
for i in range(len(self.order.tranches)):
|
||||
tranche = self.order.tranches[i]
|
||||
msg += f' {tranche}'
|
||||
|
||||
@@ -9,7 +9,8 @@ from typing import Optional, Sequence, Union
|
||||
import numpy as np
|
||||
from sortedcontainers import SortedList
|
||||
|
||||
from dexorder.base.orderlib import SwapOrderState, PriceProof, DISTANT_FUTURE, DISTANT_PAST, Line
|
||||
from dexorder.base.orderlib import SwapOrderState, PriceProof, DISTANT_FUTURE, DISTANT_PAST, Line, MIN_SLIPPAGE, \
|
||||
MIN_SLIPPAGE_EPSILON
|
||||
from dexorder.blockstate import BlockDict
|
||||
from .orderstate import Order
|
||||
from .. import dec, order_log, timestamp, from_timestamp, config
|
||||
@@ -56,13 +57,13 @@ class OrderTriggers:
|
||||
self.order = order
|
||||
self.triggers = triggers
|
||||
OrderTriggers.instances[order.key] = self
|
||||
log.debug(f'created OrderTriggers for {order.key}')
|
||||
# log.debug(f'created OrderTriggers for {order.key}')
|
||||
|
||||
def disable(self):
|
||||
for t in self.triggers:
|
||||
t.disable()
|
||||
del OrderTriggers.instances[self.order.key]
|
||||
log.debug(f'disabled OrderTriggers for {self.order.key}')
|
||||
# log.debug(f'disabled OrderTriggers for {self.order.key}')
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
@@ -72,6 +73,10 @@ class OrderTriggers:
|
||||
def open(self):
|
||||
return not self.closed
|
||||
|
||||
@property
|
||||
def error(self):
|
||||
return any(t.error for t in self.triggers)
|
||||
|
||||
def check_complete(self):
|
||||
if self.closed:
|
||||
final_state = SwapOrderState.Filled if self.order.remaining == 0 or self.order.remaining < self.order.min_fill_amount else SwapOrderState.Expired
|
||||
@@ -218,7 +223,7 @@ async def has_funds(tk: TrancheKey):
|
||||
# log.debug(f'balances {balances}')
|
||||
token_addr = order.status.order.tokenIn
|
||||
token_balance = balances.get(token_addr)
|
||||
log.debug(f'amount of {token_addr} = {token_balance}')
|
||||
# log.debug(f'amount of {token_addr} = {token_balance}')
|
||||
if token_balance is None:
|
||||
# unknown balance
|
||||
token_balance = balances[token_addr] = await ERC20(token_addr).balanceOf(tk.vault)
|
||||
@@ -388,7 +393,7 @@ class PriceLineTrigger (Trigger):
|
||||
if inverted:
|
||||
price_now = 1/price_now
|
||||
activated = value_now < price_now if is_min else value_now > price_now
|
||||
log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
|
||||
# log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
|
||||
trigger_type = Trigger.TriggerType.MinLine if is_min else Trigger.TriggerType.MaxLine
|
||||
super().__init__(trigger_type, tk, activated)
|
||||
self.inverted = inverted
|
||||
@@ -503,7 +508,8 @@ async def activate_order(order: Order):
|
||||
triggers = await OrderTriggers.create(order)
|
||||
if triggers.closed:
|
||||
log.debug(f'order {order.key} was immediately closed')
|
||||
final_state = SwapOrderState.Filled if order.remaining == 0 or order.remaining < order.min_fill_amount \
|
||||
final_state = SwapOrderState.Error if triggers.error \
|
||||
else SwapOrderState.Filled if order.remaining == 0 or order.remaining < order.min_fill_amount \
|
||||
else SwapOrderState.Expired
|
||||
order.complete(final_state)
|
||||
|
||||
@@ -564,13 +570,14 @@ class TrancheTrigger:
|
||||
|
||||
tranche_remaining = tranche.fraction_of(order.amount) - order.tranche_filled(self.tk.tranche_index)
|
||||
self.status = \
|
||||
TrancheState.Error if self.market_order and self.slippage < MIN_SLIPPAGE - MIN_SLIPPAGE_EPSILON else \
|
||||
TrancheState.Filled if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount else \
|
||||
TrancheState.Expired if self.expiration_trigger is not None and not self.expiration_trigger else \
|
||||
TrancheState.Early if self.activation_trigger is None and not self.activation_trigger else \
|
||||
TrancheState.Early if self.activation_trigger is not None and not self.activation_trigger else \
|
||||
TrancheState.Active
|
||||
_dirty.add(tk)
|
||||
TrancheTrigger.all[tk] = self
|
||||
log.debug(f'Tranche {tk} initial status {self.status} {self}')
|
||||
# log.debug(f'Tranche {tk} initial status {self.status} {self}')
|
||||
|
||||
|
||||
@property
|
||||
@@ -614,11 +621,11 @@ class TrancheTrigger:
|
||||
self.order_trigger.expire_tranche(self.tk.tranche_index)
|
||||
|
||||
def expire(self):
|
||||
self.disable()
|
||||
if self.closed:
|
||||
return
|
||||
order_log.debug(f'tranche expired {self.tk}')
|
||||
self.status = TrancheState.Expired
|
||||
self.disable()
|
||||
|
||||
def kill(self):
|
||||
order_log.warning(f'tranche KILLED {self.tk}')
|
||||
@@ -643,11 +650,13 @@ class TrancheTrigger:
|
||||
|
||||
def deactivate_until(self, until):
|
||||
# Temporarily deactivate the tranche due to a rate limit. Use disable() to permanently halt the trigger.
|
||||
log.debug(f'deactivating tranche {self.tk} until {from_timestamp(until)}')
|
||||
now = current_clock.get().timestamp
|
||||
if until < now:
|
||||
return
|
||||
if self.activation_trigger is None:
|
||||
self.activation_trigger = TimeTrigger.create(True, self.tk, until)
|
||||
else:
|
||||
self.activation_trigger.time = until
|
||||
self.activation_trigger.time = max(until, self.activation_trigger.time)
|
||||
try:
|
||||
del active_tranches[self.tk]
|
||||
except KeyError:
|
||||
@@ -689,6 +698,10 @@ class TrancheTrigger:
|
||||
def open(self):
|
||||
return not self.closed
|
||||
|
||||
@property
|
||||
def error(self):
|
||||
return self.status == TrancheState.Error
|
||||
|
||||
def __str__(self):
|
||||
trigs = []
|
||||
if self.balance_trigger is not None:
|
||||
|
||||
@@ -148,7 +148,7 @@ class MarkPool:
|
||||
|
||||
mark_pools: dict[str, MarkPool] = {}
|
||||
|
||||
quotes = [] # ordered list of preferred quote tokens
|
||||
quotes = [] # ordered list of preferred quote token addresses
|
||||
|
||||
|
||||
def add_mark_pool(addr: str, base: str, quote: str, fee: int):
|
||||
@@ -200,7 +200,7 @@ async def mark_to_market_adj_dec(token: str, amount: dec, adjust_decimals=True)
|
||||
return mark_to_market(token, amount)
|
||||
|
||||
|
||||
def mark_to_market(token: str, amount: dec) -> Optional[dec]:
|
||||
def mark_to_market(token: str, amount: dec = dec(1)) -> Optional[dec]:
|
||||
"""
|
||||
amount must already be adjusted for decimals
|
||||
"""
|
||||
|
||||
@@ -5,7 +5,6 @@ from datetime import timedelta
|
||||
from typing import Any, Iterable, Callable, Optional
|
||||
|
||||
from eth_bloom import BloomFilter
|
||||
# noinspection PyPackageRequirements
|
||||
from websockets.exceptions import ConnectionClosedError
|
||||
|
||||
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp, metric
|
||||
@@ -81,8 +80,7 @@ class BlockStateRunner(BlockProgressor):
|
||||
async with w3ws as w3ws:
|
||||
log.debug('connecting to ws provider')
|
||||
await w3ws.provider.connect()
|
||||
subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc.
|
||||
# log.debug(f'subscribed to newHeads {subscription}')
|
||||
await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc.
|
||||
while self.running:
|
||||
async for message in w3ws.ws.process_subscriptions():
|
||||
block = Block(chain_id, message['result'])
|
||||
@@ -94,11 +92,15 @@ class BlockStateRunner(BlockProgressor):
|
||||
if not self.running:
|
||||
break
|
||||
await async_yield()
|
||||
except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
|
||||
except (TimeoutError, asyncio.TimeoutError) as e:
|
||||
log.debug(f'runner timeout {e}')
|
||||
except ConnectionClosedError as e:
|
||||
log.info(f'websocket connection closed {e}')
|
||||
except ConnectionRefusedError:
|
||||
log.warning(f'Could not connect to websocket {config.ws_url}')
|
||||
await asyncio.sleep(1)
|
||||
except StopAsyncIteration:
|
||||
log.info(f'websocket stream ended')
|
||||
except Exception:
|
||||
log.exception(f'Unhandled exception during run_ws()')
|
||||
finally:
|
||||
@@ -397,21 +399,21 @@ class BlockStateRunner(BlockProgressor):
|
||||
# propragate to the DB or Redis.
|
||||
# TIME TICKS ARE DISABLED FOR THIS REASON
|
||||
return
|
||||
current_fork.set(fork)
|
||||
session = db.session
|
||||
session.begin()
|
||||
try:
|
||||
for callback, on_timer in self.callbacks:
|
||||
if on_timer:
|
||||
# noinspection PyCallingNonCallable
|
||||
await maywait(callback())
|
||||
except BaseException:
|
||||
session.rollback()
|
||||
raise
|
||||
else:
|
||||
session.commit()
|
||||
finally:
|
||||
db.close_session()
|
||||
# current_fork.set(fork)
|
||||
# session = db.session
|
||||
# session.begin()
|
||||
# try:
|
||||
# for callback, on_timer in self.callbacks:
|
||||
# if on_timer:
|
||||
# # noinspection PyCallingNonCallable
|
||||
# await maywait(callback())
|
||||
# except BaseException:
|
||||
# session.rollback()
|
||||
# raise
|
||||
# else:
|
||||
# session.commit()
|
||||
# finally:
|
||||
# db.close_session()
|
||||
|
||||
|
||||
async def do_state_init_cbs(self):
|
||||
|
||||
Reference in New Issue
Block a user