This commit is contained in:
Evgeniy Kulikov 2019-11-18 16:34:06 +03:00
commit 1cf33e5ffd
No known key found for this signature in database
GPG key ID: BF6AEE0A2A699BF2
87 changed files with 29835 additions and 0 deletions

143
object/doc.go Normal file
View file

@ -0,0 +1,143 @@
/*
Package object manages main storage structure in the system. All storage
operations are performed with the objects. During lifetime object might be
transformed into another object by cutting its payload or adding meta
information. All transformation may be reversed, therefore source object
will be able to restore.
Object structure
Object consists of Payload and Header. Payload is unlimited but storage nodes
may have a policy to store objects with a limited payload. In this case object
with large payload will be transformed into the chain of objects with small
payload.
Headers are simple key-value fields that divided into two groups: system
headers and extended headers. System headers contain information about
protocol version, object id, payload length in bytes, owner id, container id
and object creation timestamp (both in epochs and unix time). All these fields
must be set up in the correct object.
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-
| System Headers |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-
| Version : 1 |
| Payload Length : 21673465 |
| Object ID : 465208e2-ba4f-4f99-ad47-82a59f4192d4 |
| Owner ID : AShvoCbSZ7VfRiPkVb1tEcBLiJrcbts1tt |
| Container ID : FGobtRZA6sBZv2i9k4L7TiTtnuP6E788qa278xfj3Fxj |
| Created At : Epoch#10, 1573033162 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-
| Extended Headers |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-
| User Header : <user-defined-key>, <user-defined-value> |
| Verification Header : <session public key>, <owner's signature> |
| Homomorphic Hash : 0x23d35a56ae... |
| Payload Checksum : 0x1bd34abs75... |
| Integrity Header : <header checksum>, <session signature> |
| Transformation : Payload Split |
| Link-parent : cae08935-b4ba-499a-bf6c-98276c1e6c0b |
| Link-next : c3b40fbf-3798-4b61-a189-2992b5fb5070 |
| Payload Checksum : 0x1f387a5c36... |
| Integrity Header : <header checksum>, <session signature> |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-
| Payload |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-
| 0xd1581963a342d231... |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-
There are different kinds of extended headers. A correct object must contain
verification header, homomorphic hash header, payload checksum and
integrity header. The order of headers is matter. Let's look through all
these headers.
Link header points to the connected objects. During object transformation, large
object might be transformed into the chain of smaller objects. One of these
objects drops payload and has several "Child" links. We call this object as
zero-object. Others will have "Parent" link to the zero-object, "Previous"
and "Next" links in the payload chain.
[ Object ID:1 ] = > transformed
`- [ Zero-Object ID:1 ]
`- Link-child ID:2
`- Link-child ID:3
`- Link-child ID:4
`- Payload [null]
`- [ Object ID:2 ]
`- Link-parent ID:1
`- Link-next ID:3
`- Payload [ 0x13ba... ]
`- [ Object ID:3 ]
`- Link-parent ID:1
`- Link-previous ID:2
`- Link-next ID:4
`- Payload [ 0xcd34... ]
`- [ Object ID:4 ]
`- Link-parent ID:1
`- Link-previous ID:3
`- Payload [ 0xef86... ]
Storage groups are also objects. They have "Storage Group" links to all
objects in the group. Links are set by nodes during transformations and,
in general, they should not be set by user manually.
Redirect headers are not used yet, they will be implemented and described
later.
User header is a key-value pair of string that can be defined by user. User
can use these headers as search attribute. You can store any meta information
about object there, e.g. object's nicename.
Transformation header notifies that object was transformed by some pre-defined
way. This header sets up before object is transformed and all headers after
transformation must be located after transformation header. During reverse
transformation, all headers under transformation header will be cut out.
+-+-+-+-+-+-+-+-+-+- +-+-+-+-+-+-+-+-+-+-+ +-+-+-+-+-+-+-+-+-+-+
| Payload checksum | | Payload checksum | | Payload checksum |
| Integrity header | => | Integrity header | + | Integrity header |
+-+-+-+-+-+-+-+-+-+- | Transformation | | Transformation |
| Large payload | | New Checksum | | New Checksum |
+-+-+-+-+-+-+-+-+-+- | New Integrity | | New Integrity |
+-+-+-+-+-+-+-+-+-+-+ +-+-+-+-+-+-+-+-+-+-+
| Small payload | | Small payload |
+-+-+-+-+-+-+-+-+-+-+ +-+-+-+-+-+-+-+-+-+-+
For now, we use only one type of transformation: payload split transformation.
This header set up by node automatically.
Tombstone header notifies that this object was deleted by user. Objects with
tombstone header do not have payload, but they still contain meta information
in the headers. This way we implement two-phase commit for object removal.
Storage nodes will eventually delete all tombstone objects. If you want to
delete object, you must create new object with the same object id, with
tombstone header, correct signatures and without payload.
Verification header contains session information. To put the object in
the system user must create session. It is required because objects might
be transformed and therefore must be re-signed. To do that node creates
a pair of session public and private keys. Object owner delegates permission to
re-sign objects by signing session public key. This header contains session
public key and owner's signature of this key. You must specify this header
manually.
Homomorphic hash header contains homomorphic hash of the source object.
Transformations do not affect this header. This header used by data audit and
set by node automatically.
Payload checksum contains checksum of the actual object payload. All payload
transformation must set new payload checksum headers. This header set by node
automatically.
Integrity header contains checksum of the header and signature of the
session key. This header must be last in the list of extended headers.
Checksum is calculated by marshaling all above headers, including system
headers. This header set by node automatically.
Storage group header is presented in storage group objects. It contains
information for data audit: size of validated data, homomorphic has of this
data, storage group expiration time in epochs or unix time.
*/
package object

