forked from TrueCloudLab/frostfs-node
[#235] object/getrangehash: Implement new service processing
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
0fc5ea674c
commit
7e56427534
7 changed files with 151 additions and 31 deletions
|
@ -28,8 +28,6 @@ import (
|
|||
headsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/head/v2"
|
||||
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
||||
putsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2"
|
||||
rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash"
|
||||
rangehashsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash/v2"
|
||||
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
|
||||
searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
|
@ -50,8 +48,6 @@ type objectSvc struct {
|
|||
|
||||
get *getsvcV2.Service
|
||||
|
||||
rngHash *rangehashsvcV2.Service
|
||||
|
||||
delete *deletesvcV2.Service
|
||||
}
|
||||
|
||||
|
@ -164,7 +160,7 @@ func (s *objectSvc) GetRange(req *object.GetRangeRequest, stream objectService.G
|
|||
}
|
||||
|
||||
func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
||||
return s.rngHash.GetRangeHash(ctx, req)
|
||||
return s.get.GetRangeHash(ctx, req)
|
||||
}
|
||||
|
||||
type localObjectRemover struct {
|
||||
|
@ -370,26 +366,6 @@ func initObjectService(c *cfg) {
|
|||
getsvcV2.WithKeyStorage(keyStorage),
|
||||
)
|
||||
|
||||
sRangeHash := rangehashsvc.NewService(
|
||||
rangehashsvc.WithKeyStorage(keyStorage),
|
||||
rangehashsvc.WithClientCache(clientCache),
|
||||
rangehashsvc.WithLocalStorage(ls),
|
||||
rangehashsvc.WithContainerSource(c.cfgObject.cnrStorage),
|
||||
rangehashsvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
|
||||
rangehashsvc.WithLocalAddressSource(c),
|
||||
rangehashsvc.WithHeadService(sHead),
|
||||
rangehashsvc.WithRangeService(sGet),
|
||||
rangehashsvc.WithWorkerPool(c.cfgObject.pool.rngHash),
|
||||
rangehashsvc.WithLogger(c.log),
|
||||
rangehashsvc.WithClientOptions(
|
||||
client.WithDialTimeout(c.viper.GetDuration(cfgObjectRangeHashDialTimeout)),
|
||||
),
|
||||
)
|
||||
|
||||
sRangeHashV2 := rangehashsvcV2.NewService(
|
||||
rangehashsvcV2.WithInternalService(sRangeHash),
|
||||
)
|
||||
|
||||
sDelete := deletesvc.NewService(
|
||||
deletesvc.WithKeyStorage(keyStorage),
|
||||
deletesvc.WitHeadService(sHead),
|
||||
|
@ -425,12 +401,11 @@ func initObjectService(c *cfg) {
|
|||
c.cfgGRPC.maxChunkSize,
|
||||
c.cfgGRPC.maxAddrAmount,
|
||||
&objectSvc{
|
||||
put: sPutV2,
|
||||
search: sSearchV2,
|
||||
head: sHeadV2,
|
||||
get: sGetV2,
|
||||
rngHash: sRangeHashV2,
|
||||
delete: sDeleteV2,
|
||||
put: sPutV2,
|
||||
search: sSearchV2,
|
||||
head: sHeadV2,
|
||||
get: sGetV2,
|
||||
delete: sDeleteV2,
|
||||
},
|
||||
),
|
||||
c.respSvc,
|
||||
|
|
|
@ -19,6 +19,37 @@ func (s *Service) GetRange(ctx context.Context, prm RangePrm) error {
|
|||
return s.get(ctx, prm).err
|
||||
}
|
||||
|
||||
func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHashRes, error) {
|
||||
hashes := make([][]byte, 0, len(prm.rngs))
|
||||
|
||||
for _, rng := range prm.rngs {
|
||||
h := prm.hashGen()
|
||||
|
||||
// TODO: calculating of homomorphic hash (TZ) for "big" ranges can be optimized
|
||||
// by "smaller" range hash requests spawn and response concatenation.
|
||||
// NOTE: for non-homomorphic hashes (SHA256) this won't work with split-range.
|
||||
|
||||
rngPrm := RangePrm{
|
||||
commonPrm: prm.commonPrm,
|
||||
}
|
||||
|
||||
rngPrm.SetRange(rng)
|
||||
rngPrm.SetChunkWriter(&hasherWrapper{
|
||||
hash: h,
|
||||
})
|
||||
|
||||
if err := s.GetRange(ctx, rngPrm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hashes = append(hashes, h.Sum(nil))
|
||||
}
|
||||
|
||||
return &RangeHashRes{
|
||||
hashes: hashes,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) get(ctx context.Context, prm RangePrm) statusError {
|
||||
exec := &execCtx{
|
||||
svc: s,
|
||||
|
|
|
@ -2,6 +2,7 @@ package getsvc
|
|||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"hash"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
|
@ -21,6 +22,15 @@ type RangePrm struct {
|
|||
rng *objectSDK.Range
|
||||
}
|
||||
|
||||
// RangeHashPrm groups parameters of GetRange service call.
|
||||
type RangeHashPrm struct {
|
||||
commonPrm
|
||||
|
||||
hashGen func() hash.Hash
|
||||
|
||||
rngs []*objectSDK.Range
|
||||
}
|
||||
|
||||
type commonPrm struct {
|
||||
objWriter ObjectWriter
|
||||
|
||||
|
@ -72,3 +82,13 @@ func (p *RangePrm) SetChunkWriter(w ChunkWriter) {
|
|||
func (p *RangePrm) SetRange(rng *objectSDK.Range) {
|
||||
p.rng = rng
|
||||
}
|
||||
|
||||
// SetRangeList sets list of object payload ranges.
|
||||
func (p *RangeHashPrm) SetRangeList(rngs []*objectSDK.Range) {
|
||||
p.rngs = rngs
|
||||
}
|
||||
|
||||
// SetHashGenerator sets constructor of hashing algorithm.
|
||||
func (p *RangeHashPrm) SetHashGenerator(v func() hash.Hash) {
|
||||
p.hashGen = v
|
||||
}
|
||||
|
|
9
pkg/services/object/get/res.go
Normal file
9
pkg/services/object/get/res.go
Normal file
|
@ -0,0 +1,9 @@
|
|||
package getsvc
|
||||
|
||||
type RangeHashRes struct {
|
||||
hashes [][]byte
|
||||
}
|
||||
|
||||
func (r *RangeHashRes) Hashes() [][]byte {
|
||||
return r.hashes
|
||||
}
|
|
@ -3,6 +3,7 @@ package getsvc
|
|||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"hash"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
|
@ -42,6 +43,10 @@ type rangeWriter struct {
|
|||
chunkWriter ChunkWriter
|
||||
}
|
||||
|
||||
type hasherWrapper struct {
|
||||
hash hash.Hash
|
||||
}
|
||||
|
||||
func newSimpleObjectWriter() *simpleObjectWriter {
|
||||
return &simpleObjectWriter{
|
||||
obj: object.NewRaw(),
|
||||
|
@ -156,3 +161,8 @@ func payloadOnlyObject(payload []byte) *objectSDK.Object {
|
|||
|
||||
return rawObj.Object().SDK()
|
||||
}
|
||||
|
||||
func (h *hasherWrapper) WriteChunk(p []byte) error {
|
||||
_, err := h.hash.Write(p)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
|
||||
|
@ -74,6 +76,21 @@ func (s *Service) GetRange(req *objectV2.GetRangeRequest, stream objectSvc.GetOb
|
|||
}
|
||||
}
|
||||
|
||||
// GetRangeHash calls internal service and returns v2 response.
|
||||
func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) {
|
||||
p, err := s.toHashRangePrm(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := s.svc.GetRangeHash(ctx, *p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return toHashResponse(req.GetBody().GetType(), res), nil
|
||||
}
|
||||
|
||||
func WithInternalService(v *getsvc.Service) Option {
|
||||
return func(c *cfg) {
|
||||
c.svc = v
|
||||
|
|
|
@ -1,14 +1,20 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"hash"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/refs"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
|
||||
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
|
||||
"github.com/nspcc-dev/tzhash/tz"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) {
|
||||
|
@ -52,6 +58,46 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
|
|||
return p, nil
|
||||
}
|
||||
|
||||
func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.RangeHashPrm, error) {
|
||||
meta := req.GetMetaHeader()
|
||||
|
||||
key, err := s.keyStorage.GetKey(token.NewSessionTokenFromV2(meta.GetSessionToken()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := new(getsvc.RangeHashPrm)
|
||||
p.SetPrivateKey(key)
|
||||
|
||||
body := req.GetBody()
|
||||
p.WithAddress(object.NewAddressFromV2(body.GetAddress()))
|
||||
p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...)
|
||||
|
||||
rngsV2 := body.GetRanges()
|
||||
rngs := make([]*object.Range, 0, len(rngsV2))
|
||||
|
||||
for i := range rngsV2 {
|
||||
rngs = append(rngs, object.NewRangeFromV2(rngsV2[i]))
|
||||
}
|
||||
|
||||
p.SetRangeList(rngs)
|
||||
|
||||
switch t := body.GetType(); t {
|
||||
default:
|
||||
return nil, errors.Errorf("unknown checksum type %v", t)
|
||||
case refs.SHA256:
|
||||
p.SetHashGenerator(func() hash.Hash {
|
||||
return sha256.New()
|
||||
})
|
||||
case refs.TillichZemor:
|
||||
p.SetHashGenerator(func() hash.Hash {
|
||||
return tz.New()
|
||||
})
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// can be shared accross all services
|
||||
func remoteCallOptionsFromMeta(meta *session.RequestMetaHeader) []client.CallOption {
|
||||
xHdrs := meta.GetXHeaders()
|
||||
|
@ -92,3 +138,15 @@ func splitInfoRangeResponse(info *object.SplitInfo) *objectV2.GetRangeResponse {
|
|||
|
||||
return resp
|
||||
}
|
||||
|
||||
func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse {
|
||||
resp := new(objectV2.GetRangeHashResponse)
|
||||
|
||||
body := new(objectV2.GetRangeHashResponseBody)
|
||||
resp.SetBody(body)
|
||||
|
||||
body.SetType(typ)
|
||||
body.SetHashList(res.Hashes())
|
||||
|
||||
return resp
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue