1use super::*;
16
17pub(crate) async fn update_internal_tables(
18 txn: &DatabaseTransaction,
19 object_id: i32,
20 column: object::Column,
21 new_value: Value,
22 objects_to_notify: &mut Vec<PbObjectInfo>,
23) -> MetaResult<()> {
24 let internal_tables = get_internal_tables_by_id(object_id, txn).await?;
25
26 if !internal_tables.is_empty() {
27 Object::update_many()
28 .col_expr(column, SimpleExpr::Value(new_value))
29 .filter(object::Column::Oid.is_in(internal_tables.clone()))
30 .exec(txn)
31 .await?;
32
33 let table_objs = Table::find()
34 .find_also_related(Object)
35 .filter(table::Column::TableId.is_in(internal_tables))
36 .all(txn)
37 .await?;
38 for (table, table_obj) in table_objs {
39 objects_to_notify.push(PbObjectInfo::Table(
40 ObjectModel(table, table_obj.unwrap()).into(),
41 ));
42 }
43 }
44 Ok(())
45}
46
impl CatalogController {
    /// Startup hook for the catalog controller.
    ///
    /// Currently its only task is backfilling `cdc_table_id` for legacy CDC
    /// tables; add further one-shot catalog fix-ups here as needed.
    pub(crate) async fn init(&self) -> MetaResult<()> {
        self.table_catalog_cdc_table_id_update().await?;
        Ok(())
    }

    /// Backfills `table.cdc_table_id` for tables whose value is NULL or empty,
    /// deriving it from the external table name parsed out of the table's SQL
    /// definition plus the id of the source the table depends on. All writes
    /// happen in a single transaction.
    pub(crate) async fn table_catalog_cdc_table_id_update(&self) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Tables (of type `Table`) with a missing/empty cdc_table_id, joined
        // through the object-dependency edge to the source they depend on.
        let table_and_source_id: Vec<(TableId, String, SourceId)> = Table::find()
            .join(JoinType::InnerJoin, table::Relation::ObjectDependency.def())
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Source.def(),
            )
            .select_only()
            .columns([table::Column::TableId, table::Column::Definition])
            .columns([source::Column::SourceId])
            .filter(
                table::Column::TableType.eq(TableType::Table).and(
                    table::Column::CdcTableId
                        .is_null()
                        .or(table::Column::CdcTableId.eq("")),
                ),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        // Nothing to backfill: return early; the transaction is simply
        // dropped with no writes performed.
        if table_and_source_id.is_empty() {
            return Ok(());
        }

        info!(table_and_source_id = ?table_and_source_id, "cdc table with empty cdc_table_id");

        let mut cdc_table_ids = HashMap::new();
        for (table_id, definition, source_id) in table_and_source_id {
            match extract_external_table_name_from_definition(&definition) {
                None => {
                    // Unparseable definition: warn and leave this table's
                    // cdc_table_id as-is rather than failing the whole pass.
                    tracing::warn!(
                        table_id = table_id,
                        definition = definition,
                        "failed to extract cdc table name from table definition.",
                    )
                }
                Some(external_table_name) => {
                    cdc_table_ids.insert(
                        table_id,
                        build_cdc_table_id(source_id as u32, &external_table_name),
                    );
                }
            }
        }

        // Persist the derived ids row by row inside the transaction.
        for (table_id, cdc_table_id) in cdc_table_ids {
            table::ActiveModel {
                table_id: Set(table_id as _),
                cdc_table_id: Set(Some(cdc_table_id)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        txn.commit().await?;
        Ok(())
    }

    /// Collects dependency edges between catalog objects as
    /// `PbObjectDependencies`, merging four queries:
    /// 1. generic object-dependency rows whose `oid` side is a streaming job,
    /// 2. object-dependency rows whose `oid` side is a view,
    /// 3. sink-into-table edges (`sink.target_table`),
    /// 4. subscription-on-table edges (`subscription.dependent_table_id`).
    ///
    /// When `include_creating` is false, streaming-job-backed edges are
    /// restricted to jobs/subscriptions in `Created` state.
    pub(crate) async fn list_object_dependencies(
        &self,
        include_creating: bool,
    ) -> MetaResult<Vec<PbObjectDependencies>> {
        let inner = self.inner.read().await;

        let dependencies: Vec<(ObjectId, ObjectId)> = {
            // `Expr::value(true)` is a no-op filter that keeps creating jobs.
            let filter = if include_creating {
                Expr::value(true)
            } else {
                streaming_job::Column::JobStatus.eq(JobStatus::Created)
            };
            ObjectDependency::find()
                .select_only()
                .columns([
                    object_dependency::Column::Oid,
                    object_dependency::Column::UsedBy,
                ])
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        // Edge orientation: `used_by` is the dependent object, `oid` the
        // referenced one.
        let mut obj_dependencies = dependencies
            .into_iter()
            .map(|(oid, used_by)| PbObjectDependencies {
                object_id: used_by as _,
                referenced_object_id: oid as _,
            })
            .collect_vec();

        // Same shape as above but for rows whose `oid` side is a view; views
        // are not streaming jobs, so they are fetched unconditionally.
        let view_dependencies: Vec<(ObjectId, ObjectId)> = ObjectDependency::find()
            .select_only()
            .columns([
                object_dependency::Column::Oid,
                object_dependency::Column::UsedBy,
            ])
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::View.def())
            .into_tuple()
            .all(&inner.db)
            .await?;

        obj_dependencies.extend(view_dependencies.into_iter().map(|(view_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id as _,
                referenced_object_id: view_id as _,
            }
        }));

        // Sink-into-table: the target table depends on the sink feeding it.
        let sink_dependencies: Vec<(SinkId, TableId)> = {
            let filter = if include_creating {
                sink::Column::TargetTable.is_not_null()
            } else {
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .and(sink::Column::TargetTable.is_not_null())
            };
            Sink::find()
                .select_only()
                .columns([sink::Column::SinkId, sink::Column::TargetTable])
                .join(JoinType::InnerJoin, sink::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id as _,
                referenced_object_id: sink_id as _,
            }
        }));

        // Subscription-on-table: the subscription depends on its table. Note
        // the orientation is opposite to the sink case above (intentional).
        let subscription_dependencies: Vec<(SubscriptionId, TableId)> = {
            let filter = if include_creating {
                subscription::Column::DependentTableId.is_not_null()
            } else {
                subscription::Column::SubscriptionState
                    .eq(Into::<i32>::into(SubscriptionState::Created))
                    .and(subscription::Column::DependentTableId.is_not_null())
            };
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::DependentTableId,
                ])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(subscription_dependencies.into_iter().map(
            |(subscription_id, table_id)| PbObjectDependencies {
                object_id: subscription_id as _,
                referenced_object_id: table_id as _,
            },
        ));

        Ok(obj_dependencies)
    }

    /// Emits one `DirtyStreamJobClear` event log per dirty streaming job in
    /// `dirty_objs`, looking up each job's name and definition inside `txn`.
    ///
    /// Panics (`unreachable!`) if an object type other than
    /// table/index/source/sink is passed — callers must pre-filter to
    /// streaming jobs.
    pub(crate) async fn log_cleaned_dirty_jobs(
        &self,
        dirty_objs: &[PartialObject],
        txn: &DatabaseTransaction,
    ) -> MetaResult<()> {
        // Bucket job ids by catalog kind so each kind is fetched in one query.
        let mut dirty_table_ids = vec![];
        let mut dirty_source_ids = vec![];
        let mut dirty_sink_ids = vec![];
        for dirty_job_obj in dirty_objs {
            let job_id = dirty_job_obj.oid;
            let job_type = dirty_job_obj.obj_type;
            match job_type {
                // Indexes are backed by table catalog rows as well.
                ObjectType::Table | ObjectType::Index => dirty_table_ids.push(job_id),
                ObjectType::Source => dirty_source_ids.push(job_id),
                ObjectType::Sink => dirty_sink_ids.push(job_id),
                _ => unreachable!("unexpected streaming job type"),
            }
        }

        let mut event_logs = vec![];
        if !dirty_table_ids.is_empty() {
            let table_info: Vec<(TableId, String, String)> = Table::find()
                .select_only()
                .columns([
                    table::Column::TableId,
                    table::Column::Name,
                    table::Column::Definition,
                ])
                .filter(table::Column::TableId.is_in(dirty_table_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (table_id, name, definition) in table_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: table_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_source_ids.is_empty() {
            let source_info: Vec<(SourceId, String, String)> = Source::find()
                .select_only()
                .columns([
                    source::Column::SourceId,
                    source::Column::Name,
                    source::Column::Definition,
                ])
                .filter(source::Column::SourceId.is_in(dirty_source_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (source_id, name, definition) in source_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: source_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_sink_ids.is_empty() {
            let sink_info: Vec<(SinkId, String, String)> = Sink::find()
                .select_only()
                .columns([
                    sink::Column::SinkId,
                    sink::Column::Name,
                    sink::Column::Definition,
                ])
                .filter(sink::Column::SinkId.is_in(dirty_sink_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (sink_id, name, definition) in sink_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: sink_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        self.env.event_log_manager_ref().add_event_logs(event_logs);
        Ok(())
    }

    /// Repairs tables whose `incoming_sinks` list references sinks that no
    /// longer exist (dirty leftovers): drops the dangling sink ids from the
    /// table rows and prunes the corresponding Merge inputs from the tables'
    /// materialize fragments' Union nodes.
    ///
    /// Returns the ids of the tables that were updated.
    pub(crate) async fn clean_dirty_sink_downstreams(
        txn: &DatabaseTransaction,
    ) -> MetaResult<Vec<TableId>> {
        // Snapshot of every live fragment id, used below to decide whether a
        // Merge node's upstream fragment still exists.
        let all_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .into_tuple()
            .all(txn)
            .await?;

        let all_fragment_ids: HashSet<_> = all_fragment_ids.into_iter().collect();

        // Ids of all sinks that sink into a table (target_table set).
        let table_sink_ids: Vec<ObjectId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.is_not_null())
            .into_tuple()
            .all(txn)
            .await?;

        let all_table_with_incoming_sinks: Vec<(ObjectId, I32Array)> = Table::find()
            .select_only()
            .columns(vec![table::Column::TableId, table::Column::IncomingSinks])
            .into_tuple()
            .all(txn)
            .await?;

        // Tables whose incoming_sinks contain at least one id that is not a
        // live table-sink anymore.
        // NOTE(review): `contains` on a Vec is a linear scan per element;
        // presumably fine at catalog scale.
        let table_incoming_sinks_to_update = all_table_with_incoming_sinks
            .into_iter()
            .filter(|(_, incoming_sinks)| {
                let inner_ref = incoming_sinks.inner_ref();
                !inner_ref.is_empty()
                    && inner_ref
                        .iter()
                        .any(|sink_id| !table_sink_ids.contains(sink_id))
            })
            .collect_vec();

        // `extract_if` pulls out the ids that ARE still valid table-sinks;
        // those become the table's new incoming_sinks (dangling ids dropped).
        let new_table_incoming_sinks = table_incoming_sinks_to_update
            .into_iter()
            .map(|(table_id, incoming_sinks)| {
                let new_incoming_sinks = incoming_sinks
                    .into_inner()
                    .extract_if(.., |id| table_sink_ids.contains(id))
                    .collect_vec();
                (table_id, I32Array::from(new_incoming_sinks))
            })
            .collect_vec();

        if new_table_incoming_sinks.is_empty() {
            return Ok(vec![]);
        }

        let mut updated_table_ids = vec![];
        for (table_id, new_incoming_sinks) in new_table_incoming_sinks {
            tracing::info!("cleaning dirty table sink downstream table {}", table_id);
            Table::update(table::ActiveModel {
                table_id: Set(table_id as _),
                incoming_sinks: Set(new_incoming_sinks),
                ..Default::default()
            })
            .exec(txn)
            .await?;
            updated_table_ids.push(table_id);

            // Fix the table's stream graph: its materialize fragment merges
            // the (now possibly deleted) sink fragments through a Union node.
            let fragments: Vec<(FragmentId, StreamNode, i32)> = Fragment::find()
                .select_only()
                .columns(vec![
                    fragment::Column::FragmentId,
                    fragment::Column::StreamNode,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(table_id))
                .into_tuple()
                .all(txn)
                .await?;

            for (fragment_id, stream_node, fragment_mask) in fragments {
                {
                    // Only the materialize (Mview) fragment holds the Union.
                    if fragment_mask & FragmentTypeFlag::Mview as i32 == 0 {
                        continue;
                    }

                    let mut dirty_upstream_fragment_ids = HashSet::new();

                    let mut pb_stream_node = stream_node.to_protobuf();

                    // Under each Union node, keep only inputs whose Merge
                    // upstream fragment still exists; record the pruned ones.
                    visit_stream_node_cont_mut(&mut pb_stream_node, |node| {
                        if let Some(NodeBody::Union(_)) = node.node_body {
                            node.input.retain_mut(|input| match &mut input.node_body {
                                // Project wrapping a single Merge input.
                                Some(NodeBody::Project(_)) => {
                                    let body = input.input.iter().exactly_one().unwrap();
                                    let Some(NodeBody::Merge(merge_node)) = &body.node_body else {
                                        unreachable!("expect merge node");
                                    };
                                    if all_fragment_ids
                                        .contains(&(merge_node.upstream_fragment_id as i32))
                                    {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                }
                                // Bare Merge input.
                                Some(NodeBody::Merge(merge_node)) => {
                                    if all_fragment_ids
                                        .contains(&(merge_node.upstream_fragment_id as i32))
                                    {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                }
                                // Any other input shape under Union is
                                // unexpected here and is dropped.
                                _ => false,
                            });
                        }
                        // Continue traversal into children.
                        true
                    });

                    tracing::info!(
                        "cleaning dirty table sink fragment {:?} from downstream fragment {}",
                        dirty_upstream_fragment_ids,
                        fragment_id
                    );

                    // Persist the pruned stream node only if something changed.
                    if !dirty_upstream_fragment_ids.is_empty() {
                        tracing::info!(
                            "fixing dirty stream node in downstream fragment {}",
                            fragment_id
                        );
                        Fragment::update_many()
                            .col_expr(
                                fragment::Column::StreamNode,
                                StreamNode::from(&pb_stream_node).into(),
                            )
                            .filter(fragment::Column::FragmentId.eq(fragment_id))
                            .exec(txn)
                            .await?;
                    }
                }
            }
        }

        Ok(updated_table_ids)
    }

    /// Returns `true` iff at least one streaming-job row exists.
    pub async fn has_any_streaming_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = streaming_job::Entity::find().count(&inner.db).await?;
        Ok(count > 0)
    }

    /// Resolves each `(database, schema, name)` in `infos` to the object id of
    /// a streaming job currently in `Creating` state, searching tables, sinks
    /// and subscriptions. Infos with no match are silently skipped; each
    /// catalog entry is matched at most once (`remove` consumes it).
    pub async fn find_creating_streaming_job_ids(
        &self,
        infos: Vec<PbCreatingJobInfo>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        type JobKey = (DatabaseId, SchemaId, String);

        let creating_tables: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Table::find()
            .select_only()
            .columns([table::Column::TableId, table::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_sinks: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, sink::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_subscriptions: Vec<(ObjectId, String, DatabaseId, SchemaId)> =
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::Name,
                ])
                .columns([object::Column::DatabaseId, object::Column::SchemaId])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
                .into_tuple()
                .all(&inner.db)
                .await?;

        // Index all creating jobs by (database, schema, name). If the same
        // key appears in more than one catalog, the last chained entry wins.
        let mut job_mapping: HashMap<JobKey, ObjectId> = creating_tables
            .into_iter()
            .chain(creating_sinks.into_iter())
            .chain(creating_subscriptions.into_iter())
            .map(|(id, name, database_id, schema_id)| ((database_id, schema_id, name), id))
            .collect();

        // `flat_map` over Option drops infos with no matching creating job.
        Ok(infos
            .into_iter()
            .flat_map(|info| {
                job_mapping.remove(&(
                    info.database_id as _,
                    info.schema_id as _,
                    info.name.clone(),
                ))
            })
            .collect())
    }
}