1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{AddAssign, Deref};
17
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::TableId;
21use risingwave_common::hash::{
22 IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat, WorkerSlotId,
23};
24use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node};
25use risingwave_connector::source::SplitImpl;
26use risingwave_meta_model::actor_dispatcher::DispatcherType;
27use risingwave_meta_model::{SourceId, StreamingParallelism, WorkerId};
28use risingwave_pb::catalog::Table;
29use risingwave_pb::common::{ActorInfo, PbActorLocation};
30use risingwave_pb::meta::table_fragments::actor_status::ActorState;
31use risingwave_pb::meta::table_fragments::fragment::{
32 FragmentDistributionType, PbFragmentDistributionType,
33};
34use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
35use risingwave_pb::meta::table_parallelism::{
36 FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
37 PbParallelism,
38};
39use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
40use risingwave_pb::plan_common::PbExprContext;
41use risingwave_pb::stream_plan::stream_node::NodeBody;
42use risingwave_pb::stream_plan::{
43 DispatchStrategy, Dispatcher, FragmentTypeFlag, PbDispatcher, PbStreamActor, PbStreamContext,
44 StreamNode,
45};
46
47use super::{ActorId, FragmentId};
48use crate::model::MetadataModelResult;
49use crate::stream::{SplitAssignment, build_actor_connector_splits};
50
/// Parallelism assigned to a streaming job (table).
///
/// Converted to/from `PbTableParallelism` and `StreamingParallelism` by the
/// `From` impls below.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// Parallelism decided by the system; also covers the legacy `Auto`
    /// protobuf variant when converting from `PbTableParallelism`.
    Adaptive,
    /// Fixed, explicitly specified parallelism.
    Fixed(usize),
    /// Custom parallelism; carries no payload here.
    /// NOTE(review): presumably used for externally-determined, non-uniform
    /// actor distributions — confirm against the scheduler.
    Custom,
}
67
68impl From<PbTableParallelism> for TableParallelism {
69 fn from(value: PbTableParallelism) -> Self {
70 use Parallelism::*;
71 match &value.parallelism {
72 Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
73 Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
74 Some(Custom(_)) => Self::Custom,
75 _ => unreachable!(),
76 }
77 }
78}
79
80impl From<TableParallelism> for PbTableParallelism {
81 fn from(value: TableParallelism) -> Self {
82 use TableParallelism::*;
83
84 let parallelism = match value {
85 Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
86 Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
87 parallelism: n as u32,
88 }),
89 Custom => PbParallelism::Custom(PbCustomParallelism {}),
90 };
91
92 Self {
93 parallelism: Some(parallelism),
94 }
95 }
96}
97
98impl From<StreamingParallelism> for TableParallelism {
99 fn from(value: StreamingParallelism) -> Self {
100 match value {
101 StreamingParallelism::Adaptive => TableParallelism::Adaptive,
102 StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
103 StreamingParallelism::Custom => TableParallelism::Custom,
104 }
105 }
106}
107
108impl From<TableParallelism> for StreamingParallelism {
109 fn from(value: TableParallelism) -> Self {
110 match value {
111 TableParallelism::Adaptive => StreamingParallelism::Adaptive,
112 TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
113 TableParallelism::Custom => StreamingParallelism::Custom,
114 }
115 }
116}
117
/// Upstream actor infos of an actor, grouped by upstream fragment id.
pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
/// An actor together with its dispatchers.
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
/// An actor together with its upstream actor infos and its dispatchers.
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
/// Per-fragment, per-actor dispatchers.
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

/// Downstream relations of each fragment.
pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// Fragment id -> (old upstream fragment id -> new upstream fragment id).
/// NOTE(review): key/value orientation inferred from the alias name — confirm
/// against the writers of this map.
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// Fragment id -> upstream fragment id -> (upstream actor id -> actor id) for
/// newly-added no-shuffle relations.
/// NOTE(review): inner mapping direction inferred from the alias name — confirm.
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;
129
/// A downstream edge of a fragment: which fragment it feeds and how rows are
/// dispatched to it.
#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    /// The fragment on the receiving end of this edge.
    pub downstream_fragment_id: FragmentId,
    /// How rows are routed to the downstream fragment.
    pub dispatcher_type: DispatcherType,
    /// Distribution key column indices.
    pub dist_key_indices: Vec<u32>,
    /// Output column indices for the dispatched rows.
    pub output_indices: Vec<u32>,
}
137
138impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
139 fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
140 Self {
141 downstream_fragment_id: fragment_id,
142 dispatcher_type: dispatch.get_type().unwrap().into(),
143 dist_key_indices: dispatch.dist_key_indices,
144 output_indices: dispatch.output_indices,
145 }
146 }
147}
148
/// A [`StreamJobFragments`] bundled with the downstream relations of its
/// fragments, as needed when first creating the job.
#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    /// The fragments/actors of the job itself.
    pub inner: StreamJobFragments,
    /// Downstream edges of each fragment in `inner`.
    pub downstreams: FragmentDownstreamRelation,
}
154
impl Deref for StreamJobFragmentsToCreate {
    type Target = StreamJobFragments;

