forked from TrueCloudLab/frostfs-s3-gw
[#449] Add tree service for object tagging
Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
parent
9c74cca9af
commit
99feb1d936
13 changed files with 367 additions and 152 deletions
18
api/cache/system.go
vendored
18
api/cache/system.go
vendored
|
@ -104,6 +104,20 @@ func (o *SystemCache) GetNotificationConfiguration(key string) *data.Notificatio
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *SystemCache) GetObjectTagging(key string) map[string]string {
|
||||||
|
entry, err := o.cache.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
result, ok := entry.(map[string]string)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
// PutObject puts an object to cache.
|
// PutObject puts an object to cache.
|
||||||
func (o *SystemCache) PutObject(key string, obj *data.ObjectInfo) error {
|
func (o *SystemCache) PutObject(key string, obj *data.ObjectInfo) error {
|
||||||
return o.cache.Set(key, obj)
|
return o.cache.Set(key, obj)
|
||||||
|
@ -121,6 +135,10 @@ func (o *SystemCache) PutNotificationConfiguration(key string, obj *data.Notific
|
||||||
return o.cache.Set(key, obj)
|
return o.cache.Set(key, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *SystemCache) PutObjectTagging(key string, tagSet map[string]string) error {
|
||||||
|
return o.cache.Set(key, tagSet)
|
||||||
|
}
|
||||||
|
|
||||||
// Delete deletes an object from cache.
|
// Delete deletes an object from cache.
|
||||||
func (o *SystemCache) Delete(key string) bool {
|
func (o *SystemCache) Delete(key string) bool {
|
||||||
return o.cache.Remove(key)
|
return o.cache.Remove(key)
|
||||||
|
|
|
@ -102,9 +102,6 @@ func (o *ObjectInfo) Address() oid.Address {
|
||||||
return addr
|
return addr
|
||||||
}
|
}
|
||||||
|
|
||||||
// TagsObject returns the name of a system object for tags.
|
|
||||||
func (o *ObjectInfo) TagsObject() string { return ".tagset." + o.Name + "." + o.Version() }
|
|
||||||
|
|
||||||
// LegalHoldObject returns the name of a system object for a lock object.
|
// LegalHoldObject returns the name of a system object for a lock object.
|
||||||
func (o *ObjectInfo) LegalHoldObject() string { return ".lock." + o.Name + "." + o.Version() }
|
func (o *ObjectInfo) LegalHoldObject() string { return ".lock." + o.Name + "." + o.Version() }
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ package data
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/user"
|
"github.com/nspcc-dev/neofs-sdk-go/user"
|
||||||
)
|
)
|
||||||
|
@ -35,3 +36,9 @@ type BaseNodeVersion struct {
|
||||||
OID oid.ID
|
OID oid.ID
|
||||||
Timestamp uint64
|
Timestamp uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ObjectTaggingInfo struct {
|
||||||
|
CnrID *cid.ID
|
||||||
|
ObjName string
|
||||||
|
VersionID string
|
||||||
|
}
|
||||||
|
|
|
@ -138,7 +138,13 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
tagSet, err := h.obj.GetObjectTagging(r.Context(), info)
|
t := &data.ObjectTaggingInfo{
|
||||||
|
CnrID: &info.CID,
|
||||||
|
ObjName: info.Name,
|
||||||
|
VersionID: info.Version(),
|
||||||
|
}
|
||||||
|
|
||||||
|
tagSet, err := h.obj.GetObjectTagging(r.Context(), t)
|
||||||
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
||||||
h.logAndSendError(w, "could not get object tag set", reqInfo, err)
|
h.logAndSendError(w, "could not get object tag set", reqInfo, err)
|
||||||
return
|
return
|
||||||
|
|
|
@ -62,7 +62,13 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
tagSet, err := h.obj.GetObjectTagging(r.Context(), info)
|
t := &data.ObjectTaggingInfo{
|
||||||
|
CnrID: &info.CID,
|
||||||
|
ObjName: info.Name,
|
||||||
|
VersionID: info.Version(),
|
||||||
|
}
|
||||||
|
|
||||||
|
tagSet, err := h.obj.GetObjectTagging(r.Context(), t)
|
||||||
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
||||||
h.logAndSendError(w, "could not get object tag set", reqInfo, err)
|
h.logAndSendError(w, "could not get object tag set", reqInfo, err)
|
||||||
return
|
return
|
||||||
|
|
|
@ -412,11 +412,12 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(uploadData.TagSet) != 0 {
|
if len(uploadData.TagSet) != 0 {
|
||||||
t := &layer.PutTaggingParams{
|
t := &data.ObjectTaggingInfo{
|
||||||
ObjectInfo: objInfo,
|
CnrID: &bktInfo.CID,
|
||||||
TagSet: uploadData.TagSet,
|
ObjName: objInfo.Name,
|
||||||
|
VersionID: objInfo.Version(),
|
||||||
}
|
}
|
||||||
if err = h.obj.PutObjectTagging(r.Context(), t); err != nil {
|
if err = h.obj.PutObjectTagging(r.Context(), t, uploadData.TagSet); err != nil {
|
||||||
h.logAndSendError(w, "could not put tagging file of completed multipart upload", reqInfo, err, additional...)
|
h.logAndSendError(w, "could not put tagging file of completed multipart upload", reqInfo, err, additional...)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -252,8 +252,13 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t := &data.ObjectTaggingInfo{
|
||||||
|
CnrID: &info.CID,
|
||||||
|
ObjName: info.Name,
|
||||||
|
VersionID: info.Version(),
|
||||||
|
}
|
||||||
if tagSet != nil {
|
if tagSet != nil {
|
||||||
if err = h.obj.PutObjectTagging(r.Context(), &layer.PutTaggingParams{ObjectInfo: info, TagSet: tagSet}); err != nil {
|
if err = h.obj.PutObjectTagging(r.Context(), t, tagSet); err != nil {
|
||||||
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
|
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -374,8 +379,14 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t := &data.ObjectTaggingInfo{
|
||||||
|
CnrID: &info.CID,
|
||||||
|
ObjName: info.Name,
|
||||||
|
VersionID: info.Version(),
|
||||||
|
}
|
||||||
|
|
||||||
if tagSet != nil {
|
if tagSet != nil {
|
||||||
if err = h.obj.PutObjectTagging(r.Context(), &layer.PutTaggingParams{ObjectInfo: info, TagSet: tagSet}); err != nil {
|
if err = h.obj.PutObjectTagging(r.Context(), t, tagSet); err != nil {
|
||||||
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
|
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,8 +9,8 @@ import (
|
||||||
"unicode"
|
"unicode"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||||
|
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -37,31 +37,22 @@ func (h *handler) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
p := &layer.HeadObjectParams{
|
p := &data.ObjectTaggingInfo{
|
||||||
BktInfo: bktInfo,
|
CnrID: &bktInfo.CID,
|
||||||
Object: reqInfo.ObjectName,
|
ObjName: reqInfo.ObjectName,
|
||||||
VersionID: reqInfo.URL.Query().Get("versionId"),
|
VersionID: reqInfo.URL.Query().Get("versionId"),
|
||||||
}
|
}
|
||||||
|
|
||||||
objInfo, err := h.obj.GetObjectInfo(r.Context(), p)
|
if err = h.obj.PutObjectTagging(r.Context(), p, tagSet); err != nil {
|
||||||
if err != nil {
|
|
||||||
h.logAndSendError(w, "could not get object info", reqInfo, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
p2 := &layer.PutTaggingParams{
|
|
||||||
ObjectInfo: objInfo,
|
|
||||||
TagSet: tagSet,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = h.obj.PutObjectTagging(r.Context(), p2); err != nil {
|
|
||||||
h.logAndSendError(w, "could not put object tagging", reqInfo, err)
|
h.logAndSendError(w, "could not put object tagging", reqInfo, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
s := &SendNotificationParams{
|
s := &SendNotificationParams{
|
||||||
Event: EventObjectTaggingPut,
|
Event: EventObjectTaggingPut,
|
||||||
ObjInfo: objInfo,
|
ObjInfo: &data.ObjectInfo{
|
||||||
|
Name: reqInfo.ObjectName,
|
||||||
|
},
|
||||||
BktInfo: bktInfo,
|
BktInfo: bktInfo,
|
||||||
ReqInfo: reqInfo,
|
ReqInfo: reqInfo,
|
||||||
}
|
}
|
||||||
|
@ -74,6 +65,7 @@ func (h *handler) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request
|
||||||
|
|
||||||
func (h *handler) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
func (h *handler) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
reqInfo := api.GetReqInfo(r.Context())
|
reqInfo := api.GetReqInfo(r.Context())
|
||||||
|
versionID := reqInfo.URL.Query().Get("versionId")
|
||||||
|
|
||||||
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -81,25 +73,19 @@ func (h *handler) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
p := &layer.HeadObjectParams{
|
p := &data.ObjectTaggingInfo{
|
||||||
BktInfo: bktInfo,
|
CnrID: &bktInfo.CID,
|
||||||
Object: reqInfo.ObjectName,
|
ObjName: reqInfo.ObjectName,
|
||||||
VersionID: reqInfo.URL.Query().Get("versionId"),
|
VersionID: versionID,
|
||||||
}
|
}
|
||||||
|
|
||||||
objInfo, err := h.obj.GetObjectInfo(r.Context(), p)
|
tagSet, err := h.obj.GetObjectTagging(r.Context(), p)
|
||||||
if err != nil {
|
|
||||||
h.logAndSendError(w, "could not get object info", reqInfo, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
tagSet, err := h.obj.GetObjectTagging(r.Context(), objInfo)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logAndSendError(w, "could not get object tagging", reqInfo, err)
|
h.logAndSendError(w, "could not get object tagging", reqInfo, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set(api.AmzVersionID, objInfo.Version())
|
w.Header().Set(api.AmzVersionID, versionID)
|
||||||
if err = api.EncodeToResponse(w, encodeTagging(tagSet)); err != nil {
|
if err = api.EncodeToResponse(w, encodeTagging(tagSet)); err != nil {
|
||||||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||||
}
|
}
|
||||||
|
@ -114,26 +100,22 @@ func (h *handler) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Requ
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
p := &layer.HeadObjectParams{
|
p := &data.ObjectTaggingInfo{
|
||||||
BktInfo: bktInfo,
|
CnrID: &bktInfo.CID,
|
||||||
Object: reqInfo.ObjectName,
|
ObjName: reqInfo.ObjectName,
|
||||||
VersionID: reqInfo.URL.Query().Get("versionId"),
|
VersionID: reqInfo.URL.Query().Get("versionId"),
|
||||||
}
|
}
|
||||||
|
|
||||||
objInfo, err := h.obj.GetObjectInfo(r.Context(), p)
|
if err = h.obj.DeleteObjectTagging(r.Context(), p); err != nil {
|
||||||
if err != nil {
|
|
||||||
h.logAndSendError(w, "could not get object info", reqInfo, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = h.obj.DeleteObjectTagging(r.Context(), bktInfo, objInfo); err != nil {
|
|
||||||
h.logAndSendError(w, "could not delete object tagging", reqInfo, err)
|
h.logAndSendError(w, "could not delete object tagging", reqInfo, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
s := &SendNotificationParams{
|
s := &SendNotificationParams{
|
||||||
Event: EventObjectTaggingDelete,
|
Event: EventObjectTaggingDelete,
|
||||||
ObjInfo: objInfo,
|
ObjInfo: &data.ObjectInfo{
|
||||||
|
Name: reqInfo.ObjectName,
|
||||||
|
},
|
||||||
BktInfo: bktInfo,
|
BktInfo: bktInfo,
|
||||||
ReqInfo: reqInfo,
|
ReqInfo: reqInfo,
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
|
@ -189,12 +188,6 @@ type (
|
||||||
Error error
|
Error error
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutTaggingParams stores tag set params.
|
|
||||||
PutTaggingParams struct {
|
|
||||||
ObjectInfo *data.ObjectInfo
|
|
||||||
TagSet map[string]string
|
|
||||||
}
|
|
||||||
|
|
||||||
// Client provides S3 API client interface.
|
// Client provides S3 API client interface.
|
||||||
Client interface {
|
Client interface {
|
||||||
Initialize(ctx context.Context, c EventListener) error
|
Initialize(ctx context.Context, c EventListener) error
|
||||||
|
@ -217,13 +210,17 @@ type (
|
||||||
GetObject(ctx context.Context, p *GetObjectParams) error
|
GetObject(ctx context.Context, p *GetObjectParams) error
|
||||||
HeadSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) (*data.ObjectInfo, error)
|
HeadSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) (*data.ObjectInfo, error)
|
||||||
GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error)
|
GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error)
|
||||||
GetObjectTagging(ctx context.Context, p *data.ObjectInfo) (map[string]string, error)
|
|
||||||
GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error)
|
GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error)
|
||||||
|
PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error
|
||||||
|
DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error
|
||||||
|
|
||||||
|
GetObjectTagging(ctx context.Context, p *data.ObjectTaggingInfo) (map[string]string, error)
|
||||||
|
PutObjectTagging(ctx context.Context, p *data.ObjectTaggingInfo, tagSet map[string]string) error
|
||||||
|
DeleteObjectTagging(ctx context.Context, p *data.ObjectTaggingInfo) error
|
||||||
|
|
||||||
PutObject(ctx context.Context, p *PutObjectParams) (*data.ObjectInfo, error)
|
PutObject(ctx context.Context, p *PutObjectParams) (*data.ObjectInfo, error)
|
||||||
PutSystemObject(ctx context.Context, p *PutSystemObjectParams) (*data.ObjectInfo, error)
|
PutSystemObject(ctx context.Context, p *PutSystemObjectParams) (*data.ObjectInfo, error)
|
||||||
PutObjectTagging(ctx context.Context, p *PutTaggingParams) error
|
|
||||||
PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error
|
|
||||||
|
|
||||||
CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ObjectInfo, error)
|
CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ObjectInfo, error)
|
||||||
|
|
||||||
|
@ -233,8 +230,6 @@ type (
|
||||||
|
|
||||||
DeleteObjects(ctx context.Context, p *DeleteObjectParams) ([]*VersionedObject, error)
|
DeleteObjects(ctx context.Context, p *DeleteObjectParams) ([]*VersionedObject, error)
|
||||||
DeleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error
|
DeleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error
|
||||||
DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objInfo *data.ObjectInfo) error
|
|
||||||
DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error
|
|
||||||
|
|
||||||
CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*data.ObjectInfo, error)
|
CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*data.ObjectInfo, error)
|
||||||
UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error)
|
UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error)
|
||||||
|
@ -430,92 +425,6 @@ func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.O
|
||||||
return n.headVersion(ctx, p.BktInfo, p)
|
return n.headVersion(ctx, p.BktInfo, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObjectTagging from storage.
|
|
||||||
func (n *layer) GetObjectTagging(ctx context.Context, oi *data.ObjectInfo) (map[string]string, error) {
|
|
||||||
bktInfo := &data.BucketInfo{
|
|
||||||
Name: oi.Bucket,
|
|
||||||
CID: oi.CID,
|
|
||||||
Owner: oi.Owner,
|
|
||||||
}
|
|
||||||
|
|
||||||
objInfo, err := n.HeadSystemObject(ctx, bktInfo, oi.TagsObject())
|
|
||||||
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return formTagSet(objInfo), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetBucketTagging from storage.
|
|
||||||
func (n *layer) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
|
|
||||||
objInfo, err := n.HeadSystemObject(ctx, bktInfo, formBucketTagObjectName(bktInfo.Name))
|
|
||||||
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return formTagSet(objInfo), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func formTagSet(objInfo *data.ObjectInfo) map[string]string {
|
|
||||||
var tagSet map[string]string
|
|
||||||
if objInfo != nil {
|
|
||||||
tagSet = make(map[string]string, len(objInfo.Headers))
|
|
||||||
for k, v := range objInfo.Headers {
|
|
||||||
if strings.HasPrefix(k, tagPrefix) {
|
|
||||||
if v == tagEmptyMark {
|
|
||||||
v = ""
|
|
||||||
}
|
|
||||||
tagSet[strings.TrimPrefix(k, tagPrefix)] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return tagSet
|
|
||||||
}
|
|
||||||
|
|
||||||
// PutObjectTagging into storage.
|
|
||||||
func (n *layer) PutObjectTagging(ctx context.Context, p *PutTaggingParams) error {
|
|
||||||
bktInfo := &data.BucketInfo{
|
|
||||||
Name: p.ObjectInfo.Bucket,
|
|
||||||
CID: p.ObjectInfo.CID,
|
|
||||||
Owner: p.ObjectInfo.Owner,
|
|
||||||
}
|
|
||||||
|
|
||||||
s := &PutSystemObjectParams{
|
|
||||||
BktInfo: bktInfo,
|
|
||||||
ObjName: p.ObjectInfo.TagsObject(),
|
|
||||||
Metadata: p.TagSet,
|
|
||||||
Prefix: tagPrefix,
|
|
||||||
Reader: nil,
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := n.PutSystemObject(ctx, s)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// PutBucketTagging into storage.
|
|
||||||
func (n *layer) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
|
|
||||||
s := &PutSystemObjectParams{
|
|
||||||
BktInfo: bktInfo,
|
|
||||||
ObjName: formBucketTagObjectName(bktInfo.Name),
|
|
||||||
Metadata: tagSet,
|
|
||||||
Prefix: tagPrefix,
|
|
||||||
Reader: nil,
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := n.PutSystemObject(ctx, s)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteObjectTagging from storage.
|
|
||||||
func (n *layer) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objInfo *data.ObjectInfo) error {
|
|
||||||
return n.DeleteSystemObject(ctx, bktInfo, objInfo.TagsObject())
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteBucketTagging from storage.
|
|
||||||
func (n *layer) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
|
|
||||||
return n.DeleteSystemObject(ctx, bktInfo, formBucketTagObjectName(bktInfo.Name))
|
|
||||||
}
|
|
||||||
|
|
||||||
// CopyObject from one bucket into another bucket.
|
// CopyObject from one bucket into another bucket.
|
||||||
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ObjectInfo, error) {
|
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ObjectInfo, error) {
|
||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
|
@ -618,7 +527,12 @@ func (n *layer) removeVersionIfFound(ctx context.Context, bkt *data.BucketInfo,
|
||||||
return deleteMarkVersion, err
|
return deleteMarkVersion, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return deleteMarkVersion, n.DeleteObjectTagging(ctx, bkt, &data.ObjectInfo{ID: version.OID, Bucket: bkt.Name, Name: obj.Name})
|
p := &data.ObjectTaggingInfo{
|
||||||
|
CnrID: &bkt.CID,
|
||||||
|
ObjName: obj.Name,
|
||||||
|
VersionID: version.OID.EncodeToString(),
|
||||||
|
}
|
||||||
|
return deleteMarkVersion, n.DeleteObjectTagging(ctx, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
return "", errors.GetAPIError(errors.ErrNoSuchVersion)
|
return "", errors.GetAPIError(errors.ErrNoSuchVersion)
|
||||||
|
|
169
api/layer/tagging.go
Normal file
169
api/layer/tagging.go
Normal file
|
@ -0,0 +1,169 @@
|
||||||
|
package layer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
errorsStd "errors"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||||
|
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (n *layer) GetObjectTagging(ctx context.Context, p *data.ObjectTaggingInfo) (map[string]string, error) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
tags map[string]string
|
||||||
|
)
|
||||||
|
tags = n.systemCache.GetObjectTagging(objectTaggingCacheKey(p))
|
||||||
|
if tags != nil {
|
||||||
|
return tags, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
version, err := n.getTaggedObjectVersion(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
tags, err = n.treeService.GetObjectTagging(ctx, p.CnrID, version)
|
||||||
|
if err != nil {
|
||||||
|
if errorsStd.Is(err, ErrNodeNotFound) {
|
||||||
|
return nil, errors.GetAPIError(errors.ErrNoSuchKey)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = n.systemCache.PutObjectTagging(objectTaggingCacheKey(p), tags); err != nil {
|
||||||
|
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return tags, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *layer) PutObjectTagging(ctx context.Context, p *data.ObjectTaggingInfo, tagSet map[string]string) error {
|
||||||
|
version, err := n.getTaggedObjectVersion(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = n.treeService.PutObjectTagging(ctx, p.CnrID, version, tagSet)
|
||||||
|
if err != nil {
|
||||||
|
if errorsStd.Is(err, ErrNodeNotFound) {
|
||||||
|
return errors.GetAPIError(errors.ErrNoSuchKey)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = n.systemCache.PutObjectTagging(objectTaggingCacheKey(p), tagSet); err != nil {
|
||||||
|
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectTaggingInfo) error {
|
||||||
|
version, err := n.getTaggedObjectVersion(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = n.treeService.DeleteObjectTagging(ctx, p.CnrID, version)
|
||||||
|
if err != nil {
|
||||||
|
if errorsStd.Is(err, ErrNodeNotFound) {
|
||||||
|
return errors.GetAPIError(errors.ErrNoSuchKey)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
n.systemCache.Delete(objectTaggingCacheKey(p))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *layer) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
|
||||||
|
objInfo, err := n.HeadSystemObject(ctx, bktInfo, formBucketTagObjectName(bktInfo.Name))
|
||||||
|
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return formTagSet(objInfo), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *layer) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
|
||||||
|
s := &PutSystemObjectParams{
|
||||||
|
BktInfo: bktInfo,
|
||||||
|
ObjName: formBucketTagObjectName(bktInfo.Name),
|
||||||
|
Metadata: tagSet,
|
||||||
|
Prefix: tagPrefix,
|
||||||
|
Reader: nil,
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := n.PutSystemObject(ctx, s)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *layer) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
|
||||||
|
return n.DeleteSystemObject(ctx, bktInfo, formBucketTagObjectName(bktInfo.Name))
|
||||||
|
}
|
||||||
|
|
||||||
|
func formTagSet(objInfo *data.ObjectInfo) map[string]string {
|
||||||
|
var tagSet map[string]string
|
||||||
|
if objInfo != nil {
|
||||||
|
tagSet = make(map[string]string, len(objInfo.Headers))
|
||||||
|
for k, v := range objInfo.Headers {
|
||||||
|
if strings.HasPrefix(k, tagPrefix) {
|
||||||
|
if v == tagEmptyMark {
|
||||||
|
v = ""
|
||||||
|
}
|
||||||
|
tagSet[strings.TrimPrefix(k, tagPrefix)] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return tagSet
|
||||||
|
}
|
||||||
|
|
||||||
|
func objectTaggingCacheKey(p *data.ObjectTaggingInfo) string {
|
||||||
|
return ".tagset." + p.CnrID.EncodeToString() + "." + p.ObjName + "." + p.VersionID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *layer) getTaggedObjectVersion(ctx context.Context, p *data.ObjectTaggingInfo) (*data.NodeVersion, error) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
version *data.NodeVersion
|
||||||
|
)
|
||||||
|
|
||||||
|
if p.VersionID == "null" {
|
||||||
|
if version, err = n.treeService.GetUnversioned(ctx, p.CnrID, p.ObjName); err != nil {
|
||||||
|
if strings.Contains(err.Error(), "not found") {
|
||||||
|
return nil, errors.GetAPIError(errors.ErrNoSuchKey)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else if len(p.VersionID) == 0 {
|
||||||
|
if version, err = n.treeService.GetLatestVersion(ctx, p.CnrID, p.ObjName); err != nil {
|
||||||
|
if strings.Contains(err.Error(), "not found") {
|
||||||
|
return nil, errors.GetAPIError(errors.ErrNoSuchKey)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
versions, err := n.treeService.GetVersions(ctx, p.CnrID, p.ObjName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, v := range versions {
|
||||||
|
if v.OID.EncodeToString() == p.VersionID {
|
||||||
|
version = v
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if version == nil || version.DeleteMarker != nil {
|
||||||
|
return nil, errors.GetAPIError(errors.ErrNoSuchKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
return version, nil
|
||||||
|
}
|
|
@ -30,6 +30,10 @@ type TreeService interface {
|
||||||
// DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in NeoFS
|
// DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in NeoFS
|
||||||
DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error)
|
DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error)
|
||||||
|
|
||||||
|
GetObjectTagging(ctx context.Context, cnrID *cid.ID, objVersion *data.NodeVersion) (map[string]string, error)
|
||||||
|
PutObjectTagging(ctx context.Context, cnrID *cid.ID, objVersion *data.NodeVersion, tagSet map[string]string) error
|
||||||
|
DeleteObjectTagging(ctx context.Context, cnrID *cid.ID, objVersion *data.NodeVersion) error
|
||||||
|
|
||||||
GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]*data.NodeVersion, error)
|
GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]*data.NodeVersion, error)
|
||||||
GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*data.NodeVersion, error)
|
GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*data.NodeVersion, error)
|
||||||
GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]oid.ID, error)
|
GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]oid.ID, error)
|
||||||
|
|
|
@ -44,6 +44,7 @@ const (
|
||||||
fileNameKV = "FileName"
|
fileNameKV = "FileName"
|
||||||
systemNameKV = "SystemName"
|
systemNameKV = "SystemName"
|
||||||
isUnversionedKV = "IsUnversioned"
|
isUnversionedKV = "IsUnversioned"
|
||||||
|
isTagKV = "isTag"
|
||||||
|
|
||||||
// keys for delete marker nodes
|
// keys for delete marker nodes
|
||||||
isDeleteMarkerKV = "IdDeleteMarker"
|
isDeleteMarkerKV = "IdDeleteMarker"
|
||||||
|
@ -274,6 +275,75 @@ func (c *TreeClient) DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *TreeClient) GetObjectTagging(ctx context.Context, cnrID *cid.ID, objVersion *data.NodeVersion) (map[string]string, error) {
|
||||||
|
tagNode, err := c.getObjectTaggingNode(ctx, cnrID, objVersion)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if tagNode == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(tagNode.Meta, isTagKV)
|
||||||
|
|
||||||
|
return tagNode.Meta, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TreeClient) PutObjectTagging(ctx context.Context, cnrID *cid.ID, objVersion *data.NodeVersion, tagSet map[string]string) error {
|
||||||
|
tagNode, err := c.getObjectTaggingNode(ctx, cnrID, objVersion)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tagSet[isTagKV] = "true"
|
||||||
|
|
||||||
|
if tagNode == nil {
|
||||||
|
_, err = c.addNode(ctx, cnrID, versionTree, objVersion.ID, tagSet)
|
||||||
|
} else {
|
||||||
|
err = c.moveNode(ctx, cnrID, versionTree, tagNode.ID, objVersion.ID, tagSet)
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(tagSet, isTagKV)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TreeClient) DeleteObjectTagging(ctx context.Context, cnrID *cid.ID, objVersion *data.NodeVersion) error {
|
||||||
|
tagNode, err := c.getObjectTaggingNode(ctx, cnrID, objVersion)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if tagNode == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.removeNode(ctx, cnrID, versionTree, tagNode.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TreeClient) getObjectTaggingNode(ctx context.Context, cnrID *cid.ID, objVersion *data.NodeVersion) (*TreeNode, error) {
|
||||||
|
subtree, err := c.getSubTree(ctx, cnrID, versionTree, objVersion.ID, 1)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var tagNode *TreeNode
|
||||||
|
|
||||||
|
for _, s := range subtree {
|
||||||
|
node, err := newTreeNode(s)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if _, ok := node.Get(isTagKV); ok {
|
||||||
|
tagNode = node
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return tagNode, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *TreeClient) GetVersions(ctx context.Context, cnrID *cid.ID, filepath string) ([]*data.NodeVersion, error) {
|
func (c *TreeClient) GetVersions(ctx context.Context, cnrID *cid.ID, filepath string) ([]*data.NodeVersion, error) {
|
||||||
return c.getVersions(ctx, cnrID, versionTree, filepath, false)
|
return c.getVersions(ctx, cnrID, versionTree, filepath, false)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,21 @@ type TreeServiceMock struct {
|
||||||
system map[string]map[string]*data.BaseNodeVersion
|
system map[string]map[string]*data.BaseNodeVersion
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TreeServiceMock) GetObjectTagging(ctx context.Context, cnrID *cid.ID, objVersion *data.NodeVersion) (map[string]string, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TreeServiceMock) PutObjectTagging(ctx context.Context, cnrID *cid.ID, objVersion *data.NodeVersion, tagSet map[string]string) error {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TreeServiceMock) DeleteObjectTagging(ctx context.Context, cnrID *cid.ID, objVersion *data.NodeVersion) error {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
var ErrNodeNotFound = errors.New("not found")
|
var ErrNodeNotFound = errors.New("not found")
|
||||||
|
|
||||||
func NewTreeService() *TreeServiceMock {
|
func NewTreeService() *TreeServiceMock {
|
||||||
|
@ -205,3 +220,18 @@ func (t *TreeServiceMock) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID
|
||||||
func (t *TreeServiceMock) GetAllVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.NodeVersion, error) {
|
func (t *TreeServiceMock) GetAllVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.NodeVersion, error) {
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TreeServiceMock) GetObjectTagging(ctx context.Context, p *data.ObjectTaggingInfo) (map[string]string, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TreeServiceMock) PutObjectTagging(ctx context.Context, p *data.ObjectTaggingInfo) error {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TreeServiceMock) DeleteObjectTagging(ctx context.Context, p *data.ObjectTaggingInfo) error {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue