[#53] services/object: Implement GetRangeHash service

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-09-28 16:22:13 +03:00 committed by Alex Vanin
parent 9d8576d397
commit 0a51263e72
9 changed files with 765 additions and 0 deletions

View file

@ -0,0 +1,139 @@
package rangehashsvc
import (
"context"
"sync"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/pkg/errors"
)
// distributedHasher requests range hashes of a single object from the nodes
// yielded by a placement traverser.
type distributedHasher struct {
// embedded service configuration (sources, worker pool, key, local storage)
*cfg
// placement traverser; assigned in prepare and consumed in finish
traverser *placement.Traverser
}
// head builds the placement traverser for the requested object (prepare)
// and then collects the range hashes from the traversed nodes (finish).
//
// Returns a wrapped error if the preparation stage fails; otherwise
// returns whatever finish returns.
func (h *distributedHasher) head(ctx context.Context, prm *Prm) (*Response, error) {
	err := h.prepare(ctx, prm)
	if err != nil {
		return nil, errors.Wrapf(err, "(%T) could not prepare parameters", h)
	}

	return h.finish(ctx, prm)
}
// prepare composes the placement traverser used by finish.
//
// It reads the latest network map and the container of the requested
// object, selects the placement builder (wrapped into a local-only one
// when prm.local is set) and stores the resulting traverser in h.traverser.
// ctx is currently unused.
func (h *distributedHasher) prepare(ctx context.Context, prm *Prm) error {
	// the latest network map is the base of the placement builder
	netMap, err := netmap.GetLatestNetworkMap(h.netMapSrc)
	if err != nil {
		return errors.Wrapf(err, "(%T) could not get latest network map", h)
	}

	// the container of the object is handed over to the traverser
	cnr, err := h.cnrSrc.Get(prm.addr.GetContainerID())
	if err != nil {
		return errors.Wrapf(err, "(%T) could not get container by ID", h)
	}

	// pick the placement builder: network-map based, optionally local-only
	plc := placement.NewNetworkMapBuilder(netMap)
	if prm.local {
		plc = util.NewLocalPlacement(plc, h.localAddrSrc)
	}

	// options are listed in the same order they were appended originally
	opts := []placement.Option{
		// processing container
		placement.ForContainer(cnr),
		// a single successful answer is enough
		placement.SuccessAfter(1),
		// identifier of the processing object
		placement.ForObject(prm.addr.GetObjectID()),
		// placement builder chosen above
		placement.UseBuilder(plc),
	}

	h.traverser, err = placement.NewTraverser(opts...)
	if err != nil {
		return errors.Wrapf(err, "(%T) could not build placement traverser", h)
	}

	return nil
}
// finish walks through the node groups yielded by the placement traverser
// and asks every node of a group (through the worker pool) to hash the
// requested ranges. The first result delivered to the once-writer is stored
// in the response, marks the traverser as successful and cancels the
// context shared by the workers.
//
// Returns an error if the traverser does not report success after the
// traversal has ended.
func (h *distributedHasher) finish(ctx context.Context, prm *Prm) (*Response, error) {
	resp := new(Response)

	w := &onceHashWriter{
		once:      new(sync.Once),
		traverser: h.traverser,
		resp:      resp,
	}

	ctx, w.cancel = context.WithCancel(ctx)
	// The writer cancels the context only when some node answers; release
	// the derived context on every other path as well.
	defer w.cancel()

	for {
		addrs := h.traverser.Next()
		if len(addrs) == 0 {
			break
		}

		var (
			wg           sync.WaitGroup
			submitFailed bool
		)

		for i := range addrs {
			addr := addrs[i]

			wg.Add(1)

			if err := h.workerPool.Submit(func() {
				defer wg.Done()

				var rngHasher interface {
					hashRange(context.Context, *Prm, func([][]byte)) error
				}

				if network.IsLocalAddress(h.localAddrSrc, addr) {
					rngHasher = &localHasher{
						storage: h.localStore,
					}
				} else {
					rngHasher = &remoteHasher{
						key:  h.key,
						node: addr,
					}
				}

				if err := rngHasher.hashRange(ctx, prm, w.write); err != nil {
					// TODO: log error
					return
				}
			}); err != nil {
				// the task has not been accepted, so it will never call Done
				wg.Done()
				// TODO: log error
				submitFailed = true

				break
			}
		}

		// Always wait for the already submitted workers: they may still
		// write to resp and to the traverser state checked below.
		wg.Wait()

		if submitFailed {
			break
		}
	}

	if !h.traverser.Success() {
		return nil, errors.Errorf("(%T) incomplete object GetRangeHash operation", h)
	}

	return resp, nil
}

View file

@ -0,0 +1,57 @@
package rangehashsvc
import (
"context"
"crypto/sha256"
"fmt"
"hash"
"github.com/nspcc-dev/neofs-api-go/pkg"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/tzhash/tz"
"github.com/pkg/errors"
)
// localHasher calculates range hashes of an object read from the local storage.
type localHasher struct {
// storage the object is read from
storage *localstore.Storage
}
// hashRange reads the object by prm.addr from the local storage, calculates
// a checksum of type prm.typ for every range in prm.rngs (the range payload
// is passed through util.SaltXOR with prm.salt first) and passes the list
// of checksums to handler.
//
// Returns an error if the object cannot be read or if any range does not
// fit the payload. Panics on a checksum type other than SHA256 and
// Tillich-Zemor. ctx is currently unused.
func (h *localHasher) hashRange(ctx context.Context, prm *Prm, handler func([][]byte)) error {
	obj, err := h.storage.Get(prm.addr)
	if err != nil {
		return errors.Wrapf(err, "(%T) could not get object from local storage", h)
	}

	payload := obj.GetPayload()
	payloadLen := uint64(len(payload))
	hashes := make([][]byte, 0, len(prm.rngs))

	var hasher hash.Hash

	switch prm.typ {
	default:
		panic(fmt.Sprintf("unexpected checksum type %v", prm.typ))
	case pkg.ChecksumSHA256:
		hasher = sha256.New()
	case pkg.ChecksumTZ:
		hasher = tz.New()
	}

	for i := range prm.rngs {
		left := prm.rngs[i].GetOffset()
		right := left + prm.rngs[i].GetLength()

		// right < left means that offset+length wrapped around uint64:
		// such a range would pass the size check and panic on slicing.
		if right < left || payloadLen < right {
			return errors.Errorf("(%T) object range is out-of-boundaries (size %d, range [%d:%d])", h, payloadLen, left, right)
		}

		hasher.Reset()
		hasher.Write(util.SaltXOR(payload[left:right], prm.salt))

		hashes = append(hashes, hasher.Sum(nil))
	}

	handler(hashes)

	return nil
}

View file

@ -0,0 +1,50 @@
package rangehashsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
)
// Prm groups the parameters of GetRangeHash operation.
type Prm struct {
// local-only processing flag (set by OnlyLocal)
local bool
// address of the object to hash (set by WithAddress)
addr *object.Address
// type of the requested checksum (set by WithChecksumType)
typ pkg.ChecksumType
// payload ranges to hash (set by FromRanges)
rngs []*object.Range
// salt combined with the range payload before hashing;
// NOTE(review): no setter for this field is visible in this file — confirm
salt []byte
}
// OnlyLocal sets the local-only processing flag and returns the receiver
// for chaining. Calling it on a nil receiver is a no-op.
func (p *Prm) OnlyLocal(v bool) *Prm {
	if p == nil {
		return nil
	}

	p.local = v

	return p
}
// WithAddress sets the address of the object to hash and returns the
// receiver for chaining. Calling it on a nil receiver is a no-op.
func (p *Prm) WithAddress(v *object.Address) *Prm {
	if p == nil {
		return nil
	}

	p.addr = v

	return p
}
// WithChecksumType sets the type of the requested checksum and returns the
// receiver for chaining. Calling it on a nil receiver is a no-op.
func (p *Prm) WithChecksumType(typ pkg.ChecksumType) *Prm {
	if p == nil {
		return nil
	}

	p.typ = typ

	return p
}
// FromRanges sets the payload ranges to hash and returns the receiver for
// chaining. The passed slice is stored as is (not copied). Calling it on a
// nil receiver is a no-op.
func (p *Prm) FromRanges(v ...*object.Range) *Prm {
	if p == nil {
		return nil
	}

	p.rngs = v

	return p
}

View file

@ -0,0 +1,67 @@
package rangehashsvc
import (
"context"
"crypto/ecdsa"
"fmt"
"github.com/nspcc-dev/neofs-api-go/pkg"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/pkg/errors"
)
// remoteHasher requests range hashes of an object from a remote node.
type remoteHasher struct {
// private key the SDK client is created with
key *ecdsa.PrivateKey
// network address of the remote node
node *network.Address
}
// hashRange creates an SDK client for the remote node, requests checksums
// of type prm.typ for the ranges in prm.rngs (with prm.salt) and passes the
// received list to handler.
//
// Returns an error if the client cannot be created or the request fails.
// Panics on a checksum type other than SHA256 and Tillich-Zemor.
func (h *remoteHasher) hashRange(ctx context.Context, prm *Prm, handler func([][]byte)) error {
	netAddr := h.node.NetAddr()

	cli, err := client.New(h.key, client.WithAddress(netAddr))
	if err != nil {
		return errors.Wrapf(err, "(%T) could not create SDK client %s", h, netAddr)
	}

	callPrm := new(client.RangeChecksumParams).
		WithAddress(prm.addr).
		WithSalt(prm.salt).
		WithRangeList(prm.rngs...)

	callOpts := []client.CallOption{
		client.WithTTL(1), // FIXME: use constant
	}

	result := make([][]byte, 0, len(prm.rngs))

	switch prm.typ {
	case pkg.ChecksumSHA256:
		sums, err := cli.ObjectPayloadRangeSHA256(ctx, callPrm, callOpts...)
		if err != nil {
			return errors.Wrapf(err, "(%T) could not get SHA256 checksum from %s", h, netAddr)
		}

		// slice the elements in place: a per-iteration copy must not be aliased
		for i := range sums {
			result = append(result, sums[i][:])
		}
	case pkg.ChecksumTZ:
		sums, err := cli.ObjectPayloadRangeTZ(ctx, callPrm, callOpts...)
		if err != nil {
			return errors.Wrapf(err, "(%T) could not get Tillich-Zemor checksum from %s", h, netAddr)
		}

		for i := range sums {
			result = append(result, sums[i][:])
		}
	default:
		panic(fmt.Sprintf("unexpected checksum type %v", prm.typ))
	}

	handler(result)

	return nil
}

View file

@ -0,0 +1,9 @@
package rangehashsvc
// Response carries the result of GetRangeHash operation.
type Response struct {
	// collected range checksums
	hashes [][]byte
}

// Hashes returns the list of checksums stored in the response.
// The returned slice is not copied.
func (r *Response) Hashes() [][]byte {
	list := r.hashes

	return list
}

View file

@ -0,0 +1,267 @@
package rangehashsvc
import (
"context"
"crypto/ecdsa"
"crypto/sha256"
"fmt"
"io"
"github.com/nspcc-dev/neofs-api-go/pkg"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
"github.com/nspcc-dev/neofs-node/pkg/network"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range"
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/pkg/errors"
)
// Service implements GetRangeHash operation on top of the Head and
// GetRange object services.
type Service struct {
// embedded service configuration filled by the constructor options
*cfg
}
// Option is a Service constructor option: a function that modifies the service configuration.
type Option func(*cfg)
// cfg groups the dependencies of the Service.
type cfg struct {
// private key handed to the hashers of remote nodes
key *ecdsa.PrivateKey
// local object storage handed to the hashers of the local node
localStore *localstore.Storage
// source of containers
cnrSrc container.Source
// source of network maps
netMapSrc netmap.Source
// pool the per-node hashing tasks are submitted to
workerPool util.WorkerPool
// source of the local node address
localAddrSrc network.LocalAddressSource
// service used to read object headers
headSvc *headsvc.Service
// service used to read payload ranges
rangeSvc *rangesvc.Service
}
// defaultCfg returns a configuration whose only preset dependency is a
// util.SyncWorkerPool worker pool; the other fields keep zero values.
func defaultCfg() *cfg {
	c := new(cfg)
	c.workerPool = new(util.SyncWorkerPool)

	return c
}
// NewService creates a Service from the default configuration with the
// provided options applied in the order they are passed.
func NewService(opts ...Option) *Service {
	conf := defaultCfg()

	for _, apply := range opts {
		apply(conf)
	}

	svc := new(Service)
	svc.cfg = conf

	return svc
}
// GetRangeHash calculates checksums of the requested payload ranges of the
// object.
//
// It reads the header of the object through the Head service, checks every
// requested range against the payload size from that header, and starts a
// range traverser bounded by the minimal range that covers all requested
// ranges. The checksums themselves are collected by getHashes.
//
// Returns an error if the header cannot be received or if any range
// (including ranges whose offset+length overflows uint64) goes beyond the
// payload size.
func (s *Service) GetRangeHash(ctx context.Context, prm *Prm) (*Response, error) {
	headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm).
		WithAddress(prm.addr).
		OnlyLocal(prm.local),
	)
	if err != nil {
		return nil, errors.Wrapf(err, "(%T) could not receive Head result", s)
	}

	origin := headResult.Header()

	originSize := origin.GetPayloadSize()

	var minLeft, maxRight uint64

	for i := range prm.rngs {
		left := prm.rngs[i].GetOffset()
		right := left + prm.rngs[i].GetLength()

		// right < left means that offset+length wrapped around uint64
		if right < left || originSize < right {
			return nil, errors.Errorf("(%T) requested payload range is out-of-bounds", s)
		}

		// Seed the minimum with the first range: starting from the zero
		// value would pin the left border to 0 forever.
		if i == 0 || left < minLeft {
			minLeft = left
		}

		if right > maxRight {
			maxRight = right
		}
	}

	// fall back to the object itself if the Head result has no right child
	right := headResult.RightChild()
	if right == nil {
		right = origin
	}

	borderRng := new(object.Range)
	borderRng.SetOffset(minLeft)
	borderRng.SetLength(maxRight - minLeft)

	return s.getHashes(ctx, prm, objutil.NewRangeTraverser(originSize, right, borderRng))
}
// getHashes calculates one checksum per range in prm.rngs, driving the
// passed range traverser.
//
// For every requested range it:
//  1. feeds the traverser with object headers (read through the Head
//     service) until the traverser yields a range;
//  2. seeks the traverser to the requested range and processes the
//     sub-ranges it yields until a zero-length one is met;
//  3. combines the per-object results with a hasher chosen on the first
//     sub-range: a pass-through one when the sub-range has the length of
//     the whole requested range, otherwise a SHA256 or Tillich-Zemor one.
//
// SHA256 sub-ranges that are shorter than the requested range are read as
// payload through the GetRange service, every other sub-range is hashed
// through distributedHasher.
//
// Returns an error if any header, payload chunk or checksum cannot be
// received. Panics on a checksum type other than SHA256 and Tillich-Zemor.
func (s *Service) getHashes(ctx context.Context, prm *Prm, traverser *objutil.RangeTraverser) (*Response, error) {
	addr := object.NewAddress()
	addr.SetContainerID(prm.addr.GetContainerID())

	resp := &Response{
		hashes: make([][]byte, 0, len(prm.rngs)),
	}

	for _, rng := range prm.rngs {
		// supply the traverser with headers until it is able to yield a range
		for {
			nextID, nextRng := traverser.Next()
			if nextRng != nil {
				break
			}

			addr.SetObjectID(nextID)

			head, err := s.headSvc.Head(ctx, new(headsvc.Prm).
				WithAddress(addr).
				OnlyLocal(prm.local),
			)
			if err != nil {
				return nil, errors.Wrapf(err, "(%T) could not receive object header", s)
			}

			traverser.PushHeader(head.Header())
		}

		traverser.SetSeekRange(rng)

		var rngHasher hasher

		for {
			nextID, nextRng := traverser.Next()

			// the hasher is chosen once, by the first yielded sub-range
			if rngHasher == nil {
				if nextRng.GetLength() == rng.GetLength() {
					rngHasher = new(singleHasher)
				} else {
					switch prm.typ {
					default:
						panic(fmt.Sprintf("unexpected checksum type %v", prm.typ))
					case pkg.ChecksumSHA256:
						rngHasher = &commonHasher{h: sha256.New()}
					case pkg.ChecksumTZ:
						rngHasher = &tzHasher{
							hashes: make([][]byte, 0, 10),
						}
					}
				}
			}

			if nextRng.GetLength() == 0 {
				break
			}

			addr.SetObjectID(nextID)

			if prm.typ == pkg.ChecksumSHA256 && nextRng.GetLength() != rng.GetLength() {
				// here we cannot receive SHA256 checksum through GetRangeHash service
				// since SHA256 is not homomorphic
				res, err := s.rangeSvc.GetRange(ctx, new(rangesvc.Prm).
					OnlyLocal(prm.local).
					WithAddress(addr).
					WithRange(nextRng),
				)
				if err != nil {
					return nil, errors.Wrapf(err, "(%T) could not receive payload range for %v checksum", s, prm.typ)
				}

				for stream := res.Stream(); ; {
					chunk, err := stream.Recv()
					if errors.Is(errors.Cause(err), io.EOF) {
						break
					} else if err != nil {
						// Any other error must stop the operation: going on
						// would use an invalid message and could spin forever.
						return nil, errors.Wrapf(err, "(%T) could not receive payload chunk for %v checksum", s, prm.typ)
					}

					rngHasher.add(chunk.PayloadChunk())
				}
			} else {
				// NOTE(review): prm.salt is not forwarded to the per-object
				// request below — confirm whether this is intended.
				hashResp, err := (&distributedHasher{
					cfg: s.cfg,
				}).head(ctx, new(Prm).
					OnlyLocal(prm.local).
					WithAddress(addr).
					WithChecksumType(prm.typ).
					FromRanges(nextRng),
				)
				if err != nil {
					return nil, errors.Wrapf(err, "(%T) could not receive %v checksum", s, prm.typ)
				}

				hs := hashResp.Hashes()
				if ln := len(hs); ln != 1 {
					return nil, errors.Errorf("(%T) unexpected %v hashes amount %d", s, prm.typ, ln)
				}

				rngHasher.add(hs[0])
			}

			traverser.PushSuccessSize(nextRng.GetLength())
		}

		sum, err := rngHasher.sum()
		if err != nil {
			return nil, errors.Wrapf(err, "(%T) could not calculate %v checksum", s, prm.typ)
		}

		resp.hashes = append(resp.hashes, sum)
	}

	return resp, nil
}
// WithKey returns an option that sets the private key of the service
// configuration.
func WithKey(v *ecdsa.PrivateKey) Option {
	return func(conf *cfg) {
		conf.key = v
	}
}
// WithLocalStorage returns an option that sets the local object storage of
// the service configuration.
func WithLocalStorage(v *localstore.Storage) Option {
	return func(conf *cfg) {
		conf.localStore = v
	}
}
// WithContainerSource returns an option that sets the source of containers
// of the service configuration.
func WithContainerSource(v container.Source) Option {
	return func(conf *cfg) {
		conf.cnrSrc = v
	}
}
// WithNetworkMapSource returns an option that sets the source of network
// maps of the service configuration.
func WithNetworkMapSource(v netmap.Source) Option {
	return func(conf *cfg) {
		conf.netMapSrc = v
	}
}
// WithWorkerPool returns an option that replaces the worker pool of the
// service configuration.
func WithWorkerPool(v util.WorkerPool) Option {
	return func(conf *cfg) {
		conf.workerPool = v
	}
}
// WithLocalAddressSource returns an option that sets the source of the
// local node address of the service configuration.
func WithLocalAddressSource(v network.LocalAddressSource) Option {
	return func(conf *cfg) {
		conf.localAddrSrc = v
	}
}
// WithHeadService returns an option that sets the Head service of the
// service configuration.
func WithHeadService(v *headsvc.Service) Option {
	return func(conf *cfg) {
		conf.headSvc = v
	}
}
// WithRangeService returns an option that sets the GetRange service of the
// service configuration.
func WithRangeService(v *rangesvc.Service) Option {
	return func(conf *cfg) {
		conf.rangeSvc = v
	}
}

View file

@ -0,0 +1,71 @@
package rangehashsvc
import (
"context"
"hash"
"sync"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/tzhash/tz"
)
// onceHashWriter stores the first delivered list of hashes in the response
// and ignores all subsequent ones.
type onceHashWriter struct {
// guards the single write
once *sync.Once
// traverser notified about the success on the first write
traverser *placement.Traverser
// response the hashes are written to
resp *Response
// called after the first write
cancel context.CancelFunc
}
// hasher accumulates data pieces and produces a resulting checksum.
type hasher interface {
// add appends the next piece
add([]byte)
// sum returns the checksum of everything added so far
sum() ([]byte, error)
}
// tzHasher collects added hashes and concatenates them with tz.Concat on sum.
type tzHasher struct {
// hashes added so far, in the order of addition
hashes [][]byte
}
// commonHasher feeds added data to an underlying hash.Hash.
type commonHasher struct {
// underlying hash the data is written to
h hash.Hash
}
// singleHasher is a pass-through hasher: it keeps the last added value and
// returns it unchanged as the sum.
type singleHasher struct {
	// last added value
	hash []byte
}

// add replaces the stored value with p (p is not copied).
func (h *singleHasher) add(p []byte) {
	h.hash = p
}

// sum returns the stored value; the error is always nil.
func (h *singleHasher) sum() ([]byte, error) {
	stored := h.hash

	return stored, nil
}
// write stores hs in the response, submits the success to the traverser
// and calls the cancel function. Only the first call has an effect.
func (w *onceHashWriter) write(hs [][]byte) {
	commit := func() {
		w.resp.hashes = hs

		w.traverser.SubmitSuccess()

		w.cancel()
	}

	w.once.Do(commit)
}
// add appends p to the list of collected hashes (p is not copied).
func (h *tzHasher) add(p []byte) {
	h.hashes = append(h.hashes, p)
}
// sum returns the result of tz.Concat over the collected hashes.
func (h *tzHasher) sum() ([]byte, error) {
	res, err := tz.Concat(h.hashes)

	return res, err
}
// add writes p to the underlying hash; the result of Write is ignored.
func (h *commonHasher) add(p []byte) {
	_, _ = h.h.Write(p)
}
// sum returns the current checksum of the underlying hash; the error is
// always nil.
func (h *commonHasher) sum() ([]byte, error) {
	digest := h.h.Sum(nil)

	return digest, nil
}

View file

@ -0,0 +1,55 @@
package rangehashsvc
import (
"context"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash"
"github.com/pkg/errors"
)
// Service implements GetRangeHash operation of Object service v2.
type Service struct {
// embedded configuration filled by the constructor options
*cfg
}
// Option represents Service constructor option:
// a function that modifies the service configuration.
type Option func(*cfg)
// cfg groups the dependencies of the v2 Service.
type cfg struct {
// internal service the v2 requests are delegated to
svc *rangehashsvc.Service
}
// NewService constructs Service instance from provided options.
// The options are applied to an empty configuration in the order they
// are passed.
func NewService(opts ...Option) *Service {
	conf := new(cfg)

	for _, apply := range opts {
		apply(conf)
	}

	svc := new(Service)
	svc.cfg = conf

	return svc
}
// GetRangeHash converts the v2 request to the parameters of the internal
// service, calls the internal service and converts its result to the v2
// response carrying the checksum type of the request.
//
// Returns a wrapped error if the request cannot be converted or the
// internal service fails.
func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) {
	prm, err := toPrm(req)
	if err != nil {
		return nil, errors.Wrapf(err, "(%T) incorrect input parameters", s)
	}

	res, err := s.svc.GetRangeHash(ctx, prm)
	if err != nil {
		return nil, errors.Wrapf(err, "(%T) could not get range hashes", s)
	}

	csType := req.GetBody().GetType()

	return fromResponse(res, csType), nil
}
// WithInternalService returns an option that sets the internal GetRangeHash
// service the v2 requests are delegated to.
func WithInternalService(v *rangehashsvc.Service) Option {
	return func(conf *cfg) {
		conf.svc = v
	}
}

View file

@ -0,0 +1,50 @@
package rangehashsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash"
"github.com/pkg/errors"
)
// toPrm converts the v2 request to the parameters of the internal service.
//
// The checksum type is mapped from the v2 enumeration, the ranges and the
// address are converted from their v2 forms, and the local-only mode is
// switched on when the TTL of the request meta header equals 1.
//
// Returns an error on a checksum type other than SHA256 and Tillich-Zemor.
//
// NOTE(review): the salt of the request body is not read here — confirm
// whether it has to be forwarded.
func toPrm(req *objectV2.GetRangeHashRequest) (*rangehashsvc.Prm, error) {
	body := req.GetBody()

	var csType pkg.ChecksumType

	switch reqType := body.GetType(); reqType {
	case refs.SHA256:
		csType = pkg.ChecksumSHA256
	case refs.TillichZemor:
		csType = pkg.ChecksumTZ
	default:
		return nil, errors.Errorf("unknown checksum type %v", reqType)
	}

	v2Ranges := body.GetRanges()
	ranges := make([]*object.Range, len(v2Ranges))

	for i := range v2Ranges {
		ranges[i] = object.NewRangeFromV2(v2Ranges[i])
	}

	onlyLocal := req.GetMetaHeader().GetTTL() == 1 // FIXME: use constant

	prm := new(rangehashsvc.Prm)
	prm.WithAddress(object.NewAddressFromV2(body.GetAddress()))
	prm.OnlyLocal(onlyLocal)
	prm.WithChecksumType(csType)
	prm.FromRanges(ranges...)

	return prm, nil
}
// fromResponse builds the v2 response whose body carries the passed
// checksum type and the list of hashes of the internal service result.
func fromResponse(r *rangehashsvc.Response, typ refs.ChecksumType) *objectV2.GetRangeHashResponse {
	respBody := new(objectV2.GetRangeHashResponseBody)
	respBody.SetHashList(r.Hashes())
	respBody.SetType(typ)

	v2Resp := new(objectV2.GetRangeHashResponse)
	v2Resp.SetBody(respBody)

	return v2Resp
}