forked from TrueCloudLab/frostfs-node
[#218] blobstor: Implement blobovnicza tree
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
3acbd3011b
commit
4c3d231e82
2 changed files with 776 additions and 0 deletions
642
pkg/local_object_storage/blobstor/blobovnicza.go
Normal file
642
pkg/local_object_storage/blobstor/blobovnicza.go
Normal file
|
@ -0,0 +1,642 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/nspcc-dev/hrw"
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// represents storage of the "small" objects.
|
||||
//
|
||||
// Each object is stored in Blobovnicza's (B-s).
|
||||
// B-s are structured in a multilevel directory hierarchy
|
||||
// with fixed depth and width (configured by BlobStor).
|
||||
//
|
||||
// Example (width = 4, depth = 3):
|
||||
//
|
||||
// x===============================x
|
||||
// |[0] [1] [2] [3]|
|
||||
// | \ / |
|
||||
// | \ / |
|
||||
// | \ / |
|
||||
// | \ / |
|
||||
// |[0] [1] [2] [3]|
|
||||
// | | / |
|
||||
// | | / |
|
||||
// | | / |
|
||||
// | | / |
|
||||
// |[0](F) [1](A) [X] [X]|
|
||||
// x===============================x
|
||||
//
|
||||
// Elements of the deepest level are B-s.
|
||||
// B-s are allocated dynamically. At each moment of the time there is
|
||||
// an active B (ex. A), set of already filled B-s (ex. F) and
|
||||
// list of not yet initialized B-s (ex. X). After filling the active B
|
||||
// it becomes full, and next B becomes initialized and active.
|
||||
//
|
||||
// Active B and some of the full B-s are cached (LRU). All cached
|
||||
// B-s are intitialized and opened.
|
||||
//
|
||||
// Object is saved as follows:
|
||||
// 1. at each level, according to HRW, the next one is selected and
|
||||
// dives into it until we reach the deepest;
|
||||
// 2. at the B-s level object is saved to the active B. If active B
|
||||
// is full, next B is opened, initialized and cached. If there
|
||||
// is no more X candidates, goto 1 and process next level.
|
||||
//
|
||||
// After the object is saved in B, path concatenation is returned
|
||||
// in system path format as B identifier (ex. "0/1/1" or "3/2/1").
|
||||
type blobovniczas struct {
|
||||
*cfg
|
||||
|
||||
// cache of opened filled blobovniczas
|
||||
opened *lru.Cache
|
||||
|
||||
// list of active (opened, non-filled) blobovniczas
|
||||
activeMtx sync.RWMutex
|
||||
active map[string]blobovniczaWithIndex
|
||||
}
|
||||
|
||||
type blobovniczaWithIndex struct {
|
||||
ind uint64
|
||||
|
||||
blz *blobovnicza.Blobovnicza
|
||||
}
|
||||
|
||||
var errPutFailed = errors.New("could not save the object in any blobovnicza")
|
||||
|
||||
func newBlobovniczaTree(c *cfg) (blz *blobovniczas) {
|
||||
cache, err := lru.NewWithEvict(c.openedCacheSize, func(key interface{}, value interface{}) {
|
||||
blz.activeMtx.RLock()
|
||||
defer blz.activeMtx.RUnlock()
|
||||
|
||||
if _, ok := blz.active[path.Dir(key.(string))]; ok {
|
||||
return
|
||||
} else if err := value.(*blobovnicza.Blobovnicza).Close(); err != nil {
|
||||
c.log.Error("could not close Blobovnicza",
|
||||
zap.String("id", key.(string)),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
} else {
|
||||
c.log.Debug("blobovnicza succesfully closed on evict",
|
||||
zap.String("id", key.(string)),
|
||||
)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
// occurs only if the size is not positive
|
||||
panic(errors.Wrapf(err, "could not create LRU cache of size %d", c.openedCacheSize))
|
||||
}
|
||||
|
||||
cp := uint64(1)
|
||||
for i := uint64(0); i < c.blzShallowDepth; i++ {
|
||||
cp *= c.blzShallowWidth
|
||||
}
|
||||
|
||||
return &blobovniczas{
|
||||
cfg: c,
|
||||
opened: cache,
|
||||
active: make(map[string]blobovniczaWithIndex, cp),
|
||||
}
|
||||
}
|
||||
|
||||
// makes slice of uint64 values from 0 to number-1.
|
||||
func indexSlice(number uint64) []uint64 {
|
||||
s := make([]uint64, number)
|
||||
|
||||
for i := range s {
|
||||
s[i] = uint64(i)
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// save object in the maximum weight blobobnicza.
|
||||
//
|
||||
// returns error if could not save object in any blobovnicza.
|
||||
func (b *blobovniczas) put(addr *objectSDK.Address, data []byte) (*blobovnicza.ID, error) {
|
||||
prm := new(blobovnicza.PutPrm)
|
||||
prm.SetAddress(addr)
|
||||
prm.SetMarshaledObject(data)
|
||||
|
||||
var (
|
||||
fn func(string) (bool, error)
|
||||
id *blobovnicza.ID
|
||||
)
|
||||
|
||||
fn = func(p string) (bool, error) {
|
||||
active, err := b.getActivated(p)
|
||||
if err != nil {
|
||||
b.log.Debug("could not get active blobovnicza",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if _, err := active.blz.Put(prm); err != nil {
|
||||
// check if blobovnicza is full
|
||||
if errors.Is(err, blobovnicza.ErrFull) {
|
||||
b.log.Debug("blobovnicza overflowed",
|
||||
zap.String("path", path.Join(p, u64ToHexString(active.ind))),
|
||||
)
|
||||
|
||||
if err := b.updateActive(p, &active.ind); err != nil {
|
||||
b.log.Debug("could not update active blobovnicza",
|
||||
zap.String("level", p),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return fn(p)
|
||||
}
|
||||
|
||||
b.log.Debug("could not put object to active blobovnicza",
|
||||
zap.String("path", path.Join(p, u64ToHexString(active.ind))),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
p = path.Join(p, u64ToHexString(active.ind))
|
||||
|
||||
id = blobovnicza.NewIDFromBytes([]byte(p))
|
||||
|
||||
b.log.Debug("object successfully saved in active blobovnicza",
|
||||
zap.String("path", p),
|
||||
zap.Stringer("addr", addr),
|
||||
)
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if err := b.iterateDeepest(addr, fn); err != nil {
|
||||
return nil, err
|
||||
} else if id == nil {
|
||||
return nil, errPutFailed
|
||||
}
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// reads object from blobovnicza tree.
|
||||
//
|
||||
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
||||
// Otherwise, all blobovniczas are processed descending weight.
|
||||
func (b *blobovniczas) get(prm *GetSmallPrm) (res *GetSmallRes, err error) {
|
||||
bPrm := new(blobovnicza.GetPrm)
|
||||
bPrm.SetAddress(prm.addr)
|
||||
|
||||
if prm.blobovniczaID != nil {
|
||||
blz, err := b.openBlobovnicza(prm.blobovniczaID.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return b.getObject(blz, bPrm)
|
||||
}
|
||||
|
||||
activeCache := make(map[string]struct{})
|
||||
|
||||
err = b.iterateSortedLeaves(prm.addr, func(p string) (bool, error) {
|
||||
dirPath := path.Dir(p)
|
||||
|
||||
_, ok := activeCache[dirPath]
|
||||
|
||||
res, err = b.getObjectFromLevel(bPrm, p, !ok)
|
||||
if err != nil {
|
||||
if !errors.Is(err, ErrObjectNotFound) {
|
||||
b.log.Debug("could not get object from level",
|
||||
zap.String("level", p),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
activeCache[dirPath] = struct{}{}
|
||||
|
||||
// abort iterator if found, otherwise process all blobovniczas
|
||||
return err == nil, nil
|
||||
})
|
||||
|
||||
if err == nil && res == nil {
|
||||
// not found in any blobovnicza
|
||||
err = ErrObjectNotFound
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// removes object from blobovnicza tree.
|
||||
//
|
||||
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
||||
// Otherwise, all blobovniczas are processed descending weight.
|
||||
//
|
||||
// TODO:quite similar to GET, can be unified
|
||||
func (b *blobovniczas) delete(prm *DeleteSmallPrm) (res *DeleteSmallRes, err error) {
|
||||
bPrm := new(blobovnicza.DeletePrm)
|
||||
bPrm.SetAddress(prm.addr)
|
||||
|
||||
if prm.blobovniczaID != nil {
|
||||
blz, err := b.openBlobovnicza(prm.blobovniczaID.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return b.deleteObject(blz, bPrm)
|
||||
}
|
||||
|
||||
activeCache := make(map[string]struct{})
|
||||
|
||||
err = b.iterateSortedLeaves(prm.addr, func(p string) (bool, error) {
|
||||
dirPath := path.Dir(p)
|
||||
|
||||
// don't process active blobovnicza of the level twice
|
||||
_, ok := activeCache[dirPath]
|
||||
|
||||
res, err = b.deleteObjectFromLevel(bPrm, p, !ok)
|
||||
if err != nil {
|
||||
if !errors.Is(err, ErrObjectNotFound) {
|
||||
b.log.Debug("could not remove object from level",
|
||||
zap.String("level", p),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
activeCache[dirPath] = struct{}{}
|
||||
|
||||
if err == nil {
|
||||
res = new(DeleteSmallRes)
|
||||
}
|
||||
|
||||
// abort iterator if found, otherwise process all blobovniczas
|
||||
return err == nil, nil
|
||||
})
|
||||
|
||||
if err == nil && res == nil {
|
||||
// not found in any blobovnicza
|
||||
err = ErrObjectNotFound
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// tries to delete object from particular blobovnicza.
|
||||
//
|
||||
// returns no error if object was removed from some blobovnicza of the same level.
|
||||
func (b *blobovniczas) deleteObjectFromLevel(prm *blobovnicza.DeletePrm, blzPath string, tryActive bool) (*DeleteSmallRes, error) {
|
||||
lvlPath := path.Dir(blzPath)
|
||||
|
||||
log := b.log.With(
|
||||
zap.String("path", blzPath),
|
||||
)
|
||||
|
||||
// try to remove from blobovnicza if it is opened
|
||||
v, ok := b.opened.Get(blzPath)
|
||||
if ok {
|
||||
if res, err := b.deleteObject(v.(*blobovnicza.Blobovnicza), prm); err == nil {
|
||||
return res, err
|
||||
} else if !errors.Is(err, ErrObjectNotFound) {
|
||||
log.Debug("could not remove object from opened blobovnicza",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// therefore the object is possibly placed in a lighter blobovnicza
|
||||
|
||||
// next we check in the active level blobobnicza:
|
||||
// * the active blobovnicza is always opened.
|
||||
b.activeMtx.RLock()
|
||||
active, ok := b.active[lvlPath]
|
||||
b.activeMtx.RUnlock()
|
||||
|
||||
if ok && tryActive {
|
||||
if res, err := b.deleteObject(active.blz, prm); err == nil {
|
||||
return res, err
|
||||
} else if !errors.Is(err, ErrObjectNotFound) {
|
||||
log.Debug("could not remove object from active blobovnicza",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// then object is possibly placed in closed blobovnicza
|
||||
|
||||
// check if it makes sense to try to open the blob
|
||||
// (blobovniczas "after" the active one are empty anyway,
|
||||
// and it's pointless to open them).
|
||||
if u64FromHexString(path.Base(blzPath)) > active.ind {
|
||||
log.Debug("index is too big")
|
||||
return nil, ErrObjectNotFound
|
||||
}
|
||||
|
||||
// open blobovnicza (cached inside)
|
||||
blz, err := b.openBlobovnicza(blzPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return b.deleteObject(blz, prm)
|
||||
}
|
||||
|
||||
// tries to read object from particular blobovnicza.
|
||||
//
|
||||
// returns error if object could not be read from any blobovnicza of the same level.
|
||||
func (b *blobovniczas) getObjectFromLevel(prm *blobovnicza.GetPrm, blzPath string, tryActive bool) (*GetSmallRes, error) {
|
||||
lvlPath := path.Dir(blzPath)
|
||||
|
||||
log := b.log.With(
|
||||
zap.String("path", blzPath),
|
||||
)
|
||||
|
||||
// try to read from blobovnicza if it is opened
|
||||
v, ok := b.opened.Get(blzPath)
|
||||
if ok {
|
||||
if res, err := b.getObject(v.(*blobovnicza.Blobovnicza), prm); err == nil {
|
||||
return res, err
|
||||
} else if !errors.Is(err, ErrObjectNotFound) {
|
||||
log.Debug("could not read object from opened blobovnicza",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// therefore the object is possibly placed in a lighter blobovnicza
|
||||
|
||||
// next we check in the active level blobobnicza:
|
||||
// * the freshest objects are probably the most demanded;
|
||||
// * the active blobovnicza is always opened.
|
||||
b.activeMtx.RLock()
|
||||
active, ok := b.active[lvlPath]
|
||||
b.activeMtx.RUnlock()
|
||||
|
||||
if ok && tryActive {
|
||||
if res, err := b.getObject(active.blz, prm); err == nil {
|
||||
return res, err
|
||||
} else if !errors.Is(err, ErrObjectNotFound) {
|
||||
log.Debug("could not get object from active blobovnicza",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// then object is possibly placed in closed blobovnicza
|
||||
|
||||
// check if it makes sense to try to open the blob
|
||||
// (blobovniczas "after" the active one are empty anyway,
|
||||
// and it's pointless to open them).
|
||||
if u64FromHexString(path.Base(blzPath)) > active.ind {
|
||||
log.Debug("index is too big")
|
||||
return nil, ErrObjectNotFound
|
||||
}
|
||||
|
||||
// open blobovnicza (cached inside)
|
||||
blz, err := b.openBlobovnicza(blzPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return b.getObject(blz, prm)
|
||||
}
|
||||
|
||||
// removes object from blobovnicza and returns DeleteSmallRes.
|
||||
func (b *blobovniczas) deleteObject(blz *blobovnicza.Blobovnicza, prm *blobovnicza.DeletePrm) (*DeleteSmallRes, error) {
|
||||
_, err := blz.Delete(prm)
|
||||
if err != nil {
|
||||
if errors.Is(err, blobovnicza.ErrObjectNotFound) {
|
||||
err = ErrObjectNotFound
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return new(DeleteSmallRes), nil
|
||||
}
|
||||
|
||||
// reads object from blobovnicza and returns GetSmallRes.
|
||||
func (b *blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm *blobovnicza.GetPrm) (*GetSmallRes, error) {
|
||||
res, err := blz.Get(prm)
|
||||
if err != nil {
|
||||
if errors.Is(err, blobovnicza.ErrObjectNotFound) {
|
||||
err = ErrObjectNotFound
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &GetSmallRes{
|
||||
roObject: roObject{
|
||||
obj: res.Object(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// iterator over the paths of blobovniczas in random order.
|
||||
func (b *blobovniczas) iterateLeaves(f func(string) (bool, error)) error {
|
||||
return b.iterateSortedLeaves(nil, f)
|
||||
}
|
||||
|
||||
// iterator over the paths of blobovniczas sorted by weight.
|
||||
func (b *blobovniczas) iterateSortedLeaves(addr *objectSDK.Address, f func(string) (bool, error)) error {
|
||||
_, err := b.iterateSorted(
|
||||
addr,
|
||||
make([]string, 0, b.blzShallowDepth),
|
||||
b.blzShallowDepth,
|
||||
func(p []string) (bool, error) { return f(path.Join(p...)) },
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// iterator over directories with blobovniczas sorted by weight.
|
||||
func (b *blobovniczas) iterateDeepest(addr *objectSDK.Address, f func(string) (bool, error)) error {
|
||||
_, err := b.iterateSorted(
|
||||
addr,
|
||||
make([]string, 0, b.blzShallowDepth-1),
|
||||
b.blzShallowDepth-1,
|
||||
func(p []string) (bool, error) { return f(path.Join(p...)) },
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// iterator over particular level of directories.
|
||||
func (b *blobovniczas) iterateSorted(addr *objectSDK.Address, curPath []string, execDepth uint64, f func([]string) (bool, error)) (bool, error) {
|
||||
indices := indexSlice(b.blzShallowWidth)
|
||||
|
||||
hrw.SortSliceByValue(indices, addressHash(addr, path.Join(curPath...)))
|
||||
|
||||
exec := uint64(len(curPath)) == execDepth
|
||||
|
||||
for i := range indices {
|
||||
if i == 0 {
|
||||
curPath = append(curPath, u64ToHexString(indices[i]))
|
||||
} else {
|
||||
curPath[len(curPath)-1] = u64ToHexString(indices[i])
|
||||
}
|
||||
|
||||
if exec {
|
||||
if stop, err := f(curPath); err != nil {
|
||||
return false, err
|
||||
} else if stop {
|
||||
return true, nil
|
||||
}
|
||||
} else if stop, err := b.iterateSorted(addr, curPath, execDepth, f); err != nil {
|
||||
return false, err
|
||||
} else if stop {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// activates and returns activated blobovnicza of p-level (dir).
|
||||
//
|
||||
// returns error if blobvnicza could not be activated.
|
||||
func (b *blobovniczas) getActivated(p string) (blobovniczaWithIndex, error) {
|
||||
b.activeMtx.Lock()
|
||||
defer b.activeMtx.Unlock()
|
||||
|
||||
// try again
|
||||
return b.updateAndGet(p, nil)
|
||||
}
|
||||
|
||||
// updates active blobovnicza of p-level (dir).
|
||||
//
|
||||
// if current active blobovnicza's index is not old, it remains unchanged.
|
||||
func (b *blobovniczas) updateActive(p string, old *uint64) error {
|
||||
log := b.log.With(zap.String("path", p))
|
||||
|
||||
log.Debug("updating active blobovnicza...")
|
||||
|
||||
b.activeMtx.Lock()
|
||||
_, err := b.updateAndGet(p, old)
|
||||
b.activeMtx.Unlock()
|
||||
|
||||
log.Debug("active blobovnicza successfully updated")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// updates and returns active blobovnicza of p-level (dir).
|
||||
//
|
||||
// if current active blobovnicza's index is not old, it is returned unchanged.
|
||||
func (b *blobovniczas) updateAndGet(p string, old *uint64) (blobovniczaWithIndex, error) {
|
||||
active, ok := b.active[p]
|
||||
if ok {
|
||||
if old != nil {
|
||||
if active.ind == b.blzShallowWidth-1 {
|
||||
return active, errors.New("no more blobovniczas")
|
||||
} else if active.ind != *old {
|
||||
// sort of CAS in order to control concurrent
|
||||
// updateActive calls
|
||||
return active, nil
|
||||
}
|
||||
} else {
|
||||
return active, nil
|
||||
}
|
||||
|
||||
active.ind++
|
||||
}
|
||||
|
||||
var err error
|
||||
if active.blz, err = b.openBlobovnicza(path.Join(p, u64ToHexString(active.ind))); err != nil {
|
||||
return active, err
|
||||
}
|
||||
|
||||
// remove from opened cache (active blobovnicza should always be opened)
|
||||
b.opened.Remove(p)
|
||||
b.active[p] = active
|
||||
|
||||
b.log.Debug("blobovnicza succesfully activated",
|
||||
zap.String("path", path.Join(p, u64ToHexString(active.ind))),
|
||||
)
|
||||
|
||||
return active, nil
|
||||
}
|
||||
|
||||
// initializes blobovnicza tree.
|
||||
//
|
||||
// Should be called exactly once.
|
||||
func (b *blobovniczas) init() error {
|
||||
b.log.Debug("initializing Blobovnicza's")
|
||||
|
||||
return b.iterateLeaves(func(p string) (bool, error) {
|
||||
blz, err := b.openBlobovnicza(p)
|
||||
if err != nil {
|
||||
return false, errors.Wrapf(err, "could not open blobovnicza %s", p)
|
||||
} else if err := blz.Init(); err != nil {
|
||||
return false, errors.Wrapf(err, "could not initialize blobovnicza structure %s", p)
|
||||
}
|
||||
|
||||
log := b.log.With(zap.String("id", p))
|
||||
|
||||
log.Debug("blobovnicza successfully initialized, closing...")
|
||||
|
||||
return false, nil
|
||||
})
|
||||
}
|
||||
|
||||
// opens and returns blobovnicza with path p.
|
||||
//
|
||||
// If blobovnicza is already opened and cached, instance from cache is returned w/o changes.
|
||||
func (b *blobovniczas) openBlobovnicza(p string) (*blobovnicza.Blobovnicza, error) {
|
||||
v, ok := b.opened.Get(p)
|
||||
if ok {
|
||||
// blobovnicza should be opened in cache
|
||||
return v.(*blobovnicza.Blobovnicza), nil
|
||||
}
|
||||
|
||||
blz := blobovnicza.New(append(b.blzOpts,
|
||||
blobovnicza.WithPath(path.Join(b.blzRootPath, p)),
|
||||
)...)
|
||||
|
||||
if err := blz.Open(); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not open blobovnicza %s", p)
|
||||
}
|
||||
|
||||
b.opened.Add(p, blz)
|
||||
|
||||
return blz, nil
|
||||
}
|
||||
|
||||
// returns hash of the object address.
|
||||
func addressHash(addr *objectSDK.Address, path string) uint64 {
|
||||
var a string
|
||||
|
||||
if addr != nil {
|
||||
a = addr.String()
|
||||
}
|
||||
|
||||
return hrw.Hash([]byte(a + path))
|
||||
}
|
||||
|
||||
// converts uint64 to hex string.
|
||||
func u64ToHexString(ind uint64) string {
|
||||
return strconv.FormatUint(ind, 16)
|
||||
}
|
||||
|
||||
// converts uint64 hex string to uint64.
|
||||
func u64FromHexString(str string) uint64 {
|
||||
v, err := strconv.ParseUint(str, 16, 64)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("blobovnicza name is not an index %s", str))
|
||||
}
|
||||
|
||||
return v
|
||||
|
||||
}
|
134
pkg/local_object_storage/blobstor/blobovnicza_test.go
Normal file
134
pkg/local_object_storage/blobstor/blobovnicza_test.go
Normal file
|
@ -0,0 +1,134 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func testSHA256() (h [sha256.Size]byte) {
|
||||
rand.Read(h[:])
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
func testAddress() *objectSDK.Address {
|
||||
cid := container.NewID()
|
||||
cid.SetSHA256(testSHA256())
|
||||
|
||||
oid := objectSDK.NewID()
|
||||
oid.SetSHA256(testSHA256())
|
||||
|
||||
addr := objectSDK.NewAddress()
|
||||
addr.SetObjectID(oid)
|
||||
addr.SetContainerID(cid)
|
||||
|
||||
return addr
|
||||
}
|
||||
|
||||
func testObject(sz uint64) *object.Object {
|
||||
raw := object.NewRaw()
|
||||
|
||||
addr := testAddress()
|
||||
raw.SetID(addr.ObjectID())
|
||||
raw.SetContainerID(addr.ContainerID())
|
||||
|
||||
raw.SetPayload(make([]byte, sz))
|
||||
|
||||
// fit the binary size to the required
|
||||
data, _ := raw.Marshal()
|
||||
if ln := uint64(len(data)); ln > sz {
|
||||
raw.SetPayload(raw.Payload()[:sz-(ln-sz)])
|
||||
}
|
||||
|
||||
raw.SetAttributes() // for require.Equal
|
||||
|
||||
return raw.Object()
|
||||
}
|
||||
|
||||
func TestBlobovniczas(t *testing.T) {
|
||||
l := test.NewLogger(false)
|
||||
p := "./test_blz"
|
||||
|
||||
c := defaultCfg()
|
||||
|
||||
var width, depth, szLim uint64 = 2, 2, 2 << 10
|
||||
|
||||
for _, opt := range []Option{
|
||||
WithLogger(l),
|
||||
WithSmallSizeLimit(szLim),
|
||||
WithBlobovniczaShallowWidth(width),
|
||||
WithBlobovniczaShallowDepth(depth),
|
||||
WithBlobovniczaRootPath(p),
|
||||
WithBlobovniczaSize(szLim),
|
||||
} {
|
||||
opt(c)
|
||||
}
|
||||
|
||||
c.blzRootPath = p
|
||||
|
||||
b := newBlobovniczaTree(c)
|
||||
|
||||
defer os.RemoveAll(p)
|
||||
|
||||
require.NoError(t, b.init())
|
||||
|
||||
objSz := uint64(szLim / 2)
|
||||
|
||||
addrList := make([]*objectSDK.Address, 0)
|
||||
minFitObjNum := 2 * 2 * szLim / objSz
|
||||
|
||||
for i := uint64(0); i < minFitObjNum; i++ {
|
||||
obj := testObject(objSz)
|
||||
addrList = append(addrList, obj.Address())
|
||||
|
||||
d, err := obj.Marshal()
|
||||
require.NoError(t, err)
|
||||
|
||||
// save object in blobovnicza
|
||||
id, err := b.put(obj.Address(), d)
|
||||
require.NoError(t, err)
|
||||
|
||||
// get w/ blobovnicza ID
|
||||
prm := new(GetSmallPrm)
|
||||
prm.SetBlobovniczaID(id)
|
||||
prm.SetAddress(obj.Address())
|
||||
|
||||
res, err := b.get(prm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, obj, res.Object())
|
||||
|
||||
// get w/o blobovnicza ID
|
||||
prm.SetBlobovniczaID(nil)
|
||||
|
||||
res, err = b.get(prm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, obj, res.Object())
|
||||
}
|
||||
|
||||
dPrm := new(DeleteSmallPrm)
|
||||
gPrm := new(GetSmallPrm)
|
||||
|
||||
for i := range addrList {
|
||||
dPrm.SetAddress(addrList[i])
|
||||
|
||||
_, err := b.delete(dPrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
gPrm.SetAddress(addrList[i])
|
||||
|
||||
_, err = b.get(gPrm)
|
||||
require.True(t, errors.Is(err, ErrObjectNotFound))
|
||||
|
||||
_, err = b.delete(dPrm)
|
||||
require.True(t, errors.Is(err, ErrObjectNotFound))
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue