diff --git a/internal/lifecycle/executor_test.go b/internal/lifecycle/executor_test.go index aba801d..4a365b1 100644 --- a/internal/lifecycle/executor_test.go +++ b/internal/lifecycle/executor_test.go @@ -15,170 +15,320 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zaptest" ) -func TestExecutorBase(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +type executorContext struct { + ctx context.Context + cancel context.CancelFunc + log *zap.Logger + key *keys.PrivateKey + ffs *frostfsFetcherMock + tree *tree.Tree + jobs chan Job + executor *Executor +} - logger := zaptest.NewLogger(t) +func newExecutorContext(t *testing.T) *executorContext { + log := zaptest.NewLogger(t) - key, err := keys.NewPrivateKey() + ec, err := newExecutorContextBase(log) require.NoError(t, err) - var owner user.ID - user.IDFromKey(&owner, key.PrivateKey.PublicKey) + return ec +} + +func newExecutorContextBase(log *zap.Logger) (*executorContext, error) { + ec := &executorContext{ + log: log, + ffs: newFrostFSFetcherMock(), + jobs: make(chan Job), + } + + var err error + ec.key, err = keys.NewPrivateKey() + if err != nil { + return nil, err + } + + memTreeCli, err := tree.NewTreeServiceClientMemory() + if err != nil { + return nil, err + } + ec.tree = tree.NewTree(memTreeCli, log) + + ec.ctx, ec.cancel = context.WithCancel(context.Background()) + + cfg := ExecutorConfig{ + Logger: ec.log, + Jobs: ec.jobs, + WorkerPoolSize: 1, + TreeFetcher: ec.tree, + FrostFSFetcher: ec.ffs, + } + + if ec.executor, err = NewExecutor(ec.ctx, cfg); err != nil { + return nil, err + } + + return ec, nil +} + +func (e *executorContext) owner() user.ID { + var owner user.ID + user.IDFromKey(&owner, e.key.PrivateKey.PublicKey) + + return owner +} + +func (e *executorContext) close() { + close(e.jobs) + e.executor.Done() +} + +func (e *executorContext) createBktInfo(versioningStatus string) (*data.BucketInfo, error) { cnrID := cidtest.ID() bktInfo := &data.BucketInfo{ CID: cnrID, - Owner: owner, + Owner: e.owner(), } - ffs := newFrostFSFetcherMock() - memTreeCli, err := tree.NewTreeServiceClientMemory() - require.NoError(t, err) - tr := tree.NewTree(memTreeCli, logger) - - err = tr.PutSettingsNode(ctx, bktInfo, &data.BucketSettings{Versioning: data.VersioningUnversioned}) - require.NoError(t, err) - - objAddr, err := addObjectWithTags(ffs, tr, bktInfo, "obj", nil) - require.NoError(t, err) - - jobs := make(chan Job) - cfg := ExecutorConfig{ - Logger: logger, - Jobs: jobs, - WorkerPoolSize: 1, - TreeFetcher: tr, - FrostFSFetcher: ffs, + err := e.tree.PutSettingsNode(e.ctx, bktInfo, &data.BucketSettings{Versioning: versioningStatus}) + if err != nil { + return nil, err } - executor, err := NewExecutor(ctx, cfg) - require.NoError(t, err) - - lifecycleCfg := &data.LifecycleConfiguration{Rules: []data.LifecycleRule{{ - Status: "Enabled", - Expiration: &data.LifecycleExpiration{ - Date: "2024-01-24T12:19:33Z", - }, - ID: "test", - }}} - - jobs <- Job{ - ContainerID: cnrID, - PrivateKey: key, - LifecycleConfiguration: lifecycleCfg, - Epoch: 1, - } - - close(jobs) - executor.Done() - - _, err = ffs.GetObject(ctx, objAddr) - require.Truef(t, client.IsErrObjectNotFound(err), "expected not found error, got: %v", err) + return bktInfo, nil } -func TestExecutorFilterPrefix(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - logger := zaptest.NewLogger(t) - - key, err := keys.NewPrivateKey() - require.NoError(t, err) - var owner user.ID - user.IDFromKey(&owner, key.PrivateKey.PublicKey) - - cnrID := cidtest.ID() - bktInfo := &data.BucketInfo{ - CID: cnrID, - Owner: owner, - } - - ffs := newFrostFSFetcherMock() - memTreeCli, err := tree.NewTreeServiceClientMemory() - require.NoError(t, err) - tr := tree.NewTree(memTreeCli, logger) - - err = tr.PutSettingsNode(ctx, bktInfo, &data.BucketSettings{Versioning: data.VersioningUnversioned}) - require.NoError(t, err) - - objAddr1, err := addObjectWithTags(ffs, tr, bktInfo, "obj", nil) - require.NoError(t, err) - objAddr2, err := addObjectWithTags(ffs, tr, bktInfo, "tmp/obj", nil) - require.NoError(t, err) - - jobs := make(chan Job) - cfg := ExecutorConfig{ - Logger: logger, - Jobs: jobs, - WorkerPoolSize: 1, - TreeFetcher: tr, - FrostFSFetcher: ffs, - } - - executor, err := NewExecutor(ctx, cfg) - require.NoError(t, err) - - lifecycleCfg := &data.LifecycleConfiguration{Rules: []data.LifecycleRule{{ - Status: "Enabled", - Expiration: &data.LifecycleExpiration{ - Date: "2024-01-24T12:19:33Z", - }, - Filter: &data.LifecycleRuleFilter{ - Prefix: "tmp", - }, - ID: "test", - }}} - - jobs <- Job{ - ContainerID: cnrID, - PrivateKey: key, - LifecycleConfiguration: lifecycleCfg, - Epoch: 1, - } - - close(jobs) - executor.Done() - - _, err = ffs.GetObject(ctx, objAddr1) - require.NoError(t, err) - _, err = ffs.GetObject(ctx, objAddr2) - require.Truef(t, client.IsErrObjectNotFound(err), "expected not found error, got: %v", err) -} - -func addObjectWithTags(ffs *frostfsFetcherMock, tr *tree.Tree, bktInfo *data.BucketInfo, name string, tags map[string]string) (oid.Address, error) { +func (e *executorContext) addObject(bktInfo *data.BucketInfo, name string, size int, tags map[string]string) (oid.Address, error) { var objAddr oid.Address objAddr.SetContainer(bktInfo.CID) objAddr.SetObject(oidtest.ID()) obj := object.New() obj.SetContainerID(objAddr.Container()) obj.SetID(objAddr.Object()) - obj.SetPayload([]byte("content")) - ffs.setObject(objAddr, obj) + e.ffs.setObject(objAddr, obj) + + content := "content" + buf := make([]byte, size) + for i := 0; i < len(buf); i++ { + buf[i] = content[i%len(content)] + } + obj.SetPayload(buf) + + settings, err := e.tree.GetSettingsNode(e.ctx, bktInfo) + if err != nil { + return oid.Address{}, err + } + + ni, err := e.ffs.NetworkInfo(e.ctx) + if err != nil { + return oid.Address{}, err + } now := time.Now() nodeVersion := &data.NodeVersion{ BaseNodeVersion: data.BaseNodeVersion{ - OID: objAddr.Object(), - FilePath: name, - Owner: &bktInfo.Owner, - Created: &now, + OID: objAddr.Object(), + FilePath: name, + Owner: &bktInfo.Owner, + Created: &now, + Size: uint64(size), + CreationEpoch: ni.CurrentEpoch(), }, - IsUnversioned: true, + IsUnversioned: !settings.VersioningEnabled(), } - id, err := tr.AddVersion(context.TODO(), bktInfo, nodeVersion) + id, err := e.tree.AddVersion(context.TODO(), bktInfo, nodeVersion) if err != nil { return oid.Address{}, err } nodeVersion.ID = id - if err = tr.PutObjectTagging(context.TODO(), bktInfo, nodeVersion, tags); err != nil { + if err = e.tree.PutObjectTagging(context.TODO(), bktInfo, nodeVersion, tags); err != nil { return oid.Address{}, err } return objAddr, nil } + +func TestExecutorFilterPrefix(t *testing.T) { + ec := newExecutorContext(t) + defer ec.cancel() + + bktInfo, err := ec.createBktInfo(data.VersioningUnversioned) + require.NoError(t, err) + + objAddr1, err := ec.addObject(bktInfo, "obj", 10, nil) + require.NoError(t, err) + objAddr2, err := ec.addObject(bktInfo, "tmp/obj", 10, nil) + require.NoError(t, err) + + lifecycleCfg := &data.LifecycleConfiguration{Rules: []data.LifecycleRule{{ + Status: "Enabled", + Expiration: &data.LifecycleExpiration{Date: "2024-01-24T12:19:33Z"}, + Filter: &data.LifecycleRuleFilter{Prefix: "tmp"}, + ID: "test", + }}} + + ec.jobs <- Job{ + ContainerID: bktInfo.CID, + PrivateKey: ec.key, + LifecycleConfiguration: lifecycleCfg, + Epoch: 1, + } + + ec.close() + + _, err = ec.ffs.GetObject(ec.ctx, objAddr1) + require.NoError(t, err) + _, err = ec.ffs.GetObject(ec.ctx, objAddr2) + require.Truef(t, client.IsErrObjectNotFound(err), "expected not found error, got: %v", err) +} + +func TestExecutorFilterNewerNoncurrent(t *testing.T) { + ec := newExecutorContext(t) + defer ec.cancel() + + bktInfo, err := ec.createBktInfo(data.VersioningEnabled) + require.NoError(t, err) + + ln := 10 + addresses := make([]oid.Address, ln) + for i := 0; i < ln; i++ { + addresses[i], err = ec.addObject(bktInfo, "obj", i, nil) + require.NoError(t, err) + } + + maxNonCurrent := 3 + lifecycleCfg := &data.LifecycleConfiguration{Rules: []data.LifecycleRule{{ + Status: "Enabled", + NonCurrentVersionExpiration: &data.NonCurrentVersionExpiration{NewerNonCurrentVersions: &maxNonCurrent}, + ID: "test", + }}} + + ec.jobs <- Job{ + ContainerID: bktInfo.CID, + PrivateKey: ec.key, + LifecycleConfiguration: lifecycleCfg, + Epoch: 1, + } + + ec.close() + + for i, addr := range addresses { + _, err = ec.ffs.GetObject(ec.ctx, addr) + if i < len(addresses)-maxNonCurrent-1 { + require.Truef(t, client.IsErrObjectNotFound(err), "expected not found error, got: %v", err) + } else { + require.NoError(t, err) + } + } +} + +func TestExecutorFilterNoncurrent(t *testing.T) { + ec := newExecutorContext(t) + defer ec.cancel() + + bktInfo, err := ec.createBktInfo(data.VersioningEnabled) + require.NoError(t, err) + + addr1, err := ec.addObject(bktInfo, "obj", 0, nil) + require.NoError(t, err) + + ec.ffs.setEpoch(2) + addr2, err := ec.addObject(bktInfo, "obj", 0, nil) + require.NoError(t, err) + + lifecycleCfg := &data.LifecycleConfiguration{Rules: []data.LifecycleRule{{ + Status: "Enabled", + NonCurrentVersionExpiration: &data.NonCurrentVersionExpiration{NonCurrentDays: ptrInt(1)}, + ID: "test", + }}} + + ec.jobs <- Job{ + ContainerID: bktInfo.CID, + PrivateKey: ec.key, + LifecycleConfiguration: lifecycleCfg, + Epoch: 30, // epoch duration is 1h, so we set epoch that certainly be after 24h + } + + ec.close() + + _, err = ec.ffs.GetObject(ec.ctx, addr1) + require.Truef(t, client.IsErrObjectNotFound(err), "expected not found error, got: %v", err) + + _, err = ec.ffs.GetObject(ec.ctx, addr2) + require.NoError(t, err) +} + +func TestExecutorMultipleRules(t *testing.T) { + ec := newExecutorContext(t) + defer ec.cancel() + + bktInfo, err := ec.createBktInfo(data.VersioningUnversioned) + require.NoError(t, err) + + ln := 5 + addresses := make([]oid.Address, ln) + + addresses[0], err = ec.addObject(bktInfo, "obj0", 0, nil) + require.NoError(t, err) + addresses[1], err = ec.addObject(bktInfo, "obj1", 100, nil) + require.NoError(t, err) + addresses[2], err = ec.addObject(bktInfo, "obj2", 50, nil) + require.NoError(t, err) + addresses[3], err = ec.addObject(bktInfo, "obj3", 0, map[string]string{"tag1": "val1"}) + require.NoError(t, err) + addresses[4], err = ec.addObject(bktInfo, "tmp1", 0, nil) + require.NoError(t, err) + + lifecycleCfg := &data.LifecycleConfiguration{Rules: []data.LifecycleRule{ + { + Status: "Enabled", + Expiration: &data.LifecycleExpiration{Date: "2024-01-24T12:19:33Z"}, + Filter: &data.LifecycleRuleFilter{Prefix: "tmp1"}, + ID: "for tmp1/obj", + }, + { + Status: "Enabled", + Expiration: &data.LifecycleExpiration{Date: "2024-01-24T12:19:33Z"}, + Filter: &data.LifecycleRuleFilter{ObjectSizeGreaterThan: ptrUint64(25), ObjectSizeLessThan: ptrUint64(75)}, + ID: "for obj1", + }, + { + Status: "Enabled", + Expiration: &data.LifecycleExpiration{Date: "2024-01-24T12:19:33Z"}, + Filter: &data.LifecycleRuleFilter{Tag: &data.Tag{Key: "tag1", Value: "val1"}}, + ID: "for obj3", + }, + }} + + ec.jobs <- Job{ + ContainerID: bktInfo.CID, + PrivateKey: ec.key, + LifecycleConfiguration: lifecycleCfg, + Epoch: 1, + } + + ec.close() + + for i, addr := range addresses { + _, err = ec.ffs.GetObject(ec.ctx, addr) + if i == 0 || i == 1 { + require.NoError(t, err) + } else { + require.Truef(t, client.IsErrObjectNotFound(err), "expected not found error, got: %v (obj %d)", err, i) + } + } +} + +func ptrUint64(val uint64) *uint64 { + return &val +} + +func ptrInt(val int) *int { + return &val +} diff --git a/internal/lifecycle/fetcher_test.go b/internal/lifecycle/fetcher_test.go index 046994e..1b75bfb 100644 --- a/internal/lifecycle/fetcher_test.go +++ b/internal/lifecycle/fetcher_test.go @@ -108,6 +108,13 @@ func (c *frostfsFetcherMock) setObject(addr oid.Address, obj *object.Object) { c.objects[addr] = obj } +func (c *frostfsFetcherMock) setEpoch(epoch uint64) { + c.mu.Lock() + defer c.mu.Unlock() + + c.epoch = epoch +} + func (c *frostfsFetcherMock) GetObject(_ context.Context, addr oid.Address) (pool.ResGetObject, error) { c.mu.RLock() defer c.mu.RUnlock() @@ -135,7 +142,7 @@ func (c *frostfsFetcherMock) NetworkInfo(context.Context) (*netmap.NetworkInfo, var ni netmap.NetworkInfo ni.SetCurrentEpoch(c.epoch) - ni.SetEpochDuration(10) + ni.SetEpochDuration(3600) ni.SetMsPerBlock(1000) return &ni, nil }