Kubernetes 使用工作隊列進行精細的并行處理

2022-06-15 09:59 更新

使用工作隊列進行精細的并行處理

在這個例子中,我們會運行一個Kubernetes Job,其中的 Pod 會運行多個并行工作進程。

在這個例子中,當每個pod被創(chuàng)建時,它會從一個任務(wù)隊列中獲取一個工作單元,處理它,然后重復(fù),直到到達隊列的尾部。

下面是這個示例的步驟概述:

  1. 啟動存儲服務(wù)用于保存工作隊列。 在這個例子中,我們使用 Redis 來存儲工作項。 在上一個例子中,我們使用了 RabbitMQ。 在這個例子中,由于 AMQP 不能為客戶端提供一個良好的方法來檢測一個有限長度的工作隊列是否為空, 我們使用了 Redis 和一個自定義的工作隊列客戶端庫。 在實踐中,你可能會設(shè)置一個類似于 Redis 的存儲庫,并將其同時用于多項任務(wù)或其他事務(wù)的工作隊列。
  2. 創(chuàng)建一個隊列,然后向其中填充消息。 每個消息表示一個將要被處理的工作任務(wù)。 在這個例子中,消息是一個我們將用于進行長度計算的整數(shù)。
  3. 啟動一個 Job 對隊列中的任務(wù)進行處理。這個 Job 啟動了若干個 Pod 。 每個 Pod 從消息隊列中取出一個工作任務(wù),處理它,然后重復(fù),直到到達隊列的尾部。

在開始之前

你必須擁有一個 Kubernetes 的集群,同時你的 Kubernetes 集群必須帶有 kubectl 命令行工具。 建議在至少有兩個節(jié)點的集群上運行本教程,且這些節(jié)點不作為控制平面主機。 如果你還沒有集群,你可以通過 Minikube 構(gòu)建一個你自己的集群,或者你可以使用下面任意一個 Kubernetes 工具構(gòu)建:

您的 Kubernetes 服務(wù)器版本必須不低于版本 v1.8. 要獲知版本信息,請輸入 ?kubectl version?。

熟悉基本的、非并行的 Job。

啟動 Redis

對于這個例子,為了簡單起見,我們將啟動一個單實例的 Redis。 了解如何部署一個可伸縮、高可用的 Redis 例子,請查看 Redis 示例

你也可以直接下載如下文件:

使用任務(wù)填充隊列

現(xiàn)在,讓我們往隊列里添加一些“任務(wù)”。在這個例子中,我們的任務(wù)是一些將被打印出來的字符串。

啟動一個臨時的可交互的 pod 用于運行 Redis 命令行界面。

kubectl run -i --tty temp --image redis --command "/bin/sh"
Waiting for pod default/redis2-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt

現(xiàn)在按回車鍵,啟動 redis 命令行界面,然后創(chuàng)建一個存在若干個工作項的列表。

# redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"

因此,這個鍵為 ?job2 ?的列表就是我們的工作隊列。

注意:如果你還沒有正確地配置 Kube DNS,你可能需要將上面的第一步改為 ?redis-cli -h $REDIS_SERVICE_HOST?。

創(chuàng)建鏡像

現(xiàn)在我們已經(jīng)準備好創(chuàng)建一個我們要運行的鏡像

我們會使用一個帶有 redis 客戶端的 python 工作程序從消息隊列中讀出消息。

這里提供了一個簡單的 Redis 工作隊列客戶端庫,叫 rediswq.py (下載)。

Job 中每個 Pod 內(nèi)的 “工作程序” 使用工作隊列客戶端庫獲取工作。如下:

#!/usr/bin/env python

import time
import rediswq

host="redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " +  q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
  item = q.lease(lease_secs=10, block=True, timeout=2)
  if item is not None:
    itemstr = item.decode("utf-8")
    print("Working on " + itemstr)
    time.sleep(10) # Put your actual work here instead of sleep.
    q.complete(item)
  else:
    print("Waiting for work")
print("Queue empty, exiting")

你也可以下載 worker.py、 rediswq.py 和 Dockerfile。然后構(gòu)建鏡像:

docker build -t job-wq-2 .

Push 鏡像

對于 Docker Hub,請先用你的用戶名給鏡像打上標簽, 然后使用下面的命令 push 你的鏡像到倉庫。請將 ?<username>? 替換為你自己的 Hub 用戶名。

docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2

你需要將鏡像 push 到一個公共倉庫或者 配置集群訪問你的私有倉庫。

如果你使用的是 Google Container Registry, 請先用你的 project ID 給你的鏡像打上標簽,然后 push 到 GCR 。請將 ?<project>? 替換為你自己的 project ID

docker tag job-wq-2 gcr.io/<project>/job-wq-2
gcloud docker -- push gcr.io/<project>/job-wq-2

定義一個 Job

這是 job 定義:

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure

請確保將 job 模板中的 ?gcr.io/myproject? 更改為你自己的路徑。

在這個例子中,每個 pod 處理了隊列中的多個項目,直到隊列中沒有項目時便退出。 因為是由工作程序自行檢測工作隊列是否為空,并且 Job 控制器不知道工作隊列的存在, 這依賴于工作程序在完成工作時發(fā)出信號。 工作程序以成功退出的形式發(fā)出信號表示工作隊列已經(jīng)為空。 所以,只要有任意一個工作程序成功退出,控制器就知道工作已經(jīng)完成了,所有的 Pod 將很快會退出。 因此,我們將 Job 的完成計數(shù)(Completion Count)設(shè)置為 1 。 盡管如此,Job 控制器還是會等待其它 Pod 完成。

運行 Job 

現(xiàn)在運行這個 Job :

kubectl apply -f ./job.yaml

稍等片刻,然后檢查這個 Job。

kubectl describe jobs/job-wq-2
Name:             job-wq-2
Namespace:        default
Selector:         controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-2
Annotations:      <none>
Parallelism:      2
Completions:      <unset>
Start Time:       Mon, 11 Jan 2016 17:07:59 -0800
Pods Statuses:    1 Running / 0 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                job-name=job-wq-2
  Containers:
   c:
    Image:              gcr.io/exampleproject/job-wq-2
    Port:
    Environment:        <none>
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen    LastSeen    Count    From            SubobjectPath    Type        Reason            Message
  ---------    --------    -----    ----            -------------    --------    ------            -------
  33s          33s         1        {job-controller }                Normal      SuccessfulCreate  Created pod: job-wq-2-lglf8

查看日志:

kubectl logs pods/job-wq-2-7r7b2
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
Working on lemon

你可以看到,其中的一個 pod 處理了若干個工作單元。

替代方案

如果你不方便運行一個隊列服務(wù)或者修改你的容器用于運行一個工作隊列,你可以考慮其它的 Job 模式。

如果你有持續(xù)的后臺處理業(yè)務(wù),那么可以考慮使用 ?ReplicaSet ?來運行你的后臺業(yè)務(wù), 和運行一個類似 https://github.com/resque/resque 的后臺處理庫。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號