package engine import ( "context" "sync" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" ) // FlushWriteCachePrm groups the parameters of FlushWriteCache operation. type FlushWriteCachePrm struct { shardID *shard.ID ignoreErrors bool seal bool } // SetShardID is an option to set shard ID. // // Option is required. func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) { p.shardID = id } // SetIgnoreErrors sets errors ignore flag. func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) { p.ignoreErrors = ignore } // SetSeal sets seal flag. func (p *FlushWriteCachePrm) SetSeal(v bool) { p.seal = v } // FlushWriteCacheRes groups the resulting values of FlushWriteCache operation. type FlushWriteCacheRes struct{} // FlushWriteCache flushes write-cache on a single shard. func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) (FlushWriteCacheRes, error) { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.FlushWriteCache", trace.WithAttributes( attribute.String("shard_id", p.shardID.String()), attribute.Bool("ignore_errors", p.ignoreErrors), attribute.Bool("seal", p.seal), )) defer span.End() e.mtx.RLock() sh, ok := e.shards[p.shardID.String()] e.mtx.RUnlock() if !ok { return FlushWriteCacheRes{}, errShardNotFound } var prm shard.FlushWriteCachePrm prm.SetIgnoreErrors(p.ignoreErrors) prm.SetSeal(p.seal) return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm) } type SealWriteCachePrm struct { ShardIDs []*shard.ID IgnoreErrors bool } type ShardSealResult struct { ShardID *shard.ID Success bool ErrorMsg string } type SealWriteCacheRes struct { ShardResults []ShardSealResult } // SealWriteCache flushed all data to blobstore and moves write-cache to degraded read only mode. func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePrm) (SealWriteCacheRes, error) { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.SealWriteCache", trace.WithAttributes( attribute.Int("shard_id_count", len(prm.ShardIDs)), attribute.Bool("ignore_errors", prm.IgnoreErrors), )) defer span.End() res := SealWriteCacheRes{ ShardResults: make([]ShardSealResult, 0, len(prm.ShardIDs)), } resGuard := &sync.Mutex{} eg, egCtx := errgroup.WithContext(ctx) for _, shardID := range prm.ShardIDs { shardID := shardID eg.Go(func() error { e.mtx.RLock() sh, ok := e.shards[shardID.String()] e.mtx.RUnlock() if !ok { resGuard.Lock() defer resGuard.Unlock() res.ShardResults = append(res.ShardResults, ShardSealResult{ ShardID: shardID, ErrorMsg: errShardNotFound.Error(), }) return nil } err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors}) resGuard.Lock() defer resGuard.Unlock() if err != nil { res.ShardResults = append(res.ShardResults, ShardSealResult{ ShardID: shardID, ErrorMsg: err.Error(), }) } else { res.ShardResults = append(res.ShardResults, ShardSealResult{ ShardID: shardID, Success: true, }) } return nil }) } if err := eg.Wait(); err != nil { return SealWriteCacheRes{}, err } return res, nil } type writeCacheMetrics struct { shardID string metrics metrics.WriteCacheMetrics path string } func (m *writeCacheMetrics) SetPath(path string) { m.path = path } func (m *writeCacheMetrics) SetShardID(id string) { m.shardID = id } func (m *writeCacheMetrics) Get(d time.Duration, success bool, st writecache.StorageType) { m.metrics.AddMethodDuration(m.shardID, m.path, st.String(), "Get", success, d) } func (m *writeCacheMetrics) Delete(d time.Duration, success bool, st writecache.StorageType) { m.metrics.AddMethodDuration(m.shardID, m.path, st.String(), "Delete", success, d) } func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.StorageType) { m.metrics.AddMethodDuration(m.shardID, m.path, st.String(), "Put", success, d) } func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) { m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeDB.String(), db) m.metrics.SetEstimateSize(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree) } func (m *writeCacheMetrics) SetMode(mode mode.Mode) { m.metrics.SetMode(m.shardID, mode.String()) } func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) { m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeDB.String(), db) m.metrics.SetActualCount(m.shardID, m.path, writecache.StorageTypeFSTree.String(), fstree) } func (m *writeCacheMetrics) Flush(success bool, st writecache.StorageType) { m.metrics.IncOperationCounter(m.shardID, m.path, st.String(), "Flush", metrics.NullBool{Bool: success, Valid: true}) } func (m *writeCacheMetrics) Evict(st writecache.StorageType) { m.metrics.IncOperationCounter(m.shardID, m.path, st.String(), "Evict", metrics.NullBool{}) } func (m *writeCacheMetrics) Close() { m.metrics.Close(m.shardID, m.path) }