risingwave_meta/manager/
streaming_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::bail_not_implemented;
18use risingwave_common::catalog::TableVersionId;
19use risingwave_meta_model::object::ObjectType;
20use risingwave_meta_model::prelude::{SourceModel, TableModel};
21use risingwave_meta_model::{SourceId, TableId, TableVersion, source, table};
22use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
23use risingwave_pb::ddl_service::TableJobType;
24use sea_orm::entity::prelude::*;
25use sea_orm::{DatabaseTransaction, QuerySelect};
26use strum::{EnumIs, EnumTryAs};
27
28use super::{
29    get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source,
30    get_referred_secret_ids_from_sink, get_referred_secret_ids_from_source,
31};
32use crate::stream::StreamFragmentGraph;
33use crate::{MetaError, MetaResult};
34
35// This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and
36// Sink.
37#[derive(Debug, Clone, EnumIs, EnumTryAs)]
38pub enum StreamingJob {
39    MaterializedView(Table),
40    Sink(Sink, Option<(Table, Option<PbSource>)>),
41    Table(Option<PbSource>, Table, TableJobType),
42    Index(Index, Table),
43    Source(PbSource),
44}
45
46impl std::fmt::Display for StreamingJob {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        match self {
49            StreamingJob::MaterializedView(table) => {
50                write!(f, "MaterializedView: {}({})", table.name, table.id)
51            }
52            StreamingJob::Sink(sink, _) => write!(f, "Sink: {}({})", sink.name, sink.id),
53            StreamingJob::Table(_, table, _) => write!(f, "Table: {}({})", table.name, table.id),
54            StreamingJob::Index(index, _) => write!(f, "Index: {}({})", index.name, index.id),
55            StreamingJob::Source(source) => write!(f, "Source: {}({})", source.name, source.id),
56        }
57    }
58}
59
60#[derive(Debug, Clone, Copy, PartialEq)]
61pub enum StreamingJobType {
62    MaterializedView,
63    Sink,
64    Table(TableJobType),
65    Index,
66    Source,
67}
68
69impl From<&StreamingJob> for StreamingJobType {
70    fn from(job: &StreamingJob) -> Self {
71        match job {
72            StreamingJob::MaterializedView(_) => StreamingJobType::MaterializedView,
73            StreamingJob::Sink(_, _) => StreamingJobType::Sink,
74            StreamingJob::Table(_, _, ty) => StreamingJobType::Table(*ty),
75            StreamingJob::Index(_, _) => StreamingJobType::Index,
76            StreamingJob::Source(_) => StreamingJobType::Source,
77        }
78    }
79}
80
#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for StreamingJobType {
    /// Test-only default. Mock services are not expected to rely on it, so the variant chosen
    /// here is arbitrary.
    fn default() -> Self {
        Self::MaterializedView
    }
}
90
91// TODO: basically we want to ensure that the `Table` persisted in the catalog is the same as the
92// one in the `Materialize` node in the actor. However, they are currently handled separately
93// and can be out of sync. Shall we directly copy the whole struct from the actor to the catalog
94// to avoid `set`ting each field separately?
95impl StreamingJob {
96    /// Set the vnode count of the table.
97    pub fn set_table_vnode_count(&mut self, vnode_count: usize) {
98        match self {
99            Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
100                table.maybe_vnode_count = Some(vnode_count as u32);
101            }
102            Self::Sink(_, _) | Self::Source(_) => {}
103        }
104    }
105
106    /// Add some info which is only available in fragment graph to the catalog.
107    pub fn set_info_from_graph(&mut self, graph: &StreamFragmentGraph) {
108        match self {
109            Self::Table(_, table, ..) => {
110                table.fragment_id = graph.table_fragment_id();
111                table.dml_fragment_id = graph.dml_fragment_id();
112            }
113            Self::MaterializedView(table) | Self::Index(_, table) => {
114                table.fragment_id = graph.table_fragment_id();
115            }
116            Self::Sink(_, _) | Self::Source(_) => {}
117        }
118    }
119
120    pub fn id(&self) -> u32 {
121        match self {
122            Self::MaterializedView(table) => table.id,
123            Self::Sink(sink, _) => sink.id,
124            Self::Table(_, table, ..) => table.id,
125            Self::Index(index, _) => index.id,
126            Self::Source(source) => source.id,
127        }
128    }
129
130    pub fn mv_table(&self) -> Option<u32> {
131        match self {
132            Self::MaterializedView(table) => Some(table.id),
133            Self::Sink(_, _) => None,
134            Self::Table(_, table, ..) => Some(table.id),
135            Self::Index(_, table) => Some(table.id),
136            Self::Source(_) => None,
137        }
138    }
139
140    /// Returns the reference to the [`Table`] of the job if it exists.
141    pub fn table(&self) -> Option<&Table> {
142        match self {
143            Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
144                Some(table)
145            }
146            Self::Sink(_, _) | Self::Source(_) => None,
147        }
148    }
149
150    pub fn schema_id(&self) -> u32 {
151        match self {
152            Self::MaterializedView(table) => table.schema_id,
153            Self::Sink(sink, _) => sink.schema_id,
154            Self::Table(_, table, ..) => table.schema_id,
155            Self::Index(index, _) => index.schema_id,
156            Self::Source(source) => source.schema_id,
157        }
158    }
159
160    pub fn database_id(&self) -> u32 {
161        match self {
162            Self::MaterializedView(table) => table.database_id,
163            Self::Sink(sink, _) => sink.database_id,
164            Self::Table(_, table, ..) => table.database_id,
165            Self::Index(index, _) => index.database_id,
166            Self::Source(source) => source.database_id,
167        }
168    }
169
170    pub fn name(&self) -> String {
171        match self {
172            Self::MaterializedView(table) => table.name.clone(),
173            Self::Sink(sink, _) => sink.name.clone(),
174            Self::Table(_, table, ..) => table.name.clone(),
175            Self::Index(index, _) => index.name.clone(),
176            Self::Source(source) => source.name.clone(),
177        }
178    }
179
180    pub fn owner(&self) -> u32 {
181        match self {
182            StreamingJob::MaterializedView(mv) => mv.owner,
183            StreamingJob::Sink(sink, _) => sink.owner,
184            StreamingJob::Table(_, table, ..) => table.owner,
185            StreamingJob::Index(index, _) => index.owner,
186            StreamingJob::Source(source) => source.owner,
187        }
188    }
189
190    pub fn job_type(&self) -> StreamingJobType {
191        self.into()
192    }
193
194    pub fn job_type_str(&self) -> &'static str {
195        match self {
196            StreamingJob::MaterializedView(_) => "materialized view",
197            StreamingJob::Sink(_, _) => "sink",
198            StreamingJob::Table(_, _, _) => "table",
199            StreamingJob::Index(_, _) => "index",
200            StreamingJob::Source(_) => "source",
201        }
202    }
203
204    pub fn definition(&self) -> String {
205        match self {
206            Self::MaterializedView(table) => table.definition.clone(),
207            Self::Table(_, table, ..) => table.definition.clone(),
208            Self::Index(_, table) => table.definition.clone(),
209            Self::Sink(sink, _) => sink.definition.clone(),
210            Self::Source(source) => source.definition.clone(),
211        }
212    }
213
214    pub fn object_type(&self) -> ObjectType {
215        match self {
216            Self::MaterializedView(_) => ObjectType::Table, // Note MV is special.
217            Self::Sink(_, _) => ObjectType::Sink,
218            Self::Table(_, _, _) => ObjectType::Table,
219            Self::Index(_, _) => ObjectType::Index,
220            Self::Source(_) => ObjectType::Source,
221        }
222    }
223
224    /// Returns the [`TableVersionId`] if this job is `Table`.
225    pub fn table_version_id(&self) -> Option<TableVersionId> {
226        if let Self::Table(_, table, ..) = self {
227            Some(
228                table
229                    .get_version()
230                    .expect("table must be versioned")
231                    .version,
232            )
233        } else {
234            None
235        }
236    }
237
238    pub fn create_type(&self) -> CreateType {
239        match self {
240            Self::MaterializedView(table) => {
241                table.get_create_type().unwrap_or(CreateType::Foreground)
242            }
243            Self::Sink(s, _) => s.get_create_type().unwrap_or(CreateType::Foreground),
244            _ => CreateType::Foreground,
245        }
246    }
247
248    // TODO: to be removed, pass all objects uniformly through `dependencies` field instead.
249    pub fn dependent_relations(&self) -> Vec<u32> {
250        match self {
251            StreamingJob::MaterializedView(table) => table.dependent_relations.clone(),
252            StreamingJob::Sink(_sink, _) => vec![], /* sink dependencies are now passed via `dependencies` field in `CreateSinkRequest` */
253            StreamingJob::Table(_, table, _) => table.dependent_relations.clone(), /* TODO(rc): record table dependencies via `dependencies` field */
254            StreamingJob::Index(index, index_table) => {
255                // TODO(rc): record index dependencies via `dependencies` field
256                assert_eq!(index.primary_table_id, index_table.dependent_relations[0]);
257                vec![]
258            }
259            StreamingJob::Source(_) => vec![],
260        }
261    }
262
263    pub fn dependent_connection_ids(&self) -> MetaResult<HashSet<u32>> {
264        match self {
265            StreamingJob::Source(source) => Ok(get_referred_connection_ids_from_source(source)),
266            StreamingJob::Table(source, _, _) => {
267                if let Some(source) = source {
268                    Ok(get_referred_connection_ids_from_source(source))
269                } else {
270                    Ok(HashSet::new())
271                }
272            }
273            StreamingJob::Sink(sink, _) => Ok(get_referred_connection_ids_from_sink(sink)),
274            StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
275        }
276    }
277
278    // Get the secret ids that are referenced by this job.
279    pub fn dependent_secret_ids(&self) -> MetaResult<HashSet<u32>> {
280        match self {
281            StreamingJob::Sink(sink, _) => Ok(get_referred_secret_ids_from_sink(sink)),
282            StreamingJob::Table(source, _, _) => {
283                if let Some(source) = source {
284                    get_referred_secret_ids_from_source(source)
285                } else {
286                    Ok(HashSet::new())
287                }
288            }
289            StreamingJob::Source(source) => get_referred_secret_ids_from_source(source),
290            StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
291        }
292    }
293
294    /// Verify the new version is the next version of the original version.
295    pub async fn verify_version_for_replace(&self, txn: &DatabaseTransaction) -> MetaResult<()> {
296        let id = self.id();
297
298        match self {
299            StreamingJob::Table(_source, table, _table_job_type) => {
300                let new_version = table.get_version()?.get_version();
301                let original_version: Option<TableVersion> = TableModel::find_by_id(id as TableId)
302                    .select_only()
303                    .column(table::Column::Version)
304                    .into_tuple()
305                    .one(txn)
306                    .await?
307                    .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
308                let original_version = original_version
309                    .expect("version for table should exist")
310                    .to_protobuf();
311                if new_version != original_version.version + 1 {
312                    return Err(MetaError::permission_denied("table version is stale"));
313                }
314            }
315            StreamingJob::Source(source) => {
316                let new_version = source.get_version();
317                let original_version: Option<i64> = SourceModel::find_by_id(id as SourceId)
318                    .select_only()
319                    .column(source::Column::Version)
320                    .into_tuple()
321                    .one(txn)
322                    .await?
323                    .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
324                let original_version = original_version.expect("version for source should exist");
325                if new_version != original_version as u64 + 1 {
326                    return Err(MetaError::permission_denied("source version is stale"));
327                }
328            }
329            StreamingJob::MaterializedView(_)
330            | StreamingJob::Sink(_, _)
331            | StreamingJob::Index(_, _) => {
332                bail_not_implemented!("schema change for {}", self.job_type_str())
333            }
334        }
335        Ok(())
336    }
337}