Dmitrii Stepanov
c80b46fad3
All checks were successful
DCO action / DCO (pull_request) Successful in 1m59s
Vulncheck / Vulncheck (pull_request) Successful in 3m31s
Build / Build Components (1.20) (pull_request) Successful in 4m37s
Build / Build Components (1.21) (pull_request) Successful in 4m33s
Tests and linters / Tests (1.20) (pull_request) Successful in 4m54s
Tests and linters / Staticcheck (pull_request) Successful in 4m49s
Tests and linters / Tests with -race (pull_request) Successful in 5m9s
Tests and linters / Lint (pull_request) Successful in 6m4s
Tests and linters / Tests (1.21) (pull_request) Successful in 6m9s
Now it is possible to enable compressibility estimation. If the data is likely incompressible, this should reduce CPU time and memory usage. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
155 lines
3.9 KiB
Go
155 lines
3.9 KiB
Go
package blobstor
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// SubStorage represents single storage component with some storage policy.
|
|
type SubStorage struct {
|
|
Storage common.Storage
|
|
Policy func(*objectSDK.Object, []byte) bool
|
|
}
|
|
|
|
// BlobStor represents FrostFS local BLOB storage.
|
|
type BlobStor struct {
|
|
cfg
|
|
|
|
modeMtx sync.RWMutex
|
|
mode mode.Mode
|
|
}
|
|
|
|
// Info contains information about blobstor.
|
|
type Info struct {
|
|
SubStorages []SubStorageInfo
|
|
}
|
|
|
|
// SubStorageInfo contains information about a blobstor storage component.
type SubStorageInfo struct {
	// Type is the type of the storage component.
	Type string
	// Path is the path of the storage component.
	Path string
}
|
|
|
|
// Option represents BlobStor's constructor option.
|
|
type Option func(*cfg)
|
|
|
|
type cfg struct {
|
|
compression compression.Config
|
|
log *logger.Logger
|
|
storage []SubStorage
|
|
metrics Metrics
|
|
}
|
|
|
|
func initConfig(c *cfg) {
|
|
c.log = &logger.Logger{Logger: zap.L()}
|
|
c.metrics = &noopMetrics{}
|
|
}
|
|
|
|
// New creates, initializes and returns new BlobStor instance.
|
|
func New(opts ...Option) *BlobStor {
|
|
bs := new(BlobStor)
|
|
initConfig(&bs.cfg)
|
|
|
|
for i := range opts {
|
|
opts[i](&bs.cfg)
|
|
}
|
|
|
|
for i := range bs.storage {
|
|
bs.storage[i].Storage.SetCompressor(&bs.compression)
|
|
}
|
|
|
|
return bs
|
|
}
|
|
|
|
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
|
|
func (b *BlobStor) SetLogger(l *logger.Logger) {
|
|
b.log = l
|
|
}
|
|
|
|
func (b *BlobStor) SetParentID(parentID string) {
|
|
b.metrics.SetParentID(parentID)
|
|
for _, ss := range b.storage {
|
|
ss.Storage.SetParentID(parentID)
|
|
}
|
|
}
|
|
|
|
// WithStorages provides sub-blobstors.
|
|
func WithStorages(st []SubStorage) Option {
|
|
return func(c *cfg) {
|
|
c.storage = st
|
|
}
|
|
}
|
|
|
|
// WithLogger returns option to specify BlobStor's logger.
|
|
func WithLogger(l *logger.Logger) Option {
|
|
return func(c *cfg) {
|
|
c.log = &logger.Logger{Logger: l.With(zap.String("component", "BlobStor"))}
|
|
}
|
|
}
|
|
|
|
// WithCompressObjects returns option to toggle
|
|
// compression of the stored objects.
|
|
//
|
|
// If true, Zstandard algorithm is used for data compression.
|
|
//
|
|
// If compressor (decompressor) creation failed,
|
|
// the uncompressed option will be used, and the error
|
|
// is recorded in the provided log.
|
|
func WithCompressObjects(comp bool) Option {
|
|
return func(c *cfg) {
|
|
c.compression.Enabled = comp
|
|
}
|
|
}
|
|
|
|
// WithCompressibilityEstimate returns an option to use
|
|
// normilized compressibility estimate to decide compress
|
|
// data or not.
|
|
//
|
|
// See https://github.com/klauspost/compress/blob/v1.17.2/compressible.go#L5
|
|
func WithCompressibilityEstimate(v bool) Option {
|
|
return func(c *cfg) {
|
|
c.compression.UseCompressEstimation = v
|
|
}
|
|
}
|
|
|
|
// WithCompressibilityEstimateThreshold returns an option to set
|
|
// normilized compressibility estimate threshold.
|
|
//
|
|
// See https://github.com/klauspost/compress/blob/v1.17.2/compressible.go#L5
|
|
func WithCompressibilityEstimateThreshold(threshold float64) Option {
|
|
return func(c *cfg) {
|
|
c.compression.CompressEstimationThreshold = threshold
|
|
}
|
|
}
|
|
|
|
// WithUncompressableContentTypes returns option to disable decompression
|
|
// for specific content types as seen by object.AttributeContentType attribute.
|
|
func WithUncompressableContentTypes(values []string) Option {
|
|
return func(c *cfg) {
|
|
c.compression.UncompressableContentTypes = values
|
|
}
|
|
}
|
|
|
|
// SetReportErrorFunc allows to provide a function to be called on disk errors.
|
|
// This function MUST be called before Open.
|
|
func (b *BlobStor) SetReportErrorFunc(f func(string, error)) {
|
|
for i := range b.storage {
|
|
b.storage[i].Storage.SetReportErrorFunc(f)
|
|
}
|
|
}
|
|
|
|
func WithMetrics(m Metrics) Option {
|
|
return func(c *cfg) {
|
|
c.metrics = m
|
|
}
|
|
}
|
|
|
|
func (b *BlobStor) Compressor() *compression.Config {
|
|
return &b.cfg.compression
|
|
}
|