[#4] Support object expiration

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
Denis Kirillov 2024-07-24 15:20:49 +03:00
parent d9f16604bc
commit 1121da9d24
6 changed files with 712 additions and 122 deletions


@@ -24,9 +24,6 @@ type ContainerFetcher interface {
type TreeFetcher interface {
GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error)
- GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
- DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
- GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error)
}

type FrostFSFetcher interface {


@@ -3,16 +3,21 @@ package lifecycle
import (
"context"
"crypto/ecdsa"
+ "crypto/rand"
"errors"
"fmt"
+ "io"
"math"
+ "sort"
"strconv"
"strings"
"sync"
"time"

"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
+ "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
+ "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@@ -24,15 +29,16 @@ type Executor struct {
log *zap.Logger
jobs <-chan Job
pool *ants.Pool
- tree TreeFetcher
+ tree *tree.Tree
frostfs FrostFSFetcher
+ done chan struct{}
}

type ExecutorConfig struct {
Logger *zap.Logger
Jobs <-chan Job
WorkerPoolSize int
- TreeFetcher TreeFetcher
+ TreeFetcher *tree.Tree
FrostFSFetcher FrostFSFetcher
}
@@ -50,6 +56,7 @@ func NewExecutor(ctx context.Context, cfg ExecutorConfig) (*Executor, error) {
jobs: cfg.Jobs,
tree: cfg.TreeFetcher,
frostfs: cfg.FrostFSFetcher,
+ done: make(chan struct{}),
}

var err error
@@ -63,6 +70,10 @@ func NewExecutor(ctx context.Context, cfg ExecutorConfig) (*Executor, error) {
return e, nil
}

+ func (e *Executor) Done() {
+ <-e.done
+ }
+
func (e *Executor) workerRoutine(ctx context.Context) {
var (
wg sync.WaitGroup
@@ -70,32 +81,38 @@ func (e *Executor) workerRoutine(ctx context.Context) {
)

LOOP:
- for job := range e.jobs {
+ for {
select {
case <-ctx.Done():
+ e.log.Info(logs.ExecutorStopped, zap.Error(ctx.Err()))
break LOOP
- default:
- }
-
- func(job Job) {
- wg.Add(1)
- err = e.pool.Submit(func() {
- defer wg.Done()
- if inErr := e.worker(ctx, job); inErr != nil {
- e.log.Warn(logs.WorkerFailedToHandleJob, zap.Error(err))
- }
- })
- if err != nil {
- wg.Done()
- e.log.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
- }
- }(job)
+ case job, ok := <-e.jobs:
+ if !ok {
+ e.log.Info(logs.ExecutorStoppedJobsChannelIsClosed)
+ break LOOP
+ }
+
+ func(job Job) {
+ wg.Add(1)
+ err = e.pool.Submit(func() {
+ defer wg.Done()
+ if inErr := e.worker(ctx, job); inErr != nil {
+ e.log.Warn(logs.WorkerFailedToHandleJob, zap.Uint64("epoch", job.Epoch),
+ zap.String("cid", job.ContainerID.EncodeToString()), zap.Error(inErr))
+ }
+ })
+ if err != nil {
+ wg.Done()
+ e.log.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
+ }
+ }(job)
+ }
}

wg.Wait()
e.pool.Release()
+ close(e.done)
}

func (e *Executor) worker(ctx context.Context, job Job) error {
@@ -109,15 +126,23 @@ func (e *Executor) worker(ctx context.Context, job Job) error {
Owner: userID,
}

+ settings, err := e.tree.GetSettingsNode(ctx, bktInfo)
+ if err != nil {
+ return fmt.Errorf("get settings node: %w", err)
+ }
+
ni, err := e.frostfs.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info: %w", err)
}

if ni.CurrentEpoch() != job.Epoch {
+ e.log.Warn(logs.EpochMismatched, zap.Uint64("job epoch", job.Epoch), zap.Uint64("network info epoch", ni.CurrentEpoch()))
ni.SetCurrentEpoch(job.Epoch)
}

+ // todo consider listing full object and multipart once
for _, rule := range job.LifecycleConfiguration.Rules {
if rule.Status == "Disabled" {
continue

@@ -125,8 +150,8 @@ func (e *Executor) worker(ctx context.Context, job Job) error {
// todo consider validating rule
- if err = e.handleRule(ctx, ni, rule, bktInfo); err != nil {
+ if err = e.handleRule(ctx, ni, rule, bktInfo, settings); err != nil {
- e.log.Warn("failed to handle rule", zap.Uint64("epoch", job.Epoch),
+ e.log.Warn(logs.FailedToHandleRule, zap.Uint64("epoch", job.Epoch),
zap.Stringer("cid", job.ContainerID), zap.String("rule", rule.ID),
zap.Error(err))
}
@@ -135,9 +160,12 @@ func (e *Executor) worker(ctx context.Context, job Job) error {
return nil
}

- func (e *Executor) handleRule(ctx context.Context, ni *netmap.NetworkInfo, rule data.LifecycleRule, bktInfo *data.BucketInfo) error {
+ func (e *Executor) handleRule(ctx context.Context, ni *netmap.NetworkInfo, rule data.LifecycleRule, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
if err := e.abortMultiparts(ctx, ni, rule, bktInfo); err != nil {
- e.log.Warn("handle multiparts uploads", zap.Error(err))
+ e.log.Warn(logs.AbortMultipartUploads, zap.Error(err))
}

+ if err := e.expireObjects(ctx, ni, rule, bktInfo, settings); err != nil {
+ e.log.Warn(logs.ExpireObjects, zap.Error(err))
+ }
+
return nil
@@ -160,7 +188,6 @@ func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo,
if filter.ObjectSizeGreaterThan != nil || filter.ObjectSizeLessThan != nil ||
filter.And != nil && (filter.And.ObjectSizeGreaterThan != nil || filter.And.ObjectSizeLessThan != nil) {
- // todo check if AWS behave the same way
// uncompleted multipart has no size, so filter cannot be applied
return nil

@@ -197,7 +224,7 @@ func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo,
multipartCreationEpoch, err := creationEpoch(ni, multipart.Created, multipart.Meta)
if err != nil {
- e.log.Warn("failed to get multipart creation epoch", zap.Error(err))
+ e.log.Warn(logs.FailedToGetMultipartCreationEpoch, zap.Error(err))
continue
}

@@ -206,7 +233,7 @@ func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo,
}

if err = e.abortMultipart(ctx, bktInfo, multipart); err != nil {
- e.log.Warn("failed to abort multipart", zap.String("key", multipart.Key),
+ e.log.Warn(logs.FailedToAbortMultipart, zap.String("key", multipart.Key),
zap.String("upload_id", multipart.UploadID), zap.Error(err))
continue
}

@@ -215,6 +242,358 @@ func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo,
return nil
}
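// expireObjects streams object versions that match the rule's prefix, groups consecutive
// versions of the same file path and applies the rule's expiration settings to each group.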
func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, rule data.LifecycleRule, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
if rule.Expiration == nil && rule.NonCurrentVersionExpiration == nil {
return fmt.Errorf("no expiration rules for '%s", rule.ID)
}
var prefix string
if rule.Filter != nil {
prefix = rule.Filter.Prefix
if rule.Filter.And != nil {
prefix = rule.Filter.And.Prefix
}
}
objectStream, err := e.tree.InitVersionsByPrefixStream(ctx, bktInfo, prefix, false)
if err != nil {
return fmt.Errorf("list multiparts: %w", err)
}
var (
latestObjName string
versions []*data.NodeVersion
)
for {
nodeVersion, err := objectStream.Next(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
if err = e.expireObject(ctx, versions, bktInfo, ni, rule, settings); err != nil {
e.log.Warn(logs.FailedToExpireObject, zap.String("object", latestObjName), zap.Error(err))
}
return nil
}
return fmt.Errorf("get node version from stream: %w", err)
}
if nodeVersion.FilePath != latestObjName {
if err = e.expireObject(ctx, versions, bktInfo, ni, rule, settings); err != nil {
e.log.Warn(logs.FailedToExpireObject, zap.String("object", latestObjName), zap.Error(err))
}
latestObjName = nodeVersion.FilePath
versions = versions[:0]
}
versions = append(versions, nodeVersion)
}
}
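// expireObject applies the rule to all collected versions of a single object: matching
// non-current versions are removed, while the current version is removed, replaced with a
// delete marker, or has its expired delete marker cleaned up, depending on the bucket
// versioning state.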
func (e *Executor) expireObject(ctx context.Context, versions []*data.NodeVersion, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, rule data.LifecycleRule, settings *data.BucketSettings) error {
sort.Slice(versions, func(i, j int) bool {
return versions[i].Timestamp < versions[j].Timestamp
})
matchFilter, err := e.expiredObjectMatcher(ctx, versions, bktInfo, ni, rule)
if err != nil {
return err
}
var nullVersionsToDelete []int
for i, version := range versions {
if version.IsUnversioned {
nullVersionsToDelete = append(nullVersionsToDelete, i)
}
matched, err := matchFilter(i, version)
if err != nil {
return err
}
if !matched {
continue
}
if i != len(versions)-1 { // non current
e.deleteObject(ctx, version, bktInfo)
if version.IsUnversioned {
nullVersionsToDelete = nullVersionsToDelete[:len(nullVersionsToDelete)-1]
}
continue
}
switch {
case settings.Unversioned():
e.deleteObject(ctx, version, bktInfo)
case settings.VersioningEnabled():
if version.IsDeleteMarker && len(versions) == 1 { // remove expired object delete marker
e.removeVersion(ctx, version, bktInfo)
} else if !version.IsDeleteMarker {
e.addDeleteMarker(ctx, version, ni, bktInfo, settings)
}
default:
for _, index := range nullVersionsToDelete {
if versions[index].ID == version.ID && version.IsDeleteMarker {
continue
}
e.deleteObject(ctx, versions[index], bktInfo)
}
if !version.IsDeleteMarker {
e.addDeleteMarker(ctx, version, ni, bktInfo, settings)
}
}
}
return nil
}
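// expiredObjectMatcher translates the rule's date, age, newer-version-count, size and tag
// conditions into epochs and returns a predicate that reports whether a particular version
// must be expired.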
func (e *Executor) expiredObjectMatcher(ctx context.Context, versions []*data.NodeVersion, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, rule data.LifecycleRule) (func(index int, version *data.NodeVersion) (bool, error), error) {
var prm objectMatcherParams
prm.minObjSize, prm.maxObjSize = getObjectSizeRange(rule)
if rule.NonCurrentVersionExpiration != nil {
if rule.NonCurrentVersionExpiration.NewerNonCurrentVersions != nil {
prm.startIndex = len(versions) - *rule.NonCurrentVersionExpiration.NewerNonCurrentVersions - 1
if prm.startIndex < 0 {
prm.startIndex = 0
}
}
if rule.NonCurrentVersionExpiration.NonCurrentDays != nil {
nonCurrentDuration, err := durationToEpochsAbs(ni, 24*time.Hour*time.Duration(*rule.NonCurrentVersionExpiration.NonCurrentDays))
if err != nil {
return nil, fmt.Errorf("NonCurrentDays to epochs: %w", err)
}
prm.nonCurrentDuration = &nonCurrentDuration
}
}
if rule.Expiration != nil {
if rule.Expiration.Date != "" {
dateToExpire, err := time.Parse(time.RFC3339, rule.Expiration.Date)
if err != nil {
return nil, fmt.Errorf("invalid expiration date '%s': %w", rule.Expiration.Date, err)
}
if prm.expirationEpoch, err = timeToEpoch(ni, dateToExpire); err != nil {
return nil, fmt.Errorf("expiration date to epoch: %w", err)
}
}
if rule.Expiration.Days != nil {
var err error
prm.expirationDurationEpochs, err = durationToEpochsAbs(ni, 24*time.Hour*time.Duration(*rule.Expiration.Days))
if err != nil {
return nil, fmt.Errorf("Expiration.Days to epochs: %w", err)
}
}
prm.expiredObjectDeleteMarker = rule.Expiration.ExpiredObjectDeleteMarker != nil && *rule.Expiration.ExpiredObjectDeleteMarker
}
if rule.Filter != nil {
if rule.Filter.Tag != nil {
prm.tagsToMatch = append(prm.tagsToMatch, *rule.Filter.Tag)
}
if rule.Filter.And != nil {
prm.tagsToMatch = append(prm.tagsToMatch, rule.Filter.And.Tags...)
}
}
return e.objectMatcher(ctx, bktInfo, ni, versions, prm), nil
}
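// objectMatcherParams carries the precomputed rule conditions used by the predicate
// returned from objectMatcher.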
type objectMatcherParams struct {
startIndex int
minObjSize uint64
maxObjSize uint64
nonCurrentDuration *uint64
expiredObjectDeleteMarker bool
expirationEpoch uint64
expirationDurationEpochs uint64
tagsToMatch []data.Tag
}
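// objectMatcher builds the predicate itself; versions are expected to be sorted by creation
// timestamp, so the last element is treated as the current version.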
func (e *Executor) objectMatcher(ctx context.Context, bktInfo *data.BucketInfo, ni *netmap.NetworkInfo, versions []*data.NodeVersion, prm objectMatcherParams) func(int, *data.NodeVersion) (bool, error) {
return func(index int, version *data.NodeVersion) (bool, error) {
if index < prm.startIndex {
return true, nil
}
if prm.nonCurrentDuration != nil {
if index < len(versions)-1 {
next := versions[index+1]
epoch, err := versionCreationEpoch(next, ni)
if err != nil {
return false, err
}
if epoch+(*prm.nonCurrentDuration) < ni.CurrentEpoch() {
return false, nil
}
}
}
// the following checks apply only to the current version,
// so we have to skip all non-current versions
if index != len(versions)-1 {
return false, nil
}
// remove expired delete marker unconditionally
if version.IsDeleteMarker && len(versions) == 1 && prm.expiredObjectDeleteMarker {
return true, nil
}
versionEpoch, err := versionCreationEpoch(version, ni)
if err != nil {
return false, err
}
if versionEpoch < prm.expirationEpoch && versionEpoch+prm.expirationDurationEpochs < ni.CurrentEpoch() {
return false, nil
}
if version.IsDeleteMarker && len(versions) == 1 { // remove expired delete marker under matching all conditions
return true, nil
}
if version.Size < prm.minObjSize || version.Size > prm.maxObjSize {
return false, nil
}
if len(prm.tagsToMatch) == 0 {
return true, nil
}
tags, err := e.tree.GetObjectTagging(ctx, bktInfo, version)
if err != nil {
return false, fmt.Errorf("get object tags from tree: %w", err)
}
for _, tag := range prm.tagsToMatch {
if tags[tag.Key] != tag.Value {
return false, nil
}
}
return true, nil
}
}
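// deleteObject removes the object payload from storage for real versions (delete markers
// have no payload) and then drops the version node from the tree.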
func (e *Executor) deleteObject(ctx context.Context, version *data.NodeVersion, bktInfo *data.BucketInfo) {
if !version.IsDeleteMarker {
var addr oid.Address
addr.SetContainer(bktInfo.CID)
addr.SetObject(version.OID)
if err := e.frostfs.DeleteObject(ctx, addr); err != nil && !isNotFound(err) {
e.log.Warn(logs.DeleteObjectVersionFromStorage, zap.String("key", version.FilePath),
zap.String("address", addr.EncodeToString()), zap.Error(err))
return
}
}
e.removeVersion(ctx, version, bktInfo)
}
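// addDeleteMarker records a new delete-marker version with a random OID for the expired
// current version; the marker is stored as unversioned when versioning is suspended.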
func (e *Executor) addDeleteMarker(ctx context.Context, version *data.NodeVersion, ni *netmap.NetworkInfo, bktInfo *data.BucketInfo, settings *data.BucketSettings) {
randOID, err := getRandomOID()
if err != nil {
e.log.Warn(logs.FailedToGenerateRandomIDForDeleteMarker, zap.Error(err))
return
}
now := time.Now()
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: randOID,
FilePath: version.FilePath,
Created: &now,
Owner: &bktInfo.Owner,
IsDeleteMarker: true,
CreationEpoch: ni.CurrentEpoch(),
},
IsUnversioned: settings.VersioningSuspended(),
}
if _, err = e.tree.AddVersion(ctx, bktInfo, newVersion); err != nil {
e.log.Warn(logs.AddDeleteMarker, zap.Error(err))
return
}
}
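// removeVersion drops a single version node from the tree service; failures are only logged.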
func (e *Executor) removeVersion(ctx context.Context, version *data.NodeVersion, bktInfo *data.BucketInfo) {
if err := e.tree.RemoveVersion(ctx, bktInfo, version.ID); err != nil {
e.log.Warn(logs.DeleteObjectVersionFromTree, zap.String("key", version.FilePath),
zap.Uint64("id", version.ID), zap.Error(err))
}
}
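// getRandomOID generates a random object ID to be used for a delete marker.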
func getRandomOID() (oid.ID, error) {
b := [32]byte{}
if _, err := rand.Read(b[:]); err != nil {
return oid.ID{}, err
}
var objID oid.ID
objID.SetSHA256(b)
return objID, nil
}
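// getObjectSizeRange converts the exclusive ObjectSizeGreaterThan/ObjectSizeLessThan filter
// bounds into an inclusive [min, max] size range in bytes.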
func getObjectSizeRange(rule data.LifecycleRule) (uint64, uint64) {
minObjSize := uint64(0)
maxObjSize := uint64(math.MaxUint64)
if rule.Filter == nil {
return minObjSize, maxObjSize
}
if rule.Filter.ObjectSizeGreaterThan != nil {
minObjSize = *rule.Filter.ObjectSizeGreaterThan
if minObjSize != math.MaxUint64 {
minObjSize++
}
} else if rule.Filter.And != nil && rule.Filter.And.ObjectSizeGreaterThan != nil {
minObjSize = *rule.Filter.And.ObjectSizeGreaterThan
if minObjSize != math.MaxUint64 {
minObjSize++
}
}
if rule.Filter.ObjectSizeLessThan != nil {
maxObjSize = *rule.Filter.ObjectSizeLessThan
if maxObjSize != 0 {
maxObjSize--
}
} else if rule.Filter.And != nil && rule.Filter.And.ObjectSizeLessThan != nil {
maxObjSize = *rule.Filter.And.ObjectSizeLessThan
if maxObjSize != 0 {
maxObjSize--
}
}
return minObjSize, maxObjSize
}
func isNotFound(err error) bool {
return client.IsErrObjectAlreadyRemoved(err) || client.IsErrObjectNotFound(err)
}
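// versionCreationEpoch returns the creation epoch stored in the version node, falling back
// to converting the creation timestamp (or the current time) into an epoch.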
func versionCreationEpoch(version *data.NodeVersion, ni *netmap.NetworkInfo) (uint64, error) {
objCreationEpoch := version.CreationEpoch
if objCreationEpoch == 0 {
created := time.Now()
if version.Created != nil {
created = *version.Created
}
var err error
if objCreationEpoch, err = timeToEpoch(ni, created); err != nil {
return 0, fmt.Errorf("time to epoch: %w", err)
}
}
return objCreationEpoch, nil
}
func matchMultipartByTagsFunc(filter *data.LifecycleRuleFilter) func(*data.MultipartInfo) bool {
return func(info *data.MultipartInfo) bool {
tags := make(map[string]string)

@@ -280,7 +659,7 @@ func creationEpoch(ni *netmap.NetworkInfo, created time.Time, meta map[string]st
}

func timeToEpoch(ni *netmap.NetworkInfo, timeToConvert time.Time) (uint64, error) {
- dur := timeToConvert.Sub(time.Now())
+ dur := time.Until(timeToConvert)
epochLifetime, err := durationToEpochsAbs(ni, dur)
if err != nil {


@@ -0,0 +1,184 @@
package lifecycle
import (
"context"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
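// TestExecutorBase checks that an expired object in an unversioned bucket is removed from storage.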
func TestExecutorBase(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := zaptest.NewLogger(t)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
var owner user.ID
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
cnrID := cidtest.ID()
bktInfo := &data.BucketInfo{
CID: cnrID,
Owner: owner,
}
ffs := newFrostFSFetcherMock()
memTreeCli, err := tree.NewTreeServiceClientMemory()
require.NoError(t, err)
tr := tree.NewTree(memTreeCli, logger)
err = tr.PutSettingsNode(ctx, bktInfo, &data.BucketSettings{Versioning: data.VersioningUnversioned})
require.NoError(t, err)
objAddr, err := addObjectWithTags(ffs, tr, bktInfo, "obj", nil)
require.NoError(t, err)
jobs := make(chan Job)
cfg := ExecutorConfig{
Logger: logger,
Jobs: jobs,
WorkerPoolSize: 1,
TreeFetcher: tr,
FrostFSFetcher: ffs,
}
executor, err := NewExecutor(ctx, cfg)
require.NoError(t, err)
lifecycleCfg := &data.LifecycleConfiguration{Rules: []data.LifecycleRule{{
Status: "Enabled",
Expiration: &data.LifecycleExpiration{
Date: "2024-01-24T12:19:33Z",
},
ID: "test",
}}}
jobs <- Job{
ContainerID: cnrID,
PrivateKey: key,
LifecycleConfiguration: lifecycleCfg,
Epoch: 1,
}
close(jobs)
executor.Done()
_, err = ffs.GetObject(ctx, objAddr)
require.Truef(t, client.IsErrObjectNotFound(err), "expected not found error, got: %v", err)
}
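// TestExecutorFilterPrefix checks that only objects under the rule's prefix are expired.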
func TestExecutorFilterPrefix(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := zaptest.NewLogger(t)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
var owner user.ID
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
cnrID := cidtest.ID()
bktInfo := &data.BucketInfo{
CID: cnrID,
Owner: owner,
}
ffs := newFrostFSFetcherMock()
memTreeCli, err := tree.NewTreeServiceClientMemory()
require.NoError(t, err)
tr := tree.NewTree(memTreeCli, logger)
err = tr.PutSettingsNode(ctx, bktInfo, &data.BucketSettings{Versioning: data.VersioningUnversioned})
require.NoError(t, err)
objAddr1, err := addObjectWithTags(ffs, tr, bktInfo, "obj", nil)
require.NoError(t, err)
objAddr2, err := addObjectWithTags(ffs, tr, bktInfo, "tmp/obj", nil)
require.NoError(t, err)
jobs := make(chan Job)
cfg := ExecutorConfig{
Logger: logger,
Jobs: jobs,
WorkerPoolSize: 1,
TreeFetcher: tr,
FrostFSFetcher: ffs,
}
executor, err := NewExecutor(ctx, cfg)
require.NoError(t, err)
lifecycleCfg := &data.LifecycleConfiguration{Rules: []data.LifecycleRule{{
Status: "Enabled",
Expiration: &data.LifecycleExpiration{
Date: "2024-01-24T12:19:33Z",
},
Filter: &data.LifecycleRuleFilter{
Prefix: "tmp",
},
ID: "test",
}}}
jobs <- Job{
ContainerID: cnrID,
PrivateKey: key,
LifecycleConfiguration: lifecycleCfg,
Epoch: 1,
}
close(jobs)
executor.Done()
_, err = ffs.GetObject(ctx, objAddr1)
require.NoError(t, err)
_, err = ffs.GetObject(ctx, objAddr2)
require.Truef(t, client.IsErrObjectNotFound(err), "expected not found error, got: %v", err)
}
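// addObjectWithTags puts an object into the FrostFS mock and registers an unversioned
// version with optional tags in the tree.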
func addObjectWithTags(ffs *frostfsFetcherMock, tr *tree.Tree, bktInfo *data.BucketInfo, name string, tags map[string]string) (oid.Address, error) {
var objAddr oid.Address
objAddr.SetContainer(bktInfo.CID)
objAddr.SetObject(oidtest.ID())
obj := object.New()
obj.SetContainerID(objAddr.Container())
obj.SetID(objAddr.Object())
obj.SetPayload([]byte("content"))
ffs.setObject(objAddr, obj)
now := time.Now()
nodeVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: objAddr.Object(),
FilePath: name,
Owner: &bktInfo.Owner,
Created: &now,
},
IsUnversioned: true,
}
id, err := tr.AddVersion(context.TODO(), bktInfo, nodeVersion)
if err != nil {
return oid.Address{}, err
}
nodeVersion.ID = id
if err = tr.PutObjectTagging(context.TODO(), bktInfo, nodeVersion, tags); err != nil {
return oid.Address{}, err
}
return objAddr, nil
}


@@ -262,7 +262,7 @@ func (p *JobProvider) handleContainer(ctx context.Context, uc *UserContainer, ep
addr.SetContainer(uc.Container)
addr.SetObject(objID)

- configuration, err := p.fetchLifecycleConfiguration(ctx, addr, *btoken)
+ configuration, err := p.fetchLifecycleConfiguration(ctx, addr)
if err != nil {
return fmt.Errorf("get lifecycle configuration from storage: %w", err)
}

@@ -323,7 +323,7 @@ func (p *JobProvider) svcKeys() (keys.PublicKeys, uint64) {
return lifecyclerKeys, uint64(position)
}

- func (p *JobProvider) fetchLifecycleConfiguration(ctx context.Context, addr oid.Address, btoken bearer.Token) (*data.LifecycleConfiguration, error) {
+ func (p *JobProvider) fetchLifecycleConfiguration(ctx context.Context, addr oid.Address) (*data.LifecycleConfiguration, error) {
res, err := p.frostfsFetcher.GetObject(ctx, addr)
if err != nil {
return nil, err


@@ -6,11 +6,14 @@ import (
"encoding/xml"
"errors"
"io"
+ "sync"
"testing"

"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
+ apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
+ "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
@@ -87,31 +90,36 @@ func (c *containerFetcherMock) Containers(owner user.ID) ([]cid.ID, error) {
var _ FrostFSFetcher = (*frostfsFetcherMock)(nil)

type frostfsFetcherMock struct {
- configurations map[oid.Address]*data.LifecycleConfiguration
+ mu sync.RWMutex
+ objects map[oid.Address]*object.Object
+ epoch uint64
}

- func newFrostFSFetcherMock(configs map[oid.Address]*data.LifecycleConfiguration) *frostfsFetcherMock {
- if configs == nil {
- configs = map[oid.Address]*data.LifecycleConfiguration{}
- }
+ func newFrostFSFetcherMock() *frostfsFetcherMock {
return &frostfsFetcherMock{
- configurations: configs,
+ objects: map[oid.Address]*object.Object{},
}
}

- func (c *frostfsFetcherMock) GetObject(ctx context.Context, addr oid.Address) (pool.ResGetObject, error) {
- val, ok := c.configurations[addr]
- if !ok {
- return pool.ResGetObject{}, errors.New("configurationFetcherMock: hash not found")
- }
- raw, err := xml.Marshal(val)
- if err != nil {
- return pool.ResGetObject{}, err
+ func (c *frostfsFetcherMock) setObject(addr oid.Address, obj *object.Object) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ c.objects[addr] = obj
+ }
+
+ func (c *frostfsFetcherMock) GetObject(_ context.Context, addr oid.Address) (pool.ResGetObject, error) {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ val, ok := c.objects[addr]
+ if !ok {
+ return pool.ResGetObject{}, &apistatus.ObjectNotFound{}
}

return pool.ResGetObject{
- Payload: &payloadReader{bytes.NewReader(raw)},
+ Header: *val,
+ Payload: &payloadReader{bytes.NewReader(val.Payload())},
}, nil
}
@@ -121,14 +129,27 @@ type payloadReader struct {
func (p *payloadReader) Close() error { return nil }

- func (c *frostfsFetcherMock) NetworkInfo(ctx context.Context) (*netmap.NetworkInfo, error) {
- //TODO implement me
- panic("implement me")
+ func (c *frostfsFetcherMock) NetworkInfo(context.Context) (*netmap.NetworkInfo, error) {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+
+ var ni netmap.NetworkInfo
+ ni.SetCurrentEpoch(c.epoch)
+ ni.SetEpochDuration(10)
+ ni.SetMsPerBlock(1000)
+
+ return &ni, nil
}

- func (c *frostfsFetcherMock) DeleteObject(ctx context.Context, addr oid.Address) error {
- //TODO implement me
- panic("implement me")
+ func (c *frostfsFetcherMock) DeleteObject(_ context.Context, addr oid.Address) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if _, ok := c.objects[addr]; !ok {
+ return &apistatus.ObjectNotFound{}
+ }
+
+ delete(c.objects, addr)
+ return nil
}

var _ CredentialSource = (*credentialSourceMock)(nil)
@@ -179,21 +200,6 @@ func (t *treeFetcherMock) GetBucketLifecycleConfiguration(_ context.Context, bkt
return val, nil
}

- func (t *treeFetcherMock) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (t *treeFetcherMock) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error {
- //TODO implement me
- panic("implement me")
- }
-
- func (t *treeFetcherMock) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
- //TODO implement me
- panic("implement me")
- }
-
var _ Settings = (*settingsMock)(nil)

type settingsMock struct{}
@@ -209,7 +215,7 @@ func TestFetcherBase(t *testing.T) {
key, err := keys.NewPrivateKey()
require.NoError(t, err)

- mocks, err := initMocks(2, 1)
+ mocks, err := initFetcherMocks(2, 1)
require.NoError(t, err)

epochCh := make(chan uint64)
@@ -248,7 +254,7 @@ func TestFetcherCancel(t *testing.T) {
key, err := keys.NewPrivateKey()
require.NoError(t, err)

- mocks, err := initMocks(1, 1)
+ mocks, err := initFetcherMocks(1, 1)
require.NoError(t, err)

epochCh := make(chan uint64)
@@ -288,28 +294,39 @@ type fetchersMock struct {
treeFetcher *treeFetcherMock
}

- func initMocks(users, containers int) (*fetchersMock, error) {
+ func initFetcherMocks(users, containers int) (*fetchersMock, error) {
usersMap, err := generateUsersMap(users)
if err != nil {
return nil, err
}

+ ffsFetcher := newFrostFSFetcherMock()
cnrsMap := make(map[util.Uint160][]cid.ID)
treeMap := make(map[cid.ID]oid.ID)
- configMap := make(map[oid.Address]*data.LifecycleConfiguration)

for hash := range usersMap {
for i := 0; i < containers; i++ {
addr := oidtest.Address()
cnrsMap[hash] = append(cnrsMap[hash], addr.Container())
treeMap[addr.Container()] = addr.Object()
- configMap[addr] = &data.LifecycleConfiguration{Rules: []data.LifecycleRule{{ID: addr.EncodeToString()}}}
+
+ lc := &data.LifecycleConfiguration{Rules: []data.LifecycleRule{{ID: addr.EncodeToString()}}}
+ raw, err := xml.Marshal(lc)
+ if err != nil {
+ return nil, err
+ }
+
+ obj := object.New()
+ obj.SetPayload(raw)
+ obj.SetContainerID(addr.Container())
+ obj.SetID(addr.Object())
+
+ ffsFetcher.objects[addr] = obj
}
}

return &fetchersMock{
userFetcher: newUserFetcherMock(usersMap),
containerFetcher: newContainerFetcherMock(cnrsMap),
- configurationFetcher: newFrostFSFetcherMock(configMap),
+ configurationFetcher: ffsFetcher,
credentialSource: newCredentialSourceMock(usersMap),
treeFetcher: newTreeFetcherMock(treeMap),
}, nil


@@ -1,52 +1,65 @@
package logs

const (
ApplicationStarted = "application started"
ApplicationStopped = "application stopped"
StoppingApplication = "stopping application"
ServiceIsRunning = "service is running"
ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port"
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled"
ShuttingDownService = "shutting down service"
CantGracefullyShutDownService = "can't gracefully shut down service, force stop"
CantShutDownService = "can't shut down service"
SIGHUPConfigReloadStarted = "SIGHUP config reload started"
FailedToReloadConfigBecauseItsMissed = "failed to reload config because it's missed"
FailedToReloadConfig = "failed to reload config"
LogLevelWontBeUpdated = "log level won't be updated"
SIGHUPConfigReloadCompleted = "SIGHUP config reload completed"
ListenerStopped = "listener stopped"
MorphClientStopped = "morph client stopped"
MorphClientReconnection = "morph client reconnection..."
ListenerReconnection = "listener reconnection..."
MorphClientCouldntBeReconnected = "morph client couldn't be reconnected"
ListenerCouldntBeReconnected = "listener couldn't be reconnected"
ResolveNetmapContract = "failed to resolve netmap contract"
ResolveFrostfsIDContract = "failed to resolve frostfsid contract"
ResolveContainerContract = "failed to resolve container contract"
NewEpochWasTriggered = "new epoch was triggered"
InitNotificator = "init notificator"
NoMorphRPCEndpoints = "no morph RPC endpoints"
FailedToLoadPrivateKey = "failed to load private key"
NoCredentialSourceWallets = "no credential source wallets"
CouldntCreateWalletSource = "could not create wallet source"
AddedStoragePeer = "added storage peer"
FailedToCreateConnectionPool = "failed to create connection pool"
FailedToDialConnectionPool = "failed to dial connection pool"
FailedToCreateTreePool = "failed to create tree pool"
FailedToDialTreePool = "failed to dial tree pool"
FoundUserContainers = "found user containers"
JobProviderStopped = "job provider stopped"
JobProviderStoppedBecauseOfEpochChan = "job provider stopped because of epoch channel is closed"
FailedToInitMorphClient = "failed to init morph client"
FailedToFetchServicesKeys = "failed to fetch lifecycle services keys"
FailedToFetchUsers = "failed to fetch users"
FailedToHandleUser = "failed to handle user"
FailedToHandleContainer = "failed to handle container"
FetcherTriggerEpoch = "fetcher: trigger epoch, cancel previous fetch"
FetchedUserContainers = "fetched user container configurations"
HandlerTriggered = "handler: triggered"
HandlerContextCanceled = "handler: context canceled"
FailedToSubmitTaskToPool = "failed to submit task to executor pool"
WorkerFailedToHandleJob = "worker failed to handle job"
+ ExecutorStopped = "executor stopped"
+ ExecutorStoppedJobsChannelIsClosed = "executor stopped: jobs channel is closed"
+ FailedToHandleRule = "failed to handle rule"
+ AbortMultipartUploads = "abort multiparts uploads"
+ ExpireObjects = "expire objects"
+ FailedToGetMultipartCreationEpoch = "failed to get multipart creation epoch"
+ FailedToAbortMultipart = "failed to abort multipart"
+ FailedToExpireObject = "failed to expire object"
+ DeleteObjectVersionFromStorage = "delete object version from storage"
+ FailedToGenerateRandomIDForDeleteMarker = "failed to generate random id for delete marker"
+ AddDeleteMarker = "add delete marker"
+ DeleteObjectVersionFromTree = "delete object version from tree"
+ EpochMismatched = "epoch mismatched"
)