walker/finaldata fixes; mirror.py

This commit is contained in:
Tim
2024-02-25 21:45:32 -04:00
parent e39d219743
commit a68c6a3c86
16 changed files with 420 additions and 154 deletions

1
.gitignore vendored
View File

@@ -7,3 +7,4 @@ __pycache__
.idea .idea
.vscode .vscode
ohlc/ ohlc/
/metadata.json

View File

@@ -45,7 +45,7 @@ def execute(main:Coroutine, shutdown=None, parse_args=True):
print_exception(x) print_exception(x)
for t in asyncio.all_tasks(): for t in asyncio.all_tasks():
t.cancel() t.cancel()
else: # else:
loop.run_forever() # loop.run_forever()
loop.stop() loop.stop()
loop.close() loop.close()

View File

@@ -6,11 +6,13 @@ from datetime import timedelta
from web3.types import EventData from web3.types import EventData
from dexorder import from_timestamp, blockchain, config from dexorder import from_timestamp, blockchain, config
from dexorder.addrmeta import address_metadata
from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute from dexorder.bin.executable import execute
from dexorder.blocktime import get_block_timestamp from dexorder.blocktime import get_block_timestamp
from dexorder.configuration import parse_args from dexorder.configuration import parse_args
from dexorder.contract import get_contract_event from dexorder.contract import get_contract_event
from dexorder.database.model.block import current_block from dexorder.database.model.block import current_block, latest_block
from dexorder.ohlc import LightOHLCRepository from dexorder.ohlc import LightOHLCRepository
from dexorder.pools import get_uniswap_data from dexorder.pools import get_uniswap_data
from dexorder.util import hexstr from dexorder.util import hexstr
@@ -39,9 +41,16 @@ def flush_callback():
# log.info("finalizing OHLC's") # log.info("finalizing OHLC's")
# log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds') # log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds')
block = current_block.get() block = current_block.get()
confirms = (config.confirms if config.confirms is not None else current_chain.get().confirms) - 1
if latest_block.get().height - block.height <= 2*confirms:
log.info(f'forward filling to present time')
for addr, data in address_metadata.items():
if data['type'] == 'Pool':
ohlcs.light_update_all(addr, from_timestamp(block.timestamp), None)
log.info("flushing OHLC's")
ohlcs.flush()
log.info(f'backfill completed through block {block.height} ' log.info(f'backfill completed through block {block.height} '
f'{from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}') f'{from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}')
ohlcs.flush()
async def main(): async def main():
logging.basicConfig(level=logging.INFO, stream=sys.stdout) logging.basicConfig(level=logging.INFO, stream=sys.stdout)

View File

@@ -0,0 +1,29 @@
# Prints a JSON string to stdout containing metadata information for all the known tokens and pools
#
# see metadata.py
import logging
import sys
from sqlalchemy import select
from dexorder import db
from dexorder.configuration import parse_args
from dexorder.database.model import Pool, Token
from dexorder.metadata import generate_metadata
log = logging.getLogger(__name__)
def main():
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
log.setLevel(logging.DEBUG)
parse_args()
db.connect(migrate=False)
tokens = db.session.scalars(select(Token))
pools = db.session.scalars(select(Pool))
generate_metadata(tokens, pools)
if __name__ == '__main__':
main()

View File

