risingwave_connector/sink/elasticsearch_opensearch/
opensearch.rs1use anyhow::anyhow;
16use risingwave_common::catalog::Schema;
17use risingwave_common::session_config::sink_decouple::SinkDecouple;
18use tonic::async_trait;
19
20use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt};
21use super::super::{Sink, SinkError, SinkParam, SinkWriterParam};
22use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter;
23use super::elasticsearch_opensearch_config::{ElasticSearchOpenSearchConfig, OpenSearchConfig};
24use crate::enforce_secret::EnforceSecret;
25use crate::sink::Result;
26
/// Connector name of the OpenSearch sink; `OpenSearchSink` exposes it as `Sink::SINK_NAME`.
27pub const OPENSEARCH_SINK: &str = "opensearch";
28
/// Sink definition for OpenSearch, constructed from a `SinkParam` through `TryFrom`.
29#[derive(Debug)]
30pub struct OpenSearchSink {
 /// The `inner` config produced by `OpenSearchConfig::from_btreemap` from the sink properties.
31 config: ElasticSearchOpenSearchConfig,
 /// Schema obtained from `SinkParam::schema()`.
32 schema: Schema,
 /// Taken from `SinkParam::downstream_pk`.
33 pk_indices: Vec<usize>,
 /// Result of `sink_type.is_append_only()` on the originating `SinkParam`.
34 is_append_only: bool,
35}
36
37impl EnforceSecret for OpenSearchSink {
38 fn enforce_secret<'a>(
39 prop_iter: impl Iterator<Item = &'a str>,
40 ) -> crate::error::ConnectorResult<()> {
41 for prop in prop_iter {
42 ElasticSearchOpenSearchConfig::enforce_one(prop)?;
43 }
44 Ok(())
45 }
46}
47#[async_trait]
48impl TryFrom<SinkParam> for OpenSearchSink {
49 type Error = SinkError;
50
51 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
52 let schema = param.schema();
53 let config = OpenSearchConfig::from_btreemap(param.properties)?.inner;
54 Ok(Self {
55 config,
56 schema,
57 pk_indices: param.downstream_pk,
58 is_append_only: param.sink_type.is_append_only(),
59 })
60 }
61}
62
63impl Sink for OpenSearchSink {
64 type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>;
65
66 const SINK_NAME: &'static str = OPENSEARCH_SINK;
67
68 async fn validate(&self) -> Result<()> {
69 risingwave_common::license::Feature::OpenSearchSink
70 .check_available()
71 .map_err(|e| anyhow::anyhow!(e))?;
72 self.config.validate_config(&self.schema)?;
73 let client = self.config.build_client(Self::SINK_NAME)?;
74 client.ping().await?;
75 Ok(())
76 }
77
78 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
79 Ok(ElasticSearchOpenSearchSinkWriter::new(
80 self.config.clone(),
81 self.schema.clone(),
82 self.pk_indices.clone(),
83 Self::SINK_NAME,
84 self.is_append_only,
85 )?
86 .into_log_sinker(self.config.concurrent_requests))
87 }
88
89 fn set_default_commit_checkpoint_interval(
90 desc: &mut crate::sink::catalog::desc::SinkDesc,
91 user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
92 ) -> Result<()> {
93 if crate::sink::is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
94 match desc
95 .properties
96 .get(crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL)
97 {
98 Some(commit_checkpoint_interval) => {
99 let commit_checkpoint_interval = commit_checkpoint_interval
100 .parse::<u64>()
101 .map_err(|e| SinkError::Config(anyhow!(e)))?;
102 if std::matches!(user_specified, SinkDecouple::Disable)
103 && commit_checkpoint_interval > 1
104 {
105 return Err(SinkError::Config(anyhow!(
106 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
107 )));
108 }
109 }
110 None => match user_specified {
111 risingwave_common::session_config::sink_decouple::SinkDecouple::Default
112 | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => {
113 desc.properties.insert(
114 crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_owned(),
115 crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
116 );
117 }
118 risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => {
119 desc.properties.insert(
120 crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_owned(),
121 crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
122 );
123 }
124 },
125 }
126 }
127 Ok(())
128 }
129
130 fn is_sink_decouple(
131 user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
132 ) -> Result<bool> {
133 match user_specified {
134 risingwave_common::session_config::sink_decouple::SinkDecouple::Default
135 | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => Ok(true),
136 risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => Ok(false),
137 }
138 }
139}