frostfs-node/pkg/local_object_storage/shard/put.go
2024-12-18 15:52:26 +00:00

108 lines
3 KiB
Go

package shard
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// PutPrm carries the input parameters of the Shard.Put operation.
type PutPrm struct {
	// obj is the object to save; it is set via SetObject.
	obj *objectSDK.Object
	// indexAttributes is passed on to the metabase Put parameters;
	// it is set via SetIndexAttributes.
	indexAttributes bool
}
// PutRes groups the resulting values of the Put operation.
// It currently has no fields.
type PutRes struct{}
// SetObject sets the object that the Put operation will save.
func (p *PutPrm) SetObject(obj *objectSDK.Object) {
	p.obj = obj
}
// SetIndexAttributes sets the index-attributes flag that the Put operation
// passes on to the metabase Put parameters.
func (p *PutPrm) SetIndexAttributes(v bool) {
	p.indexAttributes = v
}
// Put stores the object carried by prm in the shard.
//
// The object is marshaled once. It is written to the write-cache when the
// shard has one and the current mode does not disable the metabase; if the
// write-cache is not used or returns an error, the object is written to the
// BLOB storage instead. Unless the mode disables the metabase, the object is
// then put into the metabase, and if the metabase reports it as inserted,
// the shard's accounting is updated through incObjectCounter,
// addToPayloadSize and addToContainerSize.
//
// Put performs no existence checks: the storage engine is expected to run
// them before calling Put.
//
// Returns ErrReadOnlyMode if the shard is in "read-only" mode. Any other
// returned error means the object was not completely saved.
func (s *Shard) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Put",
		trace.WithAttributes(
			attribute.String("shard_id", s.ID().String()),
			attribute.String("address", objectCore.AddressOf(prm.obj).EncodeToString()),
		))
	defer span.End()

	// The read lock is held for the whole operation, including the mode check.
	s.m.RLock()
	defer s.m.RUnlock()

	mode := s.info.Mode
	if mode.ReadOnly() {
		return PutRes{}, ErrReadOnlyMode
	}

	raw, err := prm.obj.Marshal()
	if err != nil {
		return PutRes{}, fmt.Errorf("cannot marshal object: %w", err)
	}

	// The same parameters are used for the write-cache and the BLOB storage.
	storePrm := common.PutPrm{
		Address: objectCore.AddressOf(prm.obj),
		Object:  prm.obj,
		RawData: raw,
	}

	// Existence checks are not performed here; the storage engine is expected
	// to run them before calling Put.
	var stored common.PutRes

	viaCache := s.hasWriteCache() && !mode.NoMetabase()
	if viaCache {
		stored, err = s.writeCache.Put(ctx, storePrm)
		if err != nil {
			s.log.Debug(ctx, logs.ShardCantPutObjectToTheWritecacheTryingBlobstor,
				zap.Error(err))
		}
	}

	// Fall back to the BLOB storage when the write-cache was skipped or failed.
	if !viaCache || err != nil {
		stored, err = s.blobStor.Put(ctx, storePrm)
		if err != nil {
			return PutRes{}, fmt.Errorf("put object to BLOB storage: %w", err)
		}
	}

	if mode.NoMetabase() {
		return PutRes{}, nil
	}

	var metaPrm meta.PutPrm
	metaPrm.SetObject(prm.obj)
	metaPrm.SetStorageID(stored.StorageID)
	metaPrm.SetIndexAttributes(prm.indexAttributes)

	metaRes, err := s.metaBase.Put(ctx, metaPrm)
	if err != nil {
		// The object has already been written by the write-cache or the BLOB
		// storage at this point, so this case may need special handling.
		return PutRes{}, fmt.Errorf("put object to metabase: %w", err)
	}

	// Update the accounting only when the metabase reports an insertion.
	if metaRes.Inserted {
		cnr := storePrm.Address.Container()
		size := int64(prm.obj.PayloadSize())

		s.incObjectCounter(cnr, meta.IsUserObject(prm.obj))
		s.addToPayloadSize(size)
		s.addToContainerSize(cnr.EncodeToString(), size)
	}

	return PutRes{}, nil
}