diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py index a057de9..38f5bb6 100644 --- a/src/dexorder/bin/finaldata.py +++ b/src/dexorder/bin/finaldata.py @@ -46,7 +46,7 @@ def flush_callback(): if latest_block.get().height - block.height <= 2*confirms: log.info(f'forward filling to present time') for addr, data in address_metadata.items(): - if data['type'] == 'Pool': + if data['type'] == 'Pool' and data['exchange'] >= 0: ohlcs.light_update_all(addr, from_timestamp(block.timestamp), None) log.info("flushing OHLC's") ohlcs.flush() diff --git a/src/dexorder/contract/contract_proxy.py b/src/dexorder/contract/contract_proxy.py index 1628bd1..18f9807 100644 --- a/src/dexorder/contract/contract_proxy.py +++ b/src/dexorder/contract/contract_proxy.py @@ -13,6 +13,7 @@ from dexorder.util import hexstr log = logging.getLogger(__name__) + class ContractTransaction: def __init__(self, id_bytes: bytes, rawtx: Optional[bytes] = None): self.id_bytes = id_bytes @@ -54,7 +55,7 @@ def call_wrapper(addr, name, func): try: return await func(*args, **kwargs).call(block_identifier=blockhash) except Web3Exception as e: - log.error(f"Exception calling {addr}.{name}()") + e.args += addr, name raise e return f @@ -64,7 +65,7 @@ def transact_wrapper(addr, name, func): try: tx_id = await func(*args, **kwargs).transact() except Web3Exception as e: - log.error(f'Exception transacting {addr}.{name}()') + e.args += addr, name raise e return ContractTransaction(tx_id) return f diff --git a/src/dexorder/database/model/pool.py b/src/dexorder/database/model/pool.py index 58a98d1..fea0e75 100644 --- a/src/dexorder/database/model/pool.py +++ b/src/dexorder/database/model/pool.py @@ -19,8 +19,6 @@ class PoolDict (TypedDict): quote: str fee: int decimals: int - approved: bool # whether this pool has only whitelisted tokens - liquidity: Optional[int] class Pool (Base): diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index 589cdf7..ec5023b 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -1,6 +1,5 @@ import logging import os -from collections import defaultdict from datetime import datetime, timedelta, timezone from decimal import InvalidOperation from typing import Optional, NamedTuple, Reversible, Union, TypedDict @@ -401,16 +400,18 @@ class OHLCRepository: return found def flush(self) -> None: + log.debug(f'flushing {len(self.dirty_chunks)} chunks') for chunk in self.dirty_chunks: chunk.save() self.dirty_chunks.clear() - filepath = os.path.join(self.dir, quotes_path()) - for _ in range(2): - try: - with open(filepath, 'w') as f: - json.dump(self.quotes, f) - except FileNotFoundError: - os.makedirs(os.path.dirname(filepath), exist_ok=True) + if self._quotes: + filepath = os.path.join(self.dir, quotes_path()) + for _ in range(2): + try: + with open(filepath, 'w') as f: + json.dump(self._quotes, f) + except FileNotFoundError: + os.makedirs(os.path.dirname(filepath), exist_ok=True) class SeriesDict (TypedDict): @@ -425,7 +426,7 @@ class FinalOHLCRepository (OHLCRepository): super().__init__(base_dir, chain_id=chain_id) self.current:dict[tuple[str,timedelta],NativeOHLC] = {} # current bars keyed by symbol self.dirty_bars = set() - self.series:dict[int,dict[str,SeriesDict]] = defaultdict(dict) # keyed by [chain_id][symbol] + self.series:dict[str,dict[str,SeriesDict]] = {} # keyed by [str(chain_id)][symbol] self._series_dirty = False def light_update_all(self, symbol: str, time: datetime, price: Optional[dec]): @@ -446,7 +447,7 @@ class FinalOHLCRepository (OHLCRepository): prev = self.current[key] = chunk.bar_at(start) # log.debug(f'loaded prev bar from chunk {prev}') if prev is None and symbol in self.quotes: - latest_bar_time = ohlc_start_time(self.quotes[symbol][0], period) + latest_bar_time = ohlc_start_time(from_timestamp(self.quotes[symbol][0]), period) prev = self.current[key] = self.get_chunk(symbol, period, latest_bar_time).bar_at(latest_bar_time) if prev is None: # never seen before. create new bar. @@ -462,7 +463,10 @@ class FinalOHLCRepository (OHLCRepository): bar = self.current[key] = NativeOHLC(start, price, price, price, close) chunk.update(bar, backfill=backfill) self.dirty_chunks.add(chunk) - self.series[current_chain.get().chain_id][f'{key[0]}|{key[1]}'] = {'start': timestamp(start)} + chain_id_str = str(current_chain.get().chain_id) + if chain_id_str not in self.series: + self.series[chain_id_str] = {} + self.series[chain_id_str][f'{key[0]}|{key[1]}'] = {'start': timestamp(start)} self._series_dirty = True else: updated = update_ohlc(prev, period, time, price) diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index 1e274d5..c918c2b 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -14,7 +14,7 @@ from dexorder.blockstate import BlockDict from dexorder.blockstate.blockdata import K, V from dexorder.blocktime import get_block_timestamp from dexorder.database.model.pool import PoolDict -from dexorder.metadata import generating_metadata, is_generating_metadata +from dexorder.metadata import is_generating_metadata from dexorder.tokens import get_token from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address @@ -44,7 +44,7 @@ async def load_pool(address: str) -> PoolDict: decimals = token0['decimals'] - token1['decimals'] found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value, base=t0, quote=t1, fee=fee, decimals=decimals) - log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} ' + log.debug(f'new UniswapV3 pool {token0["symbol"]}/{token1["symbol"]} ' f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}') except ContractLogicError: pass diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 04759b6..656f233 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -80,7 +80,8 @@ class BlockStateRunner(BlockProgressor): self.timer_period = timer_period self.queue: Queue = Queue() - self.max_height_seen = config.backfill + self.max_height_seen = config.backfill if config.backfill is None or config.backfill >= 0 \ + else current_block.get().height + config.backfill # if backfill is negative then it's relative to the current block self.running = False diff --git a/src/dexorder/tokens.py b/src/dexorder/tokens.py index f00e6dd..66c7278 100644 --- a/src/dexorder/tokens.py +++ b/src/dexorder/tokens.py @@ -5,7 +5,7 @@ from typing import Optional from eth_abi.exceptions import InsufficientDataBytes from web3.exceptions import ContractLogicError, BadFunctionCallOutput -from dexorder import ADDRESS_0 +from dexorder import ADDRESS_0, config from dexorder.addrmeta import address_metadata from dexorder.base.chain import current_chain from dexorder.contract import ERC20, ContractProxy @@ -48,7 +48,7 @@ async def load_token(address: str) -> Optional[TokenDict]: log.debug(f'new token {name} {symbol} {address}') chain_id = current_chain.get().chain_id td = TokenDict(type='Token', chain=chain_id, address=address, - name=name, symbol=symbol, decimals=decimals, approved=False) + name=name, symbol=symbol, decimals=decimals, approved=config.metadata is None) md = get_metadata(address, chain_id=chain_id) if md is not None: td['approved'] = True diff --git a/src/dexorder/util/json.py b/src/dexorder/util/json.py index fccfbdc..efa709b 100644 --- a/src/dexorder/util/json.py +++ b/src/dexorder/util/json.py @@ -1,3 +1,4 @@ +from collections import defaultdict from decimal import Decimal from json import JSONEncoder from typing import Any @@ -10,13 +11,14 @@ from dexorder import DELETE def _serialize(v): - if type(v) is HexBytes: + t = type(v) + if t is HexBytes: return v.hex() - elif type(v) is bytes: + elif t is bytes: return '0x' + v.hex() - elif type(v) is AttributeDict: + elif t is AttributeDict: return v.__dict__ - elif type(v) is Decimal: + elif t is Decimal: return f'{v:f}' elif v is DELETE: return None diff --git a/src/dexorder/walker.py b/src/dexorder/walker.py index ea296ec..dbcd918 100644 --- a/src/dexorder/walker.py +++ b/src/dexorder/walker.py @@ -51,7 +51,14 @@ class BlockWalker (BlockProgressor): kv_key = f'walker_height|{chain_id}' with db.session: - processed_height = db.kv.get(kv_key, config.backfill) + processed_height = db.kv.get(kv_key) + if processed_height is None: + if config.backfill > 0: + processed_height = config.backfill + else: + cur = await w3.eth.get_block_number() + processed_height = cur + config.backfill if config.backfill < 0 else cur + log.info(f'walker starting at block {processed_height}') last_flush = processed_height if self.flush_type == 'blocks' else now() if self.flush_type == 'time' else None prev_height = None