Performance improvements #1080

Merged
dstepanov-yadro merged 4 commits from dstepanov-yadro/frostfs-node:fix/improve_ape_perf into master 2024-09-04 19:51:07 +00:00
6 changed files with 60 additions and 36 deletions

View file

@ -232,14 +232,19 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er
} }
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error { func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil { b := tx.Bucket(shardInfoBucket)
if b == nil {
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
}
if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil {
return fmt.Errorf("could not increase phy object counter: %w", err) return fmt.Errorf("could not increase phy object counter: %w", err)
} }
if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil { if err := db.updateShardObjectCounterBucket(b, logical, 1, true); err != nil {
return fmt.Errorf("could not increase logical object counter: %w", err) return fmt.Errorf("could not increase logical object counter: %w", err)
} }
if isUserObject { if isUserObject {
if err := db.updateShardObjectCounter(tx, user, 1, true); err != nil { if err := db.updateShardObjectCounterBucket(b, user, 1, true); err != nil {
return fmt.Errorf("could not increase user object counter: %w", err) return fmt.Errorf("could not increase user object counter: %w", err)
} }
} }
@ -252,6 +257,10 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6
return nil return nil
} }
return db.updateShardObjectCounterBucket(b, typ, delta, inc)
}
func (*DB) updateShardObjectCounterBucket(b *bbolt.Bucket, typ objectType, delta uint64, inc bool) error {
var counter uint64 var counter uint64
var counterKey []byte var counterKey []byte

View file

@ -36,6 +36,14 @@ func (r StorageIDRes) StorageID() []byte {
// StorageID returns storage descriptor for objects from the blobstor. // StorageID returns storage descriptor for objects from the blobstor.
// It is put together with the object can makes get/delete operation faster. // It is put together with the object can makes get/delete operation faster.
func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes, err error) { func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes, err error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("StorageID", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.StorageID", _, span := tracing.StartSpanFromContext(ctx, "metabase.StorageID",
trace.WithAttributes( trace.WithAttributes(
attribute.String("address", prm.addr.EncodeToString()), attribute.String("address", prm.addr.EncodeToString()),
@ -54,7 +62,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
return err return err
}) })
success = err == nil
return res, metaerr.Wrap(err) return res, metaerr.Wrap(err)
} }

View file

@ -54,6 +54,9 @@ type Prm struct {
// If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow. // If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow.
SoftAPECheck bool SoftAPECheck bool
// If true, object headers will not retrieved from storage engine.
WithoutHeaderRequest bool
} }
var errMissingOID = errors.New("object ID is not set") var errMissingOID = errors.New("object ID is not set")

View file

@ -17,36 +17,38 @@ import (
type request struct { type request struct {
operation string operation string
resource *resource resource resource
properties map[string]string properties map[string]string
} }
var _ aperesource.Request = (*request)(nil) var defaultRequest = request{}
var _ aperesource.Request = request{}
type resource struct { type resource struct {
name string name string
properties map[string]string properties map[string]string
} }
var _ aperesource.Resource = (*resource)(nil) var _ aperesource.Resource = resource{}
func (r *resource) Name() string { func (r resource) Name() string {
return r.name return r.name
} }
func (r *resource) Property(key string) string { func (r resource) Property(key string) string {
return r.properties[key] return r.properties[key]
} }
func (r *request) Operation() string { func (r request) Operation() string {
return r.operation return r.operation
} }
func (r *request) Property(key string) string { func (r request) Property(key string) string {
return r.properties[key] return r.properties[key]
} }
func (r *request) Resource() aperesource.Resource { func (r request) Resource() aperesource.Resource {
return r.resource return r.resource
} }
@ -123,7 +125,7 @@ func objectProperties(cnr cid.ID, oid *oid.ID, cnrOwner user.ID, header *objectV
// newAPERequest creates an APE request to be passed to a chain router. It collects resource properties from // newAPERequest creates an APE request to be passed to a chain router. It collects resource properties from
// header provided by headerProvider. If it cannot be found in headerProvider, then properties are // header provided by headerProvider. If it cannot be found in headerProvider, then properties are
// initialized from header given in prm (if it is set). Otherwise, just CID and OID are set to properties. // initialized from header given in prm (if it is set). Otherwise, just CID and OID are set to properties.
func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (*request, error) { func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (request, error) {
switch prm.Method { switch prm.Method {
case nativeschema.MethodGetObject, case nativeschema.MethodGetObject,
nativeschema.MethodHeadObject, nativeschema.MethodHeadObject,
@ -131,26 +133,26 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (*request, err
nativeschema.MethodHashObject, nativeschema.MethodHashObject,
nativeschema.MethodDeleteObject: nativeschema.MethodDeleteObject:
if prm.Object == nil { if prm.Object == nil {
return nil, fmt.Errorf("method %s: %w", prm.Method, errMissingOID) return defaultRequest, fmt.Errorf("method %s: %w", prm.Method, errMissingOID)
} }
case nativeschema.MethodSearchObject, nativeschema.MethodPutObject: case nativeschema.MethodSearchObject, nativeschema.MethodPutObject:
default: default:
return nil, fmt.Errorf("unknown method: %s", prm.Method) return defaultRequest, fmt.Errorf("unknown method: %s", prm.Method)
} }
var header *objectV2.Header var header *objectV2.Header
if prm.Header != nil { if prm.Header != nil {
header = prm.Header header = prm.Header
} else if prm.Object != nil { } else if prm.Object != nil && !prm.WithoutHeaderRequest {
headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object) headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object)
if err == nil { if err == nil {
header = headerObjSDK.ToV2().GetHeader() header = headerObjSDK.ToV2().GetHeader()
} }
} }
return &request{ return request{
operation: prm.Method, operation: prm.Method,
resource: &resource{ resource: resource{
name: resourceName(prm.Container, prm.Object, prm.Namespace), name: resourceName(prm.Container, prm.Object, prm.Namespace),
properties: objectProperties(prm.Container, prm.Object, prm.ContainerOwner, header), properties: objectProperties(prm.Container, prm.Object, prm.ContainerOwner, header),
}, },

View file

@ -258,7 +258,7 @@ func TestNewAPERequest(t *testing.T) {
expectedRequest := request{ expectedRequest := request{
operation: method, operation: method,
resource: &resource{ resource: resource{
name: resourceName(cnr, obj, prm.Namespace), name: resourceName(cnr, obj, prm.Namespace),
properties: objectProperties(cnr, obj, testCnrOwner, func() *objectV2.Header { properties: objectProperties(cnr, obj, testCnrOwner, func() *objectV2.Header {
if headerObjSDK != nil { if headerObjSDK != nil {
@ -273,7 +273,7 @@ func TestNewAPERequest(t *testing.T) {
}, },
} }
require.Equal(t, expectedRequest, *r) require.Equal(t, expectedRequest, r)
}) })
} }
}) })

View file

@ -125,14 +125,15 @@ func (c *Service) Get(request *objectV2.GetRequest, stream objectSvc.GetObjectSt
} }
err = c.apeChecker.CheckAPE(stream.Context(), Prm{ err = c.apeChecker.CheckAPE(stream.Context(), Prm{
Namespace: reqCtx.Namespace, Namespace: reqCtx.Namespace,
Container: cnrID, Container: cnrID,
Object: objID, Object: objID,
Method: nativeschema.MethodGetObject, Method: nativeschema.MethodGetObject,
Role: nativeSchemaRole(reqCtx.Role), Role: nativeSchemaRole(reqCtx.Role),
SenderKey: hex.EncodeToString(reqCtx.SenderKey), SenderKey: hex.EncodeToString(reqCtx.SenderKey),
ContainerOwner: reqCtx.ContainerOwner, ContainerOwner: reqCtx.ContainerOwner,
SoftAPECheck: reqCtx.SoftAPECheck, SoftAPECheck: reqCtx.SoftAPECheck,
WithoutHeaderRequest: true,
}) })
if err != nil { if err != nil {
return toStatusErr(err) return toStatusErr(err)
@ -211,14 +212,15 @@ func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*obj
} }
err = c.apeChecker.CheckAPE(ctx, Prm{ err = c.apeChecker.CheckAPE(ctx, Prm{
Namespace: reqCtx.Namespace, Namespace: reqCtx.Namespace,
Container: cnrID, Container: cnrID,
Object: objID, Object: objID,
Method: nativeschema.MethodHeadObject, Method: nativeschema.MethodHeadObject,
Role: nativeSchemaRole(reqCtx.Role), Role: nativeSchemaRole(reqCtx.Role),
SenderKey: hex.EncodeToString(reqCtx.SenderKey), SenderKey: hex.EncodeToString(reqCtx.SenderKey),
ContainerOwner: reqCtx.ContainerOwner, ContainerOwner: reqCtx.ContainerOwner,
SoftAPECheck: reqCtx.SoftAPECheck, SoftAPECheck: reqCtx.SoftAPECheck,
WithoutHeaderRequest: true,
}) })
if err != nil { if err != nil {
return nil, toStatusErr(err) return nil, toStatusErr(err)