risingwave_connector/sink/iceberg/
exactly_once_util.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::id::SinkId;
16use risingwave_meta_model::exactly_once_iceberg_sink::{self, Column, Entity, Model};
17use sea_orm::{
18    ColumnTrait, DatabaseConnection, EntityTrait, Order, PaginatorTrait, QueryFilter, QueryOrder,
19    Set,
20};
21use thiserror_ext::AsReport;
22
23use crate::sink::Result;
24
// This file contains helper functions for reading and writing the meta store
// system table used by the Iceberg sink when exactly-once semantics are enabled.
27
28pub async fn persist_pre_commit_metadata(
29    sink_id: SinkId,
30    db: DatabaseConnection,
31    start_epoch: u64,
32    end_epoch: u64,
33    pre_commit_metadata: Vec<u8>,
34    snapshot_id: i64,
35) -> Result<()> {
36    let m = exactly_once_iceberg_sink::ActiveModel {
37        sink_id: Set(sink_id),
38        end_epoch: Set(end_epoch.try_into().unwrap()),
39        start_epoch: Set(start_epoch.try_into().unwrap()),
40        metadata: Set(pre_commit_metadata),
41        committed: Set(false),
42        snapshot_id: Set(snapshot_id),
43    };
44    match exactly_once_iceberg_sink::Entity::insert(m).exec(&db).await {
45        Ok(_) => Ok(()),
46        Err(e) => {
47            tracing::error!("Error inserting into system table: {:?}", e.as_report());
48            Err(e.into())
49        }
50    }
51}
52
53pub async fn mark_row_is_committed_by_sink_id_and_end_epoch(
54    db: &DatabaseConnection,
55    sink_id: SinkId,
56    end_epoch: u64,
57) -> Result<()> {
58    match Entity::update(exactly_once_iceberg_sink::ActiveModel {
59        sink_id: Set(sink_id),
60        end_epoch: Set(end_epoch.try_into().unwrap()),
61        committed: Set(true),
62        ..Default::default()
63    })
64    .exec(db)
65    .await
66    {
67        Ok(_) => {
68            tracing::info!(
69                "Sink id = {}: mark written data status to committed, end_epoch = {}.",
70                sink_id,
71                end_epoch
72            );
73            Ok(())
74        }
75        Err(e) => {
76            tracing::error!(
77                "Error marking item to committed from iceberg exactly once system table: {:?}",
78                e.as_report()
79            );
80            Err(e.into())
81        }
82    }
83}
84
85pub async fn delete_row_by_sink_id_and_end_epoch(
86    db: &DatabaseConnection,
87    sink_id: SinkId,
88    end_epoch: u64,
89) -> Result<()> {
90    let end_epoch_i64: i64 = end_epoch.try_into().unwrap();
91    match Entity::delete_many()
92        .filter(Column::SinkId.eq(sink_id))
93        .filter(Column::EndEpoch.lt(end_epoch_i64))
94        .exec(db)
95        .await
96    {
97        Ok(result) => {
98            let deleted_count = result.rows_affected;
99
100            if deleted_count == 0 {
101                tracing::info!(
102                    "Sink id = {}: no item deleted in iceberg exactly once system table, end_epoch < {}.",
103                    sink_id,
104                    end_epoch
105                );
106            } else {
107                tracing::info!(
108                    "Sink id = {}: deleted item in iceberg exactly once system table, end_epoch < {}.",
109                    sink_id,
110                    end_epoch
111                );
112            }
113            Ok(())
114        }
115        Err(e) => {
116            tracing::error!(
117                "Sink id = {}: error deleting from iceberg exactly once system table: {:?}",
118                sink_id,
119                e.as_report()
120            );
121            Err(e.into())
122        }
123    }
124}
125
126pub async fn iceberg_sink_has_pre_commit_metadata(
127    db: &DatabaseConnection,
128    sink_id: SinkId,
129) -> Result<bool> {
130    match exactly_once_iceberg_sink::Entity::find()
131        .filter(exactly_once_iceberg_sink::Column::SinkId.eq(sink_id))
132        .count(db)
133        .await
134    {
135        Ok(count) => Ok(count > 0),
136        Err(e) => {
137            tracing::error!(
138                "Error querying pre-commit metadata from system table: {:?}",
139                e.as_report()
140            );
141            Err(e.into())
142        }
143    }
144}
145
146pub async fn get_pre_commit_info_by_sink_id(
147    db: &DatabaseConnection,
148    sink_id: SinkId,
149) -> Result<Vec<(u64, Vec<u8>, i64, bool)>> {
150    let models: Vec<Model> = Entity::find()
151        .filter(Column::SinkId.eq(sink_id))
152        .order_by(Column::EndEpoch, Order::Asc)
153        .all(db)
154        .await?;
155
156    let mut result: Vec<(u64, Vec<u8>, i64, bool)> = Vec::new();
157
158    for model in models {
159        result.push((
160            model.end_epoch.try_into().unwrap(),
161            model.metadata,
162            model.snapshot_id,
163            model.committed,
164        ));
165    }
166
167    Ok(result)
168}