risingwave_meta/manager/
streaming_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::bail_not_implemented;
18use risingwave_common::catalog::TableVersionId;
19use risingwave_common::id::{ConnectionId, DatabaseId, JobId, SchemaId, SecretId};
20use risingwave_meta_model::object::ObjectType;
21use risingwave_meta_model::prelude::{SourceModel, TableModel};
22use risingwave_meta_model::{TableVersion, source, table};
23use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
24use risingwave_pb::ddl_service::TableJobType;
25use sea_orm::entity::prelude::*;
26use sea_orm::{DatabaseTransaction, QuerySelect};
27use strum::{EnumIs, EnumTryAs};
28
29use super::{
30    get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source,
31    get_referred_secret_ids_from_sink, get_referred_secret_ids_from_source,
32};
33use crate::stream::StreamFragmentGraph;
34use crate::{MetaError, MetaResult};
35
36// This enum is used to re-use code in `DdlServiceImpl` for creating MaterializedView and
37// Sink.
38#[derive(Debug, Clone, EnumIs, EnumTryAs)]
39pub enum StreamingJob {
40    MaterializedView(Table),
41    Sink(Sink),
42    Table(Option<PbSource>, Table, TableJobType),
43    Index(Index, Table),
44    Source(PbSource),
45}
46
47impl std::fmt::Display for StreamingJob {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        match self {
50            StreamingJob::MaterializedView(table) => {
51                write!(f, "MaterializedView: {}({})", table.name, table.id)
52            }
53            StreamingJob::Sink(sink) => write!(f, "Sink: {}({})", sink.name, sink.id),
54            StreamingJob::Table(_, table, _) => write!(f, "Table: {}({})", table.name, table.id),
55            StreamingJob::Index(index, _) => write!(f, "Index: {}({})", index.name, index.id),
56            StreamingJob::Source(source) => write!(f, "Source: {}({})", source.name, source.id),
57        }
58    }
59}
60
61#[derive(Debug, Clone, Copy, PartialEq)]
62pub enum StreamingJobType {
63    MaterializedView,
64    Sink,
65    Table(TableJobType),
66    Index,
67    Source,
68}
69
70impl From<&StreamingJob> for StreamingJobType {
71    fn from(job: &StreamingJob) -> Self {
72        match job {
73            StreamingJob::MaterializedView(_) => StreamingJobType::MaterializedView,
74            StreamingJob::Sink(_) => StreamingJobType::Sink,
75            StreamingJob::Table(_, _, ty) => StreamingJobType::Table(*ty),
76            StreamingJob::Index(_, _) => StreamingJobType::Index,
77            StreamingJob::Source(_) => StreamingJobType::Source,
78        }
79    }
80}
81
// Test-only default. Written by hand (instead of `#[derive(Default)]` + `#[default]`) so that
// the impl only exists under `cfg(test)`; hence the `derivable_impls` allowance.
#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for StreamingJobType {
    fn default() -> Self {
        // Mock services are not expected to rely on this value, so the choice of variant is
        // arbitrary.
        Self::MaterializedView
    }
}
91
92// TODO: basically we want to ensure that the `Table` persisted in the catalog is the same as the
93// one in the `Materialize` node in the actor. However, they are currently handled separately
94// and can be out of sync. Shall we directly copy the whole struct from the actor to the catalog
95// to avoid `set`ting each field separately?
96impl StreamingJob {
97    /// Set the vnode count of the table.
98    pub fn set_table_vnode_count(&mut self, vnode_count: usize) {
99        match self {
100            Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
101                table.maybe_vnode_count = Some(vnode_count as u32);
102            }
103            Self::Sink(_) | Self::Source(_) => {}
104        }
105    }
106
107    /// Add some info which is only available in fragment graph to the catalog.
108    pub fn set_info_from_graph(&mut self, graph: &StreamFragmentGraph) {
109        match self {
110            Self::Table(_, table, ..) => {
111                table.fragment_id = graph.table_fragment_id();
112                table.dml_fragment_id = graph.dml_fragment_id();
113            }
114            Self::MaterializedView(table) | Self::Index(_, table) => {
115                table.fragment_id = graph.table_fragment_id();
116            }
117            Self::Sink(_) | Self::Source(_) => {}
118        }
119    }
120
121    pub fn id(&self) -> JobId {
122        match self {
123            Self::MaterializedView(table) => table.id.as_job_id(),
124            Self::Sink(sink) => sink.id.as_job_id(),
125            Self::Table(_, table, ..) => table.id.as_job_id(),
126            Self::Index(index, _) => index.id.as_job_id(),
127            Self::Source(source) => source.id.as_share_source_job_id(),
128        }
129    }
130
131    /// Returns the reference to the [`Table`] of the job if it exists.
132    pub fn table(&self) -> Option<&Table> {
133        match self {
134            Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
135                Some(table)
136            }
137            Self::Sink(_) | Self::Source(_) => None,
138        }
139    }
140
141    pub fn schema_id(&self) -> SchemaId {
142        match self {
143            Self::MaterializedView(table) => table.schema_id,
144            Self::Sink(sink) => sink.schema_id,
145            Self::Table(_, table, ..) => table.schema_id,
146            Self::Index(index, _) => index.schema_id,
147            Self::Source(source) => source.schema_id,
148        }
149    }
150
151    pub fn database_id(&self) -> DatabaseId {
152        match self {
153            Self::MaterializedView(table) => table.database_id,
154            Self::Sink(sink) => sink.database_id,
155            Self::Table(_, table, ..) => table.database_id,
156            Self::Index(index, _) => index.database_id,
157            Self::Source(source) => source.database_id,
158        }
159    }
160
161    pub fn name(&self) -> String {
162        match self {
163            Self::MaterializedView(table) => table.name.clone(),
164            Self::Sink(sink) => sink.name.clone(),
165            Self::Table(_, table, ..) => table.name.clone(),
166            Self::Index(index, _) => index.name.clone(),
167            Self::Source(source) => source.name.clone(),
168        }
169    }
170
171    pub fn owner(&self) -> u32 {
172        match self {
173            StreamingJob::MaterializedView(mv) => mv.owner,
174            StreamingJob::Sink(sink) => sink.owner,
175            StreamingJob::Table(_, table, ..) => table.owner,
176            StreamingJob::Index(index, _) => index.owner,
177            StreamingJob::Source(source) => source.owner,
178        }
179    }
180
181    pub fn job_type(&self) -> StreamingJobType {
182        self.into()
183    }
184
185    pub fn job_type_str(&self) -> &'static str {
186        match self {
187            StreamingJob::MaterializedView(_) => "materialized view",
188            StreamingJob::Sink(_) => "sink",
189            StreamingJob::Table(_, _, _) => "table",
190            StreamingJob::Index(_, _) => "index",
191            StreamingJob::Source(_) => "source",
192        }
193    }
194
195    pub fn definition(&self) -> String {
196        match self {
197            Self::MaterializedView(table) => table.definition.clone(),
198            Self::Table(_, table, ..) => table.definition.clone(),
199            Self::Index(_, table) => table.definition.clone(),
200            Self::Sink(sink) => sink.definition.clone(),
201            Self::Source(source) => source.definition.clone(),
202        }
203    }
204
205    pub fn object_type(&self) -> ObjectType {
206        match self {
207            Self::MaterializedView(_) => ObjectType::Table, // Note MV is special.
208            Self::Sink(_) => ObjectType::Sink,
209            Self::Table(_, _, _) => ObjectType::Table,
210            Self::Index(_, _) => ObjectType::Index,
211            Self::Source(_) => ObjectType::Source,
212        }
213    }
214
215    /// Returns the [`TableVersionId`] if this job is `Table`.
216    pub fn table_version_id(&self) -> Option<TableVersionId> {
217        if let Self::Table(_, table, ..) = self {
218            Some(
219                table
220                    .get_version()
221                    .expect("table must be versioned")
222                    .version,
223            )
224        } else {
225            None
226        }
227    }
228
229    pub fn create_type(&self) -> CreateType {
230        match self {
231            Self::Table(_, table, ..) => table.get_create_type().unwrap_or(CreateType::Foreground),
232            Self::MaterializedView(table) => {
233                table.get_create_type().unwrap_or(CreateType::Foreground)
234            }
235            Self::Sink(s) => s.get_create_type().unwrap_or(CreateType::Foreground),
236            Self::Index(index, _) => {
237                CreateType::try_from(index.create_type).unwrap_or(CreateType::Foreground)
238            }
239            _ => CreateType::Foreground,
240        }
241    }
242
243    pub fn dependent_connection_ids(&self) -> MetaResult<HashSet<ConnectionId>> {
244        match self {
245            StreamingJob::Source(source) => Ok(get_referred_connection_ids_from_source(source)),
246            StreamingJob::Table(source, _, _) => {
247                if let Some(source) = source {
248                    Ok(get_referred_connection_ids_from_source(source))
249                } else {
250                    Ok(HashSet::new())
251                }
252            }
253            StreamingJob::Sink(sink) => Ok(get_referred_connection_ids_from_sink(sink)),
254            StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
255        }
256    }
257
258    // Get the secret ids that are referenced by this job.
259    pub fn dependent_secret_ids(&self) -> MetaResult<HashSet<SecretId>> {
260        match self {
261            StreamingJob::Sink(sink) => Ok(get_referred_secret_ids_from_sink(sink)),
262            StreamingJob::Table(source, _, _) => {
263                if let Some(source) = source {
264                    get_referred_secret_ids_from_source(source)
265                } else {
266                    Ok(HashSet::new())
267                }
268            }
269            StreamingJob::Source(source) => get_referred_secret_ids_from_source(source),
270            StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
271        }
272    }
273
274    /// Verify the new version is the next version of the original version.
275    pub async fn verify_version_for_replace(&self, txn: &DatabaseTransaction) -> MetaResult<()> {
276        let id = self.id();
277
278        match self {
279            StreamingJob::Table(_source, table, _table_job_type) => {
280                let new_version = table.get_version()?.get_version();
281                let original_version: Option<TableVersion> =
282                    TableModel::find_by_id(id.as_mv_table_id())
283                        .select_only()
284                        .column(table::Column::Version)
285                        .into_tuple()
286                        .one(txn)
287                        .await?
288                        .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
289                let original_version = original_version
290                    .expect("version for table should exist")
291                    .to_protobuf();
292                if new_version != original_version.version + 1 {
293                    return Err(MetaError::permission_denied("table version is stale"));
294                }
295            }
296            StreamingJob::Source(source) => {
297                let new_version = source.get_version();
298                let original_version: Option<i64> =
299                    SourceModel::find_by_id(id.as_shared_source_id())
300                        .select_only()
301                        .column(source::Column::Version)
302                        .into_tuple()
303                        .one(txn)
304                        .await?
305                        .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
306                let original_version = original_version.expect("version for source should exist");
307                if new_version != original_version as u64 + 1 {
308                    return Err(MetaError::permission_denied("source version is stale"));
309                }
310            }
311            StreamingJob::MaterializedView(_) => {
312                // No version check for materialized view, since `ALTER MATERIALIZED VIEW AS QUERY`
313                // is a full rewrite.
314            }
315            StreamingJob::Sink(_) => {
316                // No version check for sink, since sink fragment altering is triggered along with Table
317            }
318            StreamingJob::Index(_, _) => {
319                bail_not_implemented!("schema change for {}", self.job_type_str())
320            }
321        }
322        Ok(())
323    }
324
325    // Check whether we should notify the FE about the `CREATING` catalog of this job.
326    pub fn should_notify_creating(&self) -> bool {
327        self.is_materialized_view() || matches!(self.create_type(), CreateType::Background)
328    }
329
330    pub fn is_sink_into_table(&self) -> bool {
331        matches!(self, Self::Sink(sink) if sink.target_table.is_some())
332    }
333}