// risingwave_meta/controller/catalog/util.rs

use risingwave_common::catalog::FragmentTypeMask;

use super::*;
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::load_streaming_jobs_by_ids;

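/// Sets `column` to `new_value` on the `Object` rows of every internal (state)
/// table of the streaming job `object_id`, then reloads those table catalogs
/// and appends them to `objects_to_notify` so the caller can broadcast the
/// updated metadata.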
pub(crate) async fn update_internal_tables(
    txn: &DatabaseTransaction,
    object_id: ObjectId,
    column: object::Column,
    new_value: impl Into<Value>,
    objects_to_notify: &mut Vec<PbObjectInfo>,
) -> MetaResult<()> {
    let internal_tables = get_internal_tables_by_id(object_id.as_job_id(), txn).await?;

    if !internal_tables.is_empty() {
        Object::update_many()
            .col_expr(column, SimpleExpr::Value(new_value.into()))
            .filter(object::Column::Oid.is_in(internal_tables.clone()))
            .exec(txn)
            .await?;

        let table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::TableId.is_in(internal_tables))
            .all(txn)
            .await?;
        let streaming_jobs =
            load_streaming_jobs_by_ids(txn, table_objs.iter().map(|(table, _)| table.job_id()))
                .await?;
        for (table, table_obj) in table_objs {
            let job_id = table.job_id();
            let streaming_job = streaming_jobs.get(&job_id).cloned();
            objects_to_notify.push(PbObjectInfo::Table(
                ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
            ));
        }
    }
    Ok(())
}

impl CatalogController {
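    /// One-shot catalog fixups, run when the catalog controller is initialized.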
    pub(crate) async fn init(&self) -> MetaResult<()> {
        self.table_catalog_cdc_table_id_update().await?;
        Ok(())
    }

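    /// Backfills `cdc_table_id` for tables that lack one: for every
    /// `TableType::Table` whose `cdc_table_id` is NULL or empty, derive the id
    /// from the upstream source and the external table name parsed out of the
    /// table definition, then persist it within a single transaction.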
    pub(crate) async fn table_catalog_cdc_table_id_update(&self) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

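        // CDC tables joined to their upstream source via `object_dependency`;
        // only rows with an unset or empty `cdc_table_id` are candidates.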
        let table_and_source_id: Vec<(TableId, String, SourceId)> = Table::find()
            .join(JoinType::InnerJoin, table::Relation::ObjectDependency.def())
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Source.def(),
            )
            .select_only()
            .columns([table::Column::TableId, table::Column::Definition])
            .columns([source::Column::SourceId])
            .filter(
                table::Column::TableType.eq(TableType::Table).and(
                    table::Column::CdcTableId
                        .is_null()
                        .or(table::Column::CdcTableId.eq("")),
                ),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        if table_and_source_id.is_empty() {
            return Ok(());
        }

        info!(table_and_source_id = ?table_and_source_id, "cdc table with empty cdc_table_id");

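        // Derive each table's new cdc_table_id from its source id and the
        // external table name embedded in the stored definition.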
        let mut cdc_table_ids = HashMap::new();
        for (table_id, definition, source_id) in table_and_source_id {
            match extract_external_table_name_from_definition(&definition) {
                None => {
                    tracing::warn!(
                        %table_id,
                        definition,
                        "failed to extract cdc table name from table definition.",
                    )
                }
                Some(external_table_name) => {
                    cdc_table_ids.insert(
                        table_id,
                        build_cdc_table_id(source_id, &external_table_name),
                    );
                }
            }
        }

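        // Write the derived ids back and commit everything in one transaction.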
        for (table_id, cdc_table_id) in cdc_table_ids {
            Table::update(table::ActiveModel {
                table_id: Set(table_id as _),
                cdc_table_id: Set(Some(cdc_table_id)),
                ..Default::default()
            })
            .exec(&txn)
            .await?;
        }
        txn.commit().await?;
        Ok(())
    }

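    /// Records a `DirtyStreamJobClear` event-log entry for each dirty streaming
    /// job (table/index, source, or sink) cleaned up during recovery.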
    pub(crate) async fn log_cleaned_dirty_jobs(
        &self,
        dirty_objs: &[PartialObject],
        txn: &DatabaseTransaction,
    ) -> MetaResult<()> {
        let mut dirty_table_ids = vec![];
        let mut dirty_source_ids = vec![];
        let mut dirty_sink_ids = vec![];
        for dirty_job_obj in dirty_objs {
            let job_id = dirty_job_obj.oid;
            let job_type = dirty_job_obj.obj_type;
            match job_type {
                ObjectType::Table | ObjectType::Index => dirty_table_ids.push(job_id),
                ObjectType::Source => dirty_source_ids.push(job_id),
                ObjectType::Sink => dirty_sink_ids.push(job_id),
                _ => unreachable!("unexpected streaming job type"),
            }
        }

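        // Fetch each dirty job's name and definition so the emitted event is
        // self-describing.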
        let mut event_logs = vec![];
        if !dirty_table_ids.is_empty() {
            let table_info: Vec<(TableId, String, String)> = Table::find()
                .select_only()
                .columns([
                    table::Column::TableId,
                    table::Column::Name,
                    table::Column::Definition,
                ])
                .filter(table::Column::TableId.is_in(dirty_table_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (table_id, name, definition) in table_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: table_id.as_job_id(),
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_source_ids.is_empty() {
            let source_info: Vec<(SourceId, String, String)> = Source::find()
                .select_only()
                .columns([
                    source::Column::SourceId,
                    source::Column::Name,
                    source::Column::Definition,
                ])
                .filter(source::Column::SourceId.is_in(dirty_source_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (source_id, name, definition) in source_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: source_id.as_share_source_job_id(),
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_sink_ids.is_empty() {
            let sink_info: Vec<(SinkId, String, String)> = Sink::find()
                .select_only()
                .columns([
                    sink::Column::SinkId,
                    sink::Column::Name,
                    sink::Column::Definition,
                ])
                .filter(sink::Column::SinkId.is_in(dirty_sink_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (sink_id, name, definition) in sink_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: sink_id.as_job_id(),
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        self.env.event_log_manager_ref().add_event_logs(event_logs);
        Ok(())
    }

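    /// Repairs the mview fragments of tables that have incoming sinks: when a
    /// sink job is cleaned up, the target table's `Union` node may still hold
    /// inputs referencing fragments that no longer exist. Any `Merge` (or
    /// `Project`-over-`Merge`) input whose upstream fragment id is missing is
    /// dropped, and the fixed stream node is written back.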
    pub(crate) async fn clean_dirty_sink_downstreams(txn: &DatabaseTransaction) -> MetaResult<()> {
        let all_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .into_tuple()
            .all(txn)
            .await?;

        let all_fragment_ids: HashSet<_> = all_fragment_ids.into_iter().collect();

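        // Every table that is the target of a sink-into-table.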
        let all_sink_into_tables: Vec<Option<TableId>> = Sink::find()
            .select_only()
            .column(sink::Column::TargetTable)
            .filter(sink::Column::TargetTable.is_not_null())
            .into_tuple()
            .all(txn)
            .await?;

        let mut table_with_incoming_sinks: HashSet<TableId> = HashSet::new();
        for target_table_id in all_sink_into_tables {
            table_with_incoming_sinks.insert(target_table_id.expect("filter by non null"));
        }

        if table_with_incoming_sinks.is_empty() {
            return Ok(());
        }

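        // Inspect the mview fragment of each target table and prune union
        // inputs whose upstream fragment no longer exists.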
        for table_id in table_with_incoming_sinks {
            tracing::info!("cleaning dirty table sink downstream table {}", table_id);

            let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
                .select_only()
                .columns(vec![
                    fragment::Column::FragmentId,
                    fragment::Column::StreamNode,
                ])
                .filter(fragment::Column::JobId.eq(table_id).and(
                    FragmentTypeMask::intersects(FragmentTypeFlag::Mview),
                ))
                .into_tuple()
                .all(txn)
                .await?;

            for (fragment_id, stream_node) in fragments {
                {
                    let mut dirty_upstream_fragment_ids = HashSet::new();

                    let mut pb_stream_node = stream_node.to_protobuf();

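                    // Walk the stream node; at each Union, keep only inputs
                    // whose Merge upstream fragment still exists and record the
                    // ones that don't.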
                    visit_stream_node_cont_mut(&mut pb_stream_node, |node| {
                        if let Some(NodeBody::Union(_)) = node.node_body {
                            node.input.retain_mut(|input| match &mut input.node_body {
                                // A union input is either a Project wrapped around a Merge ...
                                Some(NodeBody::Project(_)) => {
                                    let body = input.input.iter().exactly_one().unwrap();
                                    let Some(NodeBody::Merge(merge_node)) = &body.node_body else {
                                        unreachable!("expect merge node");
                                    };
                                    if all_fragment_ids.contains(&merge_node.upstream_fragment_id) {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                }
                                // ... or a bare Merge; keep it only if its upstream fragment exists.
                                Some(NodeBody::Merge(merge_node)) => {
                                    if all_fragment_ids.contains(&merge_node.upstream_fragment_id) {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                }
                                _ => false,
                            });
                        }
                        true
                    });

                    tracing::info!(
                        "cleaning dirty table sink fragment {:?} from downstream fragment {}",
                        dirty_upstream_fragment_ids,
                        fragment_id
                    );

                    if !dirty_upstream_fragment_ids.is_empty() {
                        tracing::info!(
                            "fixing dirty stream node in downstream fragment {}",
                            fragment_id
                        );
                        Fragment::update_many()
                            .col_expr(
                                fragment::Column::StreamNode,
                                StreamNode::from(&pb_stream_node).into(),
                            )
                            .filter(fragment::Column::FragmentId.eq(fragment_id))
                            .exec(txn)
                            .await?;
                    }
                }
            }
        }

        Ok(())
    }

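    /// Returns whether at least one streaming job is recorded in the catalog.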
    pub async fn has_any_streaming_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = streaming_job::Entity::find().count(&inner.db).await?;
        Ok(count > 0)
    }

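    /// Resolves the object ids of streaming jobs (tables, sinks, and
    /// subscriptions) that are still in `Creating` status and match one of the
    /// given `(database_id, schema_id, name)` descriptors.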
    pub async fn find_creating_streaming_job_ids(
        &self,
        infos: Vec<PbCreatingJobInfo>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        type JobKey = (DatabaseId, SchemaId, String);

        // Gather (object id, name, database, schema) for every job still in
        // `Creating` status, one query per catalog type.
        let creating_tables: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Table::find()
            .select_only()
            .columns([table::Column::TableId, table::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_sinks: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, sink::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_subscriptions: Vec<(ObjectId, String, DatabaseId, SchemaId)> =
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::Name,
                ])
                .columns([object::Column::DatabaseId, object::Column::SchemaId])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
                .into_tuple()
                .all(&inner.db)
                .await?;

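        // Merge the candidates into one (database, schema, name) -> id map and
        // resolve the requested infos against it; descriptors with no match are
        // simply skipped.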
        let mut job_mapping: HashMap<JobKey, ObjectId> = creating_tables
            .into_iter()
            .chain(creating_sinks.into_iter())
            .chain(creating_subscriptions.into_iter())
            .map(|(id, name, database_id, schema_id)| ((database_id, schema_id, name), id))
            .collect();

        Ok(infos
            .into_iter()
            .flat_map(|info| job_mapping.remove(&(info.database_id, info.schema_id, info.name)))
            .collect())
    }
}