forked from TrueCloudLab/frostfs-node
Compare commits
2 commits
03d5a1d5ad
...
7ffc82a066
Author | SHA1 | Date | |
---|---|---|---|
7ffc82a066 | |||
bfa35cba26 |
20 changed files with 430 additions and 15 deletions
|
@ -9,7 +9,6 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"slices"
|
"slices"
|
||||||
"sort"
|
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
|
||||||
|
@ -78,13 +77,31 @@ func ListContainers(ctx context.Context, prm ListContainersPrm) (res ListContain
|
||||||
// SortedIDList returns sorted list of identifiers of user's containers.
|
// SortedIDList returns sorted list of identifiers of user's containers.
|
||||||
func (x ListContainersRes) SortedIDList() []cid.ID {
|
func (x ListContainersRes) SortedIDList() []cid.ID {
|
||||||
list := x.cliRes.Containers()
|
list := x.cliRes.Containers()
|
||||||
sort.Slice(list, func(i, j int) bool {
|
slices.SortFunc(list, func(lhs, rhs cid.ID) int {
|
||||||
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
|
return strings.Compare(lhs.EncodeToString(), rhs.EncodeToString())
|
||||||
return strings.Compare(lhs, rhs) < 0
|
|
||||||
})
|
})
|
||||||
return list
|
return list
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ListContainersStream(ctx context.Context, prm ListContainersPrm, printCnr func(id cid.ID) bool) (err error) {
|
||||||
|
cliPrm := &client.PrmContainerListStream{
|
||||||
|
XHeaders: prm.XHeaders,
|
||||||
|
OwnerID: prm.OwnerID,
|
||||||
|
Session: prm.Session,
|
||||||
|
}
|
||||||
|
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("init container list: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = rdr.Iterate(printCnr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("read container list: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// PutContainerPrm groups parameters of PutContainer operation.
|
// PutContainerPrm groups parameters of PutContainer operation.
|
||||||
type PutContainerPrm struct {
|
type PutContainerPrm struct {
|
||||||
Client *client.Client
|
Client *client.Client
|
||||||
|
|
|
@ -6,8 +6,11 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
// flags of list command.
|
// flags of list command.
|
||||||
|
@ -51,16 +54,50 @@ var listContainersCmd = &cobra.Command{
|
||||||
|
|
||||||
var prm internalclient.ListContainersPrm
|
var prm internalclient.ListContainersPrm
|
||||||
prm.SetClient(cli)
|
prm.SetClient(cli)
|
||||||
prm.Account = idUser
|
prm.OwnerID = idUser
|
||||||
|
|
||||||
res, err := internalclient.ListContainers(cmd.Context(), prm)
|
|
||||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
|
||||||
|
|
||||||
prmGet := internalclient.GetContainerPrm{
|
prmGet := internalclient.GetContainerPrm{
|
||||||
Client: cli,
|
Client: cli,
|
||||||
}
|
}
|
||||||
|
var containerIDs []cid.ID
|
||||||
|
|
||||||
|
err := internalclient.ListContainersStream(cmd.Context(), prm, func(id cid.ID) bool {
|
||||||
|
if flagVarListName == "" && !flagVarListPrintAttr {
|
||||||
|
cmd.Println(id.String())
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
prmGet.ClientParams.ContainerID = &id
|
||||||
|
res, err := internalclient.GetContainer(cmd.Context(), prmGet)
|
||||||
|
if err != nil {
|
||||||
|
cmd.Printf(" failed to read attributes: %v\n", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
cnr := res.Container()
|
||||||
|
if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
cmd.Println(id.String())
|
||||||
|
|
||||||
|
if flagVarListPrintAttr {
|
||||||
|
cnr.IterateUserAttributes(func(key, val string) {
|
||||||
|
cmd.Printf(" %s: %s\n", key, val)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if e, ok := status.FromError(err); ok && e.Code() == codes.Unimplemented {
|
||||||
|
res, err := internalclient.ListContainers(cmd.Context(), prm)
|
||||||
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||||
|
containerIDs = res.SortedIDList()
|
||||||
|
} else {
|
||||||
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
containerIDs := res.SortedIDList()
|
|
||||||
for _, cnrID := range containerIDs {
|
for _, cnrID := range containerIDs {
|
||||||
if flagVarListName == "" && !flagVarListPrintAttr {
|
if flagVarListName == "" && !flagVarListPrintAttr {
|
||||||
cmd.Println(cnrID.String())
|
cmd.Println(cnrID.String())
|
||||||
|
|
|
@ -609,6 +609,7 @@ type cfgContainer struct {
|
||||||
parsers map[event.Type]event.NotificationParser
|
parsers map[event.Type]event.NotificationParser
|
||||||
subscribers map[event.Type][]event.Handler
|
subscribers map[event.Type][]event.Handler
|
||||||
workerPool util.WorkerPool // pool for asynchronous handlers
|
workerPool util.WorkerPool // pool for asynchronous handlers
|
||||||
|
containerBatchSize uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgFrostfsID struct {
|
type cfgFrostfsID struct {
|
||||||
|
|
27
cmd/frostfs-node/config/container/container.go
Normal file
27
cmd/frostfs-node/config/container/container.go
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
package containerconfig
|
||||||
|
|
||||||
|
import "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
|
||||||
|
const (
|
||||||
|
subsection = "container"
|
||||||
|
listStreamSubsection = "list_stream"
|
||||||
|
|
||||||
|
// ContainerBatchSizeDefault represents he maximum amount of containers to send via stream at once.
|
||||||
|
ContainerBatchSizeDefault = 1000
|
||||||
|
)
|
||||||
|
|
||||||
|
// ContainerBatchSize returns the value of "batch_size" config parameter
|
||||||
|
// from "list_stream" subsection of "container" section.
|
||||||
|
//
|
||||||
|
// Returns ContainerBatchSizeDefault if the value is missing or if
|
||||||
|
// the value is not positive integer.
|
||||||
|
func ContainerBatchSize(c *config.Config) uint32 {
|
||||||
|
if c.Sub(subsection).Sub(listStreamSubsection).Value("batch_size") == nil {
|
||||||
|
return ContainerBatchSizeDefault
|
||||||
|
}
|
||||||
|
size := config.Uint32Safe(c.Sub(subsection).Sub(listStreamSubsection), "batch_size")
|
||||||
|
if size == 0 {
|
||||||
|
return ContainerBatchSizeDefault
|
||||||
|
}
|
||||||
|
return size
|
||||||
|
}
|
27
cmd/frostfs-node/config/container/container_test.go
Normal file
27
cmd/frostfs-node/config/container/container_test.go
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
package containerconfig_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container"
|
||||||
|
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestContainerSection(t *testing.T) {
|
||||||
|
t.Run("defaults", func(t *testing.T) {
|
||||||
|
empty := configtest.EmptyConfig()
|
||||||
|
require.Equal(t, uint32(containerconfig.ContainerBatchSizeDefault), containerconfig.ContainerBatchSize(empty))
|
||||||
|
})
|
||||||
|
|
||||||
|
const path = "../../../../config/example/node"
|
||||||
|
fileConfigTest := func(c *config.Config) {
|
||||||
|
require.Equal(t, uint32(1000), containerconfig.ContainerBatchSize(c))
|
||||||
|
}
|
||||||
|
|
||||||
|
configtest.ForEachFileType(path, fileConfigTest)
|
||||||
|
t.Run("ENV", func(t *testing.T) {
|
||||||
|
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||||
|
})
|
||||||
|
}
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
|
containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container"
|
||||||
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||||
|
@ -47,6 +48,7 @@ func initContainerService(_ context.Context, c *cfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
c.shared.frostfsidClient = frostfsIDSubjectProvider
|
c.shared.frostfsidClient = frostfsIDSubjectProvider
|
||||||
|
c.cfgContainer.containerBatchSize = containerconfig.ContainerBatchSize(c.appCfg)
|
||||||
|
|
||||||
defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides(
|
defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides(
|
||||||
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(),
|
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(),
|
||||||
|
@ -56,7 +58,9 @@ func initContainerService(_ context.Context, c *cfg) {
|
||||||
&c.key.PrivateKey,
|
&c.key.PrivateKey,
|
||||||
containerService.NewAPEServer(defaultChainRouter, cnrRdr,
|
containerService.NewAPEServer(defaultChainRouter, cnrRdr,
|
||||||
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
|
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
|
||||||
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
|
containerService.NewSplitterService(
|
||||||
|
c.cfgContainer.containerBatchSize, c.respSvc,
|
||||||
|
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
service = containerService.NewAuditService(service, c.log, c.audit)
|
service = containerService.NewAuditService(service, c.log, c.audit)
|
||||||
|
|
|
@ -83,6 +83,9 @@ FROSTFS_POLICER_HEAD_TIMEOUT=15s
|
||||||
FROSTFS_REPLICATOR_PUT_TIMEOUT=15s
|
FROSTFS_REPLICATOR_PUT_TIMEOUT=15s
|
||||||
FROSTFS_REPLICATOR_POOL_SIZE=10
|
FROSTFS_REPLICATOR_POOL_SIZE=10
|
||||||
|
|
||||||
|
# Container service section
|
||||||
|
FROSTFS_CONTAINER_LIST_STREAM_BATCH_SIZE=1000
|
||||||
|
|
||||||
# Object service section
|
# Object service section
|
||||||
FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
|
FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
|
||||||
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
||||||
|
|
|
@ -124,6 +124,11 @@
|
||||||
"pool_size": 10,
|
"pool_size": 10,
|
||||||
"put_timeout": "15s"
|
"put_timeout": "15s"
|
||||||
},
|
},
|
||||||
|
"container": {
|
||||||
|
"list_stream": {
|
||||||
|
"batch_size": "1000"
|
||||||
|
}
|
||||||
|
},
|
||||||
"object": {
|
"object": {
|
||||||
"delete": {
|
"delete": {
|
||||||
"tombstone_lifetime": 10
|
"tombstone_lifetime": 10
|
||||||
|
|
|
@ -108,6 +108,10 @@ replicator:
|
||||||
put_timeout: 15s # timeout for the Replicator PUT remote operation
|
put_timeout: 15s # timeout for the Replicator PUT remote operation
|
||||||
pool_size: 10 # maximum amount of concurrent replications
|
pool_size: 10 # maximum amount of concurrent replications
|
||||||
|
|
||||||
|
container:
|
||||||
|
list_stream:
|
||||||
|
batch_size: 1000 # container_batch_size is the maximum amount of containers to send via stream at once
|
||||||
|
|
||||||
object:
|
object:
|
||||||
delete:
|
delete:
|
||||||
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
|
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -8,7 +8,7 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241206094944-81c423e7094d
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241210104938-c4463df8d467
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
|
||||||
|
|
||||||
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type containerStreamerV2 struct {
|
||||||
|
containerGRPC.ContainerService_ListStreamServer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error {
|
||||||
|
return s.ContainerService_ListStreamServer.Send(
|
||||||
|
resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data
|
||||||
|
// to gRPC stream.
|
||||||
|
func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error {
|
||||||
|
listReq := new(container.ListStreamRequest)
|
||||||
|
if err := listReq.FromGRPCMessage(req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.srv.ListStream(listReq, &containerStreamerV2{
|
||||||
|
ContainerService_ListStreamServer: gStream,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
return nil, apeErr(nativeschema.MethodListContainers, s)
|
return nil, apeErr(nativeschema.MethodListContainers, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
reqProps := map[string]string{
|
||||||
|
nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()),
|
||||||
|
nativeschema.PropertyKeyActorRole: role,
|
||||||
|
}
|
||||||
|
|
||||||
|
reqProps, err = ac.fillWithUserClaimTags(reqProps, pk)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if p, ok := peer.FromContext(ctx); ok {
|
||||||
|
if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok {
|
||||||
|
reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not get owner namespace: %w", err)
|
||||||
|
}
|
||||||
|
if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
request := aperequest.NewRequest(
|
||||||
|
nativeschema.MethodListContainers,
|
||||||
|
aperequest.NewResource(
|
||||||
|
resourceName(namespace, ""),
|
||||||
|
make(map[string]string),
|
||||||
|
),
|
||||||
|
reqProps,
|
||||||
|
)
|
||||||
|
|
||||||
|
groups, err := aperequest.Groups(ac.frostFSIDClient, pk)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get group ids: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Policy contract keeps group related chains as namespace-group pair.
|
||||||
|
for i := range groups {
|
||||||
|
groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
rt := policyengine.NewRequestTargetWithNamespace(namespace)
|
||||||
|
rt.User = &policyengine.Target{
|
||||||
|
Type: policyengine.User,
|
||||||
|
Name: fmt.Sprintf("%s:%s", namespace, pk.Address()),
|
||||||
|
}
|
||||||
|
rt.Groups = make([]policyengine.Target, len(groups))
|
||||||
|
for i := range groups {
|
||||||
|
rt.Groups[i] = policyengine.GroupTarget(groups[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if found && s == apechain.Allow {
|
||||||
|
return ac.next.ListStream(req, stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
return apeErr(nativeschema.MethodListContainers, s)
|
||||||
|
}
|
||||||
|
|
||||||
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
|
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
|
@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List
|
||||||
return &container.ListResponse{}, nil
|
return &container.ListResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error {
|
||||||
|
s.calls["ListStream"]++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
|
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
|
||||||
s.calls["Put"]++
|
s.calls["Put"]++
|
||||||
return &container.PutResponse{}, nil
|
return &container.PutResponse{}, nil
|
||||||
|
|
|
@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListStream implements Server.
|
||||||
|
func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
err := a.next.ListStream(req, stream)
|
||||||
|
if !a.enabled.Load() {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
audit.LogRequest(stream.Context(), a.log, container_grpc.ContainerService_ListStream_FullMethodName, req,
|
||||||
|
audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Put implements Server.
|
// Put implements Server.
|
||||||
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
res, err := a.next.Put(ctx, req)
|
res, err := a.next.Put(ctx, req)
|
||||||
|
|
|
@ -14,6 +14,7 @@ type ServiceExecutor interface {
|
||||||
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
||||||
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
||||||
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
||||||
|
ListStream(context.Context, *container.ListStreamRequest, ListStream) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type executorSvc struct {
|
type executorSvc struct {
|
||||||
|
@ -93,3 +94,11 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
s.respSvc.SetMeta(resp)
|
s.respSvc.SetMeta(resp)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
err := s.exec.ListStream(stream.Context(), req, stream)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not execute ListStream request: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -200,3 +200,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
||||||
|
body := req.GetBody()
|
||||||
|
idV2 := body.GetOwnerID()
|
||||||
|
if idV2 == nil {
|
||||||
|
return errMissingUserID
|
||||||
|
}
|
||||||
|
|
||||||
|
var id user.ID
|
||||||
|
|
||||||
|
err := id.ReadFromV2(*idV2)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid user ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cnrs, err := s.rdr.ContainersOf(&id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cidList := make([]refs.ContainerID, len(cnrs))
|
||||||
|
for i := range cnrs {
|
||||||
|
cnrs[i].WriteToV2(&cidList[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
resBody := new(container.ListStreamResponseBody)
|
||||||
|
resBody.SetContainerIDs(cidList)
|
||||||
|
r := new(container.ListStreamResponse)
|
||||||
|
r.SetBody(resBody)
|
||||||
|
|
||||||
|
return stream.Send(r)
|
||||||
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package container
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -12,4 +13,11 @@ type Server interface {
|
||||||
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
||||||
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
||||||
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
||||||
|
ListStream(*container.ListStreamRequest, ListStream) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListStream is an interface of FrostFS API v2 compatible search streamer.
|
||||||
|
type ListStream interface {
|
||||||
|
util.ServerStream
|
||||||
|
Send(*container.ListStreamResponse) error
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
|
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
|
||||||
return resp, s.sigSvc.SignResponse(resp, err)
|
return resp, s.sigSvc.SignResponse(resp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||||
|
resp := new(container.ListStreamResponse)
|
||||||
|
_ = s.sigSvc.SignResponse(resp, err)
|
||||||
|
return stream.Send(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
ss := &listStreamSigner{
|
||||||
|
ListStream: stream,
|
||||||
|
sigSvc: s.sigSvc,
|
||||||
|
}
|
||||||
|
err := s.svc.ListStream(req, ss)
|
||||||
|
if err != nil || !ss.nonEmptyResp {
|
||||||
|
return ss.send(new(container.ListStreamResponse), err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type listStreamSigner struct {
|
||||||
|
ListStream
|
||||||
|
sigSvc *util.SignService
|
||||||
|
|
||||||
|
nonEmptyResp bool // set on first Send call
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error {
|
||||||
|
s.nonEmptyResp = true
|
||||||
|
return s.send(resp, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error {
|
||||||
|
if err := s.sigSvc.SignResponse(resp, err); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return s.ListStream.Send(resp)
|
||||||
|
}
|
||||||
|
|
92
pkg/services/container/transport_splitter.go
Normal file
92
pkg/services/container/transport_splitter.go
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
TransportSplitter struct {
|
||||||
|
next Server
|
||||||
|
|
||||||
|
respSvc *response.Service
|
||||||
|
cnrAmount uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
listStreamMsgSizeCtrl struct {
|
||||||
|
util.ServerStream
|
||||||
|
stream ListStream
|
||||||
|
respSvc *response.Service
|
||||||
|
cnrAmount uint32
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewSplitterService(cnrAmount uint32, respSvc *response.Service, next Server) Server {
|
||||||
|
return &TransportSplitter{
|
||||||
|
next: next,
|
||||||
|
respSvc: respSvc,
|
||||||
|
cnrAmount: cnrAmount,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
|
return s.next.Put(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
|
||||||
|
return s.next.Delete(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
|
||||||
|
return s.next.Get(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
|
||||||
|
return s.next.List(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
return s.next.ListStream(req, &listStreamMsgSizeCtrl{
|
||||||
|
ServerStream: stream,
|
||||||
|
stream: stream,
|
||||||
|
respSvc: s.respSvc,
|
||||||
|
cnrAmount: s.cnrAmount,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *listStreamMsgSizeCtrl) Send(resp *container.ListStreamResponse) error {
|
||||||
|
s.respSvc.SetMeta(resp)
|
||||||
|
body := resp.GetBody()
|
||||||
|
ids := body.GetContainerIDs()
|
||||||
|
|
||||||
|
var newResp *container.ListStreamResponse
|
||||||
|
|
||||||
|
for {
|
||||||
|
if newResp == nil {
|
||||||
|
newResp = new(container.ListStreamResponse)
|
||||||
|
newResp.SetBody(body)
|
||||||
|
}
|
||||||
|
|
||||||
|
cut := min(s.cnrAmount, uint32(len(ids)))
|
||||||
|
|
||||||
|
body.SetContainerIDs(ids[:cut])
|
||||||
|
newResp.SetMetaHeader(resp.GetMetaHeader())
|
||||||
|
newResp.SetVerificationHeader(resp.GetVerificationHeader())
|
||||||
|
|
||||||
|
if err := s.stream.Send(newResp); err != nil {
|
||||||
|
return fmt.Errorf("TransportSplitter: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ids = ids[cut:]
|
||||||
|
|
||||||
|
if len(ids) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in a new issue