Compare commits

5 Commits: 34fa439b3c...0bb670b356

| Author | SHA1 | Date |
|---|---|---|
| | 0bb670b356 | |
| | 52b406ba17 | |
| | 3d0342d19d | |
| | dbf960bae9 | |
| | d49f142fe3 | |
```diff
@@ -117,7 +117,7 @@ async def main():
     if redis_state:
         # load initial state
         log.info('initializing redis with root state')
-        await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id])
+        await redis_state.init(state, state.root_fork)

     await initialize_accounting_runner()

```
```diff
@@ -220,7 +220,7 @@ async def update_metrics():
     metric.vaults.set(vault_owners.upper_len())
     metric.open_orders.set(Order.open_orders.upper_len())
     metric.triggers_time.set(len(TimeTrigger.all))
-    metric.triggers_line.set(len(PriceLineTrigger.triggers_set))
+    metric.triggers_line.set(sum(len(s) for s in PriceLineTrigger.by_pool.values()))

     # slow updates
     global slow_metric_update
```
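The only change here is how price-line triggers are counted: the flat `triggers_set` is gone and the gauge now sums set sizes across a per-pool index. A minimal sketch of the new expression, assuming `by_pool` maps each pool to a set of triggers (the pool names and trigger ids below are made up):

```python
from collections import defaultdict

# Hypothetical per-pool trigger index; the real code keeps this on PriceLineTrigger.by_pool.
by_pool: defaultdict = defaultdict(set)
by_pool['POOL_A'].update({'trigger1', 'trigger2'})
by_pool['POOL_B'].add('trigger3')

# Same shape as the new metric line: total triggers across every pool.
total = sum(len(s) for s in by_pool.values())
assert total == 3
```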
```diff
@@ -4,7 +4,6 @@ from contextvars import ContextVar

 import redis.asyncio as redis_async
 from redis.asyncio import Redis
-from redis.asyncio.client import Pipeline

 from dexorder import config

```
```diff
@@ -14,9 +13,10 @@ log = logging.getLogger(__name__)
 class Memcache:
     @staticmethod
     @asynccontextmanager
-    async def batch():
+    async def batch(transaction=True):
         old_redis: Redis = current_redis.get()
-        pipe: Pipeline = old_redis.pipeline()
+        pipe = old_redis.pipeline(transaction=transaction)
+        # noinspection PyTypeChecker
         current_redis.set(pipe)
         try:
             yield pipe
```
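For context, `Memcache.batch` swaps the ambient `current_redis` ContextVar to a pipeline for the duration of the block, and this commit lets callers opt out of MULTI/EXEC. A self-contained sketch of the pattern; the flush-and-restore steps after the `yield` are assumptions, since the hunk ends at the `yield`:

```python
from contextlib import asynccontextmanager
from contextvars import ContextVar

# Holds a redis.asyncio Redis client, or a Pipeline while a batch is open.
current_redis: ContextVar = ContextVar('current_redis')

@asynccontextmanager
async def batch(transaction=True):
    """Route Redis calls made inside the block through one pipeline.

    transaction=True wraps the queued commands in MULTI/EXEC so they apply
    atomically; transaction=False sends a plain pipeline (faster, not atomic).
    """
    old_redis = current_redis.get()
    pipe = old_redis.pipeline(transaction=transaction)
    current_redis.set(pipe)
    try:
        yield pipe
        await pipe.execute()          # assumed: flush the queued commands once
    finally:
        current_redis.set(old_redis)  # assumed: restore the outer client
```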
```diff
@@ -2,7 +2,7 @@ import logging
 from collections import defaultdict
 from typing import Iterable, Union, Reversible, Any

-from redis.asyncio.client import Pipeline, Redis
+from redis.asyncio.client import Pipeline
 from socket_io_emitter import Emitter

 from dexorder import DELETE
```
```diff
@@ -40,11 +40,12 @@ class RedisState (SeriesCollection):
         for series in self.datas.keys():
             for k, v in state.iteritems(fork, series):
                 diffs.append(DiffItem(series, k, v))
-        await self.save(fork, diffs)
+        # todo tim fix pubs
+        await self.save(fork, diffs, use_transaction=True, skip_pubs=True)  # use_transaction=False if the data is too big


     # noinspection PyAsyncCall
-    async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
+    async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]], *, use_transaction=True, skip_pubs=False):
         # the diffs must be already compressed such that there is only one action per key
         chain = current_chain.get()
         chain_id = chain.id
```
```diff
@@ -91,21 +92,24 @@ class RedisState (SeriesCollection):
                 hsets[series][key] = value
             else:
                 raise NotImplementedError
-        r: Redis = current_redis.get()
-        for series, keys in sadds.items():
-            await r.sadd(series, *keys)
-        for series, keys in sdels.items():
-            await r.srem(series, *keys)
-        for series, kvs in hsets.items():
-            await r.hset(series, mapping=kvs)
-        for series, keys in hdels.items():
-            await r.hdel(series, *keys)
-        block_series = f'{chain_id}|head'
-        headstr = hexstr(fork.head)
-        await r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
-        pubs.append((str(chain_id), 'head', [fork.height, headstr]))
+        async with memcache.batch(use_transaction) as r:
+            # Redis pipelines fill up before our state can be sent, so we cannot do this atomically.
+            # However, sending many individual calls is super slow, so we batch them in a pipeline.
+            r: Pipeline
+            for series, keys in sadds.items():
+                r.sadd(series, *keys)
+            for series, keys in sdels.items():
+                r.srem(series, *keys)
+            for series, kvs in hsets.items():
+                r.hset(series, mapping=kvs)
+            for series, keys in hdels.items():
+                r.hdel(series, *keys)
+            block_series = f'{chain_id}|head'
+            headstr = hexstr(fork.head)
+            r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
+            pubs.append((str(chain_id), 'head', [fork.height, headstr]))
         # separate batch for pubs
-        if pubs:
+        if pubs and not skip_pubs:
             await publish_all(pubs)


```
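The rewritten body leans on a redis-py detail: on an asyncio `Pipeline`, command methods such as `sadd` only queue the command locally and return immediately, which is why the per-command `await`s are dropped; one `execute()` (inside `memcache.batch`) then sends everything in a single round trip. A small usage sketch of that trade-off, assuming a reachable local Redis server:

```python
import asyncio
import redis.asyncio as redis_async

async def demo():
    r = redis_async.Redis()  # assumes Redis on localhost:6379
    # Unbatched: one network round trip per command.
    await r.sadd('series:open_orders', 'k1', 'k2')

    # Pipelined: calls queue locally and return at once; transaction=False
    # skips MULTI/EXEC, trading atomicity for throughput on large diffs.
    pipe = r.pipeline(transaction=False)
    pipe.sadd('series:open_orders', 'k3')
    pipe.srem('series:open_orders', 'k1')
    pipe.hset('series:vaults', mapping={'owner': '0xabc'})
    await pipe.execute()  # one round trip for all queued commands
    await r.aclose()      # redis-py >= 5

asyncio.run(demo())
```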
```diff
@@ -359,7 +359,7 @@ class OHLCRepository:
         if price is None, then bars are advanced based on the time but no new price is added to the series.
         """
         if OHLC_LIMIT_POOLS_DEBUG is not None and (symbol,period) not in OHLC_LIMIT_POOLS_DEBUG:
-            return
+            return None
         # logname = f'{symbol} {period_name(period)}'
         # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
         if price is not None:
```
```diff
@@ -371,33 +371,31 @@ class OHLCRepository:
         # log.debug(f'got recent {historical}')
         if not historical:
             if create is False or price is None:
-                return  # do not track symbols which have not been explicitly set up
-            historical = []
+                return None  # do not track symbols which have not been explicitly set up
             updated = [NativeOHLC(ohlc_start_time(time, period), price, price, price, price)]
             # log.debug(f'\tcreated new bars {updated}')
         else:
             updated = update_ohlc(historical[-1], period, time, price)
-        # drop any historical bars that are older than we need
-        # oldest_needed = cover the root block time plus one period prior
-        root_branch = current_blockstate.get().root_branch
-        root_hash = root_branch.head
-        if root_hash is not None:
-            root_timestamp = await get_block_timestamp(root_hash)
-            oldest_needed = from_timestamp(root_timestamp) - period
-            # noinspection PyTypeChecker
-            trim = (oldest_needed - historical[0].start) // period
-            if trim > 0:
-                historical = historical[trim:]
-
-        # now overlap the updated data on top of the historical data
-        if not historical or not updated:
-            updated = historical + updated
-        else:
+        # overlap the updated OHLC's on top of the historical ones
         last_bar = historical[-1].start
         first_updated = updated[0].start
         overlap = (first_updated - last_bar) // period + 1
         updated = historical[:-overlap] + updated if overlap > 0 else historical + updated
-        # log.debug(f'\tnew recents: {updated}')
+
+        # drop any bars that are older than we need
+        # oldest_needed = cover the root block time plus one period prior
+        root_branch = current_blockstate.get().root_branch
+        root_hash = root_branch.head
+        if root_hash is not None:
+            root_timestamp = await get_block_timestamp(root_hash)
+            oldest_needed = from_timestamp(root_timestamp) - period
+            # noinspection PyTypeChecker
+            trim = (oldest_needed - updated[0].start) // period
+            if trim > 0:
+                updated = updated[trim:]
+
+        if len(updated) > 3:
+            log.debug(f'\tnew recents ({len(updated)}): {updated}')

         recent_ohlcs.setitem(key, updated)
         return updated
```
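The merge arithmetic is easiest to see with numbers: `overlap` counts how many trailing historical bars the updated list supersedes. A worked example with a hypothetical 60-second period and a minimal bar record standing in for `NativeOHLC`:

```python
from collections import namedtuple

# Hypothetical minimal bar record; the real code uses NativeOHLC.
Bar = namedtuple('Bar', 'start open high low close')

period = 60
historical = [Bar(0, 1, 1, 1, 1), Bar(60, 1, 2, 1, 2)]
# The updater re-emits the bar it is amending (start=60) and appends a new one.
updated = [Bar(60, 1, 3, 1, 3), Bar(120, 3, 3, 3, 3)]

last_bar = historical[-1].start                       # 60
first_updated = updated[0].start                      # 60
overlap = (first_updated - last_bar) // period + 1    # (60 - 60) // 60 + 1 == 1
merged = historical[:-overlap] + updated if overlap > 0 else historical + updated
assert [b.start for b in merged] == [0, 60, 120]      # the overlapping bar is replaced once
```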
```diff
@@ -393,7 +393,7 @@ class PriceLineTrigger (Trigger):
         if inverted:
             price_now = 1/price_now
         activated = value_now < price_now if is_min else value_now > price_now
-        log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
+        # log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
         trigger_type = Trigger.TriggerType.MinLine if is_min else Trigger.TriggerType.MaxLine
         super().__init__(trigger_type, tk, activated)
         self.inverted = inverted
```
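For reference, the unchanged activation rule in the context lines above: a min line activates when the observed value falls below the line, a max line when it rises above, and inverted pools compare against the reciprocal price. A minimal standalone rendering (the function name is made up for illustration):

```python
def initial_activation(value_now: float, price_now: float, is_min: bool, inverted: bool) -> bool:
    # Mirrors the context lines: invert the line for reciprocal-quoted pools,
    # then compare below-the-line for min lines and above-the-line for max lines.
    if inverted:
        price_now = 1 / price_now
    return value_now < price_now if is_min else value_now > price_now

assert initial_activation(0.9, 1.0, is_min=True, inverted=False)    # below the min line
assert initial_activation(1.1, 1.0, is_min=False, inverted=False)   # above the max line
assert initial_activation(2.1, 0.5, is_min=False, inverted=True)    # line becomes 1/0.5 == 2.0
```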
```diff
@@ -573,11 +573,11 @@ class TrancheTrigger:
             TrancheState.Error if self.market_order and self.slippage < MIN_SLIPPAGE - MIN_SLIPPAGE_EPSILON else \
             TrancheState.Filled if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount else \
             TrancheState.Expired if self.expiration_trigger is not None and not self.expiration_trigger else \
-            TrancheState.Early if self.activation_trigger is None and not self.activation_trigger else \
+            TrancheState.Early if self.activation_trigger is not None and not self.activation_trigger else \
             TrancheState.Active
         _dirty.add(tk)
         TrancheTrigger.all[tk] = self
-        log.debug(f'Tranche {tk} initial status {self.status} {self}')
+        # log.debug(f'Tranche {tk} initial status {self.status} {self}')


     @property
```
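The `TrancheState.Early` change fixes a genuine logic bug: with `is None and not self.activation_trigger`, the guard was true exactly when no activation trigger existed at all, and could never be true for an unfired trigger. A tiny truth-table sketch of the guard, using a stand-in trigger whose boolean value means "has fired":

```python
class FakeTrigger:
    """Stand-in for an activation trigger: truthiness means it has fired."""
    def __init__(self, fired: bool):
        self.fired = fired
    def __bool__(self) -> bool:
        return self.fired

for trigger in (None, FakeTrigger(False), FakeTrigger(True)):
    old_early = trigger is None and not trigger          # buggy guard
    new_early = trigger is not None and not trigger      # fixed guard
    print(repr(trigger), 'old Early:', old_early, 'new Early:', new_early)

# old: Early only when the trigger is missing (wrong), never for an unfired trigger.
# new: Early exactly while an activation trigger exists and has not fired.
```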