Compare commits
5 commits: 34fa439b3c ... 0bb670b356

| Author | SHA1 | Date |
|---|---|---|
| | 0bb670b356 | |
| | 52b406ba17 | |
| | 3d0342d19d | |
| | dbf960bae9 | |
| | d49f142fe3 | |
```diff
@@ -117,7 +117,7 @@ async def main():
     if redis_state:
         # load initial state
         log.info('initializing redis with root state')
-        await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id])
+        await redis_state.init(state, state.root_fork)

     await initialize_accounting_runner()
```
```diff
@@ -220,7 +220,7 @@ async def update_metrics():
     metric.vaults.set(vault_owners.upper_len())
     metric.open_orders.set(Order.open_orders.upper_len())
     metric.triggers_time.set(len(TimeTrigger.all))
-    metric.triggers_line.set(len(PriceLineTrigger.triggers_set))
+    metric.triggers_line.set(sum(len(s) for s in PriceLineTrigger.by_pool.values()))

    # slow updates
    global slow_metric_update
```
```diff
@@ -4,7 +4,6 @@ from contextvars import ContextVar

-import redis.asyncio as redis_async
 from redis.asyncio import Redis
 from redis.asyncio.client import Pipeline

 from dexorder import config
```
```diff
@@ -14,9 +13,10 @@ log = logging.getLogger(__name__)
 class Memcache:
     @staticmethod
     @asynccontextmanager
-    async def batch():
+    async def batch(transaction=True):
         old_redis: Redis = current_redis.get()
-        pipe: Pipeline = old_redis.pipeline()
+        pipe = old_redis.pipeline(transaction=transaction)
+        # noinspection PyTypeChecker
         current_redis.set(pipe)
         try:
             yield pipe
```
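`batch()` now takes a `transaction` flag and forwards it to `old_redis.pipeline(...)`. A minimal caller sketch, assuming the elided `finally` clause of `batch()` restores `current_redis` and awaits `pipe.execute()` on exit (that cleanup is not shown in this hunk); `flush_counters` and its keys are hypothetical:

```python
# Hypothetical usage of Memcache.batch; assumes the context manager
# executes the pipeline and restores current_redis when the block exits.
async def flush_counters(counts: dict[str, int]) -> None:
    async with Memcache.batch(transaction=False) as pipe:
        for key, n in counts.items():
            # Pipeline methods only queue commands locally; nothing is
            # sent to Redis until the pipeline is executed.
            pipe.incrby(key, n)
```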
```diff
@@ -2,7 +2,7 @@ import logging
 from collections import defaultdict
 from typing import Iterable, Union, Reversible, Any

-from redis.asyncio.client import Pipeline, Redis
+from redis.asyncio.client import Pipeline
 from socket_io_emitter import Emitter

 from dexorder import DELETE
```
```diff
@@ -40,11 +40,12 @@ class RedisState (SeriesCollection):
         for series in self.datas.keys():
             for k, v in state.iteritems(fork, series):
                 diffs.append(DiffItem(series, k, v))
-        await self.save(fork, diffs)
+        # todo tim fix pubs
+        await self.save(fork, diffs, use_transaction=True, skip_pubs=True)  # use_transaction=False if the data is too big


     # noinspection PyAsyncCall
-    async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
+    async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]], *, use_transaction=True, skip_pubs=False):
         # the diffs must be already compressed such that there is only one action per key
         chain = current_chain.get()
         chain_id = chain.id
```
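The new flags sit after a bare `*`, so they are keyword-only and callers must name them. A quick sketch of the call forms this allows (`fork` and `diffs` are placeholders):

```python
# Keyword-only flags: the bare `*` in save()'s signature means they
# cannot be passed positionally.
await redis_state.save(fork, diffs)                  # defaults apply
await redis_state.save(fork, diffs, skip_pubs=True)  # ok: named flag
# await redis_state.save(fork, diffs, True, False)   # TypeError
```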
```diff
@@ -91,21 +92,24 @@ class RedisState (SeriesCollection):
                 hsets[series][key] = value
             else:
                 raise NotImplementedError
-        r: Redis = current_redis.get()
+        async with memcache.batch(use_transaction) as r:
+            # Redis pipelines fill up before our state can be sent, so we cannot do this atomically.
+            # However, sending many individual calls is super slow, so we
+            r: Pipeline
             for series, keys in sadds.items():
-                await r.sadd(series, *keys)
+                r.sadd(series, *keys)
             for series, keys in sdels.items():
-                await r.srem(series, *keys)
+                r.srem(series, *keys)
             for series, kvs in hsets.items():
-                await r.hset(series, mapping=kvs)
+                r.hset(series, mapping=kvs)
             for series, keys in hdels.items():
-                await r.hdel(series, *keys)
+                r.hdel(series, *keys)
             block_series = f'{chain_id}|head'
             headstr = hexstr(fork.head)
-            await r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
+            r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
             pubs.append((str(chain_id), 'head', [fork.height, headstr]))
         # separate batch for pubs
-        if pubs:
+        if pubs and not skip_pubs:
             await publish_all(pubs)
```
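The dropped `await`s are the point of this hunk: on a redis-py asyncio `Pipeline`, command methods such as `sadd` and `hset` queue the command and return the pipeline itself, so only the final execute is awaited. A standalone sketch of that pattern (the connection and keys are illustrative):

```python
import redis.asyncio as redis_async

async def demo() -> None:
    r = redis_async.Redis()                    # illustrative default connection
    pipe = r.pipeline(transaction=False)
    # On a Pipeline these calls buffer locally and return the pipeline,
    # so awaiting each one is unnecessary.
    pipe.sadd('series:a', 'k1', 'k2')
    pipe.hset('series:b', mapping={'k': 'v'})
    results = await pipe.execute()             # one round trip for the batch
    print(results)
```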
```diff
@@ -359,7 +359,7 @@ class OHLCRepository:
         if price is None, then bars are advanced based on the time but no new price is added to the series.
         """
         if OHLC_LIMIT_POOLS_DEBUG is not None and (symbol,period) not in OHLC_LIMIT_POOLS_DEBUG:
-            return
+            return None
         # logname = f'{symbol} {period_name(period)}'
         # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
         if price is not None:
```
```diff
@@ -371,13 +371,18 @@ class OHLCRepository:
         # log.debug(f'got recent {historical}')
         if not historical:
             if create is False or price is None:
-                return  # do not track symbols which have not been explicity set up
-            historical = []
+                return None  # do not track symbols which have not been explicity set up
             updated = [NativeOHLC(ohlc_start_time(time, period), price, price, price, price)]
             # log.debug(f'\tcreated new bars {updated}')
         else:
             updated = update_ohlc(historical[-1], period, time, price)
-        # drop any historical bars that are older than we need
+            # overlap the updated OHLC's on top of the historical ones
+            last_bar = historical[-1].start
+            first_updated = updated[0].start
+            overlap = (first_updated - last_bar) // period + 1
+            updated = historical[:-overlap] + updated if overlap > 0 else historical + updated
+
+        # drop any bars that are older than we need
         # oldest_needed = cover the root block time plus one period prior
         root_branch = current_blockstate.get().root_branch
         root_hash = root_branch.head
```
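Worked numbers for the overlap splice, which replaces the stored bars that `update_ohlc` re-emitted. The values below are illustrative only:

```python
# Illustrative: 60-second bars; the stored series ends with a bar at t=1200
# and update_ohlc returned replacement bars starting at that same bar.
period = 60
last_bar = 1200                                     # historical[-1].start
first_updated = 1200                                # updated[0].start
overlap = (first_updated - last_bar) // period + 1  # -> 1 bar overlaps
# historical[:-1] drops the stale t=1200 bar, so its updated copy wins:
# updated = historical[:-overlap] + updated
```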
```diff
@@ -385,19 +390,12 @@ class OHLCRepository:
         root_timestamp = await get_block_timestamp(root_hash)
         oldest_needed = from_timestamp(root_timestamp) - period
         # noinspection PyTypeChecker
-        trim = (oldest_needed - historical[0].start) // period
+        trim = (oldest_needed - updated[0].start) // period
         if trim > 0:
-            historical = historical[trim:]
+            updated = updated[trim:]

-        # now overlap the updated data on top of the historical data
-        if not historical or not updated:
-            updated = historical + updated
-        else:
-            last_bar = historical[-1].start
-            first_updated = updated[0].start
-            overlap = (first_updated - last_bar) // period + 1
-            updated = historical[:-overlap] + updated if overlap > 0 else historical + updated
-        # log.debug(f'\tnew recents: {updated}')
+        if len(updated) > 3:
+            log.debug(f'\tnew recents ({len(updated)}): {updated}')
         recent_ohlcs.setitem(key, updated)
         return updated
```
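Because the overlap now happens before this point, trimming operates on the merged list, so `trim` is measured from `updated[0].start` rather than `historical[0].start`. Illustrative numbers:

```python
# Illustrative: 60-second bars, root block at t=1500, so with one period
# of extra cover the oldest bar we need starts at t=1440.
period = 60
oldest_needed = 1500 - period                 # from_timestamp(...) - period
starts = [1320, 1380, 1440, 1500]             # updated[i].start after merging
trim = (oldest_needed - starts[0]) // period  # (1440 - 1320) // 60 -> 2
if trim > 0:
    starts = starts[trim:]                    # keeps the bars at 1440 and 1500
```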
```diff
@@ -393,7 +393,7 @@ class PriceLineTrigger (Trigger):
         if inverted:
             price_now = 1/price_now
         activated = value_now < price_now if is_min else value_now > price_now
-        log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
+        # log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
         trigger_type = Trigger.TriggerType.MinLine if is_min else Trigger.TriggerType.MaxLine
         super().__init__(trigger_type, tk, activated)
         self.inverted = inverted
```
```diff
@@ -573,11 +573,11 @@ class TrancheTrigger:
             TrancheState.Error if self.market_order and self.slippage < MIN_SLIPPAGE - MIN_SLIPPAGE_EPSILON else \
             TrancheState.Filled if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount else \
             TrancheState.Expired if self.expiration_trigger is not None and not self.expiration_trigger else \
-            TrancheState.Early if self.activation_trigger is None and not self.activation_trigger else \
+            TrancheState.Early if self.activation_trigger is not None and not self.activation_trigger else \
             TrancheState.Active
         _dirty.add(tk)
         TrancheTrigger.all[tk] = self
-        log.debug(f'Tranche {tk} initial status {self.status} {self}')
+        # log.debug(f'Tranche {tk} initial status {self.status} {self}')


     @property
```
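The one-word change in the `TrancheState.Early` arm is a real logic fix, not cleanup: `None` is falsy, so under the old `is None` test a tranche with no activation trigger at all satisfied both clauses and was wrongly classified Early. A minimal illustration:

```python
# Why `is None` -> `is not None` matters: None is falsy.
activation_trigger = None  # tranche configured with no activation condition
old = activation_trigger is None and not activation_trigger      # True: wrongly Early
new = activation_trigger is not None and not activation_trigger  # False: falls through
assert old is True and new is False
```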