package loadroute

import (
	"context"
	"encoding/hex"
	"sync"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
	"go.uber.org/zap"
)

// InitWriter creates a Writer that forwards every value it receives to the
// next point(s) of its route.
//
// A non-empty route is treated as the part of the path already travelled and
// the value is sent to its continuation. An empty route is replaced with a
// single-element route holding the Router's local server info, i.e. the route
// is laid from scratch.
//
// Once the remote points of the next leg are known, the value is sent to each
// of them sequentially. Transmission failures (even if all of them fail) are
// logged and are not returned as an error.
//
// Close of the returned Writer calls Close on every internal Writer created
// at runtime and never returns an error.
//
// Always returns nil error.
func (r *Router) InitWriter(route []loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
	passed := route
	if len(passed) == 0 {
		// Nothing travelled yet: start the route at the local node.
		passed = []loadcontroller.ServerInfo{r.localSrvInfo}
	}

	w := &loadWriter{
		router:   r,
		route:    passed,
		mRoute:   make(map[routeKey]*valuesRoute),
		mServers: make(map[string]loadcontroller.Writer),
	}

	return w, nil
}

// routeKey identifies a cached next-stage route by the epoch and the
// string-encoded container ID of an estimation.
type routeKey struct {
	epoch uint64

	cid string
}

// valuesRoute couples a computed next-stage route with the estimation that
// caused it to be computed.
type valuesRoute struct {
	route []loadcontroller.ServerInfo

	values []container.SizeEstimation
}

// loadWriter is the Writer produced by Router.InitWriter.
type loadWriter struct {
	router *Router

	// route is the path passed to InitWriter (or the local server info when
	// that path was empty); it is handed to the route builder and to every
	// remote writer that gets initialized.
	route []loadcontroller.ServerInfo

	// routeMtx is held for the whole duration of Put.
	routeMtx sync.RWMutex
	// mRoute caches next-stage routes per (epoch, container) pair.
	mRoute map[routeKey]*valuesRoute

	// mServers caches remote writers by hex-encoded public key; the empty
	// string is the key used for a nil server info.
	mServers map[string]loadcontroller.Writer
}

// Put sends the estimation to every point of the next route stage.
//
// The next stage is computed by the route builder on the first Put for a given
// (epoch, container) pair and reused afterwards. An empty stage is replaced
// with a single nil server info. The only error returned is one coming from
// the route builder; failures to initialize a remote writer or to deliver the
// value are logged at debug level and skipped (best effort).
func (w *loadWriter) Put(a container.SizeEstimation) error {
	w.routeMtx.Lock()
	defer w.routeMtx.Unlock()

	k := routeKey{
		epoch: a.Epoch(),
		cid:   a.Container().EncodeToString(),
	}

	stage, known := w.mRoute[k]
	if !known {
		next, err := w.router.routeBuilder.NextStage(a, w.route)
		if err != nil {
			return err
		}

		if len(next) == 0 {
			// Keep exactly one iteration of the delivery loop below,
			// with a nil server info.
			next = []loadcontroller.ServerInfo{nil}
		}

		stage = &valuesRoute{
			route:  next,
			values: []container.SizeEstimation{a},
		}
		w.mRoute[k] = stage
	}

	for i := range stage.route {
		remote, ready := w.remoteWriter(stage.route[i])
		if !ready {
			continue // best effort
		}

		if err := remote.Put(a); err != nil {
			w.router.log.Debug(logs.RouteCouldNotPutTheValue,
				zap.String("error", err.Error()),
			)
		}

		// continue best effort
	}

	return nil
}

// remoteWriter returns the cached writer for the given server, initializing
// and caching it on first use. The boolean is false when initialization
// failed; the failure has already been logged in that case.
//
// It reads and writes mServers, so it is called from Put with routeMtx held.
func (w *loadWriter) remoteWriter(info loadcontroller.ServerInfo) (loadcontroller.Writer, bool) {
	var srvKey string
	if info != nil {
		srvKey = hex.EncodeToString(info.PublicKey())
	}

	if cached, found := w.mServers[srvKey]; found {
		return cached, true
	}

	provider, err := w.router.remoteProvider.InitRemote(info)
	if err != nil {
		w.router.log.Debug(logs.RouteCouldNotInitializeWriterProvider,
			zap.String("error", err.Error()),
		)

		return nil, false
	}

	created, err := provider.InitWriter(w.route)
	if err != nil {
		w.router.log.Debug(logs.RouteCouldNotInitializeWriter,
			zap.String("error", err.Error()),
		)

		return nil, false
	}

	w.mServers[srvKey] = created

	return created, true
}

// Close closes every remote writer created by previous Put calls. Close
// failures are logged at debug level together with the server key and are not
// propagated: the method always returns nil.
//
// NOTE(review): mServers is iterated here without taking routeMtx, whereas Put
// mutates it under that lock — presumably Close is never called concurrently
// with Put; confirm against callers.
func (w *loadWriter) Close(ctx context.Context) error {
	for srvKey, remote := range w.mServers {
		if err := remote.Close(ctx); err != nil {
			w.router.log.Debug(logs.RouteCouldNotCloseRemoteServerWriter,
				zap.String("key", srvKey),
				zap.String("error", err.Error()),
			)
		}
	}

	return nil
}