1use std::collections::HashSet;
16
17use risingwave_common::bail_not_implemented;
18use risingwave_common::catalog::TableVersionId;
19use risingwave_common::id::{ConnectionId, DatabaseId, JobId, SchemaId, SecretId};
20use risingwave_meta_model::object::ObjectType;
21use risingwave_meta_model::prelude::{SourceModel, TableModel};
22use risingwave_meta_model::{TableVersion, source, table};
23use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
24use risingwave_pb::ddl_service::TableJobType;
25use sea_orm::entity::prelude::*;
26use sea_orm::{DatabaseTransaction, QuerySelect};
27use strum::{EnumIs, EnumTryAs};
28
29use super::{
30 get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source,
31 get_referred_secret_ids_from_sink, get_referred_secret_ids_from_source,
32};
33use crate::stream::StreamFragmentGraph;
34use crate::{MetaError, MetaResult};
35
/// A streaming job together with the catalog object(s) that describe it.
///
/// Each variant carries the protobuf catalog entries of one kind of job.
/// `EnumIs` derives the `is_*` predicates (e.g. `is_materialized_view`) and
/// `EnumTryAs` derives the matching `try_as_*` accessors.
#[derive(Debug, Clone, EnumIs, EnumTryAs)]
pub enum StreamingJob {
    /// A materialized view, described by its table catalog.
    MaterializedView(Table),
    /// A sink, described by its sink catalog.
    Sink(Sink),
    /// A table: an optional source catalog, the table catalog itself and the
    /// kind of table job.
    // NOTE(review): the optional source is presumably the connector source
    // attached to the table — confirm against the callers.
    Table(Option<PbSource>, Table, TableJobType),
    /// An index: the index catalog plus a table catalog.
    Index(Index, Table),
    /// A source, described by its source catalog.
    Source(PbSource),
}
46
47impl std::fmt::Display for StreamingJob {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 match self {
50 StreamingJob::MaterializedView(table) => {
51 write!(f, "MaterializedView: {}({})", table.name, table.id)
52 }
53 StreamingJob::Sink(sink) => write!(f, "Sink: {}({})", sink.name, sink.id),
54 StreamingJob::Table(_, table, _) => write!(f, "Table: {}({})", table.name, table.id),
55 StreamingJob::Index(index, _) => write!(f, "Index: {}({})", index.name, index.id),
56 StreamingJob::Source(source) => write!(f, "Source: {}({})", source.name, source.id),
57 }
58 }
59}
60
/// The kind of a [`StreamingJob`], without the catalog payload.
///
/// Variants mirror those of [`StreamingJob`] one to one; only `Table` keeps
/// extra information, namely its [`TableJobType`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StreamingJobType {
    /// Kind of [`StreamingJob::MaterializedView`].
    MaterializedView,
    /// Kind of [`StreamingJob::Sink`].
    Sink,
    /// Kind of [`StreamingJob::Table`], carrying the table job type.
    Table(TableJobType),
    /// Kind of [`StreamingJob::Index`].
    Index,
    /// Kind of [`StreamingJob::Source`].
    Source,
}
69
70impl From<&StreamingJob> for StreamingJobType {
71 fn from(job: &StreamingJob) -> Self {
72 match job {
73 StreamingJob::MaterializedView(_) => StreamingJobType::MaterializedView,
74 StreamingJob::Sink(_) => StreamingJobType::Sink,
75 StreamingJob::Table(_, _, ty) => StreamingJobType::Table(*ty),
76 StreamingJob::Index(_, _) => StreamingJobType::Index,
77 StreamingJob::Source(_) => StreamingJobType::Source,
78 }
79 }
80}
81
/// Test-only default so that test fixtures can build a [`StreamingJobType`]
/// without choosing a kind explicitly.
// NOTE(review): the chosen variant looks like a placeholder with no special
// meaning — confirm before relying on it in tests.
#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for StreamingJobType {
    fn default() -> Self {
        Self::MaterializedView
    }
}
91
92impl StreamingJob {
97 pub fn set_table_vnode_count(&mut self, vnode_count: usize) {
99 match self {
100 Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
101 table.maybe_vnode_count = Some(vnode_count as u32);
102 }
103 Self::Sink(_) | Self::Source(_) => {}
104 }
105 }
106
107 pub fn set_info_from_graph(&mut self, graph: &StreamFragmentGraph) {
109 match self {
110 Self::Table(_, table, ..) => {
111 table.fragment_id = graph.table_fragment_id();
112 table.dml_fragment_id = graph.dml_fragment_id();
113 }
114 Self::MaterializedView(table) | Self::Index(_, table) => {
115 table.fragment_id = graph.table_fragment_id();
116 }
117 Self::Sink(_) | Self::Source(_) => {}
118 }
119 }
120
121 pub fn id(&self) -> JobId {
122 match self {
123 Self::MaterializedView(table) => table.id.as_job_id(),
124 Self::Sink(sink) => sink.id.as_job_id(),
125 Self::Table(_, table, ..) => table.id.as_job_id(),
126 Self::Index(index, _) => index.id.as_job_id(),
127 Self::Source(source) => source.id.as_share_source_job_id(),
128 }
129 }
130
131 pub fn table(&self) -> Option<&Table> {
133 match self {
134 Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
135 Some(table)
136 }
137 Self::Sink(_) | Self::Source(_) => None,
138 }
139 }
140
141 pub fn schema_id(&self) -> SchemaId {
142 match self {
143 Self::MaterializedView(table) => table.schema_id,
144 Self::Sink(sink) => sink.schema_id,
145 Self::Table(_, table, ..) => table.schema_id,
146 Self::Index(index, _) => index.schema_id,
147 Self::Source(source) => source.schema_id,
148 }
149 }
150
151 pub fn database_id(&self) -> DatabaseId {
152 match self {
153 Self::MaterializedView(table) => table.database_id,
154 Self::Sink(sink) => sink.database_id,
155 Self::Table(_, table, ..) => table.database_id,
156 Self::Index(index, _) => index.database_id,
157 Self::Source(source) => source.database_id,
158 }
159 }
160
161 pub fn name(&self) -> String {
162 match self {
163 Self::MaterializedView(table) => table.name.clone(),
164 Self::Sink(sink) => sink.name.clone(),
165 Self::Table(_, table, ..) => table.name.clone(),
166 Self::Index(index, _) => index.name.clone(),
167 Self::Source(source) => source.name.clone(),
168 }
169 }
170
171 pub fn owner(&self) -> u32 {
172 match self {
173 StreamingJob::MaterializedView(mv) => mv.owner,
174 StreamingJob::Sink(sink) => sink.owner,
175 StreamingJob::Table(_, table, ..) => table.owner,
176 StreamingJob::Index(index, _) => index.owner,
177 StreamingJob::Source(source) => source.owner,
178 }
179 }
180
181 pub fn job_type(&self) -> StreamingJobType {
182 self.into()
183 }
184
185 pub fn job_type_str(&self) -> &'static str {
186 match self {
187 StreamingJob::MaterializedView(_) => "materialized view",
188 StreamingJob::Sink(_) => "sink",
189 StreamingJob::Table(_, _, _) => "table",
190 StreamingJob::Index(_, _) => "index",
191 StreamingJob::Source(_) => "source",
192 }
193 }
194
195 pub fn definition(&self) -> String {
196 match self {
197 Self::MaterializedView(table) => table.definition.clone(),
198 Self::Table(_, table, ..) => table.definition.clone(),
199 Self::Index(_, table) => table.definition.clone(),
200 Self::Sink(sink) => sink.definition.clone(),
201 Self::Source(source) => source.definition.clone(),
202 }
203 }
204
205 pub fn object_type(&self) -> ObjectType {
206 match self {
207 Self::MaterializedView(_) => ObjectType::Table, Self::Sink(_) => ObjectType::Sink,
209 Self::Table(_, _, _) => ObjectType::Table,
210 Self::Index(_, _) => ObjectType::Index,
211 Self::Source(_) => ObjectType::Source,
212 }
213 }
214
215 pub fn table_version_id(&self) -> Option<TableVersionId> {
217 if let Self::Table(_, table, ..) = self {
218 Some(
219 table
220 .get_version()
221 .expect("table must be versioned")
222 .version,
223 )
224 } else {
225 None
226 }
227 }
228
229 pub fn create_type(&self) -> CreateType {
230 match self {
231 Self::Table(_, table, ..) => table.get_create_type().unwrap_or(CreateType::Foreground),
232 Self::MaterializedView(table) => {
233 table.get_create_type().unwrap_or(CreateType::Foreground)
234 }
235 Self::Sink(s) => s.get_create_type().unwrap_or(CreateType::Foreground),
236 Self::Index(index, _) => {
237 CreateType::try_from(index.create_type).unwrap_or(CreateType::Foreground)
238 }
239 _ => CreateType::Foreground,
240 }
241 }
242
243 pub fn dependent_connection_ids(&self) -> MetaResult<HashSet<ConnectionId>> {
244 match self {
245 StreamingJob::Source(source) => Ok(get_referred_connection_ids_from_source(source)),
246 StreamingJob::Table(source, _, _) => {
247 if let Some(source) = source {
248 Ok(get_referred_connection_ids_from_source(source))
249 } else {
250 Ok(HashSet::new())
251 }
252 }
253 StreamingJob::Sink(sink) => Ok(get_referred_connection_ids_from_sink(sink)),
254 StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
255 }
256 }
257
258 pub fn dependent_secret_ids(&self) -> MetaResult<HashSet<SecretId>> {
260 match self {
261 StreamingJob::Sink(sink) => Ok(get_referred_secret_ids_from_sink(sink)),
262 StreamingJob::Table(source, _, _) => {
263 if let Some(source) = source {
264 get_referred_secret_ids_from_source(source)
265 } else {
266 Ok(HashSet::new())
267 }
268 }
269 StreamingJob::Source(source) => get_referred_secret_ids_from_source(source),
270 StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
271 }
272 }
273
274 pub async fn verify_version_for_replace(&self, txn: &DatabaseTransaction) -> MetaResult<()> {
276 let id = self.id();
277
278 match self {
279 StreamingJob::Table(_source, table, _table_job_type) => {
280 let new_version = table.get_version()?.get_version();
281 let original_version: Option<TableVersion> =
282 TableModel::find_by_id(id.as_mv_table_id())
283 .select_only()
284 .column(table::Column::Version)
285 .into_tuple()
286 .one(txn)
287 .await?
288 .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
289 let original_version = original_version
290 .expect("version for table should exist")
291 .to_protobuf();
292 if new_version != original_version.version + 1 {
293 return Err(MetaError::permission_denied("table version is stale"));
294 }
295 }
296 StreamingJob::Source(source) => {
297 let new_version = source.get_version();
298 let original_version: Option<i64> =
299 SourceModel::find_by_id(id.as_shared_source_id())
300 .select_only()
301 .column(source::Column::Version)
302 .into_tuple()
303 .one(txn)
304 .await?
305 .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
306 let original_version = original_version.expect("version for source should exist");
307 if new_version != original_version as u64 + 1 {
308 return Err(MetaError::permission_denied("source version is stale"));
309 }
310 }
311 StreamingJob::MaterializedView(_) => {
312 }
315 StreamingJob::Sink(_) => {
316 }
318 StreamingJob::Index(_, _) => {
319 bail_not_implemented!("schema change for {}", self.job_type_str())
320 }
321 }
322 Ok(())
323 }
324
325 pub fn should_notify_creating(&self) -> bool {
327 self.is_materialized_view() || matches!(self.create_type(), CreateType::Background)
328 }
329
330 pub fn is_sink_into_table(&self) -> bool {
331 matches!(self, Self::Sink(sink) if sink.target_table.is_some())
332 }
333}