risingwave_meta/manager/
streaming_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::catalog::TableVersionId;
18use risingwave_common::util::epoch::Epoch;
19use risingwave_common::{bail_not_implemented, current_cluster_version};
20use risingwave_meta_model::object::ObjectType;
21use risingwave_meta_model::prelude::{SourceModel, TableModel};
22use risingwave_meta_model::{SourceId, TableId, TableVersion, source, table};
23use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
24use risingwave_pb::ddl_service::TableJobType;
25use sea_orm::entity::prelude::*;
26use sea_orm::{DatabaseTransaction, QuerySelect};
27use strum::{EnumIs, EnumTryAs};
28
29use super::{
30    get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source,
31    get_referred_secret_ids_from_sink, get_referred_secret_ids_from_source,
32};
33use crate::stream::StreamFragmentGraph;
34use crate::{MetaError, MetaResult};
35
/// A streaming job together with the catalog object(s) that describe it.
///
/// This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and
/// Sink.
#[derive(Debug, Clone, EnumIs, EnumTryAs)]
pub enum StreamingJob {
    /// A materialized view, described by its table catalog.
    MaterializedView(Table),
    /// A sink catalog plus an optional `(Table, Option<PbSource>)` pair.
    // NOTE(review): the pair is presumably the target table (and its source) of a
    // sink-into-table job — confirm against the callers.
    Sink(Sink, Option<(Table, Option<PbSource>)>),
    /// A table with its optional associated source catalog and the kind of table job.
    Table(Option<PbSource>, Table, TableJobType),
    /// An index catalog and its index table.
    Index(Index, Table),
    /// A standalone source.
    Source(PbSource),
}
46
/// Payload-free tag describing which kind of [`StreamingJob`] is being handled.
///
/// Mirrors the variants of [`StreamingJob`]; only `Table` keeps extra data, namely its
/// [`TableJobType`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StreamingJobType {
    /// Tag for [`StreamingJob::MaterializedView`].
    MaterializedView,
    /// Tag for [`StreamingJob::Sink`].
    Sink,
    /// Tag for [`StreamingJob::Table`], carrying the table job type.
    Table(TableJobType),
    /// Tag for [`StreamingJob::Index`].
    Index,
    /// Tag for [`StreamingJob::Source`].
    Source,
}
55
56impl From<&StreamingJob> for StreamingJobType {
57    fn from(job: &StreamingJob) -> Self {
58        match job {
59            StreamingJob::MaterializedView(_) => StreamingJobType::MaterializedView,
60            StreamingJob::Sink(_, _) => StreamingJobType::Sink,
61            StreamingJob::Table(_, _, ty) => StreamingJobType::Table(*ty),
62            StreamingJob::Index(_, _) => StreamingJobType::Index,
63            StreamingJob::Source(_) => StreamingJobType::Source,
64        }
65    }
66}
67
#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for StreamingJobType {
    /// Test-only default.
    ///
    /// Mock services are not expected to rely on this value, so the variant chosen here is
    /// arbitrary.
    fn default() -> Self {
        Self::MaterializedView
    }
}
77
78impl StreamingJob {
79    pub fn mark_created(&mut self) {
80        let created_at_epoch = Some(Epoch::now().0);
81        let created_at_cluster_version = Some(current_cluster_version());
82        match self {
83            StreamingJob::MaterializedView(table) => {
84                table.created_at_epoch = created_at_epoch;
85                table.created_at_cluster_version = created_at_cluster_version;
86            }
87            StreamingJob::Sink(table, _) => table.created_at_epoch = created_at_epoch,
88            StreamingJob::Table(source, table, ..) => {
89                table.created_at_epoch = created_at_epoch;
90                table
91                    .created_at_cluster_version
92                    .clone_from(&created_at_cluster_version);
93                if let Some(source) = source {
94                    source.created_at_epoch = created_at_epoch;
95                    source.created_at_cluster_version = created_at_cluster_version;
96                }
97            }
98            StreamingJob::Index(index, _) => {
99                index.created_at_epoch = created_at_epoch;
100                index.created_at_cluster_version = created_at_cluster_version;
101            }
102            StreamingJob::Source(source) => {
103                source.created_at_epoch = created_at_epoch;
104                source.created_at_cluster_version = created_at_cluster_version;
105            }
106        }
107    }
108
109    pub fn mark_initialized(&mut self) {
110        let initialized_at_epoch = Some(Epoch::now().0);
111        let initialized_at_cluster_version = Some(current_cluster_version());
112        match self {
113            StreamingJob::MaterializedView(table) => {
114                table.initialized_at_epoch = initialized_at_epoch;
115                table.initialized_at_cluster_version = initialized_at_cluster_version;
116            }
117            StreamingJob::Sink(table, _) => {
118                table.initialized_at_epoch = initialized_at_epoch;
119                table.initialized_at_cluster_version = initialized_at_cluster_version;
120            }
121            StreamingJob::Table(source, table, ..) => {
122                table.initialized_at_epoch = initialized_at_epoch;
123                table
124                    .initialized_at_cluster_version
125                    .clone_from(&initialized_at_cluster_version);
126
127                if let Some(source) = source {
128                    source.initialized_at_epoch = initialized_at_epoch;
129                    source.initialized_at_cluster_version = initialized_at_cluster_version;
130                }
131            }
132            StreamingJob::Index(index, _) => {
133                index.initialized_at_epoch = initialized_at_epoch;
134                index.initialized_at_cluster_version = initialized_at_cluster_version;
135            }
136            StreamingJob::Source(source) => {
137                source.initialized_at_epoch = initialized_at_epoch;
138                source.initialized_at_cluster_version = initialized_at_cluster_version;
139            }
140        }
141    }
142}
143
144// TODO: basically we want to ensure that the `Table` persisted in the catalog is the same as the
145// one in the `Materialize` node in the actor. However, they are currently handled separately
146// and can be out of sync. Shall we directly copy the whole struct from the actor to the catalog
147// to avoid `set`ting each field separately?
148impl StreamingJob {
149    pub fn set_id(&mut self, id: u32) {
150        match self {
151            Self::MaterializedView(table) => table.id = id,
152            Self::Sink(sink, _) => sink.id = id,
153            Self::Table(_, table, ..) => table.id = id,
154            Self::Index(index, index_table) => {
155                index.id = id;
156                index.index_table_id = id;
157                index_table.id = id;
158            }
159            StreamingJob::Source(src) => {
160                src.id = id;
161            }
162        }
163    }
164
165    /// Set the vnode count of the table.
166    pub fn set_table_vnode_count(&mut self, vnode_count: usize) {
167        match self {
168            Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
169                table.maybe_vnode_count = Some(vnode_count as u32);
170            }
171            Self::Sink(_, _) | Self::Source(_) => {}
172        }
173    }
174
175    /// Add some info which is only available in fragment graph to the catalog.
176    pub fn set_info_from_graph(&mut self, graph: &StreamFragmentGraph) {
177        match self {
178            Self::Table(_, table, ..) => {
179                table.fragment_id = graph.table_fragment_id();
180                table.dml_fragment_id = graph.dml_fragment_id();
181            }
182            Self::MaterializedView(table) | Self::Index(_, table) => {
183                table.fragment_id = graph.table_fragment_id();
184            }
185            Self::Sink(_, _) | Self::Source(_) => {}
186        }
187    }
188
189    pub fn id(&self) -> u32 {
190        match self {
191            Self::MaterializedView(table) => table.id,
192            Self::Sink(sink, _) => sink.id,
193            Self::Table(_, table, ..) => table.id,
194            Self::Index(index, _) => index.id,
195            Self::Source(source) => source.id,
196        }
197    }
198
199    pub fn mv_table(&self) -> Option<u32> {
200        match self {
201            Self::MaterializedView(table) => Some(table.id),
202            Self::Sink(_, _) => None,
203            Self::Table(_, table, ..) => Some(table.id),
204            Self::Index(_, table) => Some(table.id),
205            Self::Source(_) => None,
206        }
207    }
208
209    /// Returns the reference to the [`Table`] of the job if it exists.
210    pub fn table(&self) -> Option<&Table> {
211        match self {
212            Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
213                Some(table)
214            }
215            Self::Sink(_, _) | Self::Source(_) => None,
216        }
217    }
218
219    pub fn schema_id(&self) -> u32 {
220        match self {
221            Self::MaterializedView(table) => table.schema_id,
222            Self::Sink(sink, _) => sink.schema_id,
223            Self::Table(_, table, ..) => table.schema_id,
224            Self::Index(index, _) => index.schema_id,
225            Self::Source(source) => source.schema_id,
226        }
227    }
228
229    pub fn database_id(&self) -> u32 {
230        match self {
231            Self::MaterializedView(table) => table.database_id,
232            Self::Sink(sink, _) => sink.database_id,
233            Self::Table(_, table, ..) => table.database_id,
234            Self::Index(index, _) => index.database_id,
235            Self::Source(source) => source.database_id,
236        }
237    }
238
239    pub fn name(&self) -> String {
240        match self {
241            Self::MaterializedView(table) => table.name.clone(),
242            Self::Sink(sink, _) => sink.name.clone(),
243            Self::Table(_, table, ..) => table.name.clone(),
244            Self::Index(index, _) => index.name.clone(),
245            Self::Source(source) => source.name.clone(),
246        }
247    }
248
249    pub fn owner(&self) -> u32 {
250        match self {
251            StreamingJob::MaterializedView(mv) => mv.owner,
252            StreamingJob::Sink(sink, _) => sink.owner,
253            StreamingJob::Table(_, table, ..) => table.owner,
254            StreamingJob::Index(index, _) => index.owner,
255            StreamingJob::Source(source) => source.owner,
256        }
257    }
258
259    pub fn job_type(&self) -> StreamingJobType {
260        self.into()
261    }
262
263    pub fn job_type_str(&self) -> &'static str {
264        match self {
265            StreamingJob::MaterializedView(_) => "materialized view",
266            StreamingJob::Sink(_, _) => "sink",
267            StreamingJob::Table(_, _, _) => "table",
268            StreamingJob::Index(_, _) => "index",
269            StreamingJob::Source(_) => "source",
270        }
271    }
272
273    pub fn definition(&self) -> String {
274        match self {
275            Self::MaterializedView(table) => table.definition.clone(),
276            Self::Table(_, table, ..) => table.definition.clone(),
277            Self::Index(_, table) => table.definition.clone(),
278            Self::Sink(sink, _) => sink.definition.clone(),
279            Self::Source(source) => source.definition.clone(),
280        }
281    }
282
283    pub fn object_type(&self) -> ObjectType {
284        match self {
285            Self::MaterializedView(_) => ObjectType::Table, // Note MV is special.
286            Self::Sink(_, _) => ObjectType::Sink,
287            Self::Table(_, _, _) => ObjectType::Table,
288            Self::Index(_, _) => ObjectType::Index,
289            Self::Source(_) => ObjectType::Source,
290        }
291    }
292
293    /// Returns the [`TableVersionId`] if this job is `Table`.
294    pub fn table_version_id(&self) -> Option<TableVersionId> {
295        if let Self::Table(_, table, ..) = self {
296            Some(
297                table
298                    .get_version()
299                    .expect("table must be versioned")
300                    .version,
301            )
302        } else {
303            None
304        }
305    }
306
307    pub fn create_type(&self) -> CreateType {
308        match self {
309            Self::MaterializedView(table) => {
310                table.get_create_type().unwrap_or(CreateType::Foreground)
311            }
312            Self::Sink(s, _) => s.get_create_type().unwrap_or(CreateType::Foreground),
313            _ => CreateType::Foreground,
314        }
315    }
316
317    // TODO: to be removed, pass all objects uniformly through `dependencies` field instead.
318    pub fn dependent_relations(&self) -> Vec<u32> {
319        match self {
320            StreamingJob::MaterializedView(table) => table.dependent_relations.clone(),
321            StreamingJob::Sink(_sink, _) => vec![], /* sink dependencies are now passed via `dependencies` field in `CreateSinkRequest` */
322            StreamingJob::Table(_, table, _) => table.dependent_relations.clone(), /* TODO(rc): record table dependencies via `dependencies` field */
323            StreamingJob::Index(index, index_table) => {
324                // TODO(rc): record index dependencies via `dependencies` field
325                assert_eq!(index.primary_table_id, index_table.dependent_relations[0]);
326                vec![]
327            }
328            StreamingJob::Source(_) => vec![],
329        }
330    }
331
332    pub fn dependent_connection_ids(&self) -> MetaResult<HashSet<u32>> {
333        match self {
334            StreamingJob::Source(source) => Ok(get_referred_connection_ids_from_source(source)),
335            StreamingJob::Table(source, _, _) => {
336                if let Some(source) = source {
337                    Ok(get_referred_connection_ids_from_source(source))
338                } else {
339                    Ok(HashSet::new())
340                }
341            }
342            StreamingJob::Sink(sink, _) => Ok(get_referred_connection_ids_from_sink(sink)),
343            StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
344        }
345    }
346
347    // Get the secret ids that are referenced by this job.
348    pub fn dependent_secret_ids(&self) -> MetaResult<HashSet<u32>> {
349        match self {
350            StreamingJob::Sink(sink, _) => Ok(get_referred_secret_ids_from_sink(sink)),
351            StreamingJob::Table(source, _, _) => {
352                if let Some(source) = source {
353                    get_referred_secret_ids_from_source(source)
354                } else {
355                    Ok(HashSet::new())
356                }
357            }
358            StreamingJob::Source(source) => get_referred_secret_ids_from_source(source),
359            StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
360        }
361    }
362
363    /// Verify the new version is the next version of the original version.
364    pub async fn verify_version_for_replace(&self, txn: &DatabaseTransaction) -> MetaResult<()> {
365        let id = self.id();
366
367        match self {
368            StreamingJob::Table(_source, table, _table_job_type) => {
369                let new_version = table.get_version()?.get_version();
370                let original_version: Option<TableVersion> = TableModel::find_by_id(id as TableId)
371                    .select_only()
372                    .column(table::Column::Version)
373                    .into_tuple()
374                    .one(txn)
375                    .await?
376                    .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
377                let original_version = original_version
378                    .expect("version for table should exist")
379                    .to_protobuf();
380                if new_version != original_version.version + 1 {
381                    return Err(MetaError::permission_denied("table version is stale"));
382                }
383            }
384            StreamingJob::Source(source) => {
385                let new_version = source.get_version();
386                let original_version: Option<i64> = SourceModel::find_by_id(id as SourceId)
387                    .select_only()
388                    .column(source::Column::Version)
389                    .into_tuple()
390                    .one(txn)
391                    .await?
392                    .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
393                let original_version = original_version.expect("version for source should exist");
394                if new_version != original_version as u64 + 1 {
395                    return Err(MetaError::permission_denied("source version is stale"));
396                }
397            }
398            StreamingJob::MaterializedView(_)
399            | StreamingJob::Sink(_, _)
400            | StreamingJob::Index(_, _) => {
401                bail_not_implemented!("schema change for {}", self.job_type_str())
402            }
403        }
404        Ok(())
405    }
406}