risingwave_meta/controller/catalog/util.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;

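/// Update the given `object` column for all internal tables of a streaming job, and collect the
/// refreshed table catalogs into `objects_to_notify` so the caller can broadcast the changes.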
pub(crate) async fn update_internal_tables(
    txn: &DatabaseTransaction,
    object_id: i32,
    column: object::Column,
    new_value: Value,
    objects_to_notify: &mut Vec<PbObjectInfo>,
) -> MetaResult<()> {
    let internal_tables = get_internal_tables_by_id(object_id, txn).await?;

    if !internal_tables.is_empty() {
        Object::update_many()
            .col_expr(column, SimpleExpr::Value(new_value))
            .filter(object::Column::Oid.is_in(internal_tables.clone()))
            .exec(txn)
            .await?;

        let table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::TableId.is_in(internal_tables))
            .all(txn)
            .await?;
        for (table, table_obj) in table_objs {
            objects_to_notify.push(PbObjectInfo::Table(
                ObjectModel(table, table_obj.unwrap()).into(),
            ));
        }
    }
    Ok(())
}

impl CatalogController {
    pub(crate) async fn init(&self) -> MetaResult<()> {
        self.table_catalog_cdc_table_id_update().await?;
        Ok(())
    }

    /// Fill in the `cdc_table_id` field for tables that have an empty `cdc_table_id` and a parent
    /// `Source` job.
    /// NOTE: we assume a table with a parent `Source` job is a CDC table.
    pub(crate) async fn table_catalog_cdc_table_id_update(&self) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Select tables whose `cdc_table_id` is empty and that have a parent `Source` job.
        let table_and_source_id: Vec<(TableId, String, SourceId)> = Table::find()
            .join(JoinType::InnerJoin, table::Relation::ObjectDependency.def())
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Source.def(),
            )
            .select_only()
            .columns([table::Column::TableId, table::Column::Definition])
            .columns([source::Column::SourceId])
            .filter(
                table::Column::TableType.eq(TableType::Table).and(
                    table::Column::CdcTableId
                        .is_null()
                        .or(table::Column::CdcTableId.eq("")),
                ),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        // Return early if the result set is empty.
        if table_and_source_id.is_empty() {
            return Ok(());
        }

        info!(table_and_source_id = ?table_and_source_id, "cdc table with empty cdc_table_id");

        let mut cdc_table_ids = HashMap::new();
        for (table_id, definition, source_id) in table_and_source_id {
            match extract_external_table_name_from_definition(&definition) {
                None => {
                    tracing::warn!(
                        table_id = table_id,
                        definition = definition,
                        "failed to extract cdc table name from table definition.",
                    )
                }
                Some(external_table_name) => {
                    cdc_table_ids.insert(
                        table_id,
                        build_cdc_table_id(source_id as u32, &external_table_name),
                    );
                }
            }
        }

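        // Persist the derived `cdc_table_id` for each affected table.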
        for (table_id, cdc_table_id) in cdc_table_ids {
            table::ActiveModel {
                table_id: Set(table_id as _),
                cdc_table_id: Set(Some(cdc_table_id)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        txn.commit().await?;
        Ok(())
    }

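    /// List all object dependency pairs, covering streaming-job dependencies, view dependencies,
    /// sink-into-table dependencies, and subscription dependencies.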
    pub(crate) async fn list_object_dependencies(
        &self,
        include_creating: bool,
    ) -> MetaResult<Vec<PbObjectDependencies>> {
        let inner = self.inner.read().await;

        let dependencies: Vec<(ObjectId, ObjectId)> = {
            let filter = if include_creating {
                Expr::value(true)
            } else {
                streaming_job::Column::JobStatus.eq(JobStatus::Created)
            };
            ObjectDependency::find()
                .select_only()
                .columns([
                    object_dependency::Column::Oid,
                    object_dependency::Column::UsedBy,
                ])
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        let mut obj_dependencies = dependencies
            .into_iter()
            .map(|(oid, used_by)| PbObjectDependencies {
                object_id: used_by as _,
                referenced_object_id: oid as _,
            })
            .collect_vec();

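        // Views are not streaming jobs, so dependencies referencing views are collected separately.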
        let view_dependencies: Vec<(ObjectId, ObjectId)> = ObjectDependency::find()
            .select_only()
            .columns([
                object_dependency::Column::Oid,
                object_dependency::Column::UsedBy,
            ])
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::View.def())
            .into_tuple()
            .all(&inner.db)
            .await?;

        obj_dependencies.extend(view_dependencies.into_iter().map(|(view_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id as _,
                referenced_object_id: view_id as _,
            }
        }));

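        // For sink-into-table, the downstream target table depends on the sink.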
        let sink_dependencies: Vec<(SinkId, TableId)> = {
            let filter = if include_creating {
                sink::Column::TargetTable.is_not_null()
            } else {
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .and(sink::Column::TargetTable.is_not_null())
            };
            Sink::find()
                .select_only()
                .columns([sink::Column::SinkId, sink::Column::TargetTable])
                .join(JoinType::InnerJoin, sink::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id as _,
                referenced_object_id: sink_id as _,
            }
        }));

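        // A subscription depends on the table it subscribes to.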
        let subscription_dependencies: Vec<(SubscriptionId, TableId)> = {
            let filter = if include_creating {
                subscription::Column::DependentTableId.is_not_null()
            } else {
                subscription::Column::SubscriptionState
                    .eq(Into::<i32>::into(SubscriptionState::Created))
                    .and(subscription::Column::DependentTableId.is_not_null())
            };
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::DependentTableId,
                ])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(subscription_dependencies.into_iter().map(
            |(subscription_id, table_id)| PbObjectDependencies {
                object_id: subscription_id as _,
                referenced_object_id: table_id as _,
            },
        ));

        Ok(obj_dependencies)
    }

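    /// Record the dirty streaming jobs cleaned during recovery in the event log.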
    pub(crate) async fn log_cleaned_dirty_jobs(
        &self,
        dirty_objs: &[PartialObject],
        txn: &DatabaseTransaction,
    ) -> MetaResult<()> {
        // Record cleaned streaming jobs in event logs.
        let mut dirty_table_ids = vec![];
        let mut dirty_source_ids = vec![];
        let mut dirty_sink_ids = vec![];
        for dirty_job_obj in dirty_objs {
            let job_id = dirty_job_obj.oid;
            let job_type = dirty_job_obj.obj_type;
            match job_type {
                ObjectType::Table | ObjectType::Index => dirty_table_ids.push(job_id),
                ObjectType::Source => dirty_source_ids.push(job_id),
                ObjectType::Sink => dirty_sink_ids.push(job_id),
                _ => unreachable!("unexpected streaming job type"),
            }
        }

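        // Fetch the name and definition of each dirty job to build `DirtyStreamJobClear` events.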
        let mut event_logs = vec![];
        if !dirty_table_ids.is_empty() {
            let table_info: Vec<(TableId, String, String)> = Table::find()
                .select_only()
                .columns([
                    table::Column::TableId,
                    table::Column::Name,
                    table::Column::Definition,
                ])
                .filter(table::Column::TableId.is_in(dirty_table_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (table_id, name, definition) in table_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: table_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_source_ids.is_empty() {
            let source_info: Vec<(SourceId, String, String)> = Source::find()
                .select_only()
                .columns([
                    source::Column::SourceId,
                    source::Column::Name,
                    source::Column::Definition,
                ])
                .filter(source::Column::SourceId.is_in(dirty_source_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (source_id, name, definition) in source_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: source_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_sink_ids.is_empty() {
            let sink_info: Vec<(SinkId, String, String)> = Sink::find()
                .select_only()
                .columns([
                    sink::Column::SinkId,
                    sink::Column::Name,
                    sink::Column::Definition,
                ])
                .filter(sink::Column::SinkId.is_in(dirty_sink_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (sink_id, name, definition) in sink_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: sink_id as _,
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        self.env.event_log_manager_ref().add_event_logs(event_logs);
        Ok(())
    }

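    /// Remove references to dropped table sinks from the `incoming_sinks` of their target tables
    /// and from the union inputs of those tables' fragments. Returns `true` if anything was cleaned.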
    pub(crate) async fn clean_dirty_sink_downstreams(
        txn: &DatabaseTransaction,
    ) -> MetaResult<bool> {
        // Clean up dirty sink downstream state:
        // - `incoming_sinks` on tables,
        // - upstream fragment ids and stream nodes on fragments,
        // - upstream actor ids on actors.
        let all_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .columns(vec![fragment::Column::FragmentId])
            .into_tuple()
            .all(txn)
            .await?;

        let all_fragment_ids: HashSet<_> = all_fragment_ids.into_iter().collect();

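        // Sinks that still exist and target a table; any incoming sink outside this set is dirty.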
        let table_sink_ids: Vec<ObjectId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.is_not_null())
            .into_tuple()
            .all(txn)
            .await?;

        let all_table_with_incoming_sinks: Vec<(ObjectId, I32Array)> = Table::find()
            .select_only()
            .columns(vec![table::Column::TableId, table::Column::IncomingSinks])
            .into_tuple()
            .all(txn)
            .await?;

        let table_incoming_sinks_to_update = all_table_with_incoming_sinks
            .into_iter()
            .filter(|(_, incoming_sinks)| {
                let inner_ref = incoming_sinks.inner_ref();
                !inner_ref.is_empty()
                    && inner_ref
                        .iter()
                        .any(|sink_id| !table_sink_ids.contains(sink_id))
            })
            .collect_vec();

        let new_table_incoming_sinks = table_incoming_sinks_to_update
            .into_iter()
            .map(|(table_id, incoming_sinks)| {
                let new_incoming_sinks = incoming_sinks
                    .into_inner()
                    .extract_if(.., |id| table_sink_ids.contains(id))
                    .collect_vec();
                (table_id, I32Array::from(new_incoming_sinks))
            })
            .collect_vec();

        // No updates needed; return early.
        if new_table_incoming_sinks.is_empty() {
            return Ok(false);
        }

        for (table_id, new_incoming_sinks) in new_table_incoming_sinks {
            tracing::info!("cleaning dirty table sink downstream table {}", table_id);
            Table::update_many()
                .col_expr(table::Column::IncomingSinks, new_incoming_sinks.into())
                .filter(table::Column::TableId.eq(table_id))
                .exec(txn)
                .await?;

            let fragments: Vec<(FragmentId, StreamNode, i32)> = Fragment::find()
                .select_only()
                .columns(vec![
                    fragment::Column::FragmentId,
                    fragment::Column::StreamNode,
                    fragment::Column::FragmentTypeMask,
                ])
                .filter(fragment::Column::JobId.eq(table_id))
                .into_tuple()
                .all(txn)
                .await?;

            for (fragment_id, stream_node, fragment_mask) in fragments {
                {
                    // The dirty downstream must be the materialize fragment of the table.
                    assert!(fragment_mask & FragmentTypeFlag::Mview as i32 > 0);

                    let mut dirty_upstream_fragment_ids = HashSet::new();

                    let mut pb_stream_node = stream_node.to_protobuf();

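                    // Under each Union node, drop Merge inputs whose upstream fragment no longer exists.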
                    visit_stream_node_cont_mut(&mut pb_stream_node, |node| {
                        if let Some(NodeBody::Union(_)) = node.node_body {
                            node.input.retain_mut(|input| {
                                if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body {
                                    if all_fragment_ids
                                        .contains(&(merge_node.upstream_fragment_id as i32))
                                    {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                } else {
                                    false
                                }
                            });
                        }
                        true
                    });

                    tracing::info!(
                        "cleaning dirty table sink fragment {:?} from downstream fragment {}",
                        dirty_upstream_fragment_ids,
                        fragment_id
                    );

                    Fragment::update_many()
                        .col_expr(
                            fragment::Column::StreamNode,
                            StreamNode::from(&pb_stream_node).into(),
                        )
                        .filter(fragment::Column::FragmentId.eq(fragment_id))
                        .exec(txn)
                        .await?;
                }
            }
        }

        Ok(true)
    }

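    /// Returns whether any streaming job exists in the catalog.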
    pub async fn has_any_streaming_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = streaming_job::Entity::find().count(&inner.db).await?;
        Ok(count > 0)
    }

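    /// Resolve the object ids of creating streaming jobs that match the given
    /// `(database_id, schema_id, name)` infos.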
    pub async fn find_creating_streaming_job_ids(
        &self,
        infos: Vec<PbCreatingJobInfo>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        type JobKey = (DatabaseId, SchemaId, String);

        // Index jobs are already covered, as long as an index table is still assigned the same name as its index.
        let creating_tables: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Table::find()
            .select_only()
            .columns([table::Column::TableId, table::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_sinks: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, sink::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_subscriptions: Vec<(ObjectId, String, DatabaseId, SchemaId)> =
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::Name,
                ])
                .columns([object::Column::DatabaseId, object::Column::SchemaId])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
                .into_tuple()
                .all(&inner.db)
                .await?;

        let mut job_mapping: HashMap<JobKey, ObjectId> = creating_tables
            .into_iter()
            .chain(creating_sinks)
            .chain(creating_subscriptions)
            .map(|(id, name, database_id, schema_id)| ((database_id, schema_id, name), id))
            .collect();

        Ok(infos
            .into_iter()
            .flat_map(|info| {
                job_mapping.remove(&(
                    info.database_id as _,
                    info.schema_id as _,
                    info.name.clone(),
                ))
            })
            .collect())
    }
}