risingwave_meta/barrier/checkpoint/control.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::mem::take;
19use std::task::Poll;
20
21use anyhow::anyhow;
22use fail::fail_point;
23use prometheus::HistogramTimer;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
26use risingwave_meta_model::WorkerId;
27use risingwave_pb::ddl_service::DdlProgress;
28use risingwave_pb::hummock::HummockVersionStats;
29use risingwave_pb::stream_service::BarrierCompleteResponse;
30use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
31use thiserror_ext::AsReport;
32use tracing::{debug, warn};
33
34use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
35use crate::barrier::checkpoint::recovery::{
36    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
37    RecoveringStateAction,
38};
39use crate::barrier::checkpoint::state::BarrierWorkerState;
40use crate::barrier::command::CommandContext;
41use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
42use crate::barrier::info::InflightStreamingJobInfo;
43use crate::barrier::notifier::Notifier;
44use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
45use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
46use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
47use crate::barrier::utils::{
48    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
49};
50use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, InflightSubscriptionInfo};
51use crate::manager::MetaSrvEnv;
52use crate::rpc::metrics::GLOBAL_META_METRICS;
53use crate::stream::fill_snapshot_backfill_epoch;
54use crate::{MetaError, MetaResult};
55
56pub(crate) struct CheckpointControl {
57    pub(crate) env: MetaSrvEnv,
58    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
59    pub(super) hummock_version_stats: HummockVersionStats,
60}
61
62impl CheckpointControl {
63    pub fn new(env: MetaSrvEnv) -> Self {
64        Self {
65            env,
66            databases: Default::default(),
67            hummock_version_stats: Default::default(),
68        }
69    }
70
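    /// Rebuild the per-database map after global recovery: databases that recovered cleanly come
    /// back as `Running`, while the databases in `failed_databases` re-enter the recovering
    /// (resetting) state. A simplified standalone sketch of this partition, using toy ids instead
    /// of the real `DatabaseId` and control types:
    ///
    /// ```
    /// use std::collections::{HashMap, HashSet};
    ///
    /// #[derive(Debug, PartialEq)]
    /// enum Status { Running, Recovering }
    ///
    /// let recovered = [1u32, 2];
    /// let failed: HashSet<u32> = HashSet::from([3]);
    /// let statuses: HashMap<u32, Status> = recovered
    ///     .into_iter()
    ///     .map(|id| (id, Status::Running))
    ///     .chain(failed.into_iter().map(|id| (id, Status::Recovering)))
    ///     .collect();
    /// assert_eq!(statuses[&3], Status::Recovering);
    /// ```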
71    pub(crate) fn recover(
72        databases: impl IntoIterator<Item = (DatabaseId, DatabaseCheckpointControl)>,
73        failed_databases: HashSet<DatabaseId>,
74        control_stream_manager: &mut ControlStreamManager,
75        hummock_version_stats: HummockVersionStats,
76        env: MetaSrvEnv,
77    ) -> Self {
78        Self {
79            env,
80            databases: databases
81                .into_iter()
82                .map(|(database_id, control)| {
83                    (
84                        database_id,
85                        DatabaseCheckpointControlStatus::Running(control),
86                    )
87                })
88                .chain(failed_databases.into_iter().map(|database_id| {
89                    (
90                        database_id,
91                        DatabaseCheckpointControlStatus::Recovering(
92                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
93                        ),
94                    )
95                }))
96                .collect(),
97            hummock_version_stats,
98        }
99    }
100
101    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
102        self.hummock_version_stats = output.hummock_version_stats;
103        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
104            self.databases
105                .get_mut(&database_id)
106                .expect("should exist")
107                .expect_running("should have wait for completing command before enter recovery")
108                .ack_completed(command_prev_epoch, creating_job_epochs);
109        }
110    }
111
112    pub(crate) fn next_complete_barrier_task(
113        &mut self,
114        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
115    ) -> Option<CompleteBarrierTask> {
116        let mut task = None;
117        for database in self.databases.values_mut() {
118            let Some(database) = database.running_state_mut() else {
119                continue;
120            };
121            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
122            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
123        }
124        task
125    }
126
127    pub(crate) fn barrier_collected(
128        &mut self,
129        resp: BarrierCompleteResponse,
130        periodic_barriers: &mut PeriodicBarriers,
131    ) -> MetaResult<()> {
132        let database_id = DatabaseId::new(resp.database_id);
133        let database_status = self.databases.get_mut(&database_id).expect("should exist");
134        match database_status {
135            DatabaseCheckpointControlStatus::Running(database) => {
136                database.barrier_collected(resp, periodic_barriers)
137            }
138            DatabaseCheckpointControlStatus::Recovering(state) => {
139                state.barrier_collected(database_id, resp);
140                Ok(())
141            }
142        }
143    }
144
145    pub(crate) fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
146        self.databases.values().all(|database| {
147            database
148                .running_state()
149                .map(|database| database.can_inject_barrier(in_flight_barrier_nums))
150                .unwrap_or(true)
151        })
152    }
153
154    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
155        self.databases.iter().filter_map(|(database_id, database)| {
156            database.running_state().is_none().then_some(*database_id)
157        })
158    }
159
160    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
161        self.databases.iter().filter_map(|(database_id, database)| {
162            database.running_state().is_some().then_some(*database_id)
163        })
164    }
165
    /// Handle a newly scheduled barrier: for a `CreateStreamingJob` command, resolve the
    /// cross-database backfill epochs first, then look up (or create) the target database's
    /// checkpoint control and inject the barrier into it.
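    ///
    /// Each cross-database upstream table must belong to some running database with a committed
    /// epoch, otherwise the command fails before any barrier is injected. A simplified standalone
    /// model of that resolution (toy ids and plain maps instead of the real catalog types):
    ///
    /// ```
    /// use std::collections::HashMap;
    ///
    /// // committed epoch per database, and the database owning each upstream table
    /// let committed: HashMap<u32, u64> = HashMap::from([(1, 100), (2, 200)]);
    /// let owner: HashMap<u32, u32> = HashMap::from([(10, 1), (11, 2)]);
    ///
    /// let mut backfill_epoch: HashMap<u32, Option<u64>> =
    ///     HashMap::from([(10, None), (11, None)]);
    /// for (table_id, epoch) in &mut backfill_epoch {
    ///     // resolve the committed epoch of the database that owns this upstream table
    ///     *epoch = owner.get(table_id).and_then(|db| committed.get(db).copied());
    /// }
    /// assert_eq!(backfill_epoch[&10], Some(100));
    /// ```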
167    pub(crate) fn handle_new_barrier(
168        &mut self,
169        new_barrier: NewBarrier,
170        control_stream_manager: &mut ControlStreamManager,
171    ) -> MetaResult<()> {
172        let NewBarrier {
173            database_id,
174            command,
175            span,
176            checkpoint,
177        } = new_barrier;
178
179        if let Some((mut command, notifiers)) = command {
180            if let &mut Command::CreateStreamingJob {
181                ref mut cross_db_snapshot_backfill_info,
182                ref info,
183                ..
184            } = &mut command
185            {
186                for (table_id, snapshot_epoch) in
187                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
188                {
189                    for database in self.databases.values() {
190                        if let Some(database) = database.running_state()
191                            && database.state.inflight_graph_info.contains_job(*table_id)
192                        {
193                            if let Some(committed_epoch) = database.committed_epoch {
194                                *snapshot_epoch = Some(committed_epoch);
195                            }
196                            break;
197                        }
198                    }
199                    if snapshot_epoch.is_none() {
200                        let table_id = *table_id;
201                        warn!(
202                            ?cross_db_snapshot_backfill_info,
203                            ?table_id,
204                            ?info,
205                            "database of cross db upstream table not found"
206                        );
207                        let err: MetaError =
208                            anyhow!("database of cross db upstream table {} not found", table_id)
209                                .into();
210                        for notifier in notifiers {
211                            notifier.notify_start_failed(err.clone());
212                        }
213
214                        return Ok(());
215                    }
216                }
217            }
218
219            let database = match self.databases.entry(database_id) {
220                Entry::Occupied(entry) => entry
221                    .into_mut()
222                    .expect_running("should not have command when not running"),
223                Entry::Vacant(entry) => match &command {
224                    Command::CreateStreamingJob {
225                        job_type: CreateStreamingJobType::Normal,
226                        ..
227                    } => {
228                        let new_database = DatabaseCheckpointControl::new(database_id);
229                        control_stream_manager.add_partial_graph(database_id, None);
230                        entry
231                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
232                            .expect_running("just initialized as running")
233                    }
234                    Command::Flush | Command::Pause | Command::Resume => {
235                        for mut notifier in notifiers {
236                            notifier.notify_started();
237                            notifier.notify_collected();
238                        }
239                        warn!(?command, "skip command for empty database");
240                        return Ok(());
241                    }
242                    _ => {
243                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but got command: {} {:?}",
245                            database_id, command
246                        )
247                    }
248                },
249            };
250
251            database.handle_new_barrier(
252                Some((command, notifiers)),
253                checkpoint,
254                span.clone(),
255                control_stream_manager,
256                &self.hummock_version_stats,
257            )
258        } else {
259            let database = match self.databases.entry(database_id) {
260                Entry::Occupied(entry) => entry.into_mut(),
261                Entry::Vacant(_) => {
262                    // If it does not exist in the HashMap yet, it means that the first streaming
263                    // job has not been created, and we do not need to send a barrier.
264                    return Ok(());
265                }
266            };
267            let Some(database) = database.running_state_mut() else {
268                // Skip new barrier for database which is not running.
269                return Ok(());
270            };
271            database.handle_new_barrier(
272                None,
273                checkpoint,
274                span.clone(),
275                control_stream_manager,
276                &self.hummock_version_stats,
277            )
278        }
279    }
280
281    pub(crate) fn update_barrier_nums_metrics(&self) {
282        self.databases
283            .values()
284            .flat_map(|database| database.running_state())
285            .for_each(|database| database.update_barrier_nums_metrics());
286    }
287
288    pub(crate) fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
289        let mut progress = HashMap::new();
290        for status in self.databases.values() {
291            let Some(database_checkpoint_control) = status.running_state() else {
292                continue;
293            };
294            // Progress of normal backfill
295            progress.extend(
296                database_checkpoint_control
297                    .create_mview_tracker
298                    .gen_ddl_progress(),
299            );
300            // Progress of snapshot backfill
301            for creating_job in database_checkpoint_control
302                .creating_streaming_job_controls
303                .values()
304            {
305                progress.extend([(
306                    creating_job.job_id.table_id,
307                    creating_job.gen_ddl_progress(),
308                )]);
309            }
310        }
311        progress
312    }
313
314    pub(crate) fn databases_failed_at_worker_err(
315        &mut self,
316        worker_id: WorkerId,
317    ) -> Vec<DatabaseId> {
318        let mut failed_databases = Vec::new();
319        for (database_id, database_status) in &mut self.databases {
320            let database_checkpoint_control = match database_status {
321                DatabaseCheckpointControlStatus::Running(control) => control,
322                DatabaseCheckpointControlStatus::Recovering(state) => {
323                    if !state.is_valid_after_worker_err(worker_id) {
324                        failed_databases.push(*database_id);
325                    }
326                    continue;
327                }
328            };
329
330            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
331                || database_checkpoint_control
332                    .state
333                    .inflight_graph_info
334                    .contains_worker(worker_id as _)
335                || database_checkpoint_control
336                    .creating_streaming_job_controls
337                    .values_mut()
338                    .any(|job| !job.is_valid_after_worker_err(worker_id))
339            {
340                failed_databases.push(*database_id);
341            }
342        }
343        failed_databases
344    }
345
346    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
347        for (_, node) in self.databases.values_mut().flat_map(|status| {
348            status
349                .running_state_mut()
350                .map(|database| take(&mut database.command_ctx_queue))
351                .into_iter()
352                .flatten()
353        }) {
354            for notifier in node.notifiers {
355                notifier.notify_collection_failed(err.clone());
356            }
357            node.enqueue_time.observe_duration();
358        }
359        self.databases.values_mut().for_each(|database| {
360            if let Some(database) = database.running_state_mut() {
361                database.create_mview_tracker.abort_all()
362            }
363        });
364    }
365
366    pub(crate) fn inflight_infos(
367        &self,
368    ) -> impl Iterator<
369        Item = (
370            DatabaseId,
371            &InflightSubscriptionInfo,
372            impl Iterator<Item = TableId> + '_,
373        ),
374    > + '_ {
375        self.databases.iter().flat_map(|(database_id, database)| {
376            database
377                .database_state()
378                .map(|(database_state, creating_jobs)| {
379                    (
380                        *database_id,
381                        &database_state.inflight_subscription_info,
382                        creating_jobs
383                            .values()
384                            .filter_map(|job| job.is_consuming().then_some(job.job_id)),
385                    )
386                })
387        })
388    }
389}
390
391pub(crate) enum CheckpointControlEvent<'a> {
392    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
393    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
394}
395
396impl CheckpointControl {
397    pub(crate) fn on_reset_database_resp(
398        &mut self,
399        worker_id: WorkerId,
400        resp: ResetDatabaseResponse,
401    ) {
402        let database_id = DatabaseId::new(resp.database_id);
403        match self.databases.get_mut(&database_id).expect("should exist") {
404            DatabaseCheckpointControlStatus::Running(_) => {
405                unreachable!("should not receive reset database resp when running")
406            }
407            DatabaseCheckpointControlStatus::Recovering(state) => {
408                state.on_reset_database_resp(worker_id, resp)
409            }
410        }
411    }
412
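    /// Wait for the next recovery-related event: poll every recovering database and resolve with
    /// the first one whose state machine has an action ready; stay pending otherwise. A minimal
    /// standalone sketch of this "scan several sources inside one `poll_fn`" pattern (toy slots
    /// instead of the real `DatabaseRecoveringState`):
    ///
    /// ```
    /// use std::future::poll_fn;
    /// use std::task::Poll;
    ///
    /// // Resolves with the first ready event among several per-database slots.
    /// async fn first_ready(slots: &mut [Option<&'static str>]) -> &'static str {
    ///     poll_fn(|_cx| {
    ///         for slot in slots.iter_mut() {
    ///             if let Some(event) = slot.take() {
    ///                 return Poll::Ready(event);
    ///             }
    ///         }
    ///         Poll::Pending
    ///     })
    ///     .await
    /// }
    /// ```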
413    pub(crate) fn next_event(
414        &mut self,
415    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
416        let mut this = Some(self);
417        poll_fn(move |cx| {
418            let Some(this_mut) = this.as_mut() else {
419                unreachable!("should not be polled after poll ready")
420            };
421            for (&database_id, database_status) in &mut this_mut.databases {
422                match database_status {
423                    DatabaseCheckpointControlStatus::Running(_) => {}
424                    DatabaseCheckpointControlStatus::Recovering(state) => {
425                        let poll_result = state.poll_next_event(cx);
426                        if let Poll::Ready(action) = poll_result {
427                            let this = this.take().expect("checked Some");
428                            return Poll::Ready(match action {
429                                RecoveringStateAction::EnterInitializing(reset_workers) => {
430                                    CheckpointControlEvent::EnteringInitializing(
431                                        this.new_database_status_action(
432                                            database_id,
433                                            EnterInitializing(reset_workers),
434                                        ),
435                                    )
436                                }
437                                RecoveringStateAction::EnterRunning => {
438                                    CheckpointControlEvent::EnteringRunning(
439                                        this.new_database_status_action(database_id, EnterRunning),
440                                    )
441                                }
442                            });
443                        }
444                    }
445                }
446            }
447            Poll::Pending
448        })
449    }
450}
451
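/// Per-database status: a database either runs the normal barrier loop or is going through
/// recovery. Most call sites dispatch on the two variants, roughly like the (not compiled)
/// sketch below, which mirrors `CheckpointControl::barrier_collected`:
///
/// ```ignore
/// match database_status {
///     DatabaseCheckpointControlStatus::Running(database) => {
///         // drive the normal barrier collection path
///         database.barrier_collected(resp, periodic_barriers)?;
///     }
///     DatabaseCheckpointControlStatus::Recovering(state) => {
///         // buffer the response for the recovery state machine
///         state.barrier_collected(database_id, resp);
///     }
/// }
/// ```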
452pub(crate) enum DatabaseCheckpointControlStatus {
453    Running(DatabaseCheckpointControl),
454    Recovering(DatabaseRecoveringState),
455}
456
457impl DatabaseCheckpointControlStatus {
458    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
459        match self {
460            DatabaseCheckpointControlStatus::Running(state) => Some(state),
461            DatabaseCheckpointControlStatus::Recovering(_) => None,
462        }
463    }
464
465    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
466        match self {
467            DatabaseCheckpointControlStatus::Running(state) => Some(state),
468            DatabaseCheckpointControlStatus::Recovering(_) => None,
469        }
470    }
471
472    fn database_state(
473        &self,
474    ) -> Option<(
475        &BarrierWorkerState,
476        &HashMap<TableId, CreatingStreamingJobControl>,
477    )> {
478        match self {
479            DatabaseCheckpointControlStatus::Running(control) => {
480                Some((&control.state, &control.creating_streaming_job_controls))
481            }
482            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
483        }
484    }
485
486    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
487        match self {
488            DatabaseCheckpointControlStatus::Running(state) => state,
489            DatabaseCheckpointControlStatus::Recovering(_) => {
490                panic!("should be at running: {}", reason)
491            }
492        }
493    }
494}
495
496struct DatabaseCheckpointControlMetrics {
497    barrier_latency: LabelGuardedHistogram,
498    in_flight_barrier_nums: LabelGuardedIntGauge,
499    all_barrier_nums: LabelGuardedIntGauge,
500}
501
502impl DatabaseCheckpointControlMetrics {
503    fn new(database_id: DatabaseId) -> Self {
504        let database_id_str = database_id.database_id.to_string();
505        let barrier_latency = GLOBAL_META_METRICS
506            .barrier_latency
507            .with_guarded_label_values(&[&database_id_str]);
508        let in_flight_barrier_nums = GLOBAL_META_METRICS
509            .in_flight_barrier_nums
510            .with_guarded_label_values(&[&database_id_str]);
511        let all_barrier_nums = GLOBAL_META_METRICS
512            .all_barrier_nums
513            .with_guarded_label_values(&[&database_id_str]);
514        Self {
515            barrier_latency,
516            in_flight_barrier_nums,
517            all_barrier_nums,
518        }
519    }
520}
521
522/// Controls the concurrent execution of commands.
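///
/// Barriers are queued in `command_ctx_queue`, keyed by their `prev_epoch`, and consecutive
/// barriers form an epoch chain: each newly enqueued barrier's `prev_epoch` must equal the
/// previous barrier's `curr_epoch` (asserted in `enqueue_command`). A standalone toy model of
/// that invariant, using plain `u64` epochs:
///
/// ```
/// use std::collections::BTreeMap;
///
/// // prev_epoch -> curr_epoch, in enqueue order
/// let mut queue: BTreeMap<u64, u64> = BTreeMap::new();
/// for (prev, curr) in [(1, 2), (2, 3), (3, 4)] {
///     if let Some((_, last_curr)) = queue.last_key_value() {
///         // same invariant as `enqueue_command`
///         assert_eq!(prev, *last_curr);
///     }
///     queue.insert(prev, curr);
/// }
/// assert_eq!(queue.first_key_value(), Some((&1u64, &2u64)));
/// ```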
523pub(crate) struct DatabaseCheckpointControl {
524    database_id: DatabaseId,
525    state: BarrierWorkerState,
526
    /// Save the states and messages of barriers in order.
    /// Key is the `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The barrier that is currently completing, if any.
    /// Stores its `prev_epoch`.
    completing_barrier: Option<u64>,
533
534    committed_epoch: Option<u64>,
535    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
536
537    create_mview_tracker: CreateMviewProgressTracker,
538
539    metrics: DatabaseCheckpointControlMetrics,
540}
541
542impl DatabaseCheckpointControl {
543    fn new(database_id: DatabaseId) -> Self {
544        Self {
545            database_id,
546            state: BarrierWorkerState::new(),
547            command_ctx_queue: Default::default(),
548            completing_barrier: None,
549            committed_epoch: None,
550            creating_streaming_job_controls: Default::default(),
551            create_mview_tracker: Default::default(),
552            metrics: DatabaseCheckpointControlMetrics::new(database_id),
553        }
554    }
555
556    pub(crate) fn recovery(
557        database_id: DatabaseId,
558        create_mview_tracker: CreateMviewProgressTracker,
559        state: BarrierWorkerState,
560        committed_epoch: u64,
561        creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
562    ) -> Self {
563        Self {
564            database_id,
565            state,
566            command_ctx_queue: Default::default(),
567            completing_barrier: None,
568            committed_epoch: Some(committed_epoch),
569            creating_streaming_job_controls,
570            create_mview_tracker,
571            metrics: DatabaseCheckpointControlMetrics::new(database_id),
572        }
573    }
574
575    fn total_command_num(&self) -> usize {
576        self.command_ctx_queue.len()
577            + match &self.completing_barrier {
578                Some(_) => 1,
579                None => 0,
580            }
581    }
582
583    /// Update the metrics of barrier nums.
584    fn update_barrier_nums_metrics(&self) {
585        self.metrics.in_flight_barrier_nums.set(
586            self.command_ctx_queue
587                .values()
588                .filter(|x| x.state.is_inflight())
589                .count() as i64,
590        );
591        self.metrics
592            .all_barrier_nums
593            .set(self.total_command_num() as i64);
594    }
595
596    fn jobs_to_merge(
597        &self,
598    ) -> Option<HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>> {
599        let mut table_ids_to_merge = HashMap::new();
600
601        for (table_id, creating_streaming_job) in &self.creating_streaming_job_controls {
602            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
603                table_ids_to_merge.insert(
604                    *table_id,
605                    (
606                        creating_streaming_job
607                            .snapshot_backfill_upstream_tables
608                            .clone(),
609                        graph_info.clone(),
610                    ),
611                );
612            }
613        }
614        if table_ids_to_merge.is_empty() {
615            None
616        } else {
617            Some(table_ids_to_merge)
618        }
619    }
620
621    /// Enqueue a barrier command
622    fn enqueue_command(
623        &mut self,
624        command_ctx: CommandContext,
625        notifiers: Vec<Notifier>,
626        node_to_collect: NodeToCollect,
627        creating_jobs_to_wait: HashSet<TableId>,
628    ) {
629        let timer = self.metrics.barrier_latency.start_timer();
630
631        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
632            assert_eq!(
633                command_ctx.barrier_info.prev_epoch.value(),
634                node.command_ctx.barrier_info.curr_epoch.value()
635            );
636        }
637
638        tracing::trace!(
639            prev_epoch = command_ctx.barrier_info.prev_epoch(),
640            ?creating_jobs_to_wait,
641            ?node_to_collect,
642            "enqueue command"
643        );
644        self.command_ctx_queue.insert(
645            command_ctx.barrier_info.prev_epoch(),
646            EpochNode {
647                enqueue_time: timer,
648                state: BarrierEpochState {
649                    node_to_collect,
650                    resps: vec![],
651                    creating_jobs_to_wait,
652                    finished_jobs: HashMap::new(),
653                },
654                command_ctx,
655                notifiers,
656            },
657        );
658    }
659
    /// Record a `BarrierCompleteResponse` collected from a worker node, either for a barrier of
    /// this database's own graph (looked up by its `prev_epoch`) or for a creating
    /// snapshot-backfill job (routed by the partial graph id).
662    fn barrier_collected(
663        &mut self,
664        resp: BarrierCompleteResponse,
665        periodic_barriers: &mut PeriodicBarriers,
666    ) -> MetaResult<()> {
667        let worker_id = resp.worker_id;
668        let prev_epoch = resp.epoch;
669        tracing::trace!(
670            worker_id,
671            prev_epoch,
672            partial_graph_id = resp.partial_graph_id,
673            "barrier collected"
674        );
675        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
676        match creating_job_id {
677            None => {
678                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
679                    assert!(
680                        node.state
681                            .node_to_collect
682                            .remove(&(worker_id as _))
683                            .is_some()
684                    );
685                    node.state.resps.push(resp);
686                } else {
687                    panic!(
688                        "collect barrier on non-existing barrier: {}, {}",
689                        prev_epoch, worker_id
690                    );
691                }
692            }
693            Some(creating_job_id) => {
694                let should_merge_to_upstream = self
695                    .creating_streaming_job_controls
696                    .get_mut(&creating_job_id)
697                    .expect("should exist")
698                    .collect(resp);
699                if should_merge_to_upstream {
700                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
701                }
702            }
703        }
704        Ok(())
705    }
706
    /// Whether a new barrier can be injected: injection is paused while the number of in-flight
    /// barriers has reached `in_flight_barrier_nums`.
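    ///
    /// A tiny standalone illustration of the back-pressure check (plain booleans standing in for
    /// the in-flight state of each queued barrier):
    ///
    /// ```
    /// let in_flight_flags = [true, false, true]; // per queued barrier: still in flight?
    /// let in_flight = in_flight_flags.iter().filter(|x| **x).count();
    /// let in_flight_barrier_nums = 3usize;
    /// assert!(in_flight < in_flight_barrier_nums); // 2 < 3: a new barrier may be injected
    /// ```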
708    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
709        self.command_ctx_queue
710            .values()
711            .filter(|x| x.state.is_inflight())
712            .count()
713            < in_flight_barrier_nums
714    }
715
716    /// Return whether the database can still work after worker failure
717    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
718        for epoch_node in self.command_ctx_queue.values_mut() {
719            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
720                return false;
721            }
722        }
723        // TODO: include barrier in creating jobs
724        true
725    }
726}
727
728impl DatabaseCheckpointControl {
    /// Return a map from the creating job id to (backfill progress epoch, {`upstream_mv_table_id`}).
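    ///
    /// Shape of the returned map, with toy ids (the real keys are `TableId`s):
    ///
    /// ```
    /// use std::collections::{HashMap, HashSet};
    ///
    /// // creating job id -> (pinned upstream log epoch, upstream mv table ids)
    /// let pinned: HashMap<u32, (u64, HashSet<u32>)> =
    ///     HashMap::from([(42, (100, HashSet::from([1, 2])))]);
    /// assert_eq!(pinned[&42].0, 100);
    /// ```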
730    fn collect_backfill_pinned_upstream_log_epoch(
731        &self,
732    ) -> HashMap<TableId, (u64, HashSet<TableId>)> {
733        self.creating_streaming_job_controls
734            .iter()
735            .filter_map(|(table_id, creating_job)| {
736                creating_job
737                    .pinned_upstream_log_epoch()
738                    .map(|progress_epoch| {
739                        (
740                            *table_id,
741                            (
742                                progress_epoch,
743                                creating_job.snapshot_backfill_upstream_tables.clone(),
744                            ),
745                        )
746                    })
747            })
748            .collect()
749    }
750
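    /// Pick the next batch of work for the barrier-completion task: first harvest completed
    /// epochs of creating snapshot-backfill jobs, then drain fully collected barriers from the
    /// front of `command_ctx_queue`; the first checkpoint barrier found becomes the single
    /// `completing_barrier`. A standalone toy model of that draining loop (plain structs instead
    /// of `EpochNode`):
    ///
    /// ```
    /// use std::collections::BTreeMap;
    ///
    /// struct Node { collected: bool, is_checkpoint: bool }
    ///
    /// let mut queue = BTreeMap::from([
    ///     (1u64, Node { collected: true, is_checkpoint: false }),
    ///     (2u64, Node { collected: true, is_checkpoint: true }),
    ///     (3u64, Node { collected: false, is_checkpoint: true }),
    /// ]);
    /// let mut completing = None;
    /// while queue.first_key_value().is_some_and(|(_, node)| node.collected) {
    ///     let (prev_epoch, node) = queue.pop_first().expect("non-empty");
    ///     if !node.is_checkpoint {
    ///         continue; // non-checkpoint barriers are acknowledged without a commit
    ///     }
    ///     completing = Some(prev_epoch); // at most one barrier commits at a time
    ///     break;
    /// }
    /// assert_eq!(completing, Some(2));
    /// assert_eq!(queue.len(), 1); // epoch 3 is still in flight
    /// ```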
751    fn next_complete_barrier_task(
752        &mut self,
753        task: &mut Option<CompleteBarrierTask>,
754        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
755        hummock_version_stats: &HummockVersionStats,
756    ) {
        // `Vec::new` is a const fn and does not allocate, so constructing it here is lightweight.
758        let mut creating_jobs_task = vec![];
759        {
            // `Vec::new` is a const fn and does not allocate, so constructing it here is lightweight.
761            let mut finished_jobs = Vec::new();
762            let min_upstream_inflight_barrier = self
763                .command_ctx_queue
764                .first_key_value()
765                .map(|(epoch, _)| *epoch);
766            for (table_id, job) in &mut self.creating_streaming_job_controls {
767                if let Some((epoch, resps, status)) =
768                    job.start_completing(min_upstream_inflight_barrier)
769                {
770                    let is_first_time = match status {
771                        CompleteJobType::First => true,
772                        CompleteJobType::Normal => false,
773                        CompleteJobType::Finished => {
774                            finished_jobs.push((*table_id, epoch, resps));
775                            continue;
776                        }
777                    };
778                    creating_jobs_task.push((*table_id, epoch, resps, is_first_time));
779                }
780            }
781            if !finished_jobs.is_empty()
782                && let Some((_, control_stream_manager)) = &mut context
783            {
784                control_stream_manager.remove_partial_graph(
785                    self.database_id,
786                    finished_jobs
787                        .iter()
788                        .map(|(table_id, _, _)| *table_id)
789                        .collect(),
790                );
791            }
792            for (table_id, epoch, resps) in finished_jobs {
793                let epoch_state = &mut self
794                    .command_ctx_queue
795                    .get_mut(&epoch)
796                    .expect("should exist")
797                    .state;
798                assert!(epoch_state.creating_jobs_to_wait.remove(&table_id));
799                debug!(epoch, ?table_id, "finish creating job");
800                // It's safe to remove the creating job, because on CompleteJobType::Finished,
801                // all previous barriers have been collected and completed.
802                let creating_streaming_job = self
803                    .creating_streaming_job_controls
804                    .remove(&table_id)
805                    .expect("should exist");
806                assert!(creating_streaming_job.is_finished());
807                assert!(epoch_state.finished_jobs.insert(table_id, resps).is_none());
808            }
809        }
810        assert!(self.completing_barrier.is_none());
811        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
812            && !state.is_inflight()
813        {
814            {
815                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
816                assert!(node.state.creating_jobs_to_wait.is_empty());
817                assert!(node.state.node_to_collect.is_empty());
818                let mut finished_jobs = self.create_mview_tracker.apply_collected_command(
819                    node.command_ctx.command.as_ref(),
820                    &node.command_ctx.barrier_info,
821                    &node.state.resps,
822                    hummock_version_stats,
823                );
824                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
825                    assert!(finished_jobs.is_empty());
826                    node.notifiers.into_iter().for_each(|notifier| {
827                        notifier.notify_collected();
828                    });
829                    if let Some((periodic_barriers, _)) = &mut context
830                        && self.create_mview_tracker.has_pending_finished_jobs()
831                        && self
832                            .command_ctx_queue
833                            .values()
834                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
835                    {
836                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
837                    }
838                    continue;
839                }
840                node.state
841                    .finished_jobs
842                    .drain()
843                    .for_each(|(job_id, resps)| {
844                        node.state.resps.extend(resps);
845                        finished_jobs.push(TrackingJob::New(TrackingCommand {
846                            job_id,
847                            replace_stream_job: None,
848                        }));
849                    });
850                let task = task.get_or_insert_default();
851                node.command_ctx.collect_commit_epoch_info(
852                    &mut task.commit_info,
853                    take(&mut node.state.resps),
854                    self.collect_backfill_pinned_upstream_log_epoch(),
855                );
856                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
857                task.finished_jobs.extend(finished_jobs);
858                task.notifiers.extend(node.notifiers);
859                task.epoch_infos
860                    .try_insert(
861                        self.database_id,
862                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
863                    )
864                    .expect("non duplicate");
865                break;
866            }
867        }
868        if !creating_jobs_task.is_empty() {
869            let task = task.get_or_insert_default();
870            for (table_id, epoch, resps, is_first_time) in creating_jobs_task {
871                collect_creating_job_commit_epoch_info(
872                    &mut task.commit_info,
873                    epoch,
874                    resps,
875                    self.creating_streaming_job_controls[&table_id].state_table_ids(),
876                    is_first_time,
877                );
878                let (_, creating_job_epochs) =
879                    task.epoch_infos.entry(self.database_id).or_default();
880                creating_job_epochs.push((table_id, epoch));
881            }
882        }
883    }
884
885    fn ack_completed(
886        &mut self,
887        command_prev_epoch: Option<u64>,
888        creating_job_epochs: Vec<(TableId, u64)>,
889    ) {
890        {
891            if let Some(prev_epoch) = self.completing_barrier.take() {
892                assert_eq!(command_prev_epoch, Some(prev_epoch));
893                self.committed_epoch = Some(prev_epoch);
894            } else {
895                assert_eq!(command_prev_epoch, None);
896            };
897            for (table_id, epoch) in creating_job_epochs {
898                self.creating_streaming_job_controls
899                    .get_mut(&table_id)
900                    .expect("should exist")
901                    .ack_completed(epoch)
902            }
903        }
904    }
905}
906
/// The state and messages of a barrier: one node in the concurrent checkpoint queue.
908struct EpochNode {
    /// Timer for recording barrier latency; started on enqueue and observed after the barrier completes.
910    enqueue_time: HistogramTimer,
911
    /// Collection state of this barrier; it stays in-flight until all nodes and creating jobs have collected it.
913    state: BarrierEpochState,
914
915    /// Context of this command to generate barrier and do some post jobs.
916    command_ctx: CommandContext,
917    /// Notifiers of this barrier.
918    notifiers: Vec<Notifier>,
919}
920
921#[derive(Debug)]
/// The collection state of a barrier.
923struct BarrierEpochState {
924    node_to_collect: NodeToCollect,
925
926    resps: Vec<BarrierCompleteResponse>,
927
928    creating_jobs_to_wait: HashSet<TableId>,
929
930    finished_jobs: HashMap<TableId, Vec<BarrierCompleteResponse>>,
931}
932
933impl BarrierEpochState {
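    /// A barrier stays in flight while some worker has not collected it, or while it still waits
    /// on a creating snapshot-backfill job. A tiny standalone illustration (plain collections
    /// standing in for `NodeToCollect` and the job set):
    ///
    /// ```
    /// use std::collections::{HashMap, HashSet};
    ///
    /// let node_to_collect: HashMap<u32, bool> = HashMap::new();     // all workers collected
    /// let creating_jobs_to_wait: HashSet<u32> = HashSet::from([7]); // one job still pending
    /// let is_inflight = !node_to_collect.is_empty() || !creating_jobs_to_wait.is_empty();
    /// assert!(is_inflight);
    /// ```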
934    fn is_inflight(&self) -> bool {
935        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
936    }
937}
938
939impl DatabaseCheckpointControl {
940    /// Handle the new barrier from the scheduled queue and inject it.
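    ///
    /// The injection pipeline below, in order: reject commands that conflict with in-progress
    /// snapshot-backfill jobs, derive the next `barrier_info` from the database state, register
    /// any newly created snapshot-backfill job, apply the command to the in-flight graph state,
    /// inject the barrier through the control stream, and finally enqueue it into
    /// `command_ctx_queue` to wait for collection. It is driven by
    /// `CheckpointControl::handle_new_barrier`, roughly as in this (not compiled) sketch:
    ///
    /// ```ignore
    /// database.handle_new_barrier(
    ///     Some((command, notifiers)), // or `None` for a plain periodic barrier
    ///     checkpoint,
    ///     span.clone(),
    ///     control_stream_manager,
    ///     &self.hummock_version_stats,
    /// )?;
    /// ```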
941    fn handle_new_barrier(
942        &mut self,
943        command: Option<(Command, Vec<Notifier>)>,
944        checkpoint: bool,
945        span: tracing::Span,
946        control_stream_manager: &mut ControlStreamManager,
947        hummock_version_stats: &HummockVersionStats,
948    ) -> MetaResult<()> {
949        let curr_epoch = self.state.in_flight_prev_epoch().next();
950
951        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
952            (Some(command), notifiers)
953        } else {
954            (None, vec![])
955        };
956
957        for table_to_cancel in command
958            .as_ref()
959            .map(Command::tables_to_drop)
960            .into_iter()
961            .flatten()
962        {
963            if self
964                .creating_streaming_job_controls
965                .contains_key(&table_to_cancel)
966            {
967                warn!(
968                    table_id = table_to_cancel.table_id,
969                    "ignore cancel command on creating streaming job"
970                );
971                for notifier in notifiers {
972                    notifier
973                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
974                }
975                return Ok(());
976            }
977        }
978
979        if let Some(Command::RescheduleFragment { .. }) = &command
980            && !self.creating_streaming_job_controls.is_empty()
981        {
982            warn!("ignore reschedule when creating streaming job with snapshot backfill");
983            for notifier in notifiers {
984                notifier.notify_start_failed(
985                        anyhow!(
986                            "cannot reschedule when creating streaming job with snapshot backfill",
987                        )
988                        .into(),
989                    );
990            }
991            return Ok(());
992        }
993
994        let Some(barrier_info) =
995            self.state
996                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
997        else {
998            // skip the command when there is nothing to do with the barrier
999            for mut notifier in notifiers {
1000                notifier.notify_started();
1001                notifier.notify_collected();
1002            }
1003            return Ok(());
1004        };
1005
1006        let mut edges = self
1007            .state
1008            .inflight_graph_info
1009            .build_edge(command.as_ref(), &*control_stream_manager);
1010
1011        // Insert newly added creating job
1012        if let Some(Command::CreateStreamingJob {
1013            job_type,
1014            info,
1015            cross_db_snapshot_backfill_info,
1016        }) = &mut command
1017        {
1018            match job_type {
1019                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
1020                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
1021                        fill_snapshot_backfill_epoch(
1022                            &mut fragment.nodes,
1023                            None,
1024                            cross_db_snapshot_backfill_info,
1025                        )?;
1026                    }
1027                }
1028                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
1029                    if self.state.is_paused() {
1030                        warn!("cannot create streaming job with snapshot backfill when paused");
1031                        for notifier in notifiers {
1032                            notifier.notify_start_failed(
1033                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
1034                                    .into(),
1035                            );
1036                        }
1037                        return Ok(());
1038                    }
1039                    // set snapshot epoch of upstream table for snapshot backfill
1040                    for snapshot_backfill_epoch in snapshot_backfill_info
1041                        .upstream_mv_table_id_to_backfill_epoch
1042                        .values_mut()
1043                    {
1044                        assert_eq!(
1045                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
1046                            None,
1047                            "must not set previously"
1048                        );
1049                    }
1050                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
1051                        if let Err(e) = fill_snapshot_backfill_epoch(
1052                            &mut fragment.nodes,
1053                            Some(snapshot_backfill_info),
1054                            cross_db_snapshot_backfill_info,
1055                        ) {
1056                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
1057                            for notifier in notifiers {
1058                                notifier.notify_start_failed(e.clone());
1059                            }
1060                            return Ok(());
1061                        };
1062                    }
1063                    let job_id = info.stream_job_fragments.stream_job_id();
1064                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
1065                        .upstream_mv_table_id_to_backfill_epoch
1066                        .keys()
1067                        .cloned()
1068                        .collect();
1069
1070                    self.creating_streaming_job_controls.insert(
1071                        job_id,
1072                        CreatingStreamingJobControl::new(
1073                            info,
1074                            snapshot_backfill_upstream_tables,
1075                            barrier_info.prev_epoch(),
1076                            hummock_version_stats,
1077                            control_stream_manager,
1078                            edges.as_mut().expect("should exist"),
1079                        )?,
1080                    );
1081                }
1082            }
1083        }
1084
        // For a checkpoint barrier without an explicit command, check whether finished
        // snapshot-backfill jobs can be merged back into the upstream graph, or whether pending
        // backfill fragments can be started.
1086        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
1087            if let Some(jobs_to_merge) = self.jobs_to_merge() {
1088                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
1089            } else {
1090                let pending_backfill_nodes =
1091                    self.create_mview_tracker.take_pending_backfill_nodes();
1092                if !pending_backfill_nodes.is_empty() {
1093                    command = Some(Command::StartFragmentBackfill {
1094                        fragment_ids: pending_backfill_nodes,
1095                    });
1096                }
1097            }
1098        }
1099
1100        let command = command;
1101
1102        let (
1103            pre_applied_graph_info,
1104            pre_applied_subscription_info,
1105            table_ids_to_commit,
1106            jobs_to_wait,
1107            prev_paused_reason,
1108        ) = self.state.apply_command(command.as_ref());
1109
        // Tracing: record the new epoch on the barrier span.
1111        barrier_info.prev_epoch.span().in_scope(|| {
1112            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
1113        });
1114        span.record("epoch", barrier_info.curr_epoch.value().0);
1115
1116        for creating_job in &mut self.creating_streaming_job_controls.values_mut() {
1117            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
1118        }
1119
1120        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
1121            self.database_id,
1122            command.as_ref(),
1123            &barrier_info,
1124            prev_paused_reason,
1125            &pre_applied_graph_info,
1126            &self.state.inflight_graph_info,
1127            &mut edges,
1128        ) {
1129            Ok(node_to_collect) => node_to_collect,
1130            Err(err) => {
1131                for notifier in notifiers {
1132                    notifier.notify_start_failed(err.clone());
1133                }
1134                fail_point!("inject_barrier_err_success");
1135                return Err(err);
1136            }
1137        };
1138
1139        // Notify about the injection.
1140        notifiers.iter_mut().for_each(|n| n.notify_started());
1141
1142        let command_ctx = CommandContext::new(
1143            barrier_info,
1144            pre_applied_subscription_info,
1145            table_ids_to_commit.clone(),
1146            command,
1147            span,
1148        );
1149
1150        // Record the in-flight barrier.
1151        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);
1152
1153        Ok(())
1154    }
1155}