diff --git a/cmd/all/all.go b/cmd/all/all.go index 37b15c5c3..34c3d68f9 100644 --- a/cmd/all/all.go +++ b/cmd/all/all.go @@ -23,6 +23,7 @@ import ( _ "github.com/rclone/rclone/cmd/dedupe" _ "github.com/rclone/rclone/cmd/delete" _ "github.com/rclone/rclone/cmd/deletefile" + _ "github.com/rclone/rclone/cmd/extract" _ "github.com/rclone/rclone/cmd/genautocomplete" _ "github.com/rclone/rclone/cmd/gendocs" _ "github.com/rclone/rclone/cmd/gitannex" diff --git a/cmd/copy/copy.go b/cmd/copy/copy.go index 44235ae19..29c726e89 100644 --- a/cmd/copy/copy.go +++ b/cmd/copy/copy.go @@ -97,6 +97,7 @@ for more info. Run: func(command *cobra.Command, args []string) { cmd.CheckArgs(2, 2, command, args) + fsrc, srcFileName, fdst := cmd.NewFsSrcFileDst(args) cmd.Run(true, true, command, func() error { if srcFileName == "" { diff --git a/cmd/extract/extract.go b/cmd/extract/extract.go new file mode 100644 index 000000000..2af96f4ee --- /dev/null +++ b/cmd/extract/extract.go @@ -0,0 +1,62 @@ +// Package extract provides the extract command. +package extract + +import ( + "context" + "errors" + "strings" + + "github.com/rclone/rclone/cmd" + "github.com/rclone/rclone/fs/config/flags" + "github.com/rclone/rclone/fs/extract" + "github.com/spf13/cobra" +) + +var ( + maxFileSizeForCache = 0x100000 // 1Mb +) + +func init() { + cmd.Root.AddCommand(commandDefinition) + cmdFlags := commandDefinition.Flags() + flags.IntVarP(cmdFlags, + &maxFileSizeForCache, + "max-file-size-for-cache", + "", + maxFileSizeForCache, + `Maximum file size that can be placed in the cache. Files of larger +size will be read without caching, which will prevent parallelization +of copy operations and may result in longer archive unpacking time.`, + "") +} + +var commandDefinition = &cobra.Command{ + Use: "extract source:path/to/archive.tar.gz dest:path", + Short: `Extract the tar.gz archive from source to the dest, skipping identical files.`, + // Note: "|" will be replaced by backticks below + Long: strings.ReplaceAll(`Extract the tar.gz archive files from the source to the destination. Does not transfer files that are +identical on source and destination, testing by size and modification +time or MD5SUM. Doesn't delete files from the destination. + +If dest:path doesn't exist, it is created and the contents of the archive source:path/to/targz/archive +go there. + +**Note**: Use the |-P|/|--progress| flag to view real-time transfer statistics. + +**Note**: Use the |--dry-run| or the |--interactive|/|-i| flag to test without copying anything. +`, "|", "`"), + Annotations: map[string]string{ + "groups": "Important", + }, + Run: func(command *cobra.Command, args []string) { + + cmd.CheckArgs(2, 2, command, args) + fsrc, srcFileName, fdst := cmd.NewFsSrcFileDst(args) + cmd.Run(true, true, command, func() error { + if srcFileName == "" { + return errors.New("the source is required to be a file") + } + return extract.Extract(context.Background(), fdst, fsrc, srcFileName, maxFileSizeForCache) + }) + }, +} diff --git a/fs/extract/dirtreeholder.go b/fs/extract/dirtreeholder.go new file mode 100644 index 000000000..26ef24ff8 --- /dev/null +++ b/fs/extract/dirtreeholder.go @@ -0,0 +1,280 @@ +package extract + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + + "github.com/rclone/rclone/fs" +) + +// Object represents a file or directory. +type Object struct { + isDir bool + data any +} + +// Dir represents a directory with objects. 
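+// Keys in both objs and dirs are full paths relative to the tree root, not bare names.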
+type Dir struct { + name string + objs map[string]Object + dirs map[string]*Dir +} + +// NewDir creates a new directory. +func NewDir(name string) *Dir { + return &Dir{ + name: name, + objs: make(map[string]Object), + dirs: make(map[string]*Dir), + } +} + +// Name returns the name of the directory. +func (d *Dir) Name() string { + return d.name +} + +// NewObject creates a new Object. +func NewObject(isDir bool, data any) Object { + return Object{ + isDir: isDir, + data: data, + } +} + +// GetDirByName returns a subdirectory with the given name +func (d *Dir) GetDirByName(name string) *Dir { + if d.name == name { + return d + } + if len(name) <= len(d.name) || !strings.HasPrefix(name, d.name) { + return nil + } + if d.name != "" && name[len(d.name)] != '/' { + return nil + } + idx := strings.Index(name[len(d.name)+1:], "/") + nextDir := name + if idx != -1 { + nextDir = name[:idx+len(d.name)+1] + } + if td, ok := d.dirs[nextDir]; ok { + return td.GetDirByName(name) + } + return nil +} + +// AddChildDir adds a child directory +func (d *Dir) AddChildDir(child *Dir) { + d.dirs[child.name] = child +} + +// AddChildObject adds information about the child object +func (d *Dir) AddChildObject(name string, child Object) { + d.objs[name] = child +} + +type listDirFn func(dir string) (*Dir, error) + +type dirTreeHolder struct { + listF listDirFn + + mu sync.Mutex + // consider changing the variable type to fs.DirTree + root *Dir + dirsInWork map[string]*sync.WaitGroup +} + +type strDirIter struct { + dir string + idx int +} + +func newStrDirIter(dir string) *strDirIter { + return &strDirIter{ + dir: dir, + idx: -1, + } +} + +func (s *strDirIter) next() (cur string, next string) { + if s.dir == "" { + return "", "" + } + if s.idx == -1 { + s.idx = 0 + idx := strings.Index(s.dir, "/") + if idx == -1 { + return "", s.dir + } + return "", s.dir[:idx] + } + idx := strings.Index(s.dir[s.idx:], "/") + if idx == -1 { + return s.dir, "" + } + defer func(idx int) { + s.idx = s.idx + idx + 1 + }(idx) + + idx2 := strings.Index(s.dir[s.idx+idx+1:], "/") + if idx2 == -1 { + return s.dir[:idx+s.idx], s.dir + } + return s.dir[:idx+s.idx], s.dir[:idx+idx2+s.idx+1] +} + +func (d *Dir) getObjectByPath(path string) (Object, bool) { + var dirName string + idx := strings.LastIndex(path, "/") + if idx != -1 { + dirName = path[:idx] + } + dir := d.GetDirByName(dirName) + if dir == nil { + return Object{}, false + } + obj, ok := dir.objs[path] + return obj, ok +} + +func newDirStructHolder(listF listDirFn) *dirTreeHolder { + return &dirTreeHolder{ + listF: listF, + root: nil, + dirsInWork: make(map[string]*sync.WaitGroup), + } +} + +func (l *dirTreeHolder) getObjectByPath(path string) (Object, bool) { + l.mu.Lock() + defer l.mu.Unlock() + if l.root == nil { + return Object{}, false + } + return l.root.getObjectByPath(path) +} + +func (l *dirTreeHolder) isDirExist(path string) bool { + l.mu.Lock() + defer l.mu.Unlock() + if l.root == nil { + return false + } + return l.root.GetDirByName(path) != nil +} + +func (l *dirTreeHolder) updateDirList(ctx context.Context, dir string) error { + l.mu.Lock() + iter := newStrDirIter(dir) + aborting := func() bool { + select { + case <-ctx.Done(): + return true + default: + return false + } + } + for { + if aborting() { + l.mu.Unlock() + return ctx.Err() + } + curDir, nextDir := iter.next() + if nextDir == "" { + l.mu.Unlock() + return nil + } + if wg, ok := l.dirsInWork[curDir]; ok { + l.mu.Unlock() + wg.Wait() + if aborting() { + return ctx.Err() + } + l.mu.Lock() + } + if curDir == "" 
&& l.root == nil { + if wg, ok := l.dirsInWork[curDir]; ok { + l.mu.Unlock() + wg.Wait() + if aborting() { + return ctx.Err() + } + l.mu.Lock() + dir := l.root.GetDirByName(curDir) + if dir == nil { + l.mu.Unlock() + return fmt.Errorf("error while updating info about dir %s", nextDir) + } + } else { + wg := &sync.WaitGroup{} + wg.Add(1) + l.dirsInWork[curDir] = wg + l.mu.Unlock() + d, err := l.listF(curDir) + + if err != nil { + if !errors.Is(err, fs.ErrorDirNotFound) { + return err + } + d = NewDir(curDir) + } + l.mu.Lock() + l.root = d + wg = l.dirsInWork[curDir] + delete(l.dirsInWork, curDir) + wg.Done() + } + } + d := l.root.GetDirByName(curDir) + if d == nil { + return errors.New("not possible to go where") + } + if _, ok := d.dirs[nextDir]; ok { + if ok { + continue + } + } + if o, ok := d.objs[nextDir]; ok { + // Where is no such directory + if !o.isDir { + l.mu.Unlock() + return nil + } + + if wg, ok := l.dirsInWork[nextDir]; ok { + l.mu.Unlock() + wg.Wait() + if aborting() { + return ctx.Err() + } + l.mu.Lock() + dir := d.GetDirByName(nextDir) + if dir == nil { + l.mu.Unlock() + return fmt.Errorf("error while updating info about dir %s", nextDir) + } + } else { + wg := &sync.WaitGroup{} + wg.Add(1) + l.dirsInWork[nextDir] = wg + l.mu.Unlock() + td, err := l.listF(nextDir) + if err != nil { + return err + } + l.mu.Lock() + d.dirs[nextDir] = td + wg = l.dirsInWork[nextDir] + delete(l.dirsInWork, nextDir) + wg.Done() + } + } else { + l.mu.Unlock() + return nil + } + } +} diff --git a/fs/extract/dirtreeholder_test.go b/fs/extract/dirtreeholder_test.go new file mode 100644 index 000000000..3823297cc --- /dev/null +++ b/fs/extract/dirtreeholder_test.go @@ -0,0 +1,321 @@ +package extract + +import ( + "context" + "encoding/hex" + "fmt" + "math/rand" + "os" + "path/filepath" + "sort" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func dice(rnd *rand.Rand, percent int) bool { + return rnd.Intn(100) < percent +} + +func randomFileName(rnd *rand.Rand) string { + b := make([]byte, 16) + rnd.Read(b) + return hex.EncodeToString(b) +} + +func createDirTree(dir *Dir, deep, minItemsInDir, maxItemsInDir int) { + if deep <= 0 { + return + } + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir + + for j := 0; j < nItem; j++ { + isFile := deep == 1 || dice(rnd, 50) + var prefix string + if len(dir.name) > 0 { + prefix = dir.name + "/" + } + if isFile { + name := fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd)) + newItem := NewObject(false, name) + dir.objs[name] = newItem + } else { + name := fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd)) + newItem := NewObject(true, name) + dir.objs[name] = newItem + childDir := NewDir(name) + createDirTree(childDir, deep-1, minItemsInDir, maxItemsInDir) + if len(childDir.dirs) != 0 || len(childDir.objs) != 0 { + dir.dirs[childDir.name] = childDir + } + } + } +} + +type listFnHelper struct { + mu sync.Mutex + cnt atomic.Uint64 + root *Dir +} + +func (l *listFnHelper) Cnt() uint64 { + return l.cnt.Load() +} + +func (l *listFnHelper) ResetCnt() { + l.cnt.Store(0) +} + +func (l *listFnHelper) listDir(dir string) (*Dir, error) { + l.cnt.Add(1) + l.mu.Lock() + defer l.mu.Unlock() + d := l.root.GetDirByName(dir) + if d == nil { + return nil, os.ErrNotExist + } + newDir := NewDir(d.name) + for path, child := range d.objs { + newDir.AddChildObject(path, child) + } + return newDir, nil +} + +func 
fillSliceWithDirsInfo(s *[]string, dir *Dir) { + for path, child := range dir.objs { + if child.isDir { + *s = append(*s, path) + } + } + for _, child := range dir.dirs { + fillSliceWithDirsInfo(s, child) + } +} + +func fillMapWithFilesInfo(m map[string][]string, dir *Dir) { + files := make([]string, 0) + for path, child := range dir.objs { + if !child.isDir { + files = append(files, filepath.Base(path)) + } + } + + m[dir.Name()] = files + + for _, child := range dir.dirs { + fillMapWithFilesInfo(m, child) + } +} + +func TestListing(t *testing.T) { + t.Run("Dir struct holder", testDirTreeHolder) + + t.Run("Dir", testDir) + + t.Run("String directory iterating", testStrDirIter) +} + +func testDirTreeHolder(t *testing.T) { + root := NewDir("") + createDirTree(root, 6, 5, 10) + listF := &listFnHelper{ + root: root, + } + t.Run("Concurrent listing", func(t *testing.T) { + s := make([]string, 0) + m := make(map[string][]string) + fillSliceWithDirsInfo(&s, root) + fillMapWithFilesInfo(m, root) + sort.Slice(s, func(i, j int) bool { + return len(s[i]) < len(s[j]) + }) + require.NotNil(t, root) + holder := newDirStructHolder(listF.listDir) + halfLen := len(s) / 2 + path := s[halfLen] + expectedCnt := 0 + dIter := newStrDirIter(path) + for { + expectedCnt++ + _, next := dIter.next() + if next == "" { + break + } + } + require.NotNil(t, holder) + + eg, ctx := errgroup.WithContext(context.Background()) + eg.Go(func() error { + return holder.updateDirList(ctx, path) + }) + eg.Go(func() error { + return holder.updateDirList(ctx, path) + }) + eg.Go(func() error { + return holder.updateDirList(ctx, path) + }) + eg.Go(func() error { + return holder.updateDirList(ctx, path+"not.exists") + }) + err := eg.Wait() + require.NoError(t, err) + require.Equal(t, uint64(expectedCnt), listF.Cnt()) + dIter = newStrDirIter(path) + var cur, next string + next = "1" + for next != "" { + cur, next = dIter.next() + if next == "" { + break + } + require.True(t, holder.isDirExist(cur)) + files, ok := m[cur] + require.True(t, ok) + for _, file := range files { + var filePath string + if cur == "" { + filePath = file + } else { + filePath = cur + "/" + file + } + var obj Object + obj, ok = holder.getObjectByPath(filePath) + if !ok { + require.True(t, ok) + } + require.True(t, ok) + require.Equal(t, filePath, obj.data.(string)) + } + } + }) +} + +func testDir(t *testing.T) { + finalDir := "path/with/more/than/one/dir" + files := map[string][]string{ + "": {"file0.1.ext", "file0.2.ext"}, + "path": {"file1.1.ext", "file1.2.ext"}, + "path/with/more/than/one": {"file2.1.ext", "file2.2.ext"}, + "path/with/more/than/one/dir": {"file3.1.ext", "file3.2.ext"}, + } + + dirIter := newStrDirIter(finalDir) + var root, prevDir *Dir + next := "1" + for next != "" { + var cur string + cur, next = dirIter.next() + dir := NewDir(cur) + if root == nil { + root = dir + } + if files, ok := files[cur]; ok { + for _, file := range files { + obj := NewObject(false, struct{}{}) + if cur == "" { + dir.AddChildObject(file, obj) + } else { + dir.AddChildObject(cur+"/"+file, obj) + } + + } + } + if prevDir != nil { + prevDir.AddChildDir(dir) + prevDir.AddChildObject(dir.Name(), NewObject(true, struct{}{})) + } + prevDir = dir + } + + t.Run("GetDirByName", func(t *testing.T) { + dirIter = newStrDirIter(finalDir) + next = "1" + cnt := 0 + for next != "" { + var cur string + cur, next = dirIter.next() + dir := root.GetDirByName(cur) + require.NotNil(t, dir) + cnt++ + } + require.Equal(t, 7, cnt) + }) + + t.Run("getObjectByPath", func(t *testing.T) { + for 
dirName, fileNames := range files { + for _, fileName := range fileNames { + var filePath string + if dirName == "" { + filePath = fileName + } else { + filePath = dirName + "/" + fileName + } + obj, ok := root.getObjectByPath(filePath) + require.True(t, ok) + require.False(t, obj.isDir) + filePath += ".fail" + _, ok = root.getObjectByPath(filePath) + require.False(t, ok) + } + } + }) + +} + +func testStrDirIter(t *testing.T) { + for _, tc := range []struct { + name string + dir string + res []string + }{ + { + name: "path with more than one dir", + dir: "path/with/more/than/one/dir", + res: []string{ + "", + "path", + "path/with", + "path/with/more", + "path/with/more/than", + "path/with/more/than/one", + "path/with/more/than/one/dir", + "", + }, + }, + { + name: "path with one dir", + dir: "one_dir", + res: []string{ + "", + "one_dir", + "", + }, + }, + { + name: "empty path", + dir: "", + res: []string{ + "", + "", + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + l := newStrDirIter(tc.dir) + for i, p := range tc.res { + if p == "" && i > 0 { + break + } + cur, next := l.next() + require.Equal(t, cur, p) + require.Equal(t, next, tc.res[i+1]) + } + }) + } +} diff --git a/fs/extract/extract.go b/fs/extract/extract.go new file mode 100644 index 000000000..53e1c441b --- /dev/null +++ b/fs/extract/extract.go @@ -0,0 +1,780 @@ +// Package extract is the implementation of extract command +package extract + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "context" + "errors" + "fmt" + "io" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/extract/tarcache" + "github.com/rclone/rclone/fs/filter" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/fs/list" + "github.com/rclone/rclone/fs/operations" +) + +type payloadType int + +const ( + payloadTypeBytes payloadType = iota + payloadTypeIo +) + +type TarCacheFile struct { + owner *TarCache + pt payloadType + fileInfo tarcache.FileInfo + reader io.Reader +} + +func (t *TarCacheFile) Release() { + t.fileInfo.ReleaseFn() +} +func (t *TarCacheFile) Read(p []byte) (n int, err error) { + return t.reader.Read(p) +} + +func (t *TarCacheFile) reopenIo(ctx context.Context) (err error) { + return t.owner.requestReopenFile(ctx, t) +} + +func (t *TarCacheFile) reopenBytes(ctx context.Context) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + t.reader = bytes.NewReader(t.fileInfo.Payload) + return nil + } +} + +func (t *TarCacheFile) Reopen(ctx context.Context) (err error) { + if t.pt == payloadTypeBytes { + return t.reopenBytes(ctx) + } + return t.reopenIo(ctx) +} + +func (t *TarCache) requestReopenFile(ctx context.Context, file *TarCacheFile) error { + reopenReq := reopenRequest{ + file: file, + respCh: make(chan error, 1), + } + + select { + case t.reopenReqCh <- reopenReq: + case <-ctx.Done(): + return ctx.Err() + case <-t.finishCh: + return errTarCacheIsClosed + } + + select { + case err := <-reopenReq.respCh: + return err + case <-ctx.Done(): + return ctx.Err() + case <-t.finishCh: + return errTarCacheIsClosed + } +} + +func (t *TarCache) newTarCacheFile(info tarcache.FileInfo) *TarCacheFile { + res := &TarCacheFile{ + owner: t, + pt: payloadTypeIo, + fileInfo: info, + reader: info.IoPayload, + } + if info.IoPayload == nil { + res.pt = payloadTypeBytes + res.reader = bytes.NewReader(info.Payload) + } + return res +} + +type TarCache struct { + // parameters + fsrc fs.Fs + srcFileName string + newTarReaderFn newTarReader + transfers int + 
maxFileSizeForCache int + options []fs.OpenOption + + // no thread safe internal state + closeStreamFn func() error + tarCache *tarcache.Cache + workerCh chan func() + reopenReqCh chan reopenRequest + finishCh chan struct{} + pendingCloseCache []*tarcache.Cache + + // thread safe internal state + mu sync.Mutex + cache map[string]tarcache.FileInfo +} + +func newTarCache(fsrc fs.Fs, srcFileName string, newTarReaderFn newTarReader, transfers int, maxFileSizeForCache int) *TarCache { + res := &TarCache{ + fsrc: fsrc, + srcFileName: srcFileName, + newTarReaderFn: newTarReaderFn, + transfers: transfers, + maxFileSizeForCache: maxFileSizeForCache, + workerCh: make(chan func()), + reopenReqCh: make(chan reopenRequest), + finishCh: make(chan struct{}), + cache: make(map[string]tarcache.FileInfo), + } + + if res.newTarReaderFn == nil { + res.newTarReaderFn = func(reader io.Reader) tarcache.TarReader { + return tar.NewReader(reader) + } + } + + go func() { + for workFn := range res.workerCh { + workFn() + } + }() + return res +} + +func (t *TarCache) Close() error { + close(t.workerCh) + err := t.closeHelper() + if t.tarCache != nil { + _ = t.tarCache.Close() + } + return err +} + +type reopenRequest struct { + file *TarCacheFile + respCh chan error +} + +var errReopenArchiveRequest = errors.New("reopen archive request") +var errTarCacheIsClosed = errors.New("tar cache is closed") + +func (t *TarCache) handleReopenRequest(ctx context.Context, reopenReq reopenRequest) { + defer close(reopenReq.respCh) + t.pendingCloseCache = append(t.pendingCloseCache, t.tarCache) + _ = t.closeHelper() + reopenReq.file.Release() + var err error + prev := t.tarCache + _ = prev.Close() + t.tarCache, t.closeStreamFn, err = t.openHelper(ctx) + if err != nil { + reopenReq.respCh <- err + return + } + err = t.FindFile(ctx, reopenReq.file.fileInfo.FileInfo.FilePath) + if err == nil { + file := t.GetFile(reopenReq.file.fileInfo.FileInfo.FilePath) + if file == nil { + panic("shouldn't get here") + } + *reopenReq.file = *file + } + reopenReq.respCh <- err +} + +func (t *TarCache) FindFile(ctx context.Context, filePath string) error { + for { + select { + case <-t.finishCh: + return errTarCacheIsClosed + default: + } + + h, err := t.tarCache.Next(ctx) + + if err != nil { + return err + } + if h.FileInfo.FilePath != filePath { + h.ReleaseFn() + continue + } + + t.mu.Lock() + t.cache[h.FileInfo.FilePath] = h + t.mu.Unlock() + return nil + } +} + +func (t *TarCache) GetFile(filePath string) *TarCacheFile { + t.mu.Lock() + defer t.mu.Unlock() + info, ok := t.cache[filePath] + if !ok { + return nil + } + return t.newTarCacheFile(info) +} + +func (t *TarCache) ReleaseUnprocessedFiles() { + t.mu.Lock() + // Here we are preparing a list of functions that release resources, because we cannot call them when + // iterating over the map, since the ReleaseFn method also uses the same mutex that protects the map. 
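+ // Calling ReleaseFn while still holding t.mu would deadlock, so the functions are copied out and invoked after unlocking.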
+ releaseFns := make([]func(), 0, len(t.cache)) + for _, info := range t.cache { + releaseFns = append(releaseFns, info.ReleaseFn) + } + t.cache = make(map[string]tarcache.FileInfo) + t.mu.Unlock() + for _, fn := range releaseFns { + fn() + } +} + +func (t *TarCache) Next(ctx context.Context) (string, error) { + select { + case <-t.finishCh: + return "", errTarCacheIsClosed + default: + } + for { + watchdog := make(chan struct{}) + var reopenReq reopenRequest + ctx2, cancel := context.WithCancelCause(ctx) + var wg sync.WaitGroup + wg.Add(1) + tmp := atomic.Bool{} + t.workerCh <- func() { + defer wg.Done() + select { + case <-watchdog: + case reopenReq = <-t.reopenReqCh: + tmp.Store(true) + cancel(errReopenArchiveRequest) + } + } + h, err := t.tarCache.Next(ctx2) + relFn := h.ReleaseFn + h.ReleaseFn = func() { + t.mu.Lock() + delete(t.cache, h.FileInfo.FilePath) + t.mu.Unlock() + relFn() + } + close(watchdog) + wg.Wait() + for _, cache := range t.pendingCloseCache { + _ = cache.Close() + } + t.pendingCloseCache = nil + if err != nil { + if errors.Is(context.Cause(ctx2), errReopenArchiveRequest) { + t.handleReopenRequest(ctx, reopenReq) + continue + } + cancel(nil) + close(t.finishCh) + if err == io.EOF { + return "", nil + } + + return "", err + } + + cancel(nil) + + t.mu.Lock() + defer t.mu.Unlock() + _, ok := t.cache[h.FileInfo.FilePath] + if ok { + panic("shouldn't get here") + } + t.cache[h.FileInfo.FilePath] = h + + return h.FileInfo.FilePath, nil + } +} + +func (t *TarCache) closeHelper() error { + if t.closeStreamFn != nil { + fn := t.closeStreamFn + t.closeStreamFn = nil + return fn() + } + return nil +} + +func (t *TarCache) openHelper(ctx context.Context) (cache *tarcache.Cache, closeFn func() error, err error) { + obj, err := t.fsrc.NewObject(ctx, t.srcFileName) + if err != nil { + return nil, nil, err + } + + reader, err := obj.Open(ctx, t.options...) 
+ + if err != nil { + return nil, nil, err + } + + gzr, err := gzip.NewReader(reader) + if err != nil { + // we don't care about the error, we just need to close the reader + _ = reader.Close() + return nil, nil, err + } + + tarReader := t.newTarReaderFn(gzr) + cache = tarcache.NewTarCache(tarReader, t.transfers, int64(t.maxFileSizeForCache)) + return cache, func() error { + gzErr := gzr.Close() + readerErr := reader.Close() + if gzErr != nil { + return gzErr + } + return readerErr + }, nil +} + +func (t *TarCache) Open(ctx context.Context, options ...fs.OpenOption) error { + t.options = options + var err error + t.tarCache, t.closeStreamFn, err = t.openHelper(ctx) + return err +} + +type copyJob struct { + src fs.Object + dst fs.Object + path string +} + +func makeListDir(ctx context.Context, f fs.Fs) listDirFn { + return func(dir string) (*Dir, error) { + dirCtx := filter.SetUseFilter(ctx, f.Features().FilterAware) // make filter-aware backends constrain List + entities, err := list.DirSorted(dirCtx, f, false, dir) + if err != nil { + return nil, err + } + res := NewDir(dir) + for _, entry := range entities { + if obj, ok := entry.(fs.Object); ok { + o := NewObject(false, obj) + res.AddChildObject(obj.Remote(), o) + } + if d, ok := entry.(fs.Directory); ok { + o := NewObject(true, nil) + res.AddChildObject(d.Remote(), o) + } + } + return res, nil + } +} + +type extractObj struct { + ownerFs fs.Fs + isFirstOpen atomic.Bool + file *TarCacheFile +} + +func (e *extractFs) newExtractObj(file *TarCacheFile) *extractObj { + res := &extractObj{ + ownerFs: e, + file: file, + } + res.isFirstOpen.Store(true) + return res +} + +func (o *extractObj) Release() { + o.file.Release() +} + +type extractFs struct { + // parameters + fsrc fs.Fs + srcFileName string + features *fs.Features + // internal state + tarCache *TarCache +} + +type extract struct { + fdst fs.Fs + fsrc fs.Fs + extrFs *extractFs + srcFileName string + maxFileSizeForCache int + hashType hash.Type // common hash to use + hashOption *fs.HashesOption // open option for the common hash + // internal state + ci *fs.ConfigInfo // global config + ctx context.Context // internal context for controlling go-routines + cancel func() // cancel the context + maxDurationEndTime time.Time // end time if --max-duration is set + toBeChecked chan string // checkers channel + checkersWg sync.WaitGroup // wait for checkers + toBeUploaded chan copyJob // copiers channels + transfersWg sync.WaitGroup // wait for transfers + newTarReaderFn newTarReader + + mu sync.Mutex + lastError error + errCnt atomic.Int32 + dsh *dirTreeHolder +} + +func (o *extractObj) Fs() fs.Info { + return o.ownerFs +} + +func (o *extractObj) String() string { + if o == nil { + return "" + } + return o.Remote() +} + +func (o *extractObj) Remote() string { + return o.file.fileInfo.FileInfo.FilePath +} + +func (o *extractObj) ModTime(ctx context.Context) time.Time { + return o.file.fileInfo.FileInfo.ModTime +} + +func (o *extractObj) Size() int64 { + return o.file.fileInfo.FileInfo.Size +} + +func (o *extractObj) Hash(ctx context.Context, ty hash.Type) (string, error) { + return "", nil +} + +func (o *extractObj) Storable() bool { + return true +} + +func (o *extractObj) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) { + if o.isFirstOpen.Load() { + o.isFirstOpen.Store(false) + + return io.NopCloser(o.file), nil + } + err := o.file.Reopen(ctx) + return io.NopCloser(o.file), err +} + +func (o *extractObj) SetModTime(ctx context.Context, t time.Time) error { + 
panic("not implemented") +} + +func (o *extractObj) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + panic("not implemented") +} + +func (o *extractObj) Remove(ctx context.Context) error { + panic("not implemented") +} + +func (e *extractFs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { + panic("not implemented") +} + +func (e *extractFs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + panic("not implemented") +} + +func (e *extractFs) Mkdir(ctx context.Context, dir string) error { + panic("not implemented") +} + +func (e *extractFs) Rmdir(ctx context.Context, dir string) error { + panic("not implemented") +} + +func (e *extractFs) Name() string { + return "extract" +} + +func (e *extractFs) Root() string { + return fmt.Sprintf("%s:%s/%s", e.fsrc.Name(), e.fsrc.Root(), e.srcFileName) +} + +func (e *extractFs) String() string { + return fmt.Sprintf("tar.gz archive %s at %s", e.Name(), e.Root()) +} + +func (e *extractFs) Precision() time.Duration { + //TODO: change this + return time.Second +} + +func (e *extractFs) Hashes() hash.Set { + return hash.Set(hash.None) +} + +func (e *extractFs) Features() *fs.Features { + return e.features +} + +func (e *extractFs) NewObject(ctx context.Context, remote string) (fs.Object, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + + file := e.tarCache.GetFile(remote) + if file == nil { + panic("cache MUST contain information about this file") + } + + return e.newExtractObj(file), nil +} + +type newTarReader func(io.Reader) tarcache.TarReader + +func newExtract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string, maxFileSizeForCache int, newTarReaderFn newTarReader) *extract { + ci := fs.GetConfig(ctx) + + s := &extract{ + ctx: ctx, + ci: ci, + fdst: fdst, + fsrc: fsrc, + srcFileName: srcFileName, + maxFileSizeForCache: maxFileSizeForCache, + newTarReaderFn: newTarReaderFn, + toBeChecked: make(chan string, ci.Transfers), + toBeUploaded: make(chan copyJob, ci.Transfers), + } + if ci.MaxDuration > 0 { + s.maxDurationEndTime = time.Now().Add(ci.MaxDuration) + fs.Infof(s.fdst, "Transfer session %v deadline: %s", ci.CutoffMode, s.maxDurationEndTime.Format("2006/01/02 15:04:05")) + } + + // If a max session duration has been defined add a deadline + // to the main context if cutoff mode is hard. This will cut + // the transfers off. + if !s.maxDurationEndTime.IsZero() && ci.CutoffMode == fs.CutoffModeHard { + s.ctx, s.cancel = context.WithDeadline(ctx, s.maxDurationEndTime) + } else { + s.ctx, s.cancel = context.WithCancel(ctx) + } + return s +} + +// transfer copy job.scr to job.path. 
+func (e *extract) transfer(ctx context.Context, in <-chan copyJob) { + defer e.transfersWg.Done() + for { + select { + case <-ctx.Done(): + return + case job, ok := <-in: + if !ok { + return + } + obj := job.src.(*extractObj) + + _, err := operations.Copy(ctx, e.fdst, job.dst, job.path, job.src) + + obj.Release() + + if err != nil { + e.errCnt.Add(1) + e.setLastError(err) + e.cancel() + return + } + } + } +} + +// runTransfers starts transfers +func (e *extract) runTransfers(ctx context.Context, in <-chan copyJob) { + for i := 0; i < e.ci.Transfers; i++ { + e.transfersWg.Add(1) + go e.transfer(ctx, in) + } +} + +// stopTransfers stops all transfers and waits for them to finish +func (e *extract) stopTransfers() { + close(e.toBeUploaded) + fs.Debugf(e.fdst, "Waiting for transfers to finish") + e.transfersWg.Wait() +} + +func (e *extract) setLastError(err error) { + e.mu.Lock() + defer e.mu.Unlock() + if e.lastError == nil { + e.lastError = err + } +} + +func (e *extract) getLastError() error { + e.mu.Lock() + defer e.mu.Unlock() + return e.lastError +} + +// checker checks if filePath exists in fdst and depending on the result, prepare the task for transfer +func (e *extract) checker(ctx context.Context, in <-chan string, out chan<- copyJob) { + defer e.checkersWg.Done() + for { + var filePath string + var ok bool + + select { + case <-ctx.Done(): + return + case filePath, ok = <-in: + + if !ok { + return + } + } + + idx := strings.LastIndex(filePath, "/") + var dir string + if idx != -1 { + dir = filePath[:idx] + } + err := e.dsh.updateDirList(ctx, dir) + if err != nil { + // TODO: think about retries + e.cancel() + return + } + src, err := e.extrFs.NewObject(ctx, filePath) + if err != nil { + e.setLastError(err) + e.cancel() + return + } + + job := copyJob{ + src: src, + path: filePath, + } + if objInfo, ok := e.dsh.getObjectByPath(filePath); ok && !objInfo.isDir { + if dst := objInfo.data.(fs.Object); dst != nil { + job.dst = dst + } + } + out <- job + } +} + +// runCheckers starts checkers +func (e *extract) runCheckers(ctx context.Context, in <-chan string, out chan<- copyJob) { + for i := 0; i < e.ci.Transfers; i++ { + e.checkersWg.Add(1) + go e.checker(ctx, in, out) + } +} + +// stopTransfers stops all checkers and waits for them to finish +func (e *extract) stopCheckers() { + close(e.toBeChecked) + fs.Debugf(e.fdst, "Waiting for checkers to finish") + e.checkersWg.Wait() +} + +func newExtractFs(ctx context.Context, fsrc fs.Fs, srcFileName string, transfers int, maxFileSizeForCache int, newTarReaderFn newTarReader) *extractFs { + res := &extractFs{ + fsrc: fsrc, + srcFileName: srcFileName, + tarCache: newTarCache(fsrc, srcFileName, newTarReaderFn, transfers, maxFileSizeForCache), + } + res.features = (&fs.Features{ + CanHaveEmptyDirectories: false, + }).Fill(ctx, res) + return res +} + +// Open opens an archive from the source filesystem +func (e *extractFs) Open(ctx context.Context, options ...fs.OpenOption) error { + return e.tarCache.Open(ctx, options...) 
+} + +func (e *extractFs) Close() error { + return e.tarCache.Close() +} + +func (e *extractFs) AddNextObject(ctx context.Context) (string, error) { + return e.tarCache.Next(ctx) +} + +func (e *extract) run() error { + fExtr := newExtractFs(e.ctx, e.fsrc, e.srcFileName, e.ci.Transfers, e.maxFileSizeForCache, e.newTarReaderFn) + + e.dsh = newDirStructHolder(makeListDir(e.ctx, e.fdst)) + e.extrFs = fExtr + + e.hashType, e.hashOption = operations.CommonHash(e.ctx, fExtr, e.fdst) + + // Options for the download + downloadOptions := []fs.OpenOption{e.hashOption} + for _, option := range e.ci.DownloadHeaders { + downloadOptions = append(downloadOptions, option) + } + + err := fExtr.Open(e.ctx, downloadOptions...) + + if err != nil { + return err + } + + e.toBeChecked = make(chan string, e.ci.Transfers) + e.toBeUploaded = make(chan copyJob, e.ci.Transfers) + + e.runTransfers(e.ctx, e.toBeUploaded) + e.runCheckers(e.ctx, e.toBeChecked, e.toBeUploaded) + + var exitErr error + + for { + path, err := fExtr.AddNextObject(e.ctx) + if err != nil { + exitErr = err + break + } + if path == "" { + break + } + + e.toBeChecked <- path + } + + e.stopCheckers() + e.stopTransfers() + fExtr.tarCache.ReleaseUnprocessedFiles() + if err := fExtr.Close(); err != nil { + if exitErr == nil { + exitErr = err + } + } + + lastError := e.getLastError() + if lastError != nil { + return lastError + } + return exitErr +} + +func Extract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string, maxFileSizeForCache int) error { + do := newExtract(ctx, fdst, fsrc, srcFileName, maxFileSizeForCache, nil) + return do.run() +} diff --git a/fs/extract/extract_test.go b/fs/extract/extract_test.go new file mode 100644 index 000000000..15054ae59 --- /dev/null +++ b/fs/extract/extract_test.go @@ -0,0 +1,383 @@ +package extract + +import ( + "archive/tar" + "context" + "fmt" + "io" + "sync" + "sync/atomic" + "testing" + + "github.com/rclone/rclone/backend/memory" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/extract/tarcache" + "github.com/rclone/rclone/fs/extract/tarcache/tartest" + "github.com/rclone/rclone/fs/fserrors" + "github.com/rclone/rclone/fstest/mockobject" + "github.com/stretchr/testify/require" +) + +type FailObj struct { + fs.Object + fscr *FailFs +} + +type FailFs struct { + fs.Fs + failFile string + maxFails atomic.Int32 + totalFails atomic.Int32 +} + +type FailReadCloser struct { + io.ReadCloser +} + +type FailTarReader struct { + *tar.Reader + failFile string + maxFails atomic.Int32 + totalFails atomic.Int32 + + currentHeader *tar.Header + mu sync.Mutex +} + +func (f *FailTarReader) UpdateTarReader(r *tar.Reader) { + f.mu.Lock() + defer f.mu.Unlock() + f.Reader = r +} + +func (f *FailTarReader) tarReader() *tar.Reader { + f.mu.Lock() + defer f.mu.Unlock() + return f.Reader +} + +func (f *FailTarReader) Read(p []byte) (n int, err error) { + if f.currentHeader == nil { + return f.tarReader().Read(p) + } + if f.currentHeader.Name == f.failFile { + if f.isNeedToFail() { + f.decReadWriteFailAttempts() + return 0, fserrors.RetryError(io.ErrNoProgress) + } + } + return f.tarReader().Read(p) +} + +func (f *FailTarReader) Next() (*tar.Header, error) { + h, err := f.Reader.Next() + if err != nil { + return h, err + } + f.currentHeader = h + return h, nil +} + +func (f *FailTarReader) decReadWriteFailAttempts() { + f.maxFails.Add(-1) + f.totalFails.Add(1) +} + +func (f *FailTarReader) isNeedToFail() bool { + return f.maxFails.Load() > 0 +} + +func newFailTarReader(t *tar.Reader, failFile string, maxFails 
int) *FailTarReader { + res := &FailTarReader{ + Reader: t, + failFile: failFile, + } + res.maxFails.Store(int32(maxFails)) + return res +} + +func NewFailReadCloser(rc io.ReadCloser) *FailReadCloser { + return &FailReadCloser{rc} +} + +func (r *FailReadCloser) Read([]byte) (int, error) { + return 0, fserrors.RetryError(io.ErrNoProgress) +} + +func NewFailObj(f fs.Object, failFs *FailFs) *FailObj { + return &FailObj{f, failFs} +} + +func (o *FailObj) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) { + rc, err := o.Object.Open(ctx, options...) + if err != nil { + return nil, err + } + if o.fscr.IsNeedToFail() { + o.fscr.DecReadWriteFailAttempts() + return NewFailReadCloser(rc), nil + } + + return rc, nil +} + +func (o *FailObj) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + if o.fscr.IsNeedToFail() { + o.fscr.DecReadWriteFailAttempts() + return fserrors.RetryError(io.ErrNoProgress) + } + return o.Object.Update(ctx, in, src, options...) +} + +func NewFailFs(f fs.Fs, failFile string, maxFails int) *FailFs { + res := &FailFs{Fs: f, failFile: failFile} + res.maxFails.Store(int32(maxFails)) + return res +} + +func (f *FailFs) IsNeedToFail() bool { + return f.maxFails.Load() > 0 +} + +func (f *FailFs) TotalFails() int { + return int(f.totalFails.Load()) +} + +func (f *FailFs) DecReadWriteFailAttempts() { + f.maxFails.Add(-1) + f.totalFails.Add(1) +} + +func (f *FailFs) NewObject(ctx context.Context, remote string) (fs.Object, error) { + retFs, err := f.Fs.NewObject(ctx, remote) + if f.failFile != remote || err != nil { + return retFs, err + } + + fmt.Println("new object", remote) + + return NewFailObj(retFs, f), nil +} + +func (f *FailFs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + if src.Remote() == f.failFile { + if f.IsNeedToFail() { + f.DecReadWriteFailAttempts() + return nil, fserrors.RetryError(io.ErrNoProgress) + } + } + return f.Fs.Put(ctx, in, src, options...) +} + +// test cases +// 1. Happy path: extract all files from archive to a directory (done); +// 2. Fail once when writing a file to the target directory and write it successfully when writing the same file again; +// 3. Fail all attempts to write a file to the target directory; +// 4. Fail once when reading a file from the archive, but read it successfully the second time; +// 5. 
Fail all attempts to read a file from the archive; + +func TestExtract(t *testing.T) { + t.Run("Happy path", testHappyPath) + + t.Run("Double error while writing a small file to a remote server.", testFdstFailSmallFile) + t.Run("Double error while writing a large file to a remote server.", testFdstFailLargeFile) + + t.Run("Multiple errors while writing a small file to a remote server.", testFdstFail2SmallFile) + t.Run("Multiple errors while writing a large file to a remote server.", testFdstFail2LargeFile) + + t.Run("Double errors while reading a small file from an archive.", sdstFailSmallFile) + t.Run("Double errors while reading a large file from an archive.", sdstFailLargeFile) +} + +func helperTestFdstFail2(t *testing.T, isStream bool) { + bigFileSize := 2000 + archiveFile := "archive.tar.gz" + ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:dir%t", isStream), fmt.Sprintf("dst:dir%t", isStream), archiveFile, bigFileSize) + + skipFiles := 10 + filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream) + require.NotEqual(t, "", filePath) + expectedFalls := 100 + newDstFs := NewFailFs(dstFs, filePath, expectedFalls) + // Extract archive to dst fs + err := Extract(ctx, newDstFs, srcFs, archiveFile, bigFileSize) + require.Error(t, err) + require.Greater(t, newDstFs.TotalFails(), 2) +} + +func testFdstFail2SmallFile(t *testing.T) { + helperTestFdstFail2(t, false) +} + +func testFdstFail2LargeFile(t *testing.T) { + helperTestFdstFail2(t, true) +} + +func helperTestFdstFail(t *testing.T, isStream bool) { + bigFileSize := 2000 + archiveFile := "archive.tar.gz" + ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:dir%t", isStream), fmt.Sprintf("dst:dir%t", isStream), archiveFile, bigFileSize) + + skipFiles := 10 + filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream) + require.NotEqual(t, "", filePath) + expectedFalls := 2 + newDstFs := NewFailFs(dstFs, filePath, expectedFalls) + // Extract archive to dst fs + err := Extract(ctx, newDstFs, srcFs, archiveFile, bigFileSize) + require.NoError(t, err) + require.Equal(t, expectedFalls, newDstFs.TotalFails()) + ll := make(map[string]struct{}) + listRer := dstFs.(fs.ListRer) + require.NotNil(t, listRer) + err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error { + for _, entry := range entries { + obj, ok := entry.(fs.Object) + if !ok { + continue + } + ll[obj.Remote()] = struct{}{} + } + return nil + }) + require.NoError(t, err) + checkFunc(ctx, t, dstFs, ll, rootItem) + + require.Equal(t, 0, len(ll)) +} + +func sdstFailSmallFile(t *testing.T) { + sdstFailHelper(t, false) +} + +func sdstFailLargeFile(t *testing.T) { + sdstFailHelper(t, true) +} + +func sdstFailHelper(t *testing.T, isStream bool) { + bigFileSize := 2000 + archiveFile := "archive.tar.gz" + ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:didir2%t", isStream), fmt.Sprintf("dst:dir2%t", isStream), archiveFile, bigFileSize) + + skipFiles := 10 + filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream) + require.NotEqual(t, "", filePath) + expectedFalls := 2 + // Extract archive to dst fs + tr := newFailTarReader(nil, filePath, expectedFalls) + extr := newExtract(ctx, dstFs, srcFs, archiveFile, bigFileSize, func(reader io.Reader) tarcache.TarReader { + tarReader := tar.NewReader(reader) + tr.UpdateTarReader(tarReader) + return tr + }) + err := extr.run() + if !isStream { + // When processing files that are placed in the cache, tarcahe.Cache returns an error + // at the moment of 
reading the header, so operation.Copy will not be able to initiate + // a second read of this file. In this case, if an error occurs for the first time, + // it is expected that it will be returned immediately + require.Error(t, err) + return + } + require.NoError(t, err) + + ll := make(map[string]struct{}) + listRer := dstFs.(fs.ListRer) + require.NotNil(t, listRer) + err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error { + for _, entry := range entries { + obj, ok := entry.(fs.Object) + if !ok { + continue + } + ll[obj.Remote()] = struct{}{} + } + return nil + }) + require.NoError(t, err) + checkFunc(ctx, t, dstFs, ll, rootItem) + + require.Equal(t, 0, len(ll)) +} + +func testFdstFailSmallFile(t *testing.T) { + helperTestFdstFail(t, false) +} + +func testFdstFailLargeFile(t *testing.T) { + helperTestFdstFail(t, true) +} + +func checkFunc(ctx context.Context, t *testing.T, dstFs fs.Fs, files map[string]struct{}, itm *tartest.Item) { + for _, obj := range itm.Children { + if obj.IsDir() { + checkFunc(ctx, t, dstFs, files, obj) + continue + } + _, ok := files[obj.Name()] + require.True(t, ok) + delete(files, obj.Name()) + o, err := dstFs.NewObject(ctx, obj.Name()) + require.NoError(t, err) + reader, err := o.Open(ctx) + require.NoError(t, err) + require.NotNil(t, reader) + actual, err := io.ReadAll(reader) + require.NoError(t, err) + require.Equal(t, obj.FileContent, actual) + } +} + +func prepareSrcAndDstFs(t *testing.T, srcRoot, dstRoot, archiveFile string, bigFileSize int) (ctx context.Context, src fs.Fs, dst fs.Fs, rootItem *tartest.Item) { + ctx = context.Background() + ctx, ci := fs.AddConfig(ctx) + ci.Transfers = 10 + ci2 := fs.GetConfig(ctx) + require.Equal(t, ci.Transfers, ci2.Transfers) + var err error + var archiveContent []byte + rootItem, archiveContent = tartest.CreateTarArchive(t, 5, 10, 12, 50, bigFileSize) + src, err = memory.NewFs(ctx, "memory", srcRoot, nil) + require.NoError(t, err) + o := mockobject.New(archiveFile).WithContent(archiveContent, mockobject.SeekModeNone) + reader, err := o.Open(ctx) + require.NoError(t, err) + + // Put archive file to source fs + _, err = src.Put(ctx, reader, o) + require.NoError(t, err) + dst, err = memory.NewFs(ctx, "memory", dstRoot, nil) + require.NoError(t, err) + return +} + +func testHappyPath(t *testing.T) { + bigFileSize := 2000 + archiveFile := "archive.tar.gz" + ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, "src:dir", "dst:dir", archiveFile, bigFileSize) + defer func() { + require.NoError(t, srcFs.Rmdir(ctx, "src:dir")) + require.NoError(t, srcFs.Rmdir(ctx, "dst:dir")) + }() + // Extract archive to dst fs + err := Extract(ctx, dstFs, srcFs, archiveFile, bigFileSize) + require.NoError(t, err) + + ll := make(map[string]struct{}) + listRer := dstFs.(fs.ListRer) + require.NotNil(t, listRer) + err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error { + for _, entry := range entries { + obj, ok := entry.(fs.Object) + if !ok { + continue + } + ll[obj.Remote()] = struct{}{} + } + return nil + }) + require.NoError(t, err) + checkFunc(ctx, t, dstFs, ll, rootItem) + + require.Equal(t, 0, len(ll)) +} diff --git a/fs/extract/tarcache/tarcache.go b/fs/extract/tarcache/tarcache.go new file mode 100644 index 000000000..e82201120 --- /dev/null +++ b/fs/extract/tarcache/tarcache.go @@ -0,0 +1,198 @@ +// Package tarcache provides a cache for tar archives. 
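+// Files below the configured size limit are buffered in memory so several of them can be copied in parallel;
+// larger files are handed out as a direct stream from the tar reader, one at a time, and the number of
+// files outstanding at once is capped by the worker count.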
+package tarcache + +import ( + "archive/tar" + "bytes" + "context" + "errors" + "fmt" + "io" + "sync" + "time" +) + +type TarReader interface { + io.Reader + Next() (*tar.Header, error) +} + +// Cache is a cache for tar archives. +type Cache struct { + reader TarReader + workerCnt int + maxSizeInMemory int64 + + ch chan struct{} + helperCh chan func() + streamWg sync.WaitGroup + wg sync.WaitGroup +} + +// Header holds info about file in tar archive. +type Header struct { + FilePath string + ModTime time.Time + Size int64 + Format tar.Format +} + +// FileInfo holds info about file in tar archive and its content. +type FileInfo struct { + Payload []byte + IoPayload io.Reader + ReleaseFn func() + FileInfo Header +} + +func (t *Cache) waitWorkersDone() { + t.wg.Wait() +} + +// Close closes the cache and free all resources. +func (t *Cache) Close() error { + t.waitWorkersDone() + if t.ch != nil { + close(t.ch) + t.ch = nil + close(t.helperCh) + t.helperCh = nil + return nil + } + return errors.New("cache already closed") +} + +// NewTarCache creates a new tar cache. +func NewTarCache(reader TarReader, workerCnt int, maxSizeInMemory int64) *Cache { + res := &Cache{ + reader: reader, + workerCnt: workerCnt, + maxSizeInMemory: maxSizeInMemory, + ch: make(chan struct{}, workerCnt), + helperCh: make(chan func(), 1), + } + go func() { + for fn := range res.helperCh { + fn() + } + }() + return res +} + +type helper struct { + io.Reader + releaseFn func() + once sync.Once +} + +// Close invokes the closer function. +func (h *helper) Close() error { + h.once.Do(h.releaseFn) + return nil +} + +// NextPayload returns the next archive file as io.ReadCloser +func (t *Cache) NextPayload(ctx context.Context) (_ io.ReadCloser, _ Header, err error) { + res, err := t.Next(ctx) + if err != nil { + return nil, Header{}, err + } + if res.IoPayload != nil { + return &helper{Reader: res.IoPayload, releaseFn: res.ReleaseFn}, res.FileInfo, nil + } + res.IoPayload = bytes.NewReader(res.Payload) + return &helper{Reader: res.IoPayload, releaseFn: res.ReleaseFn}, res.FileInfo, nil +} + +type streamCloser struct { + ch chan struct{} + cache *Cache + isBytes bool +} + +func newStreamCloser(cache *Cache, isBytes bool) *streamCloser { + if !isBytes { + cache.streamWg.Add(1) + } + cache.wg.Add(1) + return &streamCloser{ + ch: cache.ch, + cache: cache, + isBytes: isBytes, + } +} + +func (s *streamCloser) close() { + if s.ch == nil { + fmt.Printf("ch is nil: %p", s) + fmt.Println("") + return + } + ch := s.ch + s.ch = nil + <-ch + if !s.isBytes { + s.cache.streamWg.Done() + } + s.cache.wg.Done() +} + +// Next returns info about the next file in the archive +func (t *Cache) Next(ctx context.Context) (_ FileInfo, err error) { + // We block the execution flow if the number of currently processed files exceeds the limit. + select { + case t.ch <- struct{}{}: + case <-ctx.Done(): + return FileInfo{}, ctx.Err() + } + watcher := make(chan struct{}) + t.helperCh <- func() { + // We also block the execution flow if there is a reader without caching. 
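+ // streamWg is only incremented for streamed (non-cached) payloads, so Next does not advance the tar reader until the previous direct stream has been fully consumed.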
+ t.streamWg.Wait() + close(watcher) + } + + select { + case <-watcher: + case <-ctx.Done(): + return FileInfo{}, ctx.Err() + } + + defer func() { + if err != nil { + <-t.ch + } + }() + + for { + header, err := t.reader.Next() + if err != nil { + return FileInfo{nil, nil, nil, Header{}}, err + } + switch header.Typeflag { + case tar.TypeDir: + continue + case tar.TypeReg: + h := Header{ + FilePath: header.Name, + ModTime: header.ModTime, + Size: header.Size, + Format: header.Format, + } + + if header.Size < t.maxSizeInMemory { + var payload []byte + payload, err = io.ReadAll(t.reader) + if err != nil { + return FileInfo{nil, nil, nil, Header{}}, err + } + closer := newStreamCloser(t, true) + return FileInfo{payload, nil, closer.close, h}, nil + } + closer := newStreamCloser(t, false) + return FileInfo{nil, t.reader, closer.close, h}, nil + default: + continue + } + } +} diff --git a/fs/extract/tarcache/tarcache_test.go b/fs/extract/tarcache/tarcache_test.go new file mode 100644 index 000000000..30c5d652f --- /dev/null +++ b/fs/extract/tarcache/tarcache_test.go @@ -0,0 +1,525 @@ +package tarcache + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "context" + "encoding/hex" + "errors" + "fmt" + "io" + "io/fs" + "math" + "math/rand" + "path" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func (i *Item) FindFilePath(skipCnt *int, bigFileSize int, fitInCache bool) string { + for _, obj := range i.Children { + if !obj.IsDir() { + isBig := len(obj.FileContent) >= bigFileSize + if isBig && !fitInCache || !isBig && fitInCache { + *skipCnt-- + if *skipCnt == 0 { + return obj.Name() + } + } + } + } + for _, obj := range i.Children { + if obj.IsDir() { + res := obj.FindFilePath(skipCnt, bigFileSize, fitInCache) + if len(res) > 0 { + return res + } + } + } + return "" +} + +type ItemType int + +const ( + Dir ItemType = iota + File +) + +type Item struct { + Type ItemType + ItemName string + Children map[string]*Item + FileContent []byte +} + +func (i *Item) Name() string { + return i.ItemName +} + +func (i *Item) Size() int64 { + return int64(len(i.FileContent)) +} + +func (i *Item) Mode() fs.FileMode { + res := fs.ModePerm + if i.Type == Dir { + res |= fs.ModeDir + } + return res +} + +func (i *Item) ModTime() time.Time { + return time.Now() +} + +func (i *Item) IsDir() bool { + return i.Type == Dir +} + +func (i *Item) Sys() any { + return nil +} + +func dice(rnd *rand.Rand, percent int) bool { + return rnd.Intn(100) < percent +} + +func randomFileName(rnd *rand.Rand) string { + b := make([]byte, 16) + rnd.Read(b) + return hex.EncodeToString(b) +} + +type errorReader struct { + io.Reader + + bytesRead int + threshold int +} + +var errRead = errors.New("read error") + +func newReaderWithError(rc io.Reader, threshold int) *errorReader { + return &errorReader{rc, 0, threshold} +} + +func (e *errorReader) Read(p []byte) (n int, err error) { + if e.bytesRead > e.threshold { + return 0, errRead + } + n, err = e.Reader.Read(p) + e.bytesRead += n + return +} + +func (i *Item) createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) { + if deep <= 0 { + return + } + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir + + for j := 0; j < nItem; j++ { + isFile := deep == 1 || dice(rnd, 60) + newItem := Item{} + var prefix string + if len(i.ItemName) > 0 { + prefix = i.ItemName + "/" + } + if isFile { + newItem.ItemName = fmt.Sprintf("%sfile_%s", 
prefix, randomFileName(rnd)) + newItem.Type = File + var fileSize int + isSmallFile := dice(rnd, smallFilesPercent) + if isSmallFile { + // [1, bigFileSize) + fileSize = rnd.Intn(bigFileSize-1) + 1 + } else { + // [bigFileSize, 2*bigFileSize) + fileSize = rnd.Intn(bigFileSize) + bigFileSize + } + newItem.FileContent = make([]byte, fileSize) + rnd.Read(newItem.FileContent) + i.Children[newItem.ItemName] = &newItem + } else { + newItem.ItemName = fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd)) + newItem.Type = Dir + newItem.Children = make(map[string]*Item) + newItem.createTarArchiveContent(deep-1, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize) + i.Children[newItem.ItemName] = &newItem + } + } +} + +func (i *Item) fillMap(m *sync.Map) { + for _, v := range i.Children { + if v.Type == File { + m.Store(v.ItemName, v.FileContent) + } else if v.Type == Dir { + v.fillMap(m) + } + } +} + +func CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) *Item { + root := &Item{ + Type: Dir, + ItemName: "", + Children: make(map[string]*Item), + } + + root.createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize) + return root +} + +func (i *Item) writeItemToArchiver(tw *tar.Writer) error { + // In the first pass, we write the files from the current directory. + for _, child := range i.Children { + if child.Type == File { + head, err := tar.FileInfoHeader(child, path.Base(child.ItemName)) + if err != nil { + return err + } + head.Name = child.ItemName + err = tw.WriteHeader(head) + if err != nil { + return err + } + _, err = io.Copy(tw, bytes.NewReader(child.FileContent)) + if err != nil { + return err + } + } + } + + // In the second pass, we write files from child directories. 
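+ // Directory entries get no tar header of their own; only regular files are written.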
+ for _, child := range i.Children { + if child.Type == Dir { + if err := child.writeItemToArchiver(tw); err != nil { + return err + } + } + } + + return nil +} + +func createTarArchiveHelper(writer io.Writer, content *Item) error { + gz := gzip.NewWriter(writer) + defer func() { + _ = gz.Close() + }() + + tw := tar.NewWriter(gz) + defer func() { + _ = tw.Close() + }() + + return content.writeItemToArchiver(tw) +} + +func CreateTarArchive(t *testing.T, deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) (*Item, []byte) { + content := CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize) + + writer := bytes.NewBuffer(make([]byte, 0)) + + err := createTarArchiveHelper(writer, content) + archData := writer.Bytes() + require.NoError(t, err) + require.NotNil(t, content) + require.NotNil(t, archData) + + return content, archData +} + +func createTarArchiveWithOneFileHelper(w io.Writer) error { + gz := gzip.NewWriter(w) + defer func() { + _ = gz.Close() + }() + + tw := tar.NewWriter(gz) + defer func() { + _ = tw.Close() + }() + itm := Item{ + Type: File, + ItemName: "test.bin", + FileContent: make([]byte, 0x80000), + } + + rootItm := Item{ + Type: Dir, + Children: map[string]*Item{ + itm.ItemName: &itm, + }, + } + + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + rnd.Read(itm.FileContent) + return rootItm.writeItemToArchiver(tw) +} + +func createTarArchiveWithOneFile(t *testing.T) []byte { + writer := bytes.NewBuffer(make([]byte, 0)) + err := createTarArchiveWithOneFileHelper(writer) + require.NoError(t, err) + return writer.Bytes() +} + +func doubleCloseCaseHelper(t *testing.T, readerTypeChecker func(io.Reader) bool) { + bigFileSize := 0x50 + content, archData := CreateTarArchive(t, 5, 8, 10, 50, bigFileSize) + + reader := bytes.NewReader(archData) + + gzf, err := gzip.NewReader(reader) + require.NoError(t, err) + + tarReader := tar.NewReader(gzf) + tarCache := NewTarCache(tarReader, 10, int64(bigFileSize)) + + var m sync.Map + content.fillMap(&m) + + errCh := make(chan error, 100) + ctx, cancel := context.WithCancel(context.Background()) + ctx2 := context.Background() + defer cancel() + var wg sync.WaitGroup + var cnt int32 + +L: + for { + select { + case <-ctx.Done(): + break L + default: + } + contentReader, header, err := tarCache.NextPayload(ctx2) + + if err != nil { + if err == io.EOF { + break + } + require.NoError(t, err) + } + wg.Add(1) + go func(reader io.ReadCloser, head Header) { + defer wg.Done() + defer func() { + if err := reader.Close(); err != nil { + errCh <- err + cancel() + } + }() + select { + case <-ctx.Done(): + return + default: + } + val, ok := m.Load(header.FilePath) + if !ok { + errCh <- errors.New(header.FilePath + " not found") + cancel() + return + } + expected := val.([]byte) + content, err := io.ReadAll(reader) + if err != nil { + errCh <- err + cancel() + return + } + if !bytes.Equal(expected, content) { + errCh <- errors.New(header.FilePath + " content mismatch") + cancel() + return + } + if atomic.AddInt32(&cnt, 1) >= 100 { + if readerTypeChecker(reader) { + if err := reader.Close(); err != nil { + errCh <- err + cancel() + return + } + } + } + }(contentReader, header) + } + err = tarCache.Close() + require.NoError(t, err) + wg.Wait() + close(errCh) + + hasError := false + for e := range errCh { + if e != nil { + hasError = true + } + } + require.False(t, hasError) +} + +func readErrorCase(t *testing.T) { + ctx := context.Background() + archData := createTarArchiveWithOneFile(t) + + reader := 
newReaderWithError(bytes.NewReader(archData), 0x800) + + gzf, err := gzip.NewReader(reader) + require.NoError(t, err) + + tarReader := tar.NewReader(gzf) + tarCache := NewTarCache(tarReader, 10, math.MaxInt) + + errCh := make(chan error, 100) + var wg sync.WaitGroup + _, _, err = tarCache.NextPayload(ctx) + + require.NotErrorIs(t, err, io.EOF) + require.Error(t, err, errRead.Error()) + + err = tarCache.Close() + require.NoError(t, err) + wg.Wait() + close(errCh) + + for e := range errCh { + if err == nil { + err = e + } + } + require.NoError(t, err) +} + +func successfulCase(t *testing.T) { + bigFileSize := 0x200 + content, archData := CreateTarArchive(t, 5, 5, 30, 70, bigFileSize) + + reader := bytes.NewReader(archData) + gzf, err := gzip.NewReader(reader) + require.NoError(t, err) + + tarReader := tar.NewReader(gzf) + tarCache := NewTarCache(tarReader, 10, int64(bigFileSize)) + + var m sync.Map + content.fillMap(&m) + + errCh := make(chan error, 100) + ctx, cancel := context.WithCancel(context.Background()) + ctx2 := context.Background() + defer cancel() + var wg sync.WaitGroup +L: + for { + select { + case <-ctx.Done(): + break L + default: + } + contentReader, header, err := tarCache.NextPayload(ctx2) + + if err != nil { + if err == io.EOF { + break + } + require.NoError(t, err) + } + wg.Add(1) + go func(reader io.ReadCloser, head Header) { + defer wg.Done() + defer func() { + if err := reader.Close(); err != nil { + errCh <- err + cancel() + } + }() + select { + case <-ctx.Done(): + return + default: + } + val, ok := m.Load(header.FilePath) + if !ok { + errCh <- errors.New(header.FilePath + " not found") + cancel() + return + } + expected := val.([]byte) + content, err := io.ReadAll(reader) + if err != nil { + errCh <- err + cancel() + return + } + if !bytes.Equal(expected, content) { + errCh <- errors.New(header.FilePath + " content mismatch") + cancel() + return + } + m.Delete(header.FilePath) + + }(contentReader, header) + } + err = tarCache.Close() + wg.Wait() + close(errCh) + var err2 error + for e := range errCh { + if err2 == nil { + err2 = e + } + } + require.NoError(t, err2) + require.NoError(t, err) + + // Checking for dictionary emptiness + var l int + m.Range(func(k, v interface{}) bool { + l++ + return true + }) + require.Equal(t, 0, l) +} + +func TestTarCache(t *testing.T) { + t.Run("Successful case", func(t *testing.T) { + successfulCase(t) + }) + + t.Run("Double close of the byte reader", func(t *testing.T) { + doubleCloseCaseHelper(t, func(reader io.Reader) bool { + e, ok := reader.(*helper) + if !ok { + return false + } + _, ok = e.Reader.(*bytes.Reader) + return ok + }) + }) + + t.Run("Double close of the stream reader", func(t *testing.T) { + doubleCloseCaseHelper(t, func(reader io.Reader) bool { + e, ok := reader.(*helper) + if !ok { + return false + } + _, ok = e.Reader.(*bytes.Reader) + return !ok + }) + }) + + t.Run("Read error", func(t *testing.T) { + readErrorCase(t) + }) + +} diff --git a/fs/extract/tarcache/tartest/tartest.go b/fs/extract/tarcache/tartest/tartest.go new file mode 100644 index 000000000..8a2d6dc00 --- /dev/null +++ b/fs/extract/tarcache/tartest/tartest.go @@ -0,0 +1,216 @@ +// Package tartest is a set of functions that assist with the testing of tar archives +package tartest + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "encoding/hex" + "fmt" + "io" + "io/fs" + "math/rand" + "path" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// FindFilePath is a helper function to find the file path of an item +func (i 
*Item) FindFilePath(skipCnt *int, bigFileSize int, fitInCache bool) string {
+ for _, obj := range i.Children {
+ if !obj.IsDir() {
+ isBig := len(obj.FileContent) >= bigFileSize
+ if isBig && !fitInCache || !isBig && fitInCache {
+ if *skipCnt == 0 {
+ return obj.Name()
+ }
+ *skipCnt--
+ }
+ }
+ }
+ for _, obj := range i.Children {
+ if obj.IsDir() {
+ res := obj.FindFilePath(skipCnt, bigFileSize, fitInCache)
+ if len(res) > 0 {
+ return res
+ }
+ }
+ }
+ return ""
+}
+
+// ItemType is the type of an item (directory or regular file)
+type ItemType int
+
+// Item types
+const (
+ Dir ItemType = iota // directory type
+ File // regular file type
+)
+
+// Item represents a file or directory
+type Item struct {
+ Type ItemType
+ ItemName string
+ Children map[string]*Item
+ FileContent []byte
+}
+
+// Name returns the file name
+func (i *Item) Name() string {
+ return i.ItemName
+}
+
+// Size returns the file size
+func (i *Item) Size() int64 {
+ return int64(len(i.FileContent))
+}
+
+// Mode returns the file mode bits
+func (i *Item) Mode() fs.FileMode {
+ res := fs.ModePerm
+ if i.Type == Dir {
+ res |= fs.ModeDir
+ }
+ return res
+}
+
+// ModTime returns the modification time
+func (i *Item) ModTime() time.Time {
+ return time.Now()
+}
+
+// IsDir reports whether the item is a directory
+func (i *Item) IsDir() bool {
+ return i.Type == Dir
+}
+
+// Sys returns the underlying data source (can return nil)
+func (i *Item) Sys() any {
+ return nil
+}
+
+func dice(rnd *rand.Rand, percent int) bool {
+ return rnd.Intn(100) < percent
+}
+
+func randomFileName(rnd *rand.Rand) string {
+ b := make([]byte, 16)
+ rnd.Read(b)
+ return hex.EncodeToString(b)
+}
+
+func (i *Item) createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) {
+ if deep <= 0 {
+ return
+ }
+ rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
+
+ nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir
+
+ for j := 0; j < nItem; j++ {
+ isFile := deep == 1 || dice(rnd, 60)
+ newItem := Item{}
+ var prefix string
+ if len(i.ItemName) > 0 {
+ prefix = i.ItemName + "/"
+ }
+ if isFile {
+ newItem.ItemName = fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd))
+ newItem.Type = File
+ var fileSize int
+ isSmallFile := dice(rnd, smallFilesPercent)
+ if isSmallFile {
+ // [1, bigFileSize)
+ fileSize = rnd.Intn(bigFileSize-1) + 1
+ } else {
+ // [bigFileSize, 2*bigFileSize)
+ fileSize = rnd.Intn(bigFileSize) + bigFileSize
+ }
+ newItem.FileContent = make([]byte, fileSize)
+ rnd.Read(newItem.FileContent)
+ i.Children[newItem.ItemName] = &newItem
+ } else {
+ newItem.ItemName = fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd))
+ newItem.Type = Dir
+ newItem.Children = make(map[string]*Item)
+ newItem.createTarArchiveContent(deep-1, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
+ i.Children[newItem.ItemName] = &newItem
+ }
+ }
+}
+
+// CreateTarArchiveContent generates a random directory tree of the given depth (deep),
+// whose contents can then be written to a tar archive.
+func CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) *Item {
+ root := &Item{
+ Type: Dir,
+ ItemName: "",
+ Children: make(map[string]*Item),
+ }
+
+ root.createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
+ return root
+}
+
+func (i *Item) writeItemToArchiver(tw *tar.Writer) error {
+ // In the first pass, we write the files from the current directory.
+ for _, child := range i.Children {
+ if child.Type == File {
+ head, err := tar.FileInfoHeader(child, path.Base(child.ItemName))
+ if err != nil {
+ return err
+ }
+ head.Name = child.ItemName
+ err = tw.WriteHeader(head)
+ if err != nil {
+ return err
+ }
+ _, err = io.Copy(tw, bytes.NewReader(child.FileContent))
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ // In the second pass, we write files from child directories.
+ for _, child := range i.Children {
+ if child.Type == Dir {
+ if err := child.writeItemToArchiver(tw); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
+func createTarArchiveHelper(writer io.Writer, content *Item) error {
+ gz := gzip.NewWriter(writer)
+ defer func() {
+ _ = gz.Close()
+ }()
+
+ tw := tar.NewWriter(gz)
+ defer func() {
+ _ = tw.Close()
+ }()
+
+ return content.writeItemToArchiver(tw)
+}
+
+// CreateTarArchive creates a tar archive of the given content.
+func CreateTarArchive(t *testing.T, deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) (*Item, []byte) {
+ content := CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
+
+ writer := bytes.NewBuffer(make([]byte, 0))
+
+ err := createTarArchiveHelper(writer, content)
+ archData := writer.Bytes()
+ require.NoError(t, err)
+ require.NotNil(t, content)
+ require.NotNil(t, archData)
+
+ return content, archData
+}
diff --git a/go.mod b/go.mod
index 6cffa5cba..7406061b7 100644
--- a/go.mod
+++ b/go.mod
@@ -168,6 +168,7 @@ require (
 github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 github.com/golang/protobuf v1.5.4 // indirect
 github.com/golang/snappy v0.0.1 // indirect
+ github.com/google/gops v0.3.28 // indirect
 github.com/google/s2a-go v0.1.7 // indirect
 github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
 github.com/googleapis/gax-go/v2 v2.12.5 // indirect
@@ -232,6 +233,7 @@ require (
 github.com/twmb/murmur3 v1.1.8 // indirect
 github.com/urfave/cli/v2 v2.27.4 // indirect
 github.com/willscott/go-nfs-client v0.0.0-20240104095149-b44639837b00 // indirect
+ github.com/xlab/treeprint v1.2.0 // indirect
 github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
 github.com/yusufpapurcu/wmi v1.2.4 // indirect
 github.com/zeebo/blake3 v0.2.3 // indirect
@@ -249,6 +251,7 @@ require (
 google.golang.org/protobuf v1.34.2 // indirect
 gopkg.in/yaml.v3 v3.0.1 // indirect
 moul.io/http2curl/v2 v2.3.0 // indirect
+ rsc.io/goversion v1.2.0 // indirect
 storj.io/common v0.0.0-20240812101423-26b53789c348 // indirect
 storj.io/drpc v0.0.35-0.20240709171858-0075ac871661 // indirect
 storj.io/eventkit v0.0.0-20240415002644-1d9596fee086 // indirect
diff --git a/go.sum b/go.sum
index a9b511896..66f4c0fe0 100644
--- a/go.sum
+++ b/go.sum
@@ -369,6 +369,8 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/gops v0.3.28 h1:2Xr57tqKAmQYRAfG12E+yLcoa2Y42UJo2lOrUFL9ark=
+github.com/google/gops v0.3.28/go.mod h1:6f6+Nl8LcHrzJwi8+p0ii+vmBFSlB4f8cOOkTJ7sk4c=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
 github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
 github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@@ -722,6 +724,8 @@ github.com/winfsp/cgofuse v1.5.1-0.20221118130120-84c0898ad2e0 h1:j3un8DqYvvAOqK
 github.com/winfsp/cgofuse v1.5.1-0.20221118130120-84c0898ad2e0/go.mod h1:uxjoF2jEYT3+x+vC2KJddEGdk/LU8pRowXmyVMHSV5I=
 github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM=
 github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw=
+github.com/xlab/treeprint v1.2.0 h1:HzHnuAF1plUN2zGlAFHbSQP2qJ0ZAD3XF5XD7OesXRQ=
+github.com/xlab/treeprint v1.2.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0=
 github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
 github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
 github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk=
@@ -1155,6 +1159,8 @@ honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
 moul.io/http2curl/v2 v2.3.0 h1:9r3JfDzWPcbIklMOs2TnIFzDYvfAZvjeavG6EzP7jYs=
 moul.io/http2curl/v2 v2.3.0/go.mod h1:RW4hyBjTWSYDOxapodpNEtX0g5Eb16sxklBqmd2RHcE=
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
+rsc.io/goversion v1.2.0 h1:SPn+NLTiAG7w30IRK/DKp1BjvpWabYgxlLp/+kx5J8w=
+rsc.io/goversion v1.2.0/go.mod h1:Eih9y/uIBS3ulggl7KNJ09xGSLcuNaLgmvvqa07sgfo=
 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
 rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
 rsc.io/tmplfunc v0.0.3 h1:53XFQh69AfOa8Tw0Jm7t+GV7KZhOi6jzsCzTtKbMvzU=
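
Usage note (illustrative sketch, not part of the diff): one way the tartest helpers introduced above could be driven from a test. The external test package, the test function name and the collectFiles helper are assumptions made for this example; CreateTarArchive, Item and its exported fields and methods come from the new tartest package itself. Because writeItemToArchiver only emits tar headers for regular files (directories are implied by the entry paths), comparing the file entries alone is enough to check the archive against the generated tree.

package tartest_test

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"io"
	"testing"

	"github.com/rclone/rclone/fs/extract/tarcache/tartest"
	"github.com/stretchr/testify/require"
)

// collectFiles records the content of every regular file in the generated tree,
// keyed by its full path (the same value writeItemToArchiver puts in the tar header).
func collectFiles(itm *tartest.Item, into map[string][]byte) {
	for name, child := range itm.Children {
		if child.IsDir() {
			collectFiles(child, into)
		} else {
			into[name] = child.FileContent
		}
	}
}

// TestArchiveMatchesTree is a hypothetical test: generate a random tree, archive it,
// then unpack the archive with the standard library and compare it with the tree.
func TestArchiveMatchesTree(t *testing.T) {
	// depth 3, 2..5 items per directory, ~50% small files, "big file" threshold 4 KiB
	content, archData := tartest.CreateTarArchive(t, 3, 2, 5, 50, 0x1000)

	want := make(map[string][]byte)
	collectFiles(content, want)

	gz, err := gzip.NewReader(bytes.NewReader(archData))
	require.NoError(t, err)
	tr := tar.NewReader(gz)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		}
		require.NoError(t, err)
		got, err := io.ReadAll(tr)
		require.NoError(t, err)
		require.Equal(t, want[hdr.Name], got)
		delete(want, hdr.Name)
	}
	require.Empty(t, want) // every generated file appeared in the archive exactly once
}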