diff --git a/bin/update-deps b/bin/update-deps new file mode 100755 index 0000000..82b5eba --- /dev/null +++ b/bin/update-deps @@ -0,0 +1,3 @@ +#!/bin/bash + +pip install --upgrade pip && pip install --upgrade -r requirements.txt && pip freeze > requirements-lock.txt diff --git a/conf/arbsep/dexorder-arbsep.toml b/conf/arbsep/dexorder-arbsep.toml index f6609ff..045e0b1 100644 --- a/conf/arbsep/dexorder-arbsep.toml +++ b/conf/arbsep/dexorder-arbsep.toml @@ -2,26 +2,3 @@ metadata='metadata.json' # the Dockerfile will move metadata-finaldata.json int account = '${accounts.admin}' # todo switch back to accounts.gas rpc_url = '${rpc_urls.arbsep_alchemy}' ws_url = '${rpc_urls.arbsep_alchemy_ws}' - -mirror_pools = [ - # Arbitrum Pools - '0xC6962004f452bE9203591991D15f6b388e09E8D0', # WETH/USDC 0.05% - # '0x2f5e87C9312fa29aed5c179E456625D79015299c', # WBTC/WETH 0.05% - # '0x0d94947374cbc779a0FB4D1bfF795C0Af6Dfae25', # USDC/UNI 1.00% - # '0x689C96ceAb93f5E131631D225D75DeA3fD37747E', # WBTC/ARB 0.30% - # '0x0E4831319A50228B9e450861297aB92dee15B44F', # WBTC/USDC 0.05% - # '0x2038eEAa7100E08739352a37Ed67852E8529E8ED', # ARB/UNI 1.00% - # '0x468b88941e7Cc0B88c1869d68ab6b570bCEF62Ff', # WETH/LINK 0.30% - # '0xC24f7d8E51A64dc1238880BD00bb961D54cbeb29', # WETH/UNI 0.30% - # '0xbBe36e6f0331C6a36AB44Bc8421E28E1a1871C1e', # USDC/LINK 0.30% - # '0xa79fD76cA2b24631Ec3151f10c0660a30Bc946E7', # WBTC/LINK 0.30% - # '0xb0f6cA40411360c03d41C5fFc5F179b8403CdcF8', # ARB/USDC 0.05% - # '0xC6F780497A95e246EB9449f5e4770916DCd6396A', # WETH/ARB 0.05% - # '0x8b6149aF984140BD3F8e158CcDCD05984a4ad0f5', # ARB/LINK 0.30% - # '0xEd701Ba0cec723d85B7d96c80C21148E49D2Bf05', # LINK/UNI 1.00% - - # Base Pools - # '0xd0b53D9277642d899DF5C87A3966A349A798F224', # WETH/USDC - # '0xb4CB800910B228ED3d0834cF79D697127BBB00e5', # WETH/USDC - -] diff --git a/conf/mirrorprice/dexorder-mirrorprice.toml b/conf/mirrorprice/dexorder-mirrorprice.toml new file mode 100644 index 0000000..2f46433 --- /dev/null +++ b/conf/mirrorprice/dexorder-mirrorprice.toml @@ -0,0 +1,27 @@ +metadata='metadata.json' # the Dockerfile will move metadata-finaldata.json into positon +account = '${accounts.admin}' # todo switch back to accounts.gas +rpc_url = '${rpc_urls.arbsep_alchemy}' +mirror_source_rpc_url='${rpc_urls.arbsep_alchemy}' + +mirror_pools = [ + # Arbitrum Pools + '0xC6962004f452bE9203591991D15f6b388e09E8D0', # WETH/USDC 0.05% + # '0x2f5e87C9312fa29aed5c179E456625D79015299c', # WBTC/WETH 0.05% + # '0x0d94947374cbc779a0FB4D1bfF795C0Af6Dfae25', # USDC/UNI 1.00% + # '0x689C96ceAb93f5E131631D225D75DeA3fD37747E', # WBTC/ARB 0.30% + # '0x0E4831319A50228B9e450861297aB92dee15B44F', # WBTC/USDC 0.05% + # '0x2038eEAa7100E08739352a37Ed67852E8529E8ED', # ARB/UNI 1.00% + # '0x468b88941e7Cc0B88c1869d68ab6b570bCEF62Ff', # WETH/LINK 0.30% + # '0xC24f7d8E51A64dc1238880BD00bb961D54cbeb29', # WETH/UNI 0.30% + # '0xbBe36e6f0331C6a36AB44Bc8421E28E1a1871C1e', # USDC/LINK 0.30% + # '0xa79fD76cA2b24631Ec3151f10c0660a30Bc946E7', # WBTC/LINK 0.30% + # '0xb0f6cA40411360c03d41C5fFc5F179b8403CdcF8', # ARB/USDC 0.05% + # '0xC6F780497A95e246EB9449f5e4770916DCd6396A', # WETH/ARB 0.05% + # '0x8b6149aF984140BD3F8e158CcDCD05984a4ad0f5', # ARB/LINK 0.30% + # '0xEd701Ba0cec723d85B7d96c80C21148E49D2Bf05', # LINK/UNI 1.00% + + # Base Pools + # '0xd0b53D9277642d899DF5C87A3966A349A798F224', # WETH/USDC + # '0xb4CB800910B228ED3d0834cF79D697127BBB00e5', # WETH/USDC + +] diff --git a/conf/mirrorprice/logging-mirrorprice.toml b/conf/mirrorprice/logging-mirrorprice.toml new file mode 100644 index 0000000..376ed2b --- /dev/null +++ b/conf/mirrorprice/logging-mirrorprice.toml @@ -0,0 +1,20 @@ +# https://docs.python.org/3/library/logging.config.html#logging-config-dictschema +version=1 + +[loggers.''] +level='INFO' +handlers=['console',] + +[loggers.dexorder] +level='DEBUG' + +[handlers.console] +class='logging.StreamHandler' +formatter='notime' +stream='ext://sys.stdout' + +[formatters.notime] +# https://docs.python.org/3/library/logging.html#logrecord-attributes +format='%(levelname)s %(name)s %(message)s' +# https://docs.python.org/3/library/time.html#time.strftime +datefmt='%Y-%m-%d %H:%M:%S' diff --git a/requirements-lock.txt b/requirements-lock.txt index fdf08c5..f5b4f7d 100644 --- a/requirements-lock.txt +++ b/requirements-lock.txt @@ -2,15 +2,23 @@ aiohttp==3.9.5 aiosignal==1.3.1 alembic==1.13.2 antlr4-python3-runtime==4.9.3 +asn1crypto==1.5.1 async-lru==2.0.4 attrs==23.2.0 +bip-utils==2.9.3 bitarray==2.9.2 cachetools==5.4.0 +cbor2==5.6.4 certifi==2024.2.2 +cffi==1.16.0 charset-normalizer==3.3.2 ckzg==1.0.1 +coincurve==20.0.0 +crcmod==1.7 cytoolz==0.12.3 defaultlist==1.0.0 +ecdsa==0.19.0 +ed25519-blake2b==1.4.1 eth-account==0.11.2 eth-bloom==3.0.1 eth-hash==0.7.0 @@ -37,11 +45,14 @@ orjson==3.10.6 parsimonious==0.10.0 protobuf==5.26.1 psycopg2-binary==2.9.9 +py-sr25519-bindings==0.2.0 +pycparser==2.22 pycryptodome==3.20.0 +PyNaCl==1.5.0 python-dateutil==2.9.0.post0 pyunormalize==15.1.0 PyYAML==6.0.1 -redis==5.0.7 +redis==5.0.8 referencing==0.35.0 regex==2024.4.28 requests==2.31.0 diff --git a/src/dexorder/bin/dice_seed.py b/src/dexorder/bin/dice_seed.py new file mode 100644 index 0000000..379af44 --- /dev/null +++ b/src/dexorder/bin/dice_seed.py @@ -0,0 +1,54 @@ +# +# This script generates a BIP-39 24-word key from physical dice rolls (1-6) +# + +KEY_LENGTH=128 # use this to select the size of your seed: 128 (12-word), 160, 192, 224, 256 (24-word) + +from bip_utils import Bip39MnemonicEncoder +from bitarray import bitarray + +print('Throw dice and enter the numbers until there\'s enough:') + +entropy = bitarray() + +while True: + i = input() + inverse = 0 + + def bit(b): + b0 = b ^ inverse + # print(b0, end='') + entropy.append(b0) + + def bits(b0, b1): + bit(b0); bit(b1) + + try: + i = int(i) + assert 1 <= i <= 6 + if i == 1: + bits(0,1) + elif i == 2: + bits(1,0) + elif i == 3: + bits(1,1) + elif i == 4: + bits(0,0) + # we retain entropy from the 5 and 6 rolls by flipping or not flipping the inversion flag. this does not + # generate bits but it increases entropy if there is any nonrandomness in the dice. + elif i == 5: + pass + elif i == 6: + inverse = 1 if inverse == 0 else 0 + + if len(entropy) > KEY_LENGTH: + seed = bytes(entropy[:KEY_LENGTH]) + print(seed.hex()) + mnem = Bip39MnemonicEncoder().Encode(seed) + print(mnem) + break + else: + print(f'{len(entropy):>3}/{KEY_LENGTH} bits') + + except: + print('Input ignored. Enter a number 1-6:') diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py index 5ce7034..d39a3ed 100644 --- a/src/dexorder/bin/finaldata.py +++ b/src/dexorder/bin/finaldata.py @@ -15,7 +15,7 @@ from dexorder.blocks import get_block_timestamp, get_block from dexorder.blockstate.fork import current_fork from dexorder.configuration import parse_args from dexorder.contract import get_contract_event -from dexorder.ohlc import FinalOHLCRepository +from dexorder.final_ohlc import FinalOHLCRepository from dexorder.pools import get_uniswap_data from dexorder.util import hexstr from dexorder.util.shutdown import fatal @@ -23,8 +23,7 @@ from dexorder.walker import BlockWalker log = logging.getLogger('dexorder') - -ohlcs = FinalOHLCRepository() +ohlcs: FinalOHLCRepository async def handle_backfill_uniswap_swaps(swaps: list[EventData]): @@ -39,7 +38,8 @@ async def handle_backfill_uniswap_swaps(swaps: list[EventData]): if data is not None: pool, time, price = data # log.debug(f'OHLC {pool["address"]} {time} {price}') - ohlcs.light_update_all(pool['address'], time, price) + ohlcs.update(pool['address'], time, price) + async def flush_callback(): # start = now() @@ -54,17 +54,20 @@ async def flush_callback(): log.info(f'forward filling to present time') for addr, data in address_metadata.items(): if data['type'] == 'Pool' and data['exchange'] >= 0: - ohlcs.light_update_all(addr, time, None) + ohlcs.update(addr, time, None) log.info("flushing OHLC's") ohlcs.flush() log.info(f'backfill completed through block {block.height} {time:%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}') + async def main(): logging.basicConfig(level=logging.INFO, stream=sys.stdout) log.setLevel(logging.DEBUG) parse_args() - if config.ohlc_dir is None: - fatal('an ohlc_dir must be configured') + if not config.ohlc_dir: + fatal('Must configure ohlc_dir') + global ohlcs + ohlcs = FinalOHLCRepository() await blockchain.connect() walker = BlockWalker(flush_callback, timedelta(seconds=config.walker_flush_interval)) walker.add_event_trigger(handle_backfill_uniswap_swaps, diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index e4a749f..1fed0e6 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -20,6 +20,7 @@ class Config: metadata: Optional[str] = None ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk + chunk_cache_size: int = 128 # Number of pools that have their OHLC chunks cached concurrent_rpc_connections: int = 4 parallel_logevent_queries: bool = True diff --git a/src/dexorder/final_ohlc.py b/src/dexorder/final_ohlc.py new file mode 100644 index 0000000..4efa1f2 --- /dev/null +++ b/src/dexorder/final_ohlc.py @@ -0,0 +1,317 @@ +import logging +import os +from datetime import timedelta, datetime, timezone +from typing import Optional + +from cachetools import LFUCache + +from dexorder import dec, timestamp, config, from_timestamp +from dexorder.base.chain import current_chain +from dexorder.ohlc import OHLC_PERIODS, period_name, ohlc_start_time + +log = logging.getLogger(__name__) + +# This is a parallel OHLC system that processes only known-final data, allowing it to be much more efficient +# than the leading edge reorgable OHLC's. +# +# See dexorder/doc/ohlc.md +# + + +class OHLCFilePath: + def __init__(self, symbol: str, period: timedelta, time: datetime): + self.symbol = symbol + self.period = period + name = period_name(period) + self.filepath = f'{symbol}/{name}/' + if period < timedelta(minutes=15): + # one file per day is the smallest resolution + # log.debug(f'{name} is daily') + self.start = start = datetime(time.year, time.month, time.day, tzinfo=timezone.utc) + self.end = self.start + timedelta(days=1) + self.file_interval = timedelta(days=1) + self.filepath += f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.csv' + elif period < timedelta(hours=8): + # one file per month + # log.debug(f'{name} is monthly') + self.start = start = datetime(time.year, time.month, 1, tzinfo=timezone.utc) + end_month = time.month + 1 + end_year = time.year + if end_month == 13: + end_month = 1 + end_year += 1 + self.end = datetime(end_year, end_month, 1, tzinfo=timezone.utc) + self.file_interval = timedelta(days=32) # it's ok to add a little more because we will find the start time of the new file. + self.filepath += f'{start.year}/{start.month}/{symbol}-{name}-{start:%Y%m}.csv' + elif period < timedelta(days=7): + # one file per year + # log.debug(f'{name} is yearly') + self.start = start = datetime(time.year, 1, 1, tzinfo=timezone.utc) + self.end = datetime(time.year+1, 1, 1, tzinfo=timezone.utc) + self.file_interval = timedelta(days=366) + self.filepath += f'{start.year}/{symbol}-{name}-{start:%Y%m}.csv' + else: + # weeklies get one file for everything + # log.debug(f'{name} is a single file') + self.start = None + self.end = None + self.file_interval = None + self.filepath += f'{symbol}-{name}.csv' + + + def next(self): + return OHLCFilePath(self.symbol, self.period, self.start + self.file_interval) + + def __repr__(self): + return self.filepath + + def __hash__(self): + return hash(self.filepath) + + def __eq__(self, other): + return other == self.filepath or isinstance(other, OHLCFilePath) and self.filepath == other.filepath + + +class OHLCFile: + @staticmethod + def get(base_dir: str, filepath: OHLCFilePath): + key = base_dir, filepath + if key not in OHLCFile.cache: + OHLCFile.cache[key] = OHLCFile(base_dir, filepath) + return OHLCFile.cache[key] + + def __init__(self, base_dir: str, filepath: OHLCFilePath): + self.base_dir = base_dir + self.filepath = filepath + self.file = None + self._cur = None + self._pending = None + self._final_rows = [] + + class OHLCFileCache(LFUCache[tuple[str,OHLCFilePath], 'OHLCFile']): + def popitem(self): + key, item = super().popitem() + item.close() + + cache = OHLCFileCache(len(OHLC_PERIODS) * config.chunk_cache_size) + + @property + def filename(self): + return os.path.join(self.base_dir, self.filepath.filepath) + + @property + def timestamp(self): + try: + return self.cur[0] + except TypeError: + raise ValueError('cur is None') + + @property + def price(self): + try: + return self.cur[-1] + except TypeError: + raise ValueError('cur is None') + + @property + def cur(self): + return self._pending if self._pending is not None else self._cur + + @cur.setter + def cur(self, value): + self._pending = value + + def update(self, time: datetime, price: dec): + ts = timestamp(ohlc_start_time(time,self.filepath.period)) + if self.file is None: + self._load(ts) + if self.cur is None: + # nothing yet. simple time+price + self.cur = ts, price + elif self.cur[0] < ts: + # the current bar was an old timestamp. Advance bars. + self._final_rows.append(self.cur) + # new bar time+price + self.cur = ts, price + elif len(self.cur) == 2: + self.cur = *self.cur, price + elif len(self.cur) == 3: + t, o, c = self.cur + self.cur = t, o, max(o,c,price), min(o,c,price), price + else: + t, o, h, line, c = self.cur + self.cur = t, o, max(h,line,price), min(h,line,price), price + + @staticmethod + def row_bytes(row): + return (','.join(str(c) for c in row)+'\n').encode('ascii') + + def flush(self): + # first we write the "final" rows which means rows that have been closed and will get no more data. + for row in self._final_rows: + self.file.write(OHLCFile.row_bytes(row)) + # apply any pending changes to the current row + if self._pending is not None: + self._cur = self._pending + self._pending = None + # write the current row + if self._cur is not None: + data = OHLCFile.row_bytes(self._cur) + self.file.write(data) + # rewind our file cursor to the beginning of the current row + self.file.seek(len(data), os.SEEK_END) + self.file.flush() + + def _load(self, earliest_change): + try: + self.file = open(self.filename, 'br+') # br+ is binary read+write + except FileNotFoundError: + # no existing file + os.makedirs(os.path.dirname(self.filename), exist_ok=True) + self.file = open(self.filename, 'bw') + else: + # load existing file + line_number = 0 + row = None + prev_line = None + offset = 0 # this will point to the start of the last row + for line in self.file.readlines(): # all files should be small and can load at once + line_number += 1 + row = line.decode('ascii').strip().split(',') + if not row[0]: # empty line + continue + if prev_line is not None: + # advance cursor past the previous row + offset += len(prev_line) + if int(row[0]) >= earliest_change: + # we are going to replace this row so stop walking forward and truncate the rest of the file + self.file.truncate(offset) + break + prev_line = line + if row is not None: + self._cur = [int(row[0]), *(dec(p) for p in row[1:])] # convert to int timestamp and dec prices + # set the file's write pointer the start of the final row + self.file.seek(offset) + # self._cur is now either None (empty file) or points to a natively-typed list representing the last row + + def close(self): + self.file.close() + + +class OHLCFileSeries: + instances = {} + + @staticmethod + def get(base_dir: str, symbol: str): + key = base_dir, symbol + if key not in OHLCFileSeries.instances: + OHLCFileSeries.instances[key] = OHLCFileSeries(base_dir,symbol) + return OHLCFileSeries.instances[key] + + def __init__(self, base_dir: str, symbol: str): + self.base_dir = base_dir + self.symbol = symbol + self.series_start: Optional[datetime] = None # timestamp of the first datum in the series + self.write_offset: Optional[int] = None + self.last_flush: Optional[tuple[datetime,dec]] = None + self.quote_file = None + self.dirty_files = set() + self.quote: Optional[tuple[datetime,dec]] = None + + def update(self, time: datetime, price: dec): + # + # load quote file + # + if self.quote_file is None: + + quote_filename = os.path.join(self.base_dir, self.symbol, 'quote.csv') + try: + self.quote_file = open(quote_filename, 'br+') + # load existing quote file + line = self.quote_file.read().decode('ascii') + except FileNotFoundError: + os.makedirs(os.path.dirname(quote_filename), exist_ok=True) + self.quote_file = open(quote_filename, 'bw') + line = None + if line: + start, old_time, old_price = line.split(',') + self.series_start = from_timestamp(int(start)) + # position the write cursor at the start of the second column so we can write the latest quote quickly + self.write_offset = len(start) + 1 # after the start time bytes and comma + self.quote_file.seek(self.write_offset) + self.last_flush = from_timestamp(int(old_time)), dec(old_price) + else: + # initialize new quote file with our own series_start time + self.quote_file.write((str(timestamp(time)) + ',').encode('ascii')) + self.write_offset = 0 + self.last_flush = None + + # + # forward-fill OHLC files that would otherwise be empty/skipped + # + if self.last_flush is not None: + for period in OHLC_PERIODS: + # get the path to the file that was last flushed + t, p = self.last_flush + path = OHLCFilePath(self.symbol, period, t) + while path.end and path.end < time: + path = path.next() + # initialize the new file using the carried-forward price + file = OHLCFile.get(self.base_dir, path) + file.update(file.filepath.start, p) # set file opening price + self.dirty_files.add(file) + + self.quote = time, price + + if self.series_start is None: + self.series_start = time + for period in OHLC_PERIODS: + file = OHLCFile.get(self.base_dir, OHLCFilePath(self.symbol, period, time)) + file.update(time, price) + self.dirty_files.add(file) + + def flush(self): + if self.quote is None: + log.warning('OHLCFileSeries was flushed without having any updated data.') + return + time, price = self.quote + ts = timestamp(time) + + # + # flush dirty OHLC files + # + for file in self.dirty_files: + file.flush() + self.dirty_files.clear() + + # + # flush quote file + # + self.quote_file.write(f'{ts},{price:f}'.encode('ascii')) + self.quote_file.flush() + self.quote_file.seek(self.write_offset) + + # remember where we were so we can forward-fill again next time + self.last_flush = self.quote + + +class FinalOHLCRepository: + """ + Used for backfill when all data is known to be final. Keeps a simple table of current NativeOHLC candles. + """ + def __init__(self): + assert config.ohlc_dir + self.dirty_series = set() + + def update(self, symbol: str, time: datetime, price: Optional[dec]): + chain_id = current_chain.get().id + base_dir = os.path.join(config.ohlc_dir, str(chain_id)) + series = OHLCFileSeries.get(base_dir, symbol) + series.update(time, price) + self.dirty_series.add(series) + + def flush(self) -> None: + for series in self.dirty_series: + series.flush() + self.dirty_series.clear() + diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index 5a940e7..74bd748 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -1,17 +1,25 @@ +# +# NOTE: There are two OHLC systems. This one is for recent OHLC series which can get reorged in the backend. There is +# a separate final_ohlc.py system for writing finalized (old) pricing data to disk. To get a complete view of the price, +# a client must load the history from the finaldata disk files and then splice the dynamic recent OHLC series over the +# end of the history. +# + import logging import os from datetime import datetime, timedelta, timezone from decimal import InvalidOperation -from typing import Optional, NamedTuple, Reversible, Union, TypedDict +from typing import Optional, NamedTuple, Reversible, Union from cachetools import LFUCache +from typing_extensions import deprecated from dexorder import dec, config, from_timestamp, timestamp, now, minutely from dexorder.base.chain import current_chain from dexorder.blocks import get_block_timestamp from dexorder.blockstate import BlockDict, DiffItem, current_blockstate from dexorder.blockstate.diff import DiffEntryItem -from dexorder.blockstate.fork import Fork, current_fork +from dexorder.blockstate.fork import Fork from dexorder.util import json from dexorder.util.shutdown import fatal @@ -195,8 +203,6 @@ def series_path(chain_id: int = None): def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str: - if chain_id is None: - chain_id = current_chain.get().id start = ohlc_start_time(time, period) name = period_name(period) return f'{chain_id}/{symbol}/{name}/' + ( @@ -208,8 +214,7 @@ def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int class Chunk: """ - Chunks map to files of OHLC's on disk. If an OHLC contains 6 fields instead of just 5, the 6th field is a - timestamp pointing to the next + DEPRECATED """ def __init__(self, repo_dir: str, symbol: str, period: timedelta, time: datetime, *, bars: Optional[list[NativeOHLC]] = None, chain_id: int = None): @@ -304,7 +309,7 @@ class OHLCRepository: """ can't actually make more than one of these because there's a global recent_ohlcs BlockDict """ self._dir = base_dir self._chain_id = chain_id - self.cache = LFUCache(len(OHLC_PERIODS) * 1024) + self.cache = LFUCache(len(OHLC_PERIODS) * config.chunk_cache_size) self.dirty_chunks = set() self._quotes = None @@ -440,79 +445,6 @@ class OHLCRepository: os.makedirs(os.path.dirname(filepath), exist_ok=True) -class SeriesDict (TypedDict): - start: int # timestamp of the start of the series - - -class FinalOHLCRepository (OHLCRepository): - """ - Used for backfill when all data is known to be final. Keeps a simple table of current NativeOHLC candles. - """ - def __init__(self, base_dir: str = None, *, chain_id: int = None): - super().__init__(base_dir, chain_id=chain_id) - self.current:dict[tuple[str,timedelta],NativeOHLC] = {} # current bars keyed by symbol - self.dirty_bars = set() - self.series:dict[str,dict[str,SeriesDict]] = {} # keyed by [str(chain_id)][symbol] - self._series_dirty = False - - def light_update_all(self, symbol: str, time: datetime, price: Optional[dec]): - for period in OHLC_PERIODS: - self.light_update(symbol, period, time, price) - - def light_update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, - *, backfill=True): - if OHLC_LIMIT_POOLS_DEBUG is not None and (symbol,period) not in OHLC_LIMIT_POOLS_DEBUG: - return - if price is not None: - self.quotes[symbol] = timestamp(time), str(price) - start = ohlc_start_time(time, period) -# log.debug(f'OHLC start_time {start}') - chunk = self.get_chunk(symbol, period, start) - key = symbol, period - prev = self.current.get(key) - if prev is None: - # cache miss. load from chunk. - prev = self.current[key] = chunk.bar_at(start) -# log.debug(f'loaded prev bar from chunk {prev}') - if prev is None and symbol in self.quotes: - latest_bar_time = ohlc_start_time(from_timestamp(self.quotes[symbol][0]), period) - prev = self.current[key] = self.get_chunk(symbol, period, latest_bar_time).bar_at(latest_bar_time) - if prev is None: - # never seen before. create new bar. -# log.debug(f'no prev bar') - if price is not None: - close = price - else: - try: - close = dec(self.quotes[symbol][1]) - except KeyError: - log.warning(f'light_update tried to advance time on {symbol} which has no previous price.') - return # no previous quote, no current price either - bar = self.current[key] = NativeOHLC(start, price, price, price, close) - chunk.update(bar, backfill=backfill) - self.dirty_chunks.add(chunk) - chain_id_str = str(current_chain.get().id) - if chain_id_str not in self.series: - self.series[chain_id_str] = {} - self.series[chain_id_str][f'{key[0]}|{period_name(key[1])}'] = {'start': timestamp(start)} - self._series_dirty = True - else: - updated = update_ohlc(prev, period, time, price) - for bar in updated: - chunk = self.get_chunk(symbol, period, bar.start) - chunk.update(bar, backfill=backfill) - self.dirty_chunks.add(chunk) - self.current[key] = updated[-1] - - def flush(self) -> None: - # flush chunks - super().flush() - # flush series.json if needed - if self._series_dirty: - save_json(self.series, os.path.join(self.dir, series_path(self.chain_id))) - self._series_dirty = False - - def save_json(obj, filename): for _ in range(2): try: diff --git a/src/dexorder/order/executionhandler.py b/src/dexorder/order/executionhandler.py index 38d4107..5473187 100644 --- a/src/dexorder/order/executionhandler.py +++ b/src/dexorder/order/executionhandler.py @@ -1,7 +1,7 @@ import logging from uuid import UUID -from web3.exceptions import ContractPanicError +from web3.exceptions import ContractPanicError, ContractLogicError from dexorder import db from dexorder.base.order import TrancheExecutionRequest, TrancheKey @@ -20,8 +20,9 @@ class TrancheExecutionHandler (TransactionHandler): # noinspection PyBroadException try: return await get_dexorder_contract().build.execute(job_id.bytes, (req.vault, req.order_index, req.tranche_index, req.price_proof)) - except ContractPanicError as px: - log.error(f'While executing job {job_id}: {px}') + except (ContractPanicError, ContractLogicError) as x: + # todo if there's a logic error we shouldn't keep trying + log.error(f'While executing job {job_id}: {x}') await self.complete_transaction(db.session.get(TransactionJob, job_id)) except Exception: log.exception(f'Could not send execution request {req}')