使用工作队列进行精细并行处理

在此示例中,您将运行一个 Kubernetes Job,它将多个并行任务作为工作进程运行,每个进程都作为单独的 Pod 运行。

在此示例中,每个 Pod 创建时都会从任务队列中获取一个工作单元,进行处理,然后重复此过程,直到队列末尾。

以下是本示例中步骤概述:

  1. 启动一个存储服务以保存工作队列。 在此示例中,您将使用 Redis 来存储工作项。在 上一个示例 中,您使用了 RabbitMQ。在此示例中,您将使用 Redis 和一个自定义工作队列客户端库;这是因为 AMQP 没有为客户端提供一个好的方法来检测有限长度工作队列何时为空。实际上,您会设置一个像 Redis 这样的存储,并为许多作业的工作队列和其他事物重复使用它。
  2. 创建一个队列并用消息填充它。 每条消息代表一个要完成的任务。在此示例中,消息是一个整数,我们将对其进行长时间计算。
  3. 启动一个在队列中处理任务的 Job。 Job 启动多个 Pod。每个 Pod 从消息队列中获取一个任务,处理它,然后重复此过程,直到队列末尾。

开始之前

您需要有一个 Kubernetes 集群,并且 kubectl 命令行工具必须配置为与您的集群通信。建议在至少有两个节点(不充当控制平面主机)的集群上运行本教程。如果您还没有集群,可以使用 minikube 创建一个,或者您可以使用以下 Kubernetes 游乐场之一:

您需要一个容器镜像注册表,您可以在其中上传镜像以在您的集群中运行。此示例使用 Docker Hub,但您可以将其改编为其他容器镜像注册表。

此任务示例还假设您在本地安装了 Docker。您使用 Docker 构建容器镜像。

熟悉 Job 的基本用法(非并行)。

启动 Redis

为了简单起见,在此示例中,您将启动一个 Redis 实例。有关以可扩展和冗余的方式部署 Redis 的示例,请参见 Redis 示例

您也可以直接下载以下文件:

要启动一个 Redis 实例,您需要创建 redis pod 和 redis service:

kubectl apply -f https://k8s.io/examples/application/job/redis/redis-pod.yaml
kubectl apply -f https://k8s.io/examples/application/job/redis/redis-service.yaml

用任务填充队列

现在让我们用一些“任务”填充队列。在此示例中,任务是将要打印的字符串。

启动一个临时交互式 Pod 来运行 Redis CLI。

kubectl run -i --tty temp --image redis --command "/bin/sh"
Waiting for pod default/redis2-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt

现在按回车键,启动 Redis CLI 并创建一个包含一些工作项的列表。

redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"

因此,具有键 job2 的列表将是工作队列。

注意:如果您没有正确设置 Kube DNS,您可能需要将上述代码块的第一步更改为 redis-cli -h $REDIS_SERVICE_HOST

创建容器镜像

现在,您已准备好创建一个将处理该队列中工作的镜像。

您将使用一个带有 Redis 客户端的 Python 工作程序,从消息队列中读取消息。

提供了一个简单的 Redis 工作队列客户端库,名为 rediswq.py (下载)。

Job 的每个 Pod 中的“工作程序”程序使用工作队列客户端库来获取工作。以下是该程序:

#!/usr/bin/env python

import time
import rediswq

host="redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " +  q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
  item = q.lease(lease_secs=10, block=True, timeout=2) 
  if item is not None:
    itemstr = item.decode("utf-8")
    print("Working on " + itemstr)
    time.sleep(10) # Put your actual work here instead of sleep.
    q.complete(item)
  else:
    print("Waiting for work")
print("Queue empty, exiting")

您也可以下载 worker.pyrediswq.pyDockerfile 文件,然后构建容器镜像。以下是如何使用 Docker 进行镜像构建的示例:

docker build -t job-wq-2 .

推送镜像

对于 Docker Hub,请使用您的用户名标记您的应用程序镜像,并使用以下命令将其推送到 Hub。请将 <username> 替换为您的 Hub 用户名。

docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2

您需要推送到公共存储库或 配置您的集群以能够访问您的私有存储库

定义一个 Job

以下是要创建的 Job 的清单:

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure

在此示例中,每个 Pod 处理队列中的多个项目,然后在没有更多项目时退出。由于工作程序本身检测到工作队列何时为空,而 Job 控制器不知道工作队列,因此它依赖于工作程序来发出工作完成的信号。工作程序通过成功退出来发出队列为空的信号。因此,只要任何工作程序成功退出,控制器就知道工作已完成,并且 Pod 将很快退出。因此,您需要将 Job 的完成计数留空。作业控制器将等待其他 Pod 也完成。

运行 Job

因此,现在运行 Job:

# this assumes you downloaded and then edited the manifest already
kubectl apply -f ./job.yaml

现在等待片刻,然后检查 Job:

kubectl describe jobs/job-wq-2
Name:             job-wq-2
Namespace:        default
Selector:         controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-2
Annotations:      <none>
Parallelism:      2
Completions:      <unset>
Start Time:       Mon, 11 Jan 2022 17:07:59 +0000
Pods Statuses:    1 Running / 0 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                job-name=job-wq-2
  Containers:
   c:
    Image:              container-registry.example/exampleproject/job-wq-2
    Port:
    Environment:        <none>
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen    LastSeen    Count    From            SubobjectPath    Type        Reason            Message
  ---------    --------    -----    ----            -------------    --------    ------            -------
  33s          33s         1        {job-controller }                Normal      SuccessfulCreate  Created pod: job-wq-2-lglf8

您可以等待 Job 成功完成,并设置一个超时时间:

# The check for condition name is case insensitive
kubectl wait --for=condition=complete --timeout=300s job/job-wq-2
kubectl logs pods/job-wq-2-7r7b2
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
Working on lemon

如您所见,此 Job 的一个 Pod 处理了多个工作单元。

替代方案

如果您运行队列服务或修改容器以使用工作队列不方便,您可能需要考虑其他 作业模式 之一。

如果您有持续的后台处理工作流要运行,请考虑使用 ReplicaSet 运行您的后台工作程序,并考虑运行一个后台处理库,例如 https://github.com/resque/resque

上次修改时间:2024 年 3 月 16 日凌晨 2:39 PST:修复了并行处理工作队列任务的文档。 (bed970676c)