use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::*;
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{FragmentTypeFlag, PbFragmentTypeFlag, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
    IntoSimpleExpr, JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect,
    RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use crate::barrier::{ReplaceStreamJobPlan, Reschedule};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::rename::ReplaceTableExprRewriter;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
    check_sink_into_table_cycle, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
    get_fragment_mappings, get_internal_tables_by_id, insert_fragment_relations,
    rebuild_fragment_mapping_from_actors,
};
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
    StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::{MetaError, MetaResult};

impl CatalogController {
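    /// Create the object record and the `streaming_job` row for a new streaming job
    /// within the given transaction, and return the allocated object id.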
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: &StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>,
    ) -> MetaResult<ObjectId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job = streaming_job::ActiveModel {
            job_id: Set(obj.oid),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(ctx.timezone.clone()),
            parallelism: Set(streaming_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(obj.oid)
    }

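    /// Create the catalog entry for a new streaming job: checks name duplication and
    /// the status of its dependencies, inserts the job object together with its
    /// type-specific catalog (materialized view, sink, table, index, or source), and
    /// registers the objects it depends on.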
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id() as _,
            streaming_job.schema_id() as _,
            &txn,
        )
        .await?;

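        // Collect the relations that the job depends on.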
        dependencies.extend(
            streaming_job
                .dependent_relations()
                .into_iter()
                .map(|id| id as ObjectId),
        );

        if !dependencies.is_empty() {
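            // Reject the creation if any dependent relation is still being altered.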
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink, _) => {
                if let Some(target_table_id) = sink.target_table {
                    if check_sink_into_table_cycle(
                        target_table_id as ObjectId,
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                    {
                        bail!("Creating such a sink will result in a circular dependency.");
                    }
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id as _),
                    Some(sink.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                sink.id = job_id as _;
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id as _),
                        Some(src.schema_id as _),
                    )
                    .await?;
                    src.id = src_obj.oid as _;
                    src.optional_associated_table_id =
                        Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
                    table.optional_associated_source_id = Some(
                        PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
                    );
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id as _),
                    Some(index.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                index.id = job_id as _;
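                // The index shares the job id with its index table.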
                index.index_table_id = job_id as _;
                table.id = job_id as _;

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id as _),
                    used_by: Set(table.id as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id as _),
                    Some(src.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                src.id = job_id as _;
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

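        // Record the secrets and connections that the job depends on.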
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|secret_id| secret_id as ObjectId),
        );
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|conn_id| conn_id as ObjectId),
        );

        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id() as _),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

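    /// Create catalog objects for the internal (state) tables of a streaming job,
    /// and return the mapping from the temporary table ids in the plan to the newly
    /// allocated catalog ids.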
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<u32, u32>> {
        let job_id = job.id() as ObjectId;
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id as _),
                Some(table.schema_id as _),
            )
            .await?
            .oid;
            table_id_map.insert(table.id, table_id as u32);
            table.id = table_id as _;
            table.job_id = Some(job_id as _);

            let table_model = table::ActiveModel {
                table_id: Set(table_id as _),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

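    /// Persist the fragments and actors of a streaming job before it is created:
    /// inserts the fragment and actor rows, binds state tables to their fragments,
    /// and, for materialized views, notifies the frontend of the new internal tables.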
    pub async fn prepare_streaming_job(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        let is_materialized_view = streaming_job.is_materialized_view();
        let fragment_actors =
            Self::extract_fragment_and_actors_from_fragments(stream_job_fragments)?;
        let mut all_tables = stream_job_fragments.all_tables();
        let inner = self.inner.write().await;

        let mut objects = vec![];
        let txn = inner.db.begin().await?;

        let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            if !for_replace {
                for state_table_id in state_table_ids {
                    let table = all_tables
                        .get_mut(&(state_table_id as u32))
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id as u32);
                    assert_eq!(table.fragment_id, fragment_id as u32);
                    table.job_id = Some(streaming_job.id());
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if is_materialized_view {
                        objects.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table.clone())),
                        });
                    }
                }
            }
        }

        insert_fragment_relations(&txn, &stream_job_fragments.downstreams).await?;

        for actors in actors {
            for actor in actors {
                let actor = actor.into_active_model();
                Actor::insert(actor).exec(&txn).await?;
            }
        }

        if !for_replace {
            if let StreamingJob::Table(_, table, ..) = streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id as _),
                    dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        if !objects.is_empty() {
            assert!(is_materialized_view);
            self.notify_frontend(Operation::Add, Info::ObjectGroup(PbObjectGroup { objects }))
                .await;
        }

        Ok(())
    }

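    /// Abort a streaming job that is still being created, cleaning up its catalog
    /// objects and internal tables. Returns whether the job was actually aborted
    /// (a background job that is still creating is left untouched) and the database
    /// id the job belongs to.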
    pub async fn try_abort_creating_streaming_job(
        &self,
        job_id: ObjectId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = job_id,
                "streaming job not found when aborting creating, might be cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;

        if !is_cancelled {
            let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
            if let Some(streaming_job) = streaming_job {
                assert_ne!(streaming_job.job_status, JobStatus::Created);
                if streaming_job.create_type == CreateType::Background
                    && streaming_job.job_status == JobStatus::Creating
                {
                    // Background jobs that are still creating are left to recovery
                    // instead of being aborted.
                    tracing::warn!(
                        id = job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Collect the objects to notify for deletion if the job is a materialized view.
        let table_obj = Table::find_by_id(job_id).one(&txn).await?;
        let mut objs = vec![];
        if let Some(table) = &table_obj
            && table.table_type == TableType::MaterializedView
        {
            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            // Delete the source object associated with the table.
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

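    /// Convenience wrapper of [`Self::post_collect_job_fragments_inner`] for jobs
    /// that are not materialized views.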
    pub async fn post_collect_job_fragments(
        &self,
        job_id: ObjectId,
        actor_ids: Vec<crate::model::ActorId>,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        self.post_collect_job_fragments_inner(
            job_id,
            actor_ids,
            upstream_fragment_new_downstreams,
            split_assignment,
            false,
        )
        .await
    }

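    /// After the barrier of a creating job has been collected: mark its actors as
    /// running, persist the actor split assignments, insert the new fragment
    /// relations, move the job to `Creating` status, and (for materialized views)
    /// notify the new fragment mappings to the frontend.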
    pub async fn post_collect_job_fragments_inner(
        &self,
        job_id: ObjectId,
        actor_ids: Vec<crate::model::ActorId>,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        split_assignment: &SplitAssignment,
        is_mv: bool,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        Actor::update_many()
            .col_expr(
                actor::Column::Status,
                SimpleExpr::from(ActorStatus::Running.into_value()),
            )
            .filter(
                actor::Column::ActorId
                    .is_in(actor_ids.into_iter().map(|id| id as ActorId).collect_vec()),
            )
            .exec(&txn)
            .await?;

        for splits in split_assignment.values() {
            for (actor_id, splits) in splits {
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                let connector_splits = &PbConnectorSplits { splits };
                actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(connector_splits.into())),
                    ..Default::default()
                }
                .update(&txn)
                .await?;
            }
        }

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_mapping = if is_mv {
            get_fragment_mappings(&txn, job_id as _).await?
        } else {
            vec![]
        };

        txn.commit().await?;
        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;

        Ok(())
    }

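    /// Create a new catalog object for replacing an existing streaming job
    /// (e.g. `ALTER TABLE`). Fails if the job is still referenced by creating jobs
    /// or if the max parallelism does not match, and returns the temporary object
    /// id used during the replacement.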
    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: &StreamContext,
        specified_parallelism: &Option<NonZeroUsize>,
        max_parallelism: usize,
    ) -> MetaResult<ObjectId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Verify the catalog version of the job before replacing it.
        streaming_job.verify_version_for_replace(&txn).await?;
        // Prevent the replacement if the job is referenced by any creating jobs.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id as ObjectId)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // The max parallelism must stay the same across the replacement.
        let original_max_parallelism: i32 = StreamingJobModel::find_by_id(id as ObjectId)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if original_max_parallelism != max_parallelism as i32 {
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };

        // Create a new object for the replacement job.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            max_parallelism,
            None,
        )
        .await?;

        // Record the dependency of the new object on the original job, so the
        // original cannot be dropped while the replacement is in progress.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id as _),
            used_by: Set(new_obj_id as _),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

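    /// Mark a streaming job as `Created`, refresh its creation timestamp, and notify
    /// the frontend of the finished catalog objects and fragment mappings. If the
    /// job is a sink into a table, the accompanying table replacement is finalized
    /// in the same transaction.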
    pub async fn finish_streaming_job(
        &self,
        job_id: ObjectId,
        replace_stream_job_info: Option<ReplaceStreamJobPlan>,
    ) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // Update the creation timestamp and cluster version of the object.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // Mark the job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        job.update(&txn).await?;

        // Collect the internal tables of the job for notification.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(&txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = NotificationOperation::Add;

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
                });
            }
            ObjectType::Index => {
                let (index, obj) = Index::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap()).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        let fragment_mapping = get_fragment_mappings(&txn, job_id).await?;

        let replace_table_mapping_update = match replace_stream_job_info {
            Some(ReplaceStreamJobPlan {
                streaming_job,
                replace_upstream,
                tmp_id,
                ..
            }) => {
                let incoming_sink_id = job_id;

                let (relations, fragment_mapping, _) = Self::finish_replace_streaming_job_inner(
                    tmp_id as ObjectId,
                    replace_upstream,
                    None,
                    SinkIntoTableContext {
                        creating_sink_id: Some(incoming_sink_id as _),
                        dropping_sink_id: None,
                        updated_sink_catalogs: vec![],
                    },
                    &txn,
                    streaming_job,
                    None,
                )
                .await?;

                Some((relations, fragment_mapping))
            }
            None => None,
        };

        txn.commit().await?;

        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((objects, fragment_mapping)) = replace_table_mapping_update {
            self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
                .await;
            version = self
                .notify_frontend(
                    NotificationOperation::Update,
                    NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
                )
                .await;
        }

        // Wake up any waiters blocked on the creation of this job.
        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

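    /// Finish replacing a streaming job (e.g. `ALTER TABLE`): swap the catalog to
    /// the new version, notify the frontend, and return the new notification
    /// version.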
    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: ObjectId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        col_index_mapping: Option<ColIndexMapping>,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, fragment_mapping, delete_notification_objs) =
            Self::finish_replace_streaming_job_inner(
                tmp_id,
                replace_upstream,
                col_index_mapping,
                sink_into_table_context,
                &txn,
                streaming_job,
                drop_table_connector_ctx,
            )
            .await?;

        txn.commit().await?;

        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;
        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

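    /// Transaction-level implementation of [`Self::finish_replace_streaming_job`]:
    /// swaps the fragments of the temporary job over to the original job id,
    /// rewrites the merge executors of downstream fragments, updates
    /// sink-into-table metadata, and returns the objects and fragment mappings to
    /// notify.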
    pub async fn finish_replace_streaming_job_inner(
        tmp_id: ObjectId,
        replace_upstream: FragmentReplaceUpstream,
        col_index_mapping: Option<ColIndexMapping>,
        SinkIntoTableContext {
            creating_sink_id,
            dropping_sink_id,
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
    ) -> MetaResult<(
        Vec<PbObject>,
        Vec<PbFragmentWorkerSlotMapping>,
        Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
    )> {
        let original_job_id = streaming_job.id() as ObjectId;
        let job_type = streaming_job.job_type();

        // Update the catalog of the job itself.
        match streaming_job {
            StreamingJob::Table(_source, table, _table_job_type) => {
                // The column catalogs of the original table are needed to update the
                // sinks that were created against it.
                let original_table_catalogs = Table::find_by_id(original_job_id)
                    .select_only()
                    .columns([table::Column::Columns])
                    .into_tuple::<ColumnCatalogArray>()
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;

                for sink_id in updated_sink_catalogs {
                    sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_table_catalogs.clone())),
                        ..Default::default()
                    }
                    .update(txn)
                    .await?;
                }
                // Maintain the set of sinks that write into this table.
                let mut table = table::ActiveModel::from(table);
                let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
                if let Some(sink_id) = creating_sink_id {
                    debug_assert!(!incoming_sinks.contains(&{ sink_id }));
                    incoming_sinks.push(sink_id as _);
                }
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    // Dissociate the source from the table when dropping its connector.
                    table.optional_associated_source_id = Set(None);
                }

                if let Some(sink_id) = dropping_sink_id {
                    let drained = incoming_sinks
                        .extract_if(.., |id| *id == sink_id)
                        .collect_vec();
                    debug_assert_eq!(drained, vec![sink_id]);
                }

                table.incoming_sinks = Set(incoming_sinks.into());
                table.update(txn).await?;
            }
            StreamingJob::Source(source) => {
                let source = source::ActiveModel::from(source);
                source.update(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

        // Rebind the state tables of the temporary job to its fragments, which are
        // about to be moved to the original job.
        let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::StateTableIds,
            ])
            .filter(fragment::Column::JobId.eq(tmp_id))
            .into_tuple()
            .all(txn)
            .await?;
        for (fragment_id, state_table_ids) in fragment_info {
            for state_table_id in state_table_ids.into_inner() {
                table::ActiveModel {
                    table_id: Set(state_table_id as _),
                    fragment_id: Set(Some(fragment_id)),
                    ..Default::default()
                }
                .update(txn)
                .await?;
            }
        }

        // Drop the fragments of the original job and attach the new ones to it.
        Fragment::delete_many()
            .filter(fragment::Column::JobId.eq(original_job_id))
            .exec(txn)
            .await?;
        Fragment::update_many()
            .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
            .filter(fragment::Column::JobId.eq(tmp_id))
            .exec(txn)
            .await?;

        // Rewrite the merge executors of downstream fragments to point at the new
        // upstream fragments.
        for (fragment_id, fragment_replace_map) in replace_upstream {
            let (fragment_id, mut stream_node) = Fragment::find_by_id(fragment_id as FragmentId)
                .select_only()
                .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
                .into_tuple::<(FragmentId, StreamNode)>()
                .one(txn)
                .await?
                .map(|(id, node)| (id, node.to_protobuf()))
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

            visit_stream_node_mut(&mut stream_node, |body| {
                if let PbNodeBody::Merge(m) = body
                    && let Some(new_fragment_id) = fragment_replace_map.get(&m.upstream_fragment_id)
                {
                    m.upstream_fragment_id = *new_fragment_id;
                }
            });
            fragment::ActiveModel {
                fragment_id: Set(fragment_id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(txn)
            .await?;
        }

        // The temporary object is no longer needed.
        Object::delete_by_id(tmp_id).exec(txn).await?;

        // Collect the updated catalogs to notify the frontend.
        let mut objects = vec![];
        match job_type {
            StreamingJobType::Table(_) => {
                let (table, table_obj) = Table::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, table_obj.unwrap()).into(),
                    )),
                })
            }
            StreamingJobType::Source => {
                let (source, source_obj) = Source::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, source_obj.unwrap()).into(),
                    )),
                })
            }
            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
        }
        // If the columns changed, rewrite the expressions of the indexes on this
        // table accordingly.
        if let Some(table_col_index_mapping) = col_index_mapping {
            let expr_rewriter = ReplaceTableExprRewriter {
                table_col_index_mapping,
            };

            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
                .select_only()
                .columns([index::Column::IndexId, index::Column::IndexItems])
                .filter(index::Column::PrimaryTableId.eq(original_job_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (index_id, nodes) in index_items {
                let mut pb_nodes = nodes.to_protobuf();
                pb_nodes
                    .iter_mut()
                    .for_each(|x| expr_rewriter.rewrite_expr(x));
                let index = index::ActiveModel {
                    index_id: Set(index_id),
                    index_items: Set(pb_nodes.into()),
                    ..Default::default()
                }
                .update(txn)
                .await?;
                let index_obj = index
                    .find_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
                });
            }
        }

        let fragment_mapping: Vec<_> = get_fragment_mappings(txn, original_job_id as _).await?;

        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
            notification_objs =
                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
        }

        Ok((objects, fragment_mapping, notification_objs))
    }

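    /// Abort an in-progress replacement by dropping the temporary job object.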
    pub async fn try_abort_replacing_streaming_job(&self, tmp_job_id: ObjectId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        Object::delete_by_id(tmp_job_id).exec(&inner.db).await?;
        Ok(())
    }

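    /// Update the rate limit of a source and propagate it to the source (and
    /// fs-fetch) nodes in the stream plans of all fragments that consume it.
    /// Returns the affected fragment-to-actor mapping.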
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        {
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                rate_limit: Set(rate_limit.map(|v| v as i32)),
                ..Default::default()
            };
            active_source.update(&txn).await?;
        }

        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;

        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        let streaming_job_ids: Vec<ObjectId> =
            if let Some(table_id) = source.optional_associated_table_id {
                vec![table_id]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                vec![source_id]
            } else {
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
            .collect_vec();

        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            let mut found = false;
            if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node {
                        if let Some(node_inner) = &mut node.source_inner
                            && node_inner.source_id == source_id as u32
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            if is_fs_source {
                // `StreamFsFetch` fragments may lack the dedicated fragment type
                // flag, so visit them unconditionally and backfill the flag here.
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        *fragment_type_mask |= PbFragmentTypeFlag::FsFetch as i32;
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id as u32
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );
        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
        for (id, fragment_type_mask, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                fragment_type_mask: Set(fragment_type_mask),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(fragment_actors)
    }

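    /// Apply `fragments_mutation_fn` to the stream plans of all fragments of a job,
    /// keep only the fragments for which the closure returns `true`, and persist the
    /// modified plans. Returns the affected fragment-to-actor mapping, or an error
    /// built from `err_msg` if no fragment matches.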
    async fn mutate_fragments_by_job_id(
        &self,
        job_id: ObjectId,
        // Returns true if the mutation has been applied to the given stream node.
        mut fragments_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
        // Error message used when no relevant fragment is found.
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
            .collect_vec();

        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            fragments_mutation_fn(fragment_type_mask, stream_node)
        });
        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
        for (id, _, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

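    /// Single-fragment counterpart of [`Self::mutate_fragments_by_job_id`].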
    async fn mutate_fragment_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        mut fragment_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (mut fragment_type_mask, stream_node): (i32, StreamNode) =
            Fragment::find_by_id(fragment_id)
                .select_only()
                .columns([
                    fragment::Column::FragmentTypeMask,
                    fragment::Column::StreamNode,
                ])
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
        let mut pb_stream_node = stream_node.to_protobuf();

        if !fragment_mutation_fn(&mut fragment_type_mask, &mut pb_stream_node) {
            return Err(MetaError::invalid_parameter(format!(
                "fragment id {fragment_id}: {}",
                err_msg
            )));
        }

        // Persist the mutated stream node, not the original one read from the
        // database; otherwise the mutation would be lost.
        fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

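    /// Update the backfill rate limit of a job by rewriting all backfill-capable
    /// nodes (stream scan, CDC scan, source backfill, sink) in its fragments.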
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_backfill_rate_limit =
            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0 {
                    visit_stream_node_mut(stream_node, |node| match node {
                        PbNodeBody::StreamCdcScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::StreamScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::SourceBackfill(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::Sink(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        _ => {}
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(
            job_id,
            update_backfill_rate_limit,
            "stream scan node or source node not found",
        )
        .await
    }

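    /// Update the rate limit of the sink nodes in the fragments of a sink job.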
    pub async fn update_sink_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_sink_rate_limit =
            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0 {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Sink(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
            .await
    }

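    /// Update the rate limit of the DML nodes in the fragments of a job.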
    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_dml_rate_limit =
            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0 {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Dml(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
            .await
    }

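    /// Alter the properties of a sink: validate the changed keys against the
    /// connector's allow-list, rewrite the `WITH` options in the sink definition,
    /// and apply the new properties to both the catalog and the sink descriptors in
    /// the stream plans. Returns the merged property map.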
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;

        // Validate that the connector supports altering the given properties.
        match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        for (k, v) in &props {
                            if !SinkType::SINK_ALTER_CONFIG_LIST.contains(&k.as_str()) {
                                return Err(SinkError::Config(anyhow!(
                                    "unsupported alter config: {}={}",
                                    k,
                                    v
                                ))
                                .into());
                            }
                        }
                        let mut new_props = sink.properties.0.clone();
                        new_props.extend(props.clone());
                        SinkType::validate_alter_config(&new_props)
                    },
                    |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
                )?
            }
            None => {
                return Err(
                    SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
                );
            }
        };
        // Rewrite the `WITH` options in the sink definition.
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            let mut new_sql_options = stmt
                .with_properties
                .0
                .iter()
                .map(|sql_option| (&sql_option.name, sql_option))
                .collect::<IndexMap<_, _>>();
            let add_sql_options = props
                .iter()
                .map(|(k, v)| SqlOption::try_from((k, v)))
                .collect::<Result<Vec<SqlOption>, ParserError>>()
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
            new_sql_options.extend(
                add_sql_options
                    .iter()
                    .map(|sql_option| (&sql_option.name, sql_option)),
            );
            stmt.with_properties.0 = new_sql_options.into_values().cloned().collect();
        } else {
            panic!("sink definition is not a create sink statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props);

        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(stmt.to_string()),
            ..Default::default()
        };
        active_sink.update(&txn).await?;

        // Propagate the new properties to the sink descriptors in the stream plans.
        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(sink_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let fragments = fragments
            .into_iter()
            .filter(|(_, fragment_type_mask, _)| {
                *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
            })
            .filter_map(|(id, _, stream_node)| {
                let mut stream_node = stream_node.to_protobuf();
                let mut found = false;
                visit_stream_node_mut(&mut stream_node, |node| {
                    if let PbNodeBody::Sink(node) = node
                        && let Some(sink_desc) = &mut node.sink_desc
                        && sink_desc.id == sink_id as u32
                    {
                        sink_desc.properties = new_config.clone();
                        found = true;
                    }
                });
                if found { Some((id, stream_node)) } else { None }
            })
            .collect_vec();
        assert!(
            !fragments.is_empty(),
            "sink id should be used by at least one fragment"
        );
        for (id, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }

        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;

        txn.commit().await?;

        let relation_info = PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(new_config.into_iter().collect())
    }

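    /// Update the rate limit of every rate-limitable node (DML, sink, scan,
    /// backfill) in a single fragment.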
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_rate_limit = |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
            let mut found = false;
            if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0
                || *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0
                || *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0
                || *fragment_type_mask & PbFragmentTypeFlag::source_rate_limit_fragments() != 0
            {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Dml(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::Sink(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamCdcScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::SourceBackfill(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

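    /// Persist the effects of a reschedule: remove deleted actors, insert newly
    /// created ones, apply vnode bitmap and split updates, rebuild the fragment
    /// mappings, and record the new parallelism and resource group of each job.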
    pub async fn post_apply_reschedules(
        &self,
        reschedules: HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        let new_created_actors: HashSet<_> = reschedules
            .values()
            .flat_map(|reschedule| {
                reschedule
                    .added_actors
                    .values()
                    .flatten()
                    .map(|actor_id| *actor_id as ActorId)
            })
            .collect();

        let inner = self.inner.write().await;

        let txn = inner.db.begin().await?;

        let mut fragment_mapping_to_notify = vec![];

        for (
            fragment_id,
            Reschedule {
                removed_actors,
                vnode_bitmap_updates,
                actor_splits,
                newly_created_actors,
                ..
            },
        ) in reschedules
        {
            // Delete the actors removed by the reschedule.
            Actor::delete_many()
                .filter(
                    actor::Column::ActorId
                        .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
                )
                .exec(&txn)
                .await?;

            // Insert the newly created actors.
            for (
                (
                    StreamActor {
                        actor_id,
                        fragment_id,
                        vnode_bitmap,
                        expr_context,
                        ..
                    },
                    _,
                ),
                worker_id,
            ) in newly_created_actors.into_values()
            {
                let splits = actor_splits
                    .get(&actor_id)
                    .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());

                Actor::insert(actor::ActiveModel {
                    actor_id: Set(actor_id as _),
                    fragment_id: Set(fragment_id as _),
                    status: Set(ActorStatus::Running),
                    splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
                    worker_id: Set(worker_id),
                    upstream_actor_ids: Set(Default::default()),
                    vnode_bitmap: Set(vnode_bitmap
                        .as_ref()
                        .map(|bitmap| (&bitmap.to_protobuf()).into())),
                    expr_context: Set(expr_context.as_ref().unwrap().into()),
                })
                .exec(&txn)
                .await?;
            }

            // Apply the vnode bitmap updates to the surviving actors.
            for (actor_id, bitmap) in vnode_bitmap_updates {
                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
                actor.update(&txn).await?;
            }

            // Apply the split updates; newly created actors already carry their splits.
            for (actor_id, splits) in actor_splits {
                if new_created_actors.contains(&(actor_id as ActorId)) {
                    continue;
                }

                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
                actor.update(&txn).await?;
            }

            // Rebuild the fragment mapping from the actors after the reschedule.
            let fragment = Fragment::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

            let job_actors = fragment
                .find_related(Actor)
                .all(&txn)
                .await?
                .into_iter()
                .map(|actor| {
                    (
                        fragment_id,
                        fragment.distribution_type,
                        actor.actor_id,
                        actor.vnode_bitmap,
                        actor.worker_id,
                        actor.status,
                    )
                })
                .collect_vec();

            fragment_mapping_to_notify.extend(rebuild_fragment_mapping_from_actors(job_actors));
        }

        let JobReschedulePostUpdates {
            parallelism_updates,
            resource_group_updates,
        } = post_updates;

        for (table_id, parallelism) in parallelism_updates {
            let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
                .into_active_model();

            streaming_job.parallelism = Set(match parallelism {
                TableParallelism::Adaptive => StreamingParallelism::Adaptive,
                TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
                TableParallelism::Custom => StreamingParallelism::Custom,
            });

            if let Some(resource_group) =
                resource_group_updates.get(&(table_id.table_id() as ObjectId))
            {
                streaming_job.specific_resource_group = Set(resource_group.to_owned());
            }

            streaming_job.update(&txn).await?;
        }

        txn.commit().await?;
        self.notify_fragment_mapping(Operation::Update, fragment_mapping_to_notify)
            .await;

        Ok(())
    }

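    /// List the rate limits of all rate-limit-capable fragments in the cluster.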
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment_type_mask_intersects(
                PbFragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            let mut rate_limit = None;
            let mut node_name = None;

            visit_stream_node(&stream_node, |node| {
                match node {
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            debug_assert!(
                                rate_limit.is_none(),
                                "one fragment should only have 1 rate limit node"
                            );
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            debug_assert!(
                                rate_limit.is_none(),
                                "one fragment should only have 1 rate limit node"
                            );
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    PbNodeBody::SourceBackfill(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }
            });

            if let Some(rate_limit) = rate_limit {
                rate_limits.push(RateLimitInfo {
                    fragment_id: fragment_id as u32,
                    job_id: job_id as u32,
                    fragment_type_mask: fragment_type_mask as u32,
                    rate_limit,
                    node_name: node_name.unwrap().to_owned(),
                });
            }
        }

        Ok(rate_limits)
    }
}

/// Build a SQL expression that checks whether `column & value != 0`.
fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr {
    column
        .binary(BinOper::Custom("&"), value)
        .binary(BinOper::NotEqual, 0)
}

/// Filter fragments whose type mask intersects the given flags.
fn fragment_type_mask_intersects(value: i32) -> SimpleExpr {
    bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value)
}

/// Context for maintaining the sink-into-table metadata of a table when sinks into
/// it are created or dropped.
pub struct SinkIntoTableContext {
    /// The id of the sink getting created into the table, if any.
    pub creating_sink_id: Option<SinkId>,
    /// The id of the sink getting dropped from the table, if any.
    pub dropping_sink_id: Option<SinkId>,
    /// Sinks whose `original_target_columns` should be updated to the columns of
    /// the table being replaced.
    pub updated_sink_catalogs: Vec<SinkId>,
}