risingwave_connector/source/nats/enumerator/
mod.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use risingwave_common::bail;
19
20use super::NatsProperties;
21use super::source::{NatsOffset, NatsSplit};
22use crate::error::ConnectorResult;
23use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId};
24
25#[derive(Debug, Clone)]
26pub struct NatsSplitEnumerator {
27 subject: String,
28 #[expect(dead_code)]
29 split_id: SplitId,
30 client: async_nats::Client,
31}
32
33#[async_trait]
34impl SplitEnumerator for NatsSplitEnumerator {
35 type Properties = NatsProperties;
36 type Split = NatsSplit;
37
38 async fn new(
39 properties: Self::Properties,
40 _context: SourceEnumeratorContextRef,
41 ) -> ConnectorResult<NatsSplitEnumerator> {
42 let client = properties.common.build_client().await?;
43 Ok(Self {
44 subject: properties.common.subject,
45 split_id: Arc::from("0"),
46 client,
47 })
48 }
49
50 async fn list_splits(&mut self) -> ConnectorResult<Vec<NatsSplit>> {
51 let state = self.client.connection_state();
53 if state != async_nats::connection::State::Connected {
54 bail!(
55 "Nats connection status is not connected, current status is {:?}",
56 state
57 );
58 }
59 let nats_split = NatsSplit {
61 subject: self.subject.clone(),
62 split_id: Arc::from("0"), start_sequence: NatsOffset::None,
64 };
65
66 Ok(vec![nats_split])
67 }
68}