// risingwave_meta/barrier/checkpoint/control.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::ops::Bound::{Excluded, Unbounded};
19use std::task::Poll;
20
21use anyhow::anyhow;
22use fail::fail_point;
23use itertools::Itertools;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
27use risingwave_common::system_param::AdaptiveParallelismStrategy;
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
30use risingwave_meta_model::WorkerId;
31use risingwave_pb::common::WorkerNode;
32use risingwave_pb::hummock::HummockVersionStats;
33use risingwave_pb::id::{FragmentId, PartialGraphId};
34use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
35use risingwave_pb::stream_plan::stream_node::NodeBody;
36use risingwave_pb::stream_service::BarrierCompleteResponse;
37use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
38use tracing::{debug, warn};
39
40use crate::barrier::cdc_progress::CdcProgress;
41use crate::barrier::checkpoint::independent_job::IndependentCheckpointJobControl;
42use crate::barrier::checkpoint::recovery::{
43    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
44    RecoveringStateAction,
45};
46use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
47use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
48use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
49use crate::barrier::notifier::Notifier;
50use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphStat};
51use crate::barrier::progress::TrackingJob;
52use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
53use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
54use crate::barrier::utils::{BarrierItemCollector, collect_independent_job_commit_epoch_info};
55use crate::barrier::{
56    BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
57};
58use crate::controller::fragment::InflightFragmentInfo;
59use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
60use crate::manager::MetaSrvEnv;
61
62fn fragment_has_online_unreschedulable_scan(fragment: &InflightFragmentInfo) -> bool {
63    let mut has_unreschedulable_scan = false;
64    visit_stream_node_cont(&fragment.nodes, |node| {
65        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
66            let scan_type = stream_scan.stream_scan_type();
67            if !scan_type.is_reschedulable(true) {
68                has_unreschedulable_scan = true;
69                return false;
70            }
71        }
72        true
73    });
74    has_unreschedulable_scan
75}
76
77fn collect_fragment_upstream_fragment_ids(
78    fragment: &InflightFragmentInfo,
79    upstream_fragment_ids: &mut HashSet<FragmentId>,
80) {
81    visit_stream_node_cont(&fragment.nodes, |node| {
82        if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref() {
83            upstream_fragment_ids.insert(merge.upstream_fragment_id);
84        }
85        true
86    });
87}
88
89use crate::model::ActorId;
90use crate::rpc::metrics::GLOBAL_META_METRICS;
91use crate::{MetaError, MetaResult};
92
/// Top-level coordinator of per-database checkpoint (barrier) progress on the
/// meta node. Each database is tracked independently as either running or
/// recovering.
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    /// Per-database control state: running normally or going through recovery.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    /// Latest hummock version stats, refreshed whenever a barrier completes.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The max barrier nums in flight
    pub(crate) in_flight_barrier_nums: usize,
}
100
impl CheckpointControl {
    /// Creates an empty control: no databases are tracked until the first
    /// streaming job is created in one.
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    /// Rebuilds the control after meta-node recovery.
    ///
    /// Databases in `databases` resume in the running state; databases in
    /// `failed_databases` enter the recovering (resetting) state with the given
    /// partial graph ids still pending reset. Shared actor info of databases
    /// absent from both maps is dropped.
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>, /* `database_id` -> set of resetting partial graph ids */
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(
                    |(database_id, resetting_partial_graphs)| {
                        (
                            database_id,
                            DatabaseCheckpointControlStatus::Recovering(
                                DatabaseRecoveringState::new_resetting(
                                    database_id,
                                    resetting_partial_graphs,
                                ),
                            ),
                        )
                    },
                ))
                .collect(),
            hummock_version_stats,
        }
    }

    /// Applies the output of a completed barrier task: refreshes the hummock
    /// version stats and acks the completed epochs on each involved database.
    ///
    /// Panics if an involved database is not running, since a database must
    /// wait for its completing command before entering recovery.
    pub(crate) fn ack_completed(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        output: BarrierCompleteOutput,
    ) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, independent_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(
                    partial_graph_manager,
                    command_prev_epoch,
                    independent_job_epochs,
                );
        }
    }

    /// Gathers the next barrier-completion task, if any, by polling every
    /// running database; all databases accumulate into a single shared `task`.
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                // Recovering databases have no barriers to complete.
                continue;
            };
            database.next_complete_barrier_task(
                periodic_barriers,
                partial_graph_manager,
                &mut task,
                &self.hummock_version_stats,
            );
        }
        task
    }

    /// Routes a barrier collected from `partial_graph_id` to its database.
    ///
    /// A barrier collected for a recovering database indicates a bug: it
    /// panics in debug builds and is logged and ignored in release builds.
    pub(crate) fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier<'_>,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(_) => {
                if cfg!(debug_assertions) {
                    panic!(
                        "receive collected barrier {:?} on recovering database {} from partial graph {}",
                        collected_barrier, database_id, partial_graph_id
                    );
                } else {
                    warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
                }
                Ok(())
            }
        }
    }

    /// Ids of databases currently in the recovering state.
    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    /// Ids of databases currently in the running state.
    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Inflight graph info of `database_id`, or `None` when the database is
    /// unknown or not running.
    pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
        self.databases
            .get(&database_id)
            .and_then(|database| database.running_state())
            .map(|database| &database.database_info)
    }

    /// Whether any database may currently host a snapshot-backfilling job
    /// (recovering databases conservatively count as "maybe").
    pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        self.databases
            .values()
            .any(|database| database.may_have_snapshot_backfilling_jobs())
    }

    /// Handles a newly scheduled barrier for `new_barrier.database_id`.
    ///
    /// With a command: resolves cross-db snapshot-backfill epochs, lazily
    /// creates the database control on the first normal `CreateStreamingJob`,
    /// and injects the command barrier. Commands that cannot be applied notify
    /// their notifiers of failure (or success, for no-op commands on an empty
    /// database) and return `Ok(())` rather than erroring.
    ///
    /// Without a command: injects a plain barrier unless the database is
    /// absent, not running, or already at the in-flight barrier limit.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        partial_graph_manager: &mut PartialGraphManager,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                // Fill in each cross-db upstream table's snapshot epoch from the
                // committed epoch of the running database that owns the table.
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        // No running database owns the upstream table: fail the
                        // command's notifiers instead of injecting the barrier.
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        // First job in a new database: only a normal create may
                        // bootstrap the database's control state.
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        // Register the database-level partial graph before use.
                        let adder = partial_graph_manager.add_partial_graph(
                            to_partial_graph_id(database_id, None),
                            DatabaseCheckpointControlMetrics::new(database_id),
                        );
                        adder.added();
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        // No-ops on a database with no streaming jobs yet:
                        // acknowledge immediately without injecting a barrier.
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::AlterSubscriptionRetention { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip new barrier for database which is not running.
                return Ok(());
            };
            if partial_graph_manager.inflight_barrier_num(database.partial_graph_id)
                >= self.in_flight_barrier_nums
            {
                // Skip new barrier with no explicit command when the database should pause inject additional barrier
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        }
    }

    /// Aggregates backfill progress of every running database: the database's
    /// own jobs plus its independent checkpoint jobs, keyed by job id.
    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_backfill_progress(),
            );
            // Progress of independent checkpoint jobs
            for (job_id, job) in &database_checkpoint_control.independent_checkpoint_job_controls {
                progress.extend([(*job_id, job.gen_backfill_progress())]);
            }
        }
        progress
    }

    /// Aggregates per-fragment backfill progress of every running database,
    /// including its independent checkpoint jobs.
    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        let mut progress = Vec::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_fragment_backfill_progress(),
            );
            for job in database_checkpoint_control
                .independent_checkpoint_job_controls
                .values()
            {
                progress.extend(job.gen_fragment_backfill_progress());
            }
        }
        progress
    }

    /// Aggregates CDC progress of every running database, keyed by job id.
    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

    /// Ids of databases (running or recovering) whose state becomes invalid
    /// after worker `worker_id` fails.
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases
            .iter_mut()
            .filter_map(
                move |(database_id, database_status)| match database_status {
                    DatabaseCheckpointControlStatus::Running(control) => {
                        if !control.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        if !state.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                },
            )
    }
}
484
/// Event surfaced by [`CheckpointControl::next_event`] when a recovering
/// database is ready for its next recovery transition.
pub(crate) enum CheckpointControlEvent<'a> {
    /// The database's workers have been reset; it may begin initializing.
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    /// Initialization finished; the database may enter the running state.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}
489
impl CheckpointControl {
    /// Handles the workers' acknowledgement that `partial_graph_id` was reset.
    ///
    /// For a running database the reset must target an independent checkpoint
    /// job, which is removed from tracking (a missing job panics in debug
    /// builds and is logged and ignored otherwise); a database-level reset on a
    /// running database is unreachable. For a recovering database the
    /// responses are fed into its recovery state machine.
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(independent_job_id) = independent_job_id {
                    match database
                        .independent_checkpoint_job_controls
                        .remove(&independent_job_id)
                    {
                        Some(independent_job) => {
                            independent_job.on_partial_graph_reset();
                        }
                        None => {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "receive reset partial graph resp on non-existing independent job {independent_job_id} in database {database_id}"
                                )
                            }
                            warn!(
                                %database_id,
                                %independent_job_id,
                                "ignore reset partial graph resp on non-existing independent job on running database"
                            );
                        }
                    }
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }

    /// Marks `partial_graph_id` as initialized in its database's recovery
    /// state. Panics if the database is already running, which should never
    /// receive this notification.
    pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not have partial graph initialized when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.partial_graph_initialized(partial_graph_id);
            }
        }
    }

    /// Returns a future that resolves to the next recovery transition among
    /// all recovering databases; it stays pending while none is ready.
    ///
    /// The future must not be polled again after it resolves (it panics).
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        // `self` is kept in an Option so it can be moved out once an event is
        // ready, letting the resolved action hold the mutable borrow.
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}
580
/// Lifecycle status of a database's checkpoint control.
pub(crate) enum DatabaseCheckpointControlStatus {
    /// Barriers are injected and collected normally.
    Running(DatabaseCheckpointControl),
    /// The database is being reset/re-initialized after a failure.
    Recovering(DatabaseRecoveringState),
}
585
586impl DatabaseCheckpointControlStatus {
587    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
588        match self {
589            DatabaseCheckpointControlStatus::Running(state) => Some(state),
590            DatabaseCheckpointControlStatus::Recovering(_) => None,
591        }
592    }
593
594    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
595        match self {
596            DatabaseCheckpointControlStatus::Running(state) => Some(state),
597            DatabaseCheckpointControlStatus::Recovering(_) => None,
598        }
599    }
600
601    fn may_have_snapshot_backfilling_jobs(&self) -> bool {
602        self.running_state()
603            .map(|database| {
604                database
605                    .independent_checkpoint_job_controls
606                    .values()
607                    .any(|job| job.is_snapshot_backfilling())
608            })
609            .unwrap_or(true) // there can be snapshot backfilling jobs when the database is recovering.
610    }
611
612    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
613        match self {
614            DatabaseCheckpointControlStatus::Running(state) => state,
615            DatabaseCheckpointControlStatus::Recovering(_) => {
616                panic!("should be at running: {}", reason)
617            }
618        }
619    }
620}
621
/// Per-database barrier metrics, each guarded by the `database_id` label.
pub(in crate::barrier) struct DatabaseCheckpointControlMetrics {
    /// Histogram of observed barrier latencies (seconds).
    barrier_latency: LabelGuardedHistogram,
    /// Gauge of barriers currently in flight.
    in_flight_barrier_nums: LabelGuardedIntGauge,
    /// Gauge of all barriers: in-flight plus collected.
    all_barrier_nums: LabelGuardedIntGauge,
}
627
628impl DatabaseCheckpointControlMetrics {
629    pub(in crate::barrier) fn new(database_id: DatabaseId) -> Self {
630        let database_id_str = database_id.to_string();
631        let barrier_latency = GLOBAL_META_METRICS
632            .barrier_latency
633            .with_guarded_label_values(&[&database_id_str]);
634        let in_flight_barrier_nums = GLOBAL_META_METRICS
635            .in_flight_barrier_nums
636            .with_guarded_label_values(&[&database_id_str]);
637        let all_barrier_nums = GLOBAL_META_METRICS
638            .all_barrier_nums
639            .with_guarded_label_values(&[&database_id_str]);
640        Self {
641            barrier_latency,
642            in_flight_barrier_nums,
643            all_barrier_nums,
644        }
645    }
646}
647
648impl PartialGraphStat for DatabaseCheckpointControlMetrics {
649    fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
650        self.barrier_latency.observe(barrier_latency_secs);
651    }
652
653    fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize) {
654        self.in_flight_barrier_nums.set(inflight_barrier_num as _);
655        self.all_barrier_nums
656            .set((inflight_barrier_num + collected_barrier_num) as _);
657    }
658}
659
/// Controls the concurrent execution of commands.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    /// Partial graph id of the database-level graph (independent job id = `None`).
    partial_graph_id: PartialGraphId,
    pub(super) state: BarrierWorkerState,

    /// Per-epoch record of the independent jobs whose finish must be collected
    /// before the command at that epoch completes.
    finishing_jobs_collector:
        BarrierItemCollector<JobId, (Vec<BarrierCompleteResponse>, TrackingJob), ()>,
    /// The barrier that are completing.
    /// Some(`prev_epoch`)
    completing_barrier: Option<u64>,

    /// Latest committed epoch; `None` until the first commit after creation.
    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    /// Control state for each independent checkpoint job, keyed by job id.
    pub independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
}
677
impl DatabaseCheckpointControl {
    /// Creates fresh control state for a new database: empty inflight info, no
    /// independent jobs, and no committed epoch yet.
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            partial_graph_id: to_partial_graph_id(database_id, None),
            state: BarrierWorkerState::new(),
            finishing_jobs_collector: BarrierItemCollector::new(),
            completing_barrier: None,
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            independent_checkpoint_job_controls: Default::default(),
        }
    }

    /// Rebuilds control state for a database recovered at `committed_epoch`,
    /// with the recovered inflight info and independent job controls.
    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
    ) -> Self {
        Self {
            database_id,
            partial_graph_id: to_partial_graph_id(database_id, None),
            state,
            finishing_jobs_collector: BarrierItemCollector::new(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            independent_checkpoint_job_controls,
        }
    }

    /// Whether this database stays valid after worker `worker_id` fails: true
    /// only if neither the database graph nor any independent checkpoint job
    /// runs on that worker (a job without fragment info counts as unaffected).
    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        !self.database_info.contains_worker(worker_id as _)
            && self
                .independent_checkpoint_job_controls
                .values()
                .all(|job| {
                    job.fragment_infos()
                        .map(|fragment_infos| {
                            !InflightFragmentInfo::contains_worker(
                                fragment_infos.values(),
                                worker_id,
                            )
                        })
                        .unwrap_or(true)
                })
    }

    /// Enqueue a barrier command: records the independent jobs whose finish
    /// the command at `epoch` must wait for. No-op when the set is empty.
    fn enqueue_command(&mut self, epoch: EpochPair, independent_jobs_to_wait: HashSet<JobId>) {
        let prev_epoch = epoch.prev;
        tracing::trace!(prev_epoch, ?independent_jobs_to_wait, "enqueue command");
        if !independent_jobs_to_wait.is_empty() {
            self.finishing_jobs_collector
                .enqueue(epoch, independent_jobs_to_wait, ());
        }
    }

    /// Handles a barrier collected from one of this database's partial graphs.
    ///
    /// A barrier belonging to an independent checkpoint job is forwarded to
    /// that job's control, which may request that the next barrier be forced
    /// into a checkpoint. A database-level barrier (no independent job id)
    /// needs no extra bookkeeping here.
    fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier<'_>,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let prev_epoch = collected_barrier.epoch.prev;
        tracing::trace!(
            prev_epoch,
            partial_graph_id = %partial_graph_id,
            "barrier collected"
        );
        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
        assert_eq!(self.database_id, database_id);
        if let Some(independent_job_id) = independent_job_id {
            let job = self
                .independent_checkpoint_job_controls
                .get_mut(&independent_job_id)
                .expect("should exist");
            let should_force_checkpoint = job.collect(collected_barrier);
            if should_force_checkpoint {
                periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
            }
        }
        Ok(())
    }
}
767
768impl DatabaseCheckpointControl {
769    /// return creating job table fragment id -> (backfill progress epoch , {`upstream_mv_table_id`})
770    fn collect_backfill_pinned_upstream_log_epoch(
771        &self,
772    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
773        self.independent_checkpoint_job_controls
774            .iter()
775            .map(|(job_id, job)| (*job_id, job.pinned_upstream_log_epoch()))
776            .collect()
777    }
778
779    fn collect_no_shuffle_fragment_relations_for_reschedule_check(
780        &self,
781    ) -> Vec<(FragmentId, FragmentId)> {
782        let mut no_shuffle_relations = Vec::new();
783        for fragment in self.database_info.fragment_infos() {
784            let downstream_fragment_id = fragment.fragment_id;
785            visit_stream_node_cont(&fragment.nodes, |node| {
786                if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
787                    && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
788                {
789                    no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
790                }
791                true
792            });
793        }
794
795        for job in self.independent_checkpoint_job_controls.values() {
796            if let Some(fragment_infos) = job.fragment_infos() {
797                for fragment_info in fragment_infos.values() {
798                    let downstream_fragment_id = fragment_info.fragment_id;
799                    visit_stream_node_cont(&fragment_info.nodes, |node| {
800                        if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
801                            && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
802                        {
803                            no_shuffle_relations
804                                .push((merge.upstream_fragment_id, downstream_fragment_id));
805                        }
806                        true
807                    });
808                }
809            }
810        }
811        no_shuffle_relations
812    }
813
814    fn collect_reschedule_blocked_jobs_for_independent_jobs_inflight(
815        &self,
816    ) -> MetaResult<HashSet<JobId>> {
817        let mut initial_blocked_fragment_ids = HashSet::new();
818        for job in self.independent_checkpoint_job_controls.values() {
819            if let Some(fragment_infos) = job.fragment_infos() {
820                for fragment_info in fragment_infos.values() {
821                    if fragment_has_online_unreschedulable_scan(fragment_info) {
822                        initial_blocked_fragment_ids.insert(fragment_info.fragment_id);
823                        collect_fragment_upstream_fragment_ids(
824                            fragment_info,
825                            &mut initial_blocked_fragment_ids,
826                        );
827                    }
828                }
829            }
830        }
831
832        let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
833        if !initial_blocked_fragment_ids.is_empty() {
834            let no_shuffle_relations =
835                self.collect_no_shuffle_fragment_relations_for_reschedule_check();
836            let (forward_edges, backward_edges) =
837                build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
838            let initial_blocked_fragment_ids: Vec<_> =
839                initial_blocked_fragment_ids.iter().copied().collect();
840            for ensemble in find_no_shuffle_graphs(
841                &initial_blocked_fragment_ids,
842                &forward_edges,
843                &backward_edges,
844            )? {
845                blocked_fragment_ids.extend(ensemble.fragments());
846            }
847        }
848
849        let mut blocked_job_ids = HashSet::new();
850        blocked_job_ids.extend(
851            blocked_fragment_ids
852                .into_iter()
853                .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
854        );
855        Ok(blocked_job_ids)
856    }
857
858    fn collect_reschedule_blocked_job_ids(
859        &self,
860        reschedules: &HashMap<FragmentId, Reschedule>,
861        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
862        blocked_job_ids: &HashSet<JobId>,
863    ) -> HashSet<JobId> {
864        let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
865        affected_fragment_ids.extend(fragment_actors.keys().copied());
866        for reschedule in reschedules.values() {
867            affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
868            affected_fragment_ids.extend(
869                reschedule
870                    .upstream_fragment_dispatcher_ids
871                    .iter()
872                    .map(|(fragment_id, _)| *fragment_id),
873            );
874        }
875
876        affected_fragment_ids
877            .into_iter()
878            .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
879            .filter(|job_id| blocked_job_ids.contains(job_id))
880            .collect()
881    }
882
    /// Try to assemble the next `CompleteBarrierTask` for this database,
    /// accumulating into `task` (lazily created on first use).
    ///
    /// Order of work: (1) drain completable barriers from the independent
    /// creating jobs (possible only once the database has a committed epoch),
    /// removing jobs that reached their finish epoch; (2) start completing
    /// the database-level partial graph up to (but excluding) the first
    /// epoch still waiting on a finishing job; (3) flush the independent-job
    /// results gathered in step (1) into the task.
    fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
        task: &mut Option<CompleteBarrierTask>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough
        let mut independent_jobs_task = vec![];
        if let Some(committed_epoch) = self.committed_epoch {
            // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough
            let mut finished_jobs = Vec::new();
            // The earliest database barrier still in flight; passed to each
            // job so it can bound how far it may complete.
            let min_upstream_inflight_barrier = partial_graph_manager
                .first_inflight_barrier(self.partial_graph_id)
                .map(|epoch| epoch.prev);
            for (job_id, job) in &mut self.independent_checkpoint_job_controls {
                let IndependentCheckpointJobControl::CreatingStreamingJob(job) = job;
                if let Some((epoch, resps, info, is_finish_epoch)) = job.start_completing(
                    partial_graph_manager,
                    min_upstream_inflight_barrier,
                    committed_epoch,
                ) {
                    let resps = resps.into_values().collect_vec();
                    if is_finish_epoch {
                        assert!(info.notifiers.is_empty());
                        finished_jobs.push((*job_id, epoch, resps));
                        continue;
                    };
                    independent_jobs_task.push((*job_id, epoch, resps, info));
                }
            }

            // Finished jobs no longer need their dedicated partial graphs.
            if !finished_jobs.is_empty() {
                partial_graph_manager.remove_partial_graphs(
                    finished_jobs
                        .iter()
                        .map(|(job_id, ..)| to_partial_graph_id(self.database_id, Some(*job_id)))
                        .collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                debug!(epoch, %job_id, "finish creating job");
                // It's safe to remove the creating job, because on CompleteJobType::Finished,
                // all previous barriers have been collected and completed.
                let IndependentCheckpointJobControl::CreatingStreamingJob(creating_streaming_job) =
                    self.independent_checkpoint_job_controls
                        .remove(&job_id)
                        .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();
                // Hand the finished job to the collector keyed by the database
                // barrier epoch that is waiting for it.
                self.finishing_jobs_collector
                    .collect(epoch, job_id, (resps, tracking_job));
            }
        }
        let mut observed_non_checkpoint = false;
        self.finishing_jobs_collector.advance_collected();
        // Do not complete past the first database barrier that is still
        // waiting for a finishing job.
        let epoch_end_bound = self
            .finishing_jobs_collector
            .first_inflight_epoch()
            .map_or(Unbounded, |epoch| Excluded(epoch.prev));
        if let Some((epoch, resps, info)) = partial_graph_manager.start_completing(
            self.partial_graph_id,
            epoch_end_bound,
            // Callback for barriers that `start_completing` drains without
            // returning (per the flag name, presumably non-checkpoint
            // barriers — TODO confirm against `start_completing`): apply
            // their collected commands and staged refresh info immediately.
            |_, resps, post_collect_command| {
                observed_non_checkpoint = true;
                self.handle_refresh_table_info(task, &resps);
                self.database_info.apply_collected_command(
                    &post_collect_command,
                    &resps,
                    hummock_version_stats,
                );
            },
        ) {
            self.handle_refresh_table_info(task, &resps);
            self.database_info.apply_collected_command(
                &info.post_collect_command,
                &resps,
                hummock_version_stats,
            );
            let mut resps_to_commit = resps.into_values().collect_vec();
            let mut staging_commit_info = self.database_info.take_staging_commit_info();
            // Merge in any finished independent jobs whose waiting barrier is
            // exactly the epoch being committed now.
            if let Some((_, finished_jobs, _)) =
                self.finishing_jobs_collector
                    .take_collected_if(|collected_epoch| {
                        assert!(epoch <= collected_epoch.prev);
                        epoch == collected_epoch.prev
                    })
            {
                finished_jobs
                    .into_iter()
                    .for_each(|(_, (resps, tracking_job))| {
                        resps_to_commit.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
            }
            {
                let task = task.get_or_insert_default();
                Command::collect_commit_epoch_info(
                    &self.database_info,
                    &info,
                    &mut task.commit_info,
                    resps_to_commit,
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                // Remember which epoch is being committed; cleared again in
                // `ack_completed`.
                self.completing_barrier = Some(info.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.epoch_infos
                    .try_insert(self.partial_graph_id, info)
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
            }
        } else if observed_non_checkpoint
            && self.database_info.has_pending_finished_jobs()
            && !partial_graph_manager.has_pending_checkpoint_barrier(self.partial_graph_id)
        {
            // Barriers were drained but nothing was committed while jobs wait
            // to be finished and no checkpoint barrier is pending — request one.
            periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
        }
        if !independent_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, info) in independent_jobs_task {
                collect_independent_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    &info,
                );
                task.epoch_infos
                    .try_insert(to_partial_graph_id(self.database_id, Some(job_id)), info)
                    .expect("non duplicate");
            }
        }
    }
1018
1019    fn ack_completed(
1020        &mut self,
1021        partial_graph_manager: &mut PartialGraphManager,
1022        command_prev_epoch: Option<u64>,
1023        independent_job_epochs: Vec<(JobId, u64)>,
1024    ) {
1025        {
1026            if let Some(prev_epoch) = self.completing_barrier.take() {
1027                assert_eq!(command_prev_epoch, Some(prev_epoch));
1028                self.committed_epoch = Some(prev_epoch);
1029                partial_graph_manager.ack_completed(self.partial_graph_id, prev_epoch);
1030            } else {
1031                assert_eq!(command_prev_epoch, None);
1032            };
1033            for (job_id, epoch) in independent_job_epochs {
1034                if let Some(job) = self.independent_checkpoint_job_controls.get_mut(&job_id) {
1035                    job.ack_completed(partial_graph_manager, epoch);
1036                }
1037                // If the job is not found, it was dropped and already removed
1038                // by `on_partial_graph_reset` while the completing task was running.
1039            }
1040        }
1041    }
1042
1043    fn handle_refresh_table_info(
1044        &self,
1045        task: &mut Option<CompleteBarrierTask>,
1046        resps: &HashMap<WorkerId, BarrierCompleteResponse>,
1047    ) {
1048        let list_finished_info = resps
1049            .values()
1050            .flat_map(|resp| resp.list_finished_sources.clone())
1051            .collect::<Vec<_>>();
1052        if !list_finished_info.is_empty() {
1053            let task = task.get_or_insert_default();
1054            task.list_finished_source_ids.extend(list_finished_info);
1055        }
1056
1057        let load_finished_info = resps
1058            .values()
1059            .flat_map(|resp| resp.load_finished_sources.clone())
1060            .collect::<Vec<_>>();
1061        if !load_finished_info.is_empty() {
1062            let task = task.get_or_insert_default();
1063            task.load_finished_source_ids.extend(load_finished_info);
1064        }
1065
1066        let refresh_finished_table_ids: Vec<JobId> = resps
1067            .values()
1068            .flat_map(|resp| {
1069                resp.refresh_finished_tables
1070                    .iter()
1071                    .map(|table_id| table_id.as_job_id())
1072            })
1073            .collect::<Vec<_>>();
1074        if !refresh_finished_table_ids.is_empty() {
1075            let task = task.get_or_insert_default();
1076            task.refresh_finished_table_job_ids
1077                .extend(refresh_finished_table_ids);
1078        }
1079    }
1080}
1081
1082impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    ///
    /// Runs a series of guard checks on `command` (each may consume the
    /// notifiers and return early without injecting a barrier), then advances
    /// the barrier state, applies the command, and enqueues the in-flight
    /// barrier together with the independent jobs it must wait for.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        partial_graph_manager: &mut PartialGraphManager,
        hummock_version_stats: &HummockVersionStats,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        // The epoch the new barrier will carry: successor of the latest
        // in-flight prev epoch.
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        // Normalize into (Option<Command>, notifiers) so barrier-only
        // injections share the same path.
        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        debug_assert!(
            !matches!(
                &command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent should be resolved before injection"
        );

        // Guard: dropping jobs that are independent (snapshot backfill)
        // creating jobs needs special handling.
        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                // Reject multi-job cancels that include a creating snapshot
                // backfill job: fail the notifiers and skip injection.
                for job_to_cancel in streaming_job_ids {
                    if self
                        .independent_checkpoint_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
                                the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                && let Some(job) = self
                    .independent_checkpoint_job_controls
                    .get_mut(job_to_drop)
            {
                // Single-job drop of an independent job is delegated to the
                // job control; if it took over the drop, skip injection.
                let dropped = job.drop(&mut notifiers, partial_graph_manager);
                if dropped {
                    return Ok(());
                }
            }
        }

        // Guard: reject multi-job throttle commands that touch an independent
        // checkpoint job.
        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(independent_job_id) = jobs
                .iter()
                .find(|job| self.independent_checkpoint_job_controls.contains_key(*job))
        {
            warn!(
                job_id = %independent_job_id,
                "ignore multi-job throttle command on independent checkpoint job"
            );
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot alter rate limit for independent checkpoint job with other jobs, \
                                the original rate limit will be kept during recovery."
                    )
                    .into(),
                );
            }
            return Ok(());
        };

        // Guard: reject reschedules that touch jobs blocked by in-flight
        // independent jobs with unreschedulable backfill fragments.
        if let Some(Command::RescheduleIntent {
            reschedule_plan: Some(reschedule_plan),
            ..
        }) = &command
            && !self.independent_checkpoint_job_controls.is_empty()
        {
            let blocked_job_ids =
                self.collect_reschedule_blocked_jobs_for_independent_jobs_inflight()?;
            let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
                &reschedule_plan.reschedules,
                &reschedule_plan.fragment_actors,
                &blocked_job_ids,
            );
            if !blocked_reschedule_job_ids.is_empty() {
                warn!(
                    blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
                    "reject reschedule fragments related to creating unreschedulable backfill jobs"
                );
                for notifier in notifiers {
                    notifier.notify_start_failed(
                        anyhow!(
                            "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
                            blocked_reschedule_job_ids
                        )
                            .into(),
                    );
                }
                return Ok(());
            }
        }

        // Guard: an empty database (and anything but a CreateStreamingJob)
        // has nothing to inject a barrier into.
        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.independent_checkpoint_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            // skip the command when there is nothing to do with the barrier
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        // Guard: snapshot backfill cannot start while the database is paused.
        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
                        .into(),
                );
            }
            return Ok(());
        }

        // All guards passed: advance barrier state and build the barrier info.
        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        // Tracing related stuff
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        let epoch = barrier_info.epoch();
        // Apply the command (injects the barrier); on success the notifiers
        // must all have been consumed, on failure they are failed here.
        let ApplyCommandInfo { jobs_to_wait } = match self.apply_command(
            command,
            &mut notifiers,
            barrier_info,
            partial_graph_manager,
            hummock_version_stats,
            adaptive_parallelism_strategy,
            worker_nodes,
        ) {
            Ok(info) => {
                assert!(notifiers.is_empty());
                info
            }
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Record the in-flight barrier.
        self.enqueue_command(epoch, jobs_to_wait);

        Ok(())
    }
1266}