package writecache import ( "context" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) // Put puts object to write-cache. // // Returns ErrReadOnly if write-cache is in R/O mode. // Returns ErrNotInitialized if write-cache has not been initialized yet. // Returns ErrOutOfSpace if saving an object leads to WC's size overflow. // Returns ErrBigObject if an objects exceeds maximum object size. func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) { ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Put", trace.WithAttributes( attribute.String("address", prm.Address.EncodeToString()), attribute.Bool("dont_compress", prm.DontCompress), )) defer span.End() startedAt := time.Now() added := false storageType := StorageTypeUndefined defer func() { c.metrics.Put(time.Since(startedAt), added, storageType) }() if !c.modeMtx.TryRLock() { return common.PutRes{}, ErrNotInitialized } defer c.modeMtx.RUnlock() if c.readOnly() { return common.PutRes{}, ErrReadOnly } sz := uint64(len(prm.RawData)) if sz > c.maxObjectSize { return common.PutRes{}, ErrBigObject } oi := objectInfo{ addr: prm.Address.EncodeToString(), obj: prm.Object, data: prm.RawData, } if sz <= c.smallObjectSize { storageType = StorageTypeDB err := c.putSmall(oi) if err == nil { added = true } return common.PutRes{}, err } storageType = StorageTypeFSTree err := c.putBig(ctx, oi.addr, prm) if err == nil { added = true } return common.PutRes{}, metaerr.Wrap(err) } // putSmall persists small objects to the write-cache database and // pushes the to the flush workers queue. func (c *cache) putSmall(obj objectInfo) error { cacheSize := c.estimateCacheSize() if c.maxCacheSize < c.incSizeDB(cacheSize) { return ErrOutOfSpace } var newRecord bool err := c.db.Batch(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) key := []byte(obj.addr) newRecord = b.Get(key) == nil if newRecord { return b.Put(key, obj.data) } return nil }) if err == nil { storagelog.Write(c.log, storagelog.AddressField(obj.addr), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("db PUT"), ) if newRecord { c.objCounters.cDB.Add(1) c.estimateCacheSize() } } return err } // putBig writes object to FSTree and pushes it to the flush workers queue. func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error { cacheSz := c.estimateCacheSize() if c.maxCacheSize < c.incSizeFS(cacheSz) { return ErrOutOfSpace } _, err := c.fsTree.Put(ctx, prm) if err != nil { return err } if compressor := c.blobstor.Compressor(); compressor != nil && compressor.NeedsCompression(prm.Object) { c.mtx.Lock() c.compressFlags[addr] = struct{}{} c.mtx.Unlock() } storagelog.Write(c.log, storagelog.AddressField(addr), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("fstree PUT"), ) // counter changed by fstree c.estimateCacheSize() return nil }