forked from TrueCloudLab/frostfs-s3-gw
Alex Vanin
fae03c2b50
When object exists in tree but missing in storage, we can't remove bucket. While storage node does not sync tree service and object service, the only way to delete such broken bucket is to ignore 'object not found' error, clear cache and do not include missing objects in the listing result. Signed-off-by: Alex Vanin <a.vanin@yadro.com>
746 lines
23 KiB
Go
746 lines
23 KiB
Go
package layer
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"crypto/rand"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type (
|
|
EventListener interface {
|
|
Subscribe(context.Context, string, MsgHandler) error
|
|
Listen(context.Context)
|
|
}
|
|
|
|
MsgHandler interface {
|
|
HandleMessage(context.Context, *nats.Msg) error
|
|
}
|
|
|
|
MsgHandlerFunc func(context.Context, *nats.Msg) error
|
|
|
|
BucketResolver interface {
|
|
Resolve(ctx context.Context, name string) (cid.ID, error)
|
|
}
|
|
|
|
layer struct {
|
|
frostFS FrostFS
|
|
log *zap.Logger
|
|
anonKey AnonymousKey
|
|
resolver BucketResolver
|
|
ncontroller EventListener
|
|
cache *Cache
|
|
treeService TreeService
|
|
}
|
|
|
|
Config struct {
|
|
ChainAddress string
|
|
Caches *CachesConfig
|
|
AnonKey AnonymousKey
|
|
Resolver BucketResolver
|
|
TreeService TreeService
|
|
}
|
|
|
|
// AnonymousKey contains data for anonymous requests.
|
|
AnonymousKey struct {
|
|
Key *keys.PrivateKey
|
|
}
|
|
|
|
// GetObjectParams stores object get request parameters.
|
|
GetObjectParams struct {
|
|
Range *RangeParams
|
|
ObjectInfo *data.ObjectInfo
|
|
BucketInfo *data.BucketInfo
|
|
Writer io.Writer
|
|
Encryption encryption.Params
|
|
}
|
|
|
|
// HeadObjectParams stores object head request parameters.
|
|
HeadObjectParams struct {
|
|
BktInfo *data.BucketInfo
|
|
Object string
|
|
VersionID string
|
|
}
|
|
|
|
// ObjectVersion stores object version info.
|
|
ObjectVersion struct {
|
|
BktInfo *data.BucketInfo
|
|
ObjectName string
|
|
VersionID string
|
|
NoErrorOnDeleteMarker bool
|
|
}
|
|
|
|
// RangeParams stores range header request parameters.
|
|
RangeParams struct {
|
|
Start uint64
|
|
End uint64
|
|
}
|
|
|
|
// PutObjectParams stores object put request parameters.
|
|
PutObjectParams struct {
|
|
BktInfo *data.BucketInfo
|
|
Object string
|
|
Size int64
|
|
Reader io.Reader
|
|
Header map[string]string
|
|
Lock *data.ObjectLock
|
|
Encryption encryption.Params
|
|
CopiesNumbers []uint32
|
|
}
|
|
|
|
DeleteObjectParams struct {
|
|
BktInfo *data.BucketInfo
|
|
Objects []*VersionedObject
|
|
Settings *data.BucketSettings
|
|
}
|
|
|
|
// PutSettingsParams stores object copy request parameters.
|
|
PutSettingsParams struct {
|
|
BktInfo *data.BucketInfo
|
|
Settings *data.BucketSettings
|
|
}
|
|
|
|
// PutCORSParams stores PutCORS request parameters.
|
|
PutCORSParams struct {
|
|
BktInfo *data.BucketInfo
|
|
Reader io.Reader
|
|
CopiesNumbers []uint32
|
|
}
|
|
|
|
// CopyObjectParams stores object copy request parameters.
|
|
CopyObjectParams struct {
|
|
SrcObject *data.ObjectInfo
|
|
ScrBktInfo *data.BucketInfo
|
|
DstBktInfo *data.BucketInfo
|
|
DstObject string
|
|
SrcSize int64
|
|
Header map[string]string
|
|
Range *RangeParams
|
|
Lock *data.ObjectLock
|
|
Encryption encryption.Params
|
|
CopiesNumbers []uint32
|
|
}
|
|
// CreateBucketParams stores bucket create request parameters.
|
|
CreateBucketParams struct {
|
|
Name string
|
|
Policy netmap.PlacementPolicy
|
|
EACL *eacl.Table
|
|
SessionContainerCreation *session.Container
|
|
SessionEACL *session.Container
|
|
LocationConstraint string
|
|
ObjectLockEnabled bool
|
|
}
|
|
// PutBucketACLParams stores put bucket acl request parameters.
|
|
PutBucketACLParams struct {
|
|
BktInfo *data.BucketInfo
|
|
EACL *eacl.Table
|
|
SessionToken *session.Container
|
|
}
|
|
// DeleteBucketParams stores delete bucket request parameters.
|
|
DeleteBucketParams struct {
|
|
BktInfo *data.BucketInfo
|
|
SessionToken *session.Container
|
|
}
|
|
|
|
// ListObjectVersionsParams stores list objects versions parameters.
|
|
ListObjectVersionsParams struct {
|
|
BktInfo *data.BucketInfo
|
|
Delimiter string
|
|
KeyMarker string
|
|
MaxKeys int
|
|
Prefix string
|
|
VersionIDMarker string
|
|
Encode string
|
|
}
|
|
|
|
// VersionedObject stores info about objects to delete.
|
|
VersionedObject struct {
|
|
Name string
|
|
VersionID string
|
|
DeleteMarkVersion string
|
|
DeleteMarkerEtag string
|
|
Error error
|
|
}
|
|
|
|
// Client provides S3 API client interface.
|
|
Client interface {
|
|
Initialize(ctx context.Context, c EventListener) error
|
|
EphemeralKey() *keys.PublicKey
|
|
|
|
GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error)
|
|
PutBucketSettings(ctx context.Context, p *PutSettingsParams) error
|
|
|
|
PutBucketCORS(ctx context.Context, p *PutCORSParams) error
|
|
GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*data.CORSConfiguration, error)
|
|
DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error
|
|
|
|
ListBuckets(ctx context.Context) ([]*data.BucketInfo, error)
|
|
GetBucketInfo(ctx context.Context, name string) (*data.BucketInfo, error)
|
|
GetBucketACL(ctx context.Context, bktInfo *data.BucketInfo) (*BucketACL, error)
|
|
PutBucketACL(ctx context.Context, p *PutBucketACLParams) error
|
|
CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error)
|
|
DeleteBucket(ctx context.Context, p *DeleteBucketParams) error
|
|
|
|
GetObject(ctx context.Context, p *GetObjectParams) error
|
|
GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error)
|
|
GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error)
|
|
|
|
GetLockInfo(ctx context.Context, obj *ObjectVersion) (*data.LockInfo, error)
|
|
PutLockInfo(ctx context.Context, p *PutLockInfoParams) error
|
|
|
|
GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error)
|
|
PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error
|
|
DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error
|
|
|
|
GetObjectTagging(ctx context.Context, p *GetObjectTaggingParams) (string, map[string]string, error)
|
|
PutObjectTagging(ctx context.Context, p *PutObjectTaggingParams) (*data.NodeVersion, error)
|
|
DeleteObjectTagging(ctx context.Context, p *ObjectVersion) (*data.NodeVersion, error)
|
|
|
|
PutObject(ctx context.Context, p *PutObjectParams) (*data.ExtendedObjectInfo, error)
|
|
|
|
CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error)
|
|
|
|
ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error)
|
|
ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error)
|
|
ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error)
|
|
|
|
DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject
|
|
|
|
CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error
|
|
CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error)
|
|
UploadPart(ctx context.Context, p *UploadPartParams) (string, error)
|
|
UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error)
|
|
ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error)
|
|
AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error
|
|
ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error)
|
|
|
|
PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error
|
|
GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error)
|
|
|
|
// Compound methods for optimizations
|
|
|
|
// GetObjectTaggingAndLock unifies GetObjectTagging and GetLock methods in single tree service invocation.
|
|
GetObjectTaggingAndLock(ctx context.Context, p *ObjectVersion, nodeVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error)
|
|
}
|
|
)
|
|
|
|
const (
|
|
tagPrefix = "S3-Tag-"
|
|
|
|
AESEncryptionAlgorithm = "AES256"
|
|
AESKeySize = 32
|
|
AttributeEncryptionAlgorithm = api.FrostFSSystemMetadataPrefix + "Algorithm"
|
|
AttributeDecryptedSize = api.FrostFSSystemMetadataPrefix + "Decrypted-Size"
|
|
AttributeHMACSalt = api.FrostFSSystemMetadataPrefix + "HMAC-Salt"
|
|
AttributeHMACKey = api.FrostFSSystemMetadataPrefix + "HMAC-Key"
|
|
|
|
AttributeFrostfsCopiesNumber = "frostfs-copies-number" // such format to match X-Amz-Meta-Frostfs-Copies-Number header
|
|
)
|
|
|
|
func (t *VersionedObject) String() string {
|
|
return t.Name + ":" + t.VersionID
|
|
}
|
|
|
|
func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error {
|
|
return f(ctx, msg)
|
|
}
|
|
|
|
// NewLayer creates an instance of a layer. It checks credentials
|
|
// and establishes gRPC connection with the node.
|
|
func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client {
|
|
return &layer{
|
|
frostFS: frostFS,
|
|
log: log,
|
|
anonKey: config.AnonKey,
|
|
resolver: config.Resolver,
|
|
cache: NewCache(config.Caches),
|
|
treeService: config.TreeService,
|
|
}
|
|
}
|
|
|
|
func (n *layer) EphemeralKey() *keys.PublicKey {
|
|
return n.anonKey.Key.PublicKey()
|
|
}
|
|
|
|
func (n *layer) Initialize(ctx context.Context, c EventListener) error {
|
|
if n.IsNotificationEnabled() {
|
|
return fmt.Errorf("already initialized")
|
|
}
|
|
|
|
// todo add notification handlers (e.g. for lifecycles)
|
|
|
|
c.Listen(ctx)
|
|
|
|
n.ncontroller = c
|
|
return nil
|
|
}
|
|
|
|
func (n *layer) IsNotificationEnabled() bool {
|
|
return n.ncontroller != nil
|
|
}
|
|
|
|
// IsAuthenticatedRequest checks if access box exists in the current request.
|
|
func IsAuthenticatedRequest(ctx context.Context) bool {
|
|
_, ok := ctx.Value(api.BoxData).(*accessbox.Box)
|
|
return ok
|
|
}
|
|
|
|
// TimeNow returns client time from request or time.Now().
|
|
func TimeNow(ctx context.Context) time.Time {
|
|
if now, ok := ctx.Value(api.ClientTime).(time.Time); ok {
|
|
return now
|
|
}
|
|
|
|
return time.Now()
|
|
}
|
|
|
|
// Owner returns owner id from BearerToken (context) or from client owner.
|
|
func (n *layer) Owner(ctx context.Context) user.ID {
|
|
if bd, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && bd != nil && bd.Gate != nil && bd.Gate.BearerToken != nil {
|
|
return bearer.ResolveIssuer(*bd.Gate.BearerToken)
|
|
}
|
|
|
|
var ownerID user.ID
|
|
user.IDFromKey(&ownerID, (ecdsa.PublicKey)(*n.EphemeralKey()))
|
|
|
|
return ownerID
|
|
}
|
|
|
|
func (n *layer) prepareAuthParameters(ctx context.Context, prm *PrmAuth, bktOwner user.ID) {
|
|
if bd, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && bd != nil && bd.Gate != nil && bd.Gate.BearerToken != nil {
|
|
if bd.Gate.BearerToken.Impersonate() || bktOwner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
|
|
prm.BearerToken = bd.Gate.BearerToken
|
|
return
|
|
}
|
|
}
|
|
|
|
prm.PrivateKey = &n.anonKey.Key.PrivateKey
|
|
}
|
|
|
|
// GetBucketInfo returns bucket info by name.
|
|
func (n *layer) GetBucketInfo(ctx context.Context, name string) (*data.BucketInfo, error) {
|
|
name, err := url.QueryUnescape(name)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unescape bucket name: %w", err)
|
|
}
|
|
|
|
if bktInfo := n.cache.GetBucket(name); bktInfo != nil {
|
|
return bktInfo, nil
|
|
}
|
|
|
|
containerID, err := n.ResolveBucket(ctx, name)
|
|
if err != nil {
|
|
n.log.Debug("bucket not found", zap.Error(err))
|
|
return nil, errors.GetAPIError(errors.ErrNoSuchBucket)
|
|
}
|
|
|
|
return n.containerInfo(ctx, containerID)
|
|
}
|
|
|
|
// GetBucketACL returns bucket acl info by name.
|
|
func (n *layer) GetBucketACL(ctx context.Context, bktInfo *data.BucketInfo) (*BucketACL, error) {
|
|
eACL, err := n.GetContainerEACL(ctx, bktInfo.CID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get container eacl: %w", err)
|
|
}
|
|
|
|
return &BucketACL{
|
|
Info: bktInfo,
|
|
EACL: eACL,
|
|
}, nil
|
|
}
|
|
|
|
// PutBucketACL puts bucket acl by name.
|
|
func (n *layer) PutBucketACL(ctx context.Context, param *PutBucketACLParams) error {
|
|
return n.setContainerEACLTable(ctx, param.BktInfo.CID, param.EACL, param.SessionToken)
|
|
}
|
|
|
|
// ListBuckets returns all user containers. The name of the bucket is a container
|
|
// id. Timestamp is omitted since it is not saved in frostfs container.
|
|
func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) {
|
|
return n.containerList(ctx)
|
|
}
|
|
|
|
// GetObject from storage.
|
|
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
|
|
var params getParams
|
|
|
|
params.oid = p.ObjectInfo.ID
|
|
params.bktInfo = p.BucketInfo
|
|
|
|
var decReader *encryption.Decrypter
|
|
if p.Encryption.Enabled() {
|
|
var err error
|
|
decReader, err = getDecrypter(p)
|
|
if err != nil {
|
|
return fmt.Errorf("creating decrypter: %w", err)
|
|
}
|
|
params.off = decReader.EncryptedOffset()
|
|
params.ln = decReader.EncryptedLength()
|
|
} else {
|
|
if p.Range != nil {
|
|
if p.Range.Start > p.Range.End {
|
|
panic("invalid range")
|
|
}
|
|
params.ln = p.Range.End - p.Range.Start + 1
|
|
params.off = p.Range.Start
|
|
}
|
|
}
|
|
|
|
payload, err := n.initObjectPayloadReader(ctx, params)
|
|
if err != nil {
|
|
return fmt.Errorf("init object payload reader: %w", err)
|
|
}
|
|
|
|
bufSize := uint64(32 * 1024) // configure?
|
|
if params.ln != 0 && params.ln < bufSize {
|
|
bufSize = params.ln
|
|
}
|
|
|
|
// alloc buffer for copying
|
|
buf := make([]byte, bufSize) // sync-pool it?
|
|
|
|
r := payload
|
|
if decReader != nil {
|
|
if err = decReader.SetReader(payload); err != nil {
|
|
return fmt.Errorf("set reader to decrypter: %w", err)
|
|
}
|
|
r = io.LimitReader(decReader, int64(decReader.DecryptedLength()))
|
|
}
|
|
|
|
// copy full payload
|
|
written, err := io.CopyBuffer(p.Writer, r, buf)
|
|
if err != nil {
|
|
if decReader != nil {
|
|
return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, decReader.DecryptedLength(), params.ln, err)
|
|
}
|
|
return fmt.Errorf("copy object payload written: '%d': %w", written, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func getDecrypter(p *GetObjectParams) (*encryption.Decrypter, error) {
|
|
var encRange *encryption.Range
|
|
if p.Range != nil {
|
|
encRange = &encryption.Range{Start: p.Range.Start, End: p.Range.End}
|
|
}
|
|
|
|
header := p.ObjectInfo.Headers[UploadCompletedParts]
|
|
if len(header) == 0 {
|
|
return encryption.NewDecrypter(p.Encryption, uint64(p.ObjectInfo.Size), encRange)
|
|
}
|
|
|
|
decryptedObjectSize, err := strconv.ParseUint(p.ObjectInfo.Headers[AttributeDecryptedSize], 10, 64)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse decrypted size: %w", err)
|
|
}
|
|
|
|
splits := strings.Split(header, ",")
|
|
sizes := make([]uint64, len(splits))
|
|
for i, splitInfo := range splits {
|
|
part, err := ParseCompletedPartHeader(splitInfo)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse completed part: %w", err)
|
|
}
|
|
sizes[i] = uint64(part.Size)
|
|
}
|
|
|
|
return encryption.NewMultipartDecrypter(p.Encryption, decryptedObjectSize, sizes, encRange)
|
|
}
|
|
|
|
// GetObjectInfo returns meta information about the object.
|
|
func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error) {
|
|
extendedObjectInfo, err := n.GetExtendedObjectInfo(ctx, p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return extendedObjectInfo.ObjectInfo, nil
|
|
}
|
|
|
|
// GetExtendedObjectInfo returns meta information and corresponding info from the tree service about the object.
|
|
func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) {
|
|
var objInfo *data.ExtendedObjectInfo
|
|
var err error
|
|
|
|
if len(p.VersionID) == 0 {
|
|
objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
|
|
} else {
|
|
objInfo, err = n.headVersion(ctx, p.BktInfo, p)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
reqInfo := api.GetReqInfo(ctx)
|
|
n.log.Debug("get object",
|
|
zap.String("reqId", reqInfo.RequestID),
|
|
zap.String("bucket", p.BktInfo.Name),
|
|
zap.Stringer("cid", p.BktInfo.CID),
|
|
zap.String("object", objInfo.ObjectInfo.Name),
|
|
zap.Stringer("oid", objInfo.ObjectInfo.ID))
|
|
|
|
return objInfo, nil
|
|
}
|
|
|
|
// CopyObject from one bucket into another bucket.
|
|
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error) {
|
|
pr, pw := io.Pipe()
|
|
|
|
go func() {
|
|
err := n.GetObject(ctx, &GetObjectParams{
|
|
ObjectInfo: p.SrcObject,
|
|
Writer: pw,
|
|
Range: p.Range,
|
|
BucketInfo: p.ScrBktInfo,
|
|
Encryption: p.Encryption,
|
|
})
|
|
|
|
if err = pw.CloseWithError(err); err != nil {
|
|
n.log.Error("could not get object", zap.Error(err))
|
|
}
|
|
}()
|
|
|
|
return n.PutObject(ctx, &PutObjectParams{
|
|
BktInfo: p.DstBktInfo,
|
|
Object: p.DstObject,
|
|
Size: p.SrcSize,
|
|
Reader: pr,
|
|
Header: p.Header,
|
|
Encryption: p.Encryption,
|
|
CopiesNumbers: p.CopiesNumbers,
|
|
})
|
|
}
|
|
|
|
func getRandomOID() (oid.ID, error) {
|
|
b := [32]byte{}
|
|
if _, err := rand.Read(b[:]); err != nil {
|
|
return oid.ID{}, err
|
|
}
|
|
|
|
var objID oid.ID
|
|
objID.SetSHA256(b)
|
|
return objID, nil
|
|
}
|
|
|
|
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
|
|
if len(obj.VersionID) != 0 || settings.Unversioned() {
|
|
var nodeVersion *data.NodeVersion
|
|
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
|
|
return n.handleNotFoundError(bkt, obj)
|
|
}
|
|
|
|
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
|
|
return n.handleObjectDeleteErrors(ctx, bkt, obj, nodeVersion.ID)
|
|
}
|
|
|
|
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID)
|
|
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
|
|
return obj
|
|
}
|
|
|
|
lastVersion, err := n.getLastNodeVersion(ctx, bkt, obj)
|
|
if err != nil {
|
|
obj.Error = err
|
|
return n.handleNotFoundError(bkt, obj)
|
|
}
|
|
|
|
if settings.VersioningSuspended() {
|
|
obj.VersionID = data.UnversionedObjectVersionID
|
|
|
|
var nullVersionToDelete *data.NodeVersion
|
|
if lastVersion.IsUnversioned {
|
|
if !lastVersion.IsDeleteMarker() {
|
|
nullVersionToDelete = lastVersion
|
|
}
|
|
} else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
|
|
if !isNotFoundError(obj.Error) {
|
|
return obj
|
|
}
|
|
}
|
|
|
|
if nullVersionToDelete != nil {
|
|
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil {
|
|
return n.handleObjectDeleteErrors(ctx, bkt, obj, nullVersionToDelete.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
if lastVersion.IsDeleteMarker() {
|
|
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
|
|
return obj
|
|
}
|
|
|
|
randOID, err := getRandomOID()
|
|
if err != nil {
|
|
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
|
|
return obj
|
|
}
|
|
|
|
obj.DeleteMarkVersion = randOID.EncodeToString()
|
|
|
|
newVersion := &data.NodeVersion{
|
|
BaseNodeVersion: data.BaseNodeVersion{
|
|
OID: randOID,
|
|
FilePath: obj.Name,
|
|
},
|
|
DeleteMarker: &data.DeleteMarkerInfo{
|
|
Created: TimeNow(ctx),
|
|
Owner: n.Owner(ctx),
|
|
},
|
|
IsUnversioned: settings.VersioningSuspended(),
|
|
}
|
|
|
|
if _, obj.Error = n.treeService.AddVersion(ctx, bkt, newVersion); obj.Error != nil {
|
|
return obj
|
|
}
|
|
|
|
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
|
|
|
|
return obj
|
|
}
|
|
|
|
func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
|
|
if isNotFoundError(obj.Error) {
|
|
obj.Error = nil
|
|
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
|
|
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
|
|
}
|
|
|
|
return obj
|
|
}
|
|
|
|
func (n *layer) handleObjectDeleteErrors(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeID uint64) *VersionedObject {
|
|
if client.IsErrObjectAlreadyRemoved(obj.Error) {
|
|
n.log.Debug("object already removed", zap.String("bucket", bkt.Name), zap.Stringer("cid", bkt.CID),
|
|
zap.String("object", obj.Name), zap.String("oid", obj.VersionID))
|
|
|
|
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeID)
|
|
if obj.Error != nil {
|
|
return obj
|
|
}
|
|
|
|
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
|
|
}
|
|
|
|
if client.IsErrObjectNotFound(obj.Error) {
|
|
n.log.Debug("object not found", zap.String("bucket", bkt.Name), zap.Stringer("cid", bkt.CID),
|
|
zap.String("object", obj.Name), zap.String("oid", obj.VersionID))
|
|
|
|
obj.Error = nil
|
|
|
|
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
|
|
}
|
|
|
|
return obj
|
|
}
|
|
|
|
func isNotFoundError(err error) bool {
|
|
return errors.IsS3Error(err, errors.ErrNoSuchKey) ||
|
|
errors.IsS3Error(err, errors.ErrNoSuchVersion)
|
|
}
|
|
|
|
func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
|
objVersion := &ObjectVersion{
|
|
BktInfo: bkt,
|
|
ObjectName: obj.Name,
|
|
VersionID: obj.VersionID,
|
|
NoErrorOnDeleteMarker: true,
|
|
}
|
|
|
|
return n.getNodeVersion(ctx, objVersion)
|
|
}
|
|
|
|
func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
|
objVersion := &ObjectVersion{
|
|
BktInfo: bkt,
|
|
ObjectName: obj.Name,
|
|
VersionID: "",
|
|
NoErrorOnDeleteMarker: true,
|
|
}
|
|
|
|
return n.getNodeVersion(ctx, objVersion)
|
|
}
|
|
|
|
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
|
|
if nodeVersion.IsDeleteMarker() {
|
|
return obj.VersionID, nil
|
|
}
|
|
|
|
return "", n.objectDelete(ctx, bkt, nodeVersion.OID)
|
|
}
|
|
|
|
// DeleteObjects from the storage.
|
|
func (n *layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject {
|
|
for i, obj := range p.Objects {
|
|
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj)
|
|
}
|
|
|
|
return p.Objects
|
|
}
|
|
|
|
func (n *layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error) {
|
|
bktInfo, err := n.GetBucketInfo(ctx, p.Name)
|
|
if err != nil {
|
|
if errors.IsS3Error(err, errors.ErrNoSuchBucket) {
|
|
return n.createContainer(ctx, p)
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
if p.SessionContainerCreation != nil && session.IssuedBy(*p.SessionContainerCreation, bktInfo.Owner) {
|
|
return nil, errors.GetAPIError(errors.ErrBucketAlreadyOwnedByYou)
|
|
}
|
|
|
|
return nil, errors.GetAPIError(errors.ErrBucketAlreadyExists)
|
|
}
|
|
|
|
func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error) {
|
|
var cnrID cid.ID
|
|
if err := cnrID.DecodeString(name); err != nil {
|
|
if cnrID, err = n.resolver.Resolve(ctx, name); err != nil {
|
|
return cid.ID{}, err
|
|
}
|
|
|
|
reqInfo := api.GetReqInfo(ctx)
|
|
n.log.Info("resolve bucket", zap.String("reqId", reqInfo.RequestID), zap.String("bucket", name), zap.Stringer("cid", cnrID))
|
|
}
|
|
|
|
return cnrID, nil
|
|
}
|
|
|
|
func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
|
|
nodeVersions, err := n.getAllObjectsVersions(ctx, p.BktInfo, "", "")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(nodeVersions) != 0 {
|
|
return errors.GetAPIError(errors.ErrBucketNotEmpty)
|
|
}
|
|
|
|
n.cache.DeleteBucket(p.BktInfo.Name)
|
|
return n.frostFS.DeleteContainer(ctx, p.BktInfo.CID, p.SessionToken)
|
|
}
|