Compare commits

...

5 commits

Author SHA1 Message Date
Nick Craig-Wood
3fa5a424a9 serve nbd: serve an rclone remote as a Network Block Device - WIP FIXME
TODO

- Need to finalise rclone/gonbdserver, upload it, and update go.mod/go.sum
- Remove unneeded dependencies from rclone/gonbdserver

Maybe make companion `mount nbd` command?

Fixes #7337
2024-07-30 14:04:07 +01:00
Nick Craig-Wood
9fb0afad88 vfs: chunked files which can be read and written at will
This introduces the vfs/chunked library which can open a file like
object which is stored in parts on the remote. This can be read and
written to anywhere and at any time.
2024-07-30 14:04:07 +01:00
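A minimal usage sketch of the new library (a hypothetical caller; the names are taken from vfs/chunked/chunked.go in this diff, and VFS is assumed to be an existing *vfs.VFS; imports of fs, vfs and chunked from github.com/rclone/rclone assumed):

    func example(VFS *vfs.VFS) (err error) {
        cf := chunked.New(VFS, "backing/dir") // chunks stored under backing/dir on the remote
        err = cf.Open(true, 0)                // create if missing, 0 = default 64k chunks
        if err != nil {
            return err
        }
        defer fs.CheckClose(cf, &err) // Close flushes the size and buffers
        // Sparse write at a 1 TiB offset - only the chunks touched are stored
        _, err = cf.WriteAt([]byte("hello"), 1<<40)
        if err != nil {
            return err
        }
        // Read the data back from the same offset
        buf := make([]byte, 5)
        _, err = cf.ReadAt(buf, 1<<40)
        return err
    }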
Nick Craig-Wood
2f9c2cf75e vfs: add vfs.WriteFile as an analogue to os.WriteFile 2024-07-30 13:32:45 +01:00
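A one-line usage sketch (hypothetical caller; VFS is an existing *vfs.VFS, mirroring the os.WriteFile calling convention):

    // Create or truncate dir/hello.txt on the remote and write the data
    err := VFS.WriteFile("dir/hello.txt", []byte("hello world\n"), 0600)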
Nick Craig-Wood
1ac18e5765 docs: s3: add section on using too much memory #7974 2024-07-30 09:51:30 +01:00
Nick Craig-Wood
3e8cee148a docs: link the workaround for big directory syncs in the FAQ #7974 2024-07-30 09:41:54 +01:00
12 changed files with 1621 additions and 4 deletions

View file

@@ -0,0 +1,140 @@
// Implements an nbd.Backend for serving from a chunked file in the VFS.
package nbd
import (
"errors"
"fmt"
"github.com/rclone/gonbdserver/nbd"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/vfs/chunked"
"golang.org/x/net/context"
)
// Backend for a single chunked file
type chunkedBackend struct {
file *chunked.File
ec *nbd.ExportConfig
}
// Create Backend for a single chunked file
type chunkedBackendFactory struct {
s *NBD
file *chunked.File
}
// WriteAt implements Backend.WriteAt
func (cb *chunkedBackend) WriteAt(ctx context.Context, b []byte, offset int64, fua bool) (n int, err error) {
defer log.Trace(logPrefix, "size=%d, off=%d", len(b), offset)("n=%d, err=%v", &n, &err)
n, err = cb.file.WriteAt(b, offset)
if err != nil || !fua {
return n, err
}
err = cb.file.Sync()
if err != nil {
return 0, err
}
return n, err
}
// ReadAt implements Backend.ReadAt
func (cb *chunkedBackend) ReadAt(ctx context.Context, b []byte, offset int64) (n int, err error) {
defer log.Trace(logPrefix, "size=%d, off=%d", len(b), offset)("n=%d, err=%v", &n, &err)
return cb.file.ReadAt(b, offset)
}
// TrimAt implements Backend.TrimAt
func (cb *chunkedBackend) TrimAt(ctx context.Context, length int, offset int64) (n int, err error) {
defer log.Trace(logPrefix, "size=%d, off=%d", length, offset)("n=%d, err=%v", &n, &err)
return length, nil
}
// Flush implements Backend.Flush
func (cb *chunkedBackend) Flush(ctx context.Context) (err error) {
defer log.Trace(logPrefix, "")("err=%v", &err)
return nil
}
// Close implements Backend.Close
func (cb *chunkedBackend) Close(ctx context.Context) (err error) {
defer log.Trace(logPrefix, "")("err=%v", &err)
return cb.file.Close()
}
// Geometry implements Backend.Geometry
func (cb *chunkedBackend) Geometry(ctx context.Context) (size uint64, minBS uint64, prefBS uint64, maxBS uint64, err error) {
defer log.Trace(logPrefix, "")("size=%d, minBS=%d, prefBS=%d, maxBS=%d, err=%v", &size, &minBS, &prefBS, &maxBS, &err)
size = uint64(cb.file.Size())
minBS = cb.ec.MinimumBlockSize
prefBS = cb.ec.PreferredBlockSize
maxBS = cb.ec.MaximumBlockSize
err = nil
return
}
// HasFua implements Backend.HasFua
func (cb *chunkedBackend) HasFua(ctx context.Context) (fua bool) {
defer log.Trace(logPrefix, "")("fua=%v", &fua)
return true
}
// HasFlush implements Backend.HasFlush
func (cb *chunkedBackend) HasFlush(ctx context.Context) (flush bool) {
defer log.Trace(logPrefix, "")("flush=%v", &flush)
return true
}
// New generates a new chunked backend
func (cbf *chunkedBackendFactory) newBackend(ctx context.Context, ec *nbd.ExportConfig) (nbd.Backend, error) {
err := cbf.file.Open(false, 0)
if err != nil {
return nil, fmt.Errorf("failed to open chunked file: %w", err)
}
cb := &chunkedBackend{
file: cbf.file,
ec: ec,
}
return cb, nil
}
// Generate a chunked backend factory
func (s *NBD) newChunkedBackendFactory(ctx context.Context) (bf backendFactory, err error) {
create := s.opt.Create > 0
if s.vfs.Opt.ReadOnly && create {
return nil, errors.New("can't create files with --read-only")
}
file := chunked.New(s.vfs, s.leaf)
err = file.Open(create, s.log2ChunkSize)
if err != nil {
return nil, fmt.Errorf("failed to open chunked file: %w", err)
}
defer fs.CheckClose(file, &err)
var truncateSize fs.SizeSuffix
if create {
if file.Size() == 0 {
truncateSize = s.opt.Create
}
} else {
truncateSize = s.opt.Resize
}
if truncateSize > 0 {
err = file.Truncate(int64(truncateSize))
if err != nil {
return nil, fmt.Errorf("failed to create chunked file: %w", err)
}
fs.Logf(logPrefix, "Size of network block device is now %v", truncateSize)
}
return &chunkedBackendFactory{
s: s,
file: file,
}, nil
}
// Check interfaces
var (
_ nbd.Backend = (*chunkedBackend)(nil)
_ backendFactory = (*chunkedBackendFactory)(nil)
)

View file

@@ -0,0 +1,140 @@
// Implements an nbd.Backend for serving from the VFS.
package nbd
import (
"fmt"
"os"
"github.com/rclone/gonbdserver/nbd"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/vfs"
"golang.org/x/net/context"
)
// Backend for a single file
type fileBackend struct {
file vfs.Handle
ec *nbd.ExportConfig
}
// Create Backend for a single file
type fileBackendFactory struct {
s *NBD
vfs *vfs.VFS
filePath string
perms int
}
// WriteAt implements Backend.WriteAt
func (fb *fileBackend) WriteAt(ctx context.Context, b []byte, offset int64, fua bool) (n int, err error) {
defer log.Trace(logPrefix, "size=%d, off=%d", len(b), offset)("n=%d, err=%v", &n, &err)
n, err = fb.file.WriteAt(b, offset)
if err != nil || !fua {
return n, err
}
err = fb.file.Sync()
if err != nil {
return 0, err
}
return n, err
}
// ReadAt implements Backend.ReadAt
func (fb *fileBackend) ReadAt(ctx context.Context, b []byte, offset int64) (n int, err error) {
defer log.Trace(logPrefix, "size=%d, off=%d", len(b), offset)("n=%d, err=%v", &n, &err)
return fb.file.ReadAt(b, offset)
}
// TrimAt implements Backend.TrimAt
func (fb *fileBackend) TrimAt(ctx context.Context, length int, offset int64) (n int, err error) {
defer log.Trace(logPrefix, "size=%d, off=%d", length, offset)("n=%d, err=%v", &n, &err)
return length, nil
}
// Flush implements Backend.Flush
func (fb *fileBackend) Flush(ctx context.Context) (err error) {
defer log.Trace(logPrefix, "")("err=%v", &err)
return nil
}
// Close implements Backend.Close
func (fb *fileBackend) Close(ctx context.Context) (err error) {
defer log.Trace(logPrefix, "")("err=%v", &err)
return fb.file.Close()
}
// Geometry implements Backend.Geometry
func (fb *fileBackend) Geometry(ctx context.Context) (size uint64, minBS uint64, prefBS uint64, maxBS uint64, err error) {
defer log.Trace(logPrefix, "")("size=%d, minBS=%d, prefBS=%d, maxBS=%d, err=%v", &size, &minBS, &prefBS, &maxBS, &err)
fi, err := fb.file.Stat()
if err != nil {
err = fmt.Errorf("failed read info about open backing file: %w", err)
return
}
size = uint64(fi.Size())
minBS = fb.ec.MinimumBlockSize
prefBS = fb.ec.PreferredBlockSize
maxBS = fb.ec.MaximumBlockSize
err = nil
return
}
// HasFua implements Backend.HasFua
func (fb *fileBackend) HasFua(ctx context.Context) (fua bool) {
defer log.Trace(logPrefix, "")("fua=%v", &fua)
return true
}
// HasFlush implements Backend.HasFlush
func (fb *fileBackend) HasFlush(ctx context.Context) (flush bool) {
defer log.Trace(logPrefix, "")("flush=%v", &flush)
return true
}
// open the backing file
func (fbf *fileBackendFactory) open() (vfs.Handle, error) {
return fbf.vfs.OpenFile(fbf.filePath, fbf.perms, 0700)
}
// New generates a new file backend
func (fbf *fileBackendFactory) newBackend(ctx context.Context, ec *nbd.ExportConfig) (nbd.Backend, error) {
fd, err := fbf.open()
if err != nil {
return nil, fmt.Errorf("failed to open backing file: %w", err)
}
fb := &fileBackend{
file: fd,
ec: ec,
}
return fb, nil
}
// Generate a file backend factory
func (s *NBD) newFileBackendFactory(ctx context.Context) (bf backendFactory, err error) {
perms := os.O_RDWR
if s.vfs.Opt.ReadOnly {
perms = os.O_RDONLY
}
fbf := &fileBackendFactory{
s: s,
vfs: s.vfs,
perms: perms,
filePath: s.leaf,
}
// Try opening the file so we get errors now rather than later when they are more difficult to report.
fd, err := fbf.open()
if err != nil {
return nil, fmt.Errorf("failed to open backing file: %w", err)
}
defer fs.CheckClose(fd, &err)
return fbf, nil
}
// Check interfaces
var (
_ nbd.Backend = (*fileBackend)(nil)
_ backendFactory = (*fileBackendFactory)(nil)
)

260
cmd/serve/nbd/nbd.go Normal file
View file

@@ -0,0 +1,260 @@
// Package nbd provides a network block device server
package nbd
import (
"bufio"
"context"
_ "embed"
"errors"
"fmt"
"io"
"log"
"math/bits"
"path/filepath"
"strings"
"sync"
"github.com/rclone/gonbdserver/nbd"
"github.com/rclone/rclone/cmd"
"github.com/rclone/rclone/cmd/serve/proxy/proxyflags"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config/flags"
"github.com/rclone/rclone/lib/systemd"
"github.com/rclone/rclone/vfs"
"github.com/rclone/rclone/vfs/vfscommon"
"github.com/rclone/rclone/vfs/vfsflags"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)
const logPrefix = "nbd"
// OptionsInfo describes the Options in use
var OptionsInfo = fs.Options{{
Name: "addr",
Default: "localhost:10809",
Help: "IPaddress:Port or :Port to bind server to",
}, {
Name: "min_block_size",
Default: fs.SizeSuffix(512), // FIXME
Help: "Minimum block size to advertise",
}, {
Name: "preferred_block_size",
Default: fs.SizeSuffix(4096), // FIXME this is the max according to nbd-client
Help: "Preferred block size to advertise",
}, {
Name: "max_block_size",
Default: fs.SizeSuffix(1024 * 1024), // FIXME
Help: "Maximum block size to advertise",
}, {
Name: "create",
Default: fs.SizeSuffix(-1),
Help: "If the destination does not exist, create it with this size",
}, {
Name: "chunk_size",
Default: fs.SizeSuffix(0),
Help: "If creating the destination use this chunk size. Must be a power of 2.",
}, {
Name: "resize",
Default: fs.SizeSuffix(-1),
Help: "If the destination exists, resize it to this size",
}}
// name := flag.String("name", "default", "Export name")
// description := flag.String("description", "The default export", "Export description")
// Options required for nbd server
type Options struct {
ListenAddr string `config:"addr"` // Address to listen on
MinBlockSize fs.SizeSuffix `config:"min_block_size"`
PreferredBlockSize fs.SizeSuffix `config:"preferred_block_size"`
MaxBlockSize fs.SizeSuffix `config:"max_block_size"`
Create fs.SizeSuffix `config:"create"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
Resize fs.SizeSuffix `config:"resize"`
}
func init() {
fs.RegisterGlobalOptions(fs.OptionsInfo{Name: "nbd", Opt: &Opt, Options: OptionsInfo})
}
// Opt is options set by command line flags
var Opt Options
// AddFlags adds flags for the nbd
func AddFlags(flagSet *pflag.FlagSet, Opt *Options) {
flags.AddFlagsFromOptions(flagSet, "", OptionsInfo)
}
func init() {
flagSet := Command.Flags()
vfsflags.AddFlags(flagSet)
proxyflags.AddFlags(flagSet)
AddFlags(flagSet, &Opt)
}
//go:embed nbd.md
var helpText string
// Command definition for cobra
var Command = &cobra.Command{
Use: "nbd remote:path",
Short: `Serve the remote over NBD.`,
Long: helpText + vfs.Help(),
Annotations: map[string]string{
"versionIntroduced": "v1.65",
"status": "experimental",
},
Run: func(command *cobra.Command, args []string) {
// FIXME could serve more than one nbd?
cmd.CheckArgs(1, 1, command, args)
f, leaf := cmd.NewFsFile(args[0])
cmd.Run(false, true, command, func() error {
s, err := run(context.Background(), f, leaf, Opt)
if err != nil {
log.Fatal(err)
}
defer systemd.Notify()()
// FIXME
_ = s
s.Wait()
return nil
})
},
}
// NBD contains everything to run the server
type NBD struct {
f fs.Fs
leaf string
vfs *vfs.VFS // don't use directly, use getVFS
opt Options
wg sync.WaitGroup
sessionWaitGroup sync.WaitGroup
logRd *io.PipeReader
logWr *io.PipeWriter
log2ChunkSize uint
readOnly bool // Set for read only by vfs config
backendFactory backendFactory
}
// interface for creating backend factories
type backendFactory interface {
newBackend(ctx context.Context, ec *nbd.ExportConfig) (nbd.Backend, error)
}
// Create and start the server for nbd either on directory f or using file leaf in f
func run(ctx context.Context, f fs.Fs, leaf string, opt Options) (s *NBD, err error) {
s = &NBD{
f: f,
leaf: leaf,
opt: opt,
vfs: vfs.New(f, &vfscommon.Opt),
readOnly: vfscommon.Opt.ReadOnly,
}
if opt.ChunkSize != 0 {
if set := bits.OnesCount64(uint64(opt.ChunkSize)); set != 1 {
return nil, fmt.Errorf("--chunk-size must be a power of 2 (counted %d bits set)", set)
}
s.log2ChunkSize = uint(bits.TrailingZeros64(uint64(opt.ChunkSize)))
fs.Debugf(logPrefix, "Using ChunkSize %v (%v), Log2ChunkSize %d", opt.ChunkSize, fs.SizeSuffix(1<<s.log2ChunkSize), s.log2ChunkSize)
}
if !vfscommon.Opt.ReadOnly && vfscommon.Opt.CacheMode < vfscommon.CacheModeWrites {
return nil, errors.New("need --vfs-cache-mode writes or full when serving read/write")
}
// Create the backend factory
if leaf != "" {
s.backendFactory, err = s.newFileBackendFactory(ctx)
} else {
s.backendFactory, err = s.newChunkedBackendFactory(ctx)
}
if err != nil {
return nil, err
}
nbd.RegisterBackend("rclone", s.backendFactory.newBackend)
fs.Debugf(logPrefix, "Registered backends: %v", nbd.GetBackendNames())
var (
protocol = "tcp"
addr = opt.ListenAddr
)
if strings.HasPrefix(addr, "unix://") || filepath.IsAbs(addr) {
protocol = "unix"
addr = strings.TrimPrefix(addr, "unix://")
}
ec := nbd.ExportConfig{
Name: "default",
Description: fs.ConfigString(f),
Driver: "rclone",
ReadOnly: vfscommon.Opt.ReadOnly,
Workers: 8, // should this be --checkers or a new config flag FIXME
TLSOnly: false, // FIXME
MinimumBlockSize: uint64(opt.MinBlockSize),
PreferredBlockSize: uint64(opt.PreferredBlockSize),
MaximumBlockSize: uint64(opt.MaxBlockSize),
DriverParameters: nbd.DriverParametersConfig{
"sync": "false",
"path": "/tmp/diskimage",
},
}
// Make a logger to feed gonbdserver's logs into rclone's logging system
s.logRd, s.logWr = io.Pipe()
go func() {
scanner := bufio.NewScanner(s.logRd)
for scanner.Scan() {
line := scanner.Text()
if s, ok := strings.CutPrefix(line, "[DEBUG] "); ok {
fs.Debugf(logPrefix, "%s", s)
} else if s, ok := strings.CutPrefix(line, "[INFO] "); ok {
fs.Infof(logPrefix, "%s", s)
} else if s, ok := strings.CutPrefix(line, "[WARN] "); ok {
fs.Logf(logPrefix, "%s", s)
} else if s, ok := strings.CutPrefix(line, "[ERROR] "); ok {
fs.Errorf(logPrefix, "%s", s)
} else if s, ok := strings.CutPrefix(line, "[CRIT] "); ok {
fs.Errorf(logPrefix, "%s", s)
} else {
fs.Infof(logPrefix, "%s", line)
}
}
if err := scanner.Err(); err != nil {
fs.Errorf(logPrefix, "Log writer failed: %v", err)
}
}()
logger := log.New(s.logWr, "", 0)
ci := fs.GetConfig(ctx)
dump := ci.Dump & (fs.DumpHeaders | fs.DumpBodies | fs.DumpAuth | fs.DumpRequests | fs.DumpResponses)
var serverConfig = nbd.ServerConfig{
Protocol: protocol, // protocol it should listen on (in net.Conn form)
Address: addr, // address to listen on
DefaultExport: "default", // name of default export
Exports: []nbd.ExportConfig{ec}, // array of configurations of exported items
//TLS: nbd.TLSConfig{}, // TLS configuration
DisableNoZeroes: false, // Disable NoZeroes extension FIXME
Debug: dump != 0, // Verbose debug
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
// FIXME contexts
nbd.StartServer(ctx, ctx, &s.sessionWaitGroup, logger, serverConfig)
}()
return s, nil
}
// Wait for the server to finish
func (s *NBD) Wait() {
s.wg.Wait()
_ = s.logWr.Close()
_ = s.logRd.Close()
}

139
cmd/serve/nbd/nbd.md Normal file
View file

@@ -0,0 +1,139 @@
Run a Network Block Device server using remote:path to store the object.
You can use a unix socket by setting the url to `unix:///path/to/socket`
or just by using an absolute path name.
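For example (hypothetical invocations of the `--addr` flag defined in nbd.go below; both forms select the unix protocol):

    rclone serve nbd --addr unix:///tmp/rclone-nbd.sock remote:path
    rclone serve nbd --addr /tmp/rclone-nbd.sock remote:path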
`rclone serve nbd` will run on any OS, but the examples for using it
are Linux specific. There do exist Windows and macOS NBD clients but
these haven't been tested yet.
To see the packets on the wire use `--dump headers` or `--dump bodies`.
**NB** this has no authentication. It may in the future allow SSL
certificates. If you need access control then you will have to provide
it on the network layer, or use unix sockets.
### remote:path pointing to a file
If the `remote:path` points to a file then rclone will serve the file
directly as a network block device.
Using this with `--read-only` is recommended. In that case you can use
any `--vfs-cache-mode`; with `--vfs-cache-mode full` only the parts of
the file that are read will be cached locally.
If you don't use `--read-only` then `--vfs-cache-mode full` is
required and the entire file will be cached locally and won't be
uploaded until the client has disconnected (`nbd-client -d`).
### remote:path pointing to a directory
If the `remote:path` points to a directory then rclone will treat the
directory as a place to store chunks of the exported network block device.
It will store an `info.json` file in the top level and store the
individual chunks in a hierarchical directory scheme with no more than
256 chunks or directories in any directory.
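For example, with the default 64k chunks the chunk covering offset
0x123456789ABC0000 is stored as below (this layout is taken from the
chunk naming tests in vfs/chunked/chunked_test.go):

    remote:path/info.json
    remote:path/12/34/56/78/9A/123456789ABC.bin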
The first time you use this, you should use the `--create` flag
indicating how big you want the network block device to appear. Rclone
only allocates space for the chunks you actually write, so you can make
this large, for example `--create 1T`. You can also pass the
`--chunk-size` flag at this point. If you don't, you will get the
default of 64k chunks.
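For example, a first run with the default chunk size made explicit
might be:

    rclone serve nbd remote:path --create 1T --chunk-size 64k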
Rclone will then chunk the network block device into `--chunk-size`
chunks. Rclone has to download the entire chunk in order to change
only part of it and it will cache the chunk on disk so bear that in
mind when choosing `--chunk-size`.
If you wish to change the size of the network block device you can use
the `--resize` flag. This won't remove any data, it just changes the
size advertised. So if you have made a file system on the block device
you will need to resize it too.
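For example, to grow the device to 2 TiB and then grow an ext4 file
system on it to match (resize2fs is the ext4 tool; substitute the
equivalent for your file system):

    rclone serve nbd remote:path --resize 2T
    sudo nbd-client -g localhost 10809 /dev/nbd0
    sudo resize2fs /dev/nbd0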
If you are using `--read-only` then you can use any
`--vfs-cache-mode`.
If you are not using `--read-only` then you will need
`--vfs-cache-mode writes` or `--vfs-cache-mode full`.
Note that rclone will be acting as a writeback cache with
`--vfs-cache-mode writes` or `--vfs-cache-mode full`, and that it will
only upload `--transfers` files at once, so the cache can build up a
backlog of uploads. You can reduce the writeback delay slightly by
setting `--vfs-write-back 0`, however due to the way the kernel works,
this will only reduce it slightly.
If using `--vfs-cache-mode writes` or `--vfs-cache-mode full` it is
recommended to set limits on the cache size using some or all of these
flags as the VFS can use a lot of disk space very quickly.
--vfs-cache-max-age duration Max time since last access of objects in the cache (default 1h0m0s)
--vfs-cache-max-size SizeSuffix Max total size of objects in the cache (default off)
--vfs-cache-min-free-space SizeSuffix Target minimum free space on the disk containing the cache (default off)
You might also need to set this smaller as the cache will only be
examined at this interval.
--vfs-cache-poll-interval duration Interval to poll the cache for stale objects (default 1m0s)
### Linux Examples
Install
sudo apt install nbd-client
Start the server, which listens on localhost:10809 by default.
rclone -v --vfs-cache-mode full serve nbd remote:path
List devices
sudo modprobe nbd
sudo nbd-client --list localhost
Format the device and mount it read-write
sudo nbd-client -g localhost 10809 /dev/nbd0
sudo mkfs.ext4 /dev/nbd0
sudo mkdir -p /mnt/tmp
sudo mount -t ext4 /dev/nbd0 /mnt/tmp
Mount read only
rclone -v --vfs-cache-mode full --read-only serve nbd remote:path
sudo nbd-client --readonly -g localhost 10809 /dev/nbd0
sudo mount -t ext4 -o ro /dev/nbd0 /mnt/tmp
Disconnect
sudo umount /mnt/tmp
sudo nbd-client -d /dev/nbd0
### TODO
Experiment with `-connections` option. This is supported by the code.
Does it improve performance?
-connections num
-C Use num connections to the server, to allow speeding up request
handling, at the cost of higher resource usage on the server.
Use of this option requires kernel support available first with
Linux 4.9.
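For example (untested, using the flag described above):

    sudo nbd-client -g -connections 4 localhost 10809 /dev/nbd0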
Experiment with `-persist` option - is that a good idea?
-persist
-p When this option is specified, nbd-client will immediately try
to reconnect an nbd device if the connection ever drops unexpectedly
due to a lost server or something similar.
Need to implement Trim and see if Trim is being called.
Need to delete zero files before upload (do in VFS layer?)
FIXME need better back pressure from VFS cache to writers.
FIXME need Sync to actually work!

View file

@@ -9,6 +9,7 @@ import (
"github.com/rclone/rclone/cmd/serve/docker"
"github.com/rclone/rclone/cmd/serve/ftp"
"github.com/rclone/rclone/cmd/serve/http"
"github.com/rclone/rclone/cmd/serve/nbd"
"github.com/rclone/rclone/cmd/serve/nfs"
"github.com/rclone/rclone/cmd/serve/restic"
"github.com/rclone/rclone/cmd/serve/s3"
@@ -43,6 +44,9 @@ func init() {
if s3.Command != nil {
Command.AddCommand(s3.Command)
}
if nbd.Command != nil {
Command.AddCommand(nbd.Command)
}
cmd.Root.AddCommand(Command)
}

View file

@@ -233,9 +233,11 @@ value, say `export GOGC=20`. This will make the garbage collector
work harder, reducing memory size at the expense of CPU usage.
The most common cause of rclone using lots of memory is a single
directory with thousands or millions of files in. Rclone has to load
this entirely into memory as rclone objects. Each rclone object takes
0.5k-1k of memory.
directory with millions of files in. Rclone has to load this entirely
into memory as rclone objects. Each rclone object takes 0.5k-1k of
memory. There is
[a workaround for this](https://github.com/rclone/rclone/wiki/Big-syncs-with-millions-of-files)
which involves a bit of scripting.
### Rclone changes fullwidth Unicode punctuation marks in file names

View file

@@ -5018,6 +5018,28 @@ nodes across the network.
For more detailed comparison please check the documentation of the
[storj](/storj) backend.
## Memory usage {#memory}
The most common cause of rclone using lots of memory is a single
directory with millions of files in. Despite S3 not really having a
concept of directories, rclone does the sync on a directory by
directory basis to be compatible with normal file systems.
Rclone loads each directory into memory as rclone objects. Each rclone
object takes 0.5k-1k of memory, so approximately 1GB per 1,000,000
files, and the sync for that directory does not begin until it is
entirely loaded in memory. So the sync can take a long time to start
for large directories.
To sync a directory with 100,000,000 files in you would need approximately
100 GB of memory. At some point the amount of memory becomes difficult
to provide so there is
[a workaround for this](https://github.com/rclone/rclone/wiki/Big-syncs-with-millions-of-files)
which involves a bit of scripting.
At some point rclone will gain a sync mode which is effectively this
workaround but built in to rclone.
## Limitations
`rclone about` is not supported by the S3 backend. Backends without
@@ -5028,7 +5050,6 @@ remote.
See [List of backends that do not support rclone about](https://rclone.org/overview/#optional-features) and [rclone about](https://rclone.org/commands/rclone_about/)
### Synology C2 Object Storage {#synology-c2}
[Synology C2 Object Storage](https://c2.synology.com/en-global/object-storage/overview) provides a secure, S3-compatible, and cost-effective cloud storage solution without API request, download fees, and deletion penalty.

3
go.mod
View file

@@ -2,6 +2,8 @@ module github.com/rclone/rclone
go 1.21
//replace github.com/rclone/gonbdserver => /home/ncw/go/src/github.com/rclone/gonbdserver
require (
bazil.org/fuse v0.0.0-20230120002735-62a210ff1fd5
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.13.0
@@ -53,6 +55,7 @@ require (
github.com/prometheus/client_golang v1.19.1
github.com/putdotio/go-putio/putio v0.0.0-20200123120452-16d982cac2b8
github.com/rclone/gofakes3 v0.0.3-0.20240716093803-d6abc178be56
github.com/rclone/gonbdserver v0.0.0-20230928185136-7adb4642e1cb
github.com/rfjakob/eme v1.1.2
github.com/rivo/uniseg v0.4.7
github.com/rogpeppe/go-internal v1.12.0

2
go.sum
View file

@@ -460,6 +460,8 @@ github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93 h1:UVArwN/wkKjMVhh2EQ
github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93/go.mod h1:Nfe4efndBz4TibWycNE+lqyJZiMX4ycx+QKV8Ta0f/o=
github.com/rclone/gofakes3 v0.0.3-0.20240716093803-d6abc178be56 h1:JmCt3EsTnlZrg/PHIyZqvKDRvBCde/rmThAQFliE9bU=
github.com/rclone/gofakes3 v0.0.3-0.20240716093803-d6abc178be56/go.mod h1:L0VIBE0mT6ArN/5dfHsJm3UjqCpi5B/cdN+qWDNh7ko=
github.com/rclone/gonbdserver v0.0.0-20230928185136-7adb4642e1cb h1:4FyF15nQLPIhLcJDpn2ItwcuO3E/pYQXdPVOt+v3Duk=
github.com/rclone/gonbdserver v0.0.0-20230928185136-7adb4642e1cb/go.mod h1:HwROhGq4gA7vncM5mLZjoNbI9CrS52aDuHReB3NMWp4=
github.com/relvacode/iso8601 v1.3.0 h1:HguUjsGpIMh/zsTczGN3DVJFxTU/GX+MMmzcKoMO7ko=
github.com/relvacode/iso8601 v1.3.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I=
github.com/rfjakob/eme v1.1.2 h1:SxziR8msSOElPayZNFfQw4Tjx/Sbaeeh3eRvrHVMUs4=

439
vfs/chunked/chunked.go Normal file
View file

@@ -0,0 +1,439 @@
// Package chunked provides an infinite chunked file abstraction from
// the VFS.
//
// This can be used in the vfs layer to make chunked files, and in
// something like rclone serve nbd.
package chunked
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
stdfs "io/fs"
"os"
"path"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/vfs"
"github.com/rclone/rclone/vfs/vfscommon"
)
const (
infoName = "info.json" // name of chunk info file
minChunkBits = 4 // min size of chunk is 16 bytes
maxChunkBits = 30 // max size of chunk is 1 GB
defaultChunkBits = 16 // 64k chunks by default
maxBufferChunks = 1024 // default number of chunks in read buffer
maxDirtyChunks = 128 // default number of chunks in write buffer
currentInfoVersion = 1 // version of the info file
)
// Info is serialized to the directory
type Info struct {
Version int // version of chunk file
Comment string // note about this file
Size int64 // current size of the file
ChunkBits uint // number of bits in the chunk
ChunkSize int // must be power of two (1 << ChunkBits)
}
// File stores info about the file
type File struct {
// these are read-only after creation so no locking required
vfs *vfs.VFS // underlying VFS
dir string // path to directory
chunkSize int // size of a chunk 1 << info.ChunkBits
mask int64 // mask an offset onto a chunk boundary ^(chunkSize-1)
chunkMask int64 // mask an offset into an intra chunk index (chunkSize-1)
mu sync.Mutex // lock for info
opens int // number of file handles open on this File
accessed time.Time // time file was last opened or closed
valid bool // true if the info is valid
info Info // info about the file
infoRemote string // path to info object
sizeChanged time.Time // when the size was changed
}
// New creates a new chunked file at dir.
func New(vfs *vfs.VFS, dir string) (cf *File) {
cf = &File{
vfs: vfs,
dir: dir,
infoRemote: path.Join(dir, infoName),
}
return cf
}
// Open opens an existing file or creates a new one with a chunk size of 1<<bits bytes.
//
// if create is not set then it will error if the file does not exist
//
// if bits is 0 then it uses the default value.
//
// Call Close() to show that you are no longer using this file.
//
// Open and Close can be called multiple times on one *File
func (cf *File) Open(create bool, bits uint) (err error) {
cf.mu.Lock()
defer cf.mu.Unlock()
if bits == 0 {
bits = defaultChunkBits
}
if bits < minChunkBits {
return fmt.Errorf("chunk bits %d too small, must be >= %d", bits, minChunkBits)
}
if bits > maxChunkBits {
return fmt.Errorf("chunk bits %d too large, must be <= %d", bits, maxChunkBits)
}
if !cf.valid {
err = cf._readInfo()
if err != nil && (!create || !errors.Is(err, stdfs.ErrNotExist)) {
return fmt.Errorf("failed to open chunked file: read info failed: %w", err)
}
if err != nil {
cf.info = Info{
Size: 0,
ChunkBits: bits,
ChunkSize: 1 << bits,
Version: currentInfoVersion,
Comment: "rclone chunked file",
}
err = cf._writeInfo()
if err != nil && err != fs.ErrorObjectNotFound {
return fmt.Errorf("failed to open chunked file: write info failed: %w", err)
}
}
cf.valid = true
cf._updateChunkBits()
}
// Show another open
cf.accessed = time.Now()
cf.opens++
return nil
}
// Close this *File
//
// It also writes the size out if it has changed and flushes the
// buffers.
//
// Open and Close can be called multiple times on one *File
func (cf *File) Close() error {
cf.mu.Lock()
defer cf.mu.Unlock()
cf.accessed = time.Now()
if cf.opens <= 0 {
return errors.New("unbalanced open/close on File")
}
cf.opens--
return cf._sync()
}
// sets all the constants which depend on cf.info.ChunkBits
//
// call with mu held
func (cf *File) _updateChunkBits() {
cf.chunkSize = 1 << cf.info.ChunkBits
cf.chunkMask = int64(cf.chunkSize - 1)
cf.mask = ^cf.chunkMask
cf.info.ChunkSize = cf.chunkSize
}
// makeChunkFileName makes a remote name for the chunk
func (cf *File) makeChunkFileName(off int64) string {
if off&cf.chunkMask != 0 {
panic("makeChunkFileName: non chunk aligned offset")
}
cf.mu.Lock()
off >>= cf.info.ChunkBits
Bits := 64 - cf.info.ChunkBits
cf.mu.Unlock()
Bytes := Bits >> 3
// round up
if Bits&7 != 0 {
Bytes++
}
// Format to correct number of bytes
// offS = "01234567"
offS := fmt.Sprintf("%0*X", 2*Bytes, off)
// Write pairs of hex digits separated by / for all but the last component
var out bytes.Buffer
if cf.dir != "" {
out.WriteString(cf.dir)
out.WriteRune('/')
}
// out = "path/to/file/"
for i := uint(0); i < Bytes-1; i++ {
out.WriteString(offS[i*2 : i*2+2])
out.WriteRune('/')
}
// out = "path/to/file/01/23/45/"
// now add full string
out.WriteString(offS)
// out = "path/to/file/01/23/45/01234567"
out.WriteString(".bin")
// out = "path/to/file/01/23/45/01234567.bin"
return out.String()
}
// _readInfo reads the ChunkInfo from the object
//
// if it wasn't found then it returns fs.ErrorObjectNotFound
//
// Call with mu held
func (cf *File) _readInfo() (err error) {
content, err := cf.vfs.ReadFile(cf.infoRemote)
if err != nil {
return fmt.Errorf("failed to find chunk info file %q: %w", cf.infoRemote, err)
}
err = json.Unmarshal(content, &cf.info)
if err != nil {
return fmt.Errorf("failed to decode chunk info file %q: %w", cf.infoRemote, err)
}
if cf.info.Version > currentInfoVersion {
return fmt.Errorf("don't understand version %d info files (current version in %d)", cf.info.Version, currentInfoVersion)
}
if cf.info.ChunkBits < minChunkBits {
return fmt.Errorf("chunk bits %d too small, must be >= %d", cf.info.ChunkBits, minChunkBits)
}
if cf.info.ChunkBits > maxChunkBits {
return fmt.Errorf("chunk bits %d too large, must be <= %d", cf.info.ChunkBits, maxChunkBits)
}
return nil
}
// _writeInfo writes the ChunkInfo to the object
//
// call with mu held
func (cf *File) _writeInfo() (err error) {
content, err := json.Marshal(&cf.info)
if err != nil {
return fmt.Errorf("failed to encode chunk info file %q: %w", cf.infoRemote, err)
}
err = cf.vfs.WriteFile(cf.infoRemote, content, 0600)
if err != nil {
return fmt.Errorf("failed to write chunk info file %q: %w", cf.infoRemote, err)
}
// show size is now unchanged
cf.sizeChanged = time.Time{}
return nil
}
// _writeSize writes the ChunkInfo if the size has changed
//
// call with mu held
func (cf *File) _writeSize() (err error) {
if cf.sizeChanged.IsZero() {
return nil
}
return cf._writeInfo()
}
// zeroBytes zeroes n bytes at the start of buf, or until the end of
// buf, whichever comes first. It returns the number of bytes it
// wrote.
func zeroBytes(buf []byte, n int) int {
if n > len(buf) {
n = len(buf)
}
for i := 0; i < n; i++ {
buf[i] = 0
}
return n
}
// Read bytes from the chunk at chunkStart from offset off in the
// chunk.
//
// Return number of bytes read
func (cf *File) chunkReadAt(b []byte, chunkStart int64, off int64) (n int, err error) {
defer log.Trace(nil, "size=%d, chunkStart=%016x, off=%d", len(b), chunkStart, off)("n=%d, err=%v", &n, &err)
fileName := cf.makeChunkFileName(chunkStart)
if endPos := int64(cf.chunkSize) - off; endPos < int64(len(b)) {
b = b[:endPos]
}
file, err := cf.vfs.Open(fileName)
// If file doesn't exist, it is zero
if errors.Is(err, stdfs.ErrNotExist) {
return zeroBytes(b, len(b)), nil
} else if err != nil {
return 0, err
}
defer fs.CheckClose(file, &err)
n, err = file.ReadAt(b, off)
if err == io.EOF && off+int64(n) >= int64(cf.chunkSize) {
err = nil
}
return
}
// ReadAt reads len(b) bytes from the File starting at byte offset off. It
// returns the number of bytes read and the error, if any. ReadAt always
// returns a non-nil error when n < len(b). At end of file, that error is
// io.EOF.
func (cf *File) ReadAt(b []byte, off int64) (n int, err error) {
cf.mu.Lock()
size := cf.info.Size
cf.mu.Unlock()
if off >= size {
return 0, io.EOF
}
isEOF := false
if bytesToEnd := size - off; bytesToEnd < int64(len(b)) {
b = b[:bytesToEnd]
isEOF = true
}
for n < len(b) {
chunkStart := off & cf.mask
end := n + cf.chunkSize
if end > len(b) {
end = len(b)
}
var nn int
nn, err = cf.chunkReadAt(b[n:end], chunkStart, off-chunkStart)
n += nn
off += int64(nn)
if err != nil {
break
}
}
if err == nil && isEOF {
err = io.EOF
}
return
}
// Write b to the chunk at chunkStart at offset off
//
// Return number of bytes written
func (cf *File) chunkWriteAt(b []byte, chunkStart int64, off int64) (n int, err error) {
defer log.Trace(nil, "size=%d, chunkStart=%016x, off=%d", len(b), chunkStart, off)("n=%d, err=%v", &n, &err)
fileName := cf.makeChunkFileName(chunkStart)
err = cf.vfs.MkdirAll(path.Dir(fileName), 0700)
if err != nil {
return 0, err
}
file, err := cf.vfs.OpenFile(fileName, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return 0, err
}
defer fs.CheckClose(file, &err)
// Make the file full size if we can
if cf.vfs.Opt.CacheMode >= vfscommon.CacheModeWrites {
err = file.Truncate(int64(cf.chunkSize))
if err != nil {
return 0, err
}
}
if endPos := int64(cf.chunkSize) - off; endPos < int64(len(b)) {
b = b[:endPos]
}
return file.WriteAt(b, off)
}
// WriteAt writes len(b) bytes to the File starting at byte offset off. It
// returns the number of bytes written and an error, if any. WriteAt returns a
// non-nil error when n != len(b).
func (cf *File) WriteAt(b []byte, off int64) (n int, err error) {
for n < len(b) {
chunkStart := off & cf.mask
var nn int
end := n + cf.chunkSize
if end > len(b) {
end = len(b)
}
nn, err = cf.chunkWriteAt(b[n:end], chunkStart, off-chunkStart)
n += nn
off += int64(nn)
if err != nil {
break
}
}
// Write new size if needed
cf.mu.Lock()
size := cf.info.Size
if off > size {
cf.info.Size = off // extend the file if necessary
cf.sizeChanged = time.Now()
}
cf.mu.Unlock()
return
}
// Size reads the current size of the file
func (cf *File) Size() int64 {
cf.mu.Lock()
if !cf.valid {
err := cf._readInfo()
if err != nil {
fs.Errorf(cf.dir, "Failed to read size: %v", err)
}
}
size := cf.info.Size
cf.mu.Unlock()
return size
}
// Truncate sets the current size of the file
//
// FIXME it doesn't delete any data...
func (cf *File) Truncate(size int64) error {
cf.mu.Lock()
if cf.info.Size != size {
cf.info.Size = size
cf.sizeChanged = time.Now()
}
cf.mu.Unlock()
return nil
}
// _sync writes any pending data to disk by flushing the write queue
//
// call with the lock held
func (cf *File) _sync() error {
err := cf._writeSize()
// FIXME need a VFS function to flush everything to disk
return err
}
// Sync writes any pending data to disk by flushing the write queue
func (cf *File) Sync() error {
cf.mu.Lock()
defer cf.mu.Unlock()
return cf._sync()
}
// Remove removes all the data in the file
func (cf *File) Remove() error {
cf.mu.Lock()
defer cf.mu.Unlock()
if !cf.valid {
return nil
}
if cf.opens > 0 {
return errors.New("can't delete chunked file when it is open")
}
cf.valid = false
_ = cf._sync()
// Purge all the files
// FIXME should get this into the VFS as RemoveAll
err := operations.Purge(context.TODO(), cf.vfs.Fs(), cf.dir)
cf.vfs.FlushDirCache()
return err
}

452
vfs/chunked/chunked_test.go Normal file
View file

@@ -0,0 +1,452 @@
package chunked
import (
"context"
"errors"
"fmt"
"io"
stdfs "io/fs"
"testing"
"time"
_ "github.com/rclone/rclone/backend/all" // import all the file systems
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/vfs"
"github.com/rclone/rclone/vfs/vfscommon"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestMain drives the tests
func TestMain(m *testing.M) {
fstest.TestMain(m)
}
func TestChunkFileName(t *testing.T) {
cf := &File{
dir: "path/to/dir",
}
for _, test := range []struct {
bits uint
off int64
want string
panic bool
}{
{
8,
0,
"path/to/dir/00/00/00/00/00/00/00000000000000.bin",
false,
},
{
8,
0x123456789ABCDE00,
"path/to/dir/12/34/56/78/9A/BC/123456789ABCDE.bin",
false,
},
{
8,
0x123456789ABCDE80,
"",
true,
},
{
12,
0,
"path/to/dir/00/00/00/00/00/00/00000000000000.bin",
false,
},
{
12,
0x123456789ABCD000,
"path/to/dir/01/23/45/67/89/AB/0123456789ABCD.bin",
false,
},
{
15,
0,
"path/to/dir/00/00/00/00/00/00/00000000000000.bin",
false,
},
{
15,
0x123456789ABCC000,
"",
true,
},
{
15,
0x123456789ABC0000,
"path/to/dir/00/24/68/AC/F1/35/002468ACF13578.bin",
false,
},
{
16,
0,
"path/to/dir/00/00/00/00/00/000000000000.bin",
false,
},
{
16,
0x123456789ABC8000,
"",
true,
},
{
16,
0x123456789ABC0000,
"path/to/dir/12/34/56/78/9A/123456789ABC.bin",
false,
},
{
20,
0,
"path/to/dir/00/00/00/00/00/000000000000.bin",
false,
},
{
23,
0,
"path/to/dir/00/00/00/00/00/000000000000.bin",
false,
},
{
24,
0,
"path/to/dir/00/00/00/00/0000000000.bin",
false,
},
{
24,
0x7EFDFCFBFA000000,
"path/to/dir/7E/FD/FC/FB/7EFDFCFBFA.bin",
false,
},
{
28,
0x7EFDFCFBF0000000,
"path/to/dir/07/EF/DF/CF/07EFDFCFBF.bin",
false,
},
} {
cf.info.ChunkBits = test.bits
cf._updateChunkBits()
what := fmt.Sprintf("bits=%d, off=0x%X, panic=%v", test.bits, test.off, test.panic)
if !test.panic {
got := cf.makeChunkFileName(test.off)
assert.Equal(t, test.want, got, what)
} else {
assert.Panics(t, func() {
cf.makeChunkFileName(test.off)
}, what)
}
}
}
// check that the object exists and has the contents
func checkObject(t *testing.T, f fs.Fs, remote string, want string) {
o, err := f.NewObject(context.TODO(), remote)
require.NoError(t, err)
dst := object.NewMemoryObject(remote, time.Now(), nil)
_, err = operations.Copy(context.TODO(), object.MemoryFs, dst, "", o)
require.NoError(t, err)
assert.Equal(t, want, string(dst.Content()))
}
// Constants used in the tests
const (
writeBackDelay = 100 * time.Millisecond // A short writeback delay for testing
waitForWritersDelay = 30 * time.Second // time to wait for existing writers
)
// Clean up a test VFS
func cleanupVFS(t *testing.T, vfs *vfs.VFS) {
vfs.WaitForWriters(waitForWritersDelay)
err := vfs.CleanUp()
require.NoError(t, err)
vfs.Shutdown()
}
// Create a new VFS
func newTestVFSOpt(t *testing.T, opt *vfscommon.Options) (r *fstest.Run, VFS *vfs.VFS) {
r = fstest.NewRun(t)
VFS = vfs.New(r.Fremote, opt)
t.Cleanup(func() {
cleanupVFS(t, VFS)
})
return r, VFS
}
func TestNew(t *testing.T) {
_, VFS := newTestVFSOpt(t, nil)
// check default open
cf := New(VFS, "")
assert.Equal(t, 0, cf.opens)
err := cf.Open(true, defaultChunkBits)
assert.NoError(t, err)
assert.Equal(t, int64(0), cf.info.Size)
assert.Equal(t, uint(defaultChunkBits), cf.info.ChunkBits)
assert.Equal(t, 0x10000, cf.chunkSize)
assert.Equal(t, int64(0xFFFF), cf.chunkMask)
assert.Equal(t, ^int64(0xFFFF), cf.mask)
assert.Equal(t, 1, cf.opens)
// check the close
err = cf.Close()
assert.NoError(t, err)
assert.Equal(t, 0, cf.opens)
// check the double close
err = cf.Close()
assert.Error(t, err)
assert.Equal(t, 0, cf.opens)
// check that the info got written
checkObject(t, VFS.Fs(), cf.infoRemote, `{"Version":1,"Comment":"rclone chunked file","Size":0,"ChunkBits":16,"ChunkSize":65536}`)
// change the info
cf.info.Size = 100
cf.info.ChunkBits = 20
cf._updateChunkBits()
err = cf._writeInfo()
assert.NoError(t, err)
// read it back in
cf = New(VFS, "")
err = cf.Open(false, 0)
assert.NoError(t, err)
assert.Equal(t, int64(100), cf.info.Size)
assert.Equal(t, uint(20), cf.info.ChunkBits)
assert.Equal(t, 0x100000, cf.chunkSize)
assert.Equal(t, int64(0xFFFFF), cf.chunkMask)
assert.Equal(t, ^int64(0xFFFFF), cf.mask)
// check opens
// test limits for readInfo
for _, test := range []struct {
info string
error string
}{
{
`{"Version":1,"Comment":"rclone chunked file","Size":0,"ChunkBits":16,"ChunkSize":65536`,
"failed to decode chunk info file",
},
{
`{"Version":99,"Comment":"rclone chunked file","Size":0,"ChunkBits":16,"ChunkSize":65536}`,
"don't understand version 99 info files",
},
{
`{"Version":1,"Comment":"rclone chunked file","Size":0,"ChunkBits":1,"ChunkSize":65536}`,
"chunk bits 1 too small",
},
{
`{"Version":1,"Comment":"rclone chunked file","Size":0,"ChunkBits":99,"ChunkSize":65536}`,
"chunk bits 99 too large",
},
} {
require.NoError(t, VFS.WriteFile(cf.infoRemote, []byte(test.info), 0600))
err = cf._readInfo()
require.Error(t, err)
assert.Contains(t, err.Error(), test.error)
}
}
func newTestFile(t *testing.T) (*vfs.VFS, *File) {
opt := vfscommon.Opt
opt.CacheMode = vfscommon.CacheModeFull
opt.WriteBack = 0 // make writeback synchronous
_, VFS := newTestVFSOpt(t, &opt)
cf := New(VFS, "")
err := cf.Open(true, 4)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, cf.Close())
})
return VFS, cf
}
func TestReadWriteChunk(t *testing.T) {
VFS, cf := newTestFile(t)
const (
off = 0x123456789ABCDEF0
wantRemote = "01/23/45/67/89/AB/CD/0123456789ABCDEF.bin"
)
// pretend the file is big
require.NoError(t, cf.Truncate(2*off))
// check reading non existent chunk gives 0
var zero = make([]byte, 16)
var b = make([]byte, 16)
n, err := cf.ReadAt(b, off)
require.NoError(t, err)
assert.Equal(t, 16, n)
assert.Equal(t, zero, b)
// create a new chunk and write some data
n, err = cf.WriteAt([]byte("0123456789abcdef"), off)
require.NoError(t, err)
assert.Equal(t, 16, n)
// check the chunk on disk
checkObject(t, VFS.Fs(), wantRemote, "0123456789abcdef")
// read the chunk off disk and check it
n, err = cf.ReadAt(b, off)
require.NoError(t, err)
assert.Equal(t, 16, n)
assert.Equal(t, "0123456789abcdef", string(b))
}
func TestZeroBytes(t *testing.T) {
b := []byte{1, 2, 3, 4}
zeroBytes(b, 2)
assert.Equal(t, []byte{0, 0, 3, 4}, b)
b = []byte{1, 2, 3, 4}
zeroBytes(b, 17)
assert.Equal(t, []byte{0, 0, 0, 0}, b)
}
func TestReadAt(t *testing.T) {
_, cf := newTestFile(t)
// make a new chunk and write it to disk as chunk 1
zero := make([]byte, 16)
middle := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
n, err := cf.WriteAt(middle, 16)
require.NoError(t, err)
assert.Equal(t, 16, n)
// set the size to 0
cf.info.Size = 0
// check reading
b := make([]byte, 40)
n, err = cf.ReadAt(b, 0)
assert.Equal(t, 0, n)
assert.Equal(t, io.EOF, err)
// set the size to 38
cf.info.Size = 38
// read to end
n, err = cf.ReadAt(b, 0)
assert.Equal(t, 38, n)
assert.Equal(t, io.EOF, err)
expected := append([]byte(nil), zero...)
expected = append(expected, middle...)
expected = append(expected, zero[:6]...)
assert.Equal(t, expected, b[:n])
// read not to end
b = make([]byte, 16)
n, err = cf.ReadAt(b, 10)
assert.Equal(t, 16, n)
assert.NoError(t, err)
expected = append([]byte(nil), zero[10:]...)
expected = append(expected, middle[:10]...)
assert.Equal(t, expected, b[:n])
// read at end
n, err = cf.ReadAt(b, 38)
assert.Equal(t, 0, n)
assert.Equal(t, io.EOF, err)
// read past end
n, err = cf.ReadAt(b, 99)
assert.Equal(t, 0, n)
assert.Equal(t, io.EOF, err)
}
func TestWriteAt(t *testing.T) {
VFS, cf := newTestFile(t)
f := VFS.Fs()
// Make test buffer
b := []byte{}
for i := byte(0); i < 30; i++ {
b = append(b, '0'+i)
}
t.Run("SizeZero", func(t *testing.T) {
assert.Equal(t, int64(0), cf.Size())
})
const (
wantRemote1 = "00/00/00/00/00/00/00/0000000000000000.bin"
wantRemote2 = "00/00/00/00/00/00/00/0000000000000001.bin"
wantRemote3 = "00/00/00/00/00/00/00/0000000000000002.bin"
)
t.Run("Extended", func(t *testing.T) {
// write it and check file is extended
n, err := cf.WriteAt(b, 8)
assert.Equal(t, 30, n)
assert.NoError(t, err)
assert.Equal(t, int64(38), cf.info.Size)
// flush the parts to disk
require.NoError(t, cf.Sync())
// check the parts on disk
checkObject(t, f, wantRemote1, "\x00\x00\x00\x00\x00\x00\x00\x0001234567")
checkObject(t, f, wantRemote2, "89:;<=>?@ABCDEFG")
checkObject(t, f, wantRemote3, "HIJKLM\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
})
t.Run("Size", func(t *testing.T) {
assert.Equal(t, int64(38), cf.Size())
})
t.Run("Overwrite", func(t *testing.T) {
// overwrite a part
n, err := cf.WriteAt([]byte("abcdefgh"), 12)
assert.Equal(t, 8, n)
assert.NoError(t, err)
assert.Equal(t, int64(38), cf.info.Size)
// flush the parts to disk
require.NoError(t, cf.Sync())
// check the parts on disk
checkObject(t, f, wantRemote1, "\x00\x00\x00\x00\x00\x00\x00\x000123abcd")
checkObject(t, f, wantRemote2, "efgh<=>?@ABCDEFG")
checkObject(t, f, wantRemote3, "HIJKLM\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
})
t.Run("Remove", func(t *testing.T) {
require.Error(t, cf.Remove())
require.NoError(t, cf.Close())
// Check files are there
fis, err := VFS.ReadDir(cf.dir)
require.NoError(t, err)
assert.True(t, len(fis) > 0)
// Remove the file
require.NoError(t, cf.Remove())
// Check files have gone
fis, err = VFS.ReadDir(cf.dir)
what := fmt.Sprintf("err=%v, fis=%v", err, fis)
if err == nil {
assert.Equal(t, 0, len(fis), what)
} else {
require.True(t, errors.Is(err, stdfs.ErrNotExist), what)
}
// Reopen for cleanup
require.NoError(t, cf.Open(true, 0))
})
}

View file

@@ -766,6 +766,21 @@ func (vfs *VFS) ReadFile(filename string) (b []byte, err error) {
return io.ReadAll(f)
}
// WriteFile writes data to the named file, creating it if necessary.
// If the file does not exist, WriteFile creates it with permissions perm (before umask);
// otherwise WriteFile truncates it before writing, without changing permissions.
// Since WriteFile requires multiple system calls to complete, a failure mid-operation
// can leave the file in a partially written state.
func (vfs *VFS) WriteFile(name string, data []byte, perm os.FileMode) (err error) {
f, err := vfs.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
defer fs.CheckClose(f, &err)
_, err = f.Write(data)
return err
}
// AddVirtual adds the object (file or dir) to the directory cache
func (vfs *VFS) AddVirtual(remote string, size int64, isDir bool) (err error) {
remote = strings.TrimRight(remote, "/")