1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::config::DefaultParallelism;
22use risingwave_common::hash::VnodeCountCompat;
23use risingwave_common::util::column_index_mapping::ColIndexMapping;
24use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
25use risingwave_common::{bail, current_cluster_version};
26use risingwave_connector::sink::file_sink::fs::FsSink;
27use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
28use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
29use risingwave_meta_model::actor::ActorStatus;
30use risingwave_meta_model::object::ObjectType;
31use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
32use risingwave_meta_model::table::TableType;
33use risingwave_meta_model::*;
34use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
35use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
36use risingwave_pb::catalog::{PbCreateType, PbTable};
37use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
38use risingwave_pb::meta::object::PbObjectInfo;
39use risingwave_pb::meta::subscribe_response::{
40 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
41};
42use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
43use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
44use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
45use risingwave_pb::stream_plan::stream_node::PbNodeBody;
46use risingwave_pb::stream_plan::{FragmentTypeFlag, PbFragmentTypeFlag, PbStreamNode};
47use risingwave_pb::user::PbUserInfo;
48use risingwave_sqlparser::ast::{SqlOption, Statement};
49use risingwave_sqlparser::parser::{Parser, ParserError};
50use sea_orm::ActiveValue::Set;
51use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr};
52use sea_orm::{
53 ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
54 IntoSimpleExpr, JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect,
55 RelationTrait, TransactionTrait,
56};
57use thiserror_ext::AsReport;
58
59use crate::barrier::{ReplaceStreamJobPlan, Reschedule};
60use crate::controller::ObjectModel;
61use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
62use crate::controller::rename::ReplaceTableExprRewriter;
63use crate::controller::utils::{
64 PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
65 check_sink_into_table_cycle, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
66 get_fragment_mappings, get_internal_tables_by_id, insert_fragment_relations,
67 rebuild_fragment_mapping_from_actors,
68};
69use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
70use crate::model::{
71 FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
72 StreamJobFragmentsToCreate, TableParallelism,
73};
74use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
75use crate::{MetaError, MetaResult};
76
77impl CatalogController {
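    /// Create the `object` and `streaming_job` rows for a new streaming job within the given
    /// transaction and return the allocated object id. The job starts in `JobStatus::Initial`.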
78 pub async fn create_streaming_job_obj(
79 txn: &DatabaseTransaction,
80 obj_type: ObjectType,
81 owner_id: UserId,
82 database_id: Option<DatabaseId>,
83 schema_id: Option<SchemaId>,
84 create_type: PbCreateType,
85 ctx: &StreamContext,
86 streaming_parallelism: StreamingParallelism,
87 max_parallelism: usize,
        specific_resource_group: Option<String>,
    ) -> MetaResult<ObjectId> {
90 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
91 let job = streaming_job::ActiveModel {
92 job_id: Set(obj.oid),
93 job_status: Set(JobStatus::Initial),
94 create_type: Set(create_type.into()),
95 timezone: Set(ctx.timezone.clone()),
96 parallelism: Set(streaming_parallelism),
97 max_parallelism: Set(max_parallelism as _),
98 specific_resource_group: Set(specific_resource_group),
99 };
100 job.insert(txn).await?;
101
102 Ok(obj.oid)
103 }
104
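    /// Create all catalog entries for a new streaming job (object, job row, and the
    /// type-specific catalog such as table / sink / index / source) in a single transaction,
    /// recording its object dependencies along the way. Creation is rejected if any dependent
    /// table is itself still being created or altered.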
105 pub async fn create_job_catalog(
111 &self,
112 streaming_job: &mut StreamingJob,
113 ctx: &StreamContext,
114 parallelism: &Option<Parallelism>,
115 max_parallelism: usize,
116 mut dependencies: HashSet<ObjectId>,
117 specific_resource_group: Option<String>,
118 ) -> MetaResult<()> {
119 let inner = self.inner.write().await;
120 let txn = inner.db.begin().await?;
121 let create_type = streaming_job.create_type();
122
123 let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
124 (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
125 (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
126 (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
127 };
128
129 ensure_user_id(streaming_job.owner() as _, &txn).await?;
130 ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
131 ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
132 check_relation_name_duplicate(
133 &streaming_job.name(),
134 streaming_job.database_id() as _,
135 streaming_job.schema_id() as _,
136 &txn,
137 )
138 .await?;
139
140 dependencies.extend(
142 streaming_job
143 .dependent_relations()
144 .into_iter()
145 .map(|id| id as ObjectId),
146 );
147
148 if !dependencies.is_empty() {
150 let altering_cnt = ObjectDependency::find()
151 .join(
152 JoinType::InnerJoin,
153 object_dependency::Relation::Object1.def(),
154 )
155 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
156 .filter(
157 object_dependency::Column::Oid
158 .is_in(dependencies.clone())
159 .and(object::Column::ObjType.eq(ObjectType::Table))
160 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
161 .and(
162 object::Column::Oid.not_in_subquery(
164 Query::select()
165 .column(table::Column::TableId)
166 .from(Table)
167 .to_owned(),
168 ),
169 ),
170 )
171 .count(&txn)
172 .await?;
173 if altering_cnt != 0 {
174 return Err(MetaError::permission_denied(
175 "some dependent relations are being altered",
176 ));
177 }
178 }
179
180 match streaming_job {
181 StreamingJob::MaterializedView(table) => {
182 let job_id = Self::create_streaming_job_obj(
183 &txn,
184 ObjectType::Table,
185 table.owner as _,
186 Some(table.database_id as _),
187 Some(table.schema_id as _),
188 create_type,
189 ctx,
190 streaming_parallelism,
191 max_parallelism,
192 specific_resource_group,
193 )
194 .await?;
195 table.id = job_id as _;
196 let table_model: table::ActiveModel = table.clone().into();
197 Table::insert(table_model).exec(&txn).await?;
198 }
199 StreamingJob::Sink(sink, _) => {
200 if let Some(target_table_id) = sink.target_table {
201 if check_sink_into_table_cycle(
202 target_table_id as ObjectId,
203 dependencies.iter().cloned().collect(),
204 &txn,
205 )
206 .await?
207 {
208 bail!("Creating such a sink will result in circular dependency.");
209 }
210 }
211
212 let job_id = Self::create_streaming_job_obj(
213 &txn,
214 ObjectType::Sink,
215 sink.owner as _,
216 Some(sink.database_id as _),
217 Some(sink.schema_id as _),
218 create_type,
219 ctx,
220 streaming_parallelism,
221 max_parallelism,
222 specific_resource_group,
223 )
224 .await?;
225 sink.id = job_id as _;
226 let sink_model: sink::ActiveModel = sink.clone().into();
227 Sink::insert(sink_model).exec(&txn).await?;
228 }
229 StreamingJob::Table(src, table, _) => {
230 let job_id = Self::create_streaming_job_obj(
231 &txn,
232 ObjectType::Table,
233 table.owner as _,
234 Some(table.database_id as _),
235 Some(table.schema_id as _),
236 create_type,
237 ctx,
238 streaming_parallelism,
239 max_parallelism,
240 specific_resource_group,
241 )
242 .await?;
243 table.id = job_id as _;
244 if let Some(src) = src {
245 let src_obj = Self::create_object(
246 &txn,
247 ObjectType::Source,
248 src.owner as _,
249 Some(src.database_id as _),
250 Some(src.schema_id as _),
251 )
252 .await?;
253 src.id = src_obj.oid as _;
254 src.optional_associated_table_id =
255 Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
256 table.optional_associated_source_id = Some(
257 PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
258 );
259 let source: source::ActiveModel = src.clone().into();
260 Source::insert(source).exec(&txn).await?;
261 }
262 let table_model: table::ActiveModel = table.clone().into();
263 Table::insert(table_model).exec(&txn).await?;
264 }
265 StreamingJob::Index(index, table) => {
266 ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
267 let job_id = Self::create_streaming_job_obj(
268 &txn,
269 ObjectType::Index,
270 index.owner as _,
271 Some(index.database_id as _),
272 Some(index.schema_id as _),
273 create_type,
274 ctx,
275 streaming_parallelism,
276 max_parallelism,
277 specific_resource_group,
278 )
279 .await?;
280 index.id = job_id as _;
282 index.index_table_id = job_id as _;
283 table.id = job_id as _;
284
285 ObjectDependency::insert(object_dependency::ActiveModel {
286 oid: Set(index.primary_table_id as _),
287 used_by: Set(table.id as _),
288 ..Default::default()
289 })
290 .exec(&txn)
291 .await?;
292
293 let table_model: table::ActiveModel = table.clone().into();
294 Table::insert(table_model).exec(&txn).await?;
295 let index_model: index::ActiveModel = index.clone().into();
296 Index::insert(index_model).exec(&txn).await?;
297 }
298 StreamingJob::Source(src) => {
299 let job_id = Self::create_streaming_job_obj(
300 &txn,
301 ObjectType::Source,
302 src.owner as _,
303 Some(src.database_id as _),
304 Some(src.schema_id as _),
305 create_type,
306 ctx,
307 streaming_parallelism,
308 max_parallelism,
309 specific_resource_group,
310 )
311 .await?;
312 src.id = job_id as _;
313 let source_model: source::ActiveModel = src.clone().into();
314 Source::insert(source_model).exec(&txn).await?;
315 }
316 }
317
318 dependencies.extend(
320 streaming_job
321 .dependent_secret_ids()?
322 .into_iter()
323 .map(|secret_id| secret_id as ObjectId),
324 );
325 dependencies.extend(
327 streaming_job
328 .dependent_connection_ids()?
329 .into_iter()
330 .map(|conn_id| conn_id as ObjectId),
331 );
332
333 if !dependencies.is_empty() {
335 ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
336 object_dependency::ActiveModel {
337 oid: Set(oid),
338 used_by: Set(streaming_job.id() as _),
339 ..Default::default()
340 }
341 }))
342 .exec(&txn)
343 .await?;
344 }
345
346 txn.commit().await?;
347
348 Ok(())
349 }
350
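    /// Allocate object ids for the job's internal (state) tables and persist their catalogs.
    /// Returns a mapping from the placeholder table ids to the newly assigned table ids.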
351 pub async fn create_internal_table_catalog(
359 &self,
360 job: &StreamingJob,
361 mut incomplete_internal_tables: Vec<PbTable>,
362 ) -> MetaResult<HashMap<u32, u32>> {
363 let job_id = job.id() as ObjectId;
364 let inner = self.inner.write().await;
365 let txn = inner.db.begin().await?;
366 let mut table_id_map = HashMap::new();
367 for table in &mut incomplete_internal_tables {
368 let table_id = Self::create_object(
369 &txn,
370 ObjectType::Table,
371 table.owner as _,
372 Some(table.database_id as _),
373 Some(table.schema_id as _),
374 )
375 .await?
376 .oid;
377 table_id_map.insert(table.id, table_id as u32);
378 table.id = table_id as _;
379 table.job_id = Some(job_id as _);
380
381 let table_model = table::ActiveModel {
382 table_id: Set(table_id as _),
383 belongs_to_job_id: Set(Some(job_id)),
384 fragment_id: NotSet,
385 ..table.clone().into()
386 };
387 Table::insert(table_model).exec(&txn).await?;
388 }
389 txn.commit().await?;
390
391 Ok(table_id_map)
392 }
393
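    /// Persist the fragments and actors of a newly built streaming job. When `for_replace` is
    /// false, the state tables are also bound to their fragments; for materialized views the
    /// updated internal table catalogs are additionally notified to the frontend.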
394 pub async fn prepare_streaming_job(
399 &self,
400 stream_job_fragments: &StreamJobFragmentsToCreate,
401 streaming_job: &StreamingJob,
402 for_replace: bool,
403 ) -> MetaResult<()> {
404 let is_materialized_view = streaming_job.is_materialized_view();
405 let fragment_actors =
406 Self::extract_fragment_and_actors_from_fragments(stream_job_fragments)?;
407 let mut all_tables = stream_job_fragments.all_tables();
408 let inner = self.inner.write().await;
409
410 let mut objects = vec![];
411 let txn = inner.db.begin().await?;
412
413 let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
415 for fragment in fragments {
416 let fragment_id = fragment.fragment_id;
417 let state_table_ids = fragment.state_table_ids.inner_ref().clone();
418
419 let fragment = fragment.into_active_model();
420 Fragment::insert(fragment).exec(&txn).await?;
421
422 if !for_replace {
425 for state_table_id in state_table_ids {
426 let table = all_tables
430 .get_mut(&(state_table_id as u32))
431 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
432 assert_eq!(table.id, state_table_id as u32);
433 assert_eq!(table.fragment_id, fragment_id as u32);
434 table.job_id = Some(streaming_job.id());
435 let vnode_count = table.vnode_count();
436
437 table::ActiveModel {
438 table_id: Set(state_table_id as _),
439 fragment_id: Set(Some(fragment_id)),
440 vnode_count: Set(vnode_count as _),
441 ..Default::default()
442 }
443 .update(&txn)
444 .await?;
445
446 if is_materialized_view {
447 objects.push(PbObject {
448 object_info: Some(PbObjectInfo::Table(table.clone())),
449 });
450 }
451 }
452 }
453 }
454
455 insert_fragment_relations(&txn, &stream_job_fragments.downstreams).await?;
456
457 for actors in actors {
459 for actor in actors {
460 let actor = actor.into_active_model();
461 Actor::insert(actor).exec(&txn).await?;
462 }
463 }
464
465 if !for_replace {
466 if let StreamingJob::Table(_, table, ..) = streaming_job {
468 Table::update(table::ActiveModel {
469 table_id: Set(table.id as _),
470 dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
471 ..Default::default()
472 })
473 .exec(&txn)
474 .await?;
475 }
476 }
477
478 txn.commit().await?;
479
480 if !objects.is_empty() {
481 assert!(is_materialized_view);
482 self.notify_frontend(Operation::Add, Info::ObjectGroup(PbObjectGroup { objects }))
483 .await;
484 }
485
486 Ok(())
487 }
488
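    /// Abort a streaming job that is still being created, cleaning up its catalog object,
    /// internal tables and associated source. Returns whether the job was actually aborted and
    /// the database it belongs to; background jobs still in `Creating` status are left untouched.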
489 pub async fn try_abort_creating_streaming_job(
493 &self,
494 job_id: ObjectId,
495 is_cancelled: bool,
496 ) -> MetaResult<(bool, Option<DatabaseId>)> {
497 let mut inner = self.inner.write().await;
498 let txn = inner.db.begin().await?;
499
500 let obj = Object::find_by_id(job_id).one(&txn).await?;
501 let Some(obj) = obj else {
502 tracing::warn!(
503 id = job_id,
504 "streaming job not found when aborting creating, might be cleaned by recovery"
505 );
506 return Ok((true, None));
507 };
508 let database_id = obj
509 .database_id
510 .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
511
512 if !is_cancelled {
513 let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
514 if let Some(streaming_job) = streaming_job {
515 assert_ne!(streaming_job.job_status, JobStatus::Created);
516 if streaming_job.create_type == CreateType::Background
517 && streaming_job.job_status == JobStatus::Creating
518 {
519 tracing::warn!(
521 id = job_id,
522 "streaming job is created in background and still in creating status"
523 );
524 return Ok((false, Some(database_id)));
525 }
526 }
527 }
528
529 let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
530
531 let table_obj = Table::find_by_id(job_id).one(&txn).await?;
533 let mut objs = vec![];
534 if let Some(table) = &table_obj
535 && table.table_type == TableType::MaterializedView
536 {
537 let obj: Option<PartialObject> = Object::find_by_id(job_id)
538 .select_only()
539 .columns([
540 object::Column::Oid,
541 object::Column::ObjType,
542 object::Column::SchemaId,
543 object::Column::DatabaseId,
544 ])
545 .into_partial_model()
546 .one(&txn)
547 .await?;
548 let obj =
549 obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
550 objs.push(obj);
551 let internal_table_objs: Vec<PartialObject> = Object::find()
552 .select_only()
553 .columns([
554 object::Column::Oid,
555 object::Column::ObjType,
556 object::Column::SchemaId,
557 object::Column::DatabaseId,
558 ])
559 .join(JoinType::InnerJoin, object::Relation::Table.def())
560 .filter(table::Column::BelongsToJobId.eq(job_id))
561 .into_partial_model()
562 .all(&txn)
563 .await?;
564 objs.extend(internal_table_objs);
565 }
566
567 if table_obj.is_none()
569 && let Some(Some(target_table_id)) = Sink::find_by_id(job_id)
570 .select_only()
571 .column(sink::Column::TargetTable)
572 .into_tuple::<Option<TableId>>()
573 .one(&txn)
574 .await?
575 {
576 let tmp_id: Option<ObjectId> = ObjectDependency::find()
577 .select_only()
578 .column(object_dependency::Column::UsedBy)
579 .join(
580 JoinType::InnerJoin,
581 object_dependency::Relation::Object1.def(),
582 )
583 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
584 .filter(
585 object_dependency::Column::Oid
586 .eq(target_table_id)
587 .and(object::Column::ObjType.eq(ObjectType::Table))
588 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
589 )
590 .into_tuple()
591 .one(&txn)
592 .await?;
593 if let Some(tmp_id) = tmp_id {
594 tracing::warn!(
595 id = tmp_id,
596 "aborting temp streaming job for sink into table"
597 );
598 Object::delete_by_id(tmp_id).exec(&txn).await?;
599 }
600 }
601
602 Object::delete_by_id(job_id).exec(&txn).await?;
603 if !internal_table_ids.is_empty() {
604 Object::delete_many()
605 .filter(object::Column::Oid.is_in(internal_table_ids))
606 .exec(&txn)
607 .await?;
608 }
609 if let Some(t) = &table_obj
610 && let Some(source_id) = t.optional_associated_source_id
611 {
612 Object::delete_by_id(source_id).exec(&txn).await?;
613 }
614
615 let err = if is_cancelled {
616 MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
617 } else {
618 MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
619 };
620 let abort_reason = format!("streaming job aborted {}", err.as_report());
621 for tx in inner
622 .creating_table_finish_notifier
623 .get_mut(&database_id)
624 .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
625 .into_iter()
626 .flatten()
627 .flatten()
628 {
629 let _ = tx.send(Err(abort_reason.clone()));
630 }
631 txn.commit().await?;
632
633 if !objs.is_empty() {
634 self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
637 .await;
638 }
639 Ok((true, Some(database_id)))
640 }
641
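    /// Convenience wrapper of [`Self::post_collect_job_fragments_inner`] for jobs that do not
    /// need a fragment worker-slot mapping notification.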
642 pub async fn post_collect_job_fragments(
643 &self,
644 job_id: ObjectId,
645 actor_ids: Vec<crate::model::ActorId>,
646 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
647 split_assignment: &SplitAssignment,
648 ) -> MetaResult<()> {
649 self.post_collect_job_fragments_inner(
650 job_id,
651 actor_ids,
652 upstream_fragment_new_downstreams,
653 split_assignment,
654 false,
655 )
656 .await
657 }
658
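    /// Mark the job's newly created actors as running, persist their connector split
    /// assignments and new upstream fragment relations, and advance the job to
    /// `JobStatus::Creating`. When `is_mv` is set, the fragment worker-slot mappings are
    /// notified to the frontend as well.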
659 pub async fn post_collect_job_fragments_inner(
660 &self,
661 job_id: ObjectId,
662 actor_ids: Vec<crate::model::ActorId>,
663 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
664 split_assignment: &SplitAssignment,
665 is_mv: bool,
666 ) -> MetaResult<()> {
667 let inner = self.inner.write().await;
668 let txn = inner.db.begin().await?;
669
670 Actor::update_many()
671 .col_expr(
672 actor::Column::Status,
673 SimpleExpr::from(ActorStatus::Running.into_value()),
674 )
675 .filter(
676 actor::Column::ActorId
677 .is_in(actor_ids.into_iter().map(|id| id as ActorId).collect_vec()),
678 )
679 .exec(&txn)
680 .await?;
681
682 for splits in split_assignment.values() {
683 for (actor_id, splits) in splits {
684 let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
685 let connector_splits = &PbConnectorSplits { splits };
686 actor::ActiveModel {
687 actor_id: Set(*actor_id as _),
688 splits: Set(Some(connector_splits.into())),
689 ..Default::default()
690 }
691 .update(&txn)
692 .await?;
693 }
694 }
695
696 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
697
698 streaming_job::ActiveModel {
700 job_id: Set(job_id),
701 job_status: Set(JobStatus::Creating),
702 ..Default::default()
703 }
704 .update(&txn)
705 .await?;
706
707 let fragment_mapping = if is_mv {
708 get_fragment_mappings(&txn, job_id as _).await?
709 } else {
710 vec![]
711 };
712
713 txn.commit().await?;
714 self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
715 .await;
716
717 Ok(())
718 }
719
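    /// Create a temporary job catalog used to replace an existing streaming job
    /// (e.g. `ALTER TABLE`). Verifies the job version and max parallelism, rejects the request
    /// if the job is referenced by other creating jobs, and returns the temporary object id.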
720 pub async fn create_job_catalog_for_replace(
721 &self,
722 streaming_job: &StreamingJob,
723 ctx: &StreamContext,
724 specified_parallelism: &Option<NonZeroUsize>,
725 max_parallelism: usize,
726 ) -> MetaResult<ObjectId> {
727 let id = streaming_job.id();
728 let inner = self.inner.write().await;
729 let txn = inner.db.begin().await?;
730
731 streaming_job.verify_version_for_replace(&txn).await?;
733 let referring_cnt = ObjectDependency::find()
735 .join(
736 JoinType::InnerJoin,
737 object_dependency::Relation::Object1.def(),
738 )
739 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
740 .filter(
741 object_dependency::Column::Oid
742 .eq(id as ObjectId)
743 .and(object::Column::ObjType.eq(ObjectType::Table))
744 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
745 )
746 .count(&txn)
747 .await?;
748 if referring_cnt != 0 {
749 return Err(MetaError::permission_denied(
750 "job is being altered or referenced by some creating jobs",
751 ));
752 }
753
754 let original_max_parallelism: i32 = StreamingJobModel::find_by_id(id as ObjectId)
756 .select_only()
757 .column(streaming_job::Column::MaxParallelism)
758 .into_tuple()
759 .one(&txn)
760 .await?
761 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
762
763 if original_max_parallelism != max_parallelism as i32 {
764 bail!(
767 "cannot use a different max parallelism \
768 when replacing streaming job, \
769 original: {}, new: {}",
770 original_max_parallelism,
771 max_parallelism
772 );
773 }
774
775 let parallelism = match specified_parallelism {
776 None => StreamingParallelism::Adaptive,
777 Some(n) => StreamingParallelism::Fixed(n.get() as _),
778 };
779
780 let new_obj_id = Self::create_streaming_job_obj(
782 &txn,
783 streaming_job.object_type(),
784 streaming_job.owner() as _,
785 Some(streaming_job.database_id() as _),
786 Some(streaming_job.schema_id() as _),
787 streaming_job.create_type(),
788 ctx,
789 parallelism,
790 max_parallelism,
791 None,
792 )
793 .await?;
794
795 ObjectDependency::insert(object_dependency::ActiveModel {
797 oid: Set(id as _),
798 used_by: Set(new_obj_id as _),
799 ..Default::default()
800 })
801 .exec(&txn)
802 .await?;
803
804 txn.commit().await?;
805
806 Ok(new_obj_id)
807 }
808
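    /// Mark the streaming job as `Created`, set its creation timestamp, notify the frontend of
    /// the finished catalogs, and wake up any waiters registered for the job. If the job is a
    /// sink into a table, the pending table replacement is finalized as well.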
809 pub async fn finish_streaming_job(
811 &self,
812 job_id: ObjectId,
813 replace_stream_job_info: Option<ReplaceStreamJobPlan>,
814 ) -> MetaResult<()> {
815 let mut inner = self.inner.write().await;
816 let txn = inner.db.begin().await?;
817
818 let job_type = Object::find_by_id(job_id)
819 .select_only()
820 .column(object::Column::ObjType)
821 .into_tuple()
822 .one(&txn)
823 .await?
824 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
825
826 let res = Object::update_many()
828 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
829 .col_expr(
830 object::Column::CreatedAtClusterVersion,
831 current_cluster_version().into(),
832 )
833 .filter(object::Column::Oid.eq(job_id))
834 .exec(&txn)
835 .await?;
836 if res.rows_affected == 0 {
837 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
838 }
839
840 let job = streaming_job::ActiveModel {
842 job_id: Set(job_id),
843 job_status: Set(JobStatus::Created),
844 ..Default::default()
845 };
846 job.update(&txn).await?;
847
848 let internal_table_objs = Table::find()
850 .find_also_related(Object)
851 .filter(table::Column::BelongsToJobId.eq(job_id))
852 .all(&txn)
853 .await?;
854 let mut objects = internal_table_objs
855 .iter()
856 .map(|(table, obj)| PbObject {
857 object_info: Some(PbObjectInfo::Table(
858 ObjectModel(table.clone(), obj.clone().unwrap()).into(),
859 )),
860 })
861 .collect_vec();
862 let mut notification_op = NotificationOperation::Add;
863
864 match job_type {
865 ObjectType::Table => {
866 let (table, obj) = Table::find_by_id(job_id)
867 .find_also_related(Object)
868 .one(&txn)
869 .await?
870 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
871 if table.table_type == TableType::MaterializedView {
872 notification_op = NotificationOperation::Update;
873 }
874
875 if let Some(source_id) = table.optional_associated_source_id {
876 let (src, obj) = Source::find_by_id(source_id)
877 .find_also_related(Object)
878 .one(&txn)
879 .await?
880 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
881 objects.push(PbObject {
882 object_info: Some(PbObjectInfo::Source(
883 ObjectModel(src, obj.unwrap()).into(),
884 )),
885 });
886 }
887 objects.push(PbObject {
888 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
889 });
890 }
891 ObjectType::Sink => {
892 let (sink, obj) = Sink::find_by_id(job_id)
893 .find_also_related(Object)
894 .one(&txn)
895 .await?
896 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
897 objects.push(PbObject {
898 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
899 });
900 }
901 ObjectType::Index => {
902 let (index, obj) = Index::find_by_id(job_id)
903 .find_also_related(Object)
904 .one(&txn)
905 .await?
906 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
907 {
908 let (table, obj) = Table::find_by_id(index.index_table_id)
909 .find_also_related(Object)
910 .one(&txn)
911 .await?
912 .ok_or_else(|| {
913 MetaError::catalog_id_not_found("table", index.index_table_id)
914 })?;
915 objects.push(PbObject {
916 object_info: Some(PbObjectInfo::Table(
917 ObjectModel(table, obj.unwrap()).into(),
918 )),
919 });
920 }
921 objects.push(PbObject {
922 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
923 });
924 }
925 ObjectType::Source => {
926 let (source, obj) = Source::find_by_id(job_id)
927 .find_also_related(Object)
928 .one(&txn)
929 .await?
930 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
931 objects.push(PbObject {
932 object_info: Some(PbObjectInfo::Source(
933 ObjectModel(source, obj.unwrap()).into(),
934 )),
935 });
936 }
937 _ => unreachable!("invalid job type: {:?}", job_type),
938 }
939
940 let fragment_mapping = get_fragment_mappings(&txn, job_id).await?;
941
942 let replace_table_mapping_update = match replace_stream_job_info {
943 Some(ReplaceStreamJobPlan {
944 streaming_job,
945 replace_upstream,
946 tmp_id,
947 ..
948 }) => {
949 let incoming_sink_id = job_id;
950
951 let (relations, fragment_mapping, _) = Self::finish_replace_streaming_job_inner(
952 tmp_id as ObjectId,
953 replace_upstream,
954 None,
955 SinkIntoTableContext {
956 creating_sink_id: Some(incoming_sink_id as _),
957 dropping_sink_id: None,
958 updated_sink_catalogs: vec![],
959 },
960 &txn,
961 streaming_job,
                    None,
                )
964 .await?;
965
966 Some((relations, fragment_mapping))
967 }
968 None => None,
969 };
970
971 txn.commit().await?;
972
973 self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
974 .await;
975
976 let mut version = self
977 .notify_frontend(
978 notification_op,
979 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
980 )
981 .await;
982
983 if let Some((objects, fragment_mapping)) = replace_table_mapping_update {
984 self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
985 .await;
986 version = self
987 .notify_frontend(
988 NotificationOperation::Update,
989 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
990 )
991 .await;
992 }
993 inner
994 .creating_table_finish_notifier
995 .values_mut()
996 .for_each(|creating_tables| {
997 if let Some(txs) = creating_tables.remove(&job_id) {
998 for tx in txs {
999 let _ = tx.send(Ok(version));
1000 }
1001 }
1002 });
1003
1004 Ok(())
1005 }
1006
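    /// Finalize a replace-streaming-job plan and notify the frontend of the updated catalogs,
    /// fragment mappings, and any objects dropped along the way.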
1007 pub async fn finish_replace_streaming_job(
1008 &self,
1009 tmp_id: ObjectId,
1010 streaming_job: StreamingJob,
1011 replace_upstream: FragmentReplaceUpstream,
1012 col_index_mapping: Option<ColIndexMapping>,
1013 sink_into_table_context: SinkIntoTableContext,
1014 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1015 ) -> MetaResult<NotificationVersion> {
1016 let inner = self.inner.write().await;
1017 let txn = inner.db.begin().await?;
1018
1019 let (objects, fragment_mapping, delete_notification_objs) =
1020 Self::finish_replace_streaming_job_inner(
1021 tmp_id,
1022 replace_upstream,
1023 col_index_mapping,
1024 sink_into_table_context,
1025 &txn,
1026 streaming_job,
1027 drop_table_connector_ctx,
1028 )
1029 .await?;
1030
1031 txn.commit().await?;
1032
1033 self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
1039 .await;
1040 let mut version = self
1041 .notify_frontend(
1042 NotificationOperation::Update,
1043 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1044 )
1045 .await;
1046
1047 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1048 self.notify_users_update(user_infos).await;
1049 version = self
1050 .notify_frontend(
1051 NotificationOperation::Delete,
1052 build_object_group_for_delete(to_drop_objects),
1053 )
1054 .await;
1055 }
1056
1057 Ok(version)
1058 }
1059
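    /// Core logic shared by `finish_replace_streaming_job` and sink-into-table finishing:
    /// swap the fragments of the temporary job onto the original job id, rewrite the merge
    /// executors of downstream fragments, update affected indexes, and collect the objects
    /// and fragment mappings to notify.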
1060 pub async fn finish_replace_streaming_job_inner(
1061 tmp_id: ObjectId,
1062 replace_upstream: FragmentReplaceUpstream,
1063 col_index_mapping: Option<ColIndexMapping>,
1064 SinkIntoTableContext {
1065 creating_sink_id,
1066 dropping_sink_id,
1067 updated_sink_catalogs,
1068 }: SinkIntoTableContext,
1069 txn: &DatabaseTransaction,
1070 streaming_job: StreamingJob,
1071 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1072 ) -> MetaResult<(
1073 Vec<PbObject>,
1074 Vec<PbFragmentWorkerSlotMapping>,
1075 Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
1076 )> {
1077 let original_job_id = streaming_job.id() as ObjectId;
1078 let job_type = streaming_job.job_type();
1079
1080 match streaming_job {
1082 StreamingJob::Table(_source, table, _table_job_type) => {
1083 let original_table_catalogs = Table::find_by_id(original_job_id)
1086 .select_only()
1087 .columns([table::Column::Columns])
1088 .into_tuple::<ColumnCatalogArray>()
1089 .one(txn)
1090 .await?
1091 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1092
1093 for sink_id in updated_sink_catalogs {
1095 sink::ActiveModel {
1096 sink_id: Set(sink_id as _),
1097 original_target_columns: Set(Some(original_table_catalogs.clone())),
1098 ..Default::default()
1099 }
1100 .update(txn)
1101 .await?;
1102 }
1103 let mut table = table::ActiveModel::from(table);
1105 let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
1106 if let Some(sink_id) = creating_sink_id {
1107 debug_assert!(!incoming_sinks.contains(&{ sink_id }));
1108 incoming_sinks.push(sink_id as _);
1109 }
1110 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1111 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1112 {
1113 table.optional_associated_source_id = Set(None);
1115 }
1116
1117 if let Some(sink_id) = dropping_sink_id {
1118 let drained = incoming_sinks
1119 .extract_if(.., |id| *id == sink_id)
1120 .collect_vec();
1121 debug_assert_eq!(drained, vec![sink_id]);
1122 }
1123
1124 table.incoming_sinks = Set(incoming_sinks.into());
1125 table.update(txn).await?;
1126 }
1127 StreamingJob::Source(source) => {
1128 let source = source::ActiveModel::from(source);
1130 source.update(txn).await?;
1131 }
1132 _ => unreachable!(
1133 "invalid streaming job type: {:?}",
1134 streaming_job.job_type_str()
1135 ),
1136 }
1137
1138 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1142 .select_only()
1143 .columns([
1144 fragment::Column::FragmentId,
1145 fragment::Column::StateTableIds,
1146 ])
1147 .filter(fragment::Column::JobId.eq(tmp_id))
1148 .into_tuple()
1149 .all(txn)
1150 .await?;
1151 for (fragment_id, state_table_ids) in fragment_info {
1152 for state_table_id in state_table_ids.into_inner() {
1153 table::ActiveModel {
1154 table_id: Set(state_table_id as _),
1155 fragment_id: Set(Some(fragment_id)),
1156 ..Default::default()
1158 }
1159 .update(txn)
1160 .await?;
1161 }
1162 }
1163
1164 Fragment::delete_many()
1166 .filter(fragment::Column::JobId.eq(original_job_id))
1167 .exec(txn)
1168 .await?;
1169 Fragment::update_many()
1170 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1171 .filter(fragment::Column::JobId.eq(tmp_id))
1172 .exec(txn)
1173 .await?;
1174
1175 for (fragment_id, fragment_replace_map) in replace_upstream {
1178 let (fragment_id, mut stream_node) = Fragment::find_by_id(fragment_id as FragmentId)
1179 .select_only()
1180 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1181 .into_tuple::<(FragmentId, StreamNode)>()
1182 .one(txn)
1183 .await?
1184 .map(|(id, node)| (id, node.to_protobuf()))
1185 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1186
1187 visit_stream_node_mut(&mut stream_node, |body| {
1188 if let PbNodeBody::Merge(m) = body
1189 && let Some(new_fragment_id) = fragment_replace_map.get(&m.upstream_fragment_id)
1190 {
1191 m.upstream_fragment_id = *new_fragment_id;
1192 }
1193 });
1194 fragment::ActiveModel {
1195 fragment_id: Set(fragment_id),
1196 stream_node: Set(StreamNode::from(&stream_node)),
1197 ..Default::default()
1198 }
1199 .update(txn)
1200 .await?;
1201 }
1202
1203 Object::delete_by_id(tmp_id).exec(txn).await?;
1205
1206 let mut objects = vec![];
1208 match job_type {
1209 StreamingJobType::Table(_) => {
1210 let (table, table_obj) = Table::find_by_id(original_job_id)
1211 .find_also_related(Object)
1212 .one(txn)
1213 .await?
1214 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1215 objects.push(PbObject {
1216 object_info: Some(PbObjectInfo::Table(
1217 ObjectModel(table, table_obj.unwrap()).into(),
1218 )),
1219 })
1220 }
1221 StreamingJobType::Source => {
1222 let (source, source_obj) = Source::find_by_id(original_job_id)
1223 .find_also_related(Object)
1224 .one(txn)
1225 .await?
1226 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1227 objects.push(PbObject {
1228 object_info: Some(PbObjectInfo::Source(
1229 ObjectModel(source, source_obj.unwrap()).into(),
1230 )),
1231 })
1232 }
1233 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1234 }
1235 if let Some(table_col_index_mapping) = col_index_mapping {
1236 let expr_rewriter = ReplaceTableExprRewriter {
1237 table_col_index_mapping,
1238 };
1239
1240 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1241 .select_only()
1242 .columns([index::Column::IndexId, index::Column::IndexItems])
1243 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1244 .into_tuple()
1245 .all(txn)
1246 .await?;
1247 for (index_id, nodes) in index_items {
1248 let mut pb_nodes = nodes.to_protobuf();
1249 pb_nodes
1250 .iter_mut()
1251 .for_each(|x| expr_rewriter.rewrite_expr(x));
1252 let index = index::ActiveModel {
1253 index_id: Set(index_id),
1254 index_items: Set(pb_nodes.into()),
1255 ..Default::default()
1256 }
1257 .update(txn)
1258 .await?;
1259 let index_obj = index
1260 .find_related(Object)
1261 .one(txn)
1262 .await?
1263 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1264 objects.push(PbObject {
1265 object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1266 });
1267 }
1268 }
1269
1270 let fragment_mapping: Vec<_> = get_fragment_mappings(txn, original_job_id as _).await?;
1271
1272 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1273 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1274 notification_objs =
1275 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1276 }
1277
1278 Ok((objects, fragment_mapping, notification_objs))
1279 }
1280
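    /// Drop the temporary object created for a replace plan that has been aborted.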
1281 pub async fn try_abort_replacing_streaming_job(&self, tmp_job_id: ObjectId) -> MetaResult<()> {
1283 let inner = self.inner.write().await;
1284 Object::delete_by_id(tmp_job_id).exec(&inner.db).await?;
1285 Ok(())
1286 }
1287
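    /// Update the rate limit of a source and propagate it to every source (and fs-fetch)
    /// executor that consumes it. Returns the affected fragment ids and their actor ids.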
1288 pub async fn update_source_rate_limit_by_source_id(
1291 &self,
1292 source_id: SourceId,
1293 rate_limit: Option<u32>,
1294 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1295 let inner = self.inner.read().await;
1296 let txn = inner.db.begin().await?;
1297
1298 {
1299 let active_source = source::ActiveModel {
1300 source_id: Set(source_id),
1301 rate_limit: Set(rate_limit.map(|v| v as i32)),
1302 ..Default::default()
1303 };
1304 active_source.update(&txn).await?;
1305 }
1306
1307 let (source, obj) = Source::find_by_id(source_id)
1308 .find_also_related(Object)
1309 .one(&txn)
1310 .await?
1311 .ok_or_else(|| {
1312 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1313 })?;
1314
1315 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1316 let streaming_job_ids: Vec<ObjectId> =
1317 if let Some(table_id) = source.optional_associated_table_id {
1318 vec![table_id]
1319 } else if let Some(source_info) = &source.source_info
1320 && source_info.to_protobuf().is_shared()
1321 {
1322 vec![source_id]
1323 } else {
1324 ObjectDependency::find()
1325 .select_only()
1326 .column(object_dependency::Column::UsedBy)
1327 .filter(object_dependency::Column::Oid.eq(source_id))
1328 .into_tuple()
1329 .all(&txn)
1330 .await?
1331 };
1332
1333 if streaming_job_ids.is_empty() {
1334 return Err(MetaError::invalid_parameter(format!(
1335 "source id {source_id} not used by any streaming job"
1336 )));
1337 }
1338
1339 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1340 .select_only()
1341 .columns([
1342 fragment::Column::FragmentId,
1343 fragment::Column::FragmentTypeMask,
1344 fragment::Column::StreamNode,
1345 ])
1346 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1347 .into_tuple()
1348 .all(&txn)
1349 .await?;
1350 let mut fragments = fragments
1351 .into_iter()
1352 .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
1353 .collect_vec();
1354
1355 fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1356 let mut found = false;
1357 if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
1358 visit_stream_node_mut(stream_node, |node| {
1359 if let PbNodeBody::Source(node) = node {
1360 if let Some(node_inner) = &mut node.source_inner
1361 && node_inner.source_id == source_id as u32
1362 {
1363 node_inner.rate_limit = rate_limit;
1364 found = true;
1365 }
1366 }
1367 });
1368 }
1369 if is_fs_source {
1370 visit_stream_node_mut(stream_node, |node| {
1373 if let PbNodeBody::StreamFsFetch(node) = node {
1374 *fragment_type_mask |= PbFragmentTypeFlag::FsFetch as i32;
1375 if let Some(node_inner) = &mut node.node_inner
1376 && node_inner.source_id == source_id as u32
1377 {
1378 node_inner.rate_limit = rate_limit;
1379 found = true;
1380 }
1381 }
1382 });
1383 }
1384 found
1385 });
1386
1387 assert!(
1388 !fragments.is_empty(),
1389 "source id should be used by at least one fragment"
1390 );
1391 let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1392 for (id, fragment_type_mask, stream_node) in fragments {
1393 fragment::ActiveModel {
1394 fragment_id: Set(id),
1395 fragment_type_mask: Set(fragment_type_mask),
1396 stream_node: Set(StreamNode::from(&stream_node)),
1397 ..Default::default()
1398 }
1399 .update(&txn)
1400 .await?;
1401 }
1402 let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1403
1404 txn.commit().await?;
1405
1406 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1407 let _version = self
1408 .notify_frontend(
1409 NotificationOperation::Update,
1410 NotificationInfo::ObjectGroup(PbObjectGroup {
1411 objects: vec![PbObject {
1412 object_info: Some(relation_info),
1413 }],
1414 }),
1415 )
1416 .await;
1417
1418 Ok(fragment_actors)
1419 }
1420
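    /// Apply `fragments_mutation_fn` to every fragment of the given job and persist the
    /// fragments for which it returns `true`. Errors with `err_msg` if no fragment matches.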
1421 async fn mutate_fragments_by_job_id(
1424 &self,
1425 job_id: ObjectId,
1426 mut fragments_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
1428 err_msg: &'static str,
1430 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1431 let inner = self.inner.read().await;
1432 let txn = inner.db.begin().await?;
1433
1434 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1435 .select_only()
1436 .columns([
1437 fragment::Column::FragmentId,
1438 fragment::Column::FragmentTypeMask,
1439 fragment::Column::StreamNode,
1440 ])
1441 .filter(fragment::Column::JobId.eq(job_id))
1442 .into_tuple()
1443 .all(&txn)
1444 .await?;
1445 let mut fragments = fragments
1446 .into_iter()
1447 .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
1448 .collect_vec();
1449
1450 fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1451 fragments_mutation_fn(fragment_type_mask, stream_node)
1452 });
1453 if fragments.is_empty() {
1454 return Err(MetaError::invalid_parameter(format!(
1455 "job id {job_id}: {}",
1456 err_msg
1457 )));
1458 }
1459
1460 let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1461 for (id, _, stream_node) in fragments {
1462 fragment::ActiveModel {
1463 fragment_id: Set(id),
1464 stream_node: Set(StreamNode::from(&stream_node)),
1465 ..Default::default()
1466 }
1467 .update(&txn)
1468 .await?;
1469 }
1470 let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1471
1472 txn.commit().await?;
1473
1474 Ok(fragment_actors)
1475 }
1476
1477 async fn mutate_fragment_by_fragment_id(
1478 &self,
1479 fragment_id: FragmentId,
1480 mut fragment_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
1481 err_msg: &'static str,
1482 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1483 let inner = self.inner.read().await;
1484 let txn = inner.db.begin().await?;
1485
1486 let (mut fragment_type_mask, stream_node): (i32, StreamNode) =
1487 Fragment::find_by_id(fragment_id)
1488 .select_only()
1489 .columns([
1490 fragment::Column::FragmentTypeMask,
1491 fragment::Column::StreamNode,
1492 ])
1493 .into_tuple()
1494 .one(&txn)
1495 .await?
1496 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1497 let mut pb_stream_node = stream_node.to_protobuf();
1498
1499 if !fragment_mutation_fn(&mut fragment_type_mask, &mut pb_stream_node) {
1500 return Err(MetaError::invalid_parameter(format!(
1501 "fragment id {fragment_id}: {}",
1502 err_msg
1503 )));
1504 }
1505
1506 fragment::ActiveModel {
1507 fragment_id: Set(fragment_id),
1508 stream_node: Set(stream_node),
1509 ..Default::default()
1510 }
1511 .update(&txn)
1512 .await?;
1513
1514 let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;
1515
1516 txn.commit().await?;
1517
1518 Ok(fragment_actors)
1519 }
1520
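    /// Update the backfill rate limit of the job's stream scan, CDC scan, source backfill and
    /// sink nodes. Returns the affected fragment ids and their actor ids.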
1521 pub async fn update_backfill_rate_limit_by_job_id(
1524 &self,
1525 job_id: ObjectId,
1526 rate_limit: Option<u32>,
1527 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1528 let update_backfill_rate_limit =
1529 |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
1530 let mut found = false;
1531 if *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0 {
1532 visit_stream_node_mut(stream_node, |node| match node {
1533 PbNodeBody::StreamCdcScan(node) => {
1534 node.rate_limit = rate_limit;
1535 found = true;
1536 }
1537 PbNodeBody::StreamScan(node) => {
1538 node.rate_limit = rate_limit;
1539 found = true;
1540 }
1541 PbNodeBody::SourceBackfill(node) => {
1542 node.rate_limit = rate_limit;
1543 found = true;
1544 }
1545 PbNodeBody::Sink(node) => {
1546 node.rate_limit = rate_limit;
1547 found = true;
1548 }
1549 _ => {}
1550 });
1551 }
1552 found
1553 };
1554
1555 self.mutate_fragments_by_job_id(
1556 job_id,
1557 update_backfill_rate_limit,
1558 "stream scan node or source node not found",
1559 )
1560 .await
1561 }
1562
1563 pub async fn update_sink_rate_limit_by_job_id(
1566 &self,
1567 job_id: ObjectId,
1568 rate_limit: Option<u32>,
1569 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1570 let update_sink_rate_limit =
1571 |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
1572 let mut found = false;
1573 if *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0 {
1574 visit_stream_node_mut(stream_node, |node| {
1575 if let PbNodeBody::Sink(node) = node {
1576 node.rate_limit = rate_limit;
1577 found = true;
1578 }
1579 });
1580 }
1581 found
1582 };
1583
1584 self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
1585 .await
1586 }
1587
1588 pub async fn update_dml_rate_limit_by_job_id(
1589 &self,
1590 job_id: ObjectId,
1591 rate_limit: Option<u32>,
1592 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1593 let update_dml_rate_limit =
1594 |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
1595 let mut found = false;
1596 if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0 {
1597 visit_stream_node_mut(stream_node, |node| {
1598 if let PbNodeBody::Dml(node) = node {
1599 node.rate_limit = rate_limit;
1600 found = true;
1601 }
1602 });
1603 }
1604 found
1605 };
1606
1607 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1608 .await
1609 }
1610
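    /// Alter the connector properties of a sink: validate the new properties against the
    /// connector's allow-list, rewrite the sink definition and the sink descriptors embedded in
    /// its fragments, then notify the frontend. Returns the merged property map.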
1611 pub async fn update_sink_props_by_sink_id(
1612 &self,
1613 sink_id: SinkId,
1614 props: BTreeMap<String, String>,
1615 ) -> MetaResult<HashMap<String, String>> {
1616 let inner = self.inner.read().await;
1617 let txn = inner.db.begin().await?;
1618
1619 let (sink, _obj) = Sink::find_by_id(sink_id)
1620 .find_also_related(Object)
1621 .one(&txn)
1622 .await?
1623 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
1624
1625 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
1627 Some(connector) => {
1628 let connector_type = connector.to_lowercase();
1629 match_sink_name_str!(
1630 connector_type.as_str(),
1631 SinkType,
1632 {
1633 for (k, v) in &props {
1634 if !SinkType::SINK_ALTER_CONFIG_LIST.contains(&k.as_str()) {
1635 return Err(SinkError::Config(anyhow!(
1636 "unsupported alter config: {}={}",
1637 k,
1638 v
1639 ))
1640 .into());
1641 }
1642 }
1643 let mut new_props = sink.properties.0.clone();
1644 new_props.extend(props.clone());
1645 SinkType::validate_alter_config(&new_props)
1646 },
1647 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
1648 )?
1649 }
1650 None => {
1651 return Err(
1652 SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
1653 );
1654 }
1655 };
1656 let definition = sink.definition.clone();
1657 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
1658 .map_err(|e| SinkError::Config(anyhow!(e)))?
1659 .try_into()
1660 .unwrap();
1661 if let Statement::CreateSink { stmt } = &mut stmt {
1662 let mut new_sql_options = stmt
1663 .with_properties
1664 .0
1665 .iter()
1666 .map(|sql_option| (&sql_option.name, sql_option))
1667 .collect::<IndexMap<_, _>>();
1668 let add_sql_options = props
1669 .iter()
1670 .map(|(k, v)| SqlOption::try_from((k, v)))
1671 .collect::<Result<Vec<SqlOption>, ParserError>>()
1672 .map_err(|e| SinkError::Config(anyhow!(e)))?;
1673 new_sql_options.extend(
1674 add_sql_options
1675 .iter()
1676 .map(|sql_option| (&sql_option.name, sql_option)),
1677 );
1678 stmt.with_properties.0 = new_sql_options.into_values().cloned().collect();
1679 } else {
1680 panic!("sink definition is not a create sink statement")
1681 }
1682 let mut new_config = sink.properties.clone().into_inner();
1683 new_config.extend(props);
1684
1685 let active_sink = sink::ActiveModel {
1686 sink_id: Set(sink_id),
1687 properties: Set(risingwave_meta_model::Property(new_config.clone())),
1688 definition: Set(stmt.to_string()),
1689 ..Default::default()
1690 };
1691 active_sink.update(&txn).await?;
1692
1693 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1694 .select_only()
1695 .columns([
1696 fragment::Column::FragmentId,
1697 fragment::Column::FragmentTypeMask,
1698 fragment::Column::StreamNode,
1699 ])
1700 .filter(fragment::Column::JobId.eq(sink_id))
1701 .into_tuple()
1702 .all(&txn)
1703 .await?;
1704 let fragments = fragments
1705 .into_iter()
1706 .filter(|(_, fragment_type_mask, _)| {
1707 *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
1708 })
1709 .filter_map(|(id, _, stream_node)| {
1710 let mut stream_node = stream_node.to_protobuf();
1711 let mut found = false;
1712 visit_stream_node_mut(&mut stream_node, |node| {
1713 if let PbNodeBody::Sink(node) = node
1714 && let Some(sink_desc) = &mut node.sink_desc
1715 && sink_desc.id == sink_id as u32
1716 {
1717 sink_desc.properties = new_config.clone();
1718 found = true;
1719 }
1720 });
1721 if found { Some((id, stream_node)) } else { None }
1722 })
1723 .collect_vec();
1724 assert!(
1725 !fragments.is_empty(),
1726 "sink id should be used by at least one fragment"
1727 );
1728 for (id, stream_node) in fragments {
1729 fragment::ActiveModel {
1730 fragment_id: Set(id),
1731 stream_node: Set(StreamNode::from(&stream_node)),
1732 ..Default::default()
1733 }
1734 .update(&txn)
1735 .await?;
1736 }
1737
1738 let (sink, obj) = Sink::find_by_id(sink_id)
1739 .find_also_related(Object)
1740 .one(&txn)
1741 .await?
1742 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
1743
1744 txn.commit().await?;
1745
1746 let relation_info = PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into());
1747 let _version = self
1748 .notify_frontend(
1749 NotificationOperation::Update,
1750 NotificationInfo::ObjectGroup(PbObjectGroup {
1751 objects: vec![PbObject {
1752 object_info: Some(relation_info),
1753 }],
1754 }),
1755 )
1756 .await;
1757
1758 Ok(new_config.into_iter().collect())
1759 }
1760
1761 pub async fn update_fragment_rate_limit_by_fragment_id(
1762 &self,
1763 fragment_id: FragmentId,
1764 rate_limit: Option<u32>,
1765 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1766 let update_rate_limit = |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
1767 let mut found = false;
1768 if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0
1769 || *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0
1770 || *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0
1771 || *fragment_type_mask & PbFragmentTypeFlag::source_rate_limit_fragments() != 0
1772 {
1773 visit_stream_node_mut(stream_node, |node| {
1774 if let PbNodeBody::Dml(node) = node {
1775 node.rate_limit = rate_limit;
1776 found = true;
1777 }
1778 if let PbNodeBody::Sink(node) = node {
1779 node.rate_limit = rate_limit;
1780 found = true;
1781 }
1782 if let PbNodeBody::StreamCdcScan(node) = node {
1783 node.rate_limit = rate_limit;
1784 found = true;
1785 }
1786 if let PbNodeBody::StreamScan(node) = node {
1787 node.rate_limit = rate_limit;
1788 found = true;
1789 }
1790 if let PbNodeBody::SourceBackfill(node) = node {
1791 node.rate_limit = rate_limit;
1792 found = true;
1793 }
1794 });
1795 }
1796 found
1797 };
1798 self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
1799 .await
1800 }
1801
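    /// Persist the actor, split and vnode-bitmap changes produced by a reschedule, update job
    /// parallelism and resource groups, and notify the frontend of the new fragment mappings.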
1802 pub async fn post_apply_reschedules(
1803 &self,
1804 reschedules: HashMap<FragmentId, Reschedule>,
1805 post_updates: &JobReschedulePostUpdates,
1806 ) -> MetaResult<()> {
1807 let new_created_actors: HashSet<_> = reschedules
1808 .values()
1809 .flat_map(|reschedule| {
1810 reschedule
1811 .added_actors
1812 .values()
1813 .flatten()
1814 .map(|actor_id| *actor_id as ActorId)
1815 })
1816 .collect();
1817
1818 let inner = self.inner.write().await;
1819
1820 let txn = inner.db.begin().await?;
1821
1822 let mut fragment_mapping_to_notify = vec![];
1823
1824 for (
1825 fragment_id,
1826 Reschedule {
1827 removed_actors,
1828 vnode_bitmap_updates,
1829 actor_splits,
1830 newly_created_actors,
1831 ..
1832 },
1833 ) in reschedules
1834 {
1835 Actor::delete_many()
1837 .filter(
1838 actor::Column::ActorId
1839 .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
1840 )
1841 .exec(&txn)
1842 .await?;
1843
1844 for (
1846 (
1847 StreamActor {
1848 actor_id,
1849 fragment_id,
1850 vnode_bitmap,
1851 expr_context,
1852 ..
1853 },
1854 _,
1855 ),
1856 worker_id,
1857 ) in newly_created_actors.into_values()
1858 {
1859 let splits = actor_splits
1860 .get(&actor_id)
1861 .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());
1862
1863 Actor::insert(actor::ActiveModel {
1864 actor_id: Set(actor_id as _),
1865 fragment_id: Set(fragment_id as _),
1866 status: Set(ActorStatus::Running),
1867 splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
1868 worker_id: Set(worker_id),
1869 upstream_actor_ids: Set(Default::default()),
1870 vnode_bitmap: Set(vnode_bitmap
1871 .as_ref()
1872 .map(|bitmap| (&bitmap.to_protobuf()).into())),
1873 expr_context: Set(expr_context.as_ref().unwrap().into()),
1874 })
1875 .exec(&txn)
1876 .await?;
1877 }
1878
1879 for (actor_id, bitmap) in vnode_bitmap_updates {
1881 let actor = Actor::find_by_id(actor_id as ActorId)
1882 .one(&txn)
1883 .await?
1884 .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
1885
1886 let mut actor = actor.into_active_model();
1887 actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
1888 actor.update(&txn).await?;
1889 }
1890
1891 for (actor_id, splits) in actor_splits {
1893 if new_created_actors.contains(&(actor_id as ActorId)) {
1894 continue;
1895 }
1896
1897 let actor = Actor::find_by_id(actor_id as ActorId)
1898 .one(&txn)
1899 .await?
1900 .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
1901
1902 let mut actor = actor.into_active_model();
1903 let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
1904 actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
1905 actor.update(&txn).await?;
1906 }
1907
1908 let fragment = Fragment::find_by_id(fragment_id)
1910 .one(&txn)
1911 .await?
1912 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1913
1914 let job_actors = fragment
1915 .find_related(Actor)
1916 .all(&txn)
1917 .await?
1918 .into_iter()
1919 .map(|actor| {
1920 (
1921 fragment_id,
1922 fragment.distribution_type,
1923 actor.actor_id,
1924 actor.vnode_bitmap,
1925 actor.worker_id,
1926 actor.status,
1927 )
1928 })
1929 .collect_vec();
1930
1931 fragment_mapping_to_notify.extend(rebuild_fragment_mapping_from_actors(job_actors));
1932 }
1933
1934 let JobReschedulePostUpdates {
1935 parallelism_updates,
1936 resource_group_updates,
1937 } = post_updates;
1938
1939 for (table_id, parallelism) in parallelism_updates {
1940 let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
1941 .one(&txn)
1942 .await?
1943 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
1944 .into_active_model();
1945
1946 streaming_job.parallelism = Set(match parallelism {
1947 TableParallelism::Adaptive => StreamingParallelism::Adaptive,
1948 TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
1949 TableParallelism::Custom => StreamingParallelism::Custom,
1950 });
1951
1952 if let Some(resource_group) =
1953 resource_group_updates.get(&(table_id.table_id() as ObjectId))
1954 {
1955 streaming_job.specific_resource_group = Set(resource_group.to_owned());
1956 }
1957
1958 streaming_job.update(&txn).await?;
1959 }
1960
1961 txn.commit().await?;
1962 self.notify_fragment_mapping(Operation::Update, fragment_mapping_to_notify)
1963 .await;
1964
1965 Ok(())
1966 }
1967
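    /// List the rate limits configured on all fragments whose type mask matches
    /// `PbFragmentTypeFlag::rate_limit_fragments()`.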
1968 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
1971 let inner = self.inner.read().await;
1972 let txn = inner.db.begin().await?;
1973
1974 let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
1975 .select_only()
1976 .columns([
1977 fragment::Column::FragmentId,
1978 fragment::Column::JobId,
1979 fragment::Column::FragmentTypeMask,
1980 fragment::Column::StreamNode,
1981 ])
1982 .filter(fragment_type_mask_intersects(
1983 PbFragmentTypeFlag::rate_limit_fragments(),
1984 ))
1985 .into_tuple()
1986 .all(&txn)
1987 .await?;
1988
1989 let mut rate_limits = Vec::new();
1990 for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
1991 let stream_node = stream_node.to_protobuf();
1992 let mut rate_limit = None;
1993 let mut node_name = None;
1994
1995 visit_stream_node(&stream_node, |node| {
1996 match node {
1997 PbNodeBody::Source(node) => {
1999 if let Some(node_inner) = &node.source_inner {
2000 debug_assert!(
2001 rate_limit.is_none(),
2002 "one fragment should only have 1 rate limit node"
2003 );
2004 rate_limit = node_inner.rate_limit;
2005 node_name = Some("SOURCE");
2006 }
2007 }
2008 PbNodeBody::StreamFsFetch(node) => {
2009 if let Some(node_inner) = &node.node_inner {
2010 debug_assert!(
2011 rate_limit.is_none(),
2012 "one fragment should only have 1 rate limit node"
2013 );
2014 rate_limit = node_inner.rate_limit;
2015 node_name = Some("FS_FETCH");
2016 }
2017 }
2018 PbNodeBody::SourceBackfill(node) => {
2020 debug_assert!(
2021 rate_limit.is_none(),
2022 "one fragment should only have 1 rate limit node"
2023 );
2024 rate_limit = node.rate_limit;
2025 node_name = Some("SOURCE_BACKFILL");
2026 }
2027 PbNodeBody::StreamScan(node) => {
2028 debug_assert!(
2029 rate_limit.is_none(),
2030 "one fragment should only have 1 rate limit node"
2031 );
2032 rate_limit = node.rate_limit;
2033 node_name = Some("STREAM_SCAN");
2034 }
2035 PbNodeBody::StreamCdcScan(node) => {
2036 debug_assert!(
2037 rate_limit.is_none(),
2038 "one fragment should only have 1 rate limit node"
2039 );
2040 rate_limit = node.rate_limit;
2041 node_name = Some("STREAM_CDC_SCAN");
2042 }
2043 PbNodeBody::Sink(node) => {
2044 debug_assert!(
2045 rate_limit.is_none(),
2046 "one fragment should only have 1 rate limit node"
2047 );
2048 rate_limit = node.rate_limit;
2049 node_name = Some("SINK");
2050 }
2051 _ => {}
2052 }
2053 });
2054
2055 if let Some(rate_limit) = rate_limit {
2056 rate_limits.push(RateLimitInfo {
2057 fragment_id: fragment_id as u32,
2058 job_id: job_id as u32,
2059 fragment_type_mask: fragment_type_mask as u32,
2060 rate_limit,
2061 node_name: node_name.unwrap().to_owned(),
2062 });
2063 }
2064 }
2065
2066 Ok(rate_limits)
2067 }
2068}
2069
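/// Build a SQL expression checking `column & value != 0`, i.e. whether the bitflag column
/// intersects the given flags.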
2070fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr {
2071 column
2072 .binary(BinOper::Custom("&"), value)
2073 .binary(BinOper::NotEqual, 0)
2074}
2075
2076fn fragment_type_mask_intersects(value: i32) -> SimpleExpr {
2077 bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value)
2078}
2079
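/// Context describing sinks that are being created into or dropped from a target table while
/// the table's streaming job is being replaced.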
2080pub struct SinkIntoTableContext {
    /// The sink currently being created into the target table, if any.
    pub creating_sink_id: Option<SinkId>,
    /// The sink being dropped from the target table, if any.
    pub dropping_sink_id: Option<SinkId>,
    /// Sinks into the table whose `original_target_columns` are reset to the table's
    /// pre-replacement columns.
    pub updated_sink_catalogs: Vec<SinkId>,
2088}