From 12432a736f0f74c7fc91686513d2d9bbe57e65f7 Mon Sep 17 00:00:00 2001 From: gloriacai01 Date: Tue, 7 Apr 2026 14:06:57 -0400 Subject: [PATCH 1/2] poc --- go.mod | 2 + go.sum | 2 - services/datamanager/builtin/builtin.go | 8 ++ services/datamanager/builtin/sync/sync.go | 140 ++++++++++++++++++++-- services/datamanager/client.go | 40 +++++++ services/datamanager/data_manager.go | 15 +++ services/datamanager/server.go | 22 ++++ 7 files changed, 214 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index b42ab180f11..25a82d2b630 100644 --- a/go.mod +++ b/go.mod @@ -351,3 +351,5 @@ require ( github.com/ziutek/mymysql v1.5.4 // indirect golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6 ) + +replace go.viam.com/api => ../api diff --git a/go.sum b/go.sum index 93ea18ace53..21910cda3ce 100644 --- a/go.sum +++ b/go.sum @@ -1158,8 +1158,6 @@ go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.viam.com/api v0.1.537 h1:KMoahNoyRA29rQAFVkr0e39xDvNio5sash3ba/UdKjo= -go.viam.com/api v0.1.537/go.mod h1:qSrz3j4+QlXvw7ANs1G2fsX532vUQzLBbgaTxMu3lAw= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= go.viam.com/utils v0.4.17 h1:k+sBmoSZDl7bOu5lL+dwrc+kd4E6wIebUmT5GHMPg4A= diff --git a/services/datamanager/builtin/builtin.go b/services/datamanager/builtin/builtin.go index 84f6099f48a..b2c5c1087a2 100644 --- a/services/datamanager/builtin/builtin.go +++ b/services/datamanager/builtin/builtin.go @@ -426,6 +426,14 @@ func (b *builtIn) UploadImageToDatasets(ctx context.Context, return b.sync.UploadBinaryDataToDatasets(ctx, imgBytes, datasetIDs, tags, mimeType) } +func (b *builtIn) UploadPath(ctx context.Context, path string, tags, datasetIDs []string, onProgress func(datamanager.UploadPathProgress), _ map[string]interface{}) error { + b.logger.Debug("UploadPath START") + defer b.logger.Debug("UploadPath END") + b.mu.Lock() + defer b.mu.Unlock() + return b.sync.UploadPathNow(ctx, path, tags, datasetIDs, onProgress) +} + type dataManagerStats struct { SyncPaths syncPathsSummary DiskUsage diskUsageSummary diff --git a/services/datamanager/builtin/sync/sync.go b/services/datamanager/builtin/sync/sync.go index 6f16a5c9831..7d141176b16 100644 --- a/services/datamanager/builtin/sync/sync.go +++ b/services/datamanager/builtin/sync/sync.go @@ -435,13 +435,13 @@ func (s *Sync) syncFile(config Config, filePath string) { } if data.IsDataCaptureFile(f) { - s.syncDataCaptureFile(f, config.CaptureDir, s.logger) + s.syncDataCaptureFile(s.configCtx, f, config.CaptureDir, s.logger) //nolint:errcheck } else { - s.syncArbitraryFile(f, config.Tags, []string{}, config.FileLastModifiedMillis, s.logger) + s.syncArbitraryFile(s.configCtx, f, config.Tags, []string{}, config.FileLastModifiedMillis, s.logger, &s.uploadStats.arbitrary.uploadingBytes) //nolint:errcheck } } -func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging.Logger) { +func (s *Sync) syncDataCaptureFile(ctx context.Context, f *os.File, captureDir string, logger logging.Logger) error { captureFile, err := data.ReadCaptureFile(f) // if you can't read the capture file's metadata field, close & move it to the failed directory if err != nil { @@ -455,7 +455,7 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging s.logger.Error(err) } s.uploadStats.tabular.uploadFailedFileCount.Add(1) - return + return cause } isBinary := captureFile.ReadMetadata().GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR @@ -467,7 +467,7 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging } // setup a retry struct that will try to upload the capture file - retry := newExponentialRetry(s.configCtx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) { + retry := newExponentialRetry(ctx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) { msg := "error uploading data capture file %s, size: %s, md: %s" errMetadata := fmt.Sprintf(msg, captureFile.GetPath(), data.FormatBytesI64(captureFile.Size()), captureFile.ReadMetadata()) bytesUploaded, err := uploadDataCaptureFile(ctx, captureFile, s.cloudConn, logger, uploadingBytesCounter) @@ -488,7 +488,7 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging // if we stopped due to a cancelled context, // return without deleting the file or moving it to the failed directory if errors.Is(err, context.Canceled) { - return + return err } // otherwise we hit a terminal error, and we should move the file to the failed directory @@ -500,7 +500,7 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging } else { s.uploadStats.tabular.uploadFailedFileCount.Add(1) } - return + return err } // file was successfully uploaded, delete it and log an error if unable to delete @@ -514,13 +514,14 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging s.uploadStats.tabular.uploadedFileCount.Add(1) s.uploadStats.tabular.completedUploadBytes.Add(bytesUploaded) } + return nil } -func (s *Sync) syncArbitraryFile(f *os.File, tags, datasetIDs []string, fileLastModifiedMillis int, logger logging.Logger) { - retry := newExponentialRetry(s.configCtx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) { +func (s *Sync) syncArbitraryFile(ctx context.Context, f *os.File, tags, datasetIDs []string, fileLastModifiedMillis int, logger logging.Logger, bytesUploadingCounter *atomic.Uint64) error { + retry := newExponentialRetry(ctx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) { errMetadata := fmt.Sprintf("error uploading arbitrary file %s", f.Name()) bytesUploaded, err := uploadArbitraryFile( - ctx, f, s.cloudConn, tags, datasetIDs, fileLastModifiedMillis, s.clock, logger, &s.uploadStats.arbitrary.uploadingBytes, + ctx, f, s.cloudConn, tags, datasetIDs, fileLastModifiedMillis, s.clock, logger, bytesUploadingCounter, ) if err != nil { return 0, errors.Wrap(err, errMetadata) @@ -538,7 +539,7 @@ func (s *Sync) syncArbitraryFile(f *os.File, tags, datasetIDs []string, fileLast // if we stopped due to a cancelled context, // return without deleting the file or moving it to the failed directory if errors.Is(err, context.Canceled) { - return + return err } // otherwise we hit a terminal error, and we should move the file to the failed directory @@ -546,7 +547,7 @@ func (s *Sync) syncArbitraryFile(f *os.File, tags, datasetIDs []string, fileLast logger.Error(err.Error()) } s.uploadStats.arbitrary.uploadFailedFileCount.Add(1) - return + return err } if err := f.Close(); err != nil { @@ -558,6 +559,7 @@ func (s *Sync) syncArbitraryFile(f *os.File, tags, datasetIDs []string, fileLast } s.uploadStats.arbitrary.uploadedFileCount.Add(1) s.uploadStats.arbitrary.completedUploadBytes.Add(bytesUploaded) + return nil } // UploadBinaryDataToDatasets simultaneously uploads binary data and adds it to a dataset. @@ -591,12 +593,124 @@ func (s *Sync) UploadBinaryDataToDatasets(ctx context.Context, binaryData []byte } // Since we wrote to the file, the file last modified time should be 0, indicating we should wait no time // before deciding this file is ready for upload and is not still being written to. - s.syncArbitraryFile(f, tags, datasetIDs, 0, s.logger) + s.syncArbitraryFile(ctx, f, tags, datasetIDs, 0, s.logger, &s.uploadStats.arbitrary.uploadingBytes) //nolint:errcheck }() return <-errChan } +// UploadPathNow uploads a file or directory at path to the cloud, streaming per-file progress +// via onProgress. For directories, all files are attempted and errors collected with multierr. +// Each file is deleted on success and moved to the failed directory on terminal error. +func (s *Sync) UploadPathNow(ctx context.Context, path string, tags, datasetIDs []string, onProgress func(datamanager.UploadPathProgress)) error { + select { + case <-s.cloudConn.ready: + default: + return errors.New("not connected to the cloud") + } + + info, err := os.Stat(path) + if err != nil { + return err + } + + s.configMu.Lock() + captureDir := s.config.CaptureDir + s.configMu.Unlock() + + if info.IsDir() { + var errs []error + err := filepath.Walk(path, func(filePath string, fi os.FileInfo, err error) error { + if err != nil { + return nil + } + if fi.IsDir() { + return nil + } + if uploadErr := s.uploadPathNow(ctx, filePath, captureDir, tags, datasetIDs, onProgress); uploadErr != nil { + s.logger.Errorw("failed to upload file", "path", filePath, "error", uploadErr) + errs = append(errs, uploadErr) + } + return ctx.Err() + }) + return multierr.Combine(append(errs, err)...) + } + return s.uploadPathNow(ctx, path, captureDir, tags, datasetIDs, onProgress) +} + +func (s *Sync) uploadPathNow(ctx context.Context, path, captureDir string, tags, datasetIDs []string, onProgress func(datamanager.UploadPathProgress)) error { + if filepath.Ext(path) == data.InProgressCaptureFileExt { + return errors.Errorf("cannot upload in-progress capture file: %s", path) + } + + if !s.fileTracker.markInProgress(path) { + return errors.Errorf("file is already being synced: %s", path) + } + defer s.fileTracker.unmarkInProgress(path) + + info, err := os.Stat(path) + if err != nil { + return err + } + bytesTotal := uint64(info.Size()) + + //nolint:gosec + f, err := os.Open(path) + if err != nil { + return err + } + + if data.IsDataCaptureFile(f) { + // capture files don't support intermediate progress — emit a single completion message + err := s.syncDataCaptureFile(ctx, f, captureDir, s.logger) + if onProgress != nil { + onProgress(datamanager.UploadPathProgress{ + Path: path, + BytesTotal: bytesTotal, + Success: err == nil, + Err: err, + }) + } + return err + } + + // for arbitrary files, poll the per-file bytes counter and stream progress + var perFileCounter atomic.Uint64 + progressCtx, stopProgress := context.WithCancel(ctx) + defer stopProgress() + if onProgress != nil { + go func() { + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-progressCtx.Done(): + return + case <-ticker.C: + onProgress(datamanager.UploadPathProgress{ + Path: path, + BytesUploaded: perFileCounter.Load(), + BytesTotal: bytesTotal, + }) + } + } + }() + } + + err = s.syncArbitraryFile(ctx, f, tags, datasetIDs, 0, s.logger, &perFileCounter) + stopProgress() + if onProgress != nil { + onProgress(datamanager.UploadPathProgress{ + Path: path, + BytesUploaded: bytesTotal, + BytesTotal: bytesTotal, + Success: err == nil, + Err: err, + }) + } + return err +} + // moveFailedData takes any data that could not be synced in the parentDir and // moves it to a new subdirectory "failed" that will not be synced. func moveFailedData(path, parentDir string, cause error, logger logging.Logger) error { diff --git a/services/datamanager/client.go b/services/datamanager/client.go index 8780ad79000..d3c37dddf44 100644 --- a/services/datamanager/client.go +++ b/services/datamanager/client.go @@ -8,6 +8,7 @@ import ( "image" "image/jpeg" "image/png" + "io" datasyncpb "go.viam.com/api/app/datasync/v1" pb "go.viam.com/api/service/datamanager/v1" @@ -63,6 +64,45 @@ func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map return rprotoutils.DoFromResourceClient(ctx, c.client, c.name, cmd) } +func (c *client) UploadPath(ctx context.Context, path string, tags, datasetIDs []string, onProgress func(UploadPathProgress), extra map[string]interface{}) error { + extraPb, err := protoutils.StructToStructPb(extra) + if err != nil { + return err + } + stream, err := c.client.UploadPath(ctx, &pb.UploadPathRequest{ + Name: c.name, + Path: path, + Tags: tags, + DatasetIds: datasetIDs, + Extra: extraPb, + }) + if err != nil { + return err + } + for { + resp, err := stream.Recv() + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return err + } + if onProgress != nil { + var progressErr error + if resp.GetError() != "" { + progressErr = errors.New(resp.GetError()) + } + onProgress(UploadPathProgress{ + Path: resp.GetPath(), + BytesUploaded: resp.GetBytesUploaded(), + BytesTotal: resp.GetBytesTotal(), + Success: resp.GetSuccess(), + Err: progressErr, + }) + } + } +} + func (c *client) Status(ctx context.Context) (map[string]interface{}, error) { return rprotoutils.GetStatusFromResourceClient(ctx, c.client, c.name) } diff --git a/services/datamanager/data_manager.go b/services/datamanager/data_manager.go index de641a97d6a..76d22dd9b68 100644 --- a/services/datamanager/data_manager.go +++ b/services/datamanager/data_manager.go @@ -56,6 +56,21 @@ type Service interface { mimeType datasyncpb.MimeType, extra map[string]interface{}) error UploadImageToDatasets(ctx context.Context, image image.Image, datasetIDs, tags []string, mimeType datasyncpb.MimeType, extra map[string]interface{}) error + // UploadPath uploads a file or directory at the given path to the cloud. + // onProgress is called with per-file progress updates and a final completion message per file. + // For directories, all files are attempted; errors are surfaced via the completion message. + UploadPath(ctx context.Context, path string, tags, datasetIDs []string, onProgress func(UploadPathProgress), extra map[string]interface{}) error +} + +// UploadPathProgress reports the upload progress of a single file. +// Intermediate progress messages have Success=false and Err=nil. +// The final message per file has Success=true on success, or Err set on failure. +type UploadPathProgress struct { + Path string + BytesUploaded uint64 + BytesTotal uint64 + Success bool + Err error } // SubtypeName is the name of the type of service. diff --git a/services/datamanager/server.go b/services/datamanager/server.go index 4ddc17e9a8d..4739a358311 100644 --- a/services/datamanager/server.go +++ b/services/datamanager/server.go @@ -50,6 +50,28 @@ func (server *serviceServer) UploadBinaryDataToDatasets( return &pb.UploadBinaryDataToDatasetsResponse{}, nil } +func (server *serviceServer) UploadPath(req *pb.UploadPathRequest, stream pb.DataManagerService_UploadPathServer) error { + svc, err := server.coll.Resource(req.Name) + if err != nil { + return err + } + return svc.UploadPath(stream.Context(), req.GetPath(), req.GetTags(), req.GetDatasetIds(), + func(p UploadPathProgress) { + errStr := "" + if p.Err != nil { + errStr = p.Err.Error() + } + //nolint:errcheck + stream.Send(&pb.UploadPathResponse{ + Path: p.Path, + BytesUploaded: p.BytesUploaded, + BytesTotal: p.BytesTotal, + Success: p.Success, + Error: errStr, + }) + }, req.Extra.AsMap()) +} + // DoCommand receives arbitrary commands. func (server *serviceServer) DoCommand(ctx context.Context, req *commonpb.DoCommandRequest, From d13b965e03bdbb0e2f71ce1634d7f20f0dafcfeb Mon Sep 17 00:00:00 2001 From: gloriacai01 Date: Fri, 17 Apr 2026 10:18:12 -0400 Subject: [PATCH 2/2] api on robotservice --- robot/client/client.go | 14 +++ robot/impl/local_robot.go | 24 ++++ robot/robot.go | 7 ++ robot/server/server.go | 16 +++ services/datamanager/builtin/builtin.go | 8 +- services/datamanager/builtin/sync/sync.go | 133 ++++++++-------------- services/datamanager/client.go | 40 ------- services/datamanager/data_manager.go | 15 --- services/datamanager/server.go | 22 ---- 9 files changed, 110 insertions(+), 169 deletions(-) diff --git a/robot/client/client.go b/robot/client/client.go index 9a28cf84f6d..7459d094093 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -24,6 +24,7 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" "go.uber.org/zap/zapcore" + datasyncpb "go.viam.com/api/app/datasync/v1" commonpb "go.viam.com/api/common/v1" pb "go.viam.com/api/robot/v1" "go.viam.com/utils" @@ -1336,6 +1337,19 @@ func (rc *RobotClient) SendTraces(ctx context.Context, spans []*otlpv1.ResourceS return err } +// UploadDataFromPath uploads a file or directory to the cloud via the robot's data manager. +func (rc *RobotClient) UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata, isSequence bool) (filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, err error) { + resp, err := rc.client.UploadDataFromPath(ctx, &pb.UploadDataFromPathRequest{ + Path: path, + UploadMetadata: uploadMetadata, + IsSequence: isSequence, + }) + if err != nil { + return 0, 0, 0, 0, err + } + return resp.GetFilesUploaded(), resp.GetFilesFailed(), resp.GetBytesUploaded(), resp.GetBytesTotal(), nil +} + // Tunnel tunnels data to/from the read writer from/to the destination port on the server. This // function will close the connection passed in as part of cleanup. func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest int) error { diff --git a/robot/impl/local_robot.go b/robot/impl/local_robot.go index 30d244b8f17..0962137d285 100644 --- a/robot/impl/local_robot.go +++ b/robot/impl/local_robot.go @@ -27,6 +27,7 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.37.0" otlpv1 "go.opentelemetry.io/proto/otlp/trace/v1" "go.uber.org/multierr" + datasyncpb "go.viam.com/api/app/datasync/v1" packagespb "go.viam.com/api/app/packages/v1" goutils "go.viam.com/utils" "go.viam.com/utils/perf" @@ -58,10 +59,16 @@ import ( "go.viam.com/rdk/robot/packages" "go.viam.com/rdk/robot/web" weboptions "go.viam.com/rdk/robot/web/options" + "go.viam.com/rdk/services/datamanager" "go.viam.com/rdk/session" "go.viam.com/rdk/utils" ) +// dataFromPathUploader is the subset of the data manager builtin used by UploadDataFromPath. +type dataFromPathUploader interface { + UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata, isSequence bool) (filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, err error) +} + const localConfigPartID = "local-config" var _ = robot.LocalRobot(&localRobot{}) @@ -158,6 +165,23 @@ func (r *localRobot) RemoteByName(name string) (robot.Robot, bool) { return r.manager.RemoteByName(name) } +// UploadDataFromPath uploads a file or directory to the cloud via the data manager service. +func (r *localRobot) UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata, isSequence bool) (filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, err error) { + names := datamanager.NamesFromRobot(r) + if len(names) == 0 { + return 0, 0, 0, 0, errors.New("no data manager service configured") + } + svc, err := datamanager.FromRobot(r, names[0]) + if err != nil { + return 0, 0, 0, 0, err + } + uploader, ok := svc.(dataFromPathUploader) + if !ok { + return 0, 0, 0, 0, errors.New("data manager does not support UploadDataFromPath") + } + return uploader.UploadDataFromPath(ctx, path, uploadMetadata, isSequence) +} + // WriteTraceMessages writes trace spans to any configured exporters. func (r *localRobot) WriteTraceMessages(ctx context.Context, spans []*otlpv1.ResourceSpans) error { traceClients := r.traceClients.Load() diff --git a/robot/robot.go b/robot/robot.go index ff26a5532d7..81c46a246e5 100644 --- a/robot/robot.go +++ b/robot/robot.go @@ -12,6 +12,7 @@ import ( "github.com/jhump/protoreflect/dynamic" "github.com/pkg/errors" otlpv1 "go.opentelemetry.io/proto/otlp/trace/v1" + datasyncpb "go.viam.com/api/app/datasync/v1" "go.viam.com/rdk/cloud" "go.viam.com/rdk/config" @@ -196,6 +197,12 @@ type LocalRobot interface { // WriteTraceMessages writes trace spans to any configured exporters. WriteTraceMessages(context.Context, []*otlpv1.ResourceSpans) error + + // UploadDataFromPath uploads a file or directory at the given path to the cloud via the data manager. + // Returns aggregate upload counts. Returns an error if no data manager is configured. + // + // TODO: currently backed by robot.proto UploadDataFromPath RPC; in-process path bypasses gRPC. + UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata, isSequence bool) (filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, err error) } // A RemoteRobot is a Robot that was created through a connection. diff --git a/robot/server/server.go b/robot/server/server.go index 16b9d36f989..03e1deb7ece 100644 --- a/robot/server/server.go +++ b/robot/server/server.go @@ -70,6 +70,22 @@ func (s *Server) SendTraces(ctx context.Context, req *pb.SendTracesRequest) (*pb return nil, s.robot.WriteTraceMessages(ctx, req.ResourceSpans) } +// UploadDataFromPath uploads a file or directory at the given path to the cloud via the data manager. +func (s *Server) UploadDataFromPath(ctx context.Context, req *pb.UploadDataFromPathRequest) (*pb.UploadDataFromPathResponse, error) { + filesUploaded, filesFailed, bytesUploaded, bytesTotal, err := s.robot.UploadDataFromPath( + ctx, req.GetPath(), req.GetUploadMetadata(), req.GetIsSequence(), + ) + if err != nil { + return nil, err + } + return &pb.UploadDataFromPathResponse{ + FilesUploaded: filesUploaded, + FilesFailed: filesFailed, + BytesUploaded: bytesUploaded, + BytesTotal: bytesTotal, + }, nil +} + // Tunnel tunnels traffic to/from the client from/to a specified port on the server. func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error { req, err := srv.Recv() diff --git a/services/datamanager/builtin/builtin.go b/services/datamanager/builtin/builtin.go index b2c5c1087a2..2f405f2907e 100644 --- a/services/datamanager/builtin/builtin.go +++ b/services/datamanager/builtin/builtin.go @@ -426,12 +426,12 @@ func (b *builtIn) UploadImageToDatasets(ctx context.Context, return b.sync.UploadBinaryDataToDatasets(ctx, imgBytes, datasetIDs, tags, mimeType) } -func (b *builtIn) UploadPath(ctx context.Context, path string, tags, datasetIDs []string, onProgress func(datamanager.UploadPathProgress), _ map[string]interface{}) error { - b.logger.Debug("UploadPath START") - defer b.logger.Debug("UploadPath END") +func (b *builtIn) UploadDataFromPath(ctx context.Context, path string, uploadMetadata *v1.UploadMetadata, isSequence bool) (filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, err error) { + b.logger.Debug("UploadDataFromPath START") + defer b.logger.Debug("UploadDataFromPath END") b.mu.Lock() defer b.mu.Unlock() - return b.sync.UploadPathNow(ctx, path, tags, datasetIDs, onProgress) + return b.sync.UploadDataFromPath(ctx, path, uploadMetadata, isSequence) } type dataManagerStats struct { diff --git a/services/datamanager/builtin/sync/sync.go b/services/datamanager/builtin/sync/sync.go index 7d141176b16..2d8cb9f568a 100644 --- a/services/datamanager/builtin/sync/sync.go +++ b/services/datamanager/builtin/sync/sync.go @@ -599,116 +599,73 @@ func (s *Sync) UploadBinaryDataToDatasets(ctx context.Context, binaryData []byte return <-errChan } -// UploadPathNow uploads a file or directory at path to the cloud, streaming per-file progress -// via onProgress. For directories, all files are attempted and errors collected with multierr. -// Each file is deleted on success and moved to the failed directory on terminal error. -func (s *Sync) UploadPathNow(ctx context.Context, path string, tags, datasetIDs []string, onProgress func(datamanager.UploadPathProgress)) error { +// UploadDataFromPath uploads a file or directory at path to the cloud. +// For directories, all files are attempted; errors are collected per-file. +// Returns aggregate counts of files and bytes uploaded/failed. +func (s *Sync) UploadDataFromPath(ctx context.Context, path string, uploadMetadata *v1.UploadMetadata, _ bool) (filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, err error) { select { case <-s.cloudConn.ready: default: - return errors.New("not connected to the cloud") + return 0, 0, 0, 0, errors.New("not connected to the cloud") } info, err := os.Stat(path) if err != nil { - return err + return 0, 0, 0, 0, err } s.configMu.Lock() captureDir := s.config.CaptureDir s.configMu.Unlock() - if info.IsDir() { - var errs []error - err := filepath.Walk(path, func(filePath string, fi os.FileInfo, err error) error { - if err != nil { - return nil - } - if fi.IsDir() { - return nil - } - if uploadErr := s.uploadPathNow(ctx, filePath, captureDir, tags, datasetIDs, onProgress); uploadErr != nil { - s.logger.Errorw("failed to upload file", "path", filePath, "error", uploadErr) - errs = append(errs, uploadErr) - } - return ctx.Err() - }) - return multierr.Combine(append(errs, err)...) - } - return s.uploadPathNow(ctx, path, captureDir, tags, datasetIDs, onProgress) -} - -func (s *Sync) uploadPathNow(ctx context.Context, path, captureDir string, tags, datasetIDs []string, onProgress func(datamanager.UploadPathProgress)) error { - if filepath.Ext(path) == data.InProgressCaptureFileExt { - return errors.Errorf("cannot upload in-progress capture file: %s", path) - } - - if !s.fileTracker.markInProgress(path) { - return errors.Errorf("file is already being synced: %s", path) - } - defer s.fileTracker.unmarkInProgress(path) + tags := uploadMetadata.GetTags() - info, err := os.Stat(path) - if err != nil { - return err - } - bytesTotal := uint64(info.Size()) + uploadOne := func(filePath string) { + fi, statErr := os.Stat(filePath) + if statErr != nil { + filesFailed++ + return + } + bytesTotal += uint64(fi.Size()) - //nolint:gosec - f, err := os.Open(path) - if err != nil { - return err - } + //nolint:gosec + f, openErr := os.Open(filePath) + if openErr != nil { + filesFailed++ + return + } - if data.IsDataCaptureFile(f) { - // capture files don't support intermediate progress — emit a single completion message - err := s.syncDataCaptureFile(ctx, f, captureDir, s.logger) - if onProgress != nil { - onProgress(datamanager.UploadPathProgress{ - Path: path, - BytesTotal: bytesTotal, - Success: err == nil, - Err: err, - }) + var perFileCounter atomic.Uint64 + if data.IsDataCaptureFile(f) { + if syncErr := s.syncDataCaptureFile(ctx, f, captureDir, s.logger); syncErr != nil { + filesFailed++ + } else { + filesUploaded++ + bytesUploaded += uint64(fi.Size()) + } + } else { + if syncErr := s.syncArbitraryFile(ctx, f, tags, nil, 0, s.logger, &perFileCounter); syncErr != nil { + filesFailed++ + } else { + filesUploaded++ + bytesUploaded += perFileCounter.Load() + } } - return err } - // for arbitrary files, poll the per-file bytes counter and stream progress - var perFileCounter atomic.Uint64 - progressCtx, stopProgress := context.WithCancel(ctx) - defer stopProgress() - if onProgress != nil { - go func() { - ticker := time.NewTicker(500 * time.Millisecond) - defer ticker.Stop() - for { - select { - case <-progressCtx.Done(): - return - case <-ticker.C: - onProgress(datamanager.UploadPathProgress{ - Path: path, - BytesUploaded: perFileCounter.Load(), - BytesTotal: bytesTotal, - }) - } + if info.IsDir() { + filepath.Walk(path, func(filePath string, fi os.FileInfo, walkErr error) error { //nolint:errcheck + if walkErr != nil || fi.IsDir() { + return nil } - }() - } - - err = s.syncArbitraryFile(ctx, f, tags, datasetIDs, 0, s.logger, &perFileCounter) - stopProgress() - if onProgress != nil { - onProgress(datamanager.UploadPathProgress{ - Path: path, - BytesUploaded: bytesTotal, - BytesTotal: bytesTotal, - Success: err == nil, - Err: err, + uploadOne(filePath) + return ctx.Err() }) + } else { + uploadOne(path) } - return err + + return filesUploaded, filesFailed, bytesUploaded, bytesTotal, nil } // moveFailedData takes any data that could not be synced in the parentDir and diff --git a/services/datamanager/client.go b/services/datamanager/client.go index d3c37dddf44..8780ad79000 100644 --- a/services/datamanager/client.go +++ b/services/datamanager/client.go @@ -8,7 +8,6 @@ import ( "image" "image/jpeg" "image/png" - "io" datasyncpb "go.viam.com/api/app/datasync/v1" pb "go.viam.com/api/service/datamanager/v1" @@ -64,45 +63,6 @@ func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map return rprotoutils.DoFromResourceClient(ctx, c.client, c.name, cmd) } -func (c *client) UploadPath(ctx context.Context, path string, tags, datasetIDs []string, onProgress func(UploadPathProgress), extra map[string]interface{}) error { - extraPb, err := protoutils.StructToStructPb(extra) - if err != nil { - return err - } - stream, err := c.client.UploadPath(ctx, &pb.UploadPathRequest{ - Name: c.name, - Path: path, - Tags: tags, - DatasetIds: datasetIDs, - Extra: extraPb, - }) - if err != nil { - return err - } - for { - resp, err := stream.Recv() - if errors.Is(err, io.EOF) { - return nil - } - if err != nil { - return err - } - if onProgress != nil { - var progressErr error - if resp.GetError() != "" { - progressErr = errors.New(resp.GetError()) - } - onProgress(UploadPathProgress{ - Path: resp.GetPath(), - BytesUploaded: resp.GetBytesUploaded(), - BytesTotal: resp.GetBytesTotal(), - Success: resp.GetSuccess(), - Err: progressErr, - }) - } - } -} - func (c *client) Status(ctx context.Context) (map[string]interface{}, error) { return rprotoutils.GetStatusFromResourceClient(ctx, c.client, c.name) } diff --git a/services/datamanager/data_manager.go b/services/datamanager/data_manager.go index 76d22dd9b68..de641a97d6a 100644 --- a/services/datamanager/data_manager.go +++ b/services/datamanager/data_manager.go @@ -56,21 +56,6 @@ type Service interface { mimeType datasyncpb.MimeType, extra map[string]interface{}) error UploadImageToDatasets(ctx context.Context, image image.Image, datasetIDs, tags []string, mimeType datasyncpb.MimeType, extra map[string]interface{}) error - // UploadPath uploads a file or directory at the given path to the cloud. - // onProgress is called with per-file progress updates and a final completion message per file. - // For directories, all files are attempted; errors are surfaced via the completion message. - UploadPath(ctx context.Context, path string, tags, datasetIDs []string, onProgress func(UploadPathProgress), extra map[string]interface{}) error -} - -// UploadPathProgress reports the upload progress of a single file. -// Intermediate progress messages have Success=false and Err=nil. -// The final message per file has Success=true on success, or Err set on failure. -type UploadPathProgress struct { - Path string - BytesUploaded uint64 - BytesTotal uint64 - Success bool - Err error } // SubtypeName is the name of the type of service. diff --git a/services/datamanager/server.go b/services/datamanager/server.go index 4739a358311..4ddc17e9a8d 100644 --- a/services/datamanager/server.go +++ b/services/datamanager/server.go @@ -50,28 +50,6 @@ func (server *serviceServer) UploadBinaryDataToDatasets( return &pb.UploadBinaryDataToDatasetsResponse{}, nil } -func (server *serviceServer) UploadPath(req *pb.UploadPathRequest, stream pb.DataManagerService_UploadPathServer) error { - svc, err := server.coll.Resource(req.Name) - if err != nil { - return err - } - return svc.UploadPath(stream.Context(), req.GetPath(), req.GetTags(), req.GetDatasetIds(), - func(p UploadPathProgress) { - errStr := "" - if p.Err != nil { - errStr = p.Err.Error() - } - //nolint:errcheck - stream.Send(&pb.UploadPathResponse{ - Path: p.Path, - BytesUploaded: p.BytesUploaded, - BytesTotal: p.BytesTotal, - Success: p.Success, - Error: errStr, - }) - }, req.Extra.AsMap()) -} - // DoCommand receives arbitrary commands. func (server *serviceServer) DoCommand(ctx context.Context, req *commonpb.DoCommandRequest,