forked from TrueCloudLab/frostfs-sdk-go
slicer: Allow to toggle homomorphic hashing
Homomorphic hashing of object payload is not always necessary. There is a need to provide ability to skip calculation. It's also worth to not calculate it by default since current implementation of Tillich-Zemor algorithm has large resource cost. Do not calculate homomorphic checksum in `Slicer` methods by default. Provide option to enable the calculation. Make tests to randomize calculation flag and assert results according to it. Refs #342. Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
This commit is contained in:
parent
ab5ae28fdb
commit
e2011832eb
4 changed files with 57 additions and 23 deletions
|
@ -342,6 +342,9 @@ func CreateObject(ctx context.Context, cli *Client, signer neofscrypto.Signer, c
|
||||||
var opts slicer.Options
|
var opts slicer.Options
|
||||||
opts.SetObjectPayloadLimit(netInfo.MaxObjectSize())
|
opts.SetObjectPayloadLimit(netInfo.MaxObjectSize())
|
||||||
opts.SetCurrentNeoFSEpoch(netInfo.CurrentEpoch())
|
opts.SetCurrentNeoFSEpoch(netInfo.CurrentEpoch())
|
||||||
|
if !netInfo.HomomorphicHashingDisabled() {
|
||||||
|
opts.CalculateHomomorphicChecksum()
|
||||||
|
}
|
||||||
|
|
||||||
s := slicer.New(signer, cnr, owner, &objectWriter{
|
s := slicer.New(signer, cnr, owner, &objectWriter{
|
||||||
context: ctx,
|
context: ctx,
|
||||||
|
|
|
@ -5,6 +5,8 @@ type Options struct {
|
||||||
objectPayloadLimit uint64
|
objectPayloadLimit uint64
|
||||||
|
|
||||||
currentNeoFSEpoch uint64
|
currentNeoFSEpoch uint64
|
||||||
|
|
||||||
|
withHomoChecksum bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetObjectPayloadLimit specifies data size limit for produced physically
|
// SetObjectPayloadLimit specifies data size limit for produced physically
|
||||||
|
@ -17,3 +19,9 @@ func (x *Options) SetObjectPayloadLimit(l uint64) {
|
||||||
func (x *Options) SetCurrentNeoFSEpoch(e uint64) {
|
func (x *Options) SetCurrentNeoFSEpoch(e uint64) {
|
||||||
x.currentNeoFSEpoch = e
|
x.currentNeoFSEpoch = e
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CalculateHomomorphicChecksum makes Slicer to calculate and set homomorphic
|
||||||
|
// checksum of the processed objects.
|
||||||
|
func (x *Options) CalculateHomomorphicChecksum() {
|
||||||
|
x.withHomoChecksum = true
|
||||||
|
}
|
||||||
|
|
|
@ -143,16 +143,15 @@ func (x *Slicer) Slice(data io.Reader, attributes ...string) (oid.ID, error) {
|
||||||
|
|
||||||
var rootID oid.ID
|
var rootID oid.ID
|
||||||
var rootHeader object.Object
|
var rootHeader object.Object
|
||||||
var rootMeta dynamicObjectMetadata
|
|
||||||
var offset uint64
|
var offset uint64
|
||||||
var isSplit bool
|
var isSplit bool
|
||||||
var childMeta dynamicObjectMetadata
|
var childMeta dynamicObjectMetadata
|
||||||
var writtenChildren []oid.ID
|
var writtenChildren []oid.ID
|
||||||
var childHeader object.Object
|
var childHeader object.Object
|
||||||
|
rootMeta := newDynamicObjectMetadata(x.opts.withHomoChecksum)
|
||||||
bChunk := make([]byte, x.opts.objectPayloadLimit+1)
|
bChunk := make([]byte, x.opts.objectPayloadLimit+1)
|
||||||
|
|
||||||
x.fillCommonMetadata(&rootHeader)
|
x.fillCommonMetadata(&rootHeader)
|
||||||
rootMeta.reset()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
n, err := data.Read(bChunk[offset:])
|
n, err := data.Read(bChunk[offset:])
|
||||||
|
@ -264,7 +263,7 @@ func (x *Slicer) Slice(data io.Reader, attributes ...string) (oid.ID, error) {
|
||||||
|
|
||||||
// shift overflow bytes to the beginning
|
// shift overflow bytes to the beginning
|
||||||
if !isSplitCp {
|
if !isSplitCp {
|
||||||
childMeta = dynamicObjectMetadata{} // to avoid rootMeta corruption
|
childMeta = newDynamicObjectMetadata(x.opts.withHomoChecksum) // to avoid rootMeta corruption
|
||||||
}
|
}
|
||||||
childMeta.reset()
|
childMeta.reset()
|
||||||
childMeta.accumulateNextPayloadChunk(bChunk[toSend:])
|
childMeta.accumulateNextPayloadChunk(bChunk[toSend:])
|
||||||
|
@ -297,6 +296,8 @@ func (x *Slicer) InitPayloadStream(attributes ...string) (*PayloadWriter, error)
|
||||||
currentEpoch: x.opts.currentNeoFSEpoch,
|
currentEpoch: x.opts.currentNeoFSEpoch,
|
||||||
sessionToken: x.sessionToken,
|
sessionToken: x.sessionToken,
|
||||||
attributes: attributes,
|
attributes: attributes,
|
||||||
|
rootMeta: newDynamicObjectMetadata(x.opts.withHomoChecksum),
|
||||||
|
childMeta: newDynamicObjectMetadata(x.opts.withHomoChecksum),
|
||||||
}
|
}
|
||||||
|
|
||||||
res.buf.Grow(int(x.childPayloadSizeLimit()))
|
res.buf.Grow(int(x.childPayloadSizeLimit()))
|
||||||
|
@ -486,11 +487,13 @@ func flushObjectMetadata(signer neofscrypto.Signer, meta dynamicObjectMetadata,
|
||||||
cs.SetSHA256(csBytes)
|
cs.SetSHA256(csBytes)
|
||||||
header.SetPayloadChecksum(cs)
|
header.SetPayloadChecksum(cs)
|
||||||
|
|
||||||
var csHomoBytes [tz.Size]byte
|
if meta.homomorphicChecksum != nil {
|
||||||
copy(csHomoBytes[:], meta.homomorphicChecksum.Sum(nil))
|
var csHomoBytes [tz.Size]byte
|
||||||
|
copy(csHomoBytes[:], meta.homomorphicChecksum.Sum(nil))
|
||||||
|
|
||||||
cs.SetTillichZemor(csHomoBytes)
|
cs.SetTillichZemor(csHomoBytes)
|
||||||
header.SetPayloadHomomorphicHash(cs)
|
header.SetPayloadHomomorphicHash(cs)
|
||||||
|
}
|
||||||
|
|
||||||
header.SetPayloadSize(meta.length)
|
header.SetPayloadSize(meta.length)
|
||||||
|
|
||||||
|
@ -552,6 +555,18 @@ type dynamicObjectMetadata struct {
|
||||||
homomorphicChecksum hash.Hash
|
homomorphicChecksum hash.Hash
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newDynamicObjectMetadata(withHomoChecksum bool) dynamicObjectMetadata {
|
||||||
|
m := dynamicObjectMetadata{
|
||||||
|
checksum: sha256.New(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if withHomoChecksum {
|
||||||
|
m.homomorphicChecksum = tz.New()
|
||||||
|
}
|
||||||
|
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
func (x *dynamicObjectMetadata) Write(chunk []byte) (int, error) {
|
func (x *dynamicObjectMetadata) Write(chunk []byte) (int, error) {
|
||||||
x.accumulateNextPayloadChunk(chunk)
|
x.accumulateNextPayloadChunk(chunk)
|
||||||
return len(chunk), nil
|
return len(chunk), nil
|
||||||
|
@ -562,23 +577,17 @@ func (x *dynamicObjectMetadata) Write(chunk []byte) (int, error) {
|
||||||
func (x *dynamicObjectMetadata) accumulateNextPayloadChunk(chunk []byte) {
|
func (x *dynamicObjectMetadata) accumulateNextPayloadChunk(chunk []byte) {
|
||||||
x.length += uint64(len(chunk))
|
x.length += uint64(len(chunk))
|
||||||
x.checksum.Write(chunk)
|
x.checksum.Write(chunk)
|
||||||
x.homomorphicChecksum.Write(chunk)
|
if x.homomorphicChecksum != nil {
|
||||||
|
x.homomorphicChecksum.Write(chunk)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset resets all accumulated metadata.
|
// reset resets all accumulated metadata.
|
||||||
func (x *dynamicObjectMetadata) reset() {
|
func (x *dynamicObjectMetadata) reset() {
|
||||||
x.length = 0
|
x.length = 0
|
||||||
|
x.checksum.Reset()
|
||||||
if x.checksum != nil {
|
|
||||||
x.checksum.Reset()
|
|
||||||
} else {
|
|
||||||
x.checksum = sha256.New()
|
|
||||||
}
|
|
||||||
|
|
||||||
if x.homomorphicChecksum != nil {
|
if x.homomorphicChecksum != nil {
|
||||||
x.homomorphicChecksum.Reset()
|
x.homomorphicChecksum.Reset()
|
||||||
} else {
|
|
||||||
x.homomorphicChecksum = tz.New()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -141,6 +141,7 @@ type input struct {
|
||||||
sessionToken *session.Object
|
sessionToken *session.Object
|
||||||
payload []byte
|
payload []byte
|
||||||
attributes []string
|
attributes []string
|
||||||
|
withHomo bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func randomData(size uint64) []byte {
|
func randomData(size uint64) []byte {
|
||||||
|
@ -181,9 +182,14 @@ func randomInput(tb testing.TB, size, sizeLimit uint64) (input, slicer.Options)
|
||||||
in.owner = *usertest.ID(tb)
|
in.owner = *usertest.ID(tb)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
in.withHomo = rand.Int()%2 == 0
|
||||||
|
|
||||||
var opts slicer.Options
|
var opts slicer.Options
|
||||||
opts.SetObjectPayloadLimit(in.payloadLimit)
|
opts.SetObjectPayloadLimit(in.payloadLimit)
|
||||||
opts.SetCurrentNeoFSEpoch(in.currentEpoch)
|
opts.SetCurrentNeoFSEpoch(in.currentEpoch)
|
||||||
|
if in.withHomo {
|
||||||
|
opts.CalculateHomomorphicChecksum()
|
||||||
|
}
|
||||||
|
|
||||||
return in, opts
|
return in, opts
|
||||||
}
|
}
|
||||||
|
@ -340,6 +346,9 @@ func checkStaticMetadata(tb testing.TB, header object.Object, in input) {
|
||||||
require.Equal(tb, in.sessionToken, header.SessionToken(), "configured session token must be written into objects")
|
require.Equal(tb, in.sessionToken, header.SessionToken(), "configured session token must be written into objects")
|
||||||
|
|
||||||
require.NoError(tb, object.CheckHeaderVerificationFields(&header), "verification fields must be correctly set in header")
|
require.NoError(tb, object.CheckHeaderVerificationFields(&header), "verification fields must be correctly set in header")
|
||||||
|
|
||||||
|
_, ok = header.PayloadHomomorphicHash()
|
||||||
|
require.Equal(tb, in.withHomo, ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *chainCollector) handleOutgoingObject(header object.Object, payload io.Reader) {
|
func (x *chainCollector) handleOutgoingObject(header object.Object, payload io.Reader) {
|
||||||
|
@ -404,14 +413,19 @@ func (x *chainCollector) handleOutgoingObject(header object.Object, payload io.R
|
||||||
cs, ok := header.PayloadChecksum()
|
cs, ok := header.PayloadChecksum()
|
||||||
require.True(x.tb, ok)
|
require.True(x.tb, ok)
|
||||||
|
|
||||||
csHomo, ok := header.PayloadHomomorphicHash()
|
pcs := payloadWithChecksum{
|
||||||
require.True(x.tb, ok)
|
|
||||||
|
|
||||||
x.mPayloads[id] = payloadWithChecksum{
|
|
||||||
r: payload,
|
r: payload,
|
||||||
cs: []checksum.Checksum{cs, csHomo},
|
cs: []checksum.Checksum{cs},
|
||||||
hs: []hash.Hash{sha256.New(), tz.New()},
|
hs: []hash.Hash{sha256.New()},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
csHomo, ok := header.PayloadHomomorphicHash()
|
||||||
|
if ok {
|
||||||
|
pcs.cs = append(pcs.cs, csHomo)
|
||||||
|
pcs.hs = append(pcs.hs, tz.New())
|
||||||
|
}
|
||||||
|
|
||||||
|
x.mPayloads[id] = pcs
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *chainCollector) verify(in input, rootID oid.ID) {
|
func (x *chainCollector) verify(in input, rootID oid.ID) {
|
||||||
|
|
Loading…
Reference in a new issue