frostfs-node/pkg/local_object_storage/engine/put.go
Dmitrii Stepanov 3a441f072f
[#1709] shard: Check if context canceled for shard iteration
If the context has already been canceled, there is no need to check the
remaining shards. At the same time, handling the cancellation in every
handler should be avoided, so the context check has been moved into the
shard iteration method, which now returns an error.

Change-Id: I70030ace36593ce7d2b8376bee39fe82e9dbf88f
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-04-21 15:20:50 +03:00
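
The iteration method itself is not part of this file; a minimal sketch of
the pattern the commit describes, assuming a hypothetical sortedShards
helper that orders the shards for the given address, could look like:

	func (e *StorageEngine) iterateOverSortedShards(ctx context.Context, addr oid.Address,
		handler func(int, hashedShard) (stop bool),
	) error {
		for i, sh := range e.sortedShards(addr) { // sortedShards is assumed here
			if err := ctx.Err(); err != nil {
				return err // context canceled: skip the remaining shards
			}
			if handler(i, sh) {
				break
			}
		}
		return nil
	}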


package engine

import (
	"context"
	"errors"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

// PutPrm groups the parameters of Put operation.
type PutPrm struct {
	Object             *objectSDK.Object
	IsIndexedContainer bool
}

var errPutShard = errors.New("could not put object to any shard")
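
// putToShardStatus encodes the outcome of a single putToShard attempt.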
type putToShardStatus byte

const (
	putToShardUnknown putToShardStatus = iota
	putToShardSuccess
	putToShardExists
	putToShardRemoved
)
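
// putToShardRes carries the status of a putToShard attempt together with
// the error to propagate to the caller, if any.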
type putToShardRes struct {
	status putToShardStatus
	err    error
}

// Put saves the object to local storage.
//
// Returns any error encountered that prevented
// the object from being completely saved.
//
// Returns an error if executions are blocked (see BlockExecution).
//
// Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed.
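//
// Example (hypothetical caller; eng and obj are assumed to be a prepared
// *StorageEngine and *objectSDK.Object):
//
//	var prm PutPrm
//	prm.Object = obj
//	prm.IsIndexedContainer = true
//	if err := eng.Put(ctx, prm); err != nil {
//		// the object could not be saved to any shard
//	}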
func (e *StorageEngine) Put(ctx context.Context, prm PutPrm) (err error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Put",
		trace.WithAttributes(
			attribute.String("address", object.AddressOf(prm.Object).EncodeToString()),
		))
	defer span.End()
	defer elapsed("Put", e.metrics.AddMethodDuration)()

	err = e.execIfNotBlocked(func() error {
		err = e.put(ctx, prm)
		return err
	})

	return
}

func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
	addr := object.AddressOf(prm.Object)

	// In #1146 this check was parallelized; however, it turned out to be
	// much slower on fast machines with 4 shards.
	var ecParent oid.Address
	if prm.Object.ECHeader() != nil {
		ecParent.SetObject(prm.Object.ECHeader().Parent())
		ecParent.SetContainer(addr.Container())
	}
	var shPrm shard.ExistsPrm
	shPrm.Address = addr
	shPrm.ECParentAddress = ecParent
	existed, locked, err := e.exists(ctx, shPrm)
	if err != nil {
		return err
	}
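
	// The object itself is new, but its EC parent is locked: inherit the
	// parent's locks so this chunk cannot be removed while they are active.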
	if !existed && locked {
		lockers, err := e.GetLocks(ctx, ecParent)
		if err != nil {
			return err
		}
		for _, locker := range lockers {
			err = e.lock(ctx, addr.Container(), locker, []oid.ID{addr.Object()})
			if err != nil {
				return err
			}
		}
	}
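
	// Try shards in sorted order and stop at the first one that yields a
	// definitive status (success, exists, or removed).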
	var shRes putToShardRes
	if err := e.iterateOverSortedShards(ctx, addr, func(_ int, sh hashedShard) (stop bool) {
		e.mtx.RLock()
		_, ok := e.shards[sh.ID().String()]
		e.mtx.RUnlock()
		if !ok {
			// Shard was concurrently removed, skip.
			return false
		}
		shRes = e.putToShard(ctx, sh, addr, prm.Object, prm.IsIndexedContainer)
		return shRes.status != putToShardUnknown
	}); err != nil {
		return err
	}
	switch shRes.status {
	case putToShardUnknown:
		return errPutShard
	case putToShardRemoved:
		return shRes.err
	case putToShardExists, putToShardSuccess:
		return nil
	default:
		return errPutShard
	}
}

// putToShard puts the object to sh.
// It returns a putToShardRes whose status and error tell the caller
// whether and how the error must be propagated upward.
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard,
	addr oid.Address, obj *objectSDK.Object, isIndexedContainer bool,
) (res putToShardRes) {
	var existPrm shard.ExistsPrm
	existPrm.Address = addr

	exists, err := sh.Exists(ctx, existPrm)
	if err != nil {
		if shard.IsErrObjectExpired(err) {
			// The object was found but has expired => do nothing with it.
			res.status = putToShardExists
		} else {
			e.log.Warn(ctx, logs.EngineCouldNotCheckObjectExistence,
				zap.Stringer("shard_id", sh.ID()),
				zap.Error(err))
		}
		return // this is not an ErrAlreadyRemoved error, so we can go to the next shard
	}
	if exists.Exists() {
		res.status = putToShardExists
		return
	}

	var putPrm shard.PutPrm
	putPrm.SetObject(obj)
	putPrm.SetIndexAttributes(isIndexedContainer)

	_, err = sh.Put(ctx, putPrm)
	if err != nil {
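		// Shard-local conditions (read-only mode or no free space): log the
		// failure and leave the status unknown so the next shard is tried.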
		if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) ||
			errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) {
			e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
				zap.Stringer("shard_id", sh.ID()),
				zap.Error(err))
			return
		}
		if client.IsErrObjectAlreadyRemoved(err) {
			e.log.Warn(ctx, logs.EngineCouldNotPutObjectToShard,
				zap.Stringer("shard_id", sh.ID()),
				zap.Error(err))
			res.status = putToShardRemoved
			res.err = err
			return
		}

		e.reportShardError(ctx, sh, "could not put object to shard", err, zap.Stringer("address", addr))
		return
	}

	res.status = putToShardSuccess
	return
}

// Put writes provided object to local storage.
func Put(ctx context.Context, storage *StorageEngine, obj *objectSDK.Object, indexedContainer bool) error {
	return storage.Put(ctx, PutPrm{Object: obj, IsIndexedContainer: indexedContainer})
}