1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::config::DefaultParallelism;
22use risingwave_common::hash::VnodeCountCompat;
23use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
24use risingwave_common::{bail, current_cluster_version};
25use risingwave_connector::sink::file_sink::fs::FsSink;
26use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
27use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
28use risingwave_meta_model::actor::ActorStatus;
29use risingwave_meta_model::object::ObjectType;
30use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
31use risingwave_meta_model::table::TableType;
32use risingwave_meta_model::*;
33use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
34use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
35use risingwave_pb::catalog::{PbCreateType, PbTable};
36use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
37use risingwave_pb::meta::object::PbObjectInfo;
38use risingwave_pb::meta::subscribe_response::{
39 Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
40};
41use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
42use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
43use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
44use risingwave_pb::stream_plan::stream_node::PbNodeBody;
45use risingwave_pb::stream_plan::{FragmentTypeFlag, PbFragmentTypeFlag, PbStreamNode};
46use risingwave_pb::user::PbUserInfo;
47use risingwave_sqlparser::ast::{SqlOption, Statement};
48use risingwave_sqlparser::parser::{Parser, ParserError};
49use sea_orm::ActiveValue::Set;
50use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr};
51use sea_orm::{
52 ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
53 IntoSimpleExpr, JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect,
54 RelationTrait, TransactionTrait,
55};
56use thiserror_ext::AsReport;
57
58use super::rename::IndexItemRewriter;
59use crate::barrier::{ReplaceStreamJobPlan, Reschedule};
60use crate::controller::ObjectModel;
61use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
62use crate::controller::utils::{
63 PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
64 check_sink_into_table_cycle, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
65 get_fragment_mappings, get_internal_tables_by_id, insert_fragment_relations,
66 rebuild_fragment_mapping_from_actors,
67};
68use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
69use crate::model::{
70 FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
71 StreamJobFragmentsToCreate, TableParallelism,
72};
73use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
74use crate::{MetaError, MetaResult};
75
76impl CatalogController {
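    /// Create the `Object` row and the corresponding `streaming_job` row for a new streaming
    /// job inside the given transaction, returning the allocated object id. The job starts in
    /// `JobStatus::Initial` with the given parallelism, max parallelism and resource group.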
77 pub async fn create_streaming_job_obj(
78 txn: &DatabaseTransaction,
79 obj_type: ObjectType,
80 owner_id: UserId,
81 database_id: Option<DatabaseId>,
82 schema_id: Option<SchemaId>,
83 create_type: PbCreateType,
84 ctx: &StreamContext,
85 streaming_parallelism: StreamingParallelism,
86 max_parallelism: usize,
87 specific_resource_group: Option<String>,
88 ) -> MetaResult<ObjectId> {
89 let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
90 let job = streaming_job::ActiveModel {
91 job_id: Set(obj.oid),
92 job_status: Set(JobStatus::Initial),
93 create_type: Set(create_type.into()),
94 timezone: Set(ctx.timezone.clone()),
95 parallelism: Set(streaming_parallelism),
96 max_parallelism: Set(max_parallelism as _),
97 specific_resource_group: Set(specific_resource_group),
98 };
99 job.insert(txn).await?;
100
101 Ok(obj.oid)
102 }
103
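    /// Create the catalog entries for a new streaming job (materialized view, sink, table,
    /// index or source): resolve the streaming parallelism from the request or the configured
    /// default, validate owner/database/schema and the relation name, reject the creation if
    /// any dependent relation is still being altered, insert the job-specific catalog rows,
    /// and record object dependencies (including secrets and connections) before committing.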
104 #[await_tree::instrument]
110 pub async fn create_job_catalog(
111 &self,
112 streaming_job: &mut StreamingJob,
113 ctx: &StreamContext,
114 parallelism: &Option<Parallelism>,
115 max_parallelism: usize,
116 mut dependencies: HashSet<ObjectId>,
117 specific_resource_group: Option<String>,
118 ) -> MetaResult<()> {
119 let inner = self.inner.write().await;
120 let txn = inner.db.begin().await?;
121 let create_type = streaming_job.create_type();
122
123 let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
124 (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
125 (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
126 (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
127 };
128
129 ensure_user_id(streaming_job.owner() as _, &txn).await?;
130 ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
131 ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
132 check_relation_name_duplicate(
133 &streaming_job.name(),
134 streaming_job.database_id() as _,
135 streaming_job.schema_id() as _,
136 &txn,
137 )
138 .await?;
139
140 dependencies.extend(
142 streaming_job
143 .dependent_relations()
144 .into_iter()
145 .map(|id| id as ObjectId),
146 );
147
148 if !dependencies.is_empty() {
150 let altering_cnt = ObjectDependency::find()
151 .join(
152 JoinType::InnerJoin,
153 object_dependency::Relation::Object1.def(),
154 )
155 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
156 .filter(
157 object_dependency::Column::Oid
158 .is_in(dependencies.clone())
159 .and(object::Column::ObjType.eq(ObjectType::Table))
160 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
161 .and(
162 object::Column::Oid.not_in_subquery(
164 Query::select()
165 .column(table::Column::TableId)
166 .from(Table)
167 .to_owned(),
168 ),
169 ),
170 )
171 .count(&txn)
172 .await?;
173 if altering_cnt != 0 {
174 return Err(MetaError::permission_denied(
175 "some dependent relations are being altered",
176 ));
177 }
178 }
179
180 match streaming_job {
181 StreamingJob::MaterializedView(table) => {
182 let job_id = Self::create_streaming_job_obj(
183 &txn,
184 ObjectType::Table,
185 table.owner as _,
186 Some(table.database_id as _),
187 Some(table.schema_id as _),
188 create_type,
189 ctx,
190 streaming_parallelism,
191 max_parallelism,
192 specific_resource_group,
193 )
194 .await?;
195 table.id = job_id as _;
196 let table_model: table::ActiveModel = table.clone().into();
197 Table::insert(table_model).exec(&txn).await?;
198 }
199 StreamingJob::Sink(sink, _) => {
200 if let Some(target_table_id) = sink.target_table {
201 if check_sink_into_table_cycle(
202 target_table_id as ObjectId,
203 dependencies.iter().cloned().collect(),
204 &txn,
205 )
206 .await?
207 {
208 bail!("Creating such a sink will result in circular dependency.");
209 }
210 }
211
212 let job_id = Self::create_streaming_job_obj(
213 &txn,
214 ObjectType::Sink,
215 sink.owner as _,
216 Some(sink.database_id as _),
217 Some(sink.schema_id as _),
218 create_type,
219 ctx,
220 streaming_parallelism,
221 max_parallelism,
222 specific_resource_group,
223 )
224 .await?;
225 sink.id = job_id as _;
226 let sink_model: sink::ActiveModel = sink.clone().into();
227 Sink::insert(sink_model).exec(&txn).await?;
228 }
229 StreamingJob::Table(src, table, _) => {
230 let job_id = Self::create_streaming_job_obj(
231 &txn,
232 ObjectType::Table,
233 table.owner as _,
234 Some(table.database_id as _),
235 Some(table.schema_id as _),
236 create_type,
237 ctx,
238 streaming_parallelism,
239 max_parallelism,
240 specific_resource_group,
241 )
242 .await?;
243 table.id = job_id as _;
244 if let Some(src) = src {
245 let src_obj = Self::create_object(
246 &txn,
247 ObjectType::Source,
248 src.owner as _,
249 Some(src.database_id as _),
250 Some(src.schema_id as _),
251 )
252 .await?;
253 src.id = src_obj.oid as _;
254 src.optional_associated_table_id =
255 Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
256 table.optional_associated_source_id = Some(
257 PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
258 );
259 let source: source::ActiveModel = src.clone().into();
260 Source::insert(source).exec(&txn).await?;
261 }
262 let table_model: table::ActiveModel = table.clone().into();
263 Table::insert(table_model).exec(&txn).await?;
264 }
265 StreamingJob::Index(index, table) => {
266 ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
267 let job_id = Self::create_streaming_job_obj(
268 &txn,
269 ObjectType::Index,
270 index.owner as _,
271 Some(index.database_id as _),
272 Some(index.schema_id as _),
273 create_type,
274 ctx,
275 streaming_parallelism,
276 max_parallelism,
277 specific_resource_group,
278 )
279 .await?;
280 index.id = job_id as _;
282 index.index_table_id = job_id as _;
283 table.id = job_id as _;
284
285 ObjectDependency::insert(object_dependency::ActiveModel {
286 oid: Set(index.primary_table_id as _),
287 used_by: Set(table.id as _),
288 ..Default::default()
289 })
290 .exec(&txn)
291 .await?;
292
293 let table_model: table::ActiveModel = table.clone().into();
294 Table::insert(table_model).exec(&txn).await?;
295 let index_model: index::ActiveModel = index.clone().into();
296 Index::insert(index_model).exec(&txn).await?;
297 }
298 StreamingJob::Source(src) => {
299 let job_id = Self::create_streaming_job_obj(
300 &txn,
301 ObjectType::Source,
302 src.owner as _,
303 Some(src.database_id as _),
304 Some(src.schema_id as _),
305 create_type,
306 ctx,
307 streaming_parallelism,
308 max_parallelism,
309 specific_resource_group,
310 )
311 .await?;
312 src.id = job_id as _;
313 let source_model: source::ActiveModel = src.clone().into();
314 Source::insert(source_model).exec(&txn).await?;
315 }
316 }
317
318 dependencies.extend(
320 streaming_job
321 .dependent_secret_ids()?
322 .into_iter()
323 .map(|secret_id| secret_id as ObjectId),
324 );
325 dependencies.extend(
327 streaming_job
328 .dependent_connection_ids()?
329 .into_iter()
330 .map(|conn_id| conn_id as ObjectId),
331 );
332
333 if !dependencies.is_empty() {
335 ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
336 object_dependency::ActiveModel {
337 oid: Set(oid),
338 used_by: Set(streaming_job.id() as _),
339 ..Default::default()
340 }
341 }))
342 .exec(&txn)
343 .await?;
344 }
345
346 txn.commit().await?;
347
348 Ok(())
349 }
350
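    /// Allocate catalog object ids for the internal (state) tables of a streaming job and
    /// persist them with `belongs_to_job_id` set, returning a map from the table ids used in
    /// the plan to the newly allocated ids.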
351 pub async fn create_internal_table_catalog(
359 &self,
360 job: &StreamingJob,
361 mut incomplete_internal_tables: Vec<PbTable>,
362 ) -> MetaResult<HashMap<u32, u32>> {
363 let job_id = job.id() as ObjectId;
364 let inner = self.inner.write().await;
365 let txn = inner.db.begin().await?;
366 let mut table_id_map = HashMap::new();
367 for table in &mut incomplete_internal_tables {
368 let table_id = Self::create_object(
369 &txn,
370 ObjectType::Table,
371 table.owner as _,
372 Some(table.database_id as _),
373 Some(table.schema_id as _),
374 )
375 .await?
376 .oid;
377 table_id_map.insert(table.id, table_id as u32);
378 table.id = table_id as _;
379 table.job_id = Some(job_id as _);
380
381 let table_model = table::ActiveModel {
382 table_id: Set(table_id as _),
383 belongs_to_job_id: Set(Some(job_id)),
384 fragment_id: NotSet,
385 ..table.clone().into()
386 };
387 Table::insert(table_model).exec(&txn).await?;
388 }
389 txn.commit().await?;
390
391 Ok(table_id_map)
392 }
393
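    /// Persist the fragments and actors of a streaming job that is about to be created or
    /// replaced. When `for_replace` is false, this also binds each state table to its fragment
    /// and vnode count, records the DML fragment id for tables, and notifies the frontend
    /// about the internal tables of materialized views.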
394 #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" })]
399 pub async fn prepare_streaming_job(
400 &self,
401 stream_job_fragments: &StreamJobFragmentsToCreate,
402 streaming_job: &StreamingJob,
403 for_replace: bool,
404 ) -> MetaResult<()> {
405 let is_materialized_view = streaming_job.is_materialized_view();
406 let fragment_actors =
407 Self::extract_fragment_and_actors_from_fragments(stream_job_fragments)?;
408 let mut all_tables = stream_job_fragments.all_tables();
409 let inner = self.inner.write().await;
410
411 let mut objects = vec![];
412 let txn = inner.db.begin().await?;
413
414 let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
416 for fragment in fragments {
417 let fragment_id = fragment.fragment_id;
418 let state_table_ids = fragment.state_table_ids.inner_ref().clone();
419
420 let fragment = fragment.into_active_model();
421 Fragment::insert(fragment).exec(&txn).await?;
422
423 if !for_replace {
426 for state_table_id in state_table_ids {
427 let table = all_tables
431 .get_mut(&(state_table_id as u32))
432 .unwrap_or_else(|| panic!("table {} not found", state_table_id));
433 assert_eq!(table.id, state_table_id as u32);
434 assert_eq!(table.fragment_id, fragment_id as u32);
435 table.job_id = Some(streaming_job.id());
436 let vnode_count = table.vnode_count();
437
438 table::ActiveModel {
439 table_id: Set(state_table_id as _),
440 fragment_id: Set(Some(fragment_id)),
441 vnode_count: Set(vnode_count as _),
442 ..Default::default()
443 }
444 .update(&txn)
445 .await?;
446
447 if is_materialized_view {
448 objects.push(PbObject {
449 object_info: Some(PbObjectInfo::Table(table.clone())),
450 });
451 }
452 }
453 }
454 }
455
456 insert_fragment_relations(&txn, &stream_job_fragments.downstreams).await?;
457
458 for actors in actors {
460 for actor in actors {
461 let actor = actor.into_active_model();
462 Actor::insert(actor).exec(&txn).await?;
463 }
464 }
465
466 if !for_replace {
467 if let StreamingJob::Table(_, table, ..) = streaming_job {
469 Table::update(table::ActiveModel {
470 table_id: Set(table.id as _),
471 dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
472 ..Default::default()
473 })
474 .exec(&txn)
475 .await?;
476 }
477 }
478
479 txn.commit().await?;
480
481 if !objects.is_empty() {
482 assert!(is_materialized_view);
483 self.notify_frontend(Operation::Add, Info::ObjectGroup(PbObjectGroup { objects }))
484 .await;
485 }
486
487 Ok(())
488 }
489
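    /// Try to abort a streaming job that has not finished creation. Background jobs that are
    /// still in `Creating` status are kept and `(false, database_id)` is returned; otherwise
    /// the job object, its internal tables, any associated source and any temporary
    /// sink-into-table job are deleted, pending waiters are notified of the abort, and
    /// `(true, database_id)` is returned.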
490 #[await_tree::instrument]
494 pub async fn try_abort_creating_streaming_job(
495 &self,
496 job_id: ObjectId,
497 is_cancelled: bool,
498 ) -> MetaResult<(bool, Option<DatabaseId>)> {
499 let mut inner = self.inner.write().await;
500 let txn = inner.db.begin().await?;
501
502 let obj = Object::find_by_id(job_id).one(&txn).await?;
503 let Some(obj) = obj else {
504 tracing::warn!(
505 id = job_id,
506 "streaming job not found when aborting creating, might be cleaned by recovery"
507 );
508 return Ok((true, None));
509 };
510 let database_id = obj
511 .database_id
512 .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
513
514 if !is_cancelled {
515 let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
516 if let Some(streaming_job) = streaming_job {
517 assert_ne!(streaming_job.job_status, JobStatus::Created);
518 if streaming_job.create_type == CreateType::Background
519 && streaming_job.job_status == JobStatus::Creating
520 {
521 tracing::warn!(
523 id = job_id,
524 "streaming job is created in background and still in creating status"
525 );
526 return Ok((false, Some(database_id)));
527 }
528 }
529 }
530
531 let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
532
533 let table_obj = Table::find_by_id(job_id).one(&txn).await?;
535 let mut objs = vec![];
536 if let Some(table) = &table_obj
537 && table.table_type == TableType::MaterializedView
538 {
539 let obj: Option<PartialObject> = Object::find_by_id(job_id)
540 .select_only()
541 .columns([
542 object::Column::Oid,
543 object::Column::ObjType,
544 object::Column::SchemaId,
545 object::Column::DatabaseId,
546 ])
547 .into_partial_model()
548 .one(&txn)
549 .await?;
550 let obj =
551 obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
552 objs.push(obj);
553 let internal_table_objs: Vec<PartialObject> = Object::find()
554 .select_only()
555 .columns([
556 object::Column::Oid,
557 object::Column::ObjType,
558 object::Column::SchemaId,
559 object::Column::DatabaseId,
560 ])
561 .join(JoinType::InnerJoin, object::Relation::Table.def())
562 .filter(table::Column::BelongsToJobId.eq(job_id))
563 .into_partial_model()
564 .all(&txn)
565 .await?;
566 objs.extend(internal_table_objs);
567 }
568
569 if table_obj.is_none()
571 && let Some(Some(target_table_id)) = Sink::find_by_id(job_id)
572 .select_only()
573 .column(sink::Column::TargetTable)
574 .into_tuple::<Option<TableId>>()
575 .one(&txn)
576 .await?
577 {
578 let tmp_id: Option<ObjectId> = ObjectDependency::find()
579 .select_only()
580 .column(object_dependency::Column::UsedBy)
581 .join(
582 JoinType::InnerJoin,
583 object_dependency::Relation::Object1.def(),
584 )
585 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
586 .filter(
587 object_dependency::Column::Oid
588 .eq(target_table_id)
589 .and(object::Column::ObjType.eq(ObjectType::Table))
590 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
591 )
592 .into_tuple()
593 .one(&txn)
594 .await?;
595 if let Some(tmp_id) = tmp_id {
596 tracing::warn!(
597 id = tmp_id,
598 "aborting temp streaming job for sink into table"
599 );
600 Object::delete_by_id(tmp_id).exec(&txn).await?;
601 }
602 }
603
604 Object::delete_by_id(job_id).exec(&txn).await?;
605 if !internal_table_ids.is_empty() {
606 Object::delete_many()
607 .filter(object::Column::Oid.is_in(internal_table_ids))
608 .exec(&txn)
609 .await?;
610 }
611 if let Some(t) = &table_obj
612 && let Some(source_id) = t.optional_associated_source_id
613 {
614 Object::delete_by_id(source_id).exec(&txn).await?;
615 }
616
617 let err = if is_cancelled {
618 MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
619 } else {
620 MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
621 };
622 let abort_reason = format!("streaming job aborted {}", err.as_report());
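        // Notify every waiter registered for this creating job that it has been aborted.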
623 for tx in inner
624 .creating_table_finish_notifier
625 .get_mut(&database_id)
626 .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
627 .into_iter()
628 .flatten()
629 .flatten()
630 {
631 let _ = tx.send(Err(abort_reason.clone()));
632 }
633 txn.commit().await?;
634
635 if !objs.is_empty() {
636 self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
639 .await;
640 }
641 Ok((true, Some(database_id)))
642 }
643
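    /// Mark the actors of a newly collected job as running and persist its fragment relations
    /// and split assignments; thin wrapper around `post_collect_job_fragments_inner` with
    /// `is_mv` set to false.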
644 #[await_tree::instrument]
645 pub async fn post_collect_job_fragments(
646 &self,
647 job_id: ObjectId,
648 actor_ids: Vec<crate::model::ActorId>,
649 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
650 split_assignment: &SplitAssignment,
651 ) -> MetaResult<()> {
652 self.post_collect_job_fragments_inner(
653 job_id,
654 actor_ids,
655 upstream_fragment_new_downstreams,
656 split_assignment,
657 false,
658 )
659 .await
660 }
661
662 pub async fn post_collect_job_fragments_inner(
663 &self,
664 job_id: ObjectId,
665 actor_ids: Vec<crate::model::ActorId>,
666 upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
667 split_assignment: &SplitAssignment,
668 is_mv: bool,
669 ) -> MetaResult<()> {
670 let inner = self.inner.write().await;
671 let txn = inner.db.begin().await?;
672
673 Actor::update_many()
674 .col_expr(
675 actor::Column::Status,
676 SimpleExpr::from(ActorStatus::Running.into_value()),
677 )
678 .filter(
679 actor::Column::ActorId
680 .is_in(actor_ids.into_iter().map(|id| id as ActorId).collect_vec()),
681 )
682 .exec(&txn)
683 .await?;
684
685 for splits in split_assignment.values() {
686 for (actor_id, splits) in splits {
687 let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
688 let connector_splits = &PbConnectorSplits { splits };
689 actor::ActiveModel {
690 actor_id: Set(*actor_id as _),
691 splits: Set(Some(connector_splits.into())),
692 ..Default::default()
693 }
694 .update(&txn)
695 .await?;
696 }
697 }
698
699 insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
700
701 streaming_job::ActiveModel {
703 job_id: Set(job_id),
704 job_status: Set(JobStatus::Creating),
705 ..Default::default()
706 }
707 .update(&txn)
708 .await?;
709
710 let fragment_mapping = if is_mv {
711 get_fragment_mappings(&txn, job_id as _).await?
712 } else {
713 vec![]
714 };
715
716 txn.commit().await?;
717 self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
718 .await;
719
720 Ok(())
721 }
722
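    /// Create a temporary catalog object used to replace an existing streaming job (e.g. for
    /// `ALTER`). Fails when the job is still referenced by creating jobs or when the requested
    /// max parallelism differs from the original, and records a dependency from the original
    /// job to the new temporary object.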
723 pub async fn create_job_catalog_for_replace(
724 &self,
725 streaming_job: &StreamingJob,
726 ctx: &StreamContext,
727 specified_parallelism: &Option<NonZeroUsize>,
728 max_parallelism: usize,
729 ) -> MetaResult<ObjectId> {
730 let id = streaming_job.id();
731 let inner = self.inner.write().await;
732 let txn = inner.db.begin().await?;
733
734 streaming_job.verify_version_for_replace(&txn).await?;
736 let referring_cnt = ObjectDependency::find()
738 .join(
739 JoinType::InnerJoin,
740 object_dependency::Relation::Object1.def(),
741 )
742 .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
743 .filter(
744 object_dependency::Column::Oid
745 .eq(id as ObjectId)
746 .and(object::Column::ObjType.eq(ObjectType::Table))
747 .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
748 )
749 .count(&txn)
750 .await?;
751 if referring_cnt != 0 {
752 return Err(MetaError::permission_denied(
753 "job is being altered or referenced by some creating jobs",
754 ));
755 }
756
757 let original_max_parallelism: i32 = StreamingJobModel::find_by_id(id as ObjectId)
759 .select_only()
760 .column(streaming_job::Column::MaxParallelism)
761 .into_tuple()
762 .one(&txn)
763 .await?
764 .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
765
766 if original_max_parallelism != max_parallelism as i32 {
767 bail!(
770 "cannot use a different max parallelism \
771 when replacing streaming job, \
772 original: {}, new: {}",
773 original_max_parallelism,
774 max_parallelism
775 );
776 }
777
778 let parallelism = match specified_parallelism {
779 None => StreamingParallelism::Adaptive,
780 Some(n) => StreamingParallelism::Fixed(n.get() as _),
781 };
782
783 let new_obj_id = Self::create_streaming_job_obj(
785 &txn,
786 streaming_job.object_type(),
787 streaming_job.owner() as _,
788 Some(streaming_job.database_id() as _),
789 Some(streaming_job.schema_id() as _),
790 streaming_job.create_type(),
791 ctx,
792 parallelism,
793 max_parallelism,
794 None,
795 )
796 .await?;
797
798 ObjectDependency::insert(object_dependency::ActiveModel {
800 oid: Set(id as _),
801 used_by: Set(new_obj_id as _),
802 ..Default::default()
803 })
804 .exec(&txn)
805 .await?;
806
807 txn.commit().await?;
808
809 Ok(new_obj_id)
810 }
811
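    /// Mark a creating streaming job as `Created`: stamp the creation timestamp and cluster
    /// version, gather the catalog objects (job, internal tables, associated source or index
    /// table) to notify the frontend, finish any pending sink-into-table replacement, and wake
    /// up waiters registered for this job.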
812 pub async fn finish_streaming_job(
814 &self,
815 job_id: ObjectId,
816 replace_stream_job_info: Option<ReplaceStreamJobPlan>,
817 ) -> MetaResult<()> {
818 let mut inner = self.inner.write().await;
819 let txn = inner.db.begin().await?;
820
821 let job_type = Object::find_by_id(job_id)
822 .select_only()
823 .column(object::Column::ObjType)
824 .into_tuple()
825 .one(&txn)
826 .await?
827 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
828
829 let res = Object::update_many()
831 .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
832 .col_expr(
833 object::Column::CreatedAtClusterVersion,
834 current_cluster_version().into(),
835 )
836 .filter(object::Column::Oid.eq(job_id))
837 .exec(&txn)
838 .await?;
839 if res.rows_affected == 0 {
840 return Err(MetaError::catalog_id_not_found("streaming job", job_id));
841 }
842
843 let job = streaming_job::ActiveModel {
845 job_id: Set(job_id),
846 job_status: Set(JobStatus::Created),
847 ..Default::default()
848 };
849 job.update(&txn).await?;
850
851 let internal_table_objs = Table::find()
853 .find_also_related(Object)
854 .filter(table::Column::BelongsToJobId.eq(job_id))
855 .all(&txn)
856 .await?;
857 let mut objects = internal_table_objs
858 .iter()
859 .map(|(table, obj)| PbObject {
860 object_info: Some(PbObjectInfo::Table(
861 ObjectModel(table.clone(), obj.clone().unwrap()).into(),
862 )),
863 })
864 .collect_vec();
865 let mut notification_op = NotificationOperation::Add;
866
867 match job_type {
868 ObjectType::Table => {
869 let (table, obj) = Table::find_by_id(job_id)
870 .find_also_related(Object)
871 .one(&txn)
872 .await?
873 .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
874 if table.table_type == TableType::MaterializedView {
875 notification_op = NotificationOperation::Update;
876 }
877
878 if let Some(source_id) = table.optional_associated_source_id {
879 let (src, obj) = Source::find_by_id(source_id)
880 .find_also_related(Object)
881 .one(&txn)
882 .await?
883 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
884 objects.push(PbObject {
885 object_info: Some(PbObjectInfo::Source(
886 ObjectModel(src, obj.unwrap()).into(),
887 )),
888 });
889 }
890 objects.push(PbObject {
891 object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
892 });
893 }
894 ObjectType::Sink => {
895 let (sink, obj) = Sink::find_by_id(job_id)
896 .find_also_related(Object)
897 .one(&txn)
898 .await?
899 .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
900 objects.push(PbObject {
901 object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
902 });
903 }
904 ObjectType::Index => {
905 let (index, obj) = Index::find_by_id(job_id)
906 .find_also_related(Object)
907 .one(&txn)
908 .await?
909 .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
910 {
911 let (table, obj) = Table::find_by_id(index.index_table_id)
912 .find_also_related(Object)
913 .one(&txn)
914 .await?
915 .ok_or_else(|| {
916 MetaError::catalog_id_not_found("table", index.index_table_id)
917 })?;
918 objects.push(PbObject {
919 object_info: Some(PbObjectInfo::Table(
920 ObjectModel(table, obj.unwrap()).into(),
921 )),
922 });
923 }
924 objects.push(PbObject {
925 object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
926 });
927 }
928 ObjectType::Source => {
929 let (source, obj) = Source::find_by_id(job_id)
930 .find_also_related(Object)
931 .one(&txn)
932 .await?
933 .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
934 objects.push(PbObject {
935 object_info: Some(PbObjectInfo::Source(
936 ObjectModel(source, obj.unwrap()).into(),
937 )),
938 });
939 }
940 _ => unreachable!("invalid job type: {:?}", job_type),
941 }
942
943 let fragment_mapping = get_fragment_mappings(&txn, job_id).await?;
944
945 let replace_table_mapping_update = match replace_stream_job_info {
946 Some(ReplaceStreamJobPlan {
947 streaming_job,
948 replace_upstream,
949 tmp_id,
950 ..
951 }) => {
952 let incoming_sink_id = job_id;
953
954 let (relations, fragment_mapping, _) = Self::finish_replace_streaming_job_inner(
955 tmp_id as ObjectId,
956 replace_upstream,
957 SinkIntoTableContext {
958 creating_sink_id: Some(incoming_sink_id as _),
959 dropping_sink_id: None,
960 updated_sink_catalogs: vec![],
961 },
962 &txn,
963 streaming_job,
964 None,
965 )
966 .await?;
967
968 Some((relations, fragment_mapping))
969 }
970 None => None,
971 };
972
973 txn.commit().await?;
974
975 self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
976 .await;
977
978 let mut version = self
979 .notify_frontend(
980 notification_op,
981 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
982 )
983 .await;
984
985 if let Some((objects, fragment_mapping)) = replace_table_mapping_update {
986 self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
987 .await;
988 version = self
989 .notify_frontend(
990 NotificationOperation::Update,
991 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
992 )
993 .await;
994 }
995 inner
996 .creating_table_finish_notifier
997 .values_mut()
998 .for_each(|creating_tables| {
999 if let Some(txs) = creating_tables.remove(&job_id) {
1000 for tx in txs {
1001 let _ = tx.send(Ok(version));
1002 }
1003 }
1004 });
1005
1006 Ok(())
1007 }
1008
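    /// Finalize a replace-job plan: swap the temporary job's fragments into the original job
    /// within a single transaction, then push the resulting catalog updates and fragment
    /// mappings to the frontend.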
1009 pub async fn finish_replace_streaming_job(
1010 &self,
1011 tmp_id: ObjectId,
1012 streaming_job: StreamingJob,
1013 replace_upstream: FragmentReplaceUpstream,
1014 sink_into_table_context: SinkIntoTableContext,
1015 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1016 ) -> MetaResult<NotificationVersion> {
1017 let inner = self.inner.write().await;
1018 let txn = inner.db.begin().await?;
1019
1020 let (objects, fragment_mapping, delete_notification_objs) =
1021 Self::finish_replace_streaming_job_inner(
1022 tmp_id,
1023 replace_upstream,
1024 sink_into_table_context,
1025 &txn,
1026 streaming_job,
1027 drop_table_connector_ctx,
1028 )
1029 .await?;
1030
1031 txn.commit().await?;
1032
1033 self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
1039 .await;
1040 let mut version = self
1041 .notify_frontend(
1042 NotificationOperation::Update,
1043 NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1044 )
1045 .await;
1046
1047 if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1048 self.notify_users_update(user_infos).await;
1049 version = self
1050 .notify_frontend(
1051 NotificationOperation::Delete,
1052 build_object_group_for_delete(to_drop_objects),
1053 )
1054 .await;
1055 }
1056
1057 Ok(version)
1058 }
1059
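    /// Core of the replace-job finalization. Updates the table or source catalog (including
    /// `incoming_sinks` for sink-into-table), repoints state tables and fragments from the
    /// temporary job to the original job id, rewrites `Merge` upstreams according to
    /// `replace_upstream`, rewrites index items when the table columns changed, and optionally
    /// drops the table's associated source. Returns the objects to notify, the new fragment
    /// mappings, and optional user/object deletions.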
1060 pub async fn finish_replace_streaming_job_inner(
1061 tmp_id: ObjectId,
1062 replace_upstream: FragmentReplaceUpstream,
1063 SinkIntoTableContext {
1064 creating_sink_id,
1065 dropping_sink_id,
1066 updated_sink_catalogs,
1067 }: SinkIntoTableContext,
1068 txn: &DatabaseTransaction,
1069 streaming_job: StreamingJob,
1070 drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1071 ) -> MetaResult<(
1072 Vec<PbObject>,
1073 Vec<PbFragmentWorkerSlotMapping>,
1074 Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
1075 )> {
1076 let original_job_id = streaming_job.id() as ObjectId;
1077 let job_type = streaming_job.job_type();
1078
1079 let mut index_item_rewriter = None;
1080
1081 match streaming_job {
1083 StreamingJob::Table(_source, table, _table_job_type) => {
1084 let original_column_catalogs = Table::find_by_id(original_job_id)
1087 .select_only()
1088 .columns([table::Column::Columns])
1089 .into_tuple::<ColumnCatalogArray>()
1090 .one(txn)
1091 .await?
1092 .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1093
1094 index_item_rewriter = Some({
1095 let original_columns = original_column_catalogs
1096 .to_protobuf()
1097 .into_iter()
1098 .map(|c| c.column_desc.unwrap())
1099 .collect_vec();
1100 let new_columns = table
1101 .columns
1102 .iter()
1103 .map(|c| c.column_desc.clone().unwrap())
1104 .collect_vec();
1105
1106 IndexItemRewriter {
1107 original_columns,
1108 new_columns,
1109 }
1110 });
1111
1112 for sink_id in updated_sink_catalogs {
1114 sink::ActiveModel {
1115 sink_id: Set(sink_id as _),
1116 original_target_columns: Set(Some(original_column_catalogs.clone())),
1117 ..Default::default()
1118 }
1119 .update(txn)
1120 .await?;
1121 }
1122 let mut table = table::ActiveModel::from(table);
1124 let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
1125 if let Some(sink_id) = creating_sink_id {
1126 debug_assert!(!incoming_sinks.contains(&{ sink_id }));
1127 incoming_sinks.push(sink_id as _);
1128 }
1129 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1130 && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1131 {
1132 table.optional_associated_source_id = Set(None);
1134 }
1135
1136 if let Some(sink_id) = dropping_sink_id {
1137 let drained = incoming_sinks
1138 .extract_if(.., |id| *id == sink_id)
1139 .collect_vec();
1140 debug_assert_eq!(drained, vec![sink_id]);
1141 }
1142
1143 table.incoming_sinks = Set(incoming_sinks.into());
1144 table.update(txn).await?;
1145 }
1146 StreamingJob::Source(source) => {
1147 let source = source::ActiveModel::from(source);
1149 source.update(txn).await?;
1150 }
1151 _ => unreachable!(
1152 "invalid streaming job type: {:?}",
1153 streaming_job.job_type_str()
1154 ),
1155 }
1156
1157 let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1161 .select_only()
1162 .columns([
1163 fragment::Column::FragmentId,
1164 fragment::Column::StateTableIds,
1165 ])
1166 .filter(fragment::Column::JobId.eq(tmp_id))
1167 .into_tuple()
1168 .all(txn)
1169 .await?;
1170 for (fragment_id, state_table_ids) in fragment_info {
1171 for state_table_id in state_table_ids.into_inner() {
1172 table::ActiveModel {
1173 table_id: Set(state_table_id as _),
1174 fragment_id: Set(Some(fragment_id)),
1175 ..Default::default()
1177 }
1178 .update(txn)
1179 .await?;
1180 }
1181 }
1182
1183 Fragment::delete_many()
1185 .filter(fragment::Column::JobId.eq(original_job_id))
1186 .exec(txn)
1187 .await?;
1188 Fragment::update_many()
1189 .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1190 .filter(fragment::Column::JobId.eq(tmp_id))
1191 .exec(txn)
1192 .await?;
1193
1194 for (fragment_id, fragment_replace_map) in replace_upstream {
1197 let (fragment_id, mut stream_node) = Fragment::find_by_id(fragment_id as FragmentId)
1198 .select_only()
1199 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1200 .into_tuple::<(FragmentId, StreamNode)>()
1201 .one(txn)
1202 .await?
1203 .map(|(id, node)| (id, node.to_protobuf()))
1204 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1205
1206 visit_stream_node_mut(&mut stream_node, |body| {
1207 if let PbNodeBody::Merge(m) = body
1208 && let Some(new_fragment_id) = fragment_replace_map.get(&m.upstream_fragment_id)
1209 {
1210 m.upstream_fragment_id = *new_fragment_id;
1211 }
1212 });
1213 fragment::ActiveModel {
1214 fragment_id: Set(fragment_id),
1215 stream_node: Set(StreamNode::from(&stream_node)),
1216 ..Default::default()
1217 }
1218 .update(txn)
1219 .await?;
1220 }
1221
1222 Object::delete_by_id(tmp_id).exec(txn).await?;
1224
1225 let mut objects = vec![];
1227 match job_type {
1228 StreamingJobType::Table(_) => {
1229 let (table, table_obj) = Table::find_by_id(original_job_id)
1230 .find_also_related(Object)
1231 .one(txn)
1232 .await?
1233 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1234 objects.push(PbObject {
1235 object_info: Some(PbObjectInfo::Table(
1236 ObjectModel(table, table_obj.unwrap()).into(),
1237 )),
1238 })
1239 }
1240 StreamingJobType::Source => {
1241 let (source, source_obj) = Source::find_by_id(original_job_id)
1242 .find_also_related(Object)
1243 .one(txn)
1244 .await?
1245 .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1246 objects.push(PbObject {
1247 object_info: Some(PbObjectInfo::Source(
1248 ObjectModel(source, source_obj.unwrap()).into(),
1249 )),
1250 })
1251 }
1252 _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1253 }
1254
1255 if let Some(expr_rewriter) = index_item_rewriter {
1256 let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1257 .select_only()
1258 .columns([index::Column::IndexId, index::Column::IndexItems])
1259 .filter(index::Column::PrimaryTableId.eq(original_job_id))
1260 .into_tuple()
1261 .all(txn)
1262 .await?;
1263 for (index_id, nodes) in index_items {
1264 let mut pb_nodes = nodes.to_protobuf();
1265 pb_nodes
1266 .iter_mut()
1267 .for_each(|x| expr_rewriter.rewrite_expr(x));
1268 let index = index::ActiveModel {
1269 index_id: Set(index_id),
1270 index_items: Set(pb_nodes.into()),
1271 ..Default::default()
1272 }
1273 .update(txn)
1274 .await?;
1275 let index_obj = index
1276 .find_related(Object)
1277 .one(txn)
1278 .await?
1279 .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1280 objects.push(PbObject {
1281 object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1282 });
1283 }
1284 }
1285
1286 let fragment_mapping: Vec<_> = get_fragment_mappings(txn, original_job_id as _).await?;
1287
1288 let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1289 if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1290 notification_objs =
1291 Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1292 }
1293
1294 Ok((objects, fragment_mapping, notification_objs))
1295 }
1296
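    /// Abort a replace-job attempt by deleting its temporary job object.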
1297 pub async fn try_abort_replacing_streaming_job(&self, tmp_job_id: ObjectId) -> MetaResult<()> {
1299 let inner = self.inner.write().await;
1300 Object::delete_by_id(tmp_job_id).exec(&inner.db).await?;
1301 Ok(())
1302 }
1303
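    /// Update the rate limit of a source and of every `Source` / `StreamFsFetch` node that
    /// reads from it, returning the affected fragment-to-actor mapping so the new limit can be
    /// applied to running actors.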
1304 pub async fn update_source_rate_limit_by_source_id(
1307 &self,
1308 source_id: SourceId,
1309 rate_limit: Option<u32>,
1310 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1311 let inner = self.inner.read().await;
1312 let txn = inner.db.begin().await?;
1313
1314 {
1315 let active_source = source::ActiveModel {
1316 source_id: Set(source_id),
1317 rate_limit: Set(rate_limit.map(|v| v as i32)),
1318 ..Default::default()
1319 };
1320 active_source.update(&txn).await?;
1321 }
1322
1323 let (source, obj) = Source::find_by_id(source_id)
1324 .find_also_related(Object)
1325 .one(&txn)
1326 .await?
1327 .ok_or_else(|| {
1328 MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1329 })?;
1330
1331 let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1332 let streaming_job_ids: Vec<ObjectId> =
1333 if let Some(table_id) = source.optional_associated_table_id {
1334 vec![table_id]
1335 } else if let Some(source_info) = &source.source_info
1336 && source_info.to_protobuf().is_shared()
1337 {
1338 vec![source_id]
1339 } else {
1340 ObjectDependency::find()
1341 .select_only()
1342 .column(object_dependency::Column::UsedBy)
1343 .filter(object_dependency::Column::Oid.eq(source_id))
1344 .into_tuple()
1345 .all(&txn)
1346 .await?
1347 };
1348
1349 if streaming_job_ids.is_empty() {
1350 return Err(MetaError::invalid_parameter(format!(
1351 "source id {source_id} not used by any streaming job"
1352 )));
1353 }
1354
1355 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1356 .select_only()
1357 .columns([
1358 fragment::Column::FragmentId,
1359 fragment::Column::FragmentTypeMask,
1360 fragment::Column::StreamNode,
1361 ])
1362 .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1363 .into_tuple()
1364 .all(&txn)
1365 .await?;
1366 let mut fragments = fragments
1367 .into_iter()
1368 .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
1369 .collect_vec();
1370
1371 fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1372 let mut found = false;
1373 if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
1374 visit_stream_node_mut(stream_node, |node| {
1375 if let PbNodeBody::Source(node) = node {
1376 if let Some(node_inner) = &mut node.source_inner
1377 && node_inner.source_id == source_id as u32
1378 {
1379 node_inner.rate_limit = rate_limit;
1380 found = true;
1381 }
1382 }
1383 });
1384 }
1385 if is_fs_source {
1386 visit_stream_node_mut(stream_node, |node| {
1389 if let PbNodeBody::StreamFsFetch(node) = node {
1390 *fragment_type_mask |= PbFragmentTypeFlag::FsFetch as i32;
1391 if let Some(node_inner) = &mut node.node_inner
1392 && node_inner.source_id == source_id as u32
1393 {
1394 node_inner.rate_limit = rate_limit;
1395 found = true;
1396 }
1397 }
1398 });
1399 }
1400 found
1401 });
1402
1403 assert!(
1404 !fragments.is_empty(),
1405 "source id should be used by at least one fragment"
1406 );
1407 let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1408 for (id, fragment_type_mask, stream_node) in fragments {
1409 fragment::ActiveModel {
1410 fragment_id: Set(id),
1411 fragment_type_mask: Set(fragment_type_mask),
1412 stream_node: Set(StreamNode::from(&stream_node)),
1413 ..Default::default()
1414 }
1415 .update(&txn)
1416 .await?;
1417 }
1418 let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1419
1420 txn.commit().await?;
1421
1422 let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1423 let _version = self
1424 .notify_frontend(
1425 NotificationOperation::Update,
1426 NotificationInfo::ObjectGroup(PbObjectGroup {
1427 objects: vec![PbObject {
1428 object_info: Some(relation_info),
1429 }],
1430 }),
1431 )
1432 .await;
1433
1434 Ok(fragment_actors)
1435 }
1436
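    /// Apply `fragments_mutation_fn` to every fragment of a job, persist the fragments whose
    /// stream node was changed (i.e. where the closure returns true), and return their actor
    /// ids. Errors with `err_msg` when no fragment was affected.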
1437 async fn mutate_fragments_by_job_id(
1440 &self,
1441 job_id: ObjectId,
1442 mut fragments_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
1444 err_msg: &'static str,
1446 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1447 let inner = self.inner.read().await;
1448 let txn = inner.db.begin().await?;
1449
1450 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1451 .select_only()
1452 .columns([
1453 fragment::Column::FragmentId,
1454 fragment::Column::FragmentTypeMask,
1455 fragment::Column::StreamNode,
1456 ])
1457 .filter(fragment::Column::JobId.eq(job_id))
1458 .into_tuple()
1459 .all(&txn)
1460 .await?;
1461 let mut fragments = fragments
1462 .into_iter()
1463 .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
1464 .collect_vec();
1465
1466 fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1467 fragments_mutation_fn(fragment_type_mask, stream_node)
1468 });
1469 if fragments.is_empty() {
1470 return Err(MetaError::invalid_parameter(format!(
1471 "job id {job_id}: {}",
1472 err_msg
1473 )));
1474 }
1475
1476 let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1477 for (id, _, stream_node) in fragments {
1478 fragment::ActiveModel {
1479 fragment_id: Set(id),
1480 stream_node: Set(StreamNode::from(&stream_node)),
1481 ..Default::default()
1482 }
1483 .update(&txn)
1484 .await?;
1485 }
1486 let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1487
1488 txn.commit().await?;
1489
1490 Ok(fragment_actors)
1491 }
1492
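    /// Single-fragment variant of `mutate_fragments_by_job_id`: apply the mutation to one
    /// fragment by id and persist its updated stream node.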
1493 async fn mutate_fragment_by_fragment_id(
1494 &self,
1495 fragment_id: FragmentId,
1496 mut fragment_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
1497 err_msg: &'static str,
1498 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1499 let inner = self.inner.read().await;
1500 let txn = inner.db.begin().await?;
1501
1502 let (mut fragment_type_mask, stream_node): (i32, StreamNode) =
1503 Fragment::find_by_id(fragment_id)
1504 .select_only()
1505 .columns([
1506 fragment::Column::FragmentTypeMask,
1507 fragment::Column::StreamNode,
1508 ])
1509 .into_tuple()
1510 .one(&txn)
1511 .await?
1512 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1513 let mut pb_stream_node = stream_node.to_protobuf();
1514
1515 if !fragment_mutation_fn(&mut fragment_type_mask, &mut pb_stream_node) {
1516 return Err(MetaError::invalid_parameter(format!(
1517 "fragment id {fragment_id}: {}",
1518 err_msg
1519 )));
1520 }
1521
1522 fragment::ActiveModel {
1523 fragment_id: Set(fragment_id),
1524 stream_node: Set(StreamNode::from(&pb_stream_node)),
1525 ..Default::default()
1526 }
1527 .update(&txn)
1528 .await?;
1529
1530 let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;
1531
1532 txn.commit().await?;
1533
1534 Ok(fragment_actors)
1535 }
1536
1537 pub async fn update_backfill_rate_limit_by_job_id(
1540 &self,
1541 job_id: ObjectId,
1542 rate_limit: Option<u32>,
1543 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1544 let update_backfill_rate_limit =
1545 |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
1546 let mut found = false;
1547 if *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0 {
1548 visit_stream_node_mut(stream_node, |node| match node {
1549 PbNodeBody::StreamCdcScan(node) => {
1550 node.rate_limit = rate_limit;
1551 found = true;
1552 }
1553 PbNodeBody::StreamScan(node) => {
1554 node.rate_limit = rate_limit;
1555 found = true;
1556 }
1557 PbNodeBody::SourceBackfill(node) => {
1558 node.rate_limit = rate_limit;
1559 found = true;
1560 }
1561 PbNodeBody::Sink(node) => {
1562 node.rate_limit = rate_limit;
1563 found = true;
1564 }
1565 _ => {}
1566 });
1567 }
1568 found
1569 };
1570
1571 self.mutate_fragments_by_job_id(
1572 job_id,
1573 update_backfill_rate_limit,
1574 "stream scan node or source node not found",
1575 )
1576 .await
1577 }
1578
1579 pub async fn update_sink_rate_limit_by_job_id(
1582 &self,
1583 job_id: ObjectId,
1584 rate_limit: Option<u32>,
1585 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1586 let update_sink_rate_limit =
1587 |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
1588 let mut found = false;
1589 if *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0 {
1590 visit_stream_node_mut(stream_node, |node| {
1591 if let PbNodeBody::Sink(node) = node {
1592 node.rate_limit = rate_limit;
1593 found = true;
1594 }
1595 });
1596 }
1597 found
1598 };
1599
1600 self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
1601 .await
1602 }
1603
1604 pub async fn update_dml_rate_limit_by_job_id(
1605 &self,
1606 job_id: ObjectId,
1607 rate_limit: Option<u32>,
1608 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1609 let update_dml_rate_limit =
1610 |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
1611 let mut found = false;
1612 if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0 {
1613 visit_stream_node_mut(stream_node, |node| {
1614 if let PbNodeBody::Dml(node) = node {
1615 node.rate_limit = rate_limit;
1616 found = true;
1617 }
1618 });
1619 }
1620 found
1621 };
1622
1623 self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1624 .await
1625 }
1626
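    /// Alter the properties of a sink: validate that every changed key is allowed for the
    /// connector, rewrite the `CREATE SINK` definition with the new `WITH` options, persist
    /// the merged properties, patch the `SinkDesc` in all sink fragments, and notify the
    /// frontend. Returns the merged property map.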
1627 pub async fn update_sink_props_by_sink_id(
1628 &self,
1629 sink_id: SinkId,
1630 props: BTreeMap<String, String>,
1631 ) -> MetaResult<HashMap<String, String>> {
1632 let inner = self.inner.read().await;
1633 let txn = inner.db.begin().await?;
1634
1635 let (sink, _obj) = Sink::find_by_id(sink_id)
1636 .find_also_related(Object)
1637 .one(&txn)
1638 .await?
1639 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
1640
1641 match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
1643 Some(connector) => {
1644 let connector_type = connector.to_lowercase();
1645 match_sink_name_str!(
1646 connector_type.as_str(),
1647 SinkType,
1648 {
1649 for (k, v) in &props {
1650 if !SinkType::SINK_ALTER_CONFIG_LIST.contains(&k.as_str()) {
1651 return Err(SinkError::Config(anyhow!(
1652 "unsupported alter config: {}={}",
1653 k,
1654 v
1655 ))
1656 .into());
1657 }
1658 }
1659 let mut new_props = sink.properties.0.clone();
1660 new_props.extend(props.clone());
1661 SinkType::validate_alter_config(&new_props)
1662 },
1663 |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
1664 )?
1665 }
1666 None => {
1667 return Err(
1668 SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
1669 );
1670 }
1671 };
1672 let definition = sink.definition.clone();
1673 let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
1674 .map_err(|e| SinkError::Config(anyhow!(e)))?
1675 .try_into()
1676 .unwrap();
1677 if let Statement::CreateSink { stmt } = &mut stmt {
1678 let mut new_sql_options = stmt
1679 .with_properties
1680 .0
1681 .iter()
1682 .map(|sql_option| (&sql_option.name, sql_option))
1683 .collect::<IndexMap<_, _>>();
1684 let add_sql_options = props
1685 .iter()
1686 .map(|(k, v)| SqlOption::try_from((k, v)))
1687 .collect::<Result<Vec<SqlOption>, ParserError>>()
1688 .map_err(|e| SinkError::Config(anyhow!(e)))?;
1689 new_sql_options.extend(
1690 add_sql_options
1691 .iter()
1692 .map(|sql_option| (&sql_option.name, sql_option)),
1693 );
1694 stmt.with_properties.0 = new_sql_options.into_values().cloned().collect();
1695 } else {
1696 panic!("sink definition is not a create sink statement")
1697 }
1698 let mut new_config = sink.properties.clone().into_inner();
1699 new_config.extend(props);
1700
1701 let active_sink = sink::ActiveModel {
1702 sink_id: Set(sink_id),
1703 properties: Set(risingwave_meta_model::Property(new_config.clone())),
1704 definition: Set(stmt.to_string()),
1705 ..Default::default()
1706 };
1707 active_sink.update(&txn).await?;
1708
1709 let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1710 .select_only()
1711 .columns([
1712 fragment::Column::FragmentId,
1713 fragment::Column::FragmentTypeMask,
1714 fragment::Column::StreamNode,
1715 ])
1716 .filter(fragment::Column::JobId.eq(sink_id))
1717 .into_tuple()
1718 .all(&txn)
1719 .await?;
1720 let fragments = fragments
1721 .into_iter()
1722 .filter(|(_, fragment_type_mask, _)| {
1723 *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
1724 })
1725 .filter_map(|(id, _, stream_node)| {
1726 let mut stream_node = stream_node.to_protobuf();
1727 let mut found = false;
1728 visit_stream_node_mut(&mut stream_node, |node| {
1729 if let PbNodeBody::Sink(node) = node
1730 && let Some(sink_desc) = &mut node.sink_desc
1731 && sink_desc.id == sink_id as u32
1732 {
1733 sink_desc.properties = new_config.clone();
1734 found = true;
1735 }
1736 });
1737 if found { Some((id, stream_node)) } else { None }
1738 })
1739 .collect_vec();
1740 assert!(
1741 !fragments.is_empty(),
1742 "sink id should be used by at least one fragment"
1743 );
1744 for (id, stream_node) in fragments {
1745 fragment::ActiveModel {
1746 fragment_id: Set(id),
1747 stream_node: Set(StreamNode::from(&stream_node)),
1748 ..Default::default()
1749 }
1750 .update(&txn)
1751 .await?;
1752 }
1753
1754 let (sink, obj) = Sink::find_by_id(sink_id)
1755 .find_also_related(Object)
1756 .one(&txn)
1757 .await?
1758 .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
1759
1760 txn.commit().await?;
1761
1762 let relation_info = PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into());
1763 let _version = self
1764 .notify_frontend(
1765 NotificationOperation::Update,
1766 NotificationInfo::ObjectGroup(PbObjectGroup {
1767 objects: vec![PbObject {
1768 object_info: Some(relation_info),
1769 }],
1770 }),
1771 )
1772 .await;
1773
1774 Ok(new_config.into_iter().collect())
1775 }
1776
1777 pub async fn update_fragment_rate_limit_by_fragment_id(
1778 &self,
1779 fragment_id: FragmentId,
1780 rate_limit: Option<u32>,
1781 ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1782 let update_rate_limit = |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
1783 let mut found = false;
1784 if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0
1785 || *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0
1786 || *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0
1787 || *fragment_type_mask & PbFragmentTypeFlag::source_rate_limit_fragments() != 0
1788 {
1789 visit_stream_node_mut(stream_node, |node| {
1790 if let PbNodeBody::Dml(node) = node {
1791 node.rate_limit = rate_limit;
1792 found = true;
1793 }
1794 if let PbNodeBody::Sink(node) = node {
1795 node.rate_limit = rate_limit;
1796 found = true;
1797 }
1798 if let PbNodeBody::StreamCdcScan(node) = node {
1799 node.rate_limit = rate_limit;
1800 found = true;
1801 }
1802 if let PbNodeBody::StreamScan(node) = node {
1803 node.rate_limit = rate_limit;
1804 found = true;
1805 }
1806 if let PbNodeBody::SourceBackfill(node) = node {
1807 node.rate_limit = rate_limit;
1808 found = true;
1809 }
1810 });
1811 }
1812 found
1813 };
1814 self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
1815 .await
1816 }
1817
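    /// Persist the result of a reschedule: delete removed actors, insert newly created ones,
    /// update vnode bitmaps and split assignments, refresh per-job parallelism and resource
    /// group settings, and broadcast the rebuilt fragment worker-slot mappings.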
1818 pub async fn post_apply_reschedules(
1819 &self,
1820 reschedules: HashMap<FragmentId, Reschedule>,
1821 post_updates: &JobReschedulePostUpdates,
1822 ) -> MetaResult<()> {
1823 let new_created_actors: HashSet<_> = reschedules
1824 .values()
1825 .flat_map(|reschedule| {
1826 reschedule
1827 .added_actors
1828 .values()
1829 .flatten()
1830 .map(|actor_id| *actor_id as ActorId)
1831 })
1832 .collect();
1833
1834 let inner = self.inner.write().await;
1835
1836 let txn = inner.db.begin().await?;
1837
1838 let mut fragment_mapping_to_notify = vec![];
1839
1840 for (
1841 fragment_id,
1842 Reschedule {
1843 removed_actors,
1844 vnode_bitmap_updates,
1845 actor_splits,
1846 newly_created_actors,
1847 ..
1848 },
1849 ) in reschedules
1850 {
1851 Actor::delete_many()
1853 .filter(
1854 actor::Column::ActorId
1855 .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
1856 )
1857 .exec(&txn)
1858 .await?;
1859
1860 for (
1862 (
1863 StreamActor {
1864 actor_id,
1865 fragment_id,
1866 vnode_bitmap,
1867 expr_context,
1868 ..
1869 },
1870 _,
1871 ),
1872 worker_id,
1873 ) in newly_created_actors.into_values()
1874 {
1875 let splits = actor_splits
1876 .get(&actor_id)
1877 .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());
1878
1879 Actor::insert(actor::ActiveModel {
1880 actor_id: Set(actor_id as _),
1881 fragment_id: Set(fragment_id as _),
1882 status: Set(ActorStatus::Running),
1883 splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
1884 worker_id: Set(worker_id),
1885 upstream_actor_ids: Set(Default::default()),
1886 vnode_bitmap: Set(vnode_bitmap
1887 .as_ref()
1888 .map(|bitmap| (&bitmap.to_protobuf()).into())),
1889 expr_context: Set(expr_context.as_ref().unwrap().into()),
1890 })
1891 .exec(&txn)
1892 .await?;
1893 }
1894
1895 for (actor_id, bitmap) in vnode_bitmap_updates {
1897 let actor = Actor::find_by_id(actor_id as ActorId)
1898 .one(&txn)
1899 .await?
1900 .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
1901
1902 let mut actor = actor.into_active_model();
1903 actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
1904 actor.update(&txn).await?;
1905 }
1906
1907 for (actor_id, splits) in actor_splits {
1909 if new_created_actors.contains(&(actor_id as ActorId)) {
1910 continue;
1911 }
1912
1913 let actor = Actor::find_by_id(actor_id as ActorId)
1914 .one(&txn)
1915 .await?
1916 .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
1917
1918 let mut actor = actor.into_active_model();
1919 let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
1920 actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
1921 actor.update(&txn).await?;
1922 }
1923
1924 let fragment = Fragment::find_by_id(fragment_id)
1926 .one(&txn)
1927 .await?
1928 .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1929
1930 let job_actors = fragment
1931 .find_related(Actor)
1932 .all(&txn)
1933 .await?
1934 .into_iter()
1935 .map(|actor| {
1936 (
1937 fragment_id,
1938 fragment.distribution_type,
1939 actor.actor_id,
1940 actor.vnode_bitmap,
1941 actor.worker_id,
1942 actor.status,
1943 )
1944 })
1945 .collect_vec();
1946
1947 fragment_mapping_to_notify.extend(rebuild_fragment_mapping_from_actors(job_actors));
1948 }
1949
1950 let JobReschedulePostUpdates {
1951 parallelism_updates,
1952 resource_group_updates,
1953 } = post_updates;
1954
1955 for (table_id, parallelism) in parallelism_updates {
1956 let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
1957 .one(&txn)
1958 .await?
1959 .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
1960 .into_active_model();
1961
1962 streaming_job.parallelism = Set(match parallelism {
1963 TableParallelism::Adaptive => StreamingParallelism::Adaptive,
1964 TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
1965 TableParallelism::Custom => StreamingParallelism::Custom,
1966 });
1967
1968 if let Some(resource_group) =
1969 resource_group_updates.get(&(table_id.table_id() as ObjectId))
1970 {
1971 streaming_job.specific_resource_group = Set(resource_group.to_owned());
1972 }
1973
1974 streaming_job.update(&txn).await?;
1975 }
1976
1977 txn.commit().await?;
1978 self.notify_fragment_mapping(Operation::Update, fragment_mapping_to_notify)
1979 .await;
1980
1981 Ok(())
1982 }
1983
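    /// List the rate limits currently configured on rate-limitable nodes (source, fs fetch,
    /// backfill / stream scan, CDC scan, sink) across all fragments.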
1984 pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
1987 let inner = self.inner.read().await;
1988 let txn = inner.db.begin().await?;
1989
1990 let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
1991 .select_only()
1992 .columns([
1993 fragment::Column::FragmentId,
1994 fragment::Column::JobId,
1995 fragment::Column::FragmentTypeMask,
1996 fragment::Column::StreamNode,
1997 ])
1998 .filter(fragment_type_mask_intersects(
1999 PbFragmentTypeFlag::rate_limit_fragments(),
2000 ))
2001 .into_tuple()
2002 .all(&txn)
2003 .await?;
2004
2005 let mut rate_limits = Vec::new();
2006 for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2007 let stream_node = stream_node.to_protobuf();
2008 let mut rate_limit = None;
2009 let mut node_name = None;
2010
2011 visit_stream_node(&stream_node, |node| {
2012 match node {
2013 PbNodeBody::Source(node) => {
2015 if let Some(node_inner) = &node.source_inner {
2016 debug_assert!(
2017 rate_limit.is_none(),
2018 "one fragment should only have 1 rate limit node"
2019 );
2020 rate_limit = node_inner.rate_limit;
2021 node_name = Some("SOURCE");
2022 }
2023 }
2024 PbNodeBody::StreamFsFetch(node) => {
2025 if let Some(node_inner) = &node.node_inner {
2026 debug_assert!(
2027 rate_limit.is_none(),
2028 "one fragment should only have 1 rate limit node"
2029 );
2030 rate_limit = node_inner.rate_limit;
2031 node_name = Some("FS_FETCH");
2032 }
2033 }
2034 PbNodeBody::SourceBackfill(node) => {
2036 debug_assert!(
2037 rate_limit.is_none(),
2038 "one fragment should only have 1 rate limit node"
2039 );
2040 rate_limit = node.rate_limit;
2041 node_name = Some("SOURCE_BACKFILL");
2042 }
2043 PbNodeBody::StreamScan(node) => {
2044 debug_assert!(
2045 rate_limit.is_none(),
2046 "one fragment should only have 1 rate limit node"
2047 );
2048 rate_limit = node.rate_limit;
2049 node_name = Some("STREAM_SCAN");
2050 }
2051 PbNodeBody::StreamCdcScan(node) => {
2052 debug_assert!(
2053 rate_limit.is_none(),
2054 "one fragment should only have 1 rate limit node"
2055 );
2056 rate_limit = node.rate_limit;
2057 node_name = Some("STREAM_CDC_SCAN");
2058 }
2059 PbNodeBody::Sink(node) => {
2060 debug_assert!(
2061 rate_limit.is_none(),
2062 "one fragment should only have 1 rate limit node"
2063 );
2064 rate_limit = node.rate_limit;
2065 node_name = Some("SINK");
2066 }
2067 _ => {}
2068 }
2069 });
2070
2071 if let Some(rate_limit) = rate_limit {
2072 rate_limits.push(RateLimitInfo {
2073 fragment_id: fragment_id as u32,
2074 job_id: job_id as u32,
2075 fragment_type_mask: fragment_type_mask as u32,
2076 rate_limit,
2077 node_name: node_name.unwrap().to_owned(),
2078 });
2079 }
2080 }
2081
2082 Ok(rate_limits)
2083 }
2084}
2085
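/// Build a SQL expression checking whether `column` shares any bit with `value`; this renders
/// roughly as `column & value <> 0` (the exact SQL depends on the SeaQuery backend in use).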
2086fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr {
2087 column
2088 .binary(BinOper::Custom("&"), value)
2089 .binary(BinOper::NotEqual, 0)
2090}
2091
2092fn fragment_type_mask_intersects(value: i32) -> SimpleExpr {
2093 bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value)
2094}
2095
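/// Context describing sink-into-table changes applied while finishing a (replace) streaming
/// job: `creating_sink_id` is appended to the target table's `incoming_sinks`,
/// `dropping_sink_id` is removed from it, and `updated_sink_catalogs` lists sinks whose
/// `original_target_columns` are refreshed to the table's previous columns.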
2096pub struct SinkIntoTableContext {
2097 pub creating_sink_id: Option<SinkId>,
2099 pub dropping_sink_id: Option<SinkId>,
2101 pub updated_sink_catalogs: Vec<SinkId>,
2104}