From 9439cee5505754e82d7302dc303ca774997ba972 Mon Sep 17 00:00:00 2001 From: Ekaterina Lebedeva Date: Mon, 28 Oct 2024 18:10:07 +0300 Subject: [PATCH] [#1452] container: Add ListStream method * Added TransportSplitter wrapper around ExecutionService to split container ID list read from contract in parts that are smaller than grpc max message size. Batch size can be changed in node configuration file (as in example config file). * Changed `container list` implementaion in cli: now ListStream is called by default. Old List is called only if ListStream is not implemented. Signed-off-by: Ekaterina Lebedeva --- cmd/frostfs-cli/internal/client/client.go | 56 +++++++++++ cmd/frostfs-cli/modules/container/list.go | 22 ++++- cmd/frostfs-node/config.go | 7 ++ cmd/frostfs-node/container.go | 4 +- config/example/node.yaml | 1 + go.mod | 4 +- go.sum | Bin 41910 -> 41647 bytes .../transport/container/grpc/service.go | 23 +++++ pkg/services/container/ape.go | 73 ++++++++++++++ pkg/services/container/ape_test.go | 5 + pkg/services/container/audit.go | 11 +++ pkg/services/container/executor.go | 9 ++ pkg/services/container/morph/executor.go | 32 ++++++ pkg/services/container/server.go | 8 ++ pkg/services/container/sign.go | 37 +++++++ pkg/services/container/transport_splitter.go | 92 ++++++++++++++++++ 16 files changed, 379 insertions(+), 5 deletions(-) create mode 100644 pkg/services/container/transport_splitter.go diff --git a/cmd/frostfs-cli/internal/client/client.go b/cmd/frostfs-cli/internal/client/client.go index 948d61f36..03466b558 100644 --- a/cmd/frostfs-cli/internal/client/client.go +++ b/cmd/frostfs-cli/internal/client/client.go @@ -85,6 +85,62 @@ func (x ListContainersRes) SortedIDList() []cid.ID { return list } +// ContainerListStreamRes groups the resulting values of ListStream operation. +type ContainerListStreamRes struct { + ids []cid.ID +} + +// SortedIDList returns sorted identifiers of the matched containers. +func (x ContainerListStreamRes) SortedIDList() []cid.ID { + list := x.ids + sort.Slice(list, func(i, j int) bool { + lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString() + return strings.Compare(lhs, rhs) < 0 + }) + return list +} + +func ListContainersStream(ctx context.Context, prm ListContainersPrm) (res ContainerListStreamRes, err error) { + cliPrm := &client.PrmContainerListStream{ + XHeaders: prm.XHeaders, + Account: prm.Account, + Session: prm.Session, + } + rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm) + if err != nil { + return res, fmt.Errorf("init container list: %w", err) + } + + buf := make([]cid.ID, 10) + var list []cid.ID + var n int + var ok bool + + for { + n, ok = rdr.Read(buf) + for i := range n { + list = append(list, buf[i]) + } + if !ok { + break + } + } + + _, err = rdr.Close() + if err != nil { + return res, fmt.Errorf("read container list: %w", err) + } + + sort.Slice(list, func(i, j int) bool { + lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString() + return strings.Compare(lhs, rhs) < 0 + }) + + res.ids = list + + return +} + // PutContainerPrm groups parameters of PutContainer operation. type PutContainerPrm struct { Client *client.Client diff --git a/cmd/frostfs-cli/modules/container/list.go b/cmd/frostfs-cli/modules/container/list.go index f01e4db4d..ac4481d19 100644 --- a/cmd/frostfs-cli/modules/container/list.go +++ b/cmd/frostfs-cli/modules/container/list.go @@ -6,8 +6,11 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/spf13/cobra" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // flags of list command. @@ -53,14 +56,27 @@ var listContainersCmd = &cobra.Command{ prm.SetClient(cli) prm.Account = idUser - res, err := internalclient.ListContainers(cmd.Context(), prm) - commonCmd.ExitOnErr(cmd, "rpc error: %w", err) + var containerIDs []cid.ID + res, err := internalclient.ListContainersStream(cmd.Context(), prm) + if err != nil { + if e, ok := status.FromError(err); ok { + switch e.Code() { + case codes.Unimplemented: + resV1, err := internalclient.ListContainers(cmd.Context(), prm) + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) + containerIDs = resV1.SortedIDList() + default: + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) + } + } + } else { + containerIDs = res.SortedIDList() + } prmGet := internalclient.GetContainerPrm{ Client: cli, } - containerIDs := res.SortedIDList() for _, cnrID := range containerIDs { if flagVarListName == "" && !flagVarListPrintAttr { cmd.Println(cnrID.String()) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 5af37865f..1a60ba20e 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -611,6 +611,7 @@ type cfgContainer struct { parsers map[event.Type]event.NotificationParser subscribers map[event.Type][]event.Handler workerPool util.WorkerPool // pool for asynchronous handlers + cnrAmount uint64 // amount of containers to send via stream at once } type cfgFrostfsID struct { @@ -841,9 +842,15 @@ func initContainer(appCfg *config.Config) cfgContainer { containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize) fatalOnErr(err) + amount := config.UintSafe(appCfg.Sub("morph"), "container_batch_size") + if amount <= 0 { + amount = 1000 + } + return cfgContainer{ scriptHash: contractsconfig.Container(appCfg), workerPool: containerWorkerPool, + cnrAmount: amount, } } diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index d3e1b2766..4d78a6fbb 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -56,7 +56,9 @@ func initContainerService(_ context.Context, c *cfg) { &c.key.PrivateKey, containerService.NewAPEServer(defaultChainRouter, cnrRdr, newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient, - containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc), + containerService.NewSplitterService( + c.cfgContainer.cnrAmount, c.respSvc, + containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)), ), ) service = containerService.NewAuditService(service, c.log, c.audit) diff --git a/config/example/node.yaml b/config/example/node.yaml index 8f9300b4a..41ceef032 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -83,6 +83,7 @@ morph: # Default value: block time. It is recommended to have this value less or equal to block time. # Cached entities: containers, container lists, eACL tables. container_cache_size: 100 # container_cache_size is is the maximum number of containers in the cache. + container_batch_size: 1000 # container_batch_size is the maximum amount of containers to send via stream at once switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success - address: wss://rpc1.morph.frostfs.info:40341/ws diff --git a/go.mod b/go.mod index c538a3178..fef2d8bac 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 - git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241107121119-cb813e27a823 + git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241113074125-afdc2d8340bb git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 @@ -134,3 +134,5 @@ require ( ) replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07 + +replace git.frostfs.info/TrueCloudLab/frostfs-sdk-go => /home/w0lframm/tmp/frostfs-sdk-go diff --git a/go.sum b/go.sum index 064f3274e4cc79d8789b54ce3fdc6455024c046d..17a5393c45dedb5d5db4aa98e95478177ab623a0 100644 GIT binary patch delta 14 WcmdmXoN4`0rVTS$HZNcmn+E_i9R^PT delta 221 zcmZ2~lxf>>rVTS$;?s)qi%Zgqb&FH7b<^_|$_(^?P}j)7$i&dlz}(Qt(9qCQH#y0| z&^Xn|Jki3)SRupEsyMYE*U~MuG~Y8Xw=&(s&|6>I*hf3Ez%xfMHa<0lI_q>&)3V%PXSq#n(u9{?Hv_p>F!mLROMN0Ru+<%AL>=;=2B3QSmv4M X6XcQTXW|=~9yQsJU2*eF7O{B%)(1q0 diff --git a/pkg/network/transport/container/grpc/service.go b/pkg/network/transport/container/grpc/service.go index 49d083a90..8cbf8d9c3 100644 --- a/pkg/network/transport/container/grpc/service.go +++ b/pkg/network/transport/container/grpc/service.go @@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil } + +type containerStreamerV2 struct { + containerGRPC.ContainerService_ListStreamServer +} + +func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error { + return s.ContainerService_ListStreamServer.Send( + resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse), + ) +} + +// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data +// to gRPC stream. +func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error { + listReq := new(container.ListStreamRequest) + if err := listReq.FromGRPCMessage(req); err != nil { + return err + } + + return s.srv.ListStream(listReq, &containerStreamerV2{ + ContainerService_ListStreamServer: gStream, + }) +} diff --git a/pkg/services/container/ape.go b/pkg/services/container/ape.go index 2cdb30b45..493452fa6 100644 --- a/pkg/services/container/ape.go +++ b/pkg/services/container/ape.go @@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co return nil, apeErr(nativeschema.MethodListContainers, s) } +func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error { + ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream") + defer span.End() + + role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader()) + if err != nil { + return err + } + + reqProps := map[string]string{ + nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()), + nativeschema.PropertyKeyActorRole: role, + } + + reqProps, err = ac.fillWithUserClaimTags(reqProps, pk) + if err != nil { + return err + } + if p, ok := peer.FromContext(ctx); ok { + if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok { + reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String() + } + } + + namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID()) + if err != nil { + return fmt.Errorf("could not get owner namespace: %w", err) + } + if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil { + return err + } + + request := aperequest.NewRequest( + nativeschema.MethodListContainers, + aperequest.NewResource( + resourceName(namespace, ""), + make(map[string]string), + ), + reqProps, + ) + + groups, err := aperequest.Groups(ac.frostFSIDClient, pk) + if err != nil { + return fmt.Errorf("failed to get group ids: %w", err) + } + + // Policy contract keeps group related chains as namespace-group pair. + for i := range groups { + groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i]) + } + + rt := policyengine.NewRequestTargetWithNamespace(namespace) + rt.User = &policyengine.Target{ + Type: policyengine.User, + Name: fmt.Sprintf("%s:%s", namespace, pk.Address()), + } + rt.Groups = make([]policyengine.Target, len(groups)) + for i := range groups { + rt.Groups[i] = policyengine.GroupTarget(groups[i]) + } + + s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request) + if err != nil { + return err + } + + if found && s == apechain.Allow { + return ac.next.ListStream(req, stream) + } + + return apeErr(nativeschema.MethodListContainers, s) +} + func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put") defer span.End() diff --git a/pkg/services/container/ape_test.go b/pkg/services/container/ape_test.go index b6b42a559..513ffff02 100644 --- a/pkg/services/container/ape_test.go +++ b/pkg/services/container/ape_test.go @@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List return &container.ListResponse{}, nil } +func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error { + s.calls["ListStream"]++ + return nil +} + func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) { s.calls["Put"]++ return &container.PutResponse{}, nil diff --git a/pkg/services/container/audit.go b/pkg/services/container/audit.go index 411eb4863..b235efa3c 100644 --- a/pkg/services/container/audit.go +++ b/pkg/services/container/audit.go @@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c return res, err } +// ListStream implements Server. +func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error { + err := a.next.ListStream(req, stream) + if !a.enabled.Load() { + return err + } + audit.LogRequest(stream.Context(), a.log, container_grpc.ContainerService_ListStream_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil) + return err +} + // Put implements Server. func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { res, err := a.next.Put(ctx, req) diff --git a/pkg/services/container/executor.go b/pkg/services/container/executor.go index 70234d3de..cdd0d2514 100644 --- a/pkg/services/container/executor.go +++ b/pkg/services/container/executor.go @@ -14,6 +14,7 @@ type ServiceExecutor interface { Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error) Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error) List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error) + ListStream(context.Context, *container.ListStreamRequest, ListStream) error } type executorSvc struct { @@ -93,3 +94,11 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co s.respSvc.SetMeta(resp) return resp, nil } + +func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error { + err := s.exec.ListStream(stream.Context(), req, stream) + if err != nil { + return fmt.Errorf("could not execute ListStream request: %w", err) + } + return nil +} diff --git a/pkg/services/container/morph/executor.go b/pkg/services/container/morph/executor.go index 211f469f3..e9d1606f1 100644 --- a/pkg/services/container/morph/executor.go +++ b/pkg/services/container/morph/executor.go @@ -200,3 +200,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody) return res, nil } + +func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error { + body := req.GetBody() + idV2 := body.GetOwnerID() + if idV2 == nil { + return errMissingUserID + } + + var id user.ID + + err := id.ReadFromV2(*idV2) + if err != nil { + return fmt.Errorf("invalid user ID: %w", err) + } + + cnrs, err := s.rdr.ContainersOf(&id) + if err != nil { + return err + } + + cidList := make([]refs.ContainerID, len(cnrs)) + for i := range cnrs { + cnrs[i].WriteToV2(&cidList[i]) + } + + resBody := new(container.ListStreamResponseBody) + resBody.SetContainerIDs(cidList) + r := new(container.ListStreamResponse) + r.SetBody(resBody) + + return stream.Send(r) +} diff --git a/pkg/services/container/server.go b/pkg/services/container/server.go index 78fd3d34c..d9208077d 100644 --- a/pkg/services/container/server.go +++ b/pkg/services/container/server.go @@ -3,6 +3,7 @@ package container import ( "context" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container" ) @@ -12,4 +13,11 @@ type Server interface { Get(context.Context, *container.GetRequest) (*container.GetResponse, error) Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error) List(context.Context, *container.ListRequest) (*container.ListResponse, error) + ListStream(*container.ListStreamRequest, ListStream) error +} + +// ListStream is an interface of FrostFS API v2 compatible search streamer. +type ListStream interface { + util.ServerStream + Send(*container.ListStreamResponse) error } diff --git a/pkg/services/container/sign.go b/pkg/services/container/sign.go index c478c0e1c..85fe7ae87 100644 --- a/pkg/services/container/sign.go +++ b/pkg/services/container/sign.go @@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req)) return resp, s.sigSvc.SignResponse(resp, err) } + +func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error { + if err := s.sigSvc.VerifyRequest(req); err != nil { + resp := new(container.ListStreamResponse) + _ = s.sigSvc.SignResponse(resp, err) + return stream.Send(resp) + } + + ss := &listStreamSigner{ + ListStream: stream, + sigSvc: s.sigSvc, + } + err := s.svc.ListStream(req, ss) + if err != nil || !ss.nonEmptyResp { + return ss.send(new(container.ListStreamResponse), err) + } + return nil +} + +type listStreamSigner struct { + ListStream + sigSvc *util.SignService + + nonEmptyResp bool // set on first Send call +} + +func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error { + s.nonEmptyResp = true + return s.send(resp, nil) +} + +func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error { + if err := s.sigSvc.SignResponse(resp, err); err != nil { + return err + } + return s.ListStream.Send(resp) +} diff --git a/pkg/services/container/transport_splitter.go b/pkg/services/container/transport_splitter.go new file mode 100644 index 000000000..91fcb0538 --- /dev/null +++ b/pkg/services/container/transport_splitter.go @@ -0,0 +1,92 @@ +package container + +import ( + "context" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container" +) + +type ( + TransportSplitter struct { + next Server + + respSvc *response.Service + cnrAmount uint64 + } + + listStreamMsgSizeCtrl struct { + util.ServerStream + stream ListStream + respSvc *response.Service + cnrAmount uint64 + } +) + +func NewSplitterService(cnrAmount uint64, respSvc *response.Service, next Server) Server { + return &TransportSplitter{ + next: next, + respSvc: respSvc, + cnrAmount: cnrAmount, + } +} + +func (s *TransportSplitter) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { + return s.next.Put(ctx, req) +} + +func (s *TransportSplitter) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) { + return s.next.Delete(ctx, req) +} + +func (s *TransportSplitter) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) { + return s.next.Get(ctx, req) +} + +func (s *TransportSplitter) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) { + return s.next.List(ctx, req) +} + +func (s *TransportSplitter) ListStream(req *container.ListStreamRequest, stream ListStream) error { + return s.next.ListStream(req, &listStreamMsgSizeCtrl{ + ServerStream: stream, + stream: stream, + respSvc: s.respSvc, + cnrAmount: s.cnrAmount, + }) +} + +func (s *listStreamMsgSizeCtrl) Send(resp *container.ListStreamResponse) error { + s.respSvc.SetMeta(resp) + body := resp.GetBody() + ids := body.GetContainerIDs() + + var newResp *container.ListStreamResponse + + for { + if newResp == nil { + newResp = new(container.ListStreamResponse) + newResp.SetBody(body) + } + + cut := min(s.cnrAmount, uint64(len(ids))) + + body.SetContainerIDs(ids[:cut]) + newResp.SetMetaHeader(resp.GetMetaHeader()) + newResp.SetVerificationHeader(resp.GetVerificationHeader()) + + if err := s.stream.Send(newResp); err != nil { + return fmt.Errorf("TransportSplitter: %w", err) + } + + ids = ids[cut:] + + if len(ids) == 0 { + break + } + } + + return nil +}