risingwave_connector/sink/iceberg/
exactly_once_util.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_meta_model::exactly_once_iceberg_sink::{self, Column, Entity, Model};
16use sea_orm::{
17    ColumnTrait, DatabaseConnection, EntityTrait, Order, PaginatorTrait, QueryFilter, QueryOrder,
18    Set,
19};
20use thiserror_ext::AsReport;
21
22use crate::sink::Result;
23
// This file contains helpers for accessing the exactly-once Iceberg sink system
// table in the meta store; they are used when exactly-once semantics are enabled.
26
27pub async fn persist_pre_commit_metadata(
28    sink_id: u32,
29    db: DatabaseConnection,
30    start_epoch: u64,
31    end_epoch: u64,
32    pre_commit_metadata: Vec<u8>,
33    snapshot_id: i64,
34) -> Result<()> {
35    let m = exactly_once_iceberg_sink::ActiveModel {
36        sink_id: Set(sink_id as i32),
37        end_epoch: Set(end_epoch.try_into().unwrap()),
38        start_epoch: Set(start_epoch.try_into().unwrap()),
39        metadata: Set(pre_commit_metadata),
40        committed: Set(false),
41        snapshot_id: Set(snapshot_id),
42    };
43    match exactly_once_iceberg_sink::Entity::insert(m).exec(&db).await {
44        Ok(_) => Ok(()),
45        Err(e) => {
46            tracing::error!("Error inserting into system table: {:?}", e.as_report());
47            Err(e.into())
48        }
49    }
50}
51
52pub async fn mark_row_is_committed_by_sink_id_and_end_epoch(
53    db: &DatabaseConnection,
54    sink_id: u32,
55    end_epoch: u64,
56) -> Result<()> {
57    match Entity::update(exactly_once_iceberg_sink::ActiveModel {
58        sink_id: Set(sink_id as i32),
59        end_epoch: Set(end_epoch.try_into().unwrap()),
60        committed: Set(true),
61        ..Default::default()
62    })
63    .exec(db)
64    .await
65    {
66        Ok(_) => {
67            tracing::info!(
68                "Sink id = {}: mark written data status to committed, end_epoch = {}.",
69                sink_id,
70                end_epoch
71            );
72            Ok(())
73        }
74        Err(e) => {
75            tracing::error!(
76                "Error marking item to committed from iceberg exactly once system table: {:?}",
77                e.as_report()
78            );
79            Err(e.into())
80        }
81    }
82}
83
84pub async fn delete_row_by_sink_id_and_end_epoch(
85    db: &DatabaseConnection,
86    sink_id: u32,
87    end_epoch: u64,
88) -> Result<()> {
89    let end_epoch_i64: i64 = end_epoch.try_into().unwrap();
90    match Entity::delete_many()
91        .filter(Column::SinkId.eq(sink_id))
92        .filter(Column::EndEpoch.lt(end_epoch_i64))
93        .exec(db)
94        .await
95    {
96        Ok(result) => {
97            let deleted_count = result.rows_affected;
98
99            if deleted_count == 0 {
100                tracing::info!(
101                    "Sink id = {}: no item deleted in iceberg exactly once system table, end_epoch < {}.",
102                    sink_id,
103                    end_epoch
104                );
105            } else {
106                tracing::info!(
107                    "Sink id = {}: deleted item in iceberg exactly once system table, end_epoch < {}.",
108                    sink_id,
109                    end_epoch
110                );
111            }
112            Ok(())
113        }
114        Err(e) => {
115            tracing::error!(
116                "Sink id = {}: error deleting from iceberg exactly once system table: {:?}",
117                sink_id,
118                e.as_report()
119            );
120            Err(e.into())
121        }
122    }
123}
124
125pub async fn iceberg_sink_has_pre_commit_metadata(
126    db: &DatabaseConnection,
127    sink_id: u32,
128) -> Result<bool> {
129    match exactly_once_iceberg_sink::Entity::find()
130        .filter(exactly_once_iceberg_sink::Column::SinkId.eq(sink_id as i32))
131        .count(db)
132        .await
133    {
134        Ok(count) => Ok(count > 0),
135        Err(e) => {
136            tracing::error!(
137                "Error querying pre-commit metadata from system table: {:?}",
138                e.as_report()
139            );
140            Err(e.into())
141        }
142    }
143}
144
145pub async fn get_pre_commit_info_by_sink_id(
146    db: &DatabaseConnection,
147    sink_id: u32,
148) -> Result<Vec<(u64, Vec<u8>, i64, bool)>> {
149    let models: Vec<Model> = Entity::find()
150        .filter(Column::SinkId.eq(sink_id as i32))
151        .order_by(Column::EndEpoch, Order::Asc)
152        .all(db)
153        .await?;
154
155    let mut result: Vec<(u64, Vec<u8>, i64, bool)> = Vec::new();
156
157    for model in models {
158        result.push((
159            model.end_epoch.try_into().unwrap(),
160            model.metadata,
161            model.snapshot_id,
162            model.committed,
163        ));
164    }
165
166    Ok(result)
167}