// risingwave_meta/barrier/context/context_impl.rs

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::Context;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_meta_model::ActorId;
use risingwave_meta_model::streaming_job::BackfillOrders;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::SourceId;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::command::{PostCollectCommand, ResumeBackfillTarget};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::manager::LocalNotification;
use crate::model::FragmentDownstreamRelation;
use crate::stream::SourceChange;
use crate::{MetaError, MetaResult};

impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
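    /// Commits the collected epoch to Hummock and returns the refreshed version stats.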
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

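    /// Aborts scheduled barriers and blocks further scheduling during recovery. A `None`
    /// `database_id` means a global recovery, which also flips the manager status.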
    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is recovering");
    }

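    /// Unblocks barrier scheduling; a global mark-ready also restores the running status.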
    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command(&self, command: PostCollectCommand) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        let job_id = job.job_id();
        job.finish(&self.metadata_manager, &self.source_manager)
            .await?;
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));
        Ok(())
    }

    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
    async fn finish_cdc_table_backfill(&self, job: JobId) -> MetaResult<()> {
        CdcTableBackfillTracker::mark_complete_job(&self.env.meta_store().conn, job).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
        self.reload_database_runtime_info_impl(database_id).await
    }

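    /// Aggregates per-actor `ListFinished` reports by `(table, source)` pair and, once the
    /// refresh manager marks the list stage complete, schedules a `ListFinish` barrier
    /// command in the owning database.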
    async fn handle_list_finished_source_ids(
        &self,
        list_finished: Vec<PbListFinishedSource>,
    ) -> MetaResult<()> {
        let mut list_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for list_finished in list_finished {
            let table_id = list_finished.table_id;
            let associated_source_id = list_finished.associated_source_id;
            list_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(list_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in list_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_list_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for the associated source")?;

            let list_finish_command = Command::ListFinish {
                table_id,
                associated_source_id,
            };

            self.barrier_scheduler
                .run_command_no_wait(database_id, list_finish_command)
                .context("Failed to schedule ListFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "ListFinish command scheduled successfully"
            );
        }
        Ok(())
    }

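    /// Same aggregation as the list stage, but for `LoadFinished` reports: once the refresh
    /// manager marks the load stage complete, a `LoadFinish` barrier command is scheduled.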
    async fn handle_load_finished_source_ids(
        &self,
        load_finished: Vec<PbLoadFinishedSource>,
    ) -> MetaResult<()> {
        let mut load_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for load_finished in load_finished {
            let table_id = load_finished.table_id;
            let associated_source_id = load_finished.associated_source_id;
            load_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(load_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in load_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_load_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for the associated source")?;

            let load_finish_command = Command::LoadFinish {
                table_id,
                associated_source_id,
            };

            self.barrier_scheduler
                .run_command_no_wait(database_id, load_finish_command)
                .context("Failed to schedule LoadFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "LoadFinish command scheduled successfully"
            );
        }

        Ok(())
    }

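    /// Marks each finished refresh job as complete in the refresh manager.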
    async fn handle_refresh_finished_table_ids(
        &self,
        refresh_finished_table_job_ids: Vec<JobId>,
    ) -> MetaResult<()> {
        for job_id in refresh_finished_table_job_ids {
            let table_id = job_id.as_mv_table_id();

            self.refresh_manager.mark_refresh_complete(table_id).await?;
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl PostCollectCommand {
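    /// Applies a command's metadata side effects after its barrier has been collected from
    /// all workers, e.g. persisting split assignments, finalizing catalog entries, or
    /// unregistering dropped state tables.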
    pub async fn post_collect(
        self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        match self {
            PostCollectCommand::Command(_) => {}
            PostCollectCommand::SourceChangeSplit {
                split_assignment: assignment,
            } => {
                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&assignment)
                    .await?;
            }

            PostCollectCommand::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
            } => {
                for job_id in streaming_job_ids {
                    barrier_manager_context
                        .refresh_manager
                        .remove_progress_tracker(job_id.as_mv_table_id(), "drop_streaming_jobs");
                }

                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .complete_dropped_tables(unregistered_state_table_ids.iter().copied())
                    .await;
            }
            PostCollectCommand::ConnectorPropsChange(obj_id_map_props) => {
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props
                            .iter()
                            .map(|(object_id, props)| (object_id.as_source_id(), props.clone()))
                            .collect(),
                    })
                    .await;
            }
            PostCollectCommand::ResumeBackfill { target } => match target {
                ResumeBackfillTarget::Job(job_id) => {
                    barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .update_backfill_orders_by_job_id(job_id, None)
                        .await?;
                }
                ResumeBackfillTarget::Fragment(fragment_id) => {
                    let mut job_ids = barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_job_id(vec![fragment_id])
                        .await?;
                    let job_id = job_ids.pop().ok_or_else(|| {
                        MetaError::invalid_parameter("fragment not found".to_owned())
                    })?;
                    let job_id = JobId::new(job_id.as_raw_id());

                    let extra_info = barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .get_streaming_job_extra_info(vec![job_id])
                        .await?;
                    let mut backfill_orders: BackfillOrders = extra_info
                        .get(&job_id)
                        .cloned()
                        .ok_or_else(|| MetaError::invalid_parameter("job not found".to_owned()))?
                        .backfill_orders
                        .unwrap_or_default();

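                    // Remove the resumed fragment from every backfill-order dependency list,
                    // then drop entries whose lists become empty.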
                    let resumed_fragment_id = fragment_id.as_raw_id();
                    for children in backfill_orders.0.values_mut() {
                        children.retain(|child| *child != resumed_fragment_id);
                    }
                    backfill_orders.0.retain(|_, children| !children.is_empty());

                    barrier_manager_context
                        .metadata_manager
                        .catalog_controller
                        .update_backfill_orders_by_job_id(job_id, Some(backfill_orders))
                        .await?;
                }
            },
            PostCollectCommand::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
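                // Persist snapshot backfill epochs for the relevant fragments. Which fragment
                // types qualify depends on the job type: cross-db scans for normal and
                // sink-into-table jobs, plus same-db snapshot scans for snapshot-backfill jobs.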
                match &job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                &cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                &cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    ..
                } = info;
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id(),
                        &upstream_fragment_downstreams,
                        new_sink_downstream,
                        Some(&init_split_assignment),
                    )
                    .await?;

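                // Register the new job's source and source-backfill fragments with the
                // source manager.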
                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            PostCollectCommand::Reschedule { reschedules, .. } => {
                let fragment_splits = reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (*fragment_id, reschedule.actor_splits.clone())
                    })
                    .collect();

                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&fragment_splits)
                    .await?;
            }

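            // Finalize a replace-job plan: persist the new fragments, let the source manager
            // take over the replaced job's source fragments, and unregister the state tables
            // the old job leaves behind.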
            PostCollectCommand::ReplaceStreamJob(replace_plan) => {
                let ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    init_split_assignment,
                    ..
                } = &replace_plan;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id,
                        upstream_fragment_downstreams,
                        None,
                        Some(init_split_assignment),
                    )
                    .await?;

                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id.as_job_id(),
                                &Default::default(),
                                None,
                                None,
                            )
                            .await?;
                    }
                }

                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        &replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            PostCollectCommand::CreateSubscription { subscription_id } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(subscription_id)
                    .await?
            }
        }

        Ok(())
    }
}