Compare commits

10 commits: 67ba314d46 ... 48fdfeeb3f

| SHA1 |
|---|
| 48fdfeeb3f |
| bc6a196bfa |
| 3b2c58671b |
| b133999314 |
| 39be05adaa |
| 51852c1250 |
| 40d8d44676 |
| 2a95dd26df |
| 6844f73e4b |
| 829ec58f8f |
@@ -2,4 +2,4 @@ metadata = '' # this setting approves no tokens
 account = '${accounts.gas}'
 rpc_url = '${rpc_urls.arbitrum_alchemy}'
 ws_url = '${rpc_urls.arbitrum_alchemy_ws}'
-concurrent_rpc_connections=16
+concurrent_rpc_connections=100
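
The `${...}` references above look like OmegaConf-style interpolations (omegaconf is pinned in the requirements diff below); that is an assumption, since the loader is not shown in this compare. A minimal sketch of how such values resolve, with placeholder account and URL values:

```python
# A minimal sketch, assuming these settings are resolved with OmegaConf-style
# ${...} interpolation. The account address and URL below are placeholders.
from omegaconf import OmegaConf

cfg = OmegaConf.create({
    'accounts': {'gas': '0x0000000000000000000000000000000000000001'},
    'rpc_urls': {'arbitrum_alchemy': 'https://arb-mainnet.example/v2/KEY'},
    'account': '${accounts.gas}',
    'rpc_url': '${rpc_urls.arbitrum_alchemy}',
    'concurrent_rpc_connections': 100,
})

print(cfg.account)   # interpolation resolves on access -> '0x0000...0001'
print(cfg.rpc_url)   # -> 'https://arb-mainnet.example/v2/KEY'
```
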
@@ -1,7 +1,7 @@
 sqlalchemy
 alembic
 omegaconf
-web3<7
+web3
 psycopg2-binary
 orjson
 sortedcontainers
@@ -16,5 +16,15 @@ eth-bloom
 python-dateutil
 eth_abi
 eth_utils
+eth_typing
+eth-keys
+eth-account
+eth-utils
+eth-typing
 pdpyras # pagerduty
 numpy
+bitarray
+typing_extensions
+requests
+aiohttp
+charset-normalizer
@@ -84,6 +84,7 @@ class SwapOrder:
     minFillAmount: int
     amountIsInput: bool
     outputDirectlyToOwner: bool
+    inverted: bool
     conditionalOrder: int
     tranches: list['Tranche']

@@ -93,17 +94,19 @@ class SwapOrder:

     @staticmethod
     def load(obj):
-        return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), int(obj[4]), obj[5], obj[6], obj[7],
-                         [Tranche.load(t) for t in obj[8]])
+        return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), int(obj[4]), obj[5], obj[6], obj[7], obj[8],
+                         [Tranche.load(t) for t in obj[9]])

     @staticmethod
     def load_from_chain(obj):
-        return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), int(obj[4]), obj[5], obj[6], obj[7],
-                         [Tranche.load_from_chain(t) for t in obj[8]])
+        return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), int(obj[4]), obj[5], obj[6], obj[7], obj[8],
+                         [Tranche.load_from_chain(t) for t in obj[9]])

     def dump(self):
-        return (self.tokenIn, self.tokenOut, self.route.dump(), str(self.amount), str(self.minFillAmount), self.amountIsInput,
-                self.outputDirectlyToOwner, self.conditionalOrder, [t.dump() for t in self.tranches])
+        return (self.tokenIn, self.tokenOut, self.route.dump(),
+                str(self.amount), str(self.minFillAmount), self.amountIsInput,
+                self.outputDirectlyToOwner, self.inverted, self.conditionalOrder,
+                [t.dump() for t in self.tranches])

     def __str__(self):
         msg = f'''
@@ -113,6 +116,7 @@ SwapOrder
 exchange: {self.route.exchange.name, self.route.fee}
 amount: {"input" if self.amountIsInput else "output"} {self.amount}{" to owner" if self.outputDirectlyToOwner else ""}
 minFill: {self.minFillAmount}
+inverted: {self.inverted}
 tranches:
 '''
         for tranche in self.tranches:
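
Because `inverted` is inserted before the tranche list in the dumped tuple, every consumer of the serialized form shifts the tranche index from `obj[8]` to `obj[9]`, which is why both `load` variants change in step with `dump`. A small sketch of the new tuple layout (field order taken from `dump()` above; all values are placeholders, not real orders):

```python
# Illustrative only: a stand-in for the dumped SwapOrder tuple with the new
# field order (tokenIn, tokenOut, route, amount, minFillAmount, amountIsInput,
# outputDirectlyToOwner, inverted, conditionalOrder, tranches).
dumped = (
    '0xTokenIn', '0xTokenOut', ('uniswap', 500),    # placeholder route dump
    '1000000', '0', True,
    False,                                          # outputDirectlyToOwner
    True,                                           # inverted: new field at index 7
    0,                                              # conditionalOrder moves to index 8
    [('tranche0',), ('tranche1',)],                 # tranches move to index 9
)

assert dumped[7] is True            # inverted
assert isinstance(dumped[9], list)  # tranches are now read from obj[9], not obj[8]
```
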
@@ -88,16 +88,21 @@ async def main():
     db_state = DbState(BlockData.by_opt('db'))
     with db.transaction():
         state = await db_state.load()
     if state is None:
         log.info('no state in database')
         if redis_state:
             await redis_state.clear()
     else:
         current_blockstate.set(state)
         current_fork.set(state.root_fork)
         global activate_orders_needed
         activate_orders_needed = True
         log.info(f'loaded state from db for root block {state.root_branch.height}')
+        if redis_state:
+            # load initial state
+            log.info('initializing redis with root state')
+            await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id])
+
     runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None)
     setup_logevent_triggers(runner)
     # OHLC printing hard-disabled for main. Use the finaldata process.
@@ -3,8 +3,11 @@ import logging
 from random import random
 from typing import Any, Optional, Union

+import requests
+import requests.adapters
+
 # noinspection PyPackageRequirements
-from aiohttp import ClientResponseError
+from aiohttp import ClientResponseError, ClientSession, BaseConnector, ClientTimeout, TCPConnector
 from eth_typing import URI
 from hexbytes import HexBytes
 from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
@@ -35,7 +38,11 @@ async def create_w3(rpc_url=None, account=NARG, autosign=True, name='default'):
     # assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain
     # self.w3iter = itertools.cycle(self.w3s)
     url = resolve_rpc_url(rpc_url)
-    w3 = AsyncWeb3(RetryHTTPProvider(url))
+    connector = TCPConnector(limit=config.concurrent_rpc_connections)
+    session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout))
+    http_provider = RetryHTTPProvider(url)
+    await http_provider.cache_async_session(session)
+    w3 = AsyncWeb3(http_provider)
     # w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed?
     # w3.middleware_onion.add(simple_cache_middleware)
     # log.debug(f'middleware {list(w3.middleware_onion.middlewares)}')
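
The change hands the provider an explicit aiohttp session whose `TCPConnector` caps simultaneous RPC sockets and whose timeout comes from the new `rpc_timeout` config field. A self-contained sketch of the same wiring, assuming web3.py v6 (where `AsyncHTTPProvider.cache_async_session` is the way to supply your own session) and example config values:

```python
# Sketch of the new provider setup, assuming web3.py v6. The URL and the two
# numeric settings are example values standing in for the project's config.
import asyncio
from aiohttp import ClientSession, ClientTimeout, TCPConnector
from web3 import AsyncWeb3, AsyncHTTPProvider

async def make_w3(url: str, concurrent_rpc_connections: int = 100, rpc_timeout: float = 3) -> AsyncWeb3:
    connector = TCPConnector(limit=concurrent_rpc_connections)   # cap simultaneous sockets
    session = ClientSession(connector=connector, timeout=ClientTimeout(total=rpc_timeout))
    provider = AsyncHTTPProvider(url)
    await provider.cache_async_session(session)                  # reuse this session for every request
    return AsyncWeb3(provider)

# usage (placeholder URL):
# w3 = asyncio.run(make_w3('https://arb-mainnet.example/v2/KEY'))
```
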
@@ -129,8 +136,9 @@ class RetryHTTPProvider (AsyncHTTPProvider):
         self.rate_allowed.set()

     async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
+        MAX_TRIES = 10
         wait = 0
-        while True:
+        for _ in range(MAX_TRIES):
             try:
                 async with self.in_flight:
                     await self.rate_allowed.wait()
@@ -152,3 +160,4 @@ class RetryHTTPProvider (AsyncHTTPProvider):
                     self.rate_allowed.set()
             # finally:
             # log.debug(f'Ended request of RPC call {method}')
+        raise IOError(f'Could not query rpc server after {MAX_TRIES} tries: {method} {params}')
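
The retry loop now has a hard upper bound: instead of `while True`, `make_request` makes at most `MAX_TRIES` attempts and raises once they are exhausted. A stripped-down sketch of that control flow; the real provider also gates on an in-flight limit and a `rate_allowed` event, and its backoff policy is not shown in this hunk, so the one below is only an assumption:

```python
# Minimal sketch of a bounded retry loop in the same shape as the change above.
import asyncio

async def request_with_retries(call, method, params, max_tries: int = 10):
    wait = 0.0
    for _ in range(max_tries):
        try:
            return await call(method, params)         # success: return immediately
        except IOError:
            wait = min(wait * 2 or 0.1, 5.0)          # hypothetical backoff policy
            await asyncio.sleep(wait)
    # mirrors the new terminal error instead of spinning forever
    raise IOError(f'Could not query rpc server after {max_tries} tries: {method} {params}')
```
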
@@ -91,22 +91,25 @@ class DbState(SeriesCollection):
         root_fork = state.init_root_block(root_block)
         for series, data in self.datas.items():
             if data.opts.get('db') != 'lazy':
-                log.debug(f'loading series {series}')
                 t = data.type
+                count = 0
                 if t == DataType.SET:
                     # noinspection PyTypeChecker
                     var: BlockSet = BlockData.registry[series]
                     for row in db.session.query(SeriesSet).where(SeriesSet.chain == chain_id, SeriesSet.series == data.series2str(series)):
                         key = data.str2key(row.key)
-                        log.debug(f'load {series} {key}')
+                        # log.debug(f'load {series} {key}')
                         state.set(root_fork, var.series, key, None, overwrite=False)
+                        count += 1
                 elif t == DataType.DICT:
                     # noinspection PyTypeChecker
                     var: BlockDict = BlockData.registry[series]
                     for row in db.session.query(SeriesDict).where(SeriesDict.chain == chain_id, SeriesDict.series == data.series2str(series)):
                         key = data.str2key(row.key)
                         value = data.str2value(row.value)
-                        log.debug(f'load {series} {key} {value}')
+                        # log.debug(f'load {series} {key} {value}')
                         state.set(root_fork, var.series, key, value, overwrite=True)
+                        count += 1
+                log.debug(f'loaded {count} rows from db series {series}')
         log.debug(f'loaded db state from block {root_block}')
         return state
@@ -25,6 +25,7 @@ class Config:

     concurrent_rpc_connections: int = 4
     parallel_logevent_queries: bool = True
+    rpc_timeout: float = 3
     polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead
     backfill: int = 0 # if not 0, then runner will initialize an empty database by backfilling from the given block height. Use negative numbers to indicate a number of blocks before the present.

@@ -52,7 +52,7 @@ async def handle_order_placed(event: EventData):
     log.debug(f'raw order status {obj}')
     order = Order.create(addr, index, event['transactionHash'], obj)
     await activate_order(order)
-    log.debug(f'new order{order}')
+    log.debug(f'new order {order.key}{order}')


 async def handle_swap_filled(event: EventData):
@@ -154,6 +154,8 @@ def execute_tranches():
     for tk, proof in active_tranches.items():
         if tk not in inflight_execution_requests:
             new_execution_requests.append((tk, proof))
+        else:
+            log.debug(f'execute {tk} already in flight')
     # todo order requests and batch
     for tk, proof in new_execution_requests:
         create_execution_request(tk, proof)
@@ -81,8 +81,8 @@ class OrderTriggers:

     def fill(self, tx: str, time: int, tranche_index, amount_in, amount_out, fee, next_activation_time):
         self.order.add_fill(tx, time, tranche_index, amount_in, amount_out, fee, next_activation_time)
-        if self.triggers[tranche_index].fill(amount_in, amount_out, next_activation_time):
-            self.check_complete()
+        self.triggers[tranche_index].fill(amount_in, amount_out, next_activation_time)
+        self.check_complete()

     def expire_tranche(self, tranche_index):
         self.triggers[tranche_index].expire()
@@ -97,7 +97,7 @@ def start_trigger_updates():
     """
     Called near the beginning of block handling to initialize any per-block trigger data structures
     """
-    log.debug(f'current clock is {current_clock.get().timestamp} vs {timestamp()} => Δ{current_clock.get().timestamp-timestamp():.1f}s')
+    # log.debug(f'current clock is {current_clock.get().timestamp} vs {timestamp()} => Δ{current_clock.get().timestamp-timestamp():.1f}s')
     TimeTrigger.update_all(current_clock.get().timestamp)
     PriceLineTrigger.clear_data()

@@ -119,28 +119,27 @@ async def end_trigger_updates():
     Call once after all updates have been handled. This updates the active_tranches array based on final trigger state.
     """
     PriceLineTrigger.end_updates(current_clock.get().timestamp)
-    # dirty can change
-    global _dirty
     while _dirty:
-        working_set = _dirty
-        _dirty = set()
-        for tk in working_set:
-            log.debug(f'check dirty tranche {tk}')
-            if _trigger_state.get(tk,0) == 0:
-                # all clear for execution. add to active list with any necessary proofs
-                active_tranches[tk] = PriceProof(0)
-            else:
-                # blocked by one or more triggers being False (nonzero mask)
-                # check expiry constraint
-                try:
-                    TrancheTrigger.all[tk].check_expire()
-                except KeyError:
-                    pass
-                # delete from active list.
-                try:
-                    del active_tranches[tk]
-                except KeyError:
-                    pass
+        tk = _dirty.pop()
+        log.debug(f'check dirty tranche {tk}')
+        if _trigger_state.get(tk,0) == 0:
+            # all clear for execution. add to active list with any necessary proofs
+            active_tranches[tk] = PriceProof(0)
+            log.debug(f'active tranche {tk}')
+        else:
+            # blocked by one or more triggers being False (nonzero mask)
+            reason = ', '.join(t.name for t in TrancheTrigger.all[tk].blocking_triggers)
+            log.debug(f'tranche {tk} blocked by {reason}')
+            # check expiry constraint
+            try:
+                TrancheTrigger.all[tk].check_expire()
+            except KeyError:
+                pass
+            # delete from active list.
+            try:
+                del active_tranches[tk]
+            except KeyError:
+                pass


 def _Order__disable_triggers(order):
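
The loop is restructured from "snapshot the dirty set, then iterate" to "pop one key at a time until the set is empty". The pop-based form still handles keys that become dirty again while earlier keys are being processed, and it no longer rebinds the module-level `_dirty`, so the `global` statement can go. A tiny sketch of that worklist pattern:

```python
# Sketch of the worklist pattern used above: items added while draining are
# still processed, and no `global` rebinding is needed because the set is
# mutated in place rather than replaced.
_dirty: set[str] = {'a', 'b'}

def process(item: str) -> None:
    if item == 'a':
        _dirty.add('c')   # processing may dirty further items

while _dirty:
    tk = _dirty.pop()
    process(tk)
# 'a', 'b', and the late-added 'c' are all handled
```
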
@@ -168,11 +167,20 @@ _dirty:set[TrancheKey] = set()


 class Trigger:
-    def __init__(self, position: int, tk: TrancheKey, value: bool):
+    class TriggerType (Enum):
+        Balance = 0
+        Activation = 1
+        Expiration = 2
+        MinLine = 3
+        MaxLine = 4
+
+    def __init__(self, trigger_type: TriggerType, tk: TrancheKey, value: bool):
         """
         position is the bit position of the boolean result in the tranche's constraint bitfield.
         """
-        self.position = position
+        self.position = trigger_type.value
+        self.name = trigger_type.name
         self.tk = tk
         self.value = value
         _dirty.add(self.tk)
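
Each trigger type now owns a fixed bit position in a per-tranche constraint bitfield, and a tranche is eligible for execution only when its mask is 0; note that the value setter below is inverted, so a False trigger sets its bit. A small standalone sketch of that bookkeeping, matching the enum introduced above (the state dict and helper are illustrative, not the project's code):

```python
# Sketch of the constraint bitfield keyed by trigger type. State 0 means
# "no trigger is blocking this tranche".
from enum import Enum

class TriggerType(Enum):
    Balance = 0
    Activation = 1
    Expiration = 2
    MinLine = 3
    MaxLine = 4

state = 0

def set_trigger(t: TriggerType, value: bool) -> None:
    global state
    if not value:                      # inverted: a False trigger is "blocking"
        state |= (1 << t.value)        # set the blocking bit
    else:
        state &= ~(1 << t.value)       # clear it

set_trigger(TriggerType.Balance, True)
set_trigger(TriggerType.MinLine, False)
print(state == 0)    # False: MinLine still blocks (bit 3 is set)
set_trigger(TriggerType.MinLine, True)
print(state == 0)    # True: tranche is clear to execute
```
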
@@ -187,15 +195,13 @@ class Trigger:

     @value.setter
     def value(self, value):
-        state = _trigger_state.get(self.tk,0)
-        old = state & (1 << self.position) == 0 # NOTE: inverted
-        if value != old:
-            _dirty.add(self.tk)
-        if not value: # this conditional is inverted
-            _trigger_state[self.tk] = state | (1 << self.position) # set
-        else:
-            _trigger_state[self.tk] = state & ~(1 << self.position) # clear
-        self._value_changed()
+        state = _trigger_state.get(self.tk, 0)
+        if not value: # this conditional is inverted
+            _trigger_state[self.tk] = state | (1 << self.position) # set
+        else:
+            _trigger_state[self.tk] = state & ~(1 << self.position) # clear
+        _dirty.add(self.tk)
+        if value != (state & (1 << self.position) == 0):
+            self._value_changed()


@@ -249,7 +255,7 @@ class BalanceTrigger (Trigger):
         return BalanceTrigger(tk, value)

     def __init__(self, tk: TrancheKey, value: bool):
-        super().__init__(0, tk, value)
+        super().__init__(Trigger.TriggerType.Balance, tk, value)
         self.order = Order.of(self.tk)
         self.vault_token = self.tk.vault, self.order.status.order.tokenIn
         BalanceTrigger.by_vault_token[self.vault_token].add(self)
@@ -279,7 +285,7 @@ class TimeTrigger (Trigger):
         return TimeTrigger(is_start, tk, time, time_now)

     def __init__(self, is_start: bool, tk: TrancheKey, time: int, time_now: int):
-        trigger_type = 1 if is_start else 2
+        trigger_type = Trigger.TriggerType.Activation if is_start else Trigger.TriggerType.Expiration
         in_future = time_now >= time
         value = in_future is is_start
         self.is_start = is_start
@@ -314,7 +320,7 @@ class TimeTrigger (Trigger):
         time = self._time
         next_active = time_now < time
         activate = not self.active and next_active
-        log.debug(f'update_active {self} | {self.active} => {next_active} = {activate}')
+        # log.debug(f'update_active {self} | {self.active} => {next_active} = {activate}')
         if activate:
             # log.debug(f'adding time trigger {self}')
             TimeTrigger.all.add(self)
@@ -358,6 +364,7 @@ class TimeTrigger (Trigger):

 class PriceLineTrigger (Trigger):
     by_pool: dict[str,set['PriceLineTrigger']] = defaultdict(set)
+    diagonals: set['PriceLineTrigger'] = set()

     @staticmethod
     def create(tk: TrancheKey, inverted: bool, price: dec, line: Line, is_min: bool, is_barrier: bool):
@@ -374,14 +381,19 @@ class PriceLineTrigger (Trigger):
             price_now = 1/price_now
         activated = value_now < price_now if is_min else value_now > price_now
         log.debug(f'initial price line {value_now} {"<" if is_min else ">"} {price_now} {activated}')
-        super().__init__(3 if is_min else 4, tk, activated)
+        trigger_type = Trigger.TriggerType.MinLine if is_min else Trigger.TriggerType.MaxLine
+        super().__init__(trigger_type, tk, activated)
         self.inverted = inverted
         self.line = line
         self.is_min = is_min
         self.is_barrier = is_barrier
         self.pool_address = Order.of(tk).pool_address
         self.index: Optional[int] = None
+        self.active = True
+        self.last_price = price_now
         PriceLineTrigger.by_pool[self.pool_address].add(self)
+        if self.line.slope != 0:
+            PriceLineTrigger.diagonals.add(self)

     # lines that need evaluating add their data to these arrays, which are then sent to SIMD for evaluation. each
     # array must always have the same size as the others.
@@ -405,21 +417,32 @@ class PriceLineTrigger (Trigger):
         # oldPrice = price
         if self.inverted:
             price = 1/price
+        self.last_price = price
         log.debug(f'price trigger {price}')
         if self not in PriceLineTrigger.triggers_set:
-            self.index = len(PriceLineTrigger.y)
-            PriceLineTrigger.y.append(price)
-            PriceLineTrigger.m.append(self.line.slope)
-            PriceLineTrigger.b.append(self.line.intercept)
-            PriceLineTrigger.sign.append(1 if self.is_min else -1)
-            PriceLineTrigger.triggers.append(self)
-            PriceLineTrigger.triggers_set.add(self)
+            self.add_computation(price)
         else:
             # update an existing equation's price
             PriceLineTrigger.y[self.index] = price

+    def touch(self):
+        if self not in PriceLineTrigger.triggers_set:
+            self.add_computation(self.last_price)
+
+    def add_computation(self, price):
+        self.index = len(PriceLineTrigger.y)
+        PriceLineTrigger.y.append(price)
+        PriceLineTrigger.m.append(self.line.slope)
+        PriceLineTrigger.b.append(self.line.intercept)
+        PriceLineTrigger.sign.append(1 if self.is_min else -1)
+        PriceLineTrigger.triggers.append(self)
+        PriceLineTrigger.triggers_set.add(self)
+
     @staticmethod
     def end_updates(time: int):
+        for t in PriceLineTrigger.diagonals:
+            t.touch() # always evaluate any line with a slope
+
         # here we use numpy to compute all dirty lines using SIMD
         y, m, b, sign = map(np.array,
                             (PriceLineTrigger.y, PriceLineTrigger.m, PriceLineTrigger.b, PriceLineTrigger.sign))
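
`end_updates` batches every registered line into parallel arrays (price `y`, slope `m`, intercept `b`, and a `sign` that encodes min vs max) and evaluates them in one vectorized numpy pass; diagonal lines (nonzero slope) are always re-added via `touch()` because their line value moves with time even when the price does not. The actual comparison is outside this hunk, so the expression below is only an assumed shape, not the project's formula:

```python
# Assumed shape of the vectorized check: compare each observed price y against
# its line value m*t + b, with sign selecting the direction (+1 for a min line,
# -1 for a max line). Illustration only; values are made up.
import numpy as np

y    = np.array([1.02, 0.97, 1.10])   # observed prices, one per registered line
m    = np.array([0.0, 0.0, -1e-6])    # slopes (nonzero -> "diagonal" line)
b    = np.array([1.00, 1.00, 1.20])   # intercepts
sign = np.array([1, -1, 1])           # 1 = min line, -1 = max line
t    = 1_700_000_000                  # block timestamp

results = sign * (y - (m * t + b)) > 0   # one SIMD pass over every line
print(results)                           # per-trigger booleans, one per entry
```
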
@@ -433,14 +456,19 @@ class PriceLineTrigger (Trigger):
         PriceLineTrigger.clear_data()

     def handle_result(self, value: bool):
-        if not self.is_barrier or value: # barriers that are False do not update their values to False
+        if self.active and (not self.is_barrier or value): # barriers that are False do not update their values to False
             self.value = value

     def remove(self):
+        self.active = False
         try:
             PriceLineTrigger.by_pool[self.pool_address].remove(self)
         except KeyError:
             pass
+        try:
+            PriceLineTrigger.diagonals.remove(self)
+        except KeyError:
+            pass


 async def activate_orders():
@@ -497,11 +525,10 @@ class TrancheTrigger:
         # tranche minLine and maxLine are relative to the pool and will be flipped from the orderspec if the
         # order is buying the base and selling the quote.
         price = pool_prices[pool['address']] * dec(10) ** -pool['decimals']
-        inverted = order.order.tokenIn != pool['base']
-        assert inverted and order.order.tokenIn == pool['quote'] or not inverted and order.order.tokenIn == pool['base']
+        inverted = order.order.inverted
         min_trigger = PriceLineTrigger.create(tk, inverted, price, tranche.minLine, True, tranche.minIsBarrier)
         max_trigger = PriceLineTrigger.create(tk, inverted, price, tranche.maxLine, False, tranche.maxIsBarrier)
-        return TrancheTrigger(order, tk, balance_trigger, activation_trigger, expiration_trigger, min_trigger, max_trigger)
+        return TrancheTrigger(order, tk, balance_trigger, activation_trigger, expiration_trigger, min_trigger, max_trigger, tranche.marketOrder)

     def __init__(self, order: Order, tk: TrancheKey,
                  balance_trigger: BalanceTrigger,
@@ -509,6 +536,7 @@ class TrancheTrigger:
                  expiration_trigger: Optional[TimeTrigger],
                  min_trigger: Optional[PriceLineTrigger],
                  max_trigger: Optional[PriceLineTrigger],
+                 market_order: bool,
                  ):
         assert order.key.vault == tk.vault and order.key.order_index == tk.order_index
         tranche = order.order.tranches[tk.tranche_index]
@@ -521,6 +549,7 @@ class TrancheTrigger:
         self.expiration_trigger = expiration_trigger
         self.min_trigger = min_trigger
         self.max_trigger = max_trigger
+        self.market_order = market_order

         self.slippage = tranche.minLine.intercept if tranche.marketOrder else 0
         self.slash_count = 0
@@ -536,6 +565,18 @@ class TrancheTrigger:
         log.debug(f'Tranche {tk} initial status {self.status} {self}')


+    @property
+    def order_trigger(self):
+        return OrderTriggers.instances[self.tk.order_key]
+
+
+    @property
+    def blocking_triggers(self):
+        triggers = [self.balance_trigger, self.activation_trigger, self.expiration_trigger,
+                    self.min_trigger, self.max_trigger]
+        return [t for t in triggers if t is not None and not t.value]
+
+
     def fill(self, _amount_in, _amount_out, _next_activation_time ):
         if _next_activation_time != DISTANT_PAST:
             # rate limit
@@ -551,8 +592,9 @@ class TrancheTrigger:
             self.disable()
         else:
             order_log.debug(f'tranche part-filled {self.tk} in:{_amount_in} out:{_amount_out} remaining:{remaining}')
+            if self.market_order:
+                self.expire()
         self.slash_count = 0 # reset slash count
-        return filled

     def touch(self):
         _dirty.add(self.tk)
@@ -560,7 +602,7 @@ class TrancheTrigger:
     def check_expire(self):
         # if the expiration constraint has become False then the tranche can never execute again
         if self.expiration_trigger is not None and not self.expiration_trigger:
-            OrderTriggers.instances[self.tk.order_key].expire_tranche(self.tk.tranche_index)
+            self.order_trigger.expire_tranche(self.tk.tranche_index)

     def expire(self):
         if self.closed:
@@ -573,6 +615,7 @@ class TrancheTrigger:
         order_log.warning(f'tranche KILLED {self.tk}')
         self.status = TrancheState.Error
         self.disable()
+        self.order_trigger.check_complete()

     def slash(self):
         # slash() is called when an execute() transaction on this tranche reverts without a recognized reason.
@@ -63,7 +63,7 @@ async def load_token(address: str) -> Optional[OldTokenDict]:
     except CONTRACT_ERRORS:
         log.warning(f'token {address} has no decimals()')
         decimals = 0
-    approved = config.metadata is None
+    approved = False # never approve new coins
     chain_id = current_chain.get().id
     symbol = await symbol_prom
     name = await name_prom
@@ -78,5 +78,5 @@ async def load_token(address: str) -> Optional[OldTokenDict]:
         td['symbol'] = md['s']
     if 'd' in md:
         td['decimals'] = md['d']
-    log.debug(f'new token {name} {symbol} {address}{" approved" if approved else ""}')
+    log.debug(f'new token {name} {symbol} {address}')
     return td