risingwave_meta_model_migration/
m20250106_072104_fragment_relation.rs1use sea_orm::{FromJsonQueryResult, FromQueryResult, Statement};
2use sea_orm_migration::prelude::*;
3use serde::{Deserialize, Serialize};
4
5use crate::drop_tables;
6
7#[derive(DeriveMigrationName)]
8pub struct Migration;
9
10#[async_trait::async_trait]
11impl MigrationTrait for Migration {
12 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
13 manager
14 .create_table(
15 Table::create()
16 .table(FragmentRelation::Table)
17 .col(
18 ColumnDef::new(FragmentRelation::SourceFragmentId)
19 .integer()
20 .not_null(),
21 )
22 .col(
23 ColumnDef::new(FragmentRelation::TargetFragmentId)
24 .integer()
25 .not_null(),
26 )
27 .col(
28 ColumnDef::new(FragmentRelation::DispatcherType)
29 .string()
30 .not_null(),
31 )
32 .col(
33 ColumnDef::new(FragmentRelation::DistKeyIndices)
34 .json_binary()
35 .not_null(),
36 )
37 .col(
38 ColumnDef::new(FragmentRelation::OutputIndices)
39 .json_binary()
40 .not_null(),
41 )
42 .primary_key(
43 Index::create()
44 .col(FragmentRelation::SourceFragmentId)
45 .col(FragmentRelation::TargetFragmentId),
46 )
47 .foreign_key(
48 &mut ForeignKey::create()
49 .name("FK_fragment_relation_source_oid")
50 .from(FragmentRelation::Table, FragmentRelation::SourceFragmentId)
51 .to(Fragment::Table, Fragment::FragmentId)
52 .on_delete(ForeignKeyAction::Cascade)
53 .to_owned(),
54 )
55 .foreign_key(
56 &mut ForeignKey::create()
57 .name("FK_fragment_relation_target_oid")
58 .from(FragmentRelation::Table, FragmentRelation::TargetFragmentId)
59 .to(Fragment::Table, Fragment::FragmentId)
60 .on_delete(ForeignKeyAction::Cascade)
61 .to_owned(),
62 )
63 .to_owned(),
64 )
65 .await?;
66
67 fulfill_fragment_relation(manager).await?;
68
69 Ok(())
70 }
71
72 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
73 drop_tables!(manager, FragmentRelation);
74 Ok(())
75 }
76}
77
78async fn fulfill_fragment_relation(manager: &SchemaManager<'_>) -> Result<(), DbErr> {
80 let connection = manager.get_connection();
81
82 let database_backend = connection.get_database_backend();
83
84 let (sql, values) = Query::select()
85 .distinct()
87 .expr_as(
88 Expr::col((Actor::Table, Actor::FragmentId)),
89 FragmentRelation::SourceFragmentId,
90 )
91 .expr_as(
92 Expr::col((ActorDispatcher::Table, ActorDispatcher::DispatcherId)),
93 FragmentRelation::TargetFragmentId,
94 )
95 .columns([
96 (ActorDispatcher::Table, ActorDispatcher::DispatcherType),
97 (ActorDispatcher::Table, ActorDispatcher::DistKeyIndices),
98 (ActorDispatcher::Table, ActorDispatcher::OutputIndices),
99 ])
100 .from(Actor::Table)
101 .join(
102 JoinType::InnerJoin,
103 ActorDispatcher::Table,
104 Expr::col((Actor::Table, Actor::ActorId))
105 .equals((ActorDispatcher::Table, ActorDispatcher::ActorId)),
106 )
107 .to_owned()
108 .build_any(&*database_backend.get_query_builder());
109
110 let rows = connection
111 .query_all(Statement::from_sql_and_values(
112 database_backend,
113 sql,
114 values,
115 ))
116 .await?;
117
118 for row in rows {
119 let FragmentRelationEntity {
120 source_fragment_id,
121 target_fragment_id,
122 dispatcher_type,
123 dist_key_indices,
124 output_indices,
125 } = FragmentRelationEntity::from_query_result(&row, "")?;
126
127 manager
128 .exec_stmt(
129 Query::insert()
130 .into_table(FragmentRelation::Table)
131 .columns([
132 FragmentRelation::SourceFragmentId,
133 FragmentRelation::TargetFragmentId,
134 FragmentRelation::DispatcherType,
135 FragmentRelation::DistKeyIndices,
136 FragmentRelation::OutputIndices,
137 ])
138 .values_panic([
139 source_fragment_id.into(),
140 target_fragment_id.into(),
141 dispatcher_type.into(),
142 dist_key_indices.into(),
143 output_indices.into(),
144 ])
145 .to_owned(),
146 )
147 .await?;
148 }
149
150 Ok(())
151}
152
153#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
154pub struct I32Array(pub Vec<i32>);
155
156#[derive(Debug, FromQueryResult)]
157#[sea_orm(entity = "FragmentRelation")]
158pub struct FragmentRelationEntity {
159 source_fragment_id: i32,
160 target_fragment_id: i32,
161 dispatcher_type: String,
162 dist_key_indices: I32Array,
163 output_indices: I32Array,
164}
165
166#[derive(DeriveIden)]
167enum FragmentRelation {
168 Table,
169 SourceFragmentId,
170 TargetFragmentId,
171 DispatcherType,
172 DistKeyIndices,
173 OutputIndices,
174}
175
176#[derive(DeriveIden)]
177enum Fragment {
178 Table,
179 FragmentId,
180}
181
182#[derive(DeriveIden)]
183enum Actor {
184 Table,
185 ActorId,
186 FragmentId,
187}
188
189#[derive(DeriveIden)]
190enum ActorDispatcher {
191 Table,
192 ActorId,
193 DispatcherType,
194 DistKeyIndices,
195 OutputIndices,
196 DispatcherId,
197}