frostfs-node/pkg/local_object_storage/shard/put.go

88 lines
2.3 KiB
Go
Raw Permalink Normal View History

package shard
import (
"fmt"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
)
// PutPrm groups the parameters of Put operation.
type PutPrm struct {
obj *object.Object
}
// PutRes groups the resulting values of Put operation.
type PutRes struct{}
// SetObject is a Put option to set object to save.
func (p *PutPrm) SetObject(obj *object.Object) {
p.obj = obj
}
// Put saves the object in shard.
//
// Returns any error encountered that
// did not allow to completely save the object.
//
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
func (s *Shard) Put(prm PutPrm) (PutRes, error) {
s.m.RLock()
defer s.m.RUnlock()
m := s.info.Mode
if m.ReadOnly() {
return PutRes{}, ErrReadOnlyMode
}
data, err := prm.obj.Marshal()
if err != nil {
return PutRes{}, fmt.Errorf("cannot marshal object: %w", err)
}
var putPrm common.PutPrm // form Put parameters
putPrm.Object = prm.obj
putPrm.RawData = data
putPrm.Address = objectCore.AddressOf(prm.obj)
var res common.PutRes
// exist check are not performed there, these checks should be executed
// ahead of `Put` by storage engine
tryCache := s.hasWriteCache() && !m.NoMetabase()
if tryCache {
res, err = s.writeCache.Put(putPrm)
}
if err != nil || !tryCache {
if err != nil {
s.log.Debug("can't put object to the write-cache, trying blobstor",
zap.String("err", err.Error()))
}
res, err = s.blobStor.Put(putPrm)
if err != nil {
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
}
}
if !m.NoMetabase() {
var pPrm meta.PutPrm
pPrm.SetObject(prm.obj)
pPrm.SetStorageID(res.StorageID)
if _, err := s.metaBase.Put(pPrm); err != nil {
// may we need to handle this case in a special way
// since the object has been successfully written to BlobStor
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
}
s.incObjectCounter()
s.addToPayloadSize(int64(prm.obj.PayloadSize()))
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
}
return PutRes{}, nil
}