Compare commits

..

10 Commits

Author SHA1 Message Date
tim
48fdfeeb3f redis init bugfix 2024-11-01 13:32:58 -04:00
tim
bc6a196bfa redis init bugfix 2024-11-01 13:16:45 -04:00
tim
3b2c58671b beta announcement 2024-10-30 13:55:25 -04:00
tim
b133999314 never approve new Tokens 2024-10-29 20:38:14 -04:00
tim
39be05adaa trigger bugfixes 2024-10-29 00:34:41 -04:00
tim
51852c1250 SwapOrder.inverted 2024-10-28 13:23:06 -04:00
tim
40d8d44676 increased concurrent_rpc_connections to 100 in prod 2024-10-23 17:28:24 -04:00
tim
2a95dd26df rpc connection timeouts 2024-10-23 16:45:43 -04:00
tim
6844f73e4b log tweaks 2024-10-23 16:04:28 -04:00
tim
829ec58f8f log tweaks 2024-10-23 13:37:11 -04:00
11 changed files with 156 additions and 79 deletions

View File

@@ -2,4 +2,4 @@ metadata = '' # this setting approves no tokens
account = '${accounts.gas}'
rpc_url = '${rpc_urls.arbitrum_alchemy}'
ws_url = '${rpc_urls.arbitrum_alchemy_ws}'
concurrent_rpc_connections=16
concurrent_rpc_connections=100

View File

@@ -1,7 +1,7 @@
sqlalchemy
alembic
omegaconf
web3<7
web3
psycopg2-binary
orjson
sortedcontainers
@@ -16,5 +16,15 @@ eth-bloom
python-dateutil
eth_abi
eth_utils
eth_typing
eth-keys
eth-account
eth-utils
eth-typing
pdpyras # pagerduty
numpy
bitarray
typing_extensions
requests
aiohttp
charset-normalizer

View File

@@ -84,6 +84,7 @@ class SwapOrder:
minFillAmount: int
amountIsInput: bool
outputDirectlyToOwner: bool
inverted: bool
conditionalOrder: int
tranches: list['Tranche']
@@ -93,17 +94,19 @@ class SwapOrder:
@staticmethod
def load(obj):
return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), int(obj[4]), obj[5], obj[6], obj[7],
[Tranche.load(t) for t in obj[8]])
return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), int(obj[4]), obj[5], obj[6], obj[7], obj[8],
[Tranche.load(t) for t in obj[9]])
@staticmethod
def load_from_chain(obj):
return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), int(obj[4]), obj[5], obj[6], obj[7],
[Tranche.load_from_chain(t) for t in obj[8]])
return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), int(obj[4]), obj[5], obj[6], obj[7], obj[8],
[Tranche.load_from_chain(t) for t in obj[9]])
def dump(self):
return (self.tokenIn, self.tokenOut, self.route.dump(), str(self.amount), str(self.minFillAmount), self.amountIsInput,
self.outputDirectlyToOwner, self.conditionalOrder, [t.dump() for t in self.tranches])
return (self.tokenIn, self.tokenOut, self.route.dump(),
str(self.amount), str(self.minFillAmount), self.amountIsInput,
self.outputDirectlyToOwner, self.inverted, self.conditionalOrder,
[t.dump() for t in self.tranches])
def __str__(self):
msg = f'''
@@ -113,6 +116,7 @@ SwapOrder
exchange: {self.route.exchange.name, self.route.fee}
amount: {"input" if self.amountIsInput else "output"} {self.amount}{" to owner" if self.outputDirectlyToOwner else ""}
minFill: {self.minFillAmount}
inverted: {self.inverted}
tranches:
'''
for tranche in self.tranches:

View File

@@ -88,16 +88,21 @@ async def main():
db_state = DbState(BlockData.by_opt('db'))
with db.transaction():
state = await db_state.load()
if state is None:
log.info('no state in database')
if redis_state:
await redis_state.clear()
else:
current_blockstate.set(state)
current_fork.set(state.root_fork)
global activate_orders_needed
activate_orders_needed = True
log.info(f'loaded state from db for root block {state.root_branch.height}')
if state is None:
log.info('no state in database')
if redis_state:
await redis_state.clear()
else:
current_blockstate.set(state)
current_fork.set(state.root_fork)
global activate_orders_needed
activate_orders_needed = True
log.info(f'loaded state from db for root block {state.root_branch.height}')
if redis_state:
# load initial state
log.info('initializing redis with root state')
await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id])
runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None)
setup_logevent_triggers(runner)
# OHLC printing hard-disabled for main. Use the finaldata process.

View File

@@ -3,8 +3,11 @@ import logging
from random import random
from typing import Any, Optional, Union
import requests
import requests.adapters
# noinspection PyPackageRequirements
from aiohttp import ClientResponseError
from aiohttp import ClientResponseError, ClientSession, BaseConnector, ClientTimeout, TCPConnector
from eth_typing import URI
from hexbytes import HexBytes
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
@@ -35,7 +38,11 @@ async def create_w3(rpc_url=None, account=NARG, autosign=True, name='default'):
# assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain
# self.w3iter = itertools.cycle(self.w3s)
url = resolve_rpc_url(rpc_url)
w3 = AsyncWeb3(RetryHTTPProvider(url))
connector = TCPConnector(limit=config.concurrent_rpc_connections)
session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout))
http_provider = RetryHTTPProvider(url)
await http_provider.cache_async_session(session)
w3 = AsyncWeb3(http_provider)
# w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed?
# w3.middleware_onion.add(simple_cache_middleware)
# log.debug(f'middleware {list(w3.middleware_onion.middlewares)}')
@@ -129,8 +136,9 @@ class RetryHTTPProvider (AsyncHTTPProvider):
self.rate_allowed.set()
async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
MAX_TRIES = 10
wait = 0
while True:
for _ in range(MAX_TRIES):
try:
async with self.in_flight:
await self.rate_allowed.wait()
@@ -152,3 +160,4 @@ class RetryHTTPProvider (AsyncHTTPProvider):
self.rate_allowed.set()
# finally:
# log.debug(f'Ended request of RPC call {method}')
raise IOError(f'Could not query rpc server after {MAX_TRIES} tries: {method} {params}')

View File

@@ -91,22 +91,25 @@ class DbState(SeriesCollection):
root_fork = state.init_root_block(root_block)
for series, data in self.datas.items():
if data.opts.get('db') != 'lazy':
log.debug(f'loading series {series}')
t = data.type
count = 0
if t == DataType.SET:
# noinspection PyTypeChecker
var: BlockSet = BlockData.registry[series]
for row in db.session.query(SeriesSet).where(SeriesSet.chain == chain_id, SeriesSet.series == data.series2str(series)):
key = data.str2key(row.key)
log.debug(f'load {series} {key}')
# log.debug(f'load {series} {key}')
state.set(root_fork, var.series, key, None, overwrite=False)
count += 1
elif t == DataType.DICT:
# noinspection PyTypeChecker
var: BlockDict = BlockData.registry[series]
for row in db.session.query(SeriesDict).where(SeriesDict.chain == chain_id, SeriesDict.series == data.series2str(series)):
key = data.str2key(row.key)
value = data.str2value(row.value)
log.debug(f'load {series} {key} {value}')
# log.debug(f'load {series} {key} {value}')
state.set(root_fork, var.series, key, value, overwrite=True)
count += 1
log.debug(f'loaded {count} rows from db series {series}')
log.debug(f'loaded db state from block {root_block}')
return state

View File

@@ -25,6 +25,7 @@ class Config:
concurrent_rpc_connections: int = 4
parallel_logevent_queries: bool = True
rpc_timeout: float = 3
polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead
backfill: int = 0 # if not 0, then runner will initialize an empty database by backfilling from the given block height. Use negative numbers to indicate a number of blocks before the present.

View File

@@ -52,7 +52,7 @@ async def handle_order_placed(event: EventData):
log.debug(f'raw order status {obj}')
order = Order.create(addr, index, event['transactionHash'], obj)
await activate_order(order)
log.debug(f'new order{order}')
log.debug(f'new order {order.key}{order}')
async def handle_swap_filled(event: EventData):

View File

@@ -154,6 +154,8 @@ def execute_tranches():
for tk, proof in active_tranches.items():
if tk not in inflight_execution_requests:
new_execution_requests.append((tk, proof))
else:
log.debug(f'execute {tk} already in flight')
# todo order requests and batch
for tk, proof in new_execution_requests:
create_execution_request(tk, proof)

View File

@@ -81,8 +81,8 @@ class OrderTriggers:
def fill(self, tx: str, time: int, tranche_index, amount_in, amount_out, fee, next_activation_time):
self.order.add_fill(tx, time, tranche_index, amount_in, amount_out, fee, next_activation_time)
if self.triggers[tranche_index].fill(amount_in, amount_out, next_activation_time):
self.check_complete()
self.triggers[tranche_index].fill(amount_in, amount_out, next_activation_time)
self.check_complete()
def expire_tranche(self, tranche_index):
self.triggers[tranche_index].expire()
@@ -97,7 +97,7 @@ def start_trigger_updates():
"""
Called near the beginning of block handling to initialize any per-block trigger data structures
"""
log.debug(f'current clock is {current_clock.get().timestamp} vs {timestamp()} => Δ{current_clock.get().timestamp-timestamp():.1f}s')
# log.debug(f'current clock is {current_clock.get().timestamp} vs {timestamp()} => Δ{current_clock.get().timestamp-timestamp():.1f}s')
TimeTrigger.update_all(current_clock.get().timestamp)
PriceLineTrigger.clear_data()
@@ -119,28 +119,27 @@ async def end_trigger_updates():
Call once after all updates have been handled. This updates the active_tranches array based on final trigger state.
"""
PriceLineTrigger.end_updates(current_clock.get().timestamp)
# dirty can change
global _dirty
while _dirty:
working_set = _dirty
_dirty = set()
for tk in working_set:
log.debug(f'check dirty tranche {tk}')
if _trigger_state.get(tk,0) == 0:
# all clear for execution. add to active list with any necessary proofs
active_tranches[tk] = PriceProof(0)
else:
# blocked by one or more triggers being False (nonzero mask)
# check expiry constraint
try:
TrancheTrigger.all[tk].check_expire()
except KeyError:
pass
# delete from active list.
try:
del active_tranches[tk]
except KeyError:
pass
tk = _dirty.pop()
log.debug(f'check dirty tranche {tk}')
if _trigger_state.get(tk,0) == 0:
# all clear for execution. add to active list with any necessary proofs
active_tranches[tk] = PriceProof(0)
log.debug(f'active tranche {tk}')
else:
# blocked by one or more triggers being False (nonzero mask)
reason = ', '.join(t.name for t in TrancheTrigger.all[tk].blocking_triggers)
log.debug(f'tranche {tk} blocked by {reason}')
# check expiry constraint
try:
TrancheTrigger.all[tk].check_expire()
except KeyError:
pass
# delete from active list.
try:
del active_tranches[tk]
except KeyError:
pass
def _Order__disable_triggers(order):
@@ -168,11 +167,20 @@ _dirty:set[TrancheKey] = set()
class Trigger:
def __init__(self, position: int, tk: TrancheKey, value: bool):
class TriggerType (Enum):
Balance = 0
Activation = 1
Expiration = 2
MinLine = 3
MaxLine = 4
def __init__(self, trigger_type: TriggerType, tk: TrancheKey, value: bool):
"""
position is the bit position of the boolean result in the tranche's constraint bitfield.
"""
self.position = position
self.position = trigger_type.value
self.name = trigger_type.name
self.tk = tk
self.value = value
_dirty.add(self.tk)
@@ -187,15 +195,13 @@ class Trigger:
@value.setter
def value(self, value):
state = _trigger_state.get(self.tk,0)
old = state & (1 << self.position) == 0 # NOTE: inverted
if value != old:
_dirty.add(self.tk)
if not value: # this conditional is inverted
_trigger_state[self.tk] = state | (1 << self.position) # set
else:
_trigger_state[self.tk] = state & ~(1 << self.position) # clear
state = _trigger_state.get(self.tk, 0)
if not value: # this conditional is inverted
_trigger_state[self.tk] = state | (1 << self.position) # set
else:
_trigger_state[self.tk] = state & ~(1 << self.position) # clear
_dirty.add(self.tk)
if value != (state & (1 << self.position) == 0):
self._value_changed()
@@ -249,7 +255,7 @@ class BalanceTrigger (Trigger):
return BalanceTrigger(tk, value)
def __init__(self, tk: TrancheKey, value: bool):
super().__init__(0, tk, value)
super().__init__(Trigger.TriggerType.Balance, tk, value)
self.order = Order.of(self.tk)
self.vault_token = self.tk.vault, self.order.status.order.tokenIn
BalanceTrigger.by_vault_token[self.vault_token].add(self)
@@ -279,7 +285,7 @@ class TimeTrigger (Trigger):
return TimeTrigger(is_start, tk, time, time_now)
def __init__(self, is_start: bool, tk: TrancheKey, time: int, time_now: int):
trigger_type = 1 if is_start else 2
trigger_type = Trigger.TriggerType.Activation if is_start else Trigger.TriggerType.Expiration
in_future = time_now >= time
value = in_future is is_start
self.is_start = is_start
@@ -314,7 +320,7 @@ class TimeTrigger (Trigger):
time = self._time
next_active = time_now < time
activate = not self.active and next_active
log.debug(f'update_active {self} | {self.active} => {next_active} = {activate}')
# log.debug(f'update_active {self} | {self.active} => {next_active} = {activate}')
if activate:
# log.debug(f'adding time trigger {self}')
TimeTrigger.all.add(self)
@@ -358,6 +364,7 @@ class TimeTrigger (Trigger):
class PriceLineTrigger (Trigger):
by_pool: dict[str,set['PriceLineTrigger']] = defaultdict(set)
diagonals: set['PriceLineTrigger'] = set()
@staticmethod
def create(tk: TrancheKey, inverted: bool, price: dec, line: Line, is_min: bool, is_barrier: bool):
@@ -374,14 +381,19 @@ class PriceLineTrigger (Trigger):
price_now = 1/price_now
activated = value_now < price_now if is_min else value_now > price_now
log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
super().__init__(3 if is_min else 4, tk, activated)
trigger_type = Trigger.TriggerType.MinLine if is_min else Trigger.TriggerType.MaxLine
super().__init__(trigger_type, tk, activated)
self.inverted = inverted
self.line = line
self.is_min = is_min
self.is_barrier = is_barrier
self.pool_address = Order.of(tk).pool_address
self.index: Optional[int] = None
self.active = True
self.last_price = price_now
PriceLineTrigger.by_pool[self.pool_address].add(self)
if self.line.slope != 0:
PriceLineTrigger.diagonals.add(self)
# lines that need evaluating add their data to these arrays, which are then sent to SIMD for evaluation. each
# array must always have the same size as the others.
@@ -405,21 +417,32 @@ class PriceLineTrigger (Trigger):
# oldPrice = price
if self.inverted:
price = 1/price
self.last_price = price
log.debug(f'price trigger {price}')
if self not in PriceLineTrigger.triggers_set:
self.index = len(PriceLineTrigger.y)
PriceLineTrigger.y.append(price)
PriceLineTrigger.m.append(self.line.slope)
PriceLineTrigger.b.append(self.line.intercept)
PriceLineTrigger.sign.append(1 if self.is_min else -1)
PriceLineTrigger.triggers.append(self)
PriceLineTrigger.triggers_set.add(self)
self.add_computation(price)
else:
# update an existing equation's price
PriceLineTrigger.y[self.index] = price
def touch(self):
if self not in PriceLineTrigger.triggers_set:
self.add_computation(self.last_price)
def add_computation(self, price):
self.index = len(PriceLineTrigger.y)
PriceLineTrigger.y.append(price)
PriceLineTrigger.m.append(self.line.slope)
PriceLineTrigger.b.append(self.line.intercept)
PriceLineTrigger.sign.append(1 if self.is_min else -1)
PriceLineTrigger.triggers.append(self)
PriceLineTrigger.triggers_set.add(self)
@staticmethod
def end_updates(time: int):
for t in PriceLineTrigger.diagonals:
t.touch() # always evaluate any line with a slope
# here we use numpy to compute all dirty lines using SIMD
y, m, b, sign = map(np.array,
(PriceLineTrigger.y, PriceLineTrigger.m, PriceLineTrigger.b, PriceLineTrigger.sign))
@@ -433,14 +456,19 @@ class PriceLineTrigger (Trigger):
PriceLineTrigger.clear_data()
def handle_result(self, value: bool):
if not self.is_barrier or value: # barriers that are False do not update their values to False
if self.active and (not self.is_barrier or value): # barriers that are False do not update their values to False
self.value = value
def remove(self):
self.active = False
try:
PriceLineTrigger.by_pool[self.pool_address].remove(self)
except KeyError:
pass
try:
PriceLineTrigger.diagonals.remove(self)
except KeyError:
pass
async def activate_orders():
@@ -497,11 +525,10 @@ class TrancheTrigger:
# tranche minLine and maxLine are relative to the pool and will be flipped from the orderspec if the
# order is buying the base and selling the quote.
price = pool_prices[pool['address']] * dec(10) ** -pool['decimals']
inverted = order.order.tokenIn != pool['base']
assert inverted and order.order.tokenIn == pool['quote'] or not inverted and order.order.tokenIn == pool['base']
inverted = order.order.inverted
min_trigger = PriceLineTrigger.create(tk, inverted, price, tranche.minLine, True, tranche.minIsBarrier)
max_trigger = PriceLineTrigger.create(tk, inverted, price, tranche.maxLine, False, tranche.maxIsBarrier)
return TrancheTrigger(order, tk, balance_trigger, activation_trigger, expiration_trigger, min_trigger, max_trigger)
return TrancheTrigger(order, tk, balance_trigger, activation_trigger, expiration_trigger, min_trigger, max_trigger, tranche.marketOrder)
def __init__(self, order: Order, tk: TrancheKey,
balance_trigger: BalanceTrigger,
@@ -509,6 +536,7 @@ class TrancheTrigger:
expiration_trigger: Optional[TimeTrigger],
min_trigger: Optional[PriceLineTrigger],
max_trigger: Optional[PriceLineTrigger],
market_order: bool,
):
assert order.key.vault == tk.vault and order.key.order_index == tk.order_index
tranche = order.order.tranches[tk.tranche_index]
@@ -521,6 +549,7 @@ class TrancheTrigger:
self.expiration_trigger = expiration_trigger
self.min_trigger = min_trigger
self.max_trigger = max_trigger
self.market_order = market_order
self.slippage = tranche.minLine.intercept if tranche.marketOrder else 0
self.slash_count = 0
@@ -536,6 +565,18 @@ class TrancheTrigger:
log.debug(f'Tranche {tk} initial status {self.status} {self}')
@property
def order_trigger(self):
return OrderTriggers.instances[self.tk.order_key]
@property
def blocking_triggers(self):
triggers = [self.balance_trigger, self.activation_trigger, self.expiration_trigger,
self.min_trigger, self.max_trigger]
return [t for t in triggers if t is not None and not t.value]
def fill(self, _amount_in, _amount_out, _next_activation_time ):
if _next_activation_time != DISTANT_PAST:
# rate limit
@@ -551,8 +592,9 @@ class TrancheTrigger:
self.disable()
else:
order_log.debug(f'tranche part-filled {self.tk} in:{_amount_in} out:{_amount_out} remaining:{remaining}')
if self.market_order:
self.expire()
self.slash_count = 0 # reset slash count
return filled
def touch(self):
_dirty.add(self.tk)
@@ -560,7 +602,7 @@ class TrancheTrigger:
def check_expire(self):
# if the expiration constraint has become False then the tranche can never execute again
if self.expiration_trigger is not None and not self.expiration_trigger:
OrderTriggers.instances[self.tk.order_key].expire_tranche(self.tk.tranche_index)
self.order_trigger.expire_tranche(self.tk.tranche_index)
def expire(self):
if self.closed:
@@ -573,6 +615,7 @@ class TrancheTrigger:
order_log.warning(f'tranche KILLED {self.tk}')
self.status = TrancheState.Error
self.disable()
self.order_trigger.check_complete()
def slash(self):
# slash() is called when an execute() transaction on this tranche reverts without a recognized reason.

View File

@@ -63,7 +63,7 @@ async def load_token(address: str) -> Optional[OldTokenDict]:
except CONTRACT_ERRORS:
log.warning(f'token {address} has no decimals()')
decimals = 0
approved = config.metadata is None
approved = False # never approve new coins
chain_id = current_chain.get().id
symbol = await symbol_prom
name = await name_prom
@@ -78,5 +78,5 @@ async def load_token(address: str) -> Optional[OldTokenDict]:
td['symbol'] = md['s']
if 'd' in md:
td['decimals'] = md['d']
log.debug(f'new token {name} {symbol} {address}{" approved" if approved else ""}')
log.debug(f'new token {name} {symbol} {address}')
return td