From 3ad8c883e5d1148e6a88a4efa568eb72ac815d53 Mon Sep 17 00:00:00 2001 From: Tim Date: Tue, 19 Mar 2024 21:22:45 -0400 Subject: [PATCH] mirror fixes --- src/dexorder/bin/mirror.py | 80 ++++++++++++++++++------- src/dexorder/blockchain/connection.py | 2 + src/dexorder/contract/contract_proxy.py | 6 +- 3 files changed, 65 insertions(+), 23 deletions(-) diff --git a/src/dexorder/bin/mirror.py b/src/dexorder/bin/mirror.py index d57bc91..31c3ca9 100644 --- a/src/dexorder/bin/mirror.py +++ b/src/dexorder/bin/mirror.py @@ -3,7 +3,7 @@ import logging import os from datetime import timedelta -from dexorder import config, blockchain, current_w3, now +from dexorder import config, blockchain, current_w3, now, ADDRESS_0 from dexorder.bin.executable import execute from dexorder.blockchain.connection import create_w3 from dexorder.blockstate import current_blockstate @@ -17,6 +17,7 @@ from dexorder.util.async_util import maywait log = logging.getLogger('dexorder') +LOTSA_GAS = 10_000_000 _token_infos = {} source_w3 = None @@ -65,9 +66,9 @@ async def get_pool_info( pool ): # uint256 amount1; # } p = UniswapV3Pool(pool) - t0, t1 = await asyncio.gather(get_token_info(p.token0()), get_token_info(p.token1())) + t0, t1 = await asyncio.gather(p.token0(), p.token1()) amount0, amount1, (price,*_), fee = \ - await asyncio.gather(ERC20(t0[0]).balanceOf(pool), ERC20(t1[0]).balanceOf(pool), p.slot0(), p.fee()) + await asyncio.gather(ERC20(t0).balanceOf(pool), ERC20(t1).balanceOf(pool), p.slot0(), p.fee()) return [pool, t0, t1, fee, price, amount0, amount1] async def write_metadata( pools, mirror_pools ): @@ -86,14 +87,13 @@ async def write_metadata( pools, mirror_pools ): log.info(f'wrote {filename}') -async def await_mirror(tx, pool_addr, mirror_addr, mirror_inverted ): - await tx.wait() - log.info(f'Updated {pool_addr} => {"1/" if mirror_inverted else ""}{mirror_addr}') +last_prices = {} - -async def send_update(mirrorenv, pool, price): - tx = await mirrorenv.transact.updatePool(pool, price) +async def complete_update(mirrorenv, pool, price, tx): await tx.wait() + last_prices[pool] = price + log.debug(f'Mirrored {pool} {price}') + async def main(): init_generating_metadata() @@ -103,6 +103,7 @@ async def main(): if config.mirror_source_rpc_url is None: log.error('Must configure mirror_source_rpc_url') return + delay = config.polling if config.polling > 0 else 1 global source_w3 source_w3 = await create_w3(config.mirror_source_rpc_url) pools = (config.mirror_pools or []) @@ -114,7 +115,9 @@ async def main(): config.account = '0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba' await blockchain.connect() current_blockstate.set(FinalizedBlockState()) + mirror_addr = config.mirror_env + log.info(f'Initializing with MirrorEnv {mirror_addr}') if mirror_addr is None: mirror_addr = os.environ.get('MIRRORENV') if mirror_addr is None: @@ -122,31 +125,68 @@ async def main(): if mirror_addr is None: log.error('must configure mirror_env or set envioronment MIRRORENV') return - log.info(f'Initializing with MirrorEnv {mirror_addr}') mirrorenv = ContractProxy(mirror_addr, 'MirrorEnv') - log.debug(f'Mirroring pools {", ".join(pools)}') + pool_infos = await asyncio.gather(*[get_pool_info(pool) for pool in pools]) - txs = await asyncio.gather(*[mirrorenv.transact.mirrorPool(info) for info in pool_infos]) + tokens = set(i[1] for i in pool_infos).union(i[2] for i in pool_infos) + + log.debug(f'Mirroring tokens') + token_infos = await asyncio.gather(*[get_token_info(t) for t in tokens], return_exceptions=True) + for token, x in zip(tokens, token_infos): + if isinstance(x, Exception): + log.error(f'Failed to mirror token {token}: {x}') + exit(1) + txs = [await mirrorenv.transact.mirrorToken(info, gas=LOTSA_GAS) for info in token_infos] + for token, x in zip(tokens, txs): + if isinstance(x, Exception): + log.error(f'Failed to mirror token {token}: {x}') + exit(1) await asyncio.gather(*[tx.wait() for tx in txs]) + log.info('Tokens deployed') + # for token in tokens: + # # log.debug(f'Mirroring token {token}') + # info = await get_token_info(token) + # tx = await mirrorenv.transact.mirrorToken(info, gas=LOTSA_GAS) + # await tx.wait() + # log.debug(f'Mirrored token {token}') + + log.debug(f'Mirroring pools {", ".join(pools)}') + to_mirror = list(pool_infos) + while to_mirror: + results = await asyncio.gather(*[(await mirrorenv.transact.mirrorPool(info, gas=LOTSA_GAS)).wait() + for info in to_mirror], return_exceptions=True) + pool_exceptions = [(p,r) for p,r in zip(to_mirror,results) if isinstance(r,Exception)] + for p,r in pool_exceptions: + log.error(f'Error mirroring {p}: {r}') + to_mirror = [p for p,r in pool_exceptions] + if to_mirror: + await asyncio.sleep(delay) mirror_pools = [] + # log.debug(f'Getting pool info {" ".join(pools)}') for pool in pools: mirror_addr, mirror_inverted = await mirrorenv.pools(pool) + if mirror_inverted == ADDRESS_0: + raise ValueError(f'Pool {pool} was not successfully mirrored') log.debug(f'\t{pool} => {mirror_addr} inverted={mirror_inverted}') mirror_pools.append((mirror_addr, mirror_inverted)) await write_metadata(pools, mirror_pools) - delay = config.polling if config.polling > 0 else 1 - log.info(f'Mirroring pools every {delay} seconds') + + log.info(f'Updating pools every {delay} seconds') delay = timedelta(seconds=delay) + to_update = pools while True: wake_up = now() + delay - prices = await asyncio.gather(*[get_pool_price(pool) for pool in pools]) - updates = [send_update(mirrorenv, pool, price) for pool, price in zip(pools, prices)] - results = await asyncio.gather(*updates, return_exceptions=True) - for result, pool, price in zip(results,pools,prices): + prices = await asyncio.gather(*[get_pool_price(pool) for pool in to_update]) + updates = [(pool, price, await mirrorenv.transact.updatePool(pool, price, gas=LOTSA_GAS)) + for pool, price in zip(to_update, prices) if price != last_prices.get(pool)] + results = await asyncio.gather(*[complete_update(mirrorenv, pool, price, tx) + for (pool,price,tx) in updates], return_exceptions=True) + failed = [] + for result, pool, price in zip(results,to_update,prices): if isinstance(result, Exception): log.debug(f'Could not update {pool}: {result}') - else: - log.debug(f'Mirrored {pool} {price}') + failed.append(pool) + to_update = failed if failed else pools sleep = (wake_up - now()).total_seconds() if sleep > 0: await asyncio.sleep(sleep) diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py index 5575b4d..ece18c9 100644 --- a/src/dexorder/blockchain/connection.py +++ b/src/dexorder/blockchain/connection.py @@ -38,6 +38,7 @@ async def create_w3(rpc_url=None, account=NARG, autosign=False): w3 = AsyncWeb3(RetryHTTPProvider(url)) # w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed? # w3.middleware_onion.add(simple_cache_middleware) + # log.debug(f'middleware {list(w3.middleware_onion.middlewares)}') w3.middleware_onion.remove('attrdict') w3.middleware_onion.add(clean_input_async, 'clean_input') w3.eth.Contract = _make_contract(w3.eth) @@ -128,6 +129,7 @@ class RetryHTTPProvider (AsyncHTTPProvider): try: async with self.in_flight: await self.rate_allowed.wait() + # log.debug(f'Requesting RPC call {method}') return await super().make_request(method, params) except ClientResponseError as e: if e.status != 429: diff --git a/src/dexorder/contract/contract_proxy.py b/src/dexorder/contract/contract_proxy.py index 18f9807..f8117ba 100644 --- a/src/dexorder/contract/contract_proxy.py +++ b/src/dexorder/contract/contract_proxy.py @@ -53,7 +53,7 @@ def call_wrapper(addr, name, func): except LookupError: blockhash = 'latest' try: - return await func(*args, **kwargs).call(block_identifier=blockhash) + return await func(*args).call(block_identifier=blockhash, **kwargs) except Web3Exception as e: e.args += addr, name raise e @@ -63,7 +63,7 @@ def call_wrapper(addr, name, func): def transact_wrapper(addr, name, func): async def f(*args, **kwargs): try: - tx_id = await func(*args, **kwargs).transact() + tx_id = await func(*args).transact(kwargs) except Web3Exception as e: e.args += addr, name raise e @@ -77,7 +77,7 @@ def build_wrapper(addr, name, func): account = current_account.get() except LookupError: raise RuntimeError(f'Cannot invoke transaction {addr}.{name}() without setting an Account.') - tx = await func(*args, **kwargs).build_transaction() + tx = await func(*args).build_transaction(kwargs) tx['from'] = account.address tx['nonce'] = await account.next_nonce() signed = eth_account.Account.sign_transaction(tx, private_key=account.key)