[#1365] node: Calculate object homomorphic hash flexibly
Do not calculate and do not write homomorphic hash for containers that were configured to store objects without hash. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
ae4740f99f
commit
89118e9da0
3 changed files with 41 additions and 23 deletions
|
@ -4,6 +4,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
|
containerSDK "github.com/nspcc-dev/neofs-sdk-go/container"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -12,6 +13,8 @@ type PutInitPrm struct {
|
||||||
|
|
||||||
hdr *object.Object
|
hdr *object.Object
|
||||||
|
|
||||||
|
cnr containerSDK.Container
|
||||||
|
|
||||||
traverseOpts []placement.Option
|
traverseOpts []placement.Option
|
||||||
|
|
||||||
relay func(client.NodeInfo, client.MultiAddressClient) error
|
relay func(client.NodeInfo, client.MultiAddressClient) error
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
||||||
|
containerSDK "github.com/nspcc-dev/neofs-sdk-go/container"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/user"
|
"github.com/nspcc-dev/neofs-sdk-go/user"
|
||||||
)
|
)
|
||||||
|
@ -119,6 +120,7 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
||||||
unpreparedObject: true,
|
unpreparedObject: true,
|
||||||
nextTarget: transformer.NewPayloadSizeLimiter(
|
nextTarget: transformer.NewPayloadSizeLimiter(
|
||||||
p.maxPayloadSz,
|
p.maxPayloadSz,
|
||||||
|
containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
|
||||||
func() transformer.ObjectTarget {
|
func() transformer.ObjectTarget {
|
||||||
return transformer.NewFormatTarget(&transformer.FormatterParams{
|
return transformer.NewFormatTarget(&transformer.FormatterParams{
|
||||||
Key: sessionKey,
|
Key: sessionKey,
|
||||||
|
@ -148,15 +150,17 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// get container to store the object
|
// get container to store the object
|
||||||
cnr, err := p.cnrSrc.Get(idCnr)
|
cnrInfo, err := p.cnrSrc.Get(idCnr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
|
return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
prm.cnr = cnrInfo.Value
|
||||||
|
|
||||||
// add common options
|
// add common options
|
||||||
prm.traverseOpts = append(prm.traverseOpts,
|
prm.traverseOpts = append(prm.traverseOpts,
|
||||||
// set processing container
|
// set processing container
|
||||||
placement.ForContainer(cnr.Value),
|
placement.ForContainer(prm.cnr),
|
||||||
)
|
)
|
||||||
|
|
||||||
if id, ok := prm.hdr.ID(); ok {
|
if id, ok := prm.hdr.ID(); ok {
|
||||||
|
|
|
@ -15,6 +15,8 @@ import (
|
||||||
type payloadSizeLimiter struct {
|
type payloadSizeLimiter struct {
|
||||||
maxSize, written uint64
|
maxSize, written uint64
|
||||||
|
|
||||||
|
withoutHomomorphicHash bool
|
||||||
|
|
||||||
targetInit func() ObjectTarget
|
targetInit func() ObjectTarget
|
||||||
|
|
||||||
target ObjectTarget
|
target ObjectTarget
|
||||||
|
@ -41,12 +43,16 @@ type payloadChecksumHasher struct {
|
||||||
// NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length
|
// NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length
|
||||||
// of the writing object and writes generated objects to targets from initializer.
|
// of the writing object and writes generated objects to targets from initializer.
|
||||||
//
|
//
|
||||||
|
// Calculates and adds homomorphic hash to resulting objects only if withoutHomomorphicHash
|
||||||
|
// is false.
|
||||||
|
//
|
||||||
// Objects w/ payload size less or equal than max size remain untouched.
|
// Objects w/ payload size less or equal than max size remain untouched.
|
||||||
func NewPayloadSizeLimiter(maxSize uint64, targetInit TargetInitializer) ObjectTarget {
|
func NewPayloadSizeLimiter(maxSize uint64, withoutHomomorphicHash bool, targetInit TargetInitializer) ObjectTarget {
|
||||||
return &payloadSizeLimiter{
|
return &payloadSizeLimiter{
|
||||||
maxSize: maxSize,
|
maxSize: maxSize,
|
||||||
targetInit: targetInit,
|
withoutHomomorphicHash: withoutHomomorphicHash,
|
||||||
splitID: object.NewSplitID(),
|
targetInit: targetInit,
|
||||||
|
splitID: object.NewSplitID(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +114,7 @@ func (s *payloadSizeLimiter) initializeCurrent() {
|
||||||
s.target = s.targetInit()
|
s.target = s.targetInit()
|
||||||
|
|
||||||
// create payload hashers
|
// create payload hashers
|
||||||
s.currentHashers = payloadHashersForObject(s.current)
|
s.currentHashers = payloadHashersForObject(s.current, s.withoutHomomorphicHash)
|
||||||
|
|
||||||
// compose multi-writer from target and all payload hashers
|
// compose multi-writer from target and all payload hashers
|
||||||
ws := make([]io.Writer, 0, 1+len(s.currentHashers)+len(s.parentHashers))
|
ws := make([]io.Writer, 0, 1+len(s.currentHashers)+len(s.parentHashers))
|
||||||
|
@ -126,25 +132,28 @@ func (s *payloadSizeLimiter) initializeCurrent() {
|
||||||
s.chunkWriter = io.MultiWriter(ws...)
|
s.chunkWriter = io.MultiWriter(ws...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func payloadHashersForObject(obj *object.Object) []*payloadChecksumHasher {
|
func payloadHashersForObject(obj *object.Object, withoutHomomorphicHash bool) []*payloadChecksumHasher {
|
||||||
return []*payloadChecksumHasher{
|
hashers := make([]*payloadChecksumHasher, 0, 2)
|
||||||
{
|
|
||||||
hasher: sha256.New(),
|
|
||||||
checksumWriter: func(binChecksum []byte) {
|
|
||||||
if ln := len(binChecksum); ln != sha256.Size {
|
|
||||||
panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", sha256.Size, ln))
|
|
||||||
}
|
|
||||||
|
|
||||||
csSHA := [sha256.Size]byte{}
|
hashers = append(hashers, &payloadChecksumHasher{
|
||||||
copy(csSHA[:], binChecksum)
|
hasher: sha256.New(),
|
||||||
|
checksumWriter: func(binChecksum []byte) {
|
||||||
|
if ln := len(binChecksum); ln != sha256.Size {
|
||||||
|
panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", sha256.Size, ln))
|
||||||
|
}
|
||||||
|
|
||||||
var cs checksum.Checksum
|
csSHA := [sha256.Size]byte{}
|
||||||
cs.SetSHA256(csSHA)
|
copy(csSHA[:], binChecksum)
|
||||||
|
|
||||||
obj.SetPayloadChecksum(cs)
|
var cs checksum.Checksum
|
||||||
},
|
cs.SetSHA256(csSHA)
|
||||||
|
|
||||||
|
obj.SetPayloadChecksum(cs)
|
||||||
},
|
},
|
||||||
{
|
})
|
||||||
|
|
||||||
|
if !withoutHomomorphicHash {
|
||||||
|
hashers = append(hashers, &payloadChecksumHasher{
|
||||||
hasher: tz.New(),
|
hasher: tz.New(),
|
||||||
checksumWriter: func(binChecksum []byte) {
|
checksumWriter: func(binChecksum []byte) {
|
||||||
if ln := len(binChecksum); ln != tz.Size {
|
if ln := len(binChecksum); ln != tz.Size {
|
||||||
|
@ -159,8 +168,10 @@ func payloadHashersForObject(obj *object.Object) []*payloadChecksumHasher {
|
||||||
|
|
||||||
obj.SetPayloadHomomorphicHash(cs)
|
obj.SetPayloadHomomorphicHash(cs)
|
||||||
},
|
},
|
||||||
},
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return hashers
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *payloadSizeLimiter) release(close bool) (*AccessIdentifiers, error) {
|
func (s *payloadSizeLimiter) release(close bool) (*AccessIdentifiers, error) {
|
||||||
|
|
Loading…
Reference in a new issue