use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{self, FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::ConnectorProperties;
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
    JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
    TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::{Command, Reschedule};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
    check_sink_into_table_cycle, ensure_job_not_canceled, ensure_object_id, ensure_user_id,
    fetch_target_fragments, get_fragment_actor_ids, get_internal_tables_by_id, get_table_columns,
    grant_default_privileges_automatically, insert_fragment_relations, list_user_info_by_ids,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
    StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::{MetaError, MetaResult};

impl CatalogController {
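    /// Create the `object` and `streaming_job` rows for a new streaming job and return the
    /// allocated object id. The job starts in `JobStatus::Initial`; callers insert the
    /// job-specific catalog entry (table/sink/index/source) in the same transaction.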
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        timezone: Option<String>,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>,
    ) -> MetaResult<ObjectId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job = streaming_job::ActiveModel {
            job_id: Set(obj.oid),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(timezone),
            parallelism: Set(streaming_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(obj.oid)
    }
117
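    /// Create the catalog entries for a new streaming job in a single transaction: allocate the
    /// job object, insert the job-specific catalog row (materialized view, sink, table, index or
    /// source), and record object dependencies, including dependent secrets and connections.
    /// Fails if any dependent table is still being altered by another creating job.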
118 #[await_tree::instrument]
124 pub async fn create_job_catalog(
125 &self,
126 streaming_job: &mut StreamingJob,
127 ctx: &StreamContext,
128 parallelism: &Option<Parallelism>,
129 max_parallelism: usize,
130 mut dependencies: HashSet<ObjectId>,
131 specific_resource_group: Option<String>,
132 ) -> MetaResult<()> {
133 let inner = self.inner.write().await;
134 let txn = inner.db.begin().await?;
135 let create_type = streaming_job.create_type();
136
137 let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
138 (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
139 (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
140 (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
141 };
142
143 ensure_user_id(streaming_job.owner() as _, &txn).await?;
144 ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
145 ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
146 check_relation_name_duplicate(
147 &streaming_job.name(),
148 streaming_job.database_id() as _,
149 streaming_job.schema_id() as _,
150 &txn,
151 )
152 .await?;
153
154 if !dependencies.is_empty() {
156 let altering_cnt = ObjectDependency::find()
157 .join(
158 JoinType::InnerJoin,
159 object_dependency::Relation::Object1.def(),
160 )
161 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
162 .filter(
163 object_dependency::Column::Oid
164 .is_in(dependencies.clone())
165 .and(object::Column::ObjType.eq(ObjectType::Table))
166 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
167 .and(
168 object::Column::Oid.not_in_subquery(
170 Query::select()
171 .column(table::Column::TableId)
172 .from(Table)
173 .to_owned(),
174 ),
175 ),
176 )
177 .count(&txn)
178 .await?;
179 if altering_cnt != 0 {
180 return Err(MetaError::permission_denied(
181 "some dependent relations are being altered",
182 ));
183 }
184 }
185
186 match streaming_job {
187 StreamingJob::MaterializedView(table) => {
188 let job_id = Self::create_streaming_job_obj(
189 &txn,
190 ObjectType::Table,
191 table.owner as _,
192 Some(table.database_id as _),
193 Some(table.schema_id as _),
194 create_type,
195 ctx.timezone.clone(),
196 streaming_parallelism,
197 max_parallelism,
198 specific_resource_group,
199 )
200 .await?;
201 table.id = job_id as _;
202 let table_model: table::ActiveModel = table.clone().into();
203 Table::insert(table_model).exec(&txn).await?;
204 }
205 StreamingJob::Sink(sink) => {
206 if let Some(target_table_id) = sink.target_table
207 && check_sink_into_table_cycle(
208 target_table_id as ObjectId,
209 dependencies.iter().cloned().collect(),
210 &txn,
211 )
212 .await?
213 {
214 bail!("Creating such a sink will result in circular dependency.");
215 }
216
217 let job_id = Self::create_streaming_job_obj(
218 &txn,
219 ObjectType::Sink,
220 sink.owner as _,
221 Some(sink.database_id as _),
222 Some(sink.schema_id as _),
223 create_type,
224 ctx.timezone.clone(),
225 streaming_parallelism,
226 max_parallelism,
227 specific_resource_group,
228 )
229 .await?;
230 sink.id = job_id as _;
231 let sink_model: sink::ActiveModel = sink.clone().into();
232 Sink::insert(sink_model).exec(&txn).await?;
233 }
234 StreamingJob::Table(src, table, _) => {
235 let job_id = Self::create_streaming_job_obj(
236 &txn,
237 ObjectType::Table,
238 table.owner as _,
239 Some(table.database_id as _),
240 Some(table.schema_id as _),
241 create_type,
242 ctx.timezone.clone(),
243 streaming_parallelism,
244 max_parallelism,
245 specific_resource_group,
246 )
247 .await?;
248 table.id = job_id as _;
249 if let Some(src) = src {
250 let src_obj = Self::create_object(
251 &txn,
252 ObjectType::Source,
253 src.owner as _,
254 Some(src.database_id as _),
255 Some(src.schema_id as _),
256 )
257 .await?;
258 src.id = src_obj.oid as _;
259 src.optional_associated_table_id =
260 Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
261 table.optional_associated_source_id = Some(
262 PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
263 );
264 let source: source::ActiveModel = src.clone().into();
265 Source::insert(source).exec(&txn).await?;
266 }
267 let table_model: table::ActiveModel = table.clone().into();
268 Table::insert(table_model).exec(&txn).await?;
269 }
270 StreamingJob::Index(index, table) => {
271 ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
272 let job_id = Self::create_streaming_job_obj(
273 &txn,
274 ObjectType::Index,
275 index.owner as _,
276 Some(index.database_id as _),
277 Some(index.schema_id as _),
278 create_type,
279 ctx.timezone.clone(),
280 streaming_parallelism,
281 max_parallelism,
282 specific_resource_group,
283 )
284 .await?;
285 index.id = job_id as _;
287 index.index_table_id = job_id as _;
288 table.id = job_id as _;
289
290 ObjectDependency::insert(object_dependency::ActiveModel {
291 oid: Set(index.primary_table_id as _),
292 used_by: Set(table.id as _),
293 ..Default::default()
294 })
295 .exec(&txn)
296 .await?;
297
298 let table_model: table::ActiveModel = table.clone().into();
299 Table::insert(table_model).exec(&txn).await?;
300 let index_model: index::ActiveModel = index.clone().into();
301 Index::insert(index_model).exec(&txn).await?;
302 }
303 StreamingJob::Source(src) => {
304 let job_id = Self::create_streaming_job_obj(
305 &txn,
306 ObjectType::Source,
307 src.owner as _,
308 Some(src.database_id as _),
309 Some(src.schema_id as _),
310 create_type,
311 ctx.timezone.clone(),
312 streaming_parallelism,
313 max_parallelism,
314 specific_resource_group,
315 )
316 .await?;
317 src.id = job_id as _;
318 let source_model: source::ActiveModel = src.clone().into();
319 Source::insert(source_model).exec(&txn).await?;
320 }
321 }
322
323 dependencies.extend(
325 streaming_job
326 .dependent_secret_ids()?
327 .into_iter()
328 .map(|secret_id| secret_id as ObjectId),
329 );
330 dependencies.extend(
332 streaming_job
333 .dependent_connection_ids()?
334 .into_iter()
335 .map(|conn_id| conn_id as ObjectId),
336 );
337
338 if !dependencies.is_empty() {
340 ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
341 object_dependency::ActiveModel {
342 oid: Set(oid),
343 used_by: Set(streaming_job.id() as _),
344 ..Default::default()
345 }
346 }))
347 .exec(&txn)
348 .await?;
349 }
350
351 txn.commit().await?;
352
353 Ok(())
354 }
355
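    /// Allocate object ids for the internal (state) tables of a creating job and persist them.
    /// Returns the mapping from the placeholder table ids in the plan to the newly assigned ids.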
356 pub async fn create_internal_table_catalog(
364 &self,
365 job: &StreamingJob,
366 mut incomplete_internal_tables: Vec<PbTable>,
367 ) -> MetaResult<HashMap<u32, u32>> {
368 let job_id = job.id() as ObjectId;
369 let inner = self.inner.write().await;
370 let txn = inner.db.begin().await?;
371
372 ensure_job_not_canceled(job_id, &txn).await?;
374
375 let mut table_id_map = HashMap::new();
376 for table in &mut incomplete_internal_tables {
377 let table_id = Self::create_object(
378 &txn,
379 ObjectType::Table,
380 table.owner as _,
381 Some(table.database_id as _),
382 Some(table.schema_id as _),
383 )
384 .await?
385 .oid;
386 table_id_map.insert(table.id, table_id as u32);
387 table.id = table_id as _;
388 table.job_id = Some(job_id as _);
389
390 let table_model = table::ActiveModel {
391 table_id: Set(table_id as _),
392 belongs_to_job_id: Set(Some(job_id)),
393 fragment_id: NotSet,
394 ..table.clone().into()
395 };
396 Table::insert(table_model).exec(&txn).await?;
397 }
398 txn.commit().await?;
399
400 Ok(table_id_map)
401 }
402
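    /// Convenience wrapper around [`Self::prepare_streaming_job`] that extracts the fragments,
    /// actor status and fragment downstreams from a [`StreamJobFragmentsToCreate`].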
403 pub async fn prepare_stream_job_fragments(
404 &self,
405 stream_job_fragments: &StreamJobFragmentsToCreate,
406 streaming_job: &StreamingJob,
407 for_replace: bool,
408 ) -> MetaResult<()> {
409 self.prepare_streaming_job(
410 stream_job_fragments.stream_job_id().table_id as _,
411 || stream_job_fragments.fragments.values(),
412 &stream_job_fragments.actor_status,
413 &stream_job_fragments.downstreams,
414 for_replace,
415 Some(streaming_job),
416 )
417 .await
418 }
419
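    /// Persist the fragments and actors of a new or replacement streaming job to the meta store.
    /// For non-replace jobs this also binds state tables to their fragments and, when
    /// `StreamingJob::should_notify_creating` returns true, notifies the frontend about the
    /// created catalog objects after the transaction commits.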
420 #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
425 )]
426 pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
427 &self,
428 job_id: ObjectId,
429 get_fragments: impl Fn() -> I + 'a,
430 actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
431 downstreams: &FragmentDownstreamRelation,
432 for_replace: bool,
433 creating_streaming_job: Option<&'a StreamingJob>,
434 ) -> MetaResult<()> {
435 let empty_actor_splits = Default::default();
438
439 let fragment_actors = Self::extract_fragment_and_actors_from_fragments(
440 job_id,
441 get_fragments(),
442 actor_status,
443 &empty_actor_splits,
444 )?;
445 let inner = self.inner.write().await;
446
447 let need_notify = creating_streaming_job
448 .map(|job| job.should_notify_creating())
449 .unwrap_or(false);
450 let definition = creating_streaming_job.map(|job| job.definition());
451
452 let mut objects_to_notify = vec![];
453 let txn = inner.db.begin().await?;
454
455 ensure_job_not_canceled(job_id, &txn).await?;
457
458 let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
460 for fragment in fragments {
461 let fragment_id = fragment.fragment_id;
462 let state_table_ids = fragment.state_table_ids.inner_ref().clone();
463
464 let fragment = fragment.into_active_model();
465 Fragment::insert(fragment).exec(&txn).await?;
466
467 if !for_replace {
470 let all_tables = StreamJobFragments::collect_tables(get_fragments());
471 for state_table_id in state_table_ids {
472 let table = all_tables
476 .get(&(state_table_id as u32))
477 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
478 assert_eq!(table.id, state_table_id as u32);
479 assert_eq!(table.fragment_id, fragment_id as u32);
480 let vnode_count = table.vnode_count();
481
482 table::ActiveModel {
483 table_id: Set(state_table_id as _),
484 fragment_id: Set(Some(fragment_id)),
485 vnode_count: Set(vnode_count as _),
486 ..Default::default()
487 }
488 .update(&txn)
489 .await?;
490
491 if need_notify {
492 let mut table = table.clone();
493 if cfg!(not(debug_assertions)) && table.id == job_id as u32 {
495 table.definition = definition.clone().unwrap();
496 }
497 objects_to_notify.push(PbObject {
498 object_info: Some(PbObjectInfo::Table(table)),
499 });
500 }
501 }
502 }
503 }
504
505 if need_notify {
507 match creating_streaming_job.unwrap() {
508 StreamingJob::Sink(sink) => {
509 objects_to_notify.push(PbObject {
510 object_info: Some(PbObjectInfo::Sink(sink.clone())),
511 });
512 }
513 StreamingJob::Index(index, _) => {
514 objects_to_notify.push(PbObject {
515 object_info: Some(PbObjectInfo::Index(index.clone())),
516 });
517 }
518 _ => {}
519 }
520 }
521
522 insert_fragment_relations(&txn, downstreams).await?;
523
524 for actors in actors {
526 for actor in actors {
527 let actor = actor.into_active_model();
528 Actor::insert(actor).exec(&txn).await?;
529 }
530 }
531
532 if !for_replace {
533 if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
535 Table::update(table::ActiveModel {
536 table_id: Set(table.id as _),
537 dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
538 ..Default::default()
539 })
540 .exec(&txn)
541 .await?;
542 }
543 }
544
545 txn.commit().await?;
546
547 if !objects_to_notify.is_empty() {
551 self.notify_frontend(
552 Operation::Add,
553 Info::ObjectGroup(PbObjectGroup {
554 objects: objects_to_notify,
555 }),
556 )
557 .await;
558 }
559
560 Ok(())
561 }
562
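    /// Build the [`Command::DropStreamingJobs`] used to cancel a creating streaming job,
    /// including the sink-to-target-fragment mapping when the job contains a sink fragment.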
563 pub async fn build_cancel_command(
566 &self,
567 table_fragments: &StreamJobFragments,
568 ) -> MetaResult<Command> {
569 let inner = self.inner.read().await;
570 let txn = inner.db.begin().await?;
571
572 let dropped_sink_fragment_with_target =
573 if let Some(sink_fragment) = table_fragments.sink_fragment() {
574 let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
575 let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
576 sink_target_fragment
577 .get(&sink_fragment_id)
578 .map(|target_fragments| {
579 let target_fragment_id = *target_fragments
580 .first()
581 .expect("sink should have at least one downstream fragment");
582 (sink_fragment_id, target_fragment_id)
583 })
584 } else {
585 None
586 };
587
588 Ok(Command::DropStreamingJobs {
589 streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
590 actors: table_fragments.actor_ids(),
591 unregistered_state_table_ids: table_fragments
592 .all_table_ids()
593 .map(catalog::TableId::new)
594 .collect(),
595 unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
596 dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
597 .into_iter()
598 .map(|(sink, target)| (target as _, vec![sink as _]))
599 .collect(),
600 })
601 }
602
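    /// Abort a creating streaming job and clean up its catalog objects.
    ///
    /// Returns `(true, database_id)` when the job and its internal tables have been removed, or
    /// `(false, database_id)` when the job is a background job still in creating status and is
    /// left for recovery instead of being aborted here.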
603 #[await_tree::instrument]
607 pub async fn try_abort_creating_streaming_job(
608 &self,
609 job_id: ObjectId,
610 is_cancelled: bool,
611 ) -> MetaResult<(bool, Option<DatabaseId>)> {
612 let mut inner = self.inner.write().await;
613 let txn = inner.db.begin().await?;
614
615 let obj = Object::find_by_id(job_id).one(&txn).await?;
616 let Some(obj) = obj else {
617 tracing::warn!(
618 id = job_id,
619 "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
620 );
621 return Ok((true, None));
622 };
623 let database_id = obj
624 .database_id
625 .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
626 let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
627
628 if !is_cancelled && let Some(streaming_job) = &streaming_job {
629 assert_ne!(streaming_job.job_status, JobStatus::Created);
630 if streaming_job.create_type == CreateType::Background
631 && streaming_job.job_status == JobStatus::Creating
632 {
633 tracing::warn!(
635 id = job_id,
636 "streaming job is created in background and still in creating status"
637 );
638 return Ok((false, Some(database_id)));
639 }
640 }
641
642 let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
643
644 let mut objs = vec![];
646 let table_obj = Table::find_by_id(job_id).one(&txn).await?;
647
648 let mut need_notify =
649 streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
650 if !need_notify {
651 if let Some(table) = &table_obj {
653 need_notify = table.table_type == TableType::MaterializedView;
654 }
655 }
656
657 if is_cancelled {
658 let dropped_tables = Table::find()
659 .find_also_related(Object)
660 .filter(
661 table::Column::TableId.is_in(
662 internal_table_ids
663 .iter()
664 .cloned()
665 .chain(table_obj.iter().map(|t| t.table_id as _)),
666 ),
667 )
668 .all(&txn)
669 .await?
670 .into_iter()
671 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
672 inner
673 .dropped_tables
674 .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
675 }
676
677 if need_notify {
678 let obj: Option<PartialObject> = Object::find_by_id(job_id)
679 .select_only()
680 .columns([
681 object::Column::Oid,
682 object::Column::ObjType,
683 object::Column::SchemaId,
684 object::Column::DatabaseId,
685 ])
686 .into_partial_model()
687 .one(&txn)
688 .await?;
689 let obj =
690 obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
691 objs.push(obj);
692 let internal_table_objs: Vec<PartialObject> = Object::find()
693 .select_only()
694 .columns([
695 object::Column::Oid,
696 object::Column::ObjType,
697 object::Column::SchemaId,
698 object::Column::DatabaseId,
699 ])
700 .join(JoinType::InnerJoin, object::Relation::Table.def())
701 .filter(table::Column::BelongsToJobId.eq(job_id))
702 .into_partial_model()
703 .all(&txn)
704 .await?;
705 objs.extend(internal_table_objs);
706 }
707
708 if table_obj.is_none()
710 && let Some(Some(target_table_id)) = Sink::find_by_id(job_id)
711 .select_only()
712 .column(sink::Column::TargetTable)
713 .into_tuple::<Option<TableId>>()
714 .one(&txn)
715 .await?
716 {
717 let tmp_id: Option<ObjectId> = ObjectDependency::find()
718 .select_only()
719 .column(object_dependency::Column::UsedBy)
720 .join(
721 JoinType::InnerJoin,
722 object_dependency::Relation::Object1.def(),
723 )
724 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
725 .filter(
726 object_dependency::Column::Oid
727 .eq(target_table_id)
728 .and(object::Column::ObjType.eq(ObjectType::Table))
729 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
730 )
731 .into_tuple()
732 .one(&txn)
733 .await?;
734 if let Some(tmp_id) = tmp_id {
735 tracing::warn!(
736 id = tmp_id,
737 "aborting temp streaming job for sink into table"
738 );
739 Object::delete_by_id(tmp_id).exec(&txn).await?;
740 }
741 }
742
743 Object::delete_by_id(job_id).exec(&txn).await?;
744 if !internal_table_ids.is_empty() {
745 Object::delete_many()
746 .filter(object::Column::Oid.is_in(internal_table_ids))
747 .exec(&txn)
748 .await?;
749 }
750 if let Some(t) = &table_obj
751 && let Some(source_id) = t.optional_associated_source_id
752 {
753 Object::delete_by_id(source_id).exec(&txn).await?;
754 }
755
756 let err = if is_cancelled {
757 MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
758 } else {
759 MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
760 };
761 let abort_reason = format!("streaming job aborted {}", err.as_report());
762 for tx in inner
763 .creating_table_finish_notifier
764 .get_mut(&database_id)
765 .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
766 .into_iter()
767 .flatten()
768 .flatten()
769 {
770 let _ = tx.send(Err(abort_reason.clone()));
771 }
772 txn.commit().await?;
773
774 if !objs.is_empty() {
775 self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
778 .await;
779 }
780 Ok((true, Some(database_id)))
781 }
782
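    /// Mark the actors of a collected job as running, persist their split assignments, record
    /// newly added fragment downstream relations, and move the job to `JobStatus::Creating`.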
783 #[await_tree::instrument]
784 pub async fn post_collect_job_fragments(
785 &self,
786 job_id: ObjectId,
787 actor_ids: Vec<crate::model::ActorId>,
788 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
789 split_assignment: &SplitAssignment,
790 new_sink_downstream: Option<FragmentDownstreamRelation>,
791 ) -> MetaResult<()> {
792 let inner = self.inner.write().await;
793 let txn = inner.db.begin().await?;
794
795 let actor_ids = actor_ids.into_iter().map(|id| id as ActorId).collect_vec();
796
797 Actor::update_many()
798 .col_expr(
799 actor::Column::Status,
800 SimpleExpr::from(ActorStatus::Running.into_value()),
801 )
802 .filter(actor::Column::ActorId.is_in(actor_ids))
803 .exec(&txn)
804 .await?;
805
806 for splits in split_assignment.values() {
807 for (actor_id, splits) in splits {
808 let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
809 let connector_splits = &PbConnectorSplits { splits };
810 actor::ActiveModel {
811 actor_id: Set(*actor_id as _),
812 splits: Set(Some(connector_splits.into())),
813 ..Default::default()
814 }
815 .update(&txn)
816 .await?;
817 }
818 }
819
820 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
821
822 if let Some(new_downstream) = new_sink_downstream {
823 insert_fragment_relations(&txn, &new_downstream).await?;
824 }
825
826 streaming_job::ActiveModel {
828 job_id: Set(job_id),
829 job_status: Set(JobStatus::Creating),
830 ..Default::default()
831 }
832 .update(&txn)
833 .await?;
834
835 txn.commit().await?;
836
837 Ok(())
838 }
839
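    /// Create a temporary job object used to replace an existing streaming job (e.g. for schema
    /// changes), reusing the original job's max parallelism and timezone unless overridden.
    /// Fails if the original job is being altered or referenced by other creating jobs.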
840 pub async fn create_job_catalog_for_replace(
841 &self,
842 streaming_job: &StreamingJob,
843 ctx: Option<&StreamContext>,
844 specified_parallelism: Option<&NonZeroUsize>,
845 expected_original_max_parallelism: Option<usize>,
846 ) -> MetaResult<ObjectId> {
847 let id = streaming_job.id();
848 let inner = self.inner.write().await;
849 let txn = inner.db.begin().await?;
850
851 streaming_job.verify_version_for_replace(&txn).await?;
853 let referring_cnt = ObjectDependency::find()
855 .join(
856 JoinType::InnerJoin,
857 object_dependency::Relation::Object1.def(),
858 )
859 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
860 .filter(
861 object_dependency::Column::Oid
862 .eq(id as ObjectId)
863 .and(object::Column::ObjType.eq(ObjectType::Table))
864 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
865 )
866 .count(&txn)
867 .await?;
868 if referring_cnt != 0 {
869 return Err(MetaError::permission_denied(
870 "job is being altered or referenced by some creating jobs",
871 ));
872 }
873
874 let (original_max_parallelism, original_timezone): (i32, Option<String>) =
876 StreamingJobModel::find_by_id(id as ObjectId)
877 .select_only()
878 .column(streaming_job::Column::MaxParallelism)
879 .column(streaming_job::Column::Timezone)
880 .into_tuple()
881 .one(&txn)
882 .await?
883 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
884
885 if let Some(max_parallelism) = expected_original_max_parallelism
886 && original_max_parallelism != max_parallelism as i32
887 {
888 bail!(
891 "cannot use a different max parallelism \
892 when replacing streaming job, \
893 original: {}, new: {}",
894 original_max_parallelism,
895 max_parallelism
896 );
897 }
898
899 let parallelism = match specified_parallelism {
900 None => StreamingParallelism::Adaptive,
901 Some(n) => StreamingParallelism::Fixed(n.get() as _),
902 };
903 let timezone = ctx
904 .map(|ctx| ctx.timezone.clone())
905 .unwrap_or(original_timezone);
906
907 let new_obj_id = Self::create_streaming_job_obj(
909 &txn,
910 streaming_job.object_type(),
911 streaming_job.owner() as _,
912 Some(streaming_job.database_id() as _),
913 Some(streaming_job.schema_id() as _),
914 streaming_job.create_type(),
915 timezone,
916 parallelism,
917 original_max_parallelism as _,
918 None,
919 )
920 .await?;
921
922 ObjectDependency::insert(object_dependency::ActiveModel {
924 oid: Set(id as _),
925 used_by: Set(new_obj_id as _),
926 ..Default::default()
927 })
928 .exec(&txn)
929 .await?;
930
931 txn.commit().await?;
932
933 Ok(new_obj_id)
934 }
935
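    /// Mark a streaming job as `Created`: stamp its creation time, flip the job status, notify
    /// the frontend about the finished catalog objects, propagate `SELECT` privileges from the
    /// primary table to index state tables (for index jobs) or grant default privileges (for
    /// other job types), and wake up waiters in `creating_table_finish_notifier`.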
936 pub async fn finish_streaming_job(&self, job_id: ObjectId) -> MetaResult<()> {
938 let mut inner = self.inner.write().await;
939 let txn = inner.db.begin().await?;
940
941 let job_type = Object::find_by_id(job_id)
942 .select_only()
943 .column(object::Column::ObjType)
944 .into_tuple()
945 .one(&txn)
946 .await?
947 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
948
949 let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
950 .select_only()
951 .column(streaming_job::Column::CreateType)
952 .into_tuple()
953 .one(&txn)
954 .await?
955 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
956
957 let res = Object::update_many()
959 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
960 .col_expr(
961 object::Column::CreatedAtClusterVersion,
962 current_cluster_version().into(),
963 )
964 .filter(object::Column::Oid.eq(job_id))
965 .exec(&txn)
966 .await?;
967 if res.rows_affected == 0 {
968 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
969 }
970
971 let job = streaming_job::ActiveModel {
973 job_id: Set(job_id),
974 job_status: Set(JobStatus::Created),
975 ..Default::default()
976 };
977 job.update(&txn).await?;
978
979 let internal_table_objs = Table::find()
981 .find_also_related(Object)
982 .filter(table::Column::BelongsToJobId.eq(job_id))
983 .all(&txn)
984 .await?;
985 let mut objects = internal_table_objs
986 .iter()
987 .map(|(table, obj)| PbObject {
988 object_info: Some(PbObjectInfo::Table(
989 ObjectModel(table.clone(), obj.clone().unwrap()).into(),
990 )),
991 })
992 .collect_vec();
993 let mut notification_op = if create_type == CreateType::Background {
994 NotificationOperation::Update
995 } else {
996 NotificationOperation::Add
997 };
998 let mut updated_user_info = vec![];
999
1000 match job_type {
1001 ObjectType::Table => {
1002 let (table, obj) = Table::find_by_id(job_id)
1003 .find_also_related(Object)
1004 .one(&txn)
1005 .await?
1006 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1007 if table.table_type == TableType::MaterializedView {
1008 notification_op = NotificationOperation::Update;
1009 }
1010
1011 if let Some(source_id) = table.optional_associated_source_id {
1012 let (src, obj) = Source::find_by_id(source_id)
1013 .find_also_related(Object)
1014 .one(&txn)
1015 .await?
1016 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1017 objects.push(PbObject {
1018 object_info: Some(PbObjectInfo::Source(
1019 ObjectModel(src, obj.unwrap()).into(),
1020 )),
1021 });
1022 }
1023 objects.push(PbObject {
1024 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1025 });
1026 }
1027 ObjectType::Sink => {
1028 let (sink, obj) = Sink::find_by_id(job_id)
1029 .find_also_related(Object)
1030 .one(&txn)
1031 .await?
1032 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1033 objects.push(PbObject {
1034 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
1035 });
1036 }
1037 ObjectType::Index => {
1038 let (index, obj) = Index::find_by_id(job_id)
1039 .find_also_related(Object)
1040 .one(&txn)
1041 .await?
1042 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1043 {
1044 let (table, obj) = Table::find_by_id(index.index_table_id)
1045 .find_also_related(Object)
1046 .one(&txn)
1047 .await?
1048 .ok_or_else(|| {
1049 MetaError::catalog_id_not_found("table", index.index_table_id)
1050 })?;
1051 objects.push(PbObject {
1052 object_info: Some(PbObjectInfo::Table(
1053 ObjectModel(table, obj.unwrap()).into(),
1054 )),
1055 });
1056 }
1057
1058 let primary_table_privileges = UserPrivilege::find()
1061 .filter(
1062 user_privilege::Column::Oid
1063 .eq(index.primary_table_id)
1064 .and(user_privilege::Column::Action.eq(Action::Select)),
1065 )
1066 .all(&txn)
1067 .await?;
1068 if !primary_table_privileges.is_empty() {
1069 let index_state_table_ids: Vec<TableId> = Table::find()
1070 .select_only()
1071 .column(table::Column::TableId)
1072 .filter(
1073 table::Column::BelongsToJobId
1074 .eq(job_id)
1075 .or(table::Column::TableId.eq(index.index_table_id)),
1076 )
1077 .into_tuple()
1078 .all(&txn)
1079 .await?;
1080 let mut new_privileges = vec![];
1081 for privilege in &primary_table_privileges {
1082 for state_table_id in &index_state_table_ids {
1083 new_privileges.push(user_privilege::ActiveModel {
1084 id: Default::default(),
1085 oid: Set(*state_table_id),
1086 user_id: Set(privilege.user_id),
1087 action: Set(Action::Select),
1088 dependent_id: Set(privilege.dependent_id),
1089 granted_by: Set(privilege.granted_by),
1090 with_grant_option: Set(privilege.with_grant_option),
1091 });
1092 }
1093 }
1094 UserPrivilege::insert_many(new_privileges)
1095 .exec(&txn)
1096 .await?;
1097
1098 updated_user_info = list_user_info_by_ids(
1099 primary_table_privileges.into_iter().map(|p| p.user_id),
1100 &txn,
1101 )
1102 .await?;
1103 }
1104
1105 objects.push(PbObject {
1106 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1107 });
1108 }
1109 ObjectType::Source => {
1110 let (source, obj) = Source::find_by_id(job_id)
1111 .find_also_related(Object)
1112 .one(&txn)
1113 .await?
1114 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1115 objects.push(PbObject {
1116 object_info: Some(PbObjectInfo::Source(
1117 ObjectModel(source, obj.unwrap()).into(),
1118 )),
1119 });
1120 }
1121 _ => unreachable!("invalid job type: {:?}", job_type),
1122 }
1123
1124 if job_type != ObjectType::Index {
1125 updated_user_info = grant_default_privileges_automatically(&txn, job_id).await?;
1126 }
1127 txn.commit().await?;
1128
1129 let mut version = self
1130 .notify_frontend(
1131 notification_op,
1132 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1133 )
1134 .await;
1135
1136 if !updated_user_info.is_empty() {
1138 version = self.notify_users_update(updated_user_info).await;
1139 }
1140
1141 inner
1142 .creating_table_finish_notifier
1143 .values_mut()
1144 .for_each(|creating_tables| {
1145 if let Some(txs) = creating_tables.remove(&job_id) {
1146 for tx in txs {
1147 let _ = tx.send(Ok(version));
1148 }
1149 }
1150 });
1151
1152 Ok(())
1153 }
1154
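    /// Finalize a replace-streaming-job operation: swap in the temporary fragments, update the
    /// affected catalogs, and notify the frontend about updated objects (and deleted ones when a
    /// table's connector is dropped). Returns the resulting notification version.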
1155 pub async fn finish_replace_streaming_job(
1156 &self,
1157 tmp_id: ObjectId,
1158 streaming_job: StreamingJob,
1159 replace_upstream: FragmentReplaceUpstream,
1160 sink_into_table_context: SinkIntoTableContext,
1161 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1162 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1163 ) -> MetaResult<NotificationVersion> {
1164 let inner = self.inner.write().await;
1165 let txn = inner.db.begin().await?;
1166
1167 let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1168 tmp_id,
1169 replace_upstream,
1170 sink_into_table_context,
1171 &txn,
1172 streaming_job,
1173 drop_table_connector_ctx,
1174 auto_refresh_schema_sinks,
1175 )
1176 .await?;
1177
1178 txn.commit().await?;
1179
1180 let mut version = self
1181 .notify_frontend(
1182 NotificationOperation::Update,
1183 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1184 )
1185 .await;
1186
1187 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1188 self.notify_users_update(user_infos).await;
1189 version = self
1190 .notify_frontend(
1191 NotificationOperation::Delete,
1192 build_object_group_for_delete(to_drop_objects),
1193 )
1194 .await;
1195 }
1196
1197 Ok(version)
1198 }
1199
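    /// Transactional body of [`Self::finish_replace_streaming_job`]: updates the job catalog,
    /// re-points state tables and merge upstreams to the new fragments, rewrites index items
    /// when table columns change, and handles auto-refresh-schema sinks.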
1200 pub async fn finish_replace_streaming_job_inner(
1201 tmp_id: ObjectId,
1202 replace_upstream: FragmentReplaceUpstream,
1203 SinkIntoTableContext {
1204 updated_sink_catalogs,
1205 }: SinkIntoTableContext,
1206 txn: &DatabaseTransaction,
1207 streaming_job: StreamingJob,
1208 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1209 auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1210 ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1211 let original_job_id = streaming_job.id() as ObjectId;
1212 let job_type = streaming_job.job_type();
1213
1214 let mut index_item_rewriter = None;
1215
1216 match streaming_job {
1218 StreamingJob::Table(_source, table, _table_job_type) => {
1219 let original_column_catalogs = get_table_columns(txn, original_job_id).await?;
1222
1223 index_item_rewriter = Some({
1224 let original_columns = original_column_catalogs
1225 .to_protobuf()
1226 .into_iter()
1227 .map(|c| c.column_desc.unwrap())
1228 .collect_vec();
1229 let new_columns = table
1230 .columns
1231 .iter()
1232 .map(|c| c.column_desc.clone().unwrap())
1233 .collect_vec();
1234
1235 IndexItemRewriter {
1236 original_columns,
1237 new_columns,
1238 }
1239 });
1240
1241 for sink_id in updated_sink_catalogs {
1243 sink::ActiveModel {
1244 sink_id: Set(sink_id as _),
1245 original_target_columns: Set(Some(original_column_catalogs.clone())),
1246 ..Default::default()
1247 }
1248 .update(txn)
1249 .await?;
1250 }
1251 let mut table = table::ActiveModel::from(table);
1253 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1254 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1255 {
1256 table.optional_associated_source_id = Set(None);
1258 }
1259
1260 table.update(txn).await?;
1261 }
1262 StreamingJob::Source(source) => {
1263 let source = source::ActiveModel::from(source);
1265 source.update(txn).await?;
1266 }
1267 StreamingJob::MaterializedView(table) => {
1268 let table = table::ActiveModel::from(table);
1270 table.update(txn).await?;
1271 }
1272 _ => unreachable!(
1273 "invalid streaming job type: {:?}",
1274 streaming_job.job_type_str()
1275 ),
1276 }
1277
1278 async fn finish_fragments(
1279 txn: &DatabaseTransaction,
1280 tmp_id: ObjectId,
1281 original_job_id: ObjectId,
1282 replace_upstream: FragmentReplaceUpstream,
1283 ) -> MetaResult<()> {
1284 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1288 .select_only()
1289 .columns([
1290 fragment::Column::FragmentId,
1291 fragment::Column::StateTableIds,
1292 ])
1293 .filter(fragment::Column::JobId.eq(tmp_id))
1294 .into_tuple()
1295 .all(txn)
1296 .await?;
1297 for (fragment_id, state_table_ids) in fragment_info {
1298 for state_table_id in state_table_ids.into_inner() {
1299 table::ActiveModel {
1300 table_id: Set(state_table_id as _),
1301 fragment_id: Set(Some(fragment_id)),
1302 ..Default::default()
1304 }
1305 .update(txn)
1306 .await?;
1307 }
1308 }
1309
1310 Fragment::delete_many()
1312 .filter(fragment::Column::JobId.eq(original_job_id))
1313 .exec(txn)
1314 .await?;
1315 Fragment::update_many()
1316 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1317 .filter(fragment::Column::JobId.eq(tmp_id))
1318 .exec(txn)
1319 .await?;
1320
1321 for (fragment_id, fragment_replace_map) in replace_upstream {
1324 let (fragment_id, mut stream_node) =
1325 Fragment::find_by_id(fragment_id as FragmentId)
1326 .select_only()
1327 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1328 .into_tuple::<(FragmentId, StreamNode)>()
1329 .one(txn)
1330 .await?
1331 .map(|(id, node)| (id, node.to_protobuf()))
1332 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1333
1334 visit_stream_node_mut(&mut stream_node, |body| {
1335 if let PbNodeBody::Merge(m) = body
1336 && let Some(new_fragment_id) =
1337 fragment_replace_map.get(&m.upstream_fragment_id)
1338 {
1339 m.upstream_fragment_id = *new_fragment_id;
1340 }
1341 });
1342 fragment::ActiveModel {
1343 fragment_id: Set(fragment_id),
1344 stream_node: Set(StreamNode::from(&stream_node)),
1345 ..Default::default()
1346 }
1347 .update(txn)
1348 .await?;
1349 }
1350
1351 Object::delete_by_id(tmp_id).exec(txn).await?;
1353
1354 Ok(())
1355 }
1356
1357 finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1358
1359 let mut objects = vec![];
1361 match job_type {
1362 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1363 let (table, table_obj) = Table::find_by_id(original_job_id)
1364 .find_also_related(Object)
1365 .one(txn)
1366 .await?
1367 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1368 objects.push(PbObject {
1369 object_info: Some(PbObjectInfo::Table(
1370 ObjectModel(table, table_obj.unwrap()).into(),
1371 )),
1372 })
1373 }
1374 StreamingJobType::Source => {
1375 let (source, source_obj) = Source::find_by_id(original_job_id)
1376 .find_also_related(Object)
1377 .one(txn)
1378 .await?
1379 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1380 objects.push(PbObject {
1381 object_info: Some(PbObjectInfo::Source(
1382 ObjectModel(source, source_obj.unwrap()).into(),
1383 )),
1384 })
1385 }
1386 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1387 }
1388
1389 if let Some(expr_rewriter) = index_item_rewriter {
1390 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1391 .select_only()
1392 .columns([index::Column::IndexId, index::Column::IndexItems])
1393 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1394 .into_tuple()
1395 .all(txn)
1396 .await?;
1397 for (index_id, nodes) in index_items {
1398 let mut pb_nodes = nodes.to_protobuf();
1399 pb_nodes
1400 .iter_mut()
1401 .for_each(|x| expr_rewriter.rewrite_expr(x));
1402 let index = index::ActiveModel {
1403 index_id: Set(index_id),
1404 index_items: Set(pb_nodes.into()),
1405 ..Default::default()
1406 }
1407 .update(txn)
1408 .await?;
1409 let index_obj = index
1410 .find_related(Object)
1411 .one(txn)
1412 .await?
1413 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1414 objects.push(PbObject {
1415 object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1416 });
1417 }
1418 }
1419
1420 if let Some(sinks) = auto_refresh_schema_sinks {
1421 for finish_sink_context in sinks {
1422 finish_fragments(
1423 txn,
1424 finish_sink_context.tmp_sink_id,
1425 finish_sink_context.original_sink_id,
1426 Default::default(),
1427 )
1428 .await?;
1429 let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1430 .find_also_related(Object)
1431 .one(txn)
1432 .await?
1433 .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1434 let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1435 Sink::update(sink::ActiveModel {
1436 sink_id: Set(finish_sink_context.original_sink_id),
1437 columns: Set(columns.clone()),
1438 ..Default::default()
1439 })
1440 .exec(txn)
1441 .await?;
1442 sink.columns = columns;
1443 objects.push(PbObject {
1444 object_info: Some(PbObjectInfo::Sink(
1445 ObjectModel(sink, sink_obj.unwrap()).into(),
1446 )),
1447 });
1448 if let Some((log_store_table_id, new_log_store_table_columns)) =
1449 finish_sink_context.new_log_store_table
1450 {
1451 let new_log_store_table_columns: ColumnCatalogArray =
1452 new_log_store_table_columns.into();
1453 let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1454 .find_also_related(Object)
1455 .one(txn)
1456 .await?
1457 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1458 Table::update(table::ActiveModel {
1459 table_id: Set(log_store_table_id),
1460 columns: Set(new_log_store_table_columns.clone()),
1461 ..Default::default()
1462 })
1463 .exec(txn)
1464 .await?;
1465 table.columns = new_log_store_table_columns;
1466 objects.push(PbObject {
1467 object_info: Some(PbObjectInfo::Table(
1468 ObjectModel(table, table_obj.unwrap()).into(),
1469 )),
1470 });
1471 }
1472 }
1473 }
1474
1475 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1476 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1477 notification_objs =
1478 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1479 }
1480
1481 Ok((objects, notification_objs))
1482 }
1483
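    /// Abort a replace-streaming-job operation by deleting the temporary job object and any
    /// temporary sink objects.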
1484 pub async fn try_abort_replacing_streaming_job(
1486 &self,
1487 tmp_job_id: ObjectId,
1488 tmp_sink_ids: Option<Vec<ObjectId>>,
1489 ) -> MetaResult<()> {
1490 let inner = self.inner.write().await;
1491 let txn = inner.db.begin().await?;
1492 Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1493 if let Some(tmp_sink_ids) = tmp_sink_ids {
1494 for tmp_sink_id in tmp_sink_ids {
1495 Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1496 }
1497 }
1498 txn.commit().await?;
1499 Ok(())
1500 }
1501
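    /// Update the rate limit of a source and push the new value into the source (and fs-fetch)
    /// stream nodes of every fragment that reads from it. Returns the affected fragment ids with
    /// their actor ids.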
1502 pub async fn update_source_rate_limit_by_source_id(
1505 &self,
1506 source_id: SourceId,
1507 rate_limit: Option<u32>,
1508 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1509 let inner = self.inner.read().await;
1510 let txn = inner.db.begin().await?;
1511
1512 {
1513 let active_source = source::ActiveModel {
1514 source_id: Set(source_id),
1515 rate_limit: Set(rate_limit.map(|v| v as i32)),
1516 ..Default::default()
1517 };
1518 active_source.update(&txn).await?;
1519 }
1520
1521 let (source, obj) = Source::find_by_id(source_id)
1522 .find_also_related(Object)
1523 .one(&txn)
1524 .await?
1525 .ok_or_else(|| {
1526 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1527 })?;
1528
1529 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1530 let streaming_job_ids: Vec<ObjectId> =
1531 if let Some(table_id) = source.optional_associated_table_id {
1532 vec![table_id]
1533 } else if let Some(source_info) = &source.source_info
1534 && source_info.to_protobuf().is_shared()
1535 {
1536 vec![source_id]
1537 } else {
1538 ObjectDependency::find()
1539 .select_only()
1540 .column(object_dependency::Column::UsedBy)
1541 .filter(object_dependency::Column::Oid.eq(source_id))
1542 .into_tuple()
1543 .all(&txn)
1544 .await?
1545 };
1546
1547 if streaming_job_ids.is_empty() {
1548 return Err(MetaError::invalid_parameter(format!(
1549 "source id {source_id} not used by any streaming job"
1550 )));
1551 }
1552
1553 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1554 .select_only()
1555 .columns([
1556 fragment::Column::FragmentId,
1557 fragment::Column::FragmentTypeMask,
1558 fragment::Column::StreamNode,
1559 ])
1560 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1561 .into_tuple()
1562 .all(&txn)
1563 .await?;
1564 let mut fragments = fragments
1565 .into_iter()
1566 .map(|(id, mask, stream_node)| {
1567 (
1568 id,
1569 FragmentTypeMask::from(mask as u32),
1570 stream_node.to_protobuf(),
1571 )
1572 })
1573 .collect_vec();
1574
1575 fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1576 let mut found = false;
1577 if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1578 visit_stream_node_mut(stream_node, |node| {
1579 if let PbNodeBody::Source(node) = node
1580 && let Some(node_inner) = &mut node.source_inner
1581 && node_inner.source_id == source_id as u32
1582 {
1583 node_inner.rate_limit = rate_limit;
1584 found = true;
1585 }
1586 });
1587 }
1588 if is_fs_source {
1589 visit_stream_node_mut(stream_node, |node| {
1592 if let PbNodeBody::StreamFsFetch(node) = node {
1593 fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1594 if let Some(node_inner) = &mut node.node_inner
1595 && node_inner.source_id == source_id as u32
1596 {
1597 node_inner.rate_limit = rate_limit;
1598 found = true;
1599 }
1600 }
1601 });
1602 }
1603 found
1604 });
1605
1606 assert!(
1607 !fragments.is_empty(),
1608 "source id should be used by at least one fragment"
1609 );
1610 let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1611 for (id, fragment_type_mask, stream_node) in fragments {
1612 fragment::ActiveModel {
1613 fragment_id: Set(id),
1614 fragment_type_mask: Set(fragment_type_mask.into()),
1615 stream_node: Set(StreamNode::from(&stream_node)),
1616 ..Default::default()
1617 }
1618 .update(&txn)
1619 .await?;
1620 }
1621 let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1622
1623 txn.commit().await?;
1624
1625 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1626 let _version = self
1627 .notify_frontend(
1628 NotificationOperation::Update,
1629 NotificationInfo::ObjectGroup(PbObjectGroup {
1630 objects: vec![PbObject {
1631 object_info: Some(relation_info),
1632 }],
1633 }),
1634 )
1635 .await;
1636
1637 Ok(fragment_actors)
1638 }
1639
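    /// Apply `fragments_mutation_fn` to the stream node of every fragment of the given job and
    /// persist the fragments for which it returns `Ok(true)`. Returns the affected fragment ids
    /// with their actor ids, or an error built from `err_msg` if no fragment was mutated.
    ///
    /// A minimal sketch of a caller, assuming a `CatalogController` handle named `controller`
    /// (the rate-limit updaters below are real callers):
    ///
    /// ```ignore
    /// let affected = controller
    ///     .mutate_fragments_by_job_id(
    ///         job_id,
    ///         |mask, node| {
    ///             let mut found = false;
    ///             // rewrite `node` here when `mask` matches, setting `found = true`
    ///             Ok(found)
    ///         },
    ///         "no matching fragment",
    ///     )
    ///     .await?;
    /// ```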
1640 pub async fn mutate_fragments_by_job_id(
1643 &self,
1644 job_id: ObjectId,
1645 mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1647 err_msg: &'static str,
1649 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1650 let inner = self.inner.read().await;
1651 let txn = inner.db.begin().await?;
1652
1653 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1654 .select_only()
1655 .columns([
1656 fragment::Column::FragmentId,
1657 fragment::Column::FragmentTypeMask,
1658 fragment::Column::StreamNode,
1659 ])
1660 .filter(fragment::Column::JobId.eq(job_id))
1661 .into_tuple()
1662 .all(&txn)
1663 .await?;
1664 let mut fragments = fragments
1665 .into_iter()
1666 .map(|(id, mask, stream_node)| {
1667 (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1668 })
1669 .collect_vec();
1670
1671 let fragments = fragments
1672 .iter_mut()
1673 .map(|(_, fragment_type_mask, stream_node)| {
1674 fragments_mutation_fn(*fragment_type_mask, stream_node)
1675 })
1676 .collect::<MetaResult<Vec<bool>>>()?
1677 .into_iter()
1678 .zip_eq_debug(std::mem::take(&mut fragments))
1679 .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1680 .collect::<Vec<_>>();
1681
1682 if fragments.is_empty() {
1683 return Err(MetaError::invalid_parameter(format!(
1684 "job id {job_id}: {}",
1685 err_msg
1686 )));
1687 }
1688
1689 let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1690 for (id, _, stream_node) in fragments {
1691 fragment::ActiveModel {
1692 fragment_id: Set(id),
1693 stream_node: Set(StreamNode::from(&stream_node)),
1694 ..Default::default()
1695 }
1696 .update(&txn)
1697 .await?;
1698 }
1699 let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1700
1701 txn.commit().await?;
1702
1703 Ok(fragment_actors)
1704 }
1705
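    /// Single-fragment counterpart of [`Self::mutate_fragments_by_job_id`]: mutate one fragment's
    /// stream node in place and persist it, erroring with `err_msg` if the mutation does not apply.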
1706 async fn mutate_fragment_by_fragment_id(
1707 &self,
1708 fragment_id: FragmentId,
1709 mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1710 err_msg: &'static str,
1711 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1712 let inner = self.inner.read().await;
1713 let txn = inner.db.begin().await?;
1714
1715 let (fragment_type_mask, stream_node): (i32, StreamNode) =
1716 Fragment::find_by_id(fragment_id)
1717 .select_only()
1718 .columns([
1719 fragment::Column::FragmentTypeMask,
1720 fragment::Column::StreamNode,
1721 ])
1722 .into_tuple()
1723 .one(&txn)
1724 .await?
1725 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1726 let mut pb_stream_node = stream_node.to_protobuf();
1727 let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1728
1729 if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1730 return Err(MetaError::invalid_parameter(format!(
1731 "fragment id {fragment_id}: {}",
1732 err_msg
1733 )));
1734 }
1735
        fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            // Persist the mutated protobuf node, not the original one fetched above.
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        }
        .update(&txn)
        .await?;
1743
1744 let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;
1745
1746 txn.commit().await?;
1747
1748 Ok(fragment_actors)
1749 }
1750
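    /// Update the backfill rate limit of the backfill-capable nodes (stream scan, CDC scan,
    /// source backfill and sink) in the fragments of the given job.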
1751 pub async fn update_backfill_rate_limit_by_job_id(
1754 &self,
1755 job_id: ObjectId,
1756 rate_limit: Option<u32>,
1757 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1758 let update_backfill_rate_limit =
1759 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1760 let mut found = false;
1761 if fragment_type_mask
1762 .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1763 {
1764 visit_stream_node_mut(stream_node, |node| match node {
1765 PbNodeBody::StreamCdcScan(node) => {
1766 node.rate_limit = rate_limit;
1767 found = true;
1768 }
1769 PbNodeBody::StreamScan(node) => {
1770 node.rate_limit = rate_limit;
1771 found = true;
1772 }
1773 PbNodeBody::SourceBackfill(node) => {
1774 node.rate_limit = rate_limit;
1775 found = true;
1776 }
1777 PbNodeBody::Sink(node) => {
1778 node.rate_limit = rate_limit;
1779 found = true;
1780 }
1781 _ => {}
1782 });
1783 }
1784 Ok(found)
1785 };
1786
1787 self.mutate_fragments_by_job_id(
1788 job_id,
1789 update_backfill_rate_limit,
1790 "stream scan node or source node not found",
1791 )
1792 .await
1793 }
1794
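    /// Update the rate limit of a job's sink node. Only sinks backed by a kv log store (i.e.
    /// created with sink decoupling enabled) support a rate limit; otherwise an error is returned.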
1795 pub async fn update_sink_rate_limit_by_job_id(
1798 &self,
1799 job_id: ObjectId,
1800 rate_limit: Option<u32>,
1801 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1802 let update_sink_rate_limit =
1803 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1804 let mut found = Ok(false);
1805 if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1806 visit_stream_node_mut(stream_node, |node| {
1807 if let PbNodeBody::Sink(node) = node {
1808 if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1809 found = Err(MetaError::invalid_parameter(
1810 "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1811 ));
1812 return;
1813 }
1814 node.rate_limit = rate_limit;
1815 found = Ok(true);
1816 }
1817 });
1818 }
1819 found
1820 };
1821
1822 self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
1823 .await
1824 }
1825
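    /// Update the rate limit of the DML node of a table job.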
1826 pub async fn update_dml_rate_limit_by_job_id(
1827 &self,
1828 job_id: ObjectId,
1829 rate_limit: Option<u32>,
1830 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1831 let update_dml_rate_limit =
1832 |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1833 let mut found = false;
1834 if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1835 visit_stream_node_mut(stream_node, |node| {
1836 if let PbNodeBody::Dml(node) = node {
1837 node.rate_limit = rate_limit;
1838 found = true;
1839 }
1840 });
1841 }
1842 Ok(found)
1843 };
1844
1845 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1846 .await
1847 }
1848
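    /// Apply on-the-fly property changes (e.g. from `ALTER SOURCE`) to a source: validate that
    /// the changed fields may be altered without rebuilding the job, update secret dependencies,
    /// rewrite the stored `CREATE SOURCE`/`CREATE TABLE` definition, and push the resolved
    /// properties into the source stream nodes of the affected fragments. Returns the resolved
    /// options together with their secret references.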
1849 pub async fn update_source_props_by_source_id(
1850 &self,
1851 source_id: SourceId,
1852 alter_props: BTreeMap<String, String>,
1853 alter_secret_refs: BTreeMap<String, PbSecretRef>,
1854 ) -> MetaResult<WithOptionsSecResolved> {
1855 let inner = self.inner.read().await;
1856 let txn = inner.db.begin().await?;
1857
1858 let (source, _obj) = Source::find_by_id(source_id)
1859 .find_also_related(Object)
1860 .one(&txn)
1861 .await?
1862 .ok_or_else(|| {
1863 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1864 })?;
1865 let connector = source.with_properties.0.get_connector().unwrap();
1866 let is_shared_source = source.is_shared();
1867
1868 let mut dep_source_job_ids: Vec<ObjectId> = Vec::new();
1869 if !is_shared_source {
1870 dep_source_job_ids = ObjectDependency::find()
1872 .select_only()
1873 .column(object_dependency::Column::UsedBy)
1874 .filter(object_dependency::Column::Oid.eq(source_id))
1875 .into_tuple()
1876 .all(&txn)
1877 .await?;
1878 }
1879
1880 let prop_keys: Vec<String> = alter_props
1882 .keys()
1883 .chain(alter_secret_refs.keys())
1884 .cloned()
1885 .collect();
1886 risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
1887 &connector, &prop_keys,
1888 )?;
1889
1890 let mut options_with_secret = WithOptionsSecResolved::new(
1891 source.with_properties.0.clone(),
1892 source
1893 .secret_ref
1894 .map(|secret_ref| secret_ref.to_protobuf())
1895 .unwrap_or_default(),
1896 );
1897 let (to_add_secret_dep, to_remove_secret_dep) =
1898 options_with_secret.handle_update(alter_props, alter_secret_refs)?;
1899
1900 tracing::info!(
1901 "applying new properties to source: source_id={}, options_with_secret={:?}",
1902 source_id,
1903 options_with_secret
1904 );
1905 let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
1907 let mut associate_table_id = None;
1910
1911 let mut preferred_id: i32 = source_id;
1915 let rewrite_sql = {
1916 let definition = source.definition.clone();
1917
1918 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
1919 .map_err(|e| {
1920 MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
1921 anyhow!(e).context("Failed to parse source definition SQL"),
1922 )))
1923 })?
1924 .try_into()
1925 .unwrap();
1926
1927 async fn format_with_option_secret_resolved(
1941 txn: &DatabaseTransaction,
1942 options_with_secret: &WithOptionsSecResolved,
1943 ) -> MetaResult<Vec<SqlOption>> {
1944 let mut options = Vec::new();
1945 for (k, v) in options_with_secret.as_plaintext() {
1946 let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
1947 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1948 options.push(sql_option);
1949 }
1950 for (k, v) in options_with_secret.as_secret() {
1951 if let Some(secret_model) =
1952 Secret::find_by_id(v.secret_id as i32).one(txn).await?
1953 {
1954 let sql_option =
1955 SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
1956 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1957 options.push(sql_option);
1958 } else {
1959 return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
1960 }
1961 }
1962 Ok(options)
1963 }
1964
1965 match &mut stmt {
1966 Statement::CreateSource { stmt } => {
1967 stmt.with_properties.0 =
1968 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1969 }
1970 Statement::CreateTable { with_options, .. } => {
1971 *with_options =
1972 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1973 associate_table_id = source.optional_associated_table_id;
1974 preferred_id = associate_table_id.unwrap();
1975 }
1976 _ => unreachable!(),
1977 }
1978
1979 stmt.to_string()
1980 };
1981
1982 {
1983 if !to_add_secret_dep.is_empty() {
1985 ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
1986 object_dependency::ActiveModel {
1987 oid: Set(secret_id as _),
1988 used_by: Set(preferred_id as _),
1989 ..Default::default()
1990 }
1991 }))
1992 .exec(&txn)
1993 .await?;
1994 }
1995 if !to_remove_secret_dep.is_empty() {
1996 let _ = ObjectDependency::delete_many()
1998 .filter(
1999 object_dependency::Column::Oid
2000 .is_in(to_remove_secret_dep)
2001 .and(
2002 object_dependency::Column::UsedBy.eq::<ObjectId>(preferred_id as _),
2003 ),
2004 )
2005 .exec(&txn)
2006 .await?;
2007 }
2008 }
2009
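        // Persist the rewritten definition and the merged properties on the source row; the
        // associated table row (if any) is updated with the same definition below.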
2010 let active_source_model = source::ActiveModel {
2011 source_id: Set(source_id),
2012 definition: Set(rewrite_sql.clone()),
2013 with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2014 secret_ref: Set((!options_with_secret.as_secret().is_empty())
2015 .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2016 ..Default::default()
2017 };
2018 active_source_model.update(&txn).await?;
2019
2020 if let Some(associate_table_id) = associate_table_id {
2021 let active_table_model = table::ActiveModel {
2023 table_id: Set(associate_table_id),
2024 definition: Set(rewrite_sql),
2025 ..Default::default()
2026 };
2027 active_table_model.update(&txn).await?;
2028 }
2029
2030 let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2031 associate_table_id
2033 } else {
2034 source_id
2035 }]
2036 .into_iter()
2037 .chain(dep_source_job_ids.into_iter())
2038 .collect_vec();
2039
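        // Rewrite the `Source` node in every affected fragment so that actors started later
        // (e.g. during recovery or scaling) also observe the new properties.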
2040 update_connector_props_fragments(
2042 &txn,
2043 to_check_job_ids,
2044 FragmentTypeFlag::Source,
2045 |node, found| {
2046 if let PbNodeBody::Source(node) = node
2047 && let Some(source_inner) = &mut node.source_inner
2048 {
2049 source_inner.with_properties = options_with_secret.as_plaintext().clone();
2050 source_inner.secret_refs = options_with_secret.as_secret().clone();
2051 *found = true;
2052 }
2053 },
2054 is_shared_source,
2055 )
2056 .await?;
2057
2058 let mut to_update_objs = Vec::with_capacity(2);
2059 let (source, obj) = Source::find_by_id(source_id)
2060 .find_also_related(Object)
2061 .one(&txn)
2062 .await?
2063 .ok_or_else(|| {
2064 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2065 })?;
2066 to_update_objs.push(PbObject {
2067 object_info: Some(PbObjectInfo::Source(
2068 ObjectModel(source, obj.unwrap()).into(),
2069 )),
2070 });
2071
2072 if let Some(associate_table_id) = associate_table_id {
2073 let (table, obj) = Table::find_by_id(associate_table_id)
2074 .find_also_related(Object)
2075 .one(&txn)
2076 .await?
2077 .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2078 to_update_objs.push(PbObject {
2079 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
2080 });
2081 }
2082
2083 txn.commit().await?;
2084
2085 self.notify_frontend(
2086 NotificationOperation::Update,
2087 NotificationInfo::ObjectGroup(PbObjectGroup {
2088 objects: to_update_objs,
2089 }),
2090 )
2091 .await;
2092
2093 Ok(options_with_secret)
2094 }
2095
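    /// Alters the connector properties of a sink on the fly.
    ///
    /// Validates that every changed field is allowed to be altered for the sink's connector,
    /// rewrites the stored `CREATE SINK` definition, persists the merged properties, patches the
    /// `SinkDesc` in the sink's fragments and notifies the frontends about the updated catalog.
    /// Returns the properties that were applied.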
2096 pub async fn update_sink_props_by_sink_id(
2097 &self,
2098 sink_id: SinkId,
2099 props: BTreeMap<String, String>,
2100 ) -> MetaResult<HashMap<String, String>> {
2101 let inner = self.inner.read().await;
2102 let txn = inner.db.begin().await?;
2103
2104 let (sink, _obj) = Sink::find_by_id(sink_id)
2105 .find_also_related(Object)
2106 .one(&txn)
2107 .await?
2108 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2109 validate_sink_props(&sink, &props)?;
2110 let definition = sink.definition.clone();
2111 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2112 .map_err(|e| SinkError::Config(anyhow!(e)))?
2113 .try_into()
2114 .unwrap();
2115 if let Statement::CreateSink { stmt } = &mut stmt {
2116 update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2117 } else {
2118 panic!("definition is not a create sink statement")
2119 }
2120 let mut new_config = sink.properties.clone().into_inner();
2121 new_config.extend(props.clone());
2122
2123 let definition = stmt.to_string();
2124 let active_sink = sink::ActiveModel {
2125 sink_id: Set(sink_id),
2126 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2127 definition: Set(definition),
2128 ..Default::default()
2129 };
2130 active_sink.update(&txn).await?;
2131
2132 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2133 let (sink, obj) = Sink::find_by_id(sink_id)
2134 .find_also_related(Object)
2135 .one(&txn)
2136 .await?
2137 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2138 txn.commit().await?;
2139 let relation_infos = vec![PbObject {
2140 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2141 }];
2142
2143 let _version = self
2144 .notify_frontend(
2145 NotificationOperation::Update,
2146 NotificationInfo::ObjectGroup(PbObjectGroup {
2147 objects: relation_infos,
2148 }),
2149 )
2150 .await;
2151
2152 Ok(props.into_iter().collect())
2153 }
2154
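    /// Alters the connector properties of a table created with the iceberg engine. Such a table
    /// is backed by an internal sink and source that share its definition, so the sink id and
    /// source id must be supplied via `AlterIcebergTableIds`; the sink, source and table catalog
    /// entries are all updated together with the sink fragments. Returns the applied properties
    /// and the sink id.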
2155 pub async fn update_iceberg_table_props_by_table_id(
2156 &self,
2157 table_id: TableId,
2158 props: BTreeMap<String, String>,
2159 alter_iceberg_table_props: Option<
2160 risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2161 >,
2162 ) -> MetaResult<(HashMap<String, String>, u32)> {
2163        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props
2164            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2165 let inner = self.inner.read().await;
2166 let txn = inner.db.begin().await?;
2167
2168 let (sink, _obj) = Sink::find_by_id(sink_id)
2169 .find_also_related(Object)
2170 .one(&txn)
2171 .await?
2172 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2173 validate_sink_props(&sink, &props)?;
2174
2175 let definition = sink.definition.clone();
2176 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2177 .map_err(|e| SinkError::Config(anyhow!(e)))?
2178 .try_into()
2179 .unwrap();
2180 if let Statement::CreateTable {
2181 with_options,
2182 engine,
2183 ..
2184 } = &mut stmt
2185 {
2186 if !matches!(engine, Engine::Iceberg) {
2187 return Err(SinkError::Config(anyhow!(
2188                    "connector properties can only be altered on tables with the iceberg engine"
2189 ))
2190 .into());
2191 }
2192 update_stmt_with_props(with_options, &props)?;
2193 } else {
2194 panic!("definition is not a create iceberg table statement")
2195 }
2196 let mut new_config = sink.properties.clone().into_inner();
2197 new_config.extend(props.clone());
2198
2199 let definition = stmt.to_string();
2200 let active_sink = sink::ActiveModel {
2201 sink_id: Set(sink_id),
2202 properties: Set(risingwave_meta_model::Property(new_config.clone())),
2203 definition: Set(definition.clone()),
2204 ..Default::default()
2205 };
2206 let active_source = source::ActiveModel {
2207 source_id: Set(source_id),
2208 definition: Set(definition.clone()),
2209 ..Default::default()
2210 };
2211 let active_table = table::ActiveModel {
2212 table_id: Set(table_id),
2213 definition: Set(definition),
2214 ..Default::default()
2215 };
2216 active_sink.update(&txn).await?;
2217 active_source.update(&txn).await?;
2218 active_table.update(&txn).await?;
2219
2220 update_sink_fragment_props(&txn, sink_id, new_config).await?;
2221
2222 let (sink, sink_obj) = Sink::find_by_id(sink_id)
2223 .find_also_related(Object)
2224 .one(&txn)
2225 .await?
2226 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2227 let (source, source_obj) = Source::find_by_id(source_id)
2228 .find_also_related(Object)
2229 .one(&txn)
2230 .await?
2231 .ok_or_else(|| {
2232 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2233 })?;
2234 let (table, table_obj) = Table::find_by_id(table_id)
2235 .find_also_related(Object)
2236 .one(&txn)
2237 .await?
2238 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2239 txn.commit().await?;
2240 let relation_infos = vec![
2241 PbObject {
2242 object_info: Some(PbObjectInfo::Sink(
2243 ObjectModel(sink, sink_obj.unwrap()).into(),
2244 )),
2245 },
2246 PbObject {
2247 object_info: Some(PbObjectInfo::Source(
2248 ObjectModel(source, source_obj.unwrap()).into(),
2249 )),
2250 },
2251 PbObject {
2252 object_info: Some(PbObjectInfo::Table(
2253 ObjectModel(table, table_obj.unwrap()).into(),
2254 )),
2255 },
2256 ];
2257 let _version = self
2258 .notify_frontend(
2259 NotificationOperation::Update,
2260 NotificationInfo::ObjectGroup(PbObjectGroup {
2261 objects: relation_infos,
2262 }),
2263 )
2264 .await;
2265
2266 Ok((props.into_iter().collect(), sink_id as u32))
2267 }
2268
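    /// Sets (or clears, if `rate_limit` is `None`) the rate limit of a single fragment by
    /// rewriting the `rate_limit` field of the DML, sink, backfill and scan nodes found in its
    /// stream node. Returns the affected fragment's actors (keyed by fragment id) so callers can
    /// apply the change to running actors as well.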
2269 pub async fn update_fragment_rate_limit_by_fragment_id(
2270 &self,
2271 fragment_id: FragmentId,
2272 rate_limit: Option<u32>,
2273 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
2274 let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2275 stream_node: &mut PbStreamNode| {
2276 let mut found = false;
2277 if fragment_type_mask.contains_any(
2278 FragmentTypeFlag::dml_rate_limit_fragments()
2279 .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2280 .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2281 .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2282 ) {
2283 visit_stream_node_mut(stream_node, |node| {
2284 if let PbNodeBody::Dml(node) = node {
2285 node.rate_limit = rate_limit;
2286 found = true;
2287 }
2288 if let PbNodeBody::Sink(node) = node {
2289 node.rate_limit = rate_limit;
2290 found = true;
2291 }
2292 if let PbNodeBody::StreamCdcScan(node) = node {
2293 node.rate_limit = rate_limit;
2294 found = true;
2295 }
2296 if let PbNodeBody::StreamScan(node) = node {
2297 node.rate_limit = rate_limit;
2298 found = true;
2299 }
2300 if let PbNodeBody::SourceBackfill(node) = node {
2301 node.rate_limit = rate_limit;
2302 found = true;
2303 }
2304 });
2305 }
2306 found
2307 };
2308 self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2309 .await
2310 }
2311
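    /// Persists the catalog-side effects of an applied reschedule: removed actors are deleted,
    /// newly created actors are inserted together with their splits and vnode bitmaps, surviving
    /// actors get their updated bitmaps and split assignments, and each affected job records its
    /// new parallelism and (optionally) resource group.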
2312 pub async fn post_apply_reschedules(
2313 &self,
2314 reschedules: HashMap<FragmentId, Reschedule>,
2315 post_updates: &JobReschedulePostUpdates,
2316 ) -> MetaResult<()> {
2317 let new_created_actors: HashSet<_> = reschedules
2318 .values()
2319 .flat_map(|reschedule| {
2320 reschedule
2321 .added_actors
2322 .values()
2323 .flatten()
2324 .map(|actor_id| *actor_id as ActorId)
2325 })
2326 .collect();
2327
2328 let inner = self.inner.write().await;
2329
2330 let txn = inner.db.begin().await?;
2331
2332 for Reschedule {
2333 removed_actors,
2334 vnode_bitmap_updates,
2335 actor_splits,
2336 newly_created_actors,
2337 ..
2338 } in reschedules.into_values()
2339 {
2340 Actor::delete_many()
2342 .filter(
2343 actor::Column::ActorId
2344 .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
2345 )
2346 .exec(&txn)
2347 .await?;
2348
2349 for (
2351 (
2352 StreamActor {
2353 actor_id,
2354 fragment_id,
2355 vnode_bitmap,
2356 expr_context,
2357 ..
2358 },
2359 _,
2360 ),
2361 worker_id,
2362 ) in newly_created_actors.into_values()
2363 {
2364 let splits = actor_splits
2365 .get(&actor_id)
2366 .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());
2367
2368 Actor::insert(actor::ActiveModel {
2369 actor_id: Set(actor_id as _),
2370 fragment_id: Set(fragment_id as _),
2371 status: Set(ActorStatus::Running),
2372 splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
2373 worker_id: Set(worker_id),
2374 upstream_actor_ids: Set(Default::default()),
2375 vnode_bitmap: Set(vnode_bitmap
2376 .as_ref()
2377 .map(|bitmap| (&bitmap.to_protobuf()).into())),
2378 expr_context: Set(expr_context.as_ref().unwrap().into()),
2379 })
2380 .exec(&txn)
2381 .await?;
2382 }
2383
2384 for (actor_id, bitmap) in vnode_bitmap_updates {
2386 let actor = Actor::find_by_id(actor_id as ActorId)
2387 .one(&txn)
2388 .await?
2389 .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
2390
2391 let mut actor = actor.into_active_model();
2392 actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
2393 actor.update(&txn).await?;
2394 }
2395
2396 for (actor_id, splits) in actor_splits {
2398 if new_created_actors.contains(&(actor_id as ActorId)) {
2399 continue;
2400 }
2401
2402 let actor = Actor::find_by_id(actor_id as ActorId)
2403 .one(&txn)
2404 .await?
2405 .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
2406
2407 let mut actor = actor.into_active_model();
2408 let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
2409 actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
2410 actor.update(&txn).await?;
2411 }
2412 }
2413
2414 let JobReschedulePostUpdates {
2415 parallelism_updates,
2416 resource_group_updates,
2417 } = post_updates;
2418
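        // Record the post-reschedule parallelism (and resource group, if it changed) on each
        // affected streaming job.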
2419 for (table_id, parallelism) in parallelism_updates {
2420 let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
2421 .one(&txn)
2422 .await?
2423 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
2424 .into_active_model();
2425
2426 streaming_job.parallelism = Set(match parallelism {
2427 TableParallelism::Adaptive => StreamingParallelism::Adaptive,
2428 TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
2429 TableParallelism::Custom => StreamingParallelism::Custom,
2430 });
2431
2432 if let Some(resource_group) =
2433 resource_group_updates.get(&(table_id.table_id() as ObjectId))
2434 {
2435 streaming_job.specific_resource_group = Set(resource_group.to_owned());
2436 }
2437
2438 streaming_job.update(&txn).await?;
2439 }
2440
2441 txn.commit().await?;
2442
2443 Ok(())
2444 }
2445
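    /// Lists the rate limits currently configured on rate-limitable nodes (source, fs fetch,
    /// source backfill, stream scan, CDC scan and sink) across all fragments.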
2446 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2449 let inner = self.inner.read().await;
2450 let txn = inner.db.begin().await?;
2451
2452 let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
2453 .select_only()
2454 .columns([
2455 fragment::Column::FragmentId,
2456 fragment::Column::JobId,
2457 fragment::Column::FragmentTypeMask,
2458 fragment::Column::StreamNode,
2459 ])
2460 .filter(FragmentTypeMask::intersects_any(
2461 FragmentTypeFlag::rate_limit_fragments(),
2462 ))
2463 .into_tuple()
2464 .all(&txn)
2465 .await?;
2466
2467 let mut rate_limits = Vec::new();
2468 for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2469 let stream_node = stream_node.to_protobuf();
2470 visit_stream_node_body(&stream_node, |node| {
2471 let mut rate_limit = None;
2472 let mut node_name = None;
2473
2474 match node {
2475 PbNodeBody::Source(node) => {
2477 if let Some(node_inner) = &node.source_inner {
2478 rate_limit = node_inner.rate_limit;
2479 node_name = Some("SOURCE");
2480 }
2481 }
2482 PbNodeBody::StreamFsFetch(node) => {
2483 if let Some(node_inner) = &node.node_inner {
2484 rate_limit = node_inner.rate_limit;
2485 node_name = Some("FS_FETCH");
2486 }
2487 }
2488 PbNodeBody::SourceBackfill(node) => {
2490 rate_limit = node.rate_limit;
2491 node_name = Some("SOURCE_BACKFILL");
2492 }
2493 PbNodeBody::StreamScan(node) => {
2494 rate_limit = node.rate_limit;
2495 node_name = Some("STREAM_SCAN");
2496 }
2497 PbNodeBody::StreamCdcScan(node) => {
2498 rate_limit = node.rate_limit;
2499 node_name = Some("STREAM_CDC_SCAN");
2500 }
2501 PbNodeBody::Sink(node) => {
2502 rate_limit = node.rate_limit;
2503 node_name = Some("SINK");
2504 }
2505 _ => {}
2506 }
2507
2508 if let Some(rate_limit) = rate_limit {
2509 rate_limits.push(RateLimitInfo {
2510 fragment_id: fragment_id as u32,
2511 job_id: job_id as u32,
2512 fragment_type_mask: fragment_type_mask as u32,
2513 rate_limit,
2514 node_name: node_name.unwrap().to_owned(),
2515 });
2516 }
2517 });
2518 }
2519
2520 Ok(rate_limits)
2521 }
2522}
2523
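/// Checks that `props` may be applied to the sink on the fly: the connector type must be present
/// in the sink's properties, every changed field must be in the connector's allow-list, and the
/// merged configuration must pass the connector's `validate_alter_config`.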
2524fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
2525 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2527 Some(connector) => {
2528 let connector_type = connector.to_lowercase();
2529 let field_names: Vec<String> = props.keys().cloned().collect();
2530 check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2531 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2532
2533 match_sink_name_str!(
2534 connector_type.as_str(),
2535 SinkType,
2536 {
2537 let mut new_props = sink.properties.0.clone();
2538 new_props.extend(props.clone());
2539 SinkType::validate_alter_config(&new_props)
2540 },
2541 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
2542 )?
2543 }
2544 None => {
2545 return Err(
2546                    SinkError::Config(anyhow!("connector not specified when altering sink")).into(),
2547 );
2548 }
2549 };
2550 Ok(())
2551}
2552
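/// Merges `props` into the parsed statement's `WITH (...)` options: options that already exist
/// keep their position but receive the new value, while options that are not present yet are
/// appended after the existing ones.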
2553fn update_stmt_with_props(
2554 with_properties: &mut Vec<SqlOption>,
2555 props: &BTreeMap<String, String>,
2556) -> MetaResult<()> {
2557 let mut new_sql_options = with_properties
2558 .iter()
2559 .map(|sql_option| (&sql_option.name, sql_option))
2560 .collect::<IndexMap<_, _>>();
2561 let add_sql_options = props
2562 .iter()
2563 .map(|(k, v)| SqlOption::try_from((k, v)))
2564 .collect::<Result<Vec<SqlOption>, ParserError>>()
2565 .map_err(|e| SinkError::Config(anyhow!(e)))?;
2566 new_sql_options.extend(
2567 add_sql_options
2568 .iter()
2569 .map(|sql_option| (&sql_option.name, sql_option)),
2570 );
2571 *with_properties = new_sql_options.into_values().cloned().collect();
2572 Ok(())
2573}
2574
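/// Pushes the merged sink properties into the `SinkDesc` of every fragment of the sink job that
/// actually contains the sink node, and persists the rewritten stream nodes.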
2575async fn update_sink_fragment_props(
2576 txn: &DatabaseTransaction,
2577 sink_id: SinkId,
2578 props: BTreeMap<String, String>,
2579) -> MetaResult<()> {
2580 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2581 .select_only()
2582 .columns([
2583 fragment::Column::FragmentId,
2584 fragment::Column::FragmentTypeMask,
2585 fragment::Column::StreamNode,
2586 ])
2587 .filter(fragment::Column::JobId.eq(sink_id))
2588 .into_tuple()
2589 .all(txn)
2590 .await?;
2591 let fragments = fragments
2592 .into_iter()
2593 .filter(|(_, fragment_type_mask, _)| {
2594 *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
2595 })
2596 .filter_map(|(id, _, stream_node)| {
2597 let mut stream_node = stream_node.to_protobuf();
2598 let mut found = false;
2599 visit_stream_node_mut(&mut stream_node, |node| {
2600 if let PbNodeBody::Sink(node) = node
2601 && let Some(sink_desc) = &mut node.sink_desc
2602 && sink_desc.id == sink_id as u32
2603 {
2604 sink_desc.properties.extend(props.clone());
2605 found = true;
2606 }
2607 });
2608 if found { Some((id, stream_node)) } else { None }
2609 })
2610 .collect_vec();
2611 assert!(
2612 !fragments.is_empty(),
2613 "sink id should be used by at least one fragment"
2614 );
2615 for (id, stream_node) in fragments {
2616 fragment::ActiveModel {
2617 fragment_id: Set(id),
2618 stream_node: Set(StreamNode::from(&stream_node)),
2619 ..Default::default()
2620 }
2621 .update(txn)
2622 .await?;
2623 }
2624 Ok(())
2625}
2626
2627pub struct SinkIntoTableContext {
2628 pub updated_sink_catalogs: Vec<SinkId>,
2631}
2632
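/// Context for finishing an auto-refresh-schema sink replacement: the temporary sink built with
/// the new schema, the original sink it replaces, the new column catalog and, if a log store is
/// used, the new log-store table together with its columns.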
2633pub struct FinishAutoRefreshSchemaSinkContext {
2634 pub tmp_sink_id: ObjectId,
2635 pub original_sink_id: ObjectId,
2636 pub columns: Vec<PbColumnCatalog>,
2637 pub new_log_store_table: Option<(ObjectId, Vec<PbColumnCatalog>)>,
2638}
2639
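/// Applies `alter_stream_node_fn` to the stream node of every fragment belonging to `job_ids`
/// that carries `expect_flag`, and persists each fragment in which the closure reported a change.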
2640async fn update_connector_props_fragments<F>(
2641 txn: &DatabaseTransaction,
2642 job_ids: Vec<i32>,
2643 expect_flag: FragmentTypeFlag,
2644 mut alter_stream_node_fn: F,
2645 is_shared_source: bool,
2646) -> MetaResult<()>
2647where
2648 F: FnMut(&mut PbNodeBody, &mut bool),
2649{
2650 let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
2651 .select_only()
2652 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
2653 .filter(
2654 fragment::Column::JobId
2655 .is_in(job_ids.clone())
2656 .and(FragmentTypeMask::intersects(expect_flag)),
2657 )
2658 .into_tuple()
2659 .all(txn)
2660 .await?;
2661 let fragments = fragments
2662 .into_iter()
2663 .filter_map(|(id, stream_node)| {
2664 let mut stream_node = stream_node.to_protobuf();
2665 let mut found = false;
2666 visit_stream_node_mut(&mut stream_node, |node| {
2667 alter_stream_node_fn(node, &mut found);
2668 });
2669 if found { Some((id, stream_node)) } else { None }
2670 })
2671 .collect_vec();
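    // A non-shared source without any downstream job has no fragment containing its source node,
    // so the non-empty check is only enforced for shared sources or when multiple jobs are
    // involved.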
2672 if is_shared_source || job_ids.len() > 1 {
2673 assert!(
2677 !fragments.is_empty(),
2678 "job ids {:?} (type: {:?}) should be used by at least one fragment",
2679 job_ids,
2680 expect_flag
2681 );
2682 }
2683
2684 for (id, stream_node) in fragments {
2685 fragment::ActiveModel {
2686 fragment_id: Set(id),
2687 stream_node: Set(StreamNode::from(&stream_node)),
2688 ..Default::default()
2689 }
2690 .update(txn)
2691 .await?;
2692 }
2693
2694 Ok(())
2695}