[#546] Add size and etag in nodeVersionInfo

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-07-18 17:51:34 +03:00 committed by Alex Vanin
parent 56eb2dc3dc
commit 0057f6b7db
14 changed files with 130 additions and 80 deletions

View file

@ -47,6 +47,14 @@ type (
Headers map[string]string
}
// NotificationInfo store info to send s3 notification.
NotificationInfo struct {
Name string
Version string
Size int64
HashSum string
}
// BucketSettings stores settings such as versioning.
BucketSettings struct {
Versioning string `json:"versioning"`
@ -70,6 +78,16 @@ type (
}
)
// NotificationInfoFromObject creates new NotificationInfo from ObjectInfo.
func NotificationInfoFromObject(objInfo *ObjectInfo) *NotificationInfo {
return &NotificationInfo{
Name: objInfo.Name,
Version: objInfo.Version(),
Size: objInfo.Size,
HashSum: objInfo.HashSum,
}
}
// SettingsObjectName is a system name for a bucket settings file.
func (b *BucketInfo) SettingsObjectName() string { return bktSettingsObject }

View file

@ -36,6 +36,8 @@ type BaseNodeVersion struct {
ID uint64
OID oid.ID
Timestamp uint64
Size int64
ETag string
FilePath string
}

View file

@ -377,7 +377,7 @@ func (h *handler) PutObjectACLHandler(w http.ResponseWriter, r *http.Request) {
if updated {
s := &SendNotificationParams{
Event: EventObjectACLPut,
ObjInfo: extendedInfo.ObjectInfo,
NotificationInfo: data.NotificationInfoFromObject(extendedInfo.ObjectInfo),
BktInfo: bktInfo,
ReqInfo: reqInfo,
}

View file

@ -7,6 +7,7 @@ import (
"time"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
"github.com/nspcc-dev/neofs-sdk-go/session"
@ -166,7 +167,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
s := &SendNotificationParams{
Event: EventObjectCreatedCopy,
ObjInfo: info,
NotificationInfo: data.NotificationInfoFromObject(info),
BktInfo: dstBktInfo,
ReqInfo: reqInfo,
}

View file

@ -101,7 +101,7 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
if bktSettings.VersioningEnabled() && len(versionID) == 0 {
m = &SendNotificationParams{
Event: EventObjectRemovedDeleteMarkerCreated,
ObjInfo: &data.ObjectInfo{
NotificationInfo: &data.NotificationInfo{
Name: reqInfo.ObjectName,
HashSum: deletedObject.DeleteMarkerEtag,
},
@ -118,9 +118,9 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
m = &SendNotificationParams{
Event: EventObjectRemovedDelete,
ObjInfo: &data.ObjectInfo{
NotificationInfo: &data.NotificationInfo{
Name: reqInfo.ObjectName,
ID: objID,
Version: objID.EncodeToString(),
},
BktInfo: bktInfo,
ReqInfo: reqInfo,

View file

@ -9,6 +9,7 @@ import (
"github.com/google/uuid"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
"github.com/nspcc-dev/neofs-sdk-go/session"
@ -364,7 +365,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
ObjectName: objInfo.Name,
VersionID: objInfo.Version(),
}
if err = h.obj.PutObjectTagging(r.Context(), t, uploadData.TagSet); err != nil {
if _, err = h.obj.PutObjectTagging(r.Context(), t, uploadData.TagSet); err != nil {
h.logAndSendError(w, "could not put tagging file of completed multipart upload", reqInfo, err, additional...)
return
}
@ -399,7 +400,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
s := &SendNotificationParams{
Event: EventObjectCreatedCompleteMultipartUpload,
ObjInfo: objInfo,
NotificationInfo: data.NotificationInfoFromObject(objInfo),
BktInfo: bktInfo,
ReqInfo: reqInfo,
}

View file

@ -18,7 +18,7 @@ import (
type (
SendNotificationParams struct {
Event string
ObjInfo *data.ObjectInfo
NotificationInfo *data.NotificationInfo
BktInfo *data.BucketInfo
ReqInfo *api.ReqInfo
User string
@ -163,7 +163,7 @@ func (h *handler) sendNotifications(ctx context.Context, p *SendNotificationPara
p.User = bearer.ResolveIssuer(*box.Gate.BearerToken).EncodeToString()
}
topics := filterSubjects(conf, p.Event, p.ObjInfo.Name)
topics := filterSubjects(conf, p.Event, p.NotificationInfo.Name)
return h.notificator.SendNotifications(topics, p)
}

View file

@ -237,7 +237,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
s := &SendNotificationParams{
Event: EventObjectCreatedPut,
ObjInfo: info,
NotificationInfo: data.NotificationInfoFromObject(info),
BktInfo: bktInfo,
ReqInfo: reqInfo,
}
@ -258,7 +258,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
VersionID: info.Version(),
}
if tagSet != nil {
if err = h.obj.PutObjectTagging(r.Context(), t, tagSet); err != nil {
if _, err = h.obj.PutObjectTagging(r.Context(), t, tagSet); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return
}
@ -359,7 +359,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
s := &SendNotificationParams{
Event: EventObjectCreatedPost,
ObjInfo: info,
NotificationInfo: data.NotificationInfoFromObject(info),
BktInfo: bktInfo,
ReqInfo: reqInfo,
}
@ -386,7 +386,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
}
if tagSet != nil {
if err = h.obj.PutObjectTagging(r.Context(), t, tagSet); err != nil {
if _, err = h.obj.PutObjectTagging(r.Context(), t, tagSet); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return
}

View file

@ -41,18 +41,22 @@ func (h *handler) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request
p := &layer.ObjectVersion{
BktInfo: bktInfo,
ObjectName: reqInfo.ObjectName,
VersionID: reqInfo.URL.Query().Get("versionId"),
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
}
if err = h.obj.PutObjectTagging(r.Context(), p, tagSet); err != nil {
nodeVersion, err := h.obj.PutObjectTagging(r.Context(), p, tagSet)
if err != nil {
h.logAndSendError(w, "could not put object tagging", reqInfo, err)
return
}
s := &SendNotificationParams{
Event: EventObjectTaggingPut,
ObjInfo: &data.ObjectInfo{
Name: reqInfo.ObjectName,
NotificationInfo: &data.NotificationInfo{
Name: nodeVersion.FilePath,
Size: nodeVersion.Size,
Version: nodeVersion.OID.EncodeToString(),
HashSum: nodeVersion.ETag,
},
BktInfo: bktInfo,
ReqInfo: reqInfo,
@ -111,18 +115,22 @@ func (h *handler) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Requ
p := &layer.ObjectVersion{
BktInfo: bktInfo,
ObjectName: reqInfo.ObjectName,
VersionID: reqInfo.URL.Query().Get("versionId"),
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
}
if err = h.obj.DeleteObjectTagging(r.Context(), p); err != nil {
nodeVersion, err := h.obj.DeleteObjectTagging(r.Context(), p)
if err != nil {
h.logAndSendError(w, "could not delete object tagging", reqInfo, err)
return
}
s := &SendNotificationParams{
Event: EventObjectTaggingDelete,
ObjInfo: &data.ObjectInfo{
Name: reqInfo.ObjectName,
NotificationInfo: &data.NotificationInfo{
Name: nodeVersion.FilePath,
Size: nodeVersion.Size,
Version: nodeVersion.OID.EncodeToString(),
HashSum: nodeVersion.ETag,
},
BktInfo: bktInfo,
ReqInfo: reqInfo,

View file

@ -226,8 +226,8 @@ type (
DeleteBucketTagging(ctx context.Context, cnrID cid.ID) error
GetObjectTagging(ctx context.Context, p *ObjectVersion) (string, map[string]string, error)
PutObjectTagging(ctx context.Context, p *ObjectVersion, tagSet map[string]string) error
DeleteObjectTagging(ctx context.Context, p *ObjectVersion) error
PutObjectTagging(ctx context.Context, p *ObjectVersion, tagSet map[string]string) (*data.NodeVersion, error)
DeleteObjectTagging(ctx context.Context, p *ObjectVersion) (*data.NodeVersion, error)
PutObject(ctx context.Context, p *PutObjectParams) (*data.ObjectInfo, error)

View file

@ -155,7 +155,10 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
}
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{FilePath: p.Object},
BaseNodeVersion: data.BaseNodeVersion{
FilePath: p.Object,
Size: p.Size,
},
IsUnversioned: !bktSettings.VersioningEnabled(),
}
@ -194,6 +197,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
}
newVersion.OID = id
newVersion.ETag = hex.EncodeToString(hash)
if err = n.treeService.AddVersion(ctx, p.BktInfo.CID, newVersion); err != nil {
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
}
@ -223,7 +227,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
Created: time.Now(),
Headers: p.Header,
ContentType: p.Header[api.ContentType],
HashSum: hex.EncodeToString(hash),
HashSum: newVersion.ETag,
}
if err = n.objCache.PutObject(objInfo); err != nil {

View file

@ -42,45 +42,45 @@ func (n *layer) GetObjectTagging(ctx context.Context, p *ObjectVersion) (string,
return p.VersionID, tags, nil
}
func (n *layer) PutObjectTagging(ctx context.Context, p *ObjectVersion, tagSet map[string]string) error {
func (n *layer) PutObjectTagging(ctx context.Context, p *ObjectVersion, tagSet map[string]string) (*data.NodeVersion, error) {
version, err := n.getNodeVersion(ctx, p)
if err != nil {
return err
return nil, err
}
p.VersionID = version.OID.EncodeToString()
err = n.treeService.PutObjectTagging(ctx, p.BktInfo.CID, version, tagSet)
if err != nil {
if errorsStd.Is(err, ErrNodeNotFound) {
return errors.GetAPIError(errors.ErrNoSuchKey)
return nil, errors.GetAPIError(errors.ErrNoSuchKey)
}
return err
return nil, err
}
if err = n.systemCache.PutTagging(objectTaggingCacheKey(p), tagSet); err != nil {
n.log.Error("couldn't cache system object", zap.Error(err))
}
return nil
return version, nil
}
func (n *layer) DeleteObjectTagging(ctx context.Context, p *ObjectVersion) error {
func (n *layer) DeleteObjectTagging(ctx context.Context, p *ObjectVersion) (*data.NodeVersion, error) {
version, err := n.getNodeVersion(ctx, p)
if err != nil {
return err
return nil, err
}
err = n.treeService.DeleteObjectTagging(ctx, p.BktInfo.CID, version)
if err != nil {
if errorsStd.Is(err, ErrNodeNotFound) {
return errors.GetAPIError(errors.ErrNoSuchKey)
return nil, errors.GetAPIError(errors.ErrNoSuchKey)
}
return err
return nil, err
}
n.systemCache.Delete(objectTaggingCacheKey(p))
return nil
return version, nil
}
func (n *layer) GetBucketTagging(ctx context.Context, cnrID cid.ID) (map[string]string, error) {

View file

@ -240,10 +240,10 @@ func prepareEvent(p *handler.SendNotificationParams) *Event {
Arn: p.BktInfo.Name,
},
Object: Object{
Key: p.ObjInfo.Name,
Size: p.ObjInfo.Size,
VersionID: p.ObjInfo.Version(),
ETag: p.ObjInfo.HashSum,
Key: p.NotificationInfo.Name,
Size: p.NotificationInfo.Size,
VersionID: p.NotificationInfo.Version,
ETag: p.NotificationInfo.HashSum,
Sequencer: "",
},
},

View file

@ -33,6 +33,7 @@ type (
ID uint64
ObjID oid.ID
TimeStamp uint64
Size int64
Meta map[string]string
}
@ -112,26 +113,31 @@ type NodeResponse interface {
}
func newTreeNode(nodeInfo NodeResponse) (*TreeNode, error) {
var objID oid.ID
meta := make(map[string]string, len(nodeInfo.GetMeta()))
treeNode := &TreeNode{
ID: nodeInfo.GetNodeId(),
TimeStamp: nodeInfo.GetTimestamp(),
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
}
for _, kv := range nodeInfo.GetMeta() {
if kv.GetKey() == oidKV {
if err := objID.DecodeString(string(kv.GetValue())); err != nil {
switch kv.GetKey() {
case oidKV:
if err := treeNode.ObjID.DecodeString(string(kv.GetValue())); err != nil {
return nil, err
}
continue
case sizeKV:
if sizeStr := string(kv.GetValue()); len(sizeStr) > 0 {
var err error
if treeNode.Size, err = strconv.ParseInt(sizeStr, 10, 64); err != nil {
return nil, fmt.Errorf("invalid size value '%s': %w", sizeStr, err)
}
}
default:
treeNode.Meta[kv.GetKey()] = string(kv.GetValue())
}
}
meta[kv.GetKey()] = string(kv.GetValue())
}
return &TreeNode{
ID: nodeInfo.GetNodeId(),
ObjID: objID,
TimeStamp: nodeInfo.GetTimestamp(),
Meta: meta,
}, nil
return treeNode, nil
}
func (n *TreeNode) Get(key string) (string, bool) {
@ -160,12 +166,15 @@ func newNodeVersion(filePath string, node NodeResponse) (*data.NodeVersion, erro
func newNodeVersionFromTreeNode(filePath string, treeNode *TreeNode) *data.NodeVersion {
_, isUnversioned := treeNode.Get(isUnversionedKV)
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
eTag, _ := treeNode.Get(etagKV)
version := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
ID: treeNode.ID,
OID: treeNode.ObjID,
Timestamp: treeNode.TimeStamp,
ETag: eTag,
Size: treeNode.Size,
FilePath: filePath,
},
IsUnversioned: isUnversioned,
@ -528,7 +537,7 @@ func (c *TreeClient) GetVersions(ctx context.Context, cnrID cid.ID, filepath str
}
func (c *TreeClient) GetLatestVersion(ctx context.Context, cnrID cid.ID, objectName string) (*data.NodeVersion, error) {
meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV}
meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV}
path := pathFromName(objectName)
p := &getNodesParams{
@ -1034,6 +1043,13 @@ func (c *TreeClient) addVersion(ctx context.Context, cnrID cid.ID, treeID string
fileNameKV: path[len(path)-1],
}
if version.Size > 0 {
meta[sizeKV] = strconv.FormatInt(version.Size, 10)
}
if len(version.ETag) > 0 {
meta[etagKV] = version.ETag
}
if version.DeleteMarker != nil {
meta[isDeleteMarkerKV] = "true"
meta[ownerKV] = version.DeleteMarker.Owner.EncodeToString()
@ -1062,7 +1078,7 @@ func (c *TreeClient) addVersion(ctx context.Context, cnrID cid.ID, treeID string
}
func (c *TreeClient) getVersions(ctx context.Context, cnrID cid.ID, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
keysToReturn := []string{oidKV, isUnversionedKV, isDeleteMarkerKV}
keysToReturn := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV}
path := pathFromName(filepath)
p := &getNodesParams{
CnrID: cnrID,