diff --git a/alembic/versions/509010f13e8b_accounting_vaultcreation_ofac.py b/alembic/versions/509010f13e8b_accounting_vaultcreation_ofac.py new file mode 100644 index 0000000..d55a1df --- /dev/null +++ b/alembic/versions/509010f13e8b_accounting_vaultcreation_ofac.py @@ -0,0 +1,93 @@ +"""accounting, vaultcreation, ofac + +Revision ID: 509010f13e8b +Revises: 86afa7b6415d +Create Date: 2025-01-03 19:11:22.073682 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +import dexorder.database +import dexorder.database.column_types + +# revision identifiers, used by Alembic. +revision: str = '509010f13e8b' +down_revision: Union[str, None] = '86afa7b6415d' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.execute("ALTER TYPE transactionjobstate ADD VALUE 'Declined'") + op.create_table('accounting', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('time', sa.DateTime(), nullable=False), + sa.Column('account', sa.String(), nullable=False), + sa.Column('category', sa.Enum('Transfer', 'Income', 'Expense', 'Trade', 'Special', name='accountingcategory'), nullable=False), + sa.Column('subcategory', sa.Enum('OrderFee', 'GasFee', 'FillFee', 'VaultCreation', 'Execution', 'InitialBalance', name='accountingsubcategory'), nullable=True), + sa.Column('token', sa.String(), nullable=False), + sa.Column('amount', dexorder.database.column_types.DecimalNumeric(), nullable=False), + sa.Column('value', dexorder.database.column_types.DecimalNumeric(), nullable=True), + sa.Column('chain_id', sa.Integer(), nullable=False), + sa.Column('tx_id', sa.String(), nullable=True), + sa.Column('note', sa.String(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_accounting_category'), 'accounting', ['category'], unique=False) + op.create_index(op.f('ix_accounting_subcategory'), 'accounting', ['subcategory'], unique=False) + op.create_index(op.f('ix_accounting_time'), 'accounting', ['time'], unique=False) + op.create_index(op.f('ix_accounting_token'), 'accounting', ['token'], unique=False) + op.create_index(op.f('ix_accounting_account'), 'accounting', ['account'], unique=False) + op.create_table('ofac', + sa.Column('address', sa.String(), nullable=False), + sa.PrimaryKeyConstraint('address') + ) + op.create_table('ofacalerts', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('time', sa.DateTime(), nullable=False), + sa.Column('address', sa.String(), nullable=False), + sa.Column('ip', sa.String(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_table('vaultcreationrequest', + sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), + sa.Column('owner', sa.String(), nullable=False), + sa.Column('num', sa.Integer(), nullable=False), + sa.Column('time', sa.DateTime(), nullable=False), + sa.Column('ipaddr', postgresql.INET(), nullable=False), + sa.Column('vault', sa.String(), nullable=True), + sa.PrimaryKeyConstraint('chain', 'owner', 'num') + ) + op.create_index('ix_vault_address_not_null', 'vaultcreationrequest', ['vault'], unique=False, postgresql_where='vault IS NOT NULL') + op.create_table('accounts', + sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), + sa.Column('address', sa.String(), nullable=False), + sa.Column('kind', sa.Enum('Admin', 'OrderFee', 'GasFee', 'FillFee', 'Execution', name='accountkind'), nullable=False), + sa.Column('balances', dexorder.database.column_types.Balances(astext_type=sa.Text()), server_default='{}', nullable=False), + sa.PrimaryKeyConstraint('chain', 'address') + ) + op.create_index(op.f('ix_accounts_kind'), 'accounts', ['kind'], unique=False) + + +def downgrade() -> None: + op.drop_index(op.f('ix_accounting_account'), table_name='accounting') + op.drop_index(op.f('ix_accounts_kind'), table_name='accounts') + op.drop_table('accounts') + op.drop_index('ix_vault_address_not_null', table_name='vaultcreationrequest', postgresql_where='vault IS NOT NULL') + op.drop_table('vaultcreationrequest') + op.drop_table('ofacalerts') + # op.execute('drop sequence ofacalerts_id_seq') # autoincrement sequence + op.drop_table('ofac') + op.drop_index(op.f('ix_accounting_token'), table_name='accounting') + op.drop_index(op.f('ix_accounting_time'), table_name='accounting') + op.drop_index(op.f('ix_accounting_subcategory'), table_name='accounting') + op.drop_index(op.f('ix_accounting_category'), table_name='accounting') + op.drop_table('accounting') + # op.execute('drop sequence accounting_id_seq') # autoincrement sequence + op.execute('drop type accountkind') # enum type + op.execute('drop type accountingcategory') # enum type + op.execute('drop type accountingsubcategory') # enum type diff --git a/conf/finaldata/logging-finaldata.toml b/conf/finaldata/logging-finaldata.toml index 376ed2b..fb3e7f4 100644 --- a/conf/finaldata/logging-finaldata.toml +++ b/conf/finaldata/logging-finaldata.toml @@ -10,9 +10,15 @@ level='DEBUG' [handlers.console] class='logging.StreamHandler' -formatter='notime' +formatter='default' stream='ext://sys.stdout' +[formatters.default] +# https://docs.python.org/3/library/logging.html#logrecord-attributes +format='%(asctime)s %(levelname)s %(name)s %(message)s' +# https://docs.python.org/3/library/time.html#time.strftime +datefmt='%Y-%m-%d %H:%M:%S' + [formatters.notime] # https://docs.python.org/3/library/logging.html#logrecord-attributes format='%(levelname)s %(name)s %(message)s' diff --git a/conf/mirrorprice/dexorder-mirrorprice.toml b/conf/mirrorprice/dexorder-mirrorprice.toml index 2f46433..18c46e9 100644 --- a/conf/mirrorprice/dexorder-mirrorprice.toml +++ b/conf/mirrorprice/dexorder-mirrorprice.toml @@ -1,5 +1,8 @@ metadata='metadata.json' # the Dockerfile will move metadata-finaldata.json into positon -account = '${accounts.admin}' # todo switch back to accounts.gas +accounts = [ + # dev account #6 + '0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e', # 0x976EA74026E726554dB657fA54763abd0C3a0aa9 +] rpc_url = '${rpc_urls.arbsep_alchemy}' mirror_source_rpc_url='${rpc_urls.arbsep_alchemy}' diff --git a/conf/mirrorprice/logging-mirrorprice.toml b/conf/mirrorprice/logging-mirrorprice.toml index 376ed2b..fb3e7f4 100644 --- a/conf/mirrorprice/logging-mirrorprice.toml +++ b/conf/mirrorprice/logging-mirrorprice.toml @@ -10,9 +10,15 @@ level='DEBUG' [handlers.console] class='logging.StreamHandler' -formatter='notime' +formatter='default' stream='ext://sys.stdout' +[formatters.default] +# https://docs.python.org/3/library/logging.html#logrecord-attributes +format='%(asctime)s %(levelname)s %(name)s %(message)s' +# https://docs.python.org/3/library/time.html#time.strftime +datefmt='%Y-%m-%d %H:%M:%S' + [formatters.notime] # https://docs.python.org/3/library/logging.html#logrecord-attributes format='%(levelname)s %(name)s %(message)s' diff --git a/conf/mock/.secret-mock.toml b/conf/mock/.secret-mock.toml index ed35c58..3b60898 100644 --- a/conf/mock/.secret-mock.toml +++ b/conf/mock/.secret-mock.toml @@ -1,6 +1,18 @@ -[accounts] - [rpc_urls] local='http://localhost:8545' local_ws='ws://localhost:8545' arbitrum_alchemy='https://arb-mainnet.g.alchemy.com/v2/opbIf1mSo9GMXLhA1a4nhwtEzscgGdhW' + +accounts = [ + # these are the first ten dev accounts in anvil/hardhat +# '0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80', # 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 +# '0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d', # 0x70997970C51812dc3A010C7d01b50e0d17dc79C8 +# '0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a', # 0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC +# '0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6', # 0x90F79bf6EB2c4f870365E785982E1f101E93b906 + '0x47e179ec197488593b187f80a00eb0da91f1b9d0b13f8733639f19c30a34926a', # 0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65 + '0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba', # 0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc +# '0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e', # 0x976EA74026E726554dB657fA54763abd0C3a0aa9 +# '0x4bbbf85ce3377467afe5d46f804f221813b2bb87f24d81f60f1fcdbf7cbf4356', # 0x14dC79964da2C08b23698B3D3cc7Ca32193d9955 +# '0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97', # 0x23618e81E3f5cdF7f54C3d65f7FBc0aBf5B21E8f +# '0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6', # 0xa0Ee7A142d267C1f36714E4a8F75612F20a79720 +] diff --git a/conf/mock/dexorder-mock.toml b/conf/mock/dexorder-mock.toml index 3889313..1c638e9 100644 --- a/conf/mock/dexorder-mock.toml +++ b/conf/mock/dexorder-mock.toml @@ -1,3 +1,2 @@ -account='test1' rpc_url='local' ws_url='local_ws' diff --git a/requirements-lock.txt b/requirements-lock.txt index 5609a5b..1fc4500 100644 --- a/requirements-lock.txt +++ b/requirements-lock.txt @@ -1,18 +1,19 @@ -aiohttp==3.9.5 +aiohappyeyeballs==2.4.3 +aiohttp==3.11.10 aiosignal==1.3.1 -alembic==1.13.3 +alembic==1.14.0 annotated-types==0.7.0 antlr4-python3-runtime==4.9.3 asn1crypto==1.5.1 async-lru==2.0.4 attrs==23.2.0 bip-utils==2.9.3 -bitarray==2.9.2 +bitarray==3.0.0 cachetools==5.5.0 cbor2==5.6.4 certifi==2024.2.2 cffi==1.16.0 -charset-normalizer==3.3.2 +charset-normalizer==3.4.0 ckzg==1.0.2 coincurve==20.0.0 crcmod==1.7 @@ -25,7 +26,7 @@ eth-account==0.11.3 eth-bloom==3.0.1 eth-hash==0.7.0 eth-keyfile==0.8.1 -eth-keys==0.5.1 +eth-keys==0.6.0 eth-rlp==1.0.1 eth-typing==4.4.0 eth-utils==4.1.1 @@ -44,14 +45,15 @@ Mako==1.3.3 MarkupSafe==2.1.5 msgpack-python==0.5.6 multidict==6.0.5 -numpy==2.1.2 +numpy==2.2.0 oauthlib==3.2.2 omegaconf==2.3.0 -orjson==3.10.7 +orjson==3.10.12 parsimonious==0.10.0 pdpyras==5.3.0 +propcache==0.2.0 protobuf==5.26.1 -psycopg2-binary==2.9.9 +psycopg2-binary==2.9.10 py-sr25519-bindings==0.2.0 pyasn1==0.6.1 pyasn1_modules==0.4.1 @@ -61,12 +63,13 @@ pydantic==2.9.2 pydantic_core==2.23.4 PyNaCl==1.5.0 python-dateutil==2.9.0.post0 +pytz==2024.2 pyunormalize==15.1.0 PyYAML==6.0.1 -redis==5.1.1 +redis==5.2.1 referencing==0.35.0 regex==2024.4.28 -requests==2.31.0 +requests==2.32.3 requests-oauthlib==2.0.0 rlp==4.0.1 rpds-py==0.18.0 @@ -74,12 +77,12 @@ rsa==4.9 six==1.16.0 socket.io-emitter==0.1.5.1 sortedcontainers==2.4.0 -SQLAlchemy==2.0.35 +SQLAlchemy==2.0.36 toolz==0.12.1 types-requests==2.32.0.20240914 -typing_extensions==4.11.0 +typing_extensions==4.12.2 urllib3==2.2.1 web3==6.20.3 websocket-client==1.8.0 -websockets==13.1 -yarl==1.9.4 +websockets==14.1 +yarl==1.17.2 diff --git a/requirements.txt b/requirements.txt index 927247b..27c7ad9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ sqlalchemy alembic omegaconf -web3 +web3>=6,<7 psycopg2-binary orjson sortedcontainers @@ -28,3 +28,4 @@ typing_extensions requests aiohttp charset-normalizer +pytz diff --git a/src/dexorder/__init__.py b/src/dexorder/__init__.py index 19b5eab..b191ab4 100644 --- a/src/dexorder/__init__.py +++ b/src/dexorder/__init__.py @@ -41,6 +41,7 @@ class _FalseToken (_Token): NARG = _FalseToken('NARG') DELETE = _FalseToken('DELETE') # used as a value token to indicate removal of the key ADDRESS_0 = '0x0000000000000000000000000000000000000000' +NATIVE_TOKEN = '0x0000000000000000000000000000000000000001' # We use 0x01 to indicate the use of native ETH wherever a token address is normally required WEI = 1 GWEI = 1_000_000_000 ETH = 1_000_000_000_000_000_000 diff --git a/src/dexorder/accounting.py b/src/dexorder/accounting.py new file mode 100644 index 0000000..90cf006 --- /dev/null +++ b/src/dexorder/accounting.py @@ -0,0 +1,215 @@ +import asyncio +import logging + +from typing_extensions import Optional +from web3.exceptions import ContractLogicError +from web3.types import EventData + +from dexorder import db, dec, NATIVE_TOKEN, from_timestamp, config, ADDRESS_0, now +from dexorder.base import TransactionReceiptDict +from dexorder.base.chain import current_chain +from dexorder.blocks import get_block_timestamp +from dexorder.contract import ContractProxy +from dexorder.contract.dexorder import get_factory_contract, get_mirrorenv, get_mockenv +from dexorder.database.model.accounting import AccountingSubcategory, Accounting, AccountingCategory, AccountKind, \ + Accounts +from dexorder.pools import mark_to_market, pool_prices, get_pool, add_mark_pool, quotes +from dexorder.tokens import adjust_decimals as adj_dec, get_token, get_balance +from dexorder.util import hexstr + +log = logging.getLogger(__name__) + +accounting_initialized = False + +# noinspection PyTypeChecker +order_fee_account_addr: str = None +# noinspection PyTypeChecker +gas_fee_account_addr: str = None +# noinspection PyTypeChecker +fill_fee_account_addr: str = None + + +async def initialize_accounting(): + global accounting_initialized + if not accounting_initialized: + await initialize_mark_to_market() # set up mark-to-market first, so accounts can value their initial balances + await initialize_accounts() + accounting_initialized = True + log.info(f'accounting initialized\n\tstablecoins: {config.stablecoins}\n\tquotecoins: {config.quotecoins}\n\tnativecoin: {config.nativecoin}') + + +async def initialize_accounts(): + factory_contract = get_factory_contract() + implementation_address = await factory_contract.implementation() + fee_manager_address = await ContractProxy(implementation_address, 'IVaultImpl').feeManager() + fee_manager = ContractProxy(fee_manager_address, 'IFeeManager') + global order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr + order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr = await asyncio.gather( + fee_manager.orderFeeAccount(), + fee_manager.gasFeeAccount(), + fee_manager.fillFeeAccount() + ) + # Since this is called by top-level main functions outside the Runner, we trigger an explicit db commit/rollback + try: + # noinspection PyStatementEffect + await initialize_accounts_2() + db.session.commit() + except: + db.session.rollback() + raise + + +async def initialize_accounts_2(): + of_account = ensure_account(order_fee_account_addr, AccountKind.OrderFee) + gf_account = ensure_account(gas_fee_account_addr, AccountKind.GasFee) + ff_account = ensure_account(fill_fee_account_addr, AccountKind.FillFee) + if current_chain.get().id in [1337, 31337]: + log.debug('initializing debug accounts') + await asyncio.gather( + adjust_balance(of_account), + adjust_balance(gf_account), + adjust_balance(ff_account), + ) + + +async def initialize_mark_to_market(): + quotes.clear() + quotes.extend(config.stablecoins) + quotes.extend(config.quotecoins) + if not quotes and current_chain.get().id in [1337, 31337]: + weth = meh = usdc = usxd = None + mirror = get_mirrorenv() + if mirror is not None: + num_tokens = await mirror.numTokens() + for i in range(num_tokens): + try: + token_key = await mirror.tokenKeys(i) + except ContractLogicError: + break + else: + mirror_token = await mirror.tokens(token_key) + token = await get_token(mirror_token) + log.info(f'found mirror token {token["symbol"]} {mirror_token}') + if token['symbol'] == 'WETH': + weth = mirror_token + elif token['symbol'] == 'USDC': + usdc = mirror_token + + mock = get_mockenv() + if mock is not None: + meh = await mock.COIN() + usxd = await mock.USD() + + config.stablecoins = [t for t in (usdc, usxd) if t is not None] + config.quotecoins = [t for t in (weth, meh) if t is not None] + if not config.nativecoin: + config.nativecoin = weth if weth is not None else meh if meh is not None else None + elif not config.nativecoin: + factory = await get_factory_contract() + wrapper = await factory.wrapper() + if wrapper != ADDRESS_0: + config.nativecoin = wrapper + quotes.clear() + quotes.extend(config.stablecoins) + quotes.extend(config.quotecoins) + for addr in pool_prices.keys(): + pool = await get_pool(addr) + add_mark_pool(addr, pool['base'], pool['quote'], pool['fee']) + + +async def handle_feeaccountschanged(fee_accounts: EventData): + try: + global order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr + order_fee_account_addr = fee_accounts['args']['orderFeeAccount'] + gas_fee_account_addr = fee_accounts['args']['gasFeeAccount'] + fill_fee_account_addr = fee_accounts['args']['fillFeeAccount'] + await initialize_accounts_2() + except KeyError: + log.warning(f'Could not parse FeeAccountsChanged {fee_accounts}') + return + + +def ensure_account(addr: str, kind: AccountKind): + chain = current_chain.get() + found = db.session.get(Accounts, (chain, addr)) + if found: + if found.kind != kind: + log.warning(f'Account {addr} has wrong kind {found.kind} != {kind}') + found.kind = kind + db.session.add(found) + else: + found = Accounts(chain=chain, address=addr, kind=kind, balances={}) + db.session.add(found) + return found + + +async def accounting_transfer(receipt: TransactionReceiptDict, token: str, sender: str, receiver: str, amount: dec, adjust_decimals=True): + block_hash = hexstr(receipt['blockHash']) + tx_id = hexstr(receipt['transactionHash']) + await asyncio.gather( + add_accounting_row( sender, block_hash, tx_id, AccountingCategory.Transfer, None, + token, -amount, receiver, adjust_decimals=adjust_decimals), + add_accounting_row( receiver, block_hash, tx_id, AccountingCategory.Transfer, None, + token, amount, sender, adjust_decimals=adjust_decimals), + ) + + +async def accounting_transaction_gas(receipt: TransactionReceiptDict, subcategory: AccountingSubcategory): + """ Accounts for the gas spent on the given transaction """ + await add_accounting_row( receipt['from'], + hexstr(receipt['blockHash']), hexstr(receipt['transactionHash']), + AccountingCategory.Expense, subcategory, NATIVE_TOKEN, -dec(receipt['gasUsed']) + ) + + +async def accounting_placement(order_placed: EventData): + block_hash = hexstr(order_placed['blockHash']) + tx_id = hexstr(order_placed['transactionHash']) + try: + order_fee = int(order_placed['args']['orderFee']) + gas_fee = int(order_placed['args']['gasFee']) + except KeyError: + log.warning(f'Rogue DexorderPlacedEvent in tx {hexstr(tx_id)}') + return + await add_accounting_row( order_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, + AccountingSubcategory.OrderFee, NATIVE_TOKEN, order_fee) + await add_accounting_row( gas_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, + AccountingSubcategory.GasFee, NATIVE_TOKEN, gas_fee) + + +async def accounting_fill(fill: EventData, out_token: str): + block_hash = hexstr(fill['blockHash']) + tx_id = hexstr(fill['transactionHash']) + fee = int(fill['args']['fillFee']) + await add_accounting_row(fill_fee_account_addr, block_hash, tx_id, AccountingCategory.Income, + AccountingSubcategory.FillFee, out_token, fee, adjust_decimals=True) + + +async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Optional[str], category, subcategory, token, amount, note=None, + *, adjust_decimals=True): + if amount == 0: + return + if adjust_decimals: + amount = await adj_dec(token, amount) + # noinspection PyTypeChecker + time = now() if block_hash is None else from_timestamp(await get_block_timestamp(block_hash)) + value = mark_to_market(token, amount) + log.debug(f'accounting row {time} {account} {category} {subcategory} {token} {amount} ${value}') + db.session.add(Accounting( account=account, + time=time, category=category, subcategory=subcategory, + token=token, amount=amount, value=value, note=note, + chain_id=current_chain.get().id, tx_id=tx_id, + )) + # Adjust database account if it exists + account_db = db.session.get(Accounts, (current_chain.get(), account)) + if account_db is not None: + new_amount = account_db.balances.get(token,dec(0)) + amount + if new_amount < 0: + log.error(f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}') + account_db.balances[token] = new_amount + db.session.add(account_db) # deep changes would not be detected by the ORM + +async def adjust_balance(account: Accounts, token=NATIVE_TOKEN, subcategory=AccountingSubcategory.InitialBalance, note=None): + true_balance = await get_balance(account.address, token) + amount = true_balance - account.balances.get(token,dec(0)) + await add_accounting_row(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note) diff --git a/src/dexorder/base/__init__.py b/src/dexorder/base/__init__.py index 5535b7f..cd62662 100644 --- a/src/dexorder/base/__init__.py +++ b/src/dexorder/base/__init__.py @@ -1,5 +1,6 @@ +from abc import abstractmethod from dataclasses import dataclass -from typing import TypedDict, Union, Type +from typing import TypedDict, Union, Type, Any, Callable Address = str Quantity = Union[str,int] @@ -13,8 +14,13 @@ class TransactionRequest: """ type: str + def __init__(self, type: str, key: Any): + self.type = type + self.key = key + + # subclasses of TransactionRequest must register their type code here so the appropriate dataclass may be constructed -transaction_request_registry: dict[str, Type[TransactionRequest]] = {} +transaction_request_deserializers: dict[str, Callable[[...],TransactionRequest]] = {} TransactionDict = TypedDict( 'TransactionDict', { diff --git a/src/dexorder/base/account.py b/src/dexorder/base/account.py index 96c2e4d..b3660c1 100644 --- a/src/dexorder/base/account.py +++ b/src/dexorder/base/account.py @@ -1,81 +1,90 @@ -from contextvars import ContextVar -from typing import Union, Optional +import asyncio +import logging +from typing import Optional import eth_account from eth_account.signers.local import LocalAccount from web3.middleware import construct_sign_and_send_raw_middleware -from dexorder import NARG, config, current_w3 +from dexorder import config, current_w3 +from dexorder.base.chain import current_chain + +log = logging.getLogger(__name__) # this is just here for typing the extra .name. the __new__() function returns an eth_account...LocalAccount # we do it this way because web3py expects a LocalAccount object but we cannot construct one directly with a super() # call but must instead use a factory :( class Account (LocalAccount): + _main_account = None + _pool = None + _pool_count = 0 @staticmethod - def get_named(account_name: str) -> Optional['Account']: - account = config.accounts.get(account_name) - return Account.get(account) if account else Account.get() + def get(): + """ + Always returns the main account, even if it's busy. + """ + Account._init_pool() + return Account._main_account @staticmethod - # noinspection PyInitNewSignature - def get(account:[Union,str]=NARG) -> Optional['Account']: - if account is NARG: - account = config.account - if type(account) is not str: - return account - - key_str = config.accounts.get(account, account) + async def acquire(): + """ + MUST call account.release() after the transaction has completed, to return this Account to the available pool. + """ + Account._init_pool() + log.debug(f'available accounts: {Account._pool.qsize()}') try: - local_account = eth_account.Account.from_key(key_str) - return Account(local_account, key_str, account) - except ValueError: - try: - # was the key missing a leading '0x'? - fixed = '0x' + key_str - local_account = eth_account.Account.from_key(fixed) - print(f'WARNING: account "{account}" is missing a leading "0x"') - return Account(local_account, fixed, account) - except ValueError: - pass - try: - # was the key an integer posing as a string? - converted = f'{int(key_str):#0{66}x}' - local_account = eth_account.Account.from_key(converted) - print(f'WARNING: account "{account}" is set as an integer instead of a string. Converted to: {converted}') - return Account(local_account, converted, account) - except ValueError: - pass - raise ValueError(f'Could not construct account for name "{account}"') + async with asyncio.timeout(1): + result = await Account._pool.get() + except asyncio.TimeoutError: + log.error('waiting for an available account') + result = await Account._pool.get() + Account._pool_count -= 1 + return result - def __init__(self, local_account: LocalAccount, key_str, name: str): # todo chain_id? + @staticmethod + def _init_pool(): + if Account._pool is None: + Account._pool = asyncio.Queue() + Account._pool_count = 0 + for key in config.accounts: + local_account = eth_account.Account.from_key(key) + account = Account(local_account) + if Account._main_account is None: + Account._main_account = account + Account._pool.put_nowait(account) + Account._pool_count += 1 + + + def __init__(self, local_account: LocalAccount): # todo chain_id? super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code - self.name = name - self.key_str = key_str + self.chain_id = current_chain.get().id self.signing_middleware = construct_sign_and_send_raw_middleware(self) self._nonce: Optional[int] = None + self.tx_id: Optional[str] = None # current transaction id async def next_nonce(self): if self._nonce is None: self._nonce = await current_w3.get().eth.get_transaction_count(self.address, 'pending') + log.debug(f'queried nonce for account {self.address}: {self._nonce}') else: self._nonce += 1 return self._nonce - def attach(self, w3): - w3.eth.default_account = self.address - try: - w3.middleware_onion.remove('account_signer') - except ValueError: - pass - w3.middleware_onion.add(self.signing_middleware, 'account_signer') + def reset_nonce(self): + self._nonce = None def balance(self): return current_w3.get().eth.get_balance(self.address) + def release(self): + Account._pool.put_nowait(self) + Account._pool_count += 1 + def __str__(self): - return self.name + return self.address - -current_account: ContextVar[Optional[Account]] = ContextVar('current_account', default=Account.get()) + def __hash__(self): + return \ No newline at end of file diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index 682c7a7..70e4cec 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -1,6 +1,5 @@ -import logging from contextvars import ContextVar -from datetime import datetime, timezone +from datetime import datetime import dexorder diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index d17cff4..b33d282 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -17,8 +17,10 @@ from dexorder.memcache import memcache from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches from dexorder.order.triggers import activate_orders, end_trigger_updates +from dexorder.accounting import initialize_accounting from dexorder.runner import BlockStateRunner from dexorder.transactions import handle_transaction_receipts, finalize_transactions +from dexorder.vaultcreationhandler import handle_vault_creation_requests log = logging.getLogger('dexorder') LOG_ALL_EVENTS = False # for debug todo config @@ -68,6 +70,7 @@ def setup_logevent_triggers(runner): runner.add_event_trigger(handle_transaction_receipts) # todo handle only the transactions that were posted to this block runner.add_event_trigger(handle_dexorderexecutions, executions) + runner.add_event_trigger(handle_vault_creation_requests) runner.add_callback(end_trigger_updates) runner.add_callback(execute_tranches) @@ -75,7 +78,7 @@ def setup_logevent_triggers(runner): # noinspection DuplicatedCode async def main(): - await blockchain.connect() + await blockchain.connect(autosign=False) # the transaction manager checks out accounts and releases them. redis_state = None state = None if memcache: @@ -103,6 +106,8 @@ async def main(): log.info('initializing redis with root state') await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id]) + await initialize_accounting() + runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None) setup_logevent_triggers(runner) # OHLC printing hard-disabled for main. Use the finaldata process. diff --git a/src/dexorder/bin/mirror.py b/src/dexorder/bin/mirror.py index f8d5221..9b61a78 100644 --- a/src/dexorder/bin/mirror.py +++ b/src/dexorder/bin/mirror.py @@ -124,9 +124,9 @@ async def main(): if not pools: log.error('must configure mirror_pools') return - if config.account is None: - # Dev Account #5 - config.account = '0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba' + if not config.accounts: + # Dev Account #6 0x976EA74026E726554dB657fA54763abd0C3a0aa9 + config.accounts = ['0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e'] await blockchain.connect(name='target') mirror_addr = config.mirror_env diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py index 14a468d..075b921 100644 --- a/src/dexorder/blockchain/connection.py +++ b/src/dexorder/blockchain/connection.py @@ -3,11 +3,8 @@ import logging from random import random from typing import Any, Optional, Union -import requests -import requests.adapters - # noinspection PyPackageRequirements -from aiohttp import ClientResponseError, ClientSession, BaseConnector, ClientTimeout, TCPConnector +from aiohttp import ClientResponseError, ClientSession, ClientTimeout, TCPConnector from eth_typing import URI from hexbytes import HexBytes from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider @@ -51,13 +48,14 @@ async def create_w3(rpc_url=None, account=NARG, autosign=True, name='default'): w3.eth.Contract = _make_contract(w3.eth) has_account = False if autosign: - a = Account.get(account) - if a is not None: + if account is NARG: + account = Account.get() + if account is not None: # noinspection PyTypeChecker - w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(a)) - w3.eth.default_account = a.address + w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account)) + w3.eth.default_account = account.address has_account = True - log.info(f'{name} w3 configured to autosign as {a.address}') + log.info(f'{name} w3 configured to autosign as {account.address}') if not has_account: log.info(f'No account set for {name} w3') return w3 diff --git a/src/dexorder/blocks.py b/src/dexorder/blocks.py index c2b723c..9907645 100644 --- a/src/dexorder/blocks.py +++ b/src/dexorder/blocks.py @@ -136,7 +136,11 @@ async def fetch_block(blockhash, *, chain_id=None) -> Optional[Block]: chain_id = current_chain.get().id response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False]) # log.debug(f'fetch_block response {blockhash} {chain_id} {response}') - blockdict: BlockInfo = response['result'] + try: + blockdict: BlockInfo = response['result'] + except KeyError: + log.error(f'fetch_block got strange response {response}') + return None if blockdict is None: log.debug(f'block {blockhash} not found') return None diff --git a/src/dexorder/configuration/load.py b/src/dexorder/configuration/load.py index 6425646..5646284 100644 --- a/src/dexorder/configuration/load.py +++ b/src/dexorder/configuration/load.py @@ -6,7 +6,6 @@ from omegaconf import OmegaConf, DictConfig from omegaconf.errors import OmegaConfBaseException from .schema import Config -from .standard_accounts import default_accounts_config schema = OmegaConf.structured(Config()) @@ -19,7 +18,6 @@ def load_config(): # noinspection PyTypeChecker result:ConfigDict = OmegaConf.merge( schema, - load_accounts(), from_toml('.secret.toml'), from_toml('dexorder.toml'), from_toml('config.toml'), @@ -28,15 +26,6 @@ def load_config(): return result -def load_accounts(): - accounts_conf = OmegaConf.create({'accounts': default_accounts_config}) - try: - OmegaConf.merge(schema, accounts_conf) - return accounts_conf - except OmegaConfBaseException as _x: - raise ConfigException(f'Error while processing default accounts:\n{_x}') - - def from_env(prefix='DEXORDER_'): merge = {} for key, value in os.environ.items(): diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index d70f92e..9c742a5 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -29,8 +29,7 @@ class Config: polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead backfill: int = 0 # if not 0, then runner will initialize an empty database by backfilling from the given block height. Use negative numbers to indicate a number of blocks before the present. - account: Optional[str] = None # may be a private key or an account alias - accounts: Optional[dict[str,str]] = field(default_factory=dict) # account aliases + accounts: list[str] = field(default_factory=list) # the pool of accounts is used round-robin min_gas: str = '0' # Order slashing @@ -48,3 +47,7 @@ class Config: mirror_env: Optional[str] = None pagerduty: Optional[str] = None + + stablecoins: list[str] = field(default_factory=list) # primary stablecoins which are marked to $1 + quotecoins: list[str] = field(default_factory=list) # quote tokens like WETH that have stablecoin markets + nativecoin: Optional[str] = None # used for accounting of native values. e.g. address of WETH diff --git a/src/dexorder/contract/dexorder.py b/src/dexorder/contract/dexorder.py index 4bf67d5..531bac3 100644 --- a/src/dexorder/contract/dexorder.py +++ b/src/dexorder/contract/dexorder.py @@ -3,6 +3,7 @@ import logging from eth_abi.packed import encode_packed from eth_utils import keccak, to_bytes, to_checksum_address +from typing_extensions import Optional from dexorder.base.chain import current_chain from dexorder.contract import ContractProxy @@ -38,6 +39,14 @@ def get_dexorder_contract() -> ContractProxy: def get_vault_init_code_hash() -> bytes: return get_by_chain(_vault_init_code_hash) +def get_mockenv() -> Optional[ContractProxy]: + addr = chain_info.get(str(current_chain.get().id),{}).get('mockenv') + return ContractProxy(addr, 'MockEnv') if addr is not None else None + +def get_mirrorenv() -> Optional[ContractProxy]: + addr = chain_info.get(str(current_chain.get().id),{}).get('mirrorenv') + return ContractProxy(addr, 'MirrorEnv') if addr is not None else None + def vault_address(owner, num): salt = keccak(encode_packed(['address','uint8'],[owner,num])) contract_address = keccak( diff --git a/src/dexorder/database/column_types.py b/src/dexorder/database/column_types.py index b06cd12..74d4b4c 100644 --- a/src/dexorder/database/column_types.py +++ b/src/dexorder/database/column_types.py @@ -1,12 +1,14 @@ import dataclasses +import json import math -from typing import Union +from typing import Union, Optional, Any -from sqlalchemy import TypeDecorator, BIGINT -from sqlalchemy.dialects.postgresql import BYTEA, JSONB +from sqlalchemy import TypeDecorator, BIGINT, Dialect +from sqlalchemy.dialects.postgresql import BYTEA, JSONB, NUMERIC +from sqlalchemy.sql.type_api import _T from web3 import Web3 -from dexorder import Fixed2 as NativeFixed, Blockchain as NativeBlockchain +from dexorder import Fixed2 as NativeFixed, Blockchain as NativeBlockchain, dec from dexorder.util import hexstr, hexbytes @@ -74,6 +76,31 @@ def Fixed(bits, dbits, signed=False): return result +class DecimalNumeric (TypeDecorator): + impl = NUMERIC + + def process_bind_param(self, value, dialect): + return value + + def process_result_value(self, value, dialect): + return dec(value) + + +class Balances (TypeDecorator): + """ + Dictionary of decimals keyed by strings + """ + impl = JSONB + + def process_bind_param(self, value: Optional[_T], dialect: Dialect) -> Any: + return json.dumps({k: str(v) for k,v in value.items()}) + + def process_result_value( + self, value: Optional[Any], dialect: Dialect + ) -> Optional[_T]: + return {k: dec(v) for k,v in json.loads(value).items()} + + class DataclassDictBase(TypeDecorator): impl = JSONB diff --git a/src/dexorder/database/model/__init__.py b/src/dexorder/database/model/__init__.py index 6b33f78..87ddda2 100644 --- a/src/dexorder/database/model/__init__.py +++ b/src/dexorder/database/model/__init__.py @@ -6,3 +6,6 @@ from .dbblock import DbBlock from .orderindex import OrderIndex from .pool import Pool from .token import Token +from .ofac import OFAC, OFACAlerts +from .accounting import Accounting +from .vaultcreationrequest import VaultCreationRequest diff --git a/src/dexorder/database/model/accounting.py b/src/dexorder/database/model/accounting.py new file mode 100644 index 0000000..184c770 --- /dev/null +++ b/src/dexorder/database/model/accounting.py @@ -0,0 +1,70 @@ +import logging +from datetime import datetime +from decimal import Decimal as dec +from enum import Enum + +from sqlalchemy import ForeignKey +from sqlalchemy.orm import Mapped, mapped_column +from typing_extensions import Optional + +from dexorder import now +from dexorder.database.column import Blockchain +from dexorder.database.column_types import DecimalNumeric, Balances +from dexorder.database.model import Base + +log = logging.getLogger(__name__) + + +class AccountingCategory (Enum): + Transfer = 0 + Income = 1 + Expense = 2 + Trade = 3 + Special = 4 + +class AccountingSubcategory (Enum): + # Income + OrderFee = 0 + GasFee = 1 + FillFee = 2 + + # Expense + VaultCreation = 3 + Execution = 4 + + # Transfer + # Transfers have no subcategories, but the note field will be the address of the other account. Both a debit and a + # credit entry will be created, one for each account participating in the transfer. + + # Special Codes + InitialBalance = 5 + + +class Accounting (Base): + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + time: Mapped[datetime] = mapped_column(default=now(), index=True) + account: Mapped[str] = mapped_column(index=True) + category: Mapped[AccountingCategory] = mapped_column(index=True) + subcategory: Mapped[Optional[AccountingSubcategory]] = mapped_column(index=True) + token: Mapped[str] = mapped_column(index=True) + amount: Mapped[dec] = mapped_column(DecimalNumeric) + value: Mapped[Optional[dec]] = mapped_column(DecimalNumeric) # USD value of the amount. If NULL then accounting has not been done for this row. + + chain_id: Mapped[int] + tx_id: Mapped[Optional[str]] + note: Mapped[Optional[str]] # format depends on the type of entry + +class AccountKind (Enum): + Admin = 0 # administrative (contract deployments etc) + OrderFee = 1 # receives order placement fees + GasFee = 2 # receives gas fees + FillFee = 3 # receives fill fees + Execution = 4 # spends gas + + +class Accounts(Base): + chain: Mapped[Blockchain] = mapped_column(primary_key=True) + address: Mapped[str] = mapped_column(primary_key=True) + kind: Mapped[AccountKind] = mapped_column(index=True) + balances: Mapped[dict[str, dec]] = mapped_column(Balances, default=dict, server_default="{}") + diff --git a/src/dexorder/database/model/ofac.py b/src/dexorder/database/model/ofac.py new file mode 100644 index 0000000..39bcbd4 --- /dev/null +++ b/src/dexorder/database/model/ofac.py @@ -0,0 +1,26 @@ +import logging +from datetime import datetime + +from sqlalchemy.orm import Mapped +from sqlalchemy.testing.schema import mapped_column +from typing_extensions import Optional + +from dexorder import now +from dexorder.database.model import Base + +log = logging.getLogger(__name__) + +# todo check broad country restrictions + +class OFAC (Base): + address: Mapped[str] = mapped_column(primary_key=True) + +class OFACAlerts (Base): + """ + This table records any time when a banned address tries to use our service. + """ + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + time: Mapped[datetime] = mapped_column(default=now()) + address: Mapped[str] + ip: Mapped[Optional[str]] + diff --git a/src/dexorder/database/model/token.py b/src/dexorder/database/model/token.py index 63f7de2..9fc53bc 100644 --- a/src/dexorder/database/model/token.py +++ b/src/dexorder/database/model/token.py @@ -39,13 +39,13 @@ class Token (Base): chain: Mapped[Blockchain] = mapped_column(primary_key=True) address: Mapped[Address] = mapped_column(primary_key=True) - name: Mapped[str] + name: Mapped[str] # indexed below symbol: Mapped[str] = mapped_column(index=True) decimals: Mapped[Uint8] approved: Mapped[bool] = mapped_column(index=True) __table_args__ = ( - Index('idx_name', 'name', postgresql_using='gist'), # full text search on name + Index('ix_token_name', 'name', postgresql_using='gist'), # full text search on name ) diff --git a/src/dexorder/database/model/transaction.py b/src/dexorder/database/model/transaction.py index b30eb27..1a707ca 100644 --- a/src/dexorder/database/model/transaction.py +++ b/src/dexorder/database/model/transaction.py @@ -5,7 +5,7 @@ from typing import Optional import sqlalchemy as sa from sqlalchemy.orm import mapped_column, Mapped -from dexorder.base import TransactionRequest, transaction_request_registry +from dexorder.base import TransactionRequest, transaction_request_deserializers from dexorder.database.column import Dict, Bytes, UUID_PK, Blockchain from dexorder.database.column_types import DataclassDict from dexorder.database.model import Base @@ -18,6 +18,7 @@ class TransactionJobState (Enum): Sent = 's' # tx has been delivered to a node Mined = 'z' # mined on at least one fork, whether reverted or not. todo handle forks that didnt confirm: receipts are per-fork! Error = 'x' # an exception has prevented this job from sending a transaction + Declined = 'd' # the transaction builder successfully returned None # noinspection PyProtectedMember @@ -27,7 +28,7 @@ TransactionJobStateColumnType = sa.Enum(TransactionJobState) def deserialize_transaction_request(**d): t = d['type'] - Class = transaction_request_registry.get(t) + Class = transaction_request_deserializers.get(t) if Class is None: raise ValueError(f'No TransactionRequest for type "{t}"') # noinspection PyArgumentList diff --git a/src/dexorder/database/model/vaultcreationrequest.py b/src/dexorder/database/model/vaultcreationrequest.py new file mode 100644 index 0000000..1947598 --- /dev/null +++ b/src/dexorder/database/model/vaultcreationrequest.py @@ -0,0 +1,25 @@ +import logging +from datetime import datetime +from typing import Optional + +from sqlalchemy.dialects.postgresql import INET +from sqlalchemy.orm import mapped_column, Mapped +from sqlalchemy.schema import Index + +from dexorder.database.column import Blockchain +from dexorder.database.model import Base + +log = logging.getLogger(__name__) + +class VaultCreationRequest (Base): + chain: Mapped[Blockchain] = mapped_column(primary_key=True) + owner: Mapped[str] = mapped_column(primary_key=True) + num: Mapped[int] = mapped_column(primary_key=True) + time: Mapped[datetime] + ipaddr: Mapped[str] = mapped_column(INET) + vault: Mapped[Optional[str]] = mapped_column(default=None, server_default=None) # filled in after the vault is indeed created + + __table_args__ = ( + Index("ix_vault_address_not_null", "vault", postgresql_where="vault IS NOT NULL"), + ) + diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 597bda8..70e9fa7 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -3,12 +3,14 @@ import logging from web3.types import EventData -from dexorder import current_pub, minutely +from dexorder import db +from dexorder.accounting import accounting_fill, accounting_placement from dexorder.base.chain import current_chain from dexorder.base.order import TrancheKey, OrderKey from dexorder.base.orderlib import SwapOrderState from dexorder.blocks import get_block_timestamp -from dexorder.contract.dexorder import vault_address, VaultContract, get_factory_contract +from dexorder.contract.dexorder import VaultContract, get_factory_contract +from dexorder.database.model import VaultCreationRequest from dexorder.impls import get_impl_version from dexorder.ohlc import ohlcs from dexorder.order.orderstate import Order @@ -16,7 +18,7 @@ from dexorder.order.triggers import (OrderTriggers, activate_order, update_balan update_price_triggers) from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data from dexorder.util import hexstr -from dexorder.vault_blockdata import vault_owners, adjust_balance, MAX_VAULTS, verify_vault +from dexorder.vault_blockdata import vault_owners, adjust_balance, verify_vault, publish_vaults log = logging.getLogger(__name__) @@ -31,14 +33,16 @@ def init(): async def handle_order_placed(event: EventData): - # event DexorderPlaced (uint64 startOrderIndex, uint8 numOrders); + # event DexorderSwapPlaced (uint64 startOrderIndex, uint8 numOrders, uint); addr = event['address'] - start_index = int(event['args']['startOrderIndex']) - num_orders = int(event['args']['numOrders']) - # todo accounting - order_fee = int(event['args']['orderFee']) - gas_fee = int(event['args']['gasFee']) - log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}') + try: + start_index = int(event['args']['startOrderIndex']) + num_orders = int(event['args']['numOrders']) + except KeyError: + log.warning(f'Rogue DexorderSwapPlaced in tx {hexstr(event["transactionHash"])}') + return + await accounting_placement(event) + log.debug(f'DexorderSwapPlaced {addr} {start_index} {num_orders}') if not await verify_vault(addr): log.warning(f'Discarding order from rogue vault {addr}.') return @@ -60,17 +64,22 @@ async def handle_swap_filled(event: EventData): log.debug(f'DexorderSwapFilled {event}') args = event['args'] vault = event['address'] - order_index = args['orderIndex'] - tranche_index = args['trancheIndex'] - amount_in = args['amountIn'] - amount_out = args['amountOut'] - fill_fee = args['fillFee'] - next_execution_time = args['nextExecutionTime'] + try: + order_index = args['orderIndex'] + tranche_index = args['trancheIndex'] + amount_in = args['amountIn'] + amount_out = args['amountOut'] + fill_fee = args['fillFee'] + next_execution_time = args['nextExecutionTime'] + except KeyError: + log.warning(f'Rogue DexorderSwapFilled in tx {hexstr(event["transactionHash"])}') + return try: order: Order = Order.of(vault, order_index) except KeyError: log.warning(f'DexorderSwapFilled IGNORED due to missing order {vault} {order_index}') return + await accounting_fill(event, order.order.tokenOut) order.status.trancheStatus[tranche_index].activationTime = next_execution_time # update rate limit try: triggers = OrderTriggers.instances[order.key] @@ -159,20 +168,22 @@ async def handle_vault_created(created: EventData): except KeyError: log.debug('couldnt parse event data for VaultCreated', created) return + + # stop trying to create the vault + chain_id = current_chain.get().id + db_req = db.session.get(VaultCreationRequest, (chain_id, owner, num)) + if db_req is None: + log.warning(f'could not find vault creation request {chain_id}|{owner}|{num}') + else: + db_req.vault = addr + # Verify the authenticity of the vault. We are permissive on Mockchain due to irregular restarts of various components if not await verify_vault(addr, owner, num): log.warning(f'Discarding rogue vault {addr}') return vault_owners[addr] = owner log.debug(f'VaultCreated {owner} #{num} => {addr}') - vaults = [] - for num in range(MAX_VAULTS): - addr = vault_address(owner, num) - if addr in vault_owners: - vaults.append(addr) - else: - break - current_pub.get()(f'{current_chain.get().id}|{owner}', 'vaults', vaults) + publish_vaults(chain_id, owner) async def handle_vault_impl_changed(upgrade: EventData): diff --git a/src/dexorder/memcache/memcache_state.py b/src/dexorder/memcache/memcache_state.py index 16d37a2..f112b33 100644 --- a/src/dexorder/memcache/memcache_state.py +++ b/src/dexorder/memcache/memcache_state.py @@ -13,6 +13,7 @@ from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.fork import Fork from dexorder.blockstate.state import compress_diffs from dexorder.memcache import current_redis, memcache +from dexorder.util import hexstr from dexorder.util.async_util import maywait from dexorder.util.json import json_encoder @@ -101,8 +102,9 @@ class RedisState (SeriesCollection): for series, keys in hdels.items(): r.hdel(series, *keys) block_series = f'{chain_id}|head' - r.json(json_encoder).set(block_series,'$',[fork.height, fork.head]) - pubs.append((str(chain_id), 'head', [fork.height, fork.head])) + headstr = hexstr(fork.head) + r.json(json_encoder).set(block_series,'$',[fork.height, headstr]) + pubs.append((str(chain_id), 'head', [fork.height, headstr])) # separate batch for pubs if pubs: await publish_all(pubs) diff --git a/src/dexorder/ofac.py b/src/dexorder/ofac.py new file mode 100644 index 0000000..a7cba8a --- /dev/null +++ b/src/dexorder/ofac.py @@ -0,0 +1,128 @@ +import json +import logging +import re +import xml.etree.ElementTree as ET +from datetime import datetime, timedelta +from pytz import timezone + +import requests +from eth_utils import to_checksum_address + +from dexorder import db +from dexorder.base.chain import current_chain +from dexorder.database.model.ofac import OFAC + +log = logging.getLogger(__name__) +deny = set() +ofac_enabled = True + +def init_ofac(): + if current_chain.get().id in [1337,31337]: + log.info(f'OFAC disabled on test network {current_chain.get()}') + global ofac_enabled + ofac_enabled = False + return + global deny + deny = set(o.address for o in db.session.query(OFAC).all()) + update_ofac() + +# feature_ids are the OFAC feature integer ID's for cryptocurrency features e.g. "Digital Currency - BTC" +def save_ofac_meta(date: datetime, feature_ids: set[int]): + # todo tim debug + pass + # db.kv['ofac_meta'] = dict(date=date.isoformat(), feature_ids=sorted(feature_ids)) + +def get_ofac_meta(): + # found = db.kv.get('ofac_meta') + found = None # todo tim debug + if found is None: + return None, set() + [date, feature_ids] = found + date = datetime.fromisoformat(date) + return date, set(feature_ids) + +def add_ofac_denial(addr): + db.session.add(OFAC(addr)) + deny.add(addr) + # todo push to redis + +def process_ofac_xml(xml: str, date: datetime = None, feature_ids: set[int] = None): + if date is None or feature_ids is None: + [date, feature_ids] = get_ofac_meta() + + # Parse the XML string + doc = ET.parse(xml) + ns = {'o':'https://sanctionslistservice.ofac.treas.gov/api/PublicationPreview/exports/ENHANCED_XML'} + new_date = datetime.fromisoformat(doc.find('./o:publicationInfo/o:dataAsOf', ns).text) + if date is not None and new_date <= date: + if new_date < date: + log.debug('ignoring old OFAC XML') + return + for ft in doc.findall('./o:featureTypes/o:featureType', ns): + found = re.match(r'Digital Currency Address - (.+)', ft.find('./o:type', ns).text) + if found: + currency = found.group(1) + feature_id = int(ft.attrib['featureTypeId']) + print(currency, feature_id) + feature_ids.add(feature_id) + for e in doc.findall('./o:entities/o:entity', ns): + for f in e.findall('./o:features/o:feature', ns): + t = f.find('./o:type', ns) + feature_id = int(t.attrib['featureTypeId']) + if feature_id in feature_ids: + addr = f.find('./o:value', ns).text + if addr.startswith('0x'): + check = to_checksum_address(addr) + add_ofac_denial(check) + print(check) + save_ofac_meta(new_date, feature_ids) + return new_date, feature_ids + + +def day(date, tz=None): + if tz is None: + tz = date.tzinfo + return datetime(date.year, date.month, date.day, tzinfo=tz) + + +treasury_tz = timezone('America/New_York') + + +def update_ofac(): + if not ofac_enabled: + return + date: datetime + [date, feature_ids] = get_ofac_meta() + if date is None: + # fetch and process the full dataset + log.info(f'initializing OFAC') + url = 'https://sanctionslistservice.ofac.treas.gov/entities' + xml = requests.get(url).text + process_ofac_xml(xml, date, feature_ids) + else: + # fetch only the changes since last time + now = day(date) + last = day(datetime.now(date.tzinfo)) + publications = [] + while now <= last: + url = f'https://sanctionslistservice.ofac.treas.gov/changes/history/{now.year}/{now.month}/{now.day}' + for pub in json.loads(requests.get(url).text): + # {"publicationID":407,"datePublished":"2024-11-19T10:03:00.790295"} + # NO TIMEZONE IN THIS RESPONSE :((((( + pub_id = pub['publicationId'] + pub_date = day(pub['datePublished'], treasury_tz) + if pub_date > date: + publications.append(pub_id) + now += timedelta(days=1) + if not publications: + log.info(f'OFAC table is current. Last publication date: {date}') + else: + for pub_id in publications: + url = f'https://sanctionslistservice.ofac.treas.gov/changes/{pub_id}' + xml = requests.get(url).text + date, feature_ids = process_ofac_xml(xml, date, feature_ids) + log.info(f'OFAC updated with publication {pub_id} {date}') + + +if __name__ == '__main__': + process_ofac_xml('/tmp/entities.xml') diff --git a/src/dexorder/order/__init__.py b/src/dexorder/order/__init__.py index b982d94..4a7f0e5 100644 --- a/src/dexorder/order/__init__.py +++ b/src/dexorder/order/__init__.py @@ -1,5 +1,3 @@ -from .executionhandler import TrancheExecutionHandler # do not remove. ensures the handler is registered. - def order_key(vault:str, ): return f'{vault}' diff --git a/src/dexorder/order/executionhandler.py b/src/dexorder/order/executionhandler.py index f4144b0..171b483 100644 --- a/src/dexorder/order/executionhandler.py +++ b/src/dexorder/order/executionhandler.py @@ -1,26 +1,68 @@ import logging -from typing import Optional +from dataclasses import dataclass +from typing import Optional, Union, Any from uuid import UUID from web3.exceptions import ContractPanicError, ContractLogicError from web3.types import EventData from dexorder import db -from dexorder.base import TransactionReceiptDict +from dexorder.accounting import accounting_transaction_gas +from dexorder.base import TransactionReceiptDict, TransactionRequest, transaction_request_deserializers from dexorder.base.order import TrancheKey, OrderKey from dexorder.base.orderlib import PriceProof from dexorder.contract.dexorder import get_dexorder_contract +from dexorder.database.model.accounting import AccountingSubcategory from dexorder.database.model.transaction import TransactionJob from dexorder.order.orderstate import Order from dexorder.order.triggers import (inflight_execution_requests, OrderTriggers, TrancheState, active_tranches, order_error) -from dexorder.transactions import TransactionHandler, TrancheExecutionRequest, submit_transaction_request, \ - new_tranche_execution_request +from dexorder.transactions import TransactionHandler, submit_transaction_request from dexorder.util import hexbytes log = logging.getLogger(__name__) +@dataclass +class TrancheExecutionRequest (TransactionRequest): + TYPE = 'te' + + # type='te' for tranche execution + vault: str + order_index: int + tranche_index: int + price_proof: Union[None,dict,tuple[int]] + + def __init__(self, vault: str, order_index: int, tranche_index: int, price_proof: Union[None,dict,tuple[int]], **_): + super().__init__(TrancheExecutionRequest.TYPE, (vault, order_index, tranche_index)) + self.vault = vault + self.order_index = order_index + self.tranche_index = tranche_index + self.price_proof = price_proof + + def key(self) -> Any: + return self.vault, self.order_index, self.tranche_index + + @property + def order_key(self): + return OrderKey(self.vault, self.order_index) + + @property + def tranche_key(self): + return TrancheKey(self.vault, self.order_index, self.tranche_index) + + +# Must register the class for deserialization +# noinspection PyTypeChecker +transaction_request_deserializers[TrancheExecutionRequest.TYPE] = lambda **data: TrancheExecutionRequest(data['vault'], data['order_index'], data['tranche_index'], data['price_proof'] if 'price_proof' in data else None) + + +def new_tranche_execution_request(tk: TrancheKey, proof: Optional[PriceProof]=None) -> TrancheExecutionRequest: + if proof is None: + proof = PriceProof(0) + return TrancheExecutionRequest(tk.vault, tk.order_index, tk.tranche_index, proof.dump()) + + class TrancheExecutionHandler (TransactionHandler): def __init__(self): super().__init__('te') @@ -41,8 +83,8 @@ class TrancheExecutionHandler (TransactionHandler): raise exception async def complete_transaction(self, job: TransactionJob, receipt: TransactionReceiptDict) -> None: - # we handle execution results using the DexorderExecution event, so there's nothing to do here. - pass + # we handle execution results using the DexorderExecution event. here, only accounting is required + await accounting_transaction_gas(receipt, AccountingSubcategory.Execution) async def transaction_exception(self, job: TransactionJob, e: Exception) -> None: log.error('Could not build execution transaction due to exception', exc_info=e) @@ -164,7 +206,8 @@ def execute_tranches(): def create_execution_request(tk: TrancheKey, proof: PriceProof): inflight_execution_requests.add(tk) job = submit_transaction_request(new_tranche_execution_request(tk, proof)) - log.debug(f'Executing {tk} as job {job.id}') + if job is not None: + log.debug(f'Executing {tk} as job {job.id}') return job diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index fabc795..5378f32 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -1,12 +1,13 @@ import asyncio import logging +from dataclasses import dataclass from datetime import datetime from typing import Optional from web3.exceptions import ContractLogicError from web3.types import EventData -from dexorder import dec, ADDRESS_0, from_timestamp, db +from dexorder import dec, ADDRESS_0, from_timestamp, db, config, NATIVE_TOKEN from dexorder.addrmeta import address_metadata from dexorder.base.chain import current_chain from dexorder.base.orderlib import Exchange @@ -15,7 +16,7 @@ from dexorder.blockstate import BlockDict from dexorder.blockstate.blockdata import K, V from dexorder.database.model import Pool from dexorder.database.model.pool import OldPoolDict -from dexorder.tokens import get_token +from dexorder.tokens import get_token, adjust_decimals as adj_dec from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address log = logging.getLogger(__name__) @@ -23,6 +24,7 @@ log = logging.getLogger(__name__) async def get_pool(address: str) -> OldPoolDict: try: + # noinspection PyTypeChecker result: OldPoolDict = address_metadata[address] except KeyError: result = address_metadata[address] = await load_pool(address) @@ -61,6 +63,7 @@ async def load_pool(address: str, *, use_db=True) -> OldPoolDict: base=t0, quote=t1, fee=fee, decimals=decimals) log.debug(f'new UniswapV3 pool {token0["symbol"]}/{token1["symbol"]} {fee/1_000_000:.2%} ' f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}') + add_mark_pool(address, t0, t1, fee) except ContractLogicError: pass except ValueError as v: @@ -129,3 +132,88 @@ async def get_uniswap_data(swap: EventData) -> Optional[tuple[OldPoolDict, datet timestamp = await get_block_timestamp(swap['blockHash']) dt = from_timestamp(timestamp) return pool, dt, price + + +# +# Mark-to-Market +# + +@dataclass +class MarkPool: + addr: str + base: str + quote: str + fee: int + inverted: bool + +mark_pools: dict[str, MarkPool] = {} + +quotes = [] # ordered list of preferred quote tokens + + +def add_mark_pool(addr: str, base: str, quote: str, fee: int): + """ + Called for every discovered pool, this function registers the pool if it connects to a stablecoin or quotecoin and + has a better fee than other pools for that pair. + """ + # determine inversion + try: + base_index = quotes.index(base) + except ValueError: + base_index = None + try: + quote_index = quotes.index(quote) + except ValueError: + quote_index = None + if base_index is None and quote_index is None: + return + inverted = base_index is not None and (quote_index is None or base_index < quote_index) + if inverted: + base, quote = quote, base + + # determine whether this pool is better than the already registered mark pool + add = False + if base not in mark_pools: + add = True + else: + mp = mark_pools[base] + mp_index = quotes.index(mp.quote) + try: + index = quotes.index(quote) + if index < mp_index or index == mp_index and fee < mp.fee: + add = True + except ValueError: + pass + if add: + pool = MarkPool(addr, base, quote, fee, inverted) + mark_pools[base] = pool + if base == config.nativecoin: + mark_pools[NATIVE_TOKEN] = pool + + +async def mark_to_market_adj_dec(token: str, amount: dec, adjust_decimals=True) -> Optional[dec]: + """ + Returns the current USD value for the amount of token. + """ + if not accounting_initialized: + await initialize_accounting() + if adjust_decimals: + amount = await adj_dec(token, amount) + return mark_to_market(token, amount) + + +def mark_to_market(token: str, amount: dec) -> Optional[dec]: + """ + amount must already be adjusted for decimals + """ + if token in config.stablecoins: + return dec(1) * amount + try: + mp = mark_pools[token] + except KeyError: + log.info(f'no mark pool for token {token}') + return None + price = pool_prices[mp.addr] + value = amount / price if mp.inverted else amount * price + return mark_to_market(mp.quote, value) + diff --git a/src/dexorder/tokens.py b/src/dexorder/tokens.py index b7d5816..c8c73c5 100644 --- a/src/dexorder/tokens.py +++ b/src/dexorder/tokens.py @@ -4,7 +4,7 @@ from typing import Optional from eth_abi.exceptions import InsufficientDataBytes from web3.exceptions import BadFunctionCallOutput -from dexorder import ADDRESS_0, config, db +from dexorder import ADDRESS_0, db, NATIVE_TOKEN, dec, current_w3 from dexorder.addrmeta import address_metadata from dexorder.base.chain import current_chain from dexorder.contract import ERC20, ContractProxy, CONTRACT_ERRORS @@ -15,11 +15,33 @@ from dexorder.metadata import get_metadata log = logging.getLogger(__name__) -# todo needs chain_id +async def get_balance(addr, token_addr, *, adjust_decimals=True) -> dec: + if token_addr == NATIVE_TOKEN: + return await get_native_balance(addr, adjust_decimals=adjust_decimals) + else: + return await get_erc20_balance(addr, token_addr, adjust_decimals=adjust_decimals) + + +async def get_erc20_balance(addr, token_addr, *, adjust_decimals=True): + value = dec(await ERC20(token_addr).balanceOf(addr)) + if adjust_decimals: + token = await get_token(token_addr) + value *= dec(10) ** dec(-token['decimals']) + return value + + +async def get_native_balance(addr, *, adjust_decimals=True) -> dec: + value = dec(await current_w3.get().eth.get_balance(addr)) + if adjust_decimals: + value /= 10 ** 18 + return value + + async def get_token(address) -> Optional[OldTokenDict]: if address == ADDRESS_0: raise ValueError('No token at address 0') try: + # noinspection PyTypeChecker return address_metadata[address] except KeyError: result = address_metadata[address] = await load_token(address) @@ -80,3 +102,13 @@ async def load_token(address: str) -> Optional[OldTokenDict]: td['decimals'] = md['d'] log.debug(f'new token {name} {symbol} {address}') return td + + +async def adjust_decimals(token, value): + if token == NATIVE_TOKEN: + decimals = 18 + else: + token = await get_token(token) + decimals = token['decimals'] + value *= dec(10) ** dec(-decimals) + return value diff --git a/src/dexorder/transactions.py b/src/dexorder/transactions.py index d8bc9fa..f866f38 100644 --- a/src/dexorder/transactions.py +++ b/src/dexorder/transactions.py @@ -1,21 +1,20 @@ +import asyncio import logging from abc import abstractmethod -from dataclasses import dataclass -from typing import Union, Optional +from typing import Optional from uuid import uuid4 from sqlalchemy import select from web3.exceptions import TransactionNotFound, ContractPanicError, ContractLogicError from dexorder import db, current_w3, Account -from dexorder.base import TransactionReceiptDict, TransactionRequest, transaction_request_registry +from dexorder.base import TransactionReceiptDict, TransactionRequest from dexorder.base.chain import current_chain -from dexorder.base.order import TrancheKey, OrderKey -from dexorder.base.orderlib import PriceProof from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.fork import current_fork, Fork from dexorder.contract.contract_proxy import ContractTransaction from dexorder.database.model.transaction import TransactionJob, TransactionJobState +from dexorder.util import hexstr log = logging.getLogger(__name__) @@ -32,7 +31,7 @@ class TransactionHandler: TransactionHandler.instances[tag] = self @abstractmethod - async def build_transaction(self, job_id: int, tr: TransactionRequest) -> ContractTransaction: ... + async def build_transaction(self, job_id: int, tr: TransactionRequest) -> Optional[ContractTransaction]: ... @abstractmethod async def complete_transaction(self, job: TransactionJob, receipt: TransactionReceiptDict) -> None: ... @@ -40,50 +39,25 @@ class TransactionHandler: @abstractmethod async def transaction_exception(self, job: TransactionJob, e: Exception) -> None: ... -@dataclass -class TrancheExecutionRequest (TransactionRequest): - TYPE = 'te' - # type='te' for tranche execution - vault: str - order_index: int - tranche_index: int - price_proof: Union[None,dict,tuple[int]] - - def __init__(self, vault: str, order_index: int, tranche_index: int, price_proof: Union[None,dict,tuple[int]], **_): - super().__init__(TrancheExecutionRequest.TYPE) - self.vault = vault - self.order_index = order_index - self.tranche_index = tranche_index - self.price_proof = price_proof - - @property - def order_key(self): - return OrderKey(self.vault, self.order_index) - - @property - def tranche_key(self): - return TrancheKey(self.vault, self.order_index, self.tranche_index) - -# Must register the class for deserialization -transaction_request_registry[TrancheExecutionRequest.TYPE] = TrancheExecutionRequest +in_flight = set() +accounts_in_flight: dict[bytes, Account] = {} # tx_id_bytes: account -def new_tranche_execution_request(tk: TrancheKey, proof: Optional[PriceProof]=None) -> TrancheExecutionRequest: - if proof is None: - proof = PriceProof(0) - return TrancheExecutionRequest(tk.vault, tk.order_index, tk.tranche_index, proof.dump()) - - -def submit_transaction_request(tr: TransactionRequest): +def submit_transaction_request(tr: TransactionRequest) -> Optional[TransactionJob]: """ Once a transaction request has been submitted, it is this module's responsibility to see that it gets mined, at - which point `tr.complete_transaction()` is called with the transaction receipt. - The building of a transaction can also fail, + which point `tr.complete_transaction()` is called with the transaction receipt. If the same-keyed request is + already in-flight, None is returned. """ + key = tr.type, tr.key + if key in in_flight: + log.debug(f'transaction request {tr.key} already in flight') + return None job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_fork.get().height, state=TransactionJobState.Requested, request=tr) db.session.add(job) + in_flight.add(key) return job @@ -109,32 +83,51 @@ async def create_and_send_transactions(): job.state = TransactionJobState.Error db.session.add(job) await handler.transaction_exception(job, x) + in_flight.discard((job.request.type, job.request.key)) return except Exception as x: log.warning(f'unable to send transaction for job {job.id}', exc_info=x) return + if ctx is None: + log.info(f'Transaction request {job.request.__class__.__name__} {job.id} declined to build a tx.') + job.state = TransactionJobState.Declined + db.session.add(job) + in_flight.discard((job.request.type, job.request.key)) + return w3 = current_w3.get() - account = Account.get_named(handler.tag) - if account is None: - account = Account.get() + try: + async with asyncio.timeout(1): + account = await Account.acquire() + except asyncio.TimeoutError: + account = None if account is None: log.error(f'No account available for transaction request type "{handler.tag}"') continue await ctx.sign(account) - job.state = TransactionJobState.Signed - job.tx_id = ctx.id_bytes - job.tx_data = ctx.data - db.session.add(job) - log.info(f'servicing job {job.request.__class__.__name__} {job.id} with tx {ctx.id}') + log.info(f'servicing job {job.request.__class__.__name__} {job.id} with account {account.address} nonce {ctx.tx["nonce"]} tx {ctx.id}') + # noinspection PyBroadException try: - sent = await w3.eth.send_raw_transaction(job.tx_data) + sent = await w3.eth.send_raw_transaction(ctx.data) + except ValueError as e: + if e.args[0]['code'] == -32003: + # Nonce too low + log.warning(f'Account {account.address} nonce too low') + account.reset_nonce() + else: + log.exception(f'Failure sending transaction for job {job.id}') + account.release() except: log.exception(f'Failure sending transaction for job {job.id}') # todo pager # todo send state unknown! + account.release() else: - assert sent == job.tx_id + account.tx_id = hexstr(ctx.id_bytes) + accounts_in_flight[ctx.id_bytes] = account job.state = TransactionJobState.Sent + job.tx_id = ctx.id_bytes + job.tx_data = ctx.data + assert sent == job.tx_id db.session.add(job) @@ -149,19 +142,23 @@ async def handle_transaction_receipts(): try: receipt: TransactionReceiptDict = await w3.eth.get_transaction_receipt(job.tx_id) except TransactionNotFound: + return + fork = current_fork.get() + assert fork is not None + if fork.branch.contiguous and receipt['blockHash'] in fork.branch.path or \ + fork.branch.disjoint and receipt['blockNumber'] <= fork.height: + try: + handler = TransactionHandler.of(job.request.type) + except KeyError: + # todo remove bad request? + log.warning(f'ignoring transaction request with bad type "{job.request.type}"') + else: + await handler.complete_transaction(job, receipt) + in_flight.discard((job.request.type, job.request.key)) + try: + accounts_in_flight.pop(job.tx_id).release() + except KeyError: pass - else: - fork = current_fork.get() - assert fork is not None - if fork.branch.contiguous and receipt['blockHash'] in fork.branch.path or \ - fork.branch.disjoint and receipt['blockNumber'] <= fork.height: - try: - handler = TransactionHandler.of(job.request.type) - except KeyError: - # todo remove bad request? - log.warning(f'ignoring transaction request with bad type "{job.request.type}"') - else: - await handler.complete_transaction(job, receipt) def finalize_transactions(_fork: Fork, diffs: list[DiffEntryItem]): diff --git a/src/dexorder/util/__init__.py b/src/dexorder/util/__init__.py index 5b2c070..f8a37fa 100644 --- a/src/dexorder/util/__init__.py +++ b/src/dexorder/util/__init__.py @@ -16,7 +16,7 @@ def align_decimal(value, left_columns) -> str: return ' ' * pad + s -def hexstr(value: Union[HexBytes, bytes, str]): +def hexstr(value: Union[HexBytes, bytes, str]) -> str: """ returns an 0x-prefixed hex string """ if type(value) is HexBytes: return value.hex() diff --git a/src/dexorder/vault_blockdata.py b/src/dexorder/vault_blockdata.py index 90954c1..bfc2ce4 100644 --- a/src/dexorder/vault_blockdata.py +++ b/src/dexorder/vault_blockdata.py @@ -1,6 +1,7 @@ import functools import logging +from dexorder import current_pub from dexorder.base.chain import current_chain from dexorder.blockstate import BlockDict from dexorder.contract import ERC20, CONTRACT_ERRORS @@ -77,3 +78,15 @@ def balance_adjuster(vault, token_address, amount): result[taddr] = new_amt return result return functools.partial(_adjust, vault, token_address, amount) + + +def publish_vaults(chain_id, owner): + vaults = [] + for num in range(MAX_VAULTS): + addr = vault_address(owner, num) + if addr in vault_owners: + vaults.append(addr) + else: + break + log.debug(f'publish_vaults {chain_id} {owner} {vaults}') + current_pub.get()(f'{chain_id}|{owner}', 'vaults', chain_id, owner, vaults) diff --git a/src/dexorder/vaultcreationhandler.py b/src/dexorder/vaultcreationhandler.py new file mode 100644 index 0000000..ae8e27f --- /dev/null +++ b/src/dexorder/vaultcreationhandler.py @@ -0,0 +1,98 @@ +import logging +from dataclasses import dataclass +from typing import Optional, Any + +from eth_utils import to_checksum_address +from web3.exceptions import ContractLogicError + +from dexorder import db +from dexorder.accounting import accounting_transaction_gas +from dexorder.base import TransactionReceiptDict, TransactionRequest, transaction_request_deserializers +from dexorder.base.chain import current_chain +from dexorder.contract import ContractProxy +from dexorder.contract.contract_proxy import ContractTransaction +from dexorder.contract.dexorder import get_factory_contract, vault_address +from dexorder.database.model import TransactionJob +from dexorder.database.model import VaultCreationRequest as DbVaultCreationRequest +from dexorder.database.model.accounting import AccountingSubcategory +from dexorder.transactions import TransactionHandler, submit_transaction_request +from dexorder.vault_blockdata import publish_vaults + +log = logging.getLogger(__name__) + + +@dataclass +class VaultCreationRequest (TransactionRequest): + TYPE = 'vcr' + + chain_id: int + owner: str + num: int + + def __init__(self, chain_id: int, owner: str, num: int): + super().__init__(VaultCreationRequest.TYPE, (chain_id, owner, num)) + self.chain_id = chain_id + self.owner = to_checksum_address(owner) + self.orig_owner = owner # for the database key + self.num = num + + def key(self) -> Any: + return self.chain_id, self.owner, self.num + +def deserialize_vault_creation_request(**data) -> VaultCreationRequest: + return VaultCreationRequest(data['chain_id'], data['owner'], data['num']) + +# Must register the class for deserialization +transaction_request_deserializers[VaultCreationRequest.TYPE] = deserialize_vault_creation_request + +in_flight = set() # (chain, owner, num) + + +class VaultCreationHandler (TransactionHandler): + + def __init__(self): + super().__init__(VaultCreationRequest.TYPE) + + async def build_transaction(self, job_id: int, tr: VaultCreationRequest) -> Optional[ContractTransaction]: + factory = get_factory_contract() + owner_address = to_checksum_address(tr.owner) + try: + return await factory.build.deployVault(owner_address, tr.num) + except ContractLogicError: + in_flight.discard((tr.chain_id, tr.owner, tr.num)) + # maybe the vault already exists? + addr = vault_address(tr.owner, tr.num) + owner = await ContractProxy(addr, 'Vault').owner() + if owner == owner_address: + log.debug(f'detected existing vault at {addr}') + publish_vaults(tr.chain_id, owner) + return None + raise + except Exception: + in_flight.discard((tr.chain_id, tr.owner, tr.num)) + raise + + + async def complete_transaction(self, job: TransactionJob, receipt: TransactionReceiptDict) -> None: + await accounting_transaction_gas(receipt, AccountingSubcategory.VaultCreation) # vault creation gas + # noinspection PyTypeChecker + req: VaultCreationRequest = job.request + in_flight.discard((req.chain_id, req.owner, req.num)) + + + async def transaction_exception(self, job: TransactionJob, e: Exception) -> None: + log.error(f'exception from createVault transaction: {job.tx_id}', exc_info=e) + +VaultCreationHandler() + +last_seen = None + +def handle_vault_creation_requests(): + for req in db.session.query(DbVaultCreationRequest).where( + DbVaultCreationRequest.vault == None, DbVaultCreationRequest.chain==current_chain.get()): + req: DbVaultCreationRequest + key = req.chain.id, req.owner, req.num + if key not in in_flight: + vcr = VaultCreationRequest(*key) + submit_transaction_request(vcr) + in_flight.add(key)