finalohlc bugfixes

This commit is contained in:
tim
2024-08-05 00:25:05 -04:00
parent 12ea3a988b
commit a1ba66787a

View File

@@ -73,12 +73,15 @@ class OHLCFilePath:
class OHLCFile: class OHLCFile:
@staticmethod @staticmethod
def get(base_dir: str, filepath: OHLCFilePath): def get(base_dir: str, filepath: OHLCFilePath):
key = base_dir, filepath key = base_dir, filepath
if key not in OHLCFile.cache: if key in OHLCFile._cache:
OHLCFile.cache[key] = OHLCFile(base_dir, filepath) return OHLCFile._cache[key]
return OHLCFile.cache[key] item = OHLCFile._closing.pop(key, False) or OHLCFile(base_dir, filepath)
OHLCFile._cache[key] = item
return item
def __init__(self, base_dir: str, filepath: OHLCFilePath): def __init__(self, base_dir: str, filepath: OHLCFilePath):
self.base_dir = base_dir self.base_dir = base_dir
@@ -93,7 +96,11 @@ class OHLCFile:
key, item = super().popitem() key, item = super().popitem()
item.close() item.close()
cache = OHLCFileCache(len(OHLC_PERIODS) * config.chunk_cache_size) _cache = OHLCFileCache(len(OHLC_PERIODS) * config.chunk_cache_size)
# We must wait to close files until the end of a flush, in case they have pending data. These items are no longer
# in the cache, but they are still active and not yet closed.
_closing:dict[tuple[str,OHLCFilePath],:'OHLCFile'] = {}
@property @property
def filename(self): def filename(self):
@@ -175,9 +182,9 @@ class OHLCFile:
else: else:
# load existing file # load existing file
line_number = 0 line_number = 0
row = None row = None # this will be the last row of the file that is not earlier than the earliest change
prev_line = None prev_line = None
offset = 0 # this will point to the start of the last row offset = self.file.tell() # this will end up pointing to the start of the last row
for line in self.file.readlines(): # all files should be small and can load at once for line in self.file.readlines(): # all files should be small and can load at once
line_number += 1 line_number += 1
row = line.decode('ascii').strip().split(',') row = line.decode('ascii').strip().split(',')
@@ -186,19 +193,28 @@ class OHLCFile:
if prev_line is not None: if prev_line is not None:
# advance cursor past the previous row # advance cursor past the previous row
offset += len(prev_line) offset += len(prev_line)
if int(row[0]) >= earliest_change: try:
# we are going to replace this row so stop walking forward and truncate the rest of the file if int(row[0]) >= earliest_change:
self.file.truncate(offset) # we are going to replace this row so stop walking forward and truncate the rest of the file
break self.file.truncate(offset)
break
except:
log.error(f'Couldn\'t read line {line_number} in {self.filename} row {row}')
raise
prev_line = line prev_line = line
if row is not None: if row is not None:
self._cur = [int(row[0]), *(dec(p) for p in row[1:])] # convert to int timestamp and dec prices try:
self._cur = [int(row[0]), *(dec(p) for p in row[1:])] # convert to int timestamp and dec prices
except:
log.error(f'problem in {self.filename}:{line_number} row {row}')
raise
# set the file's write pointer the start of the final row # set the file's write pointer the start of the final row
self.file.seek(offset) self.file.seek(offset)
# self._cur is now either None (empty file) or points to a natively-typed list representing the last row # self._cur is now either None (empty file) or points to a natively-typed list representing the last row
def close(self): def close(self):
self.file.close() OHLCFile._closing[self.base_dir, self.filepath] = self
class OHLCFileSeries: class OHLCFileSeries:
@@ -232,7 +248,6 @@ class OHLCFileSeries:
# load quote file # load quote file
# #
if self.quote_file is None: if self.quote_file is None:
quote_filename = self.quote_filename quote_filename = self.quote_filename
try: try:
self.quote_file = open(quote_filename, 'br+') self.quote_file = open(quote_filename, 'br+')
@@ -337,4 +352,9 @@ class FinalOHLCRepository:
for series in self.dirty_series: for series in self.dirty_series:
series.flush() series.flush()
self.dirty_series.clear() self.dirty_series.clear()
# We must wait to close files until the end of a flush, in case they have pending data.
# noinspection PyProtectedMember
for closing in OHLCFile._closing.values():
closing.file.close()
# noinspection PyProtectedMember
OHLCFile._closing.clear()