Dmitrii Stepanov
b2769ca3de
Now move info stores in blobovnicza, so in case of failover rebuild completes previous operation first. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
85 lines
2 KiB
Go
85 lines
2 KiB
Go
package blobovnicza
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.etcd.io/bbolt"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
// GetPrm groups the parameters of Get operation.
|
|
type GetPrm struct {
|
|
addr oid.Address
|
|
}
|
|
|
|
// GetRes carries the output values of the Get operation.
type GetRes struct {
	// obj is the binary representation of the object that was read.
	obj []byte
}
|
|
|
|
// SetAddress sets the address of the requested object.
|
|
func (p *GetPrm) SetAddress(addr oid.Address) {
|
|
p.addr = addr
|
|
}
|
|
|
|
// Object returns binary representation of the requested object.
|
|
func (p GetRes) Object() []byte {
|
|
return p.obj
|
|
}
|
|
|
|
// errInterruptForEach is a sentinel error returned from a bbolt.Tx.ForEach
// callback to stop the iteration early. It signals a normal interruption,
// not a failure.
var errInterruptForEach = errors.New("interrupt for-each")
|
|
|
|
// Get reads an object from Blobovnicza by address.
|
|
//
|
|
// Returns any error encountered that
|
|
// did not allow to completely read the object.
|
|
//
|
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is not
|
|
// presented in Blobovnicza.
|
|
func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
|
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Get",
|
|
trace.WithAttributes(
|
|
attribute.String("path", b.path),
|
|
attribute.String("address", prm.addr.EncodeToString()),
|
|
))
|
|
defer span.End()
|
|
|
|
var (
|
|
data []byte
|
|
addrKey = addressKey(prm.addr)
|
|
)
|
|
|
|
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
|
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
|
|
if bytes.Equal(bucketName, incompletedMoveBucketName) {
|
|
return nil
|
|
}
|
|
|
|
data = buck.Get(addrKey)
|
|
if data == nil {
|
|
return nil
|
|
}
|
|
|
|
data = bytes.Clone(data)
|
|
|
|
return errInterruptForEach
|
|
})
|
|
}); err != nil && err != errInterruptForEach {
|
|
return GetRes{}, err
|
|
}
|
|
|
|
if data == nil {
|
|
return GetRes{}, new(apistatus.ObjectNotFound)
|
|
}
|
|
|
|
return GetRes{
|
|
obj: data,
|
|
}, nil
|
|
}
|