1use std::collections::HashSet;
16
17use risingwave_common::catalog::TableVersionId;
18use risingwave_common::util::epoch::Epoch;
19use risingwave_common::{bail_not_implemented, current_cluster_version};
20use risingwave_meta_model::object::ObjectType;
21use risingwave_meta_model::prelude::{SourceModel, TableModel};
22use risingwave_meta_model::{SourceId, TableId, TableVersion, source, table};
23use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
24use risingwave_pb::ddl_service::TableJobType;
25use sea_orm::entity::prelude::*;
26use sea_orm::{DatabaseTransaction, QuerySelect};
27use strum::{EnumIs, EnumTryAs};
28
29use super::{
30 get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source,
31 get_referred_secret_ids_from_sink, get_referred_secret_ids_from_source,
32};
33use crate::stream::StreamFragmentGraph;
34use crate::{MetaError, MetaResult};
35
36#[derive(Debug, Clone, EnumIs, EnumTryAs)]
39pub enum StreamingJob {
40 MaterializedView(Table),
41 Sink(Sink, Option<(Table, Option<PbSource>)>),
42 Table(Option<PbSource>, Table, TableJobType),
43 Index(Index, Table),
44 Source(PbSource),
45}
46
47#[derive(Debug, Clone, Copy, PartialEq)]
48pub enum StreamingJobType {
49 MaterializedView,
50 Sink,
51 Table(TableJobType),
52 Index,
53 Source,
54}
55
56impl From<&StreamingJob> for StreamingJobType {
57 fn from(job: &StreamingJob) -> Self {
58 match job {
59 StreamingJob::MaterializedView(_) => StreamingJobType::MaterializedView,
60 StreamingJob::Sink(_, _) => StreamingJobType::Sink,
61 StreamingJob::Table(_, _, ty) => StreamingJobType::Table(*ty),
62 StreamingJob::Index(_, _) => StreamingJobType::Index,
63 StreamingJob::Source(_) => StreamingJobType::Source,
64 }
65 }
66}
67
// Only compiled for tests. The impl is written by hand (rather than derived on
// the enum) so that it stays test-only, hence the lint allowance below.
#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for StreamingJobType {
    /// Returns [`StreamingJobType::MaterializedView`].
    fn default() -> Self {
        Self::MaterializedView
    }
}
77
78impl StreamingJob {
79 pub fn mark_created(&mut self) {
80 let created_at_epoch = Some(Epoch::now().0);
81 let created_at_cluster_version = Some(current_cluster_version());
82 match self {
83 StreamingJob::MaterializedView(table) => {
84 table.created_at_epoch = created_at_epoch;
85 table.created_at_cluster_version = created_at_cluster_version;
86 }
87 StreamingJob::Sink(table, _) => table.created_at_epoch = created_at_epoch,
88 StreamingJob::Table(source, table, ..) => {
89 table.created_at_epoch = created_at_epoch;
90 table
91 .created_at_cluster_version
92 .clone_from(&created_at_cluster_version);
93 if let Some(source) = source {
94 source.created_at_epoch = created_at_epoch;
95 source.created_at_cluster_version = created_at_cluster_version;
96 }
97 }
98 StreamingJob::Index(index, _) => {
99 index.created_at_epoch = created_at_epoch;
100 index.created_at_cluster_version = created_at_cluster_version;
101 }
102 StreamingJob::Source(source) => {
103 source.created_at_epoch = created_at_epoch;
104 source.created_at_cluster_version = created_at_cluster_version;
105 }
106 }
107 }
108
109 pub fn mark_initialized(&mut self) {
110 let initialized_at_epoch = Some(Epoch::now().0);
111 let initialized_at_cluster_version = Some(current_cluster_version());
112 match self {
113 StreamingJob::MaterializedView(table) => {
114 table.initialized_at_epoch = initialized_at_epoch;
115 table.initialized_at_cluster_version = initialized_at_cluster_version;
116 }
117 StreamingJob::Sink(table, _) => {
118 table.initialized_at_epoch = initialized_at_epoch;
119 table.initialized_at_cluster_version = initialized_at_cluster_version;
120 }
121 StreamingJob::Table(source, table, ..) => {
122 table.initialized_at_epoch = initialized_at_epoch;
123 table
124 .initialized_at_cluster_version
125 .clone_from(&initialized_at_cluster_version);
126
127 if let Some(source) = source {
128 source.initialized_at_epoch = initialized_at_epoch;
129 source.initialized_at_cluster_version = initialized_at_cluster_version;
130 }
131 }
132 StreamingJob::Index(index, _) => {
133 index.initialized_at_epoch = initialized_at_epoch;
134 index.initialized_at_cluster_version = initialized_at_cluster_version;
135 }
136 StreamingJob::Source(source) => {
137 source.initialized_at_epoch = initialized_at_epoch;
138 source.initialized_at_cluster_version = initialized_at_cluster_version;
139 }
140 }
141 }
142}
143
144impl StreamingJob {
149 pub fn set_id(&mut self, id: u32) {
150 match self {
151 Self::MaterializedView(table) => table.id = id,
152 Self::Sink(sink, _) => sink.id = id,
153 Self::Table(_, table, ..) => table.id = id,
154 Self::Index(index, index_table) => {
155 index.id = id;
156 index.index_table_id = id;
157 index_table.id = id;
158 }
159 StreamingJob::Source(src) => {
160 src.id = id;
161 }
162 }
163 }
164
165 pub fn set_table_vnode_count(&mut self, vnode_count: usize) {
167 match self {
168 Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
169 table.maybe_vnode_count = Some(vnode_count as u32);
170 }
171 Self::Sink(_, _) | Self::Source(_) => {}
172 }
173 }
174
175 pub fn set_info_from_graph(&mut self, graph: &StreamFragmentGraph) {
177 match self {
178 Self::Table(_, table, ..) => {
179 table.fragment_id = graph.table_fragment_id();
180 table.dml_fragment_id = graph.dml_fragment_id();
181 }
182 Self::MaterializedView(table) | Self::Index(_, table) => {
183 table.fragment_id = graph.table_fragment_id();
184 }
185 Self::Sink(_, _) | Self::Source(_) => {}
186 }
187 }
188
189 pub fn id(&self) -> u32 {
190 match self {
191 Self::MaterializedView(table) => table.id,
192 Self::Sink(sink, _) => sink.id,
193 Self::Table(_, table, ..) => table.id,
194 Self::Index(index, _) => index.id,
195 Self::Source(source) => source.id,
196 }
197 }
198
199 pub fn mv_table(&self) -> Option<u32> {
200 match self {
201 Self::MaterializedView(table) => Some(table.id),
202 Self::Sink(_, _) => None,
203 Self::Table(_, table, ..) => Some(table.id),
204 Self::Index(_, table) => Some(table.id),
205 Self::Source(_) => None,
206 }
207 }
208
209 pub fn table(&self) -> Option<&Table> {
211 match self {
212 Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
213 Some(table)
214 }
215 Self::Sink(_, _) | Self::Source(_) => None,
216 }
217 }
218
219 pub fn schema_id(&self) -> u32 {
220 match self {
221 Self::MaterializedView(table) => table.schema_id,
222 Self::Sink(sink, _) => sink.schema_id,
223 Self::Table(_, table, ..) => table.schema_id,
224 Self::Index(index, _) => index.schema_id,
225 Self::Source(source) => source.schema_id,
226 }
227 }
228
229 pub fn database_id(&self) -> u32 {
230 match self {
231 Self::MaterializedView(table) => table.database_id,
232 Self::Sink(sink, _) => sink.database_id,
233 Self::Table(_, table, ..) => table.database_id,
234 Self::Index(index, _) => index.database_id,
235 Self::Source(source) => source.database_id,
236 }
237 }
238
239 pub fn name(&self) -> String {
240 match self {
241 Self::MaterializedView(table) => table.name.clone(),
242 Self::Sink(sink, _) => sink.name.clone(),
243 Self::Table(_, table, ..) => table.name.clone(),
244 Self::Index(index, _) => index.name.clone(),
245 Self::Source(source) => source.name.clone(),
246 }
247 }
248
249 pub fn owner(&self) -> u32 {
250 match self {
251 StreamingJob::MaterializedView(mv) => mv.owner,
252 StreamingJob::Sink(sink, _) => sink.owner,
253 StreamingJob::Table(_, table, ..) => table.owner,
254 StreamingJob::Index(index, _) => index.owner,
255 StreamingJob::Source(source) => source.owner,
256 }
257 }
258
259 pub fn job_type(&self) -> StreamingJobType {
260 self.into()
261 }
262
263 pub fn job_type_str(&self) -> &'static str {
264 match self {
265 StreamingJob::MaterializedView(_) => "materialized view",
266 StreamingJob::Sink(_, _) => "sink",
267 StreamingJob::Table(_, _, _) => "table",
268 StreamingJob::Index(_, _) => "index",
269 StreamingJob::Source(_) => "source",
270 }
271 }
272
273 pub fn definition(&self) -> String {
274 match self {
275 Self::MaterializedView(table) => table.definition.clone(),
276 Self::Table(_, table, ..) => table.definition.clone(),
277 Self::Index(_, table) => table.definition.clone(),
278 Self::Sink(sink, _) => sink.definition.clone(),
279 Self::Source(source) => source.definition.clone(),
280 }
281 }
282
283 pub fn object_type(&self) -> ObjectType {
284 match self {
285 Self::MaterializedView(_) => ObjectType::Table, Self::Sink(_, _) => ObjectType::Sink,
287 Self::Table(_, _, _) => ObjectType::Table,
288 Self::Index(_, _) => ObjectType::Index,
289 Self::Source(_) => ObjectType::Source,
290 }
291 }
292
293 pub fn table_version_id(&self) -> Option<TableVersionId> {
295 if let Self::Table(_, table, ..) = self {
296 Some(
297 table
298 .get_version()
299 .expect("table must be versioned")
300 .version,
301 )
302 } else {
303 None
304 }
305 }
306
307 pub fn create_type(&self) -> CreateType {
308 match self {
309 Self::MaterializedView(table) => {
310 table.get_create_type().unwrap_or(CreateType::Foreground)
311 }
312 Self::Sink(s, _) => s.get_create_type().unwrap_or(CreateType::Foreground),
313 _ => CreateType::Foreground,
314 }
315 }
316
317 pub fn dependent_relations(&self) -> Vec<u32> {
319 match self {
320 StreamingJob::MaterializedView(table) => table.dependent_relations.clone(),
321 StreamingJob::Sink(_sink, _) => vec![], StreamingJob::Table(_, table, _) => table.dependent_relations.clone(), StreamingJob::Index(index, index_table) => {
324 assert_eq!(index.primary_table_id, index_table.dependent_relations[0]);
326 vec![]
327 }
328 StreamingJob::Source(_) => vec![],
329 }
330 }
331
332 pub fn dependent_connection_ids(&self) -> MetaResult<HashSet<u32>> {
333 match self {
334 StreamingJob::Source(source) => Ok(get_referred_connection_ids_from_source(source)),
335 StreamingJob::Table(source, _, _) => {
336 if let Some(source) = source {
337 Ok(get_referred_connection_ids_from_source(source))
338 } else {
339 Ok(HashSet::new())
340 }
341 }
342 StreamingJob::Sink(sink, _) => Ok(get_referred_connection_ids_from_sink(sink)),
343 StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
344 }
345 }
346
347 pub fn dependent_secret_ids(&self) -> MetaResult<HashSet<u32>> {
349 match self {
350 StreamingJob::Sink(sink, _) => Ok(get_referred_secret_ids_from_sink(sink)),
351 StreamingJob::Table(source, _, _) => {
352 if let Some(source) = source {
353 get_referred_secret_ids_from_source(source)
354 } else {
355 Ok(HashSet::new())
356 }
357 }
358 StreamingJob::Source(source) => get_referred_secret_ids_from_source(source),
359 StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
360 }
361 }
362
363 pub async fn verify_version_for_replace(&self, txn: &DatabaseTransaction) -> MetaResult<()> {
365 let id = self.id();
366
367 match self {
368 StreamingJob::Table(_source, table, _table_job_type) => {
369 let new_version = table.get_version()?.get_version();
370 let original_version: Option<TableVersion> = TableModel::find_by_id(id as TableId)
371 .select_only()
372 .column(table::Column::Version)
373 .into_tuple()
374 .one(txn)
375 .await?
376 .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
377 let original_version = original_version
378 .expect("version for table should exist")
379 .to_protobuf();
380 if new_version != original_version.version + 1 {
381 return Err(MetaError::permission_denied("table version is stale"));
382 }
383 }
384 StreamingJob::Source(source) => {
385 let new_version = source.get_version();
386 let original_version: Option<i64> = SourceModel::find_by_id(id as SourceId)
387 .select_only()
388 .column(source::Column::Version)
389 .into_tuple()
390 .one(txn)
391 .await?
392 .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
393 let original_version = original_version.expect("version for source should exist");
394 if new_version != original_version as u64 + 1 {
395 return Err(MetaError::permission_denied("source version is stale"));
396 }
397 }
398 StreamingJob::MaterializedView(_)
399 | StreamingJob::Sink(_, _)
400 | StreamingJob::Index(_, _) => {
401 bail_not_implemented!("schema change for {}", self.job_type_str())
402 }
403 }
404 Ok(())
405 }
406}