risingwave_meta/barrier/checkpoint/control.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::{SourceId, WorkerId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightStreamingJobInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{SourceChange, fill_snapshot_backfill_epoch};
use crate::{MetaError, MetaResult};

pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The maximum number of in-flight barriers.
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }
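
    // Illustrative sketch (not part of this impl): the barrier worker that owns a
    // `CheckpointControl` is expected to repeatedly drain completable barriers and feed the
    // results back, roughly like the hypothetical loop below. The surrounding
    // `periodic_barriers`/`control_stream_manager` values and the way the task is awaited are
    // assumptions for illustration only.
    //
    //     if let Some(task) = checkpoint_control
    //         .next_complete_barrier_task(Some((&mut periodic_barriers, &mut control_stream_manager)))
    //     {
    //         // ... run the `CompleteBarrierTask` (e.g. commit the epoch to hummock) ...
    //         // and acknowledge the resulting `BarrierCompleteOutput`:
    //         // checkpoint_control.ack_completed(output);
    //     }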

    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = resp.database_id;
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Handle a newly scheduled barrier for the target database.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database
                                .state
                                .inflight_graph_info
                                .contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                            self.env.cdc_table_backfill_tracker(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip the new barrier for a database that is not running.
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                // Skip a new barrier without an explicit command while the database has reached
                // the in-flight barrier limit and should pause injecting additional barriers.
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    pub(crate) fn gen_ddl_progress(&self) -> HashMap<JobId, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .create_mview_tracker
                    .gen_ddl_progress(),
            );
            // Progress of snapshot backfill
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(creating_job.job_id, creating_job.gen_ddl_progress())]);
            }
        }
        progress
    }

    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
        self.databases.values_mut().for_each(|database| {
            if let Some(database) = database.running_state_mut() {
                database.create_mview_tracker.abort_all()
            }
        });
    }

    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId> + '_)> + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database.database_state().map(|(_, creating_jobs)| {
                (
                    *database_id,
                    creating_jobs
                        .values()
                        .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                )
            })
        })
    }
}

pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = resp.database_id;
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}
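
// Illustrative sketch (an assumption about the caller, not code in this file): the event returned
// by `next_event` is expected to be awaited in the barrier worker's select loop and dispatched on,
// roughly like this:
//
//     match checkpoint_control.next_event().await {
//         CheckpointControlEvent::EnteringInitializing(action) => {
//             // ... drive the recovering database into the initializing stage ...
//         }
//         CheckpointControlEvent::EnteringRunning(action) => {
//             // ... mark the recovered database as running again ...
//         }
//     }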

pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

/// Controls the concurrent execution of commands.
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,

    /// Saves the state and message of each barrier in order.
    /// The key is the `prev_epoch` of the barrier.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The barrier that is currently being completed, recorded as `Some(prev_epoch)`.
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    create_mview_tracker: CreateMviewProgressTracker,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(
        database_id: DatabaseId,
        shared_actor_infos: SharedActorInfos,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(database_id, shared_actor_infos),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            create_mview_tracker: Default::default(),
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        create_mview_tracker: CreateMviewProgressTracker,
        state: BarrierWorkerState,
        committed_epoch: u64,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls,
            create_mview_tracker,
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    /// Update the metrics of barrier nums.
    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

    fn jobs_to_merge(
        &self,
    ) -> Option<HashMap<JobId, (HashSet<TableId>, InflightStreamingJobInfo)>> {
        let mut table_ids_to_merge = HashMap::new();

        for (table_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
                table_ids_to_merge.insert(
                    *table_id,
                    (
                        creating_streaming_job
                            .snapshot_backfill_upstream_tables
                            .clone(),
                        graph_info.clone(),
                    ),
                );
            }
        }
        if table_ids_to_merge.is_empty() {
            None
        } else {
            Some(table_ids_to_merge)
        }
    }

    /// Enqueue a barrier command
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }
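
    // Illustrative note: consecutive barriers form a chain of epochs, which the assertion in
    // `enqueue_command` checks. As a hypothetical sketch (epoch values made up for illustration),
    // if the last enqueued barrier had `prev_epoch = 100` and `curr_epoch = 200`, the next
    // enqueued barrier must carry `prev_epoch = 200`, so the queue keyed by `prev_epoch` stays
    // gapless and ordered.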

    /// Collect a barrier completion response for `prev_epoch` from one worker node, recording it
    /// on the corresponding in-flight barrier, or forwarding it to the creating streaming job
    /// that owns the partial graph the response belongs to.
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(
                        node.state
                            .node_to_collect
                            .remove(&(worker_id as _))
                            .is_some()
                    );
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    /// Whether a new barrier can be injected: injection is paused while the number of in-flight
    /// barriers has reached `in_flight_barrier_nums`.
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }
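
    // Illustrative sketch (assumed caller behavior, mirroring `CheckpointControl::handle_new_barrier`
    // above): periodic barriers without an explicit command are dropped once the in-flight limit is
    // reached, which is how back-pressure on barrier injection is applied per database.
    //
    //     if !database.can_inject_barrier(in_flight_barrier_nums) {
    //         // skip this periodic barrier; the next tick will try again
    //         return Ok(());
    //     }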

    /// Return whether the database can still work after worker failure
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        // TODO: include barrier in creating jobs
        true
    }
}

impl DatabaseCheckpointControl {
    /// Returns a map from the id of each creating job to
    /// (its backfill progress epoch, the set of its upstream mv table ids).
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(table_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *table_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `Vec::new` is a const fn and does not allocate, so it is lightweight enough.
        let mut creating_jobs_task = vec![];
        {
            // `Vec::new` is a const fn and does not allocate, so it is lightweight enough.
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (table_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*table_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*table_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs
                        .iter()
                        .map(|(table_id, _, _)| *table_id)
                        .collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                // It's safe to remove the creating job, because on `CompleteJobType::Finished`,
                // all previous barriers have been collected and completed.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                assert!(creating_streaming_job.is_finished());

                let mut source_backfill_fragments = HashMap::new();
                for info in creating_streaming_job.graph_info().fragment_infos() {
                    if let Some((source_id, upstream_source_fragment_id)) =
                        info.nodes.find_source_backfill()
                    {
                        source_backfill_fragments
                            .entry(source_id as SourceId)
                            .or_insert(BTreeSet::new())
                            .insert((info.fragment_id, upstream_source_fragment_id));
                    }
                }
                let source_change = if !source_backfill_fragments.is_empty() {
                    Some(SourceChange::CreateJobFinished {
                        finished_backfill_fragments: source_backfill_fragments,
                    })
                } else {
                    None
                };

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, source_change))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                self.handle_refresh_table_info(task, &node);

                let staging_commit_info = self.create_mview_tracker.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    &node.command_ctx.barrier_info,
                    &node.state.resps,
                    hummock_version_stats,
                );
                let finished_cdc_backfill = self
                    .cdc_table_backfill_tracker
                    .apply_collected_command(&node.state.resps);
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    assert!(staging_commit_info.is_none());
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.create_mview_tracker.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info =
                    staging_commit_info.expect("should be Some for checkpoint");
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(job_id, (resps, source_change))| {
                        node.state.resps.extend(resps);
                        staging_commit_info
                            .finished_jobs
                            .push(TrackingJob::new(job_id, source_change));
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(finished_cdc_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id].state_table_ids(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch));
            }
        }
    }
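
    // Illustrative summary of the method above (no new behavior implied): one call drains at most
    // one completable command barrier plus any completable barriers of creating jobs into a single
    // `CompleteBarrierTask`. A hypothetical task produced for this database might therefore carry:
    //
    //     task.epoch_infos[database_id] == (
    //         Some((command_ctx, enqueue_time)),           // the command barrier being committed
    //         vec![(creating_job_id, creating_job_epoch)], // epochs of creating jobs to commit
    //     );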

    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}

/// The state and message of this barrier, a node for concurrent checkpoint.
struct EpochNode {
    /// Timer for recording barrier latency, taken after `complete_barriers`.
    enqueue_time: HistogramTimer,

    /// Whether this barrier is in-flight or completed.
    state: BarrierEpochState,

    /// Context of this command to generate barrier and do some post jobs.
    command_ctx: CommandContext,
    /// Notifiers of this barrier.
    notifiers: Vec<Notifier>,
}

#[derive(Debug)]
/// The state of barrier.
struct BarrierEpochState {
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<JobId>,

    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, Option<SourceChange>)>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}
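
// Illustrative note on the barrier lifecycle encoded above (values are hypothetical): a barrier
// stays "in-flight" until every worker in `node_to_collect` has reported back via
// `barrier_collected` and every job in `creating_jobs_to_wait` has finished, e.g.
//
//     // freshly enqueued: 3 workers pending, 1 creating job to wait for -> in-flight
//     // after all responses and the job finishing: both sets empty      -> completable
//
// Only then can `next_complete_barrier_task` pop it from the front of `command_ctx_queue`.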

impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for job_to_cancel in command
            .as_ref()
            .map(Command::jobs_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&job_to_cancel)
            {
                warn!(
                    job_id = %job_to_cancel,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                            "cannot reschedule when creating streaming job with snapshot backfill",
                        )
                        .into(),
                );
            }
            return Ok(());
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            // skip the command when there is nothing to do with the barrier
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self
            .state
            .inflight_graph_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        // Insert newly added creating job
        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    // set snapshot epoch of upstream table for snapshot backfill
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        info,
                        snapshot_backfill_upstream_tables,
                        barrier_info.prev_epoch(),
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.state.inflight_graph_info.shared_actor_infos.upsert(
                        self.database_id,
                        job.graph_info()
                            .fragment_infos
                            .values()
                            .map(|fragment| (fragment, job.job_id)),
                    );

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        // Collect the jobs to finish
        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
            if let Some(jobs_to_merge) = self.jobs_to_merge() {
                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
            } else {
                let pending_backfill_nodes =
                    self.create_mview_tracker.take_pending_backfill_nodes();
                if !pending_backfill_nodes.is_empty() {
                    command = Some(Command::StartFragmentBackfill {
                        fragment_ids: pending_backfill_nodes,
                    });
                }
            }
        }

        let command = command;

        let (
            pre_applied_graph_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            jobs_to_wait,
            prev_paused_reason,
        ) = self.state.apply_command(command.as_ref());

        // Tracing related stuff
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        for creating_job in &mut self.creating_streaming_job_controls.values_mut() {
            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
        }

        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
            self.database_id,
            command.as_ref(),
            &barrier_info,
            prev_paused_reason,
            &pre_applied_graph_info,
            &self.state.inflight_graph_info,
            &mut edges,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Notify about the injection.
        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        // Record the in-flight barrier.
        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}
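
// Illustrative end-to-end sketch (an assumption about how the pieces in this file are driven, not
// additional code): for a single database, a barrier flows through the methods above roughly as
//
//     // 1. schedule: `CheckpointControl::handle_new_barrier` dispatches to
//     //    `DatabaseCheckpointControl::handle_new_barrier`, which injects the barrier and calls
//     //    `enqueue_command`;
//     // 2. collect: each worker's `BarrierCompleteResponse` arrives via `barrier_collected`;
//     // 3. complete: `next_complete_barrier_task` pops fully-collected barriers into a
//     //    `CompleteBarrierTask`;
//     // 4. ack: once the task finishes, `ack_completed` advances `committed_epoch`.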