risingwave_meta/barrier/checkpoint/control.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcProgress;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};

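/// Top-level checkpoint state of the barrier manager, tracking for each database either a
/// running [`DatabaseCheckpointControl`] or a [`DatabaseRecoveringState`].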
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The maximum number of barriers allowed in flight.
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

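    /// Rebuild the checkpoint control after a meta recovery: successfully recovered
    /// databases enter the running state, while failed databases are put into the
    /// resetting phase of the recovering state.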
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashMap<DatabaseId, HashSet<JobId>>, /* `database_id` -> set of `creating_job_id` */
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(
                    failed_databases
                        .into_iter()
                        .map(|(database_id, creating_jobs)| {
                            (
                                database_id,
                                DatabaseCheckpointControlStatus::Recovering(
                                    DatabaseRecoveringState::new_resetting(
                                        database_id,
                                        creating_jobs.into_iter(),
                                        control_stream_manager,
                                    ),
                                ),
                            )
                        }),
                )
                .collect(),
            hummock_version_stats,
        }
    }

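    /// Acknowledge a batch of completed barriers: refresh the hummock version stats and
    /// forward the acked epochs to each affected running database.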
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

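    /// Collect, across all running databases, the barriers that are ready to be committed,
    /// merged into a single [`CompleteBarrierTask`] if any database has one ready.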
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

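    /// Route a collected barrier response to the database that owns its partial graph,
    /// whether that database is currently running or recovering.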
    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
        self.databases
            .get(&database_id)
            .and_then(|database| database.running_state())
            .map(|database| &database.database_info)
    }

    pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        self.databases
            .values()
            .any(|database| database.may_have_snapshot_backfilling_jobs())
    }

    /// Handle a newly scheduled barrier: resolve its command and inject it into the target
    /// database. Command-level failures are reported to the notifiers and return `Ok(())`.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but got command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database does not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} does not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip the new barrier for a database that is not running.
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                // Skip a new barrier without an explicit command when the database has
                // reached the in-flight barrier limit.
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_backfill_progress(),
            );
            // Progress of snapshot backfill
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(creating_job.job_id, creating_job.gen_backfill_progress())]);
            }
        }
        progress
    }

    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        let mut progress = Vec::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_fragment_backfill_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend(creating_job.gen_fragment_backfill_progress());
            }
        }
        progress
    }

    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // CDC progress of jobs in the running database
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

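    /// Return the databases that cannot stay valid after the failure of `worker_id`: those
    /// still expecting to collect a barrier from it, running actors on it, or whose
    /// creating jobs depend on it.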
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .database_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

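    /// On an unrecoverable error, drain the queued barriers of every running database,
    /// notifying their waiters of the failure and settling the latency timers.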
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }

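    /// Iterate over the databases with a known state, yielding each database id together
    /// with the ids of its creating jobs that are still consuming upstream data.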
    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId> + '_)> + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database.database_state().map(|(_, creating_jobs)| {
                (
                    *database_id,
                    creating_jobs
                        .values()
                        .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                )
            })
        })
    }
}

pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_partial_graph_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetPartialGraphResponse,
    ) {
        let (database_id, creating_job) = from_partial_graph_id(resp.partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(creating_job_id) = creating_job {
                    let Entry::Occupied(mut entry) = database
                        .creating_streaming_job_controls
                        .entry(creating_job_id)
                    else {
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job: {resp:?}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            %worker_id,
                            ?resp,
                            "ignore reset partial graph resp on non-existing creating job on running database"
                        );
                        return;
                    };
                    if entry.get_mut().on_reset_partial_graph_resp(worker_id, resp) {
                        entry.remove();
                    }
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_partial_graph_resp(worker_id, resp)
            }
        }
    }

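    /// Wait until some recovering database is ready to transition, yielding the
    /// corresponding entering-initializing or entering-running action.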
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        self.running_state()
            .map(|database| !database.creating_streaming_job_controls.is_empty())
            .unwrap_or(true) // there can be snapshot backfilling jobs when the database is recovering.
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be running: {}", reason)
            }
        }
    }
}

struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

/// Controls the concurrent execution of commands.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    pub(super) state: BarrierWorkerState,

    /// Saves the state and messages of barriers in order.
    /// Key is the `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The barrier that is currently completing, if any.
    /// Holds the `prev_epoch` of that barrier.
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    pub creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            creating_streaming_job_controls: Default::default(),
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            creating_streaming_job_controls,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    /// Update the barrier count metrics.
    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

    /// Enqueue a barrier command.
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

    /// Record that the barrier with `prev_epoch` in `resp` has been collected, either from
    /// a worker of the main graph or from a creating job's partial graph, stashing the
    /// response until the barrier can be completed.
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            %worker_id,
            prev_epoch,
            partial_graph_id = %resp.partial_graph_id,
            "barrier collected"
        );
        let (database_id, creating_job_id) = from_partial_graph_id(resp.partial_graph_id);
        assert_eq!(self.database_id, database_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(node.state.node_to_collect.remove(&worker_id));
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    /// Whether a new barrier can be injected: injection is paused while the number of
    /// in-flight barriers is at least `in_flight_barrier_nums`.
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    /// Return whether the database can still make progress after the failure of `worker_id`.
    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values() {
            if !is_valid_after_worker_err(&epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
    /// Return a map from creating `job_id` to its pinned upstream log epoch together with
    /// the set of its snapshot-backfill upstream mv table ids.
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .map(|(job_id, creating_job)| {
                let progress_epoch = creating_job.pinned_upstream_log_epoch();
                (
                    *job_id,
                    (
                        progress_epoch,
                        creating_job.snapshot_backfill_upstream_tables.clone(),
                    ),
                )
            })
            .collect()
    }

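    /// Gather the next batch of work that is ready to commit into `task`: epochs completed
    /// by creating (snapshot backfill) jobs, plus collected barriers popped from the front
    /// of the queue up to and including the first checkpoint barrier.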
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `Vec::new` is a const fn and does not allocate, so creating it here is cheap.
        let mut creating_jobs_task = vec![];
        if let Some(committed_epoch) = self.committed_epoch {
            // `Vec::new` is a const fn and does not allocate, so creating it here is cheap.
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier, committed_epoch)
                {
                    let create_info = match status {
                        CompleteJobType::First(info) => Some(info),
                        CompleteJobType::Normal => None,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, create_info));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs.iter().map(|(job_id, _, _)| *job_id).collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                // It's safe to remove the creating job, because on `CompleteJobType::Finished`,
                // all previous barriers have been collected and completed.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                self.handle_refresh_table_info(task, &node);

                self.database_info.apply_collected_command(
                    &node.command_ctx.command,
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.database_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info = self.database_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, create_info) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id]
                        .state_table_ids()
                        .iter()
                        .copied(),
                    create_info.as_ref(),
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch, create_info));
            }
        }
    }

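    /// Acknowledge that the completing barrier (if any) and the given creating-job epochs
    /// have been committed, advancing `committed_epoch`.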
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

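    /// Extract list/load-finished source ids and refresh-finished table ids from the
    /// collected responses of `node` and record them on the complete-barrier task.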
    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}

/// The state and messages of a barrier: one node in the concurrent checkpoint queue.
struct EpochNode {
    /// Timer for recording barrier latency; observed when the barrier is completed.
    enqueue_time: HistogramTimer,

    /// Whether this barrier is in-flight or completed.
    state: BarrierEpochState,

    /// Context of this command, used to generate the barrier and run post-completion jobs.
    command_ctx: CommandContext,
    /// Notifiers of this barrier.
    notifiers: Vec<Notifier>,
}

/// The state of a barrier.
#[derive(Debug)]
struct BarrierEpochState {
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<JobId>,

    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        debug_assert!(
            !matches!(
                &command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent should be resolved before injection"
        );

        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                for job_to_cancel in streaming_job_ids {
                    if self
                        .creating_streaming_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel a creating snapshot backfill streaming job together with other jobs; \
                                the job will continue creating until it is created or recovery happens. Please cancel the snapshot backfilling job in a single DDL").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                && let Some(creating_job) =
                    self.creating_streaming_job_controls.get_mut(job_to_drop)
                && creating_job.drop(&mut notifiers, control_stream_manager)
            {
                return Ok(());
            }
        }

1116
1117        if let Some(Command::Throttle { jobs, .. }) = &command
1118            && jobs.len() > 1
1119            && let Some(creating_job_id) = jobs
1120                .iter()
1121                .find(|job| self.creating_streaming_job_controls.contains_key(*job))
1122        {
1123            warn!(
1124                job_id = %creating_job_id,
1125                "ignore multi-job throttle command on creating snapshot backfill streaming job"
1126            );
1127            for notifier in notifiers {
1128                notifier
1129                    .notify_start_failed(anyhow!("cannot alter rate limit for snapshot backfill streaming job with other jobs, \
1130                                the original rate limit will be kept recovery.").into());
1131            }
1132            return Ok(());
1133        };
1134
        if let Some(Command::RescheduleIntent { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot reschedule when creating streaming job with snapshot backfill",
                    )
                    .into(),
                );
            }
            return Ok(());
        }

        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.creating_streaming_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            // Skip the command when the barrier has nothing to do in this database.
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        }

        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
                        .into(),
                );
            }
            return Ok(());
        }

        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        // Tracing-related setup.
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        let ApplyCommandInfo {
            mv_subscription_max_retention,
            table_ids_to_commit,
            jobs_to_wait,
            node_to_collect,
            command,
        } = match self.apply_command(
            command,
            &mut notifiers,
            &barrier_info,
            control_stream_manager,
            hummock_version_stats,
        ) {
            Ok(info) => info,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Notify about the injection.
        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        // Record the in-flight barrier.
        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}