From 93d63e1632f54869ca1c155cdd11d2c9966c3090 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 31 Jul 2024 16:30:07 +0300 Subject: [PATCH] [#1284] writecache: Allow to seal writecache async Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-cli/modules/control/writecache.go | 4 ++ internal/logs/logs.go | 3 + pkg/local_object_storage/engine/writecache.go | 3 +- pkg/local_object_storage/shard/control.go | 6 ++ pkg/local_object_storage/shard/shard.go | 6 +- pkg/local_object_storage/shard/writecache.go | 54 +++++++++++++++++- .../control/server/seal_writecache.go | 1 + pkg/services/control/service.pb.go | Bin 271229 -> 271609 bytes pkg/services/control/service.proto | 3 + pkg/services/control/service_frostfs.pb.go | Bin 124245 -> 124336 bytes 10 files changed, 75 insertions(+), 5 deletions(-) diff --git a/cmd/frostfs-cli/modules/control/writecache.go b/cmd/frostfs-cli/modules/control/writecache.go index b725d8471..ffe9009ab 100644 --- a/cmd/frostfs-cli/modules/control/writecache.go +++ b/cmd/frostfs-cli/modules/control/writecache.go @@ -10,6 +10,7 @@ import ( ) const ( + asyncFlag = "async" restoreModeFlag = "restore-mode" shrinkFlag = "shrink" ) @@ -31,12 +32,14 @@ func sealWritecache(cmd *cobra.Command, _ []string) { pk := key.Get(cmd) ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) + async, _ := cmd.Flags().GetBool(asyncFlag) restoreMode, _ := cmd.Flags().GetBool(restoreModeFlag) shrink, _ := cmd.Flags().GetBool(shrinkFlag) req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{ Shard_ID: getShardIDList(cmd), IgnoreErrors: ignoreErrors, + Async: async, RestoreMode: restoreMode, Shrink: shrink, }} @@ -77,6 +80,7 @@ func initControlShardsWritecacheCmd() { ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") ff.Bool(shardAllFlag, false, "Process all shards") ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects") + ff.Bool(asyncFlag, false, "Run operation in background") ff.Bool(restoreModeFlag, false, "Restore writecache's mode after sealing") ff.Bool(shrinkFlag, false, "Shrink writecache's internal storage") diff --git a/internal/logs/logs.go b/internal/logs/logs.go index ebb822e1c..78bcd0c0e 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -539,5 +539,8 @@ const ( PolicerCouldNotGetChunk = "could not get EC chunk" PolicerCouldNotGetChunks = "could not get EC chunks" AuditEventLogRecord = "audit event log record" + StartedWritecacheSealAsync = "started writecache seal async" + WritecacheSealCompletedAsync = "writecache seal completed successfully" + FailedToSealWritecacheAsync = "failed to seal writecache async" WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty" ) diff --git a/pkg/local_object_storage/engine/writecache.go b/pkg/local_object_storage/engine/writecache.go index 2c5e8cc3a..3e8f387ef 100644 --- a/pkg/local_object_storage/engine/writecache.go +++ b/pkg/local_object_storage/engine/writecache.go @@ -70,6 +70,7 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr type SealWriteCachePrm struct { ShardIDs []*shard.ID IgnoreErrors bool + Async bool RestoreMode bool Shrink bool } @@ -117,7 +118,7 @@ func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePr return nil } - err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors, RestoreMode: prm.RestoreMode, Shrink: prm.Shrink}) + err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors, Async: prm.Async, RestoreMode: prm.RestoreMode, Shrink: prm.Shrink}) resGuard.Lock() defer resGuard.Unlock() diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 90d7afdd4..210744702 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -178,6 +178,7 @@ func (s *Shard) Init(ctx context.Context) error { if !m.NoMetabase() { s.rb.Start(ctx, s.blobStor, s.metaBase, s.log) } + s.writecacheSealCancel.Store(dummyCancel) return nil } @@ -350,6 +351,8 @@ func (s *Shard) Close() error { } if s.hasWriteCache() { + prev := s.writecacheSealCancel.Swap(notInitializedCancel) + prev.cancel() // no need to wait: writecache.Seal and writecache.Close lock the same mutex components = append(components, s.writeCache) } @@ -428,6 +431,9 @@ func (s *Shard) lockExclusive() func() { cancelGC := val.(context.CancelFunc) cancelGC() } + if c := s.writecacheSealCancel.Load(); c != nil { + c.cancel() + } s.m.Lock() s.setModeRequested.Store(false) return s.m.Unlock diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 94f22feb5..93f5354a7 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -37,8 +37,9 @@ type Shard struct { rb *rebuilder - gcCancel atomic.Value - setModeRequested atomic.Bool + gcCancel atomic.Value + setModeRequested atomic.Bool + writecacheSealCancel atomic.Pointer[writecacheSealCanceler] } // Option represents Shard's constructor option. @@ -190,6 +191,7 @@ func New(opts ...Option) *Shard { } s.fillInfo() + s.writecacheSealCancel.Store(notInitializedCancel) return s } diff --git a/pkg/local_object_storage/shard/writecache.go b/pkg/local_object_storage/shard/writecache.go index c29710930..a6de07f03 100644 --- a/pkg/local_object_storage/shard/writecache.go +++ b/pkg/local_object_storage/shard/writecache.go @@ -4,12 +4,24 @@ import ( "context" "errors" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" ) +var ( + dummyCancel = &writecacheSealCanceler{cancel: func() {}} + notInitializedCancel = &writecacheSealCanceler{cancel: func() {}} + errWriteCacheSealing = errors.New("writecache is already sealing or shard is not initialized") +) + +type writecacheSealCanceler struct { + cancel context.CancelFunc +} + // FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation. type FlushWriteCachePrm struct { ignoreErrors bool @@ -60,6 +72,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error type SealWriteCachePrm struct { IgnoreErrors bool + Async bool RestoreMode bool Shrink bool } @@ -78,15 +91,52 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error { return errWriteCacheDisabled } + if p.Async { + ctx = context.WithoutCancel(ctx) + } + ctx, cancel := context.WithCancel(ctx) + canceler := &writecacheSealCanceler{cancel: cancel} + if !s.writecacheSealCancel.CompareAndSwap(dummyCancel, canceler) { + return errWriteCacheSealing + } s.m.RLock() - defer s.m.RUnlock() + cleanup := func() { + s.m.RUnlock() + s.writecacheSealCancel.Store(dummyCancel) + } if s.info.Mode.ReadOnly() { + cleanup() return ErrReadOnlyMode } if s.info.Mode.NoMetabase() { + cleanup() return ErrDegradedMode } - return s.writeCache.Seal(ctx, writecache.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink}) + if !p.Async { + defer cleanup() + } + prm := writecache.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink} + if p.Async { + started := make(chan struct{}) + go func() { + close(started) + defer cleanup() + + s.log.Info(logs.StartedWritecacheSealAsync) + if err := s.writeCache.Seal(ctx, prm); err != nil { + s.log.Warn(logs.FailedToSealWritecacheAsync, zap.Error(err)) + return + } + s.log.Info(logs.WritecacheSealCompletedAsync) + }() + select { + case <-ctx.Done(): + return ctx.Err() + case <-started: + return nil + } + } + return s.writeCache.Seal(ctx, prm) } diff --git a/pkg/services/control/server/seal_writecache.go b/pkg/services/control/server/seal_writecache.go index 697b91918..1737677b7 100644 --- a/pkg/services/control/server/seal_writecache.go +++ b/pkg/services/control/server/seal_writecache.go @@ -19,6 +19,7 @@ func (s *Server) SealWriteCache(ctx context.Context, req *control.SealWriteCache prm := engine.SealWriteCachePrm{ ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()), IgnoreErrors: req.GetBody().GetIgnoreErrors(), + Async: req.GetBody().GetAsync(), RestoreMode: req.GetBody().GetRestoreMode(), Shrink: req.GetBody().GetShrink(), } diff --git a/pkg/services/control/service.pb.go b/pkg/services/control/service.pb.go index ac512f1a55f3e98a6769dca917bb873fd76e5f6d..e5a5ce24c10c4f72796048e1b86927fa55215815 100644 GIT binary patch delta 2829 zcmY*bYfzNu6`nmu7J&tpn?bH0*X7Q_vcQ5MR>7zc7Bd~AMJ|$UjiI=r6@nebggVV& zgy~RDGoiAbPC9L@l}?+IcBTzUtdW`miMiN}B5kHkqOIbzwu4dW^L}3-@elBR@3}tb zdCtqdbJm4PYj2oNW&3|kRBgdhlAN-aq3?Ub4j%EL-Iopf`1akq2m5Jtq|Wn@{=A5hs)q7`Adw2 zW2AbF!ldz0vZxTIU6L3Mza_(13Jj3eDvp$wEC`a|rBIo)aIV~ag@WY6QL>07m_y{8 zg=3{Sn3I(K+Z!oT+?zN+^8I*^$v_*LySjywG_bFF=9*lKsAPntZq)QkeuQ;NOIUJ7S2ASJQA!5A+ ze|-is(HP6bGh`R-UD&$=*eaGjs#wk#Dl4Bu(2FpKmUw=Vp`$AH00@6E|=Lq z0@ib}z+Kx2Hk?i3)e;y7Bl82d@(R@~ZmP`I>gMbT3lKPi*j$f1l= zWiW}$#9~9Ds2vzrRs+>Yo`%xEkPJCd&Jps9CeoH8CyRFnWy?%5BHz_O=x;XBZxt~9 zD@qm5rjkI5v2OrZ^bsomog$|Krd?H^B+un=vYcqc?Lab8Ud-b#v8HjB*qu0@OyNjH z0MmISw0EGD!%nt|uZEp+H5Ed0r*f+FH-I?E555(1saT)K_?kb)h~}#~NUo)UTky3> z?;E6Zx+D!1<#v%PpunI`L7UwA2=l7y1Y+QYRhqjgO}s;p6{_?%JuWw=s9eS#1%%K@ zkl4Gx$mV^NsgTc9aD}Qm<7|i=d=!#QP2f$^e+WR&9swuT+5qa@Avh&o$MMqyu3f%0 zD^FhrJezEQ8CJo|^IRoSnE=-Yi5 zma14>S=b==KLV~W0T*%s!jehGm%`u4LcG{6MEUGRvVh>b~6>a6z*$epd74Nf?O}HMLx)59VUQp2PIA)G~ACUu9~H9 zBi4sEiUY+pLFW}DW_hast#+)(Vi+g`Le6fmcJXy8lp7Z?(U)GP%`y`V>0T*B(|6v) zVrcmlG!pw=ER}(?pjev&zIxCX9T=4F0!piV7QViGaJBh7psOn41SOO_Ipx6mg-L?k z$l-mG@5J$}lU;r~=VUeoEA?;3NmJOhe!gTQmV5VCRE{(x_ZqwmRn8Wp(~RfA&&U#l z)5i<#m}O@P2O!XJ34DS}0n9RW<#Y+gwWKoLGFy%R%lqqhWzh7tQZ5wvm}+Hj70|g{ z3jRRJ`I0?NIig)b7X5bYB4{$xLy3msE7zbnrMN)-i7YNsl5Z*HI+=MF;8iiN61X^| zs+28$*wzJ3nY=`O%UKG=?Z)V)F4517F(IP<8tJ>nCe|rKw0~WojpF``n(~9y+bcLg z4_qR*+M$rIRbVy~?_dY`Zqp+&_zzS*1setQ+m$Hnu?kG<^as==_pV|3`r`U?CCK>5 zJQf_>U6NM~x$gWYIz_S=nVUu0?YSH+LknnS@HTvblD^D}WNaM@4qZf=GYdwob5#(t z`zBS1mdrf}R}gaM4_Lae`;My4UmP0{ zz-RN2@W?B~F-SXoCT9=7XWk zQh5!HzFN)(Y<@**JxUAnI+<9lR?|gPG+hmOXqAxc>~+%L>Ms%lXMgFfS)xw!Jn$X8 zMuP^(5Ty?CCF9vV@vt#CyGPu?oUQ)l(EC{I$Fj$s_cwXum%aF>rap`t#p@4c=}X?J z@fulr%?|4?jg+NfL;PT~@kW2~S`4KDGwo%p@k3GEX>7)Zbk5QDj$e^Z)(2m7D|!Y? zi*zB~r@v&MwxV3f08`Y}5JXV8^o3GyR+#1Rl|Fn*W G-upjcZ+p=I delta 2802 zcmZ`*YfzNu6&}uExnI_0L0wqfU6%W@yWDSr2&vo=q3xiEfSQa=GGr%7Ep`;JjmAl> zU}xmCOgn(hkBmu`(MDQrlWkL5Q?1gOWNI%FCz)x~#1h&ujV3~$_xoHzCzC%r^X_}j zdCz&ybDs0vd(-&f_r{^;f__e)rB5HqphrTci#_z}H7Y5Z{#PicGL_`YofTXl8^1}h z;(MM<;+Y`h^!0IS50VoDI6oTAu`+j)4Dx;nuMle#M@z_VvKVaMes6*FcF;;0_!fnQ zM1#TXcWbb{-_dMHZjd#Oo2Sb>}@hshzKsi?|r;uz_R z<_M`Zu|+<4jI6TH#5NiFDmmrG8aAp&(`92gXURLeXoGwsjuYii+sP<7trRL_ahw)p zli$X1iljN%A%h+cktj1>*%rxBGI#)%v*Q$IP*@d=4w6@fRE-erQK&wqOhKL4wt?Pjuguz;2enObgdzU^5^%^kSm8MOhvsE zkEos|WHGmzljZaU+|BGDn|S|9!Sd6m$vnCjJ#M`PK>z5cEIF3|3EbN_1Iz4eQeE68cQd$NJnaf* z2ZtytPaCz1?@a`LEtBii-FET4fb?f;I6*4x=z8T=N*0>~eA2o)Wk**7#0Zw}UZc2C z2VSw-cvv#GQodBRfi*3;Tqe(0IZn2uuvtCzR0?XSyNtV>dd$9K z(&GgSPZR?jP~W59&OS~SMS?P^O2IqlgF(vJCN{{$G}vu$!{c5WsNd7XPWk@J$j9G- z%og3~tG6HY_*FHu;&M8US_Re`R50-XR5fOx_KDp{v^$K$bam_zuP-=S9kqp-Y&-J%wQZYyp4_&q3BB2gy15IL7L4 z$7x8f=Ih-QBQ2MqHxEk+73(WFX@RG(vS~KHQh|UKv%@O7L|4uuuXF;_R9LR zR4*f^K@eje2P-5>#nz75yLl0#1#Pg(8V|3P;THk#!RG+JzYsVDh9*KX3VD+l&qKe4 z&ms!!T;lr?Dh1EA&0#-ECBXrPrzrARrLq`!%oc${zHsnRi#wGKZF#vEa}fw0EUy*e zX_!PW8{{n|fTHmjQaoCM@zn+c+7u#BmO`(7HiHS*1~9XfEu%jmhdzX5$Ix+2LVjJ! zQ95jeGE&CH((^7{=I)awBKzN>G`YAIC=91Un2(il-WLq=HDUx6>FgXTM+HAALu*=Q zs9XG(Fk=2v4DVS7+L~N}f(~xrj!2!;d4)<6G>HQXOk%A-4eDMdW&m0M;Sf2m96v__ z+GLlW8VG~G96!FhkOQrp6P1B-ie=MHs!-XsN#i*Lt5yQFcNa*1WF013^Y+bBmCV%w zt{XQgTI~_DxEdhiHC3Fi%QB2Lkvm$>n`P!>2*a%fEHBu{svVuKSZ*>DYFeLL!_6uq zlMK8|Zuww6#_@;;SU^=yE{lG<8qqa=daCv4llQ@cKtWEm1~c`7{$0Z+tv`9W()uyg zNY!1;>F!x7)AL?j01_Ym2ULIi5loveA2S|xmkuu7mM-zhAjS>L6y6~J{FFwP?9%0l zI*vH@OR!5JiKX9A%PC6F`f8O_&7<-F*be2DD2vs=exwP4IhG2c?wiM^`cg#i*DXg> zuKokxKV!83C=gkzI!Vvxb95scI9&U-@QUAr%`w%;iAoT)MIrk?RLe0PQScd~&6m-L zy#Xd8J~Fd+GZsU{fpJ&H6UI--zn4`ZZPVY{J2|6$E&=-@8Q(V(`u-nqIeX z{QvF!<(~{6?Se{NZRS|@_-r#`{h|drnBAf`xvGWBf$P2&9oI?yc>UqAwH8|8-iTU0 P=_i%V^p1@@`u6_-rPWlj diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index 486f30a93..d6639cb48 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -656,6 +656,9 @@ message SealWriteCacheRequest { // Flag indicating whether object read errors should be ignored. bool ignore_errors = 2; + // Flag indicating whether writecache will be sealed async. + bool async = 3; + // If true, then writecache will be sealed, but mode will be restored to the current one. bool restore_mode = 4; diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index 417d25c05e9e47799ddcd960120a7ae1c31e5295..822244e77dad0a07943f2b2a6f70fccae00b1378 100644 GIT binary patch delta 54 zcmcb5ihaXr_J%Et54Ld`>nK#{ITlyuB~NcW#3;4>!8S%==E?VRgjpeig1Z>yw}0Nr Hn4}E=aa