metrics; --config <file>

This commit is contained in:
tim
2025-02-10 09:22:38 -04:00
parent d838412b2b
commit b22c044028
22 changed files with 274 additions and 44 deletions

View File

@@ -3,13 +3,14 @@ import logging
from contextvars import ContextVar
from datetime import datetime, timezone
from decimal import Decimal
from typing import Callable, Any
from typing import Callable, Any, Union, Optional
from web3 import AsyncWeb3
order_log = logging.getLogger('dexorder.order.log')
dec = Decimal
def now() -> datetime:
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    return datetime.now(tz=timezone.utc)

View File

@@ -1,12 +1,13 @@
import asyncio
import logging
from typing import Union
from sqlalchemy import select, func, text
from typing_extensions import Optional
from web3.exceptions import ContractLogicError
from web3.types import EventData
from dexorder import db, dec, NATIVE_TOKEN, from_timestamp, config, ADDRESS_0, now, Account
from dexorder import db, dec, NATIVE_TOKEN, from_timestamp, config, ADDRESS_0, now, Account, metric
from dexorder.base import TransactionReceiptDict
from dexorder.base.chain import current_chain
from dexorder.blocks import get_block_timestamp, get_block, current_block
@@ -22,6 +23,11 @@ log = logging.getLogger(__name__)
accounting_initialized = False
# Addresses registered for accounting; filled in by other functions in this module.
_tracked_addrs: set[str] = set()


def is_tracked_address(addr: str) -> bool:
    """Report whether ``addr`` is present in the module-level tracked-address set."""
    return addr in _tracked_addrs
class ReconciliationException(Exception):
    """Accounting-specific error type.

    NOTE(review): raise sites are outside this view; presumably signals a
    failed balance reconciliation -- confirm against callers.
    """
@@ -58,6 +64,8 @@ async def initialize_accounts_2():
await asyncio.gather(
*map(adjust_balance, (of_account, gf_account, ff_account, *exe_accounts))
)
for db_account in db.session.execute(select(DbAccount)).scalars():
_tracked_addrs.add(db_account.address)
async def initialize_mark_to_market():
@@ -128,13 +136,16 @@ def ensure_account(addr: str, kind: AccountKind) -> DbAccount:
log.warning(f'Account {addr} has wrong kind {found.kind} != {kind}')
found.kind = kind
db.session.add(found)
_tracked_addrs.add(found.address)
else:
found = DbAccount(chain=chain, address=addr, kind=kind, balances={})
db.session.add(found)
_tracked_addrs.add(found.address)
return found
async def accounting_transfer(receipt: TransactionReceiptDict, token: str, sender: str, receiver: str, amount: dec, adjust_decimals=True):
async def accounting_transfer(receipt: TransactionReceiptDict, token: str,
sender: str, receiver: str, amount: Union[dec,int], adjust_decimals=True):
block_hash = hexstr(receipt['blockHash'])
tx_id = hexstr(receipt['transactionHash'])
await asyncio.gather(
@@ -189,18 +200,20 @@ async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Opt
time = now() if block_hash is None else from_timestamp(await get_block_timestamp(block_hash))
value = mark_to_market(token, amount)
log.debug(f'accounting row {time} {account} {category} {subcategory} {token} {amount} ${value}')
db.session.add(Accounting( account=account,
time=time, category=category, subcategory=subcategory,
token=token, amount=amount, value=value, note=note,
chain_id=current_chain.get().id, tx_id=tx_id,
))
chain_id = current_chain.get().id
db.session.add(Accounting(account=account,
time=time, category=category, subcategory=subcategory,
token=token, amount=amount, value=value, note=note,
chain_id=chain_id, tx_id=tx_id,
))
# Adjust database account if it exists
account_db = db.session.get(DbAccount, (current_chain.get(), account))
if account_db is not None:
if is_tracked_address(account):
account_db = db.session.get(DbAccount, (current_chain.get(), account))
new_amount = account_db.balances.get(token, dec(0)) + amount
if new_amount < 0:
log.error(f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}')
account_db.balances[token] = new_amount
metric.account_balance.labels(address=account, token=token).set(new_amount)
db.session.add(account_db) # deep changes would not be detected by the ORM
else:
log.warning(f'No db account found for {account}')

View File

@@ -6,7 +6,7 @@ import eth_account
from eth_account.signers.local import LocalAccount
from web3.middleware import construct_sign_and_send_raw_middleware
from dexorder import config, current_w3
from dexorder import config, current_w3, metric
from dexorder.base.chain import current_chain
log = logging.getLogger(__name__)
@@ -46,6 +46,7 @@ class Account (LocalAccount):
except asyncio.TimeoutError:
log.error('waiting for an available account')
result = await Account._pool.get()
metric.account_available.set(Account._pool.qsize())
return result
@staticmethod
@@ -59,7 +60,8 @@ class Account (LocalAccount):
Account._main_account = account
Account._pool.put_nowait(account)
Account._all.append(account)
metric.account_available.set(Account._pool.qsize())
metric.account_total.set(len(Account._all))
def __init__(self, local_account: LocalAccount): # todo chain_id?
super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code
@@ -83,6 +85,7 @@ class Account (LocalAccount):
return current_w3.get().eth.get_balance(self.address)
def release(self):
metric.account_available.set(Account._pool.qsize() + 1)
Account._pool.put_nowait(self)
def __str__(self):

View File

@@ -1,17 +1,17 @@
import asyncio
import inspect
import logging
import logging.config
import sys
import tomllib
from asyncio import CancelledError
from traceback import print_exception
import asyncio
from signal import Signals
from traceback import print_exception
from typing import Coroutine
import sys
from dexorder import configuration
from dexorder import configuration, config
from dexorder.alert import init_alerts
from dexorder.metric.metric_startup import start_metrics_server
if __name__ == '__main__':
raise Exception('this file is meant to be imported not executed')
@@ -43,9 +43,15 @@ def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=Tru
log.setLevel(logging.DEBUG)
log.info('Logging configured to default')
if parse_args:
# NOTE: there is special command-line argument handling in config/load.py to get a config filename.
# The -c/--config flag MUST BE FIRST if present.
configuration.parse_args()
init_alerts()
if config.metrics_port:
start_metrics_server()
# loop setup
loop = asyncio.get_event_loop()
signals = Signals.SIGQUIT, Signals.SIGTERM, Signals.SIGINT

View File

@@ -2,6 +2,7 @@ import logging
from asyncio import CancelledError
from dexorder import db, blockchain
from dexorder.alert import warningAlert
from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute
from dexorder.blockstate import current_blockstate
@@ -12,7 +13,7 @@ from dexorder.contract import get_contract_event
from dexorder.contract.dexorder import get_dexorder_contract
from dexorder.event_handler import (init, dump_log, handle_vault_created, handle_order_placed,
handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all,
handle_uniswap_swaps, handle_vault_impl_changed)
handle_uniswap_swaps, handle_vault_impl_changed, update_metrics)
from dexorder.memcache import memcache
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches
@@ -74,10 +75,12 @@ def setup_logevent_triggers(runner):
runner.add_callback(end_trigger_updates)
runner.add_callback(execute_tranches)
runner.add_callback(update_metrics)
# noinspection DuplicatedCode
async def main():
warningAlert('Started', 'backend has started', log_level=logging.INFO)
await blockchain.connect(autosign=False) # the transaction manager checks out accounts and releases them.
redis_state = None
state = None

View File

@@ -5,7 +5,7 @@ from sqlalchemy import select
from dexorder import db, blockchain
from dexorder.accounting import reconcile
from dexorder.bin.executable import execute
from dexorder.blocks import current_block, fetch_latest_block
from dexorder.blocks import fetch_latest_block, current_block
from dexorder.database.model import DbAccount
log = logging.getLogger(__name__)
@@ -13,7 +13,8 @@ log = logging.getLogger(__name__)
async def main():
await blockchain.connect()
db.connect()
current_block.set(await fetch_latest_block())
block = await fetch_latest_block()
current_block.set(block)
try:
accounts = db.session.execute(select(DbAccount)).scalars().all()
for account in accounts:

View File

@@ -1,11 +1,13 @@
import logging
from dexorder import blockchain, db
from dexorder.bin.executable import execute
log = logging.getLogger(__name__)
async def main():
pass
await blockchain.connect()
db.connect()
if __name__ == '__main__':

View File

@@ -24,7 +24,8 @@ async def connect(rpc_url=None, account=NARG, autosign=True, name='default'):
"""
w3 = await create_w3(rpc_url, account, autosign, name)
current_w3.set(w3)
current_chain.set(Blockchain.get(await w3.eth.chain_id))
chain = Blockchain.get(await w3.eth.chain_id)
current_chain.set(chain)
return w3

View File

@@ -117,6 +117,14 @@ class BlockData (Generic[T]):
fork = current_fork.get()
state.delete_series(fork, self.series)
def upper_len(self):
    """
    Return an upper bound on the number of entries in this series.

    Record values may be marked DELETE, so the exact length cannot be computed
    efficiently. Rather than tracking it per-branch, the number of keys held by
    the current block state for this series is reported instead.
    """
    return current_blockstate.get().upper_len(self.series)
class BlockSet(Generic[T], Iterable[T], BlockData[T]):
def __init__(self, series: Any, **tags):

View File

@@ -179,6 +179,8 @@ class BlockState:
difflist.remove(diff.entry)
state_log.info(('removed' if remove_series_diffs else 'promoted')+f' branch {branch}')
def upper_len(self, series):
    """Return the number of keys recorded for ``series``, or 0 when the series is unknown."""
    keyed_diffs = self.diffs_by_series.get(series, {})
    return len(keyed_diffs)
def get(self, fork: Fork, series, key, default=NARG):
series_diffs = self.diffs_by_series.get(series)

View File

@@ -2,6 +2,7 @@ import os
import tomllib
from tomllib import TOMLDecodeError
import sys
from omegaconf import OmegaConf, DictConfig
from omegaconf.errors import OmegaConfBaseException
@@ -9,6 +10,7 @@ from .schema import Config
schema = OmegaConf.structured(Config())
_config_file = 'dexorder.toml'
class ConfigException (Exception):
pass
@@ -19,7 +21,7 @@ def load_config():
result:ConfigDict = OmegaConf.merge(
schema,
from_toml('.secret.toml'),
from_toml('dexorder.toml'),
from_toml(_config_file),
from_toml('config.toml'),
from_env()
)
@@ -66,5 +68,12 @@ def parse_args(args=None):
class ConfigDict (Config, DictConfig): # give type hints from Config plus methods from DictConfig
pass
# Special command-line argument handling to get a config file. The -c/--config flag MUST BE FIRST.
def _pop_config_flag(argv):
    """
    Extract a leading ``-c <file>`` / ``--config <file>`` pair from ``argv``.

    :param argv: argument list in ``sys.argv`` layout (program name first).
    :return: ``(config_file, remaining_argv)``. ``config_file`` is None when the first
        argument is not the config flag, in which case ``remaining_argv`` is an unchanged copy.
    :raises ConfigException: if the flag is present but no filename follows it.
    """
    # Guard the length first: with no arguments at all there is no argv[1] to inspect.
    if len(argv) < 2 or argv[1] not in ('-c', '--config'):
        return None, list(argv)
    if len(argv) < 3:
        raise ConfigException('Missing config file argument')
    return argv[2], [argv[0], *argv[3:]]


_cli_config_file, _remaining_argv = _pop_config_flag(sys.argv)
if _cli_config_file is not None:
    _config_file = _cli_config_file
    # Strip the flag and its value so later argument parsing never sees them.
    sys.argv = _remaining_argv
config = load_config()

View File

@@ -18,6 +18,8 @@ class Config:
dump_sql: bool = False
redis_url: Optional[str] = 'redis://localhost:6379'
metrics_port: int = 9090
cache_blocks_in_db: bool = False
metadata: Optional[str] = None
ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk

View File

@@ -7,6 +7,7 @@ from web3.exceptions import Web3Exception
from web3.types import TxReceipt, TxData
from dexorder import current_w3, Account
from dexorder.blocks import current_block
from dexorder.blockstate.fork import current_fork
from dexorder.util import hexstr
@@ -27,9 +28,7 @@ class ContractTransaction:
self.receipt: Optional[TxReceipt] = None # todo could be multiple receipts for different branches!
def __repr__(self):
# todo this is from an old status system
receipt_status = 'IN_FLIGHT' if self.receipt is None else 'REVERTED' if self.receipt.status == 0 else self.receipt.blockNumber
return f'Transaction({self.id},{receipt_status})'
return f'tx-{self.id}'
async def wait(self) -> TxReceipt:
if self.receipt is None:
@@ -59,13 +58,14 @@ class DeployTransaction (ContractTransaction):
def call_wrapper(addr, name, func):
async def f(*args, **kwargs):
async def f(*args, block_identifier=None, **kwargs):
if block_identifier is None:
try:
block_identifier = current_block.get().height
except (LookupError, AttributeError):
block_identifier = 'latest'
try:
blockid = current_fork.get().head_identifier
except (LookupError, AttributeError):
blockid = 'latest'
try:
return await func(*args).call(block_identifier=blockid, **kwargs)
return await func(*args).call(block_identifier=block_identifier, **kwargs)
except Web3Exception as e:
e.args += addr, name
raise e

View File

@@ -3,19 +3,20 @@ import logging
from web3.types import EventData
from dexorder import db
from dexorder.accounting import accounting_fill, accounting_placement
from dexorder import db, metric
from dexorder.accounting import accounting_fill, accounting_placement, accounting_transfer, is_tracked_address
from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import SwapOrderState
from dexorder.blocks import get_block_timestamp
from dexorder.blockstate import current_blockstate
from dexorder.contract.dexorder import VaultContract, get_factory_contract
from dexorder.database.model import VaultCreationRequest
from dexorder.impls import get_impl_version
from dexorder.ohlc import ohlcs
from dexorder.order.orderstate import Order
from dexorder.order.triggers import (OrderTriggers, activate_order, update_balance_triggers, start_trigger_updates,
update_price_triggers)
update_price_triggers, TimeTrigger, PriceLineTrigger)
from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data
from dexorder.util import hexstr
from dexorder.vault_blockdata import vault_owners, adjust_balance, verify_vault, publish_vaults
@@ -118,13 +119,11 @@ async def handle_order_cancel_all(event: EventData):
async def handle_transfer(transfer: EventData):
# todo handle native transfers incl gas for token transfers
# log.debug(f'Transfer {transfer}')
from_address = transfer['args']['from']
to_address = transfer['args']['to']
if to_address == from_address:
return
amount = int(transfer['args']['value'])
token_address = transfer['address']
if to_address in vault_owners:
log.debug(f'deposit {to_address} {amount}')
vault = to_address
@@ -134,10 +133,11 @@ async def handle_transfer(transfer: EventData):
else:
vault = None
if vault is not None:
token_address = transfer['address']
await adjust_balance(vault, token_address, amount)
await update_balance_triggers(vault, token_address, amount)
if is_tracked_address(to_address):
# noinspection PyTypeChecker
await accounting_transfer(transfer, token_address, from_address, to_address, amount, adjust_decimals=True)
async def handle_uniswap_swaps(swaps: list[EventData]):
# asynchronously prefetch the block timestamps we'll need
@@ -184,6 +184,10 @@ async def handle_vault_created(created: EventData):
vault_owners[addr] = owner
log.debug(f'VaultCreated {owner} #{num} => {addr}')
publish_vaults(chain_id, owner)
# BlockData doesn't have an easy way to calculate exact sizes because some keys could hold DELETE values, so
# this is actually an upper limit on the size.
approx_size = len(current_blockstate.get().diffs_by_series)
metric.vaults.set(approx_size)
async def handle_vault_impl_changed(upgrade: EventData):
@@ -204,3 +208,9 @@ async def handle_vault_impl_changed(upgrade: EventData):
version = await get_impl_version(impl)
log.debug(f'Vault {addr} upgraded to impl version {version}')
def update_metrics():
    """Attach to each size gauge the callable that reports its current value."""
    gauge_sources = (
        # upper_len() is an upper bound, since some keys may hold DELETE values
        (metric.vaults, vault_owners.upper_len),
        (metric.open_orders, Order.open_orders.upper_len),
        (metric.triggers_time, lambda: len(TimeTrigger.all)),
        (metric.triggers_line, lambda: len(PriceLineTrigger.triggers_set)),
    )
    for gauge, source in gauge_sources:
        gauge.set_function(source)

View File

@@ -0,0 +1,8 @@
# Default label keys must be declared here; their actual values are assigned later by
# setup_default_labels() in metric_startup.py.
default_labels = {
    'pod': None,
    'chain_id': None,
}
from .metric_type import *
from .metrics import *

View File

@@ -0,0 +1,34 @@
import os
from prometheus_client import start_http_server
from dexorder import config, metric, now
from dexorder.base.chain import current_chain
_chain_id = None
def _height_or_none(block):
    """Return ``block.height``, passing ``None`` through unchanged."""
    if block is None:
        return None
    return block.height
def start_metrics_server():
    """Fill in the default metric labels, then start the metrics HTTP daemon thread."""
    # Labels first, so they are already populated when the server starts.
    setup_default_labels()
    port = config.metrics_port
    start_http_server(port)
def setup_default_labels():
    """
    Populate the shared default labels and publish the startup info metric.

    Reads the chain id from ``current_chain`` and the pod name from the ``HOSTNAME``
    environment variable (empty string when unset), and remembers the chain id in
    the module-level ``_chain_id``.
    """
    global _chain_id
    _chain_id = current_chain.get().id
    labels = metric.default_labels
    labels['chain_id'] = _chain_id
    labels['pod'] = os.environ.get('HOSTNAME', '')
    # MUST BE STRING VALUES
    startup_info = {'started': now().isoformat()}
    metric.info.info(startup_info)

View File

@@ -0,0 +1,94 @@
__all__ = ['Counter', 'Gauge', 'Summary', 'Histogram', 'Info', 'Enum']
import functools
import prometheus_client as pc
from dexorder import metric
from dexorder.metric import default_labels
def metric_class(OurCls, PcCls, **defaultkw):
    """
    Build a factory that creates ``PcCls`` metrics wrapped in ``OurCls``.

    :param OurCls: wrapper applied to the constructed metric object.
    :param PcCls: underlying metric constructor, called as
        ``PcCls(name, documentation, labelnames, namespace, **kw)``.
    :param defaultkw: keyword defaults for every metric built by the factory;
        keywords given at construction time override them.
    :return: a callable ``(name, documentation, labelnames=(), namespace='dexorder', **kwargs)``.
    """
    def construct(name, documentation, labelnames=(), namespace='dexorder', **kwargs):
        options = {**defaultkw, **kwargs}
        trailing = options.pop('labelnames', ())
        # Final label order: caller's names, then the default label keys, then any
        # 'labelnames' supplied through the factory defaults.
        all_labels = (*labelnames, *metric.default_labels.keys(), *trailing)
        return OurCls(PcCls(name, documentation, all_labels, namespace, **options))

    return construct
class _Labeled:
    """Pairs a metric object with a set of label values that accumulate across ``labels()`` calls."""

    def __init__(self, obj, labels=None):
        self._obj = obj
        self._labels = labels if labels else {}

    def labels(self, **kwargs):
        """Return a new wrapper of the same class whose labels are extended/overridden by ``kwargs``."""
        combined = {**self._labels, **kwargs}
        return type(self)(self._obj, combined)

    def _apply(self):
        """Resolve the object to operate on: default labels merged with this wrapper's labels."""
        # Labels set on this wrapper take precedence over the shared defaults.
        merged = {**default_labels, **self._labels}
        if not merged:
            return self._obj
        return self._obj.labels(**merged)
class _Counter(_Labeled):
    """Label-aware wrapper that forwards ``inc`` to the wrapped counter."""

    def inc(self, amount=1):
        """Increment the resolved counter by ``amount`` (default 1)."""
        target = self._apply()
        target.inc(amount)


Counter = metric_class(_Counter, pc.Counter)
class _Gauge(_Labeled):
    """Label-aware wrapper that forwards gauge operations to the wrapped gauge."""

    def inc(self, amount=1):
        """Raise the resolved gauge by ``amount`` (default 1)."""
        target = self._apply()
        target.inc(amount)

    def dec(self, amount=1):
        """Lower the resolved gauge by ``amount`` (default 1)."""
        target = self._apply()
        target.dec(amount)

    def set(self, value):
        """Set the resolved gauge to ``value``."""
        target = self._apply()
        target.set(value)

    def set_to_current_time(self):
        """Forward ``set_to_current_time()`` to the resolved gauge."""
        target = self._apply()
        target.set_to_current_time()

    def set_function(self, f):
        """Forward ``set_function(f)`` to the resolved gauge."""
        target = self._apply()
        target.set_function(f)


Gauge = metric_class(_Gauge, pc.Gauge)
class _Summary(_Labeled):
    """Label-aware wrapper that forwards ``observe`` to the wrapped summary."""

    def observe(self, amount):
        """Record ``amount`` on the resolved summary."""
        target = self._apply()
        target.observe(amount)


Summary = metric_class(_Summary, pc.Summary)
class _Histogram(_Labeled):
    """Label-aware wrapper that forwards ``observe`` to the wrapped histogram."""

    def observe(self, amount):
        """Record ``amount`` on the resolved histogram."""
        target = self._apply()
        target.observe(amount)


Histogram = metric_class(_Histogram, pc.Histogram)
class _Info(_Labeled):
    """Label-aware wrapper that forwards ``info`` to the wrapped info metric."""

    def info(self, val):
        """Pass ``val`` to the resolved metric's ``info()``."""
        target = self._apply()
        target.info(val)


Info = metric_class(_Info, pc.Info)
class _Enum(_Labeled):
    """Label-aware wrapper that forwards ``state`` to the wrapped enum metric."""

    def state(self, state):
        """Pass ``state`` to the resolved metric's ``state()``."""
        target = self._apply()
        target.state(state)


Enum = metric_class(_Enum, pc.Enum)

View File

@@ -0,0 +1,20 @@
from .metric_type import *
# Central registry of the process's metrics. Each one is built through the factories from
# metric_type, which default the namespace to 'dexorder' and append the default label names
# to whatever label names are listed here.
# Do not call set_function(...) in this module.
# NOTE(review): the original note pointed at metric_startup.py:automatic_metrics(), but no such
# function is visible there; the set_function() registrations seen in this change are made by
# update_metrics() in the event handler module -- confirm the intended location.
info = Info("backend_info", "Information about the backend process")
block_current = Gauge("block_current", "Current block number being processed")
block_latest = Gauge("block_latest", "Highest block number seen")
runner_loops = Counter("runner_loops", "Number of times the runner loop has been completed")
runner_latency = Summary("runner_latency", "How old the current block being processed is, in seconds")
vaults = Gauge("vaults", "Total vault count", )
open_orders = Gauge("open_orders", "Total active orders", )
triggers_time = Gauge("triggers_time", "Total active time triggers", )
triggers_line = Gauge("triggers_line", "Total active line triggers", )
# The only metric with labels of its own: one series per (address, token) pair.
account_balance = Gauge("account_balance", "Account balance", ["address", "token"])
account_total = Gauge('account_total', 'Total number of accounts configured')
account_available = Gauge('account_available', 'Number of accounts that do not have any pending transactions')

View File

@@ -8,7 +8,7 @@ from eth_bloom import BloomFilter
# noinspection PyPackageRequirements
from websockets.exceptions import ConnectionClosedError
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp, metric
from dexorder.base.block import Block, latest_block
from dexorder.base.chain import current_chain, current_clock, BlockClock
from dexorder.blockchain.connection import create_w3_ws, create_w3
@@ -272,6 +272,8 @@ class BlockStateRunner(BlockProgressor):
elif was_behind:
log.info('Runner has caught up')
was_behind = False
metric.runner_latency.observe(behind)
metric.runner_loops.inc()
except Exception as e:
fatal('Unhandled exception in runner worker', exception=e)
finally:
@@ -283,6 +285,7 @@ class BlockStateRunner(BlockProgressor):
w3 = current_w3.get()
current_blockstate.set(self.state)
current_fork.set(fork)
metric.block_current.set(fork.height)
batches = []
pubs = []
session = db.make_session(autocommit=False)
@@ -423,5 +426,6 @@ class BlockStateRunner(BlockProgressor):
def set_latest_block(block):
    """
    Record ``block`` as the newest block seen for its chain.

    Caches the block, stores it in ``latest_block`` under its chain id, publishes its
    height to the ``block_latest`` gauge and updates the current clock with its timestamp.
    """
    cache_block(block)
    chain_id = block.chain_id
    latest_block[chain_id] = block
    metric.block_latest.set(block.height)
    clock = current_clock.get()
    clock.update(block.timestamp)

