forked from TrueCloudLab/frostfs-node
Dmitrii Stepanov
7e0c5a55de
Now UpdateStorageID doesn't return error in case of logical error. If object is in graveyard or GC market, it is still required to update storage ID. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
121 lines
3.2 KiB
Go
121 lines
3.2 KiB
Go
package meta
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
|
"go.etcd.io/bbolt"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
// StorageIDPrm groups the parameters of StorageID operation.
|
|
type StorageIDPrm struct {
|
|
addr oid.Address
|
|
}
|
|
|
|
// StorageIDRes groups the resulting values of StorageID operation.
|
|
type StorageIDRes struct {
|
|
id []byte
|
|
}
|
|
|
|
// SetAddress is a StorageID option to set the object address to check.
|
|
func (p *StorageIDPrm) SetAddress(addr oid.Address) {
|
|
p.addr = addr
|
|
}
|
|
|
|
// StorageID returns storage ID.
|
|
func (r StorageIDRes) StorageID() []byte {
|
|
return r.id
|
|
}
|
|
|
|
// StorageID returns storage descriptor for objects from the blobstor.
|
|
// It is put together with the object can makes get/delete operation faster.
|
|
func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes, err error) {
|
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.StorageID",
|
|
trace.WithAttributes(
|
|
attribute.String("address", prm.addr.EncodeToString()),
|
|
))
|
|
defer span.End()
|
|
|
|
db.modeMtx.RLock()
|
|
defer db.modeMtx.RUnlock()
|
|
|
|
if db.mode.NoMetabase() {
|
|
return res, ErrDegradedMode
|
|
}
|
|
|
|
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
|
res.id, err = db.storageID(tx, prm.addr)
|
|
|
|
return err
|
|
})
|
|
|
|
return res, metaerr.Wrap(err)
|
|
}
|
|
|
|
func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) {
|
|
key := make([]byte, bucketKeySize)
|
|
smallBucket := tx.Bucket(smallBucketName(addr.Container(), key))
|
|
if smallBucket == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
storageID := smallBucket.Get(objectKey(addr.Object(), key))
|
|
if storageID == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
return slice.Copy(storageID), nil
|
|
}
|
|
|
|
// UpdateStorageIDPrm groups the parameters of UpdateStorageID operation.
|
|
type UpdateStorageIDPrm struct {
|
|
addr oid.Address
|
|
id []byte
|
|
}
|
|
|
|
// UpdateStorageIDRes groups the resulting values of UpdateStorageID operation.
|
|
type UpdateStorageIDRes struct{}
|
|
|
|
// SetAddress is an UpdateStorageID option to set the object address to check.
|
|
func (p *UpdateStorageIDPrm) SetAddress(addr oid.Address) {
|
|
p.addr = addr
|
|
}
|
|
|
|
// SetStorageID is an UpdateStorageID option to set the storage ID.
|
|
func (p *UpdateStorageIDPrm) SetStorageID(id []byte) {
|
|
p.id = id
|
|
}
|
|
|
|
// UpdateStorageID updates storage descriptor for objects from the blobstor.
|
|
func (db *DB) UpdateStorageID(prm UpdateStorageIDPrm) (res UpdateStorageIDRes, err error) {
|
|
db.modeMtx.RLock()
|
|
defer db.modeMtx.RUnlock()
|
|
|
|
if db.mode.NoMetabase() {
|
|
return res, ErrDegradedMode
|
|
} else if db.mode.ReadOnly() {
|
|
return res, ErrReadOnlyMode
|
|
}
|
|
|
|
currEpoch := db.epochState.CurrentEpoch()
|
|
|
|
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
|
exists, err := db.exists(tx, prm.addr, currEpoch)
|
|
if err == nil && exists {
|
|
err = updateStorageID(tx, prm.addr, prm.id)
|
|
} else if errors.As(err, new(logicerr.Logical)) {
|
|
err = updateStorageID(tx, prm.addr, prm.id)
|
|
}
|
|
|
|
return err
|
|
})
|
|
|
|
return res, metaerr.Wrap(err)
|
|
}
|