// risingwave_meta/barrier/checkpoint/control.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::ops::Bound::{Excluded, Unbounded};
19use std::task::Poll;
20
21use anyhow::anyhow;
22use fail::fail_point;
23use itertools::Itertools;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
27use risingwave_common::system_param::AdaptiveParallelismStrategy;
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
30use risingwave_meta_model::WorkerId;
31use risingwave_pb::common::WorkerNode;
32use risingwave_pb::hummock::HummockVersionStats;
33use risingwave_pb::id::{FragmentId, PartialGraphId};
34use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
35use risingwave_pb::stream_plan::stream_node::NodeBody;
36use risingwave_pb::stream_service::BarrierCompleteResponse;
37use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
38use tracing::{debug, warn};
39
40use crate::barrier::cdc_progress::CdcProgress;
41use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
42use crate::barrier::checkpoint::recovery::{
43    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
44    RecoveringStateAction,
45};
46use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
47use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
48use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
49use crate::barrier::notifier::Notifier;
50use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphStat};
51use crate::barrier::progress::TrackingJob;
52use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
53use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
54use crate::barrier::utils::{BarrierItemCollector, collect_creating_job_commit_epoch_info};
55use crate::barrier::{
56    BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
57};
58use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
59use crate::manager::MetaSrvEnv;
60use crate::model::ActorId;
61use crate::rpc::metrics::GLOBAL_META_METRICS;
62use crate::{MetaError, MetaResult};
63
/// Top-level controller for barrier checkpointing: tracks the checkpoint status of
/// every database, each of which is either `Running` or `Recovering`.
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    // Per-database checkpoint status (running or recovering).
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    // Latest hummock version stats; refreshed from each completed barrier output.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The max barrier nums in flight
    pub(crate) in_flight_barrier_nums: usize,
}
71
impl CheckpointControl {
    /// Create an empty `CheckpointControl` with no databases tracked yet.
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    /// Rebuild the checkpoint control after recovery.
    ///
    /// Databases in `databases` recovered successfully and start in the `Running`
    /// state; databases in `failed_databases` start in the `Recovering` (resetting)
    /// state, carrying the partial graph ids that still need to be reset.
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>, /* `database_id` -> set of resetting partial graph ids */
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        // Drop shared actor info of any database that no longer exists after recovery.
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(
                    |(database_id, resetting_partial_graphs)| {
                        (
                            database_id,
                            DatabaseCheckpointControlStatus::Recovering(
                                DatabaseRecoveringState::new_resetting(
                                    database_id,
                                    resetting_partial_graphs,
                                ),
                            ),
                        )
                    },
                ))
                .collect(),
            hummock_version_stats,
        }
    }

    /// Acknowledge a completed barrier batch: refresh the hummock stats and forward
    /// the per-database acked epochs to the corresponding running database controls.
    pub(crate) fn ack_completed(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        output: BarrierCompleteOutput,
    ) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                // A database must stay running until its completing command is acked.
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(
                    partial_graph_manager,
                    command_prev_epoch,
                    creating_job_epochs,
                );
        }
    }

    /// Ask every running database for its next barrier-completion work, merged into a
    /// single optional [`CompleteBarrierTask`]. Recovering databases are skipped.
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            database.next_complete_barrier_task(
                periodic_barriers,
                partial_graph_manager,
                &mut task,
                &self.hummock_version_stats,
            );
        }
        task
    }

    /// Route a collected barrier to the owning database.
    ///
    /// A barrier collected for a recovering database is a protocol violation: it
    /// panics under debug assertions and is logged and ignored in release builds.
    pub(crate) fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier<'_>,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(_) => {
                if cfg!(debug_assertions) {
                    panic!(
                        "receive collected barrier {:?} on recovering database {} from partial graph {}",
                        collected_barrier, database_id, partial_graph_id
                    );
                } else {
                    warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
                }
                Ok(())
            }
        }
    }

    /// Ids of databases currently in the `Recovering` state.
    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    /// Ids of databases currently in the `Running` state.
    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Inflight info of a database, or `None` if it is absent or not running.
    pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
        self.databases
            .get(&database_id)
            .and_then(|database| database.running_state())
            .map(|database| &database.database_info)
    }

    /// Whether any database may currently contain snapshot-backfilling jobs
    /// (conservatively `true` for recovering databases).
    pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        self.databases
            .values()
            .any(|database| database.may_have_snapshot_backfilling_jobs())
    }

    /// Handle a newly scheduled barrier for one database.
    ///
    /// For a command barrier, this first resolves cross-db snapshot backfill epochs,
    /// then locates (or, for the first `CreateStreamingJob` of a database, creates)
    /// the database control and injects the barrier. Commands that cannot proceed
    /// notify their notifiers and return `Ok(())` without injecting a barrier.
    /// For a plain (command-less) barrier, it is skipped when the database is absent,
    /// not running, or already at the in-flight barrier limit.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        partial_graph_manager: &mut PartialGraphManager,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                // Fill in the snapshot epoch of each cross-db upstream table from the
                // committed epoch of the database that owns it.
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    // NOTE(review): this branch is also taken when the owning database
                    // was found but has no committed epoch yet — the "not found" error
                    // text is then misleading; confirm whether that case is intended.
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                // The database is not tracked yet: only a normal `CreateStreamingJob`
                // may bootstrap it; every other command is answered without a barrier.
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        // Register the database's main partial graph before inserting
                        // the new control, so barriers can be routed to it.
                        let adder = partial_graph_manager.add_partial_graph(
                            to_partial_graph_id(database_id, None),
                            DatabaseCheckpointControlMetrics::new(database_id),
                        );
                        adder.added();
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    // Harmless commands on a non-existent database complete immediately.
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    // Commands that require an existing database fail their notifiers.
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::AlterSubscriptionRetention { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip new barrier for database which is not running.
                return Ok(());
            };
            if partial_graph_manager.inflight_barrier_num(database.partial_graph_id)
                >= self.in_flight_barrier_nums
            {
                // Skip new barrier with no explicit command when the database should pause inject additional barrier
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        }
    }

    /// Collect backfill progress of all running databases, covering both normal
    /// backfill jobs and snapshot-backfill (creating) jobs.
    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_backfill_progress(),
            );
            // Progress of snapshot backfill
            for (job_id, creating_job) in
                &database_checkpoint_control.creating_streaming_job_controls
            {
                progress.extend([(*job_id, creating_job.gen_backfill_progress())]);
            }
        }
        progress
    }

    /// Collect per-fragment backfill progress of all running databases.
    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        let mut progress = Vec::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_fragment_backfill_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend(creating_job.gen_fragment_backfill_progress());
            }
        }
        progress
    }

    /// Collect CDC progress of all running databases.
    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

    /// Ids of databases (running or recovering) that become invalid after the given
    /// worker fails and therefore need recovery handling.
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases
            .iter_mut()
            .filter_map(
                move |(database_id, database_status)| match database_status {
                    DatabaseCheckpointControlStatus::Running(control) => {
                        if !control.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        if !state.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                },
            )
    }
}
457
/// Events yielded by [`CheckpointControl::next_event`] when a recovering database is
/// ready to advance to the next recovery stage.
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}
462
impl CheckpointControl {
    /// Handle the reset responses of a partial graph.
    ///
    /// For a running database this may only target a creating (snapshot-backfill)
    /// job's partial graph, whose control is removed and notified; for a recovering
    /// database the responses are forwarded to its recovery state machine.
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, creating_job) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(creating_job_id) = creating_job {
                    let Some(job) = database
                        .creating_streaming_job_controls
                        .remove(&creating_job_id)
                    else {
                        // Unknown creating job: a bug in debug builds, ignored in release.
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job {creating_job_id} in database {database_id}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            "ignore reset partial graph resp on non-existing creating job on running database"
                        );
                        return;
                    };
                    job.on_partial_graph_reset();
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }

    /// Notify a recovering database that one of its partial graphs finished
    /// initialization. Must not be called on a running database.
    pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not have partial graph initialized when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.partial_graph_initialized(partial_graph_id);
            }
        }
    }

    /// Await the next recovery-state transition of any recovering database.
    ///
    /// Returns a future that polls every recovering database and resolves with a
    /// [`CheckpointControlEvent`] for the first one ready to transition.
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        // `self` is kept in an `Option` so the exclusive borrow can be moved out of
        // the closure exactly once, when a transition becomes ready.
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            // Move `self` out; the future must not be polled again.
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}
550
/// Checkpoint status of a single database: either running normally or going through
/// the recovery state machine.
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}
555
556impl DatabaseCheckpointControlStatus {
557    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
558        match self {
559            DatabaseCheckpointControlStatus::Running(state) => Some(state),
560            DatabaseCheckpointControlStatus::Recovering(_) => None,
561        }
562    }
563
564    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
565        match self {
566            DatabaseCheckpointControlStatus::Running(state) => Some(state),
567            DatabaseCheckpointControlStatus::Recovering(_) => None,
568        }
569    }
570
571    fn may_have_snapshot_backfilling_jobs(&self) -> bool {
572        self.running_state()
573            .map(|database| !database.creating_streaming_job_controls.is_empty())
574            .unwrap_or(true) // there can be snapshot backfilling jobs when the database is recovering.
575    }
576
577    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
578        match self {
579            DatabaseCheckpointControlStatus::Running(state) => state,
580            DatabaseCheckpointControlStatus::Recovering(_) => {
581                panic!("should be at running: {}", reason)
582            }
583        }
584    }
585}
586
/// Per-database barrier metrics, labeled by the database id.
pub(in crate::barrier) struct DatabaseCheckpointControlMetrics {
    // Histogram of barrier latency in seconds.
    barrier_latency: LabelGuardedHistogram,
    // Gauge of barriers currently in flight.
    in_flight_barrier_nums: LabelGuardedIntGauge,
    // Gauge of all tracked barriers (in-flight plus collected).
    all_barrier_nums: LabelGuardedIntGauge,
}
592
593impl DatabaseCheckpointControlMetrics {
594    pub(in crate::barrier) fn new(database_id: DatabaseId) -> Self {
595        let database_id_str = database_id.to_string();
596        let barrier_latency = GLOBAL_META_METRICS
597            .barrier_latency
598            .with_guarded_label_values(&[&database_id_str]);
599        let in_flight_barrier_nums = GLOBAL_META_METRICS
600            .in_flight_barrier_nums
601            .with_guarded_label_values(&[&database_id_str]);
602        let all_barrier_nums = GLOBAL_META_METRICS
603            .all_barrier_nums
604            .with_guarded_label_values(&[&database_id_str]);
605        Self {
606            barrier_latency,
607            in_flight_barrier_nums,
608            all_barrier_nums,
609        }
610    }
611}
612
613impl PartialGraphStat for DatabaseCheckpointControlMetrics {
614    fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
615        self.barrier_latency.observe(barrier_latency_secs);
616    }
617
618    fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize) {
619        self.in_flight_barrier_nums.set(inflight_barrier_num as _);
620        self.all_barrier_nums
621            .set((inflight_barrier_num + collected_barrier_num) as _);
622    }
623}
624
/// Controls the concurrent execution of commands.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    // Partial graph id of the database's main graph (no creating job attached).
    partial_graph_id: PartialGraphId,
    pub(super) state: BarrierWorkerState,

    // Collects, per epoch, the barrier responses and tracking info of creating jobs
    // that a command barrier must wait for before it can finish.
    finishing_jobs_collector:
        BarrierItemCollector<JobId, (Vec<BarrierCompleteResponse>, TrackingJob), ()>,
    /// The barrier that are completing.
    /// Some(`prev_epoch`)
    completing_barrier: Option<u64>,

    // Latest committed epoch; `None` until the first commit after creation.
    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    // In-flight snapshot-backfill (creating) streaming jobs, keyed by job id.
    pub creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
}
642
impl DatabaseCheckpointControl {
    /// Create a fresh control for a database whose first streaming job is being created.
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            partial_graph_id: to_partial_graph_id(database_id, None),
            state: BarrierWorkerState::new(),
            finishing_jobs_collector: BarrierItemCollector::new(),
            completing_barrier: None,
            // No epoch has been committed yet for a brand-new database.
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            creating_streaming_job_controls: Default::default(),
        }
    }

    /// Rebuild the control for a database restored during recovery, resuming from the
    /// given `committed_epoch` with its previously tracked inflight state.
    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    ) -> Self {
        Self {
            database_id,
            partial_graph_id: to_partial_graph_id(database_id, None),
            state,
            finishing_jobs_collector: BarrierItemCollector::new(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            creating_streaming_job_controls,
        }
    }

    /// Whether this database can keep running after the given worker fails: neither
    /// its main graph nor any creating job may reference the failed worker.
    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        !self.database_info.contains_worker(worker_id as _)
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }

    /// Enqueue a barrier command
    fn enqueue_command(&mut self, epoch: EpochPair, creating_jobs_to_wait: HashSet<JobId>) {
        let prev_epoch = epoch.prev;
        tracing::trace!(prev_epoch, ?creating_jobs_to_wait, "enqueue command");
        // Only epochs that must wait for finishing creating jobs are tracked.
        if !creating_jobs_to_wait.is_empty() {
            self.finishing_jobs_collector
                .enqueue(epoch, creating_jobs_to_wait, ());
        }
    }

    /// Handle a barrier collected from one of this database's partial graphs.
    ///
    /// For a creating (snapshot-backfill) job's partial graph, the barrier is
    /// forwarded to that job's control; if the job reports it should merge back into
    /// the upstream graph, the next barrier is forced to be a checkpoint barrier.
    fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier<'_>,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let prev_epoch = collected_barrier.epoch.prev;
        tracing::trace!(
            prev_epoch,
            partial_graph_id = %partial_graph_id,
            "barrier collected"
        );
        let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
        // Sanity check: the barrier must have been routed to the owning database.
        assert_eq!(self.database_id, database_id);
        if let Some(creating_job_id) = creating_job_id {
            let should_merge_to_upstream = self
                .creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(collected_barrier);
            if should_merge_to_upstream {
                periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
            }
        }
        Ok(())
    }
}
723
724impl DatabaseCheckpointControl {
725    /// return creating job table fragment id -> (backfill progress epoch , {`upstream_mv_table_id`})
726    fn collect_backfill_pinned_upstream_log_epoch(
727        &self,
728    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
729        self.creating_streaming_job_controls
730            .iter()
731            .map(|(job_id, creating_job)| (*job_id, creating_job.pinned_upstream_log_epoch()))
732            .collect()
733    }
734
735    fn collect_no_shuffle_fragment_relations_for_reschedule_check(
736        &self,
737    ) -> Vec<(FragmentId, FragmentId)> {
738        let mut no_shuffle_relations = Vec::new();
739        for fragment in self.database_info.fragment_infos() {
740            let downstream_fragment_id = fragment.fragment_id;
741            visit_stream_node_cont(&fragment.nodes, |node| {
742                if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
743                    && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
744                {
745                    no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
746                }
747                true
748            });
749        }
750
751        for creating_job in self.creating_streaming_job_controls.values() {
752            creating_job.collect_no_shuffle_fragment_relations(&mut no_shuffle_relations);
753        }
754        no_shuffle_relations
755    }
756
757    fn collect_reschedule_blocked_jobs_for_creating_jobs_inflight(
758        &self,
759    ) -> MetaResult<HashSet<JobId>> {
760        let mut initial_blocked_fragment_ids = HashSet::new();
761        for creating_job in self.creating_streaming_job_controls.values() {
762            creating_job.collect_reschedule_blocked_fragment_ids(&mut initial_blocked_fragment_ids);
763        }
764
765        let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
766        if !initial_blocked_fragment_ids.is_empty() {
767            let no_shuffle_relations =
768                self.collect_no_shuffle_fragment_relations_for_reschedule_check();
769            let (forward_edges, backward_edges) =
770                build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
771            let initial_blocked_fragment_ids: Vec<_> =
772                initial_blocked_fragment_ids.iter().copied().collect();
773            for ensemble in find_no_shuffle_graphs(
774                &initial_blocked_fragment_ids,
775                &forward_edges,
776                &backward_edges,
777            )? {
778                blocked_fragment_ids.extend(ensemble.fragments());
779            }
780        }
781
782        let mut blocked_job_ids = HashSet::new();
783        blocked_job_ids.extend(
784            blocked_fragment_ids
785                .into_iter()
786                .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
787        );
788        Ok(blocked_job_ids)
789    }
790
791    fn collect_reschedule_blocked_job_ids(
792        &self,
793        reschedules: &HashMap<FragmentId, Reschedule>,
794        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
795        blocked_job_ids: &HashSet<JobId>,
796    ) -> HashSet<JobId> {
797        let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
798        affected_fragment_ids.extend(fragment_actors.keys().copied());
799        for reschedule in reschedules.values() {
800            affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
801            affected_fragment_ids.extend(
802                reschedule
803                    .upstream_fragment_dispatcher_ids
804                    .iter()
805                    .map(|(fragment_id, _)| *fragment_id),
806            );
807        }
808
809        affected_fragment_ids
810            .into_iter()
811            .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
812            .filter(|job_id| blocked_job_ids.contains(job_id))
813            .collect()
814    }
815
    /// Assemble the next `CompleteBarrierTask` (lazily created in `task`) from:
    /// 1. creating jobs whose barriers can start completing (finished jobs are
    ///    removed and handed to the finishing-jobs collector), and
    /// 2. the database's own partial graph, via `start_completing`, merging any
    ///    finished-job responses whose epoch matches the completing barrier.
    fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
        task: &mut Option<CompleteBarrierTask>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough
        let mut creating_jobs_task = vec![];
        // Creating jobs can only start completing once the database itself has a
        // committed epoch to anchor against.
        if let Some(committed_epoch) = self.committed_epoch {
            // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = partial_graph_manager
                .first_inflight_barrier(self.partial_graph_id)
                .map(|epoch| epoch.prev);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, info, is_finish_epoch)) = job.start_completing(
                    partial_graph_manager,
                    min_upstream_inflight_barrier,
                    committed_epoch,
                ) {
                    let resps = resps.into_values().collect_vec();
                    if is_finish_epoch {
                        // A finishing epoch carries no notifiers of its own.
                        assert!(info.notifiers.is_empty());
                        finished_jobs.push((*job_id, epoch, resps));
                        continue;
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, info));
                }
            }

            if !finished_jobs.is_empty() {
                // Tear down the partial graphs of all jobs that just finished.
                partial_graph_manager.remove_partial_graphs(
                    finished_jobs
                        .iter()
                        .map(|(job_id, ..)| to_partial_graph_id(self.database_id, Some(*job_id)))
                        .collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                debug!(epoch, %job_id, "finish creating job");
                // It's safe to remove the creating job, because on CompleteJobType::Finished,
                // all previous barriers have been collected and completed.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();
                self.finishing_jobs_collector
                    .collect(epoch, job_id, (resps, tracking_job));
            }
        }
        // NOTE(review): set to true whenever the per-barrier callback below fires;
        // presumably `start_completing` invokes the callback only for barriers it
        // skips over (non-checkpoint ones) — confirm against its definition.
        let mut observed_non_checkpoint = false;
        self.finishing_jobs_collector.advance_collected();
        // Do not complete past the first epoch that still waits on finishing jobs.
        let epoch_end_bound = self
            .finishing_jobs_collector
            .first_inflight_epoch()
            .map_or(Unbounded, |epoch| Excluded(epoch.prev));
        if let Some((epoch, resps, info)) = partial_graph_manager.start_completing(
            self.partial_graph_id,
            epoch_end_bound,
            |_, resps, post_collect_command| {
                observed_non_checkpoint = true;
                // Skipped barriers still contribute refresh info and collected-command
                // effects, even though they are not committed individually.
                self.handle_refresh_table_info(task, &resps);
                self.database_info.apply_collected_command(
                    &post_collect_command,
                    &resps,
                    hummock_version_stats,
                );
            },
        ) {
            self.handle_refresh_table_info(task, &resps);
            self.database_info.apply_collected_command(
                &info.post_collect_command,
                &resps,
                hummock_version_stats,
            );
            let mut resps_to_commit = resps.into_values().collect_vec();
            let mut staging_commit_info = self.database_info.take_staging_commit_info();
            // Merge in finished creating jobs whose finish epoch equals the barrier
            // being completed; their responses commit together with this barrier.
            if let Some((_, finished_jobs, _)) =
                self.finishing_jobs_collector
                    .take_collected_if(|collected_epoch| {
                        assert!(epoch <= collected_epoch.prev);
                        epoch == collected_epoch.prev
                    })
            {
                finished_jobs
                    .into_iter()
                    .for_each(|(_, (resps, tracking_job))| {
                        resps_to_commit.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
            }
            {
                let task = task.get_or_insert_default();
                Command::collect_commit_epoch_info(
                    &self.database_info,
                    &info,
                    &mut task.commit_info,
                    resps_to_commit,
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                // Remember the epoch being completed; `ack_completed` will match on it.
                self.completing_barrier = Some(info.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.epoch_infos
                    .try_insert(self.partial_graph_id, info)
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
            }
        } else if observed_non_checkpoint
            && self.database_info.has_pending_finished_jobs()
            && !partial_graph_manager.has_pending_checkpoint_barrier(self.partial_graph_id)
        {
            // Finished jobs are waiting but no checkpoint barrier is coming:
            // force one so they can be committed.
            periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, info) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(&mut task.commit_info, epoch, resps, &info);
                task.epoch_infos
                    .try_insert(to_partial_graph_id(self.database_id, Some(job_id)), info)
                    .expect("non duplicate");
            }
        }
    }
945
946    fn ack_completed(
947        &mut self,
948        partial_graph_manager: &mut PartialGraphManager,
949        command_prev_epoch: Option<u64>,
950        creating_job_epochs: Vec<(JobId, u64)>,
951    ) {
952        {
953            if let Some(prev_epoch) = self.completing_barrier.take() {
954                assert_eq!(command_prev_epoch, Some(prev_epoch));
955                self.committed_epoch = Some(prev_epoch);
956                partial_graph_manager.ack_completed(self.partial_graph_id, prev_epoch);
957            } else {
958                assert_eq!(command_prev_epoch, None);
959            };
960            for (job_id, epoch) in creating_job_epochs {
961                if let Some(job) = self.creating_streaming_job_controls.get_mut(&job_id) {
962                    job.ack_completed(partial_graph_manager, epoch);
963                }
964                // If the job is not found, it was dropped and already removed
965                // by `on_partial_graph_reset` while the completing task was running.
966            }
967        }
968    }
969
970    fn handle_refresh_table_info(
971        &self,
972        task: &mut Option<CompleteBarrierTask>,
973        resps: &HashMap<WorkerId, BarrierCompleteResponse>,
974    ) {
975        let list_finished_info = resps
976            .values()
977            .flat_map(|resp| resp.list_finished_sources.clone())
978            .collect::<Vec<_>>();
979        if !list_finished_info.is_empty() {
980            let task = task.get_or_insert_default();
981            task.list_finished_source_ids.extend(list_finished_info);
982        }
983
984        let load_finished_info = resps
985            .values()
986            .flat_map(|resp| resp.load_finished_sources.clone())
987            .collect::<Vec<_>>();
988        if !load_finished_info.is_empty() {
989            let task = task.get_or_insert_default();
990            task.load_finished_source_ids.extend(load_finished_info);
991        }
992
993        let refresh_finished_table_ids: Vec<JobId> = resps
994            .values()
995            .flat_map(|resp| {
996                resp.refresh_finished_tables
997                    .iter()
998                    .map(|table_id| table_id.as_job_id())
999            })
1000            .collect::<Vec<_>>();
1001        if !refresh_finished_table_ids.is_empty() {
1002            let task = task.get_or_insert_default();
1003            task.refresh_finished_table_job_ids
1004                .extend(refresh_finished_table_ids);
1005        }
1006    }
1007}
1008
1009impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    ///
    /// Runs a series of guard checks that may reject or short-circuit the command
    /// (consuming `notifiers` to report the outcome), then applies the command via
    /// `apply_command` and records the resulting in-flight barrier.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        partial_graph_manager: &mut PartialGraphManager,
        hummock_version_stats: &HummockVersionStats,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        // Split the scheduled item into the optional command and its notifiers.
        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        // An unresolved reschedule intent must never reach barrier injection.
        debug_assert!(
            !matches!(
                &command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent should be resolved before injection"
        );

        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                // A creating snapshot-backfill job cannot be cancelled as part of a
                // multi-job drop: reject the whole command and notify the callers.
                for job_to_cancel in streaming_job_ids {
                    if self
                        .creating_streaming_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
                                the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                && let Some(creating_job) =
                    self.creating_streaming_job_controls.get_mut(job_to_drop)
                && creating_job.drop(&mut notifiers, partial_graph_manager)
            {
                // Single-job drop of a creating job was fully handled by the job
                // control itself; no barrier needs to be injected for it.
                return Ok(());
            }
        }

        // Throttling a creating snapshot-backfill job together with other jobs is
        // not supported; reject and notify.
        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(creating_job_id) = jobs
                .iter()
                .find(|job| self.creating_streaming_job_controls.contains_key(*job))
        {
            warn!(
                job_id = %creating_job_id,
                "ignore multi-job throttle command on creating snapshot backfill streaming job"
            );
            for notifier in notifiers {
                // NOTE(review): this message reads "will be kept recovery." — likely
                // missing a word ("kept until recovery"); fixing it would change a
                // runtime string, so it is only flagged here.
                notifier
                    .notify_start_failed(anyhow!("cannot alter rate limit for snapshot backfill streaming job with other jobs, \
                                the original rate limit will be kept recovery.").into());
            }
            return Ok(());
        };

        // Reject a reschedule that touches fragments pinned by creating jobs with
        // unreschedulable backfill fragments (closed over no-shuffle dependencies).
        if let Some(Command::RescheduleIntent {
            reschedule_plan: Some(reschedule_plan),
            ..
        }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            let blocked_job_ids =
                self.collect_reschedule_blocked_jobs_for_creating_jobs_inflight()?;
            let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
                &reschedule_plan.reschedules,
                &reschedule_plan.fragment_actors,
                &blocked_job_ids,
            );
            if !blocked_reschedule_job_ids.is_empty() {
                warn!(
                    blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
                    "reject reschedule fragments related to creating unreschedulable backfill jobs"
                );
                for notifier in notifiers {
                    notifier.notify_start_failed(
                        anyhow!(
                            "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
                            blocked_reschedule_job_ids
                        )
                            .into(),
                    );
                }
                return Ok(());
            }
        }

        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.creating_streaming_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            // skip the command when there is nothing to do with the barrier
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        // Snapshot-backfill creation cannot start while the database is paused.
        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
                        .into(),
                );
            }
            return Ok(());
        }

        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        // Tracing related stuff
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        let epoch = barrier_info.epoch();
        let ApplyCommandInfo { jobs_to_wait } = match self.apply_command(
            command,
            &mut notifiers,
            barrier_info,
            partial_graph_manager,
            hummock_version_stats,
            adaptive_parallelism_strategy,
            worker_nodes,
        ) {
            Ok(info) => {
                // On success, `apply_command` has taken ownership of all notifiers.
                assert!(notifiers.is_empty());
                info
            }
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Record the in-flight barrier.
        self.enqueue_command(epoch, jobs_to_wait);

        Ok(())
    }
1186}