diff --git a/.forgejo/workflows/tests.yml b/.forgejo/workflows/tests.yml
index 496df944c..5bfa28b88 100644
--- a/.forgejo/workflows/tests.yml
+++ b/.forgejo/workflows/tests.yml
@@ -44,7 +44,7 @@ jobs:
           AIO_VERSION: 1.7.0-nightly.4
           RCLONE_CONFIG: /config/rclone.conf
-        # run only tests related to FrostFS backend
+        # run only tests related to FrostFS backend and the extract command
         run: |-
           podman-service.sh
           podman info
@@ -64,4 +64,5 @@ jobs:
           docker cp aio:/config/user-wallet.json /config/wallet.json
           echo "Start tests"
-          go test -v github.com/rclone/rclone/backend/frostfs
\ No newline at end of file
+          go test -v github.com/rclone/rclone/backend/frostfs
+          go test -v -cover --race ./fs/extract/...
\ No newline at end of file
diff --git a/cmd/all/all.go b/cmd/all/all.go
index 37b15c5c3..34c3d68f9 100644
--- a/cmd/all/all.go
+++ b/cmd/all/all.go
@@ -23,6 +23,7 @@ import (
 	_ "github.com/rclone/rclone/cmd/dedupe"
 	_ "github.com/rclone/rclone/cmd/delete"
 	_ "github.com/rclone/rclone/cmd/deletefile"
+	_ "github.com/rclone/rclone/cmd/extract"
 	_ "github.com/rclone/rclone/cmd/genautocomplete"
 	_ "github.com/rclone/rclone/cmd/gendocs"
 	_ "github.com/rclone/rclone/cmd/gitannex"
diff --git a/cmd/extract/extract.go b/cmd/extract/extract.go
new file mode 100644
index 000000000..2af96f4ee
--- /dev/null
+++ b/cmd/extract/extract.go
@@ -0,0 +1,62 @@
+// Package extract provides the extract command.
+package extract
+
+import (
+	"context"
+	"errors"
+	"strings"
+
+	"github.com/rclone/rclone/cmd"
+	"github.com/rclone/rclone/fs/config/flags"
+	"github.com/rclone/rclone/fs/extract"
+	"github.com/spf13/cobra"
+)
+
+var (
+	maxFileSizeForCache = 0x100000 // 1 MiB
+)
+
+func init() {
+	cmd.Root.AddCommand(commandDefinition)
+	cmdFlags := commandDefinition.Flags()
+	flags.IntVarP(cmdFlags,
+		&maxFileSizeForCache,
+		"max-file-size-for-cache",
+		"",
+		maxFileSizeForCache,
+		`Maximum file size that can be placed in the cache. Larger files
+are read without caching, which prevents parallelization of copy
+operations and may result in longer archive unpacking times.`,
+		"")
+}
+
+var commandDefinition = &cobra.Command{
+	Use:   "extract source:path/to/archive.tar.gz dest:path",
+	Short: `Extract a tar.gz archive from source to dest, skipping identical files.`,
+	// Note: "|" will be replaced by backticks below
+	Long: strings.ReplaceAll(`Extract the contents of a tar.gz archive from the source to the
+destination. Does not transfer files that are identical on source and
+destination, testing by size and modification time or MD5SUM. Doesn't
+delete files from the destination.
+
+If dest:path doesn't exist, it is created and the contents of the
+archive source:path/to/archive.tar.gz go there.
+
+**Note**: Use the |-P|/|--progress| flag to view real-time transfer statistics.
+
+**Note**: Use the |--dry-run| or the |--interactive|/|-i| flag to test without copying anything.
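+
+For example, assuming |remote| is a configured remote, the following unpacks
+the archive into |remote:restored|, creating it if necessary:
+
+    rclone extract remote:backups/archive.tar.gz remote:restored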
+`, "|", "`"),
+	Annotations: map[string]string{
+		"groups": "Important",
+	},
+	Run: func(command *cobra.Command, args []string) {
+		cmd.CheckArgs(2, 2, command, args)
+		fsrc, srcFileName, fdst := cmd.NewFsSrcFileDst(args)
+		cmd.Run(true, true, command, func() error {
+			if srcFileName == "" {
+				return errors.New("the source is required to be a file")
+			}
+			return extract.Extract(context.Background(), fdst, fsrc, srcFileName, maxFileSizeForCache)
+		})
+	},
+}
diff --git a/fs/extract/dirtreeholder.go b/fs/extract/dirtreeholder.go
new file mode 100644
index 000000000..ccf52f79e
--- /dev/null
+++ b/fs/extract/dirtreeholder.go
@@ -0,0 +1,267 @@
+package extract
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+	"sync"
+
+	"github.com/rclone/rclone/fs"
+)
+
+// Dir represents a directory with objects.
+type Dir struct {
+	name string
+	objs map[string]fs.DirEntry
+	dirs map[string]*Dir
+}
+
+// NewDir creates a new directory.
+func NewDir(name string) *Dir {
+	return &Dir{
+		name: name,
+		objs: make(map[string]fs.DirEntry),
+		dirs: make(map[string]*Dir),
+	}
+}
+
+// Name returns the name of the directory.
+func (d *Dir) Name() string {
+	return d.name
+}
+
+// GetDirByName returns the subdirectory with the given name.
+func (d *Dir) GetDirByName(name string) *Dir {
+	if d.name == name {
+		return d
+	}
+	if len(name) <= len(d.name) || !strings.HasPrefix(name, d.name) {
+		return nil
+	}
+	if d.name != "" && name[len(d.name)] != '/' {
+		return nil
+	}
+	idx := strings.Index(name[len(d.name)+1:], "/")
+	nextDir := name
+	if idx != -1 {
+		nextDir = name[:idx+len(d.name)+1]
+	}
+	if td, ok := d.dirs[nextDir]; ok {
+		return td.GetDirByName(name)
+	}
+	return nil
+}
+
+// AddChildDir adds a child directory.
+func (d *Dir) AddChildDir(child *Dir) {
+	d.dirs[child.name] = child
+}
+
+// AddChildObject adds information about the child object.
+func (d *Dir) AddChildObject(child fs.DirEntry) {
+	d.objs[child.Remote()] = child
+}
+
+type listDirFn func(dir string) (*Dir, error)
+
+type dirTreeHolder struct {
+	listF      listDirFn
+	mu         sync.Mutex
+	root       *Dir
+	dirsInWork map[string]*sync.WaitGroup
+}
+
+type strDirIter struct {
+	dir string
+	idx int
+}
+
+func newStrDirIter(dir string) *strDirIter {
+	return &strDirIter{
+		dir: dir,
+		idx: -1,
+	}
+}
+
+func (s *strDirIter) next() (cur string, next string) {
+	if s.dir == "" {
+		return "", ""
+	}
+	if s.idx == -1 {
+		s.idx = 0
+		idx := strings.Index(s.dir, "/")
+		if idx == -1 {
+			return "", s.dir
+		}
+		return "", s.dir[:idx]
+	}
+	idx := strings.Index(s.dir[s.idx:], "/")
+	if idx == -1 {
+		return s.dir, ""
+	}
+	defer func(idx int) {
+		s.idx = s.idx + idx + 1
+	}(idx)
+
+	idx2 := strings.Index(s.dir[s.idx+idx+1:], "/")
+	if idx2 == -1 {
+		return s.dir[:idx+s.idx], s.dir
+	}
+	return s.dir[:idx+s.idx], s.dir[:idx+idx2+s.idx+1]
+}
+
+func (d *Dir) getObjectByPath(path string) fs.DirEntry {
+	var dirName string
+	idx := strings.LastIndex(path, "/")
+	if idx != -1 {
+		dirName = path[:idx]
+	}
+	dir := d.GetDirByName(dirName)
+	if dir == nil {
+		return nil
+	}
+	return dir.objs[path]
+}
+
+func newDirStructHolder(listF listDirFn) *dirTreeHolder {
+	return &dirTreeHolder{
+		listF:      listF,
+		root:       nil,
+		dirsInWork: make(map[string]*sync.WaitGroup),
+	}
+}
+
+func (l *dirTreeHolder) getObjectByPath(path string) fs.DirEntry {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	if l.root == nil {
+		return nil
+	}
+	return l.root.getObjectByPath(path)
+}
+
+func (l *dirTreeHolder) isDirExist(path string) bool {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	if l.root == nil {
+		return false
+	}
+	return l.root.GetDirByName(path) != nil
+}
+
+func (l *dirTreeHolder) updateDirList(ctx context.Context, dir string) error {
+	l.mu.Lock()
+	iter := newStrDirIter(dir)
+	aborting := func() bool {
+		select {
+		case <-ctx.Done():
+			return true
+		default:
+			return false
+		}
+	}
+
+	// Special case: even if an empty dir is passed, listing the root folder is still required
+	firstRun := true
+	for {
+		if aborting() {
+			l.mu.Unlock()
+			return ctx.Err()
+		}
+		curDir, nextDir := iter.next()
+		if nextDir == "" && !firstRun {
+			l.mu.Unlock()
+			return nil
+		}
+		firstRun = false
+		if wg, ok := l.dirsInWork[curDir]; ok {
+			l.mu.Unlock()
+			wg.Wait()
+			if aborting() {
+				return ctx.Err()
+			}
+			l.mu.Lock()
+		}
+		if curDir == "" && l.root == nil {
+			if wg, ok := l.dirsInWork[curDir]; ok {
+				l.mu.Unlock()
+				wg.Wait()
+				if aborting() {
+					return ctx.Err()
+				}
+				l.mu.Lock()
+				dir := l.root.GetDirByName(curDir)
+				if dir == nil {
+					l.mu.Unlock()
+					return fmt.Errorf("error while updating info about dir %s", curDir)
+				}
+			} else {
+				wg := &sync.WaitGroup{}
+				wg.Add(1)
+				l.dirsInWork[curDir] = wg
+				l.mu.Unlock()
+				d, err := l.listF(curDir)
+				if err != nil {
+					if !errors.Is(err, fs.ErrorDirNotFound) {
+						// Wake up any waiters before returning the error
+						l.mu.Lock()
+						delete(l.dirsInWork, curDir)
+						l.mu.Unlock()
+						wg.Done()
+						return err
+					}
+					d = NewDir(curDir)
+				}
+				l.mu.Lock()
+				l.root = d
+				wg = l.dirsInWork[curDir]
+				delete(l.dirsInWork, curDir)
+				wg.Done()
+			}
+		}
+		d := l.root.GetDirByName(curDir)
+		if d == nil {
+			l.mu.Unlock()
+			return fmt.Errorf("internal error: directory %q not found in the tree", curDir)
+		}
+		if _, ok := d.dirs[nextDir]; ok {
+			continue
+		}
+		if o, ok := d.objs[nextDir]; ok {
+			// The entry exists but is not a directory, so there is nothing to descend into
+			if _, ok := o.(fs.Directory); !ok {
+				l.mu.Unlock()
+				return nil
+			}
+
+			if wg, ok := l.dirsInWork[nextDir]; ok {
+				l.mu.Unlock()
+				wg.Wait()
+				if aborting() {
+					return ctx.Err()
+				}
+				l.mu.Lock()
+				dir := d.GetDirByName(nextDir)
+				if dir == nil {
+					l.mu.Unlock()
+					return fmt.Errorf("error while updating info about dir %s", nextDir)
+				}
+			} else {
+				wg := &sync.WaitGroup{}
+				wg.Add(1)
+				l.dirsInWork[nextDir] = wg
+				l.mu.Unlock()
+				td, err := l.listF(nextDir)
+				if err != nil {
+					// Wake up any waiters before returning the error
+					l.mu.Lock()
+					delete(l.dirsInWork, nextDir)
+					l.mu.Unlock()
+					wg.Done()
+					return err
+				}
+				l.mu.Lock()
+				d.dirs[nextDir] = td
+				wg = l.dirsInWork[nextDir]
+				delete(l.dirsInWork, nextDir)
+				wg.Done()
+			}
+		} else {
+			l.mu.Unlock()
+			return nil
+		}
+	}
+}
diff --git a/fs/extract/dirtreeholder_test.go b/fs/extract/dirtreeholder_test.go
new file mode 100644
index 000000000..5f618c2e4
--- /dev/null
+++ b/fs/extract/dirtreeholder_test.go
@@ -0,0 +1,371 @@
+package extract
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fs/extract/tarcache/tartest"
+	"github.com/rclone/rclone/fs/hash"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+)
+
+type dirEntryForTest struct {
+	name string
+}
+
+func (d dirEntryForTest) Fs() fs.Info {
+	return nil
+}
+
+func (d dirEntryForTest) String() string {
+	return d.name
+}
+
+func (d dirEntryForTest) Remote() string {
+	return d.name
+}
+
+func (d dirEntryForTest) ModTime(context.Context) time.Time {
+	return time.Now()
+}
+
+func (d dirEntryForTest) Size() int64 {
+	return -1
+}
+
+type dirForTest struct {
+	dirEntryForTest
+}
+
+func (d dirForTest) Items() int64 {
+	return -1
+}
+
+func (d dirForTest) ID() string {
+	return ""
+}
+
+type objectForTest struct {
+	dirEntryForTest
+}
+
+func (o objectForTest) Hash(context.Context, hash.Type) (string, error) {
+	return "", nil
+}
+
+func (o objectForTest) Storable() bool {
+	return true
+}
+
+func createDirectory(dirName string) fs.DirEntry {
+	return &dirForTest{
+		dirEntryForTest{name: dirName},
+	}
+}
+
+func createObject(dirName string) fs.DirEntry {
+	return &objectForTest{
+		dirEntryForTest{name: dirName},
+	}
+}
+
+func createDirTree(dir *Dir, deep, minItemsInDir, maxItemsInDir int) {
+	if deep <= 0 {
+		return
+	}
+	rnd := tartest.NewRandomizer()
+
+	nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir
+
+	for j := 0; j < nItem; j++ {
+		isFile := deep == 1 || rnd.Dice(50)
+		var prefix string
+		if len(dir.name) > 0 {
+			prefix = dir.name + "/"
+		}
+		if isFile {
+			name := fmt.Sprintf("%sfile_%s", prefix, rnd.RandomFileName())
+			newItem := createObject(name)
+			dir.objs[name] = newItem
+		} else {
+			name := fmt.Sprintf("%sdir_%s", prefix, rnd.RandomFileName())
+			newItem := createDirectory(name)
+			dir.objs[name] = newItem
+			childDir := NewDir(name)
+			createDirTree(childDir, deep-1, minItemsInDir, maxItemsInDir)
+			if len(childDir.dirs) != 0 || len(childDir.objs) != 0 {
+				dir.dirs[childDir.name] = childDir
+			}
+		}
+	}
+}
+
+type listFnHelper struct {
+	mu   sync.Mutex
+	cnt  atomic.Uint64
+	root *Dir
+}
+
+func (l *listFnHelper) Cnt() uint64 {
+	return l.cnt.Load()
+}
+
+func (l *listFnHelper) ResetCnt() {
+	l.cnt.Store(0)
+}
+
+func (l *listFnHelper) listDir(dir string) (*Dir, error) {
+	l.cnt.Add(1)
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	d := l.root.GetDirByName(dir)
+	if d == nil {
+		return nil, os.ErrNotExist
+	}
+	newDir := NewDir(d.name)
+	for _, child := range d.objs {
+		newDir.AddChildObject(child)
+	}
+	return newDir, nil
+}
+
+func fillSliceWithDirsInfo(s *[]string, dir *Dir) {
+	for path, child := range dir.objs {
+		if _, ok := child.(fs.Directory); ok {
+			*s = append(*s, path)
+		}
+	}
+	for _, child := range dir.dirs {
+		fillSliceWithDirsInfo(s, child)
+	}
+}
+
+func fillMapWithFilesInfo(m map[string][]string, dir *Dir) {
+	files := make([]string, 0)
+	for path, child := range dir.objs {
+		if _, ok := child.(fs.ObjectInfo); ok {
+			files = append(files, filepath.Base(path))
+		}
+	}
+
+	m[dir.Name()] = files
+
+	for _, child := range dir.dirs {
+		fillMapWithFilesInfo(m, child)
+	}
+}
+
+func TestListing(t *testing.T) {
+	t.Run("Dir struct holder", testDirTreeHolder)
+
+	t.Run("Dir", testDir)
+
+	t.Run("String directory iterating", testStrDirIter)
+}
+
+func testDirTreeHolder(t *testing.T) {
+	root := NewDir("")
+	createDirTree(root, 6, 5, 10)
+	listF := &listFnHelper{
+		root: root,
+	}
+	t.Run("Concurrent listing", func(t *testing.T) {
+		s := make([]string, 0)
+		m := make(map[string][]string)
+		fillSliceWithDirsInfo(&s, root)
+		fillMapWithFilesInfo(m, root)
+		sort.Slice(s, func(i, j int) bool {
+			return len(s[i]) < len(s[j])
+		})
+		require.NotNil(t, root)
+		holder := newDirStructHolder(listF.listDir)
+		halfLen := len(s) / 2
+		path := s[halfLen]
+		expectedCnt := 0
+		dIter := newStrDirIter(path)
+		for {
+			expectedCnt++
+			_, next := dIter.next()
+			if next == "" {
+				break
+			}
+		}
+		require.NotNil(t, holder)
+
+		eg, ctx := errgroup.WithContext(context.Background())
+		eg.Go(func() error {
+			return holder.updateDirList(ctx, path)
+		})
+		eg.Go(func() error {
+			return holder.updateDirList(ctx, path)
+		})
+		eg.Go(func() error {
+			return holder.updateDirList(ctx, path)
+		})
+		eg.Go(func() error {
+			return holder.updateDirList(ctx, path+"not.exists")
+		})
+		err := eg.Wait()
+		require.NoError(t, err)
+		require.Equal(t, uint64(expectedCnt), listF.Cnt())
+		dIter = newStrDirIter(path)
+		var cur, next string
+		next = "1"
+		for next != "" {
+			cur, next = dIter.next()
+			if next == "" {
+				break
+			}
+			require.True(t, holder.isDirExist(cur))
+			files, ok := m[cur]
+			require.True(t, ok)
+			for _, file := range files {
+				var filePath string
+				if cur == "" {
+					filePath = file
+				} else {
+					filePath = cur + "/" + file
+				}
+				obj := holder.getObjectByPath(filePath)
+				require.NotNil(t, obj)
+
+				require.True(t, ok)
+				require.Equal(t, filePath, obj.Remote())
+			}
+		}
+	})
+}
+
+func testDir(t *testing.T) {
+	finalDir := "path/with/more/than/one/dir"
+	files := map[string][]string{
+		"":                            {"file0.1.ext", "file0.2.ext"},
+		"path":                        {"file1.1.ext", "file1.2.ext"},
+		"path/with/more/than/one":     {"file2.1.ext", "file2.2.ext"},
+		"path/with/more/than/one/dir": {"file3.1.ext", "file3.2.ext"},
+	}
+
+	dirIter := newStrDirIter(finalDir)
+	var root, prevDir *Dir
+	next := "1"
+	for next != "" {
+		var cur string
+		cur, next = dirIter.next()
+		dir := NewDir(cur)
+		if root == nil {
+			root = dir
+		}
+		if files, ok := files[cur]; ok {
+			for _, file := range files {
+				var path string
+				if cur == "" {
+					path = file
+				} else {
+					path = cur + "/" + file
+				}
+				dir.AddChildObject(createObject(path))
+			}
+		}
+		if prevDir != nil {
+			prevDir.AddChildDir(dir)
+			prevDir.AddChildObject(createDirectory(dir.Name()))
+		}
+		prevDir = dir
+	}
+
+	t.Run("GetDirByName", func(t *testing.T) {
+		dirIter = newStrDirIter(finalDir)
+		next = "1"
+		cnt := 0
+		for next != "" {
+			var cur string
+			cur, next = dirIter.next()
+			dir := root.GetDirByName(cur)
+			require.NotNil(t, dir)
+			cnt++
+		}
+		require.Equal(t, 7, cnt)
+	})
+
+	t.Run("getObjectByPath", func(t *testing.T) {
+		for dirName, fileNames := range files {
+			for _, fileName := range fileNames {
+				var filePath string
+				if dirName == "" {
+					filePath = fileName
+				} else {
+					filePath = dirName + "/" + fileName
+				}
+				obj := root.getObjectByPath(filePath)
+				require.NotNil(t, obj)
+				require.NotNil(t, obj.(fs.ObjectInfo))
+				filePath += ".fail"
+				obj = root.getObjectByPath(filePath)
+				require.Nil(t, obj)
+			}
+		}
+	})
+}
+
+func testStrDirIter(t *testing.T) {
+	for _, tc := range []struct {
+		name string
+		dir  string
+		res  []string
+	}{
+		{
+			name: "path with more than one dir",
+			dir:  "path/with/more/than/one/dir",
+			res: []string{
+				"",
+				"path",
+				"path/with",
+				"path/with/more",
+				"path/with/more/than",
+				"path/with/more/than/one",
+				"path/with/more/than/one/dir",
+				"",
+			},
+		},
+		{
+			name: "path with one dir",
+			dir:  "one_dir",
+			res: []string{
+				"",
+				"one_dir",
+				"",
+			},
+		},
+		{
+			name: "empty path",
+			dir:  "",
+			res: []string{
+				"",
+				"",
+			},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			l := newStrDirIter(tc.dir)
+			for i, p := range tc.res {
+				if p == "" && i > 0 {
+					break
+				}
+				cur, next := l.next()
+				require.Equal(t, cur, p)
+				require.Equal(t, next, tc.res[i+1])
+			}
+		})
+	}
+}
diff --git a/fs/extract/extract.go b/fs/extract/extract.go
new file mode 100644
index 000000000..5e027458e
--- /dev/null
+++ b/fs/extract/extract.go
@@ -0,0 +1,845 @@
+// Package extract is the implementation of extract command
+package extract
+
+import (
+	"archive/tar"
+	"bytes"
+	"compress/gzip"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fs/extract/tarcache"
+	"github.com/rclone/rclone/fs/filter"
+	"github.com/rclone/rclone/fs/hash"
+	"github.com/rclone/rclone/fs/list"
+	"github.com/rclone/rclone/fs/operations"
+)
+
+type payloadType int
+
+const (
+	payloadTypeBytes payloadType = iota
+	payloadTypeIo
+)
+
+// TarCacheFile is a wrapper around tarcache.FileInfo that adds support for file reopening functionality
+type TarCacheFile struct {
+	owner    *TarCache
+	pt       payloadType
+	fileInfo tarcache.FileInfo
+	reader   io.Reader
+}
+
+// Release releases the resources associated with the file
+func (t *TarCacheFile) Release() {
+	t.fileInfo.ReleaseFn()
+}
+
+// Read implements the io.Reader interface
+func (t *TarCacheFile) Read(p []byte) (n int, err error) {
+	return t.reader.Read(p)
+}
+
+// reopenIo implements file reopening for files that don't fit into memory
+func (t *TarCacheFile) reopenIo(ctx context.Context) (err error) {
+	return t.owner.requestReopenFile(ctx, t)
+}
+
+// reopenBytes implements file reopening for files that fit into memory
+func (t *TarCacheFile) reopenBytes(ctx context.Context) (err error) {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+		t.reader = bytes.NewReader(t.fileInfo.Payload)
+		return nil
+	}
+}
+
+// Reopen initializes the file reopening process
+func (t *TarCacheFile) Reopen(ctx context.Context) (err error) {
+	if t.pt == payloadTypeBytes {
+		return t.reopenBytes(ctx)
+	}
+	return t.reopenIo(ctx)
+}
+
+func (t *TarCache) requestReopenFile(ctx context.Context, file *TarCacheFile) error {
+	reopenReq := reopenRequest{
+		file:   file,
+		respCh: make(chan error, 1),
+	}
+
+	select {
+	case t.reopenReqCh <- reopenReq:
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-t.finishCh:
+		return errTarCacheIsClosed
+	}
+
+	select {
+	case err := <-reopenReq.respCh:
+		return err
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-t.finishCh:
+		return errTarCacheIsClosed
+	}
+}
+
+// newTarCacheFile creates a new TarCacheFile from the given tarcache.FileInfo
+func (t *TarCache) newTarCacheFile(info tarcache.FileInfo) *TarCacheFile {
+	res := &TarCacheFile{
+		owner:    t,
+		pt:       payloadTypeIo,
+		fileInfo: info,
+		reader:   info.IoPayload,
+	}
+	// If IoPayload is nil, we assume that the file is small enough to fit into memory
+	if info.IoPayload == nil {
+		res.pt = payloadTypeBytes
+		res.reader = bytes.NewReader(info.Payload)
+	}
+	return res
+}
+
+// TarCache implements the functionality of reopening files within an archive
+type TarCache struct {
+	// parameters
+	fsrc                fs.Fs
+	srcFileName         string
+	newTarReaderFn      newTarReader
+	transfers           int
+	maxFileSizeForCache int
+	options             []fs.OpenOption
+
+	// non-thread-safe internal state
+	closeStreamFn     func() error
+	tarCache          *tarcache.Cache
+	workerCh          chan func()
+	reopenReqCh       chan reopenRequest
+	finishCh          chan struct{}
+	pendingCloseCache []*tarcache.Cache
+
+	// thread-safe internal state
+	mu    sync.Mutex
+	cache map[string]tarcache.FileInfo
+}
+
+func newTarCache(fsrc fs.Fs, srcFileName string, newTarReaderFn newTarReader, transfers int, maxFileSizeForCache int) *TarCache {
+	res := &TarCache{
+		fsrc:                fsrc,
+		srcFileName:         srcFileName,
+		newTarReaderFn:      newTarReaderFn,
+		transfers:           transfers,
+		maxFileSizeForCache: maxFileSizeForCache,
+		workerCh:            make(chan func()),
+		reopenReqCh:         make(chan reopenRequest),
+		finishCh:            make(chan struct{}),
+		cache:               make(map[string]tarcache.FileInfo),
+	}
+
+	if res.newTarReaderFn == nil {
+		res.newTarReaderFn = func(reader io.Reader) tarcache.TarReader {
+			return tar.NewReader(reader)
+		}
+	}
+	// Run a goroutine that will handle functions waiting for file reopen requests
+	go func() {
+		for workFn := range res.workerCh {
+			workFn()
+		}
+	}()
+	return res
+}
+
+// Close closes the tar cache and releases all resources associated with it
+func (t *TarCache) Close() error {
+	close(t.workerCh)
+	err := t.closeHelper()
+	if t.tarCache != nil {
+		_ = t.tarCache.Close()
+	}
+	return err
+}
+
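+// reopenRequest is sent by a TarCacheFile to its owning TarCache when a file
+// that was too large to cache in memory has to be re-read: the TarCache closes
+// the current archive stream, opens a new one and scans forward to the file
+// again (see handleReopenRequest). The outcome is reported through respCh.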
+type reopenRequest struct {
+	file   *TarCacheFile
+	respCh chan error
+}
+
+var errReopenArchiveRequest = errors.New("reopen archive request")
+var errTarCacheIsClosed = errors.New("tar cache is closed")
+
+func (t *TarCache) handleReopenRequest(ctx context.Context, reopenReq reopenRequest) {
+	defer close(reopenReq.respCh)
+	t.pendingCloseCache = append(t.pendingCloseCache, t.tarCache)
+	// Reopening a file in the archive requires closing the currently open archive first
+	_ = t.closeHelper()
+	// Release the resources occupied by the file
+	reopenReq.file.Release()
+	var err error
+	prev := t.tarCache
+	// Close the underlying tarcache.Cache
+	_ = prev.Close()
+	// Create a new tarcache.Cache and a function that closes the underlying gzip reader
+	t.tarCache, t.closeStreamFn, err = t.openHelper(ctx)
+	if err != nil {
+		// Notify the goroutine that requested the file reopening about the error
+		reopenReq.respCh <- err
+		return
+	}
+	// Search for the required file within the archive
+	err = t.FindFile(ctx, reopenReq.file.fileInfo.FileInfo.FilePath)
+	if err == nil {
+		// Create a new TarCacheFile from the found FileInfo
+		file := t.GetFile(reopenReq.file.fileInfo.FileInfo.FilePath)
+		// If the previous call to FindFile completed without errors, then GetFile must return a non-nil TarCacheFile
+		if file == nil {
+			panic("shouldn't get here")
+		}
+		*reopenReq.file = *file
+	}
+	// Notify the goroutine that requested the file reopening about the result
+	reopenReq.respCh <- err
+}
+
+// FindFile searches for the file with the given filePath within the archive
+func (t *TarCache) FindFile(ctx context.Context, filePath string) error {
+	for {
+		select {
+		case <-t.finishCh:
+			return errTarCacheIsClosed
+		default:
+		}
+		// Iterate through the archive files until the target file is found
+		h, err := t.tarCache.Next(ctx)
+		if err != nil {
+			return err
+		}
+		if h.FileInfo.FilePath != filePath {
+			// If the file is not the target, free its resources right away
+			h.ReleaseFn()
+			continue
+		}
+
+		t.mu.Lock()
+		// Store the information about the file in the cache
+		t.cache[h.FileInfo.FilePath] = h
+		t.mu.Unlock()
+		return nil
+	}
+}
+
+// GetFile constructs a TarCacheFile from cached data, relying on the file path
+func (t *TarCache) GetFile(filePath string) *TarCacheFile {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	info, ok := t.cache[filePath]
+	if !ok {
+		return nil
+	}
+	return t.newTarCacheFile(info)
+}
+
+// ReleaseUnprocessedFiles releases resources occupied by files that were not processed
+func (t *TarCache) ReleaseUnprocessedFiles() {
+	t.mu.Lock()
+	// Here we are preparing a list of functions that release resources, because we cannot call them when
+	// iterating over the map, since the ReleaseFn method also uses the same mutex that protects the map.
+	releaseFns := make([]func(), 0, len(t.cache))
+	for _, info := range t.cache {
+		releaseFns = append(releaseFns, info.ReleaseFn)
+	}
+	t.cache = make(map[string]tarcache.FileInfo)
+	t.mu.Unlock()
+	for _, fn := range releaseFns {
+		fn()
+	}
+}
+
+// Next advances to the next file in the archive, caches its information, and returns the file's path within the archive.
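+// The returned path can then be passed to GetFile (via extractFs.NewObject)
+// to obtain a TarCacheFile for reading.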
+// This method also handles file reopen requests within the archive
+func (t *TarCache) Next(ctx context.Context) (string, error) {
+	select {
+	case <-t.finishCh:
+		return "", errTarCacheIsClosed
+	default:
+	}
+	for {
+		watchdog := make(chan struct{})
+		var reopenReq reopenRequest
+		ctx2, cancel := context.WithCancelCause(ctx)
+		var wg sync.WaitGroup
+		wg.Add(1)
+		t.workerCh <- func() {
+			defer wg.Done()
+			select {
+			case <-watchdog:
+			case reopenReq = <-t.reopenReqCh:
+				// When a file reopen request arrives, it is necessary to interrupt the execution of t.tarCache.Next
+				cancel(errReopenArchiveRequest)
+			}
+		}
+		h, err := t.tarCache.Next(ctx2)
+		relFn := h.ReleaseFn
+		// Create a wrapper around the resource release function to additionally remove file information from the cache
+		h.ReleaseFn = func() {
+			t.mu.Lock()
+			delete(t.cache, h.FileInfo.FilePath)
+			t.mu.Unlock()
+			relFn()
+		}
+		// Close the watchdog channel to exit from the watchdog function
+		close(watchdog)
+		// Wait for the watchdog function to complete its execution
+		wg.Wait()
+		for _, cache := range t.pendingCloseCache {
+			_ = cache.Close()
+		}
+		t.pendingCloseCache = nil
+		if err != nil {
+			// When the error is triggered by a file reopen request, process the request accordingly
+			if errors.Is(context.Cause(ctx2), errReopenArchiveRequest) {
+				t.handleReopenRequest(ctx, reopenReq)
+				continue
+			}
+			cancel(nil)
+			close(t.finishCh)
+			if err == io.EOF {
+				return "", nil
+			}
+
+			return "", err
+		}
+		cancel(nil)
+
+		t.mu.Lock()
+		_, ok := t.cache[h.FileInfo.FilePath]
+		if ok {
+			panic("shouldn't get here")
+		}
+		t.cache[h.FileInfo.FilePath] = h
+		t.mu.Unlock()
+		return h.FileInfo.FilePath, nil
+	}
+}
+
+func (t *TarCache) closeHelper() error {
+	if t.closeStreamFn != nil {
+		fn := t.closeStreamFn
+		t.closeStreamFn = nil
+		return fn()
+	}
+	return nil
+}
+
+// openHelper opens a new stream to the tar archive
+func (t *TarCache) openHelper(ctx context.Context) (cache *tarcache.Cache, closeFn func() error, err error) {
+	obj, err := t.fsrc.NewObject(ctx, t.srcFileName)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	reader, err := obj.Open(ctx, t.options...)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	gzr, err := gzip.NewReader(reader)
+	if err != nil {
+		// we don't care about the error, we just need to close the reader
+		_ = reader.Close()
+		return nil, nil, err
+	}
+
+	tarReader := t.newTarReaderFn(gzr)
+	cache = tarcache.NewTarCache(tarReader, t.transfers, int64(t.maxFileSizeForCache))
+	return cache, func() error {
+		gzErr := gzr.Close()
+		readerErr := reader.Close()
+		if gzErr != nil {
+			return gzErr
+		}
+		return readerErr
+	}, nil
+}
+
+// Open opens the tar archive from remote
+func (t *TarCache) Open(ctx context.Context, options ...fs.OpenOption) error {
+	t.options = options
+	var err error
+	t.tarCache, t.closeStreamFn, err = t.openHelper(ctx)
+	return err
+}
+
+type copyJob struct {
+	src  fs.Object
+	dst  fs.Object
+	path string
+}
+
+func makeListDir(ctx context.Context, f fs.Fs) listDirFn {
+	return func(dir string) (*Dir, error) {
+		dirCtx := filter.SetUseFilter(ctx, f.Features().FilterAware) // make filter-aware backends constrain List
+		entities, err := list.DirSorted(dirCtx, f, false, dir)
+		if err != nil {
+			return nil, err
+		}
+		res := NewDir(dir)
+		for _, entry := range entities {
+			res.AddChildObject(entry)
+		}
+		return res, nil
+	}
+}
+
+// extractObj implements fs.Object interface
+type extractObj struct {
+	ownerFs     fs.Fs
+	isFirstOpen atomic.Bool
+	file        *TarCacheFile
+}
+
+func (e *extractFs) newExtractObj(file *TarCacheFile) *extractObj {
+	res := &extractObj{
+		ownerFs: e,
+		file:    file,
+	}
+	res.isFirstOpen.Store(true)
+	return res
+}
+
+// Release releases the resources associated with the object
+func (o *extractObj) Release() {
+	o.file.Release()
+}
+
+// extractFs implements fs.Fs interface
+type extractFs struct {
+	// parameters
+	fsrc        fs.Fs
+	srcFileName string
+	features    *fs.Features
+	// internal state
+	tarCache *TarCache
+
+	mu        sync.Mutex
+	tarFormat tar.Format
+}
+
+type extract struct {
+	fdst                fs.Fs
+	fsrc                fs.Fs
+	extrFs              *extractFs
+	srcFileName         string
+	maxFileSizeForCache int
+	hashType            hash.Type        // common hash to use
+	hashOption          *fs.HashesOption // open option for the common hash
+	// internal state
+	ci                 *fs.ConfigInfo  // global config
+	ctx                context.Context // internal context for controlling go-routines
+	cancel             func()          // cancel the context
+	maxDurationEndTime time.Time       // end time if --max-duration is set
+	toBeChecked        chan string     // checkers channel
+	checkersWg         sync.WaitGroup  // wait for checkers
+	toBeUploaded       chan copyJob    // copiers channel
+	transfersWg        sync.WaitGroup  // wait for transfers
+	newTarReaderFn     newTarReader
+
+	mu        sync.Mutex
+	lastError error
+	errCnt    atomic.Int32
+	dsh       *dirTreeHolder // destination directory tree holder
+}
+
+// Fs returns read only access to the Fs that this object is part of
+func (o *extractObj) Fs() fs.Info {
+	return o.ownerFs
+}
+
+// String returns a description of the Object
+func (o *extractObj) String() string {
+	if o == nil {
+		return ""
+	}
+	return o.Remote()
+}
+
+// Remote returns the remote path
+func (o *extractObj) Remote() string {
+	return o.file.fileInfo.FileInfo.FilePath
+}
+
+// ModTime returns the modification date of the file
+// It should return a best guess if one isn't available
+func (o *extractObj) ModTime(_ context.Context) time.Time {
+	return o.file.fileInfo.FileInfo.ModTime
+}
+
+// Size returns the size of the object
+func (o *extractObj) Size() int64 {
+	return o.file.fileInfo.FileInfo.Size
+}
+
+// Hash returns the selected checksum of the file
+// If no checksum is available it returns ""
+func (o *extractObj) Hash(_ context.Context, _ hash.Type) (string, error) {
+	return "", nil
+}
+
+// Storable says whether this object can be stored
+func (o *extractObj) Storable() bool {
+	return true
+}
+
+// Open opens the file for read. Call Close() on the returned io.ReadCloser
+func (o *extractObj) Open(ctx context.Context, _ ...fs.OpenOption) (io.ReadCloser, error) {
+	if o.isFirstOpen.Load() {
+		o.isFirstOpen.Store(false)
+
+		return io.NopCloser(o.file), nil
+	}
+	err := o.file.Reopen(ctx)
+	return io.NopCloser(o.file), err
+}
+
+func (o *extractObj) SetModTime(context.Context, time.Time) error {
+	panic("not implemented")
+}
+
+func (o *extractObj) Update(context.Context, io.Reader, fs.ObjectInfo, ...fs.OpenOption) error {
+	panic("not implemented")
+}
+
+func (o *extractObj) Remove(context.Context) error {
+	panic("not implemented")
+}
+
+func (e *extractFs) List(context.Context, string) (fs.DirEntries, error) {
+	panic("not implemented")
+}
+
+func (e *extractFs) Put(context.Context, io.Reader, fs.ObjectInfo, ...fs.OpenOption) (fs.Object, error) {
+	panic("not implemented")
+}
+
+func (e *extractFs) Mkdir(context.Context, string) error {
+	panic("not implemented")
+}
+
+func (e *extractFs) Rmdir(context.Context, string) error {
+	panic("not implemented")
+}
+
+func (e *extractFs) Name() string {
+	return "extract"
+}
+
+// Root of the remote
+func (e *extractFs) Root() string {
+	return fmt.Sprintf("%s:%s/%s", e.fsrc.Name(), e.fsrc.Root(), e.srcFileName)
+}
+
+// String returns a description of the FS
+func (e *extractFs) String() string {
+	return fmt.Sprintf("tar.gz archive %s at %s", e.Name(), e.Root())
+}
+
+// Precision of the ModTimes in this Fs
+func (e *extractFs) Precision() time.Duration {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	if e.tarFormat == tar.FormatPAX {
+		return time.Millisecond
+	}
+	return time.Second
+}
+
+// Hashes returns the supported hash types of the filesystem
+func (e *extractFs) Hashes() hash.Set {
+	return hash.Set(hash.None)
+}
+
+// Features returns the optional features of this Fs
+func (e *extractFs) Features() *fs.Features {
+	return e.features
+}
+
+// NewObject returns the Object at remote. The file must already have been
+// cached by a previous call to TarCache.Next, otherwise this panics.
+func (e *extractFs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
+	if ctx.Err() != nil {
+		return nil, ctx.Err()
+	}
+
+	file := e.tarCache.GetFile(remote)
+	if file == nil {
+		panic("cache MUST contain information about this file")
+	}
+	e.mu.Lock()
+	if e.tarFormat == tar.FormatUnknown {
+		e.tarFormat = file.fileInfo.FileInfo.Format
+	}
+	e.mu.Unlock()
+	return e.newExtractObj(file), nil
+}
+
+type newTarReader func(io.Reader) tarcache.TarReader
+
+func newExtract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string, maxFileSizeForCache int, newTarReaderFn newTarReader) *extract {
+	ci := fs.GetConfig(ctx)
+
+	s := &extract{
+		ctx:                 ctx,
+		ci:                  ci,
+		fdst:                fdst,
+		fsrc:                fsrc,
+		srcFileName:         srcFileName,
+		maxFileSizeForCache: maxFileSizeForCache,
+		newTarReaderFn:      newTarReaderFn,
+		toBeChecked:         make(chan string, ci.Transfers),
+		toBeUploaded:        make(chan copyJob, ci.Transfers),
+	}
+	if ci.MaxDuration > 0 {
+		s.maxDurationEndTime = time.Now().Add(ci.MaxDuration)
+		fs.Infof(s.fdst, "Transfer session %v deadline: %s", ci.CutoffMode, s.maxDurationEndTime.Format("2006/01/02 15:04:05"))
+	}
+
+	// If a max session duration has been defined add a deadline
+	// to the main context if cutoff mode is hard. This will cut
+	// the transfers off.
+	if !s.maxDurationEndTime.IsZero() && ci.CutoffMode == fs.CutoffModeHard {
+		s.ctx, s.cancel = context.WithDeadline(ctx, s.maxDurationEndTime)
+	} else {
+		s.ctx, s.cancel = context.WithCancel(ctx)
+	}
+	return s
+}
+
+// transfer copies job.src to job.path.
+func (e *extract) transfer(ctx context.Context, in <-chan copyJob) {
+	defer e.transfersWg.Done()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case job, ok := <-in:
+			if !ok {
+				return
+			}
+			obj := job.src.(*extractObj)
+
+			var err error
+
+			needTransfer := true
+			if job.dst != nil {
+				needTransfer = operations.NeedTransfer(e.ctx, job.dst, job.src)
+			}
+
+			if needTransfer {
+				_, err = operations.Copy(ctx, e.fdst, job.dst, job.path, job.src)
+			}
+			obj.Release()
+
+			if err != nil {
+				e.errCnt.Add(1)
+				e.setLastError(err)
+				e.cancel()
+				return
+			}
+		}
+	}
+}
+
+// runTransfers starts transfers
+func (e *extract) runTransfers(ctx context.Context, in <-chan copyJob) {
+	for i := 0; i < e.ci.Transfers; i++ {
+		e.transfersWg.Add(1)
+		go e.transfer(ctx, in)
+	}
+}
+
+// stopTransfers stops all transfers and waits for them to finish
+func (e *extract) stopTransfers() {
+	close(e.toBeUploaded)
+	fs.Debugf(e.fdst, "Waiting for transfers to finish")
+	e.transfersWg.Wait()
+}
+
+func (e *extract) setLastError(err error) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	if e.lastError == nil {
+		e.lastError = err
+	}
+}
+
+func (e *extract) getLastError() error {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	return e.lastError
+}
+
+// checker checks if filePath exists in fdst and, depending on the result, prepares the job for transfer
+func (e *extract) checker(ctx context.Context, in <-chan string, out chan<- copyJob) {
+	defer e.checkersWg.Done()
+	for {
+		var filePath string
+		var ok bool
+
+		select {
+		case <-ctx.Done():
+			return
+		case filePath, ok = <-in:
+			if !ok {
+				return
+			}
+		}
+
+		idx := strings.LastIndex(filePath, "/")
+		var dir string
+		if idx != -1 {
+			dir = filePath[:idx]
+		}
+		err := e.dsh.updateDirList(ctx, dir)
+		if err != nil {
+			// TODO: think about retries
+			e.setLastError(err)
+			e.cancel()
+			return
+		}
+		src, err := e.extrFs.NewObject(ctx, filePath)
+		if err != nil {
+			e.setLastError(err)
+			e.cancel()
+			return
+		}
+
+		job := copyJob{
+			src:  src,
+			path: filePath,
+		}
+		if objInfo := e.dsh.getObjectByPath(filePath); objInfo != nil {
+			if dst, ok := objInfo.(fs.Object); ok {
+				// When a file with the same name as in the archive is found at the destination,
+				// set copyJob.dst so that operations.Copy updates the existing object instead
+				// of creating a new one
+				job.dst = dst
+			}
+		}
+
+		out <- job
+	}
+}
+
+// runCheckers starts checkers
+func (e *extract) runCheckers(ctx context.Context, in <-chan string, out chan<- copyJob) {
+	for i := 0; i < e.ci.Transfers; i++ {
+		e.checkersWg.Add(1)
+		go e.checker(ctx, in, out)
+	}
+}
+
+// stopCheckers stops all checkers and waits for them to finish
+func (e *extract) stopCheckers() {
+	close(e.toBeChecked)
+	fs.Debugf(e.fdst, "Waiting for checkers to finish")
+	e.checkersWg.Wait()
+}
+
+func newExtractFs(ctx context.Context, fsrc fs.Fs, srcFileName string, transfers int, maxFileSizeForCache int, newTarReaderFn newTarReader) *extractFs {
+	res := &extractFs{
+		fsrc:        fsrc,
+		srcFileName: srcFileName,
+		tarCache:    newTarCache(fsrc, srcFileName, newTarReaderFn, transfers, maxFileSizeForCache),
+		tarFormat:   tar.FormatUnknown,
+	}
+	res.features = (&fs.Features{
+		CanHaveEmptyDirectories: false,
+	}).Fill(ctx, res)
+	return res
+}
+
+// Open opens an archive from the source filesystem
+func (e *extractFs) Open(ctx context.Context, options ...fs.OpenOption) error {
+	return e.tarCache.Open(ctx, options...)
+}
+
+func (e *extractFs) Close() error {
+	return e.tarCache.Close()
+}
+
+func (e *extractFs) AddNextObject(ctx context.Context) (string, error) {
+	return e.tarCache.Next(ctx)
+}
+
+func (e *extract) run() error {
+	fExtr := newExtractFs(e.ctx, e.fsrc, e.srcFileName, e.ci.Transfers, e.maxFileSizeForCache, e.newTarReaderFn)
+
+	e.dsh = newDirStructHolder(makeListDir(e.ctx, e.fdst))
+	e.extrFs = fExtr
+
+	e.hashType, e.hashOption = operations.CommonHash(e.ctx, fExtr, e.fdst)
+
+	// Options for the download
+	downloadOptions := []fs.OpenOption{e.hashOption}
+	for _, option := range e.ci.DownloadHeaders {
+		downloadOptions = append(downloadOptions, option)
+	}
+
+	err := fExtr.Open(e.ctx, downloadOptions...)
+	if err != nil {
+		return err
+	}
+
+	e.toBeChecked = make(chan string, e.ci.Transfers)
+	e.toBeUploaded = make(chan copyJob, e.ci.Transfers)
+
+	e.runTransfers(e.ctx, e.toBeUploaded)
+	e.runCheckers(e.ctx, e.toBeChecked, e.toBeUploaded)
+
+	var exitErr error
+
+	for {
+		path, err := fExtr.AddNextObject(e.ctx)
+		if err != nil {
+			exitErr = err
+			break
+		}
+		if path == "" {
+			break
+		}
+
+		e.toBeChecked <- path
+	}
+
+	e.stopCheckers()
+	e.stopTransfers()
+	fExtr.tarCache.ReleaseUnprocessedFiles()
+	if err := fExtr.Close(); err != nil {
+		if exitErr == nil {
+			exitErr = err
+		}
+	}
+
+	lastError := e.getLastError()
+	if lastError != nil {
+		return lastError
+	}
+	return exitErr
+}
+
+// Extract implements the extract command
+func Extract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string, maxFileSizeForCache int) error {
+	do := newExtract(ctx, fdst, fsrc, srcFileName, maxFileSizeForCache, nil)
+	return do.run()
+}
diff --git a/fs/extract/extract_test.go b/fs/extract/extract_test.go
new file mode 100644
index 000000000..33e44802b
--- /dev/null
+++ b/fs/extract/extract_test.go
@@ -0,0 +1,493 @@
+package extract
+
+import (
+	"archive/tar"
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"math"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/rclone/rclone/backend/memory"
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fs/extract/tarcache"
+	"github.com/rclone/rclone/fs/extract/tarcache/tartest"
+	"github.com/rclone/rclone/fs/fserrors"
+	"github.com/rclone/rclone/fs/hash"
+	"github.com/rclone/rclone/fstest/mockobject"
+	"github.com/stretchr/testify/require"
+)
+
+type objInfo struct {
+	info    fs.Fs
+	remote  string
+	modTime time.Time
+	size    int64
+}
+
+func (o *objInfo) Fs() fs.Info {
+	return o.info
+}
+
+func (o *objInfo) String() string {
+	return o.remote
+}
+
+func (o *objInfo) Remote() string {
+	return o.remote
+}
+
+func (o *objInfo) ModTime(context.Context) time.Time {
+	return o.modTime
+}
+
+func (o *objInfo) Size() int64 {
+	return o.size
+}
+
+func (o *objInfo) Hash(context.Context, hash.Type) (string, error) {
+	return "", nil
+}
+
+func (o *objInfo) Storable() bool {
+	return true
+}
+
+func newObjInfo(info fs.Fs, remote string, size int64) fs.ObjectInfo {
+	rnd := tartest.NewRandomizer()
+	offset := time.Millisecond * time.Duration(rnd.Intn(1000000))
+
+	return &objInfo{info, remote, time.Now().Add(offset), size}
+}
+
+type FailObj struct {
+	fs.Object
+	fscr      *FailFs
+	isFailObj bool
+}
+
+type FailFs struct {
+	fs.Fs
+	failFile   string
+	maxFails   atomic.Int32
+	totalFails atomic.Int32
+	putCnt     atomic.Int32
+	updateCnt  atomic.Int32
+}
+
+func (f *FailFs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) error {
+	listR, ok := f.Fs.(fs.ListRer)
+	if !ok {
+		return fmt.Errorf("fail fs %s does not implement fs.ListRer", f.Fs)
+	}
+	return listR.ListR(ctx, dir, callback)
+}
+
+type FailReadCloser struct {
+	io.ReadCloser
+}
+
+type FailTarReader struct {
+	*tar.Reader
+	failFile   string
+	maxFails   atomic.Int32
+	totalFails atomic.Int32
+
+	currentHeader *tar.Header
+	mu            sync.Mutex
+}
+
+func (f *FailTarReader) UpdateTarReader(r *tar.Reader) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.Reader = r
+}
+
+func (f *FailTarReader) tarReader() *tar.Reader {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	return f.Reader
+}
+
+func (f *FailTarReader) Read(p []byte) (n int, err error) {
+	if f.currentHeader == nil {
+		return f.tarReader().Read(p)
+	}
+	if f.currentHeader.Name == f.failFile {
+		if f.isNeedToFail() {
+			f.decReadWriteFailAttempts()
+			return 0, fserrors.RetryError(io.ErrNoProgress)
+		}
+	}
+	return f.tarReader().Read(p)
+}
+
+func (f *FailTarReader) Next() (*tar.Header, error) {
+	h, err := f.tarReader().Next()
+	if err != nil {
+		return h, err
+	}
+	f.currentHeader = h
+	return h, nil
+}
+
+func (f *FailTarReader) decReadWriteFailAttempts() {
+	f.maxFails.Add(-1)
+	f.totalFails.Add(1)
+}
+
+func (f *FailTarReader) isNeedToFail() bool {
+	return f.maxFails.Load() > 0
+}
+
+func newFailTarReader(t *tar.Reader, failFile string, maxFails int) *FailTarReader {
+	res := &FailTarReader{
+		Reader:   t,
+		failFile: failFile,
+	}
+	res.maxFails.Store(int32(maxFails))
+	return res
+}
+
+func NewFailReadCloser(rc io.ReadCloser) *FailReadCloser {
+	return &FailReadCloser{rc}
+}
+
+func (r *FailReadCloser) Read([]byte) (int, error) {
+	return 0, fserrors.RetryError(io.ErrNoProgress)
+}
+
+func NewFailObj(f fs.Object, failFs *FailFs, isFailObj bool) *FailObj {
+	return &FailObj{f, failFs, isFailObj}
+}
+
+func (o *FailObj) Fs() fs.Info {
+	return o.fscr
+}
+
+func (o *FailObj) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
+	rc, err := o.Object.Open(ctx, options...)
+	if err != nil {
+		return nil, err
+	}
+	if o.isFailObj && o.fscr.IsNeedToFail() {
+		o.fscr.DecReadWriteFailAttempts()
+		return NewFailReadCloser(rc), nil
+	}
+
+	return rc, nil
+}
+
+func (o *FailObj) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
+	fFs, ok := o.Fs().(*FailFs)
+	if !ok {
+		panic("o.Fs() is expected to be a *FailFs")
+	}
+	fFs.updateCnt.Add(1)
+	if o.isFailObj && o.fscr.IsNeedToFail() {
+		o.fscr.DecReadWriteFailAttempts()
+		return fserrors.RetryError(io.ErrNoProgress)
+	}
+	return o.Object.Update(ctx, in, src, options...)
+}
+
+func NewFailFs(f fs.Fs, failFile string, maxFails int) *FailFs {
+	res := &FailFs{Fs: f, failFile: failFile}
+	res.maxFails.Store(int32(maxFails))
+	return res
+}
+
+func (f *FailFs) IsNeedToFail() bool {
+	return f.maxFails.Load() > 0
+}
+
+func (f *FailFs) TotalFails() int {
+	return int(f.totalFails.Load())
+}
+
+func (f *FailFs) DecReadWriteFailAttempts() {
+	f.maxFails.Add(-1)
+	f.totalFails.Add(1)
+}
+
+func (f *FailFs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
+	retFs, err := f.Fs.NewObject(ctx, remote)
+	if err != nil {
+		return nil, err
+	}
+
+	return NewFailObj(retFs, f, f.failFile == remote), nil
+}
+
+func (f *FailFs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
+	entries, err := f.Fs.List(ctx, dir)
+	for idx, entry := range entries {
+		if obj, ok := entry.(fs.Object); ok {
+			entries[idx] = NewFailObj(obj, f, f.failFile == obj.Remote())
+		}
+	}
+
+	return entries, err
+}
+
+func (f *FailFs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
+	f.putCnt.Add(1)
+	if src.Remote() == f.failFile {
+		if f.IsNeedToFail() {
+			f.DecReadWriteFailAttempts()
+			return nil, fserrors.RetryError(io.ErrNoProgress)
+		}
+	}
+	return f.Fs.Put(ctx, in, src, options...)
+}
+
+func TestExtract(t *testing.T) {
+	t.Run("Happy path", testHappyPath)
+
+	t.Run("Double error while writing a small file to a remote server.", testFdstFailSmallFile)
+	t.Run("Double error while writing a large file to a remote server.", testFdstFailLargeFile)
+
+	t.Run("Multiple errors while writing a small file to a remote server.", testFdstFail2SmallFile)
+	t.Run("Multiple errors while writing a large file to a remote server.", testFdstFail2LargeFile)
+
+	t.Run("Double errors while reading a small file from an archive.", sdstFailSmallFile)
+	t.Run("Double errors while reading a large file from an archive.", sdstFailLargeFile)
+}
+
+func helperTestFdstFail2(t *testing.T, isStream bool) {
+	bigFileSize := 2000
+	archiveFile := "archive.tar.gz"
+	ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:dir%t", isStream), fmt.Sprintf("dst:dir%t", isStream), archiveFile, bigFileSize)
+
+	skipFiles := 10
+	filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream)
+	require.NotEqual(t, "", filePath)
+	expectedFalls := 100
+	newDstFs := NewFailFs(dstFs, filePath, expectedFalls)
+	// Extract archive to dst fs
+	err := Extract(ctx, newDstFs, srcFs, archiveFile, bigFileSize)
+	require.Error(t, err)
+	require.Greater(t, newDstFs.TotalFails(), 2)
+}
+
+func testFdstFail2SmallFile(t *testing.T) {
+	helperTestFdstFail2(t, false)
+}
+
+func testFdstFail2LargeFile(t *testing.T) {
+	helperTestFdstFail2(t, true)
+}
+
+func helperTestFdstFail(t *testing.T, isStream bool) {
+	bigFileSize := 2000
+	archiveFile := "archive.tar.gz"
+	ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:dir%t", isStream), fmt.Sprintf("dst:dir%t", isStream), archiveFile, bigFileSize)
+
+	skipFiles := 10
+	filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream)
+	require.NotEqual(t, "", filePath)
+	expectedFalls := 2
+	newDstFs := NewFailFs(dstFs, filePath, expectedFalls)
+	// Extract archive to dst fs
+	err := Extract(ctx, newDstFs, srcFs, archiveFile, bigFileSize)
+	require.NoError(t, err)
+	require.Equal(t, expectedFalls, newDstFs.TotalFails())
+	ll := make(map[string]struct{})
+	listRer := dstFs.(fs.ListRer)
+	require.NotNil(t, listRer)
+	err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
+		for _, entry := range entries {
+			obj, ok := entry.(fs.Object)
+			if !ok {
+				continue
+			}
+			ll[obj.Remote()] = struct{}{}
+		}
+		return nil
+	})
+	require.NoError(t, err)
+	checkFunc(ctx, t, dstFs, ll, rootItem)
+
+	require.Equal(t, 0, len(ll))
+}
+
+func sdstFailSmallFile(t *testing.T) {
+	sdstFailHelper(t, false)
+}
+
+func sdstFailLargeFile(t *testing.T) {
+	sdstFailHelper(t, true)
+}
+
+func sdstFailHelper(t *testing.T, isStream bool) {
+	bigFileSize := 2000
+	archiveFile := "archive.tar.gz"
+	ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:didir2%t", isStream), fmt.Sprintf("dst:dir2%t", isStream), archiveFile, bigFileSize)
+
+	skipFiles := 10
+	filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream)
+	require.NotEqual(t, "", filePath)
+	expectedFalls := 2
+	// Extract archive to dst fs
+	tr := newFailTarReader(nil, filePath, expectedFalls)
+	extr := newExtract(ctx, dstFs, srcFs, archiveFile, bigFileSize, func(reader io.Reader) tarcache.TarReader {
+		tarReader := tar.NewReader(reader)
+		tr.UpdateTarReader(tarReader)
+		return tr
+	})
+	err := extr.run()
+	if !isStream {
+		// When processing files that are placed in the cache, tarcache.Cache returns an error
+		// at the moment of reading the header, so operations.Copy will not be able to initiate
+		// a second read of this file. In this case, if an error occurs for the first time,
+		// it is expected that it will be returned immediately
+		require.Error(t, err)
+		return
+	}
+	require.NoError(t, err)
+
+	ll := make(map[string]struct{})
+	listRer := dstFs.(fs.ListRer)
+	require.NotNil(t, listRer)
+	err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
+		for _, entry := range entries {
+			obj, ok := entry.(fs.Object)
+			if !ok {
+				continue
+			}
+			ll[obj.Remote()] = struct{}{}
+		}
+		return nil
+	})
+	require.NoError(t, err)
+	checkFunc(ctx, t, dstFs, ll, rootItem)
+
+	require.Equal(t, 0, len(ll))
+}
+
+func testFdstFailSmallFile(t *testing.T) {
+	helperTestFdstFail(t, false)
+}
+
+func testFdstFailLargeFile(t *testing.T) {
+	helperTestFdstFail(t, true)
+}
+
+func checkFunc(ctx context.Context, t *testing.T, dstFs fs.Fs, files map[string]struct{}, itm *tartest.Item) {
+	for _, obj := range itm.Children {
+		if obj.IsDir() {
+			checkFunc(ctx, t, dstFs, files, obj)
+			continue
+		}
+		_, ok := files[obj.Name()]
+		require.True(t, ok)
+		delete(files, obj.Name())
+		o, err := dstFs.NewObject(ctx, obj.Name())
+		require.NoError(t, err)
+		reader, err := o.Open(ctx)
+		require.NoError(t, err)
+		require.NotNil(t, reader)
+		actual, err := io.ReadAll(reader)
+		require.NoError(t, err)
+		require.Equal(t, obj.FileContent, actual)
+	}
+}
+
+func prepareSrcAndDstFs(t *testing.T, srcRoot, dstRoot, archiveFile string, bigFileSize int) (ctx context.Context, src fs.Fs, dst fs.Fs, rootItem *tartest.Item) {
+	ctx = context.Background()
+	ctx, ci := fs.AddConfig(ctx)
+	ci.Transfers = 10
+	ci2 := fs.GetConfig(ctx)
+	require.Equal(t, ci.Transfers, ci2.Transfers)
+	var err error
+	var archiveContent []byte
+	rootItem, archiveContent = tartest.CreateTarArchive(t, 5, 10, 12, 50, bigFileSize)
+	src, err = memory.NewFs(ctx, "memory", srcRoot, nil)
+	require.NoError(t, err)
+	o := mockobject.New(archiveFile).WithContent(archiveContent, mockobject.SeekModeNone)
+	reader, err := o.Open(ctx)
+	require.NoError(t, err)
+
+	// Put archive file to source fs
+	_, err = src.Put(ctx, reader, o)
+	require.NoError(t, err)
+	dst, err = memory.NewFs(ctx, "memory", dstRoot, nil)
+	require.NoError(t, err)
+	return
+}
+
+func testHappyPath(t *testing.T) {
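+	// Extract a generated archive into an in-memory destination and verify the
+	// result, then overwrite every third file and extract again: unchanged files
+	// must be skipped, while modified ones are updated in place (no new Puts).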
+	bigFileSize := 2000
+	archiveFile := "archive.tar.gz"
+	ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, "src:dir", "dst:dir", archiveFile, bigFileSize)
+	wrapDstFs := NewFailFs(dstFs, "", math.MaxInt64)
+	dstFs = wrapDstFs
+	defer func() {
+		require.NoError(t, srcFs.Rmdir(ctx, "src:dir"))
+		require.NoError(t, dstFs.Rmdir(ctx, "dst:dir"))
+	}()
+	// Extract archive to dst fs
+	err := Extract(ctx, dstFs, srcFs, archiveFile, bigFileSize)
+	require.NoError(t, err)
+
+	ll := make(map[string]struct{})
+	files := make([]string, 0)
+	listRer := dstFs.(fs.ListRer)
+	require.NotNil(t, listRer)
+	err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
+		for _, entry := range entries {
+			obj, ok := entry.(fs.Object)
+			if !ok {
+				continue
+			}
+			ll[obj.Remote()] = struct{}{}
+			files = append(files, obj.Remote())
+		}
+		return nil
+	})
+	filesCnt := len(files)
+	require.NoError(t, err)
+	require.Equal(t, int32(filesCnt), wrapDstFs.putCnt.Load())
+	require.Equal(t, int32(0), wrapDstFs.updateCnt.Load())
+	require.Equal(t, filesCnt, len(ll))
+	checkFunc(ctx, t, dstFs, ll, rootItem)
+	require.Equal(t, 0, len(ll))
+
+	rnd := tartest.NewRandomizer()
+	cnt := 0
+
+	for i := 0; i < filesCnt; i += 3 {
+		filePath := files[i]
+		cnt++
+		obj, err := dstFs.NewObject(ctx, filePath)
+		require.NoError(t, err)
+
+		var fileSize int
+		isSmallFile := rnd.Dice(50)
+		if isSmallFile {
+			// [1, bigFileSize)
+			fileSize = rnd.Intn(bigFileSize-1) + 1
+		} else {
+			// [bigFileSize, 2*bigFileSize)
+			fileSize = rnd.Intn(bigFileSize) + bigFileSize
+		}
+		fileContent := make([]byte, fileSize)
+		_, _ = rnd.Read(fileContent)
+		reader := bytes.NewReader(fileContent)
+
+		err = obj.Update(ctx, reader, newObjInfo(dstFs, filePath, int64(fileSize)))
+		require.NoError(t, err)
+	}
+
+	wrapDstFs.putCnt.Store(0)
+	wrapDstFs.updateCnt.Store(0)
+	// Extract archive to dst fs again
+	err = Extract(ctx, wrapDstFs, srcFs, archiveFile, bigFileSize)
+	require.NoError(t, err)
+	require.Equal(t, int32(0), wrapDstFs.putCnt.Load())
+	require.Equal(t, int32(cnt), wrapDstFs.updateCnt.Load())
+}
diff --git a/fs/extract/tarcache/tarcache.go b/fs/extract/tarcache/tarcache.go
new file mode 100644
index 000000000..de03a539f
--- /dev/null
+++ b/fs/extract/tarcache/tarcache.go
@@ -0,0 +1,201 @@
+// Package tarcache provides a cache for tar archives.
+package tarcache
+
+import (
+	"archive/tar"
+	"bytes"
+	"context"
+	"errors"
+	"io"
+	"sync"
+	"time"
+)
+
+// TarReader is an interface for reading tar archives.
+type TarReader interface {
+	io.Reader
+	Next() (*tar.Header, error)
+}
+
+// Cache is a cache for tar archives.
+type Cache struct {
+	reader          TarReader
+	workerCnt       int
+	maxSizeInMemory int64
+
+	ch       chan struct{}
+	helperCh chan func()
+	streamWg sync.WaitGroup
+	wg       sync.WaitGroup
+}
+
+// Header holds info about a file in the tar archive.
+type Header struct {
+	FilePath string
+	ModTime  time.Time
+	Size     int64
+	Format   tar.Format
+}
+
+// FileInfo holds info about a file in the tar archive and its content.
+type FileInfo struct {
+	Payload   []byte
+	IoPayload io.Reader
+	ReleaseFn func()
+	FileInfo  Header
+}
+
+func (t *Cache) waitWorkersDone() {
+	t.wg.Wait()
+}
+
+// Close closes the cache and frees all resources.
+func (t *Cache) Close() error {
+	t.waitWorkersDone()
+	if t.ch != nil {
+		close(t.ch)
+		t.ch = nil
+		close(t.helperCh)
+		t.helperCh = nil
+		return nil
+	}
+	return errors.New("cache already closed")
+}
+
+// NewTarCache creates a new tar cache.
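+// workerCnt bounds how many returned files may be held unreleased at once;
+// files smaller than maxSizeInMemory are buffered in memory so that several of
+// them can be copied in parallel, while larger files are streamed directly
+// from the archive.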
+func NewTarCache(reader TarReader, workerCnt int, maxSizeInMemory int64) *Cache {
+	res := &Cache{
+		reader:          reader,
+		workerCnt:       workerCnt,
+		maxSizeInMemory: maxSizeInMemory,
+		ch:              make(chan struct{}, workerCnt),
+		helperCh:        make(chan func(), 1),
+	}
+
+	// helper goroutine for waiting until the file content is fully read from the stream
+	go func() {
+		for fn := range res.helperCh {
+			fn()
+		}
+	}()
+	return res
+}
+
+type helper struct {
+	io.Reader
+	releaseFn func()
+	once      sync.Once
+}
+
+// Close invokes the closer function.
+func (h *helper) Close() error {
+	h.once.Do(h.releaseFn)
+	return nil
+}
+
+// NextPayload returns the next archive file as an io.ReadCloser
+func (t *Cache) NextPayload(ctx context.Context) (io.ReadCloser, Header, error) {
+	res, err := t.Next(ctx)
+	if err != nil {
+		return nil, Header{}, err
+	}
+	if res.IoPayload != nil {
+		return &helper{Reader: res.IoPayload, releaseFn: res.ReleaseFn}, res.FileInfo, nil
+	}
+	res.IoPayload = bytes.NewReader(res.Payload)
+	return &helper{Reader: res.IoPayload, releaseFn: res.ReleaseFn}, res.FileInfo, nil
+}
+
+type streamCloser struct {
+	ch      chan struct{}
+	cache   *Cache
+	isBytes bool
+}
+
+func newStreamCloser(cache *Cache, isBytes bool) *streamCloser {
+	if !isBytes {
+		cache.streamWg.Add(1)
+	}
+	cache.wg.Add(1)
+	return &streamCloser{
+		ch:      cache.ch,
+		cache:   cache,
+		isBytes: isBytes,
+	}
+}
+
+func (s *streamCloser) close() {
+	if s.ch == nil {
+		return
+	}
+	ch := s.ch
+	s.ch = nil
+	<-ch
+	if !s.isBytes {
+		s.cache.streamWg.Done()
+	}
+	s.cache.wg.Done()
+}
+
+// Next returns info about the next file in the archive
+func (t *Cache) Next(ctx context.Context) (_ FileInfo, err error) {
+	// We block the execution flow if the number of currently processed files exceeds the limit.
+	select {
+	// Reserve a slot for processing the file
+	case t.ch <- struct{}{}:
+	case <-ctx.Done():
+		return FileInfo{}, ctx.Err()
+	}
+	watcher := make(chan struct{})
+	t.helperCh <- func() {
+		// We also block the execution flow if there is a reader without caching.
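+		// A streamed (uncached) file borrows the underlying tar stream, so we
+		// must wait until it has been fully read and released before advancing
+		// to the next archive entry.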
+		t.streamWg.Wait()
+		close(watcher)
+	}
+
+	// If the method exits because of an error, release the slot used by the processed file right away
+	defer func() {
+		if err != nil {
+			<-t.ch
+		}
+	}()
+
+	select {
+	case <-watcher:
+	case <-ctx.Done():
+		return FileInfo{}, ctx.Err()
+	}
+
+	for {
+		header, err := t.reader.Next()
+		if err != nil {
+			return FileInfo{nil, nil, nil, Header{}}, err
+		}
+		switch header.Typeflag {
+		case tar.TypeDir:
+			continue
+		case tar.TypeReg:
+			h := Header{
+				FilePath: header.Name,
+				ModTime:  header.ModTime,
+				Size:     header.Size,
+				Format:   header.Format,
+			}
+			// If the file size is smaller than maxSizeInMemory, read its content
+			if header.Size < t.maxSizeInMemory {
+				var payload []byte
+				payload, err = io.ReadAll(t.reader)
+				if err != nil {
+					return FileInfo{nil, nil, nil, Header{}}, err
+				}
+				closer := newStreamCloser(t, true)
+				return FileInfo{payload, nil, closer.close, h}, nil
+			}
+			// Otherwise, return FileInfo with a stream reader
+			closer := newStreamCloser(t, false)
+			return FileInfo{nil, t.reader, closer.close, h}, nil
+		default:
+			continue
+		}
+	}
+}
diff --git a/fs/extract/tarcache/tarcache_test.go b/fs/extract/tarcache/tarcache_test.go
new file mode 100644
index 000000000..96f71ffa5
--- /dev/null
+++ b/fs/extract/tarcache/tarcache_test.go
@@ -0,0 +1,503 @@
+package tarcache
+
+import (
+	"archive/tar"
+	"bytes"
+	"compress/gzip"
+	"context"
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"io"
+	"io/fs"
+	"math"
+	"math/rand"
+	"path"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+type itemType int
+
+const (
+	Dir itemType = iota
+	File
+)
+
+// item implements the fs.FileInfo interface
+type item struct {
+	iType       itemType
+	itemName    string
+	children    map[string]*item
+	fileContent []byte
+}
+
+func (i *item) Name() string {
+	return i.itemName
+}
+
+func (i *item) Size() int64 {
+	return int64(len(i.fileContent))
+}
+
+func (i *item) Mode() fs.FileMode {
+	res := fs.ModePerm
+	if i.iType == Dir {
+		res |= fs.ModeDir
+	}
+	return res
+}
+
+func (i *item) ModTime() time.Time {
+	return time.Now()
+}
+
+func (i *item) IsDir() bool {
+	return i.iType == Dir
+}
+
+func (i *item) Sys() any {
+	return nil
+}
+
+func dice(rnd *rand.Rand, percent int) bool {
+	return rnd.Intn(100) < percent
+}
+
+func randomFileName(rnd *rand.Rand) string {
+	b := make([]byte, 16)
+	rnd.Read(b)
+	return hex.EncodeToString(b)
+}
+
+type errorReader struct {
+	io.Reader
+
+	bytesRead int
+	threshold int
+}
+
+var errRead = errors.New("read error")
+
+func newReaderWithError(rc io.Reader, threshold int) *errorReader {
+	return &errorReader{rc, 0, threshold}
+}
+
+func (e *errorReader) Read(p []byte) (n int, err error) {
+	if e.bytesRead > e.threshold {
+		return 0, errRead
+	}
+	n, err = e.Reader.Read(p)
+	e.bytesRead += n
+	return
+}
+
+func (i *item) createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) {
+	if deep <= 0 {
+		return
+	}
+	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
+
+	nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir
+
+	for j := 0; j < nItem; j++ {
+		isFile := deep == 1 || dice(rnd, 60)
+		newItem := item{}
+		var prefix string
+		if len(i.itemName) > 0 {
+			prefix = i.itemName + "/"
+		}
+		if isFile {
+			newItem.itemName = fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd))
+			newItem.iType = File
+			var fileSize int
+			isSmallFile := dice(rnd, smallFilesPercent)
+			if isSmallFile {
+				// [1, bigFileSize)
+				fileSize = rnd.Intn(bigFileSize-1) + 1
+			} else {
+				// [bigFileSize, 2*bigFileSize)
[bigFileSize, 2*bigFileSize) + fileSize = rnd.Intn(bigFileSize) + bigFileSize + } + newItem.fileContent = make([]byte, fileSize) + rnd.Read(newItem.fileContent) + i.children[newItem.itemName] = &newItem + } else { + newItem.itemName = fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd)) + newItem.iType = Dir + newItem.children = make(map[string]*item) + newItem.createTarArchiveContent(deep-1, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize) + i.children[newItem.itemName] = &newItem + } + } +} + +func (i *item) fillMap(m *sync.Map) { + for _, v := range i.children { + if v.iType == File { + m.Store(v.itemName, v.fileContent) + } else if v.iType == Dir { + v.fillMap(m) + } + } +} + +func createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) *item { + root := &item{ + iType: Dir, + itemName: "", + children: make(map[string]*item), + } + + root.createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize) + return root +} + +func (i *item) writeItemToArchive(tw *tar.Writer) error { + // In the first pass, we write the files from the current directory. + for _, child := range i.children { + if child.iType == File { + head, err := tar.FileInfoHeader(child, path.Base(child.itemName)) + if err != nil { + return err + } + head.Name = child.itemName + err = tw.WriteHeader(head) + if err != nil { + return err + } + _, err = io.Copy(tw, bytes.NewReader(child.fileContent)) + if err != nil { + return err + } + } + } + + // In the second pass, we write files from child directories. + for _, child := range i.children { + if child.iType == Dir { + if err := child.writeItemToArchive(tw); err != nil { + return err + } + } + } + + return nil +} + +func createTarArchiveHelper(writer io.Writer, content *item) error { + gz := gzip.NewWriter(writer) + defer func() { + _ = gz.Close() + }() + + tw := tar.NewWriter(gz) + defer func() { + _ = tw.Close() + }() + + return content.writeItemToArchive(tw) +} + +func createTarArchive(t *testing.T, deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) (*item, []byte) { + content := createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize) + + writer := bytes.NewBuffer(make([]byte, 0)) + + err := createTarArchiveHelper(writer, content) + archData := writer.Bytes() + require.NoError(t, err) + require.NotNil(t, content) + require.NotNil(t, archData) + + return content, archData +} + +func createTarArchiveWithOneFileHelper(w io.Writer) error { + gz := gzip.NewWriter(w) + defer func() { + _ = gz.Close() + }() + + tw := tar.NewWriter(gz) + defer func() { + _ = tw.Close() + }() + itm := item{ + iType: File, + itemName: "test.bin", + fileContent: make([]byte, 0x80000), + } + + rootItm := item{ + iType: Dir, + children: map[string]*item{ + itm.itemName: &itm, + }, + } + + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + rnd.Read(itm.fileContent) + return rootItm.writeItemToArchive(tw) +} + +func createTarArchiveWithOneFile(t *testing.T) []byte { + writer := bytes.NewBuffer(make([]byte, 0)) + err := createTarArchiveWithOneFileHelper(writer) + require.NoError(t, err) + return writer.Bytes() +} + +func doubleCloseCaseHelper(t *testing.T, readerTypeChecker func(io.Reader) bool) { + bigFileSize := 0x50 + content, archData := createTarArchive(t, 5, 8, 10, 50, bigFileSize) + + reader := bytes.NewReader(archData) + + gzf, err := gzip.NewReader(reader) + require.NoError(t, err) + + tarReader := tar.NewReader(gzf) + tarCache := NewTarCache(tarReader, 
10, int64(bigFileSize))
+
+	var m sync.Map
+	content.fillMap(&m)
+
+	errCh := make(chan error, 100)
+	ctx, cancel := context.WithCancel(context.Background())
+	// NextPayload deliberately gets its own never-cancelled context;
+	// cancellation is only observed at the top of the loop.
+	ctx2 := context.Background()
+	defer cancel()
+	var wg sync.WaitGroup
+	var cnt int32
+
+L:
+	for {
+		select {
+		case <-ctx.Done():
+			break L
+		default:
+		}
+		contentReader, header, err := tarCache.NextPayload(ctx2)
+
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			require.NoError(t, err)
+		}
+		wg.Add(1)
+		go func(reader io.ReadCloser, head Header) {
+			defer wg.Done()
+			defer func() {
+				if err := reader.Close(); err != nil {
+					errCh <- err
+					cancel()
+				}
+			}()
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+			val, ok := m.Load(head.FilePath)
+			if !ok {
+				errCh <- errors.New(head.FilePath + " not found")
+				cancel()
+				return
+			}
+			expected := val.([]byte)
+			content, err := io.ReadAll(reader)
+			if err != nil {
+				errCh <- err
+				cancel()
+				return
+			}
+			if !bytes.Equal(expected, content) {
+				errCh <- errors.New(head.FilePath + " content mismatch")
+				cancel()
+				return
+			}
+			if atomic.AddInt32(&cnt, 1) >= 100 {
+				if readerTypeChecker(reader) {
+					// Close a second time on purpose: Close must be idempotent.
+					if err := reader.Close(); err != nil {
+						errCh <- err
+						cancel()
+						return
+					}
+				}
+			}
+		}(contentReader, header)
+	}
+	err = tarCache.Close()
+	require.NoError(t, err)
+	wg.Wait()
+	close(errCh)
+
+	hasError := false
+	for e := range errCh {
+		if e != nil {
+			hasError = true
+		}
+	}
+	require.False(t, hasError)
+}
+
+func readErrorCase(t *testing.T) {
+	ctx := context.Background()
+	archData := createTarArchiveWithOneFile(t)
+
+	reader := newReaderWithError(bytes.NewReader(archData), 0x800)
+
+	gzf, err := gzip.NewReader(reader)
+	require.NoError(t, err)
+
+	tarReader := tar.NewReader(gzf)
+	tarCache := NewTarCache(tarReader, 10, math.MaxInt)
+
+	_, _, err = tarCache.NextPayload(ctx)
+
+	// The truncated stream must surface the underlying read error, not io.EOF.
+	require.NotErrorIs(t, err, io.EOF)
+	require.ErrorIs(t, err, errRead)
+
+	err = tarCache.Close()
+	require.NoError(t, err)
+}
+
+func successfulCase(t *testing.T) {
+	bigFileSize := 0x200
+	content, archData := createTarArchive(t, 5, 5, 30, 70, bigFileSize)
+
+	reader := bytes.NewReader(archData)
+	gzf, err := gzip.NewReader(reader)
+	require.NoError(t, err)
+
+	tarReader := tar.NewReader(gzf)
+	tarCache := NewTarCache(tarReader, 10, int64(bigFileSize))
+
+	var m sync.Map
+	content.fillMap(&m)
+
+	errCh := make(chan error, 100)
+	ctx, cancel := context.WithCancel(context.Background())
+	// As above, cancellation is only observed at the top of the loop.
+	ctx2 := context.Background()
+	defer cancel()
+	var wg sync.WaitGroup
+L:
+	for {
+		select {
+		case <-ctx.Done():
+			break L
+		default:
+		}
+		contentReader, header, err := tarCache.NextPayload(ctx2)
+
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			require.NoError(t, err)
+		}
+		wg.Add(1)
+		go func(reader io.ReadCloser, head Header) {
+			defer wg.Done()
+			defer func() {
+				if err := reader.Close(); err != nil {
+					errCh <- err
+					cancel()
+				}
+			}()
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+			val, ok := m.Load(head.FilePath)
+			if !ok {
+				errCh <- errors.New(head.FilePath + " not found")
+				cancel()
+				return
+			}
+			expected := val.([]byte)
+			content, err := io.ReadAll(reader)
+			if err != nil {
+				errCh <- err
+				cancel()
+				return
+			}
+			if !bytes.Equal(expected, content) {
+				errCh <- errors.New(head.FilePath + " content mismatch")
+				cancel()
+				return
+			}
+			m.Delete(head.FilePath)
+		}(contentReader, header)
+	}
+	err =
tarCache.Close()
+	wg.Wait()
+	close(errCh)
+	var err2 error
+	for e := range errCh {
+		if err2 == nil {
+			err2 = e
+		}
+	}
+	require.NoError(t, err2)
+	require.NoError(t, err)
+
+	// Every extracted file was deleted from the map, so it must now be empty.
+	var l int
+	m.Range(func(k, v interface{}) bool {
+		l++
+		return true
+	})
+	require.Equal(t, 0, l)
+}
+
+func TestTarCache(t *testing.T) {
+	t.Run("Successful case", func(t *testing.T) {
+		successfulCase(t)
+	})
+
+	t.Run("Double close of the byte reader", func(t *testing.T) {
+		doubleCloseCaseHelper(t, func(reader io.Reader) bool {
+			e, ok := reader.(*helper)
+			if !ok {
+				return false
+			}
+			_, ok = e.Reader.(*bytes.Reader)
+			return ok
+		})
+	})
+
+	t.Run("Double close of the stream reader", func(t *testing.T) {
+		doubleCloseCaseHelper(t, func(reader io.Reader) bool {
+			e, ok := reader.(*helper)
+			if !ok {
+				return false
+			}
+			_, ok = e.Reader.(*bytes.Reader)
+			return !ok
+		})
+	})
+
+	t.Run("Read error", func(t *testing.T) {
+		readErrorCase(t)
+	})
+}
diff --git a/fs/extract/tarcache/tartest/tartest.go b/fs/extract/tarcache/tartest/tartest.go
new file mode 100644
index 000000000..540df0357
--- /dev/null
+++ b/fs/extract/tarcache/tartest/tartest.go
@@ -0,0 +1,239 @@
+// Package tartest is a set of helpers for testing code that works with tar archives
+package tartest
+
+import (
+	"archive/tar"
+	"bytes"
+	"compress/gzip"
+	"encoding/hex"
+	"fmt"
+	"io"
+	"io/fs"
+	"math/rand"
+	"path"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+// FindFilePath returns the path of a file that either fits in the cache
+// (fitInCache true, size < bigFileSize) or does not, skipping the first
+// *skipCnt matches. It returns "" if no such file exists.
+func (i *Item) FindFilePath(skipCnt *int, bigFileSize int, fitInCache bool) string {
+	for _, obj := range i.Children {
+		if !obj.IsDir() {
+			isBig := len(obj.FileContent) >= bigFileSize
+			if (isBig && !fitInCache) || (!isBig && fitInCache) {
+				if *skipCnt == 0 {
+					return obj.Name()
+				}
+				*skipCnt--
+			}
+		}
+	}
+	for _, obj := range i.Children {
+		if obj.IsDir() {
+			res := obj.FindFilePath(skipCnt, bigFileSize, fitInCache)
+			if len(res) > 0 {
+				return res
+			}
+		}
+	}
+	return ""
+}
+
+// ItemType is an item type
+type ItemType int
+
+// Item types
+const (
+	Dir  ItemType = iota // directory type
+	File                 // regular file type
+)
+
+// Item represents a file or directory
+type Item struct {
+	Type        ItemType
+	ItemName    string
+	Children    map[string]*Item
+	FileContent []byte
+}
+
+// Name returns the file name
+func (i *Item) Name() string {
+	return i.ItemName
+}
+
+// Size returns the file size
+func (i *Item) Size() int64 {
+	return int64(len(i.FileContent))
+}
+
+// Mode returns the file mode
+func (i *Item) Mode() fs.FileMode {
+	res := fs.ModePerm
+	if i.Type == Dir {
+		res |= fs.ModeDir
+	}
+	return res
+}
+
+// ModTime returns the modification time
+func (i *Item) ModTime() time.Time {
+	return time.Now()
+}
+
+// IsDir returns true if the item is a directory
+func (i *Item) IsDir() bool {
+	return i.Type == Dir
+}
+
+// Sys returns the underlying data source (can return nil)
+func (i *Item) Sys() any {
+	return nil
+}
+
+// Randomizer is a helper to generate random data
+type Randomizer struct {
+	rnd *rand.Rand
+}
+
+// NewRandomizer returns a new Randomizer
+func NewRandomizer() Randomizer {
+	return Randomizer{
+		rnd: rand.New(rand.NewSource(time.Now().UnixNano())),
+	}
+}
+
+func (r Randomizer) Read(p []byte) (n int, err error) {
+	return r.rnd.Read(p)
+}
+
+// Dice returns true if a dice roll is less than the specified percent
+func (r Randomizer) Dice(percent int) bool {
+	return r.rnd.Intn(100) < percent
+}
+
+// RandomFileName generates a random file name
+func (r Randomizer) RandomFileName() string {
+	b := make([]byte, 16)
+	_, _ = r.rnd.Read(b)
+	return hex.EncodeToString(b)
+}
+
+// Intn returns a random integer in the range [0, n)
+func (r Randomizer) Intn(n int) int {
+	return r.rnd.Intn(n)
+}
+
+func (i *Item) createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) {
+	if deep <= 0 {
+		return
+	}
+
+	rnd := NewRandomizer()
+	nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir
+
+	for j := 0; j < nItem; j++ {
+		isFile := deep == 1 || rnd.Dice(60)
+		newItem := Item{}
+		var prefix string
+		if len(i.ItemName) > 0 {
+			prefix = i.ItemName + "/"
+		}
+		if isFile {
+			newItem.ItemName = fmt.Sprintf("%sfile_%s", prefix, rnd.RandomFileName())
+			newItem.Type = File
+			var fileSize int
+			isSmallFile := rnd.Dice(smallFilesPercent)
+			if isSmallFile {
+				// [1, bigFileSize)
+				fileSize = rnd.Intn(bigFileSize-1) + 1
+			} else {
+				// [bigFileSize, 2*bigFileSize)
+				fileSize = rnd.Intn(bigFileSize) + bigFileSize
+			}
+			newItem.FileContent = make([]byte, fileSize)
+			_, _ = rnd.Read(newItem.FileContent)
+			i.Children[newItem.ItemName] = &newItem
+		} else {
+			newItem.ItemName = fmt.Sprintf("%sdir_%s", prefix, rnd.RandomFileName())
+			newItem.Type = Dir
+			newItem.Children = make(map[string]*Item)
+			newItem.createTarArchiveContent(deep-1, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
+			i.Children[newItem.ItemName] = &newItem
+		}
+	}
+}
+
+// CreateTarArchiveContent generates a random tree of directories and files
+// (with random contents), nested to the given depth, that describes the
+// content of a future tar archive.
+func CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) *Item {
+	root := &Item{
+		Type:     Dir,
+		ItemName: "",
+		Children: make(map[string]*Item),
+	}
+
+	root.createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
+	return root
+}
+
+func (i *Item) writeItemToArchiver(tw *tar.Writer) error {
+	// In the first pass, we write the files from the current directory.
+	for _, child := range i.Children {
+		if child.Type == File {
+			head, err := tar.FileInfoHeader(child, path.Base(child.ItemName))
+			if err != nil {
+				return err
+			}
+			head.Name = child.ItemName
+			err = tw.WriteHeader(head)
+			if err != nil {
+				return err
+			}
+			_, err = io.Copy(tw, bytes.NewReader(child.FileContent))
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	// In the second pass, we write files from child directories.
+	for _, child := range i.Children {
+		if child.Type == Dir {
+			if err := child.writeItemToArchiver(tw); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+func createTarArchiveHelper(writer io.Writer, content *Item) error {
+	gz := gzip.NewWriter(writer)
+	defer func() {
+		_ = gz.Close()
+	}()
+
+	tw := tar.NewWriter(gz)
+	defer func() {
+		_ = tw.Close()
+	}()
+
+	return content.writeItemToArchiver(tw)
+}
+
+// CreateTarArchive generates random archive content and returns it together
+// with the bytes of the gzipped tar archive built from it.
+func CreateTarArchive(t *testing.T, deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) (*Item, []byte) {
+	content := CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
+
+	writer := bytes.NewBuffer(make([]byte, 0))
+
+	err := createTarArchiveHelper(writer, content)
+	archData := writer.Bytes()
+	require.NoError(t, err)
+	require.NotNil(t, content)
+	require.NotNil(t, archData)
+
+	return content, archData
+}
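+
+// Typical usage (a sketch, not part of the API: it assumes the tarcache
+// package added in this change, the standard archive/tar, compress/gzip,
+// bytes, errors and io packages, and a context.Context named ctx):
+//
+//	content, archData := tartest.CreateTarArchive(t, 3, 2, 5, 50, 0x200)
+//	gz, err := gzip.NewReader(bytes.NewReader(archData))
+//	require.NoError(t, err)
+//	cache := tarcache.NewTarCache(tar.NewReader(gz), 4, 0x200)
+//	for {
+//		rc, hdr, err := cache.NextPayload(ctx)
+//		if errors.Is(err, io.EOF) {
+//			break
+//		}
+//		require.NoError(t, err)
+//		// Read rc, compare it with the entry for hdr.FilePath in content,
+//		// then call rc.Close() to release the worker slot.
+//	}
+//	require.NoError(t, cache.Close())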