risingwave_meta/barrier/checkpoint/control.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::SharedActorInfos;
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};

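/// Top-level checkpoint controller owned by the barrier manager. It tracks a
/// [`DatabaseCheckpointControlStatus`] per database, the latest Hummock version statistics,
/// and the configured in-flight barrier limit.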
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The maximum number of barriers in flight.
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

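    /// Rebuild the controller after recovery: recovered databases start in the `Running`
    /// state, while databases in `failed_databases` start in the `Recovering` (resetting)
    /// state. Shared actor info of databases that no longer exist is pruned.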
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

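    /// Acknowledge a completed barrier task: refresh the cached Hummock version stats and
    /// forward the acked epochs to the corresponding running databases.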
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for the completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

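    /// Collect the next batch of barriers that are ready to be committed, merging the
    /// per-database work into a single [`CompleteBarrierTask`], if any.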
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

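    /// Handle a `BarrierCompleteResponse` from a worker, dispatching it to the running or
    /// recovering state of the database it belongs to.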
    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = resp.database_id;
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

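    /// Databases that are currently recovering.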
    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

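    /// Databases that are currently running normally.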
    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Handle a newly scheduled barrier for `database_id`, injecting it together with the
    /// attached command, if any.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database
                                .state
                                .database_info
                                .contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                            self.env.cdc_table_backfill_tracker(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "a new database graph can only be created for a normal creating streaming job, but got command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If the database is not in the map yet, its first streaming job has not
                    // been created, so there is no need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip new barriers for a database that is not running.
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                // Skip a new barrier without an explicit command when the database has reached
                // the in-flight barrier limit.
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

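    /// Refresh the barrier count metrics of every running database.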
    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

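    /// Aggregate DDL progress of all running databases, covering both regular backfill and
    /// snapshot-backfill (creating) jobs.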
    pub(crate) fn gen_ddl_progress(&self) -> HashMap<JobId, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .state
                    .database_info
                    .gen_ddl_progress(),
            );
            // Progress of snapshot backfill
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(creating_job.job_id, creating_job.gen_ddl_progress())]);
            }
        }
        progress
    }

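    /// Given a failed worker, return the databases that can no longer make progress and
    /// therefore need to enter recovery.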
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .database_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

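    /// On a global error, fail all queued in-flight barriers of running databases and notify
    /// their waiters with the error.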
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }

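    /// For each database with available state, return the ids of creating jobs that are still
    /// in the consuming phase.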
    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId> + '_)> + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database.database_state().map(|(_, creating_jobs)| {
                (
                    *database_id,
                    creating_jobs
                        .values()
                        .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                )
            })
        })
    }
}

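/// Events emitted by [`CheckpointControl::next_event`] when a recovering database is ready to
/// move to its next recovery stage.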
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
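    /// Handle a `ResetDatabaseResponse` from a worker for a database that is recovering.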
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = resp.database_id;
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

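    /// Wait until some recovering database is ready to transition, yielding the corresponding
    /// [`CheckpointControlEvent`]. The returned future must not be polled again after it has
    /// resolved.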
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

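/// Per-database status: either running normally or going through recovery.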
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

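/// Barrier metrics labeled by database id.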
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

/// Controls the concurrent execution of commands within a single database.
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,

    /// The states and messages of in-flight barriers, in order, keyed by `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The barrier that is currently completing, as `Some(prev_epoch)`.
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(
        database_id: DatabaseId,
        shared_actor_infos: SharedActorInfos,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(database_id, shared_actor_infos),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

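    /// Rebuild the per-database control from a recovered [`BarrierWorkerState`], its last
    /// committed epoch, and the creating jobs that survived recovery.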
    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls,
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    /// Update the metrics of barrier nums.
    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

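    /// Creating (snapshot backfill) jobs that should be merged back into the upstream graph,
    /// together with their upstream table ids. Returns `None` if there is nothing to merge.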
    fn jobs_to_merge(&self) -> Option<HashMap<JobId, HashSet<TableId>>> {
        let mut job_ids_to_merge = HashMap::new();

        for (job_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(upstream_table_ids) = creating_streaming_job.should_merge_to_upstream() {
                job_ids_to_merge.insert(*job_id, upstream_table_ids.clone());
            }
        }
        if job_ids_to_merge.is_empty() {
            None
        } else {
            Some(job_ids_to_merge)
        }
    }

    /// Enqueue a barrier command.
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

    /// Record that the barrier identified by `prev_epoch` has been collected from `worker_id`.
    /// For barriers of a creating (snapshot backfill) job, forward the response to that job's
    /// control and force a checkpoint in the next barrier once the job is ready to merge into
    /// the upstream graph.
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            %worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(node.state.node_to_collect.remove(&worker_id).is_some());
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    /// Whether a new barrier can be injected, i.e. the number of in-flight barriers is below
    /// `in_flight_barrier_nums`.
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    /// Return whether the database can still work after a worker failure.
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        // TODO: include barrier in creating jobs
        true
    }
}

impl DatabaseCheckpointControl {
    /// Return a map from creating job id to (backfill progress epoch, upstream mv table ids)
    /// for creating jobs that still pin an upstream log epoch.
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(job_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *job_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

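    /// Move collected barriers of this database into `task`: first finish creating jobs that
    /// have completed, then pop fully collected barriers from the front of the queue until the
    /// first checkpoint barrier, which becomes the completing barrier of this database.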
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `Vec::new` is a const fn and does not allocate, so it is lightweight enough.
        let mut creating_jobs_task = vec![];
        {
            // `Vec::new` is a const fn and does not allocate, so it is lightweight enough.
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs.iter().map(|(job_id, _, _)| *job_id).collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                // It's safe to remove the creating job, because on `CompleteJobType::Finished`,
                // all previous barriers have been collected and completed.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                self.handle_refresh_table_info(task, &node);

                self.state.database_info.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    node.state.resps.iter(),
                    hummock_version_stats,
                );
                let finished_cdc_backfill = self
                    .cdc_table_backfill_tracker
                    .apply_collected_command(&node.state.resps);
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.state.database_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info = self.state.database_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(finished_cdc_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id]
                        .state_table_ids()
                        .iter()
                        .copied(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch));
            }
        }
    }

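    /// Acknowledge that the completing barrier (and the given creating-job epochs) have been
    /// committed, advancing `committed_epoch`.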
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

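    /// Collect source list/load completion and table refresh completion reported in the barrier
    /// responses into the pending complete-barrier task.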
    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}

/// The state and messages of a barrier: one node of the concurrent checkpoint queue.
struct EpochNode {
    /// Timer recording barrier latency, started on enqueue and observed when the barrier
    /// completes or fails.
    enqueue_time: HistogramTimer,

    /// Collection state of this barrier: which workers and creating jobs are still pending.
    state: BarrierEpochState,

    /// Context of this command to generate barrier and do some post jobs.
    command_ctx: CommandContext,
    /// Notifiers of this barrier.
    notifiers: Vec<Notifier>,
}

#[derive(Debug)]
/// The collection state of a barrier.
struct BarrierEpochState {
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<JobId>,

    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for job_to_cancel in command
            .as_ref()
            .map(Command::jobs_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&job_to_cancel)
            {
                warn!(
                    job_id = %job_to_cancel,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel a creating streaming job; it will continue creating until it is created or recovery is triggered").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot reschedule when creating streaming job with snapshot backfill",
                    )
                    .into(),
                );
            }
            return Ok(());
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            // Skip the command when there is nothing to do with the barrier.
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self
            .state
            .database_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        // Insert the newly added creating job.
        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    // Set the snapshot epoch of upstream tables for snapshot backfill.
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not be set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        info,
                        snapshot_backfill_upstream_tables,
                        barrier_info.prev_epoch(),
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.state
                        .database_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.fragment_infos_with_job_id());

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        // On a checkpoint barrier without an explicit command, either merge the caught-up
        // snapshot-backfill jobs back into the upstream graph or start the pending backfill
        // fragments.
        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
            if let Some(jobs_to_merge) = self.jobs_to_merge() {
                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
            } else {
                let pending_backfill_nodes = self.state.database_info.take_pending_backfill_nodes();
                if !pending_backfill_nodes.is_empty() {
                    command = Some(Command::StartFragmentBackfill {
                        fragment_ids: pending_backfill_nodes,
                    });
                }
            }
        }

        let mut finished_snapshot_backfill_job_fragments = HashMap::new();
        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
            if let Some(finished_job_fragments) = creating_job.on_new_command(
                control_stream_manager,
                command.as_ref(),
                &barrier_info,
            )? {
                finished_snapshot_backfill_job_fragments.insert(*job_id, finished_job_fragments);
            }
        }

        let command = command;

        let ApplyCommandInfo {
            node_actors,
            actors_to_create,
            mv_subscription_max_retention,
            table_ids_to_commit,
            table_ids_to_sync,
            jobs_to_wait,
            mutation,
        } = self.state.apply_command(
            command.as_ref(),
            finished_snapshot_backfill_job_fragments,
            &mut edges,
            control_stream_manager,
        );

        // Tracing-related bookkeeping.
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);
        let node_to_collect = match control_stream_manager.inject_barrier(
            self.database_id,
            None,
            mutation,
            &barrier_info,
            &node_actors,
            table_ids_to_sync.iter().copied(),
            actors_to_create,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Notify about the injection.
        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        // Record the in-flight barrier.
        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
1207}