diff --git a/alembic/versions/509010f13e8b_accounting_vaultcreation_ofac.py b/alembic/versions/509010f13e8b_accounting_vaultcreation_ofac.py index c05e96f..5edce11 100644 --- a/alembic/versions/509010f13e8b_accounting_vaultcreation_ofac.py +++ b/alembic/versions/509010f13e8b_accounting_vaultcreation_ofac.py @@ -28,7 +28,7 @@ def upgrade() -> None: sa.Column('time', sa.DateTime(), nullable=False), sa.Column('account', sa.String(), nullable=False), sa.Column('category', sa.Enum('Transfer', 'Income', 'Expense', 'Trade', 'Special', name='accountingcategory'), nullable=False), - sa.Column('subcategory', sa.Enum('OrderFee', 'GasFee', 'FillFee', 'VaultCreation', 'Execution', 'FeeAdjustment', 'InitialBalance', name='accountingsubcategory'), nullable=True), + sa.Column('subcategory', sa.Enum('OrderFee', 'GasFee', 'FillFee', 'Admin', 'TransactionGas', 'VaultCreation', 'Execution', 'FeeAdjustment', 'InitialBalance', name='accountingsubcategory'), nullable=True), sa.Column('token', sa.String(), nullable=False), sa.Column('amount', dexorder.database.column_types.DecimalNumeric(), nullable=False), sa.Column('value', dexorder.database.column_types.DecimalNumeric(), nullable=True), diff --git a/src/dexorder/__init__.py b/src/dexorder/__init__.py index b82970c..da60f96 100644 --- a/src/dexorder/__init__.py +++ b/src/dexorder/__init__.py @@ -35,14 +35,16 @@ class _Token: def __repr__(self): return self.__token_name def __str__(self): return self.__token_name -class _FalseToken (_Token): +class _FalseyToken (_Token): def __bool__(self): return False -NARG = _FalseToken('NARG') -DELETE = _FalseToken('DELETE') # used as a value token to indicate removal of the key +NARG = _FalseyToken('NARG') +DELETE = _FalseyToken('DELETE') # used as a value token to indicate removal of the key ADDRESS_0 = '0x0000000000000000000000000000000000000000' NATIVE_TOKEN = '0x0000000000000000000000000000000000000001' # We use 0x01 to indicate the use of native ETH wherever a token address is normally required +USD_FIAT = '0x0000000000000000000000000000000000000055' # We use 0x55 (ASCII 'U') to indicate the use of fiat USD +CHAIN_ID_OFFCHAIN = -1 WEI = 1 GWEI = 1_000_000_000 ETH = 1_000_000_000_000_000_000 diff --git a/src/dexorder/accounting.py b/src/dexorder/accounting.py index f3bfb8f..d934a54 100644 --- a/src/dexorder/accounting.py +++ b/src/dexorder/accounting.py @@ -34,19 +34,19 @@ class ReconciliationException(Exception): pass -def accounting_lock(): - """ - This must be called before accounting_*() calls are made. - """ - db.session.execute(text("LOCK TABLE account, accounting, reconciliation IN EXCLUSIVE MODE")) - - -async def initialize_accounting(): +def initialize_accounting(): + global accounting_initialized + if not accounting_initialized: + load_accounts_cache() + accounting_initialized = True + + +async def initialize_accounting_runner(): global accounting_initialized if not accounting_initialized: - accounting_lock() await _initialize_mark_to_market() # set up mark-to-market first, so accounts can value their initial balances await _initialize_accounts() + load_accounts_cache() accounting_initialized = True log.info(f'accounting initialized\n\tstablecoins: {config.stablecoins}\n\tquotecoins: {config.quotecoins}\n\tnativecoin: {config.nativecoin}') @@ -64,17 +64,23 @@ async def _initialize_accounts(): async def _initialize_accounts_2(): fm = await FeeManager.get() - of_account = _ensure_account(fm.order_fee_account_addr, AccountKind.OrderFee) - gf_account = _ensure_account(fm.gas_fee_account_addr, AccountKind.GasFee) - ff_account = _ensure_account(fm.fill_fee_account_addr, AccountKind.FillFee) - exe_accounts = [_ensure_account(account.address, AccountKind.Execution) for account in Account.all()] + of_account = ensure_account(fm.order_fee_account_addr, AccountKind.OrderFee) + gf_account = ensure_account(fm.gas_fee_account_addr, AccountKind.GasFee) + ff_account = ensure_account(fm.fill_fee_account_addr, AccountKind.FillFee) + exe_accounts = [ensure_account(account.address, AccountKind.Execution) for account in Account.all()] if current_chain.get().id in [1337, 31337]: log.debug('adjusting debug account balances') await asyncio.gather( *map(adjust_balance, (of_account, gf_account, ff_account, *exe_accounts)) ) - for db_account in db.session.execute(select(DbAccount)).scalars(): + + +def load_accounts_cache(*, chain=None): + if chain is None: + chain = current_chain.get() + for db_account in db.session.execute(select(DbAccount).where(DbAccount.chain==chain)).scalars(): _tracked_addrs.add(db_account.address) + log.info(f'tracking account {db_account.chain.id} {db_account.address}') async def _initialize_mark_to_market(): @@ -124,13 +130,14 @@ async def _initialize_mark_to_market(): add_mark_pool(addr, pool['base'], pool['quote'], pool['fee']) -def _ensure_account(addr: str, kind: AccountKind) -> DbAccount: - chain = current_chain.get() +def ensure_account(addr: str, kind: AccountKind, *, chain=None) -> DbAccount: + if chain is None: + chain = current_chain.get() found = db.session.get(DbAccount, (chain, addr)) if found: if found.kind != kind: log.warning(f'Account {addr} has wrong kind {found.kind} != {kind}') - found.kind = kind + # found.kind = kind db.session.add(found) _tracked_addrs.add(found.address) else: @@ -160,20 +167,21 @@ async def accounting_transfer(receipt: TransactionReceiptDict, token: str, block_hash = hexstr(receipt['blockHash']) tx_id = hexstr(receipt['transactionHash']) await asyncio.gather( - add_accounting_row( sender, block_hash, tx_id, AccountingCategory.Transfer, None, - token, -amount, receiver, adjust_decimals=adjust_decimals), - add_accounting_row( receiver, block_hash, tx_id, AccountingCategory.Transfer, None, - token, amount, sender, adjust_decimals=adjust_decimals), + accounting_transaction_gas(receipt), + add_accounting_entry_m2m(sender, block_hash, tx_id, AccountingCategory.Transfer, None, + token, -amount, receiver, adjust_decimals=adjust_decimals), + add_accounting_entry_m2m(receiver, block_hash, tx_id, AccountingCategory.Transfer, None, + token, amount, sender, adjust_decimals=adjust_decimals), ) -async def accounting_transaction_gas(receipt: TransactionReceiptDict, subcategory: AccountingSubcategory): +async def accounting_transaction_gas(receipt: TransactionReceiptDict, subcategory: AccountingSubcategory = AccountingSubcategory.TransactionGas): """ Accounts for the gas spent on the given transaction """ amount = dec(receipt['gasUsed']) * dec(receipt['effectiveGasPrice']) - await add_accounting_row( receipt['from'], - hexstr(receipt['blockHash']), hexstr(receipt['transactionHash']), - AccountingCategory.Expense, subcategory, NATIVE_TOKEN, -amount - ) + await add_accounting_entry_m2m(receipt['from'], + hexstr(receipt['blockHash']), hexstr(receipt['transactionHash']), + AccountingCategory.Expense, subcategory, NATIVE_TOKEN, -amount + ) async def accounting_placement(order_placed: EventData): @@ -186,10 +194,10 @@ async def accounting_placement(order_placed: EventData): log.warning(f'Rogue DexorderPlacedEvent in tx {hexstr(tx_id)}') return fm = await FeeManager.get() - await add_accounting_row( fm.order_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, - AccountingSubcategory.OrderFee, NATIVE_TOKEN, order_fee) - await add_accounting_row( fm.gas_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, - AccountingSubcategory.GasFee, NATIVE_TOKEN, gas_fee) + await add_accounting_entry_m2m(fm.order_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, + AccountingSubcategory.OrderFee, NATIVE_TOKEN, order_fee) + await add_accounting_entry_m2m(fm.gas_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, + AccountingSubcategory.GasFee, NATIVE_TOKEN, gas_fee) async def accounting_fill(fill: EventData, out_token: str) -> dec: @@ -200,14 +208,14 @@ async def accounting_fill(fill: EventData, out_token: str) -> dec: tx_id = hexstr(fill['transactionHash']) fee = int(fill['args']['fillFee']) fm = await FeeManager.get() - return await add_accounting_row(fm.fill_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, - AccountingSubcategory.FillFee, out_token, fee) + return await add_accounting_entry_m2m(fm.fill_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, + AccountingSubcategory.FillFee, out_token, fee) -async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Optional[str], category, subcategory, token, amount, note=None, - *, adjust_decimals=True) -> dec: +async def add_accounting_entry_m2m(account: str, block_hash: Optional[str], tx_id: Optional[str], category, subcategory, token, amount, note=None, + *, adjust_decimals=True) -> dec: """ - Returns the mark-to-market USD value of the transaction. + Returns the mark-to-market USD value of the entry. """ if amount == 0: return dec(0) @@ -221,6 +229,13 @@ async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Opt value = mark_to_market(token, amount) log.debug(f'accounting row {time} {account} {category} {subcategory} {token} {amount} ${value}') chain_id = current_chain.get().id + add_accounting_entry(chain_id, account, time, category, subcategory, token, amount, value, tx_id, note) + return value + + +def add_accounting_entry(chain_id, account, time, category, subcategory, token, amount, value=None, tx_id=None, note=None): + if not is_tracked_address(account): + return db.session.add(Accounting(account=account, time=time, category=category, subcategory=subcategory, token=token, amount=amount, value=value, note=note, @@ -229,15 +244,17 @@ async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Opt account_db = db.session.get(DbAccount, (current_chain.get(), account)) new_amount = account_db.balances.get(token, dec(0)) + amount if new_amount < 0: - log.error(f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}') + log.error( + f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}') account_db.balances[token] = new_amount db.session.add(account_db) # deep changes would not be detected by the ORM - return value + db.session.flush() + async def adjust_balance(account: DbAccount, token=NATIVE_TOKEN, subcategory=AccountingSubcategory.InitialBalance, note=None): true_balance = await get_balance(account.address, token) amount = true_balance - account.balances.get(token, dec(0)) - await add_accounting_row(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note, adjust_decimals=False) + await add_accounting_entry_m2m(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note, adjust_decimals=False) async def accounting_reconcile(account: DbAccount, block_id: Optional[str] = None, last_accounting_row_id: Optional[int] = None): diff --git a/src/dexorder/bin/executable.py b/src/dexorder/bin/executable.py index b63a27c..3921d9d 100644 --- a/src/dexorder/bin/executable.py +++ b/src/dexorder/bin/executable.py @@ -9,6 +9,8 @@ from signal import Signals from traceback import print_exception from typing import Coroutine, Callable, Union, Any +from omegaconf import OmegaConf + from dexorder import configuration, config from dexorder.alert import init_alerts from dexorder.configuration.schema import Config @@ -40,10 +42,12 @@ def split_args(): return omegaconf_args, regular_args -def execute(main:Callable[...,Coroutine[Any,Any,Any]], shutdown=None, *, parse_logging=True, parse_args: Union[Callable[[list[str]],Any],bool]=True): +def execute(main:Callable[...,Coroutine[Any,Any,Any]], shutdown=None, *, parse_logging=True, + parse_args: Union[Callable[[list[str]],Any], type, bool]=True): """ if parse_args is a function, then the command-line arguments are given to OmegaConf first, and any args parsed by OmegaConf are stripped from the args list. The remaining args are then passed to parse_args(args) + if parse_args is a type, then the type is used to parse the extra command-line arguments using OmegaConf. """ # config configured = False @@ -63,7 +67,7 @@ def execute(main:Callable[...,Coroutine[Any,Any,Any]], shutdown=None, *, parse_l log.info('Logging configured to default') xconf = None if parse_args: - if callable(parse_args): + if callable(parse_args) or isinstance(parse_args, type): omegaconf_args, regular_args = split_args() else: omegaconf_args = None @@ -73,6 +77,9 @@ def execute(main:Callable[...,Coroutine[Any,Any,Any]], shutdown=None, *, parse_l if callable(parse_args): # noinspection PyUnboundLocalVariable xconf = parse_args(regular_args) + elif isinstance(parse_args, type): + # noinspection PyUnboundLocalVariable + xconf = OmegaConf.merge(OmegaConf.structured(parse_args), OmegaConf.from_cli(regular_args)) init_alerts() diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 9e5efcc..c95a402 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -2,7 +2,7 @@ import logging from asyncio import CancelledError from dexorder import db, blockchain -from dexorder.accounting import initialize_accounting +from dexorder.accounting import initialize_accounting_runner from dexorder.alert import infoAlert from dexorder.base.chain import current_chain from dexorder.bin.executable import execute @@ -117,7 +117,7 @@ async def main(): log.info('initializing redis with root state') await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id]) - await initialize_accounting() + await initialize_accounting_runner() runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None) setup_logevent_triggers(runner) diff --git a/src/dexorder/bin/reconcile.py b/src/dexorder/bin/reconcile.py index e65ef7d..1bdcda6 100644 --- a/src/dexorder/bin/reconcile.py +++ b/src/dexorder/bin/reconcile.py @@ -1,9 +1,9 @@ import logging -from sqlalchemy import select +from sqlalchemy import select, text from dexorder import db, blockchain -from dexorder.accounting import accounting_reconcile, accounting_lock +from dexorder.accounting import accounting_reconcile from dexorder.bin.executable import execute from dexorder.blocks import fetch_latest_block, current_block from dexorder.database.model import DbAccount @@ -15,7 +15,7 @@ async def main(): db.connect() block = await fetch_latest_block() current_block.set(block) - accounting_lock() + db.session.execute(text("LOCK TABLE account, accounting, reconciliation IN EXCLUSIVE MODE")) try: accounts = db.session.execute(select(DbAccount)).scalars().all() for account in accounts: @@ -29,4 +29,3 @@ async def main(): if __name__ == '__main__': execute(main) - \ No newline at end of file diff --git a/src/dexorder/bin/refill.py b/src/dexorder/bin/refill.py index c203d96..f63d10e 100644 --- a/src/dexorder/bin/refill.py +++ b/src/dexorder/bin/refill.py @@ -1,14 +1,23 @@ import logging +from dataclasses import dataclass -from dexorder import blockchain, db +from dexorder import blockchain, db, dec from dexorder.bin.executable import execute log = logging.getLogger(__name__) -async def main(): - await blockchain.connect() - db.connect() +@dataclass +class RefillConfig: + refill_level: dec + refill_accounts: list[str] + + +async def main(refill_config: RefillConfig): + # await blockchain.connect() + # db.connect() + log.info(f'Refilling to {refill_config.refill_level:.18f} ETH') + log.info(f'Refilling accounts: {refill_config.refill_accounts}') if __name__ == '__main__': - execute(main()) + execute(main, parse_args=RefillConfig) diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index 8d389e5..992220b 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -60,6 +60,3 @@ class Config: stablecoins: list[str] = field(default_factory=list) # primary stablecoins which are marked to $1 quotecoins: list[str] = field(default_factory=list) # quote tokens like WETH that have stablecoin markets nativecoin: Optional[str] = None # used for accounting of native values. e.g. address of WETH - - # account: target_balance - refill: dict[str,str] = field(default_factory=dict) diff --git a/src/dexorder/database/model/accounting.py b/src/dexorder/database/model/accounting.py index 8630f61..8d5df3c 100644 --- a/src/dexorder/database/model/accounting.py +++ b/src/dexorder/database/model/accounting.py @@ -1,7 +1,7 @@ import logging from datetime import datetime from decimal import Decimal as dec -from enum import Enum +from enum import Enum, auto from sqlalchemy import ForeignKeyConstraint from sqlalchemy.ext.mutable import MutableDict @@ -17,35 +17,37 @@ log = logging.getLogger(__name__) class AccountingCategory (Enum): - Transfer = 0 - Income = 1 - Expense = 2 - Trade = 3 - Special = 4 + Transfer = auto() + Income = auto() + Expense = auto() + Trade = auto() + Special = auto() class AccountingSubcategory (Enum): # Income - OrderFee = 0 - GasFee = 1 - FillFee = 2 + OrderFee = auto() + GasFee = auto() + FillFee = auto() # Expense - VaultCreation = 3 - Execution = 4 - FeeAdjustment = 5 # includes adjusting fee limits + Admin = auto() # contract deployments and upgrades, changing adjuster address, etc. + TransactionGas = auto() + VaultCreation = auto() + Execution = auto() + FeeAdjustment = auto() # includes adjusting fee limits # Transfer # Transfers have no subcategories, but the note field will be the address of the other account. Both a debit and a # credit entry will be created, one for each account participating in the transfer. # Special Codes - InitialBalance = 5 + InitialBalance = auto() class Accounting (Base): id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) time: Mapped[datetime] = mapped_column(default=now(), index=True) - chain_id: Mapped[int] = mapped_column(index=True) + chain_id: Mapped[int] = mapped_column(index=True) # chain_id account: Mapped[str] = mapped_column(index=True) category: Mapped[AccountingCategory] = mapped_column(index=True) subcategory: Mapped[Optional[AccountingSubcategory]] = mapped_column(index=True) diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index d996a60..56cc374 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -4,8 +4,7 @@ import logging from web3.types import EventData from dexorder import db, metric, current_w3, timestamp -from dexorder.accounting import accounting_fill, accounting_placement, accounting_transfer, is_tracked_address, \ - accounting_lock +from dexorder.accounting import accounting_fill, accounting_placement, accounting_lock from dexorder.base.chain import current_chain from dexorder.base.order import TrancheKey, OrderKey from dexorder.base.orderlib import SwapOrderState @@ -32,7 +31,6 @@ def dump_log(eventlog): def init(): new_pool_prices.clear() start_trigger_updates() - accounting_lock() async def handle_order_placed(event: EventData):