    // Delegate read access to the wrapped `StreamJobFragments`.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
162
/// In-memory representation of a stream actor; the meta-side counterpart of
/// `PbStreamActor`, without dispatchers (those are attached in `to_protobuf`).
#[derive(Clone, Debug)]
pub struct StreamActor {
    pub actor_id: u32,
    /// The fragment this actor belongs to.
    pub fragment_id: u32,
    /// Vnodes owned by this actor.
    /// NOTE(review): `None` appears to mean singleton distribution — confirm.
    pub vnode_bitmap: Option<Bitmap>,
    /// SQL definition of the materialized view this actor serves, if any.
    pub mview_definition: String,
    /// Expression evaluation context; filled from the job's `StreamContext`
    /// by `StreamJobFragments::fill_expr_context` when absent.
    pub expr_context: Option<PbExprContext>,
}
171
172impl StreamActor {
173 fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
174 PbStreamActor {
175 actor_id: self.actor_id,
176 fragment_id: self.fragment_id,
177 dispatcher: dispatchers.collect(),
178 vnode_bitmap: self
179 .vnode_bitmap
180 .as_ref()
181 .map(|bitmap| bitmap.to_protobuf()),
182 mview_definition: self.mview_definition.clone(),
183 expr_context: self.expr_context.clone(),
184 }
185 }
186}
187
/// In-memory representation of a fragment; the meta-side counterpart of
/// `PbFragment`, without upstream/dispatcher info (merged in by `to_protobuf`).
#[derive(Clone, Debug, Default)]
pub struct Fragment {
    pub fragment_id: FragmentId,
    /// Bitmask of `FragmentTypeFlag`s describing what this fragment contains.
    pub fragment_type_mask: u32,
    /// Whether the fragment is hash-distributed or a singleton.
    pub distribution_type: PbFragmentDistributionType,
    /// The actors running this fragment's plan.
    pub actors: Vec<StreamActor>,
    /// IDs of the state tables owned by this fragment.
    pub state_table_ids: Vec<u32>,
    /// Vnode count if recorded; otherwise resolved via `VnodeCountCompat`.
    pub maybe_vnode_count: Option<u32>,
    /// The stream plan of this fragment.
    pub nodes: StreamNode,
}
198
199impl Fragment {
200 pub fn to_protobuf(
201 &self,
202 upstream_fragments: impl Iterator<Item = FragmentId>,
203 dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
204 ) -> PbFragment {
205 PbFragment {
206 fragment_id: self.fragment_id,
207 fragment_type_mask: self.fragment_type_mask,
208 distribution_type: self.distribution_type as _,
209 actors: self
210 .actors
211 .iter()
212 .map(|actor| {
213 actor.to_protobuf(
214 dispatchers
215 .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
216 .into_iter()
217 .flatten()
218 .cloned(),
219 )
220 })
221 .collect(),
222 state_table_ids: self.state_table_ids.clone(),
223 upstream_fragment_ids: upstream_fragments.collect(),
224 maybe_vnode_count: self.maybe_vnode_count,
225 nodes: Some(self.nodes.clone()),
226 }
227 }
228}
229
impl VnodeCountCompat for Fragment {
    // Resolve the vnode count from the optional recorded value, with a
    // singleton-aware fallback when it is absent.
    fn vnode_count_inner(&self) -> VnodeCount {
        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
    }
}
235
236impl IsSingleton for Fragment {
237 fn is_singleton(&self) -> bool {
238 matches!(self.distribution_type, FragmentDistributionType::Single)
239 }
240}
241
/// Meta-side snapshot of all fragments of a single streaming job, together
/// with per-actor status/splits and job-level settings.
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// The id of the streaming job.
    pub stream_job_id: TableId,

    /// The state of the job's fragments.
    pub state: State,

    /// The fragments of the job, keyed by fragment id.
    pub fragments: BTreeMap<FragmentId, Fragment>,

    /// The status (worker location and actor state) of each actor.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

    /// The source splits assigned to each (source) actor.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// The streaming context (e.g. timezone) the job was created with.
    pub ctx: StreamContext,

    /// The parallelism assigned to this job.
    pub assigned_parallelism: TableParallelism,

    /// Upper bound of the job's parallelism.
    /// NOTE(review): appears to correspond to the vnode count (cf.
    /// `VirtualNode::COUNT_FOR_TEST` passed in `for_test`) — confirm.
    pub max_parallelism: usize,
}
283
/// Per-job streaming context carried from job creation time.
#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// Timezone setting of the job; `None` is serialized as an empty string
    /// (see `to_protobuf` / `from_protobuf`).
    pub timezone: Option<String>,
}
289
290impl StreamContext {
291 pub fn to_protobuf(&self) -> PbStreamContext {
292 PbStreamContext {
293 timezone: self.timezone.clone().unwrap_or("".into()),
294 }
295 }
296
297 pub fn to_expr_context(&self) -> PbExprContext {
298 PbExprContext {
299 time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
301 strict_mode: false,
302 }
303 }
304
305 pub fn from_protobuf(prost: &PbStreamContext) -> Self {
306 Self {
307 timezone: if prost.get_timezone().is_empty() {
308 None
309 } else {
310 Some(prost.get_timezone().clone())
311 },
312 }
313 }
314}
315
impl StreamJobFragments {
    /// Converts to the `PbTableFragments` protobuf representation.
    ///
    /// `fragment_upstreams` supplies the upstream fragment ids per fragment
    /// and `fragment_dispatchers` the per-actor dispatchers per fragment;
    /// both are stored outside this struct and merged in here.
    ///
    /// NOTE(review): `node_label` is emitted empty and `backfill_done` is
    /// hard-coded to `true` — confirm this matches what consumers of the
    /// protobuf representation expect.
    pub fn to_protobuf(
        &self,
        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
        fragment_dispatchers: &FragmentActorDispatchers,
    ) -> PbTableFragments {
        PbTableFragments {
            table_id: self.stream_job_id.table_id(),
            state: self.state as _,
            fragments: self
                .fragments
                .iter()
                .map(|(id, fragment)| {
                    (
                        *id,
                        fragment.to_protobuf(
                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
                            fragment_dispatchers.get(&(*id as _)),
                        ),
                    )
                })
                .collect(),
            actor_status: self.actor_status.clone().into_iter().collect(),
            actor_splits: build_actor_connector_splits(&self.actor_splits),
            ctx: Some(self.ctx.to_protobuf()),
            parallelism: Some(self.assigned_parallelism.into()),
            node_label: "".to_owned(),
            backfill_done: true,
            max_parallelism: Some(self.max_parallelism as _),
        }
    }
}
348
/// Actors to create, grouped by worker id, then by fragment id, paired with
/// the fragment's stream plan.
pub type StreamJobActorsToCreate =
    HashMap<WorkerId, HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpDownstreams>)>>;
351
352impl StreamJobFragments {
353 pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
355 Self::new(
356 table_id,
357 fragments,
358 &BTreeMap::new(),
359 StreamContext::default(),
360 TableParallelism::Adaptive,
361 VirtualNode::COUNT_FOR_TEST,
362 )
363 }
364
365 pub fn new(
368 stream_job_id: TableId,
369 fragments: BTreeMap<FragmentId, Fragment>,
370 actor_locations: &BTreeMap<ActorId, WorkerSlotId>,
371 ctx: StreamContext,
372 table_parallelism: TableParallelism,
373 max_parallelism: usize,
374 ) -> Self {
375 let actor_status = actor_locations
376 .iter()
377 .map(|(&actor_id, worker_slot_id)| {
378 (
379 actor_id,
380 ActorStatus {
381 location: PbActorLocation::from_worker(worker_slot_id.worker_id()),
382 state: ActorState::Inactive as i32,
383 },
384 )
385 })
386 .collect();
387
388 Self {
389 stream_job_id,
390 state: State::Initial,
391 fragments,
392 actor_status,
393 actor_splits: HashMap::default(),
394 ctx,
395 assigned_parallelism: table_parallelism,
396 max_parallelism,
397 }
398 }
399
400 pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
401 self.fragments.keys().cloned()
402 }
403
404 pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
405 self.fragments.values()
406 }
407
408 pub fn stream_job_id(&self) -> TableId {
410 self.stream_job_id
411 }
412
413 pub fn state(&self) -> State {
415 self.state
416 }
417
418 pub fn timezone(&self) -> Option<String> {
420 self.ctx.timezone.clone()
421 }
422
423 pub fn is_created(&self) -> bool {
425 self.state == State::Created
426 }
427
428 pub fn is_initial(&self) -> bool {
430 self.state == State::Initial
431 }
432
433 pub fn set_state(&mut self, state: State) {
435 self.state = state;
436 }
437
438 pub fn update_actors_state(&mut self, state: ActorState) {
440 for actor_status in self.actor_status.values_mut() {
441 actor_status.set_state(state);
442 }
443 }
444
445 pub fn set_actor_splits_by_split_assignment(&mut self, split_assignment: SplitAssignment) {
446 self.actor_splits = split_assignment.into_values().flatten().collect();
447 }
448
449 pub fn actor_ids(&self) -> Vec<ActorId> {
451 self.fragments
452 .values()
453 .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
454 .collect()
455 }
456
457 pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
458 self.fragments
459 .values()
460 .flat_map(|fragment| {
461 fragment
462 .actors
463 .iter()
464 .map(|actor| (actor.actor_id, fragment.fragment_id))
465 })
466 .collect()
467 }
468
469 #[cfg(test)]
471 pub fn actors(&self) -> Vec<StreamActor> {
472 self.fragments
473 .values()
474 .flat_map(|fragment| fragment.actors.clone())
475 .collect()
476 }
477
478 pub fn filter_actor_ids(
480 &self,
481 check_type: impl Fn(u32) -> bool + 'static,
482 ) -> impl Iterator<Item = ActorId> + '_ {
483 self.fragments
484 .values()
485 .filter(move |fragment| check_type(fragment.fragment_type_mask))
486 .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
487 }
488
489 pub fn mview_actor_ids(&self) -> Vec<ActorId> {
491 Self::filter_actor_ids(self, |fragment_type_mask| {
492 (fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0
493 })
494 .collect()
495 }
496
497 pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
499 let mut actor_ids = vec![];
500 for fragment in self.fragments.values() {
501 if fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32 != 0 {
502 return vec![];
505 }
506 if (fragment.fragment_type_mask
507 & (FragmentTypeFlag::Values as u32
508 | FragmentTypeFlag::StreamScan as u32
509 | FragmentTypeFlag::SourceScan as u32))
510 != 0
511 {
512 actor_ids.extend(fragment.actors.iter().map(|actor| {
513 (
514 actor.actor_id,
515 BackfillUpstreamType::from_fragment_type_mask(fragment.fragment_type_mask),
516 )
517 }));
518 }
519 }
520 actor_ids
521 }
522
523 pub fn root_fragment(&self) -> Option<Fragment> {
524 self.mview_fragment()
525 .or_else(|| self.sink_fragment())
526 .or_else(|| self.source_fragment())
527 }
528
529 pub fn mview_fragment(&self) -> Option<Fragment> {
531 self.fragments
532 .values()
533 .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0)
534 .cloned()
535 }
536
537 pub fn source_fragment(&self) -> Option<Fragment> {
538 self.fragments
539 .values()
540 .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Source as u32) != 0)
541 .cloned()
542 }
543
544 pub fn sink_fragment(&self) -> Option<Fragment> {
545 self.fragments
546 .values()
547 .find(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Sink as u32) != 0)
548 .cloned()
549 }
550
551 pub fn snapshot_backfill_actor_ids(&self) -> HashSet<ActorId> {
552 Self::filter_actor_ids(self, |mask| {
553 (mask & FragmentTypeFlag::SnapshotBackfillStreamScan as u32) != 0
554 })
555 .collect()
556 }
557
558 pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
561 let mut source_fragments = HashMap::new();
562
563 for fragment in self.fragments() {
564 {
565 if let Some(source_id) = fragment.nodes.find_stream_source() {
566 source_fragments
567 .entry(source_id as SourceId)
568 .or_insert(BTreeSet::new())
569 .insert(fragment.fragment_id as FragmentId);
570 }
571 }
572 }
573 source_fragments
574 }
575
576 pub fn source_backfill_fragments(
581 &self,
582 ) -> MetadataModelResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
583 let mut source_backfill_fragments = HashMap::new();
584
585 for fragment in self.fragments() {
586 {
587 if let Some((source_id, upstream_source_fragment_id)) =
588 fragment.nodes.find_source_backfill()
589 {
590 source_backfill_fragments
591 .entry(source_id as SourceId)
592 .or_insert(BTreeSet::new())
593 .insert((fragment.fragment_id, upstream_source_fragment_id));
594 }
595 }
596 }
597 Ok(source_backfill_fragments)
598 }
599
600 pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
603 let mut union_fragment_id = None;
604 for (fragment_id, fragment) in &self.fragments {
605 {
606 {
607 visit_stream_node(&fragment.nodes, |body| {
608 if let NodeBody::Union(_) = body {
609 if let Some(union_fragment_id) = union_fragment_id.as_mut() {
610 assert_eq!(*union_fragment_id, *fragment_id);
612 } else {
613 union_fragment_id = Some(*fragment_id);
614 }
615 }
616 })
617 }
618 }
619 }
620
621 let union_fragment_id =
622 union_fragment_id.expect("fragment of placeholder merger not found");
623
624 (self
625 .fragments
626 .get_mut(&union_fragment_id)
627 .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
628 }
629
630 fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
632 let table_id = match stream_node.node_body.as_ref() {
633 Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
634 Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
635 _ => None,
636 };
637 if let Some(table_id) = table_id {
638 table_ids.entry(table_id).or_default().add_assign(1);
639 }
640
641 for child in &stream_node.input {
642 Self::resolve_dependent_table(child, table_ids);
643 }
644 }
645
646 pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
648 let mut table_ids = HashMap::new();
649 self.fragments.values().for_each(|fragment| {
650 Self::resolve_dependent_table(&fragment.nodes, &mut table_ids);
651 });
652
653 table_ids
654 }
655
656 pub fn worker_actor_states(&self) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> {
658 let mut map = BTreeMap::default();
659 for (&actor_id, actor_status) in &self.actor_status {
660 let node_id = actor_status.worker_id() as WorkerId;
661 map.entry(node_id)
662 .or_insert_with(Vec::new)
663 .push((actor_id, actor_status.state()));
664 }
665 map
666 }
667
668 pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
670 let mut map = BTreeMap::default();
671 for (&actor_id, actor_status) in &self.actor_status {
672 let node_id = actor_status.worker_id() as WorkerId;
673 map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
674 }
675 map
676 }
677
678 pub fn active_actors(&self) -> Vec<StreamActor> {
680 let mut actors = vec![];
681 for fragment in self.fragments.values() {
682 for actor in &fragment.actors {
683 if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
684 continue;
685 }
686 actors.push(actor.clone());
687 }
688 }
689 actors
690 }
691
692 pub fn actors_to_create(
693 &self,
694 ) -> impl Iterator<
695 Item = (
696 FragmentId,
697 &StreamNode,
698 impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
699 ),
700 > + '_ {
701 self.fragments.values().map(move |fragment| {
702 (
703 fragment.fragment_id,
704 &fragment.nodes,
705 fragment.actors.iter().map(move |actor| {
706 let worker_id = self
707 .actor_status
708 .get(&actor.actor_id)
709 .expect("should exist")
710 .worker_id() as WorkerId;
711 (actor, worker_id)
712 }),
713 )
714 })
715 }
716
717 pub fn mv_table_id(&self) -> Option<u32> {
718 if self
719 .fragments
720 .values()
721 .flat_map(|f| f.state_table_ids.iter())
722 .any(|table_id| *table_id == self.stream_job_id.table_id)
723 {
724 Some(self.stream_job_id.table_id)
725 } else {
726 None
727 }
728 }
729
730 pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
735 self.collect_tables_inner(true)
736 }
737
738 pub fn all_tables(&self) -> BTreeMap<u32, Table> {
740 self.collect_tables_inner(false)
741 }
742
743 fn collect_tables_inner(&self, internal_tables_only: bool) -> BTreeMap<u32, Table> {
744 let mut tables = BTreeMap::new();
745 for fragment in self.fragments.values() {
746 stream_graph_visitor::visit_stream_node_tables_inner(
747 &mut fragment.nodes.clone(),
748 internal_tables_only,
749 true,
750 |table, _| {
751 let table_id = table.id;
752 tables
753 .try_insert(table_id, table.clone())
754 .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
755 },
756 );
757 }
758 tables
759 }
760
761 pub fn internal_table_ids(&self) -> Vec<u32> {
763 self.fragments
764 .values()
765 .flat_map(|f| f.state_table_ids.clone())
766 .filter(|&t| t != self.stream_job_id.table_id)
767 .collect_vec()
768 }
769
770 pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
772 self.fragments
773 .values()
774 .flat_map(|f| f.state_table_ids.clone())
775 }
776
777 pub fn fill_expr_context(mut self) -> Self {
779 self.fragments.values_mut().for_each(|fragment| {
780 fragment.actors.iter_mut().for_each(|actor| {
781 if actor.expr_context.is_none() {
782 actor.expr_context = Some(self.ctx.to_expr_context());
783 }
784 });
785 });
786 self
787 }
788}
789
/// The type of a backfill fragment's upstream, derived from the fragment's
/// type mask (see `from_fragment_type_mask`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    /// Backfilling from an upstream materialized view (`StreamScan` flag).
    MView,
    /// Backfilling from a `Values` fragment.
    Values,
    /// Backfilling from a source (`SourceScan` flag).
    Source,
}
796
797impl BackfillUpstreamType {
798 pub fn from_fragment_type_mask(mask: u32) -> Self {
799 let is_mview = (mask & FragmentTypeFlag::StreamScan as u32) != 0;
800 let is_values = (mask & FragmentTypeFlag::Values as u32) != 0;
801 let is_source = (mask & FragmentTypeFlag::SourceScan as u32) != 0;
802
803 debug_assert!(
806 is_mview as u8 + is_values as u8 + is_source as u8 == 1,
807 "a backfill fragment should either be mview, value or source, found {:?}",
808 mask
809 );
810
811 if is_mview {
812 BackfillUpstreamType::MView
813 } else if is_values {
814 BackfillUpstreamType::Values
815 } else if is_source {
816 BackfillUpstreamType::Source
817 } else {
818 unreachable!("invalid fragment type mask: {}", mask);
819 }
820 }
821}