diff --git a/requirements-lock.txt b/requirements-lock.txt index 6c685f0..e738108 100644 --- a/requirements-lock.txt +++ b/requirements-lock.txt @@ -37,6 +37,8 @@ parsimonious==0.9.0 protobuf==4.25.2 psycopg2-binary==2.9.9 pycryptodome==3.20.0 +python-dateutil==2.9.0.post0 +pytz==2024.1 pyunormalize==15.1.0 PyYAML==6.0.1 redis==5.0.3 @@ -45,6 +47,7 @@ regex==2023.12.25 requests==2.31.0 rlp==4.0.0 rpds-py==0.17.1 +six==1.16.0 socket.io-emitter==0.1.5.1 sortedcontainers==2.4.0 SQLAlchemy==2.0.28 diff --git a/requirements.txt b/requirements.txt index b0ebf9b..9c3b04c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,3 +13,4 @@ websockets cachetools async-lru eth-bloom +python-dateutil diff --git a/src/dexorder/bin/block_for_time.py b/src/dexorder/bin/block_for_time.py new file mode 100644 index 0000000..bae3845 --- /dev/null +++ b/src/dexorder/bin/block_for_time.py @@ -0,0 +1,40 @@ +import logging +import sys +from datetime import timezone + +from dexorder import blockchain, from_timestamp +from dexorder.base.chain import current_chain +from dexorder.bin.executable import execute +from dateutil.parser import parse as parse_date + +from dexorder.database.model import Block + +log = logging.getLogger(__name__) + + +async def main(): + log.debug(f'Finding block nearest to {time}') + w3 = await blockchain.connect() + chain_id = current_chain.get().chain_id + blockdata = await w3.eth.get_block('latest') + latest = cur = Block.from_data(chain_id, blockdata) + while True: + cur_time = from_timestamp(cur.timestamp) + delta = (time - cur_time).total_seconds() + estimated = round(cur.height + delta/seconds_per_block) + if estimated > latest.height: + print(f'Estimated block is after the latest: {latest.height}') + exit(0) + elif estimated == cur.height: + print(f'Closest block to {time}: {cur.height} {cur_time}') + exit(0) + cur = Block.from_data(chain_id, await w3.eth.get_block(estimated)) + +if __name__ == '__main__': + if len(sys.argv) < 3: + log.error("Usage: python -m dexorder.bin.block_for_time ") + exit(1) + time = parse_date(sys.argv[1], ignoretz=True).replace(tzinfo=timezone.utc) + seconds_per_block = float(sys.argv[2]) + sys.argv = [sys.argv[0], *sys.argv[3:]] + execute(main()) diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index 064bcaa..66a3158 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -33,6 +33,7 @@ class Config: walker_name: str = 'default' walker_flush_interval: float = 300 + walker_stop: Optional[int] = None # block number of the last block the walker should process mirror_source_rpc_url: Optional[str] = None # source RPC for original pools mirror_pools: list[str] = field(default_factory=list) diff --git a/src/dexorder/database/model/block.py b/src/dexorder/database/model/block.py index a858ca2..a0d21cc 100644 --- a/src/dexorder/database/model/block.py +++ b/src/dexorder/database/model/block.py @@ -12,7 +12,7 @@ class Block(Base): @staticmethod def from_data(chain_id:int, data:dict): """ Builds a Block using the response data from an RPC server """ - return Block(chain=chain_id, height=int(data['number'],0), + return Block(chain=chain_id, height=data['number'] if type(data['number']) is int else int(data['number'],0), hash=hexstr(data['hash']), parent=hexstr(data['parentHash']), data=data) chain: Mapped[int] = mapped_column(primary_key=True) diff --git a/src/dexorder/walker.py b/src/dexorder/walker.py index 1210b1c..cfacd21 100644 --- a/src/dexorder/walker.py +++ b/src/dexorder/walker.py @@ -4,11 +4,8 @@ from asyncio import Queue from datetime import timedelta from typing import Union, Callable -from websockets import ConnectionClosedError - -from dexorder import Blockchain, config, db, now, current_w3 +from dexorder import config, db, now, current_w3 from dexorder.base.chain import current_chain -from dexorder.blockchain.connection import create_w3 from dexorder.blockstate import current_blockstate from dexorder.blockstate.state import FinalizedBlockState from dexorder.database.model import Block @@ -76,8 +73,11 @@ class BlockWalker (BlockProgressor): prev_height = latest_height log.debug(f'polled new block {latest_height}') promotion_height = latest_height - confirm_offset - while processed_height < promotion_height: + while (processed_height < promotion_height and + (config.walker_stop is None or processed_height < config.walker_stop)): cur_height = min(promotion_height, processed_height+batch_size-1) + if config.walker_stop is not None: + cur_height = min(cur_height, config.walker_stop) block_data = await w3.eth.get_block(cur_height) block = Block.from_data(chain_id, block_data) assert block.height == cur_height @@ -98,12 +98,7 @@ class BlockWalker (BlockProgressor): db.session.commit() db.session.begin() processed_height = cur_height - # if self.latest_callback: - # latest_block = Block(chain=chain.chain_id, height=latest_height, - # hash=latest_rawblock['hash'], parent=latest_rawblock['parentHash'], - # data=latest_rawblock) - # self.latest_callback(latest_block) - if not self.running: + if not self.running or config.walker_stop is not None and config.walker_stop <= processed_height: break await asyncio.sleep(config.polling or 1) except Exception: