package writecachebbolt

import (
	"context"
	"errors"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
	storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"go.etcd.io/bbolt"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

var (
	// ErrBigObject is returned when object is too big to be placed in cache,
	// i.e. when the length of its raw data exceeds the cache's maxObjectSize.
	ErrBigObject = errors.New("too big object")
	// ErrOutOfSpace is returned when there is no space left to put a new object:
	// the size computed from the current cache size estimate exceeds maxCacheSize.
	ErrOutOfSpace = errors.New("no space left in the write cache")
)

// Put puts object to write-cache.
//
// Objects whose raw data length does not exceed smallObjectSize are stored via
// putSmall (write-cache database); all other accepted objects are stored via
// putBig (FSTree). The duration, outcome and storage type of every call are
// reported to the cache metrics.
//
// Returns ErrReadOnly if write-cache is in R/O mode.
// Returns ErrNotInitialized if write-cache has not been initialized yet.
// NOTE(review): the visible body of Put has no path that returns
// ErrNotInitialized — confirm whether this line is still accurate.
// Returns ErrOutOfSpace if saving an object leads to WC's size overflow.
// Returns ErrBigObject if an object exceeds maximum object size.
func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) { ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Put", trace.WithAttributes( attribute.String("address", prm.Address.EncodeToString()), attribute.Bool("dont_compress", prm.DontCompress), )) defer span.End() startedAt := time.Now() added := false storageType := writecache.StorageTypeUndefined defer func() { c.metrics.Put(time.Since(startedAt), added, storageType) }() c.modeMtx.RLock() defer c.modeMtx.RUnlock() if c.readOnly() { return common.PutRes{}, writecache.ErrReadOnly } sz := uint64(len(prm.RawData)) if sz > c.maxObjectSize { return common.PutRes{}, ErrBigObject } oi := objectInfo{ addr: prm.Address.EncodeToString(), obj: prm.Object, data: prm.RawData, } if sz <= c.smallObjectSize { storageType = writecache.StorageTypeDB err := c.putSmall(oi) if err == nil { added = true } return common.PutRes{}, err } storageType = writecache.StorageTypeFSTree err := c.putBig(ctx, oi.addr, prm) if err == nil { added = true } return common.PutRes{}, metaerr.Wrap(err) } // putSmall persists small objects to the write-cache database and // pushes the to the flush workers queue. func (c *cache) putSmall(obj objectInfo) error { cacheSize := c.estimateCacheSize() if c.maxCacheSize < c.incSizeDB(cacheSize) { return ErrOutOfSpace } err := c.db.Batch(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) return b.Put([]byte(obj.addr), obj.data) }) if err == nil { storagelog.Write(c.log, storagelog.AddressField(obj.addr), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("db PUT"), ) } return err } // putBig writes object to FSTree and pushes it to the flush workers queue. 
func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error { cacheSz := c.estimateCacheSize() if c.maxCacheSize < c.incSizeFS(cacheSz) { return ErrOutOfSpace } _, err := c.fsTree.Put(ctx, prm) if err != nil { return err } if compressor := c.blobstor.Compressor(); compressor != nil && compressor.NeedsCompression(prm.Object) { c.mtx.Lock() c.compressFlags[addr] = struct{}{} c.mtx.Unlock() } storagelog.Write(c.log, storagelog.AddressField(addr), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("fstree PUT"), ) return nil }