OHLC rework

This commit is contained in:
tim
2024-08-04 19:55:22 -04:00
parent 89ac1aa92b
commit 4bcbcaf09e
11 changed files with 460 additions and 114 deletions

3
bin/update-deps Executable file
View File

@@ -0,0 +1,3 @@
#!/bin/bash
pip install --upgrade pip && pip install --upgrade -r requirements.txt && pip freeze > requirements-lock.txt

View File

@@ -2,26 +2,3 @@ metadata='metadata.json' # the Dockerfile will move metadata-finaldata.json int
account = '${accounts.admin}' # todo switch back to accounts.gas
rpc_url = '${rpc_urls.arbsep_alchemy}'
ws_url = '${rpc_urls.arbsep_alchemy_ws}'
mirror_pools = [
# Arbitrum Pools
'0xC6962004f452bE9203591991D15f6b388e09E8D0', # WETH/USDC 0.05%
# '0x2f5e87C9312fa29aed5c179E456625D79015299c', # WBTC/WETH 0.05%
# '0x0d94947374cbc779a0FB4D1bfF795C0Af6Dfae25', # USDC/UNI 1.00%
# '0x689C96ceAb93f5E131631D225D75DeA3fD37747E', # WBTC/ARB 0.30%
# '0x0E4831319A50228B9e450861297aB92dee15B44F', # WBTC/USDC 0.05%
# '0x2038eEAa7100E08739352a37Ed67852E8529E8ED', # ARB/UNI 1.00%
# '0x468b88941e7Cc0B88c1869d68ab6b570bCEF62Ff', # WETH/LINK 0.30%
# '0xC24f7d8E51A64dc1238880BD00bb961D54cbeb29', # WETH/UNI 0.30%
# '0xbBe36e6f0331C6a36AB44Bc8421E28E1a1871C1e', # USDC/LINK 0.30%
# '0xa79fD76cA2b24631Ec3151f10c0660a30Bc946E7', # WBTC/LINK 0.30%
# '0xb0f6cA40411360c03d41C5fFc5F179b8403CdcF8', # ARB/USDC 0.05%
# '0xC6F780497A95e246EB9449f5e4770916DCd6396A', # WETH/ARB 0.05%
# '0x8b6149aF984140BD3F8e158CcDCD05984a4ad0f5', # ARB/LINK 0.30%
# '0xEd701Ba0cec723d85B7d96c80C21148E49D2Bf05', # LINK/UNI 1.00%
# Base Pools
# '0xd0b53D9277642d899DF5C87A3966A349A798F224', # WETH/USDC
# '0xb4CB800910B228ED3d0834cF79D697127BBB00e5', # WETH/USDC
]

View File

@@ -0,0 +1,27 @@
metadata='metadata.json' # the Dockerfile will move metadata-finaldata.json into positon
account = '${accounts.admin}' # todo switch back to accounts.gas
rpc_url = '${rpc_urls.arbsep_alchemy}'
mirror_source_rpc_url='${rpc_urls.arbsep_alchemy}'
mirror_pools = [
# Arbitrum Pools
'0xC6962004f452bE9203591991D15f6b388e09E8D0', # WETH/USDC 0.05%
# '0x2f5e87C9312fa29aed5c179E456625D79015299c', # WBTC/WETH 0.05%
# '0x0d94947374cbc779a0FB4D1bfF795C0Af6Dfae25', # USDC/UNI 1.00%
# '0x689C96ceAb93f5E131631D225D75DeA3fD37747E', # WBTC/ARB 0.30%
# '0x0E4831319A50228B9e450861297aB92dee15B44F', # WBTC/USDC 0.05%
# '0x2038eEAa7100E08739352a37Ed67852E8529E8ED', # ARB/UNI 1.00%
# '0x468b88941e7Cc0B88c1869d68ab6b570bCEF62Ff', # WETH/LINK 0.30%
# '0xC24f7d8E51A64dc1238880BD00bb961D54cbeb29', # WETH/UNI 0.30%
# '0xbBe36e6f0331C6a36AB44Bc8421E28E1a1871C1e', # USDC/LINK 0.30%
# '0xa79fD76cA2b24631Ec3151f10c0660a30Bc946E7', # WBTC/LINK 0.30%
# '0xb0f6cA40411360c03d41C5fFc5F179b8403CdcF8', # ARB/USDC 0.05%
# '0xC6F780497A95e246EB9449f5e4770916DCd6396A', # WETH/ARB 0.05%
# '0x8b6149aF984140BD3F8e158CcDCD05984a4ad0f5', # ARB/LINK 0.30%
# '0xEd701Ba0cec723d85B7d96c80C21148E49D2Bf05', # LINK/UNI 1.00%
# Base Pools
# '0xd0b53D9277642d899DF5C87A3966A349A798F224', # WETH/USDC
# '0xb4CB800910B228ED3d0834cF79D697127BBB00e5', # WETH/USDC
]

View File

@@ -0,0 +1,20 @@
# https://docs.python.org/3/library/logging.config.html#logging-config-dictschema
version=1
[loggers.'']
level='INFO'
handlers=['console',]
[loggers.dexorder]
level='DEBUG'
[handlers.console]
class='logging.StreamHandler'
formatter='notime'
stream='ext://sys.stdout'
[formatters.notime]
# https://docs.python.org/3/library/logging.html#logrecord-attributes
format='%(levelname)s %(name)s %(message)s'
# https://docs.python.org/3/library/time.html#time.strftime
datefmt='%Y-%m-%d %H:%M:%S'

View File

@@ -2,15 +2,23 @@ aiohttp==3.9.5
aiosignal==1.3.1
alembic==1.13.2
antlr4-python3-runtime==4.9.3
asn1crypto==1.5.1
async-lru==2.0.4
attrs==23.2.0
bip-utils==2.9.3
bitarray==2.9.2
cachetools==5.4.0
cbor2==5.6.4
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
ckzg==1.0.1
coincurve==20.0.0
crcmod==1.7
cytoolz==0.12.3
defaultlist==1.0.0
ecdsa==0.19.0
ed25519-blake2b==1.4.1
eth-account==0.11.2
eth-bloom==3.0.1
eth-hash==0.7.0
@@ -37,11 +45,14 @@ orjson==3.10.6
parsimonious==0.10.0
protobuf==5.26.1
psycopg2-binary==2.9.9
py-sr25519-bindings==0.2.0
pycparser==2.22
pycryptodome==3.20.0
PyNaCl==1.5.0
python-dateutil==2.9.0.post0
pyunormalize==15.1.0
PyYAML==6.0.1
redis==5.0.7
redis==5.0.8
referencing==0.35.0
regex==2024.4.28
requests==2.31.0

View File

@@ -0,0 +1,54 @@
#
# This script generates a BIP-39 24-word key from physical dice rolls (1-6)
#
KEY_LENGTH=128 # use this to select the size of your seed: 128 (12-word), 160, 192, 224, 256 (24-word)
from bip_utils import Bip39MnemonicEncoder
from bitarray import bitarray
print('Throw dice and enter the numbers until there\'s enough:')
entropy = bitarray()
while True:
i = input()
inverse = 0
def bit(b):
b0 = b ^ inverse
# print(b0, end='')
entropy.append(b0)
def bits(b0, b1):
bit(b0); bit(b1)
try:
i = int(i)
assert 1 <= i <= 6
if i == 1:
bits(0,1)
elif i == 2:
bits(1,0)
elif i == 3:
bits(1,1)
elif i == 4:
bits(0,0)
# we retain entropy from the 5 and 6 rolls by flipping or not flipping the inversion flag. this does not
# generate bits but it increases entropy if there is any nonrandomness in the dice.
elif i == 5:
pass
elif i == 6:
inverse = 1 if inverse == 0 else 0
if len(entropy) > KEY_LENGTH:
seed = bytes(entropy[:KEY_LENGTH])
print(seed.hex())
mnem = Bip39MnemonicEncoder().Encode(seed)
print(mnem)
break
else:
print(f'{len(entropy):>3}/{KEY_LENGTH} bits')
except:
print('Input ignored. Enter a number 1-6:')

View File

@@ -15,7 +15,7 @@ from dexorder.blocks import get_block_timestamp, get_block
from dexorder.blockstate.fork import current_fork
from dexorder.configuration import parse_args
from dexorder.contract import get_contract_event
from dexorder.ohlc import FinalOHLCRepository
from dexorder.final_ohlc import FinalOHLCRepository
from dexorder.pools import get_uniswap_data
from dexorder.util import hexstr
from dexorder.util.shutdown import fatal
@@ -23,8 +23,7 @@ from dexorder.walker import BlockWalker
log = logging.getLogger('dexorder')
ohlcs = FinalOHLCRepository()
ohlcs: FinalOHLCRepository
async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
@@ -39,7 +38,8 @@ async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
if data is not None:
pool, time, price = data
# log.debug(f'OHLC {pool["address"]} {time} {price}')
ohlcs.light_update_all(pool['address'], time, price)
ohlcs.update(pool['address'], time, price)
async def flush_callback():
# start = now()
@@ -54,17 +54,20 @@ async def flush_callback():
log.info(f'forward filling to present time')
for addr, data in address_metadata.items():
if data['type'] == 'Pool' and data['exchange'] >= 0:
ohlcs.light_update_all(addr, time, None)
ohlcs.update(addr, time, None)
log.info("flushing OHLC's")
ohlcs.flush()
log.info(f'backfill completed through block {block.height} {time:%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}')
async def main():
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
log.setLevel(logging.DEBUG)
parse_args()
if config.ohlc_dir is None:
fatal('an ohlc_dir must be configured')
if not config.ohlc_dir:
fatal('Must configure ohlc_dir')
global ohlcs
ohlcs = FinalOHLCRepository()
await blockchain.connect()
walker = BlockWalker(flush_callback, timedelta(seconds=config.walker_flush_interval))
walker.add_event_trigger(handle_backfill_uniswap_swaps,

View File

@@ -20,6 +20,7 @@ class Config:
metadata: Optional[str] = None
ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk
chunk_cache_size: int = 128 # Number of pools that have their OHLC chunks cached
concurrent_rpc_connections: int = 4
parallel_logevent_queries: bool = True

317
src/dexorder/final_ohlc.py Normal file
View File

@@ -0,0 +1,317 @@
import logging
import os
from datetime import timedelta, datetime, timezone
from typing import Optional
from cachetools import LFUCache
from dexorder import dec, timestamp, config, from_timestamp
from dexorder.base.chain import current_chain
from dexorder.ohlc import OHLC_PERIODS, period_name, ohlc_start_time
log = logging.getLogger(__name__)
# This is a parallel OHLC system that processes only known-final data, allowing it to be much more efficient
# than the leading edge reorgable OHLC's.
#
# See dexorder/doc/ohlc.md
#
class OHLCFilePath:
def __init__(self, symbol: str, period: timedelta, time: datetime):
self.symbol = symbol
self.period = period
name = period_name(period)
self.filepath = f'{symbol}/{name}/'
if period < timedelta(minutes=15):
# one file per day is the smallest resolution
# log.debug(f'{name} is daily')
self.start = start = datetime(time.year, time.month, time.day, tzinfo=timezone.utc)
self.end = self.start + timedelta(days=1)
self.file_interval = timedelta(days=1)
self.filepath += f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.csv'
elif period < timedelta(hours=8):
# one file per month
# log.debug(f'{name} is monthly')
self.start = start = datetime(time.year, time.month, 1, tzinfo=timezone.utc)
end_month = time.month + 1
end_year = time.year
if end_month == 13:
end_month = 1
end_year += 1
self.end = datetime(end_year, end_month, 1, tzinfo=timezone.utc)
self.file_interval = timedelta(days=32) # it's ok to add a little more because we will find the start time of the new file.
self.filepath += f'{start.year}/{start.month}/{symbol}-{name}-{start:%Y%m}.csv'
elif period < timedelta(days=7):
# one file per year
# log.debug(f'{name} is yearly')
self.start = start = datetime(time.year, 1, 1, tzinfo=timezone.utc)
self.end = datetime(time.year+1, 1, 1, tzinfo=timezone.utc)
self.file_interval = timedelta(days=366)
self.filepath += f'{start.year}/{symbol}-{name}-{start:%Y%m}.csv'
else:
# weeklies get one file for everything
# log.debug(f'{name} is a single file')
self.start = None
self.end = None
self.file_interval = None
self.filepath += f'{symbol}-{name}.csv'
def next(self):
return OHLCFilePath(self.symbol, self.period, self.start + self.file_interval)
def __repr__(self):
return self.filepath
def __hash__(self):
return hash(self.filepath)
def __eq__(self, other):
return other == self.filepath or isinstance(other, OHLCFilePath) and self.filepath == other.filepath
class OHLCFile:
@staticmethod
def get(base_dir: str, filepath: OHLCFilePath):
key = base_dir, filepath
if key not in OHLCFile.cache:
OHLCFile.cache[key] = OHLCFile(base_dir, filepath)
return OHLCFile.cache[key]
def __init__(self, base_dir: str, filepath: OHLCFilePath):
self.base_dir = base_dir
self.filepath = filepath
self.file = None
self._cur = None
self._pending = None
self._final_rows = []
class OHLCFileCache(LFUCache[tuple[str,OHLCFilePath], 'OHLCFile']):
def popitem(self):
key, item = super().popitem()
item.close()
cache = OHLCFileCache(len(OHLC_PERIODS) * config.chunk_cache_size)
@property
def filename(self):
return os.path.join(self.base_dir, self.filepath.filepath)
@property
def timestamp(self):
try:
return self.cur[0]
except TypeError:
raise ValueError('cur is None')
@property
def price(self):
try:
return self.cur[-1]
except TypeError:
raise ValueError('cur is None')
@property
def cur(self):
return self._pending if self._pending is not None else self._cur
@cur.setter
def cur(self, value):
self._pending = value
def update(self, time: datetime, price: dec):
ts = timestamp(ohlc_start_time(time,self.filepath.period))
if self.file is None:
self._load(ts)
if self.cur is None:
# nothing yet. simple time+price
self.cur = ts, price
elif self.cur[0] < ts:
# the current bar was an old timestamp. Advance bars.
self._final_rows.append(self.cur)
# new bar time+price
self.cur = ts, price
elif len(self.cur) == 2:
self.cur = *self.cur, price
elif len(self.cur) == 3:
t, o, c = self.cur
self.cur = t, o, max(o,c,price), min(o,c,price), price
else:
t, o, h, line, c = self.cur
self.cur = t, o, max(h,line,price), min(h,line,price), price
@staticmethod
def row_bytes(row):
return (','.join(str(c) for c in row)+'\n').encode('ascii')
def flush(self):
# first we write the "final" rows which means rows that have been closed and will get no more data.
for row in self._final_rows:
self.file.write(OHLCFile.row_bytes(row))
# apply any pending changes to the current row
if self._pending is not None:
self._cur = self._pending
self._pending = None
# write the current row
if self._cur is not None:
data = OHLCFile.row_bytes(self._cur)
self.file.write(data)
# rewind our file cursor to the beginning of the current row
self.file.seek(len(data), os.SEEK_END)
self.file.flush()
def _load(self, earliest_change):
try:
self.file = open(self.filename, 'br+') # br+ is binary read+write
except FileNotFoundError:
# no existing file
os.makedirs(os.path.dirname(self.filename), exist_ok=True)
self.file = open(self.filename, 'bw')
else:
# load existing file
line_number = 0
row = None
prev_line = None
offset = 0 # this will point to the start of the last row
for line in self.file.readlines(): # all files should be small and can load at once
line_number += 1
row = line.decode('ascii').strip().split(',')
if not row[0]: # empty line
continue
if prev_line is not None:
# advance cursor past the previous row
offset += len(prev_line)
if int(row[0]) >= earliest_change:
# we are going to replace this row so stop walking forward and truncate the rest of the file
self.file.truncate(offset)
break
prev_line = line
if row is not None:
self._cur = [int(row[0]), *(dec(p) for p in row[1:])] # convert to int timestamp and dec prices
# set the file's write pointer the start of the final row
self.file.seek(offset)
# self._cur is now either None (empty file) or points to a natively-typed list representing the last row
def close(self):
self.file.close()
class OHLCFileSeries:
instances = {}
@staticmethod
def get(base_dir: str, symbol: str):
key = base_dir, symbol
if key not in OHLCFileSeries.instances:
OHLCFileSeries.instances[key] = OHLCFileSeries(base_dir,symbol)
return OHLCFileSeries.instances[key]
def __init__(self, base_dir: str, symbol: str):
self.base_dir = base_dir
self.symbol = symbol
self.series_start: Optional[datetime] = None # timestamp of the first datum in the series
self.write_offset: Optional[int] = None
self.last_flush: Optional[tuple[datetime,dec]] = None
self.quote_file = None
self.dirty_files = set()
self.quote: Optional[tuple[datetime,dec]] = None
def update(self, time: datetime, price: dec):
#
# load quote file
#
if self.quote_file is None:
quote_filename = os.path.join(self.base_dir, self.symbol, 'quote.csv')
try:
self.quote_file = open(quote_filename, 'br+')
# load existing quote file
line = self.quote_file.read().decode('ascii')
except FileNotFoundError:
os.makedirs(os.path.dirname(quote_filename), exist_ok=True)
self.quote_file = open(quote_filename, 'bw')
line = None
if line:
start, old_time, old_price = line.split(',')
self.series_start = from_timestamp(int(start))
# position the write cursor at the start of the second column so we can write the latest quote quickly
self.write_offset = len(start) + 1 # after the start time bytes and comma
self.quote_file.seek(self.write_offset)
self.last_flush = from_timestamp(int(old_time)), dec(old_price)
else:
# initialize new quote file with our own series_start time
self.quote_file.write((str(timestamp(time)) + ',').encode('ascii'))
self.write_offset = 0
self.last_flush = None
#
# forward-fill OHLC files that would otherwise be empty/skipped
#
if self.last_flush is not None:
for period in OHLC_PERIODS:
# get the path to the file that was last flushed
t, p = self.last_flush
path = OHLCFilePath(self.symbol, period, t)
while path.end and path.end < time:
path = path.next()
# initialize the new file using the carried-forward price
file = OHLCFile.get(self.base_dir, path)
file.update(file.filepath.start, p) # set file opening price
self.dirty_files.add(file)
self.quote = time, price
if self.series_start is None:
self.series_start = time
for period in OHLC_PERIODS:
file = OHLCFile.get(self.base_dir, OHLCFilePath(self.symbol, period, time))
file.update(time, price)
self.dirty_files.add(file)
def flush(self):
if self.quote is None:
log.warning('OHLCFileSeries was flushed without having any updated data.')
return
time, price = self.quote
ts = timestamp(time)
#
# flush dirty OHLC files
#
for file in self.dirty_files:
file.flush()
self.dirty_files.clear()
#
# flush quote file
#
self.quote_file.write(f'{ts},{price:f}'.encode('ascii'))
self.quote_file.flush()
self.quote_file.seek(self.write_offset)
# remember where we were so we can forward-fill again next time
self.last_flush = self.quote
class FinalOHLCRepository:
"""
Used for backfill when all data is known to be final. Keeps a simple table of current NativeOHLC candles.
"""
def __init__(self):
assert config.ohlc_dir
self.dirty_series = set()
def update(self, symbol: str, time: datetime, price: Optional[dec]):
chain_id = current_chain.get().id
base_dir = os.path.join(config.ohlc_dir, str(chain_id))
series = OHLCFileSeries.get(base_dir, symbol)
series.update(time, price)
self.dirty_series.add(series)
def flush(self) -> None:
for series in self.dirty_series:
series.flush()
self.dirty_series.clear()

View File

@@ -1,17 +1,25 @@
#
# NOTE: There are two OHLC systems. This one is for recent OHLC series which can get reorged in the backend. There is
# a separate final_ohlc.py system for writing finalized (old) pricing data to disk. To get a complete view of the price,
# a client must load the history from the finaldata disk files and then splice the dynamic recent OHLC series over the
# end of the history.
#
import logging
import os
from datetime import datetime, timedelta, timezone
from decimal import InvalidOperation
from typing import Optional, NamedTuple, Reversible, Union, TypedDict
from typing import Optional, NamedTuple, Reversible, Union
from cachetools import LFUCache
from typing_extensions import deprecated
from dexorder import dec, config, from_timestamp, timestamp, now, minutely
from dexorder.base.chain import current_chain
from dexorder.blocks import get_block_timestamp
from dexorder.blockstate import BlockDict, DiffItem, current_blockstate
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.fork import Fork, current_fork
from dexorder.blockstate.fork import Fork
from dexorder.util import json
from dexorder.util.shutdown import fatal
@@ -195,8 +203,6 @@ def series_path(chain_id: int = None):
def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str:
if chain_id is None:
chain_id = current_chain.get().id
start = ohlc_start_time(time, period)
name = period_name(period)
return f'{chain_id}/{symbol}/{name}/' + (
@@ -208,8 +214,7 @@ def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int
class Chunk:
"""
Chunks map to files of OHLC's on disk. If an OHLC contains 6 fields instead of just 5, the 6th field is a
timestamp pointing to the next
DEPRECATED
"""
def __init__(self, repo_dir: str, symbol: str, period: timedelta, time: datetime,
*, bars: Optional[list[NativeOHLC]] = None, chain_id: int = None):
@@ -304,7 +309,7 @@ class OHLCRepository:
""" can't actually make more than one of these because there's a global recent_ohlcs BlockDict """
self._dir = base_dir
self._chain_id = chain_id
self.cache = LFUCache(len(OHLC_PERIODS) * 1024)
self.cache = LFUCache(len(OHLC_PERIODS) * config.chunk_cache_size)
self.dirty_chunks = set()
self._quotes = None
@@ -440,79 +445,6 @@ class OHLCRepository:
os.makedirs(os.path.dirname(filepath), exist_ok=True)
class SeriesDict (TypedDict):
start: int # timestamp of the start of the series
class FinalOHLCRepository (OHLCRepository):
"""
Used for backfill when all data is known to be final. Keeps a simple table of current NativeOHLC candles.
"""
def __init__(self, base_dir: str = None, *, chain_id: int = None):
super().__init__(base_dir, chain_id=chain_id)
self.current:dict[tuple[str,timedelta],NativeOHLC] = {} # current bars keyed by symbol
self.dirty_bars = set()
self.series:dict[str,dict[str,SeriesDict]] = {} # keyed by [str(chain_id)][symbol]
self._series_dirty = False
def light_update_all(self, symbol: str, time: datetime, price: Optional[dec]):
for period in OHLC_PERIODS:
self.light_update(symbol, period, time, price)
def light_update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None,
*, backfill=True):
if OHLC_LIMIT_POOLS_DEBUG is not None and (symbol,period) not in OHLC_LIMIT_POOLS_DEBUG:
return
if price is not None:
self.quotes[symbol] = timestamp(time), str(price)
start = ohlc_start_time(time, period)
# log.debug(f'OHLC start_time {start}')
chunk = self.get_chunk(symbol, period, start)
key = symbol, period
prev = self.current.get(key)
if prev is None:
# cache miss. load from chunk.
prev = self.current[key] = chunk.bar_at(start)
# log.debug(f'loaded prev bar from chunk {prev}')
if prev is None and symbol in self.quotes:
latest_bar_time = ohlc_start_time(from_timestamp(self.quotes[symbol][0]), period)
prev = self.current[key] = self.get_chunk(symbol, period, latest_bar_time).bar_at(latest_bar_time)
if prev is None:
# never seen before. create new bar.
# log.debug(f'no prev bar')
if price is not None:
close = price
else:
try:
close = dec(self.quotes[symbol][1])
except KeyError:
log.warning(f'light_update tried to advance time on {symbol} which has no previous price.')
return # no previous quote, no current price either
bar = self.current[key] = NativeOHLC(start, price, price, price, close)
chunk.update(bar, backfill=backfill)
self.dirty_chunks.add(chunk)
chain_id_str = str(current_chain.get().id)
if chain_id_str not in self.series:
self.series[chain_id_str] = {}
self.series[chain_id_str][f'{key[0]}|{period_name(key[1])}'] = {'start': timestamp(start)}
self._series_dirty = True
else:
updated = update_ohlc(prev, period, time, price)
for bar in updated:
chunk = self.get_chunk(symbol, period, bar.start)
chunk.update(bar, backfill=backfill)
self.dirty_chunks.add(chunk)
self.current[key] = updated[-1]
def flush(self) -> None:
# flush chunks
super().flush()
# flush series.json if needed
if self._series_dirty:
save_json(self.series, os.path.join(self.dir, series_path(self.chain_id)))
self._series_dirty = False
def save_json(obj, filename):
for _ in range(2):
try:

View File

@@ -1,7 +1,7 @@
import logging
from uuid import UUID
from web3.exceptions import ContractPanicError
from web3.exceptions import ContractPanicError, ContractLogicError
from dexorder import db
from dexorder.base.order import TrancheExecutionRequest, TrancheKey
@@ -20,8 +20,9 @@ class TrancheExecutionHandler (TransactionHandler):
# noinspection PyBroadException
try:
return await get_dexorder_contract().build.execute(job_id.bytes, (req.vault, req.order_index, req.tranche_index, req.price_proof))
except ContractPanicError as px:
log.error(f'While executing job {job_id}: {px}')
except (ContractPanicError, ContractLogicError) as x:
# todo if there's a logic error we shouldn't keep trying
log.error(f'While executing job {job_id}: {x}')
await self.complete_transaction(db.session.get(TransactionJob, job_id))
except Exception:
log.exception(f'Could not send execution request {req}')