Compare commits

...

7 Commits

Author SHA1 Message Date
tim
4936150c3b bugfixes; startall works 2025-12-09 2025-12-09 15:11:58 -04:00
tim
88057607d5 put app back on app.dexorder.com and corp site on dexorder.com with www redirecting to apex 2025-05-19 15:19:20 -04:00
tim
36d0a863c6 remove spammy debug logs 2025-05-07 16:02:37 -04:00
tim
89ce46793e dotcom 2025-05-06 13:56:05 -04:00
tim
2bcf5d043c redis pipeline overflow fix 2025-04-23 15:20:00 -04:00
tim
71942d5b8f memcache init doesn't use transaction 2025-04-23 14:13:58 -04:00
tim
ef44973646 sharedata 2025-04-23 12:51:14 -04:00
11 changed files with 144 additions and 26 deletions

View File

@@ -0,0 +1,30 @@
"""sharedata
Revision ID: e47d1bca4b3d
Revises: 509010f13e8b
Create Date: 2025-04-23 11:23:10.809341
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = 'e47d1bca4b3d'
down_revision: Union[str, None] = '509010f13e8b'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table('sharedata',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint('id')
)
def downgrade() -> None:
op.drop_table('sharedata')

View File

@@ -42,10 +42,12 @@ class Account (LocalAccount):
# log.debug(f'available accounts: {Account._pool.qsize()}')
try:
async with asyncio.timeout(1):
result = await Account._pool.get()
result: "Account" = await Account._pool.get()
except asyncio.TimeoutError:
log.error('waiting for an available account')
result = await Account._pool.get()
# mark as out of pool
result._in_pool = False
metric.account_available.set(Account._pool.qsize())
return result
@@ -59,17 +61,20 @@ class Account (LocalAccount):
if Account._main_account is None:
Account._main_account = account
Account._pool.put_nowait(account)
account._in_pool = True # this account is now in the pool
Account._all.append(account)
metric.account_available.set(Account._pool.qsize())
metric.account_total.set(len(Account._all))
log.info(f'Account pool {[a.address for a in Account._all]}')
def __init__(self, local_account: LocalAccount): # todo chain_id?
super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code
def __init__(self, local_account: LocalAccount): # todo chain_id?
super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code
self.chain_id = current_chain.get().id
self.signing_middleware = construct_sign_and_send_raw_middleware(self)
self._nonce: Optional[int] = None
self.tx_id: Optional[str] = None # current transaction id
# release() idempotency tracking
self._in_pool: bool = False
async def next_nonce(self):
if self._nonce is None:
@@ -86,8 +91,21 @@ class Account (LocalAccount):
return current_w3.get().eth.get_balance(self.address)
def release(self):
metric.account_available.set(Account._pool.qsize() + 1)
"""
Return this Account to the pool.
Idempotent: calling release() multiple times without a new acquire()
will only enqueue the account once.
"""
# If we're already in the pool, do nothing.
if self._in_pool:
# Optional debug log; comment out if too noisy.
# log.debug(f'Account {self.address} already in pool; ignoring extra release()')
return
Account._pool.put_nowait(self)
self._in_pool = True
metric.account_available.set(Account._pool.qsize())
def __str__(self):
return self.address

View File

@@ -95,7 +95,7 @@ async def write_metadata( pools, mirror_pools ):
pool_dicts = [get_pool(addr) for (addr,_inverted) in mirror_pools]
pool_dicts = await asyncio.gather(*pool_dicts)
for data, addr, (_,inverted) in zip(pool_dicts, pools, mirror_pools):
data['x'] = dict(data=dict(uri=f'https://app.dexorder.trade/ohlc/', chain=42161, symbol=addr, inverted=inverted))
data['x'] = dict(data=dict(uri=f'https://app.dexorder.com/ohlc/', chain=42161, symbol=addr, inverted=inverted))
tokens = set(p['base'] for p in pool_dicts)
tokens.update(p['quote'] for p in pool_dicts)
tokens = await asyncio.gather(*[get_token(t) for t in tokens])

View File

@@ -33,7 +33,8 @@ class ContractTransaction:
async def wait(self) -> TxReceipt:
if self.receipt is None:
self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id)
self.account.release()
if self.account is not None:
self.account.release()
return self.receipt
async def sign(self, account: Account):
@@ -153,10 +154,14 @@ class ContractProxy:
def __getattr__(self, item):
if item == 'constructor':
found = self.contract.constructor
elif item in self.contract.functions:
found = self.contract.functions[item]
else:
raise AttributeError(item)
funcs = self.contract.functions
# In web3.py v6+, contract functions are exposed as attributes, not via __getitem__.
# Using getattr ensures we obtain the callable factory for the function; indexing may return None.
# Additionally, guard against unexpected None to fail fast with a clear error.
found = getattr(funcs, item, None)
if not callable(found):
raise AttributeError(f"Function '{item}' not found on contract {self._interface_name} at {self.address}")
return self._wrapper(self.address, item, found)
def __repr__(self):

View File

@@ -10,3 +10,4 @@ from .ofac import OFAC, OFACAlerts
from .accounting import Accounting, DbAccount
from .vaultcreationrequest import VaultCreationRequest
from .tos import TOSAcceptance
from .sharedata import ShareData

View File

@@ -0,0 +1,12 @@
import logging
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from dexorder.database.model import Base
log = logging.getLogger(__name__)
class ShareData (Base):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
data: Mapped[dict] = mapped_column(JSONB)

View File

@@ -1,14 +1,69 @@
import itertools
import logging
from contextlib import asynccontextmanager
from contextvars import ContextVar
import redis.asyncio as redis_async
from redis.asyncio import Redis
from redis.asyncio.client import Pipeline
from dexorder import config
log = logging.getLogger(__name__)
BATCH_SIZE = 1_000
class PipelineProxy:
def __init__(self, pipe: Pipeline):
self.pipe = pipe
self.ops = 0
async def push(self, num=1):
self.ops += num
if self.ops >= BATCH_SIZE:
self.ops = 0
await self.pipe.execute()
async def sadd(self, series, *keys):
while keys:
most = min(BATCH_SIZE-self.ops, len(keys))
assert most > 0
send = keys[:most]
keys = keys[most:]
await self.pipe.sadd(series, *send)
await self.push(len(send))
async def srem(self, series, *keys):
while keys:
most = min(BATCH_SIZE-self.ops, len(keys))
assert most > 0
send = keys[:most]
keys = keys[most:]
await self.pipe.srem(series, *send)
await self.push(len(send))
async def hset(self, series, *, mapping):
items = list(mapping.items())
while items:
most = min(BATCH_SIZE-self.ops, len(items))
assert most > 0
send = items[:most]
items = items[most:]
await self.pipe.hset(series, mapping={k:v for k,v in send})
await self.push(len(send))
async def hdel(self, series, *keys):
while keys:
most = min(BATCH_SIZE-self.ops, len(keys))
assert most > 0
send = keys[:most]
keys = keys[most:]
await self.pipe.hdel(series, *send)
await self.push(len(send))
def __getattr__(self, item):
return getattr(self.pipe, item)
class Memcache:
@staticmethod
@@ -19,7 +74,7 @@ class Memcache:
# noinspection PyTypeChecker
current_redis.set(pipe)
try:
yield pipe
yield PipelineProxy(pipe)
await pipe.execute()
finally:
current_redis.set(old_redis)

View File

@@ -12,7 +12,7 @@ from dexorder.blockstate.blockdata import SeriesCollection, BlockData
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.fork import Fork
from dexorder.blockstate.state import compress_diffs
from dexorder.memcache import current_redis, memcache
from dexorder.memcache import current_redis, memcache, PipelineProxy
from dexorder.util import hexstr
from dexorder.util.async_util import maywait
from dexorder.util.json import json_encoder
@@ -40,8 +40,7 @@ class RedisState (SeriesCollection):
for series in self.datas.keys():
for k, v in state.iteritems(fork, series):
diffs.append(DiffItem(series, k, v))
# todo tim fix pubs
await self.save(fork, diffs, use_transaction=True, skip_pubs=True) # use_transaction=False if the data is too big
await self.save(fork, diffs, use_transaction=False, skip_pubs=True) # use_transaction=False if the data is too big
# noinspection PyAsyncCall
@@ -92,18 +91,17 @@ class RedisState (SeriesCollection):
hsets[series][key] = value
else:
raise NotImplementedError
async with memcache.batch(use_transaction) as r:
# Redis pipelines fill up before our state can be sent, so we cannot do this atomically.
# However, sending many individual calls is super slow, so we
r: Pipeline
r: PipelineProxy
for series, keys in sadds.items():
r.sadd(series, *keys)
await r.sadd(series, *keys)
for series, keys in sdels.items():
r.srem(series, *keys)
await r.srem(series, *keys)
for series, kvs in hsets.items():
r.hset(series, mapping=kvs)
await r.hset(series, mapping=kvs)
for series, keys in hdels.items():
r.hdel(series, *keys)
await r.hdel(series, *keys)
block_series = f'{chain_id}|head'
headstr = hexstr(fork.head)
r.json(json_encoder).set(block_series,'$',[fork.height, headstr])

View File

@@ -394,8 +394,8 @@ class OHLCRepository:
if trim > 0:
updated = updated[trim:]
if len(updated) > 3:
log.debug(f'\tnew recents ({len(updated)}): {updated}')
# if len(updated) > 3:
# log.debug(f'\tnew recents ({len(updated)}): {updated}')
recent_ohlcs.setitem(key, updated)
return updated

View File

@@ -3,7 +3,6 @@ from dataclasses import dataclass
from typing import Optional, Union, Any
from uuid import UUID
from triton.profiler import deactivate
from web3.exceptions import ContractPanicError, ContractLogicError
from web3.types import EventData

View File

@@ -57,13 +57,13 @@ class OrderTriggers:
self.order = order
self.triggers = triggers
OrderTriggers.instances[order.key] = self
log.debug(f'created OrderTriggers for {order.key}')
# log.debug(f'created OrderTriggers for {order.key}')
def disable(self):
for t in self.triggers:
t.disable()
del OrderTriggers.instances[self.order.key]
log.debug(f'disabled OrderTriggers for {self.order.key}')
# log.debug(f'disabled OrderTriggers for {self.order.key}')
@property
def closed(self):
@@ -223,7 +223,7 @@ async def has_funds(tk: TrancheKey):
# log.debug(f'balances {balances}')
token_addr = order.status.order.tokenIn
token_balance = balances.get(token_addr)
log.debug(f'amount of {token_addr} = {token_balance}')
# log.debug(f'amount of {token_addr} = {token_balance}')
if token_balance is None:
# unknown balance
token_balance = balances[token_addr] = await ERC20(token_addr).balanceOf(tk.vault)