[#8] Add extract command
Some checks failed
DCO (pull_request): successful in 49s
Builds (pull_request): successful in 1m27s
Lint (pull_request): failing after 2m29s
Test (pull_request): successful in 6m54s

Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
Aleksey Kravchenko 2025-02-10 21:50:15 +03:00
parent 4733d46d83
commit f652af8fac
10 changed files with 2956 additions and 2 deletions


@@ -44,7 +44,7 @@ jobs:
AIO_VERSION: 1.7.0-nightly.4
RCLONE_CONFIG: /config/rclone.conf
# run only tests related to FrostFS backend
# run only tests related to FrostFS backend and extract command
run: |-
podman-service.sh
podman info
@@ -64,4 +64,5 @@ jobs:
docker cp aio:/config/user-wallet.json /config/wallet.json
echo "Start tests"
go test -v github.com/rclone/rclone/backend/frostfs
go test -v -cover --race ./fs/extract/...


@@ -23,6 +23,7 @@ import (
_ "github.com/rclone/rclone/cmd/dedupe"
_ "github.com/rclone/rclone/cmd/delete"
_ "github.com/rclone/rclone/cmd/deletefile"
_ "github.com/rclone/rclone/cmd/extract"
_ "github.com/rclone/rclone/cmd/genautocomplete"
_ "github.com/rclone/rclone/cmd/gendocs"
_ "github.com/rclone/rclone/cmd/gitannex"

62
cmd/extract/extract.go Normal file

@@ -0,0 +1,62 @@
// Package extract provides the extract command.
package extract
import (
"context"
"errors"
"strings"
"github.com/rclone/rclone/cmd"
"github.com/rclone/rclone/fs/config/flags"
"github.com/rclone/rclone/fs/extract"
"github.com/spf13/cobra"
)
var (
maxFileSizeForCache = 0x100000 // 1 MiB
)
func init() {
cmd.Root.AddCommand(commandDefinition)
cmdFlags := commandDefinition.Flags()
flags.IntVarP(cmdFlags,
&maxFileSizeForCache,
"max-file-size-for-cache",
"",
maxFileSizeForCache,
`Maximum file size that can be placed in the cache. Larger files
will be read without caching, which prevents parallelization of copy
operations and may result in longer archive unpacking time.`,
"")
}
var commandDefinition = &cobra.Command{
Use: "extract source:path/to/archive.tar.gz dest:path",
Short: `Extract a tar.gz archive from source to dest, skipping identical files.`,
// Note: "|" will be replaced by backticks below
Long: strings.ReplaceAll(`Extract the contents of a tar.gz archive from the source to the destination. Does not transfer files that are
identical on source and destination, testing by size and modification
time or MD5SUM. Doesn't delete files from the destination.
If dest:path doesn't exist, it is created and the contents of the archive source:path/to/archive.tar.gz
go there.
**Note**: Use the |-P|/|--progress| flag to view real-time transfer statistics.
**Note**: Use the |--dry-run| or the |--interactive|/|-i| flag to test without copying anything.
`, "|", "`"),
Annotations: map[string]string{
"groups": "Important",
},
Run: func(command *cobra.Command, args []string) {
cmd.CheckArgs(2, 2, command, args)
fsrc, srcFileName, fdst := cmd.NewFsSrcFileDst(args)
cmd.Run(true, true, command, func() error {
if srcFileName == "" {
return errors.New("the source is required to be a file")
}
return extract.Extract(context.Background(), fdst, fsrc, srcFileName, maxFileSizeForCache)
})
},
}
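
For illustration, a hypothetical invocation of the new command (the remote and path names are invented; --max-file-size-for-cache is the flag registered above, here raised to 16 MiB):

    rclone extract remote:backups/archive.tar.gz dest:restored --max-file-size-for-cache 16777216 -P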

234
fs/extract/dirtreeholder.go Normal file

@@ -0,0 +1,234 @@
package extract
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"github.com/rclone/rclone/fs"
)
// Dir represents a directory with objects.
type Dir struct {
name string
objs map[string]fs.DirEntry
dirs map[string]*Dir
}
// NewDir creates a new directory.
func NewDir(name string) *Dir {
return &Dir{
name: name,
objs: make(map[string]fs.DirEntry),
dirs: make(map[string]*Dir),
}
}
// Name returns the name of the directory.
func (d *Dir) Name() string {
return d.name
}
func (d *Dir) isDirPresent(name string) bool {
if entry, ok := d.objs[name]; ok {
_, ok := entry.(fs.DirEntry)
return ok
}
return false
}
// getDirByName returns a subdirectory with the given name
func (d *Dir) getDirByName(name string) *Dir {
if d.name == name {
return d
}
if !strings.HasPrefix(name, d.name) {
return nil
}
if d.name != "" && name[len(d.name)] != '/' {
return nil
}
idx := strings.Index(name[len(d.name)+1:], "/")
nextDir := name
if idx != -1 {
nextDir = name[:idx+len(d.name)+1]
}
if td, ok := d.dirs[nextDir]; ok {
return td.getDirByName(name)
}
return nil
}
// addChildDir adds a child directory
func (d *Dir) addChildDir(child *Dir) {
d.dirs[child.name] = child
}
// AddChildObject adds information about the child object
func (d *Dir) AddChildObject(child fs.DirEntry) {
d.objs[child.Remote()] = child
}
type listDirFn func(ctx context.Context, dir string) (*Dir, error)
type dirTreeHolder struct {
listF listDirFn
mu sync.Mutex
root *Dir
dirsInWork map[string]*sync.WaitGroup
}
type strDirIter struct {
dir string
idx int
}
func newStrDirIter(dir string) *strDirIter {
return &strDirIter{
dir: dir,
idx: -1,
}
}
func (s *strDirIter) next() (cur string, next string) {
if s.dir == "" {
return "", ""
}
if s.idx == -1 {
s.idx = 0
idx := strings.Index(s.dir, "/")
if idx == -1 {
return "", s.dir
}
return "", s.dir[:idx]
}
idx := strings.Index(s.dir[s.idx:], "/")
if idx == -1 {
return s.dir, ""
}
defer func(idx int) {
s.idx = s.idx + idx + 1
}(idx)
idx2 := strings.Index(s.dir[s.idx+idx+1:], "/")
if idx2 == -1 {
return s.dir[:idx+s.idx], s.dir
}
return s.dir[:idx+s.idx], s.dir[:idx+idx2+s.idx+1]
}
func (d *Dir) getObjectByPath(path string) fs.DirEntry {
var dirName string
idx := strings.LastIndex(path, "/")
if idx != -1 {
dirName = path[:idx]
}
dir := d.getDirByName(dirName)
if dir == nil {
return nil
}
return dir.objs[path]
}
func newDirStructHolder(listF listDirFn) *dirTreeHolder {
return &dirTreeHolder{
listF: listF,
root: nil,
dirsInWork: make(map[string]*sync.WaitGroup),
}
}
func (l *dirTreeHolder) getObjectByPath(path string) fs.DirEntry {
l.mu.Lock()
defer l.mu.Unlock()
if l.root == nil {
return nil
}
return l.root.getObjectByPath(path)
}
func (l *dirTreeHolder) updateDirList(ctx context.Context, dir string) error {
l.mu.Lock()
defer l.mu.Unlock()
iter := newStrDirIter(dir)
// Function that awaits the directory listing result produced by another goroutine
waitF := func(wg *sync.WaitGroup) error {
l.mu.Unlock()
defer l.mu.Lock()
wg.Wait()
return nil
}
// Helper for calling the directory listing function with the mutex unlocked
listF := func(dir string) (*Dir, error) {
l.mu.Unlock()
defer l.mu.Lock()
return l.listF(ctx, dir)
}
var prevDir *Dir
var curDir string
nextDir := "/"
for {
if nextDir == "" {
return nil
}
curDir, nextDir = iter.next()
if l.root != nil {
// Handling the case when the directory was previously processed
if d := l.root.getDirByName(curDir); d != nil {
prevDir = d
if !d.isDirPresent(nextDir) {
return nil
}
continue
}
}
if wg, ok := l.dirsInWork[curDir]; ok {
if err := waitF(wg); err != nil {
return err
}
if l.root == nil {
return fmt.Errorf("unexpected nil root dir while updating information for '%s'", curDir)
}
if dir := l.root.getDirByName(curDir); dir == nil {
return fmt.Errorf("unexpected nil dir while updating information for '%s'", curDir)
} else {
prevDir = dir
}
} else {
if l.root == nil && curDir != "" {
return fmt.Errorf("unexpected nil root dir while updating information for '%s'", curDir)
}
// WaitGroup to prevent concurrent processing of the same directory
wg := &sync.WaitGroup{}
wg.Add(1)
l.dirsInWork[curDir] = wg
d, err := listF(curDir)
if err != nil {
// Unconditionally create only the root directory
if errors.Is(err, fs.ErrorDirNotFound) && curDir == "" {
d = NewDir(curDir)
} else {
wg.Done()
delete(l.dirsInWork, curDir)
return err
}
}
if l.root == nil {
l.root = d
} else {
prevDir.addChildDir(d)
}
prevDir = d
delete(l.dirsInWork, curDir)
wg.Done()
}
if !prevDir.isDirPresent(nextDir) {
return nil
}
}
}
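
As a reading aid, a minimal sketch of the strDirIter contract (it must live inside the extract package because the type is unexported, and it assumes an fmt import; testStrDirIter in the test file below verifies the same behaviour):

    func exampleStrDirIter() {
        it := newStrDirIter("a/b/c")
        for {
            cur, next := it.next()
            // Successive calls yield ("", "a"), ("a", "a/b"), ("a/b", "a/b/c"), ("a/b/c", "").
            fmt.Printf("cur=%q next=%q\n", cur, next)
            if next == "" {
                break
            }
        }
    }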


@@ -0,0 +1,375 @@
package extract
import (
"context"
"fmt"
"os"
"path/filepath"
"sort"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/extract/tarcache/tartest"
"github.com/rclone/rclone/fs/hash"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
type dirEntryForTest struct {
name string
}
func (d dirEntryForTest) Fs() fs.Info {
return nil
}
func (d dirEntryForTest) String() string {
return d.name
}
func (d dirEntryForTest) Remote() string {
return d.name
}
func (d dirEntryForTest) ModTime(context.Context) time.Time {
return time.Now()
}
func (d dirEntryForTest) Size() int64 {
return -1
}
type dirForTest struct {
dirEntryForTest
}
func (d dirForTest) Items() int64 {
return -1
}
func (d dirForTest) ID() string {
return ""
}
type objectForTest struct {
dirEntryForTest
}
func (o objectForTest) Hash(context.Context, hash.Type) (string, error) {
return "", nil
}
func (o objectForTest) Storable() bool {
return true
}
func createDirectory(dirName string) fs.DirEntry {
return &dirForTest{
dirEntryForTest{name: dirName},
}
}
func createObject(dirName string) fs.DirEntry {
return &objectForTest{
dirEntryForTest{name: dirName},
}
}
func createDirTree(dir *Dir, deep, minItemsInDir, maxItemsInDir int) {
if deep <= 0 {
return
}
rnd := tartest.NewRandomizer()
nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir
for j := 0; j < nItem; j++ {
isFile := deep == 1 || rnd.Dice(50)
var prefix string
if len(dir.name) > 0 {
prefix = dir.name + "/"
}
if isFile {
name := fmt.Sprintf("%sfile_%s", prefix, rnd.RandomFileName())
newItem := createObject(name)
dir.objs[name] = newItem
} else {
name := fmt.Sprintf("%sdir_%s", prefix, rnd.RandomFileName())
newItem := createDirectory(name)
dir.objs[name] = newItem
childDir := NewDir(name)
createDirTree(childDir, deep-1, minItemsInDir, maxItemsInDir)
if len(childDir.dirs) != 0 || len(childDir.objs) != 0 {
dir.dirs[childDir.name] = childDir
}
}
}
}
type listFnHelper struct {
mu sync.Mutex
cnt atomic.Uint64
root *Dir
}
func (l *listFnHelper) Cnt() uint64 {
return l.cnt.Load()
}
func (l *listFnHelper) ResetCnt() {
l.cnt.Store(0)
}
func (l *listFnHelper) listDir(_ context.Context, dir string) (*Dir, error) {
l.cnt.Add(1)
l.mu.Lock()
defer l.mu.Unlock()
d := l.root.getDirByName(dir)
if d == nil {
return nil, os.ErrNotExist
}
newDir := NewDir(d.name)
for _, child := range d.objs {
newDir.AddChildObject(child)
}
return newDir, nil
}
func fillSliceWithDirsInfo(s *[]string, dir *Dir) {
for path, child := range dir.objs {
if _, ok := child.(fs.Directory); ok {
*s = append(*s, path)
}
}
for _, child := range dir.dirs {
fillSliceWithDirsInfo(s, child)
}
}
func fillMapWithFilesInfo(m map[string][]string, dir *Dir) {
files := make([]string, 0)
for path, child := range dir.objs {
if _, ok := child.(fs.ObjectInfo); ok {
files = append(files, filepath.Base(path))
}
}
m[dir.Name()] = files
for _, child := range dir.dirs {
fillMapWithFilesInfo(m, child)
}
}
func TestListing(t *testing.T) {
t.Run("Dir struct holder", testDirTreeHolder)
t.Run("Dir", testDir)
t.Run("String directory iterating", testStrDirIter)
}
func testDirTreeHolder(t *testing.T) {
root := NewDir("")
createDirTree(root, 6, 5, 10)
listF := &listFnHelper{
root: root,
}
t.Run("Concurrent listing", func(t *testing.T) {
s := make([]string, 0)
m := make(map[string][]string)
fillSliceWithDirsInfo(&s, root)
fillMapWithFilesInfo(m, root)
sort.Slice(s, func(i, j int) bool {
return len(s[i]) < len(s[j])
})
require.NotNil(t, root)
holder := newDirStructHolder(listF.listDir)
halfLen := len(s) / 2
path := s[halfLen]
expectedCnt := 0
dIter := newStrDirIter(path)
for {
expectedCnt++
_, next := dIter.next()
if next == "" {
break
}
}
require.NotNil(t, holder)
eg, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 10; i++ {
eg.Go(func() error {
return holder.updateDirList(ctx, path)
})
}
eg.Go(func() error {
return holder.updateDirList(ctx, path+"not.exists")
})
err := eg.Wait()
require.NoError(t, err)
require.Equal(t, uint64(expectedCnt), listF.Cnt())
require.NotNil(t, holder.root)
dIter = newStrDirIter(path)
var cur, next string
next = "1"
for next != "" {
cur, next = dIter.next()
if next == "" {
break
}
if cur != "" {
entity := holder.getObjectByPath(cur)
require.NotNil(t, entity)
_, ok := entity.(fs.DirEntry)
require.True(t, ok)
}
files, ok := m[cur]
require.True(t, ok)
for _, file := range files {
var filePath string
if cur == "" {
filePath = file
} else {
filePath = cur + "/" + file
}
obj := holder.getObjectByPath(filePath)
require.NotNil(t, obj)
_, ok := obj.(fs.ObjectInfo)
require.True(t, ok)
require.Equal(t, filePath, obj.Remote())
}
}
})
}
func testDir(t *testing.T) {
finalDir := "path/with/more/than/one/dir"
files := map[string][]string{
"": {"file0.1.ext", "file0.2.ext"},
"path": {"file1.1.ext", "file1.2.ext"},
"path/with/more/than/one": {"file2.1.ext", "file2.2.ext"},
"path/with/more/than/one/dir": {"file3.1.ext", "file3.2.ext"},
}
dirIter := newStrDirIter(finalDir)
var root, prevDir *Dir
next := "/"
for next != "" {
var cur string
cur, next = dirIter.next()
dir := NewDir(cur)
if root == nil {
root = dir
}
if files, ok := files[cur]; ok {
for _, file := range files {
var path string
if cur == "" {
path = file
} else {
path = cur + "/" + file
}
dir.AddChildObject(createObject(path))
}
}
if prevDir != nil {
prevDir.addChildDir(dir)
prevDir.AddChildObject(createDirectory(dir.Name()))
}
prevDir = dir
}
t.Run("getDirByName", func(t *testing.T) {
dirIter = newStrDirIter(finalDir)
next = "1"
cnt := 0
for next != "" {
var cur string
cur, next = dirIter.next()
dir := root.getDirByName(cur)
require.NotNil(t, dir)
cnt++
}
require.Equal(t, 7, cnt)
})
t.Run("getObjectByPath", func(t *testing.T) {
for dirName, fileNames := range files {
for _, fileName := range fileNames {
var filePath string
if dirName == "" {
filePath = fileName
} else {
filePath = dirName + "/" + fileName
}
obj := root.getObjectByPath(filePath)
require.NotNil(t, obj)
require.NotNil(t, obj.(fs.ObjectInfo))
filePath += ".fail"
obj = root.getObjectByPath(filePath)
require.Nil(t, obj)
}
}
})
}
func testStrDirIter(t *testing.T) {
for _, tc := range []struct {
name string
dir string
res []string
}{
{
name: "path with more than one dir",
dir: "path/with/more/than/one/dir",
res: []string{
"",
"path",
"path/with",
"path/with/more",
"path/with/more/than",
"path/with/more/than/one",
"path/with/more/than/one/dir",
"",
},
},
{
name: "path with one dir",
dir: "one_dir",
res: []string{
"",
"one_dir",
"",
},
},
{
name: "empty path",
dir: "",
res: []string{
"",
"",
},
},
} {
t.Run(tc.name, func(t *testing.T) {
l := newStrDirIter(tc.dir)
for i, p := range tc.res {
if p == "" && i > 0 {
break
}
cur, next := l.next()
require.Equal(t, cur, p)
require.Equal(t, next, tc.res[i+1])
}
})
}
}

846
fs/extract/extract.go Normal file

@@ -0,0 +1,846 @@
// Package extract is the implementation of the extract command
package extract
import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/extract/tarcache"
"github.com/rclone/rclone/fs/filter"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/list"
"github.com/rclone/rclone/fs/operations"
)
type payloadType int
const (
payloadTypeBytes payloadType = iota
payloadTypeIo
)
// TarCacheFile is a wrapper around tarcache.FileInfo that adds support for file reopening functionality
type TarCacheFile struct {
owner *TarCache
pt payloadType
fileInfo tarcache.FileInfo
reader io.Reader
}
// Release releases resources associated with the file
func (t *TarCacheFile) Release() {
t.fileInfo.ReleaseFn()
}
// Read implements io.Reader interface
func (t *TarCacheFile) Read(p []byte) (n int, err error) {
return t.reader.Read(p)
}
// reopenIo implements file reopening for files that don't fit into memory
func (t *TarCacheFile) reopenIo(ctx context.Context) (err error) {
return t.owner.requestReopenFile(ctx, t)
}
// reopenBytes implements file reopening for files that fit into memory
func (t *TarCacheFile) reopenBytes(ctx context.Context) (err error) {
select {
case <-ctx.Done():
return ctx.Err()
default:
t.reader = bytes.NewReader(t.fileInfo.Payload)
return nil
}
}
// Reopen initializes the file reopening process
func (t *TarCacheFile) Reopen(ctx context.Context) (err error) {
if t.pt == payloadTypeBytes {
return t.reopenBytes(ctx)
}
return t.reopenIo(ctx)
}
// TarCache implements the functionality of reopening files within an archive
type TarCache struct {
// parameters
fsrc fs.Fs
srcFileName string
newTarReaderFn newTarReader
transfers int
maxFileSizeForCache int
options []fs.OpenOption
// non-thread-safe internal state
closeStreamFn func() error
tarCache *tarcache.Cache
workerCh chan func()
reopenReqCh chan reopenRequest
finishCh chan struct{}
pendingCloseCache []*tarcache.Cache
// thread-safe internal state
mu sync.Mutex
cache map[string]tarcache.FileInfo
}
func newTarCache(fsrc fs.Fs, srcFileName string, newTarReaderFn newTarReader, transfers int, maxFileSizeForCache int) *TarCache {
res := &TarCache{
fsrc: fsrc,
srcFileName: srcFileName,
newTarReaderFn: newTarReaderFn,
transfers: transfers,
maxFileSizeForCache: maxFileSizeForCache,
workerCh: make(chan func()),
reopenReqCh: make(chan reopenRequest),
finishCh: make(chan struct{}),
cache: make(map[string]tarcache.FileInfo),
}
if res.newTarReaderFn == nil {
res.newTarReaderFn = func(reader io.Reader) tarcache.TarReader {
return tar.NewReader(reader)
}
}
// Run a goroutine that will handle functions waiting for file reopen requests
go func() {
for workFn := range res.workerCh {
workFn()
}
}()
return res
}
func (t *TarCache) requestReopenFile(ctx context.Context, file *TarCacheFile) error {
reopenReq := reopenRequest{
file: file,
respCh: make(chan error, 1),
}
select {
case t.reopenReqCh <- reopenReq:
case <-ctx.Done():
return ctx.Err()
case <-t.finishCh:
return errTarCacheIsClosed
}
select {
case err := <-reopenReq.respCh:
return err
case <-ctx.Done():
return ctx.Err()
case <-t.finishCh:
return errTarCacheIsClosed
}
}
// newTarCacheFile creates a new TarCacheFile from the given tarcache.FileInfo
func (t *TarCache) newTarCacheFile(info tarcache.FileInfo) *TarCacheFile {
res := &TarCacheFile{
owner: t,
pt: payloadTypeIo,
fileInfo: info,
reader: info.IoPayload,
}
// If IoPayload is nil, we assume that the file is small enough to fit into memory
if info.IoPayload == nil {
res.pt = payloadTypeBytes
res.reader = bytes.NewReader(info.Payload)
}
return res
}
// Close closes the tar cache and releases all resources associated with it
func (t *TarCache) Close() error {
close(t.workerCh)
err := t.closeHelper()
if t.tarCache != nil {
_ = t.tarCache.Close()
}
return err
}
type reopenRequest struct {
file *TarCacheFile
respCh chan error
}
var errReopenArchiveRequest = errors.New("reopen archive request")
var errTarCacheIsClosed = errors.New("tar cache is closed")
func (t *TarCache) handleReopenRequest(ctx context.Context, reopenReq reopenRequest) {
defer close(reopenReq.respCh)
t.pendingCloseCache = append(t.pendingCloseCache, t.tarCache)
// Reopening a file in the archive requires closing the currently open archive first
_ = t.closeHelper()
// Release the resources occupied by the file
reopenReq.file.Release()
var err error
prev := t.tarCache
// Close the underlying tarcache.Cache
_ = prev.Close()
// Create a new tarcache.Cache and a function that closes the underlying gzip reader
t.tarCache, t.closeStreamFn, err = t.openHelper(ctx)
if err != nil {
// Notify the goroutine that requested the file reopening about error
reopenReq.respCh <- err
return
}
// Search for the required file within the archive
err = t.FindFile(ctx, reopenReq.file.fileInfo.FileInfo.FilePath)
if err == nil {
// Create a new TarCacheFile from the found FileInfo
file := t.GetFile(reopenReq.file.fileInfo.FileInfo.FilePath)
// If the previous call to FindFile completed without errors, then GetFile must return a non-nil TarCacheFile
if file == nil {
panic("shouldn't get here")
}
*reopenReq.file = *file
}
// Notify the goroutine that requested the file reopening about the result
reopenReq.respCh <- err
}
// FindFile searches for the file with the given filePath within the archive
func (t *TarCache) FindFile(ctx context.Context, filePath string) error {
for {
select {
case <-t.finishCh:
return errTarCacheIsClosed
default:
}
// Iterate through the archive files until the target file is found
h, err := t.tarCache.Next(ctx)
if err != nil {
return err
}
if h.FileInfo.FilePath != filePath {
// If the file is not the target, free its resources right away
h.ReleaseFn()
continue
}
t.mu.Lock()
// Store the information about the file in the cache
t.cache[h.FileInfo.FilePath] = h
t.mu.Unlock()
return nil
}
}
// GetFile constructs a TarCacheFile from cached data, relying on the file path
func (t *TarCache) GetFile(filePath string) *TarCacheFile {
t.mu.Lock()
defer t.mu.Unlock()
info, ok := t.cache[filePath]
if !ok {
return nil
}
return t.newTarCacheFile(info)
}
// ReleaseUnprocessedFiles releases resources occupied by files that were not processed
func (t *TarCache) ReleaseUnprocessedFiles() {
t.mu.Lock()
// Here we are preparing a list of functions that release resources, because we cannot call them when
// iterating over the map, since the ReleaseFn method also uses the same mutex that protects the map.
releaseFns := make([]func(), 0, len(t.cache))
for _, info := range t.cache {
releaseFns = append(releaseFns, info.ReleaseFn)
}
t.cache = make(map[string]tarcache.FileInfo)
t.mu.Unlock()
for _, fn := range releaseFns {
fn()
}
}
// Next advances to the next file in the archive, caches its information, and returns the file's path within the archive.
// This method also handles file reopen requests within the archive
func (t *TarCache) Next(ctx context.Context) (string, error) {
select {
case <-t.finishCh:
return "", errTarCacheIsClosed
default:
}
for {
watchdog := make(chan struct{})
var reopenReq reopenRequest
ctx2, cancel := context.WithCancelCause(ctx)
var wg sync.WaitGroup
wg.Add(1)
t.workerCh <- func() {
defer wg.Done()
select {
case <-watchdog:
case reopenReq = <-t.reopenReqCh:
// When a file reopen request arrives, it is necessary to interrupt the execution of t.tarCache.Next
cancel(errReopenArchiveRequest)
}
}
h, err := t.tarCache.Next(ctx2)
relFn := h.ReleaseFn
// Create a wrapper around the resource release function to additionally remove file information from the cache
h.ReleaseFn = func() {
t.mu.Lock()
delete(t.cache, h.FileInfo.FilePath)
t.mu.Unlock()
relFn()
}
// Close the watchdog channel to exit from the watchdog function
close(watchdog)
// Wait for the watchdog function to complete its execution
wg.Wait()
for _, cache := range t.pendingCloseCache {
_ = cache.Close()
}
t.pendingCloseCache = nil
if err != nil {
// When the error is triggered by a file reopen request, process the request accordingly
if errors.Is(context.Cause(ctx2), errReopenArchiveRequest) {
t.handleReopenRequest(ctx, reopenReq)
continue
}
cancel(nil)
close(t.finishCh)
if err == io.EOF {
return "", nil
}
return "", err
}
cancel(nil)
t.mu.Lock()
_, ok := t.cache[h.FileInfo.FilePath]
if ok {
t.mu.Unlock()
panic("shouldn't get here")
}
t.cache[h.FileInfo.FilePath] = h
t.mu.Unlock()
return h.FileInfo.FilePath, nil
}
}
func (t *TarCache) closeHelper() error {
if t.closeStreamFn != nil {
fn := t.closeStreamFn
t.closeStreamFn = nil
return fn()
}
return nil
}
// openHelper opens a new stream to the tar archive
func (t *TarCache) openHelper(ctx context.Context) (cache *tarcache.Cache, closeFn func() error, err error) {
obj, err := t.fsrc.NewObject(ctx, t.srcFileName)
if err != nil {
return nil, nil, err
}
reader, err := obj.Open(ctx, t.options...)
if err != nil {
return nil, nil, err
}
gzr, err := gzip.NewReader(reader)
if err != nil {
// we don't care about the error, we just need to close the reader
_ = reader.Close()
return nil, nil, err
}
tarReader := t.newTarReaderFn(gzr)
cache = tarcache.NewTarCache(tarReader, t.transfers, int64(t.maxFileSizeForCache))
return cache, func() error {
gzErr := gzr.Close()
readerErr := reader.Close()
if gzErr != nil {
return gzErr
}
return readerErr
}, nil
}
// Open opens the tar archive from remote
func (t *TarCache) Open(ctx context.Context, options ...fs.OpenOption) error {
t.options = options
var err error
t.tarCache, t.closeStreamFn, err = t.openHelper(ctx)
return err
}
type copyJob struct {
src fs.Object
dst fs.Object
path string
}
func makeListDir(f fs.Fs) listDirFn {
return func(ctx context.Context, dir string) (*Dir, error) {
dirCtx := filter.SetUseFilter(ctx, f.Features().FilterAware) // make filter-aware backends constrain List
entities, err := list.DirSorted(dirCtx, f, false, dir)
if err != nil {
return nil, err
}
res := NewDir(dir)
for _, entry := range entities {
res.AddChildObject(entry)
}
return res, nil
}
}
// extractObj implements fs.Object interface
type extractObj struct {
ownerFs fs.Fs
isFirstOpen atomic.Bool
file *TarCacheFile
}
func (e *extractFs) newExtractObj(file *TarCacheFile) *extractObj {
res := &extractObj{
ownerFs: e,
file: file,
}
res.isFirstOpen.Store(true)
return res
}
// Release releases resources associated with the object
func (o *extractObj) Release() {
o.file.Release()
}
// extractFs implements fs.Fs interface
type extractFs struct {
// parameters
fsrc fs.Fs
srcFileName string
features *fs.Features
// internal state
tarCache *TarCache
mu sync.Mutex
tarFormat tar.Format
}
type extract struct {
fdst fs.Fs
fsrc fs.Fs
extrFs *extractFs
srcFileName string
maxFileSizeForCache int
hashType hash.Type // common hash to use
hashOption *fs.HashesOption // open option for the common hash
// internal state
ci *fs.ConfigInfo // global config
ctx context.Context // internal context for controlling go-routines
cancel func() // cancel the context
maxDurationEndTime time.Time // end time if --max-duration is set
toBeChecked chan string // checkers channel
checkersWg sync.WaitGroup // wait for checkers
toBeUploaded chan copyJob // copiers channels
transfersWg sync.WaitGroup // wait for transfers
newTarReaderFn newTarReader
mu sync.Mutex
lastError error
errCnt atomic.Int32
dsh *dirTreeHolder // destination directory tree holder
}
// Fs returns read only access to the Fs that this object is part of
func (o *extractObj) Fs() fs.Info {
return o.ownerFs
}
// String returns a description of the Object
func (o *extractObj) String() string {
if o == nil {
return "<nil>"
}
return o.Remote()
}
// Remote returns the remote path
func (o *extractObj) Remote() string {
return o.file.fileInfo.FileInfo.FilePath
}
// ModTime returns the modification date of the file
// It should return a best guess if one isn't available
func (o *extractObj) ModTime(_ context.Context) time.Time {
return o.file.fileInfo.FileInfo.ModTime
}
// Size returns the size of the object
func (o *extractObj) Size() int64 {
return o.file.fileInfo.FileInfo.Size
}
// Hash returns the selected checksum of the file
// If no checksum is available it returns ""
func (o *extractObj) Hash(_ context.Context, _ hash.Type) (string, error) {
return "", nil
}
// Storable says whether this object can be stored
func (o *extractObj) Storable() bool {
return true
}
// Open opens the file for read. Call Close() on the returned io.ReadCloser
func (o *extractObj) Open(ctx context.Context, _ ...fs.OpenOption) (io.ReadCloser, error) {
if o.isFirstOpen.Load() {
o.isFirstOpen.Store(false)
return io.NopCloser(o.file), nil
}
err := o.file.Reopen(ctx)
return io.NopCloser(o.file), err
}
func (o *extractObj) SetModTime(context.Context, time.Time) error {
panic("not implemented")
}
func (o *extractObj) Update(context.Context, io.Reader, fs.ObjectInfo, ...fs.OpenOption) error {
panic("not implemented")
}
func (o *extractObj) Remove(context.Context) error {
panic("not implemented")
}
func (e *extractFs) List(context.Context, string) (fs.DirEntries, error) {
panic("not implemented")
}
func (e *extractFs) Put(context.Context, io.Reader, fs.ObjectInfo, ...fs.OpenOption) (fs.Object, error) {
panic("not implemented")
}
func (e *extractFs) Mkdir(context.Context, string) error {
panic("not implemented")
}
func (e *extractFs) Rmdir(context.Context, string) error {
panic("not implemented")
}
func (e *extractFs) Name() string {
return "extract"
}
// Root of the remote
func (e *extractFs) Root() string {
return fmt.Sprintf("%s:%s/%s", e.fsrc.Name(), e.fsrc.Root(), e.srcFileName)
}
// String returns a description of the FS
func (e *extractFs) String() string {
return fmt.Sprintf("tar.gz archive %s at %s", e.Name(), e.Root())
}
// Precision of the ModTimes in this Fs
func (e *extractFs) Precision() time.Duration {
e.mu.Lock()
defer e.mu.Unlock()
if e.tarFormat == tar.FormatPAX {
return time.Millisecond
}
return time.Second
}
// Hashes returns the supported hash types of the filesystem
func (e *extractFs) Hashes() hash.Set {
return hash.Set(hash.None)
}
// Features returns the optional features of this Fs
func (e *extractFs) Features() *fs.Features {
return e.features
}
// NewObject finds the Object at remote. If it can't be found
// it returns the error ErrorObjectNotFound.
func (e *extractFs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
file := e.tarCache.GetFile(remote)
if file == nil {
panic("cache MUST contain information about this file")
}
e.mu.Lock()
if e.tarFormat == tar.FormatUnknown {
e.tarFormat = file.fileInfo.FileInfo.Format
}
e.mu.Unlock()
return e.newExtractObj(file), nil
}
type newTarReader func(io.Reader) tarcache.TarReader
func newExtract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string, maxFileSizeForCache int, newTarReaderFn newTarReader) *extract {
ci := fs.GetConfig(ctx)
s := &extract{
ctx: ctx,
ci: ci,
fdst: fdst,
fsrc: fsrc,
srcFileName: srcFileName,
maxFileSizeForCache: maxFileSizeForCache,
newTarReaderFn: newTarReaderFn,
toBeChecked: make(chan string, ci.Transfers),
toBeUploaded: make(chan copyJob, ci.Transfers),
}
if ci.MaxDuration > 0 {
s.maxDurationEndTime = time.Now().Add(ci.MaxDuration)
fs.Infof(s.fdst, "Transfer session %v deadline: %s", ci.CutoffMode, s.maxDurationEndTime.Format("2006/01/02 15:04:05"))
}
// If a max session duration has been defined add a deadline
// to the main context if cutoff mode is hard. This will cut
// the transfers off.
if !s.maxDurationEndTime.IsZero() && ci.CutoffMode == fs.CutoffModeHard {
s.ctx, s.cancel = context.WithDeadline(ctx, s.maxDurationEndTime)
} else {
s.ctx, s.cancel = context.WithCancel(ctx)
}
return s
}
// transfer copies job.src to job.path.
func (e *extract) transfer(ctx context.Context, in <-chan copyJob) {
defer e.transfersWg.Done()
for {
select {
case <-ctx.Done():
return
case job, ok := <-in:
if !ok {
return
}
obj := job.src.(*extractObj)
var err error
needTransfer := true
if job.dst != nil {
needTransfer = operations.NeedTransfer(e.ctx, job.dst, job.src)
}
if needTransfer {
_, err = operations.Copy(ctx, e.fdst, job.dst, job.path, job.src)
}
obj.Release()
if err != nil {
e.errCnt.Add(1)
e.setLastError(err)
e.cancel()
return
}
}
}
}
// runTransfers starts transfers
func (e *extract) runTransfers(ctx context.Context, in <-chan copyJob) {
for i := 0; i < e.ci.Transfers; i++ {
e.transfersWg.Add(1)
go e.transfer(ctx, in)
}
}
// stopTransfers stops all transfers and waits for them to finish
func (e *extract) stopTransfers() {
close(e.toBeUploaded)
fs.Debugf(e.fdst, "Waiting for transfers to finish")
e.transfersWg.Wait()
}
func (e *extract) setLastError(err error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.lastError == nil {
e.lastError = err
}
}
func (e *extract) getLastError() error {
e.mu.Lock()
defer e.mu.Unlock()
return e.lastError
}
// checker checks if filePath exists in fdst and, depending on the result, prepares the task for transfer
func (e *extract) checker(ctx context.Context, in <-chan string, out chan<- copyJob) {
defer e.checkersWg.Done()
for {
var filePath string
var ok bool
select {
case <-ctx.Done():
return
case filePath, ok = <-in:
if !ok {
return
}
}
idx := strings.LastIndex(filePath, "/")
var dir string
if idx != -1 {
dir = filePath[:idx]
}
err := e.dsh.updateDirList(ctx, dir)
if err != nil {
// TODO: think about retries
e.cancel()
return
}
src, err := e.extrFs.NewObject(ctx, filePath)
if err != nil {
e.setLastError(err)
e.cancel()
return
}
job := copyJob{
src: src,
path: filePath,
}
if objInfo := e.dsh.getObjectByPath(filePath); objInfo != nil {
if dst := objInfo.(fs.Object); dst != nil {
// When a file with the same name as in the archive is found at the destination,
// modify the copyJob to ensure operations.Copy uses Update instead of Put
job.dst = dst
}
}
out <- job
}
}
// runCheckers starts checkers
func (e *extract) runCheckers(ctx context.Context, in <-chan string, out chan<- copyJob) {
for i := 0; i < e.ci.Transfers; i++ {
e.checkersWg.Add(1)
go e.checker(ctx, in, out)
}
}
// stopCheckers stops all checkers and waits for them to finish
func (e *extract) stopCheckers() {
close(e.toBeChecked)
fs.Debugf(e.fdst, "Waiting for checkers to finish")
e.checkersWg.Wait()
}
func newExtractFs(ctx context.Context, fsrc fs.Fs, srcFileName string, transfers int, maxFileSizeForCache int, newTarReaderFn newTarReader) *extractFs {
res := &extractFs{
fsrc: fsrc,
srcFileName: srcFileName,
tarCache: newTarCache(fsrc, srcFileName, newTarReaderFn, transfers, maxFileSizeForCache),
tarFormat: tar.FormatUnknown,
}
res.features = (&fs.Features{
CanHaveEmptyDirectories: false,
}).Fill(ctx, res)
return res
}
// Open opens an archive from the source filesystem
func (e *extractFs) Open(ctx context.Context, options ...fs.OpenOption) error {
return e.tarCache.Open(ctx, options...)
}
func (e *extractFs) close() error {
return e.tarCache.Close()
}
func (e *extractFs) AddNextObject(ctx context.Context) (string, error) {
return e.tarCache.Next(ctx)
}
func (e *extract) run() error {
fExtr := newExtractFs(e.ctx, e.fsrc, e.srcFileName, e.ci.Transfers, e.maxFileSizeForCache, e.newTarReaderFn)
e.dsh = newDirStructHolder(makeListDir(e.fdst))
e.extrFs = fExtr
e.hashType, e.hashOption = operations.CommonHash(e.ctx, fExtr, e.fdst)
// Options for the download
downloadOptions := []fs.OpenOption{e.hashOption}
for _, option := range e.ci.DownloadHeaders {
downloadOptions = append(downloadOptions, option)
}
err := fExtr.Open(e.ctx, downloadOptions...)
if err != nil {
return err
}
e.toBeChecked = make(chan string, e.ci.Transfers)
e.toBeUploaded = make(chan copyJob, e.ci.Transfers)
e.runTransfers(e.ctx, e.toBeUploaded)
e.runCheckers(e.ctx, e.toBeChecked, e.toBeUploaded)
var exitErr error
for {
path, err := fExtr.AddNextObject(e.ctx)
if err != nil {
exitErr = err
break
}
if path == "" {
break
}
e.toBeChecked <- path
}
e.stopCheckers()
e.stopTransfers()
fExtr.tarCache.ReleaseUnprocessedFiles()
if err := fExtr.close(); err != nil {
if exitErr == nil {
exitErr = err
}
}
lastError := e.getLastError()
if lastError != nil {
return lastError
}
return exitErr
}
// Extract implements the extract command
func Extract(ctx context.Context, fdst fs.Fs, fsrc fs.Fs, srcFileName string, maxFileSizeForCache int) error {
do := newExtract(ctx, fdst, fsrc, srcFileName, maxFileSizeForCache, nil)
return do.run()
}
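
A minimal sketch of calling the extractor directly from Go rather than via the CLI; fsrc and fdst are assumed to be already-configured fs.Fs values, and 1<<20 mirrors the command's default cache threshold:

    // Unpack archive.tar.gz from fsrc into fdst, caching files up to 1 MiB in memory.
    if err := extract.Extract(context.Background(), fdst, fsrc, "archive.tar.gz", 1<<20); err != nil {
        return err
    }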

493
fs/extract/extract_test.go Normal file

@@ -0,0 +1,493 @@
package extract
import (
"archive/tar"
"bytes"
"context"
"fmt"
"io"
"math"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/rclone/rclone/backend/memory"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/extract/tarcache"
"github.com/rclone/rclone/fs/extract/tarcache/tartest"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fstest/mockobject"
"github.com/stretchr/testify/require"
)
type objInfo struct {
info fs.Fs
remote string
modTime time.Time
size int64
}
func (o *objInfo) Fs() fs.Info {
return o.info
}
func (o *objInfo) String() string {
return o.remote
}
func (o *objInfo) Remote() string {
return o.remote
}
func (o *objInfo) ModTime(context.Context) time.Time {
return o.modTime
}
func (o *objInfo) Size() int64 {
return o.size
}
func (o *objInfo) Hash(context.Context, hash.Type) (string, error) {
return "", nil
}
func (o *objInfo) Storable() bool {
return true
}
func newObjInfo(info fs.Fs, remote string, size int64) fs.ObjectInfo {
rnd := tartest.NewRandomizer()
offset := time.Millisecond * time.Duration(rnd.Intn(1000000))
return &objInfo{info, remote, time.Now().Add(offset), size}
}
type FailObj struct {
fs.Object
fscr *FailFs
isFailObj bool
}
type FailFs struct {
fs.Fs
failFile string
maxFails atomic.Int32
totalFails atomic.Int32
putCnt atomic.Int32
updateCnt atomic.Int32
}
func (f *FailFs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) error {
listR, ok := f.Fs.(fs.ListRer)
if !ok {
return fmt.Errorf("fail fs %s does not implement fs.ListRer", f.Fs)
}
return listR.ListR(ctx, dir, callback)
}
type FailReadCloser struct {
io.ReadCloser
}
type FailTarReader struct {
*tar.Reader
failFile string
maxFails atomic.Int32
totalFails atomic.Int32
currentHeader *tar.Header
mu sync.Mutex
}
func (f *FailTarReader) UpdateTarReader(r *tar.Reader) {
f.mu.Lock()
defer f.mu.Unlock()
f.Reader = r
}
func (f *FailTarReader) tarReader() *tar.Reader {
f.mu.Lock()
defer f.mu.Unlock()
return f.Reader
}
func (f *FailTarReader) Read(p []byte) (n int, err error) {
if f.currentHeader == nil {
return f.tarReader().Read(p)
}
if f.currentHeader.Name == f.failFile {
if f.isNeedToFail() {
f.decReadWriteFailAttempts()
return 0, fserrors.RetryError(io.ErrNoProgress)
}
}
return f.tarReader().Read(p)
}
func (f *FailTarReader) Next() (*tar.Header, error) {
h, err := f.Reader.Next()
if err != nil {
return h, err
}
f.currentHeader = h
return h, nil
}
func (f *FailTarReader) decReadWriteFailAttempts() {
f.maxFails.Add(-1)
f.totalFails.Add(1)
}
func (f *FailTarReader) isNeedToFail() bool {
return f.maxFails.Load() > 0
}
func newFailTarReader(t *tar.Reader, failFile string, maxFails int) *FailTarReader {
res := &FailTarReader{
Reader: t,
failFile: failFile,
}
res.maxFails.Store(int32(maxFails))
return res
}
func NewFailReadCloser(rc io.ReadCloser) *FailReadCloser {
return &FailReadCloser{rc}
}
func (r *FailReadCloser) Read([]byte) (int, error) {
return 0, fserrors.RetryError(io.ErrNoProgress)
}
func NewFailObj(f fs.Object, failFs *FailFs, isFailObj bool) *FailObj {
return &FailObj{f, failFs, isFailObj}
}
func (o *FailObj) Fs() fs.Info {
return o.fscr
}
func (o *FailObj) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
rc, err := o.Object.Open(ctx, options...)
if err != nil {
return nil, err
}
if o.isFailObj && o.fscr.IsNeedToFail() {
o.fscr.DecReadWriteFailAttempts()
return NewFailReadCloser(rc), nil
}
return rc, nil
}
func (o *FailObj) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
fFs, ok := o.Fs().(*FailFs)
if !ok {
panic("fail fs does not implement fail fs")
}
fFs.updateCnt.Add(1)
if o.isFailObj && o.fscr.IsNeedToFail() {
o.fscr.DecReadWriteFailAttempts()
return fserrors.RetryError(io.ErrNoProgress)
}
return o.Object.Update(ctx, in, src, options...)
}
func NewFailFs(f fs.Fs, failFile string, maxFails int) *FailFs {
res := &FailFs{Fs: f, failFile: failFile}
res.maxFails.Store(int32(maxFails))
return res
}
func (f *FailFs) IsNeedToFail() bool {
return f.maxFails.Load() > 0
}
func (f *FailFs) TotalFails() int {
return int(f.totalFails.Load())
}
func (f *FailFs) DecReadWriteFailAttempts() {
f.maxFails.Add(-1)
f.totalFails.Add(1)
}
func (f *FailFs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
retFs, err := f.Fs.NewObject(ctx, remote)
if err != nil {
return nil, err
}
return NewFailObj(retFs, f, f.failFile == remote), nil
}
func (f *FailFs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
entries, err := f.Fs.List(ctx, dir)
for idx, entry := range entries {
if obj, ok := entry.(fs.Object); ok {
entries[idx] = NewFailObj(obj, f, f.failFile == obj.Remote())
}
}
return entries, err
}
func (f *FailFs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
f.putCnt.Add(1)
if src.Remote() == f.failFile {
if f.IsNeedToFail() {
f.DecReadWriteFailAttempts()
return nil, fserrors.RetryError(io.ErrNoProgress)
}
}
return f.Fs.Put(ctx, in, src, options...)
}
func TestExtract(t *testing.T) {
t.Run("Happy path", testHappyPath)
t.Run("Double error while writing a small file to a remote server.", testFdstFailSmallFile)
t.Run("Double error while writing a large file to a remote server.", testFdstFailLargeFile)
t.Run("Multiple errors while writing a small file to a remote server.", testFdstFail2SmallFile)
t.Run("Multiple errors while writing a large file to a remote server.", testFdstFail2LargeFile)
t.Run("Double errors while reading a small file from an archive.", sdstFailSmallFile)
t.Run("Double errors while reading a large file from an archive.", sdstFailLargeFile)
}
func helperTestFdstFail2(t *testing.T, isStream bool) {
bigFileSize := 2000
archiveFile := "archive.tar.gz"
ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:dir%t", isStream), fmt.Sprintf("dst:dir%t", isStream), archiveFile, bigFileSize)
skipFiles := 10
filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream)
require.NotEqual(t, "", filePath)
expectedFalls := 100
newDstFs := NewFailFs(dstFs, filePath, expectedFalls)
// Extract archive to dst fs
err := Extract(ctx, newDstFs, srcFs, archiveFile, bigFileSize)
require.Error(t, err)
require.Greater(t, newDstFs.TotalFails(), 2)
}
func testFdstFail2SmallFile(t *testing.T) {
helperTestFdstFail2(t, false)
}
func testFdstFail2LargeFile(t *testing.T) {
helperTestFdstFail2(t, true)
}
func helperTestFdstFail(t *testing.T, isStream bool) {
bigFileSize := 2000
archiveFile := "archive.tar.gz"
ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:dir%t", isStream), fmt.Sprintf("dst:dir%t", isStream), archiveFile, bigFileSize)
skipFiles := 10
filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream)
require.NotEqual(t, "", filePath)
expectedFalls := 2
newDstFs := NewFailFs(dstFs, filePath, expectedFalls)
// Extract archive to dst fs
err := Extract(ctx, newDstFs, srcFs, archiveFile, bigFileSize)
require.NoError(t, err)
require.Equal(t, expectedFalls, newDstFs.TotalFails())
ll := make(map[string]struct{})
listRer := dstFs.(fs.ListRer)
require.NotNil(t, listRer)
err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
for _, entry := range entries {
obj, ok := entry.(fs.Object)
if !ok {
continue
}
ll[obj.Remote()] = struct{}{}
}
return nil
})
require.NoError(t, err)
checkFunc(ctx, t, dstFs, ll, rootItem)
require.Equal(t, 0, len(ll))
}
func sdstFailSmallFile(t *testing.T) {
sdstFailHelper(t, false)
}
func sdstFailLargeFile(t *testing.T) {
sdstFailHelper(t, true)
}
func sdstFailHelper(t *testing.T, isStream bool) {
bigFileSize := 2000
archiveFile := "archive.tar.gz"
ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, fmt.Sprintf("src:didir2%t", isStream), fmt.Sprintf("dst:dir2%t", isStream), archiveFile, bigFileSize)
skipFiles := 10
filePath := rootItem.FindFilePath(&skipFiles, bigFileSize, !isStream)
require.NotEqual(t, "", filePath)
expectedFalls := 2
// Extract archive to dst fs
tr := newFailTarReader(nil, filePath, expectedFalls)
extr := newExtract(ctx, dstFs, srcFs, archiveFile, bigFileSize, func(reader io.Reader) tarcache.TarReader {
tarReader := tar.NewReader(reader)
tr.UpdateTarReader(tarReader)
return tr
})
err := extr.run()
if !isStream {
// When processing files that are placed in the cache, tarcache.Cache returns an error
// at the moment of reading the header, so operations.Copy will not be able to initiate
// a second read of this file. In this case the first error is expected to be
// returned immediately
require.Error(t, err)
return
}
require.NoError(t, err)
ll := make(map[string]struct{})
listRer := dstFs.(fs.ListRer)
require.NotNil(t, listRer)
err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
for _, entry := range entries {
obj, ok := entry.(fs.Object)
if !ok {
continue
}
ll[obj.Remote()] = struct{}{}
}
return nil
})
require.NoError(t, err)
checkFunc(ctx, t, dstFs, ll, rootItem)
require.Equal(t, 0, len(ll))
}
func testFdstFailSmallFile(t *testing.T) {
helperTestFdstFail(t, false)
}
func testFdstFailLargeFile(t *testing.T) {
helperTestFdstFail(t, true)
}
func checkFunc(ctx context.Context, t *testing.T, dstFs fs.Fs, files map[string]struct{}, itm *tartest.Item) {
for _, obj := range itm.Children {
if obj.IsDir() {
checkFunc(ctx, t, dstFs, files, obj)
continue
}
_, ok := files[obj.Name()]
require.True(t, ok)
delete(files, obj.Name())
o, err := dstFs.NewObject(ctx, obj.Name())
require.NoError(t, err)
reader, err := o.Open(ctx)
require.NoError(t, err)
require.NotNil(t, reader)
actual, err := io.ReadAll(reader)
require.NoError(t, err)
require.Equal(t, obj.FileContent, actual)
}
}
func prepareSrcAndDstFs(t *testing.T, srcRoot, dstRoot, archiveFile string, bigFileSize int) (ctx context.Context, src fs.Fs, dst fs.Fs, rootItem *tartest.Item) {
ctx = context.Background()
ctx, ci := fs.AddConfig(ctx)
ci.Transfers = 10
ci2 := fs.GetConfig(ctx)
require.Equal(t, ci.Transfers, ci2.Transfers)
var err error
var archiveContent []byte
rootItem, archiveContent = tartest.CreateTarArchive(t, 5, 10, 12, 50, bigFileSize)
src, err = memory.NewFs(ctx, "memory", srcRoot, nil)
require.NoError(t, err)
o := mockobject.New(archiveFile).WithContent(archiveContent, mockobject.SeekModeNone)
reader, err := o.Open(ctx)
require.NoError(t, err)
// Put archive file to source fs
_, err = src.Put(ctx, reader, o)
require.NoError(t, err)
dst, err = memory.NewFs(ctx, "memory", dstRoot, nil)
require.NoError(t, err)
return
}
func testHappyPath(t *testing.T) {
bigFileSize := 2000
archiveFile := "archive.tar.gz"
ctx, srcFs, dstFs, rootItem := prepareSrcAndDstFs(t, "src:dir", "dst:dir", archiveFile, bigFileSize)
wrapDstFs := NewFailFs(dstFs, "", math.MaxInt64)
dstFs = wrapDstFs
defer func() {
require.NoError(t, srcFs.Rmdir(ctx, "src:dir"))
require.NoError(t, dstFs.Rmdir(ctx, "dst:dir"))
}()
// Extract archive to dst fs
err := Extract(ctx, dstFs, srcFs, archiveFile, bigFileSize)
require.NoError(t, err)
ll := make(map[string]struct{})
files := make([]string, 0)
listRer := dstFs.(fs.ListRer)
require.NotNil(t, listRer)
err = listRer.ListR(ctx, "", func(entries fs.DirEntries) error {
for _, entry := range entries {
obj, ok := entry.(fs.Object)
if !ok {
continue
}
ll[obj.Remote()] = struct{}{}
files = append(files, obj.Remote())
}
return nil
})
filesCnt := len(files)
require.NoError(t, err)
require.Equal(t, int32(filesCnt), wrapDstFs.putCnt.Load())
require.Equal(t, int32(0), wrapDstFs.updateCnt.Load())
require.Equal(t, filesCnt, len(ll))
checkFunc(ctx, t, dstFs, ll, rootItem)
require.Equal(t, 0, len(ll))
rnd := tartest.NewRandomizer()
cnt := 0
for i := 0; i < filesCnt; i += 3 {
filePath := files[i]
cnt++
obj, err := dstFs.NewObject(ctx, filePath)
require.NoError(t, err)
var fileSize int
isSmallFile := rnd.Dice(50)
if isSmallFile {
// [1, bigFileSize)
fileSize = rnd.Intn(bigFileSize-1) + 1
} else {
// [bigFileSize, 2*bigFileSize)
fileSize = rnd.Intn(bigFileSize) + bigFileSize
}
fileContent := make([]byte, fileSize)
_, _ = rnd.Read(fileContent)
reader := bytes.NewReader(fileContent)
err = obj.Update(ctx, reader, newObjInfo(dstFs, filePath, int64(fileSize)))
require.NoError(t, err)
}
wrapDstFs.putCnt.Store(0)
wrapDstFs.updateCnt.Store(0)
// Extract archive to dst fs
err = Extract(ctx, wrapDstFs, srcFs, archiveFile, bigFileSize)
require.NoError(t, err)
require.Equal(t, int32(0), wrapDstFs.putCnt.Load())
require.Equal(t, int32(cnt), wrapDstFs.updateCnt.Load())
}


@@ -0,0 +1,175 @@
// Package tarcache provides a cache for tar archives.
package tarcache
import (
"archive/tar"
"context"
"errors"
"io"
"sync"
"time"
)
// TarReader is an interface for reading tar archives.
type TarReader interface {
io.Reader
Next() (*tar.Header, error)
}
// Cache is a cache for tar archives.
type Cache struct {
reader TarReader
workerCnt int
maxSizeInMemory int64
ch chan struct{}
helperCh chan func()
streamWg sync.WaitGroup
wg sync.WaitGroup
}
// Header holds info about a file in the tar archive.
type Header struct {
FilePath string
ModTime time.Time
Size int64
Format tar.Format
}
// FileInfo holds info about a file in the tar archive and its content.
type FileInfo struct {
Payload []byte
IoPayload io.Reader
ReleaseFn func()
FileInfo Header
}
func (t *Cache) waitWorkersDone() {
t.wg.Wait()
}
// Close closes the cache and frees all resources.
func (t *Cache) Close() error {
t.waitWorkersDone()
if t.ch != nil {
close(t.ch)
t.ch = nil
close(t.helperCh)
t.helperCh = nil
return nil
}
return errors.New("cache already closed")
}
// NewTarCache creates a new tar cache.
func NewTarCache(reader TarReader, workerCnt int, maxSizeInMemory int64) *Cache {
res := &Cache{
reader: reader,
workerCnt: workerCnt,
maxSizeInMemory: maxSizeInMemory,
ch: make(chan struct{}, workerCnt),
helperCh: make(chan func(), 1),
}
// helper goroutine for waiting until the file content is fully read from the stream
go func() {
for fn := range res.helperCh {
fn()
}
}()
return res
}
type streamCloser struct {
ch chan struct{}
cache *Cache
isBytes bool
}
func newStreamCloser(cache *Cache, isBytes bool) *streamCloser {
if !isBytes {
cache.streamWg.Add(1)
}
cache.wg.Add(1)
return &streamCloser{
ch: cache.ch,
cache: cache,
isBytes: isBytes,
}
}
func (s *streamCloser) close() {
if s.ch == nil {
return
}
ch := s.ch
s.ch = nil
<-ch
if !s.isBytes {
s.cache.streamWg.Done()
}
s.cache.wg.Done()
}
// Next returns info about the next file in the archive
func (t *Cache) Next(ctx context.Context) (_ FileInfo, err error) {
// We block the execution flow if the number of currently processed files exceeds the limit.
select {
// Reserve slot for processing the file
case t.ch <- struct{}{}:
case <-ctx.Done():
return FileInfo{}, ctx.Err()
}
watcher := make(chan struct{})
t.helperCh <- func() {
// We also block the execution flow if there is a reader without caching.
t.streamWg.Wait()
close(watcher)
}
// If the method exits because of an error, release the slot used by the processed file right away
defer func() {
if err != nil {
<-t.ch
}
}()
select {
case <-watcher:
case <-ctx.Done():
return FileInfo{}, ctx.Err()
}
for {
header, err := t.reader.Next()
if err != nil {
return FileInfo{nil, nil, nil, Header{}}, err
}
switch header.Typeflag {
case tar.TypeDir:
continue
case tar.TypeReg:
h := Header{
FilePath: header.Name,
ModTime: header.ModTime,
Size: header.Size,
Format: header.Format,
}
// If the file size is smaller than maxSizeInMemory, read its content
if header.Size < t.maxSizeInMemory {
var payload []byte
payload, err = io.ReadAll(t.reader)
if err != nil {
return FileInfo{nil, nil, nil, Header{}}, err
}
closer := newStreamCloser(t, true)
return FileInfo{payload, nil, closer.close, h}, nil
}
// Otherwise, return FileInfo with a stream reader
closer := newStreamCloser(t, false)
return FileInfo{nil, t.reader, closer.close, h}, nil
default:
continue
}
}
}
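
A minimal consumption sketch for the cache, assuming archiveStream is an io.Reader over gzipped tar data (the helper name drain and the worker/limit values are illustrative). Small files arrive pre-read in Payload; large files arrive as an IoPayload stream that must be read to completion before ReleaseFn is called:

    func drain(ctx context.Context, archiveStream io.Reader) error {
        gzr, err := gzip.NewReader(archiveStream)
        if err != nil {
            return err
        }
        cache := tarcache.NewTarCache(tar.NewReader(gzr), 4, 1<<20) // 4 workers, 1 MiB in-memory limit
        defer func() { _ = cache.Close() }() // Close waits until every ReleaseFn has run
        var wg sync.WaitGroup
        for {
            fi, err := cache.Next(ctx)
            if err == io.EOF {
                break // archive exhausted
            }
            if err != nil {
                return err
            }
            wg.Add(1)
            go func(fi tarcache.FileInfo) {
                defer wg.Done()
                defer fi.ReleaseFn() // frees the reserved worker slot
                r := fi.IoPayload
                if r == nil { // small file, payload already cached in memory
                    r = bytes.NewReader(fi.Payload)
                }
                _, _ = io.Copy(io.Discard, r) // consume the file content
            }(fi)
        }
        wg.Wait()
        return nil
    }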


@@ -0,0 +1,528 @@
package tarcache
import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"io/fs"
"math"
"math/rand"
"path"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
)
type itemType int
const (
Dir itemType = iota
File
)
// item implements fs.FileInfo interface
type item struct {
iType itemType
itemName string
children map[string]*item
fileContent []byte
}
func (i *item) Name() string {
return i.itemName
}
func (i *item) Size() int64 {
return int64(len(i.fileContent))
}
func (i *item) Mode() fs.FileMode {
res := fs.ModePerm
if i.iType == Dir {
res |= fs.ModeDir
}
return res
}
func (i *item) ModTime() time.Time {
return time.Now()
}
func (i *item) IsDir() bool {
return i.iType == Dir
}
func (i *item) Sys() any {
return nil
}
func dice(rnd *rand.Rand, percent int) bool {
return rnd.Intn(100) < percent
}
func randomFileName(rnd *rand.Rand) string {
b := make([]byte, 16)
rnd.Read(b)
return hex.EncodeToString(b)
}
type errorReader struct {
io.Reader
bytesRead int
threshold int
}
var errRead = errors.New("read error")
func newReaderWithError(rc io.Reader, threshold int) *errorReader {
return &errorReader{rc, 0, threshold}
}
func (e *errorReader) Read(p []byte) (n int, err error) {
if e.bytesRead > e.threshold {
return 0, errRead
}
n, err = e.Reader.Read(p)
e.bytesRead += n
return
}
func (i *item) createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) {
if deep <= 0 {
return
}
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir
for j := 0; j < nItem; j++ {
isFile := deep == 1 || dice(rnd, 60)
newItem := item{}
var prefix string
if len(i.itemName) > 0 {
prefix = i.itemName + "/"
}
if isFile {
newItem.itemName = fmt.Sprintf("%sfile_%s", prefix, randomFileName(rnd))
newItem.iType = File
var fileSize int
isSmallFile := dice(rnd, smallFilesPercent)
if isSmallFile {
// [1, bigFileSize)
fileSize = rnd.Intn(bigFileSize-1) + 1
} else {
// [bigFileSize, 2*bigFileSize)
fileSize = rnd.Intn(bigFileSize) + bigFileSize
}
newItem.fileContent = make([]byte, fileSize)
rnd.Read(newItem.fileContent)
i.children[newItem.itemName] = &newItem
} else {
newItem.itemName = fmt.Sprintf("%sdir_%s", prefix, randomFileName(rnd))
newItem.iType = Dir
newItem.children = make(map[string]*item)
newItem.createTarArchiveContent(deep-1, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
i.children[newItem.itemName] = &newItem
}
}
}
func (i *item) fillMap(m *sync.Map) {
for _, v := range i.children {
if v.iType == File {
m.Store(v.itemName, v.fileContent)
} else if v.iType == Dir {
v.fillMap(m)
}
}
}
func createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) *item {
root := &item{
iType: Dir,
itemName: "",
children: make(map[string]*item),
}
root.createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
return root
}
func (i *item) writeItemToArchive(tw *tar.Writer) error {
// In the first pass, we write the files from the current directory.
for _, child := range i.children {
if child.iType == File {
head, err := tar.FileInfoHeader(child, path.Base(child.itemName))
if err != nil {
return err
}
head.Name = child.itemName
err = tw.WriteHeader(head)
if err != nil {
return err
}
_, err = io.Copy(tw, bytes.NewReader(child.fileContent))
if err != nil {
return err
}
}
}
// In the second pass, we write files from child directories.
for _, child := range i.children {
if child.iType == Dir {
if err := child.writeItemToArchive(tw); err != nil {
return err
}
}
}
return nil
}
func createTarArchiveHelper(writer io.Writer, content *item) error {
gz := gzip.NewWriter(writer)
defer func() {
_ = gz.Close()
}()
tw := tar.NewWriter(gz)
defer func() {
_ = tw.Close()
}()
return content.writeItemToArchive(tw)
}
func createTarArchive(t *testing.T, deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) (*item, []byte) {
content := createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
writer := bytes.NewBuffer(make([]byte, 0))
err := createTarArchiveHelper(writer, content)
archData := writer.Bytes()
require.NoError(t, err)
require.NotNil(t, content)
require.NotNil(t, archData)
return content, archData
}
func createTarArchiveWithOneFileHelper(w io.Writer) error {
gz := gzip.NewWriter(w)
defer func() {
_ = gz.Close()
}()
tw := tar.NewWriter(gz)
defer func() {
_ = tw.Close()
}()
itm := item{
iType: File,
itemName: "test.bin",
fileContent: make([]byte, 0x80000),
}
rootItm := item{
iType: Dir,
children: map[string]*item{
itm.itemName: &itm,
},
}
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
rnd.Read(itm.fileContent)
return rootItm.writeItemToArchive(tw)
}
func createTarArchiveWithOneFile(t *testing.T) []byte {
writer := bytes.NewBuffer(make([]byte, 0))
err := createTarArchiveWithOneFileHelper(writer)
require.NoError(t, err)
return writer.Bytes()
}
func doubleCloseCaseHelper(t *testing.T, readerTypeChecker func(io.Reader) bool) {
bigFileSize := 0x50
content, archData := createTarArchive(t, 5, 8, 10, 50, bigFileSize)
reader := bytes.NewReader(archData)
gzf, err := gzip.NewReader(reader)
require.NoError(t, err)
tarReader := tar.NewReader(gzf)
tarCache := NewTarCache(tarReader, 10, int64(bigFileSize))
var m sync.Map
content.fillMap(&m)
errCh := make(chan error, 100)
ctx, cancel := context.WithCancel(context.Background())
ctx2 := context.Background()
defer cancel()
var wg sync.WaitGroup
var cnt int32
L:
for {
select {
case <-ctx.Done():
break L
default:
}
contentReader, header, err := nextPayload(ctx2, tarCache)
if err != nil {
if err == io.EOF {
break
}
require.NoError(t, err)
}
wg.Add(1)
go func(reader io.ReadCloser, head Header) {
defer wg.Done()
defer func() {
if err := reader.Close(); err != nil {
errCh <- err
cancel()
}
}()
select {
case <-ctx.Done():
return
default:
}
val, ok := m.Load(header.FilePath)
if !ok {
errCh <- errors.New(header.FilePath + " not found")
cancel()
return
}
expected := val.([]byte)
content, err := io.ReadAll(reader)
if err != nil {
errCh <- err
cancel()
return
}
if !bytes.Equal(expected, content) {
				errCh <- errors.New(head.FilePath + " content mismatch")
cancel()
return
}
if atomic.AddInt32(&cnt, 1) >= 100 {
if readerTypeChecker(reader) {
if err := reader.Close(); err != nil {
errCh <- err
cancel()
return
}
}
}
}(contentReader, header)
}
err = tarCache.Close()
require.NoError(t, err)
wg.Wait()
close(errCh)
hasError := false
for e := range errCh {
if e != nil {
hasError = true
}
}
require.False(t, hasError)
}
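// readErrorCase verifies that a read error from the underlying stream is
// propagated by the cache rather than being reported as io.EOF.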
func readErrorCase(t *testing.T) {
ctx := context.Background()
archData := createTarArchiveWithOneFile(t)
reader := newReaderWithError(bytes.NewReader(archData), 0x800)
gzf, err := gzip.NewReader(reader)
require.NoError(t, err)
tarReader := tar.NewReader(gzf)
tarCache := NewTarCache(tarReader, 10, math.MaxInt)
	_, _, err = nextPayload(ctx, tarCache)
	require.NotErrorIs(t, err, io.EOF)
	require.ErrorIs(t, err, errRead)
	err = tarCache.Close()
	require.NoError(t, err)
}
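// helper adapts an archive payload to io.ReadCloser, ensuring the release
// function runs exactly once.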
type helper struct {
io.Reader
releaseFn func()
once sync.Once
}
// Close releases the payload exactly once; repeated calls are safe no-ops.
func (h *helper) Close() error {
h.once.Do(h.releaseFn)
return nil
}
// nextPayload returns the next archive entry as an io.ReadCloser together with
// its header, wrapping either the cached payload or the streaming reader.
func nextPayload(ctx context.Context, t *Cache) (io.ReadCloser, Header, error) {
res, err := t.Next(ctx)
if err != nil {
return nil, Header{}, err
}
if res.IoPayload != nil {
return &helper{Reader: res.IoPayload, releaseFn: res.ReleaseFn}, res.FileInfo, nil
}
res.IoPayload = bytes.NewReader(res.Payload)
return &helper{Reader: res.IoPayload, releaseFn: res.ReleaseFn}, res.FileInfo, nil
}
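// successfulCase unpacks a random archive concurrently, verifies each entry's
// content, and checks that every entry is seen exactly once.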
func successfulCase(t *testing.T) {
bigFileSize := 0x200
content, archData := createTarArchive(t, 5, 5, 30, 70, bigFileSize)
reader := bytes.NewReader(archData)
gzf, err := gzip.NewReader(reader)
require.NoError(t, err)
tarReader := tar.NewReader(gzf)
tarCache := NewTarCache(tarReader, 10, int64(bigFileSize))
var m sync.Map
content.fillMap(&m)
errCh := make(chan error, 100)
ctx, cancel := context.WithCancel(context.Background())
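	// As above, nextPayload gets a non-cancellable context; cancellation is
	// checked explicitly at the top of the loop.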
ctx2 := context.Background()
defer cancel()
var wg sync.WaitGroup
L:
for {
select {
case <-ctx.Done():
break L
default:
}
contentReader, header, err := nextPayload(ctx2, tarCache)
if err != nil {
			if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
}
wg.Add(1)
go func(reader io.ReadCloser, head Header) {
defer wg.Done()
defer func() {
if err := reader.Close(); err != nil {
errCh <- err
cancel()
}
}()
select {
case <-ctx.Done():
return
default:
}
			val, ok := m.Load(head.FilePath)
			if !ok {
				errCh <- errors.New(head.FilePath + " not found")
cancel()
return
}
expected := val.([]byte)
content, err := io.ReadAll(reader)
if err != nil {
errCh <- err
cancel()
return
}
if !bytes.Equal(expected, content) {
				errCh <- errors.New(head.FilePath + " content mismatch")
cancel()
return
}
			m.Delete(head.FilePath)
}(contentReader, header)
}
err = tarCache.Close()
wg.Wait()
close(errCh)
var err2 error
for e := range errCh {
if err2 == nil {
err2 = e
}
}
require.NoError(t, err2)
require.NoError(t, err)
	// Every successfully verified entry was removed from the map, so it must be empty now.
var l int
m.Range(func(k, v interface{}) bool {
l++
return true
})
require.Equal(t, 0, l)
}
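// TestTarCache covers the happy path, double closing of cached and streamed
// payload readers, and read-error propagation.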
func TestTarCache(t *testing.T) {
t.Run("Successful case", func(t *testing.T) {
successfulCase(t)
})
t.Run("Double close of the byte reader", func(t *testing.T) {
doubleCloseCaseHelper(t, func(reader io.Reader) bool {
e, ok := reader.(*helper)
if !ok {
return false
}
_, ok = e.Reader.(*bytes.Reader)
return ok
})
})
t.Run("Double close of the stream reader", func(t *testing.T) {
doubleCloseCaseHelper(t, func(reader io.Reader) bool {
e, ok := reader.(*helper)
if !ok {
return false
}
_, ok = e.Reader.(*bytes.Reader)
return !ok
})
})
t.Run("Read error", func(t *testing.T) {
readErrorCase(t)
})
}

View file

@@ -0,0 +1,239 @@
// Package tartest provides helper functions for generating and testing tar archives
package tartest
import (
"archive/tar"
"bytes"
"compress/gzip"
"encoding/hex"
"fmt"
"io"
"io/fs"
"math/rand"
"path"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// FindFilePath returns the path of a file whose size class matches fitInCache
// (files smaller than bigFileSize fit in the cache; files of at least
// bigFileSize bytes do not), skipping the first *skipCnt matches.
// It returns "" if no such file exists.
func (i *Item) FindFilePath(skipCnt *int, bigFileSize int, fitInCache bool) string {
for _, obj := range i.Children {
if !obj.IsDir() {
isBig := len(obj.FileContent) >= bigFileSize
			if (isBig && !fitInCache) || (!isBig && fitInCache) {
if *skipCnt == 0 {
return obj.Name()
}
*skipCnt--
}
}
}
for _, obj := range i.Children {
if obj.IsDir() {
res := obj.FindFilePath(skipCnt, bigFileSize, fitInCache)
if len(res) > 0 {
return res
}
}
}
return ""
}
// ItemType is an item type
type ItemType int
// Item types
const (
Dir ItemType = iota // directory type
File // regular file type
)
// Item represents a file or directory
type Item struct {
Type ItemType
ItemName string
Children map[string]*Item
FileContent []byte
}
// Name returns file name
func (i *Item) Name() string {
return i.ItemName
}
// Size returns file size
func (i *Item) Size() int64 {
return int64(len(i.FileContent))
}
// Mode returns file mode
func (i *Item) Mode() fs.FileMode {
res := fs.ModePerm
if i.Type == Dir {
res |= fs.ModeDir
}
return res
}
// ModTime returns modification time
func (i *Item) ModTime() time.Time {
return time.Now()
}
// IsDir returns true if item is a directory
func (i *Item) IsDir() bool {
return i.Type == Dir
}
// Sys returns the underlying data source; here it is always nil
func (i *Item) Sys() any {
return nil
}
// Randomizer is a helper to generate random data
type Randomizer struct {
rnd *rand.Rand
}
// NewRandomizer returns a new Randomizer
func NewRandomizer() Randomizer {
return Randomizer{
rnd: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}
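// Read fills p with random bytes; the returned error is always nil.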
func (r Randomizer) Read(p []byte) (n int, err error) {
return r.rnd.Read(p)
}
// Dice returns true with probability percent/100
func (r Randomizer) Dice(percent int) bool {
return r.rnd.Intn(100) < percent
}
// RandomFileName generates a random hex-encoded file name
func (r Randomizer) RandomFileName() string {
b := make([]byte, 16)
r.rnd.Read(b)
return hex.EncodeToString(b)
}
// Intn returns random integer in the range [0, n)
func (r Randomizer) Intn(n int) int {
return r.rnd.Intn(n)
}
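// createTarArchiveContent recursively populates the directory with a random
// mix of files and subdirectories. It assumes maxItemsInDir > minItemsInDir,
// since rand.Intn panics on a non-positive argument.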
func (i *Item) createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) {
if deep <= 0 {
return
}
rnd := NewRandomizer()
nItem := rnd.Intn(maxItemsInDir-minItemsInDir) + minItemsInDir
for j := 0; j < nItem; j++ {
isFile := deep == 1 || rnd.Dice(60)
newItem := Item{}
var prefix string
if len(i.ItemName) > 0 {
prefix = i.ItemName + "/"
}
if isFile {
newItem.ItemName = fmt.Sprintf("%sfile_%s", prefix, rnd.RandomFileName())
newItem.Type = File
var fileSize int
isSmallFile := rnd.Dice(smallFilesPercent)
if isSmallFile {
// [1, bigFileSize)
fileSize = rnd.Intn(bigFileSize-1) + 1
} else {
// [bigFileSize, 2*bigFileSize)
fileSize = rnd.Intn(bigFileSize) + bigFileSize
}
newItem.FileContent = make([]byte, fileSize)
_, _ = rnd.Read(newItem.FileContent)
i.Children[newItem.ItemName] = &newItem
} else {
newItem.ItemName = fmt.Sprintf("%sdir_%s", prefix, rnd.RandomFileName())
newItem.Type = Dir
newItem.Children = make(map[string]*Item)
newItem.createTarArchiveContent(deep-1, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
i.Children[newItem.ItemName] = &newItem
}
}
}
// CreateTarArchiveContent generates a random directory tree of the specified
// depth; the result can later be written to a tar archive
func CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) *Item {
root := &Item{
Type: Dir,
ItemName: "",
Children: make(map[string]*Item),
}
root.createTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
return root
}
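// writeItemToArchiver writes the tree to tw: files of the current directory
// first, then the contents of child directories.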
func (i *Item) writeItemToArchiver(tw *tar.Writer) error {
// In the first pass, we write the files from the current directory.
for _, child := range i.Children {
if child.Type == File {
head, err := tar.FileInfoHeader(child, path.Base(child.ItemName))
if err != nil {
return err
}
head.Name = child.ItemName
err = tw.WriteHeader(head)
if err != nil {
return err
}
_, err = io.Copy(tw, bytes.NewReader(child.FileContent))
if err != nil {
return err
}
}
}
// In the second pass, we write files from child directories.
for _, child := range i.Children {
if child.Type == Dir {
if err := child.writeItemToArchiver(tw); err != nil {
return err
}
}
}
return nil
}
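// createTarArchiveHelper writes content to writer as a gzip-compressed tar archive.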
func createTarArchiveHelper(writer io.Writer, content *Item) error {
gz := gzip.NewWriter(writer)
defer func() {
_ = gz.Close()
}()
tw := tar.NewWriter(gz)
defer func() {
_ = tw.Close()
}()
return content.writeItemToArchiver(tw)
}
// CreateTarArchive generates random archive content and returns it together
// with the bytes of the corresponding gzip-compressed tar archive.
func CreateTarArchive(t *testing.T, deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize int) (*Item, []byte) {
content := CreateTarArchiveContent(deep, minItemsInDir, maxItemsInDir, smallFilesPercent, bigFileSize)
	writer := bytes.NewBuffer(nil)
err := createTarArchiveHelper(writer, content)
archData := writer.Bytes()
require.NoError(t, err)
require.NotNil(t, content)
require.NotNil(t, archData)
return content, archData
}