redis pipeline autoflush after 10000 entries

This commit is contained in:
tim
2025-04-01 10:54:25 -04:00
parent 34fa439b3c
commit d49f142fe3
2 changed files with 42 additions and 14 deletions

View File

@@ -11,12 +11,39 @@ from dexorder import config
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class FlushingPipeline:
def __init__(self, redis: Redis):
self.redis = redis
self.pipe: Pipeline = redis.pipeline()
self.full_pipes: list[Pipeline] = []
self.count = 0
self.flush_at = 10_000
def __getattr__(self, item):
if item in ('sadd', 'srem', 'hset', 'hdel', 'json'):
self.count += 1
if self.count >= self.flush_at:
self.full_pipes.append(self.pipe)
self.pipe = self.redis.pipeline()
self.count = 0
return getattr(self.pipe, item)
async def execute(self):
for pipe in self.full_pipes:
await pipe.execute()
await self.pipe.execute()
self.pipe = None
self.full_pipes.clear()
self.count = 0
class Memcache: class Memcache:
@staticmethod @staticmethod
@asynccontextmanager @asynccontextmanager
async def batch(): async def batch():
old_redis: Redis = current_redis.get() old_redis: Redis = current_redis.get()
pipe: Pipeline = old_redis.pipeline() pipe = FlushingPipeline(old_redis)
# noinspection PyTypeChecker
current_redis.set(pipe) current_redis.set(pipe)
try: try:
yield pipe yield pipe

View File

@@ -91,19 +91,20 @@ class RedisState (SeriesCollection):
hsets[series][key] = value hsets[series][key] = value
else: else:
raise NotImplementedError raise NotImplementedError
r: Redis = current_redis.get() async with memcache.batch() as r:
for series, keys in sadds.items(): r: Pipeline
await r.sadd(series, *keys) for series, keys in sadds.items():
for series, keys in sdels.items(): r.sadd(series, *keys)
await r.srem(series, *keys) for series, keys in sdels.items():
for series, kvs in hsets.items(): r.srem(series, *keys)
await r.hset(series, mapping=kvs) for series, kvs in hsets.items():
for series, keys in hdels.items(): r.hset(series, mapping=kvs)
await r.hdel(series, *keys) for series, keys in hdels.items():
block_series = f'{chain_id}|head' r.hdel(series, *keys)
headstr = hexstr(fork.head) block_series = f'{chain_id}|head'
await r.json(json_encoder).set(block_series,'$',[fork.height, headstr]) headstr = hexstr(fork.head)
pubs.append((str(chain_id), 'head', [fork.height, headstr])) r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
pubs.append((str(chain_id), 'head', [fork.height, headstr]))
# separate batch for pubs # separate batch for pubs
if pubs: if pubs:
await publish_all(pubs) await publish_all(pubs)