From 0a51263e7289c170881acee27595fff9e4eaea6b Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 28 Sep 2020 16:22:13 +0300 Subject: [PATCH] [#53] services/object: Implement GetRangeHash service Signed-off-by: Leonard Lyubich --- pkg/services/object/rangehash/distributed.go | 139 ++++++++++ pkg/services/object/rangehash/local.go | 57 ++++ pkg/services/object/rangehash/prm.go | 50 ++++ pkg/services/object/rangehash/remote.go | 67 +++++ pkg/services/object/rangehash/res.go | 9 + pkg/services/object/rangehash/service.go | 267 +++++++++++++++++++ pkg/services/object/rangehash/util.go | 71 +++++ pkg/services/object/rangehash/v2/service.go | 55 ++++ pkg/services/object/rangehash/v2/util.go | 50 ++++ 9 files changed, 765 insertions(+) create mode 100644 pkg/services/object/rangehash/distributed.go create mode 100644 pkg/services/object/rangehash/local.go create mode 100644 pkg/services/object/rangehash/prm.go create mode 100644 pkg/services/object/rangehash/remote.go create mode 100644 pkg/services/object/rangehash/res.go create mode 100644 pkg/services/object/rangehash/service.go create mode 100644 pkg/services/object/rangehash/util.go create mode 100644 pkg/services/object/rangehash/v2/service.go create mode 100644 pkg/services/object/rangehash/v2/util.go diff --git a/pkg/services/object/rangehash/distributed.go b/pkg/services/object/rangehash/distributed.go new file mode 100644 index 000000000..074294189 --- /dev/null +++ b/pkg/services/object/rangehash/distributed.go @@ -0,0 +1,139 @@ +package rangehashsvc + +import ( + "context" + "sync" + + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/pkg/errors" +) + +type distributedHasher struct { + *cfg + + traverser *placement.Traverser +} + +func (h *distributedHasher) head(ctx context.Context, prm *Prm) (*Response, error) { + if err := h.prepare(ctx, prm); err != nil { + return nil, errors.Wrapf(err, "(%T) could not prepare parameters", h) + } + + return h.finish(ctx, prm) +} + +func (h *distributedHasher) prepare(ctx context.Context, prm *Prm) error { + var err error + + // get latest network map + nm, err := netmap.GetLatestNetworkMap(h.netMapSrc) + if err != nil { + return errors.Wrapf(err, "(%T) could not get latest network map", h) + } + + // get container to read the object + cnr, err := h.cnrSrc.Get(prm.addr.GetContainerID()) + if err != nil { + return errors.Wrapf(err, "(%T) could not get container by ID", h) + } + + // allocate placement traverser options + traverseOpts := make([]placement.Option, 0, 4) + + // add common options + traverseOpts = append(traverseOpts, + // set processing container + placement.ForContainer(cnr), + + // set success count (1st incoming hashes) + placement.SuccessAfter(1), + + // set identifier of the processing object + placement.ForObject(prm.addr.GetObjectID()), + ) + + // create placement builder from network map + builder := placement.NewNetworkMapBuilder(nm) + + if prm.local { + // use local-only placement builder + builder = util.NewLocalPlacement(builder, h.localAddrSrc) + } + + // set placement builder + traverseOpts = append(traverseOpts, placement.UseBuilder(builder)) + + // build placement traverser + if h.traverser, err = placement.NewTraverser(traverseOpts...); err != nil { + return errors.Wrapf(err, "(%T) could not build placement traverser", h) + } + + return nil +} + +func (h *distributedHasher) finish(ctx context.Context, prm *Prm) (*Response, error) { + resp := new(Response) + + w := &onceHashWriter{ + once: new(sync.Once), + traverser: h.traverser, + resp: resp, + } + + ctx, w.cancel = context.WithCancel(ctx) + +loop: + for { + addrs := h.traverser.Next() + if len(addrs) == 0 { + break + } + + wg := new(sync.WaitGroup) + + for i := range addrs { + wg.Add(1) + + addr := addrs[i] + + if err := h.workerPool.Submit(func() { + defer wg.Done() + + var hasher interface { + hashRange(context.Context, *Prm, func([][]byte)) error + } + + if network.IsLocalAddress(h.localAddrSrc, addr) { + hasher = &localHasher{ + storage: h.localStore, + } + } else { + hasher = &remoteHasher{ + key: h.key, + node: addr, + } + } + + if err := hasher.hashRange(ctx, prm, w.write); err != nil { + // TODO: log error + return + } + }); err != nil { + wg.Done() + // TODO: log error + break loop + } + } + + wg.Wait() + } + + if !h.traverser.Success() { + return nil, errors.Errorf("(%T) incomplete object GetRangeHash operation", h) + } + + return resp, nil +} diff --git a/pkg/services/object/rangehash/local.go b/pkg/services/object/rangehash/local.go new file mode 100644 index 000000000..13b85c502 --- /dev/null +++ b/pkg/services/object/rangehash/local.go @@ -0,0 +1,57 @@ +package rangehashsvc + +import ( + "context" + "crypto/sha256" + "fmt" + "hash" + + "github.com/nspcc-dev/neofs-api-go/pkg" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + "github.com/nspcc-dev/neofs-node/pkg/util" + "github.com/nspcc-dev/tzhash/tz" + "github.com/pkg/errors" +) + +type localHasher struct { + storage *localstore.Storage +} + +func (h *localHasher) hashRange(ctx context.Context, prm *Prm, handler func([][]byte)) error { + obj, err := h.storage.Get(prm.addr) + if err != nil { + return errors.Wrapf(err, "(%T) could not get object from local storage", h) + } + + payload := obj.GetPayload() + hashes := make([][]byte, 0, len(prm.rngs)) + + var hasher hash.Hash + switch prm.typ { + default: + panic(fmt.Sprintf("unexpected checksum type %v", prm.typ)) + case pkg.ChecksumSHA256: + hasher = sha256.New() + case pkg.ChecksumTZ: + hasher = tz.New() + } + + for i := range prm.rngs { + left := prm.rngs[i].GetOffset() + right := left + prm.rngs[i].GetLength() + + if ln := uint64(len(payload)); ln < right { + return errors.Errorf("(%T) object range is out-of-boundaries (size %d, range [%d:%d]", h, ln, left, right) + } + + hasher.Reset() + + hasher.Write(util.SaltXOR(payload[left:right], prm.salt)) + + hashes = append(hashes, hasher.Sum(nil)) + } + + handler(hashes) + + return nil +} diff --git a/pkg/services/object/rangehash/prm.go b/pkg/services/object/rangehash/prm.go new file mode 100644 index 000000000..9da90344b --- /dev/null +++ b/pkg/services/object/rangehash/prm.go @@ -0,0 +1,50 @@ +package rangehashsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg" + "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +type Prm struct { + local bool + + addr *object.Address + + typ pkg.ChecksumType + + rngs []*object.Range + + salt []byte +} + +func (p *Prm) OnlyLocal(v bool) *Prm { + if p != nil { + p.local = v + } + + return p +} + +func (p *Prm) WithAddress(v *object.Address) *Prm { + if p != nil { + p.addr = v + } + + return p +} + +func (p *Prm) WithChecksumType(typ pkg.ChecksumType) *Prm { + if p != nil { + p.typ = typ + } + + return p +} + +func (p *Prm) FromRanges(v ...*object.Range) *Prm { + if p != nil { + p.rngs = v + } + + return p +} diff --git a/pkg/services/object/rangehash/remote.go b/pkg/services/object/rangehash/remote.go new file mode 100644 index 000000000..bdbaa33ff --- /dev/null +++ b/pkg/services/object/rangehash/remote.go @@ -0,0 +1,67 @@ +package rangehashsvc + +import ( + "context" + "crypto/ecdsa" + "fmt" + + "github.com/nspcc-dev/neofs-api-go/pkg" + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/pkg/errors" +) + +type remoteHasher struct { + key *ecdsa.PrivateKey + + node *network.Address +} + +func (h *remoteHasher) hashRange(ctx context.Context, prm *Prm, handler func([][]byte)) error { + addr := h.node.NetAddr() + + c, err := client.New(h.key, + client.WithAddress(addr), + ) + if err != nil { + return errors.Wrapf(err, "(%T) could not create SDK client %s", h, addr) + } + + hashes := make([][]byte, 0, len(prm.rngs)) + + p := new(client.RangeChecksumParams). + WithAddress(prm.addr). + WithSalt(prm.salt). + WithRangeList(prm.rngs...) + + opts := []client.CallOption{ + client.WithTTL(1), // FIXME: use constant + } + + switch prm.typ { + default: + panic(fmt.Sprintf("unexpected checksum type %v", prm.typ)) + case pkg.ChecksumSHA256: + v, err := c.ObjectPayloadRangeSHA256(ctx, p, opts...) + if err != nil { + return errors.Wrapf(err, "(%T) could not get SHA256 checksum from %s", h, addr) + } + + for i := range v { + hashes = append(hashes, v[i][:]) + } + case pkg.ChecksumTZ: + v, err := c.ObjectPayloadRangeTZ(ctx, p, opts...) + if err != nil { + return errors.Wrapf(err, "(%T) could not get Tillich-Zemor checksum from %s", h, addr) + } + + for i := range v { + hashes = append(hashes, v[i][:]) + } + } + + handler(hashes) + + return nil +} diff --git a/pkg/services/object/rangehash/res.go b/pkg/services/object/rangehash/res.go new file mode 100644 index 000000000..634f560a4 --- /dev/null +++ b/pkg/services/object/rangehash/res.go @@ -0,0 +1,9 @@ +package rangehashsvc + +type Response struct { + hashes [][]byte +} + +func (r *Response) Hashes() [][]byte { + return r.hashes +} diff --git a/pkg/services/object/rangehash/service.go b/pkg/services/object/rangehash/service.go new file mode 100644 index 000000000..4d2e9f76e --- /dev/null +++ b/pkg/services/object/rangehash/service.go @@ -0,0 +1,267 @@ +package rangehashsvc + +import ( + "context" + "crypto/ecdsa" + "crypto/sha256" + "fmt" + "io" + + "github.com/nspcc-dev/neofs-api-go/pkg" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + "github.com/nspcc-dev/neofs-node/pkg/network" + headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" + rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range" + objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/util" + "github.com/pkg/errors" +) + +type Service struct { + *cfg +} + +type Option func(*cfg) + +type cfg struct { + key *ecdsa.PrivateKey + + localStore *localstore.Storage + + cnrSrc container.Source + + netMapSrc netmap.Source + + workerPool util.WorkerPool + + localAddrSrc network.LocalAddressSource + + headSvc *headsvc.Service + + rangeSvc *rangesvc.Service +} + +func defaultCfg() *cfg { + return &cfg{ + workerPool: new(util.SyncWorkerPool), + } +} + +func NewService(opts ...Option) *Service { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + return &Service{ + cfg: c, + } +} + +func (s *Service) GetRangeHash(ctx context.Context, prm *Prm) (*Response, error) { + headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm). + WithAddress(prm.addr). + OnlyLocal(prm.local), + ) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not receive Head result", s) + } + + origin := headResult.Header() + + originSize := origin.GetPayloadSize() + + var minLeft, maxRight uint64 + for i := range prm.rngs { + left := prm.rngs[i].GetOffset() + right := left + prm.rngs[i].GetLength() + + if originSize < right { + return nil, errors.Errorf("(%T) requested payload range is out-of-bounds", s) + } + + if left < minLeft { + minLeft = left + } + + if right > maxRight { + maxRight = right + } + } + + right := headResult.RightChild() + if right == nil { + right = origin + } + + borderRng := new(object.Range) + borderRng.SetOffset(minLeft) + borderRng.SetLength(maxRight - minLeft) + + return s.getHashes(ctx, prm, objutil.NewRangeTraverser(originSize, right, borderRng)) +} + +func (s *Service) getHashes(ctx context.Context, prm *Prm, traverser *objutil.RangeTraverser) (*Response, error) { + addr := object.NewAddress() + addr.SetContainerID(prm.addr.GetContainerID()) + + resp := &Response{ + hashes: make([][]byte, 0, len(prm.rngs)), + } + + for _, rng := range prm.rngs { + for { + nextID, nextRng := traverser.Next() + if nextRng != nil { + break + } + + addr.SetObjectID(nextID) + + head, err := s.headSvc.Head(ctx, new(headsvc.Prm). + WithAddress(addr). + OnlyLocal(prm.local), + ) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not receive object header", s) + } + + traverser.PushHeader(head.Header()) + } + + traverser.SetSeekRange(rng) + + var hasher hasher + + for { + nextID, nextRng := traverser.Next() + + if hasher == nil { + if nextRng.GetLength() == rng.GetLength() { + hasher = new(singleHasher) + } else { + switch prm.typ { + default: + panic(fmt.Sprintf("unexpected checksum type %v", prm.typ)) + case pkg.ChecksumSHA256: + hasher = &commonHasher{h: sha256.New()} + case pkg.ChecksumTZ: + hasher = &tzHasher{ + hashes: make([][]byte, 0, 10), + } + } + } + } + + if nextRng.GetLength() == 0 { + break + } + + addr.SetObjectID(nextID) + + if prm.typ == pkg.ChecksumSHA256 && nextRng.GetLength() != rng.GetLength() { + // here we cannot receive SHA256 checksum through GetRangeHash service + // since SHA256 is not homomorphic + res, err := s.rangeSvc.GetRange(ctx, new(rangesvc.Prm). + OnlyLocal(prm.local). + WithAddress(addr). + WithRange(nextRng), + ) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not receive payload range for %v checksum", s, prm.typ) + } + + for stream := res.Stream(); ; { + resp, err := stream.Recv() + if errors.Is(errors.Cause(err), io.EOF) { + break + } + + hasher.add(resp.PayloadChunk()) + } + } else { + resp, err := (&distributedHasher{ + cfg: s.cfg, + }).head(ctx, new(Prm). + OnlyLocal(prm.local). + WithAddress(addr). + WithChecksumType(prm.typ). + FromRanges(nextRng), + ) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not receive %v checksum", s, prm.typ) + } + + hs := resp.Hashes() + if ln := len(hs); ln != 1 { + return nil, errors.Errorf("(%T) unexpected %v hashes amount %d", s, prm.typ, ln) + } + + hasher.add(hs[0]) + } + + traverser.PushSuccessSize(nextRng.GetLength()) + } + + sum, err := hasher.sum() + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not calculate %v checksum", s, prm.typ) + } + + resp.hashes = append(resp.hashes, sum) + } + + return resp, nil +} + +func WithKey(v *ecdsa.PrivateKey) Option { + return func(c *cfg) { + c.key = v + } +} + +func WithLocalStorage(v *localstore.Storage) Option { + return func(c *cfg) { + c.localStore = v + } +} + +func WithContainerSource(v container.Source) Option { + return func(c *cfg) { + c.cnrSrc = v + } +} + +func WithNetworkMapSource(v netmap.Source) Option { + return func(c *cfg) { + c.netMapSrc = v + } +} + +func WithWorkerPool(v util.WorkerPool) Option { + return func(c *cfg) { + c.workerPool = v + } +} + +func WithLocalAddressSource(v network.LocalAddressSource) Option { + return func(c *cfg) { + c.localAddrSrc = v + } +} + +func WithHeadService(v *headsvc.Service) Option { + return func(c *cfg) { + c.headSvc = v + } +} + +func WithRangeService(v *rangesvc.Service) Option { + return func(c *cfg) { + c.rangeSvc = v + } +} diff --git a/pkg/services/object/rangehash/util.go b/pkg/services/object/rangehash/util.go new file mode 100644 index 000000000..7f62096d8 --- /dev/null +++ b/pkg/services/object/rangehash/util.go @@ -0,0 +1,71 @@ +package rangehashsvc + +import ( + "context" + "hash" + "sync" + + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/nspcc-dev/tzhash/tz" +) + +type onceHashWriter struct { + once *sync.Once + + traverser *placement.Traverser + + resp *Response + + cancel context.CancelFunc +} + +type hasher interface { + add([]byte) + sum() ([]byte, error) +} + +type tzHasher struct { + hashes [][]byte +} + +type commonHasher struct { + h hash.Hash +} + +type singleHasher struct { + hash []byte +} + +func (h *singleHasher) add(p []byte) { + h.hash = p +} + +func (h *singleHasher) sum() ([]byte, error) { + return h.hash, nil +} + +func (w *onceHashWriter) write(hs [][]byte) { + w.once.Do(func() { + w.resp.hashes = hs + w.traverser.SubmitSuccess() + w.cancel() + }) +} + +func (h *tzHasher) add(p []byte) { + h.hashes = append(h.hashes, p) + + return +} + +func (h *tzHasher) sum() ([]byte, error) { + return tz.Concat(h.hashes) +} + +func (h *commonHasher) add(p []byte) { + h.h.Write(p) +} + +func (h *commonHasher) sum() ([]byte, error) { + return h.h.Sum(nil), nil +} diff --git a/pkg/services/object/rangehash/v2/service.go b/pkg/services/object/rangehash/v2/service.go new file mode 100644 index 000000000..2f3f18f8d --- /dev/null +++ b/pkg/services/object/rangehash/v2/service.go @@ -0,0 +1,55 @@ +package rangehashsvc + +import ( + "context" + + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash" + "github.com/pkg/errors" +) + +// Service implements GetRangeHash operation of Object service v2. +type Service struct { + *cfg +} + +// Option represents Service constructor option. +type Option func(*cfg) + +type cfg struct { + svc *rangehashsvc.Service +} + +// NewService constructs Service instance from provided options. +func NewService(opts ...Option) *Service { + c := new(cfg) + + for i := range opts { + opts[i](c) + } + + return &Service{ + cfg: c, + } +} + +// Head calls internal service and returns v2 object header. +func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) { + prm, err := toPrm(req) + if err != nil { + return nil, errors.Wrapf(err, "(%T) incorrect input parameters", s) + } + + r, err := s.svc.GetRangeHash(ctx, prm) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not get range hashes", s) + } + + return fromResponse(r, req.GetBody().GetType()), nil +} + +func WithInternalService(v *rangehashsvc.Service) Option { + return func(c *cfg) { + c.svc = v + } +} diff --git a/pkg/services/object/rangehash/v2/util.go b/pkg/services/object/rangehash/v2/util.go new file mode 100644 index 000000000..163f6598b --- /dev/null +++ b/pkg/services/object/rangehash/v2/util.go @@ -0,0 +1,50 @@ +package rangehashsvc + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/refs" + rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash" + "github.com/pkg/errors" +) + +func toPrm(req *objectV2.GetRangeHashRequest) (*rangehashsvc.Prm, error) { + body := req.GetBody() + + var typ pkg.ChecksumType + switch t := body.GetType(); t { + default: + return nil, errors.Errorf("unknown checksum type %v", t) + case refs.SHA256: + typ = pkg.ChecksumSHA256 + case refs.TillichZemor: + typ = pkg.ChecksumTZ + } + + rngsV2 := body.GetRanges() + rngs := make([]*object.Range, 0, len(rngsV2)) + + for i := range rngsV2 { + rngs = append(rngs, object.NewRangeFromV2(rngsV2[i])) + } + + return new(rangehashsvc.Prm). + WithAddress( + object.NewAddressFromV2(body.GetAddress()), + ). + OnlyLocal(req.GetMetaHeader().GetTTL() == 1). // FIXME: use constant + WithChecksumType(typ). + FromRanges(rngs...), nil +} + +func fromResponse(r *rangehashsvc.Response, typ refs.ChecksumType) *objectV2.GetRangeHashResponse { + body := new(objectV2.GetRangeHashResponseBody) + body.SetType(typ) + body.SetHashList(r.Hashes()) + + resp := new(objectV2.GetRangeHashResponse) + resp.SetBody(body) + + return resp +}