From cdc94cf6688c5a32c0d0c078018e6778930f1ece Mon Sep 17 00:00:00 2001 From: Ekaterina Lebedeva Date: Mon, 28 Oct 2024 18:06:01 +0300 Subject: [PATCH] [#291] container: Add ListStream method Signed-off-by: Ekaterina Lebedeva --- api/rpc/container.go | 2 +- client/container_list_stream.go | 203 ++++++++++++++++++++++++++++++++ pool/mock_test.go | 4 + pool/pool.go | 88 ++++++++++++++ 4 files changed, 296 insertions(+), 1 deletion(-) create mode 100644 client/container_list_stream.go diff --git a/api/rpc/container.go b/api/rpc/container.go index 0e3af82..a759bd5 100644 --- a/api/rpc/container.go +++ b/api/rpc/container.go @@ -96,7 +96,7 @@ func ListContainersStream( req *container.ListStreamRequest, opts ...client.CallOption, ) (*ListStreamResponseReader, error) { - wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceContainer, rpcContainerList), req, opts...) + wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceContainer, rpcContainerStream), req, opts...) if err != nil { return nil, err } diff --git a/client/container_list_stream.go b/client/container_list_stream.go new file mode 100644 index 0000000..71ecb18 --- /dev/null +++ b/client/container_list_stream.go @@ -0,0 +1,203 @@ +package client + +import ( + "context" + "errors" + "fmt" + "io" + + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs" + rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" + v2session "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/signature" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" +) + +// PrmContainerListStream groups parameters of ContainerListStream operation. +type PrmContainerListStream struct { + XHeaders []string + + OwnerID user.ID + + Session *session.Container +} + +func (x *PrmContainerListStream) buildRequest(c *Client) (*container.ListStreamRequest, error) { + if x.OwnerID.IsEmpty() { + return nil, errorAccountNotSet + } + + var ownerV2 refs.OwnerID + x.OwnerID.WriteToV2(&ownerV2) + + reqBody := new(container.ListStreamRequestBody) + reqBody.SetOwnerID(&ownerV2) + + var meta v2session.RequestMetaHeader + writeXHeadersToMeta(x.XHeaders, &meta) + + if x.Session != nil { + var tokv2 v2session.Token + x.Session.WriteToV2(&tokv2) + + meta.SetSessionToken(&tokv2) + } + + var req container.ListStreamRequest + req.SetBody(reqBody) + c.prepareRequest(&req, &meta) + return &req, nil +} + +// ResContainerListStream groups the final result values of ContainerListStream operation. +type ResContainerListStream struct { + statusRes +} + +// ContainerListReader is designed to read list of container identifiers from FrostFS system. +// +// Must be initialized using Client.ContainerListInit, any other usage is unsafe. +type ContainerListReader struct { + client *Client + cancelCtxStream context.CancelFunc + err error + res ResContainerListStream + stream interface { + Read(resp *container.ListStreamResponse) error + } + tail []refs.ContainerID +} + +// Read reads another list of the container identifiers. Works similar to +// io.Reader.Read but copies cid.ID and returns success flag instead of error. +// +// Failure reason can be received via Close. +// +// Panics if buf has zero length. +func (x *ContainerListReader) Read(buf []cid.ID) (int, bool) { + if len(buf) == 0 { + panic("empty buffer in ContainerListReader.ReadList") + } + + read := copyCnrIDBuffers(buf, x.tail) + x.tail = x.tail[read:] + + if len(buf) == read { + return read, true + } + + for { + var resp container.ListStreamResponse + x.err = x.stream.Read(&resp) + if x.err != nil { + return read, false + } + + x.res.st, x.err = x.client.processResponse(&resp) + if x.err != nil || !apistatus.IsSuccessful(x.res.st) { + return read, false + } + + // read new chunk of containers + ids := resp.GetBody().GetContainerIDs() + if len(ids) == 0 { + // just skip empty lists since they are not prohibited by protocol + continue + } + + ln := copyCnrIDBuffers(buf[read:], ids) + read += ln + + if read == len(buf) { + // save the tail + x.tail = append(x.tail, ids[ln:]...) + + return read, true + } + } +} + +func copyCnrIDBuffers(dst []cid.ID, src []refs.ContainerID) int { + var i int + for ; i < len(dst) && i < len(src); i++ { + _ = dst[i].ReadFromV2(src[i]) + } + return i +} + +// Iterate iterates over the list of found container identifiers. +// f can return true to stop iteration earlier. +// +// Returns an error if container can't be read. +func (x *ContainerListReader) Iterate(f func(cid.ID) bool) error { + buf := make([]cid.ID, 1) + + for { + // Do not check first return value because `len(buf) == 1`, + // so false means nothing was read. + _, ok := x.Read(buf) + if !ok { + res, err := x.Close() + if err != nil { + return err + } + return apistatus.ErrFromStatus(res.Status()) + } + if f(buf[0]) { + return nil + } + } +} + +// Close ends reading list of the matched containers and returns the result of the operation +// along with the final results. Must be called after using the ContainerListReader. +// +// Exactly one return value is non-nil. By default, server status is returned in res structure. +// Any client's internal or transport errors are returned as Go built-in error. +// If Client is tuned to resolve FrostFS API statuses, then FrostFS failures +// codes are returned as error. +func (x *ContainerListReader) Close() (*ResContainerListStream, error) { + defer x.cancelCtxStream() + + if x.err != nil && !errors.Is(x.err, io.EOF) { + return nil, x.err + } + + return &x.res, nil +} + +// ContainerListInit initiates container selection through a remote server using FrostFS API protocol. +// +// The call only opens the transmission channel, explicit fetching of identifiers of the account-owned +// containers is done using the ContainerListReader. Exactly one return value is non-nil. +// Resulting reader must be finally closed. +// +// Returns an error if parameters are set incorrectly (see PrmContainerListStream docs). +// Context is required and must not be nil. It is used for network communication. +func (c *Client) ContainerListInit(ctx context.Context, prm PrmContainerListStream) (*ContainerListReader, error) { + req, err := prm.buildRequest(c) + if err != nil { + return nil, err + } + + err = signature.SignServiceMessage(&c.prm.Key, req) + if err != nil { + return nil, fmt.Errorf("sign request: %w", err) + } + + var r ContainerListReader + ctx, r.cancelCtxStream = context.WithCancel(ctx) + + r.stream, err = rpcapi.ListContainersStream(&c.c, req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("open stream: %w", err) + } + r.client = c + + return &r, nil +} diff --git a/pool/mock_test.go b/pool/mock_test.go index ad9fd94..4731108 100644 --- a/pool/mock_test.go +++ b/pool/mock_test.go @@ -104,6 +104,10 @@ func (m *mockClient) containerList(context.Context, PrmContainerList) ([]cid.ID, return nil, nil } +func (m *mockClient) containerListStream(context.Context, PrmListStream) (ResListStream, error) { + return ResListStream{}, nil +} + func (m *mockClient) containerDelete(context.Context, PrmContainerDelete) error { return nil } diff --git a/pool/pool.go b/pool/pool.go index d795af8..c9c2240 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -47,6 +47,8 @@ type client interface { containerGet(context.Context, PrmContainerGet) (container.Container, error) // see clientWrapper.containerList. containerList(context.Context, PrmContainerList) ([]cid.ID, error) + // see clientWrapper.containerListStream. + containerListStream(context.Context, PrmListStream) (ResListStream, error) // see clientWrapper.containerDelete. containerDelete(context.Context, PrmContainerDelete) error // see clientWrapper.apeManagerAddChain. @@ -145,6 +147,7 @@ const ( methodContainerPut methodContainerGet methodContainerList + methodContainerListStream methodContainerDelete methodEndpointInfo methodNetworkInfo @@ -529,6 +532,75 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList) return res.Containers(), nil } +// PrmListStream groups parameters of ListContainersStream operation. +type PrmListStream struct { + OwnerID user.ID + + Session *session.Container +} + +// ResListStream is designed to read list of object identifiers from FrostFS system. +// +// Must be initialized using Pool.ListContainersStream, any other usage is unsafe. +type ResListStream struct { + r *sdkClient.ContainerListReader + handleError func(context.Context, apistatus.Status, error) error +} + +// Read reads another list of the container identifiers. +func (x *ResListStream) Read(buf []cid.ID) (int, error) { + n, ok := x.r.Read(buf) + if !ok { + res, err := x.r.Close() + if err == nil { + return n, io.EOF + } + + var status apistatus.Status + if res != nil { + status = res.Status() + } + err = x.handleError(nil, status, err) + + return n, err + } + + return n, nil +} + +// Iterate iterates over the list of found container identifiers. +// f can return true to stop iteration earlier. +// +// Returns an error if container can't be read. +func (x *ResListStream) Iterate(f func(cid.ID) bool) error { + return x.r.Iterate(f) +} + +// Close ends reading list of the matched containers and returns the result of the operation +// along with the final results. Must be called after using the ResListStream. +func (x *ResListStream) Close() { + _, _ = x.r.Close() +} + +// containerList invokes sdkClient.ContainerList parse response status to error and return result as is. +func (c *clientWrapper) containerListStream(ctx context.Context, prm PrmListStream) (ResListStream, error) { + cl, err := c.getClient() + if err != nil { + return ResListStream{}, err + } + + cliPrm := sdkClient.PrmContainerListStream{ + OwnerID: prm.OwnerID, + Session: prm.Session, + } + + res, err := cl.ContainerListInit(ctx, cliPrm) + if err = c.handleError(ctx, nil, err); err != nil { + return ResListStream{}, fmt.Errorf("init container listing on client: %w", err) + } + return ResListStream{r: res, handleError: c.handleError}, nil +} + // containerDelete invokes sdkClient.ContainerDelete parse response status to error. // It also waits for the container to be removed from the network. func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDelete) error { @@ -2887,6 +2959,22 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid. return cnrIDs, nil } +// // ListContainers requests identifiers of the account-owned containers. +func (p *Pool) ListContainersStream(ctx context.Context, prm PrmListStream) (ResListStream, error) { + var res ResListStream + cp, err := p.connection() + if err != nil { + return res, err + } + + res, err = cp.containerListStream(ctx, prm) + if err != nil { + return res, fmt.Errorf("list containers stream via client '%s': %w", cp.address(), err) + } + + return res, nil +} + // DeleteContainer sends request to remove the FrostFS container and waits for the operation to complete. // // Waiting parameters can be specified using SetWaitParams. If not called, defaults are used: