risingwave_connector/sink/iceberg/
exactly_once_util.rs1use risingwave_meta_model::exactly_once_iceberg_sink::{self, Column, Entity, Model};
16use sea_orm::{
17 ColumnTrait, DatabaseConnection, EntityTrait, Order, PaginatorTrait, QueryFilter, QueryOrder,
18 Set,
19};
20use thiserror_ext::AsReport;
21
22use crate::sink::Result;
23
24pub async fn persist_pre_commit_metadata(
28 sink_id: u32,
29 db: DatabaseConnection,
30 start_epoch: u64,
31 end_epoch: u64,
32 pre_commit_metadata: Vec<u8>,
33 snapshot_id: i64,
34) -> Result<()> {
35 let m = exactly_once_iceberg_sink::ActiveModel {
36 sink_id: Set(sink_id as i32),
37 end_epoch: Set(end_epoch.try_into().unwrap()),
38 start_epoch: Set(start_epoch.try_into().unwrap()),
39 metadata: Set(pre_commit_metadata),
40 committed: Set(false),
41 snapshot_id: Set(snapshot_id),
42 };
43 match exactly_once_iceberg_sink::Entity::insert(m).exec(&db).await {
44 Ok(_) => Ok(()),
45 Err(e) => {
46 tracing::error!("Error inserting into system table: {:?}", e.as_report());
47 Err(e.into())
48 }
49 }
50}
51
52pub async fn mark_row_is_committed_by_sink_id_and_end_epoch(
53 db: &DatabaseConnection,
54 sink_id: u32,
55 end_epoch: u64,
56) -> Result<()> {
57 match Entity::update(exactly_once_iceberg_sink::ActiveModel {
58 sink_id: Set(sink_id as i32),
59 end_epoch: Set(end_epoch.try_into().unwrap()),
60 committed: Set(true),
61 ..Default::default()
62 })
63 .exec(db)
64 .await
65 {
66 Ok(_) => {
67 tracing::info!(
68 "Sink id = {}: mark written data status to committed, end_epoch = {}.",
69 sink_id,
70 end_epoch
71 );
72 Ok(())
73 }
74 Err(e) => {
75 tracing::error!(
76 "Error marking item to committed from iceberg exactly once system table: {:?}",
77 e.as_report()
78 );
79 Err(e.into())
80 }
81 }
82}
83
84pub async fn delete_row_by_sink_id_and_end_epoch(
85 db: &DatabaseConnection,
86 sink_id: u32,
87 end_epoch: u64,
88) -> Result<()> {
89 let end_epoch_i64: i64 = end_epoch.try_into().unwrap();
90 match Entity::delete_many()
91 .filter(Column::SinkId.eq(sink_id))
92 .filter(Column::EndEpoch.lt(end_epoch_i64))
93 .exec(db)
94 .await
95 {
96 Ok(result) => {
97 let deleted_count = result.rows_affected;
98
99 if deleted_count == 0 {
100 tracing::info!(
101 "Sink id = {}: no item deleted in iceberg exactly once system table, end_epoch < {}.",
102 sink_id,
103 end_epoch
104 );
105 } else {
106 tracing::info!(
107 "Sink id = {}: deleted item in iceberg exactly once system table, end_epoch < {}.",
108 sink_id,
109 end_epoch
110 );
111 }
112 Ok(())
113 }
114 Err(e) => {
115 tracing::error!(
116 "Sink id = {}: error deleting from iceberg exactly once system table: {:?}",
117 sink_id,
118 e.as_report()
119 );
120 Err(e.into())
121 }
122 }
123}
124
125pub async fn iceberg_sink_has_pre_commit_metadata(
126 db: &DatabaseConnection,
127 sink_id: u32,
128) -> Result<bool> {
129 match exactly_once_iceberg_sink::Entity::find()
130 .filter(exactly_once_iceberg_sink::Column::SinkId.eq(sink_id as i32))
131 .count(db)
132 .await
133 {
134 Ok(count) => Ok(count > 0),
135 Err(e) => {
136 tracing::error!(
137 "Error querying pre-commit metadata from system table: {:?}",
138 e.as_report()
139 );
140 Err(e.into())
141 }
142 }
143}
144
145pub async fn get_pre_commit_info_by_sink_id(
146 db: &DatabaseConnection,
147 sink_id: u32,
148) -> Result<Vec<(u64, Vec<u8>, i64, bool)>> {
149 let models: Vec<Model> = Entity::find()
150 .filter(Column::SinkId.eq(sink_id as i32))
151 .order_by(Column::EndEpoch, Order::Asc)
152 .all(db)
153 .await?;
154
155 let mut result: Vec<(u64, Vec<u8>, i64, bool)> = Vec::new();
156
157 for model in models {
158 result.push((
159 model.end_epoch.try_into().unwrap(),
160 model.metadata,
161 model.snapshot_id,
162 model.committed,
163 ));
164 }
165
166 Ok(result)
167}