fs: openwriterat: enable backends to give hints for defaults
This commit is contained in:
parent
a78bc093de
commit
d1c054906c
7 changed files with 52 additions and 27 deletions
|
@ -1316,10 +1316,11 @@ func (w *writerAt) Close() error {
|
|||
// Pass in the remote desired and the size if known.
|
||||
//
|
||||
// It truncates any existing object
|
||||
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
|
||||
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.OpenWriterAtInfo, fs.WriterAtCloser, error) {
|
||||
info := fs.OpenWriterAtInfo{}
|
||||
err := f.mkParentDir(ctx, remote)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("OpenWriterAt: failed to create parent directory: %w", err)
|
||||
return info, nil, fmt.Errorf("OpenWriterAt: failed to create parent directory: %w", err)
|
||||
}
|
||||
fc := f.fileClient(remote)
|
||||
if size < 0 {
|
||||
|
@ -1327,7 +1328,7 @@ func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.Wr
|
|||
}
|
||||
_, err = fc.Create(ctx, size, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("OpenWriterAt: unable to create file: %w", err)
|
||||
return info, nil, fmt.Errorf("OpenWriterAt: unable to create file: %w", err)
|
||||
}
|
||||
w := &writerAt{
|
||||
ctx: ctx,
|
||||
|
@ -1335,7 +1336,7 @@ func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.Wr
|
|||
fc: fc,
|
||||
size: size,
|
||||
}
|
||||
return w, nil
|
||||
return info, w, nil
|
||||
}
|
||||
|
||||
// About gets quota information
|
||||
|
|
|
@ -1029,14 +1029,14 @@ func (f *Fs) CleanUp(ctx context.Context) error {
|
|||
// Pass in the remote desired and the size if known.
|
||||
//
|
||||
// It truncates any existing object
|
||||
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
|
||||
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.OpenWriterAtInfo, fs.WriterAtCloser, error) {
|
||||
u, uRemote, err := f.findUpstream(remote)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return fs.OpenWriterAtInfo{}, nil, err
|
||||
}
|
||||
do := u.f.Features().OpenWriterAt
|
||||
if do == nil {
|
||||
return nil, fs.ErrorNotImplemented
|
||||
return fs.OpenWriterAtInfo{}, nil, fs.ErrorNotImplemented
|
||||
}
|
||||
return do(ctx, uRemote, size)
|
||||
}
|
||||
|
|
|
@ -1484,22 +1484,23 @@ var sparseWarning sync.Once
|
|||
// Pass in the remote desired and the size if known.
|
||||
//
|
||||
// It truncates any existing object
|
||||
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
|
||||
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.OpenWriterAtInfo, fs.WriterAtCloser, error) {
|
||||
// Temporary Object under construction
|
||||
o := f.newObject(remote)
|
||||
info := fs.OpenWriterAtInfo{}
|
||||
|
||||
err := o.mkdirAll()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return info, nil, err
|
||||
}
|
||||
|
||||
if o.translatedLink {
|
||||
return nil, errors.New("can't open a symlink for random writing")
|
||||
return info, nil, errors.New("can't open a symlink for random writing")
|
||||
}
|
||||
|
||||
out, err := file.OpenFile(o.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return info, nil, err
|
||||
}
|
||||
// Pre-allocate the file for performance reasons
|
||||
if !f.opt.NoPreAllocate {
|
||||
|
@ -1519,7 +1520,7 @@ func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.Wr
|
|||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
return info, out, nil
|
||||
}
|
||||
|
||||
// setMetadata sets the file info from the os.FileInfo passed in
|
||||
|
|
|
@ -166,7 +166,7 @@ type Features struct {
|
|||
// Pass in the remote desired and the size if known.
|
||||
//
|
||||
// It truncates any existing object
|
||||
OpenWriterAt func(ctx context.Context, remote string, size int64) (WriterAtCloser, error)
|
||||
OpenWriterAt OpenWriterAtFn
|
||||
|
||||
// OpenChunkWriter returns the chunk size and a ChunkWriter
|
||||
//
|
||||
|
@ -686,11 +686,19 @@ type OpenWriterAter interface {
|
|||
// Pass in the remote desired and the size if known.
|
||||
//
|
||||
// It truncates any existing object
|
||||
OpenWriterAt(ctx context.Context, remote string, size int64) (WriterAtCloser, error)
|
||||
OpenWriterAt(ctx context.Context, remote string, size int64) (OpenWriterAtInfo, WriterAtCloser, error)
|
||||
}
|
||||
|
||||
// OpenWriterAtInfo describes how a backend would like ChunkWriter called
|
||||
type OpenWriterAtInfo struct {
|
||||
BufferSize int64 // preferred buffer size
|
||||
ChunkSize int64 // preferred chunk size
|
||||
Concurrency int // how many chunks to write at once
|
||||
LeavePartsOnError bool // if set don't delete parts uploaded so far on error
|
||||
}
|
||||
|
||||
// OpenWriterAtFn describes the OpenWriterAt function pointer
|
||||
type OpenWriterAtFn func(ctx context.Context, remote string, size int64) (WriterAtCloser, error)
|
||||
type OpenWriterAtFn func(ctx context.Context, remote string, size int64) (OpenWriterAtInfo, WriterAtCloser, error)
|
||||
|
||||
// ChunkWriterInfo describes how a backend would like ChunkWriter called
|
||||
type ChunkWriterInfo struct {
|
||||
|
|
|
@ -138,7 +138,9 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
|
|||
if openWriterAt == nil {
|
||||
return nil, errors.New("multi-thread copy: neither OpenChunkWriter nor OpenWriterAt supported")
|
||||
}
|
||||
openChunkWriter = openChunkWriterFromOpenWriterAt(openWriterAt, int64(ci.MultiThreadChunkSize), int64(ci.MultiThreadWriteBufferSize), f)
|
||||
|
||||
openChunkWriter = openChunkWriterFromOpenWriterAt(openWriterAt, f)
|
||||
|
||||
// If we are using OpenWriterAt we don't seek the chunks so don't need to buffer
|
||||
fs.Debugf(src, "multi-thread copy: disabling buffering because destination uses OpenWriterAt")
|
||||
noBuffering = true
|
||||
|
@ -346,31 +348,44 @@ func (w *writerAtChunkWriter) Abort(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// openChunkWriterFromOpenWriterAt adapts an OpenWriterAtFn into an OpenChunkWriterFn using chunkSize and writeBufferSize
|
||||
func openChunkWriterFromOpenWriterAt(openWriterAt fs.OpenWriterAtFn, chunkSize int64, writeBufferSize int64, f fs.Fs) fs.OpenChunkWriterFn {
|
||||
func openChunkWriterFromOpenWriterAt(openWriterAt fs.OpenWriterAtFn, f fs.Fs) fs.OpenChunkWriterFn {
|
||||
return func(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
|
||||
ci := fs.GetConfig(ctx)
|
||||
|
||||
writerAt, err := openWriterAt(ctx, remote, src.Size())
|
||||
writerAtInfo, writerAt, err := openWriterAt(ctx, remote, src.Size())
|
||||
if err != nil {
|
||||
return info, nil, err
|
||||
}
|
||||
if !fs.ConfigOptionsInfo.Get("multi_thread_chunk_size").IsDefault() || writerAtInfo.ChunkSize == 0 {
|
||||
// if user did provided a specifc value or the backend didn't provide hint, use config value
|
||||
writerAtInfo.ChunkSize = int64(ci.MultiThreadChunkSize)
|
||||
}
|
||||
if !fs.ConfigOptionsInfo.Get("multi_thread_write_buffer_size").IsDefault() || writerAtInfo.BufferSize == 0 {
|
||||
// if user did provided a specifc value or the backend didn't provide hint, use config value
|
||||
writerAtInfo.BufferSize = int64(ci.MultiThreadWriteBufferSize)
|
||||
}
|
||||
if !fs.ConfigOptionsInfo.Get("multi_thread_streams").IsDefault() || writerAtInfo.Concurrency == 0 {
|
||||
// if user did provided a specifc value or the backend didn't provide hint, use config value
|
||||
writerAtInfo.Concurrency = ci.MultiThreadStreams
|
||||
}
|
||||
|
||||
if writeBufferSize > 0 {
|
||||
fs.Debugf(src.Remote(), "multi-thread copy: write buffer set to %v", writeBufferSize)
|
||||
if writerAtInfo.BufferSize > 0 {
|
||||
fs.Debugf(src.Remote(), "multi-thread copy: write buffer set to %v", writerAtInfo.BufferSize)
|
||||
}
|
||||
|
||||
chunkWriter := &writerAtChunkWriter{
|
||||
remote: remote,
|
||||
size: src.Size(),
|
||||
chunkSize: chunkSize,
|
||||
chunks: calculateNumChunks(src.Size(), chunkSize),
|
||||
chunkSize: writerAtInfo.ChunkSize,
|
||||
chunks: calculateNumChunks(src.Size(), writerAtInfo.ChunkSize),
|
||||
writerAt: writerAt,
|
||||
writeBufferSize: writeBufferSize,
|
||||
writeBufferSize: writerAtInfo.BufferSize,
|
||||
f: f,
|
||||
}
|
||||
info = fs.ChunkWriterInfo{
|
||||
ChunkSize: chunkSize,
|
||||
Concurrency: ci.MultiThreadStreams,
|
||||
ChunkSize: writerAtInfo.ChunkSize,
|
||||
Concurrency: writerAtInfo.Concurrency,
|
||||
LeavePartsOnError: writerAtInfo.LeavePartsOnError,
|
||||
}
|
||||
return info, chunkWriter, nil
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ func TestDoMultiThreadCopy(t *testing.T) {
|
|||
ci.MultiThreadStreams, ci.MultiThreadCutoff = 4, 50
|
||||
ci.MultiThreadSet = false
|
||||
|
||||
nullWriterAt := func(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
|
||||
nullWriterAt := func(ctx context.Context, remote string, size int64) (fs.OpenWriterAtInfo, fs.WriterAtCloser, error) {
|
||||
panic("don't call me")
|
||||
}
|
||||
f.Features().OpenWriterAt = nullWriterAt
|
||||
|
|
|
@ -790,7 +790,7 @@ func Run(t *testing.T, opt *Opt) {
|
|||
t.Skip("FS has no OpenWriterAt interface")
|
||||
}
|
||||
path := "writer-at-subdir/writer-at-file"
|
||||
out, err := openWriterAt(ctx, path, -1)
|
||||
_, out, err := openWriterAt(ctx, path, -1)
|
||||
require.NoError(t, err)
|
||||
|
||||
var n int
|
||||
|
|
Loading…
Add table
Reference in a new issue