Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,8 @@ public void clone(
requestData.resource_group,
requestData.override_props,
new HashSet<>(requestData.delete_props),
new HashSet<>(requestData.delete_namespaces)
new HashSet<>(requestData.delete_namespaces),
requestData.rebalance
);
requestHelper.doFlux(
ApiConsts.API_CLONE_RSCDFN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public void restoreResource(
rscName,
snapName,
snapRestore.to_resource,
snapRestore.stor_pool_rename == null ? Collections.emptyMap() : snapRestore.stor_pool_rename
snapRestore.stor_pool_rename == null ? Collections.emptyMap() : snapRestore.stor_pool_rename,
snapRestore.rebalance
);

requestHelper.doFlux(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,10 @@ public static class SnapshotRestore
*/
public List<String> nodes = Collections.emptyList();
public Map<String, String> stor_pool_rename = Collections.emptyMap();
/**
* If true, after restore completes, replicas will be migrated to the autoplacer-optimal nodes via migrate-disk.
*/
public @Nullable Boolean rebalance;
}

@JsonInclude(JsonInclude.Include.NON_EMPTY)
Expand Down Expand Up @@ -1899,6 +1903,10 @@ public static class ResourceDefinitionCloneRequest
public Map<String, String> override_props = Collections.emptyMap();
public List<String> delete_props = Collections.emptyList();
public List<String> delete_namespaces = Collections.emptyList();
/**
* If true, after cloning completes, replicas will be migrated to the autoplacer-optimal nodes via migrate-disk.
*/
public @Nullable Boolean rebalance;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public class CtrlRscDfnApiCallHandler
private final Provider<PropsChangedListenerBuilder> propsChangeListenerBuilder;
private final Autoplacer autoplacer;
private final BackgroundRunner backgroundRunner;
private final CtrlRscRebalanceHelper rscRebalanceHelper;

@Inject
public CtrlRscDfnApiCallHandler(
Expand Down Expand Up @@ -204,7 +205,8 @@ public CtrlRscDfnApiCallHandler(
CtrlRscDfnApiCallHelper ctrlRscDfnApiCallHelperRef,
Provider<PropsChangedListenerBuilder> propsChangeListenerBuilderRef,
Autoplacer autoplacerRef,
BackgroundRunner backgroundRunnerRef
BackgroundRunner backgroundRunnerRef,
CtrlRscRebalanceHelper rscRebalanceHelperRef
)
{
errorReporter = errorReporterRef;
Expand Down Expand Up @@ -242,6 +244,7 @@ public CtrlRscDfnApiCallHandler(
propsChangeListenerBuilder = propsChangeListenerBuilderRef;
autoplacer = autoplacerRef;
backgroundRunner = backgroundRunnerRef;
rscRebalanceHelper = rscRebalanceHelperRef;
}

public @Nullable ResourceDefinition createResourceDefinition(
Expand Down Expand Up @@ -820,7 +823,8 @@ public Flux<ApiCallRc> cloneRscDfn(
@Nullable String intoRscGrpName,
Map<String, String> overrideProps,
Set<String> deletePropKeys,
Set<String> deleteNamespaces
Set<String> deleteNamespaces,
@Nullable Boolean rebalance
)
{
ResponseContext context = makeResourceDefinitionContext(
Expand Down Expand Up @@ -860,7 +864,8 @@ public Flux<ApiCallRc> cloneRscDfn(
intoRscGrpName,
overrideProps,
deletePropKeys,
deleteNamespaces
deleteNamespaces,
rebalance
)
)
.transform(responses -> responseConverter.reportingExceptions(context, responses))
Expand Down Expand Up @@ -1318,7 +1323,8 @@ public Flux<ApiCallRc> cloneRscDfnInTransaction(
@Nullable String intoRscGrpName,
Map<String, String> overrideProps,
Set<String> deletePropKeys,
Set<String> deleteNamespaces)
Set<String> deleteNamespaces,
@Nullable Boolean rebalance)
{
Flux<ApiCallRc> flux;

Expand Down Expand Up @@ -1535,6 +1541,13 @@ public Flux<ApiCallRc> cloneRscDfnInTransaction(
.concatWith(resumeIOAndClearCloneProp(srcRscDfn, clonedRscName))
.concatWith(deploymentResponses)
.concatWith(removeStartCloning(clonedRscDfn))
.concatWith(Boolean.TRUE.equals(rebalance)
? rscRebalanceHelper.trigger(clonedRscDfn)
.onErrorResume(exc -> {
errorReporter.reportError(exc);
return Flux.empty();
})
: Flux.empty())
.onErrorResume(exception -> resumeIOAndClearCloneProp(srcRscDfn, clonedRscName));
}
catch (ApiRcException exc)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package com.linbit.linstor.core.apicallhandler.controller;

import com.linbit.linstor.annotation.Nullable;
import com.linbit.linstor.annotation.PeerContext;
import com.linbit.linstor.api.ApiCallRc;
import com.linbit.linstor.api.ApiCallRcImpl;
import com.linbit.linstor.api.ApiConsts;
import com.linbit.linstor.api.pojo.AutoSelectFilterPojo;
import com.linbit.linstor.api.pojo.builder.AutoSelectFilterBuilder;
import com.linbit.linstor.core.apicallhandler.controller.autoplacer.Autoplacer;
import com.linbit.linstor.core.apicallhandler.response.ApiAccessDeniedException;
import com.linbit.linstor.core.objects.Node;
import com.linbit.linstor.core.objects.Resource;
import com.linbit.linstor.core.objects.ResourceDefinition;
import com.linbit.linstor.core.objects.StorPool;
import com.linbit.linstor.logging.ErrorReporter;
import com.linbit.linstor.security.AccessContext;
import com.linbit.linstor.security.AccessDeniedException;

import javax.inject.Inject;
import javax.inject.Provider;
import javax.inject.Singleton;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;

/**
 * Helper for rebalancing resources after clone or snapshot restore.
 *
 * When resources are cloned or restored from snapshot, replicas are placed on the
 * same nodes as the source. This helper migrates replicas to the autoplacer-optimal
 * nodes via sequential migrate-disk (toggle-disk with MigrateFrom) calls.
 */
@Singleton
public class CtrlRscRebalanceHelper
{
    private final ErrorReporter errorReporter;
    private final Autoplacer autoplacer;
    private final CtrlRscToggleDiskApiCallHandler rscToggleDiskHelper;
    private final Provider<AccessContext> peerAccCtx;

    @Inject
    public CtrlRscRebalanceHelper(
        ErrorReporter errorReporterRef,
        Autoplacer autoplacerRef,
        CtrlRscToggleDiskApiCallHandler rscToggleDiskHelperRef,
        @PeerContext Provider<AccessContext> peerAccCtxRef
    )
    {
        errorReporter = errorReporterRef;
        autoplacer = autoplacerRef;
        rscToggleDiskHelper = rscToggleDiskHelperRef;
        peerAccCtx = peerAccCtxRef;
    }

    /**
     * Trigger rebalance for the given resource definition.
     *
     * Computes optimal placement via autoplacer, then sequentially calls
     * migrate-disk for each misplaced replica.
     *
     * The returned Flux is lazy: placement is only computed on subscription, so callers
     * may safely concatenate it after a clone/restore Flux — the autoplacer then sees the
     * post-operation state rather than the state at assembly time. Synchronous failures
     * (including access-denied) are emitted through the Flux error channel instead of
     * being thrown from this method, so a downstream onErrorResume can handle them.
     *
     * @param rscDfn the resource definition to rebalance
     * @return Flux of ApiCallRc from the migrate-disk operations
     */
    public Flux<ApiCallRc> trigger(ResourceDefinition rscDfn)
    {
        // Flux.defer: without it the whole placement computation (and any exception) would
        // run eagerly while the caller is still assembling its flux chain, i.e. BEFORE the
        // clone/restore it is concatenated after has actually happened.
        return Flux.defer(() ->
        {
            try
            {
                return computeRebalanceFlux(rscDfn);
            }
            catch (AccessDeniedException exc)
            {
                throw new ApiAccessDeniedException(
                    exc,
                    "rebalancing resource",
                    ApiConsts.FAIL_ACC_DENIED_RSC
                );
            }
        });
    }

    /**
     * Computes the migration plan and builds the chained migrate-disk Flux.
     * Runs at subscription time (see {@link #trigger}).
     *
     * @param rscDfn the resource definition to rebalance
     * @return Flux of ApiCallRc; empty if no diskful replicas exist, the autoplacer finds
     *     no placement, or the resource already sits on optimal nodes
     * @throws AccessDeniedException if the peer context may not access the resources
     */
    private Flux<ApiCallRc> computeRebalanceFlux(ResourceDefinition rscDfn)
        throws AccessDeniedException
    {
        AccessContext accCtx = peerAccCtx.get();
        String rscName = rscDfn.getName().displayValue;

        List<Resource> currentDiskful = rscDfn.getDiskfulResources(accCtx);
        if (currentDiskful.isEmpty())
        {
            // nothing to migrate (e.g. all-diskless or not yet deployed)
            return Flux.empty();
        }

        int replicaCount = currentDiskful.size();
        long rscSize = CtrlRscAutoPlaceApiCallHandler.calculateResourceDefinitionSize(rscDfn, accCtx);

        // Merge an explicit place-count with the resource group's autoplace settings
        AutoSelectFilterPojo selectFilter = AutoSelectFilterPojo.merge(
            new AutoSelectFilterBuilder()
                .setPlaceCount(replicaCount)
                .build(),
            rscDfn.getResourceGroup().getAutoPlaceConfig().getApiData()
        );

        // null rscDfn argument = ask for a fresh placement, ignoring existing replicas
        @Nullable Set<StorPool> optimalPools = autoplacer.autoPlace(selectFilter, null, rscSize);
        if (optimalPools == null)
        {
            errorReporter.logWarning(
                "Rebalance: autoplacer could not find optimal placement for '%s', skipping rebalance",
                rscName
            );
            return Flux.empty();
        }

        // node name -> selected stor pool, built once (instead of re-scanning optimalPools
        // per target) and in autoplacer iteration order so pairing is deterministic.
        // putIfAbsent: keep the first pool should the autoplacer ever return two pools on
        // the same node.
        Map<String, StorPool> optimalPoolByNode = new LinkedHashMap<>();
        for (StorPool sp : optimalPools)
        {
            optimalPoolByNode.putIfAbsent(sp.getNode().getName().displayValue, sp);
        }

        Set<String> currentNodeNames = currentDiskful.stream()
            .map(rsc -> rsc.getNode().getName().displayValue)
            .collect(Collectors.toSet());

        // Source nodes that are not optimal; sorted so the source->target pairing does not
        // depend on HashSet iteration order
        List<String> nodesToRemove = currentNodeNames.stream()
            .filter(nodeName -> !optimalPoolByNode.containsKey(nodeName))
            .sorted()
            .collect(Collectors.toList());

        // Target nodes that do not yet hold a replica, in autoplacer order
        List<String> nodesToAdd = optimalPoolByNode.keySet().stream()
            .filter(nodeName -> !currentNodeNames.contains(nodeName))
            .collect(Collectors.toList());

        int pairCount = Math.min(nodesToRemove.size(), nodesToAdd.size());
        if (pairCount == 0)
        {
            errorReporter.logInfo(
                "Rebalance: '%s' is already on optimal nodes, no migration needed",
                rscName
            );
            return Flux.empty();
        }

        // Pair the i-th surplus source node with the i-th missing target node
        Map<StorPool, String> targetPoolToSource = new LinkedHashMap<>();
        for (int idx = 0; idx < pairCount; idx++)
        {
            targetPoolToSource.put(
                optimalPoolByNode.get(nodesToAdd.get(idx)),
                nodesToRemove.get(idx)
            );
        }

        List<String> migrationSummary = new ArrayList<>();
        for (Map.Entry<StorPool, String> entry : targetPoolToSource.entrySet())
        {
            migrationSummary.add(
                entry.getValue() + " -> " + entry.getKey().getNode().getName().displayValue
            );
        }
        errorReporter.logInfo(
            "Rebalance: '%s' scheduling %d migration(s): %s",
            rscName,
            pairCount,
            String.join(", ", migrationSummary)
        );

        // Chain sequential migrate-disk calls via toggle-disk
        Flux<ApiCallRc> migrationFlux = Flux.empty();
        for (Map.Entry<StorPool, String> entry : targetPoolToSource.entrySet())
        {
            StorPool targetPool = entry.getKey();
            migrationFlux = migrationFlux.concatWith(
                rscToggleDiskHelper.resourceToggleDisk(
                    targetPool.getNode().getName().displayValue,
                    rscName,
                    targetPool.getName().displayValue,
                    entry.getValue(), // migrateFrom
                    null, // layerList
                    false, // removeDisk = false (adding disk)
                    Resource.DiskfulBy.USER
                )
            );
        }

        return Flux.<ApiCallRc>just(
            ApiCallRcImpl.singleApiCallRc(
                ApiConsts.MASK_INFO,
                "Rebalance: scheduling " + pairCount + " migration(s) for resource '" + rscName + "'"
            )
        ).concatWith(migrationFlux);
    }
}
Loading