risingwave_meta/controller/catalog/util.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;

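/// Update `column` of the `Object` rows of all internal (state) tables belonging to the
/// streaming job `object_id` to `new_value`, and push the refreshed table catalogs into
/// `objects_to_notify` so the caller can broadcast the change.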
pub(crate) async fn update_internal_tables(
    txn: &DatabaseTransaction,
    object_id: i32,
    column: object::Column,
    new_value: Value,
    objects_to_notify: &mut Vec<PbObjectInfo>,
) -> MetaResult<()> {
    let internal_tables = get_internal_tables_by_id(object_id, txn).await?;

    if !internal_tables.is_empty() {
        Object::update_many()
            .col_expr(column, SimpleExpr::Value(new_value))
            .filter(object::Column::Oid.is_in(internal_tables.clone()))
            .exec(txn)
            .await?;

        let table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::TableId.is_in(internal_tables))
            .all(txn)
            .await?;
        for (table, table_obj) in table_objs {
            objects_to_notify.push(PbObjectInfo::Table(
                ObjectModel(table, table_obj.unwrap()).into(),
            ));
        }
    }
    Ok(())
}

impl CatalogController {
    pub(crate) async fn init(&self) -> MetaResult<()> {
        self.table_catalog_cdc_table_id_update().await?;
        Ok(())
    }

    /// Fill in the `cdc_table_id` field for tables that have an empty `cdc_table_id` and a
    /// parent `Source` job.
    /// NOTE: we assume a table with a parent `Source` job is a CDC table.
    pub(crate) async fn table_catalog_cdc_table_id_update(&self) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Select tables whose `cdc_table_id` is empty and that have a parent `Source` job.
        let table_and_source_id: Vec<(TableId, String, SourceId)> = Table::find()
            .join(JoinType::InnerJoin, table::Relation::ObjectDependency.def())
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Source.def(),
            )
            .select_only()
            .columns([table::Column::TableId, table::Column::Definition])
            .columns([source::Column::SourceId])
            .filter(
                table::Column::TableType.eq(TableType::Table).and(
                    table::Column::CdcTableId
                        .is_null()
                        .or(table::Column::CdcTableId.eq("")),
                ),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        // Return early if the result set is empty.
        if table_and_source_id.is_empty() {
            return Ok(());
        }

        info!(table_and_source_id = ?table_and_source_id, "cdc table with empty cdc_table_id");

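        // Rebuild each table's `cdc_table_id` from the external table name parsed out of
        // its definition, skipping (with a warning) tables whose definition cannot be parsed.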
        let mut cdc_table_ids = HashMap::new();
        for (table_id, definition, source_id) in table_and_source_id {
            match extract_external_table_name_from_definition(&definition) {
                None => {
                    tracing::warn!(
                        table_id = table_id,
                        definition = definition,
                        "failed to extract cdc table name from table definition.",
                    )
                }
                Some(external_table_name) => {
                    cdc_table_ids.insert(
                        table_id,
                        build_cdc_table_id(source_id as u32, &external_table_name),
                    );
                }
            }
        }

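        // Persist the backfilled values within the same transaction.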
        for (table_id, cdc_table_id) in cdc_table_ids {
            table::ActiveModel {
                table_id: Set(table_id as _),
                cdc_table_id: Set(Some(cdc_table_id)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        txn.commit().await?;
        Ok(())
    }

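    /// Collect all dependency edges between catalog objects: generic streaming-job
    /// dependencies, view references, sink-into-table targets, and subscription targets.
    /// Each entry maps a dependent `object_id` to its `referenced_object_id`. If
    /// `include_creating` is false, only finished (`Created`) jobs and subscriptions
    /// are considered.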
    pub(crate) async fn list_object_dependencies(
        &self,
        include_creating: bool,
    ) -> MetaResult<Vec<PbObjectDependencies>> {
        let inner = self.inner.read().await;

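        // Generic dependency edges from `object_dependency`, joined against streaming
        // jobs so that creating jobs can be filtered out when requested.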
        let dependencies: Vec<(ObjectId, ObjectId)> = {
            let filter = if include_creating {
                Expr::value(true)
            } else {
                streaming_job::Column::JobStatus.eq(JobStatus::Created)
            };
            ObjectDependency::find()
                .select_only()
                .columns([
                    object_dependency::Column::Oid,
                    object_dependency::Column::UsedBy,
                ])
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        let mut obj_dependencies = dependencies
            .into_iter()
            .map(|(oid, used_by)| PbObjectDependencies {
                object_id: used_by as _,
                referenced_object_id: oid as _,
            })
            .collect_vec();

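        // Dependencies on views (the `oid` side is the view). Views are not streaming
        // jobs, so they are fetched separately, without the job-status filter.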
        let view_dependencies: Vec<(ObjectId, ObjectId)> = ObjectDependency::find()
            .select_only()
            .columns([
                object_dependency::Column::Oid,
                object_dependency::Column::UsedBy,
            ])
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::View.def())
            .into_tuple()
            .all(&inner.db)
            .await?;

        obj_dependencies.extend(view_dependencies.into_iter().map(|(view_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id as _,
                referenced_object_id: view_id as _,
            }
        }));

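        // Sink-into-table dependencies are stored on the sink row itself
        // (`target_table`) rather than in `object_dependency`.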
        let sink_dependencies: Vec<(SinkId, TableId)> = {
            let filter = if include_creating {
                sink::Column::TargetTable.is_not_null()
            } else {
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .and(sink::Column::TargetTable.is_not_null())
            };
            Sink::find()
                .select_only()
                .columns([sink::Column::SinkId, sink::Column::TargetTable])
                .join(JoinType::InnerJoin, sink::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id as _,
                referenced_object_id: sink_id as _,
            }
        }));

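        // Likewise, a subscription's dependency on its upstream table is stored on the
        // subscription row (`dependent_table_id`).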
        let subscription_dependencies: Vec<(SubscriptionId, TableId)> = {
            let filter = if include_creating {
                subscription::Column::DependentTableId.is_not_null()
            } else {
                subscription::Column::SubscriptionState
                    .eq(Into::<i32>::into(SubscriptionState::Created))
                    .and(subscription::Column::DependentTableId.is_not_null())
            };
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::DependentTableId,
                ])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(subscription_dependencies.into_iter().map(
            |(subscription_id, table_id)| PbObjectDependencies {
                object_id: subscription_id as _,
                referenced_object_id: table_id as _,
            },
        ));

        Ok(obj_dependencies)
    }

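    /// Record the streaming jobs that were cleaned up during recovery in the event log,
    /// bucketed by job type (table/index, source, sink).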
    pub(crate) async fn log_cleaned_dirty_jobs(
        &self,
        dirty_objs: &[PartialObject],
        txn: &DatabaseTransaction,
    ) -> MetaResult<()> {
        // Record cleaned streaming jobs in event logs.
        let mut dirty_table_ids = vec![];
        let mut dirty_source_ids = vec![];
        let mut dirty_sink_ids = vec![];
        for dirty_job_obj in dirty_objs {
            let job_id = dirty_job_obj.oid;
            let job_type = dirty_job_obj.obj_type;
            match job_type {
                ObjectType::Table | ObjectType::Index => dirty_table_ids.push(job_id),
                ObjectType::Source => dirty_source_ids.push(job_id),
                ObjectType::Sink => dirty_sink_ids.push(job_id),
                _ => unreachable!("unexpected streaming job type"),
            }
        }

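        // For each category, look up the job's name and definition, then emit one
        // `DirtyStreamJobClear` event per cleaned job.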
        let mut event_logs = vec![];
        if !dirty_table_ids.is_empty() {
            let table_info: Vec<(TableId, String, String)> = Table::find()
                .select_only()
                .columns([
                    table::Column::TableId,
                    table::Column::Name,
                    table::Column::Definition,
                ])
                .filter(table::Column::TableId.is_in(dirty_table_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (table_id, name, definition) in table_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: table_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_source_ids.is_empty() {
            let source_info: Vec<(SourceId, String, String)> = Source::find()
                .select_only()
                .columns([
                    source::Column::SourceId,
                    source::Column::Name,
                    source::Column::Definition,
                ])
                .filter(source::Column::SourceId.is_in(dirty_source_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (source_id, name, definition) in source_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: source_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_sink_ids.is_empty() {
            let sink_info: Vec<(SinkId, String, String)> = Sink::find()
                .select_only()
                .columns([
                    sink::Column::SinkId,
                    sink::Column::Name,
                    sink::Column::Definition,
                ])
                .filter(sink::Column::SinkId.is_in(dirty_sink_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (sink_id, name, definition) in sink_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: sink_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        self.env.event_log_manager_ref().add_event_logs(event_logs);
        Ok(())
    }

    /// Returns the IDs of tables whose catalogs have been updated.
    pub(crate) async fn clean_dirty_sink_downstreams(
        txn: &DatabaseTransaction,
    ) -> MetaResult<Vec<TableId>> {
        // Clean up, in order:
        // - dangling incoming sinks on `table`,
        // - dangling upstream fragment ids on `fragment`,
        // - dangling merge inputs in the stream node on `fragment`,
        // - dangling upstream actor ids on `actor`.
        let all_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .into_tuple()
            .all(txn)
            .await?;

        let all_fragment_ids: HashSet<_> = all_fragment_ids.into_iter().collect();

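        // All sinks that still target a table; any `incoming_sinks` entry outside this
        // set points at a dropped sink and must be pruned.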
        let table_sink_ids: Vec<ObjectId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.is_not_null())
            .into_tuple()
            .all(txn)
            .await?;

        let all_table_with_incoming_sinks: Vec<(ObjectId, I32Array)> = Table::find()
            .select_only()
            .columns(vec![table::Column::TableId, table::Column::IncomingSinks])
            .into_tuple()
            .all(txn)
            .await?;

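        // Keep only the tables that reference at least one no-longer-existing sink.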
        let table_incoming_sinks_to_update = all_table_with_incoming_sinks
            .into_iter()
            .filter(|(_, incoming_sinks)| {
                let inner_ref = incoming_sinks.inner_ref();
                !inner_ref.is_empty()
                    && inner_ref
                        .iter()
                        .any(|sink_id| !table_sink_ids.contains(sink_id))
            })
            .collect_vec();

        let new_table_incoming_sinks = table_incoming_sinks_to_update
            .into_iter()
            .map(|(table_id, incoming_sinks)| {
                let new_incoming_sinks = incoming_sinks
                    .into_inner()
                    .extract_if(.., |id| table_sink_ids.contains(id))
                    .collect_vec();
                (table_id, I32Array::from(new_incoming_sinks))
            })
            .collect_vec();

        // Nothing to update; return early.
        if new_table_incoming_sinks.is_empty() {
            return Ok(vec![]);
        }

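        // For each affected table, persist the pruned `incoming_sinks` and scrub dangling
        // merge inputs from its materialize fragment.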
        let mut updated_table_ids = vec![];
        for (table_id, new_incoming_sinks) in new_table_incoming_sinks {
            tracing::info!("cleaning dirty table sink downstream table {}", table_id);
            Table::update(table::ActiveModel {
                table_id: Set(table_id as _),
                incoming_sinks: Set(new_incoming_sinks),
                ..Default::default()
            })
            .exec(txn)
            .await?;
            updated_table_ids.push(table_id);

            let fragments: Vec<(FragmentId, StreamNode, i32)> = Fragment::find()
                .select_only()
                .columns(vec![
                    fragment::Column::FragmentId,
                    fragment::Column::StreamNode,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(table_id))
                .into_tuple()
                .all(txn)
                .await?;

            for (fragment_id, stream_node, fragment_mask) in fragments {
                {
                    // The dirty downstream must be the materialize fragment of the table.
                    if fragment_mask & FragmentTypeFlag::Mview as i32 == 0 {
                        continue;
                    }

                    let mut dirty_upstream_fragment_ids = HashSet::new();

                    let mut pb_stream_node = stream_node.to_protobuf();

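                    // Walk the stream node tree; under each `Union`, keep only inputs
                    // whose `Merge` upstream fragment still exists, recording dangling
                    // fragment ids for the log messages and update below.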
                    visit_stream_node_cont_mut(&mut pb_stream_node, |node| {
                        if let Some(NodeBody::Union(_)) = node.node_body {
                            node.input.retain_mut(|input| match &mut input.node_body {
                                Some(NodeBody::Project(_)) => {
                                    let body = input.input.iter().exactly_one().unwrap();
                                    let Some(NodeBody::Merge(merge_node)) = &body.node_body else {
                                        unreachable!("expect merge node");
                                    };
                                    if all_fragment_ids
                                        .contains(&(merge_node.upstream_fragment_id as i32))
                                    {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                }
                                Some(NodeBody::Merge(merge_node)) => {
                                    if all_fragment_ids
                                        .contains(&(merge_node.upstream_fragment_id as i32))
                                    {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                }
                                _ => false,
                            });
                        }
                        true
                    });

                    tracing::info!(
                        "cleaning dirty table sink fragment {:?} from downstream fragment {}",
                        dirty_upstream_fragment_ids,
                        fragment_id
                    );

                    if !dirty_upstream_fragment_ids.is_empty() {
                        tracing::info!(
                            "fixing dirty stream node in downstream fragment {}",
                            fragment_id
                        );
                        Fragment::update_many()
                            .col_expr(
                                fragment::Column::StreamNode,
                                StreamNode::from(&pb_stream_node).into(),
                            )
                            .filter(fragment::Column::FragmentId.eq(fragment_id))
                            .exec(txn)
                            .await?;
                    }
                }
            }
        }

        Ok(updated_table_ids)
    }

    pub async fn has_any_streaming_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = streaming_job::Entity::find().count(&inner.db).await?;
        Ok(count > 0)
    }

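    /// Resolve the object IDs of streaming jobs that are still in the `Creating` state,
    /// matching each `PbCreatingJobInfo` by `(database_id, schema_id, name)`.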
    pub async fn find_creating_streaming_job_ids(
        &self,
        infos: Vec<PbCreatingJobInfo>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        type JobKey = (DatabaseId, SchemaId, String);

        // Index tables are already covered here, as long as an index table is assigned
        // the same name as its index.
        let creating_tables: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Table::find()
            .select_only()
            .columns([table::Column::TableId, table::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_sinks: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, sink::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_subscriptions: Vec<(ObjectId, String, DatabaseId, SchemaId)> =
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::Name,
                ])
                .columns([object::Column::DatabaseId, object::Column::SchemaId])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
                .into_tuple()
                .all(&inner.db)
                .await?;

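        // Key each creating job by (database, schema, name); `remove()` below ensures
        // every job is matched at most once.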
        let mut job_mapping: HashMap<JobKey, ObjectId> = creating_tables
            .into_iter()
            .chain(creating_sinks.into_iter())
            .chain(creating_subscriptions.into_iter())
            .map(|(id, name, database_id, schema_id)| ((database_id, schema_id, name), id))
            .collect();

        Ok(infos
            .into_iter()
            .flat_map(|info| {
                job_mapping.remove(&(
                    info.database_id as _,
                    info.schema_id as _,
                    info.name.clone(),
                ))
            })
            .collect())
    }
}