frostfs-node/pkg/local_object_storage/metabase/put.go

649 lines
16 KiB
Go
Raw Permalink Normal View History

package meta
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
gio "io"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/io"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type (
// namedBucketItem describes a single index entry to write: name is the
// bucket name, key is the key inside that bucket and val is the value
// associated with the key. It is the argument of every updateIndexItemFunc.
namedBucketItem struct {
name, key, val []byte
}
)
// PutPrm groups the parameters of Put operation.
type PutPrm struct {
// obj is the object to save; set with SetObject.
obj *objectSDK.Object
// id is the storage ID to save along with the object; set with SetStorageID.
// A nil id means no storage ID is written.
id []byte
}
// PutRes groups the resulting values of Put operation.
type PutRes struct {
// Inserted is set when the object did not exist in the metabase and the
// insert path was taken; it stays false when an existing record was updated.
Inserted bool
}
// SetObject is a Put option to set object to save.
// The pointer is stored as-is; the object is not copied.
func (p *PutPrm) SetObject(obj *objectSDK.Object) {
p.obj = obj
}
// SetStorageID is a Put option to set storage ID to save.
// The slice is stored as-is; it is not copied.
func (p *PutPrm) SetStorageID(id []byte) {
p.id = id
}
var (
// ErrUnknownObjectType is returned by putUniqueIndexes when a non-parent
// object has a type other than regular, tombstone or lock.
ErrUnknownObjectType = errors.New("unknown object type")
// ErrIncorrectSplitInfoUpdate is returned by updateSplitInfo when the root
// index has no bucket or no non-empty split info for the object.
ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it")
// ErrIncorrectRootObject is returned by splitInfoFromObject when an object
// with a parent header is classified neither as linking nor as last.
ErrIncorrectRootObject = errors.New("invalid root object")
)
// Put saves object header in metabase. Object payload expected to be cut.
//
// The call is rejected with ErrDegradedMode when the metabase is disabled by
// the current mode and with ErrReadOnlyMode when the mode is read-only. The
// write itself runs inside a bbolt batch; on success the operation is logged
// and the returned PutRes tells whether a new record was inserted.
//
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
	startedAt := time.Now()
	success := false
	defer func() {
		// Report the duration together with the final outcome of the call.
		db.metrics.AddMethodDuration("Put", time.Since(startedAt), success)
	}()

	addr := objectCore.AddressOf(prm.obj)

	_, span := tracing.StartSpanFromContext(ctx, "metabase.Put",
		trace.WithAttributes(
			attribute.String("address", addr.EncodeToString()),
		))
	defer span.End()

	// Hold the mode lock for the whole operation so the mode cannot change
	// between the check and the write.
	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	switch {
	case db.mode.NoMetabase():
		return res, ErrDegradedMode
	case db.mode.ReadOnly():
		return res, ErrReadOnlyMode
	}

	currEpoch := db.epochState.CurrentEpoch()

	err = db.boltDB.Batch(func(tx *bbolt.Tx) (putErr error) {
		res, putErr = db.put(tx, prm.obj, prm.id, nil, currEpoch)
		return putErr
	})
	if err != nil {
		return res, metaerr.Wrap(err)
	}

	success = true
	storagelog.Write(db.log,
		storagelog.AddressField(addr),
		storagelog.OpField("metabase PUT"))

	return res, metaerr.Wrap(err)
}
// put stores obj within the given transaction.
//
// A non-nil si marks obj as a parent (virtual) object of a split hierarchy.
// If the object is already known to the metabase (including the case when the
// existence check reports a SplitInfoError, i.e. the object is virtual), the
// existing record is updated and the zero PutRes is returned. Otherwise the
// object is inserted and PutRes.Inserted is set. Any other error of the
// existence check is returned unchanged.
func (db *DB) put(tx *bbolt.Tx,
	obj *objectSDK.Object,
	id []byte,
	si *objectSDK.SplitInfo,
	currEpoch uint64,
) (PutRes, error) {
	cnr, hasContainer := obj.ContainerID()
	if !hasContainer {
		return PutRes{}, errors.New("missing container in object")
	}

	isParent := si != nil

	exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
	if err != nil {
		var splitInfoError *objectSDK.SplitInfoError
		if !errors.As(err, &splitInfoError) {
			return PutRes{}, err // return any error besides SplitInfoError
		}
		exists = true // object exists, however it is virtual
	}

	if !exists {
		return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch)
	}

	return PutRes{}, db.updateObj(tx, obj, id, si, isParent)
}
// updateObj refreshes the record of an object that is already present.
//
//   - For a parent object (isParent is true) the stored split info is merged
//     with si: when the storage already has the last object of a split
//     hierarchy and a linking object arrives (or vice versa), the split info
//     must end up with the IDs of both.
//   - For a non-parent object with a non-nil id the storage ID is overwritten.
//     The storage engine triggers this when it moves an object between
//     sub-storages and calls Put with the new storage ID.
//   - Otherwise nothing is changed.
func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
	switch {
	case isParent:
		return updateSplitInfo(tx, objectCore.AddressOf(obj), si)
	case id != nil:
		return setStorageID(tx, objectCore.AddressOf(obj), id, true)
	default:
		return nil
	}
}
// insertObject writes a new object into all indexes.
//
// If obj carries a parent header and is not itself being stored as a parent,
// the parent is put first with split info derived from obj; the isParent
// check limits this recursion to a depth of two. After the unique, list and
// FKBT indexes are written, non-parent objects also update the container size
// estimation (regular objects only) and the object counters.
func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64) error {
	if parent := obj.Parent(); parent != nil && !isParent { // limit depth by two
		parentSI, err := splitInfoFromObject(obj)
		if err != nil {
			return err
		}

		if _, err = db.put(tx, parent, id, parentSI, currEpoch); err != nil {
			return err
		}
	}

	if err := putUniqueIndexes(tx, obj, si, id); err != nil {
		return fmt.Errorf("can't put unique indexes: %w", err)
	}

	if err := updateListIndexes(tx, obj, putListIndexItem); err != nil {
		return fmt.Errorf("can't put list indexes: %w", err)
	}

	if err := updateFKBTIndexes(tx, obj, putFKBTIndexItem); err != nil {
		return fmt.Errorf("can't put fake bucket tree indexes: %w", err)
	}

	// Parent records contribute neither to the size estimation nor to the counters.
	if isParent {
		return nil
	}

	// update container volume size estimation
	if obj.Type() == objectSDK.TypeRegular {
		if err := changeContainerSize(tx, cnr, obj.PayloadSize(), true); err != nil {
			return err
		}
	}

	return db.incCounters(tx, cnr, IsUserObject(obj))
}
// putUniqueIndexes writes the unique (one value per key) indexes of obj.
//
// For a non-parent object (si is nil) the header with the payload cut off is
// stored in the bucket matching the object type, and the storage ID is
// recorded when id is non-nil (without overriding an existing one).
// ErrUnknownObjectType is returned for any type other than regular, tombstone
// or lock.
//
// A regular object without a parent header is additionally indexed in the
// root bucket; the value is the marshaled split info for parent records and
// empty otherwise. When the object has an EC header, the EC info index is
// updated first and the root key is taken from objectKeyByECHeader, which may
// also decide that no root entry is written at all.
func putUniqueIndexes(
	tx *bbolt.Tx,
	obj *objectSDK.Object,
	si *objectSDK.SplitInfo,
	id []byte,
) error {
	isParent := si != nil
	addr := objectCore.AddressOf(obj)
	cnr := addr.Container()
	objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
	// Scratch buffer reused for every bucket name built below.
	bucketName := make([]byte, bucketKeySize)

	// add value to primary unique bucket
	if !isParent {
		switch obj.Type() {
		case objectSDK.TypeRegular:
			bucketName = primaryBucketName(cnr, bucketName)
		case objectSDK.TypeTombstone:
			bucketName = tombstoneBucketName(cnr, bucketName)
		case objectSDK.TypeLock:
			bucketName = bucketNameLockers(cnr, bucketName)
		default:
			return ErrUnknownObjectType
		}

		rawObject, err := obj.CutPayload().Marshal()
		if err != nil {
			return fmt.Errorf("can't marshal object header: %w", err)
		}

		header := namedBucketItem{
			name: bucketName,
			key:  objKey,
			val:  rawObject,
		}
		if err = putUniqueIndexItem(tx, header); err != nil {
			return err
		}

		// index storageID if it is present
		if id != nil {
			if err = setStorageID(tx, addr, id, false); err != nil {
				return err
			}
		}
	}

	// Only regular objects without a parent header go to the root index.
	if obj.Type() != objectSDK.TypeRegular || obj.HasParent() {
		return nil
	}

	var splitInfo []byte
	if isParent {
		raw, err := si.Marshal()
		if err != nil {
			return fmt.Errorf("can't marshal split info: %w", err)
		}
		splitInfo = raw
	}

	if ecHead := obj.ECHeader(); ecHead != nil {
		if err := putECInfo(tx, cnr, objKey, ecHead); err != nil {
			return err
		}

		var isObjKeySet bool
		if objKey, isObjKeySet = objectKeyByECHeader(ecHead); !isObjKeySet {
			return nil
		}
	}

	return putUniqueIndexItem(tx, namedBucketItem{
		name: rootBucketName(cnr, bucketName),
		key:  objKey,
		val:  splitInfo,
	})
}
// objectKeyByECHeader returns the root-bucket key for an object that has an EC header.
//
//   - If the EC parent is not part of a split (no parent split ID), the key is
//     built from the EC parent ID.
//   - If the EC parent is part of a split and the split parent ID is known,
//     the key is built from that split parent ID.
//   - Otherwise isSet is false and the object must not be put to the root bucket.
func objectKeyByECHeader(ech *objectSDK.ECHeader) (objKey []byte, isSet bool) {
	if ech.ParentSplitID() == nil {
		return objectKey(ech.Parent(), make([]byte, objectKeySize)), true
	}

	splitParentID := ech.ParentSplitParentID()
	if splitParentID == nil {
		return nil, false
	}

	return objectKey(*splitParentID, make([]byte, objectKeySize)), true
}
// updateIndexItemFunc applies a single index item within the transaction.
// updateListIndexes and updateFKBTIndexes receive it as a parameter, so the
// same traversal of an object's indexes can be used with different writers
// (insertObject passes putListIndexItem and putFKBTIndexItem).
type updateIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error
// updateListIndexes applies f to every list-index item of obj. The value of
// each item is the object key; the indexes are, in order:
//
//   - payload checksum -> object (always);
//   - parent ID -> object (if a parent ID is set);
//   - split ID -> object (if a split ID is set);
//   - EC parent ID -> object (if an EC header is present).
//
// The first error returned by f stops the processing and is returned as-is.
func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
	objID, _ := obj.ID()
	cnr, _ := obj.ContainerID()
	checksum, _ := obj.PayloadChecksum()

	objKey := objectKey(objID, make([]byte, objectKeySize))
	// Scratch buffer reused for every bucket name built below.
	nameBuf := make([]byte, bucketKeySize)

	apply := func(name, key []byte) error {
		return f(tx, namedBucketItem{
			name: name,
			key:  key,
			val:  objKey,
		})
	}

	// index payload hashes
	if err := apply(payloadHashBucketName(cnr, nameBuf), checksum.Value()); err != nil {
		return err
	}

	// index parent ids
	if parentID, ok := obj.ParentID(); ok {
		parentKey := objectKey(parentID, make([]byte, objectKeySize))
		if err := apply(parentBucketName(cnr, nameBuf), parentKey); err != nil {
			return err
		}
	}

	// index split ids
	if splitID := obj.SplitID(); splitID != nil {
		if err := apply(splitBucketName(cnr, nameBuf), splitID.ToV2()); err != nil {
			return err
		}
	}

	// index EC chunks by their parent
	if ech := obj.ECHeader(); ech != nil {
		ecParentKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
		if err := apply(ecParentToChunksBucketName(cnr, nameBuf), ecParentKey); err != nil {
			return err
		}
	}

	return nil
}
// updateFKBTIndexes applies f to every fake-bucket-tree index item of obj:
// one item for the owner and one per attribute.
//
// The owner item always refers to the object itself. For an object with an EC
// header the attributes are taken from the EC parent and the attribute items
// refer to the EC parent's key instead of the object's own key.
// The first error returned by f stops the processing and is returned as-is.
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
	objID, _ := obj.ID()
	cnr, _ := obj.ContainerID()
	objKey := objectKey(objID, make([]byte, objectKeySize))

	// Scratch buffer for bucket names; it is reassigned for every attribute.
	nameBuf := make([]byte, bucketKeySize)

	ownerItem := namedBucketItem{
		name: ownerBucketName(cnr, nameBuf),
		key:  []byte(obj.OwnerID().EncodeToString()),
		val:  objKey,
	}
	if err := f(tx, ownerItem); err != nil {
		return err
	}

	var attrs []objectSDK.Attribute
	if ech := obj.ECHeader(); ech == nil {
		attrs = obj.Attributes()
	} else {
		attrs = ech.ParentAttributes()
		objKey = objectKey(ech.Parent(), make([]byte, objectKeySize))
	}

	// user specified attributes
	for i := range attrs {
		attr := &attrs[i]
		nameBuf = attributeBucketName(cnr, attr.Key(), nameBuf)

		attrItem := namedBucketItem{
			name: nameBuf,
			key:  []byte(attr.Value()),
			val:  objKey,
		}
		if err := f(tx, attrItem); err != nil {
			return err
		}
	}

	return nil
}
// bucketContainer is the set of bucket-management methods required by
// createBucketLikelyExists. In this file it is instantiated with both
// *bbolt.Tx (top-level buckets) and *bbolt.Bucket (nested buckets).
type bucketContainer interface {
// Bucket returns the bucket with the given name; callers in this file
// treat a nil result as "bucket does not exist".
Bucket([]byte) *bbolt.Bucket
// CreateBucket creates a new bucket with the given name.
CreateBucket([]byte) (*bbolt.Bucket, error)
// CreateBucketIfNotExists is part of the constraint but is not called in this file.
CreateBucketIfNotExists([]byte) (*bbolt.Bucket, error)
}
// createBucketLikelyExists returns the bucket with the given name from tx,
// creating it only when the lookup finds nothing. As the name suggests, the
// lookup is tried first because the bucket is expected to exist in most calls.
func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Bucket, error) {
	existing := tx.Bucket(name)
	if existing == nil {
		return tx.CreateBucket(name)
	}

	return existing, nil
}
// putUniqueIndexItem stores item.val under item.key in the bucket item.name,
// creating the bucket if needed. An existing value for the key is replaced.
func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
	bucket, createErr := createBucketLikelyExists(tx, item.name)
	if createErr != nil {
		return fmt.Errorf("can't create index %v: %w", item.name, createErr)
	}

	return bucket.Put(item.key, item.val)
}
// putFKBTIndexItem adds item to a fake-bucket-tree index: the bucket item.name
// holds a nested bucket per item.key, and item.val is stored in that nested
// bucket as a key with zeroValue as its value. Both buckets are created when
// missing.
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
	indexBkt, err := createBucketLikelyExists(tx, item.name)
	if err != nil {
		return fmt.Errorf("can't create index %v: %w", item.name, err)
	}

	leafBkt, err := createBucketLikelyExists(indexBkt, item.key)
	if err != nil {
		return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
	}

	return leafBkt.Put(item.val, zeroValue)
}
// putListIndexItem appends item.val to the list stored under item.key in the
// bucket item.name (created when missing). The stored list is decoded,
// extended by one element and written back in encoded form; no check for
// duplicates is made.
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
	bucket, err := createBucketLikelyExists(tx, item.name)
	if err != nil {
		return fmt.Errorf("can't create index %v: %w", item.name, err)
	}

	stored, err := decodeList(bucket.Get(item.key))
	if err != nil {
		return fmt.Errorf("can't decode leaf list %v: %w", item.key, err)
	}

	encoded, err := encodeList(append(stored, item.val))
	if err != nil {
		return fmt.Errorf("can't encode leaf list %v: %w", item.key, err)
	}

	return bucket.Put(item.key, encoded)
}
// encodeList encodes list of bytes into a single blob for list bucket indexes.
// The blob is the element count written as a var-uint followed by every
// element written as var-bytes; decodeList performs the reverse operation.
// The writer's accumulated error, if any, is returned instead of the blob.
func encodeList(lst [][]byte) ([]byte, error) {
	w := io.NewBufBinWriter()

	w.WriteVarUint(uint64(len(lst)))
	for _, item := range lst {
		w.WriteVarBytes(item)
	}

	if err := w.Err; err != nil {
		return nil, err
	}

	return w.Bytes(), nil
}
// decodeList decodes blob into the list of bytes from list bucket index.
//
// The blob layout is a var-uint element count followed by the elements, each
// prefixed with its var-uint length. Empty input yields a nil list. The
// returned elements are sub-slices of data (no copy is made), and the list is
// allocated with room for one more element so a following append does not
// reallocate.
//
// Malformed input never panics: a count or an element length that exceeds the
// remaining data results in io.ErrUnexpectedEOF.
func decodeList(data []byte) (lst [][]byte, err error) {
	if len(data) == 0 {
		return nil, nil
	}

	size, n, err := getVarUint(data)
	if err != nil {
		return nil, err
	}

	total := uint64(len(data))
	offset := uint64(n) // invariant: offset <= total

	// Every element takes at least one byte (its length prefix), so a count
	// above the number of remaining bytes cannot be valid. Rejecting it here
	// also keeps the allocation below bounded by the input size and prevents
	// size+1 from wrapping around.
	if size > total-offset {
		return nil, gio.ErrUnexpectedEOF
	}

	lst = make([][]byte, size, size+1)
	for i := range lst {
		var sz uint64

		sz, n, err = getVarUint(data[offset:])
		if err != nil {
			return nil, err
		}
		offset += uint64(n)

		// Compare against the remaining length instead of computing
		// offset+sz first: the sum may overflow uint64 for a corrupted length.
		if sz > total-offset {
			return nil, gio.ErrUnexpectedEOF
		}

		next := offset + sz
		lst[i] = data[offset:next]
		offset = next
	}

	return lst, nil
}

// getVarUint reads a variable-length unsigned integer from the beginning of
// data and returns the value together with the number of bytes it occupies.
//
// A first byte below 0xfd is the value itself; 0xfd, 0xfe and 0xff announce a
// little-endian 16-, 32- and 64-bit value in the following bytes. When data is
// too short, io.ErrUnexpectedEOF is returned and the value must not be used.
func getVarUint(data []byte) (uint64, int, error) {
	if len(data) == 0 {
		return 0, 0, gio.ErrUnexpectedEOF
	}

	switch b := data[0]; b {
	case 0xfd:
		if len(data) < 3 {
			return 0, 1, gio.ErrUnexpectedEOF
		}
		return uint64(binary.LittleEndian.Uint16(data[1:])), 3, nil
	case 0xfe:
		if len(data) < 5 {
			return 0, 1, gio.ErrUnexpectedEOF
		}
		return uint64(binary.LittleEndian.Uint32(data[1:])), 5, nil
	case 0xff:
		if len(data) < 9 {
			return 0, 1, gio.ErrUnexpectedEOF
		}
		return binary.LittleEndian.Uint64(data[1:]), 9, nil
	default:
		return uint64(b), 1, nil
	}
}
// setStorageID records the storage ID of the object at addr in the container's
// "small" bucket, creating the bucket if needed. It is used both for newly
// inserted objects and for existing objects moved from one storage location to
// another.
//
// With override set the ID is always written; otherwise an ID that is already
// stored for the object is kept.
func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) error {
	// One buffer serves first as the bucket name and then as the object key.
	buf := make([]byte, bucketKeySize)

	bkt, err := createBucketLikelyExists(tx, smallBucketName(addr.Container(), buf))
	if err != nil {
		return err
	}

	objKey := objectKey(addr.Object(), buf)
	if !override && bkt.Get(objKey) != nil {
		return nil
	}

	return bkt.Put(objKey, id)
}
// updateSplitInfo merges from into the split info stored in the root index for
// the object at addr. It is used for existing objects when the storage gets
// extra information about the last object in the split hierarchy or about the
// linking object.
//
// ErrIncorrectSplitInfoUpdate is returned when the root bucket is missing or
// holds no (or empty) split info for the object; this should never happen.
func updateSplitInfo(tx *bbolt.Tx, addr oid.Address, from *objectSDK.SplitInfo) error {
	// One buffer serves first as the bucket name and then as the object key.
	buf := make([]byte, bucketKeySize)

	rootBkt := tx.Bucket(rootBucketName(addr.Container(), buf))
	if rootBkt == nil {
		// the object does not exist, so there is no split info to update
		return ErrIncorrectSplitInfoUpdate
	}

	objKey := objectKey(addr.Object(), buf)

	stored := rootBkt.Get(objKey)
	if len(stored) == 0 {
		return ErrIncorrectSplitInfoUpdate
	}

	to := objectSDK.NewSplitInfo()
	if err := to.Unmarshal(stored); err != nil {
		return fmt.Errorf("can't unmarshal split info from root index: %w", err)
	}

	merged, err := util.MergeSplitInfo(from, to).Marshal()
	if err != nil {
		return fmt.Errorf("can't marhsal merged split info: %w", err)
	}

	return rootBkt.Put(objKey, merged)
}
// splitInfoFromObject returns split info based on last or linking object.
//
// The result carries the object's split ID and the object's own ID as either
// the link (linking object) or the last part (last object). For an object
// without a parent header it returns nil, nil. An error is returned when the
// object has no ID; ErrIncorrectRootObject is returned when the object is
// neither linking nor last, which should never happen.
func splitInfoFromObject(obj *objectSDK.Object) (*objectSDK.SplitInfo, error) {
	if obj.Parent() == nil {
		return nil, nil
	}

	isLink := isLinkObject(obj)
	if !isLink && !isLastObject(obj) {
		return nil, ErrIncorrectRootObject // should never happen
	}

	objID, hasID := obj.ID()
	if !hasID {
		return nil, errors.New("missing object ID")
	}

	info := objectSDK.NewSplitInfo()
	info.SetSplitID(obj.SplitID())

	if isLink {
		info.SetLink(objID)
	} else {
		info.SetLastPart(objID)
	}

	return info, nil
}
// isLinkObject reports whether the object is a linking object: it has a parent
// header and a non-empty list of children.
func isLinkObject(obj *objectSDK.Object) bool {
	if obj.Parent() == nil {
		return false
	}

	return len(obj.Children()) > 0
}
// isLastObject reports whether the object is the last object of a split chain:
// it has a parent header and no children.
func isLastObject(obj *objectSDK.Object) bool {
	if obj.Parent() == nil {
		return false
	}

	return len(obj.Children()) == 0
}
// putECInfo registers the chunk key objKey in the EC info index of the
// container, under the key of the EC parent taken from ecHead.
//
// The stored value is a concatenation of fixed-size (objectKeySize) keys. If
// nothing is stored yet, objKey becomes the value; otherwise objKey is
// appended unless one of the stored keys already equals it.
func putECInfo(tx *bbolt.Tx,
	cnr cid.ID, objKey []byte,
	ecHead *objectSDK.ECHeader,
) error {
	parentKey := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
	lookupName := make([]byte, bucketKeySize)

	chunks := getFromBucket(tx, ecInfoBucketName(cnr, lookupName), parentKey)
	if len(chunks) == 0 {
		chunks = objKey
	} else {
		// Scan the stored keys one fixed-size slot at a time.
		known := false
		for off := 0; off < len(chunks) && !known; off += objectKeySize {
			known = bytes.Equal(objKey, chunks[off:off+objectKeySize])
		}

		if !known {
			chunks = append(chunks, objKey...)
		}
	}

	return putUniqueIndexItem(tx, namedBucketItem{
		name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
		key:  parentKey,
		val:  chunks,
	})
}