diff --git a/alembic/versions/509010f13e8b_accounting_vaultcreation_ofac.py b/alembic/versions/509010f13e8b_accounting_vaultcreation_ofac.py index d55a1df..0791584 100644 --- a/alembic/versions/509010f13e8b_accounting_vaultcreation_ofac.py +++ b/alembic/versions/509010f13e8b_accounting_vaultcreation_ofac.py @@ -42,6 +42,7 @@ def upgrade() -> None: op.create_index(op.f('ix_accounting_time'), 'accounting', ['time'], unique=False) op.create_index(op.f('ix_accounting_token'), 'accounting', ['token'], unique=False) op.create_index(op.f('ix_accounting_account'), 'accounting', ['account'], unique=False) + op.create_index(op.f('ix_accounting_chain_id'), 'accounting', ['chain_id'], unique=False) op.create_table('ofac', sa.Column('address', sa.String(), nullable=False), sa.PrimaryKeyConstraint('address') @@ -63,31 +64,60 @@ def upgrade() -> None: sa.PrimaryKeyConstraint('chain', 'owner', 'num') ) op.create_index('ix_vault_address_not_null', 'vaultcreationrequest', ['vault'], unique=False, postgresql_where='vault IS NOT NULL') - op.create_table('accounts', + op.create_table('account', sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), sa.Column('address', sa.String(), nullable=False), sa.Column('kind', sa.Enum('Admin', 'OrderFee', 'GasFee', 'FillFee', 'Execution', name='accountkind'), nullable=False), sa.Column('balances', dexorder.database.column_types.Balances(astext_type=sa.Text()), server_default='{}', nullable=False), sa.PrimaryKeyConstraint('chain', 'address') ) - op.create_index(op.f('ix_accounts_kind'), 'accounts', ['kind'], unique=False) + op.create_index(op.f('ix_account_kind'), 'account', ['kind'], unique=False) + + op.create_table('reconciliation', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), + sa.Column('address', sa.String(), nullable=False), + sa.Column('accounting_id', sa.Integer(), nullable=False), + sa.Column('height', sa.Integer(), nullable=False), + sa.Column('balances', dexorder.database.column_types.Balances(astext_type=sa.Text()), server_default='{}', nullable=False), + sa.ForeignKeyConstraint(['chain', 'address'], ['account.chain', 'account.address'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_reconciliation_accounting_id'), 'reconciliation', ['accounting_id'], unique=False) + op.create_index(op.f('ix_reconciliation_address'), 'reconciliation', ['address'], unique=False) + op.create_index(op.f('ix_reconciliation_chain'), 'reconciliation', ['chain'], unique=False) + op.create_index(op.f('ix_reconciliation_height'), 'reconciliation', ['height'], unique=False) + + op.create_table('tosacceptance', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('ipaddr', sa.String(), nullable=False), + sa.Column('time', sa.DateTime(), nullable=False), + sa.Column('version', sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + def downgrade() -> None: - op.drop_index(op.f('ix_accounting_account'), table_name='accounting') - op.drop_index(op.f('ix_accounts_kind'), table_name='accounts') - op.drop_table('accounts') + op.drop_table('tosacceptance') + op.drop_index(op.f('ix_reconciliation_height'), table_name='reconciliation') + op.drop_index(op.f('ix_reconciliation_chain'), table_name='reconciliation') + op.drop_index(op.f('ix_reconciliation_address'), table_name='reconciliation') + op.drop_index(op.f('ix_reconciliation_accounting_id'), table_name='reconciliation') + op.drop_table('reconciliation') + op.drop_index(op.f('ix_account_kind'), table_name='account') + op.drop_table('account') op.drop_index('ix_vault_address_not_null', table_name='vaultcreationrequest', postgresql_where='vault IS NOT NULL') op.drop_table('vaultcreationrequest') op.drop_table('ofacalerts') - # op.execute('drop sequence ofacalerts_id_seq') # autoincrement sequence op.drop_table('ofac') + op.drop_index(op.f('ix_accounting_chain_id'), table_name='accounting') + op.drop_index(op.f('ix_accounting_account'), table_name='accounting') op.drop_index(op.f('ix_accounting_token'), table_name='accounting') op.drop_index(op.f('ix_accounting_time'), table_name='accounting') op.drop_index(op.f('ix_accounting_subcategory'), table_name='accounting') op.drop_index(op.f('ix_accounting_category'), table_name='accounting') op.drop_table('accounting') - # op.execute('drop sequence accounting_id_seq') # autoincrement sequence op.execute('drop type accountkind') # enum type op.execute('drop type accountingcategory') # enum type op.execute('drop type accountingsubcategory') # enum type diff --git a/conf/mock/.secret-mock.toml b/conf/mock/.secret-mock.toml index 3b60898..8646d35 100644 --- a/conf/mock/.secret-mock.toml +++ b/conf/mock/.secret-mock.toml @@ -9,9 +9,9 @@ accounts = [ # '0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d', # 0x70997970C51812dc3A010C7d01b50e0d17dc79C8 # '0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a', # 0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC # '0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6', # 0x90F79bf6EB2c4f870365E785982E1f101E93b906 - '0x47e179ec197488593b187f80a00eb0da91f1b9d0b13f8733639f19c30a34926a', # 0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65 +# '0x47e179ec197488593b187f80a00eb0da91f1b9d0b13f8733639f19c30a34926a', # 0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65 '0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba', # 0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc -# '0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e', # 0x976EA74026E726554dB657fA54763abd0C3a0aa9 + '0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e', # 0x976EA74026E726554dB657fA54763abd0C3a0aa9 # '0x4bbbf85ce3377467afe5d46f804f221813b2bb87f24d81f60f1fcdbf7cbf4356', # 0x14dC79964da2C08b23698B3D3cc7Ca32193d9955 # '0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97', # 0x23618e81E3f5cdF7f54C3d65f7FBc0aBf5B21E8f # '0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6', # 0xa0Ee7A142d267C1f36714E4a8F75612F20a79720 diff --git a/src/dexorder/accounting.py b/src/dexorder/accounting.py index 90cf006..58158f6 100644 --- a/src/dexorder/accounting.py +++ b/src/dexorder/accounting.py @@ -1,18 +1,19 @@ import asyncio import logging +from sqlalchemy import select, func from typing_extensions import Optional from web3.exceptions import ContractLogicError from web3.types import EventData -from dexorder import db, dec, NATIVE_TOKEN, from_timestamp, config, ADDRESS_0, now +from dexorder import db, dec, NATIVE_TOKEN, from_timestamp, config, ADDRESS_0, now, Account from dexorder.base import TransactionReceiptDict from dexorder.base.chain import current_chain -from dexorder.blocks import get_block_timestamp -from dexorder.contract import ContractProxy +from dexorder.blocks import get_block_timestamp, get_block, current_block from dexorder.contract.dexorder import get_factory_contract, get_mirrorenv, get_mockenv from dexorder.database.model.accounting import AccountingSubcategory, Accounting, AccountingCategory, AccountKind, \ - Accounts + DbAccount, Reconciliation +from dexorder.feemanager import FeeManager from dexorder.pools import mark_to_market, pool_prices, get_pool, add_mark_pool, quotes from dexorder.tokens import adjust_decimals as adj_dec, get_token, get_balance from dexorder.util import hexstr @@ -21,12 +22,9 @@ log = logging.getLogger(__name__) accounting_initialized = False -# noinspection PyTypeChecker -order_fee_account_addr: str = None -# noinspection PyTypeChecker -gas_fee_account_addr: str = None -# noinspection PyTypeChecker -fill_fee_account_addr: str = None + +class ReconciliationException(Exception): + pass async def initialize_accounting(): @@ -39,16 +37,6 @@ async def initialize_accounting(): async def initialize_accounts(): - factory_contract = get_factory_contract() - implementation_address = await factory_contract.implementation() - fee_manager_address = await ContractProxy(implementation_address, 'IVaultImpl').feeManager() - fee_manager = ContractProxy(fee_manager_address, 'IFeeManager') - global order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr - order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr = await asyncio.gather( - fee_manager.orderFeeAccount(), - fee_manager.gasFeeAccount(), - fee_manager.fillFeeAccount() - ) # Since this is called by top-level main functions outside the Runner, we trigger an explicit db commit/rollback try: # noinspection PyStatementEffect @@ -60,15 +48,15 @@ async def initialize_accounts(): async def initialize_accounts_2(): - of_account = ensure_account(order_fee_account_addr, AccountKind.OrderFee) - gf_account = ensure_account(gas_fee_account_addr, AccountKind.GasFee) - ff_account = ensure_account(fill_fee_account_addr, AccountKind.FillFee) + fm = await FeeManager.get() + of_account = ensure_account(fm.order_fee_account_addr, AccountKind.OrderFee) + gf_account = ensure_account(fm.gas_fee_account_addr, AccountKind.GasFee) + ff_account = ensure_account(fm.fill_fee_account_addr, AccountKind.FillFee) + exe_accounts = [ensure_account(account.address, AccountKind.Execution) for account in Account.all()] if current_chain.get().id in [1337, 31337]: - log.debug('initializing debug accounts') + log.debug('adjusting debug account balances') await asyncio.gather( - adjust_balance(of_account), - adjust_balance(gf_account), - adjust_balance(ff_account), + *map(adjust_balance, (of_account, gf_account, ff_account, *exe_accounts)) ) @@ -119,26 +107,29 @@ async def initialize_mark_to_market(): async def handle_feeaccountschanged(fee_accounts: EventData): try: - global order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr order_fee_account_addr = fee_accounts['args']['orderFeeAccount'] gas_fee_account_addr = fee_accounts['args']['gasFeeAccount'] fill_fee_account_addr = fee_accounts['args']['fillFeeAccount'] - await initialize_accounts_2() except KeyError: log.warning(f'Could not parse FeeAccountsChanged {fee_accounts}') return + fm = await FeeManager.get() + fm.order_fee_account_addr = order_fee_account_addr + fm.gas_fee_account_addr = gas_fee_account_addr + fm.fill_fee_account_addr = fill_fee_account_addr + await initialize_accounts_2() -def ensure_account(addr: str, kind: AccountKind): +def ensure_account(addr: str, kind: AccountKind) -> DbAccount: chain = current_chain.get() - found = db.session.get(Accounts, (chain, addr)) + found = db.session.get(DbAccount, (chain, addr)) if found: if found.kind != kind: log.warning(f'Account {addr} has wrong kind {found.kind} != {kind}') found.kind = kind db.session.add(found) else: - found = Accounts(chain=chain, address=addr, kind=kind, balances={}) + found = DbAccount(chain=chain, address=addr, kind=kind, balances={}) db.session.add(found) return found @@ -156,9 +147,10 @@ async def accounting_transfer(receipt: TransactionReceiptDict, token: str, sende async def accounting_transaction_gas(receipt: TransactionReceiptDict, subcategory: AccountingSubcategory): """ Accounts for the gas spent on the given transaction """ + amount = dec(receipt['gasUsed']) * dec(receipt['effectiveGasPrice']) await add_accounting_row( receipt['from'], hexstr(receipt['blockHash']), hexstr(receipt['transactionHash']), - AccountingCategory.Expense, subcategory, NATIVE_TOKEN, -dec(receipt['gasUsed']) + AccountingCategory.Expense, subcategory, NATIVE_TOKEN, -amount ) @@ -171,9 +163,10 @@ async def accounting_placement(order_placed: EventData): except KeyError: log.warning(f'Rogue DexorderPlacedEvent in tx {hexstr(tx_id)}') return - await add_accounting_row( order_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, + fm = await FeeManager.get() + await add_accounting_row( fm.order_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, AccountingSubcategory.OrderFee, NATIVE_TOKEN, order_fee) - await add_accounting_row( gas_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, + await add_accounting_row( fm.gas_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, AccountingSubcategory.GasFee, NATIVE_TOKEN, gas_fee) @@ -181,8 +174,9 @@ async def accounting_fill(fill: EventData, out_token: str): block_hash = hexstr(fill['blockHash']) tx_id = hexstr(fill['transactionHash']) fee = int(fill['args']['fillFee']) - await add_accounting_row(fill_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, - AccountingSubcategory.FillFee, out_token, fee, adjust_decimals=True) + fm = await FeeManager.get() + await add_accounting_row(fm.fill_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, + AccountingSubcategory.FillFee, out_token, fee) async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Optional[str], category, subcategory, token, amount, note=None, @@ -201,15 +195,81 @@ async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Opt chain_id=current_chain.get().id, tx_id=tx_id, )) # Adjust database account if it exists - account_db = db.session.get(Accounts, (current_chain.get(), account)) + account_db = db.session.get(DbAccount, (current_chain.get(), account)) if account_db is not None: - new_amount = account_db.balances.get(token,dec(0)) + amount + new_amount = account_db.balances.get(token, dec(0)) + amount if new_amount < 0: log.error(f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}') account_db.balances[token] = new_amount db.session.add(account_db) # deep changes would not be detected by the ORM + else: + log.warning(f'No db account found for {account}') -async def adjust_balance(account: Accounts, token=NATIVE_TOKEN, subcategory=AccountingSubcategory.InitialBalance, note=None): +async def adjust_balance(account: DbAccount, token=NATIVE_TOKEN, subcategory=AccountingSubcategory.InitialBalance, note=None): true_balance = await get_balance(account.address, token) - amount = true_balance - account.balances.get(token,dec(0)) - await add_accounting_row(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note) + amount = true_balance - account.balances.get(token, dec(0)) + await add_accounting_row(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note, adjust_decimals=False) + + +async def reconcile(account: DbAccount, block_id: Optional[str] = None, last_accounting_row_id: Optional[int] = None): + # Fetch the latest reconciliation for the account + latest_recon = db.session.execute( + select(Reconciliation).where( + Reconciliation.account == account + ).order_by(Reconciliation.accounting_id.desc()).limit(1) + ).scalar_one_or_none() + first_accounting_row_id = latest_recon.accounting_id+1 if latest_recon else None + + # Retrieve the end height corresponding to the block_id + block = await get_block(block_id) if block_id else current_block.get() + chain_id = current_chain.get().id + + balances = dict(latest_recon.balances) if latest_recon else {} + + # Retrieve all accounting rows for this account within the reconciliation period + accounting_query = select(Accounting).where( + Accounting.chain_id == chain_id, + Accounting.account == account.address, + ) + if first_accounting_row_id is not None: + accounting_query = accounting_query.where(Accounting.id >= first_accounting_row_id) + if last_accounting_row_id is not None: + accounting_query = accounting_query.where(Accounting.id <= last_accounting_row_id) + accounting_query = accounting_query.order_by(Accounting.id) + accounting_rows = db.session.execute(accounting_query).scalars().all() + + if last_accounting_row_id is None: + last_accounting_row_id = db.session.execute(select(func.max(Accounting.id))).scalar_one_or_none() + if last_accounting_row_id is None: + log.warning("No records found in the Accounting table") + return + + # Update balances using accounting rows + for row in accounting_rows: + balances[row.token] = balances.get(row.token, dec(0)) + row.amount + + # Verify balances with the stored DbAccount balances + for token, balance in balances.items(): + db_balance = account.balances.get(token, dec(0)) + if balance != db_balance: + raise ReconciliationException( + f"DB mismatch in balances for account {account.address} token {token}: accounting={balance} db={db_balance}" + ) + on_chain_balance = await get_balance(account.address, token) + if balance != on_chain_balance: + raise ReconciliationException( + f"Blockchain mismatch for account {account.address} token {token}: accounting={balances[token]} on-chain={on_chain_balance}" + ) + + # Create a new reconciliation record + new_recon = Reconciliation( + chain = current_chain.get(), + address=account.address, + accounting_id=last_accounting_row_id, + height=block.height, + balances=balances, + ) + db.session.add(new_recon) + db.session.commit() + log.info(f'reconciled account {account.address} at height {block.height}') + diff --git a/src/dexorder/base/account.py b/src/dexorder/base/account.py index b3660c1..a0ca521 100644 --- a/src/dexorder/base/account.py +++ b/src/dexorder/base/account.py @@ -18,7 +18,12 @@ log = logging.getLogger(__name__) class Account (LocalAccount): _main_account = None _pool = None - _pool_count = 0 + _all = [] + + @staticmethod + def all(): + Account._init_pool() + return Account._all @staticmethod def get(): @@ -41,21 +46,19 @@ class Account (LocalAccount): except asyncio.TimeoutError: log.error('waiting for an available account') result = await Account._pool.get() - Account._pool_count -= 1 return result @staticmethod def _init_pool(): if Account._pool is None: Account._pool = asyncio.Queue() - Account._pool_count = 0 for key in config.accounts: local_account = eth_account.Account.from_key(key) account = Account(local_account) if Account._main_account is None: Account._main_account = account Account._pool.put_nowait(account) - Account._pool_count += 1 + Account._all.append(account) def __init__(self, local_account: LocalAccount): # todo chain_id? @@ -81,7 +84,6 @@ class Account (LocalAccount): def release(self): Account._pool.put_nowait(self) - Account._pool_count += 1 def __str__(self): return self.address diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index 70e4cec..d9041bd 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -35,6 +35,9 @@ class Blockchain: def __str__(self): return self.name + def __hash__(self): + return self.id + _instances_by_id = {} _instances_by_name = {} diff --git a/src/dexorder/bin/reconcile.py b/src/dexorder/bin/reconcile.py new file mode 100644 index 0000000..6ae4a8e --- /dev/null +++ b/src/dexorder/bin/reconcile.py @@ -0,0 +1,30 @@ +import logging + +from sqlalchemy import select + +from dexorder import db, blockchain +from dexorder.accounting import reconcile +from dexorder.bin.executable import execute +from dexorder.blocks import current_block, fetch_latest_block +from dexorder.database.model import DbAccount + +log = logging.getLogger(__name__) + +async def main(): + await blockchain.connect() + db.connect() + current_block.set(await fetch_latest_block()) + try: + accounts = db.session.execute(select(DbAccount)).scalars().all() + for account in accounts: + await reconcile(account) + db.session.commit() + log.info('Reconciliation complete') + except: + db.session.rollback() + raise + + +if __name__ == '__main__': + execute(main()) + \ No newline at end of file diff --git a/src/dexorder/blocks.py b/src/dexorder/blocks.py index 9907645..69dabb0 100644 --- a/src/dexorder/blocks.py +++ b/src/dexorder/blocks.py @@ -17,6 +17,7 @@ from dexorder import current_w3, config, db, Blockchain from dexorder.base.block import Block, BlockInfo, latest_block from dexorder.base.chain import current_chain from dexorder.database.model import DbBlock +from dexorder.util import hexbytes log = logging.getLogger(__name__) @@ -35,6 +36,11 @@ class FetchLock: self.exception = None +async def fetch_latest_block() -> Block: + blockdata = await current_w3.get().eth.get_block('latest') + return Block(current_chain.get(), blockdata) + + async def _fetch(fetch: FetchLock, chain_id: int, block_id: Union[int,bytes]) -> Optional[Block]: # try database first if config.cache_blocks_in_db and db: @@ -75,8 +81,10 @@ def cache_block(block: Block, confirmed=False): confirmed=confirmed, data=block.data)) -async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block: +async def get_block(block_id: Union[bytes,str,int], *, chain_id=None) -> Block: # log.debug(f'get_block {block_id}') + if type(block_id) is str: + block_id = hexbytes(block_id) if chain_id is None: chain_id = current_chain.get().id diff --git a/src/dexorder/contract/dexorder.py b/src/dexorder/contract/dexorder.py index 531bac3..1a1582e 100644 --- a/src/dexorder/contract/dexorder.py +++ b/src/dexorder/contract/dexorder.py @@ -21,11 +21,11 @@ log.info(f'Version: {version}') chain_info = version['chainInfo'] -for chain_id, info in chain_info.items(): - chain_id = int(chain_id) - _factory[chain_id] = ContractProxy(info['factory'], 'VaultFactory') - _dexorder[chain_id] = ContractProxy(info['dexorder'], 'Dexorder') - _vault_init_code_hash[chain_id] = to_bytes(hexstr=info['vaultInitCodeHash']) +for _chain_id, info in chain_info.items(): + _chain_id = int(_chain_id) + _factory[_chain_id] = ContractProxy(info['factory'], 'VaultFactory') + _dexorder[_chain_id] = ContractProxy(info['dexorder'], 'Dexorder') + _vault_init_code_hash[_chain_id] = to_bytes(hexstr=info['vaultInitCodeHash']) def get_by_chain(d): return d[current_chain.get().id] @@ -66,3 +66,11 @@ def VaultContract(addr): def DexorderContract(addr): return ContractProxy(addr, 'Dexorder') + + +async def get_fee_manager_contract(): + factory_contract = get_factory_contract() + implementation_address = await factory_contract.implementation() + fee_manager_address = await ContractProxy(implementation_address, 'IVaultImpl').feeManager() + fee_manager = ContractProxy(fee_manager_address, 'IFeeManager') + return fee_manager diff --git a/src/dexorder/database/column_types.py b/src/dexorder/database/column_types.py index 74d4b4c..72a41dd 100644 --- a/src/dexorder/database/column_types.py +++ b/src/dexorder/database/column_types.py @@ -83,7 +83,7 @@ class DecimalNumeric (TypeDecorator): return value def process_result_value(self, value, dialect): - return dec(value) + return None if value is None else dec(value) class Balances (TypeDecorator): diff --git a/src/dexorder/database/model/__init__.py b/src/dexorder/database/model/__init__.py index 87ddda2..bf38065 100644 --- a/src/dexorder/database/model/__init__.py +++ b/src/dexorder/database/model/__init__.py @@ -7,5 +7,6 @@ from .orderindex import OrderIndex from .pool import Pool from .token import Token from .ofac import OFAC, OFACAlerts -from .accounting import Accounting +from .accounting import Accounting, DbAccount from .vaultcreationrequest import VaultCreationRequest +from .tos import TOSAcceptance diff --git a/src/dexorder/database/model/accounting.py b/src/dexorder/database/model/accounting.py index 184c770..68b8891 100644 --- a/src/dexorder/database/model/accounting.py +++ b/src/dexorder/database/model/accounting.py @@ -3,8 +3,9 @@ from datetime import datetime from decimal import Decimal as dec from enum import Enum -from sqlalchemy import ForeignKey -from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy import ForeignKeyConstraint +from sqlalchemy.ext.mutable import MutableDict +from sqlalchemy.orm import Mapped, mapped_column, relationship from typing_extensions import Optional from dexorder import now @@ -43,6 +44,7 @@ class AccountingSubcategory (Enum): class Accounting (Base): id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) time: Mapped[datetime] = mapped_column(default=now(), index=True) + chain_id: Mapped[int] = mapped_column(index=True) account: Mapped[str] = mapped_column(index=True) category: Mapped[AccountingCategory] = mapped_column(index=True) subcategory: Mapped[Optional[AccountingSubcategory]] = mapped_column(index=True) @@ -50,7 +52,6 @@ class Accounting (Base): amount: Mapped[dec] = mapped_column(DecimalNumeric) value: Mapped[Optional[dec]] = mapped_column(DecimalNumeric) # USD value of the amount. If NULL then accounting has not been done for this row. - chain_id: Mapped[int] tx_id: Mapped[Optional[str]] note: Mapped[Optional[str]] # format depends on the type of entry @@ -62,9 +63,39 @@ class AccountKind (Enum): Execution = 4 # spends gas -class Accounts(Base): +class DbAccount(Base): + __tablename__ = "account" chain: Mapped[Blockchain] = mapped_column(primary_key=True) address: Mapped[str] = mapped_column(primary_key=True) kind: Mapped[AccountKind] = mapped_column(index=True) + balances: Mapped[dict[str, dec]] = mapped_column(MutableDict.as_mutable(Balances), default=dict, server_default="{}") + + reconciliations: Mapped[list["Reconciliation"]] = relationship( + "Reconciliation", + back_populates="account", + cascade="all, delete-orphan", + ) + +# records balance snapshots that have been verified by matching all three sources of balance records: +# 1. the on-chain balance as of the given block height (must be finalized) +# 2. the DbAccount row balance (as saved in the single DbAccount row) +# 3. the sum of all accounting rows for this address (check the summation of itemizations) +class Reconciliation (Base): + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + chain: Mapped[Blockchain] = mapped_column(index=True) + address: Mapped[str] = mapped_column(index=True) # address of the Account we are reconciling + accounting_id: Mapped[int] = mapped_column(index=True) # ID of the last accounting row to be processed for this Reconciliation + height: Mapped[int] = mapped_column(index=True) # blockchain height balances: Mapped[dict[str, dec]] = mapped_column(Balances, default=dict, server_default="{}") + account: Mapped[DbAccount] = relationship( + "DbAccount", + back_populates="reconciliations", + foreign_keys=[chain, address], + ) + + __table_args__ = ( + ForeignKeyConstraint( + ["chain", "address"], ["account.chain", "account.address"], ondelete="CASCADE" + ), + ) diff --git a/src/dexorder/database/model/tos.py b/src/dexorder/database/model/tos.py new file mode 100644 index 0000000..9b58615 --- /dev/null +++ b/src/dexorder/database/model/tos.py @@ -0,0 +1,13 @@ +from datetime import datetime + +from sqlalchemy.orm import Mapped, mapped_column + +from dexorder.database.model import Base + + +# We do not index this table since it is warehouse information and rarely if ever queried +class TOSAcceptance (Base): + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + ipaddr: Mapped[str] + time: Mapped[datetime] + version: Mapped[datetime] diff --git a/src/dexorder/feemanager.py b/src/dexorder/feemanager.py new file mode 100644 index 0000000..dbed88a --- /dev/null +++ b/src/dexorder/feemanager.py @@ -0,0 +1,31 @@ +import asyncio +import logging + +from dexorder.contract import ContractProxy +from dexorder.contract.dexorder import get_factory_contract, get_fee_manager_contract + +log = logging.getLogger(__name__) + +class FeeManager (ContractProxy): + _instance: 'FeeManager' = None + + @staticmethod + async def get(): + if FeeManager._instance is None: + fee_manager = await get_fee_manager_contract() + order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr = await asyncio.gather( + fee_manager.orderFeeAccount(), + fee_manager.gasFeeAccount(), + fee_manager.fillFeeAccount() + ) + FeeManager._instance = FeeManager(fee_manager.address, order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr) + return FeeManager._instance + + + def __init__(self, address, order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr): + super().__init__(address, 'IFeeManager') + self.order_fee_account_addr = order_fee_account_addr + self.gas_fee_account_addr = gas_fee_account_addr + self.fill_fee_account_addr = fill_fee_account_addr + + diff --git a/src/dexorder/final_ohlc.py b/src/dexorder/final_ohlc.py index 395d7ae..a9f00cb 100644 --- a/src/dexorder/final_ohlc.py +++ b/src/dexorder/final_ohlc.py @@ -146,8 +146,11 @@ class OHLCFile: t, o, c = self.cur self.cur = t, o, max(o,c,price), min(o,c,price), price else: - t, o, h, line, c = self.cur - self.cur = t, o, max(h,line,price), min(h,line,price), price + try: + t, o, h, line, c = self.cur + self.cur = t, o, max(h,line,price), min(h,line,price), price + except ValueError: + log.error(f'Could not unpack cur {self.cur}') @staticmethod def row_bytes(row): diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index ccf1170..ea56d89 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -12,7 +12,7 @@ from dexorder import Blockchain, db, current_pub, async_yield, current_w3, confi from dexorder.base.block import Block, latest_block from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.blockchain.connection import create_w3_ws, create_w3 -from dexorder.blocks import cache_block, get_block, promotion_height, current_block +from dexorder.blocks import cache_block, get_block, promotion_height, current_block, fetch_latest_block from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate.branch import Branch from dexorder.blockstate.diff import DiffEntryItem @@ -157,8 +157,7 @@ class BlockStateRunner(BlockProgressor): # 'latest' polling for blocks, and we push the entire block to the queue since apparently this is the only # rpc call Hardhat seems to consistently support. The worker must then detect the type of object pushed to the # work queue and either use the block directly or query for the block if the queue object is a hashcode. - blockdata = await w3.eth.get_block('latest') - block = Block(chain.id, blockdata) + block = await fetch_latest_block() if block.hash == prev_blockhash and ( self.state is None or self.state.root_branch is None or self.state.height == block.height): return prev_blockhash diff --git a/src/dexorder/tokens.py b/src/dexorder/tokens.py index c8c73c5..f1e9a52 100644 --- a/src/dexorder/tokens.py +++ b/src/dexorder/tokens.py @@ -15,6 +15,7 @@ from dexorder.metadata import get_metadata log = logging.getLogger(__name__) +# noinspection PyShadowingNames async def get_balance(addr, token_addr, *, adjust_decimals=True) -> dec: if token_addr == NATIVE_TOKEN: return await get_native_balance(addr, adjust_decimals=adjust_decimals) @@ -22,6 +23,7 @@ async def get_balance(addr, token_addr, *, adjust_decimals=True) -> dec: return await get_erc20_balance(addr, token_addr, adjust_decimals=adjust_decimals) +# noinspection PyShadowingNames async def get_erc20_balance(addr, token_addr, *, adjust_decimals=True): value = dec(await ERC20(token_addr).balanceOf(addr)) if adjust_decimals: @@ -30,6 +32,7 @@ async def get_erc20_balance(addr, token_addr, *, adjust_decimals=True): return value +# noinspection PyShadowingNames async def get_native_balance(addr, *, adjust_decimals=True) -> dec: value = dec(await current_w3.get().eth.get_balance(addr)) if adjust_decimals: diff --git a/src/dexorder/transactions.py b/src/dexorder/transactions.py index f866f38..63eb07d 100644 --- a/src/dexorder/transactions.py +++ b/src/dexorder/transactions.py @@ -101,7 +101,7 @@ async def create_and_send_transactions(): except asyncio.TimeoutError: account = None if account is None: - log.error(f'No account available for transaction request type "{handler.tag}"') + log.warning(f'No account available for job {job.id} type "{handler.tag}"') continue await ctx.sign(account) log.info(f'servicing job {job.request.__class__.__name__} {job.id} with account {account.address} nonce {ctx.tx["nonce"]} tx {ctx.id}')