risingwave_connector/source/nats/enumerator/
mod.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use risingwave_common::bail;
19
20use super::NatsProperties;
21use super::source::{NatsOffset, NatsSplit};
22use crate::error::ConnectorResult;
23use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId};
24
25#[derive(Debug, Clone)]
26pub struct NatsSplitEnumerator {
27 subject: String,
28 #[expect(dead_code)]
29 split_id: SplitId,
30 client: async_nats::Client,
31}
32
33#[async_trait]
34impl SplitEnumerator for NatsSplitEnumerator {
35 type Properties = NatsProperties;
36 type Split = NatsSplit;
37
38 async fn new(
39 properties: Self::Properties,
40 _context: SourceEnumeratorContextRef,
41 ) -> ConnectorResult<NatsSplitEnumerator> {
42 let client = properties.common.build_client().await?;
43
44 let jetstream = properties.common.build_context().await?;
46 let _ = properties
47 .common
48 .build_or_get_stream(jetstream, properties.stream.clone())
49 .await?;
50 Ok(Self {
51 subject: properties.common.subject,
52 split_id: Arc::from("0"),
53 client,
54 })
55 }
56
57 async fn list_splits(&mut self) -> ConnectorResult<Vec<NatsSplit>> {
58 let state = self.client.connection_state();
60 if state != async_nats::connection::State::Connected {
61 bail!(
62 "Nats connection status is not connected, current status is {:?}",
63 state
64 );
65 }
66 let nats_split = NatsSplit {
68 subject: self.subject.clone(),
69 split_id: Arc::from("0"), start_sequence: NatsOffset::None,
71 };
72
73 Ok(vec![nats_split])
74 }
75}