diff --git a/cmd/frostfs-cli/internal/client/client.go b/cmd/frostfs-cli/internal/client/client.go index 948d61f36..6d8cbf5c3 100644 --- a/cmd/frostfs-cli/internal/client/client.go +++ b/cmd/frostfs-cli/internal/client/client.go @@ -85,6 +85,62 @@ func (x ListContainersRes) SortedIDList() []cid.ID { return list } +// ContainerListStreamRes groups the resulting values of ListStream operation. +type ContainerListStreamRes struct { + ids []cid.ID +} + +// SortedIDList returns sorted identifiers of the matched containers. +func (x ContainerListStreamRes) SortedIDList() []cid.ID { + list := x.ids + sort.Slice(list, func(i, j int) bool { + lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString() + return strings.Compare(lhs, rhs) < 0 + }) + return list +} + +func ListContainersStream(ctx context.Context, prm ListContainersPrm) (res ContainerListStreamRes, err error) { + cliPrm := &client.PrmContainerListStream{ + XHeaders: prm.XHeaders, + OwnerID: prm.OwnerID, + Session: prm.Session, + } + rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm) + if err != nil { + return res, fmt.Errorf("init container list: %w", err) + } + + buf := make([]cid.ID, 10) + var list []cid.ID + var n int + var ok bool + + for { + n, ok = rdr.Read(buf) + for i := range n { + list = append(list, buf[i]) + } + if !ok { + break + } + } + + _, err = rdr.Close() + if err != nil { + return res, fmt.Errorf("read container list: %w", err) + } + + sort.Slice(list, func(i, j int) bool { + lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString() + return strings.Compare(lhs, rhs) < 0 + }) + + res.ids = list + + return +} + // PutContainerPrm groups parameters of PutContainer operation. type PutContainerPrm struct { Client *client.Client diff --git a/cmd/frostfs-cli/modules/container/list.go b/cmd/frostfs-cli/modules/container/list.go index f01e4db4d..4d571d8f7 100644 --- a/cmd/frostfs-cli/modules/container/list.go +++ b/cmd/frostfs-cli/modules/container/list.go @@ -6,8 +6,11 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/spf13/cobra" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // flags of list command. @@ -51,16 +54,29 @@ var listContainersCmd = &cobra.Command{ var prm internalclient.ListContainersPrm prm.SetClient(cli) - prm.Account = idUser + prm.OwnerID = idUser - res, err := internalclient.ListContainers(cmd.Context(), prm) - commonCmd.ExitOnErr(cmd, "rpc error: %w", err) + var containerIDs []cid.ID + res, err := internalclient.ListContainersStream(cmd.Context(), prm) + if err != nil { + if e, ok := status.FromError(err); ok { + switch e.Code() { + case codes.Unimplemented: + resV1, err := internalclient.ListContainers(cmd.Context(), prm) + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) + containerIDs = resV1.SortedIDList() + default: + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) + } + } + } else { + containerIDs = res.SortedIDList() + } prmGet := internalclient.GetContainerPrm{ Client: cli, } - containerIDs := res.SortedIDList() for _, cnrID := range containerIDs { if flagVarListName == "" && !flagVarListPrintAttr { cmd.Println(cnrID.String()) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 9b727e41a..06d987aeb 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -609,6 +609,7 @@ type cfgContainer struct { parsers map[event.Type]event.NotificationParser subscribers map[event.Type][]event.Handler workerPool util.WorkerPool // pool for asynchronous handlers + cnrAmount uint64 // amount of containers to send via stream at once } type cfgFrostfsID struct { @@ -839,9 +840,15 @@ func initContainer(appCfg *config.Config) cfgContainer { containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize) fatalOnErr(err) + amount := config.UintSafe(appCfg.Sub("morph"), "container_batch_size") + if amount <= 0 { + amount = 1000 + } + return cfgContainer{ scriptHash: contractsconfig.Container(appCfg), workerPool: containerWorkerPool, + cnrAmount: amount, } } diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index 7d558dacb..46411239d 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -56,7 +56,9 @@ func initContainerService(_ context.Context, c *cfg) { &c.key.PrivateKey, containerService.NewAPEServer(defaultChainRouter, cnrRdr, newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient, - containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc), + containerService.NewSplitterService( + c.cfgContainer.cnrAmount, c.respSvc, + containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)), ), ) service = containerService.NewAuditService(service, c.log, c.audit) diff --git a/config/example/node.yaml b/config/example/node.yaml index 8f9300b4a..41ceef032 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -83,6 +83,7 @@ morph: # Default value: block time. It is recommended to have this value less or equal to block time. # Cached entities: containers, container lists, eACL tables. container_cache_size: 100 # container_cache_size is is the maximum number of containers in the cache. + container_batch_size: 1000 # container_batch_size is the maximum amount of containers to send via stream at once switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success - address: wss://rpc1.morph.frostfs.info:40341/ws diff --git a/go.mod b/go.mod index 6ac37d343..441837867 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 - git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241206094944-81c423e7094d + git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241210104938-c4463df8d467 git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 @@ -134,3 +134,5 @@ require ( ) replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07 + +replace git.frostfs.info/TrueCloudLab/frostfs-sdk-go => /home/w0lframm/tmp/frostfs-sdk-go diff --git a/go.sum b/go.sum index e084c2445..079a71c13 100644 Binary files a/go.sum and b/go.sum differ diff --git a/pkg/network/transport/container/grpc/service.go b/pkg/network/transport/container/grpc/service.go index 49d083a90..8cbf8d9c3 100644 --- a/pkg/network/transport/container/grpc/service.go +++ b/pkg/network/transport/container/grpc/service.go @@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil } + +type containerStreamerV2 struct { + containerGRPC.ContainerService_ListStreamServer +} + +func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error { + return s.ContainerService_ListStreamServer.Send( + resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse), + ) +} + +// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data +// to gRPC stream. +func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error { + listReq := new(container.ListStreamRequest) + if err := listReq.FromGRPCMessage(req); err != nil { + return err + } + + return s.srv.ListStream(listReq, &containerStreamerV2{ + ContainerService_ListStreamServer: gStream, + }) +} diff --git a/pkg/services/container/ape.go b/pkg/services/container/ape.go index 2cdb30b45..493452fa6 100644 --- a/pkg/services/container/ape.go +++ b/pkg/services/container/ape.go @@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co return nil, apeErr(nativeschema.MethodListContainers, s) } +func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error { + ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream") + defer span.End() + + role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader()) + if err != nil { + return err + } + + reqProps := map[string]string{ + nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()), + nativeschema.PropertyKeyActorRole: role, + } + + reqProps, err = ac.fillWithUserClaimTags(reqProps, pk) + if err != nil { + return err + } + if p, ok := peer.FromContext(ctx); ok { + if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok { + reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String() + } + } + + namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID()) + if err != nil { + return fmt.Errorf("could not get owner namespace: %w", err) + } + if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil { + return err + } + + request := aperequest.NewRequest( + nativeschema.MethodListContainers, + aperequest.NewResource( + resourceName(namespace, ""), + make(map[string]string), + ), + reqProps, + ) + + groups, err := aperequest.Groups(ac.frostFSIDClient, pk) + if err != nil { + return fmt.Errorf("failed to get group ids: %w", err) + } + + // Policy contract keeps group related chains as namespace-group pair. + for i := range groups { + groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i]) + } + + rt := policyengine.NewRequestTargetWithNamespace(namespace) + rt.User = &policyengine.Target{ + Type: policyengine.User, + Name: fmt.Sprintf("%s:%s", namespace, pk.Address()), + } + rt.Groups = make([]policyengine.Target, len(groups)) + for i := range groups { + rt.Groups[i] = policyengine.GroupTarget(groups[i]) + } + + s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request) + if err != nil { + return err + } + + if found && s == apechain.Allow { + return ac.next.ListStream(req, stream) + } + + return apeErr(nativeschema.MethodListContainers, s) +} + func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put") defer span.End() diff --git a/pkg/services/container/ape_test.go b/pkg/services/container/ape_test.go index b6b42a559..513ffff02 100644 --- a/pkg/services/container/ape_test.go +++ b/pkg/services/container/ape_test.go @@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List return &container.ListResponse{}, nil } +func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error { + s.calls["ListStream"]++ + return nil +} + func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) { s.calls["Put"]++ return &container.PutResponse{}, nil diff --git a/pkg/services/container/audit.go b/pkg/services/container/audit.go index 411eb4863..b235efa3c 100644 --- a/pkg/services/container/audit.go +++ b/pkg/services/container/audit.go @@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c return res, err } +// ListStream implements Server. +func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error { + err := a.next.ListStream(req, stream) + if !a.enabled.Load() { + return err + } + audit.LogRequest(stream.Context(), a.log, container_grpc.ContainerService_ListStream_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil) + return err +} + // Put implements Server. func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { res, err := a.next.Put(ctx, req) diff --git a/pkg/services/container/executor.go b/pkg/services/container/executor.go index 70234d3de..cdd0d2514 100644 --- a/pkg/services/container/executor.go +++ b/pkg/services/container/executor.go @@ -14,6 +14,7 @@ type ServiceExecutor interface { Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error) Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error) List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error) + ListStream(context.Context, *container.ListStreamRequest, ListStream) error } type executorSvc struct { @@ -93,3 +94,11 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co s.respSvc.SetMeta(resp) return resp, nil } + +func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error { + err := s.exec.ListStream(stream.Context(), req, stream) + if err != nil { + return fmt.Errorf("could not execute ListStream request: %w", err) + } + return nil +} diff --git a/pkg/services/container/morph/executor.go b/pkg/services/container/morph/executor.go index 211f469f3..e9d1606f1 100644 --- a/pkg/services/container/morph/executor.go +++ b/pkg/services/container/morph/executor.go @@ -200,3 +200,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody) return res, nil } + +func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error { + body := req.GetBody() + idV2 := body.GetOwnerID() + if idV2 == nil { + return errMissingUserID + } + + var id user.ID + + err := id.ReadFromV2(*idV2) + if err != nil { + return fmt.Errorf("invalid user ID: %w", err) + } + + cnrs, err := s.rdr.ContainersOf(&id) + if err != nil { + return err + } + + cidList := make([]refs.ContainerID, len(cnrs)) + for i := range cnrs { + cnrs[i].WriteToV2(&cidList[i]) + } + + resBody := new(container.ListStreamResponseBody) + resBody.SetContainerIDs(cidList) + r := new(container.ListStreamResponse) + r.SetBody(resBody) + + return stream.Send(r) +} diff --git a/pkg/services/container/server.go b/pkg/services/container/server.go index 78fd3d34c..d9208077d 100644 --- a/pkg/services/container/server.go +++ b/pkg/services/container/server.go @@ -3,6 +3,7 @@ package container import ( "context" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container" ) @@ -12,4 +13,11 @@ type Server interface { Get(context.Context, *container.GetRequest) (*container.GetResponse, error) Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error) List(context.Context, *container.ListRequest) (*container.ListResponse, error) + ListStream(*container.ListStreamRequest, ListStream) error +} + +// ListStream is an interface of FrostFS API v2 compatible search streamer. +type ListStream interface { + util.ServerStream + Send(*container.ListStreamResponse) error } diff --git a/pkg/services/container/sign.go b/pkg/services/container/sign.go index c478c0e1c..85fe7ae87 100644 --- a/pkg/services/container/sign.go +++ b/pkg/services/container/sign.go @@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req)) return resp, s.sigSvc.SignResponse(resp, err) } + +func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error { + if err := s.sigSvc.VerifyRequest(req); err != nil { + resp := new(container.ListStreamResponse) + _ = s.sigSvc.SignResponse(resp, err) + return stream.Send(resp) + } + + ss := &listStreamSigner{ + ListStream: stream, + sigSvc: s.sigSvc, + } + err := s.svc.ListStream(req, ss) + if err != nil || !ss.nonEmptyResp { + return ss.send(new(container.ListStreamResponse), err) + } + return nil +} + +type listStreamSigner struct { + ListStream + sigSvc *util.SignService + + nonEmptyResp bool // set on first Send call +} + +func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error { + s.nonEmptyResp = true + return s.send(resp, nil) +} + +func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error { + if err := s.sigSvc.SignResponse(resp, err); err != nil { + return err + } + return s.ListStream.Send(resp) +} diff --git a/pkg/services/container/transport_splitter.go b/pkg/services/container/transport_splitter.go new file mode 100644 index 000000000..91fcb0538 --- /dev/null +++ b/pkg/services/container/transport_splitter.go @@ -0,0 +1,92 @@ +package container + +import ( + "context" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container" +) + +type ( + TransportSplitter struct { + next Server + + respSvc *response.Service + cnrAmount uint64 + } + + listStreamMsgSizeCtrl struct { + util.ServerStream + stream ListStream + respSvc *response.Service + cnrAmount uint64 + } +) + +func NewSplitterService(cnrAmount uint64, respSvc *response.Service, next Server) Server { + return &TransportSplitter{ + next: next, + respSvc: respSvc, + cnrAmount: cnrAmount, + } +} + +func (s *TransportSplitter) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { + return s.next.Put(ctx, req) +} + +func (s *TransportSplitter) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) { + return s.next.Delete(ctx, req) +} + +func (s *TransportSplitter) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) { + return s.next.Get(ctx, req) +} + +func (s *TransportSplitter) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) { + return s.next.List(ctx, req) +} + +func (s *TransportSplitter) ListStream(req *container.ListStreamRequest, stream ListStream) error { + return s.next.ListStream(req, &listStreamMsgSizeCtrl{ + ServerStream: stream, + stream: stream, + respSvc: s.respSvc, + cnrAmount: s.cnrAmount, + }) +} + +func (s *listStreamMsgSizeCtrl) Send(resp *container.ListStreamResponse) error { + s.respSvc.SetMeta(resp) + body := resp.GetBody() + ids := body.GetContainerIDs() + + var newResp *container.ListStreamResponse + + for { + if newResp == nil { + newResp = new(container.ListStreamResponse) + newResp.SetBody(body) + } + + cut := min(s.cnrAmount, uint64(len(ids))) + + body.SetContainerIDs(ids[:cut]) + newResp.SetMetaHeader(resp.GetMetaHeader()) + newResp.SetVerificationHeader(resp.GetVerificationHeader()) + + if err := s.stream.Send(newResp); err != nil { + return fmt.Errorf("TransportSplitter: %w", err) + } + + ids = ids[cut:] + + if len(ids) == 0 { + break + } + } + + return nil +}