[#4] Optimize purge using tombstones for removed objects
Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
This commit is contained in:
parent
4733d46d83
commit
69d6e4a2e8
2 changed files with 85 additions and 27 deletions
|
@ -17,6 +17,7 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape"
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
|
@ -36,6 +37,7 @@ import (
|
|||
robject "github.com/rclone/rclone/fs/object"
|
||||
"github.com/rclone/rclone/fs/walk"
|
||||
"github.com/rclone/rclone/lib/bucket"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -155,6 +157,24 @@ func init() {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "tombstone_lifetime",
|
||||
Advanced: true,
|
||||
Help: "Tombstone's lifetime in epochs",
|
||||
Default: 10,
|
||||
},
|
||||
{
|
||||
Name: "tombstone_member_size",
|
||||
Advanced: true,
|
||||
Help: "Maximum number of object IDs in one tombstone",
|
||||
Default: 100,
|
||||
},
|
||||
{
|
||||
Name: "tombstone_worker_pool_size",
|
||||
Advanced: true,
|
||||
Help: "Maximum number of concurrent workers for tombstone processing",
|
||||
Default: 10,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
@ -224,6 +244,9 @@ type Options struct {
|
|||
APECacheInvalidationDuration fs.Duration `config:"ape_cache_invalidation_duration"`
|
||||
APECacheInvalidationTimeout fs.Duration `config:"ape_cache_invalidation_timeout"`
|
||||
APEChainCheckInterval fs.Duration `config:"ape_chain_check_interval"`
|
||||
TombstoneLifetime uint64 `config:"tombstone_lifetime"`
|
||||
TombstoneMemberSize int `config:"tombstone_member_size"`
|
||||
TombstoneWorkerPoolSize int `config:"tombstone_worker_pool_size"`
|
||||
|
||||
RPCEndpoint string `config:"rpc_endpoint"`
|
||||
Wallet string `config:"wallet"`
|
||||
|
@ -1485,40 +1508,74 @@ func (f *Fs) deleteByPrefix(ctx context.Context, cnrID cid.ID, prefix string) er
|
|||
}
|
||||
defer res.Close()
|
||||
|
||||
var (
|
||||
inErr error
|
||||
found bool
|
||||
prmDelete pool.PrmObjectDelete
|
||||
addr oid.Address
|
||||
)
|
||||
|
||||
addr.SetContainer(cnrID)
|
||||
|
||||
err = res.Iterate(func(id oid.ID) bool {
|
||||
found = true
|
||||
|
||||
addr.SetObject(id)
|
||||
prmDelete.SetAddress(addr)
|
||||
|
||||
if err = f.pool.DeleteObject(ctx, prmDelete); err != nil {
|
||||
inErr = fmt.Errorf("delete object: %w", err)
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
})
|
||||
if err == nil {
|
||||
err = inErr
|
||||
}
|
||||
netInfo, err := f.pool.NetworkInfo(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("iterate objects: %w", err)
|
||||
return fmt.Errorf("get network info: %w", err)
|
||||
}
|
||||
|
||||
eg := &errgroup.Group{}
|
||||
eg.SetLimit(f.opt.TombstoneWorkerPoolSize)
|
||||
found := false
|
||||
for {
|
||||
objIDs := make([]oid.ID, f.opt.TombstoneMemberSize)
|
||||
n, err := res.Read(objIDs)
|
||||
if err != nil && err != io.EOF {
|
||||
_ = eg.Wait()
|
||||
return fmt.Errorf("read object IDs: %w", err)
|
||||
}
|
||||
if n == 0 {
|
||||
break
|
||||
}
|
||||
found = true
|
||||
eg.Go(func() error {
|
||||
if n == 1 {
|
||||
objID := objIDs[0]
|
||||
prm := pool.PrmObjectDelete{}
|
||||
prm.SetAddress(newAddress(cnrID, objID))
|
||||
if err := f.pool.DeleteObject(ctx, prm); err != nil {
|
||||
return fmt.Errorf("couldn't delete object '%s' within container '%s': %w", objID, cnrID, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
tomb := object.NewTombstone()
|
||||
tomb.SetExpirationEpoch(netInfo.CurrentEpoch() + f.opt.TombstoneLifetime)
|
||||
tomb.SetMembers(objIDs[:n])
|
||||
payload, err := tomb.Marshal()
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal tombstone: %w", err)
|
||||
}
|
||||
|
||||
exprAttr := object.NewAttribute()
|
||||
exprAttr.SetKey(objectV2.SysAttributeExpEpoch)
|
||||
exprAttr.SetValue(strconv.FormatUint(tomb.ExpirationEpoch(), 10))
|
||||
creationTimeAttr := object.NewAttribute()
|
||||
creationTimeAttr.SetKey(object.AttributeTimestamp)
|
||||
creationTimeAttr.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
||||
obj := object.New()
|
||||
obj.SetContainerID(cnrID)
|
||||
obj.SetOwnerID(*f.owner)
|
||||
obj.SetAttributes(*exprAttr, *creationTimeAttr)
|
||||
obj.SetType(object.TypeTombstone)
|
||||
|
||||
prm := pool.PrmObjectPut{}
|
||||
prm.SetPayload(bytes.NewReader(payload))
|
||||
prm.SetHeader(*obj)
|
||||
if _, err = f.pool.PutObject(ctx, prm); err != nil {
|
||||
return fmt.Errorf("put tombstone: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
return fs.ErrorDirNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func (f *Fs) isContainerEmpty(ctx context.Context, cnrID cid.ID) (bool, error) {
|
||||
|
|
1
go.sum
1
go.sum
|
@ -41,6 +41,7 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
|
|||
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
|
||||
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.0/go.mod h1:uY0AYmCznjZdghDnAk7THFIe1Vlg531IxUcus7ZfUJI=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e h1:kcBqZBiFIUBATUqEuvVigtkJJWQ2Gug/eYXn967o3M4=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e/go.mod h1:F/fe1OoIDKr5Bz99q4sriuHDuf3aZefZy9ZsCqEtgxc=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=
|
||||
|
|
Loading…
Add table
Reference in a new issue