risingwave_meta/barrier/worker.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::{TryFutureExt, pin_mut};
use itertools::Itertools;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::system_param::{AdaptiveParallelismStrategy, PAUSE_ON_NEXT_BOOTSTRAP_KEY};
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::recovery::{RenderedDatabaseRuntimeInfo, render_runtime_info};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{
    ControlStreamManager, WorkerNodeEvent, from_partial_graph_id, merge_node_rpc_errors,
};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, RecoveryReason,
    UpdateDatabaseBarrierRequest, schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes and
/// collects them, with monotonically increasing epoch numbers. On compute nodes, the `LocalBarrierManager`
/// in the `risingwave_stream` crate serves these requests and dispatches them to source actors.
///
/// Configuration changes in our system are achieved by mutations carried in barriers. Thus,
/// [`crate::barrier::worker::GlobalBarrierWorker`] provides a set of interfaces like a state machine,
/// accepting [`crate::barrier::command::Command`] that carries the info to build a `Mutation`. To keep consistency between
/// the barrier manager and the meta store, actions like "drop materialized view" or "create mv on mv"
/// must be done transactionally in the barrier manager using [`crate::barrier::command::Command`].
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether to enable recovery on failover.
    enable_recovery: bool,

    /// The queue of scheduled barriers.
    periodic_barriers: PeriodicBarriers,

    /// Whether per-database failure isolation is enabled in system parameters.
    system_enable_per_database_isolation: bool,

    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    /// Command that has been collected but is still completing.
    /// The join handle of the completing future is stored.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    control_stream_manager: ControlStreamManager,

    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        // The barrier config will be loaded in the bootstrap phase.
        let periodic_barriers = PeriodicBarriers::default();
        let adaptive_parallelism_strategy = reader.adaptive_parallelism_strategy();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            system_enable_per_database_isolation,
            adaptive_parallelism_strategy,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    /// Create a new [`crate::barrier::worker::GlobalBarrierWorker`].
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
        refresh_manager: GlobalRefreshManagerRef,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
            refresh_manager,
            sink_manager,
        ));

        Self::new_inner(env, request_rx, context).await
    }

    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }
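
    // Illustrative usage sketch (an assumption, not copied from actual call sites): the
    // caller keeps the returned join handle and shutdown sender, requests shutdown by
    // sending `()` on the oneshot channel, and then awaits the handle so the event loop
    // can exit cleanly.
    //
    //     let (join_handle, shutdown_tx) = worker.start();
    //     // ... later, when the meta node is shutting down:
    //     let _ = shutdown_tx.send(());
    //     let _ = join_handle.await;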

    /// Check whether we should pause on bootstrap from the system parameter and reset it.
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap()
            || self.env.opts.pause_on_next_bootstrap_offline;

        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                 It will now be reset to `false`. \
                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

    /// Start an infinite loop to take scheduled barriers and send them.
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            // Bootstrap recovery. Here we simply trigger a recovery process to achieve consistency.
            // Even if there's no actor to recover, we still go through the recovery process to
            // inject the first `Initial` barrier.
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        // Start the event loop.
        loop {
            tokio::select! {
                biased;

                // Shutdown
                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetBackfillProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_backfill_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send get backfill progress");
                                }
                            }
                            BarrierManagerRequest::GetFragmentBackfillProgress(result_tx) => {
                                let progress =
                                    self.checkpoint_control.gen_fragment_backfill_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send get fragment backfill progress");
                                }
                            }
                            BarrierManagerRequest::GetCdcProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_cdc_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send get cdc progress");
                                }
                            }
                            // Handle adhoc recovery triggered by the user.
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier(UpdateDatabaseBarrierRequest {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            }) => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                            BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
                                if tx.send(self.checkpoint_control.may_have_snapshot_backfilling_jobs()).is_err() {
                                    warn!("failed to send result of may_have_snapshot_backfilling_jobs");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    match changed_worker {
                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
                            self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                        }
                        ActiveStreamingWorkerChange::Remove(node) => {
                            self.control_stream_manager.remove_worker(node);
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                            self.system_enable_per_database_isolation = p.per_database_isolation();
                            self.adaptive_parallelism_strategy = p.adaptive_parallelism_strategy();
                        }
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                let result: MetaResult<_> = try {
                                    let runtime_info = self.context.reload_database_runtime_info(database_id).await.inspect_err(|err| {
                                        warn!(%database_id, err = %err.as_report(), "reload runtime info failed");
                                    })?;
                                    let rendered_info = render_runtime_info(
                                        self.env.actor_id_generator(),
                                        &self.active_streaming_nodes,
                                        self.adaptive_parallelism_strategy,
                                        &runtime_info.recovery_context,
                                        database_id,
                                    )
                                    .inspect_err(|err: &MetaError| {
                                        warn!(%database_id, err = %err.as_report(), "render runtime info failed");
                                    })?;
                                    if let Some(rendered_info) = rendered_info {
                                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
                                            database_id,
                                            &rendered_info.job_infos,
                                            &self.active_streaming_nodes,
                                            &rendered_info.stream_actors,
                                            &runtime_info.state_table_committed_epochs,
                                        )
                                        .inspect_err(|err| {
                                            warn!(%database_id, err = ?err.as_report(), "database runtime info failed validation");
                                        })?;
                                        Some((runtime_info, rendered_info))
                                    } else {
                                        None
                                    }
                                };
                                match result {
                                    Ok(Some((runtime_info, rendered_info))) => {
                                        entering_initializing.enter(
                                            runtime_info,
                                            rendered_info,
                                            &mut self.control_stream_manager,
                                        );
                                    }
                                    Ok(None) => {
                                        info!(%database_id, "database removed after reloading empty runtime info");
                                        // mark ready to unblock subsequent request
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                    Err(e) => {
                                        entering_initializing.fail_reload_runtime_info(e);
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                (worker_id, event) = self.control_stream_manager.next_event(&self.term_id, &self.context) => {
                    let resp_result = match event {
                        WorkerNodeEvent::Response(result) => {
                            result
                        }
                        WorkerNodeEvent::Connected(connected) => {
                            connected.initialize(self.checkpoint_control.inflight_infos());
                            continue;
                        }
                    };
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            // TODO: add log on blocking time
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportPartialGraphFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", database_id))?;
                                }
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(%database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    // TODO: add log on blocking time
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetPartialGraph(resp) => {
                                self.checkpoint_control.on_reset_partial_graph_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("get unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                // TODO: add log on blocking time
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// We need to make sure there are no changes when doing recovery.
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // Join the spawned completing command to finish, no matter whether it succeeds or not.
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            // Continue to finish the pending collected barriers.
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Clear the state on the given error and trigger a failure recovery, or panic if recovery is disabled.
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            // No need to clean dirty tables for barrier recovery;
            // the foreground stream jobs should clean up their own tables.
            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        // No need to clean dirty tables for barrier recovery;
        // the foreground stream jobs should clean up their own tables.
        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
    /// Record a barrier collection failure in the event log.
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        // Record failure in event log.
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    // Retry base interval in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    // Retry max interval.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    // MrCroxx: Use concrete type here to prevent an unresolved compiler issue.
    // Feel free to replace the concrete type with TAIT after it is fixed.

    // mod retry_backoff_future {
    //     use std::future::Future;
    //     use std::time::Duration;
    //
    //     use tokio::time::sleep;
    //
    //     pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
    //
    //     #[define_opaque(RetryBackoffFuture)]
    //     pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
    //         Box::pin(sleep(duration))
    //     }
    // }
    // pub(crate) use retry_backoff_future::*;

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// Initialize a retry strategy for operations in recovery.
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}
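
// A minimal sketch (not part of the original module) of how the retry strategy above can be
// exercised: the backoff starts around the 20 ms base interval, grows exponentially with
// jitter, and is capped at 5 seconds per attempt.
#[cfg(test)]
mod retry_strategy_example {
    use std::time::Duration;

    use super::retry_strategy::get_retry_strategy;

    #[test]
    fn backoff_is_capped_at_max_interval() {
        // Jitter scales each delay down by a random factor, so every yielded delay stays
        // within the configured 5-second maximum.
        for delay in get_retry_strategy().take(32) {
            assert!(delay <= Duration::from_secs(5));
        }
    }
}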

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_pb::meta::event_log::{Event, EventRecovery};

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Recover the whole cluster from the latest epoch.
    ///
    /// If `is_paused` is `true`, all data sources (including connectors and DMLs) will be
    /// immediately paused after recovery, until the user manually resumes them either by restarting
    /// the cluster or via the `risectl` command. Used for debugging purposes.
    ///
    /// On return, the worker's internal state has been replaced with the newly recovered state.
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        // Clear all control streams to release resources (connections to compute nodes) first.
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        // We take retries into consideration because this is the latency the user sees for the
        // cluster to get recovered.
        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let recovery_future = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            // We need to notify_creating_job_failed in every recovery retry, because the outer
            // create_streaming_job handler holds the reschedule_read_lock and waits for the creating
            // job to finish, which makes the following scale_actor fail to acquire the
            // reschedule_write_lock and keep recovering, eventually deadlocking.
            // TODO: refactor and fix this hacky implementation.
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;

            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                recovery_context,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut mv_depended_subscriptions,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_splits,
            } = runtime_info_snapshot;

            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::recover(
                self.env.clone(),
                active_streaming_nodes.current(),
                &term_id,
                self.context.clone(),
            )
            .await;
            {
                let mut empty_databases = HashSet::new();
                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashMap::new();
                for &database_id in recovery_context.fragment_context.database_map.keys() {
                    let mut injected_creating_jobs = HashSet::new();
                    let result: MetaResult<_> = try {
                        let Some(rendered_info) = render_runtime_info(
                            self.env.actor_id_generator(),
                            &active_streaming_nodes,
                            self.adaptive_parallelism_strategy,
                            &recovery_context,
                            database_id,
                        )
                        .inspect_err(|err: &MetaError| {
                            warn!(%database_id, err = %err.as_report(), "render runtime info failed");
                        })? else {
                            empty_databases.insert(database_id);
                            continue;
                        };
                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
                            database_id,
                            &rendered_info.job_infos,
                            &active_streaming_nodes,
                            &rendered_info.stream_actors,
                            &state_table_committed_epochs,
                        )
                        .inspect_err(|err| {
                            warn!(%database_id, err = %err.as_report(), "rendered runtime info failed validation");
                        })?;
                        let RenderedDatabaseRuntimeInfo {
                            job_infos,
                            stream_actors,
                            mut source_splits,
                        } = rendered_info;
                        control_stream_manager.inject_database_initial_barrier(
                            database_id,
                            job_infos,
                            &recovery_context.job_extra_info,
                            &mut state_table_committed_epochs,
                            &mut state_table_log_epochs,
                            &recovery_context.fragment_relations,
                            &stream_actors,
                            &mut source_splits,
                            &mut background_jobs,
                            &mut mv_depended_subscriptions,
                            is_paused,
                            &hummock_version_stats,
                            &mut cdc_table_snapshot_splits,
                            &mut injected_creating_jobs,
                        )?
                    };
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id, injected_creating_jobs).is_none(), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(%database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                if !empty_databases.is_empty() {
                    info!(?empty_databases, "empty databases in global recovery");
                }
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response(&term_id, &self.context).await;
                    let resp = match result {
                        Err(e) => {
                            warn!(%worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, collector) in collecting_databases.extract_if(|_, collector| {
                                !collector.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, %worker_id, "database failed to recover in global recovery due to worker node err");
                                assert!(failed_databases.insert(failed_database_id, collector.creating_job_ids().collect()).is_none());
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportPartialGraphFailure(resp) => {
                                    let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
                                    if let Some(collector) = collecting_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id, collector.creating_job_ids().collect()).is_none());
                                    } else if let Some(database) = collected_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id, database.creating_streaming_job_controls.keys().copied().collect()).is_none());
                                    } else {
                                        assert!(failed_databases.contains_key(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetPartialGraph(_)) => {
                                    return Err(anyhow!("get unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id);
                    let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
                    if failed_databases.contains_key(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        // Ignore the late-arriving collect resp of a failed database.
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.iter().collect_vec(), "unused recovered background mviews in recovery");
                }
                if !mv_depended_subscriptions.is_empty() {
                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epochs in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.keys().collect_vec()
                    )
                    .into());
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"));

        let mut recover_txs = vec![];
        let mut update_barrier_requests = vec![];
        pin_mut!(recovery_future);
        let mut request_rx_closed = false;
        let new_state = loop {
            select! {
                biased;
                new_state = &mut recovery_future => {
                    break new_state.expect("Retry until recovery success.");
                }
                request = pin!(self.request_rx.recv()), if !request_rx_closed => {
                    let Some(request) = request else {
                        warn!("request rx channel closed during recovery");
                        request_rx_closed = true;
                        continue;
                    };
                    match request {
                        BarrierManagerRequest::GetBackfillProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::GetFragmentBackfillProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::GetCdcProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::AdhocRecovery(tx) => {
                            recover_txs.push(tx);
                        }
                        BarrierManagerRequest::UpdateDatabaseBarrier(request) => {
                            update_barrier_requests.push(request);
                        }
                        BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
                            // Snapshot backfill jobs may be recovered, so answer `true`.
                            let _ = tx.send(true);
                        }
                    }
                }
            }
        };

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        for UpdateDatabaseBarrierRequest {
            database_id,
            barrier_interval_ms,
            checkpoint_frequency,
            sender,
        } in update_barrier_requests
        {
            self.periodic_barriers.update_database_barrier(
                database_id,
                barrier_interval_ms,
                checkpoint_frequency,
            );
            let _ = sender.send(());
        }

        for tx in recover_txs {
            let _ = tx.send(());
        }

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}