From 94c0a607b509e16090ed5994dbd23e875adf03cd Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 23 Feb 2023 17:13:27 +0300 Subject: [PATCH] [#19] transformer: Add a target which sends parts to a channel Signed-off-by: Evgenii Stratonikov --- object/transformer/channel.go | 44 ++++++++++++++++++++++++ object/transformer/channel_test.go | 55 ++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 object/transformer/channel.go create mode 100644 object/transformer/channel_test.go diff --git a/object/transformer/channel.go b/object/transformer/channel.go new file mode 100644 index 0000000..707de09 --- /dev/null +++ b/object/transformer/channel.go @@ -0,0 +1,44 @@ +package transformer + +import ( + objectSDK "github.com/TrueCloudLab/frostfs-sdk-go/object" + "github.com/nspcc-dev/neo-go/pkg/util/slice" +) + +type chanTarget struct { + header *objectSDK.Object + payload []byte + ch chan<- *objectSDK.Object +} + +// NewChannelTarget returns ObjectTarget which writes +// object parts to a provided channel. +func NewChannelTarget(ch chan<- *objectSDK.Object) ObjectTarget { + return &chanTarget{ + ch: ch, + } +} + +// WriteHeader implements the ObjectTarget interface. +func (c *chanTarget) WriteHeader(object *objectSDK.Object) error { + c.header = object + return nil +} + +// Write implements the ObjectTarget interface. +func (c *chanTarget) Write(p []byte) (n int, err error) { + c.payload = append(c.payload, p...) + return len(p), nil +} + +// Close implements the ObjectTarget interface. +func (c *chanTarget) Close() (*AccessIdentifiers, error) { + if len(c.payload) != 0 { + c.header.SetPayload(slice.Copy(c.payload)) + } + c.ch <- c.header + + c.header = nil + c.payload = nil + return new(AccessIdentifiers), nil +} diff --git a/object/transformer/channel_test.go b/object/transformer/channel_test.go new file mode 100644 index 0000000..99aef23 --- /dev/null +++ b/object/transformer/channel_test.go @@ -0,0 +1,55 @@ +package transformer + +import ( + "crypto/rand" + "testing" + + cidtest "github.com/TrueCloudLab/frostfs-sdk-go/container/id/test" + objectSDK "github.com/TrueCloudLab/frostfs-sdk-go/object" + "github.com/TrueCloudLab/frostfs-sdk-go/version" + "github.com/stretchr/testify/require" +) + +func TestChannelTarget(t *testing.T) { + const maxSize = 100 + + ch := make(chan *objectSDK.Object, 10) + tt := new(testTarget) + + chTarget, _ := newPayloadSizeLimiter(maxSize, NewChannelTarget(ch)) + testTarget, _ := newPayloadSizeLimiter(maxSize, tt) + + ver := version.Current() + cnr := cidtest.ID() + hdr := objectSDK.New() + hdr.SetContainerID(cnr) + hdr.SetType(objectSDK.TypeRegular) + hdr.SetVersion(&ver) + + payload := make([]byte, maxSize*2+maxSize/2) + _, _ = rand.Read(payload) + + expectedIDs := writeObject(t, testTarget, hdr, payload) + actualIDs := writeObject(t, chTarget, hdr, payload) + _ = expectedIDs + _ = actualIDs + //require.Equal(t, expectedIDs, actualIDs) + + for i := range tt.objects { + select { + case obj := <-ch: + // Because of the split ID objects can be different. + // However, payload and attributes must be the same. + require.Equal(t, tt.objects[i].Payload(), obj.Payload()) + require.Equal(t, tt.objects[i].Attributes(), obj.Attributes()) + default: + require.FailNow(t, "received less parts than expected") + } + } + + select { + case <-ch: + require.FailNow(t, "received more parts than expected") + default: + } +}