diff --git a/internal/csi-common/utils.go b/internal/csi-common/utils.go index 91fc3496545..4c5ac7521f6 100644 --- a/internal/csi-common/utils.go +++ b/internal/csi-common/utils.go @@ -209,6 +209,9 @@ func getReqID(req interface{}) string { case *csi.ControllerExpandVolumeRequest: reqID = r.GetVolumeId() + case *csi.ControllerModifyVolumeRequest: + reqID = r.GetVolumeId() + case *csi.ControllerPublishVolumeRequest: reqID = r.GetVolumeId() case *csi.ControllerUnpublishVolumeRequest: diff --git a/internal/nvmeof/controller/controllerserver.go b/internal/nvmeof/controller/controllerserver.go index 6ea472ca4c1..d131fc93dee 100644 --- a/internal/nvmeof/controller/controllerserver.go +++ b/internal/nvmeof/controller/controllerserver.go @@ -29,6 +29,7 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/nvmeof" + nvmeoferrors "github.com/ceph/ceph-csi/internal/nvmeof/errors" "github.com/ceph/ceph-csi/internal/rbd" rbdutil "github.com/ceph/ceph-csi/internal/rbd" rbddriver "github.com/ceph/ceph-csi/internal/rbd/driver" @@ -282,7 +283,7 @@ func (cs *Server) ControllerUnpublishVolume( if err != nil { log.ErrorLog(ctx, "failed to get NVMe-oF metadata for volumeID %s: %v", volumeID, err) - return nil, status.Errorf(codes.Internal, "failed to get NVMe-oF metadata: %v", err) + return nil, nvmeoferrors.ToGRPCError(err) } // Unpublish NVMe-oF resources @@ -293,6 +294,54 @@ func (cs *Server) ControllerUnpublishVolume( return &csi.ControllerUnpublishVolumeResponse{}, nil } +// ControllerModifyVolume modifies the volume's QoS parameters. +func (cs *Server) ControllerModifyVolume( + ctx context.Context, + req *csi.ControllerModifyVolumeRequest, +) (*csi.ControllerModifyVolumeResponse, error) { + volumeID := req.GetVolumeId() + params := req.GetMutableParameters() + + // Step 1: Acquire volume lock + if acquired := cs.volumeLocks.TryAcquire(volumeID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer cs.volumeLocks.Release(volumeID) + + // Step 2: Parse QoS parameters from mutable_parameters + hasRBDQoS := rbd.HasQoSParams(params) + if hasRBDQoS { + log.ErrorLog(ctx, "Cannot set RBD QoS parameters on NVMe-oF volumes") + + return nil, status.Error(codes.InvalidArgument, "cannot set RBD QoS parameters on NVMe-oF volumes") + } + nvmeofQoS, err := parseQoSParameters(params) + if err != nil { + log.ErrorLog(ctx, "failed to parse NVMe-oF QoS parameters: %v", err) + + return nil, status.Errorf(codes.InvalidArgument, "failed to parse QoS parameters: %v", err) + } + if nvmeofQoS != nil { + return cs.modifyNVMeoFQoS(ctx, req, nvmeofQoS) + } + + return &csi.ControllerModifyVolumeResponse{}, nil +} + +// ControllerExpandVolume handles volume expansion requests. +// For now it only updates the capacity in the response as NVMe-oF +// this must be added because ControllerModifyVolume requires the sidecar csi-resizer. and +// csi-resizer searches for the capacity ControllerServiceCapability_RPC_EXPAND_VOLUME. +// In the future, if NVMe-oF gateway supports volume expansion, the logic must be added here. +func (cs *Server) ControllerExpandVolume( + ctx context.Context, + req *csi.ControllerExpandVolumeRequest, +) (*csi.ControllerExpandVolumeResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "ControllerExpandVolume is not implemented for NVMe-oF volumes") +} + // validateCreateVolumeRequest validates the incoming request for nvmeof. // the rest of the parameters are validated by RBD. func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { @@ -312,6 +361,22 @@ func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { if err != nil { return fmt.Errorf("invalid listeners parameter: %w", err) } + // Validate QoS parameters - cannot mix RBD and NVMe-oF QoS + mutableParams := req.GetMutableParameters() + + // check for RBD QoS parameters in both params and mutableParams + if hasRBDQoS := rbd.HasQoSParams(params); hasRBDQoS { + return errors.New("setting RBD QoS parameters on NVMe-oF volumes is not supported") + } + if hasRBDQoS := rbd.HasQoSParams(mutableParams); hasRBDQoS { + return errors.New("setting RBD QoS parameters on NVMe-oF volumes is not supported") + } + + // It take the mutableParams value from the volumeAttributesClassName in the PersistentVolumeClaim yaml. + _, err = parseQoSParameters(mutableParams) + if err != nil { + return fmt.Errorf("invalid NVMe-oF QoS parameters: %w", err) + } return nil } @@ -351,6 +416,123 @@ func parseListeners(listenersJSON string) ([]nvmeof.ListenerDetails, error) { return listeners, nil } +// parseQoSParameters extracts and parses QoS parameters from the given map. +func parseQoSParameters(params map[string]string) (*nvmeof.NVMeoFQosVolume, error) { + qos := &nvmeof.NVMeoFQosVolume{} + hasAnyQoS := false + + parseParam := func(key, name string, dest **uint64) error { + if val, exists := params[key]; exists && val != "" { + parsed, err := strconv.ParseUint(val, 10, 64) + if err != nil { + return fmt.Errorf("invalid %s: %w", name, err) + } + *dest = &parsed + hasAnyQoS = true + } + + return nil + } + + if err := parseParam(nvmeof.RwIosPerSecond, nvmeof.RwIosPerSecond, &qos.RwIosPerSecond); err != nil { + return nil, err + } + if err := parseParam(nvmeof.RwMbytesPerSecond, nvmeof.RwMbytesPerSecond, &qos.RwMbytesPerSecond); err != nil { + return nil, err + } + if err := parseParam(nvmeof.RMbytesPerSecond, nvmeof.RMbytesPerSecond, &qos.RMbytesPerSecond); err != nil { + return nil, err + } + if err := parseParam(nvmeof.WMbytesPerSecond, nvmeof.WMbytesPerSecond, &qos.WMbytesPerSecond); err != nil { + return nil, err + } + + if !hasAnyQoS { + return nil, nil + } + + return qos, nil +} + +// modifyNVMeoFQoS handles NVMe-oF gateway QoS modification. +func (cs *Server) modifyNVMeoFQoS( + ctx context.Context, + req *csi.ControllerModifyVolumeRequest, + qos *nvmeof.NVMeoFQosVolume, +) (*csi.ControllerModifyVolumeResponse, error) { + volumeID := req.GetVolumeId() + + // Step 1: Get secrets + + // Since ControllerModifyVolume doesn't receive volume context and dont have option to take secrets + // because there is no "csi.storage.k8s.io/controller-modify-secret-name" field in the SC !, + // the full solution for it is to use GetControllerExpandSecretRef but there is no such function yet. + // TODO: change the call to GetControllerExpandSecretRef once it is implemented. + secrets := req.GetSecrets() + if secrets == nil { + secretName, secretNamespace, err := util.GetControllerPublishSecretRef(volumeID, util.RBDType) + if err != nil { + log.ErrorLog(ctx, "Failed to get secret reference: %v", err) + + return nil, status.Errorf(codes.Internal, "failed to get secret reference: %v", err) + } + + secrets, err = k8s.GetSecret(secretName, secretNamespace) + if err != nil { + log.ErrorLog(ctx, "Failed to get secret from k8s: %v", err) + + return nil, status.Errorf(codes.Internal, "failed to get secret: %v", err) + } + } + + // Step 2: Get NVMe-oF metadata + nvmeofData, err := cs.getNVMeoFMetadata(ctx, secrets, volumeID) + if err != nil { + log.ErrorLog(ctx, "Failed to get NVMe-oF metadata: %v", err) + + return nil, nvmeoferrors.ToGRPCError(err) + } + + // Step 3: Connect to gateway + config := &nvmeof.GatewayConfig{ + Address: nvmeofData.GatewayManagementInfo.Address, + Port: nvmeofData.GatewayManagementInfo.Port, + } + gateway, err := connectGateway(ctx, config) + if err != nil { + log.ErrorLog(ctx, "Gateway connection failed: %v", err) + + return nil, status.Errorf(codes.Unavailable, "gateway connection failed: %v", err) + } + defer func() { + if closeErr := gateway.Destroy(); closeErr != nil { + log.ErrorLog(ctx, "Failed to close gateway connection: %v", closeErr) + } + }() + + // Step 4: Apply NVMe-oF QoS via gateway + log.DebugLog(ctx, "Setting QoS for subsystem=%s, nsid=%d", nvmeofData.SubsystemNQN, nvmeofData.NamespaceID) + + err = gateway.SetQoSLimitsForNamespace(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID, *qos) + if err != nil { + // Check if error is EEXIST (RBD QoS already set) + if errors.Is(err, nvmeoferrors.ErrRbdQoSExists) { + log.ErrorLog(ctx, "RBD QoS already configured on volume") + + return nil, status.Error(codes.InvalidArgument, + "RBD QoS already configured on this volume, cannot set NVMe-oF gateway QoS") + } + + log.ErrorLog(ctx, "Failed to set QoS limits: %v", err) + + return nil, status.Errorf(codes.Internal, "failed to set QoS limits: %v", err) + } + + log.DebugLog(ctx, "Successfully modified NVMe-oF QoS for volume %s", volumeID) + + return &csi.ControllerModifyVolumeResponse{}, nil +} + // ensureSubsystem checks if the subsystem exists, and creates it if not. // then creates the listener. func ensureSubsystem( @@ -434,7 +616,6 @@ func cleanupEmptySubsystem( } // createNVMeoFResources sets up the NVMe-oF resources for the given RBD volume. -// TODO - need to support multiple listeners. func (cs *Server) createNVMeoFResources( ctx context.Context, req *csi.CreateVolumeRequest, @@ -465,6 +646,16 @@ func (cs *Server) createNVMeoFResources( Port: uint32(nvmeofGatewayPort), }, } + // extract Qos parameters if any + mutableParams := req.GetMutableParameters() + // It take the mutableParams value from the volumeAttributesClassName in the PersistentVolumeClaim yaml. + // We already verified in the validateCreateVolumeRequest that there is no RBD QoS + nvmeofQoS, err := parseQoSParameters(mutableParams) + if err != nil { + log.ErrorLog(ctx, "failed to parse NVMe-oF QoS parameters: %v", err) + + return nil, fmt.Errorf("failed to parse QoS parameters: %w", err) + } // Step 2: Connect to gateway config, err := getGatewayConfigFromRequest(params) @@ -505,6 +696,15 @@ func (cs *Server) createNVMeoFResources( log.DebugLog(ctx, "Namespace created: %s/%s with NSID: %d", rbdPoolName, rbdImageName, nsid) nvmeofData.NamespaceID = nsid + // Step 5: Set QoS limits if any + if nvmeofQoS != nil { + log.DebugLog(ctx, "Setting QoS limits: %s", nvmeofQoS) + if err := gateway.SetQoSLimitsForNamespace(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID, + *nvmeofQoS); err != nil { + return nil, fmt.Errorf("setting QoS limits failed: %w", err) + } + } + uuid, err := gateway.GetUUIDBySubsystemAndNameSpaceID(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID) if err != nil { return nil, fmt.Errorf("get namespace uuid failed: %w", err) @@ -799,7 +999,8 @@ func (cs *Server) getNVMeoFMetadata( // Get RBD volume rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { - return nil, fmt.Errorf("failed to find volume with ID %q: %w", volumeID, err) + return nil, fmt.Errorf("%w: failed to find volume with ID %q: %w", + nvmeoferrors.ErrMetadataNotFound, volumeID, err) } defer rbdVol.Destroy(ctx) @@ -820,10 +1021,12 @@ func (cs *Server) getNVMeoFMetadata( for _, key := range requiredKeys { value, err := rbdVol.GetMetadata(key) if err != nil { - return nil, fmt.Errorf("failed to get %s: %w", key, err) + return nil, fmt.Errorf("%w: failed to get %s: %w", + nvmeoferrors.ErrMetadataNotFound, key, err) } if value == "" { - return nil, fmt.Errorf("metadata %s is empty", key) + return nil, fmt.Errorf("%w: metadata %s is empty", + nvmeoferrors.ErrMetadataNotFound, key) } metadata[key] = value } @@ -831,18 +1034,21 @@ func (cs *Server) getNVMeoFMetadata( // Parse namespace ID nsid, err := strconv.ParseUint(metadata[toRBDMetadataKey(vcNamespaceID)], 10, 32) if err != nil { - return nil, fmt.Errorf("invalid namespace ID: %w", err) + return nil, fmt.Errorf("%w: invalid namespace ID: %w", + nvmeoferrors.ErrMetadataCorrupted, err) } gatewayPort, err := strconv.ParseUint(metadata[toRBDMetadataKey(vcGatewayPort)], 10, 32) if err != nil { - return nil, fmt.Errorf("invalid gateway port: %w", err) + return nil, fmt.Errorf("%w: invalid gateway port: %w", + nvmeoferrors.ErrMetadataCorrupted, err) } // Parse listeners from JSON var listeners []nvmeof.ListenerDetails if err := json.Unmarshal([]byte(metadata[toRBDMetadataKey(vcListeners)]), &listeners); err != nil { - return nil, fmt.Errorf("failed to parse listeners JSON: %w", err) + return nil, fmt.Errorf("%w: failed to parse listeners JSON: %w", + nvmeoferrors.ErrMetadataCorrupted, err) } // Construct NVMe-oF volume data diff --git a/internal/nvmeof/controller/qos_test.go b/internal/nvmeof/controller/qos_test.go new file mode 100644 index 00000000000..35c8b615f8d --- /dev/null +++ b/internal/nvmeof/controller/qos_test.go @@ -0,0 +1,153 @@ +/* +Copyright 2025 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ceph/ceph-csi/internal/nvmeof" +) + +func TestParseQoSParameters(t *testing.T) { + t.Parallel() + + uint64Ptr := func(v uint64) *uint64 { return &v } + + tests := []struct { + name string + params map[string]string + expected *nvmeof.NVMeoFQosVolume + expectError bool + }{ + { + name: "empty parameters", + params: map[string]string{}, + expected: nil, + }, + { + name: "all QoS parameters", + params: map[string]string{ + nvmeof.RwIosPerSecond: "10000", + nvmeof.RwMbytesPerSecond: "100", + nvmeof.RMbytesPerSecond: "50", + nvmeof.WMbytesPerSecond: "50", + }, + expected: &nvmeof.NVMeoFQosVolume{ + RwIosPerSecond: uint64Ptr(10000), + RwMbytesPerSecond: uint64Ptr(100), + RMbytesPerSecond: uint64Ptr(50), + WMbytesPerSecond: uint64Ptr(50), + }, + }, + { + name: "single QoS parameter", + params: map[string]string{ + nvmeof.RwIosPerSecond: "5000", + }, + expected: &nvmeof.NVMeoFQosVolume{ + RwIosPerSecond: uint64Ptr(5000), + }, + }, + { + name: "zero value (unlimited)", + params: map[string]string{ + nvmeof.RwIosPerSecond: "0", + nvmeof.RwMbytesPerSecond: "100", + }, + expected: &nvmeof.NVMeoFQosVolume{ + RwIosPerSecond: uint64Ptr(0), + RwMbytesPerSecond: uint64Ptr(100), + }, + }, + { + name: "partial QoS parameters", + params: map[string]string{ + nvmeof.RwIosPerSecond: "10000", + nvmeof.RMbytesPerSecond: "50", + }, + expected: &nvmeof.NVMeoFQosVolume{ + RwIosPerSecond: uint64Ptr(10000), + RMbytesPerSecond: uint64Ptr(50), + }, + }, + { + name: "empty string values ignored", + params: map[string]string{ + nvmeof.RwIosPerSecond: "", + nvmeof.RwMbytesPerSecond: "100", + }, + expected: &nvmeof.NVMeoFQosVolume{ + RwMbytesPerSecond: uint64Ptr(100), + }, + }, + { + name: "invalid number format", + params: map[string]string{ + nvmeof.RwIosPerSecond: "invalid", + }, + expectError: true, + }, + { + name: "negative number", + params: map[string]string{ + nvmeof.RwIosPerSecond: "-100", + }, + expectError: true, + }, + { + name: "number too large", + params: map[string]string{ + nvmeof.RwIosPerSecond: "18446744073709551616", // uint64 max + 1 + }, + expectError: true, + }, + { + name: "floating point number", + params: map[string]string{ + nvmeof.RwMbytesPerSecond: "100.5", + }, + expectError: true, + }, + { + name: "mixed valid and invalid", + params: map[string]string{ + nvmeof.RwIosPerSecond: "10000", + nvmeof.RwMbytesPerSecond: "invalid", + }, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + result, err := parseQoSParameters(tt.params) + + if tt.expectError { + require.Error(t, err) + assert.Nil(t, result) + } else { + require.NoError(t, err) + assert.Equal(t, tt.expected, result) + } + }) + } +} diff --git a/internal/nvmeof/driver/driver.go b/internal/nvmeof/driver/driver.go index 6263c958965..a900603d657 100644 --- a/internal/nvmeof/driver/driver.go +++ b/internal/nvmeof/driver/driver.go @@ -63,6 +63,8 @@ func (d *nvmeofDriver) Run(conf *util.Config) { cd.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{ csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, + csi.ControllerServiceCapability_RPC_MODIFY_VOLUME, + csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, }) cd.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ diff --git a/internal/nvmeof/errors/errors.go b/internal/nvmeof/errors/errors.go new file mode 100644 index 00000000000..a2ab54a61c4 --- /dev/null +++ b/internal/nvmeof/errors/errors.go @@ -0,0 +1,60 @@ +/* +Copyright 2025 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nvmeoferrors + +import ( + "errors" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// TODO: next PR - look at nvmeof package and see if any errors can be moved here. +var ( + // ErrRbdQoSExists is returned when the user tries to set a QoS for namespace, and that namespace + // already has a RBD QoS associated with it. + ErrRbdQoSExists = errors.New("QoS limits already configured (RBD QoS exists)") + // ErrMetadataNotFound is returned when NVMe-oF volume metadata is not found. + ErrMetadataNotFound = errors.New("metadata not found") + // ErrMetadataCorrupted is returned when NVMe-oF volume metadata(=rbd metadata) is corrupted or invalid. + ErrMetadataCorrupted = errors.New("metadata corrupted or invalid") +) + +// errorToGRPCCode maps custom errors to gRPC status codes. +var errorToGRPCCode = map[error]codes.Code{ + ErrMetadataNotFound: codes.NotFound, + ErrMetadataCorrupted: codes.Internal, + ErrRbdQoSExists: codes.InvalidArgument, +} + +// ToGRPCError converts a custom error to a gRPC status error. +// If the error is not recognized, it returns codes.Internal. +func ToGRPCError(err error) error { + if err == nil { + return nil + } + + // Check if it's one of our custom errors + for customErr, code := range errorToGRPCCode { + if errors.Is(err, customErr) { + return status.Error(code, err.Error()) + } + } + + // Unknown error - return Internal + return status.Errorf(codes.Internal, "internal error: %v", err) +} diff --git a/internal/nvmeof/nvmeof.go b/internal/nvmeof/nvmeof.go index 093d6864af3..a814a7e5377 100644 --- a/internal/nvmeof/nvmeof.go +++ b/internal/nvmeof/nvmeof.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/proto" + nvmeoferrors "github.com/ceph/ceph-csi/internal/nvmeof/errors" "github.com/ceph/ceph-csi/internal/util/log" ) @@ -167,6 +168,43 @@ func (gw *GatewayRpcClient) DeleteNamespace(ctx context.Context, subsystemNQN st status.GetStatus(), status.GetErrorMessage()) } +// SetQoSLimitsForNamespace sets QoS limits on a namespace. +func (gw *GatewayRpcClient) SetQoSLimitsForNamespace( + ctx context.Context, + subsystemNQN string, + namespaceID uint32, + qos NVMeoFQosVolume, +) error { + log.DebugLog(ctx, "Setting QoS limits on namespace %d in subsystem %s", namespaceID, subsystemNQN) + + req := &pb.NamespaceSetQosReq{ + SubsystemNqn: subsystemNQN, + Nsid: namespaceID, + RwIosPerSecond: qos.RwIosPerSecond, + RwMbytesPerSecond: qos.RwMbytesPerSecond, + RMbytesPerSecond: qos.RMbytesPerSecond, + WMbytesPerSecond: qos.WMbytesPerSecond, + } + + status, err := gw.client.NamespaceSetQosLimits(ctx, req) + if err != nil { + return fmt.Errorf("failed to set QoS limits on namespace %d: %w", namespaceID, err) + } + + if status.GetStatus() == 0 { + log.DebugLog(ctx, "QoS limits set successfully on namespace %d", namespaceID) + + return nil + } + + if status.GetStatus() == int32(syscall.EEXIST) { // EEXIST + return fmt.Errorf("%w: %s", nvmeoferrors.ErrRbdQoSExists, status.GetErrorMessage()) + } + + return fmt.Errorf("gateway SetNamespaceQos returned error (status=%d): %s", + status.GetStatus(), status.GetErrorMessage()) +} + // GetUUIDBySubsystemAndNameSpaceID get the uuid of namespace by given subsystem and ns-id. func (gw *GatewayRpcClient) GetUUIDBySubsystemAndNameSpaceID( ctx context.Context, diff --git a/internal/nvmeof/qos.go b/internal/nvmeof/qos.go new file mode 100644 index 00000000000..2c16158a882 --- /dev/null +++ b/internal/nvmeof/qos.go @@ -0,0 +1,66 @@ +/* +Copyright 2025 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package nvmeof + +import ( + "fmt" + "strings" +) + +// NVMeoFQosVolume holds the QoS parameters for an NVMe-oF volume. +type NVMeoFQosVolume struct { + RwIosPerSecond *uint64 // R/W IOs per second limit, 0 means unlimited. + RwMbytesPerSecond *uint64 // R/W megabytes per second limit, 0 means unlimited. + RMbytesPerSecond *uint64 // Read megabytes per second limit, 0 means unlimited. + WMbytesPerSecond *uint64 // Write megabytes per second limit, 0 means unlimited. +} + +// QoS parameter keys that user can set. +// these parameters are used to configure the QoS limits for NVMe-oF volumes. +// They correspond to the fields in NVMeoFQosVolume struct. +const ( + RwIosPerSecond = "rwIosPerSecond" + RwMbytesPerSecond = "rwMbytesPerSecond" + RMbytesPerSecond = "rMbytesPerSecond" + WMbytesPerSecond = "wMbytesPerSecond" +) + +// String returns a string representation of the NVMeoFQosVolume. +func (q *NVMeoFQosVolume) String() string { + if q == nil { + return "nil" + } + + parts := []string{} + if q.RwIosPerSecond != nil { + parts = append(parts, fmt.Sprintf("RwIops=%d", *q.RwIosPerSecond)) + } + if q.RwMbytesPerSecond != nil { + parts = append(parts, fmt.Sprintf("RwMB/s=%d", *q.RwMbytesPerSecond)) + } + if q.RMbytesPerSecond != nil { + parts = append(parts, fmt.Sprintf("RMB/s=%d", *q.RMbytesPerSecond)) + } + if q.WMbytesPerSecond != nil { + parts = append(parts, fmt.Sprintf("WMB/s=%d", *q.WMbytesPerSecond)) + } + + if len(parts) == 0 { + return "no QoS limits" + } + + return strings.Join(parts, ", ") +} diff --git a/internal/rbd/qos.go b/internal/rbd/qos.go index 220942e7aad..51e703e875f 100644 --- a/internal/rbd/qos.go +++ b/internal/rbd/qos.go @@ -89,6 +89,18 @@ type qosSpec struct { present bool } +// HasQoSParams checks if any RBD QoS parameters are present. +func HasQoSParams(params map[string]string) bool { + rbdQosParams := parseQosParams(params) + for _, qos := range rbdQosParams { + if qos.present { + return true + } + } + + return false +} + func parseQosParams( scParams map[string]string, ) map[string]*qosSpec {