From d2009c8731def5d34c530484e7950d95791396f2 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Wed, 30 Sep 2020 11:39:45 +0300 Subject: [PATCH] [#59] Add grpc payload splitter in object service chain GRPC has default message limit of 4MiB. Since every transmitted neofs message has to be signed, then original message should be split into transfer fit structures before signature service. This commit introduce transport payload splitter for object service pipeline. This splitter works with stream response for methods: - object.Get - object.Range - object.Search Signed-off-by: Alex Vanin --- cmd/neofs-node/config.go | 9 + cmd/neofs-node/object.go | 22 ++- pkg/services/object/transport_splitter.go | 198 ++++++++++++++++++++++ 3 files changed, 220 insertions(+), 9 deletions(-) create mode 100644 pkg/services/object/transport_splitter.go diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 8216b7490..b021f2b4d 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -16,6 +16,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/morph/client" nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object" tokenStorage "github.com/nspcc-dev/neofs-node/pkg/services/session/storage" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/spf13/viper" @@ -91,6 +92,10 @@ type cfgGRPC struct { listener net.Listener server *grpc.Server + + maxChunkSize uint64 + + maxAddrAmount uint64 } type cfgMorph struct { @@ -186,6 +191,10 @@ func initCfg(path string) *cfg { cfgObject: cfgObject{ maxObjectSize: viperCfg.GetUint64(cfgMaxObjectSize), }, + cfgGRPC: cfgGRPC{ + maxChunkSize: object.GRPCPayloadChunkSize, + maxAddrAmount: object.GRPCSearchAddrAmount, + }, localAddr: netAddr, } } diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 4979bf308..d61b53cc0 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -253,15 +253,19 @@ func initObjectService(c *cfg) { objectTransportGRPC.New( objectService.NewSignService( c.key, - &objectSvc{ - put: sPutV2, - search: sSearchV2, - head: sHeadV2, - rng: sRangeV2, - get: sGetV2, - rngHash: sRangeHashV2, - delete: sDeleteV2, - }, + objectService.NewTransportSplitter( + c.cfgGRPC.maxChunkSize, + c.cfgGRPC.maxAddrAmount, + &objectSvc{ + put: sPutV2, + search: sSearchV2, + head: sHeadV2, + rng: sRangeV2, + get: sGetV2, + rngHash: sRangeHashV2, + delete: sDeleteV2, + }, + ), ), ), ) diff --git a/pkg/services/object/transport_splitter.go b/pkg/services/object/transport_splitter.go new file mode 100644 index 000000000..d0c3533ed --- /dev/null +++ b/pkg/services/object/transport_splitter.go @@ -0,0 +1,198 @@ +package object + +import ( + "bytes" + "context" + + "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/refs" + "github.com/pkg/errors" +) + +const ( + GRPCPayloadChunkSize = 1024 * 1024 * 3 // 4 MiB is a max limit, 3 MiB should be okay + GRPCSearchAddrAmount = 1024 * 32 // 64 bytes per addr, in total about 2 MiB +) + +var ( + errChunking = errors.New("can't split message to stream chunks") +) + +type ( + TransportSplitter struct { + next object.Service + + chunkSize uint64 + addrAmount uint64 + } + + getStreamBasicChecker struct { + next object.GetObjectStreamer + buf *bytes.Buffer + resp *object.GetResponse + chunkSize int + } + + searchStreamBasicChecker struct { + next object.SearchObjectStreamer + resp *object.SearchResponse + list []*refs.ObjectID + addrAmount uint64 + } + + rangeStreamBasicChecker struct { + next object.GetRangeObjectStreamer + buf *bytes.Buffer + resp *object.GetRangeResponse + chunkSize int + } +) + +func NewTransportSplitter(size, amount uint64, next object.Service) *TransportSplitter { + return &TransportSplitter{ + next: next, + chunkSize: size, + addrAmount: amount, + } +} + +func (c TransportSplitter) Get(ctx context.Context, request *object.GetRequest) (object.GetObjectStreamer, error) { + stream, err := c.next.Get(ctx, request) + + return &getStreamBasicChecker{ + next: stream, + chunkSize: int(c.chunkSize), + }, err +} + +func (c TransportSplitter) Put(ctx context.Context) (object.PutObjectStreamer, error) { + return c.next.Put(ctx) +} + +func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { + return c.next.Head(ctx, request) +} + +func (c TransportSplitter) Search(ctx context.Context, request *object.SearchRequest) (object.SearchObjectStreamer, error) { + stream, err := c.next.Search(ctx, request) + + return &searchStreamBasicChecker{ + next: stream, + addrAmount: c.addrAmount, + }, err +} + +func (c TransportSplitter) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) { + return c.next.Delete(ctx, request) +} + +func (c TransportSplitter) GetRange(ctx context.Context, request *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { + stream, err := c.next.GetRange(ctx, request) + + return &rangeStreamBasicChecker{ + next: stream, + chunkSize: int(c.chunkSize), + }, err +} + +func (c TransportSplitter) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { + return c.next.GetRangeHash(ctx, request) +} + +func (g *getStreamBasicChecker) Recv() (*object.GetResponse, error) { + if g.resp == nil { + resp, err := g.next.Recv() + if err != nil { + return resp, err + } + + if part, ok := resp.GetBody().GetObjectPart().(*object.GetObjectPartChunk); !ok { + return resp, err + } else { + g.resp = resp + g.buf = bytes.NewBuffer(part.GetChunk()) + } + } + + chunkBody := new(object.GetObjectPartChunk) + chunkBody.SetChunk(g.buf.Next(g.chunkSize)) + + body := new(object.GetResponseBody) + body.SetObjectPart(chunkBody) + + resp := new(object.GetResponse) + resp.SetVerificationHeader(g.resp.GetVerificationHeader()) + resp.SetMetaHeader(g.resp.GetMetaHeader()) + resp.SetBody(body) + + if g.buf.Len() == 0 { + g.buf = nil + g.resp = nil + } + + return resp, nil +} + +func (r *rangeStreamBasicChecker) Recv() (*object.GetRangeResponse, error) { + if r.resp == nil { + resp, err := r.next.Recv() + if err != nil { + return resp, err + } + + r.resp = resp + r.buf = bytes.NewBuffer(resp.GetBody().GetChunk()) + } + + body := new(object.GetRangeResponseBody) + body.SetChunk(r.buf.Next(r.chunkSize)) + + resp := new(object.GetRangeResponse) + resp.SetVerificationHeader(r.resp.GetVerificationHeader()) + resp.SetMetaHeader(r.resp.GetMetaHeader()) + resp.SetBody(body) + + if r.buf.Len() == 0 { + r.buf = nil + r.resp = nil + } + + return resp, nil +} + +func (s *searchStreamBasicChecker) Recv() (*object.SearchResponse, error) { + if s.resp == nil { + resp, err := s.next.Recv() + if err != nil { + return resp, err + } + + s.resp = resp + s.list = s.resp.GetBody().GetIDList() + } + + chunk := s.list[:min(int(s.addrAmount), len(s.list))] + s.list = s.list[len(chunk):] + + body := new(object.SearchResponseBody) + body.SetIDList(chunk) + + resp := new(object.SearchResponse) + resp.SetVerificationHeader(s.resp.GetVerificationHeader()) + resp.SetMetaHeader(s.resp.GetMetaHeader()) + resp.SetBody(body) + + if len(s.list) == 0 { + s.list = nil + s.resp = nil + } + + return resp, nil +} + +func min(a, b int) int { + if a > b { + return b + } + return a +}