2020-09-30 08:39:45 +00:00
|
|
|
package object
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
|
2023-03-07 13:38:26 +00:00
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
2024-11-07 14:32:10 +00:00
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
2020-09-30 08:39:45 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
type (
|
|
|
|
TransportSplitter struct {
|
2020-12-02 23:45:25 +00:00
|
|
|
next ServiceServer
|
2020-09-30 08:39:45 +00:00
|
|
|
|
|
|
|
chunkSize uint64
|
|
|
|
addrAmount uint64
|
|
|
|
}
|
|
|
|
|
2020-12-02 23:45:25 +00:00
|
|
|
getStreamMsgSizeCtrl struct {
|
|
|
|
util.ServerStream
|
|
|
|
|
|
|
|
stream GetObjectStream
|
|
|
|
|
2020-09-30 08:39:45 +00:00
|
|
|
chunkSize int
|
|
|
|
}
|
|
|
|
|
2020-12-10 12:26:40 +00:00
|
|
|
searchStreamMsgSizeCtrl struct {
|
|
|
|
util.ServerStream
|
|
|
|
|
|
|
|
stream SearchStream
|
|
|
|
|
2020-09-30 08:39:45 +00:00
|
|
|
addrAmount uint64
|
|
|
|
}
|
|
|
|
|
2020-12-07 17:49:47 +00:00
|
|
|
rangeStreamMsgSizeCtrl struct {
|
|
|
|
util.ServerStream
|
|
|
|
|
|
|
|
stream GetObjectRangeStream
|
|
|
|
|
2020-09-30 08:39:45 +00:00
|
|
|
chunkSize int
|
|
|
|
}
|
|
|
|
)
|
|
|
|
|
2020-12-02 23:45:25 +00:00
|
|
|
func (s *getStreamMsgSizeCtrl) Send(resp *object.GetResponse) error {
|
|
|
|
body := resp.GetBody()
|
|
|
|
|
|
|
|
part := body.GetObjectPart()
|
|
|
|
|
|
|
|
chunkPart, ok := part.(*object.GetObjectPartChunk)
|
|
|
|
if !ok {
|
|
|
|
return s.stream.Send(resp)
|
|
|
|
}
|
|
|
|
|
|
|
|
var newResp *object.GetResponse
|
|
|
|
|
|
|
|
for buf := bytes.NewBuffer(chunkPart.GetChunk()); buf.Len() > 0; {
|
|
|
|
if newResp == nil {
|
|
|
|
newResp = new(object.GetResponse)
|
|
|
|
newResp.SetBody(body)
|
|
|
|
}
|
|
|
|
|
|
|
|
chunkPart.SetChunk(buf.Next(s.chunkSize))
|
|
|
|
newResp.SetMetaHeader(resp.GetMetaHeader())
|
|
|
|
newResp.SetVerificationHeader(resp.GetVerificationHeader())
|
|
|
|
|
|
|
|
if err := s.stream.Send(newResp); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewTransportSplitter(size, amount uint64, next ServiceServer) *TransportSplitter {
|
2020-09-30 08:39:45 +00:00
|
|
|
return &TransportSplitter{
|
|
|
|
next: next,
|
|
|
|
chunkSize: size,
|
|
|
|
addrAmount: amount,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-02 23:45:25 +00:00
|
|
|
func (c *TransportSplitter) Get(req *object.GetRequest, stream GetObjectStream) error {
|
|
|
|
return c.next.Get(req, &getStreamMsgSizeCtrl{
|
|
|
|
ServerStream: stream,
|
|
|
|
stream: stream,
|
|
|
|
chunkSize: int(c.chunkSize),
|
|
|
|
})
|
2020-09-30 08:39:45 +00:00
|
|
|
}
|
|
|
|
|
2024-10-21 13:27:28 +00:00
|
|
|
func (c TransportSplitter) Put(ctx context.Context) (PutObjectStream, error) {
|
|
|
|
return c.next.Put(ctx)
|
2020-09-30 08:39:45 +00:00
|
|
|
}
|
|
|
|
|
2024-10-21 13:27:28 +00:00
|
|
|
func (c TransportSplitter) Patch(ctx context.Context) (PatchObjectStream, error) {
|
|
|
|
return c.next.Patch(ctx)
|
2024-08-12 10:01:57 +00:00
|
|
|
}
|
|
|
|
|
2020-09-30 08:39:45 +00:00
|
|
|
func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
|
|
|
|
return c.next.Head(ctx, request)
|
|
|
|
}
|
|
|
|
|
2020-12-10 12:26:40 +00:00
|
|
|
func (c TransportSplitter) Search(req *object.SearchRequest, stream SearchStream) error {
|
|
|
|
return c.next.Search(req, &searchStreamMsgSizeCtrl{
|
|
|
|
ServerStream: stream,
|
|
|
|
stream: stream,
|
|
|
|
addrAmount: c.addrAmount,
|
|
|
|
})
|
2020-09-30 08:39:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c TransportSplitter) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) {
|
|
|
|
return c.next.Delete(ctx, request)
|
|
|
|
}
|
|
|
|
|
2023-07-03 08:36:20 +00:00
|
|
|
func (c TransportSplitter) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
|
|
|
return c.next.PutSingle(ctx, req)
|
|
|
|
}
|
|
|
|
|
2020-12-07 17:49:47 +00:00
|
|
|
func (s *rangeStreamMsgSizeCtrl) Send(resp *object.GetRangeResponse) error {
|
|
|
|
body := resp.GetBody()
|
2020-09-30 08:39:45 +00:00
|
|
|
|
2020-12-07 17:49:47 +00:00
|
|
|
chunkPart, ok := body.GetRangePart().(*object.GetRangePartChunk)
|
|
|
|
if !ok {
|
|
|
|
return s.stream.Send(resp)
|
|
|
|
}
|
2020-09-30 08:39:45 +00:00
|
|
|
|
2020-12-07 17:49:47 +00:00
|
|
|
var newResp *object.GetRangeResponse
|
2020-09-30 08:39:45 +00:00
|
|
|
|
2020-12-07 17:49:47 +00:00
|
|
|
for buf := bytes.NewBuffer(chunkPart.GetChunk()); buf.Len() > 0; {
|
|
|
|
if newResp == nil {
|
|
|
|
newResp = new(object.GetRangeResponse)
|
|
|
|
newResp.SetBody(body)
|
2020-09-30 08:39:45 +00:00
|
|
|
}
|
|
|
|
|
2020-12-07 17:49:47 +00:00
|
|
|
chunkPart.SetChunk(buf.Next(s.chunkSize))
|
|
|
|
body.SetRangePart(chunkPart)
|
|
|
|
newResp.SetMetaHeader(resp.GetMetaHeader())
|
|
|
|
newResp.SetVerificationHeader(resp.GetVerificationHeader())
|
2020-09-30 08:39:45 +00:00
|
|
|
|
2020-12-07 17:49:47 +00:00
|
|
|
if err := s.stream.Send(newResp); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2020-09-30 08:39:45 +00:00
|
|
|
|
2020-12-07 17:49:47 +00:00
|
|
|
return nil
|
|
|
|
}
|
2020-09-30 08:39:45 +00:00
|
|
|
|
2020-12-07 17:49:47 +00:00
|
|
|
func (c TransportSplitter) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
|
|
|
|
return c.next.GetRange(req, &rangeStreamMsgSizeCtrl{
|
|
|
|
ServerStream: stream,
|
|
|
|
stream: stream,
|
|
|
|
chunkSize: int(c.chunkSize),
|
|
|
|
})
|
|
|
|
}
|
2020-09-30 08:39:45 +00:00
|
|
|
|
2020-12-07 17:49:47 +00:00
|
|
|
func (c TransportSplitter) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
|
|
|
return c.next.GetRangeHash(ctx, request)
|
2020-09-30 08:39:45 +00:00
|
|
|
}
|
|
|
|
|
2020-12-10 12:26:40 +00:00
|
|
|
func (s *searchStreamMsgSizeCtrl) Send(resp *object.SearchResponse) error {
|
|
|
|
body := resp.GetBody()
|
|
|
|
ids := body.GetIDList()
|
2020-09-30 08:39:45 +00:00
|
|
|
|
2020-12-10 12:26:40 +00:00
|
|
|
var newResp *object.SearchResponse
|
2020-09-30 08:39:45 +00:00
|
|
|
|
[#1586] objsvc: Allow to send search response in multiple messages
Previously, `ln` was only set once, so search has really worked for
small number of objects.
Fix panic:
```
panic: runtime error: slice bounds out of range [:43690] with capacity 21238
goroutine 6859775 [running]:
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object.(*searchStreamMsgSizeCtrl).Send(0xc001eec8d0, 0xc005734000)
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/transport_splitter.go:173 +0x1f0
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/v2.(*streamWriter).WriteIDs(0xc000520320, {0xc00eb1a000, 0x4fd9c, 0x7fd6475a9a68?})
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/v2/streamer.go:28 +0x155
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search.(*uniqueIDWriter).WriteIDs(0xc001386420, {0xc00eb1a000?, 0xc0013ea9c0?, 0x113eef3?})
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/util.go:62 +0x202
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search.(*execCtx).writeIDList(0xc00011aa38?, {0xc00eb1a000?, 0xc001eec9f0?, 0xc0008f4380?})
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/exec.go:68 +0x91
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search.(*execCtx).executeLocal(0xc0008f4380, {0x176c538, 0xc001eec9f0})
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/local.go:18 +0x16b
```
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-28 06:03:07 +00:00
|
|
|
for {
|
2020-12-10 12:26:40 +00:00
|
|
|
if newResp == nil {
|
|
|
|
newResp = new(object.SearchResponse)
|
|
|
|
newResp.SetBody(body)
|
|
|
|
}
|
2020-09-30 08:39:45 +00:00
|
|
|
|
[#1586] objsvc: Allow to send search response in multiple messages
Previously, `ln` was only set once, so search has really worked for
small number of objects.
Fix panic:
```
panic: runtime error: slice bounds out of range [:43690] with capacity 21238
goroutine 6859775 [running]:
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object.(*searchStreamMsgSizeCtrl).Send(0xc001eec8d0, 0xc005734000)
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/transport_splitter.go:173 +0x1f0
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/v2.(*streamWriter).WriteIDs(0xc000520320, {0xc00eb1a000, 0x4fd9c, 0x7fd6475a9a68?})
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/v2/streamer.go:28 +0x155
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search.(*uniqueIDWriter).WriteIDs(0xc001386420, {0xc00eb1a000?, 0xc0013ea9c0?, 0x113eef3?})
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/util.go:62 +0x202
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search.(*execCtx).writeIDList(0xc00011aa38?, {0xc00eb1a000?, 0xc001eec9f0?, 0xc0008f4380?})
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/exec.go:68 +0x91
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search.(*execCtx).executeLocal(0xc0008f4380, {0x176c538, 0xc001eec9f0})
git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search/local.go:18 +0x16b
```
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-28 06:03:07 +00:00
|
|
|
cut := min(s.addrAmount, uint64(len(ids)))
|
2020-12-10 12:26:40 +00:00
|
|
|
|
|
|
|
body.SetIDList(ids[:cut])
|
|
|
|
newResp.SetMetaHeader(resp.GetMetaHeader())
|
|
|
|
newResp.SetVerificationHeader(resp.GetVerificationHeader())
|
2020-09-30 08:39:45 +00:00
|
|
|
|
2020-12-10 12:26:40 +00:00
|
|
|
if err := s.stream.Send(newResp); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-09-30 08:39:45 +00:00
|
|
|
|
2020-12-10 12:26:40 +00:00
|
|
|
ids = ids[cut:]
|
2022-01-10 09:19:08 +00:00
|
|
|
|
|
|
|
if len(ids) == 0 {
|
|
|
|
break
|
|
|
|
}
|
2020-09-30 08:39:45 +00:00
|
|
|
}
|
|
|
|
|
2020-12-10 12:26:40 +00:00
|
|
|
return nil
|
2020-09-30 08:39:45 +00:00
|
|
|
}
|