@@ -1,99 +0,0 @@
# Prints a JSON string to stdout containing metadata information for all the known tokens and pools
#
# {
# "t": [
# {
# "a": "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1", // address
# "n": "Wrapped Ether", // name
# "s": "WETH", // symbol
# "d": 18 // decimals
# }
# ],
# "p": [
# {
# "a": "0x17c14D2c404D167802b16C450d3c99F88F2c4F4d", // address
# "b": "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1", // base token (token0)
# "q": "0xFF970A61A04b1cA14834A43f5dE4533eBDDB5CC8", // quote token (token1)
# "f": 3000, // fee in millionths
# "e": 1, // exchange (see Exchange enum) 1=UniswapV3
# "d": 12 // decimals, may be negative
# }
# ]
# }
#
import json
import logging
import sys
from sqlalchemy import select
from dexorder import db
from dexorder.configuration import parse_args
from dexorder.database.model import Pool, Token
log = logging.getLogger(__name__)
token_map: dict[str,Token] = {}
def dump(*args):
print(*args, end='')
def json_dump(**kwargs):
dump(json.dumps(dict(**kwargs),separators=(',', ':')))
def dump_tokens():
first = True
for token in db.session.scalars(select(Token)):
token: Token
token_map[token.address] = token
if first:
first = False
else:
dump(',')
json_dump(
a=token.address,
n=token.name,
s=token.symbol,
d=token.decimals,
)
def dump_pools():
first = True
for pool in db.session.scalars(select(Pool)):
pool: Pool
if first:
first = False
else:
dump(',')
json_dump(
a=pool.address,
b=pool.base,
q=pool.quote,
f=pool.fee,
e=pool.exchange.value,
d=pool.decimals,
)
def generate_metadata():
dump('{"t":[')
dump_tokens()
dump('],"p":[')
dump_pools()
dump(']}')
def main():
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
log.setLevel(logging.DEBUG)
parse_args()
db.connect(migrate=False)
generate_metadata()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,87 @@
import asyncio
import logging
import os
import sys
from dexorder import config, blockchain
from dexorder.bin.executable import execute
from dexorder.blockstate import current_blockstate
from dexorder.blockstate.state import FinalizedBlockState
from dexorder.contract import get_deployment_address, ContractProxy
from dexorder.metadata import generate_metadata
from dexorder.pools import get_pool
from dexorder.tokens import get_token
log = logging.getLogger('dexorder')
async def write_metadata( pools, mirror_pools ):
filename = config.mirror_metadata
if filename is None:
return
pool_dicts = [get_pool(addr) for (addr,_inverted) in mirror_pools]
pool_dicts = await asyncio.gather(*pool_dicts)
for data, addr, (_,inverted) in zip(pool_dicts, pools, mirror_pools):
data['x'] = dict(data=dict(uri=f'https://alpha.dexorder.trade/42161/{addr}/', inverted=inverted))
tokens = set(p['base'] for p in pool_dicts)
tokens.update(p['quote'] for p in pool_dicts)
tokens = await asyncio.gather(*[get_token(t) for t in tokens])
with open(filename, 'w') as f:
generate_metadata(tokens, pool_dicts, f)
log.info(f'wrote {filename}')
async def await_mirror(tx, pool_addr, mirror_addr, mirror_inverted ):
await tx.wait()
log.info(f'Updated {pool_addr} => {"1/" if mirror_inverted else ""}{mirror_addr}')
async def main():
pools = (config.mirror_pools or [])
if not pools:
log.error('must configure mirror_pools')
return
if config.account is None:
# Dev Account #5
config.account = '0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba'
await blockchain.connect()
current_blockstate.set(FinalizedBlockState())
mirror_addr = config.mirror_env
if mirror_addr is None:
mirror_addr = os.environ.get('MIRRORENV')
if mirror_addr is None:
mirror_addr = get_deployment_address('DeployMirror', 'MirrorEnv')
if mirror_addr is None:
log.error('must configure mirror_env or set envioronment MIRRORENV')
return
log.info(f'Initializing with MirrorEnv {mirror_addr}')
mirrorenv = ContractProxy(mirror_addr, 'MirrorEnv')
proms = []
mirror_pools = []
for pool in pools:
log.debug(f'Mirroring pool {pool}')
proms.append(await mirrorenv.transact.mirrorPool(pool))
await asyncio.gather(*[p.wait() for p in proms])
for pool in pools:
mirror_addr, mirror_inverted = await mirrorenv.pools(pool)
log.debug(f'\tmirror result {mirror_addr} {mirror_inverted}')
mirror_pools.append((mirror_addr, mirror_inverted))
await write_metadata(pools, mirror_pools)
log.info('Mirroring pools')
while True:
proms = []
for pool_addr, (mirror_addr, mirror_inverted) in zip(pools, mirror_pools):
try:
log.info(f'mirroring {pool_addr}')
tx = await mirrorenv.transact.updatePool(pool_addr)
proms.append((tx, pool_addr, mirror_addr, mirror_inverted))
except Exception as x:
log.exception(x)
await asyncio.gather(*[await_mirror(*args) for args in proms])
await asyncio.sleep(config.polling if config.polling > 0 else 1)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
log.setLevel(logging.DEBUG)
execute(main())

View File

@@ -8,30 +8,29 @@ from aiohttp import ClientResponseError
from eth_typing import URI from eth_typing import URI
from hexbytes import HexBytes from hexbytes import HexBytes
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
from web3.middleware.signing import async_construct_sign_and_send_raw_middleware
from web3.types import RPCEndpoint, RPCResponse from web3.types import RPCEndpoint, RPCResponse
from .. import current_w3, Blockchain, config from .. import current_w3, Blockchain, config, Account, NARG
from ..base.chain import current_chain from ..base.chain import current_chain
from ..configuration import resolve_rpc_url from ..configuration import resolve_rpc_url
from ..configuration.resolve import resolve_ws_url from ..configuration.resolve import resolve_ws_url
from ..contract import get_contract_data from ..contract import get_contract_data
async def connect(rpc_url=None): async def connect(rpc_url=None, account=None, autosign=False):
""" """
connects to the rpc_url and configures the context connects to the rpc_url and configures context vars
if you don't want to use ctx.account for this w3, either set ctx.account first or
use create_w3() and set w3.eth.default_account separately
""" """
w3 = create_w3(rpc_url) w3 = await create_w3(rpc_url, account, autosign)
current_w3.set(w3) current_w3.set(w3)
current_chain.set(Blockchain.get(await w3.eth.chain_id)) current_chain.set(Blockchain.get(await w3.eth.chain_id))
return w3 return w3
def create_w3(rpc_url=None): async def create_w3(rpc_url=None, account=NARG, autosign=False):
# todo create a proxy w3 that rotates among rpc urls # todo create a proxy w3 that rotates among rpc urls
# self.w3s = tuple(create_w3(url) for url in rpc_url_or_tag) # self.w3s = tuple(await create_w3(url) for url in rpc_url_or_tag)
# chain_id = self.w3s[0].eth.chain_id # chain_id = self.w3s[0].eth.chain_id
# assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain # assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain
# self.w3iter = itertools.cycle(self.w3s) # self.w3iter = itertools.cycle(self.w3s)
@@ -43,6 +42,12 @@ def create_w3(rpc_url=None):
w3.middleware_onion.remove('attrdict') w3.middleware_onion.remove('attrdict')
w3.middleware_onion.add(clean_input_async, 'clean_input') w3.middleware_onion.add(clean_input_async, 'clean_input')
w3.eth.Contract = _make_contract(w3.eth) w3.eth.Contract = _make_contract(w3.eth)
if autosign:
a = Account.get(account)
if a is not None:
# noinspection PyTypeChecker
w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(a))
w3.eth.default_account = a.address
return w3 return w3
@@ -52,7 +57,7 @@ async def create_w3_ws(ws_url=None) -> AsyncWeb3:
this does *not* attach any signer to the w3. make sure to inject the proper middleware with Account.attach(w3) this does *not* attach any signer to the w3. make sure to inject the proper middleware with Account.attach(w3)
""" """
# todo create a proxy w3 that rotates among rpc urls # todo create a proxy w3 that rotates among rpc urls
# self.w3s = tuple(create_w3(url) for url in rpc_url_or_tag) # self.w3s = tuple(await create_w3(url) for url in rpc_url_or_tag)
# chain_id = self.w3s[0].eth.chain_id # chain_id = self.w3s[0].eth.chain_id
# assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain # assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain
# self.w3iter = itertools.cycle(self.w3s) # self.w3iter = itertools.cycle(self.w3s)

View File

@@ -30,3 +30,7 @@ class Config:
min_gas: str = '0' min_gas: str = '0'
walker_flush_interval: float = 300 walker_flush_interval: float = 300
mirror_env: Optional[str] = None
mirror_pools: Optional[list[str]] = field(default_factory=list)
mirror_metadata: str = field(default='metadata.json')

View File

@@ -4,6 +4,7 @@ import os
from .abi import abis from .abi import abis
from .contract_proxy import ContractProxy from .contract_proxy import ContractProxy
from .. import current_w3 from .. import current_w3
from ..base.chain import current_chain
def get_contract_data(name): def get_contract_data(name):
@@ -14,6 +15,17 @@ def get_contract_data(name):
return json.load(file) return json.load(file)
def get_deployment_address(deployment_name, contract_name, *, chain_id=None):
if chain_id is None:
chain_id = current_chain.get().chain_id
with open(f'../contract/broadcast/{deployment_name}.sol/{chain_id}/run-latest.json', 'rt') as file:
data = json.load(file)
for tx in data.get('transactions',[]):
if tx.get('contractName') == contract_name:
return tx['contractAddress']
return None
def get_contract_event(contract_name:str, event_name:str): def get_contract_event(contract_name:str, event_name:str):
return getattr(current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)() return getattr(current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)()

View File

@@ -10,15 +10,52 @@ from dexorder.database.model.block import current_block
from dexorder.util import hexstr from dexorder.util import hexstr
class ContractTransaction:
def __init__(self, id_bytes: bytes, rawtx: Optional[bytes] = None):
self.id_bytes = id_bytes
self.id = hexstr(self.id_bytes)
self.data = rawtx
self.receipt: Optional[TxReceipt] = None
def __repr__(self):
# todo this is from an old status system
receipt_status = 'IN_FLIGHT' if self.receipt is None else 'REVERTED' if self.receipt.status == 0 else self.receipt.blockNumber
return f'Transaction({self.id},{receipt_status})'
async def wait(self) -> TxReceipt:
if self.receipt is None:
self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id)
return self.receipt
class DeployTransaction (ContractTransaction):
def __init__(self, contract: 'ContractProxy', id_bytes: bytes):
super().__init__(id_bytes)
self.contract = contract
async def wait(self) -> TxReceipt:
receipt = await super().wait()
self.contract.address = receipt['contractAddress']
# noinspection PyProtectedMember
self.contract._contracts.clear()
return receipt
def call_wrapper(func): def call_wrapper(func):
async def f(*args, **kwargs): async def f(*args, **kwargs):
return await func(*args, **kwargs).call(block_identifier=hexstr(current_block.get().hash)) try:
blockhash = hexstr(current_block.get().hash)
except LookupError:
blockhash = 'latest'
return await func(*args, **kwargs).call(block_identifier=blockhash)
return f return f
def transact_wrapper(func): def transact_wrapper(func):
async def f(*args, **kwargs): async def f(*args, **kwargs):
return await func(*args, **kwargs).transact() tx_id = await func(*args, **kwargs).transact()
return ContractTransaction(tx_id)
return f return f
@@ -69,15 +106,12 @@ class ContractProxy:
def events(self): def events(self):
return self.contract.events return self.contract.events
# def deploy(self, *args): async def deploy(self, *args) -> DeployTransaction:
# """ """
# Calls the contract constructor transaction and waits to receive a transaction receipt. Calls the contract constructor transaction and waits to receive a transaction receipt.
# """ """
# tx: ContractTransaction = self.transact.constructor(*args) tx: ContractTransaction = await self.transact.constructor(*args)
# receipt = tx.wait() return DeployTransaction(self, tx.id_bytes)
# self.address = receipt.contractAddress
# self._contracts.clear()
# return receipt
@property @property
def transact(self): def transact(self):
@@ -93,16 +127,3 @@ class ContractProxy:
def __repr__(self): def __repr__(self):
addr = self.contract.address addr = self.contract.address
return f'{self._interface_name}({addr or ""})' return f'{self._interface_name}({addr or ""})'
class ContractTransaction:
def __init__(self, id_bytes: bytes, rawtx: bytes):
self.id_bytes = id_bytes
self.id = hexstr(self.id_bytes)
self.data = rawtx
self.receipt: Optional[TxReceipt] = None
def __repr__(self):
# todo this is from an old status system
receipt_status = 'IN_FLIGHT' if self.receipt is None else 'REVERTED' if self.receipt.status == 0 else self.receipt.blockNumber
return f'Transaction({self.id},{receipt_status})'

View File

@@ -4,10 +4,16 @@ from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
from dexorder.database.model import Base from dexorder.database.model import Base
from dexorder.util import hexint, Field from dexorder.util import hexint, Field, hexstr
class Block(Base): class Block(Base):
@staticmethod
def from_data(chain_id:int, data:dict):
return Block(chain=chain_id, height=data['number'], hash=hexstr(data['hash']),
parent=hexstr(data['parentHash']), data=data)
chain: Mapped[int] = mapped_column(primary_key=True) chain: Mapped[int] = mapped_column(primary_key=True)
height: Mapped[int] = mapped_column(primary_key=True) height: Mapped[int] = mapped_column(primary_key=True)
hash: Mapped[bytes] = mapped_column(primary_key=True) hash: Mapped[bytes] = mapped_column(primary_key=True)

105
src/dexorder/metadata.py Normal file
View File

@@ -0,0 +1,105 @@
# Outputs JSON describing tokens and pools, for static delivery to browsers
#
# {
# "t": [ // TOKENS
# {
# "a": "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1", // address
# "n": "Wrapped Ether", // name
# "s": "WETH", // symbol
# "d": 18 // decimals
# }
# ],
# "p": [ // POOLS
# {
# "a": "0x17c14D2c404D167802b16C450d3c99F88F2c4F4d", // address
# "b": "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1", // base token (token0)
# "q": "0xFF970A61A04b1cA14834A43f5dE4533eBDDB5CC8", // quote token (token1)
# "f": 3000, // fee in millionths (hundredths of a basis point, %%%)
# "e": 1, // exchange (see Exchange enum) 1=UniswapV3
# "d": 12 // decimals, may be negative
# }
# ]
# }
#
import sys
from typing import Union, Iterable
from dexorder.database.model import Token, Pool
from dexorder.database.model.pool import PoolDict
from dexorder.database.model.token import TokenDict
from dexorder.util import json
token_map: dict[str, Token] = {}
def dump(file, *args):
print(*args, end='', file=file)
def json_dump(out, **kwargs):
dump(out, json.dumps(dict(**kwargs)))
def dump_tokens(out, tokens):
first = True
for token in tokens:
token: Token
if first:
first = False
else:
dump(out, ',')
if isinstance(token, Token):
token: Token
a = token.address,
n = token.name,
s = token.symbol,
d = token.decimals,
else:
token: TokenDict
a = token['address']
n = token['name']
s = token['symbol']
d = token['decimals']
token_map[a] = token
json_dump(out, a=a, n=n, s=s, d=d)
def dump_pools(out, pools):
first = True
for pool in pools:
if first:
first = False
else:
dump(out, ',')
if isinstance(pool, Pool):
a = pool.address
b = pool.base
q = pool.quote
f = pool.fee
e = pool.exchange.value
d = pool.decimals
x = None
else:
pool: PoolDict
a = pool['address']
b = pool['base']
q = pool['quote']
f = pool['fee']
e = pool['exchange']
d = pool['decimals']
# noinspection PyTypedDict
x = None if 'x' not in pool else pool['x']
data = dict(a=a, b=b, q=q, f=f, e=e, d=d)
if x is not None:
data['x'] = x
json_dump(out,**data)
def generate_metadata(tokens: Iterable[Union[Token, TokenDict]], pools: Iterable[Union[Pool, PoolDict]],
file=sys.stdout):
dump(file, '{"t":[')
dump_tokens(file, tokens)
dump(file, '],"p":[')
dump_pools(file, pools)
dump(file, ']}')

View File

@@ -1,6 +1,7 @@
import logging import logging
import os import os
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from decimal import InvalidOperation
from typing import Optional, NamedTuple, Reversible, Union from typing import Optional, NamedTuple, Reversible, Union
from cachetools import LFUCache from cachetools import LFUCache
@@ -30,7 +31,19 @@ OHLC = list[Union[None,int,str]] # typedef
def opt_dec(v): def opt_dec(v):
return None if v is None else dec(v) try:
return None if v is None else dec(v)
except InvalidOperation as x:
log.error(f'Could not convert to decimal "{repr(v)}"')
raise x
def try_dec(v):
try:
return dec(v)
except InvalidOperation as x:
log.error(f'Could not convert to decimal "{repr(v)}"')
raise x
def dt(v): def dt(v):
return v if isinstance(v, datetime) else from_timestamp(v) return v if isinstance(v, datetime) else from_timestamp(v)
@@ -38,11 +51,12 @@ def dt(v):
class NativeOHLC: class NativeOHLC:
@staticmethod @staticmethod
def from_ohlc(ohlc: OHLC) -> 'NativeOHLC': def from_ohlc(ohlc: OHLC) -> 'NativeOHLC':
return NativeOHLC(*[cast(value) for value, cast in zip(ohlc,(dt, opt_dec, opt_dec, opt_dec, dec))], ohlc=ohlc) return NativeOHLC(*[cast(value) for value, cast in zip(ohlc,(dt, opt_dec, opt_dec, opt_dec, try_dec))], ohlc=ohlc)
# noinspection PyShadowingBuiltins # noinspection PyShadowingBuiltins
def __init__(self, start: datetime, open: Optional[dec], high: Optional[dec], low: Optional[dec], close: dec, def __init__(self, start: datetime, open: Optional[dec], high: Optional[dec], low: Optional[dec], close: dec,
*, ohlc: list=None): *, ohlc: list=None):
assert close is not None
self.start = start self.start = start
self.open = open self.open = open
self.high = high self.high = high
@@ -52,6 +66,16 @@ class NativeOHLC:
self._json: str = None if ohlc is None else json.dumps(ohlc) self._json: str = None if ohlc is None else json.dumps(ohlc)
def copy_from(self, other):
assert self.start == other.start
self.open = other.open
self.high = other.high
self.low = other.low
self.close = other.close
self._ohlc = None
self._json = None
def update(self, price): def update(self, price):
pstr = str(price) pstr = str(price)
if self.open is None: if self.open is None:
@@ -110,9 +134,8 @@ def period_from_name(name: str) -> timedelta:
def ohlc_start_time(time, period: timedelta): def ohlc_start_time(time, period: timedelta):
""" returns the start time of the ohlc containing time, such that start_time <= time < start_time + period """ """ returns the start time of the ohlc containing time, such that start_time <= time < start_time + period """
period_sec = int(period.total_seconds()) period_count = (time - OHLC_DATE_ROOT) // period
period_count = (time - OHLC_DATE_ROOT).total_seconds() // period_sec return OHLC_DATE_ROOT + period * period_count
return OHLC_DATE_ROOT + timedelta(seconds=period_sec * period_count)
def update_ohlc(prev: NativeOHLC, period: timedelta, time: datetime, price: Optional[dec]) -> list[NativeOHLC]: def update_ohlc(prev: NativeOHLC, period: timedelta, time: datetime, price: Optional[dec]) -> list[NativeOHLC]:
@@ -124,7 +147,7 @@ def update_ohlc(prev: NativeOHLC, period: timedelta, time: datetime, price: Opti
cur = prev cur = prev
if time < cur.start: if time < cur.start:
# data corruption. just shut down # data corruption. just shut down
fatal(f'update_ohlc({prev}, {period}, {time}, {price}) failed because time is before the start of the candle') fatal(f'update_ohlc({prev.ohlc}, {period}, {time}, {price}) failed because time is before the start of the candle')
result = [] result = []
# advance time and finalize any past OHLC's into the result array # advance time and finalize any past OHLC's into the result array
while True: while True:
@@ -147,6 +170,12 @@ class OHLCKey (NamedTuple):
period: timedelta period: timedelta
def quotes_path(chain_id: int = None):
if chain_id is None:
chain_id = current_chain.get().chain_id
return f'{chain_id}/quotes.json'
def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str: def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str:
if chain_id is None: if chain_id is None:
chain_id = current_chain.get().chain_id chain_id = current_chain.get().chain_id
@@ -200,6 +229,7 @@ class Chunk:
index = (native.start - start) // self.period index = (native.start - start) // self.period
# log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}') # log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}')
if index > len(self.bars): if index > len(self.bars):
# the incoming bar is gapped into the future. fill in the gap with null data bars.
if not backfill: if not backfill:
log.error(f'chunk gap: {index} > {len(self.bars)} {self.symbol} {period_name(self.period)} {native}' + ''.join( log.error(f'chunk gap: {index} > {len(self.bars)} {self.symbol} {period_name(self.period)} {native}' + ''.join(
f'\n\t{c}' for c in self.bars)) f'\n\t{c}' for c in self.bars))
@@ -216,8 +246,8 @@ class Chunk:
assert self.bars[-1].start + self.period == native.start assert self.bars[-1].start + self.period == native.start
self.bars.append(native) self.bars.append(native)
else: else:
assert self.bars[index].start == native.start # index is within bounds
self.bars[index] = native self.bars[index].copy_from(native) # we do a copy not a value replacement because Natives get cached by the repo
def save(self): def save(self):
@@ -245,6 +275,7 @@ class OHLCRepository:
self._dir = base_dir self._dir = base_dir
self.cache = LFUCache(len(OHLC_PERIODS) * 1024) self.cache = LFUCache(len(OHLC_PERIODS) * 1024)
self.dirty_chunks = set() self.dirty_chunks = set()
self._quotes:Optional[dict[str,tuple[int,str]]] = None # todo tim
@property @property
def dir(self): def dir(self):
@@ -255,6 +286,17 @@ class OHLCRepository:
return self._dir return self._dir
@property
def quotes(self) -> dict[str,tuple[int,str]]:
if self._quotes is None:
try:
with open(os.path.join(self.dir, quotes_path()), 'r') as f:
self._quotes = json.load(f)
except FileNotFoundError:
self._quotes = {}
return self._quotes
@staticmethod @staticmethod
def add_symbol(symbol: str, period: timedelta = None): def add_symbol(symbol: str, period: timedelta = None):
if period is not None: if period is not None:
@@ -270,14 +312,15 @@ class OHLCRepository:
for period in OHLC_PERIODS: for period in OHLC_PERIODS:
self.update(symbol, period, time, price, create=create) self.update(symbol, period, time, price, create=create)
@staticmethod def update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) \
def update(symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) \
-> Optional[list[NativeOHLC]]: -> Optional[list[NativeOHLC]]:
""" """
if price is None, then bars are advanced based on the time but no new price is added to the series. if price is None, then bars are advanced based on the time but no new price is added to the series.
""" """
logname = f'{symbol} {period_name(period)}' logname = f'{symbol} {period_name(period)}'
# log.debug(f'Updating OHLC {logname} {minutely(time)} {price}') # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
if price is not None:
self.quotes[symbol] = timestamp(time), str(price)
key = symbol, period key = symbol, period
# recent_ohlcs holds a list of "recent" NativeOHLC's stored as blockdata. we try to keep the recent array long # recent_ohlcs holds a list of "recent" NativeOHLC's stored as blockdata. we try to keep the recent array long
# enough to extend prior the root block time # enough to extend prior the root block time
@@ -344,6 +387,8 @@ class OHLCRepository:
for chunk in self.dirty_chunks: for chunk in self.dirty_chunks:
chunk.save() chunk.save()
self.dirty_chunks.clear() self.dirty_chunks.clear()
with open(os.path.join(self.dir, quotes_path()), 'w') as f:
json.dump(self.quotes, f)
class LightOHLCRepository (OHLCRepository): class LightOHLCRepository (OHLCRepository):
@@ -355,20 +400,32 @@ class LightOHLCRepository (OHLCRepository):
self.current:dict[tuple[str,timedelta],NativeOHLC] = {} # current bars keyed by symbol self.current:dict[tuple[str,timedelta],NativeOHLC] = {} # current bars keyed by symbol
self.dirty_bars = set() self.dirty_bars = set()
def light_update_all(self, symbol: str, time: datetime, price: dec): def light_update_all(self, symbol: str, time: datetime, price: Optional[dec]):
for period in OHLC_PERIODS: for period in OHLC_PERIODS:
self.light_update(symbol, period, time, price) self.light_update(symbol, period, time, price)
def light_update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, def light_update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None,
*, backfill=True): *, backfill=True):
if price is not None:
self.quotes[symbol] = timestamp(time), str(price)
start = ohlc_start_time(time, period) start = ohlc_start_time(time, period)
chunk = self.get_chunk(symbol, period, start) chunk = self.get_chunk(symbol, period, start)
key = symbol, period key = symbol, period
prev = self.current.get(key) prev = self.current.get(key)
if prev is None: if prev is None:
# cache miss. load from chunk.
prev = self.current[key] = chunk.bar_at(start) prev = self.current[key] = chunk.bar_at(start)
if prev is None: if prev is None:
bar = NativeOHLC(start, price, price, price, price) # not in cache or chunk. create new bar.
if price is not None:
close = price
else:
try:
close = dec(self.quotes[symbol][1])
except KeyError:
log.warning(f'light_update tried to advance time on {symbol} which has no previous price.')
return # no previous quote, no current price either
bar = NativeOHLC(start, price, price, price, close)
chunk.update(bar, backfill=backfill) chunk.update(bar, backfill=backfill)
self.dirty_chunks.add(chunk) self.dirty_chunks.add(chunk)
else: else:

View File

@@ -135,7 +135,7 @@ class BlockStateRunner(BlockProgressor):
So we implement polling as a workaround. So we implement polling as a workaround.
""" """
w3 = create_w3() w3 = await create_w3()
chain_id = await w3.eth.chain_id chain_id = await w3.eth.chain_id
chain = Blockchain.for_id(chain_id) chain = Blockchain.for_id(chain_id)
current_chain.set(chain) current_chain.set(chain)

File diff suppressed because one or more lines are too long

View File

@@ -12,7 +12,7 @@ from dexorder.blockchain.connection import create_w3
from dexorder.blockstate import current_blockstate from dexorder.blockstate import current_blockstate
from dexorder.blockstate.state import FinalizedBlockState from dexorder.blockstate.state import FinalizedBlockState
from dexorder.database.model import Block from dexorder.database.model import Block
from dexorder.database.model.block import current_block from dexorder.database.model.block import current_block, latest_block
from dexorder.progressor import BlockProgressor from dexorder.progressor import BlockProgressor
from dexorder.util.async_util import Maywaitable from dexorder.util.async_util import Maywaitable
@@ -62,11 +62,12 @@ class BlockWalker (BlockProgressor):
try: try:
latest_rawblock = await w3.eth.get_block('latest') latest_rawblock = await w3.eth.get_block('latest')
latest_height = latest_rawblock['number'] latest_height = latest_rawblock['number']
latest_block.set(Block.from_data(chain_id, latest_rawblock))
if prev_height is None or latest_height > prev_height: if prev_height is None or latest_height > prev_height:
prev_height = latest_height prev_height = latest_height
log.debug(f'polled new block {latest_height}') log.debug(f'polled new block {latest_height}')
promotion_height = latest_height - confirm_offset promotion_height = latest_height - confirm_offset
while processed_height <= promotion_height: while processed_height < promotion_height:
cur_height = min(promotion_height, processed_height+batch_size-1) cur_height = min(promotion_height, processed_height+batch_size-1)
block_data = await w3.eth.get_block(cur_height) block_data = await w3.eth.get_block(cur_height)
height = block_data['number'] height = block_data['number']
@@ -90,9 +91,14 @@ class BlockWalker (BlockProgressor):
db.session.commit() db.session.commit()
db.session.begin() db.session.begin()
processed_height = cur_height processed_height = cur_height
# if self.latest_callback:
# latest_block = Block(chain=chain.chain_id, height=latest_height,
# hash=latest_rawblock['hash'], parent=latest_rawblock['parentHash'],
# data=latest_rawblock)
# self.latest_callback(latest_block)
if not self.running: if not self.running:
break break
await asyncio.sleep(config.polling) await asyncio.sleep(config.polling or 1)
except Exception: except Exception:
log.exception('Exception in walker loop') log.exception('Exception in walker loop')
finally: finally: