From d8f1cc29f550d79b74321223db9fed9a3c4658ad Mon Sep 17 00:00:00 2001 From: Ekaterina Lebedeva Date: Mon, 28 Oct 2024 18:06:01 +0300 Subject: [PATCH] [#XX] container: Add ListStream method Signed-off-by: Ekaterina Lebedeva --- client/container_list_stream.go | 182 ++++++++++++++++++++++++++++++++ 1 file changed, 182 insertions(+) create mode 100644 client/container_list_stream.go diff --git a/client/container_list_stream.go b/client/container_list_stream.go new file mode 100644 index 0000000..5c460a9 --- /dev/null +++ b/client/container_list_stream.go @@ -0,0 +1,182 @@ +package client + +import ( + "context" + "errors" + "fmt" + "io" + + v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" + v2refs "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" + rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" +) + +// PrmContainerListStream groups parameters of ContainerListStream operation. +type PrmContainerListStream struct { + XHeaders []string + + Account user.ID + + Session *session.Container +} + +// SetAccount sets identifier of the FrostFS account to list the containers. +// Required parameter. +// +// Deprecated: Use PrmContainerListStream.Account instead. +func (x *PrmContainerListStream) SetAccount(id user.ID) { + x.Account = id +} + +func (x *PrmContainerListStream) buildRequest(c *Client) (*v2container.ListStreamRequest, error) { + if x.Account.IsEmpty() { + return nil, errorAccountNotSet + } + + var ownerV2 v2refs.OwnerID + x.Account.WriteToV2(&ownerV2) + + reqBody := new(v2container.ListStreamRequestBody) + reqBody.SetOwnerID(&ownerV2) + + var meta v2session.RequestMetaHeader + writeXHeadersToMeta(x.XHeaders, &meta) + + if x.Session != nil { + var tokv2 v2session.Token + x.Session.WriteToV2(&tokv2) + + meta.SetSessionToken(&tokv2) + } + + var req v2container.ListStreamRequest + req.SetBody(reqBody) + c.prepareRequest(&req, &meta) + return &req, nil +} + +type ResContainerListStream struct { + statusRes +} + +type ContainerListReader struct { + client *Client + cancelCtxStream context.CancelFunc + err error + res ResContainerListStream + stream interface { + Read(resp *v2container.ListStreamResponse) error + } + tail []v2refs.ContainerID +} + +func (x *ContainerListReader) Read(buf []cid.ID) (int, bool) { + if len(buf) == 0 { + panic("empty buffer in ContainerListReader.ReadList") + } + + read := copyCnrIDBuffers(buf, x.tail) + x.tail = x.tail[read:] + + if len(buf) == read { + return read, true + } + + for { + var resp v2container.ListStreamResponse + x.err = x.stream.Read(&resp) + if x.err != nil { + return read, false + } + + x.res.st, x.err = x.client.processResponse(&resp) + if x.err != nil || !apistatus.IsSuccessful(x.res.st) { + return read, false + } + + // read new chunk of objects + ids := resp.GetBody().GetContainerIDs() + if len(ids) == 0 { + // just skip empty lists since they are not prohibited by protocol + continue + } + + ln := copyCnrIDBuffers(buf[read:], ids) + read += ln + + if read == len(buf) { + // save the tail + x.tail = append(x.tail, ids[ln:]...) + + return read, true + } + } +} + +func copyCnrIDBuffers(dst []cid.ID, src []v2refs.ContainerID) int { + var i int + for ; i < len(dst) && i < len(src); i++ { + _ = dst[i].ReadFromV2(src[i]) + } + return i +} + +func (x *ContainerListReader) Iterate(f func(cid.ID) bool) error { + buf := make([]cid.ID, 1) + + for { + // Do not check first return value because `len(buf) == 1`, + // so false means nothing was read. + _, ok := x.Read(buf) + if !ok { + res, err := x.Close() + if err != nil { + return err + } + return apistatus.ErrFromStatus(res.Status()) + } + if f(buf[0]) { + return nil + } + } +} + +func (x *ContainerListReader) Close() (*ResContainerListStream, error) { + defer x.cancelCtxStream() + + if x.err != nil && !errors.Is(x.err, io.EOF) { + return nil, x.err + } + + return &x.res, nil +} + +func (c *Client) ContainerListInit(ctx context.Context, prm PrmContainerListStream) (*ContainerListReader, error) { + req, err := prm.buildRequest(c) + if err != nil { + return nil, err + } + + err = signature.SignServiceMessage(&c.prm.Key, req) + if err != nil { + return nil, fmt.Errorf("sign request: %w", err) + } + + var r ContainerListReader + ctx, r.cancelCtxStream = context.WithCancel(ctx) + + r.stream, err = rpcapi.ListContainersStream(&c.c, req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("open stream: %w", err) + } + r.client = c + + return &r, nil +}