forked from TrueCloudLab/frostfs-s3-gw
[#266] Support per namespace placement policies configuration
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
0db6cd6727
commit
28c6bb4cb8
18 changed files with 307 additions and 52 deletions
|
@ -31,10 +31,10 @@ type (
|
|||
|
||||
// Config contains data which handler needs to keep.
|
||||
Config interface {
|
||||
DefaultPlacementPolicy() netmap.PlacementPolicy
|
||||
PlacementPolicy(string) (netmap.PlacementPolicy, bool)
|
||||
CopiesNumbers(string) ([]uint32, bool)
|
||||
DefaultCopiesNumbers() []uint32
|
||||
DefaultPlacementPolicy(namespace string) netmap.PlacementPolicy
|
||||
PlacementPolicy(namespace, constraint string) (netmap.PlacementPolicy, bool)
|
||||
CopiesNumbers(namespace, constraint string) ([]uint32, bool)
|
||||
DefaultCopiesNumbers(namespace string) []uint32
|
||||
NewXMLDecoder(io.Reader) *xml.Decoder
|
||||
DefaultMaxAge() int
|
||||
NotificatorEnabled() bool
|
||||
|
@ -74,7 +74,7 @@ func New(log *zap.Logger, obj layer.Client, notificator Notificator, cfg Config)
|
|||
// 1) array of copies numbers sent in request's header has the highest priority.
|
||||
// 2) array of copies numbers with corresponding location constraint provided in the config file.
|
||||
// 3) default copies number from the config file wrapped into array.
|
||||
func (h *handler) pickCopiesNumbers(metadata map[string]string, locationConstraint string) ([]uint32, error) {
|
||||
func (h *handler) pickCopiesNumbers(metadata map[string]string, namespace, locationConstraint string) ([]uint32, error) {
|
||||
copiesNumbersStr, ok := metadata[layer.AttributeFrostfsCopiesNumber]
|
||||
if ok {
|
||||
result, err := parseCopiesNumbers(copiesNumbersStr)
|
||||
|
@ -84,12 +84,12 @@ func (h *handler) pickCopiesNumbers(metadata map[string]string, locationConstrai
|
|||
return result, nil
|
||||
}
|
||||
|
||||
copiesNumbers, ok := h.cfg.CopiesNumbers(locationConstraint)
|
||||
copiesNumbers, ok := h.cfg.CopiesNumbers(namespace, locationConstraint)
|
||||
if ok {
|
||||
return copiesNumbers, nil
|
||||
}
|
||||
|
||||
return h.cfg.DefaultCopiesNumbers(), nil
|
||||
return h.cfg.DefaultCopiesNumbers(namespace), nil
|
||||
}
|
||||
|
||||
func parseCopiesNumbers(copiesNumbersStr string) ([]uint32, error) {
|
||||
|
|
|
@ -26,7 +26,7 @@ func TestCopiesNumberPicker(t *testing.T) {
|
|||
metadata["somekey1"] = "5, 6, 7"
|
||||
expectedCopiesNumbers := []uint32{1}
|
||||
|
||||
actualCopiesNumbers, err := h.pickCopiesNumbers(metadata, locationConstraint2)
|
||||
actualCopiesNumbers, err := h.pickCopiesNumbers(metadata, "", locationConstraint2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedCopiesNumbers, actualCopiesNumbers)
|
||||
})
|
||||
|
@ -35,7 +35,7 @@ func TestCopiesNumberPicker(t *testing.T) {
|
|||
metadata["somekey2"] = "6, 7, 8"
|
||||
expectedCopiesNumbers := []uint32{2, 3, 4}
|
||||
|
||||
actualCopiesNumbers, err := h.pickCopiesNumbers(metadata, locationConstraint1)
|
||||
actualCopiesNumbers, err := h.pickCopiesNumbers(metadata, "", locationConstraint1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedCopiesNumbers, actualCopiesNumbers)
|
||||
})
|
||||
|
@ -44,7 +44,7 @@ func TestCopiesNumberPicker(t *testing.T) {
|
|||
metadata["frostfs-copies-number"] = "7, 8, 9"
|
||||
expectedCopiesNumbers := []uint32{7, 8, 9}
|
||||
|
||||
actualCopiesNumbers, err := h.pickCopiesNumbers(metadata, locationConstraint2)
|
||||
actualCopiesNumbers, err := h.pickCopiesNumbers(metadata, "", locationConstraint2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedCopiesNumbers, actualCopiesNumbers)
|
||||
})
|
||||
|
@ -53,7 +53,7 @@ func TestCopiesNumberPicker(t *testing.T) {
|
|||
metadata["frostfs-copies-number"] = "7,8,9"
|
||||
expectedCopiesNumbers := []uint32{7, 8, 9}
|
||||
|
||||
actualCopiesNumbers, err := h.pickCopiesNumbers(metadata, locationConstraint2)
|
||||
actualCopiesNumbers, err := h.pickCopiesNumbers(metadata, "", locationConstraint2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedCopiesNumbers, actualCopiesNumbers)
|
||||
})
|
||||
|
@ -62,7 +62,7 @@ func TestCopiesNumberPicker(t *testing.T) {
|
|||
metadata["frostfs-copies-number"] = "11, 12, 13, "
|
||||
expectedCopiesNumbers := []uint32{11, 12, 13}
|
||||
|
||||
actualCopiesNumbers, err := h.pickCopiesNumbers(metadata, locationConstraint2)
|
||||
actualCopiesNumbers, err := h.pickCopiesNumbers(metadata, "", locationConstraint2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedCopiesNumbers, actualCopiesNumbers)
|
||||
})
|
||||
|
|
|
@ -204,7 +204,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
DstEncryption: dstEncryptionParams,
|
||||
}
|
||||
|
||||
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, dstBktInfo.LocationConstraint)
|
||||
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, reqInfo.Namespace, dstBktInfo.LocationConstraint)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||
return
|
||||
|
|
|
@ -55,7 +55,7 @@ func (h *handler) PutBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
|
|||
NewDecoder: h.cfg.NewXMLDecoder,
|
||||
}
|
||||
|
||||
p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), bktInfo.LocationConstraint)
|
||||
p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), reqInfo.Namespace, bktInfo.LocationConstraint)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||
return
|
||||
|
|
|
@ -67,20 +67,20 @@ type configMock struct {
|
|||
md5Enabled bool
|
||||
}
|
||||
|
||||
func (c *configMock) DefaultPlacementPolicy() netmap.PlacementPolicy {
|
||||
func (c *configMock) DefaultPlacementPolicy(_ string) netmap.PlacementPolicy {
|
||||
return c.defaultPolicy
|
||||
}
|
||||
|
||||
func (c *configMock) PlacementPolicy(string) (netmap.PlacementPolicy, bool) {
|
||||
func (c *configMock) PlacementPolicy(_, _ string) (netmap.PlacementPolicy, bool) {
|
||||
return netmap.PlacementPolicy{}, false
|
||||
}
|
||||
|
||||
func (c *configMock) CopiesNumbers(locationConstraint string) ([]uint32, bool) {
|
||||
func (c *configMock) CopiesNumbers(_, locationConstraint string) ([]uint32, bool) {
|
||||
result, ok := c.copiesNumbers[locationConstraint]
|
||||
return result, ok
|
||||
}
|
||||
|
||||
func (c *configMock) DefaultCopiesNumbers() []uint32 {
|
||||
func (c *configMock) DefaultCopiesNumbers(_ string) []uint32 {
|
||||
return c.defaultCopiesNumbers
|
||||
}
|
||||
|
||||
|
|
|
@ -145,7 +145,7 @@ func (h *handler) PutObjectLegalHoldHandler(w http.ResponseWriter, r *http.Reque
|
|||
},
|
||||
}
|
||||
|
||||
p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), bktInfo.LocationConstraint)
|
||||
p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), reqInfo.Namespace, bktInfo.LocationConstraint)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||
return
|
||||
|
@ -229,7 +229,7 @@ func (h *handler) PutObjectRetentionHandler(w http.ResponseWriter, r *http.Reque
|
|||
NewLock: lock,
|
||||
}
|
||||
|
||||
p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), bktInfo.LocationConstraint)
|
||||
p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), reqInfo.Namespace, bktInfo.LocationConstraint)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||
return
|
||||
|
|
|
@ -157,7 +157,7 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re
|
|||
p.Header[api.ContentLanguage] = contentLanguage
|
||||
}
|
||||
|
||||
p.CopiesNumbers, err = h.pickCopiesNumbers(p.Header, bktInfo.LocationConstraint)
|
||||
p.CopiesNumbers, err = h.pickCopiesNumbers(p.Header, reqInfo.Namespace, bktInfo.LocationConstraint)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "invalid copies number", reqInfo, err, additional...)
|
||||
return
|
||||
|
|
|
@ -115,7 +115,7 @@ func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Re
|
|||
Configuration: conf,
|
||||
}
|
||||
|
||||
p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), bktInfo.LocationConstraint)
|
||||
p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), reqInfo.Namespace, bktInfo.LocationConstraint)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||
return
|
||||
|
|
|
@ -248,7 +248,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
ContentSHA256Hash: r.Header.Get(api.AmzContentSha256),
|
||||
}
|
||||
|
||||
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, bktInfo.LocationConstraint)
|
||||
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, reqInfo.Namespace, bktInfo.LocationConstraint)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||
return
|
||||
|
@ -800,7 +800,7 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if err = h.setPolicy(p, createParams.LocationConstraint, policies); err != nil {
|
||||
if err = h.setPolicy(p, reqInfo.Namespace, createParams.LocationConstraint, policies); err != nil {
|
||||
h.logAndSendError(w, "couldn't set placement policy", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -830,8 +830,8 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
|
|||
middleware.WriteSuccessResponseHeadersOnly(w)
|
||||
}
|
||||
|
||||
func (h handler) setPolicy(prm *layer.CreateBucketParams, locationConstraint string, userPolicies []*accessbox.ContainerPolicy) error {
|
||||
prm.Policy = h.cfg.DefaultPlacementPolicy()
|
||||
func (h handler) setPolicy(prm *layer.CreateBucketParams, namespace, locationConstraint string, userPolicies []*accessbox.ContainerPolicy) error {
|
||||
prm.Policy = h.cfg.DefaultPlacementPolicy(namespace)
|
||||
prm.LocationConstraint = locationConstraint
|
||||
|
||||
if locationConstraint == "" {
|
||||
|
@ -845,7 +845,7 @@ func (h handler) setPolicy(prm *layer.CreateBucketParams, locationConstraint str
|
|||
}
|
||||
}
|
||||
|
||||
if policy, ok := h.cfg.PlacementPolicy(locationConstraint); ok {
|
||||
if policy, ok := h.cfg.PlacementPolicy(namespace, locationConstraint); ok {
|
||||
prm.Policy = policy
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue