risingwave_meta_model_migration/
m20251124_195858_pending_sink_state.rs1use sea_orm_migration::prelude::*;
2
3use crate::drop_tables;
4use crate::sea_orm::{FromQueryResult, Statement};
5use crate::utils::ColumnDefExt;
6
7#[derive(DeriveMigrationName)]
8pub struct Migration;
9
10#[derive(FromQueryResult, Debug)]
11struct OldRow {
12 pub sink_id: i32,
13 pub end_epoch: i64,
14 pub metadata: Vec<u8>,
15 pub snapshot_id: i64,
16 pub committed: bool,
17}
18
19fn transform_metadata(metadata: Vec<u8>, snapshot_id: i64) -> Vec<u8> {
21 let mut write_results_bytes: Vec<Vec<u8>> = serde_json::from_slice(&metadata).unwrap();
22 let snapshot_id_bytes: Vec<u8> = snapshot_id.to_le_bytes().to_vec();
23 write_results_bytes.push(snapshot_id_bytes);
24 serde_json::to_vec(&write_results_bytes).unwrap()
25}
26
27#[async_trait::async_trait]
28impl MigrationTrait for Migration {
29 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
30 manager
31 .create_table(
32 Table::create()
33 .table(PendingSinkState::Table)
34 .if_not_exists()
35 .col(
36 ColumnDef::new(PendingSinkState::SinkId)
37 .integer()
38 .not_null(),
39 )
40 .col(
41 ColumnDef::new(PendingSinkState::Epoch)
42 .big_integer()
43 .not_null(),
44 )
45 .col(
46 ColumnDef::new(PendingSinkState::SinkState)
47 .string()
48 .not_null(),
49 )
50 .col(
51 ColumnDef::new(PendingSinkState::Metadata)
52 .rw_binary(manager)
53 .not_null(),
54 )
55 .primary_key(
56 Index::create()
57 .col(PendingSinkState::SinkId)
58 .col(PendingSinkState::Epoch),
59 )
60 .to_owned(),
61 )
62 .await?;
63
64 assert!(
66 manager
67 .has_table(ExactlyOnceIcebergSinkMetadata::Table.to_string())
68 .await?
69 );
70
71 let conn = manager.get_connection();
72
73 let (sql, values) = Query::select()
74 .columns([
75 ExactlyOnceIcebergSinkMetadata::SinkId,
76 ExactlyOnceIcebergSinkMetadata::EndEpoch,
77 ExactlyOnceIcebergSinkMetadata::Metadata,
78 ExactlyOnceIcebergSinkMetadata::SnapshotId,
79 ExactlyOnceIcebergSinkMetadata::Committed,
80 ])
81 .from(ExactlyOnceIcebergSinkMetadata::Table)
82 .to_owned()
83 .build_any(&*conn.get_database_backend().get_query_builder());
84
85 let rows = conn
86 .query_all(Statement::from_sql_and_values(
87 conn.get_database_backend(),
88 sql,
89 values,
90 ))
91 .await?;
92
93 if !rows.is_empty() {
94 let mut insert = Query::insert();
95 insert
96 .into_table(PendingSinkState::Table)
97 .columns([
98 PendingSinkState::SinkId,
99 PendingSinkState::Epoch,
100 PendingSinkState::SinkState,
101 PendingSinkState::Metadata,
102 ])
103 .on_conflict(
104 sea_query::OnConflict::columns([
105 PendingSinkState::SinkId,
106 PendingSinkState::Epoch,
107 ])
108 .do_nothing()
109 .to_owned(),
110 );
111 for row in rows {
112 let OldRow {
113 sink_id,
114 end_epoch,
115 metadata,
116 snapshot_id,
117 committed,
118 } = OldRow::from_query_result(&row, "")?;
119 let sink_state = if committed { "COMMITTED" } else { "PENDING" };
120 let combined_metadata = transform_metadata(metadata, snapshot_id);
121 insert.values_panic([
122 sink_id.into(),
123 end_epoch.into(),
124 sink_state.into(),
125 combined_metadata.into(),
126 ]);
127 }
128
129 manager.exec_stmt(insert.to_owned()).await?;
130 }
131
132 Ok(())
133 }
134
135 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
136 drop_tables!(manager, PendingSinkState);
137 Ok(())
138 }
139}
140
141#[derive(DeriveIden)]
142enum PendingSinkState {
143 Table,
144 SinkId,
145 Epoch,
146 SinkState,
147 Metadata,
148}
149
150#[derive(DeriveIden)]
151enum ExactlyOnceIcebergSinkMetadata {
152 Table,
153 SinkId,
154 EndEpoch,
155 Metadata,
156 SnapshotId,
157 Committed,
158}