frostfs-node/pkg/services/container/morph/executor.go
Ekaterina Lebedeva 03d5a1d5ad [#1452] container: Add ListStream method
* Added new method for listing containers to container service.
  It opens stream and sends containers in batches.

* Added TransportSplitter wrapper around ExecutionService to
  split container ID list read from contract in parts that are
  smaller than grpc max message size. Batch size can be changed
  in node configuration file (as in example config file).

* Changed `container list` implementation in cli: now ListStream
  is called by default. Old List is called only if ListStream
  is not implemented.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-12-13 17:22:13 +03:00

234 lines
5.4 KiB
Go

package container
import (
"context"
"errors"
"fmt"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
containerSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
sessionV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)
// Sentinel errors shared by the executor methods.
var (
	// errMissingUserID is returned by List and ListStream when the request
	// body does not carry an owner identifier.
	errMissingUserID = errors.New("missing user ID")
)
// morphExecutor serves container service requests on top of a Reader and
// a Writer: Get, List and ListStream go through rdr, Put and Delete go
// through wrt.
type morphExecutor struct {
	// rdr is the read-only container storage.
	rdr Reader

	// wrt is the container storage updater.
	wrt Writer
}
// Reader describes the read-only view of the container storage used by
// the executor.
type Reader interface {
	// Source provides access to stored containers by identifier.
	containercore.Source

	// ContainersOf lists identifiers of the containers that belong to the
	// given user of the FrostFS system. When owner is nil, identifiers of
	// all FrostFS containers are returned.
	ContainersOf(owner *user.ID) ([]cid.ID, error)
}
// Writer describes the mutating side of the container storage used by
// the executor.
type Writer interface {
	// Put saves cnr in the side chain and returns a pointer to a container
	// identifier.
	Put(ctx context.Context, cnr containercore.Container) (*cid.ID, error)

	// Delete removes the container described by witness from the side chain.
	Delete(ctx context.Context, witness containercore.RemovalWitness) error
}
// NewExecutor returns a containerSvc.ServiceExecutor backed by the given
// storage accessors: rdr is used for reads, wrt for modifications.
func NewExecutor(rdr Reader, wrt Writer) containerSvc.ServiceExecutor {
	exec := new(morphExecutor)
	exec.rdr = rdr
	exec.wrt = wrt

	return exec
}
// Put decodes the container, its signature and the optional session token
// from the API v2 messages and passes the resulting container to the Writer.
//
// An error is returned when the signature or the container field is absent,
// when any of the messages cannot be decoded, or when the Writer fails. On
// success the response body carries the identifier returned by the Writer.
func (s *morphExecutor) Put(ctx context.Context, tokV2 *sessionV2.Token, body *container.PutRequestBody) (*container.PutResponseBody, error) {
	signature := body.GetSignature()
	if signature == nil {
		// TODO(@cthulhu-rider): #468 use "const" error
		return nil, errors.New("missing signature")
	}

	payload := body.GetContainer()
	if payload == nil {
		return nil, errors.New("missing container field")
	}

	var cnr containercore.Container

	if err := cnr.Value.ReadFromV2(*payload); err != nil {
		return nil, fmt.Errorf("invalid container: %w", err)
	}

	if err := cnr.Signature.ReadFromV2(*signature); err != nil {
		return nil, fmt.Errorf("can't read signature: %w", err)
	}

	// The session token is optional: it is attached only when provided.
	if tokV2 != nil {
		cnr.Session = new(session.Container)

		if err := cnr.Session.ReadFromV2(*tokV2); err != nil {
			return nil, fmt.Errorf("invalid session token: %w", err)
		}
	}

	storedID, err := s.wrt.Put(ctx, cnr)
	if err != nil {
		return nil, err
	}

	// NOTE(review): assumes the Writer returns a non-nil identifier whenever
	// err is nil — confirm against the Writer implementation.
	storedIDV2 := new(refs.ContainerID)
	storedID.WriteToV2(storedIDV2)

	resp := new(container.PutResponseBody)
	resp.SetContainerID(storedIDV2)

	return resp, nil
}
// Delete decodes the container identifier and the optional session token
// from the API v2 messages, assembles a removal witness together with the
// signature taken from the request body and passes it to the Writer.
//
// An error is returned when the container ID is absent or malformed, when
// the session token cannot be decoded, or when the Writer fails. On success
// an empty response body is returned.
func (s *morphExecutor) Delete(ctx context.Context, tokV2 *sessionV2.Token, body *container.DeleteRequestBody) (*container.DeleteResponseBody, error) {
	rawID := body.GetContainerID()
	if rawID == nil {
		return nil, errors.New("missing container ID")
	}

	var cnrID cid.ID

	if err := cnrID.ReadFromV2(*rawID); err != nil {
		return nil, fmt.Errorf("invalid container ID: %w", err)
	}

	// The session token is optional: it stays nil when not provided.
	var sessionTok *session.Container

	if tokV2 != nil {
		sessionTok = new(session.Container)

		if err := sessionTok.ReadFromV2(*tokV2); err != nil {
			return nil, fmt.Errorf("invalid session token: %w", err)
		}
	}

	witness := containercore.RemovalWitness{
		ContainerID:  cnrID,
		Signature:    body.GetSignature(),
		SessionToken: sessionTok,
	}

	if err := s.wrt.Delete(ctx, witness); err != nil {
		return nil, err
	}

	return new(container.DeleteResponseBody), nil
}
// Get decodes the container identifier from the request body, reads the
// container from the Reader and converts it, its signature and its session
// token (if any) to API v2 messages. The context argument is not used.
//
// An error is returned when the container ID is absent or malformed, or
// when the Reader fails. The session token of the response stays nil when
// the stored container has no session attached.
func (s *morphExecutor) Get(_ context.Context, body *container.GetRequestBody) (*container.GetResponseBody, error) {
	rawID := body.GetContainerID()
	if rawID == nil {
		return nil, errors.New("missing container ID")
	}

	var cnrID cid.ID

	if err := cnrID.ReadFromV2(*rawID); err != nil {
		return nil, fmt.Errorf("invalid container ID: %w", err)
	}

	stored, err := s.rdr.Get(cnrID)
	if err != nil {
		return nil, err
	}

	var signature refs.Signature
	stored.Signature.WriteToV2(&signature)

	var sessionTok *sessionV2.Token

	if stored.Session != nil {
		sessionTok = new(sessionV2.Token)
		stored.Session.WriteToV2(sessionTok)
	}

	payload := new(container.Container)
	stored.Value.WriteToV2(payload)

	resp := new(container.GetResponseBody)
	resp.SetContainer(payload)
	resp.SetSignature(&signature)
	resp.SetSessionToken(sessionTok)

	return resp, nil
}
// List decodes the owner identifier from the request body, asks the Reader
// for the containers of that owner and returns their identifiers converted
// to API v2 messages in a single response body. The context argument is
// not used.
//
// errMissingUserID is returned when the owner ID is absent; a decoding
// failure or a Reader failure is reported as an error as well.
func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody) (*container.ListResponseBody, error) {
	ownerV2 := body.GetOwnerID()
	if ownerV2 == nil {
		return nil, errMissingUserID
	}

	var owner user.ID

	if err := owner.ReadFromV2(*ownerV2); err != nil {
		return nil, fmt.Errorf("invalid user ID: %w", err)
	}

	ids, err := s.rdr.ContainersOf(&owner)
	if err != nil {
		return nil, err
	}

	// Convert every identifier in place into a pre-sized slice.
	idsV2 := make([]refs.ContainerID, len(ids))
	for i := range idsV2 {
		ids[i].WriteToV2(&idsV2[i])
	}

	resp := new(container.ListResponseBody)
	resp.SetContainerIDs(idsV2)

	return resp, nil
}
// ListStream decodes the owner identifier from the request body, asks the
// Reader for the containers of that owner and sends all their identifiers,
// converted to API v2 messages, to the stream in one response. The context
// argument is not used.
//
// errMissingUserID is returned when the owner ID is absent; a decoding
// failure, a Reader failure or a failed Send is reported as an error as well.
func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
	ownerV2 := req.GetBody().GetOwnerID()
	if ownerV2 == nil {
		return errMissingUserID
	}

	var owner user.ID

	if err := owner.ReadFromV2(*ownerV2); err != nil {
		return fmt.Errorf("invalid user ID: %w", err)
	}

	ids, err := s.rdr.ContainersOf(&owner)
	if err != nil {
		return err
	}

	// Convert every identifier in place into a pre-sized slice.
	idsV2 := make([]refs.ContainerID, len(ids))
	for i := range idsV2 {
		ids[i].WriteToV2(&idsV2[i])
	}

	respBody := new(container.ListStreamResponseBody)
	respBody.SetContainerIDs(idsV2)

	resp := new(container.ListStreamResponse)
	resp.SetBody(respBody)

	return stream.Send(resp)
}