finaldata ohlc fixes

This commit is contained in:
Tim
2024-03-18 19:43:39 -04:00
parent db76ddccac
commit a0e79b3547
9 changed files with 39 additions and 26 deletions

View File

@@ -46,7 +46,7 @@ def flush_callback():
if latest_block.get().height - block.height <= 2*confirms: if latest_block.get().height - block.height <= 2*confirms:
log.info(f'forward filling to present time') log.info(f'forward filling to present time')
for addr, data in address_metadata.items(): for addr, data in address_metadata.items():
if data['type'] == 'Pool': if data['type'] == 'Pool' and data['exchange'] >= 0:
ohlcs.light_update_all(addr, from_timestamp(block.timestamp), None) ohlcs.light_update_all(addr, from_timestamp(block.timestamp), None)
log.info("flushing OHLC's") log.info("flushing OHLC's")
ohlcs.flush() ohlcs.flush()

View File

@@ -13,6 +13,7 @@ from dexorder.util import hexstr
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class ContractTransaction: class ContractTransaction:
def __init__(self, id_bytes: bytes, rawtx: Optional[bytes] = None): def __init__(self, id_bytes: bytes, rawtx: Optional[bytes] = None):
self.id_bytes = id_bytes self.id_bytes = id_bytes
@@ -54,7 +55,7 @@ def call_wrapper(addr, name, func):
try: try:
return await func(*args, **kwargs).call(block_identifier=blockhash) return await func(*args, **kwargs).call(block_identifier=blockhash)
except Web3Exception as e: except Web3Exception as e:
log.error(f"Exception calling {addr}.{name}()") e.args += addr, name
raise e raise e
return f return f
@@ -64,7 +65,7 @@ def transact_wrapper(addr, name, func):
try: try:
tx_id = await func(*args, **kwargs).transact() tx_id = await func(*args, **kwargs).transact()
except Web3Exception as e: except Web3Exception as e:
log.error(f'Exception transacting {addr}.{name}()') e.args += addr, name
raise e raise e
return ContractTransaction(tx_id) return ContractTransaction(tx_id)
return f return f

View File

@@ -19,8 +19,6 @@ class PoolDict (TypedDict):
quote: str quote: str
fee: int fee: int
decimals: int decimals: int
approved: bool # whether this pool has only whitelisted tokens
liquidity: Optional[int]
class Pool (Base): class Pool (Base):

View File

@@ -1,6 +1,5 @@
import logging import logging
import os import os
from collections import defaultdict
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from decimal import InvalidOperation from decimal import InvalidOperation
from typing import Optional, NamedTuple, Reversible, Union, TypedDict from typing import Optional, NamedTuple, Reversible, Union, TypedDict
@@ -401,16 +400,18 @@ class OHLCRepository:
return found return found
def flush(self) -> None: def flush(self) -> None:
log.debug(f'flushing {len(self.dirty_chunks)} chunks')
for chunk in self.dirty_chunks: for chunk in self.dirty_chunks:
chunk.save() chunk.save()
self.dirty_chunks.clear() self.dirty_chunks.clear()
filepath = os.path.join(self.dir, quotes_path()) if self._quotes:
for _ in range(2): filepath = os.path.join(self.dir, quotes_path())
try: for _ in range(2):
with open(filepath, 'w') as f: try:
json.dump(self.quotes, f) with open(filepath, 'w') as f:
except FileNotFoundError: json.dump(self._quotes, f)
os.makedirs(os.path.dirname(filepath), exist_ok=True) except FileNotFoundError:
os.makedirs(os.path.dirname(filepath), exist_ok=True)
class SeriesDict (TypedDict): class SeriesDict (TypedDict):
@@ -425,7 +426,7 @@ class FinalOHLCRepository (OHLCRepository):
super().__init__(base_dir, chain_id=chain_id) super().__init__(base_dir, chain_id=chain_id)
self.current:dict[tuple[str,timedelta],NativeOHLC] = {} # current bars keyed by symbol self.current:dict[tuple[str,timedelta],NativeOHLC] = {} # current bars keyed by symbol
self.dirty_bars = set() self.dirty_bars = set()
self.series:dict[int,dict[str,SeriesDict]] = defaultdict(dict) # keyed by [chain_id][symbol] self.series:dict[str,dict[str,SeriesDict]] = {} # keyed by [str(chain_id)][symbol]
self._series_dirty = False self._series_dirty = False
def light_update_all(self, symbol: str, time: datetime, price: Optional[dec]): def light_update_all(self, symbol: str, time: datetime, price: Optional[dec]):
@@ -446,7 +447,7 @@ class FinalOHLCRepository (OHLCRepository):
prev = self.current[key] = chunk.bar_at(start) prev = self.current[key] = chunk.bar_at(start)
# log.debug(f'loaded prev bar from chunk {prev}') # log.debug(f'loaded prev bar from chunk {prev}')
if prev is None and symbol in self.quotes: if prev is None and symbol in self.quotes:
latest_bar_time = ohlc_start_time(self.quotes[symbol][0], period) latest_bar_time = ohlc_start_time(from_timestamp(self.quotes[symbol][0]), period)
prev = self.current[key] = self.get_chunk(symbol, period, latest_bar_time).bar_at(latest_bar_time) prev = self.current[key] = self.get_chunk(symbol, period, latest_bar_time).bar_at(latest_bar_time)
if prev is None: if prev is None:
# never seen before. create new bar. # never seen before. create new bar.
@@ -462,7 +463,10 @@ class FinalOHLCRepository (OHLCRepository):
bar = self.current[key] = NativeOHLC(start, price, price, price, close) bar = self.current[key] = NativeOHLC(start, price, price, price, close)
chunk.update(bar, backfill=backfill) chunk.update(bar, backfill=backfill)
self.dirty_chunks.add(chunk) self.dirty_chunks.add(chunk)
self.series[current_chain.get().chain_id][f'{key[0]}|{key[1]}'] = {'start': timestamp(start)} chain_id_str = str(current_chain.get().chain_id)
if chain_id_str not in self.series:
self.series[chain_id_str] = {}
self.series[chain_id_str][f'{key[0]}|{key[1]}'] = {'start': timestamp(start)}
self._series_dirty = True self._series_dirty = True
else: else:
updated = update_ohlc(prev, period, time, price) updated = update_ohlc(prev, period, time, price)

View File

@@ -14,7 +14,7 @@ from dexorder.blockstate import BlockDict
from dexorder.blockstate.blockdata import K, V from dexorder.blockstate.blockdata import K, V
from dexorder.blocktime import get_block_timestamp from dexorder.blocktime import get_block_timestamp
from dexorder.database.model.pool import PoolDict from dexorder.database.model.pool import PoolDict
from dexorder.metadata import generating_metadata, is_generating_metadata from dexorder.metadata import is_generating_metadata
from dexorder.tokens import get_token from dexorder.tokens import get_token
from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address
@@ -44,7 +44,7 @@ async def load_pool(address: str) -> PoolDict:
decimals = token0['decimals'] - token1['decimals'] decimals = token0['decimals'] - token1['decimals']
found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value, found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value,
base=t0, quote=t1, fee=fee, decimals=decimals) base=t0, quote=t1, fee=fee, decimals=decimals)
log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} ' log.debug(f'new UniswapV3 pool {token0["symbol"]}/{token1["symbol"]} '
f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}') f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}')
except ContractLogicError: except ContractLogicError:
pass pass

View File

@@ -80,7 +80,8 @@ class BlockStateRunner(BlockProgressor):
self.timer_period = timer_period self.timer_period = timer_period
self.queue: Queue = Queue() self.queue: Queue = Queue()
self.max_height_seen = config.backfill self.max_height_seen = config.backfill if config.backfill is None or config.backfill >= 0 \
else current_block.get().height + config.backfill # if backfill is negative then it's relative to the current block
self.running = False self.running = False

View File

@@ -5,7 +5,7 @@ from typing import Optional
from eth_abi.exceptions import InsufficientDataBytes from eth_abi.exceptions import InsufficientDataBytes
from web3.exceptions import ContractLogicError, BadFunctionCallOutput from web3.exceptions import ContractLogicError, BadFunctionCallOutput
from dexorder import ADDRESS_0 from dexorder import ADDRESS_0, config
from dexorder.addrmeta import address_metadata from dexorder.addrmeta import address_metadata
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.contract import ERC20, ContractProxy from dexorder.contract import ERC20, ContractProxy
@@ -48,7 +48,7 @@ async def load_token(address: str) -> Optional[TokenDict]:
log.debug(f'new token {name} {symbol} {address}') log.debug(f'new token {name} {symbol} {address}')
chain_id = current_chain.get().chain_id chain_id = current_chain.get().chain_id
td = TokenDict(type='Token', chain=chain_id, address=address, td = TokenDict(type='Token', chain=chain_id, address=address,
name=name, symbol=symbol, decimals=decimals, approved=False) name=name, symbol=symbol, decimals=decimals, approved=config.metadata is None)
md = get_metadata(address, chain_id=chain_id) md = get_metadata(address, chain_id=chain_id)
if md is not None: if md is not None:
td['approved'] = True td['approved'] = True

View File

@@ -1,3 +1,4 @@
from collections import defaultdict
from decimal import Decimal from decimal import Decimal
from json import JSONEncoder from json import JSONEncoder
from typing import Any from typing import Any
@@ -10,13 +11,14 @@ from dexorder import DELETE
def _serialize(v): def _serialize(v):
if type(v) is HexBytes: t = type(v)
if t is HexBytes:
return v.hex() return v.hex()
elif type(v) is bytes: elif t is bytes:
return '0x' + v.hex() return '0x' + v.hex()
elif type(v) is AttributeDict: elif t is AttributeDict:
return v.__dict__ return v.__dict__
elif type(v) is Decimal: elif t is Decimal:
return f'{v:f}' return f'{v:f}'
elif v is DELETE: elif v is DELETE:
return None return None

View File

@@ -51,7 +51,14 @@ class BlockWalker (BlockProgressor):
kv_key = f'walker_height|{chain_id}' kv_key = f'walker_height|{chain_id}'
with db.session: with db.session:
processed_height = db.kv.get(kv_key, config.backfill) processed_height = db.kv.get(kv_key)
if processed_height is None:
if config.backfill > 0:
processed_height = config.backfill
else:
cur = await w3.eth.get_block_number()
processed_height = cur + config.backfill if config.backfill < 0 else cur
log.info(f'walker starting at block {processed_height}') log.info(f'walker starting at block {processed_height}')
last_flush = processed_height if self.flush_type == 'blocks' else now() if self.flush_type == 'time' else None last_flush = processed_height if self.flush_type == 'blocks' else now() if self.flush_type == 'time' else None
prev_height = None prev_height = None