Skip to content
Closed
3 changes: 3 additions & 0 deletions internal/csi-common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ func getReqID(req interface{}) string {
case *csi.ControllerExpandVolumeRequest:
reqID = r.GetVolumeId()

case *csi.ControllerModifyVolumeRequest:
reqID = r.GetVolumeId()

case *csi.ControllerPublishVolumeRequest:
reqID = r.GetVolumeId()
case *csi.ControllerUnpublishVolumeRequest:
Expand Down
222 changes: 214 additions & 8 deletions internal/nvmeof/controller/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/nvmeof"
nvmeoferrors "github.com/ceph/ceph-csi/internal/nvmeof/errors"
"github.com/ceph/ceph-csi/internal/rbd"
rbdutil "github.com/ceph/ceph-csi/internal/rbd"
rbddriver "github.com/ceph/ceph-csi/internal/rbd/driver"
Expand Down Expand Up @@ -282,7 +283,7 @@ func (cs *Server) ControllerUnpublishVolume(
if err != nil {
log.ErrorLog(ctx, "failed to get NVMe-oF metadata for volumeID %s: %v", volumeID, err)

return nil, status.Errorf(codes.Internal, "failed to get NVMe-oF metadata: %v", err)
return nil, nvmeoferrors.ToGRPCError(err)
}

// Unpublish NVMe-oF resources
Expand All @@ -293,6 +294,54 @@ func (cs *Server) ControllerUnpublishVolume(
return &csi.ControllerUnpublishVolumeResponse{}, nil
}

// ControllerModifyVolume modifies the volume's QoS parameters.
//
// The mutable parameters of the request are inspected for QoS settings:
//   - RBD QoS parameters are rejected, they cannot be set on NVMe-oF volumes.
//   - NVMe-oF QoS parameters are parsed and, when at least one is present,
//     applied through modifyNVMeoFQoS.
//   - When no QoS parameter is present the call is a no-op and an empty
//     response is returned.
//
// Errors:
//   - InvalidArgument: the volume ID is missing, RBD QoS parameters were
//     supplied, or an NVMe-oF QoS value could not be parsed.
//   - Aborted: another operation already holds the lock for this volume.
func (cs *Server) ControllerModifyVolume(
	ctx context.Context,
	req *csi.ControllerModifyVolumeRequest,
) (*csi.ControllerModifyVolumeResponse, error) {
	volumeID := req.GetVolumeId()
	params := req.GetMutableParameters()

	// The volume ID is a required field; reject the request before taking a
	// lock keyed on an empty string.
	if volumeID == "" {
		log.ErrorLog(ctx, "volume ID is missing in ControllerModifyVolume request")

		return nil, status.Error(codes.InvalidArgument, "volume ID must be provided")
	}

	// Step 1: Acquire volume lock
	if acquired := cs.volumeLocks.TryAcquire(volumeID); !acquired {
		log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)

		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
	}
	defer cs.volumeLocks.Release(volumeID)

	// Step 2: Parse QoS parameters from mutable_parameters.
	// RBD and NVMe-oF QoS must not be mixed, so RBD QoS keys are refused here.
	if rbd.HasQoSParams(params) {
		log.ErrorLog(ctx, "Cannot set RBD QoS parameters on NVMe-oF volumes")

		return nil, status.Error(codes.InvalidArgument, "cannot set RBD QoS parameters on NVMe-oF volumes")
	}

	nvmeofQoS, err := parseQoSParameters(params)
	if err != nil {
		log.ErrorLog(ctx, "failed to parse NVMe-oF QoS parameters: %v", err)

		return nil, status.Errorf(codes.InvalidArgument, "failed to parse QoS parameters: %v", err)
	}

	// Step 3: Apply the NVMe-oF QoS settings, if any were requested.
	// parseQoSParameters returns nil when no QoS key carries a value.
	if nvmeofQoS != nil {
		return cs.modifyNVMeoFQoS(ctx, req, nvmeofQoS)
	}

	return &csi.ControllerModifyVolumeResponse{}, nil
}

// ControllerExpandVolume handles volume expansion requests.
//
// Volume expansion is not supported for NVMe-oF volumes yet, so this method
// always returns an Unimplemented error and never touches the volume.
//
// The method is still needed because ControllerModifyVolume requires the
// csi-resizer sidecar, and csi-resizer looks for the
// ControllerServiceCapability_RPC_EXPAND_VOLUME capability.
// In the future, if the NVMe-oF gateway supports volume expansion, the logic
// must be added here.
func (cs *Server) ControllerExpandVolume(
	ctx context.Context,
	req *csi.ControllerExpandVolumeRequest,
) (*csi.ControllerExpandVolumeResponse, error) {
	// The message has no format verbs, so status.Error is used instead of status.Errorf.
	return nil, status.Error(codes.Unimplemented, "ControllerExpandVolume is not implemented for NVMe-oF volumes")
}

// validateCreateVolumeRequest validates the incoming request for nvmeof.
// the rest of the parameters are validated by RBD.
func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
Expand All @@ -312,6 +361,22 @@ func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
if err != nil {
return fmt.Errorf("invalid listeners parameter: %w", err)
}
// Validate QoS parameters - cannot mix RBD and NVMe-oF QoS
mutableParams := req.GetMutableParameters()

// check for RBD QoS parameters in both params and mutableParams
if hasRBDQoS := rbd.HasQoSParams(params); hasRBDQoS {
return errors.New("setting RBD QoS parameters on NVMe-oF volumes is not supported")
}
if hasRBDQoS := rbd.HasQoSParams(mutableParams); hasRBDQoS {
return errors.New("setting RBD QoS parameters on NVMe-oF volumes is not supported")
}

// mutableParams is taken from the volumeAttributesClassName in the PersistentVolumeClaim YAML.
_, err = parseQoSParameters(mutableParams)
if err != nil {
return fmt.Errorf("invalid NVMe-oF QoS parameters: %w", err)
}

return nil
}
Expand Down Expand Up @@ -351,6 +416,123 @@ func parseListeners(listenersJSON string) ([]nvmeof.ListenerDetails, error) {
return listeners, nil
}

