From aaa652de67d24d525c9ddc4e46c09745daecf681 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 30 Jul 2024 16:11:41 +0300 Subject: [PATCH] [#16] executor: Fix object expiration condition Wrong comparison operators (and values) were used. Signed-off-by: Denis Kirillov --- internal/lifecycle/executor.go | 12 ++-- internal/lifecycle/executor_test.go | 86 ++++++++++++++++++++++++++++- internal/lifecycle/fetcher_test.go | 31 +++++++++-- 3 files changed, 118 insertions(+), 11 deletions(-) diff --git a/internal/lifecycle/executor.go b/internal/lifecycle/executor.go index 6aee672..5ab3984 100644 --- a/internal/lifecycle/executor.go +++ b/internal/lifecycle/executor.go @@ -25,6 +25,10 @@ import ( "go.uber.org/zap" ) +// nowTime is a value for getting the current time. This value can be overridden +// for testing mocking out current time. +var nowTime = time.Now + type Executor struct { log *zap.Logger jobs <-chan Job @@ -521,7 +525,7 @@ func (e *Executor) objectMatcher(ctx context.Context, bktInfo *data.BucketInfo, return false, err } - if versionEpoch < prm.expirationEpoch && versionEpoch+prm.expirationDurationEpochs < ni.CurrentEpoch() { + if ni.CurrentEpoch() < prm.expirationEpoch && versionEpoch+prm.expirationDurationEpochs > ni.CurrentEpoch() { return false, nil } @@ -574,7 +578,7 @@ func (e *Executor) addDeleteMarker(ctx context.Context, version *data.NodeVersio return } - now := time.Now() + now := nowTime() newVersion := &data.NodeVersion{ BaseNodeVersion: data.BaseNodeVersion{ OID: randOID, @@ -653,7 +657,7 @@ func isNotFound(err error) bool { func versionCreationEpoch(version *data.NodeVersion, ni *netmap.NetworkInfo) (uint64, error) { objCreationEpoch := version.CreationEpoch if objCreationEpoch == 0 { - created := time.Now() + created := nowTime() if version.Created != nil { created = *version.Created } @@ -704,7 +708,7 @@ func creationEpoch(ni *netmap.NetworkInfo, created time.Time, meta map[string]st } func timeToEpoch(ni *netmap.NetworkInfo, timeToConvert time.Time) (uint64, error) { - dur := time.Until(timeToConvert) + dur := timeToConvert.Sub(nowTime()) epochLifetime, err := durationToEpochsAbs(ni, dur) if err != nil { diff --git a/internal/lifecycle/executor_test.go b/internal/lifecycle/executor_test.go index 6148bd2..1c550ff 100644 --- a/internal/lifecycle/executor_test.go +++ b/internal/lifecycle/executor_test.go @@ -2,6 +2,7 @@ package lifecycle import ( "context" + "strconv" "testing" "time" @@ -356,7 +357,7 @@ func TestExecutorMultipleRulesMultiparts(t *testing.T) { ContainerID: bktInfo.CID, PrivateKey: ec.key, LifecycleConfiguration: lifecycleCfg, - Epoch: 1, + Epoch: 50, } ec.close() @@ -397,6 +398,89 @@ func TestExecutorAbortMultipartsInDays(t *testing.T) { require.Len(t, multiparts, 0) } +func TestExecutorExpireObjectsInDays(t *testing.T) { + ec := newExecutorContext(t) + defer ec.cancel() + + bktInfo, err := ec.createBktInfo(data.VersioningUnversioned) + require.NoError(t, err) + + // set epoch to 1 hour + ec.ffs.setEpoch(1) + ec.ffs.setEpochDuration(3600) + ec.ffs.setMsPerBlock(1000) + + addresses := make([]oid.Address, 6) + for i := range addresses { + addresses[i], err = ec.addObject(bktInfo, "obj"+strconv.Itoa(i), 0, nil) + require.NoError(t, err) + } + + lifecycleCfg := &data.LifecycleConfiguration{Rules: []data.LifecycleRule{ + { + Status: "Enabled", + Expiration: &data.LifecycleExpiration{Days: ptrInt(1), Date: time.Now().Add(30 * time.Hour).Format(time.RFC3339)}, + Filter: &data.LifecycleRuleFilter{Prefix: "obj0"}, + ID: "obj0 expired", + }, + { + Status: "Enabled", + Expiration: &data.LifecycleExpiration{Days: ptrInt(1), Date: time.Now().Add(18 * time.Hour).Format(time.RFC3339)}, + Filter: &data.LifecycleRuleFilter{Prefix: "obj1"}, + ID: "obj1 expired", + }, + { + Status: "Enabled", + Expiration: &data.LifecycleExpiration{Days: ptrInt(1), Date: time.Now().Add(42 * time.Hour).Format(time.RFC3339)}, + Filter: &data.LifecycleRuleFilter{Prefix: "obj2"}, + ID: "obj2 expired", + }, + { + Status: "Enabled", + Expiration: &data.LifecycleExpiration{Days: ptrInt(2), Date: time.Now().Add(30 * time.Hour).Format(time.RFC3339)}, + Filter: &data.LifecycleRuleFilter{Prefix: "obj3"}, + ID: "obj3 expired", + }, + { + Status: "Enabled", + Expiration: &data.LifecycleExpiration{Days: ptrInt(2), Date: time.Now().Add(52 * time.Hour).Format(time.RFC3339)}, + Filter: &data.LifecycleRuleFilter{Prefix: "obj4"}, + ID: "obj4 not expired", + }, + { + Status: "Enabled", + Expiration: &data.LifecycleExpiration{Days: ptrInt(2), Date: time.Now().Add(42 * time.Hour).Format(time.RFC3339)}, + Filter: &data.LifecycleRuleFilter{Prefix: "obj5"}, + ID: "obj5 not expired", + }, + }} + + currentEpoch := uint64(36) + ec.ffs.setEpoch(currentEpoch) + + nowTime = func() time.Time { + return time.Now().Add(time.Duration(currentEpoch) * time.Hour) + } + + ec.jobs <- Job{ + ContainerID: bktInfo.CID, + PrivateKey: ec.key, + LifecycleConfiguration: lifecycleCfg, + Epoch: currentEpoch, + } + + ec.close() + + for i, addr := range addresses { + _, err = ec.ffs.GetObject(ec.ctx, addr) + if i >= 4 { + require.NoError(t, err, "expected no error, got: %v (obj %d)", err, i) + } else { + require.Truef(t, client.IsErrObjectNotFound(err), "expected not found error, got: %v (obj %d)", err, i) + } + } +} + func ptrUint64(val uint64) *uint64 { return &val } diff --git a/internal/lifecycle/fetcher_test.go b/internal/lifecycle/fetcher_test.go index 2f27d9a..c5825b5 100644 --- a/internal/lifecycle/fetcher_test.go +++ b/internal/lifecycle/fetcher_test.go @@ -92,14 +92,19 @@ func (c *containerFetcherMock) Containers(owner user.ID) ([]cid.ID, error) { var _ FrostFSFetcher = (*frostfsFetcherMock)(nil) type frostfsFetcherMock struct { - mu sync.RWMutex - objects map[oid.Address]*object.Object - epoch uint64 + mu sync.RWMutex + objects map[oid.Address]*object.Object + epoch uint64 + epochDuration uint64 + msPerBlock int64 } func newFrostFSFetcherMock() *frostfsFetcherMock { return &frostfsFetcherMock{ - objects: map[oid.Address]*object.Object{}, + objects: map[oid.Address]*object.Object{}, + epoch: 1, + epochDuration: 3600, + msPerBlock: 1000, } } @@ -117,6 +122,20 @@ func (c *frostfsFetcherMock) setEpoch(epoch uint64) { c.epoch = epoch } +func (c *frostfsFetcherMock) setEpochDuration(blocks uint64) { + c.mu.Lock() + defer c.mu.Unlock() + + c.epochDuration = blocks +} + +func (c *frostfsFetcherMock) setMsPerBlock(msPerBlock int64) { + c.mu.Lock() + defer c.mu.Unlock() + + c.msPerBlock = msPerBlock +} + func (c *frostfsFetcherMock) GetObject(_ context.Context, addr oid.Address) (pool.ResGetObject, error) { c.mu.RLock() defer c.mu.RUnlock() @@ -144,8 +163,8 @@ func (c *frostfsFetcherMock) NetworkInfo(context.Context) (*netmap.NetworkInfo, var ni netmap.NetworkInfo ni.SetCurrentEpoch(c.epoch) - ni.SetEpochDuration(3600) - ni.SetMsPerBlock(1000) + ni.SetEpochDuration(c.epochDuration) + ni.SetMsPerBlock(c.msPerBlock) return &ni, nil }