Compare commits
f3faaa3dd6 ... master

9 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 4936150c3b | |
| | 88057607d5 | |
| | 36d0a863c6 | |
| | 89ce46793e | |
| | 2bcf5d043c | |
| | 71942d5b8f | |
| | ef44973646 | |
| | ce55609297 | |
| | a27300b5e4 | |
alembic/versions/e47d1bca4b3d_sharedata.py (new file, +30)

```diff
@@ -0,0 +1,30 @@
+"""sharedata
+
+Revision ID: e47d1bca4b3d
+Revises: 509010f13e8b
+Create Date: 2025-04-23 11:23:10.809341
+
+"""
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision: str = 'e47d1bca4b3d'
+down_revision: Union[str, None] = '509010f13e8b'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    op.create_table('sharedata',
+    sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
+    sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
+    sa.PrimaryKeyConstraint('id')
+    )
+
+
+def downgrade() -> None:
+    op.drop_table('sharedata')
```
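This is a standard Alembic autogenerated revision: it creates the `sharedata` table with an autoincrementing integer key and a JSONB payload. As a usage sketch, the revision can be applied programmatically through Alembic's command API, assuming the project keeps a conventional `alembic.ini` at the repo root:

```python
from alembic import command
from alembic.config import Config

# Hypothetical invocation; equivalent to running `alembic upgrade head` from the shell.
cfg = Config('alembic.ini')
command.upgrade(cfg, 'head')      # applies e47d1bca4b3d (and any earlier revisions)
# command.downgrade(cfg, '-1')    # would drop the sharedata table again
```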
```diff
@@ -42,10 +42,12 @@ class Account (LocalAccount):
         # log.debug(f'available accounts: {Account._pool.qsize()}')
         try:
             async with asyncio.timeout(1):
-                result = await Account._pool.get()
+                result: "Account" = await Account._pool.get()
         except asyncio.TimeoutError:
             log.error('waiting for an available account')
             result = await Account._pool.get()
+        # mark as out of pool
+        result._in_pool = False
         metric.account_available.set(Account._pool.qsize())
         return result
@@ -59,17 +61,20 @@ class Account (LocalAccount):
         if Account._main_account is None:
             Account._main_account = account
         Account._pool.put_nowait(account)
+        account._in_pool = True  # this account is now in the pool
         Account._all.append(account)
         metric.account_available.set(Account._pool.qsize())
         metric.account_total.set(len(Account._all))
         log.info(f'Account pool {[a.address for a in Account._all]}')

     def __init__(self, local_account: LocalAccount):  # todo chain_id?
         super().__init__(local_account._key_obj, local_account._publicapi)  # from digging into the source code
         self.chain_id = current_chain.get().id
         self.signing_middleware = construct_sign_and_send_raw_middleware(self)
         self._nonce: Optional[int] = None
         self.tx_id: Optional[str] = None  # current transaction id
+        # release() idempotency tracking
+        self._in_pool: bool = False

     async def next_nonce(self):
         if self._nonce is None:
@@ -86,8 +91,21 @@ class Account (LocalAccount):
         return current_w3.get().eth.get_balance(self.address)

     def release(self):
-        metric.account_available.set(Account._pool.qsize() + 1)
+        """
+        Return this Account to the pool.
+
+        Idempotent: calling release() multiple times without a new acquire()
+        will only enqueue the account once.
+        """
+        # If we're already in the pool, do nothing.
+        if self._in_pool:
+            # Optional debug log; comment out if too noisy.
+            # log.debug(f'Account {self.address} already in pool; ignoring extra release()')
+            return
+
         Account._pool.put_nowait(self)
+        self._in_pool = True
+        metric.account_available.set(Account._pool.qsize())

     def __str__(self):
         return self.address
```
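The new `_in_pool` flag makes `release()` safe to call more than once: a double release would otherwise enqueue the same account twice and hand it to two concurrent borrowers. A minimal standalone sketch of the pattern (simplified names, standard library only, not the repo's actual class):

```python
import asyncio


class PooledItem:
    pool: asyncio.Queue = None  # created inside a running event loop in demo()

    def __init__(self):
        self._in_pool = False

    @classmethod
    async def acquire(cls) -> 'PooledItem':
        item = await cls.pool.get()
        item._in_pool = False      # mark as out of the pool
        return item

    def release(self):
        if self._in_pool:
            return                 # idempotent: already back in the pool
        self.pool.put_nowait(self)
        self._in_pool = True


async def demo():
    PooledItem.pool = asyncio.Queue()
    item = PooledItem()
    item.release()
    item.release()                 # extra release is a no-op
    assert PooledItem.pool.qsize() == 1
    again = await PooledItem.acquire()
    again.release()
    print('pool size:', PooledItem.pool.qsize())  # 1

asyncio.run(demo())
```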
```diff
@@ -11,17 +11,29 @@ from dexorder.blockstate.fork import current_fork
 from dexorder.contract.dexorder import VaultContract
 from dexorder.order.orderstate import Order
 from dexorder.tokens import adjust_decimals
+from dexorder.util import json
 from dexorder.vault_blockdata import vault_balances, pretty_balances
 from dexorder.bin.executable import execute

 log = logging.getLogger(__name__)


+async def dump_orders(orders, args):
+    if args.json:
+        print(json.dumps([order.status.dump() for order in orders]))
+    else:
+        first = True
+        for order in orders:
+            if first:
+                first = False
+            else:
+                print()
+            print(await order.pprint())
+
+
 def command_vault_argparse(subparsers):
     parser = subparsers.add_parser('vault', help='show the vault\'s balances and orders')
     parser.add_argument('address', help='address of the vault')
     parser.add_argument('--all', help='show all orders including closed ones', action='store_true')
-    # parser.add_argument('--json', help='output in JSON format', action='store_true')
+    parser.add_argument('--json', help='output in JSON format', action='store_true')


 async def command_vault(args):
     balances = vault_balances.get(args.address, {})
@@ -30,6 +42,7 @@ async def command_vault(args):
     print(pretty_balances({k: (await adjust_decimals(k, v)) for k, v in balances.items()}))
     print(f'Orders:')
     i = 0
+    orders = []
     while True:
         key = OrderKey(args.address, i)
         try:
@@ -37,16 +50,19 @@ async def command_vault(args):
         except KeyError:
             break
         if args.all or order.is_open:
-            print(await order.pprint())
+            orders.append(order)
         i += 1
+    await dump_orders(orders, args)

-    # for key in Order.open_orders:
-    #     order = Order.of(key)
-    #     if args.json:
-    #         print(json.dumps(order.status.dump()))
-    #     else:
-    #         print()
-    #         print(order)
+
+def command_open_argparse(subparsers):
+    parser = subparsers.add_parser('open', help='show all open orders')
+    parser.add_argument('--json', help='output in JSON format', action='store_true')
+
+
+async def command_open(args):
+    await dump_orders([Order.of(key) for key in Order.open_orders], args)
+

 async def main(args: list):
     parser = argparse.ArgumentParser()
```
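The new `open` subcommand reuses `dump_orders()` so `vault` and `open` share one output path. For context, a minimal sketch of how such `*_argparse` registrars are typically wired into a dispatcher; the `set_defaults(func=...)` mechanism is an assumption for illustration, not necessarily what this repo's `main()` does:

```python
import argparse
import asyncio


async def command_open(args):
    # Stand-in body; the real command calls dump_orders(...) as shown above.
    print(f'open orders (json={args.json})')


def command_open_argparse(subparsers):
    parser = subparsers.add_parser('open', help='show all open orders')
    parser.add_argument('--json', help='output in JSON format', action='store_true')
    parser.set_defaults(func=command_open)  # assumed dispatch wiring


async def main(argv):
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='command', required=True)
    command_open_argparse(subparsers)
    args = parser.parse_args(argv)
    await args.func(args)


asyncio.run(main(['open', '--json']))
```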
```diff
@@ -95,7 +95,7 @@ async def write_metadata( pools, mirror_pools ):
     pool_dicts = [get_pool(addr) for (addr,_inverted) in mirror_pools]
     pool_dicts = await asyncio.gather(*pool_dicts)
     for data, addr, (_,inverted) in zip(pool_dicts, pools, mirror_pools):
-        data['x'] = dict(data=dict(uri=f'https://app.dexorder.trade/ohlc/', chain=42161, symbol=addr, inverted=inverted))
+        data['x'] = dict(data=dict(uri=f'https://app.dexorder.com/ohlc/', chain=42161, symbol=addr, inverted=inverted))
     tokens = set(p['base'] for p in pool_dicts)
     tokens.update(p['quote'] for p in pool_dicts)
     tokens = await asyncio.gather(*[get_token(t) for t in tokens])
```
```diff
@@ -33,7 +33,8 @@ class ContractTransaction:
     async def wait(self) -> TxReceipt:
         if self.receipt is None:
             self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id)
-            self.account.release()
+            if self.account is not None:
+                self.account.release()
         return self.receipt

     async def sign(self, account: Account):
@@ -153,10 +154,14 @@ class ContractProxy:
     def __getattr__(self, item):
         if item == 'constructor':
             found = self.contract.constructor
-        elif item in self.contract.functions:
-            found = self.contract.functions[item]
         else:
-            raise AttributeError(item)
+            funcs = self.contract.functions
+            # In web3.py v6+, contract functions are exposed as attributes, not via __getitem__.
+            # Using getattr ensures we obtain the callable factory for the function; indexing may return None.
+            # Additionally, guard against unexpected None to fail fast with a clear error.
+            found = getattr(funcs, item, None)
+            if not callable(found):
+                raise AttributeError(f"Function '{item}' not found on contract {self._interface_name} at {self.address}")
         return self._wrapper(self.address, item, found)

     def __repr__(self):
```
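The rewritten `__getattr__` looks functions up with `getattr` on the `functions` namespace and fails fast when the attribute is missing or not callable. A generic sketch of this guarded dynamic-proxy pattern (the `Functions` stand-in is illustrative, not web3.py's actual class):

```python
class Functions:
    # Stand-in namespace exposing functions as attributes, like web3.py's contract.functions.
    def transfer(self):
        return 'transfer-factory'


class Proxy:
    def __init__(self, funcs):
        self._funcs = funcs

    def __getattr__(self, item):
        found = getattr(self._funcs, item, None)
        if not callable(found):
            # Raising AttributeError keeps hasattr()/getattr(obj, name, default) semantics intact.
            raise AttributeError(f"Function '{item}' not found")
        return found


proxy = Proxy(Functions())
print(proxy.transfer())        # 'transfer-factory'
print(hasattr(proxy, 'nope'))  # False, because __getattr__ raises AttributeError
```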
```diff
@@ -10,3 +10,4 @@ from .ofac import OFAC, OFACAlerts
 from .accounting import Accounting, DbAccount
 from .vaultcreationrequest import VaultCreationRequest
 from .tos import TOSAcceptance
+from .sharedata import ShareData
```
src/dexorder/database/model/sharedata.py (new file, +12)

```diff
@@ -0,0 +1,12 @@
+import logging
+
+from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy.orm import Mapped, mapped_column
+
+from dexorder.database.model import Base
+
+log = logging.getLogger(__name__)
+
+class ShareData (Base):
+    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
+    data: Mapped[dict] = mapped_column(JSONB)
```
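The model mirrors the migration above: an autoincrementing integer key plus a JSONB payload. A hypothetical usage sketch; the connection string is illustrative, and it assumes the project's `Base` derives `__tablename__` automatically (here `sharedata`):

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from dexorder.database.model.sharedata import ShareData

# Illustrative URL; the real one comes from the project's config.
engine = create_engine('postgresql+psycopg2://user:pass@localhost/dexorder')

with Session(engine) as session:
    row = ShareData(data={'symbol': '0xdeadbeef', 'inverted': False})
    session.add(row)
    session.commit()
    print(row.id)  # autoincremented primary key assigned on flush
```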
```diff
@@ -1,14 +1,69 @@
+import itertools
 import logging
 from contextlib import asynccontextmanager
 from contextvars import ContextVar

 import redis.asyncio as redis_async
 from redis.asyncio import Redis
+from redis.asyncio.client import Pipeline

 from dexorder import config

 log = logging.getLogger(__name__)

+BATCH_SIZE = 1_000
+
+
+class PipelineProxy:
+
+    def __init__(self, pipe: Pipeline):
+        self.pipe = pipe
+        self.ops = 0
+
+    async def push(self, num=1):
+        self.ops += num
+        if self.ops >= BATCH_SIZE:
+            self.ops = 0
+            await self.pipe.execute()
+
+    async def sadd(self, series, *keys):
+        while keys:
+            most = min(BATCH_SIZE - self.ops, len(keys))
+            assert most > 0
+            send = keys[:most]
+            keys = keys[most:]
+            await self.pipe.sadd(series, *send)
+            await self.push(len(send))
+
+    async def srem(self, series, *keys):
+        while keys:
+            most = min(BATCH_SIZE - self.ops, len(keys))
+            assert most > 0
+            send = keys[:most]
+            keys = keys[most:]
+            await self.pipe.srem(series, *send)
+            await self.push(len(send))
+
+    async def hset(self, series, *, mapping):
+        items = list(mapping.items())
+        while items:
+            most = min(BATCH_SIZE - self.ops, len(items))
+            assert most > 0
+            send = items[:most]
+            items = items[most:]
+            await self.pipe.hset(series, mapping={k: v for k, v in send})
+            await self.push(len(send))
+
+    async def hdel(self, series, *keys):
+        while keys:
+            most = min(BATCH_SIZE - self.ops, len(keys))
+            assert most > 0
+            send = keys[:most]
+            keys = keys[most:]
+            await self.pipe.hdel(series, *send)
+            await self.push(len(send))
+
+    def __getattr__(self, item):
+        return getattr(self.pipe, item)
+
+
 class Memcache:
     @staticmethod
@@ -19,7 +74,7 @@ class Memcache:
         # noinspection PyTypeChecker
         current_redis.set(pipe)
         try:
-            yield pipe
+            yield PipelineProxy(pipe)
             await pipe.execute()
         finally:
             current_redis.set(old_redis)
```
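`PipelineProxy` counts queued commands and flushes the underlying Redis pipeline every `BATCH_SIZE` operations, so a very large save never accumulates an unbounded pipeline; anything it doesn't wrap falls through to the raw pipeline via `__getattr__`. A rough sketch of the same chunked-flush idea using redis-py's documented pipeline API directly (assumes a Redis server on localhost:6379):

```python
import asyncio

import redis.asyncio as redis_async

BATCH_SIZE = 1_000


async def demo():
    r = redis_async.Redis()
    await r.delete('demo-set')
    async with r.pipeline(transaction=False) as pipe:
        ops = 0
        for i in range(5_000):
            pipe.sadd('demo-set', i)   # queues the command locally; nothing is sent yet
            ops += 1
            if ops >= BATCH_SIZE:      # flush in chunks, as PipelineProxy.push() does
                await pipe.execute()
                ops = 0
        await pipe.execute()           # flush the remainder
    print(await r.scard('demo-set'))   # 5000
    await r.aclose()                   # redis-py >= 5; older versions use close()

asyncio.run(demo())
```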
```diff
@@ -12,7 +12,7 @@ from dexorder.blockstate.blockdata import SeriesCollection, BlockData
 from dexorder.blockstate.diff import DiffEntryItem
 from dexorder.blockstate.fork import Fork
 from dexorder.blockstate.state import compress_diffs
-from dexorder.memcache import current_redis, memcache
+from dexorder.memcache import current_redis, memcache, PipelineProxy
 from dexorder.util import hexstr
 from dexorder.util.async_util import maywait
 from dexorder.util.json import json_encoder
@@ -40,8 +40,7 @@ class RedisState (SeriesCollection):
         for series in self.datas.keys():
             for k, v in state.iteritems(fork, series):
                 diffs.append(DiffItem(series, k, v))
-        # todo tim fix pubs
-        await self.save(fork, diffs, use_transaction=False, skip_pubs=True)  # use_transaction=False if the data is too big
+        await self.save(fork, diffs, use_transaction=True, skip_pubs=True)  # use_transaction=False if the data is too big


 # noinspection PyAsyncCall
@@ -92,18 +91,17 @@ class RedisState (SeriesCollection):
                 hsets[series][key] = value
             else:
                 raise NotImplementedError

         async with memcache.batch(use_transaction) as r:
-            # Redis pipelines fill up before our state can be sent, so we cannot do this atomically.
-            # However, sending many individual calls is super slow, so we
-            r: Pipeline
+            r: PipelineProxy
             for series, keys in sadds.items():
-                r.sadd(series, *keys)
+                await r.sadd(series, *keys)
             for series, keys in sdels.items():
-                r.srem(series, *keys)
+                await r.srem(series, *keys)
             for series, kvs in hsets.items():
-                r.hset(series, mapping=kvs)
+                await r.hset(series, mapping=kvs)
             for series, keys in hdels.items():
-                r.hdel(series, *keys)
+                await r.hdel(series, *keys)
             block_series = f'{chain_id}|head'
             headstr = hexstr(fork.head)
             r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
```
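One consequence of swapping `Pipeline` for `PipelineProxy`: the proxy's `sadd`/`srem`/`hset`/`hdel` are coroutines, so every call site must now `await` them, as this hunk does. With the raw pipeline, queuing was synchronous; with the proxy, a missing `await` would silently queue nothing. A toy illustration of the pitfall:

```python
import asyncio


class Proxy:
    async def sadd(self, key, *vals):
        print(f'queued {len(vals)} values for {key}')


async def demo():
    p = Proxy()
    p.sadd('s', 1, 2, 3)        # BUG: creates a coroutine and drops it; nothing runs
    await p.sadd('s', 1, 2, 3)  # correct: the command is actually queued

asyncio.run(demo())  # Python also emits a "coroutine was never awaited" warning for the bug
```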
```diff
@@ -394,8 +394,8 @@ class OHLCRepository:
         if trim > 0:
             updated = updated[trim:]

-        if len(updated) > 3:
-            log.debug(f'\tnew recents ({len(updated)}): {updated}')
+        # if len(updated) > 3:
+        #     log.debug(f'\tnew recents ({len(updated)}): {updated}')
         recent_ohlcs.setitem(key, updated)
         return updated
```
```diff
@@ -3,7 +3,6 @@ from dataclasses import dataclass
 from typing import Optional, Union, Any
 from uuid import UUID

-from triton.profiler import deactivate
 from web3.exceptions import ContractPanicError, ContractLogicError
 from web3.types import EventData

@@ -57,13 +57,13 @@ class OrderTriggers:
         self.order = order
         self.triggers = triggers
         OrderTriggers.instances[order.key] = self
-        log.debug(f'created OrderTriggers for {order.key}')
+        # log.debug(f'created OrderTriggers for {order.key}')

     def disable(self):
         for t in self.triggers:
             t.disable()
         del OrderTriggers.instances[self.order.key]
-        log.debug(f'disabled OrderTriggers for {self.order.key}')
+        # log.debug(f'disabled OrderTriggers for {self.order.key}')

     @property
     def closed(self):
@@ -223,7 +223,7 @@ async def has_funds(tk: TrancheKey):
     # log.debug(f'balances {balances}')
     token_addr = order.status.order.tokenIn
     token_balance = balances.get(token_addr)
-    log.debug(f'amount of {token_addr} = {token_balance}')
+    # log.debug(f'amount of {token_addr} = {token_balance}')
     if token_balance is None:
         # unknown balance
         token_balance = balances[token_addr] = await ERC20(token_addr).balanceOf(tk.vault)
```
```diff
@@ -5,7 +5,6 @@ from datetime import timedelta
 from typing import Any, Iterable, Callable, Optional

 from eth_bloom import BloomFilter
-# noinspection PyPackageRequirements
 from websockets.exceptions import ConnectionClosedError

 from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp, metric
@@ -81,8 +80,7 @@ class BlockStateRunner(BlockProgressor):
         async with w3ws as w3ws:
             log.debug('connecting to ws provider')
             await w3ws.provider.connect()
-            subscription = await w3ws.eth.subscribe('newHeads')  # the return value of this call is not consistent between anvil/hardhat/rpc.
-            # log.debug(f'subscribed to newHeads {subscription}')
+            await w3ws.eth.subscribe('newHeads')  # the return value of this call is not consistent between anvil/hardhat/rpc.
             while self.running:
                 async for message in w3ws.ws.process_subscriptions():
                     block = Block(chain_id, message['result'])
@@ -94,11 +92,15 @@ class BlockStateRunner(BlockProgressor):
                     if not self.running:
                         break
                     await async_yield()
-        except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
+        except (TimeoutError, asyncio.TimeoutError) as e:
             log.debug(f'runner timeout {e}')
+        except ConnectionClosedError as e:
+            log.info(f'websocket connection closed {e}')
         except ConnectionRefusedError:
             log.warning(f'Could not connect to websocket {config.ws_url}')
             await asyncio.sleep(1)
+        except StopAsyncIteration:
+            log.info(f'websocket stream ended')
         except Exception:
             log.exception(f'Unhandled exception during run_ws()')
         finally:
```
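Splitting the single `except` clause apart lets each failure mode log at its own level (debug for timeouts, info for a closed socket or ended stream, warning plus backoff for a refused connection) while the surrounding loop reconnects. A stripped-down sketch of that shape, with a stand-in `connect` coroutine instead of the real websocket provider:

```python
import asyncio


async def run_with_retries(connect, attempts=3):
    # Generic reconnect loop: each failure mode gets its own handler and
    # log level, and the loop goes around again to reconnect.
    for _ in range(attempts):
        try:
            await connect()
            return
        except (TimeoutError, asyncio.TimeoutError) as e:
            print(f'runner timeout {e}')           # transient: retry quietly
        except ConnectionRefusedError:
            print('connection refused; backing off')
            await asyncio.sleep(0.1)               # avoid a hot reconnect loop
        except StopAsyncIteration:
            print('stream ended; reconnecting')


async def flaky():
    # Fails once with a refused connection, then succeeds.
    if not hasattr(flaky, 'tried'):
        flaky.tried = True
        raise ConnectionRefusedError
    print('connected')

asyncio.run(run_with_retries(flaky))
```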
```diff
@@ -397,21 +399,21 @@ class BlockStateRunner(BlockProgressor):
             # propragate to the DB or Redis.
             # TIME TICKS ARE DISABLED FOR THIS REASON
             return
-        current_fork.set(fork)
-        session = db.session
-        session.begin()
-        try:
-            for callback, on_timer in self.callbacks:
-                if on_timer:
-                    # noinspection PyCallingNonCallable
-                    await maywait(callback())
-        except BaseException:
-            session.rollback()
-            raise
-        else:
-            session.commit()
-        finally:
-            db.close_session()
+        # current_fork.set(fork)
+        # session = db.session
+        # session.begin()
+        # try:
+        #     for callback, on_timer in self.callbacks:
+        #         if on_timer:
+        #             # noinspection PyCallingNonCallable
+        #             await maywait(callback())
+        # except BaseException:
+        #     session.rollback()
+        #     raise
+        # else:
+        #     session.commit()
+        # finally:
+        #     db.close_session()


     async def do_state_init_cbs(self):
```