diff --git a/flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java b/flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java index f8313b54..f3ca42c4 100644 --- a/flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java +++ b/flink/src/main/java/com/dexorder/flink/iceberg/SchemaInitializer.java @@ -81,7 +81,7 @@ public class SchemaInitializer { // Bump this when the schema changes. Tables with a different (or missing) version // will be dropped and recreated. Increment by 1 for each incompatible change. // v1: open/high/low/close required; ingestor forward-fills interior gaps with previous close - private static final String OHLC_SCHEMA_VERSION = "1"; + private static final String OHLC_SCHEMA_VERSION = "5"; private static final String SCHEMA_VERSION_PROP = "app.schema.version"; private void initializeOhlcTable() { @@ -179,7 +179,7 @@ public class SchemaInitializer { // v2: removed tick_denom/base_denom/quote_denom; added Nautilus instrument fields // (price_precision, size_precision, tick_size, lot_size, min_notional, // margin_init, margin_maint, maker_fee, taker_fee, contract_multiplier) - private static final String SYMBOL_METADATA_SCHEMA_VERSION = "2"; + private static final String SYMBOL_METADATA_SCHEMA_VERSION = "5"; private void initializeSymbolMetadataTable() { TableIdentifier tableId = TableIdentifier.of(namespace, "symbol_metadata"); diff --git a/flink/src/main/java/com/dexorder/flink/publisher/OHLCBatchDeserializer.java b/flink/src/main/java/com/dexorder/flink/publisher/OHLCBatchDeserializer.java index 10972e31..b696fc86 100644 --- a/flink/src/main/java/com/dexorder/flink/publisher/OHLCBatchDeserializer.java +++ b/flink/src/main/java/com/dexorder/flink/publisher/OHLCBatchDeserializer.java @@ -65,10 +65,10 @@ public class OHLCBatchDeserializer implements DeserializationSchema