walker_stop; bin.block_for_time

This commit is contained in:
Tim
2024-03-27 14:23:11 -04:00
parent f22f8bf017
commit 38d48a8f5f
6 changed files with 52 additions and 12 deletions

View File

@@ -37,6 +37,8 @@ parsimonious==0.9.0
protobuf==4.25.2 protobuf==4.25.2
psycopg2-binary==2.9.9 psycopg2-binary==2.9.9
pycryptodome==3.20.0 pycryptodome==3.20.0
python-dateutil==2.9.0.post0
pytz==2024.1
pyunormalize==15.1.0 pyunormalize==15.1.0
PyYAML==6.0.1 PyYAML==6.0.1
redis==5.0.3 redis==5.0.3
@@ -45,6 +47,7 @@ regex==2023.12.25
requests==2.31.0 requests==2.31.0
rlp==4.0.0 rlp==4.0.0
rpds-py==0.17.1 rpds-py==0.17.1
six==1.16.0
socket.io-emitter==0.1.5.1 socket.io-emitter==0.1.5.1
sortedcontainers==2.4.0 sortedcontainers==2.4.0
SQLAlchemy==2.0.28 SQLAlchemy==2.0.28

View File

@@ -13,3 +13,4 @@ websockets
cachetools cachetools
async-lru async-lru
eth-bloom eth-bloom
python-dateutil

View File

@@ -0,0 +1,40 @@
import logging
import sys
from datetime import timezone
from dexorder import blockchain, from_timestamp
from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute
from dateutil.parser import parse as parse_date
from dexorder.database.model import Block
log = logging.getLogger(__name__)
async def main():
log.debug(f'Finding block nearest to {time}')
w3 = await blockchain.connect()
chain_id = current_chain.get().chain_id
blockdata = await w3.eth.get_block('latest')
latest = cur = Block.from_data(chain_id, blockdata)
while True:
cur_time = from_timestamp(cur.timestamp)
delta = (time - cur_time).total_seconds()
estimated = round(cur.height + delta/seconds_per_block)
if estimated > latest.height:
print(f'Estimated block is after the latest: {latest.height}')
exit(0)
elif estimated == cur.height:
print(f'Closest block to {time}: {cur.height} {cur_time}')
exit(0)
cur = Block.from_data(chain_id, await w3.eth.get_block(estimated))
if __name__ == '__main__':
if len(sys.argv) < 3:
log.error("Usage: python -m dexorder.bin.block_for_time <target_time> <seconds_per_block>")
exit(1)
time = parse_date(sys.argv[1], ignoretz=True).replace(tzinfo=timezone.utc)
seconds_per_block = float(sys.argv[2])
sys.argv = [sys.argv[0], *sys.argv[3:]]
execute(main())

View File

@@ -33,6 +33,7 @@ class Config:
walker_name: str = 'default' walker_name: str = 'default'
walker_flush_interval: float = 300 walker_flush_interval: float = 300
walker_stop: Optional[int] = None # block number of the last block the walker should process
mirror_source_rpc_url: Optional[str] = None # source RPC for original pools mirror_source_rpc_url: Optional[str] = None # source RPC for original pools
mirror_pools: list[str] = field(default_factory=list) mirror_pools: list[str] = field(default_factory=list)

View File

@@ -12,7 +12,7 @@ class Block(Base):
@staticmethod @staticmethod
def from_data(chain_id:int, data:dict): def from_data(chain_id:int, data:dict):
""" Builds a Block using the response data from an RPC server """ """ Builds a Block using the response data from an RPC server """
return Block(chain=chain_id, height=int(data['number'],0), return Block(chain=chain_id, height=data['number'] if type(data['number']) is int else int(data['number'],0),
hash=hexstr(data['hash']), parent=hexstr(data['parentHash']), data=data) hash=hexstr(data['hash']), parent=hexstr(data['parentHash']), data=data)
chain: Mapped[int] = mapped_column(primary_key=True) chain: Mapped[int] = mapped_column(primary_key=True)

View File

@@ -4,11 +4,8 @@ from asyncio import Queue
from datetime import timedelta from datetime import timedelta
from typing import Union, Callable from typing import Union, Callable
from websockets import ConnectionClosedError from dexorder import config, db, now, current_w3
from dexorder import Blockchain, config, db, now, current_w3
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.blockchain.connection import create_w3
from dexorder.blockstate import current_blockstate from dexorder.blockstate import current_blockstate
from dexorder.blockstate.state import FinalizedBlockState from dexorder.blockstate.state import FinalizedBlockState
from dexorder.database.model import Block from dexorder.database.model import Block
@@ -76,8 +73,11 @@ class BlockWalker (BlockProgressor):
prev_height = latest_height prev_height = latest_height
log.debug(f'polled new block {latest_height}') log.debug(f'polled new block {latest_height}')
promotion_height = latest_height - confirm_offset promotion_height = latest_height - confirm_offset
while processed_height < promotion_height: while (processed_height < promotion_height and
(config.walker_stop is None or processed_height < config.walker_stop)):
cur_height = min(promotion_height, processed_height+batch_size-1) cur_height = min(promotion_height, processed_height+batch_size-1)
if config.walker_stop is not None:
cur_height = min(cur_height, config.walker_stop)
block_data = await w3.eth.get_block(cur_height) block_data = await w3.eth.get_block(cur_height)
block = Block.from_data(chain_id, block_data) block = Block.from_data(chain_id, block_data)
assert block.height == cur_height assert block.height == cur_height
@@ -98,12 +98,7 @@ class BlockWalker (BlockProgressor):
db.session.commit() db.session.commit()
db.session.begin() db.session.begin()
processed_height = cur_height processed_height = cur_height
# if self.latest_callback: if not self.running or config.walker_stop is not None and config.walker_stop <= processed_height:
# latest_block = Block(chain=chain.chain_id, height=latest_height,
# hash=latest_rawblock['hash'], parent=latest_rawblock['parentHash'],
# data=latest_rawblock)
# self.latest_callback(latest_block)
if not self.running:
break break
await asyncio.sleep(config.polling or 1) await asyncio.sleep(config.polling or 1)
except Exception: except Exception: