Skip to content
Merged
85 changes: 85 additions & 0 deletions examples/customresources/demos/singletonsensor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Package main is a singleton-on-disk sensor used by
// TestOptionalReconfigureFailureAndRecovery. First construction writes a
// lock file; the next construction (the one fired by the weak/optional
// reconfigure path) finds the lock and is refused, but clears it on the
// way out so the following worker tick can recover.
package main

import (
"context"
"errors"
"os"
"sync/atomic"

"go.viam.com/rdk/components/sensor"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/module"
"go.viam.com/rdk/resource"
)

var model = resource.NewModel("viam-test", "demo", "singleton-sensor")

type config struct {
LockPath string `json:"lock_path"`
}

func (cfg *config) Validate(path string) ([]string, []string, error) {
if cfg.LockPath == "" {
return nil, nil, errors.New("lock_path required")
}
return nil, []string{sensor.Named("opt").String()}, nil
}

type singletonSensor struct {
resource.Named
resource.AlwaysRebuild
closed atomic.Bool
}

// refused gates the bug pattern to a single failure; subsequent attempts
// succeed so the test can observe recovery.
var refused atomic.Bool

func main() {
resource.RegisterComponent(sensor.API, model, resource.Registration[sensor.Sensor, *config]{
Constructor: newSensor,
})
module.ModularMain(resource.APIModel{API: sensor.API, Model: model})
}

func newSensor(_ context.Context, _ resource.Dependencies, conf resource.Config, _ logging.Logger) (sensor.Sensor, error) {
cfg, err := resource.NativeConfig[*config](conf)
if err != nil {
return nil, err
}
if _, err := os.Stat(cfg.LockPath); err == nil {
if rmErr := os.Remove(cfg.LockPath); rmErr != nil {
return nil, rmErr
}
if !refused.Swap(true) {
return nil, errors.New("singleton-sensor: lock present at " + cfg.LockPath)
}
} else if !errors.Is(err, os.ErrNotExist) {
return nil, err
}
f, err := os.Create(cfg.LockPath)
if err != nil {
return nil, err
}
if err := f.Close(); err != nil {
return nil, err
}
return &singletonSensor{Named: conf.ResourceName().AsNamed()}, nil
}

func (s *singletonSensor) Close(context.Context) error {
s.closed.Store(true)
return nil
}

func (s *singletonSensor) Readings(context.Context, map[string]interface{}) (map[string]interface{}, error) {
if s.closed.Load() {
return nil, errors.New("sensor is closed")
}
return map[string]interface{}{}, nil
}
9 changes: 5 additions & 4 deletions robot/impl/local_robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,11 +1104,12 @@ func (r *localRobot) updateWeakAndOptionalDependents(ctx context.Context) {
"resource", resName,
)
} else {
r.Logger().CErrorw(
ctx,
"failed to reconfigure resource during weak/optional dependencies update",
// A failed Reconfigure can leave the resource in an indeterminate state,
// so mark the node unhealthy. Callers will see a clear error from
// ResourceByName instead of dispatching into a broken instance.
resNode.LogAndSetLastError(
fmt.Errorf("failed to reconfigure resource during weak/optional dependencies update: %w", err),
"resource", resName,
"error", err,
)
}
}
Expand Down
120 changes: 114 additions & 6 deletions robot/impl/robot_reconfigure_weak_dependencies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package robotimpl

import (
"context"
"path/filepath"
"testing"
"time"

"github.com/google/uuid"
"go.viam.com/test"
"go.viam.com/utils"
"go.viam.com/utils/testutils"

// TODO(RSDK-7884): change everything that depends on this import to a mock.
"go.viam.com/rdk/components/arm"
Expand All @@ -16,9 +19,13 @@ import (
"go.viam.com/rdk/components/base"
// TODO(RSDK-7884): change everything that depends on this import to a mock.
"go.viam.com/rdk/components/generic"
"go.viam.com/rdk/components/sensor"
// register fake sensor (model rdk:builtin:fake)
_ "go.viam.com/rdk/components/sensor/fake"
"go.viam.com/rdk/config"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
rtestutils "go.viam.com/rdk/testutils"
rutils "go.viam.com/rdk/utils"
)

Expand Down Expand Up @@ -127,9 +134,10 @@ func TestUpdateWeakDependents(t *testing.T) {
weakCfg2 := config.Config{
Components: []resource.Config{
{
Name: weak1Name.Name,
API: weakAPI,
Model: weakModel,
Name: weak1Name.Name,
API: weakAPI,
Model: weakModel,
ConvertedAttributes: &someTypeWithWeakAndStrongDepsConfig{},
},
{
Name: base1Name.Name,
Expand All @@ -156,9 +164,10 @@ func TestUpdateWeakDependents(t *testing.T) {
weakCfg3 := config.Config{
Components: []resource.Config{
{
Name: weak1Name.Name,
API: weakAPI,
Model: weakModel,
Name: weak1Name.Name,
API: weakAPI,
Model: weakModel,
ConvertedAttributes: &someTypeWithWeakAndStrongDepsConfig{},
},
{
Name: base1Name.Name,
Expand Down Expand Up @@ -655,3 +664,102 @@ func TestWeakDependentsDependedOn(t *testing.T) {
lRobot.getDependencies(base1Name, node)
test.That(t, weak1.reconfigCount, test.ShouldEqual, 4)
}

// TestWeakReconfigureFailureMarksUnhealthy verifies the framework marks a
// resource Unhealthy when its weak/optional Reconfigure returns an error.
func TestWeakReconfigureFailureMarksUnhealthy(t *testing.T) {
logger := logging.NewTestLogger(t)

weakAPI := resource.NewAPI(uuid.NewString(), "component", "weak")
weakModel := resource.DefaultModelFamily.WithModel(utils.RandomAlphaString(5))
weak1Name := resource.NewName(weakAPI, "weak1")
resource.Register(weakAPI, weakModel,
resource.Registration[*someTypeWithWeakAndStrongDeps, *someTypeWithWeakAndStrongDepsConfig]{
Constructor: func(_ context.Context, deps resource.Dependencies, conf resource.Config, _ logging.Logger,
) (*someTypeWithWeakAndStrongDeps, error) {
return &someTypeWithWeakAndStrongDeps{Named: conf.ResourceName().AsNamed(), resources: deps}, nil
},
WeakDependencies: []resource.Matcher{resource.TypeMatcher{Type: resource.APITypeComponentName}},
})
defer resource.Deregister(weakAPI, weakModel)

// weak1 has no ConvertedAttributes, so its Reconfigure returns a
// NativeConfig error when the weak/optional update fires.
cfg := config.Config{
Components: []resource.Config{
{Name: weak1Name.Name, API: weakAPI, Model: weakModel},
{Name: "base1", API: base.API, Model: fake.Model},
},
}
test.That(t, cfg.Ensure(false, logger), test.ShouldBeNil)

robot := setupLocalRobot(t, context.Background(), &cfg, logger)

_, err := robot.ResourceByName(weak1Name)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring,
"failed to reconfigure resource during weak/optional dependencies update")
}

// TestOptionalReconfigureFailureAndRecovery exercises the modular path
// end-to-end with the singleton-sensor module: the first weak/optional
// rebuild fails on a lock conflict, the framework marks the node
// Unhealthy, and a subsequent worker tick reconstructs it successfully.
func TestOptionalReconfigureFailureAndRecovery(t *testing.T) {
logger := logging.NewTestLogger(t)
ctx := context.Background()

modulePath := rtestutils.BuildTempModule(t, "examples/customresources/demos/singletonsensor")
lockPath := filepath.Join(t.TempDir(), "singleton.lock")

singletonModel := resource.NewModel("viam-test", "demo", "singleton-sensor")
mySensor := sensor.Named("mysensor")

cfg := &config.Config{
Modules: []config.Module{
{Name: "singletonsensor", ExePath: modulePath},
},
Components: []resource.Config{
{
Name: mySensor.Name,
API: sensor.API,
Model: singletonModel,
Attributes: rutils.AttributeMap{"lock_path": lockPath},
},
{
Name: "opt",
API: sensor.API,
Model: resource.DefaultModelFamily.WithModel("fake"),
},
},
}
test.That(t, cfg.Ensure(false, logger), test.ShouldBeNil)

robot := setupLocalRobot(t, ctx, cfg, logger)

// setupLocalRobot runs the weak/optional update at the end of
// completeConfig, which fires the rebuild that fails on the still-
// present lock. The "lock present at" string is singleton-sensor-only,
// so matching it confirms the failure came from the rebuild path.
_, err := robot.ResourceByName(mySensor)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, "lock present at")

// Recovery: run the worker's retry path synchronously (equivalent to a
// tick firing) so we don't need to poll for the resource to come back.
anyChanges := robot.(*localRobot).updateRemotesAndRetryResourceConfigure()
test.That(t, anyChanges, test.ShouldBeTrue)

res, err := robot.ResourceByName(mySensor)
test.That(t, err, test.ShouldBeNil)
s, ok := res.(sensor.Sensor)
test.That(t, ok, test.ShouldBeTrue)

testutils.WaitForAssertionWithSleep(t, 200*time.Millisecond, 25, func(tb testing.TB) {
tb.Helper()
callCtx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
_, err := s.Readings(callCtx, nil)
test.That(tb, err, test.ShouldBeNil)
})
}
Loading