diff --git a/go.mod b/go.mod index 2f0244c6e..ae0bc88c1 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.18 require ( git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230418080822-bd44a3f47b85 git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb - git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230418145405-db5b89496d68 + git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230503082209-d4fe9a193d1a git.frostfs.info/TrueCloudLab/hrw v1.2.0 git.frostfs.info/TrueCloudLab/tzhash v1.8.0 github.com/cheggaaa/pb v1.0.29 diff --git a/go.sum b/go.sum index 3e5bcd570..633ac9a18 100644 Binary files a/go.sum and b/go.sum differ diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 4b2056802..b24218621 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -10,10 +10,10 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "go.uber.org/zap" ) @@ -206,8 +206,9 @@ func (t *distributedTarget) iteratePlacement(ctx context.Context) (*transformer. id, _ = t.obj.ID() - return new(transformer.AccessIdentifiers). - WithSelfID(id), nil + return &transformer.AccessIdentifiers{ + SelfID: id, + }, nil } func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, resErr *atomic.Value) bool { diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go index 7aef9f065..f07122729 100644 --- a/pkg/services/object/put/local.go +++ b/pkg/services/object/put/local.go @@ -5,9 +5,9 @@ import ( "fmt" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" ) // ObjectStorage is an object storage interface. @@ -61,6 +61,7 @@ func (t *localTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers id, _ := t.obj.ID() - return new(transformer.AccessIdentifiers). - WithSelfID(id), nil + return &transformer.AccessIdentifiers{ + SelfID: id, + }, nil } diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index e7fa124fa..bcc566b74 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -10,9 +10,9 @@ import ( objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" ) type remoteTarget struct { @@ -68,8 +68,7 @@ func (t *remoteTarget) Close(ctx context.Context) (*transformer.AccessIdentifier return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err) } - return new(transformer.AccessIdentifiers). - WithSelfID(res.ID()), nil + return &transformer.AccessIdentifiers{SelfID: res.ID()}, nil } // NewRemoteSender creates, initializes and returns new RemoteSender instance. diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index f1ecd4df1..6d0d8062e 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -10,9 +10,9 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" ) @@ -126,18 +126,14 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error { p.sessionKey = sessionKey p.target = &validatingTarget{ fmt: p.fmtValidator, - nextTarget: transformer.NewPayloadSizeLimiter( - p.maxPayloadSz, - containerSDK.IsHomomorphicHashingDisabled(prm.cnr), - func() transformer.ObjectTarget { - return transformer.NewFormatTarget(&transformer.FormatterParams{ - Key: sessionKey, - NextTarget: p.newCommonTarget(prm), - SessionToken: sToken, - NetworkState: p.networkState, - }) - }, - ), + nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ + Key: sessionKey, + NextTargetInit: func() transformer.ObjectTarget { return p.newCommonTarget(prm) }, + NetworkState: p.networkState, + MaxSize: p.maxPayloadSz, + WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr), + SessionToken: sToken, + }), } return nil @@ -273,7 +269,7 @@ func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) { return nil, fmt.Errorf("(%T) could not close object target: %w", p, err) } - id := ids.ParentID() + id := ids.ParentID if id != nil { return &PutResponse{ id: *id, @@ -281,6 +277,6 @@ func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) { } return &PutResponse{ - id: ids.SelfID(), + id: ids.SelfID, }, nil } diff --git a/pkg/services/object/put/validation.go b/pkg/services/object/put/validation.go index a4790071a..406304422 100644 --- a/pkg/services/object/put/validation.go +++ b/pkg/services/object/put/validation.go @@ -9,9 +9,9 @@ import ( "hash" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/tzhash/tz" ) diff --git a/pkg/services/object_manager/transformer/fmt.go b/pkg/services/object_manager/transformer/fmt.go deleted file mode 100644 index fbe8af2fb..000000000 --- a/pkg/services/object_manager/transformer/fmt.go +++ /dev/null @@ -1,114 +0,0 @@ -package transformer - -import ( - "context" - "crypto/ecdsa" - "fmt" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" -) - -type formatter struct { - prm *FormatterParams - - obj *object.Object - - sz uint64 -} - -// FormatterParams groups NewFormatTarget parameters. -type FormatterParams struct { - Key *ecdsa.PrivateKey - - NextTarget ObjectTarget - - SessionToken *session.Object - - NetworkState netmap.State -} - -// NewFormatTarget returns ObjectTarget instance that finalizes object structure -// and writes it to the next target. -// -// Chunks must be written before the WriteHeader call. -// -// Object changes: -// - sets version to current SDK version; -// - sets payload size to the total length of all written chunks; -// - sets session token; -// - sets number of creation epoch; -// - calculates and sets verification fields (ID, Signature). -func NewFormatTarget(p *FormatterParams) ObjectTarget { - return &formatter{ - prm: p, - } -} - -func (f *formatter) WriteHeader(_ context.Context, obj *object.Object) error { - f.obj = obj - - return nil -} - -func (f *formatter) Write(ctx context.Context, p []byte) (n int, err error) { - n, err = f.prm.NextTarget.Write(ctx, p) - - f.sz += uint64(n) - - return -} - -func (f *formatter) Close(ctx context.Context) (*AccessIdentifiers, error) { - curEpoch := f.prm.NetworkState.CurrentEpoch() - ver := version.Current() - - f.obj.SetVersion(&ver) - f.obj.SetPayloadSize(f.sz) - f.obj.SetSessionToken(f.prm.SessionToken) - f.obj.SetCreationEpoch(curEpoch) - - var ( - parID *oid.ID - parHdr *object.Object - ) - - if par := f.obj.Parent(); par != nil && par.Signature() == nil { - rawPar := object.NewFromV2(par.ToV2()) - - rawPar.SetSessionToken(f.prm.SessionToken) - rawPar.SetCreationEpoch(curEpoch) - - if err := object.SetIDWithSignature(*f.prm.Key, rawPar); err != nil { - return nil, fmt.Errorf("could not finalize parent object: %w", err) - } - - id, _ := rawPar.ID() - parID = &id - parHdr = rawPar - - f.obj.SetParent(parHdr) - } - - if err := object.SetIDWithSignature(*f.prm.Key, f.obj); err != nil { - return nil, fmt.Errorf("could not finalize object: %w", err) - } - - if err := f.prm.NextTarget.WriteHeader(ctx, f.obj); err != nil { - return nil, fmt.Errorf("could not write header to next target: %w", err) - } - - if _, err := f.prm.NextTarget.Close(ctx); err != nil { - return nil, fmt.Errorf("could not close next target: %w", err) - } - - id, _ := f.obj.ID() - - return new(AccessIdentifiers). - WithSelfID(id). - WithParentID(parID). - WithParent(parHdr), nil -} diff --git a/pkg/services/object_manager/transformer/transformer.go b/pkg/services/object_manager/transformer/transformer.go deleted file mode 100644 index c23b4dca7..000000000 --- a/pkg/services/object_manager/transformer/transformer.go +++ /dev/null @@ -1,294 +0,0 @@ -package transformer - -import ( - "context" - "crypto/sha256" - "fmt" - "hash" - - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "git.frostfs.info/TrueCloudLab/tzhash/tz" -) - -type payloadSizeLimiter struct { - maxSize, written uint64 - - withoutHomomorphicHash bool - - targetInit func() ObjectTarget - - target ObjectTarget - - current, parent *object.Object - - currentHashers, parentHashers []*payloadChecksumHasher - - previous []oid.ID - - chunkWriter writer - - splitID *object.SplitID - - parAttrs []object.Attribute -} - -type payloadChecksumHasher struct { - hasher hash.Hash - - checksumWriter func([]byte) -} - -// NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length -// of the writing object and writes generated objects to targets from initializer. -// -// Calculates and adds homomorphic hash to resulting objects only if withoutHomomorphicHash -// is false. -// -// Objects w/ payload size less or equal than max size remain untouched. -func NewPayloadSizeLimiter(maxSize uint64, withoutHomomorphicHash bool, targetInit TargetInitializer) ObjectTarget { - return &payloadSizeLimiter{ - maxSize: maxSize, - withoutHomomorphicHash: withoutHomomorphicHash, - targetInit: targetInit, - splitID: object.NewSplitID(), - } -} - -func (s *payloadSizeLimiter) WriteHeader(_ context.Context, hdr *object.Object) error { - s.current = fromObject(hdr) - - s.initialize() - - return nil -} - -func (s *payloadSizeLimiter) Write(ctx context.Context, p []byte) (int, error) { - if err := s.writeChunk(ctx, p); err != nil { - return 0, err - } - - return len(p), nil -} - -func (s *payloadSizeLimiter) Close(ctx context.Context) (*AccessIdentifiers, error) { - return s.release(ctx, true) -} - -func (s *payloadSizeLimiter) initialize() { - // if it is an object after the 1st - if ln := len(s.previous); ln > 0 { - // initialize parent object once (after 1st object) - if ln == 1 { - s.detachParent() - } - - // set previous object to the last previous identifier - s.current.SetPreviousID(s.previous[ln-1]) - } - - s.initializeCurrent() -} - -func fromObject(obj *object.Object) *object.Object { - cnr, _ := obj.ContainerID() - - res := object.New() - res.SetContainerID(cnr) - res.SetOwnerID(obj.OwnerID()) - res.SetAttributes(obj.Attributes()...) - res.SetType(obj.Type()) - - // obj.SetSplitID creates splitHeader but we don't need to do it in case - // of small objects, so we should make nil check. - if obj.SplitID() != nil { - res.SetSplitID(obj.SplitID()) - } - - return res -} - -func (s *payloadSizeLimiter) initializeCurrent() { - // initialize current object target - s.target = s.targetInit() - - // create payload hashers - s.currentHashers = payloadHashersForObject(s.current, s.withoutHomomorphicHash) - - // compose multi-writer from target and all payload hashers - ws := make([]writer, 0, 1+len(s.currentHashers)+len(s.parentHashers)) - - ws = append(ws, s.target) - - for i := range s.currentHashers { - ws = append(ws, newWriter(s.currentHashers[i].hasher)) - } - - for i := range s.parentHashers { - ws = append(ws, newWriter(s.parentHashers[i].hasher)) - } - - s.chunkWriter = newMultiWriter(ws...) -} - -func payloadHashersForObject(obj *object.Object, withoutHomomorphicHash bool) []*payloadChecksumHasher { - hashers := make([]*payloadChecksumHasher, 0, 2) - - hashers = append(hashers, &payloadChecksumHasher{ - hasher: sha256.New(), - checksumWriter: func(binChecksum []byte) { - if ln := len(binChecksum); ln != sha256.Size { - panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", sha256.Size, ln)) - } - - csSHA := [sha256.Size]byte{} - copy(csSHA[:], binChecksum) - - var cs checksum.Checksum - cs.SetSHA256(csSHA) - - obj.SetPayloadChecksum(cs) - }, - }) - - if !withoutHomomorphicHash { - hashers = append(hashers, &payloadChecksumHasher{ - hasher: tz.New(), - checksumWriter: func(binChecksum []byte) { - if ln := len(binChecksum); ln != tz.Size { - panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", tz.Size, ln)) - } - - csTZ := [tz.Size]byte{} - copy(csTZ[:], binChecksum) - - var cs checksum.Checksum - cs.SetTillichZemor(csTZ) - - obj.SetPayloadHomomorphicHash(cs) - }, - }) - } - - return hashers -} - -func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*AccessIdentifiers, error) { - // Arg finalize is true only when called from Close method. - // We finalize parent and generate linking objects only if it is more - // than 1 object in split-chain. - withParent := finalize && len(s.previous) > 0 - - if withParent { - writeHashes(s.parentHashers) - s.parent.SetPayloadSize(s.written) - s.current.SetParent(s.parent) - } - - // release current object - writeHashes(s.currentHashers) - - // release current, get its id - if err := s.target.WriteHeader(ctx, s.current); err != nil { - return nil, fmt.Errorf("could not write header: %w", err) - } - - ids, err := s.target.Close(ctx) - if err != nil { - return nil, fmt.Errorf("could not close target: %w", err) - } - - // save identifier of the released object - s.previous = append(s.previous, ids.SelfID()) - - if withParent { - // generate and release linking object - s.initializeLinking(ids.Parent()) - s.initializeCurrent() - - if _, err := s.release(ctx, false); err != nil { - return nil, fmt.Errorf("could not release linking object: %w", err) - } - } - - return ids, nil -} - -func writeHashes(hashers []*payloadChecksumHasher) { - for i := range hashers { - hashers[i].checksumWriter(hashers[i].hasher.Sum(nil)) - } -} - -func (s *payloadSizeLimiter) initializeLinking(parHdr *object.Object) { - s.current = fromObject(s.current) - s.current.SetParent(parHdr) - s.current.SetChildren(s.previous...) - s.current.SetSplitID(s.splitID) -} - -func (s *payloadSizeLimiter) writeChunk(ctx context.Context, chunk []byte) error { - // statement is true if the previous write of bytes reached exactly the boundary. - if s.written > 0 && s.written%s.maxSize == 0 { - if s.written == s.maxSize { - s.prepareFirstChild() - } - - // we need to release current object - if _, err := s.release(ctx, false); err != nil { - return fmt.Errorf("could not release object: %w", err) - } - - // initialize another object - s.initialize() - } - - var ( - ln = uint64(len(chunk)) - cut = ln - leftToEdge = s.maxSize - s.written%s.maxSize - ) - - // write bytes no further than the boundary of the current object - if ln > leftToEdge { - cut = leftToEdge - } - - if _, err := s.chunkWriter.Write(ctx, chunk[:cut]); err != nil { - return fmt.Errorf("could not write chunk to target: %w", err) - } - - // increase written bytes counter - s.written += cut - - // if there are more bytes in buffer we call method again to start filling another object - if ln > leftToEdge { - return s.writeChunk(ctx, chunk[cut:]) - } - - return nil -} - -func (s *payloadSizeLimiter) prepareFirstChild() { - // initialize split header with split ID on first object in chain - s.current.InitRelations() - s.current.SetSplitID(s.splitID) - - // cut source attributes - s.parAttrs = s.current.Attributes() - s.current.SetAttributes() - - // attributes will be added to parent in detachParent -} - -func (s *payloadSizeLimiter) detachParent() { - s.parent = s.current - s.current = fromObject(s.parent) - s.parent.ResetRelations() - s.parent.SetSignature(nil) - s.parentHashers = s.currentHashers - - // return source attributes - s.parent.SetAttributes(s.parAttrs...) -} diff --git a/pkg/services/object_manager/transformer/types.go b/pkg/services/object_manager/transformer/types.go deleted file mode 100644 index 73cea5216..000000000 --- a/pkg/services/object_manager/transformer/types.go +++ /dev/null @@ -1,111 +0,0 @@ -package transformer - -import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" -) - -// AccessIdentifiers represents group of the object identifiers -// that are returned after writing the object. -// Consists of the ID of the stored object and the ID of the parent object. -type AccessIdentifiers struct { - par *oid.ID - - self oid.ID - - parHdr *object.Object -} - -// ObjectTarget is an interface of the object writer. -type ObjectTarget interface { - // WriteHeader writes object header w/ payload part. - // The payload of the object may be incomplete. - // - // Must be called exactly once. Control remains with the caller. - // Missing a call or re-calling can lead to undefined behavior - // that depends on the implementation. - // - // Must not be called after Close call. - WriteHeader(context.Context, *object.Object) error - - // Write writes object payload chunk. - // - // Can be called multiple times. - // - // Must not be called after Close call. - Write(ctx context.Context, p []byte) (n int, err error) - - // Close is used to finish object writing. - // - // Close must return access identifiers of the object - // that has been written. - // - // Must be called no more than once. Control remains with the caller. - // Re-calling can lead to undefined behavior - // that depends on the implementation. - Close(ctx context.Context) (*AccessIdentifiers, error) -} - -// TargetInitializer represents ObjectTarget constructor. -type TargetInitializer func() ObjectTarget - -// SelfID returns identifier of the written object. -func (a AccessIdentifiers) SelfID() oid.ID { - return a.self -} - -// WithSelfID returns AccessIdentifiers with passed self identifier. -func (a *AccessIdentifiers) WithSelfID(v oid.ID) *AccessIdentifiers { - res := a - if res == nil { - res = new(AccessIdentifiers) - } - - res.self = v - - return res -} - -// ParentID return identifier of the parent of the written object. -func (a *AccessIdentifiers) ParentID() *oid.ID { - if a != nil { - return a.par - } - - return nil -} - -// WithParentID returns AccessIdentifiers with passed parent identifier. -func (a *AccessIdentifiers) WithParentID(v *oid.ID) *AccessIdentifiers { - res := a - if res == nil { - res = new(AccessIdentifiers) - } - - res.par = v - - return res -} - -// Parent return identifier of the parent of the written object. -func (a *AccessIdentifiers) Parent() *object.Object { - if a != nil { - return a.parHdr - } - - return nil -} - -// WithParent returns AccessIdentifiers with passed parent identifier. -func (a *AccessIdentifiers) WithParent(v *object.Object) *AccessIdentifiers { - res := a - if res == nil { - res = new(AccessIdentifiers) - } - - res.parHdr = v - - return res -} diff --git a/pkg/services/object_manager/transformer/writer.go b/pkg/services/object_manager/transformer/writer.go deleted file mode 100644 index 27aed16ff..000000000 --- a/pkg/services/object_manager/transformer/writer.go +++ /dev/null @@ -1,52 +0,0 @@ -package transformer - -import ( - "context" - "io" -) - -type writer interface { - Write(ctx context.Context, p []byte) (n int, err error) -} - -type multiWriter struct { - writers []writer -} - -func (t *multiWriter) Write(ctx context.Context, p []byte) (n int, err error) { - for _, w := range t.writers { - n, err = w.Write(ctx, p) - if err != nil { - return - } - if n != len(p) { - err = io.ErrShortWrite - return - } - } - return len(p), nil -} - -func newMultiWriter(writers ...writer) writer { - allWriters := make([]writer, 0, len(writers)) - for _, w := range writers { - if mw, ok := w.(*multiWriter); ok { - allWriters = append(allWriters, mw.writers...) - } else { - allWriters = append(allWriters, w) - } - } - return &multiWriter{allWriters} -} - -type writerWrapper struct { - Writer io.Writer -} - -func (w *writerWrapper) Write(_ context.Context, p []byte) (n int, err error) { - return w.Writer.Write(p) -} - -func newWriter(w io.Writer) writer { - return &writerWrapper{Writer: w} -}