[#306] Use APE instead of eACL on bucket creation

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
pull/310/head
Denis Kirillov 2024-02-12 11:00:04 +03:00
parent 37be8851b3
commit 1f2cf0ed67
7 changed files with 202 additions and 39 deletions

View File

@ -31,6 +31,7 @@ type (
LocationConstraint string
ObjectLockEnabled bool
HomomorphicHashDisabled bool
APEEnabled bool
}
// ObjectInfo holds S3 object data.
@ -62,6 +63,7 @@ type (
BucketSettings struct {
Versioning string `json:"versioning"`
LockConfiguration *ObjectLockConfiguration `json:"lock_configuration"`
APE bool `json:"ape"`
}
// CORSConfiguration stores CORS configuration of a request.

View File

@ -4,6 +4,7 @@ import (
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"encoding/json"
"encoding/xml"
"fmt"
@ -24,8 +25,14 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
"git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
"git.frostfs.info/TrueCloudLab/policy-engine/schema/s3"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
)
@ -744,6 +751,20 @@ func parseMetadata(r *http.Request) map[string]string {
return res
}
func parseCannedACL(header http.Header) (string, error) {
acl := header.Get(api.AmzACL)
if len(acl) == 0 {
return basicACLPrivate, nil
}
if acl == basicACLPrivate || acl == basicACLPublic ||
acl == cannedACLAuthRead || acl == basicACLReadOnly {
return acl, nil
}
return "", fmt.Errorf("unknown acl: %s", acl)
}
func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
reqInfo := middleware.GetReqInfo(ctx)
@ -763,16 +784,9 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
return
}
bktACL, err := parseACLHeaders(r.Header, key)
cannedACL, err := parseCannedACL(r.Header)
if err != nil {
h.logAndSendError(w, "could not parse bucket acl", reqInfo, err)
return
}
resInfo := &resourceInfo{Bucket: reqInfo.BucketName}
p.EACL, err = bucketACLToTable(bktACL, resInfo)
if err != nil {
h.logAndSendError(w, "could translate bucket acl to eacl", reqInfo, err)
h.logAndSendError(w, "could not parse canned ACL", reqInfo, err)
return
}
@ -787,7 +801,6 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
if err == nil {
policies = boxData.Policies
p.SessionContainerCreation = boxData.Gate.SessionTokenForPut()
p.SessionEACL = boxData.Gate.SessionTokenForSetEACL()
}
if p.SessionContainerCreation == nil {
@ -795,12 +808,7 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
return
}
if p.SessionEACL == nil {
h.logAndSendError(w, "couldn't find session token for setEACL", reqInfo, errors.GetAPIError(errors.ErrAccessDenied))
return
}
if err = h.setPolicy(p, reqInfo.Namespace, createParams.LocationConstraint, policies); err != nil {
if err = h.setPlacementPolicy(p, reqInfo.Namespace, createParams.LocationConstraint, policies); err != nil {
h.logAndSendError(w, "couldn't set placement policy", reqInfo, err)
return
}
@ -812,25 +820,165 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
h.logAndSendError(w, "could not create bucket", reqInfo, err)
return
}
h.reqLogger(ctx).Info(logs.BucketIsCreated, zap.Stringer("container_id", bktInfo.CID))
if p.ObjectLockEnabled {
sp := &layer.PutSettingsParams{
BktInfo: bktInfo,
Settings: &data.BucketSettings{Versioning: data.VersioningEnabled},
}
if err = h.obj.PutBucketSettings(ctx, sp); err != nil {
h.logAndSendError(w, "couldn't enable bucket versioning", reqInfo, err,
zap.String("container_id", bktInfo.CID.EncodeToString()))
chainRules := bucketCannedACLToAPERules(cannedACL, reqInfo, key, bktInfo.CID)
target := engine.NamespaceTarget(reqInfo.Namespace)
for _, chainPolicy := range chainRules {
if err = h.ape.AddChain(target, chainPolicy); err != nil {
h.logAndSendError(w, "failed to add morph rule chain", reqInfo, err, zap.String("chain_id", string(chainPolicy.ID)))
return
}
}
sp := &layer.PutSettingsParams{
BktInfo: bktInfo,
Settings: &data.BucketSettings{APE: true},
}
if p.ObjectLockEnabled {
sp.Settings.Versioning = data.VersioningEnabled
}
if err = h.obj.PutBucketSettings(ctx, sp); err != nil {
h.logAndSendError(w, "couldn't save bucket settings", reqInfo, err,
zap.String("container_id", bktInfo.CID.EncodeToString()))
return
}
middleware.WriteSuccessResponseHeadersOnly(w)
}
func (h handler) setPolicy(prm *layer.CreateBucketParams, namespace, locationConstraint string, userPolicies []*accessbox.ContainerPolicy) error {
const s3ActionPrefix = "s3:"
var (
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html
writeACLBucketS3Actions = []string{
s3ActionPrefix + middleware.PutObjectOperation,
s3ActionPrefix + middleware.PostObjectOperation,
s3ActionPrefix + middleware.CopyObjectOperation,
s3ActionPrefix + middleware.UploadPartOperation,
s3ActionPrefix + middleware.UploadPartCopyOperation,
s3ActionPrefix + middleware.CreateMultipartUploadOperation,
s3ActionPrefix + middleware.CompleteMultipartUploadOperation,
}
readACLBucketS3Actions = []string{
s3ActionPrefix + middleware.HeadBucketOperation,
s3ActionPrefix + middleware.GetBucketLocationOperation,
s3ActionPrefix + middleware.ListObjectsV1Operation,
s3ActionPrefix + middleware.ListObjectsV2Operation,
s3ActionPrefix + middleware.ListBucketObjectVersionsOperation,
s3ActionPrefix + middleware.ListMultipartUploadsOperation,
}
writeACLBucketNativeActions = []string{
native.MethodPutObject,
}
readACLBucketNativeActions = []string{
native.MethodGetContainer,
native.MethodGetObject,
native.MethodHeadObject,
native.MethodSearchObject,
native.MethodRangeObject,
native.MethodHashObject,
}
)
func bucketCannedACLToAPERules(cannedACL string, reqInfo *middleware.ReqInfo, key *keys.PublicKey, cnrID cid.ID) []*chain.Chain {
cnrIDStr := cnrID.EncodeToString()
chains := []*chain.Chain{
{
ID: getBucketCannedChainID(chain.S3, cnrID),
Rules: []chain.Rule{{
Status: chain.Allow,
Actions: chain.Actions{Names: []string{"s3:*"}},
Resources: chain.Resources{Names: []string{
fmt.Sprintf(s3.ResourceFormatS3Bucket, reqInfo.BucketName),
fmt.Sprintf(s3.ResourceFormatS3BucketObjects, reqInfo.BucketName),
}},
Condition: []chain.Condition{{
Op: chain.CondStringEquals,
Object: chain.ObjectRequest,
Key: s3.PropertyKeyOwner,
Value: key.Address(),
}},
}}},
{
ID: getBucketCannedChainID(chain.Ingress, cnrID),
Rules: []chain.Rule{{
Status: chain.Allow,
Actions: chain.Actions{Names: []string{"*"}},
Resources: chain.Resources{Names: []string{
fmt.Sprintf(native.ResourceFormatNamespaceContainer, reqInfo.Namespace, cnrIDStr),
fmt.Sprintf(native.ResourceFormatNamespaceContainerObjects, reqInfo.Namespace, cnrIDStr),
}},
Condition: []chain.Condition{{
Op: chain.CondStringEquals,
Object: chain.ObjectRequest,
Key: native.PropertyKeyActorPublicKey,
Value: hex.EncodeToString(key.Bytes()),
}},
}},
},
}
switch cannedACL {
case basicACLPrivate:
case cannedACLAuthRead:
fallthrough
case basicACLReadOnly:
chains[0].Rules = append(chains[0].Rules, chain.Rule{
Status: chain.Allow,
Actions: chain.Actions{Names: readACLBucketS3Actions},
Resources: chain.Resources{Names: []string{
fmt.Sprintf(s3.ResourceFormatS3Bucket, reqInfo.BucketName),
fmt.Sprintf(s3.ResourceFormatS3BucketObjects, reqInfo.BucketName),
}},
})
chains[1].Rules = append(chains[1].Rules, chain.Rule{
Status: chain.Allow,
Actions: chain.Actions{Names: readACLBucketNativeActions},
Resources: chain.Resources{Names: []string{
fmt.Sprintf(native.ResourceFormatNamespaceContainer, reqInfo.Namespace, cnrIDStr),
fmt.Sprintf(native.ResourceFormatNamespaceContainerObjects, reqInfo.Namespace, cnrIDStr),
}},
})
case basicACLPublic:
chains[0].Rules = append(chains[0].Rules, chain.Rule{
Status: chain.Allow,
Actions: chain.Actions{Names: append(readACLBucketS3Actions, writeACLBucketS3Actions...)},
Resources: chain.Resources{Names: []string{
fmt.Sprintf(s3.ResourceFormatS3Bucket, reqInfo.BucketName),
fmt.Sprintf(s3.ResourceFormatS3BucketObjects, reqInfo.BucketName),
}},
})
chains[1].Rules = append(chains[1].Rules, chain.Rule{
Status: chain.Allow,
Actions: chain.Actions{Names: append(readACLBucketNativeActions, writeACLBucketNativeActions...)},
Resources: chain.Resources{Names: []string{
fmt.Sprintf(native.ResourceFormatNamespaceContainer, reqInfo.Namespace, cnrIDStr),
fmt.Sprintf(native.ResourceFormatNamespaceContainerObjects, reqInfo.Namespace, cnrIDStr),
}},
})
default:
panic("unknown canned acl") // this should never happen
}
return chains
}
func getBucketCannedChainID(prefix chain.Name, cnrID cid.ID) chain.ID {
return chain.ID(string(prefix) + ":bktCanned" + string(cnrID[:]))
}
func (h handler) setPlacementPolicy(prm *layer.CreateBucketParams, namespace, locationConstraint string, userPolicies []*accessbox.ContainerPolicy) error {
prm.Policy = h.cfg.DefaultPlacementPolicy(namespace)
prm.LocationConstraint = locationConstraint

View File

@ -28,6 +28,7 @@ type (
const (
attributeLocationConstraint = ".s3-location-constraint"
attributeAPEEnabled = ".s3-APE-enabled"
AttributeLockEnabled = "LockEnabled"
)
@ -74,6 +75,17 @@ func (n *layer) containerInfo(ctx context.Context, idCnr cid.ID) (*data.BucketIn
}
}
APEEnabled := cnr.Attribute(attributeAPEEnabled)
if len(APEEnabled) > 0 {
info.APEEnabled, err = strconv.ParseBool(APEEnabled)
if err != nil {
log.Error(logs.CouldNotParseContainerAPEEnabledAttribute,
zap.String("ape_enabled", APEEnabled),
zap.Error(err),
)
}
}
zone, _ := n.features.FormContainerZone(reqInfo.Namespace)
if zone != info.Zone {
return nil, fmt.Errorf("ns '%s' and zone '%s' are mismatched for container '%s'", zone, info.Zone, idCnr)
@ -119,13 +131,13 @@ func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*da
Created: TimeNow(ctx),
LocationConstraint: p.LocationConstraint,
ObjectLockEnabled: p.ObjectLockEnabled,
APEEnabled: true,
}
var attributes [][2]string
attributes = append(attributes, [2]string{
attributeLocationConstraint, p.LocationConstraint,
})
attributes := [][2]string{
{attributeLocationConstraint, p.LocationConstraint},
{attributeAPEEnabled, "true"},
}
if p.ObjectLockEnabled {
attributes = append(attributes, [2]string{
@ -149,10 +161,6 @@ func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*da
bktInfo.CID = res.ContainerID
bktInfo.HomomorphicHashDisabled = res.HomomorphicHashDisabled
if err = n.setContainerEACLTable(ctx, bktInfo.CID, p.EACL, p.SessionEACL); err != nil {
return nil, fmt.Errorf("set container eacl: %w", err)
}
n.cache.PutBucket(bktInfo)
return bktInfo, nil

View File

@ -175,9 +175,7 @@ type (
Name string
Namespace string
Policy netmap.PlacementPolicy
EACL *eacl.Table
SessionContainerCreation *session.Container
SessionEACL *session.Container
LocationConstraint string
ObjectLockEnabled bool
}

View File

@ -108,7 +108,7 @@ var basicACLZero acl.Basic
// If prm.BasicACL is zero, 'eacl-public-read-write' is used.
func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (*layer.ContainerCreateResult, error) {
if prm.BasicACL == basicACLZero {
prm.BasicACL = acl.PublicRWExtended
prm.BasicACL = acl.PublicRW
}
var cnr container.Container

View File

@ -141,4 +141,5 @@ const (
CouldntDeleteObjectFromStorageContinueDeleting = "couldn't delete object from storage, continue deleting from tree"
CouldntPutAccessBoxIntoCache = "couldn't put accessbox into cache"
InvalidAccessBoxCacheRemovingCheckInterval = "invalid accessbox check removing interval, using default value"
CouldNotParseContainerAPEEnabledAttribute = "could not parse container APE enabled attribute"
)

View File

@ -78,6 +78,7 @@ var (
const (
versioningKV = "Versioning"
apeKV = "APE"
lockConfigurationKV = "LockConfiguration"
oidKV = "OID"
@ -332,7 +333,7 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
}
func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
keysToReturn := []string{versioningKV, lockConfigurationKV}
keysToReturn := []string{versioningKV, lockConfigurationKV, apeKV}
node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}, keysToReturn)
if err != nil {
return nil, fmt.Errorf("couldn't get node: %w", err)
@ -349,6 +350,10 @@ func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*
}
}
if ape, ok := node.Get(apeKV); ok {
settings.APE, _ = strconv.ParseBool(ape)
}
return settings, nil
}
@ -1384,6 +1389,7 @@ func metaFromSettings(settings *data.BucketSettings) map[string]string {
results[FileNameKey] = settingsFileName
results[versioningKV] = settings.Versioning
results[lockConfigurationKV] = encodeLockConfiguration(settings.LockConfiguration)
results[apeKV] = strconv.FormatBool(settings.APE)
return results
}