risingwave_dml/
txn_channel.rs1use std::sync::Arc;
16
17use futures::FutureExt;
18use risingwave_common::transaction::transaction_message::TxnMsg;
19use tokio::sync::{Semaphore, mpsc, oneshot};
20
/// Number of semaphore permits checked out on behalf of one in-flight message.
///
/// The inner count is private to this module: a value is produced by
/// [`Sender::send`] and consumed by [`Permits::add_permits`].
#[derive(Debug)]
pub struct PermitValue(u32);
22
23pub struct TxnMsgWithPermits {
24    pub txn_msg: TxnMsg,
25    pub notificator: oneshot::Sender<usize>,
26    pub permit_value: Option<PermitValue>,
27}
28
29pub fn txn_channel(max_chunk_permits: usize) -> (Sender, Receiver) {
31    let (tx, rx) = mpsc::unbounded_channel();
33
34    let records = Semaphore::new(max_chunk_permits);
35    let permits = Arc::new(Permits { records });
36
37    (
38        Sender {
39            tx,
40            permits: permits.clone(),
41            max_chunk_permits,
42        },
43        Receiver { rx, permits },
44    )
45}
46
47#[derive(Debug)]
51pub struct Permits {
52    records: Semaphore,
54}
55
56impl Permits {
57    pub fn add_permits(&self, permit_value: PermitValue) {
59        self.records.add_permits(permit_value.0 as usize)
60    }
61}
62
63#[derive(Debug, Clone)]
65pub struct Sender {
66    pub tx: mpsc::UnboundedSender<TxnMsgWithPermits>,
67    permits: Arc<Permits>,
68
69    max_chunk_permits: usize,
72}
73
74impl Sender {
75    pub async fn send(
80        &self,
81        txn_msg: TxnMsg,
82        notificator: oneshot::Sender<usize>,
83    ) -> Result<(), mpsc::error::SendError<TxnMsg>> {
84        let permits = match &txn_msg {
86            TxnMsg::Data(_, c) => {
87                let card = c.cardinality().clamp(1, self.max_chunk_permits);
88                if card == self.max_chunk_permits {
89                    tracing::warn!(
90                        cardinality = c.cardinality(),
91                        "large chunk in transaction channel"
92                    )
93                }
94                self.permits
95                    .records
96                    .acquire_many(card as _)
97                    .await
98                    .unwrap()
99                    .forget();
100                Some(PermitValue(card as _))
101            }
102            TxnMsg::Begin(_) | TxnMsg::Rollback(_) | TxnMsg::End(..) => None,
103        };
104
105        self.tx
106            .send(TxnMsgWithPermits {
107                txn_msg,
108                notificator,
109                permit_value: permits,
110            })
111            .map_err(|e| mpsc::error::SendError(e.0.txn_msg))
112    }
113
114    pub fn send_immediate(
119        &self,
120        txn_msg: TxnMsg,
121        notificator: oneshot::Sender<usize>,
122    ) -> Result<(), mpsc::error::SendError<TxnMsg>> {
123        self.send(txn_msg, notificator)
124            .now_or_never()
125            .expect("cannot send immediately")
126    }
127
128    pub fn is_closed(&self) -> bool {
129        self.tx.is_closed()
130    }
131}
132
133#[derive(Debug)]
135pub struct Receiver {
136    rx: mpsc::UnboundedReceiver<TxnMsgWithPermits>,
137    permits: Arc<Permits>,
138}
139
140impl Receiver {
141    pub async fn recv(&mut self) -> Option<(TxnMsg, oneshot::Sender<usize>)> {
145        let TxnMsgWithPermits {
146            txn_msg,
147            notificator,
148            permit_value: permits,
149        } = self.rx.recv().await?;
150
151        if let Some(permits) = permits {
152            self.permits.add_permits(permits);
153        }
154
155        Some((txn_msg, notificator))
156    }
157}