// risingwave_connector/source/nats/enumerator/mod.rs

use std::sync::Arc;

use async_trait::async_trait;
use risingwave_common::bail;

use super::NatsProperties;
use super::source::{NatsOffset, NatsSplit};
use crate::connector_common::NatsCommon;
use crate::error::ConnectorResult;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId};

26#[derive(Debug)]
27pub struct NatsSplitEnumerator {
28 subject: String,
29 #[expect(dead_code)]
30 split_id: SplitId,
31 client: Arc<async_nats::Client>,
34}
35
36#[async_trait]
37impl SplitEnumerator for NatsSplitEnumerator {
38 type Properties = NatsProperties;
39 type Split = NatsSplit;
40
41 async fn new(
42 properties: Self::Properties,
43 _context: SourceEnumeratorContextRef,
44 ) -> ConnectorResult<NatsSplitEnumerator> {
45 let client = properties.common.build_client().await?;
46
47 let jetstream = NatsCommon::build_context_from_client(&client);
49 let _ = properties
50 .common
51 .build_or_get_stream(jetstream, properties.stream.clone())
52 .await?;
53 Ok(Self {
54 subject: properties.common.subject,
55 split_id: Arc::from("0"),
56 client,
57 })
58 }
59
60 async fn list_splits(&mut self) -> ConnectorResult<Vec<NatsSplit>> {
61 let state = self.client.connection_state();
63 if state != async_nats::connection::State::Connected {
64 bail!(
65 "Nats connection status is not connected, current status is {:?}",
66 state
67 );
68 }
69 let nats_split = NatsSplit {
71 subject: self.subject.clone(),
72 split_id: Arc::from("0"), start_sequence: NatsOffset::None,
74 };
75
76 Ok(vec![nats_split])
77 }
78}