Compare commits

..

5 Commits

Author SHA1 Message Date
tim
0bb670b356 redis initial state push fix 2025-04-01 13:52:49 -04:00
tim
52b406ba17 ohlc retained length fix 2025-04-01 13:52:39 -04:00
tim
3d0342d19d price line metrics fix 2025-04-01 13:52:29 -04:00
tim
dbf960bae9 initial TrancheState fix 2025-04-01 13:52:21 -04:00
tim
d49f142fe3 redis pipeline autoflush after 10000 entries 2025-04-01 10:54:25 -04:00
6 changed files with 47 additions and 45 deletions

View File

@@ -117,7 +117,7 @@ async def main():
if redis_state: if redis_state:
# load initial state # load initial state
log.info('initializing redis with root state') log.info('initializing redis with root state')
await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id]) await redis_state.init(state, state.root_fork)
await initialize_accounting_runner() await initialize_accounting_runner()

View File

@@ -220,7 +220,7 @@ async def update_metrics():
metric.vaults.set(vault_owners.upper_len()) metric.vaults.set(vault_owners.upper_len())
metric.open_orders.set(Order.open_orders.upper_len()) metric.open_orders.set(Order.open_orders.upper_len())
metric.triggers_time.set(len(TimeTrigger.all)) metric.triggers_time.set(len(TimeTrigger.all))
metric.triggers_line.set(len(PriceLineTrigger.triggers_set)) metric.triggers_line.set(sum(len(s) for s in PriceLineTrigger.by_pool.values()))
# slow updates # slow updates
global slow_metric_update global slow_metric_update

View File

@@ -4,7 +4,6 @@ from contextvars import ContextVar
import redis.asyncio as redis_async import redis.asyncio as redis_async
from redis.asyncio import Redis from redis.asyncio import Redis
from redis.asyncio.client import Pipeline
from dexorder import config from dexorder import config
@@ -14,9 +13,10 @@ log = logging.getLogger(__name__)
class Memcache: class Memcache:
@staticmethod @staticmethod
@asynccontextmanager @asynccontextmanager
async def batch(): async def batch(transaction=True):
old_redis: Redis = current_redis.get() old_redis: Redis = current_redis.get()
pipe: Pipeline = old_redis.pipeline() pipe = old_redis.pipeline(transaction=transaction)
# noinspection PyTypeChecker
current_redis.set(pipe) current_redis.set(pipe)
try: try:
yield pipe yield pipe

View File

@@ -2,7 +2,7 @@ import logging
from collections import defaultdict from collections import defaultdict
from typing import Iterable, Union, Reversible, Any from typing import Iterable, Union, Reversible, Any
from redis.asyncio.client import Pipeline, Redis from redis.asyncio.client import Pipeline
from socket_io_emitter import Emitter from socket_io_emitter import Emitter
from dexorder import DELETE from dexorder import DELETE
@@ -40,11 +40,12 @@ class RedisState (SeriesCollection):
for series in self.datas.keys(): for series in self.datas.keys():
for k, v in state.iteritems(fork, series): for k, v in state.iteritems(fork, series):
diffs.append(DiffItem(series, k, v)) diffs.append(DiffItem(series, k, v))
await self.save(fork, diffs) # todo tim fix pubs
await self.save(fork, diffs, use_transaction=True, skip_pubs=True) # use_transaction=False if the data is too big
# noinspection PyAsyncCall # noinspection PyAsyncCall
async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]], *, use_transaction=True, skip_pubs=False):
# the diffs must be already compressed such that there is only one action per key # the diffs must be already compressed such that there is only one action per key
chain = current_chain.get() chain = current_chain.get()
chain_id = chain.id chain_id = chain.id
@@ -91,21 +92,24 @@ class RedisState (SeriesCollection):
hsets[series][key] = value hsets[series][key] = value
else: else:
raise NotImplementedError raise NotImplementedError
r: Redis = current_redis.get() async with memcache.batch(use_transaction) as r:
for series, keys in sadds.items(): # Redis pipelines fill up before our state can be sent, so we cannot do this atomically.
await r.sadd(series, *keys) # However, sending many individual calls is super slow, so we
for series, keys in sdels.items(): r: Pipeline
await r.srem(series, *keys) for series, keys in sadds.items():
for series, kvs in hsets.items(): r.sadd(series, *keys)
await r.hset(series, mapping=kvs) for series, keys in sdels.items():
for series, keys in hdels.items(): r.srem(series, *keys)
await r.hdel(series, *keys) for series, kvs in hsets.items():
block_series = f'{chain_id}|head' r.hset(series, mapping=kvs)
headstr = hexstr(fork.head) for series, keys in hdels.items():
await r.json(json_encoder).set(block_series,'$',[fork.height, headstr]) r.hdel(series, *keys)
pubs.append((str(chain_id), 'head', [fork.height, headstr])) block_series = f'{chain_id}|head'
headstr = hexstr(fork.head)
r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
pubs.append((str(chain_id), 'head', [fork.height, headstr]))
# separate batch for pubs # separate batch for pubs
if pubs: if pubs and not skip_pubs:
await publish_all(pubs) await publish_all(pubs)

View File

@@ -359,7 +359,7 @@ class OHLCRepository:
if price is None, then bars are advanced based on the time but no new price is added to the series. if price is None, then bars are advanced based on the time but no new price is added to the series.
""" """
if OHLC_LIMIT_POOLS_DEBUG is not None and (symbol,period) not in OHLC_LIMIT_POOLS_DEBUG: if OHLC_LIMIT_POOLS_DEBUG is not None and (symbol,period) not in OHLC_LIMIT_POOLS_DEBUG:
return return None
# logname = f'{symbol} {period_name(period)}' # logname = f'{symbol} {period_name(period)}'
# log.debug(f'Updating OHLC {logname} {minutely(time)} {price}') # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
if price is not None: if price is not None:
@@ -371,33 +371,31 @@ class OHLCRepository:
# log.debug(f'got recent {historical}') # log.debug(f'got recent {historical}')
if not historical: if not historical:
if create is False or price is None: if create is False or price is None:
return # do not track symbols which have not been explicity set up return None # do not track symbols which have not been explicity set up
historical = []
updated = [NativeOHLC(ohlc_start_time(time, period), price, price, price, price)] updated = [NativeOHLC(ohlc_start_time(time, period), price, price, price, price)]
# log.debug(f'\tcreated new bars {updated}') # log.debug(f'\tcreated new bars {updated}')
else: else:
updated = update_ohlc(historical[-1], period, time, price) updated = update_ohlc(historical[-1], period, time, price)
# drop any historical bars that are older than we need # overlap the updated OHLC's on top of the historical ones
# oldest_needed = cover the root block time plus one period prior
root_branch = current_blockstate.get().root_branch
root_hash = root_branch.head
if root_hash is not None:
root_timestamp = await get_block_timestamp(root_hash)
oldest_needed = from_timestamp(root_timestamp) - period
# noinspection PyTypeChecker
trim = (oldest_needed - historical[0].start) // period
if trim > 0:
historical = historical[trim:]
# now overlap the updated data on top of the historical data
if not historical or not updated:
updated = historical + updated
else:
last_bar = historical[-1].start last_bar = historical[-1].start
first_updated = updated[0].start first_updated = updated[0].start
overlap = (first_updated - last_bar) // period + 1 overlap = (first_updated - last_bar) // period + 1
updated = historical[:-overlap] + updated if overlap > 0 else historical + updated updated = historical[:-overlap] + updated if overlap > 0 else historical + updated
# log.debug(f'\tnew recents: {updated}')
# drop any bars that are older than we need
# oldest_needed = cover the root block time plus one period prior
root_branch = current_blockstate.get().root_branch
root_hash = root_branch.head
if root_hash is not None:
root_timestamp = await get_block_timestamp(root_hash)
oldest_needed = from_timestamp(root_timestamp) - period
# noinspection PyTypeChecker
trim = (oldest_needed - updated[0].start) // period
if trim > 0:
updated = updated[trim:]
if len(updated) > 3:
log.debug(f'\tnew recents ({len(updated)}): {updated}')
recent_ohlcs.setitem(key, updated) recent_ohlcs.setitem(key, updated)
return updated return updated

View File

@@ -393,7 +393,7 @@ class PriceLineTrigger (Trigger):
if inverted: if inverted:
price_now = 1/price_now price_now = 1/price_now
activated = value_now < price_now if is_min else value_now > price_now activated = value_now < price_now if is_min else value_now > price_now
log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}') # log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
trigger_type = Trigger.TriggerType.MinLine if is_min else Trigger.TriggerType.MaxLine trigger_type = Trigger.TriggerType.MinLine if is_min else Trigger.TriggerType.MaxLine
super().__init__(trigger_type, tk, activated) super().__init__(trigger_type, tk, activated)
self.inverted = inverted self.inverted = inverted
@@ -573,11 +573,11 @@ class TrancheTrigger:
TrancheState.Error if self.market_order and self.slippage < MIN_SLIPPAGE - MIN_SLIPPAGE_EPSILON else \ TrancheState.Error if self.market_order and self.slippage < MIN_SLIPPAGE - MIN_SLIPPAGE_EPSILON else \
TrancheState.Filled if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount else \ TrancheState.Filled if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount else \
TrancheState.Expired if self.expiration_trigger is not None and not self.expiration_trigger else \ TrancheState.Expired if self.expiration_trigger is not None and not self.expiration_trigger else \
TrancheState.Early if self.activation_trigger is None and not self.activation_trigger else \ TrancheState.Early if self.activation_trigger is not None and not self.activation_trigger else \
TrancheState.Active TrancheState.Active
_dirty.add(tk) _dirty.add(tk)
TrancheTrigger.all[tk] = self TrancheTrigger.all[tk] = self
log.debug(f'Tranche {tk} initial status {self.status} {self}') # log.debug(f'Tranche {tk} initial status {self.status} {self}')
@property @property