mirror fixes

This commit is contained in:
Tim
2024-03-19 21:22:45 -04:00
parent 6a5950b18b
commit 3ad8c883e5
3 changed files with 65 additions and 23 deletions

View File

@@ -3,7 +3,7 @@ import logging
import os
from datetime import timedelta
from dexorder import config, blockchain, current_w3, now
from dexorder import config, blockchain, current_w3, now, ADDRESS_0
from dexorder.bin.executable import execute
from dexorder.blockchain.connection import create_w3
from dexorder.blockstate import current_blockstate
@@ -17,6 +17,7 @@ from dexorder.util.async_util import maywait
log = logging.getLogger('dexorder')
LOTSA_GAS = 10_000_000
_token_infos = {}
source_w3 = None
@@ -65,9 +66,9 @@ async def get_pool_info( pool ):
# uint256 amount1;
# }
p = UniswapV3Pool(pool)
t0, t1 = await asyncio.gather(get_token_info(p.token0()), get_token_info(p.token1()))
t0, t1 = await asyncio.gather(p.token0(), p.token1())
amount0, amount1, (price,*_), fee = \
await asyncio.gather(ERC20(t0[0]).balanceOf(pool), ERC20(t1[0]).balanceOf(pool), p.slot0(), p.fee())
await asyncio.gather(ERC20(t0).balanceOf(pool), ERC20(t1).balanceOf(pool), p.slot0(), p.fee())
return [pool, t0, t1, fee, price, amount0, amount1]
async def write_metadata( pools, mirror_pools ):
@@ -86,14 +87,13 @@ async def write_metadata( pools, mirror_pools ):
log.info(f'wrote {filename}')
async def await_mirror(tx, pool_addr, mirror_addr, mirror_inverted ):
await tx.wait()
log.info(f'Updated {pool_addr} => {"1/" if mirror_inverted else ""}{mirror_addr}')
last_prices = {}
async def send_update(mirrorenv, pool, price):
tx = await mirrorenv.transact.updatePool(pool, price)
async def complete_update(mirrorenv, pool, price, tx):
await tx.wait()
last_prices[pool] = price
log.debug(f'Mirrored {pool} {price}')
async def main():
init_generating_metadata()
@@ -103,6 +103,7 @@ async def main():
if config.mirror_source_rpc_url is None:
log.error('Must configure mirror_source_rpc_url')
return
delay = config.polling if config.polling > 0 else 1
global source_w3
source_w3 = await create_w3(config.mirror_source_rpc_url)
pools = (config.mirror_pools or [])
@@ -114,7 +115,9 @@ async def main():
config.account = '0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba'
await blockchain.connect()
current_blockstate.set(FinalizedBlockState())
mirror_addr = config.mirror_env
log.info(f'Initializing with MirrorEnv {mirror_addr}')
if mirror_addr is None:
mirror_addr = os.environ.get('MIRRORENV')
if mirror_addr is None:
@@ -122,31 +125,68 @@ async def main():
if mirror_addr is None:
log.error('must configure mirror_env or set envioronment MIRRORENV')
return
log.info(f'Initializing with MirrorEnv {mirror_addr}')
mirrorenv = ContractProxy(mirror_addr, 'MirrorEnv')
log.debug(f'Mirroring pools {", ".join(pools)}')
pool_infos = await asyncio.gather(*[get_pool_info(pool) for pool in pools])
txs = await asyncio.gather(*[mirrorenv.transact.mirrorPool(info) for info in pool_infos])
tokens = set(i[1] for i in pool_infos).union(i[2] for i in pool_infos)
log.debug(f'Mirroring tokens')
token_infos = await asyncio.gather(*[get_token_info(t) for t in tokens], return_exceptions=True)
for token, x in zip(tokens, token_infos):
if isinstance(x, Exception):
log.error(f'Failed to mirror token {token}: {x}')
exit(1)
txs = [await mirrorenv.transact.mirrorToken(info, gas=LOTSA_GAS) for info in token_infos]
for token, x in zip(tokens, txs):
if isinstance(x, Exception):
log.error(f'Failed to mirror token {token}: {x}')
exit(1)
await asyncio.gather(*[tx.wait() for tx in txs])
log.info('Tokens deployed')
# for token in tokens:
# # log.debug(f'Mirroring token {token}')
# info = await get_token_info(token)
# tx = await mirrorenv.transact.mirrorToken(info, gas=LOTSA_GAS)
# await tx.wait()
# log.debug(f'Mirrored token {token}')
log.debug(f'Mirroring pools {", ".join(pools)}')
to_mirror = list(pool_infos)
while to_mirror:
results = await asyncio.gather(*[(await mirrorenv.transact.mirrorPool(info, gas=LOTSA_GAS)).wait()
for info in to_mirror], return_exceptions=True)
pool_exceptions = [(p,r) for p,r in zip(to_mirror,results) if isinstance(r,Exception)]
for p,r in pool_exceptions:
log.error(f'Error mirroring {p}: {r}')
to_mirror = [p for p,r in pool_exceptions]
if to_mirror:
await asyncio.sleep(delay)
mirror_pools = []
# log.debug(f'Getting pool info {" ".join(pools)}')
for pool in pools:
mirror_addr, mirror_inverted = await mirrorenv.pools(pool)
if mirror_inverted == ADDRESS_0:
raise ValueError(f'Pool {pool} was not successfully mirrored')
log.debug(f'\t{pool} => {mirror_addr} inverted={mirror_inverted}')
mirror_pools.append((mirror_addr, mirror_inverted))
await write_metadata(pools, mirror_pools)
delay = config.polling if config.polling > 0 else 1
log.info(f'Mirroring pools every {delay} seconds')
log.info(f'Updating pools every {delay} seconds')
delay = timedelta(seconds=delay)
to_update = pools
while True:
wake_up = now() + delay
prices = await asyncio.gather(*[get_pool_price(pool) for pool in pools])
updates = [send_update(mirrorenv, pool, price) for pool, price in zip(pools, prices)]
results = await asyncio.gather(*updates, return_exceptions=True)
for result, pool, price in zip(results,pools,prices):
prices = await asyncio.gather(*[get_pool_price(pool) for pool in to_update])
updates = [(pool, price, await mirrorenv.transact.updatePool(pool, price, gas=LOTSA_GAS))
for pool, price in zip(to_update, prices) if price != last_prices.get(pool)]
results = await asyncio.gather(*[complete_update(mirrorenv, pool, price, tx)
for (pool,price,tx) in updates], return_exceptions=True)
failed = []
for result, pool, price in zip(results,to_update,prices):
if isinstance(result, Exception):
log.debug(f'Could not update {pool}: {result}')
else:
log.debug(f'Mirrored {pool} {price}')
failed.append(pool)
to_update = failed if failed else pools
sleep = (wake_up - now()).total_seconds()
if sleep > 0:
await asyncio.sleep(sleep)

View File

@@ -38,6 +38,7 @@ async def create_w3(rpc_url=None, account=NARG, autosign=False):
w3 = AsyncWeb3(RetryHTTPProvider(url))
# w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed?
# w3.middleware_onion.add(simple_cache_middleware)
# log.debug(f'middleware {list(w3.middleware_onion.middlewares)}')
w3.middleware_onion.remove('attrdict')
w3.middleware_onion.add(clean_input_async, 'clean_input')
w3.eth.Contract = _make_contract(w3.eth)
@@ -128,6 +129,7 @@ class RetryHTTPProvider (AsyncHTTPProvider):
try:
async with self.in_flight:
await self.rate_allowed.wait()
# log.debug(f'Requesting RPC call {method}')
return await super().make_request(method, params)
except ClientResponseError as e:
if e.status != 429:

View File

@@ -53,7 +53,7 @@ def call_wrapper(addr, name, func):
except LookupError:
blockhash = 'latest'
try:
return await func(*args, **kwargs).call(block_identifier=blockhash)
return await func(*args).call(block_identifier=blockhash, **kwargs)
except Web3Exception as e:
e.args += addr, name
raise e
@@ -63,7 +63,7 @@ def call_wrapper(addr, name, func):
def transact_wrapper(addr, name, func):
async def f(*args, **kwargs):
try:
tx_id = await func(*args, **kwargs).transact()
tx_id = await func(*args).transact(kwargs)
except Web3Exception as e:
e.args += addr, name
raise e
@@ -77,7 +77,7 @@ def build_wrapper(addr, name, func):
account = current_account.get()
except LookupError:
raise RuntimeError(f'Cannot invoke transaction {addr}.{name}() without setting an Account.')
tx = await func(*args, **kwargs).build_transaction()
tx = await func(*args).build_transaction(kwargs)
tx['from'] = account.address
tx['nonce'] = await account.next_nonce()
signed = eth_account.Account.sign_transaction(tx, private_key=account.key)