risingwave_meta/controller/catalog/util.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::FragmentTypeMask;

use super::*;
use crate::controller::fragment::FragmentTypeMaskExt;
19
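/// Apply `new_value` to `column` on the `object` rows of every internal table of the given
/// streaming job, and stage the refreshed table catalogs in `objects_to_notify` so the caller
/// can broadcast them in a catalog notification.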
pub(crate) async fn update_internal_tables(
    txn: &DatabaseTransaction,
    object_id: ObjectId,
    column: object::Column,
    new_value: Value,
    objects_to_notify: &mut Vec<PbObjectInfo>,
) -> MetaResult<()> {
    let internal_tables = get_internal_tables_by_id(object_id.as_job_id(), txn).await?;

    if !internal_tables.is_empty() {
        Object::update_many()
            .col_expr(column, SimpleExpr::Value(new_value))
            .filter(object::Column::Oid.is_in(internal_tables.clone()))
            .exec(txn)
            .await?;

        let table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::TableId.is_in(internal_tables))
            .all(txn)
            .await?;
        for (table, table_obj) in table_objs {
            objects_to_notify.push(PbObjectInfo::Table(
                ObjectModel(table, table_obj.unwrap()).into(),
            ));
        }
    }
    Ok(())
}

impl CatalogController {
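    /// Startup hook for the catalog controller: run one-shot catalog data fixups, currently
    /// only the `cdc_table_id` backfill below.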
    pub(crate) async fn init(&self) -> MetaResult<()> {
        self.table_catalog_cdc_table_id_update().await?;
        Ok(())
    }

    /// Fill in the `cdc_table_id` field for tables whose `cdc_table_id` is empty and that have a
    /// parent `Source` job.
    /// NOTE: we assume that a table with a parent `Source` job is a CDC table.
    pub(crate) async fn table_catalog_cdc_table_id_update(&self) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Select tables whose cdc_table_id is empty and that have a parent Source job.
        let table_and_source_id: Vec<(TableId, String, SourceId)> = Table::find()
            .join(JoinType::InnerJoin, table::Relation::ObjectDependency.def())
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Source.def(),
            )
            .select_only()
            .columns([table::Column::TableId, table::Column::Definition])
            .columns([source::Column::SourceId])
            .filter(
                table::Column::TableType.eq(TableType::Table).and(
                    table::Column::CdcTableId
                        .is_null()
                        .or(table::Column::CdcTableId.eq("")),
                ),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        // Return directly if the result set is empty.
        if table_and_source_id.is_empty() {
            return Ok(());
        }

        info!(table_and_source_id = ?table_and_source_id, "cdc table with empty cdc_table_id");

        let mut cdc_table_ids = HashMap::new();
        for (table_id, definition, source_id) in table_and_source_id {
            match extract_external_table_name_from_definition(&definition) {
                None => {
                    tracing::warn!(
                        %table_id,
                        definition,
                        "failed to extract cdc table name from table definition.",
                    )
                }
                Some(external_table_name) => {
                    cdc_table_ids.insert(
                        table_id,
                        build_cdc_table_id(source_id, &external_table_name),
                    );
                }
            }
        }

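        // Persist the backfilled `cdc_table_id` values within the same transaction.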
        for (table_id, cdc_table_id) in cdc_table_ids {
            table::ActiveModel {
                table_id: Set(table_id as _),
                cdc_table_id: Set(Some(cdc_table_id)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        txn.commit().await?;
        Ok(())
    }

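    /// List all object dependency edges, optionally including streaming jobs that are still being
    /// created. Edges contributed by views, sink-into-table sinks, and subscriptions are appended
    /// to the plain `object_dependency` records.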
    pub(crate) async fn list_object_dependencies(
        &self,
        include_creating: bool,
    ) -> MetaResult<Vec<PbObjectDependencies>> {
        let inner = self.inner.read().await;

        let dependencies: Vec<(ObjectId, ObjectId)> = {
            let filter = if include_creating {
                Expr::value(true)
            } else {
                streaming_job::Column::JobStatus.eq(JobStatus::Created)
            };
            ObjectDependency::find()
                .select_only()
                .columns([
                    object_dependency::Column::Oid,
                    object_dependency::Column::UsedBy,
                ])
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        let mut obj_dependencies = dependencies
            .into_iter()
            .map(|(oid, used_by)| PbObjectDependencies {
                object_id: used_by,
                referenced_object_id: oid,
            })
            .collect_vec();

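        // Views are not streaming jobs, so their dependency edges are collected unconditionally.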
        let view_dependencies: Vec<(ObjectId, ObjectId)> = ObjectDependency::find()
            .select_only()
            .columns([
                object_dependency::Column::Oid,
                object_dependency::Column::UsedBy,
            ])
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::View.def())
            .into_tuple()
            .all(&inner.db)
            .await?;

        obj_dependencies.extend(view_dependencies.into_iter().map(|(view_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id,
                referenced_object_id: view_id,
            }
        }));

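        // For sink-into-table, the target table depends on the sink that writes into it.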
        let sink_dependencies: Vec<(SinkId, TableId)> = {
            let filter = if include_creating {
                sink::Column::TargetTable.is_not_null()
            } else {
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Created)
                    .and(sink::Column::TargetTable.is_not_null())
            };
            Sink::find()
                .select_only()
                .columns([sink::Column::SinkId, sink::Column::TargetTable])
                .join(JoinType::InnerJoin, sink::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
            PbObjectDependencies {
                object_id: table_id.into(),
                referenced_object_id: sink_id.into(),
            }
        }));

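        // A subscription depends on the table it subscribes to.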
        let subscription_dependencies: Vec<(SubscriptionId, TableId)> = {
            let filter = if include_creating {
                subscription::Column::DependentTableId.is_not_null()
            } else {
                subscription::Column::SubscriptionState
                    .eq(Into::<i32>::into(SubscriptionState::Created))
                    .and(subscription::Column::DependentTableId.is_not_null())
            };
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::DependentTableId,
                ])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .filter(filter)
                .into_tuple()
                .all(&inner.db)
                .await?
        };
        obj_dependencies.extend(subscription_dependencies.into_iter().map(
            |(subscription_id, table_id)| PbObjectDependencies {
                object_id: subscription_id.into(),
                referenced_object_id: table_id.into(),
            },
        ));

        Ok(obj_dependencies)
    }

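    /// Record the streaming jobs cleaned up during recovery in the event log, grouped by job
    /// type (tables/indexes, sources, and sinks).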
    pub(crate) async fn log_cleaned_dirty_jobs(
        &self,
        dirty_objs: &[PartialObject],
        txn: &DatabaseTransaction,
    ) -> MetaResult<()> {
        // Record cleaned streaming jobs in event logs.
        let mut dirty_table_ids = vec![];
        let mut dirty_source_ids = vec![];
        let mut dirty_sink_ids = vec![];
        for dirty_job_obj in dirty_objs {
            let job_id = dirty_job_obj.oid;
            let job_type = dirty_job_obj.obj_type;
            match job_type {
                ObjectType::Table | ObjectType::Index => dirty_table_ids.push(job_id),
                ObjectType::Source => dirty_source_ids.push(job_id),
                ObjectType::Sink => dirty_sink_ids.push(job_id),
                _ => unreachable!("unexpected streaming job type"),
            }
        }

        let mut event_logs = vec![];
        if !dirty_table_ids.is_empty() {
            let table_info: Vec<(TableId, String, String)> = Table::find()
                .select_only()
                .columns([
                    table::Column::TableId,
                    table::Column::Name,
                    table::Column::Definition,
                ])
                .filter(table::Column::TableId.is_in(dirty_table_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (table_id, name, definition) in table_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: table_id.as_job_id(),
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_source_ids.is_empty() {
            let source_info: Vec<(SourceId, String, String)> = Source::find()
                .select_only()
                .columns([
                    source::Column::SourceId,
                    source::Column::Name,
                    source::Column::Definition,
                ])
                .filter(source::Column::SourceId.is_in(dirty_source_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (source_id, name, definition) in source_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: source_id.as_share_source_job_id(),
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_sink_ids.is_empty() {
            let sink_info: Vec<(SinkId, String, String)> = Sink::find()
                .select_only()
                .columns([
                    sink::Column::SinkId,
                    sink::Column::Name,
                    sink::Column::Definition,
                ])
                .filter(sink::Column::SinkId.is_in(dirty_sink_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (sink_id, name, definition) in sink_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: sink_id.as_job_id(),
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        self.env.event_log_manager_ref().add_event_logs(event_logs);
        Ok(())
    }

    pub(crate) async fn clean_dirty_sink_downstreams(txn: &DatabaseTransaction) -> MetaResult<()> {
        // Clean incoming sinks from `table`.
        // Clean upstream fragment ids from `fragment`.
        // Clean stream nodes from `fragment`.
        // Clean upstream actor ids from `actor`.

        // Cleaning up `fragment` and its `StreamNode` is only kept for compatibility with data
        // written by old versions. In the current sink-into-table implementation there is no need
        // to restore the contents of `StreamNode`, because the `UpstreamSinkUnion` operator does
        // not persist any data and instead relies on a refill during recovery.

        let all_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .into_tuple()
            .all(txn)
            .await?;

        let all_fragment_ids: HashSet<_> = all_fragment_ids.into_iter().collect();

        let all_sink_into_tables: Vec<Option<TableId>> = Sink::find()
            .select_only()
            .column(sink::Column::TargetTable)
            .filter(sink::Column::TargetTable.is_not_null())
            .into_tuple()
            .all(txn)
            .await?;

        let mut table_with_incoming_sinks: HashSet<TableId> = HashSet::new();
        for target_table_id in all_sink_into_tables {
            table_with_incoming_sinks.insert(target_table_id.expect("filter by non null"));
        }

        // Nothing to update; return early.
        if table_with_incoming_sinks.is_empty() {
            return Ok(());
        }

        for table_id in table_with_incoming_sinks {
            tracing::info!("cleaning dirty table sink downstream table {}", table_id);

            let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
                .select_only()
                .columns(vec![
                    fragment::Column::FragmentId,
                    fragment::Column::StreamNode,
                ])
                .filter(fragment::Column::JobId.eq(table_id).and(
                    // The dirty downstream should be the materialize fragment of the table.
                    FragmentTypeMask::intersects(FragmentTypeFlag::Mview),
                ))
                .into_tuple()
                .all(txn)
                .await?;

            for (fragment_id, stream_node) in fragments {
                {
                    let mut dirty_upstream_fragment_ids = HashSet::new();

                    let mut pb_stream_node = stream_node.to_protobuf();

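                    // Walk the stream node tree: under each `Union`, keep only inputs whose
                    // `Merge` (possibly behind a `Project`) still points at an existing upstream
                    // fragment, and record the dangling fragment ids that get dropped.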
                    visit_stream_node_cont_mut(&mut pb_stream_node, |node| {
                        if let Some(NodeBody::Union(_)) = node.node_body {
                            node.input.retain_mut(|input| match &mut input.node_body {
                                Some(NodeBody::Project(_)) => {
                                    let body = input.input.iter().exactly_one().unwrap();
                                    let Some(NodeBody::Merge(merge_node)) = &body.node_body else {
                                        unreachable!("expect merge node");
                                    };
                                    if all_fragment_ids.contains(&merge_node.upstream_fragment_id) {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                }
                                Some(NodeBody::Merge(merge_node)) => {
                                    if all_fragment_ids.contains(&merge_node.upstream_fragment_id) {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                }
                                _ => false,
                            });
                        }
                        true
                    });

                    tracing::info!(
                        "cleaning dirty table sink fragment {:?} from downstream fragment {}",
                        dirty_upstream_fragment_ids,
                        fragment_id
                    );

                    if !dirty_upstream_fragment_ids.is_empty() {
                        tracing::info!(
                            "fixing dirty stream node in downstream fragment {}",
                            fragment_id
                        );
                        Fragment::update_many()
                            .col_expr(
                                fragment::Column::StreamNode,
                                StreamNode::from(&pb_stream_node).into(),
                            )
                            .filter(fragment::Column::FragmentId.eq(fragment_id))
                            .exec(txn)
                            .await?;
                    }
                }
            }
        }

        Ok(())
    }

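    /// Returns whether at least one streaming job is recorded in the catalog.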
    pub async fn has_any_streaming_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = streaming_job::Entity::find().count(&inner.db).await?;
        Ok(count > 0)
    }

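    /// Resolve the object ids of streaming jobs in the `Creating` state that match the given
    /// `(database_id, schema_id, name)` infos, covering tables, sinks, and subscriptions.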
    pub async fn find_creating_streaming_job_ids(
        &self,
        infos: Vec<PbCreatingJobInfo>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        type JobKey = (DatabaseId, SchemaId, String);

        // Index tables are already covered here, as long as an index table is still assigned the
        // same name as its index.
        let creating_tables: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Table::find()
            .select_only()
            .columns([table::Column::TableId, table::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_sinks: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, sink::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_subscriptions: Vec<(ObjectId, String, DatabaseId, SchemaId)> =
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::Name,
                ])
                .columns([object::Column::DatabaseId, object::Column::SchemaId])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
                .into_tuple()
                .all(&inner.db)
                .await?;

        let mut job_mapping: HashMap<JobKey, ObjectId> = creating_tables
            .into_iter()
            .chain(creating_sinks)
            .chain(creating_subscriptions)
            .map(|(id, name, database_id, schema_id)| ((database_id, schema_id, name), id))
            .collect();

        Ok(infos
            .into_iter()
            .flat_map(|info| job_mapping.remove(&(info.database_id, info.schema_id, info.name)))
            .collect())
    }
}