WIP: Change metabase engine to pebble #1221
7 changed files with 280 additions and 454 deletions
pkg/local_object_storage/metabase/concurrency.go (new file, 24 lines)

@@ -0,0 +1,24 @@
+package meta
+
+import (
+	utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
+	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+)
+
+type concurrency struct {
+	containerLocks *utilSync.KeyLocker[cid.ID]
+}
+
+func newConcurrency() *concurrency {
+	return &concurrency{
+		containerLocks: utilSync.NewKeyLocker[cid.ID](),
+	}
+}
+
+func (c *concurrency) LockContainerID(id cid.ID) func() {
+	c.containerLocks.Lock(id)
+
+	return func() {
+		c.containerLocks.Unlock(id)
+	}
+}
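Note: this `concurrency` guard is how the PR compensates for losing bbolt's single-writer transactions: every write path that touches a container's aggregate records takes `defer db.guard.LockContainerID(id)()` first (see `DeleteContainerSize`, `DeleteContainerCount`, and `Delete` below). A minimal sketch of the per-key locking semantics this assumes — the real `utilSync.KeyLocker` lives in `pkg/util/sync` and may differ (for one, it likely frees idle entries):

```go
package main

import (
	"fmt"
	"sync"
)

// keyLocker is a simplified stand-in for utilSync.KeyLocker[K]: one mutex per
// distinct key, so updates to different containers run in parallel while
// updates to the same container serialize. Unlike the real implementation,
// this sketch never removes map entries for idle keys.
type keyLocker[K comparable] struct {
	mtx   sync.Mutex
	locks map[K]*sync.Mutex
}

func newKeyLocker[K comparable]() *keyLocker[K] {
	return &keyLocker[K]{locks: make(map[K]*sync.Mutex)}
}

func (l *keyLocker[K]) Lock(key K) {
	l.mtx.Lock()
	m, ok := l.locks[key]
	if !ok {
		m = &sync.Mutex{}
		l.locks[key] = m
	}
	l.mtx.Unlock()
	m.Lock()
}

func (l *keyLocker[K]) Unlock(key K) {
	l.mtx.Lock()
	m := l.locks[key]
	l.mtx.Unlock()
	m.Unlock()
}

func main() {
	guard := newKeyLocker[string]()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			guard.Lock("container-A") // same key: one goroutine at a time
			defer guard.Unlock("container-A")
			fmt.Println("serialized update", i)
		}(i)
	}
	wg.Wait()
}
```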
@@ -17,8 +17,7 @@ import (
 )
 
 const (
-	containerSizeKeySize    = 1 + cidSize + 2
-	containerSizePrefixSize = 1 + cidSize
+	containerSizeKeySize = 1 + cidSize
 )
 
 func (db *DB) Containers(ctx context.Context) (list []cid.ID, err error) {
@@ -64,7 +63,7 @@ func containers(ctx context.Context, r pebble.Reader) ([]cid.ID, error) {
 	for v := it.First(); v; v = it.Next() {
 		if parseContainerIDWithIgnore(&cnr, it.Key(), unique) {
 			result = append(result, cnr)
-			unique[string(it.Key()[1:containerSizePrefixSize])] = struct{}{}
+			unique[string(it.Key()[1:containerSizeKeySize])] = struct{}{}
 		}
 	}
 
@@ -89,11 +88,25 @@ func (db *DB) ContainerSize(ctx context.Context, id cid.ID) (size uint64, err er
 		return 0, ErrDegradedMode
 	}
 
-	result, err := db.containerSizesInternal(ctx, &id)
+	err = db.snapshot(func(s *pebble.Snapshot) error {
+		val, err := valueSafe(s, containerSizeKey(id))
+		if err != nil {
+			return err
+		}
+		if len(val) == 0 {
+			return nil
+		}
+		value, ok := parseSize(val)
+		if !ok || value < 0 {
+			return fmt.Errorf("invalid container size value for container %s", id)
+		}
+		size = uint64(value)
+		return nil
+	})
 	if err != nil {
 		return 0, metaerr.Wrap(err)
 	}
-	return result[id], nil
+	return size, nil
 }
 
 func (db *DB) ContainerSizes(ctx context.Context) (map[cid.ID]uint64, error) {
@@ -104,7 +117,7 @@ func (db *DB) ContainerSizes(ctx context.Context) (map[cid.ID]uint64, error) {
 		return nil, ErrDegradedMode
 	}
 
-	return db.containerSizesInternal(ctx, nil)
+	return db.containerSizesInternal(ctx)
 }
 
 // ZeroSizeContainers returns containers with size = 0.
@@ -123,7 +136,7 @@ func (db *DB) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) {
 	db.modeMtx.RLock()
 	defer db.modeMtx.RUnlock()
 
-	sizes, err := db.containerSizesInternal(ctx, nil)
+	sizes, err := db.containerSizesInternal(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -162,17 +175,16 @@ func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error {
 		return ErrReadOnlyMode
 	}
 
+	defer db.guard.LockContainerID(id)()
+
 	return metaerr.Wrap(db.batch(
 		func(b *pebble.Batch) error {
-			return deleteByPrefix(ctx, b, containerSizeKeyPrefix(id))
+			return b.Delete(containerSizeKey(id), pebble.Sync)
 		}))
 }
 
-func (db *DB) containerSizesInternal(ctx context.Context, id *cid.ID) (map[cid.ID]uint64, error) {
+func (db *DB) containerSizesInternal(ctx context.Context) (map[cid.ID]uint64, error) {
 	prefix := []byte{containerSizePrefix}
-	if id != nil {
-		prefix = containerSizeKeyPrefix(*id)
-	}
 	result := make(map[cid.ID]int64)
 	err := db.snapshot(func(s *pebble.Snapshot) error {
 		it, err := s.NewIter(&pebble.IterOptions{
@@ -192,7 +204,7 @@ func (db *DB) containerSizesInternal(ctx context.Context, id *cid.I
 
 		key := it.Key()
 		var cnr cid.ID
-		if err := cnr.Decode(key[1:containerSizePrefixSize]); err != nil {
+		if err := cnr.Decode(key[1:containerSizeKeySize]); err != nil {
 			return errors.Join(fmt.Errorf("invalid container size key: %w", err), it.Close())
 		}
 
@@ -222,8 +234,8 @@ func normilizeContainerSizes(sizes map[cid.ID]int64) (map[cid.ID]uint64, error)
 	return result, nil
 }
 
-func changeContainerSize(b *pebble.Batch, id cid.ID, delta int64, bucketID uint16) error {
-	key := containerSizeKey(id, bucketID)
+func changeContainerSize(b *pebble.Batch, id cid.ID, delta int64) error {
+	key := containerSizeKey(id)
 
 	v, err := valueSafe(b, key)
 	if err != nil {
@@ -240,20 +252,11 @@ func changeContainerSize(b *pebble.Batch, id cid.ID, delta int64, bucketID uint1
 	return b.Set(key, value, pebble.Sync)
 }
 
-// containerSizeKeyPrefix returns containerSizePrefix_CID key prefix.
-func containerSizeKeyPrefix(cnr cid.ID) []byte {
-	result := make([]byte, containerSizePrefixSize)
-	result[0] = containerSizePrefix
-	cnr.Encode(result[1:])
-	return result
-}
-
 // containerSizeKey returns containerVolumePrefix_CID_bucketID key.
-func containerSizeKey(cnr cid.ID, bucketID uint16) []byte {
+func containerSizeKey(cnr cid.ID) []byte {
 	result := make([]byte, containerSizeKeySize)
 	result[0] = containerSizePrefix
 	cnr.Encode(result[1:])
-	binary.LittleEndian.PutUint16(result[containerSizePrefixSize:], bucketID)
 	return result
 }
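Note: dropping the `bucketID` suffix turns the per-container size record into a single point-lookup key (`prefix || CID`) instead of a family of keys that had to be summed on read and range-deleted. A rough before/after of the layout; the prefix byte value below is a placeholder, not the real schema constant:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	containerSizePrefix = 0x01 // placeholder; the real constant is part of the metabase key schema
	cidSize             = 32
)

// Old layout: prefix || CID || bucketID — one record per bucket,
// merged on read, removed with a prefix scan.
func oldContainerSizeKey(cnr [cidSize]byte, bucketID uint16) []byte {
	key := make([]byte, 1+cidSize+2)
	key[0] = containerSizePrefix
	copy(key[1:], cnr[:])
	binary.LittleEndian.PutUint16(key[1+cidSize:], bucketID)
	return key
}

// New layout: prefix || CID — one record per container,
// read with a point lookup, removed with a single Delete.
func newContainerSizeKey(cnr [cidSize]byte) []byte {
	key := make([]byte, 1+cidSize)
	key[0] = containerSizePrefix
	copy(key[1:], cnr[:])
	return key
}

func main() {
	var cnr [cidSize]byte
	fmt.Println(len(oldContainerSizeKey(cnr, 7))) // 35
	fmt.Println(len(newContainerSizeKey(cnr)))    // 33
}
```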
@@ -26,8 +26,7 @@ var (
 )
 
 const (
-	containerObjectCountKeySize    = 1 + cidSize + 2
-	containerObjectCountPrefixSize = 1 + cidSize
+	containerObjectCountKeySize = 1 + cidSize
 )
 
 // ObjectCounters groups object counter
@@ -64,7 +63,7 @@ func (db *DB) ObjectCounters(ctx context.Context) (ObjectCounters, error) {
 	var cc map[cid.ID]ObjectCounters
 	err := db.snapshot(func(s *pebble.Snapshot) error {
 		var err error
-		cc, err = containerObjectCounters(ctx, s, nil)
+		cc, err = containerObjectCounters(ctx, s)
 		return err
 	})
 	if err != nil {
@@ -107,7 +106,7 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error)
 	}
 	err := db.snapshot(func(s *pebble.Snapshot) error {
 		var err error
-		cc.Counts, err = containerObjectCounters(ctx, s, nil)
+		cc.Counts, err = containerObjectCounters(ctx, s)
 		return err
 	})
 	if err != nil {
@@ -136,27 +135,44 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er
 		return ObjectCounters{}, ErrDegradedMode
 	}
 
-	var cc map[cid.ID]ObjectCounters
+	var res ObjectCounters
 	err := db.snapshot(func(s *pebble.Snapshot) error {
-		var err error
-		cc, err = containerObjectCounters(ctx, s, &id)
-		return err
+		val, err := valueSafe(s, containerCounterKey(id))
+		if err != nil {
+			return err
+		}
+		if len(val) == 0 {
+			return nil
+		}
+		oc, err := parseContainerCounterValue(val)
+		if err != nil {
+			return err
+		}
+
+		if oc.Logic < 0 || oc.Phy < 0 || oc.User < 0 {
+			return fmt.Errorf("invalid container object counter for container ID %s", id.EncodeToString())
+		}
+
+		res.Logic = uint64(oc.Logic)
+		res.Phy = uint64(oc.Phy)
+		res.User = uint64(oc.User)
+
+		return nil
 	})
 	if err != nil {
 		return ObjectCounters{}, metaerr.Wrap(err)
 	}
-	return cc[id], nil
+	return res, nil
 }
 
-func containerCounterKey(cnrID cid.ID, bucketID uint16) []byte {
+func containerCounterKey(cnrID cid.ID) []byte {
 	result := make([]byte, containerObjectCountKeySize)
 	result[0] = containerCountersPrefix
 	cnrID.Encode(result[1:])
-	binary.LittleEndian.PutUint16(result[containerObjectCountPrefixSize:], bucketID)
 	return result
 }
 
-func incCounters(b *pebble.Batch, cnrID cid.ID, isUserObject bool, bucketID uint16) error {
+func incCounters(b *pebble.Batch, cnrID cid.ID, isUserObject bool) error {
 	delta := objectCounterValue{
 		Logic: 1,
 		Phy:   1,
@@ -164,24 +180,20 @@ func incCounters(b *pebble.Batch, cnrID cid.ID, isUserObject bool, bucketID uint
 	if isUserObject {
 		delta.User = 1
 	}
-	return editContainerCounterValue(b, cnrID, delta, bucketID)
+	return editContainerCounterValue(b, cnrID, delta)
 }
 
-func updateContainerCounter(b *pebble.Batch, delta map[cid.ID]objectCounterValue, bucketIDs map[cid.ID]uint16) error {
+func updateContainerCounter(b *pebble.Batch, delta map[cid.ID]objectCounterValue) error {
 	for cnrID, cnrDelta := range delta {
-		bucketID, found := bucketIDs[cnrID]
-		if !found {
-			return fmt.Errorf("bucket ID not found for container %s", cnrID)
-		}
-		if err := editContainerCounterValue(b, cnrID, cnrDelta, bucketID); err != nil {
+		if err := editContainerCounterValue(b, cnrID, cnrDelta); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-func editContainerCounterValue(b *pebble.Batch, cnrID cid.ID, delta objectCounterValue, bucketID uint16) error {
-	key := containerCounterKey(cnrID, bucketID)
+func editContainerCounterValue(b *pebble.Batch, cnrID cid.ID, delta objectCounterValue) error {
+	key := containerCounterKey(cnrID)
 	val, err := valueSafe(b, key)
 	if err != nil {
 		return err
@@ -284,9 +296,7 @@ func setObjectCounters(b *pebble.Batch, counters map[cid.ID]ObjectCounters) erro
 			Phy:   int64(count.Phy),
 			User:  int64(count.User),
 		}
-		// this function called by init or refill, so no other updates should happen
-		// so here bucketID = 0 can be used
-		if err := editContainerCounterValue(b, cnrID, delta, 0); err != nil {
+		if err := editContainerCounterValue(b, cnrID, delta); err != nil {
 			return err
 		}
 	}
@@ -329,7 +339,7 @@ func (db *DB) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) {
 	var cc map[cid.ID]ObjectCounters
 	err := db.snapshot(func(s *pebble.Snapshot) error {
 		var err error
-		cc, err = containerObjectCounters(ctx, s, nil)
+		cc, err = containerObjectCounters(ctx, s)
 		return err
 	})
 	if err != nil {
@@ -370,12 +380,10 @@ func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error {
 		return ErrReadOnlyMode
 	}
 
-	prefix := make([]byte, containerObjectCountPrefixSize)
-	prefix[0] = containerCountersPrefix
-	id.Encode(prefix[1:])
+	defer db.guard.LockContainerID(id)()
 
 	err := db.batch(func(b *pebble.Batch) error {
-		return deleteByPrefix(ctx, b, prefix)
+		return b.Delete(containerCounterKey(id), pebble.Sync)
 	})
 	if err != nil {
 		return metaerr.Wrap(err)
@@ -385,17 +393,12 @@ func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error {
 }
 
 func containerObjectCounterInitialized(ctx context.Context, r pebble.Reader) bool {
-	_, err := containerObjectCounters(ctx, r, nil)
+	_, err := containerObjectCounters(ctx, r)
 	return err == nil
 }
 
-func containerObjectCounters(ctx context.Context, r pebble.Reader, cnrID *cid.ID) (map[cid.ID]ObjectCounters, error) {
+func containerObjectCounters(ctx context.Context, r pebble.Reader) (map[cid.ID]ObjectCounters, error) {
 	prefix := []byte{containerCountersPrefix}
-	if cnrID != nil {
-		buf := make([]byte, cidSize)
-		cnrID.Encode(buf)
-		prefix = append(prefix, buf...)
-	}
 	it, err := r.NewIter(&pebble.IterOptions{
 		LowerBound:                prefix,
 		OnlyReadGuaranteedDurable: true,
@@ -421,7 +424,7 @@ func containerObjectCounters(ctx context.Context, r pebble.Reader, cnrID *cid.ID
 		if err != nil {
 			return nil, errors.Join(err, it.Close())
 		}
-		counters[cnrID] = mergeObjectCounterValues(counters[cnrID], oc)
+		counters[cnrID] = oc
 	}
 
 	if err := it.Close(); err != nil {
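Note: with one counter record per container, `editContainerCounterValue` becomes a read-modify-write against the batch, and the container guard is what keeps two concurrent deltas from losing an update. The value encoding (`parseContainerCounterValue`) is not shown in this diff, so the three little-endian int64s below are an assumption used only to illustrate the flow:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// counterValue mirrors the Logic/Phy/User triple of objectCounterValue;
// the 24-byte encoding here is assumed for illustration.
type counterValue struct{ Logic, Phy, User int64 }

func parse(val []byte) counterValue {
	return counterValue{
		Logic: int64(binary.LittleEndian.Uint64(val[0:8])),
		Phy:   int64(binary.LittleEndian.Uint64(val[8:16])),
		User:  int64(binary.LittleEndian.Uint64(val[16:24])),
	}
}

func encode(v counterValue) []byte {
	val := make([]byte, 24)
	binary.LittleEndian.PutUint64(val[0:8], uint64(v.Logic))
	binary.LittleEndian.PutUint64(val[8:16], uint64(v.Phy))
	binary.LittleEndian.PutUint64(val[16:24], uint64(v.User))
	return val
}

// applyDelta shows the read-modify-write step: load the current value
// (zero if absent), add the signed delta, store the result. A map stands
// in for the pebble batch.
func applyDelta(store map[string][]byte, key string, delta counterValue) {
	var cur counterValue
	if val, ok := store[key]; ok {
		cur = parse(val)
	}
	cur.Logic += delta.Logic
	cur.Phy += delta.Phy
	cur.User += delta.User
	store[key] = encode(cur)
}

func main() {
	store := map[string][]byte{}
	applyDelta(store, "cnr", counterValue{Logic: 1, Phy: 1, User: 1})    // incCounters on put
	applyDelta(store, "cnr", counterValue{Logic: -1, Phy: -1, User: -1}) // decrement on delete
	fmt.Println(parse(store["cnr"])) // {0 0 0}
}
```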
@@ -58,6 +58,8 @@ type cfg struct {
 
 	epochState EpochState
 	metrics    Metrics
+
+	guard *concurrency
 }
 
 func defaultCfg() *cfg {
@@ -68,6 +70,7 @@ func defaultCfg() *cfg {
 		log:       &logger.Logger{Logger: zap.L()},
 		metrics:   &noopMetrics{},
 		dbOptions: &pebble.Options{},
+		guard:     newConcurrency(),
 	}
 }
@@ -1,7 +1,6 @@
 package meta
 
 import (
-	"bytes"
 	"context"
 	"errors"
 	"fmt"
@@ -15,63 +14,23 @@ import (
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-	"go.etcd.io/bbolt"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/trace"
+	"github.com/cockroachdb/pebble"
 )
 
 var errFailedToRemoveUniqueIndexes = errors.New("can't remove unique indexes")
 
 // DeletePrm groups the parameters of Delete operation.
 type DeletePrm struct {
-	addrs []oid.Address
+	Address oid.Address
 }
 
 // DeleteRes groups the resulting values of Delete operation.
 type DeleteRes struct {
-	phyCount       uint64
-	logicCount     uint64
-	userCount      uint64
-	phySize        uint64
-	logicSize      uint64
-	removedByCnrID map[cid.ID]ObjectCounters
-}
-
-// LogicCount returns the number of removed logic
-// objects.
-func (d DeleteRes) LogicCount() uint64 {
-	return d.logicCount
-}
-
-func (d DeleteRes) UserCount() uint64 {
-	return d.userCount
-}
-
-// RemovedByCnrID returns the number of removed objects by container ID.
-func (d DeleteRes) RemovedByCnrID() map[cid.ID]ObjectCounters {
-	return d.removedByCnrID
-}
-
-// PhyCount returns the number of removed physical objects.
-func (d DeleteRes) PhyCount() uint64 {
-	return d.phyCount
-}
-
-// PhySize returns the size of removed physical objects.
-func (d DeleteRes) PhySize() uint64 {
-	return d.phySize
-}
-
-// LogicSize returns the size of removed logical objects.
-func (d DeleteRes) LogicSize() uint64 {
-	return d.logicSize
-}
-
-// SetAddresses is a Delete option to set the addresses of the objects to delete.
-//
-// Option is required.
-func (p *DeletePrm) SetAddresses(addrs ...oid.Address) {
-	p.addrs = addrs
+	PhyCount   uint64
+	LogicCount uint64
+	UserCount  uint64
+	PhySize    uint64
+	LogicSize  uint64
 }
 
 type referenceNumber struct {
@@ -82,8 +41,6 @@ type referenceNumber struct {
 	obj *objectSDK.Object
 }
 
-type referenceCounter map[string]*referenceNumber
-
 // Delete removed object records from metabase indexes.
 func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
 	var (
@@ -94,10 +51,7 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
 		db.metrics.AddMethodDuration("Delete", time.Since(startedAt), deleted)
 	}()
 
-	_, span := tracing.StartSpanFromContext(ctx, "metabase.Delete",
-		trace.WithAttributes(
-			attribute.Int("addr_count", len(prm.addrs)),
-		))
+	_, span := tracing.StartSpanFromContext(ctx, "metabase.Delete")
 	defer span.End()
 
 	db.modeMtx.RLock()
@@ -109,403 +63,260 @@ func (db *DB) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
 		return DeleteRes{}, ErrReadOnlyMode
 	}
 
+	defer db.guard.LockContainerID(prm.Address.Container())()
+
 	var err error
 	var res DeleteRes
 
-	err = db.database.Update(func(tx *bbolt.Tx) error {
-		res, err = db.deleteGroup(tx, prm.addrs)
+	err = db.batch(func(b *pebble.Batch) error {
+		res, err = db.deleteByAddress(ctx, b, prm.Address)
 		return err
 	})
 	if err == nil {
 		deleted = true
-		for i := range prm.addrs {
-			storagelog.Write(db.log,
-				storagelog.AddressField(prm.addrs[i]),
-				storagelog.OpField("metabase DELETE"))
-		}
+		storagelog.Write(db.log,
+			storagelog.AddressField(prm.Address),
+			storagelog.OpField("metabase DELETE"))
 	}
 	return res, metaerr.Wrap(err)
 }
 
 // deleteGroup deletes object from the metabase. Handles removal of the
 // references of the split objects.
-func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (DeleteRes, error) {
-	res := DeleteRes{
-		removedByCnrID: make(map[cid.ID]ObjectCounters),
-	}
-	refCounter := make(referenceCounter, len(addrs))
+func (db *DB) deleteByAddress(ctx context.Context, b *pebble.Batch, addr oid.Address) (DeleteRes, error) {
+	refCounter := &referenceNumber{}
 	currEpoch := db.epochState.CurrentEpoch()
-
-	for i := range addrs {
-		r, err := db.delete(tx, addrs[i], refCounter, currEpoch)
-		if err != nil {
-			return DeleteRes{}, err
-		}
-
-		applyDeleteSingleResult(r, &res, addrs, i)
-	}
-
-	if err := db.updateCountersDelete(tx, res); err != nil {
+	res, err := db.delete(ctx, b, addr, refCounter, currEpoch)
+	if err != nil {
 		return DeleteRes{}, err
 	}
 
-	for _, refNum := range refCounter {
-		if refNum.cur == refNum.all {
-			err := db.deleteObject(tx, refNum.obj, true)
-			if err != nil {
-				return DeleteRes{}, err
-			}
-		}
+	if err := db.updateCountersDelete(b, addr.Container(), res); err != nil {
+		return DeleteRes{}, err
 	}
 
+	if refCounter.cur == refCounter.all {
+		err := db.deleteObject(ctx, b, refCounter.obj, true)
+		if err != nil {
+			return DeleteRes{}, err
+		}
+	}
 	return res, nil
 }
 
-func (db *DB) updateCountersDelete(tx *bbolt.Tx, res DeleteRes) error {
-	if res.phyCount > 0 {
-		err := db.updateShardObjectCounter(tx, phy, res.phyCount, false)
-		if err != nil {
-			return fmt.Errorf("could not decrease phy object counter: %w", err)
-		}
-	}
-
-	if res.logicCount > 0 {
-		err := db.updateShardObjectCounter(tx, logical, res.logicCount, false)
-		if err != nil {
-			return fmt.Errorf("could not decrease logical object counter: %w", err)
-		}
-	}
-
-	if res.userCount > 0 {
-		err := db.updateShardObjectCounter(tx, user, res.userCount, false)
-		if err != nil {
-			return fmt.Errorf("could not decrease user object counter: %w", err)
-		}
-	}
-
-	if err := db.updateContainerCounter(tx, res.removedByCnrID, false); err != nil {
+func (db *DB) updateCountersDelete(b *pebble.Batch, cnrID cid.ID, res DeleteRes) error {
+	if err := editContainerCounterValue(b, cnrID, objectCounterValue{
+		Logic: -1 * int64(res.LogicCount),
+		Phy:   -1 * int64(res.PhyCount),
+		User:  -1 * int64(res.UserCount),
+	}); err != nil {
 		return fmt.Errorf("could not decrease container object counter: %w", err)
 	}
 	return nil
 }
 
-func applyDeleteSingleResult(r deleteSingleResult, res *DeleteRes, addrs []oid.Address, i int) {
-	if r.Phy {
-		if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
-			v.Phy++
-			res.removedByCnrID[addrs[i].Container()] = v
-		} else {
-			res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
-				Phy: 1,
-			}
-		}
-
-		res.phyCount++
-		res.phySize += r.Size
-	}
-
-	if r.Logic {
-		if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
-			v.Logic++
-			res.removedByCnrID[addrs[i].Container()] = v
-		} else {
-			res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
-				Logic: 1,
-			}
-		}
-
-		res.logicCount++
-		res.logicSize += r.Size
-	}
-
-	if r.User {
-		if v, ok := res.removedByCnrID[addrs[i].Container()]; ok {
-			v.User++
-			res.removedByCnrID[addrs[i].Container()] = v
-		} else {
-			res.removedByCnrID[addrs[i].Container()] = ObjectCounters{
-				User: 1,
-			}
-		}
-
-		res.userCount++
-	}
-}
-
-type deleteSingleResult struct {
-	Phy   bool
-	Logic bool
-	User  bool
-	Size  uint64
-}
-
 // delete removes object indexes from the metabase. Counts the references
 // of the object that is being removed.
 // The first return value indicates if an object has been removed. (removing a
 // non-exist object is error-free). The second return value indicates if an
 // object was available before the removal (for calculating the logical object
 // counter). The third return value The fourth return value is removed object payload size.
-func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (deleteSingleResult, error) {
-	key := make([]byte, addressKeySize)
-	addrKey := addressKey(addr, key)
-	garbageBKT := tx.Bucket(garbageBucketName)
-	graveyardBKT := tx.Bucket(graveyardBucketName)
-
-	removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0
+func (db *DB) delete(ctx context.Context, b *pebble.Batch, addr oid.Address, refCounter *referenceNumber, currEpoch uint64) (DeleteRes, error) {
+	status, err := inGraveyardWithKey(b, addr)
+	if err != nil {
+		return DeleteRes{}, err
+	}
+	removeAvailableObject := status == 0
 
 	// unmarshal object, work only with physically stored (raw == true) objects
-	obj, err := db.get(tx, addr, key, false, true, currEpoch)
+	obj, err := get(ctx, b, addr, false, true, currEpoch)
 	if err != nil {
 		if client.IsErrObjectNotFound(err) {
-			addrKey = addressKey(addr, key)
-			if garbageBKT != nil {
-				err := garbageBKT.Delete(addrKey)
-				if err != nil {
-					return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
-				}
+			err := deleteFromGarbage(b, addr)
+			if err != nil {
+				return DeleteRes{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
 			}
-			return deleteSingleResult{}, nil
+			return DeleteRes{}, nil
 		}
 		var siErr *objectSDK.SplitInfoError
 		var ecErr *objectSDK.ECInfoError
 		if errors.As(err, &siErr) || errors.As(err, &ecErr) {
 			// if object is virtual (parent) then do nothing, it will be deleted with last child
 			// if object is erasure-coded it will be deleted with the last chunk presented on the shard
-			return deleteSingleResult{}, nil
+			return DeleteRes{}, nil
 		}
 
-		return deleteSingleResult{}, err
+		return DeleteRes{}, err
 	}
 
-	addrKey = addressKey(addr, key)
 	// remove record from the garbage bucket
-	if garbageBKT != nil {
-		err := garbageBKT.Delete(addrKey)
-		if err != nil {
-			return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
-		}
+	err = deleteFromGarbage(b, addr)
+	if err != nil {
+		return DeleteRes{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
 	}
 
 	// if object is an only link to a parent, then remove parent
 	if parent := obj.Parent(); parent != nil {
 		parAddr := object.AddressOf(parent)
-		sParAddr := addressKey(parAddr, key)
-		k := string(sParAddr)
-
-		nRef, ok := refCounter[k]
-		if !ok {
-			nRef = &referenceNumber{
-				all:  parentLength(tx, parAddr),
-				addr: parAddr,
-				obj:  parent,
-			}
-
-			refCounter[k] = nRef
+		parentLen, err := parentLength(ctx, b, parAddr)
+		if err != nil {
+			return DeleteRes{}, fmt.Errorf("failed to get parent count for object %s: %w", parAddr, err)
 		}
-
-		nRef.cur++
+		refCounter.addr = parAddr
+		refCounter.all = parentLen
+		refCounter.obj = parent
+		refCounter.cur = 1
 	}
 
 	isUserObject := IsUserObject(obj)
 
 	// remove object
-	err = db.deleteObject(tx, obj, false)
+	err = db.deleteObject(ctx, b, obj, false)
 	if err != nil {
-		return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err)
+		return DeleteRes{}, fmt.Errorf("could not remove object: %w", err)
 	}
 
-	if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil {
-		return deleteSingleResult{}, err
+	if err := deleteECRelatedInfo(ctx, b, obj, addr.Container(), refCounter); err != nil {
+		return DeleteRes{}, fmt.Errorf("could not remove EC related info object: %w", err)
 	}
 
-	return deleteSingleResult{
-		Phy:   true,
-		Logic: removeAvailableObject,
-		User:  isUserObject && removeAvailableObject,
-		Size:  obj.PayloadSize(),
-	}, nil
+	var result DeleteRes
+	result.PhyCount = 1
+	result.PhySize = obj.PayloadSize()
+	if removeAvailableObject {
+		result.LogicCount = 1
+		result.LogicSize = obj.PayloadSize()
+	}
+	if removeAvailableObject && isUserObject {
+		result.UserCount = 1
+	}
+
+	return result, nil
 }
 
 func (db *DB) deleteObject(
-	tx *bbolt.Tx,
+	ctx context.Context,
+	b *pebble.Batch,
 	obj *objectSDK.Object,
 	isParent bool,
 ) error {
-	err := delUniqueIndexes(tx, obj, isParent)
+	err := delUniqueIndexes(ctx, b, obj, isParent)
 	if err != nil {
 		return errFailedToRemoveUniqueIndexes
 	}
 
-	err = updateListIndexes(tx, obj, delListIndexItem)
+	err = updateListIndexes(b, obj, deleteByKey)
 	if err != nil {
 		return fmt.Errorf("can't remove list indexes: %w", err)
 	}
 
-	err = updateFKBTIndexes(tx, obj, delFKBTIndexItem)
+	err = updateFKBTIndexes(b, obj, deleteByKey)
 	if err != nil {
 		return fmt.Errorf("can't remove fake bucket tree indexes: %w", err)
 	}
 
 	if isParent {
 		// remove record from the garbage bucket, because regular object deletion does nothing for virtual object
-		garbageBKT := tx.Bucket(garbageBucketName)
-		if garbageBKT != nil {
-			key := make([]byte, addressKeySize)
-			addrKey := addressKey(object.AddressOf(obj), key)
-			err := garbageBKT.Delete(addrKey)
-			if err != nil {
-				return fmt.Errorf("could not remove from garbage bucket: %w", err)
-			}
+		err := deleteFromGarbage(b, object.AddressOf(obj))
+		if err != nil {
+			return fmt.Errorf("could not remove from garbage bucket: %w", err)
 		}
 	}
 
 	return nil
 }
 
+func deleteByKey(b *pebble.Batch, key []byte) error {
+	return b.Delete(key, pebble.Sync)
+}
+
+func deleteFromGarbage(b *pebble.Batch, addr oid.Address) error {
+	return b.Delete(garbageKey(addr.Container(), addr.Object()), pebble.Sync)
+}
+
 // parentLength returns amount of available children from parentid index.
-func parentLength(tx *bbolt.Tx, addr oid.Address) int {
-	bucketName := make([]byte, bucketKeySize)
-
-	bkt := tx.Bucket(parentBucketName(addr.Container(), bucketName[:]))
-	if bkt == nil {
-		return 0
-	}
-
-	lst, err := decodeList(bkt.Get(objectKey(addr.Object(), bucketName[:])))
-	if err != nil {
-		return 0
-	}
-
-	return len(lst)
-}
-
-func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
-	bkt := tx.Bucket(item.name)
-	if bkt != nil {
-		_ = bkt.Delete(item.key) // ignore error, best effort there
-	}
-}
-
-func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
-	bkt := tx.Bucket(item.name)
-	if bkt == nil {
-		return nil
-	}
-
-	fkbtRoot := bkt.Bucket(item.key)
-	if fkbtRoot == nil {
-		return nil
-	}
-
-	_ = fkbtRoot.Delete(item.val) // ignore error, best effort there
-	return nil
-}
-
-func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
-	bkt := tx.Bucket(item.name)
-	if bkt == nil {
-		return nil
-	}
-
-	lst, err := decodeList(bkt.Get(item.key))
-	if err != nil || len(lst) == 0 {
-		return nil
-	}
-
-	// remove element from the list
-	for i := range lst {
-		if bytes.Equal(item.val, lst[i]) {
-			copy(lst[i:], lst[i+1:])
-			lst = lst[:len(lst)-1]
-			break
-		}
-	}
-
-	// if list empty, remove the key from <list> bucket
-	if len(lst) == 0 {
-		_ = bkt.Delete(item.key) // ignore error, best effort there
-		return nil
-	}
-
-	// if list is not empty, then update it
-	encodedLst, err := encodeList(lst)
-	if err != nil {
-		return nil // ignore error, best effort there
-	}
-
-	_ = bkt.Put(item.key, encodedLst) // ignore error, best effort there
-	return nil
-}
-
-func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error {
+func parentLength(ctx context.Context, r pebble.Reader, addr oid.Address) (int, error) {
+	var result int
+	prefix := parentKeyLongPrefix(addr.Container(), addr.Object())
+	for {
+		ids, err := selectByPrefixBatch(ctx, r, prefix, batchSize)
+		if err != nil {
+			return 0, err
+		}
+		result += len(ids)
+		if len(ids) < batchSize {
+			break
+		}
+	}
+	return result, nil
+}
+
+func delParent(ctx context.Context, b *pebble.Batch, addr oid.Address) error {
+	prefix := parentKeyLongPrefix(addr.Container(), addr.Object())
+	return deleteByPrefix(ctx, b, prefix)
+}
+
+func delUniqueIndexes(ctx context.Context, b *pebble.Batch, obj *objectSDK.Object, isParent bool) error {
 	addr := object.AddressOf(obj)
 
-	objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
-	cnr := addr.Container()
-	bucketName := make([]byte, bucketKeySize)
-
 	// add value to primary unique bucket
 	if !isParent {
+		var key []byte
 		switch obj.Type() {
 		case objectSDK.TypeRegular:
-			bucketName = primaryBucketName(cnr, bucketName)
+			key = primaryKey(addr.Container(), addr.Object())
 		case objectSDK.TypeTombstone:
-			bucketName = tombstoneBucketName(cnr, bucketName)
+			key = tombstoneKey(addr.Container(), addr.Object())
 		case objectSDK.TypeLock:
-			bucketName = bucketNameLockers(cnr, bucketName)
+			key = lockersKey(addr.Container(), addr.Object())
 		default:
 			return ErrUnknownObjectType
 		}
 
-		delUniqueIndexItem(tx, namedBucketItem{
-			name: bucketName,
-			key:  objKey,
-		})
+		if err := b.Delete(key, pebble.Sync); err != nil {
+			return err
+		}
 	} else {
-		delUniqueIndexItem(tx, namedBucketItem{
-			name: parentBucketName(cnr, bucketName),
-			key:  objKey,
-		})
+		if err := delParent(ctx, b, addr); err != nil {
+			return err
+		}
 	}
 
-	delUniqueIndexItem(tx, namedBucketItem{ // remove from storage id index
-		name: smallBucketName(cnr, bucketName),
-		key:  objKey,
-	})
-	delUniqueIndexItem(tx, namedBucketItem{ // remove from root index
-		name: rootBucketName(cnr, bucketName),
-		key:  objKey,
-	})
-
-	return nil
+	if err := b.Delete(smallKey(addr.Container(), addr.Object()), pebble.Sync); err != nil {
+		return err
+	}
+	if ecHead := obj.ECHeader(); ecHead != nil {
+		if err := b.Delete(ecInfoKey(addr.Container(), ecHead.Parent(), addr.Object()), pebble.Sync); err != nil {
+			return err
+		}
+	}
+	return b.Delete(rootKey(addr.Container(), addr.Object()), pebble.Sync)
 }
 
-func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.Object, cnr cid.ID, refCounter referenceCounter) error {
+func deleteECRelatedInfo(ctx context.Context, b *pebble.Batch, obj *objectSDK.Object, cnr cid.ID, refCounter *referenceNumber) error {
 	ech := obj.ECHeader()
 	if ech == nil {
 		return nil
 	}
 
-	hasAnyChunks := hasAnyECChunks(tx, ech, cnr)
+	hasAnyChunks, err := hasAnyECChunks(ctx, b, ech, cnr)
+	if err != nil {
+		return err
+	}
 	// drop EC parent GC mark if current EC chunk is the last one
-	if !hasAnyChunks && garbageBKT != nil {
+	if !hasAnyChunks {
 		var ecParentAddress oid.Address
 		ecParentAddress.SetContainer(cnr)
 		ecParentAddress.SetObject(ech.Parent())
-		addrKey := addressKey(ecParentAddress, make([]byte, addressKeySize))
-		err := garbageBKT.Delete(addrKey)
+		err := deleteFromGarbage(b, ecParentAddress)
 		if err != nil {
 			return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err)
 		}
-	}
-
-	// also drop EC parent root info if current EC chunk is the last one
-	if !hasAnyChunks {
-		delUniqueIndexItem(tx, namedBucketItem{
-			name: rootBucketName(cnr, make([]byte, bucketKeySize)),
-			key:  objectKey(ech.Parent(), make([]byte, objectKeySize)),
-		})
+		// also drop EC parent root info if current EC chunk is the last one
+		err = b.Delete(rootKey(ecParentAddress.Container(), ecParentAddress.Object()), pebble.Sync)
+		if err != nil {
+			return fmt.Errorf("could not remove EC parent from root bucket: %w", err)
+		}
 	}
 
 	if ech.ParentSplitParentID() == nil {
@@ -516,38 +327,37 @@ func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.
 	splitParentAddress.SetContainer(cnr)
 	splitParentAddress.SetObject(*ech.ParentSplitParentID())
 
-	if ref, ok := refCounter[string(addressKey(splitParentAddress, make([]byte, addressKeySize)))]; ok {
+	if refCounter.all > 0 {
 		// linking object is already processing
 		// so just inform that one more reference was deleted
 		// split info and gc marks will be deleted after linking object delete
-		ref.cur++
+		refCounter.cur++
 		return nil
 	}
 
-	if parentLength(tx, splitParentAddress) > 0 {
+	parLen, err := parentLength(ctx, b, splitParentAddress)
+	if err != nil {
+		return err
+	}
+	if parLen > 0 {
 		// linking object still exists, so leave split info and gc mark deletion for linking object processing
 		return nil
 	}
 
 	// drop split parent gc mark
-	if garbageBKT != nil {
-		addrKey := addressKey(splitParentAddress, make([]byte, addressKeySize))
-		err := garbageBKT.Delete(addrKey)
-		if err != nil {
-			return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err)
-		}
+	err = deleteFromGarbage(b, splitParentAddress)
+	if err != nil {
+		return fmt.Errorf("could not remove EC split parent from garbage bucket: %w", err)
 	}
 
 	// drop split info
-	delUniqueIndexItem(tx, namedBucketItem{
-		name: rootBucketName(cnr, make([]byte, bucketKeySize)),
-		key:  objectKey(*ech.ParentSplitParentID(), make([]byte, objectKeySize)),
-	})
-	return nil
+	return b.Delete(rootKey(splitParentAddress.Container(), splitParentAddress.Object()), pebble.Sync)
 }
 
-func hasAnyECChunks(tx *bbolt.Tx, ech *objectSDK.ECHeader, cnr cid.ID) bool {
-	data := getFromBucket(tx, ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
-		objectKey(ech.Parent(), make([]byte, objectKeySize)))
-	return len(data) > 0
+func hasAnyECChunks(ctx context.Context, r pebble.Reader, ech *objectSDK.ECHeader, cnr cid.ID) (bool, error) {
+	data, err := selectByPrefixBatch(ctx, r, ecInfoLongKeyPrefix(cnr, ech.Parent()), 1)
+	if err != nil {
+		return false, err
+	}
+	return len(data) > 0, nil
 }
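Note: `Delete` now takes exactly one address per call (the old `SetAddresses(...)` batch option is gone) and `DeleteRes` exposes plain exported fields instead of getter methods. A usage sketch against the API as this diff defines it; the opened `*meta.DB` and the address are assumed to exist:

```go
package example

import (
	"context"
	"log"

	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// deleteOne removes a single object's metabase records and logs the
// exported counters from DeleteRes (previously accessor methods).
func deleteOne(ctx context.Context, db *meta.DB, addr oid.Address) error {
	res, err := db.Delete(ctx, meta.DeletePrm{Address: addr})
	if err != nil {
		return err
	}
	log.Printf("removed: phy=%d logic=%d user=%d, freed %d bytes",
		res.PhyCount, res.LogicCount, res.UserCount, res.PhySize)
	return nil
}
```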
@@ -80,7 +80,7 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
 
 	err = db.database.View(func(tx *bbolt.Tx) error {
 		key := make([]byte, addressKeySize)
-		res.hdr, err = db.get(tx, prm.addr, key, true, prm.raw, currEpoch)
+		res.hdr, err = get(tx, prm.addr, key, true, prm.raw, currEpoch)
 
 		return err
 	})
@@ -88,7 +88,7 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
 	return res, metaerr.Wrap(err)
 }
 
-func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
+func get(ctx context.Context, r pebble.Reader, addr oid.Address, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
 	if checkStatus {
 		switch objectStatus(tx, addr, currEpoch) {
 		case 1:
@@ -6,8 +6,10 @@ import (
 	"errors"
 	"fmt"
 	gio "io"
+	"strconv"
 	"time"
 
+	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
 	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
@@ -55,6 +57,8 @@ var (
 	ErrUnknownObjectType        = errors.New("unknown object type")
 	ErrIncorrectSplitInfoUpdate = errors.New("updating split info on object without it")
 	ErrIncorrectRootObject      = errors.New("invalid root object")
+
+	errInvalidUserAttributeKeyFormat = errors.New("invalid user attribute key format")
 )
 
 // Put saves object header in metabase. Object payload expected to be cut.
@@ -290,22 +294,15 @@ func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []
 	})
 }
 
-type updateIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error
+type updateIndexItemFunc = func(b *pebble.Batch, key []byte) error
 
-func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
+func updateListIndexes(b *pebble.Batch, obj *objectSDK.Object, f updateIndexItemFunc) error {
 	idObj, _ := obj.ID()
 	cnr, _ := obj.ContainerID()
-	objKey := objectKey(idObj, make([]byte, objectKeySize))
-	bucketName := make([]byte, bucketKeySize)
 
 	cs, _ := obj.PayloadChecksum()
 
 	// index payload hashes
-	err := f(tx, namedBucketItem{
-		name: payloadHashBucketName(cnr, bucketName),
-		key:  cs.Value(),
-		val:  objKey,
-	})
+	err := f(b, payloadHashKey(cnr, idObj, cs.Value()))
 	if err != nil {
 		return err
 	}
@@ -314,11 +311,7 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun
 
 	// index parent ids
 	if ok {
-		err := f(tx, namedBucketItem{
-			name: parentBucketName(cnr, bucketName),
-			key:  objectKey(idParent, make([]byte, objectKeySize)),
-			val:  objKey,
-		})
+		err := f(b, parentKey(cnr, idParent, idObj))
 		if err != nil {
 			return err
 		}
@@ -326,33 +319,35 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun
 
 	// index split ids
 	if obj.SplitID() != nil {
-		err := f(tx, namedBucketItem{
-			name: splitBucketName(cnr, bucketName),
-			key:  obj.SplitID().ToV2(),
-			val:  objKey,
-		})
+		err := f(b, splitKey(cnr, idObj, obj.SplitID().ToV2()))
 		if err != nil {
 			return err
 		}
 	}
 
+	for _, attr := range obj.Attributes() {
+		if attr.Key() != objectV2.SysAttributeExpEpochNeoFS && attr.Key() != objectV2.SysAttributeExpEpoch {
+			continue
+		}
+		expEpoch, err := strconv.ParseUint(attr.Value(), 10, 64)
+		if err != nil {
+			return errInvalidUserAttributeKeyFormat
+		}
+		err = f(b, expiredKey(cnr, idObj, expEpoch))
+		if err != nil {
+			return err
+		}
+		break
+	}
+
 	if ech := obj.ECHeader(); ech != nil {
-		err := f(tx, namedBucketItem{
-			name: ecInfoBucketName(cnr, bucketName),
-			key:  objectKey(ech.Parent(), make([]byte, objectKeySize)),
-			val:  objKey,
-		})
+		err := f(b, ecInfoKey(cnr, ech.Parent(), idObj))
 		if err != nil {
 			return err
 		}
 
 		if ech.ParentSplitID() != nil {
-			objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
-			err := f(tx, namedBucketItem{
-				name: splitBucketName(cnr, bucketName),
-				key:  ech.ParentSplitID().ToV2(),
-				val:  objKey,
-			})
+			err := f(b, splitKey(cnr, ech.Parent(), ech.ParentSplitID().ToV2()))
 			if err != nil {
 				return err
 			}
@@ -374,17 +369,10 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun
 	return nil
 }
 
-func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
+func updateFKBTIndexes(b *pebble.Batch, obj *objectSDK.Object, f updateIndexItemFunc) error {
 	id, _ := obj.ID()
 	cnr, _ := obj.ContainerID()
-	objKey := objectKey(id, make([]byte, objectKeySize))
-
-	key := make([]byte, bucketKeySize)
-	err := f(tx, namedBucketItem{
-		name: ownerBucketName(cnr, key),
-		key:  []byte(obj.OwnerID().EncodeToString()),
-		val:  objKey,
-	})
+	err := f(b, ownerKey(cnr, id, []byte(obj.OwnerID().EncodeToString())))
 	if err != nil {
 		return err
 	}
@@ -392,19 +380,14 @@ func updateFKBTIndexes(b *pebble.Batch, obj *objectSDK.Object, f updateIndexItemFun
 	var attrs []objectSDK.Attribute
 	if obj.ECHeader() != nil {
 		attrs = obj.ECHeader().ParentAttributes()
-		objKey = objectKey(obj.ECHeader().Parent(), make([]byte, objectKeySize))
+		id = obj.ECHeader().Parent()
 	} else {
 		attrs = obj.Attributes()
 	}
 
 	// user specified attributes
 	for i := range attrs {
-		key = attributeBucketName(cnr, attrs[i].Key(), key)
-		err := f(tx, namedBucketItem{
-			name: key,
-			key:  []byte(attrs[i].Value()),
-			val:  objKey,
-		})
+		err := f(b, attributeKey(cnr, id, attrs[i].Key(), attrs[i].Value()))
 		if err != nil {
 			return err
 		}
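Note: `updateListIndexes` now also indexes expiration: it scans the object's attributes for the system expiration-epoch attribute, parses the value as base-10 `uint64` (rejecting malformed values with `errInvalidUserAttributeKeyFormat`), writes an `expiredKey` entry, and stops after the first match. A standalone sketch of just the parsing rule; the literal attribute names below are assumed values of `objectV2.SysAttributeExpEpoch` and `objectV2.SysAttributeExpEpochNeoFS`:

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

const (
	sysAttrExpEpoch      = "__SYSTEM__EXPIRATION_EPOCH" // assumed value of objectV2.SysAttributeExpEpoch
	sysAttrExpEpochNeoFS = "__NEOFS__EXPIRATION_EPOCH"  // assumed value of objectV2.SysAttributeExpEpochNeoFS
)

var errInvalidUserAttributeKeyFormat = errors.New("invalid user attribute key format")

type attribute struct{ key, value string }

// expirationEpoch mirrors the loop added to updateListIndexes: the first
// matching system attribute wins (hence the early return), and a
// non-numeric value is an error rather than being skipped.
func expirationEpoch(attrs []attribute) (epoch uint64, found bool, err error) {
	for _, attr := range attrs {
		if attr.key != sysAttrExpEpoch && attr.key != sysAttrExpEpochNeoFS {
			continue
		}
		epoch, err = strconv.ParseUint(attr.value, 10, 64)
		if err != nil {
			return 0, false, errInvalidUserAttributeKeyFormat
		}
		return epoch, true, nil
	}
	return 0, false, nil
}

func main() {
	epoch, ok, err := expirationEpoch([]attribute{{"__SYSTEM__EXPIRATION_EPOCH", "42"}})
	fmt.Println(epoch, ok, err) // 42 true <nil>
}
```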