forked from TrueCloudLab/rclone
vfs: add tests and fix bugs for vfscache.Item
Item - Remove unused method getName - Fix Truncate on unopened file - Fix bug when downloading segments to fill out file on close - Fix bug when WriteAt extends the file and we don't mark space as used downloader - Retry failed waiters every 5 seconds
This commit is contained in:
parent
9ac5c6de14
commit
279a516c53
3 changed files with 675 additions and 19 deletions
|
@ -21,6 +21,8 @@ const (
|
|||
maxDownloaderIdleTime = 5 * time.Second
|
||||
// max number of bytes a reader should skip over before closing it
|
||||
maxSkipBytes = 1024 * 1024
|
||||
// time between background kicks of waiters to pick up errors
|
||||
backgroundKickerInterval = 5 * time.Second
|
||||
)
|
||||
|
||||
// downloaders is a number of downloader~s and a queue of waiters
|
||||
|
@ -28,11 +30,13 @@ const (
|
|||
type downloaders struct {
|
||||
// Write once - no locking required
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
item *Item
|
||||
src fs.Object // source object
|
||||
remote string
|
||||
fcache fs.Fs // destination Fs
|
||||
osPath string
|
||||
wg sync.WaitGroup
|
||||
|
||||
// Read write
|
||||
mu sync.Mutex
|
||||
|
@ -68,14 +72,35 @@ type downloader struct {
|
|||
}
|
||||
|
||||
func newDownloaders(item *Item, fcache fs.Fs, remote string, src fs.Object) (dls *downloaders) {
|
||||
if src == nil {
|
||||
panic("internal error: newDownloaders called with nil src object")
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
dls = &downloaders{
|
||||
ctx: context.Background(),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
item: item,
|
||||
src: src,
|
||||
remote: remote,
|
||||
fcache: fcache,
|
||||
osPath: item.c.toOSPath(remote),
|
||||
}
|
||||
dls.wg.Add(1)
|
||||
go func() {
|
||||
defer dls.wg.Done()
|
||||
ticker := time.NewTicker(backgroundKickerInterval)
|
||||
select {
|
||||
case <-ticker.C:
|
||||
err := dls.kickWaiters()
|
||||
if err != nil {
|
||||
fs.Errorf(dls.src, "Failed to kick waiters: %v", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
break
|
||||
}
|
||||
ticker.Stop()
|
||||
}()
|
||||
|
||||
return dls
|
||||
}
|
||||
|
||||
|
@ -146,6 +171,8 @@ func (dls *downloaders) close(inErr error) (err error) {
|
|||
err = closeErr
|
||||
}
|
||||
}
|
||||
dls.cancel()
|
||||
dls.wg.Wait()
|
||||
dls.dls = nil
|
||||
dls._dispatchWaiters()
|
||||
dls._closeWaiters(inErr)
|
||||
|
@ -296,10 +323,15 @@ func (dls *downloaders) kickWaiters() (err error) {
|
|||
for _, waiter := range dls.waiters {
|
||||
err = dls._ensureDownloader(waiter.r)
|
||||
if err != nil {
|
||||
// Failures here will be retried by background kicker
|
||||
fs.Errorf(dls.src, "Restart download failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if true {
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -32,13 +33,11 @@ import (
|
|||
// NB Item and downloader are tightly linked so it is necessary to
|
||||
// have a total lock ordering between them. downloader.mu must always
|
||||
// be taken before Item.mu. downloader may call into Item but Item may
|
||||
// **not** call downloader methods with Item.mu held, except for
|
||||
//
|
||||
// - downloader.running
|
||||
// **not** call downloader methods with Item.mu held
|
||||
|
||||
// Item is stored in the item map
|
||||
//
|
||||
// These are written to the backing store to store status
|
||||
// The Info field is written to the backing store to store status
|
||||
type Item struct {
|
||||
// read only
|
||||
c *Cache // cache this is part of
|
||||
|
@ -145,13 +144,6 @@ func (item *Item) getATime() time.Time {
|
|||
return item.info.ATime
|
||||
}
|
||||
|
||||
// getName returns the name of the item
|
||||
func (item *Item) getName() string {
|
||||
item.mu.Lock()
|
||||
defer item.mu.Unlock()
|
||||
return item.name
|
||||
}
|
||||
|
||||
// getDiskSize returns the size on disk (approximately) of the item
|
||||
//
|
||||
// We return the sizes of the chunks we have fetched, however there is
|
||||
|
@ -277,6 +269,10 @@ func (item *Item) Truncate(size int64) (err error) {
|
|||
item.mu.Lock()
|
||||
defer item.mu.Unlock()
|
||||
|
||||
if item.fd == nil {
|
||||
return errors.New("vfs cache item truncate: internal error: didn't Open file")
|
||||
}
|
||||
|
||||
// Read old size
|
||||
oldSize, err := item._getSize()
|
||||
if err != nil {
|
||||
|
@ -454,7 +450,9 @@ func (item *Item) Open(o fs.Object) (err error) {
|
|||
item.mu.Lock()
|
||||
|
||||
// Create the downloaders
|
||||
item.downloaders = newDownloaders(item, item.c.fremote, item.name, item.o)
|
||||
if item.o != nil {
|
||||
item.downloaders = newDownloaders(item, item.c.fremote, item.name, item.o)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -466,12 +464,6 @@ func (item *Item) Open(o fs.Object) (err error) {
|
|||
func (item *Item) _store(ctx context.Context, storeFn StoreFn) (err error) {
|
||||
defer log.Trace(item.name, "item=%p", item)("err=%v", &err)
|
||||
|
||||
// Ensure any segments not transferred are brought in
|
||||
err = item._ensure(0, item.info.Size)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "vfs cache: failed to download missing parts of cache file")
|
||||
}
|
||||
|
||||
// Transfer the temp file to the remote
|
||||
cacheObj, err := item.c.fcache.NewObject(ctx, item.name)
|
||||
if err != nil && err != fs.ErrorObjectNotFound {
|
||||
|
@ -535,6 +527,19 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
|
|||
// Update the size on close
|
||||
_, _ = item._getSize()
|
||||
|
||||
// If the file is dirty ensure any segments not transferred
|
||||
// are brought in first.
|
||||
//
|
||||
// FIXME It would be nice to do this asynchronously howeve it
|
||||
// would require keeping the downloaders alive after the item
|
||||
// has been closed
|
||||
if item.info.Dirty && item.o != nil {
|
||||
err = item._ensure(0, item.info.Size)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "vfs cache: failed to download missing parts of cache file")
|
||||
}
|
||||
}
|
||||
|
||||
// Accumulate and log errors
|
||||
checkErr := func(e error) {
|
||||
if e != nil {
|
||||
|
@ -599,6 +604,8 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
|
|||
|
||||
// reload is called with valid items recovered from a cache reload.
|
||||
//
|
||||
// If they are dirty then it makes sure they get uploaded
|
||||
//
|
||||
// it is called before the cache has started so opens will be 0 and
|
||||
// metaDirty will be false.
|
||||
func (item *Item) reload(ctx context.Context) error {
|
||||
|
@ -729,6 +736,13 @@ func (item *Item) _present() bool {
|
|||
return item.info.Rs.Present(ranges.Range{Pos: 0, Size: item.info.Size})
|
||||
}
|
||||
|
||||
// present returns true if the whole file has been downloaded
|
||||
func (item *Item) present() bool {
|
||||
item.mu.Lock()
|
||||
defer item.mu.Unlock()
|
||||
return item._present()
|
||||
}
|
||||
|
||||
// hasRange returns true if the current ranges entirely include range
|
||||
func (item *Item) hasRange(r ranges.Range) bool {
|
||||
item.mu.Lock()
|
||||
|
@ -781,6 +795,10 @@ func (item *Item) _ensure(offset, size int64) (err error) {
|
|||
// This is called by the downloader downloading file segments and the
|
||||
// vfs layer writing to the file.
|
||||
//
|
||||
// This doesn't mark the item as Dirty - that the the responsibility
|
||||
// of the caller as we don't know here whether we are adding reads or
|
||||
// writes to the cache file.
|
||||
//
|
||||
// call with lock held
|
||||
func (item *Item) _written(offset, size int64) {
|
||||
defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("")
|
||||
|
@ -835,6 +853,10 @@ func (item *Item) ReadAt(b []byte, off int64) (n int, err error) {
|
|||
item.mu.Unlock()
|
||||
return 0, errors.New("vfs cache item ReadAt: internal error: didn't Open file")
|
||||
}
|
||||
if off < 0 {
|
||||
item.mu.Unlock()
|
||||
return 0, io.EOF
|
||||
}
|
||||
err = item._ensure(off, int64(len(b)))
|
||||
if err != nil {
|
||||
item.mu.Unlock()
|
||||
|
@ -865,6 +887,14 @@ func (item *Item) WriteAt(b []byte, off int64) (n int, err error) {
|
|||
item._dirty()
|
||||
}
|
||||
end := off + int64(n)
|
||||
// Writing off the end of the file so need to make some
|
||||
// zeroes. we do this by showing that we have written to the
|
||||
// new parts of the file.
|
||||
if off > item.info.Size {
|
||||
item._written(item.info.Size, off-item.info.Size)
|
||||
item._dirty()
|
||||
}
|
||||
// Update size
|
||||
if end > item.info.Size {
|
||||
item.info.Size = end
|
||||
}
|
||||
|
|
594
vfs/vfscache/item_test.go
Normal file
594
vfs/vfscache/item_test.go
Normal file
|
@ -0,0 +1,594 @@
|
|||
package vfscache
|
||||
|
||||
// FIXME need to test async writeback here
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fstest"
|
||||
"github.com/rclone/rclone/lib/random"
|
||||
"github.com/rclone/rclone/lib/readers"
|
||||
"github.com/rclone/rclone/vfs/vfscommon"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var zeroes = string(make([]byte, 100))
|
||||
|
||||
func newItemTestCache(t *testing.T) (r *fstest.Run, c *Cache, cleanup func()) {
|
||||
opt := vfscommon.DefaultOpt
|
||||
|
||||
// Disable the cache cleaner as it interferes with these tests
|
||||
opt.CachePollInterval = 0
|
||||
|
||||
// Disable synchronous write
|
||||
opt.WriteBack = 0
|
||||
|
||||
return newTestCacheOpt(t, opt)
|
||||
}
|
||||
|
||||
// Check the object has contents
|
||||
func checkObject(t *testing.T, r *fstest.Run, remote string, contents string) {
|
||||
obj, err := r.Fremote.NewObject(context.Background(), remote)
|
||||
require.NoError(t, err)
|
||||
in, err := obj.Open(context.Background())
|
||||
require.NoError(t, err)
|
||||
buf, err := ioutil.ReadAll(in)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, in.Close())
|
||||
assert.Equal(t, contents, string(buf))
|
||||
}
|
||||
|
||||
func newFileLength(t *testing.T, r *fstest.Run, c *Cache, remote string, length int) (contents string, obj fs.Object, item *Item) {
|
||||
contents = random.String(length)
|
||||
r.WriteObject(context.Background(), remote, contents, time.Now())
|
||||
item, _ = c.get(remote)
|
||||
obj, err := r.Fremote.NewObject(context.Background(), remote)
|
||||
require.NoError(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
func newFile(t *testing.T, r *fstest.Run, c *Cache, remote string) (contents string, obj fs.Object, item *Item) {
|
||||
return newFileLength(t, r, c, remote, 100)
|
||||
}
|
||||
|
||||
func TestItemExists(t *testing.T) {
|
||||
_, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
item, _ := c.get("potato")
|
||||
|
||||
assert.False(t, item.Exists())
|
||||
require.NoError(t, item.Open(nil))
|
||||
assert.True(t, item.Exists())
|
||||
require.NoError(t, item.Close(nil))
|
||||
assert.True(t, item.Exists())
|
||||
item.remove("test")
|
||||
assert.False(t, item.Exists())
|
||||
}
|
||||
|
||||
func TestItemGetSize(t *testing.T) {
|
||||
r, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
item, _ := c.get("potato")
|
||||
require.NoError(t, item.Open(nil))
|
||||
|
||||
size, err := item.GetSize()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(0), size)
|
||||
|
||||
n, err := item.WriteAt([]byte("hello"), 0)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 5, n)
|
||||
|
||||
size, err = item.GetSize()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(5), size)
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
checkObject(t, r, "potato", "hello")
|
||||
}
|
||||
|
||||
func TestItemDirty(t *testing.T) {
|
||||
r, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
item, _ := c.get("potato")
|
||||
require.NoError(t, item.Open(nil))
|
||||
|
||||
assert.Equal(t, false, item.IsDirty())
|
||||
|
||||
n, err := item.WriteAt([]byte("hello"), 0)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 5, n)
|
||||
|
||||
assert.Equal(t, true, item.IsDirty())
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
|
||||
// Sync writeback so expect clean here
|
||||
assert.Equal(t, false, item.IsDirty())
|
||||
|
||||
item.Dirty()
|
||||
|
||||
assert.Equal(t, true, item.IsDirty())
|
||||
checkObject(t, r, "potato", "hello")
|
||||
}
|
||||
|
||||
func TestItemSync(t *testing.T) {
|
||||
_, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
item, _ := c.get("potato")
|
||||
|
||||
require.Error(t, item.Sync())
|
||||
|
||||
require.NoError(t, item.Open(nil))
|
||||
|
||||
require.NoError(t, item.Sync())
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
}
|
||||
|
||||
func TestItemTruncateNew(t *testing.T) {
|
||||
r, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
item, _ := c.get("potato")
|
||||
|
||||
require.Error(t, item.Truncate(0))
|
||||
|
||||
require.NoError(t, item.Open(nil))
|
||||
|
||||
require.NoError(t, item.Truncate(100))
|
||||
|
||||
size, err := item.GetSize()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(100), size)
|
||||
|
||||
// Check the Close callback works
|
||||
callbackCalled := false
|
||||
callback := func(o fs.Object) {
|
||||
callbackCalled = true
|
||||
assert.Equal(t, "potato", o.Remote())
|
||||
assert.Equal(t, int64(100), o.Size())
|
||||
}
|
||||
require.NoError(t, item.Close(callback))
|
||||
assert.True(t, callbackCalled)
|
||||
|
||||
checkObject(t, r, "potato", zeroes)
|
||||
}
|
||||
|
||||
func TestItemTruncateExisting(t *testing.T) {
|
||||
r, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
|
||||
contents, obj, item := newFile(t, r, c, "existing")
|
||||
|
||||
require.Error(t, item.Truncate(40))
|
||||
checkObject(t, r, "existing", contents)
|
||||
|
||||
require.NoError(t, item.Open(obj))
|
||||
|
||||
require.NoError(t, item.Truncate(40))
|
||||
|
||||
require.NoError(t, item.Truncate(60))
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
|
||||
checkObject(t, r, "existing", contents[:40]+zeroes[:20])
|
||||
}
|
||||
|
||||
func TestItemReadAt(t *testing.T) {
|
||||
r, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
|
||||
contents, obj, item := newFile(t, r, c, "existing")
|
||||
buf := make([]byte, 10)
|
||||
|
||||
_, err := item.ReadAt(buf, 10)
|
||||
require.Error(t, err)
|
||||
|
||||
require.NoError(t, item.Open(obj))
|
||||
|
||||
n, err := item.ReadAt(buf, 10)
|
||||
assert.Equal(t, 10, n)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, contents[10:20], string(buf[:n]))
|
||||
|
||||
n, err = item.ReadAt(buf, 95)
|
||||
assert.Equal(t, 5, n)
|
||||
assert.Equal(t, io.EOF, err)
|
||||
assert.Equal(t, contents[95:], string(buf[:n]))
|
||||
|
||||
n, err = item.ReadAt(buf, 1000)
|
||||
assert.Equal(t, 0, n)
|
||||
assert.Equal(t, io.EOF, err)
|
||||
assert.Equal(t, contents[:0], string(buf[:n]))
|
||||
|
||||
n, err = item.ReadAt(buf, -1)
|
||||
assert.Equal(t, 0, n)
|
||||
assert.Equal(t, io.EOF, err)
|
||||
assert.Equal(t, contents[:0], string(buf[:n]))
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
}
|
||||
|
||||
func TestItemWriteAtNew(t *testing.T) {
|
||||
r, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
item, _ := c.get("potato")
|
||||
buf := make([]byte, 10)
|
||||
|
||||
_, err := item.WriteAt(buf, 10)
|
||||
require.Error(t, err)
|
||||
|
||||
require.NoError(t, item.Open(nil))
|
||||
|
||||
assert.Equal(t, int64(0), item.getDiskSize())
|
||||
|
||||
n, err := item.WriteAt([]byte("HELLO"), 10)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 5, n)
|
||||
|
||||
// FIXME we account for the sparse data we've "written" to
|
||||
// disk here so this is actually 5 bytes higher than expected
|
||||
assert.Equal(t, int64(15), item.getDiskSize())
|
||||
|
||||
n, err = item.WriteAt([]byte("THEND"), 20)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 5, n)
|
||||
|
||||
assert.Equal(t, int64(25), item.getDiskSize())
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
|
||||
checkObject(t, r, "potato", zeroes[:10]+"HELLO"+zeroes[:5]+"THEND")
|
||||
}
|
||||
|
||||
func TestItemWriteAtExisting(t *testing.T) {
|
||||
r, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
|
||||
contents, obj, item := newFile(t, r, c, "existing")
|
||||
|
||||
require.NoError(t, item.Open(obj))
|
||||
|
||||
n, err := item.WriteAt([]byte("HELLO"), 10)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 5, n)
|
||||
|
||||
n, err = item.WriteAt([]byte("THEND"), 95)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 5, n)
|
||||
|
||||
n, err = item.WriteAt([]byte("THEVERYEND"), 120)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 10, n)
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
|
||||
checkObject(t, r, "existing", contents[:10]+"HELLO"+contents[15:95]+"THEND"+zeroes[:20]+"THEVERYEND")
|
||||
}
|
||||
|
||||
func TestItemLoadMeta(t *testing.T) {
|
||||
r, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
|
||||
contents, obj, item := newFile(t, r, c, "existing")
|
||||
_ = contents
|
||||
|
||||
// Open the object to create metadata for it
|
||||
require.NoError(t, item.Open(obj))
|
||||
require.NoError(t, item.Close(nil))
|
||||
info := item.info
|
||||
|
||||
// Remove the item from the cache
|
||||
c.mu.Lock()
|
||||
delete(c.item, item.name)
|
||||
c.mu.Unlock()
|
||||
|
||||
// Reload the item so we have to load the metadata
|
||||
item2, _ := c._get("existing")
|
||||
require.NoError(t, item2.Open(obj))
|
||||
info2 := item.info
|
||||
require.NoError(t, item2.Close(nil))
|
||||
|
||||
// Check that the item is different
|
||||
assert.NotEqual(t, item, item2)
|
||||
// ... but the info is the same
|
||||
assert.Equal(t, info, info2)
|
||||
}
|
||||
|
||||
func TestItemReload(t *testing.T) {
|
||||
r, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
|
||||
contents, obj, item := newFile(t, r, c, "existing")
|
||||
_ = contents
|
||||
|
||||
// Open the object to create metadata for it
|
||||
require.NoError(t, item.Open(obj))
|
||||
|
||||
// Make it dirty
|
||||
n, err := item.WriteAt([]byte("THEENDMYFRIEND"), 95)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 14, n)
|
||||
assert.True(t, item.IsDirty())
|
||||
|
||||
// Close the file to pacify Windows, but don't call item.Close()
|
||||
item.mu.Lock()
|
||||
require.NoError(t, item.fd.Close())
|
||||
item.fd = nil
|
||||
item.mu.Unlock()
|
||||
|
||||
// Remove the item from the cache
|
||||
c.mu.Lock()
|
||||
delete(c.item, item.name)
|
||||
c.mu.Unlock()
|
||||
|
||||
// Reload the item so we have to load the metadata and restart
|
||||
// the transfer
|
||||
item2, _ := c._get("existing")
|
||||
require.NoError(t, item2.reload(context.Background()))
|
||||
assert.False(t, item2.IsDirty())
|
||||
|
||||
// Check that the item is different
|
||||
assert.NotEqual(t, item, item2)
|
||||
|
||||
// And check the contents got written back to the remote
|
||||
checkObject(t, r, "existing", contents[:95]+"THEENDMYFRIEND")
|
||||
}
|
||||
|
||||
func TestItemReloadRemoteGone(t *testing.T) {
|
||||
r, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
|
||||
contents, obj, item := newFile(t, r, c, "existing")
|
||||
_ = contents
|
||||
|
||||
// Open the object to create metadata for it
|
||||
require.NoError(t, item.Open(obj))
|
||||
|
||||
size, err := item.GetSize()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(100), size)
|
||||
|
||||
// Read something to instantiate the cache file
|
||||
buf := make([]byte, 10)
|
||||
_, err = item.ReadAt(buf, 10)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Test cache file present
|
||||
_, err = os.Stat(item.c.toOSPath(item.name))
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
|
||||
// Remove the remote object
|
||||
require.NoError(t, obj.Remove(context.Background()))
|
||||
|
||||
// Re-open with no object
|
||||
require.NoError(t, item.Open(nil))
|
||||
|
||||
// Check size is now 0
|
||||
size, err = item.GetSize()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(0), size)
|
||||
|
||||
// Test cache file is now empty
|
||||
fi, err := os.Stat(item.c.toOSPath(item.name))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(0), fi.Size())
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
}
|
||||
|
||||
func TestItemReloadCacheStale(t *testing.T) {
|
||||
r, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
|
||||
contents, obj, item := newFile(t, r, c, "existing")
|
||||
|
||||
// Open the object to create metadata for it
|
||||
require.NoError(t, item.Open(obj))
|
||||
|
||||
size, err := item.GetSize()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(100), size)
|
||||
|
||||
// Read something to instantiate the cache file
|
||||
buf := make([]byte, 10)
|
||||
_, err = item.ReadAt(buf, 10)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Test cache file present
|
||||
_, err = os.Stat(item.c.toOSPath(item.name))
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
|
||||
// Update the remote to something different
|
||||
contents2, obj, item := newFileLength(t, r, c, "existing", 110)
|
||||
assert.NotEqual(t, contents, contents2)
|
||||
|
||||
// Re-open with updated object
|
||||
require.NoError(t, item.Open(obj))
|
||||
|
||||
// Check size is now 110
|
||||
size, err = item.GetSize()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(110), size)
|
||||
|
||||
// Test cache file is now correct size
|
||||
fi, err := os.Stat(item.c.toOSPath(item.name))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(110), fi.Size())
|
||||
|
||||
// Write to the file to make it dirty
|
||||
// This checks we aren't re-using stale data
|
||||
n, err := item.WriteAt([]byte("HELLO"), 0)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 5, n)
|
||||
assert.Equal(t, true, item.IsDirty())
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
|
||||
// Now check with all that swizzling stuff around that the
|
||||
// object is correct
|
||||
|
||||
checkObject(t, r, "existing", "HELLO"+contents2[5:])
|
||||
}
|
||||
|
||||
func TestItemReadWrite(t *testing.T) {
|
||||
r, c, cleanup := newItemTestCache(t)
|
||||
defer cleanup()
|
||||
const (
|
||||
size = 50*1024*1024 + 123
|
||||
fileName = "large"
|
||||
)
|
||||
|
||||
item, _ := c.get(fileName)
|
||||
require.NoError(t, item.Open(nil))
|
||||
|
||||
// Create the test file
|
||||
in := readers.NewPatternReader(size)
|
||||
buf := make([]byte, 1024*1024)
|
||||
buf2 := make([]byte, 1024*1024)
|
||||
offset := int64(0)
|
||||
for {
|
||||
n, err := in.Read(buf)
|
||||
n2, err2 := item.WriteAt(buf[:n], offset)
|
||||
offset += int64(n2)
|
||||
require.NoError(t, err2)
|
||||
require.Equal(t, n, n2)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Check it is the right size
|
||||
readSize, err := item.GetSize()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(size), readSize)
|
||||
|
||||
require.NoError(t, item.Close(nil))
|
||||
|
||||
assert.False(t, item.remove(fileName))
|
||||
|
||||
obj, err := r.Fremote.NewObject(context.Background(), fileName)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(size), obj.Size())
|
||||
|
||||
// read and check a block of size N at offset
|
||||
// It returns eof true if the end of file has been reached
|
||||
readCheckBuf := func(t *testing.T, in io.ReadSeeker, buf, buf2 []byte, item *Item, offset int64, N int) (n int, eof bool) {
|
||||
what := fmt.Sprintf("buf=%p, buf2=%p, item=%p, offset=%d, N=%d", buf, buf2, item, offset, N)
|
||||
n, err := item.ReadAt(buf, offset)
|
||||
|
||||
_, err2 := in.Seek(offset, io.SeekStart)
|
||||
require.NoError(t, err2, what)
|
||||
n2, err2 := in.Read(buf2[:n])
|
||||
require.Equal(t, n, n2, what)
|
||||
assert.Equal(t, buf[:n], buf2[:n2], what)
|
||||
assert.Equal(t, buf[:n], buf2[:n2], what)
|
||||
|
||||
if err == io.EOF {
|
||||
return n, true
|
||||
}
|
||||
require.NoError(t, err, what)
|
||||
require.NoError(t, err2, what)
|
||||
return n, false
|
||||
}
|
||||
readCheck := func(t *testing.T, item *Item, offset int64, N int) (n int, eof bool) {
|
||||
return readCheckBuf(t, in, buf, buf2, item, offset, N)
|
||||
}
|
||||
|
||||
// Read it back sequentially
|
||||
t.Run("Sequential", func(t *testing.T) {
|
||||
require.NoError(t, item.Open(obj))
|
||||
assert.False(t, item.present())
|
||||
offset := int64(0)
|
||||
for {
|
||||
n, eof := readCheck(t, item, offset, len(buf))
|
||||
offset += int64(n)
|
||||
if eof {
|
||||
break
|
||||
}
|
||||
}
|
||||
assert.Equal(t, int64(size), offset)
|
||||
require.NoError(t, item.Close(nil))
|
||||
assert.False(t, item.remove(fileName))
|
||||
})
|
||||
|
||||
// Read it back randomly
|
||||
t.Run("Random", func(t *testing.T) {
|
||||
require.NoError(t, item.Open(obj))
|
||||
assert.False(t, item.present())
|
||||
for !item.present() {
|
||||
blockSize := rand.Intn(len(buf))
|
||||
offset := rand.Int63n(size+2*int64(blockSize)) - int64(blockSize)
|
||||
if offset < 0 {
|
||||
offset = 0
|
||||
}
|
||||
_, _ = readCheck(t, item, offset, blockSize)
|
||||
}
|
||||
require.NoError(t, item.Close(nil))
|
||||
assert.False(t, item.remove(fileName))
|
||||
})
|
||||
|
||||
// Read it back randomly concurently
|
||||
t.Run("RandomConcurrent", func(t *testing.T) {
|
||||
require.NoError(t, item.Open(obj))
|
||||
assert.False(t, item.present())
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 8; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
in := readers.NewPatternReader(size)
|
||||
buf := make([]byte, 1024*1024)
|
||||
buf2 := make([]byte, 1024*1024)
|
||||
for !item.present() {
|
||||
blockSize := rand.Intn(len(buf))
|
||||
offset := rand.Int63n(size+2*int64(blockSize)) - int64(blockSize)
|
||||
if offset < 0 {
|
||||
offset = 0
|
||||
}
|
||||
_, _ = readCheckBuf(t, in, buf, buf2, item, offset, blockSize)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
require.NoError(t, item.Close(nil))
|
||||
assert.False(t, item.remove(fileName))
|
||||
})
|
||||
|
||||
// Read it back in reverse which creates the maximum number of
|
||||
// downloaders
|
||||
t.Run("Reverse", func(t *testing.T) {
|
||||
require.NoError(t, item.Open(obj))
|
||||
assert.False(t, item.present())
|
||||
offset := int64(size)
|
||||
for {
|
||||
blockSize := len(buf)
|
||||
offset -= int64(blockSize)
|
||||
if offset < 0 {
|
||||
offset = 0
|
||||
blockSize += int(offset)
|
||||
}
|
||||
_, _ = readCheck(t, item, offset, blockSize)
|
||||
if offset == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
require.NoError(t, item.Close(nil))
|
||||
assert.False(t, item.remove(fileName))
|
||||
})
|
||||
}
|
Loading…
Reference in a new issue