Add extract command
Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
This commit is contained in:
parent
12b572e62a
commit
d36b5683b4
8 changed files with 2086 additions and 0 deletions
|
@ -23,6 +23,7 @@ import (
|
||||||
_ "github.com/rclone/rclone/cmd/dedupe"
|
_ "github.com/rclone/rclone/cmd/dedupe"
|
||||||
_ "github.com/rclone/rclone/cmd/delete"
|
_ "github.com/rclone/rclone/cmd/delete"
|
||||||
_ "github.com/rclone/rclone/cmd/deletefile"
|
_ "github.com/rclone/rclone/cmd/deletefile"
|
||||||
|
_ "github.com/rclone/rclone/cmd/extract"
|
||||||
_ "github.com/rclone/rclone/cmd/genautocomplete"
|
_ "github.com/rclone/rclone/cmd/genautocomplete"
|
||||||
_ "github.com/rclone/rclone/cmd/gendocs"
|
_ "github.com/rclone/rclone/cmd/gendocs"
|
||||||
_ "github.com/rclone/rclone/cmd/gitannex"
|
_ "github.com/rclone/rclone/cmd/gitannex"
|
||||||
|
|
46
cmd/extract/extract.go
Normal file
46
cmd/extract/extract.go
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
package extract
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/rclone/rclone/cmd"
|
||||||
|
"github.com/rclone/rclone/fs/extract"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cmd.Root.AddCommand(commandDefinition)
|
||||||
|
}
|
||||||
|
|
||||||
|
var commandDefinition = &cobra.Command{
|
||||||
|
Use: "extract source:path/to/archive.tar.gz dest:path",
|
||||||
|
Short: `Extract the tar.gz archive from source to the dest, skipping identical files.`,
|
||||||
|
// Note: "|" will be replaced by backticks below
|
||||||
|
Long: strings.ReplaceAll(`Extract the tar.gz archive files from the source to the destination. Does not transfer files that are
|
||||||
|
identical on source and destination, testing by size and modification
|
||||||
|
time or MD5SUM. Doesn't delete files from the destination.
|
||||||
|
|
||||||
|
If dest:path doesn't exist, it is created and the contents of the archive source:path/to/targz/archive
|
||||||
|
go there.
|
||||||
|
|
||||||
|
**Note**: Use the |-P|/|--progress| flag to view real-time transfer statistics.
|
||||||
|
|
||||||
|
**Note**: Use the |--dry-run| or the |--interactive|/|-i| flag to test without copying anything.
|
||||||
|
`, "|", "`"),
|
||||||
|
Annotations: map[string]string{
|
||||||
|
"groups": "Extract",
|
||||||
|
},
|
||||||
|
Run: func(command *cobra.Command, args []string) {
|
||||||
|
|
||||||
|
cmd.CheckArgs(2, 2, command, args)
|
||||||
|
fsrc, srcFileName, fdst := cmd.NewFsSrcFileDst(args)
|
||||||
|
cmd.Run(true, true, command, func() error {
|
||||||
|
if srcFileName == "" {
|
||||||
|
return errors.New("the source is required to be a file")
|
||||||
|
}
|
||||||
|
return extract.Extract(context.Background(), fdst, fsrc, srcFileName)
|
||||||
|
})
|
||||||
|
},
|
||||||
|
}
|
276
fs/extract/dirtreeholder.go
Normal file
276
fs/extract/dirtreeholder.go
Normal file
|
@ -0,0 +1,276 @@
|
||||||
|
package extract
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/rclone/rclone/fs"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Object struct {
|
||||||
|
isDir bool
|
||||||
|
data any
|
||||||
|
}
|
||||||
|
|
||||||
|
type Dir struct {
|
||||||
|
name string
|
||||||
|
objs map[string]Object
|
||||||
|
dirs map[string]*Dir
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDir(name string) *Dir {
|
||||||
|
return &Dir{
|
||||||
|
name: name,
|
||||||
|
objs: make(map[string]Object),
|
||||||
|
dirs: make(map[string]*Dir),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Dir) Name() string {
|
||||||
|
return d.name
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewObject(isDir bool, data any) Object {
|
||||||
|
return Object{
|
||||||
|
isDir: isDir,
|
||||||
|
data: data,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDirByName returns a subdirectory with the given name
|
||||||
|
func (d *Dir) GetDirByName(name string) *Dir {
|
||||||
|
if d.name == name {
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
if len(name) <= len(d.name) || !strings.HasPrefix(name, d.name) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if d.name != "" && name[len(d.name)] != '/' {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
idx := strings.Index(name[len(d.name)+1:], "/")
|
||||||
|
nextDir := name
|
||||||
|
if idx != -1 {
|
||||||
|
nextDir = name[:idx+len(d.name)+1]
|
||||||
|
}
|
||||||
|
if td, ok := d.dirs[nextDir]; ok {
|
||||||
|
return td.GetDirByName(name)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddChildDir adds a child directory
|
||||||
|
func (d *Dir) AddChildDir(child *Dir) {
|
||||||
|
d.dirs[child.name] = child
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddChildObject adds information about the child object
|
||||||
|
func (d *Dir) AddChildObject(name string, child Object) {
|
||||||
|
d.objs[name] = child
|
||||||
|
}
|
||||||
|
|
||||||
|
type listDirFn func(dir string) (*Dir, error)
|
||||||
|
|
||||||
|
type DirTreeHolder struct {
|
||||||
|
listF listDirFn
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
// consider changing the variable type to fs.DirTree
|
||||||
|
root *Dir
|
||||||
|
dirsInWork map[string]*sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
type strDirIter struct {
|
||||||
|
dir string
|
||||||
|
idx int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStrDirIter(dir string) *strDirIter {
|
||||||
|
return &strDirIter{
|
||||||
|
dir: dir,
|
||||||
|
idx: -1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *strDirIter) next() (cur string, next string) {
|
||||||
|
if s.dir == "" {
|
||||||
|
return "", ""
|
||||||
|
}
|
||||||
|
if s.idx == -1 {
|
||||||
|
s.idx = 0
|
||||||
|
idx := strings.Index(s.dir, "/")
|
||||||
|
if idx == -1 {
|
||||||
|
return "", s.dir
|
||||||
|
} else {
|
||||||
|
return "", s.dir[:idx]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
idx := strings.Index(s.dir[s.idx:], "/")
|
||||||
|
if idx == -1 {
|
||||||
|
return s.dir, ""
|
||||||
|
}
|
||||||
|
defer func(idx int) {
|
||||||
|
s.idx = s.idx + idx + 1
|
||||||
|
}(idx)
|
||||||
|
|
||||||
|
idx2 := strings.Index(s.dir[s.idx+idx+1:], "/")
|
||||||
|
if idx2 == -1 {
|
||||||
|
return s.dir[:idx+s.idx], s.dir
|
||||||
|
}
|
||||||
|
return s.dir[:idx+s.idx], s.dir[:idx+idx2+s.idx+1]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Dir) GetObjectByPath(path string) (Object, bool) {
|
||||||
|
var dirName string
|
||||||
|
idx := strings.LastIndex(path, "/")
|
||||||
|
if idx != -1 {
|
||||||
|
dirName = path[:idx]
|
||||||
|
}
|
||||||
|
dir := d.GetDirByName(dirName)
|
||||||
|
if dir == nil {
|
||||||
|
return Object{}, false
|
||||||
|
}
|
||||||
|
obj, ok := dir.objs[path]
|
||||||
|
return obj, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDirStructHolder(listF listDirFn) *DirTreeHolder {
|
||||||
|
return &DirTreeHolder{
|
||||||
|
listF: listF,
|
||||||
|
root: nil,
|
||||||
|
dirsInWork: make(map[string]*sync.WaitGroup),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DirTreeHolder) GetObjectByPath(path string) (Object, bool) {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
if l.root == nil {
|
||||||
|
return Object{}, false
|
||||||
|
}
|
||||||
|
return l.root.GetObjectByPath(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DirTreeHolder) IsDirExist(path string) bool {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
if l.root == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return l.root.GetDirByName(path) != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *DirTreeHolder) UpdateDirList(ctx context.Context, dir string) error {
|
||||||
|
l.mu.Lock()
|
||||||
|
iter := newStrDirIter(dir)
|
||||||
|
aborting := func() bool {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
if aborting() {
|
||||||
|
l.mu.Unlock()
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
curDir, nextDir := iter.next()
|
||||||
|
if nextDir == "" {
|
||||||
|
l.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if wg, ok := l.dirsInWork[curDir]; ok {
|
||||||
|
l.mu.Unlock()
|
||||||
|
wg.Wait()
|
||||||
|
if aborting() {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
l.mu.Lock()
|
||||||
|
}
|
||||||
|
if curDir == "" && l.root == nil {
|
||||||
|
if wg, ok := l.dirsInWork[curDir]; ok {
|
||||||
|
l.mu.Unlock()
|
||||||
|
wg.Wait()
|
||||||
|
if aborting() {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
l.mu.Lock()
|
||||||
|
dir := l.root.GetDirByName(curDir)
|
||||||
|
if dir == nil {
|
||||||
|
l.mu.Unlock()
|
||||||
|
return fmt.Errorf("error while updating info about dir %s", nextDir)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
l.dirsInWork[curDir] = wg
|
||||||
|
l.mu.Unlock()
|
||||||
|
d, err := l.listF(curDir)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, fs.ErrorDirNotFound) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
d = NewDir(curDir)
|
||||||
|
}
|
||||||
|
l.mu.Lock()
|
||||||
|
l.root = d
|
||||||
|
wg = l.dirsInWork[curDir]
|
||||||
|
delete(l.dirsInWork, curDir)
|
||||||
|
wg.Done()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
d := l.root.GetDirByName(curDir)
|
||||||
|
if d == nil {
|
||||||
|
return errors.New("not possible to go where")
|
||||||
|
}
|
||||||
|
if _, ok := d.dirs[nextDir]; ok {
|
||||||
|
if ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if o, ok := d.objs[nextDir]; ok {
|
||||||
|
// Where is no such directory
|
||||||
|
if !o.isDir {
|
||||||
|
l.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if wg, ok := l.dirsInWork[nextDir]; ok {
|
||||||
|
l.mu.Unlock()
|
||||||
|
wg.Wait()
|
||||||
|
if aborting() {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
l.mu.Lock()
|
||||||
|
dir := d.GetDirByName(nextDir)
|
||||||
|
if dir == nil {
|
||||||
|
l.mu.Unlock()
|
||||||
|
return fmt.Errorf("error while updating info about dir %s", nextDir)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
l.dirsInWork[nextDir] = wg
|
||||||
|
l.mu.Unlock()
|
||||||
|
td, err := l.listF(nextDir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
l.mu.Lock()
|
||||||
|
d.dirs[nextDir] = td
|
||||||
|
wg = l.dirsInWork[nextDir]
|
||||||
|
delete(l.dirsInWork, nextDir)
|
||||||
|
wg.Done()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
l.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
310
fs/extract/dirtreeholder_test.go
Normal file
310
fs/extract/dirtreeholder_test.go
Normal file
|
@ -0,0 +1,310 @@
|
||||||
|
package extract
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
func createDirTree(dir *Dir, deep, minItemsInDir, maxItemsInDir int) {
|
||||||
|
if deep <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
|
||||||
|
nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir
|
||||||
|
|
||||||
|
for j := 0; j < nItem; j++ {
|
||||||
|
isFile := deep == 1 || dice(rnd, 50)
|
||||||
|
var prefix string
|
||||||
|
if len(dir.name) > 0 {
|
||||||
|
prefix = dir.name + "/"
|
||||||
|
}
|
||||||
|
if isFile {
|
||||||
|
name := fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd))
|
||||||
|
newItem := NewObject(false, name)
|
||||||
|
dir.objs[name] = newItem
|
||||||
|
} else {
|
||||||
|
name := fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd))
|
||||||
|
newItem := NewObject(true, name)
|
||||||
|
dir.objs[name] = newItem
|
||||||
|
childDir := NewDir(name)
|
||||||
|
createDirTree(childDir, deep-1, minItemsInDir, maxItemsInDir)
|
||||||
|
if len(childDir.dirs) != 0 || len(childDir.objs) != 0 {
|
||||||
|
dir.dirs[childDir.name] = childDir
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type listFnHelper struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
cnt atomic.Uint64
|
||||||
|
root *Dir
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *listFnHelper) Cnt() uint64 {
|
||||||
|
return l.cnt.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *listFnHelper) ResetCnt() {
|
||||||
|
l.cnt.Store(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *listFnHelper) listDir(dir string) (*Dir, error) {
|
||||||
|
l.cnt.Add(1)
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
d := l.root.GetDirByName(dir)
|
||||||
|
if d == nil {
|
||||||
|
return nil, os.ErrNotExist
|
||||||
|
}
|
||||||
|
newDir := NewDir(d.name)
|
||||||
|
for path, child := range d.objs {
|
||||||
|
newDir.AddChildObject(path, child)
|
||||||
|
}
|
||||||
|
return newDir, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func fillSliceWithDirsInfo(s *[]string, dir *Dir) {
|
||||||
|
for path, child := range dir.objs {
|
||||||
|
if child.isDir {
|
||||||
|
*s = append(*s, path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, child := range dir.dirs {
|
||||||
|
fillSliceWithDirsInfo(s, child)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func fillMapWithFilesInfo(m map[string][]string, dir *Dir) {
|
||||||
|
files := make([]string, 0)
|
||||||
|
for path, child := range dir.objs {
|
||||||
|
if !child.isDir {
|
||||||
|
files = append(files, filepath.Base(path))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m[dir.Name()] = files
|
||||||
|
|
||||||
|
for _, child := range dir.dirs {
|
||||||
|
fillMapWithFilesInfo(m, child)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestListing(t *testing.T) {
|
||||||
|
t.Run("Test dir struct holder", testDirStructHolder)
|
||||||
|
|
||||||
|
t.Run("Test dir", testDir)
|
||||||
|
|
||||||
|
t.Run("Test string directory iterating", testStrDirIter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testDirStructHolder(t *testing.T) {
|
||||||
|
root := NewDir("")
|
||||||
|
createDirTree(root, 6, 5, 10)
|
||||||
|
listF := &listFnHelper{
|
||||||
|
root: root,
|
||||||
|
}
|
||||||
|
t.Run("Concurrent listing", func(t *testing.T) {
|
||||||
|
s := make([]string, 0)
|
||||||
|
m := make(map[string][]string)
|
||||||
|
fillSliceWithDirsInfo(&s, root)
|
||||||
|
fillMapWithFilesInfo(m, root)
|
||||||
|
sort.Slice(s, func(i, j int) bool {
|
||||||
|
return len(s[i]) < len(s[j])
|
||||||
|
})
|
||||||
|
require.NotNil(t, root)
|
||||||
|
holder := NewDirStructHolder(listF.listDir)
|
||||||
|
halfLen := len(s) / 2
|
||||||
|
path := s[halfLen]
|
||||||
|
expectedCnt := 0
|
||||||
|
dIter := newStrDirIter(path)
|
||||||
|
for {
|
||||||
|
expectedCnt++
|
||||||
|
_, next := dIter.next()
|
||||||
|
if next == "" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.NotNil(t, holder)
|
||||||
|
|
||||||
|
eg, ctx := errgroup.WithContext(context.Background())
|
||||||
|
eg.Go(func() error {
|
||||||
|
return holder.UpdateDirList(ctx, path)
|
||||||
|
})
|
||||||
|
eg.Go(func() error {
|
||||||
|
return holder.UpdateDirList(ctx, path)
|
||||||
|
})
|
||||||
|
eg.Go(func() error {
|
||||||
|
return holder.UpdateDirList(ctx, path)
|
||||||
|
})
|
||||||
|
eg.Go(func() error {
|
||||||
|
return holder.UpdateDirList(ctx, path+"not.exists")
|
||||||
|
})
|
||||||
|
err := eg.Wait()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, uint64(expectedCnt), listF.Cnt())
|
||||||
|
dIter = newStrDirIter(path)
|
||||||
|
var cur, next string
|
||||||
|
next = "1"
|
||||||
|
for next != "" {
|
||||||
|
cur, next = dIter.next()
|
||||||
|
if next == "" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
require.True(t, holder.IsDirExist(cur))
|
||||||
|
files, ok := m[cur]
|
||||||
|
require.True(t, ok)
|
||||||
|
for _, file := range files {
|
||||||
|
var filePath string
|
||||||
|
if cur == "" {
|
||||||
|
filePath = file
|
||||||
|
} else {
|
||||||
|
filePath = cur + "/" + file
|
||||||
|
}
|
||||||
|
var obj Object
|
||||||
|
obj, ok = holder.GetObjectByPath(filePath)
|
||||||
|
if !ok {
|
||||||
|
require.True(t, ok)
|
||||||
|
}
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, filePath, obj.data.(string))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testDir(t *testing.T) {
|
||||||
|
finalDir := "path/with/more/than/one/dir"
|
||||||
|
files := map[string][]string{
|
||||||
|
"": {"file0.1.ext", "file0.2.ext"},
|
||||||
|
"path": {"file1.1.ext", "file1.2.ext"},
|
||||||
|
"path/with/more/than/one": {"file2.1.ext", "file2.2.ext"},
|
||||||
|
"path/with/more/than/one/dir": {"file3.1.ext", "file3.2.ext"},
|
||||||
|
}
|
||||||
|
|
||||||
|
dirIter := newStrDirIter(finalDir)
|
||||||
|
var root, prevDir *Dir
|
||||||
|
next := "1"
|
||||||
|
for next != "" {
|
||||||
|
var cur string
|
||||||
|
cur, next = dirIter.next()
|
||||||
|
dir := NewDir(cur)
|
||||||
|
if root == nil {
|
||||||
|
root = dir
|
||||||
|
}
|
||||||
|
if files, ok := files[cur]; ok {
|
||||||
|
for _, file := range files {
|
||||||
|
obj := NewObject(false, struct{}{})
|
||||||
|
if cur == "" {
|
||||||
|
dir.AddChildObject(file, obj)
|
||||||
|
} else {
|
||||||
|
dir.AddChildObject(cur+"/"+file, obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if prevDir != nil {
|
||||||
|
prevDir.AddChildDir(dir)
|
||||||
|
prevDir.AddChildObject(dir.Name(), NewObject(true, struct{}{}))
|
||||||
|
}
|
||||||
|
prevDir = dir
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("GetDirByName", func(t *testing.T) {
|
||||||
|
dirIter = newStrDirIter(finalDir)
|
||||||
|
next = "1"
|
||||||
|
cnt := 0
|
||||||
|
for next != "" {
|
||||||
|
var cur string
|
||||||
|
cur, next = dirIter.next()
|
||||||
|
dir := root.GetDirByName(cur)
|
||||||
|
require.NotNil(t, dir)
|
||||||
|
cnt++
|
||||||
|
}
|
||||||
|
require.Equal(t, 7, cnt)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("GetObjectByName", func(t *testing.T) {
|
||||||
|
for dirName, fileNames := range files {
|
||||||
|
for _, fileName := range fileNames {
|
||||||
|
var filePath string
|
||||||
|
if dirName == "" {
|
||||||
|
filePath = fileName
|
||||||
|
} else {
|
||||||
|
filePath = dirName + "/" + fileName
|
||||||
|
}
|
||||||
|
obj, ok := root.GetObjectByPath(filePath)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.False(t, obj.isDir)
|
||||||
|
filePath += ".fail"
|
||||||
|
_, ok = root.GetObjectByPath(filePath)
|
||||||
|
require.False(t, ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func testStrDirIter(t *testing.T) {
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
dir string
|
||||||
|
res []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "path with more than one dir",
|
||||||
|
dir: "path/with/more/than/one/dir",
|
||||||
|
res: []string{
|
||||||
|
"",
|
||||||
|
"path",
|
||||||
|
"path/with",
|
||||||
|
"path/with/more",
|
||||||
|
"path/with/more/than",
|
||||||
|
"path/with/more/than/one",
|
||||||
|
"path/with/more/than/one/dir",
|
||||||
|
"",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "path with one dir",
|
||||||
|
dir: "one_dir",
|
||||||
|
res: []string{
|
||||||
|
"",
|
||||||
|
"one_dir",
|
||||||
|
"",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty path",
|
||||||
|
dir: "",
|
||||||
|
res: []string{
|
||||||
|
"",
|
||||||
|
"",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
l := newStrDirIter(tc.dir)
|
||||||
|
for i, p := range tc.res {
|
||||||
|
if p == "" && i > 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
cur, next := l.next()
|
||||||
|
require.Equal(t, cur, p)
|
||||||
|
require.Equal(t, next, tc.res[i+1])
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
681
fs/extract/extract.go
Normal file
681
fs/extract/extract.go
Normal file
|
@ -0,0 +1,681 @@
|
||||||
|
package extract
|
||||||
|
|
||||||
|
import (
|
||||||
|
"archive/tar"
|
||||||
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/rclone/rclone/fs"
|
||||||
|
"github.com/rclone/rclone/fs/filter"
|
||||||
|
"github.com/rclone/rclone/fs/hash"
|
||||||
|
"github.com/rclone/rclone/fs/list"
|
||||||
|
"github.com/rclone/rclone/fs/operations"
|
||||||
|
)
|
||||||
|
|
||||||
|
type payloadType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
payloadTypeBytes payloadType = iota
|
||||||
|
payloadTypeIoReader
|
||||||
|
)
|
||||||
|
|
||||||
|
type extract struct {
|
||||||
|
// parameters
|
||||||
|
fdst fs.Fs
|
||||||
|
fsrc fs.Fs
|
||||||
|
extrFs *extractFs
|
||||||
|
srcFileName string
|
||||||
|
hashType hash.Type // common hash to use
|
||||||
|
hashOption *fs.HashesOption // open option for the common hash
|
||||||
|
// internal state
|
||||||
|
ci *fs.ConfigInfo // global config
|
||||||
|
ctx context.Context // internal context for controlling go-routines
|
||||||
|
cancel func() // cancel the context
|
||||||
|
maxDurationEndTime time.Time // end time if --max-duration is set
|
||||||
|
tarFormat tar.Format // tar format to use
|
||||||
|
checkersWg sync.WaitGroup // wait group for auxiliary goroutines
|
||||||
|
transfersWg sync.WaitGroup // wait group for auxiliary goroutines
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
lastError error
|
||||||
|
errCnt atomic.Int32
|
||||||
|
dsh *DirTreeHolder
|
||||||
|
}
|
||||||
|
|
||||||
|
type objectInfo struct {
|
||||||
|
bytesPayload []byte
|
||||||
|
readerPayload io.Reader
|
||||||
|
closeFn func() error
|
||||||
|
header FileHeader
|
||||||
|
}
|
||||||
|
|
||||||
|
type extractFs struct {
|
||||||
|
// parameters
|
||||||
|
fsrc fs.Fs
|
||||||
|
srcFileName string
|
||||||
|
features *fs.Features
|
||||||
|
transfers int
|
||||||
|
// internal state
|
||||||
|
mu sync.Mutex
|
||||||
|
tarCache *TarCache
|
||||||
|
rawReader io.ReadCloser
|
||||||
|
cache []objectInfo
|
||||||
|
reopenRequests chan reopenRequest
|
||||||
|
openOptions []fs.OpenOption
|
||||||
|
streamTaskWatcher chan struct{}
|
||||||
|
tarFormat tar.Format // tar format to use
|
||||||
|
wg sync.WaitGroup // wait group for auxiliary goroutines
|
||||||
|
}
|
||||||
|
|
||||||
|
type object struct {
|
||||||
|
fs *extractFs
|
||||||
|
header FileHeader
|
||||||
|
payloadType payloadType
|
||||||
|
payload []byte // payload of small-sized objects
|
||||||
|
closeFn func() error
|
||||||
|
reopenRequest chan reopenRequest
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
readerPayload io.Reader // payload of large objects
|
||||||
|
}
|
||||||
|
|
||||||
|
type reopenRequest struct {
|
||||||
|
filePath string
|
||||||
|
obj *object
|
||||||
|
respCh chan error
|
||||||
|
}
|
||||||
|
|
||||||
|
type readCloserWrapper struct {
|
||||||
|
io.Reader
|
||||||
|
closeFn func() error
|
||||||
|
}
|
||||||
|
|
||||||
|
type copyJob struct {
|
||||||
|
src fs.Object
|
||||||
|
dst fs.Object
|
||||||
|
path string
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeListDir(ctx context.Context, f fs.Fs) listDirFn {
|
||||||
|
return func(dir string) (*Dir, error) {
|
||||||
|
dirCtx := filter.SetUseFilter(ctx, f.Features().FilterAware) // make filter-aware backends constrain List
|
||||||
|
entities, err := list.DirSorted(dirCtx, f, false, dir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res := NewDir(dir)
|
||||||
|
for _, entry := range entities {
|
||||||
|
if obj, ok := entry.(fs.Object); ok {
|
||||||
|
o := NewObject(false, obj)
|
||||||
|
res.AddChildObject(obj.Remote(), o)
|
||||||
|
}
|
||||||
|
if d, ok := entry.(fs.Directory); ok {
|
||||||
|
o := NewObject(true, nil)
|
||||||
|
res.AddChildObject(d.Remote(), o)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newReadCloserWrapper(r io.Reader, closeFn func() error) io.ReadCloser {
|
||||||
|
return &readCloserWrapper{
|
||||||
|
r,
|
||||||
|
closeFn,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *readCloserWrapper) Close() error {
|
||||||
|
return r.closeFn()
|
||||||
|
}
|
||||||
|
|
||||||
|
func newReopenRequest(filePath string, obj *object) reopenRequest {
|
||||||
|
return reopenRequest{
|
||||||
|
filePath: filePath,
|
||||||
|
obj: obj,
|
||||||
|
respCh: make(chan error),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) Fs() fs.Info {
|
||||||
|
return o.fs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) String() string {
|
||||||
|
if o == nil {
|
||||||
|
return "<nil>"
|
||||||
|
}
|
||||||
|
return o.header.FilePath
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) IsStream() bool {
|
||||||
|
return o.payloadType == payloadTypeIoReader
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) Remote() string {
|
||||||
|
return o.header.FilePath
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) ModTime(_ context.Context) time.Time {
|
||||||
|
return o.header.ModTime
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) Size() int64 {
|
||||||
|
return o.header.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) Hash(_ context.Context, _ hash.Type) (string, error) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) Storable() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) SetModTime(_ context.Context, _ time.Time) error {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) bytesOpen() (io.ReadCloser, error) {
|
||||||
|
var err error
|
||||||
|
var once sync.Once
|
||||||
|
return newReadCloserWrapper(bytes.NewReader(o.payload), func() error {
|
||||||
|
once.Do(func() {
|
||||||
|
err = o.closeFn()
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) streamOpen() (io.ReadCloser, error) {
|
||||||
|
o.mu.Lock()
|
||||||
|
if o.readerPayload == nil {
|
||||||
|
o.mu.Unlock()
|
||||||
|
reopenReq := newReopenRequest(o.header.FilePath, o)
|
||||||
|
o.reopenRequest <- reopenReq
|
||||||
|
err := <-reopenReq.respCh
|
||||||
|
close(reopenReq.respCh)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
o.mu.Lock()
|
||||||
|
if o.readerPayload == nil {
|
||||||
|
panic("nil readerPayload")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
defer o.mu.Unlock()
|
||||||
|
var err error
|
||||||
|
var once sync.Once
|
||||||
|
return newReadCloserWrapper(o.readerPayload, func() error {
|
||||||
|
once.Do(func() {
|
||||||
|
o.mu.Lock()
|
||||||
|
o.readerPayload = nil
|
||||||
|
o.mu.Unlock()
|
||||||
|
err = o.closeFn()
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) Open(_ context.Context, _ ...fs.OpenOption) (io.ReadCloser, error) {
|
||||||
|
if o.payloadType == payloadTypeBytes {
|
||||||
|
return o.bytesOpen()
|
||||||
|
}
|
||||||
|
return o.streamOpen()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) Update(_ context.Context, _ io.Reader, _ fs.ObjectInfo, _ ...fs.OpenOption) error {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *object) Remove(_ context.Context) error {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) Name() string {
|
||||||
|
return "extract"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) Root() string {
|
||||||
|
return fmt.Sprintf("%s:%s/%s", e.fsrc.Name(), e.fsrc.Root(), e.srcFileName)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) String() string {
|
||||||
|
return fmt.Sprintf("tar.gz archive %s at %s", e.Name(), e.Root())
|
||||||
|
}
|
||||||
|
|
||||||
|
func newExtractFs(ctx context.Context, fsrc fs.Fs, srcFileName string, transfers int) *extractFs {
|
||||||
|
e := &extractFs{
|
||||||
|
fsrc: fsrc,
|
||||||
|
srcFileName: srcFileName,
|
||||||
|
transfers: transfers,
|
||||||
|
reopenRequests: make(chan reopenRequest, transfers*2),
|
||||||
|
streamTaskWatcher: make(chan struct{}),
|
||||||
|
}
|
||||||
|
close(e.streamTaskWatcher)
|
||||||
|
e.features = (&fs.Features{
|
||||||
|
CanHaveEmptyDirectories: false,
|
||||||
|
}).Fill(ctx, e)
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) Precision() time.Duration {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
switch e.tarFormat {
|
||||||
|
case tar.FormatPAX:
|
||||||
|
return time.Millisecond
|
||||||
|
default:
|
||||||
|
return time.Second
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) Open(ctx context.Context, options ...fs.OpenOption) error {
|
||||||
|
e.openOptions = options
|
||||||
|
return e.reopen(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) reopen(ctx context.Context) error {
|
||||||
|
obj, err := e.fsrc.NewObject(ctx, e.srcFileName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, err := obj.Open(ctx, e.openOptions...)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
gzr, err := gzip.NewReader(reader)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
tarReader := tar.NewReader(gzr)
|
||||||
|
t := NewTarCache(tarReader, e.transfers, 2000)
|
||||||
|
e.mu.Lock()
|
||||||
|
e.tarCache = t
|
||||||
|
e.rawReader = reader
|
||||||
|
e.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) Hashes() hash.Set {
|
||||||
|
return hash.Set(hash.None)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) Features() *fs.Features {
|
||||||
|
return e.features
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) processReopenRequest(ctx context.Context, reopenReq reopenRequest) error {
|
||||||
|
err := e.reopen(ctx)
|
||||||
|
if err != nil {
|
||||||
|
reopenReq.respCh <- err
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
e.mu.Lock()
|
||||||
|
tarCache := e.tarCache
|
||||||
|
e.mu.Unlock()
|
||||||
|
bytesPayload, readerPayload, closer, header, err := tarCache.ForwardToFile(reopenReq.filePath)
|
||||||
|
if err != nil {
|
||||||
|
reopenReq.respCh <- err
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
e.mu.Lock()
|
||||||
|
found := false
|
||||||
|
for idx, itm := range e.cache {
|
||||||
|
if itm.header.FilePath == reopenReq.filePath {
|
||||||
|
itm.readerPayload = readerPayload
|
||||||
|
itm.bytesPayload = bytesPayload
|
||||||
|
itm.closeFn = closer
|
||||||
|
itm.header = header
|
||||||
|
e.cache[idx] = itm
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
e.mu.Unlock()
|
||||||
|
if !found {
|
||||||
|
panic("cache must contain information about this object")
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, err := e.NewObject(ctx, reopenReq.filePath)
|
||||||
|
if err != nil {
|
||||||
|
reopenReq.respCh <- err
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
o := obj.(*object)
|
||||||
|
reopenReq.obj.readerPayload = o.readerPayload
|
||||||
|
reopenReq.obj.closeFn = o.closeFn
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) AddNextObject(ctx context.Context) (string, error) {
|
||||||
|
e.mu.Lock()
|
||||||
|
tarCache := e.tarCache
|
||||||
|
e.mu.Unlock()
|
||||||
|
if tarCache == nil {
|
||||||
|
return "", errors.New("todo: cannot add next object")
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
L:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
// We block execution flow until the goroutine responsible for copying
|
||||||
|
// stream has completed its work.
|
||||||
|
case <-e.streamTaskWatcher:
|
||||||
|
break L
|
||||||
|
case <-ctx.Done():
|
||||||
|
return "", ctx.Err()
|
||||||
|
case reopenReq := <-e.reopenRequests:
|
||||||
|
err = e.processReopenRequest(ctx, reopenReq)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
var o objectInfo
|
||||||
|
o.bytesPayload, o.readerPayload, o.closeFn, o.header, err = e.tarCache.Next()
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
// There can be two possible errors here:
|
||||||
|
|
||||||
|
// 1. An error occurred when reading data by goroutine.
|
||||||
|
// 2. An error occurred while reading the next header.
|
||||||
|
// In any case, we need to wait for all working goroutines to complete their work.
|
||||||
|
watcherCh := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
_ = tarCache.Close()
|
||||||
|
close(watcherCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
L2:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case reopenReq := <-e.reopenRequests:
|
||||||
|
err = e.processReopenRequest(ctx, reopenReq)
|
||||||
|
continue
|
||||||
|
case <-watcherCh:
|
||||||
|
// All the working goroutines have completed their work
|
||||||
|
break L2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
e.cache = append(e.cache, o)
|
||||||
|
if e.tarFormat == tar.FormatUnknown {
|
||||||
|
e.tarFormat = o.header.Format
|
||||||
|
}
|
||||||
|
if len(e.cache) > e.transfers*3 {
|
||||||
|
e.cache = e.cache[1:]
|
||||||
|
}
|
||||||
|
|
||||||
|
return o.header.FilePath, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) newObject(ctx context.Context, objInfo objectInfo) (fs.Object, error) {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
pt := payloadTypeBytes
|
||||||
|
if objInfo.readerPayload != nil {
|
||||||
|
pt = payloadTypeIoReader
|
||||||
|
}
|
||||||
|
return &object{
|
||||||
|
fs: e,
|
||||||
|
header: objInfo.header,
|
||||||
|
payload: objInfo.bytesPayload,
|
||||||
|
readerPayload: objInfo.readerPayload,
|
||||||
|
payloadType: pt,
|
||||||
|
closeFn: objInfo.closeFn,
|
||||||
|
reopenRequest: e.reopenRequests,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) NewObject(ctx context.Context, remote string) (obj fs.Object, err error) {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
for _, objInfo := range e.cache {
|
||||||
|
if objInfo.header.FilePath == remote {
|
||||||
|
return e.newObject(ctx, objInfo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
return nil, fs.ErrorObjectNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) setStreamTaskWatcherCh(ch chan struct{}) {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
e.streamTaskWatcher = ch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) streamTaskWatcherCh() chan struct{} {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
return e.streamTaskWatcher
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) List(_ context.Context, _ string) (entries fs.DirEntries, err error) {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) Put(_ context.Context, _ io.Reader, _ fs.ObjectInfo, _ ...fs.OpenOption) (fs.Object, error) {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) Mkdir(_ context.Context, _ string) error {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) Rmdir(_ context.Context, _ string) error {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extractFs) Remove(_ context.Context, _ fs.Object) error {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func newExtract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string) *extract {
|
||||||
|
ci := fs.GetConfig(ctx)
|
||||||
|
|
||||||
|
s := &extract{
|
||||||
|
ctx: ctx,
|
||||||
|
ci: ci,
|
||||||
|
fdst: fdst,
|
||||||
|
fsrc: fsrc,
|
||||||
|
srcFileName: srcFileName,
|
||||||
|
}
|
||||||
|
if ci.MaxDuration > 0 {
|
||||||
|
s.maxDurationEndTime = time.Now().Add(ci.MaxDuration)
|
||||||
|
fs.Infof(s.fdst, "Transfer session %v deadline: %s", ci.CutoffMode, s.maxDurationEndTime.Format("2006/01/02 15:04:05"))
|
||||||
|
}
|
||||||
|
|
||||||
|
// If a max session duration has been defined add a deadline
|
||||||
|
// to the main context if cutoff mode is hard. This will cut
|
||||||
|
// the transfers off.
|
||||||
|
if !s.maxDurationEndTime.IsZero() && ci.CutoffMode == fs.CutoffModeHard {
|
||||||
|
s.ctx, s.cancel = context.WithDeadline(ctx, s.maxDurationEndTime)
|
||||||
|
} else {
|
||||||
|
s.ctx, s.cancel = context.WithCancel(ctx)
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extract) transfer(ctx context.Context, in <-chan copyJob) {
|
||||||
|
defer e.transfersWg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case job, ok := <-in:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
obj := job.src.(*object)
|
||||||
|
|
||||||
|
var watcher chan struct{}
|
||||||
|
if obj.IsStream() {
|
||||||
|
watcher = make(chan struct{})
|
||||||
|
e.extrFs.setStreamTaskWatcherCh(watcher)
|
||||||
|
}
|
||||||
|
_, err := operations.Copy(ctx, e.fdst, job.dst, job.path, job.src)
|
||||||
|
|
||||||
|
// It is possible that operations.Copy() does not call the Open() method for job.src internally.
|
||||||
|
// Therefore, to avoid freezing when receiving information about the next file in
|
||||||
|
// the extractFs.AddNextObject() method, we force a call to o.closeFn().
|
||||||
|
o := job.src.(*object)
|
||||||
|
_ = o.closeFn()
|
||||||
|
if watcher != nil {
|
||||||
|
close(watcher)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
e.errCnt.Add(1)
|
||||||
|
e.mu.Lock()
|
||||||
|
if e.lastError != nil {
|
||||||
|
e.lastError = err
|
||||||
|
}
|
||||||
|
e.mu.Unlock()
|
||||||
|
e.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extract) runTransfers(ctx context.Context, in <-chan copyJob) {
|
||||||
|
for i := 0; i < e.ci.Transfers; i++ {
|
||||||
|
e.transfersWg.Add(1)
|
||||||
|
go e.transfer(ctx, in)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extract) checker(ctx context.Context, in <-chan string, out chan<- copyJob) {
|
||||||
|
defer e.checkersWg.Done()
|
||||||
|
for {
|
||||||
|
var filePath string
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case filePath, ok = <-in:
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
idx := strings.LastIndex(filePath, "/")
|
||||||
|
var dir string
|
||||||
|
if idx != -1 {
|
||||||
|
dir = filePath[:idx]
|
||||||
|
}
|
||||||
|
err := e.dsh.UpdateDirList(ctx, dir)
|
||||||
|
if err != nil {
|
||||||
|
// TODO: think about retries
|
||||||
|
e.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
src, err := e.extrFs.NewObject(ctx, filePath)
|
||||||
|
if err != nil {
|
||||||
|
e.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
job := copyJob{
|
||||||
|
src: src,
|
||||||
|
path: filePath,
|
||||||
|
}
|
||||||
|
if objInfo, ok := e.dsh.GetObjectByPath(filePath); ok && !objInfo.isDir {
|
||||||
|
if dst := objInfo.data.(fs.Object); dst != nil {
|
||||||
|
job.dst = dst
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out <- job
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extract) runCheckers(ctx context.Context, in <-chan string, out chan<- copyJob) {
|
||||||
|
for i := 0; i < e.ci.Transfers; i++ {
|
||||||
|
e.checkersWg.Add(1)
|
||||||
|
go e.checker(ctx, in, out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *extract) run() error {
|
||||||
|
fExtr := newExtractFs(e.ctx, e.fsrc, e.srcFileName, e.ci.Transfers)
|
||||||
|
|
||||||
|
e.dsh = NewDirStructHolder(makeListDir(e.ctx, e.fdst))
|
||||||
|
e.extrFs = fExtr
|
||||||
|
|
||||||
|
e.hashType, e.hashOption = operations.CommonHash(e.ctx, fExtr, e.fdst)
|
||||||
|
|
||||||
|
// Options for the download
|
||||||
|
downloadOptions := []fs.OpenOption{e.hashOption}
|
||||||
|
for _, option := range e.ci.DownloadHeaders {
|
||||||
|
downloadOptions = append(downloadOptions, option)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := fExtr.Open(e.ctx, downloadOptions...)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
pathCh := make(chan string, e.ci.Transfers)
|
||||||
|
jobCh := make(chan copyJob, e.ci.Transfers)
|
||||||
|
|
||||||
|
e.runCheckers(e.ctx, pathCh, jobCh)
|
||||||
|
e.runTransfers(e.ctx, jobCh)
|
||||||
|
|
||||||
|
var exitErr error
|
||||||
|
for {
|
||||||
|
path, err := fExtr.AddNextObject(e.ctx)
|
||||||
|
if err != nil {
|
||||||
|
exitErr = err
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if path == "" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
pathCh <- path
|
||||||
|
}
|
||||||
|
close(pathCh)
|
||||||
|
e.checkersWg.Wait()
|
||||||
|
close(jobCh)
|
||||||
|
e.transfersWg.Wait()
|
||||||
|
if e.lastError != nil {
|
||||||
|
return e.lastError
|
||||||
|
}
|
||||||
|
return exitErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func Extract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string) error {
|
||||||
|
do := newExtract(ctx, fdst, fsrc, srcFileName)
|
||||||
|
return do.run()
|
||||||
|
}
|
87
fs/extract/extract_test.go
Normal file
87
fs/extract/extract_test.go
Normal file
|
@ -0,0 +1,87 @@
|
||||||
|
package extract
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/rclone/rclone/backend/memory"
|
||||||
|
"github.com/rclone/rclone/fs"
|
||||||
|
"github.com/rclone/rclone/fstest/mockobject"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// test cases
|
||||||
|
// 1. Happy path: extract all files from archive to a directory;
|
||||||
|
// 2. Fail once when writing a file to the target directory and write it successfully when writing the same file again;
|
||||||
|
// 3. Fail all attempts to write a file to the target directory;
|
||||||
|
// 4. Fail once when reading a file from the archive, but read it successfully the second time;
|
||||||
|
// 5. Fail all attempts to read a file from the archive;
|
||||||
|
|
||||||
|
func TestExtract(t *testing.T) {
|
||||||
|
t.Run("Test happy path", testHappyPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkFunc(ctx context.Context, t *testing.T, dstFs fs.Fs, files map[string]struct{}, itm *item) {
|
||||||
|
for _, obj := range itm.children {
|
||||||
|
if obj.IsDir() {
|
||||||
|
checkFunc(ctx, t, dstFs, files, obj)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, ok := files[obj.Name()]
|
||||||
|
require.True(t, ok)
|
||||||
|
delete(files, obj.Name())
|
||||||
|
o, err := dstFs.NewObject(ctx, obj.Name())
|
||||||
|
require.NoError(t, err)
|
||||||
|
reader, err := o.Open(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, reader)
|
||||||
|
actual, err := io.ReadAll(reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, obj.fileContent, actual)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testHappyPath(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
ctx, ci := fs.AddConfig(ctx)
|
||||||
|
ci.Transfers = 10
|
||||||
|
ci2 := fs.GetConfig(ctx)
|
||||||
|
require.Equal(t, ci.Transfers, ci2.Transfers)
|
||||||
|
|
||||||
|
rootItem, archiveContent := createTarArchive(t, 5, 6, 8, 50, 1000)
|
||||||
|
archiveFile := "archive.tar.gz"
|
||||||
|
srcFs, err := memory.NewFs(ctx, "memory", "src:dir", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
o := mockobject.New(archiveFile).WithContent(archiveContent, mockobject.SeekModeNone)
|
||||||
|
reader, err := o.Open(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Put archive file to source fs
|
||||||
|
_, err = srcFs.Put(ctx, reader, o)
|
||||||
|
require.NoError(t, err)
|
||||||
|
dstFs, err := memory.NewFs(ctx, "memory", "dst:dir", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Extract archive to dst fs
|
||||||
|
err = Extract(ctx, dstFs, srcFs, archiveFile)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ll := make(map[string]struct{})
|
||||||
|
listRer := dstFs.(fs.ListRer)
|
||||||
|
require.NotNil(t, listRer)
|
||||||
|
err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
|
||||||
|
for _, entry := range entries {
|
||||||
|
obj, ok := entry.(fs.Object)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ll[obj.Remote()] = struct{}{}
|
||||||
|
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
checkFunc(ctx, t, dstFs, ll, rootItem)
|
||||||
|
|
||||||
|
require.Equal(t, 0, len(ll))
|
||||||
|
}
|
190
fs/extract/tarcache.go
Normal file
190
fs/extract/tarcache.go
Normal file
|
@ -0,0 +1,190 @@
|
||||||
|
package extract
|
||||||
|
|
||||||
|
import (
|
||||||
|
"archive/tar"
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TarCache struct {
|
||||||
|
reader *tar.Reader
|
||||||
|
workerCnt int
|
||||||
|
maxSizeInMemory int64
|
||||||
|
|
||||||
|
ch chan struct{}
|
||||||
|
streamWg sync.WaitGroup
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
type FileHeader struct {
|
||||||
|
FilePath string
|
||||||
|
ModTime time.Time
|
||||||
|
Size int64
|
||||||
|
Format tar.Format
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
streamReaderAlreadyCloseError = errors.New("streamReadCloser already closed")
|
||||||
|
bytesReaderAlreadyCloseError = errors.New("bytesReadCloser already closed")
|
||||||
|
)
|
||||||
|
|
||||||
|
func (t *TarCache) WaitWorkersDone() {
|
||||||
|
t.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TarCache) Close() error {
|
||||||
|
t.WaitWorkersDone()
|
||||||
|
if t.ch != nil {
|
||||||
|
close(t.ch)
|
||||||
|
t.ch = nil
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return errors.New("TarCache already closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTarCache(reader *tar.Reader, workerCnt int, maxSizeInMemory int64) *TarCache {
|
||||||
|
return &TarCache{
|
||||||
|
reader: reader,
|
||||||
|
workerCnt: workerCnt,
|
||||||
|
maxSizeInMemory: maxSizeInMemory,
|
||||||
|
ch: make(chan struct{}, workerCnt),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type helper struct {
|
||||||
|
io.Reader
|
||||||
|
closer func() error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *helper) Close() error {
|
||||||
|
return h.closer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TarCache) NextPayload() (_ io.ReadCloser, _ FileHeader, err error) {
|
||||||
|
bytesPayload, reader, closer, header, err := t.Next()
|
||||||
|
if err != nil {
|
||||||
|
return nil, FileHeader{}, err
|
||||||
|
}
|
||||||
|
if reader != nil {
|
||||||
|
return &helper{reader, closer}, header, nil
|
||||||
|
}
|
||||||
|
reader = bytes.NewReader(bytesPayload)
|
||||||
|
return &helper{reader, closer}, header, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type streamCloser struct {
|
||||||
|
ch chan struct{}
|
||||||
|
cache *TarCache
|
||||||
|
isBytes bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStreamCloser(cache *TarCache, isBytes bool) *streamCloser {
|
||||||
|
if !isBytes {
|
||||||
|
cache.streamWg.Add(1)
|
||||||
|
}
|
||||||
|
cache.wg.Add(1)
|
||||||
|
return &streamCloser{
|
||||||
|
ch: cache.ch,
|
||||||
|
cache: cache,
|
||||||
|
isBytes: isBytes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *streamCloser) close() error {
|
||||||
|
if s.ch == nil {
|
||||||
|
if s.isBytes {
|
||||||
|
return bytesReaderAlreadyCloseError
|
||||||
|
}
|
||||||
|
return streamReaderAlreadyCloseError
|
||||||
|
}
|
||||||
|
ch := s.ch
|
||||||
|
s.ch = nil
|
||||||
|
<-ch
|
||||||
|
if !s.isBytes {
|
||||||
|
s.cache.streamWg.Done()
|
||||||
|
}
|
||||||
|
s.cache.wg.Done()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (t *TarCache) ForwardToFile(filePath string) (_ []byte, _ io.Reader, _ func() error, _ FileHeader, err error) {
|
||||||
|
for {
|
||||||
|
header, err := t.reader.Next()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, FileHeader{}, err
|
||||||
|
}
|
||||||
|
switch header.Typeflag {
|
||||||
|
case tar.TypeDir:
|
||||||
|
continue
|
||||||
|
case tar.TypeReg:
|
||||||
|
if filePath != header.Name {
|
||||||
|
// TODO: handle case insensitivity
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
h := FileHeader{
|
||||||
|
FilePath: header.Name,
|
||||||
|
ModTime: header.ModTime,
|
||||||
|
Size: header.Size,
|
||||||
|
Format: header.Format,
|
||||||
|
}
|
||||||
|
|
||||||
|
if header.Size < t.maxSizeInMemory {
|
||||||
|
payload, err := io.ReadAll(t.reader)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, FileHeader{}, err
|
||||||
|
}
|
||||||
|
closer := newStreamCloser(t, true)
|
||||||
|
return payload, nil, closer.close, h, nil
|
||||||
|
}
|
||||||
|
closer := newStreamCloser(t, false)
|
||||||
|
return nil, t.reader, closer.close, h, nil
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TarCache) Next() (_ []byte, _ io.Reader, _ func() error, _ FileHeader, err error) {
|
||||||
|
// We block the execution flow if the number of currently processed files exceeds the limit.
|
||||||
|
t.ch <- struct{}{}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
<-t.ch
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
// We also block the execution flow if there is a reader without caching.
|
||||||
|
t.streamWg.Wait()
|
||||||
|
for {
|
||||||
|
header, err := t.reader.Next()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, FileHeader{}, err
|
||||||
|
}
|
||||||
|
switch header.Typeflag {
|
||||||
|
case tar.TypeDir:
|
||||||
|
continue
|
||||||
|
case tar.TypeReg:
|
||||||
|
h := FileHeader{
|
||||||
|
FilePath: header.Name,
|
||||||
|
ModTime: header.ModTime,
|
||||||
|
Size: header.Size,
|
||||||
|
Format: header.Format,
|
||||||
|
}
|
||||||
|
|
||||||
|
if header.Size < t.maxSizeInMemory {
|
||||||
|
var payload []byte
|
||||||
|
payload, err = io.ReadAll(t.reader)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, FileHeader{}, err
|
||||||
|
}
|
||||||
|
closer := newStreamCloser(t, true)
|
||||||
|
return payload, nil, closer.close, h, nil
|
||||||
|
}
|
||||||
|
closer := newStreamCloser(t, false)
|
||||||
|
return nil, t.reader, closer.close, h, nil
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
495
fs/extract/tarcache_test.go
Normal file
495
fs/extract/tarcache_test.go
Normal file
|
@ -0,0 +1,495 @@
|
||||||
|
package extract
|
||||||
|
|
||||||
|
import (
|
||||||
|
"archive/tar"
|
||||||
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/fs"
|
||||||
|
"math"
|
||||||
|
"math/rand"
|
||||||
|
"path"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type itemType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
dir itemType = iota
|
||||||
|
file
|
||||||
|
)
|
||||||
|
|
||||||
|
type item struct {
|
||||||
|
t itemType
|
||||||
|
name string
|
||||||
|
children map[string]*item
|
||||||
|
fileContent []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *item) Name() string {
|
||||||
|
return i.name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *item) Size() int64 {
|
||||||
|
return int64(len(i.fileContent))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *item) Mode() fs.FileMode {
|
||||||
|
res := fs.ModePerm
|
||||||
|
if i.t == dir {
|
||||||
|
res |= fs.ModeDir
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *item) ModTime() time.Time {
|
||||||
|
return time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *item) IsDir() bool {
|
||||||
|
return i.t == dir
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *item) Sys() any {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func dice(rnd *rand.Rand, percent int) bool {
|
||||||
|
return rnd.Intn(100) < percent
|
||||||
|
}
|
||||||
|
|
||||||
|
func randomFileName(rnd *rand.Rand) string {
|
||||||
|
b := make([]byte, 16)
|
||||||
|
rnd.Read(b)
|
||||||
|
return hex.EncodeToString(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
type errorReader struct {
|
||||||
|
io.Reader
|
||||||
|
|
||||||
|
bytesRead int
|
||||||
|
threshold int
|
||||||
|
}
|
||||||
|
|
||||||
|
var readError = errors.New("read error")
|
||||||
|
|
||||||
|
func newReaderWithError(rc io.Reader, threshold int) *errorReader {
|
||||||
|
return &errorReader{rc, 0, threshold}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *errorReader) Read(p []byte) (n int, err error) {
|
||||||
|
if e.bytesRead > e.threshold {
|
||||||
|
return 0, readError
|
||||||
|
}
|
||||||
|
n, err = e.Reader.Read(p)
|
||||||
|
e.bytesRead += n
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *item) createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) {
|
||||||
|
if deep <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
|
||||||
|
nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir
|
||||||
|
|
||||||
|
for j := 0; j < nItem; j++ {
|
||||||
|
isFile := deep == 1 || dice(rnd, 60)
|
||||||
|
newItem := item{}
|
||||||
|
var prefix string
|
||||||
|
if len(i.name) > 0 {
|
||||||
|
prefix = i.name + "/"
|
||||||
|
}
|
||||||
|
if isFile {
|
||||||
|
newItem.name = fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd))
|
||||||
|
newItem.t = file
|
||||||
|
var fileSize int
|
||||||
|
isSmallFile := dice(rnd, smallFilesPercent)
|
||||||
|
if isSmallFile {
|
||||||
|
// [1, bigFileSize)
|
||||||
|
fileSize = rnd.Intn(bigFileSize-1) + 1
|
||||||
|
} else {
|
||||||
|
// [bigFileSize, 2*bigFileSize)
|
||||||
|
fileSize = rnd.Intn(bigFileSize) + bigFileSize
|
||||||
|
}
|
||||||
|
newItem.fileContent = make([]byte, fileSize)
|
||||||
|
rnd.Read(newItem.fileContent)
|
||||||
|
i.children[newItem.name] = &newItem
|
||||||
|
} else {
|
||||||
|
newItem.name = fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd))
|
||||||
|
newItem.t = dir
|
||||||
|
newItem.children = make(map[string]*item)
|
||||||
|
newItem.createTarArchiveContent(deep-1, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
|
||||||
|
i.children[newItem.name] = &newItem
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *item) fillMap(m *sync.Map) {
|
||||||
|
for _, v := range i.children {
|
||||||
|
if v.t == file {
|
||||||
|
m.Store(v.name, v.fileContent)
|
||||||
|
} else if v.t == dir {
|
||||||
|
v.fillMap(m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) *item {
|
||||||
|
root := &item{
|
||||||
|
t: dir,
|
||||||
|
name: "",
|
||||||
|
children: make(map[string]*item),
|
||||||
|
}
|
||||||
|
|
||||||
|
root.createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
|
||||||
|
return root
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *item) writeItemToArchiver(tw *tar.Writer) error {
|
||||||
|
// In the first pass, we write the files from the current directory.
|
||||||
|
for _, child := range i.children {
|
||||||
|
if child.t == file {
|
||||||
|
head, err := tar.FileInfoHeader(child, path.Base(child.name))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
head.Name = child.name
|
||||||
|
err = tw.WriteHeader(head)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = io.Copy(tw, bytes.NewReader(child.fileContent))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// In the second pass, we write files from child directories.
|
||||||
|
for _, child := range i.children {
|
||||||
|
if child.t == dir {
|
||||||
|
if err := child.writeItemToArchiver(tw); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTarArchiveHelper(writer io.Writer, content *item) error {
|
||||||
|
gz := gzip.NewWriter(writer)
|
||||||
|
defer func() {
|
||||||
|
_ = gz.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
tw := tar.NewWriter(gz)
|
||||||
|
defer func() {
|
||||||
|
_ = tw.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return content.writeItemToArchiver(tw)
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTarArchive(t *testing.T, deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) (*item, []byte) {
|
||||||
|
content := createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
|
||||||
|
|
||||||
|
writer := bytes.NewBuffer(make([]byte, 0))
|
||||||
|
|
||||||
|
err := createTarArchiveHelper(writer, content)
|
||||||
|
archData := writer.Bytes()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, content)
|
||||||
|
require.NotNil(t, archData)
|
||||||
|
|
||||||
|
return content, archData
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTarArchiveWithOneFileHelper(w io.Writer) error {
|
||||||
|
gz := gzip.NewWriter(w)
|
||||||
|
defer func() {
|
||||||
|
_ = gz.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
tw := tar.NewWriter(gz)
|
||||||
|
defer func() {
|
||||||
|
_ = tw.Close()
|
||||||
|
}()
|
||||||
|
itm := item{
|
||||||
|
t: file,
|
||||||
|
name: "test.bin",
|
||||||
|
fileContent: make([]byte, 0x80000),
|
||||||
|
}
|
||||||
|
|
||||||
|
rootItm := item{
|
||||||
|
t: dir,
|
||||||
|
children: map[string]*item{
|
||||||
|
itm.name: &itm,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
rnd.Read(itm.fileContent)
|
||||||
|
return rootItm.writeItemToArchiver(tw)
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTarArchiveWithOneFile(t *testing.T) []byte {
|
||||||
|
writer := bytes.NewBuffer(make([]byte, 0))
|
||||||
|
err := createTarArchiveWithOneFileHelper(writer)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return writer.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func doubleCloseCaseHelper(t *testing.T, readerTypeChecker func(io.Reader) bool, errorText string) {
|
||||||
|
bigFileSize := 0x50
|
||||||
|
content, archData := createTarArchive(t, 5, 5, 10, 70, bigFileSize)
|
||||||
|
|
||||||
|
reader := bytes.NewReader(archData)
|
||||||
|
|
||||||
|
gzf, err := gzip.NewReader(reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tarReader := tar.NewReader(gzf)
|
||||||
|
tarCache := NewTarCache(tarReader, 10, int64(bigFileSize))
|
||||||
|
|
||||||
|
var m sync.Map
|
||||||
|
content.fillMap(&m)
|
||||||
|
|
||||||
|
errCh := make(chan error, 100)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var cnt int32
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
break
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
contentReader, header, err := tarCache.NextPayload()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func(reader io.ReadCloser, head FileHeader) {
|
||||||
|
defer wg.Done()
|
||||||
|
defer func() {
|
||||||
|
if err := reader.Close(); err != nil {
|
||||||
|
errCh <- err
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
val, ok := m.Load(header.FilePath)
|
||||||
|
if !ok {
|
||||||
|
errCh <- errors.New(header.FilePath + " not found")
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
expected := val.([]byte)
|
||||||
|
content, err := io.ReadAll(reader)
|
||||||
|
if err != nil {
|
||||||
|
errCh <- err
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !bytes.Equal(expected, content) {
|
||||||
|
errCh <- errors.New(header.FilePath + " content mismatch")
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if atomic.AddInt32(&cnt, 1) >= 100 {
|
||||||
|
if readerTypeChecker(reader) {
|
||||||
|
if err := reader.Close(); err != nil {
|
||||||
|
errCh <- err
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(contentReader, header)
|
||||||
|
}
|
||||||
|
err = tarCache.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
wg.Wait()
|
||||||
|
close(errCh)
|
||||||
|
|
||||||
|
for e := range errCh {
|
||||||
|
if err == nil {
|
||||||
|
err = e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.EqualError(t, err, errorText)
|
||||||
|
}
|
||||||
|
|
||||||
|
func readErrorCase(t *testing.T) {
|
||||||
|
archData := createTarArchiveWithOneFile(t)
|
||||||
|
|
||||||
|
reader := newReaderWithError(bytes.NewReader(archData), 0x800)
|
||||||
|
|
||||||
|
gzf, err := gzip.NewReader(reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tarReader := tar.NewReader(gzf)
|
||||||
|
tarCache := NewTarCache(tarReader, 10, math.MaxInt)
|
||||||
|
|
||||||
|
errCh := make(chan error, 100)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
_, _, err = tarCache.NextPayload()
|
||||||
|
|
||||||
|
require.NotErrorIs(t, err, io.EOF)
|
||||||
|
require.Error(t, err, readError.Error())
|
||||||
|
|
||||||
|
err = tarCache.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
wg.Wait()
|
||||||
|
close(errCh)
|
||||||
|
|
||||||
|
for e := range errCh {
|
||||||
|
if err == nil {
|
||||||
|
err = e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func successfulCase(t *testing.T) {
|
||||||
|
bigFileSize := 0x200
|
||||||
|
content, archData := createTarArchive(t, 5, 5, 30, 70, bigFileSize)
|
||||||
|
|
||||||
|
reader := bytes.NewReader(archData)
|
||||||
|
gzf, err := gzip.NewReader(reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tarReader := tar.NewReader(gzf)
|
||||||
|
tarCache := NewTarCache(tarReader, 10, int64(bigFileSize))
|
||||||
|
|
||||||
|
var m sync.Map
|
||||||
|
content.fillMap(&m)
|
||||||
|
|
||||||
|
errCh := make(chan error, 100)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
break
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
contentReader, header, err := tarCache.NextPayload()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func(reader io.ReadCloser, head FileHeader) {
|
||||||
|
defer wg.Done()
|
||||||
|
defer func() {
|
||||||
|
if err := reader.Close(); err != nil {
|
||||||
|
errCh <- err
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
val, ok := m.Load(header.FilePath)
|
||||||
|
if !ok {
|
||||||
|
errCh <- errors.New(header.FilePath + " not found")
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
expected := val.([]byte)
|
||||||
|
content, err := io.ReadAll(reader)
|
||||||
|
if err != nil {
|
||||||
|
errCh <- err
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !bytes.Equal(expected, content) {
|
||||||
|
errCh <- errors.New(header.FilePath + " content mismatch")
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.Delete(header.FilePath)
|
||||||
|
|
||||||
|
}(contentReader, header)
|
||||||
|
}
|
||||||
|
err = tarCache.Close()
|
||||||
|
wg.Wait()
|
||||||
|
close(errCh)
|
||||||
|
var err2 error
|
||||||
|
for e := range errCh {
|
||||||
|
if err2 == nil {
|
||||||
|
err2 = e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.NoError(t, err2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Checking for dictionary emptiness
|
||||||
|
var l int
|
||||||
|
m.Range(func(k, v interface{}) bool {
|
||||||
|
l++
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
require.Equal(t, 0, l)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTarCache(t *testing.T) {
|
||||||
|
t.Run("Successful case", func(t *testing.T) {
|
||||||
|
successfulCase(t)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Double close of the byte reader", func(t *testing.T) {
|
||||||
|
doubleCloseCaseHelper(t, func(reader io.Reader) bool {
|
||||||
|
e, ok := reader.(*helper)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
_, ok = e.Reader.(*bytes.Reader)
|
||||||
|
return ok
|
||||||
|
}, bytesReaderAlreadyCloseError.Error())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Double close of the stream reader", func(t *testing.T) {
|
||||||
|
doubleCloseCaseHelper(t, func(reader io.Reader) bool {
|
||||||
|
e, ok := reader.(*helper)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
_, ok = e.Reader.(*bytes.Reader)
|
||||||
|
return !ok
|
||||||
|
}, streamReaderAlreadyCloseError.Error())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Read error", func(t *testing.T) {
|
||||||
|
readErrorCase(t)
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue