use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::ConnectorProperties;
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::{Command, SharedFragmentInfo};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
    list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

impl CatalogController {
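    /// Inserts the `object` and `streaming_job` rows for a new streaming job inside the given
    /// transaction and returns the allocated job id. The job starts in `JobStatus::Initial`.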
87 pub async fn create_streaming_job_obj(
88 txn: &DatabaseTransaction,
89 obj_type: ObjectType,
90 owner_id: UserId,
91 database_id: Option<DatabaseId>,
92 schema_id: Option<SchemaId>,
93 create_type: PbCreateType,
94 ctx: StreamContext,
95 streaming_parallelism: StreamingParallelism,
96 max_parallelism: usize,
        specific_resource_group: Option<String>,
    ) -> MetaResult<JobId> {
99 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
100 let job_id = obj.oid.as_job_id();
101 let job = streaming_job::ActiveModel {
102 job_id: Set(job_id),
103 job_status: Set(JobStatus::Initial),
104 create_type: Set(create_type.into()),
105 timezone: Set(ctx.timezone),
106 config_override: Set(Some(ctx.config_override.to_string())),
107 parallelism: Set(streaming_parallelism),
108 max_parallelism: Set(max_parallelism as _),
109 specific_resource_group: Set(specific_resource_group),
110 };
111 job.insert(txn).await?;
112
113 Ok(job_id)
114 }
115
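    /// Creates the catalog entries (object, job-specific catalog and object dependencies) for a
    /// new streaming job in a single transaction, filling in the ids assigned to the job's
    /// table/sink/index/source catalogs. The creation is rejected if any dependent table is
    /// still being altered by another job.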
116 #[await_tree::instrument]
122 pub async fn create_job_catalog(
123 &self,
124 streaming_job: &mut StreamingJob,
125 ctx: &StreamContext,
126 parallelism: &Option<Parallelism>,
127 max_parallelism: usize,
128 mut dependencies: HashSet<ObjectId>,
129 specific_resource_group: Option<String>,
130 ) -> MetaResult<()> {
131 let inner = self.inner.write().await;
132 let txn = inner.db.begin().await?;
133 let create_type = streaming_job.create_type();
134
135 let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
136 (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
137 (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
138 (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
139 };
140
141 ensure_user_id(streaming_job.owner() as _, &txn).await?;
142 ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
143 ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
144 check_relation_name_duplicate(
145 &streaming_job.name(),
146 streaming_job.database_id(),
147 streaming_job.schema_id(),
148 &txn,
149 )
150 .await?;
151
152 if !dependencies.is_empty() {
154 let altering_cnt = ObjectDependency::find()
155 .join(
156 JoinType::InnerJoin,
157 object_dependency::Relation::Object1.def(),
158 )
159 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
160 .filter(
161 object_dependency::Column::Oid
162 .is_in(dependencies.clone())
163 .and(object::Column::ObjType.eq(ObjectType::Table))
164 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
165 .and(
166 object::Column::Oid.not_in_subquery(
168 Query::select()
169 .column(table::Column::TableId)
170 .from(Table)
171 .to_owned(),
172 ),
173 ),
174 )
175 .count(&txn)
176 .await?;
177 if altering_cnt != 0 {
178 return Err(MetaError::permission_denied(
179 "some dependent relations are being altered",
180 ));
181 }
182 }
183
184 match streaming_job {
185 StreamingJob::MaterializedView(table) => {
186 let job_id = Self::create_streaming_job_obj(
187 &txn,
188 ObjectType::Table,
189 table.owner as _,
190 Some(table.database_id),
191 Some(table.schema_id),
192 create_type,
193 ctx.clone(),
194 streaming_parallelism,
195 max_parallelism,
196 specific_resource_group,
197 )
198 .await?;
199 table.id = job_id.as_mv_table_id();
200 let table_model: table::ActiveModel = table.clone().into();
201 Table::insert(table_model).exec(&txn).await?;
202 }
203 StreamingJob::Sink(sink) => {
204 if let Some(target_table_id) = sink.target_table
205 && check_sink_into_table_cycle(
206 target_table_id.into(),
207 dependencies.iter().cloned().collect(),
208 &txn,
209 )
210 .await?
211 {
                bail!("Creating such a sink will result in a circular dependency.");
213 }
214
215 let job_id = Self::create_streaming_job_obj(
216 &txn,
217 ObjectType::Sink,
218 sink.owner as _,
219 Some(sink.database_id),
220 Some(sink.schema_id),
221 create_type,
222 ctx.clone(),
223 streaming_parallelism,
224 max_parallelism,
225 specific_resource_group,
226 )
227 .await?;
228 sink.id = job_id.as_sink_id();
229 let sink_model: sink::ActiveModel = sink.clone().into();
230 Sink::insert(sink_model).exec(&txn).await?;
231 }
232 StreamingJob::Table(src, table, _) => {
233 let job_id = Self::create_streaming_job_obj(
234 &txn,
235 ObjectType::Table,
236 table.owner as _,
237 Some(table.database_id),
238 Some(table.schema_id),
239 create_type,
240 ctx.clone(),
241 streaming_parallelism,
242 max_parallelism,
243 specific_resource_group,
244 )
245 .await?;
246 table.id = job_id.as_mv_table_id();
247 if let Some(src) = src {
248 let src_obj = Self::create_object(
249 &txn,
250 ObjectType::Source,
251 src.owner as _,
252 Some(src.database_id),
253 Some(src.schema_id),
254 )
255 .await?;
256 src.id = src_obj.oid.as_source_id();
257 src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
258 table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
259 let source: source::ActiveModel = src.clone().into();
260 Source::insert(source).exec(&txn).await?;
261 }
262 let table_model: table::ActiveModel = table.clone().into();
263 Table::insert(table_model).exec(&txn).await?;
264 }
265 StreamingJob::Index(index, table) => {
266 ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
267 let job_id = Self::create_streaming_job_obj(
268 &txn,
269 ObjectType::Index,
270 index.owner as _,
271 Some(index.database_id),
272 Some(index.schema_id),
273 create_type,
274 ctx.clone(),
275 streaming_parallelism,
276 max_parallelism,
277 specific_resource_group,
278 )
279 .await?;
280 index.id = job_id.as_index_id();
282 index.index_table_id = job_id.as_mv_table_id();
283 table.id = job_id.as_mv_table_id();
284
285 ObjectDependency::insert(object_dependency::ActiveModel {
286 oid: Set(index.primary_table_id.into()),
287 used_by: Set(table.id.into()),
288 ..Default::default()
289 })
290 .exec(&txn)
291 .await?;
292
293 let table_model: table::ActiveModel = table.clone().into();
294 Table::insert(table_model).exec(&txn).await?;
295 let index_model: index::ActiveModel = index.clone().into();
296 Index::insert(index_model).exec(&txn).await?;
297 }
298 StreamingJob::Source(src) => {
299 let job_id = Self::create_streaming_job_obj(
300 &txn,
301 ObjectType::Source,
302 src.owner as _,
303 Some(src.database_id),
304 Some(src.schema_id),
305 create_type,
306 ctx.clone(),
307 streaming_parallelism,
308 max_parallelism,
309 specific_resource_group,
310 )
311 .await?;
312 src.id = job_id.as_shared_source_id();
313 let source_model: source::ActiveModel = src.clone().into();
314 Source::insert(source_model).exec(&txn).await?;
315 }
316 }
317
318 dependencies.extend(
320 streaming_job
321 .dependent_secret_ids()?
322 .into_iter()
323 .map(|id| id.as_object_id()),
324 );
325 dependencies.extend(
327 streaming_job
328 .dependent_connection_ids()?
329 .into_iter()
330 .map(|id| id.as_object_id()),
331 );
332
333 if !dependencies.is_empty() {
335 ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
336 object_dependency::ActiveModel {
337 oid: Set(oid),
338 used_by: Set(streaming_job.id().as_object_id()),
339 ..Default::default()
340 }
341 }))
342 .exec(&txn)
343 .await?;
344 }
345
346 txn.commit().await?;
347
348 Ok(())
349 }
350
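    /// Allocates object ids for the internal (state) tables of a streaming job and persists them
    /// with `fragment_id` left unset. Returns a map from the temporary table id used in the
    /// fragment graph to the newly allocated table id.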
351 pub async fn create_internal_table_catalog(
359 &self,
360 job: &StreamingJob,
361 mut incomplete_internal_tables: Vec<PbTable>,
362 ) -> MetaResult<HashMap<TableId, TableId>> {
363 let job_id = job.id();
364 let inner = self.inner.write().await;
365 let txn = inner.db.begin().await?;
366
367 ensure_job_not_canceled(job_id, &txn).await?;
369
370 let mut table_id_map = HashMap::new();
371 for table in &mut incomplete_internal_tables {
372 let table_id = Self::create_object(
373 &txn,
374 ObjectType::Table,
375 table.owner as _,
376 Some(table.database_id),
377 Some(table.schema_id),
378 )
379 .await?
380 .oid
381 .as_table_id();
382 table_id_map.insert(table.id, table_id);
383 table.id = table_id;
384 table.job_id = Some(job_id);
385
386 let table_model = table::ActiveModel {
387 table_id: Set(table_id),
388 belongs_to_job_id: Set(Some(job_id)),
389 fragment_id: NotSet,
390 ..table.clone().into()
391 };
392 Table::insert(table_model).exec(&txn).await?;
393 }
394 txn.commit().await?;
395
396 Ok(table_id_map)
397 }
398
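    /// Convenience wrapper around [`Self::prepare_streaming_job`] for a freshly built
    /// [`StreamJobFragmentsToCreate`].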
399 pub async fn prepare_stream_job_fragments(
400 &self,
401 stream_job_fragments: &StreamJobFragmentsToCreate,
402 streaming_job: &StreamingJob,
403 for_replace: bool,
404 ) -> MetaResult<()> {
405 self.prepare_streaming_job(
406 stream_job_fragments.stream_job_id(),
407 || stream_job_fragments.fragments.values(),
408 &stream_job_fragments.downstreams,
409 for_replace,
410 Some(streaming_job),
411 )
412 .await
413 }
414
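    /// Persists the fragments of a streaming job (either newly created or replacing an existing
    /// one) together with their downstream relations. For newly created jobs it also binds the
    /// state tables to their fragments and may notify the frontend about the creating objects so
    /// that they become visible before the job is finished.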
    #[await_tree::instrument(
        "prepare_streaming_job_for_{}",
        if for_replace { "replace" } else { "create" }
    )]
421 pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
422 &self,
423 job_id: JobId,
424 get_fragments: impl Fn() -> I + 'a,
425 downstreams: &FragmentDownstreamRelation,
426 for_replace: bool,
427 creating_streaming_job: Option<&'a StreamingJob>,
428 ) -> MetaResult<()> {
429 let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;
430
431 let inner = self.inner.write().await;
432
433 let need_notify = creating_streaming_job
434 .map(|job| job.should_notify_creating())
435 .unwrap_or(false);
436 let definition = creating_streaming_job.map(|job| job.definition());
437
438 let mut objects_to_notify = vec![];
439 let txn = inner.db.begin().await?;
440
441 ensure_job_not_canceled(job_id, &txn).await?;
443
444 for fragment in fragments {
445 let fragment_id = fragment.fragment_id;
446 let state_table_ids = fragment.state_table_ids.inner_ref().clone();
447
448 let fragment = fragment.into_active_model();
449 Fragment::insert(fragment).exec(&txn).await?;
450
451 if !for_replace {
454 let all_tables = StreamJobFragments::collect_tables(get_fragments());
455 for state_table_id in state_table_ids {
456 let table = all_tables
460 .get(&state_table_id)
461 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
462 assert_eq!(table.id, state_table_id);
463 assert_eq!(table.fragment_id, fragment_id);
464 let vnode_count = table.vnode_count();
465
466 table::ActiveModel {
467 table_id: Set(state_table_id as _),
468 fragment_id: Set(Some(fragment_id)),
469 vnode_count: Set(vnode_count as _),
470 ..Default::default()
471 }
472 .update(&txn)
473 .await?;
474
475 if need_notify {
476 let mut table = table.clone();
477 if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
479 table.definition = definition.clone().unwrap();
480 }
481 objects_to_notify.push(PbObject {
482 object_info: Some(PbObjectInfo::Table(table)),
483 });
484 }
485 }
486 }
487 }
488
489 if need_notify {
491 match creating_streaming_job.unwrap() {
492 StreamingJob::Table(Some(source), ..) => {
493 objects_to_notify.push(PbObject {
494 object_info: Some(PbObjectInfo::Source(source.clone())),
495 });
496 }
497 StreamingJob::Sink(sink) => {
498 objects_to_notify.push(PbObject {
499 object_info: Some(PbObjectInfo::Sink(sink.clone())),
500 });
501 }
502 StreamingJob::Index(index, _) => {
503 objects_to_notify.push(PbObject {
504 object_info: Some(PbObjectInfo::Index(index.clone())),
505 });
506 }
507 _ => {}
508 }
509 }
510
511 insert_fragment_relations(&txn, downstreams).await?;
512
513 if !for_replace {
514 if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
516 Table::update(table::ActiveModel {
517 table_id: Set(table.id),
518 dml_fragment_id: Set(table.dml_fragment_id),
519 ..Default::default()
520 })
521 .exec(&txn)
522 .await?;
523 }
524 }
525
526 txn.commit().await?;
527
528 if !objects_to_notify.is_empty() {
532 self.notify_frontend(
533 Operation::Add,
534 Info::ObjectGroup(PbObjectGroup {
535 objects: objects_to_notify,
536 }),
537 )
538 .await;
539 }
540
541 Ok(())
542 }
543
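    /// Builds the barrier [`Command::DropStreamingJobs`] used to cancel a creating streaming job,
    /// including the sink-to-target-fragment mapping if the job contains a sink into a table.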
544 pub async fn build_cancel_command(
547 &self,
548 table_fragments: &StreamJobFragments,
549 ) -> MetaResult<Command> {
550 let inner = self.inner.read().await;
551 let txn = inner.db.begin().await?;
552
553 let dropped_sink_fragment_with_target =
554 if let Some(sink_fragment) = table_fragments.sink_fragment() {
555 let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
556 let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
557 sink_target_fragment
558 .get(&sink_fragment_id)
559 .map(|target_fragments| {
560 let target_fragment_id = *target_fragments
561 .first()
562 .expect("sink should have at least one downstream fragment");
563 (sink_fragment_id, target_fragment_id)
564 })
565 } else {
566 None
567 };
568
569 Ok(Command::DropStreamingJobs {
570 streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
571 actors: table_fragments.actor_ids().collect(),
572 unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
573 unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
574 dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
575 .into_iter()
576 .map(|(sink, target)| (target as _, vec![sink as _]))
577 .collect(),
578 })
579 }
580
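    /// Aborts a streaming job that is still being created, cleaning up its catalogs and waking up
    /// any waiters with an error. Returns whether the job was actually aborted and the database
    /// it belongs to; failed background jobs that are still creating are left to recovery unless
    /// they belong to an iceberg table.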
581 #[await_tree::instrument]
585 pub async fn try_abort_creating_streaming_job(
586 &self,
587 mut job_id: JobId,
588 is_cancelled: bool,
589 ) -> MetaResult<(bool, Option<DatabaseId>)> {
590 let mut inner = self.inner.write().await;
591 let txn = inner.db.begin().await?;
592
593 let obj = Object::find_by_id(job_id).one(&txn).await?;
594 let Some(obj) = obj else {
595 tracing::warn!(
596 id = %job_id,
597 "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
598 );
599 return Ok((true, None));
600 };
601 let database_id = obj
602 .database_id
603 .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
604 let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
605
606 if !is_cancelled && let Some(streaming_job) = &streaming_job {
607 assert_ne!(streaming_job.job_status, JobStatus::Created);
608 if streaming_job.create_type == CreateType::Background
609 && streaming_job.job_status == JobStatus::Creating
610 {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // Jobs created for an iceberg table are aborted together with the table,
                    // so fall through to the abort path below instead of keeping them alive.
                } else {
616 tracing::warn!(
618 id = %job_id,
619 "streaming job is created in background and still in creating status"
620 );
621 return Ok((false, Some(database_id)));
622 }
623 }
624 }
625
626 let iceberg_table_id =
627 try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
628 if let Some(iceberg_table_id) = iceberg_table_id {
629 let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
632 Object::delete_many()
633 .filter(
634 object::Column::Oid
635 .eq(job_id)
636 .or(object::Column::Oid.is_in(internal_tables)),
637 )
638 .exec(&txn)
639 .await?;
640 job_id = iceberg_table_id.as_job_id();
641 };
642
643 let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
644
645 let mut objs = vec![];
647 let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;
648
649 let mut need_notify =
650 streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
651 if !need_notify {
652 if let Some(table) = &table_obj {
654 need_notify = table.table_type == TableType::MaterializedView;
655 }
656 }
657
658 if is_cancelled {
659 let dropped_tables = Table::find()
660 .find_also_related(Object)
661 .filter(
662 table::Column::TableId.is_in(
663 internal_table_ids
664 .iter()
665 .cloned()
666 .chain(table_obj.iter().map(|t| t.table_id as _)),
667 ),
668 )
669 .all(&txn)
670 .await?
671 .into_iter()
672 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
673 inner
674 .dropped_tables
675 .extend(dropped_tables.map(|t| (t.id, t)));
676 }
677
678 if need_notify {
679 let obj: Option<PartialObject> = Object::find_by_id(job_id)
680 .select_only()
681 .columns([
682 object::Column::Oid,
683 object::Column::ObjType,
684 object::Column::SchemaId,
685 object::Column::DatabaseId,
686 ])
687 .into_partial_model()
688 .one(&txn)
689 .await?;
690 let obj =
691 obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
692 objs.push(obj);
693 let internal_table_objs: Vec<PartialObject> = Object::find()
694 .select_only()
695 .columns([
696 object::Column::Oid,
697 object::Column::ObjType,
698 object::Column::SchemaId,
699 object::Column::DatabaseId,
700 ])
701 .join(JoinType::InnerJoin, object::Relation::Table.def())
702 .filter(table::Column::BelongsToJobId.eq(job_id))
703 .into_partial_model()
704 .all(&txn)
705 .await?;
706 objs.extend(internal_table_objs);
707 }
708
709 if table_obj.is_none()
711 && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
712 .select_only()
713 .column(sink::Column::TargetTable)
714 .into_tuple::<Option<TableId>>()
715 .one(&txn)
716 .await?
717 {
718 let tmp_id: Option<ObjectId> = ObjectDependency::find()
719 .select_only()
720 .column(object_dependency::Column::UsedBy)
721 .join(
722 JoinType::InnerJoin,
723 object_dependency::Relation::Object1.def(),
724 )
725 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
726 .filter(
727 object_dependency::Column::Oid
728 .eq(target_table_id)
729 .and(object::Column::ObjType.eq(ObjectType::Table))
730 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
731 )
732 .into_tuple()
733 .one(&txn)
734 .await?;
735 if let Some(tmp_id) = tmp_id {
736 tracing::warn!(
737 id = %tmp_id,
738 "aborting temp streaming job for sink into table"
739 );
740
741 Object::delete_by_id(tmp_id).exec(&txn).await?;
742 }
743 }
744
745 Object::delete_by_id(job_id).exec(&txn).await?;
746 if !internal_table_ids.is_empty() {
747 Object::delete_many()
748 .filter(object::Column::Oid.is_in(internal_table_ids))
749 .exec(&txn)
750 .await?;
751 }
752 if let Some(t) = &table_obj
753 && let Some(source_id) = t.optional_associated_source_id
754 {
755 Object::delete_by_id(source_id).exec(&txn).await?;
756 }
757
758 let err = if is_cancelled {
759 MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
760 } else {
761 MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
762 };
763 let abort_reason = format!("streaming job aborted {}", err.as_report());
764 for tx in inner
765 .creating_table_finish_notifier
766 .get_mut(&database_id)
767 .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
768 .into_iter()
769 .flatten()
770 .flatten()
771 {
772 let _ = tx.send(Err(abort_reason.clone()));
773 }
774 txn.commit().await?;
775
776 if !objs.is_empty() {
777 self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
780 .await;
781 }
782 Ok((true, Some(database_id)))
783 }
784
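    /// Marks a streaming job as `Creating` once its fragments have been collected by the barrier,
    /// and persists the new fragment relations and source split assignments in one transaction.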
785 #[await_tree::instrument]
786 pub async fn post_collect_job_fragments(
787 &self,
788 job_id: JobId,
789 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
790 new_sink_downstream: Option<FragmentDownstreamRelation>,
791 split_assignment: Option<&SplitAssignment>,
792 ) -> MetaResult<()> {
793 let inner = self.inner.write().await;
794 let txn = inner.db.begin().await?;
795
796 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
797
798 if let Some(new_downstream) = new_sink_downstream {
799 insert_fragment_relations(&txn, &new_downstream).await?;
800 }
801
802 streaming_job::ActiveModel {
804 job_id: Set(job_id),
805 job_status: Set(JobStatus::Creating),
806 ..Default::default()
807 }
808 .update(&txn)
809 .await?;
810
811 if let Some(split_assignment) = split_assignment {
812 let fragment_splits = split_assignment
813 .iter()
814 .map(|(fragment_id, splits)| {
815 (
816 *fragment_id as _,
817 splits.values().flatten().cloned().collect_vec(),
818 )
819 })
820 .collect();
821
822 self.update_fragment_splits(&txn, &fragment_splits).await?;
823 }
824
825 txn.commit().await?;
826
827 Ok(())
828 }
829
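    /// Creates a temporary streaming job object used to replace an existing job (e.g. for
    /// `ALTER TABLE`), reusing the original job's max parallelism and config override, and the
    /// original timezone unless a new stream context is given. Fails if the job is still being
    /// altered or referenced by other creating jobs. Returns the id of the temporary job.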
830 pub async fn create_job_catalog_for_replace(
831 &self,
832 streaming_job: &StreamingJob,
833 ctx: Option<&StreamContext>,
834 specified_parallelism: Option<&NonZeroUsize>,
835 expected_original_max_parallelism: Option<usize>,
836 ) -> MetaResult<JobId> {
837 let id = streaming_job.id();
838 let inner = self.inner.write().await;
839 let txn = inner.db.begin().await?;
840
841 streaming_job.verify_version_for_replace(&txn).await?;
843 let referring_cnt = ObjectDependency::find()
845 .join(
846 JoinType::InnerJoin,
847 object_dependency::Relation::Object1.def(),
848 )
849 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
850 .filter(
851 object_dependency::Column::Oid
852 .eq(id)
853 .and(object::Column::ObjType.eq(ObjectType::Table))
854 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
855 )
856 .count(&txn)
857 .await?;
858 if referring_cnt != 0 {
859 return Err(MetaError::permission_denied(
860 "job is being altered or referenced by some creating jobs",
861 ));
862 }
863
864 let (original_max_parallelism, original_timezone, original_config_override): (
866 i32,
867 Option<String>,
868 Option<String>,
869 ) = StreamingJobModel::find_by_id(id)
870 .select_only()
871 .column(streaming_job::Column::MaxParallelism)
872 .column(streaming_job::Column::Timezone)
873 .column(streaming_job::Column::ConfigOverride)
874 .into_tuple()
875 .one(&txn)
876 .await?
877 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
878
879 if let Some(max_parallelism) = expected_original_max_parallelism
880 && original_max_parallelism != max_parallelism as i32
881 {
882 bail!(
885 "cannot use a different max parallelism \
886 when replacing streaming job, \
887 original: {}, new: {}",
888 original_max_parallelism,
889 max_parallelism
890 );
891 }
892
893 let parallelism = match specified_parallelism {
894 None => StreamingParallelism::Adaptive,
895 Some(n) => StreamingParallelism::Fixed(n.get() as _),
896 };
897
898 let ctx = StreamContext {
899 timezone: ctx
900 .map(|ctx| ctx.timezone.clone())
901 .unwrap_or(original_timezone),
902 config_override: original_config_override.unwrap_or_default().into(),
905 };
906
907 let new_obj_id = Self::create_streaming_job_obj(
909 &txn,
910 streaming_job.object_type(),
911 streaming_job.owner() as _,
912 Some(streaming_job.database_id() as _),
913 Some(streaming_job.schema_id() as _),
914 streaming_job.create_type(),
915 ctx,
916 parallelism,
917 original_max_parallelism as _,
918 None,
919 )
920 .await?;
921
922 ObjectDependency::insert(object_dependency::ActiveModel {
924 oid: Set(id.as_object_id()),
925 used_by: Set(new_obj_id.as_object_id()),
926 ..Default::default()
927 })
928 .exec(&txn)
929 .await?;
930
931 txn.commit().await?;
932
933 Ok(new_obj_id)
934 }
935
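    /// Marks a streaming job as successfully created, notifies the frontend about the new
    /// catalogs and wakes up any waiters registered for the job. Jobs that back an iceberg table
    /// are skipped here and finished by a separate manual step.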
936 pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
938 let mut inner = self.inner.write().await;
939 let txn = inner.db.begin().await?;
940
941 if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
943 tracing::info!(
944 "streaming job {} is for iceberg table, wait for manual finish operation",
945 job_id
946 );
947 return Ok(());
948 }
949
950 let (notification_op, objects, updated_user_info) =
951 Self::finish_streaming_job_inner(&txn, job_id).await?;
952
953 txn.commit().await?;
954
955 let mut version = self
956 .notify_frontend(
957 notification_op,
958 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
959 )
960 .await;
961
962 if !updated_user_info.is_empty() {
964 version = self.notify_users_update(updated_user_info).await;
965 }
966
967 inner
968 .creating_table_finish_notifier
969 .values_mut()
970 .for_each(|creating_tables| {
971 if let Some(txs) = creating_tables.remove(&job_id) {
972 for tx in txs {
973 let _ = tx.send(Ok(version));
974 }
975 }
976 });
977
978 Ok(())
979 }
980
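    /// Transactional part of [`Self::finish_streaming_job`]: flips the job to
    /// `JobStatus::Created`, stamps the creation time, propagates the relevant privileges and
    /// collects the catalog objects (plus updated user infos) to broadcast to the frontend.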
981 pub async fn finish_streaming_job_inner(
983 txn: &DatabaseTransaction,
984 job_id: JobId,
985 ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
986 let job_type = Object::find_by_id(job_id)
987 .select_only()
988 .column(object::Column::ObjType)
989 .into_tuple()
990 .one(txn)
991 .await?
992 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
993
994 let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
995 .select_only()
996 .column(streaming_job::Column::CreateType)
997 .into_tuple()
998 .one(txn)
999 .await?
1000 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1001
1002 let res = Object::update_many()
1004 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
1005 .col_expr(
1006 object::Column::CreatedAtClusterVersion,
1007 current_cluster_version().into(),
1008 )
1009 .filter(object::Column::Oid.eq(job_id))
1010 .exec(txn)
1011 .await?;
1012 if res.rows_affected == 0 {
1013 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1014 }
1015
1016 let job = streaming_job::ActiveModel {
1018 job_id: Set(job_id),
1019 job_status: Set(JobStatus::Created),
1020 ..Default::default()
1021 };
1022 job.update(txn).await?;
1023
1024 let internal_table_objs = Table::find()
1026 .find_also_related(Object)
1027 .filter(table::Column::BelongsToJobId.eq(job_id))
1028 .all(txn)
1029 .await?;
1030 let mut objects = internal_table_objs
1031 .iter()
1032 .map(|(table, obj)| PbObject {
1033 object_info: Some(PbObjectInfo::Table(
1034 ObjectModel(table.clone(), obj.clone().unwrap()).into(),
1035 )),
1036 })
1037 .collect_vec();
1038 let mut notification_op = if create_type == CreateType::Background {
1039 NotificationOperation::Update
1040 } else {
1041 NotificationOperation::Add
1042 };
1043 let mut updated_user_info = vec![];
1044
1045 match job_type {
1046 ObjectType::Table => {
1047 let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1048 .find_also_related(Object)
1049 .one(txn)
1050 .await?
1051 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1052 if table.table_type == TableType::MaterializedView {
1053 notification_op = NotificationOperation::Update;
1054 }
1055
1056 if let Some(source_id) = table.optional_associated_source_id {
1057 let (src, obj) = Source::find_by_id(source_id)
1058 .find_also_related(Object)
1059 .one(txn)
1060 .await?
1061 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1062 objects.push(PbObject {
1063 object_info: Some(PbObjectInfo::Source(
1064 ObjectModel(src, obj.unwrap()).into(),
1065 )),
1066 });
1067 }
1068 objects.push(PbObject {
1069 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1070 });
1071 }
1072 ObjectType::Sink => {
1073 let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1074 .find_also_related(Object)
1075 .one(txn)
1076 .await?
1077 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1078 objects.push(PbObject {
1079 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
1080 });
1081 }
1082 ObjectType::Index => {
1083 let (index, obj) = Index::find_by_id(job_id.as_index_id())
1084 .find_also_related(Object)
1085 .one(txn)
1086 .await?
1087 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1088 {
1089 let (table, obj) = Table::find_by_id(index.index_table_id)
1090 .find_also_related(Object)
1091 .one(txn)
1092 .await?
1093 .ok_or_else(|| {
1094 MetaError::catalog_id_not_found("table", index.index_table_id)
1095 })?;
1096 objects.push(PbObject {
1097 object_info: Some(PbObjectInfo::Table(
1098 ObjectModel(table, obj.unwrap()).into(),
1099 )),
1100 });
1101 }
1102
1103 let primary_table_privileges = UserPrivilege::find()
1106 .filter(
1107 user_privilege::Column::Oid
1108 .eq(index.primary_table_id)
1109 .and(user_privilege::Column::Action.eq(Action::Select)),
1110 )
1111 .all(txn)
1112 .await?;
1113 if !primary_table_privileges.is_empty() {
1114 let index_state_table_ids: Vec<TableId> = Table::find()
1115 .select_only()
1116 .column(table::Column::TableId)
1117 .filter(
1118 table::Column::BelongsToJobId
1119 .eq(job_id)
1120 .or(table::Column::TableId.eq(index.index_table_id)),
1121 )
1122 .into_tuple()
1123 .all(txn)
1124 .await?;
1125 let mut new_privileges = vec![];
1126 for privilege in &primary_table_privileges {
1127 for state_table_id in &index_state_table_ids {
1128 new_privileges.push(user_privilege::ActiveModel {
1129 id: Default::default(),
1130 oid: Set(state_table_id.as_object_id()),
1131 user_id: Set(privilege.user_id),
1132 action: Set(Action::Select),
1133 dependent_id: Set(privilege.dependent_id),
1134 granted_by: Set(privilege.granted_by),
1135 with_grant_option: Set(privilege.with_grant_option),
1136 });
1137 }
1138 }
1139 UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1140
1141 updated_user_info = list_user_info_by_ids(
1142 primary_table_privileges.into_iter().map(|p| p.user_id),
1143 txn,
1144 )
1145 .await?;
1146 }
1147
1148 objects.push(PbObject {
1149 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1150 });
1151 }
1152 ObjectType::Source => {
1153 let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1154 .find_also_related(Object)
1155 .one(txn)
1156 .await?
1157 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1158 objects.push(PbObject {
1159 object_info: Some(PbObjectInfo::Source(
1160 ObjectModel(source, obj.unwrap()).into(),
1161 )),
1162 });
1163 }
1164 _ => unreachable!("invalid job type: {:?}", job_type),
1165 }
1166
1167 if job_type != ObjectType::Index {
1168 updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1169 }
1170
1171 Ok((notification_op, objects, updated_user_info))
1172 }
1173
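    /// Finalizes a replace-job operation (e.g. `ALTER TABLE` or schema change): swaps the
    /// fragments of the temporary job into the original one and notifies the frontend about the
    /// updated (and, if a table connector was dropped, deleted) catalogs.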
1174 pub async fn finish_replace_streaming_job(
1175 &self,
1176 tmp_id: JobId,
1177 streaming_job: StreamingJob,
1178 replace_upstream: FragmentReplaceUpstream,
1179 sink_into_table_context: SinkIntoTableContext,
1180 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1181 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1182 ) -> MetaResult<NotificationVersion> {
1183 let inner = self.inner.write().await;
1184 let txn = inner.db.begin().await?;
1185
1186 let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1187 tmp_id,
1188 replace_upstream,
1189 sink_into_table_context,
1190 &txn,
1191 streaming_job,
1192 drop_table_connector_ctx,
1193 auto_refresh_schema_sinks,
1194 )
1195 .await?;
1196
1197 txn.commit().await?;
1198
1199 let mut version = self
1200 .notify_frontend(
1201 NotificationOperation::Update,
1202 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1203 )
1204 .await;
1205
1206 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1207 self.notify_users_update(user_infos).await;
1208 version = self
1209 .notify_frontend(
1210 NotificationOperation::Delete,
1211 build_object_group_for_delete(to_drop_objects),
1212 )
1213 .await;
1214 }
1215
1216 Ok(version)
1217 }
1218
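    /// Transactional body of [`Self::finish_replace_streaming_job`]. Updates the job catalog,
    /// rebinds state tables to the new fragments, rewrites downstream `Merge` upstreams and index
    /// items, and returns the objects to broadcast as updates together with any objects to drop.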
1219 pub async fn finish_replace_streaming_job_inner(
1220 tmp_id: JobId,
1221 replace_upstream: FragmentReplaceUpstream,
1222 SinkIntoTableContext {
1223 updated_sink_catalogs,
1224 }: SinkIntoTableContext,
1225 txn: &DatabaseTransaction,
1226 streaming_job: StreamingJob,
1227 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1228 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1229 ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1230 let original_job_id = streaming_job.id();
1231 let job_type = streaming_job.job_type();
1232
1233 let mut index_item_rewriter = None;
1234
1235 match streaming_job {
1237 StreamingJob::Table(_source, table, _table_job_type) => {
1238 let original_column_catalogs =
1241 get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1242
1243 index_item_rewriter = Some({
1244 let original_columns = original_column_catalogs
1245 .to_protobuf()
1246 .into_iter()
1247 .map(|c| c.column_desc.unwrap())
1248 .collect_vec();
1249 let new_columns = table
1250 .columns
1251 .iter()
1252 .map(|c| c.column_desc.clone().unwrap())
1253 .collect_vec();
1254
1255 IndexItemRewriter {
1256 original_columns,
1257 new_columns,
1258 }
1259 });
1260
1261 for sink_id in updated_sink_catalogs {
1263 sink::ActiveModel {
1264 sink_id: Set(sink_id as _),
1265 original_target_columns: Set(Some(original_column_catalogs.clone())),
1266 ..Default::default()
1267 }
1268 .update(txn)
1269 .await?;
1270 }
1271 let mut table = table::ActiveModel::from(table);
1273 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1274 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1275 {
1276 table.optional_associated_source_id = Set(None);
1278 }
1279
1280 table.update(txn).await?;
1281 }
1282 StreamingJob::Source(source) => {
1283 let source = source::ActiveModel::from(source);
1285 source.update(txn).await?;
1286 }
1287 StreamingJob::MaterializedView(table) => {
1288 let table = table::ActiveModel::from(table);
1290 table.update(txn).await?;
1291 }
1292 _ => unreachable!(
1293 "invalid streaming job type: {:?}",
1294 streaming_job.job_type_str()
1295 ),
1296 }
1297
1298 async fn finish_fragments(
1299 txn: &DatabaseTransaction,
1300 tmp_id: JobId,
1301 original_job_id: JobId,
1302 replace_upstream: FragmentReplaceUpstream,
1303 ) -> MetaResult<()> {
1304 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1308 .select_only()
1309 .columns([
1310 fragment::Column::FragmentId,
1311 fragment::Column::StateTableIds,
1312 ])
1313 .filter(fragment::Column::JobId.eq(tmp_id))
1314 .into_tuple()
1315 .all(txn)
1316 .await?;
1317 for (fragment_id, state_table_ids) in fragment_info {
1318 for state_table_id in state_table_ids.into_inner() {
1319 let state_table_id = TableId::new(state_table_id as _);
1320 table::ActiveModel {
1321 table_id: Set(state_table_id),
1322 fragment_id: Set(Some(fragment_id)),
1323 ..Default::default()
1325 }
1326 .update(txn)
1327 .await?;
1328 }
1329 }
1330
1331 Fragment::delete_many()
1333 .filter(fragment::Column::JobId.eq(original_job_id))
1334 .exec(txn)
1335 .await?;
1336 Fragment::update_many()
1337 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1338 .filter(fragment::Column::JobId.eq(tmp_id))
1339 .exec(txn)
1340 .await?;
1341
1342 for (fragment_id, fragment_replace_map) in replace_upstream {
1345 let (fragment_id, mut stream_node) =
1346 Fragment::find_by_id(fragment_id as FragmentId)
1347 .select_only()
1348 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1349 .into_tuple::<(FragmentId, StreamNode)>()
1350 .one(txn)
1351 .await?
1352 .map(|(id, node)| (id, node.to_protobuf()))
1353 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1354
1355 visit_stream_node_mut(&mut stream_node, |body| {
1356 if let PbNodeBody::Merge(m) = body
1357 && let Some(new_fragment_id) =
1358 fragment_replace_map.get(&m.upstream_fragment_id)
1359 {
1360 m.upstream_fragment_id = *new_fragment_id;
1361 }
1362 });
1363 fragment::ActiveModel {
1364 fragment_id: Set(fragment_id),
1365 stream_node: Set(StreamNode::from(&stream_node)),
1366 ..Default::default()
1367 }
1368 .update(txn)
1369 .await?;
1370 }
1371
1372 Object::delete_by_id(tmp_id).exec(txn).await?;
1374
1375 Ok(())
1376 }
1377
1378 finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1379
1380 let mut objects = vec![];
1382 match job_type {
1383 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1384 let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1385 .find_also_related(Object)
1386 .one(txn)
1387 .await?
1388 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1389 objects.push(PbObject {
1390 object_info: Some(PbObjectInfo::Table(
1391 ObjectModel(table, table_obj.unwrap()).into(),
1392 )),
1393 })
1394 }
1395 StreamingJobType::Source => {
1396 let (source, source_obj) =
1397 Source::find_by_id(original_job_id.as_shared_source_id())
1398 .find_also_related(Object)
1399 .one(txn)
1400 .await?
1401 .ok_or_else(|| {
1402 MetaError::catalog_id_not_found("object", original_job_id)
1403 })?;
1404 objects.push(PbObject {
1405 object_info: Some(PbObjectInfo::Source(
1406 ObjectModel(source, source_obj.unwrap()).into(),
1407 )),
1408 })
1409 }
1410 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1411 }
1412
1413 if let Some(expr_rewriter) = index_item_rewriter {
1414 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1415 .select_only()
1416 .columns([index::Column::IndexId, index::Column::IndexItems])
1417 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1418 .into_tuple()
1419 .all(txn)
1420 .await?;
1421 for (index_id, nodes) in index_items {
1422 let mut pb_nodes = nodes.to_protobuf();
1423 pb_nodes
1424 .iter_mut()
1425 .for_each(|x| expr_rewriter.rewrite_expr(x));
1426 let index = index::ActiveModel {
1427 index_id: Set(index_id),
1428 index_items: Set(pb_nodes.into()),
1429 ..Default::default()
1430 }
1431 .update(txn)
1432 .await?;
1433 let index_obj = index
1434 .find_related(Object)
1435 .one(txn)
1436 .await?
1437 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1438 objects.push(PbObject {
1439 object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1440 });
1441 }
1442 }
1443
1444 if let Some(sinks) = auto_refresh_schema_sinks {
1445 for finish_sink_context in sinks {
1446 finish_fragments(
1447 txn,
1448 finish_sink_context.tmp_sink_id.as_job_id(),
1449 finish_sink_context.original_sink_id.as_job_id(),
1450 Default::default(),
1451 )
1452 .await?;
1453 let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1454 .find_also_related(Object)
1455 .one(txn)
1456 .await?
1457 .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1458 let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1459 Sink::update(sink::ActiveModel {
1460 sink_id: Set(finish_sink_context.original_sink_id),
1461 columns: Set(columns.clone()),
1462 ..Default::default()
1463 })
1464 .exec(txn)
1465 .await?;
1466 sink.columns = columns;
1467 objects.push(PbObject {
1468 object_info: Some(PbObjectInfo::Sink(
1469 ObjectModel(sink, sink_obj.unwrap()).into(),
1470 )),
1471 });
1472 if let Some((log_store_table_id, new_log_store_table_columns)) =
1473 finish_sink_context.new_log_store_table
1474 {
1475 let new_log_store_table_columns: ColumnCatalogArray =
1476 new_log_store_table_columns.into();
1477 let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1478 .find_also_related(Object)
1479 .one(txn)
1480 .await?
1481 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1482 Table::update(table::ActiveModel {
1483 table_id: Set(log_store_table_id),
1484 columns: Set(new_log_store_table_columns.clone()),
1485 ..Default::default()
1486 })
1487 .exec(txn)
1488 .await?;
1489 table.columns = new_log_store_table_columns;
1490 objects.push(PbObject {
1491 object_info: Some(PbObjectInfo::Table(
1492 ObjectModel(table, table_obj.unwrap()).into(),
1493 )),
1494 });
1495 }
1496 }
1497 }
1498
1499 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1500 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1501 notification_objs =
1502 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1503 }
1504
1505 Ok((objects, notification_objs))
1506 }
1507
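    /// Drops the temporary job object (and any temporary sink objects) created for a replace-job
    /// operation when that operation is aborted.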
1508 pub async fn try_abort_replacing_streaming_job(
1510 &self,
1511 tmp_job_id: JobId,
1512 tmp_sink_ids: Option<Vec<ObjectId>>,
1513 ) -> MetaResult<()> {
1514 let inner = self.inner.write().await;
1515 let txn = inner.db.begin().await?;
1516 Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1517 if let Some(tmp_sink_ids) = tmp_sink_ids {
1518 for tmp_sink_id in tmp_sink_ids {
1519 Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1520 }
1521 }
1522 txn.commit().await?;
1523 Ok(())
1524 }
1525
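    /// Applies a new rate limit to a source catalog and patches it into every fragment that reads
    /// from the source (including fs-fetch fragments), then notifies the frontend. Returns the
    /// affected fragments together with their running actors.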
1526 pub async fn update_source_rate_limit_by_source_id(
1529 &self,
1530 source_id: SourceId,
1531 rate_limit: Option<u32>,
1532 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1533 let inner = self.inner.read().await;
1534 let txn = inner.db.begin().await?;
1535
1536 {
1537 let active_source = source::ActiveModel {
1538 source_id: Set(source_id),
1539 rate_limit: Set(rate_limit.map(|v| v as i32)),
1540 ..Default::default()
1541 };
1542 active_source.update(&txn).await?;
1543 }
1544
1545 let (source, obj) = Source::find_by_id(source_id)
1546 .find_also_related(Object)
1547 .one(&txn)
1548 .await?
1549 .ok_or_else(|| {
1550 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1551 })?;
1552
1553 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1554 let streaming_job_ids: Vec<JobId> =
1555 if let Some(table_id) = source.optional_associated_table_id {
1556 vec![table_id.as_job_id()]
1557 } else if let Some(source_info) = &source.source_info
1558 && source_info.to_protobuf().is_shared()
1559 {
1560 vec![source_id.as_share_source_job_id()]
1561 } else {
1562 ObjectDependency::find()
1563 .select_only()
1564 .column(object_dependency::Column::UsedBy)
1565 .filter(object_dependency::Column::Oid.eq(source_id))
1566 .into_tuple()
1567 .all(&txn)
1568 .await?
1569 };
1570
1571 if streaming_job_ids.is_empty() {
1572 return Err(MetaError::invalid_parameter(format!(
1573 "source id {source_id} not used by any streaming job"
1574 )));
1575 }
1576
1577 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1578 .select_only()
1579 .columns([
1580 fragment::Column::FragmentId,
1581 fragment::Column::FragmentTypeMask,
1582 fragment::Column::StreamNode,
1583 ])
1584 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1585 .into_tuple()
1586 .all(&txn)
1587 .await?;
1588 let mut fragments = fragments
1589 .into_iter()
1590 .map(|(id, mask, stream_node)| {
1591 (
1592 id,
1593 FragmentTypeMask::from(mask as u32),
1594 stream_node.to_protobuf(),
1595 )
1596 })
1597 .collect_vec();
1598
1599 fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1600 let mut found = false;
1601 if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1602 visit_stream_node_mut(stream_node, |node| {
1603 if let PbNodeBody::Source(node) = node
1604 && let Some(node_inner) = &mut node.source_inner
1605 && node_inner.source_id == source_id
1606 {
1607 node_inner.rate_limit = rate_limit;
1608 found = true;
1609 }
1610 });
1611 }
1612 if is_fs_source {
1613 visit_stream_node_mut(stream_node, |node| {
1616 if let PbNodeBody::StreamFsFetch(node) = node {
1617 fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1618 if let Some(node_inner) = &mut node.node_inner
1619 && node_inner.source_id == source_id
1620 {
1621 node_inner.rate_limit = rate_limit;
1622 found = true;
1623 }
1624 }
1625 });
1626 }
1627 found
1628 });
1629
1630 assert!(
1631 !fragments.is_empty(),
1632 "source id should be used by at least one fragment"
1633 );
1634 let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1635
1636 for (id, fragment_type_mask, stream_node) in fragments {
1637 fragment::ActiveModel {
1638 fragment_id: Set(id),
1639 fragment_type_mask: Set(fragment_type_mask.into()),
1640 stream_node: Set(StreamNode::from(&stream_node)),
1641 ..Default::default()
1642 }
1643 .update(&txn)
1644 .await?;
1645 }
1646
1647 txn.commit().await?;
1648
1649 let fragment_actors = self.get_fragment_actors_from_running_info(fragment_ids.into_iter());
1650
1651 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1652 let _version = self
1653 .notify_frontend(
1654 NotificationOperation::Update,
1655 NotificationInfo::ObjectGroup(PbObjectGroup {
1656 objects: vec![PbObject {
1657 object_info: Some(relation_info),
1658 }],
1659 }),
1660 )
1661 .await;
1662
1663 Ok(fragment_actors)
1664 }
1665
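    /// Looks up the running actors of the given fragments from the shared in-memory actor info
    /// rather than from the metadata store.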
1666 fn get_fragment_actors_from_running_info(
1667 &self,
1668 fragment_ids: impl Iterator<Item = FragmentId>,
1669 ) -> HashMap<FragmentId, Vec<ActorId>> {
1670 let mut fragment_actors: HashMap<FragmentId, Vec<ActorId>> = HashMap::new();
1671
1672 let info = self.env.shared_actor_infos().read_guard();
1673
1674 for fragment_id in fragment_ids {
1675 let SharedFragmentInfo { actors, .. } = info.get_fragment(fragment_id).unwrap();
1676 fragment_actors
1677 .entry(fragment_id as _)
1678 .or_default()
1679 .extend(actors.keys().copied());
1680 }
1681
1682 fragment_actors
1683 }
1684
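    /// Applies `fragments_mutation_fn` to every fragment of a streaming job and persists the
    /// fragments whose stream node was actually changed. Returns the affected fragments with
    /// their running actors, or an error containing `err_msg` if nothing matched.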
1685 pub async fn mutate_fragments_by_job_id(
1688 &self,
1689 job_id: JobId,
1690 mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1692 err_msg: &'static str,
1694 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1695 let inner = self.inner.read().await;
1696 let txn = inner.db.begin().await?;
1697
1698 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1699 .select_only()
1700 .columns([
1701 fragment::Column::FragmentId,
1702 fragment::Column::FragmentTypeMask,
1703 fragment::Column::StreamNode,
1704 ])
1705 .filter(fragment::Column::JobId.eq(job_id))
1706 .into_tuple()
1707 .all(&txn)
1708 .await?;
1709 let mut fragments = fragments
1710 .into_iter()
1711 .map(|(id, mask, stream_node)| {
1712 (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1713 })
1714 .collect_vec();
1715
1716 let fragments = fragments
1717 .iter_mut()
1718 .map(|(_, fragment_type_mask, stream_node)| {
1719 fragments_mutation_fn(*fragment_type_mask, stream_node)
1720 })
1721 .collect::<MetaResult<Vec<bool>>>()?
1722 .into_iter()
1723 .zip_eq_debug(std::mem::take(&mut fragments))
1724 .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1725 .collect::<Vec<_>>();
1726
1727 if fragments.is_empty() {
1728 return Err(MetaError::invalid_parameter(format!(
1729 "job id {job_id}: {}",
1730 err_msg
1731 )));
1732 }
1733
1734 let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1735 for (id, _, stream_node) in fragments {
1736 fragment::ActiveModel {
1737 fragment_id: Set(id),
1738 stream_node: Set(StreamNode::from(&stream_node)),
1739 ..Default::default()
1740 }
1741 .update(&txn)
1742 .await?;
1743 }
1744
1745 txn.commit().await?;
1746
1747 let fragment_actors =
1748 self.get_fragment_actors_from_running_info(fragment_ids.iter().copied());
1749
1750 Ok(fragment_actors)
1751 }
1752
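    /// Single-fragment counterpart of [`Self::mutate_fragments_by_job_id`]: applies the mutation
    /// to one fragment and persists it if the closure reports a change.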
1753 async fn mutate_fragment_by_fragment_id(
1754 &self,
1755 fragment_id: FragmentId,
1756 mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1757 err_msg: &'static str,
1758 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1759 let inner = self.inner.read().await;
1760 let txn = inner.db.begin().await?;
1761
1762 let (fragment_type_mask, stream_node): (i32, StreamNode) =
1763 Fragment::find_by_id(fragment_id)
1764 .select_only()
1765 .columns([
1766 fragment::Column::FragmentTypeMask,
1767 fragment::Column::StreamNode,
1768 ])
1769 .into_tuple()
1770 .one(&txn)
1771 .await?
1772 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1773 let mut pb_stream_node = stream_node.to_protobuf();
1774 let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1775
1776 if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1777 return Err(MetaError::invalid_parameter(format!(
1778 "fragment id {fragment_id}: {}",
1779 err_msg
1780 )));
1781 }
1782
1783 fragment::ActiveModel {
1784 fragment_id: Set(fragment_id),
1785 stream_node: Set(stream_node),
1786 ..Default::default()
1787 }
1788 .update(&txn)
1789 .await?;
1790
1791 let fragment_actors =
1792 self.get_fragment_actors_from_running_info(std::iter::once(fragment_id));
1793
1794 txn.commit().await?;
1795
1796 Ok(fragment_actors)
1797 }
1798
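    /// Updates the backfill rate limit of every backfill-capable node (stream scan, CDC scan,
    /// source backfill and sink) in the given job's fragments.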
1799 pub async fn update_backfill_rate_limit_by_job_id(
1802 &self,
1803 job_id: JobId,
1804 rate_limit: Option<u32>,
1805 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1806 let update_backfill_rate_limit =
1807 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1808 let mut found = false;
1809 if fragment_type_mask
1810 .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1811 {
1812 visit_stream_node_mut(stream_node, |node| match node {
1813 PbNodeBody::StreamCdcScan(node) => {
1814 node.rate_limit = rate_limit;
1815 found = true;
1816 }
1817 PbNodeBody::StreamScan(node) => {
1818 node.rate_limit = rate_limit;
1819 found = true;
1820 }
1821 PbNodeBody::SourceBackfill(node) => {
1822 node.rate_limit = rate_limit;
1823 found = true;
1824 }
1825 PbNodeBody::Sink(node) => {
1826 node.rate_limit = rate_limit;
1827 found = true;
1828 }
1829 _ => {}
1830 });
1831 }
1832 Ok(found)
1833 };
1834
1835 self.mutate_fragments_by_job_id(
1836 job_id,
1837 update_backfill_rate_limit,
1838 "stream scan node or source node not found",
1839 )
1840 .await
1841 }
1842
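    /// Updates the rate limit of a sink job. Only supported when the sink uses a kv log store,
    /// i.e. sink decoupling is enabled.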
1843 pub async fn update_sink_rate_limit_by_job_id(
1846 &self,
1847 sink_id: SinkId,
1848 rate_limit: Option<u32>,
1849 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1850 let update_sink_rate_limit =
1851 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1852 let mut found = Ok(false);
1853 if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1854 visit_stream_node_mut(stream_node, |node| {
1855 if let PbNodeBody::Sink(node) = node {
1856 if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1857 found = Err(MetaError::invalid_parameter(
1858 "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1859 ));
1860 return;
1861 }
1862 node.rate_limit = rate_limit;
1863 found = Ok(true);
1864 }
1865 });
1866 }
1867 found
1868 };
1869
1870 self.mutate_fragments_by_job_id(
1871 sink_id.as_job_id(),
1872 update_sink_rate_limit,
1873 "sink node not found",
1874 )
1875 .await
1876 }
1877
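    /// Updates the rate limit of the DML nodes in the given table job's fragments.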
1878 pub async fn update_dml_rate_limit_by_job_id(
1879 &self,
1880 job_id: JobId,
1881 rate_limit: Option<u32>,
1882 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1883 let update_dml_rate_limit =
1884 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1885 let mut found = false;
1886 if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1887 visit_stream_node_mut(stream_node, |node| {
1888 if let PbNodeBody::Dml(node) = node {
1889 node.rate_limit = rate_limit;
1890 found = true;
1891 }
1892 });
1893 }
1894 Ok(found)
1895 };
1896
1897 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1898 .await
1899 }
1900
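    /// Alters the connector properties and secret refs of a source on the fly: validates that the
    /// changed fields may be altered, rewrites the stored `CREATE SOURCE`/`CREATE TABLE`
    /// definition, updates secret dependencies and pushes the new properties into the source
    /// fragments. Returns the resolved options after the update.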
1901 pub async fn update_source_props_by_source_id(
1902 &self,
1903 source_id: SourceId,
1904 alter_props: BTreeMap<String, String>,
1905 alter_secret_refs: BTreeMap<String, PbSecretRef>,
1906 ) -> MetaResult<WithOptionsSecResolved> {
1907 let inner = self.inner.read().await;
1908 let txn = inner.db.begin().await?;
1909
1910 let (source, _obj) = Source::find_by_id(source_id)
1911 .find_also_related(Object)
1912 .one(&txn)
1913 .await?
1914 .ok_or_else(|| {
1915 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1916 })?;
1917 let connector = source.with_properties.0.get_connector().unwrap();
1918 let is_shared_source = source.is_shared();
1919
1920 let mut dep_source_job_ids: Vec<JobId> = Vec::new();
1921 if !is_shared_source {
1922 dep_source_job_ids = ObjectDependency::find()
1924 .select_only()
1925 .column(object_dependency::Column::UsedBy)
1926 .filter(object_dependency::Column::Oid.eq(source_id))
1927 .into_tuple()
1928 .all(&txn)
1929 .await?;
1930 }
1931
1932 let prop_keys: Vec<String> = alter_props
1934 .keys()
1935 .chain(alter_secret_refs.keys())
1936 .cloned()
1937 .collect();
1938 risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
1939 &connector, &prop_keys,
1940 )?;
1941
1942 let mut options_with_secret = WithOptionsSecResolved::new(
1943 source.with_properties.0.clone(),
1944 source
1945 .secret_ref
1946 .map(|secret_ref| secret_ref.to_protobuf())
1947 .unwrap_or_default(),
1948 );
1949 let (to_add_secret_dep, to_remove_secret_dep) =
1950 options_with_secret.handle_update(alter_props, alter_secret_refs)?;
1951
1952 tracing::info!(
1953 "applying new properties to source: source_id={}, options_with_secret={:?}",
1954 source_id,
1955 options_with_secret
1956 );
1957 let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
1959 let mut associate_table_id = None;
1962
1963 let mut preferred_id = source_id.as_object_id();
1967 let rewrite_sql = {
1968 let definition = source.definition.clone();
1969
1970 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
1971 .map_err(|e| {
1972 MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
1973 anyhow!(e).context("Failed to parse source definition SQL"),
1974 )))
1975 })?
1976 .try_into()
1977 .unwrap();
1978
1979 async fn format_with_option_secret_resolved(
1993 txn: &DatabaseTransaction,
1994 options_with_secret: &WithOptionsSecResolved,
1995 ) -> MetaResult<Vec<SqlOption>> {
1996 let mut options = Vec::new();
1997 for (k, v) in options_with_secret.as_plaintext() {
1998 let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
1999 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2000 options.push(sql_option);
2001 }
2002 for (k, v) in options_with_secret.as_secret() {
2003 if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
2004 let sql_option =
2005 SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
2006 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2007 options.push(sql_option);
2008 } else {
2009 return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
2010 }
2011 }
2012 Ok(options)
2013 }
2014
2015 match &mut stmt {
2016 Statement::CreateSource { stmt } => {
2017 stmt.with_properties.0 =
2018 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2019 }
2020 Statement::CreateTable { with_options, .. } => {
2021 *with_options =
2022 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2023 associate_table_id = source.optional_associated_table_id;
2024 preferred_id = associate_table_id.unwrap().as_object_id();
2025 }
2026 _ => unreachable!(),
2027 }
2028
2029 stmt.to_string()
2030 };
2031
2032 {
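// Sync secret dependencies: insert edges for newly referenced secrets and drop
// edges for secrets that are no longer referenced.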
2033 if !to_add_secret_dep.is_empty() {
2035 ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2036 object_dependency::ActiveModel {
2037 oid: Set(secret_id.into()),
2038 used_by: Set(preferred_id),
2039 ..Default::default()
2040 }
2041 }))
2042 .exec(&txn)
2043 .await?;
2044 }
2045 if !to_remove_secret_dep.is_empty() {
2046 let _ = ObjectDependency::delete_many()
2048 .filter(
2049 object_dependency::Column::Oid
2050 .is_in(to_remove_secret_dep)
2051 .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2052 )
2053 .exec(&txn)
2054 .await?;
2055 }
2056 }
2057
2058 let active_source_model = source::ActiveModel {
2059 source_id: Set(source_id),
2060 definition: Set(rewrite_sql.clone()),
2061 with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2062 secret_ref: Set((!options_with_secret.as_secret().is_empty())
2063 .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2064 ..Default::default()
2065 };
2066 active_source_model.update(&txn).await?;
2067
2068 if let Some(associate_table_id) = associate_table_id {
2069 let active_table_model = table::ActiveModel {
2071 table_id: Set(associate_table_id),
2072 definition: Set(rewrite_sql),
2073 ..Default::default()
2074 };
2075 active_table_model.update(&txn).await?;
2076 }
2077
2078 let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2079 associate_table_id.as_job_id()
2081 } else {
2082 source_id.as_share_source_job_id()
2083 }]
2084 .into_iter()
2085 .chain(dep_source_job_ids.into_iter())
2086 .collect_vec();
2087
2088 update_connector_props_fragments(
2090 &txn,
2091 to_check_job_ids,
2092 FragmentTypeFlag::Source,
2093 |node, found| {
2094 if let PbNodeBody::Source(node) = node
2095 && let Some(source_inner) = &mut node.source_inner
2096 {
2097 source_inner.with_properties = options_with_secret.as_plaintext().clone();
2098 source_inner.secret_refs = options_with_secret.as_secret().clone();
2099 *found = true;
2100 }
2101 },
2102 is_shared_source,
2103 )
2104 .await?;
2105
2106 let mut to_update_objs = Vec::with_capacity(2);
2107 let (source, obj) = Source::find_by_id(source_id)
2108 .find_also_related(Object)
2109 .one(&txn)
2110 .await?
2111 .ok_or_else(|| {
2112 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2113 })?;
2114 to_update_objs.push(PbObject {
2115 object_info: Some(PbObjectInfo::Source(
2116 ObjectModel(source, obj.unwrap()).into(),
2117 )),
2118 });
2119
2120 if let Some(associate_table_id) = associate_table_id {
2121 let (table, obj) = Table::find_by_id(associate_table_id)
2122 .find_also_related(Object)
2123 .one(&txn)
2124 .await?
2125 .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2126 to_update_objs.push(PbObject {
2127 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
2128 });
2129 }
2130
2131 txn.commit().await?;
2132
2133 self.notify_frontend(
2134 NotificationOperation::Update,
2135 NotificationInfo::ObjectGroup(PbObjectGroup {
2136 objects: to_update_objs,
2137 }),
2138 )
2139 .await;
2140
2141 Ok(options_with_secret)
2142 }
2143
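/// Alter connector properties of a sink on the fly: validate that the changed fields
/// are alterable for this connector, rewrite the stored `CREATE SINK` definition,
/// persist the merged properties, patch the sink fragments, and notify the frontend.
/// Returns the applied property overrides.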
2144 pub async fn update_sink_props_by_sink_id(
2145 &self,
2146 sink_id: SinkId,
2147 props: BTreeMap<String, String>,
2148 ) -> MetaResult<HashMap<String, String>> {
2149 let inner = self.inner.read().await;
2150 let txn = inner.db.begin().await?;
2151
2152 let (sink, _obj) = Sink::find_by_id(sink_id)
2153 .find_also_related(Object)
2154 .one(&txn)
2155 .await?
2156 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2157 validate_sink_props(&sink, &props)?;
2158 let definition = sink.definition.clone();
2159 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2160 .map_err(|e| SinkError::Config(anyhow!(e)))?
2161 .try_into()
2162 .unwrap();
2163 if let Statement::CreateSink { stmt } = &mut stmt {
2164 update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2165 } else {
2166 panic!("definition is not a create sink statement")
2167 }
2168 let mut new_config = sink.properties.clone().into_inner();
2169 new_config.extend(props.clone());
2170
2171 let definition = stmt.to_string();
2172 let active_sink = sink::ActiveModel {
2173 sink_id: Set(sink_id),
2174 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2175 definition: Set(definition),
2176 ..Default::default()
2177 };
2178 active_sink.update(&txn).await?;
2179
2180 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2181 let (sink, obj) = Sink::find_by_id(sink_id)
2182 .find_also_related(Object)
2183 .one(&txn)
2184 .await?
2185 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2186 txn.commit().await?;
2187 let relation_infos = vec![PbObject {
2188 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2189 }];
2190
2191 let _version = self
2192 .notify_frontend(
2193 NotificationOperation::Update,
2194 NotificationInfo::ObjectGroup(PbObjectGroup {
2195 objects: relation_infos,
2196 }),
2197 )
2198 .await;
2199
2200 Ok(props.into_iter().collect())
2201 }
2202
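/// Alter connector properties of a table with the iceberg engine. Such a table is
/// backed by an internal sink/source pair, so the rewritten definition and properties
/// are applied to the sink, source, and table catalogs, and the sink fragments are
/// patched accordingly. Returns the applied overrides together with the backing sink id.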
2203 pub async fn update_iceberg_table_props_by_table_id(
2204 &self,
2205 table_id: TableId,
2206 props: BTreeMap<String, String>,
2207 alter_iceberg_table_props: Option<
2208 risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2209 >,
2210 ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2211 let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props
2212 .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2213 let inner = self.inner.read().await;
2214 let txn = inner.db.begin().await?;
2215
2216 let (sink, _obj) = Sink::find_by_id(sink_id)
2217 .find_also_related(Object)
2218 .one(&txn)
2219 .await?
2220 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2221 validate_sink_props(&sink, &props)?;
2222
2223 let definition = sink.definition.clone();
2224 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2225 .map_err(|e| SinkError::Config(anyhow!(e)))?
2226 .try_into()
2227 .unwrap();
2228 if let Statement::CreateTable {
2229 with_options,
2230 engine,
2231 ..
2232 } = &mut stmt
2233 {
2234 if !matches!(engine, Engine::Iceberg) {
2235 return Err(SinkError::Config(anyhow!(
2236 "only iceberg table can be altered as sink"
2237 ))
2238 .into());
2239 }
2240 update_stmt_with_props(with_options, &props)?;
2241 } else {
2242 panic!("definition is not a create iceberg table statement")
2243 }
2244 let mut new_config = sink.properties.clone().into_inner();
2245 new_config.extend(props.clone());
2246
2247 let definition = stmt.to_string();
2248 let active_sink = sink::ActiveModel {
2249 sink_id: Set(sink_id),
2250 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2251 definition: Set(definition.clone()),
2252 ..Default::default()
2253 };
2254 let active_source = source::ActiveModel {
2255 source_id: Set(source_id),
2256 definition: Set(definition.clone()),
2257 ..Default::default()
2258 };
2259 let active_table = table::ActiveModel {
2260 table_id: Set(table_id),
2261 definition: Set(definition),
2262 ..Default::default()
2263 };
2264 active_sink.update(&txn).await?;
2265 active_source.update(&txn).await?;
2266 active_table.update(&txn).await?;
2267
2268 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2269
2270 let (sink, sink_obj) = Sink::find_by_id(sink_id)
2271 .find_also_related(Object)
2272 .one(&txn)
2273 .await?
2274 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2275 let (source, source_obj) = Source::find_by_id(source_id)
2276 .find_also_related(Object)
2277 .one(&txn)
2278 .await?
2279 .ok_or_else(|| {
2280 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2281 })?;
2282 let (table, table_obj) = Table::find_by_id(table_id)
2283 .find_also_related(Object)
2284 .one(&txn)
2285 .await?
2286 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2287 txn.commit().await?;
2288 let relation_infos = vec![
2289 PbObject {
2290 object_info: Some(PbObjectInfo::Sink(
2291 ObjectModel(sink, sink_obj.unwrap()).into(),
2292 )),
2293 },
2294 PbObject {
2295 object_info: Some(PbObjectInfo::Source(
2296 ObjectModel(source, source_obj.unwrap()).into(),
2297 )),
2298 },
2299 PbObject {
2300 object_info: Some(PbObjectInfo::Table(
2301 ObjectModel(table, table_obj.unwrap()).into(),
2302 )),
2303 },
2304 ];
2305 let _version = self
2306 .notify_frontend(
2307 NotificationOperation::Update,
2308 NotificationInfo::ObjectGroup(PbObjectGroup {
2309 objects: relation_infos,
2310 }),
2311 )
2312 .await;
2313
2314 Ok((props.into_iter().collect(), sink_id))
2315 }
2316
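/// Update the rate limit of whatever rate-limitable node (DML, sink, stream scan,
/// CDC scan, or source backfill) is found in the given fragment, returning the
/// affected actors.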
2317 pub async fn update_fragment_rate_limit_by_fragment_id(
2318 &self,
2319 fragment_id: FragmentId,
2320 rate_limit: Option<u32>,
2321 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
2322 let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2323 stream_node: &mut PbStreamNode| {
2324 let mut found = false;
2325 if fragment_type_mask.contains_any(
2326 FragmentTypeFlag::dml_rate_limit_fragments()
2327 .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2328 .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2329 .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2330 ) {
2331 visit_stream_node_mut(stream_node, |node| {
2332 if let PbNodeBody::Dml(node) = node {
2333 node.rate_limit = rate_limit;
2334 found = true;
2335 }
2336 if let PbNodeBody::Sink(node) = node {
2337 node.rate_limit = rate_limit;
2338 found = true;
2339 }
2340 if let PbNodeBody::StreamCdcScan(node) = node {
2341 node.rate_limit = rate_limit;
2342 found = true;
2343 }
2344 if let PbNodeBody::StreamScan(node) = node {
2345 node.rate_limit = rate_limit;
2346 found = true;
2347 }
2348 if let PbNodeBody::SourceBackfill(node) = node {
2349 node.rate_limit = rate_limit;
2350 found = true;
2351 }
2352 });
2353 }
2354 found
2355 };
2356 self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2357 .await
2358 }
2359
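/// List the current rate limit of every rate-limited node (source, fs fetch, source
/// backfill, stream/CDC scan, and sink nodes) across all fragments.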
2360 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2363 let inner = self.inner.read().await;
2364 let txn = inner.db.begin().await?;
2365
2366 let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2367 .select_only()
2368 .columns([
2369 fragment::Column::FragmentId,
2370 fragment::Column::JobId,
2371 fragment::Column::FragmentTypeMask,
2372 fragment::Column::StreamNode,
2373 ])
2374 .filter(FragmentTypeMask::intersects_any(
2375 FragmentTypeFlag::rate_limit_fragments(),
2376 ))
2377 .into_tuple()
2378 .all(&txn)
2379 .await?;
2380
2381 let mut rate_limits = Vec::new();
2382 for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2383 let stream_node = stream_node.to_protobuf();
2384 visit_stream_node_body(&stream_node, |node| {
2385 let mut rate_limit = None;
2386 let mut node_name = None;
2387
2388 match node {
2389 PbNodeBody::Source(node) => {
2391 if let Some(node_inner) = &node.source_inner {
2392 rate_limit = node_inner.rate_limit;
2393 node_name = Some("SOURCE");
2394 }
2395 }
2396 PbNodeBody::StreamFsFetch(node) => {
2397 if let Some(node_inner) = &node.node_inner {
2398 rate_limit = node_inner.rate_limit;
2399 node_name = Some("FS_FETCH");
2400 }
2401 }
2402 PbNodeBody::SourceBackfill(node) => {
2404 rate_limit = node.rate_limit;
2405 node_name = Some("SOURCE_BACKFILL");
2406 }
2407 PbNodeBody::StreamScan(node) => {
2408 rate_limit = node.rate_limit;
2409 node_name = Some("STREAM_SCAN");
2410 }
2411 PbNodeBody::StreamCdcScan(node) => {
2412 rate_limit = node.rate_limit;
2413 node_name = Some("STREAM_CDC_SCAN");
2414 }
2415 PbNodeBody::Sink(node) => {
2416 rate_limit = node.rate_limit;
2417 node_name = Some("SINK");
2418 }
2419 _ => {}
2420 }
2421
2422 if let Some(rate_limit) = rate_limit {
2423 rate_limits.push(RateLimitInfo {
2424 fragment_id,
2425 job_id,
2426 fragment_type_mask: fragment_type_mask as u32,
2427 rate_limit,
2428 node_name: node_name.unwrap().to_owned(),
2429 });
2430 }
2431 });
2432 }
2433
2434 Ok(rate_limits)
2435 }
2436}
2437
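/// Check that every field being altered is allowed to change on the fly for the sink's
/// connector, and that the merged configuration still passes the connector's
/// `validate_alter_config` check.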
2438fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
2439 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2441 Some(connector) => {
2442 let connector_type = connector.to_lowercase();
2443 let field_names: Vec<String> = props.keys().cloned().collect();
2444 check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2445 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2446
2447 match_sink_name_str!(
2448 connector_type.as_str(),
2449 SinkType,
2450 {
2451 let mut new_props = sink.properties.0.clone();
2452 new_props.extend(props.clone());
2453 SinkType::validate_alter_config(&new_props)
2454 },
2455 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
2456 )?
2457 }
2458 None => {
2459 return Err(
2460 SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
2461 );
2462 }
2463 };
2464 Ok(())
2465}
2466
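/// Merge `props` into a statement's `WITH (...)` options, overwriting options that
/// share a name and appending new ones while keeping the original order.
///
/// A minimal usage sketch, mirroring the call site in `update_sink_props_by_sink_id`:
///
/// ```ignore
/// if let Statement::CreateSink { stmt } = &mut stmt {
///     update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
/// }
/// ```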
2467fn update_stmt_with_props(
2468 with_properties: &mut Vec<SqlOption>,
2469 props: &BTreeMap<String, String>,
2470) -> MetaResult<()> {
2471 let mut new_sql_options = with_properties
2472 .iter()
2473 .map(|sql_option| (&sql_option.name, sql_option))
2474 .collect::<IndexMap<_, _>>();
2475 let add_sql_options = props
2476 .iter()
2477 .map(|(k, v)| SqlOption::try_from((k, v)))
2478 .collect::<Result<Vec<SqlOption>, ParserError>>()
2479 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2480 new_sql_options.extend(
2481 add_sql_options
2482 .iter()
2483 .map(|sql_option| (&sql_option.name, sql_option)),
2484 );
2485 *with_properties = new_sql_options.into_values().cloned().collect();
2486 Ok(())
2487}
2488
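/// Patch the `SinkDesc` embedded in every sink fragment of the sink's own job with the
/// new properties and persist the rewritten stream nodes.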
2489async fn update_sink_fragment_props(
2490 txn: &DatabaseTransaction,
2491 sink_id: SinkId,
2492 props: BTreeMap<String, String>,
2493) -> MetaResult<()> {
2494 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2495 .select_only()
2496 .columns([
2497 fragment::Column::FragmentId,
2498 fragment::Column::FragmentTypeMask,
2499 fragment::Column::StreamNode,
2500 ])
2501 .filter(fragment::Column::JobId.eq(sink_id))
2502 .into_tuple()
2503 .all(txn)
2504 .await?;
2505 let fragments = fragments
2506 .into_iter()
2507 .filter(|(_, fragment_type_mask, _)| {
2508 *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
2509 })
2510 .filter_map(|(id, _, stream_node)| {
2511 let mut stream_node = stream_node.to_protobuf();
2512 let mut found = false;
2513 visit_stream_node_mut(&mut stream_node, |node| {
2514 if let PbNodeBody::Sink(node) = node
2515 && let Some(sink_desc) = &mut node.sink_desc
2516 && sink_desc.id == sink_id
2517 {
2518 sink_desc.properties.extend(props.clone());
2519 found = true;
2520 }
2521 });
2522 if found { Some((id, stream_node)) } else { None }
2523 })
2524 .collect_vec();
2525 assert!(
2526 !fragments.is_empty(),
2527 "sink id should be used by at least one fragment"
2528 );
2529 for (id, stream_node) in fragments {
2530 fragment::ActiveModel {
2531 fragment_id: Set(id),
2532 stream_node: Set(StreamNode::from(&stream_node)),
2533 ..Default::default()
2534 }
2535 .update(txn)
2536 .await?;
2537 }
2538 Ok(())
2539}
2540
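/// Context for a sink-into-table change: the sinks whose catalogs were updated together
/// with the target table.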
2541pub struct SinkIntoTableContext {
2542 pub updated_sink_catalogs: Vec<SinkId>,
2545}
2546
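/// Context for finishing an auto-refresh-schema sink replacement: the temporary sink
/// that takes over from `original_sink_id`, the refreshed columns, and the new log
/// store table, if any.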
2547pub struct FinishAutoRefreshSchemaSinkContext {
2548 pub tmp_sink_id: SinkId,
2549 pub original_sink_id: SinkId,
2550 pub columns: Vec<PbColumnCatalog>,
2551 pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
2552}
2553
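/// Load all fragments of `job_ids` that carry `expect_flag`, let `alter_stream_node_fn`
/// rewrite their stream nodes, and persist every fragment the closure marked as changed.
///
/// A sketch of a caller, mirroring the source case in `update_source_props_by_source_id`:
///
/// ```ignore
/// update_connector_props_fragments(&txn, job_ids, FragmentTypeFlag::Source, |node, found| {
///     if let PbNodeBody::Source(node) = node
///         && let Some(inner) = &mut node.source_inner
///     {
///         inner.with_properties = new_props.clone();
///         *found = true;
///     }
/// }, is_shared_source)
/// .await?;
/// ```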
2554async fn update_connector_props_fragments<F>(
2555 txn: &DatabaseTransaction,
2556 job_ids: Vec<JobId>,
2557 expect_flag: FragmentTypeFlag,
2558 mut alter_stream_node_fn: F,
2559 is_shared_source: bool,
2560) -> MetaResult<()>
2561where
2562 F: FnMut(&mut PbNodeBody, &mut bool),
2563{
2564 let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
2565 .select_only()
2566 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
2567 .filter(
2568 fragment::Column::JobId
2569 .is_in(job_ids.clone())
2570 .and(FragmentTypeMask::intersects(expect_flag)),
2571 )
2572 .into_tuple()
2573 .all(txn)
2574 .await?;
2575 let fragments = fragments
2576 .into_iter()
2577 .filter_map(|(id, stream_node)| {
2578 let mut stream_node = stream_node.to_protobuf();
2579 let mut found = false;
2580 visit_stream_node_mut(&mut stream_node, |node| {
2581 alter_stream_node_fn(node, &mut found);
2582 });
2583 if found { Some((id, stream_node)) } else { None }
2584 })
2585 .collect_vec();
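// A non-shared source that nothing depends on may legitimately have no fragment to
// patch; only assert when the source is shared or multiple jobs are involved.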
2586 if is_shared_source || job_ids.len() > 1 {
2587 assert!(
2591 !fragments.is_empty(),
2592 "job ids {:?} (type: {:?}) should be used by at least one fragment",
2593 job_ids,
2594 expect_flag
2595 );
2596 }
2597
2598 for (id, stream_node) in fragments {
2599 fragment::ActiveModel {
2600 fragment_id: Set(id),
2601 stream_node: Set(StreamNode::from(&stream_node)),
2602 ..Default::default()
2603 }
2604 .update(txn)
2605 .await?;
2606 }
2607
2608 Ok(())
2609}