forked from TrueCloudLab/frostfs-node
122 lines
2.8 KiB
Go
122 lines
2.8 KiB
Go
package blobovnicza
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"syscall"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.etcd.io/bbolt"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
var incompletedMoveBucketName = []byte("INCOMPLETED_MOVE")
|
|
|
|
type MoveInfo struct {
|
|
Address oid.Address
|
|
TargetStorageID []byte
|
|
}
|
|
|
|
func (b *Blobovnicza) PutMoveInfo(ctx context.Context, prm MoveInfo) error {
|
|
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.PutMoveInfo",
|
|
trace.WithAttributes(
|
|
attribute.String("path", b.path),
|
|
attribute.String("address", prm.Address.EncodeToString()),
|
|
attribute.String("target_storage_id", string(prm.TargetStorageID)),
|
|
))
|
|
defer span.End()
|
|
|
|
b.controlMtx.RLock()
|
|
defer b.controlMtx.RUnlock()
|
|
|
|
key := addressKey(prm.Address)
|
|
|
|
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
bucket, err := tx.CreateBucketIfNotExists(incompletedMoveBucketName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := bucket.Put(key, prm.TargetStorageID); err != nil {
|
|
return fmt.Errorf("(%T) failed to save move info: %w", b, err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if errors.Is(err, syscall.ENOSPC) {
|
|
err = ErrNoSpace
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (b *Blobovnicza) DropMoveInfo(ctx context.Context, address oid.Address) error {
|
|
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.DropMoveInfo",
|
|
trace.WithAttributes(
|
|
attribute.String("path", b.path),
|
|
attribute.String("address", address.EncodeToString()),
|
|
))
|
|
defer span.End()
|
|
|
|
key := addressKey(address)
|
|
|
|
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
bucket := tx.Bucket(incompletedMoveBucketName)
|
|
if bucket == nil {
|
|
return nil
|
|
}
|
|
|
|
if err := bucket.Delete(key); err != nil {
|
|
return fmt.Errorf("(%T) failed to drop move info: %w", b, err)
|
|
}
|
|
|
|
c := bucket.Cursor()
|
|
k, v := c.First()
|
|
bucketEmpty := k == nil && v == nil
|
|
if bucketEmpty {
|
|
return tx.DeleteBucket(incompletedMoveBucketName)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if errors.Is(err, syscall.ENOSPC) {
|
|
err = ErrNoSpace
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (b *Blobovnicza) ListMoveInfo(ctx context.Context) ([]MoveInfo, error) {
|
|
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.ListMoveInfo",
|
|
trace.WithAttributes(
|
|
attribute.String("path", b.path),
|
|
))
|
|
defer span.End()
|
|
|
|
var result []MoveInfo
|
|
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
|
bucket := tx.Bucket(incompletedMoveBucketName)
|
|
if bucket == nil {
|
|
return nil
|
|
}
|
|
return bucket.ForEach(func(k, v []byte) error {
|
|
var addr oid.Address
|
|
storageID := make([]byte, len(v))
|
|
if err := addressFromKey(&addr, k); err != nil {
|
|
return err
|
|
}
|
|
copy(storageID, v)
|
|
result = append(result, MoveInfo{
|
|
Address: addr,
|
|
TargetStorageID: storageID,
|
|
})
|
|
return nil
|
|
})
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return result, nil
|
|
}
|