Apache Beam portable runner to run golang based job on flink

I am trying to learn Apache Beam and trying to create a sample project to learn stream processing. For now, I want to read from a Kafka topic “word” and print the data on console.
I have deployed flink(8081) as a standalone cluster and kafka(port 9091) on local using docker.

Since there is no proper documentation with clear example to do this I tried my own, you can find the code below.

package main

import (


var (
//  expansionAddr = flag.String("expansion_addr", "",
//      "Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
//  bootstrapServers = flag.String("bootstrap_servers", "",
//      "(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
//  topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")


func init() {
    register.DoFn2x0[context.Context, []byte](&LogFn{})

// LogFn is a DoFn to log rides.
type LogFn struct{}

// ProcessElement logs each element it receives.
func (fn *LogFn) ProcessElement(ctx context.Context, elm []byte) {
    log.Infof(ctx, "Word info: %v", string(elm))

// FinishBundle waits a bit so the job server finishes receiving logs.
func (fn *LogFn) FinishBundle() {
    time.Sleep(2 * time.Second)

func main() {
    //beam initialization
    ctx := context.Background()
    //creating pipeline object and scope
    pipeline := beam.NewPipeline()
    scope := pipeline.Root()

    //reading from kafka IO --> This is not a native support as of now for beam and golang.
    //it uses a cross-compiled library from java to acheive the kafka connector

    //defining kafka details
    brokerAddr := ""
    bootstrapServer := "bootstrap-server:kafka-1:9091"
    topic := "word"
    // input reader is our consumer which reads from the input topic, this is defined as per kafkaio docs.
    inputReader := kafkaio.Read(scope, brokerAddr, bootstrapServer, []string{topic})
    vals := beam.DropKey(scope, inputReader)
    beam.ParDo0(scope, &LogFn{}, vals)

    if _, err := flink.Execute(ctx, pipeline); err != nil {
        log.Errorf(ctx, "Failed to execute job: %v", err)

I am getting this error while executing the code.

2023/09/10 01:06:49 Downloaded: C:\tmp\artifacts\beam-sdks-java-io-expansion-service-2.49.0-m4yWpU_pIZFwgP3wHCijYZg7hfO6Eg5-Dx3eXCSRTb0.jar (sha256: 9b8c96a54fe921917080fdf01c28a361983b85f3ba120e7e0f1dde5c24914dbd, size: 59370861)
panic: not an App Engine context


Went over the documentation but couldn’t find anything much on this.
Tried changing the context in flink.Execute() to appengine base context as well but doesn’t seem to work.

Leave a Comment