forked from TrueCloudLab/frostfs-node
[#9999] objectstore: Put
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
80cf000304
commit
671eba46bc
10 changed files with 679 additions and 58 deletions
|
@ -830,6 +830,8 @@ func (c *cfg) getObjectStoreOpts(shCfg shardCfg) []objectstore.Option {
|
||||||
objectstore.WithBlobPath(shCfg.objectStoreCfg.blobPath),
|
objectstore.WithBlobPath(shCfg.objectStoreCfg.blobPath),
|
||||||
objectstore.WithMetaPath(shCfg.objectStoreCfg.metaPath),
|
objectstore.WithMetaPath(shCfg.objectStoreCfg.metaPath),
|
||||||
objectstore.WithWalPath(shCfg.objectStoreCfg.walPath),
|
objectstore.WithWalPath(shCfg.objectStoreCfg.walPath),
|
||||||
|
objectstore.WithEpochSource(c.cfgNetmap.state),
|
||||||
|
objectstore.WithLogger(c.log),
|
||||||
)
|
)
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
53
pkg/core/object/split_info.go
Normal file
53
pkg/core/object/split_info.go
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
package object
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ErrIncorrectRootObject = errors.New("invalid root object")
|
||||||
|
|
||||||
|
// SplitInfoFromObject returns split info based on last or linkin object.
|
||||||
|
// Otherwise returns nil, nil.
|
||||||
|
func SplitInfoFromObject(obj *objectSDK.Object) (*objectSDK.SplitInfo, error) {
|
||||||
|
if obj.Parent() == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
info := objectSDK.NewSplitInfo()
|
||||||
|
info.SetSplitID(obj.SplitID())
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case IsLinkObject(obj):
|
||||||
|
id, ok := obj.ID()
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("missing object ID")
|
||||||
|
}
|
||||||
|
|
||||||
|
info.SetLink(id)
|
||||||
|
case isLastObject(obj):
|
||||||
|
id, ok := obj.ID()
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("missing object ID")
|
||||||
|
}
|
||||||
|
|
||||||
|
info.SetLastPart(id)
|
||||||
|
default:
|
||||||
|
return nil, ErrIncorrectRootObject // should never happen
|
||||||
|
}
|
||||||
|
|
||||||
|
return info, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsLinkObject returns true if object contains parent header and list
|
||||||
|
// of children.
|
||||||
|
func IsLinkObject(obj *objectSDK.Object) bool {
|
||||||
|
return len(obj.Children()) > 0 && obj.Parent() != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// isLastObject returns true if object contains only parent header without list
|
||||||
|
// of children.
|
||||||
|
func isLastObject(obj *objectSDK.Object) bool {
|
||||||
|
return len(obj.Children()) == 0 && obj.Parent() != nil
|
||||||
|
}
|
|
@ -258,7 +258,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
var isLinkingObj bool
|
var isLinkingObj bool
|
||||||
var ecInfo *objectcore.ECInfo
|
var ecInfo *objectcore.ECInfo
|
||||||
if objType == objectSDK.TypeRegular {
|
if objType == objectSDK.TypeRegular {
|
||||||
isLinkingObj = isLinkObject(&o)
|
isLinkingObj = objectcore.IsLinkObject(&o)
|
||||||
ecHeader := o.ECHeader()
|
ecHeader := o.ECHeader()
|
||||||
if ecHeader != nil {
|
if ecHeader != nil {
|
||||||
ecInfo = &objectcore.ECInfo{
|
ecInfo = &objectcore.ECInfo{
|
||||||
|
@ -424,7 +424,7 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, p
|
||||||
if err := o.Unmarshal(v); err != nil {
|
if err := o.Unmarshal(v); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
isLinkingObj = isLinkObject(&o)
|
isLinkingObj = objectcore.IsLinkObject(&o)
|
||||||
ecHeader := o.ECHeader()
|
ecHeader := o.ECHeader()
|
||||||
if ecHeader != nil {
|
if ecHeader != nil {
|
||||||
ecInfo = &objectcore.ECInfo{
|
ecInfo = &objectcore.ECInfo{
|
||||||
|
|
|
@ -58,10 +58,7 @@ func (p *PutPrm) SetIndexAttributes(v bool) {
|
||||||
p.indexAttributes = v
|
p.indexAttributes = v
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var ErrUnknownObjectType = errors.New("unknown object type")
|
||||||
ErrUnknownObjectType = errors.New("unknown object type")
|
|
||||||
ErrIncorrectRootObject = errors.New("invalid root object")
|
|
||||||
)
|
|
||||||
|
|
||||||
// Put saves object header in metabase. Object payload expected to be cut.
|
// Put saves object header in metabase. Object payload expected to be cut.
|
||||||
//
|
//
|
||||||
|
@ -166,7 +163,7 @@ func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *obje
|
||||||
|
|
||||||
func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error {
|
func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error {
|
||||||
if par := obj.Parent(); par != nil && !isParent { // limit depth by two
|
if par := obj.Parent(); par != nil && !isParent { // limit depth by two
|
||||||
parentSI, err := splitInfoFromObject(obj)
|
parentSI, err := objectCore.SplitInfoFromObject(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -614,47 +611,3 @@ func updateSplitInfo(tx *bbolt.Tx, addr oid.Address, from *objectSDK.SplitInfo)
|
||||||
objKey := objectKey(addr.Object(), make([]byte, bucketKeySize))
|
objKey := objectKey(addr.Object(), make([]byte, bucketKeySize))
|
||||||
return updateSplitInfoIndex(tx, objKey, addr.Container(), make([]byte, bucketKeySize), from)
|
return updateSplitInfoIndex(tx, objKey, addr.Container(), make([]byte, bucketKeySize), from)
|
||||||
}
|
}
|
||||||
|
|
||||||
// splitInfoFromObject returns split info based on last or linkin object.
|
|
||||||
// Otherwise returns nil, nil.
|
|
||||||
func splitInfoFromObject(obj *objectSDK.Object) (*objectSDK.SplitInfo, error) {
|
|
||||||
if obj.Parent() == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
info := objectSDK.NewSplitInfo()
|
|
||||||
info.SetSplitID(obj.SplitID())
|
|
||||||
|
|
||||||
switch {
|
|
||||||
case isLinkObject(obj):
|
|
||||||
id, ok := obj.ID()
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("missing object ID")
|
|
||||||
}
|
|
||||||
|
|
||||||
info.SetLink(id)
|
|
||||||
case isLastObject(obj):
|
|
||||||
id, ok := obj.ID()
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("missing object ID")
|
|
||||||
}
|
|
||||||
|
|
||||||
info.SetLastPart(id)
|
|
||||||
default:
|
|
||||||
return nil, ErrIncorrectRootObject // should never happen
|
|
||||||
}
|
|
||||||
|
|
||||||
return info, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// isLinkObject returns true if object contains parent header and list
|
|
||||||
// of children.
|
|
||||||
func isLinkObject(obj *objectSDK.Object) bool {
|
|
||||||
return len(obj.Children()) > 0 && obj.Parent() != nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// isLastObject returns true if object contains only parent header without list
|
|
||||||
// of children.
|
|
||||||
func isLastObject(obj *objectSDK.Object) bool {
|
|
||||||
return len(obj.Children()) == 0 && obj.Parent() != nil
|
|
||||||
}
|
|
||||||
|
|
41
pkg/local_object_storage/objectstore/blob.go
Normal file
41
pkg/local_object_storage/objectstore/blob.go
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
package objectstore
|
||||||
|
|
||||||
|
import "encoding/binary"
|
||||||
|
|
||||||
|
const (
|
||||||
|
compressionNone byte = 0
|
||||||
|
compressionZSTD byte = 1
|
||||||
|
|
||||||
|
blobTypeObject byte = 0 // blob contains only one object
|
||||||
|
|
||||||
|
blobStateReserved byte = 0 // blob reserved, but not commited yet
|
||||||
|
blobStateCommited byte = 1 // blob contains object
|
||||||
|
)
|
||||||
|
|
||||||
|
type blob struct {
|
||||||
|
Type byte
|
||||||
|
State byte
|
||||||
|
CreationEpoch uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *blob) Bytes() []byte {
|
||||||
|
result := make([]byte, 10)
|
||||||
|
result[0] = b.Type
|
||||||
|
result[1] = b.State
|
||||||
|
binary.LittleEndian.PutUint64(result[2:], b.CreationEpoch)
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
type objectIDPartBlobIDValue struct {
|
||||||
|
Compression byte
|
||||||
|
Offset uint64
|
||||||
|
Lenght uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *objectIDPartBlobIDValue) Bytes() []byte {
|
||||||
|
result := make([]byte, 1+8+8)
|
||||||
|
result[0] = v.Compression
|
||||||
|
binary.LittleEndian.PutUint64(result[1:], v.Offset)
|
||||||
|
binary.LittleEndian.PutUint64(result[1+8:], v.Lenght)
|
||||||
|
return result
|
||||||
|
}
|
|
@ -3,11 +3,15 @@ package objectstore
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/fs"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
"github.com/cockroachdb/pebble"
|
"github.com/cockroachdb/pebble"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const permissions fs.FileMode = 0o644
|
||||||
|
|
||||||
func (s *ObjectStore) Open(ctx context.Context, m mode.Mode) error {
|
func (s *ObjectStore) Open(ctx context.Context, m mode.Mode) error {
|
||||||
s.mtx.Lock()
|
s.mtx.Lock()
|
||||||
defer s.mtx.Unlock()
|
defer s.mtx.Unlock()
|
||||||
|
@ -16,6 +20,18 @@ func (s *ObjectStore) Open(ctx context.Context, m mode.Mode) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := util.MkdirAllX(s.cfg.blobPath, permissions); err != nil {
|
||||||
|
return fmt.Errorf("create blob dir: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := util.MkdirAllX(s.cfg.walPath, permissions); err != nil {
|
||||||
|
return fmt.Errorf("create WAL dir: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := util.MkdirAllX(s.cfg.metaPath, permissions); err != nil {
|
||||||
|
return fmt.Errorf("create meta dir: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
db, err := pebble.Open(s.cfg.metaPath, pebbleOptions(s.cfg.walPath, m.ReadOnly()))
|
db, err := pebble.Open(s.cfg.metaPath, pebbleOptions(s.cfg.walPath, m.ReadOnly()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("objectstore pebble db open: %w", err)
|
return fmt.Errorf("objectstore pebble db open: %w", err)
|
||||||
|
|
|
@ -5,18 +5,24 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type EpochSource interface {
|
||||||
|
CurrentEpoch() uint64
|
||||||
|
}
|
||||||
|
|
||||||
type config struct {
|
type config struct {
|
||||||
logger *logger.Logger
|
logger *logger.Logger
|
||||||
walPath string
|
walPath string
|
||||||
blobPath string
|
blobPath string
|
||||||
metaPath string
|
metaPath string
|
||||||
|
epochSource EpochSource
|
||||||
}
|
}
|
||||||
|
|
||||||
type Option func(*config)
|
type Option func(*config)
|
||||||
|
|
||||||
func defaultCfg() *config {
|
func defaultCfg() *config {
|
||||||
return &config{
|
return &config{
|
||||||
logger: logger.NewLoggerWrapper(zap.L()),
|
logger: logger.NewLoggerWrapper(zap.L()),
|
||||||
|
epochSource: &epochSourceStub{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,3 +43,23 @@ func WithWalPath(path string) Option {
|
||||||
c.walPath = path
|
c.walPath = path
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithEpochSource(es EpochSource) Option {
|
||||||
|
return func(c *config) {
|
||||||
|
c.epochSource = es
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithLogger(logger *logger.Logger) Option {
|
||||||
|
return func(c *config) {
|
||||||
|
c.logger = logger
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ EpochSource = (*epochSourceStub)(nil)
|
||||||
|
|
||||||
|
type epochSourceStub struct{}
|
||||||
|
|
||||||
|
func (e *epochSourceStub) CurrentEpoch() uint64 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package objectstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
"log"
|
"log"
|
||||||
|
|
||||||
"github.com/cockroachdb/pebble"
|
"github.com/cockroachdb/pebble"
|
||||||
|
@ -90,3 +91,12 @@ func (s *ObjectStore) readValue(key []byte, reader func(value []byte)) error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ObjectStore) rwBatch(f func(batch *pebble.Batch) error) error {
|
||||||
|
b := s.meta.NewIndexedBatch()
|
||||||
|
err := f(b)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Join(err, b.Close())
|
||||||
|
}
|
||||||
|
return errors.Join(b.Commit(pebble.Sync), b.Close())
|
||||||
|
}
|
||||||
|
|
|
@ -2,10 +2,372 @@ package objectstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
objectAPI "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/cockroachdb/pebble"
|
||||||
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const flags = os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_EXCL | os.O_SYNC
|
||||||
|
|
||||||
func (s *ObjectStore) Put(ctx context.Context, object *objectSDK.Object) error {
|
func (s *ObjectStore) Put(ctx context.Context, object *objectSDK.Object) error {
|
||||||
panic("unimplemented")
|
s.mtx.RLock()
|
||||||
|
defer s.mtx.RUnlock()
|
||||||
|
|
||||||
|
if err := s.validateState(true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
epoch := s.cfg.epochSource.CurrentEpoch()
|
||||||
|
|
||||||
|
payload := object.Payload()
|
||||||
|
header, err := object.CutPayload().Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal object header: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
blobID, err := s.reserveObjectBlob(epoch)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("reserve blob for object: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := s.saveObjectToObjectBlob(header, payload, blobID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("save object to blob: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.saveObjectMeta(object, result, blobID, epoch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ObjectStore) reserveObjectBlob(epoch uint64) (uuid.UUID, error) {
|
||||||
|
blobID, err := uuid.NewRandom()
|
||||||
|
if err != nil {
|
||||||
|
return uuid.Nil, fmt.Errorf("blobID generate: %w", err)
|
||||||
|
}
|
||||||
|
val := &blob{
|
||||||
|
Type: blobTypeObject,
|
||||||
|
State: blobStateReserved,
|
||||||
|
CreationEpoch: epoch,
|
||||||
|
}
|
||||||
|
if err := s.meta.Set(blobIDKey(blobID), val.Bytes(), pebble.Sync); err != nil {
|
||||||
|
return uuid.Nil, fmt.Errorf("set blobID value: %w", err)
|
||||||
|
}
|
||||||
|
return blobID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// saveObjectToObjectBlob saves object's header and payload to blob of 'object' type.
|
||||||
|
//
|
||||||
|
// Object blob's schema:
|
||||||
|
//
|
||||||
|
// [0] - blob type
|
||||||
|
//
|
||||||
|
// [1-8] - header length
|
||||||
|
//
|
||||||
|
// [9-16] - payload length
|
||||||
|
//
|
||||||
|
// [17 - 17+len(header)-1] - header
|
||||||
|
//
|
||||||
|
// [17+len(header) - ...] - payload.
|
||||||
|
func (s *ObjectStore) saveObjectToObjectBlob(header, payload []byte, blobID uuid.UUID) (*saveObjectBlobResult, error) {
|
||||||
|
path := filepath.Join(s.cfg.blobPath, blobID.String()+".blob")
|
||||||
|
|
||||||
|
f, err := os.OpenFile(path, flags, permissions)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
closed := false
|
||||||
|
defer func() {
|
||||||
|
if !closed {
|
||||||
|
_ = f.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, err = f.Write([]byte{blobTypeObject})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = f.Write([]byte{blobTypeObject})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, 8)
|
||||||
|
binary.LittleEndian.PutUint64(buf, uint64(len(header)))
|
||||||
|
_, err = f.Write(buf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
binary.LittleEndian.PutUint64(buf, uint64(len(payload)))
|
||||||
|
_, err = f.Write(buf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = f.Write(header)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(payload) > 0 {
|
||||||
|
_, err = f.Write(payload)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
closed = true
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &saveObjectBlobResult{
|
||||||
|
Header: objectIDPartBlobIDValue{
|
||||||
|
Compression: compressionNone,
|
||||||
|
Offset: 17,
|
||||||
|
Lenght: uint64(len(header)),
|
||||||
|
},
|
||||||
|
Payload: objectIDPartBlobIDValue{
|
||||||
|
Compression: compressionNone,
|
||||||
|
Offset: 17 + uint64(len(header)),
|
||||||
|
Lenght: uint64(len(payload)),
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type saveObjectBlobResult struct {
|
||||||
|
Header objectIDPartBlobIDValue
|
||||||
|
Payload objectIDPartBlobIDValue
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ObjectStore) saveObjectMeta(object *objectSDK.Object, blobs *saveObjectBlobResult, blobID uuid.UUID, epoch uint64) error {
|
||||||
|
return s.rwBatch(func(b *pebble.Batch) error {
|
||||||
|
if parent := object.Parent(); parent != nil {
|
||||||
|
if err := saveParentInfo(b, object, parent); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := saveObjectMeta(b, parent); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := saveObjectMeta(b, object); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := saveStoredObject(b, object); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return saveBlob(b, object, blobs, blobID, epoch)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveParentInfo(b *pebble.Batch, object, parent *objectSDK.Object) error {
|
||||||
|
parentAddress := objectCore.AddressOf(parent)
|
||||||
|
objectAddress := objectCore.AddressOf(object)
|
||||||
|
|
||||||
|
splitInfo, err := objectCore.SplitInfoFromObject(object)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if splitInfo != nil {
|
||||||
|
if err := saveSplitInfo(b, parentAddress, splitInfo); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return b.Set(parentKey(parentAddress.Container(), parentAddress.Object(), objectAddress.Object()), zeroValue, pebble.Sync)
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveSplitInfo(b *pebble.Batch, addr oid.Address, splitInfo *objectSDK.SplitInfo) error {
|
||||||
|
if err := b.Set(splitInfoKey(addr.Container(), addr.Object(), splitInfoTypeSplitID),
|
||||||
|
splitInfo.SplitID().ToV2(), pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if lastPartID, isSet := splitInfo.LastPart(); isSet {
|
||||||
|
if err := b.Set(splitInfoKey(addr.Container(), addr.Object(), splitInfoTypeLastPart),
|
||||||
|
lastPartID[:], pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if linkObjectID, isSet := splitInfo.Link(); isSet {
|
||||||
|
if err := b.Set(splitInfoKey(addr.Container(), addr.Object(), splitInfoTypeLinkingObject),
|
||||||
|
linkObjectID[:], pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveObjectMeta(b *pebble.Batch, object *objectSDK.Object) error {
|
||||||
|
if err := saveECParentInfo(b, object); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := saveECInfo(b, object); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := saveExpirationEpoch(b, object); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := saveTombstoneInfo(b, object); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := saveIndex(b, object); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveECParentInfo(b *pebble.Batch, object *objectSDK.Object) error {
|
||||||
|
ecHeader := object.ECHeader()
|
||||||
|
if ecHeader == nil || ecHeader.ParentSplitID() == nil || ecHeader.ParentSplitParentID() == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
objectAddress := objectCore.AddressOf(object)
|
||||||
|
|
||||||
|
splitInfo := objectSDK.NewSplitInfo()
|
||||||
|
splitInfo.SetSplitID(ecHeader.ParentSplitID())
|
||||||
|
splitInfo.SetLastPart(ecHeader.Parent())
|
||||||
|
|
||||||
|
var parentAddress oid.Address
|
||||||
|
parentAddress.SetContainer(objectAddress.Container())
|
||||||
|
parentAddress.SetObject(*ecHeader.ParentSplitParentID())
|
||||||
|
|
||||||
|
return saveSplitInfo(b, parentAddress, splitInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveECInfo(b *pebble.Batch, object *objectSDK.Object) error {
|
||||||
|
ecHeader := object.ECHeader()
|
||||||
|
if ecHeader == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
objectAddress := objectCore.AddressOf(object)
|
||||||
|
return b.Set(ecParentKey(objectAddress.Container(), ecHeader.Parent(), objectAddress.Object()), zeroValue, pebble.Sync)
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveExpirationEpoch(b *pebble.Batch, object *objectSDK.Object) error {
|
||||||
|
expEpoch, defined := hasExpirationEpoch(object)
|
||||||
|
if !defined {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
objectAddress := objectCore.AddressOf(object)
|
||||||
|
if err := b.Set(expEpochToObjectKey(objectAddress.Container(), objectAddress.Object(), expEpoch), zeroValue, pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := b.Set(objectToExpEpochKey(objectAddress.Container(), objectAddress.Object(), expEpoch), zeroValue, pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func hasExpirationEpoch(obj *objectSDK.Object) (uint64, bool) {
|
||||||
|
attributes := obj.Attributes()
|
||||||
|
if ech := obj.ECHeader(); ech != nil {
|
||||||
|
attributes = ech.ParentAttributes()
|
||||||
|
}
|
||||||
|
for _, attr := range attributes {
|
||||||
|
if attr.Key() == objectAPI.SysAttributeExpEpoch {
|
||||||
|
expEpoch, err := strconv.ParseUint(attr.Value(), 10, 64)
|
||||||
|
return expEpoch, err == nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveTombstoneInfo(b *pebble.Batch, object *objectSDK.Object) error {
|
||||||
|
if object.Type() != objectSDK.TypeTombstone {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
objectAddress := objectCore.AddressOf(object)
|
||||||
|
tombstone := objectSDK.NewTombstone()
|
||||||
|
if err := tombstone.Unmarshal(object.Payload()); err != nil {
|
||||||
|
return fmt.Errorf("unmarshal tombstone content: %w", err)
|
||||||
|
}
|
||||||
|
for _, objID := range tombstone.Members() {
|
||||||
|
if err := b.Set(objectToTombstoneKey(objectAddress.Container(), objID, objectAddress.Object()), zeroValue, pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := b.Set(tombstoneToObjectKey(objectAddress.Container(), objID, objectAddress.Object()), zeroValue, pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveIndex(b *pebble.Batch, object *objectSDK.Object) error {
|
||||||
|
objectAddress := objectCore.AddressOf(object)
|
||||||
|
if splitID := object.SplitID(); splitID != nil {
|
||||||
|
if err := b.Set(splitIDIndexKey(objectAddress.Container(), objectAddress.Object(), splitID), zeroValue, pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
attrs := object.Attributes()
|
||||||
|
if ecHeader := object.ECHeader(); ecHeader != nil {
|
||||||
|
attrs = ecHeader.ParentAttributes()
|
||||||
|
objectAddress.SetObject(ecHeader.Parent())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, attr := range attrs {
|
||||||
|
if err := b.Set(attributeIndexKey(objectAddress.Container(), objectAddress.Object(), attr.Key(), attr.Value()), zeroValue, pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveBlob(b *pebble.Batch, object *objectSDK.Object, blobs *saveObjectBlobResult, blobID uuid.UUID, epoch uint64) error {
|
||||||
|
val := &blob{
|
||||||
|
Type: blobTypeObject,
|
||||||
|
State: blobStateCommited,
|
||||||
|
CreationEpoch: epoch,
|
||||||
|
}
|
||||||
|
objectAddress := objectCore.AddressOf(object)
|
||||||
|
if err := b.Set(blobIDKey(blobID), val.Bytes(), pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := b.Set(objectToBlobKey(objectAddress.Container(), objectAddress.Object(), partHeader, blobID), blobs.Header.Bytes(), pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := b.Set(objectToBlobKey(objectAddress.Container(), objectAddress.Object(), partPayload, blobID), blobs.Payload.Bytes(), pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveStoredObject(b *pebble.Batch, object *objectSDK.Object) error {
|
||||||
|
objectAddress := objectCore.AddressOf(object)
|
||||||
|
if err := b.Set(storedObjectKey(objectAddress.Container(), objectAddress.Object()), storedObjectValueBlob, pebble.Sync); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
switch object.Type() {
|
||||||
|
case objectSDK.TypeRegular:
|
||||||
|
err = b.Set(regularObjectKey(objectAddress.Container(), objectAddress.Object()), zeroValue, pebble.Sync)
|
||||||
|
case objectSDK.TypeTombstone:
|
||||||
|
err = b.Set(tombstoneObjectKey(objectAddress.Container(), objectAddress.Object()), zeroValue, pebble.Sync)
|
||||||
|
default:
|
||||||
|
err = fmt.Errorf("unsupported object type %d", object.Type())
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,168 @@
|
||||||
package objectstore
|
package objectstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
prefixInfo byte = 0
|
prefixInfo byte = 0
|
||||||
|
prefixBlobID byte = 1
|
||||||
|
prefixObjectIDPartBlobID byte = 2
|
||||||
|
prefixSplitInfo byte = 3
|
||||||
|
prefixParent byte = 4
|
||||||
|
prefixECParent byte = 5
|
||||||
|
prefixExpEpochToObject byte = 6
|
||||||
|
prefixObjectToExpEpoch byte = 7
|
||||||
|
prefixObjectToTombstone byte = 8
|
||||||
|
prefixTombstoneToObject byte = 9
|
||||||
|
prefixSplitIDIndex byte = 10
|
||||||
|
prefixAttributeIndex byte = 11
|
||||||
|
prefixStoredObject byte = 12
|
||||||
|
prefixRegularObject byte = 13
|
||||||
|
prefixTombstoneObject byte = 14
|
||||||
|
|
||||||
|
partHeader = 0
|
||||||
|
partPayload = 1
|
||||||
|
|
||||||
|
splitInfoTypeSplitID = 0
|
||||||
|
splitInfoTypeLastPart = 1
|
||||||
|
splitInfoTypeLinkingObject = 2
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
versionKey = append([]byte{prefixInfo}, []byte("version")...)
|
versionKey = append([]byte{prefixInfo}, []byte("version")...)
|
||||||
upgradeKey = append([]byte{prefixInfo}, []byte("upgrade")...)
|
upgradeKey = append([]byte{prefixInfo}, []byte("upgrade")...)
|
||||||
|
zeroValue = []byte{}
|
||||||
|
|
||||||
|
storedObjectValueBlob = []byte{0}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func blobIDKey(blobID uuid.UUID) []byte {
|
||||||
|
result := make([]byte, 17)
|
||||||
|
result[0] = prefixBlobID
|
||||||
|
copy(result[1:], blobID[:])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func objectToBlobKey(containerID cid.ID, objectID oid.ID, partType byte, blobID uuid.UUID) []byte {
|
||||||
|
result := make([]byte, 1+32+32+1+16)
|
||||||
|
result[0] = prefixObjectIDPartBlobID
|
||||||
|
containerID.Encode(result[1:])
|
||||||
|
objectID.Encode(result[1+32:])
|
||||||
|
result[1+32+32] = partType
|
||||||
|
copy(result[1+32+32+1:], blobID[:])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func storedObjectKey(containerID cid.ID, objectID oid.ID) []byte {
|
||||||
|
result := make([]byte, 1+32+32)
|
||||||
|
result[0] = prefixStoredObject
|
||||||
|
containerID.Encode(result[1:])
|
||||||
|
objectID.Encode(result[1+32:])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func regularObjectKey(containerID cid.ID, objectID oid.ID) []byte {
|
||||||
|
result := make([]byte, 1+32+32)
|
||||||
|
result[0] = prefixRegularObject
|
||||||
|
containerID.Encode(result[1:])
|
||||||
|
objectID.Encode(result[1+32:])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func tombstoneObjectKey(containerID cid.ID, objectID oid.ID) []byte {
|
||||||
|
result := make([]byte, 1+32+32)
|
||||||
|
result[0] = prefixTombstoneObject
|
||||||
|
containerID.Encode(result[1:])
|
||||||
|
objectID.Encode(result[1+32:])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func splitInfoKey(containerID cid.ID, objectID oid.ID, splitInfoType byte) []byte {
|
||||||
|
result := make([]byte, 1+32+32+1)
|
||||||
|
result[0] = prefixSplitInfo
|
||||||
|
containerID.Encode(result[1:])
|
||||||
|
objectID.Encode(result[33:])
|
||||||
|
result[65] = splitInfoType
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func parentKey(containerID cid.ID, parentObjectID, objectID oid.ID) []byte {
|
||||||
|
result := make([]byte, 1+32+32+32)
|
||||||
|
result[0] = prefixParent
|
||||||
|
containerID.Encode(result[1:])
|
||||||
|
parentObjectID.Encode(result[33:])
|
||||||
|
objectID.Encode(result[65:])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func ecParentKey(containerID cid.ID, ecParentObjectID, objectID oid.ID) []byte {
|
||||||
|
result := make([]byte, 1+32+32+32)
|
||||||
|
result[0] = prefixECParent
|
||||||
|
containerID.Encode(result[1:])
|
||||||
|
ecParentObjectID.Encode(result[33:])
|
||||||
|
objectID.Encode(result[65:])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func expEpochToObjectKey(containerID cid.ID, objectID oid.ID, expEpoch uint64) []byte {
|
||||||
|
result := make([]byte, 1+8+32+32)
|
||||||
|
result[0] = prefixExpEpochToObject
|
||||||
|
binary.LittleEndian.PutUint64(result[1:], expEpoch)
|
||||||
|
containerID.Encode(result[9:])
|
||||||
|
objectID.Encode(result[41:])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func objectToExpEpochKey(containerID cid.ID, objectID oid.ID, expEpoch uint64) []byte {
|
||||||
|
result := make([]byte, 1+32+32+8)
|
||||||
|
result[0] = prefixObjectToExpEpoch
|
||||||
|
containerID.Encode(result[1:])
|
||||||
|
objectID.Encode(result[33:])
|
||||||
|
binary.LittleEndian.PutUint64(result[65:], expEpoch)
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func objectToTombstoneKey(containerID cid.ID, objectID, tombstoneID oid.ID) []byte {
|
||||||
|
result := make([]byte, 1+32+32+32)
|
||||||
|
result[0] = prefixObjectToTombstone
|
||||||
|
containerID.Encode(result[1:])
|
||||||
|
objectID.Encode(result[33:])
|
||||||
|
tombstoneID.Encode(result[65:])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func tombstoneToObjectKey(containerID cid.ID, objectID, tombstoneID oid.ID) []byte {
|
||||||
|
result := make([]byte, 1+32+32+32)
|
||||||
|
result[0] = prefixTombstoneToObject
|
||||||
|
containerID.Encode(result[1:])
|
||||||
|
tombstoneID.Encode(result[33:])
|
||||||
|
objectID.Encode(result[65:])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func splitIDIndexKey(containerID cid.ID, objectID oid.ID, splitID *objectSDK.SplitID) []byte {
|
||||||
|
splitIDBytes := splitID.ToV2()
|
||||||
|
result := make([]byte, 1+len(splitIDBytes)+32+32)
|
||||||
|
result[0] = prefixSplitIDIndex
|
||||||
|
copy(result[1:], splitIDBytes)
|
||||||
|
containerID.Encode(result[1+len(splitIDBytes):])
|
||||||
|
objectID.Encode(result[1+len(splitIDBytes)+32:])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func attributeIndexKey(containerID cid.ID, objectID oid.ID, key, value string) []byte {
|
||||||
|
result := make([]byte, 1+32+len(key)+1+len(value)+32)
|
||||||
|
result[0] = prefixAttributeIndex
|
||||||
|
containerID.Encode(result[1:])
|
||||||
|
copy(result[33:], []byte(key))
|
||||||
|
result[1+32+len(key)] = '='
|
||||||
|
copy(result[1+32+len(key)+1:], []byte(value))
|
||||||
|
objectID.Encode(result[1+32+len(key)+1+len(value):])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue