Performance improvements #1080

Merged
dstepanov-yadro merged 4 commits from dstepanov-yadro/frostfs-node:fix/improve_ape_perf into master 2024-09-04 19:51:07 +00:00
6 changed files with 60 additions and 36 deletions

View file

@ -232,14 +232,19 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er
}
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
b := tx.Bucket(shardInfoBucket)
if b == nil {
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
}
if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil {
return fmt.Errorf("could not increase phy object counter: %w", err)
}
if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil {
if err := db.updateShardObjectCounterBucket(b, logical, 1, true); err != nil {
return fmt.Errorf("could not increase logical object counter: %w", err)
}
if isUserObject {
if err := db.updateShardObjectCounter(tx, user, 1, true); err != nil {
if err := db.updateShardObjectCounterBucket(b, user, 1, true); err != nil {
return fmt.Errorf("could not increase user object counter: %w", err)
}
}
@ -252,6 +257,10 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6
return nil
}
return db.updateShardObjectCounterBucket(b, typ, delta, inc)
}
func (*DB) updateShardObjectCounterBucket(b *bbolt.Bucket, typ objectType, delta uint64, inc bool) error {
var counter uint64
var counterKey []byte

View file

@ -36,6 +36,14 @@ func (r StorageIDRes) StorageID() []byte {
// StorageID returns storage descriptor for objects from the blobstor.
// It is put together with the object can makes get/delete operation faster.
func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes, err error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("StorageID", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.StorageID",
trace.WithAttributes(
attribute.String("address", prm.addr.EncodeToString()),
@ -54,7 +62,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
return err
})
success = err == nil
return res, metaerr.Wrap(err)
}

View file

@ -54,6 +54,9 @@ type Prm struct {
// If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow.
SoftAPECheck bool
// If true, object headers will not retrieved from storage engine.
WithoutHeaderRequest bool
}
var errMissingOID = errors.New("object ID is not set")

View file

@ -17,36 +17,38 @@ import (
type request struct {
operation string
resource *resource
resource resource
properties map[string]string
}
var _ aperesource.Request = (*request)(nil)
var defaultRequest = request{}
var _ aperesource.Request = request{}
type resource struct {
name string
properties map[string]string
}
var _ aperesource.Resource = (*resource)(nil)
var _ aperesource.Resource = resource{}
func (r *resource) Name() string {
func (r resource) Name() string {
return r.name
}
func (r *resource) Property(key string) string {
func (r resource) Property(key string) string {
return r.properties[key]
}
func (r *request) Operation() string {
func (r request) Operation() string {
return r.operation
}
func (r *request) Property(key string) string {
func (r request) Property(key string) string {
return r.properties[key]
}
func (r *request) Resource() aperesource.Resource {
func (r request) Resource() aperesource.Resource {
return r.resource
}
@ -123,7 +125,7 @@ func objectProperties(cnr cid.ID, oid *oid.ID, cnrOwner user.ID, header *objectV
// newAPERequest creates an APE request to be passed to a chain router. It collects resource properties from
// header provided by headerProvider. If it cannot be found in headerProvider, then properties are
// initialized from header given in prm (if it is set). Otherwise, just CID and OID are set to properties.
func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (*request, error) {
func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (request, error) {
switch prm.Method {
case nativeschema.MethodGetObject,
nativeschema.MethodHeadObject,
@ -131,26 +133,26 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (*request, err
nativeschema.MethodHashObject,
nativeschema.MethodDeleteObject:
if prm.Object == nil {
return nil, fmt.Errorf("method %s: %w", prm.Method, errMissingOID)
return defaultRequest, fmt.Errorf("method %s: %w", prm.Method, errMissingOID)
}
case nativeschema.MethodSearchObject, nativeschema.MethodPutObject:
default:
return nil, fmt.Errorf("unknown method: %s", prm.Method)
return defaultRequest, fmt.Errorf("unknown method: %s", prm.Method)
}
var header *objectV2.Header
if prm.Header != nil {
header = prm.Header
} else if prm.Object != nil {
} else if prm.Object != nil && !prm.WithoutHeaderRequest {
headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object)
if err == nil {
header = headerObjSDK.ToV2().GetHeader()
}
}
return &request{
return request{
operation: prm.Method,
resource: &resource{
resource: resource{
name: resourceName(prm.Container, prm.Object, prm.Namespace),
properties: objectProperties(prm.Container, prm.Object, prm.ContainerOwner, header),
},

View file

@ -258,7 +258,7 @@ func TestNewAPERequest(t *testing.T) {
expectedRequest := request{
operation: method,
resource: &resource{
resource: resource{
name: resourceName(cnr, obj, prm.Namespace),
properties: objectProperties(cnr, obj, testCnrOwner, func() *objectV2.Header {
if headerObjSDK != nil {
@ -273,7 +273,7 @@ func TestNewAPERequest(t *testing.T) {
},
}
require.Equal(t, expectedRequest, *r)
require.Equal(t, expectedRequest, r)
})
}
})

View file

@ -125,14 +125,15 @@ func (c *Service) Get(request *objectV2.GetRequest, stream objectSvc.GetObjectSt
}
err = c.apeChecker.CheckAPE(stream.Context(), Prm{
Namespace: reqCtx.Namespace,
Container: cnrID,
Object: objID,
Method: nativeschema.MethodGetObject,
Role: nativeSchemaRole(reqCtx.Role),
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
ContainerOwner: reqCtx.ContainerOwner,
SoftAPECheck: reqCtx.SoftAPECheck,
Namespace: reqCtx.Namespace,
Container: cnrID,
Object: objID,
Method: nativeschema.MethodGetObject,
Role: nativeSchemaRole(reqCtx.Role),
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
ContainerOwner: reqCtx.ContainerOwner,
SoftAPECheck: reqCtx.SoftAPECheck,
WithoutHeaderRequest: true,
})
if err != nil {
return toStatusErr(err)
@ -211,14 +212,15 @@ func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*obj
}
err = c.apeChecker.CheckAPE(ctx, Prm{
Namespace: reqCtx.Namespace,
Container: cnrID,
Object: objID,
Method: nativeschema.MethodHeadObject,
Role: nativeSchemaRole(reqCtx.Role),
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
ContainerOwner: reqCtx.ContainerOwner,
SoftAPECheck: reqCtx.SoftAPECheck,
Namespace: reqCtx.Namespace,
Container: cnrID,
Object: objID,
Method: nativeschema.MethodHeadObject,
Role: nativeSchemaRole(reqCtx.Role),
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
ContainerOwner: reqCtx.ContainerOwner,
SoftAPECheck: reqCtx.SoftAPECheck,
WithoutHeaderRequest: true,
})
if err != nil {
return nil, toStatusErr(err)