risingwave_meta/manager/
exactly_once_util.rs1use risingwave_meta_model::pending_sink_state::{self};
16use risingwave_meta_model::{Epoch, SinkId, SinkSchemachange};
17use risingwave_pb::stream_plan::PbSinkSchemaChange;
18use sea_orm::{
19 ColumnTrait, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder, QuerySelect, Set,
20 TransactionTrait,
21};
22use thiserror_ext::AsReport;
23
24pub async fn persist_pre_commit_metadata(
28 db: &DatabaseConnection,
29 sink_id: SinkId,
30 epoch: u64,
31 commit_metadata: Option<Vec<u8>>,
32 schema_change: Option<&PbSinkSchemaChange>,
33) -> anyhow::Result<()> {
34 fail::fail_point!("iceberg_v3_persist_pre_commit_fail", |_| Err(
35 anyhow::anyhow!("injected: iceberg_v3_persist_pre_commit_fail")
36 ));
37 let schema_change = schema_change.map(Into::into);
38 let m = pending_sink_state::ActiveModel {
39 sink_id: Set(sink_id),
40 epoch: Set(epoch as Epoch),
41 sink_state: Set(pending_sink_state::SinkState::Pending),
42 metadata: Set(commit_metadata),
43 schema_change: Set(schema_change),
44 };
45 match pending_sink_state::Entity::insert(m).exec(db).await {
46 Ok(_) => Ok(()),
47 Err(e) => {
48 tracing::error!(
49 "Error inserting into exactly once system table: {:?}",
50 e.as_report()
51 );
52 Err(e.into())
53 }
54 }
55}
56
57pub async fn commit_and_prune_epoch(
58 db: &DatabaseConnection,
59 sink_id: SinkId,
60 epoch: u64,
61 prev_epoch: Option<u64>,
62) -> anyhow::Result<()> {
63 fail::fail_point!("iceberg_v3_commit_prune_fail", |_| Err(anyhow::anyhow!(
64 "injected: iceberg_v3_commit_prune_fail"
65 )));
66 let txn = db.begin().await?;
67 pending_sink_state::Entity::update(pending_sink_state::ActiveModel {
68 sink_id: Set(sink_id),
69 epoch: Set(epoch as Epoch),
70 sink_state: Set(pending_sink_state::SinkState::Committed),
71 ..Default::default()
72 })
73 .exec(&txn)
74 .await?;
75
76 if let Some(prev_epoch) = prev_epoch {
77 pending_sink_state::Entity::delete_many()
78 .filter(
79 pending_sink_state::Column::SinkId
80 .eq(sink_id)
81 .and(pending_sink_state::Column::Epoch.eq(prev_epoch as Epoch)),
82 )
83 .exec(&txn)
84 .await?;
85 }
86
87 match txn.commit().await {
88 Ok(_) => Ok(()),
89 Err(e) => {
90 tracing::error!(
91 "Error marking item to committed exactly once system table: {:?}",
92 e.as_report()
93 );
94 Err(e.into())
95 }
96 }
97}
98
99pub async fn clean_aborted_records(
100 db: &DatabaseConnection,
101 sink_id: SinkId,
102 aborted_epochs: Vec<u64>,
103) -> anyhow::Result<()> {
104 match pending_sink_state::Entity::delete_many()
105 .filter(
106 pending_sink_state::Column::SinkId
107 .eq(sink_id)
108 .and(pending_sink_state::Column::Epoch.is_in(aborted_epochs)),
109 )
110 .exec(db)
111 .await
112 {
113 Ok(_) => Ok(()),
114 Err(e) => {
115 tracing::error!(
116 "Error deleting records from exactly once system table: {:?}",
117 e.as_report()
118 );
119 Err(e.into())
120 }
121 }
122}
123
124type PendingSinkStateRow = (
125 Epoch,
126 pending_sink_state::SinkState,
127 Option<Vec<u8>>,
128 Option<SinkSchemachange>,
129);
130
131pub async fn list_sink_states_ordered_by_epoch(
132 db: &DatabaseConnection,
133 sink_id: SinkId,
134) -> anyhow::Result<
135 Vec<(
136 u64,
137 pending_sink_state::SinkState,
138 Option<Vec<u8>>,
139 Option<PbSinkSchemaChange>,
140 )>,
141> {
142 let rows: Vec<PendingSinkStateRow> = match pending_sink_state::Entity::find()
143 .select_only()
144 .columns([
145 pending_sink_state::Column::Epoch,
146 pending_sink_state::Column::SinkState,
147 pending_sink_state::Column::Metadata,
148 pending_sink_state::Column::SchemaChange,
149 ])
150 .filter(pending_sink_state::Column::SinkId.eq(sink_id))
151 .order_by(pending_sink_state::Column::Epoch, Order::Asc)
152 .into_tuple()
153 .all(db)
154 .await
155 {
156 Ok(rows) => rows,
157 Err(e) => {
158 tracing::error!("Error querying pending sink states: {:?}", e.as_report());
159 return Err(e.into());
160 }
161 };
162
163 Ok(rows
164 .into_iter()
165 .map(|(epoch, state, metadata, schema_change)| {
166 (
167 epoch as u64,
168 state,
169 metadata,
170 schema_change.map(|v| v.to_protobuf()),
171 )
172 })
173 .collect())
174}