84
object/extensions.go Normal file
View file

@ -0,0 +1,84 @@
package object
import (
"github.com/nspcc-dev/neofs-proto/hash"
)
// IsLinking checks if object has children links to another objects.
// We have to check payload size because zero-object must have zero
// payload and non-zero payload length field in system header.
func (m Object) IsLinking() bool {
for i := range m.Headers {
switch v := m.Headers[i].Value.(type) {
case *Header_Link:
if v.Link.GetType() == Link_Child {
return m.SystemHeader.PayloadLength > 0 && len(m.Payload) == 0
}
}
}
return false
}
// VerificationHeader returns verification header if it is presented in extended headers.
func (m Object) VerificationHeader() (*VerificationHeader, error) {
_, vh := m.LastHeader(HeaderType(VerifyHdr))
if vh == nil {
return nil, ErrHeaderNotFound
}
return vh.Value.(*Header_Verify).Verify, nil
}
// SetVerificationHeader sets verification header in the object.
// It will replace existing verification header or add a new one.
func (m *Object) SetVerificationHeader(header *VerificationHeader) {
m.SetHeader(&Header{Value: &Header_Verify{Verify: header}})
}
// Links returns slice of ids of specified link type
func (m *Object) Links(t Link_Type) []ID {
var res []ID
for i := range m.Headers {
switch v := m.Headers[i].Value.(type) {
case *Header_Link:
if v.Link.GetType() == t {
res = append(res, v.Link.ID)
}
}
}
return res
}
// Tombstone returns tombstone header if it is presented in extended headers.
func (m Object) Tombstone() *Tombstone {
_, h := m.LastHeader(HeaderType(TombstoneHdr))
if h != nil {
return h.Value.(*Header_Tombstone).Tombstone
}
return nil
}
// IsTombstone checks if object has tombstone header.
func (m Object) IsTombstone() bool {
n, _ := m.LastHeader(HeaderType(TombstoneHdr))
return n != -1
}
// StorageGroup returns storage group structure if it is presented in extended headers.
func (m Object) StorageGroup() (*StorageGroup, error) {
_, sgHdr := m.LastHeader(HeaderType(StorageGroupHdr))
if sgHdr == nil {
return nil, ErrHeaderNotFound
}
return sgHdr.Value.(*Header_StorageGroup).StorageGroup, nil
}
// SetStorageGroup sets storage group header in the object.
// It will replace existing storage group header or add a new one.
func (m *Object) SetStorageGroup(sg *StorageGroup) {
m.SetHeader(&Header{Value: &Header_StorageGroup{StorageGroup: sg}})
}
// Empty checks if storage group has some data for validation.
func (m StorageGroup) Empty() bool {
return m.ValidationDataSize == 0 && m.ValidationHash.Equal(hash.Hash{})
}

215
object/service.go Normal file
View file

@ -0,0 +1,215 @@
package object
import (
"github.com/nspcc-dev/neofs-proto/hash"
"github.com/nspcc-dev/neofs-proto/internal"
"github.com/nspcc-dev/neofs-proto/refs"
"github.com/nspcc-dev/neofs-proto/service"
"github.com/nspcc-dev/neofs-proto/session"
)
type (
// ID is a type alias of object id.
ID = refs.ObjectID
// CID is a type alias of container id.
CID = refs.CID
// SGID is a type alias of storage group id.
SGID = refs.SGID
// OwnerID is a type alias of owner id.
OwnerID = refs.OwnerID
// Hash is a type alias of Homomorphic hash.
Hash = hash.Hash
// Token is a type alias of session token.
Token = session.Token
// Request defines object rpc requests.
// All object operations must have TTL, Epoch, Container ID and
// permission of usage previous network map.
Request interface {
service.TTLRequest
service.EpochRequest
CID() CID
AllowPreviousNetMap() bool
}
)
const (
// UnitsB starts enum for amount of bytes.
UnitsB int64 = 1 << (10 * iota)
// UnitsKB defines amount of bytes in one kilobyte.
UnitsKB
// UnitsMB defines amount of bytes in one megabyte.
UnitsMB
// UnitsGB defines amount of bytes in one gigabyte.
UnitsGB
// UnitsTB defines amount of bytes in one terabyte.
UnitsTB
)
const (
// ErrNotFound is raised when object is not found in the system.
ErrNotFound = internal.Error("could not find object")
// ErrHeaderExpected is raised when first message in protobuf stream does not contain user header.
ErrHeaderExpected = internal.Error("expected header as a first message in stream")
// KeyStorageGroup is a key for a search object by storage group id.
KeyStorageGroup = "STORAGE_GROUP"
// KeyNoChildren is a key for searching object that have no children links.
KeyNoChildren = "LEAF"
// KeyParent is a key for searching object by id of parent object.
KeyParent = "PARENT"
// KeyHasParent is a key for searching object that have parent link.
KeyHasParent = "HAS_PAR"
// KeyTombstone is a key for searching object that have tombstone header.
KeyTombstone = "TOMBSTONE"
// KeyChild is a key for searching object by id of child link.
KeyChild = "CHILD"
// KeyPrev is a key for searching object by id of previous link.
KeyPrev = "PREV"
// KeyNext is a key for searching object by id of next link.
KeyNext = "NEXT"
// KeyID is a key for searching object by object id.
KeyID = "ID"
// KeyCID is a key for searching object by container id.
KeyCID = "CID"
// KeyOwnerID is a key for searching object by owner id.
KeyOwnerID = "OWNERID"
// KeyRootObject is a key for searching object that are zero-object or do
// not have any children.
KeyRootObject = "ROOT_OBJECT"
)
func checkIsNotFull(v interface{}) bool {
var obj *Object
switch t := v.(type) {
case *GetResponse:
obj = t.GetObject()
case *PutRequest:
if h := t.GetHeader(); h != nil {
obj = h.Object
}
default:
panic("unknown type")
}
return obj == nil || obj.SystemHeader.PayloadLength != uint64(len(obj.Payload)) && !obj.IsLinking()
}
// NotFull checks if protobuf stream provided whole object for get operation.
func (m *GetResponse) NotFull() bool { return checkIsNotFull(m) }
// NotFull checks if protobuf stream provided whole object for put operation.
func (m *PutRequest) NotFull() bool { return checkIsNotFull(m) }
// GetTTL returns TTL value from object put request.
func (m *PutRequest) GetTTL() uint32 { return m.GetHeader().TTL }
// GetEpoch returns epoch value from object put request.
func (m *PutRequest) GetEpoch() uint64 { return m.GetHeader().GetEpoch() }
// SetTTL sets TTL value into object put request.
func (m *PutRequest) SetTTL(ttl uint32) { m.GetHeader().TTL = ttl }
// SetTTL sets TTL value into object get request.
func (m *GetRequest) SetTTL(ttl uint32) { m.TTL = ttl }
// SetTTL sets TTL value into object head request.
func (m *HeadRequest) SetTTL(ttl uint32) { m.TTL = ttl }
// SetTTL sets TTL value into object search request.
func (m *SearchRequest) SetTTL(ttl uint32) { m.TTL = ttl }
// SetTTL sets TTL value into object delete request.
func (m *DeleteRequest) SetTTL(ttl uint32) { m.TTL = ttl }
// SetTTL sets TTL value into object get range request.
func (m *GetRangeRequest) SetTTL(ttl uint32) { m.TTL = ttl }
// SetTTL sets TTL value into object get range hash request.
func (m *GetRangeHashRequest) SetTTL(ttl uint32) { m.TTL = ttl }
// SetEpoch sets epoch value into object put request.
func (m *PutRequest) SetEpoch(v uint64) { m.GetHeader().Epoch = v }
// SetEpoch sets epoch value into object get request.
func (m *GetRequest) SetEpoch(v uint64) { m.Epoch = v }
// SetEpoch sets epoch value into object head request.
func (m *HeadRequest) SetEpoch(v uint64) { m.Epoch = v }
// SetEpoch sets epoch value into object search request.
func (m *SearchRequest) SetEpoch(v uint64) { m.Epoch = v }
// SetEpoch sets epoch value into object delete request.
func (m *DeleteRequest) SetEpoch(v uint64) { m.Epoch = v }
// SetEpoch sets epoch value into object get range request.
func (m *GetRangeRequest) SetEpoch(v uint64) { m.Epoch = v }
// SetEpoch sets epoch value into object get range hash request.
func (m *GetRangeHashRequest) SetEpoch(v uint64) { m.Epoch = v }
// CID returns container id value from object put request.
func (m *PutRequest) CID() CID { return m.GetHeader().Object.SystemHeader.CID }
// CID returns container id value from object get request.
func (m *GetRequest) CID() CID { return m.Address.CID }
// CID returns container id value from object head request.
func (m *HeadRequest) CID() CID { return m.Address.CID }
// CID returns container id value from object search request.
func (m *SearchRequest) CID() CID { return m.ContainerID }
// CID returns container id value from object delete request.
func (m *DeleteRequest) CID() CID { return m.Address.CID }
// CID returns container id value from object get range request.
func (m *GetRangeRequest) CID() CID { return m.Address.CID }
// CID returns container id value from object get range hash request.
func (m *GetRangeHashRequest) CID() CID { return m.Address.CID }
// AllowPreviousNetMap returns permission to use previous network map in object put request.
func (m *PutRequest) AllowPreviousNetMap() bool { return false }
// AllowPreviousNetMap returns permission to use previous network map in object get request.
func (m *GetRequest) AllowPreviousNetMap() bool { return true }
// AllowPreviousNetMap returns permission to use previous network map in object head request.
func (m *HeadRequest) AllowPreviousNetMap() bool { return true }
// AllowPreviousNetMap returns permission to use previous network map in object search request.
func (m *SearchRequest) AllowPreviousNetMap() bool { return true }
// AllowPreviousNetMap returns permission to use previous network map in object delete request.
func (m *DeleteRequest) AllowPreviousNetMap() bool { return false }
// AllowPreviousNetMap returns permission to use previous network map in object get range request.
func (m *GetRangeRequest) AllowPreviousNetMap() bool { return false }
// AllowPreviousNetMap returns permission to use previous network map in object get range hash request.
func (m *GetRangeHashRequest) AllowPreviousNetMap() bool { return false }

