forked from TrueCloudLab/frostfs-node
[#1452] container: Add ListStream method
* Added new method for listing containers to container service. It opens stream and sends containers in batches. * Added TransportSplitter wrapper around ExecutionService to split container ID list read from contract in parts that are smaller than grpc max message size. Batch size can be changed in node configuration file (as in example config file). * Changed `container list` implementaion in cli: now ListStream is called by default. Old List is called only if ListStream is not implemented. * Changed `internalclient.ListContainersPrm`.`Account` to `OwnerID` since `client.PrmContainerList`.`Account` was renamed to `OwnerID` in sdk. Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
This commit is contained in:
parent
b6c8ebf493
commit
df05057ed4
20 changed files with 435 additions and 36 deletions
|
@ -83,6 +83,25 @@ func (x ListContainersRes) SortedIDList() []cid.ID {
|
|||
return list
|
||||
}
|
||||
|
||||
func ListContainersStream(ctx context.Context, prm ListContainersPrm, processCnr func(id cid.ID) bool) (err error) {
|
||||
cliPrm := &client.PrmContainerListStream{
|
||||
XHeaders: prm.XHeaders,
|
||||
OwnerID: prm.OwnerID,
|
||||
Session: prm.Session,
|
||||
}
|
||||
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("init container list: %w", err)
|
||||
}
|
||||
|
||||
err = rdr.Iterate(processCnr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read container list: %w", err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// PutContainerPrm groups parameters of PutContainer operation.
|
||||
type PutContainerPrm struct {
|
||||
Client *client.Client
|
||||
|
|
|
@ -6,8 +6,11 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/spf13/cobra"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// flags of list command.
|
||||
|
@ -51,42 +54,58 @@ var listContainersCmd = &cobra.Command{
|
|||
|
||||
var prm internalclient.ListContainersPrm
|
||||
prm.SetClient(cli)
|
||||
prm.Account = idUser
|
||||
|
||||
res, err := internalclient.ListContainers(cmd.Context(), prm)
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
|
||||
prm.OwnerID = idUser
|
||||
prmGet := internalclient.GetContainerPrm{
|
||||
Client: cli,
|
||||
}
|
||||
var containerIDs []cid.ID
|
||||
|
||||
containerIDs := res.SortedIDList()
|
||||
for _, cnrID := range containerIDs {
|
||||
if flagVarListName == "" && !flagVarListPrintAttr {
|
||||
cmd.Println(cnrID.String())
|
||||
continue
|
||||
err := internalclient.ListContainersStream(cmd.Context(), prm, func(id cid.ID) bool {
|
||||
printContainer(cmd, prmGet, id)
|
||||
return false
|
||||
})
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
prmGet.ClientParams.ContainerID = &cnrID
|
||||
if e, ok := status.FromError(err); ok && e.Code() == codes.Unimplemented {
|
||||
res, err := internalclient.ListContainers(cmd.Context(), prm)
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
containerIDs = res.SortedIDList()
|
||||
} else {
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
}
|
||||
|
||||
for _, cnrID := range containerIDs {
|
||||
printContainer(cmd, prmGet, cnrID)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
func printContainer(cmd *cobra.Command, prmGet internalclient.GetContainerPrm, id cid.ID) {
|
||||
if flagVarListName == "" && !flagVarListPrintAttr {
|
||||
cmd.Println(id.String())
|
||||
return
|
||||
}
|
||||
|
||||
prmGet.ClientParams.ContainerID = &id
|
||||
res, err := internalclient.GetContainer(cmd.Context(), prmGet)
|
||||
if err != nil {
|
||||
cmd.Printf(" failed to read attributes: %v\n", err)
|
||||
continue
|
||||
return
|
||||
}
|
||||
|
||||
cnr := res.Container()
|
||||
if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName {
|
||||
continue
|
||||
return
|
||||
}
|
||||
cmd.Println(cnrID.String())
|
||||
cmd.Println(id.String())
|
||||
|
||||
if flagVarListPrintAttr {
|
||||
cnr.IterateUserAttributes(func(key, val string) {
|
||||
cmd.Printf(" %s: %s\n", key, val)
|
||||
})
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
func initContainerListContainersCmd() {
|
||||
|
|
|
@ -609,6 +609,7 @@ type cfgContainer struct {
|
|||
parsers map[event.Type]event.NotificationParser
|
||||
subscribers map[event.Type][]event.Handler
|
||||
workerPool util.WorkerPool // pool for asynchronous handlers
|
||||
containerBatchSize uint32
|
||||
}
|
||||
|
||||
type cfgFrostfsID struct {
|
||||
|
|
27
cmd/frostfs-node/config/container/container.go
Normal file
27
cmd/frostfs-node/config/container/container.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
package containerconfig
|
||||
|
||||
import "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
|
||||
const (
|
||||
subsection = "container"
|
||||
listStreamSubsection = "list_stream"
|
||||
|
||||
// ContainerBatchSizeDefault represents he maximum amount of containers to send via stream at once.
|
||||
ContainerBatchSizeDefault = 1000
|
||||
)
|
||||
|
||||
// ContainerBatchSize returns the value of "batch_size" config parameter
|
||||
// from "list_stream" subsection of "container" section.
|
||||
//
|
||||
// Returns ContainerBatchSizeDefault if the value is missing or if
|
||||
// the value is not positive integer.
|
||||
func ContainerBatchSize(c *config.Config) uint32 {
|
||||
if c.Sub(subsection).Sub(listStreamSubsection).Value("batch_size") == nil {
|
||||
return ContainerBatchSizeDefault
|
||||
}
|
||||
size := config.Uint32Safe(c.Sub(subsection).Sub(listStreamSubsection), "batch_size")
|
||||
if size == 0 {
|
||||
return ContainerBatchSizeDefault
|
||||
}
|
||||
return size
|
||||
}
|
27
cmd/frostfs-node/config/container/container_test.go
Normal file
27
cmd/frostfs-node/config/container/container_test.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
package containerconfig_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container"
|
||||
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestContainerSection(t *testing.T) {
|
||||
t.Run("defaults", func(t *testing.T) {
|
||||
empty := configtest.EmptyConfig()
|
||||
require.Equal(t, uint32(containerconfig.ContainerBatchSizeDefault), containerconfig.ContainerBatchSize(empty))
|
||||
})
|
||||
|
||||
const path = "../../../../config/example/node"
|
||||
fileConfigTest := func(c *config.Config) {
|
||||
require.Equal(t, uint32(500), containerconfig.ContainerBatchSize(c))
|
||||
}
|
||||
|
||||
configtest.ForEachFileType(path, fileConfigTest)
|
||||
t.Run("ENV", func(t *testing.T) {
|
||||
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||
})
|
||||
}
|
|
@ -5,6 +5,7 @@ import (
|
|||
"context"
|
||||
"net"
|
||||
|
||||
containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container"
|
||||
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
|
@ -47,6 +48,7 @@ func initContainerService(_ context.Context, c *cfg) {
|
|||
}
|
||||
|
||||
c.shared.frostfsidClient = frostfsIDSubjectProvider
|
||||
c.cfgContainer.containerBatchSize = containerconfig.ContainerBatchSize(c.appCfg)
|
||||
|
||||
defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides(
|
||||
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(),
|
||||
|
@ -56,7 +58,9 @@ func initContainerService(_ context.Context, c *cfg) {
|
|||
&c.key.PrivateKey,
|
||||
containerService.NewAPEServer(defaultChainRouter, cnrRdr,
|
||||
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
|
||||
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
|
||||
containerService.NewSplitterService(
|
||||
c.cfgContainer.containerBatchSize, c.respSvc,
|
||||
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)),
|
||||
),
|
||||
)
|
||||
service = containerService.NewAuditService(service, c.log, c.audit)
|
||||
|
|
|
@ -83,6 +83,9 @@ FROSTFS_POLICER_HEAD_TIMEOUT=15s
|
|||
FROSTFS_REPLICATOR_PUT_TIMEOUT=15s
|
||||
FROSTFS_REPLICATOR_POOL_SIZE=10
|
||||
|
||||
# Container service section
|
||||
FROSTFS_CONTAINER_LIST_STREAM_BATCH_SIZE=500
|
||||
|
||||
# Object service section
|
||||
FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
|
||||
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
||||
|
|
|
@ -124,6 +124,11 @@
|
|||
"pool_size": 10,
|
||||
"put_timeout": "15s"
|
||||
},
|
||||
"container": {
|
||||
"list_stream": {
|
||||
"batch_size": "500"
|
||||
}
|
||||
},
|
||||
"object": {
|
||||
"delete": {
|
||||
"tombstone_lifetime": 10
|
||||
|
|
|
@ -108,6 +108,10 @@ replicator:
|
|||
put_timeout: 15s # timeout for the Replicator PUT remote operation
|
||||
pool_size: 10 # maximum amount of concurrent replications
|
||||
|
||||
container:
|
||||
list_stream:
|
||||
batch_size: 500 # container_batch_size is the maximum amount of containers to send via stream at once
|
||||
|
||||
object:
|
||||
delete:
|
||||
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
|
||||
|
|
2
go.mod
2
go.mod
|
@ -8,7 +8,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241206094944-81c423e7094d
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241210104938-c4463df8d467
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
||||
|
|
4
go.sum
4
go.sum
|
@ -8,8 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
|
|||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 h1:9bvBDLApbbO5sXBKdODpE9tzy3HV99nXxkDWNn22rdI=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241206094944-81c423e7094d h1:FpXI+mOrmJk3t2MKQFZuhLjCHDyDeo5rtP1WXl7gUWc=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241206094944-81c423e7094d/go.mod h1:eoK7+KZQ9GJxbzIs6vTnoUJqFDppavInLRHaN4MYgZg=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241210104938-c4463df8d467 h1:MH9uHZFZNyUCL+YChiDcVeXPjhTDcFDeoGr8Mc8NY9M=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241210104938-c4463df8d467/go.mod h1:eoK7+KZQ9GJxbzIs6vTnoUJqFDppavInLRHaN4MYgZg=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 h1:/960fWeyn2AFHwQUwDsWB3sbP6lTEnFnMzLMM6tx6N8=
|
||||
|
|
|
@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
|
|||
|
||||
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
||||
}
|
||||
|
||||
type containerStreamerV2 struct {
|
||||
containerGRPC.ContainerService_ListStreamServer
|
||||
}
|
||||
|
||||
func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error {
|
||||
return s.ContainerService_ListStreamServer.Send(
|
||||
resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse),
|
||||
)
|
||||
}
|
||||
|
||||
// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data
|
||||
// to gRPC stream.
|
||||
func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error {
|
||||
listReq := new(container.ListStreamRequest)
|
||||
if err := listReq.FromGRPCMessage(req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.srv.ListStream(listReq, &containerStreamerV2{
|
||||
ContainerService_ListStreamServer: gStream,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co
|
|||
return nil, apeErr(nativeschema.MethodListContainers, s)
|
||||
}
|
||||
|
||||
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream")
|
||||
defer span.End()
|
||||
|
||||
role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reqProps := map[string]string{
|
||||
nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()),
|
||||
nativeschema.PropertyKeyActorRole: role,
|
||||
}
|
||||
|
||||
reqProps, err = ac.fillWithUserClaimTags(reqProps, pk)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if p, ok := peer.FromContext(ctx); ok {
|
||||
if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok {
|
||||
reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String()
|
||||
}
|
||||
}
|
||||
|
||||
namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID())
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get owner namespace: %w", err)
|
||||
}
|
||||
if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
request := aperequest.NewRequest(
|
||||
nativeschema.MethodListContainers,
|
||||
aperequest.NewResource(
|
||||
resourceName(namespace, ""),
|
||||
make(map[string]string),
|
||||
),
|
||||
reqProps,
|
||||
)
|
||||
|
||||
groups, err := aperequest.Groups(ac.frostFSIDClient, pk)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get group ids: %w", err)
|
||||
}
|
||||
|
||||
// Policy contract keeps group related chains as namespace-group pair.
|
||||
for i := range groups {
|
||||
groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i])
|
||||
}
|
||||
|
||||
rt := policyengine.NewRequestTargetWithNamespace(namespace)
|
||||
rt.User = &policyengine.Target{
|
||||
Type: policyengine.User,
|
||||
Name: fmt.Sprintf("%s:%s", namespace, pk.Address()),
|
||||
}
|
||||
rt.Groups = make([]policyengine.Target, len(groups))
|
||||
for i := range groups {
|
||||
rt.Groups[i] = policyengine.GroupTarget(groups[i])
|
||||
}
|
||||
|
||||
s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if found && s == apechain.Allow {
|
||||
return ac.next.ListStream(req, stream)
|
||||
}
|
||||
|
||||
return apeErr(nativeschema.MethodListContainers, s)
|
||||
}
|
||||
|
||||
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
|
||||
defer span.End()
|
||||
|
|
|
@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List
|
|||
return &container.ListResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error {
|
||||
s.calls["ListStream"]++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
|
||||
s.calls["Put"]++
|
||||
return &container.PutResponse{}, nil
|
||||
|
|
|
@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c
|
|||
return res, err
|
||||
}
|
||||
|
||||
// ListStream implements Server.
|
||||
func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
err := a.next.ListStream(req, stream)
|
||||
if !a.enabled.Load() {
|
||||
return err
|
||||
}
|
||||
audit.LogRequest(stream.Context(), a.log, container_grpc.ContainerService_ListStream_FullMethodName, req,
|
||||
audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// Put implements Server.
|
||||
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||
res, err := a.next.Put(ctx, req)
|
||||
|
|
|
@ -14,6 +14,7 @@ type ServiceExecutor interface {
|
|||
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
||||
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
||||
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
||||
ListStream(context.Context, *container.ListStreamRequest, ListStream) error
|
||||
}
|
||||
|
||||
type executorSvc struct {
|
||||
|
@ -93,3 +94,11 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
|
|||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
err := s.exec.ListStream(stream.Context(), req, stream)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not execute ListStream request: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -200,3 +200,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
|||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
||||
body := req.GetBody()
|
||||
idV2 := body.GetOwnerID()
|
||||
if idV2 == nil {
|
||||
return errMissingUserID
|
||||
}
|
||||
|
||||
var id user.ID
|
||||
|
||||
err := id.ReadFromV2(*idV2)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid user ID: %w", err)
|
||||
}
|
||||
|
||||
cnrs, err := s.rdr.ContainersOf(&id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cidList := make([]refs.ContainerID, len(cnrs))
|
||||
for i := range cnrs {
|
||||
cnrs[i].WriteToV2(&cidList[i])
|
||||
}
|
||||
|
||||
resBody := new(container.ListStreamResponseBody)
|
||||
resBody.SetContainerIDs(cidList)
|
||||
r := new(container.ListStreamResponse)
|
||||
r.SetBody(resBody)
|
||||
|
||||
return stream.Send(r)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package container
|
|||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
||||
)
|
||||
|
||||
|
@ -12,4 +13,11 @@ type Server interface {
|
|||
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
||||
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
||||
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
||||
ListStream(*container.ListStreamRequest, ListStream) error
|
||||
}
|
||||
|
||||
// ListStream is an interface of FrostFS API v2 compatible search streamer.
|
||||
type ListStream interface {
|
||||
util.ServerStream
|
||||
Send(*container.ListStreamResponse) error
|
||||
}
|
||||
|
|
|
@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
|
|||
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
|
||||
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||
resp := new(container.ListStreamResponse)
|
||||
_ = s.sigSvc.SignResponse(resp, err)
|
||||
return stream.Send(resp)
|
||||
}
|
||||
|
||||
ss := &listStreamSigner{
|
||||
ListStream: stream,
|
||||
sigSvc: s.sigSvc,
|
||||
}
|
||||
err := s.svc.ListStream(req, ss)
|
||||
if err != nil || !ss.nonEmptyResp {
|
||||
return ss.send(new(container.ListStreamResponse), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type listStreamSigner struct {
|
||||
ListStream
|
||||
sigSvc *util.SignService
|
||||
|
||||
nonEmptyResp bool // set on first Send call
|
||||
}
|
||||
|
||||
func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error {
|
||||
s.nonEmptyResp = true
|
||||
return s.send(resp, nil)
|
||||
}
|
||||
|
||||
func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error {
|
||||
if err := s.sigSvc.SignResponse(resp, err); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.ListStream.Send(resp)
|
||||
}
|
||||
|
|
92
pkg/services/container/transport_splitter.go
Normal file
92
pkg/services/container/transport_splitter.go
Normal file
|
@ -0,0 +1,92 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
||||
)
|
||||
|
||||
type (
|
||||
TransportSplitter struct {
|
||||
next Server
|
||||
|
||||
respSvc *response.Service
|
||||
cnrAmount uint32
|
||||
}
|
||||
|
||||
listStreamMsgSizeCtrl struct {
|
||||
util.ServerStream
|
||||
stream ListStream
|
||||
respSvc *response.Service
|
||||
cnrAmount uint32
|
||||
}
|
||||
)
|
||||
|
||||
func NewSplitterService(cnrAmount uint32, respSvc *response.Service, next Server) Server {
|
||||
return &TransportSplitter{
|
||||
next: next,
|
||||
respSvc: respSvc,
|
||||
cnrAmount: cnrAmount,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TransportSplitter) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||
return s.next.Put(ctx, req)
|
||||
}
|
||||
|
||||
func (s *TransportSplitter) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
|
||||
return s.next.Delete(ctx, req)
|
||||
}
|
||||
|
||||
func (s *TransportSplitter) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
|
||||
return s.next.Get(ctx, req)
|
||||
}
|
||||
|
||||
func (s *TransportSplitter) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
|
||||
return s.next.List(ctx, req)
|
||||
}
|
||||
|
||||
func (s *TransportSplitter) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
return s.next.ListStream(req, &listStreamMsgSizeCtrl{
|
||||
ServerStream: stream,
|
||||
stream: stream,
|
||||
respSvc: s.respSvc,
|
||||
cnrAmount: s.cnrAmount,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *listStreamMsgSizeCtrl) Send(resp *container.ListStreamResponse) error {
|
||||
s.respSvc.SetMeta(resp)
|
||||
body := resp.GetBody()
|
||||
ids := body.GetContainerIDs()
|
||||
|
||||
var newResp *container.ListStreamResponse
|
||||
|
||||
for {
|
||||
if newResp == nil {
|
||||
newResp = new(container.ListStreamResponse)
|
||||
newResp.SetBody(body)
|
||||
}
|
||||
|
||||
cut := min(s.cnrAmount, uint32(len(ids)))
|
||||
|
||||
body.SetContainerIDs(ids[:cut])
|
||||
newResp.SetMetaHeader(resp.GetMetaHeader())
|
||||
newResp.SetVerificationHeader(resp.GetVerificationHeader())
|
||||
|
||||
if err := s.stream.Send(newResp); err != nil {
|
||||
return fmt.Errorf("TransportSplitter: %w", err)
|
||||
}
|
||||
|
||||
ids = ids[cut:]
|
||||
|
||||
if len(ids) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Reference in a new issue