From 07e998d846a6450ac5ccebdea19aa529e7600b52 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 26 Nov 2020 17:26:53 +0300 Subject: [PATCH] [#216] blobovnicza: Implement main methods and logic Signed-off-by: Leonard Lyubich --- go.mod | 1 + go.sum | Bin 60001 -> 60234 bytes .../blobovnicza/blobovnicza_test.go | 138 ++++++++++++++++++ .../blobovnicza/control.go | 72 +++++++++ .../blobovnicza/delete.go | 69 +++++++++ pkg/local_object_storage/blobovnicza/get.go | 78 ++++++++++ .../blobovnicza/iterate.go | 46 ++++++ pkg/local_object_storage/blobovnicza/put.go | 108 ++++++++++++++ pkg/local_object_storage/blobovnicza/sizes.go | 75 ++++++++++ .../blobovnicza/sizes_test.go | 38 +++++ 10 files changed, 625 insertions(+) create mode 100644 pkg/local_object_storage/blobovnicza/blobovnicza_test.go create mode 100644 pkg/local_object_storage/blobovnicza/control.go create mode 100644 pkg/local_object_storage/blobovnicza/delete.go create mode 100644 pkg/local_object_storage/blobovnicza/get.go create mode 100644 pkg/local_object_storage/blobovnicza/iterate.go create mode 100644 pkg/local_object_storage/blobovnicza/put.go create mode 100644 pkg/local_object_storage/blobovnicza/sizes.go create mode 100644 pkg/local_object_storage/blobovnicza/sizes_test.go diff --git a/go.mod b/go.mod index bdff4ed3..66c88841 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.14 require ( bou.ke/monkey v1.0.2 + code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48 github.com/alecthomas/participle v0.6.0 github.com/golang/protobuf v1.4.3 github.com/google/uuid v1.1.1 diff --git a/go.sum b/go.sum index 6ffdced8960a196efb79dd32c54f2715ab5ec642..4ac85ba46f4787618ca0d6a33fbf5ea08c43d013 100644 GIT binary patch delta 270 zcmaEOh56Jq<_#}c>XY+RQuUH^@=H_F@=NnliYoQ;i_-OzDoaw+a!VA-4D^6d*T}%g zz`)Sh(7?dR#MoRnInC5G#ni~cBGJS`A;Zv0-__TnsMN#REh5OW!Z|0~JxssEpeWa) zC?zApM7yLYFf`ezFsnS-vclDt3)RAU(rwgF&)3V%PXSq6?x$asZJwJEX8v%>0joUQk-WRY2sYvUzBXim6DrTQk1DzoKaeom{XZwrk9+btH1d# HOUhFKz|~sd delta 14 WcmX?gjrrjf<_#}cHeY0|d sz { + raw.SetPayload(raw.Payload()[:sz-(ln-sz)]) + } + + raw.SetAttributes() // for require.Equal + + return raw.Object() +} + +func testPutGet(t *testing.T, blz *Blobovnicza, sz uint64, expPut, expGet error) *objectSDK.Address { + // create new object + obj := testObject(sz) + + // try to save object in Blobovnicza + pPut := new(PutPrm) + pPut.SetObject(obj) + _, err := blz.Put(pPut) + require.True(t, errors.Is(err, expPut)) + + if expPut != nil { + return nil + } + + testGet(t, blz, obj.Address(), obj, expGet) + + return obj.Address() +} + +func testGet(t *testing.T, blz *Blobovnicza, addr *objectSDK.Address, expObj *object.Object, expErr error) { + pGet := new(GetPrm) + pGet.SetAddress(addr) + + // try to read object from Blobovnicza + res, err := blz.Get(pGet) + require.True(t, errors.Is(err, expErr)) + + if expErr == nil { + require.Equal(t, expObj, res.Object()) + } +} + +func TestBlobovnicza(t *testing.T) { + p := "./test_blz" + + sizeLim := uint64(256 * 1 << 10) // 256KB + objSizeLim := sizeLim / 2 + + // create Blobovnicza instance + blz := New( + WithPath(p), + WithObjectSizeLimit(objSizeLim), + WithFullSizeLimit(sizeLim), + WithLogger(test.NewLogger(false)), + ) + + defer os.Remove(p) + + // open Blobovnicza + require.NoError(t, blz.Open()) + + // initialize Blobovnicza + require.NoError(t, blz.Init()) + + // try to read non-existent address + testGet(t, blz, testAddress(), nil, ErrObjectNotFound) + + filled := uint64(15 * 1 << 10) + + // test object 15KB + addr := testPutGet(t, blz, filled, nil, nil) + + // remove the object + dPrm := new(DeletePrm) + dPrm.SetAddress(addr) + + _, err := blz.Delete(dPrm) + require.NoError(t, err) + + // should return 404 + testGet(t, blz, addr, nil, ErrObjectNotFound) + + // fill Blobovnicza fully + for ; filled < sizeLim; filled += objSizeLim { + testPutGet(t, blz, objSizeLim, nil, nil) + } + + // from now objects should not be saved + testPutGet(t, blz, 1024, ErrFull, nil) + + require.NoError(t, blz.Close()) +} diff --git a/pkg/local_object_storage/blobovnicza/control.go b/pkg/local_object_storage/blobovnicza/control.go new file mode 100644 index 00000000..a188f3cd --- /dev/null +++ b/pkg/local_object_storage/blobovnicza/control.go @@ -0,0 +1,72 @@ +package blobovnicza + +import ( + "os" + "path" + + "github.com/pkg/errors" + "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +// Open opens an internal database at configured path with configured permissions. +// +// If the database file does not exist then it will be created automatically. +func (b *Blobovnicza) Open() error { + b.log.Debug("creating directory for BoltDB", + zap.String("path", b.path), + ) + + err := os.MkdirAll(path.Dir(b.path), b.perm) + if err == nil { + b.log.Debug("opening BoltDB", + zap.String("path", b.path), + zap.Stringer("permissions", b.perm), + ) + + b.boltDB, err = bbolt.Open(b.path, b.perm, b.boltOptions) + } + + return err +} + +// Init initializes internal database structure. +// +// If Blobovnicza is already initialized, then no action is taken. +func (b *Blobovnicza) Init() error { + b.log.Debug("initializing...", + zap.Uint64("object size limit", b.objSizeLimit), + zap.Uint64("storage size limit", b.fullSizeLimit), + ) + + return b.boltDB.Update(func(tx *bbolt.Tx) error { + return b.iterateBucketKeys(func(lower, upper uint64, key []byte) (bool, error) { + // create size range bucket + + b.log.Debug("creating bucket for size range", + zap.String("range", stringifyBounds(lower, upper)), + ) + + _, err := tx.CreateBucket(key) + if errors.Is(err, bbolt.ErrBucketExists) { + // => "smallest" bucket exists => already initialized => do nothing + // TODO: consider separate bucket structure allocation step + // and state initialization/activation + + b.log.Debug("bucket already exists, initializing state") + + return true, b.syncFullnessCounter() + } + + return false, errors.Wrapf(err, + "(%T) could not create bucket for bounds [%d:%d]", b, lower, upper) + }) + }) +} + +// Close releases all internal database resources. +func (b *Blobovnicza) Close() error { + b.log.Debug("closing BoltDB") + + return b.boltDB.Close() +} diff --git a/pkg/local_object_storage/blobovnicza/delete.go b/pkg/local_object_storage/blobovnicza/delete.go new file mode 100644 index 00000000..e1d069ac --- /dev/null +++ b/pkg/local_object_storage/blobovnicza/delete.go @@ -0,0 +1,69 @@ +package blobovnicza + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +// DeletePrm groups the parameters of Delete operation. +type DeletePrm struct { + addr *objectSDK.Address +} + +// DeleteRes groups resulting values of Delete operation. +type DeleteRes struct { +} + +// SetAddress sets address of the requested object. +func (p *DeletePrm) SetAddress(addr *objectSDK.Address) { + p.addr = addr +} + +// Delete removes object from Blobovnicza by address. +// +// Returns any error encountered that +// did not allow to completely delete the object. +// +// Returns ErrObjectNotFound if the object to be deleted is not in blobovnicza. +func (b *Blobovnicza) Delete(prm *DeletePrm) (*DeleteRes, error) { + addrKey := addressKey(prm.addr) + + removed := false + + err := b.boltDB.Update(func(tx *bbolt.Tx) error { + return b.iterateBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) { + objData := buck.Get(addrKey) + if objData == nil { + // object is not in bucket => continue iterating + return false, nil + } + + sz := uint64(len(objData)) + + // decrease fullness counter + b.decSize(sz) + + // remove object from the bucket + err := buck.Delete(addrKey) + + if err == nil { + b.log.Debug("object was removed from bucket", + zap.String("binary size", stringifyByteSize(sz)), + zap.String("range", stringifyBounds(lower, upper)), + ) + } + + removed = true + + // stop iteration + return true, err + }) + }) + + if err == nil && !removed { + err = ErrObjectNotFound + } + + return nil, err +} diff --git a/pkg/local_object_storage/blobovnicza/get.go b/pkg/local_object_storage/blobovnicza/get.go new file mode 100644 index 00000000..327f0700 --- /dev/null +++ b/pkg/local_object_storage/blobovnicza/get.go @@ -0,0 +1,78 @@ +package blobovnicza + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/pkg/errors" + "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +// GetPrm groups the parameters of Get operation. +type GetPrm struct { + addr *objectSDK.Address +} + +// GetRes groups resulting values of Get operation. +type GetRes struct { + obj *object.Object +} + +// ErrObjectNotFound is returns on read operations requested on a missing object. +var ErrObjectNotFound = errors.New("object not found") + +// SetAddress sets address of the requested object. +func (p *GetPrm) SetAddress(addr *objectSDK.Address) { + p.addr = addr +} + +// Object returns the requested object. +func (p *GetRes) Object() *object.Object { + return p.obj +} + +// Get reads the object from Blobovnicza by address. +// +// Returns any error encountered that +// did not allow to completely read the object. +func (b *Blobovnicza) Get(prm *GetPrm) (*GetRes, error) { + var ( + data []byte + addrKey = addressKey(prm.addr) + ) + + if err := b.boltDB.View(func(tx *bbolt.Tx) error { + return b.iterateBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) { + data = buck.Get(addrKey) + + stop := data != nil + + if stop { + b.log.Debug("object is found in bucket", + zap.String("binary size", stringifyByteSize(uint64(len(data)))), + zap.String("range", stringifyBounds(lower, upper)), + ) + } + + return stop, nil + }) + }); err != nil { + return nil, err + } + + if data == nil { + return nil, ErrObjectNotFound + } + + // TODO: add decompression step + + // unmarshal the object + obj := object.New() + if err := obj.Unmarshal(data); err != nil { + return nil, errors.Wrap(err, "could not unmarshal the object") + } + + return &GetRes{ + obj: obj, + }, nil +} diff --git a/pkg/local_object_storage/blobovnicza/iterate.go b/pkg/local_object_storage/blobovnicza/iterate.go new file mode 100644 index 00000000..cf06282e --- /dev/null +++ b/pkg/local_object_storage/blobovnicza/iterate.go @@ -0,0 +1,46 @@ +package blobovnicza + +import ( + "github.com/pkg/errors" + "go.etcd.io/bbolt" +) + +func (b *Blobovnicza) iterateBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error { + return b.iterateBucketKeys(func(lower uint64, upper uint64, key []byte) (bool, error) { + buck := tx.Bucket(key) + if buck == nil { + // expected to happen: + // - before initialization step (incorrect usage by design) + // - if DB is corrupted (in future this case should be handled) + return false, errors.Errorf("(%T) could not get bucket %s", b, stringifyBounds(lower, upper)) + } + + return f(lower, upper, buck) + }) +} + +func (b *Blobovnicza) iterateBucketKeys(f func(uint64, uint64, []byte) (bool, error)) error { + return b.iterateBounds(func(lower, upper uint64) (bool, error) { + return f(lower, upper, bucketKeyFromBounds(upper)) + }) +} + +func (b *Blobovnicza) iterateBounds(f func(uint64, uint64) (bool, error)) error { + for upper := firstBucketBound; upper <= b.objSizeLimit; upper *= 2 { + var lower uint64 + + if upper == firstBucketBound { + lower = 0 + } else { + lower = upper/2 + 1 + } + + if stop, err := f(lower, upper); err != nil { + return err + } else if stop { + break + } + } + + return nil +} diff --git a/pkg/local_object_storage/blobovnicza/put.go b/pkg/local_object_storage/blobovnicza/put.go new file mode 100644 index 00000000..461fb87a --- /dev/null +++ b/pkg/local_object_storage/blobovnicza/put.go @@ -0,0 +1,108 @@ +package blobovnicza + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/pkg/errors" + "go.etcd.io/bbolt" +) + +// PutPrm groups the parameters of Put operation. +type PutPrm struct { + addr *objectSDK.Address + + obj *object.Object + + objData []byte +} + +// PutRes groups resulting values of Put operation. +type PutRes struct { +} + +// ErrFull is returned returned when trying to save an +// object to a filled blobovnicza. +var ErrFull = errors.New("blobovnicza is full") + +var errNilAddress = errors.New("object address is nil") + +// SetAddress sets address of saving object. +func (p *PutPrm) SetAddress(addr *objectSDK.Address) { + p.addr = addr +} + +// SetObject sets the object. +func (p *PutPrm) SetObject(obj *object.Object) { + p.obj = obj +} + +// SetMarshaledObject sets binary representation of the object. +func (p *PutPrm) SetMarshaledObject(data []byte) { + p.objData = data +} + +// Put saves object in Blobovnicza. +// +// If binary representation of the object is not set, +// it is calculated via Marshal method. +// +// The size of the object MUST BE less that or equal to +// the size specified in WithObjectSizeLimit option. +// +// Returns any error encountered that +// did not allow to completely save the object. +// +// Returns ErrFull if blobovnicza is filled. +func (b *Blobovnicza) Put(prm *PutPrm) (*PutRes, error) { + addr := prm.addr + if addr == nil { + if addr = prm.obj.Address(); addr == nil { + return nil, errNilAddress + } + } + + err := b.boltDB.Update(func(tx *bbolt.Tx) error { + if b.full() { + return ErrFull + } + + // marshal the object + data := prm.objData + if data == nil { + var err error + + if data, err = prm.obj.Marshal(); err != nil { + return errors.Wrapf(err, "(%T) could not marshal the object", b) + } + } + // TODO: add compression step + + // calculate size + sz := uint64(len(data)) + + // get bucket for size + buck := tx.Bucket(bucketForSize(sz)) + if buck == nil { + // expected to happen: + // - before initialization step (incorrect usage by design) + // - if DB is corrupted (in future this case should be handled) + return errors.Errorf("(%T) bucket for size %d not created", b, sz) + } + + // save the object in bucket + if err := buck.Put(addressKey(addr), data); err != nil { + return errors.Wrapf(err, "(%T) could not save object in bucket", b) + } + + // increase fullness counter + b.incSize(sz) + + return nil + }) + + return nil, err +} + +func addressKey(addr *objectSDK.Address) []byte { + return []byte(addr.String()) +} diff --git a/pkg/local_object_storage/blobovnicza/sizes.go b/pkg/local_object_storage/blobovnicza/sizes.go new file mode 100644 index 00000000..1bcee4bd --- /dev/null +++ b/pkg/local_object_storage/blobovnicza/sizes.go @@ -0,0 +1,75 @@ +package blobovnicza + +import ( + "encoding/binary" + "fmt" + + "code.cloudfoundry.org/bytefmt" + "github.com/pkg/errors" + "go.etcd.io/bbolt" +) + +const firstBucketBound = uint64(32 * 1 << 10) // 32KB + +func stringifyBounds(lower, upper uint64) string { + return fmt.Sprintf("[%s:%s]", + stringifyByteSize(lower), + stringifyByteSize(upper), + ) +} + +func stringifyByteSize(sz uint64) string { + return bytefmt.ByteSize(sz) +} + +func bucketKeyFromBounds(upperBound uint64) []byte { + buf := make([]byte, binary.MaxVarintLen64) + + ln := binary.PutUvarint(buf, upperBound) + + return buf[:ln] +} + +func bucketForSize(sz uint64) []byte { + var upperBound uint64 + + for upperBound = firstBucketBound; upperBound < sz; upperBound *= 2 { + } + + return bucketKeyFromBounds(upperBound) +} + +func (b *Blobovnicza) incSize(sz uint64) { + b.filled.Add(sz) +} + +func (b *Blobovnicza) decSize(sz uint64) { + b.filled.Sub(sz) +} + +func (b *Blobovnicza) full() bool { + return b.filled.Load() >= b.fullSizeLimit +} + +func (b *Blobovnicza) syncFullnessCounter() error { + return errors.Wrap(b.boltDB.View(func(tx *bbolt.Tx) error { + sz := uint64(0) + + if err := b.iterateBucketKeys(func(lower, upper uint64, key []byte) (bool, error) { + buck := tx.Bucket(key) + if buck == nil { + return false, errors.Errorf("bucket not found %s", stringifyBounds(lower, upper)) + } + + sz += uint64(buck.Stats().KeyN) * (upper - lower) + + return false, nil + }); err != nil { + return err + } + + b.filled.Store(sz) + + return nil + }), "(%T) could not sync fullness counter") +} diff --git a/pkg/local_object_storage/blobovnicza/sizes_test.go b/pkg/local_object_storage/blobovnicza/sizes_test.go new file mode 100644 index 00000000..c218dc63 --- /dev/null +++ b/pkg/local_object_storage/blobovnicza/sizes_test.go @@ -0,0 +1,38 @@ +package blobovnicza + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSizes(t *testing.T) { + for _, item := range []struct { + sz uint64 // object size + + upperBound uint64 // upper bound of expected range + }{ + { + sz: 0, + upperBound: firstBucketBound, + }, + { + sz: firstBucketBound, + upperBound: firstBucketBound, + }, + { + sz: firstBucketBound + 1, + upperBound: 2 * firstBucketBound, + }, + { + sz: 2 * firstBucketBound, + upperBound: 2 * firstBucketBound, + }, + { + sz: 2*firstBucketBound + 1, + upperBound: 4 * firstBucketBound, + }, + } { + require.Equal(t, bucketKeyFromBounds(item.upperBound), bucketForSize(item.sz)) + } +}