forked from TrueCloudLab/frostfs-s3-gw
[#274] Refactor system cache and cors
Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
parent
5b4b9df031
commit
91bed76010
7 changed files with 192 additions and 159 deletions
30
api/cache/system.go
vendored
30
api/cache/system.go
vendored
|
@ -4,7 +4,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/bluele/gcache"
|
"github.com/bluele/gcache"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SystemCache provides lru cache for objects.
|
// SystemCache provides lru cache for objects.
|
||||||
|
@ -32,14 +32,14 @@ func NewSystemCache(config *Config) *SystemCache {
|
||||||
return &SystemCache{cache: gc}
|
return &SystemCache{cache: gc}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns cached object.
|
// GetObject returns cached object.
|
||||||
func (o *SystemCache) Get(key string) *object.Object {
|
func (o *SystemCache) GetObject(key string) *data.ObjectInfo {
|
||||||
entry, err := o.cache.Get(key)
|
entry, err := o.cache.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
result, ok := entry.(*object.Object)
|
result, ok := entry.(*data.ObjectInfo)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -47,8 +47,26 @@ func (o *SystemCache) Get(key string) *object.Object {
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put puts an object to cache.
|
func (o *SystemCache) GetCORS(key string) *data.CORSConfiguration {
|
||||||
func (o *SystemCache) Put(key string, obj *object.Object) error {
|
entry, err := o.cache.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
result, ok := entry.(*data.CORSConfiguration)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// PutObject puts an object to cache.
|
||||||
|
func (o *SystemCache) PutObject(key string, obj *data.ObjectInfo) error {
|
||||||
|
return o.cache.Set(key, obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *SystemCache) PutCORS(key string, obj *data.CORSConfiguration) error {
|
||||||
return o.cache.Set(key, obj)
|
return o.cache.Set(key, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package data
|
package data
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/xml"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
||||||
|
@ -39,6 +40,22 @@ type (
|
||||||
Owner *owner.ID
|
Owner *owner.ID
|
||||||
Headers map[string]string
|
Headers map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CORSConfiguration stores CORS configuration of a request.
|
||||||
|
CORSConfiguration struct {
|
||||||
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CORSConfiguration" json:"-"`
|
||||||
|
CORSRules []CORSRule `xml:"CORSRule" json:"CORSRules"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// CORSRule stores rules for CORS in a bucket.
|
||||||
|
CORSRule struct {
|
||||||
|
ID string `xml:"ID,omitempty" json:"ID,omitempty"`
|
||||||
|
AllowedHeaders []string `xml:"AllowedHeader" json:"AllowedHeaders"`
|
||||||
|
AllowedMethods []string `xml:"AllowedMethod" json:"AllowedMethods"`
|
||||||
|
AllowedOrigins []string `xml:"AllowedOrigin" json:"AllowedOrigins"`
|
||||||
|
ExposeHeaders []string `xml:"ExposeHeader" json:"ExposeHeaders"`
|
||||||
|
MaxAgeSeconds int `xml:"MaxAgeSeconds,omitempty" json:"MaxAgeSeconds,omitempty"`
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// SettingsObjectName is system name for bucket settings file.
|
// SettingsObjectName is system name for bucket settings file.
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/xml"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -12,31 +10,12 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
|
||||||
// CORSConfiguration stores CORS configuration of a request.
|
|
||||||
CORSConfiguration struct {
|
|
||||||
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CORSConfiguration" json:"-"`
|
|
||||||
CORSRules []CORSRule `xml:"CORSRule" json:"CORSRules"`
|
|
||||||
}
|
|
||||||
// CORSRule stores rules for CORS in a bucket.
|
|
||||||
CORSRule struct {
|
|
||||||
ID string `xml:"ID,omitempty" json:"ID,omitempty"`
|
|
||||||
AllowedHeaders []string `xml:"AllowedHeader" json:"AllowedHeaders"`
|
|
||||||
AllowedMethods []string `xml:"AllowedMethod" json:"AllowedMethods"`
|
|
||||||
AllowedOrigins []string `xml:"AllowedOrigin" json:"AllowedOrigins"`
|
|
||||||
ExposeHeaders []string `xml:"ExposeHeader" json:"ExposeHeaders"`
|
|
||||||
MaxAgeSeconds int `xml:"MaxAgeSeconds,omitempty" json:"MaxAgeSeconds,omitempty"`
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// DefaultMaxAge -- default value of Access-Control-Max-Age if this value is not set in a rule.
|
// DefaultMaxAge -- default value of Access-Control-Max-Age if this value is not set in a rule.
|
||||||
DefaultMaxAge = 600
|
DefaultMaxAge = 600
|
||||||
wildcard = "*"
|
wildcard = "*"
|
||||||
)
|
)
|
||||||
|
|
||||||
var supportedMethods = map[string]struct{}{"GET": {}, "HEAD": {}, "POST": {}, "PUT": {}, "DELETE": {}}
|
|
||||||
|
|
||||||
func (h *handler) GetBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
|
func (h *handler) GetBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
reqInfo := api.GetReqInfo(r.Context())
|
reqInfo := api.GetReqInfo(r.Context())
|
||||||
|
|
||||||
|
@ -51,13 +30,16 @@ func (h *handler) GetBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := h.obj.GetBucketCORS(r.Context(), bktInfo)
|
cors, err := h.obj.GetBucketCORS(r.Context(), bktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logAndSendError(w, "could not get cors", reqInfo, err)
|
h.logAndSendError(w, "could not get cors", reqInfo, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
api.WriteResponse(w, http.StatusOK, info, api.MimeNone)
|
if err = api.EncodeToResponse(w, cors); err != nil {
|
||||||
|
h.logAndSendError(w, "could not encode cors to response", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) PutBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
|
func (h *handler) PutBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -74,30 +56,9 @@ func (h *handler) PutBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cors := &CORSConfiguration{}
|
|
||||||
if err := xml.NewDecoder(r.Body).Decode(cors); err != nil {
|
|
||||||
h.logAndSendError(w, "could not parse cors configuration", reqInfo, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if cors.CORSRules == nil {
|
|
||||||
h.logAndSendError(w, "could not parse cors rules", reqInfo, errors.GetAPIError(errors.ErrMalformedXML))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = checkCORS(cors); err != nil {
|
|
||||||
h.logAndSendError(w, "invalid cors configuration", reqInfo, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
xml, err := xml.Marshal(cors)
|
|
||||||
if err != nil {
|
|
||||||
h.logAndSendError(w, "could not encode cors configuration to xml", reqInfo, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
p := &layer.PutCORSParams{
|
p := &layer.PutCORSParams{
|
||||||
BktInfo: bktInfo,
|
BktInfo: bktInfo,
|
||||||
CORSConfiguration: xml,
|
Reader: r.Body,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = h.obj.PutBucketCORS(r.Context(), p); err != nil {
|
if err = h.obj.PutBucketCORS(r.Context(), p); err != nil {
|
||||||
|
@ -146,14 +107,11 @@ func (h *handler) AppendCORSHeaders(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := h.obj.GetBucketCORS(r.Context(), bktInfo)
|
cors, err := h.obj.GetBucketCORS(r.Context(), bktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cors := &CORSConfiguration{}
|
|
||||||
if err = xml.Unmarshal(info, cors); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
withCredentials := r.Header.Get(api.Authorization) != ""
|
withCredentials := r.Header.Get(api.Authorization) != ""
|
||||||
|
|
||||||
for _, rule := range cors.CORSRules {
|
for _, rule := range cors.CORSRules {
|
||||||
|
@ -213,16 +171,11 @@ func (h *handler) Preflight(w http.ResponseWriter, r *http.Request) {
|
||||||
headers = strings.Split(requestHeaders, ", ")
|
headers = strings.Split(requestHeaders, ", ")
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := h.obj.GetBucketCORS(r.Context(), bktInfo)
|
cors, err := h.obj.GetBucketCORS(r.Context(), bktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logAndSendError(w, "could not get cors", reqInfo, err)
|
h.logAndSendError(w, "could not get cors", reqInfo, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cors := &CORSConfiguration{}
|
|
||||||
if err = xml.Unmarshal(info, cors); err != nil {
|
|
||||||
h.logAndSendError(w, "could not parse cors configuration", reqInfo, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, rule := range cors.CORSRules {
|
for _, rule := range cors.CORSRules {
|
||||||
for _, o := range rule.AllowedOrigins {
|
for _, o := range rule.AllowedOrigins {
|
||||||
|
@ -258,22 +211,6 @@ func (h *handler) Preflight(w http.ResponseWriter, r *http.Request) {
|
||||||
h.logAndSendError(w, "Forbidden", reqInfo, errors.GetAPIError(errors.ErrAccessDenied))
|
h.logAndSendError(w, "Forbidden", reqInfo, errors.GetAPIError(errors.ErrAccessDenied))
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkCORS(cors *CORSConfiguration) error {
|
|
||||||
for _, r := range cors.CORSRules {
|
|
||||||
for _, m := range r.AllowedMethods {
|
|
||||||
if _, ok := supportedMethods[m]; !ok {
|
|
||||||
return errors.GetAPIErrorWithError(errors.ErrCORSUnsupportedMethod, fmt.Errorf("unsupported method is %s", m))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, h := range r.ExposeHeaders {
|
|
||||||
if h == wildcard {
|
|
||||||
return errors.GetAPIError(errors.ErrCORSWildcardExposeHeaders)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkSubslice(slice []string, subSlice []string) bool {
|
func checkSubslice(slice []string, subSlice []string) bool {
|
||||||
if sliceContains(slice, wildcard) {
|
if sliceContains(slice, wildcard) {
|
||||||
return true
|
return true
|
||||||
|
|
|
@ -1,28 +1,66 @@
|
||||||
package layer
|
package layer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/xml"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const wildcard = "*"
|
||||||
|
|
||||||
|
var supportedMethods = map[string]struct{}{"GET": {}, "HEAD": {}, "POST": {}, "PUT": {}, "DELETE": {}}
|
||||||
|
|
||||||
func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
||||||
|
var (
|
||||||
|
buf bytes.Buffer
|
||||||
|
tee = io.TeeReader(p.Reader, &buf)
|
||||||
|
cors = &data.CORSConfiguration{}
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := xml.NewDecoder(tee).Decode(cors); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if cors.CORSRules == nil {
|
||||||
|
return errors.GetAPIError(errors.ErrMalformedXML)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := checkCORS(cors); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
s := &PutSystemObjectParams{
|
s := &PutSystemObjectParams{
|
||||||
BktInfo: p.BktInfo,
|
BktInfo: p.BktInfo,
|
||||||
ObjName: p.BktInfo.CORSObjectName(),
|
ObjName: p.BktInfo.CORSObjectName(),
|
||||||
Metadata: map[string]string{},
|
Metadata: map[string]string{},
|
||||||
Prefix: "",
|
Prefix: "",
|
||||||
Payload: p.CORSConfiguration,
|
Reader: &buf,
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := n.putSystemObject(ctx, s)
|
obj, err := n.putSystemObjectIntoNeoFS(ctx, s)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
if obj.Size == 0 {
|
||||||
|
return errors.GetAPIError(errors.ErrInternalError)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = n.systemCache.PutCORS(systemObjectKey(p.BktInfo, s.ObjName), cors); err != nil {
|
||||||
|
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]byte, error) {
|
func (n *layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*data.CORSConfiguration, error) {
|
||||||
obj, err := n.getSystemObject(ctx, bktInfo, bktInfo.CORSObjectName())
|
cors, err := n.getCORS(ctx, bktInfo, bktInfo.CORSObjectName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
if errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
||||||
return nil, errors.GetAPIError(errors.ErrNoSuchCORSConfiguration)
|
return nil, errors.GetAPIError(errors.ErrNoSuchCORSConfiguration)
|
||||||
|
@ -30,13 +68,25 @@ func (n *layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj.Payload() == nil {
|
return cors, nil
|
||||||
return nil, errors.GetAPIError(errors.ErrInternalError)
|
|
||||||
}
|
|
||||||
|
|
||||||
return obj.Payload(), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error {
|
func (n *layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error {
|
||||||
return n.deleteSystemObject(ctx, bktInfo, bktInfo.CORSObjectName())
|
return n.deleteSystemObject(ctx, bktInfo, bktInfo.CORSObjectName())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkCORS(cors *data.CORSConfiguration) error {
|
||||||
|
for _, r := range cors.CORSRules {
|
||||||
|
for _, m := range r.AllowedMethods {
|
||||||
|
if _, ok := supportedMethods[m]; !ok {
|
||||||
|
return errors.GetAPIErrorWithError(errors.ErrCORSUnsupportedMethod, fmt.Errorf("unsupported method is %s", m))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, h := range r.ExposeHeaders {
|
||||||
|
if h == wildcard {
|
||||||
|
return errors.GetAPIError(errors.ErrCORSWildcardExposeHeaders)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -93,8 +93,8 @@ type (
|
||||||
|
|
||||||
// PutCORSParams stores PutCORS request parameters.
|
// PutCORSParams stores PutCORS request parameters.
|
||||||
PutCORSParams struct {
|
PutCORSParams struct {
|
||||||
BktInfo *data.BucketInfo
|
BktInfo *data.BucketInfo
|
||||||
CORSConfiguration []byte
|
Reader io.Reader
|
||||||
}
|
}
|
||||||
|
|
||||||
// BucketSettings stores settings such as versioning.
|
// BucketSettings stores settings such as versioning.
|
||||||
|
@ -134,7 +134,7 @@ type (
|
||||||
ObjName string
|
ObjName string
|
||||||
Metadata map[string]string
|
Metadata map[string]string
|
||||||
Prefix string
|
Prefix string
|
||||||
Payload []byte
|
Reader io.Reader
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListObjectVersionsParams stores list objects versions parameters.
|
// ListObjectVersionsParams stores list objects versions parameters.
|
||||||
|
@ -175,7 +175,7 @@ type (
|
||||||
GetBucketVersioning(ctx context.Context, name string) (*BucketSettings, error)
|
GetBucketVersioning(ctx context.Context, name string) (*BucketSettings, error)
|
||||||
|
|
||||||
PutBucketCORS(ctx context.Context, p *PutCORSParams) error
|
PutBucketCORS(ctx context.Context, p *PutCORSParams) error
|
||||||
GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]byte, error)
|
GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*data.CORSConfiguration, error)
|
||||||
DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error
|
DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error
|
||||||
|
|
||||||
ListBuckets(ctx context.Context) ([]*data.BucketInfo, error)
|
ListBuckets(ctx context.Context) ([]*data.BucketInfo, error)
|
||||||
|
@ -453,7 +453,7 @@ func (n *layer) PutObjectTagging(ctx context.Context, p *PutTaggingParams) error
|
||||||
ObjName: p.ObjectInfo.TagsObject(),
|
ObjName: p.ObjectInfo.TagsObject(),
|
||||||
Metadata: p.TagSet,
|
Metadata: p.TagSet,
|
||||||
Prefix: tagPrefix,
|
Prefix: tagPrefix,
|
||||||
Payload: nil,
|
Reader: nil,
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := n.putSystemObject(ctx, s); err != nil {
|
if _, err := n.putSystemObject(ctx, s); err != nil {
|
||||||
|
@ -475,7 +475,7 @@ func (n *layer) PutBucketTagging(ctx context.Context, bucketName string, tagSet
|
||||||
ObjName: formBucketTagObjectName(bucketName),
|
ObjName: formBucketTagObjectName(bucketName),
|
||||||
Metadata: tagSet,
|
Metadata: tagSet,
|
||||||
Prefix: tagPrefix,
|
Prefix: tagPrefix,
|
||||||
Payload: nil,
|
Reader: nil,
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = n.putSystemObject(ctx, s); err != nil {
|
if _, err = n.putSystemObject(ctx, s); err != nil {
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
package layer
|
package layer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/xml"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -13,7 +13,53 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (n *layer) putSystemObject(ctx context.Context, p *PutSystemObjectParams) (*object.Object, error) {
|
func (n *layer) putSystemObject(ctx context.Context, p *PutSystemObjectParams) (*data.ObjectInfo, error) {
|
||||||
|
objInfo, err := n.putSystemObjectIntoNeoFS(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = n.systemCache.PutObject(systemObjectKey(p.BktInfo, p.ObjName), objInfo); err != nil {
|
||||||
|
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return objInfo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *layer) headSystemObject(ctx context.Context, bkt *data.BucketInfo, objName string) (*data.ObjectInfo, error) {
|
||||||
|
if objInfo := n.systemCache.GetObject(systemObjectKey(bkt, objName)); objInfo != nil {
|
||||||
|
return objInfo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
versions, err := n.headSystemVersions(ctx, bkt, objName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = n.systemCache.PutObject(systemObjectKey(bkt, objName), versions.getLast()); err != nil {
|
||||||
|
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return versions.getLast(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *layer) deleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error {
|
||||||
|
ids, err := n.objectSearch(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: name})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, id := range ids {
|
||||||
|
if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
n.systemCache.Delete(systemObjectKey(bktInfo, name))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObjectParams) (*data.ObjectInfo, error) {
|
||||||
versions, err := n.headSystemVersions(ctx, p.BktInfo, p.ObjName)
|
versions, err := n.headSystemVersions(ctx, p.BktInfo, p.ObjName)
|
||||||
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -51,7 +97,7 @@ func (n *layer) putSystemObject(ctx context.Context, p *PutSystemObjectParams) (
|
||||||
raw.SetContainerID(p.BktInfo.CID)
|
raw.SetContainerID(p.BktInfo.CID)
|
||||||
raw.SetAttributes(attributes...)
|
raw.SetAttributes(attributes...)
|
||||||
|
|
||||||
ops := new(client.PutObjectParams).WithObject(raw.Object()).WithPayloadReader(bytes.NewReader(p.Payload))
|
ops := new(client.PutObjectParams).WithObject(raw.Object()).WithPayloadReader(p.Reader)
|
||||||
oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx))
|
oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -62,14 +108,6 @@ func (n *layer) putSystemObject(ctx context.Context, p *PutSystemObjectParams) (
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Payload != nil {
|
|
||||||
meta.ToV2().SetPayload(p.Payload)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = n.systemCache.Put(systemObjectKey(p.BktInfo, p.ObjName), meta); err != nil {
|
|
||||||
n.log.Error("couldn't cache system object", zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, id := range idsToDeleteArr {
|
for _, id := range idsToDeleteArr {
|
||||||
if err = n.objectDelete(ctx, p.BktInfo.CID, id); err != nil {
|
if err = n.objectDelete(ctx, p.BktInfo.CID, id); err != nil {
|
||||||
n.log.Warn("couldn't delete system object",
|
n.log.Warn("couldn't delete system object",
|
||||||
|
@ -79,63 +117,52 @@ func (n *layer) putSystemObject(ctx context.Context, p *PutSystemObjectParams) (
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return meta, nil
|
return objInfoFromMeta(p.BktInfo, meta), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) headSystemObject(ctx context.Context, bkt *data.BucketInfo, objName string) (*data.ObjectInfo, error) {
|
func (n *layer) getSystemObjectFromNeoFS(ctx context.Context, bkt *data.BucketInfo, objName string) (*object.Object, error) {
|
||||||
if meta := n.systemCache.Get(systemObjectKey(bkt, objName)); meta != nil {
|
|
||||||
return objInfoFromMeta(bkt, meta), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
versions, err := n.headSystemVersions(ctx, bkt, objName)
|
versions, err := n.headSystemVersions(ctx, bkt, objName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return versions.getLast(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *layer) getSystemObject(ctx context.Context, bktInfo *data.BucketInfo, objName string) (*object.Object, error) {
|
|
||||||
if meta := n.systemCache.Get(systemObjectKey(bktInfo, objName)); meta != nil {
|
|
||||||
return meta, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
versions, err := n.headSystemVersions(ctx, bktInfo, objName)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
objInfo := versions.getLast()
|
objInfo := versions.getLast()
|
||||||
|
|
||||||
obj, err := n.objectGet(ctx, bktInfo.CID, objInfo.ID)
|
obj, err := n.objectGet(ctx, bkt.CID, objInfo.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = n.systemCache.Put(systemObjectKey(bktInfo, objName), obj); err != nil {
|
if len(obj.Payload()) == 0 {
|
||||||
n.log.Warn("couldn't put system meta to objects cache",
|
return nil, errors.GetAPIError(errors.ErrInternalError)
|
||||||
zap.Stringer("object id", obj.ID()),
|
|
||||||
zap.Stringer("bucket id", obj.ContainerID()),
|
|
||||||
zap.Error(err))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return obj, nil
|
return obj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) deleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error {
|
func (n *layer) getCORS(ctx context.Context, bkt *data.BucketInfo, sysName string) (*data.CORSConfiguration, error) {
|
||||||
ids, err := n.objectSearch(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: name})
|
if cors := n.systemCache.GetCORS(systemObjectKey(bkt, sysName)); cors != nil {
|
||||||
|
return cors, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, err := n.getSystemObjectFromNeoFS(ctx, bkt, sysName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, id := range ids {
|
cors := &data.CORSConfiguration{}
|
||||||
if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil {
|
|
||||||
return err
|
if err = xml.Unmarshal(obj.Payload(), &cors); err != nil {
|
||||||
}
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
n.systemCache.Delete(systemObjectKey(bktInfo, name))
|
if err = n.systemCache.PutCORS(systemObjectKey(bkt, sysName), cors); err != nil {
|
||||||
return nil
|
n.log.Warn("couldn't put system meta to objects cache",
|
||||||
|
zap.Stringer("object id", obj.ID()),
|
||||||
|
zap.Stringer("bucket id", bkt.CID),
|
||||||
|
zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return cors, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sysName string) (*objectVersions, error) {
|
func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sysName string) (*objectVersions, error) {
|
||||||
|
@ -144,9 +171,6 @@ func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sy
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// should be changed when system cache will store payload instead of meta
|
|
||||||
metas := make(map[string]*object.Object, len(ids))
|
|
||||||
|
|
||||||
versions := newObjectVersions(sysName)
|
versions := newObjectVersions(sysName)
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
meta, err := n.objectHead(ctx, bkt.CID, id)
|
meta, err := n.objectHead(ctx, bkt.CID, id)
|
||||||
|
@ -163,7 +187,6 @@ func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sy
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
versions.appendVersion(oi)
|
versions.appendVersion(oi)
|
||||||
metas[oi.Version()] = meta
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,13 +195,6 @@ func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sy
|
||||||
return nil, errors.GetAPIError(errors.ErrNoSuchKey)
|
return nil, errors.GetAPIError(errors.ErrNoSuchKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = n.systemCache.Put(systemObjectKey(bkt, sysName), metas[lastVersion.Version()]); err != nil {
|
|
||||||
n.log.Warn("couldn't put system meta to objects cache",
|
|
||||||
zap.Stringer("object id", lastVersion.ID),
|
|
||||||
zap.Stringer("bucket id", bkt.CID),
|
|
||||||
zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
return versions, nil
|
return versions, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -253,15 +253,10 @@ func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams)
|
||||||
ObjName: bktInfo.SettingsObjectName(),
|
ObjName: bktInfo.SettingsObjectName(),
|
||||||
Metadata: metadata,
|
Metadata: metadata,
|
||||||
Prefix: "",
|
Prefix: "",
|
||||||
Payload: nil,
|
Reader: nil,
|
||||||
}
|
}
|
||||||
|
|
||||||
meta, err := n.putSystemObject(ctx, s)
|
return n.putSystemObject(ctx, s)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return objInfoFromMeta(bktInfo, meta), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*BucketSettings, error) {
|
func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*BucketSettings, error) {
|
||||||
|
|
Loading…
Reference in a new issue