redis pipeline overflow fix

Saving a full block state queues more commands than a single Redis pipeline can hold before execute() runs ("Redis pipelines fill up before our state can be sent"), while sending individual calls is far too slow. Wrap the async pipeline in a PipelineProxy that counts queued operations and flushes every BATCH_SIZE (1,000) commands, splitting oversized sadd/srem/hset/hdel calls into chunks. Memcache.batch() now yields the proxy, RedisState.save() awaits the proxy helpers, and a noisy recent-OHLC debug log is commented out.
@@ -1,14 +1,69 @@
+import itertools
 import logging
 from contextlib import asynccontextmanager
 from contextvars import ContextVar

 import redis.asyncio as redis_async
 from redis.asyncio import Redis
+from redis.asyncio.client import Pipeline

 from dexorder import config

 log = logging.getLogger(__name__)
+
+BATCH_SIZE = 1_000
+
+class PipelineProxy:
+    def __init__(self, pipe: Pipeline):
+        self.pipe = pipe
+        self.ops = 0
+
+    async def push(self, num=1):
+        self.ops += num
+        if self.ops >= BATCH_SIZE:
+            self.ops = 0
+            await self.pipe.execute()
+
+    async def sadd(self, series, *keys):
+        while keys:
+            most = min(BATCH_SIZE - self.ops, len(keys))
+            assert most > 0
+            send = keys[:most]
+            keys = keys[most:]
+            await self.pipe.sadd(series, *send)
+            await self.push(len(send))
+
+    async def srem(self, series, *keys):
+        while keys:
+            most = min(BATCH_SIZE - self.ops, len(keys))
+            assert most > 0
+            send = keys[:most]
+            keys = keys[most:]
+            await self.pipe.srem(series, *send)
+            await self.push(len(send))
+
+    async def hset(self, series, *, mapping):
+        items = list(mapping.items())
+        while items:
+            most = min(BATCH_SIZE - self.ops, len(items))
+            assert most > 0
+            send = items[:most]
+            items = items[most:]
+            await self.pipe.hset(series, mapping={k: v for k, v in send})
+            await self.push(len(send))
+
+    async def hdel(self, series, *keys):
+        while keys:
+            most = min(BATCH_SIZE - self.ops, len(keys))
+            assert most > 0
+            send = keys[:most]
+            keys = keys[most:]
+            await self.pipe.hdel(series, *send)
+            await self.push(len(send))
+
+    def __getattr__(self, item):
+        return getattr(self.pipe, item)


 class Memcache:
     @staticmethod
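Note: the proxy is a drop-in for the common pipeline verbs but flushes itself every BATCH_SIZE queued commands. A minimal standalone sketch of that behavior (the Redis URL and key name are hypothetical, and from_url/aclose assume a recent redis-py):

    import asyncio
    import redis.asyncio as redis_async

    from dexorder.memcache import PipelineProxy

    async def demo():
        r = redis_async.from_url('redis://localhost:6379')  # hypothetical local instance
        proxy = PipelineProxy(r.pipeline(transaction=False))
        # 2,500 members: the proxy queues them in chunks and calls
        # execute() twice on its own as each 1,000 queued ops accumulate.
        await proxy.sadd('demo|set', *map(str, range(2500)))
        await proxy.pipe.execute()  # flush the remaining ~500 queued commands
        await r.aclose()

    asyncio.run(demo())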
@@ -19,7 +74,7 @@ class Memcache:
         # noinspection PyTypeChecker
         current_redis.set(pipe)
         try:
-            yield pipe
+            yield PipelineProxy(pipe)
             await pipe.execute()
         finally:
             current_redis.set(old_redis)
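Note: batch() keeps its trailing await pipe.execute(), so whatever the proxy has queued since its last automatic flush still goes out when the context exits. The trade-off, per the comment removed in the RedisState hunk below, is atomicity: a transaction cannot span the intermediate flushes. Callers change from queuing synchronously to awaiting the proxy helpers, roughly:

    # before: commands only queue; nothing hits Redis until the context exits
    async with memcache.batch(use_transaction) as r:
        r.sadd(series, *keys)

    # after: helpers are coroutines and may flush mid-batch
    async with memcache.batch(use_transaction) as r:
        await r.sadd(series, *keys)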
@@ -12,7 +12,7 @@ from dexorder.blockstate.blockdata import SeriesCollection, BlockData
 from dexorder.blockstate.diff import DiffEntryItem
 from dexorder.blockstate.fork import Fork
 from dexorder.blockstate.state import compress_diffs
-from dexorder.memcache import current_redis, memcache
+from dexorder.memcache import current_redis, memcache, PipelineProxy
 from dexorder.util import hexstr
 from dexorder.util.async_util import maywait
 from dexorder.util.json import json_encoder

@@ -40,7 +40,6 @@ class RedisState (SeriesCollection):
         for series in self.datas.keys():
             for k, v in state.iteritems(fork, series):
                 diffs.append(DiffItem(series, k, v))
-        # todo tim fix pubs
         await self.save(fork, diffs, use_transaction=False, skip_pubs=True) # use_transaction=False if the data is too big


@@ -92,18 +91,17 @@ class RedisState (SeriesCollection):
                 hsets[series][key] = value
             else:
                 raise NotImplementedError

         async with memcache.batch(use_transaction) as r:
-            # Redis pipelines fill up before our state can be sent, so we cannot do this atomically.
-            # However, sending many individual calls is super slow, so we
-            r: Pipeline
+            r: PipelineProxy
             for series, keys in sadds.items():
-                r.sadd(series, *keys)
+                await r.sadd(series, *keys)
             for series, keys in sdels.items():
-                r.srem(series, *keys)
+                await r.srem(series, *keys)
             for series, kvs in hsets.items():
-                r.hset(series, mapping=kvs)
+                await r.hset(series, mapping=kvs)
             for series, keys in hdels.items():
-                r.hdel(series, *keys)
+                await r.hdel(series, *keys)
             block_series = f'{chain_id}|head'
             headstr = hexstr(fork.head)
             r.json(json_encoder).set(block_series, '$', [fork.height, headstr])
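Note: the chunk size min(BATCH_SIZE - self.ops, len(keys)) fills the pipeline exactly to the flush threshold even when a call starts with ops already queued. A self-contained illustration of the resulting flush pattern (pure Python, no Redis required; the helper name is made up):

    BATCH_SIZE = 1_000

    def chunk_sizes(ops, n):
        # sizes the proxy would send for n keys with `ops` commands already queued
        while n:
            most = min(BATCH_SIZE - ops, n)
            yield most
            n -= most
            ops = (ops + most) % BATCH_SIZE  # push() resets the counter at the threshold

    print(list(chunk_sizes(900, 2500)))  # -> [100, 1000, 1000, 400]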
@@ -394,8 +394,8 @@ class OHLCRepository:
         if trim > 0:
             updated = updated[trim:]

-        if len(updated) > 3:
-            log.debug(f'\tnew recents ({len(updated)}): {updated}')
+        # if len(updated) > 3:
+        #     log.debug(f'\tnew recents ({len(updated)}): {updated}')
         recent_ohlcs.setitem(key, updated)
         return updated
