From 83b26b08b91e324337175abb1c884352dd5698b9 Mon Sep 17 00:00:00 2001
From: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
Date: Mon, 28 Oct 2024 18:06:01 +0300
Subject: [PATCH] [#291] container: Add ListStream method

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
---
 client/container_list_stream.go | 203 ++++++++++++++++++++++++++++++++
 pool/mock_test.go               |   4 +
 pool/pool.go                    |  88 ++++++++++++++
 3 files changed, 295 insertions(+)
 create mode 100644 client/container_list_stream.go

diff --git a/client/container_list_stream.go b/client/container_list_stream.go
new file mode 100644
index 0000000..71ecb18
--- /dev/null
+++ b/client/container_list_stream.go
@@ -0,0 +1,203 @@
+package client
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
+	rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
+	v2session "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/signature"
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
+	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
+)
+
+// PrmContainerListStream groups parameters of ContainerListStream operation.
+type PrmContainerListStream struct {
+	XHeaders []string
+
+	OwnerID user.ID
+
+	Session *session.Container
+}
+
+func (x *PrmContainerListStream) buildRequest(c *Client) (*container.ListStreamRequest, error) {
+	if x.OwnerID.IsEmpty() {
+		return nil, errorAccountNotSet
+	}
+
+	var ownerV2 refs.OwnerID
+	x.OwnerID.WriteToV2(&ownerV2)
+
+	reqBody := new(container.ListStreamRequestBody)
+	reqBody.SetOwnerID(&ownerV2)
+
+	var meta v2session.RequestMetaHeader
+	writeXHeadersToMeta(x.XHeaders, &meta)
+
+	if x.Session != nil {
+		var tokv2 v2session.Token
+		x.Session.WriteToV2(&tokv2)
+
+		meta.SetSessionToken(&tokv2)
+	}
+
+	var req container.ListStreamRequest
+	req.SetBody(reqBody)
+	c.prepareRequest(&req, &meta)
+	return &req, nil
+}
+
+// ResContainerListStream groups the final result values of ContainerListStream operation.
+type ResContainerListStream struct {
+	statusRes
+}
+
+// ContainerListReader is designed to read list of container identifiers from FrostFS system.
+//
+// Must be initialized using Client.ContainerListInit, any other usage is unsafe.
+type ContainerListReader struct {
+	client          *Client
+	cancelCtxStream context.CancelFunc
+	err             error
+	res             ResContainerListStream
+	stream          interface {
+		Read(resp *container.ListStreamResponse) error
+	}
+	tail []refs.ContainerID
+}
+
+// Read reads another list of the container identifiers. Works similar to
+// io.Reader.Read but copies cid.ID and returns success flag instead of error.
+//
+// Failure reason can be received via Close.
+//
+// Panics if buf has zero length.
+func (x *ContainerListReader) Read(buf []cid.ID) (int, bool) {
+	if len(buf) == 0 {
+		panic("empty buffer in ContainerListReader.ReadList")
+	}
+
+	read := copyCnrIDBuffers(buf, x.tail)
+	x.tail = x.tail[read:]
+
+	if len(buf) == read {
+		return read, true
+	}
+
+	for {
+		var resp container.ListStreamResponse
+		x.err = x.stream.Read(&resp)
+		if x.err != nil {
+			return read, false
+		}
+
+		x.res.st, x.err = x.client.processResponse(&resp)
+		if x.err != nil || !apistatus.IsSuccessful(x.res.st) {
+			return read, false
+		}
+
+		// read new chunk of containers
+		ids := resp.GetBody().GetContainerIDs()
+		if len(ids) == 0 {
+			// just skip empty lists since they are not prohibited by protocol
+			continue
+		}
+
+		ln := copyCnrIDBuffers(buf[read:], ids)
+		read += ln
+
+		if read == len(buf) {
+			// save the tail
+			x.tail = append(x.tail, ids[ln:]...)
+
+			return read, true
+		}
+	}
+}
+
+func copyCnrIDBuffers(dst []cid.ID, src []refs.ContainerID) int {
+	var i int
+	for ; i < len(dst) && i < len(src); i++ {
+		_ = dst[i].ReadFromV2(src[i])
+	}
+	return i
+}
+
+// Iterate iterates over the list of found container identifiers.
+// f can return true to stop iteration earlier.
+//
+// Returns an error if container can't be read.
+func (x *ContainerListReader) Iterate(f func(cid.ID) bool) error {
+	buf := make([]cid.ID, 1)
+
+	for {
+		// Do not check first return value because `len(buf) == 1`,
+		// so false means nothing was read.
+		_, ok := x.Read(buf)
+		if !ok {
+			res, err := x.Close()
+			if err != nil {
+				return err
+			}
+			return apistatus.ErrFromStatus(res.Status())
+		}
+		if f(buf[0]) {
+			return nil
+		}
+	}
+}
+
+// Close ends reading list of the matched containers and returns the result of the operation
+// along with the final results. Must be called after using the ContainerListReader.
+//
+// Exactly one return value is non-nil. By default, server status is returned in res structure.
+// Any client's internal or transport errors are returned as Go built-in error.
+// If Client is tuned to resolve FrostFS API statuses, then FrostFS failures
+// codes are returned as error.
+func (x *ContainerListReader) Close() (*ResContainerListStream, error) {
+	defer x.cancelCtxStream()
+
+	if x.err != nil && !errors.Is(x.err, io.EOF) {
+		return nil, x.err
+	}
+
+	return &x.res, nil
+}
+
+// ContainerListInit initiates container selection through a remote server using FrostFS API protocol.
+//
+// The call only opens the transmission channel, explicit fetching of identifiers of the account-owned
+// containers is done using the ContainerListReader. Exactly one return value is non-nil.
+// Resulting reader must be finally closed.
+//
+// Returns an error if parameters are set incorrectly (see PrmContainerListStream docs).
+// Context is required and must not be nil. It is used for network communication.
+func (c *Client) ContainerListInit(ctx context.Context, prm PrmContainerListStream) (*ContainerListReader, error) {
+	req, err := prm.buildRequest(c)
+	if err != nil {
+		return nil, err
+	}
+
+	err = signature.SignServiceMessage(&c.prm.Key, req)
+	if err != nil {
+		return nil, fmt.Errorf("sign request: %w", err)
+	}
+
+	var r ContainerListReader
+	ctx, r.cancelCtxStream = context.WithCancel(ctx)
+
+	r.stream, err = rpcapi.ListContainersStream(&c.c, req, client.WithContext(ctx))
+	if err != nil {
+		return nil, fmt.Errorf("open stream: %w", err)
+	}
+	r.client = c
+
+	return &r, nil
+}
diff --git a/pool/mock_test.go b/pool/mock_test.go
index ad9fd94..4731108 100644
--- a/pool/mock_test.go
+++ b/pool/mock_test.go
@@ -104,6 +104,10 @@ func (m *mockClient) containerList(context.Context, PrmContainerList) ([]cid.ID,
 	return nil, nil
 }
 
+func (m *mockClient) containerListStream(context.Context, PrmListStream) (ResListStream, error) {
+	return ResListStream{}, nil
+}
+
 func (m *mockClient) containerDelete(context.Context, PrmContainerDelete) error {
 	return nil
 }
diff --git a/pool/pool.go b/pool/pool.go
index d795af8..c9c2240 100644
--- a/pool/pool.go
+++ b/pool/pool.go
@@ -47,6 +47,8 @@ type client interface {
 	containerGet(context.Context, PrmContainerGet) (container.Container, error)
 	// see clientWrapper.containerList.
 	containerList(context.Context, PrmContainerList) ([]cid.ID, error)
+	// see clientWrapper.containerListStream.
+	containerListStream(context.Context, PrmListStream) (ResListStream, error)
 	// see clientWrapper.containerDelete.
 	containerDelete(context.Context, PrmContainerDelete) error
 	// see clientWrapper.apeManagerAddChain.
@@ -145,6 +147,7 @@ const (
 	methodContainerPut
 	methodContainerGet
 	methodContainerList
+	methodContainerListStream
 	methodContainerDelete
 	methodEndpointInfo
 	methodNetworkInfo
@@ -529,6 +532,75 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList)
 	return res.Containers(), nil
 }
 
+// PrmListStream groups parameters of ListContainersStream operation.
+type PrmListStream struct {
+	OwnerID user.ID
+
+	Session *session.Container
+}
+
+// ResListStream is designed to read list of object identifiers from FrostFS system.
+//
+// Must be initialized using Pool.ListContainersStream, any other usage is unsafe.
+type ResListStream struct {
+	r           *sdkClient.ContainerListReader
+	handleError func(context.Context, apistatus.Status, error) error
+}
+
+// Read reads another list of the container identifiers.
+func (x *ResListStream) Read(buf []cid.ID) (int, error) {
+	n, ok := x.r.Read(buf)
+	if !ok {
+		res, err := x.r.Close()
+		if err == nil {
+			return n, io.EOF
+		}
+
+		var status apistatus.Status
+		if res != nil {
+			status = res.Status()
+		}
+		err = x.handleError(nil, status, err)
+
+		return n, err
+	}
+
+	return n, nil
+}
+
+// Iterate iterates over the list of found container identifiers.
+// f can return true to stop iteration earlier.
+//
+// Returns an error if container can't be read.
+func (x *ResListStream) Iterate(f func(cid.ID) bool) error {
+	return x.r.Iterate(f)
+}
+
+// Close ends reading list of the matched containers and returns the result of the operation
+// along with the final results. Must be called after using the ResListStream.
+func (x *ResListStream) Close() {
+	_, _ = x.r.Close()
+}
+
+// containerList invokes sdkClient.ContainerList parse response status to error and return result as is.
+func (c *clientWrapper) containerListStream(ctx context.Context, prm PrmListStream) (ResListStream, error) {
+	cl, err := c.getClient()
+	if err != nil {
+		return ResListStream{}, err
+	}
+
+	cliPrm := sdkClient.PrmContainerListStream{
+		OwnerID: prm.OwnerID,
+		Session: prm.Session,
+	}
+
+	res, err := cl.ContainerListInit(ctx, cliPrm)
+	if err = c.handleError(ctx, nil, err); err != nil {
+		return ResListStream{}, fmt.Errorf("init container listing on client: %w", err)
+	}
+	return ResListStream{r: res, handleError: c.handleError}, nil
+}
+
 // containerDelete invokes sdkClient.ContainerDelete parse response status to error.
 // It also waits for the container to be removed from the network.
 func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDelete) error {
@@ -2887,6 +2959,22 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.
 	return cnrIDs, nil
 }
 
+// // ListContainers requests identifiers of the account-owned containers.
+func (p *Pool) ListContainersStream(ctx context.Context, prm PrmListStream) (ResListStream, error) {
+	var res ResListStream
+	cp, err := p.connection()
+	if err != nil {
+		return res, err
+	}
+
+	res, err = cp.containerListStream(ctx, prm)
+	if err != nil {
+		return res, fmt.Errorf("list containers stream via client '%s': %w", cp.address(), err)
+	}
+
+	return res, nil
+}
+
 // DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete.
 //
 // Waiting parameters can be specified using SetWaitParams. If not called, defaults are used: