diff --git a/pkg/local_object_storage/localstore/filter.go b/pkg/local_object_storage/localstore/filter.go deleted file mode 100644 index 7f7891bdd..000000000 --- a/pkg/local_object_storage/localstore/filter.go +++ /dev/null @@ -1,284 +0,0 @@ -package localstore - -import ( - "context" - "math" - "sort" - "sync" - - "github.com/pkg/errors" -) - -type ( - // FilterCode is an enumeration of filter return codes. - FilterCode int - - // PriorityFlag is an enumeration of priority flags. - PriorityFlag int - - filterPipelineSet []FilterPipeline - - // FilterFunc is a function that checks whether an ObjectMeta matches a specific criterion. - FilterFunc func(ctx context.Context, meta *ObjectMeta) *FilterResult - - // FilterResult groups of ObjectMeta filter result values. - FilterResult struct { - c FilterCode - - e error - } - - // FilterPipeline is an interface of ObjectMeta filtering tool with sub-filters and priorities. - FilterPipeline interface { - Pass(ctx context.Context, meta *ObjectMeta) *FilterResult - PutSubFilter(params SubFilterParams) error - GetPriority() uint64 - SetPriority(uint64) - GetName() string - } - - // FilterParams groups the parameters of FilterPipeline constructor. - FilterParams struct { - Name string - Priority uint64 - FilterFunc FilterFunc - } - - // SubFilterParams groups the parameters of sub-filter registration. - SubFilterParams struct { - PriorityFlag - FilterPipeline - OnIgnore FilterCode - OnPass FilterCode - OnFail FilterCode - } - - filterPipeline struct { - *sync.RWMutex - - name string - pri uint64 - filterFn FilterFunc - - maxSubPri uint64 - mSubResult map[string]map[FilterCode]FilterCode - subFilters []FilterPipeline - } -) - -const ( - // PriorityValue is a PriorityFlag of the sub-filter registration with GetPriority() value. - PriorityValue PriorityFlag = iota - - // PriorityMax is a PriorityFlag of the sub-filter registration with maximum priority. - PriorityMax - - // PriorityMin is a PriorityFlag of the sub-filter registration with minimum priority. - PriorityMin -) - -const ( - // CodeUndefined is a undefined FilterCode. - CodeUndefined FilterCode = iota - - // CodePass is a FilterCode of filter passage. - CodePass - - // CodeFail is a FilterCode of filter failure. - CodeFail - - // CodeIgnore is a FilterCode of filter ignoring. - CodeIgnore -) - -var ( - rPass = &FilterResult{ - c: CodePass, - } - - rFail = &FilterResult{ - c: CodeFail, - } - - rIgnore = &FilterResult{ - c: CodeIgnore, - } - - rUndefined = &FilterResult{ - c: CodeUndefined, - } -) - -// ResultPass returns the FilterResult with CodePass code and nil error. -func ResultPass() *FilterResult { - return rPass -} - -// ResultFail returns the FilterResult with CodeFail code and nil error. -func ResultFail() *FilterResult { - return rFail -} - -// ResultIgnore returns the FilterResult with CodeIgnore code and nil error. -func ResultIgnore() *FilterResult { - return rIgnore -} - -// ResultUndefined returns the FilterResult with CodeUndefined code and nil error. -func ResultUndefined() *FilterResult { - return rUndefined -} - -// ResultWithError returns the FilterResult with passed code and error. -func ResultWithError(c FilterCode, e error) *FilterResult { - return &FilterResult{ - e: e, - c: c, - } -} - -// Code returns the filter result code. -func (s *FilterResult) Code() FilterCode { - return s.c -} - -// Err returns the filter result error. -func (s *FilterResult) Err() error { - return s.e -} - -func (f filterPipelineSet) Len() int { return len(f) } -func (f filterPipelineSet) Less(i, j int) bool { return f[i].GetPriority() > f[j].GetPriority() } -func (f filterPipelineSet) Swap(i, j int) { f[i], f[j] = f[j], f[i] } - -func (r FilterCode) String() string { - switch r { - case CodePass: - return "PASSED" - case CodeFail: - return "FAILED" - case CodeIgnore: - return "IGNORED" - default: - return "UNDEFINED" - } -} - -// NewFilter is a FilterPipeline constructor. -func NewFilter(p *FilterParams) FilterPipeline { - return &filterPipeline{ - RWMutex: new(sync.RWMutex), - name: p.Name, - pri: p.Priority, - filterFn: p.FilterFunc, - mSubResult: make(map[string]map[FilterCode]FilterCode), - } -} - -func (p *filterPipeline) Pass(ctx context.Context, meta *ObjectMeta) *FilterResult { - p.RLock() - defer p.RUnlock() - - for i := range p.subFilters { - subResult := p.subFilters[i].Pass(ctx, meta) - subName := p.subFilters[i].GetName() - - cSub := subResult.Code() - - if cSub <= CodeUndefined { - return ResultUndefined() - } - - if cFin := p.mSubResult[subName][cSub]; cFin != CodeIgnore { - return ResultWithError(cFin, subResult.Err()) - } - } - - if p.filterFn == nil { - return ResultUndefined() - } - - return p.filterFn(ctx, meta) -} - -func (p *filterPipeline) PutSubFilter(params SubFilterParams) error { - p.Lock() - defer p.Unlock() - - if params.FilterPipeline == nil { - return errors.New("could not put sub filter: empty filter pipeline") - } - - name := params.FilterPipeline.GetName() - if _, ok := p.mSubResult[name]; ok { - return errors.Errorf("filter %s is already in pipeline %s", name, p.GetName()) - } - - if params.PriorityFlag != PriorityMin { - if pri := params.FilterPipeline.GetPriority(); pri < math.MaxUint64 { - params.FilterPipeline.SetPriority(pri + 1) - } - } else { - params.FilterPipeline.SetPriority(0) - } - - switch pri := params.FilterPipeline.GetPriority(); params.PriorityFlag { - case PriorityMax: - if p.maxSubPri < math.MaxUint64 { - p.maxSubPri++ - } - - params.FilterPipeline.SetPriority(p.maxSubPri) - case PriorityValue: - if pri > p.maxSubPri { - p.maxSubPri = pri - } - } - - if params.OnFail <= 0 { - params.OnFail = CodeIgnore - } - - if params.OnIgnore <= 0 { - params.OnIgnore = CodeIgnore - } - - if params.OnPass <= 0 { - params.OnPass = CodeIgnore - } - - p.mSubResult[name] = map[FilterCode]FilterCode{ - CodePass: params.OnPass, - CodeIgnore: params.OnIgnore, - CodeFail: params.OnFail, - } - - p.subFilters = append(p.subFilters, params.FilterPipeline) - - sort.Sort(filterPipelineSet(p.subFilters)) - - return nil -} - -func (p *filterPipeline) GetPriority() uint64 { - p.RLock() - defer p.RUnlock() - - return p.pri -} -func (p *filterPipeline) SetPriority(pri uint64) { - p.Lock() - p.pri = pri - p.Unlock() -} - -func (p *filterPipeline) GetName() string { - p.RLock() - defer p.RUnlock() - - if p.name == "" { - return "FILTER_UNNAMED" - } - - return p.name -} diff --git a/pkg/local_object_storage/localstore/filter_test.go b/pkg/local_object_storage/localstore/filter_test.go deleted file mode 100644 index 96fd8c6a3..000000000 --- a/pkg/local_object_storage/localstore/filter_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package localstore - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestFilterResult(t *testing.T) { - var ( - r *FilterResult - c = CodePass - e = errors.New("test error") - ) - - r = ResultPass() - require.Equal(t, CodePass, r.Code()) - require.NoError(t, r.Err()) - - r = ResultFail() - require.Equal(t, CodeFail, r.Code()) - require.NoError(t, r.Err()) - - r = ResultIgnore() - require.Equal(t, CodeIgnore, r.Code()) - require.NoError(t, r.Err()) - - r = ResultWithError(c, e) - require.Equal(t, c, r.Code()) - require.EqualError(t, r.Err(), e.Error()) -} diff --git a/pkg/local_object_storage/localstore/meta.go b/pkg/local_object_storage/localstore/meta.go deleted file mode 100644 index 6c99ca122..000000000 --- a/pkg/local_object_storage/localstore/meta.go +++ /dev/null @@ -1,82 +0,0 @@ -package localstore - -import ( - "encoding/binary" - "io" - - objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/core/object" - "github.com/pkg/errors" -) - -// ObjectMeta represents meta information about -// the object that is stored in meta storage. -type ObjectMeta struct { - head *object.Object - - savedAtEpoch uint64 -} - -// SavedAtEpoch returns the number of epoch -// at which the object was saved locally. -func (m *ObjectMeta) SavedAtEpoch() uint64 { - if m != nil { - return m.savedAtEpoch - } - - return 0 -} - -// Head returns the object w/o payload. -func (m *ObjectMeta) Head() *object.Object { - if m != nil { - return m.head - } - - return nil -} - -// AddressFromMeta extracts the Address from object meta. -func AddressFromMeta(m *ObjectMeta) *objectSDK.Address { - return m.Head().Address() -} - -func metaFromObject(o *object.Object) *ObjectMeta { - // FIXME: remove hard-code - meta := new(ObjectMeta) - meta.savedAtEpoch = 10 - - meta.head = object.NewRawFromObject(o).CutPayload().Object() - - return meta -} - -func metaToBytes(m *ObjectMeta) ([]byte, error) { - data := make([]byte, 8) - - binary.BigEndian.PutUint64(data, m.savedAtEpoch) - - objBytes, err := objectBytes(m.head) - if err != nil { - return nil, err - } - - return append(data, objBytes...), nil -} - -func metaFromBytes(data []byte) (*ObjectMeta, error) { - if len(data) < 8 { - return nil, io.ErrUnexpectedEOF - } - - obj, err := object.FromBytes(data[8:]) - if err != nil { - return nil, errors.Wrap(err, "could not get object address from bytes") - } - - meta := new(ObjectMeta) - meta.savedAtEpoch = binary.BigEndian.Uint64(data) - meta.head = obj - - return meta, nil -} diff --git a/pkg/local_object_storage/localstore/methods.go b/pkg/local_object_storage/localstore/methods.go index 2634096a3..b30cab007 100644 --- a/pkg/local_object_storage/localstore/methods.go +++ b/pkg/local_object_storage/localstore/methods.go @@ -1,8 +1,6 @@ package localstore import ( - "context" - objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/pkg/errors" @@ -28,16 +26,11 @@ func (s *Storage) Put(obj *object.Object) error { return errors.Wrap(err, "could not marshal the object") } - metaBytes, err := metaToBytes(metaFromObject(obj)) - if err != nil { - return errors.Wrap(err, "could not marshal object meta") - } - if err := s.blobBucket.Set(addrBytes, objBytes); err != nil { return errors.Wrap(err, "could no save object in BLOB storage") } - if err := s.metaBucket.Set(addrBytes, metaBytes); err != nil { + if err := s.metaBase.Put(object.NewRawFromObject(obj).CutPayload().Object()); err != nil { return errors.Wrap(err, "could not save object in meta storage") } @@ -56,7 +49,7 @@ func (s *Storage) Delete(addr *objectSDK.Address) error { ) } - if err := s.metaBucket.Del(addrBytes); err != nil { + if err := s.metaBase.Delete(addr); err != nil { return errors.Wrap(err, "could not remove object from meta storage") } @@ -77,40 +70,10 @@ func (s *Storage) Get(addr *objectSDK.Address) (*object.Object, error) { return object.FromBytes(objBytes) } -func (s *Storage) Head(addr *objectSDK.Address) (*ObjectMeta, error) { - addrBytes, err := addressBytes(addr) - if err != nil { - return nil, errors.Wrap(err, "could not marshal object address") - } - - metaBytes, err := s.metaBucket.Get(addrBytes) - if err != nil { - return nil, errors.Wrap(err, "could not get object from meta storage") - } - - return metaFromBytes(metaBytes) +func (s *Storage) Head(addr *objectSDK.Address) (*object.Object, error) { + return s.metaBase.Get(addr) } -func (s *Storage) Iterate(filter FilterPipeline, handler func(*ObjectMeta) bool) error { - if filter == nil { - filter = NewFilter(&FilterParams{ - Name: "SKIPPING_FILTER", - FilterFunc: func(context.Context, *ObjectMeta) *FilterResult { - return ResultPass() - }, - }) - } - - return s.metaBucket.Iterate(func(_, v []byte) bool { - meta, err := metaFromBytes(v) - if err != nil { - s.log.Error("unmarshal meta bucket item failure", - zap.Error(err), - ) - } else if filter.Pass(context.TODO(), meta).Code() == CodePass { - return !handler(meta) - } - - return true - }) +func (s *Storage) Select(fs objectSDK.SearchFilters) ([]*objectSDK.Address, error) { + return s.metaBase.Select(fs) } diff --git a/pkg/local_object_storage/localstore/storage.go b/pkg/local_object_storage/localstore/storage.go index 284ac3851..add90ac4f 100644 --- a/pkg/local_object_storage/localstore/storage.go +++ b/pkg/local_object_storage/localstore/storage.go @@ -2,6 +2,7 @@ package localstore import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -10,7 +11,7 @@ import ( type Storage struct { log *logger.Logger - metaBucket bucket.Bucket + metaBase *meta.DB blobBucket bucket.Bucket } @@ -29,7 +30,7 @@ func defaultCfg() *cfg { } // New is a local object storage constructor. -func New(blob, meta bucket.Bucket, opts ...Option) *Storage { +func New(blob bucket.Bucket, meta *meta.DB, opts ...Option) *Storage { cfg := defaultCfg() for i := range opts { @@ -37,9 +38,9 @@ func New(blob, meta bucket.Bucket, opts ...Option) *Storage { } return &Storage{ - metaBucket: meta, - blobBucket: blob, log: cfg.logger, + metaBase: meta, + blobBucket: blob, } } diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index 7bea903be..b5cce3ab7 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -23,6 +23,10 @@ func NewDB(boltDB *bbolt.DB) *DB { } } +func (db *DB) Close() error { + return db.boltDB.Close() +} + func stringEqualMatcher(key, objVal, filterVal string) bool { switch key { default: diff --git a/pkg/services/object/acl/eacl/v2/localstore.go b/pkg/services/object/acl/eacl/v2/localstore.go index 4757ef02c..aef44fafe 100644 --- a/pkg/services/object/acl/eacl/v2/localstore.go +++ b/pkg/services/object/acl/eacl/v2/localstore.go @@ -17,10 +17,5 @@ func (s *localStorage) Head(addr *objectSDK.Address) (*object.Object, error) { return nil, io.ErrUnexpectedEOF } - meta, err := s.ls.Head(addr) - if err != nil { - return nil, err - } - - return meta.Head(), nil + return s.ls.Head(addr) } diff --git a/pkg/services/object/head/local.go b/pkg/services/object/head/local.go index 1cec2c1c4..908c3419a 100644 --- a/pkg/services/object/head/local.go +++ b/pkg/services/object/head/local.go @@ -13,12 +13,12 @@ type localHeader struct { } func (h *localHeader) head(ctx context.Context, prm *Prm, handler func(*object.Object)) error { - m, err := h.storage.Head(prm.addr) + head, err := h.storage.Head(prm.addr) if err != nil { return errors.Wrapf(err, "(%T) could not get header from local storage", h) } - handler(m.Head()) + handler(head) return nil } diff --git a/pkg/services/object/search/local.go b/pkg/services/object/search/local.go index ed178fa67..d0075bf1b 100644 --- a/pkg/services/object/search/local.go +++ b/pkg/services/object/search/local.go @@ -5,7 +5,6 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/container" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query" "github.com/pkg/errors" @@ -19,47 +18,25 @@ type localStream struct { cid *container.ID } -type searchQueryFilter struct { - localstore.FilterPipeline - - query query.Query - - ch chan<- []*objectSDK.ID - - cid *container.ID -} - func (s *localStream) stream(ctx context.Context, ch chan<- []*objectSDK.ID) error { - filter := &searchQueryFilter{ - query: s.query, - ch: ch, - cid: s.cid, + fs := s.query.ToSearchFilters() + fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, s.cid) + + addrList, err := s.storage.Select(fs) + if err != nil { + return errors.Wrapf(err, "(%T) could not select objects from local storage", s) } - if err := s.storage.Iterate(filter, func(meta *localstore.ObjectMeta) bool { - select { - case <-ctx.Done(): - return true - default: - return false - } - }); err != nil && !errors.Is(errors.Cause(err), bucket.ErrIteratingAborted) { - return errors.Wrapf(err, "(%T) could not iterate over local storage", s) + idList := make([]*objectSDK.ID, 0, len(addrList)) + + for i := range addrList { + idList = append(idList, addrList[i].GetObjectID()) } - return nil -} - -func (f *searchQueryFilter) Pass(ctx context.Context, meta *localstore.ObjectMeta) *localstore.FilterResult { - if obj := meta.Head(); f.cid.Equal(obj.GetContainerID()) { - f.query.Match(meta.Head(), func(id *objectSDK.ID) { - select { - case <-ctx.Done(): - return - case f.ch <- []*objectSDK.ID{id}: - } - }) - } - - return localstore.ResultPass() + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- idList: + return nil + } } diff --git a/pkg/services/policer/queue.go b/pkg/services/policer/queue.go index 63348a0a0..b0b26e582 100644 --- a/pkg/services/policer/queue.go +++ b/pkg/services/policer/queue.go @@ -14,15 +14,15 @@ func (q *jobQueue) Select(limit int) ([]*object.Address, error) { // We can prioritize objects for migration, newly arrived objects, etc. // It is recommended to make changes after updating the metabase - res := make([]*object.Address, 0, limit) - - if err := q.localStorage.Iterate(nil, func(meta *localstore.ObjectMeta) bool { - res = append(res, meta.Head().Address()) - - return len(res) >= limit - }); err != nil { + // FIXME: add the ability to limit Select result + res, err := q.localStorage.Select(object.SearchFilters{}) + if err != nil { return nil, err } - return res, nil + if len(res) < limit { + return res, nil + } + + return res[:limit], nil }