forked from TrueCloudLab/frostfs-node
[#661] blobovniczatree: Make Rebuild failover safe
Now move info stores in blobovnicza, so in case of failover rebuild completes previous operation first. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
8afb42aae8
commit
4f7f67d136
13 changed files with 616 additions and 35 deletions
|
@ -530,4 +530,15 @@ const (
|
||||||
BlobovniczaTreeRebuildingBlobovnicza = "rebuilding blobovnicza..."
|
BlobovniczaTreeRebuildingBlobovnicza = "rebuilding blobovnicza..."
|
||||||
BlobovniczaTreeRebuildingBlobovniczaFailed = "rebuilding blobovnicza failed"
|
BlobovniczaTreeRebuildingBlobovniczaFailed = "rebuilding blobovnicza failed"
|
||||||
BlobovniczaTreeRebuildingBlobovniczaSuccess = "rebuilding blobovnicza completed successfully"
|
BlobovniczaTreeRebuildingBlobovniczaSuccess = "rebuilding blobovnicza completed successfully"
|
||||||
|
BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza = "could not put move info to source blobovnicza"
|
||||||
|
BlobovniczatreeCouldNotUpdateStorageID = "could not update storage ID"
|
||||||
|
BlobovniczatreeCouldNotDropMoveInfo = "could not drop move info from source blobovnicza"
|
||||||
|
BlobovniczatreeCouldNotDeleteFromSource = "could not delete object from source blobovnicza"
|
||||||
|
BlobovniczaTreeCompletingPreviousRebuild = "completing previous rebuild if failed..."
|
||||||
|
BlobovniczaTreeCompletedPreviousRebuildSuccess = "previous rebuild completed successfully"
|
||||||
|
BlobovniczaTreeCompletedPreviousRebuildFailed = "failed to complete previous rebuild"
|
||||||
|
BlobovniczatreeCouldNotCheckExistenceInSourceDB = "could not check object existence in source blobovnicza"
|
||||||
|
BlobovniczatreeCouldNotCheckExistenceInTargetDB = "could not check object existence in target blobovnicza"
|
||||||
|
BlobovniczatreeCouldNotGetObjectFromSourceDB = "could not get object from source blobovnicza"
|
||||||
|
BlobovniczatreeCouldNotPutObjectToTargetDB = "could not put object to target blobovnicza"
|
||||||
)
|
)
|
||||||
|
|
|
@ -105,7 +105,7 @@ func (b *Blobovnicza) initializeCounters() error {
|
||||||
var size uint64
|
var size uint64
|
||||||
var items uint64
|
var items uint64
|
||||||
err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
return b.iterateAllBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) {
|
return b.iterateAllDataBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) {
|
||||||
keysN := uint64(b.Stats().KeyN)
|
keysN := uint64(b.Stats().KeyN)
|
||||||
size += keysN * upper
|
size += keysN * upper
|
||||||
items += keysN
|
items += keysN
|
||||||
|
|
|
@ -51,7 +51,7 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
|
||||||
var dataSize uint64
|
var dataSize uint64
|
||||||
|
|
||||||
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
|
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
return b.iterateAllBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
|
return b.iterateAllDataBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
|
||||||
objData := buck.Get(addrKey)
|
objData := buck.Get(addrKey)
|
||||||
if objData == nil {
|
if objData == nil {
|
||||||
// object is not in bucket => continue iterating
|
// object is not in bucket => continue iterating
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package blobovnicza
|
package blobovnicza
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
@ -26,7 +27,10 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error
|
||||||
addrKey := addressKey(addr)
|
addrKey := addressKey(addr)
|
||||||
|
|
||||||
err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
return tx.ForEach(func(_ []byte, buck *bbolt.Bucket) error {
|
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
|
||||||
|
if bytes.Equal(bucketName, incompletedMoveBucketName) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
exists = buck.Get(addrKey) != nil
|
exists = buck.Get(addrKey) != nil
|
||||||
if exists {
|
if exists {
|
||||||
return errInterruptForEach
|
return errInterruptForEach
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package blobovnicza
|
package blobovnicza
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
|
@ -57,7 +58,11 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
||||||
)
|
)
|
||||||
|
|
||||||
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
return tx.ForEach(func(_ []byte, buck *bbolt.Bucket) error {
|
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
|
||||||
|
if bytes.Equal(bucketName, incompletedMoveBucketName) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
data = buck.Get(addrKey)
|
data = buck.Get(addrKey)
|
||||||
if data == nil {
|
if data == nil {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package blobovnicza
|
package blobovnicza
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
@ -12,11 +13,11 @@ import (
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
// iterateAllBuckets iterates all buckets in db
|
// iterateAllDataBuckets iterates all buckets in db
|
||||||
//
|
//
|
||||||
// If the maximum size of the object (b.objSizeLimit) has been changed to lower value,
|
// If the maximum size of the object (b.objSizeLimit) has been changed to lower value,
|
||||||
// then there may be more buckets than the current limit of the object size.
|
// then there may be more buckets than the current limit of the object size.
|
||||||
func (b *Blobovnicza) iterateAllBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
|
func (b *Blobovnicza) iterateAllDataBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
|
||||||
return b.iterateBucketKeys(false, func(lower uint64, upper uint64, key []byte) (bool, error) {
|
return b.iterateBucketKeys(false, func(lower uint64, upper uint64, key []byte) (bool, error) {
|
||||||
buck := tx.Bucket(key)
|
buck := tx.Bucket(key)
|
||||||
if buck == nil {
|
if buck == nil {
|
||||||
|
@ -139,7 +140,10 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
|
||||||
var elem IterationElement
|
var elem IterationElement
|
||||||
|
|
||||||
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
return tx.ForEach(func(name []byte, buck *bbolt.Bucket) error {
|
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
|
||||||
|
if bytes.Equal(bucketName, incompletedMoveBucketName) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return buck.ForEach(func(k, v []byte) error {
|
return buck.ForEach(func(k, v []byte) error {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
108
pkg/local_object_storage/blobovnicza/move.go
Normal file
108
pkg/local_object_storage/blobovnicza/move.go
Normal file
|
@ -0,0 +1,108 @@
|
||||||
|
package blobovnicza
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
)
|
||||||
|
|
||||||
|
var incompletedMoveBucketName = []byte("INCOMPLETED_MOVE")
|
||||||
|
|
||||||
|
type MoveInfo struct {
|
||||||
|
Address oid.Address
|
||||||
|
TargetStorageID []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blobovnicza) PutMoveInfo(ctx context.Context, prm MoveInfo) error {
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.PutMoveInfo",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("path", b.path),
|
||||||
|
attribute.String("address", prm.Address.EncodeToString()),
|
||||||
|
attribute.String("target_storage_id", string(prm.TargetStorageID)),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
key := addressKey(prm.Address)
|
||||||
|
|
||||||
|
return b.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
bucket, err := tx.CreateBucketIfNotExists(incompletedMoveBucketName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bucket.Put(key, prm.TargetStorageID); err != nil {
|
||||||
|
return fmt.Errorf("(%T) failed to save move info: %w", b, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blobovnicza) DropMoveInfo(ctx context.Context, address oid.Address) error {
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.DropMoveInfo",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("path", b.path),
|
||||||
|
attribute.String("address", address.EncodeToString()),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
key := addressKey(address)
|
||||||
|
|
||||||
|
return b.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
bucket := tx.Bucket(incompletedMoveBucketName)
|
||||||
|
if bucket == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bucket.Delete(key); err != nil {
|
||||||
|
return fmt.Errorf("(%T) failed to drop move info: %w", b, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := bucket.Cursor()
|
||||||
|
k, v := c.First()
|
||||||
|
bucketEmpty := k == nil && v == nil
|
||||||
|
if bucketEmpty {
|
||||||
|
return tx.DeleteBucket(incompletedMoveBucketName)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blobovnicza) ListMoveInfo(ctx context.Context) ([]MoveInfo, error) {
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.ListMoveInfo",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("path", b.path),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
var result []MoveInfo
|
||||||
|
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
bucket := tx.Bucket(incompletedMoveBucketName)
|
||||||
|
if bucket == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return bucket.ForEach(func(k, v []byte) error {
|
||||||
|
var addr oid.Address
|
||||||
|
storageID := make([]byte, len(v))
|
||||||
|
if err := addressFromKey(&addr, k); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
copy(storageID, v)
|
||||||
|
result = append(result, MoveInfo{
|
||||||
|
Address: addr,
|
||||||
|
TargetStorageID: storageID,
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
|
@ -56,11 +56,12 @@ import (
|
||||||
type Blobovniczas struct {
|
type Blobovniczas struct {
|
||||||
cfg
|
cfg
|
||||||
|
|
||||||
commondbManager *dbManager
|
commondbManager *dbManager
|
||||||
activeDBManager *activeDBManager
|
activeDBManager *activeDBManager
|
||||||
dbCache *dbCache
|
dbCache *dbCache
|
||||||
dbFilesGuard *sync.RWMutex
|
deleteProtectedObjects *addressMap
|
||||||
rebuildGuard *sync.RWMutex
|
dbFilesGuard *sync.RWMutex
|
||||||
|
rebuildGuard *sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ common.Storage = (*Blobovniczas)(nil)
|
var _ common.Storage = (*Blobovniczas)(nil)
|
||||||
|
@ -87,6 +88,7 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
|
||||||
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
|
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
|
||||||
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
|
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
|
||||||
blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager)
|
blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager)
|
||||||
|
blz.deleteProtectedObjects = newAddressMap()
|
||||||
blz.dbFilesGuard = &sync.RWMutex{}
|
blz.dbFilesGuard = &sync.RWMutex{}
|
||||||
blz.rebuildGuard = &sync.RWMutex{}
|
blz.rebuildGuard = &sync.RWMutex{}
|
||||||
|
|
||||||
|
|
|
@ -57,12 +57,20 @@ func (b *Blobovniczas) initializeDBs(ctx context.Context) error {
|
||||||
visited[p] = struct{}{}
|
visited[p] = struct{}{}
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
shBlz := b.getBlobovniczaWithoutCaching(p)
|
shBlz := b.getBlobovniczaWithoutCaching(p)
|
||||||
_, err := shBlz.Open()
|
blz, err := shBlz.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer shBlz.Close()
|
defer shBlz.Close()
|
||||||
|
|
||||||
|
moveInfo, err := blz.ListMoveInfo(egCtx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, move := range moveInfo {
|
||||||
|
b.deleteProtectedObjects.Add(move.Address)
|
||||||
|
}
|
||||||
|
|
||||||
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
|
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -3,6 +3,7 @@ package blobovniczatree
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -17,6 +18,8 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errObjectIsDeleteProtected = errors.New("object is delete protected")
|
||||||
|
|
||||||
// Delete deletes object from blobovnicza tree.
|
// Delete deletes object from blobovnicza tree.
|
||||||
//
|
//
|
||||||
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
||||||
|
@ -48,6 +51,10 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
||||||
return common.DeleteRes{}, errRebuildInProgress
|
return common.DeleteRes{}, errRebuildInProgress
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if b.deleteProtectedObjects.Contains(prm.Address) {
|
||||||
|
return common.DeleteRes{}, errObjectIsDeleteProtected
|
||||||
|
}
|
||||||
|
|
||||||
var bPrm blobovnicza.DeletePrm
|
var bPrm blobovnicza.DeletePrm
|
||||||
bPrm.SetAddress(prm.Address)
|
bPrm.SetAddress(prm.Address)
|
||||||
|
|
||||||
|
|
|
@ -170,16 +170,14 @@ func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, cur
|
||||||
//
|
//
|
||||||
// Uses existed blobovnicza files for iteration.
|
// Uses existed blobovnicza files for iteration.
|
||||||
func (b *Blobovniczas) iterateExistingDBPaths(ctx context.Context, f func(string) (bool, error)) error {
|
func (b *Blobovniczas) iterateExistingDBPaths(ctx context.Context, f func(string) (bool, error)) error {
|
||||||
|
b.dbFilesGuard.RLock()
|
||||||
|
defer b.dbFilesGuard.RUnlock()
|
||||||
|
|
||||||
_, err := b.iterateExistingDBPathsDFS(ctx, "", f)
|
_, err := b.iterateExistingDBPathsDFS(ctx, "", f)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path string, f func(string) (bool, error)) (bool, error) {
|
func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path string, f func(string) (bool, error)) (bool, error) {
|
||||||
if path == "" {
|
|
||||||
b.dbFilesGuard.RLock()
|
|
||||||
defer b.dbFilesGuard.RUnlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
sysPath := filepath.Join(b.rootPath, path)
|
sysPath := filepath.Join(b.rootPath, path)
|
||||||
entries, err := os.ReadDir(sysPath)
|
entries, err := os.ReadDir(sysPath)
|
||||||
if os.IsNotExist(err) && b.readOnly && path == "" { //non initialized tree in read only mode
|
if os.IsNotExist(err) && b.readOnly && path == "" { //non initialized tree in read only mode
|
||||||
|
@ -216,16 +214,14 @@ func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path strin
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error {
|
func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error {
|
||||||
|
b.dbFilesGuard.RLock()
|
||||||
|
defer b.dbFilesGuard.RUnlock()
|
||||||
|
|
||||||
_, err := b.iterateSordedDBPathsInternal(ctx, "", addr, f)
|
_, err := b.iterateSordedDBPathsInternal(ctx, "", addr, f)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
|
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
|
||||||
if path == "" {
|
|
||||||
b.dbFilesGuard.RLock()
|
|
||||||
defer b.dbFilesGuard.RUnlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
sysPath := filepath.Join(b.rootPath, path)
|
sysPath := filepath.Join(b.rootPath, path)
|
||||||
entries, err := os.ReadDir(sysPath)
|
entries, err := os.ReadDir(sysPath)
|
||||||
if os.IsNotExist(err) && b.readOnly && path == "" { //non initialized tree in read only mode
|
if os.IsNotExist(err) && b.readOnly && path == "" { //non initialized tree in read only mode
|
||||||
|
|
|
@ -5,11 +5,13 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -24,8 +26,18 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
|
||||||
b.rebuildGuard.Lock()
|
b.rebuildGuard.Lock()
|
||||||
defer b.rebuildGuard.Unlock()
|
defer b.rebuildGuard.Unlock()
|
||||||
|
|
||||||
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
|
|
||||||
var res common.RebuildRes
|
var res common.RebuildRes
|
||||||
|
|
||||||
|
b.log.Debug(logs.BlobovniczaTreeCompletingPreviousRebuild)
|
||||||
|
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage)
|
||||||
|
res.ObjectsMoved += completedPreviosMoves
|
||||||
|
if err != nil {
|
||||||
|
b.log.Warn(logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess)
|
||||||
|
|
||||||
|
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
|
||||||
dbsToMigrate, err := b.getDBsToRebuild(ctx)
|
dbsToMigrate, err := b.getDBsToRebuild(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
|
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
|
||||||
|
@ -82,7 +94,7 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
|
||||||
shDB.Close()
|
shDB.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
migratedObjects, err := b.moveObjects(ctx, blz, meta)
|
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return migratedObjects, err
|
return migratedObjects, err
|
||||||
}
|
}
|
||||||
|
@ -90,13 +102,13 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
|
||||||
return migratedObjects, err
|
return migratedObjects, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, meta common.MetaStorage) (uint64, error) {
|
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage) (uint64, error) {
|
||||||
var result uint64
|
var result uint64
|
||||||
|
|
||||||
var prm blobovnicza.IteratePrm
|
var prm blobovnicza.IteratePrm
|
||||||
prm.DecodeAddresses()
|
prm.DecodeAddresses()
|
||||||
prm.SetHandler(func(ie blobovnicza.IterationElement) error {
|
prm.SetHandler(func(ie blobovnicza.IterationElement) error {
|
||||||
e := b.moveObject(ctx, ie.Address(), ie.ObjectData(), meta)
|
e := b.moveObject(ctx, blz, blzPath, ie.Address(), ie.ObjectData(), meta)
|
||||||
if e == nil {
|
if e == nil {
|
||||||
result++
|
result++
|
||||||
}
|
}
|
||||||
|
@ -107,15 +119,28 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) moveObject(ctx context.Context, addr oid.Address, data []byte, metaStore common.MetaStorage) error {
|
func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
|
||||||
var pPrm common.PutPrm
|
addr oid.Address, data []byte, metaStore common.MetaStorage) error {
|
||||||
pPrm.Address = addr
|
it := &moveIterator{
|
||||||
pPrm.RawData = data
|
B: b,
|
||||||
pRes, err := b.Put(ctx, pPrm)
|
ID: nil,
|
||||||
if err != nil {
|
AllFull: true,
|
||||||
return err
|
Address: addr,
|
||||||
|
ObjectData: data,
|
||||||
|
MetaStore: metaStore,
|
||||||
|
Source: source,
|
||||||
|
SourceSysPath: sourcePath,
|
||||||
}
|
}
|
||||||
return metaStore.UpdateStorageID(ctx, addr, pRes.StorageID)
|
|
||||||
|
if err := b.iterateDeepest(ctx, addr, func(lvlPath string) (bool, error) { return it.tryMoveToLvl(ctx, lvlPath) }); err != nil {
|
||||||
|
return err
|
||||||
|
} else if it.ID == nil {
|
||||||
|
if it.AllFull {
|
||||||
|
return common.ErrNoSpace
|
||||||
|
}
|
||||||
|
return errPutFailed
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) {
|
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) {
|
||||||
|
@ -159,3 +184,222 @@ func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
|
||||||
}
|
}
|
||||||
return b.dropDirectoryIfEmpty(filepath.Dir(path))
|
return b.dropDirectoryIfEmpty(filepath.Dir(path))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
|
||||||
|
var count uint64
|
||||||
|
return count, b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
|
||||||
|
shDB := b.getBlobovnicza(s)
|
||||||
|
blz, err := shDB.Open()
|
||||||
|
if err != nil {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
defer shDB.Close()
|
||||||
|
|
||||||
|
incompletedMoves, err := blz.ListMoveInfo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, move := range incompletedMoves {
|
||||||
|
if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
|
||||||
|
move blobovnicza.MoveInfo, metaStore common.MetaStorage) error {
|
||||||
|
targetDB := b.getBlobovnicza(NewIDFromBytes(move.TargetStorageID).Path())
|
||||||
|
target, err := targetDB.Open()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer targetDB.Close()
|
||||||
|
|
||||||
|
existsInSource := true
|
||||||
|
var gPrm blobovnicza.GetPrm
|
||||||
|
gPrm.SetAddress(move.Address)
|
||||||
|
gRes, err := source.Get(ctx, gPrm)
|
||||||
|
if err != nil {
|
||||||
|
if client.IsErrObjectNotFound(err) {
|
||||||
|
existsInSource = false
|
||||||
|
} else {
|
||||||
|
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !existsInSource { //object was deleted by Rebuild, need to delete move info
|
||||||
|
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
|
||||||
|
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
b.deleteProtectedObjects.Delete(move.Address)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
existsInTarget, err := target.Exists(ctx, move.Address)
|
||||||
|
if err != nil {
|
||||||
|
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !existsInTarget {
|
||||||
|
var putPrm blobovnicza.PutPrm
|
||||||
|
putPrm.SetAddress(move.Address)
|
||||||
|
putPrm.SetMarshaledObject(gRes.Object())
|
||||||
|
_, err = target.Put(ctx, putPrm)
|
||||||
|
if err != nil {
|
||||||
|
b.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToTargetDB, zap.String("path", targetDB.SystemPath()), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = metaStore.UpdateStorageID(ctx, move.Address, move.TargetStorageID); err != nil {
|
||||||
|
b.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var deletePrm blobovnicza.DeletePrm
|
||||||
|
deletePrm.SetAddress(move.Address)
|
||||||
|
if _, err = source.Delete(ctx, deletePrm); err != nil {
|
||||||
|
b.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", sourcePath), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
|
||||||
|
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
b.deleteProtectedObjects.Delete(move.Address)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type moveIterator struct {
|
||||||
|
B *Blobovniczas
|
||||||
|
ID *ID
|
||||||
|
AllFull bool
|
||||||
|
Address oid.Address
|
||||||
|
ObjectData []byte
|
||||||
|
MetaStore common.MetaStorage
|
||||||
|
Source *blobovnicza.Blobovnicza
|
||||||
|
SourceSysPath string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool, error) {
|
||||||
|
target, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(lvlPath)
|
||||||
|
if err != nil {
|
||||||
|
if !isLogical(err) {
|
||||||
|
i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
|
||||||
|
} else {
|
||||||
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, zap.Error(err))
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if target == nil {
|
||||||
|
i.B.log.Warn(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
defer target.Close()
|
||||||
|
|
||||||
|
i.AllFull = false
|
||||||
|
|
||||||
|
targetIDx := u64FromHexString(filepath.Base(target.SystemPath()))
|
||||||
|
targetStorageID := NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(targetIDx))))
|
||||||
|
|
||||||
|
if err = i.Source.PutMoveInfo(ctx, blobovnicza.MoveInfo{
|
||||||
|
Address: i.Address,
|
||||||
|
TargetStorageID: targetStorageID.Bytes(),
|
||||||
|
}); err != nil {
|
||||||
|
if !isLogical(err) {
|
||||||
|
i.B.reportError(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, err)
|
||||||
|
} else {
|
||||||
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, zap.String("path", i.SourceSysPath), zap.Error(err))
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
i.B.deleteProtectedObjects.Add(i.Address)
|
||||||
|
|
||||||
|
var putPrm blobovnicza.PutPrm
|
||||||
|
putPrm.SetAddress(i.Address)
|
||||||
|
putPrm.SetMarshaledObject(i.ObjectData)
|
||||||
|
|
||||||
|
_, err = target.Blobovnicza().Put(ctx, putPrm)
|
||||||
|
if err != nil {
|
||||||
|
if !isLogical(err) {
|
||||||
|
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
|
||||||
|
} else {
|
||||||
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, zap.String("path", target.SystemPath()), zap.Error(err))
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = i.MetaStore.UpdateStorageID(ctx, i.Address, targetStorageID.Bytes()); err != nil {
|
||||||
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err))
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var deletePrm blobovnicza.DeletePrm
|
||||||
|
deletePrm.SetAddress(i.Address)
|
||||||
|
if _, err = i.Source.Delete(ctx, deletePrm); err != nil {
|
||||||
|
if !isLogical(err) {
|
||||||
|
i.B.reportError(logs.BlobovniczatreeCouldNotDeleteFromSource, err)
|
||||||
|
} else {
|
||||||
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", i.SourceSysPath), zap.Error(err))
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = i.Source.DropMoveInfo(ctx, i.Address); err != nil {
|
||||||
|
if !isLogical(err) {
|
||||||
|
i.B.reportError(logs.BlobovniczatreeCouldNotDropMoveInfo, err)
|
||||||
|
} else {
|
||||||
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", i.SourceSysPath), zap.Error(err))
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
i.B.deleteProtectedObjects.Delete(i.Address)
|
||||||
|
|
||||||
|
i.ID = targetStorageID
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type addressMap struct {
|
||||||
|
data map[oid.Address]struct{}
|
||||||
|
guard *sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func newAddressMap() *addressMap {
|
||||||
|
return &addressMap{
|
||||||
|
data: make(map[oid.Address]struct{}),
|
||||||
|
guard: &sync.RWMutex{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *addressMap) Add(address oid.Address) {
|
||||||
|
m.guard.Lock()
|
||||||
|
defer m.guard.Unlock()
|
||||||
|
|
||||||
|
m.data[address] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *addressMap) Delete(address oid.Address) {
|
||||||
|
m.guard.Lock()
|
||||||
|
defer m.guard.Unlock()
|
||||||
|
|
||||||
|
delete(m.data, address)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *addressMap) Contains(address oid.Address) bool {
|
||||||
|
m.guard.RLock()
|
||||||
|
defer m.guard.RUnlock()
|
||||||
|
|
||||||
|
_, contains := m.data[address]
|
||||||
|
return contains
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,192 @@
|
||||||
|
package blobovniczatree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRebuildFailover(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
t.Run("only move info saved", testRebuildFailoverOnlyMoveInfoSaved)
|
||||||
|
|
||||||
|
t.Run("object saved to target", testRebuildFailoverObjectSavedToTarget)
|
||||||
|
|
||||||
|
t.Run("object deleted from source", testRebuildFailoverObjectDeletedFromSource)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testRebuildFailoverOnlyMoveInfoSaved(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
|
||||||
|
require.NoError(t, blz.Open())
|
||||||
|
require.NoError(t, blz.Init())
|
||||||
|
|
||||||
|
obj := blobstortest.NewObject(1024)
|
||||||
|
data, err := obj.Marshal()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var pPrm blobovnicza.PutPrm
|
||||||
|
pPrm.SetAddress(object.AddressOf(obj))
|
||||||
|
pPrm.SetMarshaledObject(data)
|
||||||
|
_, err = blz.Put(context.Background(), pPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
|
||||||
|
Address: object.AddressOf(obj),
|
||||||
|
TargetStorageID: []byte("0/0/0"),
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.NoError(t, blz.Close())
|
||||||
|
|
||||||
|
testRebuildFailoverValidate(t, dir, obj, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testRebuildFailoverObjectSavedToTarget(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
|
||||||
|
require.NoError(t, blz.Open())
|
||||||
|
require.NoError(t, blz.Init())
|
||||||
|
|
||||||
|
obj := blobstortest.NewObject(1024)
|
||||||
|
data, err := obj.Marshal()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var pPrm blobovnicza.PutPrm
|
||||||
|
pPrm.SetAddress(object.AddressOf(obj))
|
||||||
|
pPrm.SetMarshaledObject(data)
|
||||||
|
_, err = blz.Put(context.Background(), pPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
|
||||||
|
Address: object.AddressOf(obj),
|
||||||
|
TargetStorageID: []byte("0/0/0"),
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.NoError(t, blz.Close())
|
||||||
|
|
||||||
|
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
|
||||||
|
require.NoError(t, blz.Open())
|
||||||
|
require.NoError(t, blz.Init())
|
||||||
|
|
||||||
|
_, err = blz.Put(context.Background(), pPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, blz.Close())
|
||||||
|
|
||||||
|
testRebuildFailoverValidate(t, dir, obj, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testRebuildFailoverObjectDeletedFromSource(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
dir := t.TempDir()
|
||||||
|
|
||||||
|
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
|
||||||
|
require.NoError(t, blz.Open())
|
||||||
|
require.NoError(t, blz.Init())
|
||||||
|
|
||||||
|
obj := blobstortest.NewObject(1024)
|
||||||
|
data, err := obj.Marshal()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
|
||||||
|
Address: object.AddressOf(obj),
|
||||||
|
TargetStorageID: []byte("0/0/0"),
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.NoError(t, blz.Close())
|
||||||
|
|
||||||
|
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
|
||||||
|
require.NoError(t, blz.Open())
|
||||||
|
require.NoError(t, blz.Init())
|
||||||
|
|
||||||
|
var pPrm blobovnicza.PutPrm
|
||||||
|
pPrm.SetAddress(object.AddressOf(obj))
|
||||||
|
pPrm.SetMarshaledObject(data)
|
||||||
|
_, err = blz.Put(context.Background(), pPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, blz.Close())
|
||||||
|
|
||||||
|
testRebuildFailoverValidate(t, dir, obj, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object, mustUpdateStorageID bool) {
|
||||||
|
b := NewBlobovniczaTree(
|
||||||
|
WithLogger(test.NewLogger(t, true)),
|
||||||
|
WithObjectSizeLimit(2048),
|
||||||
|
WithBlobovniczaShallowWidth(2),
|
||||||
|
WithBlobovniczaShallowDepth(2),
|
||||||
|
WithRootPath(dir),
|
||||||
|
WithBlobovniczaSize(100*1024*1024),
|
||||||
|
WithWaitBeforeDropDB(0),
|
||||||
|
WithOpenedCacheSize(1000))
|
||||||
|
require.NoError(t, b.Open(false))
|
||||||
|
require.NoError(t, b.Init())
|
||||||
|
|
||||||
|
var dPrm common.DeletePrm
|
||||||
|
dPrm.Address = object.AddressOf(obj)
|
||||||
|
dPrm.StorageID = []byte("0/0/1")
|
||||||
|
_, err := b.Delete(context.Background(), dPrm)
|
||||||
|
require.ErrorIs(t, err, errObjectIsDeleteProtected)
|
||||||
|
|
||||||
|
metaStub := &storageIDUpdateStub{
|
||||||
|
storageIDs: make(map[oid.Address][]byte),
|
||||||
|
}
|
||||||
|
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
||||||
|
MetaStorage: metaStub,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, uint64(1), rRes.ObjectsMoved)
|
||||||
|
require.Equal(t, uint64(0), rRes.FilesRemoved)
|
||||||
|
|
||||||
|
require.NoError(t, b.Close())
|
||||||
|
|
||||||
|
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
|
||||||
|
require.NoError(t, blz.Open())
|
||||||
|
require.NoError(t, blz.Init())
|
||||||
|
|
||||||
|
moveInfo, err := blz.ListMoveInfo(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, len(moveInfo))
|
||||||
|
|
||||||
|
var gPrm blobovnicza.GetPrm
|
||||||
|
gPrm.SetAddress(object.AddressOf(obj))
|
||||||
|
_, err = blz.Get(context.Background(), gPrm)
|
||||||
|
require.True(t, client.IsErrObjectNotFound(err))
|
||||||
|
|
||||||
|
require.NoError(t, blz.Close())
|
||||||
|
|
||||||
|
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
|
||||||
|
require.NoError(t, blz.Open())
|
||||||
|
require.NoError(t, blz.Init())
|
||||||
|
|
||||||
|
moveInfo, err = blz.ListMoveInfo(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, len(moveInfo))
|
||||||
|
|
||||||
|
gRes, err := blz.Get(context.Background(), gPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, len(gRes.Object()) > 0)
|
||||||
|
|
||||||
|
if mustUpdateStorageID {
|
||||||
|
require.True(t, bytes.Equal([]byte("0/0/0"), metaStub.storageIDs[object.AddressOf(obj)]))
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, blz.Close())
|
||||||
|
}
|
Loading…
Reference in a new issue