frostfs-s3-lifecycler/internal/lifecycle/fetcher.go
Denis Kirillov ba26d975e0 [#16] fetcher: Remove bearer APE condition
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-07-31 12:01:56 +03:00

415 lines
10 KiB
Go

package lifecycle
import (
"context"
"crypto/ecdsa"
"encoding/binary"
"encoding/xml"
"fmt"
"io"
"slices"
"sort"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/hrw"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
"golang.org/x/text/encoding/ianaindex"
)
// Job is a single lifecycle-processing task: the lifecycle configuration of
// one container together with the credentials required to act on it.
type Job struct {
	ContainerID            cid.ID                       // container whose lifecycle configuration is processed
	PrivateKey             *keys.PrivateKey             // container owner's private key (also used to sign the bearer token)
	LifecycleConfiguration *data.LifecycleConfiguration // decoded S3 lifecycle configuration
	Epoch                  uint64                       // epoch for which this job was formed
	Bearer                 *bearer.Token                // bearer token authorizing the lifecycler in the owner's container
}
// JobProvider listens for new epochs, distributes users across lifecycler
// instances via rendezvous (HRW) hashing, and produces a Job for every
// container this instance is responsible for.
type JobProvider struct {
	userFetcher       UserFetcher
	containerFetcher  ContainerFetcher
	treeFetcher       TreeFetcher
	frostfsFetcher    FrostFSFetcher
	credentialSource  CredentialSource
	settings          Settings
	currentLifecycler *keys.PrivateKey
	log               *zap.Logger

	// cancelCurrentFetch aborts the previous epoch's fetch routine when a
	// new epoch arrives; initialized to a no-op.
	cancelCurrentFetch context.CancelFunc

	jobChan   chan Job      // produced jobs, exposed read-only via Jobs()
	epochChan <-chan uint64 // epoch notifications that trigger fetching
}
// Settings supplies dynamic configuration for the job provider.
type Settings interface {
	// ServicesKeys returns the public keys of the lifecycler instances
	// among which users are distributed.
	ServicesKeys() keys.PublicKeys
}
// Config groups the dependencies needed to construct a JobProvider.
type Config struct {
	UserFetcher       UserFetcher       // source of user hashes and public keys
	ContainerFetcher  ContainerFetcher  // lists containers owned by a user
	FrostFSFetcher    FrostFSFetcher    // reads objects from FrostFS storage
	CredentialSource  CredentialSource  // resolves private keys for user public keys
	TreeFetcher       TreeFetcher       // resolves lifecycle configuration object IDs from the tree service
	Settings          Settings          // dynamic settings (lifecycler key list)
	CurrentLifecycler *keys.PrivateKey  // key of this lifecycler instance
	Logger            *zap.Logger
	BufferSize        int               // capacity of the produced job channel
	EpochChannel      <-chan uint64     // epoch events that trigger job production
}
// NewJobProvider constructs a provider and starts its background routine,
// which reacts to epoch events and fills the job channel. The routine runs
// until ctx is canceled or the epoch channel is closed.
func NewJobProvider(ctx context.Context, cfg Config) *JobProvider {
	p := &JobProvider{
		userFetcher:        cfg.UserFetcher,
		containerFetcher:   cfg.ContainerFetcher,
		treeFetcher:        cfg.TreeFetcher,
		frostfsFetcher:     cfg.FrostFSFetcher,
		credentialSource:   cfg.CredentialSource,
		settings:           cfg.Settings,
		currentLifecycler:  cfg.CurrentLifecycler,
		log:                cfg.Logger,
		epochChan:          cfg.EpochChannel,
		jobChan:            make(chan Job, cfg.BufferSize),
		cancelCurrentFetch: func() {}, // no-op until the first epoch arrives
	}

	go p.startFetchRoutine(ctx)

	return p
}
// objToHRW is the rendezvous-hashing key used to pick which lifecycler
// instance serves a given user in a given epoch.
type objToHRW struct {
	epoch uint64
	hash  util.Uint160
}

// bytes serializes the key as the user hash followed by the varint-encoded
// epoch; the result is fed to hrw.Hash.
func (o objToHRW) bytes() []byte {
	// o.hash[:] is at full capacity, so the append allocates a fresh slice.
	return binary.AppendUvarint(o.hash[:], o.epoch)
}
// UserContainer binds one container to its owner's identity, resolved
// private key, and the APE chain used to authorize lifecycle operations.
type UserContainer struct {
	ID        user.ID          // owner's user ID
	Key       *keys.PrivateKey // owner's resolved private key
	Container cid.ID           // container ID
	APEChain  ape.Chain        // serialized allow-all chain for the lifecycler (see formAllowedAPEChain)
}
// Jobs returns the read-only channel of produced jobs. The channel is closed
// once the provider stops (its context is canceled or the epoch channel is
// closed) and all in-flight epoch handling has finished.
func (p *JobProvider) Jobs() <-chan Job {
	return p.jobChan
}
// startFetchRoutine is the provider's main loop: for each epoch received from
// epochChan it cancels the previous epoch's handler, waits for it to finish,
// drops stale buffered jobs, and starts handling the new epoch in a separate
// goroutine. It returns (closing jobChan) when ctx is canceled or epochChan
// is closed.
func (p *JobProvider) startFetchRoutine(ctx context.Context) {
	var (
		epochCtx context.Context
		wg       sync.WaitGroup
	)

	// Wait for the last epoch handler before closing jobChan, so the
	// handler never sends on a closed channel.
	defer func() {
		wg.Wait()
		close(p.jobChan)
	}()

	for {
		select {
		case <-ctx.Done():
			p.log.Info(logs.JobProviderStopped, zap.Error(ctx.Err()))
			p.cancelCurrentFetch()
			return
		case epoch, ok := <-p.epochChan:
			if !ok {
				p.log.Info(logs.JobProviderStoppedBecauseOfEpochChan)
				return
			}
			p.log.Info(logs.FetcherTriggerEpoch, zap.Uint64("epoch", epoch))

			// Cancel and fully stop the previous epoch's handler BEFORE
			// draining the channel, so no stale job is left behind.
			p.cancelCurrentFetch()
			wg.Wait()
			p.cleanJobChannel()

			epochCtx, p.cancelCurrentFetch = context.WithCancel(ctx)
			wg.Add(1)
			go p.handleEpoch(epochCtx, epoch, &wg)
		}
	}
}
// cleanJobChannel drains every buffered job without blocking. It is called
// after the previous epoch's handler has fully stopped, so the drained jobs
// are guaranteed to be stale.
func (p *JobProvider) cleanJobChannel() {
	for {
		select {
		case <-p.jobChan:
		default:
			return
		}
	}
}
// handleEpoch processes one epoch: it fetches all known users, keeps only
// those assigned to this lifecycler by rendezvous hashing, and handles each
// of them. Per-user failures are logged and do not stop the loop.
func (p *JobProvider) handleEpoch(ctx context.Context, epoch uint64, wg *sync.WaitGroup) {
	defer wg.Done()

	hashes, err := p.userFetcher.Users()
	if err != nil {
		p.log.Error(logs.FailedToFetchUsers, zap.Error(err))
		return
	}

	lifecyclers, selfPos := p.svcKeys()
	positions := make([]uint64, len(lifecyclers))
	for i := range positions {
		positions[i] = uint64(i)
	}

	for _, hash := range hashes {
		key := objToHRW{epoch: epoch, hash: hash}
		// The user belongs to the instance ranked first by HRW.
		if hrw.Sort(positions, hrw.Hash(key.bytes()))[0] != selfPos {
			continue
		}

		if ctx.Err() != nil {
			return
		}

		if err := p.handleUser(ctx, hash, epoch); err != nil {
			p.log.Warn(logs.FailedToHandleUser,
				zap.String("address", address.Uint160ToString(hash)),
				zap.Error(err))
		}
	}
}
// handleUser resolves the user's key from its script hash, lists all of the
// user's containers, and forms a lifecycle job for each. Per-container
// failures are logged and counted; only key/listing failures are returned.
func (p *JobProvider) handleUser(ctx context.Context, userHash util.Uint160, epoch uint64) error {
	userKey, err := p.resolveUserKey(ctx, userHash)
	if err != nil {
		return fmt.Errorf("resolve key: %w", err)
	}

	var owner user.ID
	user.IDFromKey(&owner, (ecdsa.PublicKey)(*userKey.PublicKey()))

	containers, err := p.containerFetcher.Containers(owner)
	if err != nil {
		return fmt.Errorf("list user containers: %w", err)
	}

	p.log.Info(logs.FoundUserContainers,
		zap.String("user", owner.EncodeToString()),
		zap.Int("containers", len(containers)))

	// One serialized allow-all chain is shared by all containers of the user.
	chainRaw := p.formAllowedAPEChain().Bytes()

	handled := 0
	for _, cnrID := range containers {
		uc := &UserContainer{
			ID:        owner,
			Key:       userKey,
			Container: cnrID,
			APEChain:  ape.Chain{Raw: chainRaw},
		}

		if ctx.Err() != nil {
			return ctx.Err()
		}

		if err := p.handleContainer(ctx, uc, epoch); err != nil {
			p.log.Warn(logs.FailedToHandleContainer,
				zap.String("user", owner.EncodeToString()),
				zap.String("cid", cnrID.EncodeToString()),
				zap.Error(err))
			continue
		}
		handled++
	}

	p.log.Info(logs.FetchedUserContainers,
		zap.String("user", owner.EncodeToString()),
		zap.Int("successful", handled),
		zap.Int("all", len(containers)))

	return nil
}
// handleContainer forms a bearer token for one container, fetches the
// container's lifecycle configuration (tree lookup, then object read), and
// sends the resulting job to the job channel.
func (p *JobProvider) handleContainer(ctx context.Context, uc *UserContainer, epoch uint64) error {
	var lifecyclerOwner user.ID
	user.IDFromKey(&lifecyclerOwner, p.currentLifecycler.PrivateKey.PublicKey) // consider pre-compute this

	token, err := formBearerToken(epoch, formAPEOverride(uc), uc.Key, lifecyclerOwner)
	if err != nil {
		return fmt.Errorf("form bearer token: %w", err)
	}

	// Both the tree and the storage requests below are authorized with the token.
	ctx = addBearerToContext(ctx, token)

	bktInfo := &data.BucketInfo{
		CID:   uc.Container,
		Owner: uc.ID,
	}

	objID, err := p.treeFetcher.GetBucketLifecycleConfiguration(ctx, bktInfo)
	if err != nil {
		return fmt.Errorf("get lifecycle configuration from tree: %w", err)
	}

	var addr oid.Address
	addr.SetContainer(uc.Container)
	addr.SetObject(objID)

	configuration, err := p.fetchLifecycleConfiguration(ctx, addr)
	if err != nil {
		return fmt.Errorf("get lifecycle configuration from storage: %w", err)
	}

	job := Job{
		ContainerID:            uc.Container,
		PrivateKey:             uc.Key,
		LifecycleConfiguration: configuration,
		Epoch:                  epoch,
		Bearer:                 token,
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case p.jobChan <- job:
	}

	return nil
}
// resolveUserKey maps a user script hash to the user's private key: the
// public key is looked up by hash, then the matching credentials are fetched
// from the credential source.
func (p *JobProvider) resolveUserKey(ctx context.Context, userHash util.Uint160) (*keys.PrivateKey, error) {
	pubKey, err := p.userFetcher.UserKey(userHash)
	if err != nil {
		return nil, fmt.Errorf("get public key: %w", err)
	}

	privKey, err := p.credentialSource.Credentials(ctx, pubKey)
	if err != nil {
		return nil, fmt.Errorf("get private key: %w", err)
	}

	return privKey, nil
}
// svcKeys returns the sorted list of all lifecycler public keys (including
// this instance's own key, appended if the settings omit it) together with
// this instance's position in that list. The position is used as the HRW
// bucket index, so every instance must compute the same ordering.
func (p *JobProvider) svcKeys() (keys.PublicKeys, uint64) {
	self := p.currentLifecycler.PublicKey()
	isSelf := func(pk *keys.PublicKey) bool { return pk.Equal(self) }

	all := p.settings.ServicesKeys()
	if slices.IndexFunc(all, isSelf) == -1 {
		all = append(all, self)
	}

	sort.Slice(all, func(i, j int) bool {
		return all[i].Cmp(all[j]) == -1
	})

	pos := slices.IndexFunc(all, isSelf)
	if pos == -1 {
		// should never happen: we just ensured the key is present
		panic("current lifecycler key isn't in list")
	}

	return all, uint64(pos)
}
// fetchLifecycleConfiguration reads the object at addr from FrostFS and
// decodes its payload as an S3 lifecycle configuration XML document.
func (p *JobProvider) fetchLifecycleConfiguration(ctx context.Context, addr oid.Address) (*data.LifecycleConfiguration, error) {
	res, err := p.frostfsFetcher.GetObject(ctx, addr)
	if err != nil {
		return nil, err
	}
	defer func() {
		if closeErr := res.Payload.Close(); closeErr != nil {
			p.log.Warn("could not close object payload", zap.String("address", addr.EncodeToString()), zap.Error(closeErr))
		}
	}()

	var cfg data.LifecycleConfiguration
	if err := newDecoder(res.Payload).Decode(&cfg); err != nil {
		return nil, fmt.Errorf("unmarshal lifecycle configuration '%s': %w", addr.EncodeToString(), err)
	}

	return &cfg, nil
}
// formAllowedAPEChain builds an APE chain that allows any action on any
// resource, identified by this lifecycler's address. Its serialized form is
// embedded into bearer tokens as a container-scoped override.
func (p *JobProvider) formAllowedAPEChain() *chain.Chain {
	return &chain.Chain{
		ID: chain.ID("lifecycler/" + p.currentLifecycler.Address()),
		Rules: []chain.Rule{{
			Status:    chain.Allow,
			Actions:   chain.Actions{Names: []string{"*"}},
			Resources: chain.Resources{Names: []string{"*"}},
		}},
	}
}
// formBearerToken issues a bearer token, signed with the container owner's
// key, that lets the lifecycler (asserted as lifecyclerOwner) act in the
// owner's container with the permissions described by apeOverride. The token
// is valid starting at the given epoch and expires at epoch+2.
func formBearerToken(epoch uint64, apeOverride bearer.APEOverride, userKey *keys.PrivateKey, lifecyclerOwner user.ID) (*bearer.Token, error) {
	var token bearer.Token
	token.SetIat(epoch)
	token.SetNbf(epoch)
	token.SetExp(epoch + 2) // TODO: consider making the validity window configurable
	token.SetAPEOverride(apeOverride)
	token.AssertUser(lifecyclerOwner)

	if err := token.Sign(userKey.PrivateKey); err != nil {
		return nil, fmt.Errorf("sign: %w", err)
	}

	return &token, nil
}
// formAPEOverride wraps the user's APE chain into an override targeting the
// user's container, for embedding into a bearer token.
func formAPEOverride(userInfo *UserContainer) bearer.APEOverride {
	return bearer.APEOverride{
		Target: ape.ChainTarget{
			TargetType: ape.TargetTypeContainer,
			Name:       userInfo.Container.EncodeToString(),
		},
		Chains: []ape.Chain{userInfo.APEChain},
	}
}
// addBearerToContext stores the bearer token in the context via the s3-gw
// middleware box, so that downstream calls made with this context can
// present it for authorization.
func addBearerToContext(ctx context.Context, btoken *bearer.Token) context.Context {
	return middleware.SetBox(ctx, &middleware.Box{
		AccessBox: &accessbox.Box{
			Gate: &accessbox.GateData{
				BearerToken: btoken,
			},
		},
	})
}
// awsDefaultNamespace is the default XML namespace of S3 API documents;
// newDecoder assumes it when a document omits an explicit xmlns.
const awsDefaultNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
func newDecoder(r io.Reader) *xml.Decoder {
dec := xml.NewDecoder(r)
dec.DefaultSpace = awsDefaultNamespace
dec.CharsetReader = func(charset string, reader io.Reader) (io.Reader, error) {
enc, err := ianaindex.IANA.Encoding(charset)
if err != nil {
return nil, fmt.Errorf("charset %s: %w", charset, err)
}
return enc.NewDecoder().Reader(reader), nil
}
return dec
}