risingwave_connector/sink/elasticsearch_opensearch/
opensearch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::catalog::Schema;
17use risingwave_common::session_config::sink_decouple::SinkDecouple;
18use tonic::async_trait;
19
20use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt};
21use super::super::{Sink, SinkError, SinkParam, SinkWriterParam};
22use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter;
23use super::elasticsearch_opensearch_config::{ElasticSearchOpenSearchConfig, OpenSearchConfig};
24use crate::enforce_secret::EnforceSecret;
25use crate::sink::Result;
26
/// Connector name of the OpenSearch sink; `OpenSearchSink` exposes it as its
/// `Sink::SINK_NAME`.
pub const OPENSEARCH_SINK: &str = "opensearch";
28
/// Sink registered under the name [`OPENSEARCH_SINK`].
///
/// Instances are built from a `SinkParam` through the `TryFrom` impl in this
/// file; the fields below are the pieces of that parameter the sink keeps.
#[derive(Debug)]
pub struct OpenSearchSink {
    /// Connector configuration, taken from the `inner` field of the
    /// `OpenSearchConfig` parsed out of the sink properties.
    config: ElasticSearchOpenSearchConfig,
    /// Schema returned by `SinkParam::schema()`.
    schema: Schema,
    /// Value of `SinkParam::downstream_pk`.
    /// NOTE(review): presumably column indices into `schema` — confirm.
    pk_indices: Vec<usize>,
    /// Result of `sink_type.is_append_only()` at construction time.
    is_append_only: bool,
}
36
37impl EnforceSecret for OpenSearchSink {
38    fn enforce_secret<'a>(
39        prop_iter: impl Iterator<Item = &'a str>,
40    ) -> crate::error::ConnectorResult<()> {
41        for prop in prop_iter {
42            ElasticSearchOpenSearchConfig::enforce_one(prop)?;
43        }
44        Ok(())
45    }
46}
47#[async_trait]
48impl TryFrom<SinkParam> for OpenSearchSink {
49    type Error = SinkError;
50
51    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
52        let schema = param.schema();
53        let config = OpenSearchConfig::from_btreemap(param.properties)?.inner;
54        Ok(Self {
55            config,
56            schema,
57            pk_indices: param.downstream_pk,
58            is_append_only: param.sink_type.is_append_only(),
59        })
60    }
61}
62
63impl Sink for OpenSearchSink {
64    type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>;
65
66    const SINK_NAME: &'static str = OPENSEARCH_SINK;
67
68    async fn validate(&self) -> Result<()> {
69        risingwave_common::license::Feature::OpenSearchSink
70            .check_available()
71            .map_err(|e| anyhow::anyhow!(e))?;
72        self.config.validate_config(&self.schema)?;
73        let client = self.config.build_client(Self::SINK_NAME)?;
74        client.ping().await?;
75        Ok(())
76    }
77
78    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
79        Ok(ElasticSearchOpenSearchSinkWriter::new(
80            self.config.clone(),
81            self.schema.clone(),
82            self.pk_indices.clone(),
83            Self::SINK_NAME,
84            self.is_append_only,
85        )?
86        .into_log_sinker(self.config.concurrent_requests))
87    }
88
89    fn set_default_commit_checkpoint_interval(
90        desc: &mut crate::sink::catalog::desc::SinkDesc,
91        user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
92    ) -> Result<()> {
93        if crate::sink::is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
94            match desc
95                .properties
96                .get(crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL)
97            {
98                Some(commit_checkpoint_interval) => {
99                    let commit_checkpoint_interval = commit_checkpoint_interval
100                        .parse::<u64>()
101                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
102                    if std::matches!(user_specified, SinkDecouple::Disable)
103                        && commit_checkpoint_interval > 1
104                    {
105                        return Err(SinkError::Config(anyhow!(
106                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
107                        )));
108                    }
109                }
110                None => match user_specified {
111                    risingwave_common::session_config::sink_decouple::SinkDecouple::Default
112                    | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => {
113                        desc.properties.insert(
114                            crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_owned(),
115                            crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
116                        );
117                    }
118                    risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => {
119                        desc.properties.insert(
120                            crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_owned(),
121                            crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
122                        );
123                    }
124                },
125            }
126        }
127        Ok(())
128    }
129
130    fn is_sink_decouple(
131        user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
132    ) -> Result<bool> {
133        match user_specified {
134            risingwave_common::session_config::sink_decouple::SinkDecouple::Default
135            | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => Ok(true),
136            risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => Ok(false),
137        }
138    }
139}