risingwave_meta/barrier/context/context_impl.rs

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::Context;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_meta_model::ActorId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::SourceId;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::MetaResult;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::manager::LocalNotification;
use crate::model::FragmentDownstreamRelation;
use crate::stream::{SourceChange, SplitState};

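// Wires the global barrier worker to the surrounding meta-node services it drives:
// the hummock manager (epoch commits), the barrier scheduler, the metadata/catalog
// managers, the source manager, and the refresh manager.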
impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

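    /// Aborts scheduled barriers and blocks further scheduling for the given database,
    /// or for all databases when `database_id` is `None`, in which case the whole
    /// barrier manager is marked as recovering.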
    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovering");
    }

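    /// Unblocks barrier scheduling again; a global `mark_ready` also flips the barrier
    /// manager status back to `Running`.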
    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

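    /// Finalizes a tracked streaming job in the catalog and notifies local subscribers
    /// that its backfill has finished.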
    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        let job_id = job.job_id();
        job.finish(&self.metadata_manager, &self.source_manager)
            .await?;
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));
        Ok(())
    }

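    /// Marks the CDC table backfill of the given job as complete in the meta store.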
    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
    async fn finish_cdc_table_backfill(&self, job: JobId) -> MetaResult<()> {
        CdcTableBackfillTracker::mark_complete_job(&self.env.meta_store().conn, job).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }

    async fn handle_list_finished_source_ids(
        &self,
        list_finished: Vec<PbListFinishedSource>,
    ) -> MetaResult<()> {
        let mut list_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for list_finished in list_finished {
            let table_id = list_finished.table_id;
            let associated_source_id = list_finished.associated_source_id;
            list_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(list_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in list_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_list_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            let list_finish_command = Command::ListFinish {
                table_id,
                associated_source_id,
            };

            self.barrier_scheduler
                .run_command_no_wait(database_id, list_finish_command)
                .context("Failed to schedule ListFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "ListFinish command scheduled successfully"
            );
        }
        Ok(())
    }

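    /// Same as `handle_list_finished_source_ids`, but for `LoadFinished` reports: once
    /// the refresh manager allows the load stage to yield, a `LoadFinish` command is
    /// scheduled on the owning database.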
    async fn handle_load_finished_source_ids(
        &self,
        load_finished: Vec<PbLoadFinishedSource>,
    ) -> MetaResult<()> {
        let mut load_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for load_finished in load_finished {
            let table_id = load_finished.table_id;
            let associated_source_id = load_finished.associated_source_id;
            load_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(load_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in load_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_load_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            let load_finish_command = Command::LoadFinish {
                table_id,
                associated_source_id,
            };

            self.barrier_scheduler
                .run_command_no_wait(database_id, load_finish_command)
                .context("Failed to schedule LoadFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "LoadFinish command scheduled successfully"
            );
        }

        Ok(())
    }

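    /// Marks the refresh of each finished table job as complete in the refresh manager.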
    async fn handle_refresh_finished_table_ids(
        &self,
        refresh_finished_table_job_ids: Vec<JobId>,
    ) -> MetaResult<()> {
        for job_id in refresh_finished_table_job_ids {
            let table_id = job_id.as_mv_table_id();

            self.refresh_manager.mark_refresh_complete(table_id).await?;
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
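    /// Applies a command's side effects after its barrier has been collected from all
    /// compute nodes, e.g. persisting catalog and fragment changes, updating source
    /// splits, and unregistering dropped state tables from hummock.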
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

            Command::SourceChangeSplit(SplitState {
                split_assignment: assignment,
                ..
            }) => {
                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(assignment)
                    .await?;
            }

            Command::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
                ..
            } => {
                for job_id in streaming_job_ids {
                    barrier_manager_context
                        .refresh_manager
                        .remove_progress_tracker(job_id.as_mv_table_id(), "drop_streaming_jobs");
                }

                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .complete_dropped_tables(unregistered_state_table_ids.iter().copied())
                    .await;
            }
            Command::ConnectorPropsChange(obj_id_map_props) => {
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props
                            .iter()
                            .map(|(source_id, props)| (source_id.as_source_id(), props.clone()))
                            .collect(),
                    })
                    .await;
            }
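            // A newly created streaming job needs its snapshot-backfill epochs filled in,
            // its fragment relations and initial split assignment persisted, and its source
            // and source-backfill fragments registered with the source manager.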
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    ..
                } = info;
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id(),
                        upstream_fragment_downstreams,
                        new_sink_downstream,
                        Some(&info.init_split_assignment),
                    )
                    .await?;

                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
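            // A reschedule changes which actors own which splits; persist the new
            // per-fragment split assignment via the metadata manager.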
            Command::RescheduleFragment { reschedules, .. } => {
                let fragment_splits = reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (*fragment_id, reschedule.actor_splits.clone())
                    })
                    .collect();

                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&fragment_splits)
                    .await?;
            }

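            // Replacing a streaming job persists the new fragment graph and splits, lets
            // the source manager swap source fragments, and unregisters the replaced job's
            // old state tables from hummock.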
            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    init_split_assignment,
                    ..
                },
            ) => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id,
                        upstream_fragment_downstreams,
                        None,
                        Some(init_split_assignment),
                    )
                    .await?;

                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id.as_job_id(),
                                &Default::default(),
                                None,
                                None,
                            )
                            .await?;
                    }
                }

                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::ListFinish { .. } | Command::LoadFinish { .. } | Command::Refresh { .. } => {}
        }

        Ok(())
    }
}