From 09084a7bffd605802f0e47cc0aac35c2976cf9c3 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 22 Sep 2020 09:51:47 +0300 Subject: [PATCH] [#34] service/object: Implement object Search distributed service Signed-off-by: Leonard Lyubich --- pkg/services/object/search/local.go | 64 +++++++ pkg/services/object/search/prm.go | 38 ++++ pkg/services/object/search/query/query.go | 11 ++ pkg/services/object/search/query/v1/v1.go | 112 +++++++++++ .../object/search/query/v1/v1_test.go | 123 ++++++++++++ pkg/services/object/search/remote.go | 44 +++++ pkg/services/object/search/res.go | 13 ++ pkg/services/object/search/service.go | 98 ++++++++++ pkg/services/object/search/streamer.go | 181 ++++++++++++++++++ pkg/services/object/search/v2/service.go | 57 ++++++ pkg/services/object/search/v2/streamer.go | 26 +++ pkg/services/object/search/v2/util.go | 58 ++++++ 12 files changed, 825 insertions(+) create mode 100644 pkg/services/object/search/local.go create mode 100644 pkg/services/object/search/prm.go create mode 100644 pkg/services/object/search/query/query.go create mode 100644 pkg/services/object/search/query/v1/v1.go create mode 100644 pkg/services/object/search/query/v1/v1_test.go create mode 100644 pkg/services/object/search/remote.go create mode 100644 pkg/services/object/search/res.go create mode 100644 pkg/services/object/search/service.go create mode 100644 pkg/services/object/search/streamer.go create mode 100644 pkg/services/object/search/v2/service.go create mode 100644 pkg/services/object/search/v2/streamer.go create mode 100644 pkg/services/object/search/v2/util.go diff --git a/pkg/services/object/search/local.go b/pkg/services/object/search/local.go new file mode 100644 index 000000000..e8e2bc0b4 --- /dev/null +++ b/pkg/services/object/search/local.go @@ -0,0 +1,64 @@ +package searchsvc + +import ( + "context" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query" + "github.com/pkg/errors" +) + +type localStream struct { + query query.Query + + storage *localstore.Storage +} + +func (s *localStream) stream(ctx context.Context, ch chan<- []*object.ID) error { + idList := make([]*object.ID, 0) + + if err := s.storage.Iterate(newFilterPipeline(s.query), func(meta *localstore.ObjectMeta) bool { + select { + case <-ctx.Done(): + return true + default: + idList = append(idList, meta.Head().GetID()) + + return false + } + }); err != nil && !errors.Is(errors.Cause(err), bucket.ErrIteratingAborted) { + return errors.Wrapf(err, "(%T) could not iterate over local storage", s) + } + + ch <- idList + + return nil +} + +func newFilterPipeline(q query.Query) localstore.FilterPipeline { + res := localstore.NewFilter(&localstore.FilterParams{ + Name: "SEARCH_OBJECTS_FILTER", + FilterFunc: func(context.Context, *localstore.ObjectMeta) *localstore.FilterResult { + return localstore.ResultPass() + }, + }) + + if err := res.PutSubFilter(localstore.SubFilterParams{ + FilterPipeline: localstore.NewFilter(&localstore.FilterParams{ + FilterFunc: func(_ context.Context, o *localstore.ObjectMeta) *localstore.FilterResult { + if !q.Match(o.Head()) { + return localstore.ResultFail() + } + return localstore.ResultPass() + }, + }), + OnIgnore: localstore.CodeFail, + OnFail: localstore.CodeFail, + }); err != nil { + panic(errors.Wrap(err, "could not create all pass including filter")) + } + + return res +} diff --git a/pkg/services/object/search/prm.go b/pkg/services/object/search/prm.go new file mode 100644 index 000000000..9defd68d7 --- /dev/null +++ b/pkg/services/object/search/prm.go @@ -0,0 +1,38 @@ +package searchsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query" +) + +type Prm struct { + local bool + + cid *container.ID + + query query.Query +} + +func (p *Prm) WithContainerID(v *container.ID) *Prm { + if p != nil { + p.cid = v + } + + return p +} + +func (p *Prm) WithSearchQuery(v query.Query) *Prm { + if p != nil { + p.query = v + } + + return p +} + +func (p *Prm) OnlyLocal(v bool) *Prm { + if p != nil { + p.local = v + } + + return p +} diff --git a/pkg/services/object/search/query/query.go b/pkg/services/object/search/query/query.go new file mode 100644 index 000000000..33051d3d7 --- /dev/null +++ b/pkg/services/object/search/query/query.go @@ -0,0 +1,11 @@ +package query + +import ( + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" +) + +type Query interface { + ToSearchFilters() objectSDK.SearchFilters + Match(*object.Object) bool +} diff --git a/pkg/services/object/search/query/v1/v1.go b/pkg/services/object/search/query/v1/v1.go new file mode 100644 index 000000000..0aff37722 --- /dev/null +++ b/pkg/services/object/search/query/v1/v1.go @@ -0,0 +1,112 @@ +package query + +import ( + "encoding/hex" + "fmt" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query" +) + +type Query struct { + filters []*Filter +} + +type matchType uint8 + +type Filter struct { + matchType matchType + + key, val string +} + +const ( + _ matchType = iota + matchStringEqual +) + +func New(filters ...*Filter) query.Query { + return &Query{ + filters: filters, + } +} + +func idValue(id *objectSDK.ID) string { + return hex.EncodeToString(id.ToV2().GetValue()) +} + +func NewIDEqualFilter(id *objectSDK.ID) *Filter { + return NewFilterEqual(objectSDK.HdrSysNameID, idValue(id)) +} + +func cidValue(id *container.ID) string { + return hex.EncodeToString(id.ToV2().GetValue()) +} + +func NewContainerIDEqualFilter(id *container.ID) *Filter { + return NewFilterEqual(objectSDK.HdrSysNameCID, cidValue(id)) +} + +func ownerIDValue(id *owner.ID) string { + return hex.EncodeToString(id.ToV2().GetValue()) +} + +func NewOwnerIDEqualFilter(id *owner.ID) *Filter { + return NewFilterEqual(objectSDK.HdrSysNameOwnerID, ownerIDValue(id)) +} + +func NewFilterEqual(key, val string) *Filter { + return &Filter{ + matchType: matchStringEqual, + key: key, + val: val, + } +} + +func (q *Query) Match(obj *object.Object) bool { + for _, f := range q.filters { + switch f.matchType { + case matchStringEqual: + if !headerEqual(obj, f.key, f.val) { + return false + } + default: + panic(fmt.Sprintf("unsupported match type %d", f.matchType)) + } + } + + return true +} + +func headerEqual(obj *object.Object, key, value string) bool { + switch key { + default: + for _, attr := range obj.GetAttributes() { + if attr.GetKey() == key && attr.GetValue() == value { + return true + } + } + + return false + case objectSDK.HdrSysNameID: + return value == idValue(obj.GetID()) + case objectSDK.HdrSysNameCID: + return value == cidValue(obj.GetContainerID()) + case objectSDK.HdrSysNameOwnerID: + return value == ownerIDValue(obj.GetOwnerID()) + // TODO: add other headers + } +} + +func (q *Query) ToSearchFilters() objectSDK.SearchFilters { + fs := make(objectSDK.SearchFilters, 0, len(q.filters)) + + for i := range q.filters { + fs.AddFilter(q.filters[i].key, q.filters[i].val, objectSDK.MatchStringEqual) + } + + return fs +} diff --git a/pkg/services/object/search/query/v1/v1_test.go b/pkg/services/object/search/query/v1/v1_test.go new file mode 100644 index 000000000..9540b5915 --- /dev/null +++ b/pkg/services/object/search/query/v1/v1_test.go @@ -0,0 +1,123 @@ +package query + +import ( + "crypto/rand" + "crypto/sha256" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/stretchr/testify/require" +) + +func testID(t *testing.T) *objectSDK.ID { + cs := [sha256.Size]byte{} + + _, err := rand.Read(cs[:]) + require.NoError(t, err) + + id := objectSDK.NewID() + id.SetSHA256(cs) + + return id +} + +func testCID(t *testing.T) *container.ID { + cs := [sha256.Size]byte{} + + _, err := rand.Read(cs[:]) + require.NoError(t, err) + + id := container.NewID() + id.SetSHA256(cs) + + return id +} + +func testOwnerID(t *testing.T) *owner.ID { + w := new(owner.NEO3Wallet) + + _, err := rand.Read(w.Bytes()) + require.NoError(t, err) + + id := owner.NewID() + id.SetNeo3Wallet(w) + + return id +} + +func TestQ_Match(t *testing.T) { + t.Run("object identifier equal", func(t *testing.T) { + obj := object.NewRaw() + + id := testID(t) + obj.SetID(id) + + q := New( + NewIDEqualFilter(id), + ) + + require.True(t, q.Match(obj.Object())) + + obj.SetID(testID(t)) + + require.False(t, q.Match(obj.Object())) + }) + + t.Run("container identifier equal", func(t *testing.T) { + obj := object.NewRaw() + + id := testCID(t) + obj.SetContainerID(id) + + q := New( + NewContainerIDEqualFilter(id), + ) + + require.True(t, q.Match(obj.Object())) + + obj.SetContainerID(testCID(t)) + + require.False(t, q.Match(obj.Object())) + }) + + t.Run("owner identifier equal", func(t *testing.T) { + obj := object.NewRaw() + + id := testOwnerID(t) + obj.SetOwnerID(id) + + q := New( + NewOwnerIDEqualFilter(id), + ) + + require.True(t, q.Match(obj.Object())) + + obj.SetOwnerID(testOwnerID(t)) + + require.False(t, q.Match(obj.Object())) + }) + + t.Run("attribute equal", func(t *testing.T) { + obj := object.NewRaw() + + k, v := "key", "val" + a := new(objectSDK.Attribute) + a.SetKey(k) + a.SetValue(v) + + obj.SetAttributes(a) + + q := New( + NewFilterEqual(k, v), + ) + + require.True(t, q.Match(obj.Object())) + + a.SetKey(k + "1") + + require.False(t, q.Match(obj.Object())) + }) +} diff --git a/pkg/services/object/search/remote.go b/pkg/services/object/search/remote.go new file mode 100644 index 000000000..d597d3018 --- /dev/null +++ b/pkg/services/object/search/remote.go @@ -0,0 +1,44 @@ +package searchsvc + +import ( + "context" + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/pkg/errors" +) + +type remoteStream struct { + prm *Prm + + key *ecdsa.PrivateKey + + addr *network.Address +} + +func (s *remoteStream) stream(ctx context.Context, ch chan<- []*object.ID) error { + addr := s.addr.NetAddr() + + c, err := client.New(s.key, + client.WithAddress(addr), + ) + if err != nil { + return errors.Wrapf(err, "(%T) could not create SDK client %s", s, addr) + } + + // TODO: add writer parameter to SDK client + id, err := c.SearchObject(ctx, new(client.SearchObjectParams). + WithContainerID(s.prm.cid). + WithSearchFilters(s.prm.query.ToSearchFilters()), + client.WithTTL(1), // FIXME: use constant + ) + if err != nil { + return errors.Wrapf(err, "(%T) could not search objects in %s", s, addr) + } + + ch <- id + + return nil +} diff --git a/pkg/services/object/search/res.go b/pkg/services/object/search/res.go new file mode 100644 index 000000000..be99ffe60 --- /dev/null +++ b/pkg/services/object/search/res.go @@ -0,0 +1,13 @@ +package searchsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +type Response struct { + idList []*object.ID +} + +func (r *Response) IDList() []*object.ID { + return r.idList +} diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go new file mode 100644 index 000000000..33dae8ee2 --- /dev/null +++ b/pkg/services/object/search/service.go @@ -0,0 +1,98 @@ +package searchsvc + +import ( + "context" + "crypto/ecdsa" + "sync" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/util" +) + +type Service struct { + *cfg +} + +type Option func(*cfg) + +type cfg struct { + key *ecdsa.PrivateKey + + localStore *localstore.Storage + + cnrSrc container.Source + + netMapSrc netmap.Source + + workerPool util.WorkerPool + + localAddrSrc network.LocalAddressSource +} + +func defaultCfg() *cfg { + return &cfg{ + workerPool: new(util.SyncWorkerPool), + } +} + +func NewService(opts ...Option) *Service { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + return &Service{ + cfg: c, + } +} + +func (p *Service) Search(ctx context.Context, prm *Prm) (*Streamer, error) { + return &Streamer{ + cfg: p.cfg, + once: new(sync.Once), + prm: prm, + ctx: ctx, + cache: make([][]*object.ID, 0, 10), + }, nil +} + +func WithKey(v *ecdsa.PrivateKey) Option { + return func(c *cfg) { + c.key = v + } +} + +func WithLocalStorage(v *localstore.Storage) Option { + return func(c *cfg) { + c.localStore = v + } +} + +func WithContainerSource(v container.Source) Option { + return func(c *cfg) { + c.cnrSrc = v + } +} + +func WithNetworkMapSource(v netmap.Source) Option { + return func(c *cfg) { + c.netMapSrc = v + } +} + +func WithWorkerPool(v util.WorkerPool) Option { + return func(c *cfg) { + c.workerPool = v + } +} + +func WithLocalAddressSource(v network.LocalAddressSource) Option { + return func(c *cfg) { + c.localAddrSrc = v + } +} diff --git a/pkg/services/object/search/streamer.go b/pkg/services/object/search/streamer.go new file mode 100644 index 000000000..5300fb49d --- /dev/null +++ b/pkg/services/object/search/streamer.go @@ -0,0 +1,181 @@ +package searchsvc + +import ( + "context" + "io" + "sync" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/pkg/errors" +) + +type Streamer struct { + *cfg + + once *sync.Once + + prm *Prm + + traverser *placement.Traverser + + ctx context.Context + + ch chan []*object.ID + + cache [][]*object.ID +} + +func (p *Streamer) Recv() (*Response, error) { + var err error + + p.once.Do(func() { + if err = p.preparePrm(p.prm); err == nil { + go p.start(p.prm) + } + }) + + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not start streaming", p) + } + + select { + case <-p.ctx.Done(): + return nil, errors.Wrapf(p.ctx.Err(), "(%T) context is done", p) + case v, ok := <-p.ch: + if !ok { + return nil, io.EOF + } + + v = p.cutCached(v) + + return &Response{ + idList: v, + }, nil + } +} + +func (p *Streamer) cutCached(ids []*object.ID) []*object.ID { +loop: + for i := 0; i < len(ids); i++ { + for j := range p.cache { + for k := range p.cache[j] { + if ids[i].Equal(p.cache[j][k]) { + ids = append(ids[:i], ids[i+1:]...) + + i-- + + continue loop + } + } + } + } + + if len(ids) > 0 { + p.cache = append(p.cache, ids) + } + + return ids +} + +func (p *Streamer) preparePrm(prm *Prm) error { + var err error + + // get latest network map + nm, err := netmap.GetLatestNetworkMap(p.netMapSrc) + if err != nil { + return errors.Wrapf(err, "(%T) could not get latest network map", p) + } + + // get container to store the object + cnr, err := p.cnrSrc.Get(prm.cid) + if err != nil { + return errors.Wrapf(err, "(%T) could not get container by ID", p) + } + + // allocate placement traverser options + traverseOpts := make([]placement.Option, 0, 4) + + // add common options + traverseOpts = append(traverseOpts, + // set processing container + placement.ForContainer(cnr), + ) + + // create placement builder from network map + builder := placement.NewNetworkMapBuilder(nm) + + if prm.local { + // restrict success count to 1 stored copy (to local storage) + traverseOpts = append(traverseOpts, placement.SuccessAfter(1)) + + // use local-only placement builder + builder = util.NewLocalPlacement(placement.NewNetworkMapBuilder(nm), p.localAddrSrc) + } + + // set placement builder + traverseOpts = append(traverseOpts, placement.UseBuilder(builder)) + + // build placement traverser + if p.traverser, err = placement.NewTraverser(traverseOpts...); err != nil { + return errors.Wrapf(err, "(%T) could not build placement traverser", p) + } + + p.ch = make(chan []*object.ID) + + return nil +} + +func (p *Streamer) start(prm *Prm) { + defer close(p.ch) + +loop: + for { + addrs := p.traverser.Next() + if len(addrs) == 0 { + break + } + + wg := new(sync.WaitGroup) + + for i := range addrs { + wg.Add(1) + + addr := addrs[i] + + if err := p.workerPool.Submit(func() { + defer wg.Done() + + var streamer interface { + stream(context.Context, chan<- []*object.ID) error + } + + if network.IsLocalAddress(p.localAddrSrc, addr) { + streamer = &localStream{ + query: prm.query, + storage: p.localStore, + } + } else { + streamer = &remoteStream{ + prm: prm, + key: p.key, + addr: addr, + } + } + + if err := streamer.stream(p.ctx, p.ch); err != nil { + // TODO: log error + } + }); err != nil { + wg.Done() + // TODO: log error + break loop + } + } + + wg.Wait() + } +} diff --git a/pkg/services/object/search/v2/service.go b/pkg/services/object/search/v2/service.go new file mode 100644 index 000000000..3722b741b --- /dev/null +++ b/pkg/services/object/search/v2/service.go @@ -0,0 +1,57 @@ +package searchsvc + +import ( + "context" + + "github.com/nspcc-dev/neofs-api-go/v2/object" + searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" + "github.com/pkg/errors" +) + +// Service implements Search operation of Object service v2. +type Service struct { + *cfg +} + +// Option represents Service constructor option. +type Option func(*cfg) + +type cfg struct { + svc *searchsvc.Service +} + +// NewService constructs Service instance from provided options. +func NewService(opts ...Option) *Service { + c := new(cfg) + + for i := range opts { + opts[i](c) + } + + return &Service{ + cfg: c, + } +} + +// Search calls internal service and returns v2 search object streamer. +func (s *Service) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) { + prm, err := toPrm(req.GetBody(), req.GetMetaHeader().GetTTL()) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not convert search parameters", s) + } + + stream, err := s.svc.Search(ctx, prm) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not open object search stream", s) + } + + return &streamer{ + stream: stream, + }, nil +} + +func WithInternalService(v *searchsvc.Service) Option { + return func(c *cfg) { + c.svc = v + } +} diff --git a/pkg/services/object/search/v2/streamer.go b/pkg/services/object/search/v2/streamer.go new file mode 100644 index 000000000..e97f6dae0 --- /dev/null +++ b/pkg/services/object/search/v2/streamer.go @@ -0,0 +1,26 @@ +package searchsvc + +import ( + "io" + + "github.com/nspcc-dev/neofs-api-go/v2/object" + searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" + "github.com/pkg/errors" +) + +type streamer struct { + stream *searchsvc.Streamer +} + +func (s *streamer) Recv() (*object.SearchResponse, error) { + r, err := s.stream.Recv() + if err != nil { + if errors.Is(errors.Cause(err), io.EOF) { + return nil, io.EOF + } + + return nil, errors.Wrapf(err, "(%T) could not receive search response", s) + } + + return fromResponse(r), nil +} diff --git a/pkg/services/object/search/v2/util.go b/pkg/services/object/search/v2/util.go new file mode 100644 index 000000000..affab8c60 --- /dev/null +++ b/pkg/services/object/search/v2/util.go @@ -0,0 +1,58 @@ +package searchsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/refs" + searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" + "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query" + queryV1 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query/v1" + "github.com/pkg/errors" +) + +func toPrm(req *object.SearchRequestBody, ttl uint32) (*searchsvc.Prm, error) { + var q query.Query + + switch v := req.GetVersion(); v { + default: + return nil, errors.Errorf("unsupported query version #%d", v) + case 1: + fs := req.GetFilters() + fsV1 := make([]*queryV1.Filter, 0, len(fs)) + + for _, f := range fs { + switch mt := f.GetMatchType(); mt { + default: + return nil, errors.Errorf("unsupported match type %d in query version #%d", mt, v) + case object.MatchStringEqual: + fsV1 = append(fsV1, queryV1.NewFilterEqual(f.GetName(), f.GetValue())) + } + } + + q = queryV1.New(fsV1...) + } + + return new(searchsvc.Prm). + WithContainerID( + container.NewIDFromV2(req.GetContainerID()), + ). + WithSearchQuery(q). + OnlyLocal(ttl == 1), nil // FIXME: use constant +} + +func fromResponse(r *searchsvc.Response) *object.SearchResponse { + ids := r.IDList() + idsV2 := make([]*refs.ObjectID, 0, len(ids)) + + for i := range ids { + idsV2 = append(idsV2, ids[i].ToV2()) + } + + body := new(object.SearchResponseBody) + body.SetIDList(idsV2) + + resp := new(object.SearchResponse) + resp.SetBody(body) + + return resp +}