Compare commits
8 Commits
e95acda52d
...
adba28db32
| Author | SHA1 | Date | |
|---|---|---|---|
| adba28db32 | |||
| 90d6440c5a | |||
| 08e421712a | |||
| 7f501222f8 | |||
| 5bcbae1d94 | |||
| 416cff80b0 | |||
| b22c044028 | |||
| d838412b2b |
File diff suppressed because one or more lines are too long
@@ -1,6 +1,8 @@
|
||||
rpc_url = '${rpc_urls.finaldata}'
|
||||
rpc_url = 'arbitrum_dxod'
|
||||
archive_url = 'finaldata'
|
||||
ws_url = ''
|
||||
redis_url = ''
|
||||
ohlc_dir = '/ohlc'
|
||||
walker_flush_interval=25
|
||||
concurrent_rpc_connections=9999
|
||||
pagerduty='' # disable
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
aiohappyeyeballs==2.4.3
|
||||
aiohttp==3.11.10
|
||||
aiohttp==3.11.12
|
||||
aiosignal==1.3.1
|
||||
alembic==1.14.0
|
||||
alembic==1.14.1
|
||||
annotated-types==0.7.0
|
||||
antlr4-python3-runtime==4.9.3
|
||||
asn1crypto==1.5.1
|
||||
@@ -9,11 +9,11 @@ async-lru==2.0.4
|
||||
attrs==23.2.0
|
||||
bip-utils==2.9.3
|
||||
bitarray==3.0.0
|
||||
cachetools==5.5.0
|
||||
cachetools==5.5.1
|
||||
cbor2==5.6.4
|
||||
certifi==2024.2.2
|
||||
cffi==1.16.0
|
||||
charset-normalizer==3.4.0
|
||||
charset-normalizer==3.4.1
|
||||
ckzg==1.0.2
|
||||
coincurve==20.0.0
|
||||
crcmod==1.7
|
||||
@@ -23,14 +23,14 @@ durationpy==0.9
|
||||
ecdsa==0.19.0
|
||||
ed25519-blake2b==1.4.1
|
||||
eth-account==0.11.3
|
||||
eth-bloom==3.0.1
|
||||
eth-bloom==3.1.0
|
||||
eth-hash==0.7.0
|
||||
eth-keyfile==0.8.1
|
||||
eth-keys==0.6.0
|
||||
eth-keys==0.6.1
|
||||
eth-rlp==1.0.1
|
||||
eth-typing==4.4.0
|
||||
eth-utils==4.1.1
|
||||
eth_abi==5.1.0
|
||||
eth_abi==5.2.0
|
||||
frozenlist==1.4.1
|
||||
google-auth==2.35.0
|
||||
greenlet==3.0.3
|
||||
@@ -45,12 +45,13 @@ Mako==1.3.3
|
||||
MarkupSafe==2.1.5
|
||||
msgpack-python==0.5.6
|
||||
multidict==6.0.5
|
||||
numpy==2.2.0
|
||||
numpy==2.2.2
|
||||
oauthlib==3.2.2
|
||||
omegaconf==2.3.0
|
||||
orjson==3.10.12
|
||||
orjson==3.10.15
|
||||
parsimonious==0.10.0
|
||||
pdpyras==5.3.0
|
||||
pdpyras==5.4.0
|
||||
prometheus_client==0.21.1
|
||||
propcache==0.2.0
|
||||
protobuf==5.26.1
|
||||
psycopg2-binary==2.9.10
|
||||
@@ -63,7 +64,7 @@ pydantic==2.9.2
|
||||
pydantic_core==2.23.4
|
||||
PyNaCl==1.5.0
|
||||
python-dateutil==2.9.0.post0
|
||||
pytz==2024.2
|
||||
pytz==2025.1
|
||||
pyunormalize==15.1.0
|
||||
PyYAML==6.0.1
|
||||
redis==5.2.1
|
||||
@@ -77,12 +78,12 @@ rsa==4.9
|
||||
six==1.16.0
|
||||
socket.io-emitter==0.1.5.1
|
||||
sortedcontainers==2.4.0
|
||||
SQLAlchemy==2.0.36
|
||||
SQLAlchemy==2.0.38
|
||||
toolz==0.12.1
|
||||
types-requests==2.32.0.20240914
|
||||
typing_extensions==4.12.2
|
||||
urllib3==2.2.1
|
||||
web3==6.20.3
|
||||
websocket-client==1.8.0
|
||||
websockets==14.1
|
||||
websockets==14.2
|
||||
yarl==1.17.2
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
sqlalchemy
|
||||
alembic
|
||||
omegaconf
|
||||
web3>=6,<7
|
||||
web3<7
|
||||
psycopg2-binary
|
||||
orjson
|
||||
sortedcontainers
|
||||
@@ -29,3 +29,4 @@ requests
|
||||
aiohttp
|
||||
charset-normalizer
|
||||
pytz
|
||||
prometheus_client
|
||||
|
||||
@@ -3,13 +3,14 @@ import logging
|
||||
from contextvars import ContextVar
|
||||
from datetime import datetime, timezone
|
||||
from decimal import Decimal
|
||||
from typing import Callable, Any
|
||||
from typing import Callable, Any, Union, Optional
|
||||
|
||||
from web3 import AsyncWeb3
|
||||
|
||||
order_log = logging.getLogger('dexorder.order.log')
|
||||
|
||||
dec = Decimal
|
||||
|
||||
def now():
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Union
|
||||
|
||||
from sqlalchemy import select, func, text
|
||||
from typing_extensions import Optional
|
||||
from web3.exceptions import ContractLogicError
|
||||
from web3.types import EventData
|
||||
|
||||
from dexorder import db, dec, NATIVE_TOKEN, from_timestamp, config, ADDRESS_0, now, Account
|
||||
from dexorder import db, dec, NATIVE_TOKEN, from_timestamp, config, ADDRESS_0, now, Account, metric
|
||||
from dexorder.base import TransactionReceiptDict
|
||||
from dexorder.base.chain import current_chain
|
||||
from dexorder.blocks import get_block_timestamp, get_block, current_block
|
||||
@@ -22,6 +23,11 @@ log = logging.getLogger(__name__)
|
||||
|
||||
accounting_initialized = False
|
||||
|
||||
_tracked_addrs = set()
|
||||
|
||||
def is_tracked_address(addr: str) -> bool:
|
||||
return addr in _tracked_addrs
|
||||
|
||||
|
||||
class ReconciliationException(Exception):
|
||||
pass
|
||||
@@ -58,6 +64,8 @@ async def initialize_accounts_2():
|
||||
await asyncio.gather(
|
||||
*map(adjust_balance, (of_account, gf_account, ff_account, *exe_accounts))
|
||||
)
|
||||
for db_account in db.session.execute(select(DbAccount)).scalars():
|
||||
_tracked_addrs.add(db_account.address)
|
||||
|
||||
|
||||
async def initialize_mark_to_market():
|
||||
@@ -128,13 +136,16 @@ def ensure_account(addr: str, kind: AccountKind) -> DbAccount:
|
||||
log.warning(f'Account {addr} has wrong kind {found.kind} != {kind}')
|
||||
found.kind = kind
|
||||
db.session.add(found)
|
||||
_tracked_addrs.add(found.address)
|
||||
else:
|
||||
found = DbAccount(chain=chain, address=addr, kind=kind, balances={})
|
||||
db.session.add(found)
|
||||
_tracked_addrs.add(found.address)
|
||||
return found
|
||||
|
||||
|
||||
async def accounting_transfer(receipt: TransactionReceiptDict, token: str, sender: str, receiver: str, amount: dec, adjust_decimals=True):
|
||||
async def accounting_transfer(receipt: TransactionReceiptDict, token: str,
|
||||
sender: str, receiver: str, amount: Union[dec,int], adjust_decimals=True):
|
||||
block_hash = hexstr(receipt['blockHash'])
|
||||
tx_id = hexstr(receipt['transactionHash'])
|
||||
await asyncio.gather(
|
||||
@@ -189,18 +200,20 @@ async def add_accounting_row(account: str, block_hash: Optional[str], tx_id: Opt
|
||||
time = now() if block_hash is None else from_timestamp(await get_block_timestamp(block_hash))
|
||||
value = mark_to_market(token, amount)
|
||||
log.debug(f'accounting row {time} {account} {category} {subcategory} {token} {amount} ${value}')
|
||||
db.session.add(Accounting( account=account,
|
||||
time=time, category=category, subcategory=subcategory,
|
||||
token=token, amount=amount, value=value, note=note,
|
||||
chain_id=current_chain.get().id, tx_id=tx_id,
|
||||
))
|
||||
chain_id = current_chain.get().id
|
||||
db.session.add(Accounting(account=account,
|
||||
time=time, category=category, subcategory=subcategory,
|
||||
token=token, amount=amount, value=value, note=note,
|
||||
chain_id=chain_id, tx_id=tx_id,
|
||||
))
|
||||
# Adjust database account if it exists
|
||||
account_db = db.session.get(DbAccount, (current_chain.get(), account))
|
||||
if account_db is not None:
|
||||
if is_tracked_address(account):
|
||||
account_db = db.session.get(DbAccount, (current_chain.get(), account))
|
||||
new_amount = account_db.balances.get(token, dec(0)) + amount
|
||||
if new_amount < 0:
|
||||
log.error(f'negative balance for account {account} when applying accounting row {time} {category} {subcategory} {token} {amount} ${value}')
|
||||
account_db.balances[token] = new_amount
|
||||
metric.account_balance.labels(address=account, token=token).set(new_amount)
|
||||
db.session.add(account_db) # deep changes would not be detected by the ORM
|
||||
else:
|
||||
log.warning(f'No db account found for {account}')
|
||||
|
||||
@@ -10,7 +10,7 @@ log = logging.getLogger(__name__)
|
||||
|
||||
def alert(title, message, dedup_key=NARG, log_level=logging.ERROR, do_log=True):
|
||||
if dedup_key is NARG:
|
||||
dedup_key = str(hash(title + '|' + message))
|
||||
dedup_key = str(hash(title))
|
||||
if do_log:
|
||||
msg = f'{title}: {message}'
|
||||
log.log(log_level, msg) # if log_level=CRITICAL for example, make sure this does not re-alert!
|
||||
|
||||
@@ -6,7 +6,7 @@ import eth_account
|
||||
from eth_account.signers.local import LocalAccount
|
||||
from web3.middleware import construct_sign_and_send_raw_middleware
|
||||
|
||||
from dexorder import config, current_w3
|
||||
from dexorder import config, current_w3, metric
|
||||
from dexorder.base.chain import current_chain
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -46,6 +46,7 @@ class Account (LocalAccount):
|
||||
except asyncio.TimeoutError:
|
||||
log.error('waiting for an available account')
|
||||
result = await Account._pool.get()
|
||||
metric.account_available.set(Account._pool.qsize())
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
@@ -59,7 +60,8 @@ class Account (LocalAccount):
|
||||
Account._main_account = account
|
||||
Account._pool.put_nowait(account)
|
||||
Account._all.append(account)
|
||||
|
||||
metric.account_available.set(Account._pool.qsize())
|
||||
metric.account_total.set(len(Account._all))
|
||||
|
||||
def __init__(self, local_account: LocalAccount): # todo chain_id?
|
||||
super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code
|
||||
@@ -83,6 +85,7 @@ class Account (LocalAccount):
|
||||
return current_w3.get().eth.get_balance(self.address)
|
||||
|
||||
def release(self):
|
||||
metric.account_available.set(Account._pool.qsize() + 1)
|
||||
Account._pool.put_nowait(self)
|
||||
|
||||
def __str__(self):
|
||||
|
||||
@@ -1,17 +1,17 @@
|
||||
import asyncio
|
||||
import inspect
|
||||
import logging
|
||||
import logging.config
|
||||
import sys
|
||||
import tomllib
|
||||
from asyncio import CancelledError
|
||||
from traceback import print_exception
|
||||
import asyncio
|
||||
from signal import Signals
|
||||
from traceback import print_exception
|
||||
from typing import Coroutine
|
||||
|
||||
import sys
|
||||
|
||||
from dexorder import configuration
|
||||
from dexorder import configuration, config
|
||||
from dexorder.alert import init_alerts
|
||||
from dexorder.metric.metric_startup import start_metrics_server
|
||||
|
||||
if __name__ == '__main__':
|
||||
raise Exception('this file is meant to be imported not executed')
|
||||
@@ -43,9 +43,15 @@ def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=Tru
|
||||
log.setLevel(logging.DEBUG)
|
||||
log.info('Logging configured to default')
|
||||
if parse_args:
|
||||
# NOTE: there is special command-line argument handling in config/load.py to get a config filename.
|
||||
# The -c/--config flag MUST BE FIRST if present.
|
||||
configuration.parse_args()
|
||||
|
||||
init_alerts()
|
||||
|
||||
if config.metrics_port:
|
||||
start_metrics_server()
|
||||
|
||||
# loop setup
|
||||
loop = asyncio.get_event_loop()
|
||||
signals = Signals.SIGQUIT, Signals.SIGTERM, Signals.SIGINT
|
||||
|
||||
@@ -2,6 +2,7 @@ import logging
|
||||
from asyncio import CancelledError
|
||||
|
||||
from dexorder import db, blockchain
|
||||
from dexorder.alert import warningAlert
|
||||
from dexorder.base.chain import current_chain
|
||||
from dexorder.bin.executable import execute
|
||||
from dexorder.blockstate import current_blockstate
|
||||
@@ -12,7 +13,7 @@ from dexorder.contract import get_contract_event
|
||||
from dexorder.contract.dexorder import get_dexorder_contract
|
||||
from dexorder.event_handler import (init, dump_log, handle_vault_created, handle_order_placed,
|
||||
handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all,
|
||||
handle_uniswap_swaps, handle_vault_impl_changed)
|
||||
handle_uniswap_swaps, handle_vault_impl_changed, update_metrics)
|
||||
from dexorder.memcache import memcache
|
||||
from dexorder.memcache.memcache_state import RedisState, publish_all
|
||||
from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches
|
||||
@@ -74,10 +75,12 @@ def setup_logevent_triggers(runner):
|
||||
|
||||
runner.add_callback(end_trigger_updates)
|
||||
runner.add_callback(execute_tranches)
|
||||
runner.add_callback(update_metrics)
|
||||
|
||||
|
||||
# noinspection DuplicatedCode
|
||||
async def main():
|
||||
warningAlert('Started', 'backend has started', log_level=logging.INFO)
|
||||
await blockchain.connect(autosign=False) # the transaction manager checks out accounts and releases them.
|
||||
redis_state = None
|
||||
state = None
|
||||
|
||||
@@ -5,7 +5,7 @@ from sqlalchemy import select
|
||||
from dexorder import db, blockchain
|
||||
from dexorder.accounting import reconcile
|
||||
from dexorder.bin.executable import execute
|
||||
from dexorder.blocks import current_block, fetch_latest_block
|
||||
from dexorder.blocks import fetch_latest_block, current_block
|
||||
from dexorder.database.model import DbAccount
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -13,7 +13,8 @@ log = logging.getLogger(__name__)
|
||||
async def main():
|
||||
await blockchain.connect()
|
||||
db.connect()
|
||||
current_block.set(await fetch_latest_block())
|
||||
block = await fetch_latest_block()
|
||||
current_block.set(block)
|
||||
try:
|
||||
accounts = db.session.execute(select(DbAccount)).scalars().all()
|
||||
for account in accounts:
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
import logging
|
||||
|
||||
from dexorder import blockchain, db
|
||||
from dexorder.bin.executable import execute
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
async def main():
|
||||
pass
|
||||
await blockchain.connect()
|
||||
db.connect()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -1,64 +1,94 @@
|
||||
import asyncio
|
||||
import itertools
|
||||
import logging
|
||||
from random import random
|
||||
from typing import Any, Optional, Union
|
||||
from typing import Any, Optional, Union, Callable
|
||||
|
||||
# noinspection PyPackageRequirements
|
||||
from aiohttp import ClientResponseError, ClientSession, ClientTimeout, TCPConnector
|
||||
from eth_typing import URI
|
||||
from hexbytes import HexBytes
|
||||
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
|
||||
from web3.exceptions import Web3Exception
|
||||
from web3.middleware.signing import async_construct_sign_and_send_raw_middleware
|
||||
from web3.types import RPCEndpoint, RPCResponse
|
||||
|
||||
from .. import current_w3, Blockchain, config, Account, NARG
|
||||
from ..base.chain import current_chain
|
||||
from ..configuration import resolve_rpc_url
|
||||
from ..configuration.resolve import resolve_ws_url
|
||||
from ..contract import get_contract_data
|
||||
|
||||
|
||||
async def connect(rpc_url=None, account=NARG, autosign=True, name='default'):
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def connect(rpc_url: Union[str,list[str]]=None, account=NARG, autosign=True, name='default', *, archive_url: Union[str,list[str]] = None):
|
||||
"""
|
||||
connects to the rpc_url and configures context vars
|
||||
"""
|
||||
w3 = await create_w3(rpc_url, account, autosign, name)
|
||||
w3 = await create_w3(rpc_url, account, autosign, name, archive_url=archive_url)
|
||||
current_w3.set(w3)
|
||||
current_chain.set(Blockchain.get(await w3.eth.chain_id))
|
||||
chain = Blockchain.get(await w3.eth.chain_id)
|
||||
current_chain.set(chain)
|
||||
return w3
|
||||
|
||||
|
||||
async def create_w3(rpc_url=None, account=NARG, autosign=True, name='default'):
|
||||
# todo create a proxy w3 that rotates among rpc urls
|
||||
# self.w3s = tuple(await create_w3(url) for url in rpc_url_or_tag)
|
||||
# chain_id = self.w3s[0].eth.chain_id
|
||||
# assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain
|
||||
# self.w3iter = itertools.cycle(self.w3s)
|
||||
url = resolve_rpc_url(rpc_url)
|
||||
connector = TCPConnector(limit=config.concurrent_rpc_connections)
|
||||
session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout))
|
||||
http_provider = RetryHTTPProvider(url)
|
||||
await http_provider.cache_async_session(session)
|
||||
w3 = AsyncWeb3(http_provider)
|
||||
# w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed?
|
||||
# w3.middleware_onion.add(simple_cache_middleware)
|
||||
# log.debug(f'middleware {list(w3.middleware_onion.middlewares)}')
|
||||
w3.middleware_onion.remove('attrdict')
|
||||
w3.middleware_onion.add(clean_input_async, 'clean_input')
|
||||
w3.eth.Contract = _make_contract(w3.eth)
|
||||
has_account = False
|
||||
if autosign:
|
||||
if account is NARG:
|
||||
account = Account.get()
|
||||
if account is not None:
|
||||
# noinspection PyTypeChecker
|
||||
w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account))
|
||||
w3.eth.default_account = account.address
|
||||
has_account = True
|
||||
log.info(f'{name} w3 configured to autosign as {account.address}')
|
||||
if not has_account:
|
||||
log.info(f'No account set for {name} w3')
|
||||
return w3
|
||||
async def create_w3(rpc_url: Union[str,list[str]]=None, account=NARG, autosign=True, name='default', *, archive_url: Union[str,list[str]] = None) -> AsyncWeb3:
|
||||
if rpc_url is None:
|
||||
rpc_url = config.rpc_url
|
||||
if isinstance(rpc_url, str):
|
||||
rpc_urls = [resolve_rpc_url(s) for url in rpc_url.split(',') if (s:=url.strip()) != '']
|
||||
elif isinstance(rpc_url, list):
|
||||
rpc_urls = [resolve_rpc_url(s) for url in rpc_url if (s:=url.strip()) != '']
|
||||
else:
|
||||
raise ValueError("rpc_url must be a string or list of strings")
|
||||
|
||||
if archive_url is None:
|
||||
archive_url = config.archive_url
|
||||
if archive_url is None:
|
||||
archive_urls = []
|
||||
elif isinstance(archive_url, str):
|
||||
archive_urls = [resolve_rpc_url(s) for url in archive_url.split(',') if (s:=url.strip()) != '']
|
||||
elif isinstance(archive_url, list):
|
||||
archive_urls = [resolve_rpc_url(s) for url in archive_url if (s:=url.strip()) != '']
|
||||
else:
|
||||
raise ValueError("archive_url must be a string or list of strings")
|
||||
|
||||
if not rpc_urls:
|
||||
raise ValueError("No rpc_url configured")
|
||||
|
||||
w3_instances = []
|
||||
archive_instances = []
|
||||
for (url, archive) in itertools.chain(((url, False) for url in rpc_urls), ((url, True) for url in archive_urls)):
|
||||
connector = TCPConnector(limit=config.concurrent_rpc_connections)
|
||||
session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout))
|
||||
http_provider = RetryHTTPProvider(url)
|
||||
await http_provider.cache_async_session(session)
|
||||
w3 = AsyncWeb3(http_provider)
|
||||
w3.middleware_onion.add(archive_intercept_middleware, 'block_number_intercept_middleware')
|
||||
w3.middleware_onion.remove('attrdict')
|
||||
w3.middleware_onion.add(clean_input_async, 'clean_input')
|
||||
w3.eth.Contract = _make_contract(w3.eth)
|
||||
# Highest block number that has reported a -32000 error indicating a lack of history that far back
|
||||
w3.archive_fault_height = -1
|
||||
if autosign:
|
||||
if account is NARG:
|
||||
account = Account.get()
|
||||
if account is not None:
|
||||
# noinspection PyTypeChecker
|
||||
w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account))
|
||||
w3.eth.default_account = account.address
|
||||
if archive:
|
||||
archive_instances.append(w3)
|
||||
else:
|
||||
w3_instances.append(w3)
|
||||
|
||||
# Ensure all instances share the same chain ID
|
||||
chain_ids = await asyncio.gather(*(w3.eth.chain_id for w3 in itertools.chain(w3_instances, archive_instances)))
|
||||
if len(set(chain_ids)) != 1:
|
||||
raise RuntimeError("All RPC URLs must belong to the same blockchain")
|
||||
|
||||
# noinspection PyTypeChecker
|
||||
return RoundRobinWebProxy(w3_instances, archive_instances) if len(w3_instances) > 1 or archive_instances else w3_instances[0]
|
||||
|
||||
|
||||
async def create_w3_ws(ws_url=None) -> AsyncWeb3:
|
||||
@@ -79,6 +109,30 @@ async def create_w3_ws(ws_url=None) -> AsyncWeb3:
|
||||
return w3
|
||||
|
||||
|
||||
def resolve_rpc_url(rpc_url=None):
|
||||
if rpc_url is None:
|
||||
rpc_url = config.rpc_url
|
||||
if rpc_url == 'test':
|
||||
return 'http://localhost:8545'
|
||||
try:
|
||||
return config.rpc_urls[rpc_url] # look up aliases
|
||||
except KeyError:
|
||||
pass
|
||||
return rpc_url
|
||||
|
||||
|
||||
def resolve_ws_url(ws_url=None):
|
||||
if ws_url is None:
|
||||
ws_url = config.ws_url
|
||||
if ws_url == 'test':
|
||||
return 'ws://localhost:8545'
|
||||
try:
|
||||
return config.rpc_urls[ws_url] # look up aliases
|
||||
except KeyError:
|
||||
pass
|
||||
return ws_url
|
||||
|
||||
|
||||
def _clean(obj):
|
||||
if type(obj) is HexBytes:
|
||||
return bytes(obj)
|
||||
@@ -122,7 +176,123 @@ def _make_contract(w3_eth):
|
||||
return f
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
#
|
||||
# ARCHIVE NODE MANAGEMENT
|
||||
#
|
||||
|
||||
# Regular RPC nodes do not necessarily have the full state history available. The code below tracks the block heights
|
||||
# of historical data lookup failures on rpc nodes and automatically retries failed history requests on an archive_rpc
|
||||
# node. Archive RPC nodes are otherwise not used unless they are also listed in the rpc_url config.
|
||||
|
||||
# Define methods that may carry a `block_identifier` parameter,
|
||||
# along with the required number of arguments to include it.
|
||||
|
||||
ARCHIVE_METHODS = {
|
||||
# Examples:
|
||||
"eth_getBalance": 2, # e.g., get_balance(address, block_identifier)
|
||||
"eth_call": 2, # e.g., contract.call(params, block_identifier)
|
||||
"eth_getStorageAt": 3, # e.g., get_storage_at(address, position, block_identifier)
|
||||
# Add more methods as needed
|
||||
}
|
||||
|
||||
ARCHIVE_ERRORS = {
|
||||
'state recreation l2 gas depth limit exceeded',
|
||||
}
|
||||
|
||||
async def archive_intercept_middleware(make_request, w3):
|
||||
"""
|
||||
Middleware to intercept any call with `block_number` and manage marking archive_fault_height
|
||||
"""
|
||||
async def middleware(method, params):
|
||||
# Only intercept relevant methods
|
||||
expected_args = ARCHIVE_METHODS.get(method,-1)
|
||||
is_archive_method = len(params) == expected_args
|
||||
block_height = None
|
||||
if is_archive_method:
|
||||
block_identifier = params[-1]
|
||||
if block_identifier != 'latest':
|
||||
block_height = int(block_identifier, 16) if type(block_identifier) is str else int(params[-1])
|
||||
if block_height <= w3.archive_fault_height:
|
||||
# this block is at least as old as another block that already failed to fetch history from this RPC
|
||||
raise ArchiveException(method, block_height)
|
||||
resp = await make_request(method, params)
|
||||
if is_archive_method and 'error' in resp and resp['error']['message'] in ARCHIVE_ERRORS:
|
||||
if block_height is None:
|
||||
# noinspection PyUnboundLocalVariable
|
||||
raise Exception(f'Got an archive fault using a block_identifier of {block_identifier}: {w3.provider.endpoint_uri} {method} {params}\n{resp}')
|
||||
# noinspection PyTypeChecker
|
||||
w3.archive_fault_height = max(w3.archive_fault_height, block_height)
|
||||
raise ArchiveException(method, block_height)
|
||||
resp = await make_request(method, params)
|
||||
return resp
|
||||
|
||||
return middleware
|
||||
|
||||
|
||||
class ArchiveException (Exception):
|
||||
def __init__(self, method, block_number):
|
||||
super().__init__(f"Archive fault for method {method} at block {block_number}", block_number)
|
||||
self.method = method
|
||||
self.block_number = block_number
|
||||
|
||||
|
||||
class RoundRobinWebProxy:
|
||||
def __init__(self, w3_instances, archive_instances):
|
||||
if not w3_instances:
|
||||
raise ValueError("At least one w3 instance is required")
|
||||
self._w3_instances = w3_instances
|
||||
self._archive_instances = archive_instances
|
||||
self._index = 0
|
||||
self._archive_index = 0
|
||||
for w3 in self._w3_instances:
|
||||
w3.manager.coro_request = self.make_coro_request_function(w3)
|
||||
|
||||
def __getattr__(self, name):
|
||||
# proxy in a round-robin fashion
|
||||
return getattr(self._current(), name)
|
||||
|
||||
def _current(self):
|
||||
if self._index >= len(self._w3_instances):
|
||||
self._index = 0
|
||||
current_instance = self._w3_instances[self._index]
|
||||
self._index += 1
|
||||
return current_instance
|
||||
|
||||
def _current_archive(self):
|
||||
if self._archive_index >= len(self._archive_instances):
|
||||
self._archive_index = 0
|
||||
current_instance = self._archive_instances[self._archive_index]
|
||||
self._archive_index += 1
|
||||
return current_instance
|
||||
|
||||
|
||||
def make_coro_request_function(rrwp, w3):
|
||||
# This replaces w3.manager.coro_request with our own version that catches ArchiveExceptions and retries them
|
||||
# on an archive w3
|
||||
|
||||
### NOTE!!! ###
|
||||
# we use `self` to mean the RequestManager so we can copy that code directly over here.
|
||||
# instead we rename the RoundRobinWebProxy rrwp and name the w3 instance too
|
||||
self = w3.manager
|
||||
|
||||
async def RequestManager__coro_request(
|
||||
method: Union[RPCEndpoint, Callable[..., RPCEndpoint]],
|
||||
params: Any,
|
||||
error_formatters: Optional[Callable[..., Any]] = None,
|
||||
null_result_formatters: Optional[Callable[..., Any]] = None,
|
||||
) -> Any:
|
||||
"""
|
||||
Coroutine for making a request using the provider
|
||||
"""
|
||||
try:
|
||||
response = await self._coro_make_request(method, params)
|
||||
return self.formatted_response(
|
||||
response, params, error_formatters, null_result_formatters
|
||||
)
|
||||
except ArchiveException as e:
|
||||
w3.archive_fault_height = max(w3.archive_fault_height, e.block_number)
|
||||
return await rrwp._current_archive().manager.coro_request(method, params, error_formatters, null_result_formatters)
|
||||
return RequestManager__coro_request
|
||||
|
||||
|
||||
class RetryHTTPProvider (AsyncHTTPProvider):
|
||||
@@ -140,7 +310,6 @@ class RetryHTTPProvider (AsyncHTTPProvider):
|
||||
try:
|
||||
async with self.in_flight:
|
||||
await self.rate_allowed.wait()
|
||||
# log.debug(f'Requesting RPC call {method}')
|
||||
return await super().make_request(method, params)
|
||||
except ClientResponseError as e:
|
||||
if e.status != 429:
|
||||
|
||||
@@ -117,6 +117,14 @@ class BlockData (Generic[T]):
|
||||
fork = current_fork.get()
|
||||
state.delete_series(fork, self.series)
|
||||
|
||||
def upper_len(self):
|
||||
"""
|
||||
Since record values may be marked DELETE there is not an efficient way to know the exact length of a series.
|
||||
We could track it per-branch but instead we just return the number of keys, which is an upper bound on the
|
||||
series length.
|
||||
"""
|
||||
state = current_blockstate.get()
|
||||
return state.upper_len(self.series)
|
||||
|
||||
class BlockSet(Generic[T], Iterable[T], BlockData[T]):
|
||||
def __init__(self, series: Any, **tags):
|
||||
|
||||
@@ -179,6 +179,8 @@ class BlockState:
|
||||
difflist.remove(diff.entry)
|
||||
state_log.info(('removed' if remove_series_diffs else 'promoted')+f' branch {branch}')
|
||||
|
||||
def upper_len(self, series):
|
||||
return len(self.diffs_by_series.get(series, {}))
|
||||
|
||||
def get(self, fork: Fork, series, key, default=NARG):
|
||||
series_diffs = self.diffs_by_series.get(series)
|
||||
|
||||
@@ -1,3 +1,2 @@
|
||||
from .standard_accounts import test_accounts
|
||||
from .load import config, parse_args
|
||||
from .resolve import resolve_rpc_url
|
||||
|
||||
@@ -2,6 +2,7 @@ import os
|
||||
import tomllib
|
||||
from tomllib import TOMLDecodeError
|
||||
|
||||
import sys
|
||||
from omegaconf import OmegaConf, DictConfig
|
||||
from omegaconf.errors import OmegaConfBaseException
|
||||
|
||||
@@ -9,6 +10,7 @@ from .schema import Config
|
||||
|
||||
schema = OmegaConf.structured(Config())
|
||||
|
||||
_config_file = 'dexorder.toml'
|
||||
|
||||
class ConfigException (Exception):
|
||||
pass
|
||||
@@ -19,7 +21,7 @@ def load_config():
|
||||
result:ConfigDict = OmegaConf.merge(
|
||||
schema,
|
||||
from_toml('.secret.toml'),
|
||||
from_toml('dexorder.toml'),
|
||||
from_toml(_config_file),
|
||||
from_toml('config.toml'),
|
||||
from_env()
|
||||
)
|
||||
@@ -66,5 +68,12 @@ def parse_args(args=None):
|
||||
class ConfigDict (Config, DictConfig): # give type hints from Config plus methods from DictConfig
|
||||
pass
|
||||
|
||||
# Special command-line argument handling to get a config file. The -c/--config flag MUST BE FIRST.
|
||||
if len(sys.argv) > 1 and (sys.argv[1] == '-c' or sys.argv[1] == '--config'):
|
||||
if len(sys.argv) < 3:
|
||||
raise ConfigException('Missing config file argument')
|
||||
else:
|
||||
_config_file = sys.argv[2]
|
||||
sys.argv = [sys.argv[0], *sys.argv[3:]]
|
||||
|
||||
config = load_config()
|
||||
|
||||
@@ -1,25 +1,3 @@
|
||||
from .load import config
|
||||
|
||||
|
||||
def resolve_rpc_url(rpc_url=None):
|
||||
if rpc_url is None:
|
||||
rpc_url = config.rpc_url
|
||||
if rpc_url == 'test':
|
||||
return 'http://localhost:8545'
|
||||
try:
|
||||
return config.rpc_urls[rpc_url] # look up aliases
|
||||
except KeyError:
|
||||
pass
|
||||
return rpc_url
|
||||
|
||||
|
||||
def resolve_ws_url(ws_url=None):
|
||||
if ws_url is None:
|
||||
ws_url = config.ws_url
|
||||
if ws_url == 'test':
|
||||
return 'ws://localhost:8545'
|
||||
try:
|
||||
return config.rpc_urls[ws_url] # look up aliases
|
||||
except KeyError:
|
||||
pass
|
||||
return ws_url
|
||||
|
||||
@@ -11,13 +11,16 @@ from typing import Optional
|
||||
class Config:
|
||||
confirms: Optional[int] = None # number of blocks before data is considered finalized. if None then the chain's default setting is used
|
||||
batch_size: Optional[int] = None # max number of blocks to query in a single backfill rpc request
|
||||
rpc_url: str = 'http://localhost:8545'
|
||||
rpc_url: str = 'http://localhost:8545' # may be a comma-separated list. may include names of entries in rpc_urls.
|
||||
archive_url: str = '' # these rpc URL's are not used unless a query uses an old block number that prior to what the currently-assigned rpc_url can provide
|
||||
ws_url: Optional[str] = 'ws://localhost:8545'
|
||||
rpc_urls: Optional[dict[str,str]] = field(default_factory=dict)
|
||||
db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder'
|
||||
dump_sql: bool = False
|
||||
redis_url: Optional[str] = 'redis://localhost:6379'
|
||||
|
||||
metrics_port: Optional[int] = None
|
||||
|
||||
cache_blocks_in_db: bool = False
|
||||
metadata: Optional[str] = None
|
||||
ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk
|
||||
|
||||
@@ -7,6 +7,7 @@ from web3.exceptions import Web3Exception
|
||||
from web3.types import TxReceipt, TxData
|
||||
|
||||
from dexorder import current_w3, Account
|
||||
from dexorder.blocks import current_block
|
||||
from dexorder.blockstate.fork import current_fork
|
||||
from dexorder.util import hexstr
|
||||
|
||||
@@ -27,9 +28,7 @@ class ContractTransaction:
|
||||
self.receipt: Optional[TxReceipt] = None # todo could be multiple receipts for different branches!
|
||||
|
||||
def __repr__(self):
|
||||
# todo this is from an old status system
|
||||
receipt_status = 'IN_FLIGHT' if self.receipt is None else 'REVERTED' if self.receipt.status == 0 else self.receipt.blockNumber
|
||||
return f'Transaction({self.id},{receipt_status})'
|
||||
return f'tx-{self.id}'
|
||||
|
||||
async def wait(self) -> TxReceipt:
|
||||
if self.receipt is None:
|
||||
@@ -59,13 +58,14 @@ class DeployTransaction (ContractTransaction):
|
||||
|
||||
|
||||
def call_wrapper(addr, name, func):
|
||||
async def f(*args, **kwargs):
|
||||
async def f(*args, block_identifier=None, **kwargs):
|
||||
if block_identifier is None:
|
||||
try:
|
||||
block_identifier = current_block.get().height
|
||||
except (LookupError, AttributeError):
|
||||
block_identifier = 'latest'
|
||||
try:
|
||||
blockid = current_fork.get().head_identifier
|
||||
except (LookupError, AttributeError):
|
||||
blockid = 'latest'
|
||||
try:
|
||||
return await func(*args).call(block_identifier=blockid, **kwargs)
|
||||
return await func(*args).call(block_identifier=block_identifier, **kwargs)
|
||||
except Web3Exception as e:
|
||||
e.args += addr, name
|
||||
raise e
|
||||
|
||||
@@ -3,19 +3,20 @@ import logging
|
||||
|
||||
from web3.types import EventData
|
||||
|
||||
from dexorder import db
|
||||
from dexorder.accounting import accounting_fill, accounting_placement
|
||||
from dexorder import db, metric
|
||||
from dexorder.accounting import accounting_fill, accounting_placement, accounting_transfer, is_tracked_address
|
||||
from dexorder.base.chain import current_chain
|
||||
from dexorder.base.order import TrancheKey, OrderKey
|
||||
from dexorder.base.orderlib import SwapOrderState
|
||||
from dexorder.blocks import get_block_timestamp
|
||||
from dexorder.blockstate import current_blockstate
|
||||
from dexorder.contract.dexorder import VaultContract, get_factory_contract
|
||||
from dexorder.database.model import VaultCreationRequest
|
||||
from dexorder.impls import get_impl_version
|
||||
from dexorder.ohlc import ohlcs
|
||||
from dexorder.order.orderstate import Order
|
||||
from dexorder.order.triggers import (OrderTriggers, activate_order, update_balance_triggers, start_trigger_updates,
|
||||
update_price_triggers)
|
||||
update_price_triggers, TimeTrigger, PriceLineTrigger)
|
||||
from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data
|
||||
from dexorder.util import hexstr
|
||||
from dexorder.vault_blockdata import vault_owners, adjust_balance, verify_vault, publish_vaults
|
||||
@@ -118,13 +119,11 @@ async def handle_order_cancel_all(event: EventData):
|
||||
|
||||
|
||||
async def handle_transfer(transfer: EventData):
|
||||
# todo handle native transfers incl gas for token transfers
|
||||
# log.debug(f'Transfer {transfer}')
|
||||
from_address = transfer['args']['from']
|
||||
to_address = transfer['args']['to']
|
||||
if to_address == from_address:
|
||||
return
|
||||
amount = int(transfer['args']['value'])
|
||||
token_address = transfer['address']
|
||||
if to_address in vault_owners:
|
||||
log.debug(f'deposit {to_address} {amount}')
|
||||
vault = to_address
|
||||
@@ -134,10 +133,11 @@ async def handle_transfer(transfer: EventData):
|
||||
else:
|
||||
vault = None
|
||||
if vault is not None:
|
||||
token_address = transfer['address']
|
||||
await adjust_balance(vault, token_address, amount)
|
||||
await update_balance_triggers(vault, token_address, amount)
|
||||
|
||||
if is_tracked_address(to_address):
|
||||
# noinspection PyTypeChecker
|
||||
await accounting_transfer(transfer, token_address, from_address, to_address, amount, adjust_decimals=True)
|
||||
|
||||
async def handle_uniswap_swaps(swaps: list[EventData]):
|
||||
# asynchronously prefetch the block timestamps we'll need
|
||||
@@ -184,6 +184,10 @@ async def handle_vault_created(created: EventData):
|
||||
vault_owners[addr] = owner
|
||||
log.debug(f'VaultCreated {owner} #{num} => {addr}')
|
||||
publish_vaults(chain_id, owner)
|
||||
# BlockData doesn't have an easy way to calculate exact sizes because some keys could hold DELETE values, so
|
||||
# this is actually an upper limit on the size.
|
||||
approx_size = len(current_blockstate.get().diffs_by_series)
|
||||
metric.vaults.set(approx_size)
|
||||
|
||||
|
||||
async def handle_vault_impl_changed(upgrade: EventData):
|
||||
@@ -204,3 +208,9 @@ async def handle_vault_impl_changed(upgrade: EventData):
|
||||
version = await get_impl_version(impl)
|
||||
log.debug(f'Vault {addr} upgraded to impl version {version}')
|
||||
|
||||
|
||||
def update_metrics():
    """Install scrape-time providers for the totals gauges.

    Each gauge is wired to a callable that computes the current total when
    Prometheus scrapes, instead of being pushed on every change.
    """
    providers = (
        (metric.vaults, vault_owners.upper_len),
        (metric.open_orders, Order.open_orders.upper_len),
        (metric.triggers_time, lambda: len(TimeTrigger.all)),
        (metric.triggers_line, lambda: len(PriceLineTrigger.triggers_set)),
    )
    for gauge, provider in providers:
        gauge.set_function(provider)
|
||||
|
||||
8
src/dexorder/metric/__init__.py
Normal file
8
src/dexorder/metric/__init__.py
Normal file
@@ -0,0 +1,8 @@
|
||||
# Default label keys are declared here; their concrete values are filled in
# later by metric_startup.py:setup_default_labels().
default_labels = {
    'pod': None,
    'chain_id': None,
}

# default_labels must exist before these imports run: the submodules import
# it from this package at import time.
from .metric_type import *
from .metrics import *
|
||||
34
src/dexorder/metric/metric_startup.py
Normal file
34
src/dexorder/metric/metric_startup.py
Normal file
@@ -0,0 +1,34 @@
|
||||
import os
|
||||
|
||||
from prometheus_client import start_http_server
|
||||
|
||||
from dexorder import config, metric, now
|
||||
from dexorder.base.chain import current_chain
|
||||
|
||||
_chain_id = None
|
||||
|
||||
|
||||
def _height_or_none(block):
    """Return ``block.height``, or ``None`` when *block* is ``None``."""
    if block is None:
        return None
    return block.height
|
||||
|
||||
|
||||
def start_metrics_server():
    """Start the Prometheus metrics HTTP endpoint on ``config.metrics_port``.

    Default label values are populated first so every exported sample
    carries them.
    """
    setup_default_labels()
    # start_http_server spawns a daemon thread and returns immediately.
    start_http_server(config.metrics_port)
|
||||
|
||||
|
||||
def setup_default_labels():
    """Assign concrete values to the default label keys declared in metric/__init__.py."""
    global _chain_id
    _chain_id = current_chain.get().id
    labels = metric.default_labels
    labels['chain_id'] = _chain_id
    labels['pod'] = os.environ.get('HOSTNAME', '')

    # Info values MUST all be strings.
    metric.info.info({
        'started': now().isoformat(),
    })
|
||||
|
||||
94
src/dexorder/metric/metric_type.py
Normal file
94
src/dexorder/metric/metric_type.py
Normal file
@@ -0,0 +1,94 @@
|
||||
__all__ = ['Counter', 'Gauge', 'Summary', 'Histogram', 'Info', 'Enum']
|
||||
|
||||
import functools
|
||||
|
||||
import prometheus_client as pc
|
||||
|
||||
from dexorder import metric
|
||||
from dexorder.metric import default_labels
|
||||
|
||||
|
||||
def metric_class(OurCls, PcCls, **defaultkw):
    """Build a constructor for a wrapped prometheus metric type.

    The returned callable mirrors the ``prometheus_client`` constructor
    signature. Every metric automatically gets the package default label
    names appended; *defaultkw* supplies per-type constructor defaults that
    individual call sites may override.
    """
    def construct(defkw, name, documentation, labelnames=(), namespace='dexorder', **kwargs):
        merged = {**defkw, **kwargs}
        # Label order: explicit labels, then the module defaults, then any
        # labelnames carried in the per-type default kwargs.
        all_labels = (*labelnames, *metric.default_labels.keys(), *merged.pop('labelnames', ()))
        return OurCls(PcCls(name, documentation, all_labels, namespace, **merged))

    return functools.partial(construct, defaultkw)
