[#1412] metabase: Drop empty user attribute buckets on upgrade
Some checks failed
DCO action / DCO (pull_request) Successful in 1m7s
Tests and linters / Run gofumpt (pull_request) Successful in 1m25s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m10s
Vulncheck / Vulncheck (pull_request) Successful in 2m6s
Tests and linters / Lint (pull_request) Failing after 2m15s
Build / Build Components (pull_request) Successful in 2m34s
Tests and linters / gopls check (pull_request) Successful in 2m43s
Tests and linters / Staticcheck (pull_request) Successful in 3m5s
Tests and linters / Tests (pull_request) Successful in 4m20s
Tests and linters / Tests with -race (pull_request) Successful in 5m58s
Some checks failed
DCO action / DCO (pull_request) Successful in 1m7s
Tests and linters / Run gofumpt (pull_request) Successful in 1m25s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m10s
Vulncheck / Vulncheck (pull_request) Successful in 2m6s
Tests and linters / Lint (pull_request) Failing after 2m15s
Build / Build Components (pull_request) Successful in 2m34s
Tests and linters / gopls check (pull_request) Successful in 2m43s
Tests and linters / Staticcheck (pull_request) Successful in 3m5s
Tests and linters / Tests (pull_request) Successful in 4m20s
Tests and linters / Tests with -race (pull_request) Successful in 5m58s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
1cde3509e2
commit
6b5b44dc98
1 changed files with 154 additions and 12 deletions
|
@ -360,26 +360,40 @@ func dropUserAttributes(ctx context.Context, db *bbolt.DB, cs container.InfoProv
|
|||
return nil
|
||||
}
|
||||
last = keys[len(keys)-1]
|
||||
keysToDrop, err := selectUserAttributeKeysToDrop(keys, cs)
|
||||
cnt, err := dropNonIndexableUserAttributeBuckets(db, cs, keys)
|
||||
if err != nil {
|
||||
log("deleting user attribute buckets completed with an error:", err)
|
||||
return err
|
||||
}
|
||||
if err := db.Batch(func(tx *bbolt.Tx) error {
|
||||
for _, k := range keysToDrop {
|
||||
if err := tx.DeleteBucket(k); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
log("deleting buckets completed with an error:", err)
|
||||
count += cnt
|
||||
cnt, err = dropEmptyUserAttributeBuckets(ctx, db, keys)
|
||||
if err != nil {
|
||||
log("deleting user attribute buckets completed with an error:", err)
|
||||
return err
|
||||
}
|
||||
count += uint64(len(keysToDrop))
|
||||
log("deleted", count, "buckets")
|
||||
count += cnt
|
||||
log("deleted", count, "user attribute buckets")
|
||||
}
|
||||
}
|
||||
|
||||
func dropNonIndexableUserAttributeBuckets(db *bbolt.DB, cs container.InfoProvider, keys [][]byte) (uint64, error) {
|
||||
keysToDrop, err := selectUserAttributeKeysToDrop(keys, cs)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("select non indexable user attributes: %w", err)
|
||||
}
|
||||
if err := db.Batch(func(tx *bbolt.Tx) error {
|
||||
for _, k := range keysToDrop {
|
||||
if err := tx.DeleteBucket(k); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return 0, fmt.Errorf("drop non indexable user attributes: %w", err)
|
||||
}
|
||||
return uint64(len(keysToDrop)), nil
|
||||
}
|
||||
|
||||
func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([][]byte, error) {
|
||||
var keysToDrop [][]byte
|
||||
for _, key := range keys {
|
||||
|
@ -406,6 +420,134 @@ func selectUserAttributeKeysToDrop(keys [][]byte, cs container.InfoProvider) ([]
|
|||
return keysToDrop, nil
|
||||
}
|
||||
|
||||
func dropEmptyUserAttributeBuckets(ctx context.Context, db *bbolt.DB, keys [][]byte) (uint64, error) {
|
||||
var dropBuckets [][]byte
|
||||
for _, key := range keys {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err := dropEmptyNestedBuckets(ctx, db, key); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
empty, exists, err := bucketIsEmpty(db, key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if empty && exists {
|
||||
dropBuckets = append(dropBuckets, key)
|
||||
}
|
||||
}
|
||||
if len(dropBuckets) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
if err := db.Batch(func(tx *bbolt.Tx) error {
|
||||
for _, key := range dropBuckets {
|
||||
if err := tx.DeleteBucket(key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return 0, fmt.Errorf("drop empty user attributes buckets: %w", err)
|
||||
}
|
||||
return uint64(len(dropBuckets)), nil
|
||||
}
|
||||
|
||||
func bucketIsEmpty(db *bbolt.DB, bucketKey []byte) (bool, bool, error) {
|
||||
var empty bool
|
||||
var exists bool
|
||||
if err := db.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(bucketKey)
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
exists = true
|
||||
empty = !hasAnyItem(b)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return false, false, fmt.Errorf("bucket empty check: %w", err)
|
||||
}
|
||||
return empty, exists, nil
|
||||
}
|
||||
|
||||
func dropEmptyNestedBuckets(ctx context.Context, db *bbolt.DB, rootBucketKey []byte) error {
|
||||
var last []byte
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
var dropBuckets [][]byte
|
||||
var err error
|
||||
dropBuckets, last, err = selectEmptyNestedBuckets(ctx, db, rootBucketKey, last)
|
||||
if err != nil {
|
||||
return fmt.Errorf("select empty nested buckets: %w", err)
|
||||
}
|
||||
if len(dropBuckets) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := db.Batch(func(tx *bbolt.Tx) error {
|
||||
rootBucket := tx.Bucket(rootBucketKey)
|
||||
if rootBucket == nil {
|
||||
return nil
|
||||
}
|
||||
for _, sb := range dropBuckets {
|
||||
if err := rootBucket.DeleteBucket(sb); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return fmt.Errorf("drop empty nested buckets: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func selectEmptyNestedBuckets(ctx context.Context, db *bbolt.DB, rootBucketKey, last []byte) ([][]byte, []byte, error) {
|
||||
const batchSize = 1000
|
||||
var result [][]byte
|
||||
if err := db.View(func(tx *bbolt.Tx) error {
|
||||
rootBucket := tx.Bucket(rootBucketKey)
|
||||
if rootBucket == nil {
|
||||
return nil
|
||||
}
|
||||
c := rootBucket.Cursor()
|
||||
for k, v := c.Seek(last); k != nil && len(result) < batchSize; k, v = c.Next() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if bytes.Equal(last, k) {
|
||||
continue
|
||||
}
|
||||
last = bytes.Clone(k)
|
||||
if v != nil { // record
|
||||
continue
|
||||
}
|
||||
nestedBucket := rootBucket.Bucket(k)
|
||||
if nestedBucket == nil {
|
||||
continue
|
||||
}
|
||||
if !hasAnyItem(nestedBucket) {
|
||||
result = append(result, bytes.Clone(k))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return result, last, nil
|
||||
}
|
||||
|
||||
func dropOwnerIDIndex(ctx context.Context, db *bbolt.DB, log func(a ...any)) error {
|
||||
return dropBucketsByPrefix(ctx, db, []byte{ownerPrefix}, func(a ...any) {
|
||||
log(append([]any{"owner ID index:"}, a...)...)
|
||||
|
|
Loading…
Reference in a new issue