From cc0fef2c55e2ee2e79dc5cde8e737c54f0ebcb3a Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Sat, 18 Feb 2023 10:43:34 +0300 Subject: [PATCH] [#19] transformer: Merge formatter and payload splitter Signed-off-by: Evgenii Stratonikov --- object/transformer/fmt.go | 112 ------------------------- object/transformer/transformer.go | 95 +++++++++++++++------ object/transformer/transformer_test.go | 12 +-- 3 files changed, 76 insertions(+), 143 deletions(-) diff --git a/object/transformer/fmt.go b/object/transformer/fmt.go index 31357d7..193b8f4 100644 --- a/object/transformer/fmt.go +++ b/object/transformer/fmt.go @@ -1,117 +1,5 @@ package transformer -import ( - "crypto/ecdsa" - "fmt" - - "github.com/TrueCloudLab/frostfs-sdk-go/object" - oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id" - "github.com/TrueCloudLab/frostfs-sdk-go/session" - "github.com/TrueCloudLab/frostfs-sdk-go/version" -) - -type formatter struct { - prm *FormatterParams - - obj *object.Object - - sz uint64 -} - type EpochSource interface { CurrentEpoch() uint64 } - -// FormatterParams groups NewFormatTarget parameters. -type FormatterParams struct { - Key *ecdsa.PrivateKey - - NextTarget ObjectTarget - - SessionToken *session.Object - - NetworkState EpochSource -} - -// NewFormatTarget returns ObjectTarget instance that finalizes object structure -// and writes it to the next target. -// -// Chunks must be written before the WriteHeader call. -// -// Object changes: -// - sets version to current SDK version; -// - sets payload size to the total length of all written chunks; -// - sets session token; -// - sets number of creation epoch; -// - calculates and sets verification fields (ID, Signature). -func NewFormatTarget(p *FormatterParams) ObjectTarget { - return &formatter{ - prm: p, - } -} - -func (f *formatter) WriteHeader(obj *object.Object) error { - f.obj = obj - - return nil -} - -func (f *formatter) Write(p []byte) (n int, err error) { - n, err = f.prm.NextTarget.Write(p) - - f.sz += uint64(n) - - return -} - -func (f *formatter) Close() (*AccessIdentifiers, error) { - curEpoch := f.prm.NetworkState.CurrentEpoch() - ver := version.Current() - - f.obj.SetVersion(&ver) - f.obj.SetPayloadSize(f.sz) - f.obj.SetSessionToken(f.prm.SessionToken) - f.obj.SetCreationEpoch(curEpoch) - - var ( - parID *oid.ID - parHdr *object.Object - ) - - if par := f.obj.Parent(); par != nil && par.Signature() == nil { - rawPar := object.NewFromV2(par.ToV2()) - - rawPar.SetSessionToken(f.prm.SessionToken) - rawPar.SetCreationEpoch(curEpoch) - - if err := object.SetIDWithSignature(*f.prm.Key, rawPar); err != nil { - return nil, fmt.Errorf("could not finalize parent object: %w", err) - } - - id, _ := rawPar.ID() - parID = &id - parHdr = rawPar - - f.obj.SetParent(parHdr) - } - - if err := object.SetIDWithSignature(*f.prm.Key, f.obj); err != nil { - return nil, fmt.Errorf("could not finalize object: %w", err) - } - - if err := f.prm.NextTarget.WriteHeader(f.obj); err != nil { - return nil, fmt.Errorf("could not write header to next target: %w", err) - } - - if _, err := f.prm.NextTarget.Close(); err != nil { - return nil, fmt.Errorf("could not close next target: %w", err) - } - - id, _ := f.obj.ID() - - return &AccessIdentifiers{ - ParentID: parID, - SelfID: id, - ParentHeader: parHdr, - }, nil -} diff --git a/object/transformer/transformer.go b/object/transformer/transformer.go index 8c819fa..55ad9ff 100644 --- a/object/transformer/transformer.go +++ b/object/transformer/transformer.go @@ -1,6 +1,7 @@ package transformer import ( + "crypto/ecdsa" "crypto/sha256" "fmt" "hash" @@ -9,17 +10,15 @@ import ( "github.com/TrueCloudLab/frostfs-sdk-go/checksum" "github.com/TrueCloudLab/frostfs-sdk-go/object" oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/TrueCloudLab/frostfs-sdk-go/session" + "github.com/TrueCloudLab/frostfs-sdk-go/version" "github.com/TrueCloudLab/tzhash/tz" ) type payloadSizeLimiter struct { - maxSize, written uint64 + Params - withoutHomomorphicHash bool - - targetInit func() ObjectTarget - - target ObjectTarget + written, writtenCurrent uint64 current, parent *object.Object @@ -40,6 +39,15 @@ type payloadChecksumHasher struct { checksumWriter func([]byte) } +type Params struct { + Key *ecdsa.PrivateKey + NextTarget ObjectTarget + SessionToken *session.Object + NetworkState EpochSource + MaxSize uint64 + WithoutHomomorphicHash bool +} + // NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length // of the writing object and writes generated objects to targets from initializer. // @@ -47,12 +55,10 @@ type payloadChecksumHasher struct { // is false. // // Objects w/ payload size less or equal than max size remain untouched. -func NewPayloadSizeLimiter(maxSize uint64, withoutHomomorphicHash bool, targetInit TargetInitializer) ObjectTarget { +func NewPayloadSizeLimiter(p Params) ObjectTarget { return &payloadSizeLimiter{ - maxSize: maxSize, - withoutHomomorphicHash: withoutHomomorphicHash, - targetInit: targetInit, - splitID: object.NewSplitID(), + Params: p, + splitID: object.NewSplitID(), } } @@ -110,16 +116,14 @@ func fromObject(obj *object.Object) *object.Object { } func (s *payloadSizeLimiter) initializeCurrent() { - // initialize current object target - s.target = s.targetInit() - // create payload hashers - s.currentHashers = payloadHashersForObject(s.current, s.withoutHomomorphicHash) + s.writtenCurrent = 0 + s.currentHashers = payloadHashersForObject(s.current, s.WithoutHomomorphicHash) // compose multi-writer from target and all payload hashers ws := make([]io.Writer, 0, 1+len(s.currentHashers)+len(s.parentHashers)) - ws = append(ws, s.target) + ws = append(ws, s.NextTarget) for i := range s.currentHashers { ws = append(ws, s.currentHashers[i].hasher) @@ -189,14 +193,54 @@ func (s *payloadSizeLimiter) release(finalize bool) (*AccessIdentifiers, error) // release current object writeHashes(s.currentHashers) - // release current, get its id - if err := s.target.WriteHeader(s.current); err != nil { - return nil, fmt.Errorf("could not write header: %w", err) + curEpoch := s.NetworkState.CurrentEpoch() + ver := version.Current() + + s.current.SetVersion(&ver) + s.current.SetPayloadSize(s.writtenCurrent) + s.current.SetSessionToken(s.SessionToken) + s.current.SetCreationEpoch(curEpoch) + + var ( + parID *oid.ID + parHdr *object.Object + ) + + if par := s.current.Parent(); par != nil && par.Signature() == nil { + rawPar := object.NewFromV2(par.ToV2()) + + rawPar.SetSessionToken(s.SessionToken) + rawPar.SetCreationEpoch(curEpoch) + + if err := object.SetIDWithSignature(*s.Key, rawPar); err != nil { + return nil, fmt.Errorf("could not finalize parent object: %w", err) + } + + id, _ := rawPar.ID() + parID = &id + parHdr = rawPar + + s.current.SetParent(parHdr) } - ids, err := s.target.Close() - if err != nil { - return nil, fmt.Errorf("could not close target: %w", err) + if err := object.SetIDWithSignature(*s.Key, s.current); err != nil { + return nil, fmt.Errorf("could not finalize object: %w", err) + } + + if err := s.NextTarget.WriteHeader(s.current); err != nil { + return nil, fmt.Errorf("could not write header to next target: %w", err) + } + + if _, err := s.NextTarget.Close(); err != nil { + return nil, fmt.Errorf("could not close next target: %w", err) + } + + id, _ := s.current.ID() + + ids := &AccessIdentifiers{ + ParentID: parID, + SelfID: id, + ParentHeader: parHdr, } // save identifier of the released object @@ -231,8 +275,8 @@ func (s *payloadSizeLimiter) initializeLinking(parHdr *object.Object) { func (s *payloadSizeLimiter) writeChunk(chunk []byte) error { for { // statement is true if the previous write of bytes reached exactly the boundary. - if s.written > 0 && s.written%s.maxSize == 0 { - if s.written == s.maxSize { + if s.written > 0 && s.written%s.MaxSize == 0 { + if s.written == s.MaxSize { s.prepareFirstChild() } @@ -248,7 +292,7 @@ func (s *payloadSizeLimiter) writeChunk(chunk []byte) error { var ( ln = uint64(len(chunk)) cut = ln - leftToEdge = s.maxSize - s.written%s.maxSize + leftToEdge = s.MaxSize - s.written%s.MaxSize ) // write bytes no further than the boundary of the current object @@ -261,6 +305,7 @@ func (s *payloadSizeLimiter) writeChunk(chunk []byte) error { } // increase written bytes counter + s.writtenCurrent += cut s.written += cut if cut == ln { diff --git a/object/transformer/transformer_test.go b/object/transformer/transformer_test.go index 8fcd680..545945d 100644 --- a/object/transformer/transformer_test.go +++ b/object/transformer/transformer_test.go @@ -105,12 +105,12 @@ func newPayloadSizeLimiter(maxSize uint64, nextTarget ObjectTarget) (ObjectTarge panic(err) } - return NewPayloadSizeLimiter(maxSize, true, func() ObjectTarget { - return NewFormatTarget(&FormatterParams{ - Key: &p.PrivateKey, - NextTarget: nextTarget, - NetworkState: dummyEpochSource(123), - }) + return NewPayloadSizeLimiter(Params{ + Key: &p.PrivateKey, + NextTarget: nextTarget, + NetworkState: dummyEpochSource(123), + MaxSize: maxSize, + WithoutHomomorphicHash: true, }), p }