data fixes; indicator=>workspace sync
This commit is contained in:
@@ -80,7 +80,8 @@ public class SchemaInitializer {
|
||||
*/
|
||||
// Bump this when the schema changes. Tables with a different (or missing) version
|
||||
// will be dropped and recreated. Increment by 1 for each incompatible change.
|
||||
private static final String OHLC_SCHEMA_VERSION = "1";
|
||||
// v2: open/high/low/close changed from required to optional to support null gap bars
|
||||
private static final String OHLC_SCHEMA_VERSION = "2";
|
||||
private static final String SCHEMA_VERSION_PROP = "app.schema.version";
|
||||
|
||||
private void initializeOhlcTable() {
|
||||
@@ -109,7 +110,7 @@ public class SchemaInitializer {
|
||||
Table existing = catalog.loadTable(tableId);
|
||||
String existingVersion = existing.properties().get(SCHEMA_VERSION_PROP);
|
||||
if (!OHLC_SCHEMA_VERSION.equals(existingVersion)) {
|
||||
LOG.warn("Table {} has schema version '{}', expected '{}' — manual migration required",
|
||||
LOG.warn("Table {} has schema version '{}', expected '{}' — skipping (manual migration required if needed)",
|
||||
tableId, existingVersion, OHLC_SCHEMA_VERSION);
|
||||
}
|
||||
LOG.info("Table {} already exists at schema version {} — skipping creation", tableId, existingVersion);
|
||||
@@ -127,11 +128,11 @@ public class SchemaInitializer {
|
||||
required(2, "period_seconds", Types.IntegerType.get(), "OHLC period in seconds"),
|
||||
required(3, "timestamp", Types.LongType.get(), "Candle timestamp in microseconds since epoch"),
|
||||
|
||||
// OHLC price data
|
||||
required(4, "open", Types.LongType.get(), "Opening price"),
|
||||
required(5, "high", Types.LongType.get(), "Highest price"),
|
||||
required(6, "low", Types.LongType.get(), "Lowest price"),
|
||||
required(7, "close", Types.LongType.get(), "Closing price"),
|
||||
// OHLC price data — optional to support gap bars (null = no trades that period)
|
||||
optional(4, "open", Types.LongType.get(), "Opening price"),
|
||||
optional(5, "high", Types.LongType.get(), "Highest price"),
|
||||
optional(6, "low", Types.LongType.get(), "Lowest price"),
|
||||
optional(7, "close", Types.LongType.get(), "Closing price"),
|
||||
|
||||
// Volume data
|
||||
optional(8, "volume", Types.LongType.get(), "Total volume"),
|
||||
|
||||
@@ -65,11 +65,11 @@ public class OHLCBatchDeserializer implements DeserializationSchema<OHLCBatchWra
|
||||
rows.add(new OHLCBatchWrapper.OHLCRow(
|
||||
row.getTimestamp(),
|
||||
row.getTicker(),
|
||||
row.getOpen(),
|
||||
row.getHigh(),
|
||||
row.getLow(),
|
||||
row.getClose(),
|
||||
row.hasVolume() ? row.getVolume() : 0
|
||||
row.hasOpen() ? row.getOpen() : null,
|
||||
row.hasHigh() ? row.getHigh() : null,
|
||||
row.hasLow() ? row.getLow() : null,
|
||||
row.hasClose() ? row.getClose() : null,
|
||||
row.hasVolume() ? row.getVolume() : null
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@@ -107,21 +107,22 @@ public class OHLCBatchWrapper implements Serializable {
|
||||
}
|
||||
|
||||
/**
|
||||
* Single OHLC row
|
||||
* Single OHLC row. open/high/low/close/volume are nullable to support gap bars
|
||||
* (periods where no trades occurred).
|
||||
*/
|
||||
public static class OHLCRow implements Serializable {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final long timestamp;
|
||||
private final String ticker;
|
||||
private final long open;
|
||||
private final long high;
|
||||
private final long low;
|
||||
private final long close;
|
||||
private final long volume;
|
||||
private final Long open; // null for gap bars
|
||||
private final Long high; // null for gap bars
|
||||
private final Long low; // null for gap bars
|
||||
private final Long close; // null for gap bars
|
||||
private final Long volume; // null when no volume data
|
||||
|
||||
public OHLCRow(long timestamp, String ticker, long open, long high,
|
||||
long low, long close, long volume) {
|
||||
public OHLCRow(long timestamp, String ticker, Long open, Long high,
|
||||
Long low, Long close, Long volume) {
|
||||
this.timestamp = timestamp;
|
||||
this.ticker = ticker;
|
||||
this.open = open;
|
||||
@@ -139,35 +140,37 @@ public class OHLCBatchWrapper implements Serializable {
|
||||
return ticker;
|
||||
}
|
||||
|
||||
public long getOpen() {
|
||||
public Long getOpen() {
|
||||
return open;
|
||||
}
|
||||
|
||||
public long getHigh() {
|
||||
public Long getHigh() {
|
||||
return high;
|
||||
}
|
||||
|
||||
public long getLow() {
|
||||
public Long getLow() {
|
||||
return low;
|
||||
}
|
||||
|
||||
public long getClose() {
|
||||
public Long getClose() {
|
||||
return close;
|
||||
}
|
||||
|
||||
public long getVolume() {
|
||||
public Long getVolume() {
|
||||
return volume;
|
||||
}
|
||||
|
||||
public boolean isGapBar() {
|
||||
return open == null && high == null && low == null && close == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "OHLCRow{" +
|
||||
"timestamp=" + timestamp +
|
||||
", ticker='" + ticker + '\'' +
|
||||
", open=" + open +
|
||||
", high=" + high +
|
||||
", low=" + low +
|
||||
", close=" + close +
|
||||
(isGapBar() ? ", gap=true" :
|
||||
", open=" + open + ", high=" + high + ", low=" + low + ", close=" + close) +
|
||||
", volume=" + volume +
|
||||
'}';
|
||||
}
|
||||
|
||||
@@ -77,11 +77,11 @@ public class HistoricalBatchWriter extends RichFlatMapFunction<OHLCBatchWrapper,
|
||||
record.setField("ticker", batch.getTicker());
|
||||
record.setField("period_seconds", batch.getPeriodSeconds());
|
||||
record.setField("timestamp", row.getTimestamp());
|
||||
record.setField("open", row.getOpen());
|
||||
record.setField("high", row.getHigh());
|
||||
record.setField("low", row.getLow());
|
||||
record.setField("close", row.getClose());
|
||||
record.setField("volume", row.getVolume() != 0 ? row.getVolume() : null);
|
||||
record.setField("open", row.getOpen()); // null for gap bars
|
||||
record.setField("high", row.getHigh()); // null for gap bars
|
||||
record.setField("low", row.getLow()); // null for gap bars
|
||||
record.setField("close", row.getClose()); // null for gap bars
|
||||
record.setField("volume", row.getVolume());
|
||||
record.setField("buy_vol", null);
|
||||
record.setField("sell_vol", null);
|
||||
record.setField("open_time", null);
|
||||
@@ -102,7 +102,13 @@ public class HistoricalBatchWriter extends RichFlatMapFunction<OHLCBatchWrapper,
|
||||
.appendFile(writer.toDataFile())
|
||||
.commit();
|
||||
|
||||
LOG.info("Committed {} rows to Iceberg for request_id={}", batch.getRowCount(), batch.getRequestId());
|
||||
long gapCount = batch.getRows().stream().filter(OHLCBatchWrapper.OHLCRow::isGapBar).count();
|
||||
if (gapCount > 0) {
|
||||
LOG.info("Committed {} rows ({} gap bars) to Iceberg for request_id={}",
|
||||
batch.getRowCount(), gapCount, batch.getRequestId());
|
||||
} else {
|
||||
LOG.info("Committed {} rows to Iceberg for request_id={}", batch.getRowCount(), batch.getRequestId());
|
||||
}
|
||||
|
||||
// Emit batch downstream only after successful commit
|
||||
out.collect(batch);
|
||||
|
||||
Reference in New Issue
Block a user