risingwave_meta/barrier/checkpoint/creating_job/mod.rs

mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use status::CreatingStreamingJobStatus;
use tracing::info;

use crate::MetaResult;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{Command, CreateStreamingJobCommandInfo};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::StreamJobActorsToCreate;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::build_actor_connector_splits;

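/// Control state for a single streaming job that is being created via
/// snapshot backfill. It owns the job's inflight graph info, the barrier
/// control for the job's dedicated partial graph, and the status state
/// machine that moves the job from consuming the snapshot, to consuming the
/// upstream log store, to finishing.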
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: TableId,
    definition: String,
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    backfill_epoch: u64,

    graph_info: InflightStreamingJobInfo,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge<1>,
}

impl CreatingStreamingJobControl {
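    /// Sets up the control state for a newly created snapshot-backfill job:
    /// recovers a `CreateMviewProgressTracker` for it, registers a dedicated
    /// partial graph with the control stream manager, and injects the initial
    /// barrier carrying an `Add` mutation that creates the job's actors. The
    /// job starts in the `ConsumingSnapshot` status.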
    pub(super) fn new(
        info: &CreateStreamingJobCommandInfo,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        backfill_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = DatabaseId::new(info.streaming_job.database_id());
        info!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(
                job_id,
                (info.definition.clone(), &*info.stream_job_fragments),
            )],
            version_stat,
        );
        let fragment_infos: HashMap<_, _> = info.stream_job_fragments.new_fragment_info().collect();

        let table_id = info.stream_job_fragments.stream_job_id();
        let table_id_str = format!("{}", table_id.table_id);

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());

        let graph_info = InflightStreamingJobInfo {
            job_id: table_id,
            fragment_infos,
        };

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(table_id, backfill_epoch, false);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            true,
        );

        let added_actors = info.stream_job_fragments.actor_ids();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        let initial_mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            pause: false,
            subscriptions_to_add: Default::default(),
        });

        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
        )?;

        assert!(pending_non_checkpoint_barriers.is_empty());

        Ok(Self {
            database_id,
            definition: info.definition.clone(),
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            backfill_epoch,
            graph_info,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&table_id_str]),
        })
    }

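    /// Returns whether the job can remain valid after an error on `worker_id`,
    /// based on the state of its barrier control and, when the job is already
    /// finishing, on whether the failed worker hosts any of its fragments.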
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && (!self.status.is_finishing()
                || InflightFragmentInfo::contains_worker(
                    self.graph_info.fragment_infos(),
                    worker_id,
                ))
    }

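    /// Renders the job's DDL progress according to its current phase:
    /// consuming the snapshot, consuming the log store, or finishing.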
    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.has_pending_finished_jobs() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker
                        .gen_ddl_progress()
                        .remove(&self.job_id.table_id)
                        .expect("should exist");
                    format!("Snapshot [{}]", progress.progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_ddl_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(_) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
        };
        DdlProgress {
            id: self.job_id.table_id as u64,
            statement: self.definition.clone(),
            progress,
        }
    }

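    /// The upstream epoch that must stay pinned in the log store for this job:
    /// the maximum of the latest collected epoch and the backfill epoch.
    /// Returns `None` once the job is finishing and no longer reads the
    /// upstream log store.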
    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
        if self.status.is_finishing() {
            None
        } else {
            Some(max(
                self.barrier_control.max_collected_epoch().unwrap_or(0),
                self.backfill_epoch,
            ))
        }
    }

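    /// Injects `barrier_info` into the job's partial graph through the control
    /// stream manager and enqueues its prev epoch into the barrier control, so
    /// that collection of the barrier can be tracked per worker.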
    fn inject_barrier(
        database_id: DatabaseId,
        table_id: TableId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        pre_applied_graph_info: &InflightStreamingJobInfo,
        applied_graph_info: Option<&InflightStreamingJobInfo>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
    ) -> MetaResult<()> {
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(table_id),
            mutation,
            &barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info
                .map(|graph_info| graph_info.fragment_infos())
                .into_iter()
                .flatten(),
            new_actors,
            vec![],
            vec![],
        )?;
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.is_checkpoint(),
        );
        Ok(())
    }

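    /// Reacts to a new upstream barrier. Updates the upstream-lag metric and
    /// then either lets the job start consuming the upstream directly (when
    /// the command merges this snapshot-backfill job back into the upstream
    /// graph) or injects whatever barriers the status derives from the new
    /// upstream epoch.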
    pub(super) fn on_new_command(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<()> {
        let table_id = self.job_id;
        let start_consume_upstream =
            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
                jobs_to_merge.contains_key(&table_id)
            } else {
                false
            };
        if start_consume_upstream {
            info!(
                table_id = self.job_id.table_id,
                prev_epoch = barrier_info.prev_epoch(),
                "start consuming upstream"
            );
        }
        let progress_epoch =
            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
                max(max_collected_epoch, self.backfill_epoch)
            } else {
                self.backfill_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        if start_consume_upstream {
            self.status.start_consume_upstream(barrier_info);
            Self::inject_barrier(
                self.database_id,
                self.job_id,
                control_stream_manager,
                &mut self.barrier_control,
                &self.graph_info,
                None,
                barrier_info.clone(),
                None,
                None,
            )?;
        } else {
            for barrier_to_inject in self.status.on_new_upstream_epoch(barrier_info) {
                Self::inject_barrier(
                    self.database_id,
                    self.job_id,
                    control_stream_manager,
                    &mut self.barrier_control,
                    &self.graph_info,
                    Some(&self.graph_info),
                    barrier_to_inject,
                    None,
                    None,
                )?;
            }
        }
        Ok(())
    }

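    /// Collects a barrier-complete response from one worker for `epoch`,
    /// feeding any backfill progress into the status. Returns whether the job
    /// has become ready to merge back into the upstream graph.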
    pub(super) fn collect(
        &mut self,
        epoch: u64,
        worker_id: WorkerId,
        resp: BarrierCompleteResponse,
    ) -> MetaResult<bool> {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(epoch, worker_id, resp);
        Ok(self.should_merge_to_upstream().is_some())
    }

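    /// Returns the job's graph info once log store consumption has finished
    /// and no barriers remain to be injected, i.e. the job is ready to be
    /// merged back into the upstream graph; otherwise `None`.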
    pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            Some(&self.graph_info)
        } else {
            None
        }
    }
}

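/// How a completed epoch of a creating job should be treated when committing.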
pub(super) enum CompleteJobType {
    /// The first barrier of the job to be committed.
    First,
    Normal,
    /// The barrier on which the job finishes; the last one to complete.
    Finished,
}

impl CreatingStreamingJobControl {
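    /// Starts completing the next collected epoch, bounded above by the
    /// smallest epoch still in flight upstream. Returns the epoch, the
    /// collected responses, and whether this commit is the job's first, a
    /// normal one, or the one that finishes the job.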
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, is_first_commit)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(!is_first_commit);
                    if epoch == finish_at_epoch {
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if is_first_commit {
                    CompleteJobType::First
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

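    /// Returns whether the job is fully finished: it has entered the
    /// `Finishing` status and no barriers remain in its barrier control.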
    pub(super) fn is_finished(&self) -> bool {
        self.barrier_control.is_empty() && self.status.is_finishing()
    }

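    /// The job's graph info while it still runs as a separate partial graph,
    /// or `None` once the job is finishing and runs as part of the upstream
    /// graph.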
    pub fn inflight_graph_info(&self) -> Option<&InflightStreamingJobInfo> {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => Some(&self.graph_info),
            CreatingStreamingJobStatus::Finishing(_) => None,
        }
    }

    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.graph_info.existing_table_ids()
    }
}