backend: Make pagination for List configurable

This commit is contained in:
Alexander Neumann 2017-09-18 12:01:54 +02:00
parent 649c536250
commit 3b6a580b32
3 changed files with 72 additions and 54 deletions

View file

@ -17,13 +17,16 @@ import (
// Backend stores data on an azure endpoint. // Backend stores data on an azure endpoint.
type Backend struct { type Backend struct {
accountName string accountName string
container *storage.Container container *storage.Container
sem *backend.Semaphore sem *backend.Semaphore
prefix string prefix string
listMaxItems int
backend.Layout backend.Layout
} }
const defaultListMaxItems = 5000
// make sure that *Backend implements backend.Backend // make sure that *Backend implements backend.Backend
var _ restic.Backend = &Backend{} var _ restic.Backend = &Backend{}
@ -53,6 +56,7 @@ func open(cfg Config) (*Backend, error) {
Path: cfg.Prefix, Path: cfg.Prefix,
Join: path.Join, Join: path.Join,
}, },
listMaxItems: defaultListMaxItems,
} }
return be, nil return be, nil
@ -84,6 +88,11 @@ func Create(cfg Config) (restic.Backend, error) {
return be, nil return be, nil
} }
// SetListMaxItems sets the number of list items to load per request.
func (be *Backend) SetListMaxItems(i int) {
be.listMaxItems = i
}
// IsNotExist returns true if the error is caused by a not existing file. // IsNotExist returns true if the error is caused by a not existing file.
func (be *Backend) IsNotExist(err error) bool { func (be *Backend) IsNotExist(err error) bool {
debug.Log("IsNotExist(%T, %#v)", err, err) debug.Log("IsNotExist(%T, %#v)", err, err)
@ -265,7 +274,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string {
} }
params := storage.ListBlobsParameters{ params := storage.ListBlobsParameters{
MaxResults: 1000, MaxResults: uint(be.listMaxItems),
Prefix: prefix, Prefix: prefix,
} }

View file

@ -22,11 +22,12 @@ import (
// Backend stores data on an gs endpoint. // Backend stores data on an gs endpoint.
type Backend struct { type Backend struct {
service *storage.Service service *storage.Service
projectID string projectID string
sem *backend.Semaphore sem *backend.Semaphore
bucketName string bucketName string
prefix string prefix string
listMaxItems int
backend.Layout backend.Layout
} }
@ -55,6 +56,8 @@ func getStorageService(jsonKeyPath string) (*storage.Service, error) {
return service, nil return service, nil
} }
const defaultListMaxItems = 1000
func open(cfg Config) (*Backend, error) { func open(cfg Config) (*Backend, error) {
debug.Log("open, config %#v", cfg) debug.Log("open, config %#v", cfg)
@ -78,6 +81,7 @@ func open(cfg Config) (*Backend, error) {
Path: cfg.Prefix, Path: cfg.Prefix,
Join: path.Join, Join: path.Join,
}, },
listMaxItems: defaultListMaxItems,
} }
return be, nil return be, nil
@ -111,6 +115,11 @@ func Create(cfg Config) (restic.Backend, error) {
return be, nil return be, nil
} }
// SetListMaxItems sets the number of list items to load per request.
func (be *Backend) SetListMaxItems(i int) {
be.listMaxItems = i
}
// IsNotExist returns true if the error is caused by a not existing file. // IsNotExist returns true if the error is caused by a not existing file.
func (be *Backend) IsNotExist(err error) bool { func (be *Backend) IsNotExist(err error) bool {
debug.Log("IsNotExist(%T, %#v)", err, err) debug.Log("IsNotExist(%T, %#v)", err, err)
@ -298,7 +307,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string {
go func() { go func() {
defer close(ch) defer close(ch)
listReq := be.service.Objects.List(be.bucketName).Prefix(prefix) listReq := be.service.Objects.List(be.bucketName).Prefix(prefix).MaxResults(int64(be.listMaxItems))
for { for {
obj, err := listReq.Do() obj, err := listReq.Do()
if err != nil { if err != nil {

View file

@ -11,7 +11,6 @@ import (
"reflect" "reflect"
"sort" "sort"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -241,67 +240,68 @@ func (s *Suite) TestLoad(t *testing.T) {
test.OK(t, b.Remove(context.TODO(), handle)) test.OK(t, b.Remove(context.TODO(), handle))
} }
// TestList makes sure that the backend can list more than a thousand files. // TestList makes sure that the backend implements List() pagination correctly.
func (s *Suite) TestList(t *testing.T) { func (s *Suite) TestList(t *testing.T) {
seedRand(t) seedRand(t)
numTestFiles := rand.Intn(20) + 20
b := s.open(t) b := s.open(t)
defer s.close(t, b) defer s.close(t, b)
const numTestFiles = 1233
list1 := restic.NewIDSet() list1 := restic.NewIDSet()
var wg sync.WaitGroup
input := make(chan int, numTestFiles)
for i := 0; i < numTestFiles; i++ { for i := 0; i < numTestFiles; i++ {
input <- i data := []byte(fmt.Sprintf("random test blob %v", i))
} id := restic.Hash(data)
close(input) h := restic.Handle{Type: restic.DataFile, Name: id.String()}
err := b.Save(context.TODO(), h, bytes.NewReader(data))
output := make(chan restic.ID, numTestFiles) if err != nil {
t.Fatal(err)
for worker := 0; worker < 5; worker++ { }
wg.Add(1)
go func() {
defer wg.Done()
for i := range input {
data := []byte(fmt.Sprintf("random test blob %v", i))
id := restic.Hash(data)
h := restic.Handle{Type: restic.DataFile, Name: id.String()}
err := b.Save(context.TODO(), h, bytes.NewReader(data))
if err != nil {
t.Fatal(err)
}
output <- id
}
}()
}
wg.Wait()
close(output)
for id := range output {
list1.Insert(id) list1.Insert(id)
} }
t.Logf("wrote %v files", len(list1)) t.Logf("wrote %v files", len(list1))
list2 := restic.NewIDSet() var tests = []struct {
for name := range b.List(context.TODO(), restic.DataFile) { maxItems int
id, err := restic.ParseID(name) }{
if err != nil { {3}, {8}, {11}, {13}, {23},
t.Fatal(err) {numTestFiles}, {numTestFiles + 7}, {numTestFiles + 10}, {numTestFiles + 1123},
}
list2.Insert(id)
} }
t.Logf("loaded %v IDs from backend", len(list2)) for _, test := range tests {
t.Run(fmt.Sprintf("max-%v", test.maxItems), func(t *testing.T) {
list2 := restic.NewIDSet()
if !list1.Equals(list2) { type setter interface {
t.Errorf("lists are not equal, list1 %d entries, list2 %d entries", len(list1), len(list2)) SetListMaxItems(int)
}
if s, ok := b.(setter); ok {
t.Logf("setting max list items to %d", test.maxItems)
s.SetListMaxItems(test.maxItems)
}
for name := range b.List(context.TODO(), restic.DataFile) {
id, err := restic.ParseID(name)
if err != nil {
t.Fatal(err)
}
list2.Insert(id)
}
t.Logf("loaded %v IDs from backend", len(list2))
if !list1.Equals(list2) {
t.Errorf("lists are not equal, list1 %d entries, list2 %d entries",
len(list1), len(list2))
}
})
} }
t.Logf("remove %d files", numTestFiles)
for id := range list1 { for id := range list1 {
h := restic.Handle{Type: restic.DataFile, Name: id.String()} h := restic.Handle{Type: restic.DataFile, Name: id.String()}
err := s.delayedRemove(t, b, h) err := s.delayedRemove(t, b, h)