|
||||
|
||||
|
||||
class _Labeled:
    """Deferred-label wrapper around a ``prometheus_client`` metric.

    Label values (including the module-wide ``default_labels``) are merged
    only when a sample is actually recorded, so the defaults may be filled
    in after the metric object is created.
    """

    def __init__(self, obj, labels=None):
        self._obj = obj              # underlying prometheus_client metric
        self._labels = labels or {}  # labels bound so far via .labels()

    def labels(self, **kwargs):
        """Return a new wrapper with *kwargs* merged over the bound labels."""
        merged = dict(self._labels)
        merged.update(kwargs)
        return self.__class__(self._obj, merged)

    def _apply(self):
        """Resolve default + bound labels into a concrete metric child."""
        merged = {**default_labels, **self._labels}
        return self._obj.labels(**merged) if merged else self._obj
|
||||
|
||||
|
||||
class _Counter(_Labeled):
    """Label-deferred wrapper over ``prometheus_client.Counter``."""

    def inc(self, amount=1):
        """Increment the counter on the labeled child."""
        target = self._apply()
        target.inc(amount)


Counter = metric_class(_Counter, pc.Counter)
|
||||
|
||||
|
||||
class _Gauge(_Labeled):
    """Label-deferred wrapper over ``prometheus_client.Gauge``."""

    def inc(self, amount=1):
        target = self._apply()
        target.inc(amount)

    def dec(self, amount=1):
        target = self._apply()
        target.dec(amount)

    def set(self, value):
        target = self._apply()
        target.set(value)

    def set_to_current_time(self):
        target = self._apply()
        target.set_to_current_time()

    def set_function(self, f):
        # f is invoked at scrape time to produce the gauge's value
        target = self._apply()
        target.set_function(f)


Gauge = metric_class(_Gauge, pc.Gauge)
|
||||
|
||||
|
||||
class _Summary(_Labeled):
    """Label-deferred wrapper over ``prometheus_client.Summary``."""

    def observe(self, amount):
        """Record one observation on the labeled child."""
        target = self._apply()
        target.observe(amount)


Summary = metric_class(_Summary, pc.Summary)
|
||||
|
||||
|
||||
class _Histogram(_Labeled):
    """Label-deferred wrapper over ``prometheus_client.Histogram``."""

    def observe(self, amount):
        """Record one observation on the labeled child."""
        target = self._apply()
        target.observe(amount)


Histogram = metric_class(_Histogram, pc.Histogram)
|
||||
|
||||
|
||||
class _Info(_Labeled):
    """Label-deferred wrapper over ``prometheus_client.Info``."""

    def info(self, val):
        """Set the info key/value pairs on the labeled child."""
        target = self._apply()
        target.info(val)


Info = metric_class(_Info, pc.Info)
|
||||
|
||||
|
||||
class _Enum(_Labeled):
    """Label-deferred wrapper over ``prometheus_client.Enum``."""

    def state(self, state):
        """Switch the enum to *state* on the labeled child."""
        target = self._apply()
        target.state(state)


Enum = metric_class(_Enum, pc.Enum)
|
||||
|
||||
20
src/dexorder/metric/metrics.py
Normal file
20
src/dexorder/metric/metrics.py
Normal file
@@ -0,0 +1,20 @@
|
||||
from .metric_type import *

