risingwave_meta/manager/sink_coordination/
exactly_once_util.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_meta_model::pending_sink_state::{self};
16use risingwave_meta_model::{Epoch, SinkId, SinkSchemachange};
17use risingwave_pb::stream_plan::PbSinkSchemaChange;
18use sea_orm::{
19    ColumnTrait, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder, QuerySelect, Set,
20    TransactionTrait,
21};
22use thiserror_ext::AsReport;
23
// Helper functions for reading and writing the meta-store system table that backs two-phase-commit (exactly-once) sinks.
25
26pub async fn persist_pre_commit_metadata(
27    db: &DatabaseConnection,
28    sink_id: SinkId,
29    epoch: u64,
30    commit_metadata: Option<Vec<u8>>,
31    schema_change: Option<&PbSinkSchemaChange>,
32) -> anyhow::Result<()> {
33    let schema_change = schema_change.map(Into::into);
34    let m = pending_sink_state::ActiveModel {
35        sink_id: Set(sink_id),
36        epoch: Set(epoch as Epoch),
37        sink_state: Set(pending_sink_state::SinkState::Pending),
38        metadata: Set(commit_metadata),
39        schema_change: Set(schema_change),
40    };
41    match pending_sink_state::Entity::insert(m).exec(db).await {
42        Ok(_) => Ok(()),
43        Err(e) => {
44            tracing::error!(
45                "Error inserting into exactly once system table: {:?}",
46                e.as_report()
47            );
48            Err(e.into())
49        }
50    }
51}
52
53pub async fn commit_and_prune_epoch(
54    db: &DatabaseConnection,
55    sink_id: SinkId,
56    epoch: u64,
57    prev_epoch: Option<u64>,
58) -> anyhow::Result<()> {
59    let txn = db.begin().await?;
60    pending_sink_state::Entity::update(pending_sink_state::ActiveModel {
61        sink_id: Set(sink_id),
62        epoch: Set(epoch as Epoch),
63        sink_state: Set(pending_sink_state::SinkState::Committed),
64        ..Default::default()
65    })
66    .exec(&txn)
67    .await?;
68
69    if let Some(prev_epoch) = prev_epoch {
70        pending_sink_state::Entity::delete_many()
71            .filter(
72                pending_sink_state::Column::SinkId
73                    .eq(sink_id)
74                    .and(pending_sink_state::Column::Epoch.eq(prev_epoch as Epoch)),
75            )
76            .exec(&txn)
77            .await?;
78    }
79
80    match txn.commit().await {
81        Ok(_) => Ok(()),
82        Err(e) => {
83            tracing::error!(
84                "Error marking item to committed exactly once system table: {:?}",
85                e.as_report()
86            );
87            Err(e.into())
88        }
89    }
90}
91
92pub async fn clean_aborted_records(
93    db: &DatabaseConnection,
94    sink_id: SinkId,
95    aborted_epochs: Vec<u64>,
96) -> anyhow::Result<()> {
97    match pending_sink_state::Entity::delete_many()
98        .filter(
99            pending_sink_state::Column::SinkId
100                .eq(sink_id)
101                .and(pending_sink_state::Column::Epoch.is_in(aborted_epochs)),
102        )
103        .exec(db)
104        .await
105    {
106        Ok(_) => Ok(()),
107        Err(e) => {
108            tracing::error!(
109                "Error deleting records from exactly once system table: {:?}",
110                e.as_report()
111            );
112            Err(e.into())
113        }
114    }
115}
116
117type PendingSinkStateRow = (
118    Epoch,
119    pending_sink_state::SinkState,
120    Option<Vec<u8>>,
121    Option<SinkSchemachange>,
122);
123
124pub async fn list_sink_states_ordered_by_epoch(
125    db: &DatabaseConnection,
126    sink_id: SinkId,
127) -> anyhow::Result<
128    Vec<(
129        u64,
130        pending_sink_state::SinkState,
131        Option<Vec<u8>>,
132        Option<PbSinkSchemaChange>,
133    )>,
134> {
135    let rows: Vec<PendingSinkStateRow> = match pending_sink_state::Entity::find()
136        .select_only()
137        .columns([
138            pending_sink_state::Column::Epoch,
139            pending_sink_state::Column::SinkState,
140            pending_sink_state::Column::Metadata,
141            pending_sink_state::Column::SchemaChange,
142        ])
143        .filter(pending_sink_state::Column::SinkId.eq(sink_id))
144        .order_by(pending_sink_state::Column::Epoch, Order::Asc)
145        .into_tuple()
146        .all(db)
147        .await
148    {
149        Ok(rows) => rows,
150        Err(e) => {
151            tracing::error!("Error querying pending sink states: {:?}", e.as_report());
152            return Err(e.into());
153        }
154    };
155
156    Ok(rows
157        .into_iter()
158        .map(|(epoch, state, metadata, schema_change)| {
159            (
160                epoch as u64,
161                state,
162                metadata,
163                schema_change.map(|v| v.to_protobuf()),
164            )
165        })
166        .collect())
167}