1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
22use risingwave_common::config::DefaultParallelism;
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::JobId;
25use risingwave_common::secret::LocalSecretManager;
26use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
27use risingwave_common::util::iter_util::ZipEqDebug;
28use risingwave_common::util::stream_graph_visitor::{
29 visit_stream_node_body, visit_stream_node_mut,
30};
31use risingwave_common::{bail, current_cluster_version};
32use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
33use risingwave_connector::connector_common::validate_connection;
34use risingwave_connector::error::ConnectorError;
35use risingwave_connector::sink::file_sink::fs::FsSink;
36use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
37use risingwave_connector::source::{
38 ConnectorProperties, UPSTREAM_SOURCE_KEY, pb_connection_type_to_connection_type,
39};
40use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
41use risingwave_meta_model::object::ObjectType;
42use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
43use risingwave_meta_model::refresh_job::RefreshState;
44use risingwave_meta_model::streaming_job::BackfillOrders;
45use risingwave_meta_model::table::TableType;
46use risingwave_meta_model::user_privilege::Action;
47use risingwave_meta_model::*;
48use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
49use risingwave_pb::ddl_service::streaming_job_resource_type;
50use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
51use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
52use risingwave_pb::meta::object::PbObjectInfo;
53use risingwave_pb::meta::subscribe_response::{
54 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
55};
56use risingwave_pb::meta::{ObjectDependency as PbObjectDependency, PbObject, PbObjectGroup};
57use risingwave_pb::plan_common::PbColumnCatalog;
58use risingwave_pb::plan_common::source_refresh_mode::{
59 RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
60};
61use risingwave_pb::secret::PbSecretRef;
62use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
63use risingwave_pb::stream_plan::stream_node::PbNodeBody;
64use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
65use risingwave_pb::user::PbUserInfo;
66use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
67use risingwave_sqlparser::parser::{Parser, ParserError};
68use sea_orm::ActiveValue::Set;
69use sea_orm::sea_query::{Expr, Query, SimpleExpr};
70use sea_orm::{
71 ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
72 NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
73};
74use thiserror_ext::AsReport;
75
76use super::rename::IndexItemRewriter;
77use crate::barrier::Command;
78use crate::controller::ObjectModel;
79use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
80use crate::controller::fragment::FragmentTypeMaskExt;
81use crate::controller::utils::{
82 PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
83 check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
84 ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
85 get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
86 list_object_dependencies_by_object_id, list_user_info_by_ids,
87 try_get_iceberg_table_by_downstream_sink,
88};
89use crate::error::MetaErrorInner;
90use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
91use crate::model::{
92 FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
93 StreamJobFragmentsToCreate,
94};
95use crate::stream::SplitAssignment;
96use crate::{MetaError, MetaResult};
97
98#[derive(Debug)]
103struct DependentSourceFragmentUpdate {
104 job_ids: Vec<JobId>,
105 with_properties: BTreeMap<String, String>,
106 secret_refs: BTreeMap<String, PbSecretRef>,
107 is_shared_source: bool,
108}
109
110impl CatalogController {
111 #[allow(clippy::too_many_arguments)]
112 pub async fn create_streaming_job_obj(
113 txn: &DatabaseTransaction,
114 obj_type: ObjectType,
115 owner_id: UserId,
116 database_id: Option<DatabaseId>,
117 schema_id: Option<SchemaId>,
118 create_type: PbCreateType,
119 ctx: StreamContext,
120 streaming_parallelism: StreamingParallelism,
121 max_parallelism: usize,
122 resource_type: streaming_job_resource_type::ResourceType,
123 backfill_parallelism: Option<StreamingParallelism>,
124 ) -> MetaResult<JobId> {
125 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
126 let job_id = obj.oid.as_job_id();
127 let specific_resource_group = resource_type.resource_group();
128 let is_serverless_backfill = matches!(
129 &resource_type,
130 streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
131 );
132 let job = streaming_job::ActiveModel {
133 job_id: Set(job_id),
134 job_status: Set(JobStatus::Initial),
135 create_type: Set(create_type.into()),
136 timezone: Set(ctx.timezone),
137 config_override: Set(Some(ctx.config_override.to_string())),
138 adaptive_parallelism_strategy: Set(ctx
139 .adaptive_parallelism_strategy
140 .as_ref()
141 .map(ToString::to_string)),
142 parallelism: Set(streaming_parallelism),
143 backfill_parallelism: Set(backfill_parallelism),
144 backfill_orders: Set(None),
145 max_parallelism: Set(max_parallelism as _),
146 specific_resource_group: Set(specific_resource_group),
147 is_serverless_backfill: Set(is_serverless_backfill),
148 };
149 StreamingJobModel::insert(job).exec(txn).await?;
150
151 Ok(job_id)
152 }
153
154 #[await_tree::instrument]
160 pub async fn create_job_catalog(
161 &self,
162 streaming_job: &mut StreamingJob,
163 ctx: &StreamContext,
164 parallelism: &Option<Parallelism>,
165 max_parallelism: usize,
166 mut dependencies: HashSet<ObjectId>,
167 resource_type: streaming_job_resource_type::ResourceType,
168 backfill_parallelism: &Option<Parallelism>,
169 ) -> MetaResult<()> {
170 let inner = self.inner.write().await;
171 let txn = inner.db.begin().await?;
172 let create_type = streaming_job.create_type();
173
174 let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
175 (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
176 (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
177 (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
178 };
179 let backfill_parallelism = backfill_parallelism
180 .as_ref()
181 .map(|p| StreamingParallelism::Fixed(p.parallelism as _));
182
183 ensure_user_id(streaming_job.owner() as _, &txn).await?;
184 ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
185 ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
186 check_relation_name_duplicate(
187 &streaming_job.name(),
188 streaming_job.database_id(),
189 streaming_job.schema_id(),
190 &txn,
191 )
192 .await?;
193
194 if !dependencies.is_empty() {
196 let altering_cnt = ObjectDependency::find()
197 .join(
198 JoinType::InnerJoin,
199 object_dependency::Relation::Object1.def(),
200 )
201 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
202 .filter(
203 object_dependency::Column::Oid
204 .is_in(dependencies.clone())
205 .and(object::Column::ObjType.eq(ObjectType::Table))
206 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
207 .and(
208 object::Column::Oid.not_in_subquery(
210 Query::select()
211 .column(table::Column::TableId)
212 .from(Table)
213 .to_owned(),
214 ),
215 ),
216 )
217 .count(&txn)
218 .await?;
219 if altering_cnt != 0 {
220 return Err(MetaError::permission_denied(
221 "some dependent relations are being altered",
222 ));
223 }
224 }
225
226 match streaming_job {
227 StreamingJob::MaterializedView(table) => {
228 let job_id = Self::create_streaming_job_obj(
229 &txn,
230 ObjectType::Table,
231 table.owner as _,
232 Some(table.database_id),
233 Some(table.schema_id),
234 create_type,
235 ctx.clone(),
236 streaming_parallelism,
237 max_parallelism,
238 resource_type,
239 backfill_parallelism.clone(),
240 )
241 .await?;
242 table.id = job_id.as_mv_table_id();
243 let table_model: table::ActiveModel = table.clone().into();
244 Table::insert(table_model).exec(&txn).await?;
245 }
246 StreamingJob::Sink(sink) => {
247 if let Some(target_table_id) = sink.target_table
248 && check_sink_into_table_cycle(
249 target_table_id.into(),
250 dependencies.iter().cloned().collect(),
251 &txn,
252 )
253 .await?
254 {
255 bail!("Creating such a sink will result in circular dependency.");
256 }
257
258 let job_id = Self::create_streaming_job_obj(
259 &txn,
260 ObjectType::Sink,
261 sink.owner as _,
262 Some(sink.database_id),
263 Some(sink.schema_id),
264 create_type,
265 ctx.clone(),
266 streaming_parallelism,
267 max_parallelism,
268 resource_type,
269 backfill_parallelism.clone(),
270 )
271 .await?;
272 sink.id = job_id.as_sink_id();
273 let sink_model: sink::ActiveModel = sink.clone().into();
274 Sink::insert(sink_model).exec(&txn).await?;
275 }
276 StreamingJob::Table(src, table, _) => {
277 let job_id = Self::create_streaming_job_obj(
278 &txn,
279 ObjectType::Table,
280 table.owner as _,
281 Some(table.database_id),
282 Some(table.schema_id),
283 create_type,
284 ctx.clone(),
285 streaming_parallelism,
286 max_parallelism,
287 resource_type,
288 backfill_parallelism.clone(),
289 )
290 .await?;
291 table.id = job_id.as_mv_table_id();
292 if let Some(src) = src {
293 let src_obj = Self::create_object(
294 &txn,
295 ObjectType::Source,
296 src.owner as _,
297 Some(src.database_id),
298 Some(src.schema_id),
299 )
300 .await?;
301 src.id = src_obj.oid.as_source_id();
302 src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
303 table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
304 let source: source::ActiveModel = src.clone().into();
305 Source::insert(source).exec(&txn).await?;
306 }
307 let table_model: table::ActiveModel = table.clone().into();
308 Table::insert(table_model).exec(&txn).await?;
309
310 if table.refreshable {
311 let trigger_interval_secs = src
312 .as_ref()
313 .and_then(|source_catalog| source_catalog.refresh_mode)
314 .and_then(
315 |source_refresh_mode| match source_refresh_mode.refresh_mode {
316 Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
317 refresh_interval_sec,
318 })) => refresh_interval_sec,
319 Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
320 None => None,
321 },
322 );
323
324 RefreshJob::insert(refresh_job::ActiveModel {
325 table_id: Set(table.id),
326 last_trigger_time: Set(None),
327 trigger_interval_secs: Set(trigger_interval_secs),
328 current_status: Set(RefreshState::Idle),
329 last_success_time: Set(None),
330 })
331 .exec(&txn)
332 .await?;
333 }
334 }
335 StreamingJob::Index(index, table) => {
336 ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
337 let job_id = Self::create_streaming_job_obj(
338 &txn,
339 ObjectType::Index,
340 index.owner as _,
341 Some(index.database_id),
342 Some(index.schema_id),
343 create_type,
344 ctx.clone(),
345 streaming_parallelism,
346 max_parallelism,
347 resource_type,
348 backfill_parallelism.clone(),
349 )
350 .await?;
351 index.id = job_id.as_index_id();
353 index.index_table_id = job_id.as_mv_table_id();
354 table.id = job_id.as_mv_table_id();
355
356 ObjectDependency::insert(object_dependency::ActiveModel {
357 oid: Set(index.primary_table_id.into()),
358 used_by: Set(table.id.into()),
359 ..Default::default()
360 })
361 .exec(&txn)
362 .await?;
363
364 let table_model: table::ActiveModel = table.clone().into();
365 Table::insert(table_model).exec(&txn).await?;
366 let index_model: index::ActiveModel = index.clone().into();
367 Index::insert(index_model).exec(&txn).await?;
368 }
369 StreamingJob::Source(src) => {
370 let job_id = Self::create_streaming_job_obj(
371 &txn,
372 ObjectType::Source,
373 src.owner as _,
374 Some(src.database_id),
375 Some(src.schema_id),
376 create_type,
377 ctx.clone(),
378 streaming_parallelism,
379 max_parallelism,
380 resource_type,
381 backfill_parallelism.clone(),
382 )
383 .await?;
384 src.id = job_id.as_shared_source_id();
385 let source_model: source::ActiveModel = src.clone().into();
386 Source::insert(source_model).exec(&txn).await?;
387 }
388 }
389
390 dependencies.extend(
392 streaming_job
393 .dependent_secret_ids()?
394 .into_iter()
395 .map(|id| id.as_object_id()),
396 );
397 dependencies.extend(
399 streaming_job
400 .dependent_connection_ids()?
401 .into_iter()
402 .map(|id| id.as_object_id()),
403 );
404
405 if !dependencies.is_empty() {
407 ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
408 object_dependency::ActiveModel {
409 oid: Set(oid),
410 used_by: Set(streaming_job.id().as_object_id()),
411 ..Default::default()
412 }
413 }))
414 .exec(&txn)
415 .await?;
416 }
417
418 txn.commit().await?;
419
420 Ok(())
421 }
422
423 pub async fn create_internal_table_catalog(
431 &self,
432 job: &StreamingJob,
433 mut incomplete_internal_tables: Vec<PbTable>,
434 ) -> MetaResult<HashMap<TableId, TableId>> {
435 let job_id = job.id();
436 let inner = self.inner.write().await;
437 let txn = inner.db.begin().await?;
438
439 ensure_job_not_canceled(job_id, &txn).await?;
441
442 let mut table_id_map = HashMap::new();
443 for table in &mut incomplete_internal_tables {
444 let table_id = Self::create_object(
445 &txn,
446 ObjectType::Table,
447 table.owner as _,
448 Some(table.database_id),
449 Some(table.schema_id),
450 )
451 .await?
452 .oid
453 .as_table_id();
454 table_id_map.insert(table.id, table_id);
455 table.id = table_id;
456 table.job_id = Some(job_id);
457
458 let table_model = table::ActiveModel {
459 table_id: Set(table_id),
460 belongs_to_job_id: Set(Some(job_id)),
461 fragment_id: NotSet,
462 ..table.clone().into()
463 };
464 Table::insert(table_model).exec(&txn).await?;
465 }
466 txn.commit().await?;
467
468 Ok(table_id_map)
469 }
470
471 pub async fn prepare_stream_job_fragments(
472 &self,
473 stream_job_fragments: &StreamJobFragmentsToCreate,
474 streaming_job: &StreamingJob,
475 for_replace: bool,
476 backfill_orders: Option<BackfillOrders>,
477 ) -> MetaResult<()> {
478 self.prepare_streaming_job(
479 stream_job_fragments.stream_job_id(),
480 || stream_job_fragments.fragments.values(),
481 &stream_job_fragments.downstreams,
482 for_replace,
483 Some(streaming_job),
484 backfill_orders,
485 )
486 .await
487 }
488
489 #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
494 )]
495 pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
496 &self,
497 job_id: JobId,
498 get_fragments: impl Fn() -> I + 'a,
499 downstreams: &FragmentDownstreamRelation,
500 for_replace: bool,
501 creating_streaming_job: Option<&'a StreamingJob>,
502 backfill_orders: Option<BackfillOrders>,
503 ) -> MetaResult<()> {
504 let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;
505
506 let inner = self.inner.write().await;
507
508 let need_notify = creating_streaming_job
509 .map(|job| job.should_notify_creating())
510 .unwrap_or(false);
511 let definition = creating_streaming_job.map(|job| job.definition());
512
513 let mut objects_to_notify = vec![];
514 let txn = inner.db.begin().await?;
515
516 ensure_job_not_canceled(job_id, &txn).await?;
518
519 if let Some(backfill_orders) = backfill_orders {
520 let job = streaming_job::ActiveModel {
521 job_id: Set(job_id),
522 backfill_orders: Set(Some(backfill_orders)),
523 ..Default::default()
524 };
525 StreamingJobModel::update(job).exec(&txn).await?;
526 }
527
528 let state_table_ids = fragments
529 .iter()
530 .flat_map(|fragment| fragment.state_table_ids.inner_ref().clone())
531 .collect_vec();
532
533 if !fragments.is_empty() {
534 let fragment_models = fragments
535 .into_iter()
536 .map(|fragment| fragment.into_active_model())
537 .collect_vec();
538 Fragment::insert_many(fragment_models).exec(&txn).await?;
539 }
540
541 if !for_replace {
544 let all_tables = StreamJobFragments::collect_tables(get_fragments());
545 for state_table_id in state_table_ids {
546 let table = all_tables
550 .get(&state_table_id)
551 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
552 assert_eq!(table.id, state_table_id);
553 let vnode_count = table.vnode_count();
554
555 Table::update(table::ActiveModel {
556 table_id: Set(state_table_id as _),
557 fragment_id: Set(Some(table.fragment_id)),
558 vnode_count: Set(vnode_count as _),
559 ..Default::default()
560 })
561 .exec(&txn)
562 .await?;
563
564 if need_notify {
565 let mut table = table.clone();
566 if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
568 table.definition = definition.clone().unwrap();
569 }
570 objects_to_notify.push(PbObject {
571 object_info: Some(PbObjectInfo::Table(table)),
572 });
573 }
574 }
575 }
576
577 if need_notify {
579 match creating_streaming_job.unwrap() {
580 StreamingJob::Table(Some(source), ..) => {
581 objects_to_notify.push(PbObject {
582 object_info: Some(PbObjectInfo::Source(source.clone())),
583 });
584 }
585 StreamingJob::Sink(sink) => {
586 objects_to_notify.push(PbObject {
587 object_info: Some(PbObjectInfo::Sink(sink.clone())),
588 });
589 }
590 StreamingJob::Index(index, _) => {
591 objects_to_notify.push(PbObject {
592 object_info: Some(PbObjectInfo::Index(index.clone())),
593 });
594 }
595 _ => {}
596 }
597 }
598
599 insert_fragment_relations(&txn, downstreams).await?;
600
601 if !for_replace {
602 if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
604 Table::update(table::ActiveModel {
605 table_id: Set(table.id),
606 dml_fragment_id: Set(table.dml_fragment_id),
607 ..Default::default()
608 })
609 .exec(&txn)
610 .await?;
611 }
612 }
613
614 txn.commit().await?;
615
616 if !objects_to_notify.is_empty() {
620 self.notify_frontend(
621 Operation::Add,
622 Info::ObjectGroup(PbObjectGroup {
623 objects: objects_to_notify,
624 dependencies: vec![],
625 }),
626 )
627 .await;
628 }
629
630 Ok(())
631 }
632
633 pub async fn build_cancel_command(
636 &self,
637 table_fragments: &StreamJobFragments,
638 ) -> MetaResult<Command> {
639 let inner = self.inner.read().await;
640 let txn = inner.db.begin().await?;
641
642 let dropped_sink_fragment_with_target =
643 if let Some(sink_fragment) = table_fragments.sink_fragment() {
644 let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
645 let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
646 sink_target_fragment
647 .get(&sink_fragment_id)
648 .map(|target_fragments| {
649 let target_fragment_id = *target_fragments
650 .first()
651 .expect("sink should have at least one downstream fragment");
652 (sink_fragment_id, target_fragment_id)
653 })
654 } else {
655 None
656 };
657
658 Ok(Command::DropStreamingJobs {
659 streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
660 actors: table_fragments.actor_ids().collect(),
661 unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
662 unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
663 dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
664 .into_iter()
665 .map(|(sink, target)| (target as _, vec![sink as _]))
666 .collect(),
667 })
668 }
669
670 #[await_tree::instrument]
674 pub async fn try_abort_creating_streaming_job(
675 &self,
676 mut job_id: JobId,
677 is_cancelled: bool,
678 ) -> MetaResult<(bool, Option<DatabaseId>)> {
679 let mut inner = self.inner.write().await;
680 let txn = inner.db.begin().await?;
681
682 let obj = Object::find_by_id(job_id).one(&txn).await?;
683 let Some(obj) = obj else {
684 tracing::warn!(
685 id = %job_id,
686 "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
687 );
688 return Ok((true, None));
689 };
690 let database_id = obj
691 .database_id
692 .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
693 let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
694
695 if !is_cancelled && let Some(streaming_job) = &streaming_job {
696 assert_ne!(streaming_job.job_status, JobStatus::Created);
697 if streaming_job.create_type == CreateType::Background
698 && streaming_job.job_status == JobStatus::Creating
699 {
700 if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
701 && check_if_belongs_to_iceberg_table(&txn, job_id).await?
702 {
703 } else {
705 tracing::warn!(
707 id = %job_id,
708 "streaming job is created in background and still in creating status"
709 );
710 return Ok((false, Some(database_id)));
711 }
712 }
713 }
714
715 let original_job_id = job_id;
717 let original_obj_type = obj.obj_type;
718
719 let iceberg_table_id =
720 try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
721 if let Some(iceberg_table_id) = iceberg_table_id {
722 let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
725 Object::delete_many()
726 .filter(
727 object::Column::Oid
728 .eq(job_id)
729 .or(object::Column::Oid.is_in(internal_tables)),
730 )
731 .exec(&txn)
732 .await?;
733 job_id = iceberg_table_id.as_job_id();
734 };
735
736 let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
737
738 let mut objs = vec![];
740 let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;
741
742 let mut need_notify =
743 streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
744 if !need_notify {
745 if let Some(table) = &table_obj {
746 need_notify = table.table_type == TableType::MaterializedView;
747 } else if original_obj_type == ObjectType::Sink {
748 need_notify = true;
749 }
750 }
751
752 if is_cancelled {
753 let dropped_tables = Table::find()
754 .find_also_related(Object)
755 .filter(
756 table::Column::TableId.is_in(
757 internal_table_ids
758 .iter()
759 .cloned()
760 .chain(table_obj.iter().map(|t| t.table_id as _)),
761 ),
762 )
763 .all(&txn)
764 .await?
765 .into_iter()
766 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
767 inner
768 .dropped_tables
769 .extend(dropped_tables.map(|t| (t.id, t)));
770 }
771
772 if need_notify {
773 if original_obj_type == ObjectType::Sink && original_job_id != job_id {
776 let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
777 .select_only()
778 .columns([
779 object::Column::Oid,
780 object::Column::ObjType,
781 object::Column::SchemaId,
782 object::Column::DatabaseId,
783 ])
784 .into_partial_model()
785 .one(&txn)
786 .await?;
787 if let Some(orig_obj) = orig_obj {
788 objs.push(orig_obj);
789 }
790 }
791
792 let obj: Option<PartialObject> = Object::find_by_id(job_id)
793 .select_only()
794 .columns([
795 object::Column::Oid,
796 object::Column::ObjType,
797 object::Column::SchemaId,
798 object::Column::DatabaseId,
799 ])
800 .into_partial_model()
801 .one(&txn)
802 .await?;
803 let obj =
804 obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
805 objs.push(obj);
806 let internal_table_objs: Vec<PartialObject> = Object::find()
807 .select_only()
808 .columns([
809 object::Column::Oid,
810 object::Column::ObjType,
811 object::Column::SchemaId,
812 object::Column::DatabaseId,
813 ])
814 .join(JoinType::InnerJoin, object::Relation::Table.def())
815 .filter(table::Column::BelongsToJobId.eq(job_id))
816 .into_partial_model()
817 .all(&txn)
818 .await?;
819 objs.extend(internal_table_objs);
820 }
821
822 if table_obj.is_none()
824 && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
825 .select_only()
826 .column(sink::Column::TargetTable)
827 .into_tuple::<Option<TableId>>()
828 .one(&txn)
829 .await?
830 {
831 let tmp_id: Option<ObjectId> = ObjectDependency::find()
832 .select_only()
833 .column(object_dependency::Column::UsedBy)
834 .join(
835 JoinType::InnerJoin,
836 object_dependency::Relation::Object1.def(),
837 )
838 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
839 .filter(
840 object_dependency::Column::Oid
841 .eq(target_table_id)
842 .and(object::Column::ObjType.eq(ObjectType::Table))
843 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
844 )
845 .into_tuple()
846 .one(&txn)
847 .await?;
848 if let Some(tmp_id) = tmp_id {
849 tracing::warn!(
850 id = %tmp_id,
851 "aborting temp streaming job for sink into table"
852 );
853
854 Object::delete_by_id(tmp_id).exec(&txn).await?;
855 }
856 }
857
858 Object::delete_by_id(job_id).exec(&txn).await?;
859 if !internal_table_ids.is_empty() {
860 Object::delete_many()
861 .filter(object::Column::Oid.is_in(internal_table_ids))
862 .exec(&txn)
863 .await?;
864 }
865 if let Some(t) = &table_obj
866 && let Some(source_id) = t.optional_associated_source_id
867 {
868 Object::delete_by_id(source_id).exec(&txn).await?;
869 }
870
871 let err = if is_cancelled {
872 MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
873 } else {
874 MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
875 };
876 let abort_reason = format!("streaming job aborted {}", err.as_report());
877 for tx in inner
878 .creating_table_finish_notifier
879 .get_mut(&database_id)
880 .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
881 .into_iter()
882 .flatten()
883 .flatten()
884 {
885 let _ = tx.send(Err(abort_reason.clone()));
886 }
887 txn.commit().await?;
888
889 if !objs.is_empty() {
890 self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
893 .await;
894 }
895 Ok((true, Some(database_id)))
896 }
897
898 #[await_tree::instrument]
899 pub async fn post_collect_job_fragments(
900 &self,
901 job_id: JobId,
902 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
903 new_sink_downstream: Option<FragmentDownstreamRelation>,
904 split_assignment: Option<&SplitAssignment>,
905 ) -> MetaResult<()> {
906 let inner = self.inner.write().await;
907 let txn = inner.db.begin().await?;
908
909 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
910
911 if let Some(new_downstream) = new_sink_downstream {
912 insert_fragment_relations(&txn, &new_downstream).await?;
913 }
914
915 StreamingJobModel::update(streaming_job::ActiveModel {
917 job_id: Set(job_id),
918 job_status: Set(JobStatus::Creating),
919 ..Default::default()
920 })
921 .exec(&txn)
922 .await?;
923
924 if let Some(split_assignment) = split_assignment {
925 let fragment_splits = split_assignment
926 .iter()
927 .map(|(fragment_id, splits)| {
928 (
929 *fragment_id as _,
930 splits.values().flatten().cloned().collect_vec(),
931 )
932 })
933 .collect();
934
935 self.update_fragment_splits(&txn, &fragment_splits).await?;
936 }
937
938 txn.commit().await?;
939
940 Ok(())
941 }
942
943 pub async fn create_job_catalog_for_replace(
944 &self,
945 streaming_job: &StreamingJob,
946 ctx: Option<&StreamContext>,
947 specified_parallelism: Option<&NonZeroUsize>,
948 expected_original_max_parallelism: Option<usize>,
949 ) -> MetaResult<JobId> {
950 let id = streaming_job.id();
951 let inner = self.inner.write().await;
952 let txn = inner.db.begin().await?;
953
954 streaming_job.verify_version_for_replace(&txn).await?;
956 let referring_cnt = ObjectDependency::find()
958 .join(
959 JoinType::InnerJoin,
960 object_dependency::Relation::Object1.def(),
961 )
962 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
963 .filter(
964 object_dependency::Column::Oid
965 .eq(id)
966 .and(object::Column::ObjType.eq(ObjectType::Table))
967 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
968 )
969 .count(&txn)
970 .await?;
971 if referring_cnt != 0 {
972 return Err(MetaError::permission_denied(
973 "job is being altered or referenced by some creating jobs",
974 ));
975 }
976
977 let (
979 original_max_parallelism,
980 original_timezone,
981 original_config_override,
982 original_adaptive_strategy,
983 ): (i32, Option<String>, Option<String>, Option<String>) =
984 StreamingJobModel::find_by_id(id)
985 .select_only()
986 .column(streaming_job::Column::MaxParallelism)
987 .column(streaming_job::Column::Timezone)
988 .column(streaming_job::Column::ConfigOverride)
989 .column(streaming_job::Column::AdaptiveParallelismStrategy)
990 .into_tuple()
991 .one(&txn)
992 .await?
993 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
994
995 if let Some(max_parallelism) = expected_original_max_parallelism
996 && original_max_parallelism != max_parallelism as i32
997 {
998 bail!(
1001 "cannot use a different max parallelism \
1002 when replacing streaming job, \
1003 original: {}, new: {}",
1004 original_max_parallelism,
1005 max_parallelism
1006 );
1007 }
1008
1009 let parallelism = match specified_parallelism {
1010 None => StreamingParallelism::Adaptive,
1011 Some(n) => StreamingParallelism::Fixed(n.get() as _),
1012 };
1013
1014 let ctx = StreamContext {
1015 timezone: ctx
1016 .map(|ctx| ctx.timezone.clone())
1017 .unwrap_or(original_timezone),
1018 config_override: original_config_override.unwrap_or_default().into(),
1021 adaptive_parallelism_strategy: original_adaptive_strategy.as_deref().map(|s| {
1022 parse_strategy(s).expect("strategy should be validated before persisting")
1023 }),
1024 };
1025
1026 let new_obj_id = Self::create_streaming_job_obj(
1028 &txn,
1029 streaming_job.object_type(),
1030 streaming_job.owner() as _,
1031 Some(streaming_job.database_id() as _),
1032 Some(streaming_job.schema_id() as _),
1033 streaming_job.create_type(),
1034 ctx,
1035 parallelism,
1036 original_max_parallelism as _,
1037 streaming_job_resource_type::ResourceType::Regular(true),
1038 None,
1039 )
1040 .await?;
1041
1042 ObjectDependency::insert(object_dependency::ActiveModel {
1044 oid: Set(id.as_object_id()),
1045 used_by: Set(new_obj_id.as_object_id()),
1046 ..Default::default()
1047 })
1048 .exec(&txn)
1049 .await?;
1050
1051 txn.commit().await?;
1052
1053 Ok(new_obj_id)
1054 }
1055
1056 pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
1058 let mut inner = self.inner.write().await;
1059 let txn = inner.db.begin().await?;
1060
1061 if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
1063 tracing::info!(
1064 "streaming job {} is for iceberg table, wait for manual finish operation",
1065 job_id
1066 );
1067 return Ok(());
1068 }
1069
1070 let (notification_op, objects, updated_user_info, dependencies) =
1071 self.finish_streaming_job_inner(&txn, job_id).await?;
1072
1073 txn.commit().await?;
1074
1075 let mut version = self
1076 .notify_frontend(
1077 notification_op,
1078 NotificationInfo::ObjectGroup(PbObjectGroup {
1079 objects,
1080 dependencies,
1081 }),
1082 )
1083 .await;
1084
1085 if !updated_user_info.is_empty() {
1087 version = self.notify_users_update(updated_user_info).await;
1088 }
1089
1090 inner
1091 .creating_table_finish_notifier
1092 .values_mut()
1093 .for_each(|creating_tables| {
1094 if let Some(txs) = creating_tables.remove(&job_id) {
1095 for tx in txs {
1096 let _ = tx.send(Ok(version));
1097 }
1098 }
1099 });
1100
1101 Ok(())
1102 }
1103
1104 pub async fn finish_streaming_job_inner(
1106 &self,
1107 txn: &DatabaseTransaction,
1108 job_id: JobId,
1109 ) -> MetaResult<(
1110 Operation,
1111 Vec<risingwave_pb::meta::Object>,
1112 Vec<PbUserInfo>,
1113 Vec<PbObjectDependency>,
1114 )> {
1115 let job_type = Object::find_by_id(job_id)
1116 .select_only()
1117 .column(object::Column::ObjType)
1118 .into_tuple()
1119 .one(txn)
1120 .await?
1121 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1122
1123 let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
1124 .select_only()
1125 .column(streaming_job::Column::CreateType)
1126 .into_tuple()
1127 .one(txn)
1128 .await?
1129 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1130
1131 let res = Object::update_many()
1133 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
1134 .col_expr(
1135 object::Column::CreatedAtClusterVersion,
1136 current_cluster_version().into(),
1137 )
1138 .filter(object::Column::Oid.eq(job_id))
1139 .exec(txn)
1140 .await?;
1141 if res.rows_affected == 0 {
1142 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1143 }
1144
1145 let job = streaming_job::ActiveModel {
1147 job_id: Set(job_id),
1148 job_status: Set(JobStatus::Created),
1149 ..Default::default()
1150 };
1151 let streaming_job = Some(job.update(txn).await?);
1152
1153 let internal_table_objs = Table::find()
1155 .find_also_related(Object)
1156 .filter(table::Column::BelongsToJobId.eq(job_id))
1157 .all(txn)
1158 .await?;
1159 let mut objects = internal_table_objs
1160 .iter()
1161 .map(|(table, obj)| PbObject {
1162 object_info: Some(PbObjectInfo::Table(
1163 ObjectModel(table.clone(), obj.clone().unwrap(), streaming_job.clone()).into(),
1164 )),
1165 })
1166 .collect_vec();
1167 let mut notification_op = if create_type == CreateType::Background {
1168 NotificationOperation::Update
1169 } else {
1170 NotificationOperation::Add
1171 };
1172 let mut updated_user_info = vec![];
1173 let mut need_grant_default_privileges = true;
1174
1175 match job_type {
1176 ObjectType::Table => {
1177 let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1178 .find_also_related(Object)
1179 .one(txn)
1180 .await?
1181 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1182 if table.table_type == TableType::MaterializedView {
1183 notification_op = NotificationOperation::Update;
1184 }
1185
1186 if let Some(source_id) = table.optional_associated_source_id {
1187 let (src, obj) = Source::find_by_id(source_id)
1188 .find_also_related(Object)
1189 .one(txn)
1190 .await?
1191 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1192 objects.push(PbObject {
1193 object_info: Some(PbObjectInfo::Source(
1194 ObjectModel(src, obj.unwrap(), None).into(),
1195 )),
1196 });
1197 }
1198 objects.push(PbObject {
1199 object_info: Some(PbObjectInfo::Table(
1200 ObjectModel(table, obj.unwrap(), streaming_job).into(),
1201 )),
1202 });
1203 }
1204 ObjectType::Sink => {
1205 let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1206 .find_also_related(Object)
1207 .one(txn)
1208 .await?
1209 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1210 if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
1211 need_grant_default_privileges = false;
1212 }
1213 notification_op = NotificationOperation::Update;
1216 objects.push(PbObject {
1217 object_info: Some(PbObjectInfo::Sink(
1218 ObjectModel(sink, obj.unwrap(), streaming_job).into(),
1219 )),
1220 });
1221 }
1222 ObjectType::Index => {
1223 need_grant_default_privileges = false;
1224 let (index, obj) = Index::find_by_id(job_id.as_index_id())
1225 .find_also_related(Object)
1226 .one(txn)
1227 .await?
1228 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1229 {
1230 let (table, obj) = Table::find_by_id(index.index_table_id)
1231 .find_also_related(Object)
1232 .one(txn)
1233 .await?
1234 .ok_or_else(|| {
1235 MetaError::catalog_id_not_found("table", index.index_table_id)
1236 })?;
1237 objects.push(PbObject {
1238 object_info: Some(PbObjectInfo::Table(
1239 ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
1240 )),
1241 });
1242 }
1243
1244 let primary_table_privileges = UserPrivilege::find()
1247 .filter(
1248 user_privilege::Column::Oid
1249 .eq(index.primary_table_id)
1250 .and(user_privilege::Column::Action.eq(Action::Select)),
1251 )
1252 .all(txn)
1253 .await?;
1254 if !primary_table_privileges.is_empty() {
1255 let index_state_table_ids: Vec<TableId> = Table::find()
1256 .select_only()
1257 .column(table::Column::TableId)
1258 .filter(
1259 table::Column::BelongsToJobId
1260 .eq(job_id)
1261 .or(table::Column::TableId.eq(index.index_table_id)),
1262 )
1263 .into_tuple()
1264 .all(txn)
1265 .await?;
1266 let mut new_privileges = vec![];
1267 for privilege in &primary_table_privileges {
1268 for state_table_id in &index_state_table_ids {
1269 new_privileges.push(user_privilege::ActiveModel {
1270 id: Default::default(),
1271 oid: Set(state_table_id.as_object_id()),
1272 user_id: Set(privilege.user_id),
1273 action: Set(Action::Select),
1274 dependent_id: Set(privilege.dependent_id),
1275 granted_by: Set(privilege.granted_by),
1276 with_grant_option: Set(privilege.with_grant_option),
1277 });
1278 }
1279 }
1280 UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1281
1282 updated_user_info = list_user_info_by_ids(
1283 primary_table_privileges.into_iter().map(|p| p.user_id),
1284 txn,
1285 )
1286 .await?;
1287 }
1288
1289 objects.push(PbObject {
1290 object_info: Some(PbObjectInfo::Index(
1291 ObjectModel(index, obj.unwrap(), streaming_job).into(),
1292 )),
1293 });
1294 }
1295 ObjectType::Source => {
1296 let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1297 .find_also_related(Object)
1298 .one(txn)
1299 .await?
1300 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1301 objects.push(PbObject {
1302 object_info: Some(PbObjectInfo::Source(
1303 ObjectModel(source, obj.unwrap(), None).into(),
1304 )),
1305 });
1306 }
1307 _ => unreachable!("invalid job type: {:?}", job_type),
1308 }
1309
1310 if need_grant_default_privileges {
1311 updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1312 }
1313
1314 let dependencies =
1315 list_object_dependencies_by_object_id(txn, job_id.as_object_id()).await?;
1316
1317 Ok((notification_op, objects, updated_user_info, dependencies))
1318 }
1319
1320 pub async fn finish_replace_streaming_job(
1321 &self,
1322 tmp_id: JobId,
1323 streaming_job: StreamingJob,
1324 replace_upstream: FragmentReplaceUpstream,
1325 sink_into_table_context: SinkIntoTableContext,
1326 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1327 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1328 ) -> MetaResult<NotificationVersion> {
1329 let inner = self.inner.write().await;
1330 let txn = inner.db.begin().await?;
1331
1332 let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1333 tmp_id,
1334 replace_upstream,
1335 sink_into_table_context,
1336 &txn,
1337 streaming_job,
1338 drop_table_connector_ctx,
1339 auto_refresh_schema_sinks,
1340 )
1341 .await?;
1342
1343 txn.commit().await?;
1344
1345 let mut version = self
1346 .notify_frontend(
1347 NotificationOperation::Update,
1348 NotificationInfo::ObjectGroup(PbObjectGroup {
1349 objects,
1350 dependencies: vec![],
1351 }),
1352 )
1353 .await;
1354
1355 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1356 self.notify_users_update(user_infos).await;
1357 version = self
1358 .notify_frontend(
1359 NotificationOperation::Delete,
1360 build_object_group_for_delete(to_drop_objects),
1361 )
1362 .await;
1363 }
1364
1365 Ok(version)
1366 }
1367
1368 pub async fn finish_replace_streaming_job_inner(
1369 tmp_id: JobId,
1370 replace_upstream: FragmentReplaceUpstream,
1371 SinkIntoTableContext {
1372 updated_sink_catalogs,
1373 }: SinkIntoTableContext,
1374 txn: &DatabaseTransaction,
1375 streaming_job: StreamingJob,
1376 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1377 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1378 ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1379 let original_job_id = streaming_job.id();
1380 let job_type = streaming_job.job_type();
1381
1382 let mut index_item_rewriter = None;
1383
1384 match streaming_job {
1386 StreamingJob::Table(_source, table, _table_job_type) => {
1387 let original_column_catalogs =
1390 get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1391
1392 index_item_rewriter = Some({
1393 let original_columns = original_column_catalogs
1394 .to_protobuf()
1395 .into_iter()
1396 .map(|c| c.column_desc.unwrap())
1397 .collect_vec();
1398 let new_columns = table
1399 .columns
1400 .iter()
1401 .map(|c| c.column_desc.clone().unwrap())
1402 .collect_vec();
1403
1404 IndexItemRewriter {
1405 original_columns,
1406 new_columns,
1407 }
1408 });
1409
1410 for sink_id in updated_sink_catalogs {
1412 Sink::update(sink::ActiveModel {
1413 sink_id: Set(sink_id as _),
1414 original_target_columns: Set(Some(original_column_catalogs.clone())),
1415 ..Default::default()
1416 })
1417 .exec(txn)
1418 .await?;
1419 }
1420 let mut table = table::ActiveModel::from(table);
1422 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1423 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1424 {
1425 table.optional_associated_source_id = Set(None);
1427 }
1428
1429 Table::update(table).exec(txn).await?;
1430 }
1431 StreamingJob::Source(source) => {
1432 let source = source::ActiveModel::from(source);
1434 Source::update(source).exec(txn).await?;
1435 }
1436 StreamingJob::MaterializedView(table) => {
1437 let table = table::ActiveModel::from(table);
1439 Table::update(table).exec(txn).await?;
1440 }
1441 _ => unreachable!(
1442 "invalid streaming job type: {:?}",
1443 streaming_job.job_type_str()
1444 ),
1445 }
1446
1447 async fn finish_fragments(
1448 txn: &DatabaseTransaction,
1449 tmp_id: JobId,
1450 original_job_id: JobId,
1451 replace_upstream: FragmentReplaceUpstream,
1452 ) -> MetaResult<()> {
1453 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1457 .select_only()
1458 .columns([
1459 fragment::Column::FragmentId,
1460 fragment::Column::StateTableIds,
1461 ])
1462 .filter(fragment::Column::JobId.eq(tmp_id))
1463 .into_tuple()
1464 .all(txn)
1465 .await?;
1466 for (fragment_id, state_table_ids) in fragment_info {
1467 for state_table_id in state_table_ids.into_inner() {
1468 let state_table_id = TableId::new(state_table_id as _);
1469 Table::update(table::ActiveModel {
1470 table_id: Set(state_table_id),
1471 fragment_id: Set(Some(fragment_id)),
1472 ..Default::default()
1474 })
1475 .exec(txn)
1476 .await?;
1477 }
1478 }
1479
1480 Fragment::delete_many()
1482 .filter(fragment::Column::JobId.eq(original_job_id))
1483 .exec(txn)
1484 .await?;
1485 Fragment::update_many()
1486 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1487 .filter(fragment::Column::JobId.eq(tmp_id))
1488 .exec(txn)
1489 .await?;
1490
1491 for (fragment_id, fragment_replace_map) in replace_upstream {
1494 let (fragment_id, mut stream_node) =
1495 Fragment::find_by_id(fragment_id as FragmentId)
1496 .select_only()
1497 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1498 .into_tuple::<(FragmentId, StreamNode)>()
1499 .one(txn)
1500 .await?
1501 .map(|(id, node)| (id, node.to_protobuf()))
1502 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1503
1504 visit_stream_node_mut(&mut stream_node, |body| {
1505 if let PbNodeBody::Merge(m) = body
1506 && let Some(new_fragment_id) =
1507 fragment_replace_map.get(&m.upstream_fragment_id)
1508 {
1509 m.upstream_fragment_id = *new_fragment_id;
1510 }
1511 });
1512 Fragment::update(fragment::ActiveModel {
1513 fragment_id: Set(fragment_id),
1514 stream_node: Set(StreamNode::from(&stream_node)),
1515 ..Default::default()
1516 })
1517 .exec(txn)
1518 .await?;
1519 }
1520
1521 Object::delete_by_id(tmp_id).exec(txn).await?;
1523
1524 Ok(())
1525 }
1526
1527 finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1528
1529 let mut objects = vec![];
1531 match job_type {
1532 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1533 let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1534 .find_also_related(Object)
1535 .one(txn)
1536 .await?
1537 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1538 let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
1539 .one(txn)
1540 .await?;
1541 objects.push(PbObject {
1542 object_info: Some(PbObjectInfo::Table(
1543 ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
1544 )),
1545 })
1546 }
1547 StreamingJobType::Source => {
1548 let (source, source_obj) =
1549 Source::find_by_id(original_job_id.as_shared_source_id())
1550 .find_also_related(Object)
1551 .one(txn)
1552 .await?
1553 .ok_or_else(|| {
1554 MetaError::catalog_id_not_found("object", original_job_id)
1555 })?;
1556 objects.push(PbObject {
1557 object_info: Some(PbObjectInfo::Source(
1558 ObjectModel(source, source_obj.unwrap(), None).into(),
1559 )),
1560 })
1561 }
1562 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1563 }
1564
1565 if let Some(expr_rewriter) = index_item_rewriter {
1566 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1567 .select_only()
1568 .columns([index::Column::IndexId, index::Column::IndexItems])
1569 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1570 .into_tuple()
1571 .all(txn)
1572 .await?;
1573 for (index_id, nodes) in index_items {
1574 let mut pb_nodes = nodes.to_protobuf();
1575 pb_nodes
1576 .iter_mut()
1577 .for_each(|x| expr_rewriter.rewrite_expr(x));
1578 let index = index::ActiveModel {
1579 index_id: Set(index_id),
1580 index_items: Set(pb_nodes.into()),
1581 ..Default::default()
1582 }
1583 .update(txn)
1584 .await?;
1585 let (index_obj, streaming_job) = Object::find_by_id(index.index_id)
1586 .find_also_related(streaming_job::Entity)
1587 .one(txn)
1588 .await?
1589 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1590 objects.push(PbObject {
1591 object_info: Some(PbObjectInfo::Index(
1592 ObjectModel(index, index_obj, streaming_job).into(),
1593 )),
1594 });
1595 }
1596 }
1597
1598 if let Some(sinks) = auto_refresh_schema_sinks {
1599 for finish_sink_context in sinks {
1600 finish_fragments(
1601 txn,
1602 finish_sink_context.tmp_sink_id.as_job_id(),
1603 finish_sink_context.original_sink_id.as_job_id(),
1604 Default::default(),
1605 )
1606 .await?;
1607 let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1608 .find_also_related(Object)
1609 .one(txn)
1610 .await?
1611 .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1612 let sink_streaming_job =
1613 streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
1614 .one(txn)
1615 .await?;
1616 let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1617 Sink::update(sink::ActiveModel {
1618 sink_id: Set(finish_sink_context.original_sink_id),
1619 columns: Set(columns.clone()),
1620 ..Default::default()
1621 })
1622 .exec(txn)
1623 .await?;
1624 sink.columns = columns;
1625 objects.push(PbObject {
1626 object_info: Some(PbObjectInfo::Sink(
1627 ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job.clone()).into(),
1628 )),
1629 });
1630 if let Some((log_store_table_id, new_log_store_table_columns)) =
1631 finish_sink_context.new_log_store_table
1632 {
1633 let new_log_store_table_columns: ColumnCatalogArray =
1634 new_log_store_table_columns.into();
1635 let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1636 .find_also_related(Object)
1637 .one(txn)
1638 .await?
1639 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1640 Table::update(table::ActiveModel {
1641 table_id: Set(log_store_table_id),
1642 columns: Set(new_log_store_table_columns.clone()),
1643 ..Default::default()
1644 })
1645 .exec(txn)
1646 .await?;
1647 table.columns = new_log_store_table_columns;
1648 objects.push(PbObject {
1649 object_info: Some(PbObjectInfo::Table(
1650 ObjectModel(table, table_obj.unwrap(), sink_streaming_job.clone())
1651 .into(),
1652 )),
1653 });
1654 }
1655 }
1656 }
1657
1658 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1659 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1660 notification_objs =
1661 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1662 }
1663
1664 Ok((objects, notification_objs))
1665 }
1666
1667 pub async fn try_abort_replacing_streaming_job(
1669 &self,
1670 tmp_job_id: JobId,
1671 tmp_sink_ids: Option<Vec<ObjectId>>,
1672 ) -> MetaResult<()> {
1673 let inner = self.inner.write().await;
1674 let txn = inner.db.begin().await?;
1675 Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1676 if let Some(tmp_sink_ids) = tmp_sink_ids {
1677 for tmp_sink_id in tmp_sink_ids {
1678 Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1679 }
1680 }
1681 txn.commit().await?;
1682 Ok(())
1683 }
1684
1685 pub async fn update_source_rate_limit_by_source_id(
1688 &self,
1689 source_id: SourceId,
1690 rate_limit: Option<u32>,
1691 ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
1692 let inner = self.inner.read().await;
1693 let txn = inner.db.begin().await?;
1694
1695 {
1696 let active_source = source::ActiveModel {
1697 source_id: Set(source_id),
1698 rate_limit: Set(rate_limit.map(|v| v as i32)),
1699 ..Default::default()
1700 };
1701 Source::update(active_source).exec(&txn).await?;
1702 }
1703
1704 let (source, obj) = Source::find_by_id(source_id)
1705 .find_also_related(Object)
1706 .one(&txn)
1707 .await?
1708 .ok_or_else(|| {
1709 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1710 })?;
1711
1712 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1713 let streaming_job_ids: Vec<JobId> =
1714 if let Some(table_id) = source.optional_associated_table_id {
1715 vec![table_id.as_job_id()]
1716 } else if let Some(source_info) = &source.source_info
1717 && source_info.to_protobuf().is_shared()
1718 {
1719 vec![source_id.as_share_source_job_id()]
1720 } else {
1721 ObjectDependency::find()
1722 .select_only()
1723 .column(object_dependency::Column::UsedBy)
1724 .filter(object_dependency::Column::Oid.eq(source_id))
1725 .into_tuple()
1726 .all(&txn)
1727 .await?
1728 };
1729
1730 if streaming_job_ids.is_empty() {
1731 return Err(MetaError::invalid_parameter(format!(
1732 "source id {source_id} not used by any streaming job"
1733 )));
1734 }
1735
1736 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
1737 .select_only()
1738 .columns([
1739 fragment::Column::FragmentId,
1740 fragment::Column::JobId,
1741 fragment::Column::FragmentTypeMask,
1742 fragment::Column::StreamNode,
1743 ])
1744 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1745 .into_tuple()
1746 .all(&txn)
1747 .await?;
1748 let mut fragments = fragments
1749 .into_iter()
1750 .map(|(id, job_id, mask, stream_node)| {
1751 (
1752 id,
1753 job_id,
1754 FragmentTypeMask::from(mask as u32),
1755 stream_node.to_protobuf(),
1756 )
1757 })
1758 .collect_vec();
1759
1760 fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
1761 let mut found = false;
1762 if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1763 visit_stream_node_mut(stream_node, |node| {
1764 if let PbNodeBody::Source(node) = node
1765 && let Some(node_inner) = &mut node.source_inner
1766 && node_inner.source_id == source_id
1767 {
1768 node_inner.rate_limit = rate_limit;
1769 found = true;
1770 }
1771 });
1772 }
1773 if is_fs_source {
1774 visit_stream_node_mut(stream_node, |node| {
1777 if let PbNodeBody::StreamFsFetch(node) = node {
1778 fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1779 if let Some(node_inner) = &mut node.node_inner
1780 && node_inner.source_id == source_id
1781 {
1782 node_inner.rate_limit = rate_limit;
1783 found = true;
1784 }
1785 }
1786 });
1787 }
1788 found
1789 });
1790
1791 assert!(
1792 !fragments.is_empty(),
1793 "source id should be used by at least one fragment"
1794 );
1795
1796 let (fragment_ids, job_ids) = fragments
1797 .iter()
1798 .map(|(framgnet_id, job_id, _, _)| (framgnet_id, job_id))
1799 .unzip();
1800
1801 for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
1802 Fragment::update(fragment::ActiveModel {
1803 fragment_id: Set(fragment_id),
1804 fragment_type_mask: Set(fragment_type_mask.into()),
1805 stream_node: Set(StreamNode::from(&stream_node)),
1806 ..Default::default()
1807 })
1808 .exec(&txn)
1809 .await?;
1810 }
1811
1812 txn.commit().await?;
1813
1814 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap(), None).into());
1815 let _version = self
1816 .notify_frontend(
1817 NotificationOperation::Update,
1818 NotificationInfo::ObjectGroup(PbObjectGroup {
1819 objects: vec![PbObject {
1820 object_info: Some(relation_info),
1821 }],
1822 dependencies: vec![],
1823 }),
1824 )
1825 .await;
1826
1827 Ok((job_ids, fragment_ids))
1828 }
1829
1830 pub async fn mutate_fragments_by_job_id(
1833 &self,
1834 job_id: JobId,
1835 mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1837 err_msg: &'static str,
1839 ) -> MetaResult<HashSet<FragmentId>> {
1840 let inner = self.inner.read().await;
1841 let txn = inner.db.begin().await?;
1842
1843 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1844 .select_only()
1845 .columns([
1846 fragment::Column::FragmentId,
1847 fragment::Column::FragmentTypeMask,
1848 fragment::Column::StreamNode,
1849 ])
1850 .filter(fragment::Column::JobId.eq(job_id))
1851 .into_tuple()
1852 .all(&txn)
1853 .await?;
1854 let mut fragments = fragments
1855 .into_iter()
1856 .map(|(id, mask, stream_node)| {
1857 (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1858 })
1859 .collect_vec();
1860
1861 let fragments = fragments
1862 .iter_mut()
1863 .map(|(_, fragment_type_mask, stream_node)| {
1864 fragments_mutation_fn(*fragment_type_mask, stream_node)
1865 })
1866 .collect::<MetaResult<Vec<bool>>>()?
1867 .into_iter()
1868 .zip_eq_debug(std::mem::take(&mut fragments))
1869 .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1870 .collect::<Vec<_>>();
1871
1872 if fragments.is_empty() {
1873 return Err(MetaError::invalid_parameter(format!(
1874 "job id {job_id}: {}",
1875 err_msg
1876 )));
1877 }
1878
1879 let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1880 for (id, _, stream_node) in fragments {
1881 Fragment::update(fragment::ActiveModel {
1882 fragment_id: Set(id),
1883 stream_node: Set(StreamNode::from(&stream_node)),
1884 ..Default::default()
1885 })
1886 .exec(&txn)
1887 .await?;
1888 }
1889
1890 txn.commit().await?;
1891
1892 Ok(fragment_ids)
1893 }
1894
1895 async fn mutate_fragment_by_fragment_id(
1896 &self,
1897 fragment_id: FragmentId,
1898 mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1899 err_msg: &'static str,
1900 ) -> MetaResult<()> {
1901 let inner = self.inner.read().await;
1902 let txn = inner.db.begin().await?;
1903
1904 let (fragment_type_mask, stream_node): (i32, StreamNode) =
1905 Fragment::find_by_id(fragment_id)
1906 .select_only()
1907 .columns([
1908 fragment::Column::FragmentTypeMask,
1909 fragment::Column::StreamNode,
1910 ])
1911 .into_tuple()
1912 .one(&txn)
1913 .await?
1914 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1915 let mut pb_stream_node = stream_node.to_protobuf();
1916 let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1917
1918 if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1919 return Err(MetaError::invalid_parameter(format!(
1920 "fragment id {fragment_id}: {}",
1921 err_msg
1922 )));
1923 }
1924
1925 Fragment::update(fragment::ActiveModel {
1926 fragment_id: Set(fragment_id),
1927 stream_node: Set(stream_node),
1928 ..Default::default()
1929 })
1930 .exec(&txn)
1931 .await?;
1932
1933 txn.commit().await?;
1934
1935 Ok(())
1936 }
1937
1938 pub async fn update_backfill_orders_by_job_id(
1939 &self,
1940 job_id: JobId,
1941 backfill_orders: Option<BackfillOrders>,
1942 ) -> MetaResult<()> {
1943 let inner = self.inner.write().await;
1944 let txn = inner.db.begin().await?;
1945
1946 ensure_job_not_canceled(job_id, &txn).await?;
1947
1948 streaming_job::ActiveModel {
1949 job_id: Set(job_id),
1950 backfill_orders: Set(backfill_orders),
1951 ..Default::default()
1952 }
1953 .update(&txn)
1954 .await?;
1955
1956 txn.commit().await?;
1957
1958 Ok(())
1959 }
1960
1961 pub async fn update_backfill_rate_limit_by_job_id(
1964 &self,
1965 job_id: JobId,
1966 rate_limit: Option<u32>,
1967 ) -> MetaResult<HashSet<FragmentId>> {
1968 let update_backfill_rate_limit =
1969 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1970 let mut found = false;
1971 if fragment_type_mask
1972 .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1973 {
1974 visit_stream_node_mut(stream_node, |node| match node {
1975 PbNodeBody::StreamCdcScan(node) => {
1976 node.rate_limit = rate_limit;
1977 found = true;
1978 }
1979 PbNodeBody::StreamScan(node) => {
1980 node.rate_limit = rate_limit;
1981 found = true;
1982 }
1983 PbNodeBody::SourceBackfill(node) => {
1984 node.rate_limit = rate_limit;
1985 found = true;
1986 }
1987 _ => {}
1988 });
1989 }
1990 Ok(found)
1991 };
1992
1993 self.mutate_fragments_by_job_id(
1994 job_id,
1995 update_backfill_rate_limit,
1996 "stream scan node or source node not found",
1997 )
1998 .await
1999 }
2000
2001 pub async fn update_sink_rate_limit_by_job_id(
2004 &self,
2005 sink_id: SinkId,
2006 rate_limit: Option<u32>,
2007 ) -> MetaResult<HashSet<FragmentId>> {
2008 let update_sink_rate_limit =
2009 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2010 let mut found = Ok(false);
2011 if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
2012 visit_stream_node_mut(stream_node, |node| {
2013 if let PbNodeBody::Sink(node) = node {
2014 if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
2015 found = Err(MetaError::invalid_parameter(
2016 "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
2017 ));
2018 return;
2019 }
2020 node.rate_limit = rate_limit;
2021 found = Ok(true);
2022 }
2023 });
2024 }
2025 found
2026 };
2027
2028 self.mutate_fragments_by_job_id(
2029 sink_id.as_job_id(),
2030 update_sink_rate_limit,
2031 "sink node not found",
2032 )
2033 .await
2034 }
2035
2036 pub async fn update_dml_rate_limit_by_job_id(
2037 &self,
2038 job_id: JobId,
2039 rate_limit: Option<u32>,
2040 ) -> MetaResult<HashSet<FragmentId>> {
2041 let update_dml_rate_limit =
2042 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2043 let mut found = false;
2044 if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
2045 visit_stream_node_mut(stream_node, |node| {
2046 if let PbNodeBody::Dml(node) = node {
2047 node.rate_limit = rate_limit;
2048 found = true;
2049 }
2050 });
2051 }
2052 Ok(found)
2053 };
2054
2055 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
2056 .await
2057 }
2058
2059 pub async fn update_source_props_by_source_id(
2060 &self,
2061 source_id: SourceId,
2062 alter_props: BTreeMap<String, String>,
2063 alter_secret_refs: BTreeMap<String, PbSecretRef>,
2064 skip_alter_on_fly_check: bool,
2065 ) -> MetaResult<WithOptionsSecResolved> {
2066 let inner = self.inner.read().await;
2067 let txn = inner.db.begin().await?;
2068
2069 let (source, _obj) = Source::find_by_id(source_id)
2070 .find_also_related(Object)
2071 .one(&txn)
2072 .await?
2073 .ok_or_else(|| {
2074 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2075 })?;
2076 let connector = source.with_properties.0.get_connector().unwrap();
2077 let is_shared_source = source.is_shared();
2078
2079 let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2080 if !is_shared_source {
2081 dep_source_job_ids = ObjectDependency::find()
2083 .select_only()
2084 .column(object_dependency::Column::UsedBy)
2085 .filter(object_dependency::Column::Oid.eq(source_id))
2086 .into_tuple()
2087 .all(&txn)
2088 .await?;
2089 }
2090
2091 if let Some(new_connector) = alter_props.get(UPSTREAM_SOURCE_KEY)
2093 && new_connector != &connector
2094 {
2095 return Err(MetaError::invalid_parameter(format!(
2096 "Cannot change connector type from '{}' to '{}'. Drop and recreate the source instead.",
2097 connector, new_connector
2098 )));
2099 }
2100
2101 if !skip_alter_on_fly_check {
2103 let prop_keys: Vec<String> = alter_props
2104 .keys()
2105 .chain(alter_secret_refs.keys())
2106 .cloned()
2107 .collect();
2108 risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
2109 &connector, &prop_keys,
2110 )?;
2111 }
2112
2113 let mut options_with_secret = WithOptionsSecResolved::new(
2114 source.with_properties.0.clone(),
2115 source
2116 .secret_ref
2117 .map(|secret_ref| secret_ref.to_protobuf())
2118 .unwrap_or_default(),
2119 );
2120 let (to_add_secret_dep, to_remove_secret_dep) =
2121 options_with_secret.handle_update(alter_props, alter_secret_refs)?;
2122
2123 tracing::info!(
2124 "applying new properties to source: source_id={}, options_with_secret={:?}",
2125 source_id,
2126 options_with_secret
2127 );
2128 let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
2130 let mut associate_table_id = None;
2133
2134 let mut preferred_id = source_id.as_object_id();
2138 let rewrite_sql = {
2139 let definition = source.definition.clone();
2140
2141 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2142 .map_err(|e| {
2143 MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
2144 anyhow!(e).context("Failed to parse source definition SQL"),
2145 )))
2146 })?
2147 .try_into()
2148 .unwrap();
2149
2150 async fn format_with_option_secret_resolved(
2164 txn: &DatabaseTransaction,
2165 options_with_secret: &WithOptionsSecResolved,
2166 ) -> MetaResult<Vec<SqlOption>> {
2167 let mut options = Vec::new();
2168 for (k, v) in options_with_secret.as_plaintext() {
2169 let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
2170 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2171 options.push(sql_option);
2172 }
2173 for (k, v) in options_with_secret.as_secret() {
2174 if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
2175 let sql_option =
2176 SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
2177 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2178 options.push(sql_option);
2179 } else {
2180 return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
2181 }
2182 }
2183 Ok(options)
2184 }
2185
2186 match &mut stmt {
2187 Statement::CreateSource { stmt } => {
2188 stmt.with_properties.0 =
2189 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2190 }
2191 Statement::CreateTable { with_options, .. } => {
2192 *with_options =
2193 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2194 associate_table_id = source.optional_associated_table_id;
2195 preferred_id = associate_table_id.unwrap().as_object_id();
2196 }
2197 _ => unreachable!(),
2198 }
2199
2200 stmt.to_string()
2201 };
2202
2203 {
2204 if !to_add_secret_dep.is_empty() {
2207 ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2208 object_dependency::ActiveModel {
2209 oid: Set(secret_id.into()),
2210 used_by: Set(preferred_id),
2211 ..Default::default()
2212 }
2213 }))
2214 .exec(&txn)
2215 .await?;
2216 }
2217 if !to_remove_secret_dep.is_empty() {
2220 let _ = ObjectDependency::delete_many()
2221 .filter(
2222 object_dependency::Column::Oid
2223 .is_in(to_remove_secret_dep)
2224 .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2225 )
2226 .exec(&txn)
2227 .await?;
2228 }
2229 }
2230
2231 let active_source_model = source::ActiveModel {
2232 source_id: Set(source_id),
2233 definition: Set(rewrite_sql.clone()),
2234 with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2235 secret_ref: Set((!options_with_secret.as_secret().is_empty())
2236 .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2237 ..Default::default()
2238 };
2239 Source::update(active_source_model).exec(&txn).await?;
2240
2241 if let Some(associate_table_id) = associate_table_id {
2242 let active_table_model = table::ActiveModel {
2244 table_id: Set(associate_table_id),
2245 definition: Set(rewrite_sql),
2246 ..Default::default()
2247 };
2248 Table::update(active_table_model).exec(&txn).await?;
2249 }
2250
2251 let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2252 associate_table_id.as_job_id()
2254 } else {
2255 source_id.as_share_source_job_id()
2256 }]
2257 .into_iter()
2258 .chain(dep_source_job_ids.into_iter())
2259 .collect_vec();
2260
2261 update_connector_props_fragments(
2263 &txn,
2264 to_check_job_ids,
2265 FragmentTypeFlag::Source,
2266 |node, found| {
2267 if let PbNodeBody::Source(node) = node
2268 && let Some(source_inner) = &mut node.source_inner
2269 {
2270 source_inner.with_properties = options_with_secret.as_plaintext().clone();
2271 source_inner.secret_refs = options_with_secret.as_secret().clone();
2272 *found = true;
2273 }
2274 },
2275 is_shared_source,
2276 )
2277 .await?;
2278
2279 let mut to_update_objs = Vec::with_capacity(2);
2280 let (source, obj) = Source::find_by_id(source_id)
2281 .find_also_related(Object)
2282 .one(&txn)
2283 .await?
2284 .ok_or_else(|| {
2285 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2286 })?;
2287 to_update_objs.push(PbObject {
2288 object_info: Some(PbObjectInfo::Source(
2289 ObjectModel(source, obj.unwrap(), None).into(),
2290 )),
2291 });
2292
2293 if let Some(associate_table_id) = associate_table_id {
2294 let (table, obj) = Table::find_by_id(associate_table_id)
2295 .find_also_related(Object)
2296 .one(&txn)
2297 .await?
2298 .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2299 let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
2300 .one(&txn)
2301 .await?;
2302 to_update_objs.push(PbObject {
2303 object_info: Some(PbObjectInfo::Table(
2304 ObjectModel(table, obj.unwrap(), streaming_job).into(),
2305 )),
2306 });
2307 }
2308
2309 txn.commit().await?;
2310
2311 self.notify_frontend(
2312 NotificationOperation::Update,
2313 NotificationInfo::ObjectGroup(PbObjectGroup {
2314 objects: to_update_objs,
2315 dependencies: vec![],
2316 }),
2317 )
2318 .await;
2319
2320 Ok(options_with_secret)
2321 }
2322
2323 pub async fn update_sink_props_by_sink_id(
2324 &self,
2325 sink_id: SinkId,
2326 props: BTreeMap<String, String>,
2327 ) -> MetaResult<HashMap<String, String>> {
2328 let inner = self.inner.read().await;
2329 let txn = inner.db.begin().await?;
2330
2331 let (sink, _obj) = Sink::find_by_id(sink_id)
2332 .find_also_related(Object)
2333 .one(&txn)
2334 .await?
2335 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2336 validate_sink_props(&sink, &props)?;
2337 let definition = sink.definition.clone();
2338 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2339 .map_err(|e| SinkError::Config(anyhow!(e)))?
2340 .try_into()
2341 .unwrap();
2342 if let Statement::CreateSink { stmt } = &mut stmt {
2343 update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2344 } else {
2345 panic!("definition is not a create sink statement")
2346 }
2347 let mut new_config = sink.properties.clone().into_inner();
2348 new_config.extend(props.clone());
2349
2350 let definition = stmt.to_string();
2351 let active_sink = sink::ActiveModel {
2352 sink_id: Set(sink_id),
2353 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2354 definition: Set(definition),
2355 ..Default::default()
2356 };
2357 Sink::update(active_sink).exec(&txn).await?;
2358
2359 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2360 let (sink, obj) = Sink::find_by_id(sink_id)
2361 .find_also_related(Object)
2362 .one(&txn)
2363 .await?
2364 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2365 let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2366 .one(&txn)
2367 .await?;
2368 txn.commit().await?;
2369 let relation_infos = vec![PbObject {
2370 object_info: Some(PbObjectInfo::Sink(
2371 ObjectModel(sink, obj.unwrap(), streaming_job).into(),
2372 )),
2373 }];
2374
2375 let _version = self
2376 .notify_frontend(
2377 NotificationOperation::Update,
2378 NotificationInfo::ObjectGroup(PbObjectGroup {
2379 objects: relation_infos,
2380 dependencies: vec![],
2381 }),
2382 )
2383 .await;
2384
2385 Ok(props.into_iter().collect())
2386 }
2387
2388 pub async fn update_iceberg_table_props_by_table_id(
2389 &self,
2390 table_id: TableId,
2391 props: BTreeMap<String, String>,
2392 alter_iceberg_table_props: Option<
2393 risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2394 >,
2395 ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2396 let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props.
2397 ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2398 let inner = self.inner.read().await;
2399 let txn = inner.db.begin().await?;
2400
2401 let (sink, _obj) = Sink::find_by_id(sink_id)
2402 .find_also_related(Object)
2403 .one(&txn)
2404 .await?
2405 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2406 validate_sink_props(&sink, &props)?;
2407
2408 let definition = sink.definition.clone();
2409 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2410 .map_err(|e| SinkError::Config(anyhow!(e)))?
2411 .try_into()
2412 .unwrap();
2413 if let Statement::CreateTable {
2414 with_options,
2415 engine,
2416 ..
2417 } = &mut stmt
2418 {
2419 if !matches!(engine, Engine::Iceberg) {
2420 return Err(SinkError::Config(anyhow!(
2421 "only iceberg table can be altered as sink"
2422 ))
2423 .into());
2424 }
2425 update_stmt_with_props(with_options, &props)?;
2426 } else {
2427 panic!("definition is not a create iceberg table statement")
2428 }
2429 let mut new_config = sink.properties.clone().into_inner();
2430 new_config.extend(props.clone());
2431
2432 let definition = stmt.to_string();
2433 let active_sink = sink::ActiveModel {
2434 sink_id: Set(sink_id),
2435 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2436 definition: Set(definition.clone()),
2437 ..Default::default()
2438 };
2439 let active_source = source::ActiveModel {
2440 source_id: Set(source_id),
2441 definition: Set(definition.clone()),
2442 ..Default::default()
2443 };
2444 let active_table = table::ActiveModel {
2445 table_id: Set(table_id),
2446 definition: Set(definition),
2447 ..Default::default()
2448 };
2449 Sink::update(active_sink).exec(&txn).await?;
2450 Source::update(active_source).exec(&txn).await?;
2451 Table::update(active_table).exec(&txn).await?;
2452
2453 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2454
2455 let (sink, sink_obj) = Sink::find_by_id(sink_id)
2456 .find_also_related(Object)
2457 .one(&txn)
2458 .await?
2459 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2460 let sink_streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2461 .one(&txn)
2462 .await?;
2463 let (source, source_obj) = Source::find_by_id(source_id)
2464 .find_also_related(Object)
2465 .one(&txn)
2466 .await?
2467 .ok_or_else(|| {
2468 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2469 })?;
2470 let (table, table_obj) = Table::find_by_id(table_id)
2471 .find_also_related(Object)
2472 .one(&txn)
2473 .await?
2474 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2475 let table_streaming_job = streaming_job::Entity::find_by_id(table.job_id())
2476 .one(&txn)
2477 .await?;
2478 txn.commit().await?;
2479 let relation_infos = vec![
2480 PbObject {
2481 object_info: Some(PbObjectInfo::Sink(
2482 ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job).into(),
2483 )),
2484 },
2485 PbObject {
2486 object_info: Some(PbObjectInfo::Source(
2487 ObjectModel(source, source_obj.unwrap(), None).into(),
2488 )),
2489 },
2490 PbObject {
2491 object_info: Some(PbObjectInfo::Table(
2492 ObjectModel(table, table_obj.unwrap(), table_streaming_job).into(),
2493 )),
2494 },
2495 ];
2496 let _version = self
2497 .notify_frontend(
2498 NotificationOperation::Update,
2499 NotificationInfo::ObjectGroup(PbObjectGroup {
2500 objects: relation_infos,
2501 dependencies: vec![],
2502 }),
2503 )
2504 .await;
2505
2506 Ok((props.into_iter().collect(), sink_id))
2507 }
2508
2509 pub async fn update_connection_and_dependent_objects_props(
2511 &self,
2512 connection_id: ConnectionId,
2513 alter_props: BTreeMap<String, String>,
2514 alter_secret_refs: BTreeMap<String, PbSecretRef>,
2515 ) -> MetaResult<(
2516 WithOptionsSecResolved, Vec<(SourceId, HashMap<String, String>)>, Vec<(SinkId, HashMap<String, String>)>, )> {
2520 let inner = self.inner.read().await;
2521 let txn = inner.db.begin().await?;
2522
2523 let dependent_sources: Vec<SourceId> = Source::find()
2525 .select_only()
2526 .column(source::Column::SourceId)
2527 .filter(source::Column::ConnectionId.eq(connection_id))
2528 .into_tuple()
2529 .all(&txn)
2530 .await?;
2531
2532 let dependent_sinks: Vec<SinkId> = Sink::find()
2533 .select_only()
2534 .column(sink::Column::SinkId)
2535 .filter(sink::Column::ConnectionId.eq(connection_id))
2536 .into_tuple()
2537 .all(&txn)
2538 .await?;
2539
2540 let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
2541 .find_also_related(Object)
2542 .one(&txn)
2543 .await?
2544 .ok_or_else(|| {
2545 MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2546 })?;
2547
2548 let prop_keys: Vec<String> = alter_props
2550 .keys()
2551 .chain(alter_secret_refs.keys())
2552 .cloned()
2553 .collect();
2554
2555 let connection_type_str = pb_connection_type_to_connection_type(
2557 &connection_catalog.params.to_protobuf().connection_type(),
2558 )
2559 .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;
2560
2561 risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
2562 connection_type_str, &prop_keys,
2563 )?;
2564
2565 let connection_pb = connection_catalog.params.to_protobuf();
2566 let mut connection_options_with_secret = WithOptionsSecResolved::new(
2567 connection_pb.properties.into_iter().collect(),
2568 connection_pb.secret_refs.into_iter().collect(),
2569 );
2570
2571 let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
2572 .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2573
2574 tracing::debug!(
2575 "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
2576 connection_id,
2577 dependent_sources,
2578 dependent_sinks
2579 );
2580
2581 {
2583 let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
2584 connection_type: connection_pb.connection_type,
2585 properties: connection_options_with_secret
2586 .as_plaintext()
2587 .clone()
2588 .into_iter()
2589 .collect(),
2590 secret_refs: connection_options_with_secret
2591 .as_secret()
2592 .clone()
2593 .into_iter()
2594 .collect(),
2595 };
2596 let connection = PbConnection {
2597 id: connection_id as _,
2598 info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
2599 conn_params_pb,
2600 )),
2601 ..Default::default()
2602 };
2603 validate_connection(&connection).await?;
2604 }
2605
2606 if !to_add_secret_dep.is_empty() {
2608 ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2609 object_dependency::ActiveModel {
2610 oid: Set(secret_id.into()),
2611 used_by: Set(connection_id.as_object_id()),
2612 ..Default::default()
2613 }
2614 }))
2615 .exec(&txn)
2616 .await?;
2617 }
2618 if !to_remove_secret_dep.is_empty() {
2619 let _ = ObjectDependency::delete_many()
2620 .filter(
2621 object_dependency::Column::Oid
2622 .is_in(to_remove_secret_dep)
2623 .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
2624 )
2625 .exec(&txn)
2626 .await?;
2627 }
2628
2629 let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
2631 connection_type: connection_pb.connection_type,
2632 properties: connection_options_with_secret
2633 .as_plaintext()
2634 .clone()
2635 .into_iter()
2636 .collect(),
2637 secret_refs: connection_options_with_secret
2638 .as_secret()
2639 .clone()
2640 .into_iter()
2641 .collect(),
2642 };
2643 let active_connection_model = connection::ActiveModel {
2644 connection_id: Set(connection_id),
2645 params: Set(ConnectionParams::from(&updated_connection_params)),
2646 ..Default::default()
2647 };
2648 Connection::update(active_connection_model)
2649 .exec(&txn)
2650 .await?;
2651
2652 let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();
2654
2655 if !dependent_sources.is_empty() {
2656 let sources_with_objs = Source::find()
2658 .find_also_related(Object)
2659 .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
2660 .all(&txn)
2661 .await?;
2662
2663 let mut source_updates = Vec::new();
2665 let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();
2666
2667 for (source, _obj) in sources_with_objs {
2668 let source_id = source.source_id;
2669
2670 let mut source_options_with_secret = WithOptionsSecResolved::new(
2671 source.with_properties.0.clone(),
2672 source
2673 .secret_ref
2674 .clone()
2675 .map(|secret_ref| secret_ref.to_protobuf())
2676 .unwrap_or_default(),
2677 );
2678 let (_source_to_add_secret_dep, _source_to_remove_secret_dep) =
2679 source_options_with_secret
2680 .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2681
2682 let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;
2684
2685 let active_source = source::ActiveModel {
2687 source_id: Set(source_id),
2688 with_properties: Set(Property(
2689 source_options_with_secret.as_plaintext().clone(),
2690 )),
2691 secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
2692 || {
2693 risingwave_meta_model::SecretRef::from(
2694 source_options_with_secret.as_secret().clone(),
2695 )
2696 },
2697 )),
2698 ..Default::default()
2699 };
2700 source_updates.push(active_source);
2701
2702 let is_shared_source = source.is_shared();
2707 let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2708 if !is_shared_source {
2709 dep_source_job_ids = ObjectDependency::find()
2710 .select_only()
2711 .column(object_dependency::Column::UsedBy)
2712 .filter(object_dependency::Column::Oid.eq(source_id))
2713 .into_tuple()
2714 .all(&txn)
2715 .await?;
2716 }
2717
2718 let base_job_id =
2719 if let Some(associate_table_id) = source.optional_associated_table_id {
2720 associate_table_id.as_job_id()
2721 } else {
2722 source_id.as_share_source_job_id()
2723 };
2724 let job_ids = vec![base_job_id]
2725 .into_iter()
2726 .chain(dep_source_job_ids.into_iter())
2727 .collect_vec();
2728
2729 fragment_updates.push(DependentSourceFragmentUpdate {
2730 job_ids,
2731 with_properties: source_options_with_secret.as_plaintext().clone(),
2732 secret_refs: source_options_with_secret.as_secret().clone(),
2733 is_shared_source,
2734 });
2735
2736 let complete_source_props = LocalSecretManager::global()
2738 .fill_secrets(
2739 source_options_with_secret.as_plaintext().clone(),
2740 source_options_with_secret.as_secret().clone(),
2741 )
2742 .map_err(MetaError::from)?
2743 .into_iter()
2744 .collect::<HashMap<String, String>>();
2745 updated_sources_with_props.push((source_id, complete_source_props));
2746 }
2747
2748 for source_update in source_updates {
2749 Source::update(source_update).exec(&txn).await?;
2750 }
2751
2752 for DependentSourceFragmentUpdate {
2754 job_ids,
2755 with_properties,
2756 secret_refs,
2757 is_shared_source,
2758 } in fragment_updates
2759 {
2760 update_connector_props_fragments(
2761 &txn,
2762 job_ids,
2763 FragmentTypeFlag::Source,
2764 |node, found| {
2765 if let PbNodeBody::Source(node) = node
2766 && let Some(source_inner) = &mut node.source_inner
2767 {
2768 source_inner.with_properties = with_properties.clone();
2769 source_inner.secret_refs = secret_refs.clone();
2770 *found = true;
2771 }
2772 },
2773 is_shared_source,
2774 )
2775 .await?;
2776 }
2777 }
2778
2779 let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();
2781
2782 if !dependent_sinks.is_empty() {
2783 let sinks_with_objs = Sink::find()
2785 .find_also_related(Object)
2786 .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
2787 .all(&txn)
2788 .await?;
2789
2790 let mut sink_updates = Vec::new();
2792 let mut sink_fragment_updates = Vec::new();
2793
2794 for (sink, _obj) in sinks_with_objs {
2795 let sink_id = sink.sink_id;
2796
2797 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2799 Some(connector) => {
2800 let connector_type = connector.to_lowercase();
2801 check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
2802 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2803
2804 match_sink_name_str!(
2805 connector_type.as_str(),
2806 SinkType,
2807 {
2808 let mut new_sink_props = sink.properties.0.clone();
2809 new_sink_props.extend(alter_props.clone());
2810 SinkType::validate_alter_config(&new_sink_props)
2811 },
2812 |sink: &str| Err(SinkError::Config(anyhow!(
2813 "unsupported sink type {}",
2814 sink
2815 )))
2816 )?
2817 }
2818 None => {
2819 return Err(SinkError::Config(anyhow!(
2820 "connector not specified when alter sink"
2821 ))
2822 .into());
2823 }
2824 };
2825
2826 let mut new_sink_props = sink.properties.0.clone();
2827 new_sink_props.extend(alter_props.clone());
2828
2829 let active_sink = sink::ActiveModel {
2831 sink_id: Set(sink_id),
2832 properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
2833 ..Default::default()
2834 };
2835 sink_updates.push(active_sink);
2836
2837 sink_fragment_updates.push((sink_id, new_sink_props.clone()));
2839
2840 let complete_sink_props: HashMap<String, String> =
2842 new_sink_props.into_iter().collect();
2843 updated_sinks_with_props.push((sink_id, complete_sink_props));
2844 }
2845
2846 for sink_update in sink_updates {
2848 Sink::update(sink_update).exec(&txn).await?;
2849 }
2850
2851 for (sink_id, new_sink_props) in sink_fragment_updates {
2853 update_connector_props_fragments(
2854 &txn,
2855 vec![sink_id.as_job_id()],
2856 FragmentTypeFlag::Sink,
2857 |node, found| {
2858 if let PbNodeBody::Sink(node) = node
2859 && let Some(sink_desc) = &mut node.sink_desc
2860 && sink_desc.id == sink_id.as_raw_id()
2861 {
2862 sink_desc.properties = new_sink_props.clone();
2863 *found = true;
2864 }
2865 },
2866 true,
2867 )
2868 .await?;
2869 }
2870 }
2871
2872 let mut updated_objects = Vec::new();
2874
2875 let (connection, obj) = Connection::find_by_id(connection_id)
2877 .find_also_related(Object)
2878 .one(&txn)
2879 .await?
2880 .ok_or_else(|| {
2881 MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2882 })?;
2883 updated_objects.push(PbObject {
2884 object_info: Some(PbObjectInfo::Connection(
2885 ObjectModel(connection, obj.unwrap(), None).into(),
2886 )),
2887 });
2888
2889 for source_id in &dependent_sources {
2891 let (source, obj) = Source::find_by_id(*source_id)
2892 .find_also_related(Object)
2893 .one(&txn)
2894 .await?
2895 .ok_or_else(|| {
2896 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
2897 })?;
2898 updated_objects.push(PbObject {
2899 object_info: Some(PbObjectInfo::Source(
2900 ObjectModel(source, obj.unwrap(), None).into(),
2901 )),
2902 });
2903 }
2904
2905 for sink_id in &dependent_sinks {
2907 let (sink, obj) = Sink::find_by_id(*sink_id)
2908 .find_also_related(Object)
2909 .one(&txn)
2910 .await?
2911 .ok_or_else(|| {
2912 MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
2913 })?;
2914 let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2915 .one(&txn)
2916 .await?;
2917 updated_objects.push(PbObject {
2918 object_info: Some(PbObjectInfo::Sink(
2919 ObjectModel(sink, obj.unwrap(), streaming_job).into(),
2920 )),
2921 });
2922 }
2923
2924 txn.commit().await?;
2926
2927 if !updated_objects.is_empty() {
2929 self.notify_frontend(
2930 NotificationOperation::Update,
2931 NotificationInfo::ObjectGroup(PbObjectGroup {
2932 objects: updated_objects,
2933 dependencies: vec![],
2934 }),
2935 )
2936 .await;
2937 }
2938
2939 Ok((
2940 connection_options_with_secret,
2941 updated_sources_with_props,
2942 updated_sinks_with_props,
2943 ))
2944 }
2945
2946 pub async fn update_fragment_rate_limit_by_fragment_id(
2947 &self,
2948 fragment_id: FragmentId,
2949 rate_limit: Option<u32>,
2950 ) -> MetaResult<()> {
2951 let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2952 stream_node: &mut PbStreamNode| {
2953 let mut found = false;
2954 if fragment_type_mask.contains_any(
2955 FragmentTypeFlag::dml_rate_limit_fragments()
2956 .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2957 .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2958 .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2959 ) {
2960 visit_stream_node_mut(stream_node, |node| {
2961 if let PbNodeBody::Dml(node) = node {
2962 node.rate_limit = rate_limit;
2963 found = true;
2964 }
2965 if let PbNodeBody::Sink(node) = node {
2966 node.rate_limit = rate_limit;
2967 found = true;
2968 }
2969 if let PbNodeBody::StreamCdcScan(node) = node {
2970 node.rate_limit = rate_limit;
2971 found = true;
2972 }
2973 if let PbNodeBody::StreamScan(node) = node {
2974 node.rate_limit = rate_limit;
2975 found = true;
2976 }
2977 if let PbNodeBody::SourceBackfill(node) = node {
2978 node.rate_limit = rate_limit;
2979 found = true;
2980 }
2981 });
2982 }
2983 found
2984 };
2985 self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2986 .await
2987 }
2988
2989 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2992 let inner = self.inner.read().await;
2993 let txn = inner.db.begin().await?;
2994
2995 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2996 .select_only()
2997 .columns([
2998 fragment::Column::FragmentId,
2999 fragment::Column::JobId,
3000 fragment::Column::FragmentTypeMask,
3001 fragment::Column::StreamNode,
3002 ])
3003 .filter(FragmentTypeMask::intersects_any(
3004 FragmentTypeFlag::rate_limit_fragments(),
3005 ))
3006 .into_tuple()
3007 .all(&txn)
3008 .await?;
3009
3010 let mut rate_limits = Vec::new();
3011 for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
3012 let stream_node = stream_node.to_protobuf();
3013 visit_stream_node_body(&stream_node, |node| {
3014 let mut rate_limit = None;
3015 let mut node_name = None;
3016
3017 match node {
3018 PbNodeBody::Source(node) => {
3020 if let Some(node_inner) = &node.source_inner {
3021 rate_limit = node_inner.rate_limit;
3022 node_name = Some("SOURCE");
3023 }
3024 }
3025 PbNodeBody::StreamFsFetch(node) => {
3026 if let Some(node_inner) = &node.node_inner {
3027 rate_limit = node_inner.rate_limit;
3028 node_name = Some("FS_FETCH");
3029 }
3030 }
3031 PbNodeBody::SourceBackfill(node) => {
3033 rate_limit = node.rate_limit;
3034 node_name = Some("SOURCE_BACKFILL");
3035 }
3036 PbNodeBody::StreamScan(node) => {
3037 rate_limit = node.rate_limit;
3038 node_name = Some("STREAM_SCAN");
3039 }
3040 PbNodeBody::StreamCdcScan(node) => {
3041 rate_limit = node.rate_limit;
3042 node_name = Some("STREAM_CDC_SCAN");
3043 }
3044 PbNodeBody::Sink(node) => {
3045 rate_limit = node.rate_limit;
3046 node_name = Some("SINK");
3047 }
3048 _ => {}
3049 }
3050
3051 if let Some(rate_limit) = rate_limit {
3052 rate_limits.push(RateLimitInfo {
3053 fragment_id,
3054 job_id,
3055 fragment_type_mask: fragment_type_mask as u32,
3056 rate_limit,
3057 node_name: node_name.unwrap().to_owned(),
3058 });
3059 }
3060 });
3061 }
3062
3063 Ok(rate_limits)
3064 }
3065}
3066
3067fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
3068 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
3070 Some(connector) => {
3071 let connector_type = connector.to_lowercase();
3072 let field_names: Vec<String> = props.keys().cloned().collect();
3073 check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
3074 .map_err(|e| SinkError::Config(anyhow!(e)))?;
3075
3076 match_sink_name_str!(
3077 connector_type.as_str(),
3078 SinkType,
3079 {
3080 let mut new_props = sink.properties.0.clone();
3081 new_props.extend(props.clone());
3082 SinkType::validate_alter_config(&new_props)
3083 },
3084 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
3085 )?
3086 }
3087 None => {
3088 return Err(
3089 SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
3090 );
3091 }
3092 };
3093 Ok(())
3094}
3095
3096fn update_stmt_with_props(
3097 with_properties: &mut Vec<SqlOption>,
3098 props: &BTreeMap<String, String>,
3099) -> MetaResult<()> {
3100 let mut new_sql_options = with_properties
3101 .iter()
3102 .map(|sql_option| (&sql_option.name, sql_option))
3103 .collect::<IndexMap<_, _>>();
3104 let add_sql_options = props
3105 .iter()
3106 .map(|(k, v)| SqlOption::try_from((k, v)))
3107 .collect::<Result<Vec<SqlOption>, ParserError>>()
3108 .map_err(|e| SinkError::Config(anyhow!(e)))?;
3109 new_sql_options.extend(
3110 add_sql_options
3111 .iter()
3112 .map(|sql_option| (&sql_option.name, sql_option)),
3113 );
3114 *with_properties = new_sql_options.into_values().cloned().collect();
3115 Ok(())
3116}
3117
3118async fn update_sink_fragment_props(
3119 txn: &DatabaseTransaction,
3120 sink_id: SinkId,
3121 props: BTreeMap<String, String>,
3122) -> MetaResult<()> {
3123 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
3124 .select_only()
3125 .columns([
3126 fragment::Column::FragmentId,
3127 fragment::Column::FragmentTypeMask,
3128 fragment::Column::StreamNode,
3129 ])
3130 .filter(fragment::Column::JobId.eq(sink_id))
3131 .into_tuple()
3132 .all(txn)
3133 .await?;
3134 let fragments = fragments
3135 .into_iter()
3136 .filter(|(_, fragment_type_mask, _)| {
3137 *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
3138 })
3139 .filter_map(|(id, _, stream_node)| {
3140 let mut stream_node = stream_node.to_protobuf();
3141 let mut found = false;
3142 visit_stream_node_mut(&mut stream_node, |node| {
3143 if let PbNodeBody::Sink(node) = node
3144 && let Some(sink_desc) = &mut node.sink_desc
3145 && sink_desc.id == sink_id
3146 {
3147 sink_desc.properties.extend(props.clone());
3148 found = true;
3149 }
3150 });
3151 if found { Some((id, stream_node)) } else { None }
3152 })
3153 .collect_vec();
3154 assert!(
3155 !fragments.is_empty(),
3156 "sink id should be used by at least one fragment"
3157 );
3158 for (id, stream_node) in fragments {
3159 Fragment::update(fragment::ActiveModel {
3160 fragment_id: Set(id),
3161 stream_node: Set(StreamNode::from(&stream_node)),
3162 ..Default::default()
3163 })
3164 .exec(txn)
3165 .await?;
3166 }
3167 Ok(())
3168}
3169
3170pub struct SinkIntoTableContext {
3171 pub updated_sink_catalogs: Vec<SinkId>,
3174}
3175
3176pub struct FinishAutoRefreshSchemaSinkContext {
3177 pub tmp_sink_id: SinkId,
3178 pub original_sink_id: SinkId,
3179 pub columns: Vec<PbColumnCatalog>,
3180 pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
3181}
3182
3183async fn update_connector_props_fragments<F>(
3184 txn: &DatabaseTransaction,
3185 job_ids: Vec<JobId>,
3186 expect_flag: FragmentTypeFlag,
3187 mut alter_stream_node_fn: F,
3188 is_shared_source: bool,
3189) -> MetaResult<()>
3190where
3191 F: FnMut(&mut PbNodeBody, &mut bool),
3192{
3193 let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
3194 .select_only()
3195 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
3196 .filter(
3197 fragment::Column::JobId
3198 .is_in(job_ids.clone())
3199 .and(FragmentTypeMask::intersects(expect_flag)),
3200 )
3201 .into_tuple()
3202 .all(txn)
3203 .await?;
3204 let fragments = fragments
3205 .into_iter()
3206 .filter_map(|(id, stream_node)| {
3207 let mut stream_node = stream_node.to_protobuf();
3208 let mut found = false;
3209 visit_stream_node_mut(&mut stream_node, |node| {
3210 alter_stream_node_fn(node, &mut found);
3211 });
3212 if found { Some((id, stream_node)) } else { None }
3213 })
3214 .collect_vec();
3215 if is_shared_source || job_ids.len() > 1 {
3216 assert!(
3220 !fragments.is_empty(),
3221 "job ids {:?} (type: {:?}) should be used by at least one fragment",
3222 job_ids,
3223 expect_flag
3224 );
3225 }
3226
3227 for (id, stream_node) in fragments {
3228 Fragment::update(fragment::ActiveModel {
3229 fragment_id: Set(id),
3230 stream_node: Set(StreamNode::from(&stream_node)),
3231 ..Default::default()
3232 })
3233 .exec(txn)
3234 .await?;
3235 }
3236
3237 Ok(())
3238}