risingwave_meta/barrier/checkpoint/creating_job/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use status::CreatingStreamingJobStatus;
use tracing::{debug, info};

use crate::MetaResult;
use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::{StreamJobActorsToCreate, StreamJobFragments};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::build_actor_connector_splits;

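/// Controls the lifecycle of a snapshot-backfill streaming job while it is being
/// created: injecting barriers into the job's own partial graph, tracking backfill
/// progress, and deciding when the job is ready to be merged into the upstream graph.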
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: TableId,
    definition: String,
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    backfill_epoch: u64,

    graph_info: InflightStreamingJobInfo,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge,
}

impl CreatingStreamingJobControl {
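    /// Creates the control state for a newly submitted job: registers a partial
    /// graph for it and injects the initial checkpoint barrier that carries the
    /// `Add` mutation together with the actors to create.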
    pub(super) fn new(
        info: &CreateStreamingJobCommandInfo,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        backfill_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = DatabaseId::new(info.streaming_job.database_id());
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
        // FIXME(kwannoel): support backfill order control for snapshot backfill
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(
                job_id,
                (info.definition.clone(), &*info.stream_job_fragments),
            )],
            version_stat,
        );
        let fragment_infos: HashMap<_, _> = info.stream_job_fragments.new_fragment_info().collect();

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());

        let graph_info = InflightStreamingJobInfo {
            job_id,
            fragment_infos,
        };

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        let initial_mutation = Mutation::Add(AddMutation {
            // For the mutation of a snapshot backfill job, we won't include changes to the dispatchers of upstream actors.
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            // we assume that when handling snapshot backfill, the cluster must not be paused
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause: Default::default(),
        });

        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
        )?;

        assert!(pending_non_checkpoint_barriers.is_empty());

        Ok(Self {
            database_id,
            definition: info.definition.clone(),
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            backfill_epoch,
            graph_info,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

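    /// Rebuilds the list of upstream barriers that a recovered job still has to
    /// replay: skips the upstream log epochs up to `committed_epoch` and converts
    /// the remaining ones into `BarrierInfo`s, ending with a checkpoint barrier
    /// at `upstream_curr_epoch`.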
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
    ) -> MetaResult<Vec<BarrierInfo>> {
        // TODO: add sanity check
        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
        loop {
            let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
            if *checkpoint_epoch < committed_epoch {
                continue;
            }
            assert_eq!(*checkpoint_epoch, committed_epoch);
            break;
        }
        let mut ret = vec![];
        let mut prev_epoch = committed_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }

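    /// Rebuilds the `ConsumingSnapshot` status for a job that had not yet finished
    /// the snapshot phase when the meta node restarted, queueing the upstream
    /// barriers between `backfill_epoch` and `upstream_curr_epoch` for later replay.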
    fn recover_consuming_snapshot(
        job_id: TableId,
        definition: &String,
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        stream_job_fragments: StreamJobFragments,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(job_id, (definition.clone(), &stream_job_fragments))],
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );
        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    backfill_epoch,
                    upstream_curr_epoch,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }

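    /// Rebuilds the `ConsumingLogStore` status for a job that had already finished
    /// the snapshot phase: the first resolved barrier is re-injected as the
    /// `Initial` barrier, and the rest are queued for later injection.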
    fn recover_consuming_log_store(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        stream_job_fragments: StreamJobFragments,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_curr_epoch,
        )?;
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;
        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    snapshot_backfill_actors.into_iter(),
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
            },
            first_barrier,
        ))
    }

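    /// Recovers the control state of an in-progress creating job after a meta node
    /// restart, choosing between the snapshot and log-store phases based on whether
    /// the job's committed epoch has reached `backfill_epoch`, and re-injecting the
    /// first barrier with the recovered actors and mutation.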
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: TableId,
        definition: String,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        graph_info: InflightStreamingJobInfo,
        stream_job_fragments: StreamJobFragments,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<Self> {
        debug!(
            %job_id,
            definition,
            "recovered creating job"
        );
        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);

        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
            Self::recover_consuming_snapshot(
                job_id,
                &definition,
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                backfill_epoch,
                committed_epoch,
                upstream_curr_epoch,
                stream_job_fragments,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_curr_epoch,
                stream_job_fragments,
            )?
        };
        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            first_barrier_info,
            Some(new_actors),
            Some(initial_mutation),
        )?;
        Ok(Self {
            database_id,
            job_id,
            definition,
            snapshot_backfill_upstream_tables,
            backfill_epoch,
            graph_info,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }

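    /// Returns whether this creating job can remain valid after an error on
    /// `worker_id`, based on the barrier control state and, when the job is
    /// already finishing, on whether its graph involves that worker.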
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && (!self.status.is_finishing()
                || InflightFragmentInfo::contains_worker(
                    self.graph_info.fragment_infos(),
                    worker_id,
                ))
    }

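    /// Reports the DDL progress of this creating job, describing its current
    /// phase: snapshot backfill, log store catch-up, or finishing.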
    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.has_pending_finished_jobs() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker
                        .gen_ddl_progress()
                        .remove(&self.job_id.table_id)
                        .expect("should exist");
                    format!("Snapshot [{}]", progress.progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_ddl_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(_) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
        };
        DdlProgress {
            id: self.job_id.table_id as u64,
            statement: self.definition.clone(),
            progress,
        }
    }

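    /// Returns the upstream log store epoch that must remain pinned for this job:
    /// the maximum collected epoch, at least `backfill_epoch`, or `None` once the
    /// job is finishing and no longer consumes the upstream log.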
    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
        if self.status.is_finishing() {
            None
        } else {
            // TODO: when supporting recoverable snapshot backfill, we should use the max epoch that has been committed
            Some(max(
                self.barrier_control.max_collected_epoch().unwrap_or(0),
                self.backfill_epoch,
            ))
        }
    }

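    /// Injects a barrier into the job's partial graph via the control stream and
    /// enqueues its `prev_epoch` in the barrier control so that the responses
    /// collected from workers can be awaited.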
    fn inject_barrier(
        database_id: DatabaseId,
        table_id: TableId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        pre_applied_graph_info: &InflightStreamingJobInfo,
        applied_graph_info: Option<&InflightStreamingJobInfo>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
    ) -> MetaResult<()> {
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(table_id),
            mutation,
            &barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info
                .map(|graph_info| graph_info.fragment_infos())
                .into_iter()
                .flatten(),
            new_actors,
            vec![],
            vec![],
        )?;
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.clone(),
        );
        Ok(())
    }

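    /// Handles a new upstream barrier for this creating job: updates the upstream
    /// lag metric, and either forwards the barrier as the final one before the job
    /// starts consuming the upstream directly (on a matching
    /// `MergeSnapshotBackfillStreamingJobs` command), or injects whatever barriers
    /// the current status derives from the new upstream epoch.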
    pub(super) fn on_new_command(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<()> {
        let table_id = self.job_id;
        let start_consume_upstream =
            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
                jobs_to_merge.contains_key(&table_id)
            } else {
                false
            };
        if start_consume_upstream {
            info!(
                table_id = self.job_id.table_id,
                prev_epoch = barrier_info.prev_epoch(),
                "start consuming upstream"
            );
        }
        let progress_epoch =
            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
                max(max_collected_epoch, self.backfill_epoch)
            } else {
                self.backfill_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        if start_consume_upstream {
            self.status.start_consume_upstream(barrier_info);
            Self::inject_barrier(
                self.database_id,
                self.job_id,
                control_stream_manager,
                &mut self.barrier_control,
                &self.graph_info,
                None,
                barrier_info.clone(),
                None,
                None,
            )?;
        } else {
            for barrier_to_inject in self.status.on_new_upstream_epoch(barrier_info) {
                Self::inject_barrier(
                    self.database_id,
                    self.job_id,
                    control_stream_manager,
                    &mut self.barrier_control,
                    &self.graph_info,
                    Some(&self.graph_info),
                    barrier_to_inject,
                    None,
                    None,
                )?;
            }
        }
        Ok(())
    }

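    /// Collects a barrier-complete response from a worker, updates backfill
    /// progress, and returns whether the job has become ready to be merged into
    /// the upstream graph.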
    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream().is_some()
    }

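    /// Returns the job's graph info when it has fully caught up with the upstream
    /// log store (no pending barriers to inject and the log store progress tracker
    /// has finished), i.e. when it is ready to be merged into the upstream graph;
    /// otherwise `None`.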
    pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            Some(&self.graph_info)
        } else {
            None
        }
    }
}

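/// How a completed epoch of a creating job should be treated by the caller.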
pub(super) enum CompleteJobType {
    /// The first barrier
    First,
    Normal,
    /// The last barrier to complete
    Finished,
}

impl CreatingStreamingJobControl {
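    /// Picks the next collected epoch to complete, bounded by the minimum inflight
    /// upstream epoch, and classifies it as the first commit, a normal commit, or
    /// the final commit that finishes the job.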
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, is_first_commit)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(!is_first_commit);
                    if epoch == finish_at_epoch {
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if is_first_commit {
                    CompleteJobType::First
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

    pub(super) fn is_finished(&self) -> bool {
        self.barrier_control.is_empty() && self.status.is_finishing()
    }

    pub fn is_consuming(&self) -> bool {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
            CreatingStreamingJobStatus::Finishing(_) => false,
        }
    }

    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.graph_info.existing_table_ids()
    }
}