risingwave_meta/barrier/context/context_impl.rs

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_meta_model::table::RefreshState;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::{ActorId, SourceId};
use risingwave_pb::stream_service::barrier_complete_response::{
    PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::model::FragmentDownstreamRelation;
use crate::stream::{REFRESH_TABLE_PROGRESS_TRACKER, SourceChange, SplitState};
use crate::{MetaError, MetaResult};

impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

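    /// Aborts scheduled barriers and blocks new ones from being scheduled.
    /// Passing `None` for `database_id` means cluster-wide recovery, in which
    /// case the barrier manager status is switched to `Recovering` as well.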
    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovery");
    }

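    /// Unblocks barrier scheduling after recovery. Only a global
    /// `MarkReadyOptions` flips the manager status back to `Running`.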
    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        job.finish(&self.metadata_manager, &self.source_manager)
            .await
    }

    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
    async fn finish_cdc_table_backfill(&self, job: JobId) -> MetaResult<()> {
        self.env.cdc_table_backfill_tracker.complete_job(job).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }

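    /// Collects `ListFinished` reports from compute nodes, grouped by
    /// `(table_id, associated_source_id)`. Once the tracker confirms that all
    /// reporting actors of a refresh task are done listing, a `ListFinish`
    /// command is scheduled on the owning database.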
    async fn handle_list_finished_source_ids(
        &self,
        list_finished: Vec<PbListFinishedSource>,
    ) -> MetaResult<()> {
        let mut list_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for list_finished in list_finished {
            let table_id = list_finished.table_id;
            let associated_source_id = list_finished.associated_source_id;
            list_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(list_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in list_finished_info {
            // Scope the tracker lock so it is released before any await point.
            let allow_yield = {
                let mut lock_handle = REFRESH_TABLE_PROGRESS_TRACKER.lock();
                let single_task_tracker =
                    lock_handle.inner.get_mut(&table_id).ok_or_else(|| {
                        MetaError::from(anyhow!("Table tracker not found for table {}", table_id))
                    })?;
                single_task_tracker.report_list_finished(actors.iter().copied());
                single_task_tracker.is_list_finished()?
            };

            if !allow_yield {
                continue;
            }

            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for the associated source")?;

            let list_finish_command = Command::ListFinish {
                table_id,
                associated_source_id,
            };

            self.barrier_scheduler
                .run_command_no_wait(database_id, list_finish_command)
                .context("Failed to schedule ListFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "ListFinish command scheduled successfully"
            );
        }

        Ok(())
    }

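    /// Counterpart of `handle_list_finished_source_ids` for the load phase:
    /// aggregates `LoadFinished` reports and schedules a `LoadFinish` command
    /// once all tracked actors of a refresh task have reported.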
    async fn handle_load_finished_source_ids(
        &self,
        load_finished: Vec<PbLoadFinishedSource>,
    ) -> MetaResult<()> {
        let mut load_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for load_finished in load_finished {
            let table_id = load_finished.table_id;
            let associated_source_id = load_finished.associated_source_id;
            load_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(load_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in load_finished_info {
            // Scope the tracker lock so it is released before any await point.
            let allow_yield = {
                let mut lock_handle = REFRESH_TABLE_PROGRESS_TRACKER.lock();
                let single_task_tracker =
                    lock_handle.inner.get_mut(&table_id).ok_or_else(|| {
                        MetaError::from(anyhow!("Table tracker not found for table {}", table_id))
                    })?;
                single_task_tracker.report_load_finished(actors.iter().copied());
                single_task_tracker.is_load_finished()?
            };

            if !allow_yield {
                continue;
            }

            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for the associated source")?;

            let load_finish_command = Command::LoadFinish {
                table_id,
                associated_source_id,
            };

            self.barrier_scheduler
                .run_command_no_wait(database_id, load_finish_command)
                .context("Failed to schedule LoadFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "LoadFinish command scheduled successfully"
            );
        }

        Ok(())
    }

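    /// Finalizes refresh for the given jobs: removes their progress trackers
    /// and resets the catalog refresh state back to `Idle`.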
    async fn handle_refresh_finished_table_ids(
        &self,
        refresh_finished_table_job_ids: Vec<JobId>,
    ) -> MetaResult<()> {
        for job_id in refresh_finished_table_job_ids {
            {
                let table_id = &job_id.as_mv_table_id();
                let mut lock_handle = REFRESH_TABLE_PROGRESS_TRACKER.lock();
                let remove_res = lock_handle.inner.remove(table_id);
                debug_assert!(remove_res.is_some());

                lock_handle
                    .table_id_by_database_id
                    .values_mut()
                    .for_each(|table_ids| {
                        table_ids.remove(table_id);
                    });
            }

            self.metadata_manager
                .catalog_controller
                .set_table_refresh_state(job_id.as_mv_table_id(), RefreshState::Idle)
                .await
                .context("Failed to set table refresh state to Idle")?;

            tracing::info!(%job_id, "Table refresh completed, state updated to Idle");
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
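    /// Atomically publishes a new barrier manager status.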
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
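    /// Performs the side effects of a command after its barrier has been
    /// collected, e.g. persisting catalog changes or notifying the source
    /// manager. Commands without side effects fall through as no-ops.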
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
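            // The commands below require no post-collection work.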
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

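            // Persist the newly assigned splits for the affected fragments.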
            Command::SourceChangeSplit(SplitState {
                split_assignment: assignment,
                ..
            }) => {
                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(assignment)
                    .await?;
            }

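            // Unregister dropped state tables from hummock, then finish
            // removing them from the catalog.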
            Command::DropStreamingJobs {
                unregistered_state_table_ids,
                ..
            } => {
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .complete_dropped_tables(unregistered_state_table_ids.iter().copied())
                    .await;
            }
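            // Push the updated connector properties down to the source manager.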
            Command::ConnectorPropsChange(obj_id_map_props) => {
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props
                            .iter()
                            .map(|(source_id, props)| (source_id.as_source_id(), props.clone()))
                            .collect(),
                    })
                    .await;
            }
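            // Record snapshot backfill epochs, persist the new job's fragments
            // and downstreams in the catalog, then announce the added source
            // and backfill fragments to the source manager.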
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    ..
                } = info;
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstream = ctx.new_sink_downstream.clone();
                        Some(FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstream],
                        )]))
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id(),
                        upstream_fragment_downstreams,
                        new_sink_downstream,
                        Some(&info.init_split_assignment),
                    )
                    .await?;

                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
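            // Persist the split assignment that resulted from rescheduling.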
            Command::RescheduleFragment { reschedules, .. } => {
                let fragment_splits = reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (*fragment_id, reschedule.actor_splits.clone())
                    })
                    .collect();

                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&fragment_splits)
                    .await?;
            }

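            // Persist the replacement fragments, swap the job's source
            // fragments, and unregister the state tables left behind by the
            // old fragments.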
            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    init_split_assignment,
                    ..
                },
            ) => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id,
                        upstream_fragment_downstreams,
                        None,
                        Some(init_split_assignment),
                    )
                    .await?;

                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id.as_job_id(),
                                &Default::default(),
                                None,
                                None,
                            )
                            .await?;
                    }
                }

                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

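            // Mark the subscription catalog as fully created.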
            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
            Command::StartFragmentBackfill { .. } => {}
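            // Refresh lifecycle: `Refresh` marks the table as refreshing and
            // `LoadFinish` marks it as finishing; `ListFinish` needs no
            // catalog update here.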
            Command::Refresh { table_id, .. } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .set_table_refresh_state(*table_id, RefreshState::Refreshing)
                    .await?;
            }
            Command::ListFinish { .. } => {}
            Command::LoadFinish { table_id, .. } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .set_table_refresh_state(*table_id, RefreshState::Finishing)
                    .await?;
            }
        }

        Ok(())
    }
}