risingwave_meta/barrier/checkpoint/control.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::mem::take;
19use std::task::Poll;
20
21use anyhow::anyhow;
22use fail::fail_point;
23use prometheus::HistogramTimer;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_meta_model::WorkerId;
29use risingwave_pb::hummock::HummockVersionStats;
30use risingwave_pb::id::{FragmentId, PartialGraphId};
31use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
32use risingwave_pb::stream_plan::stream_node::NodeBody;
33use risingwave_pb::stream_service::BarrierCompleteResponse;
34use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
35use tracing::{debug, warn};
36
37use crate::barrier::cdc_progress::CdcProgress;
38use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
39use crate::barrier::checkpoint::recovery::{
40    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
41    RecoveringStateAction,
42};
43use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
44use crate::barrier::command::CommandContext;
45use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
46use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
47use crate::barrier::notifier::Notifier;
48use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager};
49use crate::barrier::progress::TrackingJob;
50use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
51use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
52use crate::barrier::utils::collect_creating_job_commit_epoch_info;
53use crate::barrier::{
54    BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
55};
56use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
57use crate::manager::MetaSrvEnv;
58use crate::model::ActorId;
59use crate::rpc::metrics::GLOBAL_META_METRICS;
60use crate::{MetaError, MetaResult};
61
/// Top-level controller of checkpoint progress across all databases.
///
/// Owns one [`DatabaseCheckpointControlStatus`] per database (either running
/// or recovering) and routes scheduled barriers, collected barriers and
/// completion acks to the right one.
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    /// Per-database checkpoint state; a database is absent until its first
    /// streaming job is created.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    /// Latest hummock version stats, refreshed on every `ack_completed`.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The max barrier nums in flight
    pub(crate) in_flight_barrier_nums: usize,
}
69
impl CheckpointControl {
    /// Create an empty control tracking no databases yet; the in-flight
    /// barrier limit is taken from the meta options.
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    /// Rebuild the control after recovery.
    ///
    /// Successfully recovered `databases` enter the `Running` status, while
    /// `failed_databases` enter `Recovering`, starting in the resetting state
    /// for their listed partial graphs. Shared actor info is pruned to the
    /// union of both key sets.
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>, /* `database_id` -> set of resetting partial graph ids */
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(
                    |(database_id, resetting_partial_graphs)| {
                        (
                            database_id,
                            DatabaseCheckpointControlStatus::Recovering(
                                DatabaseRecoveringState::new_resetting(
                                    database_id,
                                    resetting_partial_graphs,
                                ),
                            ),
                        )
                    },
                ))
                .collect(),
            hummock_version_stats,
        }
    }

    /// Acknowledge a finished complete-barrier task: refresh the hummock
    /// version stats and forward the acked epochs to each database.
    /// Panics if a database is missing or not running, since a completing
    /// command must be waited for before entering recovery.
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

    /// Poll all running databases for the next barrier ready to be completed;
    /// recovering databases are skipped. Returns at most one task, possibly
    /// merged across databases by the callee.
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut PartialGraphManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            // Reborrow so each iteration hands out fresh `&mut` references
            // without consuming the outer `context`.
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

    /// Route a collected barrier to the owning database.
    ///
    /// A collected barrier for a recovering database is ignored (panics in
    /// debug builds), as it may be a stale response from before recovery.
    pub(crate) fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(_) => {
                if cfg!(debug_assertions) {
                    panic!(
                        "receive collected barrier {:?} on recovering database {} from partial graph {}",
                        collected_barrier, database_id, partial_graph_id
                    );
                } else {
                    warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
                }
                Ok(())
            }
        }
    }

    /// Ids of databases currently in the `Recovering` status.
    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    /// Ids of databases currently in the `Running` status.
    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Inflight graph info of a database, or `None` if it is absent or
    /// recovering.
    pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
        self.databases
            .get(&database_id)
            .and_then(|database| database.running_state())
            .map(|database| &database.database_info)
    }

    /// Whether any database may currently host a snapshot-backfilling job
    /// (conservatively `true` for recovering databases).
    pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        self.databases
            .values()
            .any(|database| database.may_have_snapshot_backfilling_jobs())
    }

    /// Handle a newly scheduled barrier for `new_barrier.database_id`.
    ///
    /// With a command attached, this may lazily create the database control
    /// on its first normal `CreateStreamingJob`, or consume the command by
    /// notifying its notifiers (success or failure) and returning `Ok(())`
    /// when it cannot be applied. Without a command, a plain barrier is
    /// injected only when the database is running and below the in-flight
    /// barrier limit.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                // Fill in the snapshot epoch of each cross-db upstream table
                // from the committed epoch of the database that owns it.
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    // No running database owns this upstream table (or it has
                    // no committed epoch yet): fail the command's notifiers
                    // instead of erroring the barrier worker.
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                // First command ever seen for this database: only a normal
                // `CreateStreamingJob` may bootstrap the database control.
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        // Register the database-level partial graph (no
                        // creating-job id) before injecting any barrier.
                        let adder = partial_graph_manager
                            .add_partial_graph(to_partial_graph_id(database_id, None));
                        adder.added();
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    // These commands are trivially satisfied on an empty
                    // database: report started + collected and skip.
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    // All remaining commands require an existing database
                    // graph; fail their notifiers (panic in debug builds).
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::AlterSubscriptionRetention { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip new barrier for database which is not running.
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                // Skip new barrier with no explicit command when the database should pause inject additional barrier
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
            )
        }
    }

    /// Refresh the barrier-count gauges of every running database.
    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    /// Aggregate backfill progress (normal and snapshot backfill) over all
    /// running databases, keyed by job id.
    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_backfill_progress(),
            );
            // Progress of snapshot backfill
            for (job_id, creating_job) in
                &database_checkpoint_control.creating_streaming_job_controls
            {
                progress.extend([(*job_id, creating_job.gen_backfill_progress())]);
            }
        }
        progress
    }

    /// Aggregate per-fragment backfill progress (normal and snapshot
    /// backfill) over all running databases.
    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        let mut progress = Vec::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_fragment_backfill_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend(creating_job.gen_fragment_backfill_progress());
            }
        }
        progress
    }

    /// Aggregate CDC progress over all running databases, keyed by job id.
    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

    /// Ids of databases (running or recovering) that can no longer stay valid
    /// after worker `worker_id` failed and hence must be reset.
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases
            .iter_mut()
            .filter_map(
                move |(database_id, database_status)| match database_status {
                    DatabaseCheckpointControlStatus::Running(control) => {
                        if !control.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        if !state.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                },
            )
    }

    /// Fail all pending barrier commands with `err`: drain every running
    /// database's command queue, notify collection failure, and stop the
    /// enqueue-latency timers.
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }
}
454
/// Recovery-related event yielded by [`CheckpointControl::next_event`],
/// carrying the action to transition a recovering database to its next state.
pub(crate) enum CheckpointControlEvent<'a> {
    /// A recovering database is ready to start initializing its graphs.
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    /// A recovering database is ready to resume running.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}
459
impl CheckpointControl {
    /// Handle the completion of a partial-graph reset.
    ///
    /// On a running database, the reset must belong to a creating job, which
    /// is removed from the database's control (panicking in debug builds if
    /// it does not exist). On a recovering database, the response is
    /// forwarded to the recovering state machine.
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, creating_job) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(creating_job_id) = creating_job {
                    let Some(job) = database
                        .creating_streaming_job_controls
                        .remove(&creating_job_id)
                    else {
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job {creating_job_id} in database {database_id}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            "ignore reset partial graph resp on non-existing creating job on running database"
                        );
                        return;
                    };
                    job.on_partial_graph_reset();
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }

    /// Handle a partial graph becoming initialized; only expected while the
    /// owning database is recovering.
    pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not have partial graph initialized when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.partial_graph_initialized(partial_graph_id);
            }
        }
    }

    /// Future resolving to the next status transition of any recovering
    /// database; pending while no recovering database has a ready action.
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        // `self` is kept in an Option so the full `&mut self` can be moved
        // out when an event is ready, letting the returned action borrow it.
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}
547
/// Lifecycle status of a database's checkpoint control: either actively
/// processing barriers (`Running`) or being reset/re-initialized
/// (`Recovering`).
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}
552
553impl DatabaseCheckpointControlStatus {
554    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
555        match self {
556            DatabaseCheckpointControlStatus::Running(state) => Some(state),
557            DatabaseCheckpointControlStatus::Recovering(_) => None,
558        }
559    }
560
561    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
562        match self {
563            DatabaseCheckpointControlStatus::Running(state) => Some(state),
564            DatabaseCheckpointControlStatus::Recovering(_) => None,
565        }
566    }
567
568    fn may_have_snapshot_backfilling_jobs(&self) -> bool {
569        self.running_state()
570            .map(|database| !database.creating_streaming_job_controls.is_empty())
571            .unwrap_or(true) // there can be snapshot backfilling jobs when the database is recovering.
572    }
573
574    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
575        match self {
576            DatabaseCheckpointControlStatus::Running(state) => state,
577            DatabaseCheckpointControlStatus::Recovering(_) => {
578                panic!("should be at running: {}", reason)
579            }
580        }
581    }
582}
583
/// Per-database barrier metric handles, labeled by the database id.
struct DatabaseCheckpointControlMetrics {
    /// Histogram of barrier latency (observed via the enqueue timer).
    barrier_latency: LabelGuardedHistogram,
    /// Gauge of barriers currently in flight (injected, not yet collected).
    in_flight_barrier_nums: LabelGuardedIntGauge,
    /// Gauge of all queued barriers, including the completing one.
    all_barrier_nums: LabelGuardedIntGauge,
}
589
590impl DatabaseCheckpointControlMetrics {
591    fn new(database_id: DatabaseId) -> Self {
592        let database_id_str = database_id.to_string();
593        let barrier_latency = GLOBAL_META_METRICS
594            .barrier_latency
595            .with_guarded_label_values(&[&database_id_str]);
596        let in_flight_barrier_nums = GLOBAL_META_METRICS
597            .in_flight_barrier_nums
598            .with_guarded_label_values(&[&database_id_str]);
599        let all_barrier_nums = GLOBAL_META_METRICS
600            .all_barrier_nums
601            .with_guarded_label_values(&[&database_id_str]);
602        Self {
603            barrier_latency,
604            in_flight_barrier_nums,
605            all_barrier_nums,
606        }
607    }
608}
609
/// Controls the concurrent execution of commands.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    /// Barrier worker state of this database (command application state).
    pub(super) state: BarrierWorkerState,

    /// Save the state and message of barrier in order.
    /// Key is the `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The barrier that are completing.
    /// Some(`prev_epoch`)
    completing_barrier: Option<u64>,

    /// Latest committed epoch, if any. `None` for a freshly created
    /// database; populated on recovery and read when resolving cross-db
    /// snapshot-backfill epochs.
    committed_epoch: Option<u64>,

    /// Inflight graph info of the database's streaming jobs.
    pub(super) database_info: InflightDatabaseInfo,
    /// Controls of snapshot-backfilling (creating) streaming jobs, keyed by
    /// job id.
    pub creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    metrics: DatabaseCheckpointControlMetrics,
}
629
impl DatabaseCheckpointControl {
    /// Create a fresh control for a brand-new database with an empty graph
    /// and no committed epoch yet.
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            creating_streaming_job_controls: Default::default(),
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    /// Rebuild the control from recovered state, with the known committed
    /// epoch and the surviving creating-job controls.
    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            creating_streaming_job_controls,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    /// Whether the database can stay valid after worker `worker_id` failed:
    /// true iff neither the database graph nor any creating job involves
    /// that worker.
    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        !self.database_info.contains_worker(worker_id as _)
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }

    /// Total number of queued barriers, including the completing one if any.
    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    /// Update the metrics of barrier nums.
    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

    /// Enqueue a barrier command, keyed by its `prev_epoch`, and start its
    /// latency timer. Asserts epoch continuity: the new command's
    /// `prev_epoch` must equal the queue tail's `curr_epoch`.
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    is_collected: false,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

    /// Mark the barrier at `prev_epoch` as collected for the given partial
    /// graph.
    ///
    /// For the database graph (no creating-job id), the matching queue node
    /// is flagged collected and the worker responses are recorded — panics
    /// if no such node exists. For a creating job's graph, the barrier is
    /// forwarded to the job control; when the job is ready to merge back to
    /// the upstream graph, a checkpoint is forced in the next barrier.
    fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let prev_epoch = collected_barrier.epoch.prev;
        tracing::trace!(
            prev_epoch,
            partial_graph_id = %partial_graph_id,
            "barrier collected"
        );
        let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
        assert_eq!(self.database_id, database_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(!node.state.is_collected);
                    node.state.is_collected = true;
                    node.state
                        .resps
                        .extend(collected_barrier.resps.into_values());
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {:?}",
                        prev_epoch, collected_barrier
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(collected_barrier);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    /// Whether another barrier may be injected: true while the number of
    /// in-flight (not yet collected) barriers is below
    /// `in_flight_barrier_nums`.
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }
}
783
impl DatabaseCheckpointControl {
    /// return creating job table fragment id -> (backfill progress epoch , {`upstream_mv_table_id`})
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .map(|(job_id, creating_job)| (*job_id, creating_job.pinned_upstream_log_epoch()))
            .collect()
    }

    /// Collect `(upstream_fragment_id, downstream_fragment_id)` no-shuffle edges:
    /// `Merge` nodes with a `NoShuffle` upstream dispatcher in the database's own
    /// fragments, plus the edges reported by creating streaming jobs.
    /// Used to expand the set of reschedule-blocked fragments over no-shuffle graphs.
    fn collect_no_shuffle_fragment_relations_for_reschedule_check(
        &self,
    ) -> Vec<(FragmentId, FragmentId)> {
        let mut no_shuffle_relations = Vec::new();
        for fragment in self.database_info.fragment_infos() {
            let downstream_fragment_id = fragment.fragment_id;
            visit_stream_node_cont(&fragment.nodes, |node| {
                if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
                    && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
                {
                    no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
                }
                // Return `true` to keep visiting the rest of the stream node tree.
                true
            });
        }

        for creating_job in self.creating_streaming_job_controls.values() {
            creating_job.collect_no_shuffle_fragment_relations(&mut no_shuffle_relations);
        }
        no_shuffle_relations
    }

    /// Compute the set of job ids that must not be rescheduled while the current
    /// creating (snapshot backfill) jobs are in flight.
    ///
    /// Starts from the fragment ids the creating jobs report as blocked, closes
    /// that set over connected no-shuffle graphs (fragments joined by no-shuffle
    /// edges appear blocked together), then maps the blocked fragments back to
    /// their owning job ids via `database_info`.
    fn collect_reschedule_blocked_jobs_for_creating_jobs_inflight(
        &self,
    ) -> MetaResult<HashSet<JobId>> {
        let mut initial_blocked_fragment_ids = HashSet::new();
        for creating_job in self.creating_streaming_job_controls.values() {
            creating_job.collect_reschedule_blocked_fragment_ids(&mut initial_blocked_fragment_ids);
        }

        let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
        if !initial_blocked_fragment_ids.is_empty() {
            let no_shuffle_relations =
                self.collect_no_shuffle_fragment_relations_for_reschedule_check();
            let (forward_edges, backward_edges) =
                build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
            let initial_blocked_fragment_ids: Vec<_> =
                initial_blocked_fragment_ids.iter().copied().collect();
            // Every fragment in a no-shuffle ensemble that contains a blocked
            // fragment becomes blocked as well.
            for ensemble in find_no_shuffle_graphs(
                &initial_blocked_fragment_ids,
                &forward_edges,
                &backward_edges,
            )? {
                blocked_fragment_ids.extend(ensemble.fragments());
            }
        }

        let mut blocked_job_ids = HashSet::new();
        blocked_job_ids.extend(
            blocked_fragment_ids
                .into_iter()
                .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
        );
        Ok(blocked_job_ids)
    }

    /// Given a reschedule plan, return the subset of affected job ids that are
    /// also present in `blocked_job_ids`.
    ///
    /// A fragment counts as affected if it is rescheduled directly, has actors
    /// listed in `fragment_actors`, or is a downstream/upstream neighbor of a
    /// rescheduled fragment.
    fn collect_reschedule_blocked_job_ids(
        &self,
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        blocked_job_ids: &HashSet<JobId>,
    ) -> HashSet<JobId> {
        let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
        affected_fragment_ids.extend(fragment_actors.keys().copied());
        for reschedule in reschedules.values() {
            affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
            affected_fragment_ids.extend(
                reschedule
                    .upstream_fragment_dispatcher_ids
                    .iter()
                    .map(|(fragment_id, _)| *fragment_id),
            );
        }

        affected_fragment_ids
            .into_iter()
            .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
            .filter(|job_id| blocked_job_ids.contains(job_id))
            .collect()
    }

    /// Assemble the next `CompleteBarrierTask` for this database, if any
    /// barrier is ready.
    ///
    /// Three phases:
    /// 1. If a `committed_epoch` exists, poll every creating job's
    ///    `start_completing`; finished jobs are folded into their waiting epoch
    ///    node (and their partial graphs removed when `context` is present),
    ///    while the rest are queued into `creating_jobs_task`.
    /// 2. Pop fully-collected nodes from the front of `command_ctx_queue`:
    ///    non-checkpoint barriers only notify collection; the first checkpoint
    ///    barrier found becomes `self.completing_barrier` and its commit info
    ///    is merged into `task`, then the loop breaks.
    /// 3. Append the commit-epoch info of the still-creating jobs to `task`.
    ///
    /// `context` being `None` means the caller cannot force checkpoints or
    /// remove partial graphs (the corresponding steps are skipped).
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut PartialGraphManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough
        let mut creating_jobs_task = vec![];
        if let Some(committed_epoch) = self.committed_epoch {
            // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier, committed_epoch)
                {
                    let create_info = match status {
                        CompleteJobType::First(info) => Some(info),
                        CompleteJobType::Normal => None,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, create_info));
                }
            }
            // Tear down the partial graphs of jobs that just finished, when the
            // caller provided a `PartialGraphManager` to do so.
            if !finished_jobs.is_empty()
                && let Some((_, partial_graph_manager)) = &mut context
            {
                partial_graph_manager.remove_partial_graphs(
                    finished_jobs
                        .iter()
                        .map(|(job_id, _, _)| to_partial_graph_id(self.database_id, Some(*job_id)))
                        .collect(),
                );
            }
            // Move each finished job out of `creating_streaming_job_controls`
            // and attach it to the epoch node that was waiting for it.
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                // It's safe to remove the creating job, because on CompleteJobType::Finished,
                // all previous barriers have been collected and completed.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        // At most one checkpoint barrier may be completing at a time.
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.is_collected);

                self.handle_refresh_table_info(task, &node);

                self.database_info.apply_collected_command(
                    &node.command_ctx.command,
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    // Non-checkpoint barrier: notify and keep draining the queue.
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    // If jobs are waiting to be finished but no checkpoint
                    // barrier is queued, force one so they can be committed.
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.database_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                // Checkpoint barrier: fold finished jobs' responses into the
                // node and build the commit task.
                let mut staging_commit_info = self.database_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        // Append commit info for creating jobs that completed an epoch but are
        // still backfilling.
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, create_info) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id]
                        .state_table_ids()
                        .iter()
                        .copied(),
                    create_info.as_ref(),
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch, create_info));
            }
        }
    }

    /// Acknowledge that a complete-barrier task has been committed.
    ///
    /// `command_prev_epoch` must match the epoch recorded in
    /// `self.completing_barrier` (or be `None` when no barrier was completing);
    /// on a match the epoch becomes the new `committed_epoch`. Each creating
    /// job listed in `creating_job_epochs` is also acknowledged.
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

    /// Harvest refresh/source-progress signals from a collected epoch node's
    /// responses and record them on the (lazily created) complete-barrier task:
    /// finished source listings, finished source loads, and tables whose
    /// refresh has finished (mapped to their job ids).
    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}
