1use std::collections::HashSet;
16
17use risingwave_common::bail_not_implemented;
18use risingwave_common::catalog::TableVersionId;
19use risingwave_meta_model::object::ObjectType;
20use risingwave_meta_model::prelude::{SourceModel, TableModel};
21use risingwave_meta_model::{SourceId, TableId, TableVersion, source, table};
22use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
23use risingwave_pb::ddl_service::TableJobType;
24use sea_orm::entity::prelude::*;
25use sea_orm::{DatabaseTransaction, QuerySelect};
26use strum::{EnumIs, EnumTryAs};
27
28use super::{
29 get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source,
30 get_referred_secret_ids_from_sink, get_referred_secret_ids_from_source,
31};
32use crate::stream::StreamFragmentGraph;
33use crate::{MetaError, MetaResult};
34
35#[derive(Debug, Clone, EnumIs, EnumTryAs)]
38pub enum StreamingJob {
39 MaterializedView(Table),
40 Sink(Sink, Option<(Table, Option<PbSource>)>),
41 Table(Option<PbSource>, Table, TableJobType),
42 Index(Index, Table),
43 Source(PbSource),
44}
45
46impl std::fmt::Display for StreamingJob {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 match self {
49 StreamingJob::MaterializedView(table) => {
50 write!(f, "MaterializedView: {}({})", table.name, table.id)
51 }
52 StreamingJob::Sink(sink, _) => write!(f, "Sink: {}({})", sink.name, sink.id),
53 StreamingJob::Table(_, table, _) => write!(f, "Table: {}({})", table.name, table.id),
54 StreamingJob::Index(index, _) => write!(f, "Index: {}({})", index.name, index.id),
55 StreamingJob::Source(source) => write!(f, "Source: {}({})", source.name, source.id),
56 }
57 }
58}
59
60#[derive(Debug, Clone, Copy, PartialEq)]
61pub enum StreamingJobType {
62 MaterializedView,
63 Sink,
64 Table(TableJobType),
65 Index,
66 Source,
67}
68
69impl From<&StreamingJob> for StreamingJobType {
70 fn from(job: &StreamingJob) -> Self {
71 match job {
72 StreamingJob::MaterializedView(_) => StreamingJobType::MaterializedView,
73 StreamingJob::Sink(_, _) => StreamingJobType::Sink,
74 StreamingJob::Table(_, _, ty) => StreamingJobType::Table(*ty),
75 StreamingJob::Index(_, _) => StreamingJobType::Index,
76 StreamingJob::Source(_) => StreamingJobType::Source,
77 }
78 }
79}
80
81#[cfg(test)]
82#[allow(clippy::derivable_impls)]
83impl Default for StreamingJobType {
84 fn default() -> Self {
85 StreamingJobType::MaterializedView
88 }
89}
90
91impl StreamingJob {
96 pub fn set_table_vnode_count(&mut self, vnode_count: usize) {
98 match self {
99 Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
100 table.maybe_vnode_count = Some(vnode_count as u32);
101 }
102 Self::Sink(_, _) | Self::Source(_) => {}
103 }
104 }
105
106 pub fn set_info_from_graph(&mut self, graph: &StreamFragmentGraph) {
108 match self {
109 Self::Table(_, table, ..) => {
110 table.fragment_id = graph.table_fragment_id();
111 table.dml_fragment_id = graph.dml_fragment_id();
112 }
113 Self::MaterializedView(table) | Self::Index(_, table) => {
114 table.fragment_id = graph.table_fragment_id();
115 }
116 Self::Sink(_, _) | Self::Source(_) => {}
117 }
118 }
119
120 pub fn id(&self) -> u32 {
121 match self {
122 Self::MaterializedView(table) => table.id,
123 Self::Sink(sink, _) => sink.id,
124 Self::Table(_, table, ..) => table.id,
125 Self::Index(index, _) => index.id,
126 Self::Source(source) => source.id,
127 }
128 }
129
130 pub fn mv_table(&self) -> Option<u32> {
131 match self {
132 Self::MaterializedView(table) => Some(table.id),
133 Self::Sink(_, _) => None,
134 Self::Table(_, table, ..) => Some(table.id),
135 Self::Index(_, table) => Some(table.id),
136 Self::Source(_) => None,
137 }
138 }
139
140 pub fn table(&self) -> Option<&Table> {
142 match self {
143 Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
144 Some(table)
145 }
146 Self::Sink(_, _) | Self::Source(_) => None,
147 }
148 }
149
150 pub fn schema_id(&self) -> u32 {
151 match self {
152 Self::MaterializedView(table) => table.schema_id,
153 Self::Sink(sink, _) => sink.schema_id,
154 Self::Table(_, table, ..) => table.schema_id,
155 Self::Index(index, _) => index.schema_id,
156 Self::Source(source) => source.schema_id,
157 }
158 }
159
160 pub fn database_id(&self) -> u32 {
161 match self {
162 Self::MaterializedView(table) => table.database_id,
163 Self::Sink(sink, _) => sink.database_id,
164 Self::Table(_, table, ..) => table.database_id,
165 Self::Index(index, _) => index.database_id,
166 Self::Source(source) => source.database_id,
167 }
168 }
169
170 pub fn name(&self) -> String {
171 match self {
172 Self::MaterializedView(table) => table.name.clone(),
173 Self::Sink(sink, _) => sink.name.clone(),
174 Self::Table(_, table, ..) => table.name.clone(),
175 Self::Index(index, _) => index.name.clone(),
176 Self::Source(source) => source.name.clone(),
177 }
178 }
179
180 pub fn owner(&self) -> u32 {
181 match self {
182 StreamingJob::MaterializedView(mv) => mv.owner,
183 StreamingJob::Sink(sink, _) => sink.owner,
184 StreamingJob::Table(_, table, ..) => table.owner,
185 StreamingJob::Index(index, _) => index.owner,
186 StreamingJob::Source(source) => source.owner,
187 }
188 }
189
190 pub fn job_type(&self) -> StreamingJobType {
191 self.into()
192 }
193
194 pub fn job_type_str(&self) -> &'static str {
195 match self {
196 StreamingJob::MaterializedView(_) => "materialized view",
197 StreamingJob::Sink(_, _) => "sink",
198 StreamingJob::Table(_, _, _) => "table",
199 StreamingJob::Index(_, _) => "index",
200 StreamingJob::Source(_) => "source",
201 }
202 }
203
204 pub fn definition(&self) -> String {
205 match self {
206 Self::MaterializedView(table) => table.definition.clone(),
207 Self::Table(_, table, ..) => table.definition.clone(),
208 Self::Index(_, table) => table.definition.clone(),
209 Self::Sink(sink, _) => sink.definition.clone(),
210 Self::Source(source) => source.definition.clone(),
211 }
212 }
213
214 pub fn object_type(&self) -> ObjectType {
215 match self {
216 Self::MaterializedView(_) => ObjectType::Table, Self::Sink(_, _) => ObjectType::Sink,
218 Self::Table(_, _, _) => ObjectType::Table,
219 Self::Index(_, _) => ObjectType::Index,
220 Self::Source(_) => ObjectType::Source,
221 }
222 }
223
224 pub fn table_version_id(&self) -> Option<TableVersionId> {
226 if let Self::Table(_, table, ..) = self {
227 Some(
228 table
229 .get_version()
230 .expect("table must be versioned")
231 .version,
232 )
233 } else {
234 None
235 }
236 }
237
238 pub fn create_type(&self) -> CreateType {
239 match self {
240 Self::MaterializedView(table) => {
241 table.get_create_type().unwrap_or(CreateType::Foreground)
242 }
243 Self::Sink(s, _) => s.get_create_type().unwrap_or(CreateType::Foreground),
244 _ => CreateType::Foreground,
245 }
246 }
247
248 pub fn dependent_relations(&self) -> Vec<u32> {
250 match self {
251 StreamingJob::MaterializedView(table) => table.dependent_relations.clone(),
252 StreamingJob::Sink(_sink, _) => vec![], StreamingJob::Table(_, table, _) => table.dependent_relations.clone(), StreamingJob::Index(index, index_table) => {
255 assert_eq!(index.primary_table_id, index_table.dependent_relations[0]);
257 vec![]
258 }
259 StreamingJob::Source(_) => vec![],
260 }
261 }
262
263 pub fn dependent_connection_ids(&self) -> MetaResult<HashSet<u32>> {
264 match self {
265 StreamingJob::Source(source) => Ok(get_referred_connection_ids_from_source(source)),
266 StreamingJob::Table(source, _, _) => {
267 if let Some(source) = source {
268 Ok(get_referred_connection_ids_from_source(source))
269 } else {
270 Ok(HashSet::new())
271 }
272 }
273 StreamingJob::Sink(sink, _) => Ok(get_referred_connection_ids_from_sink(sink)),
274 StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
275 }
276 }
277
278 pub fn dependent_secret_ids(&self) -> MetaResult<HashSet<u32>> {
280 match self {
281 StreamingJob::Sink(sink, _) => Ok(get_referred_secret_ids_from_sink(sink)),
282 StreamingJob::Table(source, _, _) => {
283 if let Some(source) = source {
284 get_referred_secret_ids_from_source(source)
285 } else {
286 Ok(HashSet::new())
287 }
288 }
289 StreamingJob::Source(source) => get_referred_secret_ids_from_source(source),
290 StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
291 }
292 }
293
294 pub async fn verify_version_for_replace(&self, txn: &DatabaseTransaction) -> MetaResult<()> {
296 let id = self.id();
297
298 match self {
299 StreamingJob::Table(_source, table, _table_job_type) => {
300 let new_version = table.get_version()?.get_version();
301 let original_version: Option<TableVersion> = TableModel::find_by_id(id as TableId)
302 .select_only()
303 .column(table::Column::Version)
304 .into_tuple()
305 .one(txn)
306 .await?
307 .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
308 let original_version = original_version
309 .expect("version for table should exist")
310 .to_protobuf();
311 if new_version != original_version.version + 1 {
312 return Err(MetaError::permission_denied("table version is stale"));
313 }
314 }
315 StreamingJob::Source(source) => {
316 let new_version = source.get_version();
317 let original_version: Option<i64> = SourceModel::find_by_id(id as SourceId)
318 .select_only()
319 .column(source::Column::Version)
320 .into_tuple()
321 .one(txn)
322 .await?
323 .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
324 let original_version = original_version.expect("version for source should exist");
325 if new_version != original_version as u64 + 1 {
326 return Err(MetaError::permission_denied("source version is stale"));
327 }
328 }
329 StreamingJob::MaterializedView(_)
330 | StreamingJob::Sink(_, _)
331 | StreamingJob::Index(_, _) => {
332 bail_not_implemented!("schema change for {}", self.job_type_str())
333 }
334 }
335 Ok(())
336 }
337}