backend: Move semaphores to a dedicated package

... called backend/sema. I resisted the temptation to call the main
type sema.Phore. Also, semaphores are now passed by value to skip a
level of indirection when using them.
This commit is contained in:
greatroar 2022-06-12 17:45:34 +02:00
parent 1dd4b9b60e
commit 910d917b71
11 changed files with 101 additions and 98 deletions

View file

@ -12,6 +12,7 @@ import (
"strings" "strings"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
@ -25,7 +26,7 @@ type Backend struct {
accountName string accountName string
container *storage.Container container *storage.Container
connections uint connections uint
sem *backend.Semaphore sem sema.Semaphore
prefix string prefix string
listMaxItems int listMaxItems int
backend.Layout backend.Layout
@ -48,7 +49,7 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
service := client.GetBlobService() service := client.GetBlobService()
sem, err := backend.NewSemaphore(cfg.Connections) sem, err := sema.New(cfg.Connections)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -8,6 +8,7 @@ import (
"path" "path"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
@ -23,7 +24,7 @@ type b2Backend struct {
cfg Config cfg Config
listMaxItems int listMaxItems int
backend.Layout backend.Layout
sem *backend.Semaphore sem sema.Semaphore
} }
const defaultListMaxItems = 1000 const defaultListMaxItems = 1000
@ -58,7 +59,7 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend
return nil, errors.Wrap(err, "Bucket") return nil, errors.Wrap(err, "Bucket")
} }
sem, err := backend.NewSemaphore(cfg.Connections) sem, err := sema.New(cfg.Connections)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -99,7 +100,7 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backe
return nil, errors.Wrap(err, "NewBucket") return nil, errors.Wrap(err, "NewBucket")
} }
sem, err := backend.NewSemaphore(cfg.Connections) sem, err := sema.New(cfg.Connections)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -284,11 +285,11 @@ func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error {
} }
type semLocker struct { type semLocker struct {
*backend.Semaphore sema.Semaphore
} }
func (sm semLocker) Lock() { sm.GetToken() } func (sm *semLocker) Lock() { sm.GetToken() }
func (sm semLocker) Unlock() { sm.ReleaseToken() } func (sm *semLocker) Unlock() { sm.ReleaseToken() }
// List returns a channel that yields all names of blobs of type t. // List returns a channel that yields all names of blobs of type t.
func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
@ -298,7 +299,7 @@ func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic
defer cancel() defer cancel()
prefix, _ := be.Basedir(t) prefix, _ := be.Basedir(t)
iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems), b2.ListLocker(semLocker{be.sem})) iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems), b2.ListLocker(&semLocker{be.sem}))
for iter.Next() { for iter.Next() {
obj := iter.Object() obj := iter.Object()

View file

@ -14,6 +14,7 @@ import (
"cloud.google.com/go/storage" "cloud.google.com/go/storage"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
@ -35,7 +36,7 @@ type Backend struct {
gcsClient *storage.Client gcsClient *storage.Client
projectID string projectID string
connections uint connections uint
sem *backend.Semaphore sem sema.Semaphore
bucketName string bucketName string
bucket *storage.BucketHandle bucket *storage.BucketHandle
prefix string prefix string
@ -97,7 +98,7 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) {
return nil, errors.Wrap(err, "getStorageClient") return nil, errors.Wrap(err, "getStorageClient")
} }
sem, err := backend.NewSemaphore(cfg.Connections) sem, err := sema.New(cfg.Connections)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -9,12 +9,12 @@ import (
"path/filepath" "path/filepath"
"syscall" "syscall"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs" "github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic"
"github.com/cenkalti/backoff/v4" "github.com/cenkalti/backoff/v4"
) )
@ -22,7 +22,7 @@ import (
// Local is a backend in a local directory. // Local is a backend in a local directory.
type Local struct { type Local struct {
Config Config
sem *backend.Semaphore sem sema.Semaphore
backend.Layout backend.Layout
backend.Modes backend.Modes
} }
@ -38,7 +38,7 @@ func open(ctx context.Context, cfg Config) (*Local, error) {
return nil, err return nil, err
} }
sem, err := backend.NewSemaphore(cfg.Connections) sem, err := sema.New(cfg.Connections)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -11,6 +11,7 @@ import (
"sync" "sync"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
@ -32,12 +33,12 @@ const connectionCount = 2
type MemoryBackend struct { type MemoryBackend struct {
data memMap data memMap
m sync.Mutex m sync.Mutex
sem *backend.Semaphore sem sema.Semaphore
} }
// New returns a new backend that saves all data in a map in memory. // New returns a new backend that saves all data in a map in memory.
func New() *MemoryBackend { func New() *MemoryBackend {
sem, err := backend.NewSemaphore(connectionCount) sem, err := sema.New(connectionCount)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View file

@ -17,6 +17,7 @@ import (
"golang.org/x/net/context/ctxhttp" "golang.org/x/net/context/ctxhttp"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
@ -31,7 +32,7 @@ var _ restic.Backend = &Backend{}
type Backend struct { type Backend struct {
url *url.URL url *url.URL
connections uint connections uint
sem *backend.Semaphore sem sema.Semaphore
client *http.Client client *http.Client
backend.Layout backend.Layout
} }
@ -46,7 +47,7 @@ const (
func Open(cfg Config, rt http.RoundTripper) (*Backend, error) { func Open(cfg Config, rt http.RoundTripper) (*Backend, error) {
client := &http.Client{Transport: rt} client := &http.Client{Transport: rt}
sem, err := backend.NewSemaphore(cfg.Connections) sem, err := sema.New(cfg.Connections)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -13,6 +13,7 @@ import (
"time" "time"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
@ -25,7 +26,7 @@ import (
// Backend stores data on an S3 endpoint. // Backend stores data on an S3 endpoint.
type Backend struct { type Backend struct {
client *minio.Client client *minio.Client
sem *backend.Semaphore sem sema.Semaphore
cfg Config cfg Config
backend.Layout backend.Layout
} }
@ -101,7 +102,7 @@ func open(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, erro
return nil, errors.Wrap(err, "minio.New") return nil, errors.Wrap(err, "minio.New")
} }
sem, err := backend.NewSemaphore(cfg.Connections) sem, err := sema.New(cfg.Connections)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -0,0 +1,65 @@
// Package sema implements semaphores.
package sema
import (
"context"
"io"
"github.com/restic/restic/internal/errors"
)
// A Semaphore limits access to a restricted resource.
type Semaphore struct {
ch chan struct{}
}
// New returns a new semaphore with capacity n.
func New(n uint) (Semaphore, error) {
if n == 0 {
return Semaphore{}, errors.New("capacity must be a positive number")
}
return Semaphore{
ch: make(chan struct{}, n),
}, nil
}
// GetToken blocks until a Token is available.
func (s Semaphore) GetToken() { s.ch <- struct{}{} }
// ReleaseToken returns a token.
func (s Semaphore) ReleaseToken() { <-s.ch }
// ReleaseTokenOnClose wraps an io.ReadCloser to return a token on Close.
// Before returning the token, cancel, if not nil, will be run
// to free up context resources.
func (s Semaphore) ReleaseTokenOnClose(rc io.ReadCloser, cancel context.CancelFunc) io.ReadCloser {
return &wrapReader{ReadCloser: rc, sem: s, cancel: cancel}
}
type wrapReader struct {
io.ReadCloser
eofSeen bool
sem Semaphore
cancel context.CancelFunc
}
func (wr *wrapReader) Read(p []byte) (int, error) {
if wr.eofSeen { // XXX Why do we do this?
return 0, io.EOF
}
n, err := wr.ReadCloser.Read(p)
if err == io.EOF {
wr.eofSeen = true
}
return n, err
}
func (wr *wrapReader) Close() error {
err := wr.ReadCloser.Close()
if wr.cancel != nil {
wr.cancel()
}
wr.sem.ReleaseToken()
return err
}

View file

@ -1,69 +0,0 @@
package backend
import (
"context"
"io"
"github.com/restic/restic/internal/errors"
)
// Semaphore limits access to a restricted resource.
type Semaphore struct {
ch chan struct{}
}
// NewSemaphore returns a new semaphore with capacity n.
func NewSemaphore(n uint) (*Semaphore, error) {
if n == 0 {
return nil, errors.New("must be a positive number")
}
return &Semaphore{
ch: make(chan struct{}, n),
}, nil
}
// GetToken blocks until a Token is available.
func (s *Semaphore) GetToken() {
s.ch <- struct{}{}
}
// ReleaseToken returns a token.
func (s *Semaphore) ReleaseToken() {
<-s.ch
}
// ReleaseTokenOnClose wraps an io.ReadCloser to return a token on Close. Before returning the token,
// cancel, if provided, will be run to free up context resources.
func (s *Semaphore) ReleaseTokenOnClose(rc io.ReadCloser, cancel context.CancelFunc) io.ReadCloser {
return &wrapReader{rc, false, func() {
if cancel != nil {
cancel()
}
s.ReleaseToken()
}}
}
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
type wrapReader struct {
io.ReadCloser
eofSeen bool
f func()
}
func (wr *wrapReader) Read(p []byte) (int, error) {
if wr.eofSeen {
return 0, io.EOF
}
n, err := wr.ReadCloser.Read(p)
if err == io.EOF {
wr.eofSeen = true
}
return n, err
}
func (wr *wrapReader) Close() error {
err := wr.ReadCloser.Close()
wr.f()
return err
}

View file

@ -13,12 +13,12 @@ import (
"path" "path"
"time" "time"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/debug"
"github.com/cenkalti/backoff/v4" "github.com/cenkalti/backoff/v4"
"github.com/pkg/sftp" "github.com/pkg/sftp"
) )
@ -33,7 +33,7 @@ type SFTP struct {
posixRename bool posixRename bool
sem *backend.Semaphore sem sema.Semaphore
backend.Layout backend.Layout
Config Config
backend.Modes backend.Modes
@ -121,7 +121,7 @@ func (r *SFTP) clientError() error {
func Open(ctx context.Context, cfg Config) (*SFTP, error) { func Open(ctx context.Context, cfg Config) (*SFTP, error) {
debug.Log("open backend with config %#v", cfg) debug.Log("open backend with config %#v", cfg)
sem, err := backend.NewSemaphore(cfg.Connections) sem, err := sema.New(cfg.Connections)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -14,6 +14,7 @@ import (
"time" "time"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/sema"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
@ -26,7 +27,7 @@ import (
type beSwift struct { type beSwift struct {
conn *swift.Connection conn *swift.Connection
connections uint connections uint
sem *backend.Semaphore sem sema.Semaphore
container string // Container name container string // Container name
prefix string // Prefix of object names in the container prefix string // Prefix of object names in the container
backend.Layout backend.Layout
@ -40,7 +41,7 @@ var _ restic.Backend = &beSwift{}
func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) { func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) {
debug.Log("config %#v", cfg) debug.Log("config %#v", cfg)
sem, err := backend.NewSemaphore(cfg.Connections) sem, err := sema.New(cfg.Connections)
if err != nil { if err != nil {
return nil, err return nil, err
} }