container: Add ListStream method #1453
|
@ -85,6 +85,62 @@ func (x ListContainersRes) SortedIDList() []cid.ID {
|
||||||
return list
|
return list
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ContainerListStreamRes groups the resulting values of ListStream operation.
|
||||||
|
type ContainerListStreamRes struct {
|
||||||
|
ids []cid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// SortedIDList returns sorted identifiers of the matched containers.
|
||||||
|
func (x ContainerListStreamRes) SortedIDList() []cid.ID {
|
||||||
|
|||||||
|
list := x.ids
|
||||||
|
sort.Slice(list, func(i, j int) bool {
|
||||||
|
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
|
||||||
|
return strings.Compare(lhs, rhs) < 0
|
||||||
|
})
|
||||||
|
return list
|
||||||
|
}
|
||||||
|
|
||||||
|
func ListContainersStream(ctx context.Context, prm ListContainersPrm) (res ContainerListStreamRes, err error) {
|
||||||
|
cliPrm := &client.PrmContainerListStream{
|
||||||
|
XHeaders: prm.XHeaders,
|
||||||
|
Account: prm.Account,
|
||||||
|
Session: prm.Session,
|
||||||
|
}
|
||||||
|
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
|
||||||
|
if err != nil {
|
||||||
|
return res, fmt.Errorf("init container list: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]cid.ID, 10)
|
||||||
|
var list []cid.ID
|
||||||
|
var n int
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
for {
|
||||||
|
n, ok = rdr.Read(buf)
|
||||||
|
for i := range n {
|
||||||
|
list = append(list, buf[i])
|
||||||
acid-ant
commented
How about to use callback here instead of append? How about to use callback here instead of append?
|
|||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = rdr.Close()
|
||||||
|
if err != nil {
|
||||||
|
return res, fmt.Errorf("read container list: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(list, func(i, j int) bool {
|
||||||
|
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
|
||||||
a-savchuk
commented
Can we compare IDs using their byte representation instead of as strings? Can we compare IDs using their byte representation instead of as strings?
|
|||||||
|
return strings.Compare(lhs, rhs) < 0
|
||||||
|
})
|
||||||
|
|
||||||
|
res.ids = list
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// PutContainerPrm groups parameters of PutContainer operation.
|
// PutContainerPrm groups parameters of PutContainer operation.
|
||||||
type PutContainerPrm struct {
|
type PutContainerPrm struct {
|
||||||
Client *client.Client
|
Client *client.Client
|
||||||
|
|
|
@ -6,8 +6,11 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
// flags of list command.
|
// flags of list command.
|
||||||
|
@ -53,14 +56,27 @@ var listContainersCmd = &cobra.Command{
|
||||||
prm.SetClient(cli)
|
prm.SetClient(cli)
|
||||||
prm.Account = idUser
|
prm.Account = idUser
|
||||||
|
|
||||||
res, err := internalclient.ListContainers(cmd.Context(), prm)
|
var containerIDs []cid.ID
|
||||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
res, err := internalclient.ListContainersStream(cmd.Context(), prm)
|
||||||
|
if err != nil {
|
||||||
|
if e, ok := status.FromError(err); ok {
|
||||||
|
switch e.Code() {
|
||||||
|
case codes.Unimplemented:
|
||||||
|
resV1, err := internalclient.ListContainers(cmd.Context(), prm)
|
||||||
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||||
|
containerIDs = resV1.SortedIDList()
|
||||||
|
default:
|
||||||
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
containerIDs = res.SortedIDList()
|
||||||
|
}
|
||||||
|
|
||||||
prmGet := internalclient.GetContainerPrm{
|
prmGet := internalclient.GetContainerPrm{
|
||||||
Client: cli,
|
Client: cli,
|
||||||
}
|
}
|
||||||
|
|
||||||
containerIDs := res.SortedIDList()
|
|
||||||
for _, cnrID := range containerIDs {
|
for _, cnrID := range containerIDs {
|
||||||
if flagVarListName == "" && !flagVarListPrintAttr {
|
if flagVarListName == "" && !flagVarListPrintAttr {
|
||||||
cmd.Println(cnrID.String())
|
cmd.Println(cnrID.String())
|
||||||
|
|
|
@ -611,6 +611,7 @@ type cfgContainer struct {
|
||||||||||||||||||||||||||||||||
parsers map[event.Type]event.NotificationParser
|
parsers map[event.Type]event.NotificationParser
|
||||||||||||||||||||||||||||||||
subscribers map[event.Type][]event.Handler
|
subscribers map[event.Type][]event.Handler
|
||||||||||||||||||||||||||||||||
workerPool util.WorkerPool // pool for asynchronous handlers
|
workerPool util.WorkerPool // pool for asynchronous handlers
|
||||||||||||||||||||||||||||||||
|
cnrAmount uint64 // amount of containers to send via stream at once
|
||||||||||||||||||||||||||||||||
}
|
}
|
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
type cfgFrostfsID struct {
|
type cfgFrostfsID struct {
|
||||||||||||||||||||||||||||||||
|
@ -841,9 +842,15 @@ func initContainer(appCfg *config.Config) cfgContainer {
|
||||||||||||||||||||||||||||||||
containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
|
containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
|
||||||||||||||||||||||||||||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
|
amount := config.UintSafe(appCfg.Sub("morph"), "container_batch_size")
|
||||||||||||||||||||||||||||||||
|
if amount <= 0 {
|
||||||||||||||||||||||||||||||||
a-savchuk
commented
I can't be less than zero, I can't be less than zero, `if amount == 0` is enough
|
|||||||||||||||||||||||||||||||||
|
amount = 1000
|
||||||||||||||||||||||||||||||||
a-savchuk
commented
How about reading this value in a separate function in Lines 34 to 35 in 01acec7
Lines 109 to 119 in 01acec7
How about reading this value in a separate function in `cmd/frostfs-node/config/morph.go`, similar to how it's done for `container_cache_size` (examples below)? You can also define the default value as a constant there
https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/01acec708fa8f0c536ebc79e8f03ee2420ef3731/cmd/frostfs-node/config/morph/config.go#L34-L35
https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/01acec708fa8f0c536ebc79e8f03ee2420ef3731/cmd/frostfs-node/config/morph/config.go#L109-L119
What do you think?
a-savchuk
commented
Moreover, I think you should write a test for reading this new configuration option. You can find an example in cmd/frostfs-node/config/morph/config_test.go. This test will read the new option from the config file in different formats, so you may need to replicate this option in those formats as mentioned in one of my comments:
Moreover, I think you should write a test for reading this new configuration option. You can find an example in [cmd/frostfs-node/config/morph/config_test.go](https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/01acec708fa8f0c536ebc79e8f03ee2420ef3731/cmd/frostfs-node/config/morph/config_test.go).
This test will read the new option from the config file in different formats, so you may need to replicate this option in those formats as mentioned in one of my comments:
> We have configuration examples in YAML, JSON, and dotenv formats, which mostly duplicate each other. It'd be very helpful if you could replicate this new configuration option in the other config files we have
|
|||||||||||||||||||||||||||||||||
|
}
|
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
return cfgContainer{
|
return cfgContainer{
|
||||||||||||||||||||||||||||||||
scriptHash: contractsconfig.Container(appCfg),
|
scriptHash: contractsconfig.Container(appCfg),
|
||||||||||||||||||||||||||||||||
workerPool: containerWorkerPool,
|
workerPool: containerWorkerPool,
|
||||||||||||||||||||||||||||||||
|
cnrAmount: amount,
|
||||||||||||||||||||||||||||||||
}
|
}
|
||||||||||||||||||||||||||||||||
}
|
}
|
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
|
|
|
@ -56,7 +56,9 @@ func initContainerService(_ context.Context, c *cfg) {
|
||||||
&c.key.PrivateKey,
|
&c.key.PrivateKey,
|
||||||
containerService.NewAPEServer(defaultChainRouter, cnrRdr,
|
containerService.NewAPEServer(defaultChainRouter, cnrRdr,
|
||||||
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
|
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
|
||||||
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
|
containerService.NewSplitterService(
|
||||||
|
c.cfgContainer.cnrAmount, c.respSvc,
|
||||||
|
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
service = containerService.NewAuditService(service, c.log, c.audit)
|
service = containerService.NewAuditService(service, c.log, c.audit)
|
||||||
|
|
|
@ -83,6 +83,7 @@ morph:
|
||||||
# Default value: block time. It is recommended to have this value less or equal to block time.
|
# Default value: block time. It is recommended to have this value less or equal to block time.
|
||||||
# Cached entities: containers, container lists, eACL tables.
|
# Cached entities: containers, container lists, eACL tables.
|
||||||
container_cache_size: 100 # container_cache_size is is the maximum number of containers in the cache.
|
container_cache_size: 100 # container_cache_size is is the maximum number of containers in the cache.
|
||||||
|
container_batch_size: 1000 # container_batch_size is the maximum amount of containers to send via stream at once
|
||||||
a-savchuk
commented
We have configuration examples in YAML, JSON, and dotenv formats, which mostly duplicate each other. It'd be very helpful if you could replicate this new configuration option in the other config files we have We have configuration examples in YAML, JSON, and dotenv formats, which mostly duplicate each other. It'd be very helpful if you could replicate this new configuration option in the other config files we have
|
|||||||
switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node
|
switch_interval: 3m # interval b/w RPC switch attempts if the node is connected not to the highest priority node
|
||||||
rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success
|
rpc_endpoint: # side chain NEO RPC endpoints; are shuffled and used one by one until the first success
|
||||||
- address: wss://rpc1.morph.frostfs.info:40341/ws
|
- address: wss://rpc1.morph.frostfs.info:40341/ws
|
||||||
|
|
4
go.mod
|
@ -8,7 +8,7 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241107121119-cb813e27a823
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241113074125-afdc2d8340bb
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
||||||
|
@ -134,3 +134,5 @@ require (
|
||||||
)
|
)
|
||||||
|
|
||||||
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07
|
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20241015133823-8aee80dbdc07
|
||||||
|
|
||||||
|
replace git.frostfs.info/TrueCloudLab/frostfs-sdk-go => /home/w0lframm/tmp/frostfs-sdk-go
|
||||||
|
|
BIN
go.sum
|
@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
|
||||||
|
|
||||||
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type containerStreamerV2 struct {
|
||||||
|
containerGRPC.ContainerService_ListStreamServer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error {
|
||||||
|
return s.ContainerService_ListStreamServer.Send(
|
||||||
|
resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data
|
||||||
|
// to gRPC stream.
|
||||||
|
func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error {
|
||||||
|
listReq := new(container.ListStreamRequest)
|
||||||
|
if err := listReq.FromGRPCMessage(req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.srv.ListStream(listReq, &containerStreamerV2{
|
||||||
|
ContainerService_ListStreamServer: gStream,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
return nil, apeErr(nativeschema.MethodListContainers, s)
|
return nil, apeErr(nativeschema.MethodListContainers, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
reqProps := map[string]string{
|
||||||
|
nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()),
|
||||||
|
nativeschema.PropertyKeyActorRole: role,
|
||||||
|
}
|
||||||
|
|
||||||
|
reqProps, err = ac.fillWithUserClaimTags(reqProps, pk)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if p, ok := peer.FromContext(ctx); ok {
|
||||||
|
if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok {
|
||||||
|
reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not get owner namespace: %w", err)
|
||||||
|
}
|
||||||
|
if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
request := aperequest.NewRequest(
|
||||||
|
nativeschema.MethodListContainers,
|
||||||
|
aperequest.NewResource(
|
||||||
|
resourceName(namespace, ""),
|
||||||
|
make(map[string]string),
|
||||||
|
),
|
||||||
|
reqProps,
|
||||||
|
)
|
||||||
|
|
||||||
|
groups, err := aperequest.Groups(ac.frostFSIDClient, pk)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get group ids: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Policy contract keeps group related chains as namespace-group pair.
|
||||||
|
for i := range groups {
|
||||||
|
groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
rt := policyengine.NewRequestTargetWithNamespace(namespace)
|
||||||
|
rt.User = &policyengine.Target{
|
||||||
|
Type: policyengine.User,
|
||||||
|
Name: fmt.Sprintf("%s:%s", namespace, pk.Address()),
|
||||||
|
}
|
||||||
|
rt.Groups = make([]policyengine.Target, len(groups))
|
||||||
|
for i := range groups {
|
||||||
|
rt.Groups[i] = policyengine.GroupTarget(groups[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if found && s == apechain.Allow {
|
||||||
|
return ac.next.ListStream(req, stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
return apeErr(nativeschema.MethodListContainers, s)
|
||||||
|
}
|
||||||
|
|
||||||
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
|
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
|
@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List
|
||||||
return &container.ListResponse{}, nil
|
return &container.ListResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error {
|
||||||
|
s.calls["ListStream"]++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
|
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
|
||||||
s.calls["Put"]++
|
s.calls["Put"]++
|
||||||
return &container.PutResponse{}, nil
|
return &container.PutResponse{}, nil
|
||||||
|
|
|
@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListStream implements Server.
|
||||||
|
func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
err := a.next.ListStream(req, stream)
|
||||||
|
if !a.enabled.Load() {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
audit.LogRequest(stream.Context(), a.log, container_grpc.ContainerService_ListStream_FullMethodName, req,
|
||||||
|
audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Put implements Server.
|
// Put implements Server.
|
||||||
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
res, err := a.next.Put(ctx, req)
|
res, err := a.next.Put(ctx, req)
|
||||||
|
|
|
@ -14,6 +14,7 @@ type ServiceExecutor interface {
|
||||||
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
||||||
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
||||||
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
||||||
|
ListStream(context.Context, *container.ListStreamRequest, ListStream) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type executorSvc struct {
|
type executorSvc struct {
|
||||||
|
@ -93,3 +94,11 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
s.respSvc.SetMeta(resp)
|
s.respSvc.SetMeta(resp)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
err := s.exec.ListStream(stream.Context(), req, stream)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not execute ListStream request: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -200,3 +200,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
||||||
|
body := req.GetBody()
|
||||||
|
idV2 := body.GetOwnerID()
|
||||||
|
if idV2 == nil {
|
||||||
|
return errMissingUserID
|
||||||
|
}
|
||||||
|
|
||||||
|
var id user.ID
|
||||||
|
|
||||||
|
err := id.ReadFromV2(*idV2)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid user ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cnrs, err := s.rdr.ContainersOf(&id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cidList := make([]refs.ContainerID, len(cnrs))
|
||||||
|
for i := range cnrs {
|
||||||
|
cnrs[i].WriteToV2(&cidList[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
resBody := new(container.ListStreamResponseBody)
|
||||||
|
resBody.SetContainerIDs(cidList)
|
||||||
|
r := new(container.ListStreamResponse)
|
||||||
|
r.SetBody(resBody)
|
||||||
|
|
||||||
|
return stream.Send(r)
|
||||||
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package container
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -12,4 +13,11 @@ type Server interface {
|
||||||
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
||||||
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
||||||
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
||||||
|
ListStream(*container.ListStreamRequest, ListStream) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListStream is an interface of FrostFS API v2 compatible search streamer.
|
||||||
|
type ListStream interface {
|
||||||
|
util.ServerStream
|
||||||
|
Send(*container.ListStreamResponse) error
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
|
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
|
||||||
return resp, s.sigSvc.SignResponse(resp, err)
|
return resp, s.sigSvc.SignResponse(resp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||||
|
resp := new(container.ListStreamResponse)
|
||||||
|
_ = s.sigSvc.SignResponse(resp, err)
|
||||||
|
return stream.Send(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
ss := &listStreamSigner{
|
||||||
|
ListStream: stream,
|
||||||
|
sigSvc: s.sigSvc,
|
||||||
|
}
|
||||||
|
err := s.svc.ListStream(req, ss)
|
||||||
|
if err != nil || !ss.nonEmptyResp {
|
||||||
|
return ss.send(new(container.ListStreamResponse), err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type listStreamSigner struct {
|
||||||
|
ListStream
|
||||||
|
sigSvc *util.SignService
|
||||||
|
|
||||||
|
nonEmptyResp bool // set on first Send call
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error {
|
||||||
|
s.nonEmptyResp = true
|
||||||
|
return s.send(resp, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error {
|
||||||
|
if err := s.sigSvc.SignResponse(resp, err); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return s.ListStream.Send(resp)
|
||||||
|
}
|
||||||
|
|
92
pkg/services/container/transport_splitter.go
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
TransportSplitter struct {
|
||||||
|
next Server
|
||||||
|
|
||||||
|
respSvc *response.Service
|
||||||
|
cnrAmount uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
listStreamMsgSizeCtrl struct {
|
||||||
|
util.ServerStream
|
||||||
|
stream ListStream
|
||||||
|
respSvc *response.Service
|
||||||
|
cnrAmount uint64
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewSplitterService(cnrAmount uint64, respSvc *response.Service, next Server) Server {
|
||||||
|
return &TransportSplitter{
|
||||||
|
next: next,
|
||||||
|
respSvc: respSvc,
|
||||||
|
cnrAmount: cnrAmount,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
|
return s.next.Put(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
|
||||||
|
return s.next.Delete(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
|
||||||
|
return s.next.Get(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
|
||||||
|
return s.next.List(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
return s.next.ListStream(req, &listStreamMsgSizeCtrl{
|
||||||
|
ServerStream: stream,
|
||||||
|
stream: stream,
|
||||||
|
respSvc: s.respSvc,
|
||||||
|
cnrAmount: s.cnrAmount,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *listStreamMsgSizeCtrl) Send(resp *container.ListStreamResponse) error {
|
||||||
|
s.respSvc.SetMeta(resp)
|
||||||
|
body := resp.GetBody()
|
||||||
|
ids := body.GetContainerIDs()
|
||||||
|
|
||||||
|
var newResp *container.ListStreamResponse
|
||||||
|
|
||||||
|
for {
|
||||||
|
if newResp == nil {
|
||||||
|
newResp = new(container.ListStreamResponse)
|
||||||
|
newResp.SetBody(body)
|
||||||
|
}
|
||||||
|
|
||||||
|
cut := min(s.cnrAmount, uint64(len(ids)))
|
||||||
|
|
||||||
|
body.SetContainerIDs(ids[:cut])
|
||||||
|
newResp.SetMetaHeader(resp.GetMetaHeader())
|
||||||
|
newResp.SetVerificationHeader(resp.GetVerificationHeader())
|
||||||
|
|
||||||
|
if err := s.stream.Send(newResp); err != nil {
|
||||||
|
return fmt.Errorf("TransportSplitter: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ids = ids[cut:]
|
||||||
|
|
||||||
|
if len(ids) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
I didn't quite catch why we need to sort it here. I see that we already sort it at the end of
ListContainersStream
. Could you please explain your idea?