From 7e56427534a48b3cdb974ffa193433e9c198599a Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 8 Dec 2020 19:18:24 +0300 Subject: [PATCH] [#235] object/getrangehash: Implement new service processing Signed-off-by: Leonard Lyubich --- cmd/neofs-node/object.go | 37 +++-------------- pkg/services/object/get/get.go | 31 ++++++++++++++ pkg/services/object/get/prm.go | 20 +++++++++ pkg/services/object/get/res.go | 9 +++++ pkg/services/object/get/util.go | 10 +++++ pkg/services/object/get/v2/service.go | 17 ++++++++ pkg/services/object/get/v2/util.go | 58 +++++++++++++++++++++++++++ 7 files changed, 151 insertions(+), 31 deletions(-) create mode 100644 pkg/services/object/get/res.go diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index b6e013215..eba8ca713 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -28,8 +28,6 @@ import ( headsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/head/v2" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" putsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2" - rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash" - rangehashsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash/v2" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -50,8 +48,6 @@ type objectSvc struct { get *getsvcV2.Service - rngHash *rangehashsvcV2.Service - delete *deletesvcV2.Service } @@ -164,7 +160,7 @@ func (s *objectSvc) GetRange(req *object.GetRangeRequest, stream objectService.G } func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { - return s.rngHash.GetRangeHash(ctx, req) + return s.get.GetRangeHash(ctx, req) } type localObjectRemover struct { @@ -370,26 +366,6 @@ func initObjectService(c *cfg) { getsvcV2.WithKeyStorage(keyStorage), ) - sRangeHash := rangehashsvc.NewService( - rangehashsvc.WithKeyStorage(keyStorage), - rangehashsvc.WithClientCache(clientCache), - rangehashsvc.WithLocalStorage(ls), - rangehashsvc.WithContainerSource(c.cfgObject.cnrStorage), - rangehashsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), - rangehashsvc.WithLocalAddressSource(c), - rangehashsvc.WithHeadService(sHead), - rangehashsvc.WithRangeService(sGet), - rangehashsvc.WithWorkerPool(c.cfgObject.pool.rngHash), - rangehashsvc.WithLogger(c.log), - rangehashsvc.WithClientOptions( - client.WithDialTimeout(c.viper.GetDuration(cfgObjectRangeHashDialTimeout)), - ), - ) - - sRangeHashV2 := rangehashsvcV2.NewService( - rangehashsvcV2.WithInternalService(sRangeHash), - ) - sDelete := deletesvc.NewService( deletesvc.WithKeyStorage(keyStorage), deletesvc.WitHeadService(sHead), @@ -425,12 +401,11 @@ func initObjectService(c *cfg) { c.cfgGRPC.maxChunkSize, c.cfgGRPC.maxAddrAmount, &objectSvc{ - put: sPutV2, - search: sSearchV2, - head: sHeadV2, - get: sGetV2, - rngHash: sRangeHashV2, - delete: sDeleteV2, + put: sPutV2, + search: sSearchV2, + head: sHeadV2, + get: sGetV2, + delete: sDeleteV2, }, ), c.respSvc, diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 47ce1e291..a5de66f20 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -19,6 +19,37 @@ func (s *Service) GetRange(ctx context.Context, prm RangePrm) error { return s.get(ctx, prm).err } +func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHashRes, error) { + hashes := make([][]byte, 0, len(prm.rngs)) + + for _, rng := range prm.rngs { + h := prm.hashGen() + + // TODO: calculating of homomorphic hash (TZ) for "big" ranges can be optimized + // by "smaller" range hash requests spawn and response concatenation. + // NOTE: for non-homomorphic hashes (SHA256) this won't work with split-range. + + rngPrm := RangePrm{ + commonPrm: prm.commonPrm, + } + + rngPrm.SetRange(rng) + rngPrm.SetChunkWriter(&hasherWrapper{ + hash: h, + }) + + if err := s.GetRange(ctx, rngPrm); err != nil { + return nil, err + } + + hashes = append(hashes, h.Sum(nil)) + } + + return &RangeHashRes{ + hashes: hashes, + }, nil +} + func (s *Service) get(ctx context.Context, prm RangePrm) statusError { exec := &execCtx{ svc: s, diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index f3cb41b9b..5d5600b27 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -2,6 +2,7 @@ package getsvc import ( "crypto/ecdsa" + "hash" "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" @@ -21,6 +22,15 @@ type RangePrm struct { rng *objectSDK.Range } +// RangeHashPrm groups parameters of GetRange service call. +type RangeHashPrm struct { + commonPrm + + hashGen func() hash.Hash + + rngs []*objectSDK.Range +} + type commonPrm struct { objWriter ObjectWriter @@ -72,3 +82,13 @@ func (p *RangePrm) SetChunkWriter(w ChunkWriter) { func (p *RangePrm) SetRange(rng *objectSDK.Range) { p.rng = rng } + +// SetRangeList sets list of object payload ranges. +func (p *RangeHashPrm) SetRangeList(rngs []*objectSDK.Range) { + p.rngs = rngs +} + +// SetHashGenerator sets constructor of hashing algorithm. +func (p *RangeHashPrm) SetHashGenerator(v func() hash.Hash) { + p.hashGen = v +} diff --git a/pkg/services/object/get/res.go b/pkg/services/object/get/res.go new file mode 100644 index 000000000..75a5aaedd --- /dev/null +++ b/pkg/services/object/get/res.go @@ -0,0 +1,9 @@ +package getsvc + +type RangeHashRes struct { + hashes [][]byte +} + +func (r *RangeHashRes) Hashes() [][]byte { + return r.hashes +} diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index a437a566e..5b026bd00 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -3,6 +3,7 @@ package getsvc import ( "context" "crypto/ecdsa" + "hash" "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" @@ -42,6 +43,10 @@ type rangeWriter struct { chunkWriter ChunkWriter } +type hasherWrapper struct { + hash hash.Hash +} + func newSimpleObjectWriter() *simpleObjectWriter { return &simpleObjectWriter{ obj: object.NewRaw(), @@ -156,3 +161,8 @@ func payloadOnlyObject(payload []byte) *objectSDK.Object { return rawObj.Object().SDK() } + +func (h *hasherWrapper) WriteChunk(p []byte) error { + _, err := h.hash.Write(p) + return err +} diff --git a/pkg/services/object/get/v2/service.go b/pkg/services/object/get/v2/service.go index caf176fcf..27f90ab4f 100644 --- a/pkg/services/object/get/v2/service.go +++ b/pkg/services/object/get/v2/service.go @@ -1,6 +1,8 @@ package getsvc import ( + "context" + "github.com/nspcc-dev/neofs-api-go/pkg/object" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" @@ -74,6 +76,21 @@ func (s *Service) GetRange(req *objectV2.GetRangeRequest, stream objectSvc.GetOb } } +// GetRangeHash calls internal service and returns v2 response. +func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) { + p, err := s.toHashRangePrm(req) + if err != nil { + return nil, err + } + + res, err := s.svc.GetRangeHash(ctx, *p) + if err != nil { + return nil, err + } + + return toHashResponse(req.GetBody().GetType(), res), nil +} + func WithInternalService(v *getsvc.Service) Option { return func(c *cfg) { c.svc = v diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 3233f4d61..c823dc3e4 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -1,14 +1,20 @@ package getsvc import ( + "crypto/sha256" + "hash" + "github.com/nspcc-dev/neofs-api-go/pkg" "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/token" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/refs" "github.com/nspcc-dev/neofs-api-go/v2/session" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" + "github.com/nspcc-dev/tzhash/tz" + "github.com/pkg/errors" ) func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) { @@ -52,6 +58,46 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get return p, nil } +func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.RangeHashPrm, error) { + meta := req.GetMetaHeader() + + key, err := s.keyStorage.GetKey(token.NewSessionTokenFromV2(meta.GetSessionToken())) + if err != nil { + return nil, err + } + + p := new(getsvc.RangeHashPrm) + p.SetPrivateKey(key) + + body := req.GetBody() + p.WithAddress(object.NewAddressFromV2(body.GetAddress())) + p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...) + + rngsV2 := body.GetRanges() + rngs := make([]*object.Range, 0, len(rngsV2)) + + for i := range rngsV2 { + rngs = append(rngs, object.NewRangeFromV2(rngsV2[i])) + } + + p.SetRangeList(rngs) + + switch t := body.GetType(); t { + default: + return nil, errors.Errorf("unknown checksum type %v", t) + case refs.SHA256: + p.SetHashGenerator(func() hash.Hash { + return sha256.New() + }) + case refs.TillichZemor: + p.SetHashGenerator(func() hash.Hash { + return tz.New() + }) + } + + return p, nil +} + // can be shared accross all services func remoteCallOptionsFromMeta(meta *session.RequestMetaHeader) []client.CallOption { xHdrs := meta.GetXHeaders() @@ -92,3 +138,15 @@ func splitInfoRangeResponse(info *object.SplitInfo) *objectV2.GetRangeResponse { return resp } + +func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse { + resp := new(objectV2.GetRangeHashResponse) + + body := new(objectV2.GetRangeHashResponseBody) + resp.SetBody(body) + + body.SetType(typ) + body.SetHashList(res.Hashes()) + + return resp +}