ChatOps による Kubernetes Job
この記事は、くふうカンパニー Advent Calendar 2019 - Qiita 23日目の記事です。
はじめに
Kubernetes には Job のリソースがありますが、これは単体だと非常に使い勝手が悪い。 定期的に実行する処理は CronJob で なんとかなるが、任意のタイミングで実行するような処理を Job でやろうとする場合、とても面倒だ。
辛いところ
- Job は apply された瞬間から実行開始するので、実行したいタイミングで yaml を手元に用意して
kubectl apply
する必要がある - 一度実行された Job のリソースは完了してもクラスター内に残り、次に同じ yaml で実行しようとすると名前がコンフリクトするので、前のJob リソースを消してから実行する必要がある
辛い。
例えば DB のマイグレーションなどに利用する場合、それを利用するエンジニア全員に権限を渡すことになり、しかも kubectl
を直接触ることになる。
許せん。
そうだ、ChatOps しよう
こうだ。
流れ
- bot に mention をつけて job を実行してほしいとお願いする
- Slack の Event API で bot サーバーにリクエストが行く
- bot サーバーは対象の yaml を GitHub から GET する
- その yaml にランダムな suffix をつけ、 client-go を利用して Kubernetes API を叩く
- Kubernetes API は Job を実行する
- bot は Job の実行が終わるまで Job の状態をポーリングする
- Job が終了したら実行結果を Slack で通知する
Slack Bot API
Event API に app_mention event があるので今回はこれを利用する。
この event は、bot が mention されると以下のような payload のついた POST Request を送りつけてくる。
{ "type": "app_mention", "user": "U061F7AUR", "text": "<@U0LAN0Z89> is it everything a river should be?", "ts": "1515449522.000016", "channel": "C0LAN2Q65", "event_ts": "1515449522000016" }
これを利用して Job 実行に必要な情報を取得し実行しよう。
実装
@bot https://github.com/YasukeXXX/jobch/blob/master/examples/manifests/job.yaml Hello
と言った感じで Slack 上で話しかけるとリンク先のジョブのマニフェストに Hello
を Command として実行するようなインターフェースを作っていきます。
全体のコードはこちら github.com
以下、簡単に説明していく。
まず Slack の API エンドポイント。 これはほとんど こちら から拝借させていただいた。
var api = slack.New(os.Getenv("CONFIG_SLACK_OAUTH_TOKEN")) func main() { jobHandler := JobHandler{api} http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) { buf := new(bytes.Buffer) buf.ReadFrom(r.Body) body := buf.String() eventsAPIEvent, err := slackevents.ParseEvent(json.RawMessage(body), slackevents.OptionVerifyToken(&slackevents.TokenComparator{VerificationToken: os.Getenv("CONFIG_SLACK_VERIFICATION_TOKEN")})) if err != nil { fmt.Println(err) w.WriteHeader(http.StatusInternalServerError) } if eventsAPIEvent.Type == slackevents.URLVerification { var r *slackevents.ChallengeResponse err = json.Unmarshal([]byte(body), &r) if err != nil { fmt.Println(err) w.WriteHeader(http.StatusInternalServerError) } w.Header().Set("Content-Type", "text") w.Write([]byte(r.Challenge)) } if eventsAPIEvent.Type == slackevents.CallbackEvent { innerEvent := eventsAPIEvent.InnerEvent switch ev := innerEvent.Data.(type) { case *slackevents.AppMentionEvent: fmt.Printf("[INFO] %s\n", ev.Text) if match := regexp.MustCompile(`\n*<([0-9a-zA-Z-/_.:]+)> (.+)$`).FindAllStringSubmatch(ev.Text, -1); match != nil { fmt.Println("Event is triggered") url := match[0][1] commands := strings.Split(match[0][2], " ") api.PostMessage(ev.Channel, slack.MsgOptionText(fmt.Sprintf("仕方ありませんね。\n%s を実行します。", match[0][1]), false)) job, err := jobHandler.Execute(url, commands, ev.Channel) if err != nil { blockObject := slack.NewTextBlockObject("mrkdwn", err.Error(), false, false) api.PostMessage(ev.Channel, slack.MsgOptionBlocks(slack.NewSectionBlock(blockObject, nil, nil))) return } api.PostMessage(ev.Channel, slack.MsgOptionText(fmt.Sprintf("ジョブを開始しました\nName: %s", job.Name), false)) } } } }) fmt.Println("[INFO] Server listening") http.ListenAndServe(":3000", nil) }
やってることはメッセージをurlとcommandsにパースしてJobHandlerに渡すだけ。Execute で error が返ってきたらそれをメッセージとして返す。成功したらジョブを開始したことを通知している。
次は JobHandler。
type JobHandler struct { client *slack.Client } func (j JobHandler) Execute(url string, commands []string, channel string) (job batchv1.Job, err error) { rawFile, err := GetFile(url) if err != nil { return } jsonFile, err := yaml.ToJSON(rawFile) err = json.Unmarshal(jsonFile, &job) if err != nil { return } job.Name = job.Name + "-" + RandString(10) for i, container := range job.Spec.Template.Spec.Containers { job.Spec.Template.Spec.Containers[i].Command = append(container.Command, commands...) } if err = createJob(&job); err != nil { return } go j.watchAndNotify(job.Name, channel) return } func (j JobHandler) watchAndNotify(jobName string, channel string) { t := time.NewTicker(time.Duration(20) * time.Second) for { select { case <-t.C: job, err := getJob(jobName) if err != nil { fmt.Println("[ERROR] Quit watching job ", job.Name) t.Stop() return } fmt.Println("[INFO] Watch job", job.Name) if job.Status.Succeeded >= 1 { msg := slack.Attachment{Color: "#36a64f", Text: fmt.Sprintf("Succeed %s Job execution", jobName)} j.client.PostMessage(channel, slack.MsgOptionAttachments(msg)) t.Stop() return } if job.Status.Failed >= 1 { msg := slack.Attachment{Color: "#e01e5a", Text: fmt.Sprintf("Failed %s execution", jobName)} j.client.PostMessage(channel, slack.MsgOptionAttachments(msg)) t.Stop() return } } } t.Stop() }
ここではURLから実際のファイルを取得してJob 構造体に Unmarshal し、重複してしまうとエラーになってしまう Name フィールドにランダムな suffix をつけてから Kubernetes API に Job の実行をリクエストしている。その際に指定された Commands も追記する。
さらに Job の実行結果を取得するため、Job の Status をポーリングする goruntime を実行しておく。これで Job の実際の処理が終了した際に通知される。
実際に実行するとこんな感じ。 少し生意気な感じがある。
Role Base Access Control
これをクラスター内部で実行する場合、Job リソースを扱う権限を与える必要がある。
ServiceAccount
ServiceAccount はクラスター内のリソースに対して割り当てるアカウント。 kubectl
を利用するユーザーに割り当てられるのは UserAccount だ。AWS でいう IAM Role と IAM User のようなもの。
ClusterRole と Role
これらは権限を具体的に定義するリソース。Job リソースの create と get の権限といったことを定義する。二つの違いは、 ClusterRole がどの namespace での権限かというところまで定義するのに対し、 Role は紐づけられたリソースが存在する namespace のみに権限が制限されていることだ。これも AWS で例えると Policy のようなもの。
ClusterRoleBinding と RoleBinding
これは前述した ServiceAccount と ClusterRole/Role を紐づけるリソース。
今回は Job リソースを扱いたいので Job の create と get の権限を Role で定義し、ServiceAccount に紐づける。 Manifest は以下の通り。
Manifest
RBAC
apiVersion: v1 kind: ServiceAccount metadata: name: jobch namespace: default --- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: jobch-role rules: - apiGroups: ["batch"] resources: - jobs verbs: - "create" - "get" --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: jobch-role-binding roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: jobch-role subjects: - kind: ServiceAccount name: jobch namespace: default
Workloads
apiVersion: extensions/v1beta1 kind: Deployment metadata: labels: app: jobch name: jobch spec: revisionHistoryLimit: 5 replicas: 1 template: metadata: labels: app: jobch spec: serviceAccountName: jobch containers: - name: jobch image: jobch env: - name: CONFIG_SLACK_VERIFICATION_TOKEN value: hogehoge - name: CONFIG_SLACK_OAUTH_TOKEN value: hogehoge ports: - containerPort: 3000 name: jobch --- apiVersion: v1 kind: Service metadata: labels: app: jobch name: jobch spec: ports: - name: jobch port: 8080 protocol: TCP targetPort: 3000 selector: app: jobch type: NodePort
Ingress は各Cloud Provider の設定に基づいて記述してください。
まとめ
Kubernetes におけるワンショット Job の環境を ChatOps によって改善できた。
ただ、この実装だといろいろ問題がある。
GitHub にマニフェストがあればなんでも実行できてしまうし、Slack にログインできるユーザーであれば誰でも実行できてしまうのでよろしくない。また、リンクを直接張るのもなかなか野暮ったいし、Command も何でも設定できてしまう。
(この辺りを克服した bot を弊社では使っており、近々 OSS 化したいと思っているのでそちらがリリースされたら使ってみて欲しい。)
くふうカンパニー Advent Calendar 2019、明日は bbz さんによる 何か
です。