[#1412] metabase: Drop empty user attribute buckets on upgrade
All checks were successful
DCO action / DCO (pull_request) Successful in 1m12s
Tests and linters / Run gofumpt (pull_request) Successful in 1m45s
Vulncheck / Vulncheck (pull_request) Successful in 2m9s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m26s
Build / Build Components (pull_request) Successful in 2m35s
Tests and linters / gopls check (pull_request) Successful in 2m58s
Tests and linters / Staticcheck (pull_request) Successful in 3m20s
Tests and linters / Lint (pull_request) Successful in 3m26s
Tests and linters / Tests (pull_request) Successful in 4m45s
Tests and linters / Tests with -race (pull_request) Successful in 5m43s

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-10-04 10:49:39 +03:00
parent 653b3604e1
commit eac1f93c97

View file

@ -360,26 +360,40 @@ func dropUserAttributes(ctx context.Context, db *bbolt.DB, cs container.InfoProv
return nil return nil
} }
last = keys[len(keys)-1] last = keys[len(keys)-1]
keysToDrop, err := selectUserAttributeKeysToDrop(keys, cs) cnt, err := dropNonIndexableUserAttributeBuckets(db, cs, keys)
if err != nil { if err != nil {
log("deleting user attribute buckets completed with an error:", err)
return err return err
} }
if err := db.Batch(func(tx *bbolt.Tx) error { count += cnt
for _, k := range keysToDrop { cnt, err = dropEmptyUserAttributeBuckets(ctx, db, keys)
if err := tx.DeleteBucket(k); err != nil { if err != nil {
return err log("deleting user attribute buckets completed with an error:", err)
}
}
return nil
}); err != nil {
log("deleting buckets completed with an error:", err)
return err return err
} }
count += uint64(len(keysToDrop)) count += cnt
log("deleted", count, "buckets") log("deleted", count, "user attribute buckets")
} }
} }
func dropNonIndexableUserAttributeBuckets(db *bbolt.DB, cs container.InfoProvider, keys [][]byte) (uint64, error) {
keysToDrop, err := selectUserAttributeKeysToDrop(keys, cs)
if err != nil {
return 0, fmt.Errorf("select non indexable user attributes: %w", err)
}
if err := db.Batch(func(tx *bbolt.Tx) error {
for _, k := range keysToDrop {
if err := tx.DeleteBucket(k); err != nil {
return err
}
}
return nil
}); err != nil {
return 0, fmt.Errorf("drop non indexable user attributes: %w", err)
}
return uint64(len(keysToDrop)), nil
}
func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([][]byte, error) { func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([][]byte, error) {
var keysToDrop [][]byte var keysToDrop [][]byte
for _, key := range keys { for _, key := range keys {
@ -406,6 +420,134 @@ func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([]
return keysToDrop, nil return keysToDrop, nil
} }
func dropEmptyUserAttributeBuckets(ctx context.Context, db *bbolt.DB, keys [][]byte) (uint64, error) {
var dropBuckets [][]byte
for _, key := range keys {
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}
if err := dropEmptyNestedBuckets(ctx, db, key); err != nil {
return 0, err
}
empty, exists, err := bucketIsEmpty(db, key)
if err != nil {
return 0, err
}
if empty && exists {
dropBuckets = append(dropBuckets, key)
}
}
if len(dropBuckets) == 0 {
return 0, nil
}
if err := db.Batch(func(tx *bbolt.Tx) error {
for _, key := range dropBuckets {
if err := tx.DeleteBucket(key); err != nil {
return err
}
}
return nil
}); err != nil {
return 0, fmt.Errorf("drop empty user attributes buckets: %w", err)
}
return uint64(len(dropBuckets)), nil
}
func bucketIsEmpty(db *bbolt.DB, bucketKey []byte) (bool, bool, error) {
var empty bool
var exists bool
if err := db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(bucketKey)
if b == nil {
return nil
}
exists = true
empty = !hasAnyItem(b)
return nil
}); err != nil {
return false, false, fmt.Errorf("bucket empty check: %w", err)
}
return empty, exists, nil
}
func dropEmptyNestedBuckets(ctx context.Context, db *bbolt.DB, rootBucketKey []byte) error {
var last []byte
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
var dropBuckets [][]byte
var err error
dropBuckets, last, err = selectEmptyNestedBuckets(ctx, db, rootBucketKey, last)
if err != nil {
return fmt.Errorf("select empty nested buckets: %w", err)
}
if len(dropBuckets) == 0 {
return nil
}
if err := db.Batch(func(tx *bbolt.Tx) error {
rootBucket := tx.Bucket(rootBucketKey)
if rootBucket == nil {
return nil
}
for _, sb := range dropBuckets {
if err := rootBucket.DeleteBucket(sb); err != nil {
return err
}
}
return nil
}); err != nil {
return fmt.Errorf("drop empty nested buckets: %w", err)
}
}
}
func selectEmptyNestedBuckets(ctx context.Context, db *bbolt.DB, rootBucketKey, last []byte) ([][]byte, []byte, error) {
const batchSize = 1000
var result [][]byte
if err := db.View(func(tx *bbolt.Tx) error {
rootBucket := tx.Bucket(rootBucketKey)
if rootBucket == nil {
return nil
}
c := rootBucket.Cursor()
for k, v := c.Seek(last); k != nil && len(result) < batchSize; k, v = c.Next() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if bytes.Equal(last, k) {
continue
}
last = bytes.Clone(k)
if v != nil { // record
continue
}
nestedBucket := rootBucket.Bucket(k)
if nestedBucket == nil {
continue
}
if !hasAnyItem(nestedBucket) {
result = append(result, bytes.Clone(k))
}
}
return nil
}); err != nil {
return nil, nil, err
}
return result, last, nil
}
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error { func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}, func(a ...any) { return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}, func(a ...any) {
log(append([]any{"owner ID index:"}, a...)...) log(append([]any{"owner ID index:"}, a...)...)