frostfs-node/pkg/services/container/morph/executor.go
Ekaterina Lebedeva 242f0095d0 [#1577] container: Reduce iterations through container list
* Separated iteration through container ids from `ContainersOf()`
  so that it could be reused.
* When listing containers we used to iterate through the
  the whole list of containers twice: first when reading from
  a contract, then when sending them. Now we can send batches
  of containers when reading from the contract.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-12-27 15:30:26 +03:00

256 lines
5.8 KiB
Go

package container
import (
"context"
"errors"
"fmt"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
containerSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
sessionV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)
var errMissingUserID = errors.New("missing user ID")
type morphExecutor struct {
rdr Reader
wrt Writer
}
// Reader is an interface of read-only container storage.
type Reader interface {
containercore.Source
// ContainersOf returns a list of container identifiers belonging
// to the specified user of FrostFS system. Returns the identifiers
// of all FrostFS containers if pointer to owner identifier is nil.
ContainersOf(*user.ID) ([]cid.ID, error)
IterateContainersOf(*user.ID, func(cid.ID) error) error
}
// Writer is an interface of container storage updater.
type Writer interface {
// Put stores specified container in the side chain.
Put(context.Context, containercore.Container) (*cid.ID, error)
// Delete removes specified container from the side chain.
Delete(context.Context, containercore.RemovalWitness) error
}
func NewExecutor(rdr Reader, wrt Writer) containerSvc.ServiceExecutor {
return &morphExecutor{
rdr: rdr,
wrt: wrt,
}
}
func (s *morphExecutor) Put(ctx context.Context, tokV2 *sessionV2.Token, body *container.PutRequestBody) (*container.PutResponseBody, error) {
sigV2 := body.GetSignature()
if sigV2 == nil {
// TODO(@cthulhu-rider): #468 use "const" error
return nil, errors.New("missing signature")
}
cnrV2 := body.GetContainer()
if cnrV2 == nil {
return nil, errors.New("missing container field")
}
var cnr containercore.Container
err := cnr.Value.ReadFromV2(*cnrV2)
if err != nil {
return nil, fmt.Errorf("invalid container: %w", err)
}
err = cnr.Signature.ReadFromV2(*sigV2)
if err != nil {
return nil, fmt.Errorf("can't read signature: %w", err)
}
if tokV2 != nil {
cnr.Session = new(session.Container)
err := cnr.Session.ReadFromV2(*tokV2)
if err != nil {
return nil, fmt.Errorf("invalid session token: %w", err)
}
}
idCnr, err := s.wrt.Put(ctx, cnr)
if err != nil {
return nil, err
}
var idCnrV2 refs.ContainerID
idCnr.WriteToV2(&idCnrV2)
res := new(container.PutResponseBody)
res.SetContainerID(&idCnrV2)
return res, nil
}
func (s *morphExecutor) Delete(ctx context.Context, tokV2 *sessionV2.Token, body *container.DeleteRequestBody) (*container.DeleteResponseBody, error) {
idV2 := body.GetContainerID()
if idV2 == nil {
return nil, errors.New("missing container ID")
}
var id cid.ID
err := id.ReadFromV2(*idV2)
if err != nil {
return nil, fmt.Errorf("invalid container ID: %w", err)
}
var tok *session.Container
if tokV2 != nil {
tok = new(session.Container)
err := tok.ReadFromV2(*tokV2)
if err != nil {
return nil, fmt.Errorf("invalid session token: %w", err)
}
}
var rmWitness containercore.RemovalWitness
rmWitness.ContainerID = id
rmWitness.Signature = body.GetSignature()
rmWitness.SessionToken = tok
err = s.wrt.Delete(ctx, rmWitness)
if err != nil {
return nil, err
}
return new(container.DeleteResponseBody), nil
}
func (s *morphExecutor) Get(_ context.Context, body *container.GetRequestBody) (*container.GetResponseBody, error) {
idV2 := body.GetContainerID()
if idV2 == nil {
return nil, errors.New("missing container ID")
}
var id cid.ID
err := id.ReadFromV2(*idV2)
if err != nil {
return nil, fmt.Errorf("invalid container ID: %w", err)
}
cnr, err := s.rdr.Get(id)
if err != nil {
return nil, err
}
sigV2 := new(refs.Signature)
cnr.Signature.WriteToV2(sigV2)
var tokV2 *sessionV2.Token
if cnr.Session != nil {
tokV2 = new(sessionV2.Token)
cnr.Session.WriteToV2(tokV2)
}
var cnrV2 container.Container
cnr.Value.WriteToV2(&cnrV2)
res := new(container.GetResponseBody)
res.SetContainer(&cnrV2)
res.SetSignature(sigV2)
res.SetSessionToken(tokV2)
return res, nil
}
func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody) (*container.ListResponseBody, error) {
idV2 := body.GetOwnerID()
if idV2 == nil {
return nil, errMissingUserID
}
var id user.ID
err := id.ReadFromV2(*idV2)
if err != nil {
return nil, fmt.Errorf("invalid user ID: %w", err)
}
cnrs, err := s.rdr.ContainersOf(&id)
if err != nil {
return nil, err
}
cidList := make([]refs.ContainerID, len(cnrs))
for i := range cnrs {
cnrs[i].WriteToV2(&cidList[i])
}
res := new(container.ListResponseBody)
res.SetContainerIDs(cidList)
return res, nil
}
func (s *morphExecutor) ListStream(ctx context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
body := req.GetBody()
idV2 := body.GetOwnerID()
if idV2 == nil {
return errMissingUserID
}
var id user.ID
err := id.ReadFromV2(*idV2)
if err != nil {
return fmt.Errorf("invalid user ID: %w", err)
}
resBody := new(container.ListStreamResponseBody)
r := new(container.ListStreamResponse)
r.SetBody(resBody)
var cidList []refs.ContainerID
// Amount of containers to send at once.
const batchSize = 1000
processCID := func(id cid.ID) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
var refID refs.ContainerID
id.WriteToV2(&refID)
cidList = append(cidList, refID)
if len(cidList) == batchSize {
r.GetBody().SetContainerIDs(cidList)
cidList = cidList[:0]
return stream.Send(r)
}
return nil
}
if err = s.rdr.IterateContainersOf(&id, processCID); err != nil {
return err
}
if len(cidList) > 0 {
r.GetBody().SetContainerIDs(cidList)
return stream.Send(r)
}
return nil
}