execute refactor for extraconf; accounting fixes

This commit is contained in:
tim
2025-02-28 01:02:36 -04:00
parent e868ea5a4b
commit af0f35eba5
10 changed files with 106 additions and 75 deletions

View File

@@ -28,7 +28,7 @@ def upgrade() -> None:
sa.Column('time', sa.DateTime(), nullable=False),
sa.Column('account', sa.String(), nullable=False),
sa.Column('category', sa.Enum('Transfer', 'Income', 'Expense', 'Trade', 'Special', name='accountingcategory'), nullable=False),
sa.Column('subcategory', sa.Enum('OrderFee', 'GasFee', 'FillFee', 'VaultCreation', 'Execution', 'FeeAdjustment', 'InitialBalance', name='accountingsubcategory'), nullable=True),
sa.Column('subcategory', sa.Enum('OrderFee', 'GasFee', 'FillFee', 'Admin', 'TransactionGas', 'VaultCreation', 'Execution', 'FeeAdjustment', 'InitialBalance', name='accountingsubcategory'), nullable=True),
sa.Column('token', sa.String(), nullable=False),
sa.Column('amount', dexorder.database.column_types.DecimalNumeric(), nullable=False),
sa.Column('value', dexorder.database.column_types.DecimalNumeric(), nullable=True),

View File

@@ -35,14 +35,16 @@ class _Token:
def __repr__(self): return self.__token_name
def __str__(self): return self.__token_name
class _FalseToken (_Token):
class _FalseyToken (_Token):
def __bool__(self): return False
NARG = _FalseToken('NARG')
DELETE = _FalseToken('DELETE') # used as a value token to indicate removal of the key
NARG = _FalseyToken('NARG')
DELETE = _FalseyToken('DELETE') # used as a value token to indicate removal of the key
ADDRESS_0 = '0x0000000000000000000000000000000000000000'
NATIVE_TOKEN = '0x0000000000000000000000000000000000000001' # We use 0x01 to indicate the use of native ETH wherever a token address is normally required
USD_FIAT = '0x0000000000000000000000000000000000000055' # We use 0x55 (ASCII 'U') to indicate the use of fiat USD
CHAIN_ID_OFFCHAIN = -1
WEI = 1
GWEI = 1_000_000_000
ETH = 1_000_000_000_000_000_000

View File

@@ -34,19 +34,19 @@ class ReconciliationException(Exception):
pass
def accounting_lock():
"""
This must be called before accounting_*() calls are made.
"""
db.session.execute(text("LOCK TABLE account, accounting, reconciliation IN EXCLUSIVE MODE"))
async def initialize_accounting():
def initialize_accounting():
global accounting_initialized
if not accounting_initialized:
load_accounts_cache()
accounting_initialized = True
async def initialize_accounting_runner():
global accounting_initialized
if not accounting_initialized:
accounting_lock()
await _initialize_mark_to_market() # set up mark-to-market first, so accounts can value their initial balances
await _initialize_accounts()
load_accounts_cache()
accounting_initialized = True
log.info(f'accounting initialized\n\tstablecoins: {config.stablecoins}\n\tquotecoins: {config.quotecoins}\n\tnativecoin: {config.nativecoin}')
@@ -64,17 +64,23 @@ async def _initialize_accounts():
async def _initialize_accounts_2():
fm = await FeeManager.get()
of_account = _ensure_account(fm.order_fee_account_addr, AccountKind.OrderFee)
gf_account = _ensure_account(fm.gas_fee_account_addr, AccountKind.GasFee)
ff_account = _ensure_account(fm.fill_fee_account_addr, AccountKind.FillFee)
exe_accounts = [_ensure_account(account.address, AccountKind.Execution) for account in Account.all()]
of_account = ensure_account(fm.order_fee_account_addr, AccountKind.OrderFee)
gf_account = ensure_account(fm.gas_fee_account_addr, AccountKind.GasFee)
ff_account = ensure_account(fm.fill_fee_account_addr, AccountKind.FillFee)
exe_accounts = [ensure_account(account.address, AccountKind.Execution) for account in Account.all()]
if current_chain.get().id in [1337, 31337]:
log.debug('adjusting debug account balances')
await asyncio.gather(
*map(adjust_balance, (of_account, gf_account, ff_account, *exe_accounts))
)
for db_account in db.session.execute(select(DbAccount)).scalars():
def load_accounts_cache(*, chain=None):
if chain is None:
chain = current_chain.get()
for db_account in db.session.execute(select(DbAccount).where(DbAccount.chain==chain)).scalars():
_tracked_addrs.add(db_account.address)
log.info(f'tracking account {db_account.chain.id} {db_account.address}')
async def _initialize_mark_to_market():
@@ -124,13 +130,14 @@ async def _initialize_mark_to_market():
add_mark_pool(addr, pool['base'], pool['quote'], pool['fee'])
def _ensure_account(addr: str, kind: AccountKind) -> DbAccount:
chain = current_chain.get()
def ensure_account(addr: str, kind: AccountKind, *, chain=None) -> DbAccount:
if chain is None:
chain = current_chain.get()
found = db.session.get(DbAccount, (chain, addr))
if found:
if found.kind != kind:
log.warning(f'Account {addr} has wrong kind {found.kind} != {kind}')
found.kind = kind
# found.kind = kind
db.session.add(found)
_tracked_addrs.add(found.address)
else:
@@ -160,20 +167,21 @@ async def accounting_transfer(receipt: TransactionReceiptDict, token: str,
block_hash = hexstr(receipt['blockHash'])
tx_id = hexstr(receipt['transactionHash'])
await asyncio.gather(
add_accounting_row( sender, block_hash, tx_id, AccountingCategory.Transfer, None,
token, -amount, receiver, adjust_decimals=adjust_decimals),
add_accounting_row( receiver, block_hash, tx_id, AccountingCategory.Transfer, None,
token, amount, sender, adjust_decimals=adjust_decimals),
accounting_transaction_gas(receipt),
add_accounting_entry_m2m(sender, block_hash, tx_id, AccountingCategory.Transfer, None,
token, -amount, receiver, adjust_decimals=adjust_decimals),
add_accounting_entry_m2m(receiver, block_hash, tx_id, AccountingCategory.Transfer, None,
token, amount, sender, adjust_decimals=adjust_decimals),
)
async def accounting_transaction_gas(receipt: TransactionReceiptDict, subcategory: AccountingSubcategory):
async def accounting_transaction_gas(receipt: TransactionReceiptDict, subcategory: AccountingSubcategory = AccountingSubcategory.TransactionGas):
""" Accounts for the gas spent on the given transaction """
amount = dec(receipt['gasUsed']) * dec(receipt['effectiveGasPrice'])
await add_accounting_row( receipt['from'],
hexstr(receipt['blockHash']), hexstr(receipt['transactionHash']),
AccountingCategory.Expense, subcategory, NATIVE_TOKEN, -amount
)
await add_accounting_entry_m2m(receipt['from'],
hexstr(receipt['blockHash']), hexstr(receipt['transactionHash']),
AccountingCategory.Expense, subcategory, NATIVE_TOKEN, -amount
)
async def accounting_placement(order_placed: EventData):
@@ -186,10 +194,10 @@ async def accounting_placement(order_placed: EventData):
log.warning(f'Rogue DexorderPlacedEvent in tx {hexstr(tx_id)}')
return
fm = await FeeManager.get()
await add_accounting_row( fm.order_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.OrderFee, NATIVE_TOKEN, order_fee)
await add_accounting_row( fm.gas_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.GasFee, NATIVE_TOKEN, gas_fee)
await add_accounting_entry_m2m(fm.order_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.OrderFee, NATIVE_TOKEN, order_fee)
await add_accounting_entry_m2m(fm.gas_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.GasFee, NATIVE_TOKEN, gas_fee)
async def accounting_fill(fill: EventData, out_token: str) -> dec:
@@ -200,14 +208,14 @@ async def accounting_fill(fill: EventData, out_token: str) -> dec:
tx_id = hexstr(fill['transactionHash'])
fee = int(fill['args']['fillFee'])
fm = await FeeManager.get()
return await add_accounting_row(fm.fill_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.FillFee, out_token, fee)
return await add_accounting_entry_m2m(fm.fill_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.FillFee, out_token, fee)
async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Optional[str], category, subcategory, token, amount, note=None,
*, adjust_decimals=True) -> dec:
async def add_accounting_entry_m2m(account: str, block_hash: Optional[str], tx_id: Optional[str], category, subcategory, token, amount, note=None,
*, adjust_decimals=True) -> dec:
"""
Returns the mark-to-market USD value of the transaction.
Returns the mark-to-market USD value of the entry.
"""
if amount == 0:
return dec(0)
@@ -221,6 +229,13 @@ async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Opt
value = mark_to_market(token, amount)
log.debug(f'accounting row {time} {account} {category} {subcategory} {token} {amount} ${value}')
chain_id = current_chain.get().id
add_accounting_entry(chain_id, account, time, category, subcategory, token, amount, value, tx_id, note)
return value
def add_accounting_entry(chain_id, account, time, category, subcategory, token, amount, value=None, tx_id=None, note=None):
if not is_tracked_address(account):
return
db.session.add(Accounting(account=account,
time=time, category=category, subcategory=subcategory,
token=token, amount=amount, value=value, note=note,
@@ -229,15 +244,17 @@ async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Opt
account_db = db.session.get(DbAccount, (current_chain.get(), account))
new_amount = account_db.balances.get(token, dec(0)) + amount
if new_amount < 0:
log.error(f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}')
log.error(
f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}')
account_db.balances[token] = new_amount
db.session.add(account_db) # deep changes would not be detected by the ORM
return value
db.session.flush()
async def adjust_balance(account: DbAccount, token=NATIVE_TOKEN, subcategory=AccountingSubcategory.InitialBalance, note=None):
true_balance = await get_balance(account.address, token)
amount = true_balance - account.balances.get(token, dec(0))
await add_accounting_row(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note, adjust_decimals=False)
await add_accounting_entry_m2m(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note, adjust_decimals=False)
async def accounting_reconcile(account: DbAccount, block_id: Optional[str] = None, last_accounting_row_id: Optional[int] = None):

View File

@@ -9,6 +9,8 @@ from signal import Signals
from traceback import print_exception
from typing import Coroutine, Callable, Union, Any
from omegaconf import OmegaConf
from dexorder import configuration, config
from dexorder.alert import init_alerts
from dexorder.configuration.schema import Config
@@ -40,10 +42,12 @@ def split_args():
return omegaconf_args, regular_args
def execute(main:Callable[...,Coroutine[Any,Any,Any]], shutdown=None, *, parse_logging=True, parse_args: Union[Callable[[list[str]],Any],bool]=True):
def execute(main:Callable[...,Coroutine[Any,Any,Any]], shutdown=None, *, parse_logging=True,
parse_args: Union[Callable[[list[str]],Any], type, bool]=True):
"""
if parse_args is a function, then the command-line arguments are given to OmegaConf first, and any args parsed by
OmegaConf are stripped from the args list. The remaining args are then passed to parse_args(args)
if parse_args is a type, then the type is used to parse the extra command-line arguments using OmegaConf.
"""
# config
configured = False
@@ -63,7 +67,7 @@ def execute(main:Callable[...,Coroutine[Any,Any,Any]], shutdown=None, *, parse_l
log.info('Logging configured to default')
xconf = None
if parse_args:
if callable(parse_args):
if callable(parse_args) or isinstance(parse_args, type):
omegaconf_args, regular_args = split_args()
else:
omegaconf_args = None
@@ -73,6 +77,9 @@ def execute(main:Callable[...,Coroutine[Any,Any,Any]], shutdown=None, *, parse_l
if callable(parse_args):
# noinspection PyUnboundLocalVariable
xconf = parse_args(regular_args)
elif isinstance(parse_args, type):
# noinspection PyUnboundLocalVariable
xconf = OmegaConf.merge(OmegaConf.structured(parse_args), OmegaConf.from_cli(regular_args))
init_alerts()

View File

@@ -2,7 +2,7 @@ import logging
from asyncio import CancelledError
from dexorder import db, blockchain
from dexorder.accounting import initialize_accounting
from dexorder.accounting import initialize_accounting_runner
from dexorder.alert import infoAlert
from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute
@@ -117,7 +117,7 @@ async def main():
log.info('initializing redis with root state')
await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id])
await initialize_accounting()
await initialize_accounting_runner()
runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None)
setup_logevent_triggers(runner)

View File

@@ -1,9 +1,9 @@
import logging
from sqlalchemy import select
from sqlalchemy import select, text
from dexorder import db, blockchain
from dexorder.accounting import accounting_reconcile, accounting_lock
from dexorder.accounting import accounting_reconcile
from dexorder.bin.executable import execute
from dexorder.blocks import fetch_latest_block, current_block
from dexorder.database.model import DbAccount
@@ -15,7 +15,7 @@ async def main():
db.connect()
block = await fetch_latest_block()
current_block.set(block)
accounting_lock()
db.session.execute(text("LOCK TABLE account, accounting, reconciliation IN EXCLUSIVE MODE"))
try:
accounts = db.session.execute(select(DbAccount)).scalars().all()
for account in accounts:
@@ -29,4 +29,3 @@ async def main():
if __name__ == '__main__':
execute(main)

View File

@@ -1,14 +1,23 @@
import logging
from dataclasses import dataclass
from dexorder import blockchain, db
from dexorder import blockchain, db, dec
from dexorder.bin.executable import execute
log = logging.getLogger(__name__)
async def main():
await blockchain.connect()
db.connect()
@dataclass
class RefillConfig:
refill_level: dec
refill_accounts: list[str]
async def main(refill_config: RefillConfig):
# await blockchain.connect()
# db.connect()
log.info(f'Refilling to {refill_config.refill_level:.18f} ETH')
log.info(f'Refilling accounts: {refill_config.refill_accounts}')
if __name__ == '__main__':
execute(main())
execute(main, parse_args=RefillConfig)

View File

@@ -60,6 +60,3 @@ class Config:
stablecoins: list[str] = field(default_factory=list) # primary stablecoins which are marked to $1
quotecoins: list[str] = field(default_factory=list) # quote tokens like WETH that have stablecoin markets
nativecoin: Optional[str] = None # used for accounting of native values. e.g. address of WETH
# account: target_balance
refill: dict[str,str] = field(default_factory=dict)

View File

@@ -1,7 +1,7 @@
import logging
from datetime import datetime
from decimal import Decimal as dec
from enum import Enum
from enum import Enum, auto
from sqlalchemy import ForeignKeyConstraint
from sqlalchemy.ext.mutable import MutableDict
@@ -17,35 +17,37 @@ log = logging.getLogger(__name__)
class AccountingCategory (Enum):
Transfer = 0
Income = 1
Expense = 2
Trade = 3
Special = 4
Transfer = auto()
Income = auto()
Expense = auto()
Trade = auto()
Special = auto()
class AccountingSubcategory (Enum):
# Income
OrderFee = 0
GasFee = 1
FillFee = 2
OrderFee = auto()
GasFee = auto()
FillFee = auto()
# Expense
VaultCreation = 3
Execution = 4
FeeAdjustment = 5 # includes adjusting fee limits
Admin = auto() # contract deployments and upgrades, changing adjuster address, etc.
TransactionGas = auto()
VaultCreation = auto()
Execution = auto()
FeeAdjustment = auto() # includes adjusting fee limits
# Transfer
# Transfers have no subcategories, but the note field will be the address of the other account. Both a debit and a
# credit entry will be created, one for each account participating in the transfer.
# Special Codes
InitialBalance = 5
InitialBalance = auto()
class Accounting (Base):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
time: Mapped[datetime] = mapped_column(default=now(), index=True)
chain_id: Mapped[int] = mapped_column(index=True)
chain_id: Mapped[int] = mapped_column(index=True) # chain_id
account: Mapped[str] = mapped_column(index=True)
category: Mapped[AccountingCategory] = mapped_column(index=True)
subcategory: Mapped[Optional[AccountingSubcategory]] = mapped_column(index=True)

View File

@@ -4,8 +4,7 @@ import logging
from web3.types import EventData
from dexorder import db, metric, current_w3, timestamp
from dexorder.accounting import accounting_fill, accounting_placement, accounting_transfer, is_tracked_address, \
accounting_lock
from dexorder.accounting import accounting_fill, accounting_placement, accounting_lock
from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import SwapOrderState
@@ -32,7 +31,6 @@ def dump_log(eventlog):
def init():
new_pool_prices.clear()
start_trigger_updates()
accounting_lock()
async def handle_order_placed(event: EventData):