Provide static object slicer (#382)

* closes #342
This commit is contained in:
Roman Khimov 2023-04-27 12:03:40 +03:00 committed by GitHub
commit e0d06dd444
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 1275 additions and 0 deletions

View file

@ -13,10 +13,13 @@ import (
v2session "github.com/nspcc-dev/neofs-api-go/v2/session" v2session "github.com/nspcc-dev/neofs-api-go/v2/session"
"github.com/nspcc-dev/neofs-sdk-go/bearer" "github.com/nspcc-dev/neofs-sdk-go/bearer"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/object/slicer"
"github.com/nspcc-dev/neofs-sdk-go/session" "github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/user"
) )
// PrmObjectPutInit groups parameters of ObjectPutInit operation. // PrmObjectPutInit groups parameters of ObjectPutInit operation.
@ -263,3 +266,106 @@ func (c *Client) ObjectPutInit(ctx context.Context, prm PrmObjectPutInit) (*Obje
return &w, nil return &w, nil
} }
type objectWriter struct {
context context.Context
client *Client
}
func (x *objectWriter) InitDataStream(header object.Object) (io.Writer, error) {
var prm PrmObjectPutInit
stream, err := x.client.ObjectPutInit(x.context, prm)
if err != nil {
return nil, fmt.Errorf("init object stream: %w", err)
}
if stream.WriteHeader(header) {
return &payloadWriter{
stream: stream,
}, nil
}
res, err := stream.Close()
if err != nil {
return nil, err
}
return nil, apistatus.ErrFromStatus(res.Status())
}
type payloadWriter struct {
stream *ObjectWriter
}
func (x *payloadWriter) Write(p []byte) (int, error) {
if !x.stream.WritePayloadChunk(p) {
return 0, x.Close()
}
return len(p), nil
}
func (x *payloadWriter) Close() error {
res, err := x.stream.Close()
if err != nil {
return err
}
return apistatus.ErrFromStatus(res.Status())
}
// CreateObject creates new NeoFS object with given payload data and stores it
// in specified container of the NeoFS network using provided Client connection.
// The object is created on behalf of provided neofscrypto.Signer, and owned by
// the specified user.ID.
//
// In terms of NeoFS, parameterized neofscrypto.Signer represents object owner,
// object signer and request sender. Container SHOULD be public-write or sender
// SHOULD have corresponding rights.
//
// Client connection MUST be opened in advance, see Dial method for details.
// Network communication is carried out within a given context, so it MUST NOT
// be nil.
//
// Notice: This API is EXPERIMENTAL and is planned to be replaced/changed in the
// future. Be ready to refactor your code regarding imports and call mechanics,
// in essence the operation will not change.
func CreateObject(ctx context.Context, cli *Client, signer neofscrypto.Signer, cnr cid.ID, owner user.ID, data io.Reader, attributes ...string) (oid.ID, error) {
s, err := NewDataSlicer(ctx, cli, signer, cnr, owner)
if err != nil {
return oid.ID{}, err
}
return s.Slice(data, attributes...)
}
// NewDataSlicer creates slicer.Slicer that saves data in the NeoFS network
// through provided Client. The data is packaged into NeoFS objects stored in
// the specified container. Provided signer is being used to sign the resulting
// objects as a system requirement. Produced objects are owned by the
// parameterized NeoFS user.
//
// Notice: This API is EXPERIMENTAL and is planned to be replaced/changed in the
// future. Be ready to refactor your code regarding imports and call mechanics,
// in essence the operation will not change.
func NewDataSlicer(ctx context.Context, cli *Client, signer neofscrypto.Signer, cnr cid.ID, owner user.ID) (*slicer.Slicer, error) {
resNetInfo, err := cli.NetworkInfo(ctx, PrmNetworkInfo{})
if err != nil {
return nil, fmt.Errorf("read current network info: %w", err)
}
netInfo := resNetInfo.Info()
var opts slicer.Options
opts.SetObjectPayloadLimit(netInfo.MaxObjectSize())
opts.SetCurrentNeoFSEpoch(netInfo.CurrentEpoch())
if !netInfo.HomomorphicHashingDisabled() {
opts.CalculateHomomorphicChecksum()
}
return slicer.New(signer, cnr, owner, &objectWriter{
context: ctx,
client: cli,
}, opts), nil
}

View file

