[#216] blobovnicza: Implement main methods and logic

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-11-26 17:26:53 +03:00 committed by Alex Vanin
parent 52b0c1f19a
commit 07e998d846
10 changed files with 627 additions and 0 deletions

1
go.mod
View file

@ -4,6 +4,7 @@ go 1.14
require (
bou.ke/monkey v1.0.2
code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48
github.com/alecthomas/participle v0.6.0
github.com/golang/protobuf v1.4.3
github.com/google/uuid v1.1.1

2
go.sum
View file

@ -13,6 +13,8 @@ cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7
cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48 h1:/EMHruHCFXR9xClkGV/t0rmHrdhX4+trQUcBqjwc9xE=
code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=

View file

@ -0,0 +1,138 @@
package blobovnicza
import (
"crypto/rand"
"crypto/sha256"
"os"
"testing"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/util/logger/test"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)
func testSHA256() (h [sha256.Size]byte) {
rand.Read(h[:])
return h
}
func testAddress() *objectSDK.Address {
cid := container.NewID()
cid.SetSHA256(testSHA256())
oid := objectSDK.NewID()
oid.SetSHA256(testSHA256())
addr := objectSDK.NewAddress()
addr.SetObjectID(oid)
addr.SetContainerID(cid)
return addr
}
func testObject(sz uint64) *object.Object {
raw := object.NewRaw()
addr := testAddress()
raw.SetID(addr.ObjectID())
raw.SetContainerID(addr.ContainerID())
raw.SetPayload(make([]byte, sz))
// fit the binary size to the required
data, _ := raw.Marshal()
if ln := uint64(len(data)); ln > sz {
raw.SetPayload(raw.Payload()[:sz-(ln-sz)])
}
raw.SetAttributes() // for require.Equal
return raw.Object()
}
func testPutGet(t *testing.T, blz *Blobovnicza, sz uint64, expPut, expGet error) *objectSDK.Address {
// create new object
obj := testObject(sz)
// try to save object in Blobovnicza
pPut := new(PutPrm)
pPut.SetObject(obj)
_, err := blz.Put(pPut)
require.True(t, errors.Is(err, expPut))
if expPut != nil {
return nil
}
testGet(t, blz, obj.Address(), obj, expGet)
return obj.Address()
}
func testGet(t *testing.T, blz *Blobovnicza, addr *objectSDK.Address, expObj *object.Object, expErr error) {
pGet := new(GetPrm)
pGet.SetAddress(addr)
// try to read object from Blobovnicza
res, err := blz.Get(pGet)
require.True(t, errors.Is(err, expErr))
if expErr == nil {
require.Equal(t, expObj, res.Object())
}
}
func TestBlobovnicza(t *testing.T) {
p := "./test_blz"
sizeLim := uint64(256 * 1 << 10) // 256KB
objSizeLim := sizeLim / 2
// create Blobovnicza instance
blz := New(
WithPath(p),
WithObjectSizeLimit(objSizeLim),
WithFullSizeLimit(sizeLim),
WithLogger(test.NewLogger(false)),
)
defer os.Remove(p)
// open Blobovnicza
require.NoError(t, blz.Open())
// initialize Blobovnicza
require.NoError(t, blz.Init())
// try to read non-existent address
testGet(t, blz, testAddress(), nil, ErrObjectNotFound)
filled := uint64(15 * 1 << 10)
// test object 15KB
addr := testPutGet(t, blz, filled, nil, nil)
// remove the object
dPrm := new(DeletePrm)
dPrm.SetAddress(addr)
_, err := blz.Delete(dPrm)
require.NoError(t, err)
// should return 404
testGet(t, blz, addr, nil, ErrObjectNotFound)
// fill Blobovnicza fully
for ; filled < sizeLim; filled += objSizeLim {
testPutGet(t, blz, objSizeLim, nil, nil)
}
// from now objects should not be saved
testPutGet(t, blz, 1024, ErrFull, nil)
require.NoError(t, blz.Close())
}

View file

@ -0,0 +1,72 @@
package blobovnicza
import (
"os"
"path"
"github.com/pkg/errors"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
// Open opens an internal database at configured path with configured permissions.
//
// If the database file does not exist then it will be created automatically.
func (b *Blobovnicza) Open() error {
b.log.Debug("creating directory for BoltDB",
zap.String("path", b.path),
)
err := os.MkdirAll(path.Dir(b.path), b.perm)
if err == nil {
b.log.Debug("opening BoltDB",
zap.String("path", b.path),
zap.Stringer("permissions", b.perm),
)
b.boltDB, err = bbolt.Open(b.path, b.perm, b.boltOptions)
}
return err
}
// Init initializes internal database structure.
//
// If Blobovnicza is already initialized, then no action is taken.
func (b *Blobovnicza) Init() error {
b.log.Debug("initializing...",
zap.Uint64("object size limit", b.objSizeLimit),
zap.Uint64("storage size limit", b.fullSizeLimit),
)
return b.boltDB.Update(func(tx *bbolt.Tx) error {
return b.iterateBucketKeys(func(lower, upper uint64, key []byte) (bool, error) {
// create size range bucket
b.log.Debug("creating bucket for size range",
zap.String("range", stringifyBounds(lower, upper)),
)
_, err := tx.CreateBucket(key)
if errors.Is(err, bbolt.ErrBucketExists) {
// => "smallest" bucket exists => already initialized => do nothing
// TODO: consider separate bucket structure allocation step
// and state initialization/activation
b.log.Debug("bucket already exists, initializing state")
return true, b.syncFullnessCounter()
}
return false, errors.Wrapf(err,
"(%T) could not create bucket for bounds [%d:%d]", b, lower, upper)
})
})
}
// Close releases all internal database resources.
func (b *Blobovnicza) Close() error {
b.log.Debug("closing BoltDB")
return b.boltDB.Close()
}

View file

@ -0,0 +1,69 @@
package blobovnicza
import (
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
// DeletePrm groups the parameters of Delete operation.
type DeletePrm struct {
addr *objectSDK.Address
}
// DeleteRes groups resulting values of Delete operation.
type DeleteRes struct {
}
// SetAddress sets address of the requested object.
func (p *DeletePrm) SetAddress(addr *objectSDK.Address) {
p.addr = addr
}
// Delete removes object from Blobovnicza by address.
//
// Returns any error encountered that
// did not allow to completely delete the object.
//
// Returns ErrObjectNotFound if the object to be deleted is not in blobovnicza.
func (b *Blobovnicza) Delete(prm *DeletePrm) (*DeleteRes, error) {
addrKey := addressKey(prm.addr)
removed := false
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
return b.iterateBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
objData := buck.Get(addrKey)
if objData == nil {
// object is not in bucket => continue iterating
return false, nil
}
sz := uint64(len(objData))
// decrease fullness counter
b.decSize(sz)
// remove object from the bucket
err := buck.Delete(addrKey)
if err == nil {
b.log.Debug("object was removed from bucket",
zap.String("binary size", stringifyByteSize(sz)),
zap.String("range", stringifyBounds(lower, upper)),
)
}
removed = true
// stop iteration
return true, err
})
})
if err == nil && !removed {
err = ErrObjectNotFound
}
return nil, err
}

View file

@ -0,0 +1,78 @@
package blobovnicza
import (
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/pkg/errors"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
// GetPrm groups the parameters of Get operation.
type GetPrm struct {
addr *objectSDK.Address
}
// GetRes groups resulting values of Get operation.
type GetRes struct {
obj *object.Object
}
// ErrObjectNotFound is returns on read operations requested on a missing object.
var ErrObjectNotFound = errors.New("object not found")
// SetAddress sets address of the requested object.
func (p *GetPrm) SetAddress(addr *objectSDK.Address) {
p.addr = addr
}
// Object returns the requested object.
func (p *GetRes) Object() *object.Object {
return p.obj
}
// Get reads the object from Blobovnicza by address.
//
// Returns any error encountered that
// did not allow to completely read the object.
func (b *Blobovnicza) Get(prm *GetPrm) (*GetRes, error) {
var (
data []byte
addrKey = addressKey(prm.addr)
)
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
return b.iterateBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
data = buck.Get(addrKey)
stop := data != nil
if stop {
b.log.Debug("object is found in bucket",
zap.String("binary size", stringifyByteSize(uint64(len(data)))),
zap.String("range", stringifyBounds(lower, upper)),
)
}
return stop, nil
})
}); err != nil {
return nil, err
}
if data == nil {
return nil, ErrObjectNotFound
}
// TODO: add decompression step
// unmarshal the object
obj := object.New()
if err := obj.Unmarshal(data); err != nil {
return nil, errors.Wrap(err, "could not unmarshal the object")
}
return &GetRes{
obj: obj,
}, nil
}

View file

@ -0,0 +1,46 @@
package blobovnicza
import (
"github.com/pkg/errors"
"go.etcd.io/bbolt"
)
func (b *Blobovnicza) iterateBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
return b.iterateBucketKeys(func(lower uint64, upper uint64, key []byte) (bool, error) {
buck := tx.Bucket(key)
if buck == nil {
// expected to happen:
// - before initialization step (incorrect usage by design)
// - if DB is corrupted (in future this case should be handled)
return false, errors.Errorf("(%T) could not get bucket %s", b, stringifyBounds(lower, upper))
}
return f(lower, upper, buck)
})
}
func (b *Blobovnicza) iterateBucketKeys(f func(uint64, uint64, []byte) (bool, error)) error {
return b.iterateBounds(func(lower, upper uint64) (bool, error) {
return f(lower, upper, bucketKeyFromBounds(upper))
})
}
func (b *Blobovnicza) iterateBounds(f func(uint64, uint64) (bool, error)) error {
for upper := firstBucketBound; upper <= b.objSizeLimit; upper *= 2 {
var lower uint64
if upper == firstBucketBound {
lower = 0
} else {
lower = upper/2 + 1
}
if stop, err := f(lower, upper); err != nil {
return err
} else if stop {
break
}
}
return nil
}

View file

@ -0,0 +1,108 @@
package blobovnicza
import (
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/pkg/errors"
"go.etcd.io/bbolt"
)
// PutPrm groups the parameters of Put operation.
type PutPrm struct {
addr *objectSDK.Address
obj *object.Object
objData []byte
}
// PutRes groups resulting values of Put operation.
type PutRes struct {
}
// ErrFull is returned returned when trying to save an
// object to a filled blobovnicza.
var ErrFull = errors.New("blobovnicza is full")
var errNilAddress = errors.New("object address is nil")
// SetAddress sets address of saving object.
func (p *PutPrm) SetAddress(addr *objectSDK.Address) {
p.addr = addr
}
// SetObject sets the object.
func (p *PutPrm) SetObject(obj *object.Object) {
p.obj = obj
}
// SetMarshaledObject sets binary representation of the object.
func (p *PutPrm) SetMarshaledObject(data []byte) {
p.objData = data
}
// Put saves object in Blobovnicza.
//
// If binary representation of the object is not set,
// it is calculated via Marshal method.
//
// The size of the object MUST BE less that or equal to
// the size specified in WithObjectSizeLimit option.
//
// Returns any error encountered that
// did not allow to completely save the object.
//
// Returns ErrFull if blobovnicza is filled.
func (b *Blobovnicza) Put(prm *PutPrm) (*PutRes, error) {
addr := prm.addr
if addr == nil {
if addr = prm.obj.Address(); addr == nil {
return nil, errNilAddress
}
}
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
if b.full() {
return ErrFull
}
// marshal the object
data := prm.objData
if data == nil {
var err error
if data, err = prm.obj.Marshal(); err != nil {
return errors.Wrapf(err, "(%T) could not marshal the object", b)
}
}
// TODO: add compression step
// calculate size
sz := uint64(len(data))
// get bucket for size
buck := tx.Bucket(bucketForSize(sz))
if buck == nil {
// expected to happen:
// - before initialization step (incorrect usage by design)
// - if DB is corrupted (in future this case should be handled)
return errors.Errorf("(%T) bucket for size %d not created", b, sz)
}
// save the object in bucket
if err := buck.Put(addressKey(addr), data); err != nil {
return errors.Wrapf(err, "(%T) could not save object in bucket", b)
}
// increase fullness counter
b.incSize(sz)
return nil
})
return nil, err
}
func addressKey(addr *objectSDK.Address) []byte {
return []byte(addr.String())
}

View file

@ -0,0 +1,75 @@
package blobovnicza
import (
"encoding/binary"
"fmt"
"code.cloudfoundry.org/bytefmt"
"github.com/pkg/errors"
"go.etcd.io/bbolt"
)
const firstBucketBound = uint64(32 * 1 << 10) // 32KB
func stringifyBounds(lower, upper uint64) string {
return fmt.Sprintf("[%s:%s]",
stringifyByteSize(lower),
stringifyByteSize(upper),
)
}
func stringifyByteSize(sz uint64) string {
return bytefmt.ByteSize(sz)
}
func bucketKeyFromBounds(upperBound uint64) []byte {
buf := make([]byte, binary.MaxVarintLen64)
ln := binary.PutUvarint(buf, upperBound)
return buf[:ln]
}
func bucketForSize(sz uint64) []byte {
var upperBound uint64
for upperBound = firstBucketBound; upperBound < sz; upperBound *= 2 {
}
return bucketKeyFromBounds(upperBound)
}
func (b *Blobovnicza) incSize(sz uint64) {
b.filled.Add(sz)
}
func (b *Blobovnicza) decSize(sz uint64) {
b.filled.Sub(sz)
}
func (b *Blobovnicza) full() bool {
return b.filled.Load() >= b.fullSizeLimit
}
func (b *Blobovnicza) syncFullnessCounter() error {
return errors.Wrap(b.boltDB.View(func(tx *bbolt.Tx) error {
sz := uint64(0)
if err := b.iterateBucketKeys(func(lower, upper uint64, key []byte) (bool, error) {
buck := tx.Bucket(key)
if buck == nil {
return false, errors.Errorf("bucket not found %s", stringifyBounds(lower, upper))
}
sz += uint64(buck.Stats().KeyN) * (upper - lower)
return false, nil
}); err != nil {
return err
}
b.filled.Store(sz)
return nil
}), "(%T) could not sync fullness counter")
}

View file

@ -0,0 +1,38 @@
package blobovnicza
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestSizes(t *testing.T) {
for _, item := range []struct {
sz uint64 // object size
upperBound uint64 // upper bound of expected range
}{
{
sz: 0,
upperBound: firstBucketBound,
},
{
sz: firstBucketBound,
upperBound: firstBucketBound,
},
{
sz: firstBucketBound + 1,
upperBound: 2 * firstBucketBound,
},
{
sz: 2 * firstBucketBound,
upperBound: 2 * firstBucketBound,
},
{
sz: 2*firstBucketBound + 1,
upperBound: 4 * firstBucketBound,
},
} {
require.Equal(t, bucketKeyFromBounds(item.upperBound), bucketForSize(item.sz))
}
}