Apache Beam: data not triggered when using an event-time trigger

I’m trying to create a Dataflow pipeline that reads streaming data from Pub/Sub and processes it.
I want to use an event-time trigger so that the data is processed in the order it was published to Pub/Sub. Here is the code I’m using:

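```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

// pubsubMessage (a PCollection<KV<String, String>> read from Pub/Sub) and
// getInstant(...) are defined elsewhere in my pipeline.
PCollection<KV<String, String>> pColIn = pubsubMessage
    // Step "timestamp": re-stamp each element with its event time
    .apply("timestamp", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
      @ProcessElement
      public void processElement(@Element KV<String, String> in, OutputReceiver<KV<String, String>> out) {
        Instant inst = getInstant(in);
        out.outputWithTimestamp(in, inst);
      }
    }))
    // Step "Windows": 1-minute fixed windows, fired when the watermark passes the end of the window
    .apply("Windows", Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .withAllowedLateness(Duration.standardSeconds(5))
        .discardingFiredPanes()
    )
    // Step "Test1": print each element's timestamp
    .apply("Test1", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
      @ProcessElement
      public void processElement(@Element KV<String, String> in, @Timestamp Instant ts,
          OutputReceiver<KV<String, String>> out) {
        System.out.println("[Testing1]:" + ts);
        out.output(in);
      }
    }));

// Step "Group by Key / Window": group the values per key and window
PCollection<KV<String, Iterable<String>>> pColGrouped =
    pColIn.apply("Group by Key / Window", GroupByKey.<String, String>create());

// Step "Test2": print the timestamp of each grouped output
pColGrouped.apply("Test2", ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String, Iterable<String>>>() {
  @ProcessElement
  public void processElement(@Element KV<String, Iterable<String>> in, @Timestamp Instant ts,
      OutputReceiver<KV<String, Iterable<String>>> out) {
    System.out.println("[Testing2]:" + ts);
    out.output(in);
  }
}));
```
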
  1. After reading a message from Pub/Sub, I add a timestamp to its data (task timestamp).
  2. Then I create a fixed window that gets triggered repeatedly after 1 min (task Windows).
  3. I test the firing of the PCollection (task Test1).
  4. I group the elements of the PCollection by key (the first element of the KV) (task Group by Key / Window).
  5. Finally, I add a step to check whether the data is grouped (task Test2).
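To make the event-time rules configured in step 2 concrete, here is a plain-Java sketch (using only `java.time`, not the Beam API) of how a 1-minute fixed-window assignment, the `AfterWatermark.pastEndOfWindow()` firing condition and the 5-second allowed lateness relate to each other. The class and method names (`FixedWindowSketch`, `assign`, `onTimePaneCanFire`, `isDroppable`) are hypothetical, the default zero window offset is assumed, and the two conditions only approximate Beam's internal checks.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical model (NOT the Beam API) of the event-time rules used in the
 * "Windows" step, assuming FixedWindows with the default zero offset.
 */
class FixedWindowSketch {

    /** Half-open event-time window [start, end). */
    static final class Window {
        final Instant start;
        final Instant end;

        Window(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }

        /** Largest timestamp still inside the window (end - 1 ms), like Beam's maxTimestamp(). */
        Instant maxTimestamp() {
            return end.minusMillis(1);
        }
    }

    /** Assigns an event timestamp to the fixed window that contains it. */
    static Window assign(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch as well
        long start = ts - Math.floorMod(ts, sizeMs);
        return new Window(Instant.ofEpochMilli(start), Instant.ofEpochMilli(start + sizeMs));
    }

    /** The on-time pane can fire only once the watermark has passed the end of the window. */
    static boolean onTimePaneCanFire(Window w, Instant watermark) {
        return watermark.isAfter(w.maxTimestamp());
    }

    /** Data for the window becomes droppable once the watermark is past maxTimestamp + allowed lateness. */
    static boolean isDroppable(Window w, Duration allowedLateness, Instant watermark) {
        return w.maxTimestamp().plus(allowedLateness).isBefore(watermark);
    }
}
```

In this model the window assignment itself never "fires" anything; only comparing a watermark against the window end does, which in Beam happens when the windowed data reaches an aggregation such as the `GroupByKey`.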

I’m debugging the code and have put a breakpoint in each step. In the first step I set a timestamp in the future (I run the pipeline at 14:10:00.000Z and set the timestamp to 14:11:30.000Z).
In the 3rd step I get the message instantly (not after 1 minute).
In the 5th step I don’t get any message at all.
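For the timestamps in this debugging session, and assuming `FixedWindows` with its default zero offset, the window arithmetic for the test element works out as follows, where $W$ stands for the event-time watermark at which the `AfterWatermark.pastEndOfWindow()` trigger is evaluated:

```latex
\begin{aligned}
t &= \text{14:11:30.000Z} \\
w_{\text{start}} &= t - (t \bmod 1\,\text{min}) = \text{14:11:00.000Z} \\
w_{\text{end}} &= w_{\text{start}} + 1\,\text{min} = \text{14:12:00.000Z} \\
\text{on-time pane fires} &\iff W \ge w_{\text{end}} = \text{14:12:00.000Z}
\end{aligned}
```

So, under these assumptions, the grouped output for this element cannot appear until the watermark has advanced past 14:12:00.000Z, which is about two minutes after the 14:10:00.000Z start of the run if the watermark roughly tracks Pub/Sub publish time.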

Did I do something wrong in my code? Why don’t I get any data after the 5th step?
Thank you for your help.
EDIT
When I look at the PaneInfo, I see: NO_FIRING
