package layer

import (
	"context"
	"crypto/ecdsa"
	"crypto/rand"
	"encoding/json"
	"encoding/xml"
	"fmt"
	"io"
	"net/url"
	"strconv"
	"strings"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"go.uber.org/zap"
)

type (
	// BucketResolver maps a bucket name to the ID of its container.
	BucketResolver interface {
		Resolve(ctx context.Context, name string) (cid.ID, error)
	}

	// FeatureSettings exposes the feature settings the layer consults.
	// In this file only FormContainerZone is called (to derive the cache
	// zone from the request namespace).
	FeatureSettings interface {
		ClientCut() bool
		BufferMaxSizeForPut() uint64
		MD5Enabled() bool
		FormContainerZone(ns string) (zone string, isDefault bool)
	}

	// Layer bundles the dependencies used to serve bucket and object
	// operations: the FrostFS client, the tree service, the bucket resolver,
	// the cache, the anonymous key and the feature settings.
	Layer struct {
		frostFS     FrostFS
		gateOwner   user.ID
		log         *zap.Logger
		anonKey     AnonymousKey
		resolver    BucketResolver
		cache       *Cache
		treeService TreeService
		features    FeatureSettings
	}

	// Config carries the values NewLayer copies into a Layer.
	// NOTE(review): ChainAddress is not read by NewLayer in this file.
	Config struct {
		GateOwner    user.ID
		ChainAddress string
		Cache        *Cache
		AnonKey      AnonymousKey
		Resolver     BucketResolver
		TreeService  TreeService
		Features     FeatureSettings
	}

	// AnonymousKey contains data for anonymous requests.
	AnonymousKey struct {
		Key *keys.PrivateKey
	}

	// GetObjectParams stores object get request parameters.
	GetObjectParams struct {
		Range      *RangeParams
		ObjectInfo *data.ObjectInfo
		BucketInfo *data.BucketInfo
		Versioned  bool
		Encryption encryption.Params
	}

	// HeadObjectParams stores object head request parameters.
	HeadObjectParams struct {
		BktInfo   *data.BucketInfo
		Object    string
		VersionID string
	}

	// RangeParams stores range header request parameters.
	// GetObject treats the range as inclusive on both ends
	// (length is End - Start + 1).
	RangeParams struct {
		Start uint64
		End   uint64
	}

	// PutObjectParams stores object put request parameters.
	PutObjectParams struct {
		BktInfo           *data.BucketInfo
		Object            string
		Size              uint64
		Reader            io.Reader
		Header            map[string]string
		Lock              *data.ObjectLock
		Encryption        encryption.Params
		CopiesNumbers     []uint32
		CompleteMD5Hash   string
		ContentMD5        string
		ContentSHA256Hash string
	}

	// PutCombinedObjectParams stores parameters for putting a combined object.
	// NOTE(review): presumably used when completing multipart uploads — confirm against callers.
	PutCombinedObjectParams struct {
		BktInfo    *data.BucketInfo
		Object     string
		Size       uint64
		Header     map[string]string
		Lock       *data.ObjectLock
		Encryption encryption.Params
	}

	// DeleteObjectParams stores object delete request parameters.
	// IsMultiple makes DeleteObjects log per-object failures.
	DeleteObjectParams struct {
		BktInfo    *data.BucketInfo
		Objects    []*VersionedObject
		Settings   *data.BucketSettings
		IsMultiple bool
	}

	// PutSettingsParams stores bucket settings put request parameters.
	PutSettingsParams struct {
		BktInfo  *data.BucketInfo
		Settings *data.BucketSettings
	}

	// PutCORSParams stores PutCORS request parameters.
	PutCORSParams struct {
		BktInfo       *data.BucketInfo
		Reader        io.Reader
		CopiesNumbers []uint32
		NewDecoder    func(io.Reader) *xml.Decoder
	}

	// CopyObjectParams stores object copy request parameters.
	// ScrBktInfo (sic) is the source bucket; CopyObject passes it to GetObject.
	CopyObjectParams struct {
		SrcVersioned  bool
		SrcObject     *data.ObjectInfo
		ScrBktInfo    *data.BucketInfo
		DstBktInfo    *data.BucketInfo
		DstObject     string
		DstSize       uint64
		Header        map[string]string
		Range         *RangeParams
		Lock          *data.ObjectLock
		SrcEncryption encryption.Params
		DstEncryption encryption.Params
		CopiesNumbers []uint32
	}

	// CreateBucketParams stores bucket create request parameters.
	CreateBucketParams struct {
		Name                     string
		Namespace                string
		Policy                   netmap.PlacementPolicy
		SessionContainerCreation *session.Container
		LocationConstraint       string
		ObjectLockEnabled        bool
	}

	// DeleteBucketParams stores delete bucket request parameters.
	DeleteBucketParams struct {
		BktInfo      *data.BucketInfo
		SessionToken *session.Container
	}

	// ListObjectVersionsParams stores list objects versions parameters.
	ListObjectVersionsParams struct {
		BktInfo         *data.BucketInfo
		Delimiter       string
		KeyMarker       string
		MaxKeys         int
		Prefix          string
		VersionIDMarker string
		Encode          string
	}

	// VersionedObject stores info about objects to delete.
	// Error carries the per-object outcome set by the delete routines.
	VersionedObject struct {
		Name              string
		VersionID         string
		DeleteMarkVersion string
		DeleteMarkerEtag  string
		Error             error
	}

	// ObjectPayload is the payload reader returned by GetObject together with
	// the parameters it was opened with. For encrypted objects r yields the
	// decrypted stream and decryptedLen holds its length.
	ObjectPayload struct {
		r            io.Reader
		params       getParams
		encrypted    bool
		decryptedLen uint64
	}
)

const (
	tagPrefix = "S3-Tag-"

	// Encryption algorithm name and key size in bytes.
	AESEncryptionAlgorithm = "AES256"
	AESKeySize             = 32

	// Names of the object attributes that hold encryption metadata.
	AttributeEncryptionAlgorithm = api.FrostFSSystemMetadataPrefix + "Algorithm"
	AttributeDecryptedSize       = api.FrostFSSystemMetadataPrefix + "Decrypted-Size"
	AttributeHMACSalt            = api.FrostFSSystemMetadataPrefix + "HMAC-Salt"
	AttributeHMACKey             = api.FrostFSSystemMetadataPrefix + "HMAC-Key"

	AttributeFrostfsCopiesNumber = "frostfs-copies-number" // such format to match X-Amz-Meta-Frostfs-Copies-Number header
)

// EncryptionMetadata is the set of attribute names that hold encryption metadata.
var EncryptionMetadata = map[string]struct{}{
	AttributeEncryptionAlgorithm: {},
	AttributeDecryptedSize:       {},
	AttributeHMACSalt:            {},
	AttributeHMACKey:             {},
}

// String returns the object in "<name>:<versionID>" form.
func (t *VersionedObject) String() string {
	return t.Name + ":" + t.VersionID
}

// Versioned reports whether a version ID was specified in the request.
func (p HeadObjectParams) Versioned() bool {
	return len(p.VersionID) > 0
}

// NewLayer creates an instance of a Layer from the given logger, FrostFS
// client and config. It only stores the supplied dependencies; no validation
// is performed and no connection is established here.
func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) *Layer {
	return &Layer{
		frostFS:     frostFS,
		log:         log,
		gateOwner:   config.GateOwner,
		anonKey:     config.AnonKey,
		resolver:    config.Resolver,
		cache:       config.Cache,
		treeService: config.TreeService,
		features:    config.Features,
	}
}

// EphemeralKey returns the public part of the layer's anonymous key.
func (n *Layer) EphemeralKey() *keys.PublicKey {
	return n.anonKey.Key.PublicKey()
}

// IsAuthenticatedRequest checks if access box exists in the current request.
func IsAuthenticatedRequest(ctx context.Context) bool {
	_, err := middleware.GetBoxData(ctx)
	return err == nil
}

// TimeNow returns client time from request or time.Now().
func TimeNow(ctx context.Context) time.Time { if now, err := middleware.GetClientTime(ctx); err == nil { return now } return time.Now() } // BearerOwner returns owner id from BearerToken (context) or from client owner. func (n *Layer) BearerOwner(ctx context.Context) user.ID { if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil { return bearer.ResolveIssuer(*bd.Gate.BearerToken) } var ownerID user.ID user.IDFromKey(&ownerID, (ecdsa.PublicKey)(*n.EphemeralKey())) return ownerID } // SessionTokenForRead returns session container token. func (n *Layer) SessionTokenForRead(ctx context.Context) *session.Container { if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate != nil { return bd.Gate.SessionToken() } return nil } func (n *Layer) reqLogger(ctx context.Context) *zap.Logger { reqLogger := middleware.GetReqLog(ctx) if reqLogger != nil { return reqLogger } return n.log } func (n *Layer) prepareAuthParameters(ctx context.Context, prm *PrmAuth, bktOwner user.ID) { if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil { if bd.Gate.BearerToken.Impersonate() || bktOwner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) { prm.BearerToken = bd.Gate.BearerToken return } } prm.PrivateKey = &n.anonKey.Key.PrivateKey } // GetBucketInfo returns bucket info by name. 
func (n *Layer) GetBucketInfo(ctx context.Context, name string) (*data.BucketInfo, error) { name, err := url.QueryUnescape(name) if err != nil { return nil, fmt.Errorf("unescape bucket name: %w", err) } reqInfo := middleware.GetReqInfo(ctx) zone, _ := n.features.FormContainerZone(reqInfo.Namespace) if bktInfo := n.cache.GetBucket(zone, name); bktInfo != nil { return bktInfo, nil } containerID, err := n.ResolveBucket(ctx, name) if err != nil { if strings.Contains(err.Error(), "not found") { return nil, fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchBucket), err.Error()) } return nil, err } prm := PrmContainer{ ContainerID: containerID, SessionToken: n.SessionTokenForRead(ctx), } return n.containerInfo(ctx, prm) } // ResolveCID returns container id by name. func (n *Layer) ResolveCID(ctx context.Context, name string) (cid.ID, error) { name, err := url.QueryUnescape(name) if err != nil { return cid.ID{}, fmt.Errorf("unescape bucket name: %w", err) } reqInfo := middleware.GetReqInfo(ctx) zone, _ := n.features.FormContainerZone(reqInfo.Namespace) if bktInfo := n.cache.GetBucket(zone, name); bktInfo != nil { return bktInfo.CID, nil } return n.ResolveBucket(ctx, name) } // ListBuckets returns all user containers. The name of the bucket is a container // id. Timestamp is omitted since it is not saved in frostfs container. func (n *Layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) { return n.containerList(ctx) } // GetObject from storage. 
func (n *Layer) GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error) { var params getParams params.objInfo = p.ObjectInfo params.bktInfo = p.BucketInfo var decReader *encryption.Decrypter if p.Encryption.Enabled() { var err error decReader, err = getDecrypter(p) if err != nil { return nil, fmt.Errorf("creating decrypter: %w", err) } params.off = decReader.EncryptedOffset() params.ln = decReader.EncryptedLength() } else { if p.Range != nil { if p.Range.Start > p.Range.End { panic("invalid range") } params.ln = p.Range.End - p.Range.Start + 1 params.off = p.Range.Start } } r, err := n.initObjectPayloadReader(ctx, params) if err != nil { if client.IsErrObjectNotFound(err) { if p.Versioned { err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchVersion), err.Error()) } else { err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchKey), err.Error()) } } return nil, fmt.Errorf("init object payload reader: %w", err) } var decryptedLen uint64 if decReader != nil { if err = decReader.SetReader(r); err != nil { return nil, fmt.Errorf("set reader to decrypter: %w", err) } r = io.LimitReader(decReader, int64(decReader.DecryptedLength())) decryptedLen = decReader.DecryptedLength() } return &ObjectPayload{ r: r, params: params, encrypted: decReader != nil, decryptedLen: decryptedLen, }, nil } // Read implements io.Reader. If you want to use ObjectPayload as io.Reader // you must not use ObjectPayload.StreamTo method and vice versa. func (o *ObjectPayload) Read(p []byte) (int, error) { return o.r.Read(p) } // StreamTo reads all payload to provided writer. // If you want to use this method you must not use ObjectPayload.Read and vice versa. func (o *ObjectPayload) StreamTo(w io.Writer) error { bufSize := uint64(32 * 1024) // configure? if o.params.ln != 0 && o.params.ln < bufSize { bufSize = o.params.ln } // alloc buffer for copying buf := make([]byte, bufSize) // sync-pool it? 
// copy full payload written, err := io.CopyBuffer(w, o.r, buf) if err != nil { if o.encrypted { return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, o.decryptedLen, o.params.ln, err) } return fmt.Errorf("copy object payload written: '%d': %w", written, err) } return nil } func getDecrypter(p *GetObjectParams) (*encryption.Decrypter, error) { var encRange *encryption.Range if p.Range != nil { encRange = &encryption.Range{Start: p.Range.Start, End: p.Range.End} } header := p.ObjectInfo.Headers[UploadCompletedParts] if len(header) == 0 { return encryption.NewDecrypter(p.Encryption, uint64(p.ObjectInfo.Size), encRange) } decryptedObjectSize, err := strconv.ParseUint(p.ObjectInfo.Headers[AttributeDecryptedSize], 10, 64) if err != nil { return nil, fmt.Errorf("parse decrypted size: %w", err) } splits := strings.Split(header, ",") sizes := make([]uint64, len(splits)) for i, splitInfo := range splits { part, err := ParseCompletedPartHeader(splitInfo) if err != nil { return nil, fmt.Errorf("parse completed part: %w", err) } sizes[i] = uint64(part.Size) } return encryption.NewMultipartDecrypter(p.Encryption, decryptedObjectSize, sizes, encRange) } // GetObjectInfo returns meta information about the object. func (n *Layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error) { extendedObjectInfo, err := n.GetExtendedObjectInfo(ctx, p) if err != nil { return nil, err } return extendedObjectInfo.ObjectInfo, nil } // GetExtendedObjectInfo returns meta information and corresponding info from the tree service about the object. 
func (n *Layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) { var objInfo *data.ExtendedObjectInfo var err error if p.Versioned() { objInfo, err = n.headVersion(ctx, p.BktInfo, p) } else { objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object) } if err != nil { return nil, err } n.reqLogger(ctx).Debug(logs.GetObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", objInfo.ObjectInfo.ID)) return objInfo, nil } // CopyObject from one bucket into another bucket. func (n *Layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error) { objPayload, err := n.GetObject(ctx, &GetObjectParams{ ObjectInfo: p.SrcObject, Versioned: p.SrcVersioned, Range: p.Range, BucketInfo: p.ScrBktInfo, Encryption: p.SrcEncryption, }) if err != nil { return nil, fmt.Errorf("get object to copy: %w", err) } return n.PutObject(ctx, &PutObjectParams{ BktInfo: p.DstBktInfo, Object: p.DstObject, Size: p.DstSize, Reader: objPayload, Header: p.Header, Encryption: p.DstEncryption, CopiesNumbers: p.CopiesNumbers, }) } func getRandomOID() (oid.ID, error) { b := [32]byte{} if _, err := rand.Read(b[:]); err != nil { return oid.ID{}, err } var objID oid.ID objID.SetSHA256(b) return objID, nil } func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject { if len(obj.VersionID) != 0 || settings.Unversioned() { var nodeVersion *data.NodeVersion if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil { return n.handleNotFoundError(bkt, obj) } if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { return n.handleObjectDeleteErrors(ctx, bkt, obj, nodeVersion.ID) } obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID) n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID) return obj } lastVersion, err := 
n.getLastNodeVersion(ctx, bkt, obj) if err != nil { obj.Error = err return n.handleNotFoundError(bkt, obj) } if settings.VersioningSuspended() { obj.VersionID = data.UnversionedObjectVersionID var nullVersionToDelete *data.NodeVersion if lastVersion.IsUnversioned { if !lastVersion.IsDeleteMarker { nullVersionToDelete = lastVersion } } else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil { if !isNotFoundError(obj.Error) { return obj } } if nullVersionToDelete != nil { if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil { return n.handleObjectDeleteErrors(ctx, bkt, obj, nullVersionToDelete.ID) } } } if lastVersion.IsDeleteMarker { obj.DeleteMarkVersion = lastVersion.OID.EncodeToString() return obj } randOID, err := getRandomOID() if err != nil { obj.Error = fmt.Errorf("couldn't get random oid: %w", err) return obj } obj.DeleteMarkVersion = randOID.EncodeToString() now := TimeNow(ctx) newVersion := &data.NodeVersion{ BaseNodeVersion: data.BaseNodeVersion{ OID: randOID, FilePath: obj.Name, Created: &now, Owner: &n.gateOwner, IsDeleteMarker: true, }, IsUnversioned: settings.VersioningSuspended(), } if _, obj.Error = n.treeService.AddVersion(ctx, bkt, newVersion); obj.Error != nil { return obj } n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) return obj } func (n *Layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject { if isNotFoundError(obj.Error) { obj.Error = nil n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID) n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) } return obj } func (n *Layer) handleObjectDeleteErrors(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeID uint64) *VersionedObject { if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) { return obj } n.reqLogger(ctx).Debug(logs.CouldntDeleteObjectFromStorageContinueDeleting, 
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID), zap.Error(obj.Error)) obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeID) if obj.Error == nil { n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) } return obj } func isNotFoundError(err error) bool { return errors.IsS3Error(err, errors.ErrNoSuchKey) || errors.IsS3Error(err, errors.ErrNoSuchVersion) } func (n *Layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) { objVersion := &data.ObjectVersion{ BktInfo: bkt, ObjectName: obj.Name, VersionID: obj.VersionID, NoErrorOnDeleteMarker: true, } return n.getNodeVersion(ctx, objVersion) } func (n *Layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) { objVersion := &data.ObjectVersion{ BktInfo: bkt, ObjectName: obj.Name, VersionID: "", NoErrorOnDeleteMarker: true, } return n.getNodeVersion(ctx, objVersion) } func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) { if nodeVersion.IsDeleteMarker { return obj.VersionID, nil } if nodeVersion.IsCombined { return "", n.removeCombinedObject(ctx, bkt, nodeVersion) } return "", n.objectDelete(ctx, bkt, nodeVersion.OID) } func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion) error { combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID) if err != nil { return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err) } var parts []*data.PartInfo if err = json.Unmarshal(combinedObj.Payload(), &parts); err != nil { return fmt.Errorf("unmarshal combined object parts: %w", err) } for _, part := range parts { if err = n.objectDelete(ctx, bkt, part.OID); err == nil { continue } if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) { return fmt.Errorf("couldn't delete part '%s': %w", 
part.OID.EncodeToString(), err) } n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", bkt.CID.EncodeToString()), zap.String("oid", part.OID.EncodeToString()), zap.Int("part number", part.Number), zap.Error(err)) } return n.objectDelete(ctx, bkt, nodeVersion.OID) } // DeleteObjects from the storage. func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject { for i, obj := range p.Objects { p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj) if p.IsMultiple && p.Objects[i].Error != nil { n.reqLogger(ctx).Error(logs.CouldntDeleteObject, zap.String("object", obj.String()), zap.Error(p.Objects[i].Error)) } } return p.Objects } func (n *Layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error) { bktInfo, err := n.GetBucketInfo(ctx, p.Name) if err != nil { if errors.IsS3Error(err, errors.ErrNoSuchBucket) { return n.createContainer(ctx, p) } return nil, err } if p.SessionContainerCreation != nil && session.IssuedBy(*p.SessionContainerCreation, bktInfo.Owner) { return nil, errors.GetAPIError(errors.ErrBucketAlreadyOwnedByYou) } return nil, errors.GetAPIError(errors.ErrBucketAlreadyExists) } func (n *Layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error) { var cnrID cid.ID if err := cnrID.DecodeString(name); err != nil { if cnrID, err = n.resolver.Resolve(ctx, name); err != nil { return cid.ID{}, err } n.reqLogger(ctx).Info(logs.ResolveBucket, zap.Stringer("cid", cnrID)) } return cnrID, nil } func (n *Layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error { res, _, err := n.getAllObjectsVersions(ctx, commonVersionsListingParams{ BktInfo: p.BktInfo, MaxKeys: 1, }) if err != nil { return err } if len(res) != 0 { return errors.GetAPIError(errors.ErrBucketNotEmpty) } n.cache.DeleteBucket(p.BktInfo) return n.frostFS.DeleteContainer(ctx, p.BktInfo.CID, p.SessionToken) }