// risingwave_meta/controller/catalog/util.rs

use risingwave_common::catalog::FragmentTypeMask;

use super::*;
use crate::controller::fragment::FragmentTypeMaskExt;

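/// Updates `column` to `new_value` on the `object` rows of all internal (state) tables
/// belonging to the streaming job `object_id`, and pushes the refreshed table catalogs
/// into `objects_to_notify` so the changes can be included in a catalog notification.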
pub(crate) async fn update_internal_tables(
    txn: &DatabaseTransaction,
    object_id: i32,
    column: object::Column,
    new_value: Value,
    objects_to_notify: &mut Vec<PbObjectInfo>,
) -> MetaResult<()> {
    let internal_tables = get_internal_tables_by_id(object_id, txn).await?;

    if !internal_tables.is_empty() {
        Object::update_many()
            .col_expr(column, SimpleExpr::Value(new_value))
            .filter(object::Column::Oid.is_in(internal_tables.clone()))
            .exec(txn)
            .await?;

        let table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::TableId.is_in(internal_tables))
            .all(txn)
            .await?;
        for (table, table_obj) in table_objs {
            objects_to_notify.push(PbObjectInfo::Table(
                ObjectModel(table, table_obj.unwrap()).into(),
            ));
        }
    }
    Ok(())
}

impl CatalogController {
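    /// One-off catalog maintenance run on controller start; currently this only
    /// backfills missing `cdc_table_id` values for existing CDC tables.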
    pub(crate) async fn init(&self) -> MetaResult<()> {
        self.table_catalog_cdc_table_id_update().await?;
        Ok(())
    }

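    /// Backfills `cdc_table_id` for CDC tables where it is NULL or empty (e.g. tables
    /// created before the field was persisted). The external table name is re-extracted
    /// from the stored `CREATE TABLE` definition and combined with the upstream source
    /// id to rebuild the id.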
    pub(crate) async fn table_catalog_cdc_table_id_update(&self) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let table_and_source_id: Vec<(TableId, String, SourceId)> = Table::find()
            .join(JoinType::InnerJoin, table::Relation::ObjectDependency.def())
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Source.def(),
            )
            .select_only()
            .columns([table::Column::TableId, table::Column::Definition])
            .columns([source::Column::SourceId])
            .filter(
                table::Column::TableType.eq(TableType::Table).and(
                    table::Column::CdcTableId
                        .is_null()
                        .or(table::Column::CdcTableId.eq("")),
                ),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        if table_and_source_id.is_empty() {
            return Ok(());
        }

        info!(table_and_source_id = ?table_and_source_id, "cdc table with empty cdc_table_id");

        let mut cdc_table_ids = HashMap::new();
        for (table_id, definition, source_id) in table_and_source_id {
            match extract_external_table_name_from_definition(&definition) {
                None => {
                    tracing::warn!(
                        table_id = table_id,
                        definition = definition,
                        "failed to extract cdc table name from table definition.",
                    )
                }
                Some(external_table_name) => {
                    cdc_table_ids.insert(
                        table_id,
                        build_cdc_table_id(source_id as u32, &external_table_name),
                    );
                }
            }
        }

        for (table_id, cdc_table_id) in cdc_table_ids {
            table::ActiveModel {
                table_id: Set(table_id as _),
                cdc_table_id: Set(Some(cdc_table_id)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        txn.commit().await?;
        Ok(())
    }

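    /// Returns all object dependency pairs, each as `(object_id, referenced_object_id)`.
    /// Besides the plain `object_dependency` rows that belong to streaming jobs, this
    /// also includes view references, sink-into-table edges, and subscription-on-table
    /// edges. With `include_creating = false`, edges owned by jobs or subscriptions that
    /// are still being created are filtered out.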
    pub(crate) async fn list_object_dependencies(
        &self,
        include_creating: bool,
    ) -> MetaResult<Vec<PbObjectDependencies>> {
        let inner = self.inner.read().await;

        let dependencies: Vec<(ObjectId, ObjectId)> = {
            let filter = if include_creating {
                Expr::value(true)
            } else {
                streaming_job::Column::JobStatus.eq(JobStatus::Created)
            };
            ObjectDependency::find()
                .select_only()
                .columns([
                    object_dependency::Column::Oid,
                    object_dependency::Column::UsedBy,
                ])
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        let mut obj_dependencies = dependencies
            .into_iter()
            .map(|(oid, used_by)| PbObjectDependencies {
                object_id: used_by as _,
                referenced_object_id: oid as _,
            })
            .collect_vec();

        let view_dependencies: Vec<(ObjectId, ObjectId)> = ObjectDependency::find()
            .select_only()
            .columns([
                object_dependency::Column::Oid,
                object_dependency::Column::UsedBy,
            ])
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::View.def())
            .into_tuple()
            .all(&inner.db)
            .await?;

        obj_dependencies.extend(view_dependencies.into_iter().map(|(view_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id as _,
                referenced_object_id: view_id as _,
            }
        }));

        let sink_dependencies: Vec<(SinkId, TableId)> = {
            let filter = if include_creating {
                sink::Column::TargetTable.is_not_null()
            } else {
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .and(sink::Column::TargetTable.is_not_null())
            };
            Sink::find()
                .select_only()
                .columns([sink::Column::SinkId, sink::Column::TargetTable])
                .join(JoinType::InnerJoin, sink::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id as _,
                referenced_object_id: sink_id as _,
            }
        }));

        let subscription_dependencies: Vec<(SubscriptionId, TableId)> = {
            let filter = if include_creating {
                subscription::Column::DependentTableId.is_not_null()
            } else {
                subscription::Column::SubscriptionState
                    .eq(Into::<i32>::into(SubscriptionState::Created))
                    .and(subscription::Column::DependentTableId.is_not_null())
            };
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::DependentTableId,
                ])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(subscription_dependencies.into_iter().map(
            |(subscription_id, table_id)| PbObjectDependencies {
                object_id: subscription_id as _,
                referenced_object_id: table_id as _,
            },
        ));

        Ok(obj_dependencies)
    }

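    /// Emits a `DirtyStreamJobClear` event for each dirty streaming job (table, index,
    /// source, or sink) that is being cleaned up during recovery, so that the cleanup
    /// shows up in the event log.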
    pub(crate) async fn log_cleaned_dirty_jobs(
        &self,
        dirty_objs: &[PartialObject],
        txn: &DatabaseTransaction,
    ) -> MetaResult<()> {
        let mut dirty_table_ids = vec![];
        let mut dirty_source_ids = vec![];
        let mut dirty_sink_ids = vec![];
        for dirty_job_obj in dirty_objs {
            let job_id = dirty_job_obj.oid;
            let job_type = dirty_job_obj.obj_type;
            match job_type {
                ObjectType::Table | ObjectType::Index => dirty_table_ids.push(job_id),
                ObjectType::Source => dirty_source_ids.push(job_id),
                ObjectType::Sink => dirty_sink_ids.push(job_id),
                _ => unreachable!("unexpected streaming job type"),
            }
        }

        let mut event_logs = vec![];
        if !dirty_table_ids.is_empty() {
            let table_info: Vec<(TableId, String, String)> = Table::find()
                .select_only()
                .columns([
                    table::Column::TableId,
                    table::Column::Name,
                    table::Column::Definition,
                ])
                .filter(table::Column::TableId.is_in(dirty_table_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (table_id, name, definition) in table_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: table_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_source_ids.is_empty() {
            let source_info: Vec<(SourceId, String, String)> = Source::find()
                .select_only()
                .columns([
                    source::Column::SourceId,
                    source::Column::Name,
                    source::Column::Definition,
                ])
                .filter(source::Column::SourceId.is_in(dirty_source_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (source_id, name, definition) in source_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: source_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_sink_ids.is_empty() {
            let sink_info: Vec<(SinkId, String, String)> = Sink::find()
                .select_only()
                .columns([
                    sink::Column::SinkId,
                    sink::Column::Name,
                    sink::Column::Definition,
                ])
                .filter(sink::Column::SinkId.is_in(dirty_sink_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (sink_id, name, definition) in sink_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: sink_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        self.env.event_log_manager_ref().add_event_logs(event_logs);
        Ok(())
    }

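    /// After recovery, a table may still reference sink fragments that were cleaned up
    /// as dirty. For every table with incoming sinks (`CREATE SINK ... INTO table`),
    /// this prunes the inputs of the mview fragment's `Union` whose upstream `Merge`
    /// points at a fragment that no longer exists, and persists the repaired stream
    /// node. Returns the ids of all tables that were checked.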
    pub(crate) async fn clean_dirty_sink_downstreams(
        txn: &DatabaseTransaction,
    ) -> MetaResult<Vec<TableId>> {
        let all_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .into_tuple()
            .all(txn)
            .await?;

        let all_fragment_ids: HashSet<_> = all_fragment_ids.into_iter().collect();

        let all_sink_into_tables: Vec<Option<TableId>> = Sink::find()
            .select_only()
            .column(sink::Column::TargetTable)
            .filter(sink::Column::TargetTable.is_not_null())
            .into_tuple()
            .all(txn)
            .await?;

        let mut table_with_incoming_sinks: HashSet<TableId> = HashSet::new();
        for target_table_id in all_sink_into_tables {
            table_with_incoming_sinks.insert(target_table_id.expect("filter by non null"));
        }

        if table_with_incoming_sinks.is_empty() {
            return Ok(vec![]);
        }

        let mut updated_table_ids = vec![];
        for table_id in table_with_incoming_sinks {
            tracing::info!("cleaning dirty table sink downstream table {}", table_id);
            updated_table_ids.push(table_id);

            let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
                .select_only()
                .columns(vec![
                    fragment::Column::FragmentId,
                    fragment::Column::StreamNode,
                ])
                .filter(fragment::Column::JobId.eq(table_id).and(
                    FragmentTypeMask::intersects(FragmentTypeFlag::Mview),
                ))
                .into_tuple()
                .all(txn)
                .await?;

            for (fragment_id, stream_node) in fragments {
                let mut dirty_upstream_fragment_ids = HashSet::new();

                let mut pb_stream_node = stream_node.to_protobuf();

                // Sink-into-table wires each upstream sink into the table's `Union`
                // node, either directly through a `Merge` input or through a `Project`
                // on top of a `Merge`. Drop any input whose upstream fragment no longer
                // exists and remember which fragments were pruned.
                visit_stream_node_cont_mut(&mut pb_stream_node, |node| {
                    if let Some(NodeBody::Union(_)) = node.node_body {
                        node.input.retain_mut(|input| match &mut input.node_body {
                            Some(NodeBody::Project(_)) => {
                                let body = input.input.iter().exactly_one().unwrap();
                                let Some(NodeBody::Merge(merge_node)) = &body.node_body else {
                                    unreachable!("expect merge node");
                                };
                                if all_fragment_ids
                                    .contains(&(merge_node.upstream_fragment_id as i32))
                                {
                                    true
                                } else {
                                    dirty_upstream_fragment_ids
                                        .insert(merge_node.upstream_fragment_id);
                                    false
                                }
                            }
                            Some(NodeBody::Merge(merge_node)) => {
                                if all_fragment_ids
                                    .contains(&(merge_node.upstream_fragment_id as i32))
                                {
                                    true
                                } else {
                                    dirty_upstream_fragment_ids
                                        .insert(merge_node.upstream_fragment_id);
                                    false
                                }
                            }
                            _ => false,
                        });
                    }
                    true
                });

                tracing::info!(
                    "cleaning dirty table sink fragment {:?} from downstream fragment {}",
                    dirty_upstream_fragment_ids,
                    fragment_id
                );

                if !dirty_upstream_fragment_ids.is_empty() {
                    tracing::info!(
                        "fixing dirty stream node in downstream fragment {}",
                        fragment_id
                    );
                    Fragment::update_many()
                        .col_expr(
                            fragment::Column::StreamNode,
                            StreamNode::from(&pb_stream_node).into(),
                        )
                        .filter(fragment::Column::FragmentId.eq(fragment_id))
                        .exec(txn)
                        .await?;
                }
            }
        }

        Ok(updated_table_ids)
    }

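    /// Returns whether any streaming job is recorded in the catalog, regardless of its
    /// status.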
    pub async fn has_any_streaming_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = streaming_job::Entity::find().count(&inner.db).await?;
        Ok(count > 0)
    }

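    /// Resolves the object ids of streaming jobs (tables, sinks, or subscriptions) that
    /// are still in `Creating` status and match one of the given
    /// `(database_id, schema_id, name)` infos. Infos with no match are skipped.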
    pub async fn find_creating_streaming_job_ids(
        &self,
        infos: Vec<PbCreatingJobInfo>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        type JobKey = (DatabaseId, SchemaId, String);

        let creating_tables: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Table::find()
            .select_only()
            .columns([table::Column::TableId, table::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_sinks: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, sink::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_subscriptions: Vec<(ObjectId, String, DatabaseId, SchemaId)> =
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::Name,
                ])
                .columns([object::Column::DatabaseId, object::Column::SchemaId])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
                .into_tuple()
                .all(&inner.db)
                .await?;

        let mut job_mapping: HashMap<JobKey, ObjectId> = creating_tables
            .into_iter()
            .chain(creating_sinks.into_iter())
            .chain(creating_subscriptions.into_iter())
            .map(|(id, name, database_id, schema_id)| ((database_id, schema_id, name), id))
            .collect();

        Ok(infos
            .into_iter()
            .flat_map(|info| {
                job_mapping.remove(&(
                    info.database_id as _,
                    info.schema_id as _,
                    info.name.clone(),
                ))
            })
            .collect())
    }
}