forked from TrueCloudLab/frostfs-sdk-go
e2011832eb
Homomorphic hashing of the object payload is not always necessary, so callers need the ability to skip the calculation. It is also worth skipping it by default, since the current implementation of the Tillich-Zemor algorithm is resource-expensive. Do not calculate the homomorphic checksum in `Slicer` methods by default, and provide an option to enable the calculation. Make the tests randomize the calculation flag and assert the results accordingly. Refs #342. Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
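In short: `Slicer` no longer computes the Tillich-Zemor checksum unless asked to. A minimal sketch of the opt-in, using only the API exercised by the test file below (`signer`, `containerID`, `owner`, `dst`, `epoch` and `payload` are caller-supplied placeholders):

	var opts slicer.Options
	opts.SetObjectPayloadLimit(1 << 20)
	opts.SetCurrentNeoFSEpoch(epoch)
	opts.CalculateHomomorphicChecksum() // opt in; without this call the checksum is skipped

	s := slicer.New(signer, containerID, owner, dst, opts)
	rootID, err := s.Slice(bytes.NewReader(payload), "attr-key", "attr-value")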
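// Package slicer_test verifies that payload slicing produces a valid
// split-chain of objects within the configured payload size limit, and
// that the homomorphic payload checksum appears only when its
// calculation was explicitly enabled.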
package slicer_test

import (
	"bytes"
	"crypto/ecdsa"
	"crypto/elliptic"
	cryptorand "crypto/rand"
	"crypto/sha256"
	"encoding/base64"
	"fmt"
	"hash"
	"io"
	"math/rand"
	"testing"

	"github.com/nspcc-dev/neofs-sdk-go/checksum"
	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
	cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
	neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
	neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
	"github.com/nspcc-dev/neofs-sdk-go/crypto/test"
	"github.com/nspcc-dev/neofs-sdk-go/object"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
	"github.com/nspcc-dev/neofs-sdk-go/object/slicer"
	"github.com/nspcc-dev/neofs-sdk-go/session"
	sessiontest "github.com/nspcc-dev/neofs-sdk-go/session/test"
	"github.com/nspcc-dev/neofs-sdk-go/user"
	usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
	"github.com/nspcc-dev/neofs-sdk-go/version"
	"github.com/nspcc-dev/tzhash/tz"
	"github.com/stretchr/testify/require"
)

const defaultLimit = 1 << 20

func TestSliceDataIntoObjects(t *testing.T) {
	const size = 1 << 10

	t.Run("known limit", func(t *testing.T) {
		t.Run("under limit", func(t *testing.T) {
			testSlicer(t, size, size)
			testSlicer(t, size, size+1)
		})

t.Run("multiple size", func(t *testing.T) {
|
|
testSlicer(t, size, 3*size)
|
|
testSlicer(t, size, 3*size+1)
|
|
})
	})

	t.Run("unknown limit", func(t *testing.T) {
		t.Run("under limit", func(t *testing.T) {
			testSlicer(t, defaultLimit-1, 0)
			testSlicer(t, defaultLimit, 0)
		})

		t.Run("multiple size", func(t *testing.T) {
			testSlicer(t, 3*defaultLimit, 0)
			testSlicer(t, 3*defaultLimit+1, 0)
		})
	})

	t.Run("no payload", func(t *testing.T) {
		testSlicer(t, 0, 0)
		testSlicer(t, 0, 1024)
	})
}

func BenchmarkSliceDataIntoObjects(b *testing.B) {
	const limit = 1 << 7
	const stepFactor = 4
	for size := uint64(1); size <= 1<<20; size *= stepFactor {
		b.Run(fmt.Sprintf("slice_%d-%d", size, limit), func(b *testing.B) {
			benchmarkSliceDataIntoObjects(b, size, limit)
		})
	}
}

func benchmarkSliceDataIntoObjects(b *testing.B, size, sizeLimit uint64) {
	in, opts := randomInput(b, size, sizeLimit)
	s := slicer.NewSession(in.signer, in.container, *sessiontest.ObjectSigned(test.RandomSigner(b)), discardObject{}, opts)

	b.Run("reader", func(b *testing.B) {
		var err error
		r := bytes.NewReader(in.payload)

		b.ReportAllocs()
		b.ResetTimer()

		for i := 0; i < b.N; i++ {
			_, err = s.Slice(r, in.attributes...)
			b.StopTimer()
			require.NoError(b, err)
			r.Reset(in.payload) // rewind the reader so each iteration slices the full payload
			b.StartTimer()
		}
	})

	b.Run("writer", func(b *testing.B) {
		b.ReportAllocs()
		b.ResetTimer()

		var err error
		var w *slicer.PayloadWriter

		for i := 0; i < b.N; i++ {
			w, err = s.InitPayloadStream(in.attributes...)
			b.StopTimer()
			require.NoError(b, err)
			b.StartTimer()

			_, err = w.Write(in.payload)
			if err == nil {
				err = w.Close()
			}

			b.StopTimer()
			require.NoError(b, err)
			b.StartTimer()
		}
	})
}

type discardObject struct{}

func (discardObject) InitDataStream(object.Object) (io.Writer, error) {
	return discardPayload{}, nil
}

type discardPayload struct{}

func (discardPayload) Write(p []byte) (n int, err error) {
	return len(p), nil
}

type input struct {
	signer       neofscrypto.Signer
	container    cid.ID
	owner        user.ID
	currentEpoch uint64
	payloadLimit uint64
	sessionToken *session.Object
	payload      []byte
	attributes   []string
	withHomo     bool
}

func randomData(size uint64) []byte {
	data := make([]byte, size)
	rand.Read(data)
	return data
}

func randomInput(tb testing.TB, size, sizeLimit uint64) (input, slicer.Options) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader)
	if err != nil {
		panic(fmt.Sprintf("generate ECDSA private key: %v", err))
	}

	attrNum := rand.Int() % 5
	attrs := make([]string, 2*attrNum)

	for i := 0; i < len(attrs); i += 2 {
		attrs[i] = base64.StdEncoding.EncodeToString(randomData(32))
		attrs[i+1] = base64.StdEncoding.EncodeToString(randomData(32))
	}

	var in input
	in.signer = neofsecdsa.Signer(*key)
	in.container = cidtest.ID()
	in.currentEpoch = rand.Uint64()
	if sizeLimit > 0 {
		in.payloadLimit = sizeLimit
	} else {
		in.payloadLimit = defaultLimit
	}
	in.payload = randomData(size)
	in.attributes = attrs

	if rand.Int()%2 == 0 {
		in.sessionToken = sessiontest.ObjectSigned(test.RandomSigner(tb))
	} else {
		in.owner = *usertest.ID(tb)
	}

	// randomize the homomorphic checksum flag: the calculation is off by
	// default and must be requested explicitly
	in.withHomo = rand.Int()%2 == 0

	var opts slicer.Options
	opts.SetObjectPayloadLimit(in.payloadLimit)
	opts.SetCurrentNeoFSEpoch(in.currentEpoch)
	if in.withHomo {
		opts.CalculateHomomorphicChecksum()
	}

	return in, opts
}

func testSlicer(tb testing.TB, size, sizeLimit uint64) {
	in, opts := randomInput(tb, size, sizeLimit)

	checker := &slicedObjectChecker{
		tb:             tb,
		input:          in,
		chainCollector: newChainCollector(tb),
	}

	var s *slicer.Slicer
	if checker.input.sessionToken != nil {
		s = slicer.NewSession(in.signer, checker.input.container, *checker.input.sessionToken, checker, opts)
	} else {
		s = slicer.New(in.signer, checker.input.container, checker.input.owner, checker, opts)
	}

	// check reader
	rootID, err := s.Slice(bytes.NewReader(in.payload), in.attributes...)
	require.NoError(tb, err)
	checker.chainCollector.verify(checker.input, rootID)

	// check writer with randomly sized chunks
	checker.chainCollector = newChainCollector(tb)

	w, err := s.InitPayloadStream(in.attributes...)
	require.NoError(tb, err)

	var chunkSize int
	if len(in.payload) > 0 {
		chunkSize = rand.Int() % len(in.payload)
		if chunkSize == 0 {
			chunkSize = 1
		}
	}

	for payload := in.payload; len(payload) > 0; payload = payload[chunkSize:] {
		if chunkSize > len(payload) {
			chunkSize = len(payload)
		}
		n, err := w.Write(payload[:chunkSize])
		require.NoError(tb, err)
		require.EqualValues(tb, chunkSize, n)
	}

	err = w.Close()
	require.NoError(tb, err)

	checker.chainCollector.verify(checker.input, w.ID())
}

type slicedObjectChecker struct {
	tb testing.TB

	input input

	chainCollector *chainCollector
}

func (x *slicedObjectChecker) InitDataStream(hdr object.Object) (io.Writer, error) {
	checkStaticMetadata(x.tb, hdr, x.input)

	buf := bytes.NewBuffer(nil)

	x.chainCollector.handleOutgoingObject(hdr, buf)

	return newSizeChecker(x.tb, buf, x.input.payloadLimit), nil
}

type writeSizeChecker struct {
	tb        testing.TB
	limit     uint64
	processed uint64
	base      io.Writer
}

func newSizeChecker(tb testing.TB, base io.Writer, sizeLimit uint64) io.Writer {
	return &writeSizeChecker{
		tb:    tb,
		limit: sizeLimit,
		base:  base,
	}
}

func (x *writeSizeChecker) Write(p []byte) (int, error) {
	n, err := x.base.Write(p)
	x.processed += uint64(n)
	return n, err
}

func (x *writeSizeChecker) Close() error {
	require.LessOrEqual(x.tb, x.processed, x.limit, "object payload must not overflow the limit")
	return nil
}

type payloadWithChecksum struct {
	r  io.Reader
	cs []checksum.Checksum
	hs []hash.Hash
}

type chainCollector struct {
	tb testing.TB

	mProcessed map[oid.ID]struct{}

	parentHeaderSet bool
	parentHeader    object.Object

	splitID *object.SplitID

	firstSet    bool
	first       oid.ID
	firstHeader object.Object

	mNext map[oid.ID]oid.ID

	mPayloads map[oid.ID]payloadWithChecksum

	children []oid.ID
}

func newChainCollector(tb testing.TB) *chainCollector {
	return &chainCollector{
		tb:         tb,
		mProcessed: make(map[oid.ID]struct{}),
		mNext:      make(map[oid.ID]oid.ID),
		mPayloads:  make(map[oid.ID]payloadWithChecksum),
	}
}

func checkStaticMetadata(tb testing.TB, header object.Object, in input) {
	cnr, ok := header.ContainerID()
	require.True(tb, ok, "all objects must be bound to some container")
	require.True(tb, cnr.Equals(in.container), "the container must be set to the configured one")

	owner := header.OwnerID()
	require.NotNil(tb, owner, "any object must be owned by somebody")
	if in.sessionToken != nil {
		require.True(tb, in.sessionToken.Issuer().Equals(*owner), "owner must be set to the session issuer")
	} else {
		require.True(tb, owner.Equals(in.owner), "owner must be set to the particular user")
	}

	ver := header.Version()
	require.NotNil(tb, ver, "version must be set in all objects")
	require.Equal(tb, version.Current(), *ver, "the version must be set to the current SDK one")

	require.Equal(tb, object.TypeRegular, header.Type(), "only regular objects must be produced")
	require.EqualValues(tb, in.currentEpoch, header.CreationEpoch(), "configured current epoch must be set as creation epoch")
	require.Equal(tb, in.sessionToken, header.SessionToken(), "configured session token must be written into objects")

	require.NoError(tb, object.CheckHeaderVerificationFields(&header), "verification fields must be correctly set in header")

	// the homomorphic checksum must be present iff its calculation was enabled
	_, ok = header.PayloadHomomorphicHash()
	require.Equal(tb, in.withHomo, ok)
}

func (x *chainCollector) handleOutgoingObject(header object.Object, payload io.Reader) {
	id, ok := header.ID()
	require.True(x.tb, ok, "all objects must have an ID")

	idCalc, err := object.CalculateID(&header)
	require.NoError(x.tb, err)

	require.True(x.tb, idCalc.Equals(id))

	_, ok = x.mProcessed[id]
	require.False(x.tb, ok, "object must be written exactly once")

	x.mProcessed[id] = struct{}{}

	splitID := header.SplitID()
	if x.splitID == nil && splitID != nil {
		x.splitID = splitID
	} else {
		require.Equal(x.tb, x.splitID, splitID, "split ID must be the same in all objects")
	}

	parent := header.Parent()
	if parent != nil {
		require.Nil(x.tb, parent.Parent(), "multi-level genealogy is not supported")

		if x.parentHeaderSet {
			require.Equal(x.tb, x.parentHeader, *parent, "root header must be the same")
		} else {
			x.parentHeaderSet = true
			x.parentHeader = *parent
		}
	}

	prev, ok := header.PreviousID()
	if ok {
		_, ok := x.mNext[prev]
		require.False(x.tb, ok, "split-chain must not be forked")

		for k := range x.mNext {
			require.False(x.tb, k.Equals(prev), "split-chain must not be cycled")
		}

		x.mNext[prev] = id
	} else if len(header.Children()) == 0 { // 1st split-chain element or linking object
		require.False(x.tb, x.firstSet, "there must not be multiple split-chains")
		x.firstSet = true
		x.first = id
		x.firstHeader = header
	}

	children := header.Children()
	if len(children) > 0 {
		if len(x.children) > 0 {
			require.Equal(x.tb, x.children, children, "children list must be the same")
		} else {
			x.children = children
		}
	}

	cs, ok := header.PayloadChecksum()
	require.True(x.tb, ok)

	pcs := payloadWithChecksum{
		r:  payload,
		cs: []checksum.Checksum{cs},
		hs: []hash.Hash{sha256.New()},
	}

	// collect the homomorphic checksum too, but only when it was calculated
	csHomo, ok := header.PayloadHomomorphicHash()
	if ok {
		pcs.cs = append(pcs.cs, csHomo)
		pcs.hs = append(pcs.hs, tz.New())
	}

	x.mPayloads[id] = pcs
}

func (x *chainCollector) verify(in input, rootID oid.ID) {
	require.True(x.tb, x.firstSet, "initial split-chain element must be set")

	rootObj := x.parentHeader
	if !x.parentHeaderSet {
		rootObj = x.firstHeader
	}

	restoredChain := []oid.ID{x.first}
	restoredPayload := bytes.NewBuffer(make([]byte, 0, rootObj.PayloadSize()))

	for {
		v, ok := x.mPayloads[restoredChain[len(restoredChain)-1]]
		require.True(x.tb, ok)

		ws := []io.Writer{restoredPayload}
		for i := range v.hs {
			ws = append(ws, v.hs[i])
		}

		_, err := io.Copy(io.MultiWriter(ws...), v.r)
		require.NoError(x.tb, err)

		for i := range v.cs {
			require.True(x.tb, bytes.Equal(v.cs[i].Value(), v.hs[i].Sum(nil)))
		}

		next, ok := x.mNext[restoredChain[len(restoredChain)-1]]
		if !ok {
			break
		}

		restoredChain = append(restoredChain, next)
	}

	rootObj.SetPayload(restoredPayload.Bytes())

	if uint64(len(in.payload)) <= in.payloadLimit {
		require.Empty(x.tb, x.children)
	} else {
		require.Equal(x.tb, x.children, restoredChain)
	}

	id, ok := rootObj.ID()
	require.True(x.tb, ok, "root object must have an ID")
	require.True(x.tb, id.Equals(rootID), "root ID in the root object must match the returned result")

	checkStaticMetadata(x.tb, rootObj, in)

	attrs := rootObj.Attributes()
	require.Len(x.tb, attrs, len(in.attributes)/2)
	for i := range attrs {
		require.Equal(x.tb, in.attributes[2*i], attrs[i].Key())
		require.Equal(x.tb, in.attributes[2*i+1], attrs[i].Value())
	}

	require.Equal(x.tb, in.payload, rootObj.Payload())
	require.NoError(x.tb, object.VerifyPayloadChecksum(&rootObj), "payload checksum must be correctly set")
}