frostfs-s3-gw/api/layer/system_object.go
Denis Kirillov 6cb0026007
All checks were successful
/ Builds (1.21) (pull_request) Successful in 1m20s
/ Builds (1.22) (pull_request) Successful in 1m15s
/ Vulncheck (pull_request) Successful in 1m6s
/ DCO (pull_request) Successful in 57s
/ Lint (pull_request) Successful in 2m59s
/ Tests (1.21) (pull_request) Successful in 1m9s
/ Tests (1.22) (pull_request) Successful in 1m23s
[#427] layer: Split FrostFS ReadObject to separate methods
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-07-23 16:53:59 +03:00

260 lines
7.7 KiB
Go

package layer
import (
"context"
"encoding/xml"
errorsStd "errors"
"fmt"
"math"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
const (
AttributeComplianceMode = ".s3-compliance-mode"
)
type PutLockInfoParams struct {
ObjVersion *data.ObjectVersion
NewLock *data.ObjectLock
CopiesNumbers []uint32
NodeVersion *data.NodeVersion // optional
}
func (n *Layer) PutLockInfo(ctx context.Context, p *PutLockInfoParams) (err error) {
newLock := p.NewLock
versionNode := p.NodeVersion
// sometimes node version can be provided from executing context
// if not, then receive node version from tree service
if versionNode == nil {
versionNode, err = n.getNodeVersionFromCacheOrFrostfs(ctx, p.ObjVersion)
if err != nil {
return err
}
}
lockInfo, err := n.treeService.GetLock(ctx, p.ObjVersion.BktInfo, versionNode.ID)
if err != nil && !errorsStd.Is(err, ErrNodeNotFound) {
return err
}
if lockInfo == nil {
lockInfo = &data.LockInfo{}
}
if newLock.Retention != nil {
if lockInfo.IsRetentionSet() {
if lockInfo.IsCompliance() {
return fmt.Errorf("you cannot change compliance mode")
}
if !newLock.Retention.ByPassedGovernance {
return fmt.Errorf("you cannot bypass governence mode")
}
untilDate := lockInfo.UntilDate()
if len(untilDate) > 0 {
parsedTime, err := time.Parse(time.RFC3339, untilDate)
if err != nil {
return fmt.Errorf("couldn't parse time '%s': %w", untilDate, err)
}
if parsedTime.After(newLock.Retention.Until) {
return fmt.Errorf("you couldn't short the until date")
}
}
}
lock := &data.ObjectLock{Retention: newLock.Retention}
retentionOID, err := n.putLockObject(ctx, p.ObjVersion.BktInfo, versionNode.OID, lock, p.CopiesNumbers)
if err != nil {
return err
}
lockInfo.SetRetention(retentionOID, newLock.Retention.Until.UTC().Format(time.RFC3339), newLock.Retention.IsCompliance)
}
if newLock.LegalHold != nil {
if newLock.LegalHold.Enabled && !lockInfo.IsLegalHoldSet() {
lock := &data.ObjectLock{LegalHold: newLock.LegalHold}
legalHoldOID, err := n.putLockObject(ctx, p.ObjVersion.BktInfo, versionNode.OID, lock, p.CopiesNumbers)
if err != nil {
return err
}
lockInfo.SetLegalHold(legalHoldOID)
} else if !newLock.LegalHold.Enabled && lockInfo.IsLegalHoldSet() {
if err = n.objectDelete(ctx, p.ObjVersion.BktInfo, lockInfo.LegalHold()); err != nil {
return fmt.Errorf("couldn't delete lock object '%s' to remove legal hold: %w", lockInfo.LegalHold().EncodeToString(), err)
}
lockInfo.ResetLegalHold()
}
}
if err = n.treeService.PutLock(ctx, p.ObjVersion.BktInfo, versionNode.ID, lockInfo); err != nil {
return fmt.Errorf("couldn't put lock into tree: %w", err)
}
n.cache.PutLockInfo(n.BearerOwner(ctx), lockObjectKey(p.ObjVersion), lockInfo)
return nil
}
func (n *Layer) getNodeVersionFromCacheOrFrostfs(ctx context.Context, objVersion *data.ObjectVersion) (nodeVersion *data.NodeVersion, err error) {
// check cache if node version is stored inside extendedObjectVersion
nodeVersion = n.getNodeVersionFromCache(n.BearerOwner(ctx), objVersion)
if nodeVersion == nil {
// else get node version from tree service
return n.getNodeVersion(ctx, objVersion)
}
return nodeVersion, nil
}
func (n *Layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, lock *data.ObjectLock, copiesNumber []uint32) (oid.ID, error) {
prm := PrmObjectCreate{
Container: bktInfo.CID,
Locks: []oid.ID{objID},
CreationTime: TimeNow(ctx),
CopiesNumber: copiesNumber,
}
var err error
prm.Attributes, err = n.attributesFromLock(ctx, lock)
if err != nil {
return oid.ID{}, err
}
_, id, _, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
return id, err
}
func (n *Layer) GetLockInfo(ctx context.Context, objVersion *data.ObjectVersion) (*data.LockInfo, error) {
owner := n.BearerOwner(ctx)
if lockInfo := n.cache.GetLockInfo(owner, lockObjectKey(objVersion)); lockInfo != nil {
return lockInfo, nil
}
versionNode, err := n.getNodeVersion(ctx, objVersion)
if err != nil {
return nil, err
}
lockInfo, err := n.treeService.GetLock(ctx, objVersion.BktInfo, versionNode.ID)
if err != nil && !errorsStd.Is(err, ErrNodeNotFound) {
return nil, err
}
if lockInfo == nil {
lockInfo = &data.LockInfo{}
}
n.cache.PutLockInfo(owner, lockObjectKey(objVersion), lockInfo)
return lockInfo, nil
}
func (n *Layer) getCORS(ctx context.Context, bkt *data.BucketInfo) (*data.CORSConfiguration, error) {
owner := n.BearerOwner(ctx)
if cors := n.cache.GetCORS(owner, bkt); cors != nil {
return cors, nil
}
addr, err := n.treeService.GetBucketCORS(ctx, bkt)
objNotFound := errorsStd.Is(err, ErrNodeNotFound)
if err != nil && !objNotFound {
return nil, err
}
if objNotFound {
return nil, fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchCORSConfiguration), err.Error())
}
var prmAuth PrmAuth
corsBkt := bkt
if !addr.Container().Equals(bkt.CID) && !addr.Container().Equals(cid.ID{}) {
corsBkt = &data.BucketInfo{CID: addr.Container()}
prmAuth.PrivateKey = &n.gateKey.PrivateKey
}
obj, err := n.objectGetWithAuth(ctx, corsBkt, addr.Object(), prmAuth)
if err != nil {
return nil, fmt.Errorf("get cors object: %w", err)
}
cors := &data.CORSConfiguration{}
if err = xml.NewDecoder(obj.Payload).Decode(&cors); err != nil {
return nil, fmt.Errorf("unmarshal cors: %w", err)
}
n.cache.PutCORS(owner, bkt, cors)
return cors, nil
}
func lockObjectKey(objVersion *data.ObjectVersion) string {
// todo reconsider forming name since versionID can be "null" or ""
return ".lock." + objVersion.BktInfo.CID.EncodeToString() + "." + objVersion.ObjectName + "." + objVersion.VersionID
}
func (n *Layer) GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
owner := n.BearerOwner(ctx)
if settings := n.cache.GetSettings(owner, bktInfo); settings != nil {
return settings, nil
}
settings, err := n.treeService.GetSettingsNode(ctx, bktInfo)
if err != nil {
if !errorsStd.Is(err, ErrNodeNotFound) {
return nil, err
}
settings = &data.BucketSettings{Versioning: data.VersioningUnversioned}
}
n.cache.PutSettings(owner, bktInfo, settings)
return settings, nil
}
func (n *Layer) PutBucketSettings(ctx context.Context, p *PutSettingsParams) error {
if err := n.treeService.PutSettingsNode(ctx, p.BktInfo, p.Settings); err != nil {
return fmt.Errorf("failed to get settings node: %w", err)
}
n.cache.PutSettings(n.BearerOwner(ctx), p.BktInfo, p.Settings)
return nil
}
func (n *Layer) attributesFromLock(ctx context.Context, lock *data.ObjectLock) ([][2]string, error) {
var (
err error
expEpoch uint64
result [][2]string
)
if lock.Retention != nil {
if _, expEpoch, err = n.frostFS.TimeToEpoch(ctx, TimeNow(ctx), lock.Retention.Until); err != nil {
return nil, fmt.Errorf("fetch time to epoch: %w", err)
}
if lock.Retention.IsCompliance {
result = append(result, [2]string{AttributeComplianceMode, "true"})
}
}
if lock.LegalHold != nil && lock.LegalHold.Enabled {
// todo: (@KirillovDenis) reconsider this when FrostFS will support Legal Hold https://git.frostfs.info/TrueCloudLab/frostfs-contract/issues/2
// Currently lock object must have an expiration epoch.
// Besides we need to override retention expiration epoch since legal hold cannot be deleted yet.
expEpoch = math.MaxUint64
}
if expEpoch != 0 {
result = append(result, [2]string{
object.SysAttributeExpEpoch, strconv.FormatUint(expEpoch, 10),
})
}
return result, nil
}