From b2ffd7df537ac6ff85c71a017da07781eefd5192 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 27 Apr 2023 18:46:42 +0300 Subject: [PATCH] [#291] object: Use PayloadSizeLimiter from SDK Signed-off-by: Evgenii Stratonikov --- go.mod | 2 +- go.sum | Bin 95389 -> 95389 bytes pkg/services/object/put/distributed.go | 7 +- pkg/services/object/put/local.go | 7 +- pkg/services/object/put/remote.go | 5 +- pkg/services/object/put/streamer.go | 26 +- pkg/services/object/put/validation.go | 2 +- .../object_manager/transformer/fmt.go | 114 ------- .../object_manager/transformer/transformer.go | 294 ------------------ .../object_manager/transformer/types.go | 111 ------- .../object_manager/transformer/writer.go | 52 ---- 11 files changed, 23 insertions(+), 597 deletions(-) delete mode 100644 pkg/services/object_manager/transformer/fmt.go delete mode 100644 pkg/services/object_manager/transformer/transformer.go delete mode 100644 pkg/services/object_manager/transformer/types.go delete mode 100644 pkg/services/object_manager/transformer/writer.go diff --git a/go.mod b/go.mod index 2f0244c6e..ae0bc88c1 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.18 require ( git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230418080822-bd44a3f47b85 git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb - git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230418145405-db5b89496d68 + git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230503082209-d4fe9a193d1a git.frostfs.info/TrueCloudLab/hrw v1.2.0 git.frostfs.info/TrueCloudLab/tzhash v1.8.0 github.com/cheggaaa/pb v1.0.29 diff --git a/go.sum b/go.sum index 3e5bcd57042f107b565b9a56f1c9bcf5e910b39f..633ac9a1838b2911ff66612c1ab9a3d6e1694380 100644 GIT binary patch delta 119 zcmbRHl6CG&)(uJmuBHaY1{Ov}29~-hCTXdbiH4TODTav(8HQG#7RkO*p(Pn^AujGo zfu_!p1tA%cuGy6ZUb$&L{!Wo$#m)w~sUSh9;&a2Bx|xNv25_mL`^FDP|T58HQH5mL5Ju 0 { - // initialize parent object once (after 1st object) - if ln == 1 { - s.detachParent() - } - - // set previous object to the last previous identifier - s.current.SetPreviousID(s.previous[ln-1]) - } - - s.initializeCurrent() -} - -func fromObject(obj *object.Object) *object.Object { - cnr, _ := obj.ContainerID() - - res := object.New() - res.SetContainerID(cnr) - res.SetOwnerID(obj.OwnerID()) - res.SetAttributes(obj.Attributes()...) - res.SetType(obj.Type()) - - // obj.SetSplitID creates splitHeader but we don't need to do it in case - // of small objects, so we should make nil check. - if obj.SplitID() != nil { - res.SetSplitID(obj.SplitID()) - } - - return res -} - -func (s *payloadSizeLimiter) initializeCurrent() { - // initialize current object target - s.target = s.targetInit() - - // create payload hashers - s.currentHashers = payloadHashersForObject(s.current, s.withoutHomomorphicHash) - - // compose multi-writer from target and all payload hashers - ws := make([]writer, 0, 1+len(s.currentHashers)+len(s.parentHashers)) - - ws = append(ws, s.target) - - for i := range s.currentHashers { - ws = append(ws, newWriter(s.currentHashers[i].hasher)) - } - - for i := range s.parentHashers { - ws = append(ws, newWriter(s.parentHashers[i].hasher)) - } - - s.chunkWriter = newMultiWriter(ws...) -} - -func payloadHashersForObject(obj *object.Object, withoutHomomorphicHash bool) []*payloadChecksumHasher { - hashers := make([]*payloadChecksumHasher, 0, 2) - - hashers = append(hashers, &payloadChecksumHasher{ - hasher: sha256.New(), - checksumWriter: func(binChecksum []byte) { - if ln := len(binChecksum); ln != sha256.Size { - panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", sha256.Size, ln)) - } - - csSHA := [sha256.Size]byte{} - copy(csSHA[:], binChecksum) - - var cs checksum.Checksum - cs.SetSHA256(csSHA) - - obj.SetPayloadChecksum(cs) - }, - }) - - if !withoutHomomorphicHash { - hashers = append(hashers, &payloadChecksumHasher{ - hasher: tz.New(), - checksumWriter: func(binChecksum []byte) { - if ln := len(binChecksum); ln != tz.Size { - panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", tz.Size, ln)) - } - - csTZ := [tz.Size]byte{} - copy(csTZ[:], binChecksum) - - var cs checksum.Checksum - cs.SetTillichZemor(csTZ) - - obj.SetPayloadHomomorphicHash(cs) - }, - }) - } - - return hashers -} - -func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*AccessIdentifiers, error) { - // Arg finalize is true only when called from Close method. - // We finalize parent and generate linking objects only if it is more - // than 1 object in split-chain. - withParent := finalize && len(s.previous) > 0 - - if withParent { - writeHashes(s.parentHashers) - s.parent.SetPayloadSize(s.written) - s.current.SetParent(s.parent) - } - - // release current object - writeHashes(s.currentHashers) - - // release current, get its id - if err := s.target.WriteHeader(ctx, s.current); err != nil { - return nil, fmt.Errorf("could not write header: %w", err) - } - - ids, err := s.target.Close(ctx) - if err != nil { - return nil, fmt.Errorf("could not close target: %w", err) - } - - // save identifier of the released object - s.previous = append(s.previous, ids.SelfID()) - - if withParent { - // generate and release linking object - s.initializeLinking(ids.Parent()) - s.initializeCurrent() - - if _, err := s.release(ctx, false); err != nil { - return nil, fmt.Errorf("could not release linking object: %w", err) - } - } - - return ids, nil -} - -func writeHashes(hashers []*payloadChecksumHasher) { - for i := range hashers { - hashers[i].checksumWriter(hashers[i].hasher.Sum(nil)) - } -} - -func (s *payloadSizeLimiter) initializeLinking(parHdr *object.Object) { - s.current = fromObject(s.current) - s.current.SetParent(parHdr) - s.current.SetChildren(s.previous...) - s.current.SetSplitID(s.splitID) -} - -func (s *payloadSizeLimiter) writeChunk(ctx context.Context, chunk []byte) error { - // statement is true if the previous write of bytes reached exactly the boundary. - if s.written > 0 && s.written%s.maxSize == 0 { - if s.written == s.maxSize { - s.prepareFirstChild() - } - - // we need to release current object - if _, err := s.release(ctx, false); err != nil { - return fmt.Errorf("could not release object: %w", err) - } - - // initialize another object - s.initialize() - } - - var ( - ln = uint64(len(chunk)) - cut = ln - leftToEdge = s.maxSize - s.written%s.maxSize - ) - - // write bytes no further than the boundary of the current object - if ln > leftToEdge { - cut = leftToEdge - } - - if _, err := s.chunkWriter.Write(ctx, chunk[:cut]); err != nil { - return fmt.Errorf("could not write chunk to target: %w", err) - } - - // increase written bytes counter - s.written += cut - - // if there are more bytes in buffer we call method again to start filling another object - if ln > leftToEdge { - return s.writeChunk(ctx, chunk[cut:]) - } - - return nil -} - -func (s *payloadSizeLimiter) prepareFirstChild() { - // initialize split header with split ID on first object in chain - s.current.InitRelations() - s.current.SetSplitID(s.splitID) - - // cut source attributes - s.parAttrs = s.current.Attributes() - s.current.SetAttributes() - - // attributes will be added to parent in detachParent -} - -func (s *payloadSizeLimiter) detachParent() { - s.parent = s.current - s.current = fromObject(s.parent) - s.parent.ResetRelations() - s.parent.SetSignature(nil) - s.parentHashers = s.currentHashers - - // return source attributes - s.parent.SetAttributes(s.parAttrs...) -} diff --git a/pkg/services/object_manager/transformer/types.go b/pkg/services/object_manager/transformer/types.go deleted file mode 100644 index 73cea5216..000000000 --- a/pkg/services/object_manager/transformer/types.go +++ /dev/null @@ -1,111 +0,0 @@ -package transformer - -import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" -) - -// AccessIdentifiers represents group of the object identifiers -// that are returned after writing the object. -// Consists of the ID of the stored object and the ID of the parent object. -type AccessIdentifiers struct { - par *oid.ID - - self oid.ID - - parHdr *object.Object -} - -// ObjectTarget is an interface of the object writer. -type ObjectTarget interface { - // WriteHeader writes object header w/ payload part. - // The payload of the object may be incomplete. - // - // Must be called exactly once. Control remains with the caller. - // Missing a call or re-calling can lead to undefined behavior - // that depends on the implementation. - // - // Must not be called after Close call. - WriteHeader(context.Context, *object.Object) error - - // Write writes object payload chunk. - // - // Can be called multiple times. - // - // Must not be called after Close call. - Write(ctx context.Context, p []byte) (n int, err error) - - // Close is used to finish object writing. - // - // Close must return access identifiers of the object - // that has been written. - // - // Must be called no more than once. Control remains with the caller. - // Re-calling can lead to undefined behavior - // that depends on the implementation. - Close(ctx context.Context) (*AccessIdentifiers, error) -} - -// TargetInitializer represents ObjectTarget constructor. -type TargetInitializer func() ObjectTarget - -// SelfID returns identifier of the written object. -func (a AccessIdentifiers) SelfID() oid.ID { - return a.self -} - -// WithSelfID returns AccessIdentifiers with passed self identifier. -func (a *AccessIdentifiers) WithSelfID(v oid.ID) *AccessIdentifiers { - res := a - if res == nil { - res = new(AccessIdentifiers) - } - - res.self = v - - return res -} - -// ParentID return identifier of the parent of the written object. -func (a *AccessIdentifiers) ParentID() *oid.ID { - if a != nil { - return a.par - } - - return nil -} - -// WithParentID returns AccessIdentifiers with passed parent identifier. -func (a *AccessIdentifiers) WithParentID(v *oid.ID) *AccessIdentifiers { - res := a - if res == nil { - res = new(AccessIdentifiers) - } - - res.par = v - - return res -} - -// Parent return identifier of the parent of the written object. -func (a *AccessIdentifiers) Parent() *object.Object { - if a != nil { - return a.parHdr - } - - return nil -} - -// WithParent returns AccessIdentifiers with passed parent identifier. -func (a *AccessIdentifiers) WithParent(v *object.Object) *AccessIdentifiers { - res := a - if res == nil { - res = new(AccessIdentifiers) - } - - res.parHdr = v - - return res -} diff --git a/pkg/services/object_manager/transformer/writer.go b/pkg/services/object_manager/transformer/writer.go deleted file mode 100644 index 27aed16ff..000000000 --- a/pkg/services/object_manager/transformer/writer.go +++ /dev/null @@ -1,52 +0,0 @@ -package transformer - -import ( - "context" - "io" -) - -type writer interface { - Write(ctx context.Context, p []byte) (n int, err error) -} - -type multiWriter struct { - writers []writer -} - -func (t *multiWriter) Write(ctx context.Context, p []byte) (n int, err error) { - for _, w := range t.writers { - n, err = w.Write(ctx, p) - if err != nil { - return - } - if n != len(p) { - err = io.ErrShortWrite - return - } - } - return len(p), nil -} - -func newMultiWriter(writers ...writer) writer { - allWriters := make([]writer, 0, len(writers)) - for _, w := range writers { - if mw, ok := w.(*multiWriter); ok { - allWriters = append(allWriters, mw.writers...) - } else { - allWriters = append(allWriters, w) - } - } - return &multiWriter{allWriters} -} - -type writerWrapper struct { - Writer io.Writer -} - -func (w *writerWrapper) Write(_ context.Context, p []byte) (n int, err error) { - return w.Writer.Write(p) -} - -func newWriter(w io.Writer) writer { - return &writerWrapper{Writer: w} -}