finaldata parallelization
This commit is contained in:
@@ -26,9 +26,8 @@ ohlcs: FinalOHLCRepository
|
|||||||
|
|
||||||
async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
|
async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
|
||||||
# asynchronously prefetch the block timestamps we'll need
|
# asynchronously prefetch the block timestamps we'll need
|
||||||
block_ids = set(swap['blockHash'] for swap in swaps)
|
block_hashes = set(swap['blockHash'] for swap in swaps)
|
||||||
for batch in itertools.batched(block_ids, 4):
|
await asyncio.gather(*[get_block_timestamp(h) for h in block_hashes])
|
||||||
await asyncio.gather(*[get_block_timestamp(h) for h in batch])
|
|
||||||
|
|
||||||
# now execute the swaps synchronously
|
# now execute the swaps synchronously
|
||||||
for swap in swaps:
|
for swap in swaps:
|
||||||
|
|||||||
@@ -150,3 +150,5 @@ class RetryHTTPProvider (AsyncHTTPProvider):
|
|||||||
await asyncio.sleep(wait)
|
await asyncio.sleep(wait)
|
||||||
finally:
|
finally:
|
||||||
self.rate_allowed.set()
|
self.rate_allowed.set()
|
||||||
|
# finally:
|
||||||
|
# log.debug(f'Ended request of RPC call {method}')
|
||||||
|
|||||||
Reference in New Issue
Block a user