risingwave_meta/barrier/checkpoint/creating_job/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

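//! Barrier handling for streaming jobs that are still being created via
//! snapshot backfill: injecting barriers into the job's own partial graph,
//! tracking backfill progress, and deciding when the job is ready to be
//! merged back into the upstream graph.
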
mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
use risingwave_meta_model::{CreateType, WorkerId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use status::CreatingStreamingJobStatus;
use tracing::{debug, info};

use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{
    BackfillOrderState, BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch,
};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::{StreamJobActorsToCreate, StreamJobFragments};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::build_actor_connector_splits;

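/// Barrier-level state of a single streaming job that is being created with
/// snapshot backfill. The job progresses through the phases of
/// [`CreatingStreamingJobStatus`]: `ConsumingSnapshot`, then
/// `ConsumingLogStore`, and finally `Finishing`.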
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: TableId,
    definition: String,
    create_type: CreateType,
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    backfill_epoch: u64,

    graph_info: InflightStreamingJobInfo,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge,
}

impl CreatingStreamingJobControl {
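    /// Sets up the control state for a newly issued snapshot backfill job:
    /// recovers a progress tracker for it, registers a partial graph with the
    /// control stream manager, and injects the initial checkpoint barrier
    /// carrying the `Add` mutation for the job's actors.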
    pub(super) fn new(
        info: &CreateStreamingJobCommandInfo,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        backfill_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = DatabaseId::new(info.streaming_job.database_id());
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            info.fragment_backfill_ordering.clone(),
            &info.stream_job_fragments,
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(
                job_id,
                (
                    info.definition.clone(),
                    &*info.stream_job_fragments,
                    backfill_order_state,
                ),
            )],
            version_stat,
        );

        let fragment_infos: HashMap<_, _> = info
            .stream_job_fragments
            .new_fragment_info(&info.init_split_assignment)
            .collect();

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());

        let graph_info = InflightStreamingJobInfo {
            job_id,
            fragment_infos,
        };

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        let initial_mutation = Mutation::Add(AddMutation {
            // For the mutation of a snapshot backfill job, we don't include
            // changes to the dispatchers of upstream actors.
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            // We assume that the cluster is not paused while handling snapshot backfill.
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    info.cdc_table_snapshot_split_assignment.clone(),
                )
                .into(),
            new_upstream_sinks: Default::default(),
        });

        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
        )?;

        assert!(pending_non_checkpoint_barriers.is_empty());

        Ok(Self {
            database_id,
            definition: info.definition.clone(),
            create_type: info.create_type.into(),
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            backfill_epoch,
            graph_info,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

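    /// Rebuilds the upstream barriers that a recovering job still needs to
    /// replay: skips log epochs up to and including `committed_epoch`, then
    /// converts the remaining `(non_checkpoint_epochs, checkpoint_epoch)`
    /// entries into [`BarrierInfo`]s, ending with a checkpoint barrier at
    /// `upstream_curr_epoch`.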
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
    ) -> MetaResult<Vec<BarrierInfo>> {
        // TODO: add sanity check
        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
        loop {
            let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
            if *checkpoint_epoch < committed_epoch {
                continue;
            }
            assert_eq!(*checkpoint_epoch, committed_epoch);
            break;
        }
        let mut ret = vec![];
        let mut prev_epoch = committed_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }

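    /// Rebuilds the status of a recovering job whose `committed_epoch` is
    /// still earlier than `backfill_epoch`, i.e. the job was still in the
    /// snapshot-consuming phase; the upstream barriers to replay afterwards
    /// are resolved from the upstream log epochs.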
    fn recover_consuming_snapshot(
        job_id: TableId,
        definition: &String,
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        stream_job_fragments: StreamJobFragments,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(
                job_id,
                (
                    definition.clone(),
                    &stream_job_fragments,
                    Default::default(),
                ),
            )],
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );
        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    backfill_epoch,
                    upstream_curr_epoch,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }

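    /// Rebuilds the status of a recovering job that had already finished the
    /// snapshot phase: the job resumes consuming the upstream log store from
    /// `committed_epoch`, with the first barrier to replay turned into
    /// [`BarrierKind::Initial`].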
    fn recover_consuming_log_store(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        stream_job_fragments: StreamJobFragments,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_curr_epoch,
        )?;
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;
        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    snapshot_backfill_actors.into_iter(),
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
            },
            first_barrier,
        ))
    }

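    /// Restores the control state of a creating job during meta recovery.
    /// Depending on how far the job had progressed before the failure
    /// (`committed_epoch` relative to `backfill_epoch`), it resumes either
    /// the snapshot phase or the log-store phase, then re-registers the
    /// partial graph and injects the first barrier together with the actors
    /// to recreate.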
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: TableId,
        definition: String,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        graph_info: InflightStreamingJobInfo,
        stream_job_fragments: StreamJobFragments,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<Self> {
        debug!(
            %job_id,
            definition,
            "recovered creating job"
        );
        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);

        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
            Self::recover_consuming_snapshot(
                job_id,
                &definition,
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                backfill_epoch,
                committed_epoch,
                upstream_curr_epoch,
                stream_job_fragments,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_curr_epoch,
                stream_job_fragments,
            )?
        };
        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            first_barrier_info,
            Some(new_actors),
            Some(initial_mutation),
        )?;
        Ok(Self {
            database_id,
            job_id,
            definition,
            create_type: CreateType::Background,
            snapshot_backfill_upstream_tables,
            backfill_epoch,
            graph_info,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

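    /// Whether there is no inflight barrier tracked for this job.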
    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }

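    /// Whether this creating job remains valid after an error on `worker_id`,
    /// judged from its inflight barriers and its graph.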
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && (!self.status.is_finishing()
                || InflightFragmentInfo::contains_worker(
                    self.graph_info.fragment_infos(),
                    worker_id,
                ))
    }

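    /// Reports the job's DDL progress, phrased per phase: snapshot progress,
    /// log-store progress, or the number of inflight barriers while
    /// finishing.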
    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.has_pending_finished_jobs() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker
                        .gen_ddl_progress()
                        .remove(&self.job_id.table_id)
                        .expect("should exist");
                    format!("Snapshot [{}]", progress.progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_ddl_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(_) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
        };
        DdlProgress {
            id: self.job_id.table_id as u64,
            statement: self.definition.clone(),
            create_type: self.create_type.as_str().to_owned(),
            progress,
        }
    }

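    /// The upstream log epoch pinned by this creating job; `None` once the
    /// job is finishing.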
    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
        if self.status.is_finishing() {
            None
        } else {
            // TODO: when supporting recoverable snapshot backfill, we should use the max epoch
            // that has been committed
            Some(max(
                self.barrier_control.max_collected_epoch().unwrap_or(0),
                self.backfill_epoch,
            ))
        }
    }

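    /// Injects a single barrier into the job's partial graph and enqueues its
    /// epoch into `barrier_control` for later collection.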
    fn inject_barrier(
        database_id: DatabaseId,
        table_id: TableId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        pre_applied_graph_info: &InflightStreamingJobInfo,
        applied_graph_info: Option<&InflightStreamingJobInfo>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
    ) -> MetaResult<()> {
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(table_id),
            mutation,
            &barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info
                .map(|graph_info| graph_info.fragment_infos())
                .into_iter()
                .flatten(),
            new_actors,
            vec![],
            vec![],
        )?;
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.clone(),
        );
        Ok(())
    }

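    /// Handles a new upstream barrier: updates the upstream lag metric, and
    /// either switches the job to consuming the upstream directly (when a
    /// [`Command::MergeSnapshotBackfillStreamingJobs`] covers this job) or
    /// injects the barriers derived from the new upstream epoch.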
    pub(super) fn on_new_command(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<()> {
        let table_id = self.job_id;
        let start_consume_upstream =
            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
                jobs_to_merge.contains_key(&table_id)
            } else {
                false
            };
        if start_consume_upstream {
            info!(
                table_id = self.job_id.table_id,
                prev_epoch = barrier_info.prev_epoch(),
                "start consuming upstream"
            );
        }
        let progress_epoch =
            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
                max(max_collected_epoch, self.backfill_epoch)
            } else {
                self.backfill_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        if start_consume_upstream {
            self.status.start_consume_upstream(barrier_info);
            Self::inject_barrier(
                self.database_id,
                self.job_id,
                control_stream_manager,
                &mut self.barrier_control,
                &self.graph_info,
                None,
                barrier_info.clone(),
                None,
                None,
            )?;
        } else {
            for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
                Self::inject_barrier(
                    self.database_id,
                    self.job_id,
                    control_stream_manager,
                    &mut self.barrier_control,
                    &self.graph_info,
                    Some(&self.graph_info),
                    barrier_to_inject,
                    None,
                    mutation,
                )?;
            }
        }
        Ok(())
    }

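    /// Collects a barrier-complete response and returns whether the job has
    /// become ready to be merged into the upstream graph.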
    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream().is_some()
    }

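    /// Returns the job's graph info once log-store consumption has caught up,
    /// i.e. there is no barrier left to inject and the progress tracker has
    /// finished; the job can then be merged into the upstream graph.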
    pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            Some(&self.graph_info)
        } else {
            None
        }
    }
}

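/// How a completed barrier of a creating job should be treated by the caller.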
pub(super) enum CompleteJobType {
    /// The first barrier
    First,
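    /// Neither the first nor the last barrier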
    Normal,
    /// The last barrier to complete
    Finished,
}

impl CreatingStreamingJobControl {
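    /// Starts completing collected barriers, bounded by the minimum inflight
    /// upstream epoch. Returns the epoch to complete, its collected
    /// responses, and whether it is the first, a normal, or the final
    /// completion of the job.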
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, is_first_commit)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(!is_first_commit);
                    if epoch == finish_at_epoch {
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if is_first_commit {
                    CompleteJobType::First
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

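    /// Marks `completed_epoch` as fully completed in the barrier control.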
    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

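    /// Whether the job is finishing and has no inflight barrier left.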
    pub(super) fn is_finished(&self) -> bool {
        self.barrier_control.is_empty() && self.status.is_finishing()
    }

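    /// Whether the job is still consuming the snapshot or the log store,
    /// i.e. has not yet started finishing.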
    pub fn is_consuming(&self) -> bool {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
            CreatingStreamingJobStatus::Finishing(_) => false,
        }
    }

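    /// All state table ids that belong to this creating job.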
    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.graph_info.existing_table_ids()
    }

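    /// The inflight graph info of this creating job.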
    pub fn graph_info(&self) -> &InflightStreamingJobInfo {
        &self.graph_info
    }
}