risingwave_meta/barrier/context/
context_impl.rs1use std::sync::Arc;
16
17use anyhow::Context;
18use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
19use risingwave_meta_model::table::RefreshState;
20use risingwave_pb::common::WorkerNode;
21use risingwave_pb::hummock::HummockVersionStats;
22use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
23use risingwave_rpc_client::StreamingControlHandle;
24use thiserror_ext::AsReport;
25
26use crate::MetaResult;
27use crate::barrier::command::CommandContext;
28use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
29use crate::barrier::progress::TrackingJob;
30use crate::barrier::schedule::MarkReadyOptions;
31use crate::barrier::{
32 BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
33 CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
34 Scheduled,
35};
36use crate::hummock::CommitEpochInfo;
37use crate::model::FragmentDownstreamRelation;
38use crate::stream::{SourceChange, SplitState};
39
40impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
41 #[await_tree::instrument]
42 async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
43 self.hummock_manager.commit_epoch(commit_info).await?;
44 Ok(self.hummock_manager.get_version_stats().await)
45 }
46
47 #[await_tree::instrument("next_scheduled_barrier")]
48 async fn next_scheduled(&self) -> Scheduled {
49 self.scheduled_barriers.next_scheduled().await
50 }
51
52 fn abort_and_mark_blocked(
53 &self,
54 database_id: Option<DatabaseId>,
55 recovery_reason: RecoveryReason,
56 ) {
57 if database_id.is_none() {
58 self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
59 }
60
61 self.scheduled_barriers
63 .abort_and_mark_blocked(database_id, "cluster is under recovering");
64 }
65
66 fn mark_ready(&self, options: MarkReadyOptions) {
67 let is_global = matches!(&options, MarkReadyOptions::Global { .. });
68 self.scheduled_barriers.mark_ready(options);
69 if is_global {
70 self.set_status(BarrierManagerStatus::Running);
71 }
72 }
73
74 #[await_tree::instrument("post_collect_command({command})")]
75 async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
76 command.post_collect(self).await
77 }
78
79 async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
80 self.metadata_manager
81 .notify_finish_failed(database_id, err)
82 .await
83 }
84
85 #[await_tree::instrument("finish_creating_job({job})")]
86 async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
87 job.finish(&self.metadata_manager).await
88 }
89
90 #[await_tree::instrument("finish_cdc_table_backfill({job})")]
91 async fn finish_cdc_table_backfill(&self, job: TableId) -> MetaResult<()> {
92 self.env.cdc_table_backfill_tracker.complete_job(job).await
93 }
94
95 #[await_tree::instrument("new_control_stream({})", node.id)]
96 async fn new_control_stream(
97 &self,
98 node: &WorkerNode,
99 init_request: &PbInitRequest,
100 ) -> MetaResult<StreamingControlHandle> {
101 self.new_control_stream_impl(node, init_request).await
102 }
103
104 async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
105 self.reload_runtime_info_impl().await
106 }
107
108 async fn reload_database_runtime_info(
109 &self,
110 database_id: DatabaseId,
111 ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
112 self.reload_database_runtime_info_impl(database_id).await
113 }
114
115 async fn handle_load_finished_source_ids(
116 &self,
117 load_finished_source_ids: Vec<u32>,
118 ) -> MetaResult<()> {
119 use risingwave_common::catalog::TableId;
120
121 tracing::info!(
122 "Handling load finished source IDs: {:?}",
123 load_finished_source_ids
124 );
125
126 use crate::barrier::Command;
127 for associated_source_id in load_finished_source_ids {
128 let res: MetaResult<()> = try {
129 tracing::info!(%associated_source_id, "Scheduling LoadFinish command for refreshable batch source");
130
131 let associated_source_id = TableId::new(associated_source_id);
133 let table_id = self
135 .metadata_manager
136 .catalog_controller
137 .get_table_by_associate_source_id(associated_source_id.table_id() as _)
138 .await
139 .context("Failed to get table id for source")?
140 .id
141 .into();
142
143 let database_id = self
145 .metadata_manager
146 .catalog_controller
147 .get_object_database_id(associated_source_id.table_id() as _)
148 .await
149 .context("Failed to get database id for table")?;
150
151 let load_finish_command = Command::LoadFinish {
153 table_id,
154 associated_source_id,
155 };
156
157 self.barrier_scheduler
159 .run_command_no_wait(
160 risingwave_common::catalog::DatabaseId::new(database_id as u32),
161 load_finish_command,
162 )
163 .context("Failed to schedule LoadFinish command")?;
164
165 tracing::info!(%associated_source_id, %associated_source_id, "LoadFinish command scheduled successfully");
166 };
167 if let Err(e) = res {
168 tracing::error!(error = %e.as_report(), %associated_source_id, "Failed to handle source load finished");
169 }
170 }
171
172 Ok(())
173 }
174
175 async fn handle_refresh_finished_table_ids(
176 &self,
177 refresh_finished_table_ids: Vec<u32>,
178 ) -> MetaResult<()> {
179 use risingwave_meta_model::table::RefreshState;
180
181 tracing::info!(
182 "Handling refresh finished table IDs: {:?}",
183 refresh_finished_table_ids
184 );
185
186 for table_id in refresh_finished_table_ids {
187 let res: MetaResult<()> = try {
188 tracing::info!(%table_id, "Processing refresh finished for materialized view");
189
190 self.metadata_manager
192 .catalog_controller
193 .set_table_refresh_state(table_id as i32, RefreshState::Idle)
194 .await
195 .context("Failed to set table refresh state to Idle")?;
196
197 tracing::info!(%table_id, "Table refresh completed, state updated to Idle");
198 };
199 if let Err(e) = res {
200 tracing::error!(error = %e.as_report(), %table_id, "Failed to handle refresh finished table");
201 }
202 }
203
204 Ok(())
205 }
206}
207
208impl GlobalBarrierWorkerContextImpl {
209 fn set_status(&self, new_status: BarrierManagerStatus) {
210 self.status.store(Arc::new(new_status));
211 }
212}
213
impl CommandContext {
    /// Applies the meta-side follow-up work for `self.command` once its barrier has been
    /// collected.
    ///
    /// Returns `Ok(())` immediately when there is no command. Otherwise it dispatches on the
    /// command variant and awaits, in order, the calls into the hummock manager, catalog
    /// controller, source manager or scale controller held by `barrier_manager_context`.
    ///
    /// # Errors
    /// Propagates (via `?`) the first error returned by any of those calls; later steps of
    /// the same arm are then skipped.
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
            // These commands need no post-collect work.
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

            Command::SourceChangeSplit(SplitState {
                split_assignment: assignment,
                discovered_source_splits: source_splits,
            }) => {
                // Persist the per-actor split assignment first, then the discovered
                // source-level splits.
                barrier_manager_context
                    .metadata_manager
                    .update_actor_splits_by_split_assignment(assignment)
                    .await?;

                barrier_manager_context
                    .metadata_manager
                    .update_source_splits(source_splits)
                    .await?;
            }

            Command::DropStreamingJobs {
                unregistered_state_table_ids,
                ..
            } => {
                // Unregister the dropped state tables from hummock, then report the same ids
                // to the catalog controller.
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
                // NOTE(review): the result of `complete_dropped_tables` is not propagated
                // with `?` here; confirm whether it can fail.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .complete_dropped_tables(
                        unregistered_state_table_ids
                            .iter()
                            .map(|id| id.table_id as _),
                    )
                    .await;
            }
            Command::ConnectorPropsChange(obj_id_map_props) => {
                // Forward the updated connector properties to the source manager.
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props.clone(),
                    })
                    .await;
            }
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                // Fill snapshot-backfill epochs for the job's scan fragments. Normal and
                // sink-into-table jobs only select cross-db snapshot backfill fragments and pass
                // no snapshot backfill info; snapshot-backfill jobs select both snapshot
                // backfill fragment kinds and pass their own info.
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    ..
                } = info;
                // For sink-into-table jobs, also record the new downstream relation that starts
                // at the sink fragment.
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                // Persist the job's actors, upstream relations and initial split assignment.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id().table_id as _,
                        stream_job_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                        new_sink_downstream,
                    )
                    .await?;

                // Register the job's source and source-backfill fragments with the source
                // manager.
                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            Command::RescheduleFragment {
                reschedules,
                post_updates,
                ..
            } => {
                barrier_manager_context
                    .scale_controller
                    .post_apply_reschedule(reschedules, post_updates)
                    .await?;
            }

            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    ..
                },
            ) => {
                // Persist the replacement fragments of the job.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id.table_id as _,
                        new_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                        None,
                    )
                    .await?;

                // Persist the fragments of each temporary sink created for auto schema refresh,
                // with empty relations and no split assignment.
                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id,
                                sink.actor_status.keys().cloned().collect(),
                                &Default::default(), // upstream_fragment_downstreams
                                &Default::default(), // init_split_assignment
                                None,                // new_sink_downstream
                            )
                            .await?;
                    }
                }

                // Hand the old/new source fragments to the source manager, then unregister the
                // state tables dropped by the replacement from hummock.
                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
            Command::StartFragmentBackfill { .. } => {}
            // Refresh moves the table into `Refreshing`; LoadFinish moves it into `Finishing`.
            Command::Refresh { table_id, .. } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .set_table_refresh_state(table_id.table_id() as i32, RefreshState::Refreshing)
                    .await?;
            }
            Command::LoadFinish { table_id, .. } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .set_table_refresh_state(table_id.table_id() as i32, RefreshState::Finishing)
                    .await?;
            }
        }

        Ok(())
    }
}