# Declare metric objects only — register any set_function(...) providers
# during startup (metric_startup.py; the original note names
# automatic_metrics()), not at import time here.

info = Info("backend_info", "Information about the backend process")

block_current = Gauge("block_current", "Current block number being processed")
block_latest = Gauge("block_latest", "Highest block number seen")

runner_loops = Counter("runner_loops", "Number of times the runner loop has been completed")
runner_latency = Summary("runner_latency", "How old the current block being processed is, in seconds")

vaults = Gauge("vaults", "Total vault count")
open_orders = Gauge("open_orders", "Total active orders")
triggers_time = Gauge("triggers_time", "Total active time triggers")
triggers_line = Gauge("triggers_line", "Total active line triggers")

account_balance = Gauge("account_balance", "Account balance", ["address", "token"])
account_total = Gauge('account_total', 'Total number of accounts configured')
account_available = Gauge('account_available', 'Number of accounts that do not have any pending transactions')
|
||||
@@ -8,7 +8,7 @@ from eth_bloom import BloomFilter
|
||||
# noinspection PyPackageRequirements
|
||||
from websockets.exceptions import ConnectionClosedError
|
||||
|
||||
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp
|
||||
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp, metric
|
||||
from dexorder.base.block import Block, latest_block
|
||||
from dexorder.base.chain import current_chain, current_clock, BlockClock
|
||||
from dexorder.blockchain.connection import create_w3_ws, create_w3
|
||||
@@ -272,6 +272,8 @@ class BlockStateRunner(BlockProgressor):
|
||||
elif was_behind:
|
||||
log.info('Runner has caught up')
|
||||
was_behind = False
|
||||
metric.runner_latency.observe(behind)
|
||||
metric.runner_loops.inc()
|
||||
except Exception as e:
|
||||
fatal('Unhandled exception in runner worker', exception=e)
|
||||
finally:
|
||||
@@ -283,6 +285,7 @@ class BlockStateRunner(BlockProgressor):
|
||||
w3 = current_w3.get()
|
||||
current_blockstate.set(self.state)
|
||||
current_fork.set(fork)
|
||||
metric.block_current.set(fork.height)
|
||||
batches = []
|
||||
pubs = []
|
||||
session = db.make_session(autocommit=False)
|
||||
@@ -423,5 +426,6 @@ class BlockStateRunner(BlockProgressor):
|
||||
def set_latest_block(block):
|
||||
cache_block(block)
|
||||
latest_block[block.chain_id] = block
|
||||
metric.block_latest.set(block.height)
|
||||
current_clock.get().update(block.timestamp)
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ from web3.exceptions import BadFunctionCallOutput
|
||||
from dexorder import ADDRESS_0, db, NATIVE_TOKEN, dec, current_w3
|
||||
from dexorder.addrmeta import address_metadata
|
||||
from dexorder.base.chain import current_chain
|
||||
from dexorder.blocks import current_block
|
||||
from dexorder.contract import ERC20, ContractProxy, CONTRACT_ERRORS
|
||||
from dexorder.database.model import Token
|
||||
from dexorder.database.model.token import OldTokenDict
|
||||
@@ -34,7 +35,11 @@ async def get_erc20_balance(addr, token_addr, *, adjust_decimals=True):
|
||||
|
||||
# noinspection PyShadowingNames
|
||||
async def get_native_balance(addr, *, adjust_decimals=True) -> dec:
|
||||
value = dec(await current_w3.get().eth.get_balance(addr))
|
||||
try:
|
||||
block_id = current_block.get().height
|
||||
except LookupError:
|
||||
block_id = 'latest'
|
||||
value = dec(await current_w3.get().eth.get_balance(addr, block_identifier=block_id))
|
||||
if adjust_decimals:
|
||||
value /= 10 ** 18
|
||||
return value
|
||||
|
||||
@@ -4,7 +4,7 @@ from asyncio import Queue
|
||||
from datetime import timedelta
|
||||
from typing import Union, Callable
|
||||
|
||||
from dexorder import config, db, now, current_w3
|
||||
from dexorder import config, db, now, current_w3, metric
|
||||
from dexorder.base.block import Block, BlockInfo, latest_block
|
||||
from dexorder.base.chain import current_chain
|
||||
from dexorder.blocks import promotion_height
|
||||
@@ -57,6 +57,7 @@ class BlockWalker (BlockProgressor):
|
||||
processed_height = cur + config.backfill if config.backfill < 0 else cur
|
||||
|
||||
log.info(f'walker starting at block {processed_height}')
|
||||
metric.block_current.set(processed_height)
|
||||
last_flush = processed_height if self.flush_type == 'blocks' else now() if self.flush_type == 'time' else None
|
||||
prev_height = None
|
||||
session = db.session
|
||||
@@ -67,6 +68,7 @@ class BlockWalker (BlockProgressor):
|
||||
latest_blockdata: BlockInfo = await w3.eth.get_block('latest')
|
||||
latest = Block(chain_id, latest_blockdata)
|
||||
latest_block[chain_id] = latest
|
||||
metric.block_latest.set(latest.height)
|
||||
if prev_height is None or latest.height > prev_height:
|
||||
prev_height = latest.height
|
||||
log.debug(f'polled new block {latest.height}')
|
||||
@@ -93,6 +95,7 @@ class BlockWalker (BlockProgressor):
|
||||
db.session.commit()
|
||||
db.session.begin()
|
||||
processed_height = cur_height
|
||||
metric.block_current.set(cur_height)
|
||||
if not self.running or config.walker_stop is not None and config.walker_stop <= processed_height:
|
||||
break
|
||||
await asyncio.sleep(config.polling or 1)
|
||||
|
||||
Reference in New Issue
Block a user