Use a custom image for the Flink JobManager and TaskManager. Dockerfile:
FROM apache/flink:1.16.2-scala_2.12-java11
ARG FLINK_VERSION=1.16.2
ARG KAFKA_VERSION=2.8.0

# Build Python 3.7 from source (the Flink base image ships no Python), then
# install the matching PyFlink and a Kafka client. The Python minor version
# (3.7) must match the Beam SDK image copied in below.
RUN set -ex; \
    apt-get update && \
    apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
    wget -q https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
    tar -xf Python-3.7.9.tgz && \
    cd Python-3.7.9 && \
    # NOTE: the original used "--without-tests", which is not a CPython
    # configure option (configure just warns and ignores it) — dropped.
    ./configure --enable-shared && \
    make -j4 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
    ln -s /usr/local/bin/python3 /usr/local/bin/python && \
    ln -s /usr/local/bin/pip3 /usr/local/bin/pip && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* && \
    # --no-cache-dir keeps the pip wheel cache out of the image layers
    python -m pip install --no-cache-dir --upgrade pip; \
    pip install --no-cache-dir apache-flink==${FLINK_VERSION}; \
    pip install --no-cache-dir kafka-python
RUN pip install --no-cache-dir apache-beam[gcp]
# Copy files from the official SDK image, including the boot script and
# dependencies, so Flink task slots can run Beam SDK harness processes.
COPY --from=apache/beam_python3.7_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/
The deployment below also needs a volume to store staging artifacts, so create a PVC like the following (adjust the `storageClassName` to match your cluster):
# PVC shared by the JobManager, TaskManager, and SDK worker pool for
# Beam artifact staging (/tmp/beam-artifact-staging).
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: staging-artifacts-claim
  namespace: flink
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
  # Adjust to a storage class available in your cluster.
  storageClassName: standard
The deployment YAML that uses the custom Flink image should look like this:
# Service exposing the JobManager RPC, blob server, and web UI / REST port.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager
---
# Service in front of the Beam SDK worker-pool sidecar (see the
# taskmanager Deployment); pipelines reach it via environment_config.
apiVersion: v1
kind: Service
metadata:
  name: beam-worker-pool
  namespace: flink
spec:
  selector:
    app: flink
    component: taskmanager
  ports:
    - protocol: TCP
      port: 50000
      targetPort: 50000
      name: pool
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args: ["jobmanager"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          securityContext:
            # UID of the "flink" user in the official image.
            runAsUser: 9999
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          # FIX: runAsUser must be nested under securityContext; a bare
          # runAsUser key at container level is rejected by the API server.
          securityContext:
            runAsUser: 9999
        # Beam SDK worker-pool sidecar. FIX: the image must match the SDK
        # version and Python minor version used everywhere else in this
        # setup (Python 3.7 / Beam 2.48.0), and should be pinned — the
        # original used an untagged apache/beam_python3.11_sdk, which
        # cannot execute bundles from a 3.7-based pipeline.
        - name: beam-worker-pool
          image: apache/beam_python3.7_sdk:2.48.0
          args: ["--worker_pool"]
          ports:
            - containerPort: 50000
              name: pool
          livenessProbe:
            tcpSocket:
              port: 50000
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
      # FIX: the original listed these volume entries without the
      # enclosing "volumes:" key, which is invalid.
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
Here are the Flink pipeline options:
# Beam pipeline options for submitting to the Flink cluster defined above.
# NOTE(review): requires
#   from apache_beam.options.pipeline_options import PipelineOptions
# which is not shown here — confirm it is imported where this snippet runs.
flink_options = PipelineOptions([
# Submit via the Flink REST endpoint: the flink-jobmanager Service, web UI port.
"--runner=FlinkRunner",
"--flink_master=flink-jobmanager:8081",
# Use the pre-started external SDK worker pool (the beam-worker-pool Service,
# port 50000) instead of having Flink launch Docker containers per worker.
"--environment_type=EXTERNAL",
"--environment_config=beam-worker-pool:50000",
])
Run the Beam code from another container — a CronJob, Job, or Deployment. To run Beam code with the Python SDK, build an image containing your Python code, and make sure to base it on apache/beam_python3.7_sdk:2.48.0.
Also install Java in that image so the expansion service can start locally; otherwise Beam will fall back to Docker.
Please adjust the SDK version according to your needs.