risingwave_connector/source/nats/enumerator/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use risingwave_common::bail;
19
20use super::NatsProperties;
21use super::source::{NatsOffset, NatsSplit};
22use crate::connector_common::NatsCommon;
23use crate::error::ConnectorResult;
24use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId};
25
26#[derive(Debug)]
27pub struct NatsSplitEnumerator {
28    subject: String,
29    #[expect(dead_code)]
30    split_id: SplitId,
31    /// Hold the client Arc to keep it alive. This allows the shared client cache to reuse
32    /// the connection while we're still using it.
33    client: Arc<async_nats::Client>,
34}
35
36#[async_trait]
37impl SplitEnumerator for NatsSplitEnumerator {
38    type Properties = NatsProperties;
39    type Split = NatsSplit;
40
41    async fn new(
42        properties: Self::Properties,
43        _context: SourceEnumeratorContextRef,
44    ) -> ConnectorResult<NatsSplitEnumerator> {
45        let client = properties.common.build_client().await?;
46
47        // check if the stream exists or allow create stream
48        let jetstream = NatsCommon::build_context_from_client(&client);
49        let _ = properties
50            .common
51            .build_or_get_stream(jetstream, properties.stream.clone())
52            .await?;
53        Ok(Self {
54            subject: properties.common.subject,
55            split_id: Arc::from("0"),
56            client,
57        })
58    }
59
60    async fn list_splits(&mut self) -> ConnectorResult<Vec<NatsSplit>> {
61        // Nats currently does not support list_splits API, if we simple return the default 0 without checking the client status, will result executor crash
62        let state = self.client.connection_state();
63        if state != async_nats::connection::State::Connected {
64            bail!(
65                "Nats connection status is not connected, current status is {:?}",
66                state
67            );
68        }
69        // TODO: to simplify the logic, return 1 split for first version
70        let nats_split = NatsSplit {
71            subject: self.subject.clone(),
72            split_id: Arc::from("0"), // be the same as `from_nats_jetstream_message`
73            start_sequence: NatsOffset::None,
74        };
75
76        Ok(vec![nats_split])
77    }
78}