[#4] Use one object/multipart listing

Don't init listing on every rule in configuration. Use just one

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-07-25 12:50:15 +03:00
parent 1121da9d24
commit 3d5fc50d57
2 changed files with 184 additions and 107 deletions

View file

@ -50,6 +50,10 @@ func (l *logWrapper) Printf(format string, args ...interface{}) {
l.log.Info(fmt.Sprintf(format, args...))
}
const (
statusDisabled = "Disabled"
)
func NewExecutor(ctx context.Context, cfg ExecutorConfig) (*Executor, error) {
e := &Executor{
log: cfg.Logger,
@ -92,21 +96,19 @@ LOOP:
break LOOP
}
func(job Job) {
wg.Add(1)
err = e.pool.Submit(func() {
defer wg.Done()
if inErr := e.worker(ctx, job); inErr != nil {
e.log.Warn(logs.WorkerFailedToHandleJob, zap.Uint64("epoch", job.Epoch),
zap.String("cid", job.ContainerID.EncodeToString()), zap.Error(inErr))
}
})
if err != nil {
wg.Done()
e.log.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
wg.Add(1)
err = e.pool.Submit(func() {
defer wg.Done()
if inErr := e.worker(ctx, job); inErr != nil {
e.log.Warn(logs.WorkerFailedToHandleJob, zap.Uint64("epoch", job.Epoch),
zap.String("cid", job.ContainerID.EncodeToString()), zap.Error(inErr))
}
}(job)
})
if err != nil {
wg.Done()
e.log.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}
}
@ -141,31 +143,14 @@ func (e *Executor) worker(ctx context.Context, job Job) error {
ni.SetCurrentEpoch(job.Epoch)
}
// todo consider listing full object and multipart once
for _, rule := range job.LifecycleConfiguration.Rules {
if rule.Status == "Disabled" {
continue
}
// todo consider validating rule
if err = e.handleRule(ctx, ni, rule, bktInfo, settings); err != nil {
e.log.Warn(logs.FailedToHandleRule, zap.Uint64("epoch", job.Epoch),
zap.Stringer("cid", job.ContainerID), zap.String("rule", rule.ID),
zap.Error(err))
}
if err = e.abortMultiparts(ctx, ni, job.LifecycleConfiguration.Rules, bktInfo); err != nil {
e.log.Warn(logs.AbortMultipartUploads, zap.Uint64("epoch", job.Epoch),
zap.Stringer("cid", job.ContainerID), zap.Error(err))
}
return nil
}
func (e *Executor) handleRule(ctx context.Context, ni *netmap.NetworkInfo, rule data.LifecycleRule, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
if err := e.abortMultiparts(ctx, ni, rule, bktInfo); err != nil {
e.log.Warn(logs.AbortMultipartUploads, zap.Error(err))
}
if err := e.expireObjects(ctx, ni, rule, bktInfo, settings); err != nil {
e.log.Warn(logs.ExpireObjects, zap.Error(err))
if err = e.expireObjects(ctx, ni, job.LifecycleConfiguration.Rules, bktInfo, settings); err != nil {
e.log.Warn(logs.ExpireObjects, zap.Uint64("epoch", job.Epoch),
zap.Stringer("cid", job.ContainerID), zap.Error(err))
}
return nil
@ -176,59 +161,23 @@ const (
tagPrefix = "S3-Tag-"
)
func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo, rule data.LifecycleRule, bktInfo *data.BucketInfo) error {
if rule.AbortIncompleteMultipartUpload == nil || rule.AbortIncompleteMultipartUpload.DaysAfterInitiation == nil {
func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo, rules []data.LifecycleRule, bktInfo *data.BucketInfo) error {
if len(rules) == 0 {
return nil
}
var prefix string
matchMultipartFn := func(*data.MultipartInfo) bool { return true }
if rule.Filter != nil {
filter := rule.Filter
if filter.ObjectSizeGreaterThan != nil || filter.ObjectSizeLessThan != nil ||
filter.And != nil && (filter.And.ObjectSizeGreaterThan != nil || filter.And.ObjectSizeLessThan != nil) {
// todo check if AWS behave the same way
// uncompleted multipart has no size, so filter cannot be applied
return nil
}
prefix = filter.Prefix
if filter.And != nil {
prefix = filter.And.Prefix
}
if filter.Tag != nil || filter.And != nil {
matchMultipartFn = matchMultipartByTagsFunc(filter)
}
}
multipartDuration, err := durationToEpochsAbs(ni, 24*time.Hour*time.Duration(*rule.AbortIncompleteMultipartUpload.DaysAfterInitiation))
matcherFn, err := e.matchMultipartByRulesFn(ni, rules)
if err != nil {
return fmt.Errorf("DaysAfterInitiation to epochs: %w", err)
return fmt.Errorf("form multiaprt matcher: %w", err)
}
multiparts, err := e.tree.GetMultipartUploadsByPrefix(ctx, bktInfo, prefix)
multiparts, err := e.tree.GetMultipartUploadsByPrefix(ctx, bktInfo, "")
if err != nil {
return fmt.Errorf("list multiparts: %w", err)
}
for _, multipart := range multiparts {
if !matchMultipartFn(multipart) {
continue
}
if multipart.Finished {
continue
}
multipartCreationEpoch, err := creationEpoch(ni, multipart.Created, multipart.Meta)
if err != nil {
e.log.Warn(logs.FailedToGetMultipartCreationEpoch, zap.Error(err))
continue
}
if multipartCreationEpoch+multipartDuration < ni.CurrentEpoch() {
if !matcherFn(multipart) {
continue
}
@ -242,20 +191,91 @@ func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo,
return nil
}
func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, rule data.LifecycleRule, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
if rule.Expiration == nil && rule.NonCurrentVersionExpiration == nil {
return fmt.Errorf("no expiration rules for '%s", rule.ID)
func (e *Executor) matchMultipartByRulesFn(ni *netmap.NetworkInfo, rules []data.LifecycleRule) (func(*data.MultipartInfo) bool, error) {
matchers := make([]func(*data.MultipartInfo) bool, 0, len(rules))
for _, rule := range rules {
matchFn, err := e.matchMultipartByRuleFn(ni, rule)
if err != nil && !errors.Is(err, errNotApplicableRule) {
return nil, err
}
matchers = append(matchers, matchFn)
}
if len(matchers) == 0 {
return nil, errNotApplicableRule
}
return func(info *data.MultipartInfo) bool {
for _, matcher := range matchers {
if matcher(info) {
return true
}
}
return false
}, nil
}
var (
errNotApplicableRule = errors.New("not applicable rule")
errAllObjectMatcherFailed = errors.New("all object matcher failed")
)
func (e *Executor) matchMultipartByRuleFn(ni *netmap.NetworkInfo, rule data.LifecycleRule) (func(*data.MultipartInfo) bool, error) {
if rule.Status == statusDisabled {
return nil, errNotApplicableRule
}
if rule.AbortIncompleteMultipartUpload == nil || rule.AbortIncompleteMultipartUpload.DaysAfterInitiation == nil {
return nil, errNotApplicableRule
}
multipartDuration, err := durationToEpochsAbs(ni, 24*time.Hour*time.Duration(*rule.AbortIncompleteMultipartUpload.DaysAfterInitiation))
if err != nil {
return nil, fmt.Errorf("DaysAfterInitiation to epochs: %w", err)
}
var prefix string
matchMultipartByTags := func(*data.MultipartInfo) bool { return true }
if rule.Filter != nil {
prefix = rule.Filter.Prefix
if rule.Filter.And != nil {
prefix = rule.Filter.And.Prefix
filter := rule.Filter
prefix = filter.Prefix
if filter.And != nil {
prefix = filter.And.Prefix
}
if filter.Tag != nil || filter.And != nil {
matchMultipartByTags = matchMultipartByTagsFunc(filter)
}
}
objectStream, err := e.tree.InitVersionsByPrefixStream(ctx, bktInfo, prefix, false)
return func(multipart *data.MultipartInfo) bool {
if multipart.Finished || !strings.HasPrefix(multipart.Key, prefix) || !matchMultipartByTags(multipart) {
return false
}
multipartCreationEpoch, err := creationEpoch(ni, multipart.Created, multipart.Meta)
if err != nil {
e.log.Warn(logs.FailedToGetMultipartCreationEpoch, zap.Error(err))
return false
}
return multipartCreationEpoch+multipartDuration >= ni.CurrentEpoch()
}, nil
}
func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, rules []data.LifecycleRule, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
if len(rules) == 0 {
return nil
}
matcherFn, err := e.matchObjectByRulesFn(ctx, ni, bktInfo, rules)
if err != nil {
return fmt.Errorf("form multiaprt matcher: %w", err)
}
objectStream, err := e.tree.InitVersionsByPrefixStream(ctx, bktInfo, "", false)
if err != nil {
return fmt.Errorf("list multiparts: %w", err)
}
@ -269,7 +289,7 @@ func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, ru
nodeVersion, err := objectStream.Next(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
if err = e.expireObject(ctx, versions, bktInfo, ni, rule, settings); err != nil {
if err = e.expireObject(ctx, versions, bktInfo, ni, matcherFn, settings); err != nil {
e.log.Warn(logs.FailedToExpireObject, zap.String("object", latestObjName), zap.Error(err))
}
return nil
@ -278,7 +298,7 @@ func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, ru
}
if nodeVersion.FilePath != latestObjName {
if err = e.expireObject(ctx, versions, bktInfo, ni, rule, settings); err != nil {
if err = e.expireObject(ctx, versions, bktInfo, ni, matcherFn, settings); err != nil {
e.log.Warn(logs.FailedToExpireObject, zap.String("object", latestObjName), zap.Error(err))
}
latestObjName = nodeVersion.FilePath
@ -289,23 +309,61 @@ func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, ru
}
}
func (e *Executor) expireObject(ctx context.Context, versions []*data.NodeVersion, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, rule data.LifecycleRule, settings *data.BucketSettings) error {
type MatchObjectFunc = func(index int, versions []*data.NodeVersion) (bool, error)
func (e *Executor) matchObjectByRulesFn(ctx context.Context, ni *netmap.NetworkInfo, bktInfo *data.BucketInfo, rules []data.LifecycleRule) (MatchObjectFunc, error) {
matchers := make([]MatchObjectFunc, 0, len(rules))
for _, rule := range rules {
matchFn, err := e.expiredObjectMatcher(ctx, bktInfo, ni, rule)
if err != nil {
e.log.Warn(logs.SkipRule, zap.String("rule", rule.ID), zap.Error(err))
continue
}
matchers = append(matchers, matchFn)
}
if len(matchers) == 0 {
return nil, errNotApplicableRule
}
return func(index int, versions []*data.NodeVersion) (bool, error) {
var numErrors int
for _, matcher := range matchers {
if matched, err := matcher(index, versions); err != nil {
e.log.Warn(logs.ObjectMatchingFailed, zap.Error(err))
numErrors++
continue
} else if matched {
return true, nil
}
}
var err error
if numErrors == len(matchers) {
err = errAllObjectMatcherFailed
}
return false, err
}, nil
}
func (e *Executor) expireObject(ctx context.Context, versions []*data.NodeVersion, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, matcherFn MatchObjectFunc, settings *data.BucketSettings) error {
if len(versions) == 0 {
return nil
}
sort.Slice(versions, func(i, j int) bool {
return versions[i].Timestamp < versions[j].Timestamp
})
matchFilter, err := e.expiredObjectMatcher(ctx, versions, bktInfo, ni, rule)
if err != nil {
return err
}
var nullVersionsToDelete []int
for i, version := range versions {
if version.IsUnversioned {
nullVersionsToDelete = append(nullVersionsToDelete, i)
}
matched, err := matchFilter(i, version)
matched, err := matcherFn(i, versions)
if err != nil {
return err
}
@ -346,16 +404,22 @@ func (e *Executor) expireObject(ctx context.Context, versions []*data.NodeVersio
return nil
}
func (e *Executor) expiredObjectMatcher(ctx context.Context, versions []*data.NodeVersion, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, rule data.LifecycleRule) (func(index int, version *data.NodeVersion) (bool, error), error) {
func (e *Executor) expiredObjectMatcher(ctx context.Context, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, rule data.LifecycleRule) (func(int, []*data.NodeVersion) (bool, error), error) {
if rule.Status == statusDisabled {
return nil, fmt.Errorf("%w: rule disabled", errNotApplicableRule)
}
if rule.Expiration == nil && rule.NonCurrentVersionExpiration == nil {
return nil, fmt.Errorf("%w: missing expiration and non current expiration", errNotApplicableRule)
}
var prm objectMatcherParams
prm.newerNonCurrentVersions = math.MaxInt
prm.minObjSize, prm.maxObjSize = getObjectSizeRange(rule)
if rule.NonCurrentVersionExpiration != nil {
if rule.NonCurrentVersionExpiration.NewerNonCurrentVersions != nil {
prm.startIndex = len(versions) - *rule.NonCurrentVersionExpiration.NewerNonCurrentVersions - 1
if prm.startIndex < 0 {
prm.startIndex = 0
}
prm.newerNonCurrentVersions = *rule.NonCurrentVersionExpiration.NewerNonCurrentVersions
}
if rule.NonCurrentVersionExpiration.NonCurrentDays != nil {
@ -391,6 +455,11 @@ func (e *Executor) expiredObjectMatcher(ctx context.Context, versions []*data.No
}
if rule.Filter != nil {
prm.prefix = rule.Filter.Prefix
if rule.Filter.And != nil {
prm.prefix = rule.Filter.And.Prefix
}
if rule.Filter.Tag != nil {
prm.tagsToMatch = append(prm.tagsToMatch, *rule.Filter.Tag)
}
@ -399,11 +468,12 @@ func (e *Executor) expiredObjectMatcher(ctx context.Context, versions []*data.No
}
}
return e.objectMatcher(ctx, bktInfo, ni, versions, prm), nil
return e.objectMatcher(ctx, bktInfo, ni, prm), nil
}
type objectMatcherParams struct {
startIndex int
prefix string
newerNonCurrentVersions int
minObjSize uint64
maxObjSize uint64
nonCurrentDuration *uint64
@ -413,12 +483,18 @@ type objectMatcherParams struct {
tagsToMatch []data.Tag
}
func (e *Executor) objectMatcher(ctx context.Context, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, versions []*data.NodeVersion, prm objectMatcherParams) func(int, *data.NodeVersion) (bool, error) {
return func(index int, version *data.NodeVersion) (bool, error) {
if index < prm.startIndex {
func (e *Executor) objectMatcher(ctx context.Context, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, prm objectMatcherParams) func(int, []*data.NodeVersion) (bool, error) {
return func(index int, versions []*data.NodeVersion) (bool, error) {
if !strings.HasPrefix(versions[index].FilePath, prm.prefix) {
return false, nil
}
if index < len(versions)-prm.newerNonCurrentVersions-1 {
return true, nil
}
version := versions[index]
if prm.nonCurrentDuration != nil {
if index < len(versions)-1 {
next := versions[index+1]
@ -428,7 +504,7 @@ func (e *Executor) objectMatcher(ctx context.Context, bktInfo *data.BucketInfo,
}
if epoch+(*prm.nonCurrentDuration) < ni.CurrentEpoch() {
return false, nil
return true, nil
}
}
}

View file

@ -51,7 +51,8 @@ const (
WorkerFailedToHandleJob = "worker failed to handle job"
ExecutorStopped = "executor stopped"
ExecutorStoppedJobsChannelIsClosed = "executor stopped: jobs channel is closed"
FailedToHandleRule = "failed to handle rule"
SkipRule = "skip rule"
ObjectMatchingFailed = "object matching failed"
AbortMultipartUploads = "abort multiparts uploads"
ExpireObjects = "expire objects"
FailedToGetMultipartCreationEpoch = "failed to get multipart creation epoch"