Expand model tag support: add GLM-5.1, simplify Anthropic IDs, scan tags anywhere in message
- Flink update_bars debouncing - update_bars subscription idempotency bugfix - Price decimal correction bugfix of previous commit - Add GLM-5.1 model tag alongside renamed GLM-5 - Use short Anthropic model IDs (sonnet/haiku/opus) instead of full version strings - Allow @tags anywhere in message content, not just at start - Return hasOtherContent flag instead of trimmed rest string - Only trigger greeting stream when tag has no other content - Update workspace knowledge base references to platform/workspace and platform/shapes - Hierarchical knowledge base catalog - 151 Trading Strategies knowledge base articles - Shapes knowledge base article - MutateShapes tool instead of workspace patch
This commit is contained in:
@@ -269,7 +269,7 @@ public class TradingFlinkApp {
|
||||
|
||||
DataStream<RealtimeBar> barStream = tickStream
|
||||
.keyBy(TickWrapper::getTicker)
|
||||
.flatMap(new RealtimeBarFunction(periods))
|
||||
.process(new RealtimeBarFunction(periods))
|
||||
.setParallelism(1);
|
||||
|
||||
barStream.addSink(new RealtimeBarPublisher(notificationEndpoint))
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
package com.dexorder.flink.publisher;
|
||||
|
||||
import org.apache.flink.api.common.functions.RichFlatMapFunction;
|
||||
import org.apache.flink.api.common.state.MapState;
|
||||
import org.apache.flink.api.common.state.MapStateDescriptor;
|
||||
import org.apache.flink.api.common.state.ValueState;
|
||||
import org.apache.flink.api.common.state.ValueStateDescriptor;
|
||||
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
|
||||
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -20,11 +22,16 @@ import org.slf4j.LoggerFactory;
|
||||
* one tick interval (~10s for realtime polling).
|
||||
*
|
||||
* Emits two types of bars per tick:
|
||||
* - Open bar (isClosed=false): the current accumulator state, every tick.
|
||||
* Topic: "{ticker}|ohlc:{period}:open" — consumed by charts for live price display.
|
||||
* - Closed bar (isClosed=true): emitted once when a window boundary is crossed.
|
||||
* - Open bar (isClosed=false): current accumulator state, debounced via processing-time
|
||||
* timer. Multiple ticks within DEBOUNCE_MS share a single emission. Emitted on
|
||||
* topic "{ticker}|ohlc:{period}:open" — consumed by charts for live price display.
|
||||
* - Closed bar (isClosed=true): emitted immediately when a window boundary is crossed.
|
||||
* Topic: "{ticker}|ohlc:{period}" — consumed by strategies/triggers.
|
||||
*
|
||||
* Debouncing: open bars are not emitted per-tick. Instead the first tick in a batch
|
||||
* registers a processing-time timer (DEBOUNCE_MS in the future). onTimer() emits
|
||||
* the final accumulated state once, after the Kafka poll queue drains.
|
||||
*
|
||||
* Replay protection: ticks whose trade timestamp predates a period's current window start
|
||||
* are discarded (prevents Kafka replay from contaminating current bars). Open bars are
|
||||
* additionally suppressed until the first live tick (within LIVE_TICK_THRESHOLD_MS of now)
|
||||
@@ -40,14 +47,19 @@ import org.slf4j.LoggerFactory;
|
||||
* [6] tickCount
|
||||
* [7] valid (1 = seeded or fresh window, 0 = mid-window cold start — open bars suppressed)
|
||||
*/
|
||||
public class RealtimeBarFunction extends RichFlatMapFunction<TickWrapper, RealtimeBar> {
|
||||
public class RealtimeBarFunction extends KeyedProcessFunction<String, TickWrapper, RealtimeBar> {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RealtimeBarFunction.class);
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
// Ticks within this many ms of wall-clock time are considered live (vs. Kafka catch-up).
|
||||
private static final long LIVE_TICK_THRESHOLD_MS = 30_000L;
|
||||
// Open bars are debounced: first tick in a batch registers a timer; onTimer emits once.
|
||||
private static final long DEBOUNCE_MS = 50L;
|
||||
|
||||
private final int[] periods;
|
||||
private transient MapState<Integer, long[]> accumState;
|
||||
// Tracks the timestamp of the pending debounce timer (null if none registered).
|
||||
private transient ValueState<Long> pendingTimerTs;
|
||||
// Suppresses open bar emissions during Kafka catch-up; set to true on first live tick.
|
||||
private transient boolean caughtUp = false;
|
||||
|
||||
@@ -66,10 +78,12 @@ public class RealtimeBarFunction extends RichFlatMapFunction<TickWrapper, Realti
|
||||
PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
|
||||
);
|
||||
accumState = getRuntimeContext().getMapState(desc);
|
||||
pendingTimerTs = getRuntimeContext().getState(
|
||||
new ValueStateDescriptor<>("pendingTimerTs", Long.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flatMap(TickWrapper tick, Collector<RealtimeBar> out) throws Exception {
|
||||
public void processElement(TickWrapper tick, Context ctx, Collector<RealtimeBar> out) throws Exception {
|
||||
if (tick == null) return;
|
||||
|
||||
long nowMs = System.currentTimeMillis();
|
||||
@@ -88,6 +102,8 @@ public class RealtimeBarFunction extends RichFlatMapFunction<TickWrapper, Realti
|
||||
LOG.info("Caught up to live data: ticker={}", tick.getTicker());
|
||||
}
|
||||
|
||||
boolean needsOpenBarTimer = false;
|
||||
|
||||
for (int period : periods) {
|
||||
long periodMs = period * 1000L;
|
||||
long windowStart = (nowMs / periodMs) * periodMs;
|
||||
@@ -112,7 +128,6 @@ public class RealtimeBarFunction extends RichFlatMapFunction<TickWrapper, Realti
|
||||
accumState.put(period, seeded);
|
||||
LOG.info("Applied seed: ticker={}, period={}s, windowStart={}", tick.getTicker(), period, windowStart);
|
||||
} else if (tick.getSeedPeriodSeconds() == period) {
|
||||
// Period matched but seed was not applied — log the mismatch reason
|
||||
LOG.info("Seed not applied: ticker={}, period={}s, accumNull={}, seedWindow={}, currentWindow={}",
|
||||
tick.getTicker(), period, accum == null, tick.getSeedWindowStartMs(), windowStart);
|
||||
}
|
||||
@@ -132,7 +147,7 @@ public class RealtimeBarFunction extends RichFlatMapFunction<TickWrapper, Realti
|
||||
LOG.info("Cold-start (no seed): ticker={}, period={}s, valid=0, open bars suppressed", tick.getTicker(), period);
|
||||
|
||||
} else if (accum[5] != windowStart) {
|
||||
// Window boundary crossed — emit closed bar, then start a fresh valid window
|
||||
// Window boundary crossed — emit closed bar immediately, then start a fresh valid window
|
||||
if (accum[6] > 0) {
|
||||
out.collect(toBar(tick.getTicker(), period, accum, true));
|
||||
LOG.debug("Emitted closed bar: ticker={}, period={}s, windowStart={}, ticks={}",
|
||||
@@ -141,7 +156,7 @@ public class RealtimeBarFunction extends RichFlatMapFunction<TickWrapper, Realti
|
||||
long[] newAccum = openWindow(tick, windowStart, true);
|
||||
accumState.put(period, newAccum);
|
||||
if (caughtUp) {
|
||||
out.collect(toBar(tick.getTicker(), period, newAccum, false));
|
||||
needsOpenBarTimer = true;
|
||||
}
|
||||
|
||||
} else {
|
||||
@@ -153,12 +168,48 @@ public class RealtimeBarFunction extends RichFlatMapFunction<TickWrapper, Realti
|
||||
accum[6]++; // tick count
|
||||
accumState.put(period, accum);
|
||||
if (accum[7] == 1 && caughtUp) {
|
||||
out.collect(toBar(tick.getTicker(), period, accum, false));
|
||||
needsOpenBarTimer = true;
|
||||
} else if (accum[7] == 0 && caughtUp) {
|
||||
LOG.debug("Open bar suppressed (valid=0, no seed): ticker={}, period={}s", tick.getTicker(), period);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Register a debounce timer for open bar emission (if not already pending).
|
||||
// The timer fires after DEBOUNCE_MS, emitting the final accumulated state once
|
||||
// for all ticks that arrived in the same Kafka poll batch.
|
||||
if (needsOpenBarTimer && pendingTimerTs.value() == null) {
|
||||
long timerTs = ctx.timerService().currentProcessingTime() + DEBOUNCE_MS;
|
||||
ctx.timerService().registerProcessingTimeTimer(timerTs);
|
||||
pendingTimerTs.update(timerTs);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fires after DEBOUNCE_MS: emits the current open bar state for all valid periods.
|
||||
* By this point the Kafka poll queue has drained, so this represents one combined
|
||||
* update for all ticks that arrived in the batch.
|
||||
*/
|
||||
@Override
|
||||
public void onTimer(long timestamp, OnTimerContext ctx, Collector<RealtimeBar> out) throws Exception {
|
||||
pendingTimerTs.clear();
|
||||
if (!caughtUp) return;
|
||||
|
||||
String ticker = ctx.getCurrentKey();
|
||||
long nowMs = System.currentTimeMillis();
|
||||
|
||||
for (int period : periods) {
|
||||
long[] accum = accumState.get(period);
|
||||
if (accum == null || accum[7] != 1) continue;
|
||||
|
||||
// Verify accumulator is still in the current window (guard against stale state)
|
||||
long periodMs = period * 1000L;
|
||||
long windowStart = (nowMs / periodMs) * periodMs;
|
||||
if (accum[5] != windowStart) continue;
|
||||
|
||||
out.collect(toBar(ticker, period, accum, false));
|
||||
LOG.debug("Debounced open bar emitted: ticker={}, period={}s", ticker, period);
|
||||
}
|
||||
}
|
||||
|
||||
private static long[] openWindow(TickWrapper tick, long windowStart, boolean valid) {
|
||||
|
||||
Reference in New Issue
Block a user