risingwave_meta/barrier/checkpoint/control.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::mem::take;
19use std::task::Poll;
20
21use anyhow::anyhow;
22use fail::fail_point;
23use prometheus::HistogramTimer;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
26use risingwave_meta_model::WorkerId;
27use risingwave_pb::ddl_service::DdlProgress;
28use risingwave_pb::hummock::HummockVersionStats;
29use risingwave_pb::stream_service::BarrierCompleteResponse;
30use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
31use thiserror_ext::AsReport;
32use tracing::{debug, warn};
33
34use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
35use crate::barrier::checkpoint::recovery::{
36    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
37    RecoveringStateAction,
38};
39use crate::barrier::checkpoint::state::BarrierWorkerState;
40use crate::barrier::command::CommandContext;
41use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
42use crate::barrier::info::{InflightDatabaseInfo, InflightStreamingJobInfo};
43use crate::barrier::notifier::Notifier;
44use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
45use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
46use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
47use crate::barrier::utils::{
48    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
49};
50use crate::barrier::{
51    BarrierKind, Command, CreateStreamingJobType, InflightSubscriptionInfo, TracedEpoch,
52};
53use crate::manager::MetaSrvEnv;
54use crate::rpc::metrics::GLOBAL_META_METRICS;
55use crate::stream::fill_snapshot_backfill_epoch;
56use crate::{MetaError, MetaResult};
57
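/// Top-level checkpoint controller of the barrier manager. It tracks the checkpoint status of
/// every database independently (running or recovering), together with the latest
/// `HummockVersionStats` observed when barriers were completed.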
58pub(crate) struct CheckpointControl {
59    pub(crate) env: MetaSrvEnv,
60    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
61    pub(super) hummock_version_stats: HummockVersionStats,
62}
63
64impl CheckpointControl {
65    pub fn new(env: MetaSrvEnv) -> Self {
66        Self {
67            env,
68            databases: Default::default(),
69            hummock_version_stats: Default::default(),
70        }
71    }
72
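    /// Rebuild the controller after a recovery: successfully recovered databases enter the
    /// `Running` status, while databases in `failed_databases` start in the `Recovering`
    /// (resetting) status.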
73    pub(crate) fn recover(
74        databases: impl IntoIterator<Item = (DatabaseId, DatabaseCheckpointControl)>,
75        failed_databases: HashSet<DatabaseId>,
76        control_stream_manager: &mut ControlStreamManager,
77        hummock_version_stats: HummockVersionStats,
78    ) -> Self {
79        Self {
80            env: control_stream_manager.env.clone(),
81            databases: databases
82                .into_iter()
83                .map(|(database_id, control)| {
84                    (
85                        database_id,
86                        DatabaseCheckpointControlStatus::Running(control),
87                    )
88                })
89                .chain(failed_databases.into_iter().map(|database_id| {
90                    (
91                        database_id,
92                        DatabaseCheckpointControlStatus::Recovering(
93                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
94                        ),
95                    )
96                }))
97                .collect(),
98            hummock_version_stats,
99        }
100    }
101
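    /// Acknowledge a completed barrier task: update the hummock version stats and forward the
    /// acked epochs to the corresponding running databases.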
102    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
103        self.hummock_version_stats = output.hummock_version_stats;
104        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
105            self.databases
106                .get_mut(&database_id)
107                .expect("should exist")
108                .expect_running("should have wait for completing command before enter recovery")
109                .ack_completed(command_prev_epoch, creating_job_epochs);
110        }
111    }
112
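    /// Collect the next `CompleteBarrierTask`, if any, by merging the completable barriers of
    /// all running databases into a single task.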
113    pub(crate) fn next_complete_barrier_task(
114        &mut self,
115        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
116    ) -> Option<CompleteBarrierTask> {
117        let mut task = None;
118        for database in self.databases.values_mut() {
119            let Some(database) = database.running_state_mut() else {
120                continue;
121            };
122            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
123            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
124        }
125        task
126    }
127
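    /// Route a `BarrierCompleteResponse` from a worker to the database it belongs to, whether
    /// that database is running or recovering.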
128    pub(crate) fn barrier_collected(
129        &mut self,
130        resp: BarrierCompleteResponse,
131        periodic_barriers: &mut PeriodicBarriers,
132    ) -> MetaResult<()> {
133        let database_id = DatabaseId::new(resp.database_id);
134        let database_status = self.databases.get_mut(&database_id).expect("should exist");
135        match database_status {
136            DatabaseCheckpointControlStatus::Running(database) => {
137                database.barrier_collected(resp, periodic_barriers)
138            }
139            DatabaseCheckpointControlStatus::Recovering(state) => {
140                state.barrier_collected(database_id, resp);
141                Ok(())
142            }
143        }
144    }
145
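    /// Whether a new barrier can be injected, i.e. every running database has fewer than
    /// `in_flight_barrier_nums` barriers in flight.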
146    pub(crate) fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
147        self.databases.values().all(|database| {
148            database
149                .running_state()
150                .map(|database| database.can_inject_barrier(in_flight_barrier_nums))
151                .unwrap_or(true)
152        })
153    }
154
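    /// The maximum in-flight `prev_epoch` across all running databases, used as the previous
    /// epoch of the next barrier to be injected.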
155    pub(crate) fn max_prev_epoch(&self) -> Option<TracedEpoch> {
156        self.databases
157            .values()
158            .flat_map(|database| {
159                database
160                    .running_state()
161                    .map(|database| database.state.in_flight_prev_epoch())
162            })
163            .max_by_key(|epoch| epoch.value())
164            .cloned()
165    }
166
167    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
168        self.databases.iter().filter_map(|(database_id, database)| {
169            database.running_state().is_none().then_some(*database_id)
170        })
171    }
172
173    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
174        self.databases.iter().filter_map(|(database_id, database)| {
175            database.running_state().is_some().then_some(*database_id)
176        })
177    }
178
179    /// Return `Some(failed database_id -> err)` if handling the new barrier fails for some databases, otherwise `None`.
180    pub(crate) fn handle_new_barrier(
181        &mut self,
182        new_barrier: NewBarrier,
183        control_stream_manager: &mut ControlStreamManager,
184    ) -> Option<HashMap<DatabaseId, MetaError>> {
185        let NewBarrier {
186            command,
187            span,
188            checkpoint,
189        } = new_barrier;
190
191        if let Some((database_id, mut command, notifiers)) = command {
192            if let &mut Command::CreateStreamingJob {
193                ref mut cross_db_snapshot_backfill_info,
194                ref info,
195                ..
196            } = &mut command
197            {
198                for (table_id, snapshot_epoch) in
199                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
200                {
201                    for database in self.databases.values() {
202                        if let Some(database) = database.running_state()
203                            && database.state.inflight_graph_info.contains_job(*table_id)
204                        {
205                            if let Some(committed_epoch) = database.committed_epoch {
206                                *snapshot_epoch = Some(committed_epoch);
207                            }
208                            break;
209                        }
210                    }
211                    if snapshot_epoch.is_none() {
212                        let table_id = *table_id;
213                        warn!(
214                            ?cross_db_snapshot_backfill_info,
215                            ?table_id,
216                            ?info,
217                            "database of cross db upstream table not found"
218                        );
219                        let err: MetaError =
220                            anyhow!("database of cross db upstream table {} not found", table_id)
221                                .into();
222                        for notifier in notifiers {
223                            notifier.notify_start_failed(err.clone());
224                        }
225
226                        return None;
227                    }
228                }
229            }
230
231            let max_prev_epoch = self.max_prev_epoch();
232            let (database, max_prev_epoch) = match self.databases.entry(database_id) {
233                Entry::Occupied(entry) => (
234                    entry
235                        .into_mut()
236                        .expect_running("should not have command when not running"),
237                    max_prev_epoch.expect("should exist when having some database"),
238                ),
239                Entry::Vacant(entry) => match &command {
240                    Command::CreateStreamingJob {
241                        job_type: CreateStreamingJobType::Normal,
242                        ..
243                    } => {
244                        let new_database = DatabaseCheckpointControl::new(database_id);
245                        let max_prev_epoch = if let Some(max_prev_epoch) = max_prev_epoch {
246                            if max_prev_epoch.value()
247                                < new_database.state.in_flight_prev_epoch().value()
248                            {
249                                new_database.state.in_flight_prev_epoch().clone()
250                            } else {
251                                max_prev_epoch
252                            }
253                        } else {
254                            new_database.state.in_flight_prev_epoch().clone()
255                        };
256                        control_stream_manager.add_partial_graph(database_id, None);
257                        (
258                            entry
259                                .insert(DatabaseCheckpointControlStatus::Running(new_database))
260                                .expect_running("just initialized as running"),
261                            max_prev_epoch,
262                        )
263                    }
264                    Command::Flush | Command::Pause | Command::Resume => {
265                        for mut notifier in notifiers {
266                            notifier.notify_started();
267                            notifier.notify_collected();
268                        }
269                        warn!(?command, "skip command for empty database");
270                        return None;
271                    }
272                    _ => {
273                        panic!(
274                            "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
275                            database_id, command
276                        )
277                    }
278                },
279            };
280
281            let curr_epoch = max_prev_epoch.next();
282
283            let mut failed_databases: Option<HashMap<_, _>> = None;
284
285            if let Err(e) = database.handle_new_barrier(
286                Some((command, notifiers)),
287                checkpoint,
288                span.clone(),
289                control_stream_manager,
290                &self.hummock_version_stats,
291                curr_epoch.clone(),
292            ) {
293                failed_databases
294                    .get_or_insert_default()
295                    .try_insert(database_id, e)
296                    .expect("non-duplicate");
297            }
298            for database in self.databases.values_mut() {
299                let Some(database) = database.running_state_mut() else {
300                    continue;
301                };
302                if database.database_id == database_id {
303                    continue;
304                }
305                if let Err(e) = database.handle_new_barrier(
306                    None,
307                    checkpoint,
308                    span.clone(),
309                    control_stream_manager,
310                    &self.hummock_version_stats,
311                    curr_epoch.clone(),
312                ) {
313                    failed_databases
314                        .get_or_insert_default()
315                        .try_insert(database.database_id, e)
316                        .expect("non-duplicate");
317                }
318            }
319            failed_databases
320        } else {
321            #[expect(clippy::question_mark)]
322            let Some(max_prev_epoch) = self.max_prev_epoch() else {
323                return None;
324            };
325            let curr_epoch = max_prev_epoch.next();
326
327            let mut failed_databases: Option<HashMap<_, _>> = None;
328            for database in self.databases.values_mut() {
329                let Some(database) = database.running_state_mut() else {
330                    continue;
331                };
332                if let Err(e) = database.handle_new_barrier(
333                    None,
334                    checkpoint,
335                    span.clone(),
336                    control_stream_manager,
337                    &self.hummock_version_stats,
338                    curr_epoch.clone(),
339                ) {
340                    failed_databases
341                        .get_or_insert_default()
342                        .try_insert(database.database_id, e)
343                        .expect("non-duplicate");
344                }
345            }
346
347            failed_databases
348        }
349    }
350
351    pub(crate) fn update_barrier_nums_metrics(&self) {
352        self.databases
353            .values()
354            .flat_map(|database| database.running_state())
355            .for_each(|database| database.update_barrier_nums_metrics());
356    }
357
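    /// Aggregate DDL progress, keyed by job id: normal backfill progress comes from each
    /// database's `create_mview_tracker`, snapshot backfill progress from its creating jobs.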
358    pub(crate) fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
359        let mut progress = HashMap::new();
360        for status in self.databases.values() {
361            let Some(database_checkpoint_control) = status.running_state() else {
362                continue;
363            };
364            // Progress of normal backfill
365            progress.extend(
366                database_checkpoint_control
367                    .create_mview_tracker
368                    .gen_ddl_progress(),
369            );
370            // Progress of snapshot backfill
371            for creating_job in database_checkpoint_control
372                .creating_streaming_job_controls
373                .values()
374            {
375                progress.extend([(
376                    creating_job.job_id.table_id,
377                    creating_job.gen_ddl_progress(),
378                )]);
379            }
380        }
381        progress
382    }
383
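    /// Return the databases that can no longer proceed after the failure of `worker_id`, i.e.
    /// those whose recovering state, queued barriers, in-flight graph or creating jobs involve
    /// that worker.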
384    pub(crate) fn databases_failed_at_worker_err(
385        &mut self,
386        worker_id: WorkerId,
387    ) -> Vec<DatabaseId> {
388        let mut failed_databases = Vec::new();
389        for (database_id, database_status) in &mut self.databases {
390            let database_checkpoint_control = match database_status {
391                DatabaseCheckpointControlStatus::Running(control) => control,
392                DatabaseCheckpointControlStatus::Recovering(state) => {
393                    if !state.is_valid_after_worker_err(worker_id) {
394                        failed_databases.push(*database_id);
395                    }
396                    continue;
397                }
398            };
399
400            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
401                || database_checkpoint_control
402                    .state
403                    .inflight_graph_info
404                    .contains_worker(worker_id as _)
405                || database_checkpoint_control
406                    .creating_streaming_job_controls
407                    .values_mut()
408                    .any(|job| !job.is_valid_after_worker_err(worker_id))
409            {
410                failed_databases.push(*database_id);
411            }
412        }
413        failed_databases
414    }
415
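    /// On a global error, drain all queued barriers: notify their waiters of the failure,
    /// observe their latency timers, and abort all tracked materialized view creations.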
416    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
417        for (_, node) in self.databases.values_mut().flat_map(|status| {
418            status
419                .running_state_mut()
420                .map(|database| take(&mut database.command_ctx_queue))
421                .into_iter()
422                .flatten()
423        }) {
424            for notifier in node.notifiers {
425                notifier.notify_collection_failed(err.clone());
426            }
427            node.enqueue_time.observe_duration();
428        }
429        self.databases.values_mut().for_each(|database| {
430            if let Some(database) = database.running_state_mut() {
431                database.create_mview_tracker.abort_all()
432            }
433        });
434    }
435
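    /// Iterate over the inflight subscription and graph info of every database that currently
    /// exposes a database state, together with the graph info of its creating (snapshot
    /// backfill) jobs.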
436    pub(crate) fn inflight_infos(
437        &self,
438    ) -> impl Iterator<
439        Item = (
440            DatabaseId,
441            &InflightSubscriptionInfo,
442            &InflightDatabaseInfo,
443            impl Iterator<Item = &InflightStreamingJobInfo> + '_,
444        ),
445    > + '_ {
446        self.databases.iter().flat_map(|(database_id, database)| {
447            database
448                .database_state()
449                .map(|(database_state, creating_jobs)| {
450                    (
451                        *database_id,
452                        &database_state.inflight_subscription_info,
453                        &database_state.inflight_graph_info,
454                        creating_jobs
455                            .values()
456                            .filter_map(|job| job.inflight_graph_info()),
457                    )
458                })
459        })
460    }
461}
462
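/// Events emitted by [`CheckpointControl::next_event`] when a recovering database is ready to
/// move to its next recovery stage.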
463pub(crate) enum CheckpointControlEvent<'a> {
464    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
465    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
466}
467
468impl CheckpointControl {
469    pub(crate) fn on_reset_database_resp(
470        &mut self,
471        worker_id: WorkerId,
472        resp: ResetDatabaseResponse,
473    ) {
474        let database_id = DatabaseId::new(resp.database_id);
475        match self.databases.get_mut(&database_id).expect("should exist") {
476            DatabaseCheckpointControlStatus::Running(_) => {
477                unreachable!("should not receive reset database resp when running")
478            }
479            DatabaseCheckpointControlStatus::Recovering(state) => {
480                state.on_reset_database_resp(worker_id, resp)
481            }
482        }
483    }
484
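    /// Wait for the next state transition of any recovering database. The returned future
    /// borrows `self` for its whole lifetime, so the `Option<&mut Self>` + `take` dance below
    /// moves the mutable borrow out of the `poll_fn` closure exactly once, when an event is
    /// ready. A simplified, self-contained sketch of that pattern (illustrative names only,
    /// not part of this module):
    ///
    /// ```ignore
    /// use std::future::poll_fn;
    /// use std::task::Poll;
    ///
    /// async fn first_ready(slots: &mut Vec<Option<u32>>) -> &mut u32 {
    ///     let mut this = Some(slots);
    ///     poll_fn(move |_cx| {
    ///         let this_mut = this.as_mut().expect("not polled after ready");
    ///         if this_mut.iter().any(Option::is_some) {
    ///             // Move the borrow out of the closure only when ready to return it.
    ///             let this = this.take().expect("checked Some");
    ///             Poll::Ready(this.iter_mut().flatten().next().expect("checked some"))
    ///         } else {
    ///             Poll::Pending
    ///         }
    ///     })
    ///     .await
    /// }
    /// ```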
485    pub(crate) fn next_event(
486        &mut self,
487    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
488        let mut this = Some(self);
489        poll_fn(move |cx| {
490            let Some(this_mut) = this.as_mut() else {
491                unreachable!("should not be polled after poll ready")
492            };
493            for (&database_id, database_status) in &mut this_mut.databases {
494                match database_status {
495                    DatabaseCheckpointControlStatus::Running(_) => {}
496                    DatabaseCheckpointControlStatus::Recovering(state) => {
497                        let poll_result = state.poll_next_event(cx);
498                        if let Poll::Ready(action) = poll_result {
499                            let this = this.take().expect("checked Some");
500                            return Poll::Ready(match action {
501                                RecoveringStateAction::EnterInitializing(reset_workers) => {
502                                    CheckpointControlEvent::EnteringInitializing(
503                                        this.new_database_status_action(
504                                            database_id,
505                                            EnterInitializing(reset_workers),
506                                        ),
507                                    )
508                                }
509                                RecoveringStateAction::EnterRunning => {
510                                    CheckpointControlEvent::EnteringRunning(
511                                        this.new_database_status_action(database_id, EnterRunning),
512                                    )
513                                }
514                            });
515                        }
516                    }
517                }
518            }
519            Poll::Pending
520        })
521    }
522}
523
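/// Per-database checkpoint status: either normally `Running`, or `Recovering` through the
/// database reset/initialize flow.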
524pub(crate) enum DatabaseCheckpointControlStatus {
525    Running(DatabaseCheckpointControl),
526    Recovering(DatabaseRecoveringState),
527}
528
529impl DatabaseCheckpointControlStatus {
530    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
531        match self {
532            DatabaseCheckpointControlStatus::Running(state) => Some(state),
533            DatabaseCheckpointControlStatus::Recovering(_) => None,
534        }
535    }
536
537    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
538        match self {
539            DatabaseCheckpointControlStatus::Running(state) => Some(state),
540            DatabaseCheckpointControlStatus::Recovering(_) => None,
541        }
542    }
543
544    fn database_state(
545        &self,
546    ) -> Option<(
547        &BarrierWorkerState,
548        &HashMap<TableId, CreatingStreamingJobControl>,
549    )> {
550        match self {
551            DatabaseCheckpointControlStatus::Running(control) => {
552                Some((&control.state, &control.creating_streaming_job_controls))
553            }
554            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
555        }
556    }
557
558    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
559        match self {
560            DatabaseCheckpointControlStatus::Running(state) => state,
561            DatabaseCheckpointControlStatus::Recovering(_) => {
562                panic!("should be at running: {}", reason)
563            }
564        }
565    }
566}
567
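/// Per-database barrier metrics, labeled by database id.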
568struct DatabaseCheckpointControlMetrics {
569    barrier_latency: LabelGuardedHistogram<1>,
570    in_flight_barrier_nums: LabelGuardedIntGauge<1>,
571    all_barrier_nums: LabelGuardedIntGauge<1>,
572}
573
574impl DatabaseCheckpointControlMetrics {
575    fn new(database_id: DatabaseId) -> Self {
576        let database_id_str = database_id.database_id.to_string();
577        let barrier_latency = GLOBAL_META_METRICS
578            .barrier_latency
579            .with_guarded_label_values(&[&database_id_str]);
580        let in_flight_barrier_nums = GLOBAL_META_METRICS
581            .in_flight_barrier_nums
582            .with_guarded_label_values(&[&database_id_str]);
583        let all_barrier_nums = GLOBAL_META_METRICS
584            .all_barrier_nums
585            .with_guarded_label_values(&[&database_id_str]);
586        Self {
587            barrier_latency,
588            in_flight_barrier_nums,
589            all_barrier_nums,
590        }
591    }
592}
593
594/// Controls the concurrent execution of commands.
595pub(crate) struct DatabaseCheckpointControl {
596    database_id: DatabaseId,
597    state: BarrierWorkerState,
598
599    /// The queued barriers with their states and collected messages, in epoch order.
600    /// Key is the `prev_epoch` of each barrier.
601    command_ctx_queue: BTreeMap<u64, EpochNode>,
602    /// The barrier that is currently being completed, if any.
603    /// `Some(prev_epoch)` of that barrier.
604    completing_barrier: Option<u64>,
605
606    committed_epoch: Option<u64>,
607    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
608
609    create_mview_tracker: CreateMviewProgressTracker,
610
611    metrics: DatabaseCheckpointControlMetrics,
612}
613
614impl DatabaseCheckpointControl {
615    fn new(database_id: DatabaseId) -> Self {
616        Self {
617            database_id,
618            state: BarrierWorkerState::new(),
619            command_ctx_queue: Default::default(),
620            completing_barrier: None,
621            committed_epoch: None,
622            creating_streaming_job_controls: Default::default(),
623            create_mview_tracker: Default::default(),
624            metrics: DatabaseCheckpointControlMetrics::new(database_id),
625        }
626    }
627
628    pub(crate) fn recovery(
629        database_id: DatabaseId,
630        create_mview_tracker: CreateMviewProgressTracker,
631        state: BarrierWorkerState,
632        committed_epoch: u64,
633    ) -> Self {
634        Self {
635            database_id,
636            state,
637            command_ctx_queue: Default::default(),
638            completing_barrier: None,
639            committed_epoch: Some(committed_epoch),
640            creating_streaming_job_controls: Default::default(),
641            create_mview_tracker,
642            metrics: DatabaseCheckpointControlMetrics::new(database_id),
643        }
644    }
645
646    fn total_command_num(&self) -> usize {
647        self.command_ctx_queue.len()
648            + match &self.completing_barrier {
649                Some(_) => 1,
650                None => 0,
651            }
652    }
653
654    /// Update the metrics of barrier nums.
655    fn update_barrier_nums_metrics(&self) {
656        self.metrics.in_flight_barrier_nums.set(
657            self.command_ctx_queue
658                .values()
659                .filter(|x| x.state.is_inflight())
660                .count() as i64,
661        );
662        self.metrics
663            .all_barrier_nums
664            .set(self.total_command_num() as i64);
665    }
666
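    /// Collect the snapshot backfill jobs that have caught up and should be merged back into
    /// the upstream graph, keyed by job id, together with their upstream table ids and graph
    /// info. Returns `None` if there is no such job.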
667    fn jobs_to_merge(
668        &self,
669    ) -> Option<HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>> {
670        let mut table_ids_to_merge = HashMap::new();
671
672        for (table_id, creating_streaming_job) in &self.creating_streaming_job_controls {
673            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
674                table_ids_to_merge.insert(
675                    *table_id,
676                    (
677                        creating_streaming_job
678                            .snapshot_backfill_upstream_tables
679                            .clone(),
680                        graph_info.clone(),
681                    ),
682                );
683            }
684        }
685        if table_ids_to_merge.is_empty() {
686            None
687        } else {
688            Some(table_ids_to_merge)
689        }
690    }
691
692    /// Enqueue a barrier command
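    ///
    /// Consecutive barriers must be chained: the new command's `prev_epoch` must equal the
    /// `curr_epoch` of the last queued command (asserted below).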
693    fn enqueue_command(
694        &mut self,
695        command_ctx: CommandContext,
696        notifiers: Vec<Notifier>,
697        node_to_collect: NodeToCollect,
698        creating_jobs_to_wait: HashSet<TableId>,
699    ) {
700        let timer = self.metrics.barrier_latency.start_timer();
701
702        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
703            assert_eq!(
704                command_ctx.barrier_info.prev_epoch.value(),
705                node.command_ctx.barrier_info.curr_epoch.value()
706            );
707        }
708
709        tracing::trace!(
710            prev_epoch = command_ctx.barrier_info.prev_epoch(),
711            ?creating_jobs_to_wait,
712            ?node_to_collect,
713            "enqueue command"
714        );
715        self.command_ctx_queue.insert(
716            command_ctx.barrier_info.prev_epoch(),
717            EpochNode {
718                enqueue_time: timer,
719                state: BarrierEpochState {
720                    node_to_collect,
721                    resps: vec![],
722                    creating_jobs_to_wait,
723                    finished_jobs: HashMap::new(),
724                },
725                command_ctx,
726                notifiers,
727            },
728        );
729    }
730
731    /// Collect a `BarrierCompleteResponse` for the barrier at `prev_epoch`, attributing it either
732    /// to the database's own command queue or to the corresponding creating (snapshot backfill) job.
733    fn barrier_collected(
734        &mut self,
735        resp: BarrierCompleteResponse,
736        periodic_barriers: &mut PeriodicBarriers,
737    ) -> MetaResult<()> {
738        let worker_id = resp.worker_id;
739        let prev_epoch = resp.epoch;
740        tracing::trace!(
741            worker_id,
742            prev_epoch,
743            partial_graph_id = resp.partial_graph_id,
744            "barrier collected"
745        );
746        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
747        match creating_job_id {
748            None => {
749                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
750                    assert!(
751                        node.state
752                            .node_to_collect
753                            .remove(&(worker_id as _))
754                            .is_some()
755                    );
756                    node.state.resps.push(resp);
757                } else {
758                    panic!(
759                        "collect barrier on non-existing barrier: {}, {}",
760                        prev_epoch, worker_id
761                    );
762                }
763            }
764            Some(creating_job_id) => {
765                let should_merge_to_upstream = self
766                    .creating_streaming_job_controls
767                    .get_mut(&creating_job_id)
768                    .expect("should exist")
769                    .collect(prev_epoch, worker_id as _, resp)?;
770                if should_merge_to_upstream {
771                    periodic_barriers.force_checkpoint_in_next_barrier();
772                }
773            }
774        }
775        Ok(())
776    }
777
778    /// Whether a new barrier can be injected: the number of in-flight barriers must stay below `in_flight_barrier_nums`.
779    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
780        self.command_ctx_queue
781            .values()
782            .filter(|x| x.state.is_inflight())
783            .count()
784            < in_flight_barrier_nums
785    }
786
787    /// Return whether the database can still work after the failure of worker `worker_id`.
788    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
789        for epoch_node in self.command_ctx_queue.values_mut() {
790            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
791                return false;
792            }
793        }
794        // TODO: include barrier in creating jobs
795        true
796    }
797}
798
799impl DatabaseCheckpointControl {
800    /// Return a map from creating job id to (backfill progress epoch, set of `upstream_mv_table_id`s) for jobs that still pin an upstream log epoch.
801    fn collect_backfill_pinned_upstream_log_epoch(
802        &self,
803    ) -> HashMap<TableId, (u64, HashSet<TableId>)> {
804        self.creating_streaming_job_controls
805            .iter()
806            .filter_map(|(table_id, creating_job)| {
807                creating_job
808                    .pinned_upstream_log_epoch()
809                    .map(|progress_epoch| {
810                        (
811                            *table_id,
812                            (
813                                progress_epoch,
814                                creating_job.snapshot_backfill_upstream_tables.clone(),
815                            ),
816                        )
817                    })
818            })
819            .collect()
820    }
821
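    /// Try to start or extend a `CompleteBarrierTask`: first gather completable epochs of
    /// creating (snapshot backfill) jobs, then pop collected barriers from the head of
    /// `command_ctx_queue` until the first in-flight one. At most one command barrier is moved
    /// into `completing_barrier` at a time.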
822    fn next_complete_barrier_task(
823        &mut self,
824        task: &mut Option<CompleteBarrierTask>,
825        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
826        hummock_version_stats: &HummockVersionStats,
827    ) {
828        // `Vec::new` is a const fn and does not allocate, so it is lightweight enough to create eagerly.
829        let mut creating_jobs_task = vec![];
830        {
831            // `Vec::new` is a const fn and does not allocate, so it is lightweight enough to create eagerly.
832            let mut finished_jobs = Vec::new();
833            let min_upstream_inflight_barrier = self
834                .command_ctx_queue
835                .first_key_value()
836                .map(|(epoch, _)| *epoch);
837            for (table_id, job) in &mut self.creating_streaming_job_controls {
838                if let Some((epoch, resps, status)) =
839                    job.start_completing(min_upstream_inflight_barrier)
840                {
841                    let is_first_time = match status {
842                        CompleteJobType::First => true,
843                        CompleteJobType::Normal => false,
844                        CompleteJobType::Finished => {
845                            finished_jobs.push((*table_id, epoch, resps));
846                            continue;
847                        }
848                    };
849                    creating_jobs_task.push((*table_id, epoch, resps, is_first_time));
850                }
851            }
852            if !finished_jobs.is_empty()
853                && let Some((_, control_stream_manager)) = &mut context
854            {
855                control_stream_manager.remove_partial_graph(
856                    self.database_id,
857                    finished_jobs
858                        .iter()
859                        .map(|(table_id, _, _)| *table_id)
860                        .collect(),
861                );
862            }
863            for (table_id, epoch, resps) in finished_jobs {
864                let epoch_state = &mut self
865                    .command_ctx_queue
866                    .get_mut(&epoch)
867                    .expect("should exist")
868                    .state;
869                assert!(epoch_state.creating_jobs_to_wait.remove(&table_id));
870                debug!(epoch, ?table_id, "finish creating job");
871                // It's safe to remove the creating job, because on CompleteJobType::Finished,
872                // all previous barriers have been collected and completed.
873                let creating_streaming_job = self
874                    .creating_streaming_job_controls
875                    .remove(&table_id)
876                    .expect("should exist");
877                assert!(creating_streaming_job.is_finished());
878                assert!(epoch_state.finished_jobs.insert(table_id, resps).is_none());
879            }
880        }
881        assert!(self.completing_barrier.is_none());
882        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
883            && !state.is_inflight()
884        {
885            {
886                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
887                assert!(node.state.creating_jobs_to_wait.is_empty());
888                assert!(node.state.node_to_collect.is_empty());
889                let mut finished_jobs = self.create_mview_tracker.apply_collected_command(
890                    node.command_ctx.command.as_ref(),
891                    &node.command_ctx.barrier_info,
892                    &node.state.resps,
893                    hummock_version_stats,
894                );
895                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
896                    assert!(finished_jobs.is_empty());
897                    node.notifiers.into_iter().for_each(|notifier| {
898                        notifier.notify_collected();
899                    });
900                    if let Some((periodic_barriers, _)) = &mut context
901                        && self.create_mview_tracker.has_pending_finished_jobs()
902                        && self
903                            .command_ctx_queue
904                            .values()
905                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
906                    {
907                        periodic_barriers.force_checkpoint_in_next_barrier();
908                    }
909                    continue;
910                }
911                node.state
912                    .finished_jobs
913                    .drain()
914                    .for_each(|(job_id, resps)| {
915                        node.state.resps.extend(resps);
916                        finished_jobs.push(TrackingJob::New(TrackingCommand {
917                            job_id,
918                            replace_stream_job: None,
919                        }));
920                    });
921                let task = task.get_or_insert_default();
922                node.command_ctx.collect_commit_epoch_info(
923                    &mut task.commit_info,
924                    take(&mut node.state.resps),
925                    self.collect_backfill_pinned_upstream_log_epoch(),
926                );
927                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
928                task.finished_jobs.extend(finished_jobs);
929                task.notifiers.extend(node.notifiers);
930                task.epoch_infos
931                    .try_insert(
932                        self.database_id,
933                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
934                    )
935                    .expect("non duplicate");
936                break;
937            }
938        }
939        if !creating_jobs_task.is_empty() {
940            let task = task.get_or_insert_default();
941            for (table_id, epoch, resps, is_first_time) in creating_jobs_task {
942                collect_creating_job_commit_epoch_info(
943                    &mut task.commit_info,
944                    epoch,
945                    resps,
946                    self.creating_streaming_job_controls[&table_id].state_table_ids(),
947                    is_first_time,
948                );
949                let (_, creating_job_epochs) =
950                    task.epoch_infos.entry(self.database_id).or_default();
951                creating_job_epochs.push((table_id, epoch));
952            }
953        }
954    }
955
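    /// Acknowledge that the currently completing barrier (if any) and the given creating job
    /// epochs have been committed, advancing `committed_epoch` accordingly.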
956    fn ack_completed(
957        &mut self,
958        command_prev_epoch: Option<u64>,
959        creating_job_epochs: Vec<(TableId, u64)>,
960    ) {
961        {
962            if let Some(prev_epoch) = self.completing_barrier.take() {
963                assert_eq!(command_prev_epoch, Some(prev_epoch));
964                self.committed_epoch = Some(prev_epoch);
965            } else {
966                assert_eq!(command_prev_epoch, None);
967            };
968            for (table_id, epoch) in creating_job_epochs {
969                self.creating_streaming_job_controls
970                    .get_mut(&table_id)
971                    .expect("should exist")
972                    .ack_completed(epoch)
973            }
974        }
975    }
976}
977
978/// The state and message of this barrier, a node for concurrent checkpoint.
979struct EpochNode {
980    /// Timer for recording barrier latency, taken after `complete_barriers`.
981    enqueue_time: HistogramTimer,
982
983    /// Whether this barrier is in-flight or completed.
984    state: BarrierEpochState,
985
986    /// Context of this command to generate barrier and do some post jobs.
987    command_ctx: CommandContext,
988    /// Notifiers of this barrier.
989    notifiers: Vec<Notifier>,
990}
991
992#[derive(Debug)]
993/// The collection state of a barrier.
994struct BarrierEpochState {
995    node_to_collect: NodeToCollect,
996
997    resps: Vec<BarrierCompleteResponse>,
998
999    creating_jobs_to_wait: HashSet<TableId>,
1000
1001    finished_jobs: HashMap<TableId, Vec<BarrierCompleteResponse>>,
1002}
1003
1004impl BarrierEpochState {
1005    fn is_inflight(&self) -> bool {
1006        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
1007    }
1008}
1009
1010impl DatabaseCheckpointControl {
1011    /// Handle the new barrier from the scheduled queue and inject it.
1012    fn handle_new_barrier(
1013        &mut self,
1014        command: Option<(Command, Vec<Notifier>)>,
1015        checkpoint: bool,
1016        span: tracing::Span,
1017        control_stream_manager: &mut ControlStreamManager,
1018        hummock_version_stats: &HummockVersionStats,
1019        curr_epoch: TracedEpoch,
1020    ) -> MetaResult<()> {
1021        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
1022            (Some(command), notifiers)
1023        } else {
1024            (None, vec![])
1025        };
1026
1027        for table_to_cancel in command
1028            .as_ref()
1029            .map(Command::tables_to_drop)
1030            .into_iter()
1031            .flatten()
1032        {
1033            if self
1034                .creating_streaming_job_controls
1035                .contains_key(&table_to_cancel)
1036            {
1037                warn!(
1038                    table_id = table_to_cancel.table_id,
1039                    "ignore cancel command on creating streaming job"
1040                );
1041                for notifier in notifiers {
1042                    notifier
1043                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
1044                }
1045                return Ok(());
1046            }
1047        }
1048
1049        if let Some(Command::RescheduleFragment { .. }) = &command {
1050            if !self.creating_streaming_job_controls.is_empty() {
1051                warn!("ignore reschedule when creating streaming job with snapshot backfill");
1052                for notifier in notifiers {
1053                    notifier.notify_start_failed(
1054                        anyhow!(
1055                            "cannot reschedule when creating streaming job with snapshot backfill",
1056                        )
1057                        .into(),
1058                    );
1059                }
1060                return Ok(());
1061            }
1062        }
1063
1064        let Some(barrier_info) =
1065            self.state
1066                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
1067        else {
1068            // skip the command when there is nothing to do with the barrier
1069            for mut notifier in notifiers {
1070                notifier.notify_started();
1071                notifier.notify_collected();
1072            }
1073            return Ok(());
1074        };
1075
1076        let mut edges = self.state.inflight_graph_info.build_edge(command.as_ref());
1077
1078        // Insert newly added creating job
1079        if let Some(Command::CreateStreamingJob {
1080            job_type,
1081            info,
1082            cross_db_snapshot_backfill_info,
1083        }) = &mut command
1084        {
1085            match job_type {
1086                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
1087                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
1088                        fill_snapshot_backfill_epoch(
1089                            &mut fragment.nodes,
1090                            None,
1091                            cross_db_snapshot_backfill_info,
1092                        )?;
1093                    }
1094                }
1095                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
1096                    if self.state.is_paused() {
1097                        warn!("cannot create streaming job with snapshot backfill when paused");
1098                        for notifier in notifiers {
1099                            notifier.notify_start_failed(
1100                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
1101                                    .into(),
1102                            );
1103                        }
1104                        return Ok(());
1105                    }
1106                    // set snapshot epoch of upstream table for snapshot backfill
1107                    for snapshot_backfill_epoch in snapshot_backfill_info
1108                        .upstream_mv_table_id_to_backfill_epoch
1109                        .values_mut()
1110                    {
1111                        assert_eq!(
1112                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
1113                            None,
1114                            "must not set previously"
1115                        );
1116                    }
1117                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
1118                        if let Err(e) = fill_snapshot_backfill_epoch(
1119                            &mut fragment.nodes,
1120                            Some(snapshot_backfill_info),
1121                            cross_db_snapshot_backfill_info,
1122                        ) {
1123                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
1124                            for notifier in notifiers {
1125                                notifier.notify_start_failed(e.clone());
1126                            }
1127                            return Ok(());
1128                        };
1129                    }
1130                    let job_id = info.stream_job_fragments.stream_job_id();
1131                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
1132                        .upstream_mv_table_id_to_backfill_epoch
1133                        .keys()
1134                        .cloned()
1135                        .collect();
1136
1137                    self.creating_streaming_job_controls.insert(
1138                        job_id,
1139                        CreatingStreamingJobControl::new(
1140                            info,
1141                            snapshot_backfill_upstream_tables,
1142                            barrier_info.prev_epoch(),
1143                            hummock_version_stats,
1144                            control_stream_manager,
1145                            edges.as_mut().expect("should exist"),
1146                        )?,
1147                    );
1148                }
1149            }
1150        }
1151
1152        // On a checkpoint barrier with no other command, merge snapshot backfill jobs that have caught up back into the upstream graph.
1153        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command)
1154            && let Some(jobs_to_merge) = self.jobs_to_merge()
1155        {
1156            command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
1157        }
1158
1159        let command = command;
1160
1161        let (
1162            pre_applied_graph_info,
1163            pre_applied_subscription_info,
1164            table_ids_to_commit,
1165            jobs_to_wait,
1166            prev_paused_reason,
1167        ) = self.state.apply_command(command.as_ref());
1168
1169        // Tracing: log the new barrier under the previous epoch's span and record the new epoch on the command span.
1170        barrier_info.prev_epoch.span().in_scope(|| {
1171            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
1172        });
1173        span.record("epoch", barrier_info.curr_epoch.value().0);
1174
1175        for creating_job in &mut self.creating_streaming_job_controls.values_mut() {
1176            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
1177        }
1178
1179        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
1180            self.database_id,
1181            command.as_ref(),
1182            &barrier_info,
1183            prev_paused_reason,
1184            &pre_applied_graph_info,
1185            &self.state.inflight_graph_info,
1186            &mut edges,
1187        ) {
1188            Ok(node_to_collect) => node_to_collect,
1189            Err(err) => {
1190                for notifier in notifiers {
1191                    notifier.notify_start_failed(err.clone());
1192                }
1193                fail_point!("inject_barrier_err_success");
1194                return Err(err);
1195            }
1196        };
1197
1198        // Notify about the injection.
1199        notifiers.iter_mut().for_each(|n| n.notify_started());
1200
1201        let command_ctx = CommandContext::new(
1202            barrier_info,
1203            pre_applied_subscription_info,
1204            table_ids_to_commit.clone(),
1205            command,
1206            span,
1207        );
1208
1209        // Record the in-flight barrier.
1210        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);
1211
1212        Ok(())
1213    }
1214}