risingwave_meta/barrier/context/context_impl.rs

use std::sync::Arc;

use anyhow::Context;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{Operation, PbInfo};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::model::FragmentDownstreamRelation;
use crate::stream::SourceChange;

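// `GlobalBarrierWorkerContext` implemented on top of the meta node's global services
// (hummock manager, metadata manager, source manager, barrier scheduler).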
impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
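    /// Commit the collected epoch to Hummock and return the latest version stats.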
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

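    /// Abort scheduled barriers and block further scheduling while recovery is in
    /// progress. Only a global recovery (no `database_id`) switches the manager
    /// status to `Recovering`; per-database recovery leaves the global status alone.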
    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovering");
    }

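    /// Unblock barrier scheduling; a global mark-ready also restores the `Running` status.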
    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        job.finish(&self.metadata_manager).await
    }

    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
    async fn finish_cdc_table_backfill(&self, job: TableId) -> MetaResult<()> {
        self.env.cdc_table_backfill_tracker.complete_job(job).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }

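    /// Handle refreshable batch sources that have finished loading: for each source id,
    /// look up its database and schedule a `LoadFinish` command through the barrier
    /// scheduler. Failures are logged per source so one bad id does not abort the batch.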
    async fn handle_load_finished_source_ids(
        &self,
        load_finished_source_ids: Vec<u32>,
    ) -> MetaResult<()> {
        tracing::info!(
            "Handling load finished source IDs: {:?}",
            load_finished_source_ids
        );

        for associated_source_id in load_finished_source_ids {
            let res: MetaResult<()> = try {
                tracing::info!(%associated_source_id, "Scheduling LoadFinish command for refreshable batch source");

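                // The incoming id is used both as the job's table id and as the
                // associated source id of the `LoadFinish` command.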
                let table_id = TableId::new(associated_source_id);
                let associated_source_id = table_id;

                let database_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_object_database_id(table_id.table_id() as _)
                    .await
                    .context("Failed to get database id for table")?;

                let load_finish_command = Command::LoadFinish {
                    table_id,
                    associated_source_id,
                };

                self.barrier_scheduler
                    .run_command_no_wait(
                        DatabaseId::new(database_id as u32),
                        load_finish_command,
                    )
                    .context("Failed to schedule LoadFinish command")?;

                tracing::info!(%table_id, %associated_source_id, "LoadFinish command scheduled successfully");
            };
            if let Err(e) = res {
                tracing::error!(error = %e.as_report(), %associated_source_id, "Failed to handle source load finished");
            }
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
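    /// Side effects to run after the command's barrier has been collected, e.g.
    /// persisting catalog changes, updating split assignments and unregistering
    /// dropped state tables from Hummock.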
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
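            // Commands with no post-collect side effects.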
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

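            // Persist the new split assignment and propagate it to the source manager.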
            Command::SourceChangeSplit(split_assignment) => {
                barrier_manager_context
                    .metadata_manager
                    .update_actor_splits_by_split_assignment(split_assignment)
                    .await?;
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::SplitChange(split_assignment.clone()))
                    .await;
            }

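            // Unregister the dropped state tables from Hummock and notify hummock
            // and compactor subscribers about the deleted table objects.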
            Command::DropStreamingJobs {
                unregistered_state_table_ids,
                ..
            } => {
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
                let tables = barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .complete_dropped_tables(
                        unregistered_state_table_ids
                            .iter()
                            .map(|id| id.table_id as _),
                    )
                    .await;
                let objects = tables
                    .into_iter()
                    .map(|t| PbObject {
                        object_info: Some(PbObjectInfo::Table(t)),
                    })
                    .collect();
                let group = PbInfo::ObjectGroup(PbObjectGroup { objects });
                barrier_manager_context
                    .env
                    .notification_manager()
                    .notify_hummock(Operation::Delete, group.clone())
                    .await;
                barrier_manager_context
                    .env
                    .notification_manager()
                    .notify_compactor(Operation::Delete, group)
                    .await;
            }
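            // Push the updated connector properties to the source manager.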
            Command::ConnectorPropsChange(obj_id_map_props) => {
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props.clone(),
                    })
                    .await;
            }
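            // For newly created streaming jobs, fill in the snapshot backfill epoch
            // for the relevant backfill fragments, persist fragment/actor metadata,
            // and register the job's sources and splits with the source manager.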
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

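                // Persist the job's fragment and actor metadata; for sink-into-table
                // jobs, also record the sink fragment's new downstream relation.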
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    ..
                } = info;
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id().table_id as _,
                        stream_job_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                        new_sink_downstream,
                    )
                    .await?;

                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                    split_assignment: init_split_assignment.clone(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            Command::RescheduleFragment {
                reschedules,
                post_updates,
                ..
            } => {
                barrier_manager_context
                    .scale_controller
                    .post_apply_reschedule(reschedules, post_updates)
                    .await?;
            }

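            // Persist the new fragments of the replaced job, hand the source changes
            // over to the source manager, and unregister the state tables dropped by
            // the replacement.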
            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    ..
                },
            ) => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id.table_id as _,
                        new_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                        None,
                    )
                    .await?;

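                // Also persist fragment metadata for any temporary sinks created for
                // auto schema refresh.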
                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id,
                                sink.actor_status.keys().cloned().collect(),
                                &Default::default(),
                                &Default::default(),
                                None,
                            )
                            .await?;
                    }
                }

                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        init_split_assignment.clone(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

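            // Mark the subscription catalog as finished once its barrier is collected.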
            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
            Command::StartFragmentBackfill { .. } => {}
            Command::Refresh { .. } => {}
            Command::LoadFinish { .. } => {}
        }

        Ok(())
    }
}