risingwave_meta/barrier/checkpoint/control.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcProgress;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BackfillProgress, Command, CreateStreamingJobType};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};

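/// Top-level coordinator of barrier checkpointing on the meta node. It tracks
/// the checkpoint status of every database and caches the latest Hummock
/// version statistics used when committing barriers.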
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The maximum number of barriers in flight.
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

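    /// Rebuild the control state on recovery: successfully recovered databases
    /// resume as `Running`, while failed databases (together with their
    /// creating jobs) re-enter the resetting flow via `DatabaseRecoveringState`.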
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashMap<DatabaseId, HashSet<JobId>>, /* `database_id` -> set of `creating_job_id` */
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(
                    failed_databases
                        .into_iter()
                        .map(|(database_id, creating_jobs)| {
                            (
                                database_id,
                                DatabaseCheckpointControlStatus::Recovering(
                                    DatabaseRecoveringState::new_resetting(
                                        database_id,
                                        creating_jobs.into_iter(),
                                        control_stream_manager,
                                    ),
                                ),
                            )
                        }),
                )
                .collect(),
            hummock_version_stats,
        }
    }

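    /// Acknowledge a completed barrier: refresh the cached version stats and
    /// forward the per-database epochs to their owning databases.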
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

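    /// Poll every running database for barriers that can start completing,
    /// merging the results into at most one `CompleteBarrierTask`.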
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Handle a newly scheduled barrier for the target database, creating the
    /// database control state on the first `CreateStreamingJob` command.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    Command::RescheduleFragment { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but got command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If the database does not exist in the map yet, its first
                    // streaming job has not been created, so no barrier needs
                    // to be sent.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip the new barrier for a database that is not running.
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                // Skip a new barrier without an explicit command while the
                // database has reached its in-flight barrier limit.
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_backfill_progress(),
            );
            // Progress of snapshot backfill
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(creating_job.job_id, creating_job.gen_backfill_progress())]);
            }
        }
        progress
    }

    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of CDC table backfill
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .database_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }

    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId> + '_)> + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database.database_state().map(|(_, creating_jobs)| {
                (
                    *database_id,
                    creating_jobs
                        .values()
                        .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                )
            })
        })
    }
}

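/// Events produced by [`CheckpointControl::next_event`] when a recovering
/// database is ready to advance to the next recovery stage.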
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_partial_graph_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetPartialGraphResponse,
    ) {
        let (database_id, creating_job) = from_partial_graph_id(resp.partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(creating_job_id) = creating_job {
                    let Entry::Occupied(mut entry) = database
                        .creating_streaming_job_controls
                        .entry(creating_job_id)
                    else {
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job: {resp:?}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            %worker_id,
                            ?resp,
                            "ignore reset partial graph resp on non-existing creating job on running database"
                        );
                        return;
                    };
                    if entry.get_mut().on_reset_partial_graph_resp(worker_id, resp) {
                        entry.remove();
                    }
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_partial_graph_resp(worker_id, resp)
            }
        }
    }

    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

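/// Checkpoint status of a single database: either running normally or in one
/// of the recovery stages.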
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

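/// Barrier metrics of a single database, labeled by database id.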
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

/// Controls the concurrent execution of commands within a single database.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    pub(super) state: BarrierWorkerState,

    /// The states and messages of in-flight barriers, in epoch order.
    /// Keyed by `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The `prev_epoch` of the barrier that is currently completing, if any.
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    pub creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            creating_streaming_job_controls: Default::default(),
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            creating_streaming_job_controls,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    /// Update the barrier count metrics.
    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

    /// Enqueue a barrier command.
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

    /// Record a barrier response collected from a worker. For barriers of the
    /// database graph itself, mark the reporting worker as collected for this
    /// `prev_epoch`; for barriers of a creating job, forward the response to
    /// the corresponding job control.
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            %worker_id,
            prev_epoch,
            partial_graph_id = %resp.partial_graph_id,
            "barrier collected"
        );
        let (database_id, creating_job_id) = from_partial_graph_id(resp.partial_graph_id);
        assert_eq!(self.database_id, database_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(node.state.node_to_collect.remove(&worker_id));
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    /// Whether a new barrier may be injected, i.e. whether the number of
    /// in-flight barriers is still below `in_flight_barrier_nums`.
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    /// Return whether the database can still make progress after the failure
    /// of the given worker.
    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values() {
            if !is_valid_after_worker_err(&epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
    /// Return a map from creating job id to
    /// (pinned upstream log epoch, upstream MV table ids of the snapshot backfill).
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .map(|(job_id, creating_job)| {
                let progress_epoch = creating_job.pinned_upstream_log_epoch();
                (
                    *job_id,
                    (
                        progress_epoch,
                        creating_job.snapshot_backfill_upstream_tables.clone(),
                    ),
                )
            })
            .collect()
    }

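    /// Assemble the next `CompleteBarrierTask` for this database: first stage
    /// completable barriers of creating jobs (removing finished ones), then
    /// pop fully-collected barriers from the front of the queue until the
    /// first checkpoint barrier is staged for commit.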
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `Vec::new` is a const fn and does not allocate, so creating these
        // vectors up front is lightweight.
        let mut creating_jobs_task = vec![];
        if let Some(committed_epoch) = self.committed_epoch {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier, committed_epoch)
                {
                    let create_info = match status {
                        CompleteJobType::First(info) => Some(info),
                        CompleteJobType::Normal => None,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, create_info));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs.iter().map(|(job_id, _, _)| *job_id).collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                // It's safe to remove the creating job, because on
                // `CompleteJobType::Finished`, all its previous barriers have
                // been collected and completed.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
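        // Pop barriers whose responses are all collected from the front of the
        // queue. A non-checkpoint barrier only notifies its collection; the
        // first checkpoint barrier found is staged into the complete task and
        // the scan stops there.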
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                self.handle_refresh_table_info(task, &node);

                self.database_info.apply_collected_command(
                    &node.command_ctx.command,
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.database_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info = self.database_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, create_info) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id]
                        .state_table_ids()
                        .iter()
                        .copied(),
                    create_info.as_ref(),
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch, create_info));
            }
        }
    }

    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

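    /// Extract refresh-related info (list-finished and load-finished sources,
    /// and refresh-finished tables) from the collected barrier responses into
    /// the pending complete task.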
    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}

/// The state and message of this barrier; a node for concurrent checkpointing.
struct EpochNode {
    /// Timer for recording barrier latency, started on enqueue and observed
    /// when the barrier completes.
    enqueue_time: HistogramTimer,

    /// The collection state of this barrier.
    state: BarrierEpochState,

    /// Context of this command, used to generate the barrier and run post jobs.
    command_ctx: CommandContext,
    /// Notifiers of this barrier.
    notifiers: Vec<Notifier>,
}

#[derive(Debug)]
/// The collection state of a barrier.
struct BarrierEpochState {
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<JobId>,

    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
    /// Handle the new barrier from the scheduled queue and inject it.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

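        // Snapshot-backfill creating jobs restrict which commands may proceed:
        // dropping one must happen in its own DDL statement, and multi-job
        // throttle or reschedule commands must not touch a job that is still
        // being created.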
        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                for job_to_cancel in streaming_job_ids {
                    if self
                        .creating_streaming_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel a creating snapshot backfill streaming job together with other jobs; \
                                the job will continue creating until it is created or recovery happens. Please cancel the snapshot backfill job in a single DDL").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                && let Some(creating_job) =
                    self.creating_streaming_job_controls.get_mut(job_to_drop)
                && creating_job.drop(&mut notifiers, control_stream_manager)
            {
                return Ok(());
            }
        }

        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(creating_job_id) = jobs
                .iter()
                .find(|job| self.creating_streaming_job_controls.contains_key(job))
        {
            warn!(
                job_id = %creating_job_id,
                "ignore multi-job throttle command on creating snapshot backfill streaming job"
            );
            for notifier in notifiers {
                notifier
                    .notify_start_failed(anyhow!("cannot alter the rate limit of a snapshot backfill streaming job together with other jobs; \
                                the original rate limit will be kept until recovery").into());
            }
            return Ok(());
        };

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot reschedule when creating streaming job with snapshot backfill")
                        .into(),
                );
            }
            return Ok(());
        }

        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.creating_streaming_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            // Skip the command when the barrier has nothing to do.
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused")
                        .into(),
                );
            }
            return Ok(());
        }

        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        // Tracing-related bookkeeping.
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        let ApplyCommandInfo {
            mv_subscription_max_retention,
            table_ids_to_commit,
            jobs_to_wait,
            node_to_collect,
            command,
        } = match self.apply_command(
            command,
            &mut notifiers,
            &barrier_info,
            control_stream_manager,
            hummock_version_stats,
        ) {
            Ok(info) => info,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Notify about the injection.
        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        // Record the in-flight barrier.
        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}
1174}