frostfs-node/pkg/services/object/delete/util.go

141 lines
2.9 KiB
Go
Raw Normal View History

package deletesvc
import (
"errors"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
// Local named types over the get, search and put services. Declaring them
// here lets this package attach its own helper methods to each service;
// the methods convert the receiver back to the underlying service type
// before calling it.
type (
	// headSvcWrapper is getsvc.Service with helpers for reading object
	// headers (headAddress, splitInfo, children, previous).
	headSvcWrapper getsvc.Service

	// searchSvcWrapper is searchsvc.Service with a helper for searching
	// objects by split ID (splitMembers).
	searchSvcWrapper searchsvc.Service

	// putSvcWrapper is putsvc.Service with a helper for storing the object
	// held by the execution context (put).
	putSvcWrapper putsvc.Service
)
// simpleIDWriter collects object identifiers in memory. It is handed to the
// search service as the result writer, and the accumulated ids are read back
// once the search call has returned.
type simpleIDWriter struct {
// ids holds every identifier passed to WriteIDs, in arrival order.
ids []oid.ID
}
// headAddress requests the header of the object at addr through the wrapped
// get service and returns the object collected by the header writer.
//
// The request reuses the common parameters stored in exec and is issued with
// the raw flag set to true. An error returned by Head is passed back to the
// caller unchanged, together with a nil object.
func (w *headSvcWrapper) headAddress(exec *execCtx, addr oid.Address) (*object.Object, error) {
	var (
		hdrWriter = getsvc.NewSimpleObjectWriter()
		prm       getsvc.HeadPrm
	)

	prm.SetCommonParameters(exec.prm.common)
	prm.SetHeaderWriter(hdrWriter)
	prm.WithRawFlag(true)
	prm.WithAddress(addr)

	// Convert back to the underlying service type to reach its Head method.
	svc := (*getsvc.Service)(w)
	if err := svc.Head(exec.ctx, prm); err != nil {
		return nil, err
	}

	return hdrWriter.Object(), nil
}
// splitInfo heads the object addressed by exec.prm.addr and reports the split
// information carried by the resulting error, if any.
//
// Results:
//   - (nil, nil) when the head request succeeds;
//   - (split info, nil) when the error chain contains *object.SplitInfoError;
//   - (nil, err) for any other error.
func (w *headSvcWrapper) splitInfo(exec *execCtx) (*object.SplitInfo, error) {
	_, err := w.headAddress(exec, exec.prm.addr)
	if err == nil {
		// The header was returned directly; there is no split info to report.
		return nil, nil
	}

	var siErr *object.SplitInfoError
	if errors.As(err, &siErr) {
		return siErr.SplitInfo(), nil
	}

	return nil, err
}
// children heads the object referenced by the link ID of exec.splitInfo and
// returns the child identifiers listed in that object's header.
//
// An error from the head request is returned unchanged with a nil slice.
func (w *headSvcWrapper) children(exec *execCtx) ([]oid.ID, error) {
	// The second result of Link is discarded here.
	// NOTE(review): presumably callers verify that the link is set before
	// calling children — confirm against the call site.
	linkID, _ := exec.splitInfo.Link()

	linkObj, err := w.headAddress(exec, exec.newAddress(linkID))
	if err != nil {
		return nil, err
	}

	return linkObj.Children(), nil
}
// previous heads the object with the given id (addressed via
// exec.newAddress) and returns a pointer to the previous-object ID recorded
// in its header.
//
// It returns (nil, nil) when the header reports no previous ID, and
// (nil, err) when the head request fails.
func (w *headSvcWrapper) previous(exec *execCtx, id oid.ID) (*oid.ID, error) {
	hdr, err := w.headAddress(exec, exec.newAddress(id))
	if err != nil {
		return nil, err
	}

	prevID, hasPrev := hdr.PreviousID()
	if !hasPrev {
		return nil, nil
	}

	return &prevID, nil
}
// splitMembers searches the container of exec.prm.addr for objects whose
// split ID equals the split ID of exec.splitInfo and returns the identifiers
// written by the search service.
//
// The request reuses the common parameters stored in exec. An error from
// Search is returned unchanged with a nil slice.
func (w *searchSvcWrapper) splitMembers(exec *execCtx) ([]oid.ID, error) {
	filters := object.SearchFilters{}
	filters.AddSplitIDFilter(object.MatchStringEqual, exec.splitInfo.SplitID())

	// The search results are accumulated in memory by the writer.
	idWriter := &simpleIDWriter{}

	var prm searchsvc.Prm
	prm.SetWriter(idWriter)
	prm.SetCommonParameters(exec.prm.common)
	prm.WithContainerID(exec.prm.addr.Container())
	prm.WithSearchFilters(filters)

	// Convert back to the underlying service type to reach its Search method.
	svc := (*searchsvc.Service)(w)
	if err := svc.Search(exec.ctx, prm); err != nil {
		return nil, err
	}

	return idWriter.ids, nil
}
// WriteIDs appends ids to the identifiers collected so far.
// It always returns nil.
func (s *simpleIDWriter) WriteIDs(ids []oid.ID) error {
s.ids = append(s.ids, ids...)
return nil
}
// put stores exec.tombstoneObj through the wrapped put service and returns a
// pointer to the object ID reported when the stream is closed.
//
// The object is sent in two steps: the stream is initialised with the object
// returned by CutPayload, then the payload is sent as a single chunk. The
// first error from Put, Init, SendChunk or Close is returned unchanged with
// a nil ID.
func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) {
	// Convert back to the underlying service type to reach its Put method.
	svc := (*putsvc.Service)(w)

	stream, err := svc.Put(exec.ctx)
	if err != nil {
		return nil, err
	}

	// Read the payload before calling CutPayload, as the original code does.
	body := exec.tombstoneObj.Payload()

	initPrm := new(putsvc.PutInitPrm).
		WithCommonPrm(exec.prm.common).
		WithObject(exec.tombstoneObj.CutPayload())
	if err = stream.Init(initPrm); err != nil {
		return nil, err
	}

	chunkPrm := new(putsvc.PutChunkPrm).WithChunk(body)
	if err = stream.SendChunk(chunkPrm); err != nil {
		return nil, err
	}

	res, err := stream.Close()
	if err != nil {
		return nil, err
	}

	// Copy into a local so that its address can be returned.
	objID := res.ObjectID()

	return &objID, nil
}