Add extract command
Some checks failed
/ DCO (pull_request) Failing after 38s
/ Builds (pull_request) Successful in 2m7s
/ Lint (pull_request) Failing after 2m35s
/ Test (pull_request) Successful in 3m8s

Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
Aleksey Kravchenko 2025-02-10 21:50:15 +03:00
parent 12b572e62a
commit 636217076f
8 changed files with 2086 additions and 0 deletions

cmd/all/all.go

@@ -23,6 +23,7 @@ import (
_ "github.com/rclone/rclone/cmd/dedupe"
_ "github.com/rclone/rclone/cmd/delete"
_ "github.com/rclone/rclone/cmd/deletefile"
_ "github.com/rclone/rclone/cmd/extract"
_ "github.com/rclone/rclone/cmd/genautocomplete"
_ "github.com/rclone/rclone/cmd/gendocs"
_ "github.com/rclone/rclone/cmd/gitannex"

46
cmd/extract/extract.go Normal file

@@ -0,0 +1,46 @@
package extract
import (
"context"
"errors"
"strings"
"github.com/rclone/rclone/cmd"
"github.com/rclone/rclone/fs/extract"
"github.com/spf13/cobra"
)
func init() {
cmd.Root.AddCommand(commandDefinition)
}
var commandDefinition = &cobra.Command{
Use: "extract source:path/to/archive.tar.gz dest:path",
Short: `Extract the tar.gz archive from source to the dest, skipping identical files.`,
// Note: "|" will be replaced by backticks below
Long: strings.ReplaceAll(`Extract the tar.gz archive files from the source to the destination. Does not transfer files that are
identical on source and destination, testing by size and modification
time or MD5SUM. Doesn't delete files from the destination.
If dest:path doesn't exist, it is created and the contents of the archive source:path/to/targz/archive
go there.
**Note**: Use the |-P|/|--progress| flag to view real-time transfer statistics.
**Note**: Use the |--dry-run| or the |--interactive|/|-i| flag to test without copying anything.
`, "|", "`"),
Annotations: map[string]string{
"groups": "Extract",
},
Run: func(command *cobra.Command, args []string) {
cmd.CheckArgs(2, 2, command, args)
fsrc, srcFileName, fdst := cmd.NewFsSrcFileDst(args)
cmd.Run(true, true, command, func() error {
if srcFileName == "" {
return errors.New("the source is required to be a file")
}
return extract.Extract(context.Background(), fdst, fsrc, srcFileName)
})
},
}
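A hedged usage sketch (remote and path names are illustrative, not from the commit):

rclone extract remote:backups/site.tar.gz remote2:restore/site -P

Adding --dry-run first shows what would be transferred without writing anything.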

276
fs/extract/dirtreeholder.go Normal file

@@ -0,0 +1,276 @@
package extract
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"github.com/rclone/rclone/fs"
)
// Object holds cached information about a single directory entry.
type Object struct {
	isDir bool
	data  any
}

// Dir is a node of the cached directory tree. The keys of objs and dirs are
// full paths relative to the tree root, not base names.
type Dir struct {
	name string
	objs map[string]Object
	dirs map[string]*Dir
}

// NewDir creates an empty directory node with the given full path.
func NewDir(name string) *Dir {
	return &Dir{
		name: name,
		objs: make(map[string]Object),
		dirs: make(map[string]*Dir),
	}
}

// Name returns the full path of the directory.
func (d *Dir) Name() string {
	return d.name
}

// NewObject creates an Object wrapping the given payload.
func NewObject(isDir bool, data any) Object {
	return Object{
		isDir: isDir,
		data:  data,
	}
}
// GetDirByName returns the descendant directory whose full path is name
// (which may be d itself), or nil if it is not cached
func (d *Dir) GetDirByName(name string) *Dir {
if d.name == name {
return d
}
if len(name) <= len(d.name) || !strings.HasPrefix(name, d.name) {
return nil
}
if d.name != "" && name[len(d.name)] != '/' {
return nil
}
idx := strings.Index(name[len(d.name)+1:], "/")
nextDir := name
if idx != -1 {
nextDir = name[:idx+len(d.name)+1]
}
if td, ok := d.dirs[nextDir]; ok {
return td.GetDirByName(name)
}
return nil
}
// AddChildDir adds a child directory
func (d *Dir) AddChildDir(child *Dir) {
d.dirs[child.name] = child
}
// AddChildObject adds information about the child object
func (d *Dir) AddChildObject(name string, child Object) {
d.objs[name] = child
}
// listDirFn lists a single directory level and returns it as a *Dir.
type listDirFn func(dir string) (*Dir, error)

// DirTreeHolder lazily builds and caches the destination directory tree,
// de-duplicating concurrent listings of the same directory.
type DirTreeHolder struct {
	listF listDirFn
	mu    sync.Mutex
	// TODO: consider changing the variable type to fs.DirTree
	root       *Dir
	dirsInWork map[string]*sync.WaitGroup
}
// strDirIter iterates over the ancestor prefixes of a path: each call to
// next returns the current prefix and the next one, one component longer.
type strDirIter struct {
	dir string
	idx int
}

func newStrDirIter(dir string) *strDirIter {
	return &strDirIter{
		dir: dir,
		idx: -1,
	}
}
func (s *strDirIter) next() (cur string, next string) {
	if s.dir == "" {
		return "", ""
	}
	if s.idx == -1 {
		s.idx = 0
		idx := strings.Index(s.dir, "/")
		if idx == -1 {
			return "", s.dir
		}
		return "", s.dir[:idx]
	}
idx := strings.Index(s.dir[s.idx:], "/")
if idx == -1 {
return s.dir, ""
}
defer func(idx int) {
s.idx = s.idx + idx + 1
}(idx)
idx2 := strings.Index(s.dir[s.idx+idx+1:], "/")
if idx2 == -1 {
return s.dir[:idx+s.idx], s.dir
}
return s.dir[:idx+s.idx], s.dir[:idx+idx2+s.idx+1]
}
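For orientation, a minimal sketch (not part of the commit, assuming it lives in the extract package with fmt imported) of the prefix sequence next() yields:

// sketch only: walk the ancestor prefixes of "a/b/c".
func sketchStrDirIter() {
	it := newStrDirIter("a/b/c")
	for {
		cur, next := it.next()
		fmt.Printf("%q -> %q\n", cur, next)
		if next == "" {
			break
		}
	}
	// Prints: "" -> "a", "a" -> "a/b", "a/b" -> "a/b/c", "a/b/c" -> ""
}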
// GetObjectByPath looks up an entry by its full path, starting at d.
func (d *Dir) GetObjectByPath(path string) (Object, bool) {
var dirName string
idx := strings.LastIndex(path, "/")
if idx != -1 {
dirName = path[:idx]
}
dir := d.GetDirByName(dirName)
if dir == nil {
return Object{}, false
}
obj, ok := dir.objs[path]
return obj, ok
}
// NewDirTreeHolder creates a DirTreeHolder that lists directories with listF.
func NewDirTreeHolder(listF listDirFn) *DirTreeHolder {
return &DirTreeHolder{
listF: listF,
root: nil,
dirsInWork: make(map[string]*sync.WaitGroup),
}
}
// GetObjectByPath looks up a cached entry by its full path.
func (l *DirTreeHolder) GetObjectByPath(path string) (Object, bool) {
l.mu.Lock()
defer l.mu.Unlock()
if l.root == nil {
return Object{}, false
}
return l.root.GetObjectByPath(path)
}
// DirExists reports whether the directory with the given full path is cached.
func (l *DirTreeHolder) DirExists(path string) bool {
l.mu.Lock()
defer l.mu.Unlock()
if l.root == nil {
return false
}
return l.root.GetDirByName(path) != nil
}
// UpdateDirList ensures that every directory on the path dir has been listed
// and cached, waiting for in-flight listings of the same directories instead
// of repeating them.
func (l *DirTreeHolder) UpdateDirList(ctx context.Context, dir string) error {
l.mu.Lock()
iter := newStrDirIter(dir)
aborting := func() bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
for {
if aborting() {
l.mu.Unlock()
return ctx.Err()
}
curDir, nextDir := iter.next()
if nextDir == "" {
l.mu.Unlock()
return nil
}
if wg, ok := l.dirsInWork[curDir]; ok {
l.mu.Unlock()
wg.Wait()
if aborting() {
return ctx.Err()
}
l.mu.Lock()
}
if curDir == "" && l.root == nil {
if wg, ok := l.dirsInWork[curDir]; ok {
l.mu.Unlock()
wg.Wait()
if aborting() {
return ctx.Err()
}
l.mu.Lock()
			dir := l.root.GetDirByName(curDir)
			if dir == nil {
				l.mu.Unlock()
				return fmt.Errorf("error while updating info about dir %q", curDir)
}
} else {
wg := &sync.WaitGroup{}
wg.Add(1)
l.dirsInWork[curDir] = wg
l.mu.Unlock()
			d, err := l.listF(curDir)
			if err != nil {
				if !errors.Is(err, fs.ErrorDirNotFound) {
					// Unblock any goroutines waiting for this listing before returning.
					l.mu.Lock()
					delete(l.dirsInWork, curDir)
					l.mu.Unlock()
					wg.Done()
					return err
				}
				d = NewDir(curDir)
			}
l.mu.Lock()
l.root = d
wg = l.dirsInWork[curDir]
delete(l.dirsInWork, curDir)
wg.Done()
}
}
		d := l.root.GetDirByName(curDir)
		if d == nil {
			l.mu.Unlock()
			return fmt.Errorf("directory %q is missing from the cached tree", curDir)
		}
		if _, ok := d.dirs[nextDir]; ok {
			continue
		}
		if o, ok := d.objs[nextDir]; ok {
			// The entry exists but is a file, so there is no such directory.
			if !o.isDir {
				l.mu.Unlock()
				return nil
			}
if wg, ok := l.dirsInWork[nextDir]; ok {
l.mu.Unlock()
wg.Wait()
if aborting() {
return ctx.Err()
}
l.mu.Lock()
dir := d.GetDirByName(nextDir)
if dir == nil {
l.mu.Unlock()
return fmt.Errorf("error while updating info about dir %s", nextDir)
}
} else {
wg := &sync.WaitGroup{}
wg.Add(1)
l.dirsInWork[nextDir] = wg
l.mu.Unlock()
			td, err := l.listF(nextDir)
			if err != nil {
				// Unblock any goroutines waiting for this listing before returning.
				l.mu.Lock()
				delete(l.dirsInWork, nextDir)
				l.mu.Unlock()
				wg.Done()
				return err
			}
l.mu.Lock()
d.dirs[nextDir] = td
wg = l.dirsInWork[nextDir]
delete(l.dirsInWork, nextDir)
wg.Done()
}
} else {
l.mu.Unlock()
return nil
}
}
}
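A hedged usage sketch (not in the commit; the stub lister and all names are illustrative) showing how the holder fills its cache lazily and answers lookups:

// sketch only: a stub listDirFn in the extract package; every directory
// pretends to contain readme.txt and a subdirectory "sub".
func sketchDirTreeHolder(ctx context.Context) error {
	listF := func(dir string) (*Dir, error) {
		d := NewDir(dir)
		prefix := ""
		if dir != "" {
			prefix = dir + "/"
		}
		d.AddChildObject(prefix+"readme.txt", NewObject(false, nil))
		d.AddChildObject(prefix+"sub", NewObject(true, nil))
		return d, nil
	}
	h := NewDirTreeHolder(listF)
	// Lists "", "sub" and "sub/sub" exactly once, even if called concurrently.
	if err := h.UpdateDirList(ctx, "sub/sub"); err != nil {
		return err
	}
	_, ok := h.GetObjectByPath("sub/sub/readme.txt")
	_ = ok // true once sub/sub has been listed
	return nil
}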

310
fs/extract/dirtreeholder_test.go Normal file

@@ -0,0 +1,310 @@
package extract
import (
"context"
"fmt"
"math/rand"
"os"
"path/filepath"
"sort"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func createDirTree(dir *Dir, deep, minItemsInDir, maxItemsInDir int) {
if deep <= 0 {
return
}
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir
for j := 0; j < nItem; j++ {
isFile := deep == 1 || dice(rnd, 50)
var prefix string
if len(dir.name) > 0 {
prefix = dir.name + "/"
}
if isFile {
name := fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd))
newItem := NewObject(false, name)
dir.objs[name] = newItem
} else {
name := fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd))
newItem := NewObject(true, name)
dir.objs[name] = newItem
childDir := NewDir(name)
createDirTree(childDir, deep-1, minItemsInDir, maxItemsInDir)
if len(childDir.dirs) != 0 || len(childDir.objs) != 0 {
dir.dirs[childDir.name] = childDir
}
}
}
}
type listFnHelper struct {
mu sync.Mutex
cnt atomic.Uint64
root *Dir
}
func (l *listFnHelper) Cnt() uint64 {
return l.cnt.Load()
}
func (l *listFnHelper) ResetCnt() {
l.cnt.Store(0)
}
func (l *listFnHelper) listDir(dir string) (*Dir, error) {
l.cnt.Add(1)
l.mu.Lock()
defer l.mu.Unlock()
d := l.root.GetDirByName(dir)
if d == nil {
return nil, os.ErrNotExist
}
newDir := NewDir(d.name)
for path, child := range d.objs {
newDir.AddChildObject(path, child)
}
return newDir, nil
}
func fillSliceWithDirsInfo(s *[]string, dir *Dir) {
for path, child := range dir.objs {
if child.isDir {
*s = append(*s, path)
}
}
for _, child := range dir.dirs {
fillSliceWithDirsInfo(s, child)
}
}
func fillMapWithFilesInfo(m map[string][]string, dir *Dir) {
files := make([]string, 0)
for path, child := range dir.objs {
if !child.isDir {
files = append(files, filepath.Base(path))
}
}
m[dir.Name()] = files
for _, child := range dir.dirs {
fillMapWithFilesInfo(m, child)
}
}
func TestListing(t *testing.T) {
t.Run("Test dir struct holder", testDirStructHolder)
t.Run("Test dir", testDir)
t.Run("Test string directory iterating", testStrDirIter)
}
func testDirStructHolder(t *testing.T) {
root := NewDir("")
createDirTree(root, 6, 5, 10)
listF := &listFnHelper{
root: root,
}
t.Run("Concurrent listing", func(t *testing.T) {
s := make([]string, 0)
m := make(map[string][]string)
fillSliceWithDirsInfo(&s, root)
fillMapWithFilesInfo(m, root)
sort.Slice(s, func(i, j int) bool {
return len(s[i]) < len(s[j])
})
require.NotNil(t, root)
		holder := NewDirTreeHolder(listF.listDir)
halfLen := len(s) / 2
path := s[halfLen]
expectedCnt := 0
dIter := newStrDirIter(path)
for {
expectedCnt++
_, next := dIter.next()
if next == "" {
break
}
}
require.NotNil(t, holder)
eg, ctx := errgroup.WithContext(context.Background())
eg.Go(func() error {
return holder.UpdateDirList(ctx, path)
})
eg.Go(func() error {
return holder.UpdateDirList(ctx, path)
})
eg.Go(func() error {
return holder.UpdateDirList(ctx, path)
})
eg.Go(func() error {
return holder.UpdateDirList(ctx, path+"not.exists")
})
err := eg.Wait()
require.NoError(t, err)
require.Equal(t, uint64(expectedCnt), listF.Cnt())
dIter = newStrDirIter(path)
var cur, next string
next = "1"
for next != "" {
cur, next = dIter.next()
if next == "" {
break
}
			require.True(t, holder.DirExists(cur))
files, ok := m[cur]
require.True(t, ok)
for _, file := range files {
var filePath string
if cur == "" {
filePath = file
} else {
filePath = cur + "/" + file
}
				obj, ok := holder.GetObjectByPath(filePath)
				require.True(t, ok)
require.Equal(t, filePath, obj.data.(string))
}
}
})
}
func testDir(t *testing.T) {
finalDir := "path/with/more/than/one/dir"
files := map[string][]string{
"": {"file0.1.ext", "file0.2.ext"},
"path": {"file1.1.ext", "file1.2.ext"},
"path/with/more/than/one": {"file2.1.ext", "file2.2.ext"},
"path/with/more/than/one/dir": {"file3.1.ext", "file3.2.ext"},
}
dirIter := newStrDirIter(finalDir)
var root, prevDir *Dir
next := "1"
for next != "" {
var cur string
cur, next = dirIter.next()
dir := NewDir(cur)
if root == nil {
root = dir
}
if files, ok := files[cur]; ok {
for _, file := range files {
obj := NewObject(false, struct{}{})
if cur == "" {
dir.AddChildObject(file, obj)
} else {
dir.AddChildObject(cur+"/"+file, obj)
}
}
}
if prevDir != nil {
prevDir.AddChildDir(dir)
prevDir.AddChildObject(dir.Name(), NewObject(true, struct{}{}))
}
prevDir = dir
}
t.Run("GetDirByName", func(t *testing.T) {
dirIter = newStrDirIter(finalDir)
next = "1"
cnt := 0
for next != "" {
var cur string
cur, next = dirIter.next()
dir := root.GetDirByName(cur)
require.NotNil(t, dir)
cnt++
}
require.Equal(t, 7, cnt)
})
t.Run("GetObjectByName", func(t *testing.T) {
for dirName, fileNames := range files {
for _, fileName := range fileNames {
var filePath string
if dirName == "" {
filePath = fileName
} else {
filePath = dirName + "/" + fileName
}
obj, ok := root.GetObjectByPath(filePath)
require.True(t, ok)
require.False(t, obj.isDir)
filePath += ".fail"
_, ok = root.GetObjectByPath(filePath)
require.False(t, ok)
}
}
})
}
func testStrDirIter(t *testing.T) {
for _, tc := range []struct {
name string
dir string
res []string
}{
{
name: "path with more than one dir",
dir: "path/with/more/than/one/dir",
res: []string{
"",
"path",
"path/with",
"path/with/more",
"path/with/more/than",
"path/with/more/than/one",
"path/with/more/than/one/dir",
"",
},
},
{
name: "path with one dir",
dir: "one_dir",
res: []string{
"",
"one_dir",
"",
},
},
{
name: "empty path",
dir: "",
res: []string{
"",
"",
},
},
} {
t.Run(tc.name, func(t *testing.T) {
l := newStrDirIter(tc.dir)
for i, p := range tc.res {
if p == "" && i > 0 {
break
}
cur, next := l.next()
require.Equal(t, cur, p)
require.Equal(t, next, tc.res[i+1])
}
})
}
}

681
fs/extract/extract.go Normal file

@@ -0,0 +1,681 @@
package extract
import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/list"
"github.com/rclone/rclone/fs/operations"
)
// payloadType distinguishes objects buffered in memory from objects that are
// streamed directly out of the tar reader.
type payloadType int

const (
	payloadTypeBytes payloadType = iota
	payloadTypeIoReader
)
type extract struct {
// parameters
fdst fs.Fs
fsrc fs.Fs
extrFs *extractFs
srcFileName string
hashType hash.Type // common hash to use
hashOption *fs.HashesOption // open option for the common hash
// internal state
ci *fs.ConfigInfo // global config
ctx context.Context // internal context for controlling go-routines
cancel func() // cancel the context
maxDurationEndTime time.Time // end time if --max-duration is set
tarFormat tar.Format // tar format to use
	checkersWg  sync.WaitGroup // wait group for checker goroutines
	transfersWg sync.WaitGroup // wait group for transfer goroutines
mu sync.Mutex
lastError error
errCnt atomic.Int32
dsh *DirTreeHolder
}
type objectInfo struct {
bytesPayload []byte
readerPayload io.Reader
closeFn func() error
header FileHeader
}
type extractFs struct {
// parameters
fsrc fs.Fs
srcFileName string
features *fs.Features
transfers int
// internal state
mu sync.Mutex
tarCache *TarCache
rawReader io.ReadCloser
cache []objectInfo
reopenRequests chan reopenRequest
openOptions []fs.OpenOption
	streamTaskWatcher chan struct{} // closed when no streaming copy is in progress
tarFormat tar.Format // tar format to use
wg sync.WaitGroup // wait group for auxiliary goroutines
}
type object struct {
fs *extractFs
header FileHeader
payloadType payloadType
payload []byte // payload of small-sized objects
closeFn func() error
reopenRequest chan reopenRequest
mu sync.Mutex
readerPayload io.Reader // payload of large objects
}
// reopenRequest asks the goroutine driving the archive (AddNextObject) to
// reopen the tar stream and fast-forward to filePath so that obj can be read
// again; the result is reported on respCh.
type reopenRequest struct {
	filePath string
	obj      *object
	respCh   chan error
}
type readCloserWrapper struct {
io.Reader
closeFn func() error
}
type copyJob struct {
src fs.Object
dst fs.Object
path string
}
// makeListDir returns a listDirFn that lists a single level of f, honouring
// any active filters.
func makeListDir(ctx context.Context, f fs.Fs) listDirFn {
	return func(dir string) (*Dir, error) {
		dirCtx := filter.SetUseFilter(ctx, f.Features().FilterAware) // make filter-aware backends constrain List
		entries, err := list.DirSorted(dirCtx, f, false, dir)
		if err != nil {
			return nil, err
		}
		res := NewDir(dir)
		for _, entry := range entries {
if obj, ok := entry.(fs.Object); ok {
o := NewObject(false, obj)
res.AddChildObject(obj.Remote(), o)
}
if d, ok := entry.(fs.Directory); ok {
o := NewObject(true, nil)
res.AddChildObject(d.Remote(), o)
}
}
return res, nil
}
}
func newReadCloserWrapper(r io.Reader, closeFn func() error) io.ReadCloser {
return &readCloserWrapper{
r,
closeFn,
}
}
func (r *readCloserWrapper) Close() error {
return r.closeFn()
}
func newReopenRequest(filePath string, obj *object) reopenRequest {
return reopenRequest{
filePath: filePath,
obj: obj,
respCh: make(chan error),
}
}
func (o *object) Fs() fs.Info {
return o.fs
}
func (o *object) String() string {
if o == nil {
return "<nil>"
}
return o.header.FilePath
}
func (o *object) IsStream() bool {
return o.payloadType == payloadTypeIoReader
}
func (o *object) Remote() string {
return o.header.FilePath
}
func (o *object) ModTime(_ context.Context) time.Time {
return o.header.ModTime
}
func (o *object) Size() int64 {
return o.header.Size
}
func (o *object) Hash(_ context.Context, _ hash.Type) (string, error) {
return "", nil
}
func (o *object) Storable() bool {
return true
}
func (o *object) SetModTime(_ context.Context, _ time.Time) error {
panic("not implemented")
}
func (o *object) bytesOpen() (io.ReadCloser, error) {
var err error
var once sync.Once
return newReadCloserWrapper(bytes.NewReader(o.payload), func() error {
once.Do(func() {
err = o.closeFn()
})
return err
}), nil
}
func (o *object) streamOpen() (io.ReadCloser, error) {
o.mu.Lock()
if o.readerPayload == nil {
o.mu.Unlock()
reopenReq := newReopenRequest(o.header.FilePath, o)
o.reopenRequest <- reopenReq
err := <-reopenReq.respCh
close(reopenReq.respCh)
if err != nil {
return nil, err
}
o.mu.Lock()
if o.readerPayload == nil {
panic("nil readerPayload")
}
}
defer o.mu.Unlock()
var err error
var once sync.Once
return newReadCloserWrapper(o.readerPayload, func() error {
once.Do(func() {
o.mu.Lock()
o.readerPayload = nil
o.mu.Unlock()
err = o.closeFn()
})
return err
}), nil
}
func (o *object) Open(_ context.Context, _ ...fs.OpenOption) (io.ReadCloser, error) {
if o.payloadType == payloadTypeBytes {
return o.bytesOpen()
}
return o.streamOpen()
}
func (o *object) Update(_ context.Context, _ io.Reader, _ fs.ObjectInfo, _ ...fs.OpenOption) error {
panic("not implemented")
}
func (o *object) Remove(_ context.Context) error {
panic("not implemented")
}
func (e *extractFs) Name() string {
return "extract"
}
func (e *extractFs) Root() string {
return fmt.Sprintf("%s:%s/%s", e.fsrc.Name(), e.fsrc.Root(), e.srcFileName)
}
func (e *extractFs) String() string {
return fmt.Sprintf("tar.gz archive %s at %s", e.Name(), e.Root())
}
func newExtractFs(ctx context.Context, fsrc fs.Fs, srcFileName string, transfers int) *extractFs {
	e := &extractFs{
		fsrc:              fsrc,
		srcFileName:       srcFileName,
		transfers:         transfers,
		reopenRequests:    make(chan reopenRequest, transfers*2),
		streamTaskWatcher: make(chan struct{}),
	}
	// The watcher channel starts closed so that AddNextObject does not block
	// before the first streaming transfer has started.
	close(e.streamTaskWatcher)
e.features = (&fs.Features{
CanHaveEmptyDirectories: false,
}).Fill(ctx, e)
return e
}
func (e *extractFs) Precision() time.Duration {
e.mu.Lock()
defer e.mu.Unlock()
switch e.tarFormat {
case tar.FormatPAX:
return time.Millisecond
default:
return time.Second
}
}
func (e *extractFs) Open(ctx context.Context, options ...fs.OpenOption) error {
e.openOptions = options
return e.reopen(ctx)
}
func (e *extractFs) reopen(ctx context.Context) error {
obj, err := e.fsrc.NewObject(ctx, e.srcFileName)
if err != nil {
return err
}
reader, err := obj.Open(ctx, e.openOptions...)
if err != nil {
return err
}
gzr, err := gzip.NewReader(reader)
if err != nil {
return err
}
tarReader := tar.NewReader(gzr)
t := NewTarCache(tarReader, e.transfers, 2000)
e.mu.Lock()
e.tarCache = t
e.rawReader = reader
e.mu.Unlock()
return nil
}
func (e *extractFs) Hashes() hash.Set {
return hash.Set(hash.None)
}
func (e *extractFs) Features() *fs.Features {
return e.features
}
func (e *extractFs) processReopenRequest(ctx context.Context, reopenReq reopenRequest) error {
err := e.reopen(ctx)
if err != nil {
reopenReq.respCh <- err
return err
}
e.mu.Lock()
tarCache := e.tarCache
e.mu.Unlock()
bytesPayload, readerPayload, closer, header, err := tarCache.ForwardToFile(reopenReq.filePath)
if err != nil {
reopenReq.respCh <- err
return err
}
e.mu.Lock()
found := false
for idx, itm := range e.cache {
if itm.header.FilePath == reopenReq.filePath {
itm.readerPayload = readerPayload
itm.bytesPayload = bytesPayload
itm.closeFn = closer
itm.header = header
e.cache[idx] = itm
found = true
break
}
}
e.mu.Unlock()
if !found {
panic("cache must contain information about this object")
}
obj, err := e.NewObject(ctx, reopenReq.filePath)
if err != nil {
reopenReq.respCh <- err
return err
}
o := obj.(*object)
	reopenReq.obj.readerPayload = o.readerPayload
	reopenReq.obj.closeFn = o.closeFn
	// Signal the streamOpen call waiting on respCh that the reader is ready.
	reopenReq.respCh <- nil
	return nil
}
func (e *extractFs) AddNextObject(ctx context.Context) (string, error) {
e.mu.Lock()
tarCache := e.tarCache
e.mu.Unlock()
if tarCache == nil {
return "", errors.New("todo: cannot add next object")
}
var err error
L:
for {
select {
// We block execution flow until the goroutine responsible for copying
// stream has completed its work.
case <-e.streamTaskWatcher:
break L
case <-ctx.Done():
return "", ctx.Err()
case reopenReq := <-e.reopenRequests:
err = e.processReopenRequest(ctx, reopenReq)
}
}
if err != nil {
return "", err
}
var o objectInfo
o.bytesPayload, o.readerPayload, o.closeFn, o.header, err = e.tarCache.Next()
	if err != nil {
		if errors.Is(err, io.EOF) {
			return "", nil
		}
// There can be two possible errors here:
// 1. An error occurred when reading data by goroutine.
// 2. An error occurred while reading the next header.
// In any case, we need to wait for all working goroutines to complete their work.
watcherCh := make(chan struct{})
go func() {
_ = tarCache.Close()
close(watcherCh)
}()
L2:
for {
select {
case reopenReq := <-e.reopenRequests:
err = e.processReopenRequest(ctx, reopenReq)
continue
case <-watcherCh:
// All the working goroutines have completed their work
break L2
}
}
return "", err
}
e.mu.Lock()
defer e.mu.Unlock()
e.cache = append(e.cache, o)
if e.tarFormat == tar.FormatUnknown {
e.tarFormat = o.header.Format
}
if len(e.cache) > e.transfers*3 {
e.cache = e.cache[1:]
}
return o.header.FilePath, nil
}
func (e *extractFs) newObject(ctx context.Context, objInfo objectInfo) (fs.Object, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
pt := payloadTypeBytes
if objInfo.readerPayload != nil {
pt = payloadTypeIoReader
}
return &object{
fs: e,
header: objInfo.header,
payload: objInfo.bytesPayload,
readerPayload: objInfo.readerPayload,
payloadType: pt,
closeFn: objInfo.closeFn,
reopenRequest: e.reopenRequests,
}, nil
}
func (e *extractFs) NewObject(ctx context.Context, remote string) (obj fs.Object, err error) {
e.mu.Lock()
defer e.mu.Unlock()
for _, objInfo := range e.cache {
if objInfo.header.FilePath == remote {
return e.newObject(ctx, objInfo)
}
}
if ctx.Err() != nil {
return nil, ctx.Err()
}
return nil, fs.ErrorObjectNotFound
}
func (e *extractFs) setStreamTaskWatcherCh(ch chan struct{}) {
e.mu.Lock()
defer e.mu.Unlock()
e.streamTaskWatcher = ch
}
func (e *extractFs) streamTaskWatcherCh() chan struct{} {
e.mu.Lock()
defer e.mu.Unlock()
return e.streamTaskWatcher
}
func (e *extractFs) List(_ context.Context, _ string) (entries fs.DirEntries, err error) {
panic("not implemented")
}
func (e *extractFs) Put(_ context.Context, _ io.Reader, _ fs.ObjectInfo, _ ...fs.OpenOption) (fs.Object, error) {
panic("not implemented")
}
func (e *extractFs) Mkdir(_ context.Context, _ string) error {
panic("not implemented")
}
func (e *extractFs) Rmdir(_ context.Context, _ string) error {
panic("not implemented")
}
func (e *extractFs) Remove(_ context.Context, _ fs.Object) error {
panic("not implemented")
}
func newExtract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string) *extract {
ci := fs.GetConfig(ctx)
s := &extract{
ctx: ctx,
ci: ci,
fdst: fdst,
fsrc: fsrc,
srcFileName: srcFileName,
}
if ci.MaxDuration > 0 {
s.maxDurationEndTime = time.Now().Add(ci.MaxDuration)
fs.Infof(s.fdst, "Transfer session %v deadline: %s", ci.CutoffMode, s.maxDurationEndTime.Format("2006/01/02 15:04:05"))
}
// If a max session duration has been defined add a deadline
// to the main context if cutoff mode is hard. This will cut
// the transfers off.
if !s.maxDurationEndTime.IsZero() && ci.CutoffMode == fs.CutoffModeHard {
s.ctx, s.cancel = context.WithDeadline(ctx, s.maxDurationEndTime)
} else {
s.ctx, s.cancel = context.WithCancel(ctx)
}
return s
}
func (e *extract) transfer(ctx context.Context, in <-chan copyJob) {
defer e.transfersWg.Done()
for {
select {
case <-ctx.Done():
return
case job, ok := <-in:
if !ok {
return
}
			obj := job.src.(*object)
			var watcher chan struct{}
			if obj.IsStream() {
				watcher = make(chan struct{})
				e.extrFs.setStreamTaskWatcherCh(watcher)
			}
			_, err := operations.Copy(ctx, e.fdst, job.dst, job.path, job.src)
			// It is possible that operations.Copy() does not call the Open() method for job.src internally.
			// Therefore, to avoid hanging when receiving information about the next file in
			// the extractFs.AddNextObject() method, we force a call to obj.closeFn().
			_ = obj.closeFn()
if watcher != nil {
close(watcher)
}
if err != nil {
				e.errCnt.Add(1)
				e.mu.Lock()
				// Keep the first error encountered.
				if e.lastError == nil {
					e.lastError = err
				}
				e.mu.Unlock()
e.cancel()
return
}
}
}
}
func (e *extract) runTransfers(ctx context.Context, in <-chan copyJob) {
for i := 0; i < e.ci.Transfers; i++ {
e.transfersWg.Add(1)
go e.transfer(ctx, in)
}
}
func (e *extract) checker(ctx context.Context, in <-chan string, out chan<- copyJob) {
defer e.checkersWg.Done()
for {
var filePath string
var ok bool
select {
case <-ctx.Done():
return
case filePath, ok = <-in:
if !ok {
return
}
}
idx := strings.LastIndex(filePath, "/")
var dir string
if idx != -1 {
dir = filePath[:idx]
}
err := e.dsh.UpdateDirList(ctx, dir)
if err != nil {
// TODO: think about retries
e.cancel()
return
}
src, err := e.extrFs.NewObject(ctx, filePath)
if err != nil {
e.cancel()
return
}
job := copyJob{
src: src,
path: filePath,
}
if objInfo, ok := e.dsh.GetObjectByPath(filePath); ok && !objInfo.isDir {
if dst := objInfo.data.(fs.Object); dst != nil {
job.dst = dst
}
}
out <- job
}
}
func (e *extract) runCheckers(ctx context.Context, in <-chan string, out chan<- copyJob) {
for i := 0; i < e.ci.Transfers; i++ {
e.checkersWg.Add(1)
go e.checker(ctx, in, out)
}
}
func (e *extract) run() error {
fExtr := newExtractFs(e.ctx, e.fsrc, e.srcFileName, e.ci.Transfers)
	e.dsh = NewDirTreeHolder(makeListDir(e.ctx, e.fdst))
e.extrFs = fExtr
e.hashType, e.hashOption = operations.CommonHash(e.ctx, fExtr, e.fdst)
// Options for the download
downloadOptions := []fs.OpenOption{e.hashOption}
for _, option := range e.ci.DownloadHeaders {
downloadOptions = append(downloadOptions, option)
}
err := fExtr.Open(e.ctx, downloadOptions...)
if err != nil {
return err
}
pathCh := make(chan string, e.ci.Transfers)
jobCh := make(chan copyJob, e.ci.Transfers)
e.runCheckers(e.ctx, pathCh, jobCh)
e.runTransfers(e.ctx, jobCh)
	var exitErr error
loop:
	for {
		path, err := fExtr.AddNextObject(e.ctx)
		if err != nil {
			exitErr = err
			break
		}
		if path == "" {
			break
		}
		// Send ctx-aware so we do not hang if all checkers have already exited.
		select {
		case pathCh <- path:
		case <-e.ctx.Done():
			exitErr = e.ctx.Err()
			break loop
		}
	}
close(pathCh)
e.checkersWg.Wait()
close(jobCh)
e.transfersWg.Wait()
if e.lastError != nil {
return e.lastError
}
return exitErr
}
// Extract extracts the tar.gz archive srcFileName from fsrc into fdst,
// skipping files that are already identical on the destination.
func Extract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string) error {
do := newExtract(ctx, fdst, fsrc, srcFileName)
return do.run()
}
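A hedged sketch of driving the library entry point directly (paths are illustrative; assumes the local backend is registered and rclone's config is initialised as usual):

package main

import (
	"context"
	"log"

	_ "github.com/rclone/rclone/backend/local"
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/extract"
)

func main() {
	ctx := context.Background()
	fsrc, err := fs.NewFs(ctx, "/tmp/archives") // directory holding archive.tar.gz
	if err != nil {
		log.Fatal(err)
	}
	fdst, err := fs.NewFs(ctx, "/tmp/restore") // extraction target
	if err != nil {
		log.Fatal(err)
	}
	if err := extract.Extract(ctx, fdst, fsrc, "archive.tar.gz"); err != nil {
		log.Fatal(err)
	}
}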

87
fs/extract/extract_test.go Normal file

@@ -0,0 +1,87 @@
package extract
import (
"context"
"io"
"testing"
"github.com/rclone/rclone/backend/memory"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fstest/mockobject"
"github.com/stretchr/testify/require"
)
// test cases
// 1. Happy path: extract all files from archive to a directory;
// 2. Fail once when writing a file to the target directory and write it successfully when writing the same file again;
// 3. Fail all attempts to write a file to the target directory;
// 4. Fail once when reading a file from the archive, but read it successfully the second time;
// 5. Fail all attempts to read a file from the archive;
// TODO: only case 1 is covered below; cases 2-5 are not yet implemented.
func TestExtract(t *testing.T) {
	t.Run("Test happy path", testHappyPath)
}
func checkFunc(ctx context.Context, t *testing.T, dstFs fs.Fs, files map[string]struct{}, itm *item) {
for _, obj := range itm.children {
if obj.IsDir() {
checkFunc(ctx, t, dstFs, files, obj)
continue
}
_, ok := files[obj.Name()]
require.True(t, ok)
delete(files, obj.Name())
o, err := dstFs.NewObject(ctx, obj.Name())
require.NoError(t, err)
reader, err := o.Open(ctx)
require.NoError(t, err)
require.NotNil(t, reader)
		actual, err := io.ReadAll(reader)
		require.NoError(t, err)
		require.NoError(t, reader.Close())
		require.Equal(t, obj.fileContent, actual)
}
}
func testHappyPath(t *testing.T) {
ctx := context.Background()
ctx, ci := fs.AddConfig(ctx)
ci.Transfers = 10
ci2 := fs.GetConfig(ctx)
require.Equal(t, ci.Transfers, ci2.Transfers)
rootItem, archiveContent := createTarArchive(t, 5, 6, 8, 50, 1000)
archiveFile := "archive.tar.gz"
srcFs, err := memory.NewFs(ctx, "memory", "src:dir", nil)
require.NoError(t, err)
o := mockobject.New(archiveFile).WithContent(archiveContent, mockobject.SeekModeNone)
reader, err := o.Open(ctx)
require.NoError(t, err)
// Put archive file to source fs
_, err = srcFs.Put(ctx, reader, o)
require.NoError(t, err)
dstFs, err := memory.NewFs(ctx, "memory", "dst:dir", nil)
require.NoError(t, err)
// Extract archive to dst fs
err = Extract(ctx, dstFs, srcFs, archiveFile)
require.NoError(t, err)
ll := make(map[string]struct{})
listRer := dstFs.(fs.ListRer)
require.NotNil(t, listRer)
err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
for _, entry := range entries {
obj, ok := entry.(fs.Object)
if !ok {
continue
}
ll[obj.Remote()] = struct{}{}
}
return nil
	})
	require.NoError(t, err)
	checkFunc(ctx, t, dstFs, ll, rootItem)
require.Equal(t, 0, len(ll))
}

190
fs/extract/tarcache.go Normal file

@@ -0,0 +1,190 @@
package extract
import (
"archive/tar"
"bytes"
"errors"
"io"
"sync"
"time"
)
// TarCache reads sequential entries from a tar stream and limits how many
// extracted payloads may be in flight at once.
type TarCache struct {
	reader          *tar.Reader
	workerCnt       int
	maxSizeInMemory int64
	ch              chan struct{} // semaphore: one token per payload in flight
	streamWg        sync.WaitGroup
	wg              sync.WaitGroup
}

// FileHeader is the subset of tar.Header information that extract needs.
type FileHeader struct {
	FilePath string
	ModTime  time.Time
	Size     int64
	Format   tar.Format
}

var (
	errStreamReaderAlreadyClosed = errors.New("streamReadCloser already closed")
	errBytesReaderAlreadyClosed  = errors.New("bytesReadCloser already closed")
)
// WaitWorkersDone blocks until every outstanding payload has been closed.
func (t *TarCache) WaitWorkersDone() {
	t.wg.Wait()
}

// Close waits for outstanding payloads and then releases the cache.
func (t *TarCache) Close() error {
	t.WaitWorkersDone()
	if t.ch != nil {
		close(t.ch)
		t.ch = nil
		return nil
	}
	return errors.New("TarCache already closed")
}
// NewTarCache wraps reader, allowing at most workerCnt payloads in flight and
// buffering entries smaller than maxSizeInMemory bytes in memory.
func NewTarCache(reader *tar.Reader, workerCnt int, maxSizeInMemory int64) *TarCache {
return &TarCache{
reader: reader,
workerCnt: workerCnt,
maxSizeInMemory: maxSizeInMemory,
ch: make(chan struct{}, workerCnt),
}
}
type helper struct {
io.Reader
closer func() error
}
func (h *helper) Close() error {
return h.closer()
}
// NextPayload returns the next regular file in the archive as a ReadCloser.
func (t *TarCache) NextPayload() (_ io.ReadCloser, _ FileHeader, err error) {
bytesPayload, reader, closer, header, err := t.Next()
if err != nil {
return nil, FileHeader{}, err
}
if reader != nil {
return &helper{reader, closer}, header, nil
}
reader = bytes.NewReader(bytesPayload)
return &helper{reader, closer}, header, nil
}
type streamCloser struct {
ch chan struct{}
cache *TarCache
isBytes bool
}
func newStreamCloser(cache *TarCache, isBytes bool) *streamCloser {
if !isBytes {
cache.streamWg.Add(1)
}
cache.wg.Add(1)
return &streamCloser{
ch: cache.ch,
cache: cache,
isBytes: isBytes,
}
}
func (s *streamCloser) close() error {
	if s.ch == nil {
		if s.isBytes {
			return errBytesReaderAlreadyClosed
		}
		return errStreamReaderAlreadyClosed
	}
ch := s.ch
s.ch = nil
<-ch
if !s.isBytes {
s.cache.streamWg.Done()
}
s.cache.wg.Done()
return nil
}
// ForwardToFile skips archive entries until it reaches filePath and returns
// that file's payload; used to re-read a file after the archive is reopened.
func (t *TarCache) ForwardToFile(filePath string) (_ []byte, _ io.Reader, _ func() error, _ FileHeader, err error) {
for {
header, err := t.reader.Next()
if err != nil {
return nil, nil, nil, FileHeader{}, err
}
switch header.Typeflag {
case tar.TypeDir:
continue
case tar.TypeReg:
if filePath != header.Name {
// TODO: handle case insensitivity
continue
}
h := FileHeader{
FilePath: header.Name,
ModTime: header.ModTime,
Size: header.Size,
Format: header.Format,
}
if header.Size < t.maxSizeInMemory {
payload, err := io.ReadAll(t.reader)
if err != nil {
return nil, nil, nil, FileHeader{}, err
}
closer := newStreamCloser(t, true)
return payload, nil, closer.close, h, nil
}
closer := newStreamCloser(t, false)
return nil, t.reader, closer.close, h, nil
default:
continue
}
}
}
// Next returns the next regular file either as an in-memory byte slice or,
// for files of at least maxSizeInMemory bytes, as a reader streaming
// directly from the archive.
func (t *TarCache) Next() (_ []byte, _ io.Reader, _ func() error, _ FileHeader, err error) {
// We block the execution flow if the number of currently processed files exceeds the limit.
t.ch <- struct{}{}
defer func() {
if err != nil {
<-t.ch
}
}()
// We also block the execution flow if there is a reader without caching.
t.streamWg.Wait()
for {
header, err := t.reader.Next()
if err != nil {
return nil, nil, nil, FileHeader{}, err
}
switch header.Typeflag {
case tar.TypeDir:
continue
case tar.TypeReg:
h := FileHeader{
FilePath: header.Name,
ModTime: header.ModTime,
Size: header.Size,
Format: header.Format,
}
if header.Size < t.maxSizeInMemory {
var payload []byte
payload, err = io.ReadAll(t.reader)
if err != nil {
return nil, nil, nil, FileHeader{}, err
}
closer := newStreamCloser(t, true)
return payload, nil, closer.close, h, nil
}
closer := newStreamCloser(t, false)
return nil, t.reader, closer.close, h, nil
default:
continue
}
}
}
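A hedged in-package consumption sketch (not in the commit): each payload must be closed to release its in-flight token, otherwise Next blocks once workerCnt payloads are outstanding.

// sketch only: sequentially drain an archive, counting payload bytes.
func sketchDrain(tr *tar.Reader) (total int64, err error) {
	tc := NewTarCache(tr, 4, 1<<20) // 4 payloads in flight, 1 MiB in-memory cutoff
	defer func() { _ = tc.Close() }()
	for {
		rc, _, err := tc.NextPayload()
		if errors.Is(err, io.EOF) {
			return total, nil
		}
		if err != nil {
			return total, err
		}
		n, err := io.Copy(io.Discard, rc)
		_ = rc.Close() // releases the in-flight token
		if err != nil {
			return total, err
		}
		total += n
	}
}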

495
fs/extract/tarcache_test.go Normal file

@@ -0,0 +1,495 @@
package extract
import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"io/fs"
"math"
"math/rand"
"path"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
)
type itemType int
const (
dir itemType = iota
file
)
type item struct {
t itemType
name string
children map[string]*item
fileContent []byte
}
func (i *item) Name() string {
return i.name
}
func (i *item) Size() int64 {
return int64(len(i.fileContent))
}
func (i *item) Mode() fs.FileMode {
res := fs.ModePerm
if i.t == dir {
res |= fs.ModeDir
}
return res
}
func (i *item) ModTime() time.Time {
return time.Now()
}
func (i *item) IsDir() bool {
return i.t == dir
}
func (i *item) Sys() any {
return nil
}
func dice(rnd *rand.Rand, percent int) bool {
return rnd.Intn(100) < percent
}
func randomFileName(rnd *rand.Rand) string {
b := make([]byte, 16)
rnd.Read(b)
return hex.EncodeToString(b)
}
type errorReader struct {
io.Reader
bytesRead int
threshold int
}
var readError = errors.New("read error")
func newReaderWithError(rc io.Reader, threshold int) *errorReader {
return &errorReader{rc, 0, threshold}
}
func (e *errorReader) Read(p []byte) (n int, err error) {
if e.bytesRead > e.threshold {
		return 0, errRead
}
n, err = e.Reader.Read(p)
e.bytesRead += n
return
}
func (i *item) createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) {
if deep <= 0 {
return
}
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir
for j := 0; j < nItem; j++ {
isFile := deep == 1 || dice(rnd, 60)
newItem := item{}
var prefix string
if len(i.name) > 0 {
prefix = i.name + "/"
}
if isFile {
newItem.name = fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd))
newItem.t = file
var fileSize int
isSmallFile := dice(rnd, smallFilesPercent)
if isSmallFile {
// [1, bigFileSize)
fileSize = rnd.Intn(bigFileSize-1) + 1
} else {
// [bigFileSize, 2*bigFileSize)
fileSize = rnd.Intn(bigFileSize) + bigFileSize
}
newItem.fileContent = make([]byte, fileSize)
rnd.Read(newItem.fileContent)
i.children[newItem.name] = &newItem
} else {
newItem.name = fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd))
newItem.t = dir
newItem.children = make(map[string]*item)
newItem.createTarArchiveContent(deep-1, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
i.children[newItem.name] = &newItem
}
}
}
func (i *item) fillMap(m *sync.Map) {
for _, v := range i.children {
if v.t == file {
m.Store(v.name, v.fileContent)
} else if v.t == dir {
v.fillMap(m)
}
}
}
func createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) *item {
root := &item{
t: dir,
name: "",
children: make(map[string]*item),
}
root.createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
return root
}
func (i *item) writeItemToArchiver(tw *tar.Writer) error {
// In the first pass, we write the files from the current directory.
for _, child := range i.children {
if child.t == file {
head, err := tar.FileInfoHeader(child, path.Base(child.name))
if err != nil {
return err
}
head.Name = child.name
err = tw.WriteHeader(head)
if err != nil {
return err
}
_, err = io.Copy(tw, bytes.NewReader(child.fileContent))
if err != nil {
return err
}
}
}
// In the second pass, we write files from child directories.
for _, child := range i.children {
if child.t == dir {
if err := child.writeItemToArchiver(tw); err != nil {
return err
}
}
}
return nil
}
func createTarArchiveHelper(writer io.Writer, content *item) error {
gz := gzip.NewWriter(writer)
defer func() {
_ = gz.Close()
}()
tw := tar.NewWriter(gz)
defer func() {
_ = tw.Close()
}()
return content.writeItemToArchiver(tw)
}
func createTarArchive(t *testing.T, deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) (*item, []byte) {
content := createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
writer := bytes.NewBuffer(make([]byte, 0))
err := createTarArchiveHelper(writer, content)
archData := writer.Bytes()
require.NoError(t, err)
require.NotNil(t, content)
require.NotNil(t, archData)
return content, archData
}
func createTarArchiveWithOneFileHelper(w io.Writer) error {
gz := gzip.NewWriter(w)
defer func() {
_ = gz.Close()
}()
tw := tar.NewWriter(gz)
defer func() {
_ = tw.Close()
}()
itm := item{
t: file,
name: "test.bin",
fileContent: make([]byte, 0x80000),
}
rootItm := item{
t: dir,
children: map[string]*item{
itm.name: &itm,
},
}
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
rnd.Read(itm.fileContent)
return rootItm.writeItemToArchiver(tw)
}
func createTarArchiveWithOneFile(t *testing.T) []byte {
writer := bytes.NewBuffer(make([]byte, 0))
err := createTarArchiveWithOneFileHelper(writer)
require.NoError(t, err)
return writer.Bytes()
}
func doubleCloseCaseHelper(t *testing.T, readerTypeChecker func(io.Reader) bool, errorText string) {
bigFileSize := 0x50
content, archData := createTarArchive(t, 5, 5, 10, 70, bigFileSize)
reader := bytes.NewReader(archData)
gzf, err := gzip.NewReader(reader)
require.NoError(t, err)
tarReader := tar.NewReader(gzf)
tarCache := NewTarCache(tarReader, 10, int64(bigFileSize))
var m sync.Map
content.fillMap(&m)
errCh := make(chan error, 100)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
var cnt int32
loop:
	for {
		select {
		case <-ctx.Done():
			// A plain break would only leave the select, so use a label.
			break loop
		default:
		}
		contentReader, header, err := tarCache.NextPayload()
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			require.NoError(t, err)
		}
wg.Add(1)
go func(reader io.ReadCloser, head FileHeader) {
defer wg.Done()
defer func() {
if err := reader.Close(); err != nil {
errCh <- err
cancel()
}
}()
select {
case <-ctx.Done():
return
default:
}
			// Use the head parameter, not the loop variable header, which the
			// enclosing loop keeps overwriting while this goroutine runs.
			val, ok := m.Load(head.FilePath)
			if !ok {
				errCh <- errors.New(head.FilePath + " not found")
cancel()
return
}
expected := val.([]byte)
content, err := io.ReadAll(reader)
if err != nil {
errCh <- err
cancel()
return
}
			if !bytes.Equal(expected, content) {
				errCh <- errors.New(head.FilePath + " content mismatch")
cancel()
return
}
if atomic.AddInt32(&cnt, 1) >= 100 {
if readerTypeChecker(reader) {
if err := reader.Close(); err != nil {
errCh <- err
cancel()
return
}
}
}
}(contentReader, header)
}
err = tarCache.Close()
require.NoError(t, err)
wg.Wait()
close(errCh)
for e := range errCh {
if err == nil {
err = e
}
}
require.EqualError(t, err, errorText)
}
func readErrorCase(t *testing.T) {
	archData := createTarArchiveWithOneFile(t)
	reader := newReaderWithError(bytes.NewReader(archData), 0x800)
	gzf, err := gzip.NewReader(reader)
	require.NoError(t, err)
	tarReader := tar.NewReader(gzf)
	tarCache := NewTarCache(tarReader, 10, math.MaxInt)
	_, _, err = tarCache.NextPayload()
	require.NotErrorIs(t, err, io.EOF)
	require.ErrorIs(t, err, errRead)
	require.NoError(t, tarCache.Close())
}
func successfulCase(t *testing.T) {
bigFileSize := 0x200
content, archData := createTarArchive(t, 5, 5, 30, 70, bigFileSize)
reader := bytes.NewReader(archData)
gzf, err := gzip.NewReader(reader)
require.NoError(t, err)
tarReader := tar.NewReader(gzf)
tarCache := NewTarCache(tarReader, 10, int64(bigFileSize))
var m sync.Map
content.fillMap(&m)
errCh := make(chan error, 100)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
loop:
	for {
		select {
		case <-ctx.Done():
			// A plain break would only leave the select, so use a label.
			break loop
		default:
		}
		contentReader, header, err := tarCache.NextPayload()
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			require.NoError(t, err)
		}
wg.Add(1)
go func(reader io.ReadCloser, head FileHeader) {
defer wg.Done()
defer func() {
if err := reader.Close(); err != nil {
errCh <- err
cancel()
}
}()
select {
case <-ctx.Done():
return
default:
}
			// Use the head parameter, not the shared loop variable header.
			val, ok := m.Load(head.FilePath)
			if !ok {
				errCh <- errors.New(head.FilePath + " not found")
cancel()
return
}
expected := val.([]byte)
content, err := io.ReadAll(reader)
if err != nil {
errCh <- err
cancel()
return
}
			if !bytes.Equal(expected, content) {
				errCh <- errors.New(head.FilePath + " content mismatch")
cancel()
return
}
			m.Delete(head.FilePath)
}(contentReader, header)
}
err = tarCache.Close()
wg.Wait()
close(errCh)
var err2 error
for e := range errCh {
if err2 == nil {
err2 = e
}
}
require.NoError(t, err2)
require.NoError(t, err)
// Checking for dictionary emptiness
var l int
m.Range(func(k, v interface{}) bool {
l++
return true
})
require.Equal(t, 0, l)
}
func TestTarCache(t *testing.T) {
t.Run("Successful case", func(t *testing.T) {
successfulCase(t)
})
t.Run("Double close of the byte reader", func(t *testing.T) {
doubleCloseCaseHelper(t, func(reader io.Reader) bool {
e, ok := reader.(*helper)
if !ok {
return false
}
_, ok = e.Reader.(*bytes.Reader)
return ok
		}, errBytesReaderAlreadyClosed.Error())
})
t.Run("Double close of the stream reader", func(t *testing.T) {
doubleCloseCaseHelper(t, func(reader io.Reader) bool {
e, ok := reader.(*helper)
if !ok {
return false
}
_, ok = e.Reader.(*bytes.Reader)
return !ok
		}, errStreamReaderAlreadyClosed.Error())
})
t.Run("Read error", func(t *testing.T) {
readErrorCase(t)
})
}