1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
22use risingwave_common::config::DefaultParallelism;
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::JobId;
25use risingwave_common::util::iter_util::ZipEqDebug;
26use risingwave_common::util::stream_graph_visitor::{
27 visit_stream_node_body, visit_stream_node_mut,
28};
29use risingwave_common::{bail, current_cluster_version};
30use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
31use risingwave_connector::error::ConnectorError;
32use risingwave_connector::sink::file_sink::fs::FsSink;
33use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
34use risingwave_connector::source::ConnectorProperties;
35use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
36use risingwave_meta_model::object::ObjectType;
37use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
38use risingwave_meta_model::table::TableType;
39use risingwave_meta_model::user_privilege::Action;
40use risingwave_meta_model::*;
41use risingwave_pb::catalog::{PbCreateType, PbTable};
42use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
43use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
44use risingwave_pb::meta::object::PbObjectInfo;
45use risingwave_pb::meta::subscribe_response::{
46 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
47};
48use risingwave_pb::meta::{PbObject, PbObjectGroup};
49use risingwave_pb::plan_common::PbColumnCatalog;
50use risingwave_pb::secret::PbSecretRef;
51use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
52use risingwave_pb::stream_plan::stream_node::PbNodeBody;
53use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
54use risingwave_pb::user::PbUserInfo;
55use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
56use risingwave_sqlparser::parser::{Parser, ParserError};
57use sea_orm::ActiveValue::Set;
58use sea_orm::sea_query::{Expr, Query, SimpleExpr};
59use sea_orm::{
60 ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
61 ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
62};
63use thiserror_ext::AsReport;
64
65use super::rename::IndexItemRewriter;
66use crate::barrier::{Command, SharedFragmentInfo};
67use crate::controller::ObjectModel;
68use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
69use crate::controller::fragment::FragmentTypeMaskExt;
70use crate::controller::utils::{
71 PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
72 check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
73 ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
74 get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
75 list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
76};
77use crate::error::MetaErrorInner;
78use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
79use crate::model::{
80 FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
81 StreamJobFragmentsToCreate,
82};
83use crate::stream::SplitAssignment;
84use crate::{MetaError, MetaResult};
85
86impl CatalogController {
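    /// Create the `object` row and the corresponding `streaming_job` row for a new
    /// streaming job inside `txn`, with the job status initialized to `Initial`.
    /// Returns the allocated [`JobId`].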
87 pub async fn create_streaming_job_obj(
88 txn: &DatabaseTransaction,
89 obj_type: ObjectType,
90 owner_id: UserId,
91 database_id: Option<DatabaseId>,
92 schema_id: Option<SchemaId>,
93 create_type: PbCreateType,
94 timezone: Option<String>,
95 streaming_parallelism: StreamingParallelism,
96 max_parallelism: usize,
        specific_resource_group: Option<String>,
    ) -> MetaResult<JobId> {
99 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
100 let job_id = obj.oid.as_job_id();
101 let job = streaming_job::ActiveModel {
102 job_id: Set(job_id),
103 job_status: Set(JobStatus::Initial),
104 create_type: Set(create_type.into()),
105 timezone: Set(timezone),
106 parallelism: Set(streaming_parallelism),
107 max_parallelism: Set(max_parallelism as _),
108 specific_resource_group: Set(specific_resource_group),
109 };
110 job.insert(txn).await?;
111
112 Ok(job_id)
113 }
114
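    /// Create the catalog entries for a new streaming job in a single transaction:
    /// resolve the streaming parallelism (explicit value or the configured default),
    /// validate the owner, database, schema and relation name, reject the creation if
    /// any dependent table is still being altered, then insert the job object together
    /// with its type-specific catalog row and all object dependencies (including
    /// secrets and connections).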
115 #[await_tree::instrument]
121 pub async fn create_job_catalog(
122 &self,
123 streaming_job: &mut StreamingJob,
124 ctx: &StreamContext,
125 parallelism: &Option<Parallelism>,
126 max_parallelism: usize,
127 mut dependencies: HashSet<ObjectId>,
128 specific_resource_group: Option<String>,
129 ) -> MetaResult<()> {
130 let inner = self.inner.write().await;
131 let txn = inner.db.begin().await?;
132 let create_type = streaming_job.create_type();
133
134 let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
135 (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
136 (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
137 (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
138 };
139
140 ensure_user_id(streaming_job.owner() as _, &txn).await?;
141 ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
142 ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
143 check_relation_name_duplicate(
144 &streaming_job.name(),
145 streaming_job.database_id(),
146 streaming_job.schema_id(),
147 &txn,
148 )
149 .await?;
150
151 if !dependencies.is_empty() {
153 let altering_cnt = ObjectDependency::find()
154 .join(
155 JoinType::InnerJoin,
156 object_dependency::Relation::Object1.def(),
157 )
158 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
159 .filter(
160 object_dependency::Column::Oid
161 .is_in(dependencies.clone())
162 .and(object::Column::ObjType.eq(ObjectType::Table))
163 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
164 .and(
165 object::Column::Oid.not_in_subquery(
167 Query::select()
168 .column(table::Column::TableId)
169 .from(Table)
170 .to_owned(),
171 ),
172 ),
173 )
174 .count(&txn)
175 .await?;
176 if altering_cnt != 0 {
177 return Err(MetaError::permission_denied(
178 "some dependent relations are being altered",
179 ));
180 }
181 }
182
183 match streaming_job {
184 StreamingJob::MaterializedView(table) => {
185 let job_id = Self::create_streaming_job_obj(
186 &txn,
187 ObjectType::Table,
188 table.owner as _,
189 Some(table.database_id),
190 Some(table.schema_id),
191 create_type,
192 ctx.timezone.clone(),
193 streaming_parallelism,
194 max_parallelism,
195 specific_resource_group,
196 )
197 .await?;
198 table.id = job_id.as_mv_table_id();
199 let table_model: table::ActiveModel = table.clone().into();
200 Table::insert(table_model).exec(&txn).await?;
201 }
202 StreamingJob::Sink(sink) => {
203 if let Some(target_table_id) = sink.target_table
204 && check_sink_into_table_cycle(
205 target_table_id.into(),
206 dependencies.iter().cloned().collect(),
207 &txn,
208 )
209 .await?
210 {
211 bail!("Creating such a sink will result in circular dependency.");
212 }
213
214 let job_id = Self::create_streaming_job_obj(
215 &txn,
216 ObjectType::Sink,
217 sink.owner as _,
218 Some(sink.database_id),
219 Some(sink.schema_id),
220 create_type,
221 ctx.timezone.clone(),
222 streaming_parallelism,
223 max_parallelism,
224 specific_resource_group,
225 )
226 .await?;
227 sink.id = job_id.as_sink_id();
228 let sink_model: sink::ActiveModel = sink.clone().into();
229 Sink::insert(sink_model).exec(&txn).await?;
230 }
231 StreamingJob::Table(src, table, _) => {
232 let job_id = Self::create_streaming_job_obj(
233 &txn,
234 ObjectType::Table,
235 table.owner as _,
236 Some(table.database_id),
237 Some(table.schema_id),
238 create_type,
239 ctx.timezone.clone(),
240 streaming_parallelism,
241 max_parallelism,
242 specific_resource_group,
243 )
244 .await?;
245 table.id = job_id.as_mv_table_id();
246 if let Some(src) = src {
247 let src_obj = Self::create_object(
248 &txn,
249 ObjectType::Source,
250 src.owner as _,
251 Some(src.database_id),
252 Some(src.schema_id),
253 )
254 .await?;
255 src.id = src_obj.oid.as_source_id();
256 src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
257 table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
258 let source: source::ActiveModel = src.clone().into();
259 Source::insert(source).exec(&txn).await?;
260 }
261 let table_model: table::ActiveModel = table.clone().into();
262 Table::insert(table_model).exec(&txn).await?;
263 }
264 StreamingJob::Index(index, table) => {
265 ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
266 let job_id = Self::create_streaming_job_obj(
267 &txn,
268 ObjectType::Index,
269 index.owner as _,
270 Some(index.database_id),
271 Some(index.schema_id),
272 create_type,
273 ctx.timezone.clone(),
274 streaming_parallelism,
275 max_parallelism,
276 specific_resource_group,
277 )
278 .await?;
279 index.id = job_id.as_index_id();
281 index.index_table_id = job_id.as_mv_table_id();
282 table.id = job_id.as_mv_table_id();
283
284 ObjectDependency::insert(object_dependency::ActiveModel {
285 oid: Set(index.primary_table_id.into()),
286 used_by: Set(table.id.into()),
287 ..Default::default()
288 })
289 .exec(&txn)
290 .await?;
291
292 let table_model: table::ActiveModel = table.clone().into();
293 Table::insert(table_model).exec(&txn).await?;
294 let index_model: index::ActiveModel = index.clone().into();
295 Index::insert(index_model).exec(&txn).await?;
296 }
297 StreamingJob::Source(src) => {
298 let job_id = Self::create_streaming_job_obj(
299 &txn,
300 ObjectType::Source,
301 src.owner as _,
302 Some(src.database_id),
303 Some(src.schema_id),
304 create_type,
305 ctx.timezone.clone(),
306 streaming_parallelism,
307 max_parallelism,
308 specific_resource_group,
309 )
310 .await?;
311 src.id = job_id.as_shared_source_id();
312 let source_model: source::ActiveModel = src.clone().into();
313 Source::insert(source_model).exec(&txn).await?;
314 }
315 }
316
317 dependencies.extend(
319 streaming_job
320 .dependent_secret_ids()?
321 .into_iter()
322 .map(|id| id.as_object_id()),
323 );
324 dependencies.extend(
326 streaming_job
327 .dependent_connection_ids()?
328 .into_iter()
329 .map(|id| id.as_object_id()),
330 );
331
332 if !dependencies.is_empty() {
334 ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
335 object_dependency::ActiveModel {
336 oid: Set(oid),
337 used_by: Set(streaming_job.id().as_object_id()),
338 ..Default::default()
339 }
340 }))
341 .exec(&txn)
342 .await?;
343 }
344
345 txn.commit().await?;
346
347 Ok(())
348 }
349
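    /// Allocate catalog ids for the internal (state) tables of a creating streaming job
    /// and persist them with `belongs_to_job_id` set and `fragment_id` left unset.
    /// Returns a mapping from the placeholder table ids to the newly allocated ones.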
350 pub async fn create_internal_table_catalog(
358 &self,
359 job: &StreamingJob,
360 mut incomplete_internal_tables: Vec<PbTable>,
361 ) -> MetaResult<HashMap<TableId, TableId>> {
362 let job_id = job.id();
363 let inner = self.inner.write().await;
364 let txn = inner.db.begin().await?;
365
366 ensure_job_not_canceled(job_id, &txn).await?;
368
369 let mut table_id_map = HashMap::new();
370 for table in &mut incomplete_internal_tables {
371 let table_id = Self::create_object(
372 &txn,
373 ObjectType::Table,
374 table.owner as _,
375 Some(table.database_id),
376 Some(table.schema_id),
377 )
378 .await?
379 .oid
380 .as_table_id();
381 table_id_map.insert(table.id, table_id);
382 table.id = table_id;
383 table.job_id = Some(job_id);
384
385 let table_model = table::ActiveModel {
386 table_id: Set(table_id),
387 belongs_to_job_id: Set(Some(job_id)),
388 fragment_id: NotSet,
389 ..table.clone().into()
390 };
391 Table::insert(table_model).exec(&txn).await?;
392 }
393 txn.commit().await?;
394
395 Ok(table_id_map)
396 }
397
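    /// Convenience wrapper around [`Self::prepare_streaming_job`] for a freshly built
    /// [`StreamJobFragmentsToCreate`].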
398 pub async fn prepare_stream_job_fragments(
399 &self,
400 stream_job_fragments: &StreamJobFragmentsToCreate,
401 streaming_job: &StreamingJob,
402 for_replace: bool,
403 ) -> MetaResult<()> {
404 self.prepare_streaming_job(
405 stream_job_fragments.stream_job_id(),
406 || stream_job_fragments.fragments.values(),
407 &stream_job_fragments.downstreams,
408 for_replace,
409 Some(streaming_job),
410 )
411 .await
412 }
413
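    /// Insert the fragment rows of a streaming job. Unless this is a replacement job,
    /// also bind every state table to its fragment id and vnode count, update the DML
    /// fragment id of a table job, and notify the frontend about the catalog objects of
    /// a creating job that should already be visible before the job finishes.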
    #[await_tree::instrument(
        "prepare_streaming_job_for_{}",
        if for_replace { "replace" } else { "create" }
    )]
420 pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
421 &self,
422 job_id: JobId,
423 get_fragments: impl Fn() -> I + 'a,
424 downstreams: &FragmentDownstreamRelation,
425 for_replace: bool,
426 creating_streaming_job: Option<&'a StreamingJob>,
427 ) -> MetaResult<()> {
428 let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;
429
430 let inner = self.inner.write().await;
431
432 let need_notify = creating_streaming_job
433 .map(|job| job.should_notify_creating())
434 .unwrap_or(false);
435 let definition = creating_streaming_job.map(|job| job.definition());
436
437 let mut objects_to_notify = vec![];
438 let txn = inner.db.begin().await?;
439
440 ensure_job_not_canceled(job_id, &txn).await?;
442
443 for fragment in fragments {
444 let fragment_id = fragment.fragment_id;
445 let state_table_ids = fragment.state_table_ids.inner_ref().clone();
446
447 let fragment = fragment.into_active_model();
448 Fragment::insert(fragment).exec(&txn).await?;
449
450 if !for_replace {
453 let all_tables = StreamJobFragments::collect_tables(get_fragments());
454 for state_table_id in state_table_ids {
455 let table = all_tables
459 .get(&state_table_id)
460 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
461 assert_eq!(table.id, state_table_id);
462 assert_eq!(table.fragment_id, fragment_id);
463 let vnode_count = table.vnode_count();
464
465 table::ActiveModel {
466 table_id: Set(state_table_id as _),
467 fragment_id: Set(Some(fragment_id)),
468 vnode_count: Set(vnode_count as _),
469 ..Default::default()
470 }
471 .update(&txn)
472 .await?;
473
474 if need_notify {
475 let mut table = table.clone();
476 if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
478 table.definition = definition.clone().unwrap();
479 }
480 objects_to_notify.push(PbObject {
481 object_info: Some(PbObjectInfo::Table(table)),
482 });
483 }
484 }
485 }
486 }
487
488 if need_notify {
490 match creating_streaming_job.unwrap() {
491 StreamingJob::Table(Some(source), ..) => {
492 objects_to_notify.push(PbObject {
493 object_info: Some(PbObjectInfo::Source(source.clone())),
494 });
495 }
496 StreamingJob::Sink(sink) => {
497 objects_to_notify.push(PbObject {
498 object_info: Some(PbObjectInfo::Sink(sink.clone())),
499 });
500 }
501 StreamingJob::Index(index, _) => {
502 objects_to_notify.push(PbObject {
503 object_info: Some(PbObjectInfo::Index(index.clone())),
504 });
505 }
506 _ => {}
507 }
508 }
509
510 insert_fragment_relations(&txn, downstreams).await?;
511
512 if !for_replace {
513 if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
515 Table::update(table::ActiveModel {
516 table_id: Set(table.id),
517 dml_fragment_id: Set(table.dml_fragment_id),
518 ..Default::default()
519 })
520 .exec(&txn)
521 .await?;
522 }
523 }
524
525 txn.commit().await?;
526
527 if !objects_to_notify.is_empty() {
531 self.notify_frontend(
532 Operation::Add,
533 Info::ObjectGroup(PbObjectGroup {
534 objects: objects_to_notify,
535 }),
536 )
537 .await;
538 }
539
540 Ok(())
541 }
542
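    /// Build the [`Command::DropStreamingJobs`] barrier command used to cancel a creating
    /// streaming job, including the sink-fragment-to-target-fragment mapping when the job
    /// sinks into a table.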
543 pub async fn build_cancel_command(
546 &self,
547 table_fragments: &StreamJobFragments,
548 ) -> MetaResult<Command> {
549 let inner = self.inner.read().await;
550 let txn = inner.db.begin().await?;
551
552 let dropped_sink_fragment_with_target =
553 if let Some(sink_fragment) = table_fragments.sink_fragment() {
554 let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
555 let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
556 sink_target_fragment
557 .get(&sink_fragment_id)
558 .map(|target_fragments| {
559 let target_fragment_id = *target_fragments
560 .first()
561 .expect("sink should have at least one downstream fragment");
562 (sink_fragment_id, target_fragment_id)
563 })
564 } else {
565 None
566 };
567
568 Ok(Command::DropStreamingJobs {
569 streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
570 actors: table_fragments.actor_ids().collect(),
571 unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
572 unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
573 dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
574 .into_iter()
575 .map(|(sink, target)| (target as _, vec![sink as _]))
576 .collect(),
577 })
578 }
579
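    /// Try to abort a creating streaming job and clean up its catalog objects.
    /// Returns whether the cleanup actually happened (a background job that is still
    /// creating is kept unless it is explicitly cancelled) together with the database id
    /// the job belongs to.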
580 #[await_tree::instrument]
584 pub async fn try_abort_creating_streaming_job(
585 &self,
586 mut job_id: JobId,
587 is_cancelled: bool,
588 ) -> MetaResult<(bool, Option<DatabaseId>)> {
589 let mut inner = self.inner.write().await;
590 let txn = inner.db.begin().await?;
591
592 let obj = Object::find_by_id(job_id).one(&txn).await?;
593 let Some(obj) = obj else {
594 tracing::warn!(
595 id = %job_id,
596 "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
597 );
598 return Ok((true, None));
599 };
600 let database_id = obj
601 .database_id
602 .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
603 let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
604
605 if !is_cancelled && let Some(streaming_job) = &streaming_job {
606 assert_ne!(streaming_job.job_status, JobStatus::Created);
607 if streaming_job.create_type == CreateType::Background
608 && streaming_job.job_status == JobStatus::Creating
609 {
610 if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
611 && check_if_belongs_to_iceberg_table(&txn, job_id).await?
612 {
613 } else {
615 tracing::warn!(
617 id = %job_id,
618 "streaming job is created in background and still in creating status"
619 );
620 return Ok((false, Some(database_id)));
621 }
622 }
623 }
624
625 let iceberg_table_id =
626 try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
627 if let Some(iceberg_table_id) = iceberg_table_id {
628 let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
631 Object::delete_many()
632 .filter(
633 object::Column::Oid
634 .eq(job_id)
635 .or(object::Column::Oid.is_in(internal_tables)),
636 )
637 .exec(&txn)
638 .await?;
639 job_id = iceberg_table_id.as_job_id();
640 };
641
642 let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
643
644 let mut objs = vec![];
646 let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;
647
648 let mut need_notify =
649 streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
650 if !need_notify {
651 if let Some(table) = &table_obj {
653 need_notify = table.table_type == TableType::MaterializedView;
654 }
655 }
656
657 if is_cancelled {
658 let dropped_tables = Table::find()
659 .find_also_related(Object)
660 .filter(
661 table::Column::TableId.is_in(
662 internal_table_ids
663 .iter()
664 .cloned()
665 .chain(table_obj.iter().map(|t| t.table_id as _)),
666 ),
667 )
668 .all(&txn)
669 .await?
670 .into_iter()
671 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
672 inner
673 .dropped_tables
674 .extend(dropped_tables.map(|t| (t.id, t)));
675 }
676
677 if need_notify {
678 let obj: Option<PartialObject> = Object::find_by_id(job_id)
679 .select_only()
680 .columns([
681 object::Column::Oid,
682 object::Column::ObjType,
683 object::Column::SchemaId,
684 object::Column::DatabaseId,
685 ])
686 .into_partial_model()
687 .one(&txn)
688 .await?;
689 let obj =
690 obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
691 objs.push(obj);
692 let internal_table_objs: Vec<PartialObject> = Object::find()
693 .select_only()
694 .columns([
695 object::Column::Oid,
696 object::Column::ObjType,
697 object::Column::SchemaId,
698 object::Column::DatabaseId,
699 ])
700 .join(JoinType::InnerJoin, object::Relation::Table.def())
701 .filter(table::Column::BelongsToJobId.eq(job_id))
702 .into_partial_model()
703 .all(&txn)
704 .await?;
705 objs.extend(internal_table_objs);
706 }
707
708 if table_obj.is_none()
710 && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
711 .select_only()
712 .column(sink::Column::TargetTable)
713 .into_tuple::<Option<TableId>>()
714 .one(&txn)
715 .await?
716 {
717 let tmp_id: Option<ObjectId> = ObjectDependency::find()
718 .select_only()
719 .column(object_dependency::Column::UsedBy)
720 .join(
721 JoinType::InnerJoin,
722 object_dependency::Relation::Object1.def(),
723 )
724 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
725 .filter(
726 object_dependency::Column::Oid
727 .eq(target_table_id)
728 .and(object::Column::ObjType.eq(ObjectType::Table))
729 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
730 )
731 .into_tuple()
732 .one(&txn)
733 .await?;
734 if let Some(tmp_id) = tmp_id {
735 tracing::warn!(
736 id = %tmp_id,
737 "aborting temp streaming job for sink into table"
738 );
739
740 Object::delete_by_id(tmp_id).exec(&txn).await?;
741 }
742 }
743
744 Object::delete_by_id(job_id).exec(&txn).await?;
745 if !internal_table_ids.is_empty() {
746 Object::delete_many()
747 .filter(object::Column::Oid.is_in(internal_table_ids))
748 .exec(&txn)
749 .await?;
750 }
751 if let Some(t) = &table_obj
752 && let Some(source_id) = t.optional_associated_source_id
753 {
754 Object::delete_by_id(source_id).exec(&txn).await?;
755 }
756
757 let err = if is_cancelled {
758 MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
759 } else {
760 MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
761 };
762 let abort_reason = format!("streaming job aborted {}", err.as_report());
763 for tx in inner
764 .creating_table_finish_notifier
765 .get_mut(&database_id)
766 .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
767 .into_iter()
768 .flatten()
769 .flatten()
770 {
771 let _ = tx.send(Err(abort_reason.clone()));
772 }
773 txn.commit().await?;
774
775 if !objs.is_empty() {
776 self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
779 .await;
780 }
781 Ok((true, Some(database_id)))
782 }
783
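    /// Called after the barrier that creates the job has been collected: persist the new
    /// fragment relations (including an optional extra sink downstream), move the job to
    /// `JobStatus::Creating`, and record the initial source split assignment.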
784 #[await_tree::instrument]
785 pub async fn post_collect_job_fragments(
786 &self,
787 job_id: JobId,
788 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
789 new_sink_downstream: Option<FragmentDownstreamRelation>,
790 split_assignment: Option<&SplitAssignment>,
791 ) -> MetaResult<()> {
792 let inner = self.inner.write().await;
793 let txn = inner.db.begin().await?;
794
795 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
796
797 if let Some(new_downstream) = new_sink_downstream {
798 insert_fragment_relations(&txn, &new_downstream).await?;
799 }
800
801 streaming_job::ActiveModel {
803 job_id: Set(job_id),
804 job_status: Set(JobStatus::Creating),
805 ..Default::default()
806 }
807 .update(&txn)
808 .await?;
809
810 if let Some(split_assignment) = split_assignment {
811 let fragment_splits = split_assignment
812 .iter()
813 .map(|(fragment_id, splits)| {
814 (
815 *fragment_id as _,
816 splits.values().flatten().cloned().collect_vec(),
817 )
818 })
819 .collect();
820
821 self.update_fragment_splits(&txn, &fragment_splits).await?;
822 }
823
824 txn.commit().await?;
825
826 Ok(())
827 }
828
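    /// Create a temporary streaming job object used to replace the plan of an existing
    /// job: the original max parallelism and timezone are carried over, and an object
    /// dependency from the original job to the temporary one is recorded.
    /// Returns the id of the temporary job.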
829 pub async fn create_job_catalog_for_replace(
830 &self,
831 streaming_job: &StreamingJob,
832 ctx: Option<&StreamContext>,
833 specified_parallelism: Option<&NonZeroUsize>,
834 expected_original_max_parallelism: Option<usize>,
835 ) -> MetaResult<JobId> {
836 let id = streaming_job.id();
837 let inner = self.inner.write().await;
838 let txn = inner.db.begin().await?;
839
840 streaming_job.verify_version_for_replace(&txn).await?;
842 let referring_cnt = ObjectDependency::find()
844 .join(
845 JoinType::InnerJoin,
846 object_dependency::Relation::Object1.def(),
847 )
848 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
849 .filter(
850 object_dependency::Column::Oid
851 .eq(id)
852 .and(object::Column::ObjType.eq(ObjectType::Table))
853 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
854 )
855 .count(&txn)
856 .await?;
857 if referring_cnt != 0 {
858 return Err(MetaError::permission_denied(
859 "job is being altered or referenced by some creating jobs",
860 ));
861 }
862
863 let (original_max_parallelism, original_timezone): (i32, Option<String>) =
865 StreamingJobModel::find_by_id(id)
866 .select_only()
867 .column(streaming_job::Column::MaxParallelism)
868 .column(streaming_job::Column::Timezone)
869 .into_tuple()
870 .one(&txn)
871 .await?
872 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
873
874 if let Some(max_parallelism) = expected_original_max_parallelism
875 && original_max_parallelism != max_parallelism as i32
876 {
877 bail!(
880 "cannot use a different max parallelism \
881 when replacing streaming job, \
882 original: {}, new: {}",
883 original_max_parallelism,
884 max_parallelism
885 );
886 }
887
888 let parallelism = match specified_parallelism {
889 None => StreamingParallelism::Adaptive,
890 Some(n) => StreamingParallelism::Fixed(n.get() as _),
891 };
892 let timezone = ctx
893 .map(|ctx| ctx.timezone.clone())
894 .unwrap_or(original_timezone);
895
896 let new_obj_id = Self::create_streaming_job_obj(
898 &txn,
899 streaming_job.object_type(),
900 streaming_job.owner() as _,
901 Some(streaming_job.database_id() as _),
902 Some(streaming_job.schema_id() as _),
903 streaming_job.create_type(),
904 timezone,
905 parallelism,
906 original_max_parallelism as _,
907 None,
908 )
909 .await?;
910
911 ObjectDependency::insert(object_dependency::ActiveModel {
913 oid: Set(id.as_object_id()),
914 used_by: Set(new_obj_id.as_object_id()),
915 ..Default::default()
916 })
917 .exec(&txn)
918 .await?;
919
920 txn.commit().await?;
921
922 Ok(new_obj_id)
923 }
924
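    /// Mark a streaming job as successfully created, notify the frontend about the new
    /// catalog objects and updated user privileges, and wake up any waiters registered
    /// for the job. Jobs that belong to an iceberg table are skipped here and finished
    /// manually later.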
925 pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
927 let mut inner = self.inner.write().await;
928 let txn = inner.db.begin().await?;
929
930 if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
932 tracing::info!(
933 "streaming job {} is for iceberg table, wait for manual finish operation",
934 job_id
935 );
936 return Ok(());
937 }
938
939 let (notification_op, objects, updated_user_info) =
940 Self::finish_streaming_job_inner(&txn, job_id).await?;
941
942 txn.commit().await?;
943
944 let mut version = self
945 .notify_frontend(
946 notification_op,
947 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
948 )
949 .await;
950
951 if !updated_user_info.is_empty() {
953 version = self.notify_users_update(updated_user_info).await;
954 }
955
956 inner
957 .creating_table_finish_notifier
958 .values_mut()
959 .for_each(|creating_tables| {
960 if let Some(txs) = creating_tables.remove(&job_id) {
961 for tx in txs {
962 let _ = tx.send(Ok(version));
963 }
964 }
965 });
966
967 Ok(())
968 }
969
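    /// Transactional part of [`Self::finish_streaming_job`]: stamp the creation time,
    /// switch the job to `JobStatus::Created`, collect the catalog objects to notify
    /// (including internal tables), propagate SELECT privileges from the primary table
    /// to a newly created index, and grant default privileges. Returns the notification
    /// operation, the objects, and the updated user infos.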
970 pub async fn finish_streaming_job_inner(
972 txn: &DatabaseTransaction,
973 job_id: JobId,
974 ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
975 let job_type = Object::find_by_id(job_id)
976 .select_only()
977 .column(object::Column::ObjType)
978 .into_tuple()
979 .one(txn)
980 .await?
981 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
982
983 let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
984 .select_only()
985 .column(streaming_job::Column::CreateType)
986 .into_tuple()
987 .one(txn)
988 .await?
989 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
990
991 let res = Object::update_many()
993 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
994 .col_expr(
995 object::Column::CreatedAtClusterVersion,
996 current_cluster_version().into(),
997 )
998 .filter(object::Column::Oid.eq(job_id))
999 .exec(txn)
1000 .await?;
1001 if res.rows_affected == 0 {
1002 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1003 }
1004
1005 let job = streaming_job::ActiveModel {
1007 job_id: Set(job_id),
1008 job_status: Set(JobStatus::Created),
1009 ..Default::default()
1010 };
1011 job.update(txn).await?;
1012
1013 let internal_table_objs = Table::find()
1015 .find_also_related(Object)
1016 .filter(table::Column::BelongsToJobId.eq(job_id))
1017 .all(txn)
1018 .await?;
1019 let mut objects = internal_table_objs
1020 .iter()
1021 .map(|(table, obj)| PbObject {
1022 object_info: Some(PbObjectInfo::Table(
1023 ObjectModel(table.clone(), obj.clone().unwrap()).into(),
1024 )),
1025 })
1026 .collect_vec();
1027 let mut notification_op = if create_type == CreateType::Background {
1028 NotificationOperation::Update
1029 } else {
1030 NotificationOperation::Add
1031 };
1032 let mut updated_user_info = vec![];
1033
1034 match job_type {
1035 ObjectType::Table => {
1036 let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1037 .find_also_related(Object)
1038 .one(txn)
1039 .await?
1040 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1041 if table.table_type == TableType::MaterializedView {
1042 notification_op = NotificationOperation::Update;
1043 }
1044
1045 if let Some(source_id) = table.optional_associated_source_id {
1046 let (src, obj) = Source::find_by_id(source_id)
1047 .find_also_related(Object)
1048 .one(txn)
1049 .await?
1050 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1051 objects.push(PbObject {
1052 object_info: Some(PbObjectInfo::Source(
1053 ObjectModel(src, obj.unwrap()).into(),
1054 )),
1055 });
1056 }
1057 objects.push(PbObject {
1058 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1059 });
1060 }
1061 ObjectType::Sink => {
1062 let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1063 .find_also_related(Object)
1064 .one(txn)
1065 .await?
1066 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1067 objects.push(PbObject {
1068 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
1069 });
1070 }
1071 ObjectType::Index => {
1072 let (index, obj) = Index::find_by_id(job_id.as_index_id())
1073 .find_also_related(Object)
1074 .one(txn)
1075 .await?
1076 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1077 {
1078 let (table, obj) = Table::find_by_id(index.index_table_id)
1079 .find_also_related(Object)
1080 .one(txn)
1081 .await?
1082 .ok_or_else(|| {
1083 MetaError::catalog_id_not_found("table", index.index_table_id)
1084 })?;
1085 objects.push(PbObject {
1086 object_info: Some(PbObjectInfo::Table(
1087 ObjectModel(table, obj.unwrap()).into(),
1088 )),
1089 });
1090 }
1091
1092 let primary_table_privileges = UserPrivilege::find()
1095 .filter(
1096 user_privilege::Column::Oid
1097 .eq(index.primary_table_id)
1098 .and(user_privilege::Column::Action.eq(Action::Select)),
1099 )
1100 .all(txn)
1101 .await?;
1102 if !primary_table_privileges.is_empty() {
1103 let index_state_table_ids: Vec<TableId> = Table::find()
1104 .select_only()
1105 .column(table::Column::TableId)
1106 .filter(
1107 table::Column::BelongsToJobId
1108 .eq(job_id)
1109 .or(table::Column::TableId.eq(index.index_table_id)),
1110 )
1111 .into_tuple()
1112 .all(txn)
1113 .await?;
1114 let mut new_privileges = vec![];
1115 for privilege in &primary_table_privileges {
1116 for state_table_id in &index_state_table_ids {
1117 new_privileges.push(user_privilege::ActiveModel {
1118 id: Default::default(),
1119 oid: Set(state_table_id.as_object_id()),
1120 user_id: Set(privilege.user_id),
1121 action: Set(Action::Select),
1122 dependent_id: Set(privilege.dependent_id),
1123 granted_by: Set(privilege.granted_by),
1124 with_grant_option: Set(privilege.with_grant_option),
1125 });
1126 }
1127 }
1128 UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1129
1130 updated_user_info = list_user_info_by_ids(
1131 primary_table_privileges.into_iter().map(|p| p.user_id),
1132 txn,
1133 )
1134 .await?;
1135 }
1136
1137 objects.push(PbObject {
1138 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1139 });
1140 }
1141 ObjectType::Source => {
1142 let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1143 .find_also_related(Object)
1144 .one(txn)
1145 .await?
1146 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1147 objects.push(PbObject {
1148 object_info: Some(PbObjectInfo::Source(
1149 ObjectModel(source, obj.unwrap()).into(),
1150 )),
1151 });
1152 }
1153 _ => unreachable!("invalid job type: {:?}", job_type),
1154 }
1155
1156 if job_type != ObjectType::Index {
1157 updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1158 }
1159
1160 Ok((notification_op, objects, updated_user_info))
1161 }
1162
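    /// Finish replacing a streaming job: apply the catalog changes computed by
    /// [`Self::finish_replace_streaming_job_inner`], then notify the frontend about the
    /// updated objects and, if a table connector was dropped, about the deleted ones.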
1163 pub async fn finish_replace_streaming_job(
1164 &self,
1165 tmp_id: JobId,
1166 streaming_job: StreamingJob,
1167 replace_upstream: FragmentReplaceUpstream,
1168 sink_into_table_context: SinkIntoTableContext,
1169 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1170 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1171 ) -> MetaResult<NotificationVersion> {
1172 let inner = self.inner.write().await;
1173 let txn = inner.db.begin().await?;
1174
1175 let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1176 tmp_id,
1177 replace_upstream,
1178 sink_into_table_context,
1179 &txn,
1180 streaming_job,
1181 drop_table_connector_ctx,
1182 auto_refresh_schema_sinks,
1183 )
1184 .await?;
1185
1186 txn.commit().await?;
1187
1188 let mut version = self
1189 .notify_frontend(
1190 NotificationOperation::Update,
1191 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1192 )
1193 .await;
1194
1195 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1196 self.notify_users_update(user_infos).await;
1197 version = self
1198 .notify_frontend(
1199 NotificationOperation::Delete,
1200 build_object_group_for_delete(to_drop_objects),
1201 )
1202 .await;
1203 }
1204
1205 Ok(version)
1206 }
1207
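    /// Transactional part of [`Self::finish_replace_streaming_job`]: update the catalog
    /// row of the replaced job, move the fragments of the temporary job over to the
    /// original job id, rewrite upstream fragment ids in downstream `Merge` nodes,
    /// rewrite index items against the new columns, and handle auto-refresh-schema sinks
    /// as well as dropped table connectors.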
1208 pub async fn finish_replace_streaming_job_inner(
1209 tmp_id: JobId,
1210 replace_upstream: FragmentReplaceUpstream,
1211 SinkIntoTableContext {
1212 updated_sink_catalogs,
1213 }: SinkIntoTableContext,
1214 txn: &DatabaseTransaction,
1215 streaming_job: StreamingJob,
1216 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1217 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1218 ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1219 let original_job_id = streaming_job.id();
1220 let job_type = streaming_job.job_type();
1221
1222 let mut index_item_rewriter = None;
1223
1224 match streaming_job {
1226 StreamingJob::Table(_source, table, _table_job_type) => {
1227 let original_column_catalogs =
1230 get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1231
1232 index_item_rewriter = Some({
1233 let original_columns = original_column_catalogs
1234 .to_protobuf()
1235 .into_iter()
1236 .map(|c| c.column_desc.unwrap())
1237 .collect_vec();
1238 let new_columns = table
1239 .columns
1240 .iter()
1241 .map(|c| c.column_desc.clone().unwrap())
1242 .collect_vec();
1243
1244 IndexItemRewriter {
1245 original_columns,
1246 new_columns,
1247 }
1248 });
1249
1250 for sink_id in updated_sink_catalogs {
1252 sink::ActiveModel {
1253 sink_id: Set(sink_id as _),
1254 original_target_columns: Set(Some(original_column_catalogs.clone())),
1255 ..Default::default()
1256 }
1257 .update(txn)
1258 .await?;
1259 }
1260 let mut table = table::ActiveModel::from(table);
1262 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1263 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1264 {
1265 table.optional_associated_source_id = Set(None);
1267 }
1268
1269 table.update(txn).await?;
1270 }
1271 StreamingJob::Source(source) => {
1272 let source = source::ActiveModel::from(source);
1274 source.update(txn).await?;
1275 }
1276 StreamingJob::MaterializedView(table) => {
1277 let table = table::ActiveModel::from(table);
1279 table.update(txn).await?;
1280 }
1281 _ => unreachable!(
1282 "invalid streaming job type: {:?}",
1283 streaming_job.job_type_str()
1284 ),
1285 }
1286
1287 async fn finish_fragments(
1288 txn: &DatabaseTransaction,
1289 tmp_id: JobId,
1290 original_job_id: JobId,
1291 replace_upstream: FragmentReplaceUpstream,
1292 ) -> MetaResult<()> {
1293 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1297 .select_only()
1298 .columns([
1299 fragment::Column::FragmentId,
1300 fragment::Column::StateTableIds,
1301 ])
1302 .filter(fragment::Column::JobId.eq(tmp_id))
1303 .into_tuple()
1304 .all(txn)
1305 .await?;
1306 for (fragment_id, state_table_ids) in fragment_info {
1307 for state_table_id in state_table_ids.into_inner() {
1308 let state_table_id = TableId::new(state_table_id as _);
1309 table::ActiveModel {
1310 table_id: Set(state_table_id),
1311 fragment_id: Set(Some(fragment_id)),
1312 ..Default::default()
1314 }
1315 .update(txn)
1316 .await?;
1317 }
1318 }
1319
1320 Fragment::delete_many()
1322 .filter(fragment::Column::JobId.eq(original_job_id))
1323 .exec(txn)
1324 .await?;
1325 Fragment::update_many()
1326 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1327 .filter(fragment::Column::JobId.eq(tmp_id))
1328 .exec(txn)
1329 .await?;
1330
1331 for (fragment_id, fragment_replace_map) in replace_upstream {
1334 let (fragment_id, mut stream_node) =
1335 Fragment::find_by_id(fragment_id as FragmentId)
1336 .select_only()
1337 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1338 .into_tuple::<(FragmentId, StreamNode)>()
1339 .one(txn)
1340 .await?
1341 .map(|(id, node)| (id, node.to_protobuf()))
1342 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1343
1344 visit_stream_node_mut(&mut stream_node, |body| {
1345 if let PbNodeBody::Merge(m) = body
1346 && let Some(new_fragment_id) =
1347 fragment_replace_map.get(&m.upstream_fragment_id)
1348 {
1349 m.upstream_fragment_id = *new_fragment_id;
1350 }
1351 });
1352 fragment::ActiveModel {
1353 fragment_id: Set(fragment_id),
1354 stream_node: Set(StreamNode::from(&stream_node)),
1355 ..Default::default()
1356 }
1357 .update(txn)
1358 .await?;
1359 }
1360
1361 Object::delete_by_id(tmp_id).exec(txn).await?;
1363
1364 Ok(())
1365 }
1366
1367 finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1368
1369 let mut objects = vec![];
1371 match job_type {
1372 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1373 let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1374 .find_also_related(Object)
1375 .one(txn)
1376 .await?
1377 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1378 objects.push(PbObject {
1379 object_info: Some(PbObjectInfo::Table(
1380 ObjectModel(table, table_obj.unwrap()).into(),
1381 )),
1382 })
1383 }
1384 StreamingJobType::Source => {
1385 let (source, source_obj) =
1386 Source::find_by_id(original_job_id.as_shared_source_id())
1387 .find_also_related(Object)
1388 .one(txn)
1389 .await?
1390 .ok_or_else(|| {
1391 MetaError::catalog_id_not_found("object", original_job_id)
1392 })?;
1393 objects.push(PbObject {
1394 object_info: Some(PbObjectInfo::Source(
1395 ObjectModel(source, source_obj.unwrap()).into(),
1396 )),
1397 })
1398 }
1399 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1400 }
1401
1402 if let Some(expr_rewriter) = index_item_rewriter {
1403 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1404 .select_only()
1405 .columns([index::Column::IndexId, index::Column::IndexItems])
1406 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1407 .into_tuple()
1408 .all(txn)
1409 .await?;
1410 for (index_id, nodes) in index_items {
1411 let mut pb_nodes = nodes.to_protobuf();
1412 pb_nodes
1413 .iter_mut()
1414 .for_each(|x| expr_rewriter.rewrite_expr(x));
1415 let index = index::ActiveModel {
1416 index_id: Set(index_id),
1417 index_items: Set(pb_nodes.into()),
1418 ..Default::default()
1419 }
1420 .update(txn)
1421 .await?;
1422 let index_obj = index
1423 .find_related(Object)
1424 .one(txn)
1425 .await?
1426 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1427 objects.push(PbObject {
1428 object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1429 });
1430 }
1431 }
1432
1433 if let Some(sinks) = auto_refresh_schema_sinks {
1434 for finish_sink_context in sinks {
1435 finish_fragments(
1436 txn,
1437 finish_sink_context.tmp_sink_id.as_job_id(),
1438 finish_sink_context.original_sink_id.as_job_id(),
1439 Default::default(),
1440 )
1441 .await?;
1442 let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1443 .find_also_related(Object)
1444 .one(txn)
1445 .await?
1446 .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1447 let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1448 Sink::update(sink::ActiveModel {
1449 sink_id: Set(finish_sink_context.original_sink_id),
1450 columns: Set(columns.clone()),
1451 ..Default::default()
1452 })
1453 .exec(txn)
1454 .await?;
1455 sink.columns = columns;
1456 objects.push(PbObject {
1457 object_info: Some(PbObjectInfo::Sink(
1458 ObjectModel(sink, sink_obj.unwrap()).into(),
1459 )),
1460 });
1461 if let Some((log_store_table_id, new_log_store_table_columns)) =
1462 finish_sink_context.new_log_store_table
1463 {
1464 let new_log_store_table_columns: ColumnCatalogArray =
1465 new_log_store_table_columns.into();
1466 let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1467 .find_also_related(Object)
1468 .one(txn)
1469 .await?
1470 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1471 Table::update(table::ActiveModel {
1472 table_id: Set(log_store_table_id),
1473 columns: Set(new_log_store_table_columns.clone()),
1474 ..Default::default()
1475 })
1476 .exec(txn)
1477 .await?;
1478 table.columns = new_log_store_table_columns;
1479 objects.push(PbObject {
1480 object_info: Some(PbObjectInfo::Table(
1481 ObjectModel(table, table_obj.unwrap()).into(),
1482 )),
1483 });
1484 }
1485 }
1486 }
1487
1488 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1489 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1490 notification_objs =
1491 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1492 }
1493
1494 Ok((objects, notification_objs))
1495 }
1496
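    /// Abort an in-progress replacement by deleting the temporary job object and any
    /// temporary sink objects created for it.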
1497 pub async fn try_abort_replacing_streaming_job(
1499 &self,
1500 tmp_job_id: JobId,
1501 tmp_sink_ids: Option<Vec<ObjectId>>,
1502 ) -> MetaResult<()> {
1503 let inner = self.inner.write().await;
1504 let txn = inner.db.begin().await?;
1505 Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1506 if let Some(tmp_sink_ids) = tmp_sink_ids {
1507 for tmp_sink_id in tmp_sink_ids {
1508 Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1509 }
1510 }
1511 txn.commit().await?;
1512 Ok(())
1513 }
1514
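    /// Update the rate limit of a source and of every fragment that reads from it
    /// (including `StreamFsFetch` fragments for file-system sources), then notify the
    /// frontend about the updated source. Returns the affected fragments and their actors.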
1515 pub async fn update_source_rate_limit_by_source_id(
1518 &self,
1519 source_id: SourceId,
1520 rate_limit: Option<u32>,
1521 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1522 let inner = self.inner.read().await;
1523 let txn = inner.db.begin().await?;
1524
1525 {
1526 let active_source = source::ActiveModel {
1527 source_id: Set(source_id),
1528 rate_limit: Set(rate_limit.map(|v| v as i32)),
1529 ..Default::default()
1530 };
1531 active_source.update(&txn).await?;
1532 }
1533
1534 let (source, obj) = Source::find_by_id(source_id)
1535 .find_also_related(Object)
1536 .one(&txn)
1537 .await?
1538 .ok_or_else(|| {
1539 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1540 })?;
1541
1542 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1543 let streaming_job_ids: Vec<JobId> =
1544 if let Some(table_id) = source.optional_associated_table_id {
1545 vec![table_id.as_job_id()]
1546 } else if let Some(source_info) = &source.source_info
1547 && source_info.to_protobuf().is_shared()
1548 {
1549 vec![source_id.as_share_source_job_id()]
1550 } else {
1551 ObjectDependency::find()
1552 .select_only()
1553 .column(object_dependency::Column::UsedBy)
1554 .filter(object_dependency::Column::Oid.eq(source_id))
1555 .into_tuple()
1556 .all(&txn)
1557 .await?
1558 };
1559
1560 if streaming_job_ids.is_empty() {
1561 return Err(MetaError::invalid_parameter(format!(
1562 "source id {source_id} not used by any streaming job"
1563 )));
1564 }
1565
1566 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1567 .select_only()
1568 .columns([
1569 fragment::Column::FragmentId,
1570 fragment::Column::FragmentTypeMask,
1571 fragment::Column::StreamNode,
1572 ])
1573 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1574 .into_tuple()
1575 .all(&txn)
1576 .await?;
1577 let mut fragments = fragments
1578 .into_iter()
1579 .map(|(id, mask, stream_node)| {
1580 (
1581 id,
1582 FragmentTypeMask::from(mask as u32),
1583 stream_node.to_protobuf(),
1584 )
1585 })
1586 .collect_vec();
1587
1588 fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1589 let mut found = false;
1590 if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1591 visit_stream_node_mut(stream_node, |node| {
1592 if let PbNodeBody::Source(node) = node
1593 && let Some(node_inner) = &mut node.source_inner
1594 && node_inner.source_id == source_id
1595 {
1596 node_inner.rate_limit = rate_limit;
1597 found = true;
1598 }
1599 });
1600 }
1601 if is_fs_source {
1602 visit_stream_node_mut(stream_node, |node| {
1605 if let PbNodeBody::StreamFsFetch(node) = node {
1606 fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1607 if let Some(node_inner) = &mut node.node_inner
1608 && node_inner.source_id == source_id
1609 {
1610 node_inner.rate_limit = rate_limit;
1611 found = true;
1612 }
1613 }
1614 });
1615 }
1616 found
1617 });
1618
1619 assert!(
1620 !fragments.is_empty(),
1621 "source id should be used by at least one fragment"
1622 );
1623 let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1624
1625 for (id, fragment_type_mask, stream_node) in fragments {
1626 fragment::ActiveModel {
1627 fragment_id: Set(id),
1628 fragment_type_mask: Set(fragment_type_mask.into()),
1629 stream_node: Set(StreamNode::from(&stream_node)),
1630 ..Default::default()
1631 }
1632 .update(&txn)
1633 .await?;
1634 }
1635
1636 txn.commit().await?;
1637
1638 let fragment_actors = self.get_fragment_actors_from_running_info(fragment_ids.into_iter());
1639
1640 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1641 let _version = self
1642 .notify_frontend(
1643 NotificationOperation::Update,
1644 NotificationInfo::ObjectGroup(PbObjectGroup {
1645 objects: vec![PbObject {
1646 object_info: Some(relation_info),
1647 }],
1648 }),
1649 )
1650 .await;
1651
1652 Ok(fragment_actors)
1653 }
1654
1655 fn get_fragment_actors_from_running_info(
1656 &self,
1657 fragment_ids: impl Iterator<Item = FragmentId>,
1658 ) -> HashMap<FragmentId, Vec<ActorId>> {
1659 let mut fragment_actors: HashMap<FragmentId, Vec<ActorId>> = HashMap::new();
1660
1661 let info = self.env.shared_actor_infos().read_guard();
1662
1663 for fragment_id in fragment_ids {
1664 let SharedFragmentInfo { actors, .. } = info.get_fragment(fragment_id).unwrap();
1665 fragment_actors
1666 .entry(fragment_id as _)
1667 .or_default()
1668 .extend(actors.keys().copied());
1669 }
1670
1671 fragment_actors
1672 }
1673
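    /// Load all fragments of a job, apply `fragments_mutation_fn` to each stream node,
    /// and persist the fragments for which the closure returns `true`. Fails with
    /// `err_msg` if no fragment is affected. Returns the affected fragments and their
    /// actors.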
1674 pub async fn mutate_fragments_by_job_id(
1677 &self,
1678 job_id: JobId,
1679 mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1681 err_msg: &'static str,
1683 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1684 let inner = self.inner.read().await;
1685 let txn = inner.db.begin().await?;
1686
1687 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1688 .select_only()
1689 .columns([
1690 fragment::Column::FragmentId,
1691 fragment::Column::FragmentTypeMask,
1692 fragment::Column::StreamNode,
1693 ])
1694 .filter(fragment::Column::JobId.eq(job_id))
1695 .into_tuple()
1696 .all(&txn)
1697 .await?;
1698 let mut fragments = fragments
1699 .into_iter()
1700 .map(|(id, mask, stream_node)| {
1701 (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1702 })
1703 .collect_vec();
1704
1705 let fragments = fragments
1706 .iter_mut()
1707 .map(|(_, fragment_type_mask, stream_node)| {
1708 fragments_mutation_fn(*fragment_type_mask, stream_node)
1709 })
1710 .collect::<MetaResult<Vec<bool>>>()?
1711 .into_iter()
1712 .zip_eq_debug(std::mem::take(&mut fragments))
1713 .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1714 .collect::<Vec<_>>();
1715
1716 if fragments.is_empty() {
1717 return Err(MetaError::invalid_parameter(format!(
1718 "job id {job_id}: {}",
1719 err_msg
1720 )));
1721 }
1722
1723 let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1724 for (id, _, stream_node) in fragments {
1725 fragment::ActiveModel {
1726 fragment_id: Set(id),
1727 stream_node: Set(StreamNode::from(&stream_node)),
1728 ..Default::default()
1729 }
1730 .update(&txn)
1731 .await?;
1732 }
1733
1734 txn.commit().await?;
1735
1736 let fragment_actors =
1737 self.get_fragment_actors_from_running_info(fragment_ids.iter().copied());
1738
1739 Ok(fragment_actors)
1740 }
1741
1742 async fn mutate_fragment_by_fragment_id(
1743 &self,
1744 fragment_id: FragmentId,
1745 mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1746 err_msg: &'static str,
1747 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1748 let inner = self.inner.read().await;
1749 let txn = inner.db.begin().await?;
1750
1751 let (fragment_type_mask, stream_node): (i32, StreamNode) =
1752 Fragment::find_by_id(fragment_id)
1753 .select_only()
1754 .columns([
1755 fragment::Column::FragmentTypeMask,
1756 fragment::Column::StreamNode,
1757 ])
1758 .into_tuple()
1759 .one(&txn)
1760 .await?
1761 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1762 let mut pb_stream_node = stream_node.to_protobuf();
1763 let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1764
1765 if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1766 return Err(MetaError::invalid_parameter(format!(
1767 "fragment id {fragment_id}: {}",
1768 err_msg
1769 )));
1770 }
1771
1772 fragment::ActiveModel {
1773 fragment_id: Set(fragment_id),
            stream_node: Set(StreamNode::from(&pb_stream_node)),
1775 ..Default::default()
1776 }
1777 .update(&txn)
1778 .await?;
1779
1780 let fragment_actors =
1781 self.get_fragment_actors_from_running_info(std::iter::once(fragment_id));
1782
1783 txn.commit().await?;
1784
1785 Ok(fragment_actors)
1786 }
1787
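    /// Update the backfill rate limit of every backfill-capable node (stream scan, CDC
    /// scan, source backfill and sink) in the fragments of the given job.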
1788 pub async fn update_backfill_rate_limit_by_job_id(
1791 &self,
1792 job_id: JobId,
1793 rate_limit: Option<u32>,
1794 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1795 let update_backfill_rate_limit =
1796 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1797 let mut found = false;
1798 if fragment_type_mask
1799 .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1800 {
1801 visit_stream_node_mut(stream_node, |node| match node {
1802 PbNodeBody::StreamCdcScan(node) => {
1803 node.rate_limit = rate_limit;
1804 found = true;
1805 }
1806 PbNodeBody::StreamScan(node) => {
1807 node.rate_limit = rate_limit;
1808 found = true;
1809 }
1810 PbNodeBody::SourceBackfill(node) => {
1811 node.rate_limit = rate_limit;
1812 found = true;
1813 }
1814 PbNodeBody::Sink(node) => {
1815 node.rate_limit = rate_limit;
1816 found = true;
1817 }
1818 _ => {}
1819 });
1820 }
1821 Ok(found)
1822 };
1823
1824 self.mutate_fragments_by_job_id(
1825 job_id,
1826 update_backfill_rate_limit,
1827 "stream scan node or source node not found",
1828 )
1829 .await
1830 }
1831
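    /// Update the rate limit of the sink node in the given sink job. Only sinks that use
    /// the kv log store (i.e. created with sink decoupling enabled) are supported.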
1832 pub async fn update_sink_rate_limit_by_job_id(
1835 &self,
1836 sink_id: SinkId,
1837 rate_limit: Option<u32>,
1838 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1839 let update_sink_rate_limit =
1840 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1841 let mut found = Ok(false);
1842 if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1843 visit_stream_node_mut(stream_node, |node| {
1844 if let PbNodeBody::Sink(node) = node {
1845 if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1846 found = Err(MetaError::invalid_parameter(
1847 "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1848 ));
1849 return;
1850 }
1851 node.rate_limit = rate_limit;
1852 found = Ok(true);
1853 }
1854 });
1855 }
1856 found
1857 };
1858
1859 self.mutate_fragments_by_job_id(
1860 sink_id.as_job_id(),
1861 update_sink_rate_limit,
1862 "sink node not found",
1863 )
1864 .await
1865 }
1866
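    /// Update the rate limit of the DML node in the fragments of the given table job.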
1867 pub async fn update_dml_rate_limit_by_job_id(
1868 &self,
1869 job_id: JobId,
1870 rate_limit: Option<u32>,
1871 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1872 let update_dml_rate_limit =
1873 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1874 let mut found = false;
1875 if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1876 visit_stream_node_mut(stream_node, |node| {
1877 if let PbNodeBody::Dml(node) = node {
1878 node.rate_limit = rate_limit;
1879 found = true;
1880 }
1881 });
1882 }
1883 Ok(found)
1884 };
1885
1886 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1887 .await
1888 }
1889
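    /// Alter the connector properties and secret references of a source: validate that
    /// the changed fields may be altered on the fly, rewrite the stored `CREATE SOURCE` /
    /// `CREATE TABLE` definition, adjust secret dependencies, update the affected source
    /// fragments, and notify the frontend about the updated catalogs.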
    pub async fn update_source_props_by_source_id(
        &self,
        source_id: SourceId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> MetaResult<WithOptionsSecResolved> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (source, _obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let connector = source.with_properties.0.get_connector().unwrap();
        let is_shared_source = source.is_shared();

        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
        if !is_shared_source {
            dep_source_job_ids = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .filter(object_dependency::Column::Oid.eq(source_id))
                .into_tuple()
                .all(&txn)
                .await?;
        }

        let prop_keys: Vec<String> = alter_props
            .keys()
            .chain(alter_secret_refs.keys())
            .cloned()
            .collect();
        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
            &connector, &prop_keys,
        )?;

        let mut options_with_secret = WithOptionsSecResolved::new(
            source.with_properties.0.clone(),
            source
                .secret_ref
                .map(|secret_ref| secret_ref.to_protobuf())
                .unwrap_or_default(),
        );
        let (to_add_secret_dep, to_remove_secret_dep) =
            options_with_secret.handle_update(alter_props, alter_secret_refs)?;

        tracing::info!(
            "applying new properties to source: source_id={}, options_with_secret={:?}",
            source_id,
            options_with_secret
        );
        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
        let mut associate_table_id = None;
        let mut preferred_id = source_id.as_object_id();
        let rewrite_sql = {
            let definition = source.definition.clone();

            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
                .map_err(|e| {
                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                        anyhow!(e).context("Failed to parse source definition SQL"),
                    )))
                })?
                .try_into()
                .unwrap();

            async fn format_with_option_secret_resolved(
                txn: &DatabaseTransaction,
                options_with_secret: &WithOptionsSecResolved,
            ) -> MetaResult<Vec<SqlOption>> {
                let mut options = Vec::new();
                for (k, v) in options_with_secret.as_plaintext() {
                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                }
                for (k, v) in options_with_secret.as_secret() {
                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
                        let sql_option =
                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                        options.push(sql_option);
                    } else {
                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                    }
                }
                Ok(options)
            }

            match &mut stmt {
                Statement::CreateSource { stmt } => {
                    stmt.with_properties.0 =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                }
                Statement::CreateTable { with_options, .. } => {
                    *with_options =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                    associate_table_id = source.optional_associated_table_id;
                    preferred_id = associate_table_id.unwrap().as_object_id();
                }
                _ => unreachable!(),
            }

            stmt.to_string()
        };

        {
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id.into()),
                        used_by: Set(preferred_id),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            if !to_remove_secret_dep.is_empty() {
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        active_source_model.update(&txn).await?;

        if let Some(associate_table_id) = associate_table_id {
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            active_table_model.update(&txn).await?;
        }

        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            associate_table_id.as_job_id()
        } else {
            source_id.as_share_source_job_id()
        }]
        .into_iter()
        .chain(dep_source_job_ids.into_iter())
        .collect_vec();

        update_connector_props_fragments(
            &txn,
            to_check_job_ids,
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
            is_shared_source,
        )
        .await?;

        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap()).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
            });
        }

        txn.commit().await?;

        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
            }),
        )
        .await;

        Ok(options_with_secret)
    }

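    /// Applies altered `WITH` properties to a sink: validates the fields against the connector's
    /// alter-on-fly allow list, rewrites the stored `CREATE SINK` definition, updates the sink
    /// fragments, and notifies the frontend. Returns the applied properties.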
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
        } else {
            panic!("definition is not a create sink statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition),
            ..Default::default()
        };
        active_sink.update(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;
        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        txn.commit().await?;
        let relation_infos = vec![PbObject {
            object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
        }];

        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                }),
            )
            .await;

        Ok(props.into_iter().collect())
    }

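    /// Applies altered properties to a table with `ENGINE = iceberg`: the hidden sink, source and
    /// table catalogs that share the table's definition are updated together, the sink fragments
    /// are patched, and the frontend is notified. Returns the applied properties and the sink id.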
    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(
            AlterIcebergTableIds { sink_id, source_id },
        ) = alter_iceberg_table_props
            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;

        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateTable {
            with_options,
            engine,
            ..
        } = &mut stmt
        {
            if !matches!(engine, Engine::Iceberg) {
                return Err(SinkError::Config(anyhow!(
                    "only iceberg table can be altered as sink"
                ))
                .into());
            }
            update_stmt_with_props(with_options, &props)?;
        } else {
            panic!("definition is not a create iceberg table statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_source = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_table = table::ActiveModel {
            table_id: Set(table_id),
            definition: Set(definition),
            ..Default::default()
        };
        active_sink.update(&txn).await?;
        active_source.update(&txn).await?;
        active_table.update(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;

        let (sink, sink_obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        let (source, source_obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let (table, table_obj) = Table::find_by_id(table_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
        txn.commit().await?;
        let relation_infos = vec![
            PbObject {
                object_info: Some(PbObjectInfo::Sink(
                    ObjectModel(sink, sink_obj.unwrap()).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, source_obj.unwrap()).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, table_obj.unwrap()).into(),
                )),
            },
        ];
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                }),
            )
            .await;

        Ok((props.into_iter().collect(), sink_id))
    }

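    /// Updates the rate limit on a single fragment, covering DML, sink, backfill and source
    /// rate-limit node types. Returns the affected actors grouped by fragment.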
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
                                 stream_node: &mut PbStreamNode| {
            let mut found = false;
            if fragment_type_mask.contains_any(
                FragmentTypeFlag::dml_rate_limit_fragments()
                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
            ) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Dml(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::Sink(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamCdcScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::SourceBackfill(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

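    /// Lists the rate limits currently configured on fragments that may carry one
    /// (source, fs fetch, source backfill, stream scan, CDC scan and sink nodes).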
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(FragmentTypeMask::intersects_any(
                FragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |node| {
                let mut rate_limit = None;
                let mut node_name = None;

                match node {
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    PbNodeBody::SourceBackfill(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }

                if let Some(rate_limit) = rate_limit {
                    rate_limits.push(RateLimitInfo {
                        fragment_id,
                        job_id,
                        fragment_type_mask: fragment_type_mask as u32,
                        rate_limit,
                        node_name: node_name.unwrap().to_owned(),
                    });
                }
            });
        }

        Ok(rate_limits)
    }
}

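/// Checks that every property being altered is allowed to change on the fly for the sink's
/// connector, then lets the connector validate the merged configuration.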
fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
        Some(connector) => {
            let connector_type = connector.to_lowercase();
            let field_names: Vec<String> = props.keys().cloned().collect();
            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

            match_sink_name_str!(
                connector_type.as_str(),
                SinkType,
                {
                    let mut new_props = sink.properties.0.clone();
                    new_props.extend(props.clone());
                    SinkType::validate_alter_config(&new_props)
                },
                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
            )?
        }
        None => {
            return Err(
                SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
            );
        }
    };
    Ok(())
}

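/// Merges the altered properties into the `WITH` clause of a parsed statement, overwriting
/// options with the same name and appending new ones.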
fn update_stmt_with_props(
    with_properties: &mut Vec<SqlOption>,
    props: &BTreeMap<String, String>,
) -> MetaResult<()> {
    let mut new_sql_options = with_properties
        .iter()
        .map(|sql_option| (&sql_option.name, sql_option))
        .collect::<IndexMap<_, _>>();
    let add_sql_options = props
        .iter()
        .map(|(k, v)| SqlOption::try_from((k, v)))
        .collect::<Result<Vec<SqlOption>, ParserError>>()
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
    new_sql_options.extend(
        add_sql_options
            .iter()
            .map(|sql_option| (&sql_option.name, sql_option)),
    );
    *with_properties = new_sql_options.into_values().cloned().collect();
    Ok(())
}

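/// Rewrites the `SinkDesc` properties in every sink fragment of the given sink job and persists
/// the updated stream nodes.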
async fn update_sink_fragment_props(
    txn: &DatabaseTransaction,
    sink_id: SinkId,
    props: BTreeMap<String, String>,
) -> MetaResult<()> {
    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
        .select_only()
        .columns([
            fragment::Column::FragmentId,
            fragment::Column::FragmentTypeMask,
            fragment::Column::StreamNode,
        ])
        .filter(fragment::Column::JobId.eq(sink_id))
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter(|(_, fragment_type_mask, _)| {
            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
        })
        .filter_map(|(id, _, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                if let PbNodeBody::Sink(node) = node
                    && let Some(sink_desc) = &mut node.sink_desc
                    && sink_desc.id == sink_id
                {
                    sink_desc.properties.extend(props.clone());
                    found = true;
                }
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    assert!(
        !fragments.is_empty(),
        "sink id should be used by at least one fragment"
    );
    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }
    Ok(())
}

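/// Context for handling a sink that writes into a table.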
pub struct SinkIntoTableContext {
    pub updated_sink_catalogs: Vec<SinkId>,
}

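/// Context for finishing an auto-refresh-schema sink replacement: the temporary and original sink
/// ids, the new column catalog, and the optional new log store table.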
pub struct FinishAutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink_id: SinkId,
    pub columns: Vec<PbColumnCatalog>,
    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
}

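/// Applies `alter_stream_node_fn` to the stream nodes of all fragments of `job_ids` that carry
/// `expect_flag`, persisting only the fragments where the closure reported a change. For shared
/// sources, or when more than one job is involved, at least one fragment is expected to match.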
async fn update_connector_props_fragments<F>(
    txn: &DatabaseTransaction,
    job_ids: Vec<JobId>,
    expect_flag: FragmentTypeFlag,
    mut alter_stream_node_fn: F,
    is_shared_source: bool,
) -> MetaResult<()>
where
    F: FnMut(&mut PbNodeBody, &mut bool),
{
    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
        .select_only()
        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
        .filter(
            fragment::Column::JobId
                .is_in(job_ids.clone())
                .and(FragmentTypeMask::intersects(expect_flag)),
        )
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter_map(|(id, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                alter_stream_node_fn(node, &mut found);
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    if is_shared_source || job_ids.len() > 1 {
        assert!(
            !fragments.is_empty(),
            "job ids {:?} (type: {:?}) should be used by at least one fragment",
            job_ids,
            expect_flag
        );
    }

    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }

    Ok(())
}