forked from TrueCloudLab/restic
d05f6211d1
restic must be able to refresh lock files in time. However, large uploads over slow connections can cause the lock refresh to be stuck behind the large uploads and thus time out.
98 lines
2.6 KiB
Go
98 lines
2.6 KiB
Go
package sema
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
|
|
"github.com/cenkalti/backoff/v4"
|
|
"github.com/restic/restic/internal/errors"
|
|
"github.com/restic/restic/internal/restic"
|
|
)
|
|
|
|
// make sure that connectionLimitedBackend implements restic.Backend
|
|
var _ restic.Backend = &connectionLimitedBackend{}
|
|
|
|
// connectionLimitedBackend limits the number of concurrent operations.
|
|
type connectionLimitedBackend struct {
|
|
restic.Backend
|
|
sem semaphore
|
|
}
|
|
|
|
// NewBackend creates a backend that limits the concurrent operations on the underlying backend
|
|
func NewBackend(be restic.Backend) restic.Backend {
|
|
sem, err := newSemaphore(be.Connections())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
return &connectionLimitedBackend{
|
|
Backend: be,
|
|
sem: sem,
|
|
}
|
|
}
|
|
|
|
// typeDependentLimit acquire a token unless the FileType is a lock file. The returned function
|
|
// must be called to release the token.
|
|
func (be *connectionLimitedBackend) typeDependentLimit(t restic.FileType) func() {
|
|
// allow concurrent lock file operations to ensure that the lock refresh is always possible
|
|
if t == restic.LockFile {
|
|
return func() {}
|
|
}
|
|
be.sem.GetToken()
|
|
return be.sem.ReleaseToken
|
|
}
|
|
|
|
// Save adds new Data to the backend.
|
|
func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
|
if err := h.Valid(); err != nil {
|
|
return backoff.Permanent(err)
|
|
}
|
|
|
|
defer be.typeDependentLimit(h.Type)()
|
|
|
|
return be.Backend.Save(ctx, h, rd)
|
|
}
|
|
|
|
// Load runs fn with a reader that yields the contents of the file at h at the
|
|
// given offset.
|
|
func (be *connectionLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
|
if err := h.Valid(); err != nil {
|
|
return backoff.Permanent(err)
|
|
}
|
|
if offset < 0 {
|
|
return backoff.Permanent(errors.New("offset is negative"))
|
|
}
|
|
if length < 0 {
|
|
return backoff.Permanent(errors.Errorf("invalid length %d", length))
|
|
}
|
|
|
|
defer be.typeDependentLimit(h.Type)()
|
|
|
|
return be.Backend.Load(ctx, h, length, offset, fn)
|
|
}
|
|
|
|
// Stat returns information about a file in the backend.
|
|
func (be *connectionLimitedBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
|
|
if err := h.Valid(); err != nil {
|
|
return restic.FileInfo{}, backoff.Permanent(err)
|
|
}
|
|
|
|
defer be.typeDependentLimit(h.Type)()
|
|
|
|
return be.Backend.Stat(ctx, h)
|
|
}
|
|
|
|
// Remove deletes a file from the backend.
|
|
func (be *connectionLimitedBackend) Remove(ctx context.Context, h restic.Handle) error {
|
|
if err := h.Valid(); err != nil {
|
|
return backoff.Permanent(err)
|
|
}
|
|
|
|
defer be.typeDependentLimit(h.Type)()
|
|
|
|
return be.Backend.Remove(ctx, h)
|
|
}
|
|
|
|
func (be *connectionLimitedBackend) Unwrap() restic.Backend {
|
|
return be.Backend
|
|
}
|