forked from TrueCloudLab/frostfs-node
[#291] Remove unused pkg/services/object/rangehash packages
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
d299d94049
commit
3260e9263e
9 changed files with 0 additions and 807 deletions
|
@ -1,144 +0,0 @@
|
||||||
package rangehashsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type distributedHasher struct {
|
|
||||||
*cfg
|
|
||||||
|
|
||||||
traverser *placement.Traverser
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *distributedHasher) head(ctx context.Context, prm *Prm) (*Response, error) {
|
|
||||||
if err := h.prepare(ctx, prm); err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "(%T) could not prepare parameters", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
return h.finish(ctx, prm)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *distributedHasher) prepare(ctx context.Context, prm *Prm) error {
|
|
||||||
var err error
|
|
||||||
|
|
||||||
// get latest network map
|
|
||||||
nm, err := netmap.GetLatestNetworkMap(h.netMapSrc)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not get latest network map", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
// get container to read the object
|
|
||||||
cnr, err := h.cnrSrc.Get(prm.addr.ContainerID())
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not get container by ID", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
// allocate placement traverser options
|
|
||||||
traverseOpts := make([]placement.Option, 0, 4)
|
|
||||||
|
|
||||||
// add common options
|
|
||||||
traverseOpts = append(traverseOpts,
|
|
||||||
// set processing container
|
|
||||||
placement.ForContainer(cnr),
|
|
||||||
|
|
||||||
// set success count (1st incoming hashes)
|
|
||||||
placement.SuccessAfter(1),
|
|
||||||
|
|
||||||
// set identifier of the processing object
|
|
||||||
placement.ForObject(prm.addr.ObjectID()),
|
|
||||||
)
|
|
||||||
|
|
||||||
// create placement builder from network map
|
|
||||||
builder := placement.NewNetworkMapBuilder(nm)
|
|
||||||
|
|
||||||
if prm.common.LocalOnly() {
|
|
||||||
// use local-only placement builder
|
|
||||||
builder = svcutil.NewLocalPlacement(builder, h.localAddrSrc)
|
|
||||||
}
|
|
||||||
|
|
||||||
// set placement builder
|
|
||||||
traverseOpts = append(traverseOpts, placement.UseBuilder(builder))
|
|
||||||
|
|
||||||
// build placement traverser
|
|
||||||
if h.traverser, err = placement.NewTraverser(traverseOpts...); err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not build placement traverser", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *distributedHasher) finish(ctx context.Context, prm *Prm) (*Response, error) {
|
|
||||||
resp := new(Response)
|
|
||||||
|
|
||||||
w := &onceHashWriter{
|
|
||||||
once: new(sync.Once),
|
|
||||||
traverser: h.traverser,
|
|
||||||
resp: resp,
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, w.cancel = context.WithCancel(ctx)
|
|
||||||
|
|
||||||
loop:
|
|
||||||
for {
|
|
||||||
addrs := h.traverser.Next()
|
|
||||||
if len(addrs) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
wg := new(sync.WaitGroup)
|
|
||||||
|
|
||||||
for i := range addrs {
|
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
addr := addrs[i]
|
|
||||||
|
|
||||||
if err := h.workerPool.Submit(func() {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
var hasher interface {
|
|
||||||
hashRange(context.Context, *Prm, func([][]byte)) error
|
|
||||||
}
|
|
||||||
|
|
||||||
if network.IsLocalAddress(h.localAddrSrc, addr) {
|
|
||||||
hasher = &localHasher{
|
|
||||||
storage: h.localStore,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
hasher = &remoteHasher{
|
|
||||||
keyStorage: h.keyStorage,
|
|
||||||
node: addr,
|
|
||||||
clientCache: h.clientCache,
|
|
||||||
clientOpts: h.clientOpts,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := hasher.hashRange(ctx, prm, w.write); err != nil {
|
|
||||||
svcutil.LogServiceError(h.log, "RANGEHASH", addr, err)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}); err != nil {
|
|
||||||
wg.Done()
|
|
||||||
|
|
||||||
svcutil.LogWorkerPoolError(h.log, "RANGEHASH", err)
|
|
||||||
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
if !h.traverser.Success() {
|
|
||||||
return nil, errors.Errorf("(%T) incomplete object GetRangeHash operation", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
|
@ -1,61 +0,0 @@
|
||||||
package rangehashsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/sha256"
|
|
||||||
"fmt"
|
|
||||||
"hash"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
|
||||||
"github.com/nspcc-dev/tzhash/tz"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type localHasher struct {
|
|
||||||
storage *engine.StorageEngine
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *localHasher) hashRange(ctx context.Context, prm *Prm, handler func([][]byte)) error {
|
|
||||||
// FIXME: get partial range instead of full object.
|
|
||||||
// Current solution is simple, but more loaded
|
|
||||||
// We can calculate left and right border between all ranges
|
|
||||||
// and request bordered range (look Service.GetRangeHash).
|
|
||||||
obj, err := engine.Get(h.storage, prm.addr)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not get object from local storage", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
payload := obj.Payload()
|
|
||||||
hashes := make([][]byte, 0, len(prm.rngs))
|
|
||||||
|
|
||||||
var hasher hash.Hash
|
|
||||||
switch prm.typ {
|
|
||||||
default:
|
|
||||||
panic(fmt.Sprintf("unexpected checksum type %v", prm.typ))
|
|
||||||
case pkg.ChecksumSHA256:
|
|
||||||
hasher = sha256.New()
|
|
||||||
case pkg.ChecksumTZ:
|
|
||||||
hasher = tz.New()
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range prm.rngs {
|
|
||||||
left := prm.rngs[i].GetOffset()
|
|
||||||
right := left + prm.rngs[i].GetLength()
|
|
||||||
|
|
||||||
if ln := uint64(len(payload)); ln < right {
|
|
||||||
return errors.Errorf("(%T) object range is out-of-boundaries (size %d, range [%d:%d]", h, ln, left, right)
|
|
||||||
}
|
|
||||||
|
|
||||||
hasher.Reset()
|
|
||||||
|
|
||||||
hasher.Write(util.SaltXOR(payload[left:right], prm.salt))
|
|
||||||
|
|
||||||
hashes = append(hashes, hasher.Sum(nil))
|
|
||||||
}
|
|
||||||
|
|
||||||
handler(hashes)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,51 +0,0 @@
|
||||||
package rangehashsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Prm struct {
|
|
||||||
common *util.CommonPrm
|
|
||||||
|
|
||||||
addr *object.Address
|
|
||||||
|
|
||||||
typ pkg.ChecksumType
|
|
||||||
|
|
||||||
rngs []*object.Range
|
|
||||||
|
|
||||||
salt []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm {
|
|
||||||
if p != nil {
|
|
||||||
p.common = v
|
|
||||||
}
|
|
||||||
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Prm) WithAddress(v *object.Address) *Prm {
|
|
||||||
if p != nil {
|
|
||||||
p.addr = v
|
|
||||||
}
|
|
||||||
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Prm) WithChecksumType(typ pkg.ChecksumType) *Prm {
|
|
||||||
if p != nil {
|
|
||||||
p.typ = typ
|
|
||||||
}
|
|
||||||
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Prm) FromRanges(v ...*object.Range) *Prm {
|
|
||||||
if p != nil {
|
|
||||||
p.rngs = v
|
|
||||||
}
|
|
||||||
|
|
||||||
return p
|
|
||||||
}
|
|
|
@ -1,80 +0,0 @@
|
||||||
package rangehashsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
type remoteHasher struct {
|
|
||||||
keyStorage *util.KeyStorage
|
|
||||||
|
|
||||||
node *network.Address
|
|
||||||
|
|
||||||
clientCache *cache.ClientCache
|
|
||||||
|
|
||||||
clientOpts []client.Option
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *remoteHasher) hashRange(ctx context.Context, prm *Prm, handler func([][]byte)) error {
|
|
||||||
key, err := h.keyStorage.GetKey(prm.common.SessionToken())
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not receive private key", h)
|
|
||||||
}
|
|
||||||
|
|
||||||
addr, err := h.node.IPAddrString()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
c, err := h.clientCache.Get(key, addr, h.clientOpts...)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not create SDK client %s", h, addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
hashes := make([][]byte, 0, len(prm.rngs))
|
|
||||||
|
|
||||||
p := new(client.RangeChecksumParams).
|
|
||||||
WithAddress(prm.addr).
|
|
||||||
WithSalt(prm.salt).
|
|
||||||
WithRangeList(prm.rngs...)
|
|
||||||
|
|
||||||
opts := []client.CallOption{
|
|
||||||
client.WithTTL(1), // FIXME: use constant
|
|
||||||
client.WithSession(prm.common.SessionToken()),
|
|
||||||
client.WithBearer(prm.common.BearerToken()),
|
|
||||||
}
|
|
||||||
|
|
||||||
switch prm.typ {
|
|
||||||
default:
|
|
||||||
panic(fmt.Sprintf("unexpected checksum type %v", prm.typ))
|
|
||||||
case pkg.ChecksumSHA256:
|
|
||||||
v, err := c.ObjectPayloadRangeSHA256(ctx, p, opts...)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not get SHA256 checksum from %s", h, addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range v {
|
|
||||||
hashes = append(hashes, v[i][:])
|
|
||||||
}
|
|
||||||
case pkg.ChecksumTZ:
|
|
||||||
v, err := c.ObjectPayloadRangeTZ(ctx, p, opts...)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not get Tillich-Zemor checksum from %s", h, addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range v {
|
|
||||||
hashes = append(hashes, v[i][:])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
handler(hashes)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,9 +0,0 @@
|
||||||
package rangehashsvc
|
|
||||||
|
|
||||||
type Response struct {
|
|
||||||
hashes [][]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Response) Hashes() [][]byte {
|
|
||||||
return r.hashes
|
|
||||||
}
|
|
|
@ -1,281 +0,0 @@
|
||||||
package rangehashsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/sha256"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/container"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
|
|
||||||
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
|
|
||||||
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
|
|
||||||
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Service struct {
|
|
||||||
*cfg
|
|
||||||
}
|
|
||||||
|
|
||||||
type Option func(*cfg)
|
|
||||||
|
|
||||||
type cfg struct {
|
|
||||||
keyStorage *objutil.KeyStorage
|
|
||||||
|
|
||||||
localStore *engine.StorageEngine
|
|
||||||
|
|
||||||
cnrSrc container.Source
|
|
||||||
|
|
||||||
netMapSrc netmap.Source
|
|
||||||
|
|
||||||
workerPool util.WorkerPool
|
|
||||||
|
|
||||||
localAddrSrc network.LocalAddressSource
|
|
||||||
|
|
||||||
headSvc *headsvc.Service
|
|
||||||
|
|
||||||
rangeSvc *getsvc.Service
|
|
||||||
|
|
||||||
clientCache *cache.ClientCache
|
|
||||||
|
|
||||||
log *logger.Logger
|
|
||||||
|
|
||||||
clientOpts []client.Option
|
|
||||||
}
|
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
|
||||||
return &cfg{
|
|
||||||
workerPool: new(util.SyncWorkerPool),
|
|
||||||
log: zap.L(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewService(opts ...Option) *Service {
|
|
||||||
c := defaultCfg()
|
|
||||||
|
|
||||||
for i := range opts {
|
|
||||||
opts[i](c)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Service{
|
|
||||||
cfg: c,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) GetRangeHash(ctx context.Context, prm *Prm) (*Response, error) {
|
|
||||||
headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm).
|
|
||||||
WithAddress(prm.addr).
|
|
||||||
WithCommonPrm(prm.common),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "(%T) could not receive Head result", s)
|
|
||||||
}
|
|
||||||
|
|
||||||
origin := headResult.Header()
|
|
||||||
|
|
||||||
originSize := origin.PayloadSize()
|
|
||||||
|
|
||||||
var minLeft, maxRight uint64
|
|
||||||
for i := range prm.rngs {
|
|
||||||
left := prm.rngs[i].GetOffset()
|
|
||||||
right := left + prm.rngs[i].GetLength()
|
|
||||||
|
|
||||||
if originSize < right {
|
|
||||||
return nil, errors.Errorf("(%T) requested payload range is out-of-bounds", s)
|
|
||||||
}
|
|
||||||
|
|
||||||
if left < minLeft {
|
|
||||||
minLeft = left
|
|
||||||
}
|
|
||||||
|
|
||||||
if right > maxRight {
|
|
||||||
maxRight = right
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
borderRng := new(object.Range)
|
|
||||||
borderRng.SetOffset(minLeft)
|
|
||||||
borderRng.SetLength(maxRight - minLeft)
|
|
||||||
|
|
||||||
return s.getHashes(ctx, prm, objutil.NewRangeTraverser(originSize, origin, borderRng))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) getHashes(ctx context.Context, prm *Prm, traverser *objutil.RangeTraverser) (*Response, error) {
|
|
||||||
addr := object.NewAddress()
|
|
||||||
addr.SetContainerID(prm.addr.ContainerID())
|
|
||||||
|
|
||||||
resp := &Response{
|
|
||||||
hashes: make([][]byte, 0, len(prm.rngs)),
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, rng := range prm.rngs {
|
|
||||||
for {
|
|
||||||
nextID, nextRng := traverser.Next()
|
|
||||||
if nextRng != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
addr.SetObjectID(nextID)
|
|
||||||
|
|
||||||
head, err := s.headSvc.Head(ctx, new(headsvc.Prm).
|
|
||||||
WithAddress(addr).
|
|
||||||
WithCommonPrm(prm.common),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "(%T) could not receive object header", s)
|
|
||||||
}
|
|
||||||
|
|
||||||
traverser.PushHeader(head.Header())
|
|
||||||
}
|
|
||||||
|
|
||||||
traverser.SetSeekRange(rng)
|
|
||||||
|
|
||||||
var hasher hasher
|
|
||||||
|
|
||||||
for {
|
|
||||||
nextID, nextRng := traverser.Next()
|
|
||||||
|
|
||||||
if hasher == nil {
|
|
||||||
if nextRng.GetLength() == rng.GetLength() {
|
|
||||||
hasher = new(singleHasher)
|
|
||||||
} else {
|
|
||||||
switch prm.typ {
|
|
||||||
default:
|
|
||||||
panic(fmt.Sprintf("unexpected checksum type %v", prm.typ))
|
|
||||||
case pkg.ChecksumSHA256:
|
|
||||||
hasher = &commonHasher{h: sha256.New()}
|
|
||||||
case pkg.ChecksumTZ:
|
|
||||||
hasher = &tzHasher{
|
|
||||||
hashes: make([][]byte, 0, 10),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if nextRng.GetLength() == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
addr.SetObjectID(nextID)
|
|
||||||
|
|
||||||
if prm.typ == pkg.ChecksumSHA256 && nextRng.GetLength() != rng.GetLength() {
|
|
||||||
// here we cannot receive SHA256 checksum through GetRangeHash service
|
|
||||||
// since SHA256 is not homomorphic
|
|
||||||
rngPrm := getsvc.RangePrm{}
|
|
||||||
rngPrm.SetRange(nextRng)
|
|
||||||
rngPrm.WithAddress(addr)
|
|
||||||
rngPrm.SetChunkWriter(hasher)
|
|
||||||
|
|
||||||
err := s.rangeSvc.GetRange(ctx, rngPrm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "(%T) could not receive payload range for %v checksum", s, prm.typ)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
resp, err := (&distributedHasher{
|
|
||||||
cfg: s.cfg,
|
|
||||||
}).head(ctx, new(Prm).
|
|
||||||
WithAddress(addr).
|
|
||||||
WithChecksumType(prm.typ).
|
|
||||||
FromRanges(nextRng).
|
|
||||||
WithCommonPrm(prm.common),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "(%T) could not receive %v checksum", s, prm.typ)
|
|
||||||
}
|
|
||||||
|
|
||||||
hs := resp.Hashes()
|
|
||||||
if ln := len(hs); ln != 1 {
|
|
||||||
return nil, errors.Errorf("(%T) unexpected %v hashes amount %d", s, prm.typ, ln)
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = hasher.WriteChunk(hs[0])
|
|
||||||
}
|
|
||||||
|
|
||||||
traverser.PushSuccessSize(nextRng.GetLength())
|
|
||||||
}
|
|
||||||
|
|
||||||
sum, err := hasher.sum()
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "(%T) could not calculate %v checksum", s, prm.typ)
|
|
||||||
}
|
|
||||||
|
|
||||||
resp.hashes = append(resp.hashes, sum)
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithKeyStorage(v *objutil.KeyStorage) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.keyStorage = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithLocalStorage(v *engine.StorageEngine) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.localStore = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithContainerSource(v container.Source) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.cnrSrc = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithNetworkMapSource(v netmap.Source) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.netMapSrc = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithWorkerPool(v util.WorkerPool) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.workerPool = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithLocalAddressSource(v network.LocalAddressSource) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.localAddrSrc = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithHeadService(v *headsvc.Service) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.headSvc = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithRangeService(v *getsvc.Service) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.rangeSvc = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithClientCache(v *cache.ClientCache) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.clientCache = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithLogger(l *logger.Logger) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.log = l
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithClientOptions(opts ...client.Option) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.clientOpts = opts
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,75 +0,0 @@
|
||||||
package rangehashsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"hash"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
|
||||||
"github.com/nspcc-dev/tzhash/tz"
|
|
||||||
)
|
|
||||||
|
|
||||||
type onceHashWriter struct {
|
|
||||||
once *sync.Once
|
|
||||||
|
|
||||||
traverser *placement.Traverser
|
|
||||||
|
|
||||||
resp *Response
|
|
||||||
|
|
||||||
cancel context.CancelFunc
|
|
||||||
}
|
|
||||||
|
|
||||||
type hasher interface {
|
|
||||||
WriteChunk([]byte) error
|
|
||||||
sum() ([]byte, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type tzHasher struct {
|
|
||||||
hashes [][]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
type commonHasher struct {
|
|
||||||
h hash.Hash
|
|
||||||
}
|
|
||||||
|
|
||||||
type singleHasher struct {
|
|
||||||
hash []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *singleHasher) WriteChunk(p []byte) error {
|
|
||||||
h.hash = p
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *singleHasher) sum() ([]byte, error) {
|
|
||||||
return h.hash, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *onceHashWriter) write(hs [][]byte) {
|
|
||||||
w.once.Do(func() {
|
|
||||||
w.resp.hashes = hs
|
|
||||||
w.traverser.SubmitSuccess()
|
|
||||||
w.cancel()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *tzHasher) WriteChunk(p []byte) error {
|
|
||||||
h.hashes = append(h.hashes, p)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *tzHasher) sum() ([]byte, error) {
|
|
||||||
return tz.Concat(h.hashes)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *commonHasher) WriteChunk(p []byte) error {
|
|
||||||
h.h.Write(p)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *commonHasher) sum() ([]byte, error) {
|
|
||||||
return h.h.Sum(nil), nil
|
|
||||||
}
|
|
|
@ -1,55 +0,0 @@
|
||||||
package rangehashsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
|
||||||
rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Service implements GetRangeHash operation of Object service v2.
|
|
||||||
type Service struct {
|
|
||||||
*cfg
|
|
||||||
}
|
|
||||||
|
|
||||||
// Option represents Service constructor option.
|
|
||||||
type Option func(*cfg)
|
|
||||||
|
|
||||||
type cfg struct {
|
|
||||||
svc *rangehashsvc.Service
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewService constructs Service instance from provided options.
|
|
||||||
func NewService(opts ...Option) *Service {
|
|
||||||
c := new(cfg)
|
|
||||||
|
|
||||||
for i := range opts {
|
|
||||||
opts[i](c)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Service{
|
|
||||||
cfg: c,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Head calls internal service and returns v2 object header.
|
|
||||||
func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) {
|
|
||||||
prm, err := toPrm(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "(%T) incorrect input parameters", s)
|
|
||||||
}
|
|
||||||
|
|
||||||
r, err := s.svc.GetRangeHash(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(err, "(%T) could not get range hashes", s)
|
|
||||||
}
|
|
||||||
|
|
||||||
return fromResponse(r, req.GetBody().GetType()), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithInternalService(v *rangehashsvc.Service) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.svc = v
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,51 +0,0 @@
|
||||||
package rangehashsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
||||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/refs"
|
|
||||||
rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
func toPrm(req *objectV2.GetRangeHashRequest) (*rangehashsvc.Prm, error) {
|
|
||||||
body := req.GetBody()
|
|
||||||
|
|
||||||
var typ pkg.ChecksumType
|
|
||||||
switch t := body.GetType(); t {
|
|
||||||
default:
|
|
||||||
return nil, errors.Errorf("unknown checksum type %v", t)
|
|
||||||
case refs.SHA256:
|
|
||||||
typ = pkg.ChecksumSHA256
|
|
||||||
case refs.TillichZemor:
|
|
||||||
typ = pkg.ChecksumTZ
|
|
||||||
}
|
|
||||||
|
|
||||||
rngsV2 := body.GetRanges()
|
|
||||||
rngs := make([]*object.Range, 0, len(rngsV2))
|
|
||||||
|
|
||||||
for i := range rngsV2 {
|
|
||||||
rngs = append(rngs, object.NewRangeFromV2(rngsV2[i]))
|
|
||||||
}
|
|
||||||
|
|
||||||
return new(rangehashsvc.Prm).
|
|
||||||
WithAddress(
|
|
||||||
object.NewAddressFromV2(body.GetAddress()),
|
|
||||||
).
|
|
||||||
WithChecksumType(typ).
|
|
||||||
FromRanges(rngs...).
|
|
||||||
WithCommonPrm(util.CommonPrmFromV2(req)), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func fromResponse(r *rangehashsvc.Response, typ refs.ChecksumType) *objectV2.GetRangeHashResponse {
|
|
||||||
body := new(objectV2.GetRangeHashResponseBody)
|
|
||||||
body.SetType(typ)
|
|
||||||
body.SetHashList(r.Hashes())
|
|
||||||
|
|
||||||
resp := new(objectV2.GetRangeHashResponse)
|
|
||||||
resp.SetBody(body)
|
|
||||||
|
|
||||||
return resp
|
|
||||||
}
|
|
Loading…
Reference in a new issue