Compare commits


7 Commits

Author  SHA1        Message                                                                                       Date
tim     4936150c3b  bugfixes; startall works                                                                      2025-12-09 15:11:58 -04:00
tim     88057607d5  put app back on app.dexorder.com and corp site on dexorder.com with www redirecting to apex   2025-05-19 15:19:20 -04:00
tim     36d0a863c6  remove spammy debug logs                                                                      2025-05-07 16:02:37 -04:00
tim     89ce46793e  dotcom                                                                                        2025-05-06 13:56:05 -04:00
tim     2bcf5d043c  redis pipeline overflow fix                                                                   2025-04-23 15:20:00 -04:00
tim     71942d5b8f  memcache init doesn't use transaction                                                         2025-04-23 14:13:58 -04:00
tim     ef44973646  sharedata                                                                                     2025-04-23 12:51:14 -04:00
11 changed files with 144 additions and 26 deletions

View File

@@ -0,0 +1,30 @@
"""sharedata
Revision ID: e47d1bca4b3d
Revises: 509010f13e8b
Create Date: 2025-04-23 11:23:10.809341
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = 'e47d1bca4b3d'
down_revision: Union[str, None] = '509010f13e8b'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table('sharedata',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint('id')
)
def downgrade() -> None:
op.drop_table('sharedata')
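
For reference, this revision can also be applied programmatically through Alembic's standard Python API; a minimal sketch, assuming an alembic.ini at the repo root (the path is illustrative, not confirmed by this diff):

from alembic import command
from alembic.config import Config

# Hedged sketch: 'alembic.ini' is a placeholder for the repo's actual Alembic config.
cfg = Config('alembic.ini')
command.upgrade(cfg, 'e47d1bca4b3d')  # or 'head' to apply all pending revisions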

View File

@@ -42,10 +42,12 @@ class Account (LocalAccount):
         # log.debug(f'available accounts: {Account._pool.qsize()}')
         try:
             async with asyncio.timeout(1):
-                result = await Account._pool.get()
+                result: "Account" = await Account._pool.get()
         except asyncio.TimeoutError:
             log.error('waiting for an available account')
             result = await Account._pool.get()
+        # mark as out of pool
+        result._in_pool = False
         metric.account_available.set(Account._pool.qsize())
         return result
@@ -59,17 +61,20 @@ class Account (LocalAccount):
         if Account._main_account is None:
             Account._main_account = account
         Account._pool.put_nowait(account)
+        account._in_pool = True  # this account is now in the pool
         Account._all.append(account)
         metric.account_available.set(Account._pool.qsize())
         metric.account_total.set(len(Account._all))
         log.info(f'Account pool {[a.address for a in Account._all]}')

     def __init__(self, local_account: LocalAccount):  # todo chain_id?
         super().__init__(local_account._key_obj, local_account._publicapi)  # from digging into the source code
         self.chain_id = current_chain.get().id
         self.signing_middleware = construct_sign_and_send_raw_middleware(self)
         self._nonce: Optional[int] = None
         self.tx_id: Optional[str] = None  # current transaction id
+        # release() idempotency tracking
+        self._in_pool: bool = False

     async def next_nonce(self):
         if self._nonce is None:
@@ -86,8 +91,21 @@ class Account (LocalAccount):
         return current_w3.get().eth.get_balance(self.address)

     def release(self):
-        metric.account_available.set(Account._pool.qsize() + 1)
+        """
+        Return this Account to the pool.
+        Idempotent: calling release() multiple times without a new acquire()
+        will only enqueue the account once.
+        """
+        # If we're already in the pool, do nothing.
+        if self._in_pool:
+            # Optional debug log; comment out if too noisy.
+            # log.debug(f'Account {self.address} already in pool; ignoring extra release()')
+            return
         Account._pool.put_nowait(self)
+        self._in_pool = True
+        metric.account_available.set(Account._pool.qsize())

     def __str__(self):
         return self.address
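
A short sketch of the acquire/release contract this change establishes. The pool-acquire classmethod's name is not visible in this diff, so Account.acquire() below is hypothetical:

# Hypothetical caller; only release() and _in_pool are taken from the diff above.
account = await Account.acquire()  # hypothetical name; sets _in_pool = False
try:
    ...  # sign and send a transaction
finally:
    account.release()  # re-enqueues the account and sets _in_pool = True
    account.release()  # safe no-op: the account is already back in the pool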

View File

@@ -95,7 +95,7 @@ async def write_metadata( pools, mirror_pools ):
     pool_dicts = [get_pool(addr) for (addr,_inverted) in mirror_pools]
     pool_dicts = await asyncio.gather(*pool_dicts)
     for data, addr, (_,inverted) in zip(pool_dicts, pools, mirror_pools):
-        data['x'] = dict(data=dict(uri=f'https://app.dexorder.trade/ohlc/', chain=42161, symbol=addr, inverted=inverted))
+        data['x'] = dict(data=dict(uri=f'https://app.dexorder.com/ohlc/', chain=42161, symbol=addr, inverted=inverted))
     tokens = set(p['base'] for p in pool_dicts)
     tokens.update(p['quote'] for p in pool_dicts)
     tokens = await asyncio.gather(*[get_token(t) for t in tokens])

View File

@@ -33,7 +33,8 @@ class ContractTransaction:
     async def wait(self) -> TxReceipt:
         if self.receipt is None:
             self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id)
-            self.account.release()
+            if self.account is not None:
+                self.account.release()
         return self.receipt

     async def sign(self, account: Account):
@@ -153,10 +154,14 @@ class ContractProxy:
     def __getattr__(self, item):
         if item == 'constructor':
             found = self.contract.constructor
-        elif item in self.contract.functions:
-            found = self.contract.functions[item]
         else:
-            raise AttributeError(item)
+            funcs = self.contract.functions
+            # In web3.py v6+, contract functions are exposed as attributes, not via __getitem__.
+            # Using getattr ensures we obtain the callable factory for the function; indexing may return None.
+            # Additionally, guard against unexpected None to fail fast with a clear error.
+            found = getattr(funcs, item, None)
+            if not callable(found):
+                raise AttributeError(f"Function '{item}' not found on contract {self._interface_name} at {self.address}")
         return self._wrapper(self.address, item, found)

     def __repr__(self):
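
The same lookup pattern in isolation, using the standard web3.py API where attribute access on contract.functions yields a ContractFunction factory (contract and owner_address are illustrative):

# Minimal sketch of the getattr-based lookup adopted above.
fn = getattr(contract.functions, 'balanceOf', None)
if not callable(fn):
    raise AttributeError('balanceOf')
balance = fn(owner_address).call()  # build the bound call, then execute it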

View File

@@ -10,3 +10,4 @@ from .ofac import OFAC, OFACAlerts
 from .accounting import Accounting, DbAccount
 from .vaultcreationrequest import VaultCreationRequest
 from .tos import TOSAcceptance
+from .sharedata import ShareData

View File

@@ -0,0 +1,12 @@
import logging
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from dexorder.database.model import Base
log = logging.getLogger(__name__)
class ShareData (Base):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
data: Mapped[dict] = mapped_column(JSONB)
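
A minimal usage sketch for the new model, assuming a plain SQLAlchemy session (the session wiring is illustrative, not this repo's actual setup):

from sqlalchemy.orm import Session

def save_share(session: Session, payload: dict) -> int:
    # Hedged example: persist an arbitrary JSON payload as a JSONB row.
    row = ShareData(data=payload)
    session.add(row)
    session.commit()
    return row.id  # autoincremented primary key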

View File

@@ -1,14 +1,69 @@
+import itertools
 import logging
 from contextlib import asynccontextmanager
 from contextvars import ContextVar

 import redis.asyncio as redis_async
 from redis.asyncio import Redis
+from redis.asyncio.client import Pipeline

 from dexorder import config

 log = logging.getLogger(__name__)

+BATCH_SIZE = 1_000
+
+
+class PipelineProxy:
+    def __init__(self, pipe: Pipeline):
+        self.pipe = pipe
+        self.ops = 0
+
+    async def push(self, num=1):
+        self.ops += num
+        if self.ops >= BATCH_SIZE:
+            self.ops = 0
+            await self.pipe.execute()
+
+    async def sadd(self, series, *keys):
+        while keys:
+            most = min(BATCH_SIZE - self.ops, len(keys))
+            assert most > 0
+            send = keys[:most]
+            keys = keys[most:]
+            await self.pipe.sadd(series, *send)
+            await self.push(len(send))
+
+    async def srem(self, series, *keys):
+        while keys:
+            most = min(BATCH_SIZE - self.ops, len(keys))
+            assert most > 0
+            send = keys[:most]
+            keys = keys[most:]
+            await self.pipe.srem(series, *send)
+            await self.push(len(send))
+
+    async def hset(self, series, *, mapping):
+        items = list(mapping.items())
+        while items:
+            most = min(BATCH_SIZE - self.ops, len(items))
+            assert most > 0
+            send = items[:most]
+            items = items[most:]
+            await self.pipe.hset(series, mapping={k: v for k, v in send})
+            await self.push(len(send))
+
+    async def hdel(self, series, *keys):
+        while keys:
+            most = min(BATCH_SIZE - self.ops, len(keys))
+            assert most > 0
+            send = keys[:most]
+            keys = keys[most:]
+            await self.pipe.hdel(series, *send)
+            await self.push(len(send))
+
+    def __getattr__(self, item):
+        return getattr(self.pipe, item)
+

 class Memcache:
     @staticmethod
@@ -19,7 +74,7 @@ class Memcache:
         # noinspection PyTypeChecker
         current_redis.set(pipe)
         try:
-            yield pipe
+            yield PipelineProxy(pipe)
             await pipe.execute()
         finally:
             current_redis.set(old_redis)
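
Usage sketch: callers of memcache.batch() now receive a PipelineProxy whose write helpers must be awaited; the series names and values below are illustrative:

# Hedged sketch: writes are flushed every BATCH_SIZE (1_000) queued commands,
# so a single Redis pipeline can no longer grow without bound.
async with memcache.batch(use_transaction=False) as r:
    await r.sadd('example|keys', *keys)        # chunked, auto-flushing sadd
    await r.hset('example|data', mapping=kvs)  # chunked, auto-flushing hset
# memcache.batch() executes any remaining buffered commands on exit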

View File

@@ -12,7 +12,7 @@ from dexorder.blockstate.blockdata import SeriesCollection, BlockData
 from dexorder.blockstate.diff import DiffEntryItem
 from dexorder.blockstate.fork import Fork
 from dexorder.blockstate.state import compress_diffs
-from dexorder.memcache import current_redis, memcache
+from dexorder.memcache import current_redis, memcache, PipelineProxy
 from dexorder.util import hexstr
 from dexorder.util.async_util import maywait
 from dexorder.util.json import json_encoder
@@ -40,8 +40,7 @@ class RedisState (SeriesCollection):
         for series in self.datas.keys():
             for k, v in state.iteritems(fork, series):
                 diffs.append(DiffItem(series, k, v))
-        # todo tim fix pubs
-        await self.save(fork, diffs, use_transaction=True, skip_pubs=True)  # use_transaction=False if the data is too big
+        await self.save(fork, diffs, use_transaction=False, skip_pubs=True)  # use_transaction=False if the data is too big

     # noinspection PyAsyncCall
@@ -92,18 +91,17 @@ class RedisState (SeriesCollection):
                     hsets[series][key] = value
                 else:
                     raise NotImplementedError

         async with memcache.batch(use_transaction) as r:
-            # Redis pipelines fill up before our state can be sent, so we cannot do this atomically.
-            # However, sending many individual calls is super slow, so we
-            r: Pipeline
+            r: PipelineProxy
             for series, keys in sadds.items():
-                r.sadd(series, *keys)
+                await r.sadd(series, *keys)
             for series, keys in sdels.items():
-                r.srem(series, *keys)
+                await r.srem(series, *keys)
             for series, kvs in hsets.items():
-                r.hset(series, mapping=kvs)
+                await r.hset(series, mapping=kvs)
             for series, keys in hdels.items():
-                r.hdel(series, *keys)
+                await r.hdel(series, *keys)

             block_series = f'{chain_id}|head'
             headstr = hexstr(fork.head)
             r.json(json_encoder).set(block_series,'$',[fork.height, headstr])

View File

@@ -394,8 +394,8 @@ class OHLCRepository:
         if trim > 0:
             updated = updated[trim:]
-        if len(updated) > 3:
-            log.debug(f'\tnew recents ({len(updated)}): {updated}')
+        # if len(updated) > 3:
+        #     log.debug(f'\tnew recents ({len(updated)}): {updated}')
         recent_ohlcs.setitem(key, updated)
         return updated

View File

@@ -3,7 +3,6 @@ from dataclasses import dataclass
 from typing import Optional, Union, Any
 from uuid import UUID

-from triton.profiler import deactivate
 from web3.exceptions import ContractPanicError, ContractLogicError
 from web3.types import EventData

View File

@@ -57,13 +57,13 @@ class OrderTriggers:
         self.order = order
         self.triggers = triggers
         OrderTriggers.instances[order.key] = self
-        log.debug(f'created OrderTriggers for {order.key}')
+        # log.debug(f'created OrderTriggers for {order.key}')

     def disable(self):
         for t in self.triggers:
             t.disable()
         del OrderTriggers.instances[self.order.key]
-        log.debug(f'disabled OrderTriggers for {self.order.key}')
+        # log.debug(f'disabled OrderTriggers for {self.order.key}')

     @property
     def closed(self):
@@ -223,7 +223,7 @@ async def has_funds(tk: TrancheKey):
     # log.debug(f'balances {balances}')
     token_addr = order.status.order.tokenIn
     token_balance = balances.get(token_addr)
-    log.debug(f'amount of {token_addr} = {token_balance}')
+    # log.debug(f'amount of {token_addr} = {token_balance}')
     if token_balance is None:
         # unknown balance
         token_balance = balances[token_addr] = await ERC20(token_addr).balanceOf(tk.vault)