[#434] Remove container on failed bucket creation

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-09-24 14:46:02 +03:00 committed by Alexey Vanin
parent 5358e39f71
commit 738ce14f50
4 changed files with 66 additions and 2 deletions

View file

@ -250,6 +250,7 @@ func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig {
type apeMock struct {
chainMap map[engine.Target][]*chain.Chain
policyMap map[string][]byte
err error
}
func newAPEMock() *apeMock {
@ -293,6 +294,10 @@ func (a *apeMock) DeletePolicy(namespace string, cnrID cid.ID) error {
}
func (a *apeMock) PutBucketPolicy(ns string, cnrID cid.ID, policy []byte, chain []*chain.Chain) error {
if a.err != nil {
return a.err
}
if err := a.PutPolicy(ns, cnrID, policy); err != nil {
return err
}
@ -307,6 +312,10 @@ func (a *apeMock) PutBucketPolicy(ns string, cnrID cid.ID, policy []byte, chain
}
func (a *apeMock) DeleteBucketPolicy(ns string, cnrID cid.ID, chainIDs []chain.ID) error {
if a.err != nil {
return a.err
}
if err := a.DeletePolicy(ns, cnrID); err != nil {
return err
}
@ -320,6 +329,10 @@ func (a *apeMock) DeleteBucketPolicy(ns string, cnrID cid.ID, chainIDs []chain.I
}
func (a *apeMock) GetBucketPolicy(ns string, cnrID cid.ID) ([]byte, error) {
if a.err != nil {
return nil, a.err
}
policy, ok := a.policyMap[ns+cnrID.EncodeToString()]
if !ok {
return nil, errors.New("not found")
@ -329,6 +342,10 @@ func (a *apeMock) GetBucketPolicy(ns string, cnrID cid.ID) ([]byte, error) {
}
func (a *apeMock) SaveACLChains(cid string, chains []*chain.Chain) error {
if a.err != nil {
return a.err
}
for i := range chains {
if err := a.AddChain(engine.ContainerTarget(cid), chains[i]); err != nil {
return err

View file

@ -2,6 +2,7 @@ package handler
import (
"bytes"
"context"
"crypto/md5"
"encoding/base64"
"encoding/json"
@ -783,7 +784,8 @@ func (h *handler) createBucketHandlerPolicy(w http.ResponseWriter, r *http.Reque
chains := bucketCannedACLToAPERules(cannedACL, reqInfo, bktInfo.CID)
if err = h.ape.SaveACLChains(bktInfo.CID.EncodeToString(), chains); err != nil {
h.logAndSendError(w, "failed to add morph rule chain", reqInfo, err)
cleanErr := h.cleanupBucketCreation(ctx, reqInfo, bktInfo, boxData, chains)
h.logAndSendError(w, "failed to add morph rule chain", reqInfo, err, zap.NamedError("cleanup_error", cleanErr))
return
}
@ -804,8 +806,9 @@ func (h *handler) createBucketHandlerPolicy(w http.ResponseWriter, r *http.Reque
return h.obj.PutBucketSettings(ctx, sp)
}, h.putBucketSettingsRetryer())
if err != nil {
cleanErr := h.cleanupBucketCreation(ctx, reqInfo, bktInfo, boxData, chains)
h.logAndSendError(w, "couldn't save bucket settings", reqInfo, err,
zap.String("container_id", bktInfo.CID.EncodeToString()))
zap.String("container_id", bktInfo.CID.EncodeToString()), zap.NamedError("cleanup_error", cleanErr))
return
}
@ -815,6 +818,28 @@ func (h *handler) createBucketHandlerPolicy(w http.ResponseWriter, r *http.Reque
}
}
func (h *handler) cleanupBucketCreation(ctx context.Context, reqInfo *middleware.ReqInfo, bktInfo *data.BucketInfo, boxData *accessbox.Box, chains []*chain.Chain) error {
prm := &layer.DeleteBucketParams{
BktInfo: bktInfo,
SessionToken: boxData.Gate.SessionTokenForDelete(),
}
if err := h.obj.DeleteContainer(ctx, prm); err != nil {
return err
}
chainIDs := make([]chain.ID, len(chains))
for i, c := range chains {
chainIDs[i] = c.ID
}
if err := h.ape.DeleteBucketPolicy(reqInfo.Namespace, bktInfo.CID, chainIDs); err != nil {
return fmt.Errorf("delete bucket acl policy: %w", err)
}
return nil
}
func (h *handler) putBucketSettingsRetryer() aws.RetryerV2 {
return retry.NewStandard(func(options *retry.StandardOptions) {
options.MaxAttempts = h.cfg.RetryMaxAttempts()

View file

@ -7,6 +7,7 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"io"
"mime/multipart"
"net/http"
@ -484,6 +485,19 @@ func TestCreateBucket(t *testing.T) {
createBucketAssertS3Error(hc, bktName, box2, s3errors.ErrBucketAlreadyExists)
}
func TestCreateBucketWithoutPermissions(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bkt-name"
hc.h.ape.(*apeMock).err = errors.New("no permissions")
box, _ := createAccessBox(t)
createBucketAssertS3Error(hc, bktName, box, s3errors.ErrInternalError)
_, err := hc.tp.ContainerID(bktName)
require.Errorf(t, err, "container exists after failed creation, but shouldn't")
}
func TestCreateNamespacedBucket(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bkt-name"

View file

@ -854,6 +854,14 @@ func (n *Layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
return nil
}
func (n *Layer) DeleteContainer(ctx context.Context, p *DeleteBucketParams) error {
n.cache.DeleteBucket(p.BktInfo)
if err := n.frostFS.DeleteContainer(ctx, p.BktInfo.CID, p.SessionToken); err != nil {
return fmt.Errorf("delete container: %w", err)
}
return nil
}
func (n *Layer) GetNetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
cachedInfo := n.cache.GetNetworkInfo()
if cachedInfo != nil {