risingwave_meta/barrier/checkpoint/creating_job/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use status::CreatingStreamingJobStatus;
use tracing::{debug, info};

use crate::MetaResult;
use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::{StreamJobActorsToCreate, StreamJobFragments};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::build_actor_connector_splits;

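/// Tracks a single snapshot-backfill streaming job while it is being created.
///
/// A sketch of the lifecycle, as driven by `CreatingStreamingJobStatus`: the
/// job starts in `ConsumingSnapshot`, backfilling from the upstream snapshot
/// at `backfill_epoch`; it then switches to `ConsumingLogStore`, replaying
/// upstream changes from the log store until it catches up; finally it enters
/// `Finishing` once it is merged back into the upstream graph. Barriers for
/// the job flow through its own partial graph, tracked by
/// `CreatingStreamingJobBarrierControl`.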
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: TableId,
    definition: String,
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    backfill_epoch: u64,

    graph_info: InflightStreamingJobInfo,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge,
}

impl CreatingStreamingJobControl {
    pub(super) fn new(
        info: &CreateStreamingJobCommandInfo,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        backfill_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = DatabaseId::new(info.streaming_job.database_id());
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(
                job_id,
                (info.definition.clone(), &*info.stream_job_fragments),
            )],
            version_stat,
        );
        let fragment_infos: HashMap<_, _> = info.stream_job_fragments.new_fragment_info().collect();

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());

        let graph_info = InflightStreamingJobInfo {
            job_id,
            fragment_infos,
        };

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        let initial_mutation = Mutation::Add(AddMutation {
            // For the mutation of a snapshot backfill job, we don't include changes to the
            // dispatchers of upstream actors.
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            // We assume that the cluster is not paused while a snapshot backfill job is being handled.
            pause: false,
            subscriptions_to_add: Default::default(),
        });

        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
        )?;

        assert!(pending_non_checkpoint_barriers.is_empty());

        Ok(Self {
            database_id,
            definition: info.definition.clone(),
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            backfill_epoch,
            graph_info,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

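    /// Rebuilds, for recovery, the list of upstream barriers that still have
    /// to be replayed. Each inner `Vec<u64>` of `upstream_table_log_epochs`
    /// is assumed to hold the epochs of one checkpoint interval, ending with
    /// the epoch at which that checkpoint was committed. Intervals up to and
    /// including `committed_epoch` are skipped; the remaining epochs are
    /// chained into `BarrierInfo`s, with a final checkpoint barrier appended
    /// to advance to `upstream_curr_epoch`.
    ///
    /// An illustrative sketch (epoch values made up): with intervals
    /// `[[3, 5], [8]]`, `committed_epoch = 5` and `upstream_curr_epoch = 10`,
    /// this yields the barriers `5 -> 8` (checkpoint) and `8 -> 10`
    /// (checkpoint).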
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<Vec<u64>>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
    ) -> MetaResult<Vec<BarrierInfo>> {
        // TODO: add sanity check
        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
        loop {
            let epochs = epochs_iter
                .next()
                .expect("should not run out of epochs before reaching the committed epoch");
            if *epochs.last().unwrap() < committed_epoch {
                continue;
            }
            assert_eq!(*epochs.last().unwrap(), committed_epoch);
            break;
        }
        let mut ret = vec![];
        let mut prev_epoch = committed_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for epochs in epochs_iter {
            for (i, epoch) in epochs.iter().enumerate() {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }

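    /// Restores a job that failed while still consuming the upstream snapshot
    /// (i.e. `committed_epoch < backfill_epoch`). The create-mview progress
    /// tracker is rebuilt from the persisted fragments, the upstream barriers
    /// between `backfill_epoch` and `upstream_curr_epoch` are resolved from
    /// the log epochs and queued as `pending_upstream_barriers`, and a fake
    /// `Initial` barrier is returned to restart the snapshot phase.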
    fn recover_consuming_snapshot(
        job_id: TableId,
        definition: &str,
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<Vec<u64>>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        stream_job_fragments: StreamJobFragments,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(job_id, (definition.to_owned(), &stream_job_fragments))],
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );
        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    backfill_epoch,
                    upstream_curr_epoch,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }

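    /// Restores a job that had already finished the snapshot phase and failed
    /// while consuming the upstream log store (`committed_epoch >=
    /// backfill_epoch`). The barriers still to be replayed are resolved from
    /// the upstream log epochs; the first of them is re-issued as an
    /// `Initial` barrier, and the log-store progress tracker is primed with
    /// the remaining lag (last `prev_epoch` minus `committed_epoch`).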
    fn recover_consuming_log_store(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<Vec<u64>>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        stream_job_fragments: StreamJobFragments,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_curr_epoch,
        )?;
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;
        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    snapshot_backfill_actors.into_iter(),
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
            },
            first_barrier,
        ))
    }

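    /// Recovers a creating job after a meta node restart, dispatching on how
    /// far the job had progressed: `committed_epoch < backfill_epoch` resumes
    /// the snapshot phase, otherwise log-store consumption resumes. In both
    /// cases the partial graph is re-registered with the control stream
    /// manager, and the recovered first barrier is injected together with the
    /// actors to create and the initial mutation.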
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: TableId,
        definition: String,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<Vec<u64>>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        graph_info: InflightStreamingJobInfo,
        stream_job_fragments: StreamJobFragments,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<Self> {
        debug!(
            %job_id,
            definition,
            "recovered creating job"
        );
        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);

        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
            Self::recover_consuming_snapshot(
                job_id,
                &definition,
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                backfill_epoch,
                committed_epoch,
                upstream_curr_epoch,
                stream_job_fragments,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_curr_epoch,
                stream_job_fragments,
            )?
        };
        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            first_barrier_info,
            Some(new_actors),
            Some(initial_mutation),
        )?;
        Ok(Self {
            database_id,
            job_id,
            definition,
            snapshot_backfill_upstream_tables,
            backfill_epoch,
            graph_info,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }

    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && (!self.status.is_finishing()
                || InflightFragmentInfo::contains_worker(
                    self.graph_info.fragment_infos(),
                    worker_id,
                ))
    }

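    /// Renders the `DdlProgress` reported for this job, with the progress
    /// string reflecting the current phase: `Snapshot [..]`, `LogStore [..]`,
    /// or `Finishing [epoch count: ..]`.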
    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.has_pending_finished_jobs() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker
                        .gen_ddl_progress()
                        .remove(&self.job_id.table_id)
                        .expect("should exist");
                    format!("Snapshot [{}]", progress.progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_ddl_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(_) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
        };
        DdlProgress {
            id: self.job_id.table_id as u64,
            statement: self.definition.clone(),
            progress,
        }
    }

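    /// The upstream log epoch that this creating job still pins in place, or
    /// `None` once the job is finishing and no longer reads the upstream log
    /// store. While consuming, this is the max collected epoch, floored at
    /// `backfill_epoch`.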
    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
        if self.status.is_finishing() {
            None
        } else {
            // TODO: when supporting recoverable snapshot backfill, we should use the max epoch that
            // has been committed
            Some(max(
                self.barrier_control.max_collected_epoch().unwrap_or(0),
                self.backfill_epoch,
            ))
        }
    }

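    /// Injects a single barrier into this job's partial graph via the control
    /// stream and enqueues its `prev_epoch` in the barrier control for later
    /// collection. `pre_applied_graph_info` is the graph the barrier is sent
    /// to; `applied_graph_info` is the graph after the barrier takes effect.
    /// In this file, `applied_graph_info` is `None` only for the final
    /// barrier injected when the job starts consuming the upstream.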
    fn inject_barrier(
        database_id: DatabaseId,
        table_id: TableId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        pre_applied_graph_info: &InflightStreamingJobInfo,
        applied_graph_info: Option<&InflightStreamingJobInfo>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
    ) -> MetaResult<()> {
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(table_id),
            mutation,
            &barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info
                .map(|graph_info| graph_info.fragment_infos())
                .into_iter()
                .flatten(),
            new_actors,
            vec![],
            vec![],
        )?;
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.clone(),
        );
        Ok(())
    }

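    /// Handles a new upstream barrier. If the command merges this job back
    /// into the upstream (`Command::MergeSnapshotBackfillStreamingJobs`), the
    /// status switches to start consuming the upstream and a final barrier is
    /// injected into the partial graph; otherwise the upstream barrier is
    /// forwarded to the status, which may translate it into several barriers
    /// to inject. The `snapshot_backfill_lag` gauge is updated either way.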
    pub(super) fn on_new_command(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<()> {
        let table_id = self.job_id;
        let start_consume_upstream =
            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
                jobs_to_merge.contains_key(&table_id)
            } else {
                false
            };
        if start_consume_upstream {
            info!(
                table_id = self.job_id.table_id,
                prev_epoch = barrier_info.prev_epoch(),
                "start consuming upstream"
            );
        }
        let progress_epoch =
            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
                max(max_collected_epoch, self.backfill_epoch)
            } else {
                self.backfill_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        if start_consume_upstream {
            self.status.start_consume_upstream(barrier_info);
            Self::inject_barrier(
                self.database_id,
                self.job_id,
                control_stream_manager,
                &mut self.barrier_control,
                &self.graph_info,
                None,
                barrier_info.clone(),
                None,
                None,
            )?;
        } else {
            for barrier_to_inject in self.status.on_new_upstream_epoch(barrier_info) {
                Self::inject_barrier(
                    self.database_id,
                    self.job_id,
                    control_stream_manager,
                    &mut self.barrier_control,
                    &self.graph_info,
                    Some(&self.graph_info),
                    barrier_to_inject,
                    None,
                    None,
                )?;
            }
        }
        Ok(())
    }

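    /// Collects a `BarrierCompleteResponse` from a worker, updating the
    /// backfill progress trackers. Returns `true` when the job has caught up
    /// and should be merged into the upstream (see
    /// `should_merge_to_upstream`).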
    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream().is_some()
    }

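    /// The job is ready to merge back into the upstream once it is consuming
    /// the log store, has no more barriers pending injection, and the
    /// log-store progress tracker reports all actors finished; the job's
    /// graph info is returned for planning the merge.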
    pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            Some(&self.graph_info)
        } else {
            None
        }
    }
}

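/// How a completed epoch of a creating job should be handled by the caller.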
pub(super) enum CompleteJobType {
    /// The first barrier
    First,
    Normal,
    /// The last barrier to complete
    Finished,
}

impl CreatingStreamingJobControl {
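    /// Picks the next collected epoch to commit. While the job is still
    /// consuming, epochs at or beyond the smallest in-flight upstream epoch
    /// are held back so the job never commits ahead of the upstream; once the
    /// upstream has passed the job's finish epoch, the bound is lifted so the
    /// job can complete fully. Returns the epoch, its collected responses,
    /// and whether this is the first, a normal, or the final commit.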
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, is_first_commit)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(!is_first_commit);
                    if epoch == finish_at_epoch {
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if is_first_commit {
                    CompleteJobType::First
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

    pub(super) fn is_finished(&self) -> bool {
        self.barrier_control.is_empty() && self.status.is_finishing()
    }

    pub fn is_consuming(&self) -> bool {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
            CreatingStreamingJobStatus::Finishing(_) => false,
        }
    }

    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.graph_info.existing_table_ids()
    }
}