// risingwave_meta/barrier/checkpoint/creating_job/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::metrics::LabelGuardedIntGauge;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits;
28use risingwave_meta_model::{CreateType, WorkerId};
29use risingwave_pb::ddl_service::DdlProgress;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::stream_plan::AddMutation;
32use risingwave_pb::stream_plan::barrier::PbBarrierKind;
33use risingwave_pb::stream_plan::barrier_mutation::Mutation;
34use risingwave_pb::stream_service::BarrierCompleteResponse;
35use status::CreatingStreamingJobStatus;
36use tracing::{debug, info};
37
38use crate::MetaResult;
39use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
40use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
41use crate::barrier::edge_builder::FragmentEdgeBuildResult;
42use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
43use crate::barrier::progress::CreateMviewProgressTracker;
44use crate::barrier::rpc::ControlStreamManager;
45use crate::barrier::{
46    BackfillOrderState, BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch,
47};
48use crate::controller::fragment::InflightFragmentInfo;
49use crate::model::{StreamJobActorsToCreate, StreamJobFragments};
50use crate::rpc::metrics::GLOBAL_META_METRICS;
51use crate::stream::build_actor_connector_splits;
52
53#[derive(Debug)]
54pub(crate) struct CreatingStreamingJobControl {
55    database_id: DatabaseId,
56    pub(super) job_id: TableId,
57    definition: String,
58    create_type: CreateType,
59    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
60    backfill_epoch: u64,
61
62    graph_info: InflightStreamingJobInfo,
63
64    barrier_control: CreatingStreamingJobBarrierControl,
65    status: CreatingStreamingJobStatus,
66
67    upstream_lag: LabelGuardedIntGauge,
68}
69
70impl CreatingStreamingJobControl {
71    pub(super) fn new(
72        info: &CreateStreamingJobCommandInfo,
73        snapshot_backfill_upstream_tables: HashSet<TableId>,
74        backfill_epoch: u64,
75        version_stat: &HummockVersionStats,
76        control_stream_manager: &mut ControlStreamManager,
77        edges: &mut FragmentEdgeBuildResult,
78    ) -> MetaResult<Self> {
79        let job_id = info.stream_job_fragments.stream_job_id();
80        let database_id = DatabaseId::new(info.streaming_job.database_id());
81        debug!(
82            %job_id,
83            definition = info.definition,
84            "new creating job"
85        );
86        let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
87        let backfill_nodes_to_pause =
88            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
89                .into_iter()
90                .collect();
91        let backfill_order_state = BackfillOrderState::new(
92            info.fragment_backfill_ordering.clone(),
93            &info.stream_job_fragments,
94        );
95        let create_mview_tracker = CreateMviewProgressTracker::recover(
96            [(
97                job_id,
98                (
99                    info.definition.clone(),
100                    &*info.stream_job_fragments,
101                    backfill_order_state,
102                ),
103            )],
104            version_stat,
105        );
106        let fragment_infos: HashMap<_, _> = info.stream_job_fragments.new_fragment_info().collect();
107
108        let actors_to_create =
109            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());
110
111        let graph_info = InflightStreamingJobInfo {
112            job_id,
113            fragment_infos,
114        };
115
116        let mut barrier_control =
117            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);
118
119        let mut prev_epoch_fake_physical_time = 0;
120        let mut pending_non_checkpoint_barriers = vec![];
121
122        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
123            &mut prev_epoch_fake_physical_time,
124            &mut pending_non_checkpoint_barriers,
125            PbBarrierKind::Checkpoint,
126        );
127
128        let added_actors = info.stream_job_fragments.actor_ids();
129        let actor_splits = info
130            .init_split_assignment
131            .values()
132            .flat_map(build_actor_connector_splits)
133            .collect();
134
135        let initial_mutation = Mutation::Add(AddMutation {
136            // for mutation of snapshot backfill job, we won't include changes to dispatchers of upstream actors.
137            actor_dispatchers: Default::default(),
138            added_actors,
139            actor_splits,
140            // we assume that when handling snapshot backfill, the cluster must not be paused
141            pause: false,
142            subscriptions_to_add: Default::default(),
143            backfill_nodes_to_pause,
144            actor_cdc_table_snapshot_splits: build_pb_actor_cdc_table_snapshot_splits(
145                info.cdc_table_snapshot_split_assignment.clone(),
146            ),
147        });
148
149        control_stream_manager.add_partial_graph(database_id, Some(job_id));
150        Self::inject_barrier(
151            database_id,
152            job_id,
153            control_stream_manager,
154            &mut barrier_control,
155            &graph_info,
156            Some(&graph_info),
157            initial_barrier_info,
158            Some(actors_to_create),
159            Some(initial_mutation),
160        )?;
161
162        assert!(pending_non_checkpoint_barriers.is_empty());
163
164        Ok(Self {
165            database_id,
166            definition: info.definition.clone(),
167            create_type: info.create_type.into(),
168            job_id,
169            snapshot_backfill_upstream_tables,
170            barrier_control,
171            backfill_epoch,
172            graph_info,
173            status: CreatingStreamingJobStatus::ConsumingSnapshot {
174                prev_epoch_fake_physical_time,
175                pending_upstream_barriers: vec![],
176                version_stats: version_stat.clone(),
177                create_mview_tracker,
178                snapshot_backfill_actors,
179                backfill_epoch,
180                pending_non_checkpoint_barriers,
181            },
182            upstream_lag: GLOBAL_META_METRICS
183                .snapshot_backfill_lag
184                .with_guarded_label_values(&[&format!("{}", job_id)]),
185        })
186    }
187
188    fn resolve_upstream_log_epochs(
189        snapshot_backfill_upstream_tables: &HashSet<TableId>,
190        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
191        committed_epoch: u64,
192        upstream_curr_epoch: u64,
193    ) -> MetaResult<Vec<BarrierInfo>> {
194        // TODO: add sanity check
195        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
196        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
197        loop {
198            let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
199            if *checkpoint_epoch < committed_epoch {
200                continue;
201            }
202            assert_eq!(*checkpoint_epoch, committed_epoch);
203            break;
204        }
205        let mut ret = vec![];
206        let mut prev_epoch = committed_epoch;
207        let mut pending_non_checkpoint_barriers = vec![];
208        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
209            for (i, epoch) in non_checkpoint_epochs
210                .iter()
211                .chain([checkpoint_epoch])
212                .enumerate()
213            {
214                assert!(*epoch > prev_epoch);
215                pending_non_checkpoint_barriers.push(prev_epoch);
216                ret.push(BarrierInfo {
217                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
218                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
219                    kind: if i == 0 {
220                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
221                    } else {
222                        BarrierKind::Barrier
223                    },
224                });
225                prev_epoch = *epoch;
226            }
227        }
228        ret.push(BarrierInfo {
229            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
230            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
231            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
232        });
233        Ok(ret)
234    }
235
236    fn recover_consuming_snapshot(
237        job_id: TableId,
238        definition: &String,
239        snapshot_backfill_upstream_tables: &HashSet<TableId>,
240        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
241        backfill_epoch: u64,
242        committed_epoch: u64,
243        upstream_curr_epoch: u64,
244        stream_job_fragments: StreamJobFragments,
245        version_stat: &HummockVersionStats,
246    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
247        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
248        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
249        let mut pending_non_checkpoint_barriers = vec![];
250        let create_mview_tracker = CreateMviewProgressTracker::recover(
251            [(
252                job_id,
253                (
254                    definition.clone(),
255                    &stream_job_fragments,
256                    Default::default(),
257                ),
258            )],
259            version_stat,
260        );
261        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
262            &mut prev_epoch_fake_physical_time,
263            &mut pending_non_checkpoint_barriers,
264            PbBarrierKind::Initial,
265        );
266        Ok((
267            CreatingStreamingJobStatus::ConsumingSnapshot {
268                prev_epoch_fake_physical_time,
269                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
270                    snapshot_backfill_upstream_tables,
271                    upstream_table_log_epochs,
272                    backfill_epoch,
273                    upstream_curr_epoch,
274                )?,
275                version_stats: version_stat.clone(),
276                create_mview_tracker,
277                snapshot_backfill_actors,
278                backfill_epoch,
279                pending_non_checkpoint_barriers,
280            },
281            barrier_info,
282        ))
283    }
284
285    fn recover_consuming_log_store(
286        snapshot_backfill_upstream_tables: &HashSet<TableId>,
287        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
288        committed_epoch: u64,
289        upstream_curr_epoch: u64,
290        stream_job_fragments: StreamJobFragments,
291    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
292        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
293        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
294            snapshot_backfill_upstream_tables,
295            upstream_table_log_epochs,
296            committed_epoch,
297            upstream_curr_epoch,
298        )?;
299        let mut first_barrier = barriers_to_inject.remove(0);
300        assert!(first_barrier.kind.is_checkpoint());
301        first_barrier.kind = BarrierKind::Initial;
302        Ok((
303            CreatingStreamingJobStatus::ConsumingLogStore {
304                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
305                    snapshot_backfill_actors.into_iter(),
306                    barriers_to_inject
307                        .last()
308                        .map(|info| info.prev_epoch() - committed_epoch)
309                        .unwrap_or(0),
310                ),
311                barriers_to_inject: Some(barriers_to_inject),
312            },
313            first_barrier,
314        ))
315    }
316
317    #[expect(clippy::too_many_arguments)]
318    pub(crate) fn recover(
319        database_id: DatabaseId,
320        job_id: TableId,
321        definition: String,
322        snapshot_backfill_upstream_tables: HashSet<TableId>,
323        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
324        backfill_epoch: u64,
325        committed_epoch: u64,
326        upstream_curr_epoch: u64,
327        graph_info: InflightStreamingJobInfo,
328        stream_job_fragments: StreamJobFragments,
329        version_stat: &HummockVersionStats,
330        new_actors: StreamJobActorsToCreate,
331        initial_mutation: Mutation,
332        control_stream_manager: &mut ControlStreamManager,
333    ) -> MetaResult<Self> {
334        debug!(
335            %job_id,
336            definition,
337            "recovered creating job"
338        );
339        let mut barrier_control =
340            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);
341
342        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
343            Self::recover_consuming_snapshot(
344                job_id,
345                &definition,
346                &snapshot_backfill_upstream_tables,
347                upstream_table_log_epochs,
348                backfill_epoch,
349                committed_epoch,
350                upstream_curr_epoch,
351                stream_job_fragments,
352                version_stat,
353            )?
354        } else {
355            Self::recover_consuming_log_store(
356                &snapshot_backfill_upstream_tables,
357                upstream_table_log_epochs,
358                committed_epoch,
359                upstream_curr_epoch,
360                stream_job_fragments,
361            )?
362        };
363        control_stream_manager.add_partial_graph(database_id, Some(job_id));
364        Self::inject_barrier(
365            database_id,
366            job_id,
367            control_stream_manager,
368            &mut barrier_control,
369            &graph_info,
370            Some(&graph_info),
371            first_barrier_info,
372            Some(new_actors),
373            Some(initial_mutation),
374        )?;
375        Ok(Self {
376            database_id,
377            job_id,
378            definition,
379            create_type: CreateType::Background,
380            snapshot_backfill_upstream_tables,
381            backfill_epoch,
382            graph_info,
383            barrier_control,
384            status,
385            upstream_lag: GLOBAL_META_METRICS
386                .snapshot_backfill_lag
387                .with_guarded_label_values(&[&format!("{}", job_id)]),
388        })
389    }
390
391    pub(crate) fn is_empty(&self) -> bool {
392        self.barrier_control.is_empty()
393    }
394
395    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
396        self.barrier_control.is_valid_after_worker_err(worker_id)
397            && (!self.status.is_finishing()
398                || InflightFragmentInfo::contains_worker(
399                    self.graph_info.fragment_infos(),
400                    worker_id,
401                ))
402    }
403
404    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
405        let progress = match &self.status {
406            CreatingStreamingJobStatus::ConsumingSnapshot {
407                create_mview_tracker,
408                ..
409            } => {
410                if create_mview_tracker.has_pending_finished_jobs() {
411                    "Snapshot finished".to_owned()
412                } else {
413                    let progress = create_mview_tracker
414                        .gen_ddl_progress()
415                        .remove(&self.job_id.table_id)
416                        .expect("should exist");
417                    format!("Snapshot [{}]", progress.progress)
418                }
419            }
420            CreatingStreamingJobStatus::ConsumingLogStore {
421                log_store_progress_tracker,
422                ..
423            } => {
424                format!(
425                    "LogStore [{}]",
426                    log_store_progress_tracker.gen_ddl_progress()
427                )
428            }
429            CreatingStreamingJobStatus::Finishing(_) => {
430                format!(
431                    "Finishing [epoch count: {}]",
432                    self.barrier_control.inflight_barrier_count()
433                )
434            }
435        };
436        DdlProgress {
437            id: self.job_id.table_id as u64,
438            statement: self.definition.clone(),
439            create_type: self.create_type.as_str().to_owned(),
440            progress,
441        }
442    }
443
444    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
445        if self.status.is_finishing() {
446            None
447        } else {
448            // TODO: when supporting recoverable snapshot backfill, we should use the max epoch that has committed
449            Some(max(
450                self.barrier_control.max_collected_epoch().unwrap_or(0),
451                self.backfill_epoch,
452            ))
453        }
454    }
455
456    fn inject_barrier(
457        database_id: DatabaseId,
458        table_id: TableId,
459        control_stream_manager: &mut ControlStreamManager,
460        barrier_control: &mut CreatingStreamingJobBarrierControl,
461        pre_applied_graph_info: &InflightStreamingJobInfo,
462        applied_graph_info: Option<&InflightStreamingJobInfo>,
463        barrier_info: BarrierInfo,
464        new_actors: Option<StreamJobActorsToCreate>,
465        mutation: Option<Mutation>,
466    ) -> MetaResult<()> {
467        let node_to_collect = control_stream_manager.inject_barrier(
468            database_id,
469            Some(table_id),
470            mutation,
471            &barrier_info,
472            pre_applied_graph_info.fragment_infos(),
473            applied_graph_info
474                .map(|graph_info| graph_info.fragment_infos())
475                .into_iter()
476                .flatten(),
477            new_actors,
478            vec![],
479            vec![],
480        )?;
481        barrier_control.enqueue_epoch(
482            barrier_info.prev_epoch(),
483            node_to_collect,
484            barrier_info.kind.clone(),
485        );
486        Ok(())
487    }
488
489    pub(super) fn on_new_command(
490        &mut self,
491        control_stream_manager: &mut ControlStreamManager,
492        command: Option<&Command>,
493        barrier_info: &BarrierInfo,
494    ) -> MetaResult<()> {
495        let table_id = self.job_id;
496        let start_consume_upstream =
497            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
498                jobs_to_merge.contains_key(&table_id)
499            } else {
500                false
501            };
502        if start_consume_upstream {
503            info!(
504                table_id = self.job_id.table_id,
505                prev_epoch = barrier_info.prev_epoch(),
506                "start consuming upstream"
507            );
508        }
509        let progress_epoch =
510            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
511                max(max_collected_epoch, self.backfill_epoch)
512            } else {
513                self.backfill_epoch
514            };
515        self.upstream_lag.set(
516            barrier_info
517                .prev_epoch
518                .value()
519                .0
520                .saturating_sub(progress_epoch) as _,
521        );
522        if start_consume_upstream {
523            self.status.start_consume_upstream(barrier_info);
524            Self::inject_barrier(
525                self.database_id,
526                self.job_id,
527                control_stream_manager,
528                &mut self.barrier_control,
529                &self.graph_info,
530                None,
531                barrier_info.clone(),
532                None,
533                None,
534            )?;
535        } else {
536            for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
537                Self::inject_barrier(
538                    self.database_id,
539                    self.job_id,
540                    control_stream_manager,
541                    &mut self.barrier_control,
542                    &self.graph_info,
543                    Some(&self.graph_info),
544                    barrier_to_inject,
545                    None,
546                    mutation,
547                )?;
548            }
549        }
550        Ok(())
551    }
552
553    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
554        self.status.update_progress(&resp.create_mview_progress);
555        self.barrier_control.collect(resp);
556        self.should_merge_to_upstream().is_some()
557    }
558
559    pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
560        if let CreatingStreamingJobStatus::ConsumingLogStore {
561            log_store_progress_tracker,
562            barriers_to_inject,
563        } = &self.status
564            && barriers_to_inject.is_none()
565            && log_store_progress_tracker.is_finished()
566        {
567            Some(&self.graph_info)
568        } else {
569            None
570        }
571    }
572}
573
574pub(super) enum CompleteJobType {
575    /// The first barrier
576    First,
577    Normal,
578    /// The last barrier to complete
579    Finished,
580}
581
582impl CreatingStreamingJobControl {
583    pub(super) fn start_completing(
584        &mut self,
585        min_upstream_inflight_epoch: Option<u64>,
586    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
587        let (finished_at_epoch, epoch_end_bound) = match &self.status {
588            CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
589                let epoch_end_bound = min_upstream_inflight_epoch
590                    .map(|upstream_epoch| {
591                        if upstream_epoch < *finish_at_epoch {
592                            Excluded(upstream_epoch)
593                        } else {
594                            Unbounded
595                        }
596                    })
597                    .unwrap_or(Unbounded);
598                (Some(*finish_at_epoch), epoch_end_bound)
599            }
600            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
601            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
602                None,
603                min_upstream_inflight_epoch
604                    .map(Excluded)
605                    .unwrap_or(Unbounded),
606            ),
607        };
608        self.barrier_control.start_completing(epoch_end_bound).map(
609            |(epoch, resps, is_first_commit)| {
610                let status = if let Some(finish_at_epoch) = finished_at_epoch {
611                    assert!(!is_first_commit);
612                    if epoch == finish_at_epoch {
613                        self.barrier_control.ack_completed(epoch);
614                        assert!(self.barrier_control.is_empty());
615                        CompleteJobType::Finished
616                    } else {
617                        CompleteJobType::Normal
618                    }
619                } else if is_first_commit {
620                    CompleteJobType::First
621                } else {
622                    CompleteJobType::Normal
623                };
624                (epoch, resps, status)
625            },
626        )
627    }
628
629    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
630        self.barrier_control.ack_completed(completed_epoch);
631    }
632
633    pub(super) fn is_finished(&self) -> bool {
634        self.barrier_control.is_empty() && self.status.is_finishing()
635    }
636
637    pub fn is_consuming(&self) -> bool {
638        match &self.status {
639            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
640            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
641            CreatingStreamingJobStatus::Finishing(_) => false,
642        }
643    }
644
645    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
646        self.graph_info.existing_table_ids()
647    }
648
649    pub fn graph_info(&self) -> &InflightStreamingJobInfo {
650        &self.graph_info
651    }
652}