// Source: frostfs-node/pkg/services/object/delete/exec.go

package deletesvc
import (
"context"
"errors"
"fmt"
"strconv"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apiclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
// errDeleteECChunk is returned by formExtendedInfo when the header of the
// object requested for deletion carries a non-nil EC header.
var errDeleteECChunk = errors.New("invalid operation: delete EC object chunk")
// execCtx holds the state of a single delete operation.
type execCtx struct {
	// svc is the service whose header, searcher, placer and netInfo
	// components are used by the methods of execCtx.
	svc *Service
	// prm carries the request parameters (target address, common
	// parameters and the tombstone address writer).
	prm Prm
	// log is the request-scoped logger installed by setLogger.
	log *logger.Logger
	// tombstone accumulates the split ID and the member IDs.
	tombstone *objectSDK.Tombstone
	// splitInfo is filled by formExtendedInfo when the head request
	// reports split information; nil otherwise.
	splitInfo *objectSDK.SplitInfo
	// tombstoneObj is the object built by initTombstoneObject.
	tombstoneObj *objectSDK.Object
}
// setLogger derives a request-scoped logger from l and stores it in exec.log.
//
// The derived logger is annotated with the request kind, the target address,
// the local-only flag and the presence of session and bearer tokens.
func (exec *execCtx) setLogger(l *logger.Logger) {
	common := exec.prm.common

	fields := []zap.Field{
		zap.String("request", "DELETE"),
		zap.Stringer("address", exec.address()),
		zap.Bool("local", exec.isLocal()),
		zap.Bool("with session", common.SessionToken() != nil),
		zap.Bool("with bearer", common.BearerToken() != nil),
	}

	exec.log = &logger.Logger{Logger: l.With(fields...)}
}
// isLocal reports the LocalOnly flag of the request's common parameters.
func (exec *execCtx) isLocal() bool {
	return exec.commonParameters().LocalOnly()
}
// address returns the address taken from the request parameters.
func (exec *execCtx) address() oid.Address {
	target := exec.prm.addr
	return target
}
// containerID returns the container part of the request address.
func (exec *execCtx) containerID() cid.ID {
	return exec.address().Container()
}
// commonParameters returns the common parameters attached to the request.
func (exec *execCtx) commonParameters() *util.CommonPrm {
	common := exec.prm.common
	return common
}
// newAddress builds an address that combines the request's container ID
// with the given object ID.
func (exec *execCtx) newAddress(id oid.ID) oid.Address {
	var addr oid.Address
	addr.SetContainer(exec.containerID())
	addr.SetObject(id)

	return addr
}
// formExtendedInfo requests the object header and prepares the tombstone
// according to the outcome:
//
//   - header received: fails with errDeleteECChunk if it carries an EC
//     header, otherwise nothing else is required;
//   - split info error: remembers the split info, sets the split ID on the
//     tombstone and collects the split members;
//   - EC info error: nothing else is required;
//   - "object already removed" error: treated as success;
//   - any other error is returned as is.
func (exec *execCtx) formExtendedInfo(ctx context.Context) error {
	hdr, headErr := exec.svc.header.head(ctx, exec)
	if headErr == nil {
		if hdr.ECHeader() != nil {
			return errDeleteECChunk
		}
		return nil
	}

	var splitErr *objectSDK.SplitInfoError
	if errors.As(headErr, &splitErr) {
		exec.splitInfo = splitErr.SplitInfo()
		exec.tombstone.SetSplitID(exec.splitInfo.SplitID())

		exec.log.Debug(logs.DeleteSplitInfoSuccessfullyFormedCollectingMembers)
		if collectErr := exec.collectMembers(ctx); collectErr != nil {
			return collectErr
		}
		exec.log.Debug(logs.DeleteMembersSuccessfullyCollected)

		return nil
	}

	var ecErr *objectSDK.ECInfoError
	if errors.As(headErr, &ecErr) {
		exec.log.Debug(logs.DeleteECObjectReceived)
		return nil
	}

	// Obtaining split info implicitly performs a Head request, which may
	// answer with the ObjectAlreadyRemoved status. That status is not
	// specified for Delete, so it is not reported to the caller.
	if apiclient.IsErrObjectAlreadyRemoved(headErr) {
		return nil
	}

	return headErr
}
// collectMembers gathers the IDs of the split object's parts into the
// tombstone.
//
// It does nothing when no split info is set. Otherwise it first tries to
// collect the children through the linking object, if the split info has
// one. When there is no linking object, or collecting through it failed,
// it falls back to walking the chain from the last part, if the split info
// has one. In every case the members are finally supplemented by a search
// over the split ID.
//
// An error is returned when the chain walk or the split ID search fails.
func (exec *execCtx) collectMembers(ctx context.Context) error {
	if exec.splitInfo == nil {
		exec.log.Debug(logs.DeleteNoSplitInfoObjectIsPHY)
		return nil
	}

	// Tracks whether the linking object yielded the children. It must start
	// as false so that a missing link also triggers the chain fallback.
	collected := false
	if _, withLink := exec.splitInfo.Link(); withLink {
		// Best effort: a failure here is not fatal because the chain walk
		// below serves as the fallback.
		collected = exec.collectChildren(ctx) == nil
	}

	if !collected {
		if _, withLast := exec.splitInfo.LastPart(); withLast {
			if err := exec.collectChain(ctx); err != nil {
				return err
			}
		}
	} // may be fail if neither right nor linking ID is set?

	return exec.supplementBySplitID(ctx)
}
// collectChain walks the split chain backwards, starting from the last part
// recorded in the split info, and adds every visited ID to the tombstone
// members.
//
// The walk stops when the header source reports no previous element. An
// error from the header source aborts the walk and is returned wrapped;
// nothing is added in that case.
func (exec *execCtx) collectChain(ctx context.Context) error {
	exec.log.Debug(logs.DeleteAssemblingChain)

	var parts []oid.ID

	current, ok := exec.splitInfo.LastPart()
	for ok {
		parts = append(parts, current)

		prevID, err := exec.svc.header.previous(ctx, exec, current)
		if err != nil {
			return fmt.Errorf("get previous split element for %s: %w", current, err)
		}

		if prevID == nil {
			break
		}
		current = *prevID
	}

	exec.addMembers(parts)

	return nil
}
// collectChildren obtains the children IDs from the header source and adds
// them, together with the linking object ID from the split info, to the
// tombstone members.
//
// An error from the header source is returned wrapped.
func (exec *execCtx) collectChildren(ctx context.Context) error {
	exec.log.Debug(logs.DeleteCollectingChildren)

	ids, err := exec.svc.header.children(ctx, exec)
	if err != nil {
		return fmt.Errorf("collect children: %w", err)
	}

	linkID, _ := exec.splitInfo.Link()
	ids = append(ids, linkID)
	exec.addMembers(ids)

	return nil
}
// supplementBySplitID asks the searcher for the split members and adds the
// returned IDs to the tombstone members.
//
// An error from the searcher is returned wrapped.
func (exec *execCtx) supplementBySplitID(ctx context.Context) error {
	exec.log.Debug(logs.DeleteSupplementBySplitID)

	found, err := exec.svc.searcher.splitMembers(ctx, exec)
	if err == nil {
		exec.addMembers(found)
		return nil
	}

	return fmt.Errorf("search split chain members: %w", err)
}
// addMembers appends to the tombstone members every ID from incoming that is
// not already among them.
//
// Only duplicates against the current members are dropped; the relative
// order of the remaining IDs is preserved. The filtering reuses the backing
// array of incoming.
func (exec *execCtx) addMembers(incoming []oid.ID) {
	known := exec.tombstone.Members()

	// Filter in place: keep only the IDs that are not yet tombstone members.
	fresh := incoming[:0]
	for _, candidate := range incoming {
		duplicate := false
		for i := range known {
			if known[i].Equals(candidate) {
				duplicate = true
				break
			}
		}

		if !duplicate {
			fresh = append(fresh, candidate)
		}
	}

	exec.tombstone.SetMembers(append(known, fresh...))
}
// initTombstoneObject builds the tombstone object and stores it in
// exec.tombstoneObj.
//
// The marshaled tombstone becomes the payload. The owner is the session
// token issuer when a session token is present, and the local node
// otherwise. The tombstone's expiration epoch is attached as the
// expiration system attribute.
//
// An error is returned only if the tombstone cannot be marshaled; in that
// case exec.tombstoneObj is left untouched.
func (exec *execCtx) initTombstoneObject() error {
	payload, err := exec.tombstone.Marshal()
	if err != nil {
		return fmt.Errorf("marshal tombstone: %w", err)
	}

	tomb := objectSDK.New()
	exec.tombstoneObj = tomb

	tomb.SetContainerID(exec.containerID())
	tomb.SetType(objectSDK.TypeTombstone)
	tomb.SetPayload(payload)

	if token := exec.commonParameters().SessionToken(); token == nil {
		// no session token: make local node a tombstone object owner
		tomb.SetOwnerID(exec.svc.netInfo.LocalNodeID())
	} else {
		tomb.SetOwnerID(token.Issuer())
	}

	expiration := strconv.FormatUint(exec.tombstone.ExpirationEpoch(), 10)

	var expAttr objectSDK.Attribute
	expAttr.SetKey(objectV2.SysAttributeExpEpoch)
	expAttr.SetValue(expiration)
	tomb.SetAttributes(expAttr)

	return nil
}
// saveTombstone stores the tombstone through the service's placer and
// reports the resulting address to the tombstone address writer from the
// request parameters.
//
// An error from the placer is returned wrapped; the writer is not called
// in that case.
func (exec *execCtx) saveTombstone(ctx context.Context) error {
	tombID, err := exec.svc.placer.put(ctx, exec)
	if err != nil {
		return fmt.Errorf("save tombstone: %w", err)
	}

	tombAddr := exec.newAddress(*tombID)
	exec.prm.tombAddrWriter.SetAddress(tombAddr)

	return nil
}