package deletesvc

import (
	"context"
	"strconv"

	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	apiclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.uber.org/zap"
)

// statusError pairs an execution status code with the error (if any) that
// produced it.
type statusError struct {
	status int
	err    error
}

// execCtx holds the state of a single DELETE execution: the service and
// request parameters it runs with, the status of the last step, a
// request-scoped logger, and the tombstone data accumulated along the way.
type execCtx struct {
	svc *Service

	prm Prm

	statusError

	log *logger.Logger

	// tombstone accumulates the members collected by the collect* steps.
	tombstone *object.Tombstone

	// splitInfo is set by formSplitInfo; nil is treated by collectMembers as
	// "nothing to collect".
	splitInfo *object.SplitInfo

	// tombstoneObj is the object built by initTombstoneObject.
	tombstoneObj *object.Object
}

// Execution statuses stored in statusError.status.
const (
	statusUndefined int = iota
	statusOK
)

// setLogger derives the request-scoped logger from l, attaching the request
// kind, the target address, the local-only flag and the presence of session
// and bearer tokens as fields.
func (exec *execCtx) setLogger(l *logger.Logger) {
	exec.log = &logger.Logger{Logger: l.With(
		zap.String("request", "DELETE"),
		zap.Stringer("address", exec.address()),
		zap.Bool("local", exec.isLocal()),
		zap.Bool("with session", exec.prm.common.SessionToken() != nil),
		zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
	)}
}

// isLocal reports whether the common request parameters are marked as
// local-only.
//
// It uses a pointer receiver like every other execCtx method, so the context
// is not copied on each call.
func (exec *execCtx) isLocal() bool {
	return exec.prm.common.LocalOnly()
}

// address returns the address of the object requested for removal.
func (exec *execCtx) address() oid.Address {
	return exec.prm.addr
}

// containerID returns the container part of the requested address.
func (exec *execCtx) containerID() cid.ID {
	return exec.prm.addr.Container()
}

// commonParameters returns the common parameters of the request.
func (exec *execCtx) commonParameters() *util.CommonPrm {
	return exec.prm.common
}

// newAddress builds an address of object id inside the request's container.
func (exec *execCtx) newAddress(id oid.ID) oid.Address {
	var a oid.Address
	a.SetObject(id)
	a.SetContainer(exec.containerID())

	return a
}

// formSplitInfo requests the split info of the target object and stores it in
// exec.splitInfo. It updates exec.status/exec.err and returns true on success.
func (exec *execCtx) formSplitInfo(ctx context.Context) bool {
	var err error

	exec.splitInfo, err = exec.svc.header.splitInfo(ctx, exec)

	// IsErrObjectAlreadyRemoved check is required because splitInfo
	// implicitly performs Head request that may return ObjectAlreadyRemoved
	// status that is not specified for Delete.
	if err != nil && !apiclient.IsErrObjectAlreadyRemoved(err) {
		exec.status = statusUndefined
		exec.err = err

		exec.log.Debug(logs.DeleteCouldNotComposeSplitInfo,
			zap.String("error", err.Error()),
		)

		return false
	}

	exec.status = statusOK
	exec.err = nil

	return true
}

// collectMembers gathers tombstone members for the target object.
//
// With no split info nothing is collected and true is returned. Otherwise
// children are collected through the linking object when its ID is present;
// if that is absent or fails, the chain is walked back from the last part
// when its ID is present. Finally the member list is supplemented by the
// split-members search, whose result is the return value.
func (exec *execCtx) collectMembers(ctx context.Context) (ok bool) {
	if exec.splitInfo == nil {
		exec.log.Debug(logs.DeleteNoSplitInfoObjectIsPHY)
		return true
	}

	if _, withLink := exec.splitInfo.Link(); withLink {
		ok = exec.collectChildren(ctx)
	}

	if !ok {
		if _, withLast := exec.splitInfo.LastPart(); withLast {
			if ok = exec.collectChain(ctx); !ok {
				return false
			}
		}
	}

	// NOTE(review): may be fail if neither right nor linking ID is set?
	return exec.supplementBySplitID(ctx)
}

// collectChain walks the split chain backwards starting from the last part,
// following "previous" references until none is returned, and adds every
// visited ID to the tombstone members. It returns false (with exec.status and
// exec.err set) as soon as a previous element cannot be obtained.
func (exec *execCtx) collectChain(ctx context.Context) bool {
	var chain []oid.ID

	exec.log.Debug(logs.DeleteAssemblingChain)

	for prev, withPrev := exec.splitInfo.LastPart(); withPrev; {
		chain = append(chain, prev)

		p, err := exec.svc.header.previous(ctx, exec, prev)
		if err != nil {
			exec.status = statusUndefined
			exec.err = err

			exec.log.Debug(logs.DeleteCouldNotGetPreviousSplitElement,
				zap.Stringer("id", prev),
				zap.String("error", err.Error()),
			)

			return false
		}

		exec.status = statusOK
		exec.err = nil

		// A nil result ends the walk.
		withPrev = p != nil
		if withPrev {
			prev = *p
		}
	}

	exec.addMembers(chain)

	return true
}

// collectChildren obtains the children list via the header service and adds
// the children together with the linking object ID to the tombstone members.
// It returns false (with exec.status and exec.err set) on failure.
func (exec *execCtx) collectChildren(ctx context.Context) bool {
	exec.log.Debug(logs.DeleteCollectingChildren)

	children, err := exec.svc.header.children(ctx, exec)
	if err != nil {
		exec.status = statusUndefined
		exec.err = err

		exec.log.Debug(logs.DeleteCouldNotCollectObjectChildren,
			zap.String("error", err.Error()),
		)

		return false
	}

	exec.status = statusOK
	exec.err = nil

	link, _ := exec.splitInfo.Link()

	exec.addMembers(append(children, link))

	return true
}

// supplementBySplitID adds the IDs returned by the searcher's splitMembers
// call to the tombstone members. It returns false (with exec.status and
// exec.err set) if the search fails.
func (exec *execCtx) supplementBySplitID(ctx context.Context) bool {
	exec.log.Debug(logs.DeleteSupplementBySplitID)

	chain, err := exec.svc.searcher.splitMembers(ctx, exec)
	if err != nil {
		exec.status = statusUndefined
		exec.err = err

		exec.log.Debug(logs.DeleteCouldNotSearchForSplitChainMembers,
			zap.String("error", err.Error()),
		)

		return false
	}

	exec.status = statusOK
	exec.err = nil

	exec.addMembers(chain)

	return true
}

// addMembers appends to the tombstone members every ID from incoming that is
// not already a member, preserving the order of incoming.
//
// The new IDs are filtered into a fresh slice so that the caller's slice is
// left untouched.
func (exec *execCtx) addMembers(incoming []oid.ID) {
	members := exec.tombstone.Members()

	fresh := make([]oid.ID, 0, len(incoming))

next:
	for i := range incoming {
		for j := range members {
			if members[j].Equals(incoming[i]) {
				continue next
			}
		}

		fresh = append(fresh, incoming[i])
	}

	exec.tombstone.SetMembers(append(members, fresh...))
}

// initTombstoneObject marshals the tombstone and wraps it into a new
// tombstone-typed object in the request's container, stored in
// exec.tombstoneObj. The owner is the session token issuer when a session
// token is present, otherwise the local node. The tombstone's expiration
// epoch is attached as the system expiration attribute. It returns false
// (with exec.status and exec.err set) if marshaling fails.
func (exec *execCtx) initTombstoneObject() bool {
	payload, err := exec.tombstone.Marshal()
	if err != nil {
		exec.status = statusUndefined
		exec.err = err

		exec.log.Debug(logs.DeleteCouldNotMarshalTombstoneStructure,
			zap.String("error", err.Error()),
		)

		return false
	}

	exec.tombstoneObj = object.New()
	exec.tombstoneObj.SetContainerID(exec.containerID())
	exec.tombstoneObj.SetType(object.TypeTombstone)
	exec.tombstoneObj.SetPayload(payload)

	tokenSession := exec.commonParameters().SessionToken()
	if tokenSession != nil {
		issuer := tokenSession.Issuer()
		exec.tombstoneObj.SetOwnerID(&issuer)
	} else {
		// make local node a tombstone object owner
		localUser := exec.svc.netInfo.LocalNodeID()
		exec.tombstoneObj.SetOwnerID(&localUser)
	}

	var a object.Attribute
	a.SetKey(objectV2.SysAttributeExpEpoch)
	a.SetValue(strconv.FormatUint(exec.tombstone.ExpirationEpoch(), 10))

	exec.tombstoneObj.SetAttributes(a)

	return true
}

// saveTombstone stores the tombstone object through the placer and, on
// success, reports the resulting address to the request's tombstone address
// writer. It returns false (with exec.status and exec.err set) on failure.
func (exec *execCtx) saveTombstone(ctx context.Context) bool {
	id, err := exec.svc.placer.put(ctx, exec)
	if err != nil {
		exec.status = statusUndefined
		exec.err = err

		exec.log.Debug(logs.DeleteCouldNotSaveTheTombstone,
			zap.String("error", err.Error()),
		)

		return false
	}

	exec.status = statusOK
	exec.err = nil

	exec.prm.tombAddrWriter.
		SetAddress(exec.newAddress(*id))

	return true
}