From 1f4fc7e7b7579f013ac5096cd01237721994b8f8 Mon Sep 17 00:00:00 2001 From: gadi-didi Date: Mon, 10 Nov 2025 17:20:09 +0200 Subject: [PATCH 01/10] nvmeof: add errors.go for nvmeof driver init that file. in the future add more nvmeof errors to this file. Signed-off-by: gadi-didi --- internal/nvmeof/errors/errors.go | 60 ++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 internal/nvmeof/errors/errors.go diff --git a/internal/nvmeof/errors/errors.go b/internal/nvmeof/errors/errors.go new file mode 100644 index 00000000000..a2ab54a61c4 --- /dev/null +++ b/internal/nvmeof/errors/errors.go @@ -0,0 +1,60 @@ +/* +Copyright 2025 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nvmeoferrors + +import ( + "errors" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// TODO: next PR - look at nvmeof package and see if any errors can be moved here. +var ( + // ErrRbdQoSExists is returned when the user tries to set a QoS for namespace, and that namespace + // already has a RBD QoS associated with it. + ErrRbdQoSExists = errors.New("QoS limits already configured (RBD QoS exists)") + // ErrMetadataNotFound is returned when NVMe-oF volume metadata is not found. + ErrMetadataNotFound = errors.New("metadata not found") + // ErrMetadataCorrupted is returned when NVMe-oF volume metadata(=rbd metadata) is corrupted or invalid. 
+ ErrMetadataCorrupted = errors.New("metadata corrupted or invalid") +) + +// errorToGRPCCode maps custom errors to gRPC status codes. +var errorToGRPCCode = map[error]codes.Code{ + ErrMetadataNotFound: codes.NotFound, + ErrMetadataCorrupted: codes.Internal, + ErrRbdQoSExists: codes.InvalidArgument, +} + +// ToGRPCError converts a custom error to a gRPC status error. +// If the error is not recognized, it returns codes.Internal. +func ToGRPCError(err error) error { + if err == nil { + return nil + } + + // Check if it's one of our custom errors + for customErr, code := range errorToGRPCCode { + if errors.Is(err, customErr) { + return status.Error(code, err.Error()) + } + } + + // Unknown error - return Internal + return status.Errorf(codes.Internal, "internal error: %v", err) +} From 2bb7459420870c88ad4bd6e6fe1dbd48c2ad6fe8 Mon Sep 17 00:00:00 2001 From: gadi-didi Date: Wed, 12 Nov 2025 15:28:09 +0200 Subject: [PATCH 02/10] nvmeof: change getNVMeoFMetadata() function returns make getNVMeoFMetadata() returns nvmeof error codes. 
Signed-off-by: gadi-didi --- .../nvmeof/controller/controllerserver.go | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/internal/nvmeof/controller/controllerserver.go b/internal/nvmeof/controller/controllerserver.go index 6ea472ca4c1..43bc77b7da2 100644 --- a/internal/nvmeof/controller/controllerserver.go +++ b/internal/nvmeof/controller/controllerserver.go @@ -282,7 +282,7 @@ func (cs *Server) ControllerUnpublishVolume( if err != nil { log.ErrorLog(ctx, "failed to get NVMe-oF metadata for volumeID %s: %v", volumeID, err) - return nil, status.Errorf(codes.Internal, "failed to get NVMe-oF metadata: %v", err) + return nil, nvmeoferrors.ToGRPCError(err) } // Unpublish NVMe-oF resources @@ -799,7 +799,8 @@ func (cs *Server) getNVMeoFMetadata( // Get RBD volume rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { - return nil, fmt.Errorf("failed to find volume with ID %q: %w", volumeID, err) + return nil, fmt.Errorf("%w: failed to find volume with ID %q: %w", + nvmeoferrors.ErrMetadataNotFound, volumeID, err) } defer rbdVol.Destroy(ctx) @@ -820,10 +821,12 @@ func (cs *Server) getNVMeoFMetadata( for _, key := range requiredKeys { value, err := rbdVol.GetMetadata(key) if err != nil { - return nil, fmt.Errorf("failed to get %s: %w", key, err) + return nil, fmt.Errorf("%w: failed to get %s: %w", + nvmeoferrors.ErrMetadataNotFound, key, err) } if value == "" { - return nil, fmt.Errorf("metadata %s is empty", key) + return nil, fmt.Errorf("%w: metadata %s is empty", + nvmeoferrors.ErrMetadataNotFound, key) } metadata[key] = value } @@ -831,18 +834,21 @@ func (cs *Server) getNVMeoFMetadata( // Parse namespace ID nsid, err := strconv.ParseUint(metadata[toRBDMetadataKey(vcNamespaceID)], 10, 32) if err != nil { - return nil, fmt.Errorf("invalid namespace ID: %w", err) + return nil, fmt.Errorf("%w: invalid namespace ID: %w", + nvmeoferrors.ErrMetadataCorrupted, err) } gatewayPort, err := 
+ strconv.ParseUint(metadata[toRBDMetadataKey(vcGatewayPort)], 10, 32) if err != nil { - return nil, fmt.Errorf("invalid gateway port: %w", err) + return nil, fmt.Errorf("%w: invalid gateway port: %w", + nvmeoferrors.ErrMetadataCorrupted, err) } // Parse listeners from JSON var listeners []nvmeof.ListenerDetails if err := json.Unmarshal([]byte(metadata[toRBDMetadataKey(vcListeners)]), &listeners); err != nil { - return nil, fmt.Errorf("failed to parse listeners JSON: %w", err) + return nil, fmt.Errorf("%w: failed to parse listeners JSON: %w", + nvmeoferrors.ErrMetadataCorrupted, err) } // Construct NVMe-oF volume data From e90f0da1b71af02f9fe75212fd36a024ebc88d06 Mon Sep 17 00:00:00 2001 From: gadi-didi Date: Wed, 29 Oct 2025 13:54:15 +0200 Subject: [PATCH 03/10] nvmeof: adding of Qos grpc call Qos for nvmeof namespace is added. allow the user to limit the namespace capabilities. Signed-off-by: gadi-didi --- internal/nvmeof/nvmeof.go | 38 ++++++++++++++++++++++ internal/nvmeof/qos.go | 66 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 internal/nvmeof/qos.go diff --git a/internal/nvmeof/nvmeof.go b/internal/nvmeof/nvmeof.go index 093d6864af3..a814a7e5377 100644 --- a/internal/nvmeof/nvmeof.go +++ b/internal/nvmeof/nvmeof.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/proto" + nvmeoferrors "github.com/ceph/ceph-csi/internal/nvmeof/errors" "github.com/ceph/ceph-csi/internal/util/log" ) @@ -167,6 +168,43 @@ func (gw *GatewayRpcClient) DeleteNamespace(ctx context.Context, subsystemNQN st status.GetStatus(), status.GetErrorMessage()) } +// SetQoSLimitsForNamespace sets QoS limits on a namespace. 
+func (gw *GatewayRpcClient) SetQoSLimitsForNamespace( + ctx context.Context, + subsystemNQN string, + namespaceID uint32, + qos NVMeoFQosVolume, +) error { + log.DebugLog(ctx, "Setting QoS limits on namespace %d in subsystem %s", namespaceID, subsystemNQN) + + req := &pb.NamespaceSetQosReq{ + SubsystemNqn: subsystemNQN, + Nsid: namespaceID, + RwIosPerSecond: qos.RwIosPerSecond, + RwMbytesPerSecond: qos.RwMbytesPerSecond, + RMbytesPerSecond: qos.RMbytesPerSecond, + WMbytesPerSecond: qos.WMbytesPerSecond, + } + + status, err := gw.client.NamespaceSetQosLimits(ctx, req) + if err != nil { + return fmt.Errorf("failed to set QoS limits on namespace %d: %w", namespaceID, err) + } + + if status.GetStatus() == 0 { + log.DebugLog(ctx, "QoS limits set successfully on namespace %d", namespaceID) + + return nil + } + + if status.GetStatus() == int32(syscall.EEXIST) { // EEXIST + return fmt.Errorf("%w: %s", nvmeoferrors.ErrRbdQoSExists, status.GetErrorMessage()) + } + + return fmt.Errorf("gateway SetNamespaceQos returned error (status=%d): %s", + status.GetStatus(), status.GetErrorMessage()) +} + // GetUUIDBySubsystemAndNameSpaceID get the uuid of namespace by given subsystem and ns-id. func (gw *GatewayRpcClient) GetUUIDBySubsystemAndNameSpaceID( ctx context.Context, diff --git a/internal/nvmeof/qos.go b/internal/nvmeof/qos.go new file mode 100644 index 00000000000..2c16158a882 --- /dev/null +++ b/internal/nvmeof/qos.go @@ -0,0 +1,66 @@ +/* +Copyright 2025 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ +package nvmeof + +import ( + "fmt" + "strings" +) + +// NVMeoFQosVolume holds the QoS parameters for an NVMe-oF volume. +type NVMeoFQosVolume struct { + RwIosPerSecond *uint64 // R/W IOs per second limit, 0 means unlimited. + RwMbytesPerSecond *uint64 // R/W megabytes per second limit, 0 means unlimited. + RMbytesPerSecond *uint64 // Read megabytes per second limit, 0 means unlimited. + WMbytesPerSecond *uint64 // Write megabytes per second limit, 0 means unlimited. +} + +// QoS parameter keys that user can set. +// these parameters are used to configure the QoS limits for NVMe-oF volumes. +// They correspond to the fields in NVMeoFQosVolume struct. +const ( + RwIosPerSecond = "rwIosPerSecond" + RwMbytesPerSecond = "rwMbytesPerSecond" + RMbytesPerSecond = "rMbytesPerSecond" + WMbytesPerSecond = "wMbytesPerSecond" +) + +// String returns a string representation of the NVMeoFQosVolume. 
+func (q *NVMeoFQosVolume) String() string { + if q == nil { + return "nil" + } + + parts := []string{} + if q.RwIosPerSecond != nil { + parts = append(parts, fmt.Sprintf("RwIops=%d", *q.RwIosPerSecond)) + } + if q.RwMbytesPerSecond != nil { + parts = append(parts, fmt.Sprintf("RwMB/s=%d", *q.RwMbytesPerSecond)) + } + if q.RMbytesPerSecond != nil { + parts = append(parts, fmt.Sprintf("RMB/s=%d", *q.RMbytesPerSecond)) + } + if q.WMbytesPerSecond != nil { + parts = append(parts, fmt.Sprintf("WMB/s=%d", *q.WMbytesPerSecond)) + } + + if len(parts) == 0 { + return "no QoS limits" + } + + return strings.Join(parts, ", ") +} From 3e23e50c3a6fd807a7e9e61415c8875ebfe641c9 Mon Sep 17 00:00:00 2001 From: gadi-didi Date: Thu, 6 Nov 2025 18:44:22 +0200 Subject: [PATCH 04/10] rbd: add query if params have rbd qos vars added because want to reuse the private function parseQosParams() Signed-off-by: gadi-didi --- internal/rbd/qos.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/internal/rbd/qos.go b/internal/rbd/qos.go index 220942e7aad..51e703e875f 100644 --- a/internal/rbd/qos.go +++ b/internal/rbd/qos.go @@ -89,6 +89,18 @@ type qosSpec struct { present bool } +// HasQoSParams checks if any RBD QoS parameters are present. +func HasQoSParams(params map[string]string) bool { + rbdQosParams := parseQosParams(params) + for _, qos := range rbdQosParams { + if qos.present { + return true + } + } + + return false +} + func parseQosParams( scParams map[string]string, ) map[string]*qosSpec { From 0a23abf1466f18dd314bbc2ad21734f72303f763 Mon Sep 17 00:00:00 2001 From: gadi-didi Date: Wed, 29 Oct 2025 13:52:12 +0200 Subject: [PATCH 05/10] nvmeof: add the call to qos in CreateVolume() if qos params are provided, call to qos grpc after ns was created. 
Signed-off-by: gadi-didi --- .../nvmeof/controller/controllerserver.go | 74 ++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/internal/nvmeof/controller/controllerserver.go b/internal/nvmeof/controller/controllerserver.go index 43bc77b7da2..a5c0b49b2c8 100644 --- a/internal/nvmeof/controller/controllerserver.go +++ b/internal/nvmeof/controller/controllerserver.go @@ -312,6 +312,22 @@ func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { if err != nil { return fmt.Errorf("invalid listeners parameter: %w", err) } + // Validate QoS parameters - cannot mix RBD and NVMe-oF QoS + mutableParams := req.GetMutableParameters() + + // check for RBD QoS parameters in both params and mutableParams + if hasRBDQoS := rbd.HasQoSParams(params); hasRBDQoS { + return errors.New("setting RBD QoS parameters on NVMe-oF volumes is not supported") + } + if hasRBDQoS := rbd.HasQoSParams(mutableParams); hasRBDQoS { + return errors.New("setting RBD QoS parameters on NVMe-oF volumes is not supported") + } + + // It take the mutableParams value from the volumeAttributesClassName in the PersistentVolumeClaim yaml. + _, err = parseQoSParameters(mutableParams) + if err != nil { + return fmt.Errorf("invalid NVMe-oF QoS parameters: %w", err) + } return nil } @@ -351,6 +367,44 @@ func parseListeners(listenersJSON string) ([]nvmeof.ListenerDetails, error) { return listeners, nil } +// parseQoSParameters extracts and parses QoS parameters from the given map. 
+func parseQoSParameters(params map[string]string) (*nvmeof.NVMeoFQosVolume, error) { + qos := &nvmeof.NVMeoFQosVolume{} + hasAnyQoS := false + + parseParam := func(key, name string, dest **uint64) error { + if val, exists := params[key]; exists && val != "" { + parsed, err := strconv.ParseUint(val, 10, 64) + if err != nil { + return fmt.Errorf("invalid %s: %w", name, err) + } + *dest = &parsed + hasAnyQoS = true + } + + return nil + } + + if err := parseParam(nvmeof.RwIosPerSecond, nvmeof.RwIosPerSecond, &qos.RwIosPerSecond); err != nil { + return nil, err + } + if err := parseParam(nvmeof.RwMbytesPerSecond, nvmeof.RwMbytesPerSecond, &qos.RwMbytesPerSecond); err != nil { + return nil, err + } + if err := parseParam(nvmeof.RMbytesPerSecond, nvmeof.RMbytesPerSecond, &qos.RMbytesPerSecond); err != nil { + return nil, err + } + if err := parseParam(nvmeof.WMbytesPerSecond, nvmeof.WMbytesPerSecond, &qos.WMbytesPerSecond); err != nil { + return nil, err + } + + if !hasAnyQoS { + return nil, nil + } + + return qos, nil +} + // ensureSubsystem checks if the subsystem exists, and creates it if not. // then creates the listener. func ensureSubsystem( @@ -434,7 +488,6 @@ func cleanupEmptySubsystem( } // createNVMeoFResources sets up the NVMe-oF resources for the given RBD volume. -// TODO - need to support multiple listeners. func (cs *Server) createNVMeoFResources( ctx context.Context, req *csi.CreateVolumeRequest, @@ -465,6 +518,16 @@ func (cs *Server) createNVMeoFResources( Port: uint32(nvmeofGatewayPort), }, } + // extract Qos parameters if any + mutableParams := req.GetMutableParameters() + // It take the mutableParams value from the volumeAttributesClassName in the PersistentVolumeClaim yaml. 
+ // We already verified in the validateCreateVolumeRequest that there is no RBD QoS + nvmeofQoS, err := parseQoSParameters(mutableParams) + if err != nil { + log.ErrorLog(ctx, "failed to parse NVMe-oF QoS parameters: %v", err) + + return nil, fmt.Errorf("failed to parse QoS parameters: %w", err) + } // Step 2: Connect to gateway config, err := getGatewayConfigFromRequest(params) @@ -505,6 +568,15 @@ func (cs *Server) createNVMeoFResources( log.DebugLog(ctx, "Namespace created: %s/%s with NSID: %d", rbdPoolName, rbdImageName, nsid) nvmeofData.NamespaceID = nsid + // Step 5: Set QoS limits if any + if nvmeofQoS != nil { + log.DebugLog(ctx, "Setting QoS limits: %s", nvmeofQoS) + if err := gateway.SetQoSLimitsForNamespace(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID, + *nvmeofQoS); err != nil { + return nil, fmt.Errorf("setting QoS limits failed: %w", err) + } + } + uuid, err := gateway.GetUUIDBySubsystemAndNameSpaceID(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID) if err != nil { return nil, fmt.Errorf("get namespace uuid failed: %w", err) From 5935519c74bd0b993c69ed237f7054d958a6948a Mon Sep 17 00:00:00 2001 From: gadi-didi Date: Wed, 29 Oct 2025 14:52:38 +0200 Subject: [PATCH 06/10] nvmeof: add ControllerModifyVolume() function the purpose of that function is to modify the qos for namespace on the fly. 
Signed-off-by: gadi-didi --- .../nvmeof/controller/controllerserver.go | 116 ++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/internal/nvmeof/controller/controllerserver.go b/internal/nvmeof/controller/controllerserver.go index a5c0b49b2c8..ae3b93538c1 100644 --- a/internal/nvmeof/controller/controllerserver.go +++ b/internal/nvmeof/controller/controllerserver.go @@ -29,6 +29,7 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/nvmeof" + nvmeoferrors "github.com/ceph/ceph-csi/internal/nvmeof/errors" "github.com/ceph/ceph-csi/internal/rbd" rbdutil "github.com/ceph/ceph-csi/internal/rbd" rbddriver "github.com/ceph/ceph-csi/internal/rbd/driver" @@ -293,6 +294,42 @@ func (cs *Server) ControllerUnpublishVolume( return &csi.ControllerUnpublishVolumeResponse{}, nil } +// ControllerModifyVolume modifies the volume's QoS parameters. +func (cs *Server) ControllerModifyVolume( + ctx context.Context, + req *csi.ControllerModifyVolumeRequest, +) (*csi.ControllerModifyVolumeResponse, error) { + volumeID := req.GetVolumeId() + params := req.GetMutableParameters() + + // Step 1: Acquire volume lock + if acquired := cs.volumeLocks.TryAcquire(volumeID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer cs.volumeLocks.Release(volumeID) + + // Step 2: Parse QoS parameters from mutable_parameters + hasRBDQoS := rbd.HasQoSParams(params) + if hasRBDQoS { + log.ErrorLog(ctx, "Cannot set RBD QoS parameters on NVMe-oF volumes") + + return nil, status.Error(codes.InvalidArgument, "cannot set RBD QoS parameters on NVMe-oF volumes") + } + nvmeofQoS, err := parseQoSParameters(params) + if err != nil { + log.ErrorLog(ctx, "failed to parse NVMe-oF QoS parameters: %v", err) + + return nil, status.Errorf(codes.InvalidArgument, "failed to parse QoS parameters: %v", err) + } + if nvmeofQoS != nil { 
+ return cs.modifyNVMeoFQoS(ctx, req, nvmeofQoS) + } + + return &csi.ControllerModifyVolumeResponse{}, nil +} + // validateCreateVolumeRequest validates the incoming request for nvmeof. // the rest of the parameters are validated by RBD. func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { @@ -405,6 +442,85 @@ func parseQoSParameters(params map[string]string) (*nvmeof.NVMeoFQosVolume, erro return qos, nil } +// modifyNVMeoFQoS handles NVMe-oF gateway QoS modification. +func (cs *Server) modifyNVMeoFQoS( + ctx context.Context, + req *csi.ControllerModifyVolumeRequest, + qos *nvmeof.NVMeoFQosVolume, +) (*csi.ControllerModifyVolumeResponse, error) { + volumeID := req.GetVolumeId() + + // Step 1: Get secrets + + // ControllerModifyVolume doesn't receive volume context and has no option to take secrets, + // because there is no "csi.storage.k8s.io/controller-modify-secret-name" field in the SC. + // The full solution is to use GetControllerExpandSecretRef, but there is no such function yet. + // TODO: change the call to GetControllerExpandSecretRef once it is implemented. 
+ secrets := req.GetSecrets() + if secrets == nil { + secretName, secretNamespace, err := util.GetControllerPublishSecretRef(volumeID, util.RBDType) + if err != nil { + log.ErrorLog(ctx, "Failed to get secret reference: %v", err) + + return nil, status.Errorf(codes.Internal, "failed to get secret reference: %v", err) + } + + secrets, err = k8s.GetSecret(secretName, secretNamespace) + if err != nil { + log.ErrorLog(ctx, "Failed to get secret from k8s: %v", err) + + return nil, status.Errorf(codes.Internal, "failed to get secret: %v", err) + } + } + + // Step 2: Get NVMe-oF metadata + nvmeofData, err := cs.getNVMeoFMetadata(ctx, secrets, volumeID) + if err != nil { + log.ErrorLog(ctx, "Failed to get NVMe-oF metadata: %v", err) + + return nil, nvmeoferrors.ToGRPCError(err) + } + + // Step 3: Connect to gateway + config := &nvmeof.GatewayConfig{ + Address: nvmeofData.GatewayManagementInfo.Address, + Port: nvmeofData.GatewayManagementInfo.Port, + } + gateway, err := connectGateway(ctx, config) + if err != nil { + log.ErrorLog(ctx, "Gateway connection failed: %v", err) + + return nil, status.Errorf(codes.Unavailable, "gateway connection failed: %v", err) + } + defer func() { + if closeErr := gateway.Destroy(); closeErr != nil { + log.ErrorLog(ctx, "Failed to close gateway connection: %v", closeErr) + } + }() + + // Step 4: Apply NVMe-oF QoS via gateway + log.DebugLog(ctx, "Setting QoS for subsystem=%s, nsid=%d", nvmeofData.SubsystemNQN, nvmeofData.NamespaceID) + + err = gateway.SetQoSLimitsForNamespace(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID, *qos) + if err != nil { + // Check if error is EEXIST (RBD QoS already set) + if errors.Is(err, nvmeoferrors.ErrRbdQoSExists) { + log.ErrorLog(ctx, "RBD QoS already configured on volume") + + return nil, status.Error(codes.InvalidArgument, + "RBD QoS already configured on this volume, cannot set NVMe-oF gateway QoS") + } + + log.ErrorLog(ctx, "Failed to set QoS limits: %v", err) + + return nil, 
status.Errorf(codes.Internal, "failed to set QoS limits: %v", err) + } + + log.DebugLog(ctx, "Successfully modified NVMe-oF QoS for volume %s", volumeID) + + return &csi.ControllerModifyVolumeResponse{}, nil +} + // ensureSubsystem checks if the subsystem exists, and creates it if not. // then creates the listener. func ensureSubsystem( From 84a5c67ed2204b861901c1aee35f5c2a0383284b Mon Sep 17 00:00:00 2001 From: gadi-didi Date: Thu, 30 Oct 2025 14:31:16 +0200 Subject: [PATCH 07/10] nvmeof: add modify_volume capability modify_volume capability is added to nvmeof driver in order to call ControllerModifyVolume(). Signed-off-by: gadi-didi --- internal/nvmeof/driver/driver.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/nvmeof/driver/driver.go b/internal/nvmeof/driver/driver.go index 6263c958965..945575968a7 100644 --- a/internal/nvmeof/driver/driver.go +++ b/internal/nvmeof/driver/driver.go @@ -63,6 +63,7 @@ func (d *nvmeofDriver) Run(conf *util.Config) { cd.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{ csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, + csi.ControllerServiceCapability_RPC_MODIFY_VOLUME, }) cd.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ From 395d0b7312639f5971612dc99b895260673e8bfd Mon Sep 17 00:00:00 2001 From: gadi-didi Date: Thu, 30 Oct 2025 14:07:28 +0200 Subject: [PATCH 08/10] nvmeof: add stub ControllerExpandVolume for csi-resizer compatibility Add EXPAND_VOLUME capability and stub implementation to allow csi-resizer to start and handle VolumeAttributesClass modifications. 
Signed-off-by: gadi-didi --- internal/nvmeof/controller/controllerserver.go | 12 ++++++++++++ internal/nvmeof/driver/driver.go | 1 + 2 files changed, 13 insertions(+) diff --git a/internal/nvmeof/controller/controllerserver.go b/internal/nvmeof/controller/controllerserver.go index ae3b93538c1..d131fc93dee 100644 --- a/internal/nvmeof/controller/controllerserver.go +++ b/internal/nvmeof/controller/controllerserver.go @@ -330,6 +330,18 @@ func (cs *Server) ControllerModifyVolume( return &csi.ControllerModifyVolumeResponse{}, nil } +// ControllerExpandVolume handles volume expansion requests. +// For now it is a stub that always returns codes.Unimplemented for NVMe-oF volumes. +// It must be present because ControllerModifyVolume requires the csi-resizer sidecar, and +// csi-resizer looks for the capability ControllerServiceCapability_RPC_EXPAND_VOLUME. +// In the future, if NVMe-oF gateway supports volume expansion, the logic must be added here. +func (cs *Server) ControllerExpandVolume( + ctx context.Context, + req *csi.ControllerExpandVolumeRequest, +) (*csi.ControllerExpandVolumeResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "ControllerExpandVolume is not implemented for NVMe-oF volumes") +} + // validateCreateVolumeRequest validates the incoming request for nvmeof. // the rest of the parameters are validated by RBD. 
func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { diff --git a/internal/nvmeof/driver/driver.go b/internal/nvmeof/driver/driver.go index 945575968a7..a900603d657 100644 --- a/internal/nvmeof/driver/driver.go +++ b/internal/nvmeof/driver/driver.go @@ -64,6 +64,7 @@ func (d *nvmeofDriver) Run(conf *util.Config) { csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, csi.ControllerServiceCapability_RPC_MODIFY_VOLUME, + csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, }) cd.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ From a3ea18fc064648caba1596ef7b24df35f7a41d8a Mon Sep 17 00:00:00 2001 From: gadi-didi Date: Mon, 10 Nov 2025 17:38:17 +0200 Subject: [PATCH 09/10] util: add ControllerModifyVolume to request ID extraction add ControllerModifyVolume to request ID extraction for proper log correlation. Signed-off-by: gadi-didi --- internal/csi-common/utils.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/csi-common/utils.go b/internal/csi-common/utils.go index 91fc3496545..4c5ac7521f6 100644 --- a/internal/csi-common/utils.go +++ b/internal/csi-common/utils.go @@ -209,6 +209,9 @@ func getReqID(req interface{}) string { case *csi.ControllerExpandVolumeRequest: reqID = r.GetVolumeId() + case *csi.ControllerModifyVolumeRequest: + reqID = r.GetVolumeId() + case *csi.ControllerPublishVolumeRequest: reqID = r.GetVolumeId() case *csi.ControllerUnpublishVolumeRequest: From f482113f31175633c960066eafe75c7f16b56e50 Mon Sep 17 00:00:00 2001 From: gadi-didi Date: Wed, 12 Nov 2025 15:49:36 +0200 Subject: [PATCH 10/10] nvmeof: add unit tests for parseQoSParameters add unit tests with multiple cases. 
Signed-off-by: gadi-didi --- internal/nvmeof/controller/qos_test.go | 153 +++++++++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 internal/nvmeof/controller/qos_test.go diff --git a/internal/nvmeof/controller/qos_test.go b/internal/nvmeof/controller/qos_test.go new file mode 100644 index 00000000000..35c8b615f8d --- /dev/null +++ b/internal/nvmeof/controller/qos_test.go @@ -0,0 +1,153 @@ +/* +Copyright 2025 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ceph/ceph-csi/internal/nvmeof" +) + +func TestParseQoSParameters(t *testing.T) { + t.Parallel() + + uint64Ptr := func(v uint64) *uint64 { return &v } + + tests := []struct { + name string + params map[string]string + expected *nvmeof.NVMeoFQosVolume + expectError bool + }{ + { + name: "empty parameters", + params: map[string]string{}, + expected: nil, + }, + { + name: "all QoS parameters", + params: map[string]string{ + nvmeof.RwIosPerSecond: "10000", + nvmeof.RwMbytesPerSecond: "100", + nvmeof.RMbytesPerSecond: "50", + nvmeof.WMbytesPerSecond: "50", + }, + expected: &nvmeof.NVMeoFQosVolume{ + RwIosPerSecond: uint64Ptr(10000), + RwMbytesPerSecond: uint64Ptr(100), + RMbytesPerSecond: uint64Ptr(50), + WMbytesPerSecond: uint64Ptr(50), + }, + }, + { + name: "single QoS parameter", + params: map[string]string{ + nvmeof.RwIosPerSecond: "5000", + }, + 
expected: &nvmeof.NVMeoFQosVolume{ + RwIosPerSecond: uint64Ptr(5000), + }, + }, + { + name: "zero value (unlimited)", + params: map[string]string{ + nvmeof.RwIosPerSecond: "0", + nvmeof.RwMbytesPerSecond: "100", + }, + expected: &nvmeof.NVMeoFQosVolume{ + RwIosPerSecond: uint64Ptr(0), + RwMbytesPerSecond: uint64Ptr(100), + }, + }, + { + name: "partial QoS parameters", + params: map[string]string{ + nvmeof.RwIosPerSecond: "10000", + nvmeof.RMbytesPerSecond: "50", + }, + expected: &nvmeof.NVMeoFQosVolume{ + RwIosPerSecond: uint64Ptr(10000), + RMbytesPerSecond: uint64Ptr(50), + }, + }, + { + name: "empty string values ignored", + params: map[string]string{ + nvmeof.RwIosPerSecond: "", + nvmeof.RwMbytesPerSecond: "100", + }, + expected: &nvmeof.NVMeoFQosVolume{ + RwMbytesPerSecond: uint64Ptr(100), + }, + }, + { + name: "invalid number format", + params: map[string]string{ + nvmeof.RwIosPerSecond: "invalid", + }, + expectError: true, + }, + { + name: "negative number", + params: map[string]string{ + nvmeof.RwIosPerSecond: "-100", + }, + expectError: true, + }, + { + name: "number too large", + params: map[string]string{ + nvmeof.RwIosPerSecond: "18446744073709551616", // uint64 max + 1 + }, + expectError: true, + }, + { + name: "floating point number", + params: map[string]string{ + nvmeof.RwMbytesPerSecond: "100.5", + }, + expectError: true, + }, + { + name: "mixed valid and invalid", + params: map[string]string{ + nvmeof.RwIosPerSecond: "10000", + nvmeof.RwMbytesPerSecond: "invalid", + }, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + result, err := parseQoSParameters(tt.params) + + if tt.expectError { + require.Error(t, err) + assert.Nil(t, result) + } else { + require.NoError(t, err) + assert.Equal(t, tt.expected, result) + } + }) + } +}