@ -338,6 +338,13 @@ func (o *Object) PreviousID() (v oid.ID, isSet bool) {
return return
} }
// ResetPreviousID resets identifier of the previous sibling object.
func (o *Object) ResetPreviousID() {
o.setSplitFields(func(split *object.SplitHeader) {
split.SetPrevious(nil)
})
}
// SetPreviousID sets identifier of the previous sibling object. // SetPreviousID sets identifier of the previous sibling object.
func (o *Object) SetPreviousID(v oid.ID) { func (o *Object) SetPreviousID(v oid.ID) {
var v2 refs.ObjectID var v2 refs.ObjectID

4
object/slicer/doc.go Normal file
View file

@ -0,0 +1,4 @@
/*
Package slicer provides raw data slicing into NeoFS objects.
*/
package slicer

27
object/slicer/options.go Normal file
View file

@ -0,0 +1,27 @@
package slicer
// Options groups Slicer options.
type Options struct {
objectPayloadLimit uint64
currentNeoFSEpoch uint64
withHomoChecksum bool
}
// SetObjectPayloadLimit specifies data size limit for produced physically
// stored objects.
func (x *Options) SetObjectPayloadLimit(l uint64) {
x.objectPayloadLimit = l
}
// SetCurrentNeoFSEpoch sets current NeoFS epoch.
func (x *Options) SetCurrentNeoFSEpoch(e uint64) {
x.currentNeoFSEpoch = e
}
// CalculateHomomorphicChecksum makes Slicer to calculate and set homomorphic
// checksum of the processed objects.
func (x *Options) CalculateHomomorphicChecksum() {
x.withHomoChecksum = true
}

642
object/slicer/slicer.go Normal file
View file

@ -0,0 +1,642 @@
package slicer
import (
"bytes"
"crypto/sha256"
"errors"
"fmt"
"hash"
"io"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/version"
"github.com/nspcc-dev/tzhash/tz"
)
// ObjectWriter represents a virtual object recorder.
type ObjectWriter interface {
// InitDataStream initializes and returns a stream of writable data associated
// with the object according to its header. Provided header includes at least
// container, owner and object ID fields.
InitDataStream(header object.Object) (dataStream io.Writer, err error)
}
// Slicer converts input raw data streams into NeoFS objects. Working Slicer
// must be constructed via New.
type Slicer struct {
signer neofscrypto.Signer
cnr cid.ID
owner user.ID
w ObjectWriter
opts Options
sessionToken *session.Object
}
// New constructs Slicer which writes sliced ready-to-go objects owned by
// particular user into the specified container using provided ObjectWriter.
// All objects are signed using provided neofscrypto.Signer.
//
// If ObjectWriter returns data streams which provide io.Closer, they are closed
// in Slicer.Slice after the payload of any object has been written. In this
// case, Slicer.Slice fails immediately on Close error.
//
// Options parameter allows you to provide optional parameters which tune
// the default Slicer behavior. They are detailed below.
//
// If payload size limit is specified via Options.SetObjectPayloadLimit,
// outgoing objects has payload not bigger than the limit. NeoFS stores the
// corresponding value in the network configuration. Ignore this option if you
// don't (want to) have access to it. By default, single object is limited by
// 1MB. Slicer uses this value to enforce the maximum object payload size limit
// described in the NeoFS Specification. If the total amount of data exceeds the
// specified limit, Slicer applies the slicing algorithm described within the
// same specification. The outcome will be a group of "small" objects containing
// a chunk of data, as well as an auxiliary linking object. All derived objects
// are written to the parameterized ObjectWriter. If the amount of data is
// within the limit, one object is produced. Note that Slicer can write multiple
// objects, but returns the root object ID only.
//
// If current NeoFS epoch is specified via Options.SetCurrentNeoFSEpoch, it is
// written to the metadata of all resulting objects as a creation epoch.
//
// See also NewSession.
func New(signer neofscrypto.Signer, cnr cid.ID, owner user.ID, w ObjectWriter, opts Options) *Slicer {
return &Slicer{
signer: signer,
cnr: cnr,
owner: owner,
w: w,
opts: opts,
}
}
// NewSession creates Slicer which generates objects within provided session.
// NewSession work similar to New with the detail that the session issuer owns
// the produced objects. Specified session token is written to the metadata of
// all resulting objects. In this case, the object is considered to be created
// by a proxy on behalf of the session issuer.
func NewSession(signer neofscrypto.Signer, cnr cid.ID, token session.Object, w ObjectWriter, opts Options) *Slicer {
return &Slicer{
signer: signer,
cnr: cnr,
owner: token.Issuer(),
w: w,
opts: opts,
sessionToken: &token,
}
}
// fillCommonMetadata writes to the object metadata common to all objects of the
// same stream.
func (x *Slicer) fillCommonMetadata(obj *object.Object) {
currentVersion := version.Current()
obj.SetVersion(&currentVersion)
obj.SetContainerID(x.cnr)
obj.SetCreationEpoch(x.opts.currentNeoFSEpoch)
obj.SetType(object.TypeRegular)
obj.SetOwnerID(&x.owner)
obj.SetSessionToken(x.sessionToken)
}
const defaultPayloadSizeLimit = 1 << 20
// childPayloadSizeLimit returns configured size limit of the child object's
// payload which defaults to 1MB.
func (x *Slicer) childPayloadSizeLimit() uint64 {
if x.opts.objectPayloadLimit > 0 {
return x.opts.objectPayloadLimit
}
return defaultPayloadSizeLimit
}
// Slice creates new NeoFS object from the input data stream, associates the
// object with the configured container and writes the object via underlying
// ObjectWriter. After a successful write, Slice returns an oid.ID which is a
// unique reference to the object in the container. Slice sets all required
// calculated fields like payload length, checksum, etc.
//
// Slice allows you to specify string key-value pairs to be written to the
// resulting object's metadata as object attributes. Corresponding argument MUST
// NOT be empty or have odd length. Keys SHOULD NOT start with system-reserved
// '__NEOFS__' prefix.
//
// See New for details.
func (x *Slicer) Slice(data io.Reader, attributes ...string) (oid.ID, error) {
if len(attributes)%2 != 0 {
panic("attributes must be even number of strings")
}
if x.opts.objectPayloadLimit == 0 {
x.opts.objectPayloadLimit = 1 << 20
}
var rootID oid.ID
var rootHeader object.Object
var offset uint64
var isSplit bool
var childMeta dynamicObjectMetadata
var writtenChildren []oid.ID
var childHeader object.Object
rootMeta := newDynamicObjectMetadata(x.opts.withHomoChecksum)
bChunk := make([]byte, x.opts.objectPayloadLimit+1)
x.fillCommonMetadata(&rootHeader)
for {
n, err := data.Read(bChunk[offset:])
if err == nil {
if last := offset + uint64(n); last <= x.opts.objectPayloadLimit {
rootMeta.accumulateNextPayloadChunk(bChunk[offset:last])
if isSplit {
childMeta.accumulateNextPayloadChunk(bChunk[offset:last])
}
offset = last
// data is not over, and we expect more bytes to form next object
continue
}
} else {
if !errors.Is(err, io.EOF) {
return rootID, fmt.Errorf("read payload chunk: %w", err)
}
// there will be no more data
toSend := offset + uint64(n)
if toSend <= x.opts.objectPayloadLimit {
// we can finalize the root object and send last part
if len(attributes) > 0 {
attrs := make([]object.Attribute, len(attributes)/2)
for i := 0; i < len(attrs); i++ {
attrs[i].SetKey(attributes[2*i])
attrs[i].SetValue(attributes[2*i+1])
}
rootHeader.SetAttributes(attrs...)
}
rootID, err = flushObjectMetadata(x.signer, rootMeta, &rootHeader)
if err != nil {
return rootID, fmt.Errorf("form root object: %w", err)
}
if isSplit {
// when splitting, root object's header is written into its last child
childHeader.SetParent(&rootHeader)
childHeader.SetPreviousID(writtenChildren[len(writtenChildren)-1])
childID, err := writeInMemObject(x.signer, x.w, childHeader, bChunk[:toSend], childMeta)
if err != nil {
return rootID, fmt.Errorf("write child object: %w", err)
}
writtenChildren = append(writtenChildren, childID)
} else {
// root object is single (full < limit), so send it directly
rootID, err = writeInMemObject(x.signer, x.w, rootHeader, bChunk[:toSend], rootMeta)
if err != nil {
return rootID, fmt.Errorf("write single root object: %w", err)
}
return rootID, nil
}
break
}
// otherwise, form penultimate object, then do one more iteration for
// simplicity: according to io.Reader, we'll get io.EOF again, but the overflow
// will no longer occur, so we'll finish the loop
}
// according to buffer size, here we can overflow the object payload limit, e.g.
// 1. full=11B,limit=10B,read=11B (no objects created yet)
// 2. full=21B,limit=10B,read=11B (one object has been already sent with size=10B)
toSend := offset + uint64(n)
overflow := toSend > x.opts.objectPayloadLimit
if overflow {
toSend = x.opts.objectPayloadLimit
}
// we could read some data even in case of io.EOF, so don't forget pick up the tail
if n > 0 {
rootMeta.accumulateNextPayloadChunk(bChunk[offset:toSend])
if isSplit {
childMeta.accumulateNextPayloadChunk(bChunk[offset:toSend])
}
}
if overflow {
isSplitCp := isSplit // we modify it in next condition below but need after it
if !isSplit {
// we send only child object below, but we can get here at the beginning (see
// option 1 described above), so we need to pre-init child resources
isSplit = true
x.fillCommonMetadata(&childHeader)
childHeader.SetSplitID(object.NewSplitID())
childMeta = rootMeta
// we do shallow copy of rootMeta because below we take this into account and do
// not corrupt it
} else {
childHeader.SetPreviousID(writtenChildren[len(writtenChildren)-1])
}
childID, err := writeInMemObject(x.signer, x.w, childHeader, bChunk[:toSend], childMeta)
if err != nil {
return rootID, fmt.Errorf("write child object: %w", err)
}
writtenChildren = append(writtenChildren, childID)
// shift overflow bytes to the beginning
if !isSplitCp {
childMeta = newDynamicObjectMetadata(x.opts.withHomoChecksum) // to avoid rootMeta corruption
}
childMeta.reset()
childMeta.accumulateNextPayloadChunk(bChunk[toSend:])
rootMeta.accumulateNextPayloadChunk(bChunk[toSend:])
offset = uint64(copy(bChunk, bChunk[toSend:]))
}
}
// linking object
childMeta.reset()
childHeader.ResetPreviousID()
childHeader.SetChildren(writtenChildren...)
_, err := writeInMemObject(x.signer, x.w, childHeader, nil, childMeta)
if err != nil {
return rootID, fmt.Errorf("write linking object: %w", err)
}
return rootID, nil
}
// InitPayloadStream works similar to Slice but provides PayloadWriter allowing
// the caller to write data himself.
func (x *Slicer) InitPayloadStream(attributes ...string) (*PayloadWriter, error) {
res := &PayloadWriter{
stream: x.w,
signer: x.signer,
container: x.cnr,
owner: x.owner,
currentEpoch: x.opts.currentNeoFSEpoch,
sessionToken: x.sessionToken,
attributes: attributes,
rootMeta: newDynamicObjectMetadata(x.opts.withHomoChecksum),
childMeta: newDynamicObjectMetadata(x.opts.withHomoChecksum),
}
res.buf.Grow(int(x.childPayloadSizeLimit()))
res.rootMeta.reset()
res.currentWriter = newLimitedWriter(io.MultiWriter(&res.buf, &res.rootMeta), x.childPayloadSizeLimit())
return res, nil
}
// PayloadWriter is a single-object payload stream provided by Slicer.
type PayloadWriter struct {
stream ObjectWriter
rootID oid.ID
signer neofscrypto.Signer
container cid.ID
owner user.ID
currentEpoch uint64
sessionToken *session.Object
attributes []string
buf bytes.Buffer
rootMeta dynamicObjectMetadata
childMeta dynamicObjectMetadata
currentWriter limitedWriter
withSplit bool
writtenChildren []oid.ID
}
// Write writes next chunk of the object data. Concatenation of all chunks forms
// the payload of the final object. When the data is over, the PayloadWriter
// should be closed.
func (x *PayloadWriter) Write(chunk []byte) (int, error) {
if len(chunk) == 0 {
// not explicitly prohibited in the io.Writer documentation
return 0, nil
}
n, err := x.currentWriter.Write(chunk)
if err == nil || !errors.Is(err, errOverflow) {
return n, err
}
if !x.withSplit {
err = x.writeIntermediateChild(x.rootMeta)
if err != nil {
return n, fmt.Errorf("write 1st child: %w", err)
}
x.currentWriter.reset(io.MultiWriter(&x.buf, &x.rootMeta, &x.childMeta))
x.withSplit = true
} else {
err = x.writeIntermediateChild(x.childMeta)
if err != nil {
return n, fmt.Errorf("write next child: %w", err)
}
x.currentWriter.resetProgress()
}
x.buf.Reset()
x.childMeta.reset()
n2, err := x.Write(chunk[n:]) // here n > 0 so infinite recursion shouldn't occur
return n + n2, err
}
// Close finalizes object with written payload data, saves the object and closes
// the stream. Reference to the stored object can be obtained by ID method.
func (x *PayloadWriter) Close() error {
if x.withSplit {
return x.writeLastChild(x.childMeta, x.setID)
}
return x.writeLastChild(x.rootMeta, x.setID)
}
func (x *PayloadWriter) setID(id oid.ID) {
x.rootID = id
}
// ID returns unique identifier of the stored object representing its reference
// in the configured container.
//
// ID MUST NOT be called before successful Close (undefined behavior otherwise).
func (x *PayloadWriter) ID() oid.ID {
return x.rootID
}
// writeIntermediateChild writes intermediate split-chain element with specified
// dynamicObjectMetadata to the configured ObjectWriter.
func (x *PayloadWriter) writeIntermediateChild(meta dynamicObjectMetadata) error {
return x._writeChild(meta, false, nil)
}
// writeIntermediateChild writes last split-chain element with specified
// dynamicObjectMetadata to the configured ObjectWriter. If rootIDHandler is
// specified, ID of the resulting root object is passed into it.
func (x *PayloadWriter) writeLastChild(meta dynamicObjectMetadata, rootIDHandler func(id oid.ID)) error {
return x._writeChild(meta, true, rootIDHandler)
}
func (x *PayloadWriter) _writeChild(meta dynamicObjectMetadata, last bool, rootIDHandler func(id oid.ID)) error {
currentVersion := version.Current()
fCommon := func(obj *object.Object) {
obj.SetVersion(&currentVersion)
obj.SetContainerID(x.container)
obj.SetCreationEpoch(x.currentEpoch)
obj.SetType(object.TypeRegular)
obj.SetOwnerID(&x.owner)
obj.SetSessionToken(x.sessionToken)
}
var obj object.Object
fCommon(&obj)
if len(x.writtenChildren) > 0 {
obj.SetPreviousID(x.writtenChildren[len(x.writtenChildren)-1])
}
if last {
var rootObj *object.Object
if x.withSplit {
rootObj = new(object.Object)
} else {
rootObj = &obj
}
fCommon(rootObj)
if len(x.attributes) > 0 {
attrs := make([]object.Attribute, len(x.attributes)/2)
for i := 0; i < len(attrs); i++ {
attrs[i].SetKey(x.attributes[2*i])
attrs[i].SetValue(x.attributes[2*i+1])
}
rootObj.SetAttributes(attrs...)
}
rootID, err := flushObjectMetadata(x.signer, x.rootMeta, rootObj)
if err != nil {
return fmt.Errorf("form root object: %w", err)
}
if rootIDHandler != nil {
rootIDHandler(rootID)
}
if x.withSplit {
obj.SetParentID(rootID)
obj.SetParent(rootObj)
}
}
id, err := writeInMemObject(x.signer, x.stream, obj, x.buf.Bytes(), meta)
if err != nil {
return fmt.Errorf("write formed object: %w", err)
}
x.writtenChildren = append(x.writtenChildren, id)
if x.withSplit && last {
obj.ResetPreviousID()
obj.SetChildren(x.writtenChildren...)
_, err = writeInMemObject(x.signer, x.stream, obj, nil, meta)
if err != nil {
return fmt.Errorf("write linking object: %w", err)
}
}
return nil
}
func flushObjectMetadata(signer neofscrypto.Signer, meta dynamicObjectMetadata, header *object.Object) (oid.ID, error) {
var cs checksum.Checksum
var csBytes [sha256.Size]byte
copy(csBytes[:], meta.checksum.Sum(nil))
cs.SetSHA256(csBytes)
header.SetPayloadChecksum(cs)
if meta.homomorphicChecksum != nil {
var csHomoBytes [tz.Size]byte
copy(csHomoBytes[:], meta.homomorphicChecksum.Sum(nil))
cs.SetTillichZemor(csHomoBytes)
header.SetPayloadHomomorphicHash(cs)
}
header.SetPayloadSize(meta.length)
id, err := object.CalculateID(header)
if err != nil {
return id, fmt.Errorf("calculate ID: %w", err)
}
header.SetID(id)
bID, err := id.Marshal()
if err != nil {
return id, fmt.Errorf("marshal object ID: %w", err)
}
var sig neofscrypto.Signature
err = sig.Calculate(signer, bID)
if err != nil {
return id, fmt.Errorf("sign object ID: %w", err)
}
header.SetSignature(&sig)
return id, nil
}
func writeInMemObject(signer neofscrypto.Signer, w ObjectWriter, header object.Object, payload []byte, meta dynamicObjectMetadata) (oid.ID, error) {
id, err := flushObjectMetadata(signer, meta, &header)
if err != nil {
return id, err
}
stream, err := w.InitDataStream(header)
if err != nil {
return id, fmt.Errorf("init data stream for next object: %w", err)
}
_, err = stream.Write(payload)
if err != nil {
return id, fmt.Errorf("write object payload: %w", err)
}
if c, ok := stream.(io.Closer); ok {
err = c.Close()
if err != nil {
return id, fmt.Errorf("finish object stream: %w", err)
}
}
return id, nil
}
// dynamicObjectMetadata groups accumulated object metadata which depends on
// payload.
type dynamicObjectMetadata struct {
length uint64
checksum hash.Hash
homomorphicChecksum hash.Hash
}
func newDynamicObjectMetadata(withHomoChecksum bool) dynamicObjectMetadata {
m := dynamicObjectMetadata{
checksum: sha256.New(),
}
if withHomoChecksum {
m.homomorphicChecksum = tz.New()
}
return m
}
func (x *dynamicObjectMetadata) Write(chunk []byte) (int, error) {
x.accumulateNextPayloadChunk(chunk)
return len(chunk), nil
}
// accumulateNextPayloadChunk handles the next payload chunk and updates the
// accumulated metadata.
func (x *dynamicObjectMetadata) accumulateNextPayloadChunk(chunk []byte) {
x.length += uint64(len(chunk))
x.checksum.Write(chunk)
if x.homomorphicChecksum != nil {
x.homomorphicChecksum.Write(chunk)
}
}
// reset resets all accumulated metadata.
func (x *dynamicObjectMetadata) reset() {
x.length = 0
x.checksum.Reset()
if x.homomorphicChecksum != nil {
x.homomorphicChecksum.Reset()
}
}
var errOverflow = errors.New("overflow")
// limitedWriter provides io.Writer limiting data volume.
type limitedWriter struct {
base io.Writer
limit, written uint64
}
// newLimitedWriter initializes limiterWriter which writes data to the base
// writer before the specified limit.
func newLimitedWriter(base io.Writer, limit uint64) limitedWriter {
return limitedWriter{
base: base,
limit: limit,
}
}
// reset resets progress to zero and sets the base target for writing subsequent
// data.
func (x *limitedWriter) reset(base io.Writer) {
x.base = base
x.resetProgress()
}
// resetProgress resets progress to zero.
func (x *limitedWriter) resetProgress() {
x.written = 0
}
// Write writes next chunk of the data to the base writer. If chunk along with
// already written data overflows configured limit, Write returns errOverflow.
func (x *limitedWriter) Write(p []byte) (n int, err error) {
overflow := uint64(len(p)) > x.limit-x.written
if overflow {
n, err = x.base.Write(p[:x.limit-x.written])
} else {
n, err = x.base.Write(p)
}
x.written += uint64(n)
if overflow && err == nil {
return n, errOverflow
}
return n, err
}

View file

@ -0,0 +1,489 @@
package slicer_test
import (
"bytes"
"crypto/ecdsa"
"crypto/elliptic"
cryptorand "crypto/rand"
"crypto/sha256"
"encoding/base64"
"fmt"
"hash"
"io"
"math/rand"
"testing"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/nspcc-dev/neofs-sdk-go/crypto/test"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/object/slicer"
"github.com/nspcc-dev/neofs-sdk-go/session"
sessiontest "github.com/nspcc-dev/neofs-sdk-go/session/test"
"github.com/nspcc-dev/neofs-sdk-go/user"
usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
"github.com/nspcc-dev/neofs-sdk-go/version"
"github.com/nspcc-dev/tzhash/tz"
"github.com/stretchr/testify/require"
)
const defaultLimit = 1 << 20
func TestSliceDataIntoObjects(t *testing.T) {
const size = 1 << 10
t.Run("known limit", func(t *testing.T) {
t.Run("under limit", func(t *testing.T) {
testSlicer(t, size, size)
testSlicer(t, size, size+1)
})
t.Run("multiple size", func(t *testing.T) {
testSlicer(t, size, 3*size)
testSlicer(t, size, 3*size+1)
})
})
t.Run("unknown limit", func(t *testing.T) {
t.Run("under limit", func(t *testing.T) {
testSlicer(t, defaultLimit-1, 0)
testSlicer(t, defaultLimit, 0)
})
t.Run("multiple size", func(t *testing.T) {
testSlicer(t, 3*defaultLimit, 0)
testSlicer(t, 3*defaultLimit+1, 0)
})
})
t.Run("no payload", func(t *testing.T) {
testSlicer(t, 0, 0)
testSlicer(t, 0, 1024)
})
}
func BenchmarkSliceDataIntoObjects(b *testing.B) {
const limit = 1 << 7
const stepFactor = 4
for size := uint64(1); size <= 1<<20; size *= stepFactor {
b.Run(fmt.Sprintf("slice_%d-%d", size, limit), func(b *testing.B) {
benchmarkSliceDataIntoObjects(b, size, limit)
})
}
}
func benchmarkSliceDataIntoObjects(b *testing.B, size, sizeLimit uint64) {
in, opts := randomInput(b, size, sizeLimit)
s := slicer.NewSession(in.signer, in.container, *sessiontest.ObjectSigned(test.RandomSigner(b)), discardObject{}, opts)
b.Run("reader", func(b *testing.B) {
var err error
r := bytes.NewReader(in.payload)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err = s.Slice(r, in.attributes...)
b.StopTimer()
require.NoError(b, err)
b.StartTimer()
}
})
b.Run("writer", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
var err error
var w *slicer.PayloadWriter
for i := 0; i < b.N; i++ {
w, err = s.InitPayloadStream(in.attributes...)
b.StopTimer()
require.NoError(b, err)
b.StartTimer()
_, err = w.Write(in.payload)
if err == nil {
err = w.Close()
}
b.StopTimer()
require.NoError(b, err)
b.StartTimer()
}
})
}
type discardObject struct{}
func (discardObject) InitDataStream(object.Object) (io.Writer, error) {
return discardPayload{}, nil
}
type discardPayload struct{}
func (discardPayload) Write(p []byte) (n int, err error) {
return len(p), nil
}
type input struct {
signer neofscrypto.Signer
container cid.ID
owner user.ID
currentEpoch uint64
payloadLimit uint64
sessionToken *session.Object
payload []byte
attributes []string
withHomo bool
}
func randomData(size uint64) []byte {
data := make([]byte, size)
rand.Read(data)
return data
}
func randomInput(tb testing.TB, size, sizeLimit uint64) (input, slicer.Options) {
key, err := ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader)
if err != nil {
panic(fmt.Sprintf("generate ECDSA private key: %v", err))
}
attrNum := rand.Int() % 5
attrs := make([]string, 2*attrNum)
for i := 0; i < len(attrs); i += 2 {
attrs[i] = base64.StdEncoding.EncodeToString(randomData(32))
attrs[i+1] = base64.StdEncoding.EncodeToString(randomData(32))
}
var in input
in.signer = neofsecdsa.Signer(*key)
in.container = cidtest.ID()
in.currentEpoch = rand.Uint64()
if sizeLimit > 0 {
in.payloadLimit = sizeLimit
} else {
in.payloadLimit = defaultLimit
}
in.payload = randomData(size)
in.attributes = attrs
if rand.Int()%2 == 0 {
in.sessionToken = sessiontest.ObjectSigned(test.RandomSigner(tb))
} else {
in.owner = *usertest.ID(tb)
}
in.withHomo = rand.Int()%2 == 0
var opts slicer.Options
opts.SetObjectPayloadLimit(in.payloadLimit)
opts.SetCurrentNeoFSEpoch(in.currentEpoch)
if in.withHomo {
opts.CalculateHomomorphicChecksum()
}
return in, opts
}
func testSlicer(tb testing.TB, size, sizeLimit uint64) {
in, opts := randomInput(tb, size, sizeLimit)
checker := &slicedObjectChecker{
tb: tb,
input: in,
chainCollector: newChainCollector(tb),
}
var s *slicer.Slicer
if checker.input.sessionToken != nil {
s = slicer.NewSession(in.signer, checker.input.container, *checker.input.sessionToken, checker, opts)
} else {
s = slicer.New(in.signer, checker.input.container, checker.input.owner, checker, opts)
}
// check reader
rootID, err := s.Slice(bytes.NewReader(in.payload), in.attributes...)
require.NoError(tb, err)
checker.chainCollector.verify(checker.input, rootID)
// check writer with random written chunk's size
checker.chainCollector = newChainCollector(tb)
w, err := s.InitPayloadStream(in.attributes...)
require.NoError(tb, err)
var chunkSize int
if len(in.payload) > 0 {
chunkSize = rand.Int() % len(in.payload)
if chunkSize == 0 {
chunkSize = 1
}
}
for payload := in.payload; len(payload) > 0; payload = payload[chunkSize:] {
if chunkSize > len(payload) {
chunkSize = len(payload)
}
n, err := w.Write(payload[:chunkSize])
require.NoError(tb, err)
require.EqualValues(tb, chunkSize, n)
}
err = w.Close()
require.NoError(tb, err)
checker.chainCollector.verify(checker.input, w.ID())
}
type slicedObjectChecker struct {
tb testing.TB
input input
chainCollector *chainCollector
}
func (x *slicedObjectChecker) InitDataStream(hdr object.Object) (io.Writer, error) {
checkStaticMetadata(x.tb, hdr, x.input)
buf := bytes.NewBuffer(nil)
x.chainCollector.handleOutgoingObject(hdr, buf)
return newSizeChecker(x.tb, buf, x.input.payloadLimit), nil
}
type writeSizeChecker struct {
tb testing.TB
limit uint64
processed uint64
base io.Writer
}
func newSizeChecker(tb testing.TB, base io.Writer, sizeLimit uint64) io.Writer {
return &writeSizeChecker{
tb: tb,
limit: sizeLimit,
base: base,
}
}
func (x *writeSizeChecker) Write(p []byte) (int, error) {
n, err := x.base.Write(p)
x.processed += uint64(n)
return n, err
}
func (x *writeSizeChecker) Close() error {
require.LessOrEqual(x.tb, x.processed, x.limit, "object payload must not overflow the limit")
return nil
}
type payloadWithChecksum struct {
r io.Reader
cs []checksum.Checksum
hs []hash.Hash
}
type chainCollector struct {
tb testing.TB
mProcessed map[oid.ID]struct{}
parentHeaderSet bool
parentHeader object.Object
splitID *object.SplitID
firstSet bool
first oid.ID
firstHeader object.Object
mNext map[oid.ID]oid.ID
mPayloads map[oid.ID]payloadWithChecksum
children []oid.ID
}
func newChainCollector(tb testing.TB) *chainCollector {
return &chainCollector{
tb: tb,
mProcessed: make(map[oid.ID]struct{}),
mNext: make(map[oid.ID]oid.ID),
mPayloads: make(map[oid.ID]payloadWithChecksum),
}
}
func checkStaticMetadata(tb testing.TB, header object.Object, in input) {
cnr, ok := header.ContainerID()
require.True(tb, ok, "all objects must be bound to some container")
require.True(tb, cnr.Equals(in.container), "the container must be set to the configured one")
owner := header.OwnerID()
require.NotNil(tb, owner, "any object must be owned by somebody")
if in.sessionToken != nil {
require.True(tb, in.sessionToken.Issuer().Equals(*owner), "owner must be set to the session issuer")
} else {
require.True(tb, owner.Equals(in.owner), "owner must be set to the particular user")
}
ver := header.Version()
require.NotNil(tb, ver, "version must be set in all objects")
require.Equal(tb, version.Current(), *ver, "the version must be set to current SDK one")
require.Equal(tb, object.TypeRegular, header.Type(), "only regular objects must be produced")
require.EqualValues(tb, in.currentEpoch, header.CreationEpoch(), "configured current epoch must be set as creation epoch")
require.Equal(tb, in.sessionToken, header.SessionToken(), "configured session token must be written into objects")
require.NoError(tb, object.CheckHeaderVerificationFields(&header), "verification fields must be correctly set in header")
_, ok = header.PayloadHomomorphicHash()
require.Equal(tb, in.withHomo, ok)
}
func (x *chainCollector) handleOutgoingObject(header object.Object, payload io.Reader) {
id, ok := header.ID()
require.True(x.tb, ok, "all objects must have an ID")
idCalc, err := object.CalculateID(&header)
require.NoError(x.tb, err)
require.True(x.tb, idCalc.Equals(id))
_, ok = x.mProcessed[id]
require.False(x.tb, ok, "object must be written exactly once")
x.mProcessed[id] = struct{}{}
splitID := header.SplitID()
if x.splitID == nil && splitID != nil {
x.splitID = splitID
} else {
require.Equal(x.tb, x.splitID, splitID, "split ID must the same in all objects")
}
parent := header.Parent()
if parent != nil {
require.Nil(x.tb, parent.Parent(), "multi-level genealogy is not supported")
if x.parentHeaderSet {
require.Equal(x.tb, x.parentHeader, *parent, "root header must the same")
} else {
x.parentHeaderSet = true
x.parentHeader = *parent
}
}
prev, ok := header.PreviousID()
if ok {
_, ok := x.mNext[prev]
require.False(x.tb, ok, "split-chain must not be forked")
for k := range x.mNext {
require.False(x.tb, k.Equals(prev), "split-chain must not be cycled")
}
x.mNext[prev] = id
} else if len(header.Children()) == 0 { // 1st split-chain or linking object
require.False(x.tb, x.firstSet, "there must not be multiple split-chains")
x.firstSet = true
x.first = id
x.firstHeader = header
}
children := header.Children()
if len(children) > 0 {
if len(x.children) > 0 {
require.Equal(x.tb, x.children, children, "children list must be the same")
} else {
x.children = children
}
}
cs, ok := header.PayloadChecksum()
require.True(x.tb, ok)
pcs := payloadWithChecksum{
r: payload,
cs: []checksum.Checksum{cs},
hs: []hash.Hash{sha256.New()},
}
csHomo, ok := header.PayloadHomomorphicHash()
if ok {
pcs.cs = append(pcs.cs, csHomo)
pcs.hs = append(pcs.hs, tz.New())
}
x.mPayloads[id] = pcs
}
func (x *chainCollector) verify(in input, rootID oid.ID) {
require.True(x.tb, x.firstSet, "initial split-chain element must be set")
rootObj := x.parentHeader
if !x.parentHeaderSet {
rootObj = x.firstHeader
}
restoredChain := []oid.ID{x.first}
restoredPayload := bytes.NewBuffer(make([]byte, 0, rootObj.PayloadSize()))
for {
v, ok := x.mPayloads[restoredChain[len(restoredChain)-1]]
require.True(x.tb, ok)
ws := []io.Writer{restoredPayload}
for i := range v.hs {
ws = append(ws, v.hs[i])
}
_, err := io.Copy(io.MultiWriter(ws...), v.r)
require.NoError(x.tb, err)
for i := range v.cs {
require.True(x.tb, bytes.Equal(v.cs[i].Value(), v.hs[i].Sum(nil)))
}
next, ok := x.mNext[restoredChain[len(restoredChain)-1]]
if !ok {
break
}
restoredChain = append(restoredChain, next)
}
rootObj.SetPayload(restoredPayload.Bytes())
if uint64(len(in.payload)) <= in.payloadLimit {
require.Empty(x.tb, x.children)
} else {
require.Equal(x.tb, x.children, restoredChain)
}
id, ok := rootObj.ID()
require.True(x.tb, ok, "root object must have an ID")
require.True(x.tb, id.Equals(rootID), "root ID in root object must be returned in the result")
checkStaticMetadata(x.tb, rootObj, in)
attrs := rootObj.Attributes()
require.Len(x.tb, attrs, len(in.attributes)/2)
for i := range attrs {
require.Equal(x.tb, in.attributes[2*i], attrs[i].Key())
require.Equal(x.tb, in.attributes[2*i+1], attrs[i].Value())
}
require.Equal(x.tb, in.payload, rootObj.Payload())
require.NoError(x.tb, object.VerifyPayloadChecksum(&rootObj), "payload checksum must be correctly set")
}