1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
22use risingwave_common::config::DefaultParallelism;
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::JobId;
25use risingwave_common::util::iter_util::ZipEqDebug;
26use risingwave_common::util::stream_graph_visitor::{
27 visit_stream_node_body, visit_stream_node_mut,
28};
29use risingwave_common::{bail, current_cluster_version};
30use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
31use risingwave_connector::error::ConnectorError;
32use risingwave_connector::sink::file_sink::fs::FsSink;
33use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
34use risingwave_connector::source::ConnectorProperties;
35use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
36use risingwave_meta_model::object::ObjectType;
37use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
38use risingwave_meta_model::refresh_job::RefreshState;
39use risingwave_meta_model::table::TableType;
40use risingwave_meta_model::user_privilege::Action;
41use risingwave_meta_model::*;
42use risingwave_pb::catalog::{PbCreateType, PbTable};
43use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
44use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
45use risingwave_pb::meta::object::PbObjectInfo;
46use risingwave_pb::meta::subscribe_response::{
47 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
48};
49use risingwave_pb::meta::{PbObject, PbObjectGroup};
50use risingwave_pb::plan_common::PbColumnCatalog;
51use risingwave_pb::plan_common::source_refresh_mode::{
52 RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
53};
54use risingwave_pb::secret::PbSecretRef;
55use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
56use risingwave_pb::stream_plan::stream_node::PbNodeBody;
57use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
58use risingwave_pb::user::PbUserInfo;
59use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
60use risingwave_sqlparser::parser::{Parser, ParserError};
61use sea_orm::ActiveValue::Set;
62use sea_orm::sea_query::{Expr, Query, SimpleExpr};
63use sea_orm::{
64 ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
65 ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
66};
67use thiserror_ext::AsReport;
68
69use super::rename::IndexItemRewriter;
70use crate::barrier::{Command, SharedFragmentInfo};
71use crate::controller::ObjectModel;
72use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
73use crate::controller::fragment::FragmentTypeMaskExt;
74use crate::controller::utils::{
75 PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
76 check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
77 ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
78 get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
79 list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
80};
81use crate::error::MetaErrorInner;
82use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
83use crate::model::{
84 FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
85 StreamJobFragmentsToCreate,
86};
87use crate::stream::SplitAssignment;
88use crate::{MetaError, MetaResult};
89
90impl CatalogController {
91 pub async fn create_streaming_job_obj(
92 txn: &DatabaseTransaction,
93 obj_type: ObjectType,
94 owner_id: UserId,
95 database_id: Option<DatabaseId>,
96 schema_id: Option<SchemaId>,
97 create_type: PbCreateType,
98 ctx: StreamContext,
99 streaming_parallelism: StreamingParallelism,
100 max_parallelism: usize,
101 specific_resource_group: Option<String>, ) -> MetaResult<JobId> {
103 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
104 let job_id = obj.oid.as_job_id();
105 let job = streaming_job::ActiveModel {
106 job_id: Set(job_id),
107 job_status: Set(JobStatus::Initial),
108 create_type: Set(create_type.into()),
109 timezone: Set(ctx.timezone),
110 config_override: Set(Some(ctx.config_override.to_string())),
111 parallelism: Set(streaming_parallelism),
112 max_parallelism: Set(max_parallelism as _),
113 specific_resource_group: Set(specific_resource_group),
114 };
115 job.insert(txn).await?;
116
117 Ok(job_id)
118 }
119
120 #[await_tree::instrument]
126 pub async fn create_job_catalog(
127 &self,
128 streaming_job: &mut StreamingJob,
129 ctx: &StreamContext,
130 parallelism: &Option<Parallelism>,
131 max_parallelism: usize,
132 mut dependencies: HashSet<ObjectId>,
133 specific_resource_group: Option<String>,
134 ) -> MetaResult<()> {
135 let inner = self.inner.write().await;
136 let txn = inner.db.begin().await?;
137 let create_type = streaming_job.create_type();
138
139 let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
140 (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
141 (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
142 (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
143 };
144
145 ensure_user_id(streaming_job.owner() as _, &txn).await?;
146 ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
147 ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
148 check_relation_name_duplicate(
149 &streaming_job.name(),
150 streaming_job.database_id(),
151 streaming_job.schema_id(),
152 &txn,
153 )
154 .await?;
155
156 if !dependencies.is_empty() {
158 let altering_cnt = ObjectDependency::find()
159 .join(
160 JoinType::InnerJoin,
161 object_dependency::Relation::Object1.def(),
162 )
163 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
164 .filter(
165 object_dependency::Column::Oid
166 .is_in(dependencies.clone())
167 .and(object::Column::ObjType.eq(ObjectType::Table))
168 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
169 .and(
170 object::Column::Oid.not_in_subquery(
172 Query::select()
173 .column(table::Column::TableId)
174 .from(Table)
175 .to_owned(),
176 ),
177 ),
178 )
179 .count(&txn)
180 .await?;
181 if altering_cnt != 0 {
182 return Err(MetaError::permission_denied(
183 "some dependent relations are being altered",
184 ));
185 }
186 }
187
188 match streaming_job {
189 StreamingJob::MaterializedView(table) => {
190 let job_id = Self::create_streaming_job_obj(
191 &txn,
192 ObjectType::Table,
193 table.owner as _,
194 Some(table.database_id),
195 Some(table.schema_id),
196 create_type,
197 ctx.clone(),
198 streaming_parallelism,
199 max_parallelism,
200 specific_resource_group,
201 )
202 .await?;
203 table.id = job_id.as_mv_table_id();
204 let table_model: table::ActiveModel = table.clone().into();
205 Table::insert(table_model).exec(&txn).await?;
206 }
207 StreamingJob::Sink(sink) => {
208 if let Some(target_table_id) = sink.target_table
209 && check_sink_into_table_cycle(
210 target_table_id.into(),
211 dependencies.iter().cloned().collect(),
212 &txn,
213 )
214 .await?
215 {
216 bail!("Creating such a sink will result in circular dependency.");
217 }
218
219 let job_id = Self::create_streaming_job_obj(
220 &txn,
221 ObjectType::Sink,
222 sink.owner as _,
223 Some(sink.database_id),
224 Some(sink.schema_id),
225 create_type,
226 ctx.clone(),
227 streaming_parallelism,
228 max_parallelism,
229 specific_resource_group,
230 )
231 .await?;
232 sink.id = job_id.as_sink_id();
233 let sink_model: sink::ActiveModel = sink.clone().into();
234 Sink::insert(sink_model).exec(&txn).await?;
235 }
236 StreamingJob::Table(src, table, _) => {
237 let job_id = Self::create_streaming_job_obj(
238 &txn,
239 ObjectType::Table,
240 table.owner as _,
241 Some(table.database_id),
242 Some(table.schema_id),
243 create_type,
244 ctx.clone(),
245 streaming_parallelism,
246 max_parallelism,
247 specific_resource_group,
248 )
249 .await?;
250 table.id = job_id.as_mv_table_id();
251 if let Some(src) = src {
252 let src_obj = Self::create_object(
253 &txn,
254 ObjectType::Source,
255 src.owner as _,
256 Some(src.database_id),
257 Some(src.schema_id),
258 )
259 .await?;
260 src.id = src_obj.oid.as_source_id();
261 src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
262 table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
263 let source: source::ActiveModel = src.clone().into();
264 Source::insert(source).exec(&txn).await?;
265 }
266 let table_model: table::ActiveModel = table.clone().into();
267 Table::insert(table_model).exec(&txn).await?;
268
269 if table.refreshable {
270 let trigger_interval_secs = src
271 .as_ref()
272 .and_then(|source_catalog| source_catalog.refresh_mode)
273 .and_then(
274 |source_refresh_mode| match source_refresh_mode.refresh_mode {
275 Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
276 refresh_interval_sec,
277 })) => refresh_interval_sec,
278 Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
279 None => None,
280 },
281 );
282
283 RefreshJob::insert(refresh_job::ActiveModel {
284 table_id: Set(table.id),
285 last_trigger_time: Set(None),
286 trigger_interval_secs: Set(trigger_interval_secs),
287 current_status: Set(RefreshState::Idle),
288 last_success_time: Set(None),
289 })
290 .exec(&txn)
291 .await?;
292 }
293 }
294 StreamingJob::Index(index, table) => {
295 ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
296 let job_id = Self::create_streaming_job_obj(
297 &txn,
298 ObjectType::Index,
299 index.owner as _,
300 Some(index.database_id),
301 Some(index.schema_id),
302 create_type,
303 ctx.clone(),
304 streaming_parallelism,
305 max_parallelism,
306 specific_resource_group,
307 )
308 .await?;
309 index.id = job_id.as_index_id();
311 index.index_table_id = job_id.as_mv_table_id();
312 table.id = job_id.as_mv_table_id();
313
314 ObjectDependency::insert(object_dependency::ActiveModel {
315 oid: Set(index.primary_table_id.into()),
316 used_by: Set(table.id.into()),
317 ..Default::default()
318 })
319 .exec(&txn)
320 .await?;
321
322 let table_model: table::ActiveModel = table.clone().into();
323 Table::insert(table_model).exec(&txn).await?;
324 let index_model: index::ActiveModel = index.clone().into();
325 Index::insert(index_model).exec(&txn).await?;
326 }
327 StreamingJob::Source(src) => {
328 let job_id = Self::create_streaming_job_obj(
329 &txn,
330 ObjectType::Source,
331 src.owner as _,
332 Some(src.database_id),
333 Some(src.schema_id),
334 create_type,
335 ctx.clone(),
336 streaming_parallelism,
337 max_parallelism,
338 specific_resource_group,
339 )
340 .await?;
341 src.id = job_id.as_shared_source_id();
342 let source_model: source::ActiveModel = src.clone().into();
343 Source::insert(source_model).exec(&txn).await?;
344 }
345 }
346
347 dependencies.extend(
349 streaming_job
350 .dependent_secret_ids()?
351 .into_iter()
352 .map(|id| id.as_object_id()),
353 );
354 dependencies.extend(
356 streaming_job
357 .dependent_connection_ids()?
358 .into_iter()
359 .map(|id| id.as_object_id()),
360 );
361
362 if !dependencies.is_empty() {
364 ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
365 object_dependency::ActiveModel {
366 oid: Set(oid),
367 used_by: Set(streaming_job.id().as_object_id()),
368 ..Default::default()
369 }
370 }))
371 .exec(&txn)
372 .await?;
373 }
374
375 txn.commit().await?;
376
377 Ok(())
378 }
379
380 pub async fn create_internal_table_catalog(
388 &self,
389 job: &StreamingJob,
390 mut incomplete_internal_tables: Vec<PbTable>,
391 ) -> MetaResult<HashMap<TableId, TableId>> {
392 let job_id = job.id();
393 let inner = self.inner.write().await;
394 let txn = inner.db.begin().await?;
395
396 ensure_job_not_canceled(job_id, &txn).await?;
398
399 let mut table_id_map = HashMap::new();
400 for table in &mut incomplete_internal_tables {
401 let table_id = Self::create_object(
402 &txn,
403 ObjectType::Table,
404 table.owner as _,
405 Some(table.database_id),
406 Some(table.schema_id),
407 )
408 .await?
409 .oid
410 .as_table_id();
411 table_id_map.insert(table.id, table_id);
412 table.id = table_id;
413 table.job_id = Some(job_id);
414
415 let table_model = table::ActiveModel {
416 table_id: Set(table_id),
417 belongs_to_job_id: Set(Some(job_id)),
418 fragment_id: NotSet,
419 ..table.clone().into()
420 };
421 Table::insert(table_model).exec(&txn).await?;
422 }
423 txn.commit().await?;
424
425 Ok(table_id_map)
426 }
427
428 pub async fn prepare_stream_job_fragments(
429 &self,
430 stream_job_fragments: &StreamJobFragmentsToCreate,
431 streaming_job: &StreamingJob,
432 for_replace: bool,
433 ) -> MetaResult<()> {
434 self.prepare_streaming_job(
435 stream_job_fragments.stream_job_id(),
436 || stream_job_fragments.fragments.values(),
437 &stream_job_fragments.downstreams,
438 for_replace,
439 Some(streaming_job),
440 )
441 .await
442 }
443
444 #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
449 )]
450 pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
451 &self,
452 job_id: JobId,
453 get_fragments: impl Fn() -> I + 'a,
454 downstreams: &FragmentDownstreamRelation,
455 for_replace: bool,
456 creating_streaming_job: Option<&'a StreamingJob>,
457 ) -> MetaResult<()> {
458 let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;
459
460 let inner = self.inner.write().await;
461
462 let need_notify = creating_streaming_job
463 .map(|job| job.should_notify_creating())
464 .unwrap_or(false);
465 let definition = creating_streaming_job.map(|job| job.definition());
466
467 let mut objects_to_notify = vec![];
468 let txn = inner.db.begin().await?;
469
470 ensure_job_not_canceled(job_id, &txn).await?;
472
473 for fragment in fragments {
474 let fragment_id = fragment.fragment_id;
475 let state_table_ids = fragment.state_table_ids.inner_ref().clone();
476
477 let fragment = fragment.into_active_model();
478 Fragment::insert(fragment).exec(&txn).await?;
479
480 if !for_replace {
483 let all_tables = StreamJobFragments::collect_tables(get_fragments());
484 for state_table_id in state_table_ids {
485 let table = all_tables
489 .get(&state_table_id)
490 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
491 assert_eq!(table.id, state_table_id);
492 assert_eq!(table.fragment_id, fragment_id);
493 let vnode_count = table.vnode_count();
494
495 table::ActiveModel {
496 table_id: Set(state_table_id as _),
497 fragment_id: Set(Some(fragment_id)),
498 vnode_count: Set(vnode_count as _),
499 ..Default::default()
500 }
501 .update(&txn)
502 .await?;
503
504 if need_notify {
505 let mut table = table.clone();
506 if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
508 table.definition = definition.clone().unwrap();
509 }
510 objects_to_notify.push(PbObject {
511 object_info: Some(PbObjectInfo::Table(table)),
512 });
513 }
514 }
515 }
516 }
517
518 if need_notify {
520 match creating_streaming_job.unwrap() {
521 StreamingJob::Table(Some(source), ..) => {
522 objects_to_notify.push(PbObject {
523 object_info: Some(PbObjectInfo::Source(source.clone())),
524 });
525 }
526 StreamingJob::Sink(sink) => {
527 objects_to_notify.push(PbObject {
528 object_info: Some(PbObjectInfo::Sink(sink.clone())),
529 });
530 }
531 StreamingJob::Index(index, _) => {
532 objects_to_notify.push(PbObject {
533 object_info: Some(PbObjectInfo::Index(index.clone())),
534 });
535 }
536 _ => {}
537 }
538 }
539
540 insert_fragment_relations(&txn, downstreams).await?;
541
542 if !for_replace {
543 if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
545 Table::update(table::ActiveModel {
546 table_id: Set(table.id),
547 dml_fragment_id: Set(table.dml_fragment_id),
548 ..Default::default()
549 })
550 .exec(&txn)
551 .await?;
552 }
553 }
554
555 txn.commit().await?;
556
557 if !objects_to_notify.is_empty() {
561 self.notify_frontend(
562 Operation::Add,
563 Info::ObjectGroup(PbObjectGroup {
564 objects: objects_to_notify,
565 }),
566 )
567 .await;
568 }
569
570 Ok(())
571 }
572
573 pub async fn build_cancel_command(
576 &self,
577 table_fragments: &StreamJobFragments,
578 ) -> MetaResult<Command> {
579 let inner = self.inner.read().await;
580 let txn = inner.db.begin().await?;
581
582 let dropped_sink_fragment_with_target =
583 if let Some(sink_fragment) = table_fragments.sink_fragment() {
584 let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
585 let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
586 sink_target_fragment
587 .get(&sink_fragment_id)
588 .map(|target_fragments| {
589 let target_fragment_id = *target_fragments
590 .first()
591 .expect("sink should have at least one downstream fragment");
592 (sink_fragment_id, target_fragment_id)
593 })
594 } else {
595 None
596 };
597
598 Ok(Command::DropStreamingJobs {
599 streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
600 actors: table_fragments.actor_ids().collect(),
601 unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
602 unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
603 dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
604 .into_iter()
605 .map(|(sink, target)| (target as _, vec![sink as _]))
606 .collect(),
607 })
608 }
609
610 #[await_tree::instrument]
614 pub async fn try_abort_creating_streaming_job(
615 &self,
616 mut job_id: JobId,
617 is_cancelled: bool,
618 ) -> MetaResult<(bool, Option<DatabaseId>)> {
619 let mut inner = self.inner.write().await;
620 let txn = inner.db.begin().await?;
621
622 let obj = Object::find_by_id(job_id).one(&txn).await?;
623 let Some(obj) = obj else {
624 tracing::warn!(
625 id = %job_id,
626 "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
627 );
628 return Ok((true, None));
629 };
630 let database_id = obj
631 .database_id
632 .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
633 let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
634
635 if !is_cancelled && let Some(streaming_job) = &streaming_job {
636 assert_ne!(streaming_job.job_status, JobStatus::Created);
637 if streaming_job.create_type == CreateType::Background
638 && streaming_job.job_status == JobStatus::Creating
639 {
640 if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
641 && check_if_belongs_to_iceberg_table(&txn, job_id).await?
642 {
643 } else {
645 tracing::warn!(
647 id = %job_id,
648 "streaming job is created in background and still in creating status"
649 );
650 return Ok((false, Some(database_id)));
651 }
652 }
653 }
654
655 let iceberg_table_id =
656 try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
657 if let Some(iceberg_table_id) = iceberg_table_id {
658 let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
661 Object::delete_many()
662 .filter(
663 object::Column::Oid
664 .eq(job_id)
665 .or(object::Column::Oid.is_in(internal_tables)),
666 )
667 .exec(&txn)
668 .await?;
669 job_id = iceberg_table_id.as_job_id();
670 };
671
672 let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
673
674 let mut objs = vec![];
676 let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;
677
678 let mut need_notify =
679 streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
680 if !need_notify {
681 if let Some(table) = &table_obj {
683 need_notify = table.table_type == TableType::MaterializedView;
684 }
685 }
686
687 if is_cancelled {
688 let dropped_tables = Table::find()
689 .find_also_related(Object)
690 .filter(
691 table::Column::TableId.is_in(
692 internal_table_ids
693 .iter()
694 .cloned()
695 .chain(table_obj.iter().map(|t| t.table_id as _)),
696 ),
697 )
698 .all(&txn)
699 .await?
700 .into_iter()
701 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
702 inner
703 .dropped_tables
704 .extend(dropped_tables.map(|t| (t.id, t)));
705 }
706
707 if need_notify {
708 let obj: Option<PartialObject> = Object::find_by_id(job_id)
709 .select_only()
710 .columns([
711 object::Column::Oid,
712 object::Column::ObjType,
713 object::Column::SchemaId,
714 object::Column::DatabaseId,
715 ])
716 .into_partial_model()
717 .one(&txn)
718 .await?;
719 let obj =
720 obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
721 objs.push(obj);
722 let internal_table_objs: Vec<PartialObject> = Object::find()
723 .select_only()
724 .columns([
725 object::Column::Oid,
726 object::Column::ObjType,
727 object::Column::SchemaId,
728 object::Column::DatabaseId,
729 ])
730 .join(JoinType::InnerJoin, object::Relation::Table.def())
731 .filter(table::Column::BelongsToJobId.eq(job_id))
732 .into_partial_model()
733 .all(&txn)
734 .await?;
735 objs.extend(internal_table_objs);
736 }
737
738 if table_obj.is_none()
740 && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
741 .select_only()
742 .column(sink::Column::TargetTable)
743 .into_tuple::<Option<TableId>>()
744 .one(&txn)
745 .await?
746 {
747 let tmp_id: Option<ObjectId> = ObjectDependency::find()
748 .select_only()
749 .column(object_dependency::Column::UsedBy)
750 .join(
751 JoinType::InnerJoin,
752 object_dependency::Relation::Object1.def(),
753 )
754 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
755 .filter(
756 object_dependency::Column::Oid
757 .eq(target_table_id)
758 .and(object::Column::ObjType.eq(ObjectType::Table))
759 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
760 )
761 .into_tuple()
762 .one(&txn)
763 .await?;
764 if let Some(tmp_id) = tmp_id {
765 tracing::warn!(
766 id = %tmp_id,
767 "aborting temp streaming job for sink into table"
768 );
769
770 Object::delete_by_id(tmp_id).exec(&txn).await?;
771 }
772 }
773
774 Object::delete_by_id(job_id).exec(&txn).await?;
775 if !internal_table_ids.is_empty() {
776 Object::delete_many()
777 .filter(object::Column::Oid.is_in(internal_table_ids))
778 .exec(&txn)
779 .await?;
780 }
781 if let Some(t) = &table_obj
782 && let Some(source_id) = t.optional_associated_source_id
783 {
784 Object::delete_by_id(source_id).exec(&txn).await?;
785 }
786
787 let err = if is_cancelled {
788 MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
789 } else {
790 MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
791 };
792 let abort_reason = format!("streaming job aborted {}", err.as_report());
793 for tx in inner
794 .creating_table_finish_notifier
795 .get_mut(&database_id)
796 .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
797 .into_iter()
798 .flatten()
799 .flatten()
800 {
801 let _ = tx.send(Err(abort_reason.clone()));
802 }
803 txn.commit().await?;
804
805 if !objs.is_empty() {
806 self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
809 .await;
810 }
811 Ok((true, Some(database_id)))
812 }
813
814 #[await_tree::instrument]
815 pub async fn post_collect_job_fragments(
816 &self,
817 job_id: JobId,
818 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
819 new_sink_downstream: Option<FragmentDownstreamRelation>,
820 split_assignment: Option<&SplitAssignment>,
821 ) -> MetaResult<()> {
822 let inner = self.inner.write().await;
823 let txn = inner.db.begin().await?;
824
825 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
826
827 if let Some(new_downstream) = new_sink_downstream {
828 insert_fragment_relations(&txn, &new_downstream).await?;
829 }
830
831 streaming_job::ActiveModel {
833 job_id: Set(job_id),
834 job_status: Set(JobStatus::Creating),
835 ..Default::default()
836 }
837 .update(&txn)
838 .await?;
839
840 if let Some(split_assignment) = split_assignment {
841 let fragment_splits = split_assignment
842 .iter()
843 .map(|(fragment_id, splits)| {
844 (
845 *fragment_id as _,
846 splits.values().flatten().cloned().collect_vec(),
847 )
848 })
849 .collect();
850
851 self.update_fragment_splits(&txn, &fragment_splits).await?;
852 }
853
854 txn.commit().await?;
855
856 Ok(())
857 }
858
859 pub async fn create_job_catalog_for_replace(
860 &self,
861 streaming_job: &StreamingJob,
862 ctx: Option<&StreamContext>,
863 specified_parallelism: Option<&NonZeroUsize>,
864 expected_original_max_parallelism: Option<usize>,
865 ) -> MetaResult<JobId> {
866 let id = streaming_job.id();
867 let inner = self.inner.write().await;
868 let txn = inner.db.begin().await?;
869
870 streaming_job.verify_version_for_replace(&txn).await?;
872 let referring_cnt = ObjectDependency::find()
874 .join(
875 JoinType::InnerJoin,
876 object_dependency::Relation::Object1.def(),
877 )
878 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
879 .filter(
880 object_dependency::Column::Oid
881 .eq(id)
882 .and(object::Column::ObjType.eq(ObjectType::Table))
883 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
884 )
885 .count(&txn)
886 .await?;
887 if referring_cnt != 0 {
888 return Err(MetaError::permission_denied(
889 "job is being altered or referenced by some creating jobs",
890 ));
891 }
892
893 let (original_max_parallelism, original_timezone, original_config_override): (
895 i32,
896 Option<String>,
897 Option<String>,
898 ) = StreamingJobModel::find_by_id(id)
899 .select_only()
900 .column(streaming_job::Column::MaxParallelism)
901 .column(streaming_job::Column::Timezone)
902 .column(streaming_job::Column::ConfigOverride)
903 .into_tuple()
904 .one(&txn)
905 .await?
906 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
907
908 if let Some(max_parallelism) = expected_original_max_parallelism
909 && original_max_parallelism != max_parallelism as i32
910 {
911 bail!(
914 "cannot use a different max parallelism \
915 when replacing streaming job, \
916 original: {}, new: {}",
917 original_max_parallelism,
918 max_parallelism
919 );
920 }
921
922 let parallelism = match specified_parallelism {
923 None => StreamingParallelism::Adaptive,
924 Some(n) => StreamingParallelism::Fixed(n.get() as _),
925 };
926
927 let ctx = StreamContext {
928 timezone: ctx
929 .map(|ctx| ctx.timezone.clone())
930 .unwrap_or(original_timezone),
931 config_override: original_config_override.unwrap_or_default().into(),
934 };
935
936 let new_obj_id = Self::create_streaming_job_obj(
938 &txn,
939 streaming_job.object_type(),
940 streaming_job.owner() as _,
941 Some(streaming_job.database_id() as _),
942 Some(streaming_job.schema_id() as _),
943 streaming_job.create_type(),
944 ctx,
945 parallelism,
946 original_max_parallelism as _,
947 None,
948 )
949 .await?;
950
951 ObjectDependency::insert(object_dependency::ActiveModel {
953 oid: Set(id.as_object_id()),
954 used_by: Set(new_obj_id.as_object_id()),
955 ..Default::default()
956 })
957 .exec(&txn)
958 .await?;
959
960 txn.commit().await?;
961
962 Ok(new_obj_id)
963 }
964
965 pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
967 let mut inner = self.inner.write().await;
968 let txn = inner.db.begin().await?;
969
970 if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
972 tracing::info!(
973 "streaming job {} is for iceberg table, wait for manual finish operation",
974 job_id
975 );
976 return Ok(());
977 }
978
979 let (notification_op, objects, updated_user_info) =
980 Self::finish_streaming_job_inner(&txn, job_id).await?;
981
982 txn.commit().await?;
983
984 let mut version = self
985 .notify_frontend(
986 notification_op,
987 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
988 )
989 .await;
990
991 if !updated_user_info.is_empty() {
993 version = self.notify_users_update(updated_user_info).await;
994 }
995
996 inner
997 .creating_table_finish_notifier
998 .values_mut()
999 .for_each(|creating_tables| {
1000 if let Some(txs) = creating_tables.remove(&job_id) {
1001 for tx in txs {
1002 let _ = tx.send(Ok(version));
1003 }
1004 }
1005 });
1006
1007 Ok(())
1008 }
1009
1010 pub async fn finish_streaming_job_inner(
1012 txn: &DatabaseTransaction,
1013 job_id: JobId,
1014 ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
1015 let job_type = Object::find_by_id(job_id)
1016 .select_only()
1017 .column(object::Column::ObjType)
1018 .into_tuple()
1019 .one(txn)
1020 .await?
1021 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1022
1023 let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
1024 .select_only()
1025 .column(streaming_job::Column::CreateType)
1026 .into_tuple()
1027 .one(txn)
1028 .await?
1029 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1030
1031 let res = Object::update_many()
1033 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
1034 .col_expr(
1035 object::Column::CreatedAtClusterVersion,
1036 current_cluster_version().into(),
1037 )
1038 .filter(object::Column::Oid.eq(job_id))
1039 .exec(txn)
1040 .await?;
1041 if res.rows_affected == 0 {
1042 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1043 }
1044
1045 let job = streaming_job::ActiveModel {
1047 job_id: Set(job_id),
1048 job_status: Set(JobStatus::Created),
1049 ..Default::default()
1050 };
1051 job.update(txn).await?;
1052
1053 let internal_table_objs = Table::find()
1055 .find_also_related(Object)
1056 .filter(table::Column::BelongsToJobId.eq(job_id))
1057 .all(txn)
1058 .await?;
1059 let mut objects = internal_table_objs
1060 .iter()
1061 .map(|(table, obj)| PbObject {
1062 object_info: Some(PbObjectInfo::Table(
1063 ObjectModel(table.clone(), obj.clone().unwrap()).into(),
1064 )),
1065 })
1066 .collect_vec();
1067 let mut notification_op = if create_type == CreateType::Background {
1068 NotificationOperation::Update
1069 } else {
1070 NotificationOperation::Add
1071 };
1072 let mut updated_user_info = vec![];
1073
1074 match job_type {
1075 ObjectType::Table => {
1076 let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1077 .find_also_related(Object)
1078 .one(txn)
1079 .await?
1080 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1081 if table.table_type == TableType::MaterializedView {
1082 notification_op = NotificationOperation::Update;
1083 }
1084
1085 if let Some(source_id) = table.optional_associated_source_id {
1086 let (src, obj) = Source::find_by_id(source_id)
1087 .find_also_related(Object)
1088 .one(txn)
1089 .await?
1090 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1091 objects.push(PbObject {
1092 object_info: Some(PbObjectInfo::Source(
1093 ObjectModel(src, obj.unwrap()).into(),
1094 )),
1095 });
1096 }
1097 objects.push(PbObject {
1098 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1099 });
1100 }
1101 ObjectType::Sink => {
1102 let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1103 .find_also_related(Object)
1104 .one(txn)
1105 .await?
1106 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1107 objects.push(PbObject {
1108 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
1109 });
1110 }
1111 ObjectType::Index => {
1112 let (index, obj) = Index::find_by_id(job_id.as_index_id())
1113 .find_also_related(Object)
1114 .one(txn)
1115 .await?
1116 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1117 {
1118 let (table, obj) = Table::find_by_id(index.index_table_id)
1119 .find_also_related(Object)
1120 .one(txn)
1121 .await?
1122 .ok_or_else(|| {
1123 MetaError::catalog_id_not_found("table", index.index_table_id)
1124 })?;
1125 objects.push(PbObject {
1126 object_info: Some(PbObjectInfo::Table(
1127 ObjectModel(table, obj.unwrap()).into(),
1128 )),
1129 });
1130 }
1131
1132 let primary_table_privileges = UserPrivilege::find()
1135 .filter(
1136 user_privilege::Column::Oid
1137 .eq(index.primary_table_id)
1138 .and(user_privilege::Column::Action.eq(Action::Select)),
1139 )
1140 .all(txn)
1141 .await?;
1142 if !primary_table_privileges.is_empty() {
1143 let index_state_table_ids: Vec<TableId> = Table::find()
1144 .select_only()
1145 .column(table::Column::TableId)
1146 .filter(
1147 table::Column::BelongsToJobId
1148 .eq(job_id)
1149 .or(table::Column::TableId.eq(index.index_table_id)),
1150 )
1151 .into_tuple()
1152 .all(txn)
1153 .await?;
1154 let mut new_privileges = vec![];
1155 for privilege in &primary_table_privileges {
1156 for state_table_id in &index_state_table_ids {
1157 new_privileges.push(user_privilege::ActiveModel {
1158 id: Default::default(),
1159 oid: Set(state_table_id.as_object_id()),
1160 user_id: Set(privilege.user_id),
1161 action: Set(Action::Select),
1162 dependent_id: Set(privilege.dependent_id),
1163 granted_by: Set(privilege.granted_by),
1164 with_grant_option: Set(privilege.with_grant_option),
1165 });
1166 }
1167 }
1168 UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1169
1170 updated_user_info = list_user_info_by_ids(
1171 primary_table_privileges.into_iter().map(|p| p.user_id),
1172 txn,
1173 )
1174 .await?;
1175 }
1176
1177 objects.push(PbObject {
1178 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1179 });
1180 }
1181 ObjectType::Source => {
1182 let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1183 .find_also_related(Object)
1184 .one(txn)
1185 .await?
1186 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1187 objects.push(PbObject {
1188 object_info: Some(PbObjectInfo::Source(
1189 ObjectModel(source, obj.unwrap()).into(),
1190 )),
1191 });
1192 }
1193 _ => unreachable!("invalid job type: {:?}", job_type),
1194 }
1195
1196 if job_type != ObjectType::Index {
1197 updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1198 }
1199
1200 Ok((notification_op, objects, updated_user_info))
1201 }
1202
1203 pub async fn finish_replace_streaming_job(
1204 &self,
1205 tmp_id: JobId,
1206 streaming_job: StreamingJob,
1207 replace_upstream: FragmentReplaceUpstream,
1208 sink_into_table_context: SinkIntoTableContext,
1209 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1210 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1211 ) -> MetaResult<NotificationVersion> {
1212 let inner = self.inner.write().await;
1213 let txn = inner.db.begin().await?;
1214
1215 let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1216 tmp_id,
1217 replace_upstream,
1218 sink_into_table_context,
1219 &txn,
1220 streaming_job,
1221 drop_table_connector_ctx,
1222 auto_refresh_schema_sinks,
1223 )
1224 .await?;
1225
1226 txn.commit().await?;
1227
1228 let mut version = self
1229 .notify_frontend(
1230 NotificationOperation::Update,
1231 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1232 )
1233 .await;
1234
1235 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1236 self.notify_users_update(user_infos).await;
1237 version = self
1238 .notify_frontend(
1239 NotificationOperation::Delete,
1240 build_object_group_for_delete(to_drop_objects),
1241 )
1242 .await;
1243 }
1244
1245 Ok(version)
1246 }
1247
1248 pub async fn finish_replace_streaming_job_inner(
1249 tmp_id: JobId,
1250 replace_upstream: FragmentReplaceUpstream,
1251 SinkIntoTableContext {
1252 updated_sink_catalogs,
1253 }: SinkIntoTableContext,
1254 txn: &DatabaseTransaction,
1255 streaming_job: StreamingJob,
1256 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1257 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1258 ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1259 let original_job_id = streaming_job.id();
1260 let job_type = streaming_job.job_type();
1261
1262 let mut index_item_rewriter = None;
1263
1264 match streaming_job {
1266 StreamingJob::Table(_source, table, _table_job_type) => {
1267 let original_column_catalogs =
1270 get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1271
1272 index_item_rewriter = Some({
1273 let original_columns = original_column_catalogs
1274 .to_protobuf()
1275 .into_iter()
1276 .map(|c| c.column_desc.unwrap())
1277 .collect_vec();
1278 let new_columns = table
1279 .columns
1280 .iter()
1281 .map(|c| c.column_desc.clone().unwrap())
1282 .collect_vec();
1283
1284 IndexItemRewriter {
1285 original_columns,
1286 new_columns,
1287 }
1288 });
1289
1290 for sink_id in updated_sink_catalogs {
1292 sink::ActiveModel {
1293 sink_id: Set(sink_id as _),
1294 original_target_columns: Set(Some(original_column_catalogs.clone())),
1295 ..Default::default()
1296 }
1297 .update(txn)
1298 .await?;
1299 }
1300 let mut table = table::ActiveModel::from(table);
1302 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1303 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1304 {
1305 table.optional_associated_source_id = Set(None);
1307 }
1308
1309 table.update(txn).await?;
1310 }
1311 StreamingJob::Source(source) => {
1312 let source = source::ActiveModel::from(source);
1314 source.update(txn).await?;
1315 }
1316 StreamingJob::MaterializedView(table) => {
1317 let table = table::ActiveModel::from(table);
1319 table.update(txn).await?;
1320 }
1321 _ => unreachable!(
1322 "invalid streaming job type: {:?}",
1323 streaming_job.job_type_str()
1324 ),
1325 }
1326
1327 async fn finish_fragments(
1328 txn: &DatabaseTransaction,
1329 tmp_id: JobId,
1330 original_job_id: JobId,
1331 replace_upstream: FragmentReplaceUpstream,
1332 ) -> MetaResult<()> {
1333 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1337 .select_only()
1338 .columns([
1339 fragment::Column::FragmentId,
1340 fragment::Column::StateTableIds,
1341 ])
1342 .filter(fragment::Column::JobId.eq(tmp_id))
1343 .into_tuple()
1344 .all(txn)
1345 .await?;
1346 for (fragment_id, state_table_ids) in fragment_info {
1347 for state_table_id in state_table_ids.into_inner() {
1348 let state_table_id = TableId::new(state_table_id as _);
1349 table::ActiveModel {
1350 table_id: Set(state_table_id),
1351 fragment_id: Set(Some(fragment_id)),
1352 ..Default::default()
1354 }
1355 .update(txn)
1356 .await?;
1357 }
1358 }
1359
1360 Fragment::delete_many()
1362 .filter(fragment::Column::JobId.eq(original_job_id))
1363 .exec(txn)
1364 .await?;
1365 Fragment::update_many()
1366 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1367 .filter(fragment::Column::JobId.eq(tmp_id))
1368 .exec(txn)
1369 .await?;
1370
1371 for (fragment_id, fragment_replace_map) in replace_upstream {
1374 let (fragment_id, mut stream_node) =
1375 Fragment::find_by_id(fragment_id as FragmentId)
1376 .select_only()
1377 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1378 .into_tuple::<(FragmentId, StreamNode)>()
1379 .one(txn)
1380 .await?
1381 .map(|(id, node)| (id, node.to_protobuf()))
1382 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1383
1384 visit_stream_node_mut(&mut stream_node, |body| {
1385 if let PbNodeBody::Merge(m) = body
1386 && let Some(new_fragment_id) =
1387 fragment_replace_map.get(&m.upstream_fragment_id)
1388 {
1389 m.upstream_fragment_id = *new_fragment_id;
1390 }
1391 });
1392 fragment::ActiveModel {
1393 fragment_id: Set(fragment_id),
1394 stream_node: Set(StreamNode::from(&stream_node)),
1395 ..Default::default()
1396 }
1397 .update(txn)
1398 .await?;
1399 }
1400
1401 Object::delete_by_id(tmp_id).exec(txn).await?;
1403
1404 Ok(())
1405 }
1406
1407 finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1408
1409 let mut objects = vec![];
1411 match job_type {
1412 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1413 let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1414 .find_also_related(Object)
1415 .one(txn)
1416 .await?
1417 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1418 objects.push(PbObject {
1419 object_info: Some(PbObjectInfo::Table(
1420 ObjectModel(table, table_obj.unwrap()).into(),
1421 )),
1422 })
1423 }
1424 StreamingJobType::Source => {
1425 let (source, source_obj) =
1426 Source::find_by_id(original_job_id.as_shared_source_id())
1427 .find_also_related(Object)
1428 .one(txn)
1429 .await?
1430 .ok_or_else(|| {
1431 MetaError::catalog_id_not_found("object", original_job_id)
1432 })?;
1433 objects.push(PbObject {
1434 object_info: Some(PbObjectInfo::Source(
1435 ObjectModel(source, source_obj.unwrap()).into(),
1436 )),
1437 })
1438 }
1439 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1440 }
1441
1442 if let Some(expr_rewriter) = index_item_rewriter {
1443 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1444 .select_only()
1445 .columns([index::Column::IndexId, index::Column::IndexItems])
1446 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1447 .into_tuple()
1448 .all(txn)
1449 .await?;
1450 for (index_id, nodes) in index_items {
1451 let mut pb_nodes = nodes.to_protobuf();
1452 pb_nodes
1453 .iter_mut()
1454 .for_each(|x| expr_rewriter.rewrite_expr(x));
1455 let index = index::ActiveModel {
1456 index_id: Set(index_id),
1457 index_items: Set(pb_nodes.into()),
1458 ..Default::default()
1459 }
1460 .update(txn)
1461 .await?;
1462 let index_obj = index
1463 .find_related(Object)
1464 .one(txn)
1465 .await?
1466 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1467 objects.push(PbObject {
1468 object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1469 });
1470 }
1471 }
1472
1473 if let Some(sinks) = auto_refresh_schema_sinks {
1474 for finish_sink_context in sinks {
1475 finish_fragments(
1476 txn,
1477 finish_sink_context.tmp_sink_id.as_job_id(),
1478 finish_sink_context.original_sink_id.as_job_id(),
1479 Default::default(),
1480 )
1481 .await?;
1482 let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1483 .find_also_related(Object)
1484 .one(txn)
1485 .await?
1486 .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1487 let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1488 Sink::update(sink::ActiveModel {
1489 sink_id: Set(finish_sink_context.original_sink_id),
1490 columns: Set(columns.clone()),
1491 ..Default::default()
1492 })
1493 .exec(txn)
1494 .await?;
1495 sink.columns = columns;
1496 objects.push(PbObject {
1497 object_info: Some(PbObjectInfo::Sink(
1498 ObjectModel(sink, sink_obj.unwrap()).into(),
1499 )),
1500 });
1501 if let Some((log_store_table_id, new_log_store_table_columns)) =
1502 finish_sink_context.new_log_store_table
1503 {
1504 let new_log_store_table_columns: ColumnCatalogArray =
1505 new_log_store_table_columns.into();
1506 let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1507 .find_also_related(Object)
1508 .one(txn)
1509 .await?
1510 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1511 Table::update(table::ActiveModel {
1512 table_id: Set(log_store_table_id),
1513 columns: Set(new_log_store_table_columns.clone()),
1514 ..Default::default()
1515 })
1516 .exec(txn)
1517 .await?;
1518 table.columns = new_log_store_table_columns;
1519 objects.push(PbObject {
1520 object_info: Some(PbObjectInfo::Table(
1521 ObjectModel(table, table_obj.unwrap()).into(),
1522 )),
1523 });
1524 }
1525 }
1526 }
1527
1528 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1529 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1530 notification_objs =
1531 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1532 }
1533
1534 Ok((objects, notification_objs))
1535 }
1536
1537 pub async fn try_abort_replacing_streaming_job(
1539 &self,
1540 tmp_job_id: JobId,
1541 tmp_sink_ids: Option<Vec<ObjectId>>,
1542 ) -> MetaResult<()> {
1543 let inner = self.inner.write().await;
1544 let txn = inner.db.begin().await?;
1545 Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1546 if let Some(tmp_sink_ids) = tmp_sink_ids {
1547 for tmp_sink_id in tmp_sink_ids {
1548 Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1549 }
1550 }
1551 txn.commit().await?;
1552 Ok(())
1553 }
1554
1555 pub async fn update_source_rate_limit_by_source_id(
1558 &self,
1559 source_id: SourceId,
1560 rate_limit: Option<u32>,
1561 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1562 let inner = self.inner.read().await;
1563 let txn = inner.db.begin().await?;
1564
1565 {
1566 let active_source = source::ActiveModel {
1567 source_id: Set(source_id),
1568 rate_limit: Set(rate_limit.map(|v| v as i32)),
1569 ..Default::default()
1570 };
1571 active_source.update(&txn).await?;
1572 }
1573
1574 let (source, obj) = Source::find_by_id(source_id)
1575 .find_also_related(Object)
1576 .one(&txn)
1577 .await?
1578 .ok_or_else(|| {
1579 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1580 })?;
1581
1582 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1583 let streaming_job_ids: Vec<JobId> =
1584 if let Some(table_id) = source.optional_associated_table_id {
1585 vec![table_id.as_job_id()]
1586 } else if let Some(source_info) = &source.source_info
1587 && source_info.to_protobuf().is_shared()
1588 {
1589 vec![source_id.as_share_source_job_id()]
1590 } else {
1591 ObjectDependency::find()
1592 .select_only()
1593 .column(object_dependency::Column::UsedBy)
1594 .filter(object_dependency::Column::Oid.eq(source_id))
1595 .into_tuple()
1596 .all(&txn)
1597 .await?
1598 };
1599
1600 if streaming_job_ids.is_empty() {
1601 return Err(MetaError::invalid_parameter(format!(
1602 "source id {source_id} not used by any streaming job"
1603 )));
1604 }
1605
1606 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1607 .select_only()
1608 .columns([
1609 fragment::Column::FragmentId,
1610 fragment::Column::FragmentTypeMask,
1611 fragment::Column::StreamNode,
1612 ])
1613 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1614 .into_tuple()
1615 .all(&txn)
1616 .await?;
1617 let mut fragments = fragments
1618 .into_iter()
1619 .map(|(id, mask, stream_node)| {
1620 (
1621 id,
1622 FragmentTypeMask::from(mask as u32),
1623 stream_node.to_protobuf(),
1624 )
1625 })
1626 .collect_vec();
1627
1628 fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1629 let mut found = false;
1630 if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1631 visit_stream_node_mut(stream_node, |node| {
1632 if let PbNodeBody::Source(node) = node
1633 && let Some(node_inner) = &mut node.source_inner
1634 && node_inner.source_id == source_id
1635 {
1636 node_inner.rate_limit = rate_limit;
1637 found = true;
1638 }
1639 });
1640 }
1641 if is_fs_source {
1642 visit_stream_node_mut(stream_node, |node| {
1645 if let PbNodeBody::StreamFsFetch(node) = node {
1646 fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1647 if let Some(node_inner) = &mut node.node_inner
1648 && node_inner.source_id == source_id
1649 {
1650 node_inner.rate_limit = rate_limit;
1651 found = true;
1652 }
1653 }
1654 });
1655 }
1656 found
1657 });
1658
1659 assert!(
1660 !fragments.is_empty(),
1661 "source id should be used by at least one fragment"
1662 );
1663 let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1664
1665 for (id, fragment_type_mask, stream_node) in fragments {
1666 fragment::ActiveModel {
1667 fragment_id: Set(id),
1668 fragment_type_mask: Set(fragment_type_mask.into()),
1669 stream_node: Set(StreamNode::from(&stream_node)),
1670 ..Default::default()
1671 }
1672 .update(&txn)
1673 .await?;
1674 }
1675
1676 txn.commit().await?;
1677
1678 let fragment_actors = self.get_fragment_actors_from_running_info(fragment_ids.into_iter());
1679
1680 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1681 let _version = self
1682 .notify_frontend(
1683 NotificationOperation::Update,
1684 NotificationInfo::ObjectGroup(PbObjectGroup {
1685 objects: vec![PbObject {
1686 object_info: Some(relation_info),
1687 }],
1688 }),
1689 )
1690 .await;
1691
1692 Ok(fragment_actors)
1693 }
1694
1695 fn get_fragment_actors_from_running_info(
1696 &self,
1697 fragment_ids: impl Iterator<Item = FragmentId>,
1698 ) -> HashMap<FragmentId, Vec<ActorId>> {
1699 let mut fragment_actors: HashMap<FragmentId, Vec<ActorId>> = HashMap::new();
1700
1701 let info = self.env.shared_actor_infos().read_guard();
1702
1703 for fragment_id in fragment_ids {
1704 let SharedFragmentInfo { actors, .. } = info.get_fragment(fragment_id).unwrap();
1705 fragment_actors
1706 .entry(fragment_id as _)
1707 .or_default()
1708 .extend(actors.keys().copied());
1709 }
1710
1711 fragment_actors
1712 }
1713
1714 pub async fn mutate_fragments_by_job_id(
1717 &self,
1718 job_id: JobId,
1719 mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1721 err_msg: &'static str,
1723 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1724 let inner = self.inner.read().await;
1725 let txn = inner.db.begin().await?;
1726
1727 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1728 .select_only()
1729 .columns([
1730 fragment::Column::FragmentId,
1731 fragment::Column::FragmentTypeMask,
1732 fragment::Column::StreamNode,
1733 ])
1734 .filter(fragment::Column::JobId.eq(job_id))
1735 .into_tuple()
1736 .all(&txn)
1737 .await?;
1738 let mut fragments = fragments
1739 .into_iter()
1740 .map(|(id, mask, stream_node)| {
1741 (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1742 })
1743 .collect_vec();
1744
1745 let fragments = fragments
1746 .iter_mut()
1747 .map(|(_, fragment_type_mask, stream_node)| {
1748 fragments_mutation_fn(*fragment_type_mask, stream_node)
1749 })
1750 .collect::<MetaResult<Vec<bool>>>()?
1751 .into_iter()
1752 .zip_eq_debug(std::mem::take(&mut fragments))
1753 .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1754 .collect::<Vec<_>>();
1755
1756 if fragments.is_empty() {
1757 return Err(MetaError::invalid_parameter(format!(
1758 "job id {job_id}: {}",
1759 err_msg
1760 )));
1761 }
1762
1763 let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1764 for (id, _, stream_node) in fragments {
1765 fragment::ActiveModel {
1766 fragment_id: Set(id),
1767 stream_node: Set(StreamNode::from(&stream_node)),
1768 ..Default::default()
1769 }
1770 .update(&txn)
1771 .await?;
1772 }
1773
1774 txn.commit().await?;
1775
1776 let fragment_actors =
1777 self.get_fragment_actors_from_running_info(fragment_ids.iter().copied());
1778
1779 Ok(fragment_actors)
1780 }
1781
1782 async fn mutate_fragment_by_fragment_id(
1783 &self,
1784 fragment_id: FragmentId,
1785 mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1786 err_msg: &'static str,
1787 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1788 let inner = self.inner.read().await;
1789 let txn = inner.db.begin().await?;
1790
1791 let (fragment_type_mask, stream_node): (i32, StreamNode) =
1792 Fragment::find_by_id(fragment_id)
1793 .select_only()
1794 .columns([
1795 fragment::Column::FragmentTypeMask,
1796 fragment::Column::StreamNode,
1797 ])
1798 .into_tuple()
1799 .one(&txn)
1800 .await?
1801 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1802 let mut pb_stream_node = stream_node.to_protobuf();
1803 let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1804
1805 if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1806 return Err(MetaError::invalid_parameter(format!(
1807 "fragment id {fragment_id}: {}",
1808 err_msg
1809 )));
1810 }
1811
1812 fragment::ActiveModel {
1813 fragment_id: Set(fragment_id),
1814 stream_node: Set(stream_node),
1815 ..Default::default()
1816 }
1817 .update(&txn)
1818 .await?;
1819
1820 let fragment_actors =
1821 self.get_fragment_actors_from_running_info(std::iter::once(fragment_id));
1822
1823 txn.commit().await?;
1824
1825 Ok(fragment_actors)
1826 }
1827
1828 pub async fn update_backfill_rate_limit_by_job_id(
1831 &self,
1832 job_id: JobId,
1833 rate_limit: Option<u32>,
1834 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1835 let update_backfill_rate_limit =
1836 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1837 let mut found = false;
1838 if fragment_type_mask
1839 .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1840 {
1841 visit_stream_node_mut(stream_node, |node| match node {
1842 PbNodeBody::StreamCdcScan(node) => {
1843 node.rate_limit = rate_limit;
1844 found = true;
1845 }
1846 PbNodeBody::StreamScan(node) => {
1847 node.rate_limit = rate_limit;
1848 found = true;
1849 }
1850 PbNodeBody::SourceBackfill(node) => {
1851 node.rate_limit = rate_limit;
1852 found = true;
1853 }
1854 PbNodeBody::Sink(node) => {
1855 node.rate_limit = rate_limit;
1856 found = true;
1857 }
1858 _ => {}
1859 });
1860 }
1861 Ok(found)
1862 };
1863
1864 self.mutate_fragments_by_job_id(
1865 job_id,
1866 update_backfill_rate_limit,
1867 "stream scan node or source node not found",
1868 )
1869 .await
1870 }
1871
1872 pub async fn update_sink_rate_limit_by_job_id(
1875 &self,
1876 sink_id: SinkId,
1877 rate_limit: Option<u32>,
1878 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1879 let update_sink_rate_limit =
1880 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1881 let mut found = Ok(false);
1882 if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1883 visit_stream_node_mut(stream_node, |node| {
1884 if let PbNodeBody::Sink(node) = node {
1885 if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1886 found = Err(MetaError::invalid_parameter(
1887 "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1888 ));
1889 return;
1890 }
1891 node.rate_limit = rate_limit;
1892 found = Ok(true);
1893 }
1894 });
1895 }
1896 found
1897 };
1898
1899 self.mutate_fragments_by_job_id(
1900 sink_id.as_job_id(),
1901 update_sink_rate_limit,
1902 "sink node not found",
1903 )
1904 .await
1905 }
1906
1907 pub async fn update_dml_rate_limit_by_job_id(
1908 &self,
1909 job_id: JobId,
1910 rate_limit: Option<u32>,
1911 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1912 let update_dml_rate_limit =
1913 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1914 let mut found = false;
1915 if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1916 visit_stream_node_mut(stream_node, |node| {
1917 if let PbNodeBody::Dml(node) = node {
1918 node.rate_limit = rate_limit;
1919 found = true;
1920 }
1921 });
1922 }
1923 Ok(found)
1924 };
1925
1926 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1927 .await
1928 }
1929
1930 pub async fn update_source_props_by_source_id(
1931 &self,
1932 source_id: SourceId,
1933 alter_props: BTreeMap<String, String>,
1934 alter_secret_refs: BTreeMap<String, PbSecretRef>,
1935 ) -> MetaResult<WithOptionsSecResolved> {
1936 let inner = self.inner.read().await;
1937 let txn = inner.db.begin().await?;
1938
1939 let (source, _obj) = Source::find_by_id(source_id)
1940 .find_also_related(Object)
1941 .one(&txn)
1942 .await?
1943 .ok_or_else(|| {
1944 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1945 })?;
1946 let connector = source.with_properties.0.get_connector().unwrap();
1947 let is_shared_source = source.is_shared();
1948
1949 let mut dep_source_job_ids: Vec<JobId> = Vec::new();
1950 if !is_shared_source {
1951 dep_source_job_ids = ObjectDependency::find()
1953 .select_only()
1954 .column(object_dependency::Column::UsedBy)
1955 .filter(object_dependency::Column::Oid.eq(source_id))
1956 .into_tuple()
1957 .all(&txn)
1958 .await?;
1959 }
1960
1961 let prop_keys: Vec<String> = alter_props
1963 .keys()
1964 .chain(alter_secret_refs.keys())
1965 .cloned()
1966 .collect();
1967 risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
1968 &connector, &prop_keys,
1969 )?;
1970
1971 let mut options_with_secret = WithOptionsSecResolved::new(
1972 source.with_properties.0.clone(),
1973 source
1974 .secret_ref
1975 .map(|secret_ref| secret_ref.to_protobuf())
1976 .unwrap_or_default(),
1977 );
1978 let (to_add_secret_dep, to_remove_secret_dep) =
1979 options_with_secret.handle_update(alter_props, alter_secret_refs)?;
1980
1981 tracing::info!(
1982 "applying new properties to source: source_id={}, options_with_secret={:?}",
1983 source_id,
1984 options_with_secret
1985 );
1986 let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
1988 let mut associate_table_id = None;
1991
1992 let mut preferred_id = source_id.as_object_id();
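// Re-parse the original DDL and rewrite its WITH clause so that the stored definition
// reflects the new properties; secret-backed options are rendered as `SECRET <name>`
// rather than plaintext values.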
1996 let rewrite_sql = {
1997 let definition = source.definition.clone();
1998
1999 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2000 .map_err(|e| {
2001 MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
2002 anyhow!(e).context("Failed to parse source definition SQL"),
2003 )))
2004 })?
2005 .try_into()
2006 .unwrap();
2007
2008 async fn format_with_option_secret_resolved(
2022 txn: &DatabaseTransaction,
2023 options_with_secret: &WithOptionsSecResolved,
2024 ) -> MetaResult<Vec<SqlOption>> {
2025 let mut options = Vec::new();
2026 for (k, v) in options_with_secret.as_plaintext() {
2027 let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
2028 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2029 options.push(sql_option);
2030 }
2031 for (k, v) in options_with_secret.as_secret() {
2032 if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
2033 let sql_option =
2034 SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
2035 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2036 options.push(sql_option);
2037 } else {
2038 return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
2039 }
2040 }
2041 Ok(options)
2042 }
2043
2044 match &mut stmt {
2045 Statement::CreateSource { stmt } => {
2046 stmt.with_properties.0 =
2047 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2048 }
2049 Statement::CreateTable { with_options, .. } => {
2050 *with_options =
2051 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2052 associate_table_id = source.optional_associated_table_id;
2053 preferred_id = associate_table_id.unwrap().as_object_id();
2054 }
2055 _ => unreachable!(),
2056 }
2057
2058 stmt.to_string()
2059 };
2060
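// Keep the secret dependency edges in sync: newly referenced secrets become dependencies
// of the source (or its associated table), and secrets no longer referenced are removed.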
2061 {
2062 if !to_add_secret_dep.is_empty() {
2064 ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2065 object_dependency::ActiveModel {
2066 oid: Set(secret_id.into()),
2067 used_by: Set(preferred_id),
2068 ..Default::default()
2069 }
2070 }))
2071 .exec(&txn)
2072 .await?;
2073 }
2074 if !to_remove_secret_dep.is_empty() {
2075 let _ = ObjectDependency::delete_many()
2077 .filter(
2078 object_dependency::Column::Oid
2079 .is_in(to_remove_secret_dep)
2080 .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2081 )
2082 .exec(&txn)
2083 .await?;
2084 }
2085 }
2086
2087 let active_source_model = source::ActiveModel {
2088 source_id: Set(source_id),
2089 definition: Set(rewrite_sql.clone()),
2090 with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2091 secret_ref: Set((!options_with_secret.as_secret().is_empty())
2092 .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2093 ..Default::default()
2094 };
2095 active_source_model.update(&txn).await?;
2096
2097 if let Some(associate_table_id) = associate_table_id {
2098 let active_table_model = table::ActiveModel {
2100 table_id: Set(associate_table_id),
2101 definition: Set(rewrite_sql),
2102 ..Default::default()
2103 };
2104 active_table_model.update(&txn).await?;
2105 }
2106
2107 let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2108 associate_table_id.as_job_id()
2110 } else {
2111 source_id.as_share_source_job_id()
2112 }]
2113 .into_iter()
2114 .chain(dep_source_job_ids.into_iter())
2115 .collect_vec();
2116
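// Propagate the new properties into the `Source` nodes of all affected fragments, including
// the fragments of jobs that depend on a non-shared source.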
2117 update_connector_props_fragments(
2119 &txn,
2120 to_check_job_ids,
2121 FragmentTypeFlag::Source,
2122 |node, found| {
2123 if let PbNodeBody::Source(node) = node
2124 && let Some(source_inner) = &mut node.source_inner
2125 {
2126 source_inner.with_properties = options_with_secret.as_plaintext().clone();
2127 source_inner.secret_refs = options_with_secret.as_secret().clone();
2128 *found = true;
2129 }
2130 },
2131 is_shared_source,
2132 )
2133 .await?;
2134
2135 let mut to_update_objs = Vec::with_capacity(2);
2136 let (source, obj) = Source::find_by_id(source_id)
2137 .find_also_related(Object)
2138 .one(&txn)
2139 .await?
2140 .ok_or_else(|| {
2141 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2142 })?;
2143 to_update_objs.push(PbObject {
2144 object_info: Some(PbObjectInfo::Source(
2145 ObjectModel(source, obj.unwrap()).into(),
2146 )),
2147 });
2148
2149 if let Some(associate_table_id) = associate_table_id {
2150 let (table, obj) = Table::find_by_id(associate_table_id)
2151 .find_also_related(Object)
2152 .one(&txn)
2153 .await?
2154 .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2155 to_update_objs.push(PbObject {
2156 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
2157 });
2158 }
2159
2160 txn.commit().await?;
2161
2162 self.notify_frontend(
2163 NotificationOperation::Update,
2164 NotificationInfo::ObjectGroup(PbObjectGroup {
2165 objects: to_update_objs,
2166 }),
2167 )
2168 .await;
2169
2170 Ok(options_with_secret)
2171 }
2172
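/// Apply new WITH properties to a sink on the fly.
///
/// The altered fields are validated against the connector's allow-list, the stored
/// `CREATE SINK` definition and the sink catalog row are rewritten, the `SinkDesc` carried by
/// the sink fragments is updated, and the frontend is notified. Returns the applied properties.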
2173 pub async fn update_sink_props_by_sink_id(
2174 &self,
2175 sink_id: SinkId,
2176 props: BTreeMap<String, String>,
2177 ) -> MetaResult<HashMap<String, String>> {
2178 let inner = self.inner.read().await;
2179 let txn = inner.db.begin().await?;
2180
2181 let (sink, _obj) = Sink::find_by_id(sink_id)
2182 .find_also_related(Object)
2183 .one(&txn)
2184 .await?
2185 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2186 validate_sink_props(&sink, &props)?;
2187 let definition = sink.definition.clone();
2188 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2189 .map_err(|e| SinkError::Config(anyhow!(e)))?
2190 .try_into()
2191 .unwrap();
2192 if let Statement::CreateSink { stmt } = &mut stmt {
2193 update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2194 } else {
2195 panic!("definition is not a create sink statement")
2196 }
2197 let mut new_config = sink.properties.clone().into_inner();
2198 new_config.extend(props.clone());
2199
2200 let definition = stmt.to_string();
2201 let active_sink = sink::ActiveModel {
2202 sink_id: Set(sink_id),
2203 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2204 definition: Set(definition),
2205 ..Default::default()
2206 };
2207 active_sink.update(&txn).await?;
2208
2209 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2210 let (sink, obj) = Sink::find_by_id(sink_id)
2211 .find_also_related(Object)
2212 .one(&txn)
2213 .await?
2214 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2215 txn.commit().await?;
2216 let relation_infos = vec![PbObject {
2217 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2218 }];
2219
2220 let _version = self
2221 .notify_frontend(
2222 NotificationOperation::Update,
2223 NotificationInfo::ObjectGroup(PbObjectGroup {
2224 objects: relation_infos,
2225 }),
2226 )
2227 .await;
2228
2229 Ok(props.into_iter().collect())
2230 }
2231
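/// Apply new WITH properties to an iceberg table (a table created with `ENGINE = iceberg`).
///
/// Such a table is backed by a hidden sink/source pair, so the properties are validated and
/// applied like a sink alteration, while the rewritten `CREATE TABLE` definition is stored on
/// the sink, source and table catalog rows. Returns the applied properties together with the
/// id of the underlying sink.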
2232 pub async fn update_iceberg_table_props_by_table_id(
2233 &self,
2234 table_id: TableId,
2235 props: BTreeMap<String, String>,
2236 alter_iceberg_table_props: Option<
2237 risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2238 >,
2239 ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2240 let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props
2241 .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2242 let inner = self.inner.read().await;
2243 let txn = inner.db.begin().await?;
2244
2245 let (sink, _obj) = Sink::find_by_id(sink_id)
2246 .find_also_related(Object)
2247 .one(&txn)
2248 .await?
2249 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2250 validate_sink_props(&sink, &props)?;
2251
2252 let definition = sink.definition.clone();
2253 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2254 .map_err(|e| SinkError::Config(anyhow!(e)))?
2255 .try_into()
2256 .unwrap();
2257 if let Statement::CreateTable {
2258 with_options,
2259 engine,
2260 ..
2261 } = &mut stmt
2262 {
2263 if !matches!(engine, Engine::Iceberg) {
2264 return Err(SinkError::Config(anyhow!(
2265 "only iceberg table can be altered as sink"
2266 ))
2267 .into());
2268 }
2269 update_stmt_with_props(with_options, &props)?;
2270 } else {
2271 panic!("definition is not a create iceberg table statement")
2272 }
2273 let mut new_config = sink.properties.clone().into_inner();
2274 new_config.extend(props.clone());
2275
2276 let definition = stmt.to_string();
2277 let active_sink = sink::ActiveModel {
2278 sink_id: Set(sink_id),
2279 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2280 definition: Set(definition.clone()),
2281 ..Default::default()
2282 };
2283 let active_source = source::ActiveModel {
2284 source_id: Set(source_id),
2285 definition: Set(definition.clone()),
2286 ..Default::default()
2287 };
2288 let active_table = table::ActiveModel {
2289 table_id: Set(table_id),
2290 definition: Set(definition),
2291 ..Default::default()
2292 };
2293 active_sink.update(&txn).await?;
2294 active_source.update(&txn).await?;
2295 active_table.update(&txn).await?;
2296
2297 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2298
2299 let (sink, sink_obj) = Sink::find_by_id(sink_id)
2300 .find_also_related(Object)
2301 .one(&txn)
2302 .await?
2303 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2304 let (source, source_obj) = Source::find_by_id(source_id)
2305 .find_also_related(Object)
2306 .one(&txn)
2307 .await?
2308 .ok_or_else(|| {
2309 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2310 })?;
2311 let (table, table_obj) = Table::find_by_id(table_id)
2312 .find_also_related(Object)
2313 .one(&txn)
2314 .await?
2315 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2316 txn.commit().await?;
2317 let relation_infos = vec![
2318 PbObject {
2319 object_info: Some(PbObjectInfo::Sink(
2320 ObjectModel(sink, sink_obj.unwrap()).into(),
2321 )),
2322 },
2323 PbObject {
2324 object_info: Some(PbObjectInfo::Source(
2325 ObjectModel(source, source_obj.unwrap()).into(),
2326 )),
2327 },
2328 PbObject {
2329 object_info: Some(PbObjectInfo::Table(
2330 ObjectModel(table, table_obj.unwrap()).into(),
2331 )),
2332 },
2333 ];
2334 let _version = self
2335 .notify_frontend(
2336 NotificationOperation::Update,
2337 NotificationInfo::ObjectGroup(PbObjectGroup {
2338 objects: relation_infos,
2339 }),
2340 )
2341 .await;
2342
2343 Ok((props.into_iter().collect(), sink_id))
2344 }
2345
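/// Update the rate limit of every rate-limitable node (DML, sink, stream scan, CDC scan and
/// source backfill) within a single fragment, regardless of which streaming job it belongs to.
/// Returns the affected fragment id together with its actor ids.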
2346 pub async fn update_fragment_rate_limit_by_fragment_id(
2347 &self,
2348 fragment_id: FragmentId,
2349 rate_limit: Option<u32>,
2350 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
2351 let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2352 stream_node: &mut PbStreamNode| {
2353 let mut found = false;
2354 if fragment_type_mask.contains_any(
2355 FragmentTypeFlag::dml_rate_limit_fragments()
2356 .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2357 .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2358 .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2359 ) {
2360 visit_stream_node_mut(stream_node, |node| {
2361 if let PbNodeBody::Dml(node) = node {
2362 node.rate_limit = rate_limit;
2363 found = true;
2364 }
2365 if let PbNodeBody::Sink(node) = node {
2366 node.rate_limit = rate_limit;
2367 found = true;
2368 }
2369 if let PbNodeBody::StreamCdcScan(node) = node {
2370 node.rate_limit = rate_limit;
2371 found = true;
2372 }
2373 if let PbNodeBody::StreamScan(node) = node {
2374 node.rate_limit = rate_limit;
2375 found = true;
2376 }
2377 if let PbNodeBody::SourceBackfill(node) = node {
2378 node.rate_limit = rate_limit;
2379 found = true;
2380 }
2381 });
2382 }
2383 found
2384 };
2385 self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2386 .await
2387 }
2388
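/// List the rate limits currently set on any rate-limitable node (source, fs fetch,
/// source backfill, stream scan, CDC scan and sink) across all fragments.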
2389 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2392 let inner = self.inner.read().await;
2393 let txn = inner.db.begin().await?;
2394
2395 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2396 .select_only()
2397 .columns([
2398 fragment::Column::FragmentId,
2399 fragment::Column::JobId,
2400 fragment::Column::FragmentTypeMask,
2401 fragment::Column::StreamNode,
2402 ])
2403 .filter(FragmentTypeMask::intersects_any(
2404 FragmentTypeFlag::rate_limit_fragments(),
2405 ))
2406 .into_tuple()
2407 .all(&txn)
2408 .await?;
2409
2410 let mut rate_limits = Vec::new();
2411 for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2412 let stream_node = stream_node.to_protobuf();
2413 visit_stream_node_body(&stream_node, |node| {
2414 let mut rate_limit = None;
2415 let mut node_name = None;
2416
2417 match node {
2418 PbNodeBody::Source(node) => {
2420 if let Some(node_inner) = &node.source_inner {
2421 rate_limit = node_inner.rate_limit;
2422 node_name = Some("SOURCE");
2423 }
2424 }
2425 PbNodeBody::StreamFsFetch(node) => {
2426 if let Some(node_inner) = &node.node_inner {
2427 rate_limit = node_inner.rate_limit;
2428 node_name = Some("FS_FETCH");
2429 }
2430 }
2431 PbNodeBody::SourceBackfill(node) => {
2433 rate_limit = node.rate_limit;
2434 node_name = Some("SOURCE_BACKFILL");
2435 }
2436 PbNodeBody::StreamScan(node) => {
2437 rate_limit = node.rate_limit;
2438 node_name = Some("STREAM_SCAN");
2439 }
2440 PbNodeBody::StreamCdcScan(node) => {
2441 rate_limit = node.rate_limit;
2442 node_name = Some("STREAM_CDC_SCAN");
2443 }
2444 PbNodeBody::Sink(node) => {
2445 rate_limit = node.rate_limit;
2446 node_name = Some("SINK");
2447 }
2448 _ => {}
2449 }
2450
2451 if let Some(rate_limit) = rate_limit {
2452 rate_limits.push(RateLimitInfo {
2453 fragment_id,
2454 job_id,
2455 fragment_type_mask: fragment_type_mask as u32,
2456 rate_limit,
2457 node_name: node_name.unwrap().to_owned(),
2458 });
2459 }
2460 });
2461 }
2462
2463 Ok(rate_limits)
2464 }
2465}
2466
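/// Check that every property being altered is allowed to change on the fly for the sink's
/// connector, then let the connector implementation validate the merged configuration.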
2467fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
2468 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2470 Some(connector) => {
2471 let connector_type = connector.to_lowercase();
2472 let field_names: Vec<String> = props.keys().cloned().collect();
2473 check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2474 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2475
2476 match_sink_name_str!(
2477 connector_type.as_str(),
2478 SinkType,
2479 {
2480 let mut new_props = sink.properties.0.clone();
2481 new_props.extend(props.clone());
2482 SinkType::validate_alter_config(&new_props)
2483 },
2484 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
2485 )?
2486 }
2487 None => {
2488 return Err(
2489 SinkError::Config(anyhow!("connector not specified when altering sink")).into(),
2490 );
2491 }
2492 };
2493 Ok(())
2494}
2495
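/// Merge the given properties into the WITH clause of a parsed statement, overwriting options
/// that already exist (in place, preserving their original order) and appending new ones.
/// For example, altering an option that is already present in the WITH clause replaces its
/// value rather than producing a duplicate entry.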
2496fn update_stmt_with_props(
2497 with_properties: &mut Vec<SqlOption>,
2498 props: &BTreeMap<String, String>,
2499) -> MetaResult<()> {
2500 let mut new_sql_options = with_properties
2501 .iter()
2502 .map(|sql_option| (&sql_option.name, sql_option))
2503 .collect::<IndexMap<_, _>>();
2504 let add_sql_options = props
2505 .iter()
2506 .map(|(k, v)| SqlOption::try_from((k, v)))
2507 .collect::<Result<Vec<SqlOption>, ParserError>>()
2508 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2509 new_sql_options.extend(
2510 add_sql_options
2511 .iter()
2512 .map(|sql_option| (&sql_option.name, sql_option)),
2513 );
2514 *with_properties = new_sql_options.into_values().cloned().collect();
2515 Ok(())
2516}
2517
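/// Rewrite the `SinkDesc` properties in the stream node of every sink fragment belonging to
/// the given sink job, persisting the updated fragments within the same transaction.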
2518async fn update_sink_fragment_props(
2519 txn: &DatabaseTransaction,
2520 sink_id: SinkId,
2521 props: BTreeMap<String, String>,
2522) -> MetaResult<()> {
2523 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2524 .select_only()
2525 .columns([
2526 fragment::Column::FragmentId,
2527 fragment::Column::FragmentTypeMask,
2528 fragment::Column::StreamNode,
2529 ])
2530 .filter(fragment::Column::JobId.eq(sink_id))
2531 .into_tuple()
2532 .all(txn)
2533 .await?;
2534 let fragments = fragments
2535 .into_iter()
2536 .filter(|(_, fragment_type_mask, _)| {
2537 *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
2538 })
2539 .filter_map(|(id, _, stream_node)| {
2540 let mut stream_node = stream_node.to_protobuf();
2541 let mut found = false;
2542 visit_stream_node_mut(&mut stream_node, |node| {
2543 if let PbNodeBody::Sink(node) = node
2544 && let Some(sink_desc) = &mut node.sink_desc
2545 && sink_desc.id == sink_id
2546 {
2547 sink_desc.properties.extend(props.clone());
2548 found = true;
2549 }
2550 });
2551 if found { Some((id, stream_node)) } else { None }
2552 })
2553 .collect_vec();
2554 assert!(
2555 !fragments.is_empty(),
2556 "sink id should be used by at least one fragment"
2557 );
2558 for (id, stream_node) in fragments {
2559 fragment::ActiveModel {
2560 fragment_id: Set(id),
2561 stream_node: Set(StreamNode::from(&stream_node)),
2562 ..Default::default()
2563 }
2564 .update(txn)
2565 .await?;
2566 }
2567 Ok(())
2568}
2569
2570pub struct SinkIntoTableContext {
2571 pub updated_sink_catalogs: Vec<SinkId>,
2574}
2575
2576pub struct FinishAutoRefreshSchemaSinkContext {
2577 pub tmp_sink_id: SinkId,
2578 pub original_sink_id: SinkId,
2579 pub columns: Vec<PbColumnCatalog>,
2580 pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
2581}
2582
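/// Rewrite the stream nodes of fragments that carry `expect_flag` and belong to the given jobs,
/// applying `alter_stream_node_fn` to each node and persisting only the fragments it changed.
/// For shared sources, or when more than one job is involved, at least one fragment is expected
/// to match.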
2583async fn update_connector_props_fragments<F>(
2584 txn: &DatabaseTransaction,
2585 job_ids: Vec<JobId>,
2586 expect_flag: FragmentTypeFlag,
2587 mut alter_stream_node_fn: F,
2588 is_shared_source: bool,
2589) -> MetaResult<()>
2590where
2591 F: FnMut(&mut PbNodeBody, &mut bool),
2592{
2593 let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
2594 .select_only()
2595 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
2596 .filter(
2597 fragment::Column::JobId
2598 .is_in(job_ids.clone())
2599 .and(FragmentTypeMask::intersects(expect_flag)),
2600 )
2601 .into_tuple()
2602 .all(txn)
2603 .await?;
2604 let fragments = fragments
2605 .into_iter()
2606 .filter_map(|(id, stream_node)| {
2607 let mut stream_node = stream_node.to_protobuf();
2608 let mut found = false;
2609 visit_stream_node_mut(&mut stream_node, |node| {
2610 alter_stream_node_fn(node, &mut found);
2611 });
2612 if found { Some((id, stream_node)) } else { None }
2613 })
2614 .collect_vec();
2615 if is_shared_source || job_ids.len() > 1 {
2616 assert!(
2620 !fragments.is_empty(),
2621 "job ids {:?} (type: {:?}) should be used by at least one fragment",
2622 job_ids,
2623 expect_flag
2624 );
2625 }
2626
2627 for (id, stream_node) in fragments {
2628 fragment::ActiveModel {
2629 fragment_id: Set(id),
2630 stream_node: Set(StreamNode::from(&stream_node)),
2631 ..Default::default()
2632 }
2633 .update(txn)
2634 .await?;
2635 }
2636
2637 Ok(())
2638}