From bd4c4828f0af8c3ea64367dba581cbb536b850be Mon Sep 17 00:00:00 2001 From: Ekaterina Lebedeva Date: Mon, 28 Oct 2024 18:10:07 +0300 Subject: [PATCH] [#1452] container: Add ListStream method * Added new method for listing containers to container service. It opens stream and sends containers in batches. * Added TransportSplitter wrapper around ExecutionService to split container ID list read from contract in parts that are smaller than grpc max message size. Batch size can be changed in node configuration file (as in example config file). * Changed `container list` implementaion in cli: now ListStream is called by default. Old List is called only if ListStream is not implemented. Signed-off-by: Ekaterina Lebedeva --- cmd/frostfs-cli/internal/client/client.go | 47 +++++++++ cmd/frostfs-cli/modules/container/list.go | 52 ++++++++-- cmd/frostfs-node/config.go | 7 +- .../config/container/container.go | 22 +++++ .../config/container/container_test.go | 27 +++++ cmd/frostfs-node/container.go | 6 +- config/example/node.env | 3 + config/example/node.json | 3 + config/example/node.yaml | 3 + go.mod | 2 +- go.sum | Bin 41960 -> 41960 bytes .../transport/container/grpc/service.go | 23 +++++ pkg/services/container/ape.go | 73 ++++++++++++++ pkg/services/container/ape_test.go | 5 + pkg/services/container/audit.go | 11 +++ pkg/services/container/executor.go | 9 ++ pkg/services/container/morph/executor.go | 32 ++++++ pkg/services/container/server.go | 8 ++ pkg/services/container/sign.go | 37 +++++++ pkg/services/container/transport_splitter.go | 92 ++++++++++++++++++ 20 files changed, 451 insertions(+), 11 deletions(-) create mode 100644 cmd/frostfs-node/config/container/container.go create mode 100644 cmd/frostfs-node/config/container/container_test.go create mode 100644 pkg/services/container/transport_splitter.go diff --git a/cmd/frostfs-cli/internal/client/client.go b/cmd/frostfs-cli/internal/client/client.go index 948d61f36..ba1d42a18 100644 --- a/cmd/frostfs-cli/internal/client/client.go +++ b/cmd/frostfs-cli/internal/client/client.go @@ -85,6 +85,53 @@ func (x ListContainersRes) SortedIDList() []cid.ID { return list } +// ContainerListStreamRes groups the resulting values of ListStream operation. +type ContainerListStreamRes struct { + ids []cid.ID +} + +// SortedIDList returns sorted identifiers of the matched containers. +func (x ContainerListStreamRes) SortedIDList() []cid.ID { + list := x.ids + slices.SortFunc(list, func(lhs, rhs cid.ID) int { + return strings.Compare(lhs.EncodeToString(), rhs.EncodeToString()) + }) + return list +} + +func ListContainersStream(ctx context.Context, prm ListContainersPrm, printCnr func(id cid.ID)) (err error) { + cliPrm := &client.PrmContainerListStream{ + XHeaders: prm.XHeaders, + OwnerID: prm.OwnerID, + Session: prm.Session, + } + rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm) + if err != nil { + return fmt.Errorf("init container list: %w", err) + } + + buf := make([]cid.ID, 10) + var n int + var ok bool + + for { + n, ok = rdr.Read(buf) + for i := range n { + printCnr(buf[i]) + } + if !ok || n == 0 { + break + } + } + + _, err = rdr.Close() + if err != nil { + return fmt.Errorf("read container list: %w", err) + } + + return +} + // PutContainerPrm groups parameters of PutContainer operation. type PutContainerPrm struct { Client *client.Client diff --git a/cmd/frostfs-cli/modules/container/list.go b/cmd/frostfs-cli/modules/container/list.go index f01e4db4d..35cd10075 100644 --- a/cmd/frostfs-cli/modules/container/list.go +++ b/cmd/frostfs-cli/modules/container/list.go @@ -6,8 +6,11 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/spf13/cobra" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // flags of list command. @@ -51,16 +54,53 @@ var listContainersCmd = &cobra.Command{ var prm internalclient.ListContainersPrm prm.SetClient(cli) - prm.Account = idUser - - res, err := internalclient.ListContainers(cmd.Context(), prm) - commonCmd.ExitOnErr(cmd, "rpc error: %w", err) - + prm.OwnerID = idUser prmGet := internalclient.GetContainerPrm{ Client: cli, } + var containerIDs []cid.ID + + err := internalclient.ListContainersStream(cmd.Context(), prm, func(id cid.ID) { + if flagVarListName == "" && !flagVarListPrintAttr { + cmd.Println(id.String()) + return + } + + prmGet.ClientParams.ContainerID = &id + res, err := internalclient.GetContainer(cmd.Context(), prmGet) + if err != nil { + cmd.Printf(" failed to read attributes: %v\n", err) + return + } + + cnr := res.Container() + if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName { + return + } + cmd.Println(id.String()) + + if flagVarListPrintAttr { + cnr.IterateUserAttributes(func(key, val string) { + cmd.Printf(" %s: %s\n", key, val) + }) + } + }) + + if err != nil { + if e, ok := status.FromError(err); ok { + switch e.Code() { + case codes.Unimplemented: + res, err := internalclient.ListContainers(cmd.Context(), prm) + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) + containerIDs = res.SortedIDList() + default: + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) + } + } + } else { + return + } - containerIDs := res.SortedIDList() for _, cnrID := range containerIDs { if flagVarListName == "" && !flagVarListPrintAttr { cmd.Println(cnrID.String()) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 9b727e41a..70b0a474f 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -606,9 +606,10 @@ type cfgAccounting struct { type cfgContainer struct { scriptHash neogoutil.Uint160 - parsers map[event.Type]event.NotificationParser - subscribers map[event.Type][]event.Handler - workerPool util.WorkerPool // pool for asynchronous handlers + parsers map[event.Type]event.NotificationParser + subscribers map[event.Type][]event.Handler + workerPool util.WorkerPool // pool for asynchronous handlers + containerBatchSize uint32 } type cfgFrostfsID struct { diff --git a/cmd/frostfs-node/config/container/container.go b/cmd/frostfs-node/config/container/container.go new file mode 100644 index 000000000..ebb1c2ec0 --- /dev/null +++ b/cmd/frostfs-node/config/container/container.go @@ -0,0 +1,22 @@ +package containerconfig + +import "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + +const ( + subsection = "container" + + // ContainerBatchSizeDefault represents he maximum amount of containers to send via stream at once. + ContainerBatchSizeDefault = 1000 +) + +// ContainerBatchSize returns the value of "container_batch_size" config parameter +// from "container" section. +// +// Returns 0 if the value is not positive integer. +// Returns ContainerBatchSizeDefault if the value is missing. +func ContainerBatchSize(c *config.Config) uint32 { + if c.Sub(subsection).Value("container_batch_size") == nil { + return ContainerBatchSizeDefault + } + return config.Uint32Safe(c.Sub(subsection), "container_batch_size") +} diff --git a/cmd/frostfs-node/config/container/container_test.go b/cmd/frostfs-node/config/container/container_test.go new file mode 100644 index 000000000..1909c9ad1 --- /dev/null +++ b/cmd/frostfs-node/config/container/container_test.go @@ -0,0 +1,27 @@ +package containerconfig_test + +import ( + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container" + configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test" + "github.com/stretchr/testify/require" +) + +func TestContainerSection(t *testing.T) { + t.Run("defaults", func(t *testing.T) { + empty := configtest.EmptyConfig() + require.Equal(t, uint32(containerconfig.ContainerBatchSizeDefault), containerconfig.ContainerBatchSize(empty)) + }) + + const path = "../../../../config/example/node" + fileConfigTest := func(c *config.Config) { + require.Equal(t, uint32(1000), containerconfig.ContainerBatchSize(c)) + } + + configtest.ForEachFileType(path, fileConfigTest) + t.Run("ENV", func(t *testing.T) { + configtest.ForEnvFileType(t, path, fileConfigTest) + }) +} diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index 7d558dacb..fb2550a03 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -5,6 +5,7 @@ import ( "context" "net" + containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container" morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" @@ -47,6 +48,7 @@ func initContainerService(_ context.Context, c *cfg) { } c.shared.frostfsidClient = frostfsIDSubjectProvider + c.cfgContainer.containerBatchSize = containerconfig.ContainerBatchSize(c.appCfg) defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides( c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(), @@ -56,7 +58,9 @@ func initContainerService(_ context.Context, c *cfg) { &c.key.PrivateKey, containerService.NewAPEServer(defaultChainRouter, cnrRdr, newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient, - containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc), + containerService.NewSplitterService( + c.cfgContainer.containerBatchSize, c.respSvc, + containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)), ), ) service = containerService.NewAuditService(service, c.log, c.audit) diff --git a/config/example/node.env b/config/example/node.env index f470acf3e..05d5fe36d 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -83,6 +83,9 @@ FROSTFS_POLICER_HEAD_TIMEOUT=15s FROSTFS_REPLICATOR_PUT_TIMEOUT=15s FROSTFS_REPLICATOR_POOL_SIZE=10 +# Container service section +FROSTFS_CONTAINER_BATCH_SIZE=1000 + # Object service section FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100 FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200 diff --git a/config/example/node.json b/config/example/node.json index dba3bad8b..c38b8564a 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -124,6 +124,9 @@ "pool_size": 10, "put_timeout": "15s" }, + "container": { + "container_batch_size": "1000" + }, "object": { "delete": { "tombstone_lifetime": 10 diff --git a/config/example/node.yaml b/config/example/node.yaml index 8f9300b4a..7fad742ca 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -108,6 +108,9 @@ replicator: put_timeout: 15s # timeout for the Replicator PUT remote operation pool_size: 10 # maximum amount of concurrent replications +container: + container_batch_size: 1000 # container_batch_size is the maximum amount of containers to send via stream at once + object: delete: tombstone_lifetime: 10 # tombstone "local" lifetime in epochs diff --git a/go.mod b/go.mod index 6ac37d343..8f4053872 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 - git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241206094944-81c423e7094d + git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241210104938-c4463df8d467 git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 diff --git a/go.sum b/go.sum index e084c2445a2f0767c86590041f5b7c99264ec26d..d63396202000696dbbab183d81533d2a0358ac49 100644 GIT binary patch delta 117 zcmaEHoax1JrVYz#cJN>8rGb70D?#( Aod5s; diff --git a/pkg/network/transport/container/grpc/service.go b/pkg/network/transport/container/grpc/service.go index 49d083a90..8cbf8d9c3 100644 --- a/pkg/network/transport/container/grpc/service.go +++ b/pkg/network/transport/container/grpc/service.go @@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil } + +type containerStreamerV2 struct { + containerGRPC.ContainerService_ListStreamServer +} + +func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error { + return s.ContainerService_ListStreamServer.Send( + resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse), + ) +} + +// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data +// to gRPC stream. +func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error { + listReq := new(container.ListStreamRequest) + if err := listReq.FromGRPCMessage(req); err != nil { + return err + } + + return s.srv.ListStream(listReq, &containerStreamerV2{ + ContainerService_ListStreamServer: gStream, + }) +} diff --git a/pkg/services/container/ape.go b/pkg/services/container/ape.go index 2cdb30b45..493452fa6 100644 --- a/pkg/services/container/ape.go +++ b/pkg/services/container/ape.go @@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co return nil, apeErr(nativeschema.MethodListContainers, s) } +func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error { + ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream") + defer span.End() + + role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader()) + if err != nil { + return err + } + + reqProps := map[string]string{ + nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()), + nativeschema.PropertyKeyActorRole: role, + } + + reqProps, err = ac.fillWithUserClaimTags(reqProps, pk) + if err != nil { + return err + } + if p, ok := peer.FromContext(ctx); ok { + if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok { + reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String() + } + } + + namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID()) + if err != nil { + return fmt.Errorf("could not get owner namespace: %w", err) + } + if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil { + return err + } + + request := aperequest.NewRequest( + nativeschema.MethodListContainers, + aperequest.NewResource( + resourceName(namespace, ""), + make(map[string]string), + ), + reqProps, + ) + + groups, err := aperequest.Groups(ac.frostFSIDClient, pk) + if err != nil { + return fmt.Errorf("failed to get group ids: %w", err) + } + + // Policy contract keeps group related chains as namespace-group pair. + for i := range groups { + groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i]) + } + + rt := policyengine.NewRequestTargetWithNamespace(namespace) + rt.User = &policyengine.Target{ + Type: policyengine.User, + Name: fmt.Sprintf("%s:%s", namespace, pk.Address()), + } + rt.Groups = make([]policyengine.Target, len(groups)) + for i := range groups { + rt.Groups[i] = policyengine.GroupTarget(groups[i]) + } + + s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request) + if err != nil { + return err + } + + if found && s == apechain.Allow { + return ac.next.ListStream(req, stream) + } + + return apeErr(nativeschema.MethodListContainers, s) +} + func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put") defer span.End() diff --git a/pkg/services/container/ape_test.go b/pkg/services/container/ape_test.go index b6b42a559..513ffff02 100644 --- a/pkg/services/container/ape_test.go +++ b/pkg/services/container/ape_test.go @@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List return &container.ListResponse{}, nil } +func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error { + s.calls["ListStream"]++ + return nil +} + func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) { s.calls["Put"]++ return &container.PutResponse{}, nil diff --git a/pkg/services/container/audit.go b/pkg/services/container/audit.go index 411eb4863..b235efa3c 100644 --- a/pkg/services/container/audit.go +++ b/pkg/services/container/audit.go @@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c return res, err } +// ListStream implements Server. +func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error { + err := a.next.ListStream(req, stream) + if !a.enabled.Load() { + return err + } + audit.LogRequest(stream.Context(), a.log, container_grpc.ContainerService_ListStream_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil) + return err +} + // Put implements Server. func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { res, err := a.next.Put(ctx, req) diff --git a/pkg/services/container/executor.go b/pkg/services/container/executor.go index 70234d3de..cdd0d2514 100644 --- a/pkg/services/container/executor.go +++ b/pkg/services/container/executor.go @@ -14,6 +14,7 @@ type ServiceExecutor interface { Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error) Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error) List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error) + ListStream(context.Context, *container.ListStreamRequest, ListStream) error } type executorSvc struct { @@ -93,3 +94,11 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co s.respSvc.SetMeta(resp) return resp, nil } + +func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error { + err := s.exec.ListStream(stream.Context(), req, stream) + if err != nil { + return fmt.Errorf("could not execute ListStream request: %w", err) + } + return nil +} diff --git a/pkg/services/container/morph/executor.go b/pkg/services/container/morph/executor.go index 211f469f3..e9d1606f1 100644 --- a/pkg/services/container/morph/executor.go +++ b/pkg/services/container/morph/executor.go @@ -200,3 +200,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody) return res, nil } + +func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error { + body := req.GetBody() + idV2 := body.GetOwnerID() + if idV2 == nil { + return errMissingUserID + } + + var id user.ID + + err := id.ReadFromV2(*idV2) + if err != nil { + return fmt.Errorf("invalid user ID: %w", err) + } + + cnrs, err := s.rdr.ContainersOf(&id) + if err != nil { + return err + } + + cidList := make([]refs.ContainerID, len(cnrs)) + for i := range cnrs { + cnrs[i].WriteToV2(&cidList[i]) + } + + resBody := new(container.ListStreamResponseBody) + resBody.SetContainerIDs(cidList) + r := new(container.ListStreamResponse) + r.SetBody(resBody) + + return stream.Send(r) +} diff --git a/pkg/services/container/server.go b/pkg/services/container/server.go index 78fd3d34c..d9208077d 100644 --- a/pkg/services/container/server.go +++ b/pkg/services/container/server.go @@ -3,6 +3,7 @@ package container import ( "context" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container" ) @@ -12,4 +13,11 @@ type Server interface { Get(context.Context, *container.GetRequest) (*container.GetResponse, error) Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error) List(context.Context, *container.ListRequest) (*container.ListResponse, error) + ListStream(*container.ListStreamRequest, ListStream) error +} + +// ListStream is an interface of FrostFS API v2 compatible search streamer. +type ListStream interface { + util.ServerStream + Send(*container.ListStreamResponse) error } diff --git a/pkg/services/container/sign.go b/pkg/services/container/sign.go index c478c0e1c..85fe7ae87 100644 --- a/pkg/services/container/sign.go +++ b/pkg/services/container/sign.go @@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req)) return resp, s.sigSvc.SignResponse(resp, err) } + +func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error { + if err := s.sigSvc.VerifyRequest(req); err != nil { + resp := new(container.ListStreamResponse) + _ = s.sigSvc.SignResponse(resp, err) + return stream.Send(resp) + } + + ss := &listStreamSigner{ + ListStream: stream, + sigSvc: s.sigSvc, + } + err := s.svc.ListStream(req, ss) + if err != nil || !ss.nonEmptyResp { + return ss.send(new(container.ListStreamResponse), err) + } + return nil +} + +type listStreamSigner struct { + ListStream + sigSvc *util.SignService + + nonEmptyResp bool // set on first Send call +} + +func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error { + s.nonEmptyResp = true + return s.send(resp, nil) +} + +func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error { + if err := s.sigSvc.SignResponse(resp, err); err != nil { + return err + } + return s.ListStream.Send(resp) +} diff --git a/pkg/services/container/transport_splitter.go b/pkg/services/container/transport_splitter.go new file mode 100644 index 000000000..4f8708da7 --- /dev/null +++ b/pkg/services/container/transport_splitter.go @@ -0,0 +1,92 @@ +package container + +import ( + "context" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container" +) + +type ( + TransportSplitter struct { + next Server + + respSvc *response.Service + cnrAmount uint32 + } + + listStreamMsgSizeCtrl struct { + util.ServerStream + stream ListStream + respSvc *response.Service + cnrAmount uint32 + } +) + +func NewSplitterService(cnrAmount uint32, respSvc *response.Service, next Server) Server { + return &TransportSplitter{ + next: next, + respSvc: respSvc, + cnrAmount: cnrAmount, + } +} + +func (s *TransportSplitter) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { + return s.next.Put(ctx, req) +} + +func (s *TransportSplitter) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) { + return s.next.Delete(ctx, req) +} + +func (s *TransportSplitter) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) { + return s.next.Get(ctx, req) +} + +func (s *TransportSplitter) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) { + return s.next.List(ctx, req) +} + +func (s *TransportSplitter) ListStream(req *container.ListStreamRequest, stream ListStream) error { + return s.next.ListStream(req, &listStreamMsgSizeCtrl{ + ServerStream: stream, + stream: stream, + respSvc: s.respSvc, + cnrAmount: s.cnrAmount, + }) +} + +func (s *listStreamMsgSizeCtrl) Send(resp *container.ListStreamResponse) error { + s.respSvc.SetMeta(resp) + body := resp.GetBody() + ids := body.GetContainerIDs() + + var newResp *container.ListStreamResponse + + for { + if newResp == nil { + newResp = new(container.ListStreamResponse) + newResp.SetBody(body) + } + + cut := min(s.cnrAmount, uint32(len(ids))) + + body.SetContainerIDs(ids[:cut]) + newResp.SetMetaHeader(resp.GetMetaHeader()) + newResp.SetVerificationHeader(resp.GetVerificationHeader()) + + if err := s.stream.Send(newResp); err != nil { + return fmt.Errorf("TransportSplitter: %w", err) + } + + ids = ids[cut:] + + if len(ids) == 0 { + break + } + } + + return nil +}