I am trying to learn Apache Beam and am building a sample project to learn stream processing. For now, I want to read from a Kafka topic "word" and print the data to the console.
I have deployed Flink (port 8081) as a standalone cluster and Kafka (port 9091) locally using Docker.
Since there is no proper documentation with a clear example of how to do this, I tried it on my own; you can find the code below.
package main

import (
	"context"
	"flag"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"

	"google.golang.org/appengine/log"
)

var (
	// expansionAddr = flag.String("expansion_addr", "",
	// 	"Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.")
	// bootstrapServers = flag.String("bootstrap_servers", "",
	// 	"(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")
	// topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.")
)

func init() {
	register.DoFn2x0[context.Context, []byte](&LogFn{})
}

// LogFn is a DoFn to log words.
type LogFn struct{}

// ProcessElement logs each element it receives.
func (fn *LogFn) ProcessElement(ctx context.Context, elm []byte) {
	log.Infof(ctx, "Word info: %v", string(elm))
}

// FinishBundle waits a bit so the job server finishes receiving logs.
func (fn *LogFn) FinishBundle() {
	time.Sleep(2 * time.Second)
}

func main() {
	flag.Parse()

	// Beam initialization.
	beam.Init()
	ctx := context.Background()

	// Create the pipeline object and root scope.
	pipeline := beam.NewPipeline()
	scope := pipeline.Root()

	// Reading from Kafka is not natively supported by the Beam Go SDK yet;
	// kafkaio achieves this via a cross-language transform backed by the Java Kafka connector.
	brokerAddr := ""
	bootstrapServer := "bootstrap-server:kafka-1:9091"
	topic := "word"

	// The input reader is our consumer, reading from the input topic as per the kafkaio docs.
	inputReader := kafkaio.Read(scope, brokerAddr, bootstrapServer, []string{topic})
	vals := beam.DropKey(scope, inputReader)
	beam.ParDo0(scope, &LogFn{}, vals)

	if _, err := flink.Execute(ctx, pipeline); err != nil {
		log.Errorf(ctx, "Failed to execute job: %v", err)
	}
}
I am getting this error while executing the code:
```
2023/09/10 01:06:49 Downloaded: C:\tmp\artifacts\beam-sdks-java-io-expansion-service-2.49.0-m4yWpU_pIZFwgP3wHCijYZg7hfO6Eg5-Dx3eXCSRTb0.jar (sha256: 9b8c96a54fe921917080fdf01c28a361983b85f3ba120e7e0f1dde5c24914dbd, size: 59370861)
panic: not an App Engine context
```
I went over the documentation but couldn't find much on this.
I also tried changing the context passed to flink.Execute() to an App Engine base context, but that doesn't seem to work either.
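The panic message suggests the failure comes from the logging calls rather than from flink.Execute(): google.golang.org/appengine/log expects a context created by the App Engine runtime, not a plain context.Background(). As a standalone sanity check, here is the same DoFn sketched with the standard library logger instead, which accepts any context. This is only a minimal sketch, not wired into Beam or Kafka:

```go
package main

import (
	"context"
	"fmt"
	"log"
)

// formatWord builds the log message; split out so it can be checked directly.
func formatWord(elm []byte) string {
	return fmt.Sprintf("Word info: %v", string(elm))
}

// LogFn is the same DoFn shape as above, but it logs with the standard
// library logger, which does not care what kind of context it gets.
type LogFn struct{}

// ProcessElement logs each element it receives.
func (fn *LogFn) ProcessElement(ctx context.Context, elm []byte) {
	log.Print(formatWord(elm))
}

func main() {
	// A plain context.Background() is fine here, unlike with appengine/log.
	(&LogFn{}).ProcessElement(context.Background(), []byte("hello"))
}
```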