This repository has been archived by the owner on Sep 2, 2022. It is now read-only.

Testing the Flink Operator with Apache Kafka

Developers and users often want to try out the Flink Operator quickly with a long-running streaming application and test features such as taking savepoints. The WordCount example included in the Flink release cannot do the job, because it exits after processing the input file. For this you need a streaming data source (e.g., an Apache Kafka cluster), a streaming data generator, and a Flink streaming application. This document describes how to set up such a test environment.

Prerequisites

  • a running Kubernetes cluster with enough capacity
  • a running Flink Operator in the cluster
  • Helm 3+ installed on your local machine

Steps

1. Install Kafka

Create the kafka namespace and install Kafka, together with ZooKeeper, in it:

kubectl create ns kafka
helm repo add incubator https://charts.helm.sh/incubator
helm install my-kafka incubator/kafka --namespace kafka

After that, the Kafka broker service will be available at my-kafka.kafka.svc.cluster.local:9092. Run the following command to view more details:

helm status my-kafka -n kafka
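The broker address above follows the Kubernetes service DNS pattern `<service>.<namespace>.svc.cluster.local:<port>`. As a minimal sketch, the hypothetical helper below (`kafka_bootstrap_address` is not part of Helm or the operator) composes that address, assuming the default cluster domain `cluster.local` and that the service is named after the Helm release, as it is for my-kafka here:

```shell
#!/bin/sh
# Hypothetical helper: build the in-cluster address of a Kafka service.
# ASSUMPTION: the cluster uses the default DNS domain "cluster.local".
kafka_bootstrap_address() {
  service="$1"        # service name, e.g. the Helm release name
  namespace="$2"      # namespace the release was installed into
  port="${3:-9092}"   # broker port; 9092 is the value used in this guide
  printf '%s.%s.svc.cluster.local:%s\n' "$service" "$namespace" "$port"
}
```

Calling `kafka_bootstrap_address my-kafka kafka` yields the value passed as `--bootstrap.servers` in the manifests of the following steps.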

2. Ingest streaming data into Kafka

Deploy the ClickGenerator application from the Flink Operations Playground to write data to the Kafka cluster.

You can build a Docker image from the playground's Dockerfile or use the existing image functicons/flink-ops-playground:2-FLINK-1.9-scala_2.11 in a deployment manifest.

kafka_click_generator.yaml:

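# apps/v1beta1 Deployments were removed in Kubernetes 1.16; apps/v1 requires spec.selector.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-click-generator
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-click-generator
  template: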
    metadata:
      labels:
        app: kafka-click-generator
    spec:
      containers:
        - name: kafka-click-generator
          image: functicons/flink-ops-playground:2-FLINK-1.9-scala_2.11
          command: ["java"]
          args:
            - "-classpath"
            - "/opt/ClickCountJob.jar:/opt/flink/lib/*"
            - "org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator"
            - "--bootstrap.servers"
            - "my-kafka.kafka.svc.cluster.local:9092"
            - "--topic"
            - "input"

Then apply the manifest:

kubectl apply -f kafka_click_generator.yaml
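Before moving on, it can help to confirm that the generator pod actually started. A minimal sketch, assuming the deployment was applied to your current namespace; `wait_for_generator` is a hypothetical wrapper name and the 120-second timeout is an arbitrary choice:

```shell
#!/bin/sh
# Hypothetical wrapper: block until the generator deployment has rolled out.
# ASSUMPTION: the deployment lives in the current kubectl namespace.
wait_for_generator() {
  timeout="${1:-120s}"   # arbitrary default, adjust for slow image pulls
  kubectl rollout status deployment/kafka-click-generator "--timeout=${timeout}"
}
```

If the rollout does not complete, `kubectl logs deployment/kafka-click-generator` is a reasonable next place to look.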

3. Run streaming application with the operator

Now you can create a Flink job cluster custom resource (CR) that runs the ClickEventCount application.

flinkcluster_clickcount.yaml:

apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flinkcluster-clickcount
spec:
  image:
    name: functicons/flink-ops-playground:2-FLINK-1.9-scala_2.11
  jobManager:
    ports:
      ui: 8081
    resources:
      limits:
        memory: "2Gi"
        cpu: "200m"
  taskManager:
    replicas: 2
    resources:
      limits:
        memory: "2Gi"
        cpu: "200m"
  job:
    jarFile: /opt/ClickCountJob.jar
    className: org.apache.flink.playgrounds.ops.clickcount.ClickEventCount
    args:
      [
        "--bootstrap.servers",
        "my-kafka.kafka.svc.cluster.local:9092",
        "--checkpointing",
        "--event-time",
      ]
    parallelism: 2

Then run the following command to launch the streaming application:

kubectl apply -f flinkcluster_clickcount.yaml

After that, you can check the status of the Flink cluster and job with:

kubectl describe flinkclusters flinkcluster-clickcount
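For scripted checks, a single status field is easier to consume than the full describe output. The sketch below assumes the operator reports a top-level `.status.state` field on the FlinkCluster resource; this field name is an assumption, so confirm it against the describe output above. `flinkcluster_state` is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper: print one status field of a FlinkCluster resource.
# ASSUMPTION: the operator populates ".status.state"; pass another
# JSONPath expression as the second argument if your version differs.
flinkcluster_state() {
  name="${1:-flinkcluster-clickcount}"
  path="$2"
  [ -n "$path" ] || path='{.status.state}'
  kubectl get flinkclusters "$name" -o "jsonpath=${path}"
}
```

Running `flinkcluster_state` with no arguments queries the cluster created in this step.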

4. Tear down

Delete the FlinkCluster custom resource:

kubectl delete flinkclusters flinkcluster-clickcount

Delete the ClickGenerator deployment:

kubectl delete deployments kafka-click-generator

Delete Kafka:

helm uninstall my-kafka -n kafka
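The three teardown commands can also be collected into one function. This is a sketch, not part of the operator tooling: `teardown_kafka_test` is a hypothetical name, and the `--ignore-not-found` flag is an addition that lets the kubectl deletions be re-run without failing on resources that are already gone.

```shell
#!/bin/sh
# Hypothetical helper: run the teardown steps of this guide in order.
# The Flink job is removed first so it stops consuming from Kafka
# before the generator and the brokers go away.
teardown_kafka_test() {
  kubectl delete flinkclusters flinkcluster-clickcount --ignore-not-found
  kubectl delete deployments kafka-click-generator --ignore-not-found
  helm uninstall my-kafka -n kafka
}
```

The kafka namespace created in step 1 is left in place, matching the steps above.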