// parseQoSParameters builds the NVMe-oF QoS settings from the given map.
//
// Every known QoS key is looked up in params; a key that is missing or has an
// empty value is ignored, any other value must be a base-10 unsigned 64-bit
// integer. The keys are processed in a fixed order, so the first invalid
// value decides which error is returned.
//
// It returns (nil, nil) when none of the QoS keys carries a value, and a nil
// result together with an error naming the key when a value fails to parse.
func parseQoSParameters(params map[string]string) (*nvmeof.NVMeoFQosVolume, error) {
	limits := &nvmeof.NVMeoFQosVolume{}

	// Parameter key paired with the field that receives its parsed value.
	fields := []struct {
		key  string
		dest **uint64
	}{
		{key: nvmeof.RwIosPerSecond, dest: &limits.RwIosPerSecond},
		{key: nvmeof.RwMbytesPerSecond, dest: &limits.RwMbytesPerSecond},
		{key: nvmeof.RMbytesPerSecond, dest: &limits.RMbytesPerSecond},
		{key: nvmeof.WMbytesPerSecond, dest: &limits.WMbytesPerSecond},
	}

	found := false
	for _, field := range fields {
		// A missing key reads as "", which is treated the same as an empty value.
		raw := params[field.key]
		if raw == "" {
			continue
		}

		value, err := strconv.ParseUint(raw, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("invalid %s: %w", field.key, err)
		}

		// value is declared inside the loop body, so every field points at
		// its own variable.
		*field.dest = &value
		found = true
	}

	if found {
		return limits, nil
	}

	return nil, nil
}

// modifyNVMeoFQoS handles NVMe-oF gateway QoS modification.
//
// It resolves the secrets for the volume, reads the NVMe-oF metadata of the
// volume, connects to the gateway recorded in that metadata and sets the
// given QoS limits on the volume's namespace.
//
// qos must not be nil; the caller only invokes this function after
// parseQoSParameters returned a non-nil result.
//
// Errors:
//   - Internal: the secret reference or the secret could not be fetched, or
//     the gateway failed to set the limits.
//   - Unavailable: connecting to the gateway failed.
//   - InvalidArgument: the gateway reported that RBD QoS is already
//     configured on the volume.
//   - Metadata lookup failures are converted with nvmeoferrors.ToGRPCError.
func (cs *Server) modifyNVMeoFQoS(
	ctx context.Context,
	req *csi.ControllerModifyVolumeRequest,
	qos *nvmeof.NVMeoFQosVolume,
) (*csi.ControllerModifyVolumeResponse, error) {
	volumeID := req.GetVolumeId()

	// Step 1: Get secrets
	//
	// ControllerModifyVolume does not receive the volume context, and the
	// StorageClass has no "csi.storage.k8s.io/controller-modify-secret-name"
	// field, so the request may arrive without secrets. In that case fall back
	// to the controller-publish secret reference of the volume.
	// An empty (but non-nil) map carries no credentials either, so it is
	// treated the same as a missing one.
	// TODO: change the call to GetControllerExpandSecretRef once it is implemented.
	secrets := req.GetSecrets()
	if len(secrets) == 0 {
		secretName, secretNamespace, err := util.GetControllerPublishSecretRef(volumeID, util.RBDType)
		if err != nil {
			log.ErrorLog(ctx, "Failed to get secret reference: %v", err)

			return nil, status.Errorf(codes.Internal, "failed to get secret reference: %v", err)
		}

		secrets, err = k8s.GetSecret(secretName, secretNamespace)
		if err != nil {
			log.ErrorLog(ctx, "Failed to get secret from k8s: %v", err)

			return nil, status.Errorf(codes.Internal, "failed to get secret: %v", err)
		}
	}

	// Step 2: Get NVMe-oF metadata
	nvmeofData, err := cs.getNVMeoFMetadata(ctx, secrets, volumeID)
	if err != nil {
		log.ErrorLog(ctx, "Failed to get NVMe-oF metadata: %v", err)

		return nil, nvmeoferrors.ToGRPCError(err)
	}

	// Step 3: Connect to gateway
	config := &nvmeof.GatewayConfig{
		Address: nvmeofData.GatewayManagementInfo.Address,
		Port:    nvmeofData.GatewayManagementInfo.Port,
	}
	gateway, err := connectGateway(ctx, config)
	if err != nil {
		log.ErrorLog(ctx, "Gateway connection failed: %v", err)

		return nil, status.Errorf(codes.Unavailable, "gateway connection failed: %v", err)
	}
	// A failure to close the connection is only logged; it does not change
	// the result of the RPC.
	defer func() {
		if closeErr := gateway.Destroy(); closeErr != nil {
			log.ErrorLog(ctx, "Failed to close gateway connection: %v", closeErr)
		}
	}()

	// Step 4: Apply NVMe-oF QoS via gateway
	log.DebugLog(ctx, "Setting QoS for subsystem=%s, nsid=%d", nvmeofData.SubsystemNQN, nvmeofData.NamespaceID)

	err = gateway.SetQoSLimitsForNamespace(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID, *qos)
	if err != nil {
		// Check if error is EEXIST (RBD QoS already set)
		if errors.Is(err, nvmeoferrors.ErrRbdQoSExists) {
			log.ErrorLog(ctx, "RBD QoS already configured on volume")

			return nil, status.Error(codes.InvalidArgument,
				"RBD QoS already configured on this volume, cannot set NVMe-oF gateway QoS")
		}

		log.ErrorLog(ctx, "Failed to set QoS limits: %v", err)

		return nil, status.Errorf(codes.Internal, "failed to set QoS limits: %v", err)
	}

	log.DebugLog(ctx, "Successfully modified NVMe-oF QoS for volume %s", volumeID)

	return &csi.ControllerModifyVolumeResponse{}, nil
}

// ensureSubsystem checks if the subsystem exists, and creates it if not.
// then creates the listener.
func ensureSubsystem(
Expand Down Expand Up @@ -434,7 +616,6 @@ func cleanupEmptySubsystem(
}

// createNVMeoFResources sets up the NVMe-oF resources for the given RBD volume.
// TODO - need to support multiple listeners.
func (cs *Server) createNVMeoFResources(
ctx context.Context,
req *csi.CreateVolumeRequest,
Expand Down Expand Up @@ -465,6 +646,16 @@ func (cs *Server) createNVMeoFResources(
Port: uint32(nvmeofGatewayPort),
},
}
// extract Qos parameters if any
mutableParams := req.GetMutableParameters()
// mutableParams is taken from the volumeAttributesClassName in the PersistentVolumeClaim YAML.
// We already verified in the validateCreateVolumeRequest that there is no RBD QoS
nvmeofQoS, err := parseQoSParameters(mutableParams)
if err != nil {
log.ErrorLog(ctx, "failed to parse NVMe-oF QoS parameters: %v", err)

return nil, fmt.Errorf("failed to parse QoS parameters: %w", err)
}

// Step 2: Connect to gateway
config, err := getGatewayConfigFromRequest(params)
Expand Down Expand Up @@ -505,6 +696,15 @@ func (cs *Server) createNVMeoFResources(
log.DebugLog(ctx, "Namespace created: %s/%s with NSID: %d", rbdPoolName, rbdImageName, nsid)
nvmeofData.NamespaceID = nsid

// Step 5: Set QoS limits if any
if nvmeofQoS != nil {
log.DebugLog(ctx, "Setting QoS limits: %s", nvmeofQoS)
if err := gateway.SetQoSLimitsForNamespace(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID,
*nvmeofQoS); err != nil {
return nil, fmt.Errorf("setting QoS limits failed: %w", err)
}
}

uuid, err := gateway.GetUUIDBySubsystemAndNameSpaceID(ctx, nvmeofData.SubsystemNQN, nvmeofData.NamespaceID)
if err != nil {
return nil, fmt.Errorf("get namespace uuid failed: %w", err)
Expand Down Expand Up @@ -799,7 +999,8 @@ func (cs *Server) getNVMeoFMetadata(
// Get RBD volume
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
return nil, fmt.Errorf("failed to find volume with ID %q: %w", volumeID, err)
return nil, fmt.Errorf("%w: failed to find volume with ID %q: %w",
nvmeoferrors.ErrMetadataNotFound, volumeID, err)
}
defer rbdVol.Destroy(ctx)

Expand All @@ -820,29 +1021,34 @@ func (cs *Server) getNVMeoFMetadata(
for _, key := range requiredKeys {
value, err := rbdVol.GetMetadata(key)
if err != nil {
return nil, fmt.Errorf("failed to get %s: %w", key, err)
return nil, fmt.Errorf("%w: failed to get %s: %w",
nvmeoferrors.ErrMetadataNotFound, key, err)
}
if value == "" {
return nil, fmt.Errorf("metadata %s is empty", key)
return nil, fmt.Errorf("%w: metadata %s is empty",
nvmeoferrors.ErrMetadataNotFound, key)
}
metadata[key] = value
}

// Parse namespace ID
nsid, err := strconv.ParseUint(metadata[toRBDMetadataKey(vcNamespaceID)], 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid namespace ID: %w", err)
return nil, fmt.Errorf("%w: invalid namespace ID: %w",
nvmeoferrors.ErrMetadataCorrupted, err)
}

gatewayPort, err := strconv.ParseUint(metadata[toRBDMetadataKey(vcGatewayPort)], 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid gateway port: %w", err)
return nil, fmt.Errorf("%w: invalid gateway port: %w",
nvmeoferrors.ErrMetadataCorrupted, err)
}

// Parse listeners from JSON
var listeners []nvmeof.ListenerDetails
if err := json.Unmarshal([]byte(metadata[toRBDMetadataKey(vcListeners)]), &listeners); err != nil {
return nil, fmt.Errorf("failed to parse listeners JSON: %w", err)
return nil, fmt.Errorf("%w: failed to parse listeners JSON: %w",
nvmeoferrors.ErrMetadataCorrupted, err)
}

// Construct NVMe-oF volume data
Expand Down
Loading
Loading