/// Protobuf message `TableSchema`: a list of column descriptors together with
/// a list of `u32` indices.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TableSchema {
    /// Field 1: repeated `plan_common::ColumnDesc` messages.
    #[prost(message, repeated, tag = "1")]
    pub columns: ::prost::alloc::vec::Vec<super::plan_common::ColumnDesc>,
    /// Field 2: repeated `uint32` values.
    // NOTE(review): presumably positions into `columns` that form the primary
    // key — confirm against the .proto definition.
    #[prost(uint32, repeated, tag = "2")]
    pub pk_indices: ::prost::alloc::vec::Vec<u32>,
}
/// Protobuf message `ValidationError`: carries a single error string.
///
/// Used as the optional `error` field of `ValidateSinkResponse` and
/// `ValidateSourceResponse` in this file.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ValidationError {
    /// Field 1: the error text.
    #[prost(string, tag = "1")]
    pub error_message: ::prost::alloc::string::String,
}
/// Protobuf message `SinkParam`: the parameter bundle describing one sink.
///
/// Embedded in `ValidateSinkRequest`, `StartSink`, `StartCoordinator` and
/// `StartCoordinationRequest` elsewhere in this file.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkParam {
    /// Field 1: `uint32` on the wire, surfaced as `crate::id::SinkId` through
    /// the `wrapper` attribute.
    #[prost(uint32, tag = "1", wrapper = "crate::id::SinkId")]
    pub sink_id: crate::id::SinkId,
    /// Field 2: string-to-string map, held in a `BTreeMap` (ordered by key).
    #[prost(btree_map = "string, string", tag = "2")]
    pub properties: ::prost::alloc::collections::BTreeMap<
        ::prost::alloc::string::String,
        ::prost::alloc::string::String,
    >,
    /// Field 3: optional `TableSchema`.
    #[prost(message, optional, tag = "3")]
    pub table_schema: ::core::option::Option<TableSchema>,
    /// Field 4: a `catalog::SinkType` enumeration stored as its raw `i32`
    /// value.
    #[prost(enumeration = "super::catalog::SinkType", tag = "4")]
    pub sink_type: i32,
    /// Field 5: string.
    #[prost(string, tag = "5")]
    pub db_name: ::prost::alloc::string::String,
    /// Field 6: string.
    #[prost(string, tag = "6")]
    pub sink_from_name: ::prost::alloc::string::String,
    /// Field 7: optional `catalog::SinkFormatDesc`.
    #[prost(message, optional, tag = "7")]
    pub format_desc: ::core::option::Option<super::catalog::SinkFormatDesc>,
    /// Field 8: string.
    #[prost(string, tag = "8")]
    pub sink_name: ::prost::alloc::string::String,
    /// Field 9: bool.
    // NOTE(review): the exact semantics of this flag are not visible in this
    // file — confirm against the .proto comments before relying on it.
    #[prost(bool, tag = "9")]
    pub raw_ignore_delete: bool,
}
/// Protobuf message `SinkWriterStreamRequest`: the request item type of the
/// `SinkWriterStream` RPC.
///
/// The payload is a oneof; see `sink_writer_stream_request::Request` for the
/// possible variants.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkWriterStreamRequest {
    /// Oneof over tags 1, 3 and 4; `None` when no variant is set.
    #[prost(oneof = "sink_writer_stream_request::Request", tags = "1, 3, 4")]
    pub request: ::core::option::Option<sink_writer_stream_request::Request>,
}
/// Nested message and enum types in `SinkWriterStreamRequest`.
pub mod sink_writer_stream_request {
    /// Message `StartSink`: carries the sink parameters and a payload schema.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct StartSink {
        /// Field 1: optional `SinkParam`.
        #[prost(message, optional, tag = "1")]
        pub sink_param: ::core::option::Option<super::SinkParam>,
        /// Field 3: optional `TableSchema` (tag 2 is not used by this message).
        #[prost(message, optional, tag = "3")]
        pub payload_schema: ::core::option::Option<super::TableSchema>,
    }
    /// Message `WriteBatch`: a batch id, an epoch and a oneof payload.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct WriteBatch {
        /// Field 3: `uint64`.
        #[prost(uint64, tag = "3")]
        pub batch_id: u64,
        /// Field 4: `uint64`.
        #[prost(uint64, tag = "4")]
        pub epoch: u64,
        /// Oneof over tags 2 and 5; `None` when no variant is set.
        #[prost(oneof = "write_batch::Payload", tags = "2, 5")]
        pub payload: ::core::option::Option<write_batch::Payload>,
    }
    /// Nested message and enum types in `WriteBatch`.
    pub mod write_batch {
        /// Message `StreamChunkPayload`: an opaque byte buffer.
        #[derive(prost_helpers::AnyPB)]
        #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
        pub struct StreamChunkPayload {
            /// Field 1: raw bytes.
            // NOTE(review): presumably a serialized stream chunk — the
            // encoding is not visible here; confirm with the producer.
            #[prost(bytes = "vec", tag = "1")]
            pub binary_data: ::prost::alloc::vec::Vec<u8>,
        }
        /// Oneof `payload` of `WriteBatch`.
        #[derive(prost_helpers::AnyPB)]
        #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
        pub enum Payload {
            /// Tag 2: an inline `StreamChunkPayload` message.
            #[prost(message, tag = "2")]
            StreamChunkPayload(StreamChunkPayload),
            /// Tag 5: an `int64` value.
            // NOTE(review): name suggests a pointer/handle to a chunk held
            // outside the message — confirm ownership rules with the caller.
            #[prost(int64, tag = "5")]
            StreamChunkRefPointer(i64),
        }
    }
    /// Message `Barrier`: an epoch plus a checkpoint flag.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct Barrier {
        /// Field 1: `uint64`.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Field 2: bool.
        #[prost(bool, tag = "2")]
        pub is_checkpoint: bool,
    }
    /// Oneof `request` of `SinkWriterStreamRequest`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Request {
        /// Tag 1: a `StartSink` message.
        #[prost(message, tag = "1")]
        Start(StartSink),
        /// Tag 3: a `WriteBatch` message.
        #[prost(message, tag = "3")]
        WriteBatch(WriteBatch),
        /// Tag 4: a `Barrier` message.
        #[prost(message, tag = "4")]
        Barrier(Barrier),
    }
}
/// Protobuf message `SinkWriterStreamResponse`: the response item type of the
/// `SinkWriterStream` RPC.
///
/// The payload is a oneof; see `sink_writer_stream_response::Response`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct SinkWriterStreamResponse {
    /// Oneof over tags 1, 2 and 3; `None` when no variant is set.
    #[prost(oneof = "sink_writer_stream_response::Response", tags = "1, 2, 3")]
    pub response: ::core::option::Option<sink_writer_stream_response::Response>,
}
/// Nested message and enum types in `SinkWriterStreamResponse`.
pub mod sink_writer_stream_response {
    /// Message `StartResponse`: has no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct StartResponse {}
    /// Message `CommitResponse`: an epoch plus optional sink metadata.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct CommitResponse {
        /// Field 1: `uint64`.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Field 2: optional `SinkMetadata`.
        #[prost(message, optional, tag = "2")]
        pub metadata: ::core::option::Option<super::SinkMetadata>,
    }
    /// Message `BatchWrittenResponse`: identifies a batch by epoch and id.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct BatchWrittenResponse {
        /// Field 1: `uint64`.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Field 2: `uint64`.
        #[prost(uint64, tag = "2")]
        pub batch_id: u64,
    }
    /// Oneof `response` of `SinkWriterStreamResponse`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
    pub enum Response {
        /// Tag 1: a `StartResponse` message.
        #[prost(message, tag = "1")]
        Start(StartResponse),
        /// Tag 2: a `CommitResponse` message.
        #[prost(message, tag = "2")]
        Commit(CommitResponse),
        /// Tag 3: a `BatchWrittenResponse` message.
        #[prost(message, tag = "3")]
        Batch(BatchWrittenResponse),
    }
}
/// Protobuf message `ValidateSinkRequest`: request type of the `ValidateSink`
/// RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidateSinkRequest {
    /// Field 1: optional `SinkParam` describing the sink to validate.
    #[prost(message, optional, tag = "1")]
    pub sink_param: ::core::option::Option<SinkParam>,
}
/// Protobuf message `ValidateSinkResponse`: response type of the
/// `ValidateSink` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ValidateSinkResponse {
    /// Field 1: optional `ValidationError`.
    // NOTE(review): presumably left unset when validation succeeds — confirm
    // against the server implementation.
    #[prost(message, optional, tag = "1")]
    pub error: ::core::option::Option<ValidationError>,
}
/// Protobuf message `SinkMetadata`: a oneof wrapper with a single variant
/// (see `sink_metadata::Metadata`).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct SinkMetadata {
    /// Oneof over tag 1; `None` when no variant is set.
    #[prost(oneof = "sink_metadata::Metadata", tags = "1")]
    pub metadata: ::core::option::Option<sink_metadata::Metadata>,
}
/// Nested message and enum types in `SinkMetadata`.
pub mod sink_metadata {
    /// Message `SerializedMetadata`: an opaque byte buffer.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct SerializedMetadata {
        /// Field 1: raw bytes; the encoding is not defined in this file.
        #[prost(bytes = "vec", tag = "1")]
        pub metadata: ::prost::alloc::vec::Vec<u8>,
    }
    /// Oneof `metadata` of `SinkMetadata`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
    pub enum Metadata {
        /// Tag 1: a `SerializedMetadata` message.
        #[prost(message, tag = "1")]
        Serialized(SerializedMetadata),
    }
}
/// Protobuf message `SinkCoordinatorStreamRequest`: the request item type of
/// the `SinkCoordinatorStream` RPC.
///
/// The payload is a oneof; see `sink_coordinator_stream_request::Request`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkCoordinatorStreamRequest {
    /// Oneof over tags 1 and 2; `None` when no variant is set.
    #[prost(oneof = "sink_coordinator_stream_request::Request", tags = "1, 2")]
    pub request: ::core::option::Option<sink_coordinator_stream_request::Request>,
}
/// Nested message and enum types in `SinkCoordinatorStreamRequest`.
pub mod sink_coordinator_stream_request {
    /// Message `StartCoordinator`: carries the sink parameters.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct StartCoordinator {
        /// Field 1: optional `SinkParam`.
        #[prost(message, optional, tag = "1")]
        pub param: ::core::option::Option<super::SinkParam>,
    }
    /// Message `CommitMetadata`: an epoch plus a list of sink metadata.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct CommitMetadata {
        /// Field 1: `uint64`.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Field 2: repeated `SinkMetadata` messages.
        #[prost(message, repeated, tag = "2")]
        pub metadata: ::prost::alloc::vec::Vec<super::SinkMetadata>,
    }
    /// Oneof `request` of `SinkCoordinatorStreamRequest`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Request {
        /// Tag 1: a `StartCoordinator` message.
        #[prost(message, tag = "1")]
        Start(StartCoordinator),
        /// Tag 2: a `CommitMetadata` message.
        #[prost(message, tag = "2")]
        Commit(CommitMetadata),
    }
}
/// Protobuf message `SinkCoordinatorStreamResponse`: the response item type
/// of the `SinkCoordinatorStream` RPC.
///
/// The payload is a oneof; see `sink_coordinator_stream_response::Response`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct SinkCoordinatorStreamResponse {
    /// Oneof over tags 1 and 2; `None` when no variant is set.
    #[prost(oneof = "sink_coordinator_stream_response::Response", tags = "1, 2")]
    pub response: ::core::option::Option<sink_coordinator_stream_response::Response>,
}
/// Nested message and enum types in `SinkCoordinatorStreamResponse`.
pub mod sink_coordinator_stream_response {
    /// Message `StartResponse`: has no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct StartResponse {}
    /// Message `CommitResponse`: carries an epoch.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct CommitResponse {
        /// Field 1: `uint64`.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
    }
    /// Oneof `response` of `SinkCoordinatorStreamResponse`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)]
    pub enum Response {
        /// Tag 1: a `StartResponse` message.
        #[prost(message, tag = "1")]
        Start(StartResponse),
        /// Tag 2: a `CommitResponse` message.
        #[prost(message, tag = "2")]
        Commit(CommitResponse),
    }
}
/// Protobuf message `CdcMessage`: one event delivered in
/// `GetEventStreamResponse::events`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct CdcMessage {
    /// Field 1: string.
    // NOTE(review): the payload format (e.g. JSON) is not visible in this
    // file — confirm with the producer.
    #[prost(string, tag = "1")]
    pub payload: ::prost::alloc::string::String,
    /// Field 2: string.
    #[prost(string, tag = "2")]
    pub partition: ::prost::alloc::string::String,
    /// Field 3: string.
    #[prost(string, tag = "3")]
    pub offset: ::prost::alloc::string::String,
    /// Field 4: string.
    #[prost(string, tag = "4")]
    pub full_table_name: ::prost::alloc::string::String,
    /// Field 5: `int64`.
    // NOTE(review): the `_ts_ms` suffix suggests a millisecond timestamp —
    // confirm epoch and timezone with the producer.
    #[prost(int64, tag = "5")]
    pub source_ts_ms: i64,
    /// Field 6: a `cdc_message::CdcMessageType` enumeration stored as its raw
    /// `i32` value.
    #[prost(enumeration = "cdc_message::CdcMessageType", tag = "6")]
    pub msg_type: i32,
    /// Field 7: string.
    #[prost(string, tag = "7")]
    pub key: ::prost::alloc::string::String,
    /// Field 8: a `SourceType` enumeration stored as its raw `i32` value.
    #[prost(enumeration = "SourceType", tag = "8")]
    pub source_type: i32,
}
263pub mod cdc_message {
265 #[derive(prost_helpers::AnyPB)]
266 #[derive(
267 Clone,
268 Copy,
269 Debug,
270 PartialEq,
271 Eq,
272 Hash,
273 PartialOrd,
274 Ord,
275 ::prost::Enumeration
276 )]
277 #[repr(i32)]
278 pub enum CdcMessageType {
279 Unspecified = 0,
280 Heartbeat = 1,
281 Data = 2,
282 TransactionMeta = 3,
283 SchemaChange = 4,
284 }
285 impl CdcMessageType {
286 pub fn as_str_name(&self) -> &'static str {
291 match self {
292 Self::Unspecified => "UNSPECIFIED",
293 Self::Heartbeat => "HEARTBEAT",
294 Self::Data => "DATA",
295 Self::TransactionMeta => "TRANSACTION_META",
296 Self::SchemaChange => "SCHEMA_CHANGE",
297 }
298 }
299 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
301 match value {
302 "UNSPECIFIED" => Some(Self::Unspecified),
303 "HEARTBEAT" => Some(Self::Heartbeat),
304 "DATA" => Some(Self::Data),
305 "TRANSACTION_META" => Some(Self::TransactionMeta),
306 "SCHEMA_CHANGE" => Some(Self::SchemaChange),
307 _ => None,
308 }
309 }
310 }
311}
/// Protobuf message `GetEventStreamRequest`: request type of the
/// `GetEventStream` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetEventStreamRequest {
    /// Field 1: `uint64`.
    #[prost(uint64, tag = "1")]
    pub source_id: u64,
    /// Field 2: a `SourceType` enumeration stored as its raw `i32` value.
    #[prost(enumeration = "SourceType", tag = "2")]
    pub source_type: i32,
    /// Field 3: string.
    // NOTE(review): the offset format is not visible here and is presumably
    // connector-specific — confirm with the server implementation.
    #[prost(string, tag = "3")]
    pub start_offset: ::prost::alloc::string::String,
    /// Field 4: string-to-string map, held in a `BTreeMap` (ordered by key).
    #[prost(btree_map = "string, string", tag = "4")]
    pub properties: ::prost::alloc::collections::BTreeMap<
        ::prost::alloc::string::String,
        ::prost::alloc::string::String,
    >,
    /// Field 5: bool.
    #[prost(bool, tag = "5")]
    pub snapshot_done: bool,
    /// Field 6: bool.
    #[prost(bool, tag = "6")]
    pub is_source_job: bool,
}
/// Protobuf message `GetEventStreamResponse`: the response item type of the
/// `GetEventStream` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetEventStreamResponse {
    /// Field 1: `uint64`.
    #[prost(uint64, tag = "1")]
    pub source_id: u64,
    /// Field 2: repeated `CdcMessage` messages.
    #[prost(message, repeated, tag = "2")]
    pub events: ::prost::alloc::vec::Vec<CdcMessage>,
    /// Field 3: optional `get_event_stream_response::ControlInfo`.
    #[prost(message, optional, tag = "3")]
    pub control: ::core::option::Option<get_event_stream_response::ControlInfo>,
}
/// Nested message and enum types in `GetEventStreamResponse`.
pub mod get_event_stream_response {
    /// Message `ControlInfo`: carries a single handshake flag.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct ControlInfo {
        /// Field 1: bool.
        #[prost(bool, tag = "1")]
        pub handshake_ok: bool,
    }
}
/// Protobuf message `ValidateSourceRequest`: request type of the
/// `ValidateSource` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidateSourceRequest {
    /// Field 1: `uint64`.
    #[prost(uint64, tag = "1")]
    pub source_id: u64,
    /// Field 2: a `SourceType` enumeration stored as its raw `i32` value.
    #[prost(enumeration = "SourceType", tag = "2")]
    pub source_type: i32,
    /// Field 3: string-to-string map, held in a `BTreeMap` (ordered by key).
    #[prost(btree_map = "string, string", tag = "3")]
    pub properties: ::prost::alloc::collections::BTreeMap<
        ::prost::alloc::string::String,
        ::prost::alloc::string::String,
    >,
    /// Field 4: optional `TableSchema`.
    #[prost(message, optional, tag = "4")]
    pub table_schema: ::core::option::Option<TableSchema>,
    /// Field 5: bool.
    #[prost(bool, tag = "5")]
    pub is_source_job: bool,
    /// Field 6: bool.
    #[prost(bool, tag = "6")]
    pub is_backfill_table: bool,
}
/// Protobuf message `ValidateSourceResponse`: response type of the
/// `ValidateSource` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ValidateSourceResponse {
    /// Field 1: optional `ValidationError`.
    // NOTE(review): presumably left unset when validation succeeds — confirm
    // against the server implementation.
    #[prost(message, optional, tag = "1")]
    pub error: ::core::option::Option<ValidationError>,
}
/// Protobuf message `CoordinateRequest`: a oneof wrapper whose variants are
/// listed in `coordinate_request::Msg`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CoordinateRequest {
    /// Oneof over tags 1 through 5; `None` when no variant is set.
    #[prost(oneof = "coordinate_request::Msg", tags = "1, 2, 3, 4, 5")]
    pub msg: ::core::option::Option<coordinate_request::Msg>,
}
/// Nested message and enum types in `CoordinateRequest`.
pub mod coordinate_request {
    /// Message `StartCoordinationRequest`: a vnode bitmap buffer plus the
    /// sink parameters.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct StartCoordinationRequest {
        /// Field 1: optional `common::Buffer`.
        #[prost(message, optional, tag = "1")]
        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
        /// Field 2: optional `SinkParam`.
        #[prost(message, optional, tag = "2")]
        pub param: ::core::option::Option<super::SinkParam>,
    }
    /// Message `CommitRequest`: an epoch, optional sink metadata and an
    /// optional schema change.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct CommitRequest {
        /// Field 1: `uint64`.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Field 2: optional `SinkMetadata`.
        #[prost(message, optional, tag = "2")]
        pub metadata: ::core::option::Option<super::SinkMetadata>,
        /// Field 3: optional `stream_plan::SinkSchemaChange`.
        #[prost(message, optional, tag = "3")]
        pub schema_change: ::core::option::Option<
            super::super::stream_plan::SinkSchemaChange,
        >,
    }
    /// Message `UpdateVnodeBitmapRequest`: carries a vnode bitmap buffer.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct UpdateVnodeBitmapRequest {
        /// Field 1: optional `common::Buffer`.
        #[prost(message, optional, tag = "1")]
        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
    }
    /// Oneof `msg` of `CoordinateRequest`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Msg {
        /// Tag 1: a `StartCoordinationRequest` message.
        #[prost(message, tag = "1")]
        StartRequest(StartCoordinationRequest),
        /// Tag 2: a `CommitRequest` message.
        #[prost(message, tag = "2")]
        CommitRequest(CommitRequest),
        /// Tag 3: an `UpdateVnodeBitmapRequest` message.
        #[prost(message, tag = "3")]
        UpdateVnodeRequest(UpdateVnodeBitmapRequest),
        /// Tag 4: a bool.
        // NOTE(review): the meaning of the bool value is not visible in this
        // file — confirm whether `false` is ever sent.
        #[prost(bool, tag = "4")]
        Stop(bool),
        /// Tag 5: a `uint64`.
        // NOTE(review): presumably an epoch value — confirm against the
        // .proto definition.
        #[prost(uint64, tag = "5")]
        AlignInitialEpochRequest(u64),
    }
}
/// Protobuf message `CoordinateResponse`: a oneof wrapper whose variants are
/// listed in `coordinate_response::Msg`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct CoordinateResponse {
    /// Oneof over tags 1 through 4; `None` when no variant is set.
    #[prost(oneof = "coordinate_response::Msg", tags = "1, 2, 3, 4")]
    pub msg: ::core::option::Option<coordinate_response::Msg>,
}
/// Nested message and enum types in `CoordinateResponse`.
pub mod coordinate_response {
    /// Message `StartCoordinationResponse`: carries one optional `uint64`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct StartCoordinationResponse {
        /// Field 1: optional `uint64`; `None` when the field is absent on
        /// the wire.
        #[prost(uint64, optional, tag = "1")]
        pub log_store_rewind_start_epoch: ::core::option::Option<u64>,
    }
    /// Message `CommitResponse`: carries an epoch.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct CommitResponse {
        /// Field 1: `uint64`.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
    }
    /// Oneof `msg` of `CoordinateResponse`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)]
    pub enum Msg {
        /// Tag 1: a `StartCoordinationResponse` message.
        #[prost(message, tag = "1")]
        StartResponse(StartCoordinationResponse),
        /// Tag 2: a `CommitResponse` message.
        #[prost(message, tag = "2")]
        CommitResponse(CommitResponse),
        /// Tag 3: a bool.
        #[prost(bool, tag = "3")]
        Stopped(bool),
        /// Tag 4: a `uint64`.
        // NOTE(review): presumably an epoch value — confirm against the
        // .proto definition.
        #[prost(uint64, tag = "4")]
        AlignInitialEpochResponse(u64),
    }
}
/// Protobuf message `IcebergV3PreCommitState`: a byte buffer plus a snapshot
/// id. Nothing else in this part of the file references it.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct IcebergV3PreCommitState {
    /// Field 1: raw bytes; the encoding is not defined in this file.
    #[prost(bytes = "vec", tag = "1")]
    pub agg_result: ::prost::alloc::vec::Vec<u8>,
    /// Field 2: `int64`.
    #[prost(int64, tag = "2")]
    pub snapshot_id: i64,
}
/// Protobuf enumeration `SourceType`; discriminants are the protobuf numeric
/// values.
///
/// Referenced (as a raw `i32`) by the `source_type` fields of `CdcMessage`,
/// `GetEventStreamRequest` and `ValidateSourceRequest`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum SourceType {
    Unspecified = 0,
    Mysql = 1,
    Postgres = 2,
    Citus = 3,
    Mongodb = 4,
    SqlServer = 5,
}
479impl SourceType {
480 pub fn as_str_name(&self) -> &'static str {
485 match self {
486 Self::Unspecified => "UNSPECIFIED",
487 Self::Mysql => "MYSQL",
488 Self::Postgres => "POSTGRES",
489 Self::Citus => "CITUS",
490 Self::Mongodb => "MONGODB",
491 Self::SqlServer => "SQL_SERVER",
492 }
493 }
494 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
496 match value {
497 "UNSPECIFIED" => Some(Self::Unspecified),
498 "MYSQL" => Some(Self::Mysql),
499 "POSTGRES" => Some(Self::Postgres),
500 "CITUS" => Some(Self::Citus),
501 "MONGODB" => Some(Self::Mongodb),
502 "SQL_SERVER" => Some(Self::SqlServer),
503 _ => None,
504 }
505 }
506}
/// gRPC client for the `connector_service.ConnectorService` service, built on
/// `tonic::client::Grpc`.
pub mod connector_service_client {
    #![allow(
        unused_variables,
        dead_code,
        missing_docs,
        clippy::wildcard_imports,
        clippy::let_unit_value,
    )]
    use tonic::codegen::*;
    use tonic::codegen::http::Uri;
    /// Typed wrapper around `tonic::client::Grpc<T>` exposing one async
    /// method per `ConnectorService` RPC.
    #[derive(Debug, Clone)]
    pub struct ConnectorServiceClient<T> {
        inner: tonic::client::Grpc<T>,
    }
    impl ConnectorServiceClient<tonic::transport::Channel> {
        /// Builds an `Endpoint` from `dst`, connects it, and wraps the
        /// resulting channel in a client.
        ///
        /// # Errors
        /// Returns `tonic::transport::Error` if `dst` cannot be turned into
        /// an endpoint or the connection attempt fails.
        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
        where
            D: TryInto<tonic::transport::Endpoint>,
            D::Error: Into<StdError>,
        {
            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
            Ok(Self::new(conn))
        }
    }
    impl<T> ConnectorServiceClient<T>
    where
        T: tonic::client::GrpcService<tonic::body::Body>,
        T::Error: Into<StdError>,
        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
    {
        /// Wraps an existing transport/service `inner` in a client.
        pub fn new(inner: T) -> Self {
            let inner = tonic::client::Grpc::new(inner);
            Self { inner }
        }
        /// Like [`Self::new`], but constructs the underlying `Grpc` with
        /// `Grpc::with_origin(inner, origin)`.
        pub fn with_origin(inner: T, origin: Uri) -> Self {
            let inner = tonic::client::Grpc::with_origin(inner, origin);
            Self { inner }
        }
        /// Wraps `inner` in an `InterceptedService` using `interceptor` and
        /// returns a client over the wrapped service.
        pub fn with_interceptor<F>(
            inner: T,
            interceptor: F,
        ) -> ConnectorServiceClient<InterceptedService<T, F>>
        where
            F: tonic::service::Interceptor,
            T::ResponseBody: Default,
            T: tonic::codegen::Service<
                http::Request<tonic::body::Body>,
                Response = http::Response<
                    <T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
                >,
            >,
            <T as tonic::codegen::Service<
                http::Request<tonic::body::Body>,
            >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
        {
            ConnectorServiceClient::new(InterceptedService::new(inner, interceptor))
        }
        /// Builder-style setter forwarding `encoding` to
        /// `Grpc::send_compressed` on the inner client.
        #[must_use]
        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
            self.inner = self.inner.send_compressed(encoding);
            self
        }
        /// Builder-style setter forwarding `encoding` to
        /// `Grpc::accept_compressed` on the inner client.
        #[must_use]
        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
            self.inner = self.inner.accept_compressed(encoding);
            self
        }
        /// Builder-style setter forwarding `limit` to
        /// `Grpc::max_decoding_message_size` on the inner client.
        #[must_use]
        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
            self.inner = self.inner.max_decoding_message_size(limit);
            self
        }
        /// Builder-style setter forwarding `limit` to
        /// `Grpc::max_encoding_message_size` on the inner client.
        #[must_use]
        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
            self.inner = self.inner.max_encoding_message_size(limit);
            self
        }
        /// Invokes the `SinkWriterStream` RPC: takes a stream of
        /// `SinkWriterStreamRequest` and returns a stream of
        /// `SinkWriterStreamResponse`.
        ///
        /// # Errors
        /// If the inner service is not ready, returns
        /// `tonic::Status::unknown` with a "Service was not ready" message;
        /// otherwise returns whatever status the call itself produces.
        pub async fn sink_writer_stream(
            &mut self,
            request: impl tonic::IntoStreamingRequest<
                Message = super::SinkWriterStreamRequest,
            >,
        ) -> std::result::Result<
            tonic::Response<tonic::codec::Streaming<super::SinkWriterStreamResponse>>,
            tonic::Status,
        > {
            // Wait for readiness before building the request.
            self.inner
                .ready()
                .await
                .map_err(|e| {
                    tonic::Status::unknown(
                        format!("Service was not ready: {}", e.into()),
                    )
                })?;
            let codec = tonic_prost::ProstCodec::default();
            let path = http::uri::PathAndQuery::from_static(
                "/connector_service.ConnectorService/SinkWriterStream",
            );
            let mut req = request.into_streaming_request();
            // Record the service and method names in the request extensions.
            req.extensions_mut()
                .insert(
                    GrpcMethod::new(
                        "connector_service.ConnectorService",
                        "SinkWriterStream",
                    ),
                );
            self.inner.streaming(req, path, codec).await
        }
        /// Invokes the `SinkCoordinatorStream` RPC: takes a stream of
        /// `SinkCoordinatorStreamRequest` and returns a stream of
        /// `SinkCoordinatorStreamResponse`.
        ///
        /// # Errors
        /// If the inner service is not ready, returns
        /// `tonic::Status::unknown` with a "Service was not ready" message;
        /// otherwise returns whatever status the call itself produces.
        pub async fn sink_coordinator_stream(
            &mut self,
            request: impl tonic::IntoStreamingRequest<
                Message = super::SinkCoordinatorStreamRequest,
            >,
        ) -> std::result::Result<
            tonic::Response<
                tonic::codec::Streaming<super::SinkCoordinatorStreamResponse>,
            >,
            tonic::Status,
        > {
            // Wait for readiness before building the request.
            self.inner
                .ready()
                .await
                .map_err(|e| {
                    tonic::Status::unknown(
                        format!("Service was not ready: {}", e.into()),
                    )
                })?;
            let codec = tonic_prost::ProstCodec::default();
            let path = http::uri::PathAndQuery::from_static(
                "/connector_service.ConnectorService/SinkCoordinatorStream",
            );
            let mut req = request.into_streaming_request();
            // Record the service and method names in the request extensions.
            req.extensions_mut()
                .insert(
                    GrpcMethod::new(
                        "connector_service.ConnectorService",
                        "SinkCoordinatorStream",
                    ),
                );
            self.inner.streaming(req, path, codec).await
        }
        /// Invokes the unary `ValidateSink` RPC with a single
        /// `ValidateSinkRequest` and returns a single `ValidateSinkResponse`.
        ///
        /// # Errors
        /// If the inner service is not ready, returns
        /// `tonic::Status::unknown` with a "Service was not ready" message;
        /// otherwise returns whatever status the call itself produces.
        pub async fn validate_sink(
            &mut self,
            request: impl tonic::IntoRequest<super::ValidateSinkRequest>,
        ) -> std::result::Result<
            tonic::Response<super::ValidateSinkResponse>,
            tonic::Status,
        > {
            // Wait for readiness before building the request.
            self.inner
                .ready()
                .await
                .map_err(|e| {
                    tonic::Status::unknown(
                        format!("Service was not ready: {}", e.into()),
                    )
                })?;
            let codec = tonic_prost::ProstCodec::default();
            let path = http::uri::PathAndQuery::from_static(
                "/connector_service.ConnectorService/ValidateSink",
            );
            let mut req = request.into_request();
            // Record the service and method names in the request extensions.
            req.extensions_mut()
                .insert(
                    GrpcMethod::new("connector_service.ConnectorService", "ValidateSink"),
                );
            self.inner.unary(req, path, codec).await
        }
        /// Invokes the `GetEventStream` RPC: sends one
        /// `GetEventStreamRequest` and returns a stream of
        /// `GetEventStreamResponse`.
        ///
        /// # Errors
        /// If the inner service is not ready, returns
        /// `tonic::Status::unknown` with a "Service was not ready" message;
        /// otherwise returns whatever status the call itself produces.
        pub async fn get_event_stream(
            &mut self,
            request: impl tonic::IntoRequest<super::GetEventStreamRequest>,
        ) -> std::result::Result<
            tonic::Response<tonic::codec::Streaming<super::GetEventStreamResponse>>,
            tonic::Status,
        > {
            // Wait for readiness before building the request.
            self.inner
                .ready()
                .await
                .map_err(|e| {
                    tonic::Status::unknown(
                        format!("Service was not ready: {}", e.into()),
                    )
                })?;
            let codec = tonic_prost::ProstCodec::default();
            let path = http::uri::PathAndQuery::from_static(
                "/connector_service.ConnectorService/GetEventStream",
            );
            let mut req = request.into_request();
            // Record the service and method names in the request extensions.
            req.extensions_mut()
                .insert(
                    GrpcMethod::new(
                        "connector_service.ConnectorService",
                        "GetEventStream",
                    ),
                );
            self.inner.server_streaming(req, path, codec).await
        }
        /// Invokes the unary `ValidateSource` RPC with a single
        /// `ValidateSourceRequest` and returns a single
        /// `ValidateSourceResponse`.
        ///
        /// # Errors
        /// If the inner service is not ready, returns
        /// `tonic::Status::unknown` with a "Service was not ready" message;
        /// otherwise returns whatever status the call itself produces.
        pub async fn validate_source(
            &mut self,
            request: impl tonic::IntoRequest<super::ValidateSourceRequest>,
        ) -> std::result::Result<
            tonic::Response<super::ValidateSourceResponse>,
            tonic::Status,
        > {
            // Wait for readiness before building the request.
            self.inner
                .ready()
                .await
                .map_err(|e| {
                    tonic::Status::unknown(
                        format!("Service was not ready: {}", e.into()),
                    )
                })?;
            let codec = tonic_prost::ProstCodec::default();
            let path = http::uri::PathAndQuery::from_static(
                "/connector_service.ConnectorService/ValidateSource",
            );
            let mut req = request.into_request();
            // Record the service and method names in the request extensions.
            req.extensions_mut()
                .insert(
                    GrpcMethod::new(
                        "connector_service.ConnectorService",
                        "ValidateSource",
                    ),
                );
            self.inner.unary(req, path, codec).await
        }
    }
}
748pub mod connector_service_server {
750 #![allow(
751 unused_variables,
752 dead_code,
753 missing_docs,
754 clippy::wildcard_imports,
755 clippy::let_unit_value,
756 )]
757 use tonic::codegen::*;
    /// Server-side interface for `connector_service.ConnectorService`: one
    /// async method per RPC, plus an associated stream type for every RPC
    /// that returns a stream. Implementors are wrapped by
    /// `ConnectorServiceServer`.
    #[async_trait]
    pub trait ConnectorService: std::marker::Send + std::marker::Sync + 'static {
        /// Response stream type returned by [`Self::sink_writer_stream`].
        type SinkWriterStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<
                    super::SinkWriterStreamResponse,
                    tonic::Status,
                >,
            >
            + std::marker::Send
            + 'static;
        /// Handles the `SinkWriterStream` RPC: receives a stream of
        /// `SinkWriterStreamRequest` and returns a response stream.
        async fn sink_writer_stream(
            &self,
            request: tonic::Request<tonic::Streaming<super::SinkWriterStreamRequest>>,
        ) -> std::result::Result<
            tonic::Response<Self::SinkWriterStreamStream>,
            tonic::Status,
        >;
        /// Response stream type returned by
        /// [`Self::sink_coordinator_stream`].
        type SinkCoordinatorStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<
                    super::SinkCoordinatorStreamResponse,
                    tonic::Status,
                >,
            >
            + std::marker::Send
            + 'static;
        /// Handles the `SinkCoordinatorStream` RPC: receives a stream of
        /// `SinkCoordinatorStreamRequest` and returns a response stream.
        async fn sink_coordinator_stream(
            &self,
            request: tonic::Request<
                tonic::Streaming<super::SinkCoordinatorStreamRequest>,
            >,
        ) -> std::result::Result<
            tonic::Response<Self::SinkCoordinatorStreamStream>,
            tonic::Status,
        >;
        /// Handles the unary `ValidateSink` RPC.
        async fn validate_sink(
            &self,
            request: tonic::Request<super::ValidateSinkRequest>,
        ) -> std::result::Result<
            tonic::Response<super::ValidateSinkResponse>,
            tonic::Status,
        >;
        /// Response stream type returned by [`Self::get_event_stream`].
        type GetEventStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<super::GetEventStreamResponse, tonic::Status>,
            >
            + std::marker::Send
            + 'static;
        /// Handles the `GetEventStream` RPC: receives one
        /// `GetEventStreamRequest` and returns a response stream.
        async fn get_event_stream(
            &self,
            request: tonic::Request<super::GetEventStreamRequest>,
        ) -> std::result::Result<
            tonic::Response<Self::GetEventStreamStream>,
            tonic::Status,
        >;
        /// Handles the unary `ValidateSource` RPC.
        async fn validate_source(
            &self,
            request: tonic::Request<super::ValidateSourceRequest>,
        ) -> std::result::Result<
            tonic::Response<super::ValidateSourceResponse>,
            tonic::Status,
        >;
    }
    /// Server wrapper holding a shared `ConnectorService` implementation
    /// together with its compression and message-size settings.
    #[derive(Debug)]
    pub struct ConnectorServiceServer<T> {
        /// The service implementation, shared via `Arc`.
        inner: Arc<T>,
        /// Encodings enabled through `accept_compressed`.
        accept_compression_encodings: EnabledCompressionEncodings,
        /// Encodings enabled through `send_compressed`.
        send_compression_encodings: EnabledCompressionEncodings,
        /// Limit set through `max_decoding_message_size`; `None` until set.
        max_decoding_message_size: Option<usize>,
        /// Limit set through `max_encoding_message_size`; `None` until set.
        max_encoding_message_size: Option<usize>,
    }
