risingwave_meta/manager/
streaming_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::bail_not_implemented;
18use risingwave_common::catalog::TableVersionId;
19use risingwave_common::id::JobId;
20use risingwave_meta_model::object::ObjectType;
21use risingwave_meta_model::prelude::{SourceModel, TableModel};
22use risingwave_meta_model::{SourceId, TableVersion, source, table};
23use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
24use risingwave_pb::ddl_service::TableJobType;
25use sea_orm::entity::prelude::*;
26use sea_orm::{DatabaseTransaction, QuerySelect};
27use strum::{EnumIs, EnumTryAs};
28
29use super::{
30    get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source,
31    get_referred_secret_ids_from_sink, get_referred_secret_ids_from_source,
32};
33use crate::stream::StreamFragmentGraph;
34use crate::{MetaError, MetaResult};
35
36// This enum is used to re-use code in `DdlServiceImpl` for creating MaterializedView and
37// Sink.
38#[derive(Debug, Clone, EnumIs, EnumTryAs)]
39pub enum StreamingJob {
40    MaterializedView(Table),
41    Sink(Sink),
42    Table(Option<PbSource>, Table, TableJobType),
43    Index(Index, Table),
44    Source(PbSource),
45}
46
47impl std::fmt::Display for StreamingJob {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        match self {
50            StreamingJob::MaterializedView(table) => {
51                write!(f, "MaterializedView: {}({})", table.name, table.id)
52            }
53            StreamingJob::Sink(sink) => write!(f, "Sink: {}({})", sink.name, sink.id),
54            StreamingJob::Table(_, table, _) => write!(f, "Table: {}({})", table.name, table.id),
55            StreamingJob::Index(index, _) => write!(f, "Index: {}({})", index.name, index.id),
56            StreamingJob::Source(source) => write!(f, "Source: {}({})", source.name, source.id),
57        }
58    }
59}
60
61#[derive(Debug, Clone, Copy, PartialEq)]
62pub enum StreamingJobType {
63    MaterializedView,
64    Sink,
65    Table(TableJobType),
66    Index,
67    Source,
68}
69
70impl From<&StreamingJob> for StreamingJobType {
71    fn from(job: &StreamingJob) -> Self {
72        match job {
73            StreamingJob::MaterializedView(_) => StreamingJobType::MaterializedView,
74            StreamingJob::Sink(_) => StreamingJobType::Sink,
75            StreamingJob::Table(_, _, ty) => StreamingJobType::Table(*ty),
76            StreamingJob::Index(_, _) => StreamingJobType::Index,
77            StreamingJob::Source(_) => StreamingJobType::Source,
78        }
79    }
80}
81
#[cfg(test)]
// Kept as a hand-written impl (instead of `#[derive(Default)]`) so that `Default` only exists
// for `StreamingJobType` in test builds.
#[allow(clippy::derivable_impls)]
impl Default for StreamingJobType {
    /// Returns an arbitrary variant; mock services are not expected to rely on this value.
    fn default() -> Self {
        Self::MaterializedView
    }
}
91
92// TODO: basically we want to ensure that the `Table` persisted in the catalog is the same as the
93// one in the `Materialize` node in the actor. However, they are currently handled separately
94// and can be out of sync. Shall we directly copy the whole struct from the actor to the catalog
95// to avoid `set`ting each field separately?
96impl StreamingJob {
97    /// Set the vnode count of the table.
98    pub fn set_table_vnode_count(&mut self, vnode_count: usize) {
99        match self {
100            Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
101                table.maybe_vnode_count = Some(vnode_count as u32);
102            }
103            Self::Sink(_) | Self::Source(_) => {}
104        }
105    }
106
107    /// Add some info which is only available in fragment graph to the catalog.
108    pub fn set_info_from_graph(&mut self, graph: &StreamFragmentGraph) {
109        match self {
110            Self::Table(_, table, ..) => {
111                table.fragment_id = graph.table_fragment_id();
112                table.dml_fragment_id = graph.dml_fragment_id();
113            }
114            Self::MaterializedView(table) | Self::Index(_, table) => {
115                table.fragment_id = graph.table_fragment_id();
116            }
117            Self::Sink(_) | Self::Source(_) => {}
118        }
119    }
120
121    pub fn id(&self) -> JobId {
122        match self {
123            Self::MaterializedView(table) => table.id,
124            Self::Sink(sink) => sink.id,
125            Self::Table(_, table, ..) => table.id,
126            Self::Index(index, _) => index.id,
127            Self::Source(source) => source.id,
128        }
129        .into()
130    }
131
132    pub fn mv_table(&self) -> Option<u32> {
133        match self {
134            Self::MaterializedView(table) => Some(table.id),
135            Self::Sink(_) => None,
136            Self::Table(_, table, ..) => Some(table.id),
137            Self::Index(_, table) => Some(table.id),
138            Self::Source(_) => None,
139        }
140    }
141
142    /// Returns the reference to the [`Table`] of the job if it exists.
143    pub fn table(&self) -> Option<&Table> {
144        match self {
145            Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
146                Some(table)
147            }
148            Self::Sink(_) | Self::Source(_) => None,
149        }
150    }
151
152    pub fn schema_id(&self) -> u32 {
153        match self {
154            Self::MaterializedView(table) => table.schema_id,
155            Self::Sink(sink) => sink.schema_id,
156            Self::Table(_, table, ..) => table.schema_id,
157            Self::Index(index, _) => index.schema_id,
158            Self::Source(source) => source.schema_id,
159        }
160    }
161
162    pub fn database_id(&self) -> u32 {
163        match self {
164            Self::MaterializedView(table) => table.database_id,
165            Self::Sink(sink) => sink.database_id,
166            Self::Table(_, table, ..) => table.database_id,
167            Self::Index(index, _) => index.database_id,
168            Self::Source(source) => source.database_id,
169        }
170    }
171
172    pub fn name(&self) -> String {
173        match self {
174            Self::MaterializedView(table) => table.name.clone(),
175            Self::Sink(sink) => sink.name.clone(),
176            Self::Table(_, table, ..) => table.name.clone(),
177            Self::Index(index, _) => index.name.clone(),
178            Self::Source(source) => source.name.clone(),
179        }
180    }
181
182    pub fn owner(&self) -> u32 {
183        match self {
184            StreamingJob::MaterializedView(mv) => mv.owner,
185            StreamingJob::Sink(sink) => sink.owner,
186            StreamingJob::Table(_, table, ..) => table.owner,
187            StreamingJob::Index(index, _) => index.owner,
188            StreamingJob::Source(source) => source.owner,
189        }
190    }
191
192    pub fn job_type(&self) -> StreamingJobType {
193        self.into()
194    }
195
196    pub fn job_type_str(&self) -> &'static str {
197        match self {
198            StreamingJob::MaterializedView(_) => "materialized view",
199            StreamingJob::Sink(_) => "sink",
200            StreamingJob::Table(_, _, _) => "table",
201            StreamingJob::Index(_, _) => "index",
202            StreamingJob::Source(_) => "source",
203        }
204    }
205
206    pub fn definition(&self) -> String {
207        match self {
208            Self::MaterializedView(table) => table.definition.clone(),
209            Self::Table(_, table, ..) => table.definition.clone(),
210            Self::Index(_, table) => table.definition.clone(),
211            Self::Sink(sink) => sink.definition.clone(),
212            Self::Source(source) => source.definition.clone(),
213        }
214    }
215
216    pub fn object_type(&self) -> ObjectType {
217        match self {
218            Self::MaterializedView(_) => ObjectType::Table, // Note MV is special.
219            Self::Sink(_) => ObjectType::Sink,
220            Self::Table(_, _, _) => ObjectType::Table,
221            Self::Index(_, _) => ObjectType::Index,
222            Self::Source(_) => ObjectType::Source,
223        }
224    }
225
226    /// Returns the [`TableVersionId`] if this job is `Table`.
227    pub fn table_version_id(&self) -> Option<TableVersionId> {
228        if let Self::Table(_, table, ..) = self {
229            Some(
230                table
231                    .get_version()
232                    .expect("table must be versioned")
233                    .version,
234            )
235        } else {
236            None
237        }
238    }
239
240    pub fn create_type(&self) -> CreateType {
241        match self {
242            Self::MaterializedView(table) => {
243                table.get_create_type().unwrap_or(CreateType::Foreground)
244            }
245            Self::Sink(s) => s.get_create_type().unwrap_or(CreateType::Foreground),
246            Self::Index(index, _) => {
247                CreateType::try_from(index.create_type).unwrap_or(CreateType::Foreground)
248            }
249            _ => CreateType::Foreground,
250        }
251    }
252
253    pub fn dependent_connection_ids(&self) -> MetaResult<HashSet<u32>> {
254        match self {
255            StreamingJob::Source(source) => Ok(get_referred_connection_ids_from_source(source)),
256            StreamingJob::Table(source, _, _) => {
257                if let Some(source) = source {
258                    Ok(get_referred_connection_ids_from_source(source))
259                } else {
260                    Ok(HashSet::new())
261                }
262            }
263            StreamingJob::Sink(sink) => Ok(get_referred_connection_ids_from_sink(sink)),
264            StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
265        }
266    }
267
268    // Get the secret ids that are referenced by this job.
269    pub fn dependent_secret_ids(&self) -> MetaResult<HashSet<u32>> {
270        match self {
271            StreamingJob::Sink(sink) => Ok(get_referred_secret_ids_from_sink(sink)),
272            StreamingJob::Table(source, _, _) => {
273                if let Some(source) = source {
274                    get_referred_secret_ids_from_source(source)
275                } else {
276                    Ok(HashSet::new())
277                }
278            }
279            StreamingJob::Source(source) => get_referred_secret_ids_from_source(source),
280            StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
281        }
282    }
283
284    /// Verify the new version is the next version of the original version.
285    pub async fn verify_version_for_replace(&self, txn: &DatabaseTransaction) -> MetaResult<()> {
286        let id = self.id();
287
288        match self {
289            StreamingJob::Table(_source, table, _table_job_type) => {
290                let new_version = table.get_version()?.get_version();
291                let original_version: Option<TableVersion> =
292                    TableModel::find_by_id(id.as_mv_table_id())
293                        .select_only()
294                        .column(table::Column::Version)
295                        .into_tuple()
296                        .one(txn)
297                        .await?
298                        .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
299                let original_version = original_version
300                    .expect("version for table should exist")
301                    .to_protobuf();
302                if new_version != original_version.version + 1 {
303                    return Err(MetaError::permission_denied("table version is stale"));
304                }
305            }
306            StreamingJob::Source(source) => {
307                let new_version = source.get_version();
308                let original_version: Option<i64> =
309                    SourceModel::find_by_id(id.as_raw_id() as SourceId)
310                        .select_only()
311                        .column(source::Column::Version)
312                        .into_tuple()
313                        .one(txn)
314                        .await?
315                        .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
316                let original_version = original_version.expect("version for source should exist");
317                if new_version != original_version as u64 + 1 {
318                    return Err(MetaError::permission_denied("source version is stale"));
319                }
320            }
321            StreamingJob::MaterializedView(_) => {
322                // No version check for materialized view, since `ALTER MATERIALIZED VIEW AS QUERY`
323                // is a full rewrite.
324            }
325            StreamingJob::Sink(_) => {
326                // No version check for sink, since sink fragment altering is triggered along with Table
327            }
328            StreamingJob::Index(_, _) => {
329                bail_not_implemented!("schema change for {}", self.job_type_str())
330            }
331        }
332        Ok(())
333    }
334
335    // Check whether we should notify the FE about the `CREATING` catalog of this job.
336    pub fn should_notify_creating(&self) -> bool {
337        self.is_materialized_view() || matches!(self.create_type(), CreateType::Background)
338    }
339
340    pub fn is_sink_into_table(&self) -> bool {
341        matches!(self, Self::Sink(sink) if sink.target_table.is_some())
342    }
343}