1use super::*;
16
17pub(crate) async fn update_internal_tables(
18 txn: &DatabaseTransaction,
19 object_id: i32,
20 column: object::Column,
21 new_value: Value,
22 objects_to_notify: &mut Vec<PbObjectInfo>,
23) -> MetaResult<()> {
24 let internal_tables = get_internal_tables_by_id(object_id, txn).await?;
25
26 if !internal_tables.is_empty() {
27 Object::update_many()
28 .col_expr(column, SimpleExpr::Value(new_value))
29 .filter(object::Column::Oid.is_in(internal_tables.clone()))
30 .exec(txn)
31 .await?;
32
33 let table_objs = Table::find()
34 .find_also_related(Object)
35 .filter(table::Column::TableId.is_in(internal_tables))
36 .all(txn)
37 .await?;
38 for (table, table_obj) in table_objs {
39 objects_to_notify.push(PbObjectInfo::Table(
40 ObjectModel(table, table_obj.unwrap()).into(),
41 ));
42 }
43 }
44 Ok(())
45}
46
impl CatalogController {
    /// One-shot startup hook: runs catalog data backfills that must complete
    /// before the controller serves requests.
    pub(crate) async fn init(&self) -> MetaResult<()> {
        self.table_catalog_cdc_table_id_update().await?;
        Ok(())
    }

