[#136] localstorage: Make local storage to use new metabase

Replace meta Bucket with meta.DB instance in local storage implementation.
Adopt all dependent components to new local storage.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-10-29 20:16:49 +03:00 committed by Alex Vanin
parent a61f8d44d1
commit c0aa892161
10 changed files with 42 additions and 500 deletions

View file

@ -1,284 +0,0 @@
package localstore
import (
"context"
"math"
"sort"
"sync"
"github.com/pkg/errors"
)
type (
// FilterCode is an enumeration of filter return codes.
FilterCode int
// PriorityFlag is an enumeration of priority flags.
PriorityFlag int
filterPipelineSet []FilterPipeline
// FilterFunc is a function that checks whether an ObjectMeta matches a specific criterion.
FilterFunc func(ctx context.Context, meta *ObjectMeta) *FilterResult
// FilterResult groups of ObjectMeta filter result values.
FilterResult struct {
c FilterCode
e error
}
// FilterPipeline is an interface of ObjectMeta filtering tool with sub-filters and priorities.
FilterPipeline interface {
Pass(ctx context.Context, meta *ObjectMeta) *FilterResult
PutSubFilter(params SubFilterParams) error
GetPriority() uint64
SetPriority(uint64)
GetName() string
}
// FilterParams groups the parameters of FilterPipeline constructor.
FilterParams struct {
Name string
Priority uint64
FilterFunc FilterFunc
}
// SubFilterParams groups the parameters of sub-filter registration.
SubFilterParams struct {
PriorityFlag
FilterPipeline
OnIgnore FilterCode
OnPass FilterCode
OnFail FilterCode
}
filterPipeline struct {
*sync.RWMutex
name string
pri uint64
filterFn FilterFunc
maxSubPri uint64
mSubResult map[string]map[FilterCode]FilterCode
subFilters []FilterPipeline
}
)
const (
// PriorityValue is a PriorityFlag of the sub-filter registration with GetPriority() value.
PriorityValue PriorityFlag = iota
// PriorityMax is a PriorityFlag of the sub-filter registration with maximum priority.
PriorityMax
// PriorityMin is a PriorityFlag of the sub-filter registration with minimum priority.
PriorityMin
)
const (
// CodeUndefined is a undefined FilterCode.
CodeUndefined FilterCode = iota
// CodePass is a FilterCode of filter passage.
CodePass
// CodeFail is a FilterCode of filter failure.
CodeFail
// CodeIgnore is a FilterCode of filter ignoring.
CodeIgnore
)
var (
rPass = &FilterResult{
c: CodePass,
}
rFail = &FilterResult{
c: CodeFail,
}
rIgnore = &FilterResult{
c: CodeIgnore,
}
rUndefined = &FilterResult{
c: CodeUndefined,
}
)
// ResultPass returns the FilterResult with CodePass code and nil error.
func ResultPass() *FilterResult {
return rPass
}
// ResultFail returns the FilterResult with CodeFail code and nil error.
func ResultFail() *FilterResult {
return rFail
}
// ResultIgnore returns the FilterResult with CodeIgnore code and nil error.
func ResultIgnore() *FilterResult {
return rIgnore
}
// ResultUndefined returns the FilterResult with CodeUndefined code and nil error.
func ResultUndefined() *FilterResult {
return rUndefined
}
// ResultWithError returns the FilterResult with passed code and error.
func ResultWithError(c FilterCode, e error) *FilterResult {
return &FilterResult{
e: e,
c: c,
}
}
// Code returns the filter result code.
func (s *FilterResult) Code() FilterCode {
return s.c
}
// Err returns the filter result error.
func (s *FilterResult) Err() error {
return s.e
}
func (f filterPipelineSet) Len() int { return len(f) }
func (f filterPipelineSet) Less(i, j int) bool { return f[i].GetPriority() > f[j].GetPriority() }
func (f filterPipelineSet) Swap(i, j int) { f[i], f[j] = f[j], f[i] }
func (r FilterCode) String() string {
switch r {
case CodePass:
return "PASSED"
case CodeFail:
return "FAILED"
case CodeIgnore:
return "IGNORED"
default:
return "UNDEFINED"
}
}
// NewFilter is a FilterPipeline constructor.
func NewFilter(p *FilterParams) FilterPipeline {
return &filterPipeline{
RWMutex: new(sync.RWMutex),
name: p.Name,
pri: p.Priority,
filterFn: p.FilterFunc,
mSubResult: make(map[string]map[FilterCode]FilterCode),
}
}
func (p *filterPipeline) Pass(ctx context.Context, meta *ObjectMeta) *FilterResult {
p.RLock()
defer p.RUnlock()
for i := range p.subFilters {
subResult := p.subFilters[i].Pass(ctx, meta)
subName := p.subFilters[i].GetName()
cSub := subResult.Code()
if cSub <= CodeUndefined {
return ResultUndefined()
}
if cFin := p.mSubResult[subName][cSub]; cFin != CodeIgnore {
return ResultWithError(cFin, subResult.Err())
}
}
if p.filterFn == nil {
return ResultUndefined()
}
return p.filterFn(ctx, meta)
}
func (p *filterPipeline) PutSubFilter(params SubFilterParams) error {
p.Lock()
defer p.Unlock()
if params.FilterPipeline == nil {
return errors.New("could not put sub filter: empty filter pipeline")
}
name := params.FilterPipeline.GetName()
if _, ok := p.mSubResult[name]; ok {
return errors.Errorf("filter %s is already in pipeline %s", name, p.GetName())
}
if params.PriorityFlag != PriorityMin {
if pri := params.FilterPipeline.GetPriority(); pri < math.MaxUint64 {
params.FilterPipeline.SetPriority(pri + 1)
}
} else {
params.FilterPipeline.SetPriority(0)
}
switch pri := params.FilterPipeline.GetPriority(); params.PriorityFlag {
case PriorityMax:
if p.maxSubPri < math.MaxUint64 {
p.maxSubPri++
}
params.FilterPipeline.SetPriority(p.maxSubPri)
case PriorityValue:
if pri > p.maxSubPri {
p.maxSubPri = pri
}
}
if params.OnFail <= 0 {
params.OnFail = CodeIgnore
}
if params.OnIgnore <= 0 {
params.OnIgnore = CodeIgnore
}
if params.OnPass <= 0 {
params.OnPass = CodeIgnore
}
p.mSubResult[name] = map[FilterCode]FilterCode{
CodePass: params.OnPass,
CodeIgnore: params.OnIgnore,
CodeFail: params.OnFail,
}
p.subFilters = append(p.subFilters, params.FilterPipeline)
sort.Sort(filterPipelineSet(p.subFilters))
return nil
}
func (p *filterPipeline) GetPriority() uint64 {
p.RLock()
defer p.RUnlock()
return p.pri
}
func (p *filterPipeline) SetPriority(pri uint64) {
p.Lock()
p.pri = pri
p.Unlock()
}
func (p *filterPipeline) GetName() string {
p.RLock()
defer p.RUnlock()
if p.name == "" {
return "FILTER_UNNAMED"
}
return p.name
}

View file

@ -1,32 +0,0 @@
package localstore
import (
"errors"
"testing"
"github.com/stretchr/testify/require"
)
func TestFilterResult(t *testing.T) {
var (
r *FilterResult
c = CodePass
e = errors.New("test error")
)
r = ResultPass()
require.Equal(t, CodePass, r.Code())
require.NoError(t, r.Err())
r = ResultFail()
require.Equal(t, CodeFail, r.Code())
require.NoError(t, r.Err())
r = ResultIgnore()
require.Equal(t, CodeIgnore, r.Code())
require.NoError(t, r.Err())
r = ResultWithError(c, e)
require.Equal(t, c, r.Code())
require.EqualError(t, r.Err(), e.Error())
}

View file

@ -1,82 +0,0 @@
package localstore
import (
"encoding/binary"
"io"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/pkg/errors"
)
// ObjectMeta represents meta information about
// the object that is stored in meta storage.
type ObjectMeta struct {
head *object.Object
savedAtEpoch uint64
}
// SavedAtEpoch returns the number of epoch
// at which the object was saved locally.
func (m *ObjectMeta) SavedAtEpoch() uint64 {
if m != nil {
return m.savedAtEpoch
}
return 0
}
// Head returns the object w/o payload.
func (m *ObjectMeta) Head() *object.Object {
if m != nil {
return m.head
}
return nil
}
// AddressFromMeta extracts the Address from object meta.
func AddressFromMeta(m *ObjectMeta) *objectSDK.Address {
return m.Head().Address()
}
func metaFromObject(o *object.Object) *ObjectMeta {
// FIXME: remove hard-code
meta := new(ObjectMeta)
meta.savedAtEpoch = 10
meta.head = object.NewRawFromObject(o).CutPayload().Object()
return meta
}
func metaToBytes(m *ObjectMeta) ([]byte, error) {
data := make([]byte, 8)
binary.BigEndian.PutUint64(data, m.savedAtEpoch)
objBytes, err := objectBytes(m.head)
if err != nil {
return nil, err
}
return append(data, objBytes...), nil
}
func metaFromBytes(data []byte) (*ObjectMeta, error) {
if len(data) < 8 {
return nil, io.ErrUnexpectedEOF
}
obj, err := object.FromBytes(data[8:])
if err != nil {
return nil, errors.Wrap(err, "could not get object address from bytes")
}
meta := new(ObjectMeta)
meta.savedAtEpoch = binary.BigEndian.Uint64(data)
meta.head = obj
return meta, nil
}

View file

@ -1,8 +1,6 @@
package localstore
import (
"context"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/pkg/errors"
@ -28,16 +26,11 @@ func (s *Storage) Put(obj *object.Object) error {
return errors.Wrap(err, "could not marshal the object")
}
metaBytes, err := metaToBytes(metaFromObject(obj))
if err != nil {
return errors.Wrap(err, "could not marshal object meta")
}
if err := s.blobBucket.Set(addrBytes, objBytes); err != nil {
return errors.Wrap(err, "could no save object in BLOB storage")
}
if err := s.metaBucket.Set(addrBytes, metaBytes); err != nil {
if err := s.metaBase.Put(object.NewRawFromObject(obj).CutPayload().Object()); err != nil {
return errors.Wrap(err, "could not save object in meta storage")
}
@ -56,7 +49,7 @@ func (s *Storage) Delete(addr *objectSDK.Address) error {
)
}
if err := s.metaBucket.Del(addrBytes); err != nil {
if err := s.metaBase.Delete(addr); err != nil {
return errors.Wrap(err, "could not remove object from meta storage")
}
@ -77,40 +70,10 @@ func (s *Storage) Get(addr *objectSDK.Address) (*object.Object, error) {
return object.FromBytes(objBytes)
}
func (s *Storage) Head(addr *objectSDK.Address) (*ObjectMeta, error) {
addrBytes, err := addressBytes(addr)
if err != nil {
return nil, errors.Wrap(err, "could not marshal object address")
func (s *Storage) Head(addr *objectSDK.Address) (*object.Object, error) {
return s.metaBase.Get(addr)
}
metaBytes, err := s.metaBucket.Get(addrBytes)
if err != nil {
return nil, errors.Wrap(err, "could not get object from meta storage")
}
return metaFromBytes(metaBytes)
}
func (s *Storage) Iterate(filter FilterPipeline, handler func(*ObjectMeta) bool) error {
if filter == nil {
filter = NewFilter(&FilterParams{
Name: "SKIPPING_FILTER",
FilterFunc: func(context.Context, *ObjectMeta) *FilterResult {
return ResultPass()
},
})
}
return s.metaBucket.Iterate(func(_, v []byte) bool {
meta, err := metaFromBytes(v)
if err != nil {
s.log.Error("unmarshal meta bucket item failure",
zap.Error(err),
)
} else if filter.Pass(context.TODO(), meta).Code() == CodePass {
return !handler(meta)
}
return true
})
func (s *Storage) Select(fs objectSDK.SearchFilters) ([]*objectSDK.Address, error) {
return s.metaBase.Select(fs)
}

View file

@ -2,6 +2,7 @@ package localstore
import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
)
@ -10,7 +11,7 @@ import (
type Storage struct {
log *logger.Logger
metaBucket bucket.Bucket
metaBase *meta.DB
blobBucket bucket.Bucket
}
@ -29,7 +30,7 @@ func defaultCfg() *cfg {
}
// New is a local object storage constructor.
func New(blob, meta bucket.Bucket, opts ...Option) *Storage {
func New(blob bucket.Bucket, meta *meta.DB, opts ...Option) *Storage {
cfg := defaultCfg()
for i := range opts {
@ -37,9 +38,9 @@ func New(blob, meta bucket.Bucket, opts ...Option) *Storage {
}
return &Storage{
metaBucket: meta,
blobBucket: blob,
log: cfg.logger,
metaBase: meta,
blobBucket: blob,
}
}

View file

@ -23,6 +23,10 @@ func NewDB(boltDB *bbolt.DB) *DB {
}
}
func (db *DB) Close() error {
return db.boltDB.Close()
}
func stringEqualMatcher(key, objVal, filterVal string) bool {
switch key {
default:

View file

@ -17,10 +17,5 @@ func (s *localStorage) Head(addr *objectSDK.Address) (*object.Object, error) {
return nil, io.ErrUnexpectedEOF
}
meta, err := s.ls.Head(addr)
if err != nil {
return nil, err
}
return meta.Head(), nil
return s.ls.Head(addr)
}

View file

@ -13,12 +13,12 @@ type localHeader struct {
}
func (h *localHeader) head(ctx context.Context, prm *Prm, handler func(*object.Object)) error {
m, err := h.storage.Head(prm.addr)
head, err := h.storage.Head(prm.addr)
if err != nil {
return errors.Wrapf(err, "(%T) could not get header from local storage", h)
}
handler(m.Head())
handler(head)
return nil
}

View file

@ -5,7 +5,6 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
"github.com/nspcc-dev/neofs-node/pkg/services/object/search/query"
"github.com/pkg/errors"
@ -19,47 +18,25 @@ type localStream struct {
cid *container.ID
}
type searchQueryFilter struct {
localstore.FilterPipeline
query query.Query
ch chan<- []*objectSDK.ID
cid *container.ID
}
func (s *localStream) stream(ctx context.Context, ch chan<- []*objectSDK.ID) error {
filter := &searchQueryFilter{
query: s.query,
ch: ch,
cid: s.cid,
fs := s.query.ToSearchFilters()
fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, s.cid)
addrList, err := s.storage.Select(fs)
if err != nil {
return errors.Wrapf(err, "(%T) could not select objects from local storage", s)
}
idList := make([]*objectSDK.ID, 0, len(addrList))
for i := range addrList {
idList = append(idList, addrList[i].GetObjectID())
}
if err := s.storage.Iterate(filter, func(meta *localstore.ObjectMeta) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}); err != nil && !errors.Is(errors.Cause(err), bucket.ErrIteratingAborted) {
return errors.Wrapf(err, "(%T) could not iterate over local storage", s)
}
return ctx.Err()
case ch <- idList:
return nil
}
func (f *searchQueryFilter) Pass(ctx context.Context, meta *localstore.ObjectMeta) *localstore.FilterResult {
if obj := meta.Head(); f.cid.Equal(obj.GetContainerID()) {
f.query.Match(meta.Head(), func(id *objectSDK.ID) {
select {
case <-ctx.Done():
return
case f.ch <- []*objectSDK.ID{id}:
}
})
}
return localstore.ResultPass()
}

View file

@ -14,15 +14,15 @@ func (q *jobQueue) Select(limit int) ([]*object.Address, error) {
// We can prioritize objects for migration, newly arrived objects, etc.
// It is recommended to make changes after updating the metabase
res := make([]*object.Address, 0, limit)
if err := q.localStorage.Iterate(nil, func(meta *localstore.ObjectMeta) bool {
res = append(res, meta.Head().Address())
return len(res) >= limit
}); err != nil {
// FIXME: add the ability to limit Select result
res, err := q.localStorage.Select(object.SearchFilters{})
if err != nil {
return nil, err
}
if len(res) < limit {
return res, nil
}
return res[:limit], nil
}