diff --git a/pkg/local_object_storage/metabase/upgrade.go b/pkg/local_object_storage/metabase/upgrade.go index a997b90a0..1f2c7956b 100644 --- a/pkg/local_object_storage/metabase/upgrade.go +++ b/pkg/local_object_storage/metabase/upgrade.go @@ -360,26 +360,40 @@ func dropUserAttributes(ctx context.Context, db *bbolt.DB, cs container.InfoProv return nil } last = keys[len(keys)-1] - keysToDrop, err := selectUserAttributeKeysToDrop(keys, cs) + cnt, err := dropNonIndexedUserAttributeBuckets(db, cs, keys) if err != nil { + log("deleting user attribute buckets completed with an error:", err) return err } - if err := db.Batch(func(tx *bbolt.Tx) error { - for _, k := range keysToDrop { - if err := tx.DeleteBucket(k); err != nil { - return err - } - } - return nil - }); err != nil { - log("deleting buckets completed with an error:", err) + count += cnt + cnt, err = dropEmptyUserAttributeBuckets(ctx, db, keys) + if err != nil { + log("deleting user attribute buckets completed with an error:", err) return err } - count += uint64(len(keysToDrop)) - log("deleted", count, "buckets") + count += cnt + log("deleted", count, "user attribute buckets") } } +func dropNonIndexedUserAttributeBuckets(db *bbolt.DB, cs container.InfoProvider, keys [][]byte) (uint64, error) { + keysToDrop, err := selectUserAttributeKeysToDrop(keys, cs) + if err != nil { + return 0, fmt.Errorf("select non indexed user attributes: %w", err) + } + if err := db.Batch(func(tx *bbolt.Tx) error { + for _, k := range keysToDrop { + if err := tx.DeleteBucket(k); err != nil { + return err + } + } + return nil + }); err != nil { + return 0, fmt.Errorf("drop non indexed user attributes: %w", err) + } + return uint64(len(keysToDrop)), nil +} + func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([][]byte, error) { var keysToDrop [][]byte for _, key := range keys { @@ -406,6 +420,134 @@ func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([] return keysToDrop, nil } +func dropEmptyUserAttributeBuckets(ctx context.Context, db *bbolt.DB, keys [][]byte) (uint64, error) { + var dropBuckets [][]byte + for _, key := range keys { + select { + case <-ctx.Done(): + return 0, ctx.Err() + default: + } + + if err := dropEmptyNestedBuckets(ctx, db, key); err != nil { + return 0, err + } + + empty, exists, err := bucketIsEmpty(db, key) + if err != nil { + return 0, err + } + if empty && exists { + dropBuckets = append(dropBuckets, key) + } + } + if len(dropBuckets) == 0 { + return 0, nil + } + if err := db.Batch(func(tx *bbolt.Tx) error { + for _, key := range dropBuckets { + if err := tx.DeleteBucket(key); err != nil { + return err + } + } + return nil + }); err != nil { + return 0, fmt.Errorf("drop empty user attributes buckets: %w", err) + } + return uint64(len(dropBuckets)), nil +} + +func bucketIsEmpty(db *bbolt.DB, bucketKey []byte) (bool, bool, error) { + var empty bool + var exists bool + if err := db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(bucketKey) + if b == nil { + return nil + } + exists = true + empty = !hasAnyItem(b) + return nil + }); err != nil { + return false, false, fmt.Errorf("bucket empty check: %w", err) + } + return empty, exists, nil +} + +func dropEmptyNestedBuckets(ctx context.Context, db *bbolt.DB, rootBucketKey []byte) error { + var last []byte + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + var dropBuckets [][]byte + var err error + dropBuckets, last, err = selectEmptyNestedBuckets(ctx, db, rootBucketKey, last) + if err != nil { + return fmt.Errorf("select empty nested buckets: %w", err) + } + if len(dropBuckets) == 0 { + return nil + } + + if err := db.Batch(func(tx *bbolt.Tx) error { + rootBucket := tx.Bucket(rootBucketKey) + if rootBucket == nil { + return nil + } + for _, sb := range dropBuckets { + if err := rootBucket.DeleteBucket(sb); err != nil { + return err + } + } + return nil + }); err != nil { + return fmt.Errorf("drop empty nested buckets: %w", err) + } + } +} + +func selectEmptyNestedBuckets(ctx context.Context, db *bbolt.DB, rootBucketKey, last []byte) ([][]byte, []byte, error) { + const batchSize = 1000 + var result [][]byte + if err := db.View(func(tx *bbolt.Tx) error { + rootBucket := tx.Bucket(rootBucketKey) + if rootBucket == nil { + return nil + } + c := rootBucket.Cursor() + for k, v := c.Seek(last); k != nil && len(result) < batchSize; k, v = c.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if bytes.Equal(last, k) { + continue + } + last = bytes.Clone(k) + if v != nil { // record + continue + } + nestedBucket := rootBucket.Bucket(k) + if nestedBucket == nil { + continue + } + if !hasAnyItem(nestedBucket) { + result = append(result, bytes.Clone(k)) + } + } + return nil + }); err != nil { + return nil, nil, err + } + return result, last, nil +} + func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error { return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}, func(a ...any) { log(append([]any{"owner ID index:"}, a...)...)