use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::ConnectorProperties;
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::refresh_job::RefreshState;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::plan_common::source_refresh_mode::{
    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::{Command, SharedFragmentInfo};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
    list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

impl CatalogController {
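    /// Create the `object` and `streaming_job` rows for a new streaming job and
    /// return the allocated job id; the caller inserts the concrete catalog entry
    /// (table, sink, index or source) itself.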
91 pub async fn create_streaming_job_obj(
92 txn: &DatabaseTransaction,
93 obj_type: ObjectType,
94 owner_id: UserId,
95 database_id: Option<DatabaseId>,
96 schema_id: Option<SchemaId>,
97 create_type: PbCreateType,
98 ctx: StreamContext,
99 streaming_parallelism: StreamingParallelism,
100 max_parallelism: usize,
        specific_resource_group: Option<String>,
    ) -> MetaResult<JobId> {
103 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
104 let job_id = obj.oid.as_job_id();
105 let job = streaming_job::ActiveModel {
106 job_id: Set(job_id),
107 job_status: Set(JobStatus::Initial),
108 create_type: Set(create_type.into()),
109 timezone: Set(ctx.timezone),
110 config_override: Set(Some(ctx.config_override.to_string())),
111 parallelism: Set(streaming_parallelism),
112 max_parallelism: Set(max_parallelism as _),
113 specific_resource_group: Set(specific_resource_group),
114 };
115 job.insert(txn).await?;
116
117 Ok(job_id)
118 }
119
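    /// Create the catalog entries for a new streaming job and its dependencies in a
    /// single transaction, failing if the name conflicts or a dependent relation is
    /// still being altered.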
120 #[await_tree::instrument]
126 pub async fn create_job_catalog(
127 &self,
128 streaming_job: &mut StreamingJob,
129 ctx: &StreamContext,
130 parallelism: &Option<Parallelism>,
131 max_parallelism: usize,
132 mut dependencies: HashSet<ObjectId>,
133 specific_resource_group: Option<String>,
134 ) -> MetaResult<()> {
135 let inner = self.inner.write().await;
136 let txn = inner.db.begin().await?;
137 let create_type = streaming_job.create_type();
138
139 let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
140 (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
141 (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
142 (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
143 };
144
145 ensure_user_id(streaming_job.owner() as _, &txn).await?;
146 ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
147 ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
148 check_relation_name_duplicate(
149 &streaming_job.name(),
150 streaming_job.database_id(),
151 streaming_job.schema_id(),
152 &txn,
153 )
154 .await?;
155
156 if !dependencies.is_empty() {
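            // Reject creation if any dependent table is still being created or altered
            // by another unfinished streaming job.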
158 let altering_cnt = ObjectDependency::find()
159 .join(
160 JoinType::InnerJoin,
161 object_dependency::Relation::Object1.def(),
162 )
163 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
164 .filter(
165 object_dependency::Column::Oid
166 .is_in(dependencies.clone())
167 .and(object::Column::ObjType.eq(ObjectType::Table))
168 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
169 .and(
170 object::Column::Oid.not_in_subquery(
172 Query::select()
173 .column(table::Column::TableId)
174 .from(Table)
175 .to_owned(),
176 ),
177 ),
178 )
179 .count(&txn)
180 .await?;
181 if altering_cnt != 0 {
182 return Err(MetaError::permission_denied(
183 "some dependent relations are being altered",
184 ));
185 }
186 }
187
188 match streaming_job {
189 StreamingJob::MaterializedView(table) => {
190 let job_id = Self::create_streaming_job_obj(
191 &txn,
192 ObjectType::Table,
193 table.owner as _,
194 Some(table.database_id),
195 Some(table.schema_id),
196 create_type,
197 ctx.clone(),
198 streaming_parallelism,
199 max_parallelism,
200 specific_resource_group,
201 )
202 .await?;
203 table.id = job_id.as_mv_table_id();
204 let table_model: table::ActiveModel = table.clone().into();
205 Table::insert(table_model).exec(&txn).await?;
206 }
207 StreamingJob::Sink(sink) => {
208 if let Some(target_table_id) = sink.target_table
209 && check_sink_into_table_cycle(
210 target_table_id.into(),
211 dependencies.iter().cloned().collect(),
212 &txn,
213 )
214 .await?
215 {
216 bail!("Creating such a sink will result in circular dependency.");
217 }
218
219 let job_id = Self::create_streaming_job_obj(
220 &txn,
221 ObjectType::Sink,
222 sink.owner as _,
223 Some(sink.database_id),
224 Some(sink.schema_id),
225 create_type,
226 ctx.clone(),
227 streaming_parallelism,
228 max_parallelism,
229 specific_resource_group,
230 )
231 .await?;
232 sink.id = job_id.as_sink_id();
233 let sink_model: sink::ActiveModel = sink.clone().into();
234 Sink::insert(sink_model).exec(&txn).await?;
235 }
236 StreamingJob::Table(src, table, _) => {
237 let job_id = Self::create_streaming_job_obj(
238 &txn,
239 ObjectType::Table,
240 table.owner as _,
241 Some(table.database_id),
242 Some(table.schema_id),
243 create_type,
244 ctx.clone(),
245 streaming_parallelism,
246 max_parallelism,
247 specific_resource_group,
248 )
249 .await?;
250 table.id = job_id.as_mv_table_id();
251 if let Some(src) = src {
252 let src_obj = Self::create_object(
253 &txn,
254 ObjectType::Source,
255 src.owner as _,
256 Some(src.database_id),
257 Some(src.schema_id),
258 )
259 .await?;
260 src.id = src_obj.oid.as_source_id();
261 src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
262 table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
263 let source: source::ActiveModel = src.clone().into();
264 Source::insert(source).exec(&txn).await?;
265 }
266 let table_model: table::ActiveModel = table.clone().into();
267 Table::insert(table_model).exec(&txn).await?;
268
269 if table.refreshable {
270 let trigger_interval_secs = src
271 .as_ref()
272 .and_then(|source_catalog| source_catalog.refresh_mode)
273 .and_then(
274 |source_refresh_mode| match source_refresh_mode.refresh_mode {
275 Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
276 refresh_interval_sec,
277 })) => refresh_interval_sec,
278 Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
279 None => None,
280 },
281 );
282
283 RefreshJob::insert(refresh_job::ActiveModel {
284 table_id: Set(table.id),
285 last_trigger_time: Set(None),
286 trigger_interval_secs: Set(trigger_interval_secs),
287 current_status: Set(RefreshState::Idle),
288 last_success_time: Set(None),
289 })
290 .exec(&txn)
291 .await?;
292 }
293 }
294 StreamingJob::Index(index, table) => {
295 ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
296 let job_id = Self::create_streaming_job_obj(
297 &txn,
298 ObjectType::Index,
299 index.owner as _,
300 Some(index.database_id),
301 Some(index.schema_id),
302 create_type,
303 ctx.clone(),
304 streaming_parallelism,
305 max_parallelism,
306 specific_resource_group,
307 )
308 .await?;
309 index.id = job_id.as_index_id();
311 index.index_table_id = job_id.as_mv_table_id();
312 table.id = job_id.as_mv_table_id();
313
314 ObjectDependency::insert(object_dependency::ActiveModel {
315 oid: Set(index.primary_table_id.into()),
316 used_by: Set(table.id.into()),
317 ..Default::default()
318 })
319 .exec(&txn)
320 .await?;
321
322 let table_model: table::ActiveModel = table.clone().into();
323 Table::insert(table_model).exec(&txn).await?;
324 let index_model: index::ActiveModel = index.clone().into();
325 Index::insert(index_model).exec(&txn).await?;
326 }
327 StreamingJob::Source(src) => {
328 let job_id = Self::create_streaming_job_obj(
329 &txn,
330 ObjectType::Source,
331 src.owner as _,
332 Some(src.database_id),
333 Some(src.schema_id),
334 create_type,
335 ctx.clone(),
336 streaming_parallelism,
337 max_parallelism,
338 specific_resource_group,
339 )
340 .await?;
341 src.id = job_id.as_shared_source_id();
342 let source_model: source::ActiveModel = src.clone().into();
343 Source::insert(source_model).exec(&txn).await?;
344 }
345 }
346
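        // Also record dependencies on the secrets and connections referenced by the job.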
347 dependencies.extend(
349 streaming_job
350 .dependent_secret_ids()?
351 .into_iter()
352 .map(|id| id.as_object_id()),
353 );
354 dependencies.extend(
356 streaming_job
357 .dependent_connection_ids()?
358 .into_iter()
359 .map(|id| id.as_object_id()),
360 );
361
362 if !dependencies.is_empty() {
364 ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
365 object_dependency::ActiveModel {
366 oid: Set(oid),
367 used_by: Set(streaming_job.id().as_object_id()),
368 ..Default::default()
369 }
370 }))
371 .exec(&txn)
372 .await?;
373 }
374
375 txn.commit().await?;
376
377 Ok(())
378 }
379
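    /// Allocate catalog ids for the internal (state) tables of a creating job and
    /// persist them, returning the mapping from the temporary ids used in the plan
    /// to the newly assigned table ids.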
380 pub async fn create_internal_table_catalog(
388 &self,
389 job: &StreamingJob,
390 mut incomplete_internal_tables: Vec<PbTable>,
391 ) -> MetaResult<HashMap<TableId, TableId>> {
392 let job_id = job.id();
393 let inner = self.inner.write().await;
394 let txn = inner.db.begin().await?;
395
396 ensure_job_not_canceled(job_id, &txn).await?;
398
399 let mut table_id_map = HashMap::new();
400 for table in &mut incomplete_internal_tables {
401 let table_id = Self::create_object(
402 &txn,
403 ObjectType::Table,
404 table.owner as _,
405 Some(table.database_id),
406 Some(table.schema_id),
407 )
408 .await?
409 .oid
410 .as_table_id();
411 table_id_map.insert(table.id, table_id);
412 table.id = table_id;
413 table.job_id = Some(job_id);
414
415 let table_model = table::ActiveModel {
416 table_id: Set(table_id),
417 belongs_to_job_id: Set(Some(job_id)),
418 fragment_id: NotSet,
419 ..table.clone().into()
420 };
421 Table::insert(table_model).exec(&txn).await?;
422 }
423 txn.commit().await?;
424
425 Ok(table_id_map)
426 }
427
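    /// Persist the fragments of a newly built streaming job; thin wrapper around
    /// [`Self::prepare_streaming_job`].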
428 pub async fn prepare_stream_job_fragments(
429 &self,
430 stream_job_fragments: &StreamJobFragmentsToCreate,
431 streaming_job: &StreamingJob,
432 for_replace: bool,
433 ) -> MetaResult<()> {
434 self.prepare_streaming_job(
435 stream_job_fragments.stream_job_id(),
436 || stream_job_fragments.fragments.values(),
437 &stream_job_fragments.downstreams,
438 for_replace,
439 Some(streaming_job),
440 )
441 .await
442 }
443
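    /// Insert the fragments, fragment relations and state-table bindings of a job into
    /// the catalog, optionally notifying the frontend about catalogs that should be
    /// visible while the job is still being created.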
444 #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
449 )]
450 pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
451 &self,
452 job_id: JobId,
453 get_fragments: impl Fn() -> I + 'a,
454 downstreams: &FragmentDownstreamRelation,
455 for_replace: bool,
456 creating_streaming_job: Option<&'a StreamingJob>,
457 ) -> MetaResult<()> {
458 let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;
459
460 let inner = self.inner.write().await;
461
462 let need_notify = creating_streaming_job
463 .map(|job| job.should_notify_creating())
464 .unwrap_or(false);
465 let definition = creating_streaming_job.map(|job| job.definition());
466
467 let mut objects_to_notify = vec![];
468 let txn = inner.db.begin().await?;
469
470 ensure_job_not_canceled(job_id, &txn).await?;
472
473 for fragment in fragments {
474 let fragment_id = fragment.fragment_id;
475 let state_table_ids = fragment.state_table_ids.inner_ref().clone();
476
477 let fragment = fragment.into_active_model();
478 Fragment::insert(fragment).exec(&txn).await?;
479
480 if !for_replace {
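                // Bind each state table of the fragment to its fragment id and record
                // the vnode count used by the table.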
483 let all_tables = StreamJobFragments::collect_tables(get_fragments());
484 for state_table_id in state_table_ids {
485 let table = all_tables
489 .get(&state_table_id)
490 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
491 assert_eq!(table.id, state_table_id);
492 assert_eq!(table.fragment_id, fragment_id);
493 let vnode_count = table.vnode_count();
494
495 table::ActiveModel {
496 table_id: Set(state_table_id as _),
497 fragment_id: Set(Some(fragment_id)),
498 vnode_count: Set(vnode_count as _),
499 ..Default::default()
500 }
501 .update(&txn)
502 .await?;
503
504 if need_notify {
505 let mut table = table.clone();
506 if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
508 table.definition = definition.clone().unwrap();
509 }
510 objects_to_notify.push(PbObject {
511 object_info: Some(PbObjectInfo::Table(table)),
512 });
513 }
514 }
515 }
516 }
517
518 if need_notify {
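            // Also announce the job's own catalog object (associated source, sink or
            // index) so the frontend can see it while the job is still being created.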
520 match creating_streaming_job.unwrap() {
521 StreamingJob::Table(Some(source), ..) => {
522 objects_to_notify.push(PbObject {
523 object_info: Some(PbObjectInfo::Source(source.clone())),
524 });
525 }
526 StreamingJob::Sink(sink) => {
527 objects_to_notify.push(PbObject {
528 object_info: Some(PbObjectInfo::Sink(sink.clone())),
529 });
530 }
531 StreamingJob::Index(index, _) => {
532 objects_to_notify.push(PbObject {
533 object_info: Some(PbObjectInfo::Index(index.clone())),
534 });
535 }
536 _ => {}
537 }
538 }
539
540 insert_fragment_relations(&txn, downstreams).await?;
541
542 if !for_replace {
543 if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
545 Table::update(table::ActiveModel {
546 table_id: Set(table.id),
547 dml_fragment_id: Set(table.dml_fragment_id),
548 ..Default::default()
549 })
550 .exec(&txn)
551 .await?;
552 }
553 }
554
555 txn.commit().await?;
556
557 if !objects_to_notify.is_empty() {
561 self.notify_frontend(
562 Operation::Add,
563 Info::ObjectGroup(PbObjectGroup {
564 objects: objects_to_notify,
565 }),
566 )
567 .await;
568 }
569
570 Ok(())
571 }
572
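    /// Build the barrier command that tears down a cancelled creating job, including
    /// the sink-to-target-fragment mapping when the job is a sink into a table.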
573 pub async fn build_cancel_command(
576 &self,
577 table_fragments: &StreamJobFragments,
578 ) -> MetaResult<Command> {
579 let inner = self.inner.read().await;
580 let txn = inner.db.begin().await?;
581
582 let dropped_sink_fragment_with_target =
583 if let Some(sink_fragment) = table_fragments.sink_fragment() {
584 let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
585 let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
586 sink_target_fragment
587 .get(&sink_fragment_id)
588 .map(|target_fragments| {
589 let target_fragment_id = *target_fragments
590 .first()
591 .expect("sink should have at least one downstream fragment");
592 (sink_fragment_id, target_fragment_id)
593 })
594 } else {
595 None
596 };
597
598 Ok(Command::DropStreamingJobs {
599 streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
600 actors: table_fragments.actor_ids().collect(),
601 unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
602 unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
603 dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
604 .into_iter()
605 .map(|(sink, target)| (target as _, vec![sink as _]))
606 .collect(),
607 })
608 }
609
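    /// Try to abort a creating streaming job and clean up its catalog.
    /// Returns `(false, database_id)` when the job is a background job that should
    /// keep creating instead of being aborted.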
610 #[await_tree::instrument]
614 pub async fn try_abort_creating_streaming_job(
615 &self,
616 mut job_id: JobId,
617 is_cancelled: bool,
618 ) -> MetaResult<(bool, Option<DatabaseId>)> {
619 let mut inner = self.inner.write().await;
620 let txn = inner.db.begin().await?;
621
622 let obj = Object::find_by_id(job_id).one(&txn).await?;
623 let Some(obj) = obj else {
624 tracing::warn!(
625 id = %job_id,
626 "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
627 );
628 return Ok((true, None));
629 };
630 let database_id = obj
631 .database_id
632 .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
633 let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
634
635 if !is_cancelled && let Some(streaming_job) = &streaming_job {
636 assert_ne!(streaming_job.job_status, JobStatus::Created);
637 if streaming_job.create_type == CreateType::Background
638 && streaming_job.job_status == JobStatus::Creating
639 {
640 if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
641 && check_if_belongs_to_iceberg_table(&txn, job_id).await?
642 {
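                    // Jobs backing an iceberg table are cleaned up together with the
                    // table, so let the abort proceed for them.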
643 } else {
645 tracing::warn!(
647 id = %job_id,
648 "streaming job is created in background and still in creating status"
649 );
650 return Ok((false, Some(database_id)));
651 }
652 }
653 }
654
655 let original_job_id = job_id;
657 let original_obj_type = obj.obj_type;
658
659 let iceberg_table_id =
660 try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
661 if let Some(iceberg_table_id) = iceberg_table_id {
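            // The job is the sink of an iceberg table: drop the sink's own objects here
            // and continue the abort with the iceberg table job instead.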
662 let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
665 Object::delete_many()
666 .filter(
667 object::Column::Oid
668 .eq(job_id)
669 .or(object::Column::Oid.is_in(internal_tables)),
670 )
671 .exec(&txn)
672 .await?;
673 job_id = iceberg_table_id.as_job_id();
674 };
675
676 let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
677
678 let mut objs = vec![];
680 let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;
681
682 let mut need_notify =
683 streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
684 if !need_notify {
685 if let Some(table) = &table_obj {
686 need_notify = table.table_type == TableType::MaterializedView;
687 } else if original_obj_type == ObjectType::Sink {
688 need_notify = true;
689 }
690 }
691
692 if is_cancelled {
693 let dropped_tables = Table::find()
694 .find_also_related(Object)
695 .filter(
696 table::Column::TableId.is_in(
697 internal_table_ids
698 .iter()
699 .cloned()
700 .chain(table_obj.iter().map(|t| t.table_id as _)),
701 ),
702 )
703 .all(&txn)
704 .await?
705 .into_iter()
706 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
707 inner
708 .dropped_tables
709 .extend(dropped_tables.map(|t| (t.id, t)));
710 }
711
712 if need_notify {
713 if original_obj_type == ObjectType::Sink && original_job_id != job_id {
716 let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
717 .select_only()
718 .columns([
719 object::Column::Oid,
720 object::Column::ObjType,
721 object::Column::SchemaId,
722 object::Column::DatabaseId,
723 ])
724 .into_partial_model()
725 .one(&txn)
726 .await?;
727 if let Some(orig_obj) = orig_obj {
728 objs.push(orig_obj);
729 }
730 }
731
732 let obj: Option<PartialObject> = Object::find_by_id(job_id)
733 .select_only()
734 .columns([
735 object::Column::Oid,
736 object::Column::ObjType,
737 object::Column::SchemaId,
738 object::Column::DatabaseId,
739 ])
740 .into_partial_model()
741 .one(&txn)
742 .await?;
743 let obj =
744 obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
745 objs.push(obj);
746 let internal_table_objs: Vec<PartialObject> = Object::find()
747 .select_only()
748 .columns([
749 object::Column::Oid,
750 object::Column::ObjType,
751 object::Column::SchemaId,
752 object::Column::DatabaseId,
753 ])
754 .join(JoinType::InnerJoin, object::Relation::Table.def())
755 .filter(table::Column::BelongsToJobId.eq(job_id))
756 .into_partial_model()
757 .all(&txn)
758 .await?;
759 objs.extend(internal_table_objs);
760 }
761
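        // For a sink into a table, also drop the temporary job that was created to
        // replace the target table, if one is still around.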
762 if table_obj.is_none()
764 && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
765 .select_only()
766 .column(sink::Column::TargetTable)
767 .into_tuple::<Option<TableId>>()
768 .one(&txn)
769 .await?
770 {
771 let tmp_id: Option<ObjectId> = ObjectDependency::find()
772 .select_only()
773 .column(object_dependency::Column::UsedBy)
774 .join(
775 JoinType::InnerJoin,
776 object_dependency::Relation::Object1.def(),
777 )
778 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
779 .filter(
780 object_dependency::Column::Oid
781 .eq(target_table_id)
782 .and(object::Column::ObjType.eq(ObjectType::Table))
783 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
784 )
785 .into_tuple()
786 .one(&txn)
787 .await?;
788 if let Some(tmp_id) = tmp_id {
789 tracing::warn!(
790 id = %tmp_id,
791 "aborting temp streaming job for sink into table"
792 );
793
794 Object::delete_by_id(tmp_id).exec(&txn).await?;
795 }
796 }
797
798 Object::delete_by_id(job_id).exec(&txn).await?;
799 if !internal_table_ids.is_empty() {
800 Object::delete_many()
801 .filter(object::Column::Oid.is_in(internal_table_ids))
802 .exec(&txn)
803 .await?;
804 }
805 if let Some(t) = &table_obj
806 && let Some(source_id) = t.optional_associated_source_id
807 {
808 Object::delete_by_id(source_id).exec(&txn).await?;
809 }
810
811 let err = if is_cancelled {
812 MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
813 } else {
814 MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
815 };
816 let abort_reason = format!("streaming job aborted {}", err.as_report());
817 for tx in inner
818 .creating_table_finish_notifier
819 .get_mut(&database_id)
820 .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
821 .into_iter()
822 .flatten()
823 .flatten()
824 {
825 let _ = tx.send(Err(abort_reason.clone()));
826 }
827 txn.commit().await?;
828
829 if !objs.is_empty() {
830 self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
833 .await;
834 }
835 Ok((true, Some(database_id)))
836 }
837
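    /// Persist newly created fragment relations and the initial split assignment for
    /// the job and advance its status to `Creating`.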
838 #[await_tree::instrument]
839 pub async fn post_collect_job_fragments(
840 &self,
841 job_id: JobId,
842 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
843 new_sink_downstream: Option<FragmentDownstreamRelation>,
844 split_assignment: Option<&SplitAssignment>,
845 ) -> MetaResult<()> {
846 let inner = self.inner.write().await;
847 let txn = inner.db.begin().await?;
848
849 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
850
851 if let Some(new_downstream) = new_sink_downstream {
852 insert_fragment_relations(&txn, &new_downstream).await?;
853 }
854
855 streaming_job::ActiveModel {
857 job_id: Set(job_id),
858 job_status: Set(JobStatus::Creating),
859 ..Default::default()
860 }
861 .update(&txn)
862 .await?;
863
864 if let Some(split_assignment) = split_assignment {
865 let fragment_splits = split_assignment
866 .iter()
867 .map(|(fragment_id, splits)| {
868 (
869 *fragment_id as _,
870 splits.values().flatten().cloned().collect_vec(),
871 )
872 })
873 .collect();
874
875 self.update_fragment_splits(&txn, &fragment_splits).await?;
876 }
877
878 txn.commit().await?;
879
880 Ok(())
881 }
882
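    /// Create the temporary job object used when replacing an existing streaming job
    /// (e.g. `ALTER`), reusing the original job's timezone, config override and
    /// max parallelism.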
883 pub async fn create_job_catalog_for_replace(
884 &self,
885 streaming_job: &StreamingJob,
886 ctx: Option<&StreamContext>,
887 specified_parallelism: Option<&NonZeroUsize>,
888 expected_original_max_parallelism: Option<usize>,
889 ) -> MetaResult<JobId> {
890 let id = streaming_job.id();
891 let inner = self.inner.write().await;
892 let txn = inner.db.begin().await?;
893
894 streaming_job.verify_version_for_replace(&txn).await?;
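        // A job cannot be replaced while other creating jobs still reference it.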
896 let referring_cnt = ObjectDependency::find()
898 .join(
899 JoinType::InnerJoin,
900 object_dependency::Relation::Object1.def(),
901 )
902 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
903 .filter(
904 object_dependency::Column::Oid
905 .eq(id)
906 .and(object::Column::ObjType.eq(ObjectType::Table))
907 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
908 )
909 .count(&txn)
910 .await?;
911 if referring_cnt != 0 {
912 return Err(MetaError::permission_denied(
913 "job is being altered or referenced by some creating jobs",
914 ));
915 }
916
917 let (original_max_parallelism, original_timezone, original_config_override): (
919 i32,
920 Option<String>,
921 Option<String>,
922 ) = StreamingJobModel::find_by_id(id)
923 .select_only()
924 .column(streaming_job::Column::MaxParallelism)
925 .column(streaming_job::Column::Timezone)
926 .column(streaming_job::Column::ConfigOverride)
927 .into_tuple()
928 .one(&txn)
929 .await?
930 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
931
932 if let Some(max_parallelism) = expected_original_max_parallelism
933 && original_max_parallelism != max_parallelism as i32
934 {
935 bail!(
938 "cannot use a different max parallelism \
939 when replacing streaming job, \
940 original: {}, new: {}",
941 original_max_parallelism,
942 max_parallelism
943 );
944 }
945
946 let parallelism = match specified_parallelism {
947 None => StreamingParallelism::Adaptive,
948 Some(n) => StreamingParallelism::Fixed(n.get() as _),
949 };
950
951 let ctx = StreamContext {
952 timezone: ctx
953 .map(|ctx| ctx.timezone.clone())
954 .unwrap_or(original_timezone),
955 config_override: original_config_override.unwrap_or_default().into(),
958 };
959
960 let new_obj_id = Self::create_streaming_job_obj(
962 &txn,
963 streaming_job.object_type(),
964 streaming_job.owner() as _,
965 Some(streaming_job.database_id() as _),
966 Some(streaming_job.schema_id() as _),
967 streaming_job.create_type(),
968 ctx,
969 parallelism,
970 original_max_parallelism as _,
971 None,
972 )
973 .await?;
974
975 ObjectDependency::insert(object_dependency::ActiveModel {
977 oid: Set(id.as_object_id()),
978 used_by: Set(new_obj_id.as_object_id()),
979 ..Default::default()
980 })
981 .exec(&txn)
982 .await?;
983
984 txn.commit().await?;
985
986 Ok(new_obj_id)
987 }
988
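    /// Mark a streaming job as `Created`, notify the frontend about its catalog objects
    /// and wake up anyone waiting for the job to finish.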
989 pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
991 let mut inner = self.inner.write().await;
992 let txn = inner.db.begin().await?;
993
994 if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
996 tracing::info!(
997 "streaming job {} is for iceberg table, wait for manual finish operation",
998 job_id
999 );
1000 return Ok(());
1001 }
1002
1003 let (notification_op, objects, updated_user_info) =
1004 Self::finish_streaming_job_inner(&txn, job_id).await?;
1005
1006 txn.commit().await?;
1007
1008 let mut version = self
1009 .notify_frontend(
1010 notification_op,
1011 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1012 )
1013 .await;
1014
1015 if !updated_user_info.is_empty() {
1017 version = self.notify_users_update(updated_user_info).await;
1018 }
1019
1020 inner
1021 .creating_table_finish_notifier
1022 .values_mut()
1023 .for_each(|creating_tables| {
1024 if let Some(txs) = creating_tables.remove(&job_id) {
1025 for tx in txs {
1026 let _ = tx.send(Ok(version));
1027 }
1028 }
1029 });
1030
1031 Ok(())
1032 }
1033
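    /// Transactional body of [`Self::finish_streaming_job`]: set the job to `Created`
    /// and collect the catalog objects to broadcast and the users whose privileges
    /// changed.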
1034 pub async fn finish_streaming_job_inner(
1036 txn: &DatabaseTransaction,
1037 job_id: JobId,
1038 ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
1039 let job_type = Object::find_by_id(job_id)
1040 .select_only()
1041 .column(object::Column::ObjType)
1042 .into_tuple()
1043 .one(txn)
1044 .await?
1045 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1046
1047 let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
1048 .select_only()
1049 .column(streaming_job::Column::CreateType)
1050 .into_tuple()
1051 .one(txn)
1052 .await?
1053 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1054
1055 let res = Object::update_many()
1057 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
1058 .col_expr(
1059 object::Column::CreatedAtClusterVersion,
1060 current_cluster_version().into(),
1061 )
1062 .filter(object::Column::Oid.eq(job_id))
1063 .exec(txn)
1064 .await?;
1065 if res.rows_affected == 0 {
1066 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1067 }
1068
1069 let job = streaming_job::ActiveModel {
1071 job_id: Set(job_id),
1072 job_status: Set(JobStatus::Created),
1073 ..Default::default()
1074 };
1075 job.update(txn).await?;
1076
1077 let internal_table_objs = Table::find()
1079 .find_also_related(Object)
1080 .filter(table::Column::BelongsToJobId.eq(job_id))
1081 .all(txn)
1082 .await?;
1083 let mut objects = internal_table_objs
1084 .iter()
1085 .map(|(table, obj)| PbObject {
1086 object_info: Some(PbObjectInfo::Table(
1087 ObjectModel(table.clone(), obj.clone().unwrap()).into(),
1088 )),
1089 })
1090 .collect_vec();
1091 let mut notification_op = if create_type == CreateType::Background {
1092 NotificationOperation::Update
1093 } else {
1094 NotificationOperation::Add
1095 };
1096 let mut updated_user_info = vec![];
1097 let mut need_grant_default_privileges = true;
1098
1099 match job_type {
1100 ObjectType::Table => {
1101 let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1102 .find_also_related(Object)
1103 .one(txn)
1104 .await?
1105 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1106 if table.table_type == TableType::MaterializedView {
1107 notification_op = NotificationOperation::Update;
1108 }
1109
1110 if let Some(source_id) = table.optional_associated_source_id {
1111 let (src, obj) = Source::find_by_id(source_id)
1112 .find_also_related(Object)
1113 .one(txn)
1114 .await?
1115 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1116 objects.push(PbObject {
1117 object_info: Some(PbObjectInfo::Source(
1118 ObjectModel(src, obj.unwrap()).into(),
1119 )),
1120 });
1121 }
1122 objects.push(PbObject {
1123 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1124 });
1125 }
1126 ObjectType::Sink => {
1127 let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1128 .find_also_related(Object)
1129 .one(txn)
1130 .await?
1131 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1132 if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
1133 need_grant_default_privileges = false;
1134 }
1135 notification_op = NotificationOperation::Update;
1138 objects.push(PbObject {
1139 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
1140 });
1141 }
1142 ObjectType::Index => {
1143 need_grant_default_privileges = false;
1144 let (index, obj) = Index::find_by_id(job_id.as_index_id())
1145 .find_also_related(Object)
1146 .one(txn)
1147 .await?
1148 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1149 {
1150 let (table, obj) = Table::find_by_id(index.index_table_id)
1151 .find_also_related(Object)
1152 .one(txn)
1153 .await?
1154 .ok_or_else(|| {
1155 MetaError::catalog_id_not_found("table", index.index_table_id)
1156 })?;
1157 objects.push(PbObject {
1158 object_info: Some(PbObjectInfo::Table(
1159 ObjectModel(table, obj.unwrap()).into(),
1160 )),
1161 });
1162 }
1163
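                // Let the index's state tables inherit the SELECT privileges users
                // already hold on the primary table.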
1164 let primary_table_privileges = UserPrivilege::find()
1167 .filter(
1168 user_privilege::Column::Oid
1169 .eq(index.primary_table_id)
1170 .and(user_privilege::Column::Action.eq(Action::Select)),
1171 )
1172 .all(txn)
1173 .await?;
1174 if !primary_table_privileges.is_empty() {
1175 let index_state_table_ids: Vec<TableId> = Table::find()
1176 .select_only()
1177 .column(table::Column::TableId)
1178 .filter(
1179 table::Column::BelongsToJobId
1180 .eq(job_id)
1181 .or(table::Column::TableId.eq(index.index_table_id)),
1182 )
1183 .into_tuple()
1184 .all(txn)
1185 .await?;
1186 let mut new_privileges = vec![];
1187 for privilege in &primary_table_privileges {
1188 for state_table_id in &index_state_table_ids {
1189 new_privileges.push(user_privilege::ActiveModel {
1190 id: Default::default(),
1191 oid: Set(state_table_id.as_object_id()),
1192 user_id: Set(privilege.user_id),
1193 action: Set(Action::Select),
1194 dependent_id: Set(privilege.dependent_id),
1195 granted_by: Set(privilege.granted_by),
1196 with_grant_option: Set(privilege.with_grant_option),
1197 });
1198 }
1199 }
1200 UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1201
1202 updated_user_info = list_user_info_by_ids(
1203 primary_table_privileges.into_iter().map(|p| p.user_id),
1204 txn,
1205 )
1206 .await?;
1207 }
1208
1209 objects.push(PbObject {
1210 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1211 });
1212 }
1213 ObjectType::Source => {
1214 let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1215 .find_also_related(Object)
1216 .one(txn)
1217 .await?
1218 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1219 objects.push(PbObject {
1220 object_info: Some(PbObjectInfo::Source(
1221 ObjectModel(source, obj.unwrap()).into(),
1222 )),
1223 });
1224 }
1225 _ => unreachable!("invalid job type: {:?}", job_type),
1226 }
1227
1228 if need_grant_default_privileges {
1229 updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1230 }
1231
1232 Ok((notification_op, objects, updated_user_info))
1233 }
1234
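    /// Finish replacing a streaming job with the fragments built under `tmp_id` and
    /// notify the frontend about all updated catalogs.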
1235 pub async fn finish_replace_streaming_job(
1236 &self,
1237 tmp_id: JobId,
1238 streaming_job: StreamingJob,
1239 replace_upstream: FragmentReplaceUpstream,
1240 sink_into_table_context: SinkIntoTableContext,
1241 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1242 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1243 ) -> MetaResult<NotificationVersion> {
1244 let inner = self.inner.write().await;
1245 let txn = inner.db.begin().await?;
1246
1247 let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1248 tmp_id,
1249 replace_upstream,
1250 sink_into_table_context,
1251 &txn,
1252 streaming_job,
1253 drop_table_connector_ctx,
1254 auto_refresh_schema_sinks,
1255 )
1256 .await?;
1257
1258 txn.commit().await?;
1259
1260 let mut version = self
1261 .notify_frontend(
1262 NotificationOperation::Update,
1263 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1264 )
1265 .await;
1266
1267 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1268 self.notify_users_update(user_infos).await;
1269 version = self
1270 .notify_frontend(
1271 NotificationOperation::Delete,
1272 build_object_group_for_delete(to_drop_objects),
1273 )
1274 .await;
1275 }
1276
1277 Ok(version)
1278 }
1279
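    /// Transactional body of [`Self::finish_replace_streaming_job`]: update the job's
    /// catalog entry, swap in the fragments of the temporary job, rewrite merge
    /// upstreams and fix up dependent indexes and sinks.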
1280 pub async fn finish_replace_streaming_job_inner(
1281 tmp_id: JobId,
1282 replace_upstream: FragmentReplaceUpstream,
1283 SinkIntoTableContext {
1284 updated_sink_catalogs,
1285 }: SinkIntoTableContext,
1286 txn: &DatabaseTransaction,
1287 streaming_job: StreamingJob,
1288 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1289 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1290 ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1291 let original_job_id = streaming_job.id();
1292 let job_type = streaming_job.job_type();
1293
1294 let mut index_item_rewriter = None;
1295
1296 match streaming_job {
1298 StreamingJob::Table(_source, table, _table_job_type) => {
1299 let original_column_catalogs =
1302 get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1303
1304 index_item_rewriter = Some({
1305 let original_columns = original_column_catalogs
1306 .to_protobuf()
1307 .into_iter()
1308 .map(|c| c.column_desc.unwrap())
1309 .collect_vec();
1310 let new_columns = table
1311 .columns
1312 .iter()
1313 .map(|c| c.column_desc.clone().unwrap())
1314 .collect_vec();
1315
1316 IndexItemRewriter {
1317 original_columns,
1318 new_columns,
1319 }
1320 });
1321
1322 for sink_id in updated_sink_catalogs {
1324 sink::ActiveModel {
1325 sink_id: Set(sink_id as _),
1326 original_target_columns: Set(Some(original_column_catalogs.clone())),
1327 ..Default::default()
1328 }
1329 .update(txn)
1330 .await?;
1331 }
1332 let mut table = table::ActiveModel::from(table);
1334 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1335 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1336 {
1337 table.optional_associated_source_id = Set(None);
1339 }
1340
1341 table.update(txn).await?;
1342 }
1343 StreamingJob::Source(source) => {
1344 let source = source::ActiveModel::from(source);
1346 source.update(txn).await?;
1347 }
1348 StreamingJob::MaterializedView(table) => {
1349 let table = table::ActiveModel::from(table);
1351 table.update(txn).await?;
1352 }
1353 _ => unreachable!(
1354 "invalid streaming job type: {:?}",
1355 streaming_job.job_type_str()
1356 ),
1357 }
1358
1359 async fn finish_fragments(
1360 txn: &DatabaseTransaction,
1361 tmp_id: JobId,
1362 original_job_id: JobId,
1363 replace_upstream: FragmentReplaceUpstream,
1364 ) -> MetaResult<()> {
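            // Re-bind state tables to the new fragments, then move those fragments from
            // the temporary job to the original job id.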
1365 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1369 .select_only()
1370 .columns([
1371 fragment::Column::FragmentId,
1372 fragment::Column::StateTableIds,
1373 ])
1374 .filter(fragment::Column::JobId.eq(tmp_id))
1375 .into_tuple()
1376 .all(txn)
1377 .await?;
1378 for (fragment_id, state_table_ids) in fragment_info {
1379 for state_table_id in state_table_ids.into_inner() {
1380 let state_table_id = TableId::new(state_table_id as _);
1381 table::ActiveModel {
1382 table_id: Set(state_table_id),
1383 fragment_id: Set(Some(fragment_id)),
1384 ..Default::default()
1386 }
1387 .update(txn)
1388 .await?;
1389 }
1390 }
1391
1392 Fragment::delete_many()
1394 .filter(fragment::Column::JobId.eq(original_job_id))
1395 .exec(txn)
1396 .await?;
1397 Fragment::update_many()
1398 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1399 .filter(fragment::Column::JobId.eq(tmp_id))
1400 .exec(txn)
1401 .await?;
1402
1403 for (fragment_id, fragment_replace_map) in replace_upstream {
1406 let (fragment_id, mut stream_node) =
1407 Fragment::find_by_id(fragment_id as FragmentId)
1408 .select_only()
1409 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1410 .into_tuple::<(FragmentId, StreamNode)>()
1411 .one(txn)
1412 .await?
1413 .map(|(id, node)| (id, node.to_protobuf()))
1414 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1415
1416 visit_stream_node_mut(&mut stream_node, |body| {
1417 if let PbNodeBody::Merge(m) = body
1418 && let Some(new_fragment_id) =
1419 fragment_replace_map.get(&m.upstream_fragment_id)
1420 {
1421 m.upstream_fragment_id = *new_fragment_id;
1422 }
1423 });
1424 fragment::ActiveModel {
1425 fragment_id: Set(fragment_id),
1426 stream_node: Set(StreamNode::from(&stream_node)),
1427 ..Default::default()
1428 }
1429 .update(txn)
1430 .await?;
1431 }
1432
1433 Object::delete_by_id(tmp_id).exec(txn).await?;
1435
1436 Ok(())
1437 }
1438
1439 finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1440
1441 let mut objects = vec![];
1443 match job_type {
1444 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1445 let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1446 .find_also_related(Object)
1447 .one(txn)
1448 .await?
1449 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1450 objects.push(PbObject {
1451 object_info: Some(PbObjectInfo::Table(
1452 ObjectModel(table, table_obj.unwrap()).into(),
1453 )),
1454 })
1455 }
1456 StreamingJobType::Source => {
1457 let (source, source_obj) =
1458 Source::find_by_id(original_job_id.as_shared_source_id())
1459 .find_also_related(Object)
1460 .one(txn)
1461 .await?
1462 .ok_or_else(|| {
1463 MetaError::catalog_id_not_found("object", original_job_id)
1464 })?;
1465 objects.push(PbObject {
1466 object_info: Some(PbObjectInfo::Source(
1467 ObjectModel(source, source_obj.unwrap()).into(),
1468 )),
1469 })
1470 }
1471 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1472 }
1473
1474 if let Some(expr_rewriter) = index_item_rewriter {
1475 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1476 .select_only()
1477 .columns([index::Column::IndexId, index::Column::IndexItems])
1478 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1479 .into_tuple()
1480 .all(txn)
1481 .await?;
1482 for (index_id, nodes) in index_items {
1483 let mut pb_nodes = nodes.to_protobuf();
1484 pb_nodes
1485 .iter_mut()
1486 .for_each(|x| expr_rewriter.rewrite_expr(x));
1487 let index = index::ActiveModel {
1488 index_id: Set(index_id),
1489 index_items: Set(pb_nodes.into()),
1490 ..Default::default()
1491 }
1492 .update(txn)
1493 .await?;
1494 let index_obj = index
1495 .find_related(Object)
1496 .one(txn)
1497 .await?
1498 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1499 objects.push(PbObject {
1500 object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1501 });
1502 }
1503 }
1504
1505 if let Some(sinks) = auto_refresh_schema_sinks {
1506 for finish_sink_context in sinks {
1507 finish_fragments(
1508 txn,
1509 finish_sink_context.tmp_sink_id.as_job_id(),
1510 finish_sink_context.original_sink_id.as_job_id(),
1511 Default::default(),
1512 )
1513 .await?;
1514 let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1515 .find_also_related(Object)
1516 .one(txn)
1517 .await?
1518 .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1519 let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1520 Sink::update(sink::ActiveModel {
1521 sink_id: Set(finish_sink_context.original_sink_id),
1522 columns: Set(columns.clone()),
1523 ..Default::default()
1524 })
1525 .exec(txn)
1526 .await?;
1527 sink.columns = columns;
1528 objects.push(PbObject {
1529 object_info: Some(PbObjectInfo::Sink(
1530 ObjectModel(sink, sink_obj.unwrap()).into(),
1531 )),
1532 });
1533 if let Some((log_store_table_id, new_log_store_table_columns)) =
1534 finish_sink_context.new_log_store_table
1535 {
1536 let new_log_store_table_columns: ColumnCatalogArray =
1537 new_log_store_table_columns.into();
1538 let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1539 .find_also_related(Object)
1540 .one(txn)
1541 .await?
1542 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1543 Table::update(table::ActiveModel {
1544 table_id: Set(log_store_table_id),
1545 columns: Set(new_log_store_table_columns.clone()),
1546 ..Default::default()
1547 })
1548 .exec(txn)
1549 .await?;
1550 table.columns = new_log_store_table_columns;
1551 objects.push(PbObject {
1552 object_info: Some(PbObjectInfo::Table(
1553 ObjectModel(table, table_obj.unwrap()).into(),
1554 )),
1555 });
1556 }
1557 }
1558 }
1559
1560 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1561 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1562 notification_objs =
1563 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1564 }
1565
1566 Ok((objects, notification_objs))
1567 }
1568
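    /// Abort a replace operation by dropping the temporary job object and any temporary
    /// sink objects created for it.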
1569 pub async fn try_abort_replacing_streaming_job(
1571 &self,
1572 tmp_job_id: JobId,
1573 tmp_sink_ids: Option<Vec<ObjectId>>,
1574 ) -> MetaResult<()> {
1575 let inner = self.inner.write().await;
1576 let txn = inner.db.begin().await?;
1577 Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1578 if let Some(tmp_sink_ids) = tmp_sink_ids {
1579 for tmp_sink_id in tmp_sink_ids {
1580 Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1581 }
1582 }
1583 txn.commit().await?;
1584 Ok(())
1585 }
1586
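    /// Update the rate limit of a source and propagate it into the stream nodes of every
    /// fragment that reads from it, returning the affected fragments and their actors.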
1587 pub async fn update_source_rate_limit_by_source_id(
1590 &self,
1591 source_id: SourceId,
1592 rate_limit: Option<u32>,
1593 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1594 let inner = self.inner.read().await;
1595 let txn = inner.db.begin().await?;
1596
1597 {
1598 let active_source = source::ActiveModel {
1599 source_id: Set(source_id),
1600 rate_limit: Set(rate_limit.map(|v| v as i32)),
1601 ..Default::default()
1602 };
1603 active_source.update(&txn).await?;
1604 }
1605
1606 let (source, obj) = Source::find_by_id(source_id)
1607 .find_also_related(Object)
1608 .one(&txn)
1609 .await?
1610 .ok_or_else(|| {
1611 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1612 })?;
1613
1614 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1615 let streaming_job_ids: Vec<JobId> =
1616 if let Some(table_id) = source.optional_associated_table_id {
1617 vec![table_id.as_job_id()]
1618 } else if let Some(source_info) = &source.source_info
1619 && source_info.to_protobuf().is_shared()
1620 {
1621 vec![source_id.as_share_source_job_id()]
1622 } else {
1623 ObjectDependency::find()
1624 .select_only()
1625 .column(object_dependency::Column::UsedBy)
1626 .filter(object_dependency::Column::Oid.eq(source_id))
1627 .into_tuple()
1628 .all(&txn)
1629 .await?
1630 };
1631
1632 if streaming_job_ids.is_empty() {
1633 return Err(MetaError::invalid_parameter(format!(
1634 "source id {source_id} not used by any streaming job"
1635 )));
1636 }
1637
1638 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1639 .select_only()
1640 .columns([
1641 fragment::Column::FragmentId,
1642 fragment::Column::FragmentTypeMask,
1643 fragment::Column::StreamNode,
1644 ])
1645 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1646 .into_tuple()
1647 .all(&txn)
1648 .await?;
1649 let mut fragments = fragments
1650 .into_iter()
1651 .map(|(id, mask, stream_node)| {
1652 (
1653 id,
1654 FragmentTypeMask::from(mask as u32),
1655 stream_node.to_protobuf(),
1656 )
1657 })
1658 .collect_vec();
1659
1660 fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1661 let mut found = false;
1662 if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1663 visit_stream_node_mut(stream_node, |node| {
1664 if let PbNodeBody::Source(node) = node
1665 && let Some(node_inner) = &mut node.source_inner
1666 && node_inner.source_id == source_id
1667 {
1668 node_inner.rate_limit = rate_limit;
1669 found = true;
1670 }
1671 });
1672 }
1673 if is_fs_source {
1674 visit_stream_node_mut(stream_node, |node| {
1677 if let PbNodeBody::StreamFsFetch(node) = node {
1678 fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1679 if let Some(node_inner) = &mut node.node_inner
1680 && node_inner.source_id == source_id
1681 {
1682 node_inner.rate_limit = rate_limit;
1683 found = true;
1684 }
1685 }
1686 });
1687 }
1688 found
1689 });
1690
1691 assert!(
1692 !fragments.is_empty(),
1693 "source id should be used by at least one fragment"
1694 );
1695 let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1696
1697 for (id, fragment_type_mask, stream_node) in fragments {
1698 fragment::ActiveModel {
1699 fragment_id: Set(id),
1700 fragment_type_mask: Set(fragment_type_mask.into()),
1701 stream_node: Set(StreamNode::from(&stream_node)),
1702 ..Default::default()
1703 }
1704 .update(&txn)
1705 .await?;
1706 }
1707
1708 txn.commit().await?;
1709
1710 let fragment_actors = self.get_fragment_actors_from_running_info(fragment_ids.into_iter());
1711
1712 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1713 let _version = self
1714 .notify_frontend(
1715 NotificationOperation::Update,
1716 NotificationInfo::ObjectGroup(PbObjectGroup {
1717 objects: vec![PbObject {
1718 object_info: Some(relation_info),
1719 }],
1720 }),
1721 )
1722 .await;
1723
1724 Ok(fragment_actors)
1725 }
1726
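    /// Resolve the currently running actors of the given fragments from the shared
    /// in-memory actor info.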
1727 fn get_fragment_actors_from_running_info(
1728 &self,
1729 fragment_ids: impl Iterator<Item = FragmentId>,
1730 ) -> HashMap<FragmentId, Vec<ActorId>> {
1731 let mut fragment_actors: HashMap<FragmentId, Vec<ActorId>> = HashMap::new();
1732
1733 let info = self.env.shared_actor_infos().read_guard();
1734
1735 for fragment_id in fragment_ids {
1736 let SharedFragmentInfo { actors, .. } = info.get_fragment(fragment_id).unwrap();
1737 fragment_actors
1738 .entry(fragment_id as _)
1739 .or_default()
1740 .extend(actors.keys().copied());
1741 }
1742
1743 fragment_actors
1744 }
1745
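    /// Apply `fragments_mutation_fn` to all fragments of a job and persist those whose
    /// stream node was changed, returning the affected fragments and their actors.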
1746 pub async fn mutate_fragments_by_job_id(
1749 &self,
1750 job_id: JobId,
1751 mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1753 err_msg: &'static str,
1755 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1756 let inner = self.inner.read().await;
1757 let txn = inner.db.begin().await?;
1758
1759 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1760 .select_only()
1761 .columns([
1762 fragment::Column::FragmentId,
1763 fragment::Column::FragmentTypeMask,
1764 fragment::Column::StreamNode,
1765 ])
1766 .filter(fragment::Column::JobId.eq(job_id))
1767 .into_tuple()
1768 .all(&txn)
1769 .await?;
1770 let mut fragments = fragments
1771 .into_iter()
1772 .map(|(id, mask, stream_node)| {
1773 (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1774 })
1775 .collect_vec();
1776
1777 let fragments = fragments
1778 .iter_mut()
1779 .map(|(_, fragment_type_mask, stream_node)| {
1780 fragments_mutation_fn(*fragment_type_mask, stream_node)
1781 })
1782 .collect::<MetaResult<Vec<bool>>>()?
1783 .into_iter()
1784 .zip_eq_debug(std::mem::take(&mut fragments))
1785 .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1786 .collect::<Vec<_>>();
1787
1788 if fragments.is_empty() {
1789 return Err(MetaError::invalid_parameter(format!(
1790 "job id {job_id}: {}",
1791 err_msg
1792 )));
1793 }
1794
1795 let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1796 for (id, _, stream_node) in fragments {
1797 fragment::ActiveModel {
1798 fragment_id: Set(id),
1799 stream_node: Set(StreamNode::from(&stream_node)),
1800 ..Default::default()
1801 }
1802 .update(&txn)
1803 .await?;
1804 }
1805
1806 txn.commit().await?;
1807
1808 let fragment_actors =
1809 self.get_fragment_actors_from_running_info(fragment_ids.iter().copied());
1810
1811 Ok(fragment_actors)
1812 }
1813
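    /// Like [`Self::mutate_fragments_by_job_id`], but for a single fragment.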
1814 async fn mutate_fragment_by_fragment_id(
1815 &self,
1816 fragment_id: FragmentId,
1817 mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1818 err_msg: &'static str,
1819 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1820 let inner = self.inner.read().await;
1821 let txn = inner.db.begin().await?;
1822
1823 let (fragment_type_mask, stream_node): (i32, StreamNode) =
1824 Fragment::find_by_id(fragment_id)
1825 .select_only()
1826 .columns([
1827 fragment::Column::FragmentTypeMask,
1828 fragment::Column::StreamNode,
1829 ])
1830 .into_tuple()
1831 .one(&txn)
1832 .await?
1833 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1834 let mut pb_stream_node = stream_node.to_protobuf();
1835 let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1836
1837 if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1838 return Err(MetaError::invalid_parameter(format!(
1839 "fragment id {fragment_id}: {}",
1840 err_msg
1841 )));
1842 }
1843
1844 fragment::ActiveModel {
1845 fragment_id: Set(fragment_id),
1846 stream_node: Set(stream_node),
1847 ..Default::default()
1848 }
1849 .update(&txn)
1850 .await?;
1851
1852 let fragment_actors =
1853 self.get_fragment_actors_from_running_info(std::iter::once(fragment_id));
1854
1855 txn.commit().await?;
1856
1857 Ok(fragment_actors)
1858 }
1859
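    /// Update the backfill rate limit on every backfill-capable node (stream scan,
    /// CDC scan, source backfill, sink) of the job.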
1860 pub async fn update_backfill_rate_limit_by_job_id(
1863 &self,
1864 job_id: JobId,
1865 rate_limit: Option<u32>,
1866 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1867 let update_backfill_rate_limit =
1868 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1869 let mut found = false;
1870 if fragment_type_mask
1871 .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1872 {
1873 visit_stream_node_mut(stream_node, |node| match node {
1874 PbNodeBody::StreamCdcScan(node) => {
1875 node.rate_limit = rate_limit;
1876 found = true;
1877 }
1878 PbNodeBody::StreamScan(node) => {
1879 node.rate_limit = rate_limit;
1880 found = true;
1881 }
1882 PbNodeBody::SourceBackfill(node) => {
1883 node.rate_limit = rate_limit;
1884 found = true;
1885 }
1886 PbNodeBody::Sink(node) => {
1887 node.rate_limit = rate_limit;
1888 found = true;
1889 }
1890 _ => {}
1891 });
1892 }
1893 Ok(found)
1894 };
1895
1896 self.mutate_fragments_by_job_id(
1897 job_id,
1898 update_backfill_rate_limit,
1899 "stream scan node or source node not found",
1900 )
1901 .await
1902 }
1903
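    /// Update the rate limit of the sink node of a sink job; only supported when the
    /// sink uses a kv log store (i.e. sink decoupling is enabled).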
1904 pub async fn update_sink_rate_limit_by_job_id(
1907 &self,
1908 sink_id: SinkId,
1909 rate_limit: Option<u32>,
1910 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1911 let update_sink_rate_limit =
1912 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1913 let mut found = Ok(false);
1914 if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1915 visit_stream_node_mut(stream_node, |node| {
1916 if let PbNodeBody::Sink(node) = node {
1917 if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1918 found = Err(MetaError::invalid_parameter(
1919 "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1920 ));
1921 return;
1922 }
1923 node.rate_limit = rate_limit;
1924 found = Ok(true);
1925 }
1926 });
1927 }
1928 found
1929 };
1930
1931 self.mutate_fragments_by_job_id(
1932 sink_id.as_job_id(),
1933 update_sink_rate_limit,
1934 "sink node not found",
1935 )
1936 .await
1937 }
1938
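    /// Update the rate limit of the DML node in a table job's fragments.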
1939 pub async fn update_dml_rate_limit_by_job_id(
1940 &self,
1941 job_id: JobId,
1942 rate_limit: Option<u32>,
1943 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1944 let update_dml_rate_limit =
1945 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1946 let mut found = false;
1947 if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1948 visit_stream_node_mut(stream_node, |node| {
1949 if let PbNodeBody::Dml(node) = node {
1950 node.rate_limit = rate_limit;
1951 found = true;
1952 }
1953 });
1954 }
1955 Ok(found)
1956 };
1957
1958 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1959 .await
1960 }
1961
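    /// Alter the connector properties and secret references of a source, rewriting its
    /// stored definition and secret dependencies accordingly.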
    pub async fn update_source_props_by_source_id(
        &self,
        source_id: SourceId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> MetaResult<WithOptionsSecResolved> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (source, _obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let connector = source.with_properties.0.get_connector().unwrap();
        let is_shared_source = source.is_shared();

        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
        if !is_shared_source {
            dep_source_job_ids = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .filter(object_dependency::Column::Oid.eq(source_id))
                .into_tuple()
                .all(&txn)
                .await?;
        }

        let prop_keys: Vec<String> = alter_props
            .keys()
            .chain(alter_secret_refs.keys())
            .cloned()
            .collect();
        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
            &connector, &prop_keys,
        )?;

        let mut options_with_secret = WithOptionsSecResolved::new(
            source.with_properties.0.clone(),
            source
                .secret_ref
                .map(|secret_ref| secret_ref.to_protobuf())
                .unwrap_or_default(),
        );
        let (to_add_secret_dep, to_remove_secret_dep) =
            options_with_secret.handle_update(alter_props, alter_secret_refs)?;

        tracing::info!(
            "applying new properties to source: source_id={}, options_with_secret={:?}",
            source_id,
            options_with_secret
        );
        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;

        let mut associate_table_id = None;
        let mut preferred_id = source_id.as_object_id();
        let rewrite_sql = {
            let definition = source.definition.clone();

            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
                .map_err(|e| {
                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                        anyhow!(e).context("Failed to parse source definition SQL"),
                    )))
                })?
                .try_into()
                .unwrap();

            async fn format_with_option_secret_resolved(
                txn: &DatabaseTransaction,
                options_with_secret: &WithOptionsSecResolved,
            ) -> MetaResult<Vec<SqlOption>> {
                let mut options = Vec::new();
                for (k, v) in options_with_secret.as_plaintext() {
                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                }
                for (k, v) in options_with_secret.as_secret() {
                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
                        let sql_option =
                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                        options.push(sql_option);
                    } else {
                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                    }
                }
                Ok(options)
            }

            match &mut stmt {
                Statement::CreateSource { stmt } => {
                    stmt.with_properties.0 =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                }
                Statement::CreateTable { with_options, .. } => {
                    *with_options =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                    associate_table_id = source.optional_associated_table_id;
                    preferred_id = associate_table_id.unwrap().as_object_id();
                }
                _ => unreachable!(),
            }

            stmt.to_string()
        };

        {
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id.into()),
                        used_by: Set(preferred_id),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            if !to_remove_secret_dep.is_empty() {
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        active_source_model.update(&txn).await?;

        if let Some(associate_table_id) = associate_table_id {
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            active_table_model.update(&txn).await?;
        }

        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            associate_table_id.as_job_id()
        } else {
            source_id.as_share_source_job_id()
        }]
        .into_iter()
        .chain(dep_source_job_ids.into_iter())
        .collect_vec();

        update_connector_props_fragments(
            &txn,
            to_check_job_ids,
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
            is_shared_source,
        )
        .await?;

        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap()).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
            });
        }

        txn.commit().await?;

        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
            }),
        )
        .await;

        Ok(options_with_secret)
    }

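    /// Alters connector properties of a sink on the fly: validates the changed fields against
    /// the connector's alter-on-fly allow-list, rewrites the `CREATE SINK` definition, merges
    /// the new properties into the sink catalog and the `SinkDesc` of its fragments, then
    /// notifies frontends. Returns the properties that were applied.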
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
        } else {
            panic!("definition is not a create sink statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition),
            ..Default::default()
        };
        active_sink.update(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;
        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        txn.commit().await?;
        let relation_infos = vec![PbObject {
            object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
        }];

        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                }),
            )
            .await;

        Ok(props.into_iter().collect())
    }

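    /// Alters connector properties of an iceberg-engine table. Such a table is backed by a
    /// hidden sink/source pair, so the rewritten definition is written to the table, sink and
    /// source catalogs, and the sink fragments are patched accordingly. The caller must provide
    /// the corresponding sink and source ids via `AlterIcebergTableIds`.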
    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(
            AlterIcebergTableIds { sink_id, source_id },
        ) = alter_iceberg_table_props
            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;

        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateTable {
            with_options,
            engine,
            ..
        } = &mut stmt
        {
            if !matches!(engine, Engine::Iceberg) {
                return Err(SinkError::Config(anyhow!(
                    "only iceberg table can be altered as sink"
                ))
                .into());
            }
            update_stmt_with_props(with_options, &props)?;
        } else {
            panic!("definition is not a create iceberg table statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_source = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_table = table::ActiveModel {
            table_id: Set(table_id),
            definition: Set(definition),
            ..Default::default()
        };
        active_sink.update(&txn).await?;
        active_source.update(&txn).await?;
        active_table.update(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;

        let (sink, sink_obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        let (source, source_obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let (table, table_obj) = Table::find_by_id(table_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
        txn.commit().await?;
        let relation_infos = vec![
            PbObject {
                object_info: Some(PbObjectInfo::Sink(
                    ObjectModel(sink, sink_obj.unwrap()).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, source_obj.unwrap()).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, table_obj.unwrap()).into(),
                )),
            },
        ];
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                }),
            )
            .await;

        Ok((props.into_iter().collect(), sink_id))
    }

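    /// Applies `rate_limit` to a single fragment, whichever rate-limitable node it contains
    /// (`Dml`, `Sink`, `StreamScan`, `StreamCdcScan` or `SourceBackfill`).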
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
                                 stream_node: &mut PbStreamNode| {
            let mut found = false;
            if fragment_type_mask.contains_any(
                FragmentTypeFlag::dml_rate_limit_fragments()
                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
            ) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Dml(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::Sink(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamCdcScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::SourceBackfill(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

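    /// Lists the currently configured rate limits of all rate-limitable nodes (source, fs fetch,
    /// backfill, scan and sink nodes) across all fragments.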
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(FragmentTypeMask::intersects_any(
                FragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |node| {
                let mut rate_limit = None;
                let mut node_name = None;

                match node {
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    PbNodeBody::SourceBackfill(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }

                if let Some(rate_limit) = rate_limit {
                    rate_limits.push(RateLimitInfo {
                        fragment_id,
                        job_id,
                        fragment_type_mask: fragment_type_mask as u32,
                        rate_limit,
                        node_name: node_name.unwrap().to_owned(),
                    });
                }
            });
        }

        Ok(rate_limits)
    }
}

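/// Checks that `props` may be applied to an existing sink on the fly: the connector must be
/// known, every changed field must be in the connector's alter-on-fly allow-list, and the merged
/// configuration must still pass the connector's `validate_alter_config`.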
fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
        Some(connector) => {
            let connector_type = connector.to_lowercase();
            let field_names: Vec<String> = props.keys().cloned().collect();
            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

            match_sink_name_str!(
                connector_type.as_str(),
                SinkType,
                {
                    let mut new_props = sink.properties.0.clone();
                    new_props.extend(props.clone());
                    SinkType::validate_alter_config(&new_props)
                },
                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
            )?
        }
        None => {
            return Err(
                SinkError::Config(anyhow!("connector not specified when altering sink")).into(),
            );
        }
    };
    Ok(())
}

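/// Merges `props` into the `WITH (...)` options of a parsed statement, overwriting options that
/// already exist and appending new ones while preserving the original order.
///
/// A minimal sketch of the intended effect (SQL shown for illustration only):
///
/// ```ignore
/// // WITH (connector = 'kafka', topic = 't1')  merged with  {"topic": "t2"}
/// //   => WITH (connector = 'kafka', topic = 't2')
/// ```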
fn update_stmt_with_props(
    with_properties: &mut Vec<SqlOption>,
    props: &BTreeMap<String, String>,
) -> MetaResult<()> {
    let mut new_sql_options = with_properties
        .iter()
        .map(|sql_option| (&sql_option.name, sql_option))
        .collect::<IndexMap<_, _>>();
    let add_sql_options = props
        .iter()
        .map(|(k, v)| SqlOption::try_from((k, v)))
        .collect::<Result<Vec<SqlOption>, ParserError>>()
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
    new_sql_options.extend(
        add_sql_options
            .iter()
            .map(|sql_option| (&sql_option.name, sql_option)),
    );
    *with_properties = new_sql_options.into_values().cloned().collect();
    Ok(())
}

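/// Rewrites the `SinkDesc` properties embedded in the stream plans of all sink fragments of
/// `sink_id` inside the given transaction. Panics if no fragment carries the sink, since a sink
/// job always owns at least one sink fragment.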
async fn update_sink_fragment_props(
    txn: &DatabaseTransaction,
    sink_id: SinkId,
    props: BTreeMap<String, String>,
) -> MetaResult<()> {
    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
        .select_only()
        .columns([
            fragment::Column::FragmentId,
            fragment::Column::FragmentTypeMask,
            fragment::Column::StreamNode,
        ])
        .filter(fragment::Column::JobId.eq(sink_id))
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter(|(_, fragment_type_mask, _)| {
            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
        })
        .filter_map(|(id, _, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                if let PbNodeBody::Sink(node) = node
                    && let Some(sink_desc) = &mut node.sink_desc
                    && sink_desc.id == sink_id
                {
                    sink_desc.properties.extend(props.clone());
                    found = true;
                }
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    assert!(
        !fragments.is_empty(),
        "sink id should be used by at least one fragment"
    );
    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }
    Ok(())
}

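/// Context for `SINK INTO TABLE` handling; `updated_sink_catalogs` collects the existing sinks
/// whose catalogs were updated as part of the operation.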
pub struct SinkIntoTableContext {
    pub updated_sink_catalogs: Vec<SinkId>,
}

pub struct FinishAutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink_id: SinkId,
    pub columns: Vec<PbColumnCatalog>,
    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
}

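/// Applies `alter_stream_node_fn` to the stream plans of every fragment of `job_ids` that
/// carries `expect_flag`, persisting only the fragments the closure actually modified. For
/// shared sources (or when several jobs are involved) at least one fragment is expected to match.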
async fn update_connector_props_fragments<F>(
    txn: &DatabaseTransaction,
    job_ids: Vec<JobId>,
    expect_flag: FragmentTypeFlag,
    mut alter_stream_node_fn: F,
    is_shared_source: bool,
) -> MetaResult<()>
where
    F: FnMut(&mut PbNodeBody, &mut bool),
{
    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
        .select_only()
        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
        .filter(
            fragment::Column::JobId
                .is_in(job_ids.clone())
                .and(FragmentTypeMask::intersects(expect_flag)),
        )
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter_map(|(id, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                alter_stream_node_fn(node, &mut found);
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    if is_shared_source || job_ids.len() > 1 {
        assert!(
            !fragments.is_empty(),
            "job ids {:?} (type: {:?}) should be used by at least one fragment",
            job_ids,
            expect_flag
        );
    }

    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }

    Ok(())
}