[#1452] container: Add ListStream method
Some checks failed
DCO action / DCO (pull_request) Successful in 1m25s
Tests and linters / Tests with -race (pull_request) Failing after 1m40s
Tests and linters / Run gofumpt (pull_request) Successful in 1m45s
Tests and linters / Tests (pull_request) Failing after 1m58s
Build / Build Components (pull_request) Failing after 2m6s
Vulncheck / Vulncheck (pull_request) Failing after 2m21s
Tests and linters / Staticcheck (pull_request) Failing after 2m29s
Pre-commit hooks / Pre-commit (pull_request) Failing after 3m7s
Tests and linters / Lint (pull_request) Failing after 3m11s
Tests and linters / gopls check (pull_request) Failing after 3m28s

* Added TransportSplitter wrapper around ExecutionService to
  split container ID list read from contract in parts that are
  smaller than grpc max message size. Batch size can be changed
  in node configuration file (as in example config file).

* Changed `container list` implementaion in cli: now ListStream
  is called by default. Old List is called only if ListStream
  is not implemented.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
This commit is contained in:
Ekaterina Lebedeva 2024-10-28 18:10:07 +03:00
parent 7eac5fb18b
commit d816e508ab
16 changed files with 379 additions and 7 deletions

View file

@ -85,6 +85,62 @@ func (x ListContainersRes) SortedIDList() []cid.ID {
return list
}
// ContainerListStreamRes groups the resulting values of ListStream operation.
type ContainerListStreamRes struct {
ids []cid.ID
}
// SortedIDList returns sorted identifiers of the matched containers.
func (x ContainerListStreamRes) SortedIDList() []cid.ID {
list := x.ids
sort.Slice(list, func(i, j int) bool {
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
return strings.Compare(lhs, rhs) < 0
})
return list
}
func ListContainersStream(ctx context.Context, prm ListContainersPrm) (res ContainerListStreamRes, err error) {
cliPrm := &client.PrmContainerListStream{
XHeaders: prm.XHeaders,
Account: prm.Account,
Session: prm.Session,
}
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
if err != nil {
return res, fmt.Errorf("init container list: %w", err)
}
buf := make([]cid.ID, 10)
var list []cid.ID
var n int
var ok bool
for {
n, ok = rdr.Read(buf)
for i := range n {
list = append(list, buf[i])
}
if !ok {
break
}
}
_, err = rdr.Close()
if err != nil {
return res, fmt.Errorf("read container list: %w", err)
}
sort.Slice(list, func(i, j int) bool {
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
return strings.Compare(lhs, rhs) < 0
})
res.ids = list
return
}
// PutContainerPrm groups parameters of PutContainer operation.
type PutContainerPrm struct {
Client *client.Client

View file

@ -6,8 +6,11 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// flags of list command.
@ -53,14 +56,27 @@ var listContainersCmd = &cobra.Command{
prm.SetClient(cli)
prm.Account = idUser
res, err := internalclient.ListContainers(cmd.Context(), prm)
var containerIDs []cid.ID
res, err := internalclient.ListContainersStream(cmd.Context(), prm)
if err != nil {
if e, ok := status.FromError(err); ok {
switch e.Code() {
case codes.Unimplemented:
resV1, err := internalclient.ListContainers(cmd.Context(), prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
containerIDs = resV1.SortedIDList()
default:
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
}
}
} else {
containerIDs = res.SortedIDList()
}
prmGet := internalclient.GetContainerPrm{
Client: cli,
}
containerIDs := res.SortedIDList()
for _, cnrID := range containerIDs {
if flagVarListName == "" && !flagVarListPrintAttr {
cmd.Println(cnrID.String())

View file

@ -611,6 +611,7 @@ type cfgContainer struct {
parsers map[event.Type]event.NotificationParser
subscribers map[event.Type][]event.Handler
workerPool util.WorkerPool // pool for asynchronous handlers
cnrAmount uint64 // amount of containers to send via stream at once
}
type cfgFrostfsID struct {
@ -841,9 +842,15 @@ func initContainer(appCfg *config.Config) cfgContainer {
containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
amount := config.UintSafe(appCfg.Sub("morph"), "container_batch_size")
if amount <= 0 {
amount = 1000
}
return cfgContainer{
scriptHash: contractsconfig.Container(appCfg),
workerPool: containerWorkerPool,
cnrAmount: amount,
}
}

View file

@ -56,7 +56,9 @@ func initContainerService(_ context.Context, c *cfg) {
&c.key.PrivateKey,
containerService.NewAPEServer(defaultChainRouter, cnrRdr,
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
containerService.NewSplitterService(
c.cfgContainer.cnrAmount, c.respSvc,
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)),
),
)
service = containerService.NewAuditService(service, c.log, c.audit)

View file

@ -83,6 +83,7 @@ morph:
# Default value: block time. It is recommended to have this value less or equal to block time.
# Cached entities: containers, container lists, eACL tables.
container_cache_size: 100 # container_cache_size is is the maximum number of containers in the cache.
container_batch_size: 1000 # container_batch_size is the maximum amount of containers to send via stream at once
switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node
rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success
- address: wss://rpc1.morph.frostfs.info:40341/ws

4
go.mod
View file

@ -8,7 +8,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241107121119-cb813e27a823
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241113074125-afdc2d8340bb
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
@ -134,3 +134,5 @@ require (
)
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07
replace git.frostfs.info/TrueCloudLab/frostfs-sdk-go => /home/w0lframm/tmp/frostfs-sdk-go

2
go.sum
View file

@ -8,8 +8,6 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 h1:9bvBDLApbbO5sXBKdODpE9tzy3HV99nXxkDWNn22rdI=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241107121119-cb813e27a823 h1:sepm9FeuoInmygH1K/+3L+Yp5bJhGiVi/oGCH6Emp2c=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241107121119-cb813e27a823/go.mod h1:eoK7+KZQ9GJxbzIs6vTnoUJqFDppavInLRHaN4MYgZg=
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=

View file

@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
}
type containerStreamerV2 struct {
containerGRPC.ContainerService_ListStreamServer
}
func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error {
return s.ContainerService_ListStreamServer.Send(
resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse),
)
}
// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data
// to gRPC stream.
func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error {
listReq := new(container.ListStreamRequest)
if err := listReq.FromGRPCMessage(req); err != nil {
return err
}
return s.srv.ListStream(listReq, &containerStreamerV2{
ContainerService_ListStreamServer: gStream,
})
}

View file

@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co
return nil, apeErr(nativeschema.MethodListContainers, s)
}
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream")
defer span.End()
role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader())
if err != nil {
return err
}
reqProps := map[string]string{
nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()),
nativeschema.PropertyKeyActorRole: role,
}
reqProps, err = ac.fillWithUserClaimTags(reqProps, pk)
if err != nil {
return err
}
if p, ok := peer.FromContext(ctx); ok {
if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok {
reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String()
}
}
namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID())
if err != nil {
return fmt.Errorf("could not get owner namespace: %w", err)
}
if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil {
return err
}
request := aperequest.NewRequest(
nativeschema.MethodListContainers,
aperequest.NewResource(
resourceName(namespace, ""),
make(map[string]string),
),
reqProps,
)
groups, err := aperequest.Groups(ac.frostFSIDClient, pk)
if err != nil {
return fmt.Errorf("failed to get group ids: %w", err)
}
// Policy contract keeps group related chains as namespace-group pair.
for i := range groups {
groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i])
}
rt := policyengine.NewRequestTargetWithNamespace(namespace)
rt.User = &policyengine.Target{
Type: policyengine.User,
Name: fmt.Sprintf("%s:%s", namespace, pk.Address()),
}
rt.Groups = make([]policyengine.Target, len(groups))
for i := range groups {
rt.Groups[i] = policyengine.GroupTarget(groups[i])
}
s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request)
if err != nil {
return err
}
if found && s == apechain.Allow {
return ac.next.ListStream(req, stream)
}
return apeErr(nativeschema.MethodListContainers, s)
}
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
defer span.End()

