risingwave_meta/controller/catalog/util.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::FragmentTypeMask;

use super::*;
use crate::controller::fragment::FragmentTypeMaskExt;

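/// Update `column` to `new_value` on the `object` rows of every internal (state)
/// table owned by the streaming job `object_id`, and stage the refreshed table
/// catalogs in `objects_to_notify` so the caller can broadcast the change.
///
/// A minimal usage sketch, assuming a surrounding transaction; `job_id` and
/// `new_owner_id` are placeholder values, not names from this file:
///
/// ```ignore
/// let mut objects_to_notify = Vec::new();
/// update_internal_tables(
///     &txn,
///     job_id,
///     object::Column::OwnerId,
///     Value::Int(Some(new_owner_id)),
///     &mut objects_to_notify,
/// )
/// .await?;
/// // `objects_to_notify` now holds the updated internal-table catalogs.
/// ```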
pub(crate) async fn update_internal_tables(
    txn: &DatabaseTransaction,
    object_id: i32,
    column: object::Column,
    new_value: Value,
    objects_to_notify: &mut Vec<PbObjectInfo>,
) -> MetaResult<()> {
    let internal_tables = get_internal_tables_by_id(object_id, txn).await?;

    if !internal_tables.is_empty() {
        Object::update_many()
            .col_expr(column, SimpleExpr::Value(new_value))
            .filter(object::Column::Oid.is_in(internal_tables.clone()))
            .exec(txn)
            .await?;

        let table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::TableId.is_in(internal_tables))
            .all(txn)
            .await?;
        for (table, table_obj) in table_objs {
            objects_to_notify.push(PbObjectInfo::Table(
                ObjectModel(table, table_obj.unwrap()).into(),
            ));
        }
    }
    Ok(())
}

impl CatalogController {
    pub(crate) async fn init(&self) -> MetaResult<()> {
        self.table_catalog_cdc_table_id_update().await?;
        Ok(())
    }

    /// Fill in the `cdc_table_id` field for tables that have an empty `cdc_table_id`
    /// and a parent source job.
    ///
    /// NOTE: we assume that a table with a parent source job is a CDC table.
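    ///
    /// Illustrative sketch of the backfill performed for each matching table; the
    /// definition below is made up, and the resulting id format is whatever
    /// `build_cdc_table_id` produces:
    ///
    /// ```ignore
    /// // definition: "CREATE TABLE orders (...) FROM mysql_source TABLE 'mydb.orders'"
    /// let name = extract_external_table_name_from_definition(&definition).unwrap();
    /// let cdc_table_id = build_cdc_table_id(source_id as u32, &name);
    /// // the `table` row is then updated with this `cdc_table_id`.
    /// ```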
    pub(crate) async fn table_catalog_cdc_table_id_update(&self) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Select tables whose `cdc_table_id` is empty and which have a parent source job.
        let table_and_source_id: Vec<(TableId, String, SourceId)> = Table::find()
            .join(JoinType::InnerJoin, table::Relation::ObjectDependency.def())
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Source.def(),
            )
            .select_only()
            .columns([table::Column::TableId, table::Column::Definition])
            .columns([source::Column::SourceId])
            .filter(
                table::Column::TableType.eq(TableType::Table).and(
                    table::Column::CdcTableId
                        .is_null()
                        .or(table::Column::CdcTableId.eq("")),
                ),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        // Return directly if the result set is empty.
        if table_and_source_id.is_empty() {
            return Ok(());
        }

        info!(table_and_source_id = ?table_and_source_id, "cdc table with empty cdc_table_id");

        let mut cdc_table_ids = HashMap::new();
        for (table_id, definition, source_id) in table_and_source_id {
            match extract_external_table_name_from_definition(&definition) {
                None => {
                    tracing::warn!(
                        table_id = table_id,
                        definition = definition,
                        "failed to extract cdc table name from table definition.",
                    )
                }
                Some(external_table_name) => {
                    cdc_table_ids.insert(
                        table_id,
                        build_cdc_table_id(source_id as u32, &external_table_name),
                    );
                }
            }
        }

        for (table_id, cdc_table_id) in cdc_table_ids {
            table::ActiveModel {
                table_id: Set(table_id as _),
                cdc_table_id: Set(Some(cdc_table_id)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        txn.commit().await?;
        Ok(())
    }

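    /// Collect the edges of the object dependency graph as `PbObjectDependencies`,
    /// covering plain object dependencies, view references, sink-into-table
    /// targets, and subscription upstreams. When `include_creating` is `false`,
    /// edges of streaming jobs that have not reached `JobStatus::Created` are
    /// skipped where a job status is available.
    ///
    /// Sketch of the edge shape produced below, with made-up ids: a materialized
    /// view `42` built on top of table `7` yields
    ///
    /// ```ignore
    /// PbObjectDependencies { object_id: 42, referenced_object_id: 7 }
    /// ```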
    pub(crate) async fn list_object_dependencies(
        &self,
        include_creating: bool,
    ) -> MetaResult<Vec<PbObjectDependencies>> {
        let inner = self.inner.read().await;

        // Dependencies recorded in `object_dependency`, restricted to created
        // streaming jobs unless `include_creating` is set.
        let dependencies: Vec<(ObjectId, ObjectId)> = {
            let filter = if include_creating {
                Expr::value(true)
            } else {
                streaming_job::Column::JobStatus.eq(JobStatus::Created)
            };
            ObjectDependency::find()
                .select_only()
                .columns([
                    object_dependency::Column::Oid,
                    object_dependency::Column::UsedBy,
                ])
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        let mut obj_dependencies = dependencies
            .into_iter()
            .map(|(oid, used_by)| PbObjectDependencies {
                object_id: used_by as _,
                referenced_object_id: oid as _,
            })
            .collect_vec();

        // Objects that reference a view depend on that view.
        let view_dependencies: Vec<(ObjectId, ObjectId)> = ObjectDependency::find()
            .select_only()
            .columns([
                object_dependency::Column::Oid,
                object_dependency::Column::UsedBy,
            ])
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::View.def())
            .into_tuple()
            .all(&inner.db)
            .await?;

        obj_dependencies.extend(view_dependencies.into_iter().map(|(view_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id as _,
                referenced_object_id: view_id as _,
            }
        }));

        // A table that is the target of a sink-into-table depends on that sink.
        let sink_dependencies: Vec<(SinkId, TableId)> = {
            let filter = if include_creating {
                sink::Column::TargetTable.is_not_null()
            } else {
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .and(sink::Column::TargetTable.is_not_null())
            };
            Sink::find()
                .select_only()
                .columns([sink::Column::SinkId, sink::Column::TargetTable])
                .join(JoinType::InnerJoin, sink::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id as _,
                referenced_object_id: sink_id as _,
            }
        }));

        // A subscription depends on the table it subscribes to.
        let subscription_dependencies: Vec<(SubscriptionId, TableId)> = {
            let filter = if include_creating {
                subscription::Column::DependentTableId.is_not_null()
            } else {
                subscription::Column::SubscriptionState
                    .eq(Into::<i32>::into(SubscriptionState::Created))
                    .and(subscription::Column::DependentTableId.is_not_null())
            };
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::DependentTableId,
                ])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(subscription_dependencies.into_iter().map(
            |(subscription_id, table_id)| PbObjectDependencies {
                object_id: subscription_id as _,
                referenced_object_id: table_id as _,
            },
        ));

        Ok(obj_dependencies)
    }

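    /// Record the streaming jobs in `dirty_objs` that are cleared during recovery
    /// as `DirtyStreamJobClear` event logs, looking up each job's name and
    /// definition by its type (table/index, source, or sink).
    ///
    /// Each cleaned job yields one event of roughly this shape (field values are
    /// illustrative):
    ///
    /// ```ignore
    /// Event::DirtyStreamJobClear(EventDirtyStreamJobClear {
    ///     id: 1001,
    ///     name: "mv_orders".to_owned(),
    ///     definition: "CREATE MATERIALIZED VIEW mv_orders AS ...".to_owned(),
    ///     error: "clear during recovery".to_owned(),
    /// })
    /// ```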
    pub(crate) async fn log_cleaned_dirty_jobs(
        &self,
        dirty_objs: &[PartialObject],
        txn: &DatabaseTransaction,
    ) -> MetaResult<()> {
        // Record cleaned streaming jobs in event logs.
        let mut dirty_table_ids = vec![];
        let mut dirty_source_ids = vec![];
        let mut dirty_sink_ids = vec![];
        for dirty_job_obj in dirty_objs {
            let job_id = dirty_job_obj.oid;
            let job_type = dirty_job_obj.obj_type;
            match job_type {
                ObjectType::Table | ObjectType::Index => dirty_table_ids.push(job_id),
                ObjectType::Source => dirty_source_ids.push(job_id),
                ObjectType::Sink => dirty_sink_ids.push(job_id),
                _ => unreachable!("unexpected streaming job type"),
            }
        }

        let mut event_logs = vec![];
        if !dirty_table_ids.is_empty() {
            let table_info: Vec<(TableId, String, String)> = Table::find()
                .select_only()
                .columns([
                    table::Column::TableId,
                    table::Column::Name,
                    table::Column::Definition,
                ])
                .filter(table::Column::TableId.is_in(dirty_table_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (table_id, name, definition) in table_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: table_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_source_ids.is_empty() {
            let source_info: Vec<(SourceId, String, String)> = Source::find()
                .select_only()
                .columns([
                    source::Column::SourceId,
                    source::Column::Name,
                    source::Column::Definition,
                ])
                .filter(source::Column::SourceId.is_in(dirty_source_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (source_id, name, definition) in source_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: source_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_sink_ids.is_empty() {
            let sink_info: Vec<(SinkId, String, String)> = Sink::find()
                .select_only()
                .columns([
                    sink::Column::SinkId,
                    sink::Column::Name,
                    sink::Column::Definition,
                ])
                .filter(sink::Column::SinkId.is_in(dirty_sink_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (sink_id, name, definition) in sink_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: sink_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        self.env.event_log_manager_ref().add_event_logs(event_logs);
        Ok(())
    }

    /// Returns the IDs of tables whose catalogs have been updated.
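    ///
    /// The core of the cleanup prunes inputs of the downstream `Union` whose
    /// upstream fragment no longer exists, whether the input is a bare `Merge`
    /// or a `Project` wrapping a `Merge`. A hypothetical before/after where
    /// upstream fragment `3` has been dropped:
    ///
    /// ```ignore
    /// // before: Union { input: [Merge(upstream = 2), Merge(upstream = 3)] }
    /// // after:  Union { input: [Merge(upstream = 2)] }
    /// ```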
    pub(crate) async fn clean_dirty_sink_downstreams(
        txn: &DatabaseTransaction,
    ) -> MetaResult<Vec<TableId>> {
        // - clean incoming sinks from `table`
        // - clean upstream fragment ids from `fragment`
        // - clean stream nodes from `fragment`
        // - clean upstream actor ids from `actor`

        // Cleaning up fragments and `StreamNode`s maintains compatibility with data
        // written by older versions. With the current sink-into-table implementation
        // there is no need to restore the contents of the `StreamNode`, because the
        // `UpstreamSinkUnion` operator does not persist any data and instead relies
        // on a refill during recovery.

        let all_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .into_tuple()
            .all(txn)
            .await?;

        let all_fragment_ids: HashSet<_> = all_fragment_ids.into_iter().collect();

        let all_sink_into_tables: Vec<Option<TableId>> = Sink::find()
            .select_only()
            .column(sink::Column::TargetTable)
            .filter(sink::Column::TargetTable.is_not_null())
            .into_tuple()
            .all(txn)
            .await?;

        let mut table_with_incoming_sinks: HashSet<TableId> = HashSet::new();
        for target_table_id in all_sink_into_tables {
            table_with_incoming_sinks.insert(target_table_id.expect("filter by non null"));
        }

        // Nothing to update; return early.
        if table_with_incoming_sinks.is_empty() {
            return Ok(vec![]);
        }

        let mut updated_table_ids = vec![];
        for table_id in table_with_incoming_sinks {
            tracing::info!("cleaning dirty table sink downstream table {}", table_id);
            updated_table_ids.push(table_id);

            let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
                .select_only()
                .columns(vec![
                    fragment::Column::FragmentId,
                    fragment::Column::StreamNode,
                ])
                .filter(fragment::Column::JobId.eq(table_id).and(
                    // The dirty downstream should be the materialize fragment of the table.
                    FragmentTypeMask::intersects(FragmentTypeFlag::Mview),
                ))
                .into_tuple()
                .all(txn)
                .await?;

            for (fragment_id, stream_node) in fragments {
                {
                    let mut dirty_upstream_fragment_ids = HashSet::new();

                    let mut pb_stream_node = stream_node.to_protobuf();

                    // Drop union inputs whose upstream fragment no longer exists.
                    visit_stream_node_cont_mut(&mut pb_stream_node, |node| {
                        if let Some(NodeBody::Union(_)) = node.node_body {
                            node.input.retain_mut(|input| match &mut input.node_body {
                                Some(NodeBody::Project(_)) => {
                                    // A `Project` input wraps exactly one `Merge` node.
                                    let body = input.input.iter().exactly_one().unwrap();
                                    let Some(NodeBody::Merge(merge_node)) = &body.node_body else {
                                        unreachable!("expect merge node");
                                    };
                                    if all_fragment_ids
                                        .contains(&(merge_node.upstream_fragment_id as i32))
                                    {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                }
                                Some(NodeBody::Merge(merge_node)) => {
                                    if all_fragment_ids
                                        .contains(&(merge_node.upstream_fragment_id as i32))
                                    {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                }
                                _ => false,
                            });
                        }
                        true
                    });

                    tracing::info!(
                        "cleaning dirty table sink fragment {:?} from downstream fragment {}",
                        dirty_upstream_fragment_ids,
                        fragment_id
                    );

                    if !dirty_upstream_fragment_ids.is_empty() {
                        tracing::info!(
                            "fixing dirty stream node in downstream fragment {}",
                            fragment_id
                        );
                        Fragment::update_many()
                            .col_expr(
                                fragment::Column::StreamNode,
                                StreamNode::from(&pb_stream_node).into(),
                            )
                            .filter(fragment::Column::FragmentId.eq(fragment_id))
                            .exec(txn)
                            .await?;
                    }
                }
            }
        }

        Ok(updated_table_ids)
    }

    /// Returns whether at least one streaming job exists in the catalog.
    pub async fn has_any_streaming_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = streaming_job::Entity::find().count(&inner.db).await?;
        Ok(count > 0)
    }

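    /// Resolve the object ids of streaming jobs still in `JobStatus::Creating`
    /// (tables/indexes, sinks, and subscriptions) that match the given
    /// `(database, schema, name)` infos; infos without a matching creating job
    /// are silently skipped.
    ///
    /// A usage sketch with made-up ids, where `controller` stands in for a
    /// `CatalogController`:
    ///
    /// ```ignore
    /// let ids = controller
    ///     .find_creating_streaming_job_ids(vec![PbCreatingJobInfo {
    ///         database_id: 1,
    ///         schema_id: 2,
    ///         name: "mv_orders".to_owned(),
    ///     }])
    ///     .await?;
    /// ```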
    pub async fn find_creating_streaming_job_ids(
        &self,
        infos: Vec<PbCreatingJobInfo>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        type JobKey = (DatabaseId, SchemaId, String);

        // Index tables are already covered here, as long as an index table is still
        // given the same name as its index.
        let creating_tables: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Table::find()
            .select_only()
            .columns([table::Column::TableId, table::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_sinks: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, sink::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_subscriptions: Vec<(ObjectId, String, DatabaseId, SchemaId)> =
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::Name,
                ])
                .columns([object::Column::DatabaseId, object::Column::SchemaId])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
                .into_tuple()
                .all(&inner.db)
                .await?;

        let mut job_mapping: HashMap<JobKey, ObjectId> = creating_tables
            .into_iter()
            .chain(creating_sinks)
            .chain(creating_subscriptions)
            .map(|(id, name, database_id, schema_id)| ((database_id, schema_id, name), id))
            .collect();

        Ok(infos
            .into_iter()
            .flat_map(|info| {
                job_mapping.remove(&(
                    info.database_id as _,
                    info.schema_id as _,
                    info.name.clone(),
                ))
            })
            .collect())
    }
}