risingwave_meta_model_migration/
m20251231_000000_sink_ignore_delete.rs

1use sea_orm_migration::prelude::{Table as MigrationTable, *};
2
3#[derive(DeriveMigrationName)]
4pub struct Migration;
5
6#[async_trait::async_trait]
7impl MigrationTrait for Migration {
8    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
9        manager
10            .alter_table(
11                MigrationTable::alter()
12                    .table(Sink::Table)
13                    .add_column(
14                        ColumnDef::new(Sink::IgnoreDelete)
15                            .boolean()
16                            .not_null()
17                            .default(false),
18                    )
19                    .to_owned(),
20            )
21            .await?;
22
23        // Normalize deprecated FORCE_APPEND_ONLY rows to APPEND_ONLY + ignore_delete = true.
24        let stmt = Query::update()
25            .table(Sink::Table)
26            .value(Sink::IgnoreDelete, true)
27            .value(Sink::SinkType, "APPEND_ONLY")
28            .and_where(Expr::col(Sink::SinkType).eq("FORCE_APPEND_ONLY"))
29            .to_owned();
30        manager.exec_stmt(stmt).await?;
31
32        Ok(())
33    }
34
35    async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
36        Ok(())
37    }
38}
39
40#[derive(DeriveIden)]
41#[allow(clippy::enum_variant_names)]
42enum Sink {
43    Table,
44    SinkType,
45    IgnoreDelete,
46}