Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 147 additions & 101 deletions cmd/linstor-scheduler-admission/linstor-scheduler-admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,22 @@ package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"

"github.com/sirupsen/logrus"

admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/sirupsen/logrus"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

kwhhttp "github.com/slok/kubewebhook/v2/pkg/http"
kwhlogrus "github.com/slok/kubewebhook/v2/pkg/log/logrus"
kwhmodel "github.com/slok/kubewebhook/v2/pkg/model"
kwhmutating "github.com/slok/kubewebhook/v2/pkg/webhook/mutating"
)

const annBetaStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"
Expand All @@ -45,112 +43,168 @@ func initFlags() *config {
return cfg
}

func run(cli kubernetes.Interface) error {
logrusLogEntry := logrus.NewEntry(logrus.New())
logrusLogEntry.Logger.SetLevel(logrus.DebugLevel)
logger := kwhlogrus.NewLogrus(logrusLogEntry)
type jsonPatchOp struct {
Op string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value,omitempty"`
}

cfg := initFlags()
// shouldMutateScheduler checks if a pod uses LINSTOR volumes and should have its scheduler overridden.
func shouldMutateScheduler(ctx context.Context, pod *corev1.Pod, namespace string, cli kubernetes.Interface, cfg *config) bool {
var pvcNames []string

// Create mutator.
mt := kwhmutating.MutatorFunc(func(ctx context.Context, ar *kwhmodel.AdmissionReview, obj metav1.Object) (*kwhmutating.MutatorResult, error) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return &kwhmutating.MutatorResult{}, nil
for _, volume := range pod.Spec.Volumes {
// Volume has inline CSI driver assigned
if volume.CSI != nil && volume.CSI.Driver == cfg.driverName {
return true
}

// Scheduler name is already assigned
if pod.Spec.SchedulerName != "" && pod.Spec.SchedulerName != "default-scheduler" {
return &kwhmutating.MutatorResult{}, nil
// Volume is not PVC, it does not interest us
if volume.PersistentVolumeClaim == nil {
continue
}
pvcNames = append(pvcNames, volume.PersistentVolumeClaim.ClaimName)
}

var pvcNames []string

// Collect all PVCs attached to Pod
for _, volume := range pod.Spec.Volumes {
// Volume has inline CSI driver assigned
if volume.CSI != nil && volume.CSI.Driver == cfg.driverName {
pod.Spec.SchedulerName = cfg.schedulerName
return &kwhmutating.MutatorResult{MutatedObject: pod}, nil
// Check PVCs
for _, pvcName := range pvcNames {
var discoveredProvisioner string
pvc, err := cli.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Warnf("Failed to get PVC %s/%s: %v", namespace, pvcName, err)
continue
}
// Try to gather provisioner name from annotations
if pvc != nil {
if provisioner, ok := pvc.Annotations[annStorageProvisioner]; ok {
discoveredProvisioner = provisioner
}
// Volume is not PVC, it does not interest us
if volume.PersistentVolumeClaim == nil {
continue
if provisioner, ok := pvc.Annotations[annBetaStorageProvisioner]; ok {
discoveredProvisioner = provisioner
}
pvcNames = append(pvcNames, volume.PersistentVolumeClaim.ClaimName)
}

// Check PVCs
for _, pvcName := range pvcNames {
var discoveredProvisioner string
pvc, err := cli.CoreV1().PersistentVolumeClaims(ar.Namespace).Get(ctx, pvcName, metav1.GetOptions{})
// Try to gather provisioner name from associated StorageClass
if discoveredProvisioner == "" && pvc != nil && pvc.Spec.StorageClassName != nil && *pvc.Spec.StorageClassName != "" {
sc, err := cli.StorageV1().StorageClasses().Get(ctx, *pvc.Spec.StorageClassName, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
return &kwhmutating.MutatorResult{}, err
}
// Try to gather provisioner name from annotations
if pvc != nil {
if provisioner, ok := pvc.Annotations[annStorageProvisioner]; ok {
discoveredProvisioner = provisioner
}
if provisioner, ok := pvc.Annotations[annBetaStorageProvisioner]; ok {
discoveredProvisioner = provisioner
}
logrus.Warnf("Failed to get StorageClass %s: %v", *pvc.Spec.StorageClassName, err)
continue
}
// Try to gather provisioner name from associated StorageClass
if discoveredProvisioner == "" && pvc.Spec.StorageClassName != nil && *pvc.Spec.StorageClassName != "" {
sc, err := cli.StorageV1().StorageClasses().Get(ctx, *pvc.Spec.StorageClassName, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
return &kwhmutating.MutatorResult{}, err
}
if sc != nil && sc.Provisioner == cfg.driverName {
discoveredProvisioner = sc.Provisioner
}
if sc != nil && sc.Provisioner == cfg.driverName {
discoveredProvisioner = sc.Provisioner
}
// Try to gather provisioner name from associated PV
if discoveredProvisioner == "" && pvc.Spec.VolumeName != "" {
pv, err := cli.CoreV1().PersistentVolumes().Get(ctx, pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
return &kwhmutating.MutatorResult{}, err
}
if pv != nil && pv.Spec.CSI != nil {
discoveredProvisioner = pv.Spec.CSI.Driver
}
}
// Try to gather provisioner name from associated PV
if discoveredProvisioner == "" && pvc != nil && pvc.Spec.VolumeName != "" {
pv, err := cli.CoreV1().PersistentVolumes().Get(ctx, pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Warnf("Failed to get PV %s: %v", pvc.Spec.VolumeName, err)
continue
}
// Overwrite the scheduler name
if discoveredProvisioner == cfg.driverName {
pod.Spec.SchedulerName = cfg.schedulerName
break
if pv != nil && pv.Spec.CSI != nil {
discoveredProvisioner = pv.Spec.CSI.Driver
}
}
// Overwrite the scheduler name
if discoveredProvisioner == cfg.driverName {
return true
}
}

return &kwhmutating.MutatorResult{MutatedObject: pod}, nil
})
return false
}

// Create webhook.
mcfg := kwhmutating.WebhookConfig{
ID: "linstor-scheduler-admission",
Mutator: mt,
Logger: logger,
}
wh, err := kwhmutating.NewWebhook(mcfg)
func handleMutate(w http.ResponseWriter, r *http.Request, cli kubernetes.Interface, cfg *config) {
body, err := io.ReadAll(r.Body)
if err != nil {
return fmt.Errorf("error creating webhook: %w", err)
logrus.Errorf("Failed to read request body: %v", err)
http.Error(w, "failed to read body", http.StatusBadRequest)
return
}

// Get HTTP handler from webhook.
whHandler, err := kwhhttp.HandlerFor(kwhhttp.HandlerConfig{Webhook: wh, Logger: logger})
if err != nil {
return fmt.Errorf("error creating webhook handler: %w", err)
var review admissionv1.AdmissionReview
if err := json.Unmarshal(body, &review); err != nil {
logrus.Errorf("Failed to decode admission review: %v", err)
http.Error(w, "failed to decode admission review", http.StatusBadRequest)
return
}

// Serve.
logger.Infof("Listening on :8080")
err = http.ListenAndServeTLS(":8080", cfg.certFile, cfg.keyFile, whHandler)
if err != nil {
return fmt.Errorf("error serving webhook: %w", err)
if review.Request == nil {
logrus.Error("Admission review has no request")
http.Error(w, "missing request", http.StatusBadRequest)
return
}

response := admissionv1.AdmissionResponse{
UID: review.Request.UID,
Allowed: true,
}

// Deserialize pod for READING only. Unknown fields (like init container
// restartPolicy on older k8s.io/api versions) are dropped during
// deserialization, but that's fine — we only read known fields for
// decision-making. Mutations use a targeted JSON Patch that never
// touches fields we didn't explicitly set.
var pod corev1.Pod
if err := json.Unmarshal(review.Request.Object.Raw, &pod); err != nil {
logrus.Errorf("Failed to deserialize pod: %v", err)
response.Result = &metav1.Status{Message: fmt.Sprintf("failed to deserialize pod: %v", err)}
writeAdmissionResponse(w, &response)
return
}

// Scheduler name is already assigned to a non-default scheduler
if pod.Spec.SchedulerName != "" && pod.Spec.SchedulerName != "default-scheduler" {
writeAdmissionResponse(w, &response)
return
}

return nil
ctx := r.Context()
if shouldMutateScheduler(ctx, &pod, review.Request.Namespace, cli, cfg) {
patch := []jsonPatchOp{{
Op: "add",
Path: "/spec/schedulerName",
Value: cfg.schedulerName,
}}
patchBytes, err := json.Marshal(patch)
if err != nil {
logrus.Errorf("Failed to marshal patch: %v", err)
writeAdmissionResponse(w, &response)
return
}
patchType := admissionv1.PatchTypeJSONPatch
response.Patch = patchBytes
response.PatchType = &patchType
logrus.Debugf("Mutating pod %s/%s: setting schedulerName=%s",
review.Request.Namespace, pod.Name, cfg.schedulerName)
}

writeAdmissionResponse(w, &response)
}

func writeAdmissionResponse(w http.ResponseWriter, response *admissionv1.AdmissionResponse) {
review := admissionv1.AdmissionReview{
TypeMeta: metav1.TypeMeta{
APIVersion: "admission.k8s.io/v1",
Kind: "AdmissionReview",
},
Response: response,
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(review); err != nil {
logrus.Errorf("Failed to encode admission response: %v", err)
}
}

func run(cli kubernetes.Interface) error {
cfg := initFlags()

mux := http.NewServeMux()
mux.HandleFunc("/mutate", func(w http.ResponseWriter, r *http.Request) {
handleMutate(w, r, cli, cfg)
})

logrus.Infof("Listening on :8080")
return http.ListenAndServeTLS(":8080", cfg.certFile, cfg.keyFile, mux)
}

func main() {
Expand All @@ -166,19 +220,11 @@ func main() {
}
}

// GetK8sSTDClients returns a all k8s clients.
// GetK8sSTDClients returns the kubernetes clientset using in-cluster config.
func GetK8sSTDClients() (kubernetes.Interface, error) {
// creates the in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}

// Get the client.
k8sCli, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
return nil, fmt.Errorf("error getting in-cluster config: %w", err)
}

return k8sCli, nil
return kubernetes.NewForConfig(config)
}
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ require (
github.com/libopenstorage/stork v1.4.1-0.20220512171133-b99428ee1ddf
github.com/portworx/sched-ops v1.20.4-rc1.0.20220401024625-dbc61a336f65
github.com/sirupsen/logrus v1.9.0
github.com/slok/kubewebhook/v2 v2.1.0
github.com/urfave/cli v1.22.5
k8s.io/api v0.25.6
k8s.io/apimachinery v0.25.6
Expand Down Expand Up @@ -137,8 +136,6 @@ require (
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
gomodules.xyz/jsonpatch/v3 v3.0.1 // indirect
gomodules.xyz/orderedmap v0.1.0 // indirect
google.golang.org/api v0.114.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
Expand Down
10 changes: 0 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,6 @@ github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3O
github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.9.0/go.mod h1:FqZLKOZnGdFAhOK4nqGHa7D66IdsO+O441Eve7ptJDU=
github.com/prometheus/client_golang v1.10.0/go.mod h1:WJM3cc3yu7XKBKa/I8WeZm+V3eltZnBwfENSU7mdogU=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
Expand Down Expand Up @@ -1164,7 +1163,6 @@ github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt2
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.37.0 h1:ccBbHCgIiT9uSoFY0vX8H3zsNR5eLt17/RQLUvn8pXE=
Expand Down Expand Up @@ -1234,8 +1232,6 @@ github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/skyrings/skyring-common v0.0.0-20160929130248-d1c0bb1cbd5e/go.mod h1:d8hQseuYt4rJoOo21lFzYJdhMjmDqLY++ayArbgYjWI=
github.com/slok/kubewebhook/v2 v2.1.0 h1:IBWFP2zBxsjq/UApmQNct7eZ4s0Cg5+8VS+x8l5+rwI=
github.com/slok/kubewebhook/v2 v2.1.0/go.mod h1:N0boRHV3HgWsAgHJedFDzlYojoxd+QE3gYE0bjMBaoo=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.1.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
Expand Down Expand Up @@ -1277,7 +1273,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand Down Expand Up @@ -1681,7 +1676,6 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210301091718-77cc2087c03b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -1838,10 +1832,6 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY=
gomodules.xyz/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY=
gomodules.xyz/jsonpatch/v3 v3.0.1 h1:Te7hKxV52TKCbNYq3t84tzKav3xhThdvSsSp/W89IyI=
gomodules.xyz/jsonpatch/v3 v3.0.1/go.mod h1:CBhndykehEwTOlEfnsfJwvkFQbSN8YZFr9M+cIHAJto=
gomodules.xyz/orderedmap v0.1.0 h1:fM/+TGh/O1KkqGR5xjTKg6bU8OKBkg7p0Y+x/J9m8Os=
gomodules.xyz/orderedmap v0.1.0/go.mod h1:g9/TPUCm1t2gwD3j3zfV8uylyYhVdCNSi+xCEIu7yTU=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0=
gonum.org/v1/gonum v0.6.2/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU=
Expand Down