[#4] Support aborting multiparts

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
Denis Kirillov 2024-07-23 11:37:05 +03:00
parent 2ece73b786
commit 2ee8967547
11 changed files with 577 additions and 156 deletions

View file

@ -2,7 +2,6 @@ package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
@ -124,27 +123,37 @@ func (a *App) init(ctx context.Context) {
close(epochCh)
}()
ffs := frostfs.NewFrostFS(objPool, a.log)
tr := tree.NewTree(frostfs.NewTreePoolWrapper(treePool), a.log)
lifecycleCfg := lifecycle.Config{
UserFetcher: contract.NewFrostFSID(ffsidCfg),
ContainerFetcher: contract.NewContainer(containerCfg),
ConfigurationFetcher: frostfs.NewFrostFS(objPool, a.log),
CredentialSource: credSource,
Settings: a.settings,
CurrentLifecycler: a.key,
Logger: a.log,
TreeFetcher: tree.NewTree(frostfs.NewTreePoolWrapper(treePool), a.log),
BufferSize: fetchJobFetcherBuffer(a.cfg),
EpochChannel: epochCh,
UserFetcher: contract.NewFrostFSID(ffsidCfg),
ContainerFetcher: contract.NewContainer(containerCfg),
FrostFSFetcher: ffs,
CredentialSource: credSource,
Settings: a.settings,
CurrentLifecycler: a.key,
Logger: a.log,
TreeFetcher: tr,
BufferSize: fetchJobFetcherBuffer(a.cfg),
EpochChannel: epochCh,
}
jobProvider := lifecycle.NewJobProvider(ctx, lifecycleCfg)
go func() {
// todo (d.kirillov) use real job executor here TrueCloudLab/frostfs-s3-lifecycler#4
for job := range jobProvider.Jobs() {
fmt.Println(job)
}
}()
executorCfg := lifecycle.ExecutorConfig{
Logger: a.log,
Jobs: jobProvider.Jobs(),
WorkerPoolSize: fetchExecutorPoolSize(a.cfg),
TreeFetcher: tr,
FrostFSFetcher: ffs,
}
executor, err := lifecycle.NewExecutor(ctx, executorCfg)
if err != nil {
a.log.Fatal(logs.ResolveNetmapContract, zap.Error(err))
}
_ = executor // todo: consider running it via a separate method
netmapContract, err := resolver.ResolveContractHash(cli, a.cfg.GetString(cfgMorphContractNetmap))
if err != nil {

View file

@ -71,6 +71,7 @@ const (
// Lifecycle.
cfgLifecycleJobFetcherBuffer = "lifecycle.job_fetcher_buffer"
cfgLifecycleExecutorPoolSize = "lifecycle.executor_pool_size"
cfgLifecycleServices = "lifecycle.services"
// Command line args.
@ -95,6 +96,7 @@ const (
defaultFrostFSPoolErrorThreshold uint32 = 100
defaultLifecycleJobFetcherBuffer = 1000
defaultLifecycleExecutorPoolSize = 100
)
func settings() *viper.Viper {
@ -141,6 +143,7 @@ func settings() *viper.Viper {
// lifecycle:
v.SetDefault(cfgLifecycleJobFetcherBuffer, defaultLifecycleJobFetcherBuffer)
v.SetDefault(cfgLifecycleExecutorPoolSize, defaultLifecycleExecutorPoolSize)
// Bind flags with configuration values.
if err := v.BindPFlags(flags); err != nil {
@ -428,6 +431,15 @@ func fetchJobFetcherBuffer(cfg *viper.Viper) int {
return bufferSize
}
func fetchExecutorPoolSize(cfg *viper.Viper) int {
val := cfg.GetInt(cfgLifecycleExecutorPoolSize)
if val <= 0 {
val = defaultLifecycleExecutorPoolSize
}
return val
}
func fetchMorphReconnectClientsInterval(cfg *viper.Viper) time.Duration {
val := cfg.GetDuration(cfgMorphReconnectClientsInterval)
if val <= 0 {

View file

@ -38,6 +38,7 @@ S3_LIFECYCLER_CREDENTIAL_SOURCE_WALLETS_0_PASSPHRASE=""
# Lifecycle
S3_LIFECYCLER_LIFECYCLE_JOB_FETCHER_BUFFER=1000
S3_LIFECYCLER_LIFECYCLE_EXECUTOR_POOL_SIZE=100
S3_LIFECYCLER_LIFECYCLE_SERVICES=0313b1ac3a8076e155a7e797b24f0b650cccad5941ea59d7cfd51a024a8b2a06bf 031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a
# FrostFS

View file

@ -41,6 +41,7 @@ credential_source:
lifecycle:
job_fetcher_buffer: 1000
executor_pool_size: 100
services:
- 0313b1ac3a8076e155a7e797b24f0b650cccad5941ea59d7cfd51a024a8b2a06bf

View file

@ -147,6 +147,7 @@ Configuration for main lifecycle handling procedure.
```yaml
lifecycle:
job_fetcher_buffer: 1000
executor_pool_size: 100
services:
- 0313b1ac3a8076e155a7e797b24f0b650cccad5941ea59d7cfd51a024a8b2a06bf
```
@ -154,6 +155,7 @@ lifecycle:
| Parameter | Type | SIGHUP reload | Default value | Description |
|----------------------|------------|---------------|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `job_fetcher_buffer` | `int` | no | `1000` | Size of the buffered channel used to fetch users, containers, and other data for the lifecycle procedure. This parameter helps reduce the number of concurrent outgoing network requests. |
| `executor_pool_size` | `int` | no | `100` | Worker pool size used to clean up containers according to their lifecycle configuration. |
| `services` | `[]string` | yes | | List of public keys of Lifecycle services. Used to split jobs between the services (see the sketch below the table). |
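For illustration only, here is a minimal sketch of how a sorted list of lifecycler public keys could be used to split jobs between instances. The actual distribution logic lives in the job provider (`svcKeys` in fetcher.go) and is not shown in this diff, so the helper and the numbers below are hypothetical:

```go
package main

import "fmt"

// ownsContainer is a hypothetical helper: with `total` lifecycler services
// configured in lifecycle.services, the instance at sorted-key position `pos`
// handles only the containers whose hash lands on its index.
func ownsContainer(cnrHash, pos, total uint64) bool {
	if total == 0 {
		return true // no peers configured: handle everything
	}
	return cnrHash%total == pos
}

func main() {
	// Two public keys listed in lifecycle.services; this instance is index 0.
	fmt.Println(ownsContainer(42, 0, 2)) // true: 42 % 2 == 0
	fmt.Println(ownsContainer(43, 0, 2)) // false: handled by the other service
}
```

Keeping the key list identical (and identically ordered once sorted) on every instance is what makes this style of splitting deterministic.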
# `frostfs` section

View file

@ -2,18 +2,14 @@ package frostfs
import (
"context"
"encoding/xml"
"fmt"
"io"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"go.uber.org/zap"
"golang.org/x/text/encoding/ianaindex"
)
// FrostFS represents virtual connection to the FrostFS network.
@ -32,70 +28,39 @@ func NewFrostFS(p *pool.Pool, log *zap.Logger) *FrostFS {
}
}
type PrmGetObject struct {
// Container to read the object header from.
Container cid.ID
func (f *FrostFS) GetObject(ctx context.Context, addr oid.Address) (pool.ResGetObject, error) {
var prm pool.PrmObjectGet
prm.SetAddress(addr)
addBearer(ctx, &prm)
// ID of the object for which to read the header.
Object oid.ID
// Bearer token to be used for the operation. Overlaps PrivateKey. Optional.
BearerToken bearer.Token
return f.pool.GetObject(ctx, prm)
}
func (f *FrostFS) GetObject(ctx context.Context, prm PrmGetObject) (pool.ResGetObject, error) {
var addr oid.Address
addr.SetContainer(prm.Container)
addr.SetObject(prm.Object)
func (f *FrostFS) DeleteObject(ctx context.Context, addr oid.Address) error {
var prm pool.PrmObjectDelete
prm.SetAddress(addr)
addBearer(ctx, &prm)
var prmGet pool.PrmObjectGet
prmGet.SetAddress(addr)
prmGet.UseBearer(prm.BearerToken)
return f.pool.GetObject(ctx, prmGet)
return f.pool.DeleteObject(ctx, prm)
}
func (f *FrostFS) LifecycleConfiguration(ctx context.Context, addr oid.Address) (*data.LifecycleConfiguration, error) {
prm := PrmGetObject{
Container: addr.Container(),
Object: addr.Object(),
}
if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil {
prm.BearerToken = *bd.Gate.BearerToken
}
res, err := f.GetObject(ctx, prm)
func (f *FrostFS) NetworkInfo(ctx context.Context) (*netmap.NetworkInfo, error) {
networkInfo, err := f.pool.NetworkInfo(ctx)
if err != nil {
return nil, err
}
defer func() {
if closeErr := res.Payload.Close(); closeErr != nil {
f.log.Warn("could not close object payload", zap.String("address", addr.EncodeToString()), zap.Error(closeErr))
}
}()
lifecycleCfg := &data.LifecycleConfiguration{}
dec := newDecoder(res.Payload)
if err = dec.Decode(lifecycleCfg); err != nil {
return nil, fmt.Errorf("unmarshal lifecycle configuration '%s': %w", addr.EncodeToString(), err)
return nil, fmt.Errorf("get network info via client: %w", err)
}
return lifecycleCfg, nil
return &networkInfo, nil
}
const awsDefaultNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
func newDecoder(r io.Reader) *xml.Decoder {
dec := xml.NewDecoder(r)
dec.DefaultSpace = awsDefaultNamespace
dec.CharsetReader = func(charset string, reader io.Reader) (io.Reader, error) {
enc, err := ianaindex.IANA.Encoding(charset)
if err != nil {
return nil, fmt.Errorf("charset %s: %w", charset, err)
}
return enc.NewDecoder().Reader(reader), nil
}
return dec
type WithBearerParam interface {
UseBearer(token bearer.Token)
}
func addBearer(ctx context.Context, prm WithBearerParam) {
if bd, err := middleware.GetBoxData(ctx); err == nil {
if bd.Gate.BearerToken != nil {
prm.UseBearer(*bd.Gate.BearerToken)
}
}
}

View file

@ -0,0 +1,40 @@
package lifecycle
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
)
type UserFetcher interface {
Users() ([]util.Uint160, error)
UserKey(hash util.Uint160) (*keys.PublicKey, error)
}
type ContainerFetcher interface {
Containers(owner user.ID) ([]cid.ID, error)
}
type TreeFetcher interface {
GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error)
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error)
}
type FrostFSFetcher interface {
GetObject(ctx context.Context, addr oid.Address) (pool.ResGetObject, error)
NetworkInfo(ctx context.Context) (*netmap.NetworkInfo, error)
DeleteObject(ctx context.Context, addr oid.Address) error
}
type CredentialSource interface {
Credentials(ctx context.Context, pk *keys.PublicKey) (*keys.PrivateKey, error)
}

View file

@ -0,0 +1,326 @@
package lifecycle
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
type Executor struct {
log *zap.Logger
jobs <-chan Job
pool *ants.Pool
tree TreeFetcher
frostfs FrostFSFetcher
}
type ExecutorConfig struct {
Logger *zap.Logger
Jobs <-chan Job
WorkerPoolSize int
TreeFetcher TreeFetcher
FrostFSFetcher FrostFSFetcher
}
type logWrapper struct {
log *zap.Logger
}
func (l *logWrapper) Printf(format string, args ...interface{}) {
l.log.Info(fmt.Sprintf(format, args...))
}
func NewExecutor(ctx context.Context, cfg ExecutorConfig) (*Executor, error) {
e := &Executor{
log: cfg.Logger,
jobs: cfg.Jobs,
tree: cfg.TreeFetcher,
frostfs: cfg.FrostFSFetcher,
}
var err error
e.pool, err = ants.NewPool(cfg.WorkerPoolSize, ants.WithLogger(&logWrapper{cfg.Logger}))
if err != nil {
return nil, fmt.Errorf("coudln't init worker pool: %w", err)
}
go e.workerRoutine(ctx)
return e, nil
}
func (e *Executor) workerRoutine(ctx context.Context) {
var (
wg sync.WaitGroup
err error
)
LOOP:
for job := range e.jobs {
select {
case <-ctx.Done():
break LOOP
default:
}
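// Wrap the submission in a closure so the WaitGroup stays balanced even when the pool rejects the task.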
func(job Job) {
wg.Add(1)
err = e.pool.Submit(func() {
defer wg.Done()
if inErr := e.worker(ctx, job); inErr != nil {
e.log.Warn(logs.WorkerFailedToHandleJob, zap.Error(inErr))
}
})
if err != nil {
wg.Done()
e.log.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}(job)
}
wg.Wait()
e.pool.Release()
}
func (e *Executor) worker(ctx context.Context, job Job) error {
ctx = addBearerToContext(ctx, job.Bearer)
var userID user.ID
user.IDFromKey(&userID, (ecdsa.PublicKey)(*job.PrivateKey.PublicKey()))
bktInfo := &data.BucketInfo{
CID: job.ContainerID,
Owner: userID,
}
ni, err := e.frostfs.NetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info: %w", err)
}
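// Pin the network info to the job's epoch so expiration checks use the epoch this job was produced for.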
if ni.CurrentEpoch() != job.Epoch {
ni.SetCurrentEpoch(job.Epoch)
}
for _, rule := range job.LifecycleConfiguration.Rules {
if rule.Status == "Disabled" {
continue
}
// todo consider validating rule
if err = e.handleRule(ctx, ni, rule, bktInfo); err != nil {
e.log.Warn("failed to handle rule", zap.Uint64("epoch", job.Epoch),
zap.Stringer("cid", job.ContainerID), zap.String("rule", rule.ID),
zap.Error(err))
}
}
return nil
}
func (e *Executor) handleRule(ctx context.Context, ni *netmap.NetworkInfo, rule data.LifecycleRule, bktInfo *data.BucketInfo) error {
if err := e.abortMultiparts(ctx, ni, rule, bktInfo); err != nil {
e.log.Warn("handle multiparts uploads", zap.Error(err))
}
return nil
}
const (
creationEpochKV = "CreationEpoch"
tagPrefix = "S3-Tag-"
)
func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo, rule data.LifecycleRule, bktInfo *data.BucketInfo) error {
if rule.AbortIncompleteMultipartUpload == nil || rule.AbortIncompleteMultipartUpload.DaysAfterInitiation == nil {
return nil
}
var prefix string
matchMultipartFn := func(*data.MultipartInfo) bool { return true }
if rule.Filter != nil {
filter := rule.Filter
if filter.ObjectSizeGreaterThan != nil || filter.ObjectSizeLessThan != nil ||
filter.And != nil && (filter.And.ObjectSizeGreaterThan != nil || filter.And.ObjectSizeLessThan != nil) {
// todo check if AWS behaves the same way
// an incomplete multipart upload has no size, so the filter cannot be applied
return nil
}
prefix = filter.Prefix
if filter.And != nil {
prefix = filter.And.Prefix
}
if filter.Tag != nil || filter.And != nil {
matchMultipartFn = matchMultipartByTagsFunc(filter)
}
}
multipartDuration, err := durationToEpochsAbs(ni, 24*time.Hour*time.Duration(*rule.AbortIncompleteMultipartUpload.DaysAfterInitiation))
if err != nil {
return fmt.Errorf("DaysAfterInitiation to epochs: %w", err)
}
multiparts, err := e.tree.GetMultipartUploadsByPrefix(ctx, bktInfo, prefix)
if err != nil {
return fmt.Errorf("list multiparts: %w", err)
}
for _, multipart := range multiparts {
if !matchMultipartFn(multipart) {
continue
}
if multipart.Finished {
continue
}
multipartCreationEpoch, err := creationEpoch(ni, multipart.Created, multipart.Meta)
if err != nil {
e.log.Warn("failed to get multipart creation epoch", zap.Error(err))
continue
}
if multipartCreationEpoch+multipartDuration > ni.CurrentEpoch() {
continue
}
if err = e.abortMultipart(ctx, bktInfo, multipart); err != nil {
e.log.Warn("failed to abort multipart", zap.String("key", multipart.Key),
zap.String("upload_id", multipart.UploadID), zap.Error(err))
continue
}
}
return nil
}
func matchMultipartByTagsFunc(filter *data.LifecycleRuleFilter) func(*data.MultipartInfo) bool {
return func(info *data.MultipartInfo) bool {
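// Multipart tags are stored in tree meta under the S3-Tag- prefix; strip it before matching against the rule filter.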
tags := make(map[string]string)
for k, v := range info.Meta {
if strings.HasPrefix(k, tagPrefix) {
tags[k[len(tagPrefix):]] = v
}
}
if filter.Tag != nil {
if tags[filter.Tag.Key] != filter.Tag.Value {
return false
}
}
if filter.And != nil {
for _, tag := range filter.And.Tags {
if tags[tag.Key] != tag.Value {
return false
}
}
}
return true
}
}
func (e *Executor) abortMultipart(ctx context.Context, bktInfo *data.BucketInfo, multipart *data.MultipartInfo) error {
parts, err := e.tree.GetParts(ctx, bktInfo, multipart.ID)
if err != nil {
return fmt.Errorf("get parts: %w", err)
}
var addr oid.Address
addr.SetContainer(bktInfo.CID)
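// Delete every uploaded part object first, then drop the multipart record from the tree so no orphaned part objects are left behind.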
for _, part := range parts {
addr.SetObject(part.OID)
if err = e.frostfs.DeleteObject(ctx, addr); err != nil {
return fmt.Errorf("delete object '%s': %w", addr.EncodeToString(), err)
}
}
if err = e.tree.DeleteMultipartUpload(ctx, bktInfo, multipart); err != nil {
return fmt.Errorf("delete multipart '%d': %w", multipart.ID, err)
}
return nil
}
func creationEpoch(ni *netmap.NetworkInfo, created time.Time, meta map[string]string) (creationEpoch uint64, err error) {
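// Prefer the explicit CreationEpoch meta value when present; otherwise derive the epoch from the creation timestamp.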
createdEpochStr := meta[creationEpochKV]
if createdEpochStr != "" {
if creationEpoch, err = strconv.ParseUint(createdEpochStr, 10, 64); err != nil {
return 0, fmt.Errorf("invalid creation epoch '%s': %w", createdEpochStr, err)
}
} else {
if creationEpoch, err = timeToEpoch(ni, created); err != nil {
return 0, fmt.Errorf("time to epoch: %w", err)
}
}
return creationEpoch, nil
}
func timeToEpoch(ni *netmap.NetworkInfo, timeToConvert time.Time) (uint64, error) {
dur := time.Until(timeToConvert)
epochLifetime, err := durationToEpochsAbs(ni, dur)
if err != nil {
return 0, err
}
curr := ni.CurrentEpoch()
var epoch uint64
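// A future timestamp adds epochs to the current one (clamping at the maximum); a past timestamp subtracts them (clamping at zero).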
if dur > 0 {
if epochLifetime >= math.MaxUint64-curr {
epoch = math.MaxUint64
} else {
epoch = curr + epochLifetime
}
} else {
if epochLifetime >= curr {
epoch = 0
} else {
epoch = curr - epochLifetime
}
}
return epoch, nil
}
func durationToEpochsAbs(ni *netmap.NetworkInfo, duration time.Duration) (uint64, error) {
duration = duration.Abs()
durEpoch := ni.EpochDuration()
if durEpoch == 0 {
return 0, errors.New("epoch duration is missing or zero")
}
msPerEpoch := durEpoch * uint64(ni.MsPerBlock())
epochLifetime := uint64(duration.Milliseconds()) / msPerEpoch
if uint64(duration.Milliseconds())%msPerEpoch != 0 {
epochLifetime++
}
return epochLifetime, nil
}
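For reference, a standalone sketch of the same days-to-epochs arithmetic that durationToEpochsAbs performs, using hypothetical network parameters (240 blocks per epoch, 1000 ms per block; the real values come from netmap.NetworkInfo):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical network parameters; in the executor they come from
	// ni.EpochDuration() and ni.MsPerBlock().
	const epochDurationBlocks uint64 = 240
	const msPerBlock uint64 = 1000

	days := 7 * 24 * time.Hour // AbortIncompleteMultipartUpload.DaysAfterInitiation = 7

	msPerEpoch := epochDurationBlocks * msPerBlock
	epochs := uint64(days.Milliseconds()) / msPerEpoch
	if uint64(days.Milliseconds())%msPerEpoch != 0 {
		epochs++ // round up, as durationToEpochsAbs does
	}

	fmt.Println(epochs) // 2520
}
```

With these assumed parameters one epoch lasts about 4 minutes, so a 7-day DaysAfterInitiation maps to 2520 epochs.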

View file

@ -5,7 +5,9 @@ import (
"crypto/ecdsa"
"encoding/binary"
"encoding/hex"
"encoding/xml"
"fmt"
"io"
"slices"
"sort"
"sync"
@ -26,46 +28,27 @@ import (
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
"golang.org/x/text/encoding/ianaindex"
)
type UserFetcher interface {
Users() ([]util.Uint160, error)
UserKey(hash util.Uint160) (*keys.PublicKey, error)
}
type ContainerFetcher interface {
Containers(owner user.ID) ([]cid.ID, error)
}
type TreeFetcher interface {
GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error)
}
type ConfigurationFetcher interface {
LifecycleConfiguration(ctx context.Context, addr oid.Address) (*data.LifecycleConfiguration, error)
}
type CredentialSource interface {
Credentials(ctx context.Context, pk *keys.PublicKey) (*keys.PrivateKey, error)
}
type Job struct {
ContainerID cid.ID
PrivateKey *keys.PrivateKey
LifecycleConfiguration *data.LifecycleConfiguration
Epoch uint64
Bearer *bearer.Token
}
type JobProvider struct {
userFetcher UserFetcher
containerFetcher ContainerFetcher
treeFetcher TreeFetcher
configurationFetcher ConfigurationFetcher
credentialSource CredentialSource
settings Settings
currentLifecycler *keys.PrivateKey
log *zap.Logger
cancelCurrentFetch context.CancelFunc
userFetcher UserFetcher
containerFetcher ContainerFetcher
treeFetcher TreeFetcher
frostfsFetcher FrostFSFetcher
credentialSource CredentialSource
settings Settings
currentLifecycler *keys.PrivateKey
log *zap.Logger
cancelCurrentFetch context.CancelFunc
jobChan chan Job
epochChan <-chan uint64
@ -76,31 +59,31 @@ type Settings interface {
}
type Config struct {
UserFetcher UserFetcher
ContainerFetcher ContainerFetcher
ConfigurationFetcher ConfigurationFetcher
CredentialSource CredentialSource
TreeFetcher TreeFetcher
Settings Settings
CurrentLifecycler *keys.PrivateKey
Logger *zap.Logger
BufferSize int
EpochChannel <-chan uint64
UserFetcher UserFetcher
ContainerFetcher ContainerFetcher
FrostFSFetcher FrostFSFetcher
CredentialSource CredentialSource
TreeFetcher TreeFetcher
Settings Settings
CurrentLifecycler *keys.PrivateKey
Logger *zap.Logger
BufferSize int
EpochChannel <-chan uint64
}
func NewJobProvider(ctx context.Context, cfg Config) *JobProvider {
provider := &JobProvider{
userFetcher: cfg.UserFetcher,
settings: cfg.Settings,
log: cfg.Logger,
containerFetcher: cfg.ContainerFetcher,
treeFetcher: cfg.TreeFetcher,
configurationFetcher: cfg.ConfigurationFetcher,
credentialSource: cfg.CredentialSource,
currentLifecycler: cfg.CurrentLifecycler,
epochChan: cfg.EpochChannel,
jobChan: make(chan Job, cfg.BufferSize),
cancelCurrentFetch: func() {},
userFetcher: cfg.UserFetcher,
settings: cfg.Settings,
log: cfg.Logger,
containerFetcher: cfg.ContainerFetcher,
treeFetcher: cfg.TreeFetcher,
frostfsFetcher: cfg.FrostFSFetcher,
credentialSource: cfg.CredentialSource,
currentLifecycler: cfg.CurrentLifecycler,
epochChan: cfg.EpochChannel,
jobChan: make(chan Job, cfg.BufferSize),
cancelCurrentFetch: func() {},
}
go provider.startFetchRoutine(ctx)
@ -279,7 +262,7 @@ func (p *JobProvider) handleContainer(ctx context.Context, uc *UserContainer, ep
addr.SetContainer(uc.Container)
addr.SetObject(objID)
configuration, err := p.configurationFetcher.LifecycleConfiguration(ctx, addr)
configuration, err := p.fetchLifecycleConfiguration(ctx, addr, *btoken)
if err != nil {
return fmt.Errorf("get lifecycle configuration from storage: %w", err)
}
@ -289,6 +272,7 @@ func (p *JobProvider) handleContainer(ctx context.Context, uc *UserContainer, ep
PrivateKey: uc.Key,
LifecycleConfiguration: configuration,
Epoch: epoch,
Bearer: btoken,
}
select {
@ -339,6 +323,26 @@ func (p *JobProvider) svcKeys() (keys.PublicKeys, uint64) {
return lifecyclerKeys, uint64(position)
}
func (p *JobProvider) fetchLifecycleConfiguration(ctx context.Context, addr oid.Address, btoken bearer.Token) (*data.LifecycleConfiguration, error) {
res, err := p.frostfsFetcher.GetObject(ctx, addr)
if err != nil {
return nil, err
}
defer func() {
if closeErr := res.Payload.Close(); closeErr != nil {
p.log.Warn("could not close object payload", zap.String("address", addr.EncodeToString()), zap.Error(closeErr))
}
}()
lifecycleCfg := &data.LifecycleConfiguration{}
dec := newDecoder(res.Payload)
if err = dec.Decode(lifecycleCfg); err != nil {
return nil, fmt.Errorf("unmarshal lifecycle configuration '%s': %w", addr.EncodeToString(), err)
}
return lifecycleCfg, nil
}
func formAllowedAPEChain(userKey *keys.PublicKey) *chain.Chain {
return &chain.Chain{
ID: chain.ID("lifecycler"),
@ -390,3 +394,19 @@ func addBearerToContext(ctx context.Context, btoken *bearer.Token) context.Conte
},
})
}
const awsDefaultNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
func newDecoder(r io.Reader) *xml.Decoder {
dec := xml.NewDecoder(r)
dec.DefaultSpace = awsDefaultNamespace
dec.CharsetReader = func(charset string, reader io.Reader) (io.Reader, error) {
enc, err := ianaindex.IANA.Encoding(charset)
if err != nil {
return nil, fmt.Errorf("charset %s: %w", charset, err)
}
return enc.NewDecoder().Reader(reader), nil
}
return dec
}

View file

@ -1,14 +1,19 @@
package lifecycle
import (
"bytes"
"context"
"encoding/xml"
"errors"
"io"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
@ -79,28 +84,51 @@ func (c *containerFetcherMock) Containers(owner user.ID) ([]cid.ID, error) {
return containers, nil
}
var _ ConfigurationFetcher = (*configurationFetcherMock)(nil)
var _ FrostFSFetcher = (*frostfsFetcherMock)(nil)
type configurationFetcherMock struct {
type frostfsFetcherMock struct {
configurations map[oid.Address]*data.LifecycleConfiguration
}
func newConfigurationFetcherMock(configs map[oid.Address]*data.LifecycleConfiguration) *configurationFetcherMock {
func newFrostFSFetcherMock(configs map[oid.Address]*data.LifecycleConfiguration) *frostfsFetcherMock {
if configs == nil {
configs = map[oid.Address]*data.LifecycleConfiguration{}
}
return &configurationFetcherMock{
return &frostfsFetcherMock{
configurations: configs,
}
}
func (c *configurationFetcherMock) LifecycleConfiguration(_ context.Context, addr oid.Address) (*data.LifecycleConfiguration, error) {
func (c *frostfsFetcherMock) GetObject(ctx context.Context, addr oid.Address) (pool.ResGetObject, error) {
val, ok := c.configurations[addr]
if !ok {
return nil, errors.New("configurationFetcherMock: hash not found")
return pool.ResGetObject{}, errors.New("frostfsFetcherMock: address not found")
}
return val, nil
raw, err := xml.Marshal(val)
if err != nil {
return pool.ResGetObject{}, err
}
return pool.ResGetObject{
Payload: &payloadReader{bytes.NewReader(raw)},
}, nil
}
type payloadReader struct {
io.Reader
}
func (p *payloadReader) Close() error { return nil }
func (c *frostfsFetcherMock) NetworkInfo(ctx context.Context) (*netmap.NetworkInfo, error) {
//TODO implement me
panic("implement me")
}
func (c *frostfsFetcherMock) DeleteObject(ctx context.Context, addr oid.Address) error {
//TODO implement me
panic("implement me")
}
var _ CredentialSource = (*credentialSourceMock)(nil)
@ -151,6 +179,21 @@ func (t *treeFetcherMock) GetBucketLifecycleConfiguration(_ context.Context, bkt
return val, nil
}
func (t *treeFetcherMock) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
//TODO implement me
panic("implement me")
}
func (t *treeFetcherMock) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error {
//TODO implement me
panic("implement me")
}
func (t *treeFetcherMock) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
//TODO implement me
panic("implement me")
}
var _ Settings = (*settingsMock)(nil)
type settingsMock struct{}
@ -176,15 +219,15 @@ func TestFetcherBase(t *testing.T) {
}()
cfg := Config{
UserFetcher: mocks.userFetcher,
ContainerFetcher: mocks.containerFetcher,
ConfigurationFetcher: mocks.configurationFetcher,
CredentialSource: mocks.credentialSource,
TreeFetcher: mocks.treeFetcher,
Settings: &settingsMock{},
CurrentLifecycler: key,
Logger: log,
EpochChannel: epochCh,
UserFetcher: mocks.userFetcher,
ContainerFetcher: mocks.containerFetcher,
FrostFSFetcher: mocks.configurationFetcher,
CredentialSource: mocks.credentialSource,
TreeFetcher: mocks.treeFetcher,
Settings: &settingsMock{},
CurrentLifecycler: key,
Logger: log,
EpochChannel: epochCh,
}
f := NewJobProvider(ctx, cfg)
@ -216,15 +259,15 @@ func TestFetcherCancel(t *testing.T) {
}()
cfg := Config{
UserFetcher: mocks.userFetcher,
ContainerFetcher: mocks.containerFetcher,
ConfigurationFetcher: mocks.configurationFetcher,
CredentialSource: mocks.credentialSource,
TreeFetcher: mocks.treeFetcher,
Settings: &settingsMock{},
CurrentLifecycler: key,
Logger: log,
EpochChannel: epochCh,
UserFetcher: mocks.userFetcher,
ContainerFetcher: mocks.containerFetcher,
FrostFSFetcher: mocks.configurationFetcher,
CredentialSource: mocks.credentialSource,
TreeFetcher: mocks.treeFetcher,
Settings: &settingsMock{},
CurrentLifecycler: key,
Logger: log,
EpochChannel: epochCh,
}
f := NewJobProvider(ctx, cfg)
@ -240,7 +283,7 @@ func TestFetcherCancel(t *testing.T) {
type fetchersMock struct {
userFetcher *userFetcherMock
containerFetcher *containerFetcherMock
configurationFetcher *configurationFetcherMock
configurationFetcher *frostfsFetcherMock
credentialSource *credentialSourceMock
treeFetcher *treeFetcherMock
}
@ -266,7 +309,7 @@ func initMocks(users, containers int) (*fetchersMock, error) {
return &fetchersMock{
userFetcher: newUserFetcherMock(usersMap),
containerFetcher: newContainerFetcherMock(cnrsMap),
configurationFetcher: newConfigurationFetcherMock(configMap),
configurationFetcher: newFrostFSFetcherMock(configMap),
credentialSource: newCredentialSourceMock(usersMap),
treeFetcher: newTreeFetcherMock(treeMap),
}, nil

View file

@ -47,4 +47,6 @@ const (
FetchedUserContainers = "fetched user container configurations"
HandlerTriggered = "handler: triggered"
HandlerContextCanceled = "handler: context canceled"
FailedToSubmitTaskToPool = "failed to submit task to executor pool"
WorkerFailedToHandleJob = "worker failed to handle job"
)