frostfs-s3-lifecycler/internal/lifecycle/fetcher_test.go
Denis Kirillov fce631be59
All checks were successful
/ DCO (pull_request) Successful in 1m26s
/ Builds (1.22) (pull_request) Successful in 1m26s
/ Builds (1.23) (pull_request) Successful in 1m31s
/ Vulncheck (pull_request) Successful in 1m47s
/ Lint (pull_request) Successful in 2m31s
/ Tests (1.22) (pull_request) Successful in 1m49s
/ Tests (1.23) (pull_request) Successful in 1m46s
[#19] fetcher: Don't user btoken for separate lifecycle container
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-09-03 15:32:42 +03:00

517 lines
12 KiB
Go

package lifecycle
import (
"bytes"
"context"
"encoding/xml"
"errors"
"io"
"sync"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
var _ UserFetcher = (*userFetcherMock)(nil)
type userFetcherMock struct {
users map[util.Uint160]*keys.PrivateKey
}
func newUserFetcherMock(users map[util.Uint160]*keys.PrivateKey) *userFetcherMock {
if users == nil {
users = map[util.Uint160]*keys.PrivateKey{}
}
return &userFetcherMock{
users: users,
}
}
func (u *userFetcherMock) Users() ([]util.Uint160, error) {
res := make([]util.Uint160, 0, len(u.users))
for hash := range u.users {
res = append(res, hash)
}
return res, nil
}
func (u *userFetcherMock) UserKey(hash util.Uint160) (*keys.PublicKey, error) {
key, ok := u.users[hash]
if !ok {
return nil, errors.New("userFetcherMock: hash not found")
}
return key.PublicKey(), nil
}
var _ ContainerFetcher = (*containerFetcherMock)(nil)
type containerFetcherMock struct {
containers map[util.Uint160][]cid.ID
}
func newContainerFetcherMock(containers map[util.Uint160][]cid.ID) *containerFetcherMock {
if containers == nil {
containers = map[util.Uint160][]cid.ID{}
}
return &containerFetcherMock{
containers: containers,
}
}
func (c *containerFetcherMock) Containers(owner user.ID) ([]cid.ID, error) {
hash, err := owner.ScriptHash()
if err != nil {
return nil, err
}
containers, ok := c.containers[hash]
if !ok {
return nil, errors.New("containerFetcherMock: hash not found")
}
return containers, nil
}
var _ FrostFSFetcher = (*frostfsFetcherMock)(nil)
type frostfsFetcherMock struct {
mu sync.RWMutex
objects map[oid.Address]*object.Object
epoch uint64
epochDuration uint64
msPerBlock int64
lifecycleContainer cid.ID
enableBearerCheck bool
}
func newFrostFSFetcherMock() *frostfsFetcherMock {
return &frostfsFetcherMock{
objects: map[oid.Address]*object.Object{},
epoch: 1,
epochDuration: 3600,
msPerBlock: 1000,
}
}
func (c *frostfsFetcherMock) setObject(addr oid.Address, obj *object.Object) {
c.mu.Lock()
defer c.mu.Unlock()
c.objects[addr] = obj
}
func (c *frostfsFetcherMock) setEpoch(epoch uint64) {
c.mu.Lock()
defer c.mu.Unlock()
c.epoch = epoch
}
func (c *frostfsFetcherMock) setEpochDuration(blocks uint64) {
c.mu.Lock()
defer c.mu.Unlock()
c.epochDuration = blocks
}
func (c *frostfsFetcherMock) setMsPerBlock(msPerBlock int64) {
c.mu.Lock()
defer c.mu.Unlock()
c.msPerBlock = msPerBlock
}
func (c *frostfsFetcherMock) GetObject(ctx context.Context, addr oid.Address) (pool.ResGetObject, error) {
c.mu.RLock()
defer c.mu.RUnlock()
if c.enableBearerCheck {
_, err := middleware.GetBoxData(ctx)
if c.lifecycleContainer.Equals(addr.Container()) {
if err == nil {
return pool.ResGetObject{}, errors.New("box data present in context for lifecycle container")
}
} else if err != nil {
return pool.ResGetObject{}, err
}
}
val, ok := c.objects[addr]
if !ok {
return pool.ResGetObject{}, &apistatus.ObjectNotFound{}
}
return pool.ResGetObject{
Header: *val,
Payload: &payloadReader{bytes.NewReader(val.Payload())},
}, nil
}
type payloadReader struct {
io.Reader
}
func (p *payloadReader) Close() error { return nil }
func (c *frostfsFetcherMock) NetworkInfo(context.Context) (*netmap.NetworkInfo, error) {
c.mu.RLock()
defer c.mu.RUnlock()
var ni netmap.NetworkInfo
ni.SetCurrentEpoch(c.epoch)
ni.SetEpochDuration(c.epochDuration)
ni.SetMsPerBlock(c.msPerBlock)
return &ni, nil
}
func (c *frostfsFetcherMock) DeleteObject(_ context.Context, addr oid.Address) error {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.objects[addr]; !ok {
return &apistatus.ObjectNotFound{}
}
delete(c.objects, addr)
return nil
}
var _ CredentialSource = (*credentialSourceMock)(nil)
type credentialSourceMock struct {
users map[util.Uint160]*keys.PrivateKey
}
func newCredentialSourceMock(users map[util.Uint160]*keys.PrivateKey) *credentialSourceMock {
if users == nil {
users = map[util.Uint160]*keys.PrivateKey{}
}
return &credentialSourceMock{
users: users,
}
}
func (c *credentialSourceMock) Credentials(_ context.Context, pk *keys.PublicKey) (*keys.PrivateKey, error) {
key, ok := c.users[pk.GetScriptHash()]
if !ok {
return nil, errors.New("credentialSourceMock: hash not found")
}
return key, nil
}
var _ TreeFetcher = (*treeFetcherMock)(nil)
type treeFetcherMock struct {
configurations map[cid.ID]oid.Address
}
func newTreeFetcherMock(configs map[cid.ID]oid.Address) *treeFetcherMock {
if configs == nil {
configs = map[cid.ID]oid.Address{}
}
return &treeFetcherMock{
configurations: configs,
}
}
func (t *treeFetcherMock) GetBucketLifecycleConfiguration(_ context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
val, ok := t.configurations[bktInfo.CID]
if !ok {
return oid.Address{}, errors.New("treeFetcherMock: hash not found")
}
return val, nil
}
var _ Settings = (*settingsMock)(nil)
type settingsMock struct{}
func (s *settingsMock) ServicesKeys() keys.PublicKeys {
return nil
}
func TestFetcherBase(t *testing.T) {
ctx := context.Background()
log := zaptest.NewLogger(t)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
mocks, err := initFetcherMocks(2, 1)
require.NoError(t, err)
epochCh := make(chan uint64)
go func() {
epochCh <- 1
close(epochCh)
}()
cfg := Config{
UserFetcher: mocks.userFetcher,
ContainerFetcher: mocks.containerFetcher,
FrostFSFetcher: mocks.configurationFetcher,
CredentialSource: mocks.credentialSource,
TreeFetcher: mocks.treeFetcher,
Settings: &settingsMock{},
CurrentLifecycler: key,
Logger: log,
EpochChannel: epochCh,
}
f := NewJobProvider(ctx, cfg)
var res []Job
for job := range f.Jobs() {
res = append(res, job)
}
require.Len(t, res, 2)
}
func TestFetcherBearer(t *testing.T) {
ctx := context.Background()
log := zaptest.NewLogger(t)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
mocks, err := initFetcherMocks(1, 2)
require.NoError(t, err)
epochCh := make(chan uint64)
go func() {
epochCh <- 1
close(epochCh)
}()
users, err := mocks.userFetcher.Users()
require.NoError(t, err)
require.Len(t, users, 1)
cids := mocks.containerFetcher.containers[users[0]]
require.Len(t, cids, 2)
// emulate lifecycle container for one bucket
addr := mocks.treeFetcher.configurations[cids[0]]
obj := mocks.configurationFetcher.objects[addr]
addr.SetContainer(cidtest.ID())
mocks.treeFetcher.configurations[cids[0]] = addr
obj.SetContainerID(addr.Container())
mocks.configurationFetcher.objects[addr] = obj
mocks.configurationFetcher.lifecycleContainer = addr.Container()
mocks.configurationFetcher.enableBearerCheck = true
cfg := Config{
UserFetcher: mocks.userFetcher,
ContainerFetcher: mocks.containerFetcher,
FrostFSFetcher: mocks.configurationFetcher,
CredentialSource: mocks.credentialSource,
TreeFetcher: mocks.treeFetcher,
Settings: &settingsMock{},
CurrentLifecycler: key,
Logger: log,
EpochChannel: epochCh,
}
f := NewJobProvider(ctx, cfg)
var res []Job
for job := range f.Jobs() {
res = append(res, job)
}
require.Len(t, res, 2)
}
func TestFetcherCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log := zaptest.NewLogger(t)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
mocks, err := initFetcherMocks(1, 1)
require.NoError(t, err)
epochCh := make(chan uint64)
go func() {
epochCh <- 1
epochCh <- 2
close(epochCh)
}()
cfg := Config{
UserFetcher: mocks.userFetcher,
ContainerFetcher: mocks.containerFetcher,
FrostFSFetcher: mocks.configurationFetcher,
CredentialSource: mocks.credentialSource,
TreeFetcher: mocks.treeFetcher,
Settings: &settingsMock{},
CurrentLifecycler: key,
Logger: log,
EpochChannel: epochCh,
}
f := NewJobProvider(ctx, cfg)
var res []Job
for job := range f.Jobs() {
res = append(res, job)
}
require.Len(t, res, 1)
}
func TestFetcherCleanBuffer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log := zaptest.NewLogger(t)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
mocks, err := initFetcherMocks(1, 10)
require.NoError(t, err)
epochCh := make(chan uint64)
f := newJobProvider(ctx, mocks, epochCh, key, log, 11)
epochCh <- 1
for len(f.Jobs()) != 10 { // wait jobs buffer be filled by first epoch works
time.Sleep(100 * time.Millisecond)
}
mocks, err = initFetcherMocks(1, 11)
require.NoError(t, err)
updateFetcherMocks(f, mocks)
epochCh <- 2
close(epochCh)
for len(f.Jobs()) != 11 { // wait jobs buffer be filled by second epoch works
time.Sleep(100 * time.Millisecond)
}
for job := range f.Jobs() {
require.Equal(t, uint64(2), job.Epoch, "not all old epoch job is cleaned from buffer")
}
}
func newJobProvider(ctx context.Context, mocks *fetchersMock, epochCh <-chan uint64, key *keys.PrivateKey, log *zap.Logger, bufferSize int) *JobProvider {
cfg := Config{
UserFetcher: mocks.userFetcher,
ContainerFetcher: mocks.containerFetcher,
FrostFSFetcher: mocks.configurationFetcher,
CredentialSource: mocks.credentialSource,
TreeFetcher: mocks.treeFetcher,
Settings: &settingsMock{},
CurrentLifecycler: key,
Logger: log,
EpochChannel: epochCh,
BufferSize: bufferSize,
}
return NewJobProvider(ctx, cfg)
}
func updateFetcherMocks(f *JobProvider, mocks *fetchersMock) {
f.userFetcher = mocks.userFetcher
f.containerFetcher = mocks.containerFetcher
f.frostfsFetcher = mocks.configurationFetcher
f.credentialSource = mocks.credentialSource
f.treeFetcher = mocks.treeFetcher
}
type fetchersMock struct {
userFetcher *userFetcherMock
containerFetcher *containerFetcherMock
configurationFetcher *frostfsFetcherMock
credentialSource *credentialSourceMock
treeFetcher *treeFetcherMock
}
func initFetcherMocks(users, containers int) (*fetchersMock, error) {
usersMap, err := generateUsersMap(users)
if err != nil {
return nil, err
}
ffsFetcher := newFrostFSFetcherMock()
cnrsMap := make(map[util.Uint160][]cid.ID)
treeMap := make(map[cid.ID]oid.Address)
for hash := range usersMap {
for i := 0; i < containers; i++ {
addr := oidtest.Address()
cnrsMap[hash] = append(cnrsMap[hash], addr.Container())
treeMap[addr.Container()] = addr
lc := &data.LifecycleConfiguration{Rules: []data.LifecycleRule{{ID: addr.EncodeToString()}}}
raw, err := xml.Marshal(lc)
if err != nil {
return nil, err
}
obj := object.New()
obj.SetPayload(raw)
obj.SetContainerID(addr.Container())
obj.SetID(addr.Object())
ffsFetcher.objects[addr] = obj
}
}
return &fetchersMock{
userFetcher: newUserFetcherMock(usersMap),
containerFetcher: newContainerFetcherMock(cnrsMap),
configurationFetcher: ffsFetcher,
credentialSource: newCredentialSourceMock(usersMap),
treeFetcher: newTreeFetcherMock(treeMap),
}, nil
}
func generateKeys(n int) ([]*keys.PrivateKey, error) {
var err error
res := make([]*keys.PrivateKey, n)
for i := 0; i < n; i++ {
if res[i], err = keys.NewPrivateKey(); err != nil {
return nil, err
}
}
return res, nil
}
func generateUsersMap(n int) (map[util.Uint160]*keys.PrivateKey, error) {
res := make(map[util.Uint160]*keys.PrivateKey, n)
userKeys, err := generateKeys(n)
if err != nil {
return nil, err
}
for _, key := range userKeys {
res[key.GetScriptHash()] = key
}
return res, nil
}