[#241] object/search: Refactor service processing

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-12-10 15:26:40 +03:00 committed by Alex Vanin
parent 3bfb18386b
commit 611a29f682
23 changed files with 1020 additions and 657 deletions

View file

@ -140,8 +140,8 @@ func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.
return s.get.Head(ctx, req)
}
func (s *objectSvc) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
return s.search.Search(ctx, req)
func (s *objectSvc) Search(req *object.SearchRequest, stream objectService.SearchStream) error {
return s.search.Search(req, stream)
}
func (s *objectSvc) Get(req *object.GetRequest, stream objectService.GetObjectStream) error {
@ -307,22 +307,23 @@ func initObjectService(c *cfg) {
putsvcV2.WithInternalService(sPut),
)
sSearch := searchsvc.NewService(
searchsvc.WithKeyStorage(keyStorage),
searchsvc.WithClientCache(clientCache),
searchsvc.WithLocalStorage(ls),
searchsvc.WithContainerSource(c.cfgObject.cnrStorage),
searchsvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
searchsvc.WithLocalAddressSource(c),
searchsvc.WithWorkerPool(c.cfgObject.pool.search),
sSearch := searchsvc.New(
searchsvc.WithLogger(c.log),
searchsvc.WithLocalStorageEngine(ls),
searchsvc.WithClientCache(clientCache),
searchsvc.WithClientOptions(
client.WithDialTimeout(c.viper.GetDuration(cfgObjectSearchDialTimeout)),
),
searchsvc.WithTraverserGenerator(
traverseGen.WithTraverseOptions(
placement.WithoutSuccessTracking(),
),
),
)
sSearchV2 := searchsvcV2.NewService(
searchsvcV2.WithInternalService(sSearch),
searchsvcV2.WithKeyStorage(keyStorage),
)
sHead := headsvc.NewService(
@ -364,7 +365,7 @@ func initObjectService(c *cfg) {
deletesvc.WithPutService(sPut),
deletesvc.WithOwnerID(nodeOwner),
deletesvc.WithLinkingHeader(
headsvc.NewRelationHeader(searchsvc.NewLinkingSearcher(sSearch), sHead),
headsvc.NewRelationHeader(nil, sHead),
),
deletesvc.WithLogger(c.log),
)

View file

@ -0,0 +1,28 @@
package object
import (
"github.com/nspcc-dev/neofs-api-go/v2/object"
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
)
type searchStreamerV2 struct {
objectGRPC.ObjectService_SearchServer
}
func (s *searchStreamerV2) Send(resp *object.SearchResponse) error {
return s.ObjectService_SearchServer.Send(
object.SearchResponseToGRPCMessage(resp),
)
}
// Search converts gRPC SearchRequest message and server-side stream and overtakes its data
// to gRPC stream.
func (s *Server) Search(req *objectGRPC.SearchRequest, gStream objectGRPC.ObjectService_SearchServer) error {
// TODO: think about how we transport errors through gRPC
return s.srv.Search(
object.SearchRequestFromGRPCMessage(req),
&searchStreamerV2{
ObjectService_SearchServer: gStream,
},
)
}

View file

@ -74,31 +74,6 @@ func (s *Server) Head(ctx context.Context, req *objectGRPC.HeadRequest) (*object
return object.HeadResponseToGRPCMessage(resp), nil
}
// Search converts gRPC SearchRequest message, opens internal Object service Search stream and overtakes its data
// to gRPC stream.
func (s *Server) Search(req *objectGRPC.SearchRequest, gStream objectGRPC.ObjectService_SearchServer) error {
stream, err := s.srv.Search(gStream.Context(), object.SearchRequestFromGRPCMessage(req))
if err != nil {
// TODO: think about how we transport errors through gRPC
return err
}
for {
r, err := stream.Recv()
if err != nil {
if errors.Is(errors.Cause(err), io.EOF) {
return nil
}
return err
}
if err := gStream.Send(object.SearchResponseToGRPCMessage(r)); err != nil {
return err
}
}
}
// GetRangeHash converts gRPC GetRangeHashRequest message and passes it to internal Object service.
func (s *Server) GetRangeHash(ctx context.Context, req *objectGRPC.GetRangeHashRequest) (*objectGRPC.GetRangeHashResponse, error) {
resp, err := s.srv.GetRangeHash(ctx, object.GetRangeHashRequestFromGRPCMessage(req))

View file

@ -54,11 +54,11 @@ type (
}
searchStreamBasicChecker struct {
object.SearchObjectStreamer
}
objectSvc.SearchStream
getRangeStreamBasicChecker struct {
object.GetRangeObjectStreamer
info requestInfo
*eACLCfg
}
requestInfo struct {
@ -209,15 +209,12 @@ func (b Service) Head(
return resp, err
}
func (b Service) Search(
ctx context.Context,
request *object.SearchRequest) (object.SearchObjectStreamer, error) {
func (b Service) Search(request *object.SearchRequest, stream objectSvc.SearchStream) error {
var cid *container.ID
cid, err := getContainerIDFromRequest(request)
if err != nil {
return nil, err
return err
}
req := metaWithToken{
@ -228,17 +225,20 @@ func (b Service) Search(
reqInfo, err := b.findRequestInfo(req, cid, acl.OperationSearch)
if err != nil {
return nil, err
return err
}
if !basicACLCheck(reqInfo) {
return nil, basicACLErr(reqInfo)
return basicACLErr(reqInfo)
} else if !eACLCheck(request, reqInfo, b.eACLCfg) {
return nil, eACLErr(reqInfo)
return eACLErr(reqInfo)
}
stream, err := b.next.Search(ctx, request)
return searchStreamBasicChecker{stream}, err
return b.next.Search(request, &searchStreamBasicChecker{
SearchStream: stream,
info: reqInfo,
eACLCfg: b.eACLCfg,
})
}
func (b Service) Delete(
@ -390,6 +390,14 @@ func (g *rangeStreamBasicChecker) Send(resp *object.GetRangeResponse) error {
return g.GetObjectRangeStream.Send(resp)
}
func (g *searchStreamBasicChecker) Send(resp *object.SearchResponse) error {
if !eACLCheck(resp, g.info, g.eACLCfg) {
return eACLErr(g.info)
}
return g.SearchStream.Send(resp)
}
func (b Service) findRequestInfo(
req metaWithToken,
cid *container.ID,

View file

@ -16,7 +16,9 @@ type ResponseService struct {
}
type searchStreamResponser struct {
stream *response.ServerMessageStreamer
util.ServerStream
respWriter util.ResponseMessageWriter
}
type getStreamResponser struct {
@ -101,35 +103,17 @@ func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*o
return resp.(*object.HeadResponse), nil
}
func (s *searchStreamResponser) Recv() (*object.SearchResponse, error) {
r, err := s.stream.Recv()
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not receive response", s)
}
return r.(*object.SearchResponse), nil
func (s *searchStreamResponser) Send(resp *object.SearchResponse) error {
return s.respWriter(resp)
}
func (s *ResponseService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
stream, err := s.respSvc.HandleServerStreamRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) {
stream, err := s.svc.Search(ctx, req.(*object.SearchRequest))
if err != nil {
return nil, err
}
return func() (util.ResponseMessage, error) {
return stream.Recv()
}, nil
},
)
if err != nil {
return nil, err
}
return &searchStreamResponser{
stream: stream,
}, nil
func (s *ResponseService) Search(req *object.SearchRequest, stream SearchStream) error {
return s.svc.Search(req, &searchStreamResponser{
ServerStream: stream,
respWriter: s.respSvc.HandleServerStreamRequest_(func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.SearchResponse))
}),
})
}
func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {

View file

@ -0,0 +1,50 @@
package searchsvc
import (
"context"
"go.uber.org/zap"
)
func (exec *execCtx) executeOnContainer() {
if exec.isLocal() {
exec.log.Debug("return result directly")
return
}
exec.log.Debug("trying to execute in container...")
traverser, ok := exec.generateTraverser(exec.containerID())
if !ok {
return
}
ctx, cancel := context.WithCancel(exec.context())
defer cancel()
loop:
for {
addrs := traverser.Next()
if len(addrs) == 0 {
exec.log.Debug("no more nodes, abort placement iteration")
break
}
for i := range addrs {
select {
case <-ctx.Done():
exec.log.Debug("interrupt placement iteration by context",
zap.String("error", ctx.Err().Error()),
)
break loop
default:
}
// TODO: consider parallel execution
exec.processNode(ctx, addrs[i])
}
}
exec.status = statusOK
exec.err = nil
}

View file

@ -0,0 +1,147 @@
package searchsvc
import (
"context"
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
)
type statusError struct {
status int
err error
}
type execCtx struct {
svc *Service
ctx context.Context
prm Prm
statusError
log *logger.Logger
}
const (
statusUndefined int = iota
statusOK
)
func (exec *execCtx) prepare() {
if _, ok := exec.prm.writer.(*uniqueIDWriter); !ok {
exec.prm.writer = newUniqueAddressWriter(exec.prm.writer)
}
fs := exec.prm.SearchFilters()
fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, exec.containerID())
exec.prm.WithSearchFilters(fs)
}
func (exec *execCtx) setLogger(l *logger.Logger) {
exec.log = l.With(
zap.String("request", "SEARCH"),
zap.Stringer("container", exec.containerID()),
zap.Bool("local", exec.isLocal()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)
}
func (exec execCtx) context() context.Context {
return exec.ctx
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec execCtx) key() *ecdsa.PrivateKey {
return exec.prm.key
}
func (exec execCtx) callOptions() []client.CallOption {
return exec.prm.callOpts
}
func (exec execCtx) remotePrm() *client.SearchObjectParams {
return &exec.prm.SearchObjectParams
}
func (exec *execCtx) containerID() *container.ID {
return exec.prm.ContainerID()
}
func (exec *execCtx) searchFilters() objectSDK.SearchFilters {
return exec.prm.SearchFilters()
}
func (exec *execCtx) generateTraverser(cid *container.ID) (*placement.Traverser, bool) {
t, err := exec.svc.traverserGenerator.generateTraverser(cid)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not generate container traverser",
zap.String("error", err.Error()),
)
return nil, false
case err == nil:
return t, true
}
}
func (exec execCtx) remoteClient(node *network.Address) (searchClient, bool) {
ipAddr, err := node.IPAddrString()
log := exec.log.With(zap.Stringer("node", node))
switch {
default:
exec.status = statusUndefined
exec.err = err
log.Debug("could not calculate node IP address")
case err == nil:
c, err := exec.svc.clientCache.get(exec.key(), ipAddr)
switch {
default:
exec.status = statusUndefined
exec.err = err
log.Debug("could not construct remote node client")
case err == nil:
return c, true
}
}
return nil, false
}
func (exec *execCtx) writeIDList(ids []*objectSDK.ID) {
err := exec.prm.writer.WriteIDs(ids)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not write object identifiers",
zap.String("error", err.Error()),
)
case err == nil:
exec.status = statusOK
exec.err = nil
}
}

View file

@ -1,41 +1,22 @@
package searchsvc
import (
"context"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/services/object/search/query"
"github.com/pkg/errors"
"go.uber.org/zap"
)
type localStream struct {
query query.Query
func (exec *execCtx) executeLocal() {
ids, err := exec.svc.localStorage.search(exec)
storage *engine.StorageEngine
cid *container.ID
}
func (s *localStream) stream(ctx context.Context, ch chan<- []*objectSDK.ID) error {
fs := s.query.ToSearchFilters()
addrList, err := engine.Select(s.storage, s.cid, fs)
if err != nil {
return errors.Wrapf(err, "(%T) could not select objects from local storage", s)
exec.status = statusUndefined
exec.err = err
exec.log.Debug("local operation failed",
zap.String("error", err.Error()),
)
return
}
idList := make([]*objectSDK.ID, 0, len(addrList))
for i := range addrList {
idList = append(idList, addrList[i].ObjectID())
}
select {
case <-ctx.Done():
return ctx.Err()
case ch <- idList:
return nil
}
exec.writeIDList(ids)
}

View file

@ -1,39 +1,49 @@
package searchsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-node/pkg/services/object/search/query"
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
)
// Prm groups parameters of Get service call.
type Prm struct {
writer IDListWriter
// TODO: replace key and callOpts to CommonPrm
key *ecdsa.PrivateKey
callOpts []client.CallOption
common *util.CommonPrm
cid *container.ID
query query.Query
client.SearchObjectParams
}
func (p *Prm) WithContainerID(v *container.ID) *Prm {
if p != nil {
p.cid = v
}
return p
// IDListWriter is an interface of target component
// to write list of object identifiers.
type IDListWriter interface {
WriteIDs([]*objectSDK.ID) error
}
func (p *Prm) WithSearchQuery(v query.Query) *Prm {
if p != nil {
p.query = v
}
return p
// SetPrivateKey sets private key to use during execution.
func (p *Prm) SetPrivateKey(key *ecdsa.PrivateKey) {
p.key = key
}
func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm {
if p != nil {
p.common = v
}
return p
// SetRemoteCallOptions sets call options remote remote client calls.
func (p *Prm) SetRemoteCallOptions(opts ...client.CallOption) {
p.callOpts = opts
}
// SetCommonParameters sets common parameters of the operation.
func (p *Prm) SetCommonParameters(common *util.CommonPrm) {
p.common = common
}
// SetWriter sets target component to write list of object identifiers.
func (p *Prm) SetWriter(w IDListWriter) {
p.writer = w
}

View file

@ -1,80 +0,0 @@
package searchsvc
import (
"context"
"io"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object/search/query"
queryV1 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query/v1"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/pkg/errors"
)
type RelationSearcher struct {
svc *Service
queryGenerator func(*object.Address) query.Query
}
var ErrRelationNotFound = errors.New("relation not found")
func (s *RelationSearcher) SearchRelation(ctx context.Context, addr *object.Address, prm *util.CommonPrm) (*object.ID, error) {
streamer, err := s.svc.Search(ctx, new(Prm).
WithContainerID(addr.ContainerID()).WithCommonPrm(prm).
WithSearchQuery(s.queryGenerator(addr)),
)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not create search streamer", s)
}
res, err := readFullStream(streamer, 1)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not read full search stream", s)
} else if ln := len(res); ln != 1 {
if ln == 0 {
return nil, ErrRelationNotFound
}
return nil, errors.Errorf("(%T) unexpected amount of found objects %d", s, ln)
}
return res[0], nil
}
func readFullStream(s *Streamer, cap int) ([]*object.ID, error) {
res := make([]*object.ID, 0, cap)
for {
r, err := s.Recv()
if err != nil {
if errors.Is(errors.Cause(err), io.EOF) {
break
}
return nil, errors.Wrapf(err, "(%s) could not receive search result", "readFullStream")
}
res = append(res, r.IDList()...)
}
return res, nil
}
func NewRightChildSearcher(svc *Service) *RelationSearcher {
return &RelationSearcher{
svc: svc,
queryGenerator: func(addr *object.Address) query.Query {
return queryV1.NewRightChildQuery(addr.ObjectID())
},
}
}
func NewLinkingSearcher(svc *Service) *RelationSearcher {
return &RelationSearcher{
svc: svc,
queryGenerator: func(addr *object.Address) query.Query {
return queryV1.NewLinkingQuery(addr.ObjectID())
},
}
}

View file

@ -3,55 +3,29 @@ package searchsvc
import (
"context"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/pkg/errors"
"go.uber.org/zap"
)
type remoteStream struct {
prm *Prm
func (exec *execCtx) processNode(ctx context.Context, addr *network.Address) {
log := exec.log.With(zap.Stringer("remote node", addr))
keyStorage *util.KeyStorage
log.Debug("processing node...")
addr *network.Address
client, ok := exec.remoteClient(addr)
if !ok {
return
}
clientCache *cache.ClientCache
ids, err := client.searchObjects(exec)
clientOpts []client.Option
}
func (s *remoteStream) stream(ctx context.Context, ch chan<- []*object.ID) error {
key, err := s.keyStorage.GetKey(s.prm.common.SessionToken())
if err != nil {
return errors.Wrapf(err, "(%T) could not receive private key", s)
}
addr, err := s.addr.IPAddrString()
if err != nil {
return err
}
c, err := s.clientCache.Get(key, addr, s.clientOpts...)
if err != nil {
return errors.Wrapf(err, "(%T) could not create SDK client %s", s, addr)
}
// TODO: add writer parameter to SDK client
id, err := c.SearchObject(ctx, new(client.SearchObjectParams).
WithContainerID(s.prm.cid).
WithSearchFilters(s.prm.query.ToSearchFilters()),
client.WithTTL(1), // FIXME: use constant
client.WithSession(s.prm.common.SessionToken()),
client.WithBearer(s.prm.common.BearerToken()),
)
if err != nil {
return errors.Wrapf(err, "(%T) could not search objects in %s", s, addr)
}
ch <- id
return nil
if err != nil {
exec.log.Debug("local operation failed",
zap.String("error", err.Error()),
)
return
}
exec.writeIDList(ids)
}

View file

@ -1,13 +0,0 @@
package searchsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
)
type Response struct {
idList []*object.ID
}
func (r *Response) IDList() []*object.ID {
return r.idList
}

View file

@ -0,0 +1,50 @@
package searchsvc
import (
"context"
"go.uber.org/zap"
)
// Search serves a request to select the objects.
func (s *Service) Search(ctx context.Context, prm Prm) error {
exec := &execCtx{
svc: s,
ctx: ctx,
prm: prm,
}
exec.prepare()
exec.setLogger(s.log)
exec.execute()
return exec.statusError.err
}
func (exec *execCtx) execute() {
exec.log.Debug("serving request...")
// perform local operation
exec.executeLocal()
exec.analyzeStatus(true)
}
func (exec *execCtx) analyzeStatus(execCnr bool) {
// analyze local result
switch exec.status {
default:
exec.log.Debug("operation finished with error",
zap.String("error", exec.err.Error()),
)
case statusOK:
exec.log.Debug("operation finished successfully")
}
if execCnr {
exec.executeOnContainer()
exec.analyzeStatus(false)
}
}

View file

@ -0,0 +1,336 @@
package searchsvc
import (
"context"
"crypto/ecdsa"
"crypto/rand"
"crypto/sha256"
"fmt"
"strconv"
"testing"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/util/logger/test"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)
type idsErr struct {
ids []*objectSDK.ID
err error
}
type testStorage struct {
items map[string]idsErr
}
type testTraverserGenerator struct {
c *container.Container
b placement.Builder
}
type testPlacementBuilder struct {
vectors map[string][]netmap.Nodes
}
type testClientCache struct {
clients map[string]*testStorage
}
type simpleIDWriter struct {
ids []*objectSDK.ID
}
func (s *simpleIDWriter) WriteIDs(ids []*objectSDK.ID) error {
s.ids = append(s.ids, ids...)
return nil
}
func newTestStorage() *testStorage {
return &testStorage{
items: make(map[string]idsErr),
}
}
func (g *testTraverserGenerator) generateTraverser(_ *container.ID) (*placement.Traverser, error) {
return placement.NewTraverser(
placement.ForContainer(g.c),
placement.UseBuilder(g.b),
placement.WithoutSuccessTracking(),
)
}
func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap.PlacementPolicy) ([]netmap.Nodes, error) {
vs, ok := p.vectors[addr.String()]
if !ok {
return nil, errors.New("vectors for address not found")
}
return vs, nil
}
func (c *testClientCache) get(_ *ecdsa.PrivateKey, addr string) (searchClient, error) {
v, ok := c.clients[addr]
if !ok {
return nil, errors.New("could not construct client")
}
return v, nil
}
func (s *testStorage) search(exec *execCtx) ([]*objectSDK.ID, error) {
v, ok := s.items[exec.containerID().String()]
if !ok {
return nil, nil
}
return v.ids, v.err
}
func (c *testStorage) searchObjects(exec *execCtx) ([]*objectSDK.ID, error) {
v, ok := c.items[exec.containerID().String()]
if !ok {
return nil, nil
}
return v.ids, v.err
}
func (c *testStorage) addResult(addr *container.ID, ids []*objectSDK.ID, err error) {
c.items[addr.String()] = idsErr{
ids: ids,
err: err,
}
}
func testSHA256() (cs [sha256.Size]byte) {
rand.Read(cs[:])
return cs
}
func generateCID() *container.ID {
cid := container.NewID()
cid.SetSHA256(testSHA256())
return cid
}
func generateIDs(num int) []*objectSDK.ID {
res := make([]*objectSDK.ID, num)
for i := 0; i < num; i++ {
res[i] = objectSDK.NewID()
res[i].SetSHA256(testSHA256())
}
return res
}
func TestGetLocalOnly(t *testing.T) {
ctx := context.Background()
newSvc := func(storage *testStorage) *Service {
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false)
svc.localStorage = storage
return svc
}
newPrm := func(cid *container.ID, w IDListWriter) Prm {
p := Prm{}
p.WithContainerID(cid)
p.SetWriter(w)
p.common = new(util.CommonPrm).WithLocalOnly(true)
return p
}
t.Run("OK", func(t *testing.T) {
storage := newTestStorage()
svc := newSvc(storage)
cid := generateCID()
ids := generateIDs(10)
storage.addResult(cid, ids, nil)
w := new(simpleIDWriter)
p := newPrm(cid, w)
err := svc.Search(ctx, p)
require.NoError(t, err)
require.Equal(t, ids, w.ids)
})
t.Run("FAIL", func(t *testing.T) {
storage := newTestStorage()
svc := newSvc(storage)
cid := generateCID()
testErr := errors.New("any error")
storage.addResult(cid, nil, testErr)
w := new(simpleIDWriter)
p := newPrm(cid, w)
err := svc.Search(ctx, p)
require.True(t, errors.Is(err, testErr))
})
}
func testNodeMatrix(t testing.TB, dim []int) ([]netmap.Nodes, [][]string) {
mNodes := make([]netmap.Nodes, len(dim))
mAddr := make([][]string, len(dim))
for i := range dim {
ns := make([]netmap.NodeInfo, dim[i])
as := make([]string, dim[i])
for j := 0; j < dim[i]; j++ {
a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s",
strconv.Itoa(i),
strconv.Itoa(60000+j),
)
var err error
na, err := network.AddressFromString(a)
require.NoError(t, err)
as[j], err = na.IPAddrString()
require.NoError(t, err)
ni := netmap.NewNodeInfo()
ni.SetAddress(a)
ns[j] = *ni
}
mNodes[i] = netmap.NodesFromInfo(ns)
mAddr[i] = as
}
return mNodes, mAddr
}
//
// func generateChain(ln int, cid *container.ID) ([]*object.RawObject, []*objectSDK.ID, []byte) {
// curID := generateID()
// var prevID *objectSDK.ID
//
// addr := objectSDK.NewAddress()
// addr.SetContainerID(cid)
//
// res := make([]*object.RawObject, 0, ln)
// ids := make([]*objectSDK.ID, 0, ln)
// payload := make([]byte, 0, ln*10)
//
// for i := 0; i < ln; i++ {
// ids = append(ids, curID)
// addr.SetObjectID(curID)
//
// payloadPart := make([]byte, 10)
// rand.Read(payloadPart)
//
// o := generateObject(addr, prevID, []byte{byte(i)})
// o.SetPayload(payloadPart)
// o.SetPayloadSize(uint64(len(payloadPart)))
// o.SetID(curID)
//
// payload = append(payload, payloadPart...)
//
// res = append(res, o)
//
// prevID = curID
// curID = generateID()
// }
//
// return res, ids, payload
// }
func TestGetRemoteSmall(t *testing.T) {
ctx := context.Background()
placementDim := []int{2}
rs := make([]*netmap.Replica, 0, len(placementDim))
for i := range placementDim {
r := netmap.NewReplica()
r.SetCount(uint32(placementDim[i]))
rs = append(rs, r)
}
pp := netmap.NewPlacementPolicy()
pp.SetReplicas(rs...)
cnr := container.New(container.WithPolicy(pp))
cid := container.CalculateID(cnr)
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
svc.traverserGenerator = &testTraverserGenerator{
c: cnr,
b: b,
}
svc.clientCache = c
return svc
}
newPrm := func(cid *container.ID, w IDListWriter) Prm {
p := Prm{}
p.WithContainerID(cid)
p.SetWriter(w)
p.common = new(util.CommonPrm).WithLocalOnly(false)
return p
}
t.Run("OK", func(t *testing.T) {
addr := objectSDK.NewAddress()
addr.SetContainerID(cid)
ns, as := testNodeMatrix(t, placementDim)
builder := &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
addr.String(): ns,
},
}
c1 := newTestStorage()
ids1 := generateIDs(10)
c1.addResult(cid, ids1, nil)
c2 := newTestStorage()
ids2 := generateIDs(10)
c2.addResult(cid, ids2, nil)
svc := newSvc(builder, &testClientCache{
clients: map[string]*testStorage{
as[0][0]: c1,
as[0][1]: c2,
},
})
w := new(simpleIDWriter)
p := newPrm(cid, w)
err := svc.Search(ctx, p)
require.NoError(t, err)
require.Len(t, w.ids, len(ids1)+len(ids2))
for _, id := range append(ids1, ids2...) {
require.Contains(t, w.ids, id)
}
})
}

