From 91bed7601054074112455e1d8aa37ec5dfa1fc0c Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Wed, 13 Oct 2021 21:50:02 +0300 Subject: [PATCH] [#274] Refactor system cache and cors Signed-off-by: Angira Kekteeva --- api/cache/system.go | 30 +++++++-- api/data/info.go | 17 +++++ api/handler/cors.go | 83 +++-------------------- api/layer/cors.go | 70 +++++++++++++++++--- api/layer/layer.go | 12 ++-- api/layer/system_object.go | 130 +++++++++++++++++++++---------------- api/layer/versioning.go | 9 +-- 7 files changed, 192 insertions(+), 159 deletions(-) diff --git a/api/cache/system.go b/api/cache/system.go index 3c6583c9..71d8325f 100644 --- a/api/cache/system.go +++ b/api/cache/system.go @@ -4,7 +4,7 @@ import ( "time" "github.com/bluele/gcache" - "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-s3-gw/api/data" ) // SystemCache provides lru cache for objects. @@ -32,14 +32,14 @@ func NewSystemCache(config *Config) *SystemCache { return &SystemCache{cache: gc} } -// Get returns cached object. -func (o *SystemCache) Get(key string) *object.Object { +// GetObject returns cached object. +func (o *SystemCache) GetObject(key string) *data.ObjectInfo { entry, err := o.cache.Get(key) if err != nil { return nil } - result, ok := entry.(*object.Object) + result, ok := entry.(*data.ObjectInfo) if !ok { return nil } @@ -47,8 +47,26 @@ func (o *SystemCache) Get(key string) *object.Object { return result } -// Put puts an object to cache. -func (o *SystemCache) Put(key string, obj *object.Object) error { +func (o *SystemCache) GetCORS(key string) *data.CORSConfiguration { + entry, err := o.cache.Get(key) + if err != nil { + return nil + } + + result, ok := entry.(*data.CORSConfiguration) + if !ok { + return nil + } + + return result +} + +// PutObject puts an object to cache. +func (o *SystemCache) PutObject(key string, obj *data.ObjectInfo) error { + return o.cache.Set(key, obj) +} + +func (o *SystemCache) PutCORS(key string, obj *data.CORSConfiguration) error { return o.cache.Set(key, obj) } diff --git a/api/data/info.go b/api/data/info.go index ab3f24b5..100b5bb5 100644 --- a/api/data/info.go +++ b/api/data/info.go @@ -1,6 +1,7 @@ package data import ( + "encoding/xml" "time" cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" @@ -39,6 +40,22 @@ type ( Owner *owner.ID Headers map[string]string } + + // CORSConfiguration stores CORS configuration of a request. + CORSConfiguration struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CORSConfiguration" json:"-"` + CORSRules []CORSRule `xml:"CORSRule" json:"CORSRules"` + } + + // CORSRule stores rules for CORS in a bucket. + CORSRule struct { + ID string `xml:"ID,omitempty" json:"ID,omitempty"` + AllowedHeaders []string `xml:"AllowedHeader" json:"AllowedHeaders"` + AllowedMethods []string `xml:"AllowedMethod" json:"AllowedMethods"` + AllowedOrigins []string `xml:"AllowedOrigin" json:"AllowedOrigins"` + ExposeHeaders []string `xml:"ExposeHeader" json:"ExposeHeaders"` + MaxAgeSeconds int `xml:"MaxAgeSeconds,omitempty" json:"MaxAgeSeconds,omitempty"` + } ) // SettingsObjectName is system name for bucket settings file. diff --git a/api/handler/cors.go b/api/handler/cors.go index b0b21e01..8cbb8990 100644 --- a/api/handler/cors.go +++ b/api/handler/cors.go @@ -1,8 +1,6 @@ package handler import ( - "encoding/xml" - "fmt" "net/http" "strconv" "strings" @@ -12,31 +10,12 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/layer" ) -type ( - // CORSConfiguration stores CORS configuration of a request. - CORSConfiguration struct { - XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CORSConfiguration" json:"-"` - CORSRules []CORSRule `xml:"CORSRule" json:"CORSRules"` - } - // CORSRule stores rules for CORS in a bucket. - CORSRule struct { - ID string `xml:"ID,omitempty" json:"ID,omitempty"` - AllowedHeaders []string `xml:"AllowedHeader" json:"AllowedHeaders"` - AllowedMethods []string `xml:"AllowedMethod" json:"AllowedMethods"` - AllowedOrigins []string `xml:"AllowedOrigin" json:"AllowedOrigins"` - ExposeHeaders []string `xml:"ExposeHeader" json:"ExposeHeaders"` - MaxAgeSeconds int `xml:"MaxAgeSeconds,omitempty" json:"MaxAgeSeconds,omitempty"` - } -) - const ( // DefaultMaxAge -- default value of Access-Control-Max-Age if this value is not set in a rule. DefaultMaxAge = 600 wildcard = "*" ) -var supportedMethods = map[string]struct{}{"GET": {}, "HEAD": {}, "POST": {}, "PUT": {}, "DELETE": {}} - func (h *handler) GetBucketCorsHandler(w http.ResponseWriter, r *http.Request) { reqInfo := api.GetReqInfo(r.Context()) @@ -51,13 +30,16 @@ func (h *handler) GetBucketCorsHandler(w http.ResponseWriter, r *http.Request) { return } - info, err := h.obj.GetBucketCORS(r.Context(), bktInfo) + cors, err := h.obj.GetBucketCORS(r.Context(), bktInfo) if err != nil { h.logAndSendError(w, "could not get cors", reqInfo, err) return } - api.WriteResponse(w, http.StatusOK, info, api.MimeNone) + if err = api.EncodeToResponse(w, cors); err != nil { + h.logAndSendError(w, "could not encode cors to response", reqInfo, err) + return + } } func (h *handler) PutBucketCorsHandler(w http.ResponseWriter, r *http.Request) { @@ -74,30 +56,9 @@ func (h *handler) PutBucketCorsHandler(w http.ResponseWriter, r *http.Request) { return } - cors := &CORSConfiguration{} - if err := xml.NewDecoder(r.Body).Decode(cors); err != nil { - h.logAndSendError(w, "could not parse cors configuration", reqInfo, err) - return - } - if cors.CORSRules == nil { - h.logAndSendError(w, "could not parse cors rules", reqInfo, errors.GetAPIError(errors.ErrMalformedXML)) - return - } - - if err = checkCORS(cors); err != nil { - h.logAndSendError(w, "invalid cors configuration", reqInfo, err) - return - } - - xml, err := xml.Marshal(cors) - if err != nil { - h.logAndSendError(w, "could not encode cors configuration to xml", reqInfo, err) - return - } - p := &layer.PutCORSParams{ - BktInfo: bktInfo, - CORSConfiguration: xml, + BktInfo: bktInfo, + Reader: r.Body, } if err = h.obj.PutBucketCORS(r.Context(), p); err != nil { @@ -146,14 +107,11 @@ func (h *handler) AppendCORSHeaders(w http.ResponseWriter, r *http.Request) { return } - info, err := h.obj.GetBucketCORS(r.Context(), bktInfo) + cors, err := h.obj.GetBucketCORS(r.Context(), bktInfo) if err != nil { return } - cors := &CORSConfiguration{} - if err = xml.Unmarshal(info, cors); err != nil { - return - } + withCredentials := r.Header.Get(api.Authorization) != "" for _, rule := range cors.CORSRules { @@ -213,16 +171,11 @@ func (h *handler) Preflight(w http.ResponseWriter, r *http.Request) { headers = strings.Split(requestHeaders, ", ") } - info, err := h.obj.GetBucketCORS(r.Context(), bktInfo) + cors, err := h.obj.GetBucketCORS(r.Context(), bktInfo) if err != nil { h.logAndSendError(w, "could not get cors", reqInfo, err) return } - cors := &CORSConfiguration{} - if err = xml.Unmarshal(info, cors); err != nil { - h.logAndSendError(w, "could not parse cors configuration", reqInfo, err) - return - } for _, rule := range cors.CORSRules { for _, o := range rule.AllowedOrigins { @@ -258,22 +211,6 @@ func (h *handler) Preflight(w http.ResponseWriter, r *http.Request) { h.logAndSendError(w, "Forbidden", reqInfo, errors.GetAPIError(errors.ErrAccessDenied)) } -func checkCORS(cors *CORSConfiguration) error { - for _, r := range cors.CORSRules { - for _, m := range r.AllowedMethods { - if _, ok := supportedMethods[m]; !ok { - return errors.GetAPIErrorWithError(errors.ErrCORSUnsupportedMethod, fmt.Errorf("unsupported method is %s", m)) - } - } - for _, h := range r.ExposeHeaders { - if h == wildcard { - return errors.GetAPIError(errors.ErrCORSWildcardExposeHeaders) - } - } - } - return nil -} - func checkSubslice(slice []string, subSlice []string) bool { if sliceContains(slice, wildcard) { return true diff --git a/api/layer/cors.go b/api/layer/cors.go index 87033e36..7a5b757c 100644 --- a/api/layer/cors.go +++ b/api/layer/cors.go @@ -1,28 +1,66 @@ package layer import ( + "bytes" "context" + "encoding/xml" + "fmt" + "io" "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/errors" + "go.uber.org/zap" ) +const wildcard = "*" + +var supportedMethods = map[string]struct{}{"GET": {}, "HEAD": {}, "POST": {}, "PUT": {}, "DELETE": {}} + func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error { + var ( + buf bytes.Buffer + tee = io.TeeReader(p.Reader, &buf) + cors = &data.CORSConfiguration{} + ) + + if err := xml.NewDecoder(tee).Decode(cors); err != nil { + return err + } + + if cors.CORSRules == nil { + return errors.GetAPIError(errors.ErrMalformedXML) + } + + if err := checkCORS(cors); err != nil { + return err + } + s := &PutSystemObjectParams{ BktInfo: p.BktInfo, ObjName: p.BktInfo.CORSObjectName(), Metadata: map[string]string{}, Prefix: "", - Payload: p.CORSConfiguration, + Reader: &buf, } - _, err := n.putSystemObject(ctx, s) + obj, err := n.putSystemObjectIntoNeoFS(ctx, s) + if err != nil { + return err + } - return err + if obj.Size == 0 { + return errors.GetAPIError(errors.ErrInternalError) + } + + if err = n.systemCache.PutCORS(systemObjectKey(p.BktInfo, s.ObjName), cors); err != nil { + n.log.Error("couldn't cache system object", zap.Error(err)) + } + + return nil } -func (n *layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]byte, error) { - obj, err := n.getSystemObject(ctx, bktInfo, bktInfo.CORSObjectName()) +func (n *layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*data.CORSConfiguration, error) { + cors, err := n.getCORS(ctx, bktInfo, bktInfo.CORSObjectName()) if err != nil { if errors.IsS3Error(err, errors.ErrNoSuchKey) { return nil, errors.GetAPIError(errors.ErrNoSuchCORSConfiguration) @@ -30,13 +68,25 @@ func (n *layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([] return nil, err } - if obj.Payload() == nil { - return nil, errors.GetAPIError(errors.ErrInternalError) - } - - return obj.Payload(), nil + return cors, nil } func (n *layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error { return n.deleteSystemObject(ctx, bktInfo, bktInfo.CORSObjectName()) } + +func checkCORS(cors *data.CORSConfiguration) error { + for _, r := range cors.CORSRules { + for _, m := range r.AllowedMethods { + if _, ok := supportedMethods[m]; !ok { + return errors.GetAPIErrorWithError(errors.ErrCORSUnsupportedMethod, fmt.Errorf("unsupported method is %s", m)) + } + } + for _, h := range r.ExposeHeaders { + if h == wildcard { + return errors.GetAPIError(errors.ErrCORSWildcardExposeHeaders) + } + } + } + return nil +} diff --git a/api/layer/layer.go b/api/layer/layer.go index b959cded..57a4ad1c 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -93,8 +93,8 @@ type ( // PutCORSParams stores PutCORS request parameters. PutCORSParams struct { - BktInfo *data.BucketInfo - CORSConfiguration []byte + BktInfo *data.BucketInfo + Reader io.Reader } // BucketSettings stores settings such as versioning. @@ -134,7 +134,7 @@ type ( ObjName string Metadata map[string]string Prefix string - Payload []byte + Reader io.Reader } // ListObjectVersionsParams stores list objects versions parameters. @@ -175,7 +175,7 @@ type ( GetBucketVersioning(ctx context.Context, name string) (*BucketSettings, error) PutBucketCORS(ctx context.Context, p *PutCORSParams) error - GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]byte, error) + GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*data.CORSConfiguration, error) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) @@ -453,7 +453,7 @@ func (n *layer) PutObjectTagging(ctx context.Context, p *PutTaggingParams) error ObjName: p.ObjectInfo.TagsObject(), Metadata: p.TagSet, Prefix: tagPrefix, - Payload: nil, + Reader: nil, } if _, err := n.putSystemObject(ctx, s); err != nil { @@ -475,7 +475,7 @@ func (n *layer) PutBucketTagging(ctx context.Context, bucketName string, tagSet ObjName: formBucketTagObjectName(bucketName), Metadata: tagSet, Prefix: tagPrefix, - Payload: nil, + Reader: nil, } if _, err = n.putSystemObject(ctx, s); err != nil { diff --git a/api/layer/system_object.go b/api/layer/system_object.go index 468cc0f1..104d13a5 100644 --- a/api/layer/system_object.go +++ b/api/layer/system_object.go @@ -1,8 +1,8 @@ package layer import ( - "bytes" "context" + "encoding/xml" "strconv" "time" @@ -13,7 +13,53 @@ import ( "go.uber.org/zap" ) -func (n *layer) putSystemObject(ctx context.Context, p *PutSystemObjectParams) (*object.Object, error) { +func (n *layer) putSystemObject(ctx context.Context, p *PutSystemObjectParams) (*data.ObjectInfo, error) { + objInfo, err := n.putSystemObjectIntoNeoFS(ctx, p) + if err != nil { + return nil, err + } + + if err = n.systemCache.PutObject(systemObjectKey(p.BktInfo, p.ObjName), objInfo); err != nil { + n.log.Error("couldn't cache system object", zap.Error(err)) + } + + return objInfo, nil +} + +func (n *layer) headSystemObject(ctx context.Context, bkt *data.BucketInfo, objName string) (*data.ObjectInfo, error) { + if objInfo := n.systemCache.GetObject(systemObjectKey(bkt, objName)); objInfo != nil { + return objInfo, nil + } + + versions, err := n.headSystemVersions(ctx, bkt, objName) + if err != nil { + return nil, err + } + + if err = n.systemCache.PutObject(systemObjectKey(bkt, objName), versions.getLast()); err != nil { + n.log.Error("couldn't cache system object", zap.Error(err)) + } + + return versions.getLast(), nil +} + +func (n *layer) deleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error { + ids, err := n.objectSearch(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: name}) + if err != nil { + return err + } + + for _, id := range ids { + if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil { + return err + } + } + + n.systemCache.Delete(systemObjectKey(bktInfo, name)) + return nil +} + +func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObjectParams) (*data.ObjectInfo, error) { versions, err := n.headSystemVersions(ctx, p.BktInfo, p.ObjName) if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) { return nil, err @@ -51,7 +97,7 @@ func (n *layer) putSystemObject(ctx context.Context, p *PutSystemObjectParams) ( raw.SetContainerID(p.BktInfo.CID) raw.SetAttributes(attributes...) - ops := new(client.PutObjectParams).WithObject(raw.Object()).WithPayloadReader(bytes.NewReader(p.Payload)) + ops := new(client.PutObjectParams).WithObject(raw.Object()).WithPayloadReader(p.Reader) oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx)) if err != nil { return nil, err @@ -62,14 +108,6 @@ func (n *layer) putSystemObject(ctx context.Context, p *PutSystemObjectParams) ( return nil, err } - if p.Payload != nil { - meta.ToV2().SetPayload(p.Payload) - } - - if err = n.systemCache.Put(systemObjectKey(p.BktInfo, p.ObjName), meta); err != nil { - n.log.Error("couldn't cache system object", zap.Error(err)) - } - for _, id := range idsToDeleteArr { if err = n.objectDelete(ctx, p.BktInfo.CID, id); err != nil { n.log.Warn("couldn't delete system object", @@ -79,63 +117,52 @@ func (n *layer) putSystemObject(ctx context.Context, p *PutSystemObjectParams) ( } } - return meta, nil + return objInfoFromMeta(p.BktInfo, meta), nil } -func (n *layer) headSystemObject(ctx context.Context, bkt *data.BucketInfo, objName string) (*data.ObjectInfo, error) { - if meta := n.systemCache.Get(systemObjectKey(bkt, objName)); meta != nil { - return objInfoFromMeta(bkt, meta), nil - } - +func (n *layer) getSystemObjectFromNeoFS(ctx context.Context, bkt *data.BucketInfo, objName string) (*object.Object, error) { versions, err := n.headSystemVersions(ctx, bkt, objName) if err != nil { return nil, err } - return versions.getLast(), nil -} - -func (n *layer) getSystemObject(ctx context.Context, bktInfo *data.BucketInfo, objName string) (*object.Object, error) { - if meta := n.systemCache.Get(systemObjectKey(bktInfo, objName)); meta != nil { - return meta, nil - } - - versions, err := n.headSystemVersions(ctx, bktInfo, objName) - if err != nil { - return nil, err - } - objInfo := versions.getLast() - obj, err := n.objectGet(ctx, bktInfo.CID, objInfo.ID) + obj, err := n.objectGet(ctx, bkt.CID, objInfo.ID) if err != nil { return nil, err } - if err = n.systemCache.Put(systemObjectKey(bktInfo, objName), obj); err != nil { - n.log.Warn("couldn't put system meta to objects cache", - zap.Stringer("object id", obj.ID()), - zap.Stringer("bucket id", obj.ContainerID()), - zap.Error(err)) + if len(obj.Payload()) == 0 { + return nil, errors.GetAPIError(errors.ErrInternalError) } - return obj, nil } -func (n *layer) deleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error { - ids, err := n.objectSearch(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: name}) +func (n *layer) getCORS(ctx context.Context, bkt *data.BucketInfo, sysName string) (*data.CORSConfiguration, error) { + if cors := n.systemCache.GetCORS(systemObjectKey(bkt, sysName)); cors != nil { + return cors, nil + } + + obj, err := n.getSystemObjectFromNeoFS(ctx, bkt, sysName) if err != nil { - return err + return nil, err } - for _, id := range ids { - if err = n.objectDelete(ctx, bktInfo.CID, id); err != nil { - return err - } + cors := &data.CORSConfiguration{} + + if err = xml.Unmarshal(obj.Payload(), &cors); err != nil { + return nil, err } - n.systemCache.Delete(systemObjectKey(bktInfo, name)) - return nil + if err = n.systemCache.PutCORS(systemObjectKey(bkt, sysName), cors); err != nil { + n.log.Warn("couldn't put system meta to objects cache", + zap.Stringer("object id", obj.ID()), + zap.Stringer("bucket id", bkt.CID), + zap.Error(err)) + } + + return cors, nil } func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sysName string) (*objectVersions, error) { @@ -144,9 +171,6 @@ func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sy return nil, err } - // should be changed when system cache will store payload instead of meta - metas := make(map[string]*object.Object, len(ids)) - versions := newObjectVersions(sysName) for _, id := range ids { meta, err := n.objectHead(ctx, bkt.CID, id) @@ -163,7 +187,6 @@ func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sy continue } versions.appendVersion(oi) - metas[oi.Version()] = meta } } @@ -172,13 +195,6 @@ func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sy return nil, errors.GetAPIError(errors.ErrNoSuchKey) } - if err = n.systemCache.Put(systemObjectKey(bkt, sysName), metas[lastVersion.Version()]); err != nil { - n.log.Warn("couldn't put system meta to objects cache", - zap.Stringer("object id", lastVersion.ID), - zap.Stringer("bucket id", bkt.CID), - zap.Error(err)) - } - return versions, nil } diff --git a/api/layer/versioning.go b/api/layer/versioning.go index 73fe96a0..51aaeb20 100644 --- a/api/layer/versioning.go +++ b/api/layer/versioning.go @@ -253,15 +253,10 @@ func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) ObjName: bktInfo.SettingsObjectName(), Metadata: metadata, Prefix: "", - Payload: nil, + Reader: nil, } - meta, err := n.putSystemObject(ctx, s) - if err != nil { - return nil, err - } - - return objInfoFromMeta(bktInfo, meta), nil + return n.putSystemObject(ctx, s) } func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*BucketSettings, error) {