diff --git a/src/dexorder/__init__.py b/src/dexorder/__init__.py index 96c8019..41730fa 100644 --- a/src/dexorder/__init__.py +++ b/src/dexorder/__init__.py @@ -1,6 +1,6 @@ # noinspection PyPackageRequirements from contextvars import ContextVar -from datetime import datetime +from datetime import datetime, timezone from decimal import Decimal from typing import Callable, Any @@ -8,11 +8,19 @@ from web3 import AsyncWeb3 dec = Decimal def now(): - return datetime.utcnow() # we use naive datetimes that are always UTC + return datetime.now(timezone.utc) def timestamp(): return datetime.now().timestamp() +def from_timestamp(ts): + return datetime.fromtimestamp(ts, timezone.utc) + +def from_isotime(string): + return datetime.fromisoformat(string).replace(tzinfo=timezone.utc) + +def minutely(dt: datetime): + return dt.replace(tzinfo=None).isoformat(timespec="minutes") # NARG is used in argument defaults to mean "not specified" rather than "specified as None" class _Token: diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index 18ca96b..6c86e47 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -2,7 +2,11 @@ import math # noinspection PyPackageRequirements from contextvars import ContextVar +from async_lru import alru_cache + import dexorder +from dexorder import current_w3 +from dexorder.util import hexint class Blockchain: @@ -55,16 +59,21 @@ current_chain = ContextVar[Blockchain]('current_chain') class BlockClock: - def __init__(self): - self.timestamp = 0 - self.adjustment = 0 - - def set(self, timestamp): - self.timestamp = timestamp - self.adjustment = timestamp - dexorder.timestamp() + def __init__(self, block_timestamp=0, adjustment=None): + self.block_timestamp = block_timestamp if block_timestamp != 0 else dexorder.timestamp() + self.adjustment = 0 if block_timestamp == 0 \ + else adjustment if adjustment is not None \ + else block_timestamp - dexorder.timestamp() + @property def timestamp(self): return math.ceil(dexorder.timestamp() + self.adjustment) -current_clock = ContextVar[BlockClock]('clock') # current estimated timestamp of the blockchain. will be different than current_block.get().timestamp when evaluating time triggers in-between blocks +current_clock = ContextVar[BlockClock]('clock') # current estimated timestamp of the blockchain. will be different than current_block.get().timestamp when evaluating time triggers in-between blocks or for historical playbacks +@alru_cache +async def get_block_timestamp(blockhash) -> int: + response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False]) + raw = hexint(response['result']['timestamp']) + # noinspection PyTypeChecker + return raw if type(raw) is int else hexint(raw) diff --git a/src/dexorder/bin/datamain.py b/src/dexorder/bin/backfill_ohlc.py similarity index 62% rename from src/dexorder/bin/datamain.py rename to src/dexorder/bin/backfill_ohlc.py index e06c8d5..ded13f2 100644 --- a/src/dexorder/bin/datamain.py +++ b/src/dexorder/bin/backfill_ohlc.py @@ -1,56 +1,23 @@ import logging import sys from asyncio import CancelledError -from datetime import datetime -from async_lru import alru_cache -from web3.types import EventData - -from dexorder import blockchain, config, dec, current_w3 -from dexorder.base.ohlc import ohlcs, recent_ohlcs -from dexorder.base.orderlib import Exchange +from dexorder import blockchain, config +from dexorder.base.ohlc import recent_ohlcs from dexorder.bin.executable import execute from dexorder.blockstate.blockdata import BlockData from dexorder.blockstate.db_state import DbState from dexorder.configuration import parse_args from dexorder.contract import get_contract_event from dexorder.database import db +from dexorder.event_handler import handle_uniswap_swap from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.memcache import memcache -from dexorder.pools import uniswap_price, Pools from dexorder.runner import BlockStateRunner -from dexorder.util import hexint log = logging.getLogger('dexorder') -@alru_cache -async def get_block_timestamp(blockhash) -> int: - response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False]) - raw = hexint(response['result']['timestamp']) - # noinspection PyTypeChecker - return raw if type(raw) is int else hexint(raw) - - -async def handle_uniswap_swap(swap: EventData): - try: - sqrt_price = swap['args']['sqrtPriceX96'] - except KeyError: - return - addr = swap['address'] - pool = await Pools.get(addr) - if pool is None: - return - if pool.exchange != Exchange.UniswapV3: - log.debug(f'Ignoring {pool.exchange} pool {addr}') - return - price: dec = await uniswap_price(pool, sqrt_price) - timestamp = await get_block_timestamp(swap['blockHash']) - dt = datetime.fromtimestamp(timestamp) - log.debug(f'pool {addr} {dt} {price}') - ohlcs.update_all(addr, dt, price, create=True) - - async def main(): # noinspection DuplicatedCode logging.basicConfig(level=logging.INFO, stream=sys.stdout) diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index c77f7c5..81da665 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -12,7 +12,7 @@ from dexorder.contract import get_contract_event from dexorder.contract.dexorder import get_factory_contract, get_dexorder_contract from dexorder.event_handler import init_order_triggers, init, dump_log, handle_vault_created, handle_order_placed, handle_transfer, handle_uniswap_swap, \ handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_dexorderexecutions, activate_time_triggers, activate_price_triggers, \ - process_active_tranches, process_execution_requests + process_active_tranches, process_execution_requests, check_ohlc_rollover from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.memcache import memcache from dexorder.runner import BlockStateRunner @@ -61,12 +61,13 @@ def setup_logevent_triggers(runner): runner.add_event_trigger(handle_dexorderexecutions, executions) # these callbacks run after the ones above on each block, plus these also run every second - runner.add_postprocess_trigger(activate_time_triggers) - runner.add_postprocess_trigger(activate_price_triggers) - runner.add_postprocess_trigger(process_active_tranches) - runner.add_postprocess_trigger(process_execution_requests) - runner.add_postprocess_trigger(create_transactions) - runner.add_postprocess_trigger(send_transactions) + runner.postprocess_cbs.append(check_ohlc_rollover) + runner.postprocess_cbs.append(activate_time_triggers) + runner.postprocess_cbs.append(activate_price_triggers) + runner.postprocess_cbs.append(process_active_tranches) + runner.postprocess_cbs.append(process_execution_requests) + runner.postprocess_cbs.append(create_transactions) + runner.postprocess_cbs.append(send_transactions) async def main(): diff --git a/src/dexorder/blockstate/blockdata.py b/src/dexorder/blockstate/blockdata.py index 94684a1..84b50c3 100644 --- a/src/dexorder/blockstate/blockdata.py +++ b/src/dexorder/blockstate/blockdata.py @@ -22,7 +22,7 @@ class DataType(Enum): DICT: int = 3 -class BlockData: +class BlockData (Generic[T]): registry: dict[Any,'BlockData'] = {} # series name and instance def __init__(self, data_type: DataType, series: Any, *, @@ -49,12 +49,12 @@ class BlockData: def seriesstr(self): return self.series2str(self.series) - def setitem(self, item, value, overwrite=True): + def setitem(self, item, value: T, overwrite=True): state = current_blockstate.get() fork = current_fork.get() state.set(fork, self.series, item, value, overwrite) - def getitem(self, item, default=NARG): + def getitem(self, item, default=NARG) -> T: state = current_blockstate.get() fork = current_fork.get() try: @@ -124,7 +124,7 @@ class BlockData: state.delete_series(fork, self.series) -class BlockSet(Generic[T], Iterable[T], BlockData): +class BlockSet(Generic[T], Iterable[T], BlockData[T]): def __init__(self, series: Any, **tags): super().__init__(DataType.SET, series, **tags) self.series = series @@ -143,7 +143,7 @@ class BlockSet(Generic[T], Iterable[T], BlockData): yield from (k for k,v in self.iter_items(self.series)) -class BlockDict(Generic[K,V], BlockData): +class BlockDict(Generic[K,V], BlockData[V]): def __init__(self, series: Any, **tags): super().__init__(DataType.DICT, series, **tags) diff --git a/src/dexorder/data.py b/src/dexorder/data.py index 345439b..2e11815 100644 --- a/src/dexorder/data.py +++ b/src/dexorder/data.py @@ -10,7 +10,7 @@ log = logging.getLogger(__name__) # if pub is True, then event is the current series name, room is the key, and args is [value] # values of DELETE are serialized as nulls -def pub_vault_balances(k, v): +def pub_vault_balances(_s, k, v): chain_id = current_chain.get().chain_id try: return f'{chain_id}|{vault_owners[k]}', 'vb', (chain_id, k, json.dumps({k2: str(v2) for k2, v2 in v.items()})) diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 8440665..54d36f4 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -1,13 +1,15 @@ import asyncio import functools import logging +from datetime import datetime from uuid import UUID from web3.types import EventData -from dexorder import current_pub, db, dec -from dexorder.base.chain import current_chain, current_clock +from dexorder import current_pub, db, dec, from_timestamp, minutely +from dexorder.base.chain import current_chain, current_clock, get_block_timestamp from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey +from dexorder.ohlc import ohlcs, recent_ohlcs from dexorder.transaction import submit_transaction_request from dexorder.pools import uniswap_price, new_pool_prices, pool_prices, Pools from dexorder.contract.dexorder import vault_address, VaultContract @@ -15,7 +17,7 @@ from dexorder.contract import ERC20 from dexorder.data import vault_owners, vault_balances from dexorder.database.model.block import current_block from dexorder.database.model.transaction import TransactionJob -from dexorder.base.orderlib import SwapOrderState +from dexorder.base.orderlib import SwapOrderState, Exchange from dexorder.order.orderstate import Order from dexorder.order.triggers import OrderTriggers, price_triggers, time_triggers, \ unconstrained_price_triggers, execution_requests, inflight_execution_requests, TrancheStatus, active_tranches, new_price_triggers, activate_order @@ -151,6 +153,15 @@ async def handle_transfer(transfer: EventData): # log.debug(f'vaults: {list(vaults)}') +async def handle_uniswap_swap_old(swap: EventData): + try: + sqrt_price = swap['args']['sqrtPriceX96'] + except KeyError: + return + addr = swap['address'] + price: dec = await uniswap_price(await Pools.get(addr), sqrt_price) + pool_prices[addr] = price + async def handle_uniswap_swap(swap: EventData): try: @@ -158,9 +169,18 @@ async def handle_uniswap_swap(swap: EventData): except KeyError: return addr = swap['address'] - price: dec = await uniswap_price(await Pools.get(addr), sqrt_price) - log.debug(f'pool {addr} {price}') + pool = await Pools.get(addr) + if pool is None: + return + if pool.exchange != Exchange.UniswapV3: + log.debug(f'Ignoring {pool.exchange} pool {addr}') + return + price: dec = await uniswap_price(pool, sqrt_price) + timestamp = await get_block_timestamp(swap['blockHash']) + dt = from_timestamp(timestamp) pool_prices[addr] = price + ohlcs.update_all(addr, dt, price) + log.debug(f'pool {addr} {minutely(dt)} {price}') def handle_vault_created(created: EventData): @@ -186,23 +206,27 @@ def handle_vault_created(created: EventData): async def activate_time_triggers(): - now = current_clock.get().timestamp() + now = current_clock.get().timestamp # log.debug(f'activating time triggers at {now}') # time triggers for tt in tuple(time_triggers): + # noinspection PyTypeChecker await maywait(tt(now)) + async def activate_price_triggers(): # log.debug(f'activating price triggers') pools_triggered = set() for pool, price in new_pool_prices.items(): pools_triggered.add(pool) for pt in tuple(price_triggers[pool]): + # noinspection PyTypeChecker await maywait(pt(price)) for pool, triggers in new_price_triggers.items(): if pool not in pools_triggered: price = pool_prices[pool] for pt in triggers: + # noinspection PyTypeChecker await maywait(pt(price)) for t in tuple(unconstrained_price_triggers): # noinspection PyTypeChecker @@ -296,3 +320,16 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str): else: if er.height < current_block.get().height: del execution_requests[tk] + + +last_ohlc_rollover = 0 +def check_ohlc_rollover(): + global last_ohlc_rollover + time = current_block.get().timestamp + dt = from_timestamp(time) + diff = time - last_ohlc_rollover + if diff >= 60 or dt.minute != from_timestamp(last_ohlc_rollover).minute: + for (symbol, period) in recent_ohlcs.keys(): + ohlcs.update(symbol, period, dt) + last_ohlc_rollover = time + diff --git a/src/dexorder/memcache/memcache_state.py b/src/dexorder/memcache/memcache_state.py index 54cf6d6..20d644b 100644 --- a/src/dexorder/memcache/memcache_state.py +++ b/src/dexorder/memcache/memcache_state.py @@ -70,7 +70,7 @@ class RedisState (SeriesCollection): value = d.value2str(diff.value) pub_era = series, key, [value] elif callable(pub_era): - pub_era = await maywait(pub_era(diff.key, diff.value)) + pub_era = await maywait(pub_era(diff.series, diff.key, diff.value)) if pub_era is not None: e, r, a = pub_era # noinspection PyTypeChecker diff --git a/src/dexorder/base/ohlc.py b/src/dexorder/ohlc.py similarity index 73% rename from src/dexorder/base/ohlc.py rename to src/dexorder/ohlc.py index 9bd1777..fc32d89 100644 --- a/src/dexorder/base/ohlc.py +++ b/src/dexorder/ohlc.py @@ -2,12 +2,13 @@ import json import logging import os from dataclasses import dataclass -from datetime import datetime, timedelta -from typing import Optional +from datetime import datetime, timedelta, timezone +from typing import Optional, NamedTuple from cachetools import LFUCache -from dexorder import dec, config +from dexorder import dec, config, from_isotime, minutely +from dexorder.base.chain import current_chain from dexorder.blockstate import BlockDict log = logging.getLogger(__name__) @@ -18,7 +19,7 @@ OHLC_PERIODS = [ timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7) ] -OHLC_DATE_ROOT = datetime(2009, 1, 4) # Sunday before Bitcoin Genesis +OHLC_DATE_ROOT = datetime(2009, 1, 4, tzinfo=timezone.utc) # Sunday before Bitcoin Genesis # OHLC's are stored as [time, open, high, low, close] string values. If there was no data during the interval, # then open, high, and low are None but the close value is carried over from the previous interval. @@ -29,7 +30,7 @@ def opt_dec(v): return None if v is None else dec(v) def dt(v): - return v if isinstance(v, datetime) else datetime.fromisoformat(v) + return v if isinstance(v, datetime) else from_isotime(v) @dataclass class NativeOHLC: @@ -81,6 +82,7 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d returns an ordered list of OHLC's that have been created/modified by the new time/price if price is None, then bars are advanced based on the time but no new price is added to the series. """ + log.debug(f'\tupdating {prev} with {minutely(time)} {price}') cur = NativeOHLC.from_ohlc(prev) assert time >= cur.start result = [] @@ -91,6 +93,7 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d break result.append(cur.ohlc) cur = NativeOHLC(end, None, None, None, cur.close) + log.debug(f'\tresult after finalization: {result}') # if we are setting a price, update the current bar if price is not None: if cur.open is None: @@ -102,12 +105,13 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d cur.low = min(cur.low, price) cur.close = price result.append(cur.ohlc) + log.debug(f'\tappended current bar: {cur.ohlc}') + log.debug(f'\tupdate result: {result}') return result - -# The most recent OHLC's are stored as block data. We store a list of at least the two latest bars, which provides clients with -# the latest finalized bar as well as the current open bar. -recent_ohlcs = BlockDict('ohlc', db=True, redis=True) +class OHLCKey (NamedTuple): + symbol: str + period: timedelta class OHLCRepository: @@ -128,26 +132,33 @@ class OHLCRepository: if (symbol, period) not in recent_ohlcs: recent_ohlcs[(symbol, period)] = [] - def update_all(self, symbol: str, time: datetime, price: dec, *, create: bool = False): + def update_all(self, symbol: str, time: datetime, price: dec, *, create: bool = True): for period in OHLC_PERIODS: self.update(symbol, period, time, price, create=create) - def update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec], *, create: bool = False) -> Optional[list[OHLC]]: + def update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) -> Optional[list[OHLC]]: """ if price is None, then bars are advanced based on the time but no new price is added to the series. """ + logname = f'{symbol} {ohlc_name(period)}' + log.debug(f'Updating OHLC {logname} {minutely(time)} {price}') key = (symbol, period) bars: Optional[list[OHLC]] = recent_ohlcs.get(key) - if bars is None: - if create is False: + if not bars: + if create is False or price is None: return # do not track symbols which have not been explicity set up - bars = [OHLC((ohlc_start_time(time, period).isoformat(timespec='minutes'), price, price, price, price))] - updated = update_ohlc(bars[-1], period, time, price) - if len(updated) == 1: - updated = [*bars[:-1], updated[0]] # return the previous finalized bars along with the updated current bar + p = str(price) + updated = [OHLC((minutely(ohlc_start_time(time, period)), p, p, p, p))] + log.debug(f'\tcreated new bars {updated}') + else: + updated = update_ohlc(bars[-1], period, time, price) + if len(updated) == 1: + updated = [bars[-1], updated[0]] # return the previous finalized bar along with the updated current bar + log.debug(f'\tnew recents: {updated}') recent_ohlcs.setitem(key, updated) if len(updated) > 1: - self.save_all(symbol, period, updated[:-1]) + log.debug(f'\tsaving finalized bars: {updated[:-1]}') + self.save_all(symbol, period, updated[:-1]) # save any finalized bars to storage return updated def save_all(self, symbol: str, period: timedelta, ohlc_list: list[OHLC]) -> None: @@ -160,13 +171,14 @@ class OHLCRepository: if not chunk: chunk = [ohlc] else: - start = datetime.fromisoformat(chunk[0][0]) + start = from_isotime(chunk[0][0]) index = (time - start) // period + assert index <= len(chunk) if index == len(chunk): - assert datetime.fromisoformat(chunk[-1][0]) + period == time + assert from_isotime(chunk[-1][0]) + period == time chunk.append(ohlc) else: - assert datetime.fromisoformat(chunk[index][0]) == time + assert from_isotime(chunk[index][0]) == time chunk[index] = ohlc self.save_chunk(symbol, period, chunk) @@ -191,7 +203,7 @@ class OHLCRepository: def save_chunk(self, symbol: str, period: timedelta, chunk: list[OHLC]): if not chunk: return - path = self.chunk_path(symbol, period, datetime.fromisoformat(chunk[0][0])) + path = self.chunk_path(symbol, period, from_isotime(chunk[0][0])) try: with open(path, 'w') as file: json.dump(chunk, file) @@ -212,4 +224,27 @@ class OHLCRepository: ) +def pub_ohlc(_series:str, key: OHLCKey, bars: list[OHLC]): + pool_addr, period = key + chain_id = current_chain.get().chain_id + return ( + f'{chain_id}|{pool_addr}|{ohlc_name(period)}', # channel name is like 0x...|1m + 'ohlcs', + (chain_id, pool_addr, bars) + ) + +def ohlc_key_to_str(k): + return f'{k[0]}|{ohlc_name(k[1])}' + +def ohlc_str_to_key(s): + pool, period_name = s.split('|') + return pool, period_from_name(period_name) + +# The most recent OHLC's are stored as block data. We store a list of at least the two latest bars, which provides clients with +# the latest finalized bar as well as the current open bar. +recent_ohlcs: BlockDict[OHLCKey, list[OHLC]] = BlockDict('ohlc', db=True, redis=True, pub=pub_ohlc, + key2str=ohlc_key_to_str, str2key=ohlc_str_to_key, + series2key=lambda x:x, series2str=lambda x:x) + + ohlcs = OHLCRepository() diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index d23dfc4..122dc22 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -182,7 +182,7 @@ class Order: Order.order_statuses.unload(self.key) # but then unload from memory after root promotion @staticmethod - async def pub_order_status(k, v): + def pub_order_status(_s, k, v): # publish status updates (on placing and completion) to web clients try: chain_id = current_chain.get().chain_id @@ -196,7 +196,7 @@ class Order: log.error(f'could not dump {v}') @staticmethod - async def pub_order_fills(k, v): + def pub_order_fills(_s, k, v): # publish status updates (on placing and completion) to web clients if v is DELETE: return None diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 6135d1f..5a7c285 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -54,7 +54,7 @@ async def line_passes(lc: tuple[float,float], is_min: bool, price: dec) -> bool: b, m = lc if b == 0 and m == 0: return True - limit = m * current_clock.get().timestamp() + b + limit = m * current_clock.get().timestamp + b log.debug(f'line passes {limit} {"<" if is_min else ">"} {price}') # todo ratios # prices AT the limit get zero volume, so we only trigger on >, not >= @@ -98,7 +98,7 @@ class TrancheTrigger: if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount: # min_fill_amount could be 0 (disabled) so we also check for the 0 case separately self._status = TrancheStatus.Filled return - timestamp = current_clock.get().timestamp() + timestamp = current_clock.get().timestamp self._status = \ TrancheStatus.Pricing if self.time_constraint is None else \ TrancheStatus.Early if timestamp < self.time_constraint[0] else \ diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index b8668e5..81ec780 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -52,7 +52,7 @@ class PoolPrices (BlockDict[str, dec]): new_pool_prices[item] = value -def pub_pool_price(k,v): +def pub_pool_price(_s,k,v): chain_id = current_chain.get().chain_id return f'{chain_id}|{k}', 'p', (chain_id, k, str(v)) diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 7ba8463..2a0a01b 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -93,10 +93,6 @@ class BlockStateRunner: log_filter = {'topics': [topic(event.abi)]} self.events.append((callback, event, log_filter)) - def add_postprocess_trigger(self, callback: Maywaitable[[], None]): - # noinspection PyTypeChecker - self.postprocess_cbs.append(callback) - async def run(self): # this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is ether websockets or polling if self.state: diff --git a/src/dexorder/util/sql.py b/src/dexorder/util/sql.py index 62a6581..124ef70 100644 --- a/src/dexorder/util/sql.py +++ b/src/dexorder/util/sql.py @@ -1,14 +1,16 @@ from datetime import datetime, timedelta from typing import Union +from dexorder import now + def where_time_range(sql, time_column, start: Union[datetime,timedelta,None] = None, end: Union[datetime,timedelta,None] = None): if start is not None: if isinstance(start, timedelta): - start = datetime.now() - abs(start) + start = now() - abs(start) sql = sql.where(time_column >= start) if end is not None: if isinstance(end, timedelta): - end = datetime.now() - abs(end) + end = now() - abs(end) sql = sql.where(time_column < end) return sql