View file

@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List
return &container.ListResponse{}, nil
}
func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error {
s.calls["ListStream"]++
return nil
}
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
s.calls["Put"]++
return &container.PutResponse{}, nil

View file

@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c
return res, err
}
// ListStream implements Server.
func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
err := a.next.ListStream(req, stream)
if !a.enabled.Load() {
return err
}
audit.LogRequest(stream.Context(), a.log, container_grpc.ContainerService_ListStream_FullMethodName, req,
audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil)
return err
}
// Put implements Server.
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
res, err := a.next.Put(ctx, req)

View file

@ -14,6 +14,7 @@ type ServiceExecutor interface {
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
ListStream(context.Context, *container.ListStreamRequest, ListStream) error
}
type executorSvc struct {
@ -93,3 +94,11 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
s.respSvc.SetMeta(resp)
return resp, nil
}
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
err := s.exec.ListStream(stream.Context(), req, stream)
if err != nil {
return fmt.Errorf("could not execute ListStream request: %w", err)
}
return nil
}

View file

@ -200,3 +200,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
return res, nil
}
func (s *morphExecutor) ListStream(ctx context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
body := req.GetBody()
idV2 := body.GetOwnerID()
if idV2 == nil {
return errMissingUserID
}
var id user.ID
err := id.ReadFromV2(*idV2)
if err != nil {
return fmt.Errorf("invalid user ID: %w", err)
}
cnrs, err := s.rdr.ContainersOf(&id)
if err != nil {
return err
}
cidList := make([]refs.ContainerID, len(cnrs))
for i := range cnrs {
cnrs[i].WriteToV2(&cidList[i])
}
resBody := new(container.ListStreamResponseBody)
resBody.SetContainerIDs(cidList)
r := new(container.ListStreamResponse)
r.SetBody(resBody)
return stream.Send(r)
}

View file

@ -3,6 +3,7 @@ package container
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
)
@ -12,4 +13,11 @@ type Server interface {
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
ListStream(*container.ListStreamRequest, ListStream) error
}
// ListStream is an interface of FrostFS API v2 compatible search streamer.
type ListStream interface {
util.ServerStream
Send(*container.ListStreamResponse) error
}

View file

@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
return resp, s.sigSvc.SignResponse(resp, err)
}
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.ListStreamResponse)
_ = s.sigSvc.SignResponse(resp, err)
return stream.Send(resp)
}
ss := &listStreamSigner{
ListStream: stream,
sigSvc: s.sigSvc,
}
err := s.svc.ListStream(req, ss)
if err != nil || !ss.nonEmptyResp {
return ss.send(new(container.ListStreamResponse), err)
}
return nil
}
type listStreamSigner struct {
ListStream
sigSvc *util.SignService
nonEmptyResp bool // set on first Send call
}
func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error {
s.nonEmptyResp = true
return s.send(resp, nil)
}
func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error {
if err := s.sigSvc.SignResponse(resp, err); err != nil {
return err
}
return s.ListStream.Send(resp)
}

View file

@ -0,0 +1,92 @@
package container
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
)
type (
TransportSplitter struct {
next Server
respSvc *response.Service
cnrAmount uint64
}
listStreamMsgSizeCtrl struct {
util.ServerStream
stream ListStream
respSvc *response.Service
cnrAmount uint64
}
)
func NewSplitterService(cnrAmount uint64, respSvc *response.Service, next Server) Server {
return &TransportSplitter{
next: next,
respSvc: respSvc,
cnrAmount: cnrAmount,
}
}
func (s *TransportSplitter) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
return s.next.Put(ctx, req)
}
func (s *TransportSplitter) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
return s.next.Delete(ctx, req)
}
func (s *TransportSplitter) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
return s.next.Get(ctx, req)
}
func (s *TransportSplitter) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
return s.next.List(ctx, req)
}
func (s *TransportSplitter) ListStream(req *container.ListStreamRequest, stream ListStream) error {
return s.next.ListStream(req, &listStreamMsgSizeCtrl{
ServerStream: stream,
stream: stream,
respSvc: s.respSvc,
cnrAmount: s.cnrAmount,
})
}
func (s *listStreamMsgSizeCtrl) Send(resp *container.ListStreamResponse) error {
s.respSvc.SetMeta(resp)
body := resp.GetBody()
ids := body.GetContainerIDs()
var newResp *container.ListStreamResponse
for ln := uint64(len(ids)); ; {
if newResp == nil {
newResp = new(container.ListStreamResponse)
newResp.SetBody(body)
}
cut := min(s.cnrAmount, ln)
body.SetContainerIDs(ids[:cut])
newResp.SetMetaHeader(resp.GetMetaHeader())
newResp.SetVerificationHeader(resp.GetVerificationHeader())
if err := s.stream.Send(newResp); err != nil {
return fmt.Errorf("streamSplitter: %w", err)
}
ids = ids[cut:]
if len(ids) == 0 {
break
}
}
return nil
}