[#1412] metabase: Drop empty user attribute buckets on upgrade

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-10-04 10:49:39 +03:00
parent 87f4b934d1
commit fe9f664b57
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0

View file

@ -360,26 +360,40 @@ func dropUserAttributes(ctx context.Context, db *bbolt.DB, cs container.InfoProv
return nil
}
last = keys[len(keys)-1]
keysToDrop, err := selectUserAttributeKeysToDrop(keys, cs)
cnt, err := dropNonIndexedUserAttributeBuckets(db, cs, keys)
if err != nil {
log("deleting user attribute buckets completed with an error:", err)
return err
}
if err := db.Batch(func(tx *bbolt.Tx) error {
for _, k := range keysToDrop {
if err := tx.DeleteBucket(k); err != nil {
return err
}
}
return nil
}); err != nil {
log("deleting buckets completed with an error:", err)
count += cnt
cnt, err = dropEmptyUserAttributeBuckets(ctx, db, keys)
if err != nil {
log("deleting user attribute buckets completed with an error:", err)
return err
}
count += uint64(len(keysToDrop))
log("deleted", count, "buckets")
count += cnt
log("deleted", count, "user attribute buckets")
}
}
// dropNonIndexedUserAttributeBuckets deletes the top-level buckets from keys
// that selectUserAttributeKeysToDrop picks for removal. Judging by the helper's
// name and the error text these are buckets of user attributes that are not
// indexed; the actual selection rule lives in that helper.
//
// All selected buckets are removed inside a single db.Batch call. The returned
// value is the number of buckets deleted; it is 0 whenever an error is returned.
func dropNonIndexedUserAttributeBuckets(db *bbolt.DB, cs container.InfoProvider, keys [][]byte) (uint64, error) {
	keysToDrop, err := selectUserAttributeKeysToDrop(keys, cs)
	if err != nil {
		return 0, fmt.Errorf("select non indexed user attributes: %w", err)
	}
	// Nothing was selected: do not open a write transaction at all, the same
	// way dropEmptyUserAttributeBuckets returns early on an empty drop list.
	if len(keysToDrop) == 0 {
		return 0, nil
	}
	if err := db.Batch(func(tx *bbolt.Tx) error {
		for _, k := range keysToDrop {
			if err := tx.DeleteBucket(k); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		return 0, fmt.Errorf("drop non indexed user attributes: %w", err)
	}
	return uint64(len(keysToDrop)), nil
}
func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([][]byte, error) {
var keysToDrop [][]byte
for _, key := range keys {
@ -406,6 +420,134 @@ func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([]
return keysToDrop, nil
}
// dropEmptyUserAttributeBuckets removes the top-level buckets from keys that
// turn out to be empty.
//
// For every key it first calls dropEmptyNestedBuckets to clean up empty
// sub-buckets, then asks bucketIsEmpty whether the bucket still exists and has
// no items left. All buckets that qualify are deleted together in one db.Batch
// call. The context is checked before each key is processed.
//
// It returns the number of top-level buckets deleted, or 0 and an error.
func dropEmptyUserAttributeBuckets(ctx context.Context, db *bbolt.DB, keys [][]byte) (uint64, error) {
	var emptyKeys [][]byte
	for i := range keys {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return 0, ctxErr
		}
		bucketKey := keys[i]
		if err := dropEmptyNestedBuckets(ctx, db, bucketKey); err != nil {
			return 0, err
		}
		isEmpty, found, err := bucketIsEmpty(db, bucketKey)
		switch {
		case err != nil:
			return 0, err
		case found && isEmpty:
			emptyKeys = append(emptyKeys, bucketKey)
		}
	}
	if len(emptyKeys) == 0 {
		// Nothing to delete, so no write transaction is needed.
		return 0, nil
	}

	removeAll := func(tx *bbolt.Tx) error {
		for _, bucketKey := range emptyKeys {
			if err := tx.DeleteBucket(bucketKey); err != nil {
				return err
			}
		}
		return nil
	}
	if err := db.Batch(removeAll); err != nil {
		return 0, fmt.Errorf("drop empty user attributes buckets: %w", err)
	}
	return uint64(len(emptyKeys)), nil
}
// bucketIsEmpty inspects the top-level bucket named bucketKey inside one
// read-only transaction.
//
// The results are, in order: whether the bucket is empty (hasAnyItem reported
// false for it), whether the bucket exists, and an error. When the bucket does
// not exist both booleans are false. On error both booleans are false as well.
func bucketIsEmpty(db *bbolt.DB, bucketKey []byte) (bool, bool, error) {
	var isEmpty, found bool
	err := db.View(func(tx *bbolt.Tx) error {
		if bkt := tx.Bucket(bucketKey); bkt != nil {
			found = true
			isEmpty = !hasAnyItem(bkt)
		}
		return nil
	})
	if err != nil {
		return false, false, fmt.Errorf("bucket empty check: %w", err)
	}
	return isEmpty, found, nil
}
// dropEmptyNestedBuckets deletes the empty sub-buckets of the top-level bucket
// rootBucketKey, working in rounds.
//
// Each round asks selectEmptyNestedBuckets for the next portion of empty
// sub-bucket names (resuming from the key where the previous round stopped)
// and deletes them in one db.Batch call. The loop ends when a round selects
// nothing. If the root bucket is missing at deletion time the round is a
// no-op. The context is checked at the start of every round.
func dropEmptyNestedBuckets(ctx context.Context, db *bbolt.DB, rootBucketKey []byte) error {
	var resumeKey []byte
	for {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		emptySubs, next, err := selectEmptyNestedBuckets(ctx, db, rootBucketKey, resumeKey)
		if err != nil {
			return fmt.Errorf("select empty nested buckets: %w", err)
		}
		resumeKey = next
		if len(emptySubs) == 0 {
			return nil
		}

		err = db.Batch(func(tx *bbolt.Tx) error {
			root := tx.Bucket(rootBucketKey)
			if root == nil {
				return nil
			}
			for _, name := range emptySubs {
				if delErr := root.DeleteBucket(name); delErr != nil {
					return delErr
				}
			}
			return nil
		})
		if err != nil {
			return fmt.Errorf("drop empty nested buckets: %w", err)
		}
	}
}
// selectEmptyNestedBuckets scans the entries of the top-level bucket
// rootBucketKey inside one read-only transaction, starting from key last, and
// collects the names of nested buckets for which hasAnyItem reports false.
//
// The entry whose key equals last is skipped, plain key/value records are
// ignored, and the scan stops once batchSize names are collected or the keys
// run out. The context is checked on every visited entry.
//
// It returns the collected names and the last key visited, which the caller
// can pass back as last to continue the scan. If the root bucket does not
// exist the result is empty and last is returned as given. On error both
// returned slices are nil.
func selectEmptyNestedBuckets(ctx context.Context, db *bbolt.DB, rootBucketKey, last []byte) ([][]byte, []byte, error) {
	const batchSize = 1000

	var emptyNames [][]byte
	resumeKey := last
	scan := func(tx *bbolt.Tx) error {
		root := tx.Bucket(rootBucketKey)
		if root == nil {
			return nil
		}
		cur := root.Cursor()
		for k, v := cur.Seek(resumeKey); k != nil; k, v = cur.Next() {
			if len(emptyNames) >= batchSize {
				break
			}
			if err := ctx.Err(); err != nil {
				return err
			}
			if bytes.Equal(resumeKey, k) {
				// Already handled by the previous call.
				continue
			}
			// Cloned because the key is handed back to the caller after
			// this transaction has finished.
			resumeKey = bytes.Clone(k)
			if v != nil {
				// A plain record, not a nested bucket.
				continue
			}
			if nested := root.Bucket(k); nested != nil && !hasAnyItem(nested) {
				emptyNames = append(emptyNames, bytes.Clone(k))
			}
		}
		return nil
	}
	if err := db.View(scan); err != nil {
		return nil, nil, err
	}
	return emptyNames, resumeKey, nil
}
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}, func(a ...any) {
log(append([]any{"owner ID index:"}, a...)...)