diff --git a/requirements.txt b/requirements.txt index 27c7ad9..393263d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ sqlalchemy alembic omegaconf -web3>=6,<7 +web3 psycopg2-binary orjson sortedcontainers @@ -29,3 +29,4 @@ requests aiohttp charset-normalizer pytz +prometheus_client diff --git a/src/dexorder/__init__.py b/src/dexorder/__init__.py index b191ab4..b82970c 100644 --- a/src/dexorder/__init__.py +++ b/src/dexorder/__init__.py @@ -3,13 +3,14 @@ import logging from contextvars import ContextVar from datetime import datetime, timezone from decimal import Decimal -from typing import Callable, Any +from typing import Callable, Any, Union, Optional from web3 import AsyncWeb3 order_log = logging.getLogger('dexorder.order.log') dec = Decimal + def now(): return datetime.now(timezone.utc) diff --git a/src/dexorder/accounting.py b/src/dexorder/accounting.py index af18ec5..29152ed 100644 --- a/src/dexorder/accounting.py +++ b/src/dexorder/accounting.py @@ -1,12 +1,13 @@ import asyncio import logging +from typing import Union from sqlalchemy import select, func, text from typing_extensions import Optional from web3.exceptions import ContractLogicError from web3.types import EventData -from dexorder import db, dec, NATIVE_TOKEN, from_timestamp, config, ADDRESS_0, now, Account +from dexorder import db, dec, NATIVE_TOKEN, from_timestamp, config, ADDRESS_0, now, Account, metric from dexorder.base import TransactionReceiptDict from dexorder.base.chain import current_chain from dexorder.blocks import get_block_timestamp, get_block, current_block @@ -22,6 +23,11 @@ log = logging.getLogger(__name__) accounting_initialized = False +_tracked_addrs = set() + +def is_tracked_address(addr: str) -> bool: + return addr in _tracked_addrs + class ReconciliationException(Exception): pass @@ -58,6 +64,8 @@ async def initialize_accounts_2(): await asyncio.gather( *map(adjust_balance, (of_account, gf_account, ff_account, *exe_accounts)) ) + for db_account in db.session.execute(select(DbAccount)).scalars(): + _tracked_addrs.add(db_account.address) async def initialize_mark_to_market(): @@ -128,13 +136,16 @@ def ensure_account(addr: str, kind: AccountKind) -> DbAccount: log.warning(f'Account {addr} has wrong kind {found.kind} != {kind}') found.kind = kind db.session.add(found) + _tracked_addrs.add(found.address) else: found = DbAccount(chain=chain, address=addr, kind=kind, balances={}) db.session.add(found) + _tracked_addrs.add(found.address) return found -async def accounting_transfer(receipt: TransactionReceiptDict, token: str, sender: str, receiver: str, amount: dec, adjust_decimals=True): +async def accounting_transfer(receipt: TransactionReceiptDict, token: str, + sender: str, receiver: str, amount: Union[dec,int], adjust_decimals=True): block_hash = hexstr(receipt['blockHash']) tx_id = hexstr(receipt['transactionHash']) await asyncio.gather( @@ -189,18 +200,20 @@ async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Opt time = now() if block_hash is None else from_timestamp(await get_block_timestamp(block_hash)) value = mark_to_market(token, amount) log.debug(f'accounting row {time} {account} {category} {subcategory} {token} {amount} ${value}') - db.session.add(Accounting( account=account, - time=time, category=category, subcategory=subcategory, - token=token, amount=amount, value=value, note=note, - chain_id=current_chain.get().id, tx_id=tx_id, - )) + chain_id = current_chain.get().id + db.session.add(Accounting(account=account, + time=time, category=category, subcategory=subcategory, + token=token, amount=amount, value=value, note=note, + chain_id=chain_id, tx_id=tx_id, + )) # Adjust database account if it exists - account_db = db.session.get(DbAccount, (current_chain.get(), account)) - if account_db is not None: + if is_tracked_address(account): + account_db = db.session.get(DbAccount, (current_chain.get(), account)) new_amount = account_db.balances.get(token, dec(0)) + amount if new_amount < 0: log.error(f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}') account_db.balances[token] = new_amount + metric.account_balance.labels(address=account, token=token).set(new_amount) db.session.add(account_db) # deep changes would not be detected by the ORM else: log.warning(f'No db account found for {account}') diff --git a/src/dexorder/base/account.py b/src/dexorder/base/account.py index a0ca521..8c00bd7 100644 --- a/src/dexorder/base/account.py +++ b/src/dexorder/base/account.py @@ -6,7 +6,7 @@ import eth_account from eth_account.signers.local import LocalAccount from web3.middleware import construct_sign_and_send_raw_middleware -from dexorder import config, current_w3 +from dexorder import config, current_w3, metric from dexorder.base.chain import current_chain log = logging.getLogger(__name__) @@ -46,6 +46,7 @@ class Account (LocalAccount): except asyncio.TimeoutError: log.error('waiting for an available account') result = await Account._pool.get() + metric.account_available.set(Account._pool.qsize()) return result @staticmethod @@ -59,7 +60,8 @@ class Account (LocalAccount): Account._main_account = account Account._pool.put_nowait(account) Account._all.append(account) - + metric.account_available.set(Account._pool.qsize()) + metric.account_total.set(len(Account._all)) def __init__(self, local_account: LocalAccount): # todo chain_id? super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code @@ -83,6 +85,7 @@ class Account (LocalAccount): return current_w3.get().eth.get_balance(self.address) def release(self): + metric.account_available.set(Account._pool.qsize() + 1) Account._pool.put_nowait(self) def __str__(self): diff --git a/src/dexorder/bin/executable.py b/src/dexorder/bin/executable.py index e7a982c..a60469a 100644 --- a/src/dexorder/bin/executable.py +++ b/src/dexorder/bin/executable.py @@ -1,17 +1,17 @@ +import asyncio import inspect import logging import logging.config +import sys import tomllib from asyncio import CancelledError -from traceback import print_exception -import asyncio from signal import Signals +from traceback import print_exception from typing import Coroutine -import sys - -from dexorder import configuration +from dexorder import configuration, config from dexorder.alert import init_alerts +from dexorder.metric.metric_startup import start_metrics_server if __name__ == '__main__': raise Exception('this file is meant to be imported not executed') @@ -43,9 +43,15 @@ def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=Tru log.setLevel(logging.DEBUG) log.info('Logging configured to default') if parse_args: + # NOTE: there is special command-line argument handling in config/load.py to get a config filename. + # The -c/--config flag MUST BE FIRST if present. configuration.parse_args() + init_alerts() + if config.metrics_port: + start_metrics_server() + # loop setup loop = asyncio.get_event_loop() signals = Signals.SIGQUIT, Signals.SIGTERM, Signals.SIGINT diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index b33d282..5fcb283 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -2,6 +2,7 @@ import logging from asyncio import CancelledError from dexorder import db, blockchain +from dexorder.alert import warningAlert from dexorder.base.chain import current_chain from dexorder.bin.executable import execute from dexorder.blockstate import current_blockstate @@ -12,7 +13,7 @@ from dexorder.contract import get_contract_event from dexorder.contract.dexorder import get_dexorder_contract from dexorder.event_handler import (init, dump_log, handle_vault_created, handle_order_placed, handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all, - handle_uniswap_swaps, handle_vault_impl_changed) + handle_uniswap_swaps, handle_vault_impl_changed, update_metrics) from dexorder.memcache import memcache from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches @@ -74,10 +75,12 @@ def setup_logevent_triggers(runner): runner.add_callback(end_trigger_updates) runner.add_callback(execute_tranches) + runner.add_callback(update_metrics) # noinspection DuplicatedCode async def main(): + warningAlert('Started', 'backend has started', log_level=logging.INFO) await blockchain.connect(autosign=False) # the transaction manager checks out accounts and releases them. redis_state = None state = None diff --git a/src/dexorder/bin/reconcile.py b/src/dexorder/bin/reconcile.py index 6ae4a8e..e12b478 100644 --- a/src/dexorder/bin/reconcile.py +++ b/src/dexorder/bin/reconcile.py @@ -5,7 +5,7 @@ from sqlalchemy import select from dexorder import db, blockchain from dexorder.accounting import reconcile from dexorder.bin.executable import execute -from dexorder.blocks import current_block, fetch_latest_block +from dexorder.blocks import fetch_latest_block, current_block from dexorder.database.model import DbAccount log = logging.getLogger(__name__) @@ -13,7 +13,8 @@ log = logging.getLogger(__name__) async def main(): await blockchain.connect() db.connect() - current_block.set(await fetch_latest_block()) + block = await fetch_latest_block() + current_block.set(block) try: accounts = db.session.execute(select(DbAccount)).scalars().all() for account in accounts: diff --git a/src/dexorder/bin/refill.py b/src/dexorder/bin/refill.py index 55c9106..c203d96 100644 --- a/src/dexorder/bin/refill.py +++ b/src/dexorder/bin/refill.py @@ -1,11 +1,13 @@ import logging +from dexorder import blockchain, db from dexorder.bin.executable import execute log = logging.getLogger(__name__) async def main(): - pass + await blockchain.connect() + db.connect() if __name__ == '__main__': diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py index 075b921..1e79f2b 100644 --- a/src/dexorder/blockchain/connection.py +++ b/src/dexorder/blockchain/connection.py @@ -24,7 +24,8 @@ async def connect(rpc_url=None, account=NARG, autosign=True, name='default'): """ w3 = await create_w3(rpc_url, account, autosign, name) current_w3.set(w3) - current_chain.set(Blockchain.get(await w3.eth.chain_id)) + chain = Blockchain.get(await w3.eth.chain_id) + current_chain.set(chain) return w3 diff --git a/src/dexorder/blockstate/blockdata.py b/src/dexorder/blockstate/blockdata.py index 9a2e875..ccb83cf 100644 --- a/src/dexorder/blockstate/blockdata.py +++ b/src/dexorder/blockstate/blockdata.py @@ -117,6 +117,14 @@ class BlockData (Generic[T]): fork = current_fork.get() state.delete_series(fork, self.series) + def upper_len(self): + """ + Since record values may be marked DELETE there is not an efficient way to know the exact length of a series. + We could track it per-branch but instead we just return the number of keys, which is an upper bound on the + series length. + """ + state = current_blockstate.get() + return state.upper_len(self.series) class BlockSet(Generic[T], Iterable[T], BlockData[T]): def __init__(self, series: Any, **tags): diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py index cc28f10..0af1179 100644 --- a/src/dexorder/blockstate/state.py +++ b/src/dexorder/blockstate/state.py @@ -179,6 +179,8 @@ class BlockState: difflist.remove(diff.entry) state_log.info(('removed' if remove_series_diffs else 'promoted')+f' branch {branch}') + def upper_len(self, series): + return len(self.diffs_by_series.get(series, {})) def get(self, fork: Fork, series, key, default=NARG): series_diffs = self.diffs_by_series.get(series) diff --git a/src/dexorder/configuration/load.py b/src/dexorder/configuration/load.py index 5646284..dd387ac 100644 --- a/src/dexorder/configuration/load.py +++ b/src/dexorder/configuration/load.py @@ -2,6 +2,7 @@ import os import tomllib from tomllib import TOMLDecodeError +import sys from omegaconf import OmegaConf, DictConfig from omegaconf.errors import OmegaConfBaseException @@ -9,6 +10,7 @@ from .schema import Config schema = OmegaConf.structured(Config()) +_config_file = 'dexorder.toml' class ConfigException (Exception): pass @@ -19,7 +21,7 @@ def load_config(): result:ConfigDict = OmegaConf.merge( schema, from_toml('.secret.toml'), - from_toml('dexorder.toml'), + from_toml(_config_file), from_toml('config.toml'), from_env() ) @@ -66,5 +68,12 @@ def parse_args(args=None): class ConfigDict (Config, DictConfig): # give type hints from Config plus methods from DictConfig pass +# Special command-line argument handling to get a config file. The -c/--config flag MUST BE FIRST. +if sys.argv[1] == '-c' or sys.argv[1] == '--config': + if len(sys.argv) < 3: + raise ConfigException('Missing config file argument') + else: + _config_file = sys.argv[2] + sys.argv = [sys.argv[0], *sys.argv[3:]] config = load_config() diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index 33e0ff6..8529096 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -18,6 +18,8 @@ class Config: dump_sql: bool = False redis_url: Optional[str] = 'redis://localhost:6379' + metrics_port: int = 9090 + cache_blocks_in_db: bool = False metadata: Optional[str] = None ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk diff --git a/src/dexorder/contract/contract_proxy.py b/src/dexorder/contract/contract_proxy.py index 5df3a4f..a0a5869 100644 --- a/src/dexorder/contract/contract_proxy.py +++ b/src/dexorder/contract/contract_proxy.py @@ -7,6 +7,7 @@ from web3.exceptions import Web3Exception from web3.types import TxReceipt, TxData from dexorder import current_w3, Account +from dexorder.blocks import current_block from dexorder.blockstate.fork import current_fork from dexorder.util import hexstr @@ -27,9 +28,7 @@ class ContractTransaction: self.receipt: Optional[TxReceipt] = None # todo could be multiple receipts for different branches! def __repr__(self): - # todo this is from an old status system - receipt_status = 'IN_FLIGHT' if self.receipt is None else 'REVERTED' if self.receipt.status == 0 else self.receipt.blockNumber - return f'Transaction({self.id},{receipt_status})' + return f'tx-{self.id}' async def wait(self) -> TxReceipt: if self.receipt is None: @@ -59,13 +58,14 @@ class DeployTransaction (ContractTransaction): def call_wrapper(addr, name, func): - async def f(*args, **kwargs): + async def f(*args, block_identifier=None, **kwargs): + if block_identifier is None: + try: + block_identifier = current_block.get().height + except (LookupError, AttributeError): + block_identifier = 'latest' try: - blockid = current_fork.get().head_identifier - except (LookupError, AttributeError): - blockid = 'latest' - try: - return await func(*args).call(block_identifier=blockid, **kwargs) + return await func(*args).call(block_identifier=block_identifier, **kwargs) except Web3Exception as e: e.args += addr, name raise e diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 70e9fa7..229275c 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -3,19 +3,20 @@ import logging from web3.types import EventData -from dexorder import db -from dexorder.accounting import accounting_fill, accounting_placement +from dexorder import db, metric +from dexorder.accounting import accounting_fill, accounting_placement, accounting_transfer, is_tracked_address from dexorder.base.chain import current_chain from dexorder.base.order import TrancheKey, OrderKey from dexorder.base.orderlib import SwapOrderState from dexorder.blocks import get_block_timestamp +from dexorder.blockstate import current_blockstate from dexorder.contract.dexorder import VaultContract, get_factory_contract from dexorder.database.model import VaultCreationRequest from dexorder.impls import get_impl_version from dexorder.ohlc import ohlcs from dexorder.order.orderstate import Order from dexorder.order.triggers import (OrderTriggers, activate_order, update_balance_triggers, start_trigger_updates, - update_price_triggers) + update_price_triggers, TimeTrigger, PriceLineTrigger) from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data from dexorder.util import hexstr from dexorder.vault_blockdata import vault_owners, adjust_balance, verify_vault, publish_vaults @@ -118,13 +119,11 @@ async def handle_order_cancel_all(event: EventData): async def handle_transfer(transfer: EventData): - # todo handle native transfers incl gas for token transfers # log.debug(f'Transfer {transfer}') from_address = transfer['args']['from'] to_address = transfer['args']['to'] - if to_address == from_address: - return amount = int(transfer['args']['value']) + token_address = transfer['address'] if to_address in vault_owners: log.debug(f'deposit {to_address} {amount}') vault = to_address @@ -134,10 +133,11 @@ async def handle_transfer(transfer: EventData): else: vault = None if vault is not None: - token_address = transfer['address'] await adjust_balance(vault, token_address, amount) await update_balance_triggers(vault, token_address, amount) - + if is_tracked_address(to_address): + # noinspection PyTypeChecker + await accounting_transfer(transfer, token_address, from_address, to_address, amount, adjust_decimals=True) async def handle_uniswap_swaps(swaps: list[EventData]): # asynchronously prefetch the block timestamps we'll need @@ -184,6 +184,10 @@ async def handle_vault_created(created: EventData): vault_owners[addr] = owner log.debug(f'VaultCreated {owner} #{num} => {addr}') publish_vaults(chain_id, owner) + # BlockData doesn't have an easy way to calculate exact sizes because some keys could hold DELETE values, so + # this is actually an upper limit on the size. + approx_size = len(current_blockstate.get().diffs_by_series) + metric.vaults.set(approx_size) async def handle_vault_impl_changed(upgrade: EventData): @@ -204,3 +208,9 @@ async def handle_vault_impl_changed(upgrade: EventData): version = await get_impl_version(impl) log.debug(f'Vault {addr} upgraded to impl version {version}') + +def update_metrics(): + metric.vaults.set_function(vault_owners.upper_len) + metric.open_orders.set_function(Order.open_orders.upper_len) + metric.triggers_time.set_function(lambda: len(TimeTrigger.all)) + metric.triggers_line.set_function(lambda: len(PriceLineTrigger.triggers_set)) diff --git a/src/dexorder/metric/__init__.py b/src/dexorder/metric/__init__.py new file mode 100644 index 0000000..671b90b --- /dev/null +++ b/src/dexorder/metric/__init__.py @@ -0,0 +1,8 @@ +default_labels = dict( + # Default label keys must be defined here but set their actual values later in metrics_startup.py:setup_default_labels() + pod=None, + chain_id=None, +) + +from .metric_type import * +from .metrics import * diff --git a/src/dexorder/metric/metric_startup.py b/src/dexorder/metric/metric_startup.py new file mode 100644 index 0000000..a5da5e0 --- /dev/null +++ b/src/dexorder/metric/metric_startup.py @@ -0,0 +1,34 @@ +import os + +from prometheus_client import start_http_server + +from dexorder import config, metric, now +from dexorder.base.chain import current_chain + +_chain_id = None + + +def _height_or_none(block): + return None if block is None else block.height + + +def start_metrics_server(): + # First, set default_labels + setup_default_labels() + # Start the http daemon thread + start_http_server(config.metrics_port) + + +def setup_default_labels(): + global _chain_id + _chain_id = current_chain.get().id + metric.default_labels['chain_id'] = _chain_id + metric.default_labels['pod'] = os.environ.get('HOSTNAME', '') + + metric.info.info({ + # MUST BE STRING VALUES + 'started': now().isoformat(), + # 'pod': os.environ.get('HOSTNAME', ''), + # 'chain_id': _chain_id, + }) + diff --git a/src/dexorder/metric/metric_type.py b/src/dexorder/metric/metric_type.py new file mode 100644 index 0000000..984b0f5 --- /dev/null +++ b/src/dexorder/metric/metric_type.py @@ -0,0 +1,94 @@ +__all__ = ['Counter', 'Gauge', 'Summary', 'Histogram', 'Info', 'Enum'] + +import functools + +import prometheus_client as pc + +from dexorder import metric +from dexorder.metric import default_labels + + +def metric_class(OurCls, PcCls, **defaultkw): + def construct(defkw, name, documentation, labelnames=(), namespace='dexorder', **kwargs): + kw = dict(defkw) + kw.update(kwargs) + labelnames = tuple(labelnames) + tuple(metric.default_labels.keys()) + tuple(kw.pop('labelnames', ())) + return OurCls(PcCls(name, documentation, labelnames, namespace, **kw)) + + return functools.partial(construct,defaultkw) + + +class _Labeled: + + def __init__(self, obj, labels=None): + self._obj = obj + self._labels = labels or {} + + def labels(self, **kwargs): + kw = self._labels.copy() + kw.update(kwargs) + return self.__class__(self._obj, kw) + + def _apply(self): + labels = default_labels | self._labels + return self._obj.labels(**labels) if labels else self._obj + + +class _Counter(_Labeled): + def inc(self, amount=1): + self._apply().inc(amount) + + +Counter = metric_class(_Counter, pc.Counter) + + +class _Gauge(_Labeled): + def inc(self, amount=1): + self._apply().inc(amount) + + def dec(self, amount=1): + self._apply().dec(amount) + + def set(self, value): + self._apply().set(value) + + def set_to_current_time(self): + self._apply().set_to_current_time() + + def set_function(self, f): + self._apply().set_function(f) + +Gauge = metric_class(_Gauge, pc.Gauge) + + +class _Summary(_Labeled): + def observe(self, amount): + self._apply().observe(amount) + + +Summary = metric_class(_Summary, pc.Summary) + + +class _Histogram(_Labeled): + def observe(self, amount): + self._apply().observe(amount) + + +Histogram = metric_class(_Histogram, pc.Histogram) + + +class _Info(_Labeled): + def info(self, val): + self._apply().info(val) + + +Info = metric_class(_Info, pc.Info) + + +class _Enum(_Labeled): + def state(self, state): + self._apply().state(state) + + +Enum = metric_class(_Enum, pc.Enum) + diff --git a/src/dexorder/metric/metrics.py b/src/dexorder/metric/metrics.py new file mode 100644 index 0000000..51547aa --- /dev/null +++ b/src/dexorder/metric/metrics.py @@ -0,0 +1,20 @@ +from .metric_type import * + +# Put any set_function(...) calls in metric_startup.py:automatic_metrics() + +info = Info("backend_info", "Information about the backend process") + +block_current = Gauge("block_current", "Current block number being processed") +block_latest = Gauge("block_latest", "Highest block number seen") + +runner_loops = Counter("runner_loops", "Number of times the runner loop has been completed") +runner_latency = Summary("runner_latency", "How old the current block being processed is, in seconds") + +vaults = Gauge("vaults", "Total vault count", ) +open_orders = Gauge("open_orders", "Total active orders", ) +triggers_time = Gauge("triggers_time", "Total active time triggers", ) +triggers_line = Gauge("triggers_line", "Total active line triggers", ) + +account_balance = Gauge("account_balance", "Account balance", ["address", "token"]) +account_total = Gauge('account_total', 'Total number of accounts configured') +account_available = Gauge('account_available', 'Number of accounts that do not have any pending transactions') diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index ea56d89..00c64ed 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -8,7 +8,7 @@ from eth_bloom import BloomFilter # noinspection PyPackageRequirements from websockets.exceptions import ConnectionClosedError -from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp +from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp, metric from dexorder.base.block import Block, latest_block from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.blockchain.connection import create_w3_ws, create_w3 @@ -272,6 +272,8 @@ class BlockStateRunner(BlockProgressor): elif was_behind: log.info('Runner has caught up') was_behind = False + metric.runner_latency.observe(behind) + metric.runner_loops.inc() except Exception as e: fatal('Unhandled exception in runner worker', exception=e) finally: @@ -283,6 +285,7 @@ class BlockStateRunner(BlockProgressor): w3 = current_w3.get() current_blockstate.set(self.state) current_fork.set(fork) + metric.block_current.set(fork.height) batches = [] pubs = [] session = db.make_session(autocommit=False) @@ -423,5 +426,6 @@ class BlockStateRunner(BlockProgressor): def set_latest_block(block): cache_block(block) latest_block[block.chain_id] = block + metric.block_latest.set(block.height) current_clock.get().update(block.timestamp) diff --git a/src/dexorder/tokens.py b/src/dexorder/tokens.py index f1e9a52..3a7d683 100644 --- a/src/dexorder/tokens.py +++ b/src/dexorder/tokens.py @@ -7,6 +7,7 @@ from web3.exceptions import BadFunctionCallOutput from dexorder import ADDRESS_0, db, NATIVE_TOKEN, dec, current_w3 from dexorder.addrmeta import address_metadata from dexorder.base.chain import current_chain +from dexorder.blockstate.fork import current_fork from dexorder.contract import ERC20, ContractProxy, CONTRACT_ERRORS from dexorder.database.model import Token from dexorder.database.model.token import OldTokenDict @@ -34,7 +35,11 @@ async def get_erc20_balance(addr, token_addr, *, adjust_decimals=True): # noinspection PyShadowingNames async def get_native_balance(addr, *, adjust_decimals=True) -> dec: - value = dec(await current_w3.get().eth.get_balance(addr)) + try: + block_id = current_fork.get().height + except LookupError: + block_id = 'latest' + value = dec(await current_w3.get().eth.get_balance(addr, block_identifier=block_id)) if adjust_decimals: value /= 10 ** 18 return value diff --git a/src/dexorder/walker.py b/src/dexorder/walker.py index ecf6107..a1e1202 100644 --- a/src/dexorder/walker.py +++ b/src/dexorder/walker.py @@ -4,7 +4,7 @@ from asyncio import Queue from datetime import timedelta from typing import Union, Callable -from dexorder import config, db, now, current_w3 +from dexorder import config, db, now, current_w3, metric from dexorder.base.block import Block, BlockInfo, latest_block from dexorder.base.chain import current_chain from dexorder.blocks import promotion_height @@ -57,6 +57,7 @@ class BlockWalker (BlockProgressor): processed_height = cur + config.backfill if config.backfill < 0 else cur log.info(f'walker starting at block {processed_height}') + metric.block_current.set(processed_height) last_flush = processed_height if self.flush_type == 'blocks' else now() if self.flush_type == 'time' else None prev_height = None session = db.session @@ -67,6 +68,7 @@ class BlockWalker (BlockProgressor): latest_blockdata: BlockInfo = await w3.eth.get_block('latest') latest = Block(chain_id, latest_blockdata) latest_block[chain_id] = latest + metric.block_latest.set(latest.height) if prev_height is None or latest.height > prev_height: prev_height = latest.height log.debug(f'polled new block {latest.height}') @@ -93,6 +95,7 @@ class BlockWalker (BlockProgressor): db.session.commit() db.session.begin() processed_height = cur_height + metric.block_current.set(cur_height) if not self.running or config.walker_stop is not None and config.walker_stop <= processed_height: break await asyncio.sleep(config.polling or 1)