risingwave_meta/barrier/checkpoint/control.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::mem::take;
19use std::task::Poll;
20
21use anyhow::anyhow;
22use fail::fail_point;
23use prometheus::HistogramTimer;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
27use risingwave_common::system_param::AdaptiveParallelismStrategy;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29use risingwave_meta_model::WorkerId;
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::id::{FragmentId, PartialGraphId};
33use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
34use risingwave_pb::stream_plan::stream_node::NodeBody;
35use risingwave_pb::stream_service::BarrierCompleteResponse;
36use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
37use tracing::{debug, warn};
38
39use crate::barrier::cdc_progress::CdcProgress;
40use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
41use crate::barrier::checkpoint::recovery::{
42    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
43    RecoveringStateAction,
44};
45use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
46use crate::barrier::command::CommandContext;
47use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
48use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
49use crate::barrier::notifier::Notifier;
50use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager};
51use crate::barrier::progress::TrackingJob;
52use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
53use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
54use crate::barrier::utils::collect_creating_job_commit_epoch_info;
55use crate::barrier::{
56    BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
57};
58use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
59use crate::manager::MetaSrvEnv;
60use crate::model::ActorId;
61use crate::rpc::metrics::GLOBAL_META_METRICS;
62use crate::{MetaError, MetaResult};
63
/// Top-level coordinator of per-database checkpoint state for the barrier manager.
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    /// Per-database checkpoint status: either `Running` or `Recovering`.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    /// Latest hummock version stats, refreshed on each completed-barrier ack.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The max barrier nums in flight
    pub(crate) in_flight_barrier_nums: usize,
}
71
impl CheckpointControl {
    /// Create an empty control with no databases tracked yet; the in-flight
    /// barrier limit is taken from `env.opts`.
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    /// Rebuild the control after recovery.
    ///
    /// Databases in `databases` start in the `Running` state; databases in
    /// `failed_databases` start in the `Recovering` state with their sets of
    /// partial graphs still being reset. Shared actor info for any database
    /// present in neither map is dropped.
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>, /* `database_id` -> set of resetting partial graph ids */
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(
                    |(database_id, resetting_partial_graphs)| {
                        (
                            database_id,
                            DatabaseCheckpointControlStatus::Recovering(
                                DatabaseRecoveringState::new_resetting(
                                    database_id,
                                    resetting_partial_graphs,
                                ),
                            ),
                        )
                    },
                ))
                .collect(),
            hummock_version_stats,
        }
    }

    /// Acknowledge a batch of completed barriers: refresh the cached hummock
    /// version stats and forward each database's epoch acks to its running
    /// control. Panics if a database entered recovery with a completing
    /// command still pending.
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

    /// Ask every running database for its next complete-barrier task,
    /// accumulating them into at most one combined task.
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut PartialGraphManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                // Recovering databases produce no complete-barrier tasks.
                continue;
            };
            // Reborrow the pair so each database gets a fresh `&mut` view.
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

    /// Route a collected barrier to the database owning `partial_graph_id`.
    ///
    /// A collected barrier for a recovering database is a bug: it panics in
    /// debug builds and is ignored with a warning in release builds.
    pub(crate) fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(_) => {
                if cfg!(debug_assertions) {
                    panic!(
                        "receive collected barrier {:?} on recovering database {} from partial graph {}",
                        collected_barrier, database_id, partial_graph_id
                    );
                } else {
                    warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
                }
                Ok(())
            }
        }
    }

    /// Ids of databases currently in the `Recovering` state.
    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    /// Ids of databases currently in the `Running` state.
    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Inflight graph info of a database, or `None` when it is unknown or
    /// not currently running.
    pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
        self.databases
            .get(&database_id)
            .and_then(|database| database.running_state())
            .map(|database| &database.database_info)
    }

    /// Whether any database may currently have snapshot-backfilling jobs
    /// (conservatively `true` for recovering databases).
    pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        self.databases
            .values()
            .any(|database| database.may_have_snapshot_backfilling_jobs())
    }

    /// Handle a newly scheduled barrier for `new_barrier.database_id`,
    /// optionally carrying a command with its notifiers.
    ///
    /// Commands that cannot be executed (missing upstream database, command on
    /// a non-existent database, unexpected first job type) fail their
    /// notifiers and return `Ok(())` rather than an error.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        partial_graph_manager: &mut PartialGraphManager,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            // For cross-db snapshot backfill: resolve each upstream table's
            // snapshot epoch from the committed epoch of the running database
            // that contains the upstream job. If no database is found, fail
            // the command via its notifiers and stop.
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                // The database is not tracked yet: only the first normal
                // `CreateStreamingJob` may implicitly create it.
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        // Register the database's root partial graph before
                        // inserting the new running control.
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        let adder = partial_graph_manager
                            .add_partial_graph(to_partial_graph_id(database_id, None));
                        adder.added();
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    // These commands are trivially satisfied on an empty
                    // database: notify completion and skip.
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    // All remaining commands require an existing database.
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::AlterSubscriptionRetention { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip new barrier for database which is not running.
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                // Skip new barrier with no explicit command when the database should pause inject additional barrier
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        }
    }

    /// Refresh the barrier-count gauges of every running database.
    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    /// Collect backfill progress for every job across running databases:
    /// normal backfill from the database info, snapshot backfill from the
    /// creating-job controls.
    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_backfill_progress(),
            );
            // Progress of snapshot backfill
            for (job_id, creating_job) in
                &database_checkpoint_control.creating_streaming_job_controls
            {
                progress.extend([(*job_id, creating_job.gen_backfill_progress())]);
            }
        }
        progress
    }

    /// Collect per-fragment backfill progress across running databases,
    /// covering both normal and snapshot (creating-job) backfill.
    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        let mut progress = Vec::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_fragment_backfill_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend(creating_job.gen_fragment_backfill_progress());
            }
        }
        progress
    }

    /// Collect CDC progress for every job across running databases.
    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // CDC progress is tracked by the database's inflight graph info.
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

    /// Ids of the databases (running or recovering) that can no longer
    /// proceed after worker `worker_id` failed.
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases
            .iter_mut()
            .filter_map(
                move |(database_id, database_status)| match database_status {
                    DatabaseCheckpointControlStatus::Running(control) => {
                        if !control.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        if !state.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                },
            )
    }

    /// On error: drain every running database's queued commands, notify their
    /// subscribers of the collection failure, and stop their latency timers.
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }
}
462
/// State-transition events emitted by [`CheckpointControl::next_event`] for
/// databases going through recovery.
pub(crate) enum CheckpointControlEvent<'a> {
    /// A recovering database is ready to start initializing (carries the
    /// workers whose partial graphs were reset).
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    /// A recovering database is ready to resume normal running.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}
467
impl CheckpointControl {
    /// Handle the workers' acks that a partial graph has been reset.
    ///
    /// For a running database this may only target one of its creating
    /// (snapshot-backfill) jobs, which is removed from tracking; for a
    /// recovering database the responses are forwarded to its recovery state.
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, creating_job) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(creating_job_id) = creating_job {
                    let Some(job) = database
                        .creating_streaming_job_controls
                        .remove(&creating_job_id)
                    else {
                        // Unknown creating job: bug in debug builds, tolerated
                        // with a warning in release builds.
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job {creating_job_id} in database {database_id}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            "ignore reset partial graph resp on non-existing creating job on running database"
                        );
                        return;
                    };
                    job.on_partial_graph_reset();
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }

    /// Handle the notification that a partial graph finished initializing;
    /// only valid while the owning database is recovering.
    pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not have partial graph initialized when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.partial_graph_initialized(partial_graph_id);
            }
        }
    }

    /// Wait until any recovering database is ready to transition state and
    /// return the corresponding event. Pending while no database is ready.
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        // `self` is held in an Option so the exclusive borrow can be moved out
        // of the closure when the future resolves; polling after readiness is
        // a bug (see the `unreachable!` below).
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}
555
/// Per-database checkpoint status: running normally, or going through recovery.
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}
560
561impl DatabaseCheckpointControlStatus {
562    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
563        match self {
564            DatabaseCheckpointControlStatus::Running(state) => Some(state),
565            DatabaseCheckpointControlStatus::Recovering(_) => None,
566        }
567    }
568
569    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
570        match self {
571            DatabaseCheckpointControlStatus::Running(state) => Some(state),
572            DatabaseCheckpointControlStatus::Recovering(_) => None,
573        }
574    }
575
576    fn may_have_snapshot_backfilling_jobs(&self) -> bool {
577        self.running_state()
578            .map(|database| !database.creating_streaming_job_controls.is_empty())
579            .unwrap_or(true) // there can be snapshot backfilling jobs when the database is recovering.
580    }
581
582    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
583        match self {
584            DatabaseCheckpointControlStatus::Running(state) => state,
585            DatabaseCheckpointControlStatus::Recovering(_) => {
586                panic!("should be at running: {}", reason)
587            }
588        }
589    }
590}
591
/// Label-guarded metric handles scoped to a single database.
struct DatabaseCheckpointControlMetrics {
    /// Histogram of barrier latency, timed from enqueue to completion.
    barrier_latency: LabelGuardedHistogram,
    /// Gauge of barriers currently in flight (queued and not yet collected).
    in_flight_barrier_nums: LabelGuardedIntGauge,
    /// Gauge of all tracked barriers: queued plus the one completing, if any.
    all_barrier_nums: LabelGuardedIntGauge,
}
597
598impl DatabaseCheckpointControlMetrics {
599    fn new(database_id: DatabaseId) -> Self {
600        let database_id_str = database_id.to_string();
601        let barrier_latency = GLOBAL_META_METRICS
602            .barrier_latency
603            .with_guarded_label_values(&[&database_id_str]);
604        let in_flight_barrier_nums = GLOBAL_META_METRICS
605            .in_flight_barrier_nums
606            .with_guarded_label_values(&[&database_id_str]);
607        let all_barrier_nums = GLOBAL_META_METRICS
608            .all_barrier_nums
609            .with_guarded_label_values(&[&database_id_str]);
610        Self {
611            barrier_latency,
612            in_flight_barrier_nums,
613            all_barrier_nums,
614        }
615    }
616}
617
/// Controls the concurrent execution of commands.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    pub(super) state: BarrierWorkerState,

    /// Save the state and message of barrier in order.
    /// Key is the `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The barrier that are completing.
    /// Some(`prev_epoch`)
    completing_barrier: Option<u64>,

    /// Latest known committed epoch; `None` for a freshly created database
    /// that has not committed any barrier yet.
    committed_epoch: Option<u64>,

    /// Inflight graph info of this database.
    pub(super) database_info: InflightDatabaseInfo,
    /// Snapshot-backfill jobs still being created, keyed by job id.
    pub creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    /// Per-database metric handles.
    metrics: DatabaseCheckpointControlMetrics,
}
637
638impl DatabaseCheckpointControl {
639    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
640        Self {
641            database_id,
642            state: BarrierWorkerState::new(),
643            command_ctx_queue: Default::default(),
644            completing_barrier: None,
645            committed_epoch: None,
646            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
647            creating_streaming_job_controls: Default::default(),
648            metrics: DatabaseCheckpointControlMetrics::new(database_id),
649        }
650    }
651
652    pub(crate) fn recovery(
653        database_id: DatabaseId,
654        state: BarrierWorkerState,
655        committed_epoch: u64,
656        database_info: InflightDatabaseInfo,
657        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
658    ) -> Self {
659        Self {
660            database_id,
661            state,
662            command_ctx_queue: Default::default(),
663            completing_barrier: None,
664            committed_epoch: Some(committed_epoch),
665            database_info,
666            creating_streaming_job_controls,
667            metrics: DatabaseCheckpointControlMetrics::new(database_id),
668        }
669    }
670
671    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
672        !self.database_info.contains_worker(worker_id as _)
673            && self
674                .creating_streaming_job_controls
675                .values()
676                .all(|job| job.is_valid_after_worker_err(worker_id))
677    }
678
679    fn total_command_num(&self) -> usize {
680        self.command_ctx_queue.len()
681            + match &self.completing_barrier {
682                Some(_) => 1,
683                None => 0,
684            }
685    }
686
687    /// Update the metrics of barrier nums.
688    fn update_barrier_nums_metrics(&self) {
689        self.metrics.in_flight_barrier_nums.set(
690            self.command_ctx_queue
691                .values()
692                .filter(|x| x.state.is_inflight())
693                .count() as i64,
694        );
695        self.metrics
696            .all_barrier_nums
697            .set(self.total_command_num() as i64);
698    }
699
700    /// Enqueue a barrier command
701    fn enqueue_command(
702        &mut self,
703        command_ctx: CommandContext,
704        notifiers: Vec<Notifier>,
705        creating_jobs_to_wait: HashSet<JobId>,
706    ) {
707        let timer = self.metrics.barrier_latency.start_timer();
708
709        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
710            assert_eq!(
711                command_ctx.barrier_info.prev_epoch.value(),
712                node.command_ctx.barrier_info.curr_epoch.value()
713            );
714        }
715
716        tracing::trace!(
717            prev_epoch = command_ctx.barrier_info.prev_epoch(),
718            ?creating_jobs_to_wait,
719            "enqueue command"
720        );
721        self.command_ctx_queue.insert(
722            command_ctx.barrier_info.prev_epoch(),
723            EpochNode {
724                enqueue_time: timer,
725                state: BarrierEpochState {
726                    is_collected: false,
727                    resps: vec![],
728                    creating_jobs_to_wait,
729                    finished_jobs: HashMap::new(),
730                },
731                command_ctx,
732                notifiers,
733            },
734        );
735    }
736
737    /// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes
738    /// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them.
739    fn barrier_collected(
740        &mut self,
741        partial_graph_id: PartialGraphId,
742        collected_barrier: CollectedBarrier,
743        periodic_barriers: &mut PeriodicBarriers,
744    ) -> MetaResult<()> {
745        let prev_epoch = collected_barrier.epoch.prev;
746        tracing::trace!(
747            prev_epoch,
748            partial_graph_id = %partial_graph_id,
749            "barrier collected"
750        );
751        let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
752        assert_eq!(self.database_id, database_id);
753        match creating_job_id {
754            None => {
755                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
756                    assert!(!node.state.is_collected);
757                    node.state.is_collected = true;
758                    node.state
759                        .resps
760                        .extend(collected_barrier.resps.into_values());
761                } else {
762                    panic!(
763                        "collect barrier on non-existing barrier: {}, {:?}",
764                        prev_epoch, collected_barrier
765                    );
766                }
767            }
768            Some(creating_job_id) => {
769                let should_merge_to_upstream = self
770                    .creating_streaming_job_controls
771                    .get_mut(&creating_job_id)
772                    .expect("should exist")
773                    .collect(collected_barrier);
774                if should_merge_to_upstream {
775                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
776                }
777            }
778        }
779        Ok(())
780    }
781
782    /// Pause inject barrier until True.
783    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
784        self.command_ctx_queue
785            .values()
786            .filter(|x| x.state.is_inflight())
787            .count()
788            < in_flight_barrier_nums
789    }
790}
791
impl DatabaseCheckpointControl {
    /// return creating job table fragment id -> (backfill progress epoch , {`upstream_mv_table_id`})
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .map(|(job_id, creating_job)| (*job_id, creating_job.pinned_upstream_log_epoch()))
            .collect()
    }

    /// Collect `(upstream_fragment_id, downstream_fragment_id)` pairs for every
    /// no-shuffle edge, both in the running database graph and in the graphs of
    /// creating streaming jobs.
    fn collect_no_shuffle_fragment_relations_for_reschedule_check(
        &self,
    ) -> Vec<(FragmentId, FragmentId)> {
        let mut no_shuffle_relations = Vec::new();
        for fragment in self.database_info.fragment_infos() {
            let downstream_fragment_id = fragment.fragment_id;
            // A `Merge` node whose upstream dispatcher is `NoShuffle` marks a
            // no-shuffle edge from its upstream fragment into this fragment.
            visit_stream_node_cont(&fragment.nodes, |node| {
                if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
                    && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
                {
                    no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
                }
                true
            });
        }

        // Creating jobs contribute their own no-shuffle edges as well.
        for creating_job in self.creating_streaming_job_controls.values() {
            creating_job.collect_no_shuffle_fragment_relations(&mut no_shuffle_relations);
        }
        no_shuffle_relations
    }

    /// Compute the set of jobs that must not be rescheduled while creating jobs
    /// are in flight: start from the fragments the creating jobs report as
    /// blocked, expand across connected no-shuffle graphs (fragments linked by
    /// no-shuffle edges must be rescheduled together), then map fragments back
    /// to their owning job ids.
    fn collect_reschedule_blocked_jobs_for_creating_jobs_inflight(
        &self,
    ) -> MetaResult<HashSet<JobId>> {
        let mut initial_blocked_fragment_ids = HashSet::new();
        for creating_job in self.creating_streaming_job_controls.values() {
            creating_job.collect_reschedule_blocked_fragment_ids(&mut initial_blocked_fragment_ids);
        }

        let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
        if !initial_blocked_fragment_ids.is_empty() {
            let no_shuffle_relations =
                self.collect_no_shuffle_fragment_relations_for_reschedule_check();
            let (forward_edges, backward_edges) =
                build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
            let initial_blocked_fragment_ids: Vec<_> =
                initial_blocked_fragment_ids.iter().copied().collect();
            // Every fragment in a no-shuffle graph touching a blocked fragment
            // becomes blocked transitively.
            for ensemble in find_no_shuffle_graphs(
                &initial_blocked_fragment_ids,
                &forward_edges,
                &backward_edges,
            )? {
                blocked_fragment_ids.extend(ensemble.fragments());
            }
        }

        let mut blocked_job_ids = HashSet::new();
        blocked_job_ids.extend(
            blocked_fragment_ids
                .into_iter()
                .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
        );
        Ok(blocked_job_ids)
    }

    /// Of the jobs touched by `reschedules` / `fragment_actors` (including each
    /// reschedule's upstream and downstream fragments), return those that appear
    /// in `blocked_job_ids`. A non-empty result means the reschedule must be
    /// rejected.
    fn collect_reschedule_blocked_job_ids(
        &self,
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        blocked_job_ids: &HashSet<JobId>,
    ) -> HashSet<JobId> {
        let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
        affected_fragment_ids.extend(fragment_actors.keys().copied());
        for reschedule in reschedules.values() {
            affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
            affected_fragment_ids.extend(
                reschedule
                    .upstream_fragment_dispatcher_ids
                    .iter()
                    .map(|(fragment_id, _)| *fragment_id),
            );
        }

        affected_fragment_ids
            .into_iter()
            .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
            .filter(|job_id| blocked_job_ids.contains(job_id))
            .collect()
    }

    /// Assemble work into `task` (created lazily) for the next barrier completion.
    ///
    /// Two kinds of completion are driven here:
    /// - epochs of creating (snapshot backfill) jobs that are ready to commit,
    ///   including removing jobs that have fully finished;
    /// - the earliest non-in-flight barrier of the database itself — at most one
    ///   at a time, recorded in `self.completing_barrier` until acked.
    ///
    /// `context` is `Some` only when the caller allows side effects on the
    /// periodic barrier scheduler and the partial graph manager.
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut PartialGraphManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough
        let mut creating_jobs_task = vec![];
        if let Some(committed_epoch) = self.committed_epoch {
            // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough
            let mut finished_jobs = Vec::new();
            // Creating jobs may only complete epochs up to the earliest barrier
            // still queued for the database graph.
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier, committed_epoch)
                {
                    let create_info = match status {
                        CompleteJobType::First(info) => Some(info),
                        CompleteJobType::Normal => None,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, create_info));
                }
            }
            // Finished jobs no longer need their dedicated partial graphs.
            if !finished_jobs.is_empty()
                && let Some((_, partial_graph_manager)) = &mut context
            {
                partial_graph_manager.remove_partial_graphs(
                    finished_jobs
                        .iter()
                        .map(|(job_id, _, _)| to_partial_graph_id(self.database_id, Some(*job_id)))
                        .collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                // It's safe to remove the creating job, because on CompleteJobType::Finished,
                // all previous barriers have been collected and completed.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        // Pop collected barriers from the front of the queue. Non-checkpoint
        // barriers are only notified; the first checkpoint barrier found becomes
        // the completing barrier and stops the loop.
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.is_collected);

                self.handle_refresh_table_info(task, &node);

                self.database_info.apply_collected_command(
                    &node.command_ctx.command,
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    // If jobs finished but no checkpoint barrier is queued,
                    // force one so the finished jobs can be committed.
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.database_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                // Checkpoint barrier: gather staged commit info and the
                // responses of jobs that finished at this epoch.
                let mut staging_commit_info = self.database_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        // Append the commit info of creating-job epochs collected above.
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, create_info) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id]
                        .state_table_ids()
                        .iter()
                        .copied(),
                    create_info.as_ref(),
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch, create_info));
            }
        }
    }

    /// Acknowledge that a previously issued complete-barrier task finished.
    ///
    /// `command_prev_epoch` must match the epoch stored in `completing_barrier`
    /// (or be `None` when no database barrier was completing); it then becomes
    /// the new `committed_epoch`. Each `(job_id, epoch)` pair is forwarded to
    /// its creating job control.
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

    /// Extract refresh/load progress reported in the barrier responses of `node`
    /// (finished source lists and refresh-finished tables) and record it on the
    /// lazily created `CompleteBarrierTask`.
    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}
1091
/// The state and message of this barrier, a node for concurrent checkpoint.
///
/// One node is kept in `command_ctx_queue` per injected-but-not-yet-completed
/// barrier, keyed by the barrier's `prev_epoch`.
struct EpochNode {
    /// Timer for recording barrier latency, taken after `complete_barriers`.
    enqueue_time: HistogramTimer,

    /// Whether this barrier is in-flight or completed.
    state: BarrierEpochState,

    /// Context of this command to generate barrier and do some post jobs.
    command_ctx: CommandContext,
    /// Notifiers of this barrier.
    notifiers: Vec<Notifier>,
}
1105
#[derive(Debug)]
/// The state of barrier.
struct BarrierEpochState {
    /// Whether the barrier has been collected from the database's own partial graph.
    is_collected: bool,

    /// Worker responses accumulated for this barrier, taken when the epoch is committed.
    resps: Vec<BarrierCompleteResponse>,

    /// Creating (snapshot backfill) jobs whose completion this barrier still waits on.
    creating_jobs_to_wait: HashSet<JobId>,

    /// Jobs finished at this barrier: job id -> (their collected responses, tracking job).
    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}
1117
1118impl BarrierEpochState {
1119    fn is_inflight(&self) -> bool {
1120        !self.is_collected || !self.creating_jobs_to_wait.is_empty()
1121    }
1122}
1123
1124impl DatabaseCheckpointControl {
1125    /// Handle the new barrier from the scheduled queue and inject it.
1126    fn handle_new_barrier(
1127        &mut self,
1128        command: Option<(Command, Vec<Notifier>)>,
1129        checkpoint: bool,
1130        span: tracing::Span,
1131        partial_graph_manager: &mut PartialGraphManager,
1132        hummock_version_stats: &HummockVersionStats,
1133        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
1134        worker_nodes: &HashMap<WorkerId, WorkerNode>,
1135    ) -> MetaResult<()> {
1136        let curr_epoch = self.state.in_flight_prev_epoch().next();
1137
1138        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
1139            (Some(command), notifiers)
1140        } else {
1141            (None, vec![])
1142        };
1143
1144        debug_assert!(
1145            !matches!(
1146                &command,
1147                Some(Command::RescheduleIntent {
1148                    reschedule_plan: None,
1149                    ..
1150                })
1151            ),
1152            "reschedule intent should be resolved before injection"
1153        );
1154
1155        if let Some(Command::DropStreamingJobs {
1156            streaming_job_ids, ..
1157        }) = &command
1158        {
1159            if streaming_job_ids.len() > 1 {
1160                for job_to_cancel in streaming_job_ids {
1161                    if self
1162                        .creating_streaming_job_controls
1163                        .contains_key(job_to_cancel)
1164                    {
1165                        warn!(
1166                            job_id = %job_to_cancel,
1167                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
1168                        );
1169                        for notifier in notifiers {
1170                            notifier
1171                                .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
1172                                the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
1173                        }
1174                        return Ok(());
1175                    }
1176                }
1177            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
1178                && let Some(creating_job) =
1179                    self.creating_streaming_job_controls.get_mut(job_to_drop)
1180                && creating_job.drop(&mut notifiers, partial_graph_manager)
1181            {
1182                return Ok(());
1183            }
1184        }
1185
1186        if let Some(Command::Throttle { jobs, .. }) = &command
1187            && jobs.len() > 1
1188            && let Some(creating_job_id) = jobs
1189                .iter()
1190                .find(|job| self.creating_streaming_job_controls.contains_key(*job))
1191        {
1192            warn!(
1193                job_id = %creating_job_id,
1194                "ignore multi-job throttle command on creating snapshot backfill streaming job"
1195            );
1196            for notifier in notifiers {
1197                notifier
1198                    .notify_start_failed(anyhow!("cannot alter rate limit for snapshot backfill streaming job with other jobs, \
1199                                the original rate limit will be kept recovery.").into());
1200            }
1201            return Ok(());
1202        };
1203
1204        if let Some(Command::RescheduleIntent {
1205            reschedule_plan: Some(reschedule_plan),
1206            ..
1207        }) = &command
1208            && !self.creating_streaming_job_controls.is_empty()
1209        {
1210            let blocked_job_ids =
1211                self.collect_reschedule_blocked_jobs_for_creating_jobs_inflight()?;
1212            let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
1213                &reschedule_plan.reschedules,
1214                &reschedule_plan.fragment_actors,
1215                &blocked_job_ids,
1216            );
1217            if !blocked_reschedule_job_ids.is_empty() {
1218                warn!(
1219                    blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
1220                    "reject reschedule fragments related to creating unreschedulable backfill jobs"
1221                );
1222                for notifier in notifiers {
1223                    notifier.notify_start_failed(
1224                        anyhow!(
1225                            "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
1226                            blocked_reschedule_job_ids
1227                        )
1228                        .into(),
1229                    );
1230                }
1231                return Ok(());
1232            }
1233        }
1234
1235        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
1236            && self.database_info.is_empty()
1237        {
1238            assert!(
1239                self.creating_streaming_job_controls.is_empty(),
1240                "should not have snapshot backfill job when there is no normal job in database"
1241            );
1242            // skip the command when there is nothing to do with the barrier
1243            for mut notifier in notifiers {
1244                notifier.notify_started();
1245                notifier.notify_collected();
1246            }
1247            return Ok(());
1248        };
1249
1250        if let Some(Command::CreateStreamingJob {
1251            job_type: CreateStreamingJobType::SnapshotBackfill(_),
1252            ..
1253        }) = &command
1254            && self.state.is_paused()
1255        {
1256            warn!("cannot create streaming job with snapshot backfill when paused");
1257            for notifier in notifiers {
1258                notifier.notify_start_failed(
1259                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
1260                        .into(),
1261                );
1262            }
1263            return Ok(());
1264        }
1265
1266        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
1267        // Tracing related stuff
1268        barrier_info.prev_epoch.span().in_scope(|| {
1269                tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
1270            });
1271        span.record("epoch", barrier_info.curr_epoch());
1272
1273        let ApplyCommandInfo {
1274            mv_subscription_max_retention,
1275            table_ids_to_commit,
1276            jobs_to_wait,
1277            command,
1278        } = match self.apply_command(
1279            command,
1280            &mut notifiers,
1281            &barrier_info,
1282            partial_graph_manager,
1283            hummock_version_stats,
1284            adaptive_parallelism_strategy,
1285            worker_nodes,
1286        ) {
1287            Ok(info) => info,
1288            Err(err) => {
1289                for notifier in notifiers {
1290                    notifier.notify_start_failed(err.clone());
1291                }
1292                fail_point!("inject_barrier_err_success");
1293                return Err(err);
1294            }
1295        };
1296
1297        // Notify about the injection.
1298        notifiers.iter_mut().for_each(|n| n.notify_started());
1299
1300        let command_ctx = CommandContext::new(
1301            barrier_info,
1302            mv_subscription_max_retention,
1303            table_ids_to_commit,
1304            command,
1305            span,
1306        );
1307
1308        // Record the in-flight barrier.
1309        self.enqueue_command(command_ctx, notifiers, jobs_to_wait);
1310
1311        Ok(())
1312    }
1313}