How to submit Beam Python job onto Kubernetes with Flink runner?

I want to run a continuous stream processing job using Beam on a Flink runner within Kubernetes. I've been following this tutorial (https://python.plainenglish.io/apache-beam-flink-cluster-kubernetes-python-a1965f37b7cb), but I'm not sure what the author is referring to when he talks about the "flink master container". I don't understand how I am supposed to submit my Python code to the cluster when that code is defined within a container image itself.

The Kubernetes Flink cluster architecture looks like this:

  • single JobManager, exposes the Flink web UI via a Service and Ingress

  • multiple Task Managers, each running 2 containers:

    • Flink task manager
    • Beam worker pool, which exposes port 50000

The Python code in the example tutorial has Beam configuration which looks like this:

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.10",
    "--flink_master=localhost:8081",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000"
])

It's clear that when you run this locally as per the tutorial, it speaks to the Beam worker pool to launch the application. However, if I have a Docker image containing my application code and I want to start this application within Kubernetes, where do I deploy this image in my Kubernetes cluster? Is it as a container within each Task Manager pod (and therefore using localhost:50000 to communicate with Beam)? Or do I create a single pod containing my application code and point that pod at port 50000 of my Task Managers - if so, is the fact that I have multiple Task Managers a problem?

Any pointers to documentation or examples would be really helpful. This other SO question has an incomplete answer.

Twayblade asked 18/7, 2023 at 12:8

The Flink Kubernetes Operator does offer a Beam example that solves most of the tooling issues you are facing. It is written for Java, but by adding your Python source code to the Docker image you should be able to achieve what you are looking for.
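
For illustration, a minimal sketch of that approach (the base image name and file path are placeholders, not taken from the operator's example):

# Hypothetical image built from the operator's Beam example
FROM beam-flink-example:latest
# Add your pipeline source so it is available inside the cluster
COPY my_pipeline.py /opt/pipeline/my_pipeline.py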

Misconstrue answered 25/7, 2023 at 10:12

Use a custom image for both the Flink JobManager and TaskManager.

Dockerfile

FROM apache/flink:1.16.2-scala_2.12-java11
ARG FLINK_VERSION=1.16.2
ARG KAFKA_VERSION=2.8.0

# Install python3.7 
RUN set -ex; \
  apt-get update && \
  apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
  wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
  tar -xvf Python-3.7.9.tgz && \
  cd Python-3.7.9 && \
  ./configure --without-tests --enable-shared && \
  make -j4 && \
  make install && \
  ldconfig /usr/local/lib && \
  cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
  ln -s /usr/local/bin/python3 /usr/local/bin/python && \
  ln -s /usr/local/bin/pip3 /usr/local/bin/pip && \
  apt-get clean && \
  rm -rf /var/lib/apt/lists/* && \
  python -m pip install --upgrade pip; \
  pip install apache-flink==${FLINK_VERSION}; \
  pip install kafka-python

RUN pip install --no-cache-dir apache-beam[gcp]

# Copy files from official SDK image, including script/dependencies.
COPY --from=apache/beam_python3.7_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/
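
The Deployments below reference this image as custom-flink:latest with imagePullPolicy: IfNotPresent, which assumes the image is already available on the cluster nodes. A typical local workflow (the tag and the minikube command are assumptions about your setup) is:

docker build -t custom-flink:latest .
# For a local cluster such as minikube, load the image onto the node:
minikube image load custom-flink:latest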

The deployment above also needs a volume to store staging artifacts, so create a PVC like the one below and adjust the storageClassName to match your cluster:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: staging-artifacts-claim
  namespace: flink
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
  storageClassName: standard

The deployment YAML that uses the custom Flink image should look like this:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: beam-worker-pool
  namespace: flink
spec:
  selector:
    app: flink
    component: taskmanager
  ports:
    - protocol: TCP
      port: 50000
      targetPort: 50000
      name: pool
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: custom-flink:latest
        imagePullPolicy: IfNotPresent
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: flink-staging
          mountPath: /tmp/beam-artifact-staging 
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: flink-staging
        persistentVolumeClaim:
          claimName:  staging-artifacts-claim 
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: custom-flink:latest
        imagePullPolicy: IfNotPresent
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: flink-staging
          mountPath: /tmp/beam-artifact-staging
        securityContext:
          runAsUser: 9999
      - name: beam-worker-pool
        image: apache/beam_python3.7_sdk:2.48.0  # must match the SDK version/Python used to submit the pipeline
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-staging
          mountPath: /tmp/beam-artifact-staging 
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: flink-staging 
        persistentVolumeClaim:
          claimName:  staging-artifacts-claim
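
Note that both Deployments mount a flink-config ConfigMap (flink-conf.yaml and log4j-console.properties) that is not shown here; it must exist in the flink namespace before the pods can start. Assuming the manifests above are saved under these (hypothetical) file names, applying them looks like:

kubectl create namespace flink
kubectl apply -f staging-pvc.yaml
kubectl apply -f flink-session.yaml
# wait until the jobmanager and taskmanager pods are Running
kubectl get pods -n flink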

Here are the Flink pipeline options:

flink_options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=flink-jobmanager:8081",
    "--environment_type=EXTERNAL",
    "--environment_config=beam-worker-pool:50000",
])

Run the Beam driver code from a separate container, e.g. a CronJob, Job, or Deployment. To submit Beam code with the Python SDK, build an image containing your Python code, base it on apache/beam_python3.7_sdk:2.48.0, and install Java in it so the expansion service can start locally; otherwise it will fall back to Docker.
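
As a concrete illustration of that last step, here is a minimal sketch of a driver image; the file names are placeholders, and it assumes your pipeline uses the options shown above:

FROM apache/beam_python3.7_sdk:2.48.0

# Java is needed so the expansion service used by cross-language transforms
# (e.g. Kafka IO) can start locally; otherwise Beam tries to use a Docker-based
# environment, which is not available inside a pod.
RUN apt-get update && \
    apt-get install -y --no-install-recommends default-jre-headless && \
    rm -rf /var/lib/apt/lists/*

COPY my_pipeline.py /opt/pipeline/my_pipeline.py
ENTRYPOINT ["python", "/opt/pipeline/my_pipeline.py"]

A one-shot submission can then be wrapped in a Kubernetes Job (the image name and namespace are assumptions):

apiVersion: batch/v1
kind: Job
metadata:
  name: beam-pipeline-submit
  namespace: flink
spec:
  backoffLimit: 0
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: submit
        image: my-beam-pipeline:latest
        imagePullPolicy: IfNotPresent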

Adjust the SDK version according to your needs.

Tzong answered 4/8, 2023 at 13:40

The Beam example from the Flink Kubernetes Operator assumes application deployment mode, and it didn't work for me. I had to create a session cluster and submit a Python Beam pipeline using a Kubernetes Job. I wrote a post about what I did:

Deploy Python Stream Processing App on Kubernetes - Part 2 Beam Pipeline on Flink Runner
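
For orientation, a session cluster under the operator is a FlinkDeployment without a job section, roughly like the sketch below; the image and resource values are placeholders, and the Beam worker-pool sidecar described earlier would still need to be added via the operator's pod template support:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: beam-session-cluster
  namespace: flink
spec:
  image: custom-flink:latest   # image with Python and the Beam SDK baked in
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1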

Hope this helps.

Acanthus answered 2/7 at 10:44
