From 272118af2cec471628919421f806622e49c995d7 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 7 Jul 2023 15:34:31 +0300 Subject: [PATCH] [#64] transformer: Simplify interface Signed-off-by: Dmitrii Stepanov --- client/object_put_transformer.go | 42 ++++++++------------------ object/transformer/channel.go | 38 +++++------------------ object/transformer/channel_test.go | 4 +-- object/transformer/transformer.go | 26 +++++++--------- object/transformer/transformer_test.go | 39 +++++------------------- object/transformer/types.go | 15 ++++++--- 6 files changed, 52 insertions(+), 112 deletions(-) diff --git a/client/object_put_transformer.go b/client/object_put_transformer.go index cb44f61..ad8165d 100644 --- a/client/object_put_transformer.go +++ b/client/object_put_transformer.go @@ -21,7 +21,7 @@ func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTr } w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{ Key: key, - NextTargetInit: func() transformer.ObjectTarget { return &w.it }, + NextTargetInit: func() transformer.ObjectWriter { return &w.it }, MaxSize: prm.maxSize, WithoutHomomorphicHash: prm.withoutHomomorphicHash, NetworkState: prm.epochSource, @@ -30,7 +30,7 @@ func (c *Client) objectPutInitTransformer(prm PrmObjectPutInit) (*objectWriterTr } type objectWriterTransformer struct { - ot transformer.ObjectTarget + ot transformer.ChunkedObjectWriter it internalTarget err error } @@ -58,56 +58,40 @@ func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, err } type internalTarget struct { - current *object.Object client *Client res *ResObjectPut prm PrmObjectPutInit - payload []byte useStream bool } -func (it *internalTarget) WriteHeader(_ context.Context, object *object.Object) error { - it.current = object - return nil -} - -func (it *internalTarget) Write(_ context.Context, p []byte) (n int, err error) { - it.payload = append(it.payload, p...) - return len(p), nil -} - -func (it *internalTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) { - it.current.SetPayload(it.payload) - - putSingleImplemented, err := it.tryPutSingle(ctx) +func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error { + putSingleImplemented, err := it.tryPutSingle(ctx, o) if putSingleImplemented { - return nil, err + return err } it.useStream = true - return nil, it.putAsStream(ctx) + return it.putAsStream(ctx, o) } -func (it *internalTarget) putAsStream(ctx context.Context) error { +func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error { wrt, err := it.client.objectPutInitRaw(ctx, it.prm) if err != nil { return err } - if wrt.WriteHeader(ctx, *it.current) { - wrt.WritePayloadChunk(ctx, it.current.Payload()) + if wrt.WriteHeader(ctx, *o) { + wrt.WritePayloadChunk(ctx, o.Payload()) } it.res, err = wrt.Close(ctx) - it.current = nil - it.payload = nil return err } -func (it *internalTarget) tryPutSingle(ctx context.Context) (bool, error) { +func (it *internalTarget) tryPutSingle(ctx context.Context, o *object.Object) (bool, error) { if it.useStream { return false, nil } var prm PrmObjectPutSingle prm.SetCopiesNumber(it.prm.copyNum) - prm.SetObject(it.current.ToV2()) + prm.SetObject(o.ToV2()) prm.UseKey(prm.key) prm.meta = it.prm.meta @@ -117,13 +101,11 @@ func (it *internalTarget) tryPutSingle(ctx context.Context) (bool, error) { } if err == nil { - id, _ := it.current.ID() + id, _ := o.ID() it.res = &ResObjectPut{ statusRes: res.statusRes, obj: id, } } - it.current = nil - it.payload = nil return true, err } diff --git a/object/transformer/channel.go b/object/transformer/channel.go index b7a50a9..f6d94a5 100644 --- a/object/transformer/channel.go +++ b/object/transformer/channel.go @@ -4,47 +4,25 @@ import ( "context" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - "github.com/nspcc-dev/neo-go/pkg/util/slice" ) type chanTarget struct { - header *objectSDK.Object - payload []byte - ch chan<- *objectSDK.Object + ch chan<- *objectSDK.Object } // NewChannelTarget returns ObjectTarget which writes // object parts to a provided channel. -func NewChannelTarget(ch chan<- *objectSDK.Object) ObjectTarget { +func NewChannelTarget(ch chan<- *objectSDK.Object) ObjectWriter { return &chanTarget{ ch: ch, } } -// WriteHeader implements the ObjectTarget interface. -func (c *chanTarget) WriteHeader(_ context.Context, object *objectSDK.Object) error { - c.header = object +func (c *chanTarget) WriteObject(ctx context.Context, o *objectSDK.Object) error { + select { + case c.ch <- o: + case <-ctx.Done(): + return ctx.Err() + } return nil } - -// Write implements the ObjectTarget interface. -func (c *chanTarget) Write(_ context.Context, p []byte) (n int, err error) { - c.payload = append(c.payload, p...) - return len(p), nil -} - -// Close implements the ObjectTarget interface. -func (c *chanTarget) Close(ctx context.Context) (*AccessIdentifiers, error) { - if len(c.payload) != 0 { - c.header.SetPayload(slice.Copy(c.payload)) - } - select { - case c.ch <- c.header: - case <-ctx.Done(): - return nil, ctx.Err() - } - - c.header = nil - c.payload = nil - return new(AccessIdentifiers), nil -} diff --git a/object/transformer/channel_test.go b/object/transformer/channel_test.go index 6b17bd5..1bb074b 100644 --- a/object/transformer/channel_test.go +++ b/object/transformer/channel_test.go @@ -18,8 +18,8 @@ func TestChannelTarget(t *testing.T) { tt := new(testTarget) ct := NewChannelTarget(ch) - chTarget, _ := newPayloadSizeLimiter(maxSize, func() ObjectTarget { return ct }) - testTarget, _ := newPayloadSizeLimiter(maxSize, func() ObjectTarget { return tt }) + chTarget, _ := newPayloadSizeLimiter(maxSize, func() ObjectWriter { return ct }) + testTarget, _ := newPayloadSizeLimiter(maxSize, func() ObjectWriter { return tt }) ver := version.Current() cnr := cidtest.ID() diff --git a/object/transformer/transformer.go b/object/transformer/transformer.go index a175662..6fea8dd 100644 --- a/object/transformer/transformer.go +++ b/object/transformer/transformer.go @@ -20,6 +20,7 @@ type payloadSizeLimiter struct { written, writtenCurrent uint64 current, parent *object.Object + payload []byte currentHashers, parentHashers []payloadChecksumHasher @@ -29,12 +30,12 @@ type payloadSizeLimiter struct { parAttrs []object.Attribute - nextTarget ObjectTarget + nextTarget ObjectWriter } type Params struct { Key *ecdsa.PrivateKey - NextTargetInit func() ObjectTarget + NextTargetInit TargetInitializer SessionToken *session.Object NetworkState EpochSource MaxSize uint64 @@ -48,7 +49,7 @@ type Params struct { // is false. // // Objects w/ payload size less or equal than max size remain untouched. -func NewPayloadSizeLimiter(p Params) ObjectTarget { +func NewPayloadSizeLimiter(p Params) ChunkedObjectWriter { return &payloadSizeLimiter{ Params: p, splitID: object.NewSplitID(), @@ -120,6 +121,7 @@ func (s *payloadSizeLimiter) initializeCurrent() { s.nextTarget = s.NextTargetInit() s.writtenCurrent = 0 s.initPayloadHashers() + s.payload = make([]byte, 0) } func (s *payloadSizeLimiter) initPayloadHashers() { @@ -160,12 +162,9 @@ func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*Acces return nil, fmt.Errorf("fillHeader: %w", err) } - if err := s.nextTarget.WriteHeader(ctx, s.current); err != nil { - return nil, fmt.Errorf("could not write header to next target: %w", err) - } - - if _, err := s.nextTarget.Close(ctx); err != nil { - return nil, fmt.Errorf("could not close next target: %w", err) + s.current.SetPayload(s.payload) + if err := s.nextTarget.WriteObject(ctx, s.current); err != nil { + return nil, fmt.Errorf("could not write to next target: %w", err) } // save identifier of the released object @@ -262,7 +261,7 @@ func (s *payloadSizeLimiter) writeChunk(ctx context.Context, chunk []byte) error cut = leftToEdge } - if err := s.writeHashes(ctx, chunk[:cut]); err != nil { + if err := s.writeHashes(chunk[:cut]); err != nil { return fmt.Errorf("could not write chunk to target: %w", err) } @@ -278,11 +277,8 @@ func (s *payloadSizeLimiter) writeChunk(ctx context.Context, chunk []byte) error } } -func (s *payloadSizeLimiter) writeHashes(ctx context.Context, chunk []byte) error { - _, err := s.nextTarget.Write(ctx, chunk) - if err != nil { - return err - } +func (s *payloadSizeLimiter) writeHashes(chunk []byte) error { + s.payload = append(s.payload, chunk...) // The `Write` method of `hash.Hash` never returns an error. for i := range s.currentHashers { diff --git a/object/transformer/transformer_test.go b/object/transformer/transformer_test.go index 11a6843..cbfb11a 100644 --- a/object/transformer/transformer_test.go +++ b/object/transformer/transformer_test.go @@ -20,7 +20,7 @@ func TestTransformer(t *testing.T) { tt := new(testTarget) - target, pk := newPayloadSizeLimiter(maxSize, func() ObjectTarget { return tt }) + target, pk := newPayloadSizeLimiter(maxSize, func() ObjectWriter { return tt }) cnr := cidtest.ID() hdr := newObject(cnr) @@ -99,7 +99,7 @@ func newObject(cnr cid.ID) *objectSDK.Object { return hdr } -func writeObject(t *testing.T, ctx context.Context, target ObjectTarget, header *objectSDK.Object, payload []byte) *AccessIdentifiers { +func writeObject(t *testing.T, ctx context.Context, target ChunkedObjectWriter, header *objectSDK.Object, payload []byte) *AccessIdentifiers { require.NoError(t, target.WriteHeader(ctx, header)) _, err := target.Write(ctx, payload) @@ -131,7 +131,7 @@ func benchmarkTransformer(b *testing.B, header *objectSDK.Object, payloadSize in b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - f, _ := newPayloadSizeLimiter(maxSize, func() ObjectTarget { return benchTarget{} }) + f, _ := newPayloadSizeLimiter(maxSize, func() ObjectWriter { return benchTarget{} }) if err := f.WriteHeader(ctx, header); err != nil { b.Fatalf("write header: %v", err) } @@ -144,7 +144,7 @@ func benchmarkTransformer(b *testing.B, header *objectSDK.Object, payloadSize in } } -func newPayloadSizeLimiter(maxSize uint64, nextTarget func() ObjectTarget) (ObjectTarget, *keys.PrivateKey) { +func newPayloadSizeLimiter(maxSize uint64, nextTarget TargetInitializer) (ChunkedObjectWriter, *keys.PrivateKey) { p, err := keys.NewPrivateKey() if err != nil { panic(err) @@ -167,38 +167,15 @@ func (s dummyEpochSource) CurrentEpoch() uint64 { type benchTarget struct{} -func (benchTarget) WriteHeader(_ context.Context, object *objectSDK.Object) error { +func (benchTarget) WriteObject(context.Context, *objectSDK.Object) error { return nil } -func (benchTarget) Write(_ context.Context, p []byte) (n int, err error) { - return len(p), nil -} - -func (benchTarget) Close(context.Context) (*AccessIdentifiers, error) { - return nil, nil -} - type testTarget struct { - current *objectSDK.Object - payload []byte objects []*objectSDK.Object } -func (tt *testTarget) WriteHeader(_ context.Context, object *objectSDK.Object) error { - tt.current = object - return nil -} - -func (tt *testTarget) Write(_ context.Context, p []byte) (n int, err error) { - tt.payload = append(tt.payload, p...) - return len(p), nil -} - -func (tt *testTarget) Close(_ context.Context) (*AccessIdentifiers, error) { - tt.current.SetPayload(tt.payload) - tt.objects = append(tt.objects, tt.current) - tt.current = nil - tt.payload = nil - return nil, nil // AccessIdentifiers should not be used. +func (tt *testTarget) WriteObject(_ context.Context, o *objectSDK.Object) error { + tt.objects = append(tt.objects, o) + return nil // AccessIdentifiers should not be used. } diff --git a/object/transformer/types.go b/object/transformer/types.go index a7e827c..212f453 100644 --- a/object/transformer/types.go +++ b/object/transformer/types.go @@ -21,8 +21,9 @@ type EpochSource interface { CurrentEpoch() uint64 } -// ObjectTarget is an interface of the object writer. -type ObjectTarget interface { +// ChunkedObjectWriter is an interface of the object writer +// that writes object chunked. +type ChunkedObjectWriter interface { // WriteHeader writes object header w/ payload part. // The payload of the object may be incomplete. // @@ -51,5 +52,11 @@ type ObjectTarget interface { Close(context.Context) (*AccessIdentifiers, error) } -// TargetInitializer represents ObjectTarget constructor. -type TargetInitializer func() ObjectTarget +// TargetInitializer represents ObjectWriter constructor. +type TargetInitializer func() ObjectWriter + +// ObjectWriter is an interface of the object writer that writes prepared object. +type ObjectWriter interface { + // WriteObject writes prepared object. + WriteObject(context.Context, *object.Object) error +}