slicer: Implement alternative slicing through user-driven writing

Provide a method to initialize the payload stream as an `io.WriteCloser`. This
approach is useful for applications that need to control data
writing themselves.

Refs #342.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
Leonard Lyubich 2023-03-24 19:21:52 +04:00
parent 0d7d03d56f
commit ab5ae28fdb
2 changed files with 329 additions and 23 deletions
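
For illustration, a minimal sketch of how an application could drive the new
stream itself (the configured slicer `s`, the data source `src`, and the
attribute pair are placeholders, not part of this commit):

	w, err := s.InitPayloadStream("Name", "archive.tar")
	if err != nil {
		return err
	}
	if _, err = io.Copy(w, src); err != nil {
		return err
	}
	if err = w.Close(); err != nil {
		return err
	}
	id := w.ID() // reference to the stored object in the container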


@@ -1,6 +1,7 @@
package slicer
import (
"bytes"
"crypto/sha256"
"errors"
"fmt"
@@ -108,6 +109,17 @@ func (x *Slicer) fillCommonMetadata(obj *object.Object) {
obj.SetSessionToken(x.sessionToken)
}
const defaultPayloadSizeLimit = 1 << 20
// childPayloadSizeLimit returns the configured size limit of the child object's
// payload, which defaults to 1 MiB.
func (x *Slicer) childPayloadSizeLimit() uint64 {
if x.opts.objectPayloadLimit > 0 {
return x.opts.objectPayloadLimit
}
return defaultPayloadSizeLimit
}
// Slice creates new NeoFS object from the input data stream, associates the
// object with the configured container and writes the object via underlying
// ObjectWriter. After a successful write, Slice returns an oid.ID which is a
@@ -274,6 +286,197 @@ func (x *Slicer) Slice(data io.Reader, attributes ...string) (oid.ID, error) {
return rootID, nil
}
// InitPayloadStream works similarly to Slice but provides a PayloadWriter allowing
// the caller to write data themselves.
func (x *Slicer) InitPayloadStream(attributes ...string) (*PayloadWriter, error) {
res := &PayloadWriter{
stream: x.w,
signer: x.signer,
container: x.cnr,
owner: x.owner,
currentEpoch: x.opts.currentNeoFSEpoch,
sessionToken: x.sessionToken,
attributes: attributes,
}
res.buf.Grow(int(x.childPayloadSizeLimit()))
res.rootMeta.reset()
res.currentWriter = newLimitedWriter(io.MultiWriter(&res.buf, &res.rootMeta), x.childPayloadSizeLimit())
return res, nil
}
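// Note: the payload is teed through io.MultiWriter into both the reusable
// chunk buffer and rootMeta, so the root object's size and checksums keep
// accumulating while limitedWriter cuts child objects off at
// childPayloadSizeLimit.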
// PayloadWriter is a single-object payload stream provided by Slicer.
type PayloadWriter struct {
stream ObjectWriter
rootID oid.ID
signer neofscrypto.Signer
container cid.ID
owner user.ID
currentEpoch uint64
sessionToken *session.Object
attributes []string
buf bytes.Buffer
rootMeta dynamicObjectMetadata
childMeta dynamicObjectMetadata
currentWriter limitedWriter
withSplit bool
writtenChildren []oid.ID
}
// Write writes the next chunk of the object data. The concatenation of all chunks
// forms the payload of the final object. When the data is over, the PayloadWriter
// should be closed.
func (x *PayloadWriter) Write(chunk []byte) (int, error) {
if len(chunk) == 0 {
// not explicitly prohibited in the io.Writer documentation
return 0, nil
}
n, err := x.currentWriter.Write(chunk)
if err == nil || !errors.Is(err, errOverflow) {
return n, err
}
if !x.withSplit {
err = x.writeIntermediateChild(x.rootMeta)
if err != nil {
return n, fmt.Errorf("write 1st child: %w", err)
}
x.currentWriter.reset(io.MultiWriter(&x.buf, &x.rootMeta, &x.childMeta))
x.withSplit = true
} else {
err = x.writeIntermediateChild(x.childMeta)
if err != nil {
return n, fmt.Errorf("write next child: %w", err)
}
x.currentWriter.resetProgress()
}
x.buf.Reset()
x.childMeta.reset()
n2, err := x.Write(chunk[n:]) // here n > 0 so infinite recursion shouldn't occur
return n + n2, err
}
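// For example, with the default 1 MiB limit, a total of 2.5 MiB written
// through Write overflows twice: the first overflow emits the 1st child and
// switches to split mode (childMeta starts accumulating alongside rootMeta),
// the second emits the next child, and Close emits the last child together
// with the root and linking objects.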
// Close finalizes the object with the written payload data, saves the object and
// closes the stream. A reference to the stored object can be obtained via the ID
// method.
func (x *PayloadWriter) Close() error {
if x.withSplit {
return x.writeLastChild(x.childMeta, x.setID)
}
return x.writeLastChild(x.rootMeta, x.setID)
}
func (x *PayloadWriter) setID(id oid.ID) {
x.rootID = id
}
// ID returns the unique identifier of the stored object representing its reference
// in the configured container.
//
// ID MUST NOT be called before successful Close (undefined behavior otherwise).
func (x *PayloadWriter) ID() oid.ID {
return x.rootID
}
// writeIntermediateChild writes an intermediate split-chain element with the
// specified dynamicObjectMetadata to the configured ObjectWriter.
func (x *PayloadWriter) writeIntermediateChild(meta dynamicObjectMetadata) error {
return x._writeChild(meta, false, nil)
}
// writeLastChild writes the last split-chain element with the specified
// dynamicObjectMetadata to the configured ObjectWriter. If rootIDHandler is
// specified, the ID of the resulting root object is passed into it.
func (x *PayloadWriter) writeLastChild(meta dynamicObjectMetadata, rootIDHandler func(id oid.ID)) error {
return x._writeChild(meta, true, rootIDHandler)
}
func (x *PayloadWriter) _writeChild(meta dynamicObjectMetadata, last bool, rootIDHandler func(id oid.ID)) error {
currentVersion := version.Current()
fCommon := func(obj *object.Object) {
obj.SetVersion(&currentVersion)
obj.SetContainerID(x.container)
obj.SetCreationEpoch(x.currentEpoch)
obj.SetType(object.TypeRegular)
obj.SetOwnerID(&x.owner)
obj.SetSessionToken(x.sessionToken)
}
var obj object.Object
fCommon(&obj)
if len(x.writtenChildren) > 0 {
obj.SetPreviousID(x.writtenChildren[len(x.writtenChildren)-1])
}
if last {
var rootObj *object.Object
if x.withSplit {
rootObj = new(object.Object)
} else {
rootObj = &obj
}
fCommon(rootObj)
if len(x.attributes) > 0 {
attrs := make([]object.Attribute, len(x.attributes)/2)
for i := 0; i < len(attrs); i++ {
attrs[i].SetKey(x.attributes[2*i])
attrs[i].SetValue(x.attributes[2*i+1])
}
rootObj.SetAttributes(attrs...)
}
rootID, err := flushObjectMetadata(x.signer, x.rootMeta, rootObj)
if err != nil {
return fmt.Errorf("form root object: %w", err)
}
if rootIDHandler != nil {
rootIDHandler(rootID)
}
if x.withSplit {
obj.SetParentID(rootID)
obj.SetParent(rootObj)
}
}
id, err := writeInMemObject(x.signer, x.stream, obj, x.buf.Bytes(), meta)
if err != nil {
return fmt.Errorf("write formed object: %w", err)
}
x.writtenChildren = append(x.writtenChildren, id)
if x.withSplit && last {
obj.ResetPreviousID()
obj.SetChildren(x.writtenChildren...)
_, err = writeInMemObject(x.signer, x.stream, obj, nil, meta)
if err != nil {
return fmt.Errorf("write linking object: %w", err)
}
}
return nil
}
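// The resulting split chain consists of intermediate children linked via
// previous-ID references, a last child carrying the root object as its
// parent, and, in split mode, a payload-less linking object listing all
// written children.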
func flushObjectMetadata(signer neofscrypto.Signer, meta dynamicObjectMetadata, header *object.Object) (oid.ID, error) {
var cs checksum.Checksum
@@ -349,6 +552,11 @@ type dynamicObjectMetadata struct {
homomorphicChecksum hash.Hash
}
func (x *dynamicObjectMetadata) Write(chunk []byte) (int, error) {
x.accumulateNextPayloadChunk(chunk)
return len(chunk), nil
}
// accumulateNextPayloadChunk handles the next payload chunk and updates the
// accumulated metadata.
func (x *dynamicObjectMetadata) accumulateNextPayloadChunk(chunk []byte) {
@@ -373,3 +581,53 @@ func (x *dynamicObjectMetadata) reset() {
x.homomorphicChecksum = tz.New()
}
}
var errOverflow = errors.New("overflow")
// limitedWriter is an io.Writer that limits the volume of written data.
type limitedWriter struct {
base io.Writer
limit, written uint64
}
// newLimitedWriter initializes a limitedWriter which writes data to the base
// writer up to the specified limit.
func newLimitedWriter(base io.Writer, limit uint64) limitedWriter {
return limitedWriter{
base: base,
limit: limit,
}
}
// reset resets progress to zero and sets the base target for writing subsequent
// data.
func (x *limitedWriter) reset(base io.Writer) {
x.base = base
x.resetProgress()
}
// resetProgress resets progress to zero.
func (x *limitedWriter) resetProgress() {
x.written = 0
}
// Write writes the next chunk of the data to the base writer. If the chunk along
// with the already written data overflows the configured limit, Write returns
// errOverflow.
func (x *limitedWriter) Write(p []byte) (n int, err error) {
overflow := uint64(len(p)) > x.limit-x.written
if overflow {
n, err = x.base.Write(p[:x.limit-x.written])
} else {
n, err = x.base.Write(p)
}
x.written += uint64(n)
if overflow && err == nil {
return n, errOverflow
}
return n, err
}
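// For example, with limit = 4 and written = 0, writing a 6-byte chunk passes
// the first 4 bytes to the base writer and returns n = 4 with errOverflow;
// PayloadWriter then flushes a child object, resets the progress and
// re-submits the remaining 2 bytes.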


@@ -77,27 +77,47 @@ func BenchmarkSliceDataIntoObjects(b *testing.B) {
}
func benchmarkSliceDataIntoObjects(b *testing.B, size, sizeLimit uint64) {
var w discardObject
in, opts := randomInput(b, size, sizeLimit)
var s *slicer.Slicer
if in.sessionToken != nil {
s = slicer.NewSession(in.signer, in.container, *in.sessionToken, w, opts)
} else {
s = slicer.New(in.signer, in.container, in.owner, w, opts)
}
b.Run("reader", func(b *testing.B) {
var err error
r := bytes.NewReader(in.payload)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = s.Slice(r, in.attributes...)
b.StopTimer()
require.NoError(b, err)
b.StartTimer()
}
})
b.Run("writer", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
var err error
var w *slicer.PayloadWriter
for i := 0; i < b.N; i++ {
w, err = s.InitPayloadStream(in.attributes...)
b.StopTimer()
require.NoError(b, err)
b.StartTimer()
_, err = w.Write(in.payload)
if err == nil {
err = w.Close()
}
b.StopTimer()
require.NoError(b, err)
b.StartTimer()
}
})
}
type discardObject struct{}
@@ -147,7 +167,11 @@ func randomInput(tb testing.TB, size, sizeLimit uint64) (input, slicer.Options)
in.signer = neofsecdsa.Signer(*key)
in.container = cidtest.ID()
in.currentEpoch = rand.Uint64()
if sizeLimit > 0 {
in.payloadLimit = sizeLimit
} else {
in.payloadLimit = defaultLimit
}
in.payload = randomData(size)
in.attributes = attrs
@@ -173,10 +197,6 @@ func testSlicer(tb testing.TB, size, sizeLimit uint64) {
chainCollector: newChainCollector(tb),
}
var s *slicer.Slicer
if checker.input.sessionToken != nil {
s = slicer.NewSession(in.signer, checker.input.container, *checker.input.sessionToken, checker, opts)
@@ -184,10 +204,38 @@ func testSlicer(tb testing.TB, size, sizeLimit uint64) {
s = slicer.New(in.signer, checker.input.container, checker.input.owner, checker, opts)
}
// check reader
rootID, err := s.Slice(bytes.NewReader(in.payload), in.attributes...)
require.NoError(tb, err)
checker.chainCollector.verify(checker.input, rootID)
// check writer with a randomly chosen size of written chunks
checker.chainCollector = newChainCollector(tb)
w, err := s.InitPayloadStream(in.attributes...)
require.NoError(tb, err)
var chunkSize int
if len(in.payload) > 0 {
chunkSize = rand.Int() % len(in.payload)
if chunkSize == 0 {
chunkSize = 1
}
}
for payload := in.payload; len(payload) > 0; payload = payload[chunkSize:] {
if chunkSize > len(payload) {
chunkSize = len(payload)
}
n, err := w.Write(payload[:chunkSize])
require.NoError(tb, err)
require.EqualValues(tb, chunkSize, n)
}
err = w.Close()
require.NoError(tb, err)
checker.chainCollector.verify(checker.input, w.ID())
}
type slicedObjectChecker struct {