risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::Schema;
16use tonic::async_trait;
17
18use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt};
19use super::super::{DummySinkCommitCoordinator, Sink, SinkError, SinkParam, SinkWriterParam};
20use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter;
21use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig;
22use crate::enforce_secret::EnforceSecret;
23use crate::sink::Result;
24
/// Connector name of the Elasticsearch sink; used as `Sink::SINK_NAME` for
/// [`ElasticSearchSink`].
pub const ES_SINK: &str = "elasticsearch";
26
/// Sink registered under the [`ES_SINK`] name.
///
/// Instances are built from a `SinkParam` through `TryFrom`, and writing is
/// delegated to `ElasticSearchOpenSearchSinkWriter`.
#[derive(Debug)]
pub struct ElasticSearchSink {
    /// Connector configuration parsed from the sink properties.
    config: ElasticSearchOpenSearchConfig,
    /// Schema obtained from `SinkParam::schema()`.
    schema: Schema,
    /// Taken verbatim from `SinkParam::downstream_pk`.
    pk_indices: Vec<usize>,
    /// Result of `is_append_only()` on the sink type of the originating `SinkParam`.
    is_append_only: bool,
}
34
35impl EnforceSecret for ElasticSearchSink {
36    fn enforce_secret<'a>(
37        prop_iter: impl Iterator<Item = &'a str>,
38    ) -> crate::error::ConnectorResult<()> {
39        for prop in prop_iter {
40            ElasticSearchOpenSearchConfig::enforce_one(prop)?;
41        }
42        Ok(())
43    }
44}
45
46#[async_trait]
47impl TryFrom<SinkParam> for ElasticSearchSink {
48    type Error = SinkError;
49
50    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
51        let schema = param.schema();
52        let config = ElasticSearchOpenSearchConfig::from_btreemap(param.properties)?;
53        Ok(Self {
54            config,
55            schema,
56            pk_indices: param.downstream_pk,
57            is_append_only: param.sink_type.is_append_only(),
58        })
59    }
60}
61
62impl Sink for ElasticSearchSink {
63    type Coordinator = DummySinkCommitCoordinator;
64    type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>;
65
66    const SINK_NAME: &'static str = ES_SINK;
67
68    async fn validate(&self) -> Result<()> {
69        self.config.validate_config(&self.schema)?;
70        let client = self.config.build_client(Self::SINK_NAME)?;
71        client.ping().await?;
72        Ok(())
73    }
74
75    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
76        Ok(ElasticSearchOpenSearchSinkWriter::new(
77            self.config.clone(),
78            self.schema.clone(),
79            self.pk_indices.clone(),
80            Self::SINK_NAME,
81            self.is_append_only,
82        )?
83        .into_log_sinker(self.config.concurrent_requests))
84    }
85}