From 121137c62b645927879301dad4fd63f18c02856d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 16 Sep 2020 17:46:31 +0300 Subject: [PATCH] [#30] object_manager: Implement object transformers Define object writer interface. Implement payload size limiter that restricts payload size of the object. Implement object format moulder that fill verification fields and finalizes object structure. Signed-off-by: Leonard Lyubich --- go.mod | 1 + go.sum | 7 +- .../object_manager/transformer/.gitkeep | 0 .../object_manager/transformer/fmt.go | 97 +++++++ .../object_manager/transformer/transformer.go | 252 ++++++++++++++++++ .../object_manager/transformer/types.go | 90 +++++++ 6 files changed, 443 insertions(+), 4 deletions(-) delete mode 100644 pkg/services/object_manager/transformer/.gitkeep create mode 100644 pkg/services/object_manager/transformer/fmt.go create mode 100644 pkg/services/object_manager/transformer/transformer.go create mode 100644 pkg/services/object_manager/transformer/types.go diff --git a/go.mod b/go.mod index 42686c0c..a38a0750 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/nspcc-dev/neo-go v0.91.1-pre.0.20200827184617-7560aa345a78 github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200916115135-ff325b877023 github.com/nspcc-dev/neofs-crypto v0.3.0 + github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.3.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.6.0 diff --git a/go.sum b/go.sum index 2edcf1a5..339114a6 100644 --- a/go.sum +++ b/go.sum @@ -267,10 +267,6 @@ github.com/nspcc-dev/neo-go v0.73.1-pre.0.20200303142215-f5a1b928ce09/go.mod h1: github.com/nspcc-dev/neo-go v0.91.0/go.mod h1:G6HdOWvzQ6tlvFdvFSN/PgCzLPN/X/X4d5hTjFRUDcc= github.com/nspcc-dev/neo-go v0.91.1-pre.0.20200827184617-7560aa345a78 h1:stIa+nBXK8uDY/JZaxIZzAUfkzfaotVw2FbnHxO4aZI= github.com/nspcc-dev/neo-go v0.91.1-pre.0.20200827184617-7560aa345a78/go.mod h1:G6HdOWvzQ6tlvFdvFSN/PgCzLPN/X/X4d5hTjFRUDcc= -github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200902121740-5a6dff8c83ba h1:S8YfqJ2F8o4KY5dP7DcBNXdJsWtTK/vtD9mcF4J/g6Y= -github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200902121740-5a6dff8c83ba/go.mod h1:f5Z4UggDgVgZXnDNr2ItdGwBpVYJBKlm5Yyu2XYEiAc= -github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200911095622-47fd771ee4c5 h1:+/VTNh3tHfuBHF+Xyf/QVO/U7QTcWyJ0rPHl+b8i0kU= -github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200911095622-47fd771ee4c5/go.mod h1:FsFd1z4YzoEgPlltsUgnqna9qhcF87RHYjot0pby2L4= github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200916115135-ff325b877023 h1:tltnqudivH6TBzs4DEouLx9rwPUBuvn7bjm4EZyosUc= github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200916115135-ff325b877023/go.mod h1:FsFd1z4YzoEgPlltsUgnqna9qhcF87RHYjot0pby2L4= github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA= @@ -280,6 +276,8 @@ github.com/nspcc-dev/neofs-crypto v0.3.0/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BE github.com/nspcc-dev/rfc6979 v0.1.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE= github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= +github.com/nspcc-dev/tzhash v1.4.0 h1:RVIR+mxOBHl58CE99+DXtE31ylD5PEkZSoWqoj4fVjg= +github.com/nspcc-dev/tzhash v1.4.0/go.mod h1:Z8gp/VZbyWgPhaMp/KTmeoW5UTynp/N60g0jTtSzBws= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -488,6 +486,7 @@ golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f h1:gWF768j/LaZugp8dyS4UwsslYCYz9XgFxvlgsn0n9H8= diff --git a/pkg/services/object_manager/transformer/.gitkeep b/pkg/services/object_manager/transformer/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/pkg/services/object_manager/transformer/fmt.go b/pkg/services/object_manager/transformer/fmt.go new file mode 100644 index 00000000..5333025e --- /dev/null +++ b/pkg/services/object_manager/transformer/fmt.go @@ -0,0 +1,97 @@ +package transformer + +import ( + "crypto/ecdsa" + + "github.com/nspcc-dev/neofs-api-go/pkg" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/pkg/errors" +) + +type formatter struct { + nextTarget ObjectTarget + + key *ecdsa.PrivateKey + + obj *object.RawObject + + sz uint64 +} + +// NewFormatTarget returns ObjectTarget instance that finalizes object structure +// and writes it to the next target. +// +// Chunks must be written before the WriteHeader call. +// +// Object changes: +// - sets version to current SDK version; +// - sets payload size to the total length of all written chunks; +// - calculates and sets verification fields (ID, Signature). +func NewFormatTarget(key *ecdsa.PrivateKey, nextTarget ObjectTarget) ObjectTarget { + return &formatter{ + nextTarget: nextTarget, + key: key, + } +} + +func (f *formatter) WriteHeader(obj *object.RawObject) error { + f.obj = obj + + return nil +} + +func (f *formatter) Write(p []byte) (n int, err error) { + n, err = f.nextTarget.Write(p) + + f.sz += uint64(n) + + return +} + +func (f *formatter) Close() (*AccessIdentifiers, error) { + f.obj.SetVersion(pkg.SDKVersion()) + f.obj.SetPayloadSize(f.sz) + + var parID *objectSDK.ID + + if par := f.obj.GetParent(); par != nil && par.ToV2().GetHeader() != nil { + rawPar := objectSDK.NewRawFromV2(par.ToV2()) + + if err := setIDAndSignature(f.key, rawPar); err != nil { + return nil, errors.Wrap(err, "could not finalize parent object") + } + + parID = rawPar.GetID() + + f.obj.SetParent(rawPar.Object()) + } + + if err := setIDAndSignature(f.key, f.obj.SDK()); err != nil { + return nil, errors.Wrap(err, "could not finalize object") + } + + if err := f.nextTarget.WriteHeader(f.obj); err != nil { + return nil, errors.Wrap(err, "could not write header to next target") + } + + if _, err := f.nextTarget.Close(); err != nil { + return nil, errors.Wrap(err, "could not close next target") + } + + return new(AccessIdentifiers). + WithSelfID(f.obj.GetID()). + WithParentID(parID), nil +} + +func setIDAndSignature(key *ecdsa.PrivateKey, obj *objectSDK.RawObject) error { + if err := objectSDK.CalculateAndSetID(obj); err != nil { + return errors.Wrap(err, "could not set identifier") + } + + if err := objectSDK.CalculateAndSetSignature(key, obj); err != nil { + return errors.Wrap(err, "could not set signature") + } + + return nil +} diff --git a/pkg/services/object_manager/transformer/transformer.go b/pkg/services/object_manager/transformer/transformer.go new file mode 100644 index 00000000..761a5a74 --- /dev/null +++ b/pkg/services/object_manager/transformer/transformer.go @@ -0,0 +1,252 @@ +package transformer + +import ( + "crypto/sha256" + "fmt" + "hash" + "io" + + "github.com/nspcc-dev/neofs-api-go/pkg" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/tzhash/tz" + "github.com/pkg/errors" +) + +type payloadSizeLimiter struct { + maxSize, written uint64 + + targetInit func() ObjectTarget + + target ObjectTarget + + current, parent *object.RawObject + + currentHashers, parentHashers []*payloadChecksumHasher + + previous []*objectSDK.ID + + chunkWriter io.Writer +} + +type payloadChecksumHasher struct { + hasher hash.Hash + + checksumWriter func([]byte) +} + +const tzChecksumSize = 64 + +// NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length +// of the writing object and writes generated objects to targets from initializer. +// +// Objects w/ payload size less or equal than max size remain untouched. +// +// TODO: describe behavior in details. +func NewPayloadSizeLimiter(maxSize uint64, targetInit TargetInitializer) ObjectTarget { + return &payloadSizeLimiter{ + maxSize: maxSize, + targetInit: targetInit, + } +} + +func (s *payloadSizeLimiter) WriteHeader(hdr *object.RawObject) error { + s.current = fromObject(hdr) + + return nil +} + +func (s *payloadSizeLimiter) Write(p []byte) (int, error) { + if err := s.writeChunk(p); err != nil { + return 0, err + } + + return len(p), nil +} + +func (s *payloadSizeLimiter) Close() (*AccessIdentifiers, error) { + return s.release(true) +} + +func (s *payloadSizeLimiter) initialize() { + // if it is an object after the 1st + if ln := len(s.previous); ln > 0 { + // initialize parent object once (after 1st object) + if ln == 1 { + s.parent = s.current + s.parentHashers = s.currentHashers + s.current = fromObject(s.parent) + } + + // set previous object to the last previous identifier + s.current.SetPreviousID(s.previous[ln-1]) + } + + s.initializeCurrent() +} + +func fromObject(obj *object.RawObject) *object.RawObject { + res := object.NewRaw() + res.SetContainerID(obj.GetContainerID()) + res.SetOwnerID(obj.GetOwnerID()) + res.SetAttributes(obj.GetAttributes()...) + + return res +} + +func (s *payloadSizeLimiter) initializeCurrent() { + // initialize current object target + s.target = s.targetInit() + + // create payload hashers + s.currentHashers = []*payloadChecksumHasher{ + { + hasher: sha256.New(), + checksumWriter: func(cs []byte) { + if ln := len(cs); ln != sha256.Size { + panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", ln, sha256.Size)) + } + + csSHA := [sha256.Size]byte{} + copy(csSHA[:], cs) + + checksum := pkg.NewChecksum() + checksum.SetSHA256(csSHA) + + s.current.SetPayloadChecksum(checksum) + }, + }, + { + hasher: tz.New(), + checksumWriter: func(cs []byte) { + if ln := len(cs); ln != tzChecksumSize { + panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", ln, tzChecksumSize)) + } + + csTZ := [tzChecksumSize]byte{} + copy(csTZ[:], cs) + + checksum := pkg.NewChecksum() + checksum.SetTillichZemor(csTZ) + + s.current.SetPayloadHomomorphicHash(checksum) + }, + }, + } + + // compose multi-writer from target and all payload hashers + ws := make([]io.Writer, 0, 1+len(s.currentHashers)+len(s.parentHashers)) + + ws = append(ws, s.target) + + for i := range s.currentHashers { + ws = append(ws, s.currentHashers[i].hasher) + } + + for i := range s.parentHashers { + ws = append(ws, s.parentHashers[i].hasher) + } + + s.chunkWriter = io.MultiWriter(ws...) +} + +func (s *payloadSizeLimiter) release(close bool) (*AccessIdentifiers, error) { + // Arg close is true only from Close method. + // We finalize parent and generate linking objects only if it is more + // than 1 object in split-chain. + withParent := close && len(s.previous) > 0 + + if withParent { + writeHashes(s.parentHashers) + s.parent.SetPayloadSize(s.written) + s.current.SetParent(s.parent.SDK().Object()) + } + + // release current object + writeHashes(s.currentHashers) + + // release current, get its id + if err := s.target.WriteHeader(s.current); err != nil { + return nil, errors.Wrap(err, "could not write header") + } + + ids, err := s.target.Close() + if err != nil { + return nil, errors.Wrap(err, "could not close target") + } + + // save identifier of the released object + s.previous = append(s.previous, ids.SelfID()) + + if withParent { + // generate and release linking object + s.initializeLinking() + s.initializeCurrent() + + if _, err := s.release(false); err != nil { + return nil, errors.Wrap(err, "could not release linking object") + } + } + + return ids, nil +} + +func writeHashes(hashers []*payloadChecksumHasher) { + for i := range hashers { + hashers[i].checksumWriter(hashers[i].hasher.Sum(nil)) + + } +} + +func (s *payloadSizeLimiter) initializeLinking() { + id := s.current.GetParent().GetID() + par := objectSDK.NewRaw() + par.SetID(id) + + s.current = fromObject(s.current) + s.current.SetChildren(s.previous...) + + s.current.SetParent(par.Object()) +} + +func (s *payloadSizeLimiter) writeChunk(chunk []byte) error { + // statement is true if: + // 1. the method is called for the first time; + // 2. the previous write of bytes reached exactly the boundary. + if s.written%s.maxSize == 0 { + // if 2. we need to release current object + if s.written > 0 { + if _, err := s.release(false); err != nil { + return errors.Wrap(err, "could not release object") + } + } + + // initialize another object + s.initialize() + } + + var ( + ln = uint64(len(chunk)) + cut = ln + leftToEdge = s.maxSize - s.written%s.maxSize + ) + + // write bytes no further than the boundary of the current object + if ln > leftToEdge { + cut = leftToEdge + } + + if _, err := s.chunkWriter.Write(chunk[:cut]); err != nil { + return errors.Wrap(err, "could not write chunk to target") + } + + // increase written bytes counter + s.written += cut + + // if there are more bytes in buffer we call method again to start filling another object + if ln > leftToEdge { + return s.writeChunk(chunk[cut:]) + } + + return nil +} diff --git a/pkg/services/object_manager/transformer/types.go b/pkg/services/object_manager/transformer/types.go new file mode 100644 index 00000000..14a6d719 --- /dev/null +++ b/pkg/services/object_manager/transformer/types.go @@ -0,0 +1,90 @@ +package transformer + +import ( + "io" + + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" +) + +// AccessIdentifiers represents group of the object identifiers +// that are returned after writing the object. +// Consists of the ID of the stored object and the ID of the parent object. +type AccessIdentifiers struct { + par, self *objectSDK.ID +} + +// ObjectTarget is an interface of the object writer. +type ObjectTarget interface { + // WriteHeader writes object header w/ payload part. + // The payload of the object may be incomplete. + // + // Must be called exactly once. Control remains with the caller. + // Missing a call or re-calling can lead to undefined behavior + // that depends on the implementation. + // + // Must not be called after Close call. + WriteHeader(*object.RawObject) error + + // Write writes object payload chunk. + // + // Can be called multiple times. + // + // Must not be called after Close call. + io.Writer + + // Close is used to finish object writing. + // + // Close must return access identifiers of the object + // that has been written. + // + // Must be called no more than once. Control remains with the caller. + // Re-calling can lead to undefined behavior + // that depends on the implementation. + Close() (*AccessIdentifiers, error) +} + +// TargetInitializer represents ObjectTarget constructor. +type TargetInitializer func() ObjectTarget + +// SelfID returns identifier of the written object. +func (a *AccessIdentifiers) SelfID() *objectSDK.ID { + if a != nil { + return a.self + } + + return nil +} + +// WithSelfID returns AccessIdentifiers with passed self identifier. +func (a *AccessIdentifiers) WithSelfID(v *objectSDK.ID) *AccessIdentifiers { + res := a + if res == nil { + res = new(AccessIdentifiers) + } + + res.self = v + + return res +} + +// ParentID return identifier of the parent of the written object. +func (a *AccessIdentifiers) ParentID() *objectSDK.ID { + if a != nil { + return a.par + } + + return nil +} + +// WithParentID returns AccessIdentifiers with passed parent identifier. +func (a *AccessIdentifiers) WithParentID(v *objectSDK.ID) *AccessIdentifiers { + res := a + if res == nil { + res = new(AccessIdentifiers) + } + + res.par = v + + return res +}