diff --git a/api/apemanager/grpc/service_grpc.pb.go b/api/apemanager/grpc/service_grpc.pb.go index eb11f3f..bf76cc4 100644 Binary files a/api/apemanager/grpc/service_grpc.pb.go and b/api/apemanager/grpc/service_grpc.pb.go differ diff --git a/api/container/convert.go b/api/container/convert.go index c91bdfd..5a9a805 100644 --- a/api/container/convert.go +++ b/api/container/convert.go @@ -762,3 +762,138 @@ func (r *ListResponse) FromGRPCMessage(m grpc.Message) error { return r.ResponseHeaders.FromMessage(v) } + +func (r *ListStreamRequestBody) ToGRPCMessage() grpc.Message { + var m *container.ListStreamRequest_Body + + if r != nil { + m = new(container.ListStreamRequest_Body) + + m.SetOwnerId(r.ownerID.ToGRPCMessage().(*refsGRPC.OwnerID)) + } + + return m +} + +func (r *ListStreamRequestBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*container.ListStreamRequest_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + ownerID := v.GetOwnerId() + if ownerID == nil { + r.ownerID = nil + } else { + if r.ownerID == nil { + r.ownerID = new(refs.OwnerID) + } + + err = r.ownerID.FromGRPCMessage(ownerID) + } + + return err +} + +func (r *ListStreamRequest) ToGRPCMessage() grpc.Message { + var m *container.ListStreamRequest + + if r != nil { + m = new(container.ListStreamRequest) + + m.SetBody(r.body.ToGRPCMessage().(*container.ListStreamRequest_Body)) + r.RequestHeaders.ToMessage(m) + } + + return m +} + +func (r *ListStreamRequest) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*container.ListStreamRequest) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(ListStreamRequestBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + + return r.RequestHeaders.FromMessage(v) +} + +func (r *ListStreamResponseBody) ToGRPCMessage() grpc.Message { + var m *container.ListStreamResponse_Body + + if r != nil { + m = new(container.ListStreamResponse_Body) + + m.SetContainerIds(refs.ContainerIDsToGRPCMessage(r.cidList)) + } + + return m +} + +func (r *ListStreamResponseBody) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*container.ListStreamResponse_Body) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + r.cidList, err = refs.ContainerIDsFromGRPCMessage(v.GetContainerIds()) + + return err +} + +func (r *ListStreamResponse) ToGRPCMessage() grpc.Message { + var m *container.ListStreamResponse + + if r != nil { + m = new(container.ListStreamResponse) + + m.SetBody(r.body.ToGRPCMessage().(*container.ListStreamResponse_Body)) + r.ResponseHeaders.ToMessage(m) + } + + return m +} + +func (r *ListStreamResponse) FromGRPCMessage(m grpc.Message) error { + v, ok := m.(*container.ListStreamResponse) + if !ok { + return message.NewUnexpectedMessageType(m, v) + } + + var err error + + body := v.GetBody() + if body == nil { + r.body = nil + } else { + if r.body == nil { + r.body = new(ListStreamResponseBody) + } + + err = r.body.FromGRPCMessage(body) + if err != nil { + return err + } + } + + return r.ResponseHeaders.FromMessage(v) +} diff --git a/api/container/grpc/service_frostfs.pb.go b/api/container/grpc/service_frostfs.pb.go index 0743824..0388293 100644 Binary files a/api/container/grpc/service_frostfs.pb.go and b/api/container/grpc/service_frostfs.pb.go differ diff --git a/api/container/grpc/service_frostfs_fuzz.go b/api/container/grpc/service_frostfs_fuzz.go index 7e6d6e6..024b237 100644 --- a/api/container/grpc/service_frostfs_fuzz.go +++ b/api/container/grpc/service_frostfs_fuzz.go @@ -157,3 +157,41 @@ func DoFuzzJSONListResponse(data []byte) int { } return 1 } +func DoFuzzProtoListStreamRequest(data []byte) int { + msg := new(ListStreamRequest) + if err := msg.UnmarshalProtobuf(data); err != nil { + return 0 + } + _ = msg.MarshalProtobuf(nil) + return 1 +} +func DoFuzzJSONListStreamRequest(data []byte) int { + msg := new(ListStreamRequest) + if err := msg.UnmarshalJSON(data); err != nil { + return 0 + } + _, err := msg.MarshalJSON() + if err != nil { + panic(err) + } + return 1 +} +func DoFuzzProtoListStreamResponse(data []byte) int { + msg := new(ListStreamResponse) + if err := msg.UnmarshalProtobuf(data); err != nil { + return 0 + } + _ = msg.MarshalProtobuf(nil) + return 1 +} +func DoFuzzJSONListStreamResponse(data []byte) int { + msg := new(ListStreamResponse) + if err := msg.UnmarshalJSON(data); err != nil { + return 0 + } + _, err := msg.MarshalJSON() + if err != nil { + panic(err) + } + return 1 +} diff --git a/api/container/grpc/service_frostfs_test.go b/api/container/grpc/service_frostfs_test.go index 804b89c..2844996 100644 --- a/api/container/grpc/service_frostfs_test.go +++ b/api/container/grpc/service_frostfs_test.go @@ -89,3 +89,23 @@ func FuzzJSONListResponse(f *testing.F) { DoFuzzJSONListResponse(data) }) } +func FuzzProtoListStreamRequest(f *testing.F) { + f.Fuzz(func(t *testing.T, data []byte) { + DoFuzzProtoListStreamRequest(data) + }) +} +func FuzzJSONListStreamRequest(f *testing.F) { + f.Fuzz(func(t *testing.T, data []byte) { + DoFuzzJSONListStreamRequest(data) + }) +} +func FuzzProtoListStreamResponse(f *testing.F) { + f.Fuzz(func(t *testing.T, data []byte) { + DoFuzzProtoListStreamResponse(data) + }) +} +func FuzzJSONListStreamResponse(f *testing.F) { + f.Fuzz(func(t *testing.T, data []byte) { + DoFuzzJSONListStreamResponse(data) + }) +} diff --git a/api/container/grpc/service_grpc.pb.go b/api/container/grpc/service_grpc.pb.go index ea22604..5991e81 100644 Binary files a/api/container/grpc/service_grpc.pb.go and b/api/container/grpc/service_grpc.pb.go differ diff --git a/api/container/marshal.go b/api/container/marshal.go index 2b16669..89a6661 100644 --- a/api/container/marshal.go +++ b/api/container/marshal.go @@ -343,3 +343,65 @@ func (r *ListResponseBody) StableSize() (size int) { func (r *ListResponseBody) Unmarshal(data []byte) error { return message.Unmarshal(r, data, new(container.ListResponse_Body)) } + +func (r *ListStreamRequestBody) StableMarshal(buf []byte) []byte { + if r == nil { + return []byte{} + } + + if buf == nil { + buf = make([]byte, r.StableSize()) + } + + protoutil.NestedStructureMarshal(listReqBodyOwnerField, buf, r.ownerID) + + return buf +} + +func (r *ListStreamRequestBody) StableSize() (size int) { + if r == nil { + return 0 + } + + size += protoutil.NestedStructureSize(listReqBodyOwnerField, r.ownerID) + + return size +} + +func (r *ListStreamRequestBody) Unmarshal(data []byte) error { + return message.Unmarshal(r, data, new(container.ListStreamRequest_Body)) +} + +func (r *ListStreamResponseBody) StableMarshal(buf []byte) []byte { + if r == nil { + return []byte{} + } + + if buf == nil { + buf = make([]byte, r.StableSize()) + } + + var offset int + + for i := range r.cidList { + offset += protoutil.NestedStructureMarshal(listRespBodyIDsField, buf[offset:], &r.cidList[i]) + } + + return buf +} + +func (r *ListStreamResponseBody) StableSize() (size int) { + if r == nil { + return 0 + } + + for i := range r.cidList { + size += protoutil.NestedStructureSize(listRespBodyIDsField, &r.cidList[i]) + } + + return size +} + +func (r *ListStreamResponseBody) Unmarshal(data []byte) error { + return message.Unmarshal(r, data, new(container.ListStreamResponse_Body)) +} diff --git a/api/container/types.go b/api/container/types.go index 92c4706..4da3f87 100644 --- a/api/container/types.go +++ b/api/container/types.go @@ -109,6 +109,26 @@ type ListResponse struct { session.ResponseHeaders } +type ListStreamRequestBody struct { + ownerID *refs.OwnerID +} + +type ListStreamRequest struct { + body *ListStreamRequestBody + + session.RequestHeaders +} + +type ListStreamResponseBody struct { + cidList []refs.ContainerID +} + +type ListStreamResponse struct { + body *ListStreamResponseBody + + session.ResponseHeaders +} + func (a *Attribute) GetKey() string { if a != nil { return a.key @@ -444,3 +464,51 @@ func (r *ListResponse) GetBody() *ListResponseBody { func (r *ListResponse) SetBody(v *ListResponseBody) { r.body = v } + +func (r *ListStreamRequestBody) GetOwnerID() *refs.OwnerID { + if r != nil { + return r.ownerID + } + + return nil +} + +func (r *ListStreamRequestBody) SetOwnerID(v *refs.OwnerID) { + r.ownerID = v +} + +func (r *ListStreamRequest) GetBody() *ListStreamRequestBody { + if r != nil { + return r.body + } + + return nil +} + +func (r *ListStreamRequest) SetBody(v *ListStreamRequestBody) { + r.body = v +} + +func (r *ListStreamResponseBody) GetContainerIDs() []refs.ContainerID { + if r != nil { + return r.cidList + } + + return nil +} + +func (r *ListStreamResponseBody) SetContainerIDs(v []refs.ContainerID) { + r.cidList = v +} + +func (r *ListStreamResponse) GetBody() *ListStreamResponseBody { + if r != nil { + return r.body + } + + return nil +} + +func (r *ListStreamResponse) SetBody(v *ListStreamResponseBody) { + r.body = v +} diff --git a/api/object/grpc/service_grpc.pb.go b/api/object/grpc/service_grpc.pb.go index eb53afe..2054f89 100644 Binary files a/api/object/grpc/service_grpc.pb.go and b/api/object/grpc/service_grpc.pb.go differ diff --git a/api/rpc/container.go b/api/rpc/container.go index 9ba5c99..0e3af82 100644 --- a/api/rpc/container.go +++ b/api/rpc/container.go @@ -13,6 +13,7 @@ const ( rpcContainerGet = "Get" rpcContainerDel = "Delete" rpcContainerList = "List" + rpcContainerStream = "ListStream" rpcContainerGetEACL = "GetExtendedACL" rpcContainerUsedSpace = "AnnounceUsedSpace" ) @@ -80,3 +81,27 @@ func ListContainers( return resp, nil } + +type ListStreamResponseReader struct { + r client.MessageReader +} + +func (r *ListStreamResponseReader) Read(resp *container.ListStreamResponse) error { + return r.r.ReadMessage(resp) +} + +// ListContainersStream executes ContainerService.ListStream RPC. +func ListContainersStream( + cli *client.Client, + req *container.ListStreamRequest, + opts ...client.CallOption, +) (*ListStreamResponseReader, error) { + wc, err := client.OpenServerStream(cli, common.CallMethodInfoServerStream(serviceContainer, rpcContainerList), req, opts...) + if err != nil { + return nil, err + } + + return &ListStreamResponseReader{ + r: wc, + }, nil +} diff --git a/api/signature/body.go b/api/signature/body.go index 50a09e9..2467dc0 100644 --- a/api/signature/body.go +++ b/api/signature/body.go @@ -46,6 +46,10 @@ func serviceMessageBody(req any) stableMarshaler { return v.GetBody() case *container.ListResponse: return v.GetBody() + case *container.ListStreamRequest: + return v.GetBody() + case *container.ListStreamResponse: + return v.GetBody() /* Object */ case *object.PutRequest: diff --git a/client/container_list_stream.go b/client/container_list_stream.go new file mode 100644 index 0000000..35936d8 --- /dev/null +++ b/client/container_list_stream.go @@ -0,0 +1,182 @@ +package client + +import ( + "context" + "errors" + "fmt" + "io" + + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs" + rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" + v2session "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/signature" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" +) + +// PrmContainerListStream groups parameters of ContainerListStream operation. +type PrmContainerListStream struct { + XHeaders []string + + Account user.ID + + Session *session.Container +} + +// SetAccount sets identifier of the FrostFS account to list the containers. +// Required parameter. +// +// Deprecated: Use PrmContainerListStream.Account instead. +func (x *PrmContainerListStream) SetAccount(id user.ID) { + x.Account = id +} + +func (x *PrmContainerListStream) buildRequest(c *Client) (*container.ListStreamRequest, error) { + if x.Account.IsEmpty() { + return nil, errorAccountNotSet + } + + var ownerV2 refs.OwnerID + x.Account.WriteToV2(&ownerV2) + + reqBody := new(container.ListStreamRequestBody) + reqBody.SetOwnerID(&ownerV2) + + var meta v2session.RequestMetaHeader + writeXHeadersToMeta(x.XHeaders, &meta) + + if x.Session != nil { + var tokv2 v2session.Token + x.Session.WriteToV2(&tokv2) + + meta.SetSessionToken(&tokv2) + } + + var req container.ListStreamRequest + req.SetBody(reqBody) + c.prepareRequest(&req, &meta) + return &req, nil +} + +type ResContainerListStream struct { + statusRes +} + +type ContainerListReader struct { + client *Client + cancelCtxStream context.CancelFunc + err error + res ResContainerListStream + stream interface { + Read(resp *container.ListStreamResponse) error + } + tail []refs.ContainerID +} + +func (x *ContainerListReader) Read(buf []cid.ID) (int, bool) { + if len(buf) == 0 { + panic("empty buffer in ContainerListReader.ReadList") + } + + read := copyCnrIDBuffers(buf, x.tail) + x.tail = x.tail[read:] + + if len(buf) == read { + return read, true + } + + for { + var resp container.ListStreamResponse + x.err = x.stream.Read(&resp) + if x.err != nil { + return read, false + } + + x.res.st, x.err = x.client.processResponse(&resp) + if x.err != nil || !apistatus.IsSuccessful(x.res.st) { + return read, false + } + + // read new chunk of objects + ids := resp.GetBody().GetContainerIDs() + if len(ids) == 0 { + // just skip empty lists since they are not prohibited by protocol + continue + } + + ln := copyCnrIDBuffers(buf[read:], ids) + read += ln + + if read == len(buf) { + // save the tail + x.tail = append(x.tail, ids[ln:]...) + + return read, true + } + } +} + +func copyCnrIDBuffers(dst []cid.ID, src []refs.ContainerID) int { + var i int + for ; i < len(dst) && i < len(src); i++ { + _ = dst[i].ReadFromV2(src[i]) + } + return i +} + +func (x *ContainerListReader) Iterate(f func(cid.ID) bool) error { + buf := make([]cid.ID, 1) + + for { + // Do not check first return value because `len(buf) == 1`, + // so false means nothing was read. + _, ok := x.Read(buf) + if !ok { + res, err := x.Close() + if err != nil { + return err + } + return apistatus.ErrFromStatus(res.Status()) + } + if f(buf[0]) { + return nil + } + } +} + +func (x *ContainerListReader) Close() (*ResContainerListStream, error) { + defer x.cancelCtxStream() + + if x.err != nil && !errors.Is(x.err, io.EOF) { + return nil, x.err + } + + return &x.res, nil +} + +func (c *Client) ContainerListInit(ctx context.Context, prm PrmContainerListStream) (*ContainerListReader, error) { + req, err := prm.buildRequest(c) + if err != nil { + return nil, err + } + + err = signature.SignServiceMessage(&c.prm.Key, req) + if err != nil { + return nil, fmt.Errorf("sign request: %w", err) + } + + var r ContainerListReader + ctx, r.cancelCtxStream = context.WithCancel(ctx) + + r.stream, err = rpcapi.ListContainersStream(&c.c, req, client.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("open stream: %w", err) + } + r.client = c + + return &r, nil +}