fs: implement --metadata-mapper to transform metadata with a user supplied program

This commit is contained in:
Nick Craig-Wood 2023-10-23 23:47:18 +01:00
parent 54196f34e3
commit 47ca0c326e
14 changed files with 423 additions and 51 deletions

View file

@ -596,7 +596,7 @@ func (f *Fs) updateMetadata(ctx context.Context, updateInfo *drive.File, meta fs
// Fetch metadata and update updateInfo if --metadata is in use
func (f *Fs) fetchAndUpdateMetadata(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption, updateInfo *drive.File, update bool) (callback updateMetadataFn, err error) {
meta, err := fs.GetMetadataOptions(ctx, src, options)
meta, err := fs.GetMetadataOptions(ctx, f, src, options)
if err != nil {
return nil, fmt.Errorf("failed to read metadata from source object: %w", err)
}

View file

@ -802,7 +802,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
headers["x-archive-size-hint"] = fmt.Sprintf("%d", size)
}
var mdata fs.Metadata
mdata, err = fs.GetMetadataOptions(ctx, src, options)
mdata, err = fs.GetMetadataOptions(ctx, o.fs, src, options)
if err == nil && mdata != nil {
for mk, mv := range mdata {
mk = strings.ToLower(mk)

View file

@ -1944,7 +1944,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
in = wrap(in)
}
// Fetch metadata if --metadata is in use
meta, err := fs.GetMetadataOptions(ctx, src, options)
meta, err := fs.GetMetadataOptions(ctx, o.fs, src, options)
if err != nil {
return fmt.Errorf("failed to read metadata from source object: %w", err)
}

View file

@ -1298,7 +1298,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
}
// Fetch and set metadata if --metadata is in use
meta, err := fs.GetMetadataOptions(ctx, src, options)
meta, err := fs.GetMetadataOptions(ctx, o.fs, src, options)
if err != nil {
return fmt.Errorf("failed to read metadata from source object: %w", err)
}

View file

@ -295,7 +295,7 @@ func (o *Object) prepareUpload(ctx context.Context, src fs.ObjectInfo, options [
// Set the mtime in the metadata
modTime := src.ModTime(ctx)
// Fetch metadata if --metadata is in use
meta, err := fs.GetMetadataOptions(ctx, src, options)
meta, err := fs.GetMetadataOptions(ctx, o.fs, src, options)
if err != nil {
return ui, fmt.Errorf("failed to read metadata from source object: %w", err)
}

View file

@ -6008,7 +6008,7 @@ func (o *Object) prepareUpload(ctx context.Context, src fs.ObjectInfo, options [
}
// Fetch metadata if --metadata is in use
meta, err := fs.GetMetadataOptions(ctx, src, options)
meta, err := fs.GetMetadataOptions(ctx, o.fs, src, options)
if err != nil {
return ui, fmt.Errorf("failed to read metadata from source object: %w", err)
}

24
bin/test_metadata_mapper.py Executable file
View file

@ -0,0 +1,24 @@
#!/usr/bin/env python3
"""
A demo metadata mapper
"""
import sys
import json
def main():
    """
    Read a JSON blob describing an object from stdin, transform its
    metadata and write a JSON blob holding the new "Metadata" to stdout.

    The description gets a "[migrated from domain1]" tag and any
    "domain1.com" in the owner is replaced with "domain2.com".
    """
    i = json.load(sys.stdin)
    # The "Metadata" key is omitted from the input when the object has
    # no metadata, so fall back to an empty dict rather than failing.
    metadata = i.get("Metadata") or {}
    # Add tag to description
    if "description" in metadata:
        metadata["description"] += " [migrated from domain1]"
    else:
        metadata["description"] = "[migrated from domain1]"
    # Modify owner
    if "owner" in metadata:
        metadata["owner"] = metadata["owner"].replace("domain1.com", "domain2.com")
    # Only the "Metadata" field is returned
    o = { "Metadata": metadata }
    json.dump(o, sys.stdout, indent="\t")

if __name__ == "__main__":
    main()

View file

@ -475,6 +475,10 @@ Note that arbitrary metadata may be added to objects using the
`--metadata-set key=value` flag when the object is first uploaded.
This flag can be repeated as many times as necessary.
The [--metadata-mapper](#metadata-mapper) flag can be used to pass the
name of a program which can transform metadata when it is being
copied from source to destination.
### Types of metadata
Metadata is divided into two types: System metadata and User metadata.
@ -1504,12 +1508,123 @@ from reaching the limit. Only applicable for `--max-transfer`
Setting this flag enables rclone to copy the metadata from the source
to the destination. For local backends this is ownership, permissions,
xattr etc. See the [#metadata](metadata section) for more info.
xattr etc. See the [metadata section](#metadata) for more info.
### --metadata-mapper SpaceSepList {#metadata-mapper}
If you supply the parameter `--metadata-mapper /path/to/program` then
rclone will use that program to map metadata from source object to
destination object.
The argument to this flag should be a command with an optional space separated
list of arguments. If one of the arguments has a space in it then enclose
it in `"`, if you want a literal `"` in an argument then enclose the
argument in `"` and double the `"`. See [CSV encoding](https://godoc.org/encoding/csv)
for more info.
--metadata-mapper "python bin/test_metadata_mapper.py"
--metadata-mapper 'python bin/test_metadata_mapper.py "argument with a space"'
--metadata-mapper 'python bin/test_metadata_mapper.py "argument with ""two"" quotes"'
This uses a simple JSON based protocol with input on STDIN and output
on STDOUT. This will be called for every file and directory copied and
may be called concurrently.
The program's job is to take a metadata blob on the input and turn it
into a metadata blob on the output suitable for the destination
backend.
Input to the program (via STDIN) might look like this. This provides
some context for the `Metadata` which may be important.
- `SrcFs` is the config string for the remote that the object is currently on.
- `SrcFsType` is the name of the source backend.
- `DstFs` is the config string for the remote that the object is being copied to.
- `DstFsType` is the name of the destination backend.
- `Remote` is the path of the file relative to the root.
- `Size`, `MimeType`, `ModTime` are attributes of the file.
- `IsDir` is `true` if this is a directory (not yet implemented).
- `ID` is the source `ID` of the file if known.
- `Metadata` is the backend specific metadata as described in the backend docs.
```json
{
"SrcFs": "gdrive:",
"SrcFsType": "drive",
"DstFs": "newdrive:user",
"DstFsType": "onedrive",
"Remote": "test.txt",
"Size": 6,
"MimeType": "text/plain; charset=utf-8",
"ModTime": "2022-10-11T17:53:10.286745272+01:00",
"IsDir": false,
"ID": "xyz",
"Metadata": {
"btime": "2022-10-11T16:53:11Z",
"content-type": "text/plain; charset=utf-8",
"mtime": "2022-10-11T17:53:10.286745272+01:00",
"owner": "user1@domain1.com",
"permissions": "...",
"description": "my nice file",
"starred": "false"
}
}
```
The program should then modify the input as desired and send it to
STDOUT. The returned `Metadata` field will be used in its entirety for
the destination object. Any other fields will be ignored. Note in this
example we translate user names and permissions and add something to
the description:
```json
{
"Metadata": {
"btime": "2022-10-11T16:53:11Z",
"content-type": "text/plain; charset=utf-8",
"mtime": "2022-10-11T17:53:10.286745272+01:00",
"owner": "user1@domain2.com",
"permissions": "...",
"description": "my nice file [migrated from domain1]",
"starred": "false"
}
}
```
Metadata can be removed here too.
An example python program might look something like this to implement
the above transformations.
```python
import sys, json
i = json.load(sys.stdin)
metadata = i["Metadata"]
# Add tag to description
if "description" in metadata:
metadata["description"] += " [migrated from domain1]"
else:
metadata["description"] = "[migrated from domain1]"
# Modify owner
if "owner" in metadata:
metadata["owner"] = metadata["owner"].replace("domain1.com", "domain2.com")
o = { "Metadata": metadata }
json.dump(o, sys.stdout, indent="\t")
```
You can find this example (slightly expanded) in the rclone source code at
[bin/test_metadata_mapper.py](https://github.com/rclone/rclone/blob/master/bin/test_metadata_mapper.py).
If you want to see the input to the metadata mapper and the output
returned from it in the log you can use `-vv --dump mapper`.
See the [metadata section](#metadata) for more info.
### --metadata-set key=value
Add metadata `key` = `value` when uploading. This can be repeated as
many times as required. See the [#metadata](metadata section) for more
many times as required. See the [metadata section](#metadata) for more
info.
### --modify-window=TIME ###
@ -1752,9 +1867,9 @@ for more info.
Eg
--password-command echo hello
--password-command echo "hello with space"
--password-command echo "hello with ""quotes"" and space"
--password-command "echo hello"
--password-command 'echo "hello with space"'
--password-command 'echo "hello with ""quotes"" and space"'
See the [Configuration Encryption](#configuration-encryption) for more info.
@ -2503,6 +2618,12 @@ This dumps a list of the open files at the end of the command. It
uses the `lsof` command to do that so you'll need that installed to
use it.
#### --dump mapper ####
This shows the JSON blobs being sent to the program supplied with
`--metadata-mapper` and received from it. It can be useful for
debugging the metadata mapper interface.
### --memprofile=FILE ###
Write memory profile to file. This can be analysed with `go tool pprof`.

View file

@ -149,6 +149,7 @@ type ConfigInfo struct {
DefaultTime Time // time that directories with no time should display
Inplace bool // Download directly to destination file instead of atomic download to temp/rename
PartialSuffix string
MetadataMapper SpaceSepList
}
// NewConfig creates a new config with everything set to the default

View file

@ -150,6 +150,7 @@ func AddFlags(ci *fs.ConfigInfo, flagSet *pflag.FlagSet) {
flags.FVarP(flagSet, &ci.DefaultTime, "default-time", "", "Time to show if modtime is unknown for files and directories", "Config,Listing")
flags.BoolVarP(flagSet, &ci.Inplace, "inplace", "", ci.Inplace, "Download directly to destination file instead of atomic download to temp/rename", "Copy")
flags.StringVarP(flagSet, &partialSuffix, "partial-suffix", "", ci.PartialSuffix, "Add partial-suffix to temporary file name when --inplace is not used", "Copy")
flags.FVarP(flagSet, &ci.MetadataMapper, "metadata-mapper", "", "Program to run to transforming metadata before upload", "Metadata")
}
// ParseHeaders converts the strings passed in via the header flags into HTTPOptions

View file

@ -13,6 +13,7 @@ const (
DumpFilters
DumpGoRoutines
DumpOpenFiles
DumpMapper
)
type dumpChoices struct{}
@ -27,6 +28,7 @@ func (dumpChoices) Choices() []BitsChoicesInfo {
{uint64(DumpFilters), "filters"},
{uint64(DumpGoRoutines), "goroutines"},
{uint64(DumpOpenFiles), "openfiles"},
{uint64(DumpMapper), "mapper"},
}
}

View file

@ -1,6 +1,14 @@
package fs
import "context"
import (
"bytes"
"context"
"encoding/json"
"fmt"
"os/exec"
"strings"
"time"
)
// Metadata represents Object metadata in a standardised form
//
@ -66,12 +74,85 @@ func GetMetadata(ctx context.Context, o ObjectInfo) (metadata Metadata, err erro
return do.Metadata(ctx)
}
// mapItem describes the item to be mapped
//
// It is marshalled to JSON and sent to the metadata mapper program
// and is also used to decode the JSON which the program returns.
type mapItem struct {
// SrcFs is the config string for the remote the object is currently on
SrcFs string
// SrcFsType is the name of the source backend
SrcFsType string
// DstFs is the config string for the remote the object is being copied to
DstFs string
// DstFsType is the name of the destination backend
DstFsType string
// Remote is the path of the object as returned by its Remote method
Remote string
// Size, MimeType and ModTime are attributes of the object
Size int64
MimeType string `json:",omitempty"`
ModTime time.Time
// IsDir is always set to false by metadataMapper at present
IsDir bool
// ID is the source ID of the object - only set if the object implements IDer
ID string `json:",omitempty"`
// Metadata is the metadata to map - the key is omitted from the JSON if it is empty
Metadata Metadata `json:",omitempty"`
}
// metadataMapper runs the external program given by cmdLine on the
// metadata so that it can be mapped from one form to another.
//
// A JSON description of the object o (a mapItem) is sent to the
// program on STDIN and a JSON blob is read back from its STDOUT. Only
// the Metadata field of the reply is used and it is returned in its
// entirety as newMetadata.
//
// cmdLine[0] is the program to run and cmdLine[1:] are its arguments.
// dstFs is the destination Fs and is used to fill in the DstFs and
// DstFsType fields of the input.
//
// The program is run under ctx so it is stopped if ctx is cancelled.
//
// An error is returned if cmdLine is empty, if the input can't be
// marshalled, if the program fails (its STDERR is included in the
// error) or if its output can't be decoded as JSON.
func metadataMapper(ctx context.Context, cmdLine SpaceSepList, dstFs Fs, o ObjectInfo, metadata Metadata) (newMetadata Metadata, err error) {
	// Guard against indexing an empty command line below
	if len(cmdLine) == 0 {
		return nil, fmt.Errorf("metadata mapper: no command supplied")
	}
	ci := GetConfig(ctx)
	cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...)
	in := mapItem{
		DstFs:     ConfigString(dstFs),
		DstFsType: Type(dstFs),
		Remote:    o.Remote(),
		Size:      o.Size(),
		MimeType:  MimeType(ctx, o),
		ModTime:   o.ModTime(ctx),
		IsDir:     false,
		Metadata:  metadata,
	}
	// The source may only be an Info rather than a full Fs, in which
	// case build the config string by hand and mark the type unknown.
	fInfo := o.Fs()
	if f, ok := fInfo.(Fs); ok {
		in.SrcFs = ConfigString(f)
		in.SrcFsType = Type(f)
	} else {
		in.SrcFs = fInfo.Name() + ":" + fInfo.Root()
		in.SrcFsType = "unknown"
	}
	if do, ok := o.(IDer); ok {
		in.ID = do.ID()
	}
	inBytes, err := json.MarshalIndent(in, "", "\t")
	if err != nil {
		return nil, fmt.Errorf("metadata mapper: failed to marshal input: %w", err)
	}
	if ci.Dump.IsSet(DumpMapper) {
		Debugf(nil, "Metadata mapper sent: \n%s\n", string(inBytes))
	}
	var stdout, stderr bytes.Buffer
	cmd.Stdin = bytes.NewBuffer(inBytes)
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	// Log before running so the message precedes the call and the
	// logging isn't included in the measured duration.
	Debugf(o, "Calling metadata mapper %v", cmdLine)
	start := time.Now()
	err = cmd.Run()
	duration := time.Since(start)
	if err != nil {
		return nil, fmt.Errorf("metadata mapper: failed on %v: %q: %w", cmdLine, strings.TrimSpace(stderr.String()), err)
	}
	if ci.Dump.IsSet(DumpMapper) {
		Debugf(nil, "Metadata mapper received: \n%s\n", stdout.String())
	}
	var out mapItem
	err = json.Unmarshal(stdout.Bytes(), &out)
	if err != nil {
		return nil, fmt.Errorf("metadata mapper: failed to read output: %q: %w", stdout.String(), err)
	}
	Debugf(o, "Metadata mapper returned in %v", duration)
	return out.Metadata, nil
}
// GetMetadataOptions from an ObjectInfo and merge it with any in options
//
// If --metadata isn't in use it will return nil
// If --metadata isn't in use it will return nil.
//
// If the object has no metadata then metadata will be nil
func GetMetadataOptions(ctx context.Context, o ObjectInfo, options []OpenOption) (metadata Metadata, err error) {
// If the object has no metadata then metadata will be nil.
//
// This should be passed the destination Fs for the metadata mapper
func GetMetadataOptions(ctx context.Context, dstFs Fs, o ObjectInfo, options []OpenOption) (metadata Metadata, err error) {
ci := GetConfig(ctx)
if !ci.Metadata {
return nil, nil
@ -81,5 +162,11 @@ func GetMetadataOptions(ctx context.Context, o ObjectInfo, options []OpenOption)
return nil, err
}
metadata.MergeOptions(options)
if len(ci.MetadataMapper) != 0 {
metadata, err = metadataMapper(ctx, ci.MetadataMapper, dstFs, o, metadata)
if err != nil {
return nil, err
}
}
return metadata, nil
}

View file

@ -0,0 +1,75 @@
//go:build ignore
// +build ignore
// A simple metadata mapper for testing purposes
package main
import (
"encoding/json"
"fmt"
"log"
"os"
)
// check exits the program with status 1 and a message on stderr
// unless in[key] exists, holds a value of type T and equals want.
func check[T comparable](in map[string]any, key string, want T) {
	value, ok := in[key]
	if !ok {
		fmt.Fprintf(os.Stderr, "%s key not found\n", key)
		os.Exit(1)
	}
	// Use the checked form of the type assertion so that a value of
	// the wrong type gives a readable error instead of a panic.
	got, ok := value.(T)
	if !ok {
		fmt.Fprintf(os.Stderr, "%s wrong type - expecting %T but got %T\n", key, want, value)
		os.Exit(1)
	}
	if got != want {
		// %v rather than %s as want may be a number or a bool
		fmt.Fprintf(os.Stderr, "%s wrong - expecting %v but got %v\n", key, want, value)
		os.Exit(1)
	}
}
// main reads a JSON blob from stdin, checks its fields against the
// values the test expects, maps the metadata and writes a JSON blob
// with the new "Metadata" to stdout.
//
// It exits with a non zero status and a message on stderr if the
// input is not as expected or if the metadata has an "error" key.
func main() {
	// Read the input
	var in map[string]any
	err := json.NewDecoder(os.Stdin).Decode(&in)
	if err != nil {
		log.Fatal(err)
	}

	// Check the input
	rawMetadata, ok := in["Metadata"]
	if !ok {
		fmt.Fprintf(os.Stderr, "Metadata key not found\n")
		os.Exit(1)
	}
	metadata, ok := rawMetadata.(map[string]any)
	if !ok {
		fmt.Fprintf(os.Stderr, "Metadata wrong type - expecting an object but got %T\n", rawMetadata)
		os.Exit(1)
	}
	check(in, "Size", 5.0)
	check(in, "SrcFs", "memory:")
	check(in, "SrcFsType", "object.memoryFs")
	check(in, "DstFs", "dstFs:dstFsRoot")
	check(in, "DstFsType", "mockfs")
	check(in, "Remote", "file.txt")
	check(in, "MimeType", "text/plain; charset=utf-8")
	check(in, "ModTime", "2001-02-03T04:05:06.000000007Z")
	check(in, "IsDir", false)
	//check(in, "ID", "Potato")

	// Map the metadata
	metadataOut := map[string]string{}
	var out = map[string]any{
		"Metadata": metadataOut,
	}
	for k, v := range metadata {
		// Report non string values instead of panicking on them
		value, ok := v.(string)
		if !ok {
			fmt.Fprintf(os.Stderr, "Metadata key %q wrong type - expecting string but got %T\n", k, v)
			os.Exit(1)
		}
		switch k {
		case "error":
			fmt.Fprintf(os.Stderr, "Error: %s\n", value)
			os.Exit(1)
		case "key1":
			value = "two " + value
		case "key3":
			continue
		}
		metadataOut[k] = value
	}
	metadataOut["key0"] = "cabbage"

	// Write the output, checking the error from the encoder itself
	err = json.NewEncoder(os.Stdout).Encode(&out)
	if err != nil {
		log.Fatal(err)
	}
}

View file

@ -1,14 +1,20 @@
package fs
package fs_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fstest/mockfs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMetadataSet(t *testing.T) {
var m Metadata
var m fs.Metadata
assert.Nil(t, m)
m.Set("key", "value")
assert.NotNil(t, m)
@ -19,34 +25,34 @@ func TestMetadataSet(t *testing.T) {
func TestMetadataMerge(t *testing.T) {
for _, test := range []struct {
in Metadata
merge Metadata
want Metadata
in fs.Metadata
merge fs.Metadata
want fs.Metadata
}{
{
in: Metadata{},
merge: Metadata{},
want: Metadata{},
in: fs.Metadata{},
merge: fs.Metadata{},
want: fs.Metadata{},
}, {
in: nil,
merge: nil,
want: nil,
}, {
in: nil,
merge: Metadata{},
merge: fs.Metadata{},
want: nil,
}, {
in: nil,
merge: Metadata{"a": "1", "b": "2"},
want: Metadata{"a": "1", "b": "2"},
merge: fs.Metadata{"a": "1", "b": "2"},
want: fs.Metadata{"a": "1", "b": "2"},
}, {
in: Metadata{"a": "1", "b": "2"},
in: fs.Metadata{"a": "1", "b": "2"},
merge: nil,
want: Metadata{"a": "1", "b": "2"},
want: fs.Metadata{"a": "1", "b": "2"},
}, {
in: Metadata{"a": "1", "b": "2"},
merge: Metadata{"b": "B", "c": "3"},
want: Metadata{"a": "1", "b": "B", "c": "3"},
in: fs.Metadata{"a": "1", "b": "2"},
merge: fs.Metadata{"b": "B", "c": "3"},
want: fs.Metadata{"a": "1", "b": "B", "c": "3"},
},
} {
what := fmt.Sprintf("in=%v, merge=%v", test.in, test.merge)
@ -57,36 +63,36 @@ func TestMetadataMerge(t *testing.T) {
func TestMetadataMergeOptions(t *testing.T) {
for _, test := range []struct {
in Metadata
opts []OpenOption
want Metadata
in fs.Metadata
opts []fs.OpenOption
want fs.Metadata
}{
{
opts: []OpenOption{},
opts: []fs.OpenOption{},
want: nil,
}, {
opts: []OpenOption{&HTTPOption{}},
opts: []fs.OpenOption{&fs.HTTPOption{}},
want: nil,
}, {
opts: []OpenOption{MetadataOption{"a": "1", "b": "2"}},
want: Metadata{"a": "1", "b": "2"},
opts: []fs.OpenOption{fs.MetadataOption{"a": "1", "b": "2"}},
want: fs.Metadata{"a": "1", "b": "2"},
}, {
opts: []OpenOption{
&HTTPOption{},
MetadataOption{"a": "1", "b": "2"},
MetadataOption{"b": "B", "c": "3"},
&HTTPOption{},
opts: []fs.OpenOption{
&fs.HTTPOption{},
fs.MetadataOption{"a": "1", "b": "2"},
fs.MetadataOption{"b": "B", "c": "3"},
&fs.HTTPOption{},
},
want: Metadata{"a": "1", "b": "B", "c": "3"},
want: fs.Metadata{"a": "1", "b": "B", "c": "3"},
}, {
in: Metadata{"a": "first", "z": "OK"},
opts: []OpenOption{
&HTTPOption{},
MetadataOption{"a": "1", "b": "2"},
MetadataOption{"b": "B", "c": "3"},
&HTTPOption{},
in: fs.Metadata{"a": "first", "z": "OK"},
opts: []fs.OpenOption{
&fs.HTTPOption{},
fs.MetadataOption{"a": "1", "b": "2"},
fs.MetadataOption{"b": "B", "c": "3"},
&fs.HTTPOption{},
},
want: Metadata{"a": "1", "b": "B", "c": "3", "z": "OK"},
want: fs.Metadata{"a": "1", "b": "B", "c": "3", "z": "OK"},
},
} {
what := fmt.Sprintf("in=%v, opts=%v", test.in, test.opts)
@ -94,3 +100,58 @@ func TestMetadataMergeOptions(t *testing.T) {
assert.Equal(t, test.want, test.in, what)
}
}
// TestMetadataMapper checks that GetMetadataOptions passes the
// metadata through the program set in ci.MetadataMapper, that errors
// from the program are returned and that metadata supplied in the
// options is merged in before the program sees it.
func TestMetadataMapper(t *testing.T) {
	ctx, ci := fs.AddConfig(context.Background())
	ci.Metadata = true
	require.NoError(t, ci.MetadataMapper.Set("go run metadata_mapper_code.go"))
	modTime := time.Date(2001, 2, 3, 4, 5, 6, 7, time.UTC)
	dst, err := mockfs.NewFs(ctx, "dstFs", "dstFsRoot", nil)
	require.NoError(t, err)

	// newObject makes the in memory source object carrying meta
	newObject := func(meta fs.Metadata) fs.ObjectInfo {
		return object.NewMemoryObject("file.txt", modTime, []byte("hello")).WithMetadata(meta)
	}

	t.Run("Normal", func(t *testing.T) {
		src := newObject(fs.Metadata{
			"key1": "potato",
			"key2": "sausage",
			"key3": "gravy",
		})
		got, err := fs.GetMetadataOptions(ctx, dst, src, nil)
		require.NoError(t, err)
		want := fs.Metadata{
			"key0": "cabbage",
			"key1": "two potato",
			"key2": "sausage",
		}
		assert.Equal(t, want, got)
	})

	t.Run("Error", func(t *testing.T) {
		src := newObject(fs.Metadata{
			"error": "Red Alert",
		})
		got, err := fs.GetMetadataOptions(ctx, dst, src, nil)
		require.Error(t, err)
		assert.ErrorContains(t, err, "Red Alert")
		require.Nil(t, got)
	})

	t.Run("Merge", func(t *testing.T) {
		src := newObject(fs.Metadata{
			"key1": "potato",
			"key2": "sausage",
			"key3": "gravy",
		})
		// These options override key1 and key2 from the object
		opts := []fs.OpenOption{fs.MetadataOption(fs.Metadata{
			"option": "optionValue",
			"key1":   "new potato",
			"key2":   "salami",
		})}
		got, err := fs.GetMetadataOptions(ctx, dst, src, opts)
		require.NoError(t, err)
		want := fs.Metadata{
			"key0":   "cabbage",
			"key1":   "two new potato",
			"key2":   "salami",
			"option": "optionValue",
		}
		assert.Equal(t, want, got)
	})
}