Running a Go-based Apache Beam job on Flink with the portable runner

I am learning Apache Beam and building a sample project to practice stream processing. For now, I want to read from a Kafka topic named “word” and print the data to the console.
I have deployed Flink (port 8081) as a standalone cluster and Kafka (port 9091) locally using Docker.

Since I could not find documentation with a clear example of this, I put together my own attempt. The code is below.

package main

import (
    "context"
    "flag"
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
    "google.golang.org/appengine/log"
)

var (
//  expansionAddr = flag.String("expansion_addr", "",
//      "Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
//  bootstrapServers = flag.String("bootstrap_servers", "",
//      "(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
//  topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")

)

func init() {
    register.DoFn2x0[context.Context, []byte](&LogFn{})
}

// LogFn is a DoFn that logs each word read from Kafka.
type LogFn struct{}

// ProcessElement logs each element it receives.
func (fn *LogFn) ProcessElement(ctx context.Context, elm []byte) {
    log.Infof(ctx, "Word info: %v", string(elm))
}

// FinishBundle waits a bit so the job server finishes receiving logs.
func (fn *LogFn) FinishBundle() {
    time.Sleep(2 * time.Second)
}

func main() {
    flag.Parse()
    //beam initialization
    beam.Init()
    ctx := context.Background()
    //creating pipeline object and scope
    pipeline := beam.NewPipeline()
    scope := pipeline.Root()

    // Read from Kafka. The Go SDK has no native Kafka connector as of now;
    // kafkaio is a cross-language transform that relies on the Java Kafka connector.

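    // Kafka connection details.
    // brokerAddr is passed to kafkaio.Read as the expansion service address; when it is
    // empty, the SDK attempts to start an appropriate expansion service automatically.
    brokerAddr := ""
    bootstrapServer := "bootstrap-server:kafka-1:9091"
    topic := "word"
    // inputReader is the consumer that reads from the input topic, defined as per the kafkaio docs.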
    inputReader := kafkaio.Read(scope, brokerAddr, bootstrapServer, []string{topic})
    vals := beam.DropKey(scope, inputReader)
    beam.ParDo0(scope, &LogFn{}, vals)

    if _, err := flink.Execute(ctx, pipeline); err != nil {
        log.Errorf(ctx, "Failed to execute job: %v", err)
    }
}

I get this error when I run the code:

`
2023/09/10 01:06:49 Downloaded: C:\tmp\artifacts\beam-sdks-java-io-expansion-service-2.49.0-m4yWpU_pIZFwgP3wHCijYZg7hfO6Eg5-Dx3eXCSRTb0.jar (sha256: 9b8c96a54fe921917080fdf01c28a361983b85f3ba120e7e0f1dde5c24914dbd, size: 59370861)
panic: not an App Engine context

`

I went through the documentation but could not find much about this error.
I also tried passing an App Engine base context to flink.Execute(), but that did not work either.
