risingwave_dml/
txn_channel.rs1use std::sync::Arc;
16
17use futures::FutureExt;
18use risingwave_common::transaction::transaction_message::TxnMsg;
19use tokio::sync::{Semaphore, mpsc, oneshot};
20
21pub struct PermitValue(u32);
22
23pub struct TxnMsgWithPermits {
24 pub txn_msg: TxnMsg,
25 pub notificator: oneshot::Sender<usize>,
26 pub permit_value: Option<PermitValue>,
27}
28
29pub fn txn_channel(max_chunk_permits: usize) -> (Sender, Receiver) {
31 let (tx, rx) = mpsc::unbounded_channel();
33
34 let records = Semaphore::new(max_chunk_permits);
35 let permits = Arc::new(Permits { records });
36
37 (
38 Sender {
39 tx,
40 permits: permits.clone(),
41 max_chunk_permits,
42 },
43 Receiver { rx, permits },
44 )
45}
46
47#[derive(Debug)]
51pub struct Permits {
52 records: Semaphore,
54}
55
56impl Permits {
57 pub fn add_permits(&self, permit_value: PermitValue) {
59 self.records.add_permits(permit_value.0 as usize)
60 }
61}
62
63#[derive(Debug, Clone)]
65pub struct Sender {
66 pub tx: mpsc::UnboundedSender<TxnMsgWithPermits>,
67 permits: Arc<Permits>,
68
69 max_chunk_permits: usize,
72}
73
74impl Sender {
75 pub async fn send(
80 &self,
81 txn_msg: TxnMsg,
82 notificator: oneshot::Sender<usize>,
83 ) -> Result<(), mpsc::error::SendError<TxnMsg>> {
84 let permits = match &txn_msg {
86 TxnMsg::Data(_, c) => {
87 let card = c.cardinality().clamp(1, self.max_chunk_permits);
88 if card == self.max_chunk_permits {
89 tracing::warn!(
90 cardinality = c.cardinality(),
91 "large chunk in transaction channel"
92 )
93 }
94 self.permits
95 .records
96 .acquire_many(card as _)
97 .await
98 .unwrap()
99 .forget();
100 Some(PermitValue(card as _))
101 }
102 TxnMsg::Begin(_) | TxnMsg::Rollback(_) | TxnMsg::End(..) => None,
103 };
104
105 self.tx
106 .send(TxnMsgWithPermits {
107 txn_msg,
108 notificator,
109 permit_value: permits,
110 })
111 .map_err(|e| mpsc::error::SendError(e.0.txn_msg))
112 }
113
114 pub fn send_immediate(
119 &self,
120 txn_msg: TxnMsg,
121 notificator: oneshot::Sender<usize>,
122 ) -> Result<(), mpsc::error::SendError<TxnMsg>> {
123 self.send(txn_msg, notificator)
124 .now_or_never()
125 .expect("cannot send immediately")
126 }
127
128 pub fn is_closed(&self) -> bool {
129 self.tx.is_closed()
130 }
131}
132
133#[derive(Debug)]
135pub struct Receiver {
136 rx: mpsc::UnboundedReceiver<TxnMsgWithPermits>,
137 permits: Arc<Permits>,
138}
139
140impl Receiver {
141 pub async fn recv(&mut self) -> Option<(TxnMsg, oneshot::Sender<usize>)> {
145 let TxnMsgWithPermits {
146 txn_msg,
147 notificator,
148 permit_value: permits,
149 } = self.rx.recv().await?;
150
151 if let Some(permits) = permits {
152 self.permits.add_permits(permits);
153 }
154
155 Some((txn_msg, notificator))
156 }
157}