[#360] pool: Fix removing virtual object

Include phy objects in session token scope

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
remotes/KirillovDenis/feature/362-drop_session_for_read_opreations_in_pool
Denis Kirillov 2022-11-11 13:17:26 +03:00 committed by Alex Vanin
parent e35f0df1ca
commit 1cacf472a3
1 changed files with 180 additions and 5 deletions

View File

@ -772,6 +772,9 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (obje
var cliPrm sdkClient.PrmObjectHead
cliPrm.FromContainer(prm.addr.Container())
cliPrm.ByID(prm.addr.Object())
if prm.raw {
cliPrm.MarkRaw()
}
if prm.stoken != nil {
cliPrm.WithinSession(*prm.stoken)
@ -1128,7 +1131,7 @@ type prmContext struct {
cnr cid.ID
objSet bool
obj oid.ID
objs []oid.ID
}
func (x *prmContext) useDefaultSession() {
@ -1139,9 +1142,14 @@ func (x *prmContext) useContainer(cnr cid.ID) {
x.cnr = cnr
}
func (x *prmContext) useObjects(ids []oid.ID) {
x.objs = ids
x.objSet = true
}
func (x *prmContext) useAddress(addr oid.Address) {
x.cnr = addr.Container()
x.obj = addr.Object()
x.objs = []oid.ID{addr.Object()}
x.objSet = true
}
@ -1227,6 +1235,7 @@ type PrmObjectHead struct {
prmCommon
addr oid.Address
raw bool
}
// SetAddress specifies NeoFS address of the object.
@ -1234,6 +1243,11 @@ func (x *PrmObjectHead) SetAddress(addr oid.Address) {
x.addr = addr
}
// MarkRaw marks an intent to read physically stored object.
func (x *PrmObjectHead) MarkRaw() {
x.raw = true
}
// PrmObjectRange groups parameters of RangeObject operation.
type PrmObjectRange struct {
prmCommon
@ -1852,7 +1866,7 @@ type callContext struct {
sessionVerb session.ObjectVerb
sessionCnr cid.ID
sessionObjSet bool
sessionObj oid.ID
sessionObjs []oid.ID
}
func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContext) error {
@ -1880,7 +1894,7 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
ctx.sessionVerb = prmCtx.verb
ctx.sessionCnr = prmCtx.cnr
ctx.sessionObjSet = prmCtx.objSet
ctx.sessionObj = prmCtx.obj
ctx.sessionObjs = prmCtx.objs
}
return err
@ -1907,7 +1921,7 @@ func (p *Pool) openDefaultSession(ctx *callContext) error {
tok.BindContainer(ctx.sessionCnr)
if ctx.sessionObjSet {
tok.LimitByObjects(ctx.sessionObj)
tok.LimitByObjects(ctx.sessionObjs...)
}
// sign the token
@ -1993,6 +2007,19 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
prmCtx.useVerb(session.VerbObjectDelete)
prmCtx.useAddress(prm.addr)
if prm.stoken == nil {
// collect phy objects only if we are about to open default session
relatives, err := p.collectObjectRelatives(ctx, prm.addr.Container(), prm.addr.Object(), prm.btoken)
if err != nil {
return fmt.Errorf("failed to collect relatives: %w", err)
}
if len(relatives) != 0 {
prmCtx.useContainer(prm.addr.Container())
prmCtx.useObjects(append(relatives, prm.addr.Object()))
}
}
p.fillAppropriateKey(&prm.prmCommon)
var cc callContext
@ -2014,6 +2041,154 @@ func (p *Pool) DeleteObject(ctx context.Context, prm PrmObjectDelete) error {
})
}
func (p *Pool) collectObjectRelatives(ctx context.Context, cnr cid.ID, obj oid.ID, btoken *bearer.Token) ([]oid.ID, error) {
var addrObj oid.Address
addrObj.SetContainer(cnr)
addrObj.SetObject(obj)
var prmHead PrmObjectHead
prmHead.SetAddress(addrObj)
if btoken != nil {
prmHead.UseBearer(*btoken)
}
prmHead.MarkRaw()
_, err := p.HeadObject(ctx, prmHead)
var errSplit *object.SplitInfoError
switch {
default:
return nil, fmt.Errorf("failed to get raw object header: %w", err)
case err == nil:
return nil, nil
case errors.As(err, &errSplit):
}
splitInfo := errSplit.SplitInfo()
// collect split chain by the descending ease of operations (ease is evaluated heuristically).
// If any approach fails, we don't try the next since we assume that it will fail too.
if idLinking, ok := splitInfo.Link(); ok {
addrObj = oid.Address{}
addrObj.SetContainer(cnr)
addrObj.SetObject(idLinking)
prmHead = PrmObjectHead{}
prmHead.SetAddress(addrObj)
if btoken != nil {
prmHead.UseBearer(*btoken)
}
res, err := p.HeadObject(ctx, prmHead)
if err != nil {
return nil, fmt.Errorf("failed to get linking object's header: %w", err)
}
children := res.Children()
// include linking object
return append(children, idLinking), nil
}
if idSplit := splitInfo.SplitID(); idSplit != nil {
var query object.SearchFilters
query.AddSplitIDFilter(object.MatchStringEqual, idSplit)
var prm PrmObjectSearch
prm.SetContainerID(cnr)
prm.SetFilters(query)
if btoken != nil {
prm.UseBearer(*btoken)
}
res, err := p.SearchObjects(ctx, prm)
if err != nil {
return nil, fmt.Errorf("failed to search objects by split ID: %w", err)
}
var members []oid.ID
err = res.Iterate(func(id oid.ID) bool {
members = append(members, id)
return false
})
if err != nil {
return nil, fmt.Errorf("failed to iterate found objects: %w", err)
}
return members, nil
}
idMember, ok := splitInfo.LastPart()
if !ok {
return nil, errors.New("missing any data in received object split information")
}
var res object.Object
chain := []oid.ID{idMember}
chainSet := map[oid.ID]struct{}{idMember: {}}
addrObj = oid.Address{}
addrObj.SetContainer(cnr)
for {
addrObj.SetObject(idMember)
prmHead = PrmObjectHead{}
prmHead.SetAddress(addrObj)
if btoken != nil {
prmHead.UseBearer(*btoken)
}
res, err = p.HeadObject(ctx, prmHead)
if err != nil {
return nil, fmt.Errorf("failed to read split chain member's header: %w", err)
}
idMember, ok = res.PreviousID()
if !ok {
break
}
if _, ok = chainSet[idMember]; ok {
return nil, fmt.Errorf("duplicated member in the split chain: %s", idMember)
}
chain = append(chain, idMember)
chainSet[idMember] = struct{}{}
}
// Looking for a linking object
var query object.SearchFilters
query.AddParentIDFilter(object.MatchStringEqual, obj)
var prm PrmObjectSearch
prm.SetContainerID(cnr)
prm.SetFilters(query)
if btoken != nil {
prm.UseBearer(*btoken)
}
resSearch, err := p.SearchObjects(ctx, prm)
if err != nil {
return nil, fmt.Errorf("failed to find object children: %w", err)
}
err = resSearch.Iterate(func(id oid.ID) bool {
if _, ok = chainSet[id]; !ok {
chain = append(chain, id)
}
return false
})
if err != nil {
return nil, fmt.Errorf("failed to iterate found objects: %w", err)
}
return chain, nil
}
type objectReadCloser struct {
reader *sdkClient.ObjectReader
elapsedTimeCallback func(time.Duration)