831 impl<T> ConnectorServiceServer<T> {
832 pub fn new(inner: T) -> Self {
833 Self::from_arc(Arc::new(inner))
834 }
835 pub fn from_arc(inner: Arc<T>) -> Self {
836 Self {
837 inner,
838 accept_compression_encodings: Default::default(),
839 send_compression_encodings: Default::default(),
840 max_decoding_message_size: None,
841 max_encoding_message_size: None,
842 }
843 }
844 pub fn with_interceptor<F>(
845 inner: T,
846 interceptor: F,
847 ) -> InterceptedService<Self, F>
848 where
849 F: tonic::service::Interceptor,
850 {
851 InterceptedService::new(Self::new(inner), interceptor)
852 }
853 #[must_use]
855 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
856 self.accept_compression_encodings.enable(encoding);
857 self
858 }
859 #[must_use]
861 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
862 self.send_compression_encodings.enable(encoding);
863 self
864 }
865 #[must_use]
869 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
870 self.max_decoding_message_size = Some(limit);
871 self
872 }
873 #[must_use]
877 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
878 self.max_encoding_message_size = Some(limit);
879 self
880 }
881 }
882 impl<T, B> tonic::codegen::Service<http::Request<B>> for ConnectorServiceServer<T>
883 where
884 T: ConnectorService,
885 B: Body + std::marker::Send + 'static,
886 B::Error: Into<StdError> + std::marker::Send + 'static,
887 {
888 type Response = http::Response<tonic::body::Body>;
889 type Error = std::convert::Infallible;
890 type Future = BoxFuture<Self::Response, Self::Error>;
891 fn poll_ready(
892 &mut self,
893 _cx: &mut Context<'_>,
894 ) -> Poll<std::result::Result<(), Self::Error>> {
895 Poll::Ready(Ok(()))
896 }
897 fn call(&mut self, req: http::Request<B>) -> Self::Future {
898 match req.uri().path() {
899 "/connector_service.ConnectorService/SinkWriterStream" => {
900 #[allow(non_camel_case_types)]
901 struct SinkWriterStreamSvc<T: ConnectorService>(pub Arc<T>);
902 impl<
903 T: ConnectorService,
904 > tonic::server::StreamingService<super::SinkWriterStreamRequest>
905 for SinkWriterStreamSvc<T> {
906 type Response = super::SinkWriterStreamResponse;
907 type ResponseStream = T::SinkWriterStreamStream;
908 type Future = BoxFuture<
909 tonic::Response<Self::ResponseStream>,
910 tonic::Status,
911 >;
912 fn call(
913 &mut self,
914 request: tonic::Request<
915 tonic::Streaming<super::SinkWriterStreamRequest>,
916 >,
917 ) -> Self::Future {
918 let inner = Arc::clone(&self.0);
919 let fut = async move {
920 <T as ConnectorService>::sink_writer_stream(&inner, request)
921 .await
922 };
923 Box::pin(fut)
924 }
925 }
926 let accept_compression_encodings = self.accept_compression_encodings;
927 let send_compression_encodings = self.send_compression_encodings;
928 let max_decoding_message_size = self.max_decoding_message_size;
929 let max_encoding_message_size = self.max_encoding_message_size;
930 let inner = self.inner.clone();
931 let fut = async move {
932 let method = SinkWriterStreamSvc(inner);
933 let codec = tonic_prost::ProstCodec::default();
934 let mut grpc = tonic::server::Grpc::new(codec)
935 .apply_compression_config(
936 accept_compression_encodings,
937 send_compression_encodings,
938 )
939 .apply_max_message_size_config(
940 max_decoding_message_size,
941 max_encoding_message_size,
942 );
943 let res = grpc.streaming(method, req).await;
944 Ok(res)
945 };
946 Box::pin(fut)
947 }
948 "/connector_service.ConnectorService/SinkCoordinatorStream" => {
949 #[allow(non_camel_case_types)]
950 struct SinkCoordinatorStreamSvc<T: ConnectorService>(pub Arc<T>);
951 impl<
952 T: ConnectorService,
953 > tonic::server::StreamingService<
954 super::SinkCoordinatorStreamRequest,
955 > for SinkCoordinatorStreamSvc<T> {
956 type Response = super::SinkCoordinatorStreamResponse;
957 type ResponseStream = T::SinkCoordinatorStreamStream;
958 type Future = BoxFuture<
959 tonic::Response<Self::ResponseStream>,
960 tonic::Status,
961 >;
962 fn call(
963 &mut self,
964 request: tonic::Request<
965 tonic::Streaming<super::SinkCoordinatorStreamRequest>,
966 >,
967 ) -> Self::Future {
968 let inner = Arc::clone(&self.0);
969 let fut = async move {
970 <T as ConnectorService>::sink_coordinator_stream(
971 &inner,
972 request,
973 )
974 .await
975 };
976 Box::pin(fut)
977 }
978 }
979 let accept_compression_encodings = self.accept_compression_encodings;
980 let send_compression_encodings = self.send_compression_encodings;
981 let max_decoding_message_size = self.max_decoding_message_size;
982 let max_encoding_message_size = self.max_encoding_message_size;
983 let inner = self.inner.clone();
984 let fut = async move {
985 let method = SinkCoordinatorStreamSvc(inner);
986 let codec = tonic_prost::ProstCodec::default();
987 let mut grpc = tonic::server::Grpc::new(codec)
988 .apply_compression_config(
989 accept_compression_encodings,
990 send_compression_encodings,
991 )
992 .apply_max_message_size_config(
993 max_decoding_message_size,
994 max_encoding_message_size,
995 );
996 let res = grpc.streaming(method, req).await;
997 Ok(res)
998 };
999 Box::pin(fut)
1000 }
1001 "/connector_service.ConnectorService/ValidateSink" => {
1002 #[allow(non_camel_case_types)]
1003 struct ValidateSinkSvc<T: ConnectorService>(pub Arc<T>);
1004 impl<
1005 T: ConnectorService,
1006 > tonic::server::UnaryService<super::ValidateSinkRequest>
1007 for ValidateSinkSvc<T> {
1008 type Response = super::ValidateSinkResponse;
1009 type Future = BoxFuture<
1010 tonic::Response<Self::Response>,
1011 tonic::Status,
1012 >;
1013 fn call(
1014 &mut self,
1015 request: tonic::Request<super::ValidateSinkRequest>,
1016 ) -> Self::Future {
1017 let inner = Arc::clone(&self.0);
1018 let fut = async move {
1019 <T as ConnectorService>::validate_sink(&inner, request)
1020 .await
1021 };
1022 Box::pin(fut)
1023 }
1024 }
1025 let accept_compression_encodings = self.accept_compression_encodings;
1026 let send_compression_encodings = self.send_compression_encodings;
1027 let max_decoding_message_size = self.max_decoding_message_size;
1028 let max_encoding_message_size = self.max_encoding_message_size;
1029 let inner = self.inner.clone();
1030 let fut = async move {
1031 let method = ValidateSinkSvc(inner);
1032 let codec = tonic_prost::ProstCodec::default();
1033 let mut grpc = tonic::server::Grpc::new(codec)
1034 .apply_compression_config(
1035 accept_compression_encodings,
1036 send_compression_encodings,
1037 )
1038 .apply_max_message_size_config(
1039 max_decoding_message_size,
1040 max_encoding_message_size,
1041 );
1042 let res = grpc.unary(method, req).await;
1043 Ok(res)
1044 };
1045 Box::pin(fut)
1046 }
1047 "/connector_service.ConnectorService/GetEventStream" => {
1048 #[allow(non_camel_case_types)]
1049 struct GetEventStreamSvc<T: ConnectorService>(pub Arc<T>);
1050 impl<
1051 T: ConnectorService,
1052 > tonic::server::ServerStreamingService<super::GetEventStreamRequest>
1053 for GetEventStreamSvc<T> {
1054 type Response = super::GetEventStreamResponse;
1055 type ResponseStream = T::GetEventStreamStream;
1056 type Future = BoxFuture<
1057 tonic::Response<Self::ResponseStream>,
1058 tonic::Status,
1059 >;
1060 fn call(
1061 &mut self,
1062 request: tonic::Request<super::GetEventStreamRequest>,
1063 ) -> Self::Future {
1064 let inner = Arc::clone(&self.0);
1065 let fut = async move {
1066 <T as ConnectorService>::get_event_stream(&inner, request)
1067 .await
1068 };
1069 Box::pin(fut)
1070 }
1071 }
1072 let accept_compression_encodings = self.accept_compression_encodings;
1073 let send_compression_encodings = self.send_compression_encodings;
1074 let max_decoding_message_size = self.max_decoding_message_size;
1075 let max_encoding_message_size = self.max_encoding_message_size;
1076 let inner = self.inner.clone();
1077 let fut = async move {
1078 let method = GetEventStreamSvc(inner);
1079 let codec = tonic_prost::ProstCodec::default();
1080 let mut grpc = tonic::server::Grpc::new(codec)
1081 .apply_compression_config(
1082 accept_compression_encodings,
1083 send_compression_encodings,
1084 )
1085 .apply_max_message_size_config(
1086 max_decoding_message_size,
1087 max_encoding_message_size,
1088 );
1089 let res = grpc.server_streaming(method, req).await;
1090 Ok(res)
1091 };
1092 Box::pin(fut)
1093 }
1094 "/connector_service.ConnectorService/ValidateSource" => {
1095 #[allow(non_camel_case_types)]
1096 struct ValidateSourceSvc<T: ConnectorService>(pub Arc<T>);
1097 impl<
1098 T: ConnectorService,
1099 > tonic::server::UnaryService<super::ValidateSourceRequest>
1100 for ValidateSourceSvc<T> {
1101 type Response = super::ValidateSourceResponse;
1102 type Future = BoxFuture<
1103 tonic::Response<Self::Response>,
1104 tonic::Status,
1105 >;
1106 fn call(
1107 &mut self,
1108 request: tonic::Request<super::ValidateSourceRequest>,
1109 ) -> Self::Future {
1110 let inner = Arc::clone(&self.0);
1111 let fut = async move {
1112 <T as ConnectorService>::validate_source(&inner, request)
1113 .await
1114 };
1115 Box::pin(fut)
1116 }
1117 }
1118 let accept_compression_encodings = self.accept_compression_encodings;
1119 let send_compression_encodings = self.send_compression_encodings;
1120 let max_decoding_message_size = self.max_decoding_message_size;
1121 let max_encoding_message_size = self.max_encoding_message_size;
1122 let inner = self.inner.clone();
1123 let fut = async move {
1124 let method = ValidateSourceSvc(inner);
1125 let codec = tonic_prost::ProstCodec::default();
1126 let mut grpc = tonic::server::Grpc::new(codec)
1127 .apply_compression_config(
1128 accept_compression_encodings,
1129 send_compression_encodings,
1130 )
1131 .apply_max_message_size_config(
1132 max_decoding_message_size,
1133 max_encoding_message_size,
1134 );
1135 let res = grpc.unary(method, req).await;
1136 Ok(res)
1137 };
1138 Box::pin(fut)
1139 }
1140 _ => {
1141 Box::pin(async move {
1142 let mut response = http::Response::new(
1143 tonic::body::Body::default(),
1144 );
1145 let headers = response.headers_mut();
1146 headers
1147 .insert(
1148 tonic::Status::GRPC_STATUS,
1149 (tonic::Code::Unimplemented as i32).into(),
1150 );
1151 headers
1152 .insert(
1153 http::header::CONTENT_TYPE,
1154 tonic::metadata::GRPC_CONTENT_TYPE,
1155 );
1156 Ok(response)
1157 })
1158 }
1159 }
1160 }
1161 }
1162 impl<T> Clone for ConnectorServiceServer<T> {
1163 fn clone(&self) -> Self {
1164 let inner = self.inner.clone();
1165 Self {
1166 inner,
1167 accept_compression_encodings: self.accept_compression_encodings,
1168 send_compression_encodings: self.send_compression_encodings,
1169 max_decoding_message_size: self.max_decoding_message_size,
1170 max_encoding_message_size: self.max_encoding_message_size,
1171 }
1172 }
1173 }
1174 pub const SERVICE_NAME: &str = "connector_service.ConnectorService";
1176 impl<T> tonic::server::NamedService for ConnectorServiceServer<T> {
1177 const NAME: &'static str = SERVICE_NAME;
1178 }
1179}
1180pub mod sink_coordination_service_client {
1182 #![allow(
1183 unused_variables,
1184 dead_code,
1185 missing_docs,
1186 clippy::wildcard_imports,
1187 clippy::let_unit_value,
1188 )]
1189 use tonic::codegen::*;
1190 use tonic::codegen::http::Uri;
1191 #[derive(Debug, Clone)]
1192 pub struct SinkCoordinationServiceClient<T> {
1193 inner: tonic::client::Grpc<T>,
1194 }
1195 impl SinkCoordinationServiceClient<tonic::transport::Channel> {
1196 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
1198 where
1199 D: TryInto<tonic::transport::Endpoint>,
1200 D::Error: Into<StdError>,
1201 {
1202 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
1203 Ok(Self::new(conn))
1204 }
1205 }
1206 impl<T> SinkCoordinationServiceClient<T>
1207 where
1208 T: tonic::client::GrpcService<tonic::body::Body>,
1209 T::Error: Into<StdError>,
1210 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
1211 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
1212 {
1213 pub fn new(inner: T) -> Self {
1214 let inner = tonic::client::Grpc::new(inner);
1215 Self { inner }
1216 }
1217 pub fn with_origin(inner: T, origin: Uri) -> Self {
1218 let inner = tonic::client::Grpc::with_origin(inner, origin);
1219 Self { inner }
1220 }
1221 pub fn with_interceptor<F>(
1222 inner: T,
1223 interceptor: F,
1224 ) -> SinkCoordinationServiceClient<InterceptedService<T, F>>
1225 where
1226 F: tonic::service::Interceptor,
1227 T::ResponseBody: Default,
1228 T: tonic::codegen::Service<
1229 http::Request<tonic::body::Body>,
1230 Response = http::Response<
1231 <T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
1232 >,
1233 >,
1234 <T as tonic::codegen::Service<
1235 http::Request<tonic::body::Body>,
1236 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
1237 {
1238 SinkCoordinationServiceClient::new(
1239 InterceptedService::new(inner, interceptor),
1240 )
1241 }
1242 #[must_use]
1247 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1248 self.inner = self.inner.send_compressed(encoding);
1249 self
1250 }
1251 #[must_use]
1253 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1254 self.inner = self.inner.accept_compressed(encoding);
1255 self
1256 }
1257 #[must_use]
1261 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1262 self.inner = self.inner.max_decoding_message_size(limit);
1263 self
1264 }
1265 #[must_use]
1269 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1270 self.inner = self.inner.max_encoding_message_size(limit);
1271 self
1272 }
1273 pub async fn coordinate(
1274 &mut self,
1275 request: impl tonic::IntoStreamingRequest<Message = super::CoordinateRequest>,
1276 ) -> std::result::Result<
1277 tonic::Response<tonic::codec::Streaming<super::CoordinateResponse>>,
1278 tonic::Status,
1279 > {
1280 self.inner
1281 .ready()
1282 .await
1283 .map_err(|e| {
1284 tonic::Status::unknown(
1285 format!("Service was not ready: {}", e.into()),
1286 )
1287 })?;
1288 let codec = tonic_prost::ProstCodec::default();
1289 let path = http::uri::PathAndQuery::from_static(
1290 "/connector_service.SinkCoordinationService/Coordinate",
1291 );
1292 let mut req = request.into_streaming_request();
1293 req.extensions_mut()
1294 .insert(
1295 GrpcMethod::new(
1296 "connector_service.SinkCoordinationService",
1297 "Coordinate",
1298 ),
1299 );
1300 self.inner.streaming(req, path, codec).await
1301 }
1302 }
1303}
1304pub mod sink_coordination_service_server {
1306 #![allow(
1307 unused_variables,
1308 dead_code,
1309 missing_docs,
1310 clippy::wildcard_imports,
1311 clippy::let_unit_value,
1312 )]
1313 use tonic::codegen::*;
1314 #[async_trait]
1316 pub trait SinkCoordinationService: std::marker::Send + std::marker::Sync + 'static {
1317 type CoordinateStream: tonic::codegen::tokio_stream::Stream<
1319 Item = std::result::Result<super::CoordinateResponse, tonic::Status>,
1320 >
1321 + std::marker::Send
1322 + 'static;
1323 async fn coordinate(
1324 &self,
1325 request: tonic::Request<tonic::Streaming<super::CoordinateRequest>>,
1326 ) -> std::result::Result<tonic::Response<Self::CoordinateStream>, tonic::Status>;
1327 }
1328 #[derive(Debug)]
1329 pub struct SinkCoordinationServiceServer<T> {
1330 inner: Arc<T>,
1331 accept_compression_encodings: EnabledCompressionEncodings,
1332 send_compression_encodings: EnabledCompressionEncodings,
1333 max_decoding_message_size: Option<usize>,
1334 max_encoding_message_size: Option<usize>,
1335 }
1336 impl<T> SinkCoordinationServiceServer<T> {
1337 pub fn new(inner: T) -> Self {
1338 Self::from_arc(Arc::new(inner))
1339 }
1340 pub fn from_arc(inner: Arc<T>) -> Self {
1341 Self {
1342 inner,
1343 accept_compression_encodings: Default::default(),
1344 send_compression_encodings: Default::default(),
1345 max_decoding_message_size: None,
1346 max_encoding_message_size: None,
1347 }
1348 }
1349 pub fn with_interceptor<F>(
1350 inner: T,
1351 interceptor: F,
1352 ) -> InterceptedService<Self, F>
1353 where
1354 F: tonic::service::Interceptor,
1355 {
1356 InterceptedService::new(Self::new(inner), interceptor)
1357 }
1358 #[must_use]
1360 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1361 self.accept_compression_encodings.enable(encoding);
1362 self
1363 }
1364 #[must_use]
1366 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1367 self.send_compression_encodings.enable(encoding);
1368 self
1369 }
1370 #[must_use]
1374 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1375 self.max_decoding_message_size = Some(limit);
1376 self
1377 }
1378 #[must_use]
1382 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1383 self.max_encoding_message_size = Some(limit);
1384 self
1385 }
1386 }
1387 impl<T, B> tonic::codegen::Service<http::Request<B>>
1388 for SinkCoordinationServiceServer<T>
1389 where
1390 T: SinkCoordinationService,
1391 B: Body + std::marker::Send + 'static,
1392 B::Error: Into<StdError> + std::marker::Send + 'static,
1393 {
1394 type Response = http::Response<tonic::body::Body>;
1395 type Error = std::convert::Infallible;
1396 type Future = BoxFuture<Self::Response, Self::Error>;
1397 fn poll_ready(
1398 &mut self,
1399 _cx: &mut Context<'_>,
1400 ) -> Poll<std::result::Result<(), Self::Error>> {
1401 Poll::Ready(Ok(()))
1402 }
1403 fn call(&mut self, req: http::Request<B>) -> Self::Future {
1404 match req.uri().path() {
1405 "/connector_service.SinkCoordinationService/Coordinate" => {
1406 #[allow(non_camel_case_types)]
1407 struct CoordinateSvc<T: SinkCoordinationService>(pub Arc<T>);
1408 impl<
1409 T: SinkCoordinationService,
1410 > tonic::server::StreamingService<super::CoordinateRequest>
1411 for CoordinateSvc<T> {
1412 type Response = super::CoordinateResponse;
1413 type ResponseStream = T::CoordinateStream;
1414 type Future = BoxFuture<
1415 tonic::Response<Self::ResponseStream>,
1416 tonic::Status,
1417 >;
1418 fn call(
1419 &mut self,
1420 request: tonic::Request<
1421 tonic::Streaming<super::CoordinateRequest>,
1422 >,
1423 ) -> Self::Future {
1424 let inner = Arc::clone(&self.0);
1425 let fut = async move {
1426 <T as SinkCoordinationService>::coordinate(&inner, request)
1427 .await
1428 };
1429 Box::pin(fut)
1430 }
1431 }
1432 let accept_compression_encodings = self.accept_compression_encodings;
1433 let send_compression_encodings = self.send_compression_encodings;
1434 let max_decoding_message_size = self.max_decoding_message_size;
1435 let max_encoding_message_size = self.max_encoding_message_size;
1436 let inner = self.inner.clone();
1437 let fut = async move {
1438 let method = CoordinateSvc(inner);
1439 let codec = tonic_prost::ProstCodec::default();
1440 let mut grpc = tonic::server::Grpc::new(codec)
1441 .apply_compression_config(
1442 accept_compression_encodings,
1443 send_compression_encodings,
1444 )
1445 .apply_max_message_size_config(
1446 max_decoding_message_size,
1447 max_encoding_message_size,
1448 );
1449 let res = grpc.streaming(method, req).await;
1450 Ok(res)
1451 };
1452 Box::pin(fut)
1453 }
1454 _ => {
1455 Box::pin(async move {
1456 let mut response = http::Response::new(
1457 tonic::body::Body::default(),
1458 );
1459 let headers = response.headers_mut();
1460 headers
1461 .insert(
1462 tonic::Status::GRPC_STATUS,
1463 (tonic::Code::Unimplemented as i32).into(),
1464 );
1465 headers
1466 .insert(
1467 http::header::CONTENT_TYPE,
1468 tonic::metadata::GRPC_CONTENT_TYPE,
1469 );
1470 Ok(response)
1471 })
1472 }
1473 }
1474 }
1475 }
1476 impl<T> Clone for SinkCoordinationServiceServer<T> {
1477 fn clone(&self) -> Self {
1478 let inner = self.inner.clone();
1479 Self {
1480 inner,
1481 accept_compression_encodings: self.accept_compression_encodings,
1482 send_compression_encodings: self.send_compression_encodings,
1483 max_decoding_message_size: self.max_decoding_message_size,
1484 max_encoding_message_size: self.max_encoding_message_size,
1485 }
1486 }
1487 }
1488 pub const SERVICE_NAME: &str = "connector_service.SinkCoordinationService";
1490 impl<T> tonic::server::NamedService for SinkCoordinationServiceServer<T> {
1491 const NAME: &'static str = SERVICE_NAME;
1492 }
1493}