accounts reconciliation; tosacceptance

This commit is contained in:
tim
2025-01-28 01:09:10 -04:00
parent adebbb833c
commit cda2446c0e
17 changed files with 296 additions and 74 deletions

View File

@@ -42,6 +42,7 @@ def upgrade() -> None:
op.create_index(op.f('ix_accounting_time'), 'accounting', ['time'], unique=False) op.create_index(op.f('ix_accounting_time'), 'accounting', ['time'], unique=False)
op.create_index(op.f('ix_accounting_token'), 'accounting', ['token'], unique=False) op.create_index(op.f('ix_accounting_token'), 'accounting', ['token'], unique=False)
op.create_index(op.f('ix_accounting_account'), 'accounting', ['account'], unique=False) op.create_index(op.f('ix_accounting_account'), 'accounting', ['account'], unique=False)
op.create_index(op.f('ix_accounting_chain_id'), 'accounting', ['chain_id'], unique=False)
op.create_table('ofac', op.create_table('ofac',
sa.Column('address', sa.String(), nullable=False), sa.Column('address', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('address') sa.PrimaryKeyConstraint('address')
@@ -63,31 +64,60 @@ def upgrade() -> None:
sa.PrimaryKeyConstraint('chain', 'owner', 'num') sa.PrimaryKeyConstraint('chain', 'owner', 'num')
) )
op.create_index('ix_vault_address_not_null', 'vaultcreationrequest', ['vault'], unique=False, postgresql_where='vault IS NOT NULL') op.create_index('ix_vault_address_not_null', 'vaultcreationrequest', ['vault'], unique=False, postgresql_where='vault IS NOT NULL')
op.create_table('accounts', op.create_table('account',
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('address', sa.String(), nullable=False), sa.Column('address', sa.String(), nullable=False),
sa.Column('kind', sa.Enum('Admin', 'OrderFee', 'GasFee', 'FillFee', 'Execution', name='accountkind'), nullable=False), sa.Column('kind', sa.Enum('Admin', 'OrderFee', 'GasFee', 'FillFee', 'Execution', name='accountkind'), nullable=False),
sa.Column('balances', dexorder.database.column_types.Balances(astext_type=sa.Text()), server_default='{}', nullable=False), sa.Column('balances', dexorder.database.column_types.Balances(astext_type=sa.Text()), server_default='{}', nullable=False),
sa.PrimaryKeyConstraint('chain', 'address') sa.PrimaryKeyConstraint('chain', 'address')
) )
op.create_index(op.f('ix_accounts_kind'), 'accounts', ['kind'], unique=False) op.create_index(op.f('ix_account_kind'), 'account', ['kind'], unique=False)
op.create_table('reconciliation',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('address', sa.String(), nullable=False),
sa.Column('accounting_id', sa.Integer(), nullable=False),
sa.Column('height', sa.Integer(), nullable=False),
sa.Column('balances', dexorder.database.column_types.Balances(astext_type=sa.Text()), server_default='{}', nullable=False),
sa.ForeignKeyConstraint(['chain', 'address'], ['account.chain', 'account.address'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_reconciliation_accounting_id'), 'reconciliation', ['accounting_id'], unique=False)
op.create_index(op.f('ix_reconciliation_address'), 'reconciliation', ['address'], unique=False)
op.create_index(op.f('ix_reconciliation_chain'), 'reconciliation', ['chain'], unique=False)
op.create_index(op.f('ix_reconciliation_height'), 'reconciliation', ['height'], unique=False)
op.create_table('tosacceptance',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('ipaddr', sa.String(), nullable=False),
sa.Column('time', sa.DateTime(), nullable=False),
sa.Column('version', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
def downgrade() -> None: def downgrade() -> None:
op.drop_index(op.f('ix_accounting_account'), table_name='accounting') op.drop_table('tosacceptance')
op.drop_index(op.f('ix_accounts_kind'), table_name='accounts') op.drop_index(op.f('ix_reconciliation_height'), table_name='reconciliation')
op.drop_table('accounts') op.drop_index(op.f('ix_reconciliation_chain'), table_name='reconciliation')
op.drop_index(op.f('ix_reconciliation_address'), table_name='reconciliation')
op.drop_index(op.f('ix_reconciliation_accounting_id'), table_name='reconciliation')
op.drop_table('reconciliation')
op.drop_index(op.f('ix_account_kind'), table_name='account')
op.drop_table('account')
op.drop_index('ix_vault_address_not_null', table_name='vaultcreationrequest', postgresql_where='vault IS NOT NULL') op.drop_index('ix_vault_address_not_null', table_name='vaultcreationrequest', postgresql_where='vault IS NOT NULL')
op.drop_table('vaultcreationrequest') op.drop_table('vaultcreationrequest')
op.drop_table('ofacalerts') op.drop_table('ofacalerts')
# op.execute('drop sequence ofacalerts_id_seq') # autoincrement sequence
op.drop_table('ofac') op.drop_table('ofac')
op.drop_index(op.f('ix_accounting_chain_id'), table_name='accounting')
op.drop_index(op.f('ix_accounting_account'), table_name='accounting')
op.drop_index(op.f('ix_accounting_token'), table_name='accounting') op.drop_index(op.f('ix_accounting_token'), table_name='accounting')
op.drop_index(op.f('ix_accounting_time'), table_name='accounting') op.drop_index(op.f('ix_accounting_time'), table_name='accounting')
op.drop_index(op.f('ix_accounting_subcategory'), table_name='accounting') op.drop_index(op.f('ix_accounting_subcategory'), table_name='accounting')
op.drop_index(op.f('ix_accounting_category'), table_name='accounting') op.drop_index(op.f('ix_accounting_category'), table_name='accounting')
op.drop_table('accounting') op.drop_table('accounting')
# op.execute('drop sequence accounting_id_seq') # autoincrement sequence
op.execute('drop type accountkind') # enum type op.execute('drop type accountkind') # enum type
op.execute('drop type accountingcategory') # enum type op.execute('drop type accountingcategory') # enum type
op.execute('drop type accountingsubcategory') # enum type op.execute('drop type accountingsubcategory') # enum type

View File

@@ -9,9 +9,9 @@ accounts = [
# '0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d', # 0x70997970C51812dc3A010C7d01b50e0d17dc79C8 # '0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d', # 0x70997970C51812dc3A010C7d01b50e0d17dc79C8
# '0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a', # 0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC # '0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a', # 0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC
# '0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6', # 0x90F79bf6EB2c4f870365E785982E1f101E93b906 # '0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6', # 0x90F79bf6EB2c4f870365E785982E1f101E93b906
'0x47e179ec197488593b187f80a00eb0da91f1b9d0b13f8733639f19c30a34926a', # 0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65 # '0x47e179ec197488593b187f80a00eb0da91f1b9d0b13f8733639f19c30a34926a', # 0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65
'0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba', # 0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc '0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba', # 0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc
# '0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e', # 0x976EA74026E726554dB657fA54763abd0C3a0aa9 '0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e', # 0x976EA74026E726554dB657fA54763abd0C3a0aa9
# '0x4bbbf85ce3377467afe5d46f804f221813b2bb87f24d81f60f1fcdbf7cbf4356', # 0x14dC79964da2C08b23698B3D3cc7Ca32193d9955 # '0x4bbbf85ce3377467afe5d46f804f221813b2bb87f24d81f60f1fcdbf7cbf4356', # 0x14dC79964da2C08b23698B3D3cc7Ca32193d9955
# '0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97', # 0x23618e81E3f5cdF7f54C3d65f7FBc0aBf5B21E8f # '0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97', # 0x23618e81E3f5cdF7f54C3d65f7FBc0aBf5B21E8f
# '0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6', # 0xa0Ee7A142d267C1f36714E4a8F75612F20a79720 # '0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6', # 0xa0Ee7A142d267C1f36714E4a8F75612F20a79720

View File

@@ -1,18 +1,19 @@
import asyncio import asyncio
import logging import logging
from sqlalchemy import select, func
from typing_extensions import Optional from typing_extensions import Optional
from web3.exceptions import ContractLogicError from web3.exceptions import ContractLogicError
from web3.types import EventData from web3.types import EventData
from dexorder import db, dec, NATIVE_TOKEN, from_timestamp, config, ADDRESS_0, now from dexorder import db, dec, NATIVE_TOKEN, from_timestamp, config, ADDRESS_0, now, Account
from dexorder.base import TransactionReceiptDict from dexorder.base import TransactionReceiptDict
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.blocks import get_block_timestamp from dexorder.blocks import get_block_timestamp, get_block, current_block
from dexorder.contract import ContractProxy
from dexorder.contract.dexorder import get_factory_contract, get_mirrorenv, get_mockenv from dexorder.contract.dexorder import get_factory_contract, get_mirrorenv, get_mockenv
from dexorder.database.model.accounting import AccountingSubcategory, Accounting, AccountingCategory, AccountKind, \ from dexorder.database.model.accounting import AccountingSubcategory, Accounting, AccountingCategory, AccountKind, \
Accounts DbAccount, Reconciliation
from dexorder.feemanager import FeeManager
from dexorder.pools import mark_to_market, pool_prices, get_pool, add_mark_pool, quotes from dexorder.pools import mark_to_market, pool_prices, get_pool, add_mark_pool, quotes
from dexorder.tokens import adjust_decimals as adj_dec, get_token, get_balance from dexorder.tokens import adjust_decimals as adj_dec, get_token, get_balance
from dexorder.util import hexstr from dexorder.util import hexstr
@@ -21,12 +22,9 @@ log = logging.getLogger(__name__)
accounting_initialized = False accounting_initialized = False
# noinspection PyTypeChecker
order_fee_account_addr: str = None class ReconciliationException(Exception):
# noinspection PyTypeChecker pass
gas_fee_account_addr: str = None
# noinspection PyTypeChecker
fill_fee_account_addr: str = None
async def initialize_accounting(): async def initialize_accounting():
@@ -39,16 +37,6 @@ async def initialize_accounting():
async def initialize_accounts(): async def initialize_accounts():
factory_contract = get_factory_contract()
implementation_address = await factory_contract.implementation()
fee_manager_address = await ContractProxy(implementation_address, 'IVaultImpl').feeManager()
fee_manager = ContractProxy(fee_manager_address, 'IFeeManager')
global order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr
order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr = await asyncio.gather(
fee_manager.orderFeeAccount(),
fee_manager.gasFeeAccount(),
fee_manager.fillFeeAccount()
)
# Since this is called by top-level main functions outside the Runner, we trigger an explicit db commit/rollback # Since this is called by top-level main functions outside the Runner, we trigger an explicit db commit/rollback
try: try:
# noinspection PyStatementEffect # noinspection PyStatementEffect
@@ -60,15 +48,15 @@ async def initialize_accounts():
async def initialize_accounts_2(): async def initialize_accounts_2():
of_account = ensure_account(order_fee_account_addr, AccountKind.OrderFee) fm = await FeeManager.get()
gf_account = ensure_account(gas_fee_account_addr, AccountKind.GasFee) of_account = ensure_account(fm.order_fee_account_addr, AccountKind.OrderFee)
ff_account = ensure_account(fill_fee_account_addr, AccountKind.FillFee) gf_account = ensure_account(fm.gas_fee_account_addr, AccountKind.GasFee)
ff_account = ensure_account(fm.fill_fee_account_addr, AccountKind.FillFee)
exe_accounts = [ensure_account(account.address, AccountKind.Execution) for account in Account.all()]
if current_chain.get().id in [1337, 31337]: if current_chain.get().id in [1337, 31337]:
log.debug('initializing debug accounts') log.debug('adjusting debug account balances')
await asyncio.gather( await asyncio.gather(
adjust_balance(of_account), *map(adjust_balance, (of_account, gf_account, ff_account, *exe_accounts))
adjust_balance(gf_account),
adjust_balance(ff_account),
) )
@@ -119,26 +107,29 @@ async def initialize_mark_to_market():
async def handle_feeaccountschanged(fee_accounts: EventData): async def handle_feeaccountschanged(fee_accounts: EventData):
try: try:
global order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr
order_fee_account_addr = fee_accounts['args']['orderFeeAccount'] order_fee_account_addr = fee_accounts['args']['orderFeeAccount']
gas_fee_account_addr = fee_accounts['args']['gasFeeAccount'] gas_fee_account_addr = fee_accounts['args']['gasFeeAccount']
fill_fee_account_addr = fee_accounts['args']['fillFeeAccount'] fill_fee_account_addr = fee_accounts['args']['fillFeeAccount']
await initialize_accounts_2()
except KeyError: except KeyError:
log.warning(f'Could not parse FeeAccountsChanged {fee_accounts}') log.warning(f'Could not parse FeeAccountsChanged {fee_accounts}')
return return
fm = await FeeManager.get()
fm.order_fee_account_addr = order_fee_account_addr
fm.gas_fee_account_addr = gas_fee_account_addr
fm.fill_fee_account_addr = fill_fee_account_addr
await initialize_accounts_2()
def ensure_account(addr: str, kind: AccountKind): def ensure_account(addr: str, kind: AccountKind) -> DbAccount:
chain = current_chain.get() chain = current_chain.get()
found = db.session.get(Accounts, (chain, addr)) found = db.session.get(DbAccount, (chain, addr))
if found: if found:
if found.kind != kind: if found.kind != kind:
log.warning(f'Account {addr} has wrong kind {found.kind} != {kind}') log.warning(f'Account {addr} has wrong kind {found.kind} != {kind}')
found.kind = kind found.kind = kind
db.session.add(found) db.session.add(found)
else: else:
found = Accounts(chain=chain, address=addr, kind=kind, balances={}) found = DbAccount(chain=chain, address=addr, kind=kind, balances={})
db.session.add(found) db.session.add(found)
return found return found
@@ -156,9 +147,10 @@ async def accounting_transfer(receipt: TransactionReceiptDict, token: str, sende
async def accounting_transaction_gas(receipt: TransactionReceiptDict, subcategory: AccountingSubcategory): async def accounting_transaction_gas(receipt: TransactionReceiptDict, subcategory: AccountingSubcategory):
""" Accounts for the gas spent on the given transaction """ """ Accounts for the gas spent on the given transaction """
amount = dec(receipt['gasUsed']) * dec(receipt['effectiveGasPrice'])
await add_accounting_row( receipt['from'], await add_accounting_row( receipt['from'],
hexstr(receipt['blockHash']), hexstr(receipt['transactionHash']), hexstr(receipt['blockHash']), hexstr(receipt['transactionHash']),
AccountingCategory.Expense, subcategory, NATIVE_TOKEN, -dec(receipt['gasUsed']) AccountingCategory.Expense, subcategory, NATIVE_TOKEN, -amount
) )
@@ -171,9 +163,10 @@ async def accounting_placement(order_placed: EventData):
except KeyError: except KeyError:
log.warning(f'Rogue DexorderPlacedEvent in tx {hexstr(tx_id)}') log.warning(f'Rogue DexorderPlacedEvent in tx {hexstr(tx_id)}')
return return
await add_accounting_row( order_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, fm = await FeeManager.get()
await add_accounting_row( fm.order_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.OrderFee, NATIVE_TOKEN, order_fee) AccountingSubcategory.OrderFee, NATIVE_TOKEN, order_fee)
await add_accounting_row( gas_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, await add_accounting_row( fm.gas_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.GasFee, NATIVE_TOKEN, gas_fee) AccountingSubcategory.GasFee, NATIVE_TOKEN, gas_fee)
@@ -181,8 +174,9 @@ async def accounting_fill(fill: EventData, out_token: str):
block_hash = hexstr(fill['blockHash']) block_hash = hexstr(fill['blockHash'])
tx_id = hexstr(fill['transactionHash']) tx_id = hexstr(fill['transactionHash'])
fee = int(fill['args']['fillFee']) fee = int(fill['args']['fillFee'])
await add_accounting_row(fill_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, fm = await FeeManager.get()
AccountingSubcategory.FillFee, out_token, fee, adjust_decimals=True) await add_accounting_row(fm.fill_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.FillFee, out_token, fee)
async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Optional[str], category, subcategory, token, amount, note=None, async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Optional[str], category, subcategory, token, amount, note=None,
@@ -201,15 +195,81 @@ async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Opt
chain_id=current_chain.get().id, tx_id=tx_id, chain_id=current_chain.get().id, tx_id=tx_id,
)) ))
# Adjust database account if it exists # Adjust database account if it exists
account_db = db.session.get(Accounts, (current_chain.get(), account)) account_db = db.session.get(DbAccount, (current_chain.get(), account))
if account_db is not None: if account_db is not None:
new_amount = account_db.balances.get(token,dec(0)) + amount new_amount = account_db.balances.get(token, dec(0)) + amount
if new_amount < 0: if new_amount < 0:
log.error(f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}') log.error(f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}')
account_db.balances[token] = new_amount account_db.balances[token] = new_amount
db.session.add(account_db) # deep changes would not be detected by the ORM db.session.add(account_db) # deep changes would not be detected by the ORM
else:
log.warning(f'No db account found for {account}')
async def adjust_balance(account: Accounts, token=NATIVE_TOKEN, subcategory=AccountingSubcategory.InitialBalance, note=None): async def adjust_balance(account: DbAccount, token=NATIVE_TOKEN, subcategory=AccountingSubcategory.InitialBalance, note=None):
true_balance = await get_balance(account.address, token) true_balance = await get_balance(account.address, token)
amount = true_balance - account.balances.get(token,dec(0)) amount = true_balance - account.balances.get(token, dec(0))
await add_accounting_row(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note) await add_accounting_row(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note, adjust_decimals=False)
async def reconcile(account: DbAccount, block_id: Optional[str] = None, last_accounting_row_id: Optional[int] = None):
# Fetch the latest reconciliation for the account
latest_recon = db.session.execute(
select(Reconciliation).where(
Reconciliation.account == account
).order_by(Reconciliation.accounting_id.desc()).limit(1)
).scalar_one_or_none()
first_accounting_row_id = latest_recon.accounting_id+1 if latest_recon else None
# Retrieve the end height corresponding to the block_id
block = await get_block(block_id) if block_id else current_block.get()
chain_id = current_chain.get().id
balances = dict(latest_recon.balances) if latest_recon else {}
# Retrieve all accounting rows for this account within the reconciliation period
accounting_query = select(Accounting).where(
Accounting.chain_id == chain_id,
Accounting.account == account.address,
)
if first_accounting_row_id is not None:
accounting_query = accounting_query.where(Accounting.id >= first_accounting_row_id)
if last_accounting_row_id is not None:
accounting_query = accounting_query.where(Accounting.id <= last_accounting_row_id)
accounting_query = accounting_query.order_by(Accounting.id)
accounting_rows = db.session.execute(accounting_query).scalars().all()
if last_accounting_row_id is None:
last_accounting_row_id = db.session.execute(select(func.max(Accounting.id))).scalar_one_or_none()
if last_accounting_row_id is None:
log.warning("No records found in the Accounting table")
return
# Update balances using accounting rows
for row in accounting_rows:
balances[row.token] = balances.get(row.token, dec(0)) + row.amount
# Verify balances with the stored DbAccount balances
for token, balance in balances.items():
db_balance = account.balances.get(token, dec(0))
if balance != db_balance:
raise ReconciliationException(
f"DB mismatch in balances for account {account.address} token {token}: accounting={balance} db={db_balance}"
)
on_chain_balance = await get_balance(account.address, token)
if balance != on_chain_balance:
raise ReconciliationException(
f"Blockchain mismatch for account {account.address} token {token}: accounting={balances[token]} on-chain={on_chain_balance}"
)
# Create a new reconciliation record
new_recon = Reconciliation(
chain = current_chain.get(),
address=account.address,
accounting_id=last_accounting_row_id,
height=block.height,
balances=balances,
)
db.session.add(new_recon)
db.session.commit()
log.info(f'reconciled account {account.address} at height {block.height}')

View File

@@ -18,7 +18,12 @@ log = logging.getLogger(__name__)
class Account (LocalAccount): class Account (LocalAccount):
_main_account = None _main_account = None
_pool = None _pool = None
_pool_count = 0 _all = []
@staticmethod
def all():
Account._init_pool()
return Account._all
@staticmethod @staticmethod
def get(): def get():
@@ -41,21 +46,19 @@ class Account (LocalAccount):
except asyncio.TimeoutError: except asyncio.TimeoutError:
log.error('waiting for an available account') log.error('waiting for an available account')
result = await Account._pool.get() result = await Account._pool.get()
Account._pool_count -= 1
return result return result
@staticmethod @staticmethod
def _init_pool(): def _init_pool():
if Account._pool is None: if Account._pool is None:
Account._pool = asyncio.Queue() Account._pool = asyncio.Queue()
Account._pool_count = 0
for key in config.accounts: for key in config.accounts:
local_account = eth_account.Account.from_key(key) local_account = eth_account.Account.from_key(key)
account = Account(local_account) account = Account(local_account)
if Account._main_account is None: if Account._main_account is None:
Account._main_account = account Account._main_account = account
Account._pool.put_nowait(account) Account._pool.put_nowait(account)
Account._pool_count += 1 Account._all.append(account)
def __init__(self, local_account: LocalAccount): # todo chain_id? def __init__(self, local_account: LocalAccount): # todo chain_id?
@@ -81,7 +84,6 @@ class Account (LocalAccount):
def release(self): def release(self):
Account._pool.put_nowait(self) Account._pool.put_nowait(self)
Account._pool_count += 1
def __str__(self): def __str__(self):
return self.address return self.address

View File

@@ -35,6 +35,9 @@ class Blockchain:
def __str__(self): def __str__(self):
return self.name return self.name
def __hash__(self):
return self.id
_instances_by_id = {} _instances_by_id = {}
_instances_by_name = {} _instances_by_name = {}

View File

@@ -0,0 +1,30 @@
import logging
from sqlalchemy import select
from dexorder import db, blockchain
from dexorder.accounting import reconcile
from dexorder.bin.executable import execute
from dexorder.blocks import current_block, fetch_latest_block
from dexorder.database.model import DbAccount
log = logging.getLogger(__name__)
async def main():
await blockchain.connect()
db.connect()
current_block.set(await fetch_latest_block())
try:
accounts = db.session.execute(select(DbAccount)).scalars().all()
for account in accounts:
await reconcile(account)
db.session.commit()
log.info('Reconciliation complete')
except:
db.session.rollback()
raise
if __name__ == '__main__':
execute(main())

View File

@@ -17,6 +17,7 @@ from dexorder import current_w3, config, db, Blockchain
from dexorder.base.block import Block, BlockInfo, latest_block from dexorder.base.block import Block, BlockInfo, latest_block
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.database.model import DbBlock from dexorder.database.model import DbBlock
from dexorder.util import hexbytes
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -35,6 +36,11 @@ class FetchLock:
self.exception = None self.exception = None
async def fetch_latest_block() -> Block:
blockdata = await current_w3.get().eth.get_block('latest')
return Block(current_chain.get(), blockdata)
async def _fetch(fetch: FetchLock, chain_id: int, block_id: Union[int,bytes]) -> Optional[Block]: async def _fetch(fetch: FetchLock, chain_id: int, block_id: Union[int,bytes]) -> Optional[Block]:
# try database first # try database first
if config.cache_blocks_in_db and db: if config.cache_blocks_in_db and db:
@@ -75,8 +81,10 @@ def cache_block(block: Block, confirmed=False):
confirmed=confirmed, data=block.data)) confirmed=confirmed, data=block.data))
async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block: async def get_block(block_id: Union[bytes,str,int], *, chain_id=None) -> Block:
# log.debug(f'get_block {block_id}') # log.debug(f'get_block {block_id}')
if type(block_id) is str:
block_id = hexbytes(block_id)
if chain_id is None: if chain_id is None:
chain_id = current_chain.get().id chain_id = current_chain.get().id

