// risingwave_meta/barrier/context/context_impl.rs

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::Context;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_meta_model::ActorId;
use risingwave_meta_model::streaming_job::BackfillOrders;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::SourceId;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::command::{PostCollectCommand, ResumeBackfillTarget};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::manager::LocalNotification;
use crate::model::FragmentDownstreamRelation;
use crate::stream::{SourceChange, cleanup_dropped_streaming_jobs};
use crate::{MetaError, MetaResult};

impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
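    /// Commits the collected epoch to Hummock and returns the refreshed version stats.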
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

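    /// Aborts scheduled barriers and blocks further scheduling while recovery runs.
    /// A `database_id` of `None` means cluster-wide recovery, so the global status
    /// is switched to `Recovering` as well.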
    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is recovering");
    }

    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command(&self, command: PostCollectCommand) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

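    /// Marks a tracked streaming job as finished in the catalog and the source
    /// manager, then notifies local subscribers that its backfill has completed.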
    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        let job_id = job.job_id();
        job.finish(&self.metadata_manager, &self.source_manager)
            .await?;
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));
        Ok(())
    }

    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
    async fn finish_cdc_table_backfill(&self, job: JobId) -> MetaResult<()> {
        CdcTableBackfillTracker::mark_complete_job(&self.env.meta_store().conn, job).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
        self.reload_database_runtime_info_impl(database_id).await
    }

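    /// Groups per-actor "list stage finished" reports by `(table_id, source_id)`
    /// and, once the refresh manager confirms that all actors of a stage have
    /// reported, schedules a `ListFinish` barrier command for the owning database.
    /// Sources that were dropped in the meantime are skipped.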
    async fn handle_list_finished_source_ids(
        &self,
        list_finished: Vec<PbListFinishedSource>,
    ) -> MetaResult<()> {
        let mut list_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for list_finished in list_finished {
            let table_id = list_finished.table_id;
            let associated_source_id = list_finished.associated_source_id;
            list_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(list_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in list_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_list_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            let Some(database_id) = self
                .get_source_database_id_for_refresh_stage(table_id, associated_source_id, "list")
                .await?
            else {
                continue;
            };

            let list_finish_command = Command::ListFinish {
                table_id,
                associated_source_id,
            };

            self.barrier_scheduler
                .run_command_no_wait(database_id, list_finish_command)
                .context("Failed to schedule ListFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "ListFinish command scheduled successfully"
            );
        }
        Ok(())
    }

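    /// Same flow as `handle_list_finished_source_ids`, but for the load stage:
    /// once all actors of a stage have reported, a `LoadFinish` command is
    /// scheduled.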
    async fn handle_load_finished_source_ids(
        &self,
        load_finished: Vec<PbLoadFinishedSource>,
    ) -> MetaResult<()> {
        let mut load_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for load_finished in load_finished {
            let table_id = load_finished.table_id;
            let associated_source_id = load_finished.associated_source_id;
            load_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(load_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in load_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_load_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            let Some(database_id) = self
                .get_source_database_id_for_refresh_stage(table_id, associated_source_id, "load")
                .await?
            else {
                continue;
            };

            let load_finish_command = Command::LoadFinish {
                table_id,
                associated_source_id,
            };

            self.barrier_scheduler
                .run_command_no_wait(database_id, load_finish_command)
                .context("Failed to schedule LoadFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "LoadFinish command scheduled successfully"
            );
        }

        Ok(())
    }

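    /// Marks the refresh of each finished table job as complete in the refresh
    /// manager.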
    async fn handle_refresh_finished_table_ids(
        &self,
        refresh_finished_table_job_ids: Vec<JobId>,
    ) -> MetaResult<()> {
        for job_id in refresh_finished_table_job_ids {
            let table_id = job_id.as_mv_table_id();

            self.refresh_manager.mark_refresh_complete(table_id).await?;
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
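    /// Resolves the database that owns `associated_source_id`. Returns `Ok(None)`
    /// when the source has already been dropped, so the caller can skip scheduling
    /// the finish command for `stage`.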
    async fn get_source_database_id_for_refresh_stage(
        &self,
        table_id: TableId,
        associated_source_id: SourceId,
        stage: &'static str,
    ) -> MetaResult<Option<DatabaseId>> {
        match self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(associated_source_id)
            .await
        {
            Ok(database_id) => Ok(Some(database_id)),
            Err(err) if err.is_catalog_id_not_found("object") => {
                tracing::warn!(
                    %table_id,
                    %associated_source_id,
                    stage,
                    "skip refresh finish command because associated source is already dropped"
                );
                Ok(None)
            }
            Err(err) => Err(err)
                .with_context(|| {
                    format!(
                        "failed to get database id for refresh stage: table_id={}, associated_source_id={}, stage={stage}",
                        table_id, associated_source_id
                    )
                })
                .map_err(Into::into),
        }
    }

    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl PostCollectCommand {
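    /// Applies the metadata and source-manager side effects of a command after its
    /// barrier has been collected from all workers.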
    pub async fn post_collect(
        self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        match self {
            PostCollectCommand::Command(_) => {}
            PostCollectCommand::SourceChangeSplit {
                split_assignment: assignment,
            } => {
                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&assignment)
                    .await?;
            }

            PostCollectCommand::DropStreamingJobs => {}
            PostCollectCommand::ConnectorPropsChange(obj_id_map_props) => {
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props
                            .iter()
                            .map(|(object_id, props)| (object_id.as_source_id(), props.clone()))
                            .collect(),
                    })
                    .await;
            }
            PostCollectCommand::ResumeBackfill { target } => match target {
                ResumeBackfillTarget::Job(job_id) => {
                    barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .update_backfill_orders_by_job_id(job_id, None)
                        .await?;
                }
                ResumeBackfillTarget::Fragment(fragment_id) => {
                    let mut job_ids = barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_job_id(vec![fragment_id])
                        .await?;
                    let job_id = job_ids.pop().ok_or_else(|| {
                        MetaError::invalid_parameter("fragment not found".to_owned())
                    })?;
                    let job_id = JobId::new(job_id.as_raw_id());

                    let extra_info = barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .get_streaming_job_extra_info(vec![job_id])
                        .await?;
                    let mut backfill_orders: BackfillOrders = extra_info
                        .get(&job_id)
                        .cloned()
                        .ok_or_else(|| MetaError::invalid_parameter("job not found".to_owned()))?
                        .backfill_orders
                        .unwrap_or_default();

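                    // Remove the resumed fragment from every dependency list, then
                    // drop entries whose lists become empty so the fragments they
                    // gate are no longer blocked.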
                    let resumed_fragment_id = fragment_id.as_raw_id();
                    for children in backfill_orders.0.values_mut() {
                        children.retain(|child| *child != resumed_fragment_id);
                    }
                    backfill_orders.0.retain(|_, children| !children.is_empty());

                    barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .update_backfill_orders_by_job_id(job_id, Some(backfill_orders))
                        .await?;
                }
            },
            PostCollectCommand::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
                resolved_split_assignment,
            } => {
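                // Persist backfill epochs in the catalog: ordinary jobs only need
                // them for cross-database snapshot backfill fragments, while
                // snapshot-backfill jobs also record them for their own snapshot
                // backfill stream scans.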
                match &job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                &cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                &cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

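                // For sink-into-table jobs, also register the sink fragment's new
                // downstream relation so the sink output reaches the target table.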
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    ..
                } = info;
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id(),
                        &upstream_fragment_downstreams,
                        new_sink_downstream,
                        Some(&resolved_split_assignment),
                    )
                    .await?;

                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
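            // A reschedule can move source splits between actors, so persist the
            // new per-fragment split assignment.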
            PostCollectCommand::Reschedule { reschedules, .. } => {
                let fragment_splits = reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (*fragment_id, reschedule.actor_splits.clone())
                    })
                    .collect();

                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&fragment_splits)
                    .await?;
            }

            PostCollectCommand::ReplaceStreamJob {
                plan: replace_plan,
                resolved_split_assignment,
            } => {
                let ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    ..
                } = &replace_plan;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id,
                        upstream_fragment_downstreams,
                        None,
                        Some(&resolved_split_assignment),
                    )
                    .await?;

                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id.as_job_id(),
                                &Default::default(),
                                None,
                                None,
                            )
                            .await?;
                    }
                }

                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        &replace_plan,
                    )
                    .await;
                cleanup_dropped_streaming_jobs(
                    &barrier_manager_context.refresh_manager,
                    &barrier_manager_context.hummock_manager,
                    &barrier_manager_context.metadata_manager,
                    [],
                    to_drop_state_table_ids.clone(),
                    "replace_streaming_job",
                )
                .await?;
            }

            PostCollectCommand::CreateSubscription { subscription_id } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(subscription_id)
                    .await?
            }
        }

        Ok(())
    }
}

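// These tests pin the skip condition used by
// `get_source_database_id_for_refresh_stage`: only an "object" not-found error
// counts as "associated source already dropped".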
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_skip_refresh_finish_when_associated_source_missing() {
        let err = MetaError::catalog_id_not_found("object", 42);
        assert!(err.is_catalog_id_not_found("object"));
    }

    #[test]
    fn test_do_not_skip_refresh_finish_for_other_not_found_types() {
        let err = MetaError::catalog_id_not_found("table", 42);
        assert!(!err.is_catalog_id_not_found("object"));
    }
}