risingwave_meta_model_migration/
m20251124_195858_pending_sink_state.rs

1use sea_orm_migration::prelude::*;
2
3use crate::drop_tables;
4use crate::sea_orm::{FromQueryResult, Statement};
5use crate::utils::ColumnDefExt;
6
7#[derive(DeriveMigrationName)]
8pub struct Migration;
9
10#[derive(FromQueryResult, Debug)]
11struct OldRow {
12    pub sink_id: i32,
13    pub end_epoch: i64,
14    pub metadata: Vec<u8>,
15    pub snapshot_id: i64,
16    pub committed: bool,
17}
18
19// Logic referenced from src/connector/src/sink/iceberg/mod.rs
20fn transform_metadata(metadata: Vec<u8>, snapshot_id: i64) -> Vec<u8> {
21    let mut write_results_bytes: Vec<Vec<u8>> = serde_json::from_slice(&metadata).unwrap();
22    let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
23    write_results_bytes.push(snapshot_id_bytes);
24    serde_json::to_vec(&write_results_bytes).unwrap()
25}
26
27#[async_trait::async_trait]
28impl MigrationTrait for Migration {
29    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
30        manager
31            .create_table(
32                Table::create()
33                    .table(PendingSinkState::Table)
34                    .if_not_exists()
35                    .col(
36                        ColumnDef::new(PendingSinkState::SinkId)
37                            .integer()
38                            .not_null(),
39                    )
40                    .col(
41                        ColumnDef::new(PendingSinkState::Epoch)
42                            .big_integer()
43                            .not_null(),
44                    )
45                    .col(
46                        ColumnDef::new(PendingSinkState::SinkState)
47                            .string()
48                            .not_null(),
49                    )
50                    .col(
51                        ColumnDef::new(PendingSinkState::Metadata)
52                            .rw_binary(manager)
53                            .not_null(),
54                    )
55                    .primary_key(
56                        Index::create()
57                            .col(PendingSinkState::SinkId)
58                            .col(PendingSinkState::Epoch),
59                    )
60                    .to_owned(),
61            )
62            .await?;
63
64        // TODO: if `ExactlyOnceIcebergSinkMetadata` table is deprecated, clean up the data migration code below.
65        assert!(
66            manager
67                .has_table(ExactlyOnceIcebergSinkMetadata::Table.to_string())
68                .await?
69        );
70
71        let conn = manager.get_connection();
72
73        let (sql, values) = Query::select()
74            .columns([
75                ExactlyOnceIcebergSinkMetadata::SinkId,
76                ExactlyOnceIcebergSinkMetadata::EndEpoch,
77                ExactlyOnceIcebergSinkMetadata::Metadata,
78                ExactlyOnceIcebergSinkMetadata::SnapshotId,
79                ExactlyOnceIcebergSinkMetadata::Committed,
80            ])
81            .from(ExactlyOnceIcebergSinkMetadata::Table)
82            .to_owned()
83            .build_any(&*conn.get_database_backend().get_query_builder());
84
85        let rows = conn
86            .query_all(Statement::from_sql_and_values(
87                conn.get_database_backend(),
88                sql,
89                values,
90            ))
91            .await?;
92
93        if !rows.is_empty() {
94            let mut insert = Query::insert();
95            insert
96                .into_table(PendingSinkState::Table)
97                .columns([
98                    PendingSinkState::SinkId,
99                    PendingSinkState::Epoch,
100                    PendingSinkState::SinkState,
101                    PendingSinkState::Metadata,
102                ])
103                .on_conflict(
104                    sea_query::OnConflict::columns([
105                        PendingSinkState::SinkId,
106                        PendingSinkState::Epoch,
107                    ])
108                    .do_nothing()
109                    .to_owned(),
110                );
111            for row in rows {
112                let OldRow {
113                    sink_id,
114                    end_epoch,
115                    metadata,
116                    snapshot_id,
117                    committed,
118                } = OldRow::from_query_result(&row, "")?;
119                let sink_state = if committed { "COMMITTED" } else { "PENDING" };
120                let combined_metadata = transform_metadata(metadata, snapshot_id);
121                insert.values_panic([
122                    sink_id.into(),
123                    end_epoch.into(),
124                    sink_state.into(),
125                    combined_metadata.into(),
126                ]);
127            }
128
129            manager.exec_stmt(insert.to_owned()).await?;
130        }
131
132        Ok(())
133    }
134
135    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
136        drop_tables!(manager, PendingSinkState);
137        Ok(())
138    }
139}
140
141#[derive(DeriveIden)]
142enum PendingSinkState {
143    Table,
144    SinkId,
145    Epoch,
146    SinkState,
147    Metadata,
148}
149
150#[derive(DeriveIden)]
151enum ExactlyOnceIcebergSinkMetadata {
152    Table,
153    SinkId,
154    EndEpoch,
155    Metadata,
156    SnapshotId,
157    Committed,
158}