risingwave_meta/controller/catalog/util.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::FragmentTypeMask;

use super::*;
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::load_streaming_jobs_by_ids;

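/// Update the given `column` of the `object` table for all internal (state) tables belonging to
/// the streaming job `object_id`, and push the refreshed table catalogs into `objects_to_notify`
/// so that they are included in the subsequent catalog notification.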
pub(crate) async fn update_internal_tables(
    txn: &DatabaseTransaction,
    object_id: ObjectId,
    column: object::Column,
    new_value: impl Into<Value>,
    objects_to_notify: &mut Vec<PbObjectInfo>,
) -> MetaResult<()> {
    let internal_tables = get_internal_tables_by_id(object_id.as_job_id(), txn).await?;

    if !internal_tables.is_empty() {
        Object::update_many()
            .col_expr(column, SimpleExpr::Value(new_value.into()))
            .filter(object::Column::Oid.is_in(internal_tables.clone()))
            .exec(txn)
            .await?;

        let table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::TableId.is_in(internal_tables))
            .all(txn)
            .await?;
        let streaming_jobs =
            load_streaming_jobs_by_ids(txn, table_objs.iter().map(|(table, _)| table.job_id()))
                .await?;
        for (table, table_obj) in table_objs {
            let job_id = table.job_id();
            let streaming_job = streaming_jobs.get(&job_id).cloned();
            objects_to_notify.push(PbObjectInfo::Table(
                ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
            ));
        }
    }
    Ok(())
}

impl CatalogController {
    pub(crate) async fn init(&self) -> MetaResult<()> {
        self.table_catalog_cdc_table_id_update().await?;
        Ok(())
    }

    /// Fill in the `cdc_table_id` field for tables that have an empty `cdc_table_id` and a parent
    /// `Source` job.
    /// NOTE: we assume that a table with a parent `Source` job is a CDC table.
    pub(crate) async fn table_catalog_cdc_table_id_update(&self) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Select tables whose `cdc_table_id` is empty and which have a parent `Source` job.
        let table_and_source_id: Vec<(TableId, String, SourceId)> = Table::find()
            .join(JoinType::InnerJoin, table::Relation::ObjectDependency.def())
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Source.def(),
            )
            .select_only()
            .columns([table::Column::TableId, table::Column::Definition])
            .columns([source::Column::SourceId])
            .filter(
                table::Column::TableType.eq(TableType::Table).and(
                    table::Column::CdcTableId
                        .is_null()
                        .or(table::Column::CdcTableId.eq("")),
                ),
            )
            .into_tuple()
            .all(&txn)
            .await?;

        // Return early if the result set is empty.
        if table_and_source_id.is_empty() {
            return Ok(());
        }

        info!(table_and_source_id = ?table_and_source_id, "cdc table with empty cdc_table_id");

        let mut cdc_table_ids = HashMap::new();
        for (table_id, definition, source_id) in table_and_source_id {
            match extract_external_table_name_from_definition(&definition) {
                None => {
                    tracing::warn!(
                        %table_id,
                        definition,
                        "failed to extract cdc table name from table definition.",
                    )
                }
                Some(external_table_name) => {
                    cdc_table_ids.insert(
                        table_id,
                        build_cdc_table_id(source_id, &external_table_name),
                    );
                }
            }
        }

        for (table_id, cdc_table_id) in cdc_table_ids {
            Table::update(table::ActiveModel {
                table_id: Set(table_id as _),
                cdc_table_id: Set(Some(cdc_table_id)),
                ..Default::default()
            })
            .exec(&txn)
            .await?;
        }
        txn.commit().await?;
        Ok(())
    }

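    /// Record the dirty streaming jobs (tables/indexes, sources and sinks) cleaned up during
    /// recovery as `DirtyStreamJobClear` entries in the event log.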
    pub(crate) async fn log_cleaned_dirty_jobs(
        &self,
        dirty_objs: &[PartialObject],
        txn: &DatabaseTransaction,
    ) -> MetaResult<()> {
        // Record cleaned streaming jobs in event logs.
        let mut dirty_table_ids = vec![];
        let mut dirty_source_ids = vec![];
        let mut dirty_sink_ids = vec![];
        for dirty_job_obj in dirty_objs {
            let job_id = dirty_job_obj.oid;
            let job_type = dirty_job_obj.obj_type;
            match job_type {
                ObjectType::Table | ObjectType::Index => dirty_table_ids.push(job_id),
                ObjectType::Source => dirty_source_ids.push(job_id),
                ObjectType::Sink => dirty_sink_ids.push(job_id),
                _ => unreachable!("unexpected streaming job type"),
            }
        }

        let mut event_logs = vec![];
        if !dirty_table_ids.is_empty() {
            let table_info: Vec<(TableId, String, String)> = Table::find()
                .select_only()
                .columns([
                    table::Column::TableId,
                    table::Column::Name,
                    table::Column::Definition,
                ])
                .filter(table::Column::TableId.is_in(dirty_table_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (table_id, name, definition) in table_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: table_id.as_job_id(),
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_source_ids.is_empty() {
            let source_info: Vec<(SourceId, String, String)> = Source::find()
                .select_only()
                .columns([
                    source::Column::SourceId,
                    source::Column::Name,
                    source::Column::Definition,
                ])
                .filter(source::Column::SourceId.is_in(dirty_source_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (source_id, name, definition) in source_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: source_id.as_share_source_job_id(),
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        if !dirty_sink_ids.is_empty() {
            let sink_info: Vec<(SinkId, String, String)> = Sink::find()
                .select_only()
                .columns([
                    sink::Column::SinkId,
                    sink::Column::Name,
                    sink::Column::Definition,
                ])
                .filter(sink::Column::SinkId.is_in(dirty_sink_ids))
                .into_tuple()
                .all(txn)
                .await?;
            for (sink_id, name, definition) in sink_info {
                let event = risingwave_pb::meta::event_log::EventDirtyStreamJobClear {
                    id: sink_id.as_job_id(),
                    name,
                    definition,
                    error: "clear during recovery".to_owned(),
                };
                event_logs.push(risingwave_pb::meta::event_log::Event::DirtyStreamJobClear(
                    event,
                ));
            }
        }
        self.env.event_log_manager_ref().add_event_logs(event_logs);
        Ok(())
    }

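    /// For every table with incoming sinks (sink-into-table), prune `Union` inputs in the table's
    /// materialize fragment whose upstream (sink) fragment no longer exists, so that recovery does
    /// not observe dangling upstream fragment ids.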
    pub(crate) async fn clean_dirty_sink_downstreams(txn: &DatabaseTransaction) -> MetaResult<()> {
        // Clean up:
        // - incoming sinks from `table`
        // - upstream fragment ids from `fragment`
        // - stream nodes from `fragment`
        // - upstream actor ids from `actor`

        // The cleanup of fragments and their `StreamNode`s is kept for compatibility with data
        // written by older versions. With the current sink-into-table implementation there is no
        // need to restore the contents of `StreamNode`, because the `UpstreamSinkUnion` operator
        // does not persist any data and is instead refilled during recovery.

        let all_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .into_tuple()
            .all(txn)
            .await?;

        let all_fragment_ids: HashSet<_> = all_fragment_ids.into_iter().collect();

        let all_sink_into_tables: Vec<Option<TableId>> = Sink::find()
            .select_only()
            .column(sink::Column::TargetTable)
            .filter(sink::Column::TargetTable.is_not_null())
            .into_tuple()
            .all(txn)
            .await?;

        let mut table_with_incoming_sinks: HashSet<TableId> = HashSet::new();
        for target_table_id in all_sink_into_tables {
            table_with_incoming_sinks.insert(target_table_id.expect("filter by non null"));
        }

        // Nothing to update, return early.
        if table_with_incoming_sinks.is_empty() {
            return Ok(());
        }

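        // For each target table, rewrite the stream node of its materialize fragment, dropping
        // union inputs that merge from fragments which no longer exist.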
        for table_id in table_with_incoming_sinks {
            tracing::info!("cleaning dirty table sink downstream table {}", table_id);

            let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
                .select_only()
                .columns(vec![
                    fragment::Column::FragmentId,
                    fragment::Column::StreamNode,
                ])
                .filter(fragment::Column::JobId.eq(table_id).and(
                    // The dirty downstream should be the materialize fragment of the table.
                    FragmentTypeMask::intersects(FragmentTypeFlag::Mview),
                ))
                .into_tuple()
                .all(txn)
                .await?;

            for (fragment_id, stream_node) in fragments {
                {
                    let mut dirty_upstream_fragment_ids = HashSet::new();

                    let mut pb_stream_node = stream_node.to_protobuf();

                    visit_stream_node_cont_mut(&mut pb_stream_node, |node| {
                        if let Some(NodeBody::Union(_)) = node.node_body {
                            node.input.retain_mut(|input| match &mut input.node_body {
                                Some(NodeBody::Project(_)) => {
                                    let body = input.input.iter().exactly_one().unwrap();
                                    let Some(NodeBody::Merge(merge_node)) = &body.node_body else {
                                        unreachable!("expect merge node");
                                    };
                                    if all_fragment_ids.contains(&(merge_node.upstream_fragment_id))
                                    {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                }
                                Some(NodeBody::Merge(merge_node)) => {
                                    if all_fragment_ids.contains(&(merge_node.upstream_fragment_id))
                                    {
                                        true
                                    } else {
                                        dirty_upstream_fragment_ids
                                            .insert(merge_node.upstream_fragment_id);
                                        false
                                    }
                                }
                                _ => false,
                            });
                        }
                        true
                    });

                    tracing::info!(
                        "cleaning dirty table sink fragment {:?} from downstream fragment {}",
                        dirty_upstream_fragment_ids,
                        fragment_id
                    );

                    if !dirty_upstream_fragment_ids.is_empty() {
                        tracing::info!(
                            "fixing dirty stream node in downstream fragment {}",
                            fragment_id
                        );
                        Fragment::update_many()
                            .col_expr(
                                fragment::Column::StreamNode,
                                StreamNode::from(&pb_stream_node).into(),
                            )
                            .filter(fragment::Column::FragmentId.eq(fragment_id))
                            .exec(txn)
                            .await?;
                    }
                }
            }
        }

        Ok(())
    }

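    /// Returns `true` if there is at least one streaming job recorded in the catalog.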
    pub async fn has_any_streaming_jobs(&self) -> MetaResult<bool> {
        let inner = self.inner.read().await;
        let count = streaming_job::Entity::find().count(&inner.db).await?;
        Ok(count > 0)
    }

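    /// Resolve the object ids of streaming jobs (tables, sinks and subscriptions) that are still
    /// in the `Creating` status and match one of the given `(database_id, schema_id, name)`
    /// triples.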
    pub async fn find_creating_streaming_job_ids(
        &self,
        infos: Vec<PbCreatingJobInfo>,
    ) -> MetaResult<Vec<ObjectId>> {
        let inner = self.inner.read().await;

        type JobKey = (DatabaseId, SchemaId, String);

        // Index tables are already covered here, as long as an index table keeps the same name as
        // its index.
        let creating_tables: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Table::find()
            .select_only()
            .columns([table::Column::TableId, table::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, table::Relation::Object1.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_sinks: Vec<(ObjectId, String, DatabaseId, SchemaId)> = Sink::find()
            .select_only()
            .columns([sink::Column::SinkId, sink::Column::Name])
            .columns([object::Column::DatabaseId, object::Column::SchemaId])
            .join(JoinType::InnerJoin, sink::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
            .into_tuple()
            .all(&inner.db)
            .await?;
        let creating_subscriptions: Vec<(ObjectId, String, DatabaseId, SchemaId)> =
            Subscription::find()
                .select_only()
                .columns([
                    subscription::Column::SubscriptionId,
                    subscription::Column::Name,
                ])
                .columns([object::Column::DatabaseId, object::Column::SchemaId])
                .join(JoinType::InnerJoin, subscription::Relation::Object.def())
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(streaming_job::Column::JobStatus.eq(JobStatus::Creating))
                .into_tuple()
                .all(&inner.db)
                .await?;

        let mut job_mapping: HashMap<JobKey, ObjectId> = creating_tables
            .into_iter()
            .chain(creating_sinks.into_iter())
            .chain(creating_subscriptions.into_iter())
            .map(|(id, name, database_id, schema_id)| ((database_id, schema_id, name), id))
            .collect();

        Ok(infos
            .into_iter()
            .flat_map(|info| job_mapping.remove(&(info.database_id, info.schema_id, info.name)))
            .collect())
    }
407}