risingwave_meta/barrier/checkpoint/control.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightStreamingJobInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, InflightSubscriptionInfo};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};

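/// Top-level checkpoint controller of the meta barrier manager, tracking the
/// per-database checkpoint state ([`DatabaseCheckpointControlStatus`]) together with the
/// latest [`HummockVersionStats`].
///
/// A rough, non-compiling sketch of the intended call pattern, assuming the scheduler,
/// control stream manager and periodic barrier source are already set up by the caller
/// (the variable names below are illustrative only):
///
/// ```ignore
/// let mut control = CheckpointControl::new(env);
/// // a scheduled barrier arrives
/// control.handle_new_barrier(new_barrier, &mut control_stream_manager)?;
/// // a worker reports a collected barrier
/// control.barrier_collected(resp, &mut periodic_barriers)?;
/// // try to build the next commit task and, once it finishes, ack it
/// if let Some(task) = control
///     .next_complete_barrier_task(Some((&mut periodic_barriers, &mut control_stream_manager)))
/// {
///     // ... run the task, then:
///     // control.ack_completed(output);
/// }
/// ```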
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for the completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

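    /// Scan all running databases and collect the barriers that are ready to be
    /// committed into a single [`CompleteBarrierTask`]. Returns `None` if no database
    /// currently has a completable barrier.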
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = DatabaseId::new(resp.database_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

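    /// Whether a new barrier may be injected, i.e. every running database is below the
    /// `in_flight_barrier_nums` limit. Recovering databases do not block injection.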
    pub(crate) fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.databases.values().all(|database| {
            database
                .running_state()
                .map(|database| database.can_inject_barrier(in_flight_barrier_nums))
                .unwrap_or(true)
        })
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Handle a newly scheduled barrier: route it to the target database (creating the
    /// database's checkpoint control on the first normal `CreateStreamingJob` command),
    /// then inject it through the control stream manager.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.state.inflight_graph_info.contains_job(*table_id)
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for a normal creating streaming job, but got command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span.clone(),
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip new barrier for database which is not running.
                return Ok(());
            };
            database.handle_new_barrier(
                None,
                checkpoint,
                span.clone(),
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

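    /// Aggregate DDL progress across all running databases: progress of normal backfill
    /// comes from the per-database `create_mview_tracker`, and progress of snapshot
    /// backfill comes from the creating streaming job controls.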
    pub(crate) fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .create_mview_tracker
                    .gen_ddl_progress(),
            );
            // Progress of snapshot backfill
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(
                    creating_job.job_id.table_id,
                    creating_job.gen_ddl_progress(),
                )]);
            }
        }
        progress
    }

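    /// Given a failed worker, return the ids of all databases that can no longer make
    /// progress without it: an in-flight barrier was still waiting on the worker, the
    /// inflight graph contains it, or one of the creating jobs depends on it.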
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

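    /// Fail all queued barriers with `err` (notifying their waiters) and abort all
    /// in-progress materialized view tracking of every running database.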
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
        self.databases.values_mut().for_each(|database| {
            if let Some(database) = database.running_state_mut() {
                database.create_mview_tracker.abort_all()
            }
        });
    }

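    /// For every database whose state is known (running, or recovering with a recorded
    /// state), yield its inflight subscription info together with the ids of creating
    /// jobs that are still consuming upstream data.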
    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<
        Item = (
            DatabaseId,
            &InflightSubscriptionInfo,
            impl Iterator<Item = TableId> + '_,
        ),
    > + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database
                .database_state()
                .map(|(database_state, creating_jobs)| {
                    (
                        *database_id,
                        &database_state.inflight_subscription_info,
                        creating_jobs
                            .values()
                            .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                    )
                })
        })
    }
}

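/// Event emitted by [`CheckpointControl::next_event`] when a recovering database is
/// ready for its next recovery step: either re-initializing on the reset workers, or
/// transitioning back to the running state.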
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = DatabaseId::new(resp.database_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

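    /// Return a future that resolves to the next [`CheckpointControlEvent`], i.e. as
    /// soon as any recovering database is ready to transition to its next recovery
    /// stage. Databases in the `Running` state are skipped, and the future stays pending
    /// when there is nothing to do.
    ///
    /// A rough sketch of how a caller might consume the events (the surrounding driver
    /// code is illustrative, not the actual implementation):
    ///
    /// ```ignore
    /// match checkpoint_control.next_event().await {
    ///     CheckpointControlEvent::EnteringInitializing(action) => {
    ///         // the database has been reset on the workers; re-initialize it
    ///     }
    ///     CheckpointControlEvent::EnteringRunning(action) => {
    ///         // initialization finished; mark the database as running again
    ///     }
    /// }
    /// ```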
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

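/// Per-database checkpoint status: either `Running` with a fully functional
/// [`DatabaseCheckpointControl`], or `Recovering` while the database goes through its
/// own reset / re-initialization sequence.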
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

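/// Per-database handles to the global barrier metrics (latency histogram plus in-flight
/// and total barrier gauges), labelled with the database id.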
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

/// Controls the concurrent execution of commands.
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,

    /// The queued barriers' states and messages, in order. Keyed by `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The barrier that is currently being completed, if any, as `Some(prev_epoch)`.
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,

    create_mview_tracker: CreateMviewProgressTracker,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(database_id, shared_actor_infos),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            create_mview_tracker: Default::default(),
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

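    /// Rebuild the per-database control state from the information gathered during
    /// recovery, with an empty barrier queue and `committed_epoch` set to the recovered
    /// epoch.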
    pub(crate) fn recovery(
        database_id: DatabaseId,
        create_mview_tracker: CreateMviewProgressTracker,
        state: BarrierWorkerState,
        committed_epoch: u64,
        creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls,
            create_mview_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    /// Update the metrics of barrier nums.
    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

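    /// Collect the creating (snapshot backfill) jobs that are ready to be merged back
    /// into the upstream graph, keyed by job table id with their upstream tables and
    /// graph info. Returns `None` when no job is ready.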
    fn jobs_to_merge(
        &self,
    ) -> Option<HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>> {
        let mut table_ids_to_merge = HashMap::new();

        for (table_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
                table_ids_to_merge.insert(
                    *table_id,
                    (
                        creating_streaming_job
                            .snapshot_backfill_upstream_tables
                            .clone(),
                        graph_info.clone(),
                    ),
                );
            }
        }
        if table_ids_to_merge.is_empty() {
            None
        } else {
            Some(table_ids_to_merge)
        }
    }

    /// Enqueue a barrier command
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<TableId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

    /// Record a `BarrierCompleteResponse` collected from a worker node for the barrier
    /// identified by `prev_epoch`, either for the database's own graph or for one of its
    /// creating (snapshot backfill) jobs.
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(
                        node.state
                            .node_to_collect
                            .remove(&(worker_id as _))
                            .is_some()
                    );
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    /// Whether a new barrier can be injected: the number of in-flight barriers must stay
    /// below `in_flight_barrier_nums`; otherwise injection is paused until some barriers
    /// are collected.
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    /// Return whether the database can still work after the failure of the given worker.
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        // TODO: include barrier in creating jobs
        true
    }
}

impl DatabaseCheckpointControl {
    /// Return a map from each creating job's table id to (backfill progress epoch, {`upstream_mv_table_id`}).
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<TableId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(table_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *table_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

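    /// Try to extend `task` with the next work that is ready to be committed for this
    /// database: epochs of creating (snapshot backfill) jobs, jobs that just finished
    /// backfilling, and at most one collected checkpoint barrier popped from the front
    /// of `command_ctx_queue` (non-checkpoint barriers are acknowledged on the way).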
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `Vec::new` is a const fn and does not allocate, so it is lightweight enough.
        let mut creating_jobs_task = vec![];
        {
            // `Vec::new` is a const fn and does not allocate, so it is lightweight enough.
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (table_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*table_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*table_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs
                        .iter()
                        .map(|(table_id, _, _)| *table_id)
                        .collect(),
                );
            }
            for (table_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&table_id));
                debug!(epoch, ?table_id, "finish creating job");
                // It's safe to remove the creating job, because on CompleteJobType::Finished,
                // all previous barriers have been collected and completed.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&table_id)
                    .expect("should exist");
                assert!(creating_streaming_job.is_finished());
                assert!(epoch_state.finished_jobs.insert(table_id, resps).is_none());
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                // Process load_finished_source_ids for all barrier types (checkpoint and non-checkpoint)
                let load_finished_source_ids: Vec<_> = node
                    .state
                    .resps
                    .iter()
                    .flat_map(|resp| &resp.load_finished_source_ids)
                    .cloned()
                    .collect();
                if !load_finished_source_ids.is_empty() {
                    // Add load_finished_source_ids to the task for processing
                    let task = task.get_or_insert_default();
                    task.load_finished_source_ids
                        .extend(load_finished_source_ids);
                }

                let mut finished_jobs = self.create_mview_tracker.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    &node.command_ctx.barrier_info,
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    assert!(finished_jobs.is_empty());
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.create_mview_tracker.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(job_id, resps)| {
                        node.state.resps.extend(resps);
                        finished_jobs.push(TrackingJob::New(TrackingCommand { job_id }));
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(finished_jobs);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (table_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&table_id].state_table_ids(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((table_id, epoch));
            }
        }
    }

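    /// Called once the completing barrier (and any creating-job epochs) has been
    /// committed: clear `completing_barrier`, advance `committed_epoch`, and forward the
    /// acknowledgement to the corresponding creating jobs.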
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(TableId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (table_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&table_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }
}

/// The state and message of this barrier, a node for concurrent checkpoint.
struct EpochNode {
    /// Timer for recording barrier latency, taken after `complete_barriers`.
    enqueue_time: HistogramTimer,

    /// Whether this barrier is in-flight or completed.
    state: BarrierEpochState,

    /// Context of this command, used to generate the barrier and to run post-collection jobs.
    command_ctx: CommandContext,
    /// Notifiers of this barrier.
    notifiers: Vec<Notifier>,
}

#[derive(Debug)]
/// The collection state of a barrier.
struct BarrierEpochState {
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<TableId>,

    finished_jobs: HashMap<TableId, Vec<BarrierCompleteResponse>>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for table_to_cancel in command
            .as_ref()
            .map(Command::tables_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&table_to_cancel)
            {
                warn!(
                    table_id = table_to_cancel.table_id,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                            "cannot reschedule when creating streaming job with snapshot backfill",
                        )
                        .into(),
                );
            }
            return Ok(());
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            // skip the command when there is nothing to do with the barrier
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self
            .state
            .inflight_graph_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        // Insert newly added creating job
        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    // Set the snapshot epoch of the upstream tables for snapshot backfill.
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not be set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        info,
                        snapshot_backfill_upstream_tables,
                        barrier_info.prev_epoch(),
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.state
                        .inflight_graph_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.graph_info().fragment_infos.values());

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        // Collect the jobs to finish
        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
            if let Some(jobs_to_merge) = self.jobs_to_merge() {
                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
            } else {
                let pending_backfill_nodes =
                    self.create_mview_tracker.take_pending_backfill_nodes();
                if !pending_backfill_nodes.is_empty() {
                    command = Some(Command::StartFragmentBackfill {
                        fragment_ids: pending_backfill_nodes,
                    });
                }
            }
        }

        let command = command;

        let (
            pre_applied_graph_info,
            pre_applied_subscription_info,
            table_ids_to_commit,
            jobs_to_wait,
            prev_paused_reason,
        ) = self.state.apply_command(command.as_ref());

        // Tracing-related setup
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        for creating_job in &mut self.creating_streaming_job_controls.values_mut() {
            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
        }

        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
            self.database_id,
            command.as_ref(),
            &barrier_info,
            prev_paused_reason,
            &pre_applied_graph_info,
            &self.state.inflight_graph_info,
            &mut edges,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Notify about the injection.
        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            pre_applied_subscription_info,
            table_ids_to_commit.clone(),
            command,
            span,
        );

        // Record the in-flight barrier.
        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
1177}