[#1412] metabase: Drop empty user attribute buckets on upgrade
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 32s
DCO action / DCO (pull_request) Successful in 1m11s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m8s
Vulncheck / Vulncheck (pull_request) Successful in 2m6s
Build / Build Components (pull_request) Successful in 2m22s
Tests and linters / gopls check (pull_request) Successful in 2m37s
Tests and linters / Staticcheck (pull_request) Successful in 3m12s
Tests and linters / Lint (pull_request) Successful in 3m33s
Tests and linters / Tests (pull_request) Successful in 4m14s
Tests and linters / Tests with -race (pull_request) Successful in 5m57s

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-10-04 10:49:39 +03:00
parent 856d6d2500
commit 91a9480733

View file

@ -360,9 +360,26 @@ func dropUserAttributes(ctx context.Context, db *bbolt.DB, cs container.InfoProv
return nil return nil
} }
last = keys[len(keys)-1] last = keys[len(keys)-1]
cnt, err := dropNonIndexableUserAttributeBuckets(db, cs, keys)
if err != nil {
log("deleting user attribute buckets completed with an error:", err)
return err
}
count += cnt
cnt, err = dropEmptyUserAttributeBuckets(ctx, db, keys)
if err != nil {
log("deleting user attribute buckets completed with an error:", err)
return err
}
count += cnt
log("deleted", count, "user attribute buckets")
}
}
func dropNonIndexableUserAttributeBuckets(db *bbolt.DB, cs container.InfoProvider, keys [][]byte) (uint64, error) {
keysToDrop, err := selectUserAttributeKeysToDrop(keys, cs) keysToDrop, err := selectUserAttributeKeysToDrop(keys, cs)
if err != nil { if err != nil {
return err return 0, fmt.Errorf("select non indexable user attributes: %w", err)
} }
if err := db.Batch(func(tx *bbolt.Tx) error { if err := db.Batch(func(tx *bbolt.Tx) error {
for _, k := range keysToDrop { for _, k := range keysToDrop {
@ -372,12 +389,9 @@ func dropUserAttributes(ctx context.Context, db *bbolt.DB, cs container.InfoProv
} }
return nil return nil
}); err != nil { }); err != nil {
log("deleting buckets completed with an error:", err) return 0, fmt.Errorf("drop non indexable user attributes: %w", err)
return err
}
count += uint64(len(keysToDrop))
log("deleted", count, "buckets")
} }
return uint64(len(keysToDrop)), nil
} }
func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([][]byte, error) { func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([][]byte, error) {
@ -406,6 +420,134 @@ func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([]
return keysToDrop, nil return keysToDrop, nil
} }
func dropEmptyUserAttributeBuckets(ctx context.Context, db *bbolt.DB, keys [][]byte) (uint64, error) {
var dropBuckets [][]byte
for _, key := range keys {
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}
if err := dropEmptyNestedBuckets(ctx, db, key); err != nil {
return 0, err
}
empty, exists, err := bucketIsEmpty(db, key)
if err != nil {
return 0, err
}
if empty && exists {
dropBuckets = append(dropBuckets, key)
}
}
if len(dropBuckets) == 0 {
return 0, nil
}
if err := db.Batch(func(tx *bbolt.Tx) error {
for _, key := range dropBuckets {
if err := tx.DeleteBucket(key); err != nil {
return err
}
}
return nil
}); err != nil {
return 0, fmt.Errorf("drop empty user attributes buckets: %w", err)
}
return uint64(len(dropBuckets)), nil
}
func bucketIsEmpty(db *bbolt.DB, bucketKey []byte) (bool, bool, error) {
var empty bool
var exists bool
if err := db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(bucketKey)
if b == nil {
return nil
}
exists = true
empty = !hasAnyItem(b)
return nil
}); err != nil {
return false, false, fmt.Errorf("bucket empty check: %w", err)
}
return empty, exists, nil
}
func dropEmptyNestedBuckets(ctx context.Context, db *bbolt.DB, rootBucketKey []byte) error {
var last []byte
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
var dropBuckets [][]byte
var err error
dropBuckets, last, err = selectEmptyNestedBuckets(ctx, db, rootBucketKey, last)
if err != nil {
return fmt.Errorf("select empty nested buckets: %w", err)
}
if len(dropBuckets) == 0 {
return nil
}
if err := db.Batch(func(tx *bbolt.Tx) error {
rootBucket := tx.Bucket(rootBucketKey)
if rootBucket == nil {
return nil
}
for _, sb := range dropBuckets {
if err := rootBucket.DeleteBucket(sb); err != nil {
return err
}
}
return nil
}); err != nil {
return fmt.Errorf("drop empty nested buckets: %w", err)
}
}
}
func selectEmptyNestedBuckets(ctx context.Context, db *bbolt.DB, rootBucketKey, last []byte) ([][]byte, []byte, error) {
const batchSize = 1000
var result [][]byte
if err := db.View(func(tx *bbolt.Tx) error {
rootBucket := tx.Bucket(rootBucketKey)
if rootBucket == nil {
return nil
}
c := rootBucket.Cursor()
for k, v := c.Seek(last); k != nil && len(result) < batchSize; k, v = c.Next() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if bytes.Equal(last, k) {
continue
}
last = bytes.Clone(k)
if v != nil { // record
continue
}
nestedBucket := rootBucket.Bucket(k)
if nestedBucket == nil {
continue
}
if !hasAnyItem(nestedBucket) {
result = append(result, bytes.Clone(k))
}
}
return nil
}); err != nil {
return nil, nil, err
}
return result, last, nil
}
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error { func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}, func(a ...any) { return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}, func(a ...any) {
log(append([]any{"owner ID index:"}, a...)...) log(append([]any{"owner ID index:"}, a...)...)