From 4c3d231e8226d4145409be2d2fa20396f03afe97 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 30 Nov 2020 11:11:37 +0300 Subject: [PATCH] [#218] blobstor: Implement blobovnicza tree Signed-off-by: Leonard Lyubich --- .../blobstor/blobovnicza.go | 642 ++++++++++++++++++ .../blobstor/blobovnicza_test.go | 134 ++++ 2 files changed, 776 insertions(+) create mode 100644 pkg/local_object_storage/blobstor/blobovnicza.go create mode 100644 pkg/local_object_storage/blobstor/blobovnicza_test.go diff --git a/pkg/local_object_storage/blobstor/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovnicza.go new file mode 100644 index 000000000..f694eb5b5 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobovnicza.go @@ -0,0 +1,642 @@ +package blobstor + +import ( + "fmt" + "path" + "strconv" + "sync" + + lru "github.com/hashicorp/golang-lru" + "github.com/nspcc-dev/hrw" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// represents storage of the "small" objects. +// +// Each object is stored in Blobovnicza's (B-s). +// B-s are structured in a multilevel directory hierarchy +// with fixed depth and width (configured by BlobStor). +// +// Example (width = 4, depth = 3): +// +// x===============================x +// |[0] [1] [2] [3]| +// | \ / | +// | \ / | +// | \ / | +// | \ / | +// |[0] [1] [2] [3]| +// | | / | +// | | / | +// | | / | +// | | / | +// |[0](F) [1](A) [X] [X]| +// x===============================x +// +// Elements of the deepest level are B-s. +// B-s are allocated dynamically. At each moment of the time there is +// an active B (ex. A), set of already filled B-s (ex. F) and +// list of not yet initialized B-s (ex. X). After filling the active B +// it becomes full, and next B becomes initialized and active. +// +// Active B and some of the full B-s are cached (LRU). All cached +// B-s are intitialized and opened. +// +// Object is saved as follows: +// 1. at each level, according to HRW, the next one is selected and +// dives into it until we reach the deepest; +// 2. at the B-s level object is saved to the active B. If active B +// is full, next B is opened, initialized and cached. If there +// is no more X candidates, goto 1 and process next level. +// +// After the object is saved in B, path concatenation is returned +// in system path format as B identifier (ex. "0/1/1" or "3/2/1"). +type blobovniczas struct { + *cfg + + // cache of opened filled blobovniczas + opened *lru.Cache + + // list of active (opened, non-filled) blobovniczas + activeMtx sync.RWMutex + active map[string]blobovniczaWithIndex +} + +type blobovniczaWithIndex struct { + ind uint64 + + blz *blobovnicza.Blobovnicza +} + +var errPutFailed = errors.New("could not save the object in any blobovnicza") + +func newBlobovniczaTree(c *cfg) (blz *blobovniczas) { + cache, err := lru.NewWithEvict(c.openedCacheSize, func(key interface{}, value interface{}) { + blz.activeMtx.RLock() + defer blz.activeMtx.RUnlock() + + if _, ok := blz.active[path.Dir(key.(string))]; ok { + return + } else if err := value.(*blobovnicza.Blobovnicza).Close(); err != nil { + c.log.Error("could not close Blobovnicza", + zap.String("id", key.(string)), + zap.String("error", err.Error()), + ) + } else { + c.log.Debug("blobovnicza succesfully closed on evict", + zap.String("id", key.(string)), + ) + } + }) + if err != nil { + // occurs only if the size is not positive + panic(errors.Wrapf(err, "could not create LRU cache of size %d", c.openedCacheSize)) + } + + cp := uint64(1) + for i := uint64(0); i < c.blzShallowDepth; i++ { + cp *= c.blzShallowWidth + } + + return &blobovniczas{ + cfg: c, + opened: cache, + active: make(map[string]blobovniczaWithIndex, cp), + } +} + +// makes slice of uint64 values from 0 to number-1. +func indexSlice(number uint64) []uint64 { + s := make([]uint64, number) + + for i := range s { + s[i] = uint64(i) + } + + return s +} + +// save object in the maximum weight blobobnicza. +// +// returns error if could not save object in any blobovnicza. +func (b *blobovniczas) put(addr *objectSDK.Address, data []byte) (*blobovnicza.ID, error) { + prm := new(blobovnicza.PutPrm) + prm.SetAddress(addr) + prm.SetMarshaledObject(data) + + var ( + fn func(string) (bool, error) + id *blobovnicza.ID + ) + + fn = func(p string) (bool, error) { + active, err := b.getActivated(p) + if err != nil { + b.log.Debug("could not get active blobovnicza", + zap.String("error", err.Error()), + ) + + return false, nil + } + + if _, err := active.blz.Put(prm); err != nil { + // check if blobovnicza is full + if errors.Is(err, blobovnicza.ErrFull) { + b.log.Debug("blobovnicza overflowed", + zap.String("path", path.Join(p, u64ToHexString(active.ind))), + ) + + if err := b.updateActive(p, &active.ind); err != nil { + b.log.Debug("could not update active blobovnicza", + zap.String("level", p), + zap.String("error", err.Error()), + ) + + return false, nil + } + + return fn(p) + } + + b.log.Debug("could not put object to active blobovnicza", + zap.String("path", path.Join(p, u64ToHexString(active.ind))), + zap.String("error", err.Error()), + ) + + return false, nil + } + + p = path.Join(p, u64ToHexString(active.ind)) + + id = blobovnicza.NewIDFromBytes([]byte(p)) + + b.log.Debug("object successfully saved in active blobovnicza", + zap.String("path", p), + zap.Stringer("addr", addr), + ) + + return true, nil + } + + if err := b.iterateDeepest(addr, fn); err != nil { + return nil, err + } else if id == nil { + return nil, errPutFailed + } + + return id, nil +} + +// reads object from blobovnicza tree. +// +// If blobocvnicza ID is specified, only this blobovnicza is processed. +// Otherwise, all blobovniczas are processed descending weight. +func (b *blobovniczas) get(prm *GetSmallPrm) (res *GetSmallRes, err error) { + bPrm := new(blobovnicza.GetPrm) + bPrm.SetAddress(prm.addr) + + if prm.blobovniczaID != nil { + blz, err := b.openBlobovnicza(prm.blobovniczaID.String()) + if err != nil { + return nil, err + } + + return b.getObject(blz, bPrm) + } + + activeCache := make(map[string]struct{}) + + err = b.iterateSortedLeaves(prm.addr, func(p string) (bool, error) { + dirPath := path.Dir(p) + + _, ok := activeCache[dirPath] + + res, err = b.getObjectFromLevel(bPrm, p, !ok) + if err != nil { + if !errors.Is(err, ErrObjectNotFound) { + b.log.Debug("could not get object from level", + zap.String("level", p), + zap.String("error", err.Error()), + ) + } + } + + activeCache[dirPath] = struct{}{} + + // abort iterator if found, otherwise process all blobovniczas + return err == nil, nil + }) + + if err == nil && res == nil { + // not found in any blobovnicza + err = ErrObjectNotFound + } + + return +} + +// removes object from blobovnicza tree. +// +// If blobocvnicza ID is specified, only this blobovnicza is processed. +// Otherwise, all blobovniczas are processed descending weight. +// +// TODO:quite similar to GET, can be unified +func (b *blobovniczas) delete(prm *DeleteSmallPrm) (res *DeleteSmallRes, err error) { + bPrm := new(blobovnicza.DeletePrm) + bPrm.SetAddress(prm.addr) + + if prm.blobovniczaID != nil { + blz, err := b.openBlobovnicza(prm.blobovniczaID.String()) + if err != nil { + return nil, err + } + + return b.deleteObject(blz, bPrm) + } + + activeCache := make(map[string]struct{}) + + err = b.iterateSortedLeaves(prm.addr, func(p string) (bool, error) { + dirPath := path.Dir(p) + + // don't process active blobovnicza of the level twice + _, ok := activeCache[dirPath] + + res, err = b.deleteObjectFromLevel(bPrm, p, !ok) + if err != nil { + if !errors.Is(err, ErrObjectNotFound) { + b.log.Debug("could not remove object from level", + zap.String("level", p), + zap.String("error", err.Error()), + ) + } + } + + activeCache[dirPath] = struct{}{} + + if err == nil { + res = new(DeleteSmallRes) + } + + // abort iterator if found, otherwise process all blobovniczas + return err == nil, nil + }) + + if err == nil && res == nil { + // not found in any blobovnicza + err = ErrObjectNotFound + } + + return +} + +// tries to delete object from particular blobovnicza. +// +// returns no error if object was removed from some blobovnicza of the same level. +func (b *blobovniczas) deleteObjectFromLevel(prm *blobovnicza.DeletePrm, blzPath string, tryActive bool) (*DeleteSmallRes, error) { + lvlPath := path.Dir(blzPath) + + log := b.log.With( + zap.String("path", blzPath), + ) + + // try to remove from blobovnicza if it is opened + v, ok := b.opened.Get(blzPath) + if ok { + if res, err := b.deleteObject(v.(*blobovnicza.Blobovnicza), prm); err == nil { + return res, err + } else if !errors.Is(err, ErrObjectNotFound) { + log.Debug("could not remove object from opened blobovnicza", + zap.String("error", err.Error()), + ) + } + } + + // therefore the object is possibly placed in a lighter blobovnicza + + // next we check in the active level blobobnicza: + // * the active blobovnicza is always opened. + b.activeMtx.RLock() + active, ok := b.active[lvlPath] + b.activeMtx.RUnlock() + + if ok && tryActive { + if res, err := b.deleteObject(active.blz, prm); err == nil { + return res, err + } else if !errors.Is(err, ErrObjectNotFound) { + log.Debug("could not remove object from active blobovnicza", + zap.String("error", err.Error()), + ) + } + } + + // then object is possibly placed in closed blobovnicza + + // check if it makes sense to try to open the blob + // (blobovniczas "after" the active one are empty anyway, + // and it's pointless to open them). + if u64FromHexString(path.Base(blzPath)) > active.ind { + log.Debug("index is too big") + return nil, ErrObjectNotFound + } + + // open blobovnicza (cached inside) + blz, err := b.openBlobovnicza(blzPath) + if err != nil { + return nil, err + } + + return b.deleteObject(blz, prm) +} + +// tries to read object from particular blobovnicza. +// +// returns error if object could not be read from any blobovnicza of the same level. +func (b *blobovniczas) getObjectFromLevel(prm *blobovnicza.GetPrm, blzPath string, tryActive bool) (*GetSmallRes, error) { + lvlPath := path.Dir(blzPath) + + log := b.log.With( + zap.String("path", blzPath), + ) + + // try to read from blobovnicza if it is opened + v, ok := b.opened.Get(blzPath) + if ok { + if res, err := b.getObject(v.(*blobovnicza.Blobovnicza), prm); err == nil { + return res, err + } else if !errors.Is(err, ErrObjectNotFound) { + log.Debug("could not read object from opened blobovnicza", + zap.String("error", err.Error()), + ) + } + } + + // therefore the object is possibly placed in a lighter blobovnicza + + // next we check in the active level blobobnicza: + // * the freshest objects are probably the most demanded; + // * the active blobovnicza is always opened. + b.activeMtx.RLock() + active, ok := b.active[lvlPath] + b.activeMtx.RUnlock() + + if ok && tryActive { + if res, err := b.getObject(active.blz, prm); err == nil { + return res, err + } else if !errors.Is(err, ErrObjectNotFound) { + log.Debug("could not get object from active blobovnicza", + zap.String("error", err.Error()), + ) + } + } + + // then object is possibly placed in closed blobovnicza + + // check if it makes sense to try to open the blob + // (blobovniczas "after" the active one are empty anyway, + // and it's pointless to open them). + if u64FromHexString(path.Base(blzPath)) > active.ind { + log.Debug("index is too big") + return nil, ErrObjectNotFound + } + + // open blobovnicza (cached inside) + blz, err := b.openBlobovnicza(blzPath) + if err != nil { + return nil, err + } + + return b.getObject(blz, prm) +} + +// removes object from blobovnicza and returns DeleteSmallRes. +func (b *blobovniczas) deleteObject(blz *blobovnicza.Blobovnicza, prm *blobovnicza.DeletePrm) (*DeleteSmallRes, error) { + _, err := blz.Delete(prm) + if err != nil { + if errors.Is(err, blobovnicza.ErrObjectNotFound) { + err = ErrObjectNotFound + } + + return nil, err + } + + return new(DeleteSmallRes), nil +} + +// reads object from blobovnicza and returns GetSmallRes. +func (b *blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm *blobovnicza.GetPrm) (*GetSmallRes, error) { + res, err := blz.Get(prm) + if err != nil { + if errors.Is(err, blobovnicza.ErrObjectNotFound) { + err = ErrObjectNotFound + } + + return nil, err + } + + return &GetSmallRes{ + roObject: roObject{ + obj: res.Object(), + }, + }, nil +} + +// iterator over the paths of blobovniczas in random order. +func (b *blobovniczas) iterateLeaves(f func(string) (bool, error)) error { + return b.iterateSortedLeaves(nil, f) +} + +// iterator over the paths of blobovniczas sorted by weight. +func (b *blobovniczas) iterateSortedLeaves(addr *objectSDK.Address, f func(string) (bool, error)) error { + _, err := b.iterateSorted( + addr, + make([]string, 0, b.blzShallowDepth), + b.blzShallowDepth, + func(p []string) (bool, error) { return f(path.Join(p...)) }, + ) + + return err +} + +// iterator over directories with blobovniczas sorted by weight. +func (b *blobovniczas) iterateDeepest(addr *objectSDK.Address, f func(string) (bool, error)) error { + _, err := b.iterateSorted( + addr, + make([]string, 0, b.blzShallowDepth-1), + b.blzShallowDepth-1, + func(p []string) (bool, error) { return f(path.Join(p...)) }, + ) + + return err +} + +// iterator over particular level of directories. +func (b *blobovniczas) iterateSorted(addr *objectSDK.Address, curPath []string, execDepth uint64, f func([]string) (bool, error)) (bool, error) { + indices := indexSlice(b.blzShallowWidth) + + hrw.SortSliceByValue(indices, addressHash(addr, path.Join(curPath...))) + + exec := uint64(len(curPath)) == execDepth + + for i := range indices { + if i == 0 { + curPath = append(curPath, u64ToHexString(indices[i])) + } else { + curPath[len(curPath)-1] = u64ToHexString(indices[i]) + } + + if exec { + if stop, err := f(curPath); err != nil { + return false, err + } else if stop { + return true, nil + } + } else if stop, err := b.iterateSorted(addr, curPath, execDepth, f); err != nil { + return false, err + } else if stop { + return true, nil + } + } + + return false, nil +} + +// activates and returns activated blobovnicza of p-level (dir). +// +// returns error if blobvnicza could not be activated. +func (b *blobovniczas) getActivated(p string) (blobovniczaWithIndex, error) { + b.activeMtx.Lock() + defer b.activeMtx.Unlock() + + // try again + return b.updateAndGet(p, nil) +} + +// updates active blobovnicza of p-level (dir). +// +// if current active blobovnicza's index is not old, it remains unchanged. +func (b *blobovniczas) updateActive(p string, old *uint64) error { + log := b.log.With(zap.String("path", p)) + + log.Debug("updating active blobovnicza...") + + b.activeMtx.Lock() + _, err := b.updateAndGet(p, old) + b.activeMtx.Unlock() + + log.Debug("active blobovnicza successfully updated") + + return err +} + +// updates and returns active blobovnicza of p-level (dir). +// +// if current active blobovnicza's index is not old, it is returned unchanged. +func (b *blobovniczas) updateAndGet(p string, old *uint64) (blobovniczaWithIndex, error) { + active, ok := b.active[p] + if ok { + if old != nil { + if active.ind == b.blzShallowWidth-1 { + return active, errors.New("no more blobovniczas") + } else if active.ind != *old { + // sort of CAS in order to control concurrent + // updateActive calls + return active, nil + } + } else { + return active, nil + } + + active.ind++ + } + + var err error + if active.blz, err = b.openBlobovnicza(path.Join(p, u64ToHexString(active.ind))); err != nil { + return active, err + } + + // remove from opened cache (active blobovnicza should always be opened) + b.opened.Remove(p) + b.active[p] = active + + b.log.Debug("blobovnicza succesfully activated", + zap.String("path", path.Join(p, u64ToHexString(active.ind))), + ) + + return active, nil +} + +// initializes blobovnicza tree. +// +// Should be called exactly once. +func (b *blobovniczas) init() error { + b.log.Debug("initializing Blobovnicza's") + + return b.iterateLeaves(func(p string) (bool, error) { + blz, err := b.openBlobovnicza(p) + if err != nil { + return false, errors.Wrapf(err, "could not open blobovnicza %s", p) + } else if err := blz.Init(); err != nil { + return false, errors.Wrapf(err, "could not initialize blobovnicza structure %s", p) + } + + log := b.log.With(zap.String("id", p)) + + log.Debug("blobovnicza successfully initialized, closing...") + + return false, nil + }) +} + +// opens and returns blobovnicza with path p. +// +// If blobovnicza is already opened and cached, instance from cache is returned w/o changes. +func (b *blobovniczas) openBlobovnicza(p string) (*blobovnicza.Blobovnicza, error) { + v, ok := b.opened.Get(p) + if ok { + // blobovnicza should be opened in cache + return v.(*blobovnicza.Blobovnicza), nil + } + + blz := blobovnicza.New(append(b.blzOpts, + blobovnicza.WithPath(path.Join(b.blzRootPath, p)), + )...) + + if err := blz.Open(); err != nil { + return nil, errors.Wrapf(err, "could not open blobovnicza %s", p) + } + + b.opened.Add(p, blz) + + return blz, nil +} + +// returns hash of the object address. +func addressHash(addr *objectSDK.Address, path string) uint64 { + var a string + + if addr != nil { + a = addr.String() + } + + return hrw.Hash([]byte(a + path)) +} + +// converts uint64 to hex string. +func u64ToHexString(ind uint64) string { + return strconv.FormatUint(ind, 16) +} + +// converts uint64 hex string to uint64. +func u64FromHexString(str string) uint64 { + v, err := strconv.ParseUint(str, 16, 64) + if err != nil { + panic(fmt.Sprintf("blobovnicza name is not an index %s", str)) + } + + return v + +} diff --git a/pkg/local_object_storage/blobstor/blobovnicza_test.go b/pkg/local_object_storage/blobstor/blobovnicza_test.go new file mode 100644 index 000000000..a27000487 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobovnicza_test.go @@ -0,0 +1,134 @@ +package blobstor + +import ( + "crypto/rand" + "crypto/sha256" + "errors" + "os" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/util/logger/test" + "github.com/stretchr/testify/require" +) + +func testSHA256() (h [sha256.Size]byte) { + rand.Read(h[:]) + + return h +} + +func testAddress() *objectSDK.Address { + cid := container.NewID() + cid.SetSHA256(testSHA256()) + + oid := objectSDK.NewID() + oid.SetSHA256(testSHA256()) + + addr := objectSDK.NewAddress() + addr.SetObjectID(oid) + addr.SetContainerID(cid) + + return addr +} + +func testObject(sz uint64) *object.Object { + raw := object.NewRaw() + + addr := testAddress() + raw.SetID(addr.ObjectID()) + raw.SetContainerID(addr.ContainerID()) + + raw.SetPayload(make([]byte, sz)) + + // fit the binary size to the required + data, _ := raw.Marshal() + if ln := uint64(len(data)); ln > sz { + raw.SetPayload(raw.Payload()[:sz-(ln-sz)]) + } + + raw.SetAttributes() // for require.Equal + + return raw.Object() +} + +func TestBlobovniczas(t *testing.T) { + l := test.NewLogger(false) + p := "./test_blz" + + c := defaultCfg() + + var width, depth, szLim uint64 = 2, 2, 2 << 10 + + for _, opt := range []Option{ + WithLogger(l), + WithSmallSizeLimit(szLim), + WithBlobovniczaShallowWidth(width), + WithBlobovniczaShallowDepth(depth), + WithBlobovniczaRootPath(p), + WithBlobovniczaSize(szLim), + } { + opt(c) + } + + c.blzRootPath = p + + b := newBlobovniczaTree(c) + + defer os.RemoveAll(p) + + require.NoError(t, b.init()) + + objSz := uint64(szLim / 2) + + addrList := make([]*objectSDK.Address, 0) + minFitObjNum := 2 * 2 * szLim / objSz + + for i := uint64(0); i < minFitObjNum; i++ { + obj := testObject(objSz) + addrList = append(addrList, obj.Address()) + + d, err := obj.Marshal() + require.NoError(t, err) + + // save object in blobovnicza + id, err := b.put(obj.Address(), d) + require.NoError(t, err) + + // get w/ blobovnicza ID + prm := new(GetSmallPrm) + prm.SetBlobovniczaID(id) + prm.SetAddress(obj.Address()) + + res, err := b.get(prm) + require.NoError(t, err) + require.Equal(t, obj, res.Object()) + + // get w/o blobovnicza ID + prm.SetBlobovniczaID(nil) + + res, err = b.get(prm) + require.NoError(t, err) + require.Equal(t, obj, res.Object()) + } + + dPrm := new(DeleteSmallPrm) + gPrm := new(GetSmallPrm) + + for i := range addrList { + dPrm.SetAddress(addrList[i]) + + _, err := b.delete(dPrm) + require.NoError(t, err) + + gPrm.SetAddress(addrList[i]) + + _, err = b.get(gPrm) + require.True(t, errors.Is(err, ErrObjectNotFound)) + + _, err = b.delete(dPrm) + require.True(t, errors.Is(err, ErrObjectNotFound)) + } +}