use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{self, FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::{ConnectorProperties, SplitImpl};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
    JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
    TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::{Command, Reschedule};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
    check_sink_into_table_cycle, ensure_job_not_canceled, ensure_object_id, ensure_user_id,
    fetch_target_fragments, get_fragment_actor_ids, get_internal_tables_by_id, get_table_columns,
    grant_default_privileges_automatically, insert_fragment_relations, list_user_info_by_ids,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
    StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::{MetaError, MetaResult};

impl CatalogController {
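    /// Inserts the `object` row and the corresponding `streaming_job` row (in `Initial`
    /// status) for a new streaming job, returning the allocated object id.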
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        timezone: Option<String>,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>,
    ) -> MetaResult<ObjectId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job = streaming_job::ActiveModel {
            job_id: Set(obj.oid),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(timezone),
            parallelism: Set(streaming_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(obj.oid)
    }

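    /// Creates all catalog entries for a new streaming job (materialized view, sink,
    /// table, index, or source) in a single transaction: resolves the job's
    /// parallelism, checks for duplicate relation names and for dependencies that are
    /// still being altered, inserts the job-specific catalog rows, and records the
    /// object dependencies (including dependent secrets and connections).
    ///
    /// A minimal call sketch (hypothetical caller state; `job`, `ctx` and the
    /// parallelism arguments are assumptions for illustration):
    /// ```ignore
    /// catalog_controller
    ///     .create_job_catalog(&mut job, &ctx, &None, 256, HashSet::new(), None)
    ///     .await?;
    /// ```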
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id() as _,
            streaming_job.schema_id() as _,
            &txn,
        )
        .await?;

        if !dependencies.is_empty() {
            // Reject the creation if any dependent relation is still being created or altered.
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id as ObjectId,
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id as _),
                    Some(sink.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                sink.id = job_id as _;
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id as _),
                        Some(src.schema_id as _),
                    )
                    .await?;
                    src.id = src_obj.oid as _;
                    src.optional_associated_table_id =
                        Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
                    table.optional_associated_source_id = Some(
                        PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
                    );
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id as _),
                    Some(index.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                // The index, its index table and the job share the same object id.
                index.id = job_id as _;
                index.index_table_id = job_id as _;
                table.id = job_id as _;

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id as _),
                    used_by: Set(table.id as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id as _),
                    Some(src.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                src.id = job_id as _;
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // Record the dependencies on secrets and connections as well.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|secret_id| secret_id as ObjectId),
        );
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|conn_id| conn_id as ObjectId),
        );

        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id() as _),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

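    /// Allocates object ids for the job's internal (state) tables and inserts their
    /// catalog rows, returning a map from the temporary ids assigned during planning
    /// to the newly allocated ones. `fragment_id` is left unset here and filled in
    /// later by `prepare_streaming_job`.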
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<u32, u32>> {
        let job_id = job.id() as ObjectId;
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        ensure_job_not_canceled(job_id, &txn).await?;

        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id as _),
                Some(table.schema_id as _),
            )
            .await?
            .oid;
            table_id_map.insert(table.id, table_id as u32);
            table.id = table_id as _;
            table.job_id = Some(job_id as _);

            let table_model = table::ActiveModel {
                table_id: Set(table_id as _),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

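    /// Convenience wrapper around `prepare_streaming_job` that extracts the fragment,
    /// actor, split and downstream information from a `StreamJobFragmentsToCreate`.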
    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id().table_id as _,
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.actor_status,
            &stream_job_fragments.actor_splits,
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
        )
        .await
    }

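    /// Inserts the fragment and actor rows of a (new or replacing) streaming job,
    /// binds internal state tables to their fragments, and, for jobs that should be
    /// visible while still creating, collects and notifies the created objects to
    /// the frontend.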
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" })]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: ObjectId,
        get_fragments: impl Fn() -> I + 'a,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
    ) -> MetaResult<()> {
        let fragment_actors = Self::extract_fragment_and_actors_from_fragments(
            job_id,
            get_fragments(),
            actor_status,
            actor_splits,
        )?;
        let inner = self.inner.write().await;

        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        ensure_job_not_canceled(job_id, &txn).await?;

        let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            // Fill in the fragment id and vnode count for the state tables.
            if !for_replace {
                let all_tables = StreamJobFragments::collect_tables(get_fragments());
                for state_table_id in state_table_ids {
                    let table = all_tables
                        .get(&(state_table_id as u32))
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id as u32);
                    assert_eq!(table.fragment_id, fragment_id as u32);
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if need_notify {
                        let mut table = table.clone();
                        if cfg!(not(debug_assertions)) && table.id == job_id as u32 {
                            table.definition = definition.clone().unwrap();
                        }
                        objects_to_notify.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table)),
                        });
                    }
                }
            }
        }

        if need_notify {
            // For sinks and indexes, additionally notify their own catalog objects.
            match creating_streaming_job.unwrap() {
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        for actors in actors {
            for actor in actors {
                let actor = actor.into_active_model();
                Actor::insert(actor).exec(&txn).await?;
            }
        }

        if !for_replace {
            // Update the table's `dml_fragment_id`.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id as _),
                    dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                }),
            )
            .await;
        }

        Ok(())
    }

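    /// Builds the `Command::DropStreamingJobs` barrier command used to cancel the
    /// job, including the sink-to-target fragment mapping when the job is a sink
    /// into a table.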
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids(),
            unregistered_state_table_ids: table_fragments
                .all_table_ids()
                .map(catalog::TableId::new)
                .collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

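    /// Tries to abort a creating streaming job and clean up its catalog entries.
    /// Returns `(true, database_id)` when the cleanup happened (or the job was
    /// already gone), and `(false, database_id)` for background jobs that are still
    /// creating and should be left to recovery instead.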
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        job_id: ObjectId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                // Background job is still in creating status: leave it to recovery instead
                // of aborting it here.
                tracing::warn!(
                    id = job_id,
                    "streaming job is created in background and still in creating status"
                );
                return Ok((false, Some(database_id)));
            }
        }

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id).one(&txn).await?;

        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            // Materialized views are notified to the frontend on creation, so their
            // deletion must be notified as well.
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            }
        }

        let dropped_tables = Table::find()
            .find_also_related(Object)
            .filter(
                table::Column::TableId.is_in(
                    internal_table_ids
                        .iter()
                        .cloned()
                        .chain(table_obj.iter().map(|t| t.table_id as _)),
                ),
            )
            .all(&txn)
            .await?
            .into_iter()
            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
        inner
            .dropped_tables
            .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));

        if need_notify {
            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // If the aborted job is a sink into a table, also clean up the temporary job
        // that was created to replace the target table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id)
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = tmp_id,
                    "aborting temp streaming job for sink into table"
                );
                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            // Dropping the table also drops its associated source.
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // The frontend has already been notified about these objects, so notify the
            // deletion here as well.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

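    /// Runs after the barrier for the job has been collected: marks the job's actors
    /// as running, persists their split assignments, records new fragment relations,
    /// and advances the job status to `Creating`.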
    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: ObjectId,
        actor_ids: Vec<crate::model::ActorId>,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        split_assignment: &SplitAssignment,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let actor_ids = actor_ids.into_iter().map(|id| id as ActorId).collect_vec();

        Actor::update_many()
            .col_expr(
                actor::Column::Status,
                SimpleExpr::from(ActorStatus::Running.into_value()),
            )
            .filter(actor::Column::ActorId.is_in(actor_ids))
            .exec(&txn)
            .await?;

        for splits in split_assignment.values() {
            for (actor_id, splits) in splits {
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                let connector_splits = &PbConnectorSplits { splits };
                actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(connector_splits.into())),
                    ..Default::default()
                }
                .update(&txn)
                .await?;
            }
        }

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        // Mark the job as `Creating`.
        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        txn.commit().await?;

        Ok(())
    }

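    /// Creates a temporary streaming job object used to replace an existing job
    /// (e.g. for `ALTER`), after checking the job version, concurrent replacements,
    /// and that the max parallelism is unchanged. Returns the temporary object id.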
    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<ObjectId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id as ObjectId)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let (original_max_parallelism, original_timezone): (i32, Option<String>) =
            StreamingJobModel::find_by_id(id as ObjectId)
                .select_only()
                .column(streaming_job::Column::MaxParallelism)
                .column(streaming_job::Column::Timezone)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_max_parallelism != max_parallelism as i32
        {
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };
        let timezone = ctx
            .map(|ctx| ctx.timezone.clone())
            .unwrap_or(original_timezone);

        // 4. create streaming object for the new replace job.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            timezone,
            parallelism,
            original_max_parallelism as _,
            None,
        )
        .await?;

        // 5. record the dependency of the new object on the original job.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id as _),
            used_by: Set(new_obj_id as _),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

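    /// Marks the job and its objects as `Created`, stamps the creation time and
    /// cluster version, grants the privileges that follow from the primary table or
    /// from default privileges, notifies the frontend, and wakes up any waiters of
    /// the job.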
    pub async fn finish_streaming_job(&self, job_id: ObjectId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // Update `created_at` to now() and `created_at_cluster_version` to the current
        // cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // Mark the target streaming job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        job.update(&txn).await?;

        // Notify the frontend: the job itself and its internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(&txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = if create_type == CreateType::Background {
            NotificationOperation::Update
        } else {
            NotificationOperation::Add
        };
        let mut updated_user_info = vec![];

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
                });
            }
            ObjectType::Index => {
                let (index, obj) = Index::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap()).into(),
                        )),
                    });
                }

                // If users have `SELECT` privilege on the primary table, grant them
                // `SELECT` on the index (state) tables as well.
                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(&txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(&txn)
                        .await?;
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(*state_table_id),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges)
                        .exec(&txn)
                        .await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        &txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap()).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        // Grant default privileges for the new job automatically (indexes were handled above).
        if job_type != ObjectType::Index {
            updated_user_info = grant_default_privileges_automatically(&txn, job_id).await?;
        }
        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        // Notify users about the privilege updates.
        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

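    /// Commits a finished job replacement: applies the catalog and fragment updates
    /// in one transaction, then notifies the frontend about the updated (and, when a
    /// table connector was dropped, deleted) objects.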
    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: ObjectId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
            tmp_id,
            replace_upstream,
            sink_into_table_context,
            &txn,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        )
        .await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

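    /// Transactional body of `finish_replace_streaming_job`: updates the job's
    /// catalog row, swaps the original job's fragments with the temporary job's,
    /// rewrites the `Merge` executors of downstream fragments and dependent index
    /// items, and returns the objects to notify.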
    pub async fn finish_replace_streaming_job_inner(
        tmp_id: ObjectId,
        replace_upstream: FragmentReplaceUpstream,
        SinkIntoTableContext {
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
        let original_job_id = streaming_job.id() as ObjectId;
        let job_type = streaming_job.job_type();

        let mut index_item_rewriter = None;

        // Update the job catalog.
        match streaming_job {
            StreamingJob::Table(_source, table, _table_job_type) => {
                let original_column_catalogs = get_table_columns(txn, original_job_id).await?;

                index_item_rewriter = Some({
                    let original_columns = original_column_catalogs
                        .to_protobuf()
                        .into_iter()
                        .map(|c| c.column_desc.unwrap())
                        .collect_vec();
                    let new_columns = table
                        .columns
                        .iter()
                        .map(|c| c.column_desc.clone().unwrap())
                        .collect_vec();

                    IndexItemRewriter {
                        original_columns,
                        new_columns,
                    }
                });

                // Record the original target columns for the sinks that sink into this table.
                for sink_id in updated_sink_catalogs {
                    sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_column_catalogs.clone())),
                        ..Default::default()
                    }
                    .update(txn)
                    .await?;
                }
                let mut table = table::ActiveModel::from(table);
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    // Detach the table from its associated source, which is being dropped.
                    table.optional_associated_source_id = Set(None);
                }

                table.update(txn).await?;
            }
            StreamingJob::Source(source) => {
                let source = source::ActiveModel::from(source);
                source.update(txn).await?;
            }
            StreamingJob::MaterializedView(table) => {
                let table = table::ActiveModel::from(table);
                table.update(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

        async fn finish_fragments(
            txn: &DatabaseTransaction,
            tmp_id: ObjectId,
            original_job_id: ObjectId,
            replace_upstream: FragmentReplaceUpstream,
        ) -> MetaResult<()> {
            // Rebind the state tables to the new fragments.
            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::StateTableIds,
                ])
                .filter(fragment::Column::JobId.eq(tmp_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (fragment_id, state_table_ids) in fragment_info {
                for state_table_id in state_table_ids.into_inner() {
                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        ..Default::default()
                    }
                    .update(txn)
                    .await?;
                }
            }

            // Replace the fragments of the original job with the new ones.
            Fragment::delete_many()
                .filter(fragment::Column::JobId.eq(original_job_id))
                .exec(txn)
                .await?;
            Fragment::update_many()
                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
                .filter(fragment::Column::JobId.eq(tmp_id))
                .exec(txn)
                .await?;

            // Update the upstream fragment ids of the `Merge` executors in downstream fragments.
            for (fragment_id, fragment_replace_map) in replace_upstream {
                let (fragment_id, mut stream_node) =
                    Fragment::find_by_id(fragment_id as FragmentId)
                        .select_only()
                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
                        .into_tuple::<(FragmentId, StreamNode)>()
                        .one(txn)
                        .await?
                        .map(|(id, node)| (id, node.to_protobuf()))
                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

                visit_stream_node_mut(&mut stream_node, |body| {
                    if let PbNodeBody::Merge(m) = body
                        && let Some(new_fragment_id) =
                            fragment_replace_map.get(&m.upstream_fragment_id)
                    {
                        m.upstream_fragment_id = *new_fragment_id;
                    }
                });
                fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(StreamNode::from(&stream_node)),
                    ..Default::default()
                }
                .update(txn)
                .await?;
            }

            // Drop the temporary job object.
            Object::delete_by_id(tmp_id).exec(txn).await?;

            Ok(())
        }

        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;

        // Fetch the updated catalogs to notify the frontend.
        let mut objects = vec![];
        match job_type {
            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
                let (table, table_obj) = Table::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, table_obj.unwrap()).into(),
                    )),
                })
            }
            StreamingJobType::Source => {
                let (source, source_obj) = Source::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, source_obj.unwrap()).into(),
                    )),
                })
            }
            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
        }

        if let Some(expr_rewriter) = index_item_rewriter {
            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
                .select_only()
                .columns([index::Column::IndexId, index::Column::IndexItems])
                .filter(index::Column::PrimaryTableId.eq(original_job_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (index_id, nodes) in index_items {
                let mut pb_nodes = nodes.to_protobuf();
                pb_nodes
                    .iter_mut()
                    .for_each(|x| expr_rewriter.rewrite_expr(x));
                let index = index::ActiveModel {
                    index_id: Set(index_id),
                    index_items: Set(pb_nodes.into()),
                    ..Default::default()
                }
                .update(txn)
                .await?;
                let index_obj = index
                    .find_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
                });
            }
        }

        if let Some(sinks) = auto_refresh_schema_sinks {
            for finish_sink_context in sinks {
                finish_fragments(
                    txn,
                    finish_sink_context.tmp_sink_id,
                    finish_sink_context.original_sink_id,
                    Default::default(),
                )
                .await?;
                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
                Sink::update(sink::ActiveModel {
                    sink_id: Set(finish_sink_context.original_sink_id),
                    columns: Set(columns.clone()),
                    ..Default::default()
                })
                .exec(txn)
                .await?;
                sink.columns = columns;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(
                        ObjectModel(sink, sink_obj.unwrap()).into(),
                    )),
                });
                if let Some((log_store_table_id, new_log_store_table_columns)) =
                    finish_sink_context.new_log_store_table
                {
                    let new_log_store_table_columns: ColumnCatalogArray =
                        new_log_store_table_columns.into();
                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
                    Table::update(table::ActiveModel {
                        table_id: Set(log_store_table_id),
                        columns: Set(new_log_store_table_columns.clone()),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                    table.columns = new_log_store_table_columns;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap()).into(),
                        )),
                    });
                }
            }
        }

        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
            notification_objs =
                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
        }

        Ok((objects, notification_objs))
    }

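    /// Aborts a job replacement by deleting the temporary job object, together with
    /// the temporary sink objects created for auto-refresh-schema sinks, if any.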
    pub async fn try_abort_replacing_streaming_job(
        &self,
        tmp_job_id: ObjectId,
        tmp_sink_ids: Option<Vec<ObjectId>>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
        if let Some(tmp_sink_ids) = tmp_sink_ids {
            for tmp_sink_id in tmp_sink_ids {
                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

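    // edit the `rate_limit` of the `Source` node in given `source_id`'s fragments
    // return the actor_ids to be applied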
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        {
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                rate_limit: Set(rate_limit.map(|v| v as i32)),
                ..Default::default()
            };
            active_source.update(&txn).await?;
        }

        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;

        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        let streaming_job_ids: Vec<ObjectId> =
            if let Some(table_id) = source.optional_associated_table_id {
                vec![table_id]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                vec![source_id]
            } else {
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (
                    id,
                    FragmentTypeMask::from(mask as u32),
                    stream_node.to_protobuf(),
                )
            })
            .collect_vec();

        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            let mut found = false;
            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node
                        && let Some(node_inner) = &mut node.source_inner
                        && node_inner.source_id == source_id as u32
                    {
                        node_inner.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            if is_fs_source {
                // Fragments from older versions may lack the `FsFetch` type flag, so for
                // fs sources scan all fragments and fix up the mask when the node is found.
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id as u32
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );
        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
        for (id, fragment_type_mask, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                fragment_type_mask: Set(fragment_type_mask.into()),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(fragment_actors)
    }

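    // edit the content of fragments in given `job_id`,
    // return the actor_ids to be applied;
    // the rate-limit helpers below pass a closure that flips the `rate_limit` field
    // on matching nodes and reports whether anything was changed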
    pub async fn mutate_fragments_by_job_id(
        &self,
        job_id: ObjectId,
        // returns true if the mutation is applied
        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
        // error message when no relevant fragments are found
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
            })
            .collect_vec();

        let fragments = fragments
            .iter_mut()
            .map(|(_, fragment_type_mask, stream_node)| {
                fragments_mutation_fn(*fragment_type_mask, stream_node)
            })
            .collect::<MetaResult<Vec<bool>>>()?
            .into_iter()
            .zip_eq_debug(std::mem::take(&mut fragments))
            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
            .collect::<Vec<_>>();

        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
        for (id, _, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

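    // edit the content of the single fragment `fragment_id`,
    // return the actor_ids to be applied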
    async fn mutate_fragment_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (fragment_type_mask, stream_node): (i32, StreamNode) =
            Fragment::find_by_id(fragment_id)
                .select_only()
                .columns([
                    fragment::Column::FragmentTypeMask,
                    fragment::Column::StreamNode,
                ])
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
        let mut pb_stream_node = stream_node.to_protobuf();
        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);

        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
            return Err(MetaError::invalid_parameter(format!(
                "fragment id {fragment_id}: {}",
                err_msg
            )));
        }

        // Persist the mutated stream node; writing back the original `stream_node`
        // fetched above would silently discard the mutation.
        fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

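    // edit the `rate_limit` of the backfill nodes (`StreamScan`, `StreamCdcScan`,
    // `SourceBackfill`, and backfilling `Sink`) in given job's fragments,
    // return the actor_ids to be applied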
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_backfill_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask
                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
                {
                    visit_stream_node_mut(stream_node, |node| match node {
                        PbNodeBody::StreamCdcScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::StreamScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::SourceBackfill(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::Sink(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        _ => {}
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(
            job_id,
            update_backfill_rate_limit,
            "stream scan node or source node not found",
        )
        .await
    }

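    // edit the `rate_limit` of the `Sink` node in given job's fragments; only sinks
    // with a kv log store (i.e. created with sink decoupling) support this,
    // return the actor_ids to be applied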
    pub async fn update_sink_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_sink_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = Ok(false);
                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Sink(node) = node {
                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
                                found = Err(MetaError::invalid_parameter(
                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
                                ));
                                return;
                            }
                            node.rate_limit = rate_limit;
                            found = Ok(true);
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
            .await
    }

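    // edit the `rate_limit` of the `Dml` node in given job's fragments,
    // return the actor_ids to be applied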
    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_dml_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Dml(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
            .await
    }

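    /// Applies `ALTER SOURCE ... SET/RESET` style property changes: checks that the
    /// altered fields may be changed on the fly, rewrites the stored `CREATE`
    /// statement, updates secret dependencies and the source nodes of all affected
    /// fragments, and notifies the frontend. Returns the resolved options after the
    /// update.
    ///
    /// A hedged usage sketch (the ids and the property key below are hypothetical):
    /// ```ignore
    /// let mut alter_props = BTreeMap::new();
    /// alter_props.insert("properties.fetch.max.bytes".to_owned(), "1048576".to_owned());
    /// let resolved = catalog_controller
    ///     .update_source_props_by_source_id(source_id, alter_props, BTreeMap::new())
    ///     .await?;
    /// ```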
1844 pub async fn update_source_props_by_source_id(
1845 &self,
1846 source_id: SourceId,
1847 alter_props: BTreeMap<String, String>,
1848 alter_secret_refs: BTreeMap<String, PbSecretRef>,
1849 ) -> MetaResult<WithOptionsSecResolved> {
1850 let inner = self.inner.read().await;
1851 let txn = inner.db.begin().await?;
1852
1853 let (source, _obj) = Source::find_by_id(source_id)
1854 .find_also_related(Object)
1855 .one(&txn)
1856 .await?
1857 .ok_or_else(|| {
1858 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1859 })?;
1860 let connector = source.with_properties.0.get_connector().unwrap();
1861 let is_shared_source = source.is_shared();
1862
1863 let mut dep_source_job_ids: Vec<ObjectId> = Vec::new();
1864 if !is_shared_source {
1865 dep_source_job_ids = ObjectDependency::find()
1867 .select_only()
1868 .column(object_dependency::Column::UsedBy)
1869 .filter(object_dependency::Column::Oid.eq(source_id))
1870 .into_tuple()
1871 .all(&txn)
1872 .await?;
1873 }
1874
1875 let prop_keys: Vec<String> = alter_props
1877 .keys()
1878 .chain(alter_secret_refs.keys())
1879 .cloned()
1880 .collect();
1881 risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
1882 &connector, &prop_keys,
1883 )?;
1884
1885 let mut options_with_secret = WithOptionsSecResolved::new(
1886 source.with_properties.0.clone(),
1887 source
1888 .secret_ref
1889 .map(|secret_ref| secret_ref.to_protobuf())
1890 .unwrap_or_default(),
1891 );
1892 let (to_add_secret_dep, to_remove_secret_dep) =
1893 options_with_secret.handle_update(alter_props, alter_secret_refs)?;
1894
1895 tracing::info!(
1896 "applying new properties to source: source_id={}, options_with_secret={:?}",
1897 source_id,
1898 options_with_secret
1899 );
1900 let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
1902 let mut associate_table_id = None;
1905
1906 let mut preferred_id: i32 = source_id;
1910 let rewrite_sql = {
1911 let definition = source.definition.clone();
1912
1913 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
1914 .map_err(|e| {
1915 MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
1916 anyhow!(e).context("Failed to parse source definition SQL"),
1917 )))
1918 })?
1919 .try_into()
1920 .unwrap();
1921
1922 async fn format_with_option_secret_resolved(
1936 txn: &DatabaseTransaction,
1937 options_with_secret: &WithOptionsSecResolved,
1938 ) -> MetaResult<Vec<SqlOption>> {
1939 let mut options = Vec::new();
1940 for (k, v) in options_with_secret.as_plaintext() {
1941 let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
1942 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1943 options.push(sql_option);
1944 }
1945 for (k, v) in options_with_secret.as_secret() {
1946 if let Some(secret_model) =
1947 Secret::find_by_id(v.secret_id as i32).one(txn).await?
1948 {
1949 let sql_option =
1950 SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
1951 .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1952 options.push(sql_option);
1953 } else {
1954 return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
1955 }
1956 }
1957 Ok(options)
1958 }
1959
1960 match &mut stmt {
1961 Statement::CreateSource { stmt } => {
1962 stmt.with_properties.0 =
1963 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1964 }
1965 Statement::CreateTable { with_options, .. } => {
1966 *with_options =
1967 format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1968 associate_table_id = source.optional_associated_table_id;
1969 preferred_id = associate_table_id.unwrap();
1970 }
1971 _ => unreachable!(),
1972 }
1973
1974 stmt.to_string()
1975 };
1976
1977 {
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id as _),
                        used_by: Set(preferred_id as _),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            if !to_remove_secret_dep.is_empty() {
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(
                                object_dependency::Column::UsedBy.eq::<ObjectId>(preferred_id as _),
                            ),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        active_source_model.update(&txn).await?;

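        // The associated table shares the source's definition, so keep it in sync.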
        if let Some(associate_table_id) = associate_table_id {
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            active_table_model.update(&txn).await?;
        }

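        // Rewrite the stream graphs of every job that embeds this source: the job owning
        // the source (or its associated table), plus all dependent jobs when the source is
        // non-shared.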
        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            associate_table_id
        } else {
            source_id
        }]
        .into_iter()
        .chain(dep_source_job_ids.into_iter())
        .collect_vec();

        update_connector_props_fragments(
            &txn,
            to_check_job_ids,
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
            is_shared_source,
        )
        .await?;

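        // Re-read the updated catalog entries so the frontend notification carries the new
        // state.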
        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap()).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
            });
        }

        txn.commit().await?;

        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
            }),
        )
        .await;

        Ok(options_with_secret)
    }

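    /// Alters the given properties of a sink on the fly: validates that every field may be
    /// changed for this connector, rewrites the `CREATE SINK` definition, and pushes the
    /// new properties into the sink descriptors embedded in its fragments.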
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;

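        // Validate that this connector allows altering the given fields on the fly, and
        // that the merged config is still valid for the sink type.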
        match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                let field_names: Vec<String> = props.keys().cloned().collect();
                check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
                    .map_err(|e| SinkError::Config(anyhow!(e)))?;

                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        let mut new_props = sink.properties.0.clone();
                        new_props.extend(props.clone());
                        SinkType::validate_alter_config(&new_props)
                    },
                    |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
                )?
            }
            None => {
                return Err(
                    SinkError::Config(anyhow!("connector not specified when altering sink"))
                        .into(),
                );
            }
        };
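        // Rewrite the `CREATE SINK` definition so the newly set options show up in its
        // `WITH` clause.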
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            let mut new_sql_options = stmt
                .with_properties
                .0
                .iter()
                .map(|sql_option| (&sql_option.name, sql_option))
                .collect::<IndexMap<_, _>>();
            let add_sql_options = props
                .iter()
                .map(|(k, v)| SqlOption::try_from((k, v)))
                .collect::<Result<Vec<SqlOption>, ParserError>>()
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
            new_sql_options.extend(
                add_sql_options
                    .iter()
                    .map(|sql_option| (&sql_option.name, sql_option)),
            );
            stmt.with_properties.0 = new_sql_options.into_values().cloned().collect();
        } else {
            panic!("sink definition is not a CREATE SINK statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props);

        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(stmt.to_string()),
            ..Default::default()
        };
        active_sink.update(&txn).await?;

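        // Push the new properties into the sink descriptor of every sink fragment of this
        // job.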
        let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
            .select_only()
            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
            .filter(
                fragment::Column::JobId
                    .eq(sink_id)
                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
            )
            .into_tuple()
            .all(&txn)
            .await?;
        let fragments = fragments
            .into_iter()
            .filter_map(|(id, stream_node)| {
                let mut stream_node = stream_node.to_protobuf();
                let mut found = false;
                visit_stream_node_mut(&mut stream_node, |node| {
                    if let PbNodeBody::Sink(node) = node
                        && let Some(sink_desc) = &mut node.sink_desc
                        && sink_desc.id == sink_id as u32
                    {
                        sink_desc.properties = new_config.clone();
                        found = true;
                    }
                });
                if found { Some((id, stream_node)) } else { None }
            })
            .collect_vec();
        assert!(
            !fragments.is_empty(),
            "sink id should be used by at least one fragment"
        );
        for (id, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }

        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;

        txn.commit().await?;

        let relation_info = PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(new_config.into_iter().collect())
    }

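    /// Sets (or clears, when `rate_limit` is `None`) the rate limit on every rate-limitable
    /// node inside the given fragment, returning the affected fragments and their actors.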
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
                                 stream_node: &mut PbStreamNode| {
            let mut found = false;
            if fragment_type_mask.contains_any(
                FragmentTypeFlag::dml_rate_limit_fragments()
                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
            ) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Dml(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::Sink(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamCdcScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::SourceBackfill(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

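    /// Persists the actor changes of applied reschedules (removed, newly created, and
    /// rebalanced actors) together with the resulting per-job parallelism and
    /// resource-group updates.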
    pub async fn post_apply_reschedules(
        &self,
        reschedules: HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        let new_created_actors: HashSet<_> = reschedules
            .values()
            .flat_map(|reschedule| {
                reschedule
                    .added_actors
                    .values()
                    .flatten()
                    .map(|actor_id| *actor_id as ActorId)
            })
            .collect();

        let inner = self.inner.write().await;

        let txn = inner.db.begin().await?;

        for Reschedule {
            removed_actors,
            vnode_bitmap_updates,
            actor_splits,
            newly_created_actors,
            ..
        } in reschedules.into_values()
        {
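            // Drop the actors removed by this reschedule.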
            Actor::delete_many()
                .filter(
                    actor::Column::ActorId
                        .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
                )
                .exec(&txn)
                .await?;

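            // Register the newly created actors, carrying over their split assignments.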
            for (
                (
                    StreamActor {
                        actor_id,
                        fragment_id,
                        vnode_bitmap,
                        expr_context,
                        ..
                    },
                    _,
                ),
                worker_id,
            ) in newly_created_actors.into_values()
            {
                let splits = actor_splits
                    .get(&actor_id)
                    .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());

                Actor::insert(actor::ActiveModel {
                    actor_id: Set(actor_id as _),
                    fragment_id: Set(fragment_id as _),
                    status: Set(ActorStatus::Running),
                    splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
                    worker_id: Set(worker_id),
                    upstream_actor_ids: Set(Default::default()),
                    vnode_bitmap: Set(vnode_bitmap
                        .as_ref()
                        .map(|bitmap| (&bitmap.to_protobuf()).into())),
                    expr_context: Set(expr_context.as_ref().unwrap().into()),
                })
                .exec(&txn)
                .await?;
            }

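            // Apply vnode bitmap updates to the surviving actors.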
            for (actor_id, bitmap) in vnode_bitmap_updates {
                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
                actor.update(&txn).await?;
            }

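            // Update split assignments of pre-existing actors; newly created actors were
            // inserted with their splits above.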
            for (actor_id, splits) in actor_splits {
                if new_created_actors.contains(&(actor_id as ActorId)) {
                    continue;
                }

                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
                actor.update(&txn).await?;
            }
        }

        let JobReschedulePostUpdates {
            parallelism_updates,
            resource_group_updates,
        } = post_updates;

        for (table_id, parallelism) in parallelism_updates {
            let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
                .into_active_model();

            streaming_job.parallelism = Set(match parallelism {
                TableParallelism::Adaptive => StreamingParallelism::Adaptive,
                TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
                TableParallelism::Custom => StreamingParallelism::Custom,
            });

            if let Some(resource_group) =
                resource_group_updates.get(&(table_id.table_id() as ObjectId))
            {
                streaming_job.specific_resource_group = Set(resource_group.to_owned());
            }

            streaming_job.update(&txn).await?;
        }

        txn.commit().await?;

        Ok(())
    }

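    /// Lists the current rate limit of every rate-limitable stream node, one entry per
    /// node, across all fragments whose type mask indicates they may contain such a node.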
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(FragmentTypeMask::intersects_any(
                FragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |node| {
                let mut rate_limit = None;
                let mut node_name = None;

                match node {
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    PbNodeBody::SourceBackfill(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }

                if let Some(rate_limit) = rate_limit {
                    rate_limits.push(RateLimitInfo {
                        fragment_id: fragment_id as u32,
                        job_id: job_id as u32,
                        fragment_type_mask: fragment_type_mask as u32,
                        rate_limit,
                        node_name: node_name.unwrap().to_owned(),
                    });
                }
            });
        }

        Ok(rate_limits)
    }
}

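/// Context carried through the handling of a sink that writes into a table.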
pub struct SinkIntoTableContext {
    pub updated_sink_catalogs: Vec<SinkId>,
}

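/// Context for finishing an auto-refresh-schema sink replacement: the temporary sink built
/// against the new schema, the original sink it replaces, the new columns, and the new log
/// store table (if any).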
pub struct FinishAutoRefreshSchemaSinkContext {
    pub tmp_sink_id: ObjectId,
    pub original_sink_id: ObjectId,
    pub columns: Vec<PbColumnCatalog>,
    pub new_log_store_table: Option<(ObjectId, Vec<PbColumnCatalog>)>,
}

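/// Applies `alter_stream_node_fn` to every fragment of `job_ids` whose type mask contains
/// `expect_flag`, persisting the rewritten stream nodes within the given transaction.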
async fn update_connector_props_fragments<F>(
    txn: &DatabaseTransaction,
    job_ids: Vec<i32>,
    expect_flag: FragmentTypeFlag,
    mut alter_stream_node_fn: F,
    is_shared_source: bool,
) -> MetaResult<()>
where
    F: FnMut(&mut PbNodeBody, &mut bool),
{
    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
        .select_only()
        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
        .filter(
            fragment::Column::JobId
                .is_in(job_ids.clone())
                .and(FragmentTypeMask::intersects(expect_flag)),
        )
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter_map(|(id, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                alter_stream_node_fn(node, &mut found);
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    if is_shared_source || job_ids.len() > 1 {
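        // A non-shared source without dependent jobs has no fragment of its own, so an
        // empty result is only an error for shared sources or when dependent job ids were
        // passed in.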
        assert!(
            !fragments.is_empty(),
            "job ids {:?} (type: {:?}) should be used by at least one fragment",
            job_ids,
            expect_flag
        );
    }

    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }

    Ok(())
}