Skip to main content

risingwave_meta/manager/
exactly_once_util.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_meta_model::pending_sink_state::{self};
16use risingwave_meta_model::{Epoch, SinkId, SinkSchemachange};
17use risingwave_pb::stream_plan::PbSinkSchemaChange;
18use sea_orm::{
19    ColumnTrait, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder, QuerySelect, Set,
20    TransactionTrait,
21};
22use thiserror_ext::AsReport;
23
24// Helpers for accessing the `pending_sink_state` system table used by exactly-once sink coordinators
25// (both the generic sink coordinator and the Iceberg V3 sink coordinator).
26
27pub async fn persist_pre_commit_metadata(
28    db: &DatabaseConnection,
29    sink_id: SinkId,
30    epoch: u64,
31    commit_metadata: Option<Vec<u8>>,
32    schema_change: Option<&PbSinkSchemaChange>,
33) -> anyhow::Result<()> {
34    fail::fail_point!("iceberg_v3_persist_pre_commit_fail", |_| Err(
35        anyhow::anyhow!("injected: iceberg_v3_persist_pre_commit_fail")
36    ));
37    let schema_change = schema_change.map(Into::into);
38    let m = pending_sink_state::ActiveModel {
39        sink_id: Set(sink_id),
40        epoch: Set(epoch as Epoch),
41        sink_state: Set(pending_sink_state::SinkState::Pending),
42        metadata: Set(commit_metadata),
43        schema_change: Set(schema_change),
44    };
45    match pending_sink_state::Entity::insert(m).exec(db).await {
46        Ok(_) => Ok(()),
47        Err(e) => {
48            tracing::error!(
49                "Error inserting into exactly once system table: {:?}",
50                e.as_report()
51            );
52            Err(e.into())
53        }
54    }
55}
56
57pub async fn commit_and_prune_epoch(
58    db: &DatabaseConnection,
59    sink_id: SinkId,
60    epoch: u64,
61    prev_epoch: Option<u64>,
62) -> anyhow::Result<()> {
63    fail::fail_point!("iceberg_v3_commit_prune_fail", |_| Err(anyhow::anyhow!(
64        "injected: iceberg_v3_commit_prune_fail"
65    )));
66    let txn = db.begin().await?;
67    pending_sink_state::Entity::update(pending_sink_state::ActiveModel {
68        sink_id: Set(sink_id),
69        epoch: Set(epoch as Epoch),
70        sink_state: Set(pending_sink_state::SinkState::Committed),
71        ..Default::default()
72    })
73    .exec(&txn)
74    .await?;
75
76    if let Some(prev_epoch) = prev_epoch {
77        pending_sink_state::Entity::delete_many()
78            .filter(
79                pending_sink_state::Column::SinkId
80                    .eq(sink_id)
81                    .and(pending_sink_state::Column::Epoch.eq(prev_epoch as Epoch)),
82            )
83            .exec(&txn)
84            .await?;
85    }
86
87    match txn.commit().await {
88        Ok(_) => Ok(()),
89        Err(e) => {
90            tracing::error!(
91                "Error marking item to committed exactly once system table: {:?}",
92                e.as_report()
93            );
94            Err(e.into())
95        }
96    }
97}
98
99pub async fn clean_aborted_records(
100    db: &DatabaseConnection,
101    sink_id: SinkId,
102    aborted_epochs: Vec<u64>,
103) -> anyhow::Result<()> {
104    match pending_sink_state::Entity::delete_many()
105        .filter(
106            pending_sink_state::Column::SinkId
107                .eq(sink_id)
108                .and(pending_sink_state::Column::Epoch.is_in(aborted_epochs)),
109        )
110        .exec(db)
111        .await
112    {
113        Ok(_) => Ok(()),
114        Err(e) => {
115            tracing::error!(
116                "Error deleting records from exactly once system table: {:?}",
117                e.as_report()
118            );
119            Err(e.into())
120        }
121    }
122}
123
124type PendingSinkStateRow = (
125    Epoch,
126    pending_sink_state::SinkState,
127    Option<Vec<u8>>,
128    Option<SinkSchemachange>,
129);
130
131pub async fn list_sink_states_ordered_by_epoch(
132    db: &DatabaseConnection,
133    sink_id: SinkId,
134) -> anyhow::Result<
135    Vec<(
136        u64,
137        pending_sink_state::SinkState,
138        Option<Vec<u8>>,
139        Option<PbSinkSchemaChange>,
140    )>,
141> {
142    let rows: Vec<PendingSinkStateRow> = match pending_sink_state::Entity::find()
143        .select_only()
144        .columns([
145            pending_sink_state::Column::Epoch,
146            pending_sink_state::Column::SinkState,
147            pending_sink_state::Column::Metadata,
148            pending_sink_state::Column::SchemaChange,
149        ])
150        .filter(pending_sink_state::Column::SinkId.eq(sink_id))
151        .order_by(pending_sink_state::Column::Epoch, Order::Asc)
152        .into_tuple()
153        .all(db)
154        .await
155    {
156        Ok(rows) => rows,
157        Err(e) => {
158            tracing::error!("Error querying pending sink states: {:?}", e.as_report());
159            return Err(e.into());
160        }
161    };
162
163    Ok(rows
164        .into_iter()
165        .map(|(epoch, state, metadata, schema_change)| {
166            (
167                epoch as u64,
168                state,
169                metadata,
170                schema_change.map(|v| v.to_protobuf()),
171            )
172        })
173        .collect())
174}