diff --git a/bin/df.sh b/bin/df.sh new file mode 100755 index 0000000..df92638 --- /dev/null +++ b/bin/df.sh @@ -0,0 +1,2 @@ +#!/bin/bash +docker run --network=host --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly --maxmemory 1G "$@" diff --git a/bin/reset_db_df.sh b/bin/reset_db_df.sh new file mode 100755 index 0000000..6f2a5e2 --- /dev/null +++ b/bin/reset_db_df.sh @@ -0,0 +1 @@ +alembic downgrade -1 && alembic upgrade head && ./bin/df.sh \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 5ed5b4c..5d926be 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ sortedcontainers~=2.4.0 hexbytes~=0.3.1 defaultlist~=1.0.0 redis[hiredis] +socket.io-emitter diff --git a/src/dexorder/__init__.py b/src/dexorder/__init__.py index 3aee608..60b104c 100644 --- a/src/dexorder/__init__.py +++ b/src/dexorder/__init__.py @@ -4,6 +4,7 @@ from decimal import Decimal as dec class _NARG: def __bool__(self): return False NARG = _NARG() +DELETE = object() # used as a value token to indicate removal of the key ADDRESS_0 = '0x0000000000000000000000000000000000000000' WEI = 1 GWEI = 1_000_000_000 diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 08cbac3..5a504a0 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -24,10 +24,10 @@ async def main(): state = None if memcache: await memcache.connect() - redis_state = RedisState(BlockData.by_tag['redis']) + redis_state = RedisState(BlockData.by_opt('redis')) if db: db.connect() - db_state = DbState(BlockData.by_tag['db']) + db_state = DbState(BlockData.by_opt('db')) with db.session: state = db_state.load() if state is not None: diff --git a/src/dexorder/blockstate/blockdata.py b/src/dexorder/blockstate/blockdata.py index b9f22d5..8ca6757 100644 --- a/src/dexorder/blockstate/blockdata.py +++ b/src/dexorder/blockstate/blockdata.py @@ -1,11 +1,9 @@ import logging -from collections import defaultdict from enum import Enum from typing import TypeVar, Generic, Iterable, Union -from dexorder import NARG +from dexorder import NARG, DELETE from dexorder.base.fork import current_fork -from .diff import DELETE from .state import current_blockstate log = logging.getLogger(__name__) @@ -21,16 +19,13 @@ class DataType(Enum): class BlockData: registry: dict[str,'BlockData'] = {} # series name and instance - by_tag: dict[str, list['BlockData']] = defaultdict(list) - def __init__(self, series:str, data_type: DataType, **tags): + def __init__(self, series:str, data_type: DataType, **opts): assert series not in BlockData.registry BlockData.registry[series] = self self.series = series self.type = data_type - for tag, value in tags.items(): - if value: - BlockData.by_tag[tag].append(self) + self.opts = opts def setitem(self, item, value, overwrite=True): state = current_blockstate.get() @@ -58,6 +53,10 @@ class BlockData: fork = current_fork.get() return state.iteritems(fork, series_key) + @staticmethod + def by_opt(key): + yield from (s for s in BlockData.registry.values() if key in s.opts) + class BlockSet(Generic[T], Iterable[T], BlockData): def __init__(self, series: str, **tags): @@ -104,7 +103,7 @@ class BlockDict(Generic[T], BlockData): class SeriesCollection: def __init__(self, series_or_datavars: Iterable[Union[str,BlockData]]): - self.types = { - (d:=BlockData.registry[x] if type(x) is str else x).series:d.type + self.datas: dict[str,BlockData] = { + (d := BlockData.registry[x] if type(x) is str else x).series: d for x in series_or_datavars } diff --git a/src/dexorder/blockstate/db_state.py b/src/dexorder/blockstate/db_state.py index 385d468..1ed7b72 100644 --- a/src/dexorder/blockstate/db_state.py +++ b/src/dexorder/blockstate/db_state.py @@ -20,7 +20,7 @@ class DbState(SeriesCollection): chain_id = current_chain.get().chain_id for diff in diffs: try: - t = self.types[diff.series] + t = self.datas[diff.series].type except KeyError: continue diffseries = keystr(diff.series) @@ -60,7 +60,8 @@ class DbState(SeriesCollection): state = BlockState(root_block) current_blockstate.set(state) current_fork.set(None) # root fork - for series, t in self.types.items(): + for series, data in self.datas.items(): + t = data.type if t == DataType.SET: # noinspection PyTypeChecker var: BlockSet = BlockData.registry[series] diff --git a/src/dexorder/blockstate/diff.py b/src/dexorder/blockstate/diff.py index 05fb48f..6a0871a 100644 --- a/src/dexorder/blockstate/diff.py +++ b/src/dexorder/blockstate/diff.py @@ -1,8 +1,7 @@ from dataclasses import dataclass from typing import Union, Any - -DELETE = object() # used as a value token to indicate removal of the key +from dexorder import DELETE @dataclass diff --git a/src/dexorder/data/__init__.py b/src/dexorder/data/__init__.py index e6398bc..328c9df 100644 --- a/src/dexorder/data/__init__.py +++ b/src/dexorder/data/__init__.py @@ -1,7 +1,11 @@ from dexorder.blockstate import BlockSet, BlockDict -vault_addresses = BlockSet('v', db=True, redis=True) -vault_tokens = BlockDict('vt', db=True, redis=True) -pool_prices = BlockDict('p', db=True, redis=True) +# pub=... publishes to a channel for web clients to consume. argument is (key,value) and return must be (event,room,value) +# if pub is True, then event is the current series name, room is the key, and value is passed through +# values of DELETE are serialized as nulls + +vault_addresses = BlockSet('v', db=True, redis=True, pub=True) +vault_tokens = BlockDict('vt', db=True, redis=True, pub=True) +pool_prices = BlockDict('p', db=True, redis=True, pub=True) underfunded_vaults = BlockSet('uv', db=True) active_orders = BlockSet('a', db=True) diff --git a/src/dexorder/memcache/memcache_state.py b/src/dexorder/memcache/memcache_state.py index 4eb77e0..bb0372e 100644 --- a/src/dexorder/memcache/memcache_state.py +++ b/src/dexorder/memcache/memcache_state.py @@ -1,8 +1,9 @@ import logging from collections import defaultdict -from typing import Iterable, Union, Reversible +from typing import Iterable, Union, Reversible, Any from redis.asyncio.client import Pipeline +from socket_io_emitter import Emitter from dexorder.base.chain import current_chain from dexorder.base.fork import current_fork @@ -26,21 +27,21 @@ class RedisState (SeriesCollection): async def clear(self): r = current_redis.get() - await r.delete(f'{current_chain.get().chain_id}|latest_block', *self.types.keys()) + await r.delete(f'{current_chain.get().chain_id}|latest_block', *self.datas.keys()) async def init(self, state: BlockState): fork = current_fork.get() await self.clear() diffs = [] - for series, t in self.types.items(): + for series in self.datas.keys(): for k, v in state.iteritems(fork, series): diffs.append(DiffItem(series, k, v)) await self.save(state.root_block, diffs) # noinspection PyAsyncCall - async def save(self, block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]] ): + async def save(self, block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]], *, publish=False ): # the diffs must be already compressed such that there is only one action per key chain = current_chain.get() assert block.chain == chain.chain_id @@ -49,13 +50,25 @@ class RedisState (SeriesCollection): sdels: dict[str,set[str]] = defaultdict(set) hsets: dict[str,dict[str,str]] = defaultdict(dict) hdels: dict[str,set[str]] = defaultdict(set) + pubs: list[tuple[str,str,Any]] = [] # series, key, value => room, event, value for diff in compress_diffs(diffs): try: - t = self.types[diff.series] + d = self.datas[diff.series] except KeyError: continue + t = d.type series = f'{chain_id}|{keystr(diff.series)}' key = keystr(diff.key) + value = keystr(diff.value) + # pub/sub socketio/redis + pub_kv = d.opts.get('pub') + if pub_kv is True: + pub_kv = key, value + elif callable(pub_kv): + pub_kv = pub_kv((key,value)) + if pub_kv is not None: + # noinspection PyTypeChecker + pubs.append((series,*pub_kv)) if diff.value is DELETE: if t == DataType.SET: sdels[series].add(key) @@ -67,7 +80,7 @@ class RedisState (SeriesCollection): if t == DataType.SET: sadds[series].add(key) elif t == DataType.DICT: - hsets[series][key] = keystr(diff.value) + hsets[series][key] = value else: raise NotImplementedError async with memcache.batch() as r: @@ -80,5 +93,13 @@ class RedisState (SeriesCollection): r.hset(series, mapping=kvs) for series, keys in hdels.items(): r.hdel(series, *keys) - r.json(json_encoder).set(f'{current_chain.get().chain_id}|latest_block','$',block.data) - + block_series = f'{chain_id}|block.latest' + r.json(json_encoder).set(block_series,'$',block.data) + pubs.append((str(chain_id), 'block.latest', block.data)) + # separate batch for + if pubs: + async with memcache.batch() as r: + r: Pipeline + io = Emitter(dict(client=r)) + for s,k,v in pubs: + io.To(s).Emit(k,v) diff --git a/src/dexorder/util/json.py b/src/dexorder/util/json.py index 9a688b9..d1895b0 100644 --- a/src/dexorder/util/json.py +++ b/src/dexorder/util/json.py @@ -6,6 +6,8 @@ from hexbytes import HexBytes from orjson import orjson from web3.datastructures import AttributeDict +from dexorder import DELETE + def _serialize(v): if type(v) is HexBytes: @@ -16,6 +18,8 @@ def _serialize(v): return v.__dict__ elif type(v) is Decimal: return f'{v:f}' + elif v is DELETE: + return None raise TypeError