[#1707] blobovnicza/get: Iterate over all buckets regardless of config
Re-configuration of Blobovnicza's object size limit must not affect already stored objects. In previous implementation `Blobovnicza` didn't see objects stored in buckets which became too big after size limit re-configuration. For example, lets consider 1st configuration with 64KB size limit for stored objects. Lets assume that we stored object of 64KB size, and re-configured `Blobovnicza` with 32KB limit. After reboot object should be still available, but actually it isn't. This is caused by `Get` operation algorithm which iterates over configured size ranges only, and doesn't process any other existing size bucket. By the way, increasing of the object size limit didn't lead to the problem even in previous implementation. Make `Blobovnicza.Get` method to iterate over all size buckets regardless of current configuration. This covers the described scenario. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
8e6e89aca3
commit
8e8265d5ea
2 changed files with 87 additions and 13 deletions
|
@ -1,11 +1,12 @@
|
||||||
package blobovnicza
|
package blobovnicza
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetPrm groups the parameters of Get operation.
|
// GetPrm groups the parameters of Get operation.
|
||||||
|
@ -28,6 +29,9 @@ func (p GetRes) Object() []byte {
|
||||||
return p.obj
|
return p.obj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// special error for normal bbolt.Tx.ForEach interruption.
|
||||||
|
var errInterruptForEach = errors.New("interrupt for-each")
|
||||||
|
|
||||||
// Get reads an object from Blobovnicza by address.
|
// Get reads an object from Blobovnicza by address.
|
||||||
//
|
//
|
||||||
// Returns any error encountered that
|
// Returns any error encountered that
|
||||||
|
@ -42,22 +46,17 @@ func (b *Blobovnicza) Get(prm GetPrm) (GetRes, error) {
|
||||||
)
|
)
|
||||||
|
|
||||||
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
return b.iterateBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
|
return tx.ForEach(func(_ []byte, buck *bbolt.Bucket) error {
|
||||||
data = buck.Get(addrKey)
|
data = buck.Get(addrKey)
|
||||||
|
if data == nil {
|
||||||
stop := data != nil
|
return nil
|
||||||
|
|
||||||
if stop {
|
|
||||||
b.log.Debug("object is found in bucket",
|
|
||||||
zap.String("binary size", stringifyByteSize(uint64(len(data)))),
|
|
||||||
zap.String("range", stringifyBounds(lower, upper)),
|
|
||||||
)
|
|
||||||
data = slice.Copy(data)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return stop, nil
|
data = slice.Copy(data)
|
||||||
|
|
||||||
|
return errInterruptForEach
|
||||||
})
|
})
|
||||||
}); err != nil {
|
}); err != nil && err != errInterruptForEach {
|
||||||
return GetRes{}, err
|
return GetRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
75
pkg/local_object_storage/blobovnicza/get_test.go
Normal file
75
pkg/local_object_storage/blobovnicza/get_test.go
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
package blobovnicza
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBlobovnicza_Get(t *testing.T) {
|
||||||
|
t.Run("re-configure object size limit", func(t *testing.T) {
|
||||||
|
filename := filepath.Join(t.TempDir(), "blob")
|
||||||
|
|
||||||
|
var blz *Blobovnicza
|
||||||
|
|
||||||
|
t.Cleanup(func() {
|
||||||
|
blz.Close()
|
||||||
|
os.RemoveAll(filename)
|
||||||
|
})
|
||||||
|
|
||||||
|
fnInit := func(szLimit uint64) {
|
||||||
|
if blz != nil {
|
||||||
|
require.NoError(t, blz.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
blz = New(
|
||||||
|
WithPath(filename),
|
||||||
|
WithObjectSizeLimit(szLimit),
|
||||||
|
)
|
||||||
|
|
||||||
|
require.NoError(t, blz.Open())
|
||||||
|
require.NoError(t, blz.Init())
|
||||||
|
}
|
||||||
|
|
||||||
|
// initial distribution: [0:32K] (32K:64K]
|
||||||
|
fnInit(2 * firstBucketBound)
|
||||||
|
|
||||||
|
addr := oidtest.Address()
|
||||||
|
obj := make([]byte, firstBucketBound+1)
|
||||||
|
|
||||||
|
var prmPut PutPrm
|
||||||
|
prmPut.SetAddress(addr)
|
||||||
|
prmPut.SetMarshaledObject(obj)
|
||||||
|
|
||||||
|
// place object to [32K:64K] bucket
|
||||||
|
_, err := blz.Put(prmPut)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var prmGet GetPrm
|
||||||
|
prmGet.SetAddress(addr)
|
||||||
|
|
||||||
|
checkObj := func() {
|
||||||
|
res, err := blz.Get(prmGet)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, obj, res.Object())
|
||||||
|
}
|
||||||
|
|
||||||
|
// object should be available
|
||||||
|
checkObj()
|
||||||
|
|
||||||
|
// new distribution (extended): [0:32K] (32K:64K] (64K:128K]
|
||||||
|
fnInit(3 * firstBucketBound)
|
||||||
|
|
||||||
|
// object should be still available
|
||||||
|
checkObj()
|
||||||
|
|
||||||
|
// new distribution (shrunk): [0:32K]
|
||||||
|
fnInit(firstBucketBound)
|
||||||
|
|
||||||
|
// object should be still available
|
||||||
|
checkObj()
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in a new issue