[#8] Add extract command
Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
Parent: 12b572e62a · Commit: 86592ef735
12 changed files with 2776 additions and 0 deletions
@@ -23,6 +23,7 @@ import (
 	_ "github.com/rclone/rclone/cmd/dedupe"
 	_ "github.com/rclone/rclone/cmd/delete"
 	_ "github.com/rclone/rclone/cmd/deletefile"
+	_ "github.com/rclone/rclone/cmd/extract"
 	_ "github.com/rclone/rclone/cmd/genautocomplete"
 	_ "github.com/rclone/rclone/cmd/gendocs"
 	_ "github.com/rclone/rclone/cmd/gitannex"
@@ -97,6 +97,7 @@ for more info.
	Run: func(command *cobra.Command, args []string) {
		cmd.CheckArgs(2, 2, command, args)
		fsrc, srcFileName, fdst := cmd.NewFsSrcFileDst(args)
		cmd.Run(true, true, command, func() error {
			if srcFileName == "" {
cmd/extract/extract.go (new file, 62 lines)
@@ -0,0 +1,62 @@
// Package extract provides the extract command.
package extract

import (
	"context"
	"errors"
	"strings"

	"github.com/rclone/rclone/cmd"
	"github.com/rclone/rclone/fs/config/flags"
	"github.com/rclone/rclone/fs/extract"
	"github.com/spf13/cobra"
)

var (
	maxFileSizeForCache = 0x100000 // 1 MiB
)

func init() {
	cmd.Root.AddCommand(commandDefinition)
	cmdFlags := commandDefinition.Flags()
	flags.IntVarP(cmdFlags,
		&maxFileSizeForCache,
		"max-file-size-for-cache",
		"",
		maxFileSizeForCache,
		`Maximum file size that can be placed in the cache. Larger files
will be read without caching, which prevents parallelization of copy
operations and may result in longer archive unpacking times.`,
		"")
}

var commandDefinition = &cobra.Command{
	Use:   "extract source:path/to/archive.tar.gz dest:path",
	Short: `Extract a tar.gz archive from source to dest, skipping identical files.`,
	// Note: "|" will be replaced by backticks below
	Long: strings.ReplaceAll(`Extract the tar.gz archive files from the source to the destination.
Does not transfer files that are identical on source and destination,
testing by size and modification time or MD5SUM. Doesn't delete files
from the destination.

If dest:path doesn't exist, it is created and the contents of the
archive source:path/to/archive.tar.gz go there.

**Note**: Use the |-P|/|--progress| flag to view real-time transfer statistics.

**Note**: Use the |--dry-run| or the |--interactive|/|-i| flag to test without copying anything.
`, "|", "`"),
	Annotations: map[string]string{
		"groups": "Important",
	},
	Run: func(command *cobra.Command, args []string) {
		cmd.CheckArgs(2, 2, command, args)
		fsrc, srcFileName, fdst := cmd.NewFsSrcFileDst(args)
		cmd.Run(true, true, command, func() error {
			if srcFileName == "" {
				return errors.New("the source is required to be a file")
			}
			return extract.Extract(context.Background(), fdst, fsrc, srcFileName, maxFileSizeForCache)
		})
	},
}
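With the command registered as above, a typical invocation looks like this (remote and path names are illustrative). The flag value is in bytes; files at or above it are streamed straight from the archive instead of being cached, which serializes those copies:

	rclone extract remote:backups/site.tar.gz remote:restored/site -P --max-file-size-for-cache 1048576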
fs/extract/dirtreeholder.go (new file, 280 lines)
@@ -0,0 +1,280 @@
package extract

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"

	"github.com/rclone/rclone/fs"
)

// Object represents a file or directory.
type Object struct {
	isDir bool
	data  any
}

// Dir represents a directory with objects.
type Dir struct {
	name string
	objs map[string]Object
	dirs map[string]*Dir
}

// NewDir creates a new directory.
func NewDir(name string) *Dir {
	return &Dir{
		name: name,
		objs: make(map[string]Object),
		dirs: make(map[string]*Dir),
	}
}

// Name returns the name of the directory.
func (d *Dir) Name() string {
	return d.name
}

// NewObject creates a new Object.
func NewObject(isDir bool, data any) Object {
	return Object{
		isDir: isDir,
		data:  data,
	}
}

// GetDirByName returns the subdirectory with the given name.
func (d *Dir) GetDirByName(name string) *Dir {
	if d.name == name {
		return d
	}
	if len(name) <= len(d.name) || !strings.HasPrefix(name, d.name) {
		return nil
	}
	if d.name != "" && name[len(d.name)] != '/' {
		return nil
	}
	idx := strings.Index(name[len(d.name)+1:], "/")
	nextDir := name
	if idx != -1 {
		nextDir = name[:idx+len(d.name)+1]
	}
	if td, ok := d.dirs[nextDir]; ok {
		return td.GetDirByName(name)
	}
	return nil
}
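// exampleGetDirByName is an illustrative sketch, not part of the original
// commit. Dir keys its children by full path from the root rather than by
// base name, so GetDirByName resolves a path by extending the prefix one
// segment per recursion step: "" -> "a" -> "a/b".
func exampleGetDirByName() *Dir {
	root := NewDir("")
	child := NewDir("a")
	grandchild := NewDir("a/b")
	child.AddChildDir(grandchild)
	root.AddChildDir(child)
	return root.GetDirByName("a/b") // returns grandchild
}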
// AddChildDir adds a child directory.
func (d *Dir) AddChildDir(child *Dir) {
	d.dirs[child.name] = child
}

// AddChildObject adds information about the child object.
func (d *Dir) AddChildObject(name string, child Object) {
	d.objs[name] = child
}

type listDirFn func(dir string) (*Dir, error)

type dirTreeHolder struct {
	listF listDirFn

	mu sync.Mutex
	// consider changing the variable type to fs.DirTree
	root       *Dir
	dirsInWork map[string]*sync.WaitGroup
}

type strDirIter struct {
	dir string
	idx int
}

func newStrDirIter(dir string) *strDirIter {
	return &strDirIter{
		dir: dir,
		idx: -1,
	}
}

func (s *strDirIter) next() (cur string, next string) {
	if s.dir == "" {
		return "", ""
	}
	if s.idx == -1 {
		s.idx = 0
		idx := strings.Index(s.dir, "/")
		if idx == -1 {
			return "", s.dir
		}
		return "", s.dir[:idx]
	}
	idx := strings.Index(s.dir[s.idx:], "/")
	if idx == -1 {
		return s.dir, ""
	}
	defer func(idx int) {
		s.idx = s.idx + idx + 1
	}(idx)

	idx2 := strings.Index(s.dir[s.idx+idx+1:], "/")
	if idx2 == -1 {
		return s.dir[:idx+s.idx], s.dir
	}
	return s.dir[:idx+s.idx], s.dir[:idx+idx2+s.idx+1]
}
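// exampleStrDirIter is an illustrative sketch, not part of the original
// commit. Each call to next returns the current prefix and the next one,
// one path segment deeper; iteration ends when next is empty. This mirrors
// the table-driven cases in testStrDirIter below.
func exampleStrDirIter() {
	it := newStrDirIter("path/with/more")
	for {
		cur, next := it.next()
		fmt.Printf("%q -> %q\n", cur, next)
		// Prints: "" -> "path", "path" -> "path/with",
		// "path/with" -> "path/with/more", "path/with/more" -> ""
		if next == "" {
			return
		}
	}
}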
func (d *Dir) getObjectByPath(path string) (Object, bool) {
	var dirName string
	idx := strings.LastIndex(path, "/")
	if idx != -1 {
		dirName = path[:idx]
	}
	dir := d.GetDirByName(dirName)
	if dir == nil {
		return Object{}, false
	}
	obj, ok := dir.objs[path]
	return obj, ok
}

func newDirStructHolder(listF listDirFn) *dirTreeHolder {
	return &dirTreeHolder{
		listF:      listF,
		root:       nil,
		dirsInWork: make(map[string]*sync.WaitGroup),
	}
}

func (l *dirTreeHolder) getObjectByPath(path string) (Object, bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.root == nil {
		return Object{}, false
	}
	return l.root.getObjectByPath(path)
}

func (l *dirTreeHolder) isDirExist(path string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.root == nil {
		return false
	}
	return l.root.GetDirByName(path) != nil
}

func (l *dirTreeHolder) updateDirList(ctx context.Context, dir string) error {
	l.mu.Lock()
	iter := newStrDirIter(dir)
	aborting := func() bool {
		select {
		case <-ctx.Done():
			return true
		default:
			return false
		}
	}
	for {
		if aborting() {
			l.mu.Unlock()
			return ctx.Err()
		}
		curDir, nextDir := iter.next()
		if nextDir == "" {
			l.mu.Unlock()
			return nil
		}
		if wg, ok := l.dirsInWork[curDir]; ok {
			l.mu.Unlock()
			wg.Wait()
			if aborting() {
				return ctx.Err()
			}
			l.mu.Lock()
		}
		if curDir == "" && l.root == nil {
			if wg, ok := l.dirsInWork[curDir]; ok {
				l.mu.Unlock()
				wg.Wait()
				if aborting() {
					return ctx.Err()
				}
				l.mu.Lock()
				dir := l.root.GetDirByName(curDir)
				if dir == nil {
					l.mu.Unlock()
					return fmt.Errorf("error while updating info about dir %s", nextDir)
				}
			} else {
				wg := &sync.WaitGroup{}
				wg.Add(1)
				l.dirsInWork[curDir] = wg
				l.mu.Unlock()
				d, err := l.listF(curDir)
				if err != nil {
					if !errors.Is(err, fs.ErrorDirNotFound) {
						return err
					}
					d = NewDir(curDir)
				}
				l.mu.Lock()
				l.root = d
				wg = l.dirsInWork[curDir]
				delete(l.dirsInWork, curDir)
				wg.Done()
			}
		}
		d := l.root.GetDirByName(curDir)
		if d == nil {
			l.mu.Unlock()
			return errors.New("not possible to go there")
		}
		if _, ok := d.dirs[nextDir]; ok {
			continue
		}
		if o, ok := d.objs[nextDir]; ok {
			// The entry is a file, so there is no such directory
			if !o.isDir {
				l.mu.Unlock()
				return nil
			}

			if wg, ok := l.dirsInWork[nextDir]; ok {
				l.mu.Unlock()
				wg.Wait()
				if aborting() {
					return ctx.Err()
				}
				l.mu.Lock()
				dir := d.GetDirByName(nextDir)
				if dir == nil {
					l.mu.Unlock()
					return fmt.Errorf("error while updating info about dir %s", nextDir)
				}
			} else {
				wg := &sync.WaitGroup{}
				wg.Add(1)
				l.dirsInWork[nextDir] = wg
				l.mu.Unlock()
				td, err := l.listF(nextDir)
				if err != nil {
					return err
				}
				l.mu.Lock()
				d.dirs[nextDir] = td
				wg = l.dirsInWork[nextDir]
				delete(l.dirsInWork, nextDir)
				wg.Done()
			}
		} else {
			l.mu.Unlock()
			return nil
		}
	}
}
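The holder lists each ancestor directory at most once, even under concurrent callers: the dirsInWork WaitGroups make later callers wait for the first lister instead of re-listing. A minimal sketch of wiring it up (the stub listing function is illustrative; the real command passes makeListDir(ctx, fdst), defined in fs/extract/extract.go below):

	holder := newDirStructHolder(func(dir string) (*Dir, error) {
		// Stub: pretend every directory exists and is empty.
		return NewDir(dir), nil
	})
	if err := holder.updateDirList(context.Background(), "a/b/c"); err != nil {
		// handle the listing error
	}
	_, exists := holder.getObjectByPath("a/b/c/file.txt") // false with the stub
	_ = exists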
fs/extract/dirtreeholder_test.go (new file, 321 lines)
@@ -0,0 +1,321 @@
package extract

import (
	"context"
	"encoding/hex"
	"fmt"
	"math/rand"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"golang.org/x/sync/errgroup"
)

func dice(rnd *rand.Rand, percent int) bool {
	return rnd.Intn(100) < percent
}

func randomFileName(rnd *rand.Rand) string {
	b := make([]byte, 16)
	rnd.Read(b)
	return hex.EncodeToString(b)
}

func createDirTree(dir *Dir, deep, minItemsInDir, maxItemsInDir int) {
	if deep <= 0 {
		return
	}
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

	nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir

	for j := 0; j < nItem; j++ {
		isFile := deep == 1 || dice(rnd, 50)
		var prefix string
		if len(dir.name) > 0 {
			prefix = dir.name + "/"
		}
		if isFile {
			name := fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd))
			newItem := NewObject(false, name)
			dir.objs[name] = newItem
		} else {
			name := fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd))
			newItem := NewObject(true, name)
			dir.objs[name] = newItem
			childDir := NewDir(name)
			createDirTree(childDir, deep-1, minItemsInDir, maxItemsInDir)
			if len(childDir.dirs) != 0 || len(childDir.objs) != 0 {
				dir.dirs[childDir.name] = childDir
			}
		}
	}
}

type listFnHelper struct {
	mu   sync.Mutex
	cnt  atomic.Uint64
	root *Dir
}

func (l *listFnHelper) Cnt() uint64 {
	return l.cnt.Load()
}

func (l *listFnHelper) ResetCnt() {
	l.cnt.Store(0)
}

func (l *listFnHelper) listDir(dir string) (*Dir, error) {
	l.cnt.Add(1)
	l.mu.Lock()
	defer l.mu.Unlock()
	d := l.root.GetDirByName(dir)
	if d == nil {
		return nil, os.ErrNotExist
	}
	newDir := NewDir(d.name)
	for path, child := range d.objs {
		newDir.AddChildObject(path, child)
	}
	return newDir, nil
}

func fillSliceWithDirsInfo(s *[]string, dir *Dir) {
	for path, child := range dir.objs {
		if child.isDir {
			*s = append(*s, path)
		}
	}
	for _, child := range dir.dirs {
		fillSliceWithDirsInfo(s, child)
	}
}

func fillMapWithFilesInfo(m map[string][]string, dir *Dir) {
	files := make([]string, 0)
	for path, child := range dir.objs {
		if !child.isDir {
			files = append(files, filepath.Base(path))
		}
	}

	m[dir.Name()] = files

	for _, child := range dir.dirs {
		fillMapWithFilesInfo(m, child)
	}
}

func TestListing(t *testing.T) {
	t.Run("Dir struct holder", testDirTreeHolder)

	t.Run("Dir", testDir)

	t.Run("String directory iterating", testStrDirIter)
}

func testDirTreeHolder(t *testing.T) {
	root := NewDir("")
	createDirTree(root, 6, 5, 10)
	listF := &listFnHelper{
		root: root,
	}
	t.Run("Concurrent listing", func(t *testing.T) {
		s := make([]string, 0)
		m := make(map[string][]string)
		fillSliceWithDirsInfo(&s, root)
		fillMapWithFilesInfo(m, root)
		sort.Slice(s, func(i, j int) bool {
			return len(s[i]) < len(s[j])
		})
		require.NotNil(t, root)
		holder := newDirStructHolder(listF.listDir)
		halfLen := len(s) / 2
		path := s[halfLen]
		expectedCnt := 0
		dIter := newStrDirIter(path)
		for {
			expectedCnt++
			_, next := dIter.next()
			if next == "" {
				break
			}
		}
		require.NotNil(t, holder)

		eg, ctx := errgroup.WithContext(context.Background())
		eg.Go(func() error {
			return holder.updateDirList(ctx, path)
		})
		eg.Go(func() error {
			return holder.updateDirList(ctx, path)
		})
		eg.Go(func() error {
			return holder.updateDirList(ctx, path)
		})
		eg.Go(func() error {
			return holder.updateDirList(ctx, path+"not.exists")
		})
		err := eg.Wait()
		require.NoError(t, err)
		require.Equal(t, uint64(expectedCnt), listF.Cnt())
		dIter = newStrDirIter(path)
		var cur, next string
		next = "1"
		for next != "" {
			cur, next = dIter.next()
			if next == "" {
				break
			}
			require.True(t, holder.isDirExist(cur))
			files, ok := m[cur]
			require.True(t, ok)
			for _, file := range files {
				var filePath string
				if cur == "" {
					filePath = file
				} else {
					filePath = cur + "/" + file
				}
				var obj Object
				obj, ok = holder.getObjectByPath(filePath)
				require.True(t, ok)
				require.Equal(t, filePath, obj.data.(string))
			}
		}
	})
}

func testDir(t *testing.T) {
	finalDir := "path/with/more/than/one/dir"
	files := map[string][]string{
		"":                            {"file0.1.ext", "file0.2.ext"},
		"path":                        {"file1.1.ext", "file1.2.ext"},
		"path/with/more/than/one":     {"file2.1.ext", "file2.2.ext"},
		"path/with/more/than/one/dir": {"file3.1.ext", "file3.2.ext"},
	}

	dirIter := newStrDirIter(finalDir)
	var root, prevDir *Dir
	next := "1"
	for next != "" {
		var cur string
		cur, next = dirIter.next()
		dir := NewDir(cur)
		if root == nil {
			root = dir
		}
		if files, ok := files[cur]; ok {
			for _, file := range files {
				obj := NewObject(false, struct{}{})
				if cur == "" {
					dir.AddChildObject(file, obj)
				} else {
					dir.AddChildObject(cur+"/"+file, obj)
				}
			}
		}
		if prevDir != nil {
			prevDir.AddChildDir(dir)
			prevDir.AddChildObject(dir.Name(), NewObject(true, struct{}{}))
		}
		prevDir = dir
	}

	t.Run("GetDirByName", func(t *testing.T) {
		dirIter = newStrDirIter(finalDir)
		next = "1"
		cnt := 0
		for next != "" {
			var cur string
			cur, next = dirIter.next()
			dir := root.GetDirByName(cur)
			require.NotNil(t, dir)
			cnt++
		}
		require.Equal(t, 7, cnt)
	})

	t.Run("getObjectByPath", func(t *testing.T) {
		for dirName, fileNames := range files {
			for _, fileName := range fileNames {
				var filePath string
				if dirName == "" {
					filePath = fileName
				} else {
					filePath = dirName + "/" + fileName
				}
				obj, ok := root.getObjectByPath(filePath)
				require.True(t, ok)
				require.False(t, obj.isDir)
				filePath += ".fail"
				_, ok = root.getObjectByPath(filePath)
				require.False(t, ok)
			}
		}
	})
}

func testStrDirIter(t *testing.T) {
	for _, tc := range []struct {
		name string
		dir  string
		res  []string
	}{
		{
			name: "path with more than one dir",
			dir:  "path/with/more/than/one/dir",
			res: []string{
				"",
				"path",
				"path/with",
				"path/with/more",
				"path/with/more/than",
				"path/with/more/than/one",
				"path/with/more/than/one/dir",
				"",
			},
		},
		{
			name: "path with one dir",
			dir:  "one_dir",
			res: []string{
				"",
				"one_dir",
				"",
			},
		},
		{
			name: "empty path",
			dir:  "",
			res: []string{
				"",
				"",
			},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			l := newStrDirIter(tc.dir)
			for i, p := range tc.res {
				if p == "" && i > 0 {
					break
				}
				cur, next := l.next()
				require.Equal(t, p, cur)
				require.Equal(t, tc.res[i+1], next)
			}
		})
	}
}
fs/extract/extract.go (new file, 780 lines)
@@ -0,0 +1,780 @@
// Package extract is the implementation of the extract command.
package extract

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/extract/tarcache"
	"github.com/rclone/rclone/fs/filter"
	"github.com/rclone/rclone/fs/hash"
	"github.com/rclone/rclone/fs/list"
	"github.com/rclone/rclone/fs/operations"
)

type payloadType int

const (
	payloadTypeBytes payloadType = iota
	payloadTypeIo
)

// TarCacheFile is a single archive entry handed out by TarCache.
type TarCacheFile struct {
	owner    *TarCache
	pt       payloadType
	fileInfo tarcache.FileInfo
	reader   io.Reader
}

// Release frees the resources held by the file.
func (t *TarCacheFile) Release() {
	t.fileInfo.ReleaseFn()
}

// Read reads from the file's payload.
func (t *TarCacheFile) Read(p []byte) (n int, err error) {
	return t.reader.Read(p)
}

func (t *TarCacheFile) reopenIo(ctx context.Context) (err error) {
	return t.owner.requestReopenFile(ctx, t)
}

func (t *TarCacheFile) reopenBytes(ctx context.Context) (err error) {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		t.reader = bytes.NewReader(t.fileInfo.Payload)
		return nil
	}
}

// Reopen resets the file for another read from the beginning.
func (t *TarCacheFile) Reopen(ctx context.Context) (err error) {
	if t.pt == payloadTypeBytes {
		return t.reopenBytes(ctx)
	}
	return t.reopenIo(ctx)
}

func (t *TarCache) requestReopenFile(ctx context.Context, file *TarCacheFile) error {
	reopenReq := reopenRequest{
		file:   file,
		respCh: make(chan error, 1),
	}

	select {
	case t.reopenReqCh <- reopenReq:
	case <-ctx.Done():
		return ctx.Err()
	case <-t.finishCh:
		return errTarCacheIsClosed
	}

	select {
	case err := <-reopenReq.respCh:
		return err
	case <-ctx.Done():
		return ctx.Err()
	case <-t.finishCh:
		return errTarCacheIsClosed
	}
}
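// exampleReopenHandshake is an illustrative sketch, not part of the original
// commit. Reopening an uncached file means rewinding a forward-only tar
// stream, so requestReopenFile ships the request to the goroutine that owns
// the stream and blocks on a per-request reply channel. The same pattern in
// isolation:
func exampleReopenHandshake() {
	type request struct {
		respCh chan error // capacity 1, so the owner never blocks on reply
	}
	reqCh := make(chan request)
	go func() { // stream owner: serializes access to the archive
		for req := range reqCh {
			// ... rewind/reopen the underlying stream here ...
			req.respCh <- nil // report success or the reopen error
		}
	}()
	req := request{respCh: make(chan error, 1)}
	reqCh <- req
	<-req.respCh
}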
func (t *TarCache) newTarCacheFile(info tarcache.FileInfo) *TarCacheFile {
	res := &TarCacheFile{
		owner:    t,
		pt:       payloadTypeIo,
		fileInfo: info,
		reader:   info.IoPayload,
	}
	if info.IoPayload == nil {
		res.pt = payloadTypeBytes
		res.reader = bytes.NewReader(info.Payload)
	}
	return res
}

// TarCache wraps tarcache.Cache and keeps track of the files handed out.
type TarCache struct {
	// parameters
	fsrc                fs.Fs
	srcFileName         string
	newTarReaderFn      newTarReader
	transfers           int
	maxFileSizeForCache int
	options             []fs.OpenOption

	// non-thread-safe internal state
	closeStreamFn     func() error
	tarCache          *tarcache.Cache
	workerCh          chan func()
	reopenReqCh       chan reopenRequest
	finishCh          chan struct{}
	pendingCloseCache []*tarcache.Cache

	// thread-safe internal state
	mu    sync.Mutex
	cache map[string]tarcache.FileInfo
}

func newTarCache(fsrc fs.Fs, srcFileName string, newTarReaderFn newTarReader, transfers int, maxFileSizeForCache int) *TarCache {
	res := &TarCache{
		fsrc:                fsrc,
		srcFileName:         srcFileName,
		newTarReaderFn:      newTarReaderFn,
		transfers:           transfers,
		maxFileSizeForCache: maxFileSizeForCache,
		workerCh:            make(chan func()),
		reopenReqCh:         make(chan reopenRequest),
		finishCh:            make(chan struct{}),
		cache:               make(map[string]tarcache.FileInfo),
	}

	if res.newTarReaderFn == nil {
		res.newTarReaderFn = func(reader io.Reader) tarcache.TarReader {
			return tar.NewReader(reader)
		}
	}

	go func() {
		for workFn := range res.workerCh {
			workFn()
		}
	}()
	return res
}

// Close shuts the cache down and closes the underlying stream.
func (t *TarCache) Close() error {
	close(t.workerCh)
	err := t.closeHelper()
	if t.tarCache != nil {
		_ = t.tarCache.Close()
	}
	return err
}

type reopenRequest struct {
	file   *TarCacheFile
	respCh chan error
}

var errReopenArchiveRequest = errors.New("reopen archive request")
var errTarCacheIsClosed = errors.New("tar cache is closed")

func (t *TarCache) handleReopenRequest(ctx context.Context, reopenReq reopenRequest) {
	defer close(reopenReq.respCh)
	t.pendingCloseCache = append(t.pendingCloseCache, t.tarCache)
	_ = t.closeHelper()
	reopenReq.file.Release()
	var err error
	prev := t.tarCache
	_ = prev.Close()
	t.tarCache, t.closeStreamFn, err = t.openHelper(ctx)
	if err != nil {
		reopenReq.respCh <- err
		return
	}
	err = t.FindFile(ctx, reopenReq.file.fileInfo.FileInfo.FilePath)
	if err == nil {
		file := t.GetFile(reopenReq.file.fileInfo.FileInfo.FilePath)
		if file == nil {
			panic("shouldn't get here")
		}
		*reopenReq.file = *file
	}
	reopenReq.respCh <- err
}

// FindFile scans the archive forward until it reaches filePath.
func (t *TarCache) FindFile(ctx context.Context, filePath string) error {
	for {
		select {
		case <-t.finishCh:
			return errTarCacheIsClosed
		default:
		}

		h, err := t.tarCache.Next(ctx)
		if err != nil {
			return err
		}
		if h.FileInfo.FilePath != filePath {
			h.ReleaseFn()
			continue
		}

		t.mu.Lock()
		t.cache[h.FileInfo.FilePath] = h
		t.mu.Unlock()
		return nil
	}
}

// GetFile returns the cached file for filePath, or nil if it is unknown.
func (t *TarCache) GetFile(filePath string) *TarCacheFile {
	t.mu.Lock()
	defer t.mu.Unlock()
	info, ok := t.cache[filePath]
	if !ok {
		return nil
	}
	return t.newTarCacheFile(info)
}

// ReleaseUnprocessedFiles releases every file still held in the cache.
func (t *TarCache) ReleaseUnprocessedFiles() {
	t.mu.Lock()
	// Here we are preparing a list of functions that release resources, because we cannot call them when
	// iterating over the map, since the ReleaseFn method also uses the same mutex that protects the map.
	releaseFns := make([]func(), 0, len(t.cache))
	for _, info := range t.cache {
		releaseFns = append(releaseFns, info.ReleaseFn)
	}
	t.cache = make(map[string]tarcache.FileInfo)
	t.mu.Unlock()
	for _, fn := range releaseFns {
		fn()
	}
}

// Next advances to the next file in the archive and returns its path.
func (t *TarCache) Next(ctx context.Context) (string, error) {
	select {
	case <-t.finishCh:
		return "", errTarCacheIsClosed
	default:
	}
	for {
		watchdog := make(chan struct{})
		var reopenReq reopenRequest
		ctx2, cancel := context.WithCancelCause(ctx)
		var wg sync.WaitGroup
		wg.Add(1)
		tmp := atomic.Bool{}
		t.workerCh <- func() {
			defer wg.Done()
			select {
			case <-watchdog:
			case reopenReq = <-t.reopenReqCh:
				tmp.Store(true)
				cancel(errReopenArchiveRequest)
			}
		}
		h, err := t.tarCache.Next(ctx2)
		relFn := h.ReleaseFn
		h.ReleaseFn = func() {
			t.mu.Lock()
			delete(t.cache, h.FileInfo.FilePath)
			t.mu.Unlock()
			relFn()
		}
		close(watchdog)
		wg.Wait()
		for _, cache := range t.pendingCloseCache {
			_ = cache.Close()
		}
		t.pendingCloseCache = nil
		if err != nil {
			if errors.Is(context.Cause(ctx2), errReopenArchiveRequest) {
				t.handleReopenRequest(ctx, reopenReq)
				continue
			}
			cancel(nil)
			close(t.finishCh)
			if err == io.EOF {
				return "", nil
			}

			return "", err
		}

		cancel(nil)

		t.mu.Lock()
		defer t.mu.Unlock()
		_, ok := t.cache[h.FileInfo.FilePath]
		if ok {
			panic("shouldn't get here")
		}
		t.cache[h.FileInfo.FilePath] = h

		return h.FileInfo.FilePath, nil
	}
}

func (t *TarCache) closeHelper() error {
	if t.closeStreamFn != nil {
		fn := t.closeStreamFn
		t.closeStreamFn = nil
		return fn()
	}
	return nil
}

func (t *TarCache) openHelper(ctx context.Context) (cache *tarcache.Cache, closeFn func() error, err error) {
	obj, err := t.fsrc.NewObject(ctx, t.srcFileName)
	if err != nil {
		return nil, nil, err
	}

	reader, err := obj.Open(ctx, t.options...)
	if err != nil {
		return nil, nil, err
	}

	gzr, err := gzip.NewReader(reader)
	if err != nil {
		// we don't care about the error, we just need to close the reader
		_ = reader.Close()
		return nil, nil, err
	}

	tarReader := t.newTarReaderFn(gzr)
	cache = tarcache.NewTarCache(tarReader, t.transfers, int64(t.maxFileSizeForCache))
	return cache, func() error {
		gzErr := gzr.Close()
		readerErr := reader.Close()
		if gzErr != nil {
			return gzErr
		}
		return readerErr
	}, nil
}
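// exampleReaderStack is an illustrative sketch, not part of the original
// commit. openHelper assembles the read path in three layers: the remote
// object's stream, a gzip decompressor over it, and a tar reader over that.
// Given any io.Reader producing a .tar.gz stream, the same stack looks like:
func exampleReaderStack(src io.Reader) error {
	gzr, err := gzip.NewReader(src) // decompress the gzip layer
	if err != nil {
		return err
	}
	defer func() { _ = gzr.Close() }()
	tr := tar.NewReader(gzr) // iterate tar entries over the decompressed bytes
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			return nil // end of archive
		}
		if err != nil {
			return err
		}
		fmt.Println(hdr.Name, hdr.Size)
	}
}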
// Open opens the archive stream.
func (t *TarCache) Open(ctx context.Context, options ...fs.OpenOption) error {
	t.options = options
	var err error
	t.tarCache, t.closeStreamFn, err = t.openHelper(ctx)
	return err
}

type copyJob struct {
	src  fs.Object
	dst  fs.Object
	path string
}

func makeListDir(ctx context.Context, f fs.Fs) listDirFn {
	return func(dir string) (*Dir, error) {
		dirCtx := filter.SetUseFilter(ctx, f.Features().FilterAware) // make filter-aware backends constrain List
		entities, err := list.DirSorted(dirCtx, f, false, dir)
		if err != nil {
			return nil, err
		}
		res := NewDir(dir)
		for _, entry := range entities {
			if obj, ok := entry.(fs.Object); ok {
				o := NewObject(false, obj)
				res.AddChildObject(obj.Remote(), o)
			}
			if d, ok := entry.(fs.Directory); ok {
				o := NewObject(true, nil)
				res.AddChildObject(d.Remote(), o)
			}
		}
		return res, nil
	}
}
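// exampleHolderWiring is an illustrative sketch, not part of the original
// commit. makeListDir adapts any fs.Fs to the listDirFn the directory-tree
// holder expects, keeping the holder independent of rclone listing details;
// run() below wires it up exactly like this.
func exampleHolderWiring(ctx context.Context, fdst fs.Fs) *dirTreeHolder {
	return newDirStructHolder(makeListDir(ctx, fdst))
}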
type extractObj struct {
	ownerFs     fs.Fs
	isFirstOpen atomic.Bool
	file        *TarCacheFile
}

func (e *extractFs) newExtractObj(file *TarCacheFile) *extractObj {
	res := &extractObj{
		ownerFs: e,
		file:    file,
	}
	res.isFirstOpen.Store(true)
	return res
}

func (o *extractObj) Release() {
	o.file.Release()
}

type extractFs struct {
	// parameters
	fsrc        fs.Fs
	srcFileName string
	features    *fs.Features
	// internal state
	tarCache *TarCache
}

type extract struct {
	fdst                fs.Fs
	fsrc                fs.Fs
	extrFs              *extractFs
	srcFileName         string
	maxFileSizeForCache int
	hashType            hash.Type        // common hash to use
	hashOption          *fs.HashesOption // open option for the common hash
	// internal state
	ci                 *fs.ConfigInfo  // global config
	ctx                context.Context // internal context for controlling go-routines
	cancel             func()          // cancel the context
	maxDurationEndTime time.Time       // end time if --max-duration is set
	toBeChecked        chan string     // checkers channel
	checkersWg         sync.WaitGroup  // wait for checkers
	toBeUploaded       chan copyJob    // copiers channels
	transfersWg        sync.WaitGroup  // wait for transfers
	newTarReaderFn     newTarReader

	mu        sync.Mutex
	lastError error
	errCnt    atomic.Int32
	dsh       *dirTreeHolder
}

func (o *extractObj) Fs() fs.Info {
	return o.ownerFs
}

func (o *extractObj) String() string {
	if o == nil {
		return "<nil>"
	}
	return o.Remote()
}

func (o *extractObj) Remote() string {
	return o.file.fileInfo.FileInfo.FilePath
}

func (o *extractObj) ModTime(ctx context.Context) time.Time {
	return o.file.fileInfo.FileInfo.ModTime
}

func (o *extractObj) Size() int64 {
	return o.file.fileInfo.FileInfo.Size
}

func (o *extractObj) Hash(ctx context.Context, ty hash.Type) (string, error) {
	return "", nil
}

func (o *extractObj) Storable() bool {
	return true
}

func (o *extractObj) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
	if o.isFirstOpen.Load() {
		o.isFirstOpen.Store(false)

		return io.NopCloser(o.file), nil
	}
	err := o.file.Reopen(ctx)
	return io.NopCloser(o.file), err
}

func (o *extractObj) SetModTime(ctx context.Context, t time.Time) error {
	panic("not implemented")
}

func (o *extractObj) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
	panic("not implemented")
}

func (o *extractObj) Remove(ctx context.Context) error {
	panic("not implemented")
}

func (e *extractFs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
	panic("not implemented")
}

func (e *extractFs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	panic("not implemented")
}

func (e *extractFs) Mkdir(ctx context.Context, dir string) error {
	panic("not implemented")
}

func (e *extractFs) Rmdir(ctx context.Context, dir string) error {
	panic("not implemented")
}

func (e *extractFs) Name() string {
	return "extract"
}

func (e *extractFs) Root() string {
	return fmt.Sprintf("%s:%s/%s", e.fsrc.Name(), e.fsrc.Root(), e.srcFileName)
}

func (e *extractFs) String() string {
	return fmt.Sprintf("tar.gz archive %s at %s", e.Name(), e.Root())
}

func (e *extractFs) Precision() time.Duration {
	// TODO: change this
	return time.Second
}

func (e *extractFs) Hashes() hash.Set {
	return hash.Set(hash.None)
}

func (e *extractFs) Features() *fs.Features {
	return e.features
}

func (e *extractFs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
	if ctx.Err() != nil {
		return nil, ctx.Err()
	}

	file := e.tarCache.GetFile(remote)
	if file == nil {
		panic("cache MUST contain information about this file")
	}

	return e.newExtractObj(file), nil
}

type newTarReader func(io.Reader) tarcache.TarReader

func newExtract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string, maxFileSizeForCache int, newTarReaderFn newTarReader) *extract {
	ci := fs.GetConfig(ctx)

	s := &extract{
		ctx:                 ctx,
		ci:                  ci,
		fdst:                fdst,
		fsrc:                fsrc,
		srcFileName:         srcFileName,
		maxFileSizeForCache: maxFileSizeForCache,
		newTarReaderFn:      newTarReaderFn,
		toBeChecked:         make(chan string, ci.Transfers),
		toBeUploaded:        make(chan copyJob, ci.Transfers),
	}
	if ci.MaxDuration > 0 {
		s.maxDurationEndTime = time.Now().Add(ci.MaxDuration)
		fs.Infof(s.fdst, "Transfer session %v deadline: %s", ci.CutoffMode, s.maxDurationEndTime.Format("2006/01/02 15:04:05"))
	}

	// If a max session duration has been defined add a deadline
	// to the main context if cutoff mode is hard. This will cut
	// the transfers off.
	if !s.maxDurationEndTime.IsZero() && ci.CutoffMode == fs.CutoffModeHard {
		s.ctx, s.cancel = context.WithDeadline(ctx, s.maxDurationEndTime)
	} else {
		s.ctx, s.cancel = context.WithCancel(ctx)
	}
	return s
}

// transfer copies job.src to job.path in fdst.
func (e *extract) transfer(ctx context.Context, in <-chan copyJob) {
	defer e.transfersWg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case job, ok := <-in:
			if !ok {
				return
			}
			obj := job.src.(*extractObj)

			_, err := operations.Copy(ctx, e.fdst, job.dst, job.path, job.src)

			obj.Release()

			if err != nil {
				e.errCnt.Add(1)
				e.setLastError(err)
				e.cancel()
				return
			}
		}
	}
}

// runTransfers starts transfers
func (e *extract) runTransfers(ctx context.Context, in <-chan copyJob) {
	for i := 0; i < e.ci.Transfers; i++ {
		e.transfersWg.Add(1)
		go e.transfer(ctx, in)
	}
}

// stopTransfers stops all transfers and waits for them to finish
func (e *extract) stopTransfers() {
	close(e.toBeUploaded)
	fs.Debugf(e.fdst, "Waiting for transfers to finish")
	e.transfersWg.Wait()
}

func (e *extract) setLastError(err error) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.lastError == nil {
		e.lastError = err
	}
}

func (e *extract) getLastError() error {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.lastError
}

// checker checks whether filePath exists in fdst and, depending on the result, prepares the task for transfer
func (e *extract) checker(ctx context.Context, in <-chan string, out chan<- copyJob) {
	defer e.checkersWg.Done()
	for {
		var filePath string
		var ok bool

		select {
		case <-ctx.Done():
			return
		case filePath, ok = <-in:
			if !ok {
				return
			}
		}

		idx := strings.LastIndex(filePath, "/")
		var dir string
		if idx != -1 {
			dir = filePath[:idx]
		}
		err := e.dsh.updateDirList(ctx, dir)
		if err != nil {
			// TODO: think about retries
			e.cancel()
			return
		}
		src, err := e.extrFs.NewObject(ctx, filePath)
		if err != nil {
			e.setLastError(err)
			e.cancel()
			return
		}

		job := copyJob{
			src:  src,
			path: filePath,
		}
		if objInfo, ok := e.dsh.getObjectByPath(filePath); ok && !objInfo.isDir {
			if dst := objInfo.data.(fs.Object); dst != nil {
				job.dst = dst
			}
		}
		out <- job
	}
}

// runCheckers starts checkers
func (e *extract) runCheckers(ctx context.Context, in <-chan string, out chan<- copyJob) {
	for i := 0; i < e.ci.Transfers; i++ {
		e.checkersWg.Add(1)
		go e.checker(ctx, in, out)
	}
}

// stopCheckers stops all checkers and waits for them to finish
func (e *extract) stopCheckers() {
	close(e.toBeChecked)
	fs.Debugf(e.fdst, "Waiting for checkers to finish")
	e.checkersWg.Wait()
}

func newExtractFs(ctx context.Context, fsrc fs.Fs, srcFileName string, transfers int, maxFileSizeForCache int, newTarReaderFn newTarReader) *extractFs {
	res := &extractFs{
		fsrc:        fsrc,
		srcFileName: srcFileName,
		tarCache:    newTarCache(fsrc, srcFileName, newTarReaderFn, transfers, maxFileSizeForCache),
	}
	res.features = (&fs.Features{
		CanHaveEmptyDirectories: false,
	}).Fill(ctx, res)
	return res
}

// Open opens an archive from the source filesystem
func (e *extractFs) Open(ctx context.Context, options ...fs.OpenOption) error {
	return e.tarCache.Open(ctx, options...)
}

func (e *extractFs) Close() error {
	return e.tarCache.Close()
}

func (e *extractFs) AddNextObject(ctx context.Context) (string, error) {
	return e.tarCache.Next(ctx)
}

func (e *extract) run() error {
	fExtr := newExtractFs(e.ctx, e.fsrc, e.srcFileName, e.ci.Transfers, e.maxFileSizeForCache, e.newTarReaderFn)

	e.dsh = newDirStructHolder(makeListDir(e.ctx, e.fdst))
	e.extrFs = fExtr

	e.hashType, e.hashOption = operations.CommonHash(e.ctx, fExtr, e.fdst)

	// Options for the download
	downloadOptions := []fs.OpenOption{e.hashOption}
	for _, option := range e.ci.DownloadHeaders {
		downloadOptions = append(downloadOptions, option)
	}

	err := fExtr.Open(e.ctx, downloadOptions...)
	if err != nil {
		return err
	}

	e.toBeChecked = make(chan string, e.ci.Transfers)
	e.toBeUploaded = make(chan copyJob, e.ci.Transfers)

	e.runTransfers(e.ctx, e.toBeUploaded)
	e.runCheckers(e.ctx, e.toBeChecked, e.toBeUploaded)

	var exitErr error

	for {
		path, err := fExtr.AddNextObject(e.ctx)
		if err != nil {
			exitErr = err
			break
		}
		if path == "" {
			break
		}

		e.toBeChecked <- path
	}

	e.stopCheckers()
	e.stopTransfers()
	fExtr.tarCache.ReleaseUnprocessedFiles()
	if err := fExtr.Close(); err != nil {
		if exitErr == nil {
			exitErr = err
		}
	}

	lastError := e.getLastError()
	if lastError != nil {
		return lastError
	}
	return exitErr
}
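// Illustrative note (not part of the original commit): run wires a
// three-stage pipeline with ci.Transfers workers in each pool:
//
//	tar stream -> toBeChecked (paths) -> checkers -> toBeUploaded (copyJob) -> transfers
//
// The producer loop feeds paths as the archive is scanned; checkers list
// the destination lazily to find any existing destination object; transfers
// call operations.Copy, which skips files that are already identical.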
// Extract extracts the tar.gz archive srcFileName from fsrc into fdst,
// skipping files that are already identical on the destination.
func Extract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string, maxFileSizeForCache int) error {
	do := newExtract(ctx, fdst, fsrc, srcFileName, maxFileSizeForCache, nil)
	return do.run()
}
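For programmatic use, the exported entry point mirrors what the command's Run hook does (a sketch; fsrc/fdst setup elided, names illustrative):

	// fsrc contains archive.tar.gz; fdst is the destination root.
	if err := extract.Extract(ctx, fdst, fsrc, "archive.tar.gz", 0x100000); err != nil {
		// checkers and transfers have already been stopped and resources released
		return err
	}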
fs/extract/extract_test.go (new file, 383 lines)
@@ -0,0 +1,383 @@
package extract

import (
	"archive/tar"
	"context"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
	"testing"

	"github.com/rclone/rclone/backend/memory"
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/extract/tarcache"
	"github.com/rclone/rclone/fs/extract/tarcache/tartest"
	"github.com/rclone/rclone/fs/fserrors"
	"github.com/rclone/rclone/fstest/mockobject"
	"github.com/stretchr/testify/require"
)

type FailObj struct {
	fs.Object
	fscr *FailFs
}

type FailFs struct {
	fs.Fs
	failFile   string
	maxFails   atomic.Int32
	totalFails atomic.Int32
}

type FailReadCloser struct {
	io.ReadCloser
}

type FailTarReader struct {
	*tar.Reader
	failFile   string
	maxFails   atomic.Int32
	totalFails atomic.Int32

	currentHeader *tar.Header
	mu            sync.Mutex
}

func (f *FailTarReader) UpdateTarReader(r *tar.Reader) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.Reader = r
}

func (f *FailTarReader) tarReader() *tar.Reader {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.Reader
}

func (f *FailTarReader) Read(p []byte) (n int, err error) {
	if f.currentHeader == nil {
		return f.tarReader().Read(p)
	}
	if f.currentHeader.Name == f.failFile {
		if f.isNeedToFail() {
			f.decReadWriteFailAttempts()
			return 0, fserrors.RetryError(io.ErrNoProgress)
		}
	}
	return f.tarReader().Read(p)
}

func (f *FailTarReader) Next() (*tar.Header, error) {
	h, err := f.Reader.Next()
	if err != nil {
		return h, err
	}
	f.currentHeader = h
	return h, nil
}

func (f *FailTarReader) decReadWriteFailAttempts() {
	f.maxFails.Add(-1)
	f.totalFails.Add(1)
}

func (f *FailTarReader) isNeedToFail() bool {
	return f.maxFails.Load() > 0
}

func newFailTarReader(t *tar.Reader, failFile string, maxFails int) *FailTarReader {
	res := &FailTarReader{
		Reader:   t,
		failFile: failFile,
	}
	res.maxFails.Store(int32(maxFails))
	return res
}

func NewFailReadCloser(rc io.ReadCloser) *FailReadCloser {
	return &FailReadCloser{rc}
}

func (r *FailReadCloser) Read([]byte) (int, error) {
	return 0, fserrors.RetryError(io.ErrNoProgress)
}

func NewFailObj(f fs.Object, failFs *FailFs) *FailObj {
	return &FailObj{f, failFs}
}

func (o *FailObj) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
	rc, err := o.Object.Open(ctx, options...)
	if err != nil {
		return nil, err
	}
	if o.fscr.IsNeedToFail() {
		o.fscr.DecReadWriteFailAttempts()
		return NewFailReadCloser(rc), nil
	}

	return rc, nil
}

func (o *FailObj) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
	if o.fscr.IsNeedToFail() {
		o.fscr.DecReadWriteFailAttempts()
		return fserrors.RetryError(io.ErrNoProgress)
	}
	return o.Object.Update(ctx, in, src, options...)
}

func NewFailFs(f fs.Fs, failFile string, maxFails int) *FailFs {
	res := &FailFs{Fs: f, failFile: failFile}
	res.maxFails.Store(int32(maxFails))
	return res
}

func (f *FailFs) IsNeedToFail() bool {
	return f.maxFails.Load() > 0
}

func (f *FailFs) TotalFails() int {
	return int(f.totalFails.Load())
}

func (f *FailFs) DecReadWriteFailAttempts() {
	f.maxFails.Add(-1)
	f.totalFails.Add(1)
}

func (f *FailFs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
	retFs, err := f.Fs.NewObject(ctx, remote)
	if f.failFile != remote || err != nil {
		return retFs, err
	}

	fmt.Println("new object", remote)

	return NewFailObj(retFs, f), nil
}

func (f *FailFs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	if src.Remote() == f.failFile {
		if f.IsNeedToFail() {
			f.DecReadWriteFailAttempts()
			return nil, fserrors.RetryError(io.ErrNoProgress)
		}
	}
	return f.Fs.Put(ctx, in, src, options...)
}

// test cases
// 1. Happy path: extract all files from the archive to a directory (done);
// 2. Fail once when writing a file to the target directory and write it successfully when writing the same file again;
// 3. Fail all attempts to write a file to the target directory;
// 4. Fail once when reading a file from the archive, but read it successfully the second time;
// 5. Fail all attempts to read a file from the archive.

func TestExtract(t *testing.T) {
	t.Run("Happy path", testHappyPath)

	t.Run("Double error while writing a small file to a remote server.", testFdstFailSmallFile)
	t.Run("Double error while writing a large file to a remote server.", testFdstFailLargeFile)

	t.Run("Multiple errors while writing a small file to a remote server.", testFdstFail2SmallFile)
	t.Run("Multiple errors while writing a large file to a remote server.", testFdstFail2LargeFile)

	t.Run("Double errors while reading a small file from an archive.", sdstFailSmallFile)
	t.Run("Double errors while reading a large file from an archive.", sdstFailLargeFile)
}

func helperTestFdstFail2(t *testing.T, isStream bool) {
	bigFileSize := 2000
	archiveFile := "archive.tar.gz"
	ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:dir%t", isStream), fmt.Sprintf("dst:dir%t", isStream), archiveFile, bigFileSize)

	skipFiles := 10
	filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream)
	require.NotEqual(t, "", filePath)
	expectedFails := 100
	newDstFs := NewFailFs(dstFs, filePath, expectedFails)
	// Extract archive to dst fs
	err := Extract(ctx, newDstFs, srcFs, archiveFile, bigFileSize)
	require.Error(t, err)
	require.Greater(t, newDstFs.TotalFails(), 2)
}

func testFdstFail2SmallFile(t *testing.T) {
	helperTestFdstFail2(t, false)
}

func testFdstFail2LargeFile(t *testing.T) {
	helperTestFdstFail2(t, true)
}

func helperTestFdstFail(t *testing.T, isStream bool) {
	bigFileSize := 2000
	archiveFile := "archive.tar.gz"
	ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:dir%t", isStream), fmt.Sprintf("dst:dir%t", isStream), archiveFile, bigFileSize)

	skipFiles := 10
	filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream)
	require.NotEqual(t, "", filePath)
	expectedFails := 2
	newDstFs := NewFailFs(dstFs, filePath, expectedFails)
	// Extract archive to dst fs
	err := Extract(ctx, newDstFs, srcFs, archiveFile, bigFileSize)
	require.NoError(t, err)
	require.Equal(t, expectedFails, newDstFs.TotalFails())
	ll := make(map[string]struct{})
	listRer := dstFs.(fs.ListRer)
	require.NotNil(t, listRer)
	err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
		for _, entry := range entries {
			obj, ok := entry.(fs.Object)
			if !ok {
				continue
			}
			ll[obj.Remote()] = struct{}{}
		}
		return nil
	})
	require.NoError(t, err)
	checkFunc(ctx, t, dstFs, ll, rootItem)

	require.Equal(t, 0, len(ll))
}

func sdstFailSmallFile(t *testing.T) {
	sdstFailHelper(t, false)
}

func sdstFailLargeFile(t *testing.T) {
	sdstFailHelper(t, true)
}

func sdstFailHelper(t *testing.T, isStream bool) {
	bigFileSize := 2000
	archiveFile := "archive.tar.gz"
	ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:didir2%t", isStream), fmt.Sprintf("dst:dir2%t", isStream), archiveFile, bigFileSize)

	skipFiles := 10
	filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream)
	require.NotEqual(t, "", filePath)
	expectedFails := 2
	// Extract archive to dst fs
	tr := newFailTarReader(nil, filePath, expectedFails)
	extr := newExtract(ctx, dstFs, srcFs, archiveFile, bigFileSize, func(reader io.Reader) tarcache.TarReader {
		tarReader := tar.NewReader(reader)
		tr.UpdateTarReader(tarReader)
		return tr
	})
	err := extr.run()
	if !isStream {
		// When processing files that are placed in the cache, tarcache.Cache returns an error
		// at the moment of reading the header, so operations.Copy will not be able to initiate
		// a second read of this file. In this case, if an error occurs for the first time,
		// it is expected that it will be returned immediately.
		require.Error(t, err)
		return
	}
	require.NoError(t, err)

	ll := make(map[string]struct{})
	listRer := dstFs.(fs.ListRer)
	require.NotNil(t, listRer)
	err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
		for _, entry := range entries {
			obj, ok := entry.(fs.Object)
			if !ok {
				continue
			}
			ll[obj.Remote()] = struct{}{}
		}
		return nil
	})
	require.NoError(t, err)
	checkFunc(ctx, t, dstFs, ll, rootItem)

	require.Equal(t, 0, len(ll))
}

func testFdstFailSmallFile(t *testing.T) {
	helperTestFdstFail(t, false)
}

func testFdstFailLargeFile(t *testing.T) {
	helperTestFdstFail(t, true)
}

func checkFunc(ctx context.Context, t *testing.T, dstFs fs.Fs, files map[string]struct{}, itm *tartest.Item) {
	for _, obj := range itm.Children {
		if obj.IsDir() {
			checkFunc(ctx, t, dstFs, files, obj)
			continue
		}
		_, ok := files[obj.Name()]
		require.True(t, ok)
		delete(files, obj.Name())
		o, err := dstFs.NewObject(ctx, obj.Name())
		require.NoError(t, err)
		reader, err := o.Open(ctx)
		require.NoError(t, err)
		require.NotNil(t, reader)
		actual, err := io.ReadAll(reader)
		require.NoError(t, err)
		require.Equal(t, obj.FileContent, actual)
	}
}

func prepareSrcAndDstFs(t *testing.T, srcRoot, dstRoot, archiveFile string, bigFileSize int) (ctx context.Context, src fs.Fs, dst fs.Fs, rootItem *tartest.Item) {
	ctx = context.Background()
	ctx, ci := fs.AddConfig(ctx)
	ci.Transfers = 10
	ci2 := fs.GetConfig(ctx)
	require.Equal(t, ci.Transfers, ci2.Transfers)
	var err error
	var archiveContent []byte
	rootItem, archiveContent = tartest.CreateTarArchive(t, 5, 10, 12, 50, bigFileSize)
	src, err = memory.NewFs(ctx, "memory", srcRoot, nil)
	require.NoError(t, err)
	o := mockobject.New(archiveFile).WithContent(archiveContent, mockobject.SeekModeNone)
	reader, err := o.Open(ctx)
	require.NoError(t, err)

	// Put archive file to source fs
	_, err = src.Put(ctx, reader, o)
	require.NoError(t, err)
	dst, err = memory.NewFs(ctx, "memory", dstRoot, nil)
	require.NoError(t, err)
	return
}

func testHappyPath(t *testing.T) {
	bigFileSize := 2000
	archiveFile := "archive.tar.gz"
	ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, "src:dir", "dst:dir", archiveFile, bigFileSize)
	defer func() {
		require.NoError(t, srcFs.Rmdir(ctx, "src:dir"))
		require.NoError(t, srcFs.Rmdir(ctx, "dst:dir"))
	}()
	// Extract archive to dst fs
	err := Extract(ctx, dstFs, srcFs, archiveFile, bigFileSize)
	require.NoError(t, err)

	ll := make(map[string]struct{})
	listRer := dstFs.(fs.ListRer)
	require.NotNil(t, listRer)
	err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
		for _, entry := range entries {
			obj, ok := entry.(fs.Object)
			if !ok {
				continue
			}
			ll[obj.Remote()] = struct{}{}
		}
		return nil
	})
	require.NoError(t, err)
	checkFunc(ctx, t, dstFs, ll, rootItem)

	require.Equal(t, 0, len(ll))
}
fs/extract/tarcache/tarcache.go (new file, 198 lines)
@ -0,0 +1,198 @@
|
|||
// Package tarcache provides a cache for tar archives.
|
||||
package tarcache
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type TarReader interface {
|
||||
io.Reader
|
||||
Next() (*tar.Header, error)
|
||||
}
|
||||
|
||||
// Cache is a cache for tar archives.
|
||||
type Cache struct {
|
||||
reader TarReader
|
||||
workerCnt int
|
||||
maxSizeInMemory int64
|
||||
|
||||
ch chan struct{}
|
||||
helperCh chan func()
|
||||
streamWg sync.WaitGroup
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// Header holds info about file in tar archive.
|
||||
type Header struct {
|
||||
FilePath string
|
||||
ModTime time.Time
|
||||
Size int64
|
||||
Format tar.Format
|
||||
}
|
||||
|
||||
// FileInfo holds info about file in tar archive and its content.
|
||||
type FileInfo struct {
|
||||
Payload []byte
|
||||
IoPayload io.Reader
|
||||
ReleaseFn func()
|
||||
FileInfo Header
|
||||
}
|
||||
|
||||
func (t *Cache) waitWorkersDone() {
|
||||
t.wg.Wait()
|
||||
}
|
||||
|
||||
// Close closes the cache and free all resources.
|
||||
func (t *Cache) Close() error {
|
||||
t.waitWorkersDone()
|
||||
if t.ch != nil {
|
||||
close(t.ch)
|
||||
t.ch = nil
|
||||
close(t.helperCh)
|
||||
t.helperCh = nil
|
||||
return nil
|
||||
}
|
||||
return errors.New("cache already closed")
|
||||
}
|
||||
|
||||
// NewTarCache creates a new tar cache.
|
||||
func NewTarCache(reader TarReader, workerCnt int, maxSizeInMemory int64) *Cache {
|
||||
res := &Cache{
|
||||
reader: reader,
|
||||
workerCnt: workerCnt,
|
||||
maxSizeInMemory: maxSizeInMemory,
|
||||
ch: make(chan struct{}, workerCnt),
|
||||
helperCh: make(chan func(), 1),
|
||||
}
|
||||
go func() {
|
||||
for fn := range res.helperCh {
|
||||
fn()
|
||||
}
|
||||
}()
|
||||
return res
|
||||
}
|
||||
|
||||
type helper struct {
|
||||
io.Reader
|
||||
releaseFn func()
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
// Close invokes the closer function.
|
||||
func (h *helper) Close() error {
|
||||
h.once.Do(h.releaseFn)
|
||||
return nil
|
||||
}
|
||||
|
||||
// NextPayload returns the next archive file as io.ReadCloser
|
||||
func (t *Cache) NextPayload(ctx context.Context) (_ io.ReadCloser, _ Header, err error) {
|
||||
res, err := t.Next(ctx)
|
||||
if err != nil {
|
||||
return nil, Header{}, err
|
||||
}
|
||||
if res.IoPayload != nil {
|
||||
return &helper{Reader: res.IoPayload, releaseFn: res.ReleaseFn}, res.FileInfo, nil
|
||||
}
|
||||
res.IoPayload = bytes.NewReader(res.Payload)
|
||||
return &helper{Reader: res.IoPayload, releaseFn: res.ReleaseFn}, res.FileInfo, nil
|
||||
}
|
||||
|
||||
type streamCloser struct {
|
||||
ch chan struct{}
|
||||
cache *Cache
|
||||
isBytes bool
|
||||
}
|
||||
|
||||
func newStreamCloser(cache *Cache, isBytes bool) *streamCloser {
|
||||
if !isBytes {
|
||||
cache.streamWg.Add(1)
|
||||
}
|
||||
cache.wg.Add(1)
|
||||
return &streamCloser{
|
||||
ch: cache.ch,
|
||||
cache: cache,
|
||||
isBytes: isBytes,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *streamCloser) close() {
|
||||
if s.ch == nil {
|
||||
fmt.Printf("ch is nil: %p", s)
|
||||
fmt.Println("")
|
||||
return
|
||||
}
|
||||
ch := s.ch
|
||||
s.ch = nil
|
||||
<-ch
|
||||
if !s.isBytes {
|
||||
s.cache.streamWg.Done()
|
||||
}
|
||||
s.cache.wg.Done()
|
||||
}
|
||||
|
||||
// Next returns info about the next file in the archive
|
||||
func (t *Cache) Next(ctx context.Context) (_ FileInfo, err error) {
|
||||
// We block the execution flow if the number of currently processed files exceeds the limit.
|
||||
select {
|
||||
case t.ch <- struct{}{}:
|
||||
case <-ctx.Done():
|
||||
return FileInfo{}, ctx.Err()
|
||||
}
|
||||
watcher := make(chan struct{})
|
||||
t.helperCh <- func() {
|
||||
// We also block the execution flow if there is a reader without caching.
|
||||
t.streamWg.Wait()
|
||||
close(watcher)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-watcher:
|
||||
case <-ctx.Done():
|
||||
return FileInfo{}, ctx.Err()
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
<-t.ch
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
header, err := t.reader.Next()
|
||||
if err != nil {
|
||||
return FileInfo{nil, nil, nil, Header{}}, err
|
||||
}
|
||||
switch header.Typeflag {
|
||||
case tar.TypeDir:
|
||||
continue
|
||||
case tar.TypeReg:
|
||||
h := Header{
|
||||
FilePath: header.Name,
|
||||
ModTime: header.ModTime,
|
||||
Size: header.Size,
|
||||
Format: header.Format,
|
||||
}
|
||||
|
||||
if header.Size < t.maxSizeInMemory {
|
||||
var payload []byte
|
||||
payload, err = io.ReadAll(t.reader)
|
||||
if err != nil {
|
||||
return FileInfo{nil, nil, nil, Header{}}, err
|
||||
}
|
||||
closer := newStreamCloser(t, true)
|
||||
return FileInfo{payload, nil, closer.close, h}, nil
|
||||
}
|
||||
closer := newStreamCloser(t, false)
|
||||
return FileInfo{nil, t.reader, closer.close, h}, nil
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
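For orientation, here is a minimal sketch of how a consumer might drive this cache: wrap the gzipped tar in NewTarCache, pull entries with NextPayload until io.EOF, and close each reader to release its slot. This sketch is not part of the commit; the input file name and the worker/size-threshold values are illustrative assumptions.

// Illustrative consumer sketch (not part of this commit).
package main

import (
	"archive/tar"
	"compress/gzip"
	"context"
	"errors"
	"io"
	"log"
	"os"

	"github.com/rclone/rclone/fs/extract/tarcache"
)

func main() {
	f, err := os.Open("archive.tar.gz") // assumed input path
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = f.Close() }()

	gz, err := gzip.NewReader(f)
	if err != nil {
		log.Fatal(err)
	}

	// 4 concurrent slots, 1 MiB in-memory threshold (illustrative values).
	cache := tarcache.NewTarCache(tar.NewReader(gz), 4, 1<<20)
	ctx := context.Background()
	for {
		rc, hdr, err := cache.NextPayload(ctx)
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		// This sketch consumes sequentially; real callers would hand rc to a
		// goroutine so that small (cached) files are copied in parallel.
		n, err := io.Copy(io.Discard, rc)
		_ = rc.Close() // releases the slot; safe to call more than once
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("%s: %d bytes", hdr.FilePath, n)
	}
	if err := cache.Close(); err != nil {
		log.Fatal(err)
	}
}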
525
fs/extract/tarcache/tarcache_test.go
Normal file
@@ -0,0 +1,525 @@
package tarcache

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"math"
	"math/rand"
	"path"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func (i *Item) FindFilePath(skipCnt *int, bigFileSize int, fitInCache bool) string {
	for _, obj := range i.Children {
		if !obj.IsDir() {
			isBig := len(obj.FileContent) >= bigFileSize
			if isBig && !fitInCache || !isBig && fitInCache {
				*skipCnt--
				if *skipCnt == 0 {
					return obj.Name()
				}
			}
		}
	}
	for _, obj := range i.Children {
		if obj.IsDir() {
			res := obj.FindFilePath(skipCnt, bigFileSize, fitInCache)
			if len(res) > 0 {
				return res
			}
		}
	}
	return ""
}

type ItemType int

const (
	Dir ItemType = iota
	File
)

type Item struct {
	Type        ItemType
	ItemName    string
	Children    map[string]*Item
	FileContent []byte
}

func (i *Item) Name() string {
	return i.ItemName
}

func (i *Item) Size() int64 {
	return int64(len(i.FileContent))
}

func (i *Item) Mode() fs.FileMode {
	res := fs.ModePerm
	if i.Type == Dir {
		res |= fs.ModeDir
	}
	return res
}

func (i *Item) ModTime() time.Time {
	return time.Now()
}

func (i *Item) IsDir() bool {
	return i.Type == Dir
}

func (i *Item) Sys() any {
	return nil
}

func dice(rnd *rand.Rand, percent int) bool {
	return rnd.Intn(100) < percent
}

func randomFileName(rnd *rand.Rand) string {
	b := make([]byte, 16)
	rnd.Read(b)
	return hex.EncodeToString(b)
}

type errorReader struct {
	io.Reader

	bytesRead int
	threshold int
}

var errRead = errors.New("read error")

func newReaderWithError(rc io.Reader, threshold int) *errorReader {
	return &errorReader{rc, 0, threshold}
}

func (e *errorReader) Read(p []byte) (n int, err error) {
	if e.bytesRead > e.threshold {
		return 0, errRead
	}
	n, err = e.Reader.Read(p)
	e.bytesRead += n
	return
}

func (i *Item) createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) {
	if deep <= 0 {
		return
	}
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

	nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir

	for j := 0; j < nItem; j++ {
		isFile := deep == 1 || dice(rnd, 60)
		newItem := Item{}
		var prefix string
		if len(i.ItemName) > 0 {
			prefix = i.ItemName + "/"
		}
		if isFile {
			newItem.ItemName = fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd))
			newItem.Type = File
			var fileSize int
			isSmallFile := dice(rnd, smallFilesPercent)
			if isSmallFile {
				// [1, bigFileSize)
				fileSize = rnd.Intn(bigFileSize-1) + 1
			} else {
				// [bigFileSize, 2*bigFileSize)
				fileSize = rnd.Intn(bigFileSize) + bigFileSize
			}
			newItem.FileContent = make([]byte, fileSize)
			rnd.Read(newItem.FileContent)
			i.Children[newItem.ItemName] = &newItem
		} else {
			newItem.ItemName = fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd))
			newItem.Type = Dir
			newItem.Children = make(map[string]*Item)
			newItem.createTarArchiveContent(deep-1, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
			i.Children[newItem.ItemName] = &newItem
		}
	}
}

func (i *Item) fillMap(m *sync.Map) {
	for _, v := range i.Children {
		if v.Type == File {
			m.Store(v.ItemName, v.FileContent)
		} else if v.Type == Dir {
			v.fillMap(m)
		}
	}
}

func CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) *Item {
	root := &Item{
		Type:     Dir,
		ItemName: "",
		Children: make(map[string]*Item),
	}

	root.createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
	return root
}

func (i *Item) writeItemToArchiver(tw *tar.Writer) error {
	// In the first pass, we write the files from the current directory.
	for _, child := range i.Children {
		if child.Type == File {
			head, err := tar.FileInfoHeader(child, path.Base(child.ItemName))
			if err != nil {
				return err
			}
			head.Name = child.ItemName
			err = tw.WriteHeader(head)
			if err != nil {
				return err
			}
			_, err = io.Copy(tw, bytes.NewReader(child.FileContent))
			if err != nil {
				return err
			}
		}
	}

	// In the second pass, we write files from child directories.
	for _, child := range i.Children {
		if child.Type == Dir {
			if err := child.writeItemToArchiver(tw); err != nil {
				return err
			}
		}
	}

	return nil
}

func createTarArchiveHelper(writer io.Writer, content *Item) error {
	gz := gzip.NewWriter(writer)
	defer func() {
		_ = gz.Close()
	}()

	tw := tar.NewWriter(gz)
	defer func() {
		_ = tw.Close()
	}()

	return content.writeItemToArchiver(tw)
}

func CreateTarArchive(t *testing.T, deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) (*Item, []byte) {
	content := CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)

	writer := bytes.NewBuffer(make([]byte, 0))

	err := createTarArchiveHelper(writer, content)
	archData := writer.Bytes()
	require.NoError(t, err)
	require.NotNil(t, content)
	require.NotNil(t, archData)

	return content, archData
}

func createTarArchiveWithOneFileHelper(w io.Writer) error {
	gz := gzip.NewWriter(w)
	defer func() {
		_ = gz.Close()
	}()

	tw := tar.NewWriter(gz)
	defer func() {
		_ = tw.Close()
	}()
	itm := Item{
		Type:        File,
		ItemName:    "test.bin",
		FileContent: make([]byte, 0x80000),
	}

	rootItm := Item{
		Type: Dir,
		Children: map[string]*Item{
			itm.ItemName: &itm,
		},
	}

	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
	rnd.Read(itm.FileContent)
	return rootItm.writeItemToArchiver(tw)
}

func createTarArchiveWithOneFile(t *testing.T) []byte {
	writer := bytes.NewBuffer(make([]byte, 0))
	err := createTarArchiveWithOneFileHelper(writer)
	require.NoError(t, err)
	return writer.Bytes()
}

func doubleCloseCaseHelper(t *testing.T, readerTypeChecker func(io.Reader) bool) {
	bigFileSize := 0x50
	content, archData := CreateTarArchive(t, 5, 8, 10, 50, bigFileSize)

	reader := bytes.NewReader(archData)

	gzf, err := gzip.NewReader(reader)
	require.NoError(t, err)

	tarReader := tar.NewReader(gzf)
	tarCache := NewTarCache(tarReader, 10, int64(bigFileSize))

	var m sync.Map
	content.fillMap(&m)

	errCh := make(chan error, 100)
	ctx, cancel := context.WithCancel(context.Background())
	ctx2 := context.Background()
	defer cancel()
	var wg sync.WaitGroup
	var cnt int32

L:
	for {
		select {
		case <-ctx.Done():
			break L
		default:
		}
		contentReader, header, err := tarCache.NextPayload(ctx2)

		if err != nil {
			if err == io.EOF {
				break
			}
			require.NoError(t, err)
		}
		wg.Add(1)
		go func(reader io.ReadCloser, head Header) {
			defer wg.Done()
			defer func() {
				if err := reader.Close(); err != nil {
					errCh <- err
					cancel()
				}
			}()
			select {
			case <-ctx.Done():
				return
			default:
			}
			val, ok := m.Load(head.FilePath)
			if !ok {
				errCh <- errors.New(head.FilePath + " not found")
				cancel()
				return
			}
			expected := val.([]byte)
			content, err := io.ReadAll(reader)
			if err != nil {
				errCh <- err
				cancel()
				return
			}
			if !bytes.Equal(expected, content) {
				errCh <- errors.New(head.FilePath + " content mismatch")
				cancel()
				return
			}
			if atomic.AddInt32(&cnt, 1) >= 100 {
				if readerTypeChecker(reader) {
					if err := reader.Close(); err != nil {
						errCh <- err
						cancel()
						return
					}
				}
			}
		}(contentReader, header)
	}
	err = tarCache.Close()
	require.NoError(t, err)
	wg.Wait()
	close(errCh)

	hasError := false
	for e := range errCh {
		if e != nil {
			hasError = true
		}
	}
	require.False(t, hasError)
}

func readErrorCase(t *testing.T) {
	ctx := context.Background()
	archData := createTarArchiveWithOneFile(t)

	reader := newReaderWithError(bytes.NewReader(archData), 0x800)

	gzf, err := gzip.NewReader(reader)
	require.NoError(t, err)

	tarReader := tar.NewReader(gzf)
	tarCache := NewTarCache(tarReader, 10, math.MaxInt)

	errCh := make(chan error, 100)
	var wg sync.WaitGroup
	_, _, err = tarCache.NextPayload(ctx)

	require.NotErrorIs(t, err, io.EOF)
	require.Error(t, err, errRead.Error())

	err = tarCache.Close()
	require.NoError(t, err)
	wg.Wait()
	close(errCh)

	for e := range errCh {
		if err == nil {
			err = e
		}
	}
	require.NoError(t, err)
}

func successfulCase(t *testing.T) {
	bigFileSize := 0x200
	content, archData := CreateTarArchive(t, 5, 5, 30, 70, bigFileSize)

	reader := bytes.NewReader(archData)
	gzf, err := gzip.NewReader(reader)
	require.NoError(t, err)

	tarReader := tar.NewReader(gzf)
	tarCache := NewTarCache(tarReader, 10, int64(bigFileSize))

	var m sync.Map
	content.fillMap(&m)

	errCh := make(chan error, 100)
	ctx, cancel := context.WithCancel(context.Background())
	ctx2 := context.Background()
	defer cancel()
	var wg sync.WaitGroup
L:
	for {
		select {
		case <-ctx.Done():
			break L
		default:
		}
		contentReader, header, err := tarCache.NextPayload(ctx2)

		if err != nil {
			if err == io.EOF {
				break
			}
			require.NoError(t, err)
		}
		wg.Add(1)
		go func(reader io.ReadCloser, head Header) {
			defer wg.Done()
			defer func() {
				if err := reader.Close(); err != nil {
					errCh <- err
					cancel()
				}
			}()
			select {
			case <-ctx.Done():
				return
			default:
			}
			val, ok := m.Load(head.FilePath)
			if !ok {
				errCh <- errors.New(head.FilePath + " not found")
				cancel()
				return
			}
			expected := val.([]byte)
			content, err := io.ReadAll(reader)
			if err != nil {
				errCh <- err
				cancel()
				return
			}
			if !bytes.Equal(expected, content) {
				errCh <- errors.New(head.FilePath + " content mismatch")
				cancel()
				return
			}
			m.Delete(head.FilePath)

		}(contentReader, header)
	}
	err = tarCache.Close()
	wg.Wait()
	close(errCh)
	var err2 error
	for e := range errCh {
		if err2 == nil {
			err2 = e
		}
	}
	require.NoError(t, err2)
	require.NoError(t, err)

	// Checking for dictionary emptiness
	var l int
	m.Range(func(k, v interface{}) bool {
		l++
		return true
	})
	require.Equal(t, 0, l)
}

func TestTarCache(t *testing.T) {
	t.Run("Successful case", func(t *testing.T) {
		successfulCase(t)
	})

	t.Run("Double close of the byte reader", func(t *testing.T) {
		doubleCloseCaseHelper(t, func(reader io.Reader) bool {
			e, ok := reader.(*helper)
			if !ok {
				return false
			}
			_, ok = e.Reader.(*bytes.Reader)
			return ok
		})
	})

	t.Run("Double close of the stream reader", func(t *testing.T) {
		doubleCloseCaseHelper(t, func(reader io.Reader) bool {
			e, ok := reader.(*helper)
			if !ok {
				return false
			}
			_, ok = e.Reader.(*bytes.Reader)
			return !ok
		})
	})

	t.Run("Read error", func(t *testing.T) {
		readErrorCase(t)
	})
}
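The two double-close subtests above pin down a property worth calling out: the io.ReadCloser returned by NextPayload tolerates repeated Close calls because the release function is wrapped in sync.Once. A minimal sketch of that pattern in isolation (names are illustrative, not from the commit):

// Idempotent close via sync.Once (illustrative sketch of the pattern
// the double-close subtests exercise).
package main

import (
	"fmt"
	"sync"
)

type onceCloser struct {
	once    sync.Once
	release func()
}

func (c *onceCloser) Close() error {
	c.once.Do(c.release) // second and later calls are no-ops
	return nil
}

func main() {
	calls := 0
	c := &onceCloser{release: func() { calls++ }}
	_ = c.Close()
	_ = c.Close()
	fmt.Println(calls) // prints 1
}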
216
fs/extract/tarcache/tartest/tartest.go
Normal file
@@ -0,0 +1,216 @@
// Package tartest is a set of functions that assist with the testing of tar archives.
package tartest

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"encoding/hex"
	"fmt"
	"io"
	"io/fs"
	"math/rand"
	"path"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// FindFilePath returns the path of the skipCnt-th file that either fits in
// the cache or does not, depending on fitInCache.
func (i *Item) FindFilePath(skipCnt *int, bigFileSize int, fitInCache bool) string {
	for _, obj := range i.Children {
		if !obj.IsDir() {
			isBig := len(obj.FileContent) >= bigFileSize
			if isBig && !fitInCache || !isBig && fitInCache {
				if *skipCnt == 0 {
					return obj.Name()
				}
				*skipCnt--
			}
		}
	}
	for _, obj := range i.Children {
		if obj.IsDir() {
			res := obj.FindFilePath(skipCnt, bigFileSize, fitInCache)
			if len(res) > 0 {
				return res
			}
		}
	}
	return ""
}

// ItemType distinguishes files from directories.
type ItemType int

// Item types
const (
	Dir ItemType = iota // directory type
	File // regular file type
)

// Item represents a file or directory.
type Item struct {
	Type        ItemType
	ItemName    string
	Children    map[string]*Item
	FileContent []byte
}

// Name returns the file name.
func (i *Item) Name() string {
	return i.ItemName
}

// Size returns the file size.
func (i *Item) Size() int64 {
	return int64(len(i.FileContent))
}

// Mode returns the file mode.
func (i *Item) Mode() fs.FileMode {
	res := fs.ModePerm
	if i.Type == Dir {
		res |= fs.ModeDir
	}
	return res
}

// ModTime returns the modification time.
func (i *Item) ModTime() time.Time {
	return time.Now()
}

// IsDir returns true if the item is a directory.
func (i *Item) IsDir() bool {
	return i.Type == Dir
}

// Sys returns the underlying data source (may be nil).
func (i *Item) Sys() any {
	return nil
}

func dice(rnd *rand.Rand, percent int) bool {
	return rnd.Intn(100) < percent
}

func randomFileName(rnd *rand.Rand) string {
	b := make([]byte, 16)
	rnd.Read(b)
	return hex.EncodeToString(b)
}

func (i *Item) createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) {
	if deep <= 0 {
		return
	}
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

	nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir

	for j := 0; j < nItem; j++ {
		isFile := deep == 1 || dice(rnd, 60)
		newItem := Item{}
		var prefix string
		if len(i.ItemName) > 0 {
			prefix = i.ItemName + "/"
		}
		if isFile {
			newItem.ItemName = fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd))
			newItem.Type = File
			var fileSize int
			isSmallFile := dice(rnd, smallFilesPercent)
			if isSmallFile {
				// [1, bigFileSize)
				fileSize = rnd.Intn(bigFileSize-1) + 1
			} else {
				// [bigFileSize, 2*bigFileSize)
				fileSize = rnd.Intn(bigFileSize) + bigFileSize
			}
			newItem.FileContent = make([]byte, fileSize)
			rnd.Read(newItem.FileContent)
			i.Children[newItem.ItemName] = &newItem
		} else {
			newItem.ItemName = fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd))
			newItem.Type = Dir
			newItem.Children = make(map[string]*Item)
			newItem.createTarArchiveContent(deep-1, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
			i.Children[newItem.ItemName] = &newItem
		}
	}
}

// CreateTarArchiveContent builds a random directory tree of the given depth,
// ready to be packed into a tar archive.
func CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) *Item {
	root := &Item{
		Type:     Dir,
		ItemName: "",
		Children: make(map[string]*Item),
	}

	root.createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
	return root
}

func (i *Item) writeItemToArchiver(tw *tar.Writer) error {
	// In the first pass, we write the files from the current directory.
	for _, child := range i.Children {
		if child.Type == File {
			head, err := tar.FileInfoHeader(child, path.Base(child.ItemName))
			if err != nil {
				return err
			}
			head.Name = child.ItemName
			err = tw.WriteHeader(head)
			if err != nil {
				return err
			}
			_, err = io.Copy(tw, bytes.NewReader(child.FileContent))
			if err != nil {
				return err
			}
		}
	}

	// In the second pass, we write files from child directories.
	for _, child := range i.Children {
		if child.Type == Dir {
			if err := child.writeItemToArchiver(tw); err != nil {
				return err
			}
		}
	}

	return nil
}

func createTarArchiveHelper(writer io.Writer, content *Item) error {
	gz := gzip.NewWriter(writer)
	defer func() {
		_ = gz.Close()
	}()

	tw := tar.NewWriter(gz)
	defer func() {
		_ = tw.Close()
	}()

	return content.writeItemToArchiver(tw)
}

// CreateTarArchive generates random content and returns the content tree
// together with the gzipped tar archive bytes.
func CreateTarArchive(t *testing.T, deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) (*Item, []byte) {
	content := CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)

	writer := bytes.NewBuffer(make([]byte, 0))

	err := createTarArchiveHelper(writer, content)
	archData := writer.Bytes()
	require.NoError(t, err)
	require.NotNil(t, content)
	require.NotNil(t, archData)

	return content, archData
}
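As a rough usage sketch (assumed, modelled on the tarcache tests above), a test can generate a random tree with these helpers and then unpack the returned bytes through archive/tar to verify the round trip:

// Illustrative round-trip test using the tartest helpers (assumed usage).
package tartest_test

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"io"
	"testing"

	"github.com/rclone/rclone/fs/extract/tarcache/tartest"
	"github.com/stretchr/testify/require"
)

func TestRoundTrip(t *testing.T) {
	// Depth 3, 2-5 items per dir, ~70% small files, 0x200-byte size threshold.
	content, archData := tartest.CreateTarArchive(t, 3, 2, 5, 70, 0x200)
	require.NotNil(t, content) // the *Item tree can be walked to verify payloads

	gz, err := gzip.NewReader(bytes.NewReader(archData))
	require.NoError(t, err)

	tr := tar.NewReader(gz)
	files := 0
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		}
		require.NoError(t, err)
		require.NotEmpty(t, hdr.Name)
		_, err = io.Copy(io.Discard, tr)
		require.NoError(t, err)
		files++
	}
	require.Greater(t, files, 0)
}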
3
go.mod
@@ -168,6 +168,7 @@ require (
	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
	github.com/golang/protobuf v1.5.4 // indirect
	github.com/golang/snappy v0.0.1 // indirect
	github.com/google/gops v0.3.28 // indirect
	github.com/google/s2a-go v0.1.7 // indirect
	github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
	github.com/googleapis/gax-go/v2 v2.12.5 // indirect
@@ -232,6 +233,7 @@ require (
	github.com/twmb/murmur3 v1.1.8 // indirect
	github.com/urfave/cli/v2 v2.27.4 // indirect
	github.com/willscott/go-nfs-client v0.0.0-20240104095149-b44639837b00 // indirect
	github.com/xlab/treeprint v1.2.0 // indirect
	github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
	github.com/yusufpapurcu/wmi v1.2.4 // indirect
	github.com/zeebo/blake3 v0.2.3 // indirect
@@ -249,6 +251,7 @@ require (
	google.golang.org/protobuf v1.34.2 // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
	moul.io/http2curl/v2 v2.3.0 // indirect
	rsc.io/goversion v1.2.0 // indirect
	storj.io/common v0.0.0-20240812101423-26b53789c348 // indirect
	storj.io/drpc v0.0.35-0.20240709171858-0075ac871661 // indirect
	storj.io/eventkit v0.0.0-20240415002644-1d9596fee086 // indirect
6
go.sum
@@ -369,6 +369,8 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gops v0.3.28 h1:2Xr57tqKAmQYRAfG12E+yLcoa2Y42UJo2lOrUFL9ark=
github.com/google/gops v0.3.28/go.mod h1:6f6+Nl8LcHrzJwi8+p0ii+vmBFSlB4f8cOOkTJ7sk4c=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@@ -722,6 +724,8 @@ github.com/winfsp/cgofuse v1.5.1-0.20221118130120-84c0898ad2e0 h1:j3un8DqYvvAOqK
github.com/winfsp/cgofuse v1.5.1-0.20221118130120-84c0898ad2e0/go.mod h1:uxjoF2jEYT3+x+vC2KJddEGdk/LU8pRowXmyVMHSV5I=
github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM=
github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw=
github.com/xlab/treeprint v1.2.0 h1:HzHnuAF1plUN2zGlAFHbSQP2qJ0ZAD3XF5XD7OesXRQ=
github.com/xlab/treeprint v1.2.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk=
@@ -1155,6 +1159,8 @@ honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
moul.io/http2curl/v2 v2.3.0 h1:9r3JfDzWPcbIklMOs2TnIFzDYvfAZvjeavG6EzP7jYs=
moul.io/http2curl/v2 v2.3.0/go.mod h1:RW4hyBjTWSYDOxapodpNEtX0g5Eb16sxklBqmd2RHcE=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/goversion v1.2.0 h1:SPn+NLTiAG7w30IRK/DKp1BjvpWabYgxlLp/+kx5J8w=
rsc.io/goversion v1.2.0/go.mod h1:Eih9y/uIBS3ulggl7KNJ09xGSLcuNaLgmvvqa07sgfo=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
rsc.io/tmplfunc v0.0.3 h1:53XFQh69AfOa8Tw0Jm7t+GV7KZhOi6jzsCzTtKbMvzU=