[#1452] container: Add ListStream method
All checks were successful
Vulncheck / Vulncheck (pull_request) Successful in 2m56s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m20s
Tests and linters / Run gofumpt (pull_request) Successful in 3m27s
DCO action / DCO (pull_request) Successful in 3m52s
Tests and linters / gopls check (pull_request) Successful in 3m50s
Build / Build Components (pull_request) Successful in 4m32s
Tests and linters / Staticcheck (pull_request) Successful in 5m23s
Tests and linters / Tests (pull_request) Successful in 5m33s
Tests and linters / Lint (pull_request) Successful in 5m55s
Tests and linters / Tests with -race (pull_request) Successful in 6m39s
All checks were successful
Vulncheck / Vulncheck (pull_request) Successful in 2m56s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m20s
Tests and linters / Run gofumpt (pull_request) Successful in 3m27s
DCO action / DCO (pull_request) Successful in 3m52s
Tests and linters / gopls check (pull_request) Successful in 3m50s
Build / Build Components (pull_request) Successful in 4m32s
Tests and linters / Staticcheck (pull_request) Successful in 5m23s
Tests and linters / Tests (pull_request) Successful in 5m33s
Tests and linters / Lint (pull_request) Successful in 5m55s
Tests and linters / Tests with -race (pull_request) Successful in 6m39s
* Added new method for listing containers to container service. It opens stream and sends containers in batches. * Added TransportSplitter wrapper around ExecutionService to split container ID list read from contract in parts that are smaller than grpc max message size. Batch size can be changed in node configuration file (as in example config file). * Changed `container list` implementaion in cli: now ListStream is called by default. Old List is called only if ListStream is not implemented. Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
This commit is contained in:
parent
3821645085
commit
591479e92b
20 changed files with 408 additions and 8 deletions
|
@ -85,6 +85,53 @@ func (x ListContainersRes) SortedIDList() []cid.ID {
|
|||
return list
|
||||
}
|
||||
|
||||
// ContainerListStreamRes groups the resulting values of ListStream operation.
|
||||
type ContainerListStreamRes struct {
|
||||
ids []cid.ID
|
||||
}
|
||||
|
||||
// SortedIDList returns sorted identifiers of the matched containers.
|
||||
func (x ContainerListStreamRes) SortedIDList() []cid.ID {
|
||||
list := x.ids
|
||||
slices.SortFunc(list, func(lhs, rhs cid.ID) int {
|
||||
return strings.Compare(lhs.EncodeToString(), rhs.EncodeToString())
|
||||
})
|
||||
return list
|
||||
}
|
||||
|
||||
func ListContainersStream(ctx context.Context, prm ListContainersPrm, printCnr func(id cid.ID)) (err error) {
|
||||
cliPrm := &client.PrmContainerListStream{
|
||||
XHeaders: prm.XHeaders,
|
||||
OwnerID: prm.OwnerID,
|
||||
Session: prm.Session,
|
||||
}
|
||||
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("init container list: %w", err)
|
||||
}
|
||||
|
||||
buf := make([]cid.ID, 10)
|
||||
var n int
|
||||
var ok bool
|
||||
|
||||
for {
|
||||
n, ok = rdr.Read(buf)
|
||||
for i := range n {
|
||||
printCnr(buf[i])
|
||||
}
|
||||
if !ok || n == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
_, err = rdr.Close()
|
||||
if err != nil {
|
||||
return fmt.Errorf("read container list: %w", err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// PutContainerPrm groups parameters of PutContainer operation.
|
||||
type PutContainerPrm struct {
|
||||
Client *client.Client
|
||||
|
|
|
@ -6,8 +6,11 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/spf13/cobra"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// flags of list command.
|
||||
|
@ -51,16 +54,53 @@ var listContainersCmd = &cobra.Command{
|
|||
|
||||
var prm internalclient.ListContainersPrm
|
||||
prm.SetClient(cli)
|
||||
prm.Account = idUser
|
||||
|
||||
res, err := internalclient.ListContainers(cmd.Context(), prm)
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
|
||||
prm.OwnerID = idUser
|
||||
prmGet := internalclient.GetContainerPrm{
|
||||
Client: cli,
|
||||
}
|
||||
var containerIDs []cid.ID
|
||||
|
||||
err := internalclient.ListContainersStream(cmd.Context(), prm, func(id cid.ID) {
|
||||
if flagVarListName == "" && !flagVarListPrintAttr {
|
||||
cmd.Println(id.String())
|
||||
return
|
||||
}
|
||||
|
||||
prmGet.ClientParams.ContainerID = &id
|
||||
res, err := internalclient.GetContainer(cmd.Context(), prmGet)
|
||||
if err != nil {
|
||||
cmd.Printf(" failed to read attributes: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
cnr := res.Container()
|
||||
if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName {
|
||||
return
|
||||
}
|
||||
cmd.Println(id.String())
|
||||
|
||||
if flagVarListPrintAttr {
|
||||
cnr.IterateUserAttributes(func(key, val string) {
|
||||
cmd.Printf(" %s: %s\n", key, val)
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if e, ok := status.FromError(err); ok {
|
||||
switch e.Code() {
|
||||
case codes.Unimplemented:
|
||||
res, err := internalclient.ListContainers(cmd.Context(), prm)
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
containerIDs = res.SortedIDList()
|
||||
default:
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return
|
||||
}
|
||||
|
||||
containerIDs := res.SortedIDList()
|
||||
for _, cnrID := range containerIDs {
|
||||
if flagVarListName == "" && !flagVarListPrintAttr {
|
||||
cmd.Println(cnrID.String())
|
||||
|
|
|
@ -596,6 +596,8 @@ type cfgMorph struct {
|
|||
|
||||
containerCacheSize uint32
|
||||
|
||||
containerBatchSize uint32
|
||||
|
||||
proxyScriptHash neogoutil.Uint160
|
||||
}
|
||||
|
||||
|
|
|
@ -33,6 +33,9 @@ const (
|
|||
|
||||
// ContainerCacheSizeDefault represents the default size for the container cache.
|
||||
ContainerCacheSizeDefault = 100
|
||||
|
||||
// ContainerBatchSizeDefault represents he maximum amount of containers to send via stream at once.
|
||||
ContainerBatchSizeDefault = 1000
|
||||
)
|
||||
|
||||
var errNoMorphEndpoints = errors.New("no morph chain RPC endpoints, see `morph.rpc_endpoint` section")
|
||||
|
@ -118,6 +121,18 @@ func ContainerCacheSize(c *config.Config) uint32 {
|
|||
return config.Uint32Safe(c.Sub(subsection), "container_cache_size")
|
||||
}
|
||||
|
||||
// ContainerBatchSize returns the value of "container_batch_size" config parameter
|
||||
// from "morph" section.
|
||||
//
|
||||
// Returns 0 if the value is not positive integer.
|
||||
// Returns ContainerBatchSizeDefault if the value is missing.
|
||||
func ContainerBatchSize(c *config.Config) uint32 {
|
||||
if c.Sub(subsection).Value("container_batch_size") == nil {
|
||||
return ContainerBatchSizeDefault
|
||||
}
|
||||
return config.Uint32Safe(c.Sub(subsection), "container_batch_size")
|
||||
}
|
||||
|
||||
// SwitchInterval returns the value of "switch_interval" config parameter
|
||||
// from "morph" section.
|
||||
//
|
||||
|
|
|
@ -56,7 +56,9 @@ func initContainerService(_ context.Context, c *cfg) {
|
|||
&c.key.PrivateKey,
|
||||
containerService.NewAPEServer(defaultChainRouter, cnrRdr,
|
||||
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
|
||||
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
|
||||
containerService.NewSplitterService(
|
||||
c.cfgMorph.containerBatchSize, c.respSvc,
|
||||
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)),
|
||||
),
|
||||
)
|
||||
service = containerService.NewAuditService(service, c.log, c.audit)
|
||||
|
|
|
@ -50,6 +50,7 @@ func (c *cfg) initMorphComponents(ctx context.Context) {
|
|||
var netmapSource netmap.Source
|
||||
|
||||
c.cfgMorph.containerCacheSize = morphconfig.ContainerCacheSize(c.appCfg)
|
||||
c.cfgMorph.containerBatchSize = morphconfig.ContainerBatchSize(c.appCfg)
|
||||
c.cfgMorph.cacheTTL = morphconfig.CacheTTL(c.appCfg)
|
||||
|
||||
if c.cfgMorph.cacheTTL == 0 {
|
||||
|
|
|
@ -60,6 +60,7 @@ FROSTFS_CONTRACTS_PROXY=ad7c6b55b737b696e5c82c85445040964a03e97f
|
|||
# Morph chain section
|
||||
FROSTFS_MORPH_DIAL_TIMEOUT=30s
|
||||
FROSTFS_MORPH_CACHE_TTL=15s
|
||||
FROSTFS_MORPH_CONTAINER_BATCH_SIZE=1000
|
||||
FROSTFS_MORPH_SWITCH_INTERVAL=3m
|
||||
FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS="wss://rpc1.morph.frostfs.info:40341/ws"
|
||||
FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY=0
|
||||
|
|
|
@ -93,6 +93,7 @@
|
|||
"morph": {
|
||||
"dial_timeout": "30s",
|
||||
"cache_ttl": "15s",
|
||||
"container_batch_size": "1000",
|
||||
"switch_interval": "3m",
|
||||
"rpc_endpoint": [
|
||||
{
|
||||
|
|
|
@ -83,6 +83,7 @@ morph:
|
|||
# Default value: block time. It is recommended to have this value less or equal to block time.
|
||||
# Cached entities: containers, container lists, eACL tables.
|
||||
container_cache_size: 100 # container_cache_size is is the maximum number of containers in the cache.
|
||||
container_batch_size: 1000 # container_batch_size is the maximum amount of containers to send via stream at once
|
||||
switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node
|
||||
rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success
|
||||
- address: wss://rpc1.morph.frostfs.info:40341/ws
|
||||
|
|
2
go.mod
2
go.mod
|
@ -8,7 +8,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241206094944-81c423e7094d
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241210104938-c4463df8d467
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
|
|||
|
||||
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
||||
}
|
||||
|
||||
type containerStreamerV2 struct {
|
||||
containerGRPC.ContainerService_ListStreamServer
|
||||
}
|
||||
|
||||
func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error {
|
||||
return s.ContainerService_ListStreamServer.Send(
|
||||
resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse),
|
||||
)
|
||||
}
|
||||
|
||||
// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data
|
||||
// to gRPC stream.
|
||||
func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error {
|
||||
listReq := new(container.ListStreamRequest)
|
||||
if err := listReq.FromGRPCMessage(req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.srv.ListStream(listReq, &containerStreamerV2{
|
||||
ContainerService_ListStreamServer: gStream,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co
|
|||
return nil, apeErr(nativeschema.MethodListContainers, s)
|
||||
}
|
||||
|
||||
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream")
|
||||
defer span.End()
|
||||
|
||||
role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reqProps := map[string]string{
|
||||
nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()),
|
||||
nativeschema.PropertyKeyActorRole: role,
|
||||
}
|
||||
|
||||
reqProps, err = ac.fillWithUserClaimTags(reqProps, pk)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if p, ok := peer.FromContext(ctx); ok {
|
||||
if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok {
|
||||
reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String()
|
||||
}
|
||||
}
|
||||
|
||||
namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID())
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get owner namespace: %w", err)
|
||||
}
|
||||
if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
request := aperequest.NewRequest(
|
||||
nativeschema.MethodListContainers,
|
||||
aperequest.NewResource(
|
||||
resourceName(namespace, ""),
|
||||
make(map[string]string),
|
||||
),
|
||||
reqProps,
|
||||
)
|
||||
|
||||
groups, err := aperequest.Groups(ac.frostFSIDClient, pk)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get group ids: %w", err)
|
||||
}
|
||||
|
||||
// Policy contract keeps group related chains as namespace-group pair.
|
||||
for i := range groups {
|
||||
groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i])
|
||||
}
|
||||
|
||||
rt := policyengine.NewRequestTargetWithNamespace(namespace)
|
||||
rt.User = &policyengine.Target{
|
||||
Type: policyengine.User,
|
||||
Name: fmt.Sprintf("%s:%s", namespace, pk.Address()),
|
||||
}
|
||||
rt.Groups = make([]policyengine.Target, len(groups))
|
||||
for i := range groups {
|
||||
rt.Groups[i] = policyengine.GroupTarget(groups[i])
|
||||
}
|
||||
|
||||
s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if found && s == apechain.Allow {
|
||||
return ac.next.ListStream(req, stream)
|
||||
}
|
||||
|
||||
return apeErr(nativeschema.MethodListContainers, s)
|
||||
}
|
||||
|
||||
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
|
||||
defer span.End()
|
||||
|
|
|
@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List
|
|||
return &container.ListResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error {
|
||||
s.calls["ListStream"]++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
|
||||
s.calls["Put"]++
|
||||
return &container.PutResponse{}, nil
|
||||
|
|
|
@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c
|
|||
return res, err
|
||||
}
|
||||
|
||||
// ListStream implements Server.
|
||||
func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
err := a.next.ListStream(req, stream)
|
||||
if !a.enabled.Load() {
|
||||
return err
|
||||
}
|
||||
audit.LogRequest(stream.Context(), a.log, container_grpc.ContainerService_ListStream_FullMethodName, req,
|
||||
audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// Put implements Server.
|
||||
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||
res, err := a.next.Put(ctx, req)
|
||||
|
|
|
@ -14,6 +14,7 @@ type ServiceExecutor interface {
|
|||
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
||||
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
||||
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
||||
ListStream(context.Context, *container.ListStreamRequest, ListStream) error
|
||||
}
|
||||
|
||||
type executorSvc struct {
|
||||
|
@ -93,3 +94,11 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
|
|||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
err := s.exec.ListStream(stream.Context(), req, stream)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not execute ListStream request: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -200,3 +200,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
|||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
||||
body := req.GetBody()
|
||||
idV2 := body.GetOwnerID()
|
||||
if idV2 == nil {
|
||||
return errMissingUserID
|
||||
}
|
||||
|
||||
var id user.ID
|
||||
|
||||
err := id.ReadFromV2(*idV2)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid user ID: %w", err)
|
||||
}
|
||||
|
||||
cnrs, err := s.rdr.ContainersOf(&id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cidList := make([]refs.ContainerID, len(cnrs))
|
||||
for i := range cnrs {
|
||||
cnrs[i].WriteToV2(&cidList[i])
|
||||
}
|
||||
|
||||
resBody := new(container.ListStreamResponseBody)
|
||||
resBody.SetContainerIDs(cidList)
|
||||
r := new(container.ListStreamResponse)
|
||||
r.SetBody(resBody)
|
||||
|
||||
return stream.Send(r)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package container
|
|||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
||||
)
|
||||
|
||||
|
@ -12,4 +13,11 @@ type Server interface {
|
|||
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
||||
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
||||
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
||||
ListStream(*container.ListStreamRequest, ListStream) error
|
||||
}
|
||||
|
||||
// ListStream is an interface of FrostFS API v2 compatible search streamer.
|
||||
type ListStream interface {
|
||||
util.ServerStream
|
||||
Send(*container.ListStreamResponse) error
|
||||
}
|
||||
|
|
|
@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
|
|||
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
|
||||
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||
resp := new(container.ListStreamResponse)
|
||||
_ = s.sigSvc.SignResponse(resp, err)
|
||||
return stream.Send(resp)
|
||||
}
|
||||
|
||||
ss := &listStreamSigner{
|
||||
ListStream: stream,
|
||||
sigSvc: s.sigSvc,
|
||||
}
|
||||
err := s.svc.ListStream(req, ss)
|
||||
if err != nil || !ss.nonEmptyResp {
|
||||
return ss.send(new(container.ListStreamResponse), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type listStreamSigner struct {
|
||||
ListStream
|
||||
sigSvc *util.SignService
|
||||
|
||||
nonEmptyResp bool // set on first Send call
|
||||
}
|
||||
|
||||
func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error {
|
||||
s.nonEmptyResp = true
|
||||
return s.send(resp, nil)
|
||||
}
|
||||
|
||||
func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error {
|
||||
if err := s.sigSvc.SignResponse(resp, err); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.ListStream.Send(resp)
|
||||
}
|
||||
|
|
92
pkg/services/container/transport_splitter.go
Normal file
92
pkg/services/container/transport_splitter.go
Normal file
|
@ -0,0 +1,92 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
||||
)
|
||||
|
||||
type (
|
||||
TransportSplitter struct {
|
||||
next Server
|
||||
|
||||
respSvc *response.Service
|
||||
cnrAmount uint32
|
||||
}
|
||||
|
||||
listStreamMsgSizeCtrl struct {
|
||||
util.ServerStream
|
||||
stream ListStream
|
||||
respSvc *response.Service
|
||||
cnrAmount uint32
|
||||
}
|
||||
)
|
||||
|
||||
func NewSplitterService(cnrAmount uint32, respSvc *response.Service, next Server) Server {
|
||||
return &TransportSplitter{
|
||||
next: next,
|
||||
respSvc: respSvc,
|
||||
cnrAmount: cnrAmount,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TransportSplitter) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||
return s.next.Put(ctx, req)
|
||||
}
|
||||
|
||||
func (s *TransportSplitter) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
|
||||
return s.next.Delete(ctx, req)
|
||||
}
|
||||
|
||||
func (s *TransportSplitter) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
|
||||
return s.next.Get(ctx, req)
|
||||
}
|
||||
|
||||
func (s *TransportSplitter) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
|
||||
return s.next.List(ctx, req)
|
||||
}
|
||||
|
||||
func (s *TransportSplitter) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
return s.next.ListStream(req, &listStreamMsgSizeCtrl{
|
||||
ServerStream: stream,
|
||||
stream: stream,
|
||||
respSvc: s.respSvc,
|
||||
cnrAmount: s.cnrAmount,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *listStreamMsgSizeCtrl) Send(resp *container.ListStreamResponse) error {
|
||||
s.respSvc.SetMeta(resp)
|
||||
body := resp.GetBody()
|
||||
ids := body.GetContainerIDs()
|
||||
|
||||
var newResp *container.ListStreamResponse
|
||||
|
||||
for {
|
||||
if newResp == nil {
|
||||
newResp = new(container.ListStreamResponse)
|
||||
newResp.SetBody(body)
|
||||
}
|
||||
|
||||
cut := min(s.cnrAmount, uint32(len(ids)))
|
||||
|
||||
body.SetContainerIDs(ids[:cut])
|
||||
newResp.SetMetaHeader(resp.GetMetaHeader())
|
||||
newResp.SetVerificationHeader(resp.GetVerificationHeader())
|
||||
|
||||
if err := s.stream.Send(newResp); err != nil {
|
||||
return fmt.Errorf("TransportSplitter: %w", err)
|
||||
}
|
||||
|
||||
ids = ids[cut:]
|
||||
|
||||
if len(ids) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Reference in a new issue