[#16] executor: Fix object expiration condition
Wrong comparison operators (and values) were used. Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
c9011f6da2
commit
aaa652de67
3 changed files with 118 additions and 11 deletions
|
@ -25,6 +25,10 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// nowTime is a value for getting the current time. This value can be overridden
|
||||||
|
// for testing mocking out current time.
|
||||||
|
var nowTime = time.Now
|
||||||
|
|
||||||
type Executor struct {
|
type Executor struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
jobs <-chan Job
|
jobs <-chan Job
|
||||||
|
@ -521,7 +525,7 @@ func (e *Executor) objectMatcher(ctx context.Context, bktInfo *data.BucketInfo,
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if versionEpoch < prm.expirationEpoch && versionEpoch+prm.expirationDurationEpochs < ni.CurrentEpoch() {
|
if ni.CurrentEpoch() < prm.expirationEpoch && versionEpoch+prm.expirationDurationEpochs > ni.CurrentEpoch() {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -574,7 +578,7 @@ func (e *Executor) addDeleteMarker(ctx context.Context, version *data.NodeVersio
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := nowTime()
|
||||||
newVersion := &data.NodeVersion{
|
newVersion := &data.NodeVersion{
|
||||||
BaseNodeVersion: data.BaseNodeVersion{
|
BaseNodeVersion: data.BaseNodeVersion{
|
||||||
OID: randOID,
|
OID: randOID,
|
||||||
|
@ -653,7 +657,7 @@ func isNotFound(err error) bool {
|
||||||
func versionCreationEpoch(version *data.NodeVersion, ni *netmap.NetworkInfo) (uint64, error) {
|
func versionCreationEpoch(version *data.NodeVersion, ni *netmap.NetworkInfo) (uint64, error) {
|
||||||
objCreationEpoch := version.CreationEpoch
|
objCreationEpoch := version.CreationEpoch
|
||||||
if objCreationEpoch == 0 {
|
if objCreationEpoch == 0 {
|
||||||
created := time.Now()
|
created := nowTime()
|
||||||
if version.Created != nil {
|
if version.Created != nil {
|
||||||
created = *version.Created
|
created = *version.Created
|
||||||
}
|
}
|
||||||
|
@ -704,7 +708,7 @@ func creationEpoch(ni *netmap.NetworkInfo, created time.Time, meta map[string]st
|
||||||
}
|
}
|
||||||
|
|
||||||
func timeToEpoch(ni *netmap.NetworkInfo, timeToConvert time.Time) (uint64, error) {
|
func timeToEpoch(ni *netmap.NetworkInfo, timeToConvert time.Time) (uint64, error) {
|
||||||
dur := time.Until(timeToConvert)
|
dur := timeToConvert.Sub(nowTime())
|
||||||
|
|
||||||
epochLifetime, err := durationToEpochsAbs(ni, dur)
|
epochLifetime, err := durationToEpochsAbs(ni, dur)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -2,6 +2,7 @@ package lifecycle
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -356,7 +357,7 @@ func TestExecutorMultipleRulesMultiparts(t *testing.T) {
|
||||||
ContainerID: bktInfo.CID,
|
ContainerID: bktInfo.CID,
|
||||||
PrivateKey: ec.key,
|
PrivateKey: ec.key,
|
||||||
LifecycleConfiguration: lifecycleCfg,
|
LifecycleConfiguration: lifecycleCfg,
|
||||||
Epoch: 1,
|
Epoch: 50,
|
||||||
}
|
}
|
||||||
|
|
||||||
ec.close()
|
ec.close()
|
||||||
|
@ -397,6 +398,89 @@ func TestExecutorAbortMultipartsInDays(t *testing.T) {
|
||||||
require.Len(t, multiparts, 0)
|
require.Len(t, multiparts, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestExecutorExpireObjectsInDays(t *testing.T) {
|
||||||
|
ec := newExecutorContext(t)
|
||||||
|
defer ec.cancel()
|
||||||
|
|
||||||
|
bktInfo, err := ec.createBktInfo(data.VersioningUnversioned)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// set epoch to 1 hour
|
||||||
|
ec.ffs.setEpoch(1)
|
||||||
|
ec.ffs.setEpochDuration(3600)
|
||||||
|
ec.ffs.setMsPerBlock(1000)
|
||||||
|
|
||||||
|
addresses := make([]oid.Address, 6)
|
||||||
|
for i := range addresses {
|
||||||
|
addresses[i], err = ec.addObject(bktInfo, "obj"+strconv.Itoa(i), 0, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
lifecycleCfg := &data.LifecycleConfiguration{Rules: []data.LifecycleRule{
|
||||||
|
{
|
||||||
|
Status: "Enabled",
|
||||||
|
Expiration: &data.LifecycleExpiration{Days: ptrInt(1), Date: time.Now().Add(30 * time.Hour).Format(time.RFC3339)},
|
||||||
|
Filter: &data.LifecycleRuleFilter{Prefix: "obj0"},
|
||||||
|
ID: "obj0 expired",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Status: "Enabled",
|
||||||
|
Expiration: &data.LifecycleExpiration{Days: ptrInt(1), Date: time.Now().Add(18 * time.Hour).Format(time.RFC3339)},
|
||||||
|
Filter: &data.LifecycleRuleFilter{Prefix: "obj1"},
|
||||||
|
ID: "obj1 expired",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Status: "Enabled",
|
||||||
|
Expiration: &data.LifecycleExpiration{Days: ptrInt(1), Date: time.Now().Add(42 * time.Hour).Format(time.RFC3339)},
|
||||||
|
Filter: &data.LifecycleRuleFilter{Prefix: "obj2"},
|
||||||
|
ID: "obj2 expired",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Status: "Enabled",
|
||||||
|
Expiration: &data.LifecycleExpiration{Days: ptrInt(2), Date: time.Now().Add(30 * time.Hour).Format(time.RFC3339)},
|
||||||
|
Filter: &data.LifecycleRuleFilter{Prefix: "obj3"},
|
||||||
|
ID: "obj3 expired",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Status: "Enabled",
|
||||||
|
Expiration: &data.LifecycleExpiration{Days: ptrInt(2), Date: time.Now().Add(52 * time.Hour).Format(time.RFC3339)},
|
||||||
|
Filter: &data.LifecycleRuleFilter{Prefix: "obj4"},
|
||||||
|
ID: "obj4 not expired",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Status: "Enabled",
|
||||||
|
Expiration: &data.LifecycleExpiration{Days: ptrInt(2), Date: time.Now().Add(42 * time.Hour).Format(time.RFC3339)},
|
||||||
|
Filter: &data.LifecycleRuleFilter{Prefix: "obj5"},
|
||||||
|
ID: "obj5 not expired",
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
|
||||||
|
currentEpoch := uint64(36)
|
||||||
|
ec.ffs.setEpoch(currentEpoch)
|
||||||
|
|
||||||
|
nowTime = func() time.Time {
|
||||||
|
return time.Now().Add(time.Duration(currentEpoch) * time.Hour)
|
||||||
|
}
|
||||||
|
|
||||||
|
ec.jobs <- Job{
|
||||||
|
ContainerID: bktInfo.CID,
|
||||||
|
PrivateKey: ec.key,
|
||||||
|
LifecycleConfiguration: lifecycleCfg,
|
||||||
|
Epoch: currentEpoch,
|
||||||
|
}
|
||||||
|
|
||||||
|
ec.close()
|
||||||
|
|
||||||
|
for i, addr := range addresses {
|
||||||
|
_, err = ec.ffs.GetObject(ec.ctx, addr)
|
||||||
|
if i >= 4 {
|
||||||
|
require.NoError(t, err, "expected no error, got: %v (obj %d)", err, i)
|
||||||
|
} else {
|
||||||
|
require.Truef(t, client.IsErrObjectNotFound(err), "expected not found error, got: %v (obj %d)", err, i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func ptrUint64(val uint64) *uint64 {
|
func ptrUint64(val uint64) *uint64 {
|
||||||
return &val
|
return &val
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,14 +92,19 @@ func (c *containerFetcherMock) Containers(owner user.ID) ([]cid.ID, error) {
|
||||||
var _ FrostFSFetcher = (*frostfsFetcherMock)(nil)
|
var _ FrostFSFetcher = (*frostfsFetcherMock)(nil)
|
||||||
|
|
||||||
type frostfsFetcherMock struct {
|
type frostfsFetcherMock struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
objects map[oid.Address]*object.Object
|
objects map[oid.Address]*object.Object
|
||||||
epoch uint64
|
epoch uint64
|
||||||
|
epochDuration uint64
|
||||||
|
msPerBlock int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFrostFSFetcherMock() *frostfsFetcherMock {
|
func newFrostFSFetcherMock() *frostfsFetcherMock {
|
||||||
return &frostfsFetcherMock{
|
return &frostfsFetcherMock{
|
||||||
objects: map[oid.Address]*object.Object{},
|
objects: map[oid.Address]*object.Object{},
|
||||||
|
epoch: 1,
|
||||||
|
epochDuration: 3600,
|
||||||
|
msPerBlock: 1000,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,6 +122,20 @@ func (c *frostfsFetcherMock) setEpoch(epoch uint64) {
|
||||||
c.epoch = epoch
|
c.epoch = epoch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *frostfsFetcherMock) setEpochDuration(blocks uint64) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
c.epochDuration = blocks
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *frostfsFetcherMock) setMsPerBlock(msPerBlock int64) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
c.msPerBlock = msPerBlock
|
||||||
|
}
|
||||||
|
|
||||||
func (c *frostfsFetcherMock) GetObject(_ context.Context, addr oid.Address) (pool.ResGetObject, error) {
|
func (c *frostfsFetcherMock) GetObject(_ context.Context, addr oid.Address) (pool.ResGetObject, error) {
|
||||||
c.mu.RLock()
|
c.mu.RLock()
|
||||||
defer c.mu.RUnlock()
|
defer c.mu.RUnlock()
|
||||||
|
@ -144,8 +163,8 @@ func (c *frostfsFetcherMock) NetworkInfo(context.Context) (*netmap.NetworkInfo,
|
||||||
|
|
||||||
var ni netmap.NetworkInfo
|
var ni netmap.NetworkInfo
|
||||||
ni.SetCurrentEpoch(c.epoch)
|
ni.SetCurrentEpoch(c.epoch)
|
||||||
ni.SetEpochDuration(3600)
|
ni.SetEpochDuration(c.epochDuration)
|
||||||
ni.SetMsPerBlock(1000)
|
ni.SetMsPerBlock(c.msPerBlock)
|
||||||
return &ni, nil
|
return &ni, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue