I'm trying to create a Dataflow pipeline that reads streaming data from Pub/Sub and processes it.
I want to use an event-time trigger to process the data in the order it was published to Pub/Sub. Here is the code I'm using:
PCollection<KV<String, String>> pColIn = pubsubMessage
    .apply("timestamp", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
      @ProcessElement
      public void processElement(@Element KV<String, String> in, OutputReceiver<KV<String, String>> out) {
        // getInstant (not shown) returns the timestamp to assign to the element
        Instant inst = getInstant(in);
        out.outputWithTimestamp(in, inst);
      }
    }))
    .apply("Windows", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(
            Repeatedly.forever(
                AfterWatermark.pastEndOfWindow()
            )
        )
        .withAllowedLateness(Duration.standardSeconds(5))
        .discardingFiredPanes()
    )
    .apply("Test1", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
      @ProcessElement
      public void processElement(@Element KV<String, String> in, @Timestamp Instant ts, OutputReceiver<KV<String, String>> out) {
        // Print the element's event timestamp
        System.out.println("[Testing1]:" + ts);
        out.output(in);
      }
    }));
PCollection<KV<String, Iterable<String>>> pColGrouped = pColIn.apply("Group by Key / Window", GroupByKey.create());
pColGrouped.apply("Test2", ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String, Iterable<String>>>() {
  @ProcessElement
  public void processElement(@Element KV<String, Iterable<String>> in, @Timestamp Instant ts, OutputReceiver<KV<String, Iterable<String>>> out) {
    // Print the timestamp of the grouped output
    System.out.println("[Testing2]:" + ts);
    out.output(in);
  }
}));
- After reading a message from Pub/Sub, I attach a timestamp to it (step "timestamp").
- Then I create a one-minute fixed window with a trigger that fires repeatedly after the end of the window (step "Windows").
- I test the firing of the PCollection (step "Test1").
- I group the elements of the PCollection by key, i.e. the first element of the KV (step "Group by Key / Window").
- Finally, I add a step to check whether the data is grouped (step "Test2").
I'm debugging the code and have put a breakpoint in each step. In the first step I assign a timestamp in the future (I run the pipeline at 14:10:00.000Z and set the timestamp to 14:11:30.000Z).
In the 3rd step I get the message instantly (not after 1 minute).
In the 5th step I don't get any message.
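To make the timing concrete, here is a minimal sketch in plain Java (no Beam dependency) of which one-minute window that timestamp would fall into. `FixedWindowSketch` and its methods are hypothetical names used only for illustration, the date is arbitrary, and the sketch assumes the fixed windows are aligned to the epoch with no offset:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Beam: epoch-aligned fixed-window arithmetic.
final class FixedWindowSketch {
    // Inclusive start of the fixed window that contains ts.
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = ts.toEpochMilli();
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    // Exclusive end of the fixed window that contains ts.
    static Instant windowEnd(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    public static void main(String[] args) {
        // Only the time of day comes from the question; the date is arbitrary.
        Instant ts = Instant.parse("2020-01-01T14:11:30.000Z");
        Duration oneMinute = Duration.ofMinutes(1);
        System.out.println("window start: " + windowStart(ts, oneMinute));
        System.out.println("window end:   " + windowEnd(ts, oneMinute));
    }
}
```

If that assumption holds, the element stamped 14:11:30.000Z belongs to the window from 14:11:00Z to 14:12:00Z, so the grouped output would presumably appear only once the watermark passes the end of that window.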
Did I do something wrong in my code? Why don't I get any data in the 5th step?
Thank you for your help.
EDIT
When I look at the PaneInfo, I see: NO_FIRING