forked from TrueCloudLab/frostfs-node
add prm to to pass stream and resp
This commit is contained in:
parent
0cbc7d99c2
commit
2084a3d7c5
4 changed files with 83 additions and 23 deletions
|
@ -20,7 +20,6 @@ import (
|
||||||
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
|
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
|
||||||
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container/grpc"
|
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container/grpc"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
||||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
@ -221,7 +220,7 @@ type morphContainerReader struct {
|
||||||
src containerCore.Source
|
src containerCore.Source
|
||||||
|
|
||||||
lister interface {
|
lister interface {
|
||||||
ContainersOf(*user.ID) ([]cid.ID, error)
|
ContainersOf(*containerMorph.ClientPrm) ([]cid.ID, error)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -233,8 +232,8 @@ func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo,
|
||||||
return x.src.DeletionInfo(id)
|
return x.src.DeletionInfo(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
|
func (x *morphContainerReader) ContainersOf(prm *containerMorph.ClientPrm) ([]cid.ID, error) {
|
||||||
return x.lister.ContainersOf(id)
|
return x.lister.ContainersOf(prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
type morphContainerWriter struct {
|
type morphContainerWriter struct {
|
||||||
|
|
|
@ -5,6 +5,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||||
|
container "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
||||||
|
@ -15,9 +17,16 @@ import (
|
||||||
// to the specified user of FrostFS system. If idUser is nil, returns the list of all containers.
|
// to the specified user of FrostFS system. If idUser is nil, returns the list of all containers.
|
||||||
//
|
//
|
||||||
// If remote RPC does not support neo-go session API, fallback to List() method.
|
// If remote RPC does not support neo-go session API, fallback to List() method.
|
||||||
func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
func (c *Client) ContainersOf(prm *container.ClientPrm) ([]cid.ID, error) {
|
||||||
var rawID []byte
|
var rawID []byte
|
||||||
|
|
||||||
|
var idUser *user.ID
|
||||||
|
doSend := false
|
||||||
|
if prm != nil {
|
||||||
|
idUser = prm.OwnerID
|
||||||
|
doSend = prm.Stream != nil
|
||||||
|
}
|
||||||
|
|
||||||
if idUser != nil {
|
if idUser != nil {
|
||||||
rawID = idUser.WalletBytes()
|
rawID = idUser.WalletBytes()
|
||||||
}
|
}
|
||||||
|
@ -32,8 +41,9 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
||||||
const batchSize = 512
|
const batchSize = 512
|
||||||
|
|
||||||
var cidList []cid.ID
|
var cidList []cid.ID
|
||||||
wasSent := false
|
// ids := make([]cid.ID, 0, 512)
|
||||||
cb := func(item stackitem.Item) error {
|
// wasSent := false
|
||||||
|
appendCIDtoList := func(item stackitem.Item) error {
|
||||||
rawID, err := client.BytesFromStackItem(item)
|
rawID, err := client.BytesFromStackItem(item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get byte array from stack item (%s): %w", containersOfMethod, err)
|
return fmt.Errorf("get byte array from stack item (%s): %w", containersOfMethod, err)
|
||||||
|
@ -47,12 +57,30 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cidList = append(cidList, id)
|
cidList = append(cidList, id)
|
||||||
if len(cidList) == batchSize { // that's wrong check, need to fix
|
|
||||||
wasSent = true
|
|
||||||
// do send() and return
|
|
||||||
}
|
|
||||||
wasSent = false
|
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
cb := appendCIDtoList
|
||||||
|
if doSend {
|
||||||
|
cb = func(item stackitem.Item) error {
|
||||||
|
// if len(cidList) == 0 {
|
||||||
|
// wasSent = false
|
||||||
|
// }
|
||||||
|
if err := appendCIDtoList(item); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// ids = append(ids, id)
|
||||||
|
// if len(cidList) == batchSize { // that's wrong check, need to fix
|
||||||
|
// wasSent = true
|
||||||
|
prm.Resp.GetBody().SetContainerIDs(getRefCIDList(cidList))
|
||||||
|
// cidList = make([]cid.ID, 0, 512)
|
||||||
|
return prm.Stream.Send(prm.Resp)
|
||||||
|
// do send() and return
|
||||||
|
// }
|
||||||
|
// wasSent = false
|
||||||
|
// return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrHash := c.client.ContractAddress()
|
cnrHash := c.client.ContractAddress()
|
||||||
|
@ -64,9 +92,19 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !wasSent {
|
// if !wasSent && doSend {
|
||||||
// do send() the rest
|
// // do send() the rest
|
||||||
}
|
// prm.Resp.GetBody().SetContainerIDs(getRefCIDList(cidList))
|
||||||
|
// prm.Stream.Send(prm.Resp)
|
||||||
|
// }
|
||||||
|
|
||||||
return cidList, nil
|
return cidList, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getRefCIDList(cnrs []cid.ID) []refs.ContainerID {
|
||||||
|
cidList := make([]refs.ContainerID, len(cnrs))
|
||||||
|
for i := range cnrs {
|
||||||
|
cnrs[i].WriteToV2(&cidList[i])
|
||||||
|
}
|
||||||
|
return cidList
|
||||||
|
}
|
||||||
|
|
|
@ -29,7 +29,14 @@ type Reader interface {
|
||||||
// ContainersOf returns a list of container identifiers belonging
|
// ContainersOf returns a list of container identifiers belonging
|
||||||
// to the specified user of FrostFS system. Returns the identifiers
|
// to the specified user of FrostFS system. Returns the identifiers
|
||||||
// of all FrostFS containers if pointer to owner identifier is nil.
|
// of all FrostFS containers if pointer to owner identifier is nil.
|
||||||
ContainersOf(*user.ID) ([]cid.ID, error)
|
ContainersOf(*ClientPrm) ([]cid.ID, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type ClientPrm struct {
|
||||||
|
OwnerID *user.ID
|
||||||
|
// Stream interface + signer??
|
||||||
|
Stream containerSvc.ListStream
|
||||||
|
Resp *container.ListStreamResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
// Writer is an interface of container storage updater.
|
// Writer is an interface of container storage updater.
|
||||||
|
@ -185,7 +192,11 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
||||||
return nil, fmt.Errorf("invalid user ID: %w", err)
|
return nil, fmt.Errorf("invalid user ID: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrs, err := s.rdr.ContainersOf(&id)
|
prm := &ClientPrm{
|
||||||
|
OwnerID: &id,
|
||||||
|
}
|
||||||
|
|
||||||
|
cnrs, err := s.rdr.ContainersOf(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -215,20 +226,27 @@ func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamR
|
||||||
return fmt.Errorf("invalid user ID: %w", err)
|
return fmt.Errorf("invalid user ID: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrs, err := s.rdr.ContainersOf(&id)
|
resBody := new(container.ListStreamResponseBody)
|
||||||
|
r := new(container.ListStreamResponse)
|
||||||
|
r.SetBody(resBody)
|
||||||
|
|
||||||
|
prm := &ClientPrm{
|
||||||
|
OwnerID: &id,
|
||||||
|
Stream: stream,
|
||||||
|
Resp: r,
|
||||||
|
}
|
||||||
|
|
||||||
|
cnrs, err := s.rdr.ContainersOf(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// here starts duplicated sending, need to add condition
|
||||||
cidList := make([]refs.ContainerID, len(cnrs))
|
cidList := make([]refs.ContainerID, len(cnrs))
|
||||||
for i := range cnrs {
|
for i := range cnrs {
|
||||||
cnrs[i].WriteToV2(&cidList[i])
|
cnrs[i].WriteToV2(&cidList[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
resBody := new(container.ListStreamResponseBody)
|
|
||||||
resBody.SetContainerIDs(cidList)
|
resBody.SetContainerIDs(cidList)
|
||||||
r := new(container.ListStreamResponse)
|
|
||||||
r.SetBody(resBody)
|
|
||||||
|
|
||||||
return stream.Send(r)
|
return stream.Send(r)
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
|
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
|
||||||
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
@ -138,7 +139,11 @@ func (s *Server) RemoveContainer(ctx context.Context, req *control.RemoveContain
|
||||||
return nil, status.Error(codes.InvalidArgument, "failed to read owner: "+err.Error())
|
return nil, status.Error(codes.InvalidArgument, "failed to read owner: "+err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
cids, err := s.containerClient.ContainersOf(&owner)
|
prm := &containerMorph.ClientPrm{
|
||||||
|
OwnerID: &owner,
|
||||||
|
}
|
||||||
|
|
||||||
|
cids, err := s.containerClient.ContainersOf(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get owner's containers: %w", err)
|
return nil, fmt.Errorf("failed to get owner's containers: %w", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue