backfill dynamic batch size
This commit is contained in:
@@ -22,6 +22,8 @@ from dexorder.util.async_util import maywait, Maywaitable
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
class Retry (Exception): ...
|
||||
|
||||
# todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas
|
||||
class BlockStateRunner:
|
||||
|
||||
@@ -247,6 +249,8 @@ class BlockStateRunner:
|
||||
try:
|
||||
await self.handle_head(chain, head, w3)
|
||||
prev_head = head
|
||||
except Retry:
|
||||
pass
|
||||
except Exception as x:
|
||||
log.exception(x)
|
||||
except Exception:
|
||||
@@ -333,12 +337,20 @@ class BlockStateRunner:
|
||||
if future is None:
|
||||
await maywait(callback()) # non-log callback
|
||||
else:
|
||||
log_events = await future if config.parallel_logevent_queries else future
|
||||
try:
|
||||
log_events = await future if config.parallel_logevent_queries else future
|
||||
except ValueError as e:
|
||||
if e.args[0].get('code') == -32602:
|
||||
# too many logs were returned in the batch, so decrease the batch size.
|
||||
batch_size = int(chain.batch_size * 0.9)
|
||||
chain.batch_size = batch_size
|
||||
log.info(f'Decreasing batch size for {chain} to {batch_size}')
|
||||
raise Retry
|
||||
for log_event in log_events:
|
||||
try:
|
||||
parsed = event.process_log(log_event) if event is not None else log_event
|
||||
except (LogTopicError, MismatchedABI) as x:
|
||||
log.warning(f'logevent parse error {x}\n{log_event}')
|
||||
except (LogTopicError, MismatchedABI) as e:
|
||||
log.warning(f'logevent parse error {e}\n{log_event}')
|
||||
else:
|
||||
# todo try/except for known retryable errors
|
||||
await maywait(callback(parsed))
|
||||
|
||||
Reference in New Issue
Block a user