risingwave_meta/barrier/checkpoint/control.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightStreamingJobInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, InflightSubscriptionInfo};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};

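/// Top-level controller of barrier checkpointing on the meta node. It tracks the checkpoint
/// status of every database (running or recovering) along with the latest
/// `HummockVersionStats`, which is refreshed whenever a completed barrier is acknowledged.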
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

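    /// Acknowledge a completed barrier-commit task: refresh the hummock version stats and let
    /// each involved database record its newly committed epochs.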
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

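    /// Build the next barrier-commit task by merging whatever each running database is ready to
    /// complete. Returns `None` when no barrier can be committed yet.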
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

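    /// Route a collected barrier response from a worker to the database that owns it, whether
    /// that database is currently running or recovering.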
    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = DatabaseId::new(resp.database_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

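    /// Whether a new barrier can be injected, i.e. every running database is below the
    /// configured number of in-flight barriers.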
    pub(crate) fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.databases.values().all(|database| {
            database
                .running_state()
                .map(|database| database.can_inject_barrier(in_flight_barrier_nums))
                .unwrap_or(true)
        })
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Handle a newly scheduled barrier for the given database and inject it, creating the
    /// database entry on the first normal `CreateStreamingJob` command.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.state.inflight_graph_info.contains_job(*table_id)
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                            self.env.cdc_table_backfill_tracker(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but got command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span.clone(),
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip new barrier for database which is not running.
                return Ok(());
            };
            database.handle_new_barrier(
                None,
                checkpoint,
                span.clone(),
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    pub(crate) fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .create_mview_tracker
                    .gen_ddl_progress(),
            );
            // Progress of snapshot backfill
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(
                    creating_job.job_id.table_id,
                    creating_job.gen_ddl_progress(),
                )]);
            }
        }
        progress
    }

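    /// Collect the databases that cannot keep running after the given worker fails and thus
    /// need to enter per-database recovery.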
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

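    /// Clear the queued barriers of all running databases on an error, notifying their
    /// notifiers of the failure and aborting all tracked create-mview progress.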
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
        self.databases.values_mut().for_each(|database| {
            if let Some(database) = database.running_state_mut() {
                database.create_mview_tracker.abort_all()
            }
        });
    }

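    /// Iterate over each database's in-flight subscription info together with the ids of
    /// snapshot-backfill jobs that are still consuming their upstream.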
    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<
        Item = (
            DatabaseId,
            &InflightSubscriptionInfo,
            impl Iterator<Item = TableId> + '_,
        ),
    > + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database
                .database_state()
                .map(|(database_state, creating_jobs)| {
                    (
                        *database_id,
                        &database_state.inflight_subscription_info,
                        creating_jobs
                            .values()
                            .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                    )
                })
        })
    }
}

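/// Events yielded by [`CheckpointControl::next_event`] when a recovering database is ready to
/// move to the next stage of per-database recovery.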
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = DatabaseId::new(resp.database_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

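    /// Wait for the next state transition among all recovering databases. The returned future
    /// resolves once some database is ready to enter the initializing or running state, and
    /// stays pending while every database is running normally.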
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

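/// Per-database status: either running normally or going through per-database recovery.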
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

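/// Barrier metrics of a single database, labeled by its database id.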
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

/// Controls the concurrent execution of commands.
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,

    /// Saves the states and messages of barriers in order.
    /// Key is the `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The barrier that is currently completing, if any, as `Some(prev_epoch)`.
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,

    create_mview_tracker: CreateMviewProgressTracker,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(
        database_id: DatabaseId,
        shared_actor_infos: SharedActorInfos,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(database_id, shared_actor_infos),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            create_mview_tracker: Default::default(),
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        create_mview_tracker: CreateMviewProgressTracker,
        state: BarrierWorkerState,
        committed_epoch: u64,
        creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls,
            create_mview_tracker,
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

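    /// Total number of barriers tracked by this database, including the one currently completing.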
    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    /// Update the metrics of barrier nums.
    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

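    /// Collect the snapshot-backfill jobs that are ready to be merged back into the upstream
    /// graph, keyed by job id.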
    fn jobs_to_merge(
        &self,
    ) -> Option<HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>> {
        let mut table_ids_to_merge = HashMap::new();

        for (table_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
                table_ids_to_merge.insert(
                    *table_id,
                    (
                        creating_streaming_job
                            .snapshot_backfill_upstream_tables
                            .clone(),
                        graph_info.clone(),
                    ),
                );
            }
        }
        if table_ids_to_merge.is_empty() {
            None
        } else {
            Some(table_ids_to_merge)
        }
    }

    /// Enqueue a barrier command
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<TableId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

    /// Record that the barrier with this `prev_epoch` has been collected from the reporting
    /// worker, either for the database graph or for a creating snapshot-backfill job.
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(
                        node.state
                            .node_to_collect
                            .remove(&(worker_id as _))
                            .is_some()
                    );
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    /// Whether a new barrier can be injected, i.e. the number of in-flight barriers is below `in_flight_barrier_nums`.
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    /// Return whether the database can still work after worker failure
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        // TODO: include barrier in creating jobs
        true
    }
}

impl DatabaseCheckpointControl {
    /// Return a map from creating job table id to (backfill progress epoch, {`upstream_mv_table_id`}).
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<TableId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(table_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *table_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

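    /// Collect whatever is ready to be committed into `task`: first the completable epochs of
    /// creating snapshot-backfill jobs (removing jobs that have finished), then the fully
    /// collected barrier, if any, at the front of `command_ctx_queue`.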
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `Vec::new` is a const fn and does not allocate, so creating it here is cheap.
        let mut creating_jobs_task = vec![];
        {
            // `Vec::new` is a const fn and does not allocate, so creating it here is cheap.
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (table_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*table_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*table_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs
                        .iter()
                        .map(|(table_id, _, _)| *table_id)
                        .collect(),
                );
            }
            for (table_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&table_id));
                debug!(epoch, ?table_id, "finish creating job");
                // It's safe to remove the creating job, because on CompleteJobType::Finished,
                // all previous barriers have been collected and completed.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&table_id)
                    .expect("should exist");
                assert!(creating_streaming_job.is_finished());
                assert!(epoch_state.finished_jobs.insert(table_id, resps).is_none());
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                // Process load_finished_source_ids for all barrier types (checkpoint and non-checkpoint)
                let load_finished_source_ids: Vec<_> = node
                    .state
                    .resps
                    .iter()
                    .flat_map(|resp| &resp.load_finished_source_ids)
                    .cloned()
                    .collect();
                if !load_finished_source_ids.is_empty() {
                    // Add load_finished_source_ids to the task for processing
                    let task = task.get_or_insert_default();
                    task.load_finished_source_ids
                        .extend(load_finished_source_ids);
                }

                let mut finished_jobs = self.create_mview_tracker.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    &node.command_ctx.barrier_info,
                    &node.state.resps,
                    hummock_version_stats,
                );
                let finished_cdc_backfill = self
                    .cdc_table_backfill_tracker
                    .apply_collected_command(&node.state.resps);
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    assert!(finished_jobs.is_empty());
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.create_mview_tracker.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(job_id, resps)| {
                        node.state.resps.extend(resps);
                        finished_jobs.push(TrackingJob::New(TrackingCommand { job_id }));
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(finished_cdc_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (table_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&table_id].state_table_ids(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((table_id, epoch));
            }
        }
    }

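    /// Acknowledge that the commit task for `command_prev_epoch` (and the given creating-job
    /// epochs) has finished, recording the newly committed epoch of this database.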
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(TableId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (table_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&table_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }
}

/// The state and message of this barrier, a node for concurrent checkpoint.
struct EpochNode {
    /// Timer for recording barrier latency, taken after `complete_barriers`.
    enqueue_time: HistogramTimer,

    /// Whether this barrier is in-flight or completed.
    state: BarrierEpochState,

    /// Context of this command to generate barrier and do some post jobs.
    command_ctx: CommandContext,
    /// Notifiers of this barrier.
    notifiers: Vec<Notifier>,
}

/// The state of a barrier.
#[derive(Debug)]
struct BarrierEpochState {
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<TableId>,

    finished_jobs: HashMap<TableId, Vec<BarrierCompleteResponse>>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for job_to_cancel in command
            .as_ref()
            .map(Command::jobs_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&job_to_cancel)
            {
                warn!(
                    job_id = job_to_cancel.table_id,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                            "cannot reschedule when creating streaming job with snapshot backfill",
                        )
                        .into(),
                );
            }
            return Ok(());
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            // skip the command when there is nothing to do with the barrier
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self
            .state
            .inflight_graph_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        // Insert newly added creating job
        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    // set snapshot epoch of upstream table for snapshot backfill
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not have been set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        info,
                        snapshot_backfill_upstream_tables,
                        barrier_info.prev_epoch(),
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.state
                        .inflight_graph_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.graph_info().fragment_infos.values());

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        // Collect the jobs to finish
        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
            if let Some(jobs_to_merge) = self.jobs_to_merge() {
                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
            } else {
                let pending_backfill_nodes =
                    self.create_mview_tracker.take_pending_backfill_nodes();
                if !pending_backfill_nodes.is_empty() {
                    command = Some(Command::StartFragmentBackfill {
                        fragment_ids: pending_backfill_nodes,
                    });
                }
            }
        }

        let command = command;

        let (
            pre_applied_graph_info,
            pre_applied_subscription_info,
            table_ids_to_commit,
            jobs_to_wait,
            prev_paused_reason,
        ) = self.state.apply_command(command.as_ref());

        // Tracing related stuff
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        for creating_job in &mut self.creating_streaming_job_controls.values_mut() {
            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
        }

        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
            self.database_id,
            command.as_ref(),
            &barrier_info,
            prev_paused_reason,
            &pre_applied_graph_info,
            &self.state.inflight_graph_info,
            &mut edges,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Notify about the injection.
        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            pre_applied_subscription_info,
            table_ids_to_commit.clone(),
            command,
            span,
        );

        // Record the in-flight barrier.
        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}