diff --git a/pool/pool.go b/pool/pool.go index 384fa2f..4b7bfe5 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -772,6 +772,9 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (obje var cliPrm sdkClient.PrmObjectHead cliPrm.FromContainer(prm.addr.Container()) cliPrm.ByID(prm.addr.Object()) + if prm.raw { + cliPrm.MarkRaw() + } if prm.stoken != nil { cliPrm.WithinSession(*prm.stoken) @@ -1128,7 +1131,7 @@ type prmContext struct { cnr cid.ID objSet bool - obj oid.ID + objs []oid.ID } func (x *prmContext) useDefaultSession() { @@ -1139,9 +1142,14 @@ func (x *prmContext) useContainer(cnr cid.ID) { x.cnr = cnr } +func (x *prmContext) useObjects(ids []oid.ID) { + x.objs = ids + x.objSet = true +} + func (x *prmContext) useAddress(addr oid.Address) { x.cnr = addr.Container() - x.obj = addr.Object() + x.objs = []oid.ID{addr.Object()} x.objSet = true } @@ -1227,6 +1235,7 @@ type PrmObjectHead struct { prmCommon addr oid.Address + raw bool } // SetAddress specifies NeoFS address of the object. @@ -1234,6 +1243,11 @@ func (x *PrmObjectHead) SetAddress(addr oid.Address) { x.addr = addr } +// MarkRaw marks an intent to read physically stored object. +func (x *PrmObjectHead) MarkRaw() { + x.raw = true +} + // PrmObjectRange groups parameters of RangeObject operation. type PrmObjectRange struct { prmCommon @@ -1852,7 +1866,7 @@ type callContext struct { sessionVerb session.ObjectVerb sessionCnr cid.ID sessionObjSet bool - sessionObj oid.ID + sessionObjs []oid.ID } func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error { @@ -1880,7 +1894,7 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex ctx.sessionVerb = prmCtx.verb ctx.sessionCnr = prmCtx.cnr ctx.sessionObjSet = prmCtx.objSet - ctx.sessionObj = prmCtx.obj + ctx.sessionObjs = prmCtx.objs } return err @@ -1907,7 +1921,7 @@ func (p *Pool) openDefaultSession(ctx *callContext) error { tok.BindContainer(ctx.sessionCnr) if ctx.sessionObjSet { - tok.LimitByObjects(ctx.sessionObj) + tok.LimitByObjects(ctx.sessionObjs...) } // sign the token @@ -1993,6 +2007,19 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { prmCtx.useVerb(session.VerbObjectDelete) prmCtx.useAddress(prm.addr) + if prm.stoken == nil { + // collect phy objects only if we are about to open default session + relatives, err := p.collectObjectRelatives(ctx, prm.addr.Container(), prm.addr.Object(), prm.btoken) + if err != nil { + return fmt.Errorf("failed to collect relatives: %w", err) + } + + if len(relatives) != 0 { + prmCtx.useContainer(prm.addr.Container()) + prmCtx.useObjects(append(relatives, prm.addr.Object())) + } + } + p.fillAppropriateKey(&prm.prmCommon) var cc callContext @@ -2014,6 +2041,154 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { }) } +func (p *Pool) collectObjectRelatives(ctx context.Context, cnr cid.ID, obj oid.ID, btoken *bearer.Token) ([]oid.ID, error) { + var addrObj oid.Address + addrObj.SetContainer(cnr) + addrObj.SetObject(obj) + + var prmHead PrmObjectHead + prmHead.SetAddress(addrObj) + if btoken != nil { + prmHead.UseBearer(*btoken) + } + prmHead.MarkRaw() + + _, err := p.HeadObject(ctx, prmHead) + + var errSplit *object.SplitInfoError + + switch { + default: + return nil, fmt.Errorf("failed to get raw object header: %w", err) + case err == nil: + return nil, nil + case errors.As(err, &errSplit): + } + + splitInfo := errSplit.SplitInfo() + + // collect split chain by the descending ease of operations (ease is evaluated heuristically). + // If any approach fails, we don't try the next since we assume that it will fail too. + + if idLinking, ok := splitInfo.Link(); ok { + addrObj = oid.Address{} + addrObj.SetContainer(cnr) + addrObj.SetObject(idLinking) + + prmHead = PrmObjectHead{} + prmHead.SetAddress(addrObj) + if btoken != nil { + prmHead.UseBearer(*btoken) + } + + res, err := p.HeadObject(ctx, prmHead) + if err != nil { + return nil, fmt.Errorf("failed to get linking object's header: %w", err) + } + + children := res.Children() + + // include linking object + return append(children, idLinking), nil + } + + if idSplit := splitInfo.SplitID(); idSplit != nil { + var query object.SearchFilters + query.AddSplitIDFilter(object.MatchStringEqual, idSplit) + + var prm PrmObjectSearch + prm.SetContainerID(cnr) + prm.SetFilters(query) + if btoken != nil { + prm.UseBearer(*btoken) + } + + res, err := p.SearchObjects(ctx, prm) + if err != nil { + return nil, fmt.Errorf("failed to search objects by split ID: %w", err) + } + + var members []oid.ID + err = res.Iterate(func(id oid.ID) bool { + members = append(members, id) + return false + }) + if err != nil { + return nil, fmt.Errorf("failed to iterate found objects: %w", err) + } + + return members, nil + } + + idMember, ok := splitInfo.LastPart() + if !ok { + return nil, errors.New("missing any data in received object split information") + } + + var res object.Object + chain := []oid.ID{idMember} + chainSet := map[oid.ID]struct{}{idMember: {}} + + addrObj = oid.Address{} + addrObj.SetContainer(cnr) + + for { + addrObj.SetObject(idMember) + + prmHead = PrmObjectHead{} + prmHead.SetAddress(addrObj) + if btoken != nil { + prmHead.UseBearer(*btoken) + } + + res, err = p.HeadObject(ctx, prmHead) + if err != nil { + return nil, fmt.Errorf("failed to read split chain member's header: %w", err) + } + + idMember, ok = res.PreviousID() + if !ok { + break + } + + if _, ok = chainSet[idMember]; ok { + return nil, fmt.Errorf("duplicated member in the split chain: %s", idMember) + } + + chain = append(chain, idMember) + chainSet[idMember] = struct{}{} + } + + // Looking for a linking object + + var query object.SearchFilters + query.AddParentIDFilter(object.MatchStringEqual, obj) + + var prm PrmObjectSearch + prm.SetContainerID(cnr) + prm.SetFilters(query) + if btoken != nil { + prm.UseBearer(*btoken) + } + + resSearch, err := p.SearchObjects(ctx, prm) + if err != nil { + return nil, fmt.Errorf("failed to find object children: %w", err) + } + + err = resSearch.Iterate(func(id oid.ID) bool { + if _, ok = chainSet[id]; !ok { + chain = append(chain, id) + } + return false + }) + if err != nil { + return nil, fmt.Errorf("failed to iterate found objects: %w", err) + } + + return chain, nil +} + type objectReadCloser struct { reader *sdkClient.ObjectReader elapsedTimeCallback func(time.Duration)