From b2769ca3dec21420a800ed79d56ea2abe608084c Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 27 Sep 2023 16:25:15 +0300 Subject: [PATCH] [#661] blobovniczatree: Make Rebuild failover safe Move info is now stored in the source blobovnicza, so after a failover Rebuild first completes the previously interrupted move operations. Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 11 + .../blobovnicza/control.go | 2 +- .../blobovnicza/delete.go | 2 +- .../blobovnicza/exists.go | 6 +- pkg/local_object_storage/blobovnicza/get.go | 6 +- .../blobovnicza/iterate.go | 10 +- pkg/local_object_storage/blobovnicza/move.go | 108 +++++++ .../blobstor/blobovniczatree/blobovnicza.go | 12 +- .../blobstor/blobovniczatree/control.go | 10 +- .../blobstor/blobovniczatree/delete.go | 7 + .../blobstor/blobovniczatree/iterate.go | 16 +- .../blobstor/blobovniczatree/rebuild.go | 268 +++++++++++++++++- .../blobovniczatree/rebuild_failover_test.go | 192 +++++++++++++ 13 files changed, 615 insertions(+), 35 deletions(-) create mode 100644 pkg/local_object_storage/blobovnicza/move.go create mode 100644 pkg/local_object_storage/blobstor/blobovniczatree/rebuild_failover_test.go diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 6a466720..17ba3399 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -537,4 +537,15 @@ const ( BlobovniczaTreeRebuildingBlobovnicza = "rebuilding blobovnicza..." 
BlobovniczaTreeRebuildingBlobovniczaFailed = "rebuilding blobovnicza failed" BlobovniczaTreeRebuildingBlobovniczaSuccess = "rebuilding blobovnicza completed successfully" + BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza = "could not put move info to source blobovnicza" + BlobovniczatreeCouldNotUpdateStorageID = "could not update storage ID" + BlobovniczatreeCouldNotDropMoveInfo = "could not drop move info from source blobovnicza" + BlobovniczatreeCouldNotDeleteFromSource = "could not delete object from source blobovnicza" + BlobovniczaTreeCompletingPreviousRebuild = "completing previous rebuild if failed..." + BlobovniczaTreeCompletedPreviousRebuildSuccess = "previous rebuild completed successfully" + BlobovniczaTreeCompletedPreviousRebuildFailed = "failed to complete previous rebuild" + BlobovniczatreeCouldNotCheckExistenceInSourceDB = "could not check object existence in source blobovnicza" + BlobovniczatreeCouldNotCheckExistenceInTargetDB = "could not check object existence in target blobovnicza" + BlobovniczatreeCouldNotGetObjectFromSourceDB = "could not get object from source blobovnicza" + BlobovniczatreeCouldNotPutObjectToTargetDB = "could not put object to target blobovnicza" ) diff --git a/pkg/local_object_storage/blobovnicza/control.go b/pkg/local_object_storage/blobovnicza/control.go index ad554a0a..c1d7d49a 100644 --- a/pkg/local_object_storage/blobovnicza/control.go +++ b/pkg/local_object_storage/blobovnicza/control.go @@ -105,7 +105,7 @@ func (b *Blobovnicza) initializeCounters() error { var size uint64 var items uint64 err := b.boltDB.View(func(tx *bbolt.Tx) error { - return b.iterateAllBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) { + return b.iterateAllDataBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) { keysN := uint64(b.Stats().KeyN) size += keysN * upper items += keysN diff --git a/pkg/local_object_storage/blobovnicza/delete.go b/pkg/local_object_storage/blobovnicza/delete.go index 
d256c1c6..0f92ea3f 100644 --- a/pkg/local_object_storage/blobovnicza/delete.go +++ b/pkg/local_object_storage/blobovnicza/delete.go @@ -51,7 +51,7 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err var dataSize uint64 err := b.boltDB.Update(func(tx *bbolt.Tx) error { - return b.iterateAllBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) { + return b.iterateAllDataBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) { objData := buck.Get(addrKey) if objData == nil { // object is not in bucket => continue iterating diff --git a/pkg/local_object_storage/blobovnicza/exists.go b/pkg/local_object_storage/blobovnicza/exists.go index 9b190416..8cafc72a 100644 --- a/pkg/local_object_storage/blobovnicza/exists.go +++ b/pkg/local_object_storage/blobovnicza/exists.go @@ -1,6 +1,7 @@ package blobovnicza import ( + "bytes" "context" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" @@ -24,7 +25,10 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error addrKey := addressKey(addr) err := b.boltDB.View(func(tx *bbolt.Tx) error { - return tx.ForEach(func(_ []byte, buck *bbolt.Bucket) error { + return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error { + if bytes.Equal(bucketName, incompletedMoveBucketName) { + return nil + } exists = buck.Get(addrKey) != nil if exists { return errInterruptForEach diff --git a/pkg/local_object_storage/blobovnicza/get.go b/pkg/local_object_storage/blobovnicza/get.go index 9975e5e8..88d8e7e4 100644 --- a/pkg/local_object_storage/blobovnicza/get.go +++ b/pkg/local_object_storage/blobovnicza/get.go @@ -57,7 +57,11 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) { ) if err := b.boltDB.View(func(tx *bbolt.Tx) error { - return tx.ForEach(func(_ []byte, buck *bbolt.Bucket) error { + return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error { + if bytes.Equal(bucketName, 
incompletedMoveBucketName) { + return nil + } + data = buck.Get(addrKey) if data == nil { return nil diff --git a/pkg/local_object_storage/blobovnicza/iterate.go b/pkg/local_object_storage/blobovnicza/iterate.go index f4b85f5d..e19f8f14 100644 --- a/pkg/local_object_storage/blobovnicza/iterate.go +++ b/pkg/local_object_storage/blobovnicza/iterate.go @@ -1,6 +1,7 @@ package blobovnicza import ( + "bytes" "context" "fmt" "math" @@ -12,11 +13,11 @@ import ( "go.opentelemetry.io/otel/trace" ) -// iterateAllBuckets iterates all buckets in db +// iterateAllDataBuckets iterates all buckets in db // // If the maximum size of the object (b.objSizeLimit) has been changed to lower value, // then there may be more buckets than the current limit of the object size. -func (b *Blobovnicza) iterateAllBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error { +func (b *Blobovnicza) iterateAllDataBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error { return b.iterateBucketKeys(false, func(lower uint64, upper uint64, key []byte) (bool, error) { buck := tx.Bucket(key) if buck == nil { @@ -138,7 +139,10 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes, var elem IterationElement if err := b.boltDB.View(func(tx *bbolt.Tx) error { - return tx.ForEach(func(name []byte, buck *bbolt.Bucket) error { + return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error { + if bytes.Equal(bucketName, incompletedMoveBucketName) { + return nil + } return buck.ForEach(func(k, v []byte) error { select { case <-ctx.Done(): diff --git a/pkg/local_object_storage/blobovnicza/move.go b/pkg/local_object_storage/blobovnicza/move.go new file mode 100644 index 00000000..255198f6 --- /dev/null +++ b/pkg/local_object_storage/blobovnicza/move.go @@ -0,0 +1,108 @@ +package blobovnicza + +import ( + "context" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + oid 
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.etcd.io/bbolt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +var incompletedMoveBucketName = []byte("INCOMPLETED_MOVE") + +type MoveInfo struct { + Address oid.Address + TargetStorageID []byte +} + +func (b *Blobovnicza) PutMoveInfo(ctx context.Context, prm MoveInfo) error { + _, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.PutMoveInfo", + trace.WithAttributes( + attribute.String("path", b.path), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("target_storage_id", string(prm.TargetStorageID)), + )) + defer span.End() + + key := addressKey(prm.Address) + + return b.boltDB.Update(func(tx *bbolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(incompletedMoveBucketName) + if err != nil { + return err + } + + if err := bucket.Put(key, prm.TargetStorageID); err != nil { + return fmt.Errorf("(%T) failed to save move info: %w", b, err) + } + + return nil + }) +} + +func (b *Blobovnicza) DropMoveInfo(ctx context.Context, address oid.Address) error { + _, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.DropMoveInfo", + trace.WithAttributes( + attribute.String("path", b.path), + attribute.String("address", address.EncodeToString()), + )) + defer span.End() + + key := addressKey(address) + + return b.boltDB.Update(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(incompletedMoveBucketName) + if bucket == nil { + return nil + } + + if err := bucket.Delete(key); err != nil { + return fmt.Errorf("(%T) failed to drop move info: %w", b, err) + } + + c := bucket.Cursor() + k, v := c.First() + bucketEmpty := k == nil && v == nil + if bucketEmpty { + return tx.DeleteBucket(incompletedMoveBucketName) + } + + return nil + }) +} + +func (b *Blobovnicza) ListMoveInfo(ctx context.Context) ([]MoveInfo, error) { + _, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.ListMoveInfo", + trace.WithAttributes( + attribute.String("path", 
b.path), + )) + defer span.End() + + var result []MoveInfo + if err := b.boltDB.View(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(incompletedMoveBucketName) + if bucket == nil { + return nil + } + return bucket.ForEach(func(k, v []byte) error { + var addr oid.Address + storageID := make([]byte, len(v)) + if err := addressFromKey(&addr, k); err != nil { + return err + } + copy(storageID, v) + result = append(result, MoveInfo{ + Address: addr, + TargetStorageID: storageID, + }) + return nil + }) + }); err != nil { + return nil, err + } + + return result, nil +} diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go index 0354fa2a..9f295009 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go @@ -56,11 +56,12 @@ import ( type Blobovniczas struct { cfg - commondbManager *dbManager - activeDBManager *activeDBManager - dbCache *dbCache - dbFilesGuard *sync.RWMutex - rebuildGuard *sync.RWMutex + commondbManager *dbManager + activeDBManager *activeDBManager + dbCache *dbCache + deleteProtectedObjects *addressMap + dbFilesGuard *sync.RWMutex + rebuildGuard *sync.RWMutex } var _ common.Storage = (*Blobovniczas)(nil) @@ -87,6 +88,7 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) { blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.readOnly, blz.metrics.Blobovnicza(), blz.log) blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth) blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager) + blz.deleteProtectedObjects = newAddressMap() blz.dbFilesGuard = &sync.RWMutex{} blz.rebuildGuard = &sync.RWMutex{} diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/control.go b/pkg/local_object_storage/blobstor/blobovniczatree/control.go index ad463c11..561b0859 100644 --- 
a/pkg/local_object_storage/blobstor/blobovniczatree/control.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/control.go @@ -57,12 +57,20 @@ func (b *Blobovniczas) initializeDBs(ctx context.Context) error { visited[p] = struct{}{} eg.Go(func() error { shBlz := b.getBlobovniczaWithoutCaching(p) - _, err := shBlz.Open() + blz, err := shBlz.Open() if err != nil { return err } defer shBlz.Close() + moveInfo, err := blz.ListMoveInfo(egCtx) + if err != nil { + return err + } + for _, move := range moveInfo { + b.deleteProtectedObjects.Add(move.Address) + } + b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p)) return nil }) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/delete.go b/pkg/local_object_storage/blobstor/blobovniczatree/delete.go index 56803c87..298de3ad 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/delete.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/delete.go @@ -3,6 +3,7 @@ package blobovniczatree import ( "context" "encoding/hex" + "errors" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -18,6 +19,8 @@ import ( "go.uber.org/zap" ) +var errObjectIsDeleteProtected = errors.New("object is delete protected") + // Delete deletes object from blobovnicza tree. // // If blobocvnicza ID is specified, only this blobovnicza is processed. 
@@ -49,6 +52,10 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co return common.DeleteRes{}, errRebuildInProgress } + if b.deleteProtectedObjects.Contains(prm.Address) { + return common.DeleteRes{}, errObjectIsDeleteProtected + } + var bPrm blobovnicza.DeletePrm bPrm.SetAddress(prm.Address) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go b/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go index 73678795..5115439b 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go @@ -176,16 +176,14 @@ func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, cur // // Uses existed blobovnicza files for iteration. func (b *Blobovniczas) iterateExistingDBPaths(ctx context.Context, f func(string) (bool, error)) error { + b.dbFilesGuard.RLock() + defer b.dbFilesGuard.RUnlock() + _, err := b.iterateExistingDBPathsDFS(ctx, "", f) return err } func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path string, f func(string) (bool, error)) (bool, error) { - if path == "" { - b.dbFilesGuard.RLock() - defer b.dbFilesGuard.RUnlock() - } - sysPath := filepath.Join(b.rootPath, path) entries, err := os.ReadDir(sysPath) if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode @@ -222,16 +220,14 @@ func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path strin } func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error { + b.dbFilesGuard.RLock() + defer b.dbFilesGuard.RUnlock() + _, err := b.iterateSordedDBPathsInternal(ctx, "", addr, f) return err } func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) { - if path == "" { - b.dbFilesGuard.RLock() - defer b.dbFilesGuard.RUnlock() - } - sysPath := 
filepath.Join(b.rootPath, path) entries, err := os.ReadDir(sysPath) if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go index 35e001b6..5f87933b 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go @@ -5,11 +5,13 @@ import ( "errors" "os" "path/filepath" + "sync" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) @@ -24,8 +26,18 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm b.rebuildGuard.Lock() defer b.rebuildGuard.Unlock() - b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild) var res common.RebuildRes + + b.log.Debug(logs.BlobovniczaTreeCompletingPreviousRebuild) + completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage) + res.ObjectsMoved += completedPreviosMoves + if err != nil { + b.log.Warn(logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err)) + return res, err + } + b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess) + + b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild) dbsToMigrate, err := b.getDBsToRebuild(ctx) if err != nil { b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err)) @@ -82,7 +94,7 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M shDB.Close() }() - migratedObjects, err := b.moveObjects(ctx, blz, meta) + migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta) if err != nil { return migratedObjects, err } 
@@ -90,13 +102,13 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M return migratedObjects, err } -func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, meta common.MetaStorage) (uint64, error) { +func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage) (uint64, error) { var result uint64 var prm blobovnicza.IteratePrm prm.DecodeAddresses() prm.SetHandler(func(ie blobovnicza.IterationElement) error { - e := b.moveObject(ctx, ie.Address(), ie.ObjectData(), meta) + e := b.moveObject(ctx, blz, blzPath, ie.Address(), ie.ObjectData(), meta) if e == nil { result++ } @@ -107,15 +119,28 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn return result, err } -func (b *Blobovniczas) moveObject(ctx context.Context, addr oid.Address, data []byte, metaStore common.MetaStorage) error { - var pPrm common.PutPrm - pPrm.Address = addr - pPrm.RawData = data - pRes, err := b.Put(ctx, pPrm) - if err != nil { - return err +func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string, + addr oid.Address, data []byte, metaStore common.MetaStorage) error { + it := &moveIterator{ + B: b, + ID: nil, + AllFull: true, + Address: addr, + ObjectData: data, + MetaStore: metaStore, + Source: source, + SourceSysPath: sourcePath, } - return metaStore.UpdateStorageID(ctx, addr, pRes.StorageID) + + if err := b.iterateDeepest(ctx, addr, func(lvlPath string) (bool, error) { return it.tryMoveToLvl(ctx, lvlPath) }); err != nil { + return err + } else if it.ID == nil { + if it.AllFull { + return common.ErrNoSpace + } + return errPutFailed + } + return nil } func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) { @@ -159,3 +184,222 @@ func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error { } return b.dropDirectoryIfEmpty(filepath.Dir(path)) } + 
+func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) { + var count uint64 + return count, b.iterateExistingDBPaths(ctx, func(s string) (bool, error) { + shDB := b.getBlobovnicza(s) + blz, err := shDB.Open() + if err != nil { + return true, err + } + defer shDB.Close() + + incompletedMoves, err := blz.ListMoveInfo(ctx) + if err != nil { + return true, err + } + + for _, move := range incompletedMoves { + if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil { + return true, err + } + count++ + } + + return false, nil + }) +} + +func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string, + move blobovnicza.MoveInfo, metaStore common.MetaStorage) error { + targetDB := b.getBlobovnicza(NewIDFromBytes(move.TargetStorageID).Path()) + target, err := targetDB.Open() + if err != nil { + return err + } + defer targetDB.Close() + + existsInSource := true + var gPrm blobovnicza.GetPrm + gPrm.SetAddress(move.Address) + gRes, err := source.Get(ctx, gPrm) + if err != nil { + if client.IsErrObjectNotFound(err) { + existsInSource = false + } else { + b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err)) + return err + } + } + + if !existsInSource { //object was deleted by Rebuild, need to delete move info + if err = source.DropMoveInfo(ctx, move.Address); err != nil { + b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err)) + return err + } + b.deleteProtectedObjects.Delete(move.Address) + return nil + } + + existsInTarget, err := target.Exists(ctx, move.Address) + if err != nil { + b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err)) + return err + } + + if !existsInTarget { + var putPrm blobovnicza.PutPrm + putPrm.SetAddress(move.Address) + putPrm.SetMarshaledObject(gRes.Object()) + _, err = target.Put(ctx, putPrm) + if err != nil { + 
b.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToTargetDB, zap.String("path", targetDB.SystemPath()), zap.Error(err)) + return err + } + } + + if err = metaStore.UpdateStorageID(ctx, move.Address, move.TargetStorageID); err != nil { + b.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err)) + return err + } + + var deletePrm blobovnicza.DeletePrm + deletePrm.SetAddress(move.Address) + if _, err = source.Delete(ctx, deletePrm); err != nil { + b.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", sourcePath), zap.Error(err)) + return err + } + + if err = source.DropMoveInfo(ctx, move.Address); err != nil { + b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err)) + return err + } + + b.deleteProtectedObjects.Delete(move.Address) + return nil +} + +type moveIterator struct { + B *Blobovniczas + ID *ID + AllFull bool + Address oid.Address + ObjectData []byte + MetaStore common.MetaStorage + Source *blobovnicza.Blobovnicza + SourceSysPath string +} + +func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool, error) { + target, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(lvlPath) + if err != nil { + if !isLogical(err) { + i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err) + } else { + i.B.log.Warn(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, zap.Error(err)) + } + return false, nil + } + + if target == nil { + i.B.log.Warn(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath)) + return false, nil + } + defer target.Close() + + i.AllFull = false + + targetIDx := u64FromHexString(filepath.Base(target.SystemPath())) + targetStorageID := NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(targetIDx)))) + + if err = i.Source.PutMoveInfo(ctx, blobovnicza.MoveInfo{ + Address: i.Address, + TargetStorageID: targetStorageID.Bytes(), + }); err != nil { + if !isLogical(err) { + 
i.B.reportError(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, err) + } else { + i.B.log.Warn(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, zap.String("path", i.SourceSysPath), zap.Error(err)) + } + return true, nil + } + i.B.deleteProtectedObjects.Add(i.Address) + + var putPrm blobovnicza.PutPrm + putPrm.SetAddress(i.Address) + putPrm.SetMarshaledObject(i.ObjectData) + + _, err = target.Blobovnicza().Put(ctx, putPrm) + if err != nil { + if !isLogical(err) { + i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err) + } else { + i.B.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, zap.String("path", target.SystemPath()), zap.Error(err)) + } + return true, nil + } + + if err = i.MetaStore.UpdateStorageID(ctx, i.Address, targetStorageID.Bytes()); err != nil { + i.B.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err)) + return true, nil + } + + var deletePrm blobovnicza.DeletePrm + deletePrm.SetAddress(i.Address) + if _, err = i.Source.Delete(ctx, deletePrm); err != nil { + if !isLogical(err) { + i.B.reportError(logs.BlobovniczatreeCouldNotDeleteFromSource, err) + } else { + i.B.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", i.SourceSysPath), zap.Error(err)) + } + return true, nil + } + + if err = i.Source.DropMoveInfo(ctx, i.Address); err != nil { + if !isLogical(err) { + i.B.reportError(logs.BlobovniczatreeCouldNotDropMoveInfo, err) + } else { + i.B.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", i.SourceSysPath), zap.Error(err)) + } + return true, nil + } + i.B.deleteProtectedObjects.Delete(i.Address) + + i.ID = targetStorageID + return true, nil +} + +type addressMap struct { + data map[oid.Address]struct{} + guard *sync.RWMutex +} + +func newAddressMap() *addressMap { + return &addressMap{ + data: make(map[oid.Address]struct{}), + guard: &sync.RWMutex{}, + } +} + +func (m *addressMap) Add(address oid.Address) { + m.guard.Lock() + 
defer m.guard.Unlock() + + m.data[address] = struct{}{} +} + +func (m *addressMap) Delete(address oid.Address) { + m.guard.Lock() + defer m.guard.Unlock() + + delete(m.data, address) +} + +func (m *addressMap) Contains(address oid.Address) bool { + m.guard.RLock() + defer m.guard.RUnlock() + + _, contains := m.data[address] + return contains +} diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_failover_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_failover_test.go new file mode 100644 index 00000000..c9ce564c --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_failover_test.go @@ -0,0 +1,192 @@ +package blobovniczatree + +import ( + "bytes" + "context" + "path/filepath" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/stretchr/testify/require" +) + +func TestRebuildFailover(t *testing.T) { + t.Parallel() + + t.Run("only move info saved", testRebuildFailoverOnlyMoveInfoSaved) + + t.Run("object saved to target", testRebuildFailoverObjectSavedToTarget) + + t.Run("object deleted from source", testRebuildFailoverObjectDeletedFromSource) +} + +func testRebuildFailoverOnlyMoveInfoSaved(t *testing.T) { + t.Parallel() + dir := t.TempDir() + + blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db"))) + require.NoError(t, blz.Open()) + require.NoError(t, blz.Init()) + + obj := blobstortest.NewObject(1024) + data, err := 
obj.Marshal() + require.NoError(t, err) + + var pPrm blobovnicza.PutPrm + pPrm.SetAddress(object.AddressOf(obj)) + pPrm.SetMarshaledObject(data) + _, err = blz.Put(context.Background(), pPrm) + require.NoError(t, err) + + require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{ + Address: object.AddressOf(obj), + TargetStorageID: []byte("0/0/0"), + })) + + require.NoError(t, blz.Close()) + + testRebuildFailoverValidate(t, dir, obj, true) +} + +func testRebuildFailoverObjectSavedToTarget(t *testing.T) { + t.Parallel() + dir := t.TempDir() + + blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db"))) + require.NoError(t, blz.Open()) + require.NoError(t, blz.Init()) + + obj := blobstortest.NewObject(1024) + data, err := obj.Marshal() + require.NoError(t, err) + + var pPrm blobovnicza.PutPrm + pPrm.SetAddress(object.AddressOf(obj)) + pPrm.SetMarshaledObject(data) + _, err = blz.Put(context.Background(), pPrm) + require.NoError(t, err) + + require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{ + Address: object.AddressOf(obj), + TargetStorageID: []byte("0/0/0"), + })) + + require.NoError(t, blz.Close()) + + blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db"))) + require.NoError(t, blz.Open()) + require.NoError(t, blz.Init()) + + _, err = blz.Put(context.Background(), pPrm) + require.NoError(t, err) + + require.NoError(t, blz.Close()) + + testRebuildFailoverValidate(t, dir, obj, true) +} + +func testRebuildFailoverObjectDeletedFromSource(t *testing.T) { + t.Parallel() + dir := t.TempDir() + + blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db"))) + require.NoError(t, blz.Open()) + require.NoError(t, blz.Init()) + + obj := blobstortest.NewObject(1024) + data, err := obj.Marshal() + require.NoError(t, err) + + require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{ + Address: object.AddressOf(obj), + TargetStorageID: 
[]byte("0/0/0"), + })) + + require.NoError(t, blz.Close()) + + blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db"))) + require.NoError(t, blz.Open()) + require.NoError(t, blz.Init()) + + var pPrm blobovnicza.PutPrm + pPrm.SetAddress(object.AddressOf(obj)) + pPrm.SetMarshaledObject(data) + _, err = blz.Put(context.Background(), pPrm) + require.NoError(t, err) + + require.NoError(t, blz.Close()) + + testRebuildFailoverValidate(t, dir, obj, false) +} + +func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object, mustUpdateStorageID bool) { + b := NewBlobovniczaTree( + WithLogger(test.NewLogger(t, true)), + WithObjectSizeLimit(2048), + WithBlobovniczaShallowWidth(2), + WithBlobovniczaShallowDepth(2), + WithRootPath(dir), + WithBlobovniczaSize(100*1024*1024), + WithWaitBeforeDropDB(0), + WithOpenedCacheSize(1000)) + require.NoError(t, b.Open(false)) + require.NoError(t, b.Init()) + + var dPrm common.DeletePrm + dPrm.Address = object.AddressOf(obj) + dPrm.StorageID = []byte("0/0/1") + _, err := b.Delete(context.Background(), dPrm) + require.ErrorIs(t, err, errObjectIsDeleteProtected) + + metaStub := &storageIDUpdateStub{ + storageIDs: make(map[oid.Address][]byte), + } + rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ + MetaStorage: metaStub, + }) + require.NoError(t, err) + require.Equal(t, uint64(1), rRes.ObjectsMoved) + require.Equal(t, uint64(0), rRes.FilesRemoved) + + require.NoError(t, b.Close()) + + blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db"))) + require.NoError(t, blz.Open()) + require.NoError(t, blz.Init()) + + moveInfo, err := blz.ListMoveInfo(context.Background()) + require.NoError(t, err) + require.Equal(t, 0, len(moveInfo)) + + var gPrm blobovnicza.GetPrm + gPrm.SetAddress(object.AddressOf(obj)) + _, err = blz.Get(context.Background(), gPrm) + require.True(t, client.IsErrObjectNotFound(err)) + + require.NoError(t, blz.Close()) + + blz = 
blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db"))) + require.NoError(t, blz.Open()) + require.NoError(t, blz.Init()) + + moveInfo, err = blz.ListMoveInfo(context.Background()) + require.NoError(t, err) + require.Equal(t, 0, len(moveInfo)) + + gRes, err := blz.Get(context.Background(), gPrm) + require.NoError(t, err) + require.True(t, len(gRes.Object()) > 0) + + if mustUpdateStorageID { + require.True(t, bytes.Equal([]byte("0/0/0"), metaStub.storageIDs[object.AddressOf(obj)])) + } + + require.NoError(t, blz.Close()) +}