Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,5 @@ require (
github.com/ziutek/mymysql v1.5.4 // indirect
golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6
)

replace go.viam.com/api => ../api
14 changes: 14 additions & 0 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
datasyncpb "go.viam.com/api/app/datasync/v1"
commonpb "go.viam.com/api/common/v1"
pb "go.viam.com/api/robot/v1"
"go.viam.com/utils"
Expand Down Expand Up @@ -1336,6 +1337,19 @@ func (rc *RobotClient) SendTraces(ctx context.Context, spans []*otlpv1.ResourceS
return err
}

// UploadDataFromPath asks the connected robot to upload the file or directory at path to the
// cloud by issuing the UploadDataFromPath RPC with the given upload metadata and sequence flag.
//
// On success the four counters are taken from the RPC response. If the RPC fails, all
// counters are zero and the RPC error is returned unchanged.
func (rc *RobotClient) UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata, isSequence bool) (filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, err error) {
	request := &pb.UploadDataFromPathRequest{
		Path:           path,
		UploadMetadata: uploadMetadata,
		IsSequence:     isSequence,
	}

	reply, rpcErr := rc.client.UploadDataFromPath(ctx, request)
	if rpcErr != nil {
		return 0, 0, 0, 0, rpcErr
	}

	filesUploaded = reply.GetFilesUploaded()
	filesFailed = reply.GetFilesFailed()
	bytesUploaded = reply.GetBytesUploaded()
	bytesTotal = reply.GetBytesTotal()
	return filesUploaded, filesFailed, bytesUploaded, bytesTotal, nil
}

// Tunnel tunnels data to/from the read writer from/to the destination port on the server. This
// function will close the connection passed in as part of cleanup.
func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest int) error {
Expand Down
24 changes: 24 additions & 0 deletions robot/impl/local_robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
otlpv1 "go.opentelemetry.io/proto/otlp/trace/v1"
"go.uber.org/multierr"
datasyncpb "go.viam.com/api/app/datasync/v1"
packagespb "go.viam.com/api/app/packages/v1"
goutils "go.viam.com/utils"
"go.viam.com/utils/perf"
Expand Down Expand Up @@ -58,10 +59,16 @@ import (
"go.viam.com/rdk/robot/packages"
"go.viam.com/rdk/robot/web"
weboptions "go.viam.com/rdk/robot/web/options"
"go.viam.com/rdk/services/datamanager"
"go.viam.com/rdk/session"
"go.viam.com/rdk/utils"
)

// dataFromPathUploader is the subset of the data manager builtin used by UploadDataFromPath.
// localRobot.UploadDataFromPath type-asserts the data manager service against this interface
// and returns an error when the service does not implement it.
type dataFromPathUploader interface {
// UploadDataFromPath uploads the file or directory at path using the given upload metadata
// and sequence flag, and reports aggregate file and byte counts together with any error.
UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata, isSequence bool) (filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, err error)
}

const localConfigPartID = "local-config"

var _ = robot.LocalRobot(&localRobot{})
Expand Down Expand Up @@ -158,6 +165,23 @@ func (r *localRobot) RemoteByName(name string) (robot.Robot, bool) {
return r.manager.RemoteByName(name)
}

// UploadDataFromPath uploads a file or directory to the cloud by delegating to the first data
// manager service listed for this robot.
//
// An error (with all counters zero) is returned when no data manager service is listed, when
// the service cannot be looked up, or when the service does not implement
// dataFromPathUploader. Otherwise the service's own results are returned as-is.
func (r *localRobot) UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata, isSequence bool) (filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, err error) {
	dmNames := datamanager.NamesFromRobot(r)
	if len(dmNames) < 1 {
		return 0, 0, 0, 0, errors.New("no data manager service configured")
	}

	// Only the first listed data manager is consulted.
	dm, lookupErr := datamanager.FromRobot(r, dmNames[0])
	if lookupErr != nil {
		return 0, 0, 0, 0, lookupErr
	}

	if pathUploader, supported := dm.(dataFromPathUploader); supported {
		return pathUploader.UploadDataFromPath(ctx, path, uploadMetadata, isSequence)
	}
	return 0, 0, 0, 0, errors.New("data manager does not support UploadDataFromPath")
}

// WriteTraceMessages writes trace spans to any configured exporters.
func (r *localRobot) WriteTraceMessages(ctx context.Context, spans []*otlpv1.ResourceSpans) error {
traceClients := r.traceClients.Load()
Expand Down
7 changes: 7 additions & 0 deletions robot/robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/jhump/protoreflect/dynamic"
"github.com/pkg/errors"
otlpv1 "go.opentelemetry.io/proto/otlp/trace/v1"
datasyncpb "go.viam.com/api/app/datasync/v1"

"go.viam.com/rdk/cloud"
"go.viam.com/rdk/config"
Expand Down Expand Up @@ -196,6 +197,12 @@ type LocalRobot interface {

// WriteTraceMessages writes trace spans to any configured exporters.
WriteTraceMessages(context.Context, []*otlpv1.ResourceSpans) error

// UploadDataFromPath uploads a file or directory at the given path to the cloud via the data manager.
// Returns aggregate upload counts. Returns an error if no data manager is configured.
//
// TODO: currently backed by robot.proto UploadDataFromPath RPC; in-process path bypasses gRPC.
UploadDataFromPath(ctx context.Context, path string, uploadMetadata *datasyncpb.UploadMetadata, isSequence bool) (filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, err error)
}

// A RemoteRobot is a Robot that was created through a connection.
Expand Down
16 changes: 16 additions & 0 deletions robot/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,22 @@ func (s *Server) SendTraces(ctx context.Context, req *pb.SendTracesRequest) (*pb
return nil, s.robot.WriteTraceMessages(ctx, req.ResourceSpans)
}

// UploadDataFromPath serves the UploadDataFromPath RPC: it forwards the request's path, upload
// metadata and sequence flag to the robot and wraps the resulting counts in a response.
// If the robot reports an error, a nil response is returned together with that error.
func (s *Server) UploadDataFromPath(ctx context.Context, req *pb.UploadDataFromPathRequest) (*pb.UploadDataFromPathResponse, error) {
	var (
		target   = req.GetPath()
		metadata = req.GetUploadMetadata()
		sequence = req.GetIsSequence()
	)

	uploaded, failed, sentBytes, totalBytes, uploadErr := s.robot.UploadDataFromPath(ctx, target, metadata, sequence)
	if uploadErr != nil {
		return nil, uploadErr
	}

	resp := new(pb.UploadDataFromPathResponse)
	resp.FilesUploaded = uploaded
	resp.FilesFailed = failed
	resp.BytesUploaded = sentBytes
	resp.BytesTotal = totalBytes
	return resp, nil
}

// Tunnel tunnels traffic to/from the client from/to a specified port on the server.
func (s *Server) Tunnel(srv pb.RobotService_TunnelServer) error {
req, err := srv.Recv()
Expand Down
8 changes: 8 additions & 0 deletions services/datamanager/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,14 @@ func (b *builtIn) UploadImageToDatasets(ctx context.Context,
return b.sync.UploadBinaryDataToDatasets(ctx, imgBytes, datasetIDs, tags, mimeType)
}

// UploadDataFromPath uploads the file or directory at path by delegating to the builtin's
// sync component, returning that component's counts and error unchanged.
//
// b.mu is held for the whole call, and a debug line is logged when the call starts and when
// it ends.
func (b *builtIn) UploadDataFromPath(ctx context.Context, path string, uploadMetadata *v1.UploadMetadata, isSequence bool) (filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, err error) {
	b.logger.Debug("UploadDataFromPath START")
	defer b.logger.Debug("UploadDataFromPath END")

	b.mu.Lock()
	defer b.mu.Unlock()

	filesUploaded, filesFailed, bytesUploaded, bytesTotal, err = b.sync.UploadDataFromPath(ctx, path, uploadMetadata, isSequence)
	return filesUploaded, filesFailed, bytesUploaded, bytesTotal, err
}

type dataManagerStats struct {
SyncPaths syncPathsSummary
DiskUsage diskUsageSummary
Expand Down
97 changes: 84 additions & 13 deletions services/datamanager/builtin/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,13 +435,13 @@ func (s *Sync) syncFile(config Config, filePath string) {
}

if data.IsDataCaptureFile(f) {
s.syncDataCaptureFile(f, config.CaptureDir, s.logger)
s.syncDataCaptureFile(s.configCtx, f, config.CaptureDir, s.logger) //nolint:errcheck
} else {
s.syncArbitraryFile(f, config.Tags, []string{}, config.FileLastModifiedMillis, s.logger)
s.syncArbitraryFile(s.configCtx, f, config.Tags, []string{}, config.FileLastModifiedMillis, s.logger, &s.uploadStats.arbitrary.uploadingBytes) //nolint:errcheck
}
}

func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging.Logger) {
func (s *Sync) syncDataCaptureFile(ctx context.Context, f *os.File, captureDir string, logger logging.Logger) error {
captureFile, err := data.ReadCaptureFile(f)
// if you can't read the capture file's metadata field, close & move it to the failed directory
if err != nil {
Expand All @@ -455,7 +455,7 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging
s.logger.Error(err)
}
s.uploadStats.tabular.uploadFailedFileCount.Add(1)
return
return cause
}
isBinary := captureFile.ReadMetadata().GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR

Expand All @@ -467,7 +467,7 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging
}

// setup a retry struct that will try to upload the capture file
retry := newExponentialRetry(s.configCtx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) {
retry := newExponentialRetry(ctx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) {
msg := "error uploading data capture file %s, size: %s, md: %s"
errMetadata := fmt.Sprintf(msg, captureFile.GetPath(), data.FormatBytesI64(captureFile.Size()), captureFile.ReadMetadata())
bytesUploaded, err := uploadDataCaptureFile(ctx, captureFile, s.cloudConn, logger, uploadingBytesCounter)
Expand All @@ -488,7 +488,7 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging
// if we stopped due to a cancelled context,
// return without deleting the file or moving it to the failed directory
if errors.Is(err, context.Canceled) {
return
return err
}

// otherwise we hit a terminal error, and we should move the file to the failed directory
Expand All @@ -500,7 +500,7 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging
} else {
s.uploadStats.tabular.uploadFailedFileCount.Add(1)
}
return
return err
}

// file was successfully uploaded, delete it and log an error if unable to delete
Expand All @@ -514,13 +514,14 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging
s.uploadStats.tabular.uploadedFileCount.Add(1)
s.uploadStats.tabular.completedUploadBytes.Add(bytesUploaded)
}
return nil
}

func (s *Sync) syncArbitraryFile(f *os.File, tags, datasetIDs []string, fileLastModifiedMillis int, logger logging.Logger) {
retry := newExponentialRetry(s.configCtx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) {
func (s *Sync) syncArbitraryFile(ctx context.Context, f *os.File, tags, datasetIDs []string, fileLastModifiedMillis int, logger logging.Logger, bytesUploadingCounter *atomic.Uint64) error {
retry := newExponentialRetry(ctx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) {
errMetadata := fmt.Sprintf("error uploading arbitrary file %s", f.Name())
bytesUploaded, err := uploadArbitraryFile(
ctx, f, s.cloudConn, tags, datasetIDs, fileLastModifiedMillis, s.clock, logger, &s.uploadStats.arbitrary.uploadingBytes,
ctx, f, s.cloudConn, tags, datasetIDs, fileLastModifiedMillis, s.clock, logger, bytesUploadingCounter,
)
if err != nil {
return 0, errors.Wrap(err, errMetadata)
Expand All @@ -538,15 +539,15 @@ func (s *Sync) syncArbitraryFile(f *os.File, tags, datasetIDs []string, fileLast
// if we stopped due to a cancelled context,
// return without deleting the file or moving it to the failed directory
if errors.Is(err, context.Canceled) {
return
return err
}

// otherwise we hit a terminal error, and we should move the file to the failed directory
if err := moveFailedData(f.Name(), path.Dir(f.Name()), err, logger); err != nil {
logger.Error(err.Error())
}
s.uploadStats.arbitrary.uploadFailedFileCount.Add(1)
return
return err
}

if err := f.Close(); err != nil {
Expand All @@ -558,6 +559,7 @@ func (s *Sync) syncArbitraryFile(f *os.File, tags, datasetIDs []string, fileLast
}
s.uploadStats.arbitrary.uploadedFileCount.Add(1)
s.uploadStats.arbitrary.completedUploadBytes.Add(bytesUploaded)
return nil
}

// UploadBinaryDataToDatasets simultaneously uploads binary data and adds it to a dataset.
Expand Down Expand Up @@ -591,12 +593,81 @@ func (s *Sync) UploadBinaryDataToDatasets(ctx context.Context, binaryData []byte
}
// Since we wrote to the file, the file last modified time should be 0, indicating we should wait no time
// before deciding this file is ready for upload and is not still being written to.
s.syncArbitraryFile(f, tags, datasetIDs, 0, s.logger)
s.syncArbitraryFile(ctx, f, tags, datasetIDs, 0, s.logger, &s.uploadStats.arbitrary.uploadingBytes) //nolint:errcheck
}()

return <-errChan
}

// UploadDataFromPath uploads the file at path, or every non-directory entry beneath path when
// it is a directory, to the cloud.
//
// Each file is opened and handed to syncDataCaptureFile when data.IsDataCaptureFile reports
// true for it, and to syncArbitraryFile (tagged with uploadMetadata's tags) otherwise.
// Per-file failures do not abort the call: they are tallied in filesFailed and the remaining
// files are still attempted. Directory entries that the walk itself cannot read are skipped.
// The final bool parameter (isSequence) is currently unused.
//
// Results:
//   - filesUploaded / filesFailed: files whose sync call returned nil / files that could not
//     be stat'ed, opened, or synced.
//   - bytesUploaded: for data capture files, the on-disk size of each uploaded file; for
//     arbitrary files, the value left in the per-file byte counter passed to syncArbitraryFile.
//   - bytesTotal: sum of the on-disk sizes of every file that was stat'ed successfully.
//   - err: non-nil when the cloud connection is not ready, when path cannot be stat'ed, or
//     when ctx is done before every file has been attempted. In the last case the counts
//     gathered so far are returned alongside the error, so a truncated run is never reported
//     as a complete one.
func (s *Sync) UploadDataFromPath(ctx context.Context, path string, uploadMetadata *v1.UploadMetadata, _ bool) (filesUploaded, filesFailed, bytesUploaded, bytesTotal uint64, err error) {
	// Non-blocking readiness probe: refuse immediately rather than wait for a connection.
	select {
	case <-s.cloudConn.ready:
	default:
		return 0, 0, 0, 0, errors.New("not connected to the cloud")
	}

	info, err := os.Stat(path)
	if err != nil {
		return 0, 0, 0, 0, err
	}

	// Snapshot the capture directory under the config lock so the uploads below do not hold it.
	s.configMu.Lock()
	captureDir := s.config.CaptureDir
	s.configMu.Unlock()

	tags := uploadMetadata.GetTags()

	// uploadOne attempts a single file and folds the outcome into the named results.
	uploadOne := func(filePath string) {
		fi, statErr := os.Stat(filePath)
		if statErr != nil {
			filesFailed++
			return
		}
		bytesTotal += uint64(fi.Size())

		//nolint:gosec
		f, openErr := os.Open(filePath)
		if openErr != nil {
			filesFailed++
			return
		}

		var syncErr error
		uploaded := uint64(fi.Size())
		if data.IsDataCaptureFile(f) {
			syncErr = s.syncDataCaptureFile(ctx, f, captureDir, s.logger)
		} else {
			// A private counter keeps this one-off upload out of the shared arbitrary-file stats.
			var perFileCounter atomic.Uint64
			syncErr = s.syncArbitraryFile(ctx, f, tags, nil, 0, s.logger, &perFileCounter)
			uploaded = perFileCounter.Load()
		}
		if syncErr != nil {
			filesFailed++
			return
		}
		filesUploaded++
		bytesUploaded += uploaded
	}

	if !info.IsDir() {
		uploadOne(path)
		// Surface cancellation so the caller can tell an interrupted upload from a failed one.
		return filesUploaded, filesFailed, bytesUploaded, bytesTotal, ctx.Err()
	}

	// The callback only ever returns ctx.Err(), so a non-nil walk error means the walk was
	// cut short by cancellation or deadline expiry and must be reported to the caller.
	walkErr := filepath.Walk(path, func(filePath string, fi os.FileInfo, entryErr error) error {
		if entryErr != nil || fi.IsDir() {
			return nil
		}
		uploadOne(filePath)
		return ctx.Err()
	})
	return filesUploaded, filesFailed, bytesUploaded, bytesTotal, walkErr
}

// moveFailedData takes any data that could not be synced in the parentDir and
// moves it to a new subdirectory "failed" that will not be synced.
func moveFailedData(path, parentDir string, cause error, logger logging.Logger) error {
Expand Down
Loading