// risingwave_meta/barrier/checkpoint/creating_job/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::metrics::LabelGuardedIntGauge;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
28use risingwave_meta_model::{CreateType, WorkerId};
29use risingwave_pb::ddl_service::DdlProgress;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::stream_plan::AddMutation;
32use risingwave_pb::stream_plan::barrier::PbBarrierKind;
33use risingwave_pb::stream_plan::barrier_mutation::Mutation;
34use risingwave_pb::stream_service::BarrierCompleteResponse;
35use status::CreatingStreamingJobStatus;
36use tracing::{debug, info};
37
38use crate::MetaResult;
39use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
40use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
41use crate::barrier::edge_builder::FragmentEdgeBuildResult;
42use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
43use crate::barrier::progress::CreateMviewProgressTracker;
44use crate::barrier::rpc::ControlStreamManager;
45use crate::barrier::{
46    BackfillOrderState, BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch,
47};
48use crate::controller::fragment::InflightFragmentInfo;
49use crate::model::{StreamJobActorsToCreate, StreamJobFragments};
50use crate::rpc::metrics::GLOBAL_META_METRICS;
51use crate::stream::build_actor_connector_splits;
52
/// Barrier-level control of a streaming job that is being created with snapshot backfill.
///
/// The job's barriers are injected into a dedicated partial graph
/// (`add_partial_graph(database_id, Some(job_id))` in `new` / `recover`) and tracked by
/// `barrier_control`, while `status` records which phase of creation the job is in
/// (consuming the snapshot, consuming the upstream log store, or finishing).
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    /// Database the job belongs to.
    database_id: DatabaseId,
    /// Id of the job being created; also used as the partial graph id.
    pub(super) job_id: TableId,
    /// Definition of the job; reported as the `statement` of its DDL progress.
    definition: String,
    /// Create type reported in DDL progress; always `Background` for a recovered job.
    create_type: CreateType,
    /// Upstream tables the job backfills from. On recovery, the change-log epochs of the
    /// first of these tables are replayed as barriers.
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Epoch the snapshot backfill starts from. A job whose committed epoch is still below
    /// it is recovered in the snapshot-consuming phase, and the job's reported progress
    /// epoch never goes below it.
    backfill_epoch: u64,

    /// Fragments of the job's own graph, used when injecting barriers.
    graph_info: InflightStreamingJobInfo,

    /// Tracks the barriers injected into the job's partial graph and their collection.
    barrier_control: CreatingStreamingJobBarrierControl,
    /// Current creation phase of the job.
    status: CreatingStreamingJobStatus,

    /// Gauge (labelled by job id) set to the upstream barrier's prev epoch minus the job's
    /// progress epoch, i.e. how far the job lags behind upstream.
    upstream_lag: LabelGuardedIntGauge,
}
69
70impl CreatingStreamingJobControl {
71    pub(super) fn new(
72        info: &CreateStreamingJobCommandInfo,
73        snapshot_backfill_upstream_tables: HashSet<TableId>,
74        backfill_epoch: u64,
75        version_stat: &HummockVersionStats,
76        control_stream_manager: &mut ControlStreamManager,
77        edges: &mut FragmentEdgeBuildResult,
78    ) -> MetaResult<Self> {
79        let job_id = info.stream_job_fragments.stream_job_id();
80        let database_id = DatabaseId::new(info.streaming_job.database_id());
81        debug!(
82            %job_id,
83            definition = info.definition,
84            "new creating job"
85        );
86        let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
87        let backfill_nodes_to_pause =
88            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
89                .into_iter()
90                .collect();
91        let backfill_order_state = BackfillOrderState::new(
92            info.fragment_backfill_ordering.clone(),
93            &info.stream_job_fragments,
94        );
95        let create_mview_tracker = CreateMviewProgressTracker::recover(
96            [(
97                job_id,
98                (
99                    info.definition.clone(),
100                    &*info.stream_job_fragments,
101                    backfill_order_state,
102                ),
103            )],
104            version_stat,
105        );
106        let fragment_infos: HashMap<_, _> = info.stream_job_fragments.new_fragment_info().collect();
107
108        let actors_to_create =
109            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());
110
111        let graph_info = InflightStreamingJobInfo {
112            job_id,
113            fragment_infos,
114        };
115
116        let mut barrier_control =
117            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);
118
119        let mut prev_epoch_fake_physical_time = 0;
120        let mut pending_non_checkpoint_barriers = vec![];
121
122        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
123            &mut prev_epoch_fake_physical_time,
124            &mut pending_non_checkpoint_barriers,
125            PbBarrierKind::Checkpoint,
126        );
127
128        let added_actors = info.stream_job_fragments.actor_ids();
129        let actor_splits = info
130            .init_split_assignment
131            .values()
132            .flat_map(build_actor_connector_splits)
133            .collect();
134
135        let initial_mutation = Mutation::Add(AddMutation {
136            // for mutation of snapshot backfill job, we won't include changes to dispatchers of upstream actors.
137            actor_dispatchers: Default::default(),
138            added_actors,
139            actor_splits,
140            // we assume that when handling snapshot backfill, the cluster must not be paused
141            pause: false,
142            subscriptions_to_add: Default::default(),
143            backfill_nodes_to_pause,
144            actor_cdc_table_snapshot_splits:
145                build_pb_actor_cdc_table_snapshot_splits_with_generation(
146                    info.cdc_table_snapshot_split_assignment.clone(),
147                )
148                .into(),
149            new_upstream_sinks: Default::default(),
150        });
151
152        control_stream_manager.add_partial_graph(database_id, Some(job_id));
153        Self::inject_barrier(
154            database_id,
155            job_id,
156            control_stream_manager,
157            &mut barrier_control,
158            &graph_info,
159            Some(&graph_info),
160            initial_barrier_info,
161            Some(actors_to_create),
162            Some(initial_mutation),
163        )?;
164
165        assert!(pending_non_checkpoint_barriers.is_empty());
166
167        Ok(Self {
168            database_id,
169            definition: info.definition.clone(),
170            create_type: info.create_type.into(),
171            job_id,
172            snapshot_backfill_upstream_tables,
173            barrier_control,
174            backfill_epoch,
175            graph_info,
176            status: CreatingStreamingJobStatus::ConsumingSnapshot {
177                prev_epoch_fake_physical_time,
178                pending_upstream_barriers: vec![],
179                version_stats: version_stat.clone(),
180                create_mview_tracker,
181                snapshot_backfill_actors,
182                backfill_epoch,
183                pending_non_checkpoint_barriers,
184            },
185            upstream_lag: GLOBAL_META_METRICS
186                .snapshot_backfill_lag
187                .with_guarded_label_values(&[&format!("{}", job_id)]),
188        })
189    }
190
191    fn resolve_upstream_log_epochs(
192        snapshot_backfill_upstream_tables: &HashSet<TableId>,
193        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
194        committed_epoch: u64,
195        upstream_curr_epoch: u64,
196    ) -> MetaResult<Vec<BarrierInfo>> {
197        // TODO: add sanity check
198        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
199        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
200        loop {
201            let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
202            if *checkpoint_epoch < committed_epoch {
203                continue;
204            }
205            assert_eq!(*checkpoint_epoch, committed_epoch);
206            break;
207        }
208        let mut ret = vec![];
209        let mut prev_epoch = committed_epoch;
210        let mut pending_non_checkpoint_barriers = vec![];
211        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
212            for (i, epoch) in non_checkpoint_epochs
213                .iter()
214                .chain([checkpoint_epoch])
215                .enumerate()
216            {
217                assert!(*epoch > prev_epoch);
218                pending_non_checkpoint_barriers.push(prev_epoch);
219                ret.push(BarrierInfo {
220                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
221                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
222                    kind: if i == 0 {
223                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
224                    } else {
225                        BarrierKind::Barrier
226                    },
227                });
228                prev_epoch = *epoch;
229            }
230        }
231        ret.push(BarrierInfo {
232            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
233            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
234            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
235        });
236        Ok(ret)
237    }
238
239    fn recover_consuming_snapshot(
240        job_id: TableId,
241        definition: &String,
242        snapshot_backfill_upstream_tables: &HashSet<TableId>,
243        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
244        backfill_epoch: u64,
245        committed_epoch: u64,
246        upstream_curr_epoch: u64,
247        stream_job_fragments: StreamJobFragments,
248        version_stat: &HummockVersionStats,
249    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
250        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
251        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
252        let mut pending_non_checkpoint_barriers = vec![];
253        let create_mview_tracker = CreateMviewProgressTracker::recover(
254            [(
255                job_id,
256                (
257                    definition.clone(),
258                    &stream_job_fragments,
259                    Default::default(),
260                ),
261            )],
262            version_stat,
263        );
264        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
265            &mut prev_epoch_fake_physical_time,
266            &mut pending_non_checkpoint_barriers,
267            PbBarrierKind::Initial,
268        );
269        Ok((
270            CreatingStreamingJobStatus::ConsumingSnapshot {
271                prev_epoch_fake_physical_time,
272                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
273                    snapshot_backfill_upstream_tables,
274                    upstream_table_log_epochs,
275                    backfill_epoch,
276                    upstream_curr_epoch,
277                )?,
278                version_stats: version_stat.clone(),
279                create_mview_tracker,
280                snapshot_backfill_actors,
281                backfill_epoch,
282                pending_non_checkpoint_barriers,
283            },
284            barrier_info,
285        ))
286    }
287
288    fn recover_consuming_log_store(
289        snapshot_backfill_upstream_tables: &HashSet<TableId>,
290        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
291        committed_epoch: u64,
292        upstream_curr_epoch: u64,
293        stream_job_fragments: StreamJobFragments,
294    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
295        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
296        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
297            snapshot_backfill_upstream_tables,
298            upstream_table_log_epochs,
299            committed_epoch,
300            upstream_curr_epoch,
301        )?;
302        let mut first_barrier = barriers_to_inject.remove(0);
303        assert!(first_barrier.kind.is_checkpoint());
304        first_barrier.kind = BarrierKind::Initial;
305        Ok((
306            CreatingStreamingJobStatus::ConsumingLogStore {
307                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
308                    snapshot_backfill_actors.into_iter(),
309                    barriers_to_inject
310                        .last()
311                        .map(|info| info.prev_epoch() - committed_epoch)
312                        .unwrap_or(0),
313                ),
314                barriers_to_inject: Some(barriers_to_inject),
315            },
316            first_barrier,
317        ))
318    }
319
320    #[expect(clippy::too_many_arguments)]
321    pub(crate) fn recover(
322        database_id: DatabaseId,
323        job_id: TableId,
324        definition: String,
325        snapshot_backfill_upstream_tables: HashSet<TableId>,
326        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
327        backfill_epoch: u64,
328        committed_epoch: u64,
329        upstream_curr_epoch: u64,
330        graph_info: InflightStreamingJobInfo,
331        stream_job_fragments: StreamJobFragments,
332        version_stat: &HummockVersionStats,
333        new_actors: StreamJobActorsToCreate,
334        initial_mutation: Mutation,
335        control_stream_manager: &mut ControlStreamManager,
336    ) -> MetaResult<Self> {
337        debug!(
338            %job_id,
339            definition,
340            "recovered creating job"
341        );
342        let mut barrier_control =
343            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);
344
345        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
346            Self::recover_consuming_snapshot(
347                job_id,
348                &definition,
349                &snapshot_backfill_upstream_tables,
350                upstream_table_log_epochs,
351                backfill_epoch,
352                committed_epoch,
353                upstream_curr_epoch,
354                stream_job_fragments,
355                version_stat,
356            )?
357        } else {
358            Self::recover_consuming_log_store(
359                &snapshot_backfill_upstream_tables,
360                upstream_table_log_epochs,
361                committed_epoch,
362                upstream_curr_epoch,
363                stream_job_fragments,
364            )?
365        };
366        control_stream_manager.add_partial_graph(database_id, Some(job_id));
367        Self::inject_barrier(
368            database_id,
369            job_id,
370            control_stream_manager,
371            &mut barrier_control,
372            &graph_info,
373            Some(&graph_info),
374            first_barrier_info,
375            Some(new_actors),
376            Some(initial_mutation),
377        )?;
378        Ok(Self {
379            database_id,
380            job_id,
381            definition,
382            create_type: CreateType::Background,
383            snapshot_backfill_upstream_tables,
384            backfill_epoch,
385            graph_info,
386            barrier_control,
387            status,
388            upstream_lag: GLOBAL_META_METRICS
389                .snapshot_backfill_lag
390                .with_guarded_label_values(&[&format!("{}", job_id)]),
391        })
392    }
393
394    pub(crate) fn is_empty(&self) -> bool {
395        self.barrier_control.is_empty()
396    }
397
398    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
399        self.barrier_control.is_valid_after_worker_err(worker_id)
400            && (!self.status.is_finishing()
401                || InflightFragmentInfo::contains_worker(
402                    self.graph_info.fragment_infos(),
403                    worker_id,
404                ))
405    }
406
407    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
408        let progress = match &self.status {
409            CreatingStreamingJobStatus::ConsumingSnapshot {
410                create_mview_tracker,
411                ..
412            } => {
413                if create_mview_tracker.has_pending_finished_jobs() {
414                    "Snapshot finished".to_owned()
415                } else {
416                    let progress = create_mview_tracker
417                        .gen_ddl_progress()
418                        .remove(&self.job_id.table_id)
419                        .expect("should exist");
420                    format!("Snapshot [{}]", progress.progress)
421                }
422            }
423            CreatingStreamingJobStatus::ConsumingLogStore {
424                log_store_progress_tracker,
425                ..
426            } => {
427                format!(
428                    "LogStore [{}]",
429                    log_store_progress_tracker.gen_ddl_progress()
430                )
431            }
432            CreatingStreamingJobStatus::Finishing(_) => {
433                format!(
434                    "Finishing [epoch count: {}]",
435                    self.barrier_control.inflight_barrier_count()
436                )
437            }
438        };
439        DdlProgress {
440            id: self.job_id.table_id as u64,
441            statement: self.definition.clone(),
442            create_type: self.create_type.as_str().to_owned(),
443            progress,
444        }
445    }
446
447    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
448        if self.status.is_finishing() {
449            None
450        } else {
451            // TODO: when supporting recoverable snapshot backfill, we should use the max epoch that has committed
452            Some(max(
453                self.barrier_control.max_collected_epoch().unwrap_or(0),
454                self.backfill_epoch,
455            ))
456        }
457    }
458
459    fn inject_barrier(
460        database_id: DatabaseId,
461        table_id: TableId,
462        control_stream_manager: &mut ControlStreamManager,
463        barrier_control: &mut CreatingStreamingJobBarrierControl,
464        pre_applied_graph_info: &InflightStreamingJobInfo,
465        applied_graph_info: Option<&InflightStreamingJobInfo>,
466        barrier_info: BarrierInfo,
467        new_actors: Option<StreamJobActorsToCreate>,
468        mutation: Option<Mutation>,
469    ) -> MetaResult<()> {
470        let node_to_collect = control_stream_manager.inject_barrier(
471            database_id,
472            Some(table_id),
473            mutation,
474            &barrier_info,
475            pre_applied_graph_info.fragment_infos(),
476            applied_graph_info
477                .map(|graph_info| graph_info.fragment_infos())
478                .into_iter()
479                .flatten(),
480            new_actors,
481            vec![],
482            vec![],
483        )?;
484        barrier_control.enqueue_epoch(
485            barrier_info.prev_epoch(),
486            node_to_collect,
487            barrier_info.kind.clone(),
488        );
489        Ok(())
490    }
491
492    pub(super) fn on_new_command(
493        &mut self,
494        control_stream_manager: &mut ControlStreamManager,
495        command: Option<&Command>,
496        barrier_info: &BarrierInfo,
497    ) -> MetaResult<()> {
498        let table_id = self.job_id;
499        let start_consume_upstream =
500            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
501                jobs_to_merge.contains_key(&table_id)
502            } else {
503                false
504            };
505        if start_consume_upstream {
506            info!(
507                table_id = self.job_id.table_id,
508                prev_epoch = barrier_info.prev_epoch(),
509                "start consuming upstream"
510            );
511        }
512        let progress_epoch =
513            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
514                max(max_collected_epoch, self.backfill_epoch)
515            } else {
516                self.backfill_epoch
517            };
518        self.upstream_lag.set(
519            barrier_info
520                .prev_epoch
521                .value()
522                .0
523                .saturating_sub(progress_epoch) as _,
524        );
525        if start_consume_upstream {
526            self.status.start_consume_upstream(barrier_info);
527            Self::inject_barrier(
528                self.database_id,
529                self.job_id,
530                control_stream_manager,
531                &mut self.barrier_control,
532                &self.graph_info,
533                None,
534                barrier_info.clone(),
535                None,
536                None,
537            )?;
538        } else {
539            for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
540                Self::inject_barrier(
541                    self.database_id,
542                    self.job_id,
543                    control_stream_manager,
544                    &mut self.barrier_control,
545                    &self.graph_info,
546                    Some(&self.graph_info),
547                    barrier_to_inject,
548                    None,
549                    mutation,
550                )?;
551            }
552        }
553        Ok(())
554    }
555
556    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
557        self.status.update_progress(&resp.create_mview_progress);
558        self.barrier_control.collect(resp);
559        self.should_merge_to_upstream().is_some()
560    }
561
562    pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
563        if let CreatingStreamingJobStatus::ConsumingLogStore {
564            log_store_progress_tracker,
565            barriers_to_inject,
566        } = &self.status
567            && barriers_to_inject.is_none()
568            && log_store_progress_tracker.is_finished()
569        {
570            Some(&self.graph_info)
571        } else {
572            None
573        }
574    }
575}
576
/// Classification of an epoch returned by `CreatingStreamingJobControl::start_completing`.
pub(super) enum CompleteJobType {
    /// The first barrier, i.e. the first commit of the job (reported by the barrier control
    /// while the job is not yet finishing).
    First,
    /// Any other completed epoch that is neither the first commit nor the finishing epoch.
    Normal,
    /// The last barrier to complete: the epoch the job finishes at. After it, the job's
    /// barrier control is asserted to be empty.
    Finished,
}
584
585impl CreatingStreamingJobControl {
586    pub(super) fn start_completing(
587        &mut self,
588        min_upstream_inflight_epoch: Option<u64>,
589    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
590        let (finished_at_epoch, epoch_end_bound) = match &self.status {
591            CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
592                let epoch_end_bound = min_upstream_inflight_epoch
593                    .map(|upstream_epoch| {
594                        if upstream_epoch < *finish_at_epoch {
595                            Excluded(upstream_epoch)
596                        } else {
597                            Unbounded
598                        }
599                    })
600                    .unwrap_or(Unbounded);
601                (Some(*finish_at_epoch), epoch_end_bound)
602            }
603            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
604            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
605                None,
606                min_upstream_inflight_epoch
607                    .map(Excluded)
608                    .unwrap_or(Unbounded),
609            ),
610        };
611        self.barrier_control.start_completing(epoch_end_bound).map(
612            |(epoch, resps, is_first_commit)| {
613                let status = if let Some(finish_at_epoch) = finished_at_epoch {
614                    assert!(!is_first_commit);
615                    if epoch == finish_at_epoch {
616                        self.barrier_control.ack_completed(epoch);
617                        assert!(self.barrier_control.is_empty());
618                        CompleteJobType::Finished
619                    } else {
620                        CompleteJobType::Normal
621                    }
622                } else if is_first_commit {
623                    CompleteJobType::First
624                } else {
625                    CompleteJobType::Normal
626                };
627                (epoch, resps, status)
628            },
629        )
630    }
631
632    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
633        self.barrier_control.ack_completed(completed_epoch);
634    }
635
636    pub(super) fn is_finished(&self) -> bool {
637        self.barrier_control.is_empty() && self.status.is_finishing()
638    }
639
640    pub fn is_consuming(&self) -> bool {
641        match &self.status {
642            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
643            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
644            CreatingStreamingJobStatus::Finishing(_) => false,
645        }
646    }
647
648    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
649        self.graph_info.existing_table_ids()
650    }
651
652    pub fn graph_info(&self) -> &InflightStreamingJobInfo {
653        &self.graph_info
654    }
655}