risingwave_connector/sink/iceberg/
exactly_once_util.rs1use risingwave_common::id::SinkId;
16use risingwave_meta_model::exactly_once_iceberg_sink::{self, Column, Entity, Model};
17use sea_orm::{
18 ColumnTrait, DatabaseConnection, EntityTrait, Order, PaginatorTrait, QueryFilter, QueryOrder,
19 Set,
20};
21use thiserror_ext::AsReport;
22
23use crate::sink::Result;
24
25pub async fn persist_pre_commit_metadata(
29 sink_id: SinkId,
30 db: DatabaseConnection,
31 start_epoch: u64,
32 end_epoch: u64,
33 pre_commit_metadata: Vec<u8>,
34 snapshot_id: i64,
35) -> Result<()> {
36 let m = exactly_once_iceberg_sink::ActiveModel {
37 sink_id: Set(sink_id),
38 end_epoch: Set(end_epoch.try_into().unwrap()),
39 start_epoch: Set(start_epoch.try_into().unwrap()),
40 metadata: Set(pre_commit_metadata),
41 committed: Set(false),
42 snapshot_id: Set(snapshot_id),
43 };
44 match exactly_once_iceberg_sink::Entity::insert(m).exec(&db).await {
45 Ok(_) => Ok(()),
46 Err(e) => {
47 tracing::error!("Error inserting into system table: {:?}", e.as_report());
48 Err(e.into())
49 }
50 }
51}
52
53pub async fn mark_row_is_committed_by_sink_id_and_end_epoch(
54 db: &DatabaseConnection,
55 sink_id: SinkId,
56 end_epoch: u64,
57) -> Result<()> {
58 match Entity::update(exactly_once_iceberg_sink::ActiveModel {
59 sink_id: Set(sink_id),
60 end_epoch: Set(end_epoch.try_into().unwrap()),
61 committed: Set(true),
62 ..Default::default()
63 })
64 .exec(db)
65 .await
66 {
67 Ok(_) => {
68 tracing::info!(
69 "Sink id = {}: mark written data status to committed, end_epoch = {}.",
70 sink_id,
71 end_epoch
72 );
73 Ok(())
74 }
75 Err(e) => {
76 tracing::error!(
77 "Error marking item to committed from iceberg exactly once system table: {:?}",
78 e.as_report()
79 );
80 Err(e.into())
81 }
82 }
83}
84
85pub async fn delete_row_by_sink_id_and_end_epoch(
86 db: &DatabaseConnection,
87 sink_id: SinkId,
88 end_epoch: u64,
89) -> Result<()> {
90 let end_epoch_i64: i64 = end_epoch.try_into().unwrap();
91 match Entity::delete_many()
92 .filter(Column::SinkId.eq(sink_id))
93 .filter(Column::EndEpoch.lt(end_epoch_i64))
94 .exec(db)
95 .await
96 {
97 Ok(result) => {
98 let deleted_count = result.rows_affected;
99
100 if deleted_count == 0 {
101 tracing::info!(
102 "Sink id = {}: no item deleted in iceberg exactly once system table, end_epoch < {}.",
103 sink_id,
104 end_epoch
105 );
106 } else {
107 tracing::info!(
108 "Sink id = {}: deleted item in iceberg exactly once system table, end_epoch < {}.",
109 sink_id,
110 end_epoch
111 );
112 }
113 Ok(())
114 }
115 Err(e) => {
116 tracing::error!(
117 "Sink id = {}: error deleting from iceberg exactly once system table: {:?}",
118 sink_id,
119 e.as_report()
120 );
121 Err(e.into())
122 }
123 }
124}
125
126pub async fn iceberg_sink_has_pre_commit_metadata(
127 db: &DatabaseConnection,
128 sink_id: SinkId,
129) -> Result<bool> {
130 match exactly_once_iceberg_sink::Entity::find()
131 .filter(exactly_once_iceberg_sink::Column::SinkId.eq(sink_id))
132 .count(db)
133 .await
134 {
135 Ok(count) => Ok(count > 0),
136 Err(e) => {
137 tracing::error!(
138 "Error querying pre-commit metadata from system table: {:?}",
139 e.as_report()
140 );
141 Err(e.into())
142 }
143 }
144}
145
146pub async fn get_pre_commit_info_by_sink_id(
147 db: &DatabaseConnection,
148 sink_id: SinkId,
149) -> Result<Vec<(u64, Vec<u8>, i64, bool)>> {
150 let models: Vec<Model> = Entity::find()
151 .filter(Column::SinkId.eq(sink_id))
152 .order_by(Column::EndEpoch, Order::Asc)
153 .all(db)
154 .await?;
155
156 let mut result: Vec<(u64, Vec<u8>, i64, bool)> = Vec::new();
157
158 for model in models {
159 result.push((
160 model.end_epoch.try_into().unwrap(),
161 model.metadata,
162 model.snapshot_id,
163 model.committed,
164 ));
165 }
166
167 Ok(result)
168}