operations: ensure SrcFsType is set correctly when using --metadata-mapper
Before this change on files which have unknown length (like Google Documents) the SrcFsType would be set to "memoryFs". This change fixes the problem by getting the Copy function to pass the src Fs into a variant of Rcat. Fixes #7848
This commit is contained in:
parent
7b89735ae7
commit
faa58315c5
3 changed files with 26 additions and 4 deletions
|
@ -195,6 +195,7 @@ type MemoryObject struct {
|
||||||
modTime time.Time
|
modTime time.Time
|
||||||
content []byte
|
content []byte
|
||||||
meta fs.Metadata
|
meta fs.Metadata
|
||||||
|
fs fs.Fs
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMemoryObject returns an in memory Object with the modTime and content passed in
|
// NewMemoryObject returns an in memory Object with the modTime and content passed in
|
||||||
|
@ -203,6 +204,7 @@ func NewMemoryObject(remote string, modTime time.Time, content []byte) *MemoryOb
|
||||||
remote: remote,
|
remote: remote,
|
||||||
modTime: modTime,
|
modTime: modTime,
|
||||||
content: content,
|
content: content,
|
||||||
|
fs: MemoryFs,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -219,7 +221,16 @@ func (o *MemoryObject) Content() []byte {
|
||||||
|
|
||||||
// Fs returns read only access to the Fs that this object is part of
|
// Fs returns read only access to the Fs that this object is part of
|
||||||
func (o *MemoryObject) Fs() fs.Info {
|
func (o *MemoryObject) Fs() fs.Info {
|
||||||
return MemoryFs
|
return o.fs
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetFs sets the Fs that this memory object thinks it is part of
|
||||||
|
// It will ignore nil f
|
||||||
|
func (o *MemoryObject) SetFs(f fs.Fs) *MemoryObject {
|
||||||
|
if f != nil {
|
||||||
|
o.fs = f
|
||||||
|
}
|
||||||
|
return o
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remote returns the remote path
|
// Remote returns the remote path
|
||||||
|
|
|
@ -180,7 +180,11 @@ func (c *copy) rcat(ctx context.Context, in io.ReadCloser) (actionTaken string,
|
||||||
}
|
}
|
||||||
|
|
||||||
// NB Rcat closes in0
|
// NB Rcat closes in0
|
||||||
newDst, err = Rcat(ctx, c.f, c.remoteForCopy, in, c.src.ModTime(ctx), meta)
|
fsrc, ok := c.src.Fs().(fs.Fs)
|
||||||
|
if !ok {
|
||||||
|
fsrc = nil
|
||||||
|
}
|
||||||
|
newDst, err = rcatSrc(ctx, c.f, c.remoteForCopy, in, c.src.ModTime(ctx), meta, fsrc)
|
||||||
if c.doUpdate {
|
if c.doUpdate {
|
||||||
actionTaken = "Copied (Rcat, replaced existing)"
|
actionTaken = "Copied (Rcat, replaced existing)"
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1270,6 +1270,13 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []b
|
||||||
|
|
||||||
// Rcat reads data from the Reader until EOF and uploads it to a file on remote
|
// Rcat reads data from the Reader until EOF and uploads it to a file on remote
|
||||||
func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata) (dst fs.Object, err error) {
|
func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata) (dst fs.Object, err error) {
|
||||||
|
return rcatSrc(ctx, fdst, dstFileName, in, modTime, meta, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// rcatSrc reads data from the Reader until EOF and uploads it to a file on remote
|
||||||
|
//
|
||||||
|
// Pass in fsrc if known or nil if not
|
||||||
|
func rcatSrc(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata, fsrc fs.Fs) (dst fs.Object, err error) {
|
||||||
ci := fs.GetConfig(ctx)
|
ci := fs.GetConfig(ctx)
|
||||||
tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1, nil, fdst)
|
tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1, nil, fdst)
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -1322,7 +1329,7 @@ func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser,
|
||||||
buf := make([]byte, ci.StreamingUploadCutoff)
|
buf := make([]byte, ci.StreamingUploadCutoff)
|
||||||
if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
|
if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
fs.Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", n)
|
fs.Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", n)
|
||||||
src := object.NewMemoryObject(dstFileName, modTime, buf[:n]).WithMetadata(meta)
|
src := object.NewMemoryObject(dstFileName, modTime, buf[:n]).WithMetadata(meta).SetFs(fsrc)
|
||||||
return Copy(ctx, fdst, nil, dstFileName, src)
|
return Copy(ctx, fdst, nil, dstFileName, src)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1355,7 +1362,7 @@ func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser,
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
objInfo := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, nil).WithMetadata(meta)
|
objInfo := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, fsrc).WithMetadata(meta)
|
||||||
if dst, err = fStreamTo.Features().PutStream(ctx, in, objInfo, options...); err != nil {
|
if dst, err = fStreamTo.Features().PutStream(ctx, in, objInfo, options...); err != nil {
|
||||||
return dst, err
|
return dst, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue