risingwave_meta/barrier/checkpoint/control.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::ops::Bound::{Excluded, Unbounded};
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::{FragmentId, PartialGraphId};
use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcProgress;
use crate::barrier::checkpoint::independent_job::IndependentCheckpointJobControl;
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphStat};
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{BarrierItemCollector, collect_independent_job_commit_epoch_info};
use crate::barrier::{
    BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
};
use crate::controller::fragment::InflightFragmentInfo;
use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
use crate::manager::MetaSrvEnv;

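/// Returns `true` if the fragment contains a `StreamScan` whose scan type is not
/// reschedulable while the job is online.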
fn fragment_has_online_unreschedulable_scan(fragment: &InflightFragmentInfo) -> bool {
    let mut has_unreschedulable_scan = false;
    visit_stream_node_cont(&fragment.nodes, |node| {
        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
            let scan_type = stream_scan.stream_scan_type();
            if !scan_type.is_reschedulable(true) {
                has_unreschedulable_scan = true;
                return false;
            }
        }
        true
    });
    has_unreschedulable_scan
}

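/// Collects the upstream fragment ids of all `Merge` nodes in the fragment into
/// `upstream_fragment_ids`.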
fn collect_fragment_upstream_fragment_ids(
    fragment: &InflightFragmentInfo,
    upstream_fragment_ids: &mut HashSet<FragmentId>,
) {
    visit_stream_node_cont(&fragment.nodes, |node| {
        if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref() {
            upstream_fragment_ids.insert(merge.upstream_fragment_id);
        }
        true
    });
}

use crate::model::ActorId;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};

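/// Cluster-level checkpoint control: tracks the checkpoint status of every database
/// together with the latest hummock version stats.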
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The maximum number of in-flight barriers.
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

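    /// Rebuilds the checkpoint control after a recovery: recovered databases resume as
    /// running, while failed databases enter the resetting stage of recovery.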
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>, /* `database_id` -> set of resetting partial graph ids */
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(
                    |(database_id, resetting_partial_graphs)| {
                        (
                            database_id,
                            DatabaseCheckpointControlStatus::Recovering(
                                DatabaseRecoveringState::new_resetting(
                                    database_id,
                                    resetting_partial_graphs,
                                ),
                            ),
                        )
                    },
                ))
                .collect(),
            hummock_version_stats,
        }
    }

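    /// Applies the output of a completed barrier task: refreshes the cached hummock
    /// version stats and acks the completed epochs to each affected database.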
    pub(crate) fn ack_completed(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        output: BarrierCompleteOutput,
    ) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, independent_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for the completing command before entering recovery")
                .ack_completed(
                    partial_graph_manager,
                    command_prev_epoch,
                    independent_job_epochs,
                );
        }
    }

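    /// Polls every running database and merges its next completable barrier, if any,
    /// into a single `CompleteBarrierTask`.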
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            database.next_complete_barrier_task(
                periodic_barriers,
                partial_graph_manager,
                &mut task,
                &self.hummock_version_stats,
            );
        }
        task
    }

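    /// Routes a collected barrier to its owning database. Barriers collected for a
    /// recovering database are ignored (and panic in debug builds).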
    pub(crate) fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier<'_>,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(_) => {
                if cfg!(debug_assertions) {
                    panic!(
                        "receive collected barrier {:?} on recovering database {} from partial graph {}",
                        collected_barrier, database_id, partial_graph_id
                    );
                } else {
                    warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
                }
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
        self.databases
            .get(&database_id)
            .and_then(|database| database.running_state())
            .map(|database| &database.database_info)
    }

    pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        self.databases
            .values()
            .any(|database| database.may_have_snapshot_backfilling_jobs())
    }

    /// Handles a newly scheduled barrier for a database. Most command-level failures are
    /// reported through the command's notifiers and `Ok(())` is returned; an error is
    /// returned only when injecting the barrier itself fails.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        partial_graph_manager: &mut PartialGraphManager,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        let adder = partial_graph_manager.add_partial_graph(
                            to_partial_graph_id(database_id, None),
                            DatabaseCheckpointControlMetrics::new(database_id),
                        );
                        adder.added();
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::AlterSubscriptionRetention { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for a normal creating streaming job, but got command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                worker_nodes,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip new barrier for database which is not running.
                return Ok(());
            };
            if partial_graph_manager.inflight_barrier_num(database.partial_graph_id)
                >= self.in_flight_barrier_nums
            {
                // Skip a new barrier without an explicit command once the database has
                // reached the in-flight barrier limit.
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                worker_nodes,
            )
        }
    }

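    /// Collects the backfill progress of every running database, including its
    /// independent checkpoint jobs.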
    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_backfill_progress(),
            );
            // Progress of independent checkpoint jobs
            for (job_id, job) in &database_checkpoint_control.independent_checkpoint_job_controls {
                progress.extend([(*job_id, job.gen_backfill_progress())]);
            }
        }
        progress
    }

    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        let mut progress = Vec::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_fragment_backfill_progress(),
            );
            for job in database_checkpoint_control
                .independent_checkpoint_job_controls
                .values()
            {
                progress.extend(job.gen_fragment_backfill_progress());
            }
        }
        progress
    }

    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of CDC table backfill
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

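    /// Returns the ids of databases whose state is no longer valid after the given
    /// worker fails.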
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases
            .iter_mut()
            .filter_map(
                move |(database_id, database_status)| match database_status {
                    DatabaseCheckpointControlStatus::Running(control) => {
                        if !control.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        if !state.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                },
            )
    }
}

pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
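    /// Handles the acknowledgement that a partial graph has been reset on the given
    /// workers. For a running database this can only target an independent job's graph;
    /// otherwise the response is forwarded to the recovering state machine.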
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(independent_job_id) = independent_job_id {
                    match database
                        .independent_checkpoint_job_controls
                        .remove(&independent_job_id)
                    {
                        Some(independent_job) => {
                            independent_job.on_partial_graph_reset();
                        }
                        None => {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "receive reset partial graph resp on non-existing independent job {independent_job_id} in database {database_id}"
                                )
                            }
                            warn!(
                                %database_id,
                                %independent_job_id,
                                "ignore reset partial graph resp on non-existing independent job on running database"
                            );
                        }
                    }
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }

    pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not have partial graph initialized when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.partial_graph_initialized(partial_graph_id);
            }
        }
    }

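    /// Returns a future that resolves to the next recovery state transition among all
    /// recovering databases: either entering the initializing stage or entering running.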
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

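/// The checkpoint control status of a single database: either running normally or
/// going through recovery.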
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        self.running_state()
            .map(|database| {
                database
                    .independent_checkpoint_job_controls
                    .values()
                    .any(|job| job.is_snapshot_backfilling())
            })
            .unwrap_or(true) // there can be snapshot backfilling jobs when the database is recovering.
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

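/// Per-database barrier metrics, with each metric labeled by the database id.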
pub(in crate::barrier) struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    pub(in crate::barrier) fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

impl PartialGraphStat for DatabaseCheckpointControlMetrics {
    fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
        self.barrier_latency.observe(barrier_latency_secs);
    }

    fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize) {
        self.in_flight_barrier_nums.set(inflight_barrier_num as _);
        self.all_barrier_nums
            .set((inflight_barrier_num + collected_barrier_num) as _);
    }
}

/// Controls the concurrent execution of commands.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    partial_graph_id: PartialGraphId,
    pub(super) state: BarrierWorkerState,

    finishing_jobs_collector:
        BarrierItemCollector<JobId, (Vec<BarrierCompleteResponse>, TrackingJob), ()>,
    /// The barrier that is currently completing, as `Some(prev_epoch)`.
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    pub independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
}

impl DatabaseCheckpointControl {
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            partial_graph_id: to_partial_graph_id(database_id, None),
            state: BarrierWorkerState::new(),
            finishing_jobs_collector: BarrierItemCollector::new(),
            completing_barrier: None,
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            independent_checkpoint_job_controls: Default::default(),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
    ) -> Self {
        Self {
            database_id,
            partial_graph_id: to_partial_graph_id(database_id, None),
            state,
            finishing_jobs_collector: BarrierItemCollector::new(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            independent_checkpoint_job_controls,
        }
    }

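    /// Returns whether the database remains valid after the given worker fails, i.e.
    /// neither the database nor any of its independent checkpoint jobs has actors on
    /// that worker.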
    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        !self.database_info.contains_worker(worker_id as _)
            && self
                .independent_checkpoint_job_controls
                .values()
                .all(|job| {
                    job.fragment_infos()
                        .map(|fragment_infos| {
                            !InflightFragmentInfo::contains_worker(
                                fragment_infos.values(),
                                worker_id,
                            )
                        })
                        .unwrap_or(true)
                })
    }

    /// Enqueues a barrier command, recording the independent jobs whose completion must
    /// be awaited before this epoch can complete.
    fn enqueue_command(&mut self, epoch: EpochPair, independent_jobs_to_wait: HashSet<JobId>) {
        let prev_epoch = epoch.prev;
        tracing::trace!(prev_epoch, ?independent_jobs_to_wait, "enqueue command");
        if !independent_jobs_to_wait.is_empty() {
            self.finishing_jobs_collector
                .enqueue(epoch, independent_jobs_to_wait, ());
        }
    }

    /// Handles a barrier collected for this database or for one of its independent
    /// checkpoint jobs. May request that the next barrier be forced into a checkpoint
    /// when an independent job asks for it.
    fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier<'_>,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let prev_epoch = collected_barrier.epoch.prev;
        tracing::trace!(
            prev_epoch,
            partial_graph_id = %partial_graph_id,
            "barrier collected"
        );
        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
        assert_eq!(self.database_id, database_id);
        if let Some(independent_job_id) = independent_job_id {
            let job = self
                .independent_checkpoint_job_controls
                .get_mut(&independent_job_id)
                .expect("should exist");
            let should_force_checkpoint = job.collect(collected_barrier);
            if should_force_checkpoint {
                periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
            }
        }
        Ok(())
    }
}

impl DatabaseCheckpointControl {
    /// Returns creating `job_id` -> (backfill progress epoch, set of pinned upstream mv table ids).
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.independent_checkpoint_job_controls
            .iter()
            .map(|(job_id, job)| (*job_id, job.pinned_upstream_log_epoch()))
            .collect()
    }

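    /// Collects (upstream, downstream) fragment pairs connected by no-shuffle
    /// dispatchers, across both the database's normal fragments and its independent
    /// checkpoint jobs.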
    fn collect_no_shuffle_fragment_relations_for_reschedule_check(
        &self,
    ) -> Vec<(FragmentId, FragmentId)> {
        let mut no_shuffle_relations = Vec::new();
        for fragment in self.database_info.fragment_infos() {
            let downstream_fragment_id = fragment.fragment_id;
            visit_stream_node_cont(&fragment.nodes, |node| {
                if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
                    && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
                {
                    no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
                }
                true
            });
        }

        for job in self.independent_checkpoint_job_controls.values() {
            if let Some(fragment_infos) = job.fragment_infos() {
                for fragment_info in fragment_infos.values() {
                    let downstream_fragment_id = fragment_info.fragment_id;
                    visit_stream_node_cont(&fragment_info.nodes, |node| {
                        if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
                            && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
                        {
                            no_shuffle_relations
                                .push((merge.upstream_fragment_id, downstream_fragment_id));
                        }
                        true
                    });
                }
            }
        }
        no_shuffle_relations
    }

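    /// Computes the set of job ids that must not be rescheduled while independent
    /// checkpoint jobs with unreschedulable scans are in flight, expanding the blocked
    /// fragments along their no-shuffle graphs.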
    fn collect_reschedule_blocked_jobs_for_independent_jobs_inflight(
        &self,
    ) -> MetaResult<HashSet<JobId>> {
        let mut initial_blocked_fragment_ids = HashSet::new();
        for job in self.independent_checkpoint_job_controls.values() {
            if let Some(fragment_infos) = job.fragment_infos() {
                for fragment_info in fragment_infos.values() {
                    if fragment_has_online_unreschedulable_scan(fragment_info) {
                        initial_blocked_fragment_ids.insert(fragment_info.fragment_id);
                        collect_fragment_upstream_fragment_ids(
                            fragment_info,
                            &mut initial_blocked_fragment_ids,
                        );
                    }
                }
            }
        }

        let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
        if !initial_blocked_fragment_ids.is_empty() {
            let no_shuffle_relations =
                self.collect_no_shuffle_fragment_relations_for_reschedule_check();
            let (forward_edges, backward_edges) =
                build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
            let initial_blocked_fragment_ids: Vec<_> =
                initial_blocked_fragment_ids.iter().copied().collect();
            for ensemble in find_no_shuffle_graphs(
                &initial_blocked_fragment_ids,
                &forward_edges,
                &backward_edges,
            )? {
                blocked_fragment_ids.extend(ensemble.fragments());
            }
        }

        let mut blocked_job_ids = HashSet::new();
        blocked_job_ids.extend(
            blocked_fragment_ids
                .into_iter()
                .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
        );
        Ok(blocked_job_ids)
    }

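    /// Intersects the jobs touched by a reschedule plan (rescheduled fragments plus
    /// their upstream and downstream fragments) with the given set of blocked job ids.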
    fn collect_reschedule_blocked_job_ids(
        &self,
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        blocked_job_ids: &HashSet<JobId>,
    ) -> HashSet<JobId> {
        let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
        affected_fragment_ids.extend(fragment_actors.keys().copied());
        for reschedule in reschedules.values() {
            affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
            affected_fragment_ids.extend(
                reschedule
                    .upstream_fragment_dispatcher_ids
                    .iter()
                    .map(|(fragment_id, _)| *fragment_id),
            );
        }

        affected_fragment_ids
            .into_iter()
            .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
            .filter(|job_id| blocked_job_ids.contains(job_id))
            .collect()
    }

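    /// Finds the next completable barrier of this database and of its independent
    /// checkpoint jobs, merging the result into `task`.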
    fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
        task: &mut Option<CompleteBarrierTask>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `Vec::new` is a const fn that does not allocate, so eagerly creating these
        // (possibly unused) vectors is cheap.
        let mut independent_jobs_task = vec![];
        if let Some(committed_epoch) = self.committed_epoch {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = partial_graph_manager
                .first_inflight_barrier(self.partial_graph_id)
                .map(|epoch| epoch.prev);
            for (job_id, job) in &mut self.independent_checkpoint_job_controls {
                let IndependentCheckpointJobControl::CreatingStreamingJob(job) = job;
                if let Some((epoch, resps, info, is_finish_epoch)) = job.start_completing(
                    partial_graph_manager,
                    min_upstream_inflight_barrier,
                    committed_epoch,
                ) {
                    let resps = resps.into_values().collect_vec();
                    if is_finish_epoch {
                        assert!(info.notifiers.is_empty());
                        finished_jobs.push((*job_id, epoch, resps));
                        continue;
                    };
                    independent_jobs_task.push((*job_id, epoch, resps, info));
                }
            }

            if !finished_jobs.is_empty() {
                partial_graph_manager.remove_partial_graphs(
                    finished_jobs
                        .iter()
                        .map(|(job_id, ..)| to_partial_graph_id(self.database_id, Some(*job_id)))
                        .collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                debug!(epoch, %job_id, "finish creating job");
                // It's safe to remove the creating job, because on CompleteJobType::Finished,
                // all previous barriers have been collected and completed.
                let IndependentCheckpointJobControl::CreatingStreamingJob(creating_streaming_job) =
                    self.independent_checkpoint_job_controls
                        .remove(&job_id)
                        .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();
                self.finishing_jobs_collector
                    .collect(epoch, job_id, (resps, tracking_job));
            }
        }
        let mut observed_non_checkpoint = false;
        self.finishing_jobs_collector.advance_collected();
        let epoch_end_bound = self
            .finishing_jobs_collector
            .first_inflight_epoch()
            .map_or(Unbounded, |epoch| Excluded(epoch.prev));
        if let Some((epoch, resps, info)) = partial_graph_manager.start_completing(
            self.partial_graph_id,
            epoch_end_bound,
            |_, resps, post_collect_command| {
                observed_non_checkpoint = true;
                self.handle_refresh_table_info(task, &resps);
                self.database_info.apply_collected_command(
                    &post_collect_command,
                    &resps,
                    hummock_version_stats,
                );
            },
        ) {
            self.handle_refresh_table_info(task, &resps);
            self.database_info.apply_collected_command(
                &info.post_collect_command,
                &resps,
                hummock_version_stats,
            );
            let mut resps_to_commit = resps.into_values().collect_vec();
            let mut staging_commit_info = self.database_info.take_staging_commit_info();
            if let Some((_, finished_jobs, _)) =
                self.finishing_jobs_collector
                    .take_collected_if(|collected_epoch| {
                        assert!(epoch <= collected_epoch.prev);
                        epoch == collected_epoch.prev
                    })
            {
                finished_jobs
                    .into_iter()
                    .for_each(|(_, (resps, tracking_job))| {
                        resps_to_commit.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
            }
            {
                let task = task.get_or_insert_default();
                Command::collect_commit_epoch_info(
                    &self.database_info,
                    &info,
                    &mut task.commit_info,
                    resps_to_commit,
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(info.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.epoch_infos
                    .try_insert(self.partial_graph_id, info)
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
            }
        } else if observed_non_checkpoint
            && self.database_info.has_pending_finished_jobs()
            && !partial_graph_manager.has_pending_checkpoint_barrier(self.partial_graph_id)
        {
            periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
        }
        if !independent_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, info) in independent_jobs_task {
                collect_independent_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    &info,
                );
                task.epoch_infos
                    .try_insert(to_partial_graph_id(self.database_id, Some(job_id)), info)
                    .expect("non duplicate");
            }
        }
    }

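    /// Acknowledges that the completing barrier (and the given independent job epochs)
    /// has been committed, advancing the database's committed epoch.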
    fn ack_completed(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        command_prev_epoch: Option<u64>,
        independent_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
                partial_graph_manager.ack_completed(self.partial_graph_id, prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in independent_job_epochs {
                if let Some(job) = self.independent_checkpoint_job_controls.get_mut(&job_id) {
                    job.ack_completed(partial_graph_manager, epoch);
                }
                // If the job is not found, it was dropped and already removed
                // by `on_partial_graph_reset` while the completing task was running.
            }
        }
    }

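    /// Extracts list-finished, load-finished, and refresh-finished source/table
    /// information from the barrier complete responses into the pending complete task.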
    fn handle_refresh_table_info(
        &self,
        task: &mut Option<CompleteBarrierTask>,
        resps: &HashMap<WorkerId, BarrierCompleteResponse>,
    ) {
        let list_finished_info = resps
            .values()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = resps
            .values()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = resps
            .values()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}

impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        partial_graph_manager: &mut PartialGraphManager,
        hummock_version_stats: &HummockVersionStats,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        debug_assert!(
            !matches!(
                &command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent should be resolved before injection"
        );

        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                for job_to_cancel in streaming_job_ids {
                    if self
                        .independent_checkpoint_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel a creating snapshot backfill streaming job together with other jobs; \
                                the job will continue to be created until it finishes or recovery occurs. Please cancel the snapshot backfilling job in a single DDL").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                && let Some(job) = self
                    .independent_checkpoint_job_controls
                    .get_mut(job_to_drop)
            {
                let dropped = job.drop(&mut notifiers, partial_graph_manager);
                if dropped {
                    return Ok(());
                }
            }
        }

        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(independent_job_id) = jobs
                .iter()
                .find(|job| self.independent_checkpoint_job_controls.contains_key(*job))
        {
            warn!(
                job_id = %independent_job_id,
                "ignore multi-job throttle command on independent checkpoint job"
            );
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot alter the rate limit of an independent checkpoint job together with other jobs; \
                        the original rate limit will be kept during recovery"
                    )
                    .into(),
                );
            }
            return Ok(());
        };

        if let Some(Command::RescheduleIntent {
            reschedule_plan: Some(reschedule_plan),
            ..
        }) = &command
            && !self.independent_checkpoint_job_controls.is_empty()
        {
            let blocked_job_ids =
                self.collect_reschedule_blocked_jobs_for_independent_jobs_inflight()?;
            let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
                &reschedule_plan.reschedules,
                &reschedule_plan.fragment_actors,
                &blocked_job_ids,
            );
            if !blocked_reschedule_job_ids.is_empty() {
                warn!(
                    blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
                    "reject reschedule fragments related to creating unreschedulable backfill jobs"
                );
                for notifier in notifiers {
                    notifier.notify_start_failed(
                        anyhow!(
                            "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
                            blocked_reschedule_job_ids
                        )
                            .into(),
                    );
                }
                return Ok(());
            }
        }

        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.independent_checkpoint_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            // skip the command when there is nothing to do with the barrier
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused")
                        .into(),
                );
            }
            return Ok(());
        }

        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        // Tracing related stuff
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        let epoch = barrier_info.epoch();
        let ApplyCommandInfo { jobs_to_wait } = match self.apply_command(
            command,
            &mut notifiers,
            barrier_info,
            partial_graph_manager,
            hummock_version_stats,
            worker_nodes,
        ) {
            Ok(info) => {
                assert!(notifiers.is_empty());
                info
            }
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Record the in-flight barrier.
        self.enqueue_command(epoch, jobs_to_wait);

        Ok(())
    }
}