diff --git a/cmd/frostfs-cli/internal/client/client.go b/cmd/frostfs-cli/internal/client/client.go index dcd67f0d..90d7a52e 100644 --- a/cmd/frostfs-cli/internal/client/client.go +++ b/cmd/frostfs-cli/internal/client/client.go @@ -85,6 +85,57 @@ func (x ListContainersRes) SortedIDList() []cid.ID { return list } +// SearchObjectsRes groups the resulting values of SearchObjects operation. +type ContainerListStreamRes struct { + ids []cid.ID +} + +// IDList returns identifiers of the matched objects. +func (x ContainerListStreamRes) IDList() []cid.ID { + return x.ids +} + +func ListContainersStream(ctx context.Context, prm ListContainersPrm) (res ContainerListStreamRes, err error) { + cliPrm := &client.PrmContainerListStream{ + XHeaders: prm.XHeaders, + Account: prm.Account, + Session: prm.Session, + } + rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm) + if err != nil { + return res, fmt.Errorf("init container list: %w", err) + } + + buf := make([]cid.ID, 10) + var list []cid.ID + var n int + var ok bool + + for { + n, ok = rdr.Read(buf) + for i := range n { + list = append(list, buf[i]) + } + if !ok { + break + } + } + + _, err = rdr.Close() + if err != nil { + return res, fmt.Errorf("read container list: %w", err) + } + + sort.Slice(list, func(i, j int) bool { + lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString() + return strings.Compare(lhs, rhs) < 0 + }) + + res.ids = list + + return +} + // PutContainerPrm groups parameters of PutContainer operation. type PutContainerPrm struct { Client *client.Client diff --git a/cmd/frostfs-cli/modules/container/list_stream.go b/cmd/frostfs-cli/modules/container/list_stream.go new file mode 100644 index 00000000..41ad6c3a --- /dev/null +++ b/cmd/frostfs-cli/modules/container/list_stream.go @@ -0,0 +1,91 @@ +package container + +import ( + internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" + "github.com/spf13/cobra" +) + +var listContainersStreamCmd = &cobra.Command{ + Use: "list-stream", + Short: "List all created containers", + Long: "List all created containers", + Run: func(cmd *cobra.Command, _ []string) { + var idUser user.ID + + generateKey, _ := cmd.Flags().GetBool(commonflags.GenerateKey) + if flagVarListContainerOwner == "" && generateKey { + cmd.PrintErrln("WARN: using -g without --owner - output will be empty") + } + + key := key.GetOrGenerate(cmd) + + if flagVarListContainerOwner == "" { + user.IDFromKey(&idUser, key.PublicKey) + } else { + err := idUser.DecodeString(flagVarListContainerOwner) + commonCmd.ExitOnErr(cmd, "invalid user ID: %w", err) + } + + cli := internalclient.GetSDKClientByFlag(cmd, key, commonflags.RPC) + + var prm internalclient.ListContainersPrm + prm.SetClient(cli) + prm.Account = idUser + + res, err := internalclient.ListContainersStream(cmd.Context(), prm) + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) + + prmGet := internalclient.GetContainerPrm{ + Client: cli, + } + + containerIDs := res.IDList() + for _, cnrID := range containerIDs { + if flagVarListName == "" && !flagVarListPrintAttr { + cmd.Println(cnrID.String()) + continue + } + + prmGet.ClientParams.ContainerID = &cnrID + res, err := internalclient.GetContainer(cmd.Context(), prmGet) + if err != nil { + cmd.Printf(" failed to read attributes: %v\n", err) + continue + } + + cnr := res.Container() + if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName { + continue + } + cmd.Println(cnrID.String()) + + if flagVarListPrintAttr { + cnr.IterateUserAttributes(func(key, val string) { + cmd.Printf(" %s: %s\n", key, val) + }) + } + } + }, +} + +func initContainerListStreamContainersCmd() { + commonflags.Init(listContainersStreamCmd) + + flags := listContainersStreamCmd.Flags() + + flags.StringVar(&flagVarListName, flagListName, "", + "List containers by the attribute name", + ) + flags.StringVar(&flagVarListContainerOwner, flagListContainerOwner, "", + "Owner of containers (omit to use owner from private key)", + ) + flags.BoolVar(&flagVarListPrintAttr, flagListPrintAttr, false, + "Request and print attributes of each container", + ) + flags.Lookup(commonflags.GenerateKey).Usage = generateKeyContainerUsage +} diff --git a/cmd/frostfs-cli/modules/container/root.go b/cmd/frostfs-cli/modules/container/root.go index 2da21e76..d7c82b73 100644 --- a/cmd/frostfs-cli/modules/container/root.go +++ b/cmd/frostfs-cli/modules/container/root.go @@ -21,6 +21,7 @@ var Cmd = &cobra.Command{ func init() { containerChildCommand := []*cobra.Command{ listContainersCmd, + listContainersStreamCmd, createContainerCmd, deleteContainerCmd, listContainerObjectsCmd, @@ -32,6 +33,7 @@ func init() { Cmd.AddCommand(containerChildCommand...) initContainerListContainersCmd() + initContainerListStreamContainersCmd() initContainerCreateCmd() initContainerDeleteCmd() initContainerListObjectsCmd() diff --git a/pkg/network/transport/container/grpc/service.go b/pkg/network/transport/container/grpc/service.go index 9fae22b4..b77de0c0 100644 --- a/pkg/network/transport/container/grpc/service.go +++ b/pkg/network/transport/container/grpc/service.go @@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil } + +type containerStreamerV2 struct { + containerGRPC.ContainerService_ListStreamServer +} + +func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error { + return s.ContainerService_ListStreamServer.Send( + resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse), + ) +} + +// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data +// to gRPC stream. +func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error { + listReq := new(container.ListStreamRequest) + if err := listReq.FromGRPCMessage(req); err != nil { + return err + } + + return s.srv.ListStream(listReq, &containerStreamerV2{ + ContainerService_ListStreamServer: gStream, + }) +} diff --git a/pkg/services/container/ape.go b/pkg/services/container/ape.go index d92ecf58..1cd164e0 100644 --- a/pkg/services/container/ape.go +++ b/pkg/services/container/ape.go @@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co return nil, apeErr(nativeschema.MethodListContainers, s) } +func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error { + ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream") + defer span.End() + + role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader()) + if err != nil { + return err + } + + reqProps := map[string]string{ + nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()), + nativeschema.PropertyKeyActorRole: role, + } + + reqProps, err = ac.fillWithUserClaimTags(reqProps, pk) + if err != nil { + return err + } + if p, ok := peer.FromContext(ctx); ok { + if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok { + reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String() + } + } + + namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID()) + if err != nil { + return fmt.Errorf("could not get owner namespace: %w", err) + } + if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil { + return err + } + + request := aperequest.NewRequest( + nativeschema.MethodListContainers, + aperequest.NewResource( + resourceName(namespace, ""), + make(map[string]string), + ), + reqProps, + ) + + groups, err := aperequest.Groups(ac.frostFSIDClient, pk) + if err != nil { + return fmt.Errorf("failed to get group ids: %w", err) + } + + // Policy contract keeps group related chains as namespace-group pair. + for i := range groups { + groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i]) + } + + rt := policyengine.NewRequestTargetWithNamespace(namespace) + rt.User = &policyengine.Target{ + Type: policyengine.User, + Name: fmt.Sprintf("%s:%s", namespace, pk.Address()), + } + rt.Groups = make([]policyengine.Target, len(groups)) + for i := range groups { + rt.Groups[i] = policyengine.GroupTarget(groups[i]) + } + + s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request) + if err != nil { + return err + } + + if found && s == apechain.Allow { + return ac.next.ListStream(req, stream) + } + + return apeErr(nativeschema.MethodListContainers, s) +} + func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put") defer span.End() diff --git a/pkg/services/container/ape_test.go b/pkg/services/container/ape_test.go index d6f9b75e..7b258f79 100644 --- a/pkg/services/container/ape_test.go +++ b/pkg/services/container/ape_test.go @@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List return &container.ListResponse{}, nil } +func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error { + s.calls["ListStream"]++ + return nil +} + func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) { s.calls["Put"]++ return &container.PutResponse{}, nil diff --git a/pkg/services/container/audit.go b/pkg/services/container/audit.go index b257272f..073584b0 100644 --- a/pkg/services/container/audit.go +++ b/pkg/services/container/audit.go @@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c return res, err } +// ListStream implements Server. +func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error { + err := a.next.ListStream(req, stream) + if !a.enabled.Load() { + return err + } + audit.LogRequest(a.log, container_grpc.ContainerService_ListStream_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil) + return err +} + // Put implements Server. func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { res, err := a.next.Put(ctx, req) diff --git a/pkg/services/container/executor.go b/pkg/services/container/executor.go index 0917e3bd..b9a6f8c7 100644 --- a/pkg/services/container/executor.go +++ b/pkg/services/container/executor.go @@ -14,6 +14,7 @@ type ServiceExecutor interface { Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error) Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error) List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error) + ListStream(*container.ListStreamRequest, ListStream) error } type executorSvc struct { @@ -93,3 +94,18 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co s.respSvc.SetMeta(resp) return resp, nil } + +func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error { + err := s.exec.ListStream(req, stream) + if err != nil { + return fmt.Errorf("could not execute ListStream request: %w", err) + } + + r := new(container.ListStreamResponse) + respBody := new(container.ListStreamResponseBody) + r.SetBody(respBody) + + s.respSvc.SetMeta(r) + + return stream.Send(r) +} diff --git a/pkg/services/container/morph/executor.go b/pkg/services/container/morph/executor.go index 05d8749c..0ad301ed 100644 --- a/pkg/services/container/morph/executor.go +++ b/pkg/services/container/morph/executor.go @@ -201,3 +201,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody) return res, nil } + +func (s *morphExecutor) ListStream(req *container.ListStreamRequest, stream containerSvc.ListStream) error { + body := req.GetBody() + idV2 := body.GetOwnerID() + if idV2 == nil { + return errMissingUserID + } + + var id user.ID + + err := id.ReadFromV2(*idV2) + if err != nil { + return fmt.Errorf("invalid user ID: %w", err) + } + + cnrs, err := s.rdr.ContainersOf(&id) + if err != nil { + return err + } + + cidList := make([]refs.ContainerID, len(cnrs)) + for i := range cnrs { + cnrs[i].WriteToV2(&cidList[i]) + } + + resBody := new(container.ListStreamResponseBody) + resBody.SetContainerIDs(cidList) + r := new(container.ListStreamResponse) + r.SetBody(resBody) + + return stream.Send(r) +} diff --git a/pkg/services/container/server.go b/pkg/services/container/server.go index a19d83c5..0873081e 100644 --- a/pkg/services/container/server.go +++ b/pkg/services/container/server.go @@ -4,6 +4,7 @@ import ( "context" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" ) // Server is an interface of the FrostFS API Container service server. @@ -12,4 +13,11 @@ type Server interface { Get(context.Context, *container.GetRequest) (*container.GetResponse, error) Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error) List(context.Context, *container.ListRequest) (*container.ListResponse, error) + ListStream(*container.ListStreamRequest, ListStream) error +} + +// ListStream is an interface of FrostFS API v2 compatible search streamer. +type ListStream interface { + util.ServerStream + Send(*container.ListStreamResponse) error } diff --git a/pkg/services/container/sign.go b/pkg/services/container/sign.go index f7f5d648..3788cc84 100644 --- a/pkg/services/container/sign.go +++ b/pkg/services/container/sign.go @@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req)) return resp, s.sigSvc.SignResponse(resp, err) } + +func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error { + if err := s.sigSvc.VerifyRequest(req); err != nil { + resp := new(container.ListStreamResponse) + _ = s.sigSvc.SignResponse(resp, err) + return stream.Send(resp) + } + + ss := &listStreamSigner{ + ListStream: stream, + sigSvc: s.sigSvc, + } + err := s.svc.ListStream(req, ss) + if err != nil || !ss.nonEmptyResp { + return ss.send(new(container.ListStreamResponse), err) + } + return nil +} + +type listStreamSigner struct { + ListStream + sigSvc *util.SignService + + nonEmptyResp bool // set on first Send call +} + +func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error { + s.nonEmptyResp = true + return s.send(resp, nil) +} + +func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error { + if err := s.sigSvc.SignResponse(resp, err); err != nil { + return err + } + return s.ListStream.Send(resp) +}