[#19] transformer: Do not reuse memory of sent objects
Slower, but more correct. ``` name old time/op new time/op delta Transformer/small-8 72.4µs ± 8% 74.8µs ±11% ~ (p=0.278 n=9+10) Transformer/big-8 1.31s ± 8% 1.38s ±11% +5.50% (p=0.035 n=10+10) name old alloc/op new alloc/op delta Transformer/small-8 7.39kB ± 0% 7.69kB ± 0% +4.04% (p=0.000 n=10+10) Transformer/big-8 46.9kB ± 0% 49.2kB ± 0% +4.87% (p=0.000 n=10+10) name old allocs/op new allocs/op delta Transformer/small-8 94.6 ± 1% 102.0 ± 0% +7.82% (p=0.000 n=10+9) Transformer/big-8 560 ± 0% 620 ± 1% +10.66% (p=0.000 n=10+10) ``` Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
611e20587b
commit
e45647de3c
3 changed files with 56 additions and 61 deletions
38
object/transformer/hasher.go
Normal file
38
object/transformer/hasher.go
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
package transformer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
|
"hash"
|
||||||
|
|
||||||
|
"github.com/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||||
|
objectSDK "github.com/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"github.com/TrueCloudLab/tzhash/tz"
|
||||||
|
)
|
||||||
|
|
||||||
|
type payloadChecksumHasher struct {
|
||||||
|
hasher hash.Hash
|
||||||
|
typ checksum.Type
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h payloadChecksumHasher) writeChecksum(obj *objectSDK.Object) {
|
||||||
|
switch h.typ {
|
||||||
|
case checksum.SHA256:
|
||||||
|
csSHA := [sha256.Size]byte{}
|
||||||
|
h.hasher.Sum(csSHA[:0])
|
||||||
|
|
||||||
|
var cs checksum.Checksum
|
||||||
|
cs.SetSHA256(csSHA)
|
||||||
|
|
||||||
|
obj.SetPayloadChecksum(cs)
|
||||||
|
case checksum.TZ:
|
||||||
|
csTZ := [tz.Size]byte{}
|
||||||
|
h.hasher.Sum(csTZ[:0])
|
||||||
|
|
||||||
|
var cs checksum.Checksum
|
||||||
|
cs.SetTillichZemor(csTZ)
|
||||||
|
|
||||||
|
obj.SetPayloadHomomorphicHash(cs)
|
||||||
|
default:
|
||||||
|
panic("unreachable")
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash"
|
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/TrueCloudLab/frostfs-sdk-go/checksum"
|
"github.com/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||||
|
@ -33,12 +32,6 @@ type payloadSizeLimiter struct {
|
||||||
parAttrs []object.Attribute
|
parAttrs []object.Attribute
|
||||||
}
|
}
|
||||||
|
|
||||||
type payloadChecksumHasher struct {
|
|
||||||
hasher hash.Hash
|
|
||||||
|
|
||||||
checksumWriter func([]byte)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Params struct {
|
type Params struct {
|
||||||
Key *ecdsa.PrivateKey
|
Key *ecdsa.PrivateKey
|
||||||
NextTarget ObjectTarget
|
NextTarget ObjectTarget
|
||||||
|
@ -83,11 +76,17 @@ func (s *payloadSizeLimiter) Close() (*AccessIdentifiers, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *payloadSizeLimiter) initialize() {
|
func (s *payloadSizeLimiter) initialize() {
|
||||||
|
s.current = fromObject(s.current)
|
||||||
|
|
||||||
// if it is an object after the 1st
|
// if it is an object after the 1st
|
||||||
if ln := len(s.previous); ln > 0 {
|
if ln := len(s.previous); ln > 0 {
|
||||||
// initialize parent object once (after 1st object)
|
// initialize parent object once (after 1st object)
|
||||||
if ln == 1 {
|
if ln == 1 {
|
||||||
s.detachParent()
|
s.parent = fromObject(s.current)
|
||||||
|
s.parentHashers = s.currentHashers
|
||||||
|
|
||||||
|
// return source attributes
|
||||||
|
s.parent.SetAttributes(s.parAttrs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// set previous object to the last previous identifier
|
// set previous object to the last previous identifier
|
||||||
|
@ -118,7 +117,7 @@ func fromObject(obj *object.Object) *object.Object {
|
||||||
func (s *payloadSizeLimiter) initializeCurrent() {
|
func (s *payloadSizeLimiter) initializeCurrent() {
|
||||||
// create payload hashers
|
// create payload hashers
|
||||||
s.writtenCurrent = 0
|
s.writtenCurrent = 0
|
||||||
s.currentHashers = payloadHashersForObject(s.current, s.WithoutHomomorphicHash)
|
s.currentHashers = payloadHashersForObject(s.WithoutHomomorphicHash)
|
||||||
|
|
||||||
// compose multi-writer from target and all payload hashers
|
// compose multi-writer from target and all payload hashers
|
||||||
ws := make([]io.Writer, 0, 1+len(s.currentHashers)+len(s.parentHashers))
|
ws := make([]io.Writer, 0, 1+len(s.currentHashers)+len(s.parentHashers))
|
||||||
|
@ -136,42 +135,18 @@ func (s *payloadSizeLimiter) initializeCurrent() {
|
||||||
s.chunkWriter = io.MultiWriter(ws...)
|
s.chunkWriter = io.MultiWriter(ws...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func payloadHashersForObject(obj *object.Object, withoutHomomorphicHash bool) []*payloadChecksumHasher {
|
func payloadHashersForObject(withoutHomomorphicHash bool) []*payloadChecksumHasher {
|
||||||
hashers := make([]*payloadChecksumHasher, 0, 2)
|
hashers := make([]*payloadChecksumHasher, 0, 2)
|
||||||
|
|
||||||
hashers = append(hashers, &payloadChecksumHasher{
|
hashers = append(hashers, &payloadChecksumHasher{
|
||||||
hasher: sha256.New(),
|
hasher: sha256.New(),
|
||||||
checksumWriter: func(binChecksum []byte) {
|
typ: checksum.SHA256,
|
||||||
if ln := len(binChecksum); ln != sha256.Size {
|
|
||||||
panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", sha256.Size, ln))
|
|
||||||
}
|
|
||||||
|
|
||||||
csSHA := [sha256.Size]byte{}
|
|
||||||
copy(csSHA[:], binChecksum)
|
|
||||||
|
|
||||||
var cs checksum.Checksum
|
|
||||||
cs.SetSHA256(csSHA)
|
|
||||||
|
|
||||||
obj.SetPayloadChecksum(cs)
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
|
|
||||||
if !withoutHomomorphicHash {
|
if !withoutHomomorphicHash {
|
||||||
hashers = append(hashers, &payloadChecksumHasher{
|
hashers = append(hashers, &payloadChecksumHasher{
|
||||||
hasher: tz.New(),
|
hasher: tz.New(),
|
||||||
checksumWriter: func(binChecksum []byte) {
|
typ: checksum.TZ,
|
||||||
if ln := len(binChecksum); ln != tz.Size {
|
|
||||||
panic(fmt.Sprintf("wrong checksum length: expected %d, has %d", tz.Size, ln))
|
|
||||||
}
|
|
||||||
|
|
||||||
csTZ := [tz.Size]byte{}
|
|
||||||
copy(csTZ[:], binChecksum)
|
|
||||||
|
|
||||||
var cs checksum.Checksum
|
|
||||||
cs.SetTillichZemor(csTZ)
|
|
||||||
|
|
||||||
obj.SetPayloadHomomorphicHash(cs)
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,13 +160,17 @@ func (s *payloadSizeLimiter) release(finalize bool) (*AccessIdentifiers, error)
|
||||||
withParent := finalize && len(s.previous) > 0
|
withParent := finalize && len(s.previous) > 0
|
||||||
|
|
||||||
if withParent {
|
if withParent {
|
||||||
writeHashes(s.parentHashers)
|
for i := range s.parentHashers {
|
||||||
|
s.parentHashers[i].writeChecksum(s.parent)
|
||||||
|
}
|
||||||
s.parent.SetPayloadSize(s.written)
|
s.parent.SetPayloadSize(s.written)
|
||||||
s.current.SetParent(s.parent)
|
s.current.SetParent(s.parent)
|
||||||
}
|
}
|
||||||
|
|
||||||
// release current object
|
// release current object
|
||||||
writeHashes(s.currentHashers)
|
for i := range s.currentHashers {
|
||||||
|
s.currentHashers[i].writeChecksum(s.current)
|
||||||
|
}
|
||||||
|
|
||||||
curEpoch := s.NetworkState.CurrentEpoch()
|
curEpoch := s.NetworkState.CurrentEpoch()
|
||||||
ver := version.Current()
|
ver := version.Current()
|
||||||
|
@ -259,12 +238,6 @@ func (s *payloadSizeLimiter) release(finalize bool) (*AccessIdentifiers, error)
|
||||||
return ids, nil
|
return ids, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeHashes(hashers []*payloadChecksumHasher) {
|
|
||||||
for i := range hashers {
|
|
||||||
hashers[i].checksumWriter(hashers[i].hasher.Sum(nil))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *payloadSizeLimiter) initializeLinking(parHdr *object.Object) {
|
func (s *payloadSizeLimiter) initializeLinking(parHdr *object.Object) {
|
||||||
s.current = fromObject(s.current)
|
s.current = fromObject(s.current)
|
||||||
s.current.SetParent(parHdr)
|
s.current.SetParent(parHdr)
|
||||||
|
@ -327,14 +300,3 @@ func (s *payloadSizeLimiter) prepareFirstChild() {
|
||||||
|
|
||||||
// attributes will be added to parent in detachParent
|
// attributes will be added to parent in detachParent
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *payloadSizeLimiter) detachParent() {
|
|
||||||
s.parent = s.current
|
|
||||||
s.current = fromObject(s.parent)
|
|
||||||
s.parent.ResetRelations()
|
|
||||||
s.parent.SetSignature(nil)
|
|
||||||
s.parentHashers = s.currentHashers
|
|
||||||
|
|
||||||
// return source attributes
|
|
||||||
s.parent.SetAttributes(s.parAttrs...)
|
|
||||||
}
|
|
||||||
|
|
|
@ -182,12 +182,7 @@ func (tt *testTarget) Write(p []byte) (n int, err error) {
|
||||||
|
|
||||||
func (tt *testTarget) Close() (*AccessIdentifiers, error) {
|
func (tt *testTarget) Close() (*AccessIdentifiers, error) {
|
||||||
tt.current.SetPayload(tt.payload)
|
tt.current.SetPayload(tt.payload)
|
||||||
// We need to marshal, because current implementation reuses written object.
|
tt.objects = append(tt.objects, tt.current)
|
||||||
data, _ := tt.current.Marshal()
|
|
||||||
obj := objectSDK.New()
|
|
||||||
_ = obj.Unmarshal(data)
|
|
||||||
|
|
||||||
tt.objects = append(tt.objects, obj)
|
|
||||||
tt.current = nil
|
tt.current = nil
|
||||||
tt.payload = nil
|
tt.payload = nil
|
||||||
return nil, nil // AccessIdentifiers should not be used.
|
return nil, nil // AccessIdentifiers should not be used.
|
||||||
|
|
Loading…
Reference in a new issue