chunker: fix case-insensitive NewObject, test metadata detection #4902

- fix test case FsNewObjectCaseInsensitive (PR #4830)
- continue PR #4917, add comments in metadata detection code
- add warning about metadata detection in user documentation
- change metadata size limits, make room for future development
- hide critical chunker parameters from command line
This commit is contained in:
Ivan Andreev 2021-01-04 04:08:22 +03:00
parent 847625822f
commit 35a4de2030
3 changed files with 134 additions and 12 deletions

View file

@ -97,7 +97,8 @@ var (
//
// And still chunker's primary function is to chunk large files
// rather than serve as a generic metadata container.
const maxMetadataSize = 255
const maxMetadataSize = 1023
const maxMetadataSizeWritten = 255
// Current/highest supported metadata format.
const metadataVersion = 1
@ -152,6 +153,7 @@ Normally should contain a ':' and a path, e.g. "myremote:path/to/dir",
}, {
Name: "name_format",
Advanced: true,
Hide: fs.OptionHideCommandLine,
Default: `*.rclone_chunk.###`,
Help: `String format of chunk file names.
The two placeholders are: base file name (*) and chunk number (#...).
@ -162,12 +164,14 @@ Possible chunk files are ignored if their name does not match given format.`,
}, {
Name: "start_from",
Advanced: true,
Hide: fs.OptionHideCommandLine,
Default: 1,
Help: `Minimum valid chunk number. Usually 0 or 1.
By default chunk numbers start from 1.`,
}, {
Name: "meta_format",
Advanced: true,
Hide: fs.OptionHideCommandLine,
Default: "simplejson",
Help: `Format of the metadata object or "none". By default "simplejson".
Metadata is a small JSON file named after the composite file.`,
@ -725,6 +729,9 @@ func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirP
fs.Infof(f, "ignore non-data chunk %q", remote)
}
// need to read metadata to ensure actual object type
// no need to read if metaobject is too big or absent,
// use the fact that before calling validate()
// the `size` field caches metaobject size, if any
if f.useMeta && mainObject != nil && mainObject.size <= maxMetadataSize {
mainObject.unsure = true
}
@ -805,6 +812,7 @@ func (f *Fs) scanObject(ctx context.Context, remote string, quickScan bool) (fs.
o *Object
baseObj fs.Object
err error
sameMain bool
)
if f.useMeta {
@ -818,6 +826,7 @@ func (f *Fs) scanObject(ctx context.Context, remote string, quickScan bool) (fs.
// as a hard limit. Anything larger than that is treated as a
// non-chunked file without even checking its contents, so it's
// paramount to prevent metadata from exceeding the maximum size.
// Anything smaller is additionally checked for format.
o = f.newObject("", baseObj, nil)
if o.size > maxMetadataSize {
return o, nil
@ -847,18 +856,27 @@ func (f *Fs) scanObject(ctx context.Context, remote string, quickScan bool) (fs.
return nil, errors.Wrap(err, "can't detect composite file")
}
caseInsensitive := f.features.CaseInsensitive
for _, dirOrObject := range entries {
entry, ok := dirOrObject.(fs.Object)
if !ok {
continue
}
entryRemote := entry.Remote()
if !strings.Contains(entryRemote, remote) {
if !caseInsensitive && !strings.Contains(entryRemote, remote) {
continue // bypass regexp to save cpu
}
mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(entryRemote)
if mainRemote == "" || mainRemote != remote {
continue // skip non-conforming chunks
if mainRemote == "" {
continue // skip non-chunks
}
if caseInsensitive {
sameMain = strings.EqualFold(mainRemote, remote)
} else {
sameMain = mainRemote == remote
}
if !sameMain {
continue // skip alien chunks
}
if ctrlType != "" || xactID != "" {
if f.useMeta {
@ -906,11 +924,22 @@ func (f *Fs) scanObject(ctx context.Context, remote string, quickScan bool) (fs.
return o, nil
}
// readMetadata reads composite object metadata and caches results,
// in case of critical errors metadata is not cached.
// Returns ErrMetaUnknown if an unsupported metadata format is detected.
// If object is not chunked but marked by List or NewObject for recheck,
// readMetadata will attempt to parse object as composite with fallback
// to non-chunked representation if the attempt fails.
func (o *Object) readMetadata(ctx context.Context) error {
// return quickly if metadata is absent or has been already cached
if !o.f.useMeta {
o.isFull = true
}
if o.isFull {
return nil
}
if !o.f.useMeta || (!o.isComposite() && !o.unsure) {
if !o.isComposite() && !o.unsure {
// this for sure is a non-chunked standalone file
o.isFull = true
return nil
}
@ -928,6 +957,7 @@ func (o *Object) readMetadata(ctx context.Context) error {
return ErrMetaTooBig
}
// size is within limits, perform consistency checks
reader, err := metaObject.Open(ctx)
if err != nil {
return err
@ -965,7 +995,7 @@ func (o *Object) readMetadata(ctx context.Context) error {
o.sha1 = metaInfo.sha1
}
o.isFull = true
o.isFull = true // cache results
return nil
}
@ -974,11 +1004,14 @@ func (f *Fs) put(
ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options []fs.OpenOption,
basePut putFn, action string, target fs.Object) (obj fs.Object, err error) {
// Perform consistency checks
if err := f.forbidChunk(src, remote); err != nil {
return nil, errors.Wrap(err, action+" refused")
}
if target == nil {
// Get target object with a quick directory scan
// skip metadata check if target object does not exist.
// ignore not-chunked objects, skip chunk size checks.
if obj, err := f.scanObject(ctx, remote, true); err == nil {
target = obj
}
@ -991,6 +1024,7 @@ func (f *Fs) put(
}
}
// Prepare to upload
c := f.newChunkingReader(src)
wrapIn := c.wrapStream(ctx, in, src)
@ -1593,6 +1627,8 @@ func (f *Fs) okForServerSide(ctx context.Context, src fs.Object, opName string)
diff = "chunk sizes"
case f.opt.NameFormat != obj.f.opt.NameFormat:
diff = "chunk name formats"
case f.opt.StartFrom != obj.f.opt.StartFrom:
diff = "chunk numbering"
case f.opt.MetaFormat != obj.f.opt.MetaFormat:
diff = "meta formats"
}
@ -1821,6 +1857,9 @@ func (o *Object) addChunk(chunk fs.Object, chunkNo int) error {
copy(newChunks, o.chunks)
o.chunks = newChunks
}
if o.chunks[chunkNo] != nil {
return fmt.Errorf("duplicate chunk number %d", chunkNo+o.f.opt.StartFrom)
}
o.chunks[chunkNo] = chunk
return nil
}
@ -2248,15 +2287,17 @@ func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1 s
SHA1: sha1,
}
data, err := json.Marshal(&metadata)
if err == nil && data != nil && len(data) >= maxMetadataSize {
if err == nil && data != nil && len(data) >= maxMetadataSizeWritten {
// be a nitpicker, never produce something you can't consume
return nil, errors.New("metadata can't be this big, please report to rclone developers")
}
return data, err
}
// unmarshalSimpleJSON
// unmarshalSimpleJSON parses metadata.
//
// In case of errors returns a flag telling whether input has been
// produced by incompatible version of rclone vs wasn't metadata at all.
// Only metadata format version 1 is supported atm.
// Future releases will transparently migrate older metadata objects.
// New format will have a higher version number and cannot be correctly
@ -2266,7 +2307,7 @@ func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1 s
func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte) (info *ObjectInfo, madeByChunker bool, err error) {
// Be strict about JSON format
// to reduce possibility that a random small file resembles metadata.
if data != nil && len(data) > maxMetadataSize {
if data != nil && len(data) > maxMetadataSizeWritten {
return nil, false, ErrMetaTooBig
}
if data == nil || len(data) < 2 || data[0] != '{' || data[len(data)-1] != '}' {

View file

@ -13,6 +13,7 @@ import (
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/fstest/fstests"
@ -663,6 +664,80 @@ func testMetadataInput(t *testing.T, f *Fs) {
runSubtest(futureMeta, "future")
}
// test that chunker refuses to change on objects with future/unknowm metadata
func testFutureProof(t *testing.T, f *Fs) {
if f.opt.MetaFormat == "none" {
t.Skip("this test requires metadata support")
}
saveOpt := f.opt
ctx := context.Background()
f.opt.FailHard = true
const dir = "future"
const file = dir + "/test"
defer func() {
f.opt.FailHard = false
_ = operations.Purge(ctx, f.base, dir)
f.opt = saveOpt
}()
modTime := fstest.Time("2001-02-03T04:05:06.499999999Z")
putPart := func(name string, part int, data, msg string) {
if part > 0 {
name = f.makeChunkName(name, part-1, "", "")
}
item := fstest.Item{Path: name, ModTime: modTime}
_, obj := fstests.PutTestContents(ctx, t, f.base, &item, data, true)
assert.NotNil(t, obj, msg)
}
// simulate chunked object from future
meta := `{"ver":999,"nchunks":3,"size":9,"garbage":"litter","sha1":"0707f2970043f9f7c22029482db27733deaec029"}`
putPart(file, 0, meta, "metaobject")
putPart(file, 1, "abc", "chunk1")
putPart(file, 2, "def", "chunk2")
putPart(file, 3, "ghi", "chunk3")
// List should succeed
ls, err := f.List(ctx, dir)
assert.NoError(t, err)
assert.Equal(t, 1, len(ls))
assert.Equal(t, int64(9), ls[0].Size())
// NewObject should succeed
obj, err := f.NewObject(ctx, file)
assert.NoError(t, err)
assert.Equal(t, file, obj.Remote())
assert.Equal(t, int64(9), obj.Size())
// Hash must fail
_, err = obj.Hash(ctx, hash.SHA1)
assert.Equal(t, ErrMetaUnknown, err)
// Move must fail
mobj, err := operations.Move(ctx, f, nil, file+"2", obj)
assert.Nil(t, mobj)
assert.Error(t, err)
if err != nil {
assert.Contains(t, err.Error(), "please upgrade rclone")
}
// Put must fail
oi := object.NewStaticObjectInfo(file, modTime, 3, true, nil, nil)
buf := bytes.NewBufferString("abc")
_, err = f.Put(ctx, buf, oi)
assert.Error(t, err)
// Rcat must fail
in := ioutil.NopCloser(bytes.NewBufferString("abc"))
robj, err := operations.Rcat(ctx, f, file, in, modTime)
assert.Nil(t, robj)
assert.NotNil(t, err)
if err != nil {
assert.Contains(t, err.Error(), "please upgrade rclone")
}
}
// InternalTest dispatches all internal tests
func (f *Fs) InternalTest(t *testing.T) {
t.Run("PutLarge", func(t *testing.T) {
@ -686,6 +761,9 @@ func (f *Fs) InternalTest(t *testing.T) {
t.Run("MetadataInput", func(t *testing.T) {
testMetadataInput(t, f)
})
t.Run("FutureProof", func(t *testing.T) {
testFutureProof(t, f)
})
}
var _ fstests.InternalTester = (*Fs)(nil)

View file

@ -299,6 +299,9 @@ If wrapped remote is case insensitive, the chunker overlay will inherit
that property (so you can't have a file called "Hello.doc" and "hello.doc"
in the same directory).
Chunker included in rclone releases up to `v1.54` can sometimes fail to
detect metadata produced by recent versions of rclone. We recommend users
to keep rclone up-to-date to avoid data corruption.
{{< rem autogenerated options start" - DO NOT EDIT - instead edit fs.RegInfo in backend/chunker/chunker.go then run make backenddocs" >}}
### Standard Options