redis integration; bin scripts

Tim Olson
2023-10-04 03:43:26 -04:00
parent 416016d31b
commit 5c71305293
11 changed files with 60 additions and 27 deletions

bin/df.sh Executable file

@@ -0,0 +1,2 @@
+#!/bin/bash
+docker run --network=host --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly --maxmemory 1G "$@"

bin/reset_db_df.sh Executable file

@@ -0,0 +1 @@
+alembic downgrade -1 && alembic upgrade head && ./bin/df.sh


@@ -8,3 +8,4 @@ sortedcontainers~=2.4.0
hexbytes~=0.3.1
defaultlist~=1.0.0
redis[hiredis]
+socket.io-emitter


@@ -4,6 +4,7 @@ from decimal import Decimal as dec
class _NARG:
    def __bool__(self): return False
NARG = _NARG()
+DELETE = object() # used as a value token to indicate removal of the key
ADDRESS_0 = '0x0000000000000000000000000000000000000000'
WEI = 1
GWEI = 1_000_000_000

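DELETE is an identity sentinel, so consumers test it with "is" rather than equality. A minimal sketch of that pattern, assuming the dexorder package is importable (the apply_value helper is hypothetical):

from dexorder import DELETE

def apply_value(store: dict, key, value):
    # hypothetical helper: DELETE is an identity token, so `is` is the intended check
    if value is DELETE:
        store.pop(key, None)   # DELETE means "remove this key"
    else:
        store[key] = value

balances = {}
apply_value(balances, 'vault', 100)
apply_value(balances, 'vault', DELETE)   # balances is {} again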

@@ -24,10 +24,10 @@ async def main():
    state = None
    if memcache:
        await memcache.connect()
-        redis_state = RedisState(BlockData.by_tag['redis'])
+        redis_state = RedisState(BlockData.by_opt('redis'))
    if db:
        db.connect()
-        db_state = DbState(BlockData.by_tag['db'])
+        db_state = DbState(BlockData.by_opt('db'))
        with db.session:
            state = db_state.load()
    if state is not None:


@@ -1,11 +1,9 @@
import logging
-from collections import defaultdict
from enum import Enum
from typing import TypeVar, Generic, Iterable, Union
-from dexorder import NARG
+from dexorder import NARG, DELETE
from dexorder.base.fork import current_fork
-from .diff import DELETE
from .state import current_blockstate
log = logging.getLogger(__name__)
@@ -21,16 +19,13 @@ class DataType(Enum):
class BlockData:
    registry: dict[str,'BlockData'] = {} # series name and instance
-    by_tag: dict[str, list['BlockData']] = defaultdict(list)
-    def __init__(self, series:str, data_type: DataType, **tags):
+    def __init__(self, series:str, data_type: DataType, **opts):
        assert series not in BlockData.registry
        BlockData.registry[series] = self
        self.series = series
        self.type = data_type
-        for tag, value in tags.items():
-            if value:
-                BlockData.by_tag[tag].append(self)
+        self.opts = opts

    def setitem(self, item, value, overwrite=True):
        state = current_blockstate.get()
@@ -58,6 +53,10 @@ class BlockData:
        fork = current_fork.get()
        return state.iteritems(fork, series_key)

+    @staticmethod
+    def by_opt(key):
+        yield from (s for s in BlockData.registry.values() if key in s.opts)

class BlockSet(Generic[T], Iterable[T], BlockData):
    def __init__(self, series: str, **tags):
@@ -104,7 +103,7 @@ class BlockDict(Generic[T], BlockData):
class SeriesCollection:
    def __init__(self, series_or_datavars: Iterable[Union[str,BlockData]]):
-        self.types = {
-            (d:=BlockData.registry[x] if type(x) is str else x).series:d.type
+        self.datas: dict[str,BlockData] = {
+            (d := BlockData.registry[x] if type(x) is str else x).series: d
            for x in series_or_datavars
        }

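With tags replaced by opts, every keyword passed to a BlockData constructor is kept on the instance, and by_opt() lazily filters the registry; this is how the runner now builds RedisState and DbState. A small sketch under those assumptions (the series names below are made up):

from dexorder.blockstate import BlockData, BlockSet, BlockDict

example_vaults = BlockSet('example_v', db=True, redis=True)   # opts = {'db': True, 'redis': True}
example_meta = BlockDict('example_m', db=True)                # opts = {'db': True}

# by_opt yields every registered series whose constructor received that keyword
# (only the presence of the key is checked, not its value)
redis_series = list(BlockData.by_opt('redis'))   # [example_vaults]
db_series = list(BlockData.by_opt('db'))         # [example_vaults, example_meta]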

@@ -20,7 +20,7 @@ class DbState(SeriesCollection):
        chain_id = current_chain.get().chain_id
        for diff in diffs:
            try:
-                t = self.types[diff.series]
+                t = self.datas[diff.series].type
            except KeyError:
                continue
            diffseries = keystr(diff.series)
@@ -60,7 +60,8 @@ class DbState(SeriesCollection):
        state = BlockState(root_block)
        current_blockstate.set(state)
        current_fork.set(None) # root fork
-        for series, t in self.types.items():
+        for series, data in self.datas.items():
+            t = data.type
            if t == DataType.SET:
                # noinspection PyTypeChecker
                var: BlockSet = BlockData.registry[series]


@@ -1,8 +1,7 @@
from dataclasses import dataclass
from typing import Union, Any
+from dexorder import DELETE

-DELETE = object() # used as a value token to indicate removal of the key

@dataclass


@@ -1,7 +1,11 @@
from dexorder.blockstate import BlockSet, BlockDict

-vault_addresses = BlockSet('v', db=True, redis=True)
-vault_tokens = BlockDict('vt', db=True, redis=True)
-pool_prices = BlockDict('p', db=True, redis=True)
+# pub=... publishes to a channel for web clients to consume. argument is (key,value) and return must be (event,room,value)
+# if pub is True, then event is the current series name, room is the key, and value is passed through
+# values of DELETE are serialized as nulls
+vault_addresses = BlockSet('v', db=True, redis=True, pub=True)
+vault_tokens = BlockDict('vt', db=True, redis=True, pub=True)
+pool_prices = BlockDict('p', db=True, redis=True, pub=True)
underfunded_vaults = BlockSet('uv', db=True)
active_orders = BlockSet('a', db=True)

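The new pub option is consumed by RedisState.save() later in this commit: with pub=True each (key, value) diff is forwarded as-is, while a callable receives the (key, value) pair and, as save() unpacks it, the pair it returns is published instead, with None suppressing the publish. A hedged sketch of both forms (series names and the filter below are hypothetical):

from dexorder.blockstate import BlockDict

# pub=True: every diff on this series is forwarded to subscribers unchanged
example_prices = BlockDict('p_example', db=True, redis=True, pub=True)

# hypothetical callable: gets the (key, value) pair, returns the pair to publish,
# or None to skip publishing that diff
def drop_empty_values(kv):
    key, value = kv
    return (key, value) if value else None

example_filtered = BlockDict('pf_example', db=True, redis=True, pub=drop_empty_values)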

@@ -1,8 +1,9 @@
import logging
from collections import defaultdict
-from typing import Iterable, Union, Reversible
+from typing import Iterable, Union, Reversible, Any
from redis.asyncio.client import Pipeline
+from socket_io_emitter import Emitter

from dexorder.base.chain import current_chain
from dexorder.base.fork import current_fork
@@ -26,21 +27,21 @@ class RedisState (SeriesCollection):
    async def clear(self):
        r = current_redis.get()
-        await r.delete(f'{current_chain.get().chain_id}|latest_block', *self.types.keys())
+        await r.delete(f'{current_chain.get().chain_id}|latest_block', *self.datas.keys())

    async def init(self, state: BlockState):
        fork = current_fork.get()
        await self.clear()
        diffs = []
-        for series, t in self.types.items():
+        for series in self.datas.keys():
            for k, v in state.iteritems(fork, series):
                diffs.append(DiffItem(series, k, v))
        await self.save(state.root_block, diffs)

    # noinspection PyAsyncCall
-    async def save(self, block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]] ):
+    async def save(self, block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]], *, publish=False ):
        # the diffs must be already compressed such that there is only one action per key
        chain = current_chain.get()
        assert block.chain == chain.chain_id
@@ -49,13 +50,25 @@ class RedisState (SeriesCollection):
        sdels: dict[str,set[str]] = defaultdict(set)
        hsets: dict[str,dict[str,str]] = defaultdict(dict)
        hdels: dict[str,set[str]] = defaultdict(set)
+        pubs: list[tuple[str,str,Any]] = [] # series, key, value => room, event, value
        for diff in compress_diffs(diffs):
            try:
-                t = self.types[diff.series]
+                d = self.datas[diff.series]
            except KeyError:
                continue
+            t = d.type
            series = f'{chain_id}|{keystr(diff.series)}'
            key = keystr(diff.key)
+            value = keystr(diff.value)
+            # pub/sub socketio/redis
+            pub_kv = d.opts.get('pub')
+            if pub_kv is True:
+                pub_kv = key, value
+            elif callable(pub_kv):
+                pub_kv = pub_kv((key,value))
+            if pub_kv is not None:
+                # noinspection PyTypeChecker
+                pubs.append((series,*pub_kv))
            if diff.value is DELETE:
                if t == DataType.SET:
                    sdels[series].add(key)
@@ -67,7 +80,7 @@ class RedisState (SeriesCollection):
                if t == DataType.SET:
                    sadds[series].add(key)
                elif t == DataType.DICT:
-                    hsets[series][key] = keystr(diff.value)
+                    hsets[series][key] = value
                else:
                    raise NotImplementedError
        async with memcache.batch() as r:
@@ -80,5 +93,13 @@ class RedisState (SeriesCollection):
                r.hset(series, mapping=kvs)
            for series, keys in hdels.items():
                r.hdel(series, *keys)
-            r.json(json_encoder).set(f'{current_chain.get().chain_id}|latest_block','$',block.data)
+            block_series = f'{chain_id}|block.latest'
+            r.json(json_encoder).set(block_series,'$',block.data)
+            pubs.append((str(chain_id), 'block.latest', block.data))
+        # separate batch for
+        if pubs:
+            async with memcache.batch() as r:
+                r: Pipeline
+                io = Emitter(dict(client=r))
+                for s,k,v in pubs:
+                    io.To(s).Emit(k,v)

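The net effect of save() is that each SET series lands in a Redis set and each DICT series in a Redis hash, both namespaced by chain id, with the latest block stored as JSON under {chain_id}|block.latest and mirrored to socket.io subscribers via the Emitter batch. A hedged read-back sketch using redis-py's asyncio client (chain id and connection details are placeholders):

import redis.asyncio as redis

async def read_back(chain_id: int):
    r = redis.Redis()
    vaults = await r.smembers(f'{chain_id}|v')    # SET series, e.g. vault_addresses
    prices = await r.hgetall(f'{chain_id}|p')     # DICT series, e.g. pool_prices
    latest = await r.json().get(f'{chain_id}|block.latest', '$')
    await r.close()
    return vaults, prices, latest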

@@ -6,6 +6,8 @@ from hexbytes import HexBytes
from orjson import orjson
from web3.datastructures import AttributeDict
+from dexorder import DELETE

def _serialize(v):
    if type(v) is HexBytes:
@@ -16,6 +18,8 @@ def _serialize(v):
        return v.__dict__
    elif type(v) is Decimal:
        return f'{v:f}'
+    elif v is DELETE:
+        return None
    raise TypeError
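Because _serialize ends with raise TypeError, it looks like a default= fallback hook for the orjson encoder, so DELETE values come out as JSON null, matching the "values of DELETE are serialized as nulls" note in the series config. A quick sketch under that assumption:

import orjson
from dexorder import DELETE

payload = orjson.dumps({'removed_key': DELETE}, default=_serialize)
# payload == b'{"removed_key":null}'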