Compare commits

...

5 Commits

Author  SHA1        Message                                       Date
tim     0bb670b356  redis initial state push fix                  2025-04-01 13:52:49 -04:00
tim     52b406ba17  ohlc retained length fix                      2025-04-01 13:52:39 -04:00
tim     3d0342d19d  price line metrics fix                        2025-04-01 13:52:29 -04:00
tim     dbf960bae9  initial TrancheState fix                      2025-04-01 13:52:21 -04:00
tim     d49f142fe3  redis pipeline autoflush after 10000 entries  2025-04-01 10:54:25 -04:00
6 changed files with 47 additions and 45 deletions

View File

@@ -117,7 +117,7 @@ async def main():
     if redis_state:
         # load initial state
         log.info('initializing redis with root state')
-        await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id])
+        await redis_state.init(state, state.root_fork)
     await initialize_accounting_runner()
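
The new redis_state.init call replaces a direct save of the root-branch diffs. Its body is not part of this compare, but the RedisState hunk further down shows the loop it most likely wraps; a minimal sketch under that assumption (only DiffItem, iteritems, and save are taken from the hunk below, the method itself is inferred):

    # Hypothetical sketch of RedisState.init, inferred from the RedisState hunk below.
    async def init(self, state, fork):
        diffs = []
        for series in self.datas.keys():
            for k, v in state.iteritems(fork, series):
                diffs.append(DiffItem(series, k, v))
        # push the entire root state in one batch; skip pubs because nothing
        # is subscribed to a freshly initialized state yet
        await self.save(fork, diffs, use_transaction=True, skip_pubs=True)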

View File

@@ -220,7 +220,7 @@ async def update_metrics():
     metric.vaults.set(vault_owners.upper_len())
     metric.open_orders.set(Order.open_orders.upper_len())
     metric.triggers_time.set(len(TimeTrigger.all))
-    metric.triggers_line.set(len(PriceLineTrigger.triggers_set))
+    metric.triggers_line.set(sum(len(s) for s in PriceLineTrigger.by_pool.values()))
     # slow updates
     global slow_metric_update
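
The fix assumes price line triggers are now registered per pool rather than in one flat set, so the gauge must sum the per-pool set sizes. A toy illustration with invented values:

    # Assumed shape: by_pool maps each pool to the set of triggers registered on it.
    by_pool = {'pool_a': {1, 2, 3}, 'pool_b': {4, 5}}
    total = sum(len(s) for s in by_pool.values())
    assert total == 5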

View File

@@ -4,7 +4,6 @@ from contextvars import ContextVar
 import redis.asyncio as redis_async
 from redis.asyncio import Redis
-from redis.asyncio.client import Pipeline
 from dexorder import config

@@ -14,9 +13,10 @@ log = logging.getLogger(__name__)
 class Memcache:
     @staticmethod
     @asynccontextmanager
-    async def batch():
+    async def batch(transaction=True):
         old_redis: Redis = current_redis.get()
-        pipe: Pipeline = old_redis.pipeline()
+        pipe = old_redis.pipeline(transaction=transaction)
+        # noinspection PyTypeChecker
         current_redis.set(pipe)
         try:
             yield pipe
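
With this change, batch() queues every Redis call made inside the block onto one pipeline and flushes them together on exit; callers that cannot afford a MULTI/EXEC transaction (such as the large state push in RedisState.save below) can pass transaction=False. A usage sketch under those assumptions (the keys are invented; memcache is the module alias used in RedisState.save below):

    # All calls inside the block are queued on one pipeline and sent on exit.
    async with memcache.batch(transaction=False) as pipe:
        pipe.sadd('1|tokens', 'key1', 'key2')      # queued, not awaited
        pipe.hset('1|prices', mapping={'k': 'v'})  # queued, not awaited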

View File

@@ -2,7 +2,7 @@ import logging
 from collections import defaultdict
 from typing import Iterable, Union, Reversible, Any
-from redis.asyncio.client import Pipeline, Redis
+from redis.asyncio.client import Pipeline
 from socket_io_emitter import Emitter
 from dexorder import DELETE

@@ -40,11 +40,12 @@ class RedisState (SeriesCollection):
         for series in self.datas.keys():
             for k, v in state.iteritems(fork, series):
                 diffs.append(DiffItem(series, k, v))
-        await self.save(fork, diffs)
+        # todo tim fix pubs
+        await self.save(fork, diffs, use_transaction=True, skip_pubs=True)  # use_transaction=False if the data is too big

     # noinspection PyAsyncCall
-    async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
+    async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]], *, use_transaction=True, skip_pubs=False):
         # the diffs must be already compressed such that there is only one action per key
         chain = current_chain.get()
         chain_id = chain.id
@@ -91,21 +92,24 @@ class RedisState (SeriesCollection):
                 hsets[series][key] = value
             else:
                 raise NotImplementedError
-        r: Redis = current_redis.get()
-        for series, keys in sadds.items():
-            await r.sadd(series, *keys)
-        for series, keys in sdels.items():
-            await r.srem(series, *keys)
-        for series, kvs in hsets.items():
-            await r.hset(series, mapping=kvs)
-        for series, keys in hdels.items():
-            await r.hdel(series, *keys)
-        block_series = f'{chain_id}|head'
-        headstr = hexstr(fork.head)
-        await r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
-        pubs.append((str(chain_id), 'head', [fork.height, headstr]))
+        async with memcache.batch(use_transaction) as r:
+            # Redis pipelines fill up before our state can be sent, so we cannot do this atomically.
+            # However, sending many individual calls is super slow, so we batch them in a pipeline.
+            r: Pipeline
+            for series, keys in sadds.items():
+                r.sadd(series, *keys)
+            for series, keys in sdels.items():
+                r.srem(series, *keys)
+            for series, kvs in hsets.items():
+                r.hset(series, mapping=kvs)
+            for series, keys in hdels.items():
+                r.hdel(series, *keys)
+            block_series = f'{chain_id}|head'
+            headstr = hexstr(fork.head)
+            r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
+            pubs.append((str(chain_id), 'head', [fork.height, headstr]))
         # separate batch for pubs
-        if pubs:
+        if pubs and not skip_pubs:
             await publish_all(pubs)
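
Only the batching itself is visible in this hunk; the earliest commit in the list ("redis pipeline autoflush after 10000 entries") is what makes the non-atomicity comment above accurate. The mechanism is presumably a count-triggered early execute; a rough sketch of the idea (the threshold comes from the commit message, everything else is assumed):

    # Hypothetical autoflush: execute the pipeline early once too many commands
    # are queued, so its buffer cannot grow without bound.
    AUTOFLUSH_THRESHOLD = 10000

    async def autoflush(pipe):
        if len(pipe.command_stack) >= AUTOFLUSH_THRESHOLD:
            await pipe.execute()  # flushes queued commands; this is what breaks atomicity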

View File

@@ -359,7 +359,7 @@ class OHLCRepository:
         if price is None, then bars are advanced based on the time but no new price is added to the series.
         """
         if OHLC_LIMIT_POOLS_DEBUG is not None and (symbol,period) not in OHLC_LIMIT_POOLS_DEBUG:
-            return
+            return None
         # logname = f'{symbol} {period_name(period)}'
         # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
         if price is not None:
@@ -371,33 +371,31 @@
         # log.debug(f'got recent {historical}')
         if not historical:
             if create is False or price is None:
-                return  # do not track symbols which have not been explicitly set up
-            historical = []
+                return None  # do not track symbols which have not been explicitly set up
             updated = [NativeOHLC(ohlc_start_time(time, period), price, price, price, price)]
             # log.debug(f'\tcreated new bars {updated}')
         else:
             updated = update_ohlc(historical[-1], period, time, price)
-        # drop any historical bars that are older than we need
-        # oldest_needed = cover the root block time plus one period prior
-        root_branch = current_blockstate.get().root_branch
-        root_hash = root_branch.head
-        if root_hash is not None:
-            root_timestamp = await get_block_timestamp(root_hash)
-            oldest_needed = from_timestamp(root_timestamp) - period
-            # noinspection PyTypeChecker
-            trim = (oldest_needed - historical[0].start) // period
-            if trim > 0:
-                historical = historical[trim:]
         # now overlap the updated data on top of the historical data
         if not historical or not updated:
             updated = historical + updated
         else:
             # overlap the updated OHLC's on top of the historical ones
             last_bar = historical[-1].start
             first_updated = updated[0].start
             overlap = (first_updated - last_bar) // period + 1
             updated = historical[:-overlap] + updated if overlap > 0 else historical + updated
-        # log.debug(f'\tnew recents: {updated}')
+        # drop any bars that are older than we need
+        # oldest_needed = cover the root block time plus one period prior
+        root_branch = current_blockstate.get().root_branch
+        root_hash = root_branch.head
+        if root_hash is not None:
+            root_timestamp = await get_block_timestamp(root_hash)
+            oldest_needed = from_timestamp(root_timestamp) - period
+            # noinspection PyTypeChecker
+            trim = (oldest_needed - updated[0].start) // period
+            if trim > 0:
+                updated = updated[trim:]
+        if len(updated) > 3:
+            log.debug(f'\tnew recents ({len(updated)}): {updated}')
         recent_ohlcs.setitem(key, updated)
         return updated
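
The overlap arithmetic splices the refreshed bars over the tail of the stored history. A worked example with invented numbers:

    period = 60
    last_bar = 300        # start time of the newest historical bar
    first_updated = 300   # update_ohlc regenerates that same bar
    overlap = (first_updated - last_bar) // period + 1  # 0 // 60 + 1 == 1
    # overlap == 1: drop historical[-1] and let updated[0] replace it,
    # i.e. updated = historical[:-1] + updated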

View File

@@ -393,7 +393,7 @@ class PriceLineTrigger (Trigger):
         if inverted:
             price_now = 1/price_now
         activated = value_now < price_now if is_min else value_now > price_now
-        log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
+        # log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
         trigger_type = Trigger.TriggerType.MinLine if is_min else Trigger.TriggerType.MaxLine
         super().__init__(trigger_type, tk, activated)
         self.inverted = inverted
@@ -573,11 +573,11 @@ class TrancheTrigger:
             TrancheState.Error if self.market_order and self.slippage < MIN_SLIPPAGE - MIN_SLIPPAGE_EPSILON else \
             TrancheState.Filled if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount else \
             TrancheState.Expired if self.expiration_trigger is not None and not self.expiration_trigger else \
-            TrancheState.Early if self.activation_trigger is None and not self.activation_trigger else \
+            TrancheState.Early if self.activation_trigger is not None and not self.activation_trigger else \
             TrancheState.Active
         _dirty.add(tk)
         TrancheTrigger.all[tk] = self
-        log.debug(f'Tranche {tk} initial status {self.status} {self}')
+        # log.debug(f'Tranche {tk} initial status {self.status} {self}')
@property
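
The Early fix flips an `is None` to `is not None`. Under the old test a tranche with no activation trigger at all was classified Early (because `not None` is truthy), while a tranche whose trigger existed but had not yet fired fell through to Active. Assuming a trigger object is falsy until it fires, a minimal illustration (FakeTrigger stands in for the real class):

    class FakeTrigger:
        def __init__(self, fired): self.fired = fired
        def __bool__(self): return self.fired

    pending = FakeTrigger(fired=False)
    assert (None is None and not None) is True            # old test: no trigger -> wrongly Early
    assert (pending is not None and not pending) is True  # new test: un-fired trigger -> Early
    assert (None is not None and not None) is False       # new test: no trigger -> falls through to Active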