From 3260e9263e4192ee9505b811beda669be7d2184b Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 29 Dec 2020 17:24:49 +0300 Subject: [PATCH] [#291] Remove unused pkg/services/object/rangehash packages Signed-off-by: Leonard Lyubich --- pkg/services/object/rangehash/distributed.go | 144 ---------- pkg/services/object/rangehash/local.go | 61 ---- pkg/services/object/rangehash/prm.go | 51 ---- pkg/services/object/rangehash/remote.go | 80 ------ pkg/services/object/rangehash/res.go | 9 - pkg/services/object/rangehash/service.go | 281 ------------------- pkg/services/object/rangehash/util.go | 75 ----- pkg/services/object/rangehash/v2/service.go | 55 ---- pkg/services/object/rangehash/v2/util.go | 51 ---- 9 files changed, 807 deletions(-) delete mode 100644 pkg/services/object/rangehash/distributed.go delete mode 100644 pkg/services/object/rangehash/local.go delete mode 100644 pkg/services/object/rangehash/prm.go delete mode 100644 pkg/services/object/rangehash/remote.go delete mode 100644 pkg/services/object/rangehash/res.go delete mode 100644 pkg/services/object/rangehash/service.go delete mode 100644 pkg/services/object/rangehash/util.go delete mode 100644 pkg/services/object/rangehash/v2/service.go delete mode 100644 pkg/services/object/rangehash/v2/util.go diff --git a/pkg/services/object/rangehash/distributed.go b/pkg/services/object/rangehash/distributed.go deleted file mode 100644 index 059656146..000000000 --- a/pkg/services/object/rangehash/distributed.go +++ /dev/null @@ -1,144 +0,0 @@ -package rangehashsvc - -import ( - "context" - "sync" - - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" - "github.com/nspcc-dev/neofs-node/pkg/network" - svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" - "github.com/pkg/errors" -) - -type distributedHasher struct { - *cfg - - traverser *placement.Traverser -} - -func (h *distributedHasher) head(ctx context.Context, prm *Prm) (*Response, error) { - if err := h.prepare(ctx, prm); err != nil { - return nil, errors.Wrapf(err, "(%T) could not prepare parameters", h) - } - - return h.finish(ctx, prm) -} - -func (h *distributedHasher) prepare(ctx context.Context, prm *Prm) error { - var err error - - // get latest network map - nm, err := netmap.GetLatestNetworkMap(h.netMapSrc) - if err != nil { - return errors.Wrapf(err, "(%T) could not get latest network map", h) - } - - // get container to read the object - cnr, err := h.cnrSrc.Get(prm.addr.ContainerID()) - if err != nil { - return errors.Wrapf(err, "(%T) could not get container by ID", h) - } - - // allocate placement traverser options - traverseOpts := make([]placement.Option, 0, 4) - - // add common options - traverseOpts = append(traverseOpts, - // set processing container - placement.ForContainer(cnr), - - // set success count (1st incoming hashes) - placement.SuccessAfter(1), - - // set identifier of the processing object - placement.ForObject(prm.addr.ObjectID()), - ) - - // create placement builder from network map - builder := placement.NewNetworkMapBuilder(nm) - - if prm.common.LocalOnly() { - // use local-only placement builder - builder = svcutil.NewLocalPlacement(builder, h.localAddrSrc) - } - - // set placement builder - traverseOpts = append(traverseOpts, placement.UseBuilder(builder)) - - // build placement traverser - if h.traverser, err = placement.NewTraverser(traverseOpts...); err != nil { - return errors.Wrapf(err, "(%T) could not build placement traverser", h) - } - - return nil -} - -func (h *distributedHasher) finish(ctx context.Context, prm *Prm) (*Response, error) { - resp := new(Response) - - w := &onceHashWriter{ - once: new(sync.Once), - traverser: h.traverser, - resp: resp, - } - - ctx, w.cancel = context.WithCancel(ctx) - -loop: - for { - addrs := h.traverser.Next() - if len(addrs) == 0 { - break - } - - wg := new(sync.WaitGroup) - - for i := range addrs { - wg.Add(1) - - addr := addrs[i] - - if err := h.workerPool.Submit(func() { - defer wg.Done() - - var hasher interface { - hashRange(context.Context, *Prm, func([][]byte)) error - } - - if network.IsLocalAddress(h.localAddrSrc, addr) { - hasher = &localHasher{ - storage: h.localStore, - } - } else { - hasher = &remoteHasher{ - keyStorage: h.keyStorage, - node: addr, - clientCache: h.clientCache, - clientOpts: h.clientOpts, - } - } - - if err := hasher.hashRange(ctx, prm, w.write); err != nil { - svcutil.LogServiceError(h.log, "RANGEHASH", addr, err) - - return - } - }); err != nil { - wg.Done() - - svcutil.LogWorkerPoolError(h.log, "RANGEHASH", err) - - break loop - } - } - - wg.Wait() - } - - if !h.traverser.Success() { - return nil, errors.Errorf("(%T) incomplete object GetRangeHash operation", h) - } - - return resp, nil -} diff --git a/pkg/services/object/rangehash/local.go b/pkg/services/object/rangehash/local.go deleted file mode 100644 index 2f3f7f1c8..000000000 --- a/pkg/services/object/rangehash/local.go +++ /dev/null @@ -1,61 +0,0 @@ -package rangehashsvc - -import ( - "context" - "crypto/sha256" - "fmt" - "hash" - - "github.com/nspcc-dev/neofs-api-go/pkg" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/nspcc-dev/neofs-node/pkg/util" - "github.com/nspcc-dev/tzhash/tz" - "github.com/pkg/errors" -) - -type localHasher struct { - storage *engine.StorageEngine -} - -func (h *localHasher) hashRange(ctx context.Context, prm *Prm, handler func([][]byte)) error { - // FIXME: get partial range instead of full object. - // Current solution is simple, but more loaded - // We can calculate left and right border between all ranges - // and request bordered range (look Service.GetRangeHash). - obj, err := engine.Get(h.storage, prm.addr) - if err != nil { - return errors.Wrapf(err, "(%T) could not get object from local storage", h) - } - - payload := obj.Payload() - hashes := make([][]byte, 0, len(prm.rngs)) - - var hasher hash.Hash - switch prm.typ { - default: - panic(fmt.Sprintf("unexpected checksum type %v", prm.typ)) - case pkg.ChecksumSHA256: - hasher = sha256.New() - case pkg.ChecksumTZ: - hasher = tz.New() - } - - for i := range prm.rngs { - left := prm.rngs[i].GetOffset() - right := left + prm.rngs[i].GetLength() - - if ln := uint64(len(payload)); ln < right { - return errors.Errorf("(%T) object range is out-of-boundaries (size %d, range [%d:%d]", h, ln, left, right) - } - - hasher.Reset() - - hasher.Write(util.SaltXOR(payload[left:right], prm.salt)) - - hashes = append(hashes, hasher.Sum(nil)) - } - - handler(hashes) - - return nil -} diff --git a/pkg/services/object/rangehash/prm.go b/pkg/services/object/rangehash/prm.go deleted file mode 100644 index ff7078e2c..000000000 --- a/pkg/services/object/rangehash/prm.go +++ /dev/null @@ -1,51 +0,0 @@ -package rangehashsvc - -import ( - "github.com/nspcc-dev/neofs-api-go/pkg" - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" -) - -type Prm struct { - common *util.CommonPrm - - addr *object.Address - - typ pkg.ChecksumType - - rngs []*object.Range - - salt []byte -} - -func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm { - if p != nil { - p.common = v - } - - return p -} - -func (p *Prm) WithAddress(v *object.Address) *Prm { - if p != nil { - p.addr = v - } - - return p -} - -func (p *Prm) WithChecksumType(typ pkg.ChecksumType) *Prm { - if p != nil { - p.typ = typ - } - - return p -} - -func (p *Prm) FromRanges(v ...*object.Range) *Prm { - if p != nil { - p.rngs = v - } - - return p -} diff --git a/pkg/services/object/rangehash/remote.go b/pkg/services/object/rangehash/remote.go deleted file mode 100644 index 78f0cc743..000000000 --- a/pkg/services/object/rangehash/remote.go +++ /dev/null @@ -1,80 +0,0 @@ -package rangehashsvc - -import ( - "context" - "fmt" - - "github.com/nspcc-dev/neofs-api-go/pkg" - "github.com/nspcc-dev/neofs-api-go/pkg/client" - "github.com/nspcc-dev/neofs-node/pkg/network" - "github.com/nspcc-dev/neofs-node/pkg/network/cache" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/pkg/errors" -) - -type remoteHasher struct { - keyStorage *util.KeyStorage - - node *network.Address - - clientCache *cache.ClientCache - - clientOpts []client.Option -} - -func (h *remoteHasher) hashRange(ctx context.Context, prm *Prm, handler func([][]byte)) error { - key, err := h.keyStorage.GetKey(prm.common.SessionToken()) - if err != nil { - return errors.Wrapf(err, "(%T) could not receive private key", h) - } - - addr, err := h.node.IPAddrString() - if err != nil { - return err - } - - c, err := h.clientCache.Get(key, addr, h.clientOpts...) - if err != nil { - return errors.Wrapf(err, "(%T) could not create SDK client %s", h, addr) - } - - hashes := make([][]byte, 0, len(prm.rngs)) - - p := new(client.RangeChecksumParams). - WithAddress(prm.addr). - WithSalt(prm.salt). - WithRangeList(prm.rngs...) - - opts := []client.CallOption{ - client.WithTTL(1), // FIXME: use constant - client.WithSession(prm.common.SessionToken()), - client.WithBearer(prm.common.BearerToken()), - } - - switch prm.typ { - default: - panic(fmt.Sprintf("unexpected checksum type %v", prm.typ)) - case pkg.ChecksumSHA256: - v, err := c.ObjectPayloadRangeSHA256(ctx, p, opts...) - if err != nil { - return errors.Wrapf(err, "(%T) could not get SHA256 checksum from %s", h, addr) - } - - for i := range v { - hashes = append(hashes, v[i][:]) - } - case pkg.ChecksumTZ: - v, err := c.ObjectPayloadRangeTZ(ctx, p, opts...) - if err != nil { - return errors.Wrapf(err, "(%T) could not get Tillich-Zemor checksum from %s", h, addr) - } - - for i := range v { - hashes = append(hashes, v[i][:]) - } - } - - handler(hashes) - - return nil -} diff --git a/pkg/services/object/rangehash/res.go b/pkg/services/object/rangehash/res.go deleted file mode 100644 index 634f560a4..000000000 --- a/pkg/services/object/rangehash/res.go +++ /dev/null @@ -1,9 +0,0 @@ -package rangehashsvc - -type Response struct { - hashes [][]byte -} - -func (r *Response) Hashes() [][]byte { - return r.hashes -} diff --git a/pkg/services/object/rangehash/service.go b/pkg/services/object/rangehash/service.go deleted file mode 100644 index 69defe936..000000000 --- a/pkg/services/object/rangehash/service.go +++ /dev/null @@ -1,281 +0,0 @@ -package rangehashsvc - -import ( - "context" - "crypto/sha256" - "fmt" - - "github.com/nspcc-dev/neofs-api-go/pkg" - "github.com/nspcc-dev/neofs-api-go/pkg/client" - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/core/container" - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/nspcc-dev/neofs-node/pkg/network" - "github.com/nspcc-dev/neofs-node/pkg/network/cache" - getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" - headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" - objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-node/pkg/util" - "github.com/nspcc-dev/neofs-node/pkg/util/logger" - "github.com/pkg/errors" - "go.uber.org/zap" -) - -type Service struct { - *cfg -} - -type Option func(*cfg) - -type cfg struct { - keyStorage *objutil.KeyStorage - - localStore *engine.StorageEngine - - cnrSrc container.Source - - netMapSrc netmap.Source - - workerPool util.WorkerPool - - localAddrSrc network.LocalAddressSource - - headSvc *headsvc.Service - - rangeSvc *getsvc.Service - - clientCache *cache.ClientCache - - log *logger.Logger - - clientOpts []client.Option -} - -func defaultCfg() *cfg { - return &cfg{ - workerPool: new(util.SyncWorkerPool), - log: zap.L(), - } -} - -func NewService(opts ...Option) *Service { - c := defaultCfg() - - for i := range opts { - opts[i](c) - } - - return &Service{ - cfg: c, - } -} - -func (s *Service) GetRangeHash(ctx context.Context, prm *Prm) (*Response, error) { - headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm). - WithAddress(prm.addr). - WithCommonPrm(prm.common), - ) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not receive Head result", s) - } - - origin := headResult.Header() - - originSize := origin.PayloadSize() - - var minLeft, maxRight uint64 - for i := range prm.rngs { - left := prm.rngs[i].GetOffset() - right := left + prm.rngs[i].GetLength() - - if originSize < right { - return nil, errors.Errorf("(%T) requested payload range is out-of-bounds", s) - } - - if left < minLeft { - minLeft = left - } - - if right > maxRight { - maxRight = right - } - } - - borderRng := new(object.Range) - borderRng.SetOffset(minLeft) - borderRng.SetLength(maxRight - minLeft) - - return s.getHashes(ctx, prm, objutil.NewRangeTraverser(originSize, origin, borderRng)) -} - -func (s *Service) getHashes(ctx context.Context, prm *Prm, traverser *objutil.RangeTraverser) (*Response, error) { - addr := object.NewAddress() - addr.SetContainerID(prm.addr.ContainerID()) - - resp := &Response{ - hashes: make([][]byte, 0, len(prm.rngs)), - } - - for _, rng := range prm.rngs { - for { - nextID, nextRng := traverser.Next() - if nextRng != nil { - break - } - - addr.SetObjectID(nextID) - - head, err := s.headSvc.Head(ctx, new(headsvc.Prm). - WithAddress(addr). - WithCommonPrm(prm.common), - ) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not receive object header", s) - } - - traverser.PushHeader(head.Header()) - } - - traverser.SetSeekRange(rng) - - var hasher hasher - - for { - nextID, nextRng := traverser.Next() - - if hasher == nil { - if nextRng.GetLength() == rng.GetLength() { - hasher = new(singleHasher) - } else { - switch prm.typ { - default: - panic(fmt.Sprintf("unexpected checksum type %v", prm.typ)) - case pkg.ChecksumSHA256: - hasher = &commonHasher{h: sha256.New()} - case pkg.ChecksumTZ: - hasher = &tzHasher{ - hashes: make([][]byte, 0, 10), - } - } - } - } - - if nextRng.GetLength() == 0 { - break - } - - addr.SetObjectID(nextID) - - if prm.typ == pkg.ChecksumSHA256 && nextRng.GetLength() != rng.GetLength() { - // here we cannot receive SHA256 checksum through GetRangeHash service - // since SHA256 is not homomorphic - rngPrm := getsvc.RangePrm{} - rngPrm.SetRange(nextRng) - rngPrm.WithAddress(addr) - rngPrm.SetChunkWriter(hasher) - - err := s.rangeSvc.GetRange(ctx, rngPrm) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not receive payload range for %v checksum", s, prm.typ) - } - } else { - resp, err := (&distributedHasher{ - cfg: s.cfg, - }).head(ctx, new(Prm). - WithAddress(addr). - WithChecksumType(prm.typ). - FromRanges(nextRng). - WithCommonPrm(prm.common), - ) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not receive %v checksum", s, prm.typ) - } - - hs := resp.Hashes() - if ln := len(hs); ln != 1 { - return nil, errors.Errorf("(%T) unexpected %v hashes amount %d", s, prm.typ, ln) - } - - _ = hasher.WriteChunk(hs[0]) - } - - traverser.PushSuccessSize(nextRng.GetLength()) - } - - sum, err := hasher.sum() - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not calculate %v checksum", s, prm.typ) - } - - resp.hashes = append(resp.hashes, sum) - } - - return resp, nil -} - -func WithKeyStorage(v *objutil.KeyStorage) Option { - return func(c *cfg) { - c.keyStorage = v - } -} - -func WithLocalStorage(v *engine.StorageEngine) Option { - return func(c *cfg) { - c.localStore = v - } -} - -func WithContainerSource(v container.Source) Option { - return func(c *cfg) { - c.cnrSrc = v - } -} - -func WithNetworkMapSource(v netmap.Source) Option { - return func(c *cfg) { - c.netMapSrc = v - } -} - -func WithWorkerPool(v util.WorkerPool) Option { - return func(c *cfg) { - c.workerPool = v - } -} - -func WithLocalAddressSource(v network.LocalAddressSource) Option { - return func(c *cfg) { - c.localAddrSrc = v - } -} - -func WithHeadService(v *headsvc.Service) Option { - return func(c *cfg) { - c.headSvc = v - } -} - -func WithRangeService(v *getsvc.Service) Option { - return func(c *cfg) { - c.rangeSvc = v - } -} - -func WithClientCache(v *cache.ClientCache) Option { - return func(c *cfg) { - c.clientCache = v - } -} - -func WithLogger(l *logger.Logger) Option { - return func(c *cfg) { - c.log = l - } -} - -func WithClientOptions(opts ...client.Option) Option { - return func(c *cfg) { - c.clientOpts = opts - } -} diff --git a/pkg/services/object/rangehash/util.go b/pkg/services/object/rangehash/util.go deleted file mode 100644 index d41b5095d..000000000 --- a/pkg/services/object/rangehash/util.go +++ /dev/null @@ -1,75 +0,0 @@ -package rangehashsvc - -import ( - "context" - "hash" - "sync" - - "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" - "github.com/nspcc-dev/tzhash/tz" -) - -type onceHashWriter struct { - once *sync.Once - - traverser *placement.Traverser - - resp *Response - - cancel context.CancelFunc -} - -type hasher interface { - WriteChunk([]byte) error - sum() ([]byte, error) -} - -type tzHasher struct { - hashes [][]byte -} - -type commonHasher struct { - h hash.Hash -} - -type singleHasher struct { - hash []byte -} - -func (h *singleHasher) WriteChunk(p []byte) error { - h.hash = p - - return nil -} - -func (h *singleHasher) sum() ([]byte, error) { - return h.hash, nil -} - -func (w *onceHashWriter) write(hs [][]byte) { - w.once.Do(func() { - w.resp.hashes = hs - w.traverser.SubmitSuccess() - w.cancel() - }) -} - -func (h *tzHasher) WriteChunk(p []byte) error { - h.hashes = append(h.hashes, p) - - return nil -} - -func (h *tzHasher) sum() ([]byte, error) { - return tz.Concat(h.hashes) -} - -func (h *commonHasher) WriteChunk(p []byte) error { - h.h.Write(p) - - return nil -} - -func (h *commonHasher) sum() ([]byte, error) { - return h.h.Sum(nil), nil -} diff --git a/pkg/services/object/rangehash/v2/service.go b/pkg/services/object/rangehash/v2/service.go deleted file mode 100644 index 2f3f18f8d..000000000 --- a/pkg/services/object/rangehash/v2/service.go +++ /dev/null @@ -1,55 +0,0 @@ -package rangehashsvc - -import ( - "context" - - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash" - "github.com/pkg/errors" -) - -// Service implements GetRangeHash operation of Object service v2. -type Service struct { - *cfg -} - -// Option represents Service constructor option. -type Option func(*cfg) - -type cfg struct { - svc *rangehashsvc.Service -} - -// NewService constructs Service instance from provided options. -func NewService(opts ...Option) *Service { - c := new(cfg) - - for i := range opts { - opts[i](c) - } - - return &Service{ - cfg: c, - } -} - -// Head calls internal service and returns v2 object header. -func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) { - prm, err := toPrm(req) - if err != nil { - return nil, errors.Wrapf(err, "(%T) incorrect input parameters", s) - } - - r, err := s.svc.GetRangeHash(ctx, prm) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not get range hashes", s) - } - - return fromResponse(r, req.GetBody().GetType()), nil -} - -func WithInternalService(v *rangehashsvc.Service) Option { - return func(c *cfg) { - c.svc = v - } -} diff --git a/pkg/services/object/rangehash/v2/util.go b/pkg/services/object/rangehash/v2/util.go deleted file mode 100644 index d1faa56fa..000000000 --- a/pkg/services/object/rangehash/v2/util.go +++ /dev/null @@ -1,51 +0,0 @@ -package rangehashsvc - -import ( - "github.com/nspcc-dev/neofs-api-go/pkg" - "github.com/nspcc-dev/neofs-api-go/pkg/object" - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - "github.com/nspcc-dev/neofs-api-go/v2/refs" - rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/pkg/errors" -) - -func toPrm(req *objectV2.GetRangeHashRequest) (*rangehashsvc.Prm, error) { - body := req.GetBody() - - var typ pkg.ChecksumType - switch t := body.GetType(); t { - default: - return nil, errors.Errorf("unknown checksum type %v", t) - case refs.SHA256: - typ = pkg.ChecksumSHA256 - case refs.TillichZemor: - typ = pkg.ChecksumTZ - } - - rngsV2 := body.GetRanges() - rngs := make([]*object.Range, 0, len(rngsV2)) - - for i := range rngsV2 { - rngs = append(rngs, object.NewRangeFromV2(rngsV2[i])) - } - - return new(rangehashsvc.Prm). - WithAddress( - object.NewAddressFromV2(body.GetAddress()), - ). - WithChecksumType(typ). - FromRanges(rngs...). - WithCommonPrm(util.CommonPrmFromV2(req)), nil -} - -func fromResponse(r *rangehashsvc.Response, typ refs.ChecksumType) *objectV2.GetRangeHashResponse { - body := new(objectV2.GetRangeHashResponseBody) - body.SetType(typ) - body.SetHashList(r.Hashes()) - - resp := new(objectV2.GetRangeHashResponse) - resp.SetBody(body) - - return resp -}