1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
22use risingwave_common::config::DefaultParallelism;
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::JobId;
25use risingwave_common::secret::LocalSecretManager;
26use risingwave_common::util::iter_util::ZipEqDebug;
27use risingwave_common::util::stream_graph_visitor::{
28 visit_stream_node_body, visit_stream_node_mut,
29};
30use risingwave_common::{bail, current_cluster_version};
31use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
32use risingwave_connector::connector_common::validate_connection;
33use risingwave_connector::error::ConnectorError;
34use risingwave_connector::sink::file_sink::fs::FsSink;
35use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
36use risingwave_connector::source::{ConnectorProperties, pb_connection_type_to_connection_type};
37use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
38use risingwave_meta_model::object::ObjectType;
39use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
40use risingwave_meta_model::refresh_job::RefreshState;
41use risingwave_meta_model::table::TableType;
42use risingwave_meta_model::user_privilege::Action;
43use risingwave_meta_model::*;
44use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
45use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
46use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
47use risingwave_pb::meta::object::PbObjectInfo;
48use risingwave_pb::meta::subscribe_response::{
49 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
50};
51use risingwave_pb::meta::{PbObject, PbObjectGroup};
52use risingwave_pb::plan_common::PbColumnCatalog;
53use risingwave_pb::plan_common::source_refresh_mode::{
54 RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
55};
56use risingwave_pb::secret::PbSecretRef;
57use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
58use risingwave_pb::stream_plan::stream_node::PbNodeBody;
59use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
60use risingwave_pb::user::PbUserInfo;
61use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
62use risingwave_sqlparser::parser::{Parser, ParserError};
63use sea_orm::ActiveValue::Set;
64use sea_orm::sea_query::{Expr, Query, SimpleExpr};
65use sea_orm::{
66 ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
67 ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
68};
69use thiserror_ext::AsReport;
70
71use super::rename::IndexItemRewriter;
72use crate::barrier::Command;
73use crate::controller::ObjectModel;
74use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
75use crate::controller::fragment::FragmentTypeMaskExt;
76use crate::controller::utils::{
77 PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
78 check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
79 ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
80 get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
81 list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
82};
83use crate::error::MetaErrorInner;
84use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
85use crate::model::{
86 FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
87 StreamJobFragmentsToCreate,
88};
89use crate::stream::SplitAssignment;
90use crate::{MetaError, MetaResult};
91
92#[derive(Debug)]
97struct DependentSourceFragmentUpdate {
98 job_ids: Vec<JobId>,
99 with_properties: BTreeMap<String, String>,
100 secret_refs: BTreeMap<String, PbSecretRef>,
101 is_shared_source: bool,
102}
103
104impl CatalogController {
105 #[allow(clippy::too_many_arguments)]
106 pub async fn create_streaming_job_obj(
107 txn: &DatabaseTransaction,
108 obj_type: ObjectType,
109 owner_id: UserId,
110 database_id: Option<DatabaseId>,
111 schema_id: Option<SchemaId>,
112 create_type: PbCreateType,
113 ctx: StreamContext,
114 streaming_parallelism: StreamingParallelism,
115 max_parallelism: usize,
116 specific_resource_group: Option<String>, backfill_parallelism: Option<StreamingParallelism>,
118 ) -> MetaResult<JobId> {
119 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
120 let job_id = obj.oid.as_job_id();
121 let job = streaming_job::ActiveModel {
122 job_id: Set(job_id),
123 job_status: Set(JobStatus::Initial),
124 create_type: Set(create_type.into()),
125 timezone: Set(ctx.timezone),
126 config_override: Set(Some(ctx.config_override.to_string())),
127 parallelism: Set(streaming_parallelism),
128 backfill_parallelism: Set(backfill_parallelism),
129 max_parallelism: Set(max_parallelism as _),
130 specific_resource_group: Set(specific_resource_group),
131 };
132 job.insert(txn).await?;
133
134 Ok(job_id)
135 }
136
137 #[await_tree::instrument]
143 pub async fn create_job_catalog(
144 &self,
145 streaming_job: &mut StreamingJob,
146 ctx: &StreamContext,
147 parallelism: &Option<Parallelism>,
148 max_parallelism: usize,
149 mut dependencies: HashSet<ObjectId>,
150 specific_resource_group: Option<String>,
151 backfill_parallelism: &Option<Parallelism>,
152 ) -> MetaResult<()> {
153 let inner = self.inner.write().await;
154 let txn = inner.db.begin().await?;
155 let create_type = streaming_job.create_type();
156
157 let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
158 (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
159 (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
160 (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
161 };
162 let backfill_parallelism = backfill_parallelism
163 .as_ref()
164 .map(|p| StreamingParallelism::Fixed(p.parallelism as _));
165
166 ensure_user_id(streaming_job.owner() as _, &txn).await?;
167 ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
168 ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
169 check_relation_name_duplicate(
170 &streaming_job.name(),
171 streaming_job.database_id(),
172 streaming_job.schema_id(),
173 &txn,
174 )
175 .await?;
176
177 if !dependencies.is_empty() {
179 let altering_cnt = ObjectDependency::find()
180 .join(
181 JoinType::InnerJoin,
182 object_dependency::Relation::Object1.def(),
183 )
184 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
185 .filter(
186 object_dependency::Column::Oid
187 .is_in(dependencies.clone())
188 .and(object::Column::ObjType.eq(ObjectType::Table))
189 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
190 .and(
191 object::Column::Oid.not_in_subquery(
193 Query::select()
194 .column(table::Column::TableId)
195 .from(Table)
196 .to_owned(),
197 ),
198 ),
199 )
200 .count(&txn)
201 .await?;
202 if altering_cnt != 0 {
203 return Err(MetaError::permission_denied(
204 "some dependent relations are being altered",
205 ));
206 }
207 }
208
209 match streaming_job {
210 StreamingJob::MaterializedView(table) => {
211 let job_id = Self::create_streaming_job_obj(
212 &txn,
213 ObjectType::Table,
214 table.owner as _,
215 Some(table.database_id),
216 Some(table.schema_id),
217 create_type,
218 ctx.clone(),
219 streaming_parallelism,
220 max_parallelism,
221 specific_resource_group,
222 backfill_parallelism.clone(),
223 )
224 .await?;
225 table.id = job_id.as_mv_table_id();
226 let table_model: table::ActiveModel = table.clone().into();
227 Table::insert(table_model).exec(&txn).await?;
228 }
229 StreamingJob::Sink(sink) => {
230 if let Some(target_table_id) = sink.target_table
231 && check_sink_into_table_cycle(
232 target_table_id.into(),
233 dependencies.iter().cloned().collect(),
234 &txn,
235 )
236 .await?
237 {
238 bail!("Creating such a sink will result in circular dependency.");
239 }
240
241 let job_id = Self::create_streaming_job_obj(
242 &txn,
243 ObjectType::Sink,
244 sink.owner as _,
245 Some(sink.database_id),
246 Some(sink.schema_id),
247 create_type,
248 ctx.clone(),
249 streaming_parallelism,
250 max_parallelism,
251 specific_resource_group,
252 backfill_parallelism.clone(),
253 )
254 .await?;
255 sink.id = job_id.as_sink_id();
256 let sink_model: sink::ActiveModel = sink.clone().into();
257 Sink::insert(sink_model).exec(&txn).await?;
258 }
259 StreamingJob::Table(src, table, _) => {
260 let job_id = Self::create_streaming_job_obj(
261 &txn,
262 ObjectType::Table,
263 table.owner as _,
264 Some(table.database_id),
265 Some(table.schema_id),
266 create_type,
267 ctx.clone(),
268 streaming_parallelism,
269 max_parallelism,
270 specific_resource_group,
271 backfill_parallelism.clone(),
272 )
273 .await?;
274 table.id = job_id.as_mv_table_id();
275 if let Some(src) = src {
276 let src_obj = Self::create_object(
277 &txn,
278 ObjectType::Source,
279 src.owner as _,
280 Some(src.database_id),
281 Some(src.schema_id),
282 )
283 .await?;
284 src.id = src_obj.oid.as_source_id();
285 src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
286 table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
287 let source: source::ActiveModel = src.clone().into();
288 Source::insert(source).exec(&txn).await?;
289 }
290 let table_model: table::ActiveModel = table.clone().into();
291 Table::insert(table_model).exec(&txn).await?;
292
293 if table.refreshable {
294 let trigger_interval_secs = src
295 .as_ref()
296 .and_then(|source_catalog| source_catalog.refresh_mode)
297 .and_then(
298 |source_refresh_mode| match source_refresh_mode.refresh_mode {
299 Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
300 refresh_interval_sec,
301 })) => refresh_interval_sec,
302 Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
303 None => None,
304 },
305 );
306
307 RefreshJob::insert(refresh_job::ActiveModel {
308 table_id: Set(table.id),
309 last_trigger_time: Set(None),
310 trigger_interval_secs: Set(trigger_interval_secs),
311 current_status: Set(RefreshState::Idle),
312 last_success_time: Set(None),
313 })
314 .exec(&txn)
315 .await?;
316 }
317 }
318 StreamingJob::Index(index, table) => {
319 ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
320 let job_id = Self::create_streaming_job_obj(
321 &txn,
322 ObjectType::Index,
323 index.owner as _,
324 Some(index.database_id),
325 Some(index.schema_id),
326 create_type,
327 ctx.clone(),
328 streaming_parallelism,
329 max_parallelism,
330 specific_resource_group,
331 backfill_parallelism.clone(),
332 )
333 .await?;
334 index.id = job_id.as_index_id();
336 index.index_table_id = job_id.as_mv_table_id();
337 table.id = job_id.as_mv_table_id();
338
339 ObjectDependency::insert(object_dependency::ActiveModel {
340 oid: Set(index.primary_table_id.into()),
341 used_by: Set(table.id.into()),
342 ..Default::default()
343 })
344 .exec(&txn)
345 .await?;
346
347 let table_model: table::ActiveModel = table.clone().into();
348 Table::insert(table_model).exec(&txn).await?;
349 let index_model: index::ActiveModel = index.clone().into();
350 Index::insert(index_model).exec(&txn).await?;
351 }
352 StreamingJob::Source(src) => {
353 let job_id = Self::create_streaming_job_obj(
354 &txn,
355 ObjectType::Source,
356 src.owner as _,
357 Some(src.database_id),
358 Some(src.schema_id),
359 create_type,
360 ctx.clone(),
361 streaming_parallelism,
362 max_parallelism,
363 specific_resource_group,
364 backfill_parallelism.clone(),
365 )
366 .await?;
367 src.id = job_id.as_shared_source_id();
368 let source_model: source::ActiveModel = src.clone().into();
369 Source::insert(source_model).exec(&txn).await?;
370 }
371 }
372
373 dependencies.extend(
375 streaming_job
376 .dependent_secret_ids()?
377 .into_iter()
378 .map(|id| id.as_object_id()),
379 );
380 dependencies.extend(
382 streaming_job
383 .dependent_connection_ids()?
384 .into_iter()
385 .map(|id| id.as_object_id()),
386 );
387
388 if !dependencies.is_empty() {
390 ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
391 object_dependency::ActiveModel {
392 oid: Set(oid),
393 used_by: Set(streaming_job.id().as_object_id()),
394 ..Default::default()
395 }
396 }))
397 .exec(&txn)
398 .await?;
399 }
400
401 txn.commit().await?;
402
403 Ok(())
404 }
405
406 pub async fn create_internal_table_catalog(
414 &self,
415 job: &StreamingJob,
416 mut incomplete_internal_tables: Vec<PbTable>,
417 ) -> MetaResult<HashMap<TableId, TableId>> {
418 let job_id = job.id();
419 let inner = self.inner.write().await;
420 let txn = inner.db.begin().await?;
421
422 ensure_job_not_canceled(job_id, &txn).await?;
424
425 let mut table_id_map = HashMap::new();
426 for table in &mut incomplete_internal_tables {
427 let table_id = Self::create_object(
428 &txn,
429 ObjectType::Table,
430 table.owner as _,
431 Some(table.database_id),
432 Some(table.schema_id),
433 )
434 .await?
435 .oid
436 .as_table_id();
437 table_id_map.insert(table.id, table_id);
438 table.id = table_id;
439 table.job_id = Some(job_id);
440
441 let table_model = table::ActiveModel {
442 table_id: Set(table_id),
443 belongs_to_job_id: Set(Some(job_id)),
444 fragment_id: NotSet,
445 ..table.clone().into()
446 };
447 Table::insert(table_model).exec(&txn).await?;
448 }
449 txn.commit().await?;
450
451 Ok(table_id_map)
452 }
453
454 pub async fn prepare_stream_job_fragments(
455 &self,
456 stream_job_fragments: &StreamJobFragmentsToCreate,
457 streaming_job: &StreamingJob,
458 for_replace: bool,
459 ) -> MetaResult<()> {
460 self.prepare_streaming_job(
461 stream_job_fragments.stream_job_id(),
462 || stream_job_fragments.fragments.values(),
463 &stream_job_fragments.downstreams,
464 for_replace,
465 Some(streaming_job),
466 )
467 .await
468 }
469
470 #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
475 )]
476 pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
477 &self,
478 job_id: JobId,
479 get_fragments: impl Fn() -> I + 'a,
480 downstreams: &FragmentDownstreamRelation,
481 for_replace: bool,
482 creating_streaming_job: Option<&'a StreamingJob>,
483 ) -> MetaResult<()> {
484 let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;
485
486 let inner = self.inner.write().await;
487
488 let need_notify = creating_streaming_job
489 .map(|job| job.should_notify_creating())
490 .unwrap_or(false);
491 let definition = creating_streaming_job.map(|job| job.definition());
492
493 let mut objects_to_notify = vec![];
494 let txn = inner.db.begin().await?;
495
496 ensure_job_not_canceled(job_id, &txn).await?;
498
499 for fragment in fragments {
500 let fragment_id = fragment.fragment_id;
501 let state_table_ids = fragment.state_table_ids.inner_ref().clone();
502
503 let fragment = fragment.into_active_model();
504 Fragment::insert(fragment).exec(&txn).await?;
505
506 if !for_replace {
509 let all_tables = StreamJobFragments::collect_tables(get_fragments());
510 for state_table_id in state_table_ids {
511 let table = all_tables
515 .get(&state_table_id)
516 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
517 assert_eq!(table.id, state_table_id);
518 assert_eq!(table.fragment_id, fragment_id);
519 let vnode_count = table.vnode_count();
520
521 table::ActiveModel {
522 table_id: Set(state_table_id as _),
523 fragment_id: Set(Some(fragment_id)),
524 vnode_count: Set(vnode_count as _),
525 ..Default::default()
526 }
527 .update(&txn)
528 .await?;
529
530 if need_notify {
531 let mut table = table.clone();
532 if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
534 table.definition = definition.clone().unwrap();
535 }
536 objects_to_notify.push(PbObject {
537 object_info: Some(PbObjectInfo::Table(table)),
538 });
539 }
540 }
541 }
542 }
543
544 if need_notify {
546 match creating_streaming_job.unwrap() {
547 StreamingJob::Table(Some(source), ..) => {
548 objects_to_notify.push(PbObject {
549 object_info: Some(PbObjectInfo::Source(source.clone())),
550 });
551 }
552 StreamingJob::Sink(sink) => {
553 objects_to_notify.push(PbObject {
554 object_info: Some(PbObjectInfo::Sink(sink.clone())),
555 });
556 }
557 StreamingJob::Index(index, _) => {
558 objects_to_notify.push(PbObject {
559 object_info: Some(PbObjectInfo::Index(index.clone())),
560 });
561 }
562 _ => {}
563 }
564 }
565
566 insert_fragment_relations(&txn, downstreams).await?;
567
568 if !for_replace {
569 if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
571 Table::update(table::ActiveModel {
572 table_id: Set(table.id),
573 dml_fragment_id: Set(table.dml_fragment_id),
574 ..Default::default()
575 })
576 .exec(&txn)
577 .await?;
578 }
579 }
580
581 txn.commit().await?;
582
583 if !objects_to_notify.is_empty() {
587 self.notify_frontend(
588 Operation::Add,
589 Info::ObjectGroup(PbObjectGroup {
590 objects: objects_to_notify,
591 }),
592 )
593 .await;
594 }
595
596 Ok(())
597 }
598
599 pub async fn build_cancel_command(
602 &self,
603 table_fragments: &StreamJobFragments,
604 ) -> MetaResult<Command> {
605 let inner = self.inner.read().await;
606 let txn = inner.db.begin().await?;
607
608 let dropped_sink_fragment_with_target =
609 if let Some(sink_fragment) = table_fragments.sink_fragment() {
610 let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
611 let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
612 sink_target_fragment
613 .get(&sink_fragment_id)
614 .map(|target_fragments| {
615 let target_fragment_id = *target_fragments
616 .first()
617 .expect("sink should have at least one downstream fragment");
618 (sink_fragment_id, target_fragment_id)
619 })
620 } else {
621 None
622 };
623
624 Ok(Command::DropStreamingJobs {
625 streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
626 actors: table_fragments.actor_ids().collect(),
627 unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
628 unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
629 dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
630 .into_iter()
631 .map(|(sink, target)| (target as _, vec![sink as _]))
632 .collect(),
633 })
634 }
635
636 #[await_tree::instrument]
640 pub async fn try_abort_creating_streaming_job(
641 &self,
642 mut job_id: JobId,
643 is_cancelled: bool,
644 ) -> MetaResult<(bool, Option<DatabaseId>)> {
645 let mut inner = self.inner.write().await;
646 let txn = inner.db.begin().await?;
647
648 let obj = Object::find_by_id(job_id).one(&txn).await?;
649 let Some(obj) = obj else {
650 tracing::warn!(
651 id = %job_id,
652 "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
653 );
654 return Ok((true, None));
655 };
656 let database_id = obj
657 .database_id
658 .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
659 let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
660
661 if !is_cancelled && let Some(streaming_job) = &streaming_job {
662 assert_ne!(streaming_job.job_status, JobStatus::Created);
663 if streaming_job.create_type == CreateType::Background
664 && streaming_job.job_status == JobStatus::Creating
665 {
666 if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
667 && check_if_belongs_to_iceberg_table(&txn, job_id).await?
668 {
669 } else {
671 tracing::warn!(
673 id = %job_id,
674 "streaming job is created in background and still in creating status"
675 );
676 return Ok((false, Some(database_id)));
677 }
678 }
679 }
680
681 let original_job_id = job_id;
683 let original_obj_type = obj.obj_type;
684
685 let iceberg_table_id =
686 try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
687 if let Some(iceberg_table_id) = iceberg_table_id {
688 let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
691 Object::delete_many()
692 .filter(
693 object::Column::Oid
694 .eq(job_id)
695 .or(object::Column::Oid.is_in(internal_tables)),
696 )
697 .exec(&txn)
698 .await?;
699 job_id = iceberg_table_id.as_job_id();
700 };
701
702 let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
703
704 let mut objs = vec![];
706 let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;
707
708 let mut need_notify =
709 streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
710 if !need_notify {
711 if let Some(table) = &table_obj {
712 need_notify = table.table_type == TableType::MaterializedView;
713 } else if original_obj_type == ObjectType::Sink {
714 need_notify = true;
715 }
716 }
717
718 if is_cancelled {
719 let dropped_tables = Table::find()
720 .find_also_related(Object)
721 .filter(
722 table::Column::TableId.is_in(
723 internal_table_ids
724 .iter()
725 .cloned()
726 .chain(table_obj.iter().map(|t| t.table_id as _)),
727 ),
728 )
729 .all(&txn)
730 .await?
731 .into_iter()
732 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
733 inner
734 .dropped_tables
735 .extend(dropped_tables.map(|t| (t.id, t)));
736 }
737
738 if need_notify {
739 if original_obj_type == ObjectType::Sink && original_job_id != job_id {
742 let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
743 .select_only()
744 .columns([
745 object::Column::Oid,
746 object::Column::ObjType,
747 object::Column::SchemaId,
748 object::Column::DatabaseId,
749 ])
750 .into_partial_model()
751 .one(&txn)
752 .await?;
753 if let Some(orig_obj) = orig_obj {
754 objs.push(orig_obj);
755 }
756 }
757
758 let obj: Option<PartialObject> = Object::find_by_id(job_id)
759 .select_only()
760 .columns([
761 object::Column::Oid,
762 object::Column::ObjType,
763 object::Column::SchemaId,
764 object::Column::DatabaseId,
765 ])
766 .into_partial_model()
767 .one(&txn)
768 .await?;
769 let obj =
770 obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
771 objs.push(obj);
772 let internal_table_objs: Vec<PartialObject> = Object::find()
773 .select_only()
774 .columns([
775 object::Column::Oid,
776 object::Column::ObjType,
777 object::Column::SchemaId,
778 object::Column::DatabaseId,
779 ])
780 .join(JoinType::InnerJoin, object::Relation::Table.def())
781 .filter(table::Column::BelongsToJobId.eq(job_id))
782 .into_partial_model()
783 .all(&txn)
784 .await?;
785 objs.extend(internal_table_objs);
786 }
787
788 if table_obj.is_none()
790 && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
791 .select_only()
792 .column(sink::Column::TargetTable)
793 .into_tuple::<Option<TableId>>()
794 .one(&txn)
795 .await?
796 {
797 let tmp_id: Option<ObjectId> = ObjectDependency::find()
798 .select_only()
799 .column(object_dependency::Column::UsedBy)
800 .join(
801 JoinType::InnerJoin,
802 object_dependency::Relation::Object1.def(),
803 )
804 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
805 .filter(
806 object_dependency::Column::Oid
807 .eq(target_table_id)
808 .and(object::Column::ObjType.eq(ObjectType::Table))
809 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
810 )
811 .into_tuple()
812 .one(&txn)
813 .await?;
814 if let Some(tmp_id) = tmp_id {
815 tracing::warn!(
816 id = %tmp_id,
817 "aborting temp streaming job for sink into table"
818 );
819
820 Object::delete_by_id(tmp_id).exec(&txn).await?;
821 }
822 }
823
824 Object::delete_by_id(job_id).exec(&txn).await?;
825 if !internal_table_ids.is_empty() {
826 Object::delete_many()
827 .filter(object::Column::Oid.is_in(internal_table_ids))
828 .exec(&txn)
829 .await?;
830 }
831 if let Some(t) = &table_obj
832 && let Some(source_id) = t.optional_associated_source_id
833 {
834 Object::delete_by_id(source_id).exec(&txn).await?;
835 }
836
837 let err = if is_cancelled {
838 MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
839 } else {
840 MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
841 };
842 let abort_reason = format!("streaming job aborted {}", err.as_report());
843 for tx in inner
844 .creating_table_finish_notifier
845 .get_mut(&database_id)
846 .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
847 .into_iter()
848 .flatten()
849 .flatten()
850 {
851 let _ = tx.send(Err(abort_reason.clone()));
852 }
853 txn.commit().await?;
854
855 if !objs.is_empty() {
856 self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
859 .await;
860 }
861 Ok((true, Some(database_id)))
862 }
863
864 #[await_tree::instrument]
865 pub async fn post_collect_job_fragments(
866 &self,
867 job_id: JobId,
868 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
869 new_sink_downstream: Option<FragmentDownstreamRelation>,
870 split_assignment: Option<&SplitAssignment>,
871 ) -> MetaResult<()> {
872 let inner = self.inner.write().await;
873 let txn = inner.db.begin().await?;
874
875 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
876
877 if let Some(new_downstream) = new_sink_downstream {
878 insert_fragment_relations(&txn, &new_downstream).await?;
879 }
880
881 streaming_job::ActiveModel {
883 job_id: Set(job_id),
884 job_status: Set(JobStatus::Creating),
885 ..Default::default()
886 }
887 .update(&txn)
888 .await?;
889
890 if let Some(split_assignment) = split_assignment {
891 let fragment_splits = split_assignment
892 .iter()
893 .map(|(fragment_id, splits)| {
894 (
895 *fragment_id as _,
896 splits.values().flatten().cloned().collect_vec(),
897 )
898 })
899 .collect();
900
901 self.update_fragment_splits(&txn, &fragment_splits).await?;
902 }
903
904 txn.commit().await?;
905
906 Ok(())
907 }
908
909 pub async fn create_job_catalog_for_replace(
910 &self,
911 streaming_job: &StreamingJob,
912 ctx: Option<&StreamContext>,
913 specified_parallelism: Option<&NonZeroUsize>,
914 expected_original_max_parallelism: Option<usize>,
915 ) -> MetaResult<JobId> {
916 let id = streaming_job.id();
917 let inner = self.inner.write().await;
918 let txn = inner.db.begin().await?;
919
920 streaming_job.verify_version_for_replace(&txn).await?;
922 let referring_cnt = ObjectDependency::find()
924 .join(
925 JoinType::InnerJoin,
926 object_dependency::Relation::Object1.def(),
927 )
928 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
929 .filter(
930 object_dependency::Column::Oid
931 .eq(id)
932 .and(object::Column::ObjType.eq(ObjectType::Table))
933 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
934 )
935 .count(&txn)
936 .await?;
937 if referring_cnt != 0 {
938 return Err(MetaError::permission_denied(
939 "job is being altered or referenced by some creating jobs",
940 ));
941 }
942
943 let (original_max_parallelism, original_timezone, original_config_override): (
945 i32,
946 Option<String>,
947 Option<String>,
948 ) = StreamingJobModel::find_by_id(id)
949 .select_only()
950 .column(streaming_job::Column::MaxParallelism)
951 .column(streaming_job::Column::Timezone)
952 .column(streaming_job::Column::ConfigOverride)
953 .into_tuple()
954 .one(&txn)
955 .await?
956 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
957
958 if let Some(max_parallelism) = expected_original_max_parallelism
959 && original_max_parallelism != max_parallelism as i32
960 {
961 bail!(
964 "cannot use a different max parallelism \
965 when replacing streaming job, \
966 original: {}, new: {}",
967 original_max_parallelism,
968 max_parallelism
969 );
970 }
971
972 let parallelism = match specified_parallelism {
973 None => StreamingParallelism::Adaptive,
974 Some(n) => StreamingParallelism::Fixed(n.get() as _),
975 };
976
977 let ctx = StreamContext {
978 timezone: ctx
979 .map(|ctx| ctx.timezone.clone())
980 .unwrap_or(original_timezone),
981 config_override: original_config_override.unwrap_or_default().into(),
984 };
985
986 let new_obj_id = Self::create_streaming_job_obj(
988 &txn,
989 streaming_job.object_type(),
990 streaming_job.owner() as _,
991 Some(streaming_job.database_id() as _),
992 Some(streaming_job.schema_id() as _),
993 streaming_job.create_type(),
994 ctx,
995 parallelism,
996 original_max_parallelism as _,
997 None,
998 None,
999 )
1000 .await?;
1001
1002 ObjectDependency::insert(object_dependency::ActiveModel {
1004 oid: Set(id.as_object_id()),
1005 used_by: Set(new_obj_id.as_object_id()),
1006 ..Default::default()
1007 })
1008 .exec(&txn)
1009 .await?;
1010
1011 txn.commit().await?;
1012
1013 Ok(new_obj_id)
1014 }
1015
1016 pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
1018 let mut inner = self.inner.write().await;
1019 let txn = inner.db.begin().await?;
1020
1021 if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
1023 tracing::info!(
1024 "streaming job {} is for iceberg table, wait for manual finish operation",
1025 job_id
1026 );
1027 return Ok(());
1028 }
1029
1030 let (notification_op, objects, updated_user_info) =
1031 Self::finish_streaming_job_inner(&txn, job_id).await?;
1032
1033 txn.commit().await?;
1034
1035 let mut version = self
1036 .notify_frontend(
1037 notification_op,
1038 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1039 )
1040 .await;
1041
1042 if !updated_user_info.is_empty() {
1044 version = self.notify_users_update(updated_user_info).await;
1045 }
1046
1047 inner
1048 .creating_table_finish_notifier
1049 .values_mut()
1050 .for_each(|creating_tables| {
1051 if let Some(txs) = creating_tables.remove(&job_id) {
1052 for tx in txs {
1053 let _ = tx.send(Ok(version));
1054 }
1055 }
1056 });
1057
1058 Ok(())
1059 }
1060
1061 pub async fn finish_streaming_job_inner(
1063 txn: &DatabaseTransaction,
1064 job_id: JobId,
1065 ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
1066 let job_type = Object::find_by_id(job_id)
1067 .select_only()
1068 .column(object::Column::ObjType)
1069 .into_tuple()
1070 .one(txn)
1071 .await?
1072 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1073
1074 let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
1075 .select_only()
1076 .column(streaming_job::Column::CreateType)
1077 .into_tuple()
1078 .one(txn)
1079 .await?
1080 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1081
1082 let res = Object::update_many()
1084 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
1085 .col_expr(
1086 object::Column::CreatedAtClusterVersion,
1087 current_cluster_version().into(),
1088 )
1089 .filter(object::Column::Oid.eq(job_id))
1090 .exec(txn)
1091 .await?;
1092 if res.rows_affected == 0 {
1093 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1094 }
1095
1096 let job = streaming_job::ActiveModel {
1098 job_id: Set(job_id),
1099 job_status: Set(JobStatus::Created),
1100 ..Default::default()
1101 };
1102 job.update(txn).await?;
1103
1104 let internal_table_objs = Table::find()
1106 .find_also_related(Object)
1107 .filter(table::Column::BelongsToJobId.eq(job_id))
1108 .all(txn)
1109 .await?;
1110 let mut objects = internal_table_objs
1111 .iter()
1112 .map(|(table, obj)| PbObject {
1113 object_info: Some(PbObjectInfo::Table(
1114 ObjectModel(table.clone(), obj.clone().unwrap()).into(),
1115 )),
1116 })
1117 .collect_vec();
1118 let mut notification_op = if create_type == CreateType::Background {
1119 NotificationOperation::Update
1120 } else {
1121 NotificationOperation::Add
1122 };
1123 let mut updated_user_info = vec![];
1124 let mut need_grant_default_privileges = true;
1125
1126 match job_type {
1127 ObjectType::Table => {
1128 let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1129 .find_also_related(Object)
1130 .one(txn)
1131 .await?
1132 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1133 if table.table_type == TableType::MaterializedView {
1134 notification_op = NotificationOperation::Update;
1135 }
1136
1137 if let Some(source_id) = table.optional_associated_source_id {
1138 let (src, obj) = Source::find_by_id(source_id)
1139 .find_also_related(Object)
1140 .one(txn)
1141 .await?
1142 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1143 objects.push(PbObject {
1144 object_info: Some(PbObjectInfo::Source(
1145 ObjectModel(src, obj.unwrap()).into(),
1146 )),
1147 });
1148 }
1149 objects.push(PbObject {
1150 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1151 });
1152 }
1153 ObjectType::Sink => {
1154 let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1155 .find_also_related(Object)
1156 .one(txn)
1157 .await?
1158 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1159 if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
1160 need_grant_default_privileges = false;
1161 }
1162 notification_op = NotificationOperation::Update;
1165 objects.push(PbObject {
1166 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
1167 });
1168 }
1169 ObjectType::Index => {
1170 need_grant_default_privileges = false;
1171 let (index, obj) = Index::find_by_id(job_id.as_index_id())
1172 .find_also_related(Object)
1173 .one(txn)
1174 .await?
1175 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1176 {
1177 let (table, obj) = Table::find_by_id(index.index_table_id)
1178 .find_also_related(Object)
1179 .one(txn)
1180 .await?
1181 .ok_or_else(|| {
1182 MetaError::catalog_id_not_found("table", index.index_table_id)
1183 })?;
1184 objects.push(PbObject {
1185 object_info: Some(PbObjectInfo::Table(
1186 ObjectModel(table, obj.unwrap()).into(),
1187 )),
1188 });
1189 }
1190
1191 let primary_table_privileges = UserPrivilege::find()
1194 .filter(
1195 user_privilege::Column::Oid
1196 .eq(index.primary_table_id)
1197 .and(user_privilege::Column::Action.eq(Action::Select)),
1198 )
1199 .all(txn)
1200 .await?;
1201 if !primary_table_privileges.is_empty() {
1202 let index_state_table_ids: Vec<TableId> = Table::find()
1203 .select_only()
1204 .column(table::Column::TableId)
1205 .filter(
1206 table::Column::BelongsToJobId
1207 .eq(job_id)
1208 .or(table::Column::TableId.eq(index.index_table_id)),
1209 )
1210 .into_tuple()
1211 .all(txn)
1212 .await?;
1213 let mut new_privileges = vec![];
1214 for privilege in &primary_table_privileges {
1215 for state_table_id in &index_state_table_ids {
1216 new_privileges.push(user_privilege::ActiveModel {
1217 id: Default::default(),
1218 oid: Set(state_table_id.as_object_id()),
1219 user_id: Set(privilege.user_id),
1220 action: Set(Action::Select),
1221 dependent_id: Set(privilege.dependent_id),
1222 granted_by: Set(privilege.granted_by),
1223 with_grant_option: Set(privilege.with_grant_option),
1224 });
1225 }
1226 }
1227 UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1228
1229 updated_user_info = list_user_info_by_ids(
1230 primary_table_privileges.into_iter().map(|p| p.user_id),
1231 txn,
1232 )
1233 .await?;
1234 }
1235
1236 objects.push(PbObject {
1237 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1238 });
1239 }
1240 ObjectType::Source => {
1241 let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1242 .find_also_related(Object)
1243 .one(txn)
1244 .await?
1245 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1246 objects.push(PbObject {
1247 object_info: Some(PbObjectInfo::Source(
1248 ObjectModel(source, obj.unwrap()).into(),
1249 )),
1250 });
1251 }
1252 _ => unreachable!("invalid job type: {:?}", job_type),
1253 }
1254
1255 if need_grant_default_privileges {
1256 updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1257 }
1258
1259 Ok((notification_op, objects, updated_user_info))
1260 }
1261
1262 pub async fn finish_replace_streaming_job(
1263 &self,
1264 tmp_id: JobId,
1265 streaming_job: StreamingJob,
1266 replace_upstream: FragmentReplaceUpstream,
1267 sink_into_table_context: SinkIntoTableContext,
1268 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1269 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1270 ) -> MetaResult<NotificationVersion> {
1271 let inner = self.inner.write().await;
1272 let txn = inner.db.begin().await?;
1273
1274 let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1275 tmp_id,
1276 replace_upstream,
1277 sink_into_table_context,
1278 &txn,
1279 streaming_job,
1280 drop_table_connector_ctx,
1281 auto_refresh_schema_sinks,
1282 )
1283 .await?;
1284
1285 txn.commit().await?;
1286
1287 let mut version = self
1288 .notify_frontend(
1289 NotificationOperation::Update,
1290 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1291 )
1292 .await;
1293
1294 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1295 self.notify_users_update(user_infos).await;
1296 version = self
1297 .notify_frontend(
1298 NotificationOperation::Delete,
1299 build_object_group_for_delete(to_drop_objects),
1300 )
1301 .await;
1302 }
1303
1304 Ok(version)
1305 }
1306
1307 pub async fn finish_replace_streaming_job_inner(
1308 tmp_id: JobId,
1309 replace_upstream: FragmentReplaceUpstream,
1310 SinkIntoTableContext {
1311 updated_sink_catalogs,
1312 }: SinkIntoTableContext,
1313 txn: &DatabaseTransaction,
1314 streaming_job: StreamingJob,
1315 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1316 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1317 ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1318 let original_job_id = streaming_job.id();
1319 let job_type = streaming_job.job_type();
1320
1321 let mut index_item_rewriter = None;
1322
1323 match streaming_job {
1325 StreamingJob::Table(_source, table, _table_job_type) => {
1326 let original_column_catalogs =
1329 get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1330
1331 index_item_rewriter = Some({
1332 let original_columns = original_column_catalogs
1333 .to_protobuf()
1334 .into_iter()
1335 .map(|c| c.column_desc.unwrap())
1336 .collect_vec();
1337 let new_columns = table
1338 .columns
1339 .iter()
1340 .map(|c| c.column_desc.clone().unwrap())
1341 .collect_vec();
1342
1343 IndexItemRewriter {
1344 original_columns,
1345 new_columns,
1346 }
1347 });
1348
1349 for sink_id in updated_sink_catalogs {
1351 sink::ActiveModel {
1352 sink_id: Set(sink_id as _),
1353 original_target_columns: Set(Some(original_column_catalogs.clone())),
1354 ..Default::default()
1355 }
1356 .update(txn)
1357 .await?;
1358 }
1359 let mut table = table::ActiveModel::from(table);
1361 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1362 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1363 {
1364 table.optional_associated_source_id = Set(None);
1366 }
1367
1368 table.update(txn).await?;
1369 }
1370 StreamingJob::Source(source) => {
1371 let source = source::ActiveModel::from(source);
1373 source.update(txn).await?;
1374 }
1375 StreamingJob::MaterializedView(table) => {
1376 let table = table::ActiveModel::from(table);
1378 table.update(txn).await?;
1379 }
1380 _ => unreachable!(
1381 "invalid streaming job type: {:?}",
1382 streaming_job.job_type_str()
1383 ),
1384 }
1385
1386 async fn finish_fragments(
1387 txn: &DatabaseTransaction,
1388 tmp_id: JobId,
1389 original_job_id: JobId,
1390 replace_upstream: FragmentReplaceUpstream,
1391 ) -> MetaResult<()> {
1392 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1396 .select_only()
1397 .columns([
1398 fragment::Column::FragmentId,
1399 fragment::Column::StateTableIds,
1400 ])
1401 .filter(fragment::Column::JobId.eq(tmp_id))
1402 .into_tuple()
1403 .all(txn)
1404 .await?;
1405 for (fragment_id, state_table_ids) in fragment_info {
1406 for state_table_id in state_table_ids.into_inner() {
1407 let state_table_id = TableId::new(state_table_id as _);
1408 table::ActiveModel {
1409 table_id: Set(state_table_id),
1410 fragment_id: Set(Some(fragment_id)),
1411 ..Default::default()
1413 }
1414 .update(txn)
1415 .await?;
1416 }
1417 }
1418
1419 Fragment::delete_many()
1421 .filter(fragment::Column::JobId.eq(original_job_id))
1422 .exec(txn)
1423 .await?;
1424 Fragment::update_many()
1425 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1426 .filter(fragment::Column::JobId.eq(tmp_id))
1427 .exec(txn)
1428 .await?;
1429
1430 for (fragment_id, fragment_replace_map) in replace_upstream {
1433 let (fragment_id, mut stream_node) =
1434 Fragment::find_by_id(fragment_id as FragmentId)
1435 .select_only()
1436 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1437 .into_tuple::<(FragmentId, StreamNode)>()
1438 .one(txn)
1439 .await?
1440 .map(|(id, node)| (id, node.to_protobuf()))
1441 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1442
1443 visit_stream_node_mut(&mut stream_node, |body| {
1444 if let PbNodeBody::Merge(m) = body
1445 && let Some(new_fragment_id) =
1446 fragment_replace_map.get(&m.upstream_fragment_id)
1447 {
1448 m.upstream_fragment_id = *new_fragment_id;
1449 }
1450 });
1451 fragment::ActiveModel {
1452 fragment_id: Set(fragment_id),
1453 stream_node: Set(StreamNode::from(&stream_node)),
1454 ..Default::default()
1455 }
1456 .update(txn)
1457 .await?;
1458 }
1459
1460 Object::delete_by_id(tmp_id).exec(txn).await?;
1462
1463 Ok(())
1464 }
1465
1466 finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1467
1468 let mut objects = vec![];
1470 match job_type {
1471 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1472 let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1473 .find_also_related(Object)
1474 .one(txn)
1475 .await?
1476 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1477 objects.push(PbObject {
1478 object_info: Some(PbObjectInfo::Table(
1479 ObjectModel(table, table_obj.unwrap()).into(),
1480 )),
1481 })
1482 }
1483 StreamingJobType::Source => {
1484 let (source, source_obj) =
1485 Source::find_by_id(original_job_id.as_shared_source_id())
1486 .find_also_related(Object)
1487 .one(txn)
1488 .await?
1489 .ok_or_else(|| {
1490 MetaError::catalog_id_not_found("object", original_job_id)
1491 })?;
1492 objects.push(PbObject {
1493 object_info: Some(PbObjectInfo::Source(
1494 ObjectModel(source, source_obj.unwrap()).into(),
1495 )),
1496 })
1497 }
1498 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1499 }
1500
1501 if let Some(expr_rewriter) = index_item_rewriter {
1502 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1503 .select_only()
1504 .columns([index::Column::IndexId, index::Column::IndexItems])
1505 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1506 .into_tuple()
1507 .all(txn)
1508 .await?;
1509 for (index_id, nodes) in index_items {
1510 let mut pb_nodes = nodes.to_protobuf();
1511 pb_nodes
1512 .iter_mut()
1513 .for_each(|x| expr_rewriter.rewrite_expr(x));
1514 let index = index::ActiveModel {
1515 index_id: Set(index_id),
1516 index_items: Set(pb_nodes.into()),
1517 ..Default::default()
1518 }
1519 .update(txn)
1520 .await?;
1521 let index_obj = index
1522 .find_related(Object)
1523 .one(txn)
1524 .await?
1525 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1526 objects.push(PbObject {
1527 object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1528 });
1529 }
1530 }
1531
1532 if let Some(sinks) = auto_refresh_schema_sinks {
1533 for finish_sink_context in sinks {
1534 finish_fragments(
1535 txn,
1536 finish_sink_context.tmp_sink_id.as_job_id(),
1537 finish_sink_context.original_sink_id.as_job_id(),
1538 Default::default(),
1539 )
1540 .await?;
1541 let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1542 .find_also_related(Object)
1543 .one(txn)
1544 .await?
1545 .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1546 let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1547 Sink::update(sink::ActiveModel {
1548 sink_id: Set(finish_sink_context.original_sink_id),
1549 columns: Set(columns.clone()),
1550 ..Default::default()
1551 })
1552 .exec(txn)
1553 .await?;
1554 sink.columns = columns;
1555 objects.push(PbObject {
1556 object_info: Some(PbObjectInfo::Sink(
1557 ObjectModel(sink, sink_obj.unwrap()).into(),
1558 )),
1559 });
1560 if let Some((log_store_table_id, new_log_store_table_columns)) =
1561 finish_sink_context.new_log_store_table
1562 {
1563 let new_log_store_table_columns: ColumnCatalogArray =
1564 new_log_store_table_columns.into();
1565 let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1566 .find_also_related(Object)
1567 .one(txn)
1568 .await?
1569 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1570 Table::update(table::ActiveModel {
1571 table_id: Set(log_store_table_id),
1572 columns: Set(new_log_store_table_columns.clone()),
1573 ..Default::default()
1574 })
1575 .exec(txn)
1576 .await?;
1577 table.columns = new_log_store_table_columns;
1578 objects.push(PbObject {
1579 object_info: Some(PbObjectInfo::Table(
1580 ObjectModel(table, table_obj.unwrap()).into(),
1581 )),
1582 });
1583 }
1584 }
1585 }
1586
1587 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1588 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1589 notification_objs =
1590 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1591 }
1592
1593 Ok((objects, notification_objs))
1594 }
1595
1596 pub async fn try_abort_replacing_streaming_job(
1598 &self,
1599 tmp_job_id: JobId,
1600 tmp_sink_ids: Option<Vec<ObjectId>>,
1601 ) -> MetaResult<()> {
1602 let inner = self.inner.write().await;
1603 let txn = inner.db.begin().await?;
1604 Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1605 if let Some(tmp_sink_ids) = tmp_sink_ids {
1606 for tmp_sink_id in tmp_sink_ids {
1607 Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1608 }
1609 }
1610 txn.commit().await?;
1611 Ok(())
1612 }
1613
1614 pub async fn update_source_rate_limit_by_source_id(
1617 &self,
1618 source_id: SourceId,
1619 rate_limit: Option<u32>,
1620 ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
1621 let inner = self.inner.read().await;
1622 let txn = inner.db.begin().await?;
1623
1624 {
1625 let active_source = source::ActiveModel {
1626 source_id: Set(source_id),
1627 rate_limit: Set(rate_limit.map(|v| v as i32)),
1628 ..Default::default()
1629 };
1630 active_source.update(&txn).await?;
1631 }
1632
1633 let (source, obj) = Source::find_by_id(source_id)
1634 .find_also_related(Object)
1635 .one(&txn)
1636 .await?
1637 .ok_or_else(|| {
1638 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1639 })?;
1640
1641 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1642 let streaming_job_ids: Vec<JobId> =
1643 if let Some(table_id) = source.optional_associated_table_id {
1644 vec![table_id.as_job_id()]
1645 } else if let Some(source_info) = &source.source_info
1646 && source_info.to_protobuf().is_shared()
1647 {
1648 vec![source_id.as_share_source_job_id()]
1649 } else {
1650 ObjectDependency::find()
1651 .select_only()
1652 .column(object_dependency::Column::UsedBy)
1653 .filter(object_dependency::Column::Oid.eq(source_id))
1654 .into_tuple()
1655 .all(&txn)
1656 .await?
1657 };
1658
1659 if streaming_job_ids.is_empty() {
1660 return Err(MetaError::invalid_parameter(format!(
1661 "source id {source_id} not used by any streaming job"
1662 )));
1663 }
1664
1665 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
1666 .select_only()
1667 .columns([
1668 fragment::Column::FragmentId,
1669 fragment::Column::JobId,
1670 fragment::Column::FragmentTypeMask,
1671 fragment::Column::StreamNode,
1672 ])
1673 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1674 .into_tuple()
1675 .all(&txn)
1676 .await?;
1677 let mut fragments = fragments
1678 .into_iter()
1679 .map(|(id, job_id, mask, stream_node)| {
1680 (
1681 id,
1682 job_id,
1683 FragmentTypeMask::from(mask as u32),
1684 stream_node.to_protobuf(),
1685 )
1686 })
1687 .collect_vec();
1688
1689 fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
1690 let mut found = false;
1691 if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1692 visit_stream_node_mut(stream_node, |node| {
1693 if let PbNodeBody::Source(node) = node
1694 && let Some(node_inner) = &mut node.source_inner
1695 && node_inner.source_id == source_id
1696 {
1697 node_inner.rate_limit = rate_limit;
1698 found = true;
1699 }
1700 });
1701 }
1702 if is_fs_source {
1703 visit_stream_node_mut(stream_node, |node| {
1706 if let PbNodeBody::StreamFsFetch(node) = node {
1707 fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1708 if let Some(node_inner) = &mut node.node_inner
1709 && node_inner.source_id == source_id
1710 {
1711 node_inner.rate_limit = rate_limit;
1712 found = true;
1713 }
1714 }
1715 });
1716 }
1717 found
1718 });
1719
1720 assert!(
1721 !fragments.is_empty(),
1722 "source id should be used by at least one fragment"
1723 );
1724
1725 let (fragment_ids, job_ids) = fragments
1726 .iter()
1727 .map(|(framgnet_id, job_id, _, _)| (framgnet_id, job_id))
1728 .unzip();
1729
1730 for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
1731 fragment::ActiveModel {
1732 fragment_id: Set(fragment_id),
1733 fragment_type_mask: Set(fragment_type_mask.into()),
1734 stream_node: Set(StreamNode::from(&stream_node)),
1735 ..Default::default()
1736 }
1737 .update(&txn)
1738 .await?;
1739 }
1740
1741 txn.commit().await?;
1742
1743 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1744 let _version = self
1745 .notify_frontend(
1746 NotificationOperation::Update,
1747 NotificationInfo::ObjectGroup(PbObjectGroup {
1748 objects: vec![PbObject {
1749 object_info: Some(relation_info),
1750 }],
1751 }),
1752 )
1753 .await;
1754
1755 Ok((job_ids, fragment_ids))
1756 }
1757
1758 pub async fn mutate_fragments_by_job_id(
1761 &self,
1762 job_id: JobId,
1763 mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1765 err_msg: &'static str,
1767 ) -> MetaResult<HashSet<FragmentId>> {
1768 let inner = self.inner.read().await;
1769 let txn = inner.db.begin().await?;
1770
1771 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1772 .select_only()
1773 .columns([
1774 fragment::Column::FragmentId,
1775 fragment::Column::FragmentTypeMask,
1776 fragment::Column::StreamNode,
1777 ])
1778 .filter(fragment::Column::JobId.eq(job_id))
1779 .into_tuple()
1780 .all(&txn)
1781 .await?;
1782 let mut fragments = fragments
1783 .into_iter()
1784 .map(|(id, mask, stream_node)| {
1785 (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1786 })
1787 .collect_vec();
1788
1789 let fragments = fragments
1790 .iter_mut()
1791 .map(|(_, fragment_type_mask, stream_node)| {
1792 fragments_mutation_fn(*fragment_type_mask, stream_node)
1793 })
1794 .collect::<MetaResult<Vec<bool>>>()?
1795 .into_iter()
1796 .zip_eq_debug(std::mem::take(&mut fragments))
1797 .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1798 .collect::<Vec<_>>();
1799
1800 if fragments.is_empty() {
1801 return Err(MetaError::invalid_parameter(format!(
1802 "job id {job_id}: {}",
1803 err_msg
1804 )));
1805 }
1806
1807 let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1808 for (id, _, stream_node) in fragments {
1809 fragment::ActiveModel {
1810 fragment_id: Set(id),
1811 stream_node: Set(StreamNode::from(&stream_node)),
1812 ..Default::default()
1813 }
1814 .update(&txn)
1815 .await?;
1816 }
1817
1818 txn.commit().await?;
1819
1820 Ok(fragment_ids)
1821 }
1822
1823 async fn mutate_fragment_by_fragment_id(
1824 &self,
1825 fragment_id: FragmentId,
1826 mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1827 err_msg: &'static str,
1828 ) -> MetaResult<()> {
1829 let inner = self.inner.read().await;
1830 let txn = inner.db.begin().await?;
1831
1832 let (fragment_type_mask, stream_node): (i32, StreamNode) =
1833 Fragment::find_by_id(fragment_id)
1834 .select_only()
1835 .columns([
1836 fragment::Column::FragmentTypeMask,
1837 fragment::Column::StreamNode,
1838 ])
1839 .into_tuple()
1840 .one(&txn)
1841 .await?
1842 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1843 let mut pb_stream_node = stream_node.to_protobuf();
1844 let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1845
1846 if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1847 return Err(MetaError::invalid_parameter(format!(
1848 "fragment id {fragment_id}: {}",
1849 err_msg
1850 )));
1851 }
1852
1853 fragment::ActiveModel {
1854 fragment_id: Set(fragment_id),
1855 stream_node: Set(stream_node),
1856 ..Default::default()
1857 }
1858 .update(&txn)
1859 .await?;
1860
1861 txn.commit().await?;
1862
1863 Ok(())
1864 }
1865
1866 pub async fn update_backfill_rate_limit_by_job_id(
1869 &self,
1870 job_id: JobId,
1871 rate_limit: Option<u32>,
1872 ) -> MetaResult<HashSet<FragmentId>> {
1873 let update_backfill_rate_limit =
1874 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1875 let mut found = false;
1876 if fragment_type_mask
1877 .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1878 {
1879 visit_stream_node_mut(stream_node, |node| match node {
1880 PbNodeBody::StreamCdcScan(node) => {
1881 node.rate_limit = rate_limit;
1882 found = true;
1883 }
1884 PbNodeBody::StreamScan(node) => {
1885 node.rate_limit = rate_limit;
1886 found = true;
1887 }
1888 PbNodeBody::SourceBackfill(node) => {
1889 node.rate_limit = rate_limit;
1890 found = true;
1891 }
1892 PbNodeBody::Sink(node) => {
1893 node.rate_limit = rate_limit;
1894 found = true;
1895 }
1896 _ => {}
1897 });
1898 }
1899 Ok(found)
1900 };
1901
1902 self.mutate_fragments_by_job_id(
1903 job_id,
1904 update_backfill_rate_limit,
1905 "stream scan node or source node not found",
1906 )
1907 .await
1908 }
1909
1910 pub async fn update_sink_rate_limit_by_job_id(
1913 &self,
1914 sink_id: SinkId,
1915 rate_limit: Option<u32>,
1916 ) -> MetaResult<HashSet<FragmentId>> {
1917 let update_sink_rate_limit =
1918 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1919 let mut found = Ok(false);
1920 if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1921 visit_stream_node_mut(stream_node, |node| {
1922 if let PbNodeBody::Sink(node) = node {
1923 if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1924 found = Err(MetaError::invalid_parameter(
1925 "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1926 ));
1927 return;
1928 }
1929 node.rate_limit = rate_limit;
1930 found = Ok(true);
1931 }
1932 });
1933 }
1934 found
1935 };
1936
1937 self.mutate_fragments_by_job_id(
1938 sink_id.as_job_id(),
1939 update_sink_rate_limit,
1940 "sink node not found",
1941 )
1942 .await
1943 }
1944
1945 pub async fn update_dml_rate_limit_by_job_id(
1946 &self,
1947 job_id: JobId,
1948 rate_limit: Option<u32>,
1949 ) -> MetaResult<HashSet<FragmentId>> {
1950 let update_dml_rate_limit =
1951 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1952 let mut found = false;
1953 if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1954 visit_stream_node_mut(stream_node, |node| {
1955 if let PbNodeBody::Dml(node) = node {
1956 node.rate_limit = rate_limit;
1957 found = true;
1958 }
1959 });
1960 }
1961 Ok(found)
1962 };
1963
1964 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1965 .await
1966 }
1967
1968 pub async fn update_source_props_by_source_id(
1969 &self,
1970 source_id: SourceId,
1971 alter_props: BTreeMap<String, String>,
1972 alter_secret_refs: BTreeMap<String, PbSecretRef>,
1973 ) -> MetaResult<WithOptionsSecResolved> {
1974 let inner = self.inner.read().await;
1975 let txn = inner.db.begin().await?;
1976
1977 let (source, _obj) = Source::find_by_id(source_id)
1978 .find_also_related(Object)
1979 .one(&txn)
1980 .await?
1981 .ok_or_else(|| {
1982 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1983 })?;
1984 let connector = source.with_properties.0.get_connector().unwrap();
1985 let is_shared_source = source.is_shared();
1986
1987 let mut dep_source_job_ids: Vec<JobId> = Vec::new();
1988 if !is_shared_source {
1989 dep_source_job_ids = ObjectDependency::find()
1991 .select_only()
1992 .column(object_dependency::Column::UsedBy)
1993 .filter(object_dependency::Column::Oid.eq(source_id))
1994 .into_tuple()
1995 .all(&txn)
1996 .await?;
1997 }
1998
1999 let prop_keys: Vec<String> = alter_props
2001 .keys()
2002 .chain(alter_secret_refs.keys())
2003 .cloned()
2004 .collect();
2005 risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
2006 &connector, &prop_keys,
2007 )?;
2008
2009 let mut options_with_secret = WithOptionsSecResolved::new(
2010 source.with_properties.0.clone(),
2011 source
2012 .secret_ref
2013 .map(|secret_ref| secret_ref.to_protobuf())
2014 .unwrap_or_default(),
2015 );
2016 let (to_add_secret_dep, to_remove_secret_dep) =
2017 options_with_secret.handle_update(alter_props, alter_secret_refs)?;
2018
2019 tracing::info!(
2020 "applying new properties to source: source_id={}, options_with_secret={:?}",
2021 source_id,
2022 options_with_secret
2023 );
2024 let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
2026 let mut associate_table_id = None;
2029
2030 let mut preferred_id = source_id.as_object_id();
2034 let rewrite_sql = {
2035 let definition = source.definition.clone();
2036
2037 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2038 .map_err(|e| {
2039 MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
2040 anyhow!(e).context("Failed to parse source definition SQL"),
2041 )))
2042 })?
2043 .try_into()
2044 .unwrap();
2045
2046 async fn format_with_option_secret_resolved(
2060 txn: &DatabaseTransaction,
2061 options_with_secret: &WithOptionsSecResolved,
2062 ) -> MetaResult<Vec<SqlOption>> {
2063 let mut options = Vec::new();
2064 for (k, v) in options_with_secret.as_plaintext() {
2065 let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
2066 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2067 options.push(sql_option);
2068 }
2069 for (k, v) in options_with_secret.as_secret() {
2070 if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
2071 let sql_option =
2072 SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
2073 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2074 options.push(sql_option);
2075 } else {
2076 return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
2077 }
2078 }
2079 Ok(options)
2080 }
2081
2082 match &mut stmt {
2083 Statement::CreateSource { stmt } => {
2084 stmt.with_properties.0 =
2085 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2086 }
2087 Statement::CreateTable { with_options, .. } => {
2088 *with_options =
2089 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2090 associate_table_id = source.optional_associated_table_id;
2091 preferred_id = associate_table_id.unwrap().as_object_id();
2092 }
2093 _ => unreachable!(),
2094 }
2095
2096 stmt.to_string()
2097 };
2098
2099 {
2100 if !to_add_secret_dep.is_empty() {
2102 ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2103 object_dependency::ActiveModel {
2104 oid: Set(secret_id.into()),
2105 used_by: Set(preferred_id),
2106 ..Default::default()
2107 }
2108 }))
2109 .exec(&txn)
2110 .await?;
2111 }
2112 if !to_remove_secret_dep.is_empty() {
2113 let _ = ObjectDependency::delete_many()
2115 .filter(
2116 object_dependency::Column::Oid
2117 .is_in(to_remove_secret_dep)
2118 .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2119 )
2120 .exec(&txn)
2121 .await?;
2122 }
2123 }
2124
2125 let active_source_model = source::ActiveModel {
2126 source_id: Set(source_id),
2127 definition: Set(rewrite_sql.clone()),
2128 with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2129 secret_ref: Set((!options_with_secret.as_secret().is_empty())
2130 .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2131 ..Default::default()
2132 };
2133 active_source_model.update(&txn).await?;
2134
2135 if let Some(associate_table_id) = associate_table_id {
2136 let active_table_model = table::ActiveModel {
2138 table_id: Set(associate_table_id),
2139 definition: Set(rewrite_sql),
2140 ..Default::default()
2141 };
2142 active_table_model.update(&txn).await?;
2143 }
2144
2145 let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2146 associate_table_id.as_job_id()
2148 } else {
2149 source_id.as_share_source_job_id()
2150 }]
2151 .into_iter()
2152 .chain(dep_source_job_ids.into_iter())
2153 .collect_vec();
2154
2155 update_connector_props_fragments(
2157 &txn,
2158 to_check_job_ids,
2159 FragmentTypeFlag::Source,
2160 |node, found| {
2161 if let PbNodeBody::Source(node) = node
2162 && let Some(source_inner) = &mut node.source_inner
2163 {
2164 source_inner.with_properties = options_with_secret.as_plaintext().clone();
2165 source_inner.secret_refs = options_with_secret.as_secret().clone();
2166 *found = true;
2167 }
2168 },
2169 is_shared_source,
2170 )
2171 .await?;
2172
2173 let mut to_update_objs = Vec::with_capacity(2);
2174 let (source, obj) = Source::find_by_id(source_id)
2175 .find_also_related(Object)
2176 .one(&txn)
2177 .await?
2178 .ok_or_else(|| {
2179 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2180 })?;
2181 to_update_objs.push(PbObject {
2182 object_info: Some(PbObjectInfo::Source(
2183 ObjectModel(source, obj.unwrap()).into(),
2184 )),
2185 });
2186
2187 if let Some(associate_table_id) = associate_table_id {
2188 let (table, obj) = Table::find_by_id(associate_table_id)
2189 .find_also_related(Object)
2190 .one(&txn)
2191 .await?
2192 .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2193 to_update_objs.push(PbObject {
2194 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
2195 });
2196 }
2197
2198 txn.commit().await?;
2199
2200 self.notify_frontend(
2201 NotificationOperation::Update,
2202 NotificationInfo::ObjectGroup(PbObjectGroup {
2203 objects: to_update_objs,
2204 }),
2205 )
2206 .await;
2207
2208 Ok(options_with_secret)
2209 }
2210
2211 pub async fn update_sink_props_by_sink_id(
2212 &self,
2213 sink_id: SinkId,
2214 props: BTreeMap<String, String>,
2215 ) -> MetaResult<HashMap<String, String>> {
2216 let inner = self.inner.read().await;
2217 let txn = inner.db.begin().await?;
2218
2219 let (sink, _obj) = Sink::find_by_id(sink_id)
2220 .find_also_related(Object)
2221 .one(&txn)
2222 .await?
2223 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2224 validate_sink_props(&sink, &props)?;
2225 let definition = sink.definition.clone();
2226 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2227 .map_err(|e| SinkError::Config(anyhow!(e)))?
2228 .try_into()
2229 .unwrap();
2230 if let Statement::CreateSink { stmt } = &mut stmt {
2231 update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2232 } else {
2233 panic!("definition is not a create sink statement")
2234 }
2235 let mut new_config = sink.properties.clone().into_inner();
2236 new_config.extend(props.clone());
2237
2238 let definition = stmt.to_string();
2239 let active_sink = sink::ActiveModel {
2240 sink_id: Set(sink_id),
2241 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2242 definition: Set(definition),
2243 ..Default::default()
2244 };
2245 active_sink.update(&txn).await?;
2246
2247 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2248 let (sink, obj) = Sink::find_by_id(sink_id)
2249 .find_also_related(Object)
2250 .one(&txn)
2251 .await?
2252 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2253 txn.commit().await?;
2254 let relation_infos = vec![PbObject {
2255 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2256 }];
2257
2258 let _version = self
2259 .notify_frontend(
2260 NotificationOperation::Update,
2261 NotificationInfo::ObjectGroup(PbObjectGroup {
2262 objects: relation_infos,
2263 }),
2264 )
2265 .await;
2266
2267 Ok(props.into_iter().collect())
2268 }
2269
2270 pub async fn update_iceberg_table_props_by_table_id(
2271 &self,
2272 table_id: TableId,
2273 props: BTreeMap<String, String>,
2274 alter_iceberg_table_props: Option<
2275 risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2276 >,
2277 ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2278 let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props.
2279 ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2280 let inner = self.inner.read().await;
2281 let txn = inner.db.begin().await?;
2282
2283 let (sink, _obj) = Sink::find_by_id(sink_id)
2284 .find_also_related(Object)
2285 .one(&txn)
2286 .await?
2287 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2288 validate_sink_props(&sink, &props)?;
2289
2290 let definition = sink.definition.clone();
2291 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2292 .map_err(|e| SinkError::Config(anyhow!(e)))?
2293 .try_into()
2294 .unwrap();
2295 if let Statement::CreateTable {
2296 with_options,
2297 engine,
2298 ..
2299 } = &mut stmt
2300 {
2301 if !matches!(engine, Engine::Iceberg) {
2302 return Err(SinkError::Config(anyhow!(
2303 "only iceberg table can be altered as sink"
2304 ))
2305 .into());
2306 }
2307 update_stmt_with_props(with_options, &props)?;
2308 } else {
2309 panic!("definition is not a create iceberg table statement")
2310 }
2311 let mut new_config = sink.properties.clone().into_inner();
2312 new_config.extend(props.clone());
2313
2314 let definition = stmt.to_string();
2315 let active_sink = sink::ActiveModel {
2316 sink_id: Set(sink_id),
2317 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2318 definition: Set(definition.clone()),
2319 ..Default::default()
2320 };
2321 let active_source = source::ActiveModel {
2322 source_id: Set(source_id),
2323 definition: Set(definition.clone()),
2324 ..Default::default()
2325 };
2326 let active_table = table::ActiveModel {
2327 table_id: Set(table_id),
2328 definition: Set(definition),
2329 ..Default::default()
2330 };
2331 active_sink.update(&txn).await?;
2332 active_source.update(&txn).await?;
2333 active_table.update(&txn).await?;
2334
2335 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2336
2337 let (sink, sink_obj) = Sink::find_by_id(sink_id)
2338 .find_also_related(Object)
2339 .one(&txn)
2340 .await?
2341 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2342 let (source, source_obj) = Source::find_by_id(source_id)
2343 .find_also_related(Object)
2344 .one(&txn)
2345 .await?
2346 .ok_or_else(|| {
2347 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2348 })?;
2349 let (table, table_obj) = Table::find_by_id(table_id)
2350 .find_also_related(Object)
2351 .one(&txn)
2352 .await?
2353 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2354 txn.commit().await?;
2355 let relation_infos = vec![
2356 PbObject {
2357 object_info: Some(PbObjectInfo::Sink(
2358 ObjectModel(sink, sink_obj.unwrap()).into(),
2359 )),
2360 },
2361 PbObject {
2362 object_info: Some(PbObjectInfo::Source(
2363 ObjectModel(source, source_obj.unwrap()).into(),
2364 )),
2365 },
2366 PbObject {
2367 object_info: Some(PbObjectInfo::Table(
2368 ObjectModel(table, table_obj.unwrap()).into(),
2369 )),
2370 },
2371 ];
2372 let _version = self
2373 .notify_frontend(
2374 NotificationOperation::Update,
2375 NotificationInfo::ObjectGroup(PbObjectGroup {
2376 objects: relation_infos,
2377 }),
2378 )
2379 .await;
2380
2381 Ok((props.into_iter().collect(), sink_id))
2382 }
2383
    /// Alters the properties of the connection `connection_id` and applies the same change to
    /// every source and sink whose `connection_id` column references it, in one transaction.
    ///
    /// Steps, in order:
    /// 1. collect the dependent source/sink ids and load the connection;
    /// 2. check the altered keys with `check_connection_allow_alter_on_fly_fields`;
    /// 3. merge the change into the connection's options (`handle_update`) and run
    ///    `validate_connection` on the result;
    /// 4. adjust the connection's secret dependencies and persist its new params;
    /// 5. rewrite each dependent source (catalog row, then source fragments);
    /// 6. rewrite each dependent sink (catalog row, then sink fragments);
    /// 7. reload all touched objects, commit, and notify the frontend.
    ///
    /// Returns the connection's updated options together with, for every dependent source and
    /// sink, its id and resulting properties.
    ///
    /// # Errors
    ///
    /// Returns an error if the connection cannot be found, if any check or validation step
    /// fails, or on a database error.
    ///
    /// # Panics
    ///
    /// Panics if a reloaded row has no related object row, and through the assertion inside
    /// `update_connector_props_fragments`.
    pub async fn update_connection_and_dependent_objects_props(
        &self,
        connection_id: ConnectionId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> MetaResult<(
        WithOptionsSecResolved,                   // the connection's options after the update
        Vec<(SourceId, HashMap<String, String>)>, // dependent source id -> properties with secrets filled in
        Vec<(SinkId, HashMap<String, String>)>,   // dependent sink id -> merged plaintext properties
    )> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Ids of the sources whose `connection_id` column points at this connection.
        let dependent_sources: Vec<SourceId> = Source::find()
            .select_only()
            .column(source::Column::SourceId)
            .filter(source::Column::ConnectionId.eq(connection_id))
            .into_tuple()
            .all(&txn)
            .await?;

        // Ids of the sinks whose `connection_id` column points at this connection.
        let dependent_sinks: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::ConnectionId.eq(connection_id))
            .into_tuple()
            .all(&txn)
            .await?;

        let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
            })?;

        // Every key being altered, whether it is given as plaintext or as a secret reference.
        let prop_keys: Vec<String> = alter_props
            .keys()
            .chain(alter_secret_refs.keys())
            .cloned()
            .collect();

        let connection_type_str = pb_connection_type_to_connection_type(
            &connection_catalog.params.to_protobuf().connection_type(),
        )
        .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;

        risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
            connection_type_str, &prop_keys,
        )?;

        // Start from the stored options and merge the requested change into them.
        let connection_pb = connection_catalog.params.to_protobuf();
        let mut connection_options_with_secret = WithOptionsSecResolved::new(
            connection_pb.properties.into_iter().collect(),
            connection_pb.secret_refs.into_iter().collect(),
        );

        let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
            .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

        tracing::debug!(
            "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
            connection_id,
            dependent_sources,
            dependent_sinks
        );

        // Validate the would-be connection before anything is written to the database.
        {
            let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
                connection_type: connection_pb.connection_type,
                properties: connection_options_with_secret
                    .as_plaintext()
                    .clone()
                    .into_iter()
                    .collect(),
                secret_refs: connection_options_with_secret
                    .as_secret()
                    .clone()
                    .into_iter()
                    .collect(),
            };
            let connection = PbConnection {
                id: connection_id as _,
                info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
                    conn_params_pb,
                )),
                ..Default::default()
            };
            validate_connection(&connection).await?;
        }

        // Record the secrets the connection newly depends on ...
        if !to_add_secret_dep.is_empty() {
            ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                object_dependency::ActiveModel {
                    oid: Set(secret_id.into()),
                    used_by: Set(connection_id.as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }
        // ... and drop the dependencies `handle_update` reported for removal.
        if !to_remove_secret_dep.is_empty() {
            let _ = ObjectDependency::delete_many()
                .filter(
                    object_dependency::Column::Oid
                        .is_in(to_remove_secret_dep)
                        .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
                )
                .exec(&txn)
                .await?;
        }

        // Persist the merged options on the connection row (same content as validated above).
        let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
            connection_type: connection_pb.connection_type,
            properties: connection_options_with_secret
                .as_plaintext()
                .clone()
                .into_iter()
                .collect(),
            secret_refs: connection_options_with_secret
                .as_secret()
                .clone()
                .into_iter()
                .collect(),
        };
        let active_connection_model = connection::ActiveModel {
            connection_id: Set(connection_id),
            params: Set(ConnectionParams::from(&updated_connection_params)),
            ..Default::default()
        };
        active_connection_model.update(&txn).await?;

        // Returned to the caller: (source id, properties with secrets filled in).
        let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();

        if !dependent_sources.is_empty() {
            let sources_with_objs = Source::find()
                .find_also_related(Object)
                .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
                .all(&txn)
                .await?;

            // Collect all row and fragment updates first; they are executed after every
            // source has been processed without error.
            let mut source_updates = Vec::new();
            let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();

            for (source, _obj) in sources_with_objs {
                let source_id = source.source_id;

                let mut source_options_with_secret = WithOptionsSecResolved::new(
                    source.with_properties.0.clone(),
                    source
                        .secret_ref
                        .clone()
                        .map(|secret_ref| secret_ref.to_protobuf())
                        .unwrap_or_default(),
                );
                // NOTE(review): the secret-dependency deltas returned for the source are
                // ignored; only the connection's dependencies are adjusted above.
                let (_source_to_add_secret_dep, _source_to_remove_secret_dep) =
                    source_options_with_secret
                        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

                // The extracted value is discarded; the call is made only so that a failure
                // aborts the update.
                let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;

                let active_source = source::ActiveModel {
                    source_id: Set(source_id),
                    with_properties: Set(Property(
                        source_options_with_secret.as_plaintext().clone(),
                    )),
                    // `None` when no secret reference remains after the merge.
                    secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
                        || {
                            risingwave_meta_model::SecretRef::from(
                                source_options_with_secret.as_secret().clone(),
                            )
                        },
                    )),
                    ..Default::default()
                };
                source_updates.push(active_source);

                let is_shared_source = source.is_shared();
                // For a non-shared source, also collect the `used_by` side of every dependency
                // on it; those ids are used as additional job ids below.
                let mut dep_source_job_ids: Vec<JobId> = Vec::new();
                if !is_shared_source {
                    dep_source_job_ids = ObjectDependency::find()
                        .select_only()
                        .column(object_dependency::Column::UsedBy)
                        .filter(object_dependency::Column::Oid.eq(source_id))
                        .into_tuple()
                        .all(&txn)
                        .await?;
                }

                // The associated table's job id if the source has one, otherwise the job id
                // derived from the source id itself.
                let base_job_id =
                    if let Some(associate_table_id) = source.optional_associated_table_id {
                        associate_table_id.as_job_id()
                    } else {
                        source_id.as_share_source_job_id()
                    };
                let job_ids = vec![base_job_id]
                    .into_iter()
                    .chain(dep_source_job_ids.into_iter())
                    .collect_vec();

                fragment_updates.push(DependentSourceFragmentUpdate {
                    job_ids,
                    with_properties: source_options_with_secret.as_plaintext().clone(),
                    secret_refs: source_options_with_secret.as_secret().clone(),
                    is_shared_source,
                });

                // Properties handed back to the caller have their secrets filled in.
                let complete_source_props = LocalSecretManager::global()
                    .fill_secrets(
                        source_options_with_secret.as_plaintext().clone(),
                        source_options_with_secret.as_secret().clone(),
                    )
                    .map_err(MetaError::from)?
                    .into_iter()
                    .collect::<HashMap<String, String>>();
                updated_sources_with_props.push((source_id, complete_source_props));
            }

            for source_update in source_updates {
                source_update.update(&txn).await?;
            }

            // Rewrite `with_properties` / `secret_refs` on the `Source` nodes of the
            // collected jobs. `is_shared_source` feeds the helper's "at least one fragment"
            // assertion.
            for DependentSourceFragmentUpdate {
                job_ids,
                with_properties,
                secret_refs,
                is_shared_source,
            } in fragment_updates
            {
                update_connector_props_fragments(
                    &txn,
                    job_ids,
                    FragmentTypeFlag::Source,
                    |node, found| {
                        if let PbNodeBody::Source(node) = node
                            && let Some(source_inner) = &mut node.source_inner
                        {
                            source_inner.with_properties = with_properties.clone();
                            source_inner.secret_refs = secret_refs.clone();
                            *found = true;
                        }
                    },
                    is_shared_source,
                )
                .await?;
            }
        }

        // Returned to the caller: (sink id, merged plaintext properties).
        let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();

        if !dependent_sinks.is_empty() {
            let sinks_with_objs = Sink::find()
                .find_also_related(Object)
                .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
                .all(&txn)
                .await?;

            // As for sources: collect first, execute once every sink passed validation.
            let mut sink_updates = Vec::new();
            let mut sink_fragment_updates = Vec::new();

            for (sink, _obj) in sinks_with_objs {
                let sink_id = sink.sink_id;

                // Validate the change against the sink's connector type.
                match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
                    Some(connector) => {
                        let connector_type = connector.to_lowercase();
                        check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
                            .map_err(|e| SinkError::Config(anyhow!(e)))?;

                        match_sink_name_str!(
                            connector_type.as_str(),
                            SinkType,
                            {
                                let mut new_sink_props = sink.properties.0.clone();
                                new_sink_props.extend(alter_props.clone());
                                SinkType::validate_alter_config(&new_sink_props)
                            },
                            |sink: &str| Err(SinkError::Config(anyhow!(
                                "unsupported sink type {}",
                                sink
                            )))
                        )?
                    }
                    None => {
                        return Err(SinkError::Config(anyhow!(
                            "connector not specified when alter sink"
                        ))
                        .into());
                    }
                };

                // NOTE(review): only `alter_props` is merged into the sink's properties;
                // `alter_secret_refs` is not applied to sink rows here.
                let mut new_sink_props = sink.properties.0.clone();
                new_sink_props.extend(alter_props.clone());

                let active_sink = sink::ActiveModel {
                    sink_id: Set(sink_id),
                    properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
                    ..Default::default()
                };
                sink_updates.push(active_sink);

                sink_fragment_updates.push((sink_id, new_sink_props.clone()));

                let complete_sink_props: HashMap<String, String> =
                    new_sink_props.into_iter().collect();
                updated_sinks_with_props.push((sink_id, complete_sink_props));
            }

            for sink_update in sink_updates {
                sink_update.update(&txn).await?;
            }

            // Replace the properties on the matching `Sink` node of each sink job. Passing
            // `true` makes the helper always assert that a fragment was rewritten.
            for (sink_id, new_sink_props) in sink_fragment_updates {
                update_connector_props_fragments(
                    &txn,
                    vec![sink_id.as_job_id()],
                    FragmentTypeFlag::Sink,
                    |node, found| {
                        if let PbNodeBody::Sink(node) = node
                            && let Some(sink_desc) = &mut node.sink_desc
                            && sink_desc.id == sink_id.as_raw_id()
                        {
                            sink_desc.properties = new_sink_props.clone();
                            *found = true;
                        }
                    },
                    true,
                )
                .await?;
            }
        }

        // Reload everything that was touched so the notification carries the new state.
        let mut updated_objects = Vec::new();

        let (connection, obj) = Connection::find_by_id(connection_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
            })?;
        updated_objects.push(PbObject {
            object_info: Some(PbObjectInfo::Connection(
                ObjectModel(connection, obj.unwrap()).into(),
            )),
        });

        for source_id in &dependent_sources {
            let (source, obj) = Source::find_by_id(*source_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
                })?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, obj.unwrap()).into(),
                )),
            });
        }

        for sink_id in &dependent_sinks {
            let (sink, obj) = Sink::find_by_id(*sink_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
                })?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
            });
        }

        txn.commit().await?;

        // The connection object is always pushed above, so this list is never empty here.
        if !updated_objects.is_empty() {
            self.notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: updated_objects,
                }),
            )
            .await;
        }

        Ok((
            connection_options_with_secret,
            updated_sources_with_props,
            updated_sinks_with_props,
        ))
    }
2812
2813 pub async fn update_fragment_rate_limit_by_fragment_id(
2814 &self,
2815 fragment_id: FragmentId,
2816 rate_limit: Option<u32>,
2817 ) -> MetaResult<()> {
2818 let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2819 stream_node: &mut PbStreamNode| {
2820 let mut found = false;
2821 if fragment_type_mask.contains_any(
2822 FragmentTypeFlag::dml_rate_limit_fragments()
2823 .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2824 .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2825 .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2826 ) {
2827 visit_stream_node_mut(stream_node, |node| {
2828 if let PbNodeBody::Dml(node) = node {
2829 node.rate_limit = rate_limit;
2830 found = true;
2831 }
2832 if let PbNodeBody::Sink(node) = node {
2833 node.rate_limit = rate_limit;
2834 found = true;
2835 }
2836 if let PbNodeBody::StreamCdcScan(node) = node {
2837 node.rate_limit = rate_limit;
2838 found = true;
2839 }
2840 if let PbNodeBody::StreamScan(node) = node {
2841 node.rate_limit = rate_limit;
2842 found = true;
2843 }
2844 if let PbNodeBody::SourceBackfill(node) = node {
2845 node.rate_limit = rate_limit;
2846 found = true;
2847 }
2848 });
2849 }
2850 found
2851 };
2852 self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2853 .await
2854 }
2855
2856 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2859 let inner = self.inner.read().await;
2860 let txn = inner.db.begin().await?;
2861
2862 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2863 .select_only()
2864 .columns([
2865 fragment::Column::FragmentId,
2866 fragment::Column::JobId,
2867 fragment::Column::FragmentTypeMask,
2868 fragment::Column::StreamNode,
2869 ])
2870 .filter(FragmentTypeMask::intersects_any(
2871 FragmentTypeFlag::rate_limit_fragments(),
2872 ))
2873 .into_tuple()
2874 .all(&txn)
2875 .await?;
2876
2877 let mut rate_limits = Vec::new();
2878 for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2879 let stream_node = stream_node.to_protobuf();
2880 visit_stream_node_body(&stream_node, |node| {
2881 let mut rate_limit = None;
2882 let mut node_name = None;
2883
2884 match node {
2885 PbNodeBody::Source(node) => {
2887 if let Some(node_inner) = &node.source_inner {
2888 rate_limit = node_inner.rate_limit;
2889 node_name = Some("SOURCE");
2890 }
2891 }
2892 PbNodeBody::StreamFsFetch(node) => {
2893 if let Some(node_inner) = &node.node_inner {
2894 rate_limit = node_inner.rate_limit;
2895 node_name = Some("FS_FETCH");
2896 }
2897 }
2898 PbNodeBody::SourceBackfill(node) => {
2900 rate_limit = node.rate_limit;
2901 node_name = Some("SOURCE_BACKFILL");
2902 }
2903 PbNodeBody::StreamScan(node) => {
2904 rate_limit = node.rate_limit;
2905 node_name = Some("STREAM_SCAN");
2906 }
2907 PbNodeBody::StreamCdcScan(node) => {
2908 rate_limit = node.rate_limit;
2909 node_name = Some("STREAM_CDC_SCAN");
2910 }
2911 PbNodeBody::Sink(node) => {
2912 rate_limit = node.rate_limit;
2913 node_name = Some("SINK");
2914 }
2915 _ => {}
2916 }
2917
2918 if let Some(rate_limit) = rate_limit {
2919 rate_limits.push(RateLimitInfo {
2920 fragment_id,
2921 job_id,
2922 fragment_type_mask: fragment_type_mask as u32,
2923 rate_limit,
2924 node_name: node_name.unwrap().to_owned(),
2925 });
2926 }
2927 });
2928 }
2929
2930 Ok(rate_limits)
2931 }
2932}
2933
2934fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
2935 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2937 Some(connector) => {
2938 let connector_type = connector.to_lowercase();
2939 let field_names: Vec<String> = props.keys().cloned().collect();
2940 check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2941 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2942
2943 match_sink_name_str!(
2944 connector_type.as_str(),
2945 SinkType,
2946 {
2947 let mut new_props = sink.properties.0.clone();
2948 new_props.extend(props.clone());
2949 SinkType::validate_alter_config(&new_props)
2950 },
2951 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
2952 )?
2953 }
2954 None => {
2955 return Err(
2956 SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
2957 );
2958 }
2959 };
2960 Ok(())
2961}
2962
2963fn update_stmt_with_props(
2964 with_properties: &mut Vec<SqlOption>,
2965 props: &BTreeMap<String, String>,
2966) -> MetaResult<()> {
2967 let mut new_sql_options = with_properties
2968 .iter()
2969 .map(|sql_option| (&sql_option.name, sql_option))
2970 .collect::<IndexMap<_, _>>();
2971 let add_sql_options = props
2972 .iter()
2973 .map(|(k, v)| SqlOption::try_from((k, v)))
2974 .collect::<Result<Vec<SqlOption>, ParserError>>()
2975 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2976 new_sql_options.extend(
2977 add_sql_options
2978 .iter()
2979 .map(|sql_option| (&sql_option.name, sql_option)),
2980 );
2981 *with_properties = new_sql_options.into_values().cloned().collect();
2982 Ok(())
2983}
2984
2985async fn update_sink_fragment_props(
2986 txn: &DatabaseTransaction,
2987 sink_id: SinkId,
2988 props: BTreeMap<String, String>,
2989) -> MetaResult<()> {
2990 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2991 .select_only()
2992 .columns([
2993 fragment::Column::FragmentId,
2994 fragment::Column::FragmentTypeMask,
2995 fragment::Column::StreamNode,
2996 ])
2997 .filter(fragment::Column::JobId.eq(sink_id))
2998 .into_tuple()
2999 .all(txn)
3000 .await?;
3001 let fragments = fragments
3002 .into_iter()
3003 .filter(|(_, fragment_type_mask, _)| {
3004 *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
3005 })
3006 .filter_map(|(id, _, stream_node)| {
3007 let mut stream_node = stream_node.to_protobuf();
3008 let mut found = false;
3009 visit_stream_node_mut(&mut stream_node, |node| {
3010 if let PbNodeBody::Sink(node) = node
3011 && let Some(sink_desc) = &mut node.sink_desc
3012 && sink_desc.id == sink_id
3013 {
3014 sink_desc.properties.extend(props.clone());
3015 found = true;
3016 }
3017 });
3018 if found { Some((id, stream_node)) } else { None }
3019 })
3020 .collect_vec();
3021 assert!(
3022 !fragments.is_empty(),
3023 "sink id should be used by at least one fragment"
3024 );
3025 for (id, stream_node) in fragments {
3026 fragment::ActiveModel {
3027 fragment_id: Set(id),
3028 stream_node: Set(StreamNode::from(&stream_node)),
3029 ..Default::default()
3030 }
3031 .update(txn)
3032 .await?;
3033 }
3034 Ok(())
3035}
3036
/// Context object carrying a list of sink ids.
///
/// NOTE(review): the name suggests it accompanies sink-into-table operations; confirm the
/// exact use against the callers.
pub struct SinkIntoTableContext {
    /// Ids of sinks; presumably those whose catalogs were updated (TODO confirm).
    pub updated_sink_catalogs: Vec<SinkId>,
}
3042
/// Data bundle describing one sink involved in an auto-refresh-schema operation.
///
/// NOTE(review): the field meanings below are inferred from their names and types only;
/// confirm against the code that consumes this context.
pub struct FinishAutoRefreshSchemaSinkContext {
    /// Id of the temporary sink.
    pub tmp_sink_id: SinkId,
    /// Id of the original sink.
    pub original_sink_id: SinkId,
    /// Column catalogs; presumably the sink's new columns (TODO confirm).
    pub columns: Vec<PbColumnCatalog>,
    /// Optional log store table id with its column catalogs; `None` presumably means there is
    /// no new log store table (TODO confirm).
    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
}
3049
3050async fn update_connector_props_fragments<F>(
3051 txn: &DatabaseTransaction,
3052 job_ids: Vec<JobId>,
3053 expect_flag: FragmentTypeFlag,
3054 mut alter_stream_node_fn: F,
3055 is_shared_source: bool,
3056) -> MetaResult<()>
3057where
3058 F: FnMut(&mut PbNodeBody, &mut bool),
3059{
3060 let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
3061 .select_only()
3062 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
3063 .filter(
3064 fragment::Column::JobId
3065 .is_in(job_ids.clone())
3066 .and(FragmentTypeMask::intersects(expect_flag)),
3067 )
3068 .into_tuple()
3069 .all(txn)
3070 .await?;
3071 let fragments = fragments
3072 .into_iter()
3073 .filter_map(|(id, stream_node)| {
3074 let mut stream_node = stream_node.to_protobuf();
3075 let mut found = false;
3076 visit_stream_node_mut(&mut stream_node, |node| {
3077 alter_stream_node_fn(node, &mut found);
3078 });
3079 if found { Some((id, stream_node)) } else { None }
3080 })
3081 .collect_vec();
3082 if is_shared_source || job_ids.len() > 1 {
3083 assert!(
3087 !fragments.is_empty(),
3088 "job ids {:?} (type: {:?}) should be used by at least one fragment",
3089 job_ids,
3090 expect_flag
3091 );
3092 }
3093
3094 for (id, stream_node) in fragments {
3095 fragment::ActiveModel {
3096 fragment_id: Set(id),
3097 stream_node: Set(StreamNode::from(&stream_node)),
3098 ..Default::default()
3099 }
3100 .update(txn)
3101 .await?;
3102 }
3103
3104 Ok(())
3105}