risingwave_meta/manager/sink_coordination/
exactly_once_util.rs1use risingwave_meta_model::pending_sink_state::{self};
16use risingwave_meta_model::{Epoch, SinkId, SinkSchemachange};
17use risingwave_pb::stream_plan::PbSinkSchemaChange;
18use sea_orm::{
19 ColumnTrait, DatabaseConnection, EntityTrait, Order, QueryFilter, QueryOrder, QuerySelect, Set,
20 TransactionTrait,
21};
22use thiserror_ext::AsReport;
23
24pub async fn persist_pre_commit_metadata(
27 db: &DatabaseConnection,
28 sink_id: SinkId,
29 epoch: u64,
30 commit_metadata: Option<Vec<u8>>,
31 schema_change: Option<&PbSinkSchemaChange>,
32) -> anyhow::Result<()> {
33 let schema_change = schema_change.map(Into::into);
34 let m = pending_sink_state::ActiveModel {
35 sink_id: Set(sink_id),
36 epoch: Set(epoch as Epoch),
37 sink_state: Set(pending_sink_state::SinkState::Pending),
38 metadata: Set(commit_metadata),
39 schema_change: Set(schema_change),
40 };
41 match pending_sink_state::Entity::insert(m).exec(db).await {
42 Ok(_) => Ok(()),
43 Err(e) => {
44 tracing::error!(
45 "Error inserting into exactly once system table: {:?}",
46 e.as_report()
47 );
48 Err(e.into())
49 }
50 }
51}
52
53pub async fn commit_and_prune_epoch(
54 db: &DatabaseConnection,
55 sink_id: SinkId,
56 epoch: u64,
57 prev_epoch: Option<u64>,
58) -> anyhow::Result<()> {
59 let txn = db.begin().await?;
60 pending_sink_state::Entity::update(pending_sink_state::ActiveModel {
61 sink_id: Set(sink_id),
62 epoch: Set(epoch as Epoch),
63 sink_state: Set(pending_sink_state::SinkState::Committed),
64 ..Default::default()
65 })
66 .exec(&txn)
67 .await?;
68
69 if let Some(prev_epoch) = prev_epoch {
70 pending_sink_state::Entity::delete_many()
71 .filter(
72 pending_sink_state::Column::SinkId
73 .eq(sink_id)
74 .and(pending_sink_state::Column::Epoch.eq(prev_epoch as Epoch)),
75 )
76 .exec(&txn)
77 .await?;
78 }
79
80 match txn.commit().await {
81 Ok(_) => Ok(()),
82 Err(e) => {
83 tracing::error!(
84 "Error marking item to committed exactly once system table: {:?}",
85 e.as_report()
86 );
87 Err(e.into())
88 }
89 }
90}
91
92pub async fn clean_aborted_records(
93 db: &DatabaseConnection,
94 sink_id: SinkId,
95 aborted_epochs: Vec<u64>,
96) -> anyhow::Result<()> {
97 match pending_sink_state::Entity::delete_many()
98 .filter(
99 pending_sink_state::Column::SinkId
100 .eq(sink_id)
101 .and(pending_sink_state::Column::Epoch.is_in(aborted_epochs)),
102 )
103 .exec(db)
104 .await
105 {
106 Ok(_) => Ok(()),
107 Err(e) => {
108 tracing::error!(
109 "Error deleting records from exactly once system table: {:?}",
110 e.as_report()
111 );
112 Err(e.into())
113 }
114 }
115}
116
117type PendingSinkStateRow = (
118 Epoch,
119 pending_sink_state::SinkState,
120 Option<Vec<u8>>,
121 Option<SinkSchemachange>,
122);
123
124pub async fn list_sink_states_ordered_by_epoch(
125 db: &DatabaseConnection,
126 sink_id: SinkId,
127) -> anyhow::Result<
128 Vec<(
129 u64,
130 pending_sink_state::SinkState,
131 Option<Vec<u8>>,
132 Option<PbSinkSchemaChange>,
133 )>,
134> {
135 let rows: Vec<PendingSinkStateRow> = match pending_sink_state::Entity::find()
136 .select_only()
137 .columns([
138 pending_sink_state::Column::Epoch,
139 pending_sink_state::Column::SinkState,
140 pending_sink_state::Column::Metadata,
141 pending_sink_state::Column::SchemaChange,
142 ])
143 .filter(pending_sink_state::Column::SinkId.eq(sink_id))
144 .order_by(pending_sink_state::Column::Epoch, Order::Asc)
145 .into_tuple()
146 .all(db)
147 .await
148 {
149 Ok(rows) => rows,
150 Err(e) => {
151 tracing::error!("Error querying pending sink states: {:?}", e.as_report());
152 return Err(e.into());
153 }
154 };
155
156 Ok(rows
157 .into_iter()
158 .map(|(epoch, state, metadata, schema_change)| {
159 (
160 epoch as u64,
161 state,
162 metadata,
163 schema_change.map(|v| v.to_protobuf()),
164 )
165 })
166 .collect())
167}