frostfs-node/pkg/local_object_storage/blobovnicza/put.go
Dmitrii Stepanov 5b8200de88 [#984] blobovnicza: Do not fail rebuild on big objects
If a blobovnicza contains objects larger than the object size parameter
value, rebuild fails with an error because there is no such bucket in
the database. This commit forces the bucket to be created on rebuild.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-04-09 11:51:18 +00:00

109 lines
2.9 KiB
Go

package blobovnicza
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// PutPrm groups the parameters of Put operation.
//
// The fields are filled through the SetAddress, SetMarshaledObject and
// SetForce methods.
type PutPrm struct {
// addr is the address of the object to save; Put derives the record key from it.
addr oid.Address
// objData is the binary representation of the object; Put stores it as the record value.
objData []byte
// force allows Put to create the bucket for the object size if it does not exist.
force bool
}
// PutRes groups the resulting values of Put operation.
//
// It currently has no fields.
type PutRes struct{}
// SetAddress sets the address of the object to be saved.
//
// Put encodes this address to build the key under which the object is stored.
func (p *PutPrm) SetAddress(addr oid.Address) {
p.addr = addr
}
// SetMarshaledObject sets binary representation of the object.
//
// The slice is stored as is, without copying; Put writes it as the record
// value and uses its length to choose the bucket.
func (p *PutPrm) SetMarshaledObject(data []byte) {
p.objData = data
}
// SetForce sets force option.
//
// When force is true, Put creates the bucket for the object size if it is
// missing instead of returning an error.
func (p *PutPrm) SetForce(f bool) {
p.force = f
}
// Put saves an object in Blobovnicza.
//
// The binary representation set via SetMarshaledObject is written as is,
// under a key derived from the object address, into the bucket selected by
// the size of that representation.
//
// The size of the object MUST BE less than or equal to
// the size specified in WithObjectSizeLimit option. If the bucket for the
// object size does not exist, Put returns a logic error, unless the force
// option is set: then the bucket is created within the same transaction.
//
// After a successful write the count/size pair handled by updateMeta is
// advanced by one item and by the record size (data length plus key length),
// and the same record size is reported to b.itemAdded.
//
// Returns any error encountered that
// did not allow to completely save the object.
//
// Should not be called in read-only configuration.
//
// NOTE(review): earlier documentation also promised ErrFull for a filled
// blobovnicza and a Marshal fallback for unset object data; neither is
// performed by this function body — confirm whether callers rely on it.
func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
	payloadLen := len(prm.objData)

	_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Put",
		trace.WithAttributes(
			attribute.String("path", b.path),
			attribute.String("address", prm.addr.EncodeToString()),
			attribute.Int("size", payloadLen),
		))
	defer span.End()

	var (
		size    = uint64(payloadLen)
		bktName = bucketForSize(size)
		recKey  = addressKey(prm.addr)
		recSize = size + uint64(len(recKey))
	)

	// store is the whole write: locate (or, when forced, create) the size
	// bucket, put the record and update the counters in the same transaction.
	store := func(tx *bbolt.Tx) error {
		bkt := tx.Bucket(bktName)

		// A missing bucket is expected to happen:
		// - before initialization step (incorrect usage by design)
		// - if DB is corrupted (in future this case should be handled)
		// - blobovnicza's object size changed before rebuild (handled if prm.force flag specified)
		if bkt == nil && !prm.force {
			return logicerr.Wrap(fmt.Errorf("(%T) bucket for size %d not created", b, size))
		}
		if bkt == nil {
			created, err := tx.CreateBucket(bktName)
			if err != nil {
				return fmt.Errorf("(%T) failed to create bucket for size %d: %w", b, size, err)
			}
			bkt = created
		}

		// save the object in bucket
		if err := bkt.Put(recKey, prm.objData); err != nil {
			return fmt.Errorf("(%T) could not save object in bucket: %w", b, err)
		}

		return updateMeta(tx, func(count, total uint64) (uint64, uint64) {
			return count + 1, total + recSize
		})
	}

	if err := b.boltDB.Batch(store); err != nil {
		return PutRes{}, err
	}

	// Report the new record only after the batch has been applied successfully.
	b.itemAdded(recSize)

	return PutRes{}, nil
}
// addressKey returns the bytes of the string encoding of addr.
//
// Put uses the result as the record key inside a size bucket.
func addressKey(addr oid.Address) []byte {
	encoded := addr.EncodeToString()
	return []byte(encoded)
}
// addressFromKey interprets data as a string-encoded address and decodes it
// into dst, returning the decoding error, if any.
//
// It is intended as the counterpart of addressKey.
func addressFromKey(dst *oid.Address, data []byte) error {
	encoded := string(data)
	return dst.DecodeString(encoded)
}