[#205] object: Initial EC implementation
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
9c96c49928
commit
50e538f27a
12 changed files with 928 additions and 0 deletions
2
go.mod
2
go.mod
|
@ -11,6 +11,7 @@ require (
|
|||
github.com/antlr4-go/antlr/v4 v4.13.0
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.2
|
||||
github.com/klauspost/reedsolomon v1.12.1
|
||||
github.com/mr-tron/base58 v1.2.0
|
||||
github.com/nspcc-dev/neo-go v0.101.2-0.20230601131642-a0117042e8fc
|
||||
github.com/stretchr/testify v1.8.3
|
||||
|
@ -28,6 +29,7 @@ require (
|
|||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/gorilla/websocket v1.5.0 // indirect
|
||||
github.com/hashicorp/golang-lru v0.6.0 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
|
||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 // indirect
|
||||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20230615193820-9185820289ce // indirect
|
||||
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
121
object/erasure_code.go
Normal file
121
object/erasure_code.go
Normal file
|
@ -0,0 +1,121 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
refs "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
// ECHeader represents erasure coding header.
|
||||
type ECHeader struct {
|
||||
parent oid.ID
|
||||
index uint32
|
||||
total uint32
|
||||
header []byte
|
||||
headerLength uint32
|
||||
}
|
||||
|
||||
// NewECHeader constructs new erasure coding header.
|
||||
func NewECHeader(parent oid.ID, index, total uint32, header []byte, headerLength uint32) *ECHeader {
|
||||
return &ECHeader{
|
||||
parent: parent,
|
||||
index: index,
|
||||
total: total,
|
||||
header: header,
|
||||
headerLength: headerLength,
|
||||
}
|
||||
}
|
||||
|
||||
// WriteToV2 converts SDK structure to v2-api one.
|
||||
func (e *ECHeader) WriteToV2(h *object.ECHeader) {
|
||||
var parent refs.ObjectID
|
||||
e.parent.WriteToV2(&parent)
|
||||
h.Parent = &parent
|
||||
h.Index = e.index
|
||||
h.Total = e.total
|
||||
h.Header = e.header
|
||||
h.HeaderLength = e.headerLength
|
||||
}
|
||||
|
||||
// ReadFromV2 converts v2-api structure to SDK one.
|
||||
func (e *ECHeader) ReadFromV2(h *object.ECHeader) error {
|
||||
if h == nil {
|
||||
return nil
|
||||
}
|
||||
if h.Parent == nil {
|
||||
return errors.New("empty parent")
|
||||
}
|
||||
|
||||
_ = e.parent.ReadFromV2(*h.Parent)
|
||||
e.index = h.Index
|
||||
e.total = h.Total
|
||||
e.header = h.Header
|
||||
e.headerLength = h.HeaderLength
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *Object) ECHeader() *ECHeader {
|
||||
ec := (*object.Object)(o).GetHeader().GetEC()
|
||||
if ec == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
h := new(ECHeader)
|
||||
_ = h.ReadFromV2(ec)
|
||||
return h
|
||||
}
|
||||
|
||||
func (o *Object) SetECHeader(ec *ECHeader) {
|
||||
o.setHeaderField(func(h *object.Header) {
|
||||
if ec == nil {
|
||||
h.SetEC(nil)
|
||||
return
|
||||
}
|
||||
|
||||
v2 := new(object.ECHeader)
|
||||
ec.WriteToV2(v2)
|
||||
h.SetEC(v2)
|
||||
})
|
||||
}
|
||||
|
||||
func (e *ECHeader) Parent() oid.ID {
|
||||
return e.parent
|
||||
}
|
||||
|
||||
func (e *ECHeader) SetParent(id oid.ID) {
|
||||
e.parent = id
|
||||
}
|
||||
|
||||
func (e *ECHeader) Index() uint32 {
|
||||
return e.index
|
||||
}
|
||||
|
||||
func (e *ECHeader) SetIndex(i uint32) {
|
||||
e.index = i
|
||||
}
|
||||
|
||||
func (e *ECHeader) Total() uint32 {
|
||||
return e.total
|
||||
}
|
||||
|
||||
func (e *ECHeader) SetTotal(i uint32) {
|
||||
e.total = i
|
||||
}
|
||||
|
||||
func (e *ECHeader) Header() []byte {
|
||||
return e.header
|
||||
}
|
||||
|
||||
func (e *ECHeader) SetHeader(header []byte) {
|
||||
e.header = header
|
||||
}
|
||||
|
||||
func (e *ECHeader) HeaderLength() uint32 {
|
||||
return e.headerLength
|
||||
}
|
||||
|
||||
func (e *ECHeader) SetHeaderLength(l uint32) {
|
||||
e.headerLength = l
|
||||
}
|
87
object/erasurecode/constructor.go
Normal file
87
object/erasurecode/constructor.go
Normal file
|
@ -0,0 +1,87 @@
|
|||
package erasurecode
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"github.com/klauspost/reedsolomon"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrMalformedSlice is returned when a slice of EC chunks is inconsistent.
|
||||
ErrMalformedSlice = errors.New("inconsistent EC headers")
|
||||
// ErrInvShardNum is returned from NewConstructor when the number of shards is invalid.
|
||||
ErrInvShardNum = reedsolomon.ErrInvShardNum
|
||||
// ErrMaxShardNum is returned from NewConstructor when the number of shards is too big.
|
||||
ErrMaxShardNum = reedsolomon.ErrMaxShardNum
|
||||
)
|
||||
|
||||
// MaxShardCount is the maximum number of shards.
|
||||
const MaxShardCount = 256
|
||||
|
||||
// Constructor is a wrapper around encoder allowing to reconstruct objects.
|
||||
// It's methods are not thread-safe.
|
||||
type Constructor struct {
|
||||
enc reedsolomon.Encoder
|
||||
headerLength uint32
|
||||
payloadShards [][]byte
|
||||
headerShards [][]byte
|
||||
}
|
||||
|
||||
// NewConstructor returns new constructor instance.
|
||||
func NewConstructor(dataCount int, parityCount int) (*Constructor, error) {
|
||||
// The library supports up to 65536 shards with some restrictions.
|
||||
// This can easily result in OOM or panic, thus SDK declares it's own restriction.
|
||||
if dataCount+parityCount > MaxShardCount {
|
||||
return nil, ErrMaxShardNum
|
||||
}
|
||||
|
||||
enc, err := reedsolomon.New(dataCount, parityCount)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Constructor{enc: enc}, nil
|
||||
}
|
||||
|
||||
// clear clears internal state of the constructor, so it can be reused.
|
||||
func (c *Constructor) clear() {
|
||||
c.headerLength = 0
|
||||
c.payloadShards = nil
|
||||
c.headerShards = nil
|
||||
}
|
||||
|
||||
func (c *Constructor) fillHeader(parts []*objectSDK.Object) error {
|
||||
shards := make([][]byte, len(parts))
|
||||
headerLength := 0
|
||||
for i := range parts {
|
||||
if parts[i] == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var err error
|
||||
headerLength, err = validatePart(parts, i, headerLength)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
shards[i] = parts[i].GetECHeader().Header()
|
||||
}
|
||||
|
||||
c.headerLength = uint32(headerLength)
|
||||
c.headerShards = shards
|
||||
return nil
|
||||
}
|
||||
|
||||
// fillPayload fills the payload shards.
|
||||
// Currently there is no case when it can be called without reconstructing header,
|
||||
// thus fillHeader() must be called before and this function performs no validation.
|
||||
func (c *Constructor) fillPayload(parts []*objectSDK.Object) {
|
||||
shards := make([][]byte, len(parts))
|
||||
for i := range parts {
|
||||
if parts[i] == nil {
|
||||
continue
|
||||
}
|
||||
shards[i] = parts[i].Payload()
|
||||
}
|
||||
c.payloadShards = shards
|
||||
}
|
31
object/erasurecode/constructor_test.go
Normal file
31
object/erasurecode/constructor_test.go
Normal file
|
@ -0,0 +1,31 @@
|
|||
package erasurecode_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestErasureConstruct(t *testing.T) {
|
||||
t.Run("negative, no panic", func(t *testing.T) {
|
||||
_, err := erasurecode.NewConstructor(-1, 2)
|
||||
require.ErrorIs(t, err, erasurecode.ErrInvShardNum)
|
||||
})
|
||||
t.Run("negative, no panic", func(t *testing.T) {
|
||||
_, err := erasurecode.NewConstructor(2, -1)
|
||||
require.ErrorIs(t, err, erasurecode.ErrInvShardNum)
|
||||
})
|
||||
t.Run("zero parity", func(t *testing.T) {
|
||||
_, err := erasurecode.NewConstructor(1, 0)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
t.Run("max shard num", func(t *testing.T) {
|
||||
_, err := erasurecode.NewConstructor(erasurecode.MaxShardCount, 0)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
t.Run("max+1 shard num", func(t *testing.T) {
|
||||
_, err := erasurecode.NewConstructor(erasurecode.MaxShardCount+1, 0)
|
||||
require.ErrorIs(t, err, erasurecode.ErrMaxShardNum)
|
||||
})
|
||||
}
|
142
object/erasurecode/reconstruct.go
Normal file
142
object/erasurecode/reconstruct.go
Normal file
|
@ -0,0 +1,142 @@
|
|||
package erasurecode
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"github.com/klauspost/reedsolomon"
|
||||
)
|
||||
|
||||
// Reconstruct returns full object reconstructed from parts.
|
||||
// All non-nil objects in parts must have EC header with the same `total` field equal to len(parts).
|
||||
// The slice must contain at least one non nil object.
|
||||
// Index of the objects in parts must be equal to it's index field in the EC header.
|
||||
// The parts slice isn't changed and can be used concurrently for reading.
|
||||
func (c *Constructor) Reconstruct(parts []*objectSDK.Object) (*objectSDK.Object, error) {
|
||||
res, err := c.ReconstructHeader(parts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.fillPayload(parts)
|
||||
|
||||
payload, err := reconstructExact(c.enc, int(res.PayloadSize()), c.payloadShards)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: %w", ErrMalformedSlice, err)
|
||||
}
|
||||
|
||||
res.SetPayload(payload)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// ReconstructHeader returns object header reconstructed from parts.
|
||||
// All non-nil objects in parts must have EC header with the same `total` field equal to len(parts).
|
||||
// The slice must contain at least one non nil object.
|
||||
// Index of the objects in parts must be equal to it's index field in the EC header.
|
||||
// The parts slice isn't changed and can be used concurrently for reading.
|
||||
func (c *Constructor) ReconstructHeader(parts []*objectSDK.Object) (*objectSDK.Object, error) {
|
||||
c.clear()
|
||||
|
||||
if err := c.fillHeader(parts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
obj, err := c.reconstructHeader()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: %w", ErrMalformedSlice, err)
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
// ReconstructParts reconstructs specific EC parts without reconstructing full object.
|
||||
// All non-nil objects in parts must have EC header with the same `total` field equal to len(parts).
|
||||
// The slice must contain at least one non nil object.
|
||||
// Index of the objects in parts must be equal to it's index field in the EC header.
|
||||
// Those parts for which corresponding element in required is true must be nil and will be overwritten.
|
||||
// Because partial reconstruction only makes sense for full objects, all parts must have non-empty payload.
|
||||
// If key is not nil, all reconstructed parts are signed with this key.
|
||||
func (c *Constructor) ReconstructParts(parts []*objectSDK.Object, required []bool, key *ecdsa.PrivateKey) error {
|
||||
if len(required) != len(parts) {
|
||||
return fmt.Errorf("len(parts) != len(required): %d != %d", len(parts), len(required))
|
||||
}
|
||||
|
||||
c.clear()
|
||||
|
||||
if err := c.fillHeader(parts); err != nil {
|
||||
return err
|
||||
}
|
||||
c.fillPayload(parts)
|
||||
|
||||
if err := c.enc.ReconstructSome(c.payloadShards, required); err != nil {
|
||||
return fmt.Errorf("%w: %w", ErrMalformedSlice, err)
|
||||
}
|
||||
if err := c.enc.ReconstructSome(c.headerShards, required); err != nil {
|
||||
return fmt.Errorf("%w: %w", ErrMalformedSlice, err)
|
||||
}
|
||||
|
||||
nonNilPart := 0
|
||||
for i := range parts {
|
||||
if parts[i] != nil {
|
||||
nonNilPart = i
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
ec := parts[nonNilPart].GetECHeader()
|
||||
parent := ec.Parent()
|
||||
total := ec.Total()
|
||||
|
||||
for i := range required {
|
||||
if parts[i] != nil || !required[i] {
|
||||
continue
|
||||
}
|
||||
|
||||
part := objectSDK.New()
|
||||
copyRequiredFields(part, parts[nonNilPart])
|
||||
part.SetPayload(c.payloadShards[i])
|
||||
part.SetPayloadSize(uint64(len(c.payloadShards[i])))
|
||||
part.SetECHeader(objectSDK.NewECHeader(parent, uint32(i), total,
|
||||
c.headerShards[i], c.headerLength))
|
||||
|
||||
if err := setIDWithSignature(part, key); err != nil {
|
||||
return err
|
||||
}
|
||||
parts[i] = part
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Constructor) reconstructHeader() (*objectSDK.Object, error) {
|
||||
data, err := reconstructExact(c.enc, int(c.headerLength), c.headerShards)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var obj objectSDK.Object
|
||||
return &obj, obj.Unmarshal(data)
|
||||
}
|
||||
|
||||
func reconstructExact(enc reedsolomon.Encoder, size int, shards [][]byte) ([]byte, error) {
|
||||
if err := enc.ReconstructData(shards); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Technically, this error will be returned from enc.Join().
|
||||
// However, allocating based on unvalidated user data is an easy attack vector.
|
||||
// Preallocating seems to have enough benefits to justify a slight increase in code complexity.
|
||||
maxSize := 0
|
||||
for i := range shards {
|
||||
maxSize += len(shards[i])
|
||||
}
|
||||
if size > maxSize {
|
||||
return nil, reedsolomon.ErrShortData
|
||||
}
|
||||
|
||||
buf := bytes.NewBuffer(make([]byte, 0, size))
|
||||
if err := enc.Join(buf, shards, size); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
284
object/erasurecode/reconstruct_test.go
Normal file
284
object/erasurecode/reconstruct_test.go
Normal file
|
@ -0,0 +1,284 @@
|
|||
package erasurecode_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestErasureCodeReconstruct(t *testing.T) {
|
||||
const payloadSize = 99
|
||||
const dataCount = 3
|
||||
const parityCount = 2
|
||||
|
||||
// We would also like to test padding behaviour,
|
||||
// so ensure padding is done.
|
||||
require.NotZero(t, payloadSize%(dataCount+parityCount))
|
||||
|
||||
pk, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
original := newObject(t, payloadSize, pk)
|
||||
|
||||
c, err := erasurecode.NewConstructor(dataCount, parityCount)
|
||||
require.NoError(t, err)
|
||||
|
||||
parts, err := c.Split(original, &pk.PrivateKey)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("reconstruct header", func(t *testing.T) {
|
||||
original := original.CutPayload()
|
||||
parts := cloneSlice(parts)
|
||||
for i := range parts {
|
||||
parts[i] = parts[i].CutPayload()
|
||||
}
|
||||
t.Run("from data", func(t *testing.T) {
|
||||
parts := cloneSlice(parts)
|
||||
for i := dataCount; i < dataCount+parityCount; i++ {
|
||||
parts[i] = nil
|
||||
}
|
||||
reconstructed, err := c.ReconstructHeader(parts)
|
||||
require.NoError(t, err)
|
||||
verifyReconstruction(t, original, reconstructed)
|
||||
})
|
||||
t.Run("from parity", func(t *testing.T) {
|
||||
parts := cloneSlice(parts)
|
||||
for i := 0; i < parityCount; i++ {
|
||||
parts[i] = nil
|
||||
}
|
||||
reconstructed, err := c.ReconstructHeader(parts)
|
||||
require.NoError(t, err)
|
||||
verifyReconstruction(t, original, reconstructed)
|
||||
|
||||
t.Run("not enough shards", func(t *testing.T) {
|
||||
parts[parityCount] = nil
|
||||
_, err := c.ReconstructHeader(parts)
|
||||
require.ErrorIs(t, err, erasurecode.ErrMalformedSlice)
|
||||
})
|
||||
})
|
||||
t.Run("only nil parts", func(t *testing.T) {
|
||||
parts := make([]*objectSDK.Object, len(parts))
|
||||
_, err := c.ReconstructHeader(parts)
|
||||
require.ErrorIs(t, err, erasurecode.ErrMalformedSlice)
|
||||
})
|
||||
t.Run("missing EC header", func(t *testing.T) {
|
||||
parts := cloneSlice(parts)
|
||||
parts[0] = deepCopy(t, parts[0])
|
||||
parts[0].SetECHeader(nil)
|
||||
|
||||
_, err := c.ReconstructHeader(parts)
|
||||
require.ErrorIs(t, err, erasurecode.ErrMalformedSlice)
|
||||
})
|
||||
t.Run("invalid index", func(t *testing.T) {
|
||||
parts := cloneSlice(parts)
|
||||
parts[0] = deepCopy(t, parts[0])
|
||||
|
||||
ec := parts[0].GetECHeader()
|
||||
ec.SetIndex(1)
|
||||
parts[0].SetECHeader(ec)
|
||||
|
||||
_, err := c.ReconstructHeader(parts)
|
||||
require.ErrorIs(t, err, erasurecode.ErrMalformedSlice)
|
||||
})
|
||||
t.Run("invalid total", func(t *testing.T) {
|
||||
parts := cloneSlice(parts)
|
||||
parts[0] = deepCopy(t, parts[0])
|
||||
|
||||
ec := parts[0].GetECHeader()
|
||||
ec.SetTotal(uint32(len(parts) + 1))
|
||||
parts[0].SetECHeader(ec)
|
||||
|
||||
_, err := c.ReconstructHeader(parts)
|
||||
require.ErrorIs(t, err, erasurecode.ErrMalformedSlice)
|
||||
})
|
||||
t.Run("inconsistent header length", func(t *testing.T) {
|
||||
parts := cloneSlice(parts)
|
||||
parts[0] = deepCopy(t, parts[0])
|
||||
|
||||
ec := parts[0].GetECHeader()
|
||||
ec.SetHeaderLength(ec.HeaderLength() - 1)
|
||||
parts[0].SetECHeader(ec)
|
||||
|
||||
_, err := c.ReconstructHeader(parts)
|
||||
require.ErrorIs(t, err, erasurecode.ErrMalformedSlice)
|
||||
})
|
||||
t.Run("invalid header length", func(t *testing.T) {
|
||||
parts := cloneSlice(parts)
|
||||
for i := range parts {
|
||||
parts[i] = deepCopy(t, parts[i])
|
||||
|
||||
ec := parts[0].GetECHeader()
|
||||
ec.SetHeaderLength(math.MaxUint32)
|
||||
parts[0].SetECHeader(ec)
|
||||
}
|
||||
|
||||
_, err := c.ReconstructHeader(parts)
|
||||
require.ErrorIs(t, err, erasurecode.ErrMalformedSlice)
|
||||
})
|
||||
})
|
||||
t.Run("reconstruct data", func(t *testing.T) {
|
||||
t.Run("from data", func(t *testing.T) {
|
||||
parts := cloneSlice(parts)
|
||||
for i := dataCount; i < dataCount+parityCount; i++ {
|
||||
parts[i] = nil
|
||||
}
|
||||
reconstructed, err := c.Reconstruct(parts)
|
||||
require.NoError(t, err)
|
||||
verifyReconstruction(t, original, reconstructed)
|
||||
})
|
||||
t.Run("from parity", func(t *testing.T) {
|
||||
parts := cloneSlice(parts)
|
||||
for i := 0; i < parityCount; i++ {
|
||||
parts[i] = nil
|
||||
}
|
||||
reconstructed, err := c.Reconstruct(parts)
|
||||
require.NoError(t, err)
|
||||
verifyReconstruction(t, original, reconstructed)
|
||||
|
||||
t.Run("not enough shards", func(t *testing.T) {
|
||||
parts[parityCount] = nil
|
||||
_, err := c.Reconstruct(parts)
|
||||
require.ErrorIs(t, err, erasurecode.ErrMalformedSlice)
|
||||
})
|
||||
})
|
||||
})
|
||||
t.Run("reconstruct parts", func(t *testing.T) {
|
||||
// We would like to also test that ReconstructParts doesn't perform
|
||||
// excessive work, so ensure this test makes sense.
|
||||
require.GreaterOrEqual(t, parityCount, 2)
|
||||
|
||||
t.Run("from data", func(t *testing.T) {
|
||||
oldParts := parts
|
||||
parts := cloneSlice(parts)
|
||||
for i := dataCount; i < dataCount+parityCount; i++ {
|
||||
parts[i] = nil
|
||||
}
|
||||
|
||||
required := make([]bool, len(parts))
|
||||
required[dataCount] = true
|
||||
|
||||
require.NoError(t, c.ReconstructParts(parts, required, nil))
|
||||
|
||||
old := deepCopy(t, oldParts[dataCount])
|
||||
old.SetSignature(nil)
|
||||
require.Equal(t, old, parts[dataCount])
|
||||
|
||||
for i := dataCount + 1; i < dataCount+parityCount; i++ {
|
||||
require.Nil(t, parts[i])
|
||||
}
|
||||
})
|
||||
t.Run("from parity", func(t *testing.T) {
|
||||
oldParts := parts
|
||||
parts := cloneSlice(parts)
|
||||
for i := 0; i < parityCount; i++ {
|
||||
parts[i] = nil
|
||||
}
|
||||
|
||||
required := make([]bool, len(parts))
|
||||
required[0] = true
|
||||
|
||||
require.NoError(t, c.ReconstructParts(parts, required, nil))
|
||||
|
||||
old := deepCopy(t, oldParts[0])
|
||||
old.SetSignature(nil)
|
||||
require.Equal(t, old, parts[0])
|
||||
|
||||
for i := 1; i < parityCount; i++ {
|
||||
require.Nil(t, parts[i])
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func newObject(t *testing.T, size uint64, pk *keys.PrivateKey) *objectSDK.Object {
|
||||
// Use transformer to form object to avoid potential bugs with yet another helper object creation in tests.
|
||||
tt := &testTarget{}
|
||||
p := transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||
Key: &pk.PrivateKey,
|
||||
NextTargetInit: func() transformer.ObjectWriter { return tt },
|
||||
NetworkState: dummyEpochSource(123),
|
||||
MaxSize: size + 1,
|
||||
WithoutHomomorphicHash: true,
|
||||
})
|
||||
cnr := cidtest.ID()
|
||||
ver := version.Current()
|
||||
hdr := objectSDK.New()
|
||||
hdr.SetContainerID(cnr)
|
||||
hdr.SetType(objectSDK.TypeRegular)
|
||||
hdr.SetVersion(&ver)
|
||||
|
||||
var owner user.ID
|
||||
user.IDFromKey(&owner, pk.PrivateKey.PublicKey)
|
||||
hdr.SetOwnerID(owner)
|
||||
|
||||
var attr objectSDK.Attribute
|
||||
attr.SetKey("somekey")
|
||||
attr.SetValue("somevalue")
|
||||
hdr.SetAttributes(attr)
|
||||
|
||||
expectedPayload := make([]byte, size)
|
||||
_, _ = rand.Read(expectedPayload)
|
||||
writeObject(t, context.Background(), p, hdr, expectedPayload)
|
||||
require.Len(t, tt.objects, 1)
|
||||
return tt.objects[0]
|
||||
}
|
||||
|
||||
func writeObject(t *testing.T, ctx context.Context, target transformer.ChunkedObjectWriter, header *objectSDK.Object, payload []byte) *transformer.AccessIdentifiers {
|
||||
require.NoError(t, target.WriteHeader(ctx, header))
|
||||
|
||||
_, err := target.Write(ctx, payload)
|
||||
require.NoError(t, err)
|
||||
|
||||
ids, err := target.Close(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
return ids
|
||||
}
|
||||
|
||||
func verifyReconstruction(t *testing.T, original, reconstructed *objectSDK.Object) {
|
||||
require.True(t, reconstructed.VerifyIDSignature())
|
||||
reconstructed.ToV2().SetMarshalData(nil)
|
||||
original.ToV2().SetMarshalData(nil)
|
||||
|
||||
require.Equal(t, original, reconstructed)
|
||||
}
|
||||
|
||||
func deepCopy(t *testing.T, obj *objectSDK.Object) *objectSDK.Object {
|
||||
data, err := obj.Marshal()
|
||||
require.NoError(t, err)
|
||||
|
||||
res := objectSDK.New()
|
||||
require.NoError(t, res.Unmarshal(data))
|
||||
return res
|
||||
}
|
||||
|
||||
func cloneSlice[T any](src []T) []T {
|
||||
dst := make([]T, len(src))
|
||||
copy(dst, src)
|
||||
return dst
|
||||
}
|
||||
|
||||
type dummyEpochSource uint64
|
||||
|
||||
func (s dummyEpochSource) CurrentEpoch() uint64 {
|
||||
return uint64(s)
|
||||
}
|
||||
|
||||
type testTarget struct {
|
||||
objects []*objectSDK.Object
|
||||
}
|
||||
|
||||
func (tt *testTarget) WriteObject(_ context.Context, o *objectSDK.Object) error {
|
||||
tt.objects = append(tt.objects, o)
|
||||
return nil // AccessIdentifiers should not be used.
|
||||
}
|
69
object/erasurecode/split.go
Normal file
69
object/erasurecode/split.go
Normal file
|
@ -0,0 +1,69 @@
|
|||
package erasurecode
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
)
|
||||
|
||||
// Split splits fully formed object into multiple chunks.
|
||||
func (c *Constructor) Split(obj *objectSDK.Object, key *ecdsa.PrivateKey) ([]*objectSDK.Object, error) {
|
||||
c.clear()
|
||||
|
||||
header, err := obj.CutPayload().Marshal()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
headerShards, err := c.encodeRaw(header)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
payloadShards, err := c.encodeRaw(obj.Payload())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
parts := make([]*objectSDK.Object, len(payloadShards))
|
||||
parent, _ := obj.ID()
|
||||
for i := range parts {
|
||||
chunk := objectSDK.New()
|
||||
copyRequiredFields(chunk, obj)
|
||||
chunk.SetPayload(payloadShards[i])
|
||||
chunk.SetPayloadSize(uint64(len(payloadShards[i])))
|
||||
|
||||
ec := objectSDK.NewECHeader(parent, uint32(i), uint32(len(payloadShards)), headerShards[i], uint32(len(header)))
|
||||
chunk.SetECHeader(ec)
|
||||
if err := setIDWithSignature(chunk, key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
parts[i] = chunk
|
||||
}
|
||||
return parts, nil
|
||||
}
|
||||
|
||||
func setIDWithSignature(obj *objectSDK.Object, key *ecdsa.PrivateKey) error {
|
||||
if err := objectSDK.CalculateAndSetID(obj); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
objectSDK.CalculateAndSetPayloadChecksum(obj)
|
||||
|
||||
if key == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return objectSDK.CalculateAndSetSignature(*key, obj)
|
||||
}
|
||||
|
||||
func (c *Constructor) encodeRaw(data []byte) ([][]byte, error) {
|
||||
shards, err := c.enc.Split(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := c.enc.Encode(shards); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return shards, nil
|
||||
}
|
36
object/erasurecode/split_test.go
Normal file
36
object/erasurecode/split_test.go
Normal file
|
@ -0,0 +1,36 @@
|
|||
package erasurecode_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// The library can behave differently for big shard counts.
|
||||
// This test checks we support the maximum number of chunks we promise.
|
||||
func TestSplitMaxShardCount(t *testing.T) {
|
||||
pk, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
original := newObject(t, 1024, pk)
|
||||
|
||||
t.Run("only data", func(t *testing.T) {
|
||||
c, err := erasurecode.NewConstructor(erasurecode.MaxShardCount, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
parts, err := c.Split(original, &pk.PrivateKey)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, parts, erasurecode.MaxShardCount)
|
||||
})
|
||||
t.Run("data + parity", func(t *testing.T) {
|
||||
c, err := erasurecode.NewConstructor(1, erasurecode.MaxShardCount-1)
|
||||
require.NoError(t, err)
|
||||
|
||||
parts, err := c.Split(original, &pk.PrivateKey)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, parts, erasurecode.MaxShardCount)
|
||||
})
|
||||
|
||||
}
|
44
object/erasurecode/target.go
Normal file
44
object/erasurecode/target.go
Normal file
|
@ -0,0 +1,44 @@
|
|||
package erasurecode
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
)
|
||||
|
||||
// Target accepts regular objects and splits them into erasure-coded chunks.
|
||||
type Target struct {
|
||||
c *Constructor
|
||||
key *ecdsa.PrivateKey
|
||||
next transformer.ObjectWriter
|
||||
}
|
||||
|
||||
// ObjectWriter is an interface of the object writer that writes prepared object.
|
||||
type ObjectWriter interface {
|
||||
WriteObject(context.Context, *objectSDK.Object) error
|
||||
}
|
||||
|
||||
// NewTarget returns new target instance.
|
||||
func NewTarget(c *Constructor, key *ecdsa.PrivateKey, next ObjectWriter) *Target {
|
||||
return &Target{
|
||||
c: c,
|
||||
key: key,
|
||||
next: next,
|
||||
}
|
||||
}
|
||||
|
||||
// WriteObject implements the transformer.ObjectWriter interface.
|
||||
func (t *Target) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
parts, err := t.c.Split(obj, t.key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i := range parts {
|
||||
if err := t.next.WriteObject(ctx, parts[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
104
object/erasurecode/verify.go
Normal file
104
object/erasurecode/verify.go
Normal file
|
@ -0,0 +1,104 @@
|
|||
package erasurecode
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
)
|
||||
|
||||
// Verify verifies that parts are well formed.
|
||||
// All parts are expected to be non-nil.
|
||||
// The number of parts must be equal to `total` field of the EC header
|
||||
// and parts must be sorted by index.
|
||||
func (c *Constructor) Verify(parts []*objectSDK.Object) error {
|
||||
c.clear()
|
||||
|
||||
var headerLength int
|
||||
for i := range parts {
|
||||
if parts[i] == nil {
|
||||
return ErrMalformedSlice
|
||||
}
|
||||
|
||||
var err error
|
||||
headerLength, err = validatePart(parts, i, headerLength)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
p0 := parts[0]
|
||||
for i := 1; i < len(parts); i++ {
|
||||
// This part must be kept in sync with copyRequiredFields().
|
||||
pi := parts[i]
|
||||
if p0.OwnerID().Equals(pi.OwnerID()) {
|
||||
return fmt.Errorf("%w: owner id mismatch: %s != %s", ErrMalformedSlice, p0.OwnerID(), pi.OwnerID())
|
||||
}
|
||||
if p0.Version() == nil && pi.Version() != nil || !p0.Version().Equal(*pi.Version()) {
|
||||
return fmt.Errorf("%w: version mismatch: %s != %s", ErrMalformedSlice, p0.Version(), pi.Version())
|
||||
}
|
||||
|
||||
cnr0, _ := p0.ContainerID()
|
||||
cnri, _ := pi.ContainerID()
|
||||
if !cnr0.Equals(cnri) {
|
||||
return fmt.Errorf("%w: container id mismatch: %s != %s", ErrMalformedSlice, cnr0, cnri)
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.fillHeader(parts); err != nil {
|
||||
return err
|
||||
}
|
||||
c.fillPayload(parts)
|
||||
|
||||
ok, err := c.enc.Verify(c.headerShards)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return ErrMalformedSlice
|
||||
}
|
||||
|
||||
ok, err = c.enc.Verify(c.payloadShards)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return ErrMalformedSlice
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// copyRequiredFields sets all fields in dst which are copied from src and shared among all chunks.
|
||||
// src can be either another chunk of full object.
|
||||
// dst must be a chunk.
|
||||
func copyRequiredFields(dst *objectSDK.Object, src *objectSDK.Object) {
|
||||
dst.SetVersion(src.Version())
|
||||
dst.SetOwnerID(src.OwnerID())
|
||||
dst.SetCreationEpoch(src.CreationEpoch())
|
||||
dst.SetSessionToken(src.SessionToken())
|
||||
|
||||
cnr, _ := src.ContainerID()
|
||||
dst.SetContainerID(cnr)
|
||||
}
|
||||
|
||||
// validatePart makes i-th part is consistent with the rest.
|
||||
// If headerLength is not zero it is asserted to be equal in the ec header.
|
||||
// Otherwise, new headerLength is returned.
|
||||
func validatePart(parts []*objectSDK.Object, i int, headerLength int) (int, error) {
|
||||
ec := parts[i].GetECHeader()
|
||||
if ec == nil {
|
||||
return headerLength, fmt.Errorf("%w: missing EC header", ErrMalformedSlice)
|
||||
}
|
||||
if ec.Index() != uint32(i) {
|
||||
return headerLength, fmt.Errorf("%w: index=%d, ec.index=%d", ErrMalformedSlice, i, ec.Index())
|
||||
}
|
||||
if ec.Total() != uint32(len(parts)) {
|
||||
return headerLength, fmt.Errorf("%w: len(parts)=%d, total=%d", ErrMalformedSlice, len(parts), ec.Total())
|
||||
}
|
||||
if headerLength == 0 {
|
||||
return int(ec.HeaderLength()), nil
|
||||
}
|
||||
if ec.HeaderLength() != uint32(headerLength) {
|
||||
return headerLength, fmt.Errorf("%w: header length mismatch %d != %d", ErrMalformedSlice, headerLength, ec.HeaderLength())
|
||||
}
|
||||
return headerLength, nil
|
||||
}
|
|
@ -366,6 +366,14 @@ func (o *Object) Children() []oid.ID {
|
|||
return res
|
||||
}
|
||||
|
||||
func (o *Object) GetECHeader() *ECHeader {
|
||||
v2 := (*object.Object)(o).GetHeader().GetEC()
|
||||
|
||||
var ec ECHeader
|
||||
_ = ec.ReadFromV2(v2) // Errors is checked on unmarshal.
|
||||
return &ec
|
||||
}
|
||||
|
||||
// SetChildren sets list of the identifiers of the child objects.
|
||||
func (o *Object) SetChildren(v ...oid.ID) {
|
||||
var (
|
||||
|
|
Loading…
Reference in a new issue