[#410] Drop layer.Client interface
All checks were successful
/ DCO (pull_request) Successful in 2m1s
/ Vulncheck (pull_request) Successful in 2m31s
/ Builds (1.20) (pull_request) Successful in 2m39s
/ Builds (1.21) (pull_request) Successful in 2m31s
/ Lint (pull_request) Successful in 3m14s
/ Tests (1.20) (pull_request) Successful in 2m34s
/ Tests (1.21) (pull_request) Successful in 2m10s
All checks were successful
/ DCO (pull_request) Successful in 2m1s
/ Vulncheck (pull_request) Successful in 2m31s
/ Builds (1.20) (pull_request) Successful in 2m39s
/ Builds (1.21) (pull_request) Successful in 2m31s
/ Lint (pull_request) Successful in 3m14s
/ Tests (1.20) (pull_request) Successful in 2m34s
/ Tests (1.21) (pull_request) Successful in 2m10s
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
9432782ce6
commit
414f3943e2
14 changed files with 91 additions and 151 deletions
|
@ -20,7 +20,7 @@ import (
|
|||
type (
|
||||
handler struct {
|
||||
log *zap.Logger
|
||||
obj layer.Client
|
||||
obj *layer.Layer
|
||||
cfg Config
|
||||
ape APE
|
||||
frostfsid FrostFSID
|
||||
|
@ -68,7 +68,7 @@ const (
|
|||
var _ api.Handler = (*handler)(nil)
|
||||
|
||||
// New creates new api.Handler using given logger and client.
|
||||
func New(log *zap.Logger, obj layer.Client, cfg Config, storage APE, ffsid FrostFSID) (api.Handler, error) {
|
||||
func New(log *zap.Logger, obj *layer.Layer, cfg Config, storage APE, ffsid FrostFSID) (api.Handler, error) {
|
||||
switch {
|
||||
case obj == nil:
|
||||
return nil, errors.New("empty FrostFS Object Layer")
|
||||
|
|
|
@ -58,7 +58,7 @@ func (hc *handlerContext) MockedPool() *layer.TestFrostFS {
|
|||
return hc.tp
|
||||
}
|
||||
|
||||
func (hc *handlerContext) Layer() layer.Client {
|
||||
func (hc *handlerContext) Layer() *layer.Layer {
|
||||
return hc.h.obj
|
||||
}
|
||||
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
)
|
||||
|
||||
func (n *layer) GetObjectTaggingAndLock(ctx context.Context, objVersion *data.ObjectVersion, nodeVersion *data.NodeVersion) (map[string]string, data.LockInfo, error) {
|
||||
func (n *Layer) GetObjectTaggingAndLock(ctx context.Context, objVersion *data.ObjectVersion, nodeVersion *data.NodeVersion) (map[string]string, data.LockInfo, error) {
|
||||
var err error
|
||||
owner := n.BearerOwner(ctx)
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ const (
|
|||
AttributeLockEnabled = "LockEnabled"
|
||||
)
|
||||
|
||||
func (n *layer) containerInfo(ctx context.Context, prm PrmContainer) (*data.BucketInfo, error) {
|
||||
func (n *Layer) containerInfo(ctx context.Context, prm PrmContainer) (*data.BucketInfo, error) {
|
||||
var (
|
||||
err error
|
||||
res *container.Container
|
||||
|
@ -87,7 +87,7 @@ func (n *layer) containerInfo(ctx context.Context, prm PrmContainer) (*data.Buck
|
|||
return info, nil
|
||||
}
|
||||
|
||||
func (n *layer) containerList(ctx context.Context) ([]*data.BucketInfo, error) {
|
||||
func (n *Layer) containerList(ctx context.Context) ([]*data.BucketInfo, error) {
|
||||
stoken := n.SessionTokenForRead(ctx)
|
||||
|
||||
prm := PrmUserContainers{
|
||||
|
@ -119,7 +119,7 @@ func (n *layer) containerList(ctx context.Context) ([]*data.BucketInfo, error) {
|
|||
return list, nil
|
||||
}
|
||||
|
||||
func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error) {
|
||||
func (n *Layer) createContainer(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error) {
|
||||
if p.LocationConstraint == "" {
|
||||
p.LocationConstraint = api.DefaultLocationConstraint // s3tests_boto3.functional.test_s3:test_bucket_get_location
|
||||
}
|
||||
|
@ -173,13 +173,13 @@ func (n *layer) createContainer(ctx context.Context, p *CreateBucketParams) (*da
|
|||
return bktInfo, nil
|
||||
}
|
||||
|
||||
func (n *layer) setContainerEACLTable(ctx context.Context, idCnr cid.ID, table *eacl.Table, sessionToken *session.Container) error {
|
||||
func (n *Layer) setContainerEACLTable(ctx context.Context, idCnr cid.ID, table *eacl.Table, sessionToken *session.Container) error {
|
||||
table.SetCID(idCnr)
|
||||
|
||||
return n.frostFS.SetContainerEACL(ctx, *table, sessionToken)
|
||||
}
|
||||
|
||||
func (n *layer) GetContainerEACL(ctx context.Context, cnrID cid.ID) (*eacl.Table, error) {
|
||||
func (n *Layer) GetContainerEACL(ctx context.Context, cnrID cid.ID) (*eacl.Table, error) {
|
||||
prm := PrmContainerEACL{
|
||||
ContainerID: cnrID,
|
||||
SessionToken: n.SessionTokenForRead(ctx),
|
||||
|
|
|
@ -17,7 +17,7 @@ const wildcard = "*"
|
|||
|
||||
var supportedMethods = map[string]struct{}{"GET": {}, "HEAD": {}, "POST": {}, "PUT": {}, "DELETE": {}}
|
||||
|
||||
func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
||||
func (n *Layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
||||
var (
|
||||
buf bytes.Buffer
|
||||
tee = io.TeeReader(p.Reader, &buf)
|
||||
|
@ -68,7 +68,7 @@ func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*data.CORSConfiguration, error) {
|
||||
func (n *Layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*data.CORSConfiguration, error) {
|
||||
cors, err := n.getCORS(ctx, bktInfo)
|
||||
if err != nil {
|
||||
if errorsStd.Is(err, ErrNodeNotFound) {
|
||||
|
@ -80,7 +80,7 @@ func (n *layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*d
|
|||
return cors, nil
|
||||
}
|
||||
|
||||
func (n *layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error {
|
||||
func (n *Layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error {
|
||||
objID, err := n.treeService.DeleteBucketCORS(ctx, bktInfo)
|
||||
objIDNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
|
||||
if err != nil && !objIDNotFound {
|
||||
|
|
|
@ -43,7 +43,7 @@ type (
|
|||
FormContainerZone(ns string) (zone string, isDefault bool)
|
||||
}
|
||||
|
||||
layer struct {
|
||||
Layer struct {
|
||||
frostFS FrostFS
|
||||
gateOwner user.ID
|
||||
log *zap.Logger
|
||||
|
@ -199,64 +199,6 @@ type (
|
|||
encrypted bool
|
||||
decryptedLen uint64
|
||||
}
|
||||
|
||||
// Client provides S3 API client interface.
|
||||
Client interface {
|
||||
EphemeralKey() *keys.PublicKey
|
||||
|
||||
GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error)
|
||||
PutBucketSettings(ctx context.Context, p *PutSettingsParams) error
|
||||
|
||||
PutBucketCORS(ctx context.Context, p *PutCORSParams) error
|
||||
GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*data.CORSConfiguration, error)
|
||||
DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error
|
||||
|
||||
ListBuckets(ctx context.Context) ([]*data.BucketInfo, error)
|
||||
GetBucketInfo(ctx context.Context, name string) (*data.BucketInfo, error)
|
||||
ResolveCID(ctx context.Context, name string) (cid.ID, error)
|
||||
GetBucketACL(ctx context.Context, bktInfo *data.BucketInfo) (*BucketACL, error)
|
||||
PutBucketACL(ctx context.Context, p *PutBucketACLParams) error
|
||||
CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error)
|
||||
DeleteBucket(ctx context.Context, p *DeleteBucketParams) error
|
||||
|
||||
GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error)
|
||||
GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error)
|
||||
GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error)
|
||||
|
||||
GetLockInfo(ctx context.Context, obj *data.ObjectVersion) (*data.LockInfo, error)
|
||||
PutLockInfo(ctx context.Context, p *PutLockInfoParams) error
|
||||
|
||||
GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error)
|
||||
PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error
|
||||
DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error
|
||||
|
||||
GetObjectTagging(ctx context.Context, p *data.GetObjectTaggingParams) (string, map[string]string, error)
|
||||
PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) error
|
||||
DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) error
|
||||
|
||||
PutObject(ctx context.Context, p *PutObjectParams) (*data.ExtendedObjectInfo, error)
|
||||
|
||||
CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error)
|
||||
|
||||
ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error)
|
||||
ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error)
|
||||
ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error)
|
||||
|
||||
DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject
|
||||
|
||||
CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error
|
||||
CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error)
|
||||
UploadPart(ctx context.Context, p *UploadPartParams) (string, error)
|
||||
UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error)
|
||||
ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error)
|
||||
AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error
|
||||
ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error)
|
||||
|
||||
// Compound methods for optimizations
|
||||
|
||||
// GetObjectTaggingAndLock unifies GetObjectTagging and GetLock methods in single tree service invocation.
|
||||
GetObjectTaggingAndLock(ctx context.Context, p *data.ObjectVersion, nodeVersion *data.NodeVersion) (map[string]string, data.LockInfo, error)
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -287,10 +229,10 @@ func (p HeadObjectParams) Versioned() bool {
|
|||
return len(p.VersionID) > 0
|
||||
}
|
||||
|
||||
// NewLayer creates an instance of a layer. It checks credentials
|
||||
// NewLayer creates an instance of a Layer. It checks credentials
|
||||
// and establishes gRPC connection with the node.
|
||||
func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client {
|
||||
return &layer{
|
||||
func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) *Layer {
|
||||
return &Layer{
|
||||
frostFS: frostFS,
|
||||
log: log,
|
||||
gateOwner: config.GateOwner,
|
||||
|
@ -302,7 +244,7 @@ func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client {
|
|||
}
|
||||
}
|
||||
|
||||
func (n *layer) EphemeralKey() *keys.PublicKey {
|
||||
func (n *Layer) EphemeralKey() *keys.PublicKey {
|
||||
return n.anonKey.Key.PublicKey()
|
||||
}
|
||||
|
||||
|
@ -322,7 +264,7 @@ func TimeNow(ctx context.Context) time.Time {
|
|||
}
|
||||
|
||||
// BearerOwner returns owner id from BearerToken (context) or from client owner.
|
||||
func (n *layer) BearerOwner(ctx context.Context) user.ID {
|
||||
func (n *Layer) BearerOwner(ctx context.Context) user.ID {
|
||||
if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil {
|
||||
return bearer.ResolveIssuer(*bd.Gate.BearerToken)
|
||||
}
|
||||
|
@ -334,7 +276,7 @@ func (n *layer) BearerOwner(ctx context.Context) user.ID {
|
|||
}
|
||||
|
||||
// SessionTokenForRead returns session container token.
|
||||
func (n *layer) SessionTokenForRead(ctx context.Context) *session.Container {
|
||||
func (n *Layer) SessionTokenForRead(ctx context.Context) *session.Container {
|
||||
if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate != nil {
|
||||
return bd.Gate.SessionToken()
|
||||
}
|
||||
|
@ -342,7 +284,7 @@ func (n *layer) SessionTokenForRead(ctx context.Context) *session.Container {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) reqLogger(ctx context.Context) *zap.Logger {
|
||||
func (n *Layer) reqLogger(ctx context.Context) *zap.Logger {
|
||||
reqLogger := middleware.GetReqLog(ctx)
|
||||
if reqLogger != nil {
|
||||
return reqLogger
|
||||
|
@ -350,7 +292,7 @@ func (n *layer) reqLogger(ctx context.Context) *zap.Logger {
|
|||
return n.log
|
||||
}
|
||||
|
||||
func (n *layer) prepareAuthParameters(ctx context.Context, prm *PrmAuth, bktOwner user.ID) {
|
||||
func (n *Layer) prepareAuthParameters(ctx context.Context, prm *PrmAuth, bktOwner user.ID) {
|
||||
if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil {
|
||||
if bd.Gate.BearerToken.Impersonate() || bktOwner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
|
||||
prm.BearerToken = bd.Gate.BearerToken
|
||||
|
@ -362,7 +304,7 @@ func (n *layer) prepareAuthParameters(ctx context.Context, prm *PrmAuth, bktOwne
|
|||
}
|
||||
|
||||
// GetBucketInfo returns bucket info by name.
|
||||
func (n *layer) GetBucketInfo(ctx context.Context, name string) (*data.BucketInfo, error) {
|
||||
func (n *Layer) GetBucketInfo(ctx context.Context, name string) (*data.BucketInfo, error) {
|
||||
name, err := url.QueryUnescape(name)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unescape bucket name: %w", err)
|
||||
|
@ -392,7 +334,7 @@ func (n *layer) GetBucketInfo(ctx context.Context, name string) (*data.BucketInf
|
|||
}
|
||||
|
||||
// ResolveCID returns container id by name.
|
||||
func (n *layer) ResolveCID(ctx context.Context, name string) (cid.ID, error) {
|
||||
func (n *Layer) ResolveCID(ctx context.Context, name string) (cid.ID, error) {
|
||||
name, err := url.QueryUnescape(name)
|
||||
if err != nil {
|
||||
return cid.ID{}, fmt.Errorf("unescape bucket name: %w", err)
|
||||
|
@ -409,7 +351,7 @@ func (n *layer) ResolveCID(ctx context.Context, name string) (cid.ID, error) {
|
|||
}
|
||||
|
||||
// GetBucketACL returns bucket acl info by name.
|
||||
func (n *layer) GetBucketACL(ctx context.Context, bktInfo *data.BucketInfo) (*BucketACL, error) {
|
||||
func (n *Layer) GetBucketACL(ctx context.Context, bktInfo *data.BucketInfo) (*BucketACL, error) {
|
||||
eACL, err := n.GetContainerEACL(ctx, bktInfo.CID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get container eacl: %w", err)
|
||||
|
@ -422,18 +364,18 @@ func (n *layer) GetBucketACL(ctx context.Context, bktInfo *data.BucketInfo) (*Bu
|
|||
}
|
||||
|
||||
// PutBucketACL puts bucket acl by name.
|
||||
func (n *layer) PutBucketACL(ctx context.Context, param *PutBucketACLParams) error {
|
||||
func (n *Layer) PutBucketACL(ctx context.Context, param *PutBucketACLParams) error {
|
||||
return n.setContainerEACLTable(ctx, param.BktInfo.CID, param.EACL, param.SessionToken)
|
||||
}
|
||||
|
||||
// ListBuckets returns all user containers. The name of the bucket is a container
|
||||
// id. Timestamp is omitted since it is not saved in frostfs container.
|
||||
func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) {
|
||||
func (n *Layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) {
|
||||
return n.containerList(ctx)
|
||||
}
|
||||
|
||||
// GetObject from storage.
|
||||
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error) {
|
||||
func (n *Layer) GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error) {
|
||||
var params getParams
|
||||
|
||||
params.objInfo = p.ObjectInfo
|
||||
|
@ -547,7 +489,7 @@ func getDecrypter(p *GetObjectParams) (*encryption.Decrypter, error) {
|
|||
}
|
||||
|
||||
// GetObjectInfo returns meta information about the object.
|
||||
func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error) {
|
||||
func (n *Layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error) {
|
||||
extendedObjectInfo, err := n.GetExtendedObjectInfo(ctx, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -557,7 +499,7 @@ func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.O
|
|||
}
|
||||
|
||||
// GetExtendedObjectInfo returns meta information and corresponding info from the tree service about the object.
|
||||
func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||
func (n *Layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||
var objInfo *data.ExtendedObjectInfo
|
||||
var err error
|
||||
|
||||
|
@ -578,7 +520,7 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
|
|||
}
|
||||
|
||||
// CopyObject from one bucket into another bucket.
|
||||
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||
func (n *Layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||
objPayload, err := n.GetObject(ctx, &GetObjectParams{
|
||||
ObjectInfo: p.SrcObject,
|
||||
Versioned: p.SrcVersioned,
|
||||
|
@ -612,7 +554,7 @@ func getRandomOID() (oid.ID, error) {
|
|||
return objID, nil
|
||||
}
|
||||
|
||||
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
|
||||
func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
|
||||
if len(obj.VersionID) != 0 || settings.Unversioned() {
|
||||
var nodeVersion *data.NodeVersion
|
||||
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
|
||||
|
@ -688,7 +630,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
|||
return obj
|
||||
}
|
||||
|
||||
func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
|
||||
func (n *Layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
|
||||
if isNotFoundError(obj.Error) {
|
||||
obj.Error = nil
|
||||
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
|
||||
|
@ -698,7 +640,7 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
|
|||
return obj
|
||||
}
|
||||
|
||||
func (n *layer) handleObjectDeleteErrors(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeID uint64) *VersionedObject {
|
||||
func (n *Layer) handleObjectDeleteErrors(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeID uint64) *VersionedObject {
|
||||
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
|
||||
return obj
|
||||
}
|
||||
|
@ -719,7 +661,7 @@ func isNotFoundError(err error) bool {
|
|||
errors.IsS3Error(err, errors.ErrNoSuchVersion)
|
||||
}
|
||||
|
||||
func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
||||
func (n *Layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
||||
objVersion := &data.ObjectVersion{
|
||||
BktInfo: bkt,
|
||||
ObjectName: obj.Name,
|
||||
|
@ -730,7 +672,7 @@ func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo
|
|||
return n.getNodeVersion(ctx, objVersion)
|
||||
}
|
||||
|
||||
func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
||||
func (n *Layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
||||
objVersion := &data.ObjectVersion{
|
||||
BktInfo: bkt,
|
||||
ObjectName: obj.Name,
|
||||
|
@ -741,7 +683,7 @@ func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, ob
|
|||
return n.getNodeVersion(ctx, objVersion)
|
||||
}
|
||||
|
||||
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
|
||||
func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
|
||||
if nodeVersion.IsDeleteMarker {
|
||||
return obj.VersionID, nil
|
||||
}
|
||||
|
@ -753,7 +695,7 @@ func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, node
|
|||
return "", n.objectDelete(ctx, bkt, nodeVersion.OID)
|
||||
}
|
||||
|
||||
func (n *layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion) error {
|
||||
func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion) error {
|
||||
combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err)
|
||||
|
@ -781,7 +723,7 @@ func (n *layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
|
|||
}
|
||||
|
||||
// DeleteObjects from the storage.
|
||||
func (n *layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject {
|
||||
func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject {
|
||||
for i, obj := range p.Objects {
|
||||
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj)
|
||||
if p.IsMultiple && p.Objects[i].Error != nil {
|
||||
|
@ -792,7 +734,7 @@ func (n *layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*Ver
|
|||
return p.Objects
|
||||
}
|
||||
|
||||
func (n *layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error) {
|
||||
func (n *Layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error) {
|
||||
bktInfo, err := n.GetBucketInfo(ctx, p.Name)
|
||||
if err != nil {
|
||||
if errors.IsS3Error(err, errors.ErrNoSuchBucket) {
|
||||
|
@ -808,7 +750,7 @@ func (n *layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.
|
|||
return nil, errors.GetAPIError(errors.ErrBucketAlreadyExists)
|
||||
}
|
||||
|
||||
func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error) {
|
||||
func (n *Layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error) {
|
||||
var cnrID cid.ID
|
||||
if err := cnrID.DecodeString(name); err != nil {
|
||||
if cnrID, err = n.resolver.Resolve(ctx, name); err != nil {
|
||||
|
@ -821,7 +763,7 @@ func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error)
|
|||
return cnrID, nil
|
||||
}
|
||||
|
||||
func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
|
||||
func (n *Layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
|
||||
res, _, err := n.getAllObjectsVersions(ctx, commonVersionsListingParams{
|
||||
BktInfo: p.BktInfo,
|
||||
MaxKeys: 1,
|
||||
|
|
|
@ -96,7 +96,7 @@ const (
|
|||
)
|
||||
|
||||
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
|
||||
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
|
||||
func (n *Layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
|
||||
var result ListObjectsInfoV1
|
||||
|
||||
prm := commonLatestVersionsListingParams{
|
||||
|
@ -127,7 +127,7 @@ func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*Lis
|
|||
}
|
||||
|
||||
// ListObjectsV2 returns objects in a bucket for requests of Version 2.
|
||||
func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) {
|
||||
func (n *Layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) {
|
||||
var result ListObjectsInfoV2
|
||||
|
||||
prm := commonLatestVersionsListingParams{
|
||||
|
@ -157,7 +157,7 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
|
|||
return &result, nil
|
||||
}
|
||||
|
||||
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
|
||||
func (n *Layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
|
||||
prm := commonVersionsListingParams{
|
||||
BktInfo: p.BktInfo,
|
||||
Delimiter: p.Delimiter,
|
||||
|
@ -188,7 +188,7 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (n *layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVersionsListingParams) (objects []*data.ExtendedNodeVersion, next *data.ExtendedNodeVersion, err error) {
|
||||
func (n *Layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVersionsListingParams) (objects []*data.ExtendedNodeVersion, next *data.ExtendedNodeVersion, err error) {
|
||||
if p.MaxKeys == 0 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
@ -225,7 +225,7 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers
|
|||
return
|
||||
}
|
||||
|
||||
func (n *layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListingParams) ([]*data.ExtendedNodeVersion, bool, error) {
|
||||
func (n *Layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListingParams) ([]*data.ExtendedNodeVersion, bool, error) {
|
||||
if p.MaxKeys == 0 {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
@ -301,15 +301,15 @@ func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int,
|
|||
}
|
||||
}
|
||||
|
||||
func (n *layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) {
|
||||
func (n *Layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) {
|
||||
return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
|
||||
}
|
||||
|
||||
func (n *layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) {
|
||||
func (n *Layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) {
|
||||
return n.getListVersionsSession(ctx, p, false)
|
||||
}
|
||||
|
||||
func (n *layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) {
|
||||
func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) {
|
||||
owner := n.BearerOwner(ctx)
|
||||
|
||||
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark)
|
||||
|
@ -329,7 +329,7 @@ func (n *layer) getListVersionsSession(ctx context.Context, p commonVersionsList
|
|||
return session, nil
|
||||
}
|
||||
|
||||
func (n *layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
|
||||
func (n *Layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
|
||||
session = &data.ListSession{NamesMap: make(map[string]struct{})}
|
||||
session.Context, session.Cancel = context.WithCancel(context.Background())
|
||||
|
||||
|
@ -345,7 +345,7 @@ func (n *layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVers
|
|||
return session, nil
|
||||
}
|
||||
|
||||
func (n *layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
|
||||
func (n *Layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
|
||||
if len(allObjects) <= p.MaxKeys {
|
||||
return
|
||||
}
|
||||
|
@ -366,7 +366,7 @@ func (n *layer) putListLatestVersionsSession(ctx context.Context, p commonLatest
|
|||
n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session)
|
||||
}
|
||||
|
||||
func (n *layer) putListAllVersionsSession(ctx context.Context, p commonVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
|
||||
func (n *Layer) putListAllVersionsSession(ctx context.Context, p commonVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
|
||||
if len(allObjects) <= p.MaxKeys {
|
||||
return
|
||||
}
|
||||
|
@ -498,7 +498,7 @@ func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams,
|
|||
return nodeCh, errCh
|
||||
}
|
||||
|
||||
func (n *layer) initWorkerPool(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.ExtendedNodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
|
||||
func (n *Layer) initWorkerPool(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.ExtendedNodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
|
||||
reqLog := n.reqLogger(ctx)
|
||||
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
|
||||
if err != nil {
|
||||
|
@ -637,7 +637,7 @@ func triageExtendedObjects(allObjects []*data.ExtendedNodeVersion) (prefixes []s
|
|||
return
|
||||
}
|
||||
|
||||
func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion) (oi *data.ObjectInfo) {
|
||||
func (n *Layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion) (oi *data.ObjectInfo) {
|
||||
owner := n.BearerOwner(ctx)
|
||||
if extInfo := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID)); extInfo != nil {
|
||||
return extInfo.ObjectInfo
|
||||
|
|
|
@ -146,7 +146,7 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error {
|
||||
func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error {
|
||||
metaSize := len(p.Header)
|
||||
if p.Data != nil {
|
||||
metaSize += len(p.Data.ACLHeaders)
|
||||
|
@ -185,7 +185,7 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
|
|||
return n.treeService.CreateMultipartUpload(ctx, p.Info.Bkt, info)
|
||||
}
|
||||
|
||||
func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) {
|
||||
func (n *Layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) {
|
||||
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrNodeNotFound) {
|
||||
|
@ -206,7 +206,7 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, er
|
|||
return objInfo.ETag(n.features.MD5Enabled()), nil
|
||||
}
|
||||
|
||||
func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) {
|
||||
func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) {
|
||||
encInfo := FormEncryptionInfo(multipartInfo.Meta)
|
||||
if err := p.Info.Encryption.MatchObjectEncryption(encInfo); err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.MismatchedObjEncryptionInfo, zap.Error(err))
|
||||
|
@ -319,7 +319,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
|||
return objInfo, nil
|
||||
}
|
||||
|
||||
func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) {
|
||||
func (n *Layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) {
|
||||
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrNodeNotFound) {
|
||||
|
@ -367,7 +367,7 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
|
|||
return n.uploadPart(ctx, multipartInfo, params)
|
||||
}
|
||||
|
||||
func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) {
|
||||
func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) {
|
||||
for i := 1; i < len(p.Parts); i++ {
|
||||
if p.Parts[i].PartNumber <= p.Parts[i-1].PartNumber {
|
||||
return nil, nil, s3errors.GetAPIError(s3errors.ErrInvalidPartOrder)
|
||||
|
@ -492,7 +492,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
|||
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
|
||||
}
|
||||
|
||||
func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
|
||||
func (n *Layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
|
||||
var result ListMultipartUploadsInfo
|
||||
if p.MaxUploads == 0 {
|
||||
return &result, nil
|
||||
|
@ -552,7 +552,7 @@ func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
|
|||
return &result, nil
|
||||
}
|
||||
|
||||
func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
|
||||
func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
|
||||
multipartInfo, parts, err := n.getUploadParts(ctx, p)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -568,7 +568,7 @@ func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
|
|||
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
|
||||
}
|
||||
|
||||
func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
|
||||
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
|
||||
var res ListPartsInfo
|
||||
multipartInfo, partsInfo, err := n.getUploadParts(ctx, p.Info)
|
||||
if err != nil {
|
||||
|
@ -622,7 +622,7 @@ func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
|
|||
return &res, nil
|
||||
}
|
||||
|
||||
func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) {
|
||||
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) {
|
||||
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrNodeNotFound) {
|
||||
|
|
|
@ -67,7 +67,7 @@ func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
|
|||
}
|
||||
|
||||
// objectHead returns all object's headers.
|
||||
func (n *layer) objectHead(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID) (*object.Object, error) {
|
||||
func (n *Layer) objectHead(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID) (*object.Object, error) {
|
||||
prm := PrmObjectRead{
|
||||
Container: bktInfo.CID,
|
||||
Object: idObj,
|
||||
|
@ -84,7 +84,7 @@ func (n *layer) objectHead(ctx context.Context, bktInfo *data.BucketInfo, idObj
|
|||
return res.Head, nil
|
||||
}
|
||||
|
||||
func (n *layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Reader, error) {
|
||||
func (n *Layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Reader, error) {
|
||||
if _, isCombined := p.objInfo.Headers[MultipartObjectSize]; !isCombined {
|
||||
return n.initFrostFSObjectPayloadReader(ctx, getFrostFSParams{
|
||||
off: p.off,
|
||||
|
@ -131,7 +131,7 @@ func (n *layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Re
|
|||
|
||||
// initializes payload reader of the FrostFS object.
|
||||
// Zero range corresponds to full payload (panics if only offset is set).
|
||||
func (n *layer) initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFSParams) (io.Reader, error) {
|
||||
func (n *Layer) initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFSParams) (io.Reader, error) {
|
||||
prm := PrmObjectRead{
|
||||
Container: p.bktInfo.CID,
|
||||
Object: p.oid,
|
||||
|
@ -150,7 +150,7 @@ func (n *layer) initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFS
|
|||
}
|
||||
|
||||
// objectGet returns an object with payload in the object.
|
||||
func (n *layer) objectGet(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (*object.Object, error) {
|
||||
func (n *Layer) objectGet(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (*object.Object, error) {
|
||||
prm := PrmObjectRead{
|
||||
Container: bktInfo.CID,
|
||||
Object: objID,
|
||||
|
@ -214,7 +214,7 @@ func ParseCompletedPartHeader(hdr string) (*Part, error) {
|
|||
}
|
||||
|
||||
// PutObject stores object into FrostFS, took payload from io.Reader.
|
||||
func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||
func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||
bktSettings, err := n.GetBucketSettings(ctx, p.BktInfo)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't get versioning settings object: %w", err)
|
||||
|
@ -363,7 +363,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
|||
return extendedObjInfo, nil
|
||||
}
|
||||
|
||||
func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ExtendedObjectInfo, error) {
|
||||
func (n *Layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ExtendedObjectInfo, error) {
|
||||
owner := n.BearerOwner(ctx)
|
||||
if extObjInfo := n.cache.GetLastObject(owner, bkt.Name, objectName); extObjInfo != nil {
|
||||
return extObjInfo, nil
|
||||
|
@ -401,7 +401,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
|
|||
return extObjInfo, nil
|
||||
}
|
||||
|
||||
func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||
func (n *Layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||
var err error
|
||||
var foundVersion *data.NodeVersion
|
||||
if p.VersionID == data.UnversionedObjectVersionID {
|
||||
|
@ -459,7 +459,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
|
|||
}
|
||||
|
||||
// objectDelete puts tombstone object into frostfs.
|
||||
func (n *layer) objectDelete(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID) error {
|
||||
func (n *Layer) objectDelete(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID) error {
|
||||
prm := PrmObjectDelete{
|
||||
Container: bktInfo.CID,
|
||||
Object: idObj,
|
||||
|
@ -474,7 +474,7 @@ func (n *layer) objectDelete(ctx context.Context, bktInfo *data.BucketInfo, idOb
|
|||
|
||||
// objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject.
|
||||
// Returns object ID and payload sha256 hash.
|
||||
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, []byte, error) {
|
||||
func (n *Layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, []byte, error) {
|
||||
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
||||
prm.ClientCut = n.features.ClientCut()
|
||||
prm.BufferMaxSize = n.features.BufferMaxSizeForPut()
|
||||
|
|
|
@ -31,8 +31,6 @@ func TestWrapReader(t *testing.T) {
|
|||
|
||||
func TestGoroutinesDontLeakInPutAndHash(t *testing.T) {
|
||||
tc := prepareContext(t)
|
||||
l, ok := tc.layer.(*layer)
|
||||
require.True(t, ok)
|
||||
|
||||
content := make([]byte, 128*1024)
|
||||
_, err := rand.Read(content)
|
||||
|
@ -46,7 +44,7 @@ func TestGoroutinesDontLeakInPutAndHash(t *testing.T) {
|
|||
|
||||
expErr := errors.New("some error")
|
||||
tc.testFrostFS.SetObjectPutError(tc.obj, expErr)
|
||||
_, _, _, _, err = l.objectPutAndHash(tc.ctx, prm, tc.bktInfo)
|
||||
_, _, _, _, err = tc.layer.objectPutAndHash(tc.ctx, prm, tc.bktInfo)
|
||||
require.ErrorIs(t, err, expErr)
|
||||
require.Empty(t, payload.Len(), "body must be read out otherwise goroutines can leak in wrapReader")
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ type PutLockInfoParams struct {
|
|||
NodeVersion *data.NodeVersion // optional
|
||||
}
|
||||
|
||||
func (n *layer) PutLockInfo(ctx context.Context, p *PutLockInfoParams) (err error) {
|
||||
func (n *Layer) PutLockInfo(ctx context.Context, p *PutLockInfoParams) (err error) {
|
||||
newLock := p.NewLock
|
||||
versionNode := p.NodeVersion
|
||||
// sometimes node version can be provided from executing context
|
||||
|
@ -100,7 +100,7 @@ func (n *layer) PutLockInfo(ctx context.Context, p *PutLockInfoParams) (err erro
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) getNodeVersionFromCacheOrFrostfs(ctx context.Context, objVersion *data.ObjectVersion) (nodeVersion *data.NodeVersion, err error) {
|
||||
func (n *Layer) getNodeVersionFromCacheOrFrostfs(ctx context.Context, objVersion *data.ObjectVersion) (nodeVersion *data.NodeVersion, err error) {
|
||||
// check cache if node version is stored inside extendedObjectVersion
|
||||
nodeVersion = n.getNodeVersionFromCache(n.BearerOwner(ctx), objVersion)
|
||||
if nodeVersion == nil {
|
||||
|
@ -111,7 +111,7 @@ func (n *layer) getNodeVersionFromCacheOrFrostfs(ctx context.Context, objVersion
|
|||
return nodeVersion, nil
|
||||
}
|
||||
|
||||
func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, lock *data.ObjectLock, copiesNumber []uint32) (oid.ID, error) {
|
||||
func (n *Layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, lock *data.ObjectLock, copiesNumber []uint32) (oid.ID, error) {
|
||||
prm := PrmObjectCreate{
|
||||
Container: bktInfo.CID,
|
||||
Locks: []oid.ID{objID},
|
||||
|
@ -129,7 +129,7 @@ func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj
|
|||
return id, err
|
||||
}
|
||||
|
||||
func (n *layer) GetLockInfo(ctx context.Context, objVersion *data.ObjectVersion) (*data.LockInfo, error) {
|
||||
func (n *Layer) GetLockInfo(ctx context.Context, objVersion *data.ObjectVersion) (*data.LockInfo, error) {
|
||||
owner := n.BearerOwner(ctx)
|
||||
if lockInfo := n.cache.GetLockInfo(owner, lockObjectKey(objVersion)); lockInfo != nil {
|
||||
return lockInfo, nil
|
||||
|
@ -153,7 +153,7 @@ func (n *layer) GetLockInfo(ctx context.Context, objVersion *data.ObjectVersion)
|
|||
return lockInfo, nil
|
||||
}
|
||||
|
||||
func (n *layer) getCORS(ctx context.Context, bkt *data.BucketInfo) (*data.CORSConfiguration, error) {
|
||||
func (n *Layer) getCORS(ctx context.Context, bkt *data.BucketInfo) (*data.CORSConfiguration, error) {
|
||||
owner := n.BearerOwner(ctx)
|
||||
if cors := n.cache.GetCORS(owner, bkt); cors != nil {
|
||||
return cors, nil
|
||||
|
@ -190,7 +190,7 @@ func lockObjectKey(objVersion *data.ObjectVersion) string {
|
|||
return ".lock." + objVersion.BktInfo.CID.EncodeToString() + "." + objVersion.ObjectName + "." + objVersion.VersionID
|
||||
}
|
||||
|
||||
func (n *layer) GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
|
||||
func (n *Layer) GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
|
||||
owner := n.BearerOwner(ctx)
|
||||
if settings := n.cache.GetSettings(owner, bktInfo); settings != nil {
|
||||
return settings, nil
|
||||
|
@ -209,7 +209,7 @@ func (n *layer) GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo)
|
|||
return settings, nil
|
||||
}
|
||||
|
||||
func (n *layer) PutBucketSettings(ctx context.Context, p *PutSettingsParams) error {
|
||||
func (n *Layer) PutBucketSettings(ctx context.Context, p *PutSettingsParams) error {
|
||||
if err := n.treeService.PutSettingsNode(ctx, p.BktInfo, p.Settings); err != nil {
|
||||
return fmt.Errorf("failed to get settings node: %w", err)
|
||||
}
|
||||
|
@ -219,7 +219,7 @@ func (n *layer) PutBucketSettings(ctx context.Context, p *PutSettingsParams) err
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) attributesFromLock(ctx context.Context, lock *data.ObjectLock) ([][2]string, error) {
|
||||
func (n *Layer) attributesFromLock(ctx context.Context, lock *data.ObjectLock) ([][2]string, error) {
|
||||
var (
|
||||
err error
|
||||
expEpoch uint64
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (n *layer) GetObjectTagging(ctx context.Context, p *data.GetObjectTaggingParams) (string, map[string]string, error) {
|
||||
func (n *Layer) GetObjectTagging(ctx context.Context, p *data.GetObjectTaggingParams) (string, map[string]string, error) {
|
||||
var err error
|
||||
owner := n.BearerOwner(ctx)
|
||||
|
||||
|
@ -50,7 +50,7 @@ func (n *layer) GetObjectTagging(ctx context.Context, p *data.GetObjectTaggingPa
|
|||
return p.ObjectVersion.VersionID, tags, nil
|
||||
}
|
||||
|
||||
func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (err error) {
|
||||
func (n *Layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingParams) (err error) {
|
||||
nodeVersion := p.NodeVersion
|
||||
if nodeVersion == nil {
|
||||
nodeVersion, err = n.getNodeVersionFromCacheOrFrostfs(ctx, p.ObjectVersion)
|
||||
|
@ -73,7 +73,7 @@ func (n *layer) PutObjectTagging(ctx context.Context, p *data.PutObjectTaggingPa
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) error {
|
||||
func (n *Layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion) error {
|
||||
version, err := n.getNodeVersion(ctx, p)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -94,7 +94,7 @@ func (n *layer) DeleteObjectTagging(ctx context.Context, p *data.ObjectVersion)
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
|
||||
func (n *Layer) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
|
||||
owner := n.BearerOwner(ctx)
|
||||
|
||||
if tags := n.cache.GetTagging(owner, bucketTaggingCacheKey(bktInfo.CID)); tags != nil {
|
||||
|
@ -111,7 +111,7 @@ func (n *layer) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo)
|
|||
return tags, nil
|
||||
}
|
||||
|
||||
func (n *layer) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
|
||||
func (n *Layer) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
|
||||
if err := n.treeService.PutBucketTagging(ctx, bktInfo, tagSet); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -121,7 +121,7 @@ func (n *layer) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo,
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
|
||||
func (n *Layer) DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error {
|
||||
n.cache.DeleteTagging(bucketTaggingCacheKey(bktInfo.CID))
|
||||
|
||||
return n.treeService.DeleteBucketTagging(ctx, bktInfo)
|
||||
|
@ -135,7 +135,7 @@ func bucketTaggingCacheKey(cnrID cid.ID) string {
|
|||
return ".tagset." + cnrID.EncodeToString()
|
||||
}
|
||||
|
||||
func (n *layer) getNodeVersion(ctx context.Context, objVersion *data.ObjectVersion) (*data.NodeVersion, error) {
|
||||
func (n *Layer) getNodeVersion(ctx context.Context, objVersion *data.ObjectVersion) (*data.NodeVersion, error) {
|
||||
var err error
|
||||
var version *data.NodeVersion
|
||||
|
||||
|
@ -173,7 +173,7 @@ func (n *layer) getNodeVersion(ctx context.Context, objVersion *data.ObjectVersi
|
|||
return version, err
|
||||
}
|
||||
|
||||
func (n *layer) getNodeVersionFromCache(owner user.ID, o *data.ObjectVersion) *data.NodeVersion {
|
||||
func (n *Layer) getNodeVersionFromCache(owner user.ID, o *data.ObjectVersion) *data.NodeVersion {
|
||||
if len(o.VersionID) == 0 || o.VersionID == data.UnversionedObjectVersionID {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -130,7 +130,7 @@ func (tc *testContext) getObjectByID(objID oid.ID) *object.Object {
|
|||
type testContext struct {
|
||||
t *testing.T
|
||||
ctx context.Context
|
||||
layer Client
|
||||
layer *Layer
|
||||
bktInfo *data.BucketInfo
|
||||
obj string
|
||||
testFrostFS *TestFrostFS
|
||||
|
|
|
@ -61,7 +61,7 @@ type (
|
|||
pool *pool.Pool
|
||||
treePool *treepool.Pool
|
||||
key *keys.PrivateKey
|
||||
obj layer.Client
|
||||
obj *layer.Layer
|
||||
api api.Handler
|
||||
|
||||
frostfsid *frostfsid.FrostFSID
|
||||
|
|
Loading…
Reference in a new issue