package shard

import (
	"context"
	"errors"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

var (
	// dummyCancel is the placeholder kept in Shard.writecacheSealCancel while
	// no seal is running: SealWriteCache swaps it out when it starts and puts
	// it back when it finishes.
	dummyCancel = &writecacheSealCanceler{cancel: func() {}}
	// notInitializedCancel is a second no-op placeholder.
	// NOTE(review): it is not referenced in this part of the file; presumably
	// it marks a shard that is not initialized yet — confirm against its users.
	notInitializedCancel = &writecacheSealCanceler{cancel: func() {}}
	// errWriteCacheSealing is returned by SealWriteCache when the seal slot
	// does not hold dummyCancel, i.e. the seal could not be started.
	errWriteCacheSealing = errors.New("writecache is already sealing or shard is not initialized")
)

// writecacheSealCanceler wraps the cancel function of a seal operation so
// that it can be stored in and compared through Shard.writecacheSealCancel.
type writecacheSealCanceler struct {
	cancel context.CancelFunc
}

// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
type FlushWriteCachePrm struct {
	ignoreErrors bool
	seal         bool
}

// SetIgnoreErrors sets the flag to ignore read-errors during flush.
func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
	p.ignoreErrors = ignore
}

// SetSeal sets the flag to leave the write-cache in read-only mode after flush.
func (p *FlushWriteCachePrm) SetSeal(v bool) {
	p.seal = v
}

// errWriteCacheDisabled is returned when an operation on write-cache is performed,
// but write-cache is disabled.
var errWriteCacheDisabled = errors.New("write-cache is disabled")

// FlushWriteCache flushes all data from the write-cache.
//
// It returns errWriteCacheDisabled if the shard has no write-cache,
// ErrReadOnlyMode if the shard is in a read-only mode and ErrDegradedMode if
// the shard works without the metabase. Otherwise the result of the
// write-cache Flush call is returned.
func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.FlushWriteCache",
		trace.WithAttributes(
			attribute.String("shard_id", s.ID().String()),
			attribute.Bool("ignore_errors", p.ignoreErrors),
			attribute.Bool("seal", p.seal),
		))
	defer span.End()

	if !s.hasWriteCache() {
		return errWriteCacheDisabled
	}

	// The read lock is held for the whole flush.
	s.m.RLock()
	defer s.m.RUnlock()

	// To write data to the blobstor we need to write to the blobstor and the metabase.
	if s.info.Mode.ReadOnly() {
		return ErrReadOnlyMode
	}
	if s.info.Mode.NoMetabase() {
		return ErrDegradedMode
	}

	return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
}

// SealWriteCachePrm represents parameters of a `SealWriteCache` operation.
type SealWriteCachePrm struct {
	// IgnoreErrors is forwarded to writecache.SealPrm.
	IgnoreErrors bool
	// Async makes SealWriteCache run the seal in a background goroutine that
	// is detached from the caller's context cancellation.
	Async bool
	// RestoreMode is forwarded to writecache.SealPrm.
	RestoreMode bool
	// Shrink is forwarded to writecache.SealPrm.
	Shrink bool
}

// SealWriteCache flushes all data from the write-cache and moves it to degraded read only mode.
//
// Only one seal may run at a time: the operation claims the shard's seal slot
// and returns errWriteCacheSealing if the slot is not free. It returns
// errWriteCacheDisabled if the shard has no write-cache, ErrReadOnlyMode if
// the shard is in a read-only mode and ErrDegradedMode if the shard works
// without the metabase.
//
// With p.Async set the seal runs in a background goroutine and the call
// returns once that goroutine has started (or the seal context is cancelled
// first); the outcome of the seal is only logged. Otherwise the call blocks
// and returns the result of the write-cache Seal call.
func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.SealWriteCache",
		trace.WithAttributes(
			attribute.String("shard_id", s.ID().String()),
			attribute.Bool("ignore_errors", p.IgnoreErrors),
			attribute.Bool("restore_mode", p.RestoreMode),
		))
	defer span.End()

	if !s.hasWriteCache() {
		return errWriteCacheDisabled
	}

	if p.Async {
		// A background seal must outlive the request that started it.
		ctx = context.WithoutCancel(ctx)
	}
	ctx, cancel := context.WithCancel(ctx)
	canceler := &writecacheSealCanceler{cancel: cancel}
	if !s.writecacheSealCancel.CompareAndSwap(dummyCancel, canceler) {
		// The slot was not claimed, so nobody else can ever call cancel:
		// release the derived context here.
		cancel()
		return errWriteCacheSealing
	}

	s.m.RLock()
	// cleanup releases the read lock and frees the seal slot.
	cleanup := func() {
		s.m.RUnlock()
		s.writecacheSealCancel.Store(dummyCancel)
	}

	if s.info.Mode.ReadOnly() {
		cleanup()
		cancel()
		return ErrReadOnlyMode
	}
	if s.info.Mode.NoMetabase() {
		cleanup()
		cancel()
		return ErrDegradedMode
	}

	prm := writecache.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink}

	if !p.Async {
		// Deferred calls run in reverse order: the slot is freed first, then
		// the derived context is released.
		defer cancel()
		defer cleanup()
		return s.writeCache.Seal(ctx, prm)
	}

	// Async path. ctx was detached from the caller by context.WithoutCancel
	// above. It is intentionally not cancelled when the seal completes:
	// doing so could race with the select below and make it report a
	// spurious ctx.Err() for a seal that actually ran.
	started := make(chan struct{})
	go func() {
		close(started)
		defer cleanup()

		s.log.Info(logs.StartedWritecacheSealAsync)
		if err := s.writeCache.Seal(ctx, prm); err != nil {
			s.log.Warn(logs.FailedToSealWritecacheAsync, zap.Error(err))
			return
		}
		s.log.Info(logs.WritecacheSealCompletedAsync)
	}()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-started:
		return nil
	}
}