Compare commits

..

2 Commits

Author SHA1 Message Date
tim
b18eeb5069 mirror.py connection fix 2025-02-12 18:26:51 -04:00
tim
91973304e2 accounting_lock 2025-02-12 13:02:08 -04:00
9 changed files with 65 additions and 63 deletions

View File

@@ -1,7 +1,7 @@
metadata='metadata.json' # the Dockerfile will move metadata-finaldata.json into positon metadata='metadata.json' # the Dockerfile will move metadata-finaldata.json into positon
accounts = [ accounts = [
# dev account #6 # dev account #4
'0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e', # 0x976EA74026E726554dB657fA54763abd0C3a0aa9 '0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6', # 0x90F79bf6EB2c4f870365E785982E1f101E93b906
] ]
rpc_url = '${rpc_urls.arbsep_alchemy}' rpc_url = '${rpc_urls.arbsep_alchemy}'
mirror_source_rpc_url='${rpc_urls.arbsep_alchemy}' mirror_source_rpc_url='${rpc_urls.arbsep_alchemy}'

View File

@@ -33,32 +33,40 @@ class ReconciliationException(Exception):
pass pass
def accounting_lock():
"""
This must be called before accounting_*() calls are made.
"""
db.session.execute(text("LOCK TABLE account, accounting, reconciliation IN EXCLUSIVE MODE"))
async def initialize_accounting(): async def initialize_accounting():
global accounting_initialized global accounting_initialized
if not accounting_initialized: if not accounting_initialized:
await initialize_mark_to_market() # set up mark-to-market first, so accounts can value their initial balances accounting_lock()
await initialize_accounts() await _initialize_mark_to_market() # set up mark-to-market first, so accounts can value their initial balances
await _initialize_accounts()
accounting_initialized = True accounting_initialized = True
log.info(f'accounting initialized\n\tstablecoins: {config.stablecoins}\n\tquotecoins: {config.quotecoins}\n\tnativecoin: {config.nativecoin}') log.info(f'accounting initialized\n\tstablecoins: {config.stablecoins}\n\tquotecoins: {config.quotecoins}\n\tnativecoin: {config.nativecoin}')
async def initialize_accounts(): async def _initialize_accounts():
# Since this is called by top-level main functions outside the Runner, we trigger an explicit db commit/rollback # Since this is called by top-level main functions outside the Runner, we trigger an explicit db commit/rollback
try: try:
# noinspection PyStatementEffect # noinspection PyStatementEffect
await initialize_accounts_2() await _initialize_accounts_2()
db.session.commit() db.session.commit()
except: except:
db.session.rollback() db.session.rollback()
raise raise
async def initialize_accounts_2(): async def _initialize_accounts_2():
fm = await FeeManager.get() fm = await FeeManager.get()
of_account = ensure_account(fm.order_fee_account_addr, AccountKind.OrderFee) of_account = _ensure_account(fm.order_fee_account_addr, AccountKind.OrderFee)
gf_account = ensure_account(fm.gas_fee_account_addr, AccountKind.GasFee) gf_account = _ensure_account(fm.gas_fee_account_addr, AccountKind.GasFee)
ff_account = ensure_account(fm.fill_fee_account_addr, AccountKind.FillFee) ff_account = _ensure_account(fm.fill_fee_account_addr, AccountKind.FillFee)
exe_accounts = [ensure_account(account.address, AccountKind.Execution) for account in Account.all()] exe_accounts = [_ensure_account(account.address, AccountKind.Execution) for account in Account.all()]
if current_chain.get().id in [1337, 31337]: if current_chain.get().id in [1337, 31337]:
log.debug('adjusting debug account balances') log.debug('adjusting debug account balances')
await asyncio.gather( await asyncio.gather(
@@ -68,7 +76,7 @@ async def initialize_accounts_2():
_tracked_addrs.add(db_account.address) _tracked_addrs.add(db_account.address)
async def initialize_mark_to_market(): async def _initialize_mark_to_market():
quotes.clear() quotes.clear()
quotes.extend(config.stablecoins) quotes.extend(config.stablecoins)
quotes.extend(config.quotecoins) quotes.extend(config.quotecoins)
@@ -113,22 +121,7 @@ async def initialize_mark_to_market():
add_mark_pool(addr, pool['base'], pool['quote'], pool['fee']) add_mark_pool(addr, pool['base'], pool['quote'], pool['fee'])
async def handle_feeaccountschanged(fee_accounts: EventData): def _ensure_account(addr: str, kind: AccountKind) -> DbAccount:
try:
order_fee_account_addr = fee_accounts['args']['orderFeeAccount']
gas_fee_account_addr = fee_accounts['args']['gasFeeAccount']
fill_fee_account_addr = fee_accounts['args']['fillFeeAccount']
except KeyError:
log.warning(f'Could not parse FeeAccountsChanged {fee_accounts}')
return
fm = await FeeManager.get()
fm.order_fee_account_addr = order_fee_account_addr
fm.gas_fee_account_addr = gas_fee_account_addr
fm.fill_fee_account_addr = fill_fee_account_addr
await initialize_accounts_2()
def ensure_account(addr: str, kind: AccountKind) -> DbAccount:
chain = current_chain.get() chain = current_chain.get()
found = db.session.get(DbAccount, (chain, addr)) found = db.session.get(DbAccount, (chain, addr))
if found: if found:
@@ -144,6 +137,21 @@ def ensure_account(addr: str, kind: AccountKind) -> DbAccount:
return found return found
async def handle_feeaccountschanged(fee_accounts: EventData):
try:
order_fee_account_addr = fee_accounts['args']['orderFeeAccount']
gas_fee_account_addr = fee_accounts['args']['gasFeeAccount']
fill_fee_account_addr = fee_accounts['args']['fillFeeAccount']
except KeyError:
log.warning(f'Could not parse FeeAccountsChanged {fee_accounts}')
return
fm = await FeeManager.get()
fm.order_fee_account_addr = order_fee_account_addr
fm.gas_fee_account_addr = gas_fee_account_addr
fm.fill_fee_account_addr = fill_fee_account_addr
await _initialize_accounts_2()
async def accounting_transfer(receipt: TransactionReceiptDict, token: str, async def accounting_transfer(receipt: TransactionReceiptDict, token: str,
sender: str, receiver: str, amount: Union[dec,int], adjust_decimals=True): sender: str, receiver: str, amount: Union[dec,int], adjust_decimals=True):
block_hash = hexstr(receipt['blockHash']) block_hash = hexstr(receipt['blockHash'])
@@ -224,10 +232,7 @@ async def adjust_balance(account: DbAccount, token=NATIVE_TOKEN, subcategory=Acc
await add_accounting_row(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note, adjust_decimals=False) await add_accounting_row(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note, adjust_decimals=False)
async def reconcile(account: DbAccount, block_id: Optional[str] = None, last_accounting_row_id: Optional[int] = None): async def accounting_reconcile(account: DbAccount, block_id: Optional[str] = None, last_accounting_row_id: Optional[int] = None):
# First we lock all the relevant tables to ensure consistency
db.session.execute(text("LOCK TABLE account, accounting, reconciliation IN EXCLUSIVE MODE"))
# Fetch the latest reconciliation for the account # Fetch the latest reconciliation for the account
latest_recon = db.session.execute( latest_recon = db.session.execute(
select(Reconciliation).where( select(Reconciliation).where(

View File

@@ -39,7 +39,7 @@ class Account (LocalAccount):
MUST call account.release() after the transaction has completed, to return this Account to the available pool. MUST call account.release() after the transaction has completed, to return this Account to the available pool.
""" """
Account._init_pool() Account._init_pool()
log.debug(f'available accounts: {Account._pool.qsize()}') # log.debug(f'available accounts: {Account._pool.qsize()}')
try: try:
async with asyncio.timeout(1): async with asyncio.timeout(1):
result = await Account._pool.get() result = await Account._pool.get()

View File

@@ -144,35 +144,28 @@ async def main():
tokens = set(i[1] for i in pool_infos).union(i[2] for i in pool_infos) tokens = set(i[1] for i in pool_infos).union(i[2] for i in pool_infos)
log.debug(f'Mirroring tokens') log.debug(f'Mirroring tokens')
txs = []
for t in tokens: for t in tokens:
# noinspection PyBroadException # noinspection PyBroadException
try: try:
info = await get_token_info(t) info = await get_token_info(t)
# anvil had trouble estimating the gas, so we hardcode it. # anvil had trouble estimating the gas, so we hardcode it.
tx = await mirrorenv.transact.mirrorToken(info, gas=1_000_000) tx = await mirrorenv.transact.mirrorToken(info, gas=1_000_000)
txs.append(tx.wait()) await tx.wait()
except Exception: except Exception:
log.exception(f'Failed to mirror token {t}') log.exception(f'Failed to mirror token {t}')
exit(1) exit(1)
results = await asyncio.gather(*txs)
if any(result['status'] != 1 for result in results):
log.error('Mirroring a token reverted.')
exit(1)
log.info(f'Tokens deployed') log.info(f'Tokens deployed')
log.debug(f'Mirroring pools {", ".join(pools)}') log.debug(f'Mirroring pools {", ".join(pools)}')
txs = []
for pool, info in zip(pools, pool_infos): for pool, info in zip(pools, pool_infos):
# noinspection PyBroadException # noinspection PyBroadException
try: try:
# anvil had trouble estimating the gas, so we hardcode it. # anvil had trouble estimating the gas, so we hardcode it.
tx = await mirrorenv.transact.mirrorPool(info, gas=5_500_000) tx = await mirrorenv.transact.mirrorPool(info, gas=5_500_000)
await tx.wait()
except Exception: except Exception:
log.exception(f'Failed to mirror pool {pool}') log.exception(f'Failed to mirror pool {pool}')
exit(1) exit(1)
txs.append(tx.wait())
await asyncio.gather(*txs)
log.info('Pools deployed') log.info('Pools deployed')
mirror_pool_list = [] mirror_pool_list = []

View File

@@ -3,7 +3,7 @@ import logging
from sqlalchemy import select from sqlalchemy import select
from dexorder import db, blockchain from dexorder import db, blockchain
from dexorder.accounting import reconcile from dexorder.accounting import accounting_reconcile, accounting_lock
from dexorder.bin.executable import execute from dexorder.bin.executable import execute
from dexorder.blocks import fetch_latest_block, current_block from dexorder.blocks import fetch_latest_block, current_block
from dexorder.database.model import DbAccount from dexorder.database.model import DbAccount
@@ -15,10 +15,11 @@ async def main():
db.connect() db.connect()
block = await fetch_latest_block() block = await fetch_latest_block()
current_block.set(block) current_block.set(block)
accounting_lock()
try: try:
accounts = db.session.execute(select(DbAccount)).scalars().all() accounts = db.session.execute(select(DbAccount)).scalars().all()
for account in accounts: for account in accounts:
await reconcile(account) await accounting_reconcile(account)
db.session.commit() db.session.commit()
log.info('Reconciliation complete') log.info('Reconciliation complete')
except: except:

View File

@@ -9,7 +9,6 @@ from aiohttp import ClientResponseError, ClientSession, ClientTimeout, TCPConnec
from eth_typing import URI from eth_typing import URI
from hexbytes import HexBytes from hexbytes import HexBytes
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
from web3.exceptions import Web3Exception
from web3.middleware.signing import async_construct_sign_and_send_raw_middleware from web3.middleware.signing import async_construct_sign_and_send_raw_middleware
from web3.types import RPCEndpoint, RPCResponse from web3.types import RPCEndpoint, RPCResponse
@@ -17,7 +16,6 @@ from .. import current_w3, Blockchain, config, Account, NARG
from ..base.chain import current_chain from ..base.chain import current_chain
from ..contract import get_contract_data from ..contract import get_contract_data
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -64,7 +62,8 @@ async def create_w3(rpc_url: Union[str,list[str]]=None, account=NARG, autosign=T
http_provider = RetryHTTPProvider(url) http_provider = RetryHTTPProvider(url)
await http_provider.cache_async_session(session) await http_provider.cache_async_session(session)
w3 = AsyncWeb3(http_provider) w3 = AsyncWeb3(http_provider)
w3.middleware_onion.add(archive_intercept_middleware, 'block_number_intercept_middleware') if archive_urls:
w3.middleware_onion.add(archive_intercept_middleware, 'block_number_intercept_middleware')
w3.middleware_onion.remove('attrdict') w3.middleware_onion.remove('attrdict')
w3.middleware_onion.add(clean_input_async, 'clean_input') w3.middleware_onion.add(clean_input_async, 'clean_input')
w3.eth.Contract = _make_contract(w3.eth) w3.eth.Contract = _make_contract(w3.eth)
@@ -86,7 +85,7 @@ async def create_w3(rpc_url: Union[str,list[str]]=None, account=NARG, autosign=T
chain_ids = await asyncio.gather(*(w3.eth.chain_id for w3 in itertools.chain(w3_instances, archive_instances))) chain_ids = await asyncio.gather(*(w3.eth.chain_id for w3 in itertools.chain(w3_instances, archive_instances)))
if len(set(chain_ids)) != 1: if len(set(chain_ids)) != 1:
raise RuntimeError("All RPC URLs must belong to the same blockchain") raise RuntimeError("All RPC URLs must belong to the same blockchain")
# noinspection PyTypeChecker # noinspection PyTypeChecker
return RoundRobinWebProxy(w3_instances, archive_instances) if len(w3_instances) > 1 or archive_instances else w3_instances[0] return RoundRobinWebProxy(w3_instances, archive_instances) if len(w3_instances) > 1 or archive_instances else w3_instances[0]

View File

@@ -8,7 +8,6 @@ from web3.types import TxReceipt, TxData
from dexorder import current_w3, Account from dexorder import current_w3, Account
from dexorder.blocks import current_block from dexorder.blocks import current_block
from dexorder.blockstate.fork import current_fork
from dexorder.util import hexstr from dexorder.util import hexstr
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -19,10 +18,11 @@ class ContractTransaction:
# This is the standard RPC transaction dictionary # This is the standard RPC transaction dictionary
self.tx = tx self.tx = tx
# These three fields are populated only after signing # These fields are populated only after signing
self.id_bytes: Optional[bytes] = None self.id_bytes: Optional[bytes] = None
self.id: Optional[str] = None self.id: Optional[str] = None
self.data: Optional[bytes] = None self.data: Optional[bytes] = None
self.account: Optional[Account] = None
# This field is populated only after the transaction has been mined # This field is populated only after the transaction has been mined
self.receipt: Optional[TxReceipt] = None # todo could be multiple receipts for different branches! self.receipt: Optional[TxReceipt] = None # todo could be multiple receipts for different branches!
@@ -33,6 +33,7 @@ class ContractTransaction:
async def wait(self) -> TxReceipt: async def wait(self) -> TxReceipt:
if self.receipt is None: if self.receipt is None:
self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id) self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id)
self.account.release()
return self.receipt return self.receipt
async def sign(self, account: Account): async def sign(self, account: Account):
@@ -42,6 +43,7 @@ class ContractTransaction:
self.data = signed['rawTransaction'] self.data = signed['rawTransaction']
self.id_bytes = signed['hash'] self.id_bytes = signed['hash']
self.id = hexstr(self.id_bytes) self.id = hexstr(self.id_bytes)
self.account = account
class DeployTransaction (ContractTransaction): class DeployTransaction (ContractTransaction):
@@ -74,13 +76,13 @@ def call_wrapper(addr, name, func):
def transact_wrapper(addr, name, func): def transact_wrapper(addr, name, func):
async def f(*args, **kwargs): async def f(*args, **kwargs):
tx = await func(*args).build_transaction(kwargs)
ct = ContractTransaction(tx)
account = await Account.acquire()
if account is None:
raise ValueError(f'No account to sign transaction {addr}.{name}()')
await ct.sign(account)
try: try:
tx = await func(*args).build_transaction(kwargs)
ct = ContractTransaction(tx)
account = Account.get()
if account is None:
raise ValueError(f'No account to sign transaction {addr}.{name}()')
await ct.sign(account)
tx_id = await current_w3.get().eth.send_raw_transaction(ct.data) tx_id = await current_w3.get().eth.send_raw_transaction(ct.data)
assert tx_id == ct.id_bytes assert tx_id == ct.id_bytes
return ct return ct

View File

@@ -4,7 +4,8 @@ import logging
from web3.types import EventData from web3.types import EventData
from dexorder import db, metric from dexorder import db, metric
from dexorder.accounting import accounting_fill, accounting_placement, accounting_transfer, is_tracked_address from dexorder.accounting import accounting_fill, accounting_placement, accounting_transfer, is_tracked_address, \
accounting_lock
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheKey, OrderKey from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import SwapOrderState from dexorder.base.orderlib import SwapOrderState
@@ -31,6 +32,7 @@ def dump_log(eventlog):
def init(): def init():
new_pool_prices.clear() new_pool_prices.clear()
start_trigger_updates() start_trigger_updates()
accounting_lock()
async def handle_order_placed(event: EventData): async def handle_order_placed(event: EventData):

View File

@@ -121,15 +121,15 @@ async def end_trigger_updates():
PriceLineTrigger.end_updates(current_clock.get().timestamp) PriceLineTrigger.end_updates(current_clock.get().timestamp)
while _dirty: while _dirty:
tk = _dirty.pop() tk = _dirty.pop()
log.debug(f'check dirty tranche {tk}') # log.debug(f'check dirty tranche {tk}')
if _trigger_state.get(tk,0) == 0: if _trigger_state.get(tk,0) == 0:
# all clear for execution. add to active list with any necessary proofs # all clear for execution. add to active list with any necessary proofs
active_tranches[tk] = PriceProof(0) active_tranches[tk] = PriceProof(0)
log.debug(f'active tranche {tk}') # log.debug(f'active tranche {tk}')
else: else:
# blocked by one or more triggers being False (nonzero mask) # blocked by one or more triggers being False (nonzero mask)
reason = ', '.join(t.name for t in TrancheTrigger.all[tk].blocking_triggers) reason = ', '.join(t.name for t in TrancheTrigger.all[tk].blocking_triggers)
log.debug(f'tranche {tk} blocked by {reason}') # log.debug(f'tranche {tk} blocked by {reason}')
# check expiry constraint # check expiry constraint
try: try:
TrancheTrigger.all[tk].check_expire() TrancheTrigger.all[tk].check_expire()
@@ -418,7 +418,7 @@ class PriceLineTrigger (Trigger):
if self.inverted: if self.inverted:
price = 1/price price = 1/price
self.last_price = price self.last_price = price
log.debug(f'price trigger {price}') # log.debug(f'price trigger {price}')
if self not in PriceLineTrigger.triggers_set: if self not in PriceLineTrigger.triggers_set:
self.add_computation(price) self.add_computation(price)
else: else:
@@ -449,8 +449,8 @@ class PriceLineTrigger (Trigger):
line_value = m * time + b line_value = m * time + b
price_diff = sign * (y - line_value) price_diff = sign * (y - line_value)
activated = price_diff > 0 activated = price_diff > 0
for price, line, s, a, diff in zip(y, line_value, sign, activated, price_diff): # for price, line, s, a, diff in zip(y, line_value, sign, activated, price_diff):
log.debug(f'price: {line} {"<" if s == 1 else ">"} {price} {a} ({diff:+})') # log.debug(f'price: {line} {"<" if s == 1 else ">"} {price} {a} ({diff:+})')
for t, activated in zip(PriceLineTrigger.triggers, activated): for t, activated in zip(PriceLineTrigger.triggers, activated):
t.handle_result(activated) t.handle_result(activated)
PriceLineTrigger.clear_data() PriceLineTrigger.clear_data()