use core::pin::Pin;
use core::time::Duration;
use std::collections::{BTreeMap, HashMap, VecDeque};

use anyhow::{Context, anyhow};
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use futures::future::pending;
use futures::prelude::Future;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use gcp_bigquery_client::Client;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table::Table;
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use google_cloud_bigquery::grpc::apiv1::conn_pool::ConnectionManager;
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::grpc::Request;
use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{
    MissingValueInterpretation, ProtoData, Rows as AppendRowsRequestRows,
};
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
};
use google_cloud_pubsub::client::google_cloud_auth;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use phf::{Set, phf_set};
use prost_reflect::{FieldDescriptor, MessageDescriptor};
use prost_types::{
    DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
    field_descriptor_proto,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use serde::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use simd_json::prelude::ArrayTrait;
use tokio::sync::mpsc;
use tonic::{Response, Status, async_trait};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
use super::log_store::{LogStoreReadItem, TruncateOffset};
use super::{
    LogSinker, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkLogReader,
};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::enforce_secret::EnforceSecret;
use crate::sink::{Result, Sink, SinkParam, SinkWriterParam};

pub const BIGQUERY_SINK: &str = "bigquery";
pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
const CONNECTION_TIMEOUT: Option<Duration> = None;
const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
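// Serialized rows are split into batches so that each `AppendRows` request stays below this
// size. (Assumption: this mirrors the Storage Write API's documented 10 MB per-request limit,
// with 8 MiB leaving some headroom; the limit is not stated in this file.)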
const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
    #[serde(rename = "bigquery.local.path")]
    pub local_path: Option<String>,
    #[serde(rename = "bigquery.s3.path")]
    pub s3_path: Option<String>,
    #[serde(rename = "bigquery.project")]
    pub project: String,
    #[serde(rename = "bigquery.dataset")]
    pub dataset: String,
    #[serde(rename = "bigquery.table")]
    pub table: String,
    #[serde(default)]
    #[serde_as(as = "DisplayFromStr")]
    pub auto_create: bool,
    #[serde(rename = "bigquery.credentials")]
    pub credentials: Option<String>,
}

impl EnforceSecret for BigQueryCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "bigquery.credentials",
    };
}

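/// Tracks offsets that have been written but not yet acknowledged by BigQuery.
///
/// Each queued entry pairs a `TruncateOffset` with the number of `AppendRows` responses that
/// must still arrive on `resp_stream` before the offset can be truncated from the log reader.
/// Barriers are queued with a response count of zero.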
struct BigQueryFutureManager {
    offset_queue: VecDeque<(TruncateOffset, usize)>,
    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
}

impl BigQueryFutureManager {
    pub fn new(
        max_future_num: usize,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
    ) -> Self {
        let offset_queue = VecDeque::with_capacity(max_future_num);
        Self {
            offset_queue,
            resp_stream: Box::pin(resp_stream),
        }
    }

    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
        self.offset_queue.push_back((offset, resp_num));
    }

    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
            if *remaining_resp_num == 0 {
                return Ok(self.offset_queue.pop_front().unwrap().0);
            }
            while *remaining_resp_num > 0 {
                self.resp_stream
                    .next()
                    .await
                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
                *remaining_resp_num -= 1;
            }
            Ok(self.offset_queue.pop_front().unwrap().0)
        } else {
            pending().await
        }
    }
}
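
/// Log sinker that pipelines writes to BigQuery: encoded chunks are sent immediately, while
/// their offsets wait in `BigQueryFutureManager` until the corresponding responses arrive.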
pub struct BigQueryLogSinker {
    writer: BigQuerySinkWriter,
    bigquery_future_manager: BigQueryFutureManager,
    future_num: usize,
}

impl BigQueryLogSinker {
    pub fn new(
        writer: BigQuerySinkWriter,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
        future_num: usize,
    ) -> Self {
        Self {
            writer,
            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
            future_num,
        }
    }
}

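// The consume loop below interleaves two tasks with `select!`: truncating the log as
// acknowledged offsets become available, and reading new items as long as the number of
// in-flight offsets stays within `future_num`.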
#[async_trait]
impl LogSinker for BigQueryLogSinker {
    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        loop {
            tokio::select!(
                offset = self.bigquery_future_manager.next_offset() => {
                    log_reader.truncate(offset?)?;
                }
                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
                    let (epoch, item) = item_result?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
                            let resp_num = self.writer.write_chunk(chunk)?;
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id }, resp_num);
                        }
                        LogStoreReadItem::Barrier { .. } => {
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Barrier { epoch }, 0);
                        }
                    }
                }
            )
        }
    }
}

impl BigQueryCommon {
    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let service_account =
            if let Ok(auth_json_from_base64) = BASE64_STANDARD.decode(auth_json.clone()) {
                serde_json::from_slice::<ServiceAccountKey>(&auth_json_from_base64)
            } else {
                serde_json::from_str::<ServiceAccountKey>(&auth_json)
            }
            .map_err(|e| SinkError::BigQuery(e.into()))?;

        let client: Client = Client::from_service_account_key(service_account, false)
            .await
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        Ok(client)
    }

    async fn build_writer_client(
        &self,
        aws_auth_props: &AwsAuthProps,
    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>> + use<>)> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let credentials_file =
            if let Ok(auth_json_from_base64) = BASE64_STANDARD.decode(auth_json.clone()) {
                serde_json::from_slice::<CredentialsFile>(&auth_json_from_base64)
            } else {
                serde_json::from_str::<CredentialsFile>(&auth_json)
            }
            .map_err(|e| SinkError::BigQuery(e.into()))?;

        StorageWriterClient::new(credentials_file).await
    }

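    /// Resolves the service-account JSON, preferring `bigquery.credentials`, then
    /// `bigquery.local.path`, and finally `bigquery.s3.path` (fetched via the AWS auth props).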
    async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
        if let Some(credentials) = &self.credentials {
            Ok(credentials.clone())
        } else if let Some(local_path) = &self.local_path {
            std::fs::read_to_string(local_path)
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))
        } else if let Some(s3_path) = &self.s3_path {
            let url =
                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            let auth_vec = load_file_descriptor_from_s3(&url, aws_auth_props)
                .await
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            Ok(String::from_utf8(auth_vec).map_err(|e| SinkError::BigQuery(e.into()))?)
        } else {
            Err(SinkError::BigQuery(anyhow::anyhow!(
                "Set at least one of `bigquery.credentials`, `bigquery.local.path`, or `bigquery.s3.path`."
            )))
        }
    }
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct BigQueryConfig {
    #[serde(flatten)]
    pub common: BigQueryCommon,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
    pub r#type: String,
}

impl EnforceSecret for BigQueryConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        BigQueryCommon::enforce_one(prop)?;
        AwsAuthProps::enforce_one(prop)?;
        Ok(())
    }
}

impl BigQueryConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

#[derive(Debug)]
pub struct BigQuerySink {
    pub config: BigQueryConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl EnforceSecret for BigQuerySink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            BigQueryConfig::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl BigQuerySink {
    pub fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only,
        })
    }
}

impl BigQuerySink {
    fn check_column_name_and_type(
        &self,
        big_query_columns_desc: HashMap<String, String>,
    ) -> Result<()> {
        let rw_fields_name = self.schema.fields();
        if big_query_columns_desc.is_empty() {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Cannot find the table in BigQuery"
            )));
        }
        if rw_fields_name.len() != big_query_columns_desc.len() {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "The number of columns in RisingWave ({}) must equal the number of columns in BigQuery ({})",
                rw_fields_name.len(),
                big_query_columns_desc.len()
            )));
        }

        for i in rw_fields_name {
            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Column `{:?}` on the RisingWave side is not found on the BigQuery side.",
                    i.name
                ))
            })?;
            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
            if data_type_string.ne(value) {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Data type mismatch for column `{:?}`. BigQuery side: `{:?}`, RisingWave side: `{:?}`.",
                    i.name,
                    value,
                    data_type_string
                )));
            };
        }
        Ok(())
    }

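    /// Maps a RisingWave data type to the BigQuery SQL type name reported by
    /// `INFORMATION_SCHEMA.COLUMNS`, erroring on types the sink does not support.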
    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
        match rw_data_type {
            DataType::Boolean => Ok("BOOL".to_owned()),
            DataType::Int16 => Ok("INT64".to_owned()),
            DataType::Int32 => Ok("INT64".to_owned()),
            DataType::Int64 => Ok("INT64".to_owned()),
            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
            ))),
            DataType::Float64 => Ok("FLOAT64".to_owned()),
            DataType::Decimal => Ok("NUMERIC".to_owned()),
            DataType::Date => Ok("DATE".to_owned()),
            DataType::Varchar => Ok("STRING".to_owned()),
            DataType::Time => Ok("TIME".to_owned()),
            DataType::Timestamp => Ok("DATETIME".to_owned()),
            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
            DataType::Interval => Ok("INTERVAL".to_owned()),
            DataType::Struct(structs) => {
                let mut elements_vec = vec![];
                for (name, datatype) in structs.iter() {
                    let element_string =
                        Self::get_string_and_check_support_from_datatype(datatype)?;
                    elements_vec.push(format!("{} {}", name, element_string));
                }
                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
            }
            DataType::List(l) => {
                let element_string = Self::get_string_and_check_support_from_datatype(l.elem())?;
                Ok(format!("ARRAY<{}>", element_string))
            }
            DataType::Bytea => Ok("BYTES".to_owned()),
            DataType::Jsonb => Ok("JSON".to_owned()),
            DataType::Serial => Ok("INT64".to_owned()),
            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "INT256 is not supported for BigQuery sink."
            ))),
            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "MAP is not supported for BigQuery sink."
            ))),
            DataType::Vector(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "VECTOR is not supported for BigQuery sink."
            ))),
        }
    }

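    /// Maps a RisingWave field to a `TableFieldSchema` so the destination table can be created
    /// automatically; lists become `REPEATED` fields and structs become records.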
    fn map_field(rw_field: &Field) -> Result<TableFieldSchema> {
        let tfs = match &rw_field.data_type {
            DataType::Boolean => TableFieldSchema::bool(&rw_field.name),
            DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Serial => {
                TableFieldSchema::integer(&rw_field.name)
            }
            DataType::Float32 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "REAL is not supported for BigQuery sink. Please convert to FLOAT64 or other supported types."
                )));
            }
            DataType::Float64 => TableFieldSchema::float(&rw_field.name),
            DataType::Decimal => TableFieldSchema::numeric(&rw_field.name),
            DataType::Date => TableFieldSchema::date(&rw_field.name),
            DataType::Varchar => TableFieldSchema::string(&rw_field.name),
            DataType::Time => TableFieldSchema::time(&rw_field.name),
            DataType::Timestamp => TableFieldSchema::date_time(&rw_field.name),
            DataType::Timestamptz => TableFieldSchema::timestamp(&rw_field.name),
            DataType::Interval => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "INTERVAL is not supported for BigQuery sink. Please convert to VARCHAR or other supported types."
                )));
            }
            DataType::Struct(st) => {
                let mut sub_fields = Vec::with_capacity(st.len());
                for (name, dt) in st.iter() {
                    let rw_field = Field::with_name(dt.clone(), name);
                    let field = Self::map_field(&rw_field)?;
                    sub_fields.push(field);
                }
                TableFieldSchema::record(&rw_field.name, sub_fields)
            }
            DataType::List(lt) => {
                let inner_field =
                    Self::map_field(&Field::with_name(lt.elem().clone(), &rw_field.name))?;
                TableFieldSchema {
                    mode: Some("REPEATED".to_owned()),
                    ..inner_field
                }
            }
            DataType::Bytea => TableFieldSchema::bytes(&rw_field.name),
            DataType::Jsonb => TableFieldSchema::json(&rw_field.name),
            DataType::Int256 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "INT256 is not supported for BigQuery sink."
                )));
            }
            DataType::Map(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "MAP is not supported for BigQuery sink."
                )));
            }
            DataType::Vector(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "VECTOR is not supported for BigQuery sink."
                )));
            }
        };
        Ok(tfs)
    }

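    /// Creates the destination table in BigQuery from the RisingWave schema; `validate` calls
    /// this when `auto_create` is set and the table lookup fails.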
    async fn create_table(
        &self,
        client: &Client,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        fields: &Vec<Field>,
    ) -> Result<Table> {
        let dataset = client
            .dataset()
            .get(project_id, dataset_id)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let fields: Vec<_> = fields.iter().map(Self::map_field).collect::<Result<_>>()?;
        let table = Table::from_dataset(&dataset, table_id, TableSchema::new(fields));

        client
            .table()
            .create(table)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))
    }
}

impl Sink for BigQuerySink {
    type LogSinker = BigQueryLogSinker;

    const SINK_NAME: &'static str = BIGQUERY_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let (writer, resp_stream) = BigQuerySinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        Ok(BigQueryLogSinker::new(
            writer,
            resp_stream,
            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
        ))
    }

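    // Validation checks the license, requires a primary key for upsert sinks, and then either
    // auto-creates the table or compares the RisingWave schema against
    // `INFORMATION_SCHEMA.COLUMNS` on the BigQuery side.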
    async fn validate(&self) -> Result<()> {
        risingwave_common::license::Feature::BigQuerySink
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert bigquery sink (please define in `primary_key` field)"
            )));
        }
        let client = self
            .config
            .common
            .build_client(&self.config.aws_auth_props)
            .await?;
        let BigQueryCommon {
            project: project_id,
            dataset: dataset_id,
            table: table_id,
            ..
        } = &self.config.common;

        if self.config.common.auto_create {
            match client
                .table()
                .get(project_id, dataset_id, table_id, None)
                .await
            {
                Err(BQError::RequestError(_)) => {
                    return self
                        .create_table(
                            &client,
                            project_id,
                            dataset_id,
                            table_id,
                            &self.schema.fields,
                        )
                        .await
                        .map(|_| ());
                }
                Err(e) => return Err(SinkError::BigQuery(e.into())),
                _ => {}
            }
        }

        let mut rs = client
            .job()
            .query(
                &self.config.common.project,
                QueryRequest::new(format!(
                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
                    project_id, dataset_id, table_id,
                )),
            ).await.map_err(|e| SinkError::BigQuery(e.into()))?;

        let mut big_query_schema = HashMap::default();
        while rs.next_row() {
            big_query_schema.insert(
                rs.get_string_by_name("column_name")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
                    })?,
                rs.get_string_by_name("data_type")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
                    })?,
            );
        }

        self.check_column_name_and_type(big_query_schema)?;
        Ok(())
    }
}

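/// Writer that encodes stream chunks as protobuf rows and pushes them to the BigQuery Storage
/// Write API through `StorageWriterClient`. In upsert mode every row carries a `_CHANGE_TYPE`
/// column set to `UPSERT` or `DELETE`.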
pub struct BigQuerySinkWriter {
    pub config: BigQueryConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: StorageWriterClient,
    is_append_only: bool,
    row_encoder: ProtoEncoder,
    writer_pb_schema: ProtoSchema,
    #[expect(dead_code)]
    message_descriptor: MessageDescriptor,
    write_stream: String,
    proto_field: Option<FieldDescriptor>,
}

impl TryFrom<SinkParam> for BigQuerySink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = BigQueryConfig::from_btreemap(param.properties)?;
        BigQuerySink::new(
            config,
            schema,
            param.downstream_pk,
            param.sink_type.is_append_only(),
        )
    }
}

impl BigQuerySinkWriter {
    pub async fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let (client, resp_stream) = config
            .common
            .build_writer_client(&config.aws_auth_props)
            .await?;
        let mut descriptor_proto = build_protobuf_schema(
            schema
                .fields()
                .iter()
                .map(|f| (f.name.as_str(), &f.data_type)),
            config.common.table.clone(),
        )?;

        if !is_append_only {
            let field = FieldDescriptorProto {
                name: Some(CHANGE_TYPE.to_owned()),
                number: Some((schema.len() + 1) as i32),
                r#type: Some(field_descriptor_proto::Type::String.into()),
                ..Default::default()
            };
            descriptor_proto.field.push(field);
        }

        let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
        let message_descriptor = descriptor_pool
            .get_message_by_name(&config.common.table)
            .ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Can't find message proto {}",
                    &config.common.table
                ))
            })?;
        let proto_field = if !is_append_only {
            let proto_field = message_descriptor
                .get_field_by_name(CHANGE_TYPE)
                .ok_or_else(|| {
                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
                })?;
            Some(proto_field)
        } else {
            None
        };
        let row_encoder = ProtoEncoder::new(
            schema.clone(),
            None,
            message_descriptor.clone(),
            ProtoHeader::None,
        )?;
        Ok((
            Self {
                write_stream: format!(
                    "projects/{}/datasets/{}/tables/{}/streams/_default",
                    config.common.project, config.common.dataset, config.common.table
                ),
                config,
                schema,
                pk_indices,
                client,
                is_append_only,
                row_encoder,
                message_descriptor,
                proto_field,
                writer_pb_schema: ProtoSchema {
                    proto_descriptor: Some(descriptor_proto),
                },
            },
            resp_stream,
        ))
    }

    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op != Op::Insert {
                continue;
            }
            serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
        }
        Ok(serialized_rows)
    }

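    /// Encodes rows for upsert mode: `Insert` and `UpdateInsert` rows are tagged `UPSERT`,
    /// `Delete` rows are tagged `DELETE` via the `_CHANGE_TYPE` field, and `UpdateDelete` rows
    /// are skipped (the paired `UpdateInsert` carries the new value).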
    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op == Op::UpdateDelete {
                continue;
            }
            let mut pb_row = self.row_encoder.encode(row)?;
            match op {
                Op::Insert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::Delete => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("DELETE".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
                Op::UpdateDelete => continue,
                Op::UpdateInsert => pb_row
                    .message
                    .try_set_field(
                        self.proto_field.as_ref().unwrap(),
                        prost_reflect::Value::String("UPSERT".to_owned()),
                    )
                    .map_err(|e| SinkError::BigQuery(e.into()))?,
            };

            serialized_rows.push(pb_row.ser_to()?)
        }
        Ok(serialized_rows)
    }

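    /// Encodes a chunk and sends it as one or more `AppendRows` requests, splitting batches so
    /// each stays under `MAX_ROW_SIZE`. Returns the number of requests issued, which is the
    /// number of responses `BigQueryFutureManager` must wait for before truncating the offset.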
    fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
        let serialized_rows = if self.is_append_only {
            self.append_only(chunk)?
        } else {
            self.upsert(chunk)?
        };
        if serialized_rows.is_empty() {
            return Ok(0);
        }
        let mut result = Vec::new();
        let mut result_inner = Vec::new();
        let mut size_count = 0;
        for i in serialized_rows {
            size_count += i.len();
            if size_count > MAX_ROW_SIZE {
                result.push(result_inner);
                result_inner = Vec::new();
                size_count = i.len();
            }
            result_inner.push(i);
        }
        if !result_inner.is_empty() {
            result.push(result_inner);
        }
        let len = result.len();
        for serialized_rows in result {
            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
                writer_schema: Some(self.writer_pb_schema.clone()),
                rows: Some(ProtoRows { serialized_rows }),
            });
            self.client.append_rows(rows, self.write_stream.clone())?;
        }
        Ok(len)
    }
}

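/// Converts the gRPC `AppendRows` response stream into a `Stream<Item = Result<()>>`: each
/// successful response yields one `()`, while row errors, error statuses, or an early end of
/// stream are surfaced as `SinkError::BigQuery`.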
#[try_stream(ok = (), error = SinkError)]
pub async fn resp_to_stream(
    resp_stream: impl Future<
        Output = std::result::Result<
            Response<google_cloud_gax::grpc::Streaming<AppendRowsResponse>>,
            Status,
        >,
    >
    + 'static
    + Send,
) {
    let mut resp_stream = resp_stream
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?
        .into_inner();
    loop {
        match resp_stream
            .message()
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?
        {
            Some(append_rows_response) => {
                if !append_rows_response.row_errors.is_empty() {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        append_rows_response.row_errors
                    )));
                }
                if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) = append_rows_response.response {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        status
                    )));
                }
                yield ();
            }
            None => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "bigquery insert error: end of resp stream",
                )));
            }
        }
    }
}

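/// Thin wrapper over the Storage Write API's bidirectional `AppendRows` stream: requests are
/// pushed through an unbounded channel, and the paired response stream is returned to the
/// caller by `new`.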
struct StorageWriterClient {
    #[expect(dead_code)]
    environment: Environment,
    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
}

impl StorageWriterClient {
    pub async fn new(
        credentials: CredentialsFile,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_grpc_auth_config(),
            Box::new(credentials),
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let conn_options = ConnectionOptions {
            connect_timeout: CONNECT_TIMEOUT,
            timeout: CONNECTION_TIMEOUT,
        };
        let environment = Environment::GoogleCloud(Box::new(ts_grpc));
        let conn = ConnectionManager::new(DEFAULT_GRPC_CHANNEL_NUMS, &environment, &conn_options)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let mut client = conn.writer();

        let (tx, rx) = mpsc::unbounded_channel();
        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);

        let resp = async move { client.append_rows(Request::new(stream)).await };
        let resp_stream = resp_to_stream(resp);

        Ok((
            StorageWriterClient {
                environment,
                request_sender: tx,
            },
            resp_stream,
        ))
    }

    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
        let append_req = AppendRowsRequest {
            write_stream: write_stream.clone(),
            offset: None,
            trace_id: Uuid::new_v4().hyphenated().to_string(),
            missing_value_interpretations: HashMap::default(),
            rows: Some(row),
            default_missing_value_interpretation: MissingValueInterpretation::DefaultValue as i32,
        };
        self.request_sender
            .send(append_req)
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        Ok(())
    }

    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        let mut auth_config = google_cloud_auth::project::Config::default();
        auth_config =
            auth_config.with_audience(google_cloud_bigquery::grpc::apiv1::conn_pool::AUDIENCE);
        auth_config =
            auth_config.with_scopes(&google_cloud_bigquery::grpc::apiv1::conn_pool::SCOPES);
        auth_config
    }
}

fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> Result<prost_reflect::DescriptorPool> {
    let file_descriptor = FileDescriptorProto {
        message_type: vec![desc.clone()],
        name: Some("bigquery".to_owned()),
        ..Default::default()
    };

    prost_reflect::DescriptorPool::from_file_descriptor_set(FileDescriptorSet {
        file: vec![file_descriptor],
    })
    .context("failed to build descriptor pool")
    .map_err(SinkError::BigQuery)
}

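/// Builds the `DescriptorProto` for the sink's message type from the RisingWave schema.
/// Nested struct fields are emitted as nested message types, so the resulting descriptor can
/// be registered with the Storage Write API as the writer schema.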
fn build_protobuf_schema<'a>(
    fields: impl Iterator<Item = (&'a str, &'a DataType)>,
    name: String,
) -> Result<DescriptorProto> {
    let mut proto = DescriptorProto {
        name: Some(name),
        ..Default::default()
    };
    let mut struct_vec = vec![];
    let field_vec = fields
        .enumerate()
        .map(|(index, (name, data_type))| {
            let (field, des_proto) =
                build_protobuf_field(data_type, (index + 1) as i32, name.to_owned())?;
            if let Some(sv) = des_proto {
                struct_vec.push(sv);
            }
            Ok(field)
        })
        .collect::<Result<Vec<_>>>()?;
    proto.field = field_vec;
    proto.nested_type = struct_vec;
    Ok(proto)
}

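/// Maps a single RisingWave data type to a protobuf `FieldDescriptorProto`. Struct types
/// additionally return a nested `Struct<name>` message, and list types reuse the element's
/// field with the `REPEATED` label.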
fn build_protobuf_field(
    data_type: &DataType,
    index: i32,
    name: String,
) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
    let mut field = FieldDescriptorProto {
        name: Some(name.clone()),
        number: Some(index),
        ..Default::default()
    };
    match data_type {
        DataType::Boolean => field.r#type = Some(field_descriptor_proto::Type::Bool.into()),
        DataType::Int32 => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Int16 | DataType::Int64 => {
            field.r#type = Some(field_descriptor_proto::Type::Int64.into())
        }
        DataType::Float64 => field.r#type = Some(field_descriptor_proto::Type::Double.into()),
        DataType::Decimal => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Date => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Varchar => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Time => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamp => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamptz => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Interval => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Struct(s) => {
            field.r#type = Some(field_descriptor_proto::Type::Message.into());
            let name = format!("Struct{}", name);
            let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
            field.type_name = Some(name);
            return Ok((field, Some(sub_proto)));
        }
        DataType::List(l) => {
            let (mut field, proto) = build_protobuf_field(l.elem(), index, name.clone())?;
            field.label = Some(field_descriptor_proto::Label::Repeated.into());
            return Ok((field, proto));
        }
        DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
        DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
        DataType::Float32 | DataType::Int256 => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Float32 and Int256 are not supported"
            )));
        }
        DataType::Map(_) => {
            return Err(SinkError::BigQuery(anyhow::anyhow!("Map is not supported")));
        }
        DataType::Vector(_) => {
            return Err(SinkError::BigQuery(anyhow::anyhow!("Vector is not supported")));
        }
    }
    Ok((field, None))
}

#[cfg(test)]
mod test {

    use std::assert_matches::assert_matches;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, StructType};

    use crate::sink::big_query::{
        BigQuerySink, build_protobuf_descriptor_pool, build_protobuf_schema,
    };

    #[tokio::test]
    async fn test_type_check() {
        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
        let rw_datatype = DataType::list(DataType::Struct(StructType::new(vec![
            ("v1".to_owned(), DataType::Int64.list()),
            (
                "v2".to_owned(),
                DataType::Struct(StructType::new(vec![
                    ("v1".to_owned(), DataType::Int64),
                    ("v2".to_owned(), DataType::Int64),
                ])),
            ),
        ])));
        assert_eq!(
            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
            big_query_type_string
        );
    }

    #[tokio::test]
    async fn test_schema_check() {
        let schema = Schema {
            fields: vec![
                Field::with_name(DataType::Int64, "v1"),
                Field::with_name(DataType::Float64, "v2"),
                Field::with_name(
                    DataType::list(DataType::Struct(StructType::new(vec![
                        ("v1".to_owned(), DataType::Int64.list()),
                        (
                            "v3".to_owned(),
                            DataType::Struct(StructType::new(vec![
                                ("v1".to_owned(), DataType::Int64),
                                ("v2".to_owned(), DataType::Int64),
                            ])),
                        ),
                    ]))),
                    "v3",
                ),
            ],
        };
        let fields = schema
            .fields()
            .iter()
            .map(|f| (f.name.as_str(), &f.data_type));
        let desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
        let t1_message = pool.get_message_by_name("t1").unwrap();
        assert_matches!(
            t1_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            t1_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Double
        );
        assert_matches!(
            t1_message.get_field_by_name("v3").unwrap().kind(),
            prost_reflect::Kind::Message(_)
        );

        let v3_message = pool.get_message_by_name("t1.Structv3").unwrap();
        assert_matches!(
            v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert!(v3_message.get_field_by_name("v1").unwrap().is_list());

        let v3_v3_message = pool.get_message_by_name("t1.Structv3.Structv3").unwrap();
        assert_matches!(
            v3_v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            v3_v3_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
    }
}