From 9d7cc307927e90196239e32fe01e30231758e5da Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Tue, 31 Mar 2026 14:49:47 -0400 Subject: [PATCH 01/23] test setting wait at end and interpolate to false in go to inputs --- components/arm/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/components/arm/client.go b/components/arm/client.go index 5a5fdc13426..ad36877b798 100644 --- a/components/arm/client.go +++ b/components/arm/client.go @@ -211,7 +211,10 @@ func (c *client) CurrentInputs(ctx context.Context) ([]referenceframe.Input, err } func (c *client) GoToInputs(ctx context.Context, inputSteps ...[]referenceframe.Input) error { - return c.MoveThroughJointPositions(ctx, inputSteps, nil, nil) + return c.MoveThroughJointPositions(ctx, inputSteps, nil, map[string]interface{}{ + "waitAtEnd": false, + "interpolate": false, + }) } func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { From a2d5dab7a37e8625fd0587c23a686561721cd829 Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Tue, 31 Mar 2026 18:27:17 -0400 Subject: [PATCH 02/23] test default frame step --- services/motion/builtin/teleop.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index 8b6ef4d6049..b49f932fbfc 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -224,7 +224,7 @@ func (tp *teleopPipeline) buildMoveReq( "timeout": 5.0, // seconds; default is 300 "max_ik_solutions": 20, // default is 100 "min_ik_score": 0.05, // default is 0.01 - "frame_step": 0.05, // default is 0.01; reduces trajectory steps from ~14 to ~3-4 + "frame_step": 0.01, // default is 0.01; reduces trajectory steps from ~14 to ~3-4 } for k, v := range teleopDefaults { if _, ok := extra[k]; !ok { From 92bb4296bb841434847cf8b547a0a365316b3dff Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Tue, 31 Mar 2026 18:35:01 -0400 Subject: [PATCH 03/23] Update teleop.go --- services/motion/builtin/teleop.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index b49f932fbfc..8b6ef4d6049 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -224,7 +224,7 @@ func (tp *teleopPipeline) buildMoveReq( "timeout": 5.0, // seconds; default is 300 "max_ik_solutions": 20, // default is 100 "min_ik_score": 0.05, // default is 0.01 - "frame_step": 0.01, // default is 0.01; reduces trajectory steps from ~14 to ~3-4 + "frame_step": 0.05, // default is 0.01; reduces trajectory steps from ~14 to ~3-4 } for k, v := range teleopDefaults { if _, ok := extra[k]; !ok { From 011c1dac8efad0a8d67303ef150cec6bf4866219 Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Wed, 1 Apr 2026 10:06:10 -0400 Subject: [PATCH 04/23] interpolate true --- components/arm/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/arm/client.go b/components/arm/client.go index ad36877b798..5c7eecbb09a 100644 --- a/components/arm/client.go +++ b/components/arm/client.go @@ -213,7 +213,7 @@ func (c *client) CurrentInputs(ctx context.Context) ([]referenceframe.Input, err func (c *client) GoToInputs(ctx context.Context, inputSteps ...[]referenceframe.Input) error { return c.MoveThroughJointPositions(ctx, inputSteps, nil, map[string]interface{}{ "waitAtEnd": false, - "interpolate": false, + "interpolate": true, }) } From 18ca3c5cf9beb442e07122c802970eefe763b73d Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Wed, 1 Apr 2026 10:15:46 -0400 Subject: [PATCH 05/23] Update client.go --- components/arm/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/arm/client.go b/components/arm/client.go index 5c7eecbb09a..6da6345eea8 100644 --- a/components/arm/client.go +++ b/components/arm/client.go @@ -212,8 +212,8 @@ func (c *client) CurrentInputs(ctx context.Context) ([]referenceframe.Input, err func (c *client) GoToInputs(ctx context.Context, inputSteps ...[]referenceframe.Input) error { return c.MoveThroughJointPositions(ctx, inputSteps, nil, map[string]interface{}{ - "waitAtEnd": false, - "interpolate": true, + "waitAtEnd": true, + "interpolate": false, }) } From 08d5210d589fdb6ab4f487aa3df961559976d633 Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Wed, 1 Apr 2026 10:20:59 -0400 Subject: [PATCH 06/23] Update client.go --- components/arm/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/arm/client.go b/components/arm/client.go index 6da6345eea8..ad36877b798 100644 --- a/components/arm/client.go +++ b/components/arm/client.go @@ -212,7 +212,7 @@ func (c *client) CurrentInputs(ctx context.Context) ([]referenceframe.Input, err func (c *client) GoToInputs(ctx context.Context, inputSteps ...[]referenceframe.Input) error { return c.MoveThroughJointPositions(ctx, inputSteps, nil, map[string]interface{}{ - "waitAtEnd": true, + "waitAtEnd": false, "interpolate": false, }) } From 068ad5654212b0bba2b3ff92d1f7febfa053eef9 Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Wed, 1 Apr 2026 13:02:56 -0400 Subject: [PATCH 07/23] test teleop_small_move_rad --- services/motion/builtin/builtin.go | 42 ++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/services/motion/builtin/builtin.go b/services/motion/builtin/builtin.go index 326702b6333..f583affe29d 100644 --- a/services/motion/builtin/builtin.go +++ b/services/motion/builtin/builtin.go @@ -23,6 +23,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/structpb" + "go.viam.com/rdk/components/arm" "go.viam.com/rdk/components/movementsensor" "go.viam.com/rdk/logging" "go.viam.com/rdk/motionplan" @@ -97,6 +98,11 @@ type Config struct { // example { "arm" : { "3" : { "min" : 0, "max" : 2 } } } InputRangeOverride map[string]map[string]referenceframe.Limit `json:"input_range_override"` + + // TeleopSmallMoveRad is the joint-space L-inf displacement threshold (radians) + // below which execute() uses interpolate=true for smoother servo tracking. + // 0 disables adaptive behavior. Suggested starting value: 0.1 (~5.7 degrees). + TeleopSmallMoveRad float64 `json:"teleop_small_move_rad,omitempty"` } func (c *Config) shouldWritePlan(start time.Time, err error) bool { @@ -688,6 +694,29 @@ func (ms *builtIn) execute(ctx context.Context, trajectory motionplan.Trajectory if err != nil { return err } + + // Adaptive interpolation: for small movements on arms, use interpolate=true + // for smooth servo tracking instead of the default fast-streaming GoToInputs. + if threshold := ms.conf.TeleopSmallMoveRad; threshold > 0 && len(inputs) >= 2 { + if maxLinfDisplacement(inputs) < threshold { + if armComp, ok := r.(arm.Arm); ok { + err := armComp.MoveThroughJointPositions(ctx, inputs, nil, map[string]interface{}{ + "waitAtEnd": false, + "interpolate": true, + }) + if err != nil { + if actuator, ok := r.(inputEnabledActuator); ok { + if stopErr := actuator.Stop(ctx, nil); stopErr != nil { + return errors.Wrap(err, stopErr.Error()) + } + } + return err + } + continue + } + } + } + if err := ie.GoToInputs(ctx, inputs...); err != nil { // If there is an error on GoToInputs, stop the component if possible before returning the error if actuator, ok := r.(inputEnabledActuator); ok { @@ -702,6 +731,19 @@ func (ms *builtIn) execute(ctx context.Context, trajectory motionplan.Trajectory return nil } +// maxLinfDisplacement returns the max L-inf joint displacement (radians) across +// consecutive pairs of input steps. +func maxLinfDisplacement(steps [][]referenceframe.Input) float64 { + maxDisp := 0.0 + for i := 1; i < len(steps); i++ { + d := referenceframe.InputsLinfDistance(steps[i-1], steps[i]) + if d > maxDisp { + maxDisp = d + } + } + return maxDisp +} + // applyDefaultExtras iterates through the list of default extras configured on the builtIn motion service and adds them to the // given map of extras if the key does not already exist. func (ms *builtIn) applyDefaultExtras(extras map[string]any) { From 455f941cf0ef110e5c9369cc9b7f710a07fa0eaf Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Wed, 1 Apr 2026 17:03:58 -0400 Subject: [PATCH 08/23] test bimanual teleop --- services/motion/builtin/builtin.go | 21 ++-- services/motion/builtin/teleop.go | 152 +++++++++++++++++++++-------- 2 files changed, 125 insertions(+), 48 deletions(-) diff --git a/services/motion/builtin/builtin.go b/services/motion/builtin/builtin.go index f583affe29d..4bb576387f4 100644 --- a/services/motion/builtin/builtin.go +++ b/services/motion/builtin/builtin.go @@ -147,9 +147,9 @@ type builtIn struct { logger logging.Logger configuredDefaultExtras map[string]any - // Teleop pipeline. Protected by teleopMu (separate from mu to simplify lock ordering). - teleopMu sync.RWMutex - teleopPipeline *teleopPipeline + // Teleop pipelines keyed by component name. Protected by teleopMu (separate from mu to simplify lock ordering). + teleopMu sync.RWMutex + teleopPipelines map[string]*teleopPipeline } // NewBuiltIn returns a new move and grab service for the given robot. @@ -160,6 +160,7 @@ func NewBuiltIn( Named: conf.ResourceName().AsNamed(), logger: logger, configuredDefaultExtras: make(map[string]any), + teleopPipelines: make(map[string]*teleopPipeline), } if err := ms.Reconfigure(ctx, deps, conf); err != nil { @@ -174,11 +175,11 @@ func (ms *builtIn) Reconfigure( deps resource.Dependencies, conf resource.Config, ) error { - // Stop teleop pipeline before acquiring write lock (goroutines may hold RLock). + // Stop all teleop pipelines before acquiring write lock (goroutines may hold RLock). ms.teleopMu.Lock() - if ms.teleopPipeline != nil { - ms.teleopPipeline.stop(ctx, ms) - ms.teleopPipeline = nil + for name, tp := range ms.teleopPipelines { + tp.stop(ctx, ms) + delete(ms.teleopPipelines, name) } ms.teleopMu.Unlock() @@ -226,9 +227,9 @@ func (ms *builtIn) Reconfigure( func (ms *builtIn) Close(ctx context.Context) error { ms.teleopMu.Lock() - if ms.teleopPipeline != nil { - ms.teleopPipeline.stop(ctx, ms) - ms.teleopPipeline = nil + for name, tp := range ms.teleopPipelines { + tp.stop(ctx, ms) + delete(ms.teleopPipelines, name) } ms.teleopMu.Unlock() diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index 8b6ef4d6049..d36675ae8b7 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -141,6 +141,14 @@ func (tp *teleopPipeline) planOnce(ctx context.Context, ms *builtIn, pose *refer for k, v := range planningHead { mergedInputs[k] = v } + // Overlay planning heads from other active teleop pipelines so that this + // planner collision-checks against where the other arms are heading, not + // just where they currently are. + for _, head := range ms.otherPlanningHeads(tp.moveReqBase.ComponentName) { + for k, v := range head { + mergedInputs[k] = v + } + } // Build a MoveReq with start_state set to the merged config. req := tp.buildMoveReq(pose, mergedInputs) @@ -315,12 +323,14 @@ func (tp *teleopPipeline) stop(ctx context.Context, ms *builtIn) { tp.workers.Stop() } -// startTeleopPipeline creates and starts a new teleop pipeline. +// startTeleopPipeline creates and starts a new teleop pipeline for the given component. +// If a pipeline already exists for that component, it is stopped first. Other component pipelines are unaffected. func (ms *builtIn) startTeleopPipeline(cmdCtx context.Context, req motion.MoveReq) error { - // Stop any existing pipeline first. ms.teleopMu.Lock() - if ms.teleopPipeline != nil { - ms.teleopPipeline.stop(cmdCtx, ms) + // Stop only the pipeline for this component, if one exists. + if existing, ok := ms.teleopPipelines[req.ComponentName]; ok { + existing.stop(cmdCtx, ms) + delete(ms.teleopPipelines, req.ComponentName) } defer ms.teleopMu.Unlock() @@ -348,8 +358,8 @@ func (ms *builtIn) startTeleopPipeline(cmdCtx context.Context, req motion.MoveRe } ms.mu.RUnlock() - ms.teleopPipeline = &teleopPipeline{ - logger: ms.logger.Sublogger("teleop"), + tp := &teleopPipeline{ + logger: ms.logger.Sublogger("teleop." + req.ComponentName), moveReqBase: req, cachedFrameSys: frameSys, poseCh: make(chan *referenceframe.PoseInFrame, 1), @@ -357,15 +367,76 @@ func (ms *builtIn) startTeleopPipeline(cmdCtx context.Context, req motion.MoveRe planningHead: fsInputs, } - ms.teleopPipeline.poseCh <- req.Destination - ms.teleopPipeline.workers = goutils.NewBackgroundStoppableWorkers( - func(pipelineCtx context.Context) { ms.teleopPipeline.runPlanner(pipelineCtx, ms) }, - func(pipelineCtx context.Context) { ms.teleopPipeline.runExecutor(pipelineCtx, ms) }, + tp.poseCh <- req.Destination + tp.workers = goutils.NewBackgroundStoppableWorkers( + func(pipelineCtx context.Context) { tp.runPlanner(pipelineCtx, ms) }, + func(pipelineCtx context.Context) { tp.runExecutor(pipelineCtx, ms) }, ) + ms.teleopPipelines[req.ComponentName] = tp + + return nil +} + +// otherPlanningHeads returns the planning heads of all active teleop pipelines +// except the one for the given component. This allows each pipeline's planner to +// collision-check against where other arms are heading. +func (ms *builtIn) otherPlanningHeads(self string) map[string]referenceframe.FrameSystemInputs { + ms.teleopMu.RLock() + defer ms.teleopMu.RUnlock() + + result := make(map[string]referenceframe.FrameSystemInputs, len(ms.teleopPipelines)-1) + for name, tp := range ms.teleopPipelines { + if name == self { + continue + } + tp.planningHeadMu.RLock() + result[name] = tp.planningHead + tp.planningHeadMu.RUnlock() + } + return result +} + +// lookupPipeline finds the teleop pipeline for the given DoCommand map. +// It uses the "component_name" key if present; otherwise, if exactly one pipeline +// exists, it returns that one (backward compat for single-arm teleop). +func (ms *builtIn) lookupPipeline(cmd map[string]interface{}) *teleopPipeline { + componentName, _ := cmd["component_name"].(string) + + ms.teleopMu.RLock() + defer ms.teleopMu.RUnlock() + + if componentName != "" { + return ms.teleopPipelines[componentName] + } + // Backward compat: if no component_name and exactly one pipeline, use it. + if len(ms.teleopPipelines) == 1 { + for _, tp := range ms.teleopPipelines { + return tp + } + } return nil } +// buildPipelineStatus returns a status map for a single teleop pipeline. +func buildPipelineStatus(tp *teleopPipeline) map[string]any { + status := map[string]any{ + "running": true, + "queued_poses": len(tp.poseCh), + "queued_plans": len(tp.trajCh), + "last_inputs_ms": float64(tp.lastInputsNanos.Load()) / 1e6, + "last_plan_ms": float64(tp.lastPlanNanos.Load()) / 1e6, + "last_exec_ms": float64(tp.lastExecNanos.Load()) / 1e6, + "last_exec_wait_ms": float64(tp.lastExecWaitNanos.Load()) / 1e6, + "plan_count": tp.planCount.Load(), + "exec_count": tp.execCount.Load(), + } + if lastErr := tp.lastErr.Load(); lastErr != nil { + status["error"] = (*lastErr).Error() + } + return status +} + // handleTeleopCommand handles teleop DoCommand requests. // Returns (response, handled, error). If handled is false, the caller should // continue processing other DoCommand keys. @@ -409,9 +480,7 @@ func (ms *builtIn) handleTeleopCommand( } if req, ok := cmd[DoTeleopMove]; ok { - ms.teleopMu.RLock() - tp := ms.teleopPipeline - ms.teleopMu.RUnlock() + tp := ms.lookupPipeline(cmd) if tp == nil { return nil, true, fmt.Errorf("teleop pipeline is not running; call %s first", DoTeleopStart) } @@ -437,10 +506,20 @@ func (ms *builtIn) handleTeleopCommand( } if _, ok := cmd[DoTeleopStop]; ok { + componentName, _ := cmd["component_name"].(string) + ms.teleopMu.Lock() - if ms.teleopPipeline != nil { - ms.teleopPipeline.stop(ctx, ms) - ms.teleopPipeline = nil + if componentName == "" { + // Backward compat: stop ALL pipelines. + for name, tp := range ms.teleopPipelines { + tp.stop(ctx, ms) + delete(ms.teleopPipelines, name) + } + } else { + if tp, ok := ms.teleopPipelines[componentName]; ok { + tp.stop(ctx, ms) + delete(ms.teleopPipelines, componentName) + } } ms.teleopMu.Unlock() @@ -449,32 +528,29 @@ func (ms *builtIn) handleTeleopCommand( } if _, ok := cmd[DoTeleopStatus]; ok { - ms.teleopMu.RLock() - tp := ms.teleopPipeline - ms.teleopMu.RUnlock() + componentName, _ := cmd["component_name"].(string) - if tp == nil { - return map[string]any{ - "running": tp != nil, - }, true, nil + ms.teleopMu.RLock() + defer ms.teleopMu.RUnlock() + + if componentName != "" { + tp, exists := ms.teleopPipelines[componentName] + if !exists { + return map[string]any{ + "running": false, + }, true, nil + } + resp[DoTeleopStatus] = buildPipelineStatus(tp) + return resp, true, nil } - status := map[string]any{ - "queued_poses": len(tp.poseCh), - "queued_plans": len(tp.trajCh), - "last_inputs_ms": float64(tp.lastInputsNanos.Load()) / 1e6, - "last_plan_ms": float64(tp.lastPlanNanos.Load()) / 1e6, - "last_exec_ms": float64(tp.lastExecNanos.Load()) / 1e6, - "last_exec_wait_ms": float64(tp.lastExecWaitNanos.Load()) / 1e6, - "plan_count": tp.planCount.Load(), - "exec_count": tp.execCount.Load(), + // No component_name: return status of all pipelines. + allStatus := make(map[string]any, len(ms.teleopPipelines)+1) + allStatus["pipeline_count"] = len(ms.teleopPipelines) + for name, tp := range ms.teleopPipelines { + allStatus[name] = buildPipelineStatus(tp) } - if lastErr := tp.lastErr.Load(); lastErr != nil { - status["error"] = (*lastErr).Error() - } - - resp[DoTeleopStatus] = status - + resp[DoTeleopStatus] = allStatus return resp, true, nil } From b4f9b6e8e754efc496164416866baab62659d276 Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Wed, 1 Apr 2026 17:29:04 -0400 Subject: [PATCH 09/23] cache inputs --- services/motion/builtin/teleop.go | 42 +++++++++++++------------------ 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index d36675ae8b7..2841b468ade 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -29,8 +29,9 @@ type teleopPipeline struct { logger logging.Logger // Immutable after creation. - moveReqBase motion.MoveReq - cachedFrameSys *referenceframe.FrameSystem // built once at pipeline start + moveReqBase motion.MoveReq + cachedFrameSys *referenceframe.FrameSystem // built once at pipeline start + cachedBaseInputs referenceframe.FrameSystemInputs // snapshot at pipeline start, avoids blocking CurrentInputs calls // Channels. poseCh chan *referenceframe.PoseInFrame // buffer 1, latest-value semantics @@ -117,24 +118,12 @@ func (tp *teleopPipeline) planOnce(ctx context.Context, ms *builtIn, pose *refer planningHead := tp.planningHead tp.planningHeadMu.RUnlock() - // Merge live inputs with the planning head: use fresh CurrentInputs for - // all components (including the other arm in bimanual setups), then overlay - // the planning head for the teleop'd component so trajectory chaining works. - // Timing includes RLock acquisition — intentional: wall-clock latency is what - // matters for diagnosing stutter; lock contention is a real part of that latency. + // Build merged inputs from cached base inputs (snapshot at pipeline start) and + // planning heads. This avoids calling CurrentInputs() which would block on arms + // that are mid-execution, causing stuttery alternating movement in multi-arm setups. inputsStart := time.Now() - ms.mu.RLock() - liveInputs, err := ms.fsService.CurrentInputs(ctx) - ms.mu.RUnlock() - inputsDur := time.Since(inputsStart) - tp.lastInputsNanos.Store(inputsDur.Nanoseconds()) - if err != nil { - tp.lastErr.Store(&err) - tp.logger.CWarnf(ctx, "teleop planner: failed to get current inputs: %v", err) - return - } - mergedInputs := make(referenceframe.FrameSystemInputs, len(liveInputs)) - for k, v := range liveInputs { + mergedInputs := make(referenceframe.FrameSystemInputs, len(tp.cachedBaseInputs)) + for k, v := range tp.cachedBaseInputs { mergedInputs[k] = v } // Overlay planning head entries for the teleop'd arm's frames. @@ -149,6 +138,8 @@ func (tp *teleopPipeline) planOnce(ctx context.Context, ms *builtIn, pose *refer mergedInputs[k] = v } } + inputsDur := time.Since(inputsStart) + tp.lastInputsNanos.Store(inputsDur.Nanoseconds()) // Build a MoveReq with start_state set to the merged config. req := tp.buildMoveReq(pose, mergedInputs) @@ -359,12 +350,13 @@ func (ms *builtIn) startTeleopPipeline(cmdCtx context.Context, req motion.MoveRe ms.mu.RUnlock() tp := &teleopPipeline{ - logger: ms.logger.Sublogger("teleop." + req.ComponentName), - moveReqBase: req, - cachedFrameSys: frameSys, - poseCh: make(chan *referenceframe.PoseInFrame, 1), - trajCh: make(chan motionplan.Trajectory, 1), - planningHead: fsInputs, + logger: ms.logger.Sublogger("teleop." + req.ComponentName), + moveReqBase: req, + cachedFrameSys: frameSys, + cachedBaseInputs: fsInputs, + poseCh: make(chan *referenceframe.PoseInFrame, 1), + trajCh: make(chan motionplan.Trajectory, 1), + planningHead: fsInputs, } tp.poseCh <- req.Destination From 117244a39b59a4e87c40af3bf2f5e8baa1bcd1ce Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Wed, 1 Apr 2026 17:46:23 -0400 Subject: [PATCH 10/23] filter components --- services/motion/builtin/teleop.go | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index 2841b468ade..02cac002cf7 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -236,6 +236,33 @@ func (tp *teleopPipeline) buildMoveReq( return req } +// filterTrajectoryForComponent returns a copy of the trajectory containing only +// entries for the named component. This prevents a teleop pipeline from sending +// GoToInputs to components owned by other pipelines. +func filterTrajectoryForComponent(traj motionplan.Trajectory, componentName string, logger logging.Logger) motionplan.Trajectory { + filtered := make(motionplan.Trajectory, len(traj)) + for i, step := range traj { + filteredStep := make(referenceframe.FrameSystemInputs, 1) + for name, inputs := range step { + if name == componentName && len(inputs) > 0 { + filteredStep[name] = inputs + } + } + filtered[i] = filteredStep + } + // Log if no matching entries found — likely a name mismatch. + if len(traj) > 0 { + if _, ok := traj[0][componentName]; !ok { + keys := make([]string, 0, len(traj[0])) + for k := range traj[0] { + keys = append(keys, k) + } + logger.Warnf("filterTrajectoryForComponent: component %q not found in trajectory keys %v", componentName, keys) + } + } + return filtered +} + // runExecutor is the executor goroutine. It reads trajectories from trajCh // and executes them on the arm via ms.execute. func (tp *teleopPipeline) runExecutor(ctx context.Context, ms *builtIn) { @@ -251,6 +278,12 @@ func (tp *teleopPipeline) runExecutor(ctx context.Context, ms *builtIn) { waitDur := time.Since(waitStart) tp.lastExecWaitNanos.Store(waitDur.Nanoseconds()) + // Filter trajectory to only include this pipeline's component. + // Without filtering, the trajectory includes all frame system components, + // and the executor would send GoToInputs to other arms — causing them + // to fight with their own pipelines. + traj = filterTrajectoryForComponent(traj, tp.moveReqBase.ComponentName, tp.logger) + execStart := time.Now() // Skip start-position check (math.MaxFloat64) because the arm // is in continuous motion and won't be exactly at the trajectory start. From 136339b661450806ac7885af43f8499fbb20329b Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Thu, 2 Apr 2026 11:57:51 -0400 Subject: [PATCH 11/23] Update teleop.go --- services/motion/builtin/teleop.go | 49 ++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index 02cac002cf7..a61fa17b7b2 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -236,30 +236,45 @@ func (tp *teleopPipeline) buildMoveReq( return req } -// filterTrajectoryForComponent returns a copy of the trajectory containing only -// entries for the named component. This prevents a teleop pipeline from sending -// GoToInputs to components owned by other pipelines. -func filterTrajectoryForComponent(traj motionplan.Trajectory, componentName string, logger logging.Logger) motionplan.Trajectory { +// filterTrajectoryMovingFrames returns a copy of the trajectory containing only +// entries for frames whose inputs actually change across the trajectory. The planner +// outputs entries for ALL frames, but only the teleop'd arm's inputs change — static +// entries for other arms must be excluded to prevent conflicting GoToInputs calls +// from concurrent pipelines. +func filterTrajectoryMovingFrames(traj motionplan.Trajectory) motionplan.Trajectory { + if len(traj) < 2 { + return traj + } + + // Identify which frames change between the first and last step. + first := traj[0] + last := traj[len(traj)-1] + moving := make(map[string]bool) + for name, firstInputs := range first { + lastInputs, ok := last[name] + if !ok || len(firstInputs) != len(lastInputs) { + moving[name] = true + continue + } + for i := range firstInputs { + if firstInputs[i] != lastInputs[i] { + moving[name] = true + break + } + } + } + + // Build filtered trajectory with only moving frames. filtered := make(motionplan.Trajectory, len(traj)) for i, step := range traj { - filteredStep := make(referenceframe.FrameSystemInputs, 1) + filteredStep := make(referenceframe.FrameSystemInputs, len(moving)) for name, inputs := range step { - if name == componentName && len(inputs) > 0 { + if moving[name] { filteredStep[name] = inputs } } filtered[i] = filteredStep } - // Log if no matching entries found — likely a name mismatch. - if len(traj) > 0 { - if _, ok := traj[0][componentName]; !ok { - keys := make([]string, 0, len(traj[0])) - for k := range traj[0] { - keys = append(keys, k) - } - logger.Warnf("filterTrajectoryForComponent: component %q not found in trajectory keys %v", componentName, keys) - } - } return filtered } @@ -282,7 +297,7 @@ func (tp *teleopPipeline) runExecutor(ctx context.Context, ms *builtIn) { // Without filtering, the trajectory includes all frame system components, // and the executor would send GoToInputs to other arms — causing them // to fight with their own pipelines. - traj = filterTrajectoryForComponent(traj, tp.moveReqBase.ComponentName, tp.logger) + traj = filterTrajectoryMovingFrames(traj) execStart := time.Now() // Skip start-position check (math.MaxFloat64) because the arm From 92feec9ac60b4247bc18e821c817322e484dd71e Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Thu, 2 Apr 2026 13:00:35 -0400 Subject: [PATCH 12/23] single pipeline --- services/motion/builtin/builtin.go | 65 +++- services/motion/builtin/teleop.go | 460 +++++++++++++---------------- 2 files changed, 266 insertions(+), 259 deletions(-) diff --git a/services/motion/builtin/builtin.go b/services/motion/builtin/builtin.go index 4bb576387f4..1df9d1ce273 100644 --- a/services/motion/builtin/builtin.go +++ b/services/motion/builtin/builtin.go @@ -147,9 +147,9 @@ type builtIn struct { logger logging.Logger configuredDefaultExtras map[string]any - // Teleop pipelines keyed by component name. Protected by teleopMu (separate from mu to simplify lock ordering). - teleopMu sync.RWMutex - teleopPipelines map[string]*teleopPipeline + // Teleop pipeline. Protected by teleopMu (separate from mu to simplify lock ordering). + teleopMu sync.RWMutex + teleopPipeline *teleopPipeline } // NewBuiltIn returns a new move and grab service for the given robot. @@ -160,7 +160,6 @@ func NewBuiltIn( Named: conf.ResourceName().AsNamed(), logger: logger, configuredDefaultExtras: make(map[string]any), - teleopPipelines: make(map[string]*teleopPipeline), } if err := ms.Reconfigure(ctx, deps, conf); err != nil { @@ -175,11 +174,11 @@ func (ms *builtIn) Reconfigure( deps resource.Dependencies, conf resource.Config, ) error { - // Stop all teleop pipelines before acquiring write lock (goroutines may hold RLock). + // Stop teleop pipeline before acquiring write lock (goroutines may hold RLock). ms.teleopMu.Lock() - for name, tp := range ms.teleopPipelines { - tp.stop(ctx, ms) - delete(ms.teleopPipelines, name) + if ms.teleopPipeline != nil { + ms.teleopPipeline.stop(ctx, ms) + ms.teleopPipeline = nil } ms.teleopMu.Unlock() @@ -227,9 +226,9 @@ func (ms *builtIn) Reconfigure( func (ms *builtIn) Close(ctx context.Context) error { ms.teleopMu.Lock() - for name, tp := range ms.teleopPipelines { - tp.stop(ctx, ms) - delete(ms.teleopPipelines, name) + if ms.teleopPipeline != nil { + ms.teleopPipeline.stop(ctx, ms) + ms.teleopPipeline = nil } ms.teleopMu.Unlock() @@ -607,6 +606,50 @@ func (ms *builtIn) planTeleop( return plan, err } +// planTeleopMulti plans a trajectory for multiple components simultaneously. +// It builds a multi-frame goal from the given poses map, allowing the planner +// to find collision-free paths for all arms jointly. +func (ms *builtIn) planTeleopMulti( + ctx context.Context, + goals referenceframe.FrameSystemPoses, + extra map[string]interface{}, + frameSys *referenceframe.FrameSystem, + fsInputs referenceframe.FrameSystemInputs, + logger logging.Logger, +) (motionplan.Plan, error) { + ctx, span := trace.StartSpan(ctx, "motion::builtin::planTeleopMulti") + defer span.End() + + // Transform all goal poses to world frame. + worldGoals := make(referenceframe.FrameSystemPoses, len(goals)) + for fName, destination := range goals { + tf, err := frameSys.Transform(fsInputs.ToLinearInputs(), destination, referenceframe.World) + if err != nil { + return nil, err + } + goalPose, _ := tf.(*referenceframe.PoseInFrame) + worldGoals[fName] = goalPose + } + + startState := armplanning.NewPlanState(nil, fsInputs) + goalState := armplanning.NewPlanState(worldGoals, nil) + + planOpts, err := armplanning.NewPlannerOptionsFromExtra(extra) + if err != nil { + return nil, err + } + + planRequest := &armplanning.PlanRequest{ + FrameSystem: frameSys, + Goals: []*armplanning.PlanState{goalState}, + StartState: startState, + PlannerOptions: planOpts, + } + + plan, _, err := armplanning.PlanMotion(ctx, logger, planRequest) + return plan, err +} + func (ms *builtIn) execute(ctx context.Context, trajectory motionplan.Trajectory, epsilon float64) error { // Batch GoToInputs calls if possible; components may want to blend between inputs combinedSteps := []map[string][][]referenceframe.Input{} diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index a61fa17b7b2..7916f033e6c 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -21,21 +21,35 @@ import ( "go.viam.com/rdk/utils" ) +// teleopComponent tracks a single component being teleop'd within the pipeline. +type teleopComponent struct { + name string + moveReqBase motion.MoveReq + latestPose atomic.Pointer[referenceframe.PoseInFrame] +} + // teleopPipeline manages the continuous motion pipeline for low-latency teleop. -// It runs two goroutines connected by channels: +// It supports multiple components (arms) planned jointly in a single pipeline +// to guarantee collision-free trajectories. // -// poseCh → Planner goroutine → trajCh → Executor goroutine → arm.GoToInputs() +// notify → Planner goroutine → trajCh → Executor goroutine → arm.GoToInputs() type teleopPipeline struct { logger logging.Logger // Immutable after creation. - moveReqBase motion.MoveReq - cachedFrameSys *referenceframe.FrameSystem // built once at pipeline start - cachedBaseInputs referenceframe.FrameSystemInputs // snapshot at pipeline start, avoids blocking CurrentInputs calls + cachedFrameSys *referenceframe.FrameSystem // built once at pipeline start + cachedBaseInputs referenceframe.FrameSystemInputs // snapshot at pipeline start + + // Components being teleop'd. Protected by componentsMu. + componentsMu sync.RWMutex + components map[string]*teleopComponent - // Channels. - poseCh chan *referenceframe.PoseInFrame // buffer 1, latest-value semantics - trajCh chan motionplan.Trajectory // buffer 1, one-ahead lookahead + // Notification channel — poked when any component gets a new pose. + // Buffer 1, latest-value semantics. + notify chan struct{} + + // Trajectory output channel. Buffer 1, one-ahead lookahead. + trajCh chan motionplan.Trajectory // Planning head: the last configuration the planner planned TO. // This allows trajectories to chain seamlessly. @@ -57,43 +71,38 @@ type teleopPipeline struct { workers *goutils.StoppableWorkers } -// trySendLatest sends pose on ch using latest-value semantics: -// if a stale value is buffered, it is drained first so the new value replaces it. +// trySendNotify pokes the notify channel using latest-value semantics. // Safe for concurrent callers: never blocks. -func trySendLatest(ch chan *referenceframe.PoseInFrame, pose *referenceframe.PoseInFrame) { - // Fast path: channel is empty, send directly. +func trySendNotify(ch chan struct{}) { select { - case ch <- pose: + case ch <- struct{}{}: return default: } - // Channel full — drain stale value and retry. select { case <-ch: default: } select { - case ch <- pose: + case ch <- struct{}{}: default: - // Another writer beat us; their pose is equally fresh. } } -// runPlanner is the planner goroutine. It reads poses from poseCh, -// plans trajectories from the planning head, and sends them on trajCh. +// runPlanner is the planner goroutine. It wakes on notify signals, +// reads all components' latest poses, and plans a joint trajectory. func (tp *teleopPipeline) runPlanner(ctx context.Context, ms *builtIn) { for { select { case <-ctx.Done(): return - case pose := <-tp.poseCh: - tp.planOnce(ctx, ms, pose) + case <-tp.notify: + tp.planOnce(ctx, ms) } } } // planningHeadEqual reports whether two FrameSystemInputs snapshots are identical. -// Used to detect whether the planning head was reset while a plan was in flight. func planningHeadEqual(a, b referenceframe.FrameSystemInputs) bool { if len(a) != len(b) { return false @@ -112,46 +121,52 @@ func planningHeadEqual(a, b referenceframe.FrameSystemInputs) bool { return true } -func (tp *teleopPipeline) planOnce(ctx context.Context, ms *builtIn, pose *referenceframe.PoseInFrame) { - // Read the current planning head for the teleop'd arm. +func (tp *teleopPipeline) planOnce(ctx context.Context, ms *builtIn) { + // Snapshot the planning head. tp.planningHeadMu.RLock() planningHead := tp.planningHead tp.planningHeadMu.RUnlock() - // Build merged inputs from cached base inputs (snapshot at pipeline start) and - // planning heads. This avoids calling CurrentInputs() which would block on arms - // that are mid-execution, causing stuttery alternating movement in multi-arm setups. + // Build merged inputs from cached base + planning head. inputsStart := time.Now() mergedInputs := make(referenceframe.FrameSystemInputs, len(tp.cachedBaseInputs)) for k, v := range tp.cachedBaseInputs { mergedInputs[k] = v } - // Overlay planning head entries for the teleop'd arm's frames. for k, v := range planningHead { mergedInputs[k] = v } - // Overlay planning heads from other active teleop pipelines so that this - // planner collision-checks against where the other arms are heading, not - // just where they currently are. - for _, head := range ms.otherPlanningHeads(tp.moveReqBase.ComponentName) { - for k, v := range head { - mergedInputs[k] = v - } - } inputsDur := time.Since(inputsStart) tp.lastInputsNanos.Store(inputsDur.Nanoseconds()) - // Build a MoveReq with start_state set to the merged config. - req := tp.buildMoveReq(pose, mergedInputs) + // Collect latest poses from all registered components into a multi-frame goal. + tp.componentsMu.RLock() + goals := make(referenceframe.FrameSystemPoses, len(tp.components)) + var extra map[string]interface{} + for _, comp := range tp.components { + pose := comp.latestPose.Load() + if pose == nil { + continue + } + goals[comp.name] = pose + // Use the first component's extra for planner options (they should be the same). + if extra == nil { + extra = tp.buildExtra(comp.moveReqBase.Extra, mergedInputs) + } + } + tp.componentsMu.RUnlock() + + if len(goals) == 0 { + return + } - // Call ms.planTeleop with cached frame system and merged inputs. + // Plan for all components jointly. planStart := time.Now() ms.mu.RLock() - plan, err := ms.planTeleop(ctx, req, tp.cachedFrameSys, mergedInputs, tp.logger) + plan, err := ms.planTeleopMulti(ctx, goals, extra, tp.cachedFrameSys, mergedInputs, tp.logger) ms.mu.RUnlock() planDur := time.Since(planStart) tp.lastPlanNanos.Store(planDur.Nanoseconds()) - // Includes failed plans; compare with exec_count for success rate. tp.planCount.Add(1) if err != nil { @@ -162,18 +177,14 @@ func (tp *teleopPipeline) planOnce(ctx context.Context, ms *builtIn, pose *refer tp.lastErr.Store(nil) traj := plan.Trajectory() - tp.logger.CInfof(ctx, "teleop planner: inputs took: %s, plan took: %s, traj size: %d", inputsDur, planDur, len(traj)) + tp.logger.CInfof(ctx, "teleop planner: inputs took: %s, plan took: %s, traj size: %d, components: %d", + inputsDur, planDur, len(traj), len(goals)) if len(traj) == 0 { return } - // Re-acquire the write lock to atomically validate that the planning head - // hasn't been reset (by an execution error) while we were planning, update - // it to the last step of this trajectory, and enqueue the trajectory. - // The send must be non-blocking: a blocking send while holding the lock - // would deadlock with resetPlanningHead, which also needs the write lock to - // drain trajCh. If the channel is full the head is left unchanged so the - // next planning iteration re-plans from the same base. + // Atomically validate the planning head hasn't been reset, update it, + // and enqueue the trajectory. lastStep := traj[len(traj)-1] tp.planningHeadMu.Lock() if !planningHeadEqual(tp.planningHead, planningHead) { @@ -185,28 +196,22 @@ func (tp *teleopPipeline) planOnce(ctx context.Context, ms *builtIn, pose *refer case tp.trajCh <- traj: tp.planningHead = lastStep default: - // Executor is busy; leave head unchanged and let the next pose trigger a fresh plan. + // Executor is busy; leave head unchanged and let the next notify trigger a fresh plan. } tp.planningHeadMu.Unlock() } -// buildMoveReq creates a MoveReq from the template with the given destination -// and start_state set to the planning head configuration. -func (tp *teleopPipeline) buildMoveReq( - pose *referenceframe.PoseInFrame, +// buildExtra creates the extra map for planner options with teleop defaults and start_state. +func (tp *teleopPipeline) buildExtra( + baseExtra map[string]interface{}, startConfig referenceframe.FrameSystemInputs, -) motion.MoveReq { - req := tp.moveReqBase - req.Destination = pose - - // Clone Extra to avoid mutating the template. - extra := make(map[string]interface{}, len(tp.moveReqBase.Extra)+1) - for k, v := range tp.moveReqBase.Extra { +) map[string]interface{} { + extra := make(map[string]interface{}, len(baseExtra)+5) + for k, v := range baseExtra { extra[k] = v } - // Build start_state in the format DeserializePlanState expects ([]interface{} - // values, not native []float64) since this path doesn't go through a proto - // round-trip that would convert the types. + + // Build start_state in the format DeserializePlanState expects. confMap := make(map[string]interface{}, len(startConfig)) for fName, inputs := range startConfig { iArr := make([]interface{}, len(inputs)) @@ -217,13 +222,12 @@ func (tp *teleopPipeline) buildMoveReq( } extra["start_state"] = map[string]interface{}{"configuration": confMap} - // Apply teleop-optimized planner defaults. These only set values not - // already present so callers can override via teleop_start extra. + // Apply teleop-optimized planner defaults. teleopDefaults := map[string]interface{}{ - "timeout": 5.0, // seconds; default is 300 - "max_ik_solutions": 20, // default is 100 - "min_ik_score": 0.05, // default is 0.01 - "frame_step": 0.05, // default is 0.01; reduces trajectory steps from ~14 to ~3-4 + "timeout": 5.0, + "max_ik_solutions": 20, + "min_ik_score": 0.05, + "frame_step": 0.05, } for k, v := range teleopDefaults { if _, ok := extra[k]; !ok { @@ -231,55 +235,14 @@ func (tp *teleopPipeline) buildMoveReq( } } - req.Extra = extra + // Clear waypoints — not used in teleop. + extra["waypoints"] = nil - return req -} - -// filterTrajectoryMovingFrames returns a copy of the trajectory containing only -// entries for frames whose inputs actually change across the trajectory. The planner -// outputs entries for ALL frames, but only the teleop'd arm's inputs change — static -// entries for other arms must be excluded to prevent conflicting GoToInputs calls -// from concurrent pipelines. -func filterTrajectoryMovingFrames(traj motionplan.Trajectory) motionplan.Trajectory { - if len(traj) < 2 { - return traj - } - - // Identify which frames change between the first and last step. - first := traj[0] - last := traj[len(traj)-1] - moving := make(map[string]bool) - for name, firstInputs := range first { - lastInputs, ok := last[name] - if !ok || len(firstInputs) != len(lastInputs) { - moving[name] = true - continue - } - for i := range firstInputs { - if firstInputs[i] != lastInputs[i] { - moving[name] = true - break - } - } - } - - // Build filtered trajectory with only moving frames. - filtered := make(motionplan.Trajectory, len(traj)) - for i, step := range traj { - filteredStep := make(referenceframe.FrameSystemInputs, len(moving)) - for name, inputs := range step { - if moving[name] { - filteredStep[name] = inputs - } - } - filtered[i] = filteredStep - } - return filtered + return extra } // runExecutor is the executor goroutine. It reads trajectories from trajCh -// and executes them on the arm via ms.execute. +// and executes them via ms.execute, which calls GoToInputs on all components. func (tp *teleopPipeline) runExecutor(ctx context.Context, ms *builtIn) { var lastExecEnd time.Time var totalCycle time.Duration @@ -293,12 +256,6 @@ func (tp *teleopPipeline) runExecutor(ctx context.Context, ms *builtIn) { waitDur := time.Since(waitStart) tp.lastExecWaitNanos.Store(waitDur.Nanoseconds()) - // Filter trajectory to only include this pipeline's component. - // Without filtering, the trajectory includes all frame system components, - // and the executor would send GoToInputs to other arms — causing them - // to fight with their own pipelines. - traj = filterTrajectoryMovingFrames(traj) - execStart := time.Now() // Skip start-position check (math.MaxFloat64) because the arm // is in continuous motion and won't be exactly at the trajectory start. @@ -307,7 +264,6 @@ func (tp *teleopPipeline) runExecutor(ctx context.Context, ms *builtIn) { ms.mu.RUnlock() execDur := time.Since(execStart) tp.lastExecNanos.Store(execDur.Nanoseconds()) - // Includes failed executions; compare with plan_count for pipeline health. tp.execCount.Add(1) if !lastExecEnd.IsZero() { @@ -334,10 +290,7 @@ func (tp *teleopPipeline) runExecutor(ctx context.Context, ms *builtIn) { } // resetPlanningHead sets the planning head to the arm's actual current position -// after an execution error. Resetting the planning head invalidates all previously -// planned trajectories: any trajectory in trajCh was chained from the old (now -// incorrect) head. The drain of trajCh and the head reset are held under the same -// write lock so that planOnce cannot enqueue a stale trajectory between them. +// after an execution error. func (tp *teleopPipeline) resetPlanningHead(ctx context.Context, ms *builtIn) { ms.mu.RLock() defer ms.mu.RUnlock() @@ -357,124 +310,106 @@ func (tp *teleopPipeline) resetPlanningHead(ctx context.Context, ms *builtIn) { tp.planningHeadMu.Unlock() } -// stop shuts down the pipeline goroutines and best-effort stops the arm. +// stop shuts down the pipeline goroutines. func (tp *teleopPipeline) stop(ctx context.Context, ms *builtIn) { tp.workers.Stop() } -// startTeleopPipeline creates and starts a new teleop pipeline for the given component. -// If a pipeline already exists for that component, it is stopped first. Other component pipelines are unaffected. -func (ms *builtIn) startTeleopPipeline(cmdCtx context.Context, req motion.MoveReq) error { +// addTeleopComponent adds a component to the teleop pipeline, creating the pipeline if needed. +func (ms *builtIn) addTeleopComponent(cmdCtx context.Context, req motion.MoveReq) error { ms.teleopMu.Lock() - // Stop only the pipeline for this component, if one exists. - if existing, ok := ms.teleopPipelines[req.ComponentName]; ok { - existing.stop(cmdCtx, ms) - delete(ms.teleopPipelines, req.ComponentName) - } defer ms.teleopMu.Unlock() ms.mu.RLock() - fsInputs, err := ms.fsService.CurrentInputs(cmdCtx) - if err != nil { - ms.mu.RUnlock() - return err - } - - // Validate the command. + // Validate the component. if _, ok := ms.components[req.ComponentName]; !ok || req.Destination == nil { ms.mu.RUnlock() - return fmt.Errorf("Component must exist and destination must be set. Component: %v Destination: %v", + return fmt.Errorf("component must exist and destination must be set. Component: %v Destination: %v", req.ComponentName, req.Destination) } - // Build and cache the frame system once for the lifetime of this pipeline. - // The kinematic structure doesn't change during teleop; Reconfigure() stops - // the pipeline before any config changes. - frameSys, err := ms.getFrameSystem(cmdCtx, req.WorldState.Transforms()) - if err != nil { + if ms.teleopPipeline == nil { + // Create a new pipeline. + fsInputs, err := ms.fsService.CurrentInputs(cmdCtx) + if err != nil { + ms.mu.RUnlock() + return err + } + + frameSys, err := ms.getFrameSystem(cmdCtx, req.WorldState.Transforms()) + if err != nil { + ms.mu.RUnlock() + return err + } ms.mu.RUnlock() - return err - } - ms.mu.RUnlock() - tp := &teleopPipeline{ - logger: ms.logger.Sublogger("teleop." + req.ComponentName), - moveReqBase: req, - cachedFrameSys: frameSys, - cachedBaseInputs: fsInputs, - poseCh: make(chan *referenceframe.PoseInFrame, 1), - trajCh: make(chan motionplan.Trajectory, 1), - planningHead: fsInputs, - } + tp := &teleopPipeline{ + logger: ms.logger.Sublogger("teleop"), + cachedFrameSys: frameSys, + cachedBaseInputs: fsInputs, + components: make(map[string]*teleopComponent), + notify: make(chan struct{}, 1), + trajCh: make(chan motionplan.Trajectory, 1), + planningHead: fsInputs, + } - tp.poseCh <- req.Destination - tp.workers = goutils.NewBackgroundStoppableWorkers( - func(pipelineCtx context.Context) { tp.runPlanner(pipelineCtx, ms) }, - func(pipelineCtx context.Context) { tp.runExecutor(pipelineCtx, ms) }, - ) + comp := &teleopComponent{ + name: req.ComponentName, + moveReqBase: req, + } + comp.latestPose.Store(req.Destination) + tp.components[req.ComponentName] = comp - ms.teleopPipelines[req.ComponentName] = tp + // Send initial notification to trigger first plan. + trySendNotify(tp.notify) - return nil -} + tp.workers = goutils.NewBackgroundStoppableWorkers( + func(pipelineCtx context.Context) { tp.runPlanner(pipelineCtx, ms) }, + func(pipelineCtx context.Context) { tp.runExecutor(pipelineCtx, ms) }, + ) -// otherPlanningHeads returns the planning heads of all active teleop pipelines -// except the one for the given component. This allows each pipeline's planner to -// collision-check against where other arms are heading. -func (ms *builtIn) otherPlanningHeads(self string) map[string]referenceframe.FrameSystemInputs { - ms.teleopMu.RLock() - defer ms.teleopMu.RUnlock() + ms.teleopPipeline = tp + } else { + ms.mu.RUnlock() - result := make(map[string]referenceframe.FrameSystemInputs, len(ms.teleopPipelines)-1) - for name, tp := range ms.teleopPipelines { - if name == self { - continue + // Add component to existing pipeline. + tp := ms.teleopPipeline + tp.componentsMu.Lock() + comp := &teleopComponent{ + name: req.ComponentName, + moveReqBase: req, } - tp.planningHeadMu.RLock() - result[name] = tp.planningHead - tp.planningHeadMu.RUnlock() - } - return result -} - -// lookupPipeline finds the teleop pipeline for the given DoCommand map. -// It uses the "component_name" key if present; otherwise, if exactly one pipeline -// exists, it returns that one (backward compat for single-arm teleop). -func (ms *builtIn) lookupPipeline(cmd map[string]interface{}) *teleopPipeline { - componentName, _ := cmd["component_name"].(string) + comp.latestPose.Store(req.Destination) + tp.components[req.ComponentName] = comp + tp.componentsMu.Unlock() - ms.teleopMu.RLock() - defer ms.teleopMu.RUnlock() - - if componentName != "" { - return ms.teleopPipelines[componentName] - } - // Backward compat: if no component_name and exactly one pipeline, use it. - if len(ms.teleopPipelines) == 1 { - for _, tp := range ms.teleopPipelines { - return tp - } + // Trigger a replan with the new component included. + trySendNotify(tp.notify) } + return nil } -// buildPipelineStatus returns a status map for a single teleop pipeline. -func buildPipelineStatus(tp *teleopPipeline) map[string]any { - status := map[string]any{ - "running": true, - "queued_poses": len(tp.poseCh), - "queued_plans": len(tp.trajCh), - "last_inputs_ms": float64(tp.lastInputsNanos.Load()) / 1e6, - "last_plan_ms": float64(tp.lastPlanNanos.Load()) / 1e6, - "last_exec_ms": float64(tp.lastExecNanos.Load()) / 1e6, - "last_exec_wait_ms": float64(tp.lastExecWaitNanos.Load()) / 1e6, - "plan_count": tp.planCount.Load(), - "exec_count": tp.execCount.Load(), +// removeTeleopComponent removes a component from the pipeline. +// If no components remain, the pipeline is stopped. +func (ms *builtIn) removeTeleopComponent(ctx context.Context, componentName string) { + ms.teleopMu.Lock() + defer ms.teleopMu.Unlock() + + tp := ms.teleopPipeline + if tp == nil { + return } - if lastErr := tp.lastErr.Load(); lastErr != nil { - status["error"] = (*lastErr).Error() + + tp.componentsMu.Lock() + delete(tp.components, componentName) + remaining := len(tp.components) + tp.componentsMu.Unlock() + + if remaining == 0 { + tp.stop(ctx, ms) + ms.teleopPipeline = nil } - return status } // handleTeleopCommand handles teleop DoCommand requests. @@ -511,7 +446,7 @@ func (ms *builtIn) handleTeleopCommand( return nil, true, err } - if err := ms.startTeleopPipeline(ctx, moveReq); err != nil { + if err := ms.addTeleopComponent(ctx, moveReq); err != nil { return nil, true, err } @@ -520,7 +455,11 @@ func (ms *builtIn) handleTeleopCommand( } if req, ok := cmd[DoTeleopMove]; ok { - tp := ms.lookupPipeline(cmd) + componentName, _ := cmd["component_name"].(string) + + ms.teleopMu.RLock() + tp := ms.teleopPipeline + ms.teleopMu.RUnlock() if tp == nil { return nil, true, fmt.Errorf("teleop pipeline is not running; call %s first", DoTeleopStart) } @@ -535,12 +474,31 @@ func (ms *builtIn) handleTeleopCommand( } pif := referenceframe.ProtobufToPoseInFrame(&pifProto) + + // Update the component's latest pose. + tp.componentsMu.RLock() + comp := tp.components[componentName] + // Backward compat: if no component_name and exactly one component, use it. + if comp == nil && componentName == "" && len(tp.components) == 1 { + for _, c := range tp.components { + comp = c + } + } + tp.componentsMu.RUnlock() + + if comp == nil { + return nil, true, fmt.Errorf("component %q not registered in teleop pipeline", componentName) + } + if seq, ok := cmd["seq"]; ok { if seqF, ok := seq.(float64); ok { - tp.logger.CDebugf(ctx, "teleop received seq=%d", int64(seqF)) + tp.logger.CDebugf(ctx, "teleop received component=%s seq=%d", comp.name, int64(seqF)) } } - trySendLatest(tp.poseCh, pif) + + comp.latestPose.Store(pif) + trySendNotify(tp.notify) + resp[DoTeleopMove] = true return resp, true, nil } @@ -548,49 +506,55 @@ func (ms *builtIn) handleTeleopCommand( if _, ok := cmd[DoTeleopStop]; ok { componentName, _ := cmd["component_name"].(string) - ms.teleopMu.Lock() if componentName == "" { - // Backward compat: stop ALL pipelines. - for name, tp := range ms.teleopPipelines { - tp.stop(ctx, ms) - delete(ms.teleopPipelines, name) + // Backward compat: stop entire pipeline. + ms.teleopMu.Lock() + if ms.teleopPipeline != nil { + ms.teleopPipeline.stop(ctx, ms) + ms.teleopPipeline = nil } + ms.teleopMu.Unlock() } else { - if tp, ok := ms.teleopPipelines[componentName]; ok { - tp.stop(ctx, ms) - delete(ms.teleopPipelines, componentName) - } + ms.removeTeleopComponent(ctx, componentName) } - ms.teleopMu.Unlock() resp[DoTeleopStop] = true return resp, true, nil } if _, ok := cmd[DoTeleopStatus]; ok { - componentName, _ := cmd["component_name"].(string) - ms.teleopMu.RLock() - defer ms.teleopMu.RUnlock() - - if componentName != "" { - tp, exists := ms.teleopPipelines[componentName] - if !exists { - return map[string]any{ - "running": false, - }, true, nil - } - resp[DoTeleopStatus] = buildPipelineStatus(tp) - return resp, true, nil + tp := ms.teleopPipeline + ms.teleopMu.RUnlock() + + if tp == nil { + return map[string]any{ + DoTeleopStatus: map[string]any{"running": false}, + }, true, nil } - // No component_name: return status of all pipelines. - allStatus := make(map[string]any, len(ms.teleopPipelines)+1) - allStatus["pipeline_count"] = len(ms.teleopPipelines) - for name, tp := range ms.teleopPipelines { - allStatus[name] = buildPipelineStatus(tp) + status := map[string]any{ + "running": true, + "last_inputs_ms": float64(tp.lastInputsNanos.Load()) / 1e6, + "last_plan_ms": float64(tp.lastPlanNanos.Load()) / 1e6, + "last_exec_ms": float64(tp.lastExecNanos.Load()) / 1e6, + "last_exec_wait_ms": float64(tp.lastExecWaitNanos.Load()) / 1e6, + "plan_count": tp.planCount.Load(), + "exec_count": tp.execCount.Load(), + } + if lastErr := tp.lastErr.Load(); lastErr != nil { + status["error"] = (*lastErr).Error() } - resp[DoTeleopStatus] = allStatus + + tp.componentsMu.RLock() + compNames := make([]string, 0, len(tp.components)) + for name := range tp.components { + compNames = append(compNames, name) + } + tp.componentsMu.RUnlock() + status["components"] = compNames + + resp[DoTeleopStatus] = status return resp, true, nil } From ea25e588c9ff56516d083f82fc3ec14bbb81a3f8 Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Thu, 2 Apr 2026 16:26:36 -0400 Subject: [PATCH 13/23] lower planning thresholds --- services/motion/builtin/teleop.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index 7916f033e6c..42150d41ce4 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -222,12 +222,14 @@ func (tp *teleopPipeline) buildExtra( } extra["start_state"] = map[string]interface{}{"configuration": confMap} - // Apply teleop-optimized planner defaults. + // Apply teleop-optimized planner defaults. These are aggressive because + // teleop movements are tiny and frequent — we trade solution optimality + // for speed. teleopDefaults := map[string]interface{}{ "timeout": 5.0, - "max_ik_solutions": 20, - "min_ik_score": 0.05, - "frame_step": 0.05, + "max_ik_solutions": 10, // fewer solutions = faster (was 20) + "min_ik_score": 0.1, // accept worse solutions faster (was 0.05) + "frame_step": 0.1, // fewer trajectory steps (was 0.05) } for k, v := range teleopDefaults { if _, ok := extra[k]; !ok { From e04d14fc165133bb20c3b76779984a3694f4143a Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Thu, 2 Apr 2026 16:36:05 -0400 Subject: [PATCH 14/23] send commands in parallel --- components/arm/client.go | 5 +- services/motion/builtin/builtin.go | 108 ----------------------------- services/motion/builtin/teleop.go | 76 ++++++++++++++++++-- 3 files changed, 72 insertions(+), 117 deletions(-) diff --git a/components/arm/client.go b/components/arm/client.go index ad36877b798..5a5fdc13426 100644 --- a/components/arm/client.go +++ b/components/arm/client.go @@ -211,10 +211,7 @@ func (c *client) CurrentInputs(ctx context.Context) ([]referenceframe.Input, err } func (c *client) GoToInputs(ctx context.Context, inputSteps ...[]referenceframe.Input) error { - return c.MoveThroughJointPositions(ctx, inputSteps, nil, map[string]interface{}{ - "waitAtEnd": false, - "interpolate": false, - }) + return c.MoveThroughJointPositions(ctx, inputSteps, nil, nil) } func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { diff --git a/services/motion/builtin/builtin.go b/services/motion/builtin/builtin.go index 1df9d1ce273..f6f4cde962f 100644 --- a/services/motion/builtin/builtin.go +++ b/services/motion/builtin/builtin.go @@ -23,7 +23,6 @@ import ( "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/structpb" - "go.viam.com/rdk/components/arm" "go.viam.com/rdk/components/movementsensor" "go.viam.com/rdk/logging" "go.viam.com/rdk/motionplan" @@ -99,10 +98,6 @@ type Config struct { // example { "arm" : { "3" : { "min" : 0, "max" : 2 } } } InputRangeOverride map[string]map[string]referenceframe.Limit `json:"input_range_override"` - // TeleopSmallMoveRad is the joint-space L-inf displacement threshold (radians) - // below which execute() uses interpolate=true for smoother servo tracking. - // 0 disables adaptive behavior. Suggested starting value: 0.1 (~5.7 degrees). - TeleopSmallMoveRad float64 `json:"teleop_small_move_rad,omitempty"` } func (c *Config) shouldWritePlan(start time.Time, err error) bool { @@ -538,74 +533,6 @@ func (ms *builtIn) plan(ctx context.Context, req motion.MoveReq, logger logging. return plan, err } -// planTeleop is a low-latency variant of plan() for the teleop pipeline. -// It uses a pre-built frame system and caller-provided fsInputs (merged from -// live CurrentInputs + planning head) to avoid per-call overhead. -func (ms *builtIn) planTeleop( - ctx context.Context, - req motion.MoveReq, - frameSys *referenceframe.FrameSystem, - fsInputs referenceframe.FrameSystemInputs, - logger logging.Logger, -) (motionplan.Plan, error) { - ctx, span := trace.StartSpan(ctx, "motion::builtin::planTeleop") - defer span.End() - - movingFrame := frameSys.Frame(req.ComponentName) - if movingFrame == nil { - return nil, fmt.Errorf("component named %s not found in robot frame system", req.ComponentName) - } - - startState, waypoints, err := waypointsFromRequest(req, fsInputs) - if err != nil { - return nil, err - } - if len(waypoints) == 0 { - return nil, errors.New("could not find any waypoints to plan for in MoveRequest. Fill in Destination or goal_state") - } - - if req.Extra != nil { - req.Extra["waypoints"] = nil - } - - // Re-evaluate goal poses to be in the frame of World. - worldWaypoints := []*armplanning.PlanState{} - solvingFrame := referenceframe.World - for _, wp := range waypoints { - if wp.Poses() != nil { - step := referenceframe.FrameSystemPoses{} - for fName, destination := range wp.Poses() { - tf, err := frameSys.Transform(fsInputs.ToLinearInputs(), destination, solvingFrame) - if err != nil { - return nil, err - } - goalPose, _ := tf.(*referenceframe.PoseInFrame) - step[fName] = goalPose - } - worldWaypoints = append(worldWaypoints, armplanning.NewPlanState(step, wp.Configuration())) - } else { - worldWaypoints = append(worldWaypoints, wp) - } - } - - planOpts, err := armplanning.NewPlannerOptionsFromExtra(req.Extra) - if err != nil { - return nil, err - } - - planRequest := &armplanning.PlanRequest{ - FrameSystem: frameSys, - Goals: worldWaypoints, - StartState: startState, - WorldState: req.WorldState, - Constraints: req.Constraints, - PlannerOptions: planOpts, - } - - plan, _, err := armplanning.PlanMotion(ctx, logger, planRequest) - return plan, err -} - // planTeleopMulti plans a trajectory for multiple components simultaneously. // It builds a multi-frame goal from the given poses map, allowing the planner // to find collision-free paths for all arms jointly. @@ -739,28 +666,6 @@ func (ms *builtIn) execute(ctx context.Context, trajectory motionplan.Trajectory return err } - // Adaptive interpolation: for small movements on arms, use interpolate=true - // for smooth servo tracking instead of the default fast-streaming GoToInputs. - if threshold := ms.conf.TeleopSmallMoveRad; threshold > 0 && len(inputs) >= 2 { - if maxLinfDisplacement(inputs) < threshold { - if armComp, ok := r.(arm.Arm); ok { - err := armComp.MoveThroughJointPositions(ctx, inputs, nil, map[string]interface{}{ - "waitAtEnd": false, - "interpolate": true, - }) - if err != nil { - if actuator, ok := r.(inputEnabledActuator); ok { - if stopErr := actuator.Stop(ctx, nil); stopErr != nil { - return errors.Wrap(err, stopErr.Error()) - } - } - return err - } - continue - } - } - } - if err := ie.GoToInputs(ctx, inputs...); err != nil { // If there is an error on GoToInputs, stop the component if possible before returning the error if actuator, ok := r.(inputEnabledActuator); ok { @@ -775,19 +680,6 @@ func (ms *builtIn) execute(ctx context.Context, trajectory motionplan.Trajectory return nil } -// maxLinfDisplacement returns the max L-inf joint displacement (radians) across -// consecutive pairs of input steps. -func maxLinfDisplacement(steps [][]referenceframe.Input) float64 { - maxDisp := 0.0 - for i := 1; i < len(steps); i++ { - d := referenceframe.InputsLinfDistance(steps[i-1], steps[i]) - if d > maxDisp { - maxDisp = d - } - } - return maxDisp -} - // applyDefaultExtras iterates through the list of default extras configured on the builtIn motion service and adds them to the // given map of extras if the key does not already exist. func (ms *builtIn) applyDefaultExtras(extras map[string]any) { diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index 42150d41ce4..f537bbfe0b5 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -3,7 +3,6 @@ package builtin import ( "context" "fmt" - "math" "sync" "sync/atomic" "time" @@ -14,9 +13,12 @@ import ( "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/structpb" + "go.viam.com/rdk/components/arm" "go.viam.com/rdk/logging" "go.viam.com/rdk/motionplan" "go.viam.com/rdk/referenceframe" + "go.viam.com/rdk/resource" + "go.viam.com/rdk/robot/framesystem" "go.viam.com/rdk/services/motion" "go.viam.com/rdk/utils" ) @@ -243,8 +245,74 @@ func (tp *teleopPipeline) buildExtra( return extra } +// executeTeleop executes a trajectory by calling GoToInputs on all components +// in parallel. Unlike ms.execute, it skips the step-0 position check (which +// blocks on CurrentInputs gRPC calls) and sends commands to all arms concurrently +// rather than sequentially. +func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj motionplan.Trajectory) error { + if len(traj) < 2 { + return nil + } + + // Group inputs per component across all trajectory steps (skip step 0 = start position). + perComponent := make(map[string][][]referenceframe.Input) + for i := 1; i < len(traj); i++ { + for name, inputs := range traj[i] { + if len(inputs) == 0 { + continue + } + perComponent[name] = append(perComponent[name], inputs) + } + } + + // Execute GoToInputs on all components in parallel. + var wg sync.WaitGroup + errs := make([]error, len(perComponent)) + idx := 0 + for name, inputs := range perComponent { + r, ok := ms.components[name] + if !ok { + continue + } + ie, err := utils.AssertType[framesystem.InputEnabled](r) + if err != nil { + continue + } + wg.Add(1) + go func(i int, ie framesystem.InputEnabled, inputs [][]referenceframe.Input, r resource.Resource) { + defer wg.Done() + var err error + // For arms, call MoveThroughJointPositions directly with teleop-specific + // extras (no wait, no interpolation) to avoid changing core GoToInputs behavior. + if armComp, ok := r.(arm.Arm); ok { + err = armComp.MoveThroughJointPositions(ctx, inputs, nil, map[string]interface{}{ + "waitAtEnd": false, + "interpolate": false, + }) + } else { + err = ie.GoToInputs(ctx, inputs...) + } + if err != nil { + if actuator, ok := r.(inputEnabledActuator); ok { + _ = actuator.Stop(ctx, nil) + } + errs[i] = err + } + }(idx, ie, inputs, r) + idx++ + } + wg.Wait() + + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + // runExecutor is the executor goroutine. It reads trajectories from trajCh -// and executes them via ms.execute, which calls GoToInputs on all components. +// and executes them in parallel across all components. func (tp *teleopPipeline) runExecutor(ctx context.Context, ms *builtIn) { var lastExecEnd time.Time var totalCycle time.Duration @@ -259,10 +327,8 @@ func (tp *teleopPipeline) runExecutor(ctx context.Context, ms *builtIn) { tp.lastExecWaitNanos.Store(waitDur.Nanoseconds()) execStart := time.Now() - // Skip start-position check (math.MaxFloat64) because the arm - // is in continuous motion and won't be exactly at the trajectory start. ms.mu.RLock() - err := ms.execute(ctx, traj, math.MaxFloat64) + err := tp.executeTeleop(ctx, ms, traj) ms.mu.RUnlock() execDur := time.Since(execStart) tp.lastExecNanos.Store(execDur.Nanoseconds()) From de2c2e7b5a4dc27053a42e6af24db5def1bb5561 Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Fri, 10 Apr 2026 12:06:55 -0400 Subject: [PATCH 15/23] add smoothing knob --- services/motion/builtin/builtin.go | 6 +++++ services/motion/builtin/teleop.go | 38 +++++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/services/motion/builtin/builtin.go b/services/motion/builtin/builtin.go index f6f4cde962f..cc5eeb5f6c1 100644 --- a/services/motion/builtin/builtin.go +++ b/services/motion/builtin/builtin.go @@ -98,6 +98,12 @@ type Config struct { // example { "arm" : { "3" : { "min" : 0, "max" : 2 } } } InputRangeOverride map[string]map[string]referenceframe.Limit `json:"input_range_override"` + // TeleopSmallMoveRad is the max joint displacement (radians) below which + // the arm's built-in interpolation is enabled for smooth motion. Default 0.005. + TeleopSmallMoveRad float64 `json:"teleop_small_move_rad"` + // TeleopInterpolateOverride when true forces interpolation ON for all teleop + // movements regardless of size. Useful for testing. + TeleopInterpolateOverride bool `json:"teleop_interpolate_override"` } func (c *Config) shouldWritePlan(start time.Time, err error) bool { diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index f537bbfe0b5..a997e63eeca 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -3,6 +3,7 @@ package builtin import ( "context" "fmt" + "math" "sync" "sync/atomic" "time" @@ -23,6 +24,8 @@ import ( "go.viam.com/rdk/utils" ) +const defaultTeleopSmallMoveRad = 0.005 + // teleopComponent tracks a single component being teleop'd within the pipeline. type teleopComponent struct { name string @@ -245,6 +248,27 @@ func (tp *teleopPipeline) buildExtra( return extra } +// shouldInterpolate returns true if the max single-joint displacement +// across the trajectory is below the threshold, meaning the arm's +// built-in trapezoidal interpolation should be used for smooth motion. +func shouldInterpolate(inputs [][]referenceframe.Input, threshold float64) bool { + if len(inputs) < 2 { + return true + } + first := inputs[0] + last := inputs[len(inputs)-1] + if len(first) != len(last) { + return false + } + var maxDisp float64 + for j := range first { + if d := math.Abs(last[j] - first[j]); d > maxDisp { + maxDisp = d + } + } + return maxDisp < threshold +} + // executeTeleop executes a trajectory by calling GoToInputs on all components // in parallel. Unlike ms.execute, it skips the step-0 position check (which // blocks on CurrentInputs gRPC calls) and sends commands to all arms concurrently @@ -265,6 +289,18 @@ func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj m } } + // Read teleop interpolation config. + smallMoveRad := defaultTeleopSmallMoveRad + interpolateOverride := false + ms.mu.RLock() + if ms.conf != nil { + if ms.conf.TeleopSmallMoveRad > 0 { + smallMoveRad = ms.conf.TeleopSmallMoveRad + } + interpolateOverride = ms.conf.TeleopInterpolateOverride + } + ms.mu.RUnlock() + // Execute GoToInputs on all components in parallel. var wg sync.WaitGroup errs := make([]error, len(perComponent)) @@ -287,7 +323,7 @@ func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj m if armComp, ok := r.(arm.Arm); ok { err = armComp.MoveThroughJointPositions(ctx, inputs, nil, map[string]interface{}{ "waitAtEnd": false, - "interpolate": false, + "interpolate": interpolateOverride || shouldInterpolate(inputs, smallMoveRad), }) } else { err = ie.GoToInputs(ctx, inputs...) From d30b808c2dcf5b377bf7ff3caae308561464c823 Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Fri, 10 Apr 2026 12:53:06 -0400 Subject: [PATCH 16/23] add logging --- services/motion/builtin/teleop.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index a997e63eeca..dfc318f3b12 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -251,14 +251,14 @@ func (tp *teleopPipeline) buildExtra( // shouldInterpolate returns true if the max single-joint displacement // across the trajectory is below the threshold, meaning the arm's // built-in trapezoidal interpolation should be used for smooth motion. -func shouldInterpolate(inputs [][]referenceframe.Input, threshold float64) bool { +func maxJointDisplacement(inputs [][]referenceframe.Input) float64 { if len(inputs) < 2 { - return true + return 0 } first := inputs[0] last := inputs[len(inputs)-1] if len(first) != len(last) { - return false + return 0 } var maxDisp float64 for j := range first { @@ -266,7 +266,7 @@ func shouldInterpolate(inputs [][]referenceframe.Input, threshold float64) bool maxDisp = d } } - return maxDisp < threshold + return maxDisp } // executeTeleop executes a trajectory by calling GoToInputs on all components @@ -321,9 +321,13 @@ func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj m // For arms, call MoveThroughJointPositions directly with teleop-specific // extras (no wait, no interpolation) to avoid changing core GoToInputs behavior. if armComp, ok := r.(arm.Arm); ok { + disp := maxJointDisplacement(inputs) + interp := interpolateOverride || disp < smallMoveRad + tp.logger.CDebugf(ctx, "teleop exec: interpolate=%v override=%v maxDisp=%f threshold=%f steps=%d", + interp, interpolateOverride, disp, smallMoveRad, len(inputs)) err = armComp.MoveThroughJointPositions(ctx, inputs, nil, map[string]interface{}{ "waitAtEnd": false, - "interpolate": interpolateOverride || shouldInterpolate(inputs, smallMoveRad), + "interpolate": interp, }) } else { err = ie.GoToInputs(ctx, inputs...) From 29de7a4c028fc9a93996f612dcec9b91cf0e8050 Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Fri, 10 Apr 2026 13:16:29 -0400 Subject: [PATCH 17/23] add more logs --- services/motion/builtin/teleop.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index dfc318f3b12..807f2cb958f 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -323,13 +323,14 @@ func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj m if armComp, ok := r.(arm.Arm); ok { disp := maxJointDisplacement(inputs) interp := interpolateOverride || disp < smallMoveRad - tp.logger.CDebugf(ctx, "teleop exec: interpolate=%v override=%v maxDisp=%f threshold=%f steps=%d", + tp.logger.CInfof(ctx, "teleop exec: interpolate=%v override=%v maxDisp=%f threshold=%f steps=%d", interp, interpolateOverride, disp, smallMoveRad, len(inputs)) err = armComp.MoveThroughJointPositions(ctx, inputs, nil, map[string]interface{}{ "waitAtEnd": false, "interpolate": interp, }) } else { + tp.logger.CInfof(ctx, "teleop exec: component is not arm.Arm (type %T), falling back to GoToInputs", r) err = ie.GoToInputs(ctx, inputs...) } if err != nil { From 58941866354b6cbb5dad98805442670c8343276a Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Fri, 10 Apr 2026 13:25:58 -0400 Subject: [PATCH 18/23] Update teleop.go --- services/motion/builtin/teleop.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index 807f2cb958f..6c7d14334a7 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -290,16 +290,15 @@ func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj m } // Read teleop interpolation config. + // NOTE: caller (runExecutor) already holds ms.mu.RLock, so safe to read ms.conf directly. smallMoveRad := defaultTeleopSmallMoveRad interpolateOverride := false - ms.mu.RLock() if ms.conf != nil { if ms.conf.TeleopSmallMoveRad > 0 { smallMoveRad = ms.conf.TeleopSmallMoveRad } interpolateOverride = ms.conf.TeleopInterpolateOverride } - ms.mu.RUnlock() // Execute GoToInputs on all components in parallel. var wg sync.WaitGroup From 7c7fbb7a2b3ca0de57828ffb3e3e42a69c7b3ccb Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Fri, 10 Apr 2026 13:37:09 -0400 Subject: [PATCH 19/23] Update teleop.go --- services/motion/builtin/teleop.go | 32 ++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index 6c7d14334a7..6e046421ac2 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -251,18 +251,13 @@ func (tp *teleopPipeline) buildExtra( // shouldInterpolate returns true if the max single-joint displacement // across the trajectory is below the threshold, meaning the arm's // built-in trapezoidal interpolation should be used for smooth motion. -func maxJointDisplacement(inputs [][]referenceframe.Input) float64 { - if len(inputs) < 2 { - return 0 - } - first := inputs[0] - last := inputs[len(inputs)-1] - if len(first) != len(last) { +func maxJointDisplacement(start, goal []referenceframe.Input) float64 { + if len(start) != len(goal) || len(start) == 0 { return 0 } var maxDisp float64 - for j := range first { - if d := math.Abs(last[j] - first[j]); d > maxDisp { + for j := range start { + if d := math.Abs(goal[j] - start[j]); d > maxDisp { maxDisp = d } } @@ -278,14 +273,20 @@ func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj m return nil } - // Group inputs per component across all trajectory steps (skip step 0 = start position). + // Group inputs per component across all trajectory steps (skip step 0 for execution, + // but record it for displacement calculation). + startInputs := make(map[string][]referenceframe.Input) perComponent := make(map[string][][]referenceframe.Input) - for i := 1; i < len(traj); i++ { + for i := 0; i < len(traj); i++ { for name, inputs := range traj[i] { if len(inputs) == 0 { continue } - perComponent[name] = append(perComponent[name], inputs) + if i == 0 { + startInputs[name] = inputs + } else { + perComponent[name] = append(perComponent[name], inputs) + } } } @@ -314,13 +315,14 @@ func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj m continue } wg.Add(1) - go func(i int, ie framesystem.InputEnabled, inputs [][]referenceframe.Input, r resource.Resource) { + go func(i int, ie framesystem.InputEnabled, inputs [][]referenceframe.Input, r resource.Resource, start []referenceframe.Input) { defer wg.Done() var err error // For arms, call MoveThroughJointPositions directly with teleop-specific // extras (no wait, no interpolation) to avoid changing core GoToInputs behavior. if armComp, ok := r.(arm.Arm); ok { - disp := maxJointDisplacement(inputs) + goal := inputs[len(inputs)-1] + disp := maxJointDisplacement(start, goal) interp := interpolateOverride || disp < smallMoveRad tp.logger.CInfof(ctx, "teleop exec: interpolate=%v override=%v maxDisp=%f threshold=%f steps=%d", interp, interpolateOverride, disp, smallMoveRad, len(inputs)) @@ -338,7 +340,7 @@ func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj m } errs[i] = err } - }(idx, ie, inputs, r) + }(idx, ie, inputs, r, startInputs[name]) idx++ } wg.Wait() From 242301f50c6ddc9ccd7636381101bd8b9c665a17 Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Fri, 10 Apr 2026 13:58:00 -0400 Subject: [PATCH 20/23] Update teleop.go --- services/motion/builtin/teleop.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index 6e046421ac2..2213ef06931 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -323,12 +323,12 @@ func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj m if armComp, ok := r.(arm.Arm); ok { goal := inputs[len(inputs)-1] disp := maxJointDisplacement(start, goal) - interp := interpolateOverride || disp < smallMoveRad - tp.logger.CInfof(ctx, "teleop exec: interpolate=%v override=%v maxDisp=%f threshold=%f steps=%d", - interp, interpolateOverride, disp, smallMoveRad, len(inputs)) + smallMove := interpolateOverride || disp < smallMoveRad + tp.logger.CInfof(ctx, "teleop exec: smallMove=%v override=%v maxDisp=%f threshold=%f steps=%d", + smallMove, interpolateOverride, disp, smallMoveRad, len(inputs)) err = armComp.MoveThroughJointPositions(ctx, inputs, nil, map[string]interface{}{ - "waitAtEnd": false, - "interpolate": interp, + "waitAtEnd": smallMove, + "interpolate": smallMove, }) } else { tp.logger.CInfof(ctx, "teleop exec: component is not arm.Arm (type %T), falling back to GoToInputs", r) From 79c05def86df25a9026effdd71115cdfd7f96d13 Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Fri, 10 Apr 2026 14:52:36 -0400 Subject: [PATCH 21/23] smoothing --- services/motion/builtin/builtin.go | 4 ++ services/motion/builtin/teleop.go | 90 ++++++++++++++++-------------- 2 files changed, 52 insertions(+), 42 deletions(-) diff --git a/services/motion/builtin/builtin.go b/services/motion/builtin/builtin.go index cc5eeb5f6c1..7e750aa9db2 100644 --- a/services/motion/builtin/builtin.go +++ b/services/motion/builtin/builtin.go @@ -104,6 +104,10 @@ type Config struct { // TeleopInterpolateOverride when true forces interpolation ON for all teleop // movements regardless of size. Useful for testing. TeleopInterpolateOverride bool `json:"teleop_interpolate_override"` + // TeleopSmoothAlpha is the EMA smoothing factor for joint positions (0-1). + // Lower = smoother but more latency, higher = more responsive but less smooth. + // 0 = frozen, 1 = no smoothing. Default 0.5. + TeleopSmoothAlpha float64 `json:"teleop_smooth_alpha"` } func (c *Config) shouldWritePlan(start time.Time, err error) bool { diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index 2213ef06931..3cf0a22ffd5 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -3,7 +3,6 @@ package builtin import ( "context" "fmt" - "math" "sync" "sync/atomic" "time" @@ -24,7 +23,7 @@ import ( "go.viam.com/rdk/utils" ) -const defaultTeleopSmallMoveRad = 0.005 +const defaultTeleopSmoothAlpha = 0.5 // teleopComponent tracks a single component being teleop'd within the pipeline. type teleopComponent struct { @@ -72,6 +71,9 @@ type teleopPipeline struct { planCount atomic.Int64 execCount atomic.Int64 + // EMA-smoothed joint positions per component. Only accessed by executor goroutine. + smoothedJoints map[string][]referenceframe.Input + // Lifecycle. workers *goutils.StoppableWorkers } @@ -248,20 +250,20 @@ func (tp *teleopPipeline) buildExtra( return extra } -// shouldInterpolate returns true if the max single-joint displacement -// across the trajectory is below the threshold, meaning the arm's -// built-in trapezoidal interpolation should be used for smooth motion. -func maxJointDisplacement(start, goal []referenceframe.Input) float64 { - if len(start) != len(goal) || len(start) == 0 { - return 0 +// emaSmooth applies exponential moving average smoothing to joint positions. +// alpha controls responsiveness: 0 = frozen, 1 = no smoothing. +func emaSmooth(target, previous []referenceframe.Input, alpha float64) []referenceframe.Input { + if previous == nil || len(previous) != len(target) || alpha >= 1.0 { + result := make([]referenceframe.Input, len(target)) + copy(result, target) + return result } - var maxDisp float64 - for j := range start { - if d := math.Abs(goal[j] - start[j]); d > maxDisp { - maxDisp = d - } + b := 1.0 - alpha + result := make([]referenceframe.Input, len(target)) + for j := range target { + result[j] = alpha*target[j] + b*previous[j] } - return maxDisp + return result } // executeTeleop executes a trajectory by calling GoToInputs on all components @@ -273,30 +275,24 @@ func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj m return nil } - // Group inputs per component across all trajectory steps (skip step 0 for execution, - // but record it for displacement calculation). - startInputs := make(map[string][]referenceframe.Input) + // Group inputs per component across all trajectory steps (skip step 0 = start position). perComponent := make(map[string][][]referenceframe.Input) - for i := 0; i < len(traj); i++ { + for i := 1; i < len(traj); i++ { for name, inputs := range traj[i] { if len(inputs) == 0 { continue } - if i == 0 { - startInputs[name] = inputs - } else { - perComponent[name] = append(perComponent[name], inputs) - } + perComponent[name] = append(perComponent[name], inputs) } } - // Read teleop interpolation config. + // Read teleop config. // NOTE: caller (runExecutor) already holds ms.mu.RLock, so safe to read ms.conf directly. - smallMoveRad := defaultTeleopSmallMoveRad + smoothAlpha := defaultTeleopSmoothAlpha interpolateOverride := false if ms.conf != nil { - if ms.conf.TeleopSmallMoveRad > 0 { - smallMoveRad = ms.conf.TeleopSmallMoveRad + if ms.conf.TeleopSmoothAlpha > 0 { + smoothAlpha = ms.conf.TeleopSmoothAlpha } interpolateOverride = ms.conf.TeleopInterpolateOverride } @@ -314,25 +310,34 @@ func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj m if err != nil { continue } + + // Apply EMA smoothing to joint positions before sending (main goroutine only). + smoothed := make([][]referenceframe.Input, len(inputs)) + prev := tp.smoothedJoints[name] + for k, step := range inputs { + smoothed[k] = emaSmooth(step, prev, smoothAlpha) + prev = smoothed[k] + } + tp.smoothedJoints[name] = prev + wg.Add(1) - go func(i int, ie framesystem.InputEnabled, inputs [][]referenceframe.Input, r resource.Resource, start []referenceframe.Input) { + go func(i int, ie framesystem.InputEnabled, smoothed [][]referenceframe.Input, r resource.Resource) { defer wg.Done() var err error - // For arms, call MoveThroughJointPositions directly with teleop-specific - // extras (no wait, no interpolation) to avoid changing core GoToInputs behavior. if armComp, ok := r.(arm.Arm); ok { - goal := inputs[len(inputs)-1] - disp := maxJointDisplacement(start, goal) - smallMove := interpolateOverride || disp < smallMoveRad - tp.logger.CInfof(ctx, "teleop exec: smallMove=%v override=%v maxDisp=%f threshold=%f steps=%d", - smallMove, interpolateOverride, disp, smallMoveRad, len(inputs)) - err = armComp.MoveThroughJointPositions(ctx, inputs, nil, map[string]interface{}{ - "waitAtEnd": smallMove, - "interpolate": smallMove, - }) + if interpolateOverride { + err = armComp.MoveThroughJointPositions(ctx, smoothed, nil, map[string]interface{}{ + "waitAtEnd": true, + "interpolate": true, + }) + } else { + err = armComp.MoveThroughJointPositions(ctx, smoothed, nil, map[string]interface{}{ + "waitAtEnd": false, + "interpolate": false, + }) + } } else { - tp.logger.CInfof(ctx, "teleop exec: component is not arm.Arm (type %T), falling back to GoToInputs", r) - err = ie.GoToInputs(ctx, inputs...) + err = ie.GoToInputs(ctx, smoothed...) } if err != nil { if actuator, ok := r.(inputEnabledActuator); ok { @@ -340,7 +345,7 @@ func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj m } errs[i] = err } - }(idx, ie, inputs, r, startInputs[name]) + }(idx, ie, smoothed, r) idx++ } wg.Wait() @@ -461,6 +466,7 @@ func (ms *builtIn) addTeleopComponent(cmdCtx context.Context, req motion.MoveReq notify: make(chan struct{}, 1), trajCh: make(chan motionplan.Trajectory, 1), planningHead: fsInputs, + smoothedJoints: make(map[string][]referenceframe.Input), } comp := &teleopComponent{ From e31afddc6ed4429fab2025d35435c9eb46891e5f Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Mon, 13 Apr 2026 16:53:37 -0400 Subject: [PATCH 22/23] update planning head - wip --- services/motion/builtin/teleop.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index 3cf0a22ffd5..09aecb7796d 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -399,6 +399,13 @@ func (tp *teleopPipeline) runExecutor(ctx context.Context, ms *builtIn) { tp.resetPlanningHead(ctx, ms) } else { tp.lastErr.Store(nil) + // Update planning head to smoothed positions (what was actually + // sent to the arm) so the next plan starts from reality. + tp.planningHeadMu.Lock() + for name, joints := range tp.smoothedJoints { + tp.planningHead[name] = joints + } + tp.planningHeadMu.Unlock() } } } From c4e88c18bb869aed5b5038dd446e2acd6143a119 Mon Sep 17 00:00:00 2001 From: Clint Purser Date: Mon, 13 Apr 2026 18:21:25 -0400 Subject: [PATCH 23/23] fix crash --- services/motion/builtin/teleop.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go index 09aecb7796d..0d3aaac4865 100644 --- a/services/motion/builtin/teleop.go +++ b/services/motion/builtin/teleop.go @@ -129,12 +129,16 @@ func planningHeadEqual(a, b referenceframe.FrameSystemInputs) bool { } func (tp *teleopPipeline) planOnce(ctx context.Context, ms *builtIn) { - // Snapshot the planning head. + // Snapshot the planning head under RLock. We need both a copy for safe iteration + // and for the stale-check after planning completes (line ~195). tp.planningHeadMu.RLock() - planningHead := tp.planningHead + planningHead := make(referenceframe.FrameSystemInputs, len(tp.planningHead)) + for k, v := range tp.planningHead { + planningHead[k] = v + } tp.planningHeadMu.RUnlock() - // Build merged inputs from cached base + planning head. + // Build merged inputs from cached base + planning head snapshot. inputsStart := time.Now() mergedInputs := make(referenceframe.FrameSystemInputs, len(tp.cachedBaseInputs)) for k, v := range tp.cachedBaseInputs {