[#8] Add extract command
Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
Parent: 4733d46d83
Commit: b767fe35e6
10 changed files with 2985 additions and 2 deletions
@@ -44,7 +44,7 @@ jobs:
       AIO_VERSION: 1.7.0-nightly.4
       RCLONE_CONFIG: /config/rclone.conf

-      # run only tests related to FrostFS backend
+      # run only tests related to FrostFS backend and extract command
       run: |-
         podman-service.sh
         podman info
@@ -64,4 +64,5 @@ jobs:
         docker cp aio:/config/user-wallet.json /config/wallet.json

         echo "Start tests"
         go test -v github.com/rclone/rclone/backend/frostfs
+        go test -v -cover --race ./fs/extract/...
@@ -23,6 +23,7 @@ import (
     _ "github.com/rclone/rclone/cmd/dedupe"
     _ "github.com/rclone/rclone/cmd/delete"
     _ "github.com/rclone/rclone/cmd/deletefile"
+    _ "github.com/rclone/rclone/cmd/extract"
     _ "github.com/rclone/rclone/cmd/genautocomplete"
     _ "github.com/rclone/rclone/cmd/gendocs"
     _ "github.com/rclone/rclone/cmd/gitannex"
cmd/extract/extract.go (new file, 62 lines)
@@ -0,0 +1,62 @@
// Package extract provides the extract command.
package extract

import (
    "context"
    "errors"
    "strings"

    "github.com/rclone/rclone/cmd"
    "github.com/rclone/rclone/fs/config/flags"
    "github.com/rclone/rclone/fs/extract"
    "github.com/spf13/cobra"
)

var (
    maxFileSizeForCache = 0x100000 // 1 MiB
)

func init() {
    cmd.Root.AddCommand(commandDefinition)
    cmdFlags := commandDefinition.Flags()
    flags.IntVarP(cmdFlags,
        &maxFileSizeForCache,
        "max-file-size-for-cache",
        "",
        maxFileSizeForCache,
        `Maximum file size that can be placed in the cache. Files of larger
size will be read without caching, which will prevent parallelization
of copy operations and may result in longer archive unpacking time.`,
        "")
}

var commandDefinition = &cobra.Command{
    Use:   "extract source:path/to/archive.tar.gz dest:path",
    Short: `Extract a tar.gz archive from the source to the destination, skipping identical files.`,
    // Note: "|" will be replaced by backticks below
    Long: strings.ReplaceAll(`Extract the files of a tar.gz archive from the source to the destination. Does not transfer files that are
identical on source and destination, testing by size and modification
time or MD5SUM. Doesn't delete files from the destination.

If dest:path doesn't exist, it is created and the contents of the archive source:path/to/targz/archive
go there.

**Note**: Use the |-P|/|--progress| flag to view real-time transfer statistics.

**Note**: Use the |--dry-run| or the |--interactive|/|-i| flag to test without copying anything.
`, "|", "`"),
    Annotations: map[string]string{
        "groups": "Important",
    },
    Run: func(command *cobra.Command, args []string) {
        cmd.CheckArgs(2, 2, command, args)
        fsrc, srcFileName, fdst := cmd.NewFsSrcFileDst(args)
        cmd.Run(true, true, command, func() error {
            if srcFileName == "" {
                return errors.New("the source is required to be a file")
            }
            return extract.Extract(context.Background(), fdst, fsrc, srcFileName, maxFileSizeForCache)
        })
    },
}
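For context, a typical invocation of the new command would look like this (the remote names and paths here are illustrative, not taken from the commit):

    rclone extract remote:backups/archive.tar.gz dest:unpacked -P

Per the flag help above, files larger than --max-file-size-for-cache (1 MiB by default) are streamed rather than cached, which prevents their copies from being parallelized.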
fs/extract/dirtreeholder.go (new file, 267 lines)
@@ -0,0 +1,267 @@
package extract

import (
    "context"
    "errors"
    "fmt"
    "strings"
    "sync"

    "github.com/rclone/rclone/fs"
)

// Dir represents a directory with objects.
type Dir struct {
    name string
    objs map[string]fs.DirEntry
    dirs map[string]*Dir
}

// NewDir creates a new directory.
func NewDir(name string) *Dir {
    return &Dir{
        name: name,
        objs: make(map[string]fs.DirEntry),
        dirs: make(map[string]*Dir),
    }
}

// Name returns the name of the directory.
func (d *Dir) Name() string {
    return d.name
}

// GetDirByName returns the subdirectory with the given name.
func (d *Dir) GetDirByName(name string) *Dir {
    if d.name == name {
        return d
    }
    if len(name) <= len(d.name) || !strings.HasPrefix(name, d.name) {
        return nil
    }
    if d.name != "" && name[len(d.name)] != '/' {
        return nil
    }
    idx := strings.Index(name[len(d.name)+1:], "/")
    nextDir := name
    if idx != -1 {
        nextDir = name[:idx+len(d.name)+1]
    }
    if td, ok := d.dirs[nextDir]; ok {
        return td.GetDirByName(name)
    }
    return nil
}

// AddChildDir adds a child directory.
func (d *Dir) AddChildDir(child *Dir) {
    d.dirs[child.name] = child
}

// AddChildObject adds information about the child object.
func (d *Dir) AddChildObject(child fs.DirEntry) {
    d.objs[child.Remote()] = child
}

type listDirFn func(dir string) (*Dir, error)

type dirTreeHolder struct {
    listF      listDirFn
    mu         sync.Mutex
    root       *Dir
    dirsInWork map[string]*sync.WaitGroup
}

type strDirIter struct {
    dir string
    idx int
}

func newStrDirIter(dir string) *strDirIter {
    return &strDirIter{
        dir: dir,
        idx: -1,
    }
}

func (s *strDirIter) next() (cur string, next string) {
    if s.dir == "" {
        return "", ""
    }
    if s.idx == -1 {
        s.idx = 0
        idx := strings.Index(s.dir, "/")
        if idx == -1 {
            return "", s.dir
        }
        return "", s.dir[:idx]
    }
    idx := strings.Index(s.dir[s.idx:], "/")
    if idx == -1 {
        return s.dir, ""
    }
    defer func(idx int) {
        s.idx = s.idx + idx + 1
    }(idx)

    idx2 := strings.Index(s.dir[s.idx+idx+1:], "/")
    if idx2 == -1 {
        return s.dir[:idx+s.idx], s.dir
    }
    return s.dir[:idx+s.idx], s.dir[:idx+idx2+s.idx+1]
}

func (d *Dir) getObjectByPath(path string) fs.DirEntry {
    var dirName string
    idx := strings.LastIndex(path, "/")
    if idx != -1 {
        dirName = path[:idx]
    }
    dir := d.GetDirByName(dirName)
    if dir == nil {
        return nil
    }
    return dir.objs[path]
}

func newDirStructHolder(listF listDirFn) *dirTreeHolder {
    return &dirTreeHolder{
        listF:      listF,
        root:       nil,
        dirsInWork: make(map[string]*sync.WaitGroup),
    }
}

func (l *dirTreeHolder) getObjectByPath(path string) fs.DirEntry {
    l.mu.Lock()
    defer l.mu.Unlock()
    if l.root == nil {
        return nil
    }
    return l.root.getObjectByPath(path)
}

func (l *dirTreeHolder) isDirExist(path string) bool {
    l.mu.Lock()
    defer l.mu.Unlock()
    if l.root == nil {
        return false
    }
    return l.root.GetDirByName(path) != nil
}

func (l *dirTreeHolder) updateDirList(ctx context.Context, dir string) error {
    l.mu.Lock()
    iter := newStrDirIter(dir)
    aborting := func() bool {
        select {
        case <-ctx.Done():
            return true
        default:
            return false
        }
    }

    // Special case: even if an empty dir is passed, listing the root folder is still required
    firstRun := true
    for {
        if aborting() {
            l.mu.Unlock()
            return ctx.Err()
        }
        curDir, nextDir := iter.next()
        if nextDir == "" && !firstRun {
            l.mu.Unlock()
            return nil
        }
        firstRun = false
        if wg, ok := l.dirsInWork[curDir]; ok {
            l.mu.Unlock()
            wg.Wait()
            if aborting() {
                return ctx.Err()
            }
            l.mu.Lock()
        }
        if curDir == "" && l.root == nil {
            if wg, ok := l.dirsInWork[curDir]; ok {
                l.mu.Unlock()
                wg.Wait()
                if aborting() {
                    return ctx.Err()
                }
                l.mu.Lock()
                dir := l.root.GetDirByName(curDir)
                if dir == nil {
                    l.mu.Unlock()
                    return fmt.Errorf("error while updating info about dir %s", nextDir)
                }
            } else {
                wg := &sync.WaitGroup{}
                wg.Add(1)
                l.dirsInWork[curDir] = wg
                l.mu.Unlock()
                d, err := l.listF(curDir)

                if err != nil {
                    if !errors.Is(err, fs.ErrorDirNotFound) {
                        return err
                    }
                    d = NewDir(curDir)
                }
                l.mu.Lock()
                l.root = d
                wg = l.dirsInWork[curDir]
                delete(l.dirsInWork, curDir)
                wg.Done()
            }
        }
        d := l.root.GetDirByName(curDir)
        if d == nil {
            l.mu.Unlock()
            return errors.New("directory tree is inconsistent: nowhere to go")
        }
        if _, ok := d.dirs[nextDir]; ok {
            continue
        }
        if o, ok := d.objs[nextDir]; ok {
            // The entry exists but is not a directory
            if _, ok := o.(fs.Directory); !ok {
                l.mu.Unlock()
                return nil
            }

            if wg, ok := l.dirsInWork[nextDir]; ok {
                l.mu.Unlock()
                wg.Wait()
                if aborting() {
                    return ctx.Err()
                }
                l.mu.Lock()
                dir := d.GetDirByName(nextDir)
                if dir == nil {
                    l.mu.Unlock()
                    return fmt.Errorf("error while updating info about dir %s", nextDir)
                }
            } else {
                wg := &sync.WaitGroup{}
                wg.Add(1)
                l.dirsInWork[nextDir] = wg
                l.mu.Unlock()
                td, err := l.listF(nextDir)
                if err != nil {
                    return err
                }
                l.mu.Lock()
                d.dirs[nextDir] = td
                wg = l.dirsInWork[nextDir]
                delete(l.dirsInWork, nextDir)
                wg.Done()
            }
        } else {
            l.mu.Unlock()
            return nil
        }
    }
}
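To make the iterator's contract concrete, here is a small illustrative sketch (not part of the commit; since strDirIter is unexported it would have to live in package extract, e.g. next to dirtreeholder_test.go, which already imports fmt). It prints the (cur, next) pairs produced while walking a path from the root down:

// Example_strDirIter demonstrates the (cur, next) pairs yielded for "a/b/c".
// Hypothetical example, shown only to document the iterator's behaviour.
func Example_strDirIter() {
    it := newStrDirIter("a/b/c")
    for {
        cur, next := it.next()
        fmt.Printf("cur=%q next=%q\n", cur, next)
        if next == "" {
            break
        }
    }
    // Output:
    // cur="" next="a"
    // cur="a" next="a/b"
    // cur="a/b" next="a/b/c"
    // cur="a/b/c" next=""
}

updateDirList relies on exactly this sequence: each step lists next under cur at most once, with the dirsInWork wait groups de-duplicating concurrent listings of the same directory.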
fs/extract/dirtreeholder_test.go (new file, 371 lines)
@@ -0,0 +1,371 @@
package extract

import (
    "context"
    "fmt"
    "os"
    "path/filepath"
    "sort"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    "github.com/rclone/rclone/fs"
    "github.com/rclone/rclone/fs/extract/tarcache/tartest"
    "github.com/rclone/rclone/fs/hash"
    "github.com/stretchr/testify/require"
    "golang.org/x/sync/errgroup"
)

type dirEntryForTest struct {
    name string
}

func (d dirEntryForTest) Fs() fs.Info {
    return nil
}

func (d dirEntryForTest) String() string {
    return d.name
}

func (d dirEntryForTest) Remote() string {
    return d.name
}

func (d dirEntryForTest) ModTime(context.Context) time.Time {
    return time.Now()
}

func (d dirEntryForTest) Size() int64 {
    return -1
}

type dirForTest struct {
    dirEntryForTest
}

func (d dirForTest) Items() int64 {
    return -1
}

func (d dirForTest) ID() string {
    return ""
}

type objectForTest struct {
    dirEntryForTest
}

func (o objectForTest) Hash(context.Context, hash.Type) (string, error) {
    return "", nil
}

func (o objectForTest) Storable() bool {
    return true
}

func createDirectory(dirName string) fs.DirEntry {
    return &dirForTest{
        dirEntryForTest{name: dirName},
    }
}

func createObject(dirName string) fs.DirEntry {
    return &objectForTest{
        dirEntryForTest{name: dirName},
    }
}

func createDirTree(dir *Dir, deep, minItemsInDir, maxItemsInDir int) {
    if deep <= 0 {
        return
    }
    rnd := tartest.NewRandomizer()

    nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir

    for j := 0; j < nItem; j++ {
        isFile := deep == 1 || rnd.Dice(50)
        var prefix string
        if len(dir.name) > 0 {
            prefix = dir.name + "/"
        }
        if isFile {
            name := fmt.Sprintf("%sfile_%s", prefix, rnd.RandomFileName())
            newItem := createObject(name)
            dir.objs[name] = newItem
        } else {
            name := fmt.Sprintf("%sdir_%s", prefix, rnd.RandomFileName())
            newItem := createDirectory(name)
            dir.objs[name] = newItem
            childDir := NewDir(name)
            createDirTree(childDir, deep-1, minItemsInDir, maxItemsInDir)
            if len(childDir.dirs) != 0 || len(childDir.objs) != 0 {
                dir.dirs[childDir.name] = childDir
            }
        }
    }
}

type listFnHelper struct {
    mu   sync.Mutex
    cnt  atomic.Uint64
    root *Dir
}

func (l *listFnHelper) Cnt() uint64 {
    return l.cnt.Load()
}

func (l *listFnHelper) ResetCnt() {
    l.cnt.Store(0)
}

func (l *listFnHelper) listDir(dir string) (*Dir, error) {
    l.cnt.Add(1)
    l.mu.Lock()
    defer l.mu.Unlock()
    d := l.root.GetDirByName(dir)
    if d == nil {
        return nil, os.ErrNotExist
    }
    newDir := NewDir(d.name)
    for _, child := range d.objs {
        newDir.AddChildObject(child)
    }
    return newDir, nil
}

func fillSliceWithDirsInfo(s *[]string, dir *Dir) {
    for path, child := range dir.objs {
        if _, ok := child.(fs.Directory); ok {
            *s = append(*s, path)
        }
    }
    for _, child := range dir.dirs {
        fillSliceWithDirsInfo(s, child)
    }
}

func fillMapWithFilesInfo(m map[string][]string, dir *Dir) {
    files := make([]string, 0)
    for path, child := range dir.objs {
        if _, ok := child.(fs.ObjectInfo); ok {
            files = append(files, filepath.Base(path))
        }
    }

    m[dir.Name()] = files

    for _, child := range dir.dirs {
        fillMapWithFilesInfo(m, child)
    }
}

func TestListing(t *testing.T) {
    t.Run("Dir struct holder", testDirTreeHolder)

    t.Run("Dir", testDir)

    t.Run("String directory iterating", testStrDirIter)
}

func testDirTreeHolder(t *testing.T) {
    root := NewDir("")
    createDirTree(root, 6, 5, 10)
    listF := &listFnHelper{
        root: root,
    }
    t.Run("Concurrent listing", func(t *testing.T) {
        s := make([]string, 0)
        m := make(map[string][]string)
        fillSliceWithDirsInfo(&s, root)
        fillMapWithFilesInfo(m, root)
        sort.Slice(s, func(i, j int) bool {
            return len(s[i]) < len(s[j])
        })
        require.NotNil(t, root)
        holder := newDirStructHolder(listF.listDir)
        halfLen := len(s) / 2
        path := s[halfLen]
        expectedCnt := 0
        dIter := newStrDirIter(path)
        for {
            expectedCnt++
            _, next := dIter.next()
            if next == "" {
                break
            }
        }
        require.NotNil(t, holder)

        eg, ctx := errgroup.WithContext(context.Background())
        eg.Go(func() error {
            return holder.updateDirList(ctx, path)
        })
        eg.Go(func() error {
            return holder.updateDirList(ctx, path)
        })
        eg.Go(func() error {
            return holder.updateDirList(ctx, path)
        })
        eg.Go(func() error {
            return holder.updateDirList(ctx, path+"not.exists")
        })
        err := eg.Wait()
        require.NoError(t, err)
        require.Equal(t, uint64(expectedCnt), listF.Cnt())
        dIter = newStrDirIter(path)
        var cur, next string
        next = "1"
        for next != "" {
            cur, next = dIter.next()
            if next == "" {
                break
            }
            require.True(t, holder.isDirExist(cur))
            files, ok := m[cur]
            require.True(t, ok)
            for _, file := range files {
                var filePath string
                if cur == "" {
                    filePath = file
                } else {
                    filePath = cur + "/" + file
                }
                obj := holder.getObjectByPath(filePath)
                require.NotNil(t, obj)

                require.Equal(t, filePath, obj.Remote())
            }
        }
    })
}

func testDir(t *testing.T) {
    finalDir := "path/with/more/than/one/dir"
    files := map[string][]string{
        "":                            {"file0.1.ext", "file0.2.ext"},
        "path":                        {"file1.1.ext", "file1.2.ext"},
        "path/with/more/than/one":     {"file2.1.ext", "file2.2.ext"},
        "path/with/more/than/one/dir": {"file3.1.ext", "file3.2.ext"},
    }

    dirIter := newStrDirIter(finalDir)
    var root, prevDir *Dir
    next := "1"
    for next != "" {
        var cur string
        cur, next = dirIter.next()
        dir := NewDir(cur)
        if root == nil {
            root = dir
        }
        if files, ok := files[cur]; ok {
            for _, file := range files {
                var path string
                if cur == "" {
                    path = file
                } else {
                    path = cur + "/" + file
                }
                dir.AddChildObject(createObject(path))
            }
        }
        if prevDir != nil {
            prevDir.AddChildDir(dir)
            prevDir.AddChildObject(createDirectory(dir.Name()))
        }
        prevDir = dir
    }

    t.Run("GetDirByName", func(t *testing.T) {
        dirIter = newStrDirIter(finalDir)
        next = "1"
        cnt := 0
        for next != "" {
            var cur string
            cur, next = dirIter.next()
            dir := root.GetDirByName(cur)
            require.NotNil(t, dir)
            cnt++
        }
        require.Equal(t, 7, cnt)
    })

    t.Run("getObjectByPath", func(t *testing.T) {
        for dirName, fileNames := range files {
            for _, fileName := range fileNames {
                var filePath string
                if dirName == "" {
                    filePath = fileName
                } else {
                    filePath = dirName + "/" + fileName
                }
                obj := root.getObjectByPath(filePath)
                require.NotNil(t, obj)
                require.NotNil(t, obj.(fs.ObjectInfo))
                filePath += ".fail"
                obj = root.getObjectByPath(filePath)
                require.Nil(t, obj)
            }
        }
    })
}

func testStrDirIter(t *testing.T) {
    for _, tc := range []struct {
        name string
        dir  string
        res  []string
    }{
        {
            name: "path with more than one dir",
            dir:  "path/with/more/than/one/dir",
            res: []string{
                "",
                "path",
                "path/with",
                "path/with/more",
                "path/with/more/than",
                "path/with/more/than/one",
                "path/with/more/than/one/dir",
                "",
            },
        },
        {
            name: "path with one dir",
            dir:  "one_dir",
            res: []string{
                "",
                "one_dir",
                "",
            },
        },
        {
            name: "empty path",
            dir:  "",
            res: []string{
                "",
                "",
            },
        },
    } {
        t.Run(tc.name, func(t *testing.T) {
            l := newStrDirIter(tc.dir)
            for i, p := range tc.res {
                if p == "" && i > 0 {
                    break
                }
                cur, next := l.next()
                require.Equal(t, cur, p)
                require.Equal(t, next, tc.res[i+1])
            }
        })
    }
}
fs/extract/extract.go (new file, 845 lines)
@@ -0,0 +1,845 @@
// Package extract is the implementation of the extract command
package extract

import (
    "archive/tar"
    "bytes"
    "compress/gzip"
    "context"
    "errors"
    "fmt"
    "io"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/rclone/rclone/fs"
    "github.com/rclone/rclone/fs/extract/tarcache"
    "github.com/rclone/rclone/fs/filter"
    "github.com/rclone/rclone/fs/hash"
    "github.com/rclone/rclone/fs/list"
    "github.com/rclone/rclone/fs/operations"
)

type payloadType int

const (
    payloadTypeBytes payloadType = iota
    payloadTypeIo
)

// TarCacheFile is a wrapper around tarcache.FileInfo that adds support for file reopening functionality
type TarCacheFile struct {
    owner    *TarCache
    pt       payloadType
    fileInfo tarcache.FileInfo
    reader   io.Reader
}

// Release releases the resources associated with the file
func (t *TarCacheFile) Release() {
    t.fileInfo.ReleaseFn()
}

// Read implements the io.Reader interface
func (t *TarCacheFile) Read(p []byte) (n int, err error) {
    return t.reader.Read(p)
}

// reopenIo implements file reopening for files that don't fit into memory
func (t *TarCacheFile) reopenIo(ctx context.Context) (err error) {
    return t.owner.requestReopenFile(ctx, t)
}

// reopenBytes implements file reopening for files that fit into memory
func (t *TarCacheFile) reopenBytes(ctx context.Context) (err error) {
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
        t.reader = bytes.NewReader(t.fileInfo.Payload)
        return nil
    }
}

// Reopen initializes the file reopening process
func (t *TarCacheFile) Reopen(ctx context.Context) (err error) {
    if t.pt == payloadTypeBytes {
        return t.reopenBytes(ctx)
    }
    return t.reopenIo(ctx)
}

func (t *TarCache) requestReopenFile(ctx context.Context, file *TarCacheFile) error {
    reopenReq := reopenRequest{
        file:   file,
        respCh: make(chan error, 1),
    }

    select {
    case t.reopenReqCh <- reopenReq:
    case <-ctx.Done():
        return ctx.Err()
    case <-t.finishCh:
        return errTarCacheIsClosed
    }

    select {
    case err := <-reopenReq.respCh:
        return err
    case <-ctx.Done():
        return ctx.Err()
    case <-t.finishCh:
        return errTarCacheIsClosed
    }
}

// newTarCacheFile creates a new TarCacheFile from the given tarcache.FileInfo
func (t *TarCache) newTarCacheFile(info tarcache.FileInfo) *TarCacheFile {
    res := &TarCacheFile{
        owner:    t,
        pt:       payloadTypeIo,
        fileInfo: info,
        reader:   info.IoPayload,
    }
    // If IoPayload is nil, we assume that the file is small enough to fit into memory
    if info.IoPayload == nil {
        res.pt = payloadTypeBytes
        res.reader = bytes.NewReader(info.Payload)
    }
    return res
}

// TarCache implements the functionality of reopening files within an archive
type TarCache struct {
    // parameters
    fsrc                fs.Fs
    srcFileName         string
    newTarReaderFn      newTarReader
    transfers           int
    maxFileSizeForCache int
    options             []fs.OpenOption

    // non-thread-safe internal state
    closeStreamFn     func() error
    tarCache          *tarcache.Cache
    workerCh          chan func()
    reopenReqCh       chan reopenRequest
    finishCh          chan struct{}
    pendingCloseCache []*tarcache.Cache

    // thread-safe internal state
    mu    sync.Mutex
    cache map[string]tarcache.FileInfo
}

func newTarCache(fsrc fs.Fs, srcFileName string, newTarReaderFn newTarReader, transfers int, maxFileSizeForCache int) *TarCache {
    res := &TarCache{
        fsrc:                fsrc,
        srcFileName:         srcFileName,
        newTarReaderFn:      newTarReaderFn,
        transfers:           transfers,
        maxFileSizeForCache: maxFileSizeForCache,
        workerCh:            make(chan func()),
        reopenReqCh:         make(chan reopenRequest),
        finishCh:            make(chan struct{}),
        cache:               make(map[string]tarcache.FileInfo),
    }

    if res.newTarReaderFn == nil {
        res.newTarReaderFn = func(reader io.Reader) tarcache.TarReader {
            return tar.NewReader(reader)
        }
    }
    // Run a goroutine that will handle functions waiting for file reopen requests
    go func() {
        for workFn := range res.workerCh {
            workFn()
        }
    }()
    return res
}

// Close closes the tar cache and releases all resources associated with it
func (t *TarCache) Close() error {
    close(t.workerCh)
    err := t.closeHelper()
    if t.tarCache != nil {
        _ = t.tarCache.Close()
    }
    return err
}

type reopenRequest struct {
    file   *TarCacheFile
    respCh chan error
}

var errReopenArchiveRequest = errors.New("reopen archive request")
var errTarCacheIsClosed = errors.New("tar cache is closed")

func (t *TarCache) handleReopenRequest(ctx context.Context, reopenReq reopenRequest) {
    defer close(reopenReq.respCh)
    t.pendingCloseCache = append(t.pendingCloseCache, t.tarCache)
    // Reopening a file in the archive requires closing the currently open archive first
    _ = t.closeHelper()
    // Release the resources occupied by the file
    reopenReq.file.Release()
    var err error
    prev := t.tarCache
    // Close the underlying tarcache.Cache
    _ = prev.Close()
    // Create a new tarcache.Cache and a function that closes the underlying gzip reader
    t.tarCache, t.closeStreamFn, err = t.openHelper(ctx)
    if err != nil {
        // Notify the goroutine that requested the file reopening about the error
        reopenReq.respCh <- err
        return
    }
    // Search for the required file within the archive
    err = t.FindFile(ctx, reopenReq.file.fileInfo.FileInfo.FilePath)
    if err == nil {
        // Create a new TarCacheFile from the found FileInfo
        file := t.GetFile(reopenReq.file.fileInfo.FileInfo.FilePath)
        // If the previous call to FindFile completed without errors, then GetFile must return a non-nil TarCacheFile
        if file == nil {
            panic("shouldn't get here")
        }
        *reopenReq.file = *file
    }
    // Notify the goroutine that requested the file reopening about the result
    reopenReq.respCh <- err
}

// FindFile searches for the file with the given filePath within the archive
func (t *TarCache) FindFile(ctx context.Context, filePath string) error {
    for {
        select {
        case <-t.finishCh:
            return errTarCacheIsClosed
        default:
        }
        // Iterate through the archive files until the target file is found
        h, err := t.tarCache.Next(ctx)
        if err != nil {
            return err
        }
        if h.FileInfo.FilePath != filePath {
            // If the file is not the target, free its resources right away
            h.ReleaseFn()
            continue
        }

        t.mu.Lock()
        // Store the information about the file in the cache
        t.cache[h.FileInfo.FilePath] = h
        t.mu.Unlock()
        return nil
    }
}

// GetFile constructs a TarCacheFile from cached data, relying on the file path
func (t *TarCache) GetFile(filePath string) *TarCacheFile {
    t.mu.Lock()
    defer t.mu.Unlock()
    info, ok := t.cache[filePath]
    if !ok {
        return nil
    }
    return t.newTarCacheFile(info)
}

// ReleaseUnprocessedFiles releases resources occupied by files that were not processed
func (t *TarCache) ReleaseUnprocessedFiles() {
    t.mu.Lock()
    // Here we are preparing a list of functions that release resources, because we cannot call them when
    // iterating over the map, since the ReleaseFn method also uses the same mutex that protects the map.
    releaseFns := make([]func(), 0, len(t.cache))
    for _, info := range t.cache {
        releaseFns = append(releaseFns, info.ReleaseFn)
    }
    t.cache = make(map[string]tarcache.FileInfo)
    t.mu.Unlock()
    for _, fn := range releaseFns {
        fn()
    }
}

// Next advances to the next file in the archive, caches its information, and returns the file's path within the archive.
// This method also handles file reopen requests within the archive
func (t *TarCache) Next(ctx context.Context) (string, error) {
    select {
    case <-t.finishCh:
        return "", errTarCacheIsClosed
    default:
    }
    for {
        watchdog := make(chan struct{})
        var reopenReq reopenRequest
        ctx2, cancel := context.WithCancelCause(ctx)
        var wg sync.WaitGroup
        wg.Add(1)
        t.workerCh <- func() {
            defer wg.Done()
            select {
            case <-watchdog:
            case reopenReq = <-t.reopenReqCh:
                // When a file reopen request arrives, it is necessary to interrupt the execution of t.tarCache.Next
                cancel(errReopenArchiveRequest)
            }
        }
        h, err := t.tarCache.Next(ctx2)
        relFn := h.ReleaseFn
        // Create a wrapper around the resource release function to additionally remove file information from the cache
        h.ReleaseFn = func() {
            t.mu.Lock()
            delete(t.cache, h.FileInfo.FilePath)
            t.mu.Unlock()
            relFn()
        }
        // Close the watchdog channel to exit from the watchdog function
        close(watchdog)
        // Wait for the watchdog function to complete its execution
        wg.Wait()
        for _, cache := range t.pendingCloseCache {
            _ = cache.Close()
        }
        t.pendingCloseCache = nil
        if err != nil {
            // When the error is triggered by a file reopen request, process the request accordingly
            if errors.Is(context.Cause(ctx2), errReopenArchiveRequest) {
                t.handleReopenRequest(ctx, reopenReq)
                continue
            }
            cancel(nil)
            close(t.finishCh)
            if err == io.EOF {
                return "", nil
            }

            return "", err
        }
        cancel(nil)

        t.mu.Lock()
        _, ok := t.cache[h.FileInfo.FilePath]
        if ok {
            panic("shouldn't get here")
        }
        t.cache[h.FileInfo.FilePath] = h
        t.mu.Unlock()
        return h.FileInfo.FilePath, nil
    }
}

func (t *TarCache) closeHelper() error {
    if t.closeStreamFn != nil {
        fn := t.closeStreamFn
        t.closeStreamFn = nil
        return fn()
    }
    return nil
}

// openHelper opens a new stream to the tar archive
func (t *TarCache) openHelper(ctx context.Context) (cache *tarcache.Cache, closeFn func() error, err error) {
    obj, err := t.fsrc.NewObject(ctx, t.srcFileName)
    if err != nil {
        return nil, nil, err
    }

    reader, err := obj.Open(ctx, t.options...)

    if err != nil {
        return nil, nil, err
    }

    gzr, err := gzip.NewReader(reader)
    if err != nil {
        // we don't care about the error, we just need to close the reader
        _ = reader.Close()
        return nil, nil, err
    }

    tarReader := t.newTarReaderFn(gzr)
    cache = tarcache.NewTarCache(tarReader, t.transfers, int64(t.maxFileSizeForCache))
    return cache, func() error {
        gzErr := gzr.Close()
        readerErr := reader.Close()
        if gzErr != nil {
            return gzErr
        }
        return readerErr
    }, nil
}

// Open opens the tar archive from the remote
func (t *TarCache) Open(ctx context.Context, options ...fs.OpenOption) error {
    t.options = options
    var err error
    t.tarCache, t.closeStreamFn, err = t.openHelper(ctx)
    return err
}

type copyJob struct {
    src  fs.Object
    dst  fs.Object
    path string
}

func makeListDir(ctx context.Context, f fs.Fs) listDirFn {
    return func(dir string) (*Dir, error) {
        dirCtx := filter.SetUseFilter(ctx, f.Features().FilterAware) // make filter-aware backends constrain List
        entries, err := list.DirSorted(dirCtx, f, false, dir)
        if err != nil {
            return nil, err
        }
        res := NewDir(dir)
        for _, entry := range entries {
            res.AddChildObject(entry)
        }
        return res, nil
    }
}

// extractObj implements the fs.Object interface
type extractObj struct {
    ownerFs     fs.Fs
    isFirstOpen atomic.Bool
    file        *TarCacheFile
}

func (e *extractFs) newExtractObj(file *TarCacheFile) *extractObj {
    res := &extractObj{
        ownerFs: e,
        file:    file,
    }
    res.isFirstOpen.Store(true)
    return res
}

// Release releases the resources associated with the object
func (o *extractObj) Release() {
    o.file.Release()
}

// extractFs implements the fs.Fs interface
type extractFs struct {
    // parameters
    fsrc        fs.Fs
    srcFileName string
    features    *fs.Features
    // internal state
    tarCache *TarCache

    mu        sync.Mutex
    tarFormat tar.Format
}

type extract struct {
    fdst                fs.Fs
    fsrc                fs.Fs
    extrFs              *extractFs
    srcFileName         string
    maxFileSizeForCache int
    hashType            hash.Type        // common hash to use
    hashOption          *fs.HashesOption // open option for the common hash
    // internal state
    ci                 *fs.ConfigInfo  // global config
    ctx                context.Context // internal context for controlling go-routines
    cancel             func()          // cancel the context
    maxDurationEndTime time.Time       // end time if --max-duration is set
    toBeChecked        chan string     // checkers channel
    checkersWg         sync.WaitGroup  // wait for checkers
    toBeUploaded       chan copyJob    // copiers channels
    transfersWg        sync.WaitGroup  // wait for transfers
    newTarReaderFn     newTarReader

    mu        sync.Mutex
    lastError error
    errCnt    atomic.Int32
    dsh       *dirTreeHolder // destination directory tree holder
}

// Fs returns read only access to the Fs that this object is part of
func (o *extractObj) Fs() fs.Info {
    return o.ownerFs
}

// String returns a description of the Object
func (o *extractObj) String() string {
    if o == nil {
        return "<nil>"
    }
    return o.Remote()
}

// Remote returns the remote path
func (o *extractObj) Remote() string {
    return o.file.fileInfo.FileInfo.FilePath
}

// ModTime returns the modification date of the file
// It should return a best guess if one isn't available
func (o *extractObj) ModTime(_ context.Context) time.Time {
    return o.file.fileInfo.FileInfo.ModTime
}

// Size returns the size of the object
func (o *extractObj) Size() int64 {
    return o.file.fileInfo.FileInfo.Size
}

// Hash returns the selected checksum of the file
// If no checksum is available it returns ""
func (o *extractObj) Hash(_ context.Context, _ hash.Type) (string, error) {
    return "", nil
}

// Storable says whether this object can be stored
func (o *extractObj) Storable() bool {
    return true
}

// Open opens the file for read. Call Close() on the returned io.ReadCloser
func (o *extractObj) Open(ctx context.Context, _ ...fs.OpenOption) (io.ReadCloser, error) {
    if o.isFirstOpen.Load() {
        o.isFirstOpen.Store(false)
        return io.NopCloser(o.file), nil
    }
    err := o.file.Reopen(ctx)
    return io.NopCloser(o.file), err
}

func (o *extractObj) SetModTime(context.Context, time.Time) error {
    panic("not implemented")
}

func (o *extractObj) Update(context.Context, io.Reader, fs.ObjectInfo, ...fs.OpenOption) error {
    panic("not implemented")
}

func (o *extractObj) Remove(context.Context) error {
    panic("not implemented")
}

func (e *extractFs) List(context.Context, string) (fs.DirEntries, error) {
    panic("not implemented")
}

func (e *extractFs) Put(context.Context, io.Reader, fs.ObjectInfo, ...fs.OpenOption) (fs.Object, error) {
    panic("not implemented")
}

func (e *extractFs) Mkdir(context.Context, string) error {
    panic("not implemented")
}

func (e *extractFs) Rmdir(context.Context, string) error {
    panic("not implemented")
}

func (e *extractFs) Name() string {
    return "extract"
}

// Root of the remote
func (e *extractFs) Root() string {
    return fmt.Sprintf("%s:%s/%s", e.fsrc.Name(), e.fsrc.Root(), e.srcFileName)
}

// String returns a description of the FS
func (e *extractFs) String() string {
    return fmt.Sprintf("tar.gz archive %s at %s", e.Name(), e.Root())
}

// Precision of the ModTimes in this Fs
func (e *extractFs) Precision() time.Duration {
    e.mu.Lock()
    defer e.mu.Unlock()
    if e.tarFormat == tar.FormatPAX {
        return time.Millisecond
    }
    return time.Second
}

// Hashes returns the supported hash types of the filesystem
func (e *extractFs) Hashes() hash.Set {
    return hash.Set(hash.None)
}

// Features returns the optional features of this Fs
func (e *extractFs) Features() *fs.Features {
    return e.features
}

// NewObject finds the Object at remote. If it can't be found
// it returns the error ErrorObjectNotFound.
func (e *extractFs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
    if ctx.Err() != nil {
        return nil, ctx.Err()
    }

    file := e.tarCache.GetFile(remote)
    if file == nil {
        panic("cache MUST contain information about this file")
    }
    e.mu.Lock()
    if e.tarFormat == tar.FormatUnknown {
        e.tarFormat = file.fileInfo.FileInfo.Format
    }
    e.mu.Unlock()
    return e.newExtractObj(file), nil
}

type newTarReader func(io.Reader) tarcache.TarReader

func newExtract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string, maxFileSizeForCache int, newTarReaderFn newTarReader) *extract {
    ci := fs.GetConfig(ctx)

    s := &extract{
        ctx:                 ctx,
        ci:                  ci,
        fdst:                fdst,
        fsrc:                fsrc,
        srcFileName:         srcFileName,
        maxFileSizeForCache: maxFileSizeForCache,
        newTarReaderFn:      newTarReaderFn,
        toBeChecked:         make(chan string, ci.Transfers),
        toBeUploaded:        make(chan copyJob, ci.Transfers),
    }
    if ci.MaxDuration > 0 {
        s.maxDurationEndTime = time.Now().Add(ci.MaxDuration)
        fs.Infof(s.fdst, "Transfer session %v deadline: %s", ci.CutoffMode, s.maxDurationEndTime.Format("2006/01/02 15:04:05"))
    }

    // If a max session duration has been defined add a deadline
    // to the main context if cutoff mode is hard. This will cut
    // the transfers off.
    if !s.maxDurationEndTime.IsZero() && ci.CutoffMode == fs.CutoffModeHard {
        s.ctx, s.cancel = context.WithDeadline(ctx, s.maxDurationEndTime)
    } else {
        s.ctx, s.cancel = context.WithCancel(ctx)
    }
    return s
}

// transfer copies job.src to job.path.
func (e *extract) transfer(ctx context.Context, in <-chan copyJob) {
    defer e.transfersWg.Done()
    for {
        select {
        case <-ctx.Done():
            return
        case job, ok := <-in:
            if !ok {
                return
            }
            obj := job.src.(*extractObj)

            var err error

            needTransfer := true
            if job.dst != nil {
                needTransfer = operations.NeedTransfer(e.ctx, job.dst, job.src)
            }

            if needTransfer {
                _, err = operations.Copy(ctx, e.fdst, job.dst, job.path, job.src)
            }
            obj.Release()

            if err != nil {
                e.errCnt.Add(1)
                e.setLastError(err)
                e.cancel()
                return
            }
        }
    }
}

// runTransfers starts transfers
func (e *extract) runTransfers(ctx context.Context, in <-chan copyJob) {
    for i := 0; i < e.ci.Transfers; i++ {
        e.transfersWg.Add(1)
        go e.transfer(ctx, in)
    }
}

// stopTransfers stops all transfers and waits for them to finish
func (e *extract) stopTransfers() {
    close(e.toBeUploaded)
    fs.Debugf(e.fdst, "Waiting for transfers to finish")
    e.transfersWg.Wait()
}

func (e *extract) setLastError(err error) {
    e.mu.Lock()
    defer e.mu.Unlock()
    if e.lastError == nil {
        e.lastError = err
    }
}

func (e *extract) getLastError() error {
    e.mu.Lock()
    defer e.mu.Unlock()
    return e.lastError
}

// checker checks if filePath exists in fdst and, depending on the result, prepares the task for transfer
func (e *extract) checker(ctx context.Context, in <-chan string, out chan<- copyJob) {
    defer e.checkersWg.Done()
    for {
        var filePath string
        var ok bool

        select {
        case <-ctx.Done():
            return
        case filePath, ok = <-in:
            if !ok {
                return
            }
        }

        idx := strings.LastIndex(filePath, "/")
        var dir string
        if idx != -1 {
            dir = filePath[:idx]
        }
        err := e.dsh.updateDirList(ctx, dir)
        if err != nil {
            // TODO: think about retries
            e.cancel()
            return
        }
        src, err := e.extrFs.NewObject(ctx, filePath)
        if err != nil {
            e.setLastError(err)
            e.cancel()
            return
        }

        job := copyJob{
            src:  src,
            path: filePath,
        }
        if objInfo := e.dsh.getObjectByPath(filePath); objInfo != nil {
            if dst, ok := objInfo.(fs.Object); ok {
                // When a file with the same name as in the archive is found at the destination,
                // modify the copyJob to ensure operations.Copy uses Update instead of Put
                job.dst = dst
            }
        }

        out <- job
    }
}

// runCheckers starts checkers
func (e *extract) runCheckers(ctx context.Context, in <-chan string, out chan<- copyJob) {
    for i := 0; i < e.ci.Transfers; i++ {
        e.checkersWg.Add(1)
        go e.checker(ctx, in, out)
    }
}

// stopCheckers stops all checkers and waits for them to finish
func (e *extract) stopCheckers() {
    close(e.toBeChecked)
    fs.Debugf(e.fdst, "Waiting for checkers to finish")
    e.checkersWg.Wait()
}

func newExtractFs(ctx context.Context, fsrc fs.Fs, srcFileName string, transfers int, maxFileSizeForCache int, newTarReaderFn newTarReader) *extractFs {
    res := &extractFs{
        fsrc:        fsrc,
        srcFileName: srcFileName,
        tarCache:    newTarCache(fsrc, srcFileName, newTarReaderFn, transfers, maxFileSizeForCache),
        tarFormat:   tar.FormatUnknown,
    }
    res.features = (&fs.Features{
        CanHaveEmptyDirectories: false,
    }).Fill(ctx, res)
    return res
}

// Open opens an archive from the source filesystem
func (e *extractFs) Open(ctx context.Context, options ...fs.OpenOption) error {
    return e.tarCache.Open(ctx, options...)
}

func (e *extractFs) close() error {
    return e.tarCache.Close()
}

func (e *extractFs) AddNextObject(ctx context.Context) (string, error) {
    return e.tarCache.Next(ctx)
}

func (e *extract) run() error {
    fExtr := newExtractFs(e.ctx, e.fsrc, e.srcFileName, e.ci.Transfers, e.maxFileSizeForCache, e.newTarReaderFn)

    e.dsh = newDirStructHolder(makeListDir(e.ctx, e.fdst))
    e.extrFs = fExtr

    e.hashType, e.hashOption = operations.CommonHash(e.ctx, fExtr, e.fdst)

    // Options for the download
    downloadOptions := []fs.OpenOption{e.hashOption}
    for _, option := range e.ci.DownloadHeaders {
        downloadOptions = append(downloadOptions, option)
    }

    err := fExtr.Open(e.ctx, downloadOptions...)

    if err != nil {
        return err
    }

    e.toBeChecked = make(chan string, e.ci.Transfers)
    e.toBeUploaded = make(chan copyJob, e.ci.Transfers)

    e.runTransfers(e.ctx, e.toBeUploaded)
    e.runCheckers(e.ctx, e.toBeChecked, e.toBeUploaded)

    var exitErr error

    for {
        path, err := fExtr.AddNextObject(e.ctx)
        if err != nil {
            exitErr = err
            break
        }
        if path == "" {
            break
        }

        e.toBeChecked <- path
    }

    e.stopCheckers()
    e.stopTransfers()
    fExtr.tarCache.ReleaseUnprocessedFiles()
    if err := fExtr.close(); err != nil {
        if exitErr == nil {
            exitErr = err
        }
    }

    lastError := e.getLastError()
    if lastError != nil {
        return lastError
    }
    return exitErr
}

// Extract implements the extract command
func Extract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string, maxFileSizeForCache int) error {
    do := newExtract(ctx, fdst, fsrc, srcFileName, maxFileSizeForCache, nil)
    return do.run()
}
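A minimal sketch of driving this API programmatically (illustrative, not part of the commit; the remote strings are hypothetical and fs.NewFs resolves them against the loaded rclone config):

package main

import (
    "context"
    "log"

    _ "github.com/rclone/rclone/backend/all" // register backends
    "github.com/rclone/rclone/fs"
    "github.com/rclone/rclone/fs/extract"
)

func main() {
    ctx := context.Background()
    fsrc, err := fs.NewFs(ctx, "src:backups")
    if err != nil {
        log.Fatal(err)
    }
    fdst, err := fs.NewFs(ctx, "dst:unpacked")
    if err != nil {
        log.Fatal(err)
    }
    // 0x100000 mirrors the command's default --max-file-size-for-cache (1 MiB).
    if err := extract.Extract(ctx, fdst, fsrc, "archive.tar.gz", 0x100000); err != nil {
        log.Fatal(err)
    }
}

Internally this starts ci.Transfers checker and transfer goroutines, so files small enough for the cache are copied in parallel while the archive stream is consumed sequentially, reopening it only when a large streamed file has to be retried.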
fs/extract/extract_test.go (new file, 493 lines)
@@ -0,0 +1,493 @@
package extract

import (
    "archive/tar"
    "bytes"
    "context"
    "fmt"
    "io"
    "math"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    "github.com/rclone/rclone/backend/memory"
    "github.com/rclone/rclone/fs"
    "github.com/rclone/rclone/fs/extract/tarcache"
    "github.com/rclone/rclone/fs/extract/tarcache/tartest"
    "github.com/rclone/rclone/fs/fserrors"
    "github.com/rclone/rclone/fs/hash"
    "github.com/rclone/rclone/fstest/mockobject"
    "github.com/stretchr/testify/require"
)

type objInfo struct {
    info    fs.Fs
    remote  string
    modTime time.Time
    size    int64
}

func (o *objInfo) Fs() fs.Info {
    return o.info
}

func (o *objInfo) String() string {
    return o.remote
}

func (o *objInfo) Remote() string {
    return o.remote
}

func (o *objInfo) ModTime(context.Context) time.Time {
    return o.modTime
}

func (o *objInfo) Size() int64 {
    return o.size
}

func (o *objInfo) Hash(context.Context, hash.Type) (string, error) {
    return "", nil
}

func (o *objInfo) Storable() bool {
    return true
}

func newObjInfo(info fs.Fs, remote string, size int64) fs.ObjectInfo {
    rnd := tartest.NewRandomizer()
    offset := time.Millisecond * time.Duration(rnd.Intn(1000000))

    return &objInfo{info, remote, time.Now().Add(offset), size}
}

type FailObj struct {
    fs.Object
    fscr      *FailFs
    isFailObj bool
}

type FailFs struct {
    fs.Fs
    failFile   string
    maxFails   atomic.Int32
    totalFails atomic.Int32
    putCnt     atomic.Int32
    updateCnt  atomic.Int32
}

func (f *FailFs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) error {
    listR, ok := f.Fs.(fs.ListRer)
    if !ok {
        return fmt.Errorf("fail fs %s does not implement fs.ListRer", f.Fs)
    }
    return listR.ListR(ctx, dir, callback)
}

type FailReadCloser struct {
    io.ReadCloser
}

type FailTarReader struct {
    *tar.Reader
    failFile   string
    maxFails   atomic.Int32
    totalFails atomic.Int32

    currentHeader *tar.Header
    mu            sync.Mutex
}

func (f *FailTarReader) UpdateTarReader(r *tar.Reader) {
    f.mu.Lock()
    defer f.mu.Unlock()
    f.Reader = r
}

func (f *FailTarReader) tarReader() *tar.Reader {
    f.mu.Lock()
    defer f.mu.Unlock()
    return f.Reader
}

func (f *FailTarReader) Read(p []byte) (n int, err error) {
    if f.currentHeader == nil {
        return f.tarReader().Read(p)
    }
    if f.currentHeader.Name == f.failFile {
        if f.isNeedToFail() {
            f.decReadWriteFailAttempts()
            return 0, fserrors.RetryError(io.ErrNoProgress)
        }
    }
    return f.tarReader().Read(p)
}

func (f *FailTarReader) Next() (*tar.Header, error) {
    h, err := f.Reader.Next()
    if err != nil {
        return h, err
    }
    f.currentHeader = h
    return h, nil
}

func (f *FailTarReader) decReadWriteFailAttempts() {
    f.maxFails.Add(-1)
    f.totalFails.Add(1)
}

func (f *FailTarReader) isNeedToFail() bool {
    return f.maxFails.Load() > 0
}

func newFailTarReader(t *tar.Reader, failFile string, maxFails int) *FailTarReader {
    res := &FailTarReader{
        Reader:   t,
        failFile: failFile,
    }
    res.maxFails.Store(int32(maxFails))
    return res
}

func NewFailReadCloser(rc io.ReadCloser) *FailReadCloser {
    return &FailReadCloser{rc}
}

func (r *FailReadCloser) Read([]byte) (int, error) {
    return 0, fserrors.RetryError(io.ErrNoProgress)
}

func NewFailObj(f fs.Object, failFs *FailFs, isFailObj bool) *FailObj {
    return &FailObj{f, failFs, isFailObj}
}

func (o *FailObj) Fs() fs.Info {
    return o.fscr
}

func (o *FailObj) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
    rc, err := o.Object.Open(ctx, options...)
    if err != nil {
        return nil, err
    }
    if o.isFailObj && o.fscr.IsNeedToFail() {
        o.fscr.DecReadWriteFailAttempts()
        return NewFailReadCloser(rc), nil
    }

    return rc, nil
}

func (o *FailObj) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
    fFs, ok := o.Fs().(*FailFs)
    if !ok {
        panic("fail fs does not implement fail fs")
    }
    fFs.updateCnt.Add(1)
    if o.isFailObj && o.fscr.IsNeedToFail() {
        o.fscr.DecReadWriteFailAttempts()
        return fserrors.RetryError(io.ErrNoProgress)
    }
    return o.Object.Update(ctx, in, src, options...)
}

func NewFailFs(f fs.Fs, failFile string, maxFails int) *FailFs {
    res := &FailFs{Fs: f, failFile: failFile}
    res.maxFails.Store(int32(maxFails))
    return res
}

func (f *FailFs) IsNeedToFail() bool {
    return f.maxFails.Load() > 0
}

func (f *FailFs) TotalFails() int {
    return int(f.totalFails.Load())
}

func (f *FailFs) DecReadWriteFailAttempts() {
    f.maxFails.Add(-1)
    f.totalFails.Add(1)
}

func (f *FailFs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
    retFs, err := f.Fs.NewObject(ctx, remote)
    if err != nil {
        return nil, err
    }

    return NewFailObj(retFs, f, f.failFile == remote), nil
}

func (f *FailFs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
    entries, err := f.Fs.List(ctx, dir)
    for idx, entry := range entries {
        if obj, ok := entry.(fs.Object); ok {
            entries[idx] = NewFailObj(obj, f, f.failFile == obj.Remote())
        }
    }

    return entries, err
}

func (f *FailFs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
    f.putCnt.Add(1)
    if src.Remote() == f.failFile {
        if f.IsNeedToFail() {
            f.DecReadWriteFailAttempts()
            return nil, fserrors.RetryError(io.ErrNoProgress)
        }
    }
    return f.Fs.Put(ctx, in, src, options...)
}

func TestExtract(t *testing.T) {
    t.Run("Happy path", testHappyPath)

    t.Run("Double error while writing a small file to a remote server.", testFdstFailSmallFile)
    t.Run("Double error while writing a large file to a remote server.", testFdstFailLargeFile)

    t.Run("Multiple errors while writing a small file to a remote server.", testFdstFail2SmallFile)
    t.Run("Multiple errors while writing a large file to a remote server.", testFdstFail2LargeFile)

    t.Run("Double errors while reading a small file from an archive.", sdstFailSmallFile)
    t.Run("Double errors while reading a large file from an archive.", sdstFailLargeFile)
}

func helperTestFdstFail2(t *testing.T, isStream bool) {
    bigFileSize := 2000
    archiveFile := "archive.tar.gz"
    ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:dir%t", isStream), fmt.Sprintf("dst:dir%t", isStream), archiveFile, bigFileSize)

    skipFiles := 10
    filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream)
    require.NotEqual(t, "", filePath)
    expectedFails := 100
    newDstFs := NewFailFs(dstFs, filePath, expectedFails)
    // Extract archive to dst fs
    err := Extract(ctx, newDstFs, srcFs, archiveFile, bigFileSize)
    require.Error(t, err)
    require.Greater(t, newDstFs.TotalFails(), 2)
}

func testFdstFail2SmallFile(t *testing.T) {
    helperTestFdstFail2(t, false)
}

func testFdstFail2LargeFile(t *testing.T) {
    helperTestFdstFail2(t, true)
}

func helperTestFdstFail(t *testing.T, isStream bool) {
    bigFileSize := 2000
    archiveFile := "archive.tar.gz"
    ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:dir%t", isStream), fmt.Sprintf("dst:dir%t", isStream), archiveFile, bigFileSize)

    skipFiles := 10
    filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream)
    require.NotEqual(t, "", filePath)
    expectedFails := 2
    newDstFs := NewFailFs(dstFs, filePath, expectedFails)
    // Extract archive to dst fs
    err := Extract(ctx, newDstFs, srcFs, archiveFile, bigFileSize)
    require.NoError(t, err)
    require.Equal(t, expectedFails, newDstFs.TotalFails())
    ll := make(map[string]struct{})
    listRer := dstFs.(fs.ListRer)
    require.NotNil(t, listRer)
    err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
        for _, entry := range entries {
            obj, ok := entry.(fs.Object)
            if !ok {
                continue
            }
            ll[obj.Remote()] = struct{}{}
        }
        return nil
    })
    require.NoError(t, err)
    checkFunc(ctx, t, dstFs, ll, rootItem)

    require.Equal(t, 0, len(ll))
}

func sdstFailSmallFile(t *testing.T) {
    sdstFailHelper(t, false)
}

func sdstFailLargeFile(t *testing.T) {
    sdstFailHelper(t, true)
}

func sdstFailHelper(t *testing.T, isStream bool) {
    bigFileSize := 2000
    archiveFile := "archive.tar.gz"
    ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:didir2%t", isStream), fmt.Sprintf("dst:dir2%t", isStream), archiveFile, bigFileSize)

    skipFiles := 10
    filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream)
    require.NotEqual(t, "", filePath)
    expectedFails := 2
    // Extract archive to dst fs
    tr := newFailTarReader(nil, filePath, expectedFails)
    extr := newExtract(ctx, dstFs, srcFs, archiveFile, bigFileSize, func(reader io.Reader) tarcache.TarReader {
        tarReader := tar.NewReader(reader)
        tr.UpdateTarReader(tarReader)
        return tr
    })
    err := extr.run()
    if !isStream {
        // When processing files that are placed in the cache, tarcache.Cache returns an error
        // at the moment of reading the header, so operations.Copy will not be able to initiate
        // a second read of this file. In this case, if an error occurs for the first time,
        // it is expected that it will be returned immediately
        require.Error(t, err)
        return
    }
    require.NoError(t, err)

    ll := make(map[string]struct{})
    listRer := dstFs.(fs.ListRer)
    require.NotNil(t, listRer)
    err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
        for _, entry := range entries {
            obj, ok := entry.(fs.Object)
            if !ok {
                continue
            }
            ll[obj.Remote()] = struct{}{}
        }
        return nil
    })
    require.NoError(t, err)
    checkFunc(ctx, t, dstFs, ll, rootItem)

    require.Equal(t, 0, len(ll))
}

func testFdstFailSmallFile(t *testing.T) {
    helperTestFdstFail(t, false)
}

func testFdstFailLargeFile(t *testing.T) {
    helperTestFdstFail(t, true)
}

func checkFunc(ctx context.Context, t *testing.T, dstFs fs.Fs, files map[string]struct{}, itm *tartest.Item) {
    for _, obj := range itm.Children {
        if obj.IsDir() {
            checkFunc(ctx, t, dstFs, files, obj)
            continue
        }
        _, ok := files[obj.Name()]
        require.True(t, ok)
        delete(files, obj.Name())
        o, err := dstFs.NewObject(ctx, obj.Name())
        require.NoError(t, err)
        reader, err := o.Open(ctx)
        require.NoError(t, err)
        require.NotNil(t, reader)
        actual, err := io.ReadAll(reader)
        require.NoError(t, err)
        require.Equal(t, obj.FileContent, actual)
    }
}

func prepareSrcAndDstFs(t *testing.T, srcRoot, dstRoot, archiveFile string, bigFileSize int) (ctx context.Context, src fs.Fs, dst fs.Fs, rootItem *tartest.Item) {
    ctx = context.Background()
    ctx, ci := fs.AddConfig(ctx)
    ci.Transfers = 10
    ci2 := fs.GetConfig(ctx)
    require.Equal(t, ci.Transfers, ci2.Transfers)
    var err error
    var archiveContent []byte
    rootItem, archiveContent = tartest.CreateTarArchive(t, 5, 10, 12, 50, bigFileSize)
    src, err = memory.NewFs(ctx, "memory", srcRoot, nil)
    require.NoError(t, err)
    o := mockobject.New(archiveFile).WithContent(archiveContent, mockobject.SeekModeNone)
    reader, err := o.Open(ctx)
    require.NoError(t, err)

    // Put archive file to source fs
    _, err = src.Put(ctx, reader, o)
    require.NoError(t, err)
    dst, err = memory.NewFs(ctx, "memory", dstRoot, nil)
    require.NoError(t, err)
    return
}

func testHappyPath(t *testing.T) {
    bigFileSize := 2000
    archiveFile := "archive.tar.gz"
    ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, "src:dir", "dst:dir", archiveFile, bigFileSize)
    wrapDstFs := NewFailFs(dstFs, "", math.MaxInt64)
    dstFs = wrapDstFs
    defer func() {
        require.NoError(t, srcFs.Rmdir(ctx, "src:dir"))
        require.NoError(t, dstFs.Rmdir(ctx, "dst:dir"))
    }()
    // Extract archive to dst fs
    err := Extract(ctx, dstFs, srcFs, archiveFile, bigFileSize)
    require.NoError(t, err)

    ll := make(map[string]struct{})
    files := make([]string, 0)
    listRer := dstFs.(fs.ListRer)
    require.NotNil(t, listRer)
    err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
        for _, entry := range entries {
            obj, ok := entry.(fs.Object)
            if !ok {
                continue
            }
            ll[obj.Remote()] = struct{}{}
|
||||
files = append(files, obj.Remote())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
filesCnt := len(files)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int32(filesCnt), wrapDstFs.putCnt.Load())
|
||||
require.Equal(t, int32(0), wrapDstFs.updateCnt.Load())
|
||||
require.Equal(t, filesCnt, len(ll))
|
||||
checkFunc(ctx, t, dstFs, ll, rootItem)
|
||||
require.Equal(t, 0, len(ll))
|
||||
|
||||
rnd := tartest.NewRandomizer()
|
||||
cnt := 0
|
||||
|
||||
for i := 0; i < filesCnt; i += 3 {
|
||||
filePath := files[i]
|
||||
cnt++
|
||||
obj, err := dstFs.NewObject(ctx, filePath)
|
||||
require.NoError(t, err)
|
||||
|
||||
var fileSize int
|
||||
isSmallFile := rnd.Dice(50)
|
||||
if isSmallFile {
|
||||
// [1, bigFileSize)
|
||||
fileSize = rnd.Intn(bigFileSize-1) + 1
|
||||
} else {
|
||||
// [bigFileSize, 2*bigFileSize)
|
||||
fileSize = rnd.Intn(bigFileSize) + bigFileSize
|
||||
}
|
||||
fileContent := make([]byte, fileSize)
|
||||
_, _ = rnd.Read(fileContent)
|
||||
reader := bytes.NewReader(fileContent)
|
||||
|
||||
err = obj.Update(ctx, reader, newObjInfo(dstFs, filePath, int64(fileSize)))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
wrapDstFs.putCnt.Store(0)
|
||||
wrapDstFs.updateCnt.Store(0)
|
||||
// Extract archive to dst fs
|
||||
err = Extract(ctx, wrapDstFs, srcFs, archiveFile, bigFileSize)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int32(0), wrapDstFs.putCnt.Load())
|
||||
require.Equal(t, int32(cnt), wrapDstFs.updateCnt.Load())
|
||||
}
|
201
fs/extract/tarcache/tarcache.go
Normal file
@ -0,0 +1,201 @@
// Package tarcache provides a cache for tar archives.
package tarcache

import (
	"archive/tar"
	"bytes"
	"context"
	"errors"
	"io"
	"sync"
	"time"
)

// TarReader is an interface for reading tar archives.
type TarReader interface {
	io.Reader
	Next() (*tar.Header, error)
}

// Cache is a cache for tar archives.
type Cache struct {
	reader          TarReader
	workerCnt       int
	maxSizeInMemory int64

	ch       chan struct{}
	helperCh chan func()
	streamWg sync.WaitGroup
	wg       sync.WaitGroup
}

// Header holds info about a file in the tar archive.
type Header struct {
	FilePath string
	ModTime  time.Time
	Size     int64
	Format   tar.Format
}

// FileInfo holds info about a file in the tar archive and its content.
type FileInfo struct {
	Payload   []byte
	IoPayload io.Reader
	ReleaseFn func()
	FileInfo  Header
}

func (t *Cache) waitWorkersDone() {
	t.wg.Wait()
}

// Close closes the cache and frees all resources.
func (t *Cache) Close() error {
	t.waitWorkersDone()
	if t.ch != nil {
		close(t.ch)
		t.ch = nil
		close(t.helperCh)
		t.helperCh = nil
		return nil
	}
	return errors.New("cache already closed")
}

// NewTarCache creates a new tar cache.
func NewTarCache(reader TarReader, workerCnt int, maxSizeInMemory int64) *Cache {
	res := &Cache{
		reader:          reader,
		workerCnt:       workerCnt,
		maxSizeInMemory: maxSizeInMemory,
		ch:              make(chan struct{}, workerCnt),
		helperCh:        make(chan func(), 1),
	}

	// helper goroutine for waiting until the file content is fully read from the stream
	go func() {
		for fn := range res.helperCh {
			fn()
		}
	}()
	return res
}

type helper struct {
	io.Reader
	releaseFn func()
	once      sync.Once
}

// Close invokes the release function exactly once.
func (h *helper) Close() error {
	h.once.Do(h.releaseFn)
	return nil
}

// NextPayload returns the next archive file as an io.ReadCloser.
func (t *Cache) NextPayload(ctx context.Context) (io.ReadCloser, Header, error) {
	res, err := t.Next(ctx)
	if err != nil {
		return nil, Header{}, err
	}
	if res.IoPayload != nil {
		return &helper{Reader: res.IoPayload, releaseFn: res.ReleaseFn}, res.FileInfo, nil
	}
	res.IoPayload = bytes.NewReader(res.Payload)
	return &helper{Reader: res.IoPayload, releaseFn: res.ReleaseFn}, res.FileInfo, nil
}

type streamCloser struct {
	ch      chan struct{}
	cache   *Cache
	isBytes bool
}

func newStreamCloser(cache *Cache, isBytes bool) *streamCloser {
	if !isBytes {
		cache.streamWg.Add(1)
	}
	cache.wg.Add(1)
	return &streamCloser{
		ch:      cache.ch,
		cache:   cache,
		isBytes: isBytes,
	}
}

func (s *streamCloser) close() {
	if s.ch == nil {
		return
	}
	ch := s.ch
	s.ch = nil
	<-ch
	if !s.isBytes {
		s.cache.streamWg.Done()
	}
	s.cache.wg.Done()
}

// Next returns info about the next file in the archive.
func (t *Cache) Next(ctx context.Context) (_ FileInfo, err error) {
	// We block the execution flow if the number of currently processed files exceeds the limit.
	select {
	// Reserve a slot for processing the file
	case t.ch <- struct{}{}:
	case <-ctx.Done():
		return FileInfo{}, ctx.Err()
	}
	watcher := make(chan struct{})
	t.helperCh <- func() {
		// We also block the execution flow if there is a reader without caching.
		t.streamWg.Wait()
		close(watcher)
	}

	// If the method exits because of an error, release the slot used by the processed file right away
	defer func() {
		if err != nil {
			<-t.ch
		}
	}()

	select {
	case <-watcher:
	case <-ctx.Done():
		return FileInfo{}, ctx.Err()
	}

	for {
		header, err := t.reader.Next()
		if err != nil {
			return FileInfo{nil, nil, nil, Header{}}, err
		}
		switch header.Typeflag {
		case tar.TypeDir:
			continue
		case tar.TypeReg:
			h := Header{
				FilePath: header.Name,
				ModTime:  header.ModTime,
				Size:     header.Size,
				Format:   header.Format,
			}
			// If the file size is smaller than maxSizeInMemory, read its content
			if header.Size < t.maxSizeInMemory {
				var payload []byte
				payload, err = io.ReadAll(t.reader)
				if err != nil {
					return FileInfo{nil, nil, nil, Header{}}, err
				}
				closer := newStreamCloser(t, true)
				return FileInfo{payload, nil, closer.close, h}, nil
			}
			// Otherwise, return FileInfo with a stream reader
			closer := newStreamCloser(t, false)
			return FileInfo{nil, t.reader, closer.close, h}, nil
		default:
			continue
		}
	}
}
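For orientation, here is a minimal sketch of how a consumer is expected to drive the Cache (it mirrors the consumption loop in the tests below; copyToDestination is a hypothetical stand-in for the actual copy step, and the gzip/tar set-up is the caller's responsibility):

	gzf, err := gzip.NewReader(archiveStream) // archiveStream: any io.Reader over the .tar.gz bytes
	if err != nil {
		return err
	}
	cache := NewTarCache(tar.NewReader(gzf), 10, 0x100000) // 10 worker slots, 1 MiB in-memory limit
	for {
		rc, header, err := cache.NextPayload(ctx)
		if err == io.EOF {
			break // no more regular files in the archive
		}
		if err != nil {
			return err
		}
		go func(rc io.ReadCloser, h Header) {
			defer rc.Close() // releases the reserved worker slot
			copyToDestination(h.FilePath, rc) // hypothetical copy step
		}(rc, header)
	}
	return cache.Close() // waits for all in-flight readers to finish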
503
fs/extract/tarcache/tarcache_test.go
Normal file
@ -0,0 +1,503 @@
package tarcache

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"math"
	"math/rand"
	"path"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

type itemType int

const (
	Dir itemType = iota
	File
)

// item implements fs.FileInfo interface
type item struct {
	iType       itemType
	itemName    string
	children    map[string]*item
	fileContent []byte
}

func (i *item) Name() string {
	return i.itemName
}

func (i *item) Size() int64 {
	return int64(len(i.fileContent))
}

func (i *item) Mode() fs.FileMode {
	res := fs.ModePerm
	if i.iType == Dir {
		res |= fs.ModeDir
	}
	return res
}

func (i *item) ModTime() time.Time {
	return time.Now()
}

func (i *item) IsDir() bool {
	return i.iType == Dir
}

func (i *item) Sys() any {
	return nil
}

func dice(rnd *rand.Rand, percent int) bool {
	return rnd.Intn(100) < percent
}

func randomFileName(rnd *rand.Rand) string {
	b := make([]byte, 16)
	rnd.Read(b)
	return hex.EncodeToString(b)
}

type errorReader struct {
	io.Reader

	bytesRead int
	threshold int
}

var errRead = errors.New("read error")

func newReaderWithError(rc io.Reader, threshold int) *errorReader {
	return &errorReader{rc, 0, threshold}
}

func (e *errorReader) Read(p []byte) (n int, err error) {
	if e.bytesRead > e.threshold {
		return 0, errRead
	}
	n, err = e.Reader.Read(p)
	e.bytesRead += n
	return
}

func (i *item) createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) {
	if deep <= 0 {
		return
	}
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

	nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir

	for j := 0; j < nItem; j++ {
		isFile := deep == 1 || dice(rnd, 60)
		newItem := item{}
		var prefix string
		if len(i.itemName) > 0 {
			prefix = i.itemName + "/"
		}
		if isFile {
			newItem.itemName = fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd))
			newItem.iType = File
			var fileSize int
			isSmallFile := dice(rnd, smallFilesPercent)
			if isSmallFile {
				// [1, bigFileSize)
				fileSize = rnd.Intn(bigFileSize-1) + 1
			} else {
				// [bigFileSize, 2*bigFileSize)
				fileSize = rnd.Intn(bigFileSize) + bigFileSize
			}
			newItem.fileContent = make([]byte, fileSize)
			rnd.Read(newItem.fileContent)
			i.children[newItem.itemName] = &newItem
		} else {
			newItem.itemName = fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd))
			newItem.iType = Dir
			newItem.children = make(map[string]*item)
			newItem.createTarArchiveContent(deep-1, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
			i.children[newItem.itemName] = &newItem
		}
	}
}

func (i *item) fillMap(m *sync.Map) {
	for _, v := range i.children {
		if v.iType == File {
			m.Store(v.itemName, v.fileContent)
		} else if v.iType == Dir {
			v.fillMap(m)
		}
	}
}

func createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) *item {
	root := &item{
		iType:    Dir,
		itemName: "",
		children: make(map[string]*item),
	}

	root.createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
	return root
}

func (i *item) writeItemToArchive(tw *tar.Writer) error {
	// In the first pass, we write the files from the current directory.
	for _, child := range i.children {
		if child.iType == File {
			head, err := tar.FileInfoHeader(child, path.Base(child.itemName))
			if err != nil {
				return err
			}
			head.Name = child.itemName
			err = tw.WriteHeader(head)
			if err != nil {
				return err
			}
			_, err = io.Copy(tw, bytes.NewReader(child.fileContent))
			if err != nil {
				return err
			}
		}
	}

	// In the second pass, we write files from child directories.
	for _, child := range i.children {
		if child.iType == Dir {
			if err := child.writeItemToArchive(tw); err != nil {
				return err
			}
		}
	}

	return nil
}

func createTarArchiveHelper(writer io.Writer, content *item) error {
	gz := gzip.NewWriter(writer)
	defer func() {
		_ = gz.Close()
	}()

	tw := tar.NewWriter(gz)
	defer func() {
		_ = tw.Close()
	}()

	return content.writeItemToArchive(tw)
}

func createTarArchive(t *testing.T, deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) (*item, []byte) {
	content := createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)

	writer := bytes.NewBuffer(make([]byte, 0))

	err := createTarArchiveHelper(writer, content)
	archData := writer.Bytes()
	require.NoError(t, err)
	require.NotNil(t, content)
	require.NotNil(t, archData)

	return content, archData
}

func createTarArchiveWithOneFileHelper(w io.Writer) error {
	gz := gzip.NewWriter(w)
	defer func() {
		_ = gz.Close()
	}()

	tw := tar.NewWriter(gz)
	defer func() {
		_ = tw.Close()
	}()
	itm := item{
		iType:       File,
		itemName:    "test.bin",
		fileContent: make([]byte, 0x80000),
	}

	rootItm := item{
		iType: Dir,
		children: map[string]*item{
			itm.itemName: &itm,
		},
	}

	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
	rnd.Read(itm.fileContent)
	return rootItm.writeItemToArchive(tw)
}

func createTarArchiveWithOneFile(t *testing.T) []byte {
	writer := bytes.NewBuffer(make([]byte, 0))
	err := createTarArchiveWithOneFileHelper(writer)
	require.NoError(t, err)
	return writer.Bytes()
}

func doubleCloseCaseHelper(t *testing.T, readerTypeChecker func(io.Reader) bool) {
	bigFileSize := 0x50
	content, archData := createTarArchive(t, 5, 8, 10, 50, bigFileSize)

	reader := bytes.NewReader(archData)

	gzf, err := gzip.NewReader(reader)
	require.NoError(t, err)

	tarReader := tar.NewReader(gzf)
	tarCache := NewTarCache(tarReader, 10, int64(bigFileSize))

	var m sync.Map
	content.fillMap(&m)

	errCh := make(chan error, 100)
	ctx, cancel := context.WithCancel(context.Background())
	ctx2 := context.Background()
	defer cancel()
	var wg sync.WaitGroup
	var cnt int32

L:
	for {
		select {
		case <-ctx.Done():
			break L
		default:
		}
		contentReader, header, err := tarCache.NextPayload(ctx2)

		if err != nil {
			if err == io.EOF {
				break
			}
			require.NoError(t, err)
		}
		wg.Add(1)
		go func(reader io.ReadCloser, head Header) {
			defer wg.Done()
			defer func() {
				if err := reader.Close(); err != nil {
					errCh <- err
					cancel()
				}
			}()
			select {
			case <-ctx.Done():
				return
			default:
			}
			val, ok := m.Load(header.FilePath)
			if !ok {
				errCh <- errors.New(header.FilePath + " not found")
				cancel()
				return
			}
			expected := val.([]byte)
			content, err := io.ReadAll(reader)
			if err != nil {
				errCh <- err
				cancel()
				return
			}
			if !bytes.Equal(expected, content) {
				errCh <- errors.New(header.FilePath + " content mismatch")
				cancel()
				return
			}
			if atomic.AddInt32(&cnt, 1) >= 100 {
				if readerTypeChecker(reader) {
					if err := reader.Close(); err != nil {
						errCh <- err
						cancel()
						return
					}
				}
			}
		}(contentReader, header)
	}
	err = tarCache.Close()
	require.NoError(t, err)
	wg.Wait()
	close(errCh)

	hasError := false
	for e := range errCh {
		if e != nil {
			hasError = true
		}
	}
	require.False(t, hasError)
}

func readErrorCase(t *testing.T) {
	ctx := context.Background()
	archData := createTarArchiveWithOneFile(t)

	reader := newReaderWithError(bytes.NewReader(archData), 0x800)

	gzf, err := gzip.NewReader(reader)
	require.NoError(t, err)

	tarReader := tar.NewReader(gzf)
	tarCache := NewTarCache(tarReader, 10, math.MaxInt)

	errCh := make(chan error, 100)
	var wg sync.WaitGroup
	_, _, err = tarCache.NextPayload(ctx)

	require.NotErrorIs(t, err, io.EOF)
	require.Error(t, err, errRead.Error())

	err = tarCache.Close()
	require.NoError(t, err)
	wg.Wait()
	close(errCh)

	for e := range errCh {
		if err == nil {
			err = e
		}
	}
	require.NoError(t, err)
}

func successfulCase(t *testing.T) {
	bigFileSize := 0x200
	content, archData := createTarArchive(t, 5, 5, 30, 70, bigFileSize)

	reader := bytes.NewReader(archData)
	gzf, err := gzip.NewReader(reader)
	require.NoError(t, err)

	tarReader := tar.NewReader(gzf)
	tarCache := NewTarCache(tarReader, 10, int64(bigFileSize))

	var m sync.Map
	content.fillMap(&m)

	errCh := make(chan error, 100)
	ctx, cancel := context.WithCancel(context.Background())
	ctx2 := context.Background()
	defer cancel()
	var wg sync.WaitGroup
L:
	for {
		select {
		case <-ctx.Done():
			break L
		default:
		}
		contentReader, header, err := tarCache.NextPayload(ctx2)

		if err != nil {
			if err == io.EOF {
				break
			}
			require.NoError(t, err)
		}
		wg.Add(1)
		go func(reader io.ReadCloser, head Header) {
			defer wg.Done()
			defer func() {
				if err := reader.Close(); err != nil {
					errCh <- err
					cancel()
				}
			}()
			select {
			case <-ctx.Done():
				return
			default:
			}
			val, ok := m.Load(header.FilePath)
			if !ok {
				errCh <- errors.New(header.FilePath + " not found")
				cancel()
				return
			}
			expected := val.([]byte)
			content, err := io.ReadAll(reader)
			if err != nil {
				errCh <- err
				cancel()
				return
			}
			if !bytes.Equal(expected, content) {
				errCh <- errors.New(header.FilePath + " content mismatch")
				cancel()
				return
			}
			m.Delete(header.FilePath)

		}(contentReader, header)
	}
	err = tarCache.Close()
	wg.Wait()
	close(errCh)
	var err2 error
	for e := range errCh {
		if err2 == nil {
			err2 = e
		}
	}
	require.NoError(t, err2)
	require.NoError(t, err)

	// Checking for dictionary emptiness
	var l int
	m.Range(func(k, v interface{}) bool {
		l++
		return true
	})
	require.Equal(t, 0, l)
}

func TestTarCache(t *testing.T) {
	t.Run("Successful case", func(t *testing.T) {
		successfulCase(t)
	})

	t.Run("Double close of the byte reader", func(t *testing.T) {
		doubleCloseCaseHelper(t, func(reader io.Reader) bool {
			e, ok := reader.(*helper)
			if !ok {
				return false
			}
			_, ok = e.Reader.(*bytes.Reader)
			return ok
		})
	})

	t.Run("Double close of the stream reader", func(t *testing.T) {
		doubleCloseCaseHelper(t, func(reader io.Reader) bool {
			e, ok := reader.(*helper)
			if !ok {
				return false
			}
			_, ok = e.Reader.(*bytes.Reader)
			return !ok
		})
	})

	t.Run("Read error", func(t *testing.T) {
		readErrorCase(t)
	})

}
239
fs/extract/tarcache/tartest/tartest.go
Normal file
@ -0,0 +1,239 @@
// Package tartest is a set of functions that assist with the testing of tar archives
package tartest

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"encoding/hex"
	"fmt"
	"io"
	"io/fs"
	"math/rand"
	"path"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// FindFilePath walks the tree and returns the path of a file whose ability to fit
// in the cache matches fitInCache, skipping the first *skipCnt matches.
func (i *Item) FindFilePath(skipCnt *int, bigFileSize int, fitInCache bool) string {
	for _, obj := range i.Children {
		if !obj.IsDir() {
			isBig := len(obj.FileContent) >= bigFileSize
			if isBig && !fitInCache || !isBig && fitInCache {
				if *skipCnt == 0 {
					return obj.Name()
				}
				*skipCnt--
			}
		}
	}
	for _, obj := range i.Children {
		if obj.IsDir() {
			res := obj.FindFilePath(skipCnt, bigFileSize, fitInCache)
			if len(res) > 0 {
				return res
			}
		}
	}
	return ""
}

// ItemType is an item type
type ItemType int

// Item types
const (
	Dir ItemType = iota // directory type
	File                // regular file type
)

// Item represents a file or directory
type Item struct {
	Type        ItemType
	ItemName    string
	Children    map[string]*Item
	FileContent []byte
}

// Name returns the file name
func (i *Item) Name() string {
	return i.ItemName
}

// Size returns the file size
func (i *Item) Size() int64 {
	return int64(len(i.FileContent))
}

// Mode returns the file mode
func (i *Item) Mode() fs.FileMode {
	res := fs.ModePerm
	if i.Type == Dir {
		res |= fs.ModeDir
	}
	return res
}

// ModTime returns the modification time
func (i *Item) ModTime() time.Time {
	return time.Now()
}

// IsDir returns true if the item is a directory
func (i *Item) IsDir() bool {
	return i.Type == Dir
}

// Sys returns the underlying data source (can return nil)
func (i *Item) Sys() any {
	return nil
}

// Randomizer is a helper to generate random data
type Randomizer struct {
	rnd *rand.Rand
}

// NewRandomizer returns a new Randomizer
func NewRandomizer() Randomizer {
	return Randomizer{
		rnd: rand.New(rand.NewSource(time.Now().UnixNano())),
	}
}

func (r Randomizer) Read(p []byte) (n int, err error) {
	return r.rnd.Read(p)
}

// Dice returns true if a dice roll is less than the specified percent
func (r Randomizer) Dice(percent int) bool {
	return r.rnd.Intn(100) < percent
}

// RandomFileName generates a random file name
func (r Randomizer) RandomFileName() string {
	b := make([]byte, 16)
	r.rnd.Read(b)
	return hex.EncodeToString(b)
}

// Intn returns a random integer in the range [0, n)
func (r Randomizer) Intn(n int) int {
	return r.rnd.Intn(n)
}

func (i *Item) createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) {
	if deep <= 0 {
		return
	}

	rnd := NewRandomizer()
	nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir

	for j := 0; j < nItem; j++ {
		isFile := deep == 1 || rnd.Dice(60)
		newItem := Item{}
		var prefix string
		if len(i.ItemName) > 0 {
			prefix = i.ItemName + "/"
		}
		if isFile {
			newItem.ItemName = fmt.Sprintf("%sfile_%s", prefix, rnd.RandomFileName())
			newItem.Type = File
			var fileSize int
			isSmallFile := rnd.Dice(smallFilesPercent)
			if isSmallFile {
				// [1, bigFileSize)
				fileSize = rnd.Intn(bigFileSize-1) + 1
			} else {
				// [bigFileSize, 2*bigFileSize)
				fileSize = rnd.Intn(bigFileSize) + bigFileSize
			}
			newItem.FileContent = make([]byte, fileSize)
			_, _ = rnd.Read(newItem.FileContent)
			i.Children[newItem.ItemName] = &newItem
		} else {
			newItem.ItemName = fmt.Sprintf("%sdir_%s", prefix, rnd.RandomFileName())
			newItem.Type = Dir
			newItem.Children = make(map[string]*Item)
			newItem.createTarArchiveContent(deep-1, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
			i.Children[newItem.ItemName] = &newItem
		}
	}
}

// CreateTarArchiveContent generates a random directory tree (the future archive content), nested to the specified depth
func CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) *Item {
	root := &Item{
		Type:     Dir,
		ItemName: "",
		Children: make(map[string]*Item),
	}

	root.createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
	return root
}

func (i *Item) writeItemToArchiver(tw *tar.Writer) error {
	// In the first pass, we write the files from the current directory.
	for _, child := range i.Children {
		if child.Type == File {
			head, err := tar.FileInfoHeader(child, path.Base(child.ItemName))
			if err != nil {
				return err
			}
			head.Name = child.ItemName
			err = tw.WriteHeader(head)
			if err != nil {
				return err
			}
			_, err = io.Copy(tw, bytes.NewReader(child.FileContent))
			if err != nil {
				return err
			}
		}
	}

	// In the second pass, we write files from child directories.
	for _, child := range i.Children {
		if child.Type == Dir {
			if err := child.writeItemToArchiver(tw); err != nil {
				return err
			}
		}
	}

	return nil
}

func createTarArchiveHelper(writer io.Writer, content *Item) error {
	gz := gzip.NewWriter(writer)
	defer func() {
		_ = gz.Close()
	}()

	tw := tar.NewWriter(gz)
	defer func() {
		_ = tw.Close()
	}()

	return content.writeItemToArchiver(tw)
}

// CreateTarArchive generates random content and returns it along with the corresponding tar.gz archive bytes.
func CreateTarArchive(t *testing.T, deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) (*Item, []byte) {
	content := CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)

	writer := bytes.NewBuffer(make([]byte, 0))

	err := createTarArchiveHelper(writer, content)
	archData := writer.Bytes()
	require.NoError(t, err)
	require.NotNil(t, content)
	require.NotNil(t, archData)

	return content, archData
}
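For reference, a minimal sketch of how a test might use this package (the parameters mirror the prepareSrcAndDstFs call in the extract tests above; the test name is assumed, not part of the patch):

	func TestWithGeneratedArchive(t *testing.T) {
		// Depth 5, 10-12 entries per directory, ~50% small files, "big" meaning >= 2000 bytes.
		rootItem, archData := tartest.CreateTarArchive(t, 5, 10, 12, 50, 2000)
		require.NotEmpty(t, archData) // archData holds the generated .tar.gz bytes

		// Find the first generated file that is small enough to fit in the cache.
		skip := 0
		filePath := rootItem.FindFilePath(&skip, 2000, true)
		require.NotEqual(t, "", filePath)
	}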