[#34] service/object: Implement object Search distributed service

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-09-22 09:51:47 +03:00 committed by Alex Vanin
parent a5ebdd1891
commit 09084a7bff
12 changed files with 825 additions and 0 deletions

View file

@ -0,0 +1,64 @@
package searchsvc
import (
"context"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
"github.com/nspcc-dev/neofs-node/pkg/services/object/search/query"
"github.com/pkg/errors"
)
// localStream is a search stream over the local object storage of the node.
type localStream struct {
// query is matched against the header of every stored object
// (see newFilterPipeline).
query query.Query
// storage is the local storage iterated by stream.
storage *localstore.Storage
}
// stream iterates over the local storage, collects identifiers of the objects
// that pass the query filter and sends them to ch as a single batch.
//
// Iteration is aborted once ctx is done. The final send also honors ctx:
// if the receiver is gone the method returns the context error instead of
// blocking forever on the channel.
func (s *localStream) stream(ctx context.Context, ch chan<- []*object.ID) error {
	idList := make([]*object.ID, 0)

	err := s.storage.Iterate(newFilterPipeline(s.query), func(meta *localstore.ObjectMeta) bool {
		select {
		case <-ctx.Done():
			// presumably a true result makes the storage abort the iteration;
			// the resulting bucket.ErrIteratingAborted is tolerated below
			return true
		default:
			idList = append(idList, meta.Head().GetID())
			return false
		}
	})
	if err != nil && !errors.Is(errors.Cause(err), bucket.ErrIteratingAborted) {
		return errors.Wrapf(err, "(%T) could not iterate over local storage", s)
	}

	// never block on the send if nobody is going to read the result
	select {
	case <-ctx.Done():
		return errors.Wrapf(ctx.Err(), "(%T) context is done", s)
	case ch <- idList:
		return nil
	}
}
// newFilterPipeline builds the local storage filter used by object search:
// an always-passing root filter named SEARCH_OBJECTS_FILTER with a single
// sub-filter that passes only the objects whose header matches q.
//
// Panics if the sub-filter cannot be attached to the root filter.
func newFilterPipeline(q query.Query) localstore.FilterPipeline {
	root := localstore.NewFilter(&localstore.FilterParams{
		Name: "SEARCH_OBJECTS_FILTER",
		FilterFunc: func(context.Context, *localstore.ObjectMeta) *localstore.FilterResult {
			return localstore.ResultPass()
		},
	})

	matcher := localstore.NewFilter(&localstore.FilterParams{
		FilterFunc: func(_ context.Context, meta *localstore.ObjectMeta) *localstore.FilterResult {
			if q.Match(meta.Head()) {
				return localstore.ResultPass()
			}

			return localstore.ResultFail()
		},
	})

	err := root.PutSubFilter(localstore.SubFilterParams{
		FilterPipeline: matcher,
		OnIgnore:       localstore.CodeFail,
		OnFail:         localstore.CodeFail,
	})
	if err != nil {
		panic(errors.Wrap(err, "could not create all pass including filter"))
	}

	return root
}

View file

@ -0,0 +1,38 @@
package searchsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-node/pkg/services/object/search/query"
)
// Prm groups the parameters of the object Search operation.
type Prm struct {
// local switches the search to local-only placement
// (see Streamer.preparePrm).
local bool
// cid is the identifier of the container to search in.
cid *container.ID
// query is the search query to match objects against.
query query.Query
}
// WithContainerID sets the identifier of the container to search in and
// returns the receiver to allow call chaining. A nil receiver is returned as is.
func (p *Prm) WithContainerID(v *container.ID) *Prm {
	if p == nil {
		return nil
	}

	p.cid = v

	return p
}
// WithSearchQuery sets the search query and returns the receiver to allow
// call chaining. A nil receiver is returned as is.
func (p *Prm) WithSearchQuery(v query.Query) *Prm {
	if p == nil {
		return nil
	}

	p.query = v

	return p
}
// OnlyLocal sets the local-only flag of the search and returns the receiver
// to allow call chaining. A nil receiver is returned as is.
func (p *Prm) OnlyLocal(v bool) *Prm {
	if p == nil {
		return nil
	}

	p.local = v

	return p
}

View file

@ -0,0 +1,11 @@
package query
import (
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
)
// Query is an object search query.
type Query interface {
// ToSearchFilters converts the query to the SDK search filters
// (used when the query is forwarded to a remote node).
ToSearchFilters() objectSDK.SearchFilters
// Match reports whether the object satisfies the query.
Match(*object.Object) bool
}

View file

@ -0,0 +1,112 @@
package query
import (
"encoding/hex"
"fmt"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object/search/query"
)
// Query is the version 1 search query: a list of filters that
// must all be satisfied by an object (see Match).
type Query struct {
// filters is the list of filters checked one by one in Match.
filters []*Filter
}
// matchType enumerates the ways a filter value is compared with an object header.
type matchType uint8
// Filter is a single key-value condition of the version 1 query.
type Filter struct {
// matchType selects the comparison applied in Query.Match.
matchType matchType
// key is the header (system header name or attribute key) and
// val is the value it is compared with.
key, val string
}
const (
// the zero value is skipped, so a Filter with unset matchType is not
// treated as any supported match type (Query.Match panics on it)
_ matchType = iota
// matchStringEqual requires string equality of the header and the filter value.
matchStringEqual
)
// New creates a version 1 query from the given filters.
func New(filters ...*Filter) query.Query {
	q := new(Query)
	q.filters = filters

	return q
}
// idValue returns the hex-encoded value of the object identifier.
func idValue(id *objectSDK.ID) string {
	raw := id.ToV2().GetValue()

	return hex.EncodeToString(raw)
}
// NewIDEqualFilter creates a filter that matches the objects
// whose identifier equals id.
func NewIDEqualFilter(id *objectSDK.ID) *Filter {
	val := idValue(id)

	return NewFilterEqual(objectSDK.HdrSysNameID, val)
}
// cidValue returns the hex-encoded value of the container identifier.
func cidValue(id *container.ID) string {
	raw := id.ToV2().GetValue()

	return hex.EncodeToString(raw)
}
// NewContainerIDEqualFilter creates a filter that matches the objects
// whose container identifier equals id.
func NewContainerIDEqualFilter(id *container.ID) *Filter {
	val := cidValue(id)

	return NewFilterEqual(objectSDK.HdrSysNameCID, val)
}
// ownerIDValue returns the hex-encoded value of the owner identifier.
func ownerIDValue(id *owner.ID) string {
	raw := id.ToV2().GetValue()

	return hex.EncodeToString(raw)
}
// NewOwnerIDEqualFilter creates a filter that matches the objects
// whose owner identifier equals id.
func NewOwnerIDEqualFilter(id *owner.ID) *Filter {
	val := ownerIDValue(id)

	return NewFilterEqual(objectSDK.HdrSysNameOwnerID, val)
}
// NewFilterEqual creates a filter that requires the header key
// to be string-equal to val.
func NewFilterEqual(key, val string) *Filter {
	f := new(Filter)
	f.matchType = matchStringEqual
	f.key, f.val = key, val

	return f
}
// Match reports whether obj satisfies every filter of the query.
// A query without filters matches any object.
//
// Panics if a filter carries an unsupported match type.
func (q *Query) Match(obj *object.Object) bool {
	for i := range q.filters {
		f := q.filters[i]

		if f.matchType != matchStringEqual {
			panic(fmt.Sprintf("unsupported match type %d", f.matchType))
		}

		if !headerEqual(obj, f.key, f.val) {
			return false
		}
	}

	return true
}
// headerEqual reports whether the header of obj named key equals value.
//
// The object ID, container ID and owner ID system headers are compared by
// their hex-encoded values; any other key is looked up among the object
// attributes.
func headerEqual(obj *object.Object, key, value string) bool {
	switch key {
	case objectSDK.HdrSysNameID:
		return idValue(obj.GetID()) == value
	case objectSDK.HdrSysNameCID:
		return cidValue(obj.GetContainerID()) == value
	case objectSDK.HdrSysNameOwnerID:
		return ownerIDValue(obj.GetOwnerID()) == value
		// TODO: add other headers
	}

	// not a known system header: search through the attributes
	attrs := obj.GetAttributes()
	for i := range attrs {
		if attrs[i].GetKey() == key && attrs[i].GetValue() == value {
			return true
		}
	}

	return false
}
// ToSearchFilters converts the query to SDK search filters.
// Every filter is added with the string-equal match type.
func (q *Query) ToSearchFilters() objectSDK.SearchFilters {
	res := make(objectSDK.SearchFilters, 0, len(q.filters))

	for _, f := range q.filters {
		res.AddFilter(f.key, f.val, objectSDK.MatchStringEqual)
	}

	return res
}

View file

@ -0,0 +1,123 @@
package query
import (
"crypto/rand"
"crypto/sha256"
"testing"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/stretchr/testify/require"
)
// testID returns an object identifier built from a random SHA256 checksum.
func testID(t *testing.T) *objectSDK.ID {
	// report assertion failures at the caller's line
	t.Helper()

	cs := [sha256.Size]byte{}

	_, err := rand.Read(cs[:])
	require.NoError(t, err)

	id := objectSDK.NewID()
	id.SetSHA256(cs)

	return id
}
// testCID returns a container identifier built from a random SHA256 checksum.
func testCID(t *testing.T) *container.ID {
	// report assertion failures at the caller's line
	t.Helper()

	cs := [sha256.Size]byte{}

	_, err := rand.Read(cs[:])
	require.NoError(t, err)

	id := container.NewID()
	id.SetSHA256(cs)

	return id
}
// testOwnerID returns an owner identifier built from a NEO3 wallet
// filled with random bytes.
func testOwnerID(t *testing.T) *owner.ID {
	// report assertion failures at the caller's line
	t.Helper()

	w := new(owner.NEO3Wallet)

	// NOTE(review): relies on Bytes() exposing the wallet's own storage
	// rather than a copy - confirm against the SDK
	_, err := rand.Read(w.Bytes())
	require.NoError(t, err)

	id := owner.NewID()
	id.SetNeo3Wallet(w)

	return id
}
func TestQ_Match(t *testing.T) {
t.Run("object identifier equal", func(t *testing.T) {
obj := object.NewRaw()
id := testID(t)
obj.SetID(id)
q := New(
NewIDEqualFilter(id),
)
require.True(t, q.Match(obj.Object()))
obj.SetID(testID(t))
require.False(t, q.Match(obj.Object()))
})
t.Run("container identifier equal", func(t *testing.T) {
obj := object.NewRaw()
id := testCID(t)
obj.SetContainerID(id)
q := New(
NewContainerIDEqualFilter(id),
)
require.True(t, q.Match(obj.Object()))
obj.SetContainerID(testCID(t))
require.False(t, q.Match(obj.Object()))
})
t.Run("owner identifier equal", func(t *testing.T) {
obj := object.NewRaw()
id := testOwnerID(t)
obj.SetOwnerID(id)
q := New(
NewOwnerIDEqualFilter(id),
)
require.True(t, q.Match(obj.Object()))
obj.SetOwnerID(testOwnerID(t))
require.False(t, q.Match(obj.Object()))
})
t.Run("attribute equal", func(t *testing.T) {
obj := object.NewRaw()
k, v := "key", "val"
a := new(objectSDK.Attribute)
a.SetKey(k)
a.SetValue(v)
obj.SetAttributes(a)
q := New(
NewFilterEqual(k, v),
)
require.True(t, q.Match(obj.Object()))
a.SetKey(k + "1")
require.False(t, q.Match(obj.Object()))
})
}

View file

@ -0,0 +1,44 @@
package searchsvc
import (
"context"
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/pkg/errors"
)
// remoteStream is a search stream that asks a remote node via the SDK client.
type remoteStream struct {
// prm carries the container ID and the query of the search.
prm *Prm
// key is the private key the SDK client is created with.
key *ecdsa.PrivateKey
// addr is the network address of the remote node.
addr *network.Address
}
// stream searches objects on the remote node through the SDK client
// and sends the resulting identifiers to ch as a single batch.
//
// The send honors ctx: if the receiver is gone the method returns the
// context error instead of blocking forever on the channel.
func (s *remoteStream) stream(ctx context.Context, ch chan<- []*object.ID) error {
	addr := s.addr.NetAddr()

	c, err := client.New(s.key,
		client.WithAddress(addr),
	)
	if err != nil {
		return errors.Wrapf(err, "(%T) could not create SDK client %s", s, addr)
	}

	// TODO: add writer parameter to SDK client
	id, err := c.SearchObject(ctx, new(client.SearchObjectParams).
		WithContainerID(s.prm.cid).
		WithSearchFilters(s.prm.query.ToSearchFilters()),
		client.WithTTL(1), // FIXME: use constant
	)
	if err != nil {
		return errors.Wrapf(err, "(%T) could not search objects in %s", s, addr)
	}

	// never block on the send if nobody is going to read the result
	select {
	case <-ctx.Done():
		return errors.Wrapf(ctx.Err(), "(%T) context is done", s)
	case ch <- id:
		return nil
	}
}

View file

@ -0,0 +1,13 @@
package searchsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
)
// Response is a single message of the object search stream.
type Response struct {
// idList is the batch of found object identifiers.
idList []*object.ID
}
// IDList returns the object identifiers carried by the response.
// The internal slice is returned without copying.
func (r *Response) IDList() []*object.ID {
return r.idList
}

View file

@ -0,0 +1,98 @@
package searchsvc
import (
"context"
"crypto/ecdsa"
"sync"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/util"
)
// Service is the object Search service. Its configuration is
// embedded and shared with every Streamer it creates.
type Service struct {
*cfg
}
// Option is a Service constructor option that modifies the service configuration.
type Option func(*cfg)
// cfg groups the dependencies of the Search service.
type cfg struct {
// key is the private key used to create SDK clients for remote nodes.
key *ecdsa.PrivateKey
// localStore is the local object storage searched for local addresses.
localStore *localstore.Storage
// cnrSrc is the source the processed container is read from.
cnrSrc container.Source
// netMapSrc is the source the latest network map is read from.
netMapSrc netmap.Source
// workerPool executes per-node search tasks.
workerPool util.WorkerPool
// localAddrSrc is used to tell local addresses from remote ones.
localAddrSrc network.LocalAddressSource
}
// defaultCfg returns the configuration used before options are applied:
// only the worker pool is preset (to util.SyncWorkerPool).
func defaultCfg() *cfg {
	c := new(cfg)
	c.workerPool = new(util.SyncWorkerPool)

	return c
}
// NewService creates a Search service: the default configuration
// is taken and the options are applied to it in the given order.
func NewService(opts ...Option) *Service {
	c := defaultCfg()

	for _, opt := range opts {
		opt(c)
	}

	return &Service{cfg: c}
}
// Search opens an object search stream for the given parameters.
//
// No work is done here: the returned Streamer starts searching lazily
// on its first Recv call. The returned error is always nil.
func (p *Service) Search(ctx context.Context, prm *Prm) (*Streamer, error) {
	stream := &Streamer{
		cfg:   p.cfg,
		ctx:   ctx,
		prm:   prm,
		once:  new(sync.Once),
		cache: make([][]*object.ID, 0, 10),
	}

	return stream, nil
}
// WithKey returns an option that sets the private key of the service.
func WithKey(v *ecdsa.PrivateKey) Option {
	setKey := func(dst *cfg) { dst.key = v }

	return setKey
}
// WithLocalStorage returns an option that sets the local object storage of the service.
func WithLocalStorage(v *localstore.Storage) Option {
	setStorage := func(dst *cfg) { dst.localStore = v }

	return setStorage
}
// WithContainerSource returns an option that sets the container source of the service.
func WithContainerSource(v container.Source) Option {
	setSource := func(dst *cfg) { dst.cnrSrc = v }

	return setSource
}
// WithNetworkMapSource returns an option that sets the network map source of the service.
func WithNetworkMapSource(v netmap.Source) Option {
	setSource := func(dst *cfg) { dst.netMapSrc = v }

	return setSource
}
// WithWorkerPool returns an option that replaces the worker pool of the service.
func WithWorkerPool(v util.WorkerPool) Option {
	setPool := func(dst *cfg) { dst.workerPool = v }

	return setPool
}
// WithLocalAddressSource returns an option that sets the local address source of the service.
func WithLocalAddressSource(v network.LocalAddressSource) Option {
	setSource := func(dst *cfg) { dst.localAddrSrc = v }

	return setSource
}