View File

@@ -7,6 +7,7 @@ from web3.exceptions import BadFunctionCallOutput
from dexorder import ADDRESS_0, db, NATIVE_TOKEN, dec, current_w3
from dexorder.addrmeta import address_metadata
from dexorder.base.chain import current_chain
from dexorder.blockstate.fork import current_fork
from dexorder.contract import ERC20, ContractProxy, CONTRACT_ERRORS
from dexorder.database.model import Token
from dexorder.database.model.token import OldTokenDict
@@ -34,7 +35,11 @@ async def get_erc20_balance(addr, token_addr, *, adjust_decimals=True):
# noinspection PyShadowingNames
async def get_native_balance(addr, *, adjust_decimals=True) -> dec:
value = dec(await current_w3.get().eth.get_balance(addr))
try:
block_id = current_fork.get().height
except LookupError:
block_id = 'latest'
value = dec(await current_w3.get().eth.get_balance(addr, block_identifier=block_id))
if adjust_decimals:
value /= 10 ** 18
return value

View File

@@ -4,7 +4,7 @@ from asyncio import Queue
from datetime import timedelta
from typing import Union, Callable
from dexorder import config, db, now, current_w3
from dexorder import config, db, now, current_w3, metric
from dexorder.base.block import Block, BlockInfo, latest_block
from dexorder.base.chain import current_chain
from dexorder.blocks import promotion_height
@@ -57,6 +57,7 @@ class BlockWalker (BlockProgressor):
processed_height = cur + config.backfill if config.backfill < 0 else cur
log.info(f'walker starting at block {processed_height}')
metric.block_current.set(processed_height)
last_flush = processed_height if self.flush_type == 'blocks' else now() if self.flush_type == 'time' else None
prev_height = None
session = db.session
@@ -67,6 +68,7 @@ class BlockWalker (BlockProgressor):
latest_blockdata: BlockInfo = await w3.eth.get_block('latest')
latest = Block(chain_id, latest_blockdata)
latest_block[chain_id] = latest
metric.block_latest.set(latest.height)
if prev_height is None or latest.height > prev_height:
prev_height = latest.height
log.debug(f'polled new block {latest.height}')
@@ -93,6 +95,7 @@ class BlockWalker (BlockProgressor):
db.session.commit()
db.session.begin()
processed_height = cur_height
metric.block_current.set(cur_height)
if not self.running or config.walker_stop is not None and config.walker_stop <= processed_height:
break
await asyncio.sleep(config.polling or 1)