forked from TrueCloudLab/frostfs-node
160 lines
3.2 KiB
Go
160 lines
3.2 KiB
Go
package tui
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"go.etcd.io/bbolt"
|
|
)
|
|
|
|
type Item[T any] struct {
|
|
val T
|
|
err error
|
|
}
|
|
|
|
func resolvePath(tx *bbolt.Tx, path [][]byte) (*bbolt.Bucket, error) {
|
|
if len(path) == 0 {
|
|
return nil, errors.New("can't find bucket without path")
|
|
}
|
|
|
|
name := path[0]
|
|
bucket := tx.Bucket(name)
|
|
if bucket == nil {
|
|
return nil, fmt.Errorf("no bucket with name %s", name)
|
|
}
|
|
for _, name := range path[1:] {
|
|
bucket = bucket.Bucket(name)
|
|
if bucket == nil {
|
|
return nil, fmt.Errorf("no bucket with name %s", name)
|
|
}
|
|
}
|
|
return bucket, nil
|
|
}
|
|
|
|
func load[T any](
|
|
ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
|
|
filter func(key, value []byte) bool, transform func(key, value []byte) T,
|
|
) (<-chan Item[T], error) {
|
|
buffer := make(chan Item[T], bufferSize)
|
|
|
|
go func() {
|
|
defer close(buffer)
|
|
|
|
err := db.View(func(tx *bbolt.Tx) error {
|
|
var cursor *bbolt.Cursor
|
|
if len(path) == 0 {
|
|
cursor = tx.Cursor()
|
|
} else {
|
|
bucket, err := resolvePath(tx, path)
|
|
if err != nil {
|
|
buffer <- Item[T]{err: fmt.Errorf("can't find bucket: %w", err)}
|
|
return nil
|
|
}
|
|
cursor = bucket.Cursor()
|
|
}
|
|
|
|
key, value := cursor.First()
|
|
for {
|
|
if key == nil {
|
|
return nil
|
|
}
|
|
if filter != nil && !filter(key, value) {
|
|
key, value = cursor.Next()
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case buffer <- Item[T]{val: transform(key, value)}:
|
|
key, value = cursor.Next()
|
|
}
|
|
}
|
|
})
|
|
if err != nil {
|
|
buffer <- Item[T]{err: err}
|
|
}
|
|
}()
|
|
|
|
return buffer, nil
|
|
}
|
|
|
|
func LoadBuckets(
|
|
ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
|
|
) (<-chan Item[*Bucket], error) {
|
|
buffer, err := load(
|
|
ctx, db, path, bufferSize,
|
|
func(_, value []byte) bool {
|
|
return value == nil
|
|
},
|
|
func(key, _ []byte) *Bucket {
|
|
base := make([][]byte, 0, len(path))
|
|
base = append(base, path...)
|
|
|
|
return &Bucket{
|
|
Name: key,
|
|
Path: append(base, key),
|
|
}
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can't start iterating bucket: %w", err)
|
|
}
|
|
|
|
return buffer, nil
|
|
}
|
|
|
|
func LoadRecords(
|
|
ctx context.Context, db *bbolt.DB, path [][]byte, bufferSize int,
|
|
) (<-chan Item[*Record], error) {
|
|
buffer, err := load(
|
|
ctx, db, path, bufferSize,
|
|
func(_, value []byte) bool {
|
|
return value != nil
|
|
},
|
|
func(key, value []byte) *Record {
|
|
base := make([][]byte, 0, len(path))
|
|
base = append(base, path...)
|
|
|
|
return &Record{
|
|
Key: key,
|
|
Value: value,
|
|
Path: append(base, key),
|
|
}
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can't start iterating bucket: %w", err)
|
|
}
|
|
|
|
return buffer, nil
|
|
}
|
|
|
|
// HasBuckets checks if a bucket has nested buckets. It relies on assumption
|
|
// that a bucket can have either nested buckets or records but not both.
|
|
func HasBuckets(ctx context.Context, db *bbolt.DB, path [][]byte) (bool, error) {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
buffer, err := load(
|
|
ctx, db, path, 1,
|
|
nil,
|
|
func(_, value []byte) []byte { return value },
|
|
)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
x, ok := <-buffer
|
|
if !ok {
|
|
return false, nil
|
|
}
|
|
if x.err != nil {
|
|
return false, err
|
|
}
|
|
if x.val != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|