risingwave_meta/barrier/checkpoint/control.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

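//! Checkpoint control on the meta node: tracks per-database barrier state, collects
//! `BarrierCompleteResponse`s from workers, assembles barrier completion tasks, and drives
//! per-database recovery transitions.
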
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightStreamingJobInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, InflightSubscriptionInfo};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};

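/// Global checkpoint controller that tracks the barrier state of every database, keyed by
/// `DatabaseId`. Each database is either `Running` with its own `DatabaseCheckpointControl`
/// or `Recovering`.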
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The maximum number of barriers allowed in flight.
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

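    /// Rebuild the controller after a global recovery: recovered databases resume as `Running`,
    /// while databases in `failed_databases` start in the resetting/`Recovering` state.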
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

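    /// Acknowledge a completed `BarrierCompleteOutput`, updating the Hummock version stats and
    /// notifying each involved database of its acked epochs.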
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for the completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

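    /// Collect the next barrier completion task, if any, by polling every running database.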
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

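    /// Route a `BarrierCompleteResponse` from a worker to the database it belongs to, whether
    /// that database is running or recovering.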
    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = DatabaseId::new(resp.database_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Handle a newly scheduled barrier: route it to the target database's checkpoint control
    /// and inject it, creating the database control on the first normal `CreateStreamingJob`.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.state.inflight_graph_info.contains_job(*table_id)
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                            self.env.cdc_table_backfill_tracker(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but got command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span.clone(),
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip the new barrier for a database that is not running.
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                // Skip this command-less barrier when the database has reached its in-flight
                // barrier limit and should not inject additional barriers.
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span.clone(),
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    pub(crate) fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .create_mview_tracker
                    .gen_ddl_progress(),
            );
            // Progress of snapshot backfill
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(
                    creating_job.job_id.table_id,
                    creating_job.gen_ddl_progress(),
                )]);
            }
        }
        progress
    }

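    /// Return the databases that can no longer make progress after the given worker has failed.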
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

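    /// On an unrecoverable error, fail the notifiers of all queued barriers with `err` and abort
    /// all tracked materialized view creations.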
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
        self.databases.values_mut().for_each(|database| {
            if let Some(database) = database.running_state_mut() {
                database.create_mview_tracker.abort_all()
            }
        });
    }

    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<
        Item = (
            DatabaseId,
            &InflightSubscriptionInfo,
            impl Iterator<Item = TableId> + '_,
        ),
    > + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database
                .database_state()
                .map(|(database_state, creating_jobs)| {
                    (
                        *database_id,
                        &database_state.inflight_subscription_info,
                        creating_jobs
                            .values()
                            .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                    )
                })
        })
    }
}

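/// State-transition events emitted by [`CheckpointControl::next_event`] for databases that are
/// going through recovery.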
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = DatabaseId::new(resp.database_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

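    /// Wait for the next recovery state transition among all recovering databases. The returned
    /// future stays pending while no database has a transition ready.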
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

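/// Per-database status: either normally running or in the middle of a database-level recovery.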
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

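/// Per-database barrier metrics, labeled by database id.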
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

/// Controls the concurrent execution of commands.
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,

    /// Saves the states and messages of barriers in order.
    /// Key is the `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The barrier that is currently completing, if any.
    /// Some(`prev_epoch`)
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,

    create_mview_tracker: CreateMviewProgressTracker,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(
        database_id: DatabaseId,
        shared_actor_infos: SharedActorInfos,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(database_id, shared_actor_infos),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            create_mview_tracker: Default::default(),
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

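    /// Rebuild the per-database control from states carried over a recovery, resuming from
    /// `committed_epoch`.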
    pub(crate) fn recovery(
        database_id: DatabaseId,
        create_mview_tracker: CreateMviewProgressTracker,
        state: BarrierWorkerState,
        committed_epoch: u64,
        creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls,
            create_mview_tracker,
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    /// Update the barrier count metrics.
    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

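    /// Return the snapshot-backfill jobs that are ready to be merged back into the upstream
    /// graph, or `None` if there is none.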
    fn jobs_to_merge(
        &self,
    ) -> Option<HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>> {
        let mut table_ids_to_merge = HashMap::new();

        for (table_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
                table_ids_to_merge.insert(
                    *table_id,
                    (
                        creating_streaming_job
                            .snapshot_backfill_upstream_tables
                            .clone(),
                        graph_info.clone(),
                    ),
                );
            }
        }
        if table_ids_to_merge.is_empty() {
            None
        } else {
            Some(table_ids_to_merge)
        }
    }

    /// Enqueue a barrier command
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<TableId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

    /// Record a `BarrierCompleteResponse` for the barrier identified by `prev_epoch`: either mark
    /// the reporting worker as collected for a barrier in the main queue, or forward the response
    /// to the corresponding creating (snapshot backfill) job.
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(
                        node.state
                            .node_to_collect
                            .remove(&(worker_id as _))
                            .is_some()
                    );
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    /// Return whether a new barrier can be injected, i.e. whether the number of in-flight
    /// barriers is still below `in_flight_barrier_nums`.
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    /// Return whether the database can still work after a worker failure.
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        // TODO: include barrier in creating jobs
        true
    }
}

impl DatabaseCheckpointControl {
    /// Return a map from creating job table id to (backfill progress epoch, {`upstream_mv_table_id`}).
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<TableId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(table_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *table_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

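    /// Assemble the next completion work for this database: commit or finish creating (snapshot
    /// backfill) jobs, then pop fully collected barriers from the head of the queue, notifying
    /// non-checkpoint barriers immediately and turning the first collected checkpoint barrier
    /// into a commit entry in `task`.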
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `Vec::new` is a const fn and does not allocate, so it is lightweight enough.
        let mut creating_jobs_task = vec![];
        {
            // `Vec::new` is a const fn and does not allocate, so it is lightweight enough.
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (table_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*table_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*table_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs
                        .iter()
                        .map(|(table_id, _, _)| *table_id)
                        .collect(),
                );
            }
            for (table_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&table_id));
                debug!(epoch, ?table_id, "finish creating job");
                // It's safe to remove the creating job, because on CompleteJobType::Finished,
                // all previous barriers have been collected and completed.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&table_id)
                    .expect("should exist");
                assert!(creating_streaming_job.is_finished());
                assert!(epoch_state.finished_jobs.insert(table_id, resps).is_none());
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                // Process load_finished_source_ids for all barrier types (checkpoint and non-checkpoint)
                let load_finished_source_ids: Vec<_> = node
                    .state
                    .resps
                    .iter()
                    .flat_map(|resp| &resp.load_finished_source_ids)
                    .cloned()
                    .collect();
                if !load_finished_source_ids.is_empty() {
                    // Add load_finished_source_ids to the task for processing
                    let task = task.get_or_insert_default();
                    task.load_finished_source_ids
                        .extend(load_finished_source_ids);
                }
                // Process refresh_finished_table_ids for all barrier types (checkpoint and non-checkpoint)
                let refresh_finished_table_ids: Vec<_> = node
                    .state
                    .resps
                    .iter()
                    .flat_map(|resp| &resp.refresh_finished_tables)
                    .cloned()
                    .collect();
                if !refresh_finished_table_ids.is_empty() {
                    // Add refresh_finished_table_ids to the task for processing
                    let task = task.get_or_insert_default();
                    task.refresh_finished_table_ids
                        .extend(refresh_finished_table_ids);
                }

                let mut finished_jobs = self.create_mview_tracker.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    &node.command_ctx.barrier_info,
                    &node.state.resps,
                    hummock_version_stats,
                );
                let finished_cdc_backfill = self
                    .cdc_table_backfill_tracker
                    .apply_collected_command(&node.state.resps);
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    assert!(finished_jobs.is_empty());
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.create_mview_tracker.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(job_id, resps)| {
                        node.state.resps.extend(resps);
                        finished_jobs.push(TrackingJob::New(TrackingCommand { job_id }));
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(finished_cdc_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (table_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&table_id].state_table_ids(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((table_id, epoch));
            }
        }
    }

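    /// Acknowledge that the completing barrier (if any) and the given creating-job epochs have
    /// been committed, advancing `committed_epoch`.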
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(TableId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (table_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&table_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }
}

/// The state and message of this barrier, a node for concurrent checkpoint.
struct EpochNode {
    /// Timer for recording barrier latency, taken after `complete_barriers`.
    enqueue_time: HistogramTimer,

    /// Whether this barrier is in-flight or completed.
    state: BarrierEpochState,

    /// Context of this command to generate barrier and do some post jobs.
    command_ctx: CommandContext,
    /// Notifiers of this barrier.
    notifiers: Vec<Notifier>,
}

#[derive(Debug)]
/// The state of barrier.
struct BarrierEpochState {
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<TableId>,

    finished_jobs: HashMap<TableId, Vec<BarrierCompleteResponse>>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for job_to_cancel in command
            .as_ref()
            .map(Command::jobs_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&job_to_cancel)
            {
                warn!(
                    job_id = job_to_cancel.table_id,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                            "cannot reschedule when creating streaming job with snapshot backfill",
                        )
                        .into(),
                );
            }
            return Ok(());
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            // skip the command when there is nothing to do with the barrier
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self
            .state
            .inflight_graph_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        // Insert newly added creating job
        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    // set snapshot epoch of upstream table for snapshot backfill
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        info,
                        snapshot_backfill_upstream_tables,
                        barrier_info.prev_epoch(),
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.state
                        .inflight_graph_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.graph_info().fragment_infos.values());

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        // Collect the jobs to finish
        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
            if let Some(jobs_to_merge) = self.jobs_to_merge() {
                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
            } else {
                let pending_backfill_nodes =
                    self.create_mview_tracker.take_pending_backfill_nodes();
                if !pending_backfill_nodes.is_empty() {
                    command = Some(Command::StartFragmentBackfill {
                        fragment_ids: pending_backfill_nodes,
                    });
                }
            }
        }

        let command = command;

        let (
            pre_applied_graph_info,
            pre_applied_subscription_info,
            table_ids_to_commit,
            jobs_to_wait,
            prev_paused_reason,
        ) = self.state.apply_command(command.as_ref());

        // Tracing related stuff
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        for creating_job in &mut self.creating_streaming_job_controls.values_mut() {
            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
        }

        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
            self.database_id,
            command.as_ref(),
            &barrier_info,
            prev_paused_reason,
            &pre_applied_graph_info,
            &self.state.inflight_graph_info,
            &mut edges,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Notify about the injection.
        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            pre_applied_subscription_info,
            table_ids_to_commit.clone(),
            command,
            span,
        );

        // Record the in-flight barrier.
        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}