metadata db load fixes

This commit is contained in:
Tim
2024-04-18 17:17:19 -04:00
parent 84e194536a
commit 756d176a18
5 changed files with 27 additions and 16 deletions

View File

@@ -44,8 +44,8 @@ async def main():
log.error('backfill requires a memcache server') log.error('backfill requires a memcache server')
await memcache.connect() await memcache.connect()
redis_state = RedisState([recent_ohlcs]) # NOTE: ONLY the ohlc's are pushed to Redis. We do not want to touch anything else. redis_state = RedisState([recent_ohlcs]) # NOTE: ONLY the ohlc's are pushed to Redis. We do not want to touch anything else.
db.connect()
if db: if db:
db.connect()
db_state = DbState(BlockData.by_opt('db')) db_state = DbState(BlockData.by_opt('db'))
with db.session: with db.session:
state = await db_state.load() state = await db_state.load()

View File

@@ -1,7 +1,7 @@
import logging import logging
from asyncio import CancelledError from asyncio import CancelledError
from dexorder import db, blockchain from dexorder import db, blockchain, config
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute from dexorder.bin.executable import execute
from dexorder.blockstate import current_blockstate from dexorder.blockstate import current_blockstate
@@ -78,8 +78,8 @@ async def main():
if memcache: if memcache:
await memcache.connect() await memcache.connect()
redis_state = RedisState(BlockData.by_opt('redis')) redis_state = RedisState(BlockData.by_opt('redis'))
db.connect()
if db: if db:
db.connect()
db_state = DbState(BlockData.by_opt('db')) db_state = DbState(BlockData.by_opt('db'))
with db.session: with db.session:
state = await db_state.load() state = await db_state.load()

View File

@@ -45,9 +45,10 @@ class Db:
def __init__(self, db_url_config_key='db_url'): def __init__(self, db_url_config_key='db_url'):
self.kv = Kv() self.kv = Kv()
self.db_url_config_key = db_url_config_key self.db_url_config_key = db_url_config_key
self.connected = False
def __bool__(self): def __bool__(self):
return bool(config.db_url) return self.connected
def transaction(self) -> SessionTransaction: def transaction(self) -> SessionTransaction:
""" """
@@ -95,9 +96,16 @@ class Db:
# noinspection PyShadowingNames # noinspection PyShadowingNames
def connect(self, url=None, migrate=True, reconnect=False, dump_sql=None): def connect(self, url=None, migrate=True, reconnect=False, dump_sql=None):
if _engine.get() is not None and not reconnect: if _engine.get() is not None and not reconnect:
return return None
if url is None: if url is None:
url = config[self.db_url_config_key] url = config.get(self.db_url_config_key)
if not url:
# noinspection PyTypeChecker
_engine.set(None)
# noinspection PyTypeChecker
_session.set(None)
self.connected = False
return None
if dump_sql is None: if dump_sql is None:
dump_sql = config.dump_sql dump_sql = config.dump_sql
engine = sqlalchemy.create_engine(url, echo=dump_sql, json_serializer=json.dumps, json_deserializer=json.loads) engine = sqlalchemy.create_engine(url, echo=dump_sql, json_serializer=json.dumps, json_deserializer=json.loads)
@@ -109,7 +117,8 @@ class Db:
for row in result: for row in result:
log.info(f'{url} database revision {row[0]}') log.info(f'{url} database revision {row[0]}')
_engine.set(engine) _engine.set(engine)
return db self.connected = True
return self
raise Exception(f'{url} database version not found') raise Exception(f'{url} database version not found')
db = Db() db = Db()

View File

@@ -35,10 +35,11 @@ async def load_pool(address: str) -> OldPoolDict:
found = None found = None
chain_id = current_chain.get().id chain_id = current_chain.get().id
# todo other exchanges # todo other exchanges
pool = db.session.get(Pool, (chain_id, address)) if db:
if pool is not None: pool = db.session.get(Pool, (chain_id, address))
return OldPoolDict(type='Pool', chain=chain_id, address=address, exchange=pool.exchange.value, if pool is not None:
base=pool.base, quote=pool.quote, fee=pool.fee, decimals=pool.decimals) return OldPoolDict(type='Pool', chain=chain_id, address=address, exchange=pool.exchange.value,
base=pool.base, quote=pool.quote, fee=pool.fee, decimals=pool.decimals)
try: try:
v3 = UniswapV3Pool(address) v3 = UniswapV3Pool(address)
t0, t1 = await asyncio.gather(v3.token0(), v3.token1()) t0, t1 = await asyncio.gather(v3.token0(), v3.token1())

View File

@@ -29,11 +29,12 @@ async def get_token(address) -> Optional[OldTokenDict]:
async def load_token(address: str) -> Optional[OldTokenDict]: async def load_token(address: str) -> Optional[OldTokenDict]:
contract = ERC20(address) contract = ERC20(address)
chain_id = current_chain.get().id chain_id = current_chain.get().id
token = db.session.get(Token, (chain_id, address)) if db:
if token is not None: token = db.session.get(Token, (chain_id, address))
return OldTokenDict(type='Token', chain=chain_id, address=address, if token is not None:
name=token.name, symbol=token.symbol, decimals=token.decimals, return OldTokenDict(type='Token', chain=chain_id, address=address,
approved=token.approved) name=token.name, symbol=token.symbol, decimals=token.decimals,
approved=token.approved)
async def get_string_or_bytes32(func_name): async def get_string_or_bytes32(func_name):
try: try:
result = await getattr(contract, func_name)() result = await getattr(contract, func_name)()