risingwave_meta_model_migration/
m20240617_071625_sink_into_table_column.rs1use sea_orm_migration::prelude::{Table as MigrationTable, *};
2
3use crate::SubQueryStatement::SelectStatement;
4use crate::utils::ColumnDefExt;
5
6#[derive(DeriveMigrationName)]
7pub struct Migration;
8
9#[async_trait::async_trait]
10impl MigrationTrait for Migration {
11 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
12 manager
13 .alter_table(
14 MigrationTable::alter()
15 .table(Sink::Table)
16 .add_column(ColumnDef::new(Sink::OriginalTargetColumns).rw_binary(manager))
17 .to_owned(),
18 )
19 .await?;
20
21 let stmt = Query::update()
22 .table(Sink::Table)
23 .value(
24 Sink::OriginalTargetColumns,
25 SimpleExpr::SubQuery(
26 None,
27 Box::new(SelectStatement(
28 Query::select()
29 .column((Table::Table, Table::Columns))
30 .from(Table::Table)
31 .and_where(
32 Expr::col((Table::Table, Table::TableId))
33 .equals((Sink::Table, Sink::TargetTable)),
34 )
35 .to_owned(),
36 )),
37 ),
38 )
39 .and_where(Expr::col((Sink::Table, Sink::TargetTable)).is_not_null())
40 .to_owned();
41
42 manager.exec_stmt(stmt).await?;
43
44 Ok(())
45 }
46
47 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
48 manager
49 .alter_table(
50 MigrationTable::alter()
51 .table(Sink::Table)
52 .drop_column(Sink::OriginalTargetColumns)
53 .to_owned(),
54 )
55 .await
56 }
57}
58
59#[derive(DeriveIden)]
60enum Sink {
61 Table,
62 TargetTable,
63 OriginalTargetColumns,
64}
65
66#[derive(DeriveIden)]
67enum Table {
68 Table,
69 TableId,
70 Columns,
71}