1083
/// The state and message of this barrier, a node for concurrent checkpoint.
///
/// One node is queued per injected barrier (keyed by its `prev_epoch` in
/// `command_ctx_queue`) and dropped once the barrier completes.
struct EpochNode {
    /// Timer for recording barrier latency, taken after `complete_barriers`.
    /// Started when the command is enqueued.
    enqueue_time: HistogramTimer,

    /// Whether this barrier is in-flight or completed.
    /// See [`BarrierEpochState::is_inflight`].
    state: BarrierEpochState,

    /// Context of this command to generate barrier and do some post jobs.
    command_ctx: CommandContext,
    /// Notifiers of this barrier.
    notifiers: Vec<Notifier>,
}
1097
#[derive(Debug)]
/// The state of barrier.
struct BarrierEpochState {
    // Whether the barrier has been collected from the database's own partial graph.
    is_collected: bool,

    // Collection responses accumulated for this barrier.
    resps: Vec<BarrierCompleteResponse>,

    // Creating (snapshot backfill) jobs that must also collect this barrier
    // before the epoch can stop being in-flight.
    creating_jobs_to_wait: HashSet<JobId>,

    // Jobs that finished creating at this epoch:
    // job id -> (their collected responses, tracking info folded into the commit).
    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}
1109
1110impl BarrierEpochState {
1111    fn is_inflight(&self) -> bool {
1112        !self.is_collected || !self.creating_jobs_to_wait.is_empty()
1113    }
1114}
1115
impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    ///
    /// Runs a series of pre-injection guards that may consume the command and
    /// return early (notifying `notifiers` of success or failure) without
    /// injecting a barrier: multi-job drop/throttle touching a creating
    /// snapshot backfill job, reschedules that would hit reschedule-blocked
    /// jobs, an empty database, or snapshot backfill creation while paused.
    /// Otherwise applies the command, notifies start, and enqueues the
    /// resulting `CommandContext` into the in-flight queue.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        partial_graph_manager: &mut PartialGraphManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        // Split the optional (command, notifiers) pair; a barrier with no
        // command still carries an (empty) notifier list.
        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        debug_assert!(
            !matches!(
                &command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent should be resolved before injection"
        );

        // Guard: dropping jobs that include a creating snapshot backfill job.
        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                // Multi-job cancel cannot include a creating snapshot backfill
                // job; reject the whole command.
                for job_to_cancel in streaming_job_ids {
                    if self
                        .creating_streaming_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
                                the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                && let Some(creating_job) =
                    self.creating_streaming_job_controls.get_mut(job_to_drop)
                && creating_job.drop(&mut notifiers, partial_graph_manager)
            {
                // Single-job drop handled entirely by the creating job control.
                return Ok(());
            }
        }

        // Guard: multi-job throttle touching a creating snapshot backfill job.
        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(creating_job_id) = jobs
                .iter()
                .find(|job| self.creating_streaming_job_controls.contains_key(*job))
        {
            warn!(
                job_id = %creating_job_id,
                "ignore multi-job throttle command on creating snapshot backfill streaming job"
            );
            for notifier in notifiers {
                notifier
                    .notify_start_failed(anyhow!("cannot alter rate limit for snapshot backfill streaming job with other jobs, \
                                the original rate limit will be kept recovery.").into());
            }
            return Ok(());
        };

        // Guard: reject reschedules that touch jobs blocked by in-flight
        // creating jobs with unreschedulable backfill fragments.
        if let Some(Command::RescheduleIntent {
            reschedule_plan: Some(reschedule_plan),
            ..
        }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            let blocked_job_ids =
                self.collect_reschedule_blocked_jobs_for_creating_jobs_inflight()?;
            let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
                &reschedule_plan.reschedules,
                &reschedule_plan.fragment_actors,
                &blocked_job_ids,
            );
            if !blocked_reschedule_job_ids.is_empty() {
                warn!(
                    blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
                    "reject reschedule fragments related to creating unreschedulable backfill jobs"
                );
                for notifier in notifiers {
                    notifier.notify_start_failed(
                        anyhow!(
                            "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
                            blocked_reschedule_job_ids
                        )
                        .into(),
                    );
                }
                return Ok(());
            }
        }

        // Guard: empty database and the command is not a job creation — there
        // is nothing for a barrier to do.
        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.creating_streaming_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            // skip the command when there is nothing to do with the barrier
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        // Guard: snapshot backfill creation is not allowed while paused.
        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
                        .into(),
                );
            }
            return Ok(());
        }

        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        // Tracing related stuff
        barrier_info.prev_epoch.span().in_scope(|| {
                tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
            });
        span.record("epoch", barrier_info.curr_epoch());

        // Apply the command to the database state; on failure, fail all
        // notifiers and bubble the error up.
        let ApplyCommandInfo {
            mv_subscription_max_retention,
            table_ids_to_commit,
            jobs_to_wait,
            command,
        } = match self.apply_command(
            command,
            &mut notifiers,
            &barrier_info,
            partial_graph_manager,
            hummock_version_stats,
        ) {
            Ok(info) => info,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Notify about the injection.
        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        // Record the in-flight barrier.
        self.enqueue_command(command_ctx, notifiers, jobs_to_wait);

        Ok(())
    }
}