WIP: container: Add ListStream method #1453

Draft
elebedeva wants to merge 1 commit from elebedeva/frostfs-node:feat/stream-for-list into master
11 changed files with 349 additions and 0 deletions

View file

@ -85,6 +85,57 @@ func (x ListContainersRes) SortedIDList() []cid.ID {
return list
}
// SearchObjectsRes groups the resulting values of SearchObjects operation.
type ContainerListStreamRes struct {
ids []cid.ID
}
// IDList returns identifiers of the matched objects.
func (x ContainerListStreamRes) IDList() []cid.ID {
return x.ids
}
func ListContainersStream(ctx context.Context, prm ListContainersPrm) (res ContainerListStreamRes, err error) {
cliPrm := &client.PrmContainerListStream{
XHeaders: prm.XHeaders,
Account: prm.Account,
Session: prm.Session,
}
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
if err != nil {
return res, fmt.Errorf("init container list: %w", err)
}
buf := make([]cid.ID, 10)
var list []cid.ID
var n int
var ok bool
for {
n, ok = rdr.Read(buf)
for i := range n {
list = append(list, buf[i])
}
if !ok {
break
}
}
_, err = rdr.Close()
if err != nil {
return res, fmt.Errorf("read container list: %w", err)
}
sort.Slice(list, func(i, j int) bool {
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
return strings.Compare(lhs, rhs) < 0
})
res.ids = list
return
}
// PutContainerPrm groups parameters of PutContainer operation.
type PutContainerPrm struct {
Client *client.Client

View file

@ -0,0 +1,91 @@
package container
import (
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/spf13/cobra"
)
var listContainersStreamCmd = &cobra.Command{
Use: "list-stream",
Short: "List all created containers",
Long: "List all created containers",
Run: func(cmd *cobra.Command, _ []string) {
var idUser user.ID
generateKey, _ := cmd.Flags().GetBool(commonflags.GenerateKey)
if flagVarListContainerOwner == "" && generateKey {
cmd.PrintErrln("WARN: using -g without --owner - output will be empty")
}
key := key.GetOrGenerate(cmd)
if flagVarListContainerOwner == "" {
user.IDFromKey(&idUser, key.PublicKey)
} else {
err := idUser.DecodeString(flagVarListContainerOwner)
commonCmd.ExitOnErr(cmd, "invalid user ID: %w", err)
}
cli := internalclient.GetSDKClientByFlag(cmd, key, commonflags.RPC)
var prm internalclient.ListContainersPrm
prm.SetClient(cli)
prm.Account = idUser
res, err := internalclient.ListContainersStream(cmd.Context(), prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
prmGet := internalclient.GetContainerPrm{
Client: cli,
}
containerIDs := res.IDList()
for _, cnrID := range containerIDs {
if flagVarListName == "" && !flagVarListPrintAttr {
cmd.Println(cnrID.String())
continue
}
prmGet.ClientParams.ContainerID = &cnrID
res, err := internalclient.GetContainer(cmd.Context(), prmGet)
if err != nil {
cmd.Printf(" failed to read attributes: %v\n", err)
continue
}
cnr := res.Container()
if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName {
continue
}
cmd.Println(cnrID.String())
if flagVarListPrintAttr {
cnr.IterateUserAttributes(func(key, val string) {
cmd.Printf(" %s: %s\n", key, val)
})
}
}
},
}
func initContainerListStreamContainersCmd() {
commonflags.Init(listContainersStreamCmd)
flags := listContainersStreamCmd.Flags()
flags.StringVar(&flagVarListName, flagListName, "",
"List containers by the attribute name",
)
flags.StringVar(&flagVarListContainerOwner, flagListContainerOwner, "",
"Owner of containers (omit to use owner from private key)",
)
flags.BoolVar(&flagVarListPrintAttr, flagListPrintAttr, false,
"Request and print attributes of each container",
)
flags.Lookup(commonflags.GenerateKey).Usage = generateKeyContainerUsage
}

View file

@ -21,6 +21,7 @@ var Cmd = &cobra.Command{
func init() {
containerChildCommand := []*cobra.Command{
listContainersCmd,
listContainersStreamCmd,
createContainerCmd,
deleteContainerCmd,
listContainerObjectsCmd,
@ -32,6 +33,7 @@ func init() {
Cmd.AddCommand(containerChildCommand...)
initContainerListContainersCmd()
initContainerListStreamContainersCmd()
initContainerCreateCmd()
initContainerDeleteCmd()
initContainerListObjectsCmd()

View file

@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
}
type containerStreamerV2 struct {
containerGRPC.ContainerService_ListStreamServer
}
func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error {
return s.ContainerService_ListStreamServer.Send(
resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse),
)
}
// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data
// to gRPC stream.
func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error {
listReq := new(container.ListStreamRequest)
if err := listReq.FromGRPCMessage(req); err != nil {
return err
}
return s.srv.ListStream(listReq, &containerStreamerV2{
ContainerService_ListStreamServer: gStream,
})
}

View file

@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co
return nil, apeErr(nativeschema.MethodListContainers, s)
}
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream")
defer span.End()
role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader())
if err != nil {
return err
}
reqProps := map[string]string{
nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()),
nativeschema.PropertyKeyActorRole: role,
}
reqProps, err = ac.fillWithUserClaimTags(reqProps, pk)
if err != nil {
return err
}
if p, ok := peer.FromContext(ctx); ok {
if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok {
reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String()
}
}
namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID())
if err != nil {
return fmt.Errorf("could not get owner namespace: %w", err)
}
if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil {
return err
}
request := aperequest.NewRequest(
nativeschema.MethodListContainers,
aperequest.NewResource(
resourceName(namespace, ""),
make(map[string]string),
),
reqProps,
)
groups, err := aperequest.Groups(ac.frostFSIDClient, pk)
if err != nil {
return fmt.Errorf("failed to get group ids: %w", err)
}
// Policy contract keeps group related chains as namespace-group pair.
for i := range groups {
groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i])
}
rt := policyengine.NewRequestTargetWithNamespace(namespace)
rt.User = &policyengine.Target{
Type: policyengine.User,
Name: fmt.Sprintf("%s:%s", namespace, pk.Address()),
}
rt.Groups = make([]policyengine.Target, len(groups))
for i := range groups {
rt.Groups[i] = policyengine.GroupTarget(groups[i])
}
s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request)
if err != nil {
return err
}
if found && s == apechain.Allow {
return ac.next.ListStream(req, stream)
}
return apeErr(nativeschema.MethodListContainers, s)
}
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
defer span.End()

View file

@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List
return &container.ListResponse{}, nil
}
func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error {
s.calls["ListStream"]++
return nil
}
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
s.calls["Put"]++
return &container.PutResponse{}, nil

View file

@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c
return res, err
}
// ListStream implements Server.
func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
err := a.next.ListStream(req, stream)
if !a.enabled.Load() {
return err
}
audit.LogRequest(a.log, container_grpc.ContainerService_ListStream_FullMethodName, req,
audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil)
return err
}
// Put implements Server.
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
res, err := a.next.Put(ctx, req)

View file

@ -14,6 +14,7 @@ type ServiceExecutor interface {
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
ListStream(*container.ListStreamRequest, ListStream) error
}
type executorSvc struct {
@ -93,3 +94,18 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
s.respSvc.SetMeta(resp)
return resp, nil
}
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
err := s.exec.ListStream(req, stream)
if err != nil {
return fmt.Errorf("could not execute ListStream request: %w", err)
}
r := new(container.ListStreamResponse)
respBody := new(container.ListStreamResponseBody)
r.SetBody(respBody)
s.respSvc.SetMeta(r)
return stream.Send(r)
}

View file

@ -201,3 +201,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
return res, nil
}
func (s *morphExecutor) ListStream(req *container.ListStreamRequest, stream containerSvc.ListStream) error {
body := req.GetBody()
idV2 := body.GetOwnerID()
if idV2 == nil {
return errMissingUserID
}
var id user.ID
err := id.ReadFromV2(*idV2)
if err != nil {
return fmt.Errorf("invalid user ID: %w", err)
}
cnrs, err := s.rdr.ContainersOf(&id)
if err != nil {
return err
}
cidList := make([]refs.ContainerID, len(cnrs))
for i := range cnrs {
cnrs[i].WriteToV2(&cidList[i])
}
resBody := new(container.ListStreamResponseBody)
resBody.SetContainerIDs(cidList)
r := new(container.ListStreamResponse)
r.SetBody(resBody)
return stream.Send(r)
}

View file

@ -4,6 +4,7 @@ import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
)
// Server is an interface of the FrostFS API Container service server.
@ -12,4 +13,11 @@ type Server interface {
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
ListStream(*container.ListStreamRequest, ListStream) error
}
// ListStream is an interface of FrostFS API v2 compatible search streamer.
type ListStream interface {
util.ServerStream
Send(*container.ListStreamResponse) error
}

View file

@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
return resp, s.sigSvc.SignResponse(resp, err)
}
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.ListStreamResponse)
_ = s.sigSvc.SignResponse(resp, err)
return stream.Send(resp)
}
ss := &listStreamSigner{
ListStream: stream,
sigSvc: s.sigSvc,
}
err := s.svc.ListStream(req, ss)
if err != nil || !ss.nonEmptyResp {
return ss.send(new(container.ListStreamResponse), err)
}
return nil
}
type listStreamSigner struct {
ListStream
sigSvc *util.SignService
nonEmptyResp bool // set on first Send call
}
func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error {
s.nonEmptyResp = true
return s.send(resp, nil)
}
func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error {
if err := s.sigSvc.SignResponse(resp, err); err != nil {
return err
}
return s.ListStream.Send(resp)
}