Compare commits

...

2 Commits

Author SHA1 Message Date
tim
b18eeb5069 mirror.py connection fix 2025-02-12 18:26:51 -04:00
tim
91973304e2 accounting_lock 2025-02-12 13:02:08 -04:00
9 changed files with 65 additions and 63 deletions

View File

@@ -1,7 +1,7 @@
metadata='metadata.json' # the Dockerfile will move metadata-finaldata.json into positon
accounts = [
# dev account #6
'0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e', # 0x976EA74026E726554dB657fA54763abd0C3a0aa9
# dev account #4
'0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6', # 0x90F79bf6EB2c4f870365E785982E1f101E93b906
]
rpc_url = '${rpc_urls.arbsep_alchemy}'
mirror_source_rpc_url='${rpc_urls.arbsep_alchemy}'

View File

@@ -33,32 +33,40 @@ class ReconciliationException(Exception):
pass
def accounting_lock():
"""
This must be called before accounting_*() calls are made.
"""
db.session.execute(text("LOCK TABLE account, accounting, reconciliation IN EXCLUSIVE MODE"))
async def initialize_accounting():
global accounting_initialized
if not accounting_initialized:
await initialize_mark_to_market() # set up mark-to-market first, so accounts can value their initial balances
await initialize_accounts()
accounting_lock()
await _initialize_mark_to_market() # set up mark-to-market first, so accounts can value their initial balances
await _initialize_accounts()
accounting_initialized = True
log.info(f'accounting initialized\n\tstablecoins: {config.stablecoins}\n\tquotecoins: {config.quotecoins}\n\tnativecoin: {config.nativecoin}')
async def initialize_accounts():
async def _initialize_accounts():
# Since this is called by top-level main functions outside the Runner, we trigger an explicit db commit/rollback
try:
# noinspection PyStatementEffect
await initialize_accounts_2()
await _initialize_accounts_2()
db.session.commit()
except:
db.session.rollback()
raise
async def initialize_accounts_2():
async def _initialize_accounts_2():
fm = await FeeManager.get()
of_account = ensure_account(fm.order_fee_account_addr, AccountKind.OrderFee)
gf_account = ensure_account(fm.gas_fee_account_addr, AccountKind.GasFee)
ff_account = ensure_account(fm.fill_fee_account_addr, AccountKind.FillFee)
exe_accounts = [ensure_account(account.address, AccountKind.Execution) for account in Account.all()]
of_account = _ensure_account(fm.order_fee_account_addr, AccountKind.OrderFee)
gf_account = _ensure_account(fm.gas_fee_account_addr, AccountKind.GasFee)
ff_account = _ensure_account(fm.fill_fee_account_addr, AccountKind.FillFee)
exe_accounts = [_ensure_account(account.address, AccountKind.Execution) for account in Account.all()]
if current_chain.get().id in [1337, 31337]:
log.debug('adjusting debug account balances')
await asyncio.gather(
@@ -68,7 +76,7 @@ async def initialize_accounts_2():
_tracked_addrs.add(db_account.address)
async def initialize_mark_to_market():
async def _initialize_mark_to_market():
quotes.clear()
quotes.extend(config.stablecoins)
quotes.extend(config.quotecoins)
@@ -113,22 +121,7 @@ async def initialize_mark_to_market():
add_mark_pool(addr, pool['base'], pool['quote'], pool['fee'])
async def handle_feeaccountschanged(fee_accounts: EventData):
try:
order_fee_account_addr = fee_accounts['args']['orderFeeAccount']
gas_fee_account_addr = fee_accounts['args']['gasFeeAccount']
fill_fee_account_addr = fee_accounts['args']['fillFeeAccount']
except KeyError:
log.warning(f'Could not parse FeeAccountsChanged {fee_accounts}')
return
fm = await FeeManager.get()
fm.order_fee_account_addr = order_fee_account_addr
fm.gas_fee_account_addr = gas_fee_account_addr
fm.fill_fee_account_addr = fill_fee_account_addr
await initialize_accounts_2()
def ensure_account(addr: str, kind: AccountKind) -> DbAccount:
def _ensure_account(addr: str, kind: AccountKind) -> DbAccount:
chain = current_chain.get()
found = db.session.get(DbAccount, (chain, addr))
if found:
@@ -144,6 +137,21 @@ def ensure_account(addr: str, kind: AccountKind) -> DbAccount:
return found
async def handle_feeaccountschanged(fee_accounts: EventData):
try:
order_fee_account_addr = fee_accounts['args']['orderFeeAccount']
gas_fee_account_addr = fee_accounts['args']['gasFeeAccount']
fill_fee_account_addr = fee_accounts['args']['fillFeeAccount']
except KeyError:
log.warning(f'Could not parse FeeAccountsChanged {fee_accounts}')
return
fm = await FeeManager.get()
fm.order_fee_account_addr = order_fee_account_addr
fm.gas_fee_account_addr = gas_fee_account_addr
fm.fill_fee_account_addr = fill_fee_account_addr
await _initialize_accounts_2()
async def accounting_transfer(receipt: TransactionReceiptDict, token: str,
sender: str, receiver: str, amount: Union[dec,int], adjust_decimals=True):
block_hash = hexstr(receipt['blockHash'])
@@ -224,10 +232,7 @@ async def adjust_balance(account: DbAccount, token=NATIVE_TOKEN, subcategory=Acc
await add_accounting_row(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note, adjust_decimals=False)
async def reconcile(account: DbAccount, block_id: Optional[str] = None, last_accounting_row_id: Optional[int] = None):
# First we lock all the relevant tables to ensure consistency
db.session.execute(text("LOCK TABLE account, accounting, reconciliation IN EXCLUSIVE MODE"))
async def accounting_reconcile(account: DbAccount, block_id: Optional[str] = None, last_accounting_row_id: Optional[int] = None):
# Fetch the latest reconciliation for the account
latest_recon = db.session.execute(
select(Reconciliation).where(

View File

@@ -39,7 +39,7 @@ class Account (LocalAccount):
MUST call account.release() after the transaction has completed, to return this Account to the available pool.
"""
Account._init_pool()
log.debug(f'available accounts: {Account._pool.qsize()}')
# log.debug(f'available accounts: {Account._pool.qsize()}')
try:
async with asyncio.timeout(1):
result = await Account._pool.get()

View File

@@ -144,35 +144,28 @@ async def main():
tokens = set(i[1] for i in pool_infos).union(i[2] for i in pool_infos)
log.debug(f'Mirroring tokens')
txs = []
for t in tokens:
# noinspection PyBroadException
try:
info = await get_token_info(t)
# anvil had trouble estimating the gas, so we hardcode it.
tx = await mirrorenv.transact.mirrorToken(info, gas=1_000_000)
txs.append(tx.wait())
await tx.wait()
except Exception:
log.exception(f'Failed to mirror token {t}')
exit(1)
results = await asyncio.gather(*txs)
if any(result['status'] != 1 for result in results):
log.error('Mirroring a token reverted.')
exit(1)
log.info(f'Tokens deployed')
log.debug(f'Mirroring pools {", ".join(pools)}')
txs = []
for pool, info in zip(pools, pool_infos):
# noinspection PyBroadException
try:
# anvil had trouble estimating the gas, so we hardcode it.
tx = await mirrorenv.transact.mirrorPool(info, gas=5_500_000)
await tx.wait()
except Exception:
log.exception(f'Failed to mirror pool {pool}')
exit(1)
txs.append(tx.wait())
await asyncio.gather(*txs)
log.info('Pools deployed')
mirror_pool_list = []

View File

@@ -3,7 +3,7 @@ import logging
from sqlalchemy import select
from dexorder import db, blockchain
from dexorder.accounting import reconcile
from dexorder.accounting import accounting_reconcile, accounting_lock
from dexorder.bin.executable import execute
from dexorder.blocks import fetch_latest_block, current_block
from dexorder.database.model import DbAccount
@@ -15,10 +15,11 @@ async def main():
db.connect()
block = await fetch_latest_block()
current_block.set(block)
accounting_lock()
try:
accounts = db.session.execute(select(DbAccount)).scalars().all()
for account in accounts:
await reconcile(account)
await accounting_reconcile(account)
db.session.commit()
log.info('Reconciliation complete')
except:

View File

@@ -9,7 +9,6 @@ from aiohttp import ClientResponseError, ClientSession, ClientTimeout, TCPConnec
from eth_typing import URI
from hexbytes import HexBytes
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
from web3.exceptions import Web3Exception
from web3.middleware.signing import async_construct_sign_and_send_raw_middleware
from web3.types import RPCEndpoint, RPCResponse
@@ -17,7 +16,6 @@ from .. import current_w3, Blockchain, config, Account, NARG
from ..base.chain import current_chain
from ..contract import get_contract_data
log = logging.getLogger(__name__)
@@ -64,7 +62,8 @@ async def create_w3(rpc_url: Union[str,list[str]]=None, account=NARG, autosign=T
http_provider = RetryHTTPProvider(url)
await http_provider.cache_async_session(session)
w3 = AsyncWeb3(http_provider)
w3.middleware_onion.add(archive_intercept_middleware, 'block_number_intercept_middleware')
if archive_urls:
w3.middleware_onion.add(archive_intercept_middleware, 'block_number_intercept_middleware')
w3.middleware_onion.remove('attrdict')
w3.middleware_onion.add(clean_input_async, 'clean_input')
w3.eth.Contract = _make_contract(w3.eth)
@@ -86,7 +85,7 @@ async def create_w3(rpc_url: Union[str,list[str]]=None, account=NARG, autosign=T
chain_ids = await asyncio.gather(*(w3.eth.chain_id for w3 in itertools.chain(w3_instances, archive_instances)))
if len(set(chain_ids)) != 1:
raise RuntimeError("All RPC URLs must belong to the same blockchain")
# noinspection PyTypeChecker
return RoundRobinWebProxy(w3_instances, archive_instances) if len(w3_instances) > 1 or archive_instances else w3_instances[0]

View File

@@ -8,7 +8,6 @@ from web3.types import TxReceipt, TxData
from dexorder import current_w3, Account
from dexorder.blocks import current_block
from dexorder.blockstate.fork import current_fork
from dexorder.util import hexstr
log = logging.getLogger(__name__)
@@ -19,10 +18,11 @@ class ContractTransaction:
# This is the standard RPC transaction dictionary
self.tx = tx
# These three fields are populated only after signing
# These fields are populated only after signing
self.id_bytes: Optional[bytes] = None
self.id: Optional[str] = None
self.data: Optional[bytes] = None
self.account: Optional[Account] = None
# This field is populated only after the transaction has been mined
self.receipt: Optional[TxReceipt] = None # todo could be multiple receipts for different branches!
@@ -33,6 +33,7 @@ class ContractTransaction:
async def wait(self) -> TxReceipt:
if self.receipt is None:
self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id)
self.account.release()
return self.receipt
async def sign(self, account: Account):
@@ -42,6 +43,7 @@ class ContractTransaction:
self.data = signed['rawTransaction']
self.id_bytes = signed['hash']
self.id = hexstr(self.id_bytes)
self.account = account
class DeployTransaction (ContractTransaction):
@@ -74,13 +76,13 @@ def call_wrapper(addr, name, func):
def transact_wrapper(addr, name, func):
async def f(*args, **kwargs):
tx = await func(*args).build_transaction(kwargs)
ct = ContractTransaction(tx)
account = await Account.acquire()
if account is None:
raise ValueError(f'No account to sign transaction {addr}.{name}()')
await ct.sign(account)
try:
tx = await func(*args).build_transaction(kwargs)
ct = ContractTransaction(tx)
account = Account.get()
if account is None:
raise ValueError(f'No account to sign transaction {addr}.{name}()')
await ct.sign(account)
tx_id = await current_w3.get().eth.send_raw_transaction(ct.data)
assert tx_id == ct.id_bytes
return ct

View File

@@ -4,7 +4,8 @@ import logging
from web3.types import EventData
from dexorder import db, metric
from dexorder.accounting import accounting_fill, accounting_placement, accounting_transfer, is_tracked_address
from dexorder.accounting import accounting_fill, accounting_placement, accounting_transfer, is_tracked_address, \
accounting_lock
from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import SwapOrderState
@@ -31,6 +32,7 @@ def dump_log(eventlog):
def init():
new_pool_prices.clear()
start_trigger_updates()
accounting_lock()
async def handle_order_placed(event: EventData):

View File

@@ -121,15 +121,15 @@ async def end_trigger_updates():
PriceLineTrigger.end_updates(current_clock.get().timestamp)
while _dirty:
tk = _dirty.pop()
log.debug(f'check dirty tranche {tk}')
# log.debug(f'check dirty tranche {tk}')
if _trigger_state.get(tk,0) == 0:
# all clear for execution. add to active list with any necessary proofs
active_tranches[tk] = PriceProof(0)
log.debug(f'active tranche {tk}')
# log.debug(f'active tranche {tk}')
else:
# blocked by one or more triggers being False (nonzero mask)
reason = ', '.join(t.name for t in TrancheTrigger.all[tk].blocking_triggers)
log.debug(f'tranche {tk} blocked by {reason}')
# log.debug(f'tranche {tk} blocked by {reason}')
# check expiry constraint
try:
TrancheTrigger.all[tk].check_expire()
@@ -418,7 +418,7 @@ class PriceLineTrigger (Trigger):
if self.inverted:
price = 1/price
self.last_price = price
log.debug(f'price trigger {price}')
# log.debug(f'price trigger {price}')
if self not in PriceLineTrigger.triggers_set:
self.add_computation(price)
else:
@@ -449,8 +449,8 @@ class PriceLineTrigger (Trigger):
line_value = m * time + b
price_diff = sign * (y - line_value)
activated = price_diff > 0
for price, line, s, a, diff in zip(y, line_value, sign, activated, price_diff):
log.debug(f'price: {line} {"<" if s == 1 else ">"} {price} {a} ({diff:+})')
# for price, line, s, a, diff in zip(y, line_value, sign, activated, price_diff):
# log.debug(f'price: {line} {"<" if s == 1 else ">"} {price} {a} ({diff:+})')
for t, activated in zip(PriceLineTrigger.triggers, activated):
t.handle_result(activated)
PriceLineTrigger.clear_data()