diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py index 3b51aa4..329ad38 100644 --- a/src/dexorder/bin/finaldata.py +++ b/src/dexorder/bin/finaldata.py @@ -34,6 +34,7 @@ async def handle_backfill_uniswap_swaps(swaps: list[EventData]): data = await get_uniswap_data(swap) if data is not None: pool, time, price = data + log.debug(f'OHLC {pool["address"]} {time} {price}') ohlcs.light_update_all(pool['address'], time, price) def flush_callback(): diff --git a/src/dexorder/bin/mirror.py b/src/dexorder/bin/mirror.py index 1fbc2bb..722b840 100644 --- a/src/dexorder/bin/mirror.py +++ b/src/dexorder/bin/mirror.py @@ -67,7 +67,8 @@ async def main(): log.debug(f'\tmirror result {mirror_addr} {mirror_inverted}') mirror_pools.append((mirror_addr, mirror_inverted)) await write_metadata(pools, mirror_pools) - log.info('Mirroring pools') + delay = config.polling if config.polling > 0 else 1 + log.info(f'Mirroring pools every {delay} seconds') while True: proms = [] for pool_addr, (mirror_addr, mirror_inverted) in zip(pools, mirror_pools): @@ -78,7 +79,7 @@ async def main(): except Exception as x: log.exception(x) await asyncio.gather(*[await_mirror(*args) for args in proms]) - await asyncio.sleep(config.polling if config.polling > 0 else 1) + await asyncio.sleep(delay) if __name__ == '__main__': diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index e35dfcd..d66e0d0 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -387,8 +387,13 @@ class OHLCRepository: for chunk in self.dirty_chunks: chunk.save() self.dirty_chunks.clear() - with open(os.path.join(self.dir, quotes_path()), 'w') as f: - json.dump(self.quotes, f) + filepath = os.path.join(self.dir, quotes_path()) + for _ in range(2): + try: + with open(filepath, 'w') as f: + json.dump(self.quotes, f) + except FileNotFoundError: + os.makedirs(os.path.dirname(filepath), exist_ok=True) class LightOHLCRepository (OHLCRepository): @@ -409,14 +414,17 @@ class LightOHLCRepository (OHLCRepository): if price is not None: self.quotes[symbol] = timestamp(time), str(price) start = ohlc_start_time(time, period) + log.debug(f'OHLC start_time {start}') chunk = self.get_chunk(symbol, period, start) key = symbol, period prev = self.current.get(key) if prev is None: # cache miss. load from chunk. prev = self.current[key] = chunk.bar_at(start) + log.debug(f'loaded prev bar from chunk {prev}') if prev is None: # not in cache or chunk. create new bar. + log.debug(f'no prev bar') if price is not None: close = price else: @@ -425,7 +433,7 @@ class LightOHLCRepository (OHLCRepository): except KeyError: log.warning(f'light_update tried to advance time on {symbol} which has no previous price.') return # no previous quote, no current price either - bar = NativeOHLC(start, price, price, price, close) + bar = self.current[key] = NativeOHLC(start, price, price, price, close) chunk.update(bar, backfill=backfill) self.dirty_chunks.add(chunk) else: @@ -434,6 +442,7 @@ class LightOHLCRepository (OHLCRepository): chunk = self.get_chunk(symbol, period, bar.start) chunk.update(bar, backfill=backfill) self.dirty_chunks.add(chunk) + self.current[key] = updated[-1] def pub_ohlc(_series:str, key: OHLCKey, bars: list[NativeOHLC]):