I am trying to print the datastream
by applying the tumbling process window
for every 5
seconds. Since I couldn’t implement the custom deserializer for now, I created the process function which returns the result as tuple, and as per this documentation link I could link the process function with the windowing operation so I tried this out:
def get_data(self):
source = self.__get_kafka_source()
ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source").window_all(
TumblingProcessingTimeWindows.of(Time.seconds(5)))
ds.process(ExtractingRecordAttributes(),
output_type=Types.TUPLE(
[Types.STRING(), Types.STRING(),
Types.STRING()])).print()
self.env.execute("source")
def __get_kafka_source(self):
source = KafkaSource.builder() \
.set_bootstrap_servers("localhost:9092") \
.set_topics("test-topic1") \
.set_group_id("my-group") \
.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
return source
class ExtractingRecordAttributes(KeyedProcessFunction):
def __init__(self):
pass
def process_element(self, value: str, ctx: 'KeyedProcessFunction.Context'):
parts = UserData(*ast.literal_eval(value))
result = (parts.user, parts.rank, str(ctx.timestamp()))
yield result
def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
yield "On timer timestamp: " + str(timestamp)
When I trigger the get_data
method, it gives me the below error:
return self._wrapped_function.process(self._internal_context, input_data)
AttributeError: 'ExtractingRecordAttributes' object has no attribute 'process'
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:61)
at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:421)
... 7 more
If I don’t use window_all, everything works fine. But the moment I introduce it, it fails. What am I doing wrong here? Any hints would be helpful.
I am using Pyflink 1.17.1
TIA.
The problem is for windowing
you need to use ProcessWindowFunction
and not ProcessFunction
.
Ideally, your code should look like this:
class ExtractingRecordAttributes(ProcessWindowFunction):
def __init__(self):
pass
def process(self, key: str,
context: 'ProcessWindowFunction.Context',
elements: Iterable[Tuple[str, str]]) -> Iterable[Tuple[str, str, str]]:
result = ""
for element in elements:
parts = UserData(*ast.literal_eval(str(element)))
result = (parts.user, parts.rank, str(context.current_processing_time()))
yield result
def close(self):
pass
I haven’t tested this, but I think it should work. Here is the reference from the documentation.