[#451] Handle lock objects using tree service
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
bc000f1bc4
commit
dd534e8738
23 changed files with 488 additions and 520 deletions
|
@ -2,7 +2,6 @@ package layer
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/xml"
|
||||
errorsStd "errors"
|
||||
"fmt"
|
||||
|
@ -11,56 +10,123 @@ import (
|
|||
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/internal/misc"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
AttributeComplianceMode = ".s3-compliance-mode"
|
||||
AttributeRetainUntil = ".s3-retain-until"
|
||||
AttributeExpirationEpoch = "__NEOFS__EXPIRATION_EPOCH"
|
||||
AttributeSysTickEpoch = "__NEOFS__TICK_EPOCH"
|
||||
AttributeSysTickTopic = "__NEOFS__TICK_TOPIC"
|
||||
)
|
||||
|
||||
func (n *layer) PutSystemObject(ctx context.Context, p *PutSystemObjectParams) (*data.ObjectInfo, error) {
|
||||
objInfo, err := n.putSystemObjectIntoNeoFS(ctx, p)
|
||||
func (n *layer) PutLockInfo(ctx context.Context, objVersion *ObjectVersion, newLock *data.ObjectLock) error {
|
||||
cnrID := objVersion.BktInfo.CID
|
||||
versionNode, err := n.getNodeVersion(ctx, objVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
if err = n.systemCache.PutObject(systemObjectKey(p.BktInfo, p.ObjName), objInfo); err != nil {
|
||||
lockInfo, err := n.treeService.GetLock(ctx, &cnrID, versionNode.ID)
|
||||
if err != nil && !errorsStd.Is(err, ErrNodeNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
if lockInfo == nil {
|
||||
lockInfo = &data.LockInfo{}
|
||||
}
|
||||
|
||||
if newLock.Retention != nil {
|
||||
if lockInfo.RetentionOID != nil {
|
||||
if lockInfo.IsCompliance {
|
||||
return fmt.Errorf("you cannot change compliance mode")
|
||||
}
|
||||
if !newLock.Retention.ByPassedGovernance {
|
||||
return fmt.Errorf("you cannot bypass governence mode")
|
||||
}
|
||||
|
||||
if len(lockInfo.UntilDate) > 0 {
|
||||
parsedTime, err := time.Parse(time.RFC3339, lockInfo.UntilDate)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't parse time '%s': %w", lockInfo.UntilDate, err)
|
||||
}
|
||||
if parsedTime.After(newLock.Retention.Until) {
|
||||
return fmt.Errorf("you couldn't short the until date")
|
||||
}
|
||||
}
|
||||
}
|
||||
lock := &data.ObjectLock{Retention: newLock.Retention}
|
||||
if lockInfo.RetentionOID, err = n.putLockObject(ctx, objVersion.BktInfo, versionNode.OID, lock); err != nil {
|
||||
return err
|
||||
}
|
||||
lockInfo.IsCompliance = newLock.Retention.IsCompliance
|
||||
lockInfo.UntilDate = newLock.Retention.Until.UTC().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
if newLock.LegalHold != nil {
|
||||
if newLock.LegalHold.Enabled && lockInfo.LegalHoldOID == nil {
|
||||
lock := &data.ObjectLock{LegalHold: newLock.LegalHold}
|
||||
if lockInfo.LegalHoldOID, err = n.putLockObject(ctx, objVersion.BktInfo, versionNode.OID, lock); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if !newLock.LegalHold.Enabled && lockInfo.LegalHoldOID != nil {
|
||||
if err = n.objectDelete(ctx, objVersion.BktInfo, *lockInfo.LegalHoldOID); err != nil {
|
||||
return fmt.Errorf("couldn't delete lock object '%s' to remove legal hold: %w", lockInfo.LegalHoldOID.EncodeToString(), err)
|
||||
}
|
||||
lockInfo.LegalHoldOID = nil
|
||||
}
|
||||
}
|
||||
|
||||
if err = n.treeService.PutLock(ctx, &cnrID, versionNode.ID, lockInfo); err != nil {
|
||||
return fmt.Errorf("couldn't put lock into tree: %w", err)
|
||||
}
|
||||
|
||||
if err = n.systemCache.PutLockInfo(lockObjectKey(objVersion), lockInfo); err != nil {
|
||||
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||
}
|
||||
|
||||
return objInfo, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) HeadSystemObject(ctx context.Context, bkt *data.BucketInfo, objName string) (*data.ObjectInfo, error) {
|
||||
if objInfo := n.systemCache.GetObject(systemObjectKey(bkt, objName)); objInfo != nil {
|
||||
return objInfo, nil
|
||||
func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, lock *data.ObjectLock) (*oid.ID, error) {
|
||||
prm := PrmObjectCreate{
|
||||
Container: bktInfo.CID,
|
||||
Creator: bktInfo.Owner,
|
||||
Locks: []oid.ID{objID},
|
||||
}
|
||||
|
||||
node, err := n.treeService.GetSystemVersion(ctx, &bkt.CID, objName)
|
||||
if err != nil {
|
||||
if errorsStd.Is(err, ErrNodeNotFound) {
|
||||
return nil, errors.GetAPIError(errors.ErrNoSuchKey)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
meta, err := n.objectHead(ctx, bkt, node.OID)
|
||||
var err error
|
||||
prm.Attributes, err = n.attributesFromLock(ctx, lock)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objInfo := objInfoFromMeta(bkt, meta)
|
||||
if err = n.systemCache.PutObject(systemObjectKey(bkt, objName), objInfo); err != nil {
|
||||
id, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
||||
return id, err
|
||||
}
|
||||
|
||||
func (n *layer) GetLockInfo(ctx context.Context, objVersion *ObjectVersion) (*data.LockInfo, error) {
|
||||
if lockInfo := n.systemCache.GetLockInfo(lockObjectKey(objVersion)); lockInfo != nil {
|
||||
return lockInfo, nil
|
||||
}
|
||||
|
||||
versionNode, err := n.getNodeVersion(ctx, objVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lockInfo, err := n.treeService.GetLock(ctx, &objVersion.BktInfo.CID, versionNode.ID)
|
||||
if err != nil && !errorsStd.Is(err, ErrNodeNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
if lockInfo == nil {
|
||||
lockInfo = &data.LockInfo{}
|
||||
}
|
||||
|
||||
if err = n.systemCache.PutLockInfo(lockObjectKey(objVersion), lockInfo); err != nil {
|
||||
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||
}
|
||||
|
||||
return objInfo, nil
|
||||
return lockInfo, nil
|
||||
}
|
||||
|
||||
func (n *layer) DeleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error {
|
||||
|
@ -83,97 +149,6 @@ func (n *layer) DeleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObjectParams) (*data.ObjectInfo, error) {
|
||||
prm := PrmObjectCreate{
|
||||
Container: p.BktInfo.CID,
|
||||
Creator: p.BktInfo.Owner,
|
||||
Attributes: make([][2]string, 2, 2+len(p.Metadata)),
|
||||
Payload: p.Reader,
|
||||
}
|
||||
|
||||
prm.Attributes[0][0], prm.Attributes[0][1] = objectSystemAttributeName, p.ObjName
|
||||
prm.Attributes[1][0], prm.Attributes[1][1] = attrVersionsIgnore, "true"
|
||||
|
||||
for k, v := range p.Metadata {
|
||||
if !IsSystemHeader(k) {
|
||||
k = p.Prefix + k
|
||||
}
|
||||
|
||||
if v == "" && p.Prefix == tagPrefix {
|
||||
v = tagEmptyMark
|
||||
}
|
||||
|
||||
if p.Lock != nil && len(p.Lock.Objects) > 0 {
|
||||
prm.Locks = p.Lock.Objects
|
||||
|
||||
attrs, err := n.attributesFromLock(ctx, p.Lock)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get lock attributes: %w", err)
|
||||
}
|
||||
|
||||
prm.Attributes = append(prm.Attributes, attrs...)
|
||||
}
|
||||
|
||||
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
||||
}
|
||||
|
||||
id, hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
newVersion := &data.BaseNodeVersion{OID: *id}
|
||||
if err = n.treeService.AddSystemVersion(ctx, &p.BktInfo.CID, p.ObjName, newVersion); err != nil {
|
||||
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
||||
}
|
||||
|
||||
currentEpoch, _, err := n.neoFS.TimeToEpoch(ctx, time.Now().Add(time.Minute))
|
||||
if err != nil {
|
||||
n.log.Warn("couldn't get creation epoch",
|
||||
zap.String("bucket", p.BktInfo.Name),
|
||||
zap.String("object", misc.SanitizeString(p.ObjName)),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
headers := make(map[string]string, len(p.Metadata))
|
||||
for _, attr := range prm.Attributes {
|
||||
headers[attr[0]] = attr[1]
|
||||
}
|
||||
|
||||
return &data.ObjectInfo{
|
||||
ID: *id,
|
||||
CID: p.BktInfo.CID,
|
||||
|
||||
Owner: p.BktInfo.Owner,
|
||||
Bucket: p.BktInfo.Name,
|
||||
Name: p.ObjName,
|
||||
Created: time.Now(),
|
||||
CreationEpoch: currentEpoch,
|
||||
Size: p.Size,
|
||||
Headers: headers,
|
||||
HashSum: hex.EncodeToString(hash),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (n *layer) getSystemObjectFromNeoFS(ctx context.Context, bkt *data.BucketInfo, objName string) (*object.Object, error) {
|
||||
versions, err := n.headSystemVersions(ctx, bkt, objName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objInfo := versions.getLast()
|
||||
|
||||
obj, err := n.objectGet(ctx, bkt, objInfo.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(obj.Payload()) == 0 {
|
||||
return nil, errors.GetAPIError(errors.ErrInternalError)
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (n *layer) getCORS(ctx context.Context, bkt *data.BucketInfo, sysName string) (*data.CORSConfiguration, error) {
|
||||
if cors := n.systemCache.GetCORS(systemObjectKey(bkt, sysName)); cors != nil {
|
||||
return cors, nil
|
||||
|
@ -251,6 +226,11 @@ func systemObjectKey(bktInfo *data.BucketInfo, obj string) string {
|
|||
return bktInfo.Name + obj
|
||||
}
|
||||
|
||||
func lockObjectKey(objVersion *ObjectVersion) string {
|
||||
// todo reconsider forming name since versionID can be "null" or ""
|
||||
return ".lock." + objVersion.BktInfo.CID.EncodeToString() + "." + objVersion.ObjectName + "." + objVersion.VersionID
|
||||
}
|
||||
|
||||
func (n *layer) GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
|
||||
systemKey := systemObjectKey(bktInfo, bktInfo.SettingsObjectName())
|
||||
if settings := n.systemCache.GetSettings(systemKey); settings != nil {
|
||||
|
@ -289,26 +269,24 @@ func (n *layer) PutBucketSettings(ctx context.Context, p *PutSettingsParams) err
|
|||
}
|
||||
|
||||
func (n *layer) attributesFromLock(ctx context.Context, lock *data.ObjectLock) ([][2]string, error) {
|
||||
var result [][2]string
|
||||
if !lock.Until.IsZero() {
|
||||
_, exp, err := n.neoFS.TimeToEpoch(ctx, lock.Until)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fetch time to epoch: %w", err)
|
||||
}
|
||||
|
||||
attrs := [][2]string{
|
||||
{AttributeExpirationEpoch, strconv.FormatUint(exp, 10)},
|
||||
{AttributeRetainUntil, lock.Until.Format(time.RFC3339)},
|
||||
}
|
||||
|
||||
result = append(result, attrs...)
|
||||
if lock.IsCompliance {
|
||||
attrCompliance := [2]string{
|
||||
AttributeComplianceMode, strconv.FormatBool(true),
|
||||
}
|
||||
result = append(result, attrCompliance)
|
||||
}
|
||||
if lock.Retention == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
_, exp, err := n.neoFS.TimeToEpoch(ctx, lock.Retention.Until)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fetch time to epoch: %w", err)
|
||||
}
|
||||
|
||||
result := [][2]string{
|
||||
{AttributeExpirationEpoch, strconv.FormatUint(exp, 10)},
|
||||
}
|
||||
|
||||
if lock.Retention.IsCompliance {
|
||||
attrCompliance := [2]string{
|
||||
AttributeComplianceMode, strconv.FormatBool(true),
|
||||
}
|
||||
result = append(result, attrCompliance)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue