forked from TrueCloudLab/frostfs-node
Dmitrii Stepanov
5b8200de88
If blobovnicza contains objects larger than object size parameter value, then rebuild fails with an error, because there is no such bucket in database. This commit forces to create bucket on rebuild. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
109 lines
2.9 KiB
Go
109 lines
2.9 KiB
Go
package blobovnicza
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.etcd.io/bbolt"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
// PutPrm groups the parameters of Put operation.
|
|
type PutPrm struct {
|
|
addr oid.Address
|
|
|
|
objData []byte
|
|
|
|
force bool
|
|
}
|
|
|
|
// PutRes groups the resulting values of Put operation.
|
|
type PutRes struct{}
|
|
|
|
// SetAddress sets the address of the saving object.
|
|
func (p *PutPrm) SetAddress(addr oid.Address) {
|
|
p.addr = addr
|
|
}
|
|
|
|
// SetMarshaledObject sets binary representation of the object.
|
|
func (p *PutPrm) SetMarshaledObject(data []byte) {
|
|
p.objData = data
|
|
}
|
|
|
|
// SetForce sets force option.
|
|
func (p *PutPrm) SetForce(f bool) {
|
|
p.force = f
|
|
}
|
|
|
|
// Put saves an object in Blobovnicza.
|
|
//
|
|
// If binary representation of the object is not set,
|
|
// it is calculated via Marshal method.
|
|
//
|
|
// The size of the object MUST BE less that or equal to
|
|
// the size specified in WithObjectSizeLimit option.
|
|
//
|
|
// Returns any error encountered that
|
|
// did not allow to completely save the object.
|
|
//
|
|
// Returns ErrFull if blobovnicza is filled.
|
|
//
|
|
// Should not be called in read-only configuration.
|
|
func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
|
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Put",
|
|
trace.WithAttributes(
|
|
attribute.String("path", b.path),
|
|
attribute.String("address", prm.addr.EncodeToString()),
|
|
attribute.Int("size", len(prm.objData)),
|
|
))
|
|
defer span.End()
|
|
|
|
sz := uint64(len(prm.objData))
|
|
bucketName := bucketForSize(sz)
|
|
key := addressKey(prm.addr)
|
|
recordSize := sz + uint64(len(key))
|
|
|
|
err := b.boltDB.Batch(func(tx *bbolt.Tx) error {
|
|
buck := tx.Bucket(bucketName)
|
|
if buck == nil {
|
|
// expected to happen:
|
|
// - before initialization step (incorrect usage by design)
|
|
// - if DB is corrupted (in future this case should be handled)
|
|
// - blobovnicza's object size changed before rebuild (handled if prm.force flag specified)
|
|
if !prm.force {
|
|
return logicerr.Wrap(fmt.Errorf("(%T) bucket for size %d not created", b, sz))
|
|
}
|
|
var err error
|
|
buck, err = tx.CreateBucket(bucketName)
|
|
if err != nil {
|
|
return fmt.Errorf("(%T) failed to create bucket for size %d: %w", b, sz, err)
|
|
}
|
|
}
|
|
|
|
// save the object in bucket
|
|
if err := buck.Put(key, prm.objData); err != nil {
|
|
return fmt.Errorf("(%T) could not save object in bucket: %w", b, err)
|
|
}
|
|
|
|
return updateMeta(tx, func(count, size uint64) (uint64, uint64) {
|
|
return count + 1, size + recordSize
|
|
})
|
|
})
|
|
if err == nil {
|
|
b.itemAdded(recordSize)
|
|
}
|
|
|
|
return PutRes{}, err
|
|
}
|
|
|
|
func addressKey(addr oid.Address) []byte {
|
|
return []byte(addr.EncodeToString())
|
|
}
|
|
|
|
func addressFromKey(dst *oid.Address, data []byte) error {
|
|
return dst.DecodeString(string(data))
|
|
}
|