diff --git a/src/dexorder/memcache/__init__.py b/src/dexorder/memcache/__init__.py index 56bc6c3..8d8df71 100644 --- a/src/dexorder/memcache/__init__.py +++ b/src/dexorder/memcache/__init__.py @@ -1,14 +1,69 @@ +import itertools import logging from contextlib import asynccontextmanager from contextvars import ContextVar import redis.asyncio as redis_async from redis.asyncio import Redis +from redis.asyncio.client import Pipeline from dexorder import config log = logging.getLogger(__name__) +BATCH_SIZE = 1_000 + +class PipelineProxy: + def __init__(self, pipe: Pipeline): + self.pipe = pipe + self.ops = 0 + + async def push(self, num=1): + self.ops += num + if self.ops >= BATCH_SIZE: + self.ops = 0 + await self.pipe.execute() + + async def sadd(self, series, *keys): + while keys: + most = min(BATCH_SIZE-self.ops, len(keys)) + assert most > 0 + send = keys[:most] + keys = keys[most:] + await self.pipe.sadd(series, *send) + await self.push(len(send)) + + async def srem(self, series, *keys): + while keys: + most = min(BATCH_SIZE-self.ops, len(keys)) + assert most > 0 + send = keys[:most] + keys = keys[most:] + await self.pipe.srem(series, *send) + await self.push(len(send)) + + async def hset(self, series, *, mapping): + items = list(mapping.items()) + while items: + most = min(BATCH_SIZE-self.ops, len(items)) + assert most > 0 + send = items[:most] + items = items[most:] + await self.pipe.hset(series, mapping={k:v for k,v in send}) + await self.push(len(send)) + + async def hdel(self, series, *keys): + while keys: + most = min(BATCH_SIZE-self.ops, len(keys)) + assert most > 0 + send = keys[:most] + keys = keys[most:] + await self.pipe.hdel(series, *send) + await self.push(len(send)) + + def __getattr__(self, item): + return getattr(self.pipe, item) + class Memcache: @staticmethod @@ -19,7 +74,7 @@ class Memcache: # noinspection PyTypeChecker current_redis.set(pipe) try: - yield pipe + yield PipelineProxy(pipe) await pipe.execute() finally: current_redis.set(old_redis) diff --git a/src/dexorder/memcache/memcache_state.py b/src/dexorder/memcache/memcache_state.py index e5759ae..d4a6269 100644 --- a/src/dexorder/memcache/memcache_state.py +++ b/src/dexorder/memcache/memcache_state.py @@ -12,7 +12,7 @@ from dexorder.blockstate.blockdata import SeriesCollection, BlockData from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.fork import Fork from dexorder.blockstate.state import compress_diffs -from dexorder.memcache import current_redis, memcache +from dexorder.memcache import current_redis, memcache, PipelineProxy from dexorder.util import hexstr from dexorder.util.async_util import maywait from dexorder.util.json import json_encoder @@ -40,7 +40,6 @@ class RedisState (SeriesCollection): for series in self.datas.keys(): for k, v in state.iteritems(fork, series): diffs.append(DiffItem(series, k, v)) - # todo tim fix pubs await self.save(fork, diffs, use_transaction=False, skip_pubs=True) # use_transaction=False if the data is too big @@ -92,18 +91,17 @@ class RedisState (SeriesCollection): hsets[series][key] = value else: raise NotImplementedError + async with memcache.batch(use_transaction) as r: - # Redis pipelines fill up before our state can be sent, so we cannot do this atomically. - # However, sending many individual calls is super slow, so we - r: Pipeline + r: PipelineProxy for series, keys in sadds.items(): - r.sadd(series, *keys) + await r.sadd(series, *keys) for series, keys in sdels.items(): - r.srem(series, *keys) + await r.srem(series, *keys) for series, kvs in hsets.items(): - r.hset(series, mapping=kvs) + await r.hset(series, mapping=kvs) for series, keys in hdels.items(): - r.hdel(series, *keys) + await r.hdel(series, *keys) block_series = f'{chain_id}|head' headstr = hexstr(fork.head) r.json(json_encoder).set(block_series,'$',[fork.height, headstr]) diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index 2d94977..c614c60 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -394,8 +394,8 @@ class OHLCRepository: if trim > 0: updated = updated[trim:] - if len(updated) > 3: - log.debug(f'\tnew recents ({len(updated)}): {updated}') + # if len(updated) > 3: + # log.debug(f'\tnew recents ({len(updated)}): {updated}') recent_ohlcs.setitem(key, updated) return updated