    /// Backfill `cdc_table_id` for CDC tables whose column is unset (NULL or
    /// empty string), deriving it from the table's upstream source id and the
    /// external table name extracted from the table's SQL definition.
    ///
    /// All writes happen in a single transaction; the early return on the
    /// empty case simply drops the (read-only so far) transaction.
    pub(crate) async fn table_catalog_cdc_table_id_update(&self) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Tables of type `Table` with a missing/empty cdc_table_id, joined to
        // the source they depend on via the object-dependency edge.
        let table_and_source_id: Vec<(TableId, String, SourceId)> = Table::find()
            .join(JoinType::InnerJoin, table::Relation::ObjectDependency.def())
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Source.def(),
            )
            .select_only()
            .columns([table::Column::TableId, table::Column::Definition])
            .columns([source::Column::SourceId])
            .filter(
                table::Column::TableType.eq(TableType::Table).and(
                    table::Column::CdcTableId
                        .is_null()
                        .or(table::Column::CdcTableId.eq("")),
                ),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        if table_and_source_id.is_empty() {
            return Ok(());
        }

        info!(table_and_source_id = ?table_and_source_id, "cdc table with empty cdc_table_id");

        // table_id -> derived cdc_table_id; tables whose external name cannot
        // be extracted are logged and skipped (left unset).
        let mut cdc_table_ids = HashMap::new();
        for (table_id, definition, source_id) in table_and_source_id {
            match extract_external_table_name_from_definition(&definition) {
                None => {
                    tracing::warn!(
                        table_id = table_id,
                        definition = definition,
                        "failed to extract cdc table name from table definition.",
                    )
                }
                Some(external_table_name) => {
                    cdc_table_ids.insert(
                        table_id,
                        build_cdc_table_id(source_id as u32, &external_table_name),
                    );
                }
            }
        }

        // Persist the derived ids, one update per table.
        for (table_id, cdc_table_id) in cdc_table_ids {
            table::ActiveModel {
                table_id: Set(table_id as _),
                cdc_table_id: Set(Some(cdc_table_id)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        txn.commit().await?;
        Ok(())
    }

    /// List object-dependency edges as `PbObjectDependencies`, where
    /// `object_id` is the dependent object and `referenced_object_id` is the
    /// object it depends on.
    ///
    /// Edges are gathered from four places: generic streaming-job
    /// dependencies, view dependencies, sink-into-table edges, and
    /// subscription-on-table edges. When `include_creating` is false,
    /// streaming-job edges are restricted to jobs/subscriptions in the
    /// `Created` state.
    pub(crate) async fn list_object_dependencies(
        &self,
        include_creating: bool,
    ) -> MetaResult<Vec<PbObjectDependencies>> {
        let inner = self.inner.read().await;

        // (oid, used_by) pairs for dependencies whose dependent object is a
        // streaming job.
        let dependencies: Vec<(ObjectId, ObjectId)> = {
            let filter = if include_creating {
                // `TRUE` — no status restriction.
                Expr::value(true)
            } else {
                streaming_job::Column::JobStatus.eq(JobStatus::Created)
            };
            ObjectDependency::find()
                .select_only()
                .columns([
                    object_dependency::Column::Oid,
                    object_dependency::Column::UsedBy,
                ])
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        // `used_by` depends on `oid`.
        let mut obj_dependencies = dependencies
            .into_iter()
            .map(|(oid, used_by)| PbObjectDependencies {
                object_id: used_by as _,
                referenced_object_id: oid as _,
            })
            .collect_vec();

        // Same edge table, but for dependent objects that are views (views
        // have no streaming-job status, so no `include_creating` filter).
        let view_dependencies: Vec<(ObjectId, ObjectId)> = ObjectDependency::find()
            .select_only()
            .columns([
                object_dependency::Column::Oid,
                object_dependency::Column::UsedBy,
            ])
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::View.def())
            .into_tuple()
            .all(&inner.db)
            .await?;

        obj_dependencies.extend(view_dependencies.into_iter().map(|(view_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id as _,
                referenced_object_id: view_id as _,
            }
        }));

        // Sink-into-table edges: the target table depends on the sink.
        let sink_dependencies: Vec<(SinkId, TableId)> = {
            let filter = if include_creating {
                sink::Column::TargetTable.is_not_null()
            } else {
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .and(sink::Column::TargetTable.is_not_null())
            };
            Sink::find()
                .select_only()
                .columns([sink::Column::SinkId, sink::Column::TargetTable])
                .join(JoinType::InnerJoin, sink::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id as _,
                referenced_object_id: sink_id as _,
            }
        }));

        // Subscription edges: the subscription depends on its table — note the
        // orientation is the reverse of the sink case above.
        let subscription_dependencies: Vec<(SubscriptionId, TableId)> = {
            let filter = if include_creating {
                subscription::Column::DependentTableId.is_not_null()
            } else {
                subscription::Column::SubscriptionState
                    .eq(Into::<i32>::into(SubscriptionState::Created))
                    .and(subscription::Column::DependentTableId.is_not_null())
            };
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::DependentTableId,
                ])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(subscription_dependencies.into_iter().map(
            |(subscription_id, table_id)| PbObjectDependencies {
                object_id: subscription_id as _,
                referenced_object_id: table_id as _,
            },
        ));

        Ok(obj_dependencies)
    }

    /// Emit a `DirtyStreamJobClear` event-log entry for every dirty streaming
    /// job (table/index, source, or sink) that is being cleaned up, looking up
    /// each job's name and definition within `txn`.
    pub(crate) async fn log_cleaned_dirty_jobs(
        &self,
        dirty_objs: &[PartialObject],
        txn: &DatabaseTransaction,
    ) -> MetaResult<()> {
        // Bucket dirty job ids by object type so each catalog table is
        // queried at most once.
        let mut dirty_table_ids = vec![];
        let mut dirty_source_ids = vec![];
        let mut dirty_sink_ids = vec![];
        for dirty_job_obj in dirty_objs {
            let job_id = dirty_job_obj.oid;
            let job_type = dirty_job_obj.obj_type;
            match job_type {
                ObjectType::Table | ObjectType::Index => dirty_table_ids.push(job_id),
                ObjectType::Source => dirty_source_ids.push(job_id),
                ObjectType::Sink => dirty_sink_ids.push(job_id),
                // Callers only pass streaming-job objects; anything else is a bug.
                _ => unreachable!("unexpected streaming job type"),
            }
        }

        let mut event_logs = vec![];
        if !dirty_table_ids.is_empty() {
            let table_info: Vec<(TableId, String, String)> = Table::find()
                .select_only()
                .columns([
                    table::Column::TableId,
                    table::Column::Name,
                    table::Column::Definition,
                ])
                .filter(table::Column::TableId.is_in(dirty_table_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (table_id, name, definition) in table_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: table_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_source_ids.is_empty() {
            let source_info: Vec<(SourceId, String, String)> = Source::find()
                .select_only()
                .columns([
                    source::Column::SourceId,
                    source::Column::Name,
                    source::Column::Definition,
                ])
                .filter(source::Column::SourceId.is_in(dirty_source_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (source_id, name, definition) in source_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: source_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_sink_ids.is_empty() {
            let sink_info: Vec<(SinkId, String, String)> = Sink::find()
                .select_only()
                .columns([
                    sink::Column::SinkId,
                    sink::Column::Name,
                    sink::Column::Definition,
                ])
                .filter(sink::Column::SinkId.is_in(dirty_sink_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (sink_id, name, definition) in sink_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: sink_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        self.env.event_log_manager_ref().add_event_logs(event_logs);
        Ok(())
    }

    /// Clean up downstream state left behind by dirty sink-into-table jobs:
    /// remove dangling sink ids from each target table's `incoming_sinks`, and
    /// prune Merge inputs under Union nodes that reference fragments which no
    /// longer exist.
    ///
    /// Returns `Ok(true)` if any table was modified, `Ok(false)` if there was
    /// nothing to clean.
    pub(crate) async fn clean_dirty_sink_downstreams(
        txn: &DatabaseTransaction,
    ) -> MetaResult<bool> {
        // All fragment ids currently present, used to detect dangling
        // upstream references.
        let all_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .columns(vec![fragment::Column::FragmentId])
            .into_tuple()
            .all(txn)
            .await?;

        let all_fragment_ids: HashSet<_> = all_fragment_ids.into_iter().collect();

        // Sinks that still exist and target a table — the "valid" set.
        let table_sink_ids: Vec<ObjectId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.is_not_null())
            .into_tuple()
            .all(txn)
            .await?;

        let all_table_with_incoming_sinks: Vec<(ObjectId, I32Array)> = Table::find()
            .select_only()
            .columns(vec![table::Column::TableId, table::Column::IncomingSinks])
            .into_tuple()
            .all(txn)
            .await?;

        // Tables that reference at least one sink no longer in the valid set.
        let table_incoming_sinks_to_update = all_table_with_incoming_sinks
            .into_iter()
            .filter(|(_, incoming_sinks)| {
                let inner_ref = incoming_sinks.inner_ref();
                !inner_ref.is_empty()
                    && inner_ref
                        .iter()
                        .any(|sink_id| !table_sink_ids.contains(sink_id))
            })
            .collect_vec();

        // For each affected table, keep only the sink ids that still exist
        // (`extract_if` yields the elements matching the predicate).
        let new_table_incoming_sinks = table_incoming_sinks_to_update
            .into_iter()
            .map(|(table_id, incoming_sinks)| {
                let new_incoming_sinks = incoming_sinks
                    .into_inner()
                    .extract_if(.., |id| table_sink_ids.contains(id))
                    .collect_vec();
                (table_id, I32Array::from(new_incoming_sinks))
            })
            .collect_vec();

        if new_table_incoming_sinks.is_empty() {
            return Ok(false);
        }

        for (table_id, new_incoming_sinks) in new_table_incoming_sinks {
            tracing::info!("cleaning dirty table sink downstream table {}", table_id);
            Table::update_many()
                .col_expr(table::Column::IncomingSinks, new_incoming_sinks.into())
                .filter(table::Column::TableId.eq(table_id))
                .exec(txn)
                .await?;

            let fragments: Vec<(FragmentId, StreamNode, i32)> = Fragment::find()
                .select_only()
                .columns(vec![
                    fragment::Column::FragmentId,
                    fragment::Column::StreamNode,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(table_id))
                .into_tuple()
                .all(txn)
                .await?;

            for (fragment_id, stream_node, fragment_mask) in fragments {
                {
                    // A sink's target table job is expected to be an mview fragment.
                    assert!(fragment_mask & FragmentTypeFlag::Mview as i32 > 0);

                    let mut dirty_upstream_fragment_ids = HashSet::new();

                    let mut pb_stream_node = stream_node.to_protobuf();

                    // Under each Union node, drop Merge inputs whose upstream
                    // fragment no longer exists; record the dropped ids for
                    // logging. Non-Merge inputs under a Union are dropped too.
                    visit_stream_node_cont_mut(&mut pb_stream_node, |node| {
                        if let Some(NodeBody::Union(_)) = node.node_body {
                            node.input.retain_mut(|input| {
                                if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body {
                                    if all_fragment_ids
                                        .contains(&(merge_node.upstream_fragment_id as i32))
                                    {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                } else {
                                    false
                                }
                            });
                        }
                        // Keep traversing the rest of the plan.
                        true
                    });

                    tracing::info!(
                        "cleaning dirty table sink fragment {:?} from downstream fragment {}",
                        dirty_upstream_fragment_ids,
                        fragment_id
                    );

                    // Persist the pruned stream node back to the fragment.
                    Fragment::update_many()
                        .col_expr(
                            fragment::Column::StreamNode,
                            StreamNode::from(&pb_stream_node).into(),
                        )
                        .filter(fragment::Column::FragmentId.eq(fragment_id))
                        .exec(txn)
                        .await?;
                }
            }
        }

        Ok(true)
    }

    /// Returns whether at least one streaming job exists in the catalog.
    pub async fn has_any_streaming_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = streaming_job::Entity::find().count(&inner.db).await?;
        Ok(count > 0)
    }

    /// Resolve the object ids of streaming jobs that are still in the
    /// `Creating` state and match one of the given (database, schema, name)
    /// infos. Tables, sinks, and subscriptions are considered; infos with no
    /// match are silently dropped from the result.
    pub async fn find_creating_streaming_job_ids(
        &self,
        infos: Vec<PbCreatingJobInfo>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        // A job is keyed by its fully-qualified name within a schema.
        type JobKey = (DatabaseId, SchemaId, String);

        let creating_tables: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Table::find()
            .select_only()
            .columns([table::Column::TableId, table::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_sinks: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, sink::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_subscriptions: Vec<(ObjectId, String, DatabaseId, SchemaId)> =
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::Name,
                ])
                .columns([object::Column::DatabaseId, object::Column::SchemaId])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
                .into_tuple()
                .all(&inner.db)
                .await?;

        // Merge all creating jobs into one key -> id map.
        let mut job_mapping: HashMap<JobKey, ObjectId> = creating_tables
            .into_iter()
            .chain(creating_sinks.into_iter())
            .chain(creating_subscriptions.into_iter())
            .map(|(id, name, database_id, schema_id)| ((database_id, schema_id, name), id))
            .collect();

        // `remove` consumes each matched entry, so duplicate infos resolve at
        // most once; unmatched infos yield nothing (`flat_map` over `Option`).
        Ok(infos
            .into_iter()
            .flat_map(|info| {
                job_mapping.remove(&(
                    info.database_id as _,
                    info.schema_id as _,
                    info.name.clone(),
                ))
            })
            .collect())
    }
}