risingwave_connector/sink/elasticsearch_opensearch/
opensearch.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use tonic::async_trait;

use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt};
use super::super::{DummySinkCommitCoordinator, Sink, SinkError, SinkParam, SinkWriterParam};
use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter;
use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig;
use crate::sink::Result;

pub const OPENSEARCH_SINK: &str = "opensearch";

#[derive(Debug)]
pub struct OpenSearchSink {
    config: ElasticSearchOpenSearchConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
}

#[async_trait]
impl TryFrom<SinkParam> for OpenSearchSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = ElasticSearchOpenSearchConfig::from_btreemap(param.properties)?;
        Ok(Self {
            config,
            schema,
            pk_indices: param.downstream_pk,
        })
    }
}

impl Sink for OpenSearchSink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>;

    const SINK_NAME: &'static str = OPENSEARCH_SINK;

    async fn validate(&self) -> Result<()> {
        risingwave_common::license::Feature::OpenSearchSink
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
        self.config.validate_config(&self.schema)?;
        let client = self.config.build_client(Self::SINK_NAME)?;
        client.ping().await?;
        Ok(())
    }

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        Ok(ElasticSearchOpenSearchSinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            Self::SINK_NAME,
        )?
        .into_log_sinker(self.config.concurrent_requests))
    }

    fn set_default_commit_checkpoint_interval(
        desc: &mut crate::sink::catalog::desc::SinkDesc,
        user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
    ) -> Result<()> {
        if crate::sink::is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
            match desc
                .properties
                .get(crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL)
            {
                Some(commit_checkpoint_interval) => {
                    let commit_checkpoint_interval = commit_checkpoint_interval
                        .parse::<u64>()
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
                    if std::matches!(user_specified, SinkDecouple::Disable)
                        && commit_checkpoint_interval > 1
                    {
                        return Err(SinkError::Config(anyhow!("config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled")));
                    }
                }
                None => match user_specified {
                    risingwave_common::session_config::sink_decouple::SinkDecouple::Default
                    | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => {
                        desc.properties.insert(
                            crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_string(),
                            crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
                        );
                    }
                    risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => {
                        desc.properties.insert(
                            crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_string(),
                            crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
                        );
                    }
                },
            }
        }
        Ok(())
    }

    fn is_sink_decouple(
        user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
    ) -> Result<bool> {
        match user_specified {
            risingwave_common::session_config::sink_decouple::SinkDecouple::Default
            | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => Ok(true),
            risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => Ok(false),
        }
    }

    async fn new_coordinator(&self) -> Result<Self::Coordinator> {
        Err(SinkError::Coordinator(anyhow!("no coordinator")))
    }
}