risingwave_meta/barrier/context/context_impl.rs

use std::sync::Arc;

use anyhow::Context;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::stream::SourceChange;

impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
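    /// Commit the collected epoch to Hummock, then return the latest version stats.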
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

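    /// Abort pending scheduled barriers and block further scheduling, either for a
    /// single database or, when `database_id` is `None`, for the whole cluster.
    /// A cluster-wide abort also moves the manager into the `Recovering` status.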
    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovering");
    }

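    /// Unblock barrier scheduling after recovery; a global mark-ready also restores
    /// the `Running` status.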
    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        job.finish(&self.metadata_manager).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }

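    /// Schedule a `LoadFinish` barrier command for each refreshable source whose
    /// batch load has finished.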
    async fn handle_load_finished_source_ids(
        &self,
        load_finished_source_ids: Vec<u32>,
    ) -> MetaResult<()> {
        tracing::info!(
            "Handling load finished source IDs: {:?}",
            load_finished_source_ids
        );

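        // Schedule one `LoadFinish` command per source. Errors are logged and
        // skipped so that one failing source does not block the others.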
        for associated_source_id in load_finished_source_ids {
            let res: MetaResult<()> = try {
                tracing::info!(%associated_source_id, "Scheduling LoadFinish command for refreshable batch source");

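                // The reported id is reused as both the table id and the
                // associated source id of the refreshable table.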
                let table_id = TableId::new(associated_source_id);
                let associated_source_id = table_id;

                let database_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_object_database_id(table_id.table_id() as _)
                    .await
                    .context("Failed to get database id for table")?;

                let load_finish_command = Command::LoadFinish {
                    table_id,
                    associated_source_id,
                };

                self.barrier_scheduler
                    .run_command_no_wait(DatabaseId::new(database_id as u32), load_finish_command)
                    .context("Failed to schedule LoadFinish command")?;

                tracing::info!(%table_id, %associated_source_id, "LoadFinish command scheduled successfully");
            };
            if let Err(e) = res {
                tracing::error!(error = %e.as_report(), %associated_source_id, "Failed to handle source load finished");
            }
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
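    /// Side effects to apply on the meta side after the command's barrier has been
    /// collected from all nodes, e.g. persisting fragment info to the catalog or
    /// notifying the source manager.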
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
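            // These commands need no post-collection bookkeeping.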
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

            Command::SourceChangeSplit(split_assignment) => {
                barrier_manager_context
                    .metadata_manager
                    .update_actor_splits_by_split_assignment(split_assignment)
                    .await?;
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::SplitChange(split_assignment.clone()))
                    .await;
            }

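            // Unregister the dropped jobs' state tables from Hummock.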
            Command::DropStreamingJobs {
                unregistered_state_table_ids,
                ..
            } => {
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
            }
            Command::ConnectorPropsChange(obj_id_map_props) => {
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props.clone(),
                    })
                    .await;
            }
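            // A newly created job: record snapshot backfill epochs, persist the
            // job's fragment info, and register its source fragments with the
            // source manager.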
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                let mut replace_plan = None;
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(plan) => {
                        replace_plan = Some(plan);
                    }
                    CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

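                // Persist the job's fragment info now that its creation barrier
                // has been collected.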
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    ..
                } = info;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id().table_id as _,
                        stream_job_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                        replace_plan,
                    )
                    .await?;

                if let Some(plan) = replace_plan {
                    barrier_manager_context
                        .source_manager
                        .handle_replace_job(
                            &plan.old_fragments,
                            plan.new_fragments.stream_source_fragments(),
                            init_split_assignment.clone(),
                            plan,
                        )
                        .await;
                }

                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                    split_assignment: init_split_assignment.clone(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            Command::RescheduleFragment {
                reschedules,
                post_updates,
                ..
            } => {
                barrier_manager_context
                    .scale_controller
                    .post_apply_reschedule(reschedules, post_updates)
                    .await?;
            }

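            // Job replacement: persist the new fragments (including any
            // auto-refresh-schema sinks), hand the replaced source fragments to
            // the source manager, and unregister the dropped state tables.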
            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    ..
                },
            ) => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id.table_id as _,
                        new_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                        None,
                    )
                    .await?;

                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id,
                                sink.actor_status.keys().cloned().collect(),
                                &Default::default(),
                                &Default::default(),
                                None,
                            )
                            .await?;
                    }
                }

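                // Hand the replaced source fragments over to the source manager,
                // then unregister the dropped state tables from Hummock.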
                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        init_split_assignment.clone(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
            Command::StartFragmentBackfill { .. } => {}
            Command::Refresh { .. } => {}
            Command::LoadFinish { .. } => {}
        }

        Ok(())
    }
}