risingwave_meta/barrier/checkpoint/
control.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::task::Poll;
19
20use anyhow::anyhow;
21use fail::fail_point;
22use risingwave_common::catalog::{DatabaseId, TableId};
23use risingwave_common::id::JobId;
24use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
25use risingwave_common::system_param::AdaptiveParallelismStrategy;
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_meta_model::WorkerId;
29use risingwave_pb::common::WorkerNode;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::id::{FragmentId, PartialGraphId};
32use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
33use risingwave_pb::stream_plan::stream_node::NodeBody;
34use risingwave_pb::stream_service::BarrierCompleteResponse;
35use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
36use tracing::{debug, warn};
37
38use crate::barrier::cdc_progress::CdcProgress;
39use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
40use crate::barrier::checkpoint::recovery::{
41    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
42    RecoveringStateAction,
43};
44use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
45use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
46use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
47use crate::barrier::notifier::Notifier;
48use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphStat};
49use crate::barrier::progress::TrackingJob;
50use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
51use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
52use crate::barrier::utils::collect_creating_job_commit_epoch_info;
53use crate::barrier::{
54    BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
55};
56use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
57use crate::manager::MetaSrvEnv;
58use crate::model::ActorId;
59use crate::rpc::metrics::GLOBAL_META_METRICS;
60use crate::{MetaError, MetaResult};
61
/// Top-level controller for barrier checkpointing, tracking the per-database
/// checkpoint control status for every database known to the barrier worker.
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    /// Checkpoint control of each database: either actively `Running` or `Recovering`.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    /// Latest hummock version stats; refreshed each time a completed barrier is acked.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The max barrier nums in flight
    pub(crate) in_flight_barrier_nums: usize,
}
69
impl CheckpointControl {
    /// Creates an empty control with no databases; the in-flight barrier limit
    /// is read from the meta options in `env`.
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    /// Rebuilds the control after recovery: `databases` enter the `Running`
    /// status, while each entry of `failed_databases` enters the resetting
    /// phase of recovery for its listed partial graphs.
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>, /* `database_id` -> set of resetting partial graph ids */
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        // Drop shared actor info for databases that no longer exist after recovery.
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(
                    |(database_id, resetting_partial_graphs)| {
                        (
                            database_id,
                            DatabaseCheckpointControlStatus::Recovering(
                                DatabaseRecoveringState::new_resetting(
                                    database_id,
                                    resetting_partial_graphs,
                                ),
                            ),
                        )
                    },
                ))
                .collect(),
            hummock_version_stats,
        }
    }

    /// Applies the output of a completed barrier: refreshes the hummock version
    /// stats and acks the completed epochs to each affected database, which must
    /// still be running.
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

    /// Polls every running database for the next barrier-complete work, merging
    /// it into a single optional [`CompleteBarrierTask`]. Recovering databases
    /// are skipped.
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            database.next_complete_barrier_task(
                periodic_barriers,
                partial_graph_manager,
                &mut task,
                &self.hummock_version_stats,
            );
        }
        task
    }

    /// Routes a collected barrier to the owning database. A collected barrier
    /// on a recovering database is a bug in debug builds (panic) and ignored
    /// with a warning in release builds.
    pub(crate) fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier<'_>,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(_) => {
                if cfg!(debug_assertions) {
                    panic!(
                        "receive collected barrier {:?} on recovering database {} from partial graph {}",
                        collected_barrier, database_id, partial_graph_id
                    );
                } else {
                    warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
                }
                Ok(())
            }
        }
    }

    /// Ids of databases currently in the recovering status.
    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    /// Ids of databases currently in the running status.
    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Inflight info of a database, or `None` if it is unknown or not running.
    pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
        self.databases
            .get(&database_id)
            .and_then(|database| database.running_state())
            .map(|database| &database.database_info)
    }

    /// Whether any database may currently contain snapshot-backfilling jobs
    /// (conservatively true for recovering databases).
    pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        self.databases
            .values()
            .any(|database| database.may_have_snapshot_backfilling_jobs())
    }

    /// Handles a newly scheduled barrier. If the barrier carries a command, the
    /// command may be adjusted (cross-db snapshot epoch resolution) and the
    /// target database may be lazily created for a first `CreateStreamingJob`;
    /// otherwise the barrier is injected into the existing running database,
    /// subject to the in-flight barrier limit.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        partial_graph_manager: &mut PartialGraphManager,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                // Fill in the snapshot epoch of each cross-db upstream table from
                // the committed epoch of the database that owns it; fail the
                // command early if any owning database cannot be found.
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                // The database is unknown: only a normal CreateStreamingJob may
                // bootstrap it; everything else is either a harmless no-op or a bug.
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        // Register the database's own partial graph before inserting
                        // the new running control.
                        let adder = partial_graph_manager.add_partial_graph(
                            to_partial_graph_id(database_id, None),
                            DatabaseCheckpointControlMetrics::new(database_id),
                        );
                        adder.added();
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        // These commands are trivially satisfied on an empty database:
                        // acknowledge immediately without injecting a barrier.
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::AlterSubscriptionRetention { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip new barrier for database which is not running.
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                // Skip new barrier with no explicit command when the database should pause inject additional barrier
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        }
    }

    /// Collects backfill progress of all running databases: both the normal
    /// backfill of the database graph and the snapshot backfill of creating jobs.
    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_backfill_progress(),
            );
            // Progress of snapshot backfill
            for (job_id, creating_job) in
                &database_checkpoint_control.creating_streaming_job_controls
            {
                progress.extend([(*job_id, creating_job.gen_backfill_progress())]);
            }
        }
        progress
    }

    /// Collects per-fragment backfill progress of all running databases,
    /// including the fragments of creating jobs.
    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        let mut progress = Vec::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_fragment_backfill_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend(creating_job.gen_fragment_backfill_progress());
            }
        }
        progress
    }

    /// Collects CDC progress of all running databases.
    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

    /// Ids of databases (running or recovering) that are no longer valid after
    /// worker `worker_id` failed.
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases
            .iter_mut()
            .filter_map(
                move |(database_id, database_status)| match database_status {
                    DatabaseCheckpointControlStatus::Running(control) => {
                        if !control.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        if !state.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                },
            )
    }
}
445
/// Event yielded by [`CheckpointControl::next_event`] when a recovering
/// database is ready to advance to the next recovery phase.
pub(crate) enum CheckpointControlEvent<'a> {
    /// The database should enter the initializing phase of recovery.
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    /// The database has finished recovering and should enter the running status.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}
450
impl CheckpointControl {
    /// Handles the per-worker responses to a partial graph reset.
    /// On a running database the reset must target a creating job's graph, and
    /// that job is removed; on a recovering database the responses are forwarded
    /// to the recovering state machine.
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, creating_job) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(creating_job_id) = creating_job {
                    let Some(job) = database
                        .creating_streaming_job_controls
                        .remove(&creating_job_id)
                    else {
                        // Missing job: a bug in debug builds, ignored with a warning otherwise.
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job {creating_job_id} in database {database_id}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            "ignore reset partial graph resp on non-existing creating job on running database"
                        );
                        return;
                    };
                    job.on_partial_graph_reset();
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }

    /// Notifies the owning (necessarily recovering) database that one of its
    /// partial graphs has been initialized.
    pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not have partial graph initialized when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.partial_graph_initialized(partial_graph_id);
            }
        }
    }

    /// Waits until some recovering database produces its next recovery action,
    /// returning it as a [`CheckpointControlEvent`]. Pending while no database
    /// has an action ready.
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        // `self` is kept in an Option so the full `&mut self` borrow can be
        // moved out (via `take`) when building the event, ending the iteration
        // borrow; the future must not be polled again after it resolves.
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}
538
/// Status of a single database within [`CheckpointControl`]: normally
/// `Running`, or `Recovering` while its partial graphs are reset and
/// re-initialized.
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}
543
544impl DatabaseCheckpointControlStatus {
545    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
546        match self {
547            DatabaseCheckpointControlStatus::Running(state) => Some(state),
548            DatabaseCheckpointControlStatus::Recovering(_) => None,
549        }
550    }
551
552    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
553        match self {
554            DatabaseCheckpointControlStatus::Running(state) => Some(state),
555            DatabaseCheckpointControlStatus::Recovering(_) => None,
556        }
557    }
558
559    fn may_have_snapshot_backfilling_jobs(&self) -> bool {
560        self.running_state()
561            .map(|database| !database.creating_streaming_job_controls.is_empty())
562            .unwrap_or(true) // there can be snapshot backfilling jobs when the database is recovering.
563    }
564
565    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
566        match self {
567            DatabaseCheckpointControlStatus::Running(state) => state,
568            DatabaseCheckpointControlStatus::Recovering(_) => {
569                panic!("should be at running: {}", reason)
570            }
571        }
572    }
573}
574
/// Barrier metrics for a single database, labeled by database id.
pub(in crate::barrier) struct DatabaseCheckpointControlMetrics {
    /// Histogram of observed barrier latencies (seconds).
    barrier_latency: LabelGuardedHistogram,
    /// Gauge of barriers currently in flight.
    in_flight_barrier_nums: LabelGuardedIntGauge,
    /// Gauge of all barriers: in-flight plus collected.
    all_barrier_nums: LabelGuardedIntGauge,
}
580
581impl DatabaseCheckpointControlMetrics {
582    pub(in crate::barrier) fn new(database_id: DatabaseId) -> Self {
583        let database_id_str = database_id.to_string();
584        let barrier_latency = GLOBAL_META_METRICS
585            .barrier_latency
586            .with_guarded_label_values(&[&database_id_str]);
587        let in_flight_barrier_nums = GLOBAL_META_METRICS
588            .in_flight_barrier_nums
589            .with_guarded_label_values(&[&database_id_str]);
590        let all_barrier_nums = GLOBAL_META_METRICS
591            .all_barrier_nums
592            .with_guarded_label_values(&[&database_id_str]);
593        Self {
594            barrier_latency,
595            in_flight_barrier_nums,
596            all_barrier_nums,
597        }
598    }
599}
600
601impl PartialGraphStat for DatabaseCheckpointControlMetrics {
602    fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
603        self.barrier_latency.observe(barrier_latency_secs);
604    }
605
606    fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize) {
607        self.in_flight_barrier_nums.set(inflight_barrier_num as _);
608        self.all_barrier_nums
609            .set((inflight_barrier_num + collected_barrier_num) as _);
610    }
611}
612
/// Controls the concurrent execution of commands.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    /// Partial graph id of the database's own graph (no creating job attached).
    partial_graph_id: PartialGraphId,
    pub(super) state: BarrierWorkerState,

    /// Save the state and message of barrier in order.
    /// Key is the `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, BarrierEpochState>,
    /// The barrier that are completing.
    /// Some(`prev_epoch`)
    completing_barrier: Option<u64>,

    /// Latest committed epoch, or `None` before the first commit
    /// (set at recovery for recovered databases).
    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    /// Snapshot-backfill streaming jobs still being created, keyed by job id.
    pub creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
}
631
impl DatabaseCheckpointControl {
    /// Creates an empty control for a brand-new database (no committed epoch yet).
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            partial_graph_id: to_partial_graph_id(database_id, None),
            state: BarrierWorkerState::new(),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            creating_streaming_job_controls: Default::default(),
        }
    }

    /// Rebuilds the control from recovered state: the barrier queue starts
    /// empty and `committed_epoch` is the epoch already committed before recovery.
    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    ) -> Self {
        Self {
            database_id,
            partial_graph_id: to_partial_graph_id(database_id, None),
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            creating_streaming_job_controls,
        }
    }

    /// Whether this database can keep running after worker `worker_id` failed:
    /// true only if neither the database graph nor any creating job involves
    /// that worker.
    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        !self.database_info.contains_worker(worker_id as _)
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }

    /// Enqueue a barrier command
    fn enqueue_command(&mut self, prev_epoch: u64, creating_jobs_to_wait: HashSet<JobId>) {
        tracing::trace!(prev_epoch, ?creating_jobs_to_wait, "enqueue command");
        self.command_ctx_queue.insert(
            prev_epoch,
            BarrierEpochState {
                is_collected: false,
                creating_jobs_to_wait,
                finished_jobs: HashMap::new(),
            },
        );
    }

    /// Handles a collected barrier for this database. For the database's own
    /// partial graph, marks the queued barrier at `prev_epoch` as collected
    /// (panicking if no such barrier is queued). For a creating job's graph,
    /// forwards the barrier to that job; if the job is then ready to merge into
    /// the upstream graph, forces the next barrier to be a checkpoint.
    fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier<'_>,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let prev_epoch = collected_barrier.epoch.prev;
        tracing::trace!(
            prev_epoch,
            partial_graph_id = %partial_graph_id,
            "barrier collected"
        );
        let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
        assert_eq!(self.database_id, database_id);
        match creating_job_id {
            None => {
                if let Some(state) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(!state.is_collected);
                    state.is_collected = true;
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {:?}",
                        prev_epoch, collected_barrier
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(collected_barrier);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    /// Whether a new barrier may be injected: true while the number of
    /// in-flight barriers in the queue is below `in_flight_barrier_nums`.
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|state| state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }
}
737
738impl DatabaseCheckpointControl {
739    /// return creating job table fragment id -> (backfill progress epoch , {`upstream_mv_table_id`})
740    fn collect_backfill_pinned_upstream_log_epoch(
741        &self,
742    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
743        self.creating_streaming_job_controls
744            .iter()
745            .map(|(job_id, creating_job)| (*job_id, creating_job.pinned_upstream_log_epoch()))
746            .collect()
747    }
748
749    fn collect_no_shuffle_fragment_relations_for_reschedule_check(
750        &self,
751    ) -> Vec<(FragmentId, FragmentId)> {
752        let mut no_shuffle_relations = Vec::new();
753        for fragment in self.database_info.fragment_infos() {
754            let downstream_fragment_id = fragment.fragment_id;
755            visit_stream_node_cont(&fragment.nodes, |node| {
756                if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
757                    && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
758                {
759                    no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
760                }
761                true
762            });
763        }
764
765        for creating_job in self.creating_streaming_job_controls.values() {
766            creating_job.collect_no_shuffle_fragment_relations(&mut no_shuffle_relations);
767        }
768        no_shuffle_relations
769    }
770
771    fn collect_reschedule_blocked_jobs_for_creating_jobs_inflight(
772        &self,
773    ) -> MetaResult<HashSet<JobId>> {
774        let mut initial_blocked_fragment_ids = HashSet::new();
775        for creating_job in self.creating_streaming_job_controls.values() {
776            creating_job.collect_reschedule_blocked_fragment_ids(&mut initial_blocked_fragment_ids);
777        }
778
779        let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
780        if !initial_blocked_fragment_ids.is_empty() {
781            let no_shuffle_relations =
782                self.collect_no_shuffle_fragment_relations_for_reschedule_check();
783            let (forward_edges, backward_edges) =
784                build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
785            let initial_blocked_fragment_ids: Vec<_> =
786                initial_blocked_fragment_ids.iter().copied().collect();
787            for ensemble in find_no_shuffle_graphs(
788                &initial_blocked_fragment_ids,
789                &forward_edges,
790                &backward_edges,
791            )? {
792                blocked_fragment_ids.extend(ensemble.fragments());
793            }
794        }
795
796        let mut blocked_job_ids = HashSet::new();
797        blocked_job_ids.extend(
798            blocked_fragment_ids
799                .into_iter()
800                .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
801        );
802        Ok(blocked_job_ids)
803    }
804
805    fn collect_reschedule_blocked_job_ids(
806        &self,
807        reschedules: &HashMap<FragmentId, Reschedule>,
808        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
809        blocked_job_ids: &HashSet<JobId>,
810    ) -> HashSet<JobId> {
811        let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
812        affected_fragment_ids.extend(fragment_actors.keys().copied());
813        for reschedule in reschedules.values() {
814            affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
815            affected_fragment_ids.extend(
816                reschedule
817                    .upstream_fragment_dispatcher_ids
818                    .iter()
819                    .map(|(fragment_id, _)| *fragment_id),
820            );
821        }
822
823        affected_fragment_ids
824            .into_iter()
825            .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
826            .filter(|job_id| blocked_job_ids.contains(job_id))
827            .collect()
828    }
829
830    fn next_complete_barrier_task(
831        &mut self,
832        periodic_barriers: &mut PeriodicBarriers,
833        partial_graph_manager: &mut PartialGraphManager,
834        task: &mut Option<CompleteBarrierTask>,
835        hummock_version_stats: &HummockVersionStats,
836    ) {
837        // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough
838        let mut creating_jobs_task = vec![];
839        if let Some(committed_epoch) = self.committed_epoch {
840            // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough
841            let mut finished_jobs = Vec::new();
842            let min_upstream_inflight_barrier = self
843                .command_ctx_queue
844                .first_key_value()
845                .map(|(epoch, _)| *epoch);
846            for (job_id, job) in &mut self.creating_streaming_job_controls {
847                if let Some((epoch, resps, info, is_finish_epoch)) = job.start_completing(
848                    partial_graph_manager,
849                    min_upstream_inflight_barrier,
850                    committed_epoch,
851                ) {
852                    if is_finish_epoch {
853                        assert!(info.notifiers.is_empty());
854                        finished_jobs.push((*job_id, epoch, resps));
855                        continue;
856                    };
857                    creating_jobs_task.push((*job_id, epoch, resps, info));
858                }
859            }
860
861            if !finished_jobs.is_empty() {
862                partial_graph_manager.remove_partial_graphs(
863                    finished_jobs
864                        .iter()
865                        .map(|(job_id, ..)| to_partial_graph_id(self.database_id, Some(*job_id)))
866                        .collect(),
867                );
868            }
869            for (job_id, epoch, resps) in finished_jobs {
870                let epoch_state = &mut self
871                    .command_ctx_queue
872                    .get_mut(&epoch)
873                    .expect("should exist");
874                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
875                debug!(epoch, %job_id, "finish creating job");
876                // It's safe to remove the creating job, because on CompleteJobType::Finished,
877                // all previous barriers have been collected and completed.
878                let creating_streaming_job = self
879                    .creating_streaming_job_controls
880                    .remove(&job_id)
881                    .expect("should exist");
882                let tracking_job = creating_streaming_job.into_tracking_job();
883
884                assert!(
885                    epoch_state
886                        .finished_jobs
887                        .insert(job_id, (resps, tracking_job))
888                        .is_none()
889                );
890            }
891        }
892        assert!(self.completing_barrier.is_none());
893        while let Some((_, state)) = self.command_ctx_queue.first_key_value()
894            && !state.is_inflight()
895        {
896            {
897                let (epoch, state) = self.command_ctx_queue.pop_first().expect("non-empty");
898                assert!(state.creating_jobs_to_wait.is_empty());
899                assert!(state.is_collected);
900
901                let (resps, mut info) =
902                    partial_graph_manager.take_collected_barrier(self.partial_graph_id, epoch);
903
904                self.handle_refresh_table_info(task, &resps);
905                self.database_info.apply_collected_command(
906                    &info.post_collect_command,
907                    &resps,
908                    hummock_version_stats,
909                );
910
911                let mut resps_to_commit = resps;
912                if !info.barrier_info.kind.is_checkpoint() {
913                    info.notifiers.drain(..).for_each(|notifier| {
914                        notifier.notify_collected();
915                    });
916                    if self.database_info.has_pending_finished_jobs()
917                        && !partial_graph_manager
918                            .has_pending_checkpoint_barrier(self.partial_graph_id)
919                    {
920                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
921                    }
922                    continue;
923                }
924                let mut staging_commit_info = self.database_info.take_staging_commit_info();
925                state
926                    .finished_jobs
927                    .into_iter()
928                    .for_each(|(_, (resps, tracking_job))| {
929                        resps_to_commit.extend(resps);
930                        staging_commit_info.finished_jobs.push(tracking_job);
931                    });
932                let task = task.get_or_insert_default();
933                Command::collect_commit_epoch_info(
934                    &self.database_info,
935                    &info,
936                    &mut task.commit_info,
937                    resps_to_commit,
938                    self.collect_backfill_pinned_upstream_log_epoch(),
939                );
940                self.completing_barrier = Some(info.barrier_info.prev_epoch());
941                task.finished_jobs.extend(staging_commit_info.finished_jobs);
942                task.finished_cdc_table_backfill
943                    .extend(staging_commit_info.finished_cdc_table_backfill);
944                task.epoch_infos
945                    .try_insert(self.partial_graph_id, info)
946                    .expect("non duplicate");
947                task.commit_info
948                    .truncate_tables
949                    .extend(staging_commit_info.table_ids_to_truncate);
950                break;
951            }
952        }
953        if !creating_jobs_task.is_empty() {
954            let task = task.get_or_insert_default();
955            for (job_id, epoch, resps, info) in creating_jobs_task {
956                collect_creating_job_commit_epoch_info(&mut task.commit_info, epoch, resps, &info);
957                task.epoch_infos
958                    .try_insert(to_partial_graph_id(self.database_id, Some(job_id)), info)
959                    .expect("non duplicate");
960            }
961        }
962    }
963
964    fn ack_completed(
965        &mut self,
966        command_prev_epoch: Option<u64>,
967        creating_job_epochs: Vec<(JobId, u64)>,
968    ) {
969        {
970            if let Some(prev_epoch) = self.completing_barrier.take() {
971                assert_eq!(command_prev_epoch, Some(prev_epoch));
972                self.committed_epoch = Some(prev_epoch);
973            } else {
974                assert_eq!(command_prev_epoch, None);
975            };
976            for (job_id, epoch) in creating_job_epochs {
977                self.creating_streaming_job_controls
978                    .get_mut(&job_id)
979                    .expect("should exist")
980                    .ack_completed(epoch)
981            }
982        }
983    }
984
985    fn handle_refresh_table_info(
986        &self,
987        task: &mut Option<CompleteBarrierTask>,
988        resps: &[BarrierCompleteResponse],
989    ) {
990        let list_finished_info = resps
991            .iter()
992            .flat_map(|resp| resp.list_finished_sources.clone())
993            .collect::<Vec<_>>();
994        if !list_finished_info.is_empty() {
995            let task = task.get_or_insert_default();
996            task.list_finished_source_ids.extend(list_finished_info);
997        }
998
999        let load_finished_info = resps
1000            .iter()
1001            .flat_map(|resp| resp.load_finished_sources.clone())
1002            .collect::<Vec<_>>();
1003        if !load_finished_info.is_empty() {
1004            let task = task.get_or_insert_default();
1005            task.load_finished_source_ids.extend(load_finished_info);
1006        }
1007
1008        let refresh_finished_table_ids: Vec<JobId> = resps
1009            .iter()
1010            .flat_map(|resp| {
1011                resp.refresh_finished_tables
1012                    .iter()
1013                    .map(|table_id| table_id.as_job_id())
1014            })
1015            .collect::<Vec<_>>();
1016        if !refresh_finished_table_ids.is_empty() {
1017            let task = task.get_or_insert_default();
1018            task.refresh_finished_table_job_ids
1019                .extend(refresh_finished_table_ids);
1020        }
1021    }
1022}
1023
#[derive(Debug)]
/// The state of barrier.
///
/// Queued per prev-epoch in `DatabaseCheckpointControl::command_ctx_queue`; an
/// entry stays "in flight" until it is collected and no creating job is awaited
/// on it (see `is_inflight`).
struct BarrierEpochState {
    /// Whether the barrier has been collected from the database's own graph.
    is_collected: bool,

    /// Creating snapshot-backfill jobs whose barrier at this epoch is still awaited.
    creating_jobs_to_wait: HashSet<JobId>,

    /// Jobs that finished creating at this epoch: the barrier-complete responses
    /// to fold into the commit, plus the tracking handle of the finished job.
    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}
1033
1034impl BarrierEpochState {
1035    fn is_inflight(&self) -> bool {
1036        !self.is_collected || !self.creating_jobs_to_wait.is_empty()
1037    }
1038}
1039
1040impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    ///
    /// Validates the scheduled `command` against the current database state and
    /// either rejects it (notifying `notifiers` of the failure and returning
    /// `Ok(())`), skips it when there is nothing for the barrier to do, or
    /// applies it via `apply_command` and enqueues the resulting in-flight
    /// barrier. Returns `Err` only when `apply_command` itself fails.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        partial_graph_manager: &mut PartialGraphManager,
        hummock_version_stats: &HummockVersionStats,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        // Unpack the scheduled command; a barrier without a command carries no notifiers.
        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        debug_assert!(
            !matches!(
                &command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent should be resolved before injection"
        );

        // Drop commands that touch a still-creating snapshot-backfill job:
        // a multi-job cancel is rejected outright, while a single-job drop may be
        // fully handled by the creating job's own control (no barrier needed).
        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                for job_to_cancel in streaming_job_ids {
                    if self
                        .creating_streaming_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
                                the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                && let Some(creating_job) =
                    self.creating_streaming_job_controls.get_mut(job_to_drop)
                && creating_job.drop(&mut notifiers, partial_graph_manager)
            {
                // The creating job control fully consumed the drop command.
                return Ok(());
            }
        }

        // Multi-job throttle commands cannot apply to a creating snapshot-backfill
        // job; reject the whole command.
        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(creating_job_id) = jobs
                .iter()
                .find(|job| self.creating_streaming_job_controls.contains_key(*job))
        {
            warn!(
                job_id = %creating_job_id,
                "ignore multi-job throttle command on creating snapshot backfill streaming job"
            );
            for notifier in notifiers {
                notifier
                    .notify_start_failed(anyhow!("cannot alter rate limit for snapshot backfill streaming job with other jobs, \
                                the original rate limit will be kept recovery.").into());
            }
            return Ok(());
        };

        // Reject reschedules touching fragments blocked by in-flight creating jobs
        // (including fragments reachable from them over no-shuffle edges).
        if let Some(Command::RescheduleIntent {
            reschedule_plan: Some(reschedule_plan),
            ..
        }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            let blocked_job_ids =
                self.collect_reschedule_blocked_jobs_for_creating_jobs_inflight()?;
            let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
                &reschedule_plan.reschedules,
                &reschedule_plan.fragment_actors,
                &blocked_job_ids,
            );
            if !blocked_reschedule_job_ids.is_empty() {
                warn!(
                    blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
                    "reject reschedule fragments related to creating unreschedulable backfill jobs"
                );
                for notifier in notifiers {
                    notifier.notify_start_failed(
                        anyhow!(
                            "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
                            blocked_reschedule_job_ids
                        )
                        .into(),
                    );
                }
                return Ok(());
            }
        }

        // Empty database and not creating a new job: the barrier has nothing to
        // carry, so acknowledge the notifiers immediately without injecting.
        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.creating_streaming_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            // skip the command when there is nothing to do with the barrier
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        // Snapshot backfill needs barriers to flow; it cannot start while paused.
        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
                        .into(),
                );
            }
            return Ok(());
        }

        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        // Tracing related stuff
        barrier_info.prev_epoch.span().in_scope(|| {
                tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
            });
        span.record("epoch", barrier_info.curr_epoch());

        let prev_epoch = barrier_info.prev_epoch();
        let ApplyCommandInfo { jobs_to_wait } = match self.apply_command(
            command,
            &mut notifiers,
            barrier_info,
            partial_graph_manager,
            hummock_version_stats,
            adaptive_parallelism_strategy,
            worker_nodes,
        ) {
            Ok(info) => {
                // On success, `apply_command` has consumed all notifiers.
                assert!(notifiers.is_empty());
                info
            }
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Record the in-flight barrier.
        self.enqueue_command(prev_epoch, jobs_to_wait);

        Ok(())
    }
1217}