4491
object/service.pb.go Normal file

File diff suppressed because it is too large Load diff

119
object/service.proto Normal file
View file

@ -0,0 +1,119 @@
syntax = "proto3";
package object;
option go_package = "github.com/nspcc-dev/neofs-proto/object";
import "refs/types.proto";
import "object/types.proto";
import "session/types.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.stable_marshaler_all) = true;
service Service {
// Get the object from a container
rpc Get(GetRequest) returns (stream GetResponse);
// Put the object into a container
rpc Put(stream PutRequest) returns (PutResponse);
// Delete the object from a container
rpc Delete(DeleteRequest) returns (DeleteResponse);
// Get MetaInfo
rpc Head(HeadRequest) returns (HeadResponse);
// Search by MetaInfo
rpc Search(SearchRequest) returns (SearchResponse);
// Get ranges of object payload
rpc GetRange(GetRangeRequest) returns (GetRangeResponse);
// Get hashes of object ranges
rpc GetRangeHash(GetRangeHashRequest) returns (GetRangeHashResponse);
}
message GetRequest {
uint64 Epoch = 1;
refs.Address Address = 2 [(gogoproto.nullable) = false];
uint32 TTL = 3;
}
message GetResponse {
oneof R {
Object object = 1;
bytes Chunk = 2;
}
}
message PutRequest {
message PutHeader {
uint64 Epoch = 1;
Object Object = 2;
uint32 TTL = 3;
session.Token Token = 4;
}
oneof R {
PutHeader Header = 1;
bytes Chunk = 2;
}
}
message PutResponse {
refs.Address Address = 1 [(gogoproto.nullable) = false];
}
message DeleteRequest {
uint64 Epoch = 1;
refs.Address Address = 2 [(gogoproto.nullable) = false];
bytes OwnerID = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "OwnerID"];
uint32 TTL = 4;
session.Token Token = 5;
}
message DeleteResponse {}
// HeadRequest.FullHeader == true, for fetch all headers
message HeadRequest {
uint64 Epoch = 1;
refs.Address Address = 2 [(gogoproto.nullable) = false, (gogoproto.customtype) = "Address"];
bool FullHeaders = 3;
uint32 TTL = 4;
}
message HeadResponse {
Object Object = 1;
}
message SearchRequest {
uint64 Epoch = 1;
uint32 Version = 2;
bytes ContainerID = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "CID"];
bytes Query = 4;
uint32 TTL = 5;
}
message SearchResponse {
repeated refs.Address Addresses = 1 [(gogoproto.nullable) = false];
}
message GetRangeRequest {
uint64 Epoch = 1;
refs.Address Address = 2 [(gogoproto.nullable) = false];
repeated Range Ranges = 3 [(gogoproto.nullable) = false];
uint32 TTL = 4;
}
message GetRangeResponse {
repeated bytes Fragments = 1;
}
message GetRangeHashRequest {
uint64 Epoch = 1;
refs.Address Address = 2 [(gogoproto.nullable) = false];
repeated Range Ranges = 3 [(gogoproto.nullable) = false];
bytes Salt = 4;
uint32 TTL = 5;
}
message GetRangeHashResponse {
repeated bytes Hashes = 1 [(gogoproto.customtype) = "Hash", (gogoproto.nullable) = false];
}

66
object/sg.go Normal file
View file

@ -0,0 +1,66 @@
package object
import (
"bytes"
"sort"
)
// Here are defined getter functions for objects that contain storage group
// information.
type (
// IDList is a slice of object ids, that can be sorted.
IDList []ID
// ZoneInfo provides validation info of storage group.
ZoneInfo struct {
Hash
Size uint64
}
// IdentificationInfo provides meta information about storage group.
IdentificationInfo struct {
SGID
CID
OwnerID
}
)
// Len returns amount of object ids in IDList.
func (s IDList) Len() int { return len(s) }
// Less returns byte comparision between IDList[i] and IDList[j].
func (s IDList) Less(i, j int) bool { return bytes.Compare(s[i].Bytes(), s[j].Bytes()) == -1 }
// Swap swaps element with i and j index in IDList.
func (s IDList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// Group returns slice of object ids that are part of a storage group.
func (m *Object) Group() []ID {
sgLinks := m.Links(Link_StorageGroup)
sort.Sort(IDList(sgLinks))
return sgLinks
}
// Zones returns validation zones of storage group.
func (m *Object) Zones() []ZoneInfo {
sgInfo, err := m.StorageGroup()
if err != nil {
return nil
}
return []ZoneInfo{
{
Hash: sgInfo.ValidationHash,
Size: sgInfo.ValidationDataSize,
},
}
}
// IDInfo returns meta information about storage group.
func (m *Object) IDInfo() *IdentificationInfo {
return &IdentificationInfo{
SGID: m.SystemHeader.ID,
CID: m.SystemHeader.CID,
OwnerID: m.SystemHeader.OwnerID,
}
}

87
object/sg_test.go Normal file
View file

@ -0,0 +1,87 @@
package object
import (
"math/rand"
"sort"
"testing"
"github.com/nspcc-dev/neofs-proto/hash"
"github.com/stretchr/testify/require"
)
func TestObject_StorageGroup(t *testing.T) {
t.Run("group method", func(t *testing.T) {
var linkCount byte = 100
obj := &Object{Headers: make([]Header, 0, linkCount)}
require.Empty(t, obj.Group())
idList := make([]ID, linkCount)
for i := byte(0); i < linkCount; i++ {
idList[i] = ID{i}
obj.Headers = append(obj.Headers, Header{
Value: &Header_Link{Link: &Link{
Type: Link_StorageGroup,
ID: idList[i],
}},
})
}
rand.Shuffle(len(obj.Headers), func(i, j int) { obj.Headers[i], obj.Headers[j] = obj.Headers[j], obj.Headers[i] })
sort.Sort(IDList(idList))
require.Equal(t, idList, obj.Group())
})
t.Run("identification method", func(t *testing.T) {
oid, cid, owner := ID{1}, CID{2}, OwnerID{3}
obj := &Object{
SystemHeader: SystemHeader{
ID: oid,
OwnerID: owner,
CID: cid,
},
}
idInfo := obj.IDInfo()
require.Equal(t, oid, idInfo.SGID)
require.Equal(t, cid, idInfo.CID)
require.Equal(t, owner, idInfo.OwnerID)
})
t.Run("zones method", func(t *testing.T) {
sgSize := uint64(100)
d := make([]byte, sgSize)
_, err := rand.Read(d)
require.NoError(t, err)
sgHash := hash.Sum(d)
obj := &Object{
Headers: []Header{
{
Value: &Header_StorageGroup{
StorageGroup: &StorageGroup{
ValidationDataSize: sgSize,
ValidationHash: sgHash,
},
},
},
},
}
var (
sumSize uint64
zones = obj.Zones()
hashes = make([]Hash, len(zones))
)
for i := range zones {
sumSize += zones[i].Size
hashes[i] = zones[i].Hash
}
sumHash, err := hash.Concat(hashes)
require.NoError(t, err)
require.Equal(t, sgSize, sumSize)
require.Equal(t, sgHash, sumHash)
})
}

219
object/types.go Normal file
View file

@ -0,0 +1,219 @@
package object
import (
"bytes"
"context"
"github.com/gogo/protobuf/proto"
"github.com/nspcc-dev/neofs-proto/internal"
"github.com/nspcc-dev/neofs-proto/refs"
"github.com/nspcc-dev/neofs-proto/session"
)
type (
// Pred defines a predicate function that can check if passed header
// satisfies predicate condition. It is used to find headers of
// specific type.
Pred = func(*Header) bool
// Address is a type alias of object Address.
Address = refs.Address
// VerificationHeader is a type alias of session's verification header.
VerificationHeader = session.VerificationHeader
// PositionReader defines object reader that returns slice of bytes
// for specified object and data range.
PositionReader interface {
PRead(ctx context.Context, addr refs.Address, rng Range) ([]byte, error)
}
headerType int
)
const (
// ErrVerifyPayload is raised when payload checksum cannot be verified.
ErrVerifyPayload = internal.Error("can't verify payload")
// ErrVerifyHeader is raised when object integrity cannot be verified.
ErrVerifyHeader = internal.Error("can't verify header")
// ErrHeaderNotFound is raised when requested header not found.
ErrHeaderNotFound = internal.Error("header not found")
// ErrVerifySignature is raised when signature cannot be verified.
ErrVerifySignature = internal.Error("can't verify signature")
)
const (
_ headerType = iota
// LinkHdr is a link header type.
LinkHdr
// RedirectHdr is a redirect header type.
RedirectHdr
// UserHdr is a user defined header type.
UserHdr
// TransformHdr is a transformation header type.
TransformHdr
// TombstoneHdr is a tombstone header type.
TombstoneHdr
// VerifyHdr is a verification header type.
VerifyHdr
// HomoHashHdr is a homomorphic hash header type.
HomoHashHdr
// PayloadChecksumHdr is a payload checksum header type.
PayloadChecksumHdr
// IntegrityHdr is a integrity header type.
IntegrityHdr
// StorageGroupHdr is a storage group header type.
StorageGroupHdr
)
var (
_ internal.Custom = (*Object)(nil)
emptyObject = new(Object).Bytes()
)
// Bytes returns marshaled object in a binary format.
func (m Object) Bytes() []byte { data, _ := m.Marshal(); return data }
// Empty checks if object does not contain any information.
func (m Object) Empty() bool { return bytes.Equal(m.Bytes(), emptyObject) }
// LastHeader returns last header of the specified type. Type must be
// specified as a Pred function.
func (m Object) LastHeader(f Pred) (int, *Header) {
for i := len(m.Headers) - 1; i >= 0; i-- {
if f != nil && f(&m.Headers[i]) {
return i, &m.Headers[i]
}
}
return -1, nil
}
// AddHeader adds passed header to the end of extended header list.
func (m *Object) AddHeader(h *Header) {
m.Headers = append(m.Headers, *h)
}
// SetPayload sets payload field and payload length in the system header.
func (m *Object) SetPayload(payload []byte) {
m.Payload = payload
m.SystemHeader.PayloadLength = uint64(len(payload))
}
// SetHeader replaces existing extended header or adds new one to the end of
// extended header list.
func (m *Object) SetHeader(h *Header) {
// looking for the header of that type
for i := range m.Headers {
if m.Headers[i].typeOf(h.Value) {
// if we found one - set it with new value and return
m.Headers[i] = *h
return
}
}
// if we did not find one - add this header
m.AddHeader(h)
}
func (m Header) typeOf(t isHeader_Value) (ok bool) {
switch t.(type) {
case *Header_Link:
_, ok = m.Value.(*Header_Link)
case *Header_Redirect:
_, ok = m.Value.(*Header_Redirect)
case *Header_UserHeader:
_, ok = m.Value.(*Header_UserHeader)
case *Header_Transform:
_, ok = m.Value.(*Header_Transform)
case *Header_Tombstone:
_, ok = m.Value.(*Header_Tombstone)
case *Header_Verify:
_, ok = m.Value.(*Header_Verify)
case *Header_HomoHash:
_, ok = m.Value.(*Header_HomoHash)
case *Header_PayloadChecksum:
_, ok = m.Value.(*Header_PayloadChecksum)
case *Header_Integrity:
_, ok = m.Value.(*Header_Integrity)
case *Header_StorageGroup:
_, ok = m.Value.(*Header_StorageGroup)
}
return
}
// HeaderType returns predicate that check if extended header is a header
// of specified type.
func HeaderType(t headerType) Pred {
switch t {
case LinkHdr:
return func(h *Header) bool { _, ok := h.Value.(*Header_Link); return ok }
case RedirectHdr:
return func(h *Header) bool { _, ok := h.Value.(*Header_Redirect); return ok }
case UserHdr:
return func(h *Header) bool { _, ok := h.Value.(*Header_UserHeader); return ok }
case TransformHdr:
return func(h *Header) bool { _, ok := h.Value.(*Header_Transform); return ok }
case TombstoneHdr:
return func(h *Header) bool { _, ok := h.Value.(*Header_Tombstone); return ok }
case VerifyHdr:
return func(h *Header) bool { _, ok := h.Value.(*Header_Verify); return ok }
case HomoHashHdr:
return func(h *Header) bool { _, ok := h.Value.(*Header_HomoHash); return ok }
case PayloadChecksumHdr:
return func(h *Header) bool { _, ok := h.Value.(*Header_PayloadChecksum); return ok }
case IntegrityHdr:
return func(h *Header) bool { _, ok := h.Value.(*Header_Integrity); return ok }
case StorageGroupHdr:
return func(h *Header) bool { _, ok := h.Value.(*Header_StorageGroup); return ok }
default:
return nil
}
}
// Copy creates full copy of the object.
func (m *Object) Copy() (obj *Object) {
obj = new(Object)
m.CopyTo(obj)
return
}
// CopyTo creates fills passed object with the data from the current object.
// This function creates copies on every available data slice.
func (m *Object) CopyTo(o *Object) {
o.SystemHeader = m.SystemHeader
o.Headers = make([]Header, len(m.Headers))
o.Payload = make([]byte, len(m.Payload))
for i := range m.Headers {
switch v := m.Headers[i].Value.(type) {
case *Header_Link:
link := *v.Link
o.Headers[i] = Header{
Value: &Header_Link{
Link: &link,
},
}
case *Header_HomoHash:
o.Headers[i] = Header{
Value: &Header_HomoHash{
HomoHash: v.HomoHash,
},
}
default:
o.Headers[i] = *proto.Clone(&m.Headers[i]).(*Header)
}
}
copy(o.Payload, m.Payload)
}
// Address returns object's address.
func (m Object) Address() *refs.Address {
return &refs.Address{
ObjectID: m.SystemHeader.ID,
CID: m.SystemHeader.CID,
}
}

3814
object/types.pb.go Normal file

File diff suppressed because it is too large Load diff

107
object/types.proto Normal file
View file

@ -0,0 +1,107 @@
syntax = "proto3";
package object;
option go_package = "github.com/nspcc-dev/neofs-proto/object";
import "refs/types.proto";
import "session/types.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.stable_marshaler_all) = true;
message Range {
uint64 Offset = 1;
uint64 Length = 2;
}
message UserHeader {
string Key = 1;
string Value = 2;
}
message Header {
oneof Value {
Link Link = 1;
refs.Address Redirect = 2;
UserHeader UserHeader = 3;
Transform Transform = 4;
Tombstone Tombstone = 5;
// session-related info: session.VerificationHeader
session.VerificationHeader Verify = 6;
// integrity-related info
bytes HomoHash = 7 [(gogoproto.customtype) = "Hash"];
bytes PayloadChecksum = 8;
IntegrityHeader Integrity = 9;
StorageGroup StorageGroup = 10;
}
}
message Tombstone {
uint64 Epoch = 1;
}
message SystemHeader {
uint64 Version = 1;
uint64 PayloadLength = 2;
bytes ID = 3 [(gogoproto.customtype) = "ID", (gogoproto.nullable) = false];
bytes OwnerID = 4 [(gogoproto.customtype) = "OwnerID", (gogoproto.nullable) = false];
bytes CID = 5 [(gogoproto.customtype) = "CID", (gogoproto.nullable) = false];
CreationPoint CreatedAt = 6 [(gogoproto.nullable) = false];
}
message CreationPoint {
int64 UnixTime = 1;
uint64 Epoch = 2;
}
message IntegrityHeader {
bytes HeadersChecksum = 1;
bytes ChecksumSignature = 2;
}
message Link {
enum Type {
Unknown = 0;
Parent = 1;
Previous = 2;
Next = 3;
Child = 4;
StorageGroup = 5;
}
Type type = 1;
bytes ID = 2 [(gogoproto.customtype) = "ID", (gogoproto.nullable) = false];
}
message Transform {
enum Type {
Unknown = 0;
Split = 1;
Sign = 2;
Mould = 3;
}
Type type = 1;
}
message Object {
SystemHeader SystemHeader = 1 [(gogoproto.nullable) = false];
repeated Header Headers = 2 [(gogoproto.nullable) = false];
bytes Payload = 3;
}
message StorageGroup {
uint64 ValidationDataSize = 1;
bytes ValidationHash = 2 [(gogoproto.customtype) = "Hash", (gogoproto.nullable) = false];
message Lifetime {
enum Unit {
Unlimited = 0;
NeoFSEpoch = 1;
UnixTime = 2;
}
Unit unit = 1 [(gogoproto.customname) = "Unit"];
int64 Value = 2;
}
Lifetime lifetime = 3 [(gogoproto.customname) = "Lifetime"];
}

107
object/utils.go Normal file
View file

@ -0,0 +1,107 @@
package object
import (
"io"
"code.cloudfoundry.org/bytefmt"
"github.com/nspcc-dev/neofs-proto/session"
"github.com/pkg/errors"
)
const maxGetPayloadSize = 3584 * 1024 // 3.5 MiB
func splitBytes(data []byte, maxSize int) (result [][]byte) {
l := len(data)
if l == 0 {
return [][]byte{data}
}
for i := 0; i < l; i += maxSize {
last := i + maxSize
if last > l {
last = l
}
result = append(result, data[i:last])
}
return
}
// SendPutRequest prepares object and sends it in chunks through protobuf stream.
func SendPutRequest(s Service_PutClient, obj *Object, epoch uint64, ttl uint32) (*PutResponse, error) {
// TODO split must take into account size of the marshalled Object
chunks := splitBytes(obj.Payload, maxGetPayloadSize)
obj.Payload = chunks[0]
if err := s.Send(MakePutRequestHeader(obj, epoch, ttl, nil)); err != nil {
return nil, err
}
for i := range chunks[1:] {
if err := s.Send(MakePutRequestChunk(chunks[i+1])); err != nil {
return nil, err
}
}
resp, err := s.CloseAndRecv()
if err != nil && err != io.EOF {
return nil, err
}
return resp, nil
}
// MakePutRequestHeader combines object, epoch, ttl and session token value
// into header of object put request.
func MakePutRequestHeader(obj *Object, epoch uint64, ttl uint32, token *session.Token) *PutRequest {
return &PutRequest{
R: &PutRequest_Header{
Header: &PutRequest_PutHeader{
Epoch: epoch,
Object: obj,
TTL: ttl,
Token: token,
},
},
}
}
// MakePutRequestChunk splits data into chunks that will be transferred
// in the protobuf stream.
func MakePutRequestChunk(chunk []byte) *PutRequest {
return &PutRequest{R: &PutRequest_Chunk{Chunk: chunk}}
}
func errMaxSizeExceeded(size uint64) error {
return errors.Errorf("object payload size exceed: %s", bytefmt.ByteSize(size))
}
// ReceiveGetResponse receives object by chunks from the protobuf stream
// and combine it into single get response structure.
func ReceiveGetResponse(c Service_GetClient, maxSize uint64) (*GetResponse, error) {
res, err := c.Recv()
if err == io.EOF {
return res, err
} else if err != nil {
return nil, err
}
obj := res.GetObject()
if obj == nil {
return nil, ErrHeaderExpected
}
if obj.SystemHeader.PayloadLength > maxSize {
return nil, errMaxSizeExceeded(maxSize)
}
if res.NotFull() {
payload := make([]byte, obj.SystemHeader.PayloadLength)
offset := copy(payload, obj.Payload)
var r *GetResponse
for r, err = c.Recv(); err == nil; r, err = c.Recv() {
offset += copy(payload[offset:], r.GetChunk())
}
if err != io.EOF {
return nil, err
}
obj.Payload = payload
}
return res, nil
}

132
object/verification.go Normal file
View file

@ -0,0 +1,132 @@
package object
import (
"bytes"
"crypto/ecdsa"
"crypto/sha256"
crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/pkg/errors"
)
func (m Object) headersData(check bool) ([]byte, error) {
var bytebuf = new(bytes.Buffer)
// fixme: we must marshal fields one by one without protobuf marshaling
// protobuf marshaling does not guarantee the same result
if sysheader, err := m.SystemHeader.Marshal(); err != nil {
return nil, err
} else if _, err := bytebuf.Write(sysheader); err != nil {
return nil, err
}
n, _ := m.LastHeader(HeaderType(IntegrityHdr))
for i := range m.Headers {
if check && i == n {
// ignore last integrity header in order to check headers data
continue
}
if header, err := m.Headers[i].Marshal(); err != nil {
return nil, err
} else if _, err := bytebuf.Write(header); err != nil {
return nil, err
}
}
return bytebuf.Bytes(), nil
}
func (m Object) headersChecksum(check bool) ([]byte, error) {
data, err := m.headersData(check)
if err != nil {
return nil, err
}
checksum := sha256.Sum256(data)
return checksum[:], nil
}
// PayloadChecksum calculates sha256 checksum of object payload.
func (m Object) PayloadChecksum() []byte {
checksum := sha256.Sum256(m.Payload)
return checksum[:]
}
func (m Object) verifySignature(key []byte, ih *IntegrityHeader) error {
pk := crypto.UnmarshalPublicKey(key)
if crypto.Verify(pk, ih.HeadersChecksum, ih.ChecksumSignature) == nil {
return nil
}
return ErrVerifySignature
}
// Verify performs local integrity check by finding verification header and
// integrity header. If header integrity is passed, function verifies
// checksum of the object payload.
func (m Object) Verify() error {
var (
err error
checksum []byte
)
// Prepare structures
_, vh := m.LastHeader(HeaderType(VerifyHdr))
if vh == nil {
return ErrHeaderNotFound
}
verify := vh.Value.(*Header_Verify).Verify
_, ih := m.LastHeader(HeaderType(IntegrityHdr))
if ih == nil {
return ErrHeaderNotFound
}
integrity := ih.Value.(*Header_Integrity).Integrity
// Verify signature
err = m.verifySignature(verify.PublicKey, integrity)
if err != nil {
return errors.Wrapf(err, "public key: %x", verify.PublicKey)
}
// Verify checksum of header
checksum, err = m.headersChecksum(true)
if err != nil {
return err
}
if !bytes.Equal(integrity.HeadersChecksum, checksum) {
return ErrVerifyHeader
}
// Verify checksum of payload
if m.SystemHeader.PayloadLength > 0 && !m.IsLinking() {
checksum = m.PayloadChecksum()
_, ph := m.LastHeader(HeaderType(PayloadChecksumHdr))
if ph == nil {
return ErrHeaderNotFound
}
if !bytes.Equal(ph.Value.(*Header_PayloadChecksum).PayloadChecksum, checksum) {
return ErrVerifyPayload
}
}
return nil
}
// Sign creates new integrity header and adds it to the end of the list of
// extended headers.
func (m *Object) Sign(key *ecdsa.PrivateKey) error {
headerChecksum, err := m.headersChecksum(false)
if err != nil {
return err
}
headerChecksumSignature, err := crypto.Sign(key, headerChecksum)
if err != nil {
return err
}
m.AddHeader(&Header{Value: &Header_Integrity{
Integrity: &IntegrityHeader{
HeadersChecksum: headerChecksum,
ChecksumSignature: headerChecksumSignature,
},
}})
return nil
}

105
object/verification_test.go Normal file
View file

@ -0,0 +1,105 @@
package object
import (
"testing"
"github.com/google/uuid"
crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/nspcc-dev/neofs-crypto/test"
"github.com/nspcc-dev/neofs-proto/container"
"github.com/nspcc-dev/neofs-proto/refs"
"github.com/nspcc-dev/neofs-proto/session"
"github.com/stretchr/testify/require"
)
func TestObject_Verify(t *testing.T) {
key := test.DecodeKey(0)
sessionkey := test.DecodeKey(1)
payload := make([]byte, 1024*1024)
cnr, err := container.NewTestContainer()
require.NoError(t, err)
cid, err := cnr.ID()
require.NoError(t, err)
id, err := uuid.NewRandom()
uid := refs.UUID(id)
require.NoError(t, err)
obj := &Object{
SystemHeader: SystemHeader{
ID: uid,
CID: cid,
OwnerID: refs.OwnerID([refs.OwnerIDSize]byte{}),
},
Headers: []Header{
{
Value: &Header_UserHeader{
UserHeader: &UserHeader{
Key: "Profession",
Value: "Developer",
},
},
},
{
Value: &Header_UserHeader{
UserHeader: &UserHeader{
Key: "Language",
Value: "GO",
},
},
},
},
}
obj.SetPayload(payload)
obj.SetHeader(&Header{Value: &Header_PayloadChecksum{[]byte("incorrect checksum")}})
t.Run("error no integrity header", func(t *testing.T) {
err = obj.Verify()
require.EqualError(t, err, ErrHeaderNotFound.Error())
})
badHeaderChecksum := []byte("incorrect checksum")
signature, err := crypto.Sign(sessionkey, badHeaderChecksum)
require.NoError(t, err)
ih := &IntegrityHeader{
HeadersChecksum: badHeaderChecksum,
ChecksumSignature: signature,
}
obj.SetHeader(&Header{Value: &Header_Integrity{ih}})
t.Run("error no validation header", func(t *testing.T) {
err = obj.Verify()
require.EqualError(t, err, ErrHeaderNotFound.Error())
})
dataPK := crypto.MarshalPublicKey(&sessionkey.PublicKey)
signature, err = crypto.Sign(key, dataPK)
vh := &session.VerificationHeader{
PublicKey: dataPK,
KeySignature: signature,
}
obj.SetVerificationHeader(vh)
t.Run("error invalid header checksum", func(t *testing.T) {
err = obj.Verify()
require.EqualError(t, err, ErrVerifyHeader.Error())
})
require.NoError(t, obj.Sign(sessionkey))
t.Run("error invalid payload checksum", func(t *testing.T) {
err = obj.Verify()
require.EqualError(t, err, ErrVerifyPayload.Error())
})
obj.SetHeader(&Header{Value: &Header_PayloadChecksum{obj.PayloadChecksum()}})
require.NoError(t, obj.Sign(sessionkey))
t.Run("correct", func(t *testing.T) {
err = obj.Verify()
require.NoError(t, err)
})
}