-
Notifications
You must be signed in to change notification settings - Fork 820
Add file_extension field to BlobType #7009
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
ba4c792
cd7111a
c203375
9327eb0
32627a1
1ba7c15
5a0bb83
93ff903
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -9,6 +9,7 @@ import ( | |||||
| "path" | ||||||
| "path/filepath" | ||||||
| "reflect" | ||||||
| "regexp" | ||||||
| "strconv" | ||||||
| "strings" | ||||||
| "sync" | ||||||
|
|
@@ -26,13 +27,46 @@ import ( | |||||
| "github.com/flyteorg/flyte/flytestdlib/storage" | ||||||
| ) | ||||||
|
|
||||||
| var validFileExtensionRe = regexp.MustCompile(`^[a-zA-Z0-9]+(\.[a-zA-Z0-9]+)*$`) | ||||||
|
|
||||||
| type Downloader struct { | ||||||
| format core.DataLoadingConfig_LiteralMapFormat | ||||||
| store *storage.DataStore | ||||||
| // TODO support download mode | ||||||
| mode core.IOStrategy_DownloadMode | ||||||
| } | ||||||
|
|
||||||
| // resolveVarFilenames maps each variable name in vars to the filename used | ||||||
| // when that variable is written to disk. | ||||||
| // | ||||||
| // Historically, blobs (FlyteFiles) have been written without a file extension, | ||||||
| // and that is still the default. For example, a `data: FlyteFile["csv"]` blob | ||||||
| // is written to "inputs/data", even though Format="csv". | ||||||
| // | ||||||
| // When FileExtension="" (the default), this old behavior is preserved. | ||||||
| // | ||||||
| // When FileExtension is set, it is appended to the filename. An input blob | ||||||
| // `data: Annotated[FlyteFile["csv"], FileDownloadConfig(file_extension="csv")]` | ||||||
| // is written to "inputs/data.csv" (FileExtension="csv" - new behavior). | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
nit |
||||||
| func resolveVarFilenames(ctx context.Context, vars *core.VariableMap) map[string]string { | ||||||
| varFilenames := make(map[string]string, len(vars.GetVariables())) | ||||||
| for varName, variable := range vars.GetVariables() { | ||||||
| varType := variable.GetType() | ||||||
| switch varType.GetType().(type) { | ||||||
| case *core.LiteralType_Blob: | ||||||
| ext := varType.GetBlob().GetFileExtension() | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could we check |
||||||
| if ext == "" { | ||||||
| varFilenames[varName] = varName | ||||||
| } else if !validFileExtensionRe.MatchString(ext) { | ||||||
| logger.Warnf(ctx, "invalid file extension for variable %q [%q], ignoring...", varName, ext) | ||||||
| varFilenames[varName] = varName | ||||||
| } else { | ||||||
| varFilenames[varName] = varName + "." + ext | ||||||
| } | ||||||
| default: | ||||||
| varFilenames[varName] = varName | ||||||
| } | ||||||
| } | ||||||
| return varFilenames | ||||||
| } | ||||||
|
|
||||||
| // TODO add timeout and rate limit | ||||||
| // TODO use chunk to download | ||||||
| func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath string) (interface{}, error) { | ||||||
|
|
@@ -432,7 +466,7 @@ func (d Downloader) handleLiteral(ctx context.Context, lit *core.Literal, filePa | |||||
| if err != nil { | ||||||
| return nil, nil, errors.Wrapf(err, "failed to create directory [%s]", filePath) | ||||||
| } | ||||||
| v, m, err := d.RecursiveDownload(ctx, lit.GetMap(), filePath, writeToFile) | ||||||
| v, m, err := d.RecursiveDownload(ctx, lit.GetMap(), filePath, make(map[string]string), writeToFile) | ||||||
| if err != nil { | ||||||
| return nil, nil, err | ||||||
| } | ||||||
|
|
@@ -468,7 +502,7 @@ type downloadedResult struct { | |||||
| v interface{} | ||||||
| } | ||||||
|
|
||||||
| func (d Downloader) RecursiveDownload(ctx context.Context, inputs *core.LiteralMap, dir string, writePrimitiveToFile bool) (VarMap, *core.LiteralMap, error) { | ||||||
| func (d Downloader) RecursiveDownload(ctx context.Context, inputs *core.LiteralMap, dir string, varFilenames map[string]string, writePrimitiveToFile bool) (VarMap, *core.LiteralMap, error) { | ||||||
| childCtx, cancel := context.WithCancel(ctx) | ||||||
| defer cancel() | ||||||
| if inputs == nil || len(inputs.GetLiterals()) == 0 { | ||||||
|
|
@@ -486,7 +520,11 @@ func (d Downloader) RecursiveDownload(ctx context.Context, inputs *core.LiteralM | |||||
| } | ||||||
| logger.Infof(ctx, "read object at location [%s]", offloadedMetadataURI) | ||||||
| } | ||||||
| varPath := path.Join(dir, variable) | ||||||
| filename := variable | ||||||
| if varFilename, ok := varFilenames[variable]; ok { | ||||||
| filename = varFilename | ||||||
| } | ||||||
| varPath := path.Join(dir, filename) | ||||||
|
Comment on lines
+523
to
+527
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could we get file extension through |
||||||
| lit := literal | ||||||
| f[variable] = futures.NewAsyncFuture(childCtx, func(ctx2 context.Context) (interface{}, error) { | ||||||
| v, lit, err := d.handleLiteral(ctx2, lit, varPath, writePrimitiveToFile) | ||||||
|
|
@@ -520,7 +558,7 @@ func (d Downloader) RecursiveDownload(ctx context.Context, inputs *core.LiteralM | |||||
| return vmap, m, nil | ||||||
| } | ||||||
|
|
||||||
| func (d Downloader) DownloadInputs(ctx context.Context, inputRef storage.DataReference, outputDir string) error { | ||||||
| func (d Downloader) DownloadInputs(ctx context.Context, vars *core.VariableMap, inputRef storage.DataReference, outputDir string) error { | ||||||
| logger.Infof(ctx, "Downloading inputs from [%s]", inputRef) | ||||||
| defer logger.Infof(ctx, "Exited downloading inputs from [%s]", inputRef) | ||||||
| if err := os.MkdirAll(outputDir, os.ModePerm); err != nil { | ||||||
|
|
@@ -533,7 +571,9 @@ func (d Downloader) DownloadInputs(ctx context.Context, inputRef storage.DataRef | |||||
| logger.Errorf(ctx, "Failed to download inputs from [%s], err [%s]", inputRef, err) | ||||||
| return errors.Wrapf(err, "failed to download input metadata message from remote store") | ||||||
| } | ||||||
| varMap, lMap, err := d.RecursiveDownload(ctx, inputs, outputDir, true) | ||||||
|
|
||||||
| varFilenames := resolveVarFilenames(ctx, vars) | ||||||
| varMap, lMap, err := d.RecursiveDownload(ctx, inputs, outputDir, varFilenames, true) | ||||||
| if err != nil { | ||||||
| return errors.Wrapf(err, "failed to download input variable from remote store") | ||||||
| } | ||||||
|
|
||||||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Could we add a comment saying that this matches single- and multi-part file extensions (e.g. tar.gz)?