View file

@ -1,56 +1,58 @@
package searchsvc
import (
"context"
"sync"
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Service is an utility serving requests
// of Object.Search service.
type Service struct {
*cfg
}
// Option is a Service's constructor option.
type Option func(*cfg)
type searchClient interface {
searchObjects(*execCtx) ([]*object.ID, error)
}
type cfg struct {
keyStorage *objutil.KeyStorage
localStore *engine.StorageEngine
cnrSrc container.Source
netMapSrc netmap.Source
workerPool util.WorkerPool
localAddrSrc network.LocalAddressSource
clientCache *cache.ClientCache
log *logger.Logger
clientOpts []client.Option
localStorage interface {
search(*execCtx) ([]*object.ID, error)
}
clientCache interface {
get(*ecdsa.PrivateKey, string) (searchClient, error)
}
traverserGenerator interface {
generateTraverser(*container.ID) (*placement.Traverser, error)
}
}
func defaultCfg() *cfg {
return &cfg{
workerPool: new(util.SyncWorkerPool),
log: zap.L(),
log: zap.L(),
clientCache: new(clientCacheWrapper),
}
}
func NewService(opts ...Option) *Service {
// New creates, initializes and returns utility serving
// Object.Get service requests.
func New(opts ...Option) *Service {
c := defaultCfg()
for i := range opts {
@ -62,66 +64,39 @@ func NewService(opts ...Option) *Service {
}
}
func (p *Service) Search(ctx context.Context, prm *Prm) (*Streamer, error) {
return &Streamer{
cfg: p.cfg,
once: new(sync.Once),
prm: prm,
ctx: ctx,
cache: make([][]*object.ID, 0, 10),
}, nil
}
func WithKeyStorage(v *objutil.KeyStorage) Option {
return func(c *cfg) {
c.keyStorage = v
}
}
func WithLocalStorage(v *engine.StorageEngine) Option {
return func(c *cfg) {
c.localStore = v
}
}
func WithContainerSource(v container.Source) Option {
return func(c *cfg) {
c.cnrSrc = v
}
}
func WithNetworkMapSource(v netmap.Source) Option {
return func(c *cfg) {
c.netMapSrc = v
}
}
func WithWorkerPool(v util.WorkerPool) Option {
return func(c *cfg) {
c.workerPool = v
}
}
func WithLocalAddressSource(v network.LocalAddressSource) Option {
return func(c *cfg) {
c.localAddrSrc = v
}
}
func WithClientCache(v *cache.ClientCache) Option {
return func(c *cfg) {
c.clientCache = v
}
}
// WithLogger returns option to specify Get service's logger.
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
c.log = l.With(zap.String("component", "Object.Get service"))
}
}
func WithClientOptions(opts ...client.Option) Option {
// WithLocalStorageEngine returns option to set local storage
// instance.
func WithLocalStorageEngine(e *engine.StorageEngine) Option {
return func(c *cfg) {
c.clientOpts = opts
c.localStorage = (*storageEngineWrapper)(e)
}
}
// WithClientCache returns option to set cache of remote node clients.
func WithClientCache(v *cache.ClientCache) Option {
return func(c *cfg) {
c.clientCache.(*clientCacheWrapper).cache = v
}
}
// WithClientOptions returns option to specify options of remote node clients.
func WithClientOptions(opts ...client.Option) Option {
return func(c *cfg) {
c.clientCache.(*clientCacheWrapper).opts = opts
}
}
// WithTraverserGenerator returns option to set generator of
// placement traverser to get the objects from containers.
func WithTraverserGenerator(t *util.TraverserGenerator) Option {
return func(c *cfg) {
c.traverserGenerator = (*traverseGeneratorWrapper)(t)
}
}

View file

@ -1,186 +0,0 @@
package searchsvc
import (
"context"
"io"
"sync"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network"
svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/pkg/errors"
)
type Streamer struct {
*cfg
once *sync.Once
prm *Prm
traverser *placement.Traverser
ctx context.Context
ch chan []*object.ID
cache [][]*object.ID
}
func (p *Streamer) Recv() (*Response, error) {
var err error
p.once.Do(func() {
if err = p.preparePrm(p.prm); err == nil {
go p.start(p.prm)
}
})
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not start streaming", p)
}
select {
case <-p.ctx.Done():
return nil, errors.Wrapf(p.ctx.Err(), "(%T) context is done", p)
case v, ok := <-p.ch:
if !ok {
return nil, io.EOF
}
v = p.cutCached(v)
return &Response{
idList: v,
}, nil
}
}
func (p *Streamer) cutCached(ids []*object.ID) []*object.ID {
loop:
for i := 0; i < len(ids); i++ {
for j := range p.cache {
for k := range p.cache[j] {
if ids[i].Equal(p.cache[j][k]) {
ids = append(ids[:i], ids[i+1:]...)
i--
continue loop
}
}
}
}
if len(ids) > 0 {
p.cache = append(p.cache, ids)
}
return ids
}
func (p *Streamer) preparePrm(prm *Prm) error {
var err error
// get latest network map
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
if err != nil {
return errors.Wrapf(err, "(%T) could not get latest network map", p)
}
// get container to store the object
cnr, err := p.cnrSrc.Get(prm.cid)
if err != nil {
return errors.Wrapf(err, "(%T) could not get container by ID", p)
}
// allocate placement traverser options
traverseOpts := make([]placement.Option, 0, 4)
// add common options
traverseOpts = append(traverseOpts,
// set processing container
placement.ForContainer(cnr),
)
// create placement builder from network map
builder := placement.NewNetworkMapBuilder(nm)
if prm.common.LocalOnly() {
// restrict success count to 1 stored copy (to local storage)
traverseOpts = append(traverseOpts, placement.SuccessAfter(1))
// use local-only placement builder
builder = svcutil.NewLocalPlacement(builder, p.localAddrSrc)
}
// set placement builder
traverseOpts = append(traverseOpts, placement.UseBuilder(builder))
// build placement traverser
if p.traverser, err = placement.NewTraverser(traverseOpts...); err != nil {
return errors.Wrapf(err, "(%T) could not build placement traverser", p)
}
p.ch = make(chan []*object.ID)
return nil
}
func (p *Streamer) start(prm *Prm) {
defer close(p.ch)
loop:
for {
addrs := p.traverser.Next()
if len(addrs) == 0 {
break
}
wg := new(sync.WaitGroup)
for i := range addrs {
wg.Add(1)
addr := addrs[i]
if err := p.workerPool.Submit(func() {
defer wg.Done()
var streamer interface {
stream(context.Context, chan<- []*object.ID) error
}
if network.IsLocalAddress(p.localAddrSrc, addr) {
streamer = &localStream{
query: prm.query,
storage: p.localStore,
cid: prm.cid,
}
} else {
streamer = &remoteStream{
prm: prm,
keyStorage: p.keyStorage,
addr: addr,
clientCache: p.clientCache,
clientOpts: p.clientOpts,
}
}
if err := streamer.stream(p.ctx, p.ch); err != nil {
svcutil.LogServiceError(p.log, "SEARCH", addr, err)
}
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(p.log, "SEARCH", err)
break loop
}
}
wg.Wait()
}
}

View file

@ -0,0 +1,109 @@
package searchsvc
import (
"crypto/ecdsa"
"sync"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
)
type uniqueIDWriter struct {
mtx sync.Mutex
written map[string]struct{}
writer IDListWriter
}
type clientCacheWrapper struct {
cache *cache.ClientCache
opts []client.Option
}
type clientWrapper struct {
client *client.Client
}
type storageEngineWrapper engine.StorageEngine
type traverseGeneratorWrapper util.TraverserGenerator
func newUniqueAddressWriter(w IDListWriter) IDListWriter {
return &uniqueIDWriter{
written: make(map[string]struct{}),
writer: w,
}
}
func (w *uniqueIDWriter) WriteIDs(list []*objectSDK.ID) error {
w.mtx.Lock()
for i := 0; i < len(list); i++ { // don't use range, slice mutates in body
s := list[i].String()
// standard stringer is quite costly, it is better
// to facilitate the calculation of the key
if _, ok := w.written[s]; !ok {
// mark address as processed
w.written[s] = struct{}{}
continue
}
// exclude processed address
list = append(list[:i], list[i+1:]...)
i--
}
w.mtx.Unlock()
return w.writer.WriteIDs(list)
}
func (c *clientCacheWrapper) get(key *ecdsa.PrivateKey, addr string) (searchClient, error) {
clt, err := c.cache.Get(key, addr, c.opts...)
return &clientWrapper{
client: clt,
}, err
}
func (c *clientWrapper) searchObjects(exec *execCtx) ([]*objectSDK.ID, error) {
return c.client.SearchObject(exec.context(),
exec.remotePrm(),
exec.callOptions()...)
}
func (e *storageEngineWrapper) search(exec *execCtx) ([]*objectSDK.ID, error) {
r, err := (*engine.StorageEngine)(e).Select(new(engine.SelectPrm).
WithFilters(exec.searchFilters()),
)
if err != nil {
return nil, err
}
return idsFromAddresses(r.AddressList()), nil
}
func idsFromAddresses(addrs []*objectSDK.Address) []*objectSDK.ID {
ids := make([]*objectSDK.ID, len(addrs))
for i := range addrs {
ids[i] = addrs[i].ObjectID()
}
return ids
}
func (e *traverseGeneratorWrapper) generateTraverser(cid *container.ID) (*placement.Traverser, error) {
a := objectSDK.NewAddress()
a.SetContainerID(cid)
return (*util.TraverserGenerator)(e).GenerateTraverser(a)
}

View file

@ -1,11 +1,10 @@
package searchsvc
import (
"context"
"github.com/nspcc-dev/neofs-api-go/v2/object"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
"github.com/pkg/errors"
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
)
// Service implements Search operation of Object service v2.
@ -18,6 +17,8 @@ type Option func(*cfg)
type cfg struct {
svc *searchsvc.Service
keyStorage *objutil.KeyStorage
}
// NewService constructs Service instance from provided options.
@ -33,25 +34,27 @@ func NewService(opts ...Option) *Service {
}
}
// Search calls internal service and returns v2 search object streamer.
func (s *Service) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
prm, err := toPrm(req.GetBody(), req)
// Get calls internal service and returns v2 object stream.
func (s *Service) Search(req *objectV2.SearchRequest, stream objectSvc.SearchStream) error {
p, err := s.toPrm(req, stream)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not convert search parameters", s)
return err
}
stream, err := s.svc.Search(ctx, prm)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not open object search stream", s)
}
return &streamer{
stream: stream,
}, nil
return s.svc.Search(stream.Context(), *p)
}
// WithInternalService returns option to set entity
// that handles request payload.
func WithInternalService(v *searchsvc.Service) Option {
return func(c *cfg) {
c.svc = v
}
}
// WithKeyStorage returns option to set local private key storage.
func WithKeyStorage(ks *objutil.KeyStorage) Option {
return func(c *cfg) {
c.keyStorage = ks
}
}

View file

@ -1,26 +1,29 @@
package searchsvc
import (
"io"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/v2/object"
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
"github.com/pkg/errors"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
)
type streamer struct {
stream *searchsvc.Streamer
type streamWriter struct {
stream objectSvc.SearchStream
}
func (s *streamer) Recv() (*object.SearchResponse, error) {
r, err := s.stream.Recv()
if err != nil {
if errors.Is(errors.Cause(err), io.EOF) {
return nil, io.EOF
}
func (s *streamWriter) WriteIDs(ids []*objectSDK.ID) error {
r := new(object.SearchResponse)
return nil, errors.Wrapf(err, "(%T) could not receive search response", s)
body := new(object.SearchResponseBody)
r.SetBody(body)
idsV2 := make([]*refs.ObjectID, len(ids))
for i := range ids {
idsV2[i] = ids[i].ToV2()
}
return fromResponse(r), nil
body.SetIDList(idsV2)
return s.stream.Send(r)
}

View file

@ -1,50 +1,61 @@
package searchsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-api-go/v2/session"
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
"github.com/nspcc-dev/neofs-node/pkg/services/object/search/query"
queryV1 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/query/v1"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/pkg/errors"
)
func toPrm(body *object.SearchRequestBody, req *object.SearchRequest) (*searchsvc.Prm, error) {
var q query.Query
func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStream) (*searchsvc.Prm, error) {
meta := req.GetMetaHeader()
switch v := body.GetVersion(); v {
default:
return nil, errors.Errorf("unsupported query version #%d", v)
case 1:
q = queryV1.New(
objectSDK.NewSearchFiltersFromV2(body.GetFilters()),
)
key, err := s.keyStorage.GetKey(token.NewSessionTokenFromV2(meta.GetSessionToken()))
if err != nil {
return nil, err
}
return new(searchsvc.Prm).
WithContainerID(
container.NewIDFromV2(body.GetContainerID()),
).
WithSearchQuery(q).
WithCommonPrm(util.CommonPrmFromV2(req)), nil
p := new(searchsvc.Prm)
p.SetPrivateKey(key)
p.SetCommonParameters(commonParameters(meta))
p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...)
p.SetWriter(&streamWriter{
stream: stream,
})
body := req.GetBody()
p.WithContainerID(container.NewIDFromV2(body.GetContainerID()))
p.WithSearchFilters(objectSDK.NewSearchFiltersFromV2(body.GetFilters()))
return p, nil
}
func fromResponse(r *searchsvc.Response) *object.SearchResponse {
ids := r.IDList()
idsV2 := make([]*refs.ObjectID, 0, len(ids))
// can be shared accross all services
func remoteCallOptionsFromMeta(meta *session.RequestMetaHeader) []client.CallOption {
xHdrs := meta.GetXHeaders()
for i := range ids {
idsV2 = append(idsV2, ids[i].ToV2())
opts := make([]client.CallOption, 0, 3+len(xHdrs))
opts = append(opts,
client.WithBearer(token.NewBearerTokenFromV2(meta.GetBearerToken())),
client.WithSession(token.NewSessionTokenFromV2(meta.GetSessionToken())),
client.WithTTL(meta.GetTTL()-1),
)
for i := range xHdrs {
opts = append(opts, client.WithXHeader(pkg.NewXHeaderFromV2(xHdrs[i])))
}
body := new(object.SearchResponseBody)
body.SetIDList(idsV2)
resp := new(object.SearchResponse)
resp.SetBody(body)
return resp
return opts
}
func commonParameters(meta *session.RequestMetaHeader) *util.CommonPrm {
return new(util.CommonPrm).
WithLocalOnly(meta.GetTTL() <= 1)
}

View file

@ -19,13 +19,19 @@ type GetObjectRangeStream interface {
Send(*object.GetRangeResponse) error
}
// SearchStream is an interface of NeoFS API v2 compatible search streamer.
type SearchStream interface {
util.ServerStream
Send(*object.SearchResponse) error
}
// ServiceServer is an interface of utility
// serving v2 Object service.
type ServiceServer interface {
Get(*object.GetRequest, GetObjectStream) error
Put(context.Context) (object.PutObjectStreamer, error)
Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error)
Search(context.Context, *object.SearchRequest) (object.SearchObjectStreamer, error)
Search(*object.SearchRequest, SearchStream) error
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)
GetRange(*object.GetRangeRequest, GetObjectRangeStream) error
GetRangeHash(context.Context, *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error)

View file

@ -18,7 +18,9 @@ type SignService struct {
}
type searchStreamSigner struct {
stream *util.ResponseMessageStreamer
util.ServerStream
respWriter util.ResponseMessageWriter
}
type getStreamSigner struct {
@ -109,35 +111,24 @@ func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*objec
return resp.(*object.HeadResponse), nil
}
func (s *searchStreamSigner) Recv() (*object.SearchResponse, error) {
r, err := s.stream.Recv()
if err != nil {
return nil, errors.Wrap(err, "could not receive response")
}
return r.(*object.SearchResponse), nil
func (s *searchStreamSigner) Send(resp *object.SearchResponse) error {
return s.respWriter(resp)
}
func (s *SignService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) {
stream, err := s.svc.Search(ctx, req.(*object.SearchRequest))
if err != nil {
return nil, err
}
return func() (util.ResponseMessage, error) {
return stream.Recv()
}, nil
func (s *SignService) Search(req *object.SearchRequest, stream SearchStream) error {
respWriter, err := s.sigSvc.HandleServerStreamRequest_(req,
func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.SearchResponse))
},
)
if err != nil {
return nil, err
return err
}
return &searchStreamSigner{
stream: stream,
}, nil
return s.svc.Search(req, &searchStreamSigner{
ServerStream: stream,
respWriter: respWriter,
})
}
func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {

View file

@ -5,7 +5,6 @@ import (
"context"
"github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
"github.com/nspcc-dev/neofs-node/pkg/services/util"
"github.com/pkg/errors"
)
@ -30,10 +29,11 @@ type (
chunkSize int
}
searchStreamBasicChecker struct {
next object.SearchObjectStreamer
resp *object.SearchResponse
list []*refs.ObjectID
searchStreamMsgSizeCtrl struct {
util.ServerStream
stream SearchStream
addrAmount uint64
}
@ -100,13 +100,12 @@ func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest
return c.next.Head(ctx, request)
}
func (c TransportSplitter) Search(ctx context.Context, request *object.SearchRequest) (object.SearchObjectStreamer, error) {
stream, err := c.next.Search(ctx, request)
return &searchStreamBasicChecker{
next: stream,
addrAmount: c.addrAmount,
}, err
func (c TransportSplitter) Search(req *object.SearchRequest, stream SearchStream) error {
return c.next.Search(req, &searchStreamMsgSizeCtrl{
ServerStream: stream,
stream: stream,
addrAmount: c.addrAmount,
})
}
func (c TransportSplitter) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) {
@ -154,34 +153,35 @@ func (c TransportSplitter) GetRangeHash(ctx context.Context, request *object.Get
return c.next.GetRangeHash(ctx, request)
}
func (s *searchStreamBasicChecker) Recv() (*object.SearchResponse, error) {
if s.resp == nil {
resp, err := s.next.Recv()
if err != nil {
return resp, err
func (s *searchStreamMsgSizeCtrl) Send(resp *object.SearchResponse) error {
body := resp.GetBody()
ids := body.GetIDList()
var newResp *object.SearchResponse
for ln := uint64(len(ids)); len(ids) > 0; {
if newResp == nil {
newResp = new(object.SearchResponse)
newResp.SetBody(body)
}
s.resp = resp
s.list = s.resp.GetBody().GetIDList()
cut := s.addrAmount
if cut > ln {
cut = ln
}
body.SetIDList(ids[:cut])
newResp.SetMetaHeader(resp.GetMetaHeader())
newResp.SetVerificationHeader(resp.GetVerificationHeader())
if err := s.stream.Send(newResp); err != nil {
return err
}
ids = ids[cut:]
}
chunk := s.list[:min(int(s.addrAmount), len(s.list))]
s.list = s.list[len(chunk):]
body := new(object.SearchResponseBody)
body.SetIDList(chunk)
resp := new(object.SearchResponse)
resp.SetVerificationHeader(s.resp.GetVerificationHeader())
resp.SetMetaHeader(s.resp.GetMetaHeader())
resp.SetBody(body)
if len(s.list) == 0 {
s.list = nil
s.resp = nil
}
return resp, nil
return nil
}
func min(a, b int) int {