The initial Redis state load no longer uses a pipeline, because the pipeline overflowed.

This commit is contained in:
tim
2025-03-26 23:25:10 -04:00
parent 07c6423fd5
commit 31b6ddd314
2 changed files with 17 additions and 16 deletions

View File

@@ -7,6 +7,7 @@ from dexorder.blocks import current_block, get_block
from dexorder.blockstate import current_blockstate from dexorder.blockstate import current_blockstate
from dexorder.blockstate.blockdata import BlockData from dexorder.blockstate.blockdata import BlockData
from dexorder.blockstate.db_state import DbState from dexorder.blockstate.db_state import DbState
from dexorder.blockstate.fork import current_fork
from dexorder.contract.dexorder import VaultContract from dexorder.contract.dexorder import VaultContract
from dexorder.order.orderstate import Order from dexorder.order.orderstate import Order
from dexorder.tokens import adjust_decimals from dexorder.tokens import adjust_decimals
@@ -66,10 +67,11 @@ async def main(args: list):
db_state = DbState(BlockData.by_opt('db')) db_state = DbState(BlockData.by_opt('db'))
with db.transaction(): with db.transaction():
state = await db_state.load() state = await db_state.load()
state.readonly = True # state.readonly = True
current_blockstate.set(state) current_blockstate.set(state)
block = await get_block(state.root_hash) block = await get_block(state.root_hash)
current_block.set(block) current_block.set(block)
current_fork.set(state.root_fork)
await subcommand(parsed) await subcommand(parsed)

View File

@@ -2,7 +2,7 @@ import logging
from collections import defaultdict from collections import defaultdict
from typing import Iterable, Union, Reversible, Any from typing import Iterable, Union, Reversible, Any
from redis.asyncio.client import Pipeline from redis.asyncio.client import Pipeline, Redis
from socket_io_emitter import Emitter from socket_io_emitter import Emitter
from dexorder import DELETE from dexorder import DELETE
@@ -91,20 +91,19 @@ class RedisState (SeriesCollection):
hsets[series][key] = value hsets[series][key] = value
else: else:
raise NotImplementedError raise NotImplementedError
async with memcache.batch() as r: r: Redis = current_redis.get()
r: Pipeline for series, keys in sadds.items():
for series, keys in sadds.items(): await r.sadd(series, *keys)
r.sadd(series, *keys) for series, keys in sdels.items():
for series, keys in sdels.items(): await r.srem(series, *keys)
r.srem(series, *keys) for series, kvs in hsets.items():
for series, kvs in hsets.items(): await r.hset(series, mapping=kvs)
r.hset(series, mapping=kvs) for series, keys in hdels.items():
for series, keys in hdels.items(): await r.hdel(series, *keys)
r.hdel(series, *keys) block_series = f'{chain_id}|head'
block_series = f'{chain_id}|head' headstr = hexstr(fork.head)
headstr = hexstr(fork.head) await r.json(json_encoder).set(block_series,'$',[fork.height, headstr])
r.json(json_encoder).set(block_series,'$',[fork.height, headstr]) pubs.append((str(chain_id), 'head', [fork.height, headstr]))
pubs.append((str(chain_id), 'head', [fork.height, headstr]))
# separate batch for pubs # separate batch for pubs
if pubs: if pubs:
await publish_all(pubs) await publish_all(pubs)