From 1b5d1e334717897d8f468a0345588ff62cae01eb Mon Sep 17 00:00:00 2001 From: Tim Date: Sat, 27 Jan 2024 18:39:11 -0400 Subject: [PATCH] backfill dynamic batch size --- src/dexorder/runner.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 6176185..20c22a4 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -22,6 +22,8 @@ from dexorder.util.async_util import maywait, Maywaitable log = logging.getLogger(__name__) +class Retry (Exception): ... + # todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas class BlockStateRunner: @@ -247,6 +249,8 @@ class BlockStateRunner: try: await self.handle_head(chain, head, w3) prev_head = head + except Retry: + pass except Exception as x: log.exception(x) except Exception: @@ -333,12 +337,20 @@ class BlockStateRunner: if future is None: await maywait(callback()) # non-log callback else: - log_events = await future if config.parallel_logevent_queries else future + try: + log_events = await future if config.parallel_logevent_queries else future + except ValueError as e: + if e.args[0].get('code') == -32602: + # too many logs were returned in the batch, so decrease the batch size. + batch_size = int(chain.batch_size * 0.9) + chain.batch_size = batch_size + log.info(f'Decreasing batch size for {chain} to {batch_size}') + raise Retry for log_event in log_events: try: parsed = event.process_log(log_event) if event is not None else log_event - except (LogTopicError, MismatchedABI) as x: - log.warning(f'logevent parse error {x}\n{log_event}') + except (LogTopicError, MismatchedABI) as e: + log.warning(f'logevent parse error {e}\n{log_event}') else: # todo try/except for known retryable errors await maywait(callback(parsed))