risingwave_meta_model_migration/
m20240630_131430_remove_parallel_unit.rs1use sea_orm_migration::prelude::*;
2use serde::{Deserialize, Serialize};
3
4use crate::sea_orm::{FromJsonQueryResult, FromQueryResult, Statement};
5use crate::utils::ColumnDefExt;
6
7#[derive(DeriveMigrationName)]
8pub struct Migration;
9
10#[async_trait::async_trait]
11impl MigrationTrait for Migration {
12 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
13 manager
14 .alter_table(
15 Table::alter()
16 .table(WorkerProperty::Table)
17 .add_column(ColumnDef::new(WorkerProperty::Parallelism).integer())
18 .to_owned(),
19 )
20 .await?;
21
22 set_worker_parallelism(manager).await?;
23
24 manager
25 .alter_table(
26 Table::alter()
27 .table(WorkerProperty::Table)
28 .drop_column(WorkerProperty::ParallelUnitIds)
29 .to_owned(),
30 )
31 .await?;
32
33 manager
34 .alter_table(
35 Table::alter()
36 .table(Fragment::Table)
37 .drop_column(Fragment::VnodeMapping)
38 .to_owned(),
39 )
40 .await?;
41
42 manager
43 .alter_table(
44 Table::alter()
45 .table(Actor::Table)
46 .drop_column(Actor::ParallelUnitId)
47 .to_owned(),
48 )
49 .await
50 }
51
52 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
53 manager
54 .alter_table(
55 Table::alter()
56 .table(WorkerProperty::Table)
57 .drop_column(WorkerProperty::Parallelism)
58 .to_owned(),
59 )
60 .await?;
61
62 manager
63 .alter_table(
64 Table::alter()
65 .table(WorkerProperty::Table)
66 .add_column(
67 ColumnDef::new(WorkerProperty::ParallelUnitIds)
68 .json_binary()
69 .not_null(),
70 )
71 .to_owned(),
72 )
73 .await?;
74
75 manager
76 .alter_table(
77 Table::alter()
78 .table(Fragment::Table)
79 .add_column(
80 ColumnDef::new(Fragment::VnodeMapping)
81 .rw_binary(manager)
82 .not_null(),
83 )
84 .to_owned(),
85 )
86 .await?;
87
88 manager
89 .alter_table(
90 Table::alter()
91 .table(Actor::Table)
92 .add_column(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
93 .to_owned(),
94 )
95 .await
96 }
97}
98
99async fn set_worker_parallelism(manager: &SchemaManager<'_>) -> Result<(), DbErr> {
101 let connection = manager.get_connection();
102
103 let database_backend = connection.get_database_backend();
104
105 let (sql, values) = Query::select()
106 .columns([
107 (WorkerProperty::Table, WorkerProperty::WorkerId),
108 (WorkerProperty::Table, WorkerProperty::ParallelUnitIds),
109 ])
110 .from(WorkerProperty::Table)
111 .to_owned()
112 .build_any(&*database_backend.get_query_builder());
113
114 let stmt = Statement::from_sql_and_values(database_backend, sql, values);
115
116 for WorkerPropertyParallelUnitIds {
117 worker_id,
118 parallel_unit_ids,
119 } in WorkerPropertyParallelUnitIds::find_by_statement(stmt)
120 .all(connection)
121 .await?
122 {
123 manager
124 .exec_stmt(
125 Query::update()
126 .table(WorkerProperty::Table)
127 .value(
128 WorkerProperty::Parallelism,
129 Expr::value(parallel_unit_ids.0.len() as i32),
130 )
131 .and_where(Expr::col(WorkerProperty::WorkerId).eq(worker_id))
132 .to_owned(),
133 )
134 .await?;
135 }
136 Ok(())
137}
138#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
139pub struct I32Array(pub Vec<i32>);
140
141#[derive(Debug, FromQueryResult)]
142pub struct WorkerPropertyParallelUnitIds {
143 worker_id: i32,
144 parallel_unit_ids: I32Array,
145}
146
147#[derive(DeriveIden)]
148enum WorkerProperty {
149 Table,
150 WorkerId,
151 Parallelism,
152 ParallelUnitIds,
153}
154
155#[derive(DeriveIden)]
156enum Fragment {
157 Table,
158 VnodeMapping,
159}
160
161#[derive(DeriveIden)]
162enum Actor {
163 Table,
164 ParallelUnitId,
165}