forked from TrueCloudLab/rclone
chunker: md5all must create metadata if base hash is slow
Before this patch the md5all option would skip creating metadata with hashsum if base filesystem provided md5, in hope to pass it through. However, if base hash is slow (for example on local fs), chunker passed slow md5 but never reported this fact in features. This patch makes chunker snapshot base hashsum in metadata when md5all is set and base hashsum is slow since chunker was intended to provide only instant hashsums from the start. Fixes #5508
This commit is contained in:
parent
7d66bfbb7c
commit
cf9b82b8db
2 changed files with 75 additions and 19 deletions
|
@ -432,10 +432,10 @@ func (f *Fs) setHashType(hashType string) error {
|
|||
f.hashFallback = true
|
||||
case "md5all":
|
||||
f.useMD5 = true
|
||||
f.hashAll = !f.base.Hashes().Contains(hash.MD5)
|
||||
f.hashAll = !f.base.Hashes().Contains(hash.MD5) || f.base.Features().SlowHash
|
||||
case "sha1all":
|
||||
f.useSHA1 = true
|
||||
f.hashAll = !f.base.Hashes().Contains(hash.SHA1)
|
||||
f.hashAll = !f.base.Hashes().Contains(hash.SHA1) || f.base.Features().SlowHash
|
||||
default:
|
||||
return fmt.Errorf("unsupported hash type '%s'", hashType)
|
||||
}
|
||||
|
@ -812,7 +812,7 @@ func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirP
|
|||
tempEntries = append(tempEntries, wrapDir)
|
||||
default:
|
||||
if f.opt.FailHard {
|
||||
return nil, fmt.Errorf("Unknown object type %T", entry)
|
||||
return nil, fmt.Errorf("unknown object type %T", entry)
|
||||
}
|
||||
fs.Debugf(f, "unknown object type %T", entry)
|
||||
}
|
||||
|
@ -1099,7 +1099,7 @@ func (o *Object) readXactID(ctx context.Context) (xactID string, err error) {
|
|||
|
||||
switch o.f.opt.MetaFormat {
|
||||
case "simplejson":
|
||||
if data != nil && len(data) > maxMetadataSizeWritten {
|
||||
if len(data) > maxMetadataSizeWritten {
|
||||
return "", nil // this was likely not a metadata object, return empty xactID but don't throw error
|
||||
}
|
||||
var metadata metaSimpleJSON
|
||||
|
@ -1214,7 +1214,7 @@ func (f *Fs) put(
|
|||
// and skips the "EOF" read. Hence, switch to next limit here.
|
||||
if !(c.chunkLimit == 0 || c.chunkLimit == c.chunkSize || c.sizeTotal == -1 || c.done) {
|
||||
silentlyRemove(ctx, chunk)
|
||||
return nil, fmt.Errorf("Destination ignored %d data bytes", c.chunkLimit)
|
||||
return nil, fmt.Errorf("destination ignored %d data bytes", c.chunkLimit)
|
||||
}
|
||||
c.chunkLimit = c.chunkSize
|
||||
|
||||
|
@ -1223,7 +1223,7 @@ func (f *Fs) put(
|
|||
|
||||
// Validate uploaded size
|
||||
if c.sizeTotal != -1 && c.readCount != c.sizeTotal {
|
||||
return nil, fmt.Errorf("Incorrect upload size %d != %d", c.readCount, c.sizeTotal)
|
||||
return nil, fmt.Errorf("incorrect upload size %d != %d", c.readCount, c.sizeTotal)
|
||||
}
|
||||
|
||||
// Check for input that looks like valid metadata
|
||||
|
@ -1260,7 +1260,7 @@ func (f *Fs) put(
|
|||
sizeTotal += chunk.Size()
|
||||
}
|
||||
if sizeTotal != c.readCount {
|
||||
return nil, fmt.Errorf("Incorrect chunks size %d != %d", sizeTotal, c.readCount)
|
||||
return nil, fmt.Errorf("incorrect chunks size %d != %d", sizeTotal, c.readCount)
|
||||
}
|
||||
|
||||
// If previous object was chunked, remove its chunks
|
||||
|
@ -2440,7 +2440,7 @@ func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1,
|
|||
func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte) (info *ObjectInfo, madeByChunker bool, err error) {
|
||||
// Be strict about JSON format
|
||||
// to reduce possibility that a random small file resembles metadata.
|
||||
if data != nil && len(data) > maxMetadataSizeWritten {
|
||||
if len(data) > maxMetadataSizeWritten {
|
||||
return nil, false, ErrMetaTooBig
|
||||
}
|
||||
if data == nil || len(data) < 2 || data[0] != '{' || data[len(data)-1] != '}' {
|
||||
|
|
|
@ -12,6 +12,8 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/config/configmap"
|
||||
"github.com/rclone/rclone/fs/fspath"
|
||||
"github.com/rclone/rclone/fs/hash"
|
||||
"github.com/rclone/rclone/fs/object"
|
||||
"github.com/rclone/rclone/fs/operations"
|
||||
|
@ -38,6 +40,30 @@ func testPutLarge(t *testing.T, f *Fs, kilobytes int) {
|
|||
})
|
||||
}
|
||||
|
||||
type settings map[string]interface{}
|
||||
|
||||
func deriveFs(ctx context.Context, t *testing.T, f fs.Fs, path string, opts settings) fs.Fs {
|
||||
fsName := strings.Split(f.Name(), "{")[0] // strip off hash
|
||||
configMap := configmap.Simple{}
|
||||
for key, val := range opts {
|
||||
configMap[key] = fmt.Sprintf("%v", val)
|
||||
}
|
||||
rpath := fspath.JoinRootPath(f.Root(), path)
|
||||
remote := fmt.Sprintf("%s,%s:%s", fsName, configMap.String(), rpath)
|
||||
fixFs, err := fs.NewFs(ctx, remote)
|
||||
require.NoError(t, err)
|
||||
return fixFs
|
||||
}
|
||||
|
||||
var mtime1 = fstest.Time("2001-02-03T04:05:06.499999999Z")
|
||||
|
||||
func testPutFile(ctx context.Context, t *testing.T, f fs.Fs, name, contents, message string, check bool) fs.Object {
|
||||
item := fstest.Item{Path: name, ModTime: mtime1}
|
||||
_, obj := fstests.PutTestContents(ctx, t, f, &item, contents, check)
|
||||
assert.NotNil(t, obj, message)
|
||||
return obj
|
||||
}
|
||||
|
||||
// test chunk name parser
|
||||
func testChunkNameFormat(t *testing.T, f *Fs) {
|
||||
saveOpt := f.opt
|
||||
|
@ -617,22 +643,13 @@ func testMetadataInput(t *testing.T, f *Fs) {
|
|||
}()
|
||||
f.opt.FailHard = false
|
||||
|
||||
modTime := fstest.Time("2001-02-03T04:05:06.499999999Z")
|
||||
|
||||
putFile := func(f fs.Fs, name, contents, message string, check bool) fs.Object {
|
||||
item := fstest.Item{Path: name, ModTime: modTime}
|
||||
_, obj := fstests.PutTestContents(ctx, t, f, &item, contents, check)
|
||||
assert.NotNil(t, obj, message)
|
||||
return obj
|
||||
}
|
||||
|
||||
runSubtest := func(contents, name string) {
|
||||
description := fmt.Sprintf("file with %s metadata", name)
|
||||
filename := path.Join(dir, name)
|
||||
require.True(t, len(contents) > 2 && len(contents) < minChunkForTest, description+" test data is correct")
|
||||
|
||||
part := putFile(f.base, f.makeChunkName(filename, 0, "", ""), "oops", "", true)
|
||||
_ = putFile(f, filename, contents, "upload "+description, false)
|
||||
part := testPutFile(ctx, t, f.base, f.makeChunkName(filename, 0, "", ""), "oops", "", true)
|
||||
_ = testPutFile(ctx, t, f, filename, contents, "upload "+description, false)
|
||||
|
||||
obj, err := f.NewObject(ctx, filename)
|
||||
assert.NoError(t, err, "access "+description)
|
||||
|
@ -844,6 +861,42 @@ func testChunkerServerSideMove(t *testing.T, f *Fs) {
|
|||
_ = operations.Purge(ctx, f.base, dir)
|
||||
}
|
||||
|
||||
// Test that md5all creates metadata even for small files
|
||||
func testMD5AllSlow(t *testing.T, f *Fs) {
|
||||
ctx := context.Background()
|
||||
fsResult := deriveFs(ctx, t, f, "md5all", settings{
|
||||
"chunk_size": "1P",
|
||||
"name_format": "*.#",
|
||||
"hash_type": "md5all",
|
||||
})
|
||||
chunkFs, ok := fsResult.(*Fs)
|
||||
require.True(t, ok, "fs must be a chunker remote")
|
||||
baseFs := chunkFs.base
|
||||
if !baseFs.Features().SlowHash {
|
||||
t.Skipf("this test needs a base fs with slow hash, e.g. local")
|
||||
}
|
||||
|
||||
assert.True(t, chunkFs.useMD5, "must use md5")
|
||||
assert.True(t, chunkFs.hashAll, "must hash all files")
|
||||
|
||||
_ = testPutFile(ctx, t, chunkFs, "file", "-", "error", true)
|
||||
obj, err := chunkFs.NewObject(ctx, "file")
|
||||
require.NoError(t, err)
|
||||
sum, err := obj.Hash(ctx, hash.MD5)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "336d5ebc5436534e61d16e63ddfca327", sum)
|
||||
|
||||
list, err := baseFs.List(ctx, "")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 2, len(list))
|
||||
_, err = baseFs.NewObject(ctx, "file")
|
||||
assert.NoError(t, err, "metadata must be created")
|
||||
_, err = baseFs.NewObject(ctx, "file.1")
|
||||
assert.NoError(t, err, "first chunk must be created")
|
||||
|
||||
require.NoError(t, operations.Purge(ctx, baseFs, ""))
|
||||
}
|
||||
|
||||
// InternalTest dispatches all internal tests
|
||||
func (f *Fs) InternalTest(t *testing.T) {
|
||||
t.Run("PutLarge", func(t *testing.T) {
|
||||
|
@ -876,6 +929,9 @@ func (f *Fs) InternalTest(t *testing.T) {
|
|||
t.Run("ChunkerServerSideMove", func(t *testing.T) {
|
||||
testChunkerServerSideMove(t, f)
|
||||
})
|
||||
t.Run("MD5AllSlow", func(t *testing.T) {
|
||||
testMD5AllSlow(t, f)
|
||||
})
|
||||
}
|
||||
|
||||
var _ fstests.InternalTester = (*Fs)(nil)
|
||||
|
|
Loading…
Reference in a new issue