[#1365] node: Calculate object homomorphic hash flexibly

Do not calculate and do not write homomorphic hash for containers that were
configured to store objects without hash.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-04-29 21:44:00 +03:00 committed by Evgenii Stratonikov
parent e9c534b0a0
commit adcda361a7
3 changed files with 45 additions and 28 deletions

View file

@ -4,6 +4,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
containerSDK "github.com/nspcc-dev/neofs-sdk-go/container"
"github.com/nspcc-dev/neofs-sdk-go/object"
)
@ -12,6 +13,8 @@ type PutInitPrm struct {
hdr *object.Object
cnr containerSDK.Container
traverseOpts []placement.Option
relay func(client.NodeInfo, client.MultiAddressClient) error

View file

@ -10,6 +10,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
containerSDK "github.com/nspcc-dev/neofs-sdk-go/container"
"github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/nspcc-dev/neofs-sdk-go/user"
)
@ -119,6 +120,7 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
unpreparedObject: true,
nextTarget: transformer.NewPayloadSizeLimiter(
p.maxPayloadSz,
containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
func() transformer.ObjectTarget {
return transformer.NewFormatTarget(&transformer.FormatterParams{
Key: sessionKey,
@ -148,15 +150,17 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
}
// get container to store the object
cnr, err := p.cnrSrc.Get(idCnr)
cnrInfo, err := p.cnrSrc.Get(idCnr)
if err != nil {
return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
}
prm.cnr = cnrInfo.Value
// add common options
prm.traverseOpts = append(prm.traverseOpts,
// set processing container
placement.ForContainer(cnr.Value),
placement.ForContainer(prm.cnr),
)
if id, ok := prm.hdr.ID(); ok {

View file

@ -9,12 +9,13 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/checksum"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/tzhash/tz"
)
type payloadSizeLimiter struct {
maxSize, written uint64
withoutHomomorphicHash bool
targetInit func() ObjectTarget
target ObjectTarget
@ -41,12 +42,16 @@ type payloadChecksumHasher struct {
// NewPayloadSizeLimiter returns ObjectTarget instance that restricts payload length
// of the writing object and writes generated objects to targets from initializer.
//
// Calculates and adds homomorphic hash to resulting objects only if withoutHomomorphicHash
// is false.
//
// Objects w/ payload size less or equal than max size remain untouched.
func NewPayloadSizeLimiter(maxSize uint64, targetInit TargetInitializer) ObjectTarget {
func NewPayloadSizeLimiter(maxSize uint64, withoutHomomorphicHash bool, targetInit TargetInitializer) ObjectTarget {
return &payloadSizeLimiter{
maxSize: maxSize,
targetInit: targetInit,
splitID: object.NewSplitID(),
maxSize: maxSize,
withoutHomomorphicHash: withoutHomomorphicHash,
targetInit: targetInit,
splitID: object.NewSplitID(),
}
}
@ -108,7 +113,7 @@ func (s *payloadSizeLimiter) initializeCurrent() {
s.target = s.targetInit()
// create payload hashers
s.currentHashers = payloadHashersForObject(s.current)
s.currentHashers = payloadHashersForObject(s.current, s.withoutHomomorphicHash)
// compose multi-writer from target and all payload hashers
ws := make([]io.Writer, 0, 1+len(s.currentHashers)+len(s.parentHashers))
@ -126,9 +131,28 @@ func (s *payloadSizeLimiter) initializeCurrent() {
s.chunkWriter = io.MultiWriter(ws...)
}
func payloadHashersForObject(obj *object.Object) []*payloadChecksumHasher {
return []*payloadChecksumHasher{
{
func payloadHashersForObject(obj *object.Object, withoutHomomorphicHash bool) []*payloadChecksumHasher {
hashers := make([]*payloadChecksumHasher, 0, 2)
hashers = append(hashers, &payloadChecksumHasher{
hasher: sha256.New(),
checksumWriter: func(binChecksum []byte) {
if ln := len(binChecksum); ln != sha256.Size {
panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", sha256.Size, ln))
}
csSHA := [sha256.Size]byte{}
copy(csSHA[:], binChecksum)
var cs checksum.Checksum
cs.SetSHA256(csSHA)
obj.SetPayloadChecksum(cs)
},
})
if !withoutHomomorphicHash {
hashers = append(hashers, &payloadChecksumHasher{
hasher: sha256.New(),
checksumWriter: func(binChecksum []byte) {
if ln := len(binChecksum); ln != sha256.Size {
@ -141,26 +165,12 @@ func payloadHashersForObject(obj *object.Object) []*payloadChecksumHasher {
var cs checksum.Checksum
cs.SetSHA256(csSHA)
obj.SetPayloadChecksum(cs)
},
},
{
hasher: tz.New(),
checksumWriter: func(binChecksum []byte) {
if ln := len(binChecksum); ln != tz.Size {
panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", tz.Size, ln))
}
csTZ := [tz.Size]byte{}
copy(csTZ[:], binChecksum)
var cs checksum.Checksum
cs.SetTillichZemor(csTZ)
obj.SetPayloadHomomorphicHash(cs)
},
},
})
}
return hashers
}
func (s *payloadSizeLimiter) release(close bool) (*AccessIdentifiers, error) {