View File

@@ -21,11 +21,11 @@ log.info(f'Version: {version}')
chain_info = version['chainInfo'] chain_info = version['chainInfo']
for chain_id, info in chain_info.items(): for _chain_id, info in chain_info.items():
chain_id = int(chain_id) _chain_id = int(_chain_id)
_factory[chain_id] = ContractProxy(info['factory'], 'VaultFactory') _factory[_chain_id] = ContractProxy(info['factory'], 'VaultFactory')
_dexorder[chain_id] = ContractProxy(info['dexorder'], 'Dexorder') _dexorder[_chain_id] = ContractProxy(info['dexorder'], 'Dexorder')
_vault_init_code_hash[chain_id] = to_bytes(hexstr=info['vaultInitCodeHash']) _vault_init_code_hash[_chain_id] = to_bytes(hexstr=info['vaultInitCodeHash'])
def get_by_chain(d): def get_by_chain(d):
return d[current_chain.get().id] return d[current_chain.get().id]
@@ -66,3 +66,11 @@ def VaultContract(addr):
def DexorderContract(addr): def DexorderContract(addr):
return ContractProxy(addr, 'Dexorder') return ContractProxy(addr, 'Dexorder')
async def get_fee_manager_contract():
factory_contract = get_factory_contract()
implementation_address = await factory_contract.implementation()
fee_manager_address = await ContractProxy(implementation_address, 'IVaultImpl').feeManager()
fee_manager = ContractProxy(fee_manager_address, 'IFeeManager')
return fee_manager

View File

@@ -83,7 +83,7 @@ class DecimalNumeric (TypeDecorator):
return value return value
def process_result_value(self, value, dialect): def process_result_value(self, value, dialect):
return dec(value) return None if value is None else dec(value)
class Balances (TypeDecorator): class Balances (TypeDecorator):

View File

@@ -7,5 +7,6 @@ from .orderindex import OrderIndex
from .pool import Pool from .pool import Pool
from .token import Token from .token import Token
from .ofac import OFAC, OFACAlerts from .ofac import OFAC, OFACAlerts
from .accounting import Accounting from .accounting import Accounting, DbAccount
from .vaultcreationrequest import VaultCreationRequest from .vaultcreationrequest import VaultCreationRequest
from .tos import TOSAcceptance

View File

@@ -3,8 +3,9 @@ from datetime import datetime
from decimal import Decimal as dec from decimal import Decimal as dec
from enum import Enum from enum import Enum
from sqlalchemy import ForeignKey from sqlalchemy import ForeignKeyConstraint
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import Mapped, mapped_column, relationship
from typing_extensions import Optional from typing_extensions import Optional
from dexorder import now from dexorder import now
@@ -43,6 +44,7 @@ class AccountingSubcategory (Enum):
class Accounting (Base): class Accounting (Base):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
time: Mapped[datetime] = mapped_column(default=now(), index=True) time: Mapped[datetime] = mapped_column(default=now(), index=True)
chain_id: Mapped[int] = mapped_column(index=True)
account: Mapped[str] = mapped_column(index=True) account: Mapped[str] = mapped_column(index=True)
category: Mapped[AccountingCategory] = mapped_column(index=True) category: Mapped[AccountingCategory] = mapped_column(index=True)
subcategory: Mapped[Optional[AccountingSubcategory]] = mapped_column(index=True) subcategory: Mapped[Optional[AccountingSubcategory]] = mapped_column(index=True)
@@ -50,7 +52,6 @@ class Accounting (Base):
amount: Mapped[dec] = mapped_column(DecimalNumeric) amount: Mapped[dec] = mapped_column(DecimalNumeric)
value: Mapped[Optional[dec]] = mapped_column(DecimalNumeric) # USD value of the amount. If NULL then accounting has not been done for this row. value: Mapped[Optional[dec]] = mapped_column(DecimalNumeric) # USD value of the amount. If NULL then accounting has not been done for this row.
chain_id: Mapped[int]
tx_id: Mapped[Optional[str]] tx_id: Mapped[Optional[str]]
note: Mapped[Optional[str]] # format depends on the type of entry note: Mapped[Optional[str]] # format depends on the type of entry
@@ -62,9 +63,39 @@ class AccountKind (Enum):
Execution = 4 # spends gas Execution = 4 # spends gas
class Accounts(Base): class DbAccount(Base):
__tablename__ = "account"
chain: Mapped[Blockchain] = mapped_column(primary_key=True) chain: Mapped[Blockchain] = mapped_column(primary_key=True)
address: Mapped[str] = mapped_column(primary_key=True) address: Mapped[str] = mapped_column(primary_key=True)
kind: Mapped[AccountKind] = mapped_column(index=True) kind: Mapped[AccountKind] = mapped_column(index=True)
balances: Mapped[dict[str, dec]] = mapped_column(MutableDict.as_mutable(Balances), default=dict, server_default="{}")
reconciliations: Mapped[list["Reconciliation"]] = relationship(
"Reconciliation",
back_populates="account",
cascade="all, delete-orphan",
)
# records balance snapshots that have been verified by matching all three sources of balance records:
# 1. the on-chain balance as of the given block height (must be finalized)
# 2. the DbAccount row balance (as saved in the single DbAccount row)
# 3. the sum of all accounting rows for this address (check the summation of itemizations)
class Reconciliation (Base):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
chain: Mapped[Blockchain] = mapped_column(index=True)
address: Mapped[str] = mapped_column(index=True) # address of the Account we are reconciling
accounting_id: Mapped[int] = mapped_column(index=True) # ID of the last accounting row to be processed for this Reconciliation
height: Mapped[int] = mapped_column(index=True) # blockchain height
balances: Mapped[dict[str, dec]] = mapped_column(Balances, default=dict, server_default="{}") balances: Mapped[dict[str, dec]] = mapped_column(Balances, default=dict, server_default="{}")
account: Mapped[DbAccount] = relationship(
"DbAccount",
back_populates="reconciliations",
foreign_keys=[chain, address],
)
__table_args__ = (
ForeignKeyConstraint(
["chain", "address"], ["account.chain", "account.address"], ondelete="CASCADE"
),
)

View File

@@ -0,0 +1,13 @@
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column
from dexorder.database.model import Base
# We do not index this table since it is warehouse information and rarely if ever queried
class TOSAcceptance (Base):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
ipaddr: Mapped[str]
time: Mapped[datetime]
version: Mapped[datetime]

View File

@@ -0,0 +1,31 @@
import asyncio
import logging
from dexorder.contract import ContractProxy
from dexorder.contract.dexorder import get_factory_contract, get_fee_manager_contract
log = logging.getLogger(__name__)
class FeeManager (ContractProxy):
_instance: 'FeeManager' = None
@staticmethod
async def get():
if FeeManager._instance is None:
fee_manager = await get_fee_manager_contract()
order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr = await asyncio.gather(
fee_manager.orderFeeAccount(),
fee_manager.gasFeeAccount(),
fee_manager.fillFeeAccount()
)
FeeManager._instance = FeeManager(fee_manager.address, order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr)
return FeeManager._instance
def __init__(self, address, order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr):
super().__init__(address, 'IFeeManager')
self.order_fee_account_addr = order_fee_account_addr
self.gas_fee_account_addr = gas_fee_account_addr
self.fill_fee_account_addr = fill_fee_account_addr

View File

@@ -146,8 +146,11 @@ class OHLCFile:
t, o, c = self.cur t, o, c = self.cur
self.cur = t, o, max(o,c,price), min(o,c,price), price self.cur = t, o, max(o,c,price), min(o,c,price), price
else: else:
t, o, h, line, c = self.cur try:
self.cur = t, o, max(h,line,price), min(h,line,price), price t, o, h, line, c = self.cur
self.cur = t, o, max(h,line,price), min(h,line,price), price
except ValueError:
log.error(f'Could not unpack cur {self.cur}')
@staticmethod @staticmethod
def row_bytes(row): def row_bytes(row):

View File

@@ -12,7 +12,7 @@ from dexorder import Blockchain, db, current_pub, async_yield, current_w3, confi
from dexorder.base.block import Block, latest_block from dexorder.base.block import Block, latest_block
from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.base.chain import current_chain, current_clock, BlockClock
from dexorder.blockchain.connection import create_w3_ws, create_w3 from dexorder.blockchain.connection import create_w3_ws, create_w3
from dexorder.blocks import cache_block, get_block, promotion_height, current_block from dexorder.blocks import cache_block, get_block, promotion_height, current_block, fetch_latest_block
from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate import BlockState, current_blockstate
from dexorder.blockstate.branch import Branch from dexorder.blockstate.branch import Branch
from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.diff import DiffEntryItem
@@ -157,8 +157,7 @@ class BlockStateRunner(BlockProgressor):
# 'latest' polling for blocks, and we push the entire block to the queue since apparently this is the only # 'latest' polling for blocks, and we push the entire block to the queue since apparently this is the only
# rpc call Hardhat seems to consistently support. The worker must then detect the type of object pushed to the # rpc call Hardhat seems to consistently support. The worker must then detect the type of object pushed to the
# work queue and either use the block directly or query for the block if the queue object is a hashcode. # work queue and either use the block directly or query for the block if the queue object is a hashcode.
blockdata = await w3.eth.get_block('latest') block = await fetch_latest_block()
block = Block(chain.id, blockdata)
if block.hash == prev_blockhash and ( if block.hash == prev_blockhash and (
self.state is None or self.state.root_branch is None or self.state.height == block.height): self.state is None or self.state.root_branch is None or self.state.height == block.height):
return prev_blockhash return prev_blockhash

View File

@@ -15,6 +15,7 @@ from dexorder.metadata import get_metadata
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# noinspection PyShadowingNames
async def get_balance(addr, token_addr, *, adjust_decimals=True) -> dec: async def get_balance(addr, token_addr, *, adjust_decimals=True) -> dec:
if token_addr == NATIVE_TOKEN: if token_addr == NATIVE_TOKEN:
return await get_native_balance(addr, adjust_decimals=adjust_decimals) return await get_native_balance(addr, adjust_decimals=adjust_decimals)
@@ -22,6 +23,7 @@ async def get_balance(addr, token_addr, *, adjust_decimals=True) -> dec:
return await get_erc20_balance(addr, token_addr, adjust_decimals=adjust_decimals) return await get_erc20_balance(addr, token_addr, adjust_decimals=adjust_decimals)
# noinspection PyShadowingNames
async def get_erc20_balance(addr, token_addr, *, adjust_decimals=True): async def get_erc20_balance(addr, token_addr, *, adjust_decimals=True):
value = dec(await ERC20(token_addr).balanceOf(addr)) value = dec(await ERC20(token_addr).balanceOf(addr))
if adjust_decimals: if adjust_decimals:
@@ -30,6 +32,7 @@ async def get_erc20_balance(addr, token_addr, *, adjust_decimals=True):
return value return value
# noinspection PyShadowingNames
async def get_native_balance(addr, *, adjust_decimals=True) -> dec: async def get_native_balance(addr, *, adjust_decimals=True) -> dec:
value = dec(await current_w3.get().eth.get_balance(addr)) value = dec(await current_w3.get().eth.get_balance(addr))
if adjust_decimals: if adjust_decimals:

View File

@@ -101,7 +101,7 @@ async def create_and_send_transactions():
except asyncio.TimeoutError: except asyncio.TimeoutError:
account = None account = None
if account is None: if account is None:
log.error(f'No account available for transaction request type "{handler.tag}"') log.warning(f'No account available for job {job.id} type "{handler.tag}"')
continue continue
await ctx.sign(account) await ctx.sign(account)
log.info(f'servicing job {job.request.__class__.__name__} {job.id} with account {account.address} nonce {ctx.tx["nonce"]} tx {ctx.id}') log.info(f'servicing job {job.request.__class__.__name__} {job.id} with account {account.address} nonce {ctx.tx["nonce"]} tx {ctx.id}')