risingwave_meta/barrier/checkpoint/creating_job/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
use risingwave_meta_model::{CreateType, WorkerId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::FragmentId;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use status::CreatingStreamingJobStatus;
use tracing::{debug, info};

use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{
    BackfillOrderState, BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch,
};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::StreamJobActorsToCreate;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::build_actor_connector_splits;

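/// Controls a snapshot-backfill streaming job while it is being created: it owns the
/// job's dedicated barrier stream (`CreatingStreamingJobBarrierControl`) and tracks
/// which phase of creation the job is in (`CreatingStreamingJobStatus`).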
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: JobId,
    definition: String,
    create_type: CreateType,
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    backfill_epoch: u64,

    fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge,
}

impl CreatingStreamingJobControl {
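    /// Starts creating a new snapshot-backfill job: registers the job's partial graph,
    /// collects the actors to create, and injects the initial checkpoint barrier
    /// carrying an `AddMutation`.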
    pub(super) fn new(
        info: &CreateStreamingJobCommandInfo,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        backfill_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = info.streaming_job.database_id();
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let fragment_infos = info
            .stream_job_fragments
            .new_fragment_info(&info.init_split_assignment)
            .collect();
        let snapshot_backfill_actors =
            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            &info.fragment_backfill_ordering,
            &info.stream_job_fragments,
            info.locality_fragment_state_table_mapping.clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            info.definition.clone(),
            &fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create().map(
                |(fragment_id, node, actors)| {
                    (
                        fragment_id,
                        node,
                        actors,
                        [], // no subscribers for newly creating job
                    )
                },
            ));

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids().collect();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        let initial_mutation = Mutation::Add(AddMutation {
            // For the mutation of a snapshot backfill job, we don't include changes to
            // the dispatchers of upstream actors.
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            // We assume that the cluster cannot be paused while a snapshot backfill is
            // being handled.
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    info.cdc_table_snapshot_split_assignment.clone(),
                )
                .into(),
            new_upstream_sinks: Default::default(),
        });

        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &fragment_infos,
            Some(&fragment_infos),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
        )?;

        assert!(pending_non_checkpoint_barriers.is_empty());

        Ok(Self {
            database_id,
            definition: info.definition.clone(),
            create_type: info.create_type.into(),
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            backfill_epoch,
            fragment_infos,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

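    /// Reconstructs the `BarrierInfo`s to replay from the upstream tables' log epochs,
    /// starting right after `committed_epoch` and ending with a checkpoint barrier at
    /// `upstream_curr_epoch`. A replayed barrier gets a checkpoint kind exactly when
    /// its `prev_epoch` was a checkpoint epoch upstream.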
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
    ) -> MetaResult<Vec<BarrierInfo>> {
        // TODO: add sanity check
        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
        // Advance the iterator past the entry whose checkpoint epoch equals
        // `committed_epoch`; everything before it has already been committed.
        loop {
            let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
            if *checkpoint_epoch < committed_epoch {
                continue;
            }
            assert_eq!(*checkpoint_epoch, committed_epoch);
            break;
        }
        let mut ret = vec![];
        let mut prev_epoch = committed_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }

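    /// Rebuilds the job status for recovery when the job was still consuming the
    /// upstream snapshot, i.e. `committed_epoch < backfill_epoch`. The upstream
    /// barriers that remain to be replayed are resolved from the log epochs.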
    fn recover_consuming_snapshot(
        job_id: JobId,
        definition: &str,
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            definition.to_owned(),
            fragment_infos,
            Default::default(),
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );
        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    backfill_epoch,
                    upstream_curr_epoch,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
                    fragment_infos,
                )
                .collect(),
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }

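    /// Rebuilds the job status for recovery when the snapshot has already been fully
    /// consumed and the job was replaying the upstream log store. The first resolved
    /// barrier is re-issued as `BarrierKind::Initial`.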
    fn recover_consuming_log_store(
        job_id: JobId,
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_curr_epoch,
        )?;
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;
        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                tracking_job: TrackingJob::recovered(job_id, fragment_infos),
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    InflightStreamingJobInfo::snapshot_backfill_actor_ids(fragment_infos),
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
            },
            first_barrier,
        ))
    }

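    /// Recovers a creating job after a meta failover. Depending on how far the job had
    /// committed relative to `backfill_epoch`, it resumes either the snapshot-consuming
    /// or the log-store-consuming phase, then re-injects the first barrier.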
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: JobId,
        definition: String,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<Self> {
        debug!(
            %job_id,
            definition,
            "recovered creating job"
        );
        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);

        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
            Self::recover_consuming_snapshot(
                job_id,
                &definition,
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                backfill_epoch,
                committed_epoch,
                upstream_curr_epoch,
                &fragment_infos,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                job_id,
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_curr_epoch,
                &fragment_infos,
            )?
        };
        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &fragment_infos,
            Some(&fragment_infos),
            first_barrier_info,
            Some(new_actors),
            Some(initial_mutation),
        )?;
        Ok(Self {
            database_id,
            job_id,
            definition,
            create_type: CreateType::Background,
            snapshot_backfill_upstream_tables,
            backfill_epoch,
            fragment_infos,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }

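    /// Whether the creating job is still valid after an error on `worker_id`, judging
    /// by its in-flight barriers and the placement of its fragments.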
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && (!self.status.is_finishing()
                || InflightFragmentInfo::contains_worker(self.fragment_infos.values(), worker_id))
    }

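    /// Renders the job's creation progress as a `DdlProgress`, with a different
    /// summary for each phase: snapshot, log store, or finishing.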
    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.is_finished() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker.gen_ddl_progress();
                    format!("Snapshot [{}]", progress.progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_ddl_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(..) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        DdlProgress {
            id: self.job_id.as_raw_id() as u64,
            statement: self.definition.clone(),
            create_type: self.create_type.as_str().to_owned(),
            progress,
        }
    }

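    /// The upstream log epoch that this job still pins, computed as the max of the
    /// latest collected epoch and the backfill epoch; `None` once the job is finishing.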
    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
        if self.status.is_finishing() {
            None
        } else {
            // TODO: when supporting recoverable snapshot backfill, we should use the
            // max epoch that has been committed
            Some(max(
                self.barrier_control.max_collected_epoch().unwrap_or(0),
                self.backfill_epoch,
            ))
        }
    }

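    /// Injects a barrier into the job's partial graph via the control stream and
    /// enqueues its epoch in `barrier_control` so that collection can be tracked.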
    fn inject_barrier(
        database_id: DatabaseId,
        job_id: JobId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        pre_applied_graph_info: &HashMap<FragmentId, InflightFragmentInfo>,
        applied_graph_info: Option<&HashMap<FragmentId, InflightFragmentInfo>>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
    ) -> MetaResult<()> {
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(job_id),
            mutation,
            &barrier_info,
            pre_applied_graph_info.values(),
            applied_graph_info
                .map(|graph_info| graph_info.values())
                .into_iter()
                .flatten(),
            new_actors,
        )?;
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.clone(),
        );
        Ok(())
    }

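    /// Reacts to a new upstream barrier: updates the upstream-lag metric, and either
    /// starts consuming the upstream directly (when the job is included in a
    /// `Command::MergeSnapshotBackfillStreamingJobs`) or injects the barriers derived
    /// from the job's current status.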
    pub(super) fn on_new_command(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<()> {
        let start_consume_upstream =
            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
                jobs_to_merge.contains_key(&self.job_id)
            } else {
                false
            };
        if start_consume_upstream {
            info!(
                job_id = %self.job_id,
                prev_epoch = barrier_info.prev_epoch(),
                "start consuming upstream"
            );
        }
        let progress_epoch =
            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
                max(max_collected_epoch, self.backfill_epoch)
            } else {
                self.backfill_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        if start_consume_upstream {
            self.status.start_consume_upstream(barrier_info);
            Self::inject_barrier(
                self.database_id,
                self.job_id,
                control_stream_manager,
                &mut self.barrier_control,
                &self.fragment_infos,
                None,
                barrier_info.clone(),
                None,
                None,
            )?;
        } else {
            for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
                Self::inject_barrier(
                    self.database_id,
                    self.job_id,
                    control_stream_manager,
                    &mut self.barrier_control,
                    &self.fragment_infos,
                    Some(&self.fragment_infos),
                    barrier_to_inject,
                    None,
                    mutation,
                )?;
            }
        }
        Ok(())
    }

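    /// Collects a barrier-complete response from a worker, feeding the contained
    /// backfill progress into the status. Returns whether the job has become ready to
    /// be merged into the upstream graph.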
    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream().is_some()
    }

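    /// Returns the job's fragment infos when it has finished replaying the log store
    /// (no more barriers to inject and the log store progress tracker is done), which
    /// means the job can be merged back into the upstream graph; otherwise `None`.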
    pub(super) fn should_merge_to_upstream(
        &self,
    ) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
            ..
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            Some(&self.fragment_infos)
        } else {
            None
        }
    }
}

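/// How a completed epoch of a creating job should be treated by the caller.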
pub(super) enum CompleteJobType {
    /// The first barrier
    First,
    Normal,
    /// The last barrier to complete
    Finished,
}

impl CreatingStreamingJobControl {
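    /// Picks the next collected epoch to complete, bounded by the minimum in-flight
    /// epoch of the upstream database, and classifies the commit as the first, a
    /// normal, or the final one of the job.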
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, is_first_commit)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(!is_first_commit);
                    if epoch == finish_at_epoch {
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if is_first_commit {
                    CompleteJobType::First
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

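    /// Whether the job is still consuming the snapshot or the log store, i.e. has not
    /// yet started finishing.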
    pub fn is_consuming(&self) -> bool {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
            CreatingStreamingJobStatus::Finishing(..) => false,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos.values())
    }

    pub fn graph_info(&self) -> &HashMap<FragmentId, InflightFragmentInfo> {
        &self.fragment_infos
    }

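    /// Consumes the control state and extracts the `TrackingJob` of a finished job for
    /// the final commit; panics unless the job is in the `Finishing` state with no
    /// in-flight barriers left.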
    pub fn into_tracking_job(self) -> TrackingJob {
        assert!(self.barrier_control.is_empty());
        match self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!("expect finish")
            }
            CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
        }
    }
}