Performance improvements #1080
6 changed files with 60 additions and 36 deletions
|
@ -232,14 +232,19 @@ func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, er
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
||||||
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
|
b := tx.Bucket(shardInfoBucket)
|
||||||
|
if b == nil {
|
||||||
|
return db.incContainerObjectCounter(tx, cnrID, isUserObject)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.updateShardObjectCounterBucket(b, phy, 1, true); err != nil {
|
||||||
return fmt.Errorf("could not increase phy object counter: %w", err)
|
return fmt.Errorf("could not increase phy object counter: %w", err)
|
||||||
}
|
}
|
||||||
if err := db.updateShardObjectCounter(tx, logical, 1, true); err != nil {
|
if err := db.updateShardObjectCounterBucket(b, logical, 1, true); err != nil {
|
||||||
return fmt.Errorf("could not increase logical object counter: %w", err)
|
return fmt.Errorf("could not increase logical object counter: %w", err)
|
||||||
}
|
}
|
||||||
if isUserObject {
|
if isUserObject {
|
||||||
if err := db.updateShardObjectCounter(tx, user, 1, true); err != nil {
|
if err := db.updateShardObjectCounterBucket(b, user, 1, true); err != nil {
|
||||||
return fmt.Errorf("could not increase user object counter: %w", err)
|
return fmt.Errorf("could not increase user object counter: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -252,6 +257,10 @@ func (db *DB) updateShardObjectCounter(tx *bbolt.Tx, typ objectType, delta uint6
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return db.updateShardObjectCounterBucket(b, typ, delta, inc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*DB) updateShardObjectCounterBucket(b *bbolt.Bucket, typ objectType, delta uint64, inc bool) error {
|
||||||
var counter uint64
|
var counter uint64
|
||||||
var counterKey []byte
|
var counterKey []byte
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,14 @@ func (r StorageIDRes) StorageID() []byte {
|
||||||
// StorageID returns storage descriptor for objects from the blobstor.
|
// StorageID returns storage descriptor for objects from the blobstor.
|
||||||
// It is put together with the object can makes get/delete operation faster.
|
// It is put together with the object can makes get/delete operation faster.
|
||||||
func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes, err error) {
|
func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes, err error) {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
db.metrics.AddMethodDuration("StorageID", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.StorageID",
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.StorageID",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("address", prm.addr.EncodeToString()),
|
attribute.String("address", prm.addr.EncodeToString()),
|
||||||
|
@ -54,7 +62,7 @@ func (db *DB) StorageID(ctx context.Context, prm StorageIDPrm) (res StorageIDRes
|
||||||
|
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
success = err == nil
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -54,6 +54,9 @@ type Prm struct {
|
||||||
|
|
||||||
// If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow.
|
// If SoftAPECheck is set to true, then NoRuleFound is interpreted as allow.
|
||||||
SoftAPECheck bool
|
SoftAPECheck bool
|
||||||
|
|
||||||
|
// If true, object headers will not retrieved from storage engine.
|
||||||
|
WithoutHeaderRequest bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var errMissingOID = errors.New("object ID is not set")
|
var errMissingOID = errors.New("object ID is not set")
|
||||||
|
|
|
@ -17,36 +17,38 @@ import (
|
||||||
|
|
||||||
type request struct {
|
type request struct {
|
||||||
operation string
|
operation string
|
||||||
resource *resource
|
resource resource
|
||||||
properties map[string]string
|
properties map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ aperesource.Request = (*request)(nil)
|
var defaultRequest = request{}
|
||||||
|
|
||||||
|
var _ aperesource.Request = request{}
|
||||||
|
|
||||||
type resource struct {
|
type resource struct {
|
||||||
name string
|
name string
|
||||||
properties map[string]string
|
properties map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ aperesource.Resource = (*resource)(nil)
|
var _ aperesource.Resource = resource{}
|
||||||
|
|
||||||
func (r *resource) Name() string {
|
func (r resource) Name() string {
|
||||||
return r.name
|
return r.name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *resource) Property(key string) string {
|
func (r resource) Property(key string) string {
|
||||||
return r.properties[key]
|
return r.properties[key]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *request) Operation() string {
|
func (r request) Operation() string {
|
||||||
return r.operation
|
return r.operation
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *request) Property(key string) string {
|
func (r request) Property(key string) string {
|
||||||
return r.properties[key]
|
return r.properties[key]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *request) Resource() aperesource.Resource {
|
func (r request) Resource() aperesource.Resource {
|
||||||
return r.resource
|
return r.resource
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,7 +125,7 @@ func objectProperties(cnr cid.ID, oid *oid.ID, cnrOwner user.ID, header *objectV
|
||||||
// newAPERequest creates an APE request to be passed to a chain router. It collects resource properties from
|
// newAPERequest creates an APE request to be passed to a chain router. It collects resource properties from
|
||||||
// header provided by headerProvider. If it cannot be found in headerProvider, then properties are
|
// header provided by headerProvider. If it cannot be found in headerProvider, then properties are
|
||||||
// initialized from header given in prm (if it is set). Otherwise, just CID and OID are set to properties.
|
// initialized from header given in prm (if it is set). Otherwise, just CID and OID are set to properties.
|
||||||
func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (*request, error) {
|
func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (request, error) {
|
||||||
switch prm.Method {
|
switch prm.Method {
|
||||||
case nativeschema.MethodGetObject,
|
case nativeschema.MethodGetObject,
|
||||||
nativeschema.MethodHeadObject,
|
nativeschema.MethodHeadObject,
|
||||||
|
@ -131,26 +133,26 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (*request, err
|
||||||
nativeschema.MethodHashObject,
|
nativeschema.MethodHashObject,
|
||||||
nativeschema.MethodDeleteObject:
|
nativeschema.MethodDeleteObject:
|
||||||
if prm.Object == nil {
|
if prm.Object == nil {
|
||||||
return nil, fmt.Errorf("method %s: %w", prm.Method, errMissingOID)
|
return defaultRequest, fmt.Errorf("method %s: %w", prm.Method, errMissingOID)
|
||||||
}
|
}
|
||||||
case nativeschema.MethodSearchObject, nativeschema.MethodPutObject:
|
case nativeschema.MethodSearchObject, nativeschema.MethodPutObject:
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unknown method: %s", prm.Method)
|
return defaultRequest, fmt.Errorf("unknown method: %s", prm.Method)
|
||||||
}
|
}
|
||||||
|
|
||||||
var header *objectV2.Header
|
var header *objectV2.Header
|
||||||
if prm.Header != nil {
|
if prm.Header != nil {
|
||||||
header = prm.Header
|
header = prm.Header
|
||||||
} else if prm.Object != nil {
|
} else if prm.Object != nil && !prm.WithoutHeaderRequest {
|
||||||
headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object)
|
headerObjSDK, err := c.headerProvider.GetHeader(ctx, prm.Container, *prm.Object)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
header = headerObjSDK.ToV2().GetHeader()
|
header = headerObjSDK.ToV2().GetHeader()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &request{
|
return request{
|
||||||
operation: prm.Method,
|
operation: prm.Method,
|
||||||
resource: &resource{
|
resource: resource{
|
||||||
name: resourceName(prm.Container, prm.Object, prm.Namespace),
|
name: resourceName(prm.Container, prm.Object, prm.Namespace),
|
||||||
properties: objectProperties(prm.Container, prm.Object, prm.ContainerOwner, header),
|
properties: objectProperties(prm.Container, prm.Object, prm.ContainerOwner, header),
|
||||||
},
|
},
|
||||||
|
|
|
@ -258,7 +258,7 @@ func TestNewAPERequest(t *testing.T) {
|
||||||
|
|
||||||
expectedRequest := request{
|
expectedRequest := request{
|
||||||
operation: method,
|
operation: method,
|
||||||
resource: &resource{
|
resource: resource{
|
||||||
name: resourceName(cnr, obj, prm.Namespace),
|
name: resourceName(cnr, obj, prm.Namespace),
|
||||||
properties: objectProperties(cnr, obj, testCnrOwner, func() *objectV2.Header {
|
properties: objectProperties(cnr, obj, testCnrOwner, func() *objectV2.Header {
|
||||||
if headerObjSDK != nil {
|
if headerObjSDK != nil {
|
||||||
|
@ -273,7 +273,7 @@ func TestNewAPERequest(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, expectedRequest, *r)
|
require.Equal(t, expectedRequest, r)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
@ -125,14 +125,15 @@ func (c *Service) Get(request *objectV2.GetRequest, stream objectSvc.GetObjectSt
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.apeChecker.CheckAPE(stream.Context(), Prm{
|
err = c.apeChecker.CheckAPE(stream.Context(), Prm{
|
||||||
Namespace: reqCtx.Namespace,
|
Namespace: reqCtx.Namespace,
|
||||||
Container: cnrID,
|
Container: cnrID,
|
||||||
Object: objID,
|
Object: objID,
|
||||||
Method: nativeschema.MethodGetObject,
|
Method: nativeschema.MethodGetObject,
|
||||||
Role: nativeSchemaRole(reqCtx.Role),
|
Role: nativeSchemaRole(reqCtx.Role),
|
||||||
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
||||||
ContainerOwner: reqCtx.ContainerOwner,
|
ContainerOwner: reqCtx.ContainerOwner,
|
||||||
SoftAPECheck: reqCtx.SoftAPECheck,
|
SoftAPECheck: reqCtx.SoftAPECheck,
|
||||||
|
WithoutHeaderRequest: true,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return toStatusErr(err)
|
return toStatusErr(err)
|
||||||
|
@ -211,14 +212,15 @@ func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*obj
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.apeChecker.CheckAPE(ctx, Prm{
|
err = c.apeChecker.CheckAPE(ctx, Prm{
|
||||||
Namespace: reqCtx.Namespace,
|
Namespace: reqCtx.Namespace,
|
||||||
Container: cnrID,
|
Container: cnrID,
|
||||||
Object: objID,
|
Object: objID,
|
||||||
Method: nativeschema.MethodHeadObject,
|
Method: nativeschema.MethodHeadObject,
|
||||||
Role: nativeSchemaRole(reqCtx.Role),
|
Role: nativeSchemaRole(reqCtx.Role),
|
||||||
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
||||||
ContainerOwner: reqCtx.ContainerOwner,
|
ContainerOwner: reqCtx.ContainerOwner,
|
||||||
SoftAPECheck: reqCtx.SoftAPECheck,
|
SoftAPECheck: reqCtx.SoftAPECheck,
|
||||||
|
WithoutHeaderRequest: true,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, toStatusErr(err)
|
return nil, toStatusErr(err)
|
||||||
|
|
Loading…
Reference in a new issue