accounting

This commit is contained in:
tim
2025-01-16 20:15:43 -04:00
parent 48fdfeeb3f
commit adebbb833c
38 changed files with 1133 additions and 210 deletions

View File

@@ -0,0 +1,93 @@
"""accounting, vaultcreation, ofac
Revision ID: 509010f13e8b
Revises: 86afa7b6415d
Create Date: 2025-01-03 19:11:22.073682
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
import dexorder.database
import dexorder.database.column_types
# revision identifiers, used by Alembic.
revision: str = '509010f13e8b'
down_revision: Union[str, None] = '86afa7b6415d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.execute("ALTER TYPE transactionjobstate ADD VALUE 'Declined'")
op.create_table('accounting',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('time', sa.DateTime(), nullable=False),
sa.Column('account', sa.String(), nullable=False),
sa.Column('category', sa.Enum('Transfer', 'Income', 'Expense', 'Trade', 'Special', name='accountingcategory'), nullable=False),
sa.Column('subcategory', sa.Enum('OrderFee', 'GasFee', 'FillFee', 'VaultCreation', 'Execution', 'InitialBalance', name='accountingsubcategory'), nullable=True),
sa.Column('token', sa.String(), nullable=False),
sa.Column('amount', dexorder.database.column_types.DecimalNumeric(), nullable=False),
sa.Column('value', dexorder.database.column_types.DecimalNumeric(), nullable=True),
sa.Column('chain_id', sa.Integer(), nullable=False),
sa.Column('tx_id', sa.String(), nullable=True),
sa.Column('note', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_accounting_category'), 'accounting', ['category'], unique=False)
op.create_index(op.f('ix_accounting_subcategory'), 'accounting', ['subcategory'], unique=False)
op.create_index(op.f('ix_accounting_time'), 'accounting', ['time'], unique=False)
op.create_index(op.f('ix_accounting_token'), 'accounting', ['token'], unique=False)
op.create_index(op.f('ix_accounting_account'), 'accounting', ['account'], unique=False)
op.create_table('ofac',
sa.Column('address', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('address')
)
op.create_table('ofacalerts',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('time', sa.DateTime(), nullable=False),
sa.Column('address', sa.String(), nullable=False),
sa.Column('ip', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('vaultcreationrequest',
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('owner', sa.String(), nullable=False),
sa.Column('num', sa.Integer(), nullable=False),
sa.Column('time', sa.DateTime(), nullable=False),
sa.Column('ipaddr', postgresql.INET(), nullable=False),
sa.Column('vault', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('chain', 'owner', 'num')
)
op.create_index('ix_vault_address_not_null', 'vaultcreationrequest', ['vault'], unique=False, postgresql_where='vault IS NOT NULL')
op.create_table('accounts',
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('address', sa.String(), nullable=False),
sa.Column('kind', sa.Enum('Admin', 'OrderFee', 'GasFee', 'FillFee', 'Execution', name='accountkind'), nullable=False),
sa.Column('balances', dexorder.database.column_types.Balances(astext_type=sa.Text()), server_default='{}', nullable=False),
sa.PrimaryKeyConstraint('chain', 'address')
)
op.create_index(op.f('ix_accounts_kind'), 'accounts', ['kind'], unique=False)
def downgrade() -> None:
op.drop_index(op.f('ix_accounting_account'), table_name='accounting')
op.drop_index(op.f('ix_accounts_kind'), table_name='accounts')
op.drop_table('accounts')
op.drop_index('ix_vault_address_not_null', table_name='vaultcreationrequest', postgresql_where='vault IS NOT NULL')
op.drop_table('vaultcreationrequest')
op.drop_table('ofacalerts')
# op.execute('drop sequence ofacalerts_id_seq') # autoincrement sequence
op.drop_table('ofac')
op.drop_index(op.f('ix_accounting_token'), table_name='accounting')
op.drop_index(op.f('ix_accounting_time'), table_name='accounting')
op.drop_index(op.f('ix_accounting_subcategory'), table_name='accounting')
op.drop_index(op.f('ix_accounting_category'), table_name='accounting')
op.drop_table('accounting')
# op.execute('drop sequence accounting_id_seq') # autoincrement sequence
op.execute('drop type accountkind') # enum type
op.execute('drop type accountingcategory') # enum type
op.execute('drop type accountingsubcategory') # enum type

View File

@@ -10,9 +10,15 @@ level='DEBUG'
[handlers.console]
class='logging.StreamHandler'
formatter='notime'
formatter='default'
stream='ext://sys.stdout'
[formatters.default]
# https://docs.python.org/3/library/logging.html#logrecord-attributes
format='%(asctime)s %(levelname)s %(name)s %(message)s'
# https://docs.python.org/3/library/time.html#time.strftime
datefmt='%Y-%m-%d %H:%M:%S'
[formatters.notime]
# https://docs.python.org/3/library/logging.html#logrecord-attributes
format='%(levelname)s %(name)s %(message)s'

View File

@@ -1,5 +1,8 @@
metadata='metadata.json' # the Dockerfile will move metadata-finaldata.json into positon
account = '${accounts.admin}' # todo switch back to accounts.gas
accounts = [
# dev account #6
'0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e', # 0x976EA74026E726554dB657fA54763abd0C3a0aa9
]
rpc_url = '${rpc_urls.arbsep_alchemy}'
mirror_source_rpc_url='${rpc_urls.arbsep_alchemy}'

View File

@@ -10,9 +10,15 @@ level='DEBUG'
[handlers.console]
class='logging.StreamHandler'
formatter='notime'
formatter='default'
stream='ext://sys.stdout'
[formatters.default]
# https://docs.python.org/3/library/logging.html#logrecord-attributes
format='%(asctime)s %(levelname)s %(name)s %(message)s'
# https://docs.python.org/3/library/time.html#time.strftime
datefmt='%Y-%m-%d %H:%M:%S'
[formatters.notime]
# https://docs.python.org/3/library/logging.html#logrecord-attributes
format='%(levelname)s %(name)s %(message)s'

View File

@@ -1,6 +1,18 @@
[accounts]
[rpc_urls]
local='http://localhost:8545'
local_ws='ws://localhost:8545'
arbitrum_alchemy='https://arb-mainnet.g.alchemy.com/v2/opbIf1mSo9GMXLhA1a4nhwtEzscgGdhW'
accounts = [
# these are the first ten dev accounts in anvil/hardhat
# '0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80', # 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266
# '0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d', # 0x70997970C51812dc3A010C7d01b50e0d17dc79C8
# '0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a', # 0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC
# '0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6', # 0x90F79bf6EB2c4f870365E785982E1f101E93b906
'0x47e179ec197488593b187f80a00eb0da91f1b9d0b13f8733639f19c30a34926a', # 0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65
'0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba', # 0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc
# '0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e', # 0x976EA74026E726554dB657fA54763abd0C3a0aa9
# '0x4bbbf85ce3377467afe5d46f804f221813b2bb87f24d81f60f1fcdbf7cbf4356', # 0x14dC79964da2C08b23698B3D3cc7Ca32193d9955
# '0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97', # 0x23618e81E3f5cdF7f54C3d65f7FBc0aBf5B21E8f
# '0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6', # 0xa0Ee7A142d267C1f36714E4a8F75612F20a79720
]

View File

@@ -1,3 +1,2 @@
account='test1'
rpc_url='local'
ws_url='local_ws'

View File

@@ -1,18 +1,19 @@
aiohttp==3.9.5
aiohappyeyeballs==2.4.3
aiohttp==3.11.10
aiosignal==1.3.1
alembic==1.13.3
alembic==1.14.0
annotated-types==0.7.0
antlr4-python3-runtime==4.9.3
asn1crypto==1.5.1
async-lru==2.0.4
attrs==23.2.0
bip-utils==2.9.3
bitarray==2.9.2
bitarray==3.0.0
cachetools==5.5.0
cbor2==5.6.4
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
charset-normalizer==3.4.0
ckzg==1.0.2
coincurve==20.0.0
crcmod==1.7
@@ -25,7 +26,7 @@ eth-account==0.11.3
eth-bloom==3.0.1
eth-hash==0.7.0
eth-keyfile==0.8.1
eth-keys==0.5.1
eth-keys==0.6.0
eth-rlp==1.0.1
eth-typing==4.4.0
eth-utils==4.1.1
@@ -44,14 +45,15 @@ Mako==1.3.3
MarkupSafe==2.1.5
msgpack-python==0.5.6
multidict==6.0.5
numpy==2.1.2
numpy==2.2.0
oauthlib==3.2.2
omegaconf==2.3.0
orjson==3.10.7
orjson==3.10.12
parsimonious==0.10.0
pdpyras==5.3.0
propcache==0.2.0
protobuf==5.26.1
psycopg2-binary==2.9.9
psycopg2-binary==2.9.10
py-sr25519-bindings==0.2.0
pyasn1==0.6.1
pyasn1_modules==0.4.1
@@ -61,12 +63,13 @@ pydantic==2.9.2
pydantic_core==2.23.4
PyNaCl==1.5.0
python-dateutil==2.9.0.post0
pytz==2024.2
pyunormalize==15.1.0
PyYAML==6.0.1
redis==5.1.1
redis==5.2.1
referencing==0.35.0
regex==2024.4.28
requests==2.31.0
requests==2.32.3
requests-oauthlib==2.0.0
rlp==4.0.1
rpds-py==0.18.0
@@ -74,12 +77,12 @@ rsa==4.9
six==1.16.0
socket.io-emitter==0.1.5.1
sortedcontainers==2.4.0
SQLAlchemy==2.0.35
SQLAlchemy==2.0.36
toolz==0.12.1
types-requests==2.32.0.20240914
typing_extensions==4.11.0
typing_extensions==4.12.2
urllib3==2.2.1
web3==6.20.3
websocket-client==1.8.0
websockets==13.1
yarl==1.9.4
websockets==14.1
yarl==1.17.2

View File

@@ -1,7 +1,7 @@
sqlalchemy
alembic
omegaconf
web3
web3>=6,<7
psycopg2-binary
orjson
sortedcontainers
@@ -28,3 +28,4 @@ typing_extensions
requests
aiohttp
charset-normalizer
pytz

View File

@@ -41,6 +41,7 @@ class _FalseToken (_Token):
NARG = _FalseToken('NARG')
DELETE = _FalseToken('DELETE') # used as a value token to indicate removal of the key
ADDRESS_0 = '0x0000000000000000000000000000000000000000'
NATIVE_TOKEN = '0x0000000000000000000000000000000000000001' # We use 0x01 to indicate the use of native ETH wherever a token address is normally required
WEI = 1
GWEI = 1_000_000_000
ETH = 1_000_000_000_000_000_000

215
src/dexorder/accounting.py Normal file
View File

@@ -0,0 +1,215 @@
import asyncio
import logging
from typing_extensions import Optional
from web3.exceptions import ContractLogicError
from web3.types import EventData
from dexorder import db, dec, NATIVE_TOKEN, from_timestamp, config, ADDRESS_0, now
from dexorder.base import TransactionReceiptDict
from dexorder.base.chain import current_chain
from dexorder.blocks import get_block_timestamp
from dexorder.contract import ContractProxy
from dexorder.contract.dexorder import get_factory_contract, get_mirrorenv, get_mockenv
from dexorder.database.model.accounting import AccountingSubcategory, Accounting, AccountingCategory, AccountKind, \
Accounts
from dexorder.pools import mark_to_market, pool_prices, get_pool, add_mark_pool, quotes
from dexorder.tokens import adjust_decimals as adj_dec, get_token, get_balance
from dexorder.util import hexstr
log = logging.getLogger(__name__)
accounting_initialized = False
# noinspection PyTypeChecker
order_fee_account_addr: str = None
# noinspection PyTypeChecker
gas_fee_account_addr: str = None
# noinspection PyTypeChecker
fill_fee_account_addr: str = None
async def initialize_accounting():
global accounting_initialized
if not accounting_initialized:
await initialize_mark_to_market() # set up mark-to-market first, so accounts can value their initial balances
await initialize_accounts()
accounting_initialized = True
log.info(f'accounting initialized\n\tstablecoins: {config.stablecoins}\n\tquotecoins: {config.quotecoins}\n\tnativecoin: {config.nativecoin}')
async def initialize_accounts():
factory_contract = get_factory_contract()
implementation_address = await factory_contract.implementation()
fee_manager_address = await ContractProxy(implementation_address, 'IVaultImpl').feeManager()
fee_manager = ContractProxy(fee_manager_address, 'IFeeManager')
global order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr
order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr = await asyncio.gather(
fee_manager.orderFeeAccount(),
fee_manager.gasFeeAccount(),
fee_manager.fillFeeAccount()
)
# Since this is called by top-level main functions outside the Runner, we trigger an explicit db commit/rollback
try:
# noinspection PyStatementEffect
await initialize_accounts_2()
db.session.commit()
except:
db.session.rollback()
raise
async def initialize_accounts_2():
of_account = ensure_account(order_fee_account_addr, AccountKind.OrderFee)
gf_account = ensure_account(gas_fee_account_addr, AccountKind.GasFee)
ff_account = ensure_account(fill_fee_account_addr, AccountKind.FillFee)
if current_chain.get().id in [1337, 31337]:
log.debug('initializing debug accounts')
await asyncio.gather(
adjust_balance(of_account),
adjust_balance(gf_account),
adjust_balance(ff_account),
)
async def initialize_mark_to_market():
quotes.clear()
quotes.extend(config.stablecoins)
quotes.extend(config.quotecoins)
if not quotes and current_chain.get().id in [1337, 31337]:
weth = meh = usdc = usxd = None
mirror = get_mirrorenv()
if mirror is not None:
num_tokens = await mirror.numTokens()
for i in range(num_tokens):
try:
token_key = await mirror.tokenKeys(i)
except ContractLogicError:
break
else:
mirror_token = await mirror.tokens(token_key)
token = await get_token(mirror_token)
log.info(f'found mirror token {token["symbol"]} {mirror_token}')
if token['symbol'] == 'WETH':
weth = mirror_token
elif token['symbol'] == 'USDC':
usdc = mirror_token
mock = get_mockenv()
if mock is not None:
meh = await mock.COIN()
usxd = await mock.USD()
config.stablecoins = [t for t in (usdc, usxd) if t is not None]
config.quotecoins = [t for t in (weth, meh) if t is not None]
if not config.nativecoin:
config.nativecoin = weth if weth is not None else meh if meh is not None else None
elif not config.nativecoin:
factory = await get_factory_contract()
wrapper = await factory.wrapper()
if wrapper != ADDRESS_0:
config.nativecoin = wrapper
quotes.clear()
quotes.extend(config.stablecoins)
quotes.extend(config.quotecoins)
for addr in pool_prices.keys():
pool = await get_pool(addr)
add_mark_pool(addr, pool['base'], pool['quote'], pool['fee'])
async def handle_feeaccountschanged(fee_accounts: EventData):
try:
global order_fee_account_addr, gas_fee_account_addr, fill_fee_account_addr
order_fee_account_addr = fee_accounts['args']['orderFeeAccount']
gas_fee_account_addr = fee_accounts['args']['gasFeeAccount']
fill_fee_account_addr = fee_accounts['args']['fillFeeAccount']
await initialize_accounts_2()
except KeyError:
log.warning(f'Could not parse FeeAccountsChanged {fee_accounts}')
return
def ensure_account(addr: str, kind: AccountKind):
chain = current_chain.get()
found = db.session.get(Accounts, (chain, addr))
if found:
if found.kind != kind:
log.warning(f'Account {addr} has wrong kind {found.kind} != {kind}')
found.kind = kind
db.session.add(found)
else:
found = Accounts(chain=chain, address=addr, kind=kind, balances={})
db.session.add(found)
return found
async def accounting_transfer(receipt: TransactionReceiptDict, token: str, sender: str, receiver: str, amount: dec, adjust_decimals=True):
block_hash = hexstr(receipt['blockHash'])
tx_id = hexstr(receipt['transactionHash'])
await asyncio.gather(
add_accounting_row( sender, block_hash, tx_id, AccountingCategory.Transfer, None,
token, -amount, receiver, adjust_decimals=adjust_decimals),
add_accounting_row( receiver, block_hash, tx_id, AccountingCategory.Transfer, None,
token, amount, sender, adjust_decimals=adjust_decimals),
)
async def accounting_transaction_gas(receipt: TransactionReceiptDict, subcategory: AccountingSubcategory):
""" Accounts for the gas spent on the given transaction """
await add_accounting_row( receipt['from'],
hexstr(receipt['blockHash']), hexstr(receipt['transactionHash']),
AccountingCategory.Expense, subcategory, NATIVE_TOKEN, -dec(receipt['gasUsed'])
)
async def accounting_placement(order_placed: EventData):
block_hash = hexstr(order_placed['blockHash'])
tx_id = hexstr(order_placed['transactionHash'])
try:
order_fee = int(order_placed['args']['orderFee'])
gas_fee = int(order_placed['args']['gasFee'])
except KeyError:
log.warning(f'Rogue DexorderPlacedEvent in tx {hexstr(tx_id)}')
return
await add_accounting_row( order_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.OrderFee, NATIVE_TOKEN, order_fee)
await add_accounting_row( gas_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.GasFee, NATIVE_TOKEN, gas_fee)
async def accounting_fill(fill: EventData, out_token: str):
block_hash = hexstr(fill['blockHash'])
tx_id = hexstr(fill['transactionHash'])
fee = int(fill['args']['fillFee'])
await add_accounting_row(fill_fee_account_addr, block_hash, tx_id, AccountingCategory.Income,
AccountingSubcategory.FillFee, out_token, fee, adjust_decimals=True)
async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Optional[str], category, subcategory, token, amount, note=None,
*, adjust_decimals=True):
if amount == 0:
return
if adjust_decimals:
amount = await adj_dec(token, amount)
# noinspection PyTypeChecker
time = now() if block_hash is None else from_timestamp(await get_block_timestamp(block_hash))
value = mark_to_market(token, amount)
log.debug(f'accounting row {time} {account} {category} {subcategory} {token} {amount} ${value}')
db.session.add(Accounting( account=account,
time=time, category=category, subcategory=subcategory,
token=token, amount=amount, value=value, note=note,
chain_id=current_chain.get().id, tx_id=tx_id,
))
# Adjust database account if it exists
account_db = db.session.get(Accounts, (current_chain.get(), account))
if account_db is not None:
new_amount = account_db.balances.get(token,dec(0)) + amount
if new_amount < 0:
log.error(f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}')
account_db.balances[token] = new_amount
db.session.add(account_db) # deep changes would not be detected by the ORM
async def adjust_balance(account: Accounts, token=NATIVE_TOKEN, subcategory=AccountingSubcategory.InitialBalance, note=None):
true_balance = await get_balance(account.address, token)
amount = true_balance - account.balances.get(token,dec(0))
await add_accounting_row(account.address, None, None, AccountingCategory.Special, subcategory, NATIVE_TOKEN, amount, note)

View File

@@ -1,5 +1,6 @@
from abc import abstractmethod
from dataclasses import dataclass
from typing import TypedDict, Union, Type
from typing import TypedDict, Union, Type, Any, Callable
Address = str
Quantity = Union[str,int]
@@ -13,8 +14,13 @@ class TransactionRequest:
"""
type: str
def __init__(self, type: str, key: Any):
self.type = type
self.key = key
# subclasses of TransactionRequest must register their type code here so the appropriate dataclass may be constructed
transaction_request_registry: dict[str, Type[TransactionRequest]] = {}
transaction_request_deserializers: dict[str, Callable[[...],TransactionRequest]] = {}
TransactionDict = TypedDict( 'TransactionDict', {

View File

@@ -1,81 +1,90 @@
from contextvars import ContextVar
from typing import Union, Optional
import asyncio
import logging
from typing import Optional
import eth_account
from eth_account.signers.local import LocalAccount
from web3.middleware import construct_sign_and_send_raw_middleware
from dexorder import NARG, config, current_w3
from dexorder import config, current_w3
from dexorder.base.chain import current_chain
log = logging.getLogger(__name__)
# this is just here for typing the extra .name. the __new__() function returns an eth_account...LocalAccount
# we do it this way because web3py expects a LocalAccount object but we cannot construct one directly with a super()
# call but must instead use a factory :(
class Account (LocalAccount):
_main_account = None
_pool = None
_pool_count = 0
@staticmethod
def get_named(account_name: str) -> Optional['Account']:
account = config.accounts.get(account_name)
return Account.get(account) if account else Account.get()
def get():
"""
Always returns the main account, even if it's busy.
"""
Account._init_pool()
return Account._main_account
@staticmethod
# noinspection PyInitNewSignature
def get(account:[Union,str]=NARG) -> Optional['Account']:
if account is NARG:
account = config.account
if type(account) is not str:
return account
key_str = config.accounts.get(account, account)
async def acquire():
"""
MUST call account.release() after the transaction has completed, to return this Account to the available pool.
"""
Account._init_pool()
log.debug(f'available accounts: {Account._pool.qsize()}')
try:
local_account = eth_account.Account.from_key(key_str)
return Account(local_account, key_str, account)
except ValueError:
try:
# was the key missing a leading '0x'?
fixed = '0x' + key_str
local_account = eth_account.Account.from_key(fixed)
print(f'WARNING: account "{account}" is missing a leading "0x"')
return Account(local_account, fixed, account)
except ValueError:
pass
try:
# was the key an integer posing as a string?
converted = f'{int(key_str):#0{66}x}'
local_account = eth_account.Account.from_key(converted)
print(f'WARNING: account "{account}" is set as an integer instead of a string. Converted to: {converted}')
return Account(local_account, converted, account)
except ValueError:
pass
raise ValueError(f'Could not construct account for name "{account}"')
async with asyncio.timeout(1):
result = await Account._pool.get()
except asyncio.TimeoutError:
log.error('waiting for an available account')
result = await Account._pool.get()
Account._pool_count -= 1
return result
def __init__(self, local_account: LocalAccount, key_str, name: str): # todo chain_id?
@staticmethod
def _init_pool():
if Account._pool is None:
Account._pool = asyncio.Queue()
Account._pool_count = 0
for key in config.accounts:
local_account = eth_account.Account.from_key(key)
account = Account(local_account)
if Account._main_account is None:
Account._main_account = account
Account._pool.put_nowait(account)
Account._pool_count += 1
def __init__(self, local_account: LocalAccount): # todo chain_id?
super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code
self.name = name
self.key_str = key_str
self.chain_id = current_chain.get().id
self.signing_middleware = construct_sign_and_send_raw_middleware(self)
self._nonce: Optional[int] = None
self.tx_id: Optional[str] = None # current transaction id
async def next_nonce(self):
if self._nonce is None:
self._nonce = await current_w3.get().eth.get_transaction_count(self.address, 'pending')
log.debug(f'queried nonce for account {self.address}: {self._nonce}')
else:
self._nonce += 1
return self._nonce
def attach(self, w3):
w3.eth.default_account = self.address
try:
w3.middleware_onion.remove('account_signer')
except ValueError:
pass
w3.middleware_onion.add(self.signing_middleware, 'account_signer')
def reset_nonce(self):
self._nonce = None
def balance(self):
return current_w3.get().eth.get_balance(self.address)
def release(self):
Account._pool.put_nowait(self)
Account._pool_count += 1
def __str__(self):
return self.name
return self.address
current_account: ContextVar[Optional[Account]] = ContextVar('current_account', default=Account.get())
def __hash__(self):
return

View File

@@ -1,6 +1,5 @@
import logging
from contextvars import ContextVar
from datetime import datetime, timezone
from datetime import datetime
import dexorder

View File

@@ -17,8 +17,10 @@ from dexorder.memcache import memcache
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches
from dexorder.order.triggers import activate_orders, end_trigger_updates
from dexorder.accounting import initialize_accounting
from dexorder.runner import BlockStateRunner
from dexorder.transactions import handle_transaction_receipts, finalize_transactions
from dexorder.vaultcreationhandler import handle_vault_creation_requests
log = logging.getLogger('dexorder')
LOG_ALL_EVENTS = False # for debug todo config
@@ -68,6 +70,7 @@ def setup_logevent_triggers(runner):
runner.add_event_trigger(handle_transaction_receipts) # todo handle only the transactions that were posted to this block
runner.add_event_trigger(handle_dexorderexecutions, executions)
runner.add_event_trigger(handle_vault_creation_requests)
runner.add_callback(end_trigger_updates)
runner.add_callback(execute_tranches)
@@ -75,7 +78,7 @@ def setup_logevent_triggers(runner):
# noinspection DuplicatedCode
async def main():
await blockchain.connect()
await blockchain.connect(autosign=False) # the transaction manager checks out accounts and releases them.
redis_state = None
state = None
if memcache:
@@ -103,6 +106,8 @@ async def main():
log.info('initializing redis with root state')
await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id])
await initialize_accounting()
runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None)
setup_logevent_triggers(runner)
# OHLC printing hard-disabled for main. Use the finaldata process.

View File

@@ -124,9 +124,9 @@ async def main():
if not pools:
log.error('must configure mirror_pools')
return
if config.account is None:
# Dev Account #5
config.account = '0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba'
if not config.accounts:
# Dev Account #6 0x976EA74026E726554dB657fA54763abd0C3a0aa9
config.accounts = ['0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e']
await blockchain.connect(name='target')
mirror_addr = config.mirror_env

View File

@@ -3,11 +3,8 @@ import logging
from random import random
from typing import Any, Optional, Union
import requests
import requests.adapters
# noinspection PyPackageRequirements
from aiohttp import ClientResponseError, ClientSession, BaseConnector, ClientTimeout, TCPConnector
from aiohttp import ClientResponseError, ClientSession, ClientTimeout, TCPConnector
from eth_typing import URI
from hexbytes import HexBytes
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
@@ -51,13 +48,14 @@ async def create_w3(rpc_url=None, account=NARG, autosign=True, name='default'):
w3.eth.Contract = _make_contract(w3.eth)
has_account = False
if autosign:
a = Account.get(account)
if a is not None:
if account is NARG:
account = Account.get()
if account is not None:
# noinspection PyTypeChecker
w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(a))
w3.eth.default_account = a.address
w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account))
w3.eth.default_account = account.address
has_account = True
log.info(f'{name} w3 configured to autosign as {a.address}')
log.info(f'{name} w3 configured to autosign as {account.address}')
if not has_account:
log.info(f'No account set for {name} w3')
return w3

View File

@@ -136,7 +136,11 @@ async def fetch_block(blockhash, *, chain_id=None) -> Optional[Block]:
chain_id = current_chain.get().id
response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
# log.debug(f'fetch_block response {blockhash} {chain_id} {response}')
blockdict: BlockInfo = response['result']
try:
blockdict: BlockInfo = response['result']
except KeyError:
log.error(f'fetch_block got strange response {response}')
return None
if blockdict is None:
log.debug(f'block {blockhash} not found')
return None

View File

@@ -6,7 +6,6 @@ from omegaconf import OmegaConf, DictConfig
from omegaconf.errors import OmegaConfBaseException
from .schema import Config
from .standard_accounts import default_accounts_config
schema = OmegaConf.structured(Config())
@@ -19,7 +18,6 @@ def load_config():
# noinspection PyTypeChecker
result:ConfigDict = OmegaConf.merge(
schema,
load_accounts(),
from_toml('.secret.toml'),
from_toml('dexorder.toml'),
from_toml('config.toml'),
@@ -28,15 +26,6 @@ def load_config():
return result
def load_accounts():
accounts_conf = OmegaConf.create({'accounts': default_accounts_config})
try:
OmegaConf.merge(schema, accounts_conf)
return accounts_conf
except OmegaConfBaseException as _x:
raise ConfigException(f'Error while processing default accounts:\n{_x}')
def from_env(prefix='DEXORDER_'):
merge = {}
for key, value in os.environ.items():

View File

@@ -29,8 +29,7 @@ class Config:
polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead
backfill: int = 0 # if not 0, then runner will initialize an empty database by backfilling from the given block height. Use negative numbers to indicate a number of blocks before the present.
account: Optional[str] = None # may be a private key or an account alias
accounts: Optional[dict[str,str]] = field(default_factory=dict) # account aliases
accounts: list[str] = field(default_factory=list) # the pool of accounts is used round-robin
min_gas: str = '0'
# Order slashing
@@ -48,3 +47,7 @@ class Config:
mirror_env: Optional[str] = None
pagerduty: Optional[str] = None
stablecoins: list[str] = field(default_factory=list) # primary stablecoins which are marked to $1
quotecoins: list[str] = field(default_factory=list) # quote tokens like WETH that have stablecoin markets
nativecoin: Optional[str] = None # used for accounting of native values. e.g. address of WETH

View File

@@ -3,6 +3,7 @@ import logging
from eth_abi.packed import encode_packed
from eth_utils import keccak, to_bytes, to_checksum_address
from typing_extensions import Optional
from dexorder.base.chain import current_chain
from dexorder.contract import ContractProxy
@@ -38,6 +39,14 @@ def get_dexorder_contract() -> ContractProxy:
def get_vault_init_code_hash() -> bytes:
return get_by_chain(_vault_init_code_hash)
def get_mockenv() -> Optional[ContractProxy]:
addr = chain_info.get(str(current_chain.get().id),{}).get('mockenv')
return ContractProxy(addr, 'MockEnv') if addr is not None else None
def get_mirrorenv() -> Optional[ContractProxy]:
addr = chain_info.get(str(current_chain.get().id),{}).get('mirrorenv')
return ContractProxy(addr, 'MirrorEnv') if addr is not None else None
def vault_address(owner, num):
salt = keccak(encode_packed(['address','uint8'],[owner,num]))
contract_address = keccak(

View File

@@ -1,12 +1,14 @@
import dataclasses
import json
import math
from typing import Union
from typing import Union, Optional, Any
from sqlalchemy import TypeDecorator, BIGINT
from sqlalchemy.dialects.postgresql import BYTEA, JSONB
from sqlalchemy import TypeDecorator, BIGINT, Dialect
from sqlalchemy.dialects.postgresql import BYTEA, JSONB, NUMERIC
from sqlalchemy.sql.type_api import _T
from web3 import Web3
from dexorder import Fixed2 as NativeFixed, Blockchain as NativeBlockchain
from dexorder import Fixed2 as NativeFixed, Blockchain as NativeBlockchain, dec
from dexorder.util import hexstr, hexbytes
@@ -74,6 +76,31 @@ def Fixed(bits, dbits, signed=False):
return result
class DecimalNumeric (TypeDecorator):
impl = NUMERIC
def process_bind_param(self, value, dialect):
return value
def process_result_value(self, value, dialect):
return dec(value)
class Balances (TypeDecorator):
"""
Dictionary of decimals keyed by strings
"""
impl = JSONB
def process_bind_param(self, value: Optional[_T], dialect: Dialect) -> Any:
return json.dumps({k: str(v) for k,v in value.items()})
def process_result_value(
self, value: Optional[Any], dialect: Dialect
) -> Optional[_T]:
return {k: dec(v) for k,v in json.loads(value).items()}
class DataclassDictBase(TypeDecorator):
impl = JSONB

View File

@@ -6,3 +6,6 @@ from .dbblock import DbBlock
from .orderindex import OrderIndex
from .pool import Pool
from .token import Token
from .ofac import OFAC, OFACAlerts
from .accounting import Accounting
from .vaultcreationrequest import VaultCreationRequest

View File

@@ -0,0 +1,70 @@
import logging
from datetime import datetime
from decimal import Decimal as dec
from enum import Enum
from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column
from typing_extensions import Optional
from dexorder import now
from dexorder.database.column import Blockchain
from dexorder.database.column_types import DecimalNumeric, Balances
from dexorder.database.model import Base
log = logging.getLogger(__name__)
class AccountingCategory (Enum):
Transfer = 0
Income = 1
Expense = 2
Trade = 3
Special = 4
class AccountingSubcategory (Enum):
# Income
OrderFee = 0
GasFee = 1
FillFee = 2
# Expense
VaultCreation = 3
Execution = 4
# Transfer
# Transfers have no subcategories, but the note field will be the address of the other account. Both a debit and a
# credit entry will be created, one for each account participating in the transfer.
# Special Codes
InitialBalance = 5
class Accounting (Base):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
time: Mapped[datetime] = mapped_column(default=now(), index=True)
account: Mapped[str] = mapped_column(index=True)
category: Mapped[AccountingCategory] = mapped_column(index=True)
subcategory: Mapped[Optional[AccountingSubcategory]] = mapped_column(index=True)
token: Mapped[str] = mapped_column(index=True)
amount: Mapped[dec] = mapped_column(DecimalNumeric)
value: Mapped[Optional[dec]] = mapped_column(DecimalNumeric) # USD value of the amount. If NULL then accounting has not been done for this row.
chain_id: Mapped[int]
tx_id: Mapped[Optional[str]]
note: Mapped[Optional[str]] # format depends on the type of entry
class AccountKind (Enum):
Admin = 0 # administrative (contract deployments etc)
OrderFee = 1 # receives order placement fees
GasFee = 2 # receives gas fees
FillFee = 3 # receives fill fees
Execution = 4 # spends gas
class Accounts(Base):
chain: Mapped[Blockchain] = mapped_column(primary_key=True)
address: Mapped[str] = mapped_column(primary_key=True)
kind: Mapped[AccountKind] = mapped_column(index=True)
balances: Mapped[dict[str, dec]] = mapped_column(Balances, default=dict, server_default="{}")

View File

@@ -0,0 +1,26 @@
import logging
from datetime import datetime
from sqlalchemy.orm import Mapped
from sqlalchemy.testing.schema import mapped_column
from typing_extensions import Optional
from dexorder import now
from dexorder.database.model import Base
log = logging.getLogger(__name__)
# todo check broad country restrictions
class OFAC (Base):
address: Mapped[str] = mapped_column(primary_key=True)
class OFACAlerts (Base):
"""
This table records any time when a banned address tries to use our service.
"""
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
time: Mapped[datetime] = mapped_column(default=now())
address: Mapped[str]
ip: Mapped[Optional[str]]

View File

@@ -39,13 +39,13 @@ class Token (Base):
chain: Mapped[Blockchain] = mapped_column(primary_key=True)
address: Mapped[Address] = mapped_column(primary_key=True)
name: Mapped[str]
name: Mapped[str] # indexed below
symbol: Mapped[str] = mapped_column(index=True)
decimals: Mapped[Uint8]
approved: Mapped[bool] = mapped_column(index=True)
__table_args__ = (
Index('idx_name', 'name', postgresql_using='gist'), # full text search on name
Index('ix_token_name', 'name', postgresql_using='gist'), # full text search on name
)

View File

@@ -5,7 +5,7 @@ from typing import Optional
import sqlalchemy as sa
from sqlalchemy.orm import mapped_column, Mapped
from dexorder.base import TransactionRequest, transaction_request_registry
from dexorder.base import TransactionRequest, transaction_request_deserializers
from dexorder.database.column import Dict, Bytes, UUID_PK, Blockchain
from dexorder.database.column_types import DataclassDict
from dexorder.database.model import Base
@@ -18,6 +18,7 @@ class TransactionJobState (Enum):
Sent = 's' # tx has been delivered to a node
Mined = 'z' # mined on at least one fork, whether reverted or not. todo handle forks that didnt confirm: receipts are per-fork!
Error = 'x' # an exception has prevented this job from sending a transaction
Declined = 'd' # the transaction builder successfully returned None
# noinspection PyProtectedMember
@@ -27,7 +28,7 @@ TransactionJobStateColumnType = sa.Enum(TransactionJobState)
def deserialize_transaction_request(**d):
t = d['type']
Class = transaction_request_registry.get(t)
Class = transaction_request_deserializers.get(t)
if Class is None:
raise ValueError(f'No TransactionRequest for type "{t}"')
# noinspection PyArgumentList

View File

@@ -0,0 +1,25 @@
import logging
from datetime import datetime
from typing import Optional
from sqlalchemy.dialects.postgresql import INET
from sqlalchemy.orm import mapped_column, Mapped
from sqlalchemy.schema import Index
from dexorder.database.column import Blockchain
from dexorder.database.model import Base
log = logging.getLogger(__name__)
class VaultCreationRequest (Base):
chain: Mapped[Blockchain] = mapped_column(primary_key=True)
owner: Mapped[str] = mapped_column(primary_key=True)
num: Mapped[int] = mapped_column(primary_key=True)
time: Mapped[datetime]
ipaddr: Mapped[str] = mapped_column(INET)
vault: Mapped[Optional[str]] = mapped_column(default=None, server_default=None) # filled in after the vault is indeed created
__table_args__ = (
Index("ix_vault_address_not_null", "vault", postgresql_where="vault IS NOT NULL"),
)

View File

@@ -3,12 +3,14 @@ import logging
from web3.types import EventData
from dexorder import current_pub, minutely
from dexorder import db
from dexorder.accounting import accounting_fill, accounting_placement
from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import SwapOrderState
from dexorder.blocks import get_block_timestamp
from dexorder.contract.dexorder import vault_address, VaultContract, get_factory_contract
from dexorder.contract.dexorder import VaultContract, get_factory_contract
from dexorder.database.model import VaultCreationRequest
from dexorder.impls import get_impl_version
from dexorder.ohlc import ohlcs
from dexorder.order.orderstate import Order
@@ -16,7 +18,7 @@ from dexorder.order.triggers import (OrderTriggers, activate_order, update_balan
update_price_triggers)
from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data
from dexorder.util import hexstr
from dexorder.vault_blockdata import vault_owners, adjust_balance, MAX_VAULTS, verify_vault
from dexorder.vault_blockdata import vault_owners, adjust_balance, verify_vault, publish_vaults
log = logging.getLogger(__name__)
@@ -31,14 +33,16 @@ def init():
async def handle_order_placed(event: EventData):
# event DexorderPlaced (uint64 startOrderIndex, uint8 numOrders);
# event DexorderSwapPlaced (uint64 startOrderIndex, uint8 numOrders, uint);
addr = event['address']
start_index = int(event['args']['startOrderIndex'])
num_orders = int(event['args']['numOrders'])
# todo accounting
order_fee = int(event['args']['orderFee'])
gas_fee = int(event['args']['gasFee'])
log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}')
try:
start_index = int(event['args']['startOrderIndex'])
num_orders = int(event['args']['numOrders'])
except KeyError:
log.warning(f'Rogue DexorderSwapPlaced in tx {hexstr(event["transactionHash"])}')
return
await accounting_placement(event)
log.debug(f'DexorderSwapPlaced {addr} {start_index} {num_orders}')
if not await verify_vault(addr):
log.warning(f'Discarding order from rogue vault {addr}.')
return
@@ -60,17 +64,22 @@ async def handle_swap_filled(event: EventData):
log.debug(f'DexorderSwapFilled {event}')
args = event['args']
vault = event['address']
order_index = args['orderIndex']
tranche_index = args['trancheIndex']
amount_in = args['amountIn']
amount_out = args['amountOut']
fill_fee = args['fillFee']
next_execution_time = args['nextExecutionTime']
try:
order_index = args['orderIndex']
tranche_index = args['trancheIndex']
amount_in = args['amountIn']
amount_out = args['amountOut']
fill_fee = args['fillFee']
next_execution_time = args['nextExecutionTime']
except KeyError:
log.warning(f'Rogue DexorderSwapFilled in tx {hexstr(event["transactionHash"])}')
return
try:
order: Order = Order.of(vault, order_index)
except KeyError:
log.warning(f'DexorderSwapFilled IGNORED due to missing order {vault} {order_index}')
return
await accounting_fill(event, order.order.tokenOut)
order.status.trancheStatus[tranche_index].activationTime = next_execution_time # update rate limit
try:
triggers = OrderTriggers.instances[order.key]
@@ -159,20 +168,22 @@ async def handle_vault_created(created: EventData):
except KeyError:
log.debug('couldnt parse event data for VaultCreated', created)
return
# stop trying to create the vault
chain_id = current_chain.get().id
db_req = db.session.get(VaultCreationRequest, (chain_id, owner, num))
if db_req is None:
log.warning(f'could not find vault creation request {chain_id}|{owner}|{num}')
else:
db_req.vault = addr
# Verify the authenticity of the vault. We are permissive on Mockchain due to irregular restarts of various components
if not await verify_vault(addr, owner, num):
log.warning(f'Discarding rogue vault {addr}')
return
vault_owners[addr] = owner
log.debug(f'VaultCreated {owner} #{num} => {addr}')
vaults = []
for num in range(MAX_VAULTS):
addr = vault_address(owner, num)
if addr in vault_owners:
vaults.append(addr)
else:
break
current_pub.get()(f'{current_chain.get().id}|{owner}', 'vaults', vaults)
publish_vaults(chain_id, owner)
async def handle_vault_impl_changed(upgrade: EventData):

View File

@@ -13,6 +13,7 @@ from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.fork import Fork
from dexorder.blockstate.state import compress_diffs
from dexorder.memcache import current_redis, memcache
from dexorder.util import hexstr
from dexorder.util.async_util import maywait
from dexorder.util.json import json_encoder
@@ -101,8 +102,9 @@ class RedisState (SeriesCollection):
for series, keys in hdels.items():
r.hdel(series, *keys)
block_series = f'{chain_id}|head'
r.json(json_encoder).set(block_series,'$',[fork.height, fork.head])
pubs.append((str(chain_id), 'head', [fork.height, fork.head]))
headstr = hexstr(fork.head)
r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
pubs.append((str(chain_id), 'head', [fork.height, headstr]))
# separate batch for pubs
if pubs:
await publish_all(pubs)

128
src/dexorder/ofac.py Normal file
View File

@@ -0,0 +1,128 @@
import json
import logging
import re
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from pytz import timezone
import requests
from eth_utils import to_checksum_address
from dexorder import db
from dexorder.base.chain import current_chain
from dexorder.database.model.ofac import OFAC
log = logging.getLogger(__name__)
deny = set()
ofac_enabled = True
def init_ofac():
if current_chain.get().id in [1337,31337]:
log.info(f'OFAC disabled on test network {current_chain.get()}')
global ofac_enabled
ofac_enabled = False
return
global deny
deny = set(o.address for o in db.session.query(OFAC).all())
update_ofac()
# feature_ids are the OFAC feature integer ID's for cryptocurrency features e.g. "Digital Currency - BTC"
def save_ofac_meta(date: datetime, feature_ids: set[int]):
# todo tim debug
pass
# db.kv['ofac_meta'] = dict(date=date.isoformat(), feature_ids=sorted(feature_ids))
def get_ofac_meta():
# found = db.kv.get('ofac_meta')
found = None # todo tim debug
if found is None:
return None, set()
[date, feature_ids] = found
date = datetime.fromisoformat(date)
return date, set(feature_ids)
def add_ofac_denial(addr):
db.session.add(OFAC(addr))
deny.add(addr)
# todo push to redis
def process_ofac_xml(xml: str, date: datetime = None, feature_ids: set[int] = None):
if date is None or feature_ids is None:
[date, feature_ids] = get_ofac_meta()
# Parse the XML string
doc = ET.parse(xml)
ns = {'o':'https://sanctionslistservice.ofac.treas.gov/api/PublicationPreview/exports/ENHANCED_XML'}
new_date = datetime.fromisoformat(doc.find('./o:publicationInfo/o:dataAsOf', ns).text)
if date is not None and new_date <= date:
if new_date < date:
log.debug('ignoring old OFAC XML')
return
for ft in doc.findall('./o:featureTypes/o:featureType', ns):
found = re.match(r'Digital Currency Address - (.+)', ft.find('./o:type', ns).text)
if found:
currency = found.group(1)
feature_id = int(ft.attrib['featureTypeId'])
print(currency, feature_id)
feature_ids.add(feature_id)
for e in doc.findall('./o:entities/o:entity', ns):
for f in e.findall('./o:features/o:feature', ns):
t = f.find('./o:type', ns)
feature_id = int(t.attrib['featureTypeId'])
if feature_id in feature_ids:
addr = f.find('./o:value', ns).text
if addr.startswith('0x'):
check = to_checksum_address(addr)
add_ofac_denial(check)
print(check)
save_ofac_meta(new_date, feature_ids)
return new_date, feature_ids
def day(date, tz=None):
if tz is None:
tz = date.tzinfo
return datetime(date.year, date.month, date.day, tzinfo=tz)
treasury_tz = timezone('America/New_York')
def update_ofac():
if not ofac_enabled:
return
date: datetime
[date, feature_ids] = get_ofac_meta()
if date is None:
# fetch and process the full dataset
log.info(f'initializing OFAC')
url = 'https://sanctionslistservice.ofac.treas.gov/entities'
xml = requests.get(url).text
process_ofac_xml(xml, date, feature_ids)
else:
# fetch only the changes since last time
now = day(date)
last = day(datetime.now(date.tzinfo))
publications = []
while now <= last:
url = f'https://sanctionslistservice.ofac.treas.gov/changes/history/{now.year}/{now.month}/{now.day}'
for pub in json.loads(requests.get(url).text):
# {"publicationID":407,"datePublished":"2024-11-19T10:03:00.790295"}
# NO TIMEZONE IN THIS RESPONSE :(((((
pub_id = pub['publicationId']
pub_date = day(pub['datePublished'], treasury_tz)
if pub_date > date:
publications.append(pub_id)
now += timedelta(days=1)
if not publications:
log.info(f'OFAC table is current. Last publication date: {date}')
else:
for pub_id in publications:
url = f'https://sanctionslistservice.ofac.treas.gov/changes/{pub_id}'
xml = requests.get(url).text
date, feature_ids = process_ofac_xml(xml, date, feature_ids)
log.info(f'OFAC updated with publication {pub_id} {date}')
if __name__ == '__main__':
process_ofac_xml('/tmp/entities.xml')

View File

@@ -1,5 +1,3 @@
from .executionhandler import TrancheExecutionHandler # do not remove. ensures the handler is registered.
def order_key(vault:str, ):
return f'{vault}'

View File

@@ -1,26 +1,68 @@
import logging
from typing import Optional
from dataclasses import dataclass
from typing import Optional, Union, Any
from uuid import UUID
from web3.exceptions import ContractPanicError, ContractLogicError
from web3.types import EventData
from dexorder import db
from dexorder.base import TransactionReceiptDict
from dexorder.accounting import accounting_transaction_gas
from dexorder.base import TransactionReceiptDict, TransactionRequest, transaction_request_deserializers
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import PriceProof
from dexorder.contract.dexorder import get_dexorder_contract
from dexorder.database.model.accounting import AccountingSubcategory
from dexorder.database.model.transaction import TransactionJob
from dexorder.order.orderstate import Order
from dexorder.order.triggers import (inflight_execution_requests, OrderTriggers,
TrancheState, active_tranches, order_error)
from dexorder.transactions import TransactionHandler, TrancheExecutionRequest, submit_transaction_request, \
new_tranche_execution_request
from dexorder.transactions import TransactionHandler, submit_transaction_request
from dexorder.util import hexbytes
log = logging.getLogger(__name__)
@dataclass
class TrancheExecutionRequest (TransactionRequest):
TYPE = 'te'
# type='te' for tranche execution
vault: str
order_index: int
tranche_index: int
price_proof: Union[None,dict,tuple[int]]
def __init__(self, vault: str, order_index: int, tranche_index: int, price_proof: Union[None,dict,tuple[int]], **_):
super().__init__(TrancheExecutionRequest.TYPE, (vault, order_index, tranche_index))
self.vault = vault
self.order_index = order_index
self.tranche_index = tranche_index
self.price_proof = price_proof
def key(self) -> Any:
return self.vault, self.order_index, self.tranche_index
@property
def order_key(self):
return OrderKey(self.vault, self.order_index)
@property
def tranche_key(self):
return TrancheKey(self.vault, self.order_index, self.tranche_index)
# Must register the class for deserialization
# noinspection PyTypeChecker
transaction_request_deserializers[TrancheExecutionRequest.TYPE] = lambda **data: TrancheExecutionRequest(data['vault'], data['order_index'], data['tranche_index'], data['price_proof'] if 'price_proof' in data else None)
def new_tranche_execution_request(tk: TrancheKey, proof: Optional[PriceProof]=None) -> TrancheExecutionRequest:
if proof is None:
proof = PriceProof(0)
return TrancheExecutionRequest(tk.vault, tk.order_index, tk.tranche_index, proof.dump())
class TrancheExecutionHandler (TransactionHandler):
def __init__(self):
super().__init__('te')
@@ -41,8 +83,8 @@ class TrancheExecutionHandler (TransactionHandler):
raise exception
async def complete_transaction(self, job: TransactionJob, receipt: TransactionReceiptDict) -> None:
# we handle execution results using the DexorderExecution event, so there's nothing to do here.
pass
# we handle execution results using the DexorderExecution event. here, only accounting is required
await accounting_transaction_gas(receipt, AccountingSubcategory.Execution)
async def transaction_exception(self, job: TransactionJob, e: Exception) -> None:
log.error('Could not build execution transaction due to exception', exc_info=e)
@@ -164,7 +206,8 @@ def execute_tranches():
def create_execution_request(tk: TrancheKey, proof: PriceProof):
inflight_execution_requests.add(tk)
job = submit_transaction_request(new_tranche_execution_request(tk, proof))
log.debug(f'Executing {tk} as job {job.id}')
if job is not None:
log.debug(f'Executing {tk} as job {job.id}')
return job

View File

@@ -1,12 +1,13 @@
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from web3.exceptions import ContractLogicError
from web3.types import EventData
from dexorder import dec, ADDRESS_0, from_timestamp, db
from dexorder import dec, ADDRESS_0, from_timestamp, db, config, NATIVE_TOKEN
from dexorder.addrmeta import address_metadata
from dexorder.base.chain import current_chain
from dexorder.base.orderlib import Exchange
@@ -15,7 +16,7 @@ from dexorder.blockstate import BlockDict
from dexorder.blockstate.blockdata import K, V
from dexorder.database.model import Pool
from dexorder.database.model.pool import OldPoolDict
from dexorder.tokens import get_token
from dexorder.tokens import get_token, adjust_decimals as adj_dec
from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address
log = logging.getLogger(__name__)
@@ -23,6 +24,7 @@ log = logging.getLogger(__name__)
async def get_pool(address: str) -> OldPoolDict:
try:
# noinspection PyTypeChecker
result: OldPoolDict = address_metadata[address]
except KeyError:
result = address_metadata[address] = await load_pool(address)
@@ -61,6 +63,7 @@ async def load_pool(address: str, *, use_db=True) -> OldPoolDict:
base=t0, quote=t1, fee=fee, decimals=decimals)
log.debug(f'new UniswapV3 pool {token0["symbol"]}/{token1["symbol"]} {fee/1_000_000:.2%} '
f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}')
add_mark_pool(address, t0, t1, fee)
except ContractLogicError:
pass
except ValueError as v:
@@ -129,3 +132,88 @@ async def get_uniswap_data(swap: EventData) -> Optional[tuple[OldPoolDict, datet
timestamp = await get_block_timestamp(swap['blockHash'])
dt = from_timestamp(timestamp)
return pool, dt, price
#
# Mark-to-Market
#
@dataclass
class MarkPool:
addr: str
base: str
quote: str
fee: int
inverted: bool
mark_pools: dict[str, MarkPool] = {}
quotes = [] # ordered list of preferred quote tokens
def add_mark_pool(addr: str, base: str, quote: str, fee: int):
"""
Called for every discovered pool, this function registers the pool if it connects to a stablecoin or quotecoin and
has a better fee than other pools for that pair.
"""
# determine inversion
try:
base_index = quotes.index(base)
except ValueError:
base_index = None
try:
quote_index = quotes.index(quote)
except ValueError:
quote_index = None
if base_index is None and quote_index is None:
return
inverted = base_index is not None and (quote_index is None or base_index < quote_index)
if inverted:
base, quote = quote, base
# determine whether this pool is better than the already registered mark pool
add = False
if base not in mark_pools:
add = True
else:
mp = mark_pools[base]
mp_index = quotes.index(mp.quote)
try:
index = quotes.index(quote)
if index < mp_index or index == mp_index and fee < mp.fee:
add = True
except ValueError:
pass
if add:
pool = MarkPool(addr, base, quote, fee, inverted)
mark_pools[base] = pool
if base == config.nativecoin:
mark_pools[NATIVE_TOKEN] = pool
async def mark_to_market_adj_dec(token: str, amount: dec, adjust_decimals=True) -> Optional[dec]:
"""
Returns the current USD value for the amount of token.
"""
if not accounting_initialized:
await initialize_accounting()
if adjust_decimals:
amount = await adj_dec(token, amount)
return mark_to_market(token, amount)
def mark_to_market(token: str, amount: dec) -> Optional[dec]:
"""
amount must already be adjusted for decimals
"""
if token in config.stablecoins:
return dec(1) * amount
try:
mp = mark_pools[token]
except KeyError:
log.info(f'no mark pool for token {token}')
return None
price = pool_prices[mp.addr]
value = amount / price if mp.inverted else amount * price
return mark_to_market(mp.quote, value)

View File

@@ -4,7 +4,7 @@ from typing import Optional
from eth_abi.exceptions import InsufficientDataBytes
from web3.exceptions import BadFunctionCallOutput
from dexorder import ADDRESS_0, config, db
from dexorder import ADDRESS_0, db, NATIVE_TOKEN, dec, current_w3
from dexorder.addrmeta import address_metadata
from dexorder.base.chain import current_chain
from dexorder.contract import ERC20, ContractProxy, CONTRACT_ERRORS
@@ -15,11 +15,33 @@ from dexorder.metadata import get_metadata
log = logging.getLogger(__name__)
# todo needs chain_id
async def get_balance(addr, token_addr, *, adjust_decimals=True) -> dec:
if token_addr == NATIVE_TOKEN:
return await get_native_balance(addr, adjust_decimals=adjust_decimals)
else:
return await get_erc20_balance(addr, token_addr, adjust_decimals=adjust_decimals)
async def get_erc20_balance(addr, token_addr, *, adjust_decimals=True):
value = dec(await ERC20(token_addr).balanceOf(addr))
if adjust_decimals:
token = await get_token(token_addr)
value *= dec(10) ** dec(-token['decimals'])
return value
async def get_native_balance(addr, *, adjust_decimals=True) -> dec:
value = dec(await current_w3.get().eth.get_balance(addr))
if adjust_decimals:
value /= 10 ** 18
return value
async def get_token(address) -> Optional[OldTokenDict]:
if address == ADDRESS_0:
raise ValueError('No token at address 0')
try:
# noinspection PyTypeChecker
return address_metadata[address]
except KeyError:
result = address_metadata[address] = await load_token(address)
@@ -80,3 +102,13 @@ async def load_token(address: str) -> Optional[OldTokenDict]:
td['decimals'] = md['d']
log.debug(f'new token {name} {symbol} {address}')
return td
async def adjust_decimals(token, value):
if token == NATIVE_TOKEN:
decimals = 18
else:
token = await get_token(token)
decimals = token['decimals']
value *= dec(10) ** dec(-decimals)
return value

View File

@@ -1,21 +1,20 @@
import asyncio
import logging
from abc import abstractmethod
from dataclasses import dataclass
from typing import Union, Optional
from typing import Optional
from uuid import uuid4
from sqlalchemy import select
from web3.exceptions import TransactionNotFound, ContractPanicError, ContractLogicError
from dexorder import db, current_w3, Account
from dexorder.base import TransactionReceiptDict, TransactionRequest, transaction_request_registry
from dexorder.base import TransactionReceiptDict, TransactionRequest
from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import PriceProof
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.fork import current_fork, Fork
from dexorder.contract.contract_proxy import ContractTransaction
from dexorder.database.model.transaction import TransactionJob, TransactionJobState
from dexorder.util import hexstr
log = logging.getLogger(__name__)
@@ -32,7 +31,7 @@ class TransactionHandler:
TransactionHandler.instances[tag] = self
@abstractmethod
async def build_transaction(self, job_id: int, tr: TransactionRequest) -> ContractTransaction: ...
async def build_transaction(self, job_id: int, tr: TransactionRequest) -> Optional[ContractTransaction]: ...
@abstractmethod
async def complete_transaction(self, job: TransactionJob, receipt: TransactionReceiptDict) -> None: ...
@@ -40,50 +39,25 @@ class TransactionHandler:
@abstractmethod
async def transaction_exception(self, job: TransactionJob, e: Exception) -> None: ...
@dataclass
class TrancheExecutionRequest (TransactionRequest):
TYPE = 'te'
# type='te' for tranche execution
vault: str
order_index: int
tranche_index: int
price_proof: Union[None,dict,tuple[int]]
def __init__(self, vault: str, order_index: int, tranche_index: int, price_proof: Union[None,dict,tuple[int]], **_):
super().__init__(TrancheExecutionRequest.TYPE)
self.vault = vault
self.order_index = order_index
self.tranche_index = tranche_index
self.price_proof = price_proof
@property
def order_key(self):
return OrderKey(self.vault, self.order_index)
@property
def tranche_key(self):
return TrancheKey(self.vault, self.order_index, self.tranche_index)
# Must register the class for deserialization
transaction_request_registry[TrancheExecutionRequest.TYPE] = TrancheExecutionRequest
in_flight = set()
accounts_in_flight: dict[bytes, Account] = {} # tx_id_bytes: account
def new_tranche_execution_request(tk: TrancheKey, proof: Optional[PriceProof]=None) -> TrancheExecutionRequest:
if proof is None:
proof = PriceProof(0)
return TrancheExecutionRequest(tk.vault, tk.order_index, tk.tranche_index, proof.dump())
def submit_transaction_request(tr: TransactionRequest):
def submit_transaction_request(tr: TransactionRequest) -> Optional[TransactionJob]:
"""
Once a transaction request has been submitted, it is this module's responsibility to see that it gets mined, at
which point `tr.complete_transaction()` is called with the transaction receipt.
The building of a transaction can also fail,
which point `tr.complete_transaction()` is called with the transaction receipt. If the same-keyed request is
already in-flight, None is returned.
"""
key = tr.type, tr.key
if key in in_flight:
log.debug(f'transaction request {tr.key} already in flight')
return None
job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_fork.get().height,
state=TransactionJobState.Requested, request=tr)
db.session.add(job)
in_flight.add(key)
return job
@@ -109,32 +83,51 @@ async def create_and_send_transactions():
job.state = TransactionJobState.Error
db.session.add(job)
await handler.transaction_exception(job, x)
in_flight.discard((job.request.type, job.request.key))
return
except Exception as x:
log.warning(f'unable to send transaction for job {job.id}', exc_info=x)
return
if ctx is None:
log.info(f'Transaction request {job.request.__class__.__name__} {job.id} declined to build a tx.')
job.state = TransactionJobState.Declined
db.session.add(job)
in_flight.discard((job.request.type, job.request.key))
return
w3 = current_w3.get()
account = Account.get_named(handler.tag)
if account is None:
account = Account.get()
try:
async with asyncio.timeout(1):
account = await Account.acquire()
except asyncio.TimeoutError:
account = None
if account is None:
log.error(f'No account available for transaction request type "{handler.tag}"')
continue
await ctx.sign(account)
job.state = TransactionJobState.Signed
job.tx_id = ctx.id_bytes
job.tx_data = ctx.data
db.session.add(job)
log.info(f'servicing job {job.request.__class__.__name__} {job.id} with tx {ctx.id}')
log.info(f'servicing job {job.request.__class__.__name__} {job.id} with account {account.address} nonce {ctx.tx["nonce"]} tx {ctx.id}')
# noinspection PyBroadException
try:
sent = await w3.eth.send_raw_transaction(job.tx_data)
sent = await w3.eth.send_raw_transaction(ctx.data)
except ValueError as e:
if e.args[0]['code'] == -32003:
# Nonce too low
log.warning(f'Account {account.address} nonce too low')
account.reset_nonce()
else:
log.exception(f'Failure sending transaction for job {job.id}')
account.release()
except:
log.exception(f'Failure sending transaction for job {job.id}')
# todo pager
# todo send state unknown!
account.release()
else:
assert sent == job.tx_id
account.tx_id = hexstr(ctx.id_bytes)
accounts_in_flight[ctx.id_bytes] = account
job.state = TransactionJobState.Sent
job.tx_id = ctx.id_bytes
job.tx_data = ctx.data
assert sent == job.tx_id
db.session.add(job)
@@ -149,19 +142,23 @@ async def handle_transaction_receipts():
try:
receipt: TransactionReceiptDict = await w3.eth.get_transaction_receipt(job.tx_id)
except TransactionNotFound:
return
fork = current_fork.get()
assert fork is not None
if fork.branch.contiguous and receipt['blockHash'] in fork.branch.path or \
fork.branch.disjoint and receipt['blockNumber'] <= fork.height:
try:
handler = TransactionHandler.of(job.request.type)
except KeyError:
# todo remove bad request?
log.warning(f'ignoring transaction request with bad type "{job.request.type}"')
else:
await handler.complete_transaction(job, receipt)
in_flight.discard((job.request.type, job.request.key))
try:
accounts_in_flight.pop(job.tx_id).release()
except KeyError:
pass
else:
fork = current_fork.get()
assert fork is not None
if fork.branch.contiguous and receipt['blockHash'] in fork.branch.path or \
fork.branch.disjoint and receipt['blockNumber'] <= fork.height:
try:
handler = TransactionHandler.of(job.request.type)
except KeyError:
# todo remove bad request?
log.warning(f'ignoring transaction request with bad type "{job.request.type}"')
else:
await handler.complete_transaction(job, receipt)
def finalize_transactions(_fork: Fork, diffs: list[DiffEntryItem]):

View File

@@ -16,7 +16,7 @@ def align_decimal(value, left_columns) -> str:
return ' ' * pad + s
def hexstr(value: Union[HexBytes, bytes, str]):
def hexstr(value: Union[HexBytes, bytes, str]) -> str:
""" returns an 0x-prefixed hex string """
if type(value) is HexBytes:
return value.hex()

View File

@@ -1,6 +1,7 @@
import functools
import logging
from dexorder import current_pub
from dexorder.base.chain import current_chain
from dexorder.blockstate import BlockDict
from dexorder.contract import ERC20, CONTRACT_ERRORS
@@ -77,3 +78,15 @@ def balance_adjuster(vault, token_address, amount):
result[taddr] = new_amt
return result
return functools.partial(_adjust, vault, token_address, amount)
def publish_vaults(chain_id, owner):
vaults = []
for num in range(MAX_VAULTS):
addr = vault_address(owner, num)
if addr in vault_owners:
vaults.append(addr)
else:
break
log.debug(f'publish_vaults {chain_id} {owner} {vaults}')
current_pub.get()(f'{chain_id}|{owner}', 'vaults', chain_id, owner, vaults)

View File

@@ -0,0 +1,98 @@
import logging
from dataclasses import dataclass
from typing import Optional, Any
from eth_utils import to_checksum_address
from web3.exceptions import ContractLogicError
from dexorder import db
from dexorder.accounting import accounting_transaction_gas
from dexorder.base import TransactionReceiptDict, TransactionRequest, transaction_request_deserializers
from dexorder.base.chain import current_chain
from dexorder.contract import ContractProxy
from dexorder.contract.contract_proxy import ContractTransaction
from dexorder.contract.dexorder import get_factory_contract, vault_address
from dexorder.database.model import TransactionJob
from dexorder.database.model import VaultCreationRequest as DbVaultCreationRequest
from dexorder.database.model.accounting import AccountingSubcategory
from dexorder.transactions import TransactionHandler, submit_transaction_request
from dexorder.vault_blockdata import publish_vaults
log = logging.getLogger(__name__)
@dataclass
class VaultCreationRequest (TransactionRequest):
TYPE = 'vcr'
chain_id: int
owner: str
num: int
def __init__(self, chain_id: int, owner: str, num: int):
super().__init__(VaultCreationRequest.TYPE, (chain_id, owner, num))
self.chain_id = chain_id
self.owner = to_checksum_address(owner)
self.orig_owner = owner # for the database key
self.num = num
def key(self) -> Any:
return self.chain_id, self.owner, self.num
def deserialize_vault_creation_request(**data) -> VaultCreationRequest:
return VaultCreationRequest(data['chain_id'], data['owner'], data['num'])
# Must register the class for deserialization
transaction_request_deserializers[VaultCreationRequest.TYPE] = deserialize_vault_creation_request
in_flight = set() # (chain, owner, num)
class VaultCreationHandler (TransactionHandler):
def __init__(self):
super().__init__(VaultCreationRequest.TYPE)
async def build_transaction(self, job_id: int, tr: VaultCreationRequest) -> Optional[ContractTransaction]:
factory = get_factory_contract()
owner_address = to_checksum_address(tr.owner)
try:
return await factory.build.deployVault(owner_address, tr.num)
except ContractLogicError:
in_flight.discard((tr.chain_id, tr.owner, tr.num))
# maybe the vault already exists?
addr = vault_address(tr.owner, tr.num)
owner = await ContractProxy(addr, 'Vault').owner()
if owner == owner_address:
log.debug(f'detected existing vault at {addr}')
publish_vaults(tr.chain_id, owner)
return None
raise
except Exception:
in_flight.discard((tr.chain_id, tr.owner, tr.num))
raise
async def complete_transaction(self, job: TransactionJob, receipt: TransactionReceiptDict) -> None:
await accounting_transaction_gas(receipt, AccountingSubcategory.VaultCreation) # vault creation gas
# noinspection PyTypeChecker
req: VaultCreationRequest = job.request
in_flight.discard((req.chain_id, req.owner, req.num))
async def transaction_exception(self, job: TransactionJob, e: Exception) -> None:
log.error(f'exception from createVault transaction: {job.tx_id}', exc_info=e)
VaultCreationHandler()
last_seen = None
def handle_vault_creation_requests():
for req in db.session.query(DbVaultCreationRequest).where(
DbVaultCreationRequest.vault == None, DbVaultCreationRequest.chain==current_chain.get()):
req: DbVaultCreationRequest
key = req.chain.id, req.owner, req.num
if key not in in_flight:
vcr = VaultCreationRequest(*key)
submit_transaction_request(vcr)
in_flight.add(key)