blockstate rework, not debugged

Tim
2024-04-03 00:03:01 -04:00
parent 26736ad437
commit 7acc51a652
48 changed files with 1126 additions and 870 deletions

View File

@@ -1,9 +1,9 @@
"""
initial schema
"""initial_schema
Revision ID: db62e7db828d
Revision ID: 516b55c83144
Revises:
Create Date: 2023-09-28 23:04:41.020644
Create Date: 2024-04-02 22:52:44.614707
"""
from typing import Sequence, Union
@@ -14,75 +14,26 @@ import dexorder.database.column_types
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = 'db62e7db828d'
revision: str = '516b55c83144'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table('block',
sa.Column('chain', sa.Integer(), nullable=False),
sa.Column('height', sa.Integer(), nullable=False),
sa.Column('hash', sa.LargeBinary(), nullable=False),
sa.Column('parent', sa.LargeBinary(), nullable=False),
sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint('chain', 'height', 'hash')
)
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('keyvalue',
sa.Column('key', sa.String(), nullable=False),
sa.Column('value', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.PrimaryKeyConstraint('key')
)
op.create_table('seriesdict',
sa.Column('value', sa.String(), nullable=False),
sa.Column('chain', sa.Integer(), nullable=False),
sa.Column('series', sa.String(), nullable=False),
sa.Column('key', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('chain', 'series', 'key')
)
op.create_table('seriesset',
sa.Column('chain', sa.Integer(), nullable=False),
sa.Column('series', sa.String(), nullable=False),
sa.Column('key', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('chain', 'series', 'key')
)
op.create_table('transactionjob',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('height', sa.Integer(), nullable=False),
# sa.Column('state', sa.Enum(name='transactionjobstate'), nullable=False),
sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', name='transactionjobstate'), nullable=False),
sa.Column('request', dexorder.database.column_types.DataclassDictBase(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_transactionjob_chain'), 'transactionjob', ['chain'], unique=False)
op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False)
op.create_index(op.f('ix_transactionjob_state'), 'transactionjob', ['state'], unique=False)
op.create_table('tx',
sa.Column('id', postgresql.BYTEA(), nullable=False),
sa.Column('data', postgresql.BYTEA(), nullable=False),
sa.Column('job_id', sa.UUID(), nullable=False),
sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.ForeignKeyConstraint(['job_id'], ['transactionjob.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_table('orderindex',
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('vault', sa.String(), nullable=False),
sa.Column('order_index', sa.Integer(), nullable=False),
sa.Column('state', sa.Enum('Open', 'Canceled', 'Filled', 'Expired', 'Underfunded', name='swaporderstate'), nullable=False),
sa.Column('state', sa.Enum('Unknown', 'Signing', 'Underfunded', 'Open', 'Canceled', 'Expired', 'Filled', name='swaporderstate'), nullable=False),
sa.PrimaryKeyConstraint('chain', 'vault', 'order_index')
)
op.create_table('token',
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('address', dexorder.database.column_types.Address(), nullable=False),
sa.Column('name', sa.String(), nullable=False),
sa.Column('symbol', sa.String(), nullable=False),
sa.Column('decimals', sa.SMALLINT(), nullable=False),
sa.Column('approved', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('chain', 'address')
)
op.create_table('pool',
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('address', dexorder.database.column_types.Address(), nullable=False),
@@ -95,20 +46,64 @@ def upgrade() -> None:
)
op.create_index(op.f('ix_pool_base'), 'pool', ['base'], unique=False)
op.create_index(op.f('ix_pool_quote'), 'pool', ['quote'], unique=False)
op.create_table('seriesdict',
sa.Column('value', sa.String(), nullable=False),
sa.Column('chain', sa.Integer(), nullable=False),
sa.Column('series', sa.String(), nullable=False),
sa.Column('key', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('chain', 'series', 'key')
)
op.create_table('seriesset',
sa.Column('chain', sa.Integer(), nullable=False),
sa.Column('series', sa.String(), nullable=False),
sa.Column('key', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('chain', 'series', 'key')
)
op.create_table('token',
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('address', dexorder.database.column_types.Address(), nullable=False),
sa.Column('name', sa.String(), nullable=False),
sa.Column('symbol', sa.String(), nullable=False),
sa.Column('decimals', sa.SMALLINT(), nullable=False),
sa.Column('approved', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('chain', 'address')
)
op.create_table('transactionjob',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('height', sa.Integer(), nullable=False),
sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', name='transactionjobstate'), nullable=False),
sa.Column('request', dexorder.database.column_types.DataclassDictBase(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_transactionjob_chain'), 'transactionjob', ['chain'], unique=False)
op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False)
op.create_index(op.f('ix_transactionjob_state'), 'transactionjob', ['state'], unique=False)
op.create_table('tx',
sa.Column('id', postgresql.BYTEA(), nullable=False),
sa.Column('data', postgresql.BYTEA(), nullable=False),
sa.Column('job_id', sa.UUID(), nullable=False),
sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.ForeignKeyConstraint(['job_id'], ['transactionjob.id'], ),
sa.PrimaryKeyConstraint('id')
)
# ### end Alembic commands ###
def downgrade() -> None:
op.drop_table('tx')
op.drop_index(op.f('ix_transactionjob_state'), table_name='transactionjob')
op.drop_index(op.f('ix_transactionjob_height'), table_name='transactionjob')
op.drop_index(op.f('ix_transactionjob_chain'), table_name='transactionjob')
op.drop_table('transactionjob')
op.drop_table('token')
op.drop_table('seriesset')
op.drop_table('seriesdict')
op.drop_index(op.f('ix_pool_quote'), table_name='pool')
op.drop_index(op.f('ix_pool_base'), table_name='pool')
op.drop_table('pool')
op.drop_table('orderindex')
op.drop_table('seriesset')
op.drop_table('seriesdict')
op.drop_table('keyvalue')
op.drop_table('block')
op.drop_table('tx')
op.drop_table('transactionjob')
op.execute('drop type swaporderstate') # enum type
op.execute('drop type transactionjobstate') # enum type
op.execute('drop type exchange') # enum type

View File

@@ -0,0 +1,66 @@
import logging
from typing import TypedDict, Literal
from dexorder.util import hexbytes, hexint, hexstr
log = logging.getLogger(__name__)
class BigNumber (TypedDict):
type: Literal['BigNumber']
hex: str
class BlockInfo (TypedDict):
number: int
hash: bytes
parentHash: bytes
nonce: bytes
sha3Uncles: bytes
logsBloom: bytes
transactionsRoot: bytes
stateRoot: bytes
receiptsRoot: bytes
miner: str
difficulty: int
totalDifficulty: int
extraData: bytes
size: int
gasLimit: BigNumber
gasUsed: BigNumber
timestamp: int
transactions: list[bytes]
uncles: list[bytes]
mixHash: bytes
baseFeePerGas: int
def block_db_key(chain_id: int, blockhash: bytes):
return f'Block|{chain_id}|{hexstr(blockhash)}'
class Block:
def __init__(self, chain_id: int, blockdata: BlockInfo):
self.chain_id = chain_id
self.hash = hexbytes(blockdata['hash'])
# noinspection PyTypeChecker
self.height = hexint(blockdata['number'])
self.timestamp = hexint(blockdata['timestamp'])
self.parent = hexbytes(blockdata['parentHash'])
self.data = blockdata
@property
def db_key(self):
return block_db_key(self.chain_id, self.hash)
def __str__(self):
return f'{self.height}_{hexstr(self.hash)[2:7]}'
def __hash__(self):
return hash(self.hash) # blockhashes should be unique even across chains
def __eq__(self, other):
return self.hash == other.hash and self.chain_id == other.chain_id
latest_block: dict[int,Block] = {} # most recent discovered block but maybe not the currently processing one, indexed by chain_id
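# Illustrative usage sketch (not part of this diff): constructing a Block from raw RPC fields,
# assuming hexbytes/hexint accept 0x-prefixed strings as Block.__init__ implies; all values are hypothetical.
raw = {
    'number': '0x64',                  # hex-encoded height (100)
    'hash': '0x' + 'ab' * 32,          # hypothetical 32-byte block hash
    'parentHash': '0x' + 'cd' * 32,    # hypothetical parent hash
    'timestamp': '0x660cabc0',         # hex-encoded unix time
}
block = Block(31337, raw)              # 31337 is the Mock chain id used elsewhere in this commit
latest_block[block.chain_id] = block   # record the newest block seen for this chain
assert block.height == 100 and block.parent == bytes.fromhex('cd' * 32)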

View File

@@ -26,7 +26,7 @@ class Blockchain:
confirms is the number of blocks until a block can be considered finalized and unforkable
batch_size is the number of blocks to fetch per logs query
"""
self.chain_id = chain_id
self.id = chain_id
self.name = name
self.confirms = confirms
self.batch_size = batch_size
@@ -48,10 +48,10 @@ Polygon = Blockchain(137, 'Polygon') # POS not zkEVM
Mumbai = Blockchain(80001, 'Mumbai')
BSC = Blockchain(56, 'BSC')
Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=2000)
Mock = Blockchain(31337, 'Mock', 3, batch_size=10000)
Mock = Blockchain(31337, 'Mock', 3, batch_size=2000)
Alpha = Blockchain(1337, 'Dexorder Alpha', 3, batch_size=1000)
current_chain = ContextVar[Blockchain]('current_chain')
current_chain = ContextVar[Blockchain]('current_chain', default=Mock)
class BlockClock:

View File

@@ -1,76 +0,0 @@
import logging
from contextvars import ContextVar
from typing import Iterable, Optional
from dexorder.database.model import Block
log = logging.getLogger(__name__)
class Fork:
"""
A Fork is an ancestor path, stored as block hashes in reverse-chronological order from the "current" block at the start to ancestors at the end. The
getitem [] operator indexes by block height for positive values, while negative value are relative to the latest block, so [-1] is the latest
block and [-2] is its parent, etc. Any blocks older than the tail of the fork are considered finalized and may be referenced by height.
"""
def __init__(self, ancestry: Iterable[bytes], *, height: int):
self.ancestry = list(ancestry)
self.height = height
self.disjoint = False
def __contains__(self, item):
"""
item can be a Block or another Fork. returns True iff the given item appears on this fork. if item is ahead of this fork
or a cousin chain, returns False
"""
index = self.height - item.height # index is reverse chronological in order to index our ancestry list
if index < 0: # item is ahead of us in height
return False
if index >= len(self.ancestry): # item is older than this fork
return True # consider old blocks settled and on this fork
return self.ancestry[index] == item.hash
@property
def hash(self):
return self.ancestry[0]
@property
def parent(self):
return self.ancestry[1] if len(self.ancestry) > 1 else None
def for_height(self, height):
""" returns a new Fork object for an older block along this fork. used for root promotion. """
if height > self.height :
raise ValueError
if height <= self.height - len(self.ancestry):
return None
return Fork(self.ancestry[self.height-height:], height=height)
def __str__(self):
return f'{self.height}_[{"->".join(h.hex() for h in self.ancestry)}]'
class DisjointFork:
"""
duck type of Fork for blocks that connect directly to root with a parent gap in-between. these forks are associated with backfill.
"""
def __init__(self, block: Block, root: Block):
self.height = block.height
self.hash = block.hash
self.parent = root.hash
self.disjoint = True
self.root = root
def __contains__(self, item):
if item.height > self.height:
return False # item is in the future
if item.height < self.root.height:
return True # item is ancient
return item.hash in (self.hash, self.parent)
def __str__(self):
return f'{self.height}_[{self.hash.hex()}->{self.parent.hash.hex()}]'
current_fork = ContextVar[Optional[Fork]]('current_fork', default=None)

View File

@@ -47,12 +47,10 @@ class SwapOrder:
chainOrder: int
tranches: list['Tranche']
state: SwapOrderState # this is not in the blockchain orderstatus: it's a computed and cached field.
@staticmethod
def load(obj):
return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), int(obj[4]), obj[5], obj[6], obj[7],
[Tranche.load(t) for t in obj[8]], SwapOrderState.Unknown)
[Tranche.load(t) for t in obj[8]])
def dump(self):
return (self.tokenIn, self.tokenOut, self.route.dump(), str(self.amount), str(self.minFillAmount), self.amountIsInput,

View File

@@ -3,29 +3,30 @@ import sys
from asyncio import CancelledError
from typing import Union, Reversible
from dexorder import blockchain, config, from_timestamp, now
from dexorder import blockchain, config, from_timestamp
from dexorder.bin.executable import execute
from dexorder.blocks import get_block_timestamp
from dexorder.blockstate import DiffItem
from dexorder.blockstate.blockdata import BlockData
from dexorder.blockstate.db_state import DbState
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.fork import Fork
from dexorder.configuration import parse_args
from dexorder.contract import get_contract_event
from dexorder.database import db
from dexorder.database.model import Block
from dexorder.event_handler import check_ohlc_rollover, handle_uniswap_swaps
from dexorder.memcache import memcache
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.ohlc import recent_ohlcs, ohlc_save, ohlcs
from dexorder.runner import BlockStateRunner
from dexorder.util import hexstr
log = logging.getLogger('dexorder.backfill')
def finalize_callback(block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
ohlc_save(block, diffs)
log.info(f'backfill completed through block {block.height} {from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}')
async def finalize_callback(fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
ohlc_save(diffs)
ts = await get_block_timestamp(fork.head)
log.info(f'backfill completed through block {fork.height} {from_timestamp(ts):%Y-%m-%d %H:%M:%S}')
# noinspection DuplicatedCode

View File

@@ -7,7 +7,7 @@ from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute
from dateutil.parser import parse as parse_date
from dexorder.database.model import Block
from dexorder.base.block import Block
log = logging.getLogger(__name__)
@@ -15,9 +15,9 @@ log = logging.getLogger(__name__)
async def main():
log.debug(f'Finding block nearest to {time}')
w3 = await blockchain.connect()
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
blockdata = await w3.eth.get_block('latest')
latest = cur = Block.from_data(chain_id, blockdata)
latest = cur = Block(chain_id, blockdata)
while True:
cur_time = from_timestamp(cur.timestamp)
delta = (time - cur_time).total_seconds()
@@ -28,7 +28,7 @@ async def main():
elif estimated == cur.height:
print(f'Closest block to {time}: {cur.height} {cur_time}')
exit(0)
cur = Block.from_data(chain_id, await w3.eth.get_block(estimated))
cur = Block(chain_id, await w3.eth.get_block(estimated))
if __name__ == '__main__':
if len(sys.argv) < 3:

View File

@@ -7,12 +7,13 @@ from web3.types import EventData
from dexorder import from_timestamp, blockchain, config
from dexorder.addrmeta import address_metadata
from dexorder.base.block import latest_block
from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute
from dexorder.blocks import get_block_timestamp
from dexorder.blocks import get_block_timestamp, get_block_by_number
from dexorder.blockstate.fork import current_fork
from dexorder.configuration import parse_args
from dexorder.contract import get_contract_event
from dexorder.database.model.block import current_block, latest_block
from dexorder.ohlc import FinalOHLCRepository
from dexorder.pools import get_uniswap_data
from dexorder.util import hexstr
@@ -37,21 +38,23 @@ async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
# log.debug(f'OHLC {pool["address"]} {time} {price}')
ohlcs.light_update_all(pool['address'], time, price)
def flush_callback():
async def flush_callback():
# start = now()
# log.info("finalizing OHLC's")
# log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds')
block = current_block.get()
confirms = (config.confirms if config.confirms is not None else current_chain.get().confirms) - 1
if latest_block.get().height - block.height <= 2*confirms:
chain_id = current_chain.get().id
fork = current_fork.get()
block = await get_block_by_number(fork.height, chain_id=chain_id)
time = from_timestamp(block.timestamp)
if latest_block[chain_id].height - fork.height <= 2*confirms:
log.info(f'forward filling to present time')
for addr, data in address_metadata.items():
if data['type'] == 'Pool' and data['exchange'] >= 0:
ohlcs.light_update_all(addr, from_timestamp(block.timestamp), None)
ohlcs.light_update_all(addr, time, None)
log.info("flushing OHLC's")
ohlcs.flush()
log.info(f'backfill completed through block {block.height} '
f'{from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}')
log.info(f'backfill completed through block {block.height} {time:%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}')
async def main():
logging.basicConfig(level=logging.INFO, stream=sys.stdout)

View File

@@ -1,7 +1,7 @@
import logging
from asyncio import CancelledError
from dexorder import db, blockchain, config
from dexorder import db, blockchain
from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute
from dexorder.blockstate.blockdata import BlockData
@@ -14,9 +14,8 @@ from dexorder.event_handler import init_order_triggers, init, dump_log, handle_v
process_active_tranches, process_execution_requests, check_ohlc_rollover, handle_uniswap_swaps
from dexorder.memcache import memcache
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.ohlc import ohlc_save
from dexorder.runner import BlockStateRunner
from dexorder.transaction import handle_transaction_receipts, create_transactions, send_transactions
from dexorder.transaction import handle_transaction_receipts
log = logging.getLogger('dexorder')
LOG_ALL_EVENTS = False # for debug todo config
@@ -80,18 +79,19 @@ async def main():
db.connect()
db_state = DbState(BlockData.by_opt('db'))
with db.session:
state = db_state.load()
state = await db_state.load()
if state is None:
log.info('no state in database')
else:
if redis_state:
await redis_state.init(state)
log.info(f'loaded state from db for root block {state.root_block}')
await redis_state.init(state, state.root_fork)
log.info(f'loaded state from db for root block {state.root_branch.height}')
runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None)
setup_logevent_triggers(runner)
if config.ohlc_dir:
runner.on_promotion.append(ohlc_save)
# OHLC printing hard-disabled for main. Use the finaldata process.
# if config.ohlc_dir:
# runner.on_promotion.append(ohlc_save)
if db:
runner.on_state_init.append(init_order_triggers)
# noinspection PyUnboundLocalVariable

View File

@@ -110,7 +110,7 @@ async def write_metadata( pools, mirror_pools ):
last_prices = {}
async def complete_update(mirrorenv, pool, price, tx):
async def complete_update(_mirrorenv, pool, price, tx):
await tx.wait()
last_prices[pool] = price
log.debug(f'Mirrored {pool} {price}')

View File

@@ -1,29 +0,0 @@
# Prints a JSON string to stdout containing metadata information for all the known tokens and pools
#
# see metadata.py
import logging
import sys
from sqlalchemy import select
from dexorder import db
from dexorder.configuration import parse_args
from dexorder.database.model import Pool, Token
from dexorder.metadata import generate_metadata
log = logging.getLogger(__name__)
def main():
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
log.setLevel(logging.DEBUG)
parse_args()
db.connect(migrate=False)
tokens = db.session.scalars(select(Token))
pools = db.session.scalars(select(Pool))
generate_metadata(tokens, pools)
if __name__ == '__main__':
main()

View File

@@ -19,20 +19,20 @@ class ByBlockchainCollection (Generic[_T]):
self.by_blockchain = by_blockchain if by_blockchain is not None else {}
def __getitem__(self, item) -> _T:
return self.by_blockchain[current_chain.get().chain_id][item]
return self.by_blockchain[current_chain.get().id][item]
class ByBlockchainDict (ByBlockchainCollection[_T], Generic[_T]):
def __getattr__(self, name: str) -> _T:
return self.by_blockchain[current_chain.get().chain_id][name]
return self.by_blockchain[current_chain.get().id][name]
def get(self, item, default=None, *, chain_id=None) -> _T:
# will raise if default is NARG
if chain_id is None:
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
if chain_id is None:
raise KeyError('no ctx.chain_id set')
raise KeyError('no current_chain set')
found = self.by_blockchain.get(chain_id, {}).get(item, default)
if found is NARG:
raise KeyError
@@ -41,16 +41,16 @@ class ByBlockchainDict (ByBlockchainCollection[_T], Generic[_T]):
class ByBlockchainList (ByBlockchainCollection[_T], Generic[_T]):
def __iter__(self) -> Iterator[_T]:
return iter(self.by_blockchain[current_chain.get().chain_id])
return iter(self.by_blockchain[current_chain.get().id])
def iter(self, *, chain_id=None) -> Iterator[_T]:
if chain_id is None:
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
return iter(self.by_blockchain[chain_id])
def get(self, index, *, chain_id=None) -> _T:
if chain_id is None:
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
if chain_id is None:
raise KeyError('no ctx.chain_id set')
raise KeyError('no current_chain set')
return self.by_blockchain[chain_id][index]

View File

@@ -1,12 +1,19 @@
"""
Blocks are stored locally in an LRU cache and queried via RPC lazily.
Use `await get_block()` to retrieve a Block from a given hash using the full caching mechanism.
Use `await fetch_block()` to force an RPC query for the Block, adding that block to the LRU cache.
"""
import logging
from typing import Union
from async_lru import alru_cache
from cachetools import LRUCache
from dexorder import current_w3
from dexorder import current_w3, NARG, config
from dexorder.base.block import Block, BlockInfo
from dexorder.base.chain import current_chain
from dexorder.blockstate import current_blockstate
from dexorder.database.model import Block
from dexorder.util import hexint
from dexorder.util.async_dict import AsyncDict
log = logging.getLogger(__name__)
@@ -16,13 +23,54 @@ async def get_block_timestamp(blockhash) -> int:
return block.timestamp
@alru_cache(maxsize=128)
async def get_block(blockhash) -> Block:
# first look in the state
async def _cache_fetch(key: tuple[int, bytes], default: Union[Block, NARG]) -> Block:
assert default is NARG
# try LRU cache first
try:
return current_blockstate.get().by_hash[blockhash]
except (LookupError, KeyError):
return _lru[key]
except KeyError:
pass
# otherwise query
# fetch from RPC
chain_id, blockhash = key
result = await fetch_block(blockhash, chain_id=chain_id)
_lru[key] = result
return result
_lru = LRUCache[tuple[int, bytes], Block](maxsize=128)
_cache = AsyncDict[tuple[int, bytes], Block](fetch=_cache_fetch)
def cache_block(block: Block):
_lru[block.chain_id, block.hash] = block
async def get_block(blockhash, *, chain_id=None) -> Block:
if chain_id is None:
chain_id = current_chain.get().id
return await _cache.get((chain_id, blockhash))
async def get_block_by_number(height: int, *, chain_id=None) -> Block:
if chain_id is None:
chain_id = current_chain.get().id
response = await current_w3.get().provider.make_request('eth_getBlockByNumber', [height, False])
block = Block(chain_id, response['result'])
cache_block(block)
return block
async def fetch_block(blockhash, *, chain_id=None):
if chain_id is None:
chain_id = current_chain.get().id
response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
return Block.from_data(current_chain.get().chain_id, response['result'])
blockdict: BlockInfo = response['result']
block = Block(chain_id, blockdict)
# if db:
# db.kv[block.db_key] = blockdict
cache_block(block)
return block
def promotion_height(chain, latest_height):
confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1
return latest_height - confirm_offset
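# Self-contained sketch of the promotion arithmetic above, restated so it runs without config or a
# chain object (names here are illustrative, not part of the commit):
def promotion_height_sketch(chain_confirms, latest_height, config_confirms=None):
    # mirrors promotion_height(): a config override wins, otherwise the chain's default confirm count
    confirm_offset = (config_confirms if config_confirms is not None else chain_confirms) - 1
    return latest_height - confirm_offset

assert promotion_height_sketch(3, 1_000_000) == 999_998                        # Arbitrum-style confirms=3
assert promotion_height_sketch(3, 1_000_000, config_confirms=1) == 1_000_000   # override: promote immediately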

View File

@@ -1,11 +1,10 @@
import copy
import json
import logging
from enum import Enum
from typing import TypeVar, Generic, Iterable, Union, Any, Iterator, Callable
from typing import TypeVar, Generic, Iterable, Union, Any, Iterator, Callable, Optional
from dexorder import NARG, DELETE
from dexorder.base.fork import current_fork
from dexorder.blockstate.fork import current_fork
from .state import current_blockstate
from dexorder.util import key2str as util_key2str, str2key as util_str2key
@@ -43,7 +42,8 @@ class BlockData (Generic[T]):
self.value2str = value2str
self.str2value = str2value
self.savecb = savecb
self.lazy_getitem = None
# set this to a method which fetches final data (e.g. database)
self.lazy_getitem: Optional[Callable[['BlockData',Any],Union[NARG,T]]] = None
@property
def seriesstr(self):
@@ -63,10 +63,9 @@ class BlockData (Generic[T]):
result = default
if self.lazy_getitem:
lazy = self.lazy_getitem(self, item)
if lazy is not None:
lookup_fork, lookup_value = lazy
if lookup_fork in fork:
result = lookup_value
if lazy is not NARG:
state.set(state.root_fork, self.series, item, lazy)
result = lazy
if result is NARG:
raise KeyError
return result
@@ -126,7 +125,7 @@ class BlockSet(Generic[T], Iterable[T], BlockData[T]):
def add(self, item: T):
""" set-like semantics. the item key is added with a value of None. """
self.setitem(item, None, overwrite=False)
self.setitem(item, None, overwrite=False) # setting overwrite to False means don't create a new DiffEntry if the key exists
def remove(self, item: T):
self.delitem(item)

View File

@@ -0,0 +1,66 @@
from contextvars import ContextVar
from uuid import uuid4, UUID
from dexorder.base.block import Block
from dexorder.base.chain import current_chain
from dexorder.util import hexstr
BranchId = UUID
class Branch:
def __init__(self, height, start, parent=bytes(), path: list[bytes] = None, *, chain=None):
assert (0 if path is None else len(path)) <= height - start + 1
if chain is None:
chain = current_chain.get()
self.id: BranchId = uuid4()
self.chain = chain
self.height = height # highest block number in the path
self.start = start # lowest block number in the path
# parent is the blockhash of the block from which this branch started. Empty bytes indicates unknown.
self.parent = parent
# path is a list of blockhashes included in the branch, from highest block to lowest. path[0], if present, must
# be the hash of the head block in the branch.
# Branches without a complete path are called "disjoint" since their interior is unspecified. Branches that do have
# explicit paths are called "contiguous."
self.path = path if path is not None else []
@property
def head(self):
""" the blockhash of the head of this Branch, if known """
return None if not self.path else self.path[0]
@property
def disjoint(self):
""" branches that are disjoint do not have a complete list of blockhashes for their interior path """
return not self.contiguous
@property
def contiguous(self):
""" contiguous branches have a complete list of blockhashes in their path attribute """
return len(self.path) == self.height - self.start + 1
@staticmethod
def from_block(block: Block) -> 'Branch':
""" create a new Branch from a single Block """
return Branch(chain=block.chain_id, height=block.height, start=block.height,
parent=block.parent, path=[block.hash])
@staticmethod
def from_blocks(blocks: list[Block]):
""" create a new Branch from a list of Block objects """
# check continuity of block parents
assert all(b.parent == a.hash for a, b in zip(blocks, blocks[1:]))
return Branch(chain=blocks[0].chain_id, height=blocks[-1].height, start=blocks[0].height,
parent=blocks[0].parent, path=[b.hash for b in reversed(blocks)])  # path is head-first (newest to oldest)
def __len__(self):
return self.height - self.start + 1
def __str__(self):
# noinspection PyTypeChecker
return (f"Branch#{str(self.id)[2:7]}[" +
(','.join(hexstr(b)[2:7] for b in self.path) if self.path else f'{self.start},{self.height}') + ']')
current_branch = ContextVar[Branch]('current_branch')
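# Illustrative sketch (not part of this diff) of the two Branch flavours, using SimpleNamespace
# stand-ins for Block and hypothetical hashes:
from types import SimpleNamespace

tip = SimpleNamespace(chain_id=31337, height=102, hash=b'\x03' * 32, parent=b'\x02' * 32)
single = Branch.from_block(tip)                                    # contiguous: one block, full path
assert single.contiguous and single.head == tip.hash and len(single) == 1

gap = Branch(height=110, start=103, parent=tip.hash, chain=31337)  # no path: a disjoint height range
assert gap.disjoint and gap.head is None and len(gap) == 8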

View File

@@ -1,15 +1,14 @@
import logging
from typing import Iterable, Optional, Union, Any
from . import BlockSet, BlockDict, BlockState, current_blockstate, DataType
from dexorder.blockstate.fork import Fork
from . import BlockSet, BlockDict, BlockState, DataType
from .blockdata import BlockData, SeriesCollection
from .diff import DiffItem, DiffEntryItem
from .. import db, DELETE
from ..base.chain import current_chain
from ..base.fork import current_fork, Fork
from ..database.model import SeriesSet, SeriesDict, Block
from ..database.model.block import current_block, latest_block, completed_block
from ..util import hexbytes
from ..blocks import get_block
from ..database.model import SeriesSet, SeriesDict
log = logging.getLogger(__name__)
@@ -25,21 +24,16 @@ class DbState(SeriesCollection):
@staticmethod
def lazy_getitem(var: BlockData, item):
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
t = var.type
Entity = SeriesSet if t == DataType.SET else SeriesDict if t == DataType.DICT else None
series = var.series2str(var.series)
key = var.key2str(item)
try:
height, blockhash = db.kv[f'root_block|{chain_id}']
except Exception:
return None
fork = Fork([hexbytes(blockhash)], height=height)
value = db.session.get(Entity, (chain_id, series, key))
return fork, var.str2value(value.value)
return var.str2value(value.value)
def save(self, root_block: Block, diffs: Iterable[Union[DiffItem,DiffEntryItem]] ):
chain_id = current_chain.get().chain_id
def save(self, fork: Fork, diffs: Iterable[Union[DiffItem,DiffEntryItem]] ):
chain_id = current_chain.get().id
for diff in diffs:
try:
d = self.datas[diff.series]
@@ -70,23 +64,19 @@ class DbState(SeriesCollection):
if d.savecb:
d.savecb(diff.key, diff.value)
# save root block info
db.kv[f'root_block|{root_block.chain}'] = [root_block.height, root_block.hash]
db.kv[f'root_block|{chain_id}'] = [fork.height, fork.head]
# noinspection PyShadowingBuiltins
def load(self) -> Optional[BlockState]:
chain_id = current_chain.get().chain_id
async def load(self) -> Optional[BlockState]:
chain_id = current_chain.get().id
try:
height, hash = db.kv[f'root_block|{chain_id}']
except (KeyError, ValueError):
return None
root_block = db.session.get(Block, dict(chain=chain_id, height=height, hash=hexbytes(hash)))
if root_block is None:
return None
current_block.set(root_block)
latest_block.set(root_block)
state = BlockState(root_block)
current_blockstate.set(state)
current_fork.set(None) # root fork
root_block = await get_block(hash)
assert root_block.height == height
state = BlockState()
root_fork = state.init_root_block(root_block)
for series, data in self.datas.items():
if data.opts.get('db') != 'lazy':
log.debug(f'loading series {series}')
@@ -97,7 +87,7 @@ class DbState(SeriesCollection):
for row in db.session.query(SeriesSet).where(SeriesSet.chain == chain_id, SeriesSet.series == data.series2str(series)):
key = data.str2key(row.key)
log.debug(f'load {series} {key}')
var.add(key)
state.set(root_fork, var.series, key, None, overwrite=False)
elif t == DataType.DICT:
# noinspection PyTypeChecker
var: BlockDict = BlockData.registry[series]
@@ -105,7 +95,6 @@ class DbState(SeriesCollection):
key = data.str2key(row.key)
value = data.str2value(row.value)
# log.debug(f'load {series} {key} {value}')
var[key] = value
completed_block.set(root_block)
state.set(root_fork, var.series, key, value, overwrite=True)
log.debug(f'loaded db state from block {root_block}')
return state
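# Illustrative sketch (not part of this diff): the root pointer that save() writes and load() reads
# back is a small kv record; a plain dict stands in for db.kv and all values are hypothetical.
kv = {}
chain_id, fork_height, fork_head = 31337, 123, b'\x0a' * 32
kv[f'root_block|{chain_id}'] = [fork_height, fork_head]    # what save() records after flushing diffs
height, head = kv[f'root_block|{chain_id}']                # what load() reads before fetching the root block
assert (height, head) == (123, b'\x0a' * 32)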

View File

@@ -9,7 +9,7 @@ class DiffEntry:
""" DiffEntry is the "value" part of a key-value pair, but DiffEntry also has metadata about the block in which the value was set """
value: Union[Any, DELETE]
height: int
hash: bytes
branch_id: int
@dataclass
@@ -34,5 +34,5 @@ class DiffEntryItem:
return self.entry.value
def __str__(self):
return (f'{self.entry.hash.hex()} {self.series}.{self.key}='
return (f'B{self.entry.branch_id} {self.series}.{self.key}='
f'{"[DEL]" if self.entry.value is DELETE else self.entry.value}')

View File

@@ -0,0 +1,37 @@
import logging
from contextvars import ContextVar
from typing import Optional, Sequence
from dexorder import NARG
from dexorder.blockstate.branch import Branch
log = logging.getLogger(__name__)
class Fork:
"""
A Fork is a collection of Branches describing a path through the blockchain. Branches are stored in reverse
chronological order from newest (branches[0]) to oldest (branches[-1]).
"""
def __init__(self, branches: Sequence[Branch]):
assert len(branches) > 0
self.branches = branches
# "properties"
self.branch = self.branches[0]
self.branch_id = self.branch.id
self.branch_ids = [b.id for b in branches]
self.height = self.branch.height
self.start = branches[-1].height
self.head = self.branch.head
self.parent = branches[-1].parent
def __str__(self):
return f'Fork[{"<-".join(str(b) for b in self.branches)}]'
current_fork = ContextVar[Optional[Fork]]('current_fork', default=NARG)
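# Illustrative sketch (not part of this diff): a Fork built from two contiguous Branches,
# newest first, with hypothetical hashes:
root = Branch(height=100, start=100, parent=b'\x00' * 32, path=[b'\x01' * 32], chain=31337)
child = Branch(height=102, start=101, parent=root.head, path=[b'\x03' * 32, b'\x02' * 32], chain=31337)

fork = Fork([child, root])
assert fork.head == b'\x03' * 32 and fork.height == 102
assert fork.start == 100 and fork.parent == root.parent
assert fork.branch_ids == [child.id, root.id]   # consulted by BlockState._fork_has_diff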

View File

@@ -1,18 +1,20 @@
import itertools
import logging
from collections import defaultdict
# noinspection PyPackageRequirements
from contextvars import ContextVar
from typing import Any, Optional, Union, Reversible
from typing import Any, Optional, Reversible, Sequence
from sortedcontainers import SortedList
from dexorder import NARG
from dexorder.base.fork import Fork, DisjointFork
from dexorder.database.model import Block
from dexorder.util import hexstr
from dexorder.blockstate.fork import Fork
from .branch import Branch, BranchId
from .diff import DiffEntry, DELETE, DiffEntryItem
from ..base.block import Block
from ..util import hexstr
log = logging.getLogger(__name__)
state_log = logging.getLogger('dexorder.state')
def compress_diffs(difflist: Reversible):
@@ -31,86 +33,143 @@ def compress_diffs(difflist: Reversible):
class BlockState:
by_chain: dict[int, 'BlockState'] = {}
"""
Since recent blocks can be part of temporary forks, we need to be able to undo certain operations if they were part of a reorg. Instead of implementing
undo, we recover state via snapshot plus replay of recent diffs. When old blocks become low enough in the blockheight they may be considered canonical
at which point the deltas may be reliably incorporated into a rolling permanent collection. BlockState manages separate memory areas
for every block, per-block state that defaults to its parent's state, up the ancestry tree to the root. State clients may read the state for their block,
by applying any diffs along the block's fork path to the root data.
Since recent blocks can be part of temporary forks, we need to be able to undo certain operations if they were part
of a reorg. Instead of implementing undo, we recover state via snapshot plus replay of recent diffs. When old
blocks fall deep enough below the chain head, they may be considered finalized, at which point the deltas may be
reliably incorporated into a rolling permanent collection (the database). BlockState manages separate memory areas
for every Branch, which is a segment of the blockchain that represents either a generic height range (if the Branch
has no path) or an explicit sequence of block hashes (if the Branch does specify a path). The Runner processes events
on a per-Branch basis, and eventually chooses to promote a branch of data after it has aged into finalization.
The primary data structure is diffs_by_series, which keys by [series][item] into a list of DiffEntry's which are
sorted by block height. Access of a series item scans the list of diffs in reverse blockheight order, returning
the first valid diff it finds. Diffs are valid if the Branch that generated them is part of the current Fork. A
fork is a list of Branches which describes a path from the current block backwards to, and including, the root
branch. The diff lists are garbage collected during root promotion. If there is more than one diff whose height is older
than the current root branch, then we need only keep the latest value and may discard the elder. Furthermore,
when branches become older than the root branch but are not promoted, they are discarded, ensuring that data
with a diff height of the root branch or older is always part of the finalized blockchain.
"""
def __init__(self, root_block: Block):
self.root_block: Block = root_block
self.by_height: SortedList[Block] = SortedList(key=lambda x: x.height)
self.by_hash: dict[bytes, Block] = {root_block.hash: root_block}
# diffs_by_series is the main data structure. leaf nodes are list of diffs sorted by blockheight
self.diffs_by_series: dict[Any, dict[Any, SortedList[DiffEntry]]] = defaultdict(lambda: defaultdict(lambda: SortedList(key=lambda x: x.height)))
# diffs_by_hash holds the diff items generated by each block
self.diffs_by_hash: dict[bytes, list[DiffEntryItem]] = defaultdict(list)
self.ancestors: dict[bytes, Block] = {}
self.unloads: dict[bytes, list] = defaultdict(list)
BlockState.by_chain[root_block.chain] = self
def __init__(self):
self._root_branch: Optional[Branch] = None
self._root_fork: Optional[Fork] = None
def add_block(self, block: Block) -> Optional[Fork]:
# Branches indexed by height
self.branches_by_height: dict[int, list[Branch]] = defaultdict(list)
# Branches indexed by id
self.branches_by_id: dict[BranchId, Branch] = {}
# diffs_by_series is the main data structure. leaf nodes are lists of DiffEntrys ordered highest height first
self.diffs_by_series: dict[Any, dict[Any, SortedList[DiffEntry]]] = defaultdict(dict)
# diffs_by_branch holds the diff items generated by each branch. this is needed for cleanup of the
# diffs_by_series data structure when branches expire into history. diffs are stored in insertion order.
self.diffs_by_branch: dict[BranchId, list[DiffEntryItem]] = defaultdict(list)
self.unloads: dict[BranchId, list] = defaultdict(list) # unload requests for lazy series, keyed by branch id
@property
def root_branch(self):
return self._root_branch
@root_branch.setter
def root_branch(self, value: Branch):
self._root_branch = value
self._root_fork = Fork([value])
@property
def root_fork(self):
return self._root_fork
@property
def root_hash(self):
return self._root_branch.head
def init_root_block(self, root_block: Block) -> Fork:
assert self.root_branch is None
return self.add_branch(Branch.from_block(root_block))
@property
def heads(self):
result = set(b.head for b in self.branches_by_id.values() if b.head is not None)
result.add(self.root_branch.head)
return result
def add_branch(self, branch: Branch, *, strict=True) -> Fork:
"""
If block is the same age as root_height or older, it is ignored and None is returned. Otherwise, returns a Fork leading to root.
The ancestor block is set in the ancestors dictionary and any state updates to block are considered to have occurred between the registered ancestor
block and the given block. This could be an interval of many blocks, and the ancestor does not need to be the block's immediate parent.
If there is no root_branch set yet, this branch becomes the root branch. Otherwise, returns a Fork with the
set of branches leading to the root.
raises ValueError if no path from this branch to the root branch can be found.
If strict is True, then a ValueError is raised if the branch does not have a parent hash set. strict
should only be set to False when it is assured that the branch may be joined by height alone, because
the branch join is known to be at a live-blockchain-finalized height.
"""
# check height
height_diff = block.height - self.root_block.height
if height_diff <= 0:
log.debug(f'IGNORING old block {block}')
return None
if block.hash not in self.by_hash:
self.by_hash[block.hash] = block
parent = self.by_hash.get(block.parent)
self.ancestors[block.hash] = parent or self.root_block
self.by_height.add(block)
log.debug(f'new block state {block}')
return self.fork(block)
assert branch.id not in self.branches_by_id
if self.root_branch is None:
self.root_branch = branch
state_log.info(f'Initialized BlockState with {branch}')
return Fork([self.root_branch])
self.branches_by_height[branch.height].append(branch)
self.branches_by_id[branch.id] = branch
# search for a path to the root branch
def build_fork(cur: Branch) -> list[Branch]:
if cur == self.root_branch:
return [cur]
if strict and not cur.parent:
raise ValueError(f'No parent for branch {cur}')
parent_branches = [
p for p in self.branches_by_height.get(cur.start-1, [])
if not strict or cur.parent == p.head
]
if cur.parent == self.root_branch.head or not strict and cur.start == self.root_branch.height + 1:
parent_branches.append(self.root_branch)
if not parent_branches:
raise ValueError
def branch_score(b: Branch):
if not b.path:
return len(b) # score is the nominal length of the branch: bigger = better
return 1_000_000_000 + len(b.path) # score contiguous branches (those with explicit paths) highest. again, bigger = better
parent_branches.sort(key=branch_score)
parent = parent_branches[-1] # highest score
return [cur, *build_fork(parent)]
fork = Fork(build_fork(branch))
state_log.info(f'added branch {fork}')
return fork
def delete_block(self, block: Union[Block, Fork, bytes]):
""" if there was an error during block processing, we need to remove the incomplete block data """
def remove_branch(self, branch: Branch, *, remove_series_diffs=True):
del self.branches_by_id[branch.id]
by_height = self.branches_by_height.get(branch.height)
if by_height is not None:
by_height.remove(branch)
if len(by_height) == 0:
# garbage collect empty arrays
del self.branches_by_height[branch.height]
try:
block = block.hash
except AttributeError:
pass
try:
del self.by_hash[block]
except KeyError:
pass
try:
del self.diffs_by_hash[block]
except KeyError:
pass
try:
del self.ancestors[block]
del self.unloads[branch.id]
except KeyError:
pass
diffs = self.diffs_by_branch.pop(branch.id, [])
if remove_series_diffs: # this will be False for promoted branches
for diff in diffs:
difflist = self.diffs_by_series.get(diff.series,{}).get(diff.key)
if difflist is not None:
difflist.remove(diff.entry)
state_log.info(f'removed branch {branch}'+ ('' if remove_series_diffs else ' (promoting)'))
def fork(self, block: Block):
if block.hash == self.root_block.hash:
return Fork([block.hash], height=block.height)
if block.height - self.ancestors[block.hash].height > 1:
return DisjointFork(block, self.root_block)
def ancestors():
bh = block.hash
while True:
yield bh
if bh == self.root_block.hash:
return
bh = self.ancestors[bh].hash
return Fork(ancestors(), height=block.height)
def get(self, fork: Optional[Fork], series, key, default=NARG):
def get(self, fork: Fork, series, key, default=NARG):
series_diffs = self.diffs_by_series.get(series)
if series_diffs is None:
if default is NARG:
@@ -126,140 +185,147 @@ class BlockState:
raise KeyError((series, key))
return default
def _get_from_diffs(self, fork, diffs):
for diff in reversed(diffs):
if diff.height <= self.root_block.height or fork is not None and diff in fork:
def _get_from_diffs(self, fork, diffs: Sequence[DiffEntry] ):
for diff in diffs:
# diffs with old heights are kept around if and only if their branches were promoted, so we can trust them.
if self._fork_has_diff(fork, diff):
if diff.value is DELETE:
break
else:
if fork and self.root_block not in fork: # todo move this assertion elsewhere so it runs once per task
raise ValueError(f'Cannot get value for a non-root fork {hexstr(fork.hash)}')
return diff.value
return DELETE
def set(self, fork: Optional[Fork], series, key, value, overwrite=True):
diffs = self.diffs_by_series[series][key]
if overwrite or self._get_from_diffs(fork, diffs) is DELETE and value is not DELETE:
diff = DiffEntry(value,
fork.height if fork is not None else self.root_block.height,
fork.hash if fork is not None else self.root_block.hash)
if fork is not None:
self.diffs_by_hash[fork.hash].append(DiffEntryItem(series, key, diff))
diffs.add(diff)
def set(self, fork: Fork, series, key, value, overwrite=True):
# first look for an existing value
branch = fork.branch
diffs = self.diffs_by_series.get(series,{}).get(key)
old_value = DELETE
if diffs:
for diff in diffs:
if diff.branch_id == branch.id:
# if there's an existing value for this branch, we replace it
diff.value = value
return
elif self._fork_has_diff(fork, diff):
# if there's an existing value on this fork, remember it
old_value = diff.value
break
if not overwrite:
overwrite = value != old_value
if overwrite:
if diffs is None:
diffs = self.diffs_by_series[series][key] = SortedList(key=lambda x: -x.height)
diff = DiffEntry(value, branch.height, branch.id)
diffs.add(diff)
self.diffs_by_branch[branch.id].append(DiffEntryItem(series, key, diff))
def unload(self, fork: Optional[Fork], series, key):
self.unloads[fork.hash].append((series, key))
self.unloads[fork.branch_id].append((series, key))
def iteritems(self, fork: Optional[Fork], series):
for k, difflist in self.diffs_by_series.get(series, {}).items():
for diff in reversed(difflist):
if diff.height <= self.root_block.height or fork is not None and diff in fork:
for diff in difflist:
if self._fork_has_diff(fork, diff):
if diff.value is not DELETE:
yield k, diff.value
break
def iterkeys(self, fork: Optional[Fork], series):
for k, difflist in self.diffs_by_series.get(series, {}).items():
for diff in reversed(difflist):
if diff.height <= self.root_block.height or fork is not None and diff in fork:
for diff in difflist:
if self._fork_has_diff(fork, diff):
if diff.value is not DELETE:
yield k
break
def itervalues(self, fork: Optional[Fork], series):
for k, difflist in self.diffs_by_series.get(series, {}).items():
for diff in reversed(difflist):
if diff.height <= self.root_block.height or fork is not None and diff in fork:
for diff in difflist:
if self._fork_has_diff(fork, diff):
if diff.value is not DELETE:
yield diff.value
break
def promote_root(self, new_root_fork: Fork):
block = self.by_hash[new_root_fork.hash]
diffs = self.collect_diffs(block)
# no application of diffs to the internal state is required, just clean up
def _fork_has_diff(self, fork: Optional[Fork], diff: DiffEntry):
# promotion removes diffs from any abandoned branches, so if a diff has a height at least as old as
# the current root branch, then it is known to be a finalized true value for all current forks
return diff.height <= self.root_branch.height or fork is not None and diff.branch_id in fork.branch_ids
# walk the by_height list to delete any aged-out block data
def promote_root(self, fork: Fork):
"""
Fork must be based off the root branch.
The root branch is advanced to be the latest branch in the fork.
Old branches whose height is less than or equal to the new root's height are garbage collected and discarded.
Old branches that are not part of the promotion fork have their diffs discarded as well, such that any
diffs remaining in the diffs_by_series structure that are at least as old as the root branch are known to have
been finalized on chain and are valid data. Thus, the series diffs structure will always have at
least one diff, possibly ancient, representing the latest set value for each key. If promote_root detects that
newly promoted diffs make an older diff no longer relevant, that old diff is finally garbage collected.
Returns the set of diffs for the promoted fork.
"""
found_root = False
promotion_branches = []
for branch in reversed(fork.branches):
if branch == self.root_branch:
found_root = True
elif found_root:
promotion_branches.append(branch)
assert found_root
if not promotion_branches:
state_log.debug('null promotion ignored')
return None
# diffs are ordered from oldest branch to newest, using insertion order within each branch
diffs = [d for b in promotion_branches for d in self.diffs_by_branch.get(b.id, [])]
# walk the branches_by_height list to delete any aged-out block data
# in order to prune diffs_by_series, updated_keys remembers all the keys that were touched by any aged-out block
series_deletions = []
updated_keys = set()
while self.by_height and self.by_height[0].height <= block.height:
dead = self.by_height.pop(0)
if dead is not block:
try:
del self.by_hash[dead.hash]
except KeyError:
pass
block_diffs = self.diffs_by_hash.get(dead.hash)
if block_diffs is not None:
for d in block_diffs:
if d.key == BlockState._DELETE_SERIES_KEY and dead.hash in new_root_fork:
series_deletions.append(d.series)
else:
updated_keys.add((d.series, d.key))
del self.diffs_by_hash[dead.hash]
try:
del self.ancestors[dead.hash]
except KeyError:
pass # todo is this bad?
for height in range(self.root_branch.height + 1, fork.height + 1):
for old in self.branches_by_height.pop(height, []):
# remove diffs if the branch is not in the promotion fork
self.remove_branch(old, remove_series_diffs=old not in fork.branches)
# prune diffs_by_series by removing old series diffs that have been superseded by new diffs
for s, k in updated_keys:
# old diffs from non-promotion branches have been removed. now we scan all the keys changed by the promotion
# fork to see if there are still extra-old diffs we can garbage collect out of diffs_by_series
for s, k in set((d.series, d.key) for d in diffs):
difflist = self.diffs_by_series[s][k]
# remove old diffs on abandoned forks but keep old diffs on the root fork
removals = None
for d in difflist:
if d.height <= new_root_fork.height and d not in new_root_fork:
if removals is None:
removals = [d]
else:
removals.append(d)
if removals is not None:
for r in removals:
difflist.remove(r)
# while the second-oldest diff is still root-age, pop off the oldest diff
while len(difflist) >= 2 and difflist[1].height <= new_root_fork.height:
difflist.pop(0)
# if the second-oldest diff is at least root-age, we don't need the oldest diff
while len(difflist) >= 2 and difflist[-2].height <= fork.height:
difflist.pop()
# if only one diff remains, and it's old, and it's a delete, then we can actually delete the diff list
if not difflist or len(difflist) == 1 and difflist[0].value == DELETE and difflist[0].height <= new_root_fork.height:
if not difflist or len(difflist) == 1 and difflist[0].value == DELETE and difflist[0].height <= fork.height:
del self.diffs_by_series[s][k]
if block.hash in self.unloads:
key_unloads = self.unloads.pop(block.hash)
# series unloads
for branch_id in fork.branch_ids:
key_unloads = self.unloads.pop(branch_id, [])
for s,k in key_unloads:
try:
log.debug(f'unloading {s} {k}')
del self.diffs_by_series[s][k]
except KeyError:
pass
for s in series_deletions:
del self.diffs_by_series[s]
self.root_block = block
log.debug(f'promoted root {self.root_block}')
self.root_branch = fork.branch
state_log.info(f'promoted {self.root_branch.height} '+(hexstr(self.root_branch.path[0])[:7]+' ' if self.root_branch.path else '')+' '.join(str(b) for b in reversed(promotion_branches)))
return diffs
_DELETE_SERIES_KEY = '!^DEXORDER_DELETE_SERIES^!'
def delete_series(self, fork: Optional[Fork], series: str):
"""
deletes the series entirely. the deletion is part of the blockstate, so the series could remain active on some branches until the
series deletion matures into finality.
"""
self.set(fork, series, BlockState._DELETE_SERIES_KEY, None) # setting any value on this special key will trigger a delete when this block finalizes
def collect_diffs(self, block: Block, series_key=NARG) -> list[DiffEntryItem]:
"""
returns a list of the latest DiffItem for each key change along the ancestor path from block to root
"""
# first collect the exhaustive list of diffs along the ancestry path
diff_lists: list[list[DiffEntryItem]] = []
while block.height > self.root_block.height:
diffs = self.diffs_by_hash.get(block.hash)
if diffs:
if series_key is not NARG:
diffs = [d for d in diffs if d.series == series_key]
diff_lists.append(diffs)
block = self.ancestors[block.hash]
difflist = list(itertools.chain(*reversed(diff_lists)))
return compress_diffs(difflist)
# old code that would remove a series entirely upon promotion of the branch that deleted it
# _DELETE_SERIES_KEY = '!^DEXORDER_DELETE_SERIES^!'
# def delete_series(self, fork: Optional[Fork], series: str):
# """
# deletes the series entirely. the deletion is part of the blockstate, so the series could remain active on some branches until the
# series deletion matures into finality.
# """
# self.set(fork, series, BlockState._DELETE_SERIES_KEY, None) # setting any value on this special key will trigger a delete when this block finalizes
# noinspection PyMethodMayBeStatic
@@ -273,20 +339,6 @@ class FinalizedBlockState:
self.data = {}
self.by_hash = {}
def add_block(self, block: Block) -> Optional[Fork]:
self.by_hash[block.hash] = block
return self.fork(block)
def delete_block(self, block: Union[Block, Fork, bytes]):
blockhash = block if isinstance(block, bytes) else block.hash
try:
del self.by_hash[blockhash]
except KeyError:
pass
def fork(self, block: Block):
return Fork([block.hash], height=block.height)
def get(self, _fork: Optional[Fork], series, key, default=NARG):
result = self.data.get(series,{}).get(key, default)
if result is NARG:

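# Illustrative end-to-end sketch (not part of this diff) of the new BlockState flow, using
# SimpleNamespace stand-ins for Block, hypothetical series/key names, and assuming the code behaves
# as written (the commit message notes it is not yet debugged):
from types import SimpleNamespace

root_blk = SimpleNamespace(chain_id=31337, height=100, hash=b'\x01' * 32, parent=b'\x00' * 32)
next_blk = SimpleNamespace(chain_id=31337, height=101, hash=b'\x02' * 32, parent=root_blk.hash)

state = BlockState()
state.init_root_block(root_blk)                         # root branch wraps the last finalized block
fork = state.add_branch(Branch.from_block(next_blk))    # child branch joins the root via its parent hash

state.set(fork, 'balances', 'vault1', 100)              # diff recorded against the fork's head branch
assert dict(state.iteritems(fork, 'balances')) == {'vault1': 100}

diffs = state.promote_root(fork)                        # the child branch ages into finality
assert state.root_branch.height == 101 and len(diffs) == 1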
View File

@@ -1,8 +0,0 @@
from .schema import TokenConfig
default_token_config = [
# TokenConfig('Wrapped Matic', 'WMATIC', 18, 'Polygon', '0x0d500B1d8E8eF31E21C99d1Db9A6444d3ADf1270', abi='WMATIC'),
# TokenConfig('Wrapped Ethereum','WETH', 18, 'Polygon', '0x7ceB23fD6bC0adD59E62ac25578270cFf1b9f619'),
# TokenConfig('Wrapped Bitcoin', 'WBTC', 8, 'Polygon', '0x1BFD67037B42Cf73acF2047067bd4F2C47D9BfD6'),
# TokenConfig('USD Coin', 'USDC', 6, 'Polygon', '0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174'),
]

View File

@@ -19,7 +19,7 @@ def get_contract_data(name):
def get_deployment_address(deployment_name, contract_name, *, chain_id=None):
if chain_id is None:
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
with open(f'../contract/broadcast/{deployment_name}.sol/{chain_id}/run-latest.json', 'rt') as file:
data = json.load(file)
for tx in data.get('transactions',[]):

View File

@@ -3,12 +3,12 @@ import logging
from typing import Optional
import eth_account
from web3.exceptions import BadFunctionCallOutput, Web3Exception
from web3.exceptions import Web3Exception
from web3.types import TxReceipt
from dexorder import current_w3
from dexorder.base.account import current_account
from dexorder.database.model.block import current_block
from dexorder.blockstate.fork import current_fork
from dexorder.util import hexstr
log = logging.getLogger(__name__)
@@ -49,8 +49,8 @@ class DeployTransaction (ContractTransaction):
def call_wrapper(addr, name, func):
async def f(*args, **kwargs):
try:
blockhash = hexstr(current_block.get().hash)
except LookupError:
blockhash = hexstr(current_fork.get().head)
except (LookupError, AttributeError):
blockhash = 'latest'
try:
return await func(*args).call(block_identifier=blockhash, **kwargs)

View File

@@ -3,7 +3,7 @@ import logging
from eth_abi.exceptions import InsufficientDataBytes
from web3.exceptions import ContractLogicError, BadFunctionCallOutput
from dexorder import db, dec
from dexorder import db
from dexorder.contract import ERC20
log = logging.getLogger(__name__)
@@ -14,6 +14,7 @@ async def token_decimals(addr):
try:
return db.kv[key]
except KeyError:
# noinspection PyBroadException
try:
decimals = await ERC20(addr).decimals()
except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput):

View File

@@ -34,7 +34,7 @@ def _load_chain(chain_id: int):
def get_by_chain(d):
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
try:
return d[chain_id]
except KeyError:

View File

@@ -6,6 +6,7 @@ from sqlalchemy.dialects.postgresql import BYTEA, JSONB
from web3 import Web3
from dexorder import Fixed2 as NativeFixed, Blockchain as NativeBlockchain
from dexorder.util import hexstr, hexbytes
class Address(TypeDecorator):
@@ -24,7 +25,7 @@ class Blockchain(TypeDecorator):
cache_ok = True
def process_bind_param(self, value: NativeBlockchain, dialect):
return value.chain_id
return value.id
def process_result_value(self, value: int, dialect):
return NativeBlockchain.for_id(value)
@@ -85,3 +86,14 @@ def DataclassDict(constructor):
result = DataclassDictBase()
result.Constructor = constructor
return result
class BytesList(TypeDecorator):
impl = JSONB
def process_bind_param(self, value, dialect):
return [hexstr(b) for b in value]
def process_result_value(self, result, dialect):
return [hexbytes(s) for s in result]
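# Illustrative round-trip sketch (not part of this diff) for BytesList, assuming hexstr/hexbytes
# invert one another as the rest of the commit relies on:
col = BytesList()
stored = col.process_bind_param([b'\x01\x02', b'\xff'], None)   # hex strings ready for the JSONB column
assert col.process_result_value(stored, None) == [b'\x01\x02', b'\xff']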

View File

@@ -1,8 +1,10 @@
from .base import Base
from .kv import KeyValue
from .block import Block
from .series import SeriesSet, SeriesDict
from .transaction import Transaction, TransactionJob
from .orderindex import OrderIndex
from .pool import Pool
from .token import Token
class Block: pass

View File

@@ -1,36 +0,0 @@
from contextvars import ContextVar
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from dexorder.database.model import Base
from dexorder.util import hexint, Field, hexstr
class Block(Base):
@staticmethod
def from_data(chain_id:int, data:dict):
""" Builds a Block using the response data from an RPC server """
return Block(chain=chain_id, height=data['number'] if type(data['number']) is int else int(data['number'],0),
hash=hexstr(data['hash']), parent=hexstr(data['parentHash']), data=data)
chain: Mapped[int] = mapped_column(primary_key=True)
height: Mapped[int] = mapped_column(primary_key=True)
hash: Mapped[bytes] = mapped_column(primary_key=True)
parent: Mapped[bytes]
data: Mapped[dict] = mapped_column(JSONB)
@property
def timestamp(self) -> int:
raw = self.data['timestamp']
# noinspection PyTypeChecker
return raw if type(raw) is int else hexint(raw)
def __str__(self):
return f'{self.height}_{self.hash.hex()[2:7]}'
current_block = ContextVar[Block]('Block.cur') # block for the current thread
latest_block = Field[Block]() # most recent discovered block but maybe not the currently processing one
completed_block = ContextVar[Block]('Block.completed') # most recent fully-processed block

View File

@@ -1,5 +1,5 @@
import logging
from typing import TypedDict, Optional
from typing import TypedDict
from sqlalchemy.orm import Mapped, mapped_column

View File

@@ -8,13 +8,13 @@ from dexorder import current_pub, db, from_timestamp, minutely
from dexorder.base.chain import current_chain, current_clock
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey
from dexorder.blocks import get_block_timestamp
from dexorder.blockstate.fork import current_fork
from dexorder.ohlc import ohlcs, recent_ohlcs
from dexorder.transaction import submit_transaction_request
from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data
from dexorder.contract.dexorder import vault_address, VaultContract
from dexorder.contract import ERC20
from dexorder.vault_blockdata import vault_owners, vault_balances, adjust_balance
from dexorder.database.model.block import current_block
from dexorder.database.model.transaction import TransactionJob
from dexorder.base.orderlib import SwapOrderState
from dexorder.order.orderstate import Order
@@ -50,7 +50,7 @@ async def handle_order_placed(event: EventData):
num_orders = int(event['args']['numOrders'])
log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}')
if addr not in vault_owners:
log.warning(f'block {current_block.get()} order from unknown vault {addr}') # todo insert (short) block hash into all logs
log.warning(f'block {current_fork.get().head} order from unknown vault {addr}') # todo insert (short) block hash into all logs
# return todo always discard rogues
# noinspection PyBroadException
try:
@@ -155,7 +155,7 @@ async def handle_uniswap_swap(swap: EventData):
pool, time, price = data
addr = pool['address']
pool_prices[addr] = price
ohlcs.update_all(addr, time, price)
await ohlcs.update_all(addr, time, price)
log.debug(f'pool {addr} {minutely(time)} {price}')
@@ -178,7 +178,7 @@ def handle_vault_created(created: EventData):
else:
break
# log.debug(f'updated vaults: {vaults}')
current_pub.get()(f'{current_chain.get().chain_id}|{owner}', 'vaults', vaults)
current_pub.get()(f'{current_chain.get().id}|{owner}', 'vaults', vaults)
async def activate_time_triggers():
@@ -208,7 +208,7 @@ async def activate_price_triggers():
async def process_active_tranches():
for tk, proof in active_tranches.items():
old_req = execution_requests.get(tk)
height = current_block.get().height
height = current_fork.get().height
if old_req is None or old_req.height <= height: # '<=' is used so proof is updated with more recent values
if await has_funds(tk):
log.info(f'execution request for {tk}')
@@ -234,7 +234,7 @@ async def has_funds(tk: TrancheKey):
async def process_execution_requests():
height = current_block.get().height
height = current_fork.get().height
execs = {} # which requests to act on
for tk, er in execution_requests.items():
tk: TrancheKey
@@ -311,13 +311,13 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str):
last_ohlc_rollover = 0
def check_ohlc_rollover():
async def check_ohlc_rollover():
global last_ohlc_rollover
time = current_block.get().timestamp
time = await get_block_timestamp(current_fork.get().head)
dt = from_timestamp(time)
diff = time - last_ohlc_rollover
if diff >= 60 or dt.minute != from_timestamp(last_ohlc_rollover).minute:
for (symbol, period) in recent_ohlcs.keys():
ohlcs.update(symbol, period, dt)
await ohlcs.update(symbol, period, dt)
last_ohlc_rollover = time

View File

@@ -7,12 +7,11 @@ from socket_io_emitter import Emitter
from dexorder import DELETE
from dexorder.base.chain import current_chain
from dexorder.base.fork import current_fork
from dexorder.blockstate import DiffItem, DataType, BlockState
from dexorder.blockstate.blockdata import SeriesCollection, BlockData
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.fork import Fork
from dexorder.blockstate.state import compress_diffs
from dexorder.database.model import Block
from dexorder.memcache import current_redis, memcache
from dexorder.util.async_util import maywait
from dexorder.util.json import json_encoder
@@ -26,29 +25,28 @@ class RedisState (SeriesCollection):
super().__init__(series_or_datavars)
self.exists:set[str] = set()
# noinspection PyMethodMayBeStatic
async def clear(self):
log.debug('clearing memcache')
r = current_redis.get()
await r.delete(*[f'{current_chain.get().chain_id}|{k}' for k in ['latest_block', *self.datas.keys()]])
await r.delete(*[f'{current_chain.get().id}|{k}' for k in ['latest_block', *self.datas.keys()]])
async def init(self, state: BlockState):
fork = current_fork.get()
async def init(self, state: BlockState, fork: Fork):
await self.clear()
diffs = []
for series in self.datas.keys():
for k, v in state.iteritems(fork, series):
diffs.append(DiffItem(series, k, v))
await self.save(state.root_block, diffs)
await self.save(fork, diffs)
# noinspection PyAsyncCall
async def save(self, block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
# the diffs must be already compressed such that there is only one action per key
chain = current_chain.get()
assert block.chain == chain.chain_id
chain_id = chain.chain_id
chain_id = chain.id
sadds: dict[str,set[str]] = defaultdict(set)
sdels: dict[str,set[str]] = defaultdict(set)
hsets: dict[str,dict[str,str]] = defaultdict(dict)
@@ -101,9 +99,9 @@ class RedisState (SeriesCollection):
r.hset(series, mapping=kvs)
for series, keys in hdels.items():
r.hdel(series, *keys)
block_series = f'{chain_id}|block.latest'
r.json(json_encoder).set(block_series,'$',block.data)
pubs.append((str(chain_id), 'block.latest', [block.data]))
block_series = f'{chain_id}|head'
r.json(json_encoder).set(block_series,'$',[fork.height, fork.head])
pubs.append((str(chain_id), 'head', [fork.height, fork.head]))
# separate batch for pubs
if pubs:
await publish_all(pubs)

View File

@@ -123,7 +123,7 @@ def is_generating_metadata():
# noinspection PyShadowingNames
def generate_metadata(tokens: Iterable[Union[Token, TokenDict]], pools: Iterable[Union[Pool, PoolDict]],
file=sys.stdout):
dump(file, '{"'+str(current_chain.get().chain_id)+'":{"t":[')
dump(file, '{"' + str(current_chain.get().id) + '":{"t":[')
dump_tokens(file, tokens)
dump(file, '],"p":[')
dump_pools(file, pools)
@@ -135,7 +135,7 @@ metadata_by_chainaddr = {}
def get_metadata(addr=None, *, chain_id=None):
if chain_id is None:
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
global metadata
if metadata is NARG:
if config.metadata is None or generating_metadata:

View File

@@ -8,9 +8,10 @@ from cachetools import LFUCache
from dexorder import dec, config, from_timestamp, timestamp, now, minutely
from dexorder.base.chain import current_chain
from dexorder.blocks import get_block_timestamp
from dexorder.blockstate import BlockDict, DiffItem, current_blockstate
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.database.model import Block
from dexorder.blockstate.fork import Fork, current_fork
from dexorder.util import json
from dexorder.util.shutdown import fatal
@@ -184,18 +185,18 @@ class OHLCKey (NamedTuple):
def quotes_path(chain_id: int = None):
if chain_id is None:
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
return f'{chain_id}/quotes.json'
def series_path(chain_id: int = None):
if chain_id is None:
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
return f'{chain_id}/series.json'
def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str:
if chain_id is None:
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
start = ohlc_start_time(time, period)
name = period_name(period)
return f'{chain_id}/{symbol}/{name}/' + (
@@ -215,7 +216,7 @@ class Chunk:
self.repo_dir = repo_dir
self.symbol = symbol
self.period = period
self.chain_id = chain_id if chain_id is not None else current_chain.get().chain_id
self.chain_id = chain_id if chain_id is not None else current_chain.get().id
self.path = chunk_path(symbol, period, time, chain_id=chain_id)
self.fullpath = os.path.join(repo_dir, self.path)
if bars is not None:
@@ -299,7 +300,7 @@ class OHLCRepository:
@property
def chain_id(self):
return self._chain_id if self._chain_id is not None else current_chain.get().chain_id
return self._chain_id if self._chain_id is not None else current_chain.get().id
@property
def dir(self):
@@ -332,12 +333,12 @@ class OHLCRepository:
if (symbol, period) not in recent_ohlcs:
recent_ohlcs[(symbol, period)] = []
def update_all(self, symbol: str, time: datetime, price: dec, *, create: bool = True):
async def update_all(self, symbol: str, time: datetime, price: dec, *, create: bool = True):
""" the update_all() and update() methods generate bars for the recent_ohlcs BlockDict """
for period in OHLC_PERIODS:
self.update(symbol, period, time, price, create=create)
await self.update(symbol, period, time, price, create=create)
def update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) \
async def update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) \
-> Optional[list[NativeOHLC]]:
"""
if price is None, then bars are advanced based on the time but no new price is added to the series.
@@ -363,11 +364,14 @@ class OHLCRepository:
updated = update_ohlc(historical[-1], period, time, price)
# drop any historical bars that are older than we need
# oldest_needed = cover the root block time plus one period prior
oldest_needed = from_timestamp(current_blockstate.get().root_block.timestamp) - period
# noinspection PyTypeChecker
trim = (oldest_needed - historical[0].start) // period
if trim > 0:
historical = historical[trim:]
root_hash = current_blockstate.get().root_branch.head
if root_hash is not None:
root_timestamp = await get_block_timestamp(root_hash)
oldest_needed = from_timestamp(root_timestamp) - period
# noinspection PyTypeChecker
trim = (oldest_needed - historical[0].start) // period
if trim > 0:
historical = historical[trim:]
# now overlap the updated data on top of the historical data
if not historical or not updated:
@@ -403,7 +407,7 @@ class OHLCRepository:
self.dirty_chunks.add(chunk)
def get_chunk(self, symbol: str, period: timedelta, start_time: datetime) -> Chunk:
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
key = chunk_path(symbol, period, start_time, chain_id=chain_id)
found = self.cache.get(key)
if found is None:
@@ -476,7 +480,7 @@ class FinalOHLCRepository (OHLCRepository):
bar = self.current[key] = NativeOHLC(start, price, price, price, close)
chunk.update(bar, backfill=backfill)
self.dirty_chunks.add(chunk)
chain_id_str = str(current_chain.get().chain_id)
chain_id_str = str(current_chain.get().id)
if chain_id_str not in self.series:
self.series[chain_id_str] = {}
self.series[chain_id_str][f'{key[0]}|{period_name(key[1])}'] = {'start': timestamp(start)}
@@ -511,7 +515,7 @@ def save_json(obj, filename):
def pub_ohlc(_series:str, key: OHLCKey, bars: list[NativeOHLC]):
pool_addr, period = key
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
period = period_name(period)
key = f'{pool_addr}|{period}'
return f'{chain_id}|{key}', 'ohlc', (chain_id, key, [b.ohlc for b in bars])
@@ -523,7 +527,7 @@ def ohlc_str_to_key(s):
pool, period = s.split('|')
return pool, period_from_name(period)
def ohlc_save(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
def ohlc_save(_fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
"""
used as a finalization callback from BlockState data.
"""

View File

@@ -196,7 +196,7 @@ class Order:
log.debug(f'pub order status {_s} {k} {v}')
# publish status updates (on placing and completion) to web clients
try:
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
return (f'{chain_id}|{vault_owners[k.vault]}', # publish on the vault owner's channel
'o', # order message type
(chain_id, k.vault, k.order_index, v.dump())) # (order_index, order_status)
@@ -213,7 +213,7 @@ class Order:
if v is DELETE:
return None
try:
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
return (f'{chain_id}|{vault_owners[k.vault]}', # publish on the vault owner's channel
'of', # order message type
(chain_id, k.vault, k.order_index, v.dump())) # (order_index, order_fills)
@@ -234,6 +234,7 @@ class Order:
if oi:
oi.state = status.state
else:
order_log.debug(f'saving OrderIndex {key} {status.state}')
oi = OrderIndex(chain=current_chain.get(), vault=key.vault, order_index=key.order_index, state=status.state)
sess.add(oi)

View File

@@ -31,7 +31,7 @@ async def get_pool(address: str) -> PoolDict:
async def load_pool(address: str) -> PoolDict:
found = None
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
# todo other exchanges
try:
v3 = UniswapV3Pool(address)
@@ -72,7 +72,7 @@ class PoolPrices (BlockDict[str, dec]):
def pub_pool_price(_s,k,v):
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
return f'{chain_id}|{k}', 'p', (chain_id, k, str(v))

View File

@@ -101,8 +101,8 @@ class BlockProgressor(metaclass=ABCMeta):
for log_event in log_events:
try:
parsed = event.process_log(log_event) if event is not None else log_event
except (LogTopicError, MismatchedABI) as e:
# log.debug(f'logevent parse error {e}\n{log_event}') # this happens for Swap events from non-Uniswap pools
except (LogTopicError, MismatchedABI):
# this happens for Swap events from non-Uniswap pools
parsed = NARG # need a placeholder
parsed_events.append(parsed)
# todo try/except for known retryable errors

View File

@@ -1,69 +1,42 @@
import asyncio
import logging
from asyncio import Queue
from typing import Any, Iterable, Callable
from asyncio import Event
from datetime import timedelta
from typing import Any, Iterable, Callable, Optional
from eth_bloom import BloomFilter
from web3.exceptions import LogTopicError, MismatchedABI
# noinspection PyPackageRequirements
from websockets.exceptions import ConnectionClosedError
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, NARG
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now
from dexorder.base.block import Block, latest_block
from dexorder.base.chain import current_chain, current_clock, BlockClock
from dexorder.base.fork import current_fork, Fork, DisjointFork
from dexorder.blockchain.connection import create_w3_ws, create_w3
from dexorder.blocks import get_block
from dexorder.blocks import cache_block, get_block, promotion_height
from dexorder.blockstate import BlockState, current_blockstate
from dexorder.blockstate.branch import Branch
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.database.model import Block
from dexorder.database.model.block import current_block, latest_block
from dexorder.blockstate.fork import current_fork, Fork
from dexorder.progressor import BlockProgressor
from dexorder.transaction import create_and_send_transactions
from dexorder.util import hexstr, hexint, hexbytes
from dexorder.util import hexstr, hexbytes, hexint
from dexorder.util.async_util import maywait, Maywaitable
from dexorder.util.shutdown import fatal
log = logging.getLogger(__name__)
class Retry (Exception): ...
# todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas
class BlockStateRunner(BlockProgressor):
"""
NOTE: This doc is old and not strictly true but still has the basic idea
1. load root state
a. if no root, init from head
b. if root is old, batch forward by height
2. discover new heads
2b. find in-state parent block else use root
3. set the current fork = ancestor->head diff state
4. query blockchain eventlogs
5. process new vaults
6. process new orders and cancels
a. new pools
7. process Swap events and generate pool prices
8. process price horizons
9. process token movement
10. process swap triggers (zero constraint tranches)
11. process price tranche triggers
12. process horizon tranche triggers
13. filter by time tranche triggers
14. bundle execution requests and send tx. tx has require(block<deadline) todo execute deadlines
15. on tx confirmation, the block height of all executed trigger requests is set to the tx block
Most of these steps, the ones handling events, are set up in main.py so that datamain.py can also use Runner for its own purposes
"""
def __init__(self, state: BlockState = None, *, publish_all=None, timer_period: float = 1):
"""
If state is None, then it is initialized as empty using the first block seen as the root block. Then the second block begins log event handling.
If state is None, then it is initialized as empty using the first block seen as the root block.
The Runner has two processes: one process either listens on websockets for new heads or polls for new heads
and their ancestors
"""
super().__init__()
self.state = state
@@ -72,27 +45,26 @@ class BlockStateRunner(BlockProgressor):
self.on_state_init: list[Callable[[],Maywaitable[None]]] = []
self.state_initialized = False
# set by the block-querying process whenever a new head is set on latest_blocks[chain_id]
self.new_head_event = Event()
# onHeadUpdate callbacks are invoked with a list of DiffItems used to update the head state from either the previous head or the root
self.on_head_update: list[Callable[[Block,list[DiffEntryItem]],Maywaitable[None]]] = []
self.on_head_update: list[Callable[[Fork, list[DiffEntryItem]],Maywaitable[None]]] = []
# onPromotion callbacks are invoked with a list of DiffItems used to advance the root state
self.on_promotion: list[Callable[[Block,list[DiffEntryItem]],Maywaitable[None]]] = []
self.on_promotion: list[Callable[[Fork, list[DiffEntryItem]],Maywaitable[None]]] = []
self.publish_all: Callable[[Iterable[tuple[str,str,Any]]],Maywaitable[None]] = publish_all
self.timer_period = timer_period
self.queue: Queue = Queue()
self.max_height_seen = config.backfill if config.backfill is None or config.backfill >= 0 \
else current_block.get().height + config.backfill # if backfill is negative then it's relative to the current block
self.timer_period = timer_period # todo deprecated?
self.running = False
async def run(self):
# this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is either websockets or polling
if self.state:
self.max_height_seen = max(self.max_height_seen, self.state.root_block.height)
self.running = True
# this run() process discovers new heads and puts them on a queue for the worker to process
_worker_task = asyncio.create_task(self.worker())
return await (self.run_polling() if config.polling > 0 or not config.ws_url else self.run_ws())
async def run_ws(self):
@@ -101,26 +73,28 @@ class BlockStateRunner(BlockProgressor):
chain = Blockchain.for_id(chain_id)
current_chain.set(chain)
# this run() process discovers new heads and puts them on a queue for the worker to process
_worker_task = asyncio.create_task(self.worker())
while self.running:
# noinspection PyBroadException
try:
async with w3ws as w3ws:
log.debug('connecting to ws provider')
await w3ws.provider.connect()
subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc. do not use it.
subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc.
log.debug(f'subscribed to newHeads {subscription}')
while self.running:
async for message in w3ws.ws.process_subscriptions():
head = message['result']
log.debug(f'detected new block {head["number"]} {hexstr(head["hash"])}')
await self.add_head(head)
block = Block(chain_id, message['result'])
cache_block(block)
latest_block[chain_id] = block
self.new_head_event.set()
log.debug(f'detected new head {block}')
if not self.running:
break
await async_yield()
except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
log.debug(f'runner timeout {e}')
except:
log.exception('Unhandled exception during run_ws()')
finally:
# noinspection PyBroadException
try:
@@ -128,7 +102,6 @@ class BlockStateRunner(BlockProgressor):
await w3ws.provider.disconnect()
except Exception:
pass
log.debug('yield')
log.debug('runner run_ws() exiting')
@@ -138,86 +111,96 @@ class BlockStateRunner(BlockProgressor):
https://github.com/NomicFoundation/hardhat/issues/2053
So we implement polling as a workaround.
"""
assert config.polling > 0
w3 = await create_w3()
chain_id = await w3.eth.chain_id
chain = Blockchain.for_id(chain_id)
current_chain.set(chain)
_worker_task = asyncio.create_task(self.worker())
prev_blockhash = None
while self.running:
try:
# polling mode is used primarily because Hardhat fails to deliver newHeads events after about an hour
# unfortunately, hardhat also stops responding to eth_getBlockByHash. so instead, we use the standard (stupid)
# 'latest' polling for blocks, and we push the entire block to the queue since apparently this is the only
# rpc call Hardhat seems to consistently support. The worker must then detect the type of object pushed to the
# work queue and either use the block directly or query for the block if the queue object is a hashcode.
block = await w3.eth.get_block('latest')
head = block['hash']
if head != prev_blockhash:
prev_blockhash = head
log.debug(f'polled new block {hexstr(head)}')
await self.add_head(block)
if not self.running:
break
await asyncio.sleep(config.polling)
except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
log.debug(f'runner timeout {e}')
finally:
next_poll = now()
try:
while self.running:
sleep = (next_poll - now()).total_seconds()
if sleep > 0:
await asyncio.sleep(sleep)
next_poll = now() + timedelta(seconds=config.polling)
# noinspection PyBroadException
try:
# noinspection PyUnresolvedReferences
await w3.provider.disconnect()
except Exception:
pass
await async_yield()
log.debug('runner run_polling() exiting')
prev_blockhash = await asyncio.wait_for(
self.poll_head(chain, w3, prev_blockhash), timeout=config.polling)
except TimeoutError as e:
log.debug(f'runner timeout {e}')
except (ConnectionClosedError, TimeoutError) as e:
log.debug(f'runner timeout {e}')
except:
log.exception(f'Unhandled exception during run_polling()')
finally:
# noinspection PyBroadException
try:
# noinspection PyUnresolvedReferences
await w3.provider.disconnect()
except Exception:
pass
log.debug('runner run_polling() exiting')
async def poll_head(self, chain, w3, prev_blockhash):
# polling mode is used primarily because Hardhat fails to deliver newHeads events after about an hour
# unfortunately, hardhat also stops responding to eth_getBlockByHash. so instead, we use the standard (stupid)
# 'latest' polling for blocks, and we push the entire block to the queue since apparently this is the only
# rpc call Hardhat seems to consistently support. The worker must then detect the type of object pushed to the
# work queue and either use the block directly or query for the block if the queue object is a hashcode.
blockdata = await w3.eth.get_block('latest')
head = blockdata['hash']
if head == prev_blockhash:
return prev_blockhash
log.debug(f'polled new head {hexstr(head)} {hexint(blockdata["number"])}')
block = Block(chain.id, blockdata)
latest_block[chain.id] = block
# prefetch the head's ancestors
if self.state is not None and self.state.root_branch is not None:
if self.state.root_branch.height >= block.height - chain.confirms * 2:
# prefetch parent blocks back to the root height
cur = block
while self.state.root_branch is not None and cur.height > self.state.root_branch.height:
cur = await get_block(cur.parent, chain_id=chain.id)
self.new_head_event.set()
return head
async def add_head(self, head):
"""
head can either be a full block-data struct or simply a block hash. this method converts it to a Block
and pushes that Block onto the worker queue
"""
chain = current_chain.get()
w3 = current_w3.get()
try:
block_data = head
blockhash = block_data['hash']
parent = block_data['parentHash']
height = block_data['number']
head = Block(chain=chain.chain_id, height=height, hash=blockhash, parent=parent, data=block_data)
except TypeError:
head = await get_block(head)
latest_block.set(head)
async def create_branch(self, chain: Blockchain) -> Optional[Fork]:
if chain.id not in latest_block:
return None
block = latest_block[chain.id]
if self.state or config.backfill:
# backfill batches
start_height = self.max_height_seen
batch_size = config.batch_size if config.batch_size is not None else chain.batch_size
batch_height = start_height + batch_size - 1
while batch_height < head.height:
# the backfill is larger than a single batch, so we push intermediate head blocks onto the queue
response = await w3.provider.make_request('eth_getBlockByNumber', [hex(batch_height), False])
block_data: dict = response['result']
blockhash = bytes.fromhex(block_data['hash'][2:])
parent = bytes.fromhex(block_data['parentHash'][2:])
height = int(block_data['number'], 0)
assert height == batch_height
block = Block(chain=chain.chain_id, height=height, hash=blockhash, parent=parent, data=block_data)
log.debug(f'enqueueing batch backfill from {start_height} through {batch_height}')
await self.queue.put(block) # add an intermediate block
self.max_height_seen = height
start_height += chain.batch_size
batch_height += chain.batch_size
if self.queue.qsize() > 2:
await asyncio.sleep(1)
else:
await async_yield()
await self.queue.put(head) # add the head block
self.max_height_seen = head.height
if self.state is None:
self.state = BlockState()
if self.state.root_branch is None:
# no root branch, so create one from a single block branch
return self.state.add_branch(Branch.from_block(block))
if block.height - self.state.root_branch.height >= chain.confirms * 2:
# create a disjoint backfilling branch
start = self.state.root_branch.height + 1
# do not query more than the chain's batch size
# do not query into the reorgable area. only query finalized data.
height = min( start + chain.batch_size, block.height - chain.confirms)
branch = Branch(height, start) # no parent or path
return self.state.add_branch(branch, strict=False)
# otherwise construct an explicit list of linked blocks from the most recent head to the latest block
heads = self.state.heads
path = [block.hash]
cur = block
while True:
if cur.parent in heads:
branch = Branch(block.height, block.height - len(path) + 1, cur.parent, path, chain=chain)
return self.state.add_branch(branch)
if cur.height <= self.state.root_branch.height:
fatal(f'Latest head {block.hash} does not have the root block {self.state.root_branch.head} as a parent')
cur = await get_block(cur.parent)
path.append(cur.hash)
async def worker(self):
@@ -225,68 +208,54 @@ class BlockStateRunner(BlockProgressor):
log.debug(f'runner worker started')
w3 = current_w3.get()
chain = current_chain.get()
assert chain.chain_id == await w3.eth.chain_id
assert chain.id == await w3.eth.chain_id
current_clock.set(BlockClock())
prev_head = None
while self.running:
try:
if self.timer_period:
async with asyncio.timeout(self.timer_period):
head = await self.queue.get()
else:
head = await self.queue.get()
except TimeoutError:
# 1 second has passed without a new head. Run the postprocess callbacks to check for activated time-based triggers
if prev_head is not None:
await self.handle_time_tick(prev_head)
else:
try:
await self.handle_head(chain, head, w3)
prev_head = head
except Retry:
pass
except Exception as x:
log.exception(x)
await self.new_head_event.wait()
except asyncio.CancelledError:
break
self.new_head_event.clear()
try:
fork = await self.create_branch(chain)
except ValueError:
log.warning(f'Could not build a branch back to root! {hexstr(latest_block[chain.id].hash)} <-?- {hexstr(self.state.root_branch.head)}')
continue
# noinspection PyBroadException
try:
if fork is not None:
await self.process(fork)
except:
log.exception('Reverting branch due to exception')
self.state.remove_branch(fork.branch)
except Exception:
log.exception('exception in runner worker')
log.exception('Unhandled exception in runner worker')
raise
finally:
log.debug('runner worker exiting')
async def handle_head(self, chain, block, w3):
# todo refactor this to generate a fork from the latest block back to whatever ancestor it can find
log.debug(f'handle_head {block.height} {hexstr(block.hash)}')
if self.state and block.height <= self.state.root_block.height:
log.debug(f'ignoring old head')
return
async def process(self, fork: Fork):
log.debug(f'processing {fork}')
chain = current_chain.get()
w3 = current_w3.get()
current_blockstate.set(self.state)
current_fork.set(fork)
session = None
batches = []
pubs = []
try:
if self.state is not None and block.hash in self.state.by_hash:
log.debug(f'block {block.hash} was already processed')
return
if self.state is None:
# initialize
self.state = BlockState(block)
current_blockstate.set(self.state)
fork: Fork = Fork([block.hash], height=block.height)
log.info('Created new empty root state')
branch = fork.branch
if branch.disjoint:
# query the entire range (this assumes branch.height is a finalized height)
batches = await self.get_backfill_batches(branch.start, branch.height)
else:
fork = self.state.add_block(block)
if fork is None:
log.debug(f'discarded late-arriving head {block}')
else:
batches: list
from_height = self.state.by_hash[fork.parent].height + 1 if fork.parent is not None else fork.height
to_height = fork.height
if fork.disjoint:
batches = await self.get_backfill_batches(from_height, to_height, w3)
else:
# query every block explicitly
for blockhash in reversed(branch.path):
# event callbacks are triggered in the order in which they're registered. the events passed to
# each callback are in block transaction order
bloom = BloomFilter(int.from_bytes(block.data['logsBloom']))
block = await get_block(blockhash)
bloom = BloomFilter(int.from_bytes(hexbytes(block.data['logsBloom'])))
for callback, event, log_filter in self.events:
if log_filter is None:
batches.append((None, callback, event, None))
@@ -303,63 +272,44 @@ class BlockStateRunner(BlockProgressor):
if not config.parallel_logevent_queries:
get_logs = await get_logs
batches.append((get_logs, callback, event, log_filter))
for callback in self.postprocess_cbs:
batches.append((None, callback, None, None))
for callback in self.postprocess_cbs:
batches.append((None, callback, None, None))
# set up for callbacks
current_block.set(block)
current_fork.set(fork)
session = db.make_session(autocommit=False)
session.begin()
session.add(block)
current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) # used by handle_vault_created
if not self.state_initialized:
await self.do_state_init_cbs()
await self.invoke_callbacks(batches)
# set up for callbacks
session = db.make_session(autocommit=False)
session.begin()
current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) # used by handle_vault_created
if not self.state_initialized:
await self.do_state_init_cbs()
log.debug(f'invoking callbacks with fork {current_fork.get()}')
await self.invoke_callbacks(batches)
# todo
# IMPORTANT! check for a reorg and generate a reorg diff list. the diff list we need is the union of the set of keys touched by either
# branch. Then we query all the values for those keys and apply that kv list to redis. This will make sure that any orphaned data that
# isn't updated by the new fork is still queried from the root state to overwrite any stale data from the abandoned branch.
diff_items = self.state.diffs_by_hash[block.hash]
for callback in self.on_head_update:
# todo
# IMPORTANT! check for a reorg and generate a reorg diff list. the diff list we need is the union of the set of keys touched by either
# branch. Then we query all the values for those keys and apply that kv list to redis. This will make sure that any orphaned data that
# isn't updated by the new fork is still queried from the root state to overwrite any stale data from the abandoned branch.
diff_items = self.state.diffs_by_branch[fork.branch_id]
for callback in self.on_head_update:
# noinspection PyCallingNonCallable
await maywait(callback(fork, diff_items))
# check for root promotion
promo_height = promotion_height(chain, fork.height)
promotable_branches = [b for b in fork.branches
if self.state.root_branch.height < b.height <= promo_height
or b == self.state.root_branch]
if len(promotable_branches) > 1:
promotion_fork = Fork(promotable_branches)
diff_items = self.state.promote_root(promotion_fork)
for callback in self.on_promotion:
# todo try/except for known retryable errors
# noinspection PyCallingNonCallable
await maywait(callback(block, diff_items))
# check for root promotion
confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1
promotion_height = latest_block.get().height - confirm_offset # todo latest_block should not be a ContextVar but a global dict by chain_id
new_root_fork = None
if fork.disjoint:
fork: DisjointFork
# individually check the fork's head and ancestor
if fork.height <= promotion_height:
new_root_fork = fork
else:
state = current_blockstate.get()
parent_block = fork.root
if parent_block.height <= promotion_height:
new_root_fork = state.fork(parent_block)
else:
fork: Fork
# non-disjoint, contiguous fork
if fork.height <= promotion_height:
new_root_fork = fork
else:
new_root_fork = fork.for_height(promotion_height)
if new_root_fork:
log.debug(f'promoting root {new_root_fork.height} {hexstr(new_root_fork.hash)}')
diff_items = self.state.promote_root(new_root_fork)
for callback in self.on_promotion:
# todo try/except for known retryable errors
# noinspection PyCallingNonCallable
await maywait(callback(self.state.root_block, diff_items))
await maywait(callback(promotion_fork, diff_items))
except: # legitimately catch EVERYTHING because we re-raise
log.debug('rolling back session')
if session is not None:
session.rollback()
if block.hash is not None and self.state is not None:
self.state.delete_block(block.hash)
if config.parallel_logevent_queries:
for get_logs, *_ in batches:
if get_logs is not None:
@@ -390,16 +340,15 @@ class BlockStateRunner(BlockProgressor):
# noinspection PyCallingNonCallable
await maywait(self.publish_all(pubs))
log.info(f'completed block {block}')
log.info(f'completed {fork.branch}')
finally:
db.close_session()
async def handle_time_tick(self, block):
async def handle_time_tick(self, fork: Fork):
# todo re-enable time ticks
if current_blockstate.get() is None:
return
fork = self.state.fork(block)
current_block.set(block)
current_fork.set(fork)
session = db.session
session.begin()
@@ -415,6 +364,7 @@ class BlockStateRunner(BlockProgressor):
finally:
session.close()
async def do_state_init_cbs(self):
if self.state_initialized:
return

View File

@@ -1,4 +1,3 @@
import asyncio
import logging
from typing import Optional
@@ -53,7 +52,7 @@ async def load_token(address: str) -> Optional[TokenDict]:
log.warning(f'token {address} has no decimals()')
decimals = 0
approved = config.metadata is None
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
symbol = await symbol_prom
name = await name_prom
td = TokenDict(type='Token', chain=chain_id, address=address,

View File

@@ -7,8 +7,8 @@ from web3.exceptions import TransactionNotFound
from dexorder import db, current_w3
from dexorder.base.chain import current_chain
from dexorder.base.order import TransactionRequest
from dexorder.blockstate.fork import current_fork
from dexorder.contract.contract_proxy import ContractTransaction
from dexorder.database.model.block import current_block
from dexorder.database.model.transaction import TransactionJob, Transaction as DbTransaction, TransactionJobState
log = logging.getLogger(__name__)
@@ -32,7 +32,7 @@ class TransactionHandler:
def submit_transaction_request(tr: TransactionRequest):
job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_block.get().height, state=TransactionJobState.Requested, request=tr)
job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_fork.get().height, state=TransactionJobState.Requested, request=tr)
db.session.add(job)
return job

View File

@@ -54,7 +54,7 @@ class _UniswapContracts (ByBlockchainDict[ContractProxy]):
'quoter': ContractProxy('0xb27308f9F90D607463bb33eA1BeBb41C27CE5AB6', 'IQuoter'),
'swap_router': ContractProxy('0xE592427A0AEce92De3Edee1F18E0157C05861564', 'ISwapRouter'),
}
super().__init__({chain.chain_id:std for chain in (Ethereum, Polygon, Goerli, Mumbai, Arbitrum, Mock, Alpha)})
super().__init__({chain.id:std for chain in (Ethereum, Polygon, Goerli, Mumbai, Arbitrum, Mock, Alpha)})
uniswapV3 = _UniswapContracts()

View File

@@ -1,11 +1,10 @@
import re
from typing import Callable, TypeVar, Generic, Union, Any
from typing import Callable, TypeVar, Generic, Union
from eth_utils import keccak
from hexbytes import HexBytes
from .async_util import async_yield
from .tick_math import nearest_available_ticks, round_tick, spans_tick, spans_range
def align_decimal(value, left_columns) -> str:
@@ -26,16 +25,18 @@ def hexstr(value: Union[HexBytes, bytes, str]):
elif type(value) is str:
return value if value.startswith('0x') else '0x' + value
else:
raise ValueError
raise ValueError(f'Could not convert hexstr {value}')
def hexbytes(value: Union[str|bytes]):
def hexbytes(value: Union[str,bytes]):
""" converts an optionally 0x-prefixed hex string into bytes """
return value if type(value) is bytes else bytes.fromhex(value[2:] if value.startswith('0x') else value)
return value if type(value) is bytes else \
bytes(value) if type(value) is HexBytes else \
bytes.fromhex(value[2:] if value.startswith('0x') else value)
def hexint(value: str):
return int(value[2:] if value.startswith('0x') else value, 16)
def hexint(value: Union[str,int]):
return value if type(value) is int else int(value,0)
def _keystr1(value):
@@ -73,7 +74,7 @@ class defaultdictk (Generic[K,V], dict[K,V]):
T = TypeVar('T')
class Field (Generic[T]):
class GlobalVar (Generic[T]):
def __init__(self, value: T = None):
self._value = value
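A few worked examples of the hexstr/hexbytes/hexint behavior changed earlier in this file; the hexstr bytes case assumes it returns '0x' plus the hex digits, which is how the rest of the codebase uses it:
assert hexbytes('0x1234') == b'\x12\x34'           # 0x prefix accepted
assert hexbytes('1234') == b'\x12\x34'             # prefix optional
assert hexbytes(b'\x12\x34') == b'\x12\x34'        # bytes pass through unchanged
assert hexint('0x10') == 16 and hexint(16) == 16   # ints pass through unchanged
assert hexstr(b'\x12\x34') == '0x1234'             # assumed behavior of the bytes branch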

View File

@@ -0,0 +1,80 @@
import asyncio
import logging
from abc import abstractmethod
from asyncio import Event
from typing import TypeVar, Generic, Awaitable, Callable, Optional
from dexorder import NARG
log = logging.getLogger(__name__)
K = TypeVar('K')
V = TypeVar('V')
class _Query (Generic[V]):
def __init__ (self):
self.event = Event()
self.result: V = NARG
self.exception: Optional[Exception] = None
def __bool__(self):
return self.result is not NARG
class AsyncDict (Generic[K,V]):
"""
Implements per-key locks around accessing dictionary values.
Either supply fetch and store functions in the constructor, or override those methods in a subclass.
"""
def __init__(self,
fetch: Callable[[K,V], Awaitable[V]] = None,
store: Callable[[K,V], Awaitable[V]] = None,
):
self._queries: dict[K,_Query[V]] = {}
if fetch is not None:
self.fetch = fetch
if store is not None:
self.store = store
async def get(self, key: K, default: V = NARG) -> V:
query = self._queries.get(key)
if query is None:
return await self._query(key, self.fetch(key, default))
else:
await query.event.wait()
if query.exception is not None:
raise query.exception
return query.result
async def set(self, key: K, value: V):
query = self._queries.get(key)
if query is not None:
await query.event.wait()
await self._query(key, self.store(key, value))
# noinspection PyMethodMayBeStatic,PyUnusedLocal
@abstractmethod
async def fetch(self, key: K, default: V = NARG) -> V:
raise NotImplementedError
# noinspection PyMethodMayBeStatic,PyUnusedLocal
@abstractmethod
async def store(self, key: K, value: V) -> V:
"""
Must return the value that was just set.
"""
raise NotImplementedError
async def _query(self, key: K, coro: Awaitable[V]) -> V:
assert key not in self._queries
query = _Query()
self._queries[key] = query
try:
query.result = await coro
except Exception as e:
query.exception = e
finally:
del self._queries[key]
query.event.set()
return query.result
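A minimal usage sketch of AsyncDict, assuming a subclass that fetches block timestamps over web3 (the class name and fetch body are hypothetical); concurrent get() calls for the same key await a single in-flight query instead of issuing duplicate RPCs:
from dexorder import current_w3, NARG
class BlockTimestamps(AsyncDict[bytes, int]):
    async def fetch(self, key: bytes, default: int = NARG) -> int:
        # hypothetical backend read; only one request per key is in flight at a time
        data = await current_w3.get().eth.get_block(key)
        return data['timestamp']
    async def store(self, key: bytes, value: int) -> int:
        return value  # nothing durable to write in this sketch
# elsewhere, under an event loop:
#     ts = await BlockTimestamps().get(block_hash)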

View File

@@ -1,7 +1,6 @@
import asyncio
import inspect
from abc import ABC
from typing import Union, Callable, Awaitable, TypeVar, Generic
from typing import Union, Awaitable, TypeVar
async def async_yield():

View File

@@ -1,4 +1,3 @@
from collections import defaultdict
from decimal import Decimal
from json import JSONEncoder
from typing import Any

src/dexorder/util/lru.py Normal file
View File

@@ -0,0 +1,59 @@
from collections import OrderedDict
from typing import Optional, Callable, TypeVar, Generic, MutableMapping, Iterator
K = TypeVar('K')
V = TypeVar('V')
class LRUCache (MutableMapping[K,V], Generic[K,V]):
def __init__(self, capacity: int, factory: Optional[Callable[[K],V]]=None):
assert capacity >= 0
super().__init__()
self._d = OrderedDict()
self.capacity = capacity
self.factory = factory
def __setitem__(self, key, value):
self._d.__setitem__(key, value)
self._d.move_to_end(key)
if len(self._d) > self.capacity:
self._d.popitem(last=False)
def __delitem__(self, key: K):
del self._d[key]
def __getitem__(self, key: K) -> V:
try:
result = self._d[key]
self._d.move_to_end(key) # mark as recently used
except KeyError:
if self.factory is None:
raise
result = self.factory(key)
self._d[key] = result
return result
def get(self, key, default: Optional[V] = None) -> V:
try:
result = self._d[key]
self._d.move_to_end(key) # mark as recently used
except KeyError:
if self.factory is None:
return default
result = self.factory(key)
self._d[key] = result
return result
def __len__(self) -> int:
return len(self._d)
def __iter__(self) -> Iterator[K]:
return iter(self._d)
@property
def is_empty(self):
return len(self._d) == 0
@property
def is_full(self):
return len(self._d) == self.capacity
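A short usage sketch of LRUCache; the capacities and keys here are arbitrary illustration values:
cache = LRUCache[int, str](capacity=2)
cache[1] = 'one'
cache[2] = 'two'
_ = cache[1]        # touching key 1 marks it most recently used
cache[3] = 'three'  # over capacity: key 2, the least recently used, is evicted
assert list(cache) == [1, 3]
fcache = LRUCache[int, str](capacity=2, factory=lambda k: f'value-{k}')
assert fcache[7] == 'value-7'  # missing keys are built by the factory and cached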

View File

@@ -1,24 +0,0 @@
def round_tick(tick, tick_spacing):
"""
returns the nearest available tick
"""
return round(tick/tick_spacing) * tick_spacing
def nearest_available_ticks(tick, tick_spacing):
"""
returns the two available ticks just below and above the given tick
"""
lower = tick // tick_spacing * tick_spacing
upper = lower + tick_spacing
return lower, upper
def spans_tick(tick, lower, upper):
return spans_range( *nearest_available_ticks(tick), lower, upper)
def spans_range(below, above, lower, upper):
return lower < above and upper > below
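For reference, the behavior of the removed tick helpers on concrete numbers (a tick spacing of 10, values chosen purely for illustration):
assert round_tick(14, 10) == 10 and round_tick(16, 10) == 20
assert nearest_available_ticks(14, 10) == (10, 20)
assert spans_tick(14, 12, 18)            # the (10, 20) tick pair overlaps the range 12..18
assert not spans_range(10, 20, 20, 30)   # ranges that only touch at 20 do not overlap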

View File

@@ -13,7 +13,7 @@ log = logging.getLogger(__name__)
# values of DELETE are serialized as nulls
def pub_vault_balances(_s, k, v):
chain_id = current_chain.get().chain_id
chain_id = current_chain.get().id
try:
return f'{chain_id}|{vault_owners[k]}', 'vb', (chain_id, k, json.dumps({k2: str(v2) for k2, v2 in v.items()}))
except KeyError:

View File

@@ -7,9 +7,10 @@ from typing import Union, Callable
from dexorder import config, db, now, current_w3
from dexorder.base.chain import current_chain
from dexorder.blockstate import current_blockstate
from dexorder.blockstate.branch import Branch
from dexorder.blockstate.fork import Fork, current_fork
from dexorder.blockstate.state import FinalizedBlockState
from dexorder.database.model import Block
from dexorder.database.model.block import current_block, latest_block
from dexorder.base.block import Block, BlockInfo
from dexorder.progressor import BlockProgressor
from dexorder.util.async_util import Maywaitable
@@ -43,7 +44,7 @@ class BlockWalker (BlockProgressor):
db.connect()
w3 = current_w3.get()
chain = current_chain.get()
chain_id = chain.chain_id
chain_id = chain.id
confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1
batch_size = config.batch_size if config.batch_size is not None else chain.batch_size
current_blockstate.set(FinalizedBlockState())
@@ -66,22 +67,17 @@ class BlockWalker (BlockProgressor):
while self.running:
# noinspection PyBroadException
try:
latest_rawblock = await w3.eth.get_block('latest')
latest_height = latest_rawblock['number']
latest_block.set(Block.from_data(chain_id, latest_rawblock))
if prev_height is None or latest_height > prev_height:
prev_height = latest_height
log.debug(f'polled new block {latest_height}')
promotion_height = latest_height - confirm_offset
latest_blockdata: BlockInfo = await w3.eth.get_block('latest')
latest = Block(chain_id, latest_blockdata)
if prev_height is None or latest.height > prev_height:
prev_height = latest.height
log.debug(f'polled new block {latest.height}')
promotion_height = latest.height - confirm_offset
while (processed_height < promotion_height and
(config.walker_stop is None or processed_height < config.walker_stop)):
cur_height = min(promotion_height, processed_height+batch_size-1)
if config.walker_stop is not None:
cur_height = min(cur_height, config.walker_stop)
block_data = await w3.eth.get_block(cur_height)
block = Block.from_data(chain_id, block_data)
assert block.height == cur_height
current_block.set(block)
await self.handle(processed_height+1, cur_height, chain=chain, w3=w3)
if self.flush_delay is None or \
self.flush_type=='blocks' and last_flush + self.flush_delay <= processed_height or \
@@ -129,6 +125,9 @@ class BlockWalker (BlockProgressor):
chain = current_chain.get()
if w3 is None:
w3 = current_w3.get()
branch = Branch(to_height, from_height)
fork = Fork([branch])
current_fork.set(fork)
batches = await self.get_backfill_batches(from_height, to_height, w3=w3)
await self.invoke_callbacks(batches, chain)
log.info(f'completed through block {to_height}')

View File

@@ -1,62 +1,133 @@
from dexorder.blockstate import BlockState, BlockDict
from dexorder.database.model.block import Block
import logging
import sys
block_10 = Block(chain=1, height=10, hash=bytes.fromhex('10'), parent=bytes.fromhex('09'), data=None)
block_11a = Block(chain=1, height=11, hash=bytes.fromhex('1a'), parent=block_10.hash, data=None)
block_11b = Block(chain=1, height=11, hash=bytes.fromhex('1b'), parent=block_10.hash, data=None)
block_12a = Block(chain=1, height=12, hash=bytes.fromhex('12'), parent=block_11a.hash, data=None)
state = BlockState(block_10, {'series':{'foo':'bar'}})
BlockState.set_cur(state)
d = BlockDict('series')
from dexorder import DELETE, NARG
from dexorder.base.chain import current_chain, Mock
from dexorder.blockstate import BlockState, BlockDict, current_blockstate
from dexorder.blockstate.branch import Branch
from dexorder.blockstate.fork import current_fork, Fork
def start_block(b):
Block.set_cur(b)
state.add_block(b)
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logging.getLogger('dexorder').setLevel(logging.DEBUG)
start_block(block_11a)
del d['foo']
d['foue'] = 'barre'
current_chain.set(Mock)
start_block(block_12a)
d['foo'] = 'bar2'
b0 = bytes([0]) # genesis block hash
root_branch = Branch(0, 0, bytes(), [b0])
start_block(block_11b)
d['fu'] = 'ku'
def new_state():
state = BlockState()
state.add_branch(root_branch)
current_blockstate.set(state)
return state
def print_dict(x:dict=d):
for k, v in x.items():
print(f'{k:>10} : {v}')
s = new_state()
for block in [block_10,block_11a,block_12a,block_11b]:
Block.set_cur(block)
print()
print(Block.cur().hash)
print_dict()
series_name = 'test'
series = BlockDict(series_name)
def test11b():
Block.set_cur(block_11b)
assert 'fu' in d
assert d['fu'] == 'ku'
assert 'foo' in d
assert d['foo'] == 'bar'
def get(fork: Fork, default=NARG):
value = s.get(fork, series_name, 'foo', default)
# print(f'{fork} => {value}')
return value
def test12a():
Block.set_cur(block_12a)
assert 'fu' not in d
assert 'foo' in d
assert d['foo'] == 'bar2'
assert 'foue' in d
assert d['foue'] == 'barre'
test11b()
test12a()
state.promote_root(block_11a)
print()
print('promoted root')
print_dict(state.root_state)
test12a()
state.promote_root(block_12a)
print()
print('promoted root')
print_dict(state.root_state)
test12a()
block_data = {}
def make_block(num: int, data: dict=None):
key = bytes([num])
block_data[key] = data if data is not None else dict(foo=hex(num)[2:])
return key
# blocks are named by height plus an a/b/c fork suffix
# by default, each block sets foo=<blockname>
b1a = make_block(0x1a)
b2a = make_block(0x2a)
b3a = make_block(0x3a)
b4a = make_block(0x4a)
b5a = make_block(0x5a)
def make_branch(state: BlockState, height: int, start: int, parent: bytes, path: list[bytes]):
branch = Branch(height, start, parent, path)
fork = state.add_branch(branch)
current_fork.set(fork)
for block_id in reversed(branch.path):
for k,v in block_data[block_id].items():
series[k] = v
return fork
fork_a = make_branch(s, 5, 1, b0, [b5a, b4a, b3a, b2a, b1a])
fork_a1 = make_branch(s, 1, 1, b0, [b1a])
fork_a2 = make_branch(s, 2, 2, b1a, [b2a])
fork_a3 = make_branch(s, 3, 3, b2a, [b3a])
fork_aa = make_branch(s, 3, 1, b0, [b3a, b2a, b1a])
fork_ab = make_branch(s, 5, 4, b3a, [b5a, b4a])
# this fork has multiple branch combinations. the algo should prefer using fewer branches.
assert fork_ab.branches[1] == fork_aa.branch
assert get(fork_a) == '5a'
assert get(fork_aa) == '3a'
assert get(fork_ab) == '5a'
# now change the current value at the end of fork_a
current_fork.set(fork_a)
diff_count = len(s.diffs_by_branch[fork_a.branch_id])
series['foo'] = 'not'
assert get(fork_a) == 'not'
series['foo'] = 'bar'
assert get(fork_a) == 'bar'
# make sure it didn't create any extra diffs but performed value replacement in the DiffEntry instead
assert diff_count == len(s.diffs_by_branch[fork_a.branch_id])
# chain B does nothing until deleting foo at height 3, then it sets it back at height 5
# block 1 is taken from a-chain
b2b = make_block(0x2b, {})
b3b = make_block(0x3b, dict(foo=DELETE))
b4b = make_block(0x4b, {})
b5b = make_block(0x5b)
fork_from_a = make_branch(s, 2, 2, b1a, [b2b])
# this fork should have joined the branch from fork_a1, which connects to genesis for a total of three branches
assert len(fork_from_a.branches) == 3
assert fork_from_a.branches[1] == fork_a1.branch
# the value should have carried over from the other branch
assert get(fork_from_a) == '1a'
fork_delete = make_branch(s, 4, 3, b2b, [b4b, b3b])
missing = 'missing'
assert get(fork_delete, missing) is missing
# make sure it throws KeyError since the key is deleted
try:
found = series['foo']
assert False
except KeyError:
pass
# restore the 'foo' key with a value of '5b'
fork_restore = make_branch(s, 5, 5, b4b, [b5b])
assert get(fork_restore) == '5b'
s.promote_root(fork_aa)
# test garbage collection
diffs = s.diffs_by_series[series_name].get('foo')
assert diffs
assert diffs[-1].height == 3 # only the very latest value should be maintained
try:
s.promote_root(fork_from_a)
assert False # fork B should not be able to be promoted
except AssertionError:
pass
# chain C
b1c = make_block(0x1c)
b2c = make_block(0x2c)
b3c = make_block(0x3c)
b4c = make_block(0x4c)
b5c = make_block(0x5c)
logging.getLogger('dexorder').error('Insufficient number of test cases')