View file

@ -0,0 +1,181 @@
package searchsvc
import (
"context"
"io"
"sync"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/pkg/errors"
)
// Streamer is a stream of object search results. The search
// is started lazily on the first Recv call.
type Streamer struct {
*cfg
// once guards the one-time start of the search in Recv.
once *sync.Once
// prm holds the parameters of the search.
prm *Prm
// traverser yields the node addresses to ask; built in preparePrm.
traverser *placement.Traverser
// ctx is the context of the whole search operation.
ctx context.Context
// ch delivers batches of found identifiers from the workers to Recv;
// allocated in preparePrm and closed when start returns.
ch chan []*object.ID
// cache keeps the batches already returned by Recv; cutCached uses it
// to drop identifiers that were returned before.
cache [][]*object.ID
}
// Recv returns the next batch of found object identifiers.
//
// The first call prepares the search and starts it in a background
// goroutine. io.EOF is returned once all nodes have been processed.
// If the context of the stream is done, the wrapped context error is
// returned. Identifiers that were already returned by a previous call
// are cut from the batch, so a response may carry an empty list.
//
// If the preparation fails, the first call returns the cause and every
// subsequent call fails immediately.
func (p *Streamer) Recv() (*Response, error) {
	var err error

	p.once.Do(func() {
		if err = p.preparePrm(p.prm); err == nil {
			go p.start(p.prm)
		}
	})

	if err != nil {
		return nil, errors.Wrapf(err, "(%T) could not start streaming", p)
	}

	// The channel is allocated only by a successful preparation. A nil
	// channel here means an earlier call failed to start the stream:
	// receiving from it would block until the context is done.
	if p.ch == nil {
		return nil, errors.Errorf("(%T) could not start streaming: previous initialization failed", p)
	}

	select {
	case <-p.ctx.Done():
		return nil, errors.Wrapf(p.ctx.Err(), "(%T) context is done", p)
	case v, ok := <-p.ch:
		if !ok {
			return nil, io.EOF
		}

		v = p.cutCached(v)

		return &Response{
			idList: v,
		}, nil
	}
}
// cutCached removes from ids every identifier that was already returned
// in one of the cached batches, remembers the remainder as a new cached
// batch (if not empty) and returns it.
//
// The filtering is done in place: the backing array of ids is reused.
// Duplicates inside ids itself are not removed.
func (p *Streamer) cutCached(ids []*object.ID) []*object.ID {
	// single-pass in-place filter: kept elements are written at or before
	// the index currently being read, so nothing unread is overwritten
	fresh := ids[:0]

next:
	for _, id := range ids {
		for _, batch := range p.cache {
			for _, cached := range batch {
				if id.Equal(cached) {
					continue next
				}
			}
		}

		fresh = append(fresh, id)
	}

	if len(fresh) > 0 {
		p.cache = append(p.cache, fresh)
	}

	return fresh
}
// preparePrm builds the placement traverser for the searched container
// and allocates the result channel.
//
// The latest network map and the container are read from the configured
// sources. For a local-only search the traverser uses local placement and
// is limited to one success. The channel is allocated only on success.
func (p *Streamer) preparePrm(prm *Prm) error {
	// latest network map is the base of the placement
	nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
	if err != nil {
		return errors.Wrapf(err, "(%T) could not get latest network map", p)
	}

	// container the objects are searched in
	cnr, err := p.cnrSrc.Get(prm.cid)
	if err != nil {
		return errors.Wrapf(err, "(%T) could not get container by ID", p)
	}

	opts := make([]placement.Option, 0, 4)
	opts = append(opts, placement.ForContainer(cnr))

	// network map placement by default
	builder := placement.NewNetworkMapBuilder(nm)

	if prm.local {
		// a single (local) node is enough
		opts = append(opts, placement.SuccessAfter(1))

		// wrap network map placement into the local-only one
		builder = util.NewLocalPlacement(placement.NewNetworkMapBuilder(nm), p.localAddrSrc)
	}

	opts = append(opts, placement.UseBuilder(builder))

	if p.traverser, err = placement.NewTraverser(opts...); err != nil {
		return errors.Wrapf(err, "(%T) could not build placement traverser", p)
	}

	p.ch = make(chan []*object.ID)

	return nil
}
// start runs the search: for every group of addresses yielded by the
// traverser it submits one task per address to the worker pool and waits
// for the group to finish before taking the next one. Local addresses are
// served from the local storage, the others through remote streams.
//
// The result channel is closed when the method returns. If a task cannot
// be submitted, the search stops, but only after the tasks already
// submitted have finished: closing the channel while they are still
// sending would panic.
func (p *Streamer) start(prm *Prm) {
	defer close(p.ch)

	for {
		addrs := p.traverser.Next()
		if len(addrs) == 0 {
			return
		}

		wg := new(sync.WaitGroup)
		submitFailed := false

		for i := range addrs {
			wg.Add(1)

			addr := addrs[i]

			err := p.workerPool.Submit(func() {
				defer wg.Done()

				var streamer interface {
					stream(context.Context, chan<- []*object.ID) error
				}

				if network.IsLocalAddress(p.localAddrSrc, addr) {
					streamer = &localStream{
						query:   prm.query,
						storage: p.localStore,
					}
				} else {
					streamer = &remoteStream{
						prm:  prm,
						key:  p.key,
						addr: addr,
					}
				}

				if err := streamer.stream(p.ctx, p.ch); err != nil {
					// TODO: log error
				}
			})
			if err != nil {
				// the task will never run, so release its slot here
				wg.Done()

				// TODO: log error

				submitFailed = true

				break
			}
		}

		// wait for the submitted tasks in any case: they send to p.ch,
		// which is closed as soon as this method returns
		wg.Wait()

		if submitFailed {
			return
		}
	}
}

View file

@ -0,0 +1,57 @@
package searchsvc
import (
"context"
"github.com/nspcc-dev/neofs-api-go/v2/object"
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
"github.com/pkg/errors"
)
// Service implements Search operation of Object service v2.
//
// It converts v2 requests to the parameters of the internal
// search service and wraps its stream (see Search).
type Service struct {
*cfg
}
// Option represents Service constructor option.
// It modifies the service configuration in place.
type Option func(*cfg)
// cfg groups the dependencies of the v2 Search service.
type cfg struct {
// svc is the internal search service the requests are delegated to.
svc *searchsvc.Service
}
// NewService creates a Service: the options are applied
// to an empty configuration in the given order.
func NewService(opts ...Option) *Service {
	c := new(cfg)

	for _, opt := range opts {
		opt(c)
	}

	return &Service{cfg: c}
}
// Search converts the v2 request to the parameters of the internal
// service, opens its search stream and returns it wrapped into
// a v2 search object streamer.
func (s *Service) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
	body, ttl := req.GetBody(), req.GetMetaHeader().GetTTL()

	prm, err := toPrm(body, ttl)
	if err != nil {
		return nil, errors.Wrapf(err, "(%T) could not convert search parameters", s)
	}

	stream, err := s.svc.Search(ctx, prm)
	if err != nil {
		return nil, errors.Wrapf(err, "(%T) could not open object search stream", s)
	}

	res := new(streamer)
	res.stream = stream

	return res, nil
}
// WithInternalService returns an option that sets the internal
// search service the v2 service delegates to.
func WithInternalService(v *searchsvc.Service) Option {
	setService := func(dst *cfg) { dst.svc = v }

	return setService
}

View file

@ -0,0 +1,26 @@
package searchsvc
import (
"io"
"github.com/nspcc-dev/neofs-api-go/v2/object"
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
"github.com/pkg/errors"
)
// streamer adapts the internal search stream to the v2 response format.
type streamer struct {
// stream is the internal search stream the responses are read from.
stream *searchsvc.Streamer
}
// Recv reads the next response from the internal stream and converts it
// to the v2 format. End of stream is reported as a bare io.EOF, any other
// failure is wrapped.
func (s *streamer) Recv() (*object.SearchResponse, error) {
	r, err := s.stream.Recv()

	switch {
	case err == nil:
		return fromResponse(r), nil
	case errors.Is(errors.Cause(err), io.EOF):
		// unwrap so that callers can compare with io.EOF directly
		return nil, io.EOF
	default:
		return nil, errors.Wrapf(err, "(%T) could not receive search response", s)
	}
}

View file

@ -0,0 +1,58 @@
package searchsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
"github.com/nspcc-dev/neofs-node/pkg/services/object/search/query"
queryV1 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query/v1"
"github.com/pkg/errors"
)
// toPrm converts the body of a v2 search request to the parameters of the
// internal search service.
//
// Only query version 1 with string-equal filters is supported; anything
// else results in an error. The search is marked local-only when ttl is 1.
func toPrm(req *object.SearchRequestBody, ttl uint32) (*searchsvc.Prm, error) {
	var q query.Query

	switch v := req.GetVersion(); v {
	case 1:
		fs := req.GetFilters()
		fsV1 := make([]*queryV1.Filter, 0, len(fs))

		for _, f := range fs {
			mt := f.GetMatchType()
			if mt != object.MatchStringEqual {
				return nil, errors.Errorf("unsupported match type %d in query version #%d", mt, v)
			}

			fsV1 = append(fsV1, queryV1.NewFilterEqual(f.GetName(), f.GetValue()))
		}

		q = queryV1.New(fsV1...)
	default:
		return nil, errors.Errorf("unsupported query version #%d", v)
	}

	cid := container.NewIDFromV2(req.GetContainerID())

	prm := new(searchsvc.Prm).
		WithContainerID(cid).
		WithSearchQuery(q).
		OnlyLocal(ttl == 1) // FIXME: use constant

	return prm, nil
}
// fromResponse converts a response of the internal search service to the
// v2 search response: every identifier is converted to its v2 form and the
// list is placed into the response body.
func fromResponse(r *searchsvc.Response) *object.SearchResponse {
	ids := r.IDList()

	idsV2 := make([]*refs.ObjectID, len(ids))
	for i, id := range ids {
		idsV2[i] = id.ToV2()
	}

	body := new(object.SearchResponseBody)
	body.SetIDList(idsV2)

	resp := new(object.SearchResponse)
	resp.SetBody(body)

	return resp
}