1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
1415use std::sync::Arc;
1617use async_trait::async_trait;
18use risingwave_common::bail;
1920use super::NatsProperties;
21use super::source::{NatsOffset, NatsSplit};
22use crate::error::ConnectorResult;
23use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId};
2425#[derive(Debug, Clone)]
26pub struct NatsSplitEnumerator {
27 subject: String,
28#[expect(dead_code)]
29split_id: SplitId,
30 client: async_nats::Client,
31}
3233#[async_trait]
34impl SplitEnumerator for NatsSplitEnumerator {
35type Properties = NatsProperties;
36type Split = NatsSplit;
3738async fn new(
39 properties: Self::Properties,
40 _context: SourceEnumeratorContextRef,
41 ) -> ConnectorResult<NatsSplitEnumerator> {
42let client = properties.common.build_client().await?;
43Ok(Self {
44 subject: properties.common.subject,
45 split_id: Arc::from("0"),
46 client,
47 })
48 }
4950async fn list_splits(&mut self) -> ConnectorResult<Vec<NatsSplit>> {
51// Nats currently does not support list_splits API, if we simple return the default 0 without checking the client status, will result executor crash
52let state = self.client.connection_state();
53if state != async_nats::connection::State::Connected {
54bail!(
55"Nats connection status is not connected, current status is {:?}",
56 state
57 );
58 }
59// TODO: to simplify the logic, return 1 split for first version
60let nats_split = NatsSplit {
61 subject: self.subject.clone(),
62 split_id: Arc::from("0"), // be the same as `from_nats_jetstream_message`
63start_sequence: NatsOffset::None,
64 };
6566Ok(vec![nats_split])
67 }
68}