From 3d5fc50d5786c90dd790aa3ae834933cfb1e4190 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Thu, 25 Jul 2024 12:50:15 +0300 Subject: [PATCH] [#4] Use one object/multipart listing Don't init listing on every rule in configuration. Use just one Signed-off-by: Denis Kirillov --- internal/lifecycle/executor.go | 288 +++++++++++++++++++++------------ internal/logs/logs.go | 3 +- 2 files changed, 184 insertions(+), 107 deletions(-) diff --git a/internal/lifecycle/executor.go b/internal/lifecycle/executor.go index 3defd22..6e7e537 100644 --- a/internal/lifecycle/executor.go +++ b/internal/lifecycle/executor.go @@ -50,6 +50,10 @@ func (l *logWrapper) Printf(format string, args ...interface{}) { l.log.Info(fmt.Sprintf(format, args...)) } +const ( + statusDisabled = "Disabled" +) + func NewExecutor(ctx context.Context, cfg ExecutorConfig) (*Executor, error) { e := &Executor{ log: cfg.Logger, @@ -92,21 +96,19 @@ LOOP: break LOOP } - func(job Job) { - wg.Add(1) - err = e.pool.Submit(func() { - defer wg.Done() - if inErr := e.worker(ctx, job); inErr != nil { - e.log.Warn(logs.WorkerFailedToHandleJob, zap.Uint64("epoch", job.Epoch), - zap.String("cid", job.ContainerID.EncodeToString()), zap.Error(inErr)) - } - }) - - if err != nil { - wg.Done() - e.log.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) + wg.Add(1) + err = e.pool.Submit(func() { + defer wg.Done() + if inErr := e.worker(ctx, job); inErr != nil { + e.log.Warn(logs.WorkerFailedToHandleJob, zap.Uint64("epoch", job.Epoch), + zap.String("cid", job.ContainerID.EncodeToString()), zap.Error(inErr)) } - }(job) + }) + + if err != nil { + wg.Done() + e.log.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) + } } } @@ -141,31 +143,14 @@ func (e *Executor) worker(ctx context.Context, job Job) error { ni.SetCurrentEpoch(job.Epoch) } - // todo consider listing full object and multipart once - - for _, rule := range job.LifecycleConfiguration.Rules { - if rule.Status == "Disabled" { - continue - } - - // todo consider validating rule - - if err = e.handleRule(ctx, ni, rule, bktInfo, settings); err != nil { - e.log.Warn(logs.FailedToHandleRule, zap.Uint64("epoch", job.Epoch), - zap.Stringer("cid", job.ContainerID), zap.String("rule", rule.ID), - zap.Error(err)) - } + if err = e.abortMultiparts(ctx, ni, job.LifecycleConfiguration.Rules, bktInfo); err != nil { + e.log.Warn(logs.AbortMultipartUploads, zap.Uint64("epoch", job.Epoch), + zap.Stringer("cid", job.ContainerID), zap.Error(err)) } - return nil -} - -func (e *Executor) handleRule(ctx context.Context, ni *netmap.NetworkInfo, rule data.LifecycleRule, bktInfo *data.BucketInfo, settings *data.BucketSettings) error { - if err := e.abortMultiparts(ctx, ni, rule, bktInfo); err != nil { - e.log.Warn(logs.AbortMultipartUploads, zap.Error(err)) - } - if err := e.expireObjects(ctx, ni, rule, bktInfo, settings); err != nil { - e.log.Warn(logs.ExpireObjects, zap.Error(err)) + if err = e.expireObjects(ctx, ni, job.LifecycleConfiguration.Rules, bktInfo, settings); err != nil { + e.log.Warn(logs.ExpireObjects, zap.Uint64("epoch", job.Epoch), + zap.Stringer("cid", job.ContainerID), zap.Error(err)) } return nil @@ -176,59 +161,23 @@ const ( tagPrefix = "S3-Tag-" ) -func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo, rule data.LifecycleRule, bktInfo *data.BucketInfo) error { - if rule.AbortIncompleteMultipartUpload == nil || rule.AbortIncompleteMultipartUpload.DaysAfterInitiation == nil { +func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo, rules []data.LifecycleRule, bktInfo *data.BucketInfo) error { + if len(rules) == 0 { return nil } - var prefix string - matchMultipartFn := func(*data.MultipartInfo) bool { return true } - if rule.Filter != nil { - filter := rule.Filter - - if filter.ObjectSizeGreaterThan != nil || filter.ObjectSizeLessThan != nil || - filter.And != nil && (filter.And.ObjectSizeGreaterThan != nil || filter.And.ObjectSizeLessThan != nil) { - // todo check if AWS behave the same way - // uncompleted multipart has no size, so filter cannot be applied - return nil - } - - prefix = filter.Prefix - if filter.And != nil { - prefix = filter.And.Prefix - } - - if filter.Tag != nil || filter.And != nil { - matchMultipartFn = matchMultipartByTagsFunc(filter) - } - } - - multipartDuration, err := durationToEpochsAbs(ni, 24*time.Hour*time.Duration(*rule.AbortIncompleteMultipartUpload.DaysAfterInitiation)) + matcherFn, err := e.matchMultipartByRulesFn(ni, rules) if err != nil { - return fmt.Errorf("DaysAfterInitiation to epochs: %w", err) + return fmt.Errorf("form multiaprt matcher: %w", err) } - multiparts, err := e.tree.GetMultipartUploadsByPrefix(ctx, bktInfo, prefix) + multiparts, err := e.tree.GetMultipartUploadsByPrefix(ctx, bktInfo, "") if err != nil { return fmt.Errorf("list multiparts: %w", err) } for _, multipart := range multiparts { - if !matchMultipartFn(multipart) { - continue - } - - if multipart.Finished { - continue - } - - multipartCreationEpoch, err := creationEpoch(ni, multipart.Created, multipart.Meta) - if err != nil { - e.log.Warn(logs.FailedToGetMultipartCreationEpoch, zap.Error(err)) - continue - } - - if multipartCreationEpoch+multipartDuration < ni.CurrentEpoch() { + if !matcherFn(multipart) { continue } @@ -242,20 +191,91 @@ func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo, return nil } -func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, rule data.LifecycleRule, bktInfo *data.BucketInfo, settings *data.BucketSettings) error { - if rule.Expiration == nil && rule.NonCurrentVersionExpiration == nil { - return fmt.Errorf("no expiration rules for '%s", rule.ID) +func (e *Executor) matchMultipartByRulesFn(ni *netmap.NetworkInfo, rules []data.LifecycleRule) (func(*data.MultipartInfo) bool, error) { + matchers := make([]func(*data.MultipartInfo) bool, 0, len(rules)) + for _, rule := range rules { + matchFn, err := e.matchMultipartByRuleFn(ni, rule) + if err != nil && !errors.Is(err, errNotApplicableRule) { + return nil, err + } + + matchers = append(matchers, matchFn) + } + + if len(matchers) == 0 { + return nil, errNotApplicableRule + } + + return func(info *data.MultipartInfo) bool { + for _, matcher := range matchers { + if matcher(info) { + return true + } + } + return false + }, nil +} + +var ( + errNotApplicableRule = errors.New("not applicable rule") + errAllObjectMatcherFailed = errors.New("all object matcher failed") +) + +func (e *Executor) matchMultipartByRuleFn(ni *netmap.NetworkInfo, rule data.LifecycleRule) (func(*data.MultipartInfo) bool, error) { + if rule.Status == statusDisabled { + return nil, errNotApplicableRule + } + + if rule.AbortIncompleteMultipartUpload == nil || rule.AbortIncompleteMultipartUpload.DaysAfterInitiation == nil { + return nil, errNotApplicableRule + } + + multipartDuration, err := durationToEpochsAbs(ni, 24*time.Hour*time.Duration(*rule.AbortIncompleteMultipartUpload.DaysAfterInitiation)) + if err != nil { + return nil, fmt.Errorf("DaysAfterInitiation to epochs: %w", err) } var prefix string + matchMultipartByTags := func(*data.MultipartInfo) bool { return true } if rule.Filter != nil { - prefix = rule.Filter.Prefix - if rule.Filter.And != nil { - prefix = rule.Filter.And.Prefix + filter := rule.Filter + + prefix = filter.Prefix + if filter.And != nil { + prefix = filter.And.Prefix + } + + if filter.Tag != nil || filter.And != nil { + matchMultipartByTags = matchMultipartByTagsFunc(filter) } } - objectStream, err := e.tree.InitVersionsByPrefixStream(ctx, bktInfo, prefix, false) + return func(multipart *data.MultipartInfo) bool { + if multipart.Finished || !strings.HasPrefix(multipart.Key, prefix) || !matchMultipartByTags(multipart) { + return false + } + + multipartCreationEpoch, err := creationEpoch(ni, multipart.Created, multipart.Meta) + if err != nil { + e.log.Warn(logs.FailedToGetMultipartCreationEpoch, zap.Error(err)) + return false + } + + return multipartCreationEpoch+multipartDuration >= ni.CurrentEpoch() + }, nil +} + +func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, rules []data.LifecycleRule, bktInfo *data.BucketInfo, settings *data.BucketSettings) error { + if len(rules) == 0 { + return nil + } + + matcherFn, err := e.matchObjectByRulesFn(ctx, ni, bktInfo, rules) + if err != nil { + return fmt.Errorf("form multiaprt matcher: %w", err) + } + + objectStream, err := e.tree.InitVersionsByPrefixStream(ctx, bktInfo, "", false) if err != nil { return fmt.Errorf("list multiparts: %w", err) } @@ -269,7 +289,7 @@ func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, ru nodeVersion, err := objectStream.Next(ctx) if err != nil { if errors.Is(err, io.EOF) { - if err = e.expireObject(ctx, versions, bktInfo, ni, rule, settings); err != nil { + if err = e.expireObject(ctx, versions, bktInfo, ni, matcherFn, settings); err != nil { e.log.Warn(logs.FailedToExpireObject, zap.String("object", latestObjName), zap.Error(err)) } return nil @@ -278,7 +298,7 @@ func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, ru } if nodeVersion.FilePath != latestObjName { - if err = e.expireObject(ctx, versions, bktInfo, ni, rule, settings); err != nil { + if err = e.expireObject(ctx, versions, bktInfo, ni, matcherFn, settings); err != nil { e.log.Warn(logs.FailedToExpireObject, zap.String("object", latestObjName), zap.Error(err)) } latestObjName = nodeVersion.FilePath @@ -289,23 +309,61 @@ func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, ru } } -func (e *Executor) expireObject(ctx context.Context, versions []*data.NodeVersion, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, rule data.LifecycleRule, settings *data.BucketSettings) error { +type MatchObjectFunc = func(index int, versions []*data.NodeVersion) (bool, error) + +func (e *Executor) matchObjectByRulesFn(ctx context.Context, ni *netmap.NetworkInfo, bktInfo *data.BucketInfo, rules []data.LifecycleRule) (MatchObjectFunc, error) { + matchers := make([]MatchObjectFunc, 0, len(rules)) + + for _, rule := range rules { + matchFn, err := e.expiredObjectMatcher(ctx, bktInfo, ni, rule) + if err != nil { + e.log.Warn(logs.SkipRule, zap.String("rule", rule.ID), zap.Error(err)) + continue + } + + matchers = append(matchers, matchFn) + } + + if len(matchers) == 0 { + return nil, errNotApplicableRule + } + + return func(index int, versions []*data.NodeVersion) (bool, error) { + var numErrors int + for _, matcher := range matchers { + if matched, err := matcher(index, versions); err != nil { + e.log.Warn(logs.ObjectMatchingFailed, zap.Error(err)) + numErrors++ + continue + } else if matched { + return true, nil + } + } + + var err error + if numErrors == len(matchers) { + err = errAllObjectMatcherFailed + } + return false, err + }, nil +} + +func (e *Executor) expireObject(ctx context.Context, versions []*data.NodeVersion, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, matcherFn MatchObjectFunc, settings *data.BucketSettings) error { + if len(versions) == 0 { + return nil + } + sort.Slice(versions, func(i, j int) bool { return versions[i].Timestamp < versions[j].Timestamp }) - matchFilter, err := e.expiredObjectMatcher(ctx, versions, bktInfo, ni, rule) - if err != nil { - return err - } - var nullVersionsToDelete []int for i, version := range versions { if version.IsUnversioned { nullVersionsToDelete = append(nullVersionsToDelete, i) } - matched, err := matchFilter(i, version) + matched, err := matcherFn(i, versions) if err != nil { return err } @@ -346,16 +404,22 @@ func (e *Executor) expireObject(ctx context.Context, versions []*data.NodeVersio return nil } -func (e *Executor) expiredObjectMatcher(ctx context.Context, versions []*data.NodeVersion, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, rule data.LifecycleRule) (func(index int, version *data.NodeVersion) (bool, error), error) { +func (e *Executor) expiredObjectMatcher(ctx context.Context, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, rule data.LifecycleRule) (func(int, []*data.NodeVersion) (bool, error), error) { + if rule.Status == statusDisabled { + return nil, fmt.Errorf("%w: rule disabled", errNotApplicableRule) + } + + if rule.Expiration == nil && rule.NonCurrentVersionExpiration == nil { + return nil, fmt.Errorf("%w: missing expiration and non current expiration", errNotApplicableRule) + } + var prm objectMatcherParams + prm.newerNonCurrentVersions = math.MaxInt prm.minObjSize, prm.maxObjSize = getObjectSizeRange(rule) if rule.NonCurrentVersionExpiration != nil { if rule.NonCurrentVersionExpiration.NewerNonCurrentVersions != nil { - prm.startIndex = len(versions) - *rule.NonCurrentVersionExpiration.NewerNonCurrentVersions - 1 - if prm.startIndex < 0 { - prm.startIndex = 0 - } + prm.newerNonCurrentVersions = *rule.NonCurrentVersionExpiration.NewerNonCurrentVersions } if rule.NonCurrentVersionExpiration.NonCurrentDays != nil { @@ -391,6 +455,11 @@ func (e *Executor) expiredObjectMatcher(ctx context.Context, versions []*data.No } if rule.Filter != nil { + prm.prefix = rule.Filter.Prefix + if rule.Filter.And != nil { + prm.prefix = rule.Filter.And.Prefix + } + if rule.Filter.Tag != nil { prm.tagsToMatch = append(prm.tagsToMatch, *rule.Filter.Tag) } @@ -399,11 +468,12 @@ func (e *Executor) expiredObjectMatcher(ctx context.Context, versions []*data.No } } - return e.objectMatcher(ctx, bktInfo, ni, versions, prm), nil + return e.objectMatcher(ctx, bktInfo, ni, prm), nil } type objectMatcherParams struct { - startIndex int + prefix string + newerNonCurrentVersions int minObjSize uint64 maxObjSize uint64 nonCurrentDuration *uint64 @@ -413,12 +483,18 @@ type objectMatcherParams struct { tagsToMatch []data.Tag } -func (e *Executor) objectMatcher(ctx context.Context, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, versions []*data.NodeVersion, prm objectMatcherParams) func(int, *data.NodeVersion) (bool, error) { - return func(index int, version *data.NodeVersion) (bool, error) { - if index < prm.startIndex { +func (e *Executor) objectMatcher(ctx context.Context, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, prm objectMatcherParams) func(int, []*data.NodeVersion) (bool, error) { + return func(index int, versions []*data.NodeVersion) (bool, error) { + if !strings.HasPrefix(versions[index].FilePath, prm.prefix) { + return false, nil + } + + if index < len(versions)-prm.newerNonCurrentVersions-1 { return true, nil } + version := versions[index] + if prm.nonCurrentDuration != nil { if index < len(versions)-1 { next := versions[index+1] @@ -428,7 +504,7 @@ func (e *Executor) objectMatcher(ctx context.Context, bktInfo *data.BucketInfo, } if epoch+(*prm.nonCurrentDuration) < ni.CurrentEpoch() { - return false, nil + return true, nil } } } diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 6cc6c63..868dc07 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -51,7 +51,8 @@ const ( WorkerFailedToHandleJob = "worker failed to handle job" ExecutorStopped = "executor stopped" ExecutorStoppedJobsChannelIsClosed = "executor stopped: jobs channel is closed" - FailedToHandleRule = "failed to handle rule" + SkipRule = "skip rule" + ObjectMatchingFailed = "object matching failed" AbortMultipartUploads = "abort multiparts uploads" ExpireObjects = "expire objects" FailedToGetMultipartCreationEpoch = "failed to get multipart creation epoch"