diff --git a/cmd/all/all.go b/cmd/all/all.go
index 37b15c5c3..34c3d68f9 100644
--- a/cmd/all/all.go
+++ b/cmd/all/all.go
@@ -23,6 +23,7 @@ import (
 	_ "github.com/rclone/rclone/cmd/dedupe"
 	_ "github.com/rclone/rclone/cmd/delete"
 	_ "github.com/rclone/rclone/cmd/deletefile"
+	_ "github.com/rclone/rclone/cmd/extract"
 	_ "github.com/rclone/rclone/cmd/genautocomplete"
 	_ "github.com/rclone/rclone/cmd/gendocs"
 	_ "github.com/rclone/rclone/cmd/gitannex"
diff --git a/cmd/extract/extract.go b/cmd/extract/extract.go
new file mode 100644
index 000000000..a3d2abc26
--- /dev/null
+++ b/cmd/extract/extract.go
@@ -0,0 +1,46 @@
+package extract
+
+import (
+	"context"
+	"errors"
+	"strings"
+
+	"github.com/rclone/rclone/cmd"
+	"github.com/rclone/rclone/fs/extract"
+	"github.com/spf13/cobra"
+)
+
+func init() {
+	cmd.Root.AddCommand(commandDefinition)
+}
+
+var commandDefinition = &cobra.Command{
+	Use:   "extract source:path/to/archive.tar.gz dest:path",
+	Short: `Extract a tar.gz archive from source to dest, skipping identical files.`,
+	// Note: "|" will be replaced by backticks below
+	Long: strings.ReplaceAll(`Extract the contents of a tar.gz archive from the source to the destination.
+Does not transfer files that are identical on source and destination,
+testing by size and modification time or MD5SUM. Doesn't delete files
+from the destination.
+
+If dest:path doesn't exist, it is created and the contents of the archive
+source:path/to/archive.tar.gz go there.
+
+**Note**: Use the |-P|/|--progress| flag to view real-time transfer statistics.
+
+**Note**: Use the |--dry-run| or the |--interactive|/|-i| flag to test without copying anything.
+`, "|", "`"),
+	Annotations: map[string]string{
+		"groups": "Extract",
+	},
+	Run: func(command *cobra.Command, args []string) {
+
+		cmd.CheckArgs(2, 2, command, args)
+		fsrc, srcFileName, fdst := cmd.NewFsSrcFileDst(args)
+		cmd.Run(true, true, command, func() error {
+			if srcFileName == "" {
+				return errors.New("the source is required to be a file")
+			}
+			return extract.Extract(context.Background(), fdst, fsrc, srcFileName)
+		})
+	},
+}
diff --git a/fs/extract/dirtreeholder.go b/fs/extract/dirtreeholder.go
new file mode 100644
index 000000000..7cd9c4ae7
--- /dev/null
+++ b/fs/extract/dirtreeholder.go
@@ -0,0 +1,276 @@
+package extract
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+	"sync"
+
+	"github.com/rclone/rclone/fs"
+)
+
+// Object describes a single entry of the destination tree: whether it is a
+// directory and the backend-specific data associated with it.
+type Object struct {
+	isDir bool
+	data  any
+}
+
+// Dir is a node of the destination directory tree.
+type Dir struct {
+	name string
+	objs map[string]Object
+	dirs map[string]*Dir
+}
+
+// NewDir creates an empty Dir with the given name.
+func NewDir(name string) *Dir {
+	return &Dir{
+		name: name,
+		objs: make(map[string]Object),
+		dirs: make(map[string]*Dir),
+	}
+}
+
+// Name returns the directory name.
+func (d *Dir) Name() string {
+	return d.name
+}
+
+// NewObject creates an Object with the given directory flag and data.
+func NewObject(isDir bool, data any) Object {
+	return Object{
+		isDir: isDir,
+		data:  data,
+	}
+}
+
+// GetDirByName returns the subdirectory with the given full name, or nil if it is not in the tree
+func (d *Dir) GetDirByName(name string) *Dir {
+	if d.name == name {
+		return d
+	}
+	if len(name) <= len(d.name) || !strings.HasPrefix(name, d.name) {
+		return nil
+	}
+	if d.name != "" && name[len(d.name)] != '/' {
+		return nil
+	}
+	idx := strings.Index(name[len(d.name)+1:], "/")
+	nextDir := name
+	if idx != -1 {
+		nextDir = name[:idx+len(d.name)+1]
+	}
+	if td, ok := d.dirs[nextDir]; ok {
+		return td.GetDirByName(name)
+	}
+	return nil
+}
+
+// AddChildDir adds a child directory
+func (d *Dir) AddChildDir(child *Dir) {
+	d.dirs[child.name] = child
+}
+
+// AddChildObject adds information about the child object
+func (d *Dir) AddChildObject(name string, 
child Object) {
+	d.objs[name] = child
+}
+
+type listDirFn func(dir string) (*Dir, error)
+
+// DirTreeHolder lazily lists and caches the destination directory tree so
+// that each directory is listed at most once, even with concurrent callers.
+type DirTreeHolder struct {
+	listF listDirFn
+
+	mu sync.Mutex
+	// consider changing the variable type to fs.DirTree
+	root       *Dir
+	dirsInWork map[string]*sync.WaitGroup
+}
+
+type strDirIter struct {
+	dir string
+	idx int
+}
+
+func newStrDirIter(dir string) *strDirIter {
+	return &strDirIter{
+		dir: dir,
+		idx: -1,
+	}
+}
+
+// next returns the current prefix of the path and the next, one level deeper, prefix.
+func (s *strDirIter) next() (cur string, next string) {
+	if s.dir == "" {
+		return "", ""
+	}
+	if s.idx == -1 {
+		s.idx = 0
+		idx := strings.Index(s.dir, "/")
+		if idx == -1 {
+			return "", s.dir
+		}
+		return "", s.dir[:idx]
+	}
+	idx := strings.Index(s.dir[s.idx:], "/")
+	if idx == -1 {
+		return s.dir, ""
+	}
+	defer func(idx int) {
+		s.idx = s.idx + idx + 1
+	}(idx)
+
+	idx2 := strings.Index(s.dir[s.idx+idx+1:], "/")
+	if idx2 == -1 {
+		return s.dir[:idx+s.idx], s.dir
+	}
+	return s.dir[:idx+s.idx], s.dir[:idx+idx2+s.idx+1]
+}
+
+// GetObjectByPath returns the object stored for path, if any.
+func (d *Dir) GetObjectByPath(path string) (Object, bool) {
+	var dirName string
+	idx := strings.LastIndex(path, "/")
+	if idx != -1 {
+		dirName = path[:idx]
+	}
+	dir := d.GetDirByName(dirName)
+	if dir == nil {
+		return Object{}, false
+	}
+	obj, ok := dir.objs[path]
+	return obj, ok
+}
+
+// NewDirStructHolder creates a DirTreeHolder which uses listF to list directories on demand.
+func NewDirStructHolder(listF listDirFn) *DirTreeHolder {
+	return &DirTreeHolder{
+		listF:      listF,
+		root:       nil,
+		dirsInWork: make(map[string]*sync.WaitGroup),
+	}
+}
+
+// GetObjectByPath returns the cached object stored for path, if any.
+func (l *DirTreeHolder) GetObjectByPath(path string) (Object, bool) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	if l.root == nil {
+		return Object{}, false
+	}
+	return l.root.GetObjectByPath(path)
+}
+
+// IsDirExist reports whether the directory is present in the cached tree.
+func (l *DirTreeHolder) IsDirExist(path string) bool {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	if l.root == nil {
+		return false
+	}
+	return l.root.GetDirByName(path) != nil
+}
+
+// UpdateDirList makes sure that every directory on the path from the root to
+// dir has been listed, listing missing levels on demand. It is safe for
+// concurrent use: a directory that is already being listed by another
+// goroutine is waited for rather than listed again.
+func (l *DirTreeHolder) UpdateDirList(ctx context.Context, dir string) error {
+	l.mu.Lock()
+	iter := newStrDirIter(dir)
+	aborting := func() bool {
+		select {
+		case <-ctx.Done():
+			return true
+		default:
+			return false
+		}
+	}
+	for {
+		if aborting() {
+			l.mu.Unlock()
+			return ctx.Err()
+		}
+		curDir, nextDir := iter.next()
+		if nextDir == "" {
+			l.mu.Unlock()
+			return nil
+		}
+		if wg, ok := l.dirsInWork[curDir]; ok {
+			l.mu.Unlock()
+			wg.Wait()
+			if aborting() {
+				return ctx.Err()
+			}
+			l.mu.Lock()
+		}
+		if curDir == "" && l.root == nil {
+			if wg, ok := l.dirsInWork[curDir]; ok {
+				l.mu.Unlock()
+				wg.Wait()
+				if aborting() {
+					return ctx.Err()
+				}
+				l.mu.Lock()
+				if l.root == nil || l.root.GetDirByName(curDir) == nil {
+					l.mu.Unlock()
+					return fmt.Errorf("error while updating info about dir %q", curDir)
+				}
+			} else {
+				wg := &sync.WaitGroup{}
+				wg.Add(1)
+				l.dirsInWork[curDir] = wg
+				l.mu.Unlock()
+				d, err := l.listF(curDir)
+
+				if err != nil {
+					if !errors.Is(err, fs.ErrorDirNotFound) {
+						// Release any waiters before reporting the error
+						l.mu.Lock()
+						delete(l.dirsInWork, curDir)
+						l.mu.Unlock()
+						wg.Done()
+						return err
+					}
+					d = NewDir(curDir)
+				}
+				l.mu.Lock()
+				l.root = d
+				wg = l.dirsInWork[curDir]
+				delete(l.dirsInWork, curDir)
+				wg.Done()
+			}
+		}
+		d := l.root.GetDirByName(curDir)
+		if d == nil {
+			l.mu.Unlock()
+			return fmt.Errorf("internal error: directory %q is missing from the tree", curDir)
+		}
+		if _, ok := d.dirs[nextDir]; ok {
+			continue
+		}
+		if o, ok := d.objs[nextDir]; ok {
+			// The entry exists but is a file, so there is no such directory
+			if !o.isDir {
+				l.mu.Unlock()
+				return nil
+			}
+
+			if wg, ok := l.dirsInWork[nextDir]; ok {
+				l.mu.Unlock()
+				wg.Wait()
+				if aborting() {
+					return ctx.Err()
+				}
+				l.mu.Lock()
+				dir := d.GetDirByName(nextDir)
+				if dir == nil {
+					l.mu.Unlock()
+					return fmt.Errorf("error while updating info about dir %s", nextDir)
+				}
+			} else {
+				wg := 
&sync.WaitGroup{} + wg.Add(1) + l.dirsInWork[nextDir] = wg + l.mu.Unlock() + td, err := l.listF(nextDir) + if err != nil { + return err + } + l.mu.Lock() + d.dirs[nextDir] = td + wg = l.dirsInWork[nextDir] + delete(l.dirsInWork, nextDir) + wg.Done() + } + } else { + l.mu.Unlock() + return nil + } + } +} diff --git a/fs/extract/dirtreeholder_test.go b/fs/extract/dirtreeholder_test.go new file mode 100644 index 000000000..6516f5f85 --- /dev/null +++ b/fs/extract/dirtreeholder_test.go @@ -0,0 +1,310 @@ +package extract + +import ( + "context" + "fmt" + "math/rand" + "os" + "path/filepath" + "sort" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func createDirTree(dir *Dir, deep, minItemsInDir, maxItemsInDir int) { + if deep <= 0 { + return + } + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir + + for j := 0; j < nItem; j++ { + isFile := deep == 1 || dice(rnd, 50) + var prefix string + if len(dir.name) > 0 { + prefix = dir.name + "/" + } + if isFile { + name := fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd)) + newItem := NewObject(false, name) + dir.objs[name] = newItem + } else { + name := fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd)) + newItem := NewObject(true, name) + dir.objs[name] = newItem + childDir := NewDir(name) + createDirTree(childDir, deep-1, minItemsInDir, maxItemsInDir) + if len(childDir.dirs) != 0 || len(childDir.objs) != 0 { + dir.dirs[childDir.name] = childDir + } + } + } +} + +type listFnHelper struct { + mu sync.Mutex + cnt atomic.Uint64 + root *Dir +} + +func (l *listFnHelper) Cnt() uint64 { + return l.cnt.Load() +} + +func (l *listFnHelper) ResetCnt() { + l.cnt.Store(0) +} + +func (l *listFnHelper) listDir(dir string) (*Dir, error) { + l.cnt.Add(1) + l.mu.Lock() + defer l.mu.Unlock() + d := l.root.GetDirByName(dir) + if d == nil { + return nil, os.ErrNotExist + } + newDir := NewDir(d.name) + for path, child := range d.objs { + newDir.AddChildObject(path, child) + } + return newDir, nil +} + +func fillSliceWithDirsInfo(s *[]string, dir *Dir) { + for path, child := range dir.objs { + if child.isDir { + *s = append(*s, path) + } + } + for _, child := range dir.dirs { + fillSliceWithDirsInfo(s, child) + } +} + +func fillMapWithFilesInfo(m map[string][]string, dir *Dir) { + files := make([]string, 0) + for path, child := range dir.objs { + if !child.isDir { + files = append(files, filepath.Base(path)) + } + } + + m[dir.Name()] = files + + for _, child := range dir.dirs { + fillMapWithFilesInfo(m, child) + } +} + +func TestListing(t *testing.T) { + t.Run("Test dir struct holder", testDirStructHolder) + + t.Run("Test dir", testDir) + + t.Run("Test string directory iterating", testStrDirIter) +} + +func testDirStructHolder(t *testing.T) { + root := NewDir("") + createDirTree(root, 6, 5, 10) + listF := &listFnHelper{ + root: root, + } + t.Run("Concurrent listing", func(t *testing.T) { + s := make([]string, 0) + m := make(map[string][]string) + fillSliceWithDirsInfo(&s, root) + fillMapWithFilesInfo(m, root) + sort.Slice(s, func(i, j int) bool { + return len(s[i]) < len(s[j]) + }) + require.NotNil(t, root) + holder := NewDirStructHolder(listF.listDir) + halfLen := len(s) / 2 + path := s[halfLen] + expectedCnt := 0 + dIter := newStrDirIter(path) + for { + expectedCnt++ + _, next := dIter.next() + if next == "" { + break + } + } + require.NotNil(t, holder) + + eg, ctx := errgroup.WithContext(context.Background()) + 
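+		// Several goroutines request listing of the same path concurrently (plus one
+		// request for a non-existent path) to exercise the dirsInWork de-duplication
+		// in UpdateDirList.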
eg.Go(func() error { + return holder.UpdateDirList(ctx, path) + }) + eg.Go(func() error { + return holder.UpdateDirList(ctx, path) + }) + eg.Go(func() error { + return holder.UpdateDirList(ctx, path) + }) + eg.Go(func() error { + return holder.UpdateDirList(ctx, path+"not.exists") + }) + err := eg.Wait() + require.NoError(t, err) + require.Equal(t, uint64(expectedCnt), listF.Cnt()) + dIter = newStrDirIter(path) + var cur, next string + next = "1" + for next != "" { + cur, next = dIter.next() + if next == "" { + break + } + require.True(t, holder.IsDirExist(cur)) + files, ok := m[cur] + require.True(t, ok) + for _, file := range files { + var filePath string + if cur == "" { + filePath = file + } else { + filePath = cur + "/" + file + } + var obj Object + obj, ok = holder.GetObjectByPath(filePath) + if !ok { + require.True(t, ok) + } + require.True(t, ok) + require.Equal(t, filePath, obj.data.(string)) + } + } + }) +} + +func testDir(t *testing.T) { + finalDir := "path/with/more/than/one/dir" + files := map[string][]string{ + "": {"file0.1.ext", "file0.2.ext"}, + "path": {"file1.1.ext", "file1.2.ext"}, + "path/with/more/than/one": {"file2.1.ext", "file2.2.ext"}, + "path/with/more/than/one/dir": {"file3.1.ext", "file3.2.ext"}, + } + + dirIter := newStrDirIter(finalDir) + var root, prevDir *Dir + next := "1" + for next != "" { + var cur string + cur, next = dirIter.next() + dir := NewDir(cur) + if root == nil { + root = dir + } + if files, ok := files[cur]; ok { + for _, file := range files { + obj := NewObject(false, struct{}{}) + if cur == "" { + dir.AddChildObject(file, obj) + } else { + dir.AddChildObject(cur+"/"+file, obj) + } + + } + } + if prevDir != nil { + prevDir.AddChildDir(dir) + prevDir.AddChildObject(dir.Name(), NewObject(true, struct{}{})) + } + prevDir = dir + } + + t.Run("GetDirByName", func(t *testing.T) { + dirIter = newStrDirIter(finalDir) + next = "1" + cnt := 0 + for next != "" { + var cur string + cur, next = dirIter.next() + dir := root.GetDirByName(cur) + require.NotNil(t, dir) + cnt++ + } + require.Equal(t, 7, cnt) + }) + + t.Run("GetObjectByName", func(t *testing.T) { + for dirName, fileNames := range files { + for _, fileName := range fileNames { + var filePath string + if dirName == "" { + filePath = fileName + } else { + filePath = dirName + "/" + fileName + } + obj, ok := root.GetObjectByPath(filePath) + require.True(t, ok) + require.False(t, obj.isDir) + filePath += ".fail" + _, ok = root.GetObjectByPath(filePath) + require.False(t, ok) + } + } + }) + +} + +func testStrDirIter(t *testing.T) { + for _, tc := range []struct { + name string + dir string + res []string + }{ + { + name: "path with more than one dir", + dir: "path/with/more/than/one/dir", + res: []string{ + "", + "path", + "path/with", + "path/with/more", + "path/with/more/than", + "path/with/more/than/one", + "path/with/more/than/one/dir", + "", + }, + }, + { + name: "path with one dir", + dir: "one_dir", + res: []string{ + "", + "one_dir", + "", + }, + }, + { + name: "empty path", + dir: "", + res: []string{ + "", + "", + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + l := newStrDirIter(tc.dir) + for i, p := range tc.res { + if p == "" && i > 0 { + break + } + cur, next := l.next() + require.Equal(t, cur, p) + require.Equal(t, next, tc.res[i+1]) + } + }) + } +} diff --git a/fs/extract/extract.go b/fs/extract/extract.go new file mode 100644 index 000000000..ad4e08c83 --- /dev/null +++ b/fs/extract/extract.go @@ -0,0 +1,681 @@ +package extract + +import ( + "archive/tar" + "bytes" + 
"compress/gzip" + "context" + "errors" + "fmt" + "io" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/filter" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/fs/list" + "github.com/rclone/rclone/fs/operations" +) + +type payloadType int + +const ( + payloadTypeBytes payloadType = iota + payloadTypeIoReader +) + +type extract struct { + // parameters + fdst fs.Fs + fsrc fs.Fs + extrFs *extractFs + srcFileName string + hashType hash.Type // common hash to use + hashOption *fs.HashesOption // open option for the common hash + // internal state + ci *fs.ConfigInfo // global config + ctx context.Context // internal context for controlling go-routines + cancel func() // cancel the context + maxDurationEndTime time.Time // end time if --max-duration is set + tarFormat tar.Format // tar format to use + checkersWg sync.WaitGroup // wait group for auxiliary goroutines + transfersWg sync.WaitGroup // wait group for auxiliary goroutines + + mu sync.Mutex + lastError error + errCnt atomic.Int32 + dsh *DirTreeHolder +} + +type objectInfo struct { + bytesPayload []byte + readerPayload io.Reader + closeFn func() error + header FileHeader +} + +type extractFs struct { + // parameters + fsrc fs.Fs + srcFileName string + features *fs.Features + transfers int + // internal state + mu sync.Mutex + tarCache *TarCache + rawReader io.ReadCloser + cache []objectInfo + reopenRequests chan reopenRequest + openOptions []fs.OpenOption + streamTaskWatcher chan struct{} + tarFormat tar.Format // tar format to use + wg sync.WaitGroup // wait group for auxiliary goroutines +} + +type object struct { + fs *extractFs + header FileHeader + payloadType payloadType + payload []byte // payload of small-sized objects + closeFn func() error + reopenRequest chan reopenRequest + + mu sync.Mutex + readerPayload io.Reader // payload of large objects +} + +type reopenRequest struct { + filePath string + obj *object + respCh chan error +} + +type readCloserWrapper struct { + io.Reader + closeFn func() error +} + +type copyJob struct { + src fs.Object + dst fs.Object + path string +} + +func makeListDir(ctx context.Context, f fs.Fs) listDirFn { + return func(dir string) (*Dir, error) { + dirCtx := filter.SetUseFilter(ctx, f.Features().FilterAware) // make filter-aware backends constrain List + entities, err := list.DirSorted(dirCtx, f, false, dir) + if err != nil { + return nil, err + } + res := NewDir(dir) + for _, entry := range entities { + if obj, ok := entry.(fs.Object); ok { + o := NewObject(false, obj) + res.AddChildObject(obj.Remote(), o) + } + if d, ok := entry.(fs.Directory); ok { + o := NewObject(true, nil) + res.AddChildObject(d.Remote(), o) + } + } + return res, nil + } +} + +func newReadCloserWrapper(r io.Reader, closeFn func() error) io.ReadCloser { + return &readCloserWrapper{ + r, + closeFn, + } +} + +func (r *readCloserWrapper) Close() error { + return r.closeFn() +} + +func newReopenRequest(filePath string, obj *object) reopenRequest { + return reopenRequest{ + filePath: filePath, + obj: obj, + respCh: make(chan error), + } +} + +func (o *object) Fs() fs.Info { + return o.fs +} + +func (o *object) String() string { + if o == nil { + return "" + } + return o.header.FilePath +} + +func (o *object) IsStream() bool { + return o.payloadType == payloadTypeIoReader +} + +func (o *object) Remote() string { + return o.header.FilePath +} + +func (o *object) ModTime(_ context.Context) time.Time { + return o.header.ModTime +} + +func (o *object) 
Size() int64 { + return o.header.Size +} + +func (o *object) Hash(_ context.Context, _ hash.Type) (string, error) { + return "", nil +} + +func (o *object) Storable() bool { + return true +} + +func (o *object) SetModTime(_ context.Context, _ time.Time) error { + panic("not implemented") +} + +func (o *object) bytesOpen() (io.ReadCloser, error) { + var err error + var once sync.Once + return newReadCloserWrapper(bytes.NewReader(o.payload), func() error { + once.Do(func() { + err = o.closeFn() + }) + return err + }), nil +} + +func (o *object) streamOpen() (io.ReadCloser, error) { + o.mu.Lock() + if o.readerPayload == nil { + o.mu.Unlock() + reopenReq := newReopenRequest(o.header.FilePath, o) + o.reopenRequest <- reopenReq + err := <-reopenReq.respCh + close(reopenReq.respCh) + if err != nil { + return nil, err + } + o.mu.Lock() + if o.readerPayload == nil { + panic("nil readerPayload") + } + } + + defer o.mu.Unlock() + var err error + var once sync.Once + return newReadCloserWrapper(o.readerPayload, func() error { + once.Do(func() { + o.mu.Lock() + o.readerPayload = nil + o.mu.Unlock() + err = o.closeFn() + }) + return err + }), nil +} + +func (o *object) Open(_ context.Context, _ ...fs.OpenOption) (io.ReadCloser, error) { + if o.payloadType == payloadTypeBytes { + return o.bytesOpen() + } + return o.streamOpen() +} + +func (o *object) Update(_ context.Context, _ io.Reader, _ fs.ObjectInfo, _ ...fs.OpenOption) error { + panic("not implemented") +} + +func (o *object) Remove(_ context.Context) error { + panic("not implemented") +} + +func (e *extractFs) Name() string { + return "extract" +} + +func (e *extractFs) Root() string { + return fmt.Sprintf("%s:%s/%s", e.fsrc.Name(), e.fsrc.Root(), e.srcFileName) +} + +func (e *extractFs) String() string { + return fmt.Sprintf("tar.gz archive %s at %s", e.Name(), e.Root()) +} + +func newExtractFs(ctx context.Context, fsrc fs.Fs, srcFileName string, transfers int) *extractFs { + e := &extractFs{ + fsrc: fsrc, + srcFileName: srcFileName, + transfers: transfers, + reopenRequests: make(chan reopenRequest, transfers*2), + streamTaskWatcher: make(chan struct{}), + } + close(e.streamTaskWatcher) + e.features = (&fs.Features{ + CanHaveEmptyDirectories: false, + }).Fill(ctx, e) + return e +} + +func (e *extractFs) Precision() time.Duration { + e.mu.Lock() + defer e.mu.Unlock() + switch e.tarFormat { + case tar.FormatPAX: + return time.Millisecond + default: + return time.Second + } +} + +func (e *extractFs) Open(ctx context.Context, options ...fs.OpenOption) error { + e.openOptions = options + return e.reopen(ctx) +} + +func (e *extractFs) reopen(ctx context.Context) error { + obj, err := e.fsrc.NewObject(ctx, e.srcFileName) + if err != nil { + return err + } + + reader, err := obj.Open(ctx, e.openOptions...) 
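+	// The open options stored by Open() are reused here so that a reopened reader
+	// gets the same hash option and download headers as the first one.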
+ + if err != nil { + return err + } + + gzr, err := gzip.NewReader(reader) + if err != nil { + return err + } + tarReader := tar.NewReader(gzr) + t := NewTarCache(tarReader, e.transfers, 2000) + e.mu.Lock() + e.tarCache = t + e.rawReader = reader + e.mu.Unlock() + return nil +} + +func (e *extractFs) Hashes() hash.Set { + return hash.Set(hash.None) +} + +func (e *extractFs) Features() *fs.Features { + return e.features +} + +func (e *extractFs) processReopenRequest(ctx context.Context, reopenReq reopenRequest) error { + err := e.reopen(ctx) + if err != nil { + reopenReq.respCh <- err + return err + } + + e.mu.Lock() + tarCache := e.tarCache + e.mu.Unlock() + bytesPayload, readerPayload, closer, header, err := tarCache.ForwardToFile(reopenReq.filePath) + if err != nil { + reopenReq.respCh <- err + return err + } + e.mu.Lock() + found := false + for idx, itm := range e.cache { + if itm.header.FilePath == reopenReq.filePath { + itm.readerPayload = readerPayload + itm.bytesPayload = bytesPayload + itm.closeFn = closer + itm.header = header + e.cache[idx] = itm + found = true + break + } + } + e.mu.Unlock() + if !found { + panic("cache must contain information about this object") + } + + obj, err := e.NewObject(ctx, reopenReq.filePath) + if err != nil { + reopenReq.respCh <- err + return err + } + o := obj.(*object) + reopenReq.obj.readerPayload = o.readerPayload + reopenReq.obj.closeFn = o.closeFn + return nil +} + +func (e *extractFs) AddNextObject(ctx context.Context) (string, error) { + e.mu.Lock() + tarCache := e.tarCache + e.mu.Unlock() + if tarCache == nil { + return "", errors.New("todo: cannot add next object") + } + + var err error +L: + for { + select { + // We block execution flow until the goroutine responsible for copying + // stream has completed its work. + case <-e.streamTaskWatcher: + break L + case <-ctx.Done(): + return "", ctx.Err() + case reopenReq := <-e.reopenRequests: + err = e.processReopenRequest(ctx, reopenReq) + } + } + + if err != nil { + return "", err + } + + var o objectInfo + o.bytesPayload, o.readerPayload, o.closeFn, o.header, err = e.tarCache.Next() + if err != nil { + if err == io.EOF { + return "", nil + } + // There can be two possible errors here: + + // 1. An error occurred when reading data by goroutine. + // 2. An error occurred while reading the next header. + // In any case, we need to wait for all working goroutines to complete their work. 
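+		// While waiting we keep serving reopen requests below so that transfer
+		// goroutines blocked in streamOpen() do not deadlock waiting for a response.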
+ watcherCh := make(chan struct{}) + go func() { + _ = tarCache.Close() + close(watcherCh) + }() + + L2: + for { + select { + case reopenReq := <-e.reopenRequests: + err = e.processReopenRequest(ctx, reopenReq) + continue + case <-watcherCh: + // All the working goroutines have completed their work + break L2 + } + } + return "", err + } + + e.mu.Lock() + defer e.mu.Unlock() + e.cache = append(e.cache, o) + if e.tarFormat == tar.FormatUnknown { + e.tarFormat = o.header.Format + } + if len(e.cache) > e.transfers*3 { + e.cache = e.cache[1:] + } + + return o.header.FilePath, nil +} + +func (e *extractFs) newObject(ctx context.Context, objInfo objectInfo) (fs.Object, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + + pt := payloadTypeBytes + if objInfo.readerPayload != nil { + pt = payloadTypeIoReader + } + return &object{ + fs: e, + header: objInfo.header, + payload: objInfo.bytesPayload, + readerPayload: objInfo.readerPayload, + payloadType: pt, + closeFn: objInfo.closeFn, + reopenRequest: e.reopenRequests, + }, nil +} + +func (e *extractFs) NewObject(ctx context.Context, remote string) (obj fs.Object, err error) { + e.mu.Lock() + defer e.mu.Unlock() + for _, objInfo := range e.cache { + if objInfo.header.FilePath == remote { + return e.newObject(ctx, objInfo) + } + } + if ctx.Err() != nil { + return nil, ctx.Err() + } + return nil, fs.ErrorObjectNotFound +} + +func (e *extractFs) setStreamTaskWatcherCh(ch chan struct{}) { + e.mu.Lock() + defer e.mu.Unlock() + e.streamTaskWatcher = ch +} + +func (e *extractFs) streamTaskWatcherCh() chan struct{} { + e.mu.Lock() + defer e.mu.Unlock() + return e.streamTaskWatcher +} + +func (e *extractFs) List(_ context.Context, _ string) (entries fs.DirEntries, err error) { + panic("not implemented") +} + +func (e *extractFs) Put(_ context.Context, _ io.Reader, _ fs.ObjectInfo, _ ...fs.OpenOption) (fs.Object, error) { + panic("not implemented") +} + +func (e *extractFs) Mkdir(_ context.Context, _ string) error { + panic("not implemented") +} + +func (e *extractFs) Rmdir(_ context.Context, _ string) error { + panic("not implemented") +} + +func (e *extractFs) Remove(_ context.Context, _ fs.Object) error { + panic("not implemented") +} + +func newExtract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string) *extract { + ci := fs.GetConfig(ctx) + + s := &extract{ + ctx: ctx, + ci: ci, + fdst: fdst, + fsrc: fsrc, + srcFileName: srcFileName, + } + if ci.MaxDuration > 0 { + s.maxDurationEndTime = time.Now().Add(ci.MaxDuration) + fs.Infof(s.fdst, "Transfer session %v deadline: %s", ci.CutoffMode, s.maxDurationEndTime.Format("2006/01/02 15:04:05")) + } + + // If a max session duration has been defined add a deadline + // to the main context if cutoff mode is hard. This will cut + // the transfers off. + if !s.maxDurationEndTime.IsZero() && ci.CutoffMode == fs.CutoffModeHard { + s.ctx, s.cancel = context.WithDeadline(ctx, s.maxDurationEndTime) + } else { + s.ctx, s.cancel = context.WithCancel(ctx) + } + return s +} + +func (e *extract) transfer(ctx context.Context, in <-chan copyJob) { + defer e.transfersWg.Done() + for { + select { + case <-ctx.Done(): + return + case job, ok := <-in: + if !ok { + return + } + obj := job.src.(*object) + + var watcher chan struct{} + if obj.IsStream() { + watcher = make(chan struct{}) + e.extrFs.setStreamTaskWatcherCh(watcher) + } + _, err := operations.Copy(ctx, e.fdst, job.dst, job.path, job.src) + + // It is possible that operations.Copy() does not call the Open() method for job.src internally. 
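+			// (with --dry-run, for example, nothing is opened or copied at all).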
+			// Therefore, to avoid freezing when receiving information about the next file in
+			// the extractFs.AddNextObject() method, we force a call to obj.closeFn().
+			_ = obj.closeFn()
+			if watcher != nil {
+				close(watcher)
+			}
+
+			if err != nil {
+				e.errCnt.Add(1)
+				e.mu.Lock()
+				// Keep only the first error encountered
+				if e.lastError == nil {
+					e.lastError = err
+				}
+				e.mu.Unlock()
+				e.cancel()
+				return
+			}
+		}
+	}
+}
+
+// runTransfers starts the goroutines which copy queued jobs to the destination.
+func (e *extract) runTransfers(ctx context.Context, in <-chan copyJob) {
+	for i := 0; i < e.ci.Transfers; i++ {
+		e.transfersWg.Add(1)
+		go e.transfer(ctx, in)
+	}
+}
+
+// checker resolves the destination object (if any) for each extracted path and
+// queues a copy job for the transfer goroutines.
+func (e *extract) checker(ctx context.Context, in <-chan string, out chan<- copyJob) {
+	defer e.checkersWg.Done()
+	for {
+		var filePath string
+		var ok bool
+
+		select {
+		case <-ctx.Done():
+			return
+		case filePath, ok = <-in:
+			if !ok {
+				return
+			}
+		}
+
+		idx := strings.LastIndex(filePath, "/")
+		var dir string
+		if idx != -1 {
+			dir = filePath[:idx]
+		}
+		err := e.dsh.UpdateDirList(ctx, dir)
+		if err != nil {
+			// TODO: think about retries
+			e.cancel()
+			return
+		}
+		src, err := e.extrFs.NewObject(ctx, filePath)
+		if err != nil {
+			e.cancel()
+			return
+		}
+
+		job := copyJob{
+			src:  src,
+			path: filePath,
+		}
+		if objInfo, ok := e.dsh.GetObjectByPath(filePath); ok && !objInfo.isDir {
+			if dst := objInfo.data.(fs.Object); dst != nil {
+				job.dst = dst
+			}
+		}
+		out <- job
+	}
+}
+
+// runCheckers starts the goroutines which check the destination for existing files.
+func (e *extract) runCheckers(ctx context.Context, in <-chan string, out chan<- copyJob) {
+	for i := 0; i < e.ci.Transfers; i++ {
+		e.checkersWg.Add(1)
+		go e.checker(ctx, in, out)
+	}
+}
+
+func (e *extract) run() error {
+	fExtr := newExtractFs(e.ctx, e.fsrc, e.srcFileName, e.ci.Transfers)
+
+	e.dsh = NewDirStructHolder(makeListDir(e.ctx, e.fdst))
+	e.extrFs = fExtr
+
+	e.hashType, e.hashOption = operations.CommonHash(e.ctx, fExtr, e.fdst)
+
+	// Options for the download
+	downloadOptions := []fs.OpenOption{e.hashOption}
+	for _, option := range e.ci.DownloadHeaders {
+		downloadOptions = append(downloadOptions, option)
+	}
+
+	err := fExtr.Open(e.ctx, downloadOptions...)
+	if err != nil {
+		return err
+	}
+
+	pathCh := make(chan string, e.ci.Transfers)
+	jobCh := make(chan copyJob, e.ci.Transfers)
+
+	e.runCheckers(e.ctx, pathCh, jobCh)
+	e.runTransfers(e.ctx, jobCh)
+
+	var exitErr error
+	for {
+		path, err := fExtr.AddNextObject(e.ctx)
+		if err != nil {
+			exitErr = err
+			break
+		}
+		if path == "" {
+			break
+		}
+
+		pathCh <- path
+	}
+	close(pathCh)
+	e.checkersWg.Wait()
+	close(jobCh)
+	e.transfersWg.Wait()
+	if e.lastError != nil {
+		return e.lastError
+	}
+	return exitErr
+}
+
+// Extract extracts the tar.gz archive srcFileName from fsrc into fdst,
+// skipping files which are already present and identical on the destination.
+func Extract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string) error {
+	do := newExtract(ctx, fdst, fsrc, srcFileName)
+	return do.run()
+}
diff --git a/fs/extract/extract_test.go b/fs/extract/extract_test.go
new file mode 100644
index 000000000..9d4c6eaa9
--- /dev/null
+++ b/fs/extract/extract_test.go
@@ -0,0 +1,87 @@
+package extract
+
+import (
+	"context"
+	"io"
+	"testing"
+
+	"github.com/rclone/rclone/backend/memory"
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fstest/mockobject"
+	"github.com/stretchr/testify/require"
+)
+
+// Test cases:
+// 1. Happy path: extract all files from archive to a directory;
+// 2. Fail once when writing a file to the target directory and write it successfully when writing the same file again;
+// 3. Fail all attempts to write a file to the target directory;
+// 4. Fail once when reading a file from the archive, but read it successfully the second time;
+// 5. 
Fail all attempts to read a file from the archive; + +func TestExtract(t *testing.T) { + t.Run("Test happy path", testHappyPath) +} + +func checkFunc(ctx context.Context, t *testing.T, dstFs fs.Fs, files map[string]struct{}, itm *item) { + for _, obj := range itm.children { + if obj.IsDir() { + checkFunc(ctx, t, dstFs, files, obj) + continue + } + _, ok := files[obj.Name()] + require.True(t, ok) + delete(files, obj.Name()) + o, err := dstFs.NewObject(ctx, obj.Name()) + require.NoError(t, err) + reader, err := o.Open(ctx) + require.NoError(t, err) + require.NotNil(t, reader) + actual, err := io.ReadAll(reader) + require.NoError(t, err) + require.Equal(t, obj.fileContent, actual) + } +} + +func testHappyPath(t *testing.T) { + ctx := context.Background() + ctx, ci := fs.AddConfig(ctx) + ci.Transfers = 10 + ci2 := fs.GetConfig(ctx) + require.Equal(t, ci.Transfers, ci2.Transfers) + + rootItem, archiveContent := createTarArchive(t, 5, 6, 8, 50, 1000) + archiveFile := "archive.tar.gz" + srcFs, err := memory.NewFs(ctx, "memory", "src:dir", nil) + require.NoError(t, err) + o := mockobject.New(archiveFile).WithContent(archiveContent, mockobject.SeekModeNone) + reader, err := o.Open(ctx) + require.NoError(t, err) + + // Put archive file to source fs + _, err = srcFs.Put(ctx, reader, o) + require.NoError(t, err) + dstFs, err := memory.NewFs(ctx, "memory", "dst:dir", nil) + require.NoError(t, err) + + // Extract archive to dst fs + err = Extract(ctx, dstFs, srcFs, archiveFile) + require.NoError(t, err) + + ll := make(map[string]struct{}) + listRer := dstFs.(fs.ListRer) + require.NotNil(t, listRer) + err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error { + for _, entry := range entries { + obj, ok := entry.(fs.Object) + if !ok { + continue + } + ll[obj.Remote()] = struct{}{} + + } + return nil + }) + checkFunc(ctx, t, dstFs, ll, rootItem) + + require.Equal(t, 0, len(ll)) +} diff --git a/fs/extract/tarcache.go b/fs/extract/tarcache.go new file mode 100644 index 000000000..3c21e1d38 --- /dev/null +++ b/fs/extract/tarcache.go @@ -0,0 +1,190 @@ +package extract + +import ( + "archive/tar" + "bytes" + "errors" + "io" + "sync" + "time" +) + +type TarCache struct { + reader *tar.Reader + workerCnt int + maxSizeInMemory int64 + + ch chan struct{} + streamWg sync.WaitGroup + wg sync.WaitGroup +} + +type FileHeader struct { + FilePath string + ModTime time.Time + Size int64 + Format tar.Format +} + +var ( + streamReaderAlreadyCloseError = errors.New("streamReadCloser already closed") + bytesReaderAlreadyCloseError = errors.New("bytesReadCloser already closed") +) + +func (t *TarCache) WaitWorkersDone() { + t.wg.Wait() +} + +func (t *TarCache) Close() error { + t.WaitWorkersDone() + if t.ch != nil { + close(t.ch) + t.ch = nil + return nil + } + return errors.New("TarCache already closed") +} + +func NewTarCache(reader *tar.Reader, workerCnt int, maxSizeInMemory int64) *TarCache { + return &TarCache{ + reader: reader, + workerCnt: workerCnt, + maxSizeInMemory: maxSizeInMemory, + ch: make(chan struct{}, workerCnt), + } +} + +type helper struct { + io.Reader + closer func() error +} + +func (h *helper) Close() error { + return h.closer() +} + +func (t *TarCache) NextPayload() (_ io.ReadCloser, _ FileHeader, err error) { + bytesPayload, reader, closer, header, err := t.Next() + if err != nil { + return nil, FileHeader{}, err + } + if reader != nil { + return &helper{reader, closer}, header, nil + } + reader = bytes.NewReader(bytesPayload) + return &helper{reader, closer}, header, nil +} + +type streamCloser 
struct { + ch chan struct{} + cache *TarCache + isBytes bool +} + +func newStreamCloser(cache *TarCache, isBytes bool) *streamCloser { + if !isBytes { + cache.streamWg.Add(1) + } + cache.wg.Add(1) + return &streamCloser{ + ch: cache.ch, + cache: cache, + isBytes: isBytes, + } +} + +func (s *streamCloser) close() error { + if s.ch == nil { + if s.isBytes { + return bytesReaderAlreadyCloseError + } + return streamReaderAlreadyCloseError + } + ch := s.ch + s.ch = nil + <-ch + if !s.isBytes { + s.cache.streamWg.Done() + } + s.cache.wg.Done() + return nil +} +func (t *TarCache) ForwardToFile(filePath string) (_ []byte, _ io.Reader, _ func() error, _ FileHeader, err error) { + for { + header, err := t.reader.Next() + if err != nil { + return nil, nil, nil, FileHeader{}, err + } + switch header.Typeflag { + case tar.TypeDir: + continue + case tar.TypeReg: + if filePath != header.Name { + // TODO: handle case insensitivity + continue + } + h := FileHeader{ + FilePath: header.Name, + ModTime: header.ModTime, + Size: header.Size, + Format: header.Format, + } + + if header.Size < t.maxSizeInMemory { + payload, err := io.ReadAll(t.reader) + if err != nil { + return nil, nil, nil, FileHeader{}, err + } + closer := newStreamCloser(t, true) + return payload, nil, closer.close, h, nil + } + closer := newStreamCloser(t, false) + return nil, t.reader, closer.close, h, nil + default: + continue + } + } +} + +func (t *TarCache) Next() (_ []byte, _ io.Reader, _ func() error, _ FileHeader, err error) { + // We block the execution flow if the number of currently processed files exceeds the limit. + t.ch <- struct{}{} + defer func() { + if err != nil { + <-t.ch + } + }() + // We also block the execution flow if there is a reader without caching. + t.streamWg.Wait() + for { + header, err := t.reader.Next() + if err != nil { + return nil, nil, nil, FileHeader{}, err + } + switch header.Typeflag { + case tar.TypeDir: + continue + case tar.TypeReg: + h := FileHeader{ + FilePath: header.Name, + ModTime: header.ModTime, + Size: header.Size, + Format: header.Format, + } + + if header.Size < t.maxSizeInMemory { + var payload []byte + payload, err = io.ReadAll(t.reader) + if err != nil { + return nil, nil, nil, FileHeader{}, err + } + closer := newStreamCloser(t, true) + return payload, nil, closer.close, h, nil + } + closer := newStreamCloser(t, false) + return nil, t.reader, closer.close, h, nil + default: + continue + } + } +} diff --git a/fs/extract/tarcache_test.go b/fs/extract/tarcache_test.go new file mode 100644 index 000000000..768f1dcef --- /dev/null +++ b/fs/extract/tarcache_test.go @@ -0,0 +1,495 @@ +package extract + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "context" + "encoding/hex" + "errors" + "fmt" + "io" + "io/fs" + "math" + "math/rand" + "path" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type itemType int + +const ( + dir itemType = iota + file +) + +type item struct { + t itemType + name string + children map[string]*item + fileContent []byte +} + +func (i *item) Name() string { + return i.name +} + +func (i *item) Size() int64 { + return int64(len(i.fileContent)) +} + +func (i *item) Mode() fs.FileMode { + res := fs.ModePerm + if i.t == dir { + res |= fs.ModeDir + } + return res +} + +func (i *item) ModTime() time.Time { + return time.Now() +} + +func (i *item) IsDir() bool { + return i.t == dir +} + +func (i *item) Sys() any { + return nil +} + +func dice(rnd *rand.Rand, percent int) bool { + return rnd.Intn(100) < percent +} + +func 
randomFileName(rnd *rand.Rand) string { + b := make([]byte, 16) + rnd.Read(b) + return hex.EncodeToString(b) +} + +type errorReader struct { + io.Reader + + bytesRead int + threshold int +} + +var readError = errors.New("read error") + +func newReaderWithError(rc io.Reader, threshold int) *errorReader { + return &errorReader{rc, 0, threshold} +} + +func (e *errorReader) Read(p []byte) (n int, err error) { + if e.bytesRead > e.threshold { + return 0, readError + } + n, err = e.Reader.Read(p) + e.bytesRead += n + return +} + +func (i *item) createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) { + if deep <= 0 { + return + } + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir + + for j := 0; j < nItem; j++ { + isFile := deep == 1 || dice(rnd, 60) + newItem := item{} + var prefix string + if len(i.name) > 0 { + prefix = i.name + "/" + } + if isFile { + newItem.name = fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd)) + newItem.t = file + var fileSize int + isSmallFile := dice(rnd, smallFilesPercent) + if isSmallFile { + // [1, bigFileSize) + fileSize = rnd.Intn(bigFileSize-1) + 1 + } else { + // [bigFileSize, 2*bigFileSize) + fileSize = rnd.Intn(bigFileSize) + bigFileSize + } + newItem.fileContent = make([]byte, fileSize) + rnd.Read(newItem.fileContent) + i.children[newItem.name] = &newItem + } else { + newItem.name = fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd)) + newItem.t = dir + newItem.children = make(map[string]*item) + newItem.createTarArchiveContent(deep-1, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize) + i.children[newItem.name] = &newItem + } + } +} + +func (i *item) fillMap(m *sync.Map) { + for _, v := range i.children { + if v.t == file { + m.Store(v.name, v.fileContent) + } else if v.t == dir { + v.fillMap(m) + } + } +} + +func createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) *item { + root := &item{ + t: dir, + name: "", + children: make(map[string]*item), + } + + root.createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize) + return root +} + +func (i *item) writeItemToArchiver(tw *tar.Writer) error { + // In the first pass, we write the files from the current directory. + for _, child := range i.children { + if child.t == file { + head, err := tar.FileInfoHeader(child, path.Base(child.name)) + if err != nil { + return err + } + head.Name = child.name + err = tw.WriteHeader(head) + if err != nil { + return err + } + _, err = io.Copy(tw, bytes.NewReader(child.fileContent)) + if err != nil { + return err + } + } + } + + // In the second pass, we write files from child directories. 
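+	// This keeps each directory's own files ahead of deeper entries in the archive.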
+ for _, child := range i.children { + if child.t == dir { + if err := child.writeItemToArchiver(tw); err != nil { + return err + } + } + } + + return nil +} + +func createTarArchiveHelper(writer io.Writer, content *item) error { + gz := gzip.NewWriter(writer) + defer func() { + _ = gz.Close() + }() + + tw := tar.NewWriter(gz) + defer func() { + _ = tw.Close() + }() + + return content.writeItemToArchiver(tw) +} + +func createTarArchive(t *testing.T, deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) (*item, []byte) { + content := createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize) + + writer := bytes.NewBuffer(make([]byte, 0)) + + err := createTarArchiveHelper(writer, content) + archData := writer.Bytes() + require.NoError(t, err) + require.NotNil(t, content) + require.NotNil(t, archData) + + return content, archData +} + +func createTarArchiveWithOneFileHelper(w io.Writer) error { + gz := gzip.NewWriter(w) + defer func() { + _ = gz.Close() + }() + + tw := tar.NewWriter(gz) + defer func() { + _ = tw.Close() + }() + itm := item{ + t: file, + name: "test.bin", + fileContent: make([]byte, 0x80000), + } + + rootItm := item{ + t: dir, + children: map[string]*item{ + itm.name: &itm, + }, + } + + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + rnd.Read(itm.fileContent) + return rootItm.writeItemToArchiver(tw) +} + +func createTarArchiveWithOneFile(t *testing.T) []byte { + writer := bytes.NewBuffer(make([]byte, 0)) + err := createTarArchiveWithOneFileHelper(writer) + require.NoError(t, err) + return writer.Bytes() +} + +func doubleCloseCaseHelper(t *testing.T, readerTypeChecker func(io.Reader) bool, errorText string) { + bigFileSize := 0x50 + content, archData := createTarArchive(t, 5, 5, 10, 70, bigFileSize) + + reader := bytes.NewReader(archData) + + gzf, err := gzip.NewReader(reader) + require.NoError(t, err) + + tarReader := tar.NewReader(gzf) + tarCache := NewTarCache(tarReader, 10, int64(bigFileSize)) + + var m sync.Map + content.fillMap(&m) + + errCh := make(chan error, 100) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var wg sync.WaitGroup + var cnt int32 + for { + select { + case <-ctx.Done(): + break + default: + } + contentReader, header, err := tarCache.NextPayload() + + if err != nil { + if err == io.EOF { + break + } + require.NoError(t, err) + } + wg.Add(1) + go func(reader io.ReadCloser, head FileHeader) { + defer wg.Done() + defer func() { + if err := reader.Close(); err != nil { + errCh <- err + cancel() + } + }() + select { + case <-ctx.Done(): + return + default: + } + val, ok := m.Load(header.FilePath) + if !ok { + errCh <- errors.New(header.FilePath + " not found") + cancel() + return + } + expected := val.([]byte) + content, err := io.ReadAll(reader) + if err != nil { + errCh <- err + cancel() + return + } + if !bytes.Equal(expected, content) { + errCh <- errors.New(header.FilePath + " content mismatch") + cancel() + return + } + if atomic.AddInt32(&cnt, 1) >= 100 { + if readerTypeChecker(reader) { + if err := reader.Close(); err != nil { + errCh <- err + cancel() + return + } + } + } + }(contentReader, header) + } + err = tarCache.Close() + require.NoError(t, err) + wg.Wait() + close(errCh) + + for e := range errCh { + if err == nil { + err = e + } + } + require.EqualError(t, err, errorText) +} + +func readErrorCase(t *testing.T) { + archData := createTarArchiveWithOneFile(t) + + reader := newReaderWithError(bytes.NewReader(archData), 0x800) + + gzf, err := 
gzip.NewReader(reader) + require.NoError(t, err) + + tarReader := tar.NewReader(gzf) + tarCache := NewTarCache(tarReader, 10, math.MaxInt) + + errCh := make(chan error, 100) + var wg sync.WaitGroup + _, _, err = tarCache.NextPayload() + + require.NotErrorIs(t, err, io.EOF) + require.Error(t, err, readError.Error()) + + err = tarCache.Close() + require.NoError(t, err) + wg.Wait() + close(errCh) + + for e := range errCh { + if err == nil { + err = e + } + } + require.NoError(t, err) +} + +func successfulCase(t *testing.T) { + bigFileSize := 0x200 + content, archData := createTarArchive(t, 5, 5, 30, 70, bigFileSize) + + reader := bytes.NewReader(archData) + gzf, err := gzip.NewReader(reader) + require.NoError(t, err) + + tarReader := tar.NewReader(gzf) + tarCache := NewTarCache(tarReader, 10, int64(bigFileSize)) + + var m sync.Map + content.fillMap(&m) + + errCh := make(chan error, 100) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var wg sync.WaitGroup + for { + select { + case <-ctx.Done(): + break + default: + } + contentReader, header, err := tarCache.NextPayload() + + if err != nil { + if err == io.EOF { + break + } + require.NoError(t, err) + } + wg.Add(1) + go func(reader io.ReadCloser, head FileHeader) { + defer wg.Done() + defer func() { + if err := reader.Close(); err != nil { + errCh <- err + cancel() + } + }() + select { + case <-ctx.Done(): + return + default: + } + val, ok := m.Load(header.FilePath) + if !ok { + errCh <- errors.New(header.FilePath + " not found") + cancel() + return + } + expected := val.([]byte) + content, err := io.ReadAll(reader) + if err != nil { + errCh <- err + cancel() + return + } + if !bytes.Equal(expected, content) { + errCh <- errors.New(header.FilePath + " content mismatch") + cancel() + return + } + m.Delete(header.FilePath) + + }(contentReader, header) + } + err = tarCache.Close() + wg.Wait() + close(errCh) + var err2 error + for e := range errCh { + if err2 == nil { + err2 = e + } + } + require.NoError(t, err2) + require.NoError(t, err) + + // Checking for dictionary emptiness + var l int + m.Range(func(k, v interface{}) bool { + l++ + return true + }) + require.Equal(t, 0, l) +} + +func TestTarCache(t *testing.T) { + t.Run("Successful case", func(t *testing.T) { + successfulCase(t) + }) + + t.Run("Double close of the byte reader", func(t *testing.T) { + doubleCloseCaseHelper(t, func(reader io.Reader) bool { + e, ok := reader.(*helper) + if !ok { + return false + } + _, ok = e.Reader.(*bytes.Reader) + return ok + }, bytesReaderAlreadyCloseError.Error()) + }) + + t.Run("Double close of the stream reader", func(t *testing.T) { + doubleCloseCaseHelper(t, func(reader io.Reader) bool { + e, ok := reader.(*helper) + if !ok { + return false + } + _, ok = e.Reader.(*bytes.Reader) + return !ok + }, streamReaderAlreadyCloseError.Error()) + }) + + t.Run("Read error", func(t *testing.T) { + readErrorCase(t) + }) + +}