Introduction
This guide is intended for contributors to learn how to develop RisingWave. The instructions on how to submit code changes are included in the contribution guidelines.
If you have questions, you can search for existing discussions or start a new discussion in the Discussions forum of RisingWave, or ask in the RisingWave Community channel on Slack. Please use the invitation link to join the channel.
To report bugs, create a GitHub issue.
Read the design docs
Before you start to make code changes, ensure that you understand the design and implementation of RisingWave. We recommend that you read the design docs listed in docs/README.md first.
You can also read the crate-level documentation for implementation details, or run ./risedev doc to read it locally.
Learn about the code structure
- The src folder contains all of the kernel components. Refer to src/README.md for more details, including the design patterns used in RisingWave.
- The docker folder contains the Docker files used to build and start RisingWave.
- The e2e_test folder contains the latest end-to-end test cases.
- The docs folder contains the design docs. If you want to learn how RisingWave is designed and implemented, check out the design docs here.
- The dashboard folder contains the RisingWave dashboard.
Contribution guidelines
Thanks for your interest in contributing to RisingWave! We welcome and appreciate contributions.
This document describes how to submit your code changes. To learn about the development process, see other chapters of the book. To understand the design and implementation of RisingWave, refer to the design docs listed in docs/README.md.
If you have questions, you can search for existing discussions or start a new discussion in the Discussions forum of RisingWave, or ask in the RisingWave Community channel on Slack. Please use the invitation link to join the channel.
To report bugs, create a GitHub issue.
Tests and miscellaneous checks
Before submitting your code changes, ensure you fully test them and perform necessary checks. The testing instructions and necessary checks are detailed in other sections of the book.
Submit a PR
Pull Request title
As described here, a valid PR title should begin with one of the following prefixes:
- feat: A new feature
- fix: A bug fix
- doc: Documentation only changes
- refactor: A code change that neither fixes a bug nor adds a feature
- style: A refactoring that improves code style
- perf: A code change that improves performance
- test: Adding missing tests or correcting existing tests
- build: Changes that affect the build system or external dependencies (example scopes: .config, .cargo, Cargo.toml)
- ci: Changes to RisingWave CI configuration files and scripts (example scopes: .github, ci (Buildkite))
- chore: Other changes that don't modify src or test files
- revert: Reverts a previous commit
For example, a PR title could be:
- refactor: modify executor protobuf package path
- feat(execution): enable comparison between nullable data arrays, where (execution) means that this PR mainly focuses on the execution component.
You may also check out previous PRs in the PR list.
Pull Request description
- If your PR is small (such as a typo fix), you can go brief.
- If it is large and you have changed a lot, it’s better to write more details.
Sign the CLA
Contributors will need to sign RisingWave Labs’ CLA.
Cherry pick the commit to release candidate branch
We have a GitHub Action to help cherry-pick commits from the main branch to a release candidate branch, such as v*.*.*-rc where * is a number.
Check out the details at: https://github.com/risingwavelabs/risingwave/blob/main/.github/workflows/cherry-pick-to-release-branch.yml
To trigger the action, add the corresponding label to the PR on the main branch: https://github.com/risingwavelabs/risingwave/blob/main/.github/workflows/cherry-pick-to-release-branch.yml#L10
The action runs once the PR on the main branch is merged:
- If git cherry-pick does not find any conflicts, it will open a PR to the release candidate branch, and assign the original author as the reviewer.
- If there is a conflict, it will open an issue and make the original author the assignee.
How to build and run RisingWave
Set up the development environment
macOS
To install the dependencies on macOS, run:
brew install postgresql cmake protobuf tmux cyrus-sasl lld openssl@3
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
Debian-based Linux
To install the dependencies on Debian-based Linux systems, run:
sudo apt install make build-essential cmake protobuf-compiler curl postgresql-client tmux lld pkg-config libssl-dev libsasl2-dev
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
nix shell
If you use nix, you can also enter the nix shell via:
nix develop ./develop/nix
All dependencies will be automatically downloaded and configured.
You can also use direnv to automatically enter the nix shell:
direnv allow
Check out flake.nix to read more information!
Then you’ll be able to compile and start RisingWave!
.cargo/config.toml contains rustflags configurations like -Clink-arg and -Ctarget-feature. Since it will be merged with $HOME/.cargo/config.toml, check the config files and make sure they don't conflict if you have global rustflags configurations there, e.g., for the linker.
If you want to build RisingWave with the embedded-python-udf feature, you need to install Python 3.12.
To install Python 3.12 on macOS, run:
brew install python@3.12
To install Python 3.12 on Debian-based Linux systems, run:
sudo apt install software-properties-common
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt-get update
sudo apt-get install python3.12 python3.12-dev
If the default python3 version is not 3.12, please set the PYO3_PYTHON environment variable:
export PYO3_PYTHON=python3.12
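As a rough sketch only (the exact cargo invocation, and which crate exposes the feature, are assumptions here rather than verified from the codebase), a build with the feature enabled might look like:
# Assumption: the embedded-python-udf feature is exposed on the risingwave binary crate.
export PYO3_PYTHON=python3.12
cargo build --bin risingwave --features embedded-python-udf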
Start and monitor a dev cluster
RiseDev is the RisingWave developers’ tool. You can now use RiseDev to start a dev cluster. It is as simple as:
./risedev d # shortcut for ./risedev dev
psql -h localhost -p 4566 -d dev -U root
The default dev cluster includes meta-node, compute-node and frontend-node processes, and an embedded volatile in-memory state storage. No data will be persisted. This configuration is intended to make it easier to develop and debug RisingWave.
To stop the cluster:
./risedev k # shortcut for ./risedev kill
To view the logs:
./risedev l # shortcut for ./risedev logs
To clean local data and logs:
./risedev clean-data
Tips for compilation
If you detect memory bottlenecks while compiling, either allocate some disk space on your computer as swap memory, or lower the compilation parallelism with CARGO_BUILD_JOBS, e.g. CARGO_BUILD_JOBS=2.
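For example, a lower-parallelism build of the dev cluster could look like this (the value 2 is just illustrative):
# Limit cargo to 2 parallel compilation jobs for this invocation
CARGO_BUILD_JOBS=2 ./risedev d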
Configure additional components
There are a few additional components supported by RiseDev.
Use the ./risedev configure command to enable and disable components.
- Hummock (MinIO + MinIO-CLI): Enable this component to persist state data.
- Prometheus and Grafana: Enable this component to view RisingWave metrics. You can view the metrics through a built-in Grafana dashboard.
- Etcd: Enable this component if you want to persist metadata node data.
- Kafka: Enable this component if you want to create a streaming source from a Kafka topic.
- Grafana Tempo: Use this component for tracing.
Enabling a component with the ./risedev configure command will only download the component to your environment. To allow it to function, you must revise the corresponding configuration setting in risedev.yml and restart the dev cluster.
For example, you can modify the default section to:
default:
- use: minio
- use: meta-node
- use: compute-node
- use: frontend
- use: prometheus
- use: grafana
- use: kafka
persist-data: true
Now you can run ./risedev d to start a new dev cluster. The new dev cluster will contain components as configured in the yaml file. RiseDev will automatically configure the components to use the available storage service and to monitor the target.
You may also add multiple compute nodes in the cluster. The ci-3cn-1fe config is an example.
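Assuming the ci-3cn-1fe profile is defined in risedev.yml (it ships with the repository), you can start it by passing the profile name to RiseDev:
# Start a cluster with 3 compute nodes and 1 frontend node, as defined by the ci-3cn-1fe profile
./risedev d ci-3cn-1fe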
Configure system variables
You can check src/common/src/config.rs to see all the configurable variables.
If additional variables are needed, include them in the correct sections (such as [server] or [storage]) in src/config/risingwave.toml.
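For illustration only, an override could be appended like this; the section names and keys below are placeholders, so check src/common/src/config.rs for the real names and defaults:
# Hypothetical example: append config overrides (keys are placeholders, verify them first)
cat >> src/config/risingwave.toml << 'EOF'
[server]
heartbeat_interval_ms = 1000

[storage]
shared_buffer_capacity_mb = 1024
EOF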
Start the playground
If you do not need to start a full cluster to develop, you can issue ./risedev p to start the playground, where the metadata node, compute nodes and frontend nodes are running in the same process. Logs are printed to stdout instead of separate log files.
./risedev p # shortcut for ./risedev playground
For more information, refer to README.md under src/risedevtool.
You can also start the playground with cargo directly:
cargo run --bin risingwave -- playground
Then, connect to the playground instance via:
psql -h localhost -p 4566 -d dev -U root
Build Profiles
RisingWave uses Cargo profiles to manage build settings. To briefly introduce Cargo profiles, here is a snippet from the Cargo reference:
Profiles provide a way to alter the compiler settings, influencing things like optimizations and debugging symbols.
Cargo has 4 built-in profiles: dev, release, test, and bench. The profile is automatically chosen based on which command is being run if a profile is not specified on the command-line. In addition to the built-in profiles, custom user-defined profiles can also be specified.
All profiles discussed in this document are defined in the Cargo.toml file at the root of the project. Please always refer to it for the most up-to-date information.
Built-in Profiles
RisingWave tweaks some settings of the built-in profiles to better fit its needs, in the [profile.<built-in-profile>] sections of Cargo.toml. For example,
- dev: for local development and testing
  - completely disables LTO to speed up the build time
- release: for local testing with near-production performance
  - completely disables LTO to speed up the build time
  - embeds full debug information to help with debugging in production
Custom Profiles
RisingWave also defines some custom profiles that inherit from the built-in profiles, in the [profile.<custom-profile>] sections of Cargo.toml. For example,
- production: for distribution and production deployment
  - inherits from release
  - enables aggressive code optimizations (like LTO) for maximum performance, at the cost of significantly increased build time
- ci-dev: for pull-request pipelines in CI
  - inherits from dev
  - tweaks some settings to reduce the build time and binary size
  - enables code optimizations for 3rd-party dependencies to improve CI performance
- ci-release: for main and main-cron pipelines in CI
  - inherits from release
  - tweaks some settings to reduce the build time and binary size
  - enables more runtime checks (like debug assertions and overflow checks) to catch potential bugs
- ci-sim: for madsim simulation tests in CI
  - similar to ci-dev
  - enables slight code optimizations for all crates to improve CI performance under single-threaded madsim execution
Comparisons
To give a better idea of the differences between the profiles, here is a matrix comparing the profiles:
| Profile | Debug Info | cfg(debug_assertions) | Performance | Build Time |
|---|---|---|---|---|
| dev | Full | true | Bad | Fastest |
| release | Full | false | Good | Slow |
| production | Full | false | Best | Slowest |
| ci-dev | Backtrace only | true | Medium | Fast |
| ci-release | Backtrace only | true | Good | Slow |
| ci-sim | Backtrace only | true | Medium | Medium |
Some miscellaneous notes:
- Compared to “Backtrace only”, “Full” debug information additionally includes the capability to attach a debugger at runtime or on core dumps, to inspect variables and stack traces.
- There are also other subtle differences like incremental compilation settings, overflow checks, and more. They are not listed here for brevity.
cfg(debug_assertions) can be roughly used to determine whether it's a production environment or not. Note that even though ci-release contains release in its name, debug_assertions are still enabled.
Choose a Profile
By default, RisingWave (and RiseDev) uses the dev profile for local development and testing. To use the release profile instead, you can set the corresponding configuration entry by running ./risedev configure. Other profiles are for their specific use cases and are not meant to be used directly by developers, and are thus not available with RiseDev.
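If you do need to build with one of these profiles manually, cargo accepts the profile name directly. A minimal sketch (the profile names come from Cargo.toml as described above):
# Build the main binary with a custom profile defined in Cargo.toml
cargo build --bin risingwave --profile production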
Testing
Before you submit a PR, fully test the code changes and perform necessary checks.
The RisingWave project enforces several checks in CI. Every time the code is modified, you need to perform the checks and ensure they pass.
- Lint
- Unit and integration tests
- Planner tests
- End-to-end tests
- Fuzzing tests
- DocSlt tests
- Deterministic simulation tests
- Backwards compatibility tests
Lint
RisingWave requires all code to pass fmt, clippy, sort and hakari checks. Run the following commands to install test tools and perform these checks.
./risedev install-tools # Install required tools for running unit tests
./risedev c # Run all checks. Shortcut for ./risedev check
There are also some miscellaneous checks. See ci/scripts/misc-check.sh.
Unit and integration tests
RiseDev runs unit tests with cargo-nextest. To run unit tests:
./risedev test # Run unit tests
Some ideas and caveats for writing tests:
- Use expect_test to write data-driven tests that can automatically update results.
- It's recommended to write new tests as integration tests (i.e. in the tests/ directory) instead of unit tests (i.e. in the src/ directory). Besides, put integration tests under tests/integration_tests/*.rs, instead of tests/*.rs. See Delete Cargo Integration Tests and #9878 for more details.
You might want to read How to Test for more good ideas on testing.
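Since unit tests go through cargo-nextest, you can also run a subset directly with a filter. A minimal sketch, where the package and test names are placeholders:
# Run tests of a single crate (package name is a placeholder)
cargo nextest run -p risingwave_common
# Run only tests whose names match a substring filter (filter is a placeholder)
cargo nextest run my_test_name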
Planner tests
RisingWave’s SQL frontend has SQL planner tests.
End-to-end tests
We use sqllogictest-rs to run RisingWave e2e tests.
Refer to Sqllogictest .slt Test File Format Cookbook for the syntax.
Before running end-to-end tests, you will need to start a full cluster first:
./risedev d
Then to run the end-to-end tests, you can use one of the following commands according to which component you are developing:
# run all streaming tests
./risedev slt-streaming -p 4566 -d dev -j 1
# run all batch tests
./risedev slt-batch -p 4566 -d dev -j 1
# run both
./risedev slt-all -p 4566 -d dev -j 1
Use -j 1 to create a separate database for each test case, which ensures that a previous test case's failure won't affect other tests due to table cleanups.
Alternatively, you can also run some specific tests:
# run a single test
./risedev slt -p 4566 -d dev './e2e_test/path/to/file.slt'
# run all tests under a directory (including subdirectories)
./risedev slt -p 4566 -d dev './e2e_test/path/to/directory/**/*.slt'
After running e2e tests, you may kill the cluster and clean data.
./risedev k # shortcut for ./risedev kill
./risedev clean-data
RisingWave's codebase is constantly changing, and the persisted data might not be stable. In case of unexpected decode errors, try ./risedev clean-data first.
Fuzzing tests
SqlSmith
Currently, SqlSmith supports e2e and frontend fuzzing. Take a look at Fuzzing tests for more details on running it locally.
DocSlt tests
As introduced in #5117, the DocSlt tool allows you to write SQL examples in sqllogictest syntax in Rust doc comments. After adding or modifying any such SQL examples, you should run the following commands to generate and run e2e tests for them.
# generate e2e tests from doc comments for all default packages
./risedev docslt
# or, generate for only modified package
./risedev docslt -p risingwave_expr
# run all generated e2e tests
./risedev slt-generated -p 4566 -d dev
# or, run only some of them
./risedev slt -p 4566 -d dev './e2e_test/generated/docslt/risingwave_expr/**/*.slt'
These will be run on CI as well.
Deterministic simulation tests
Deterministic simulation is a powerful tool to efficiently search for bugs and reliably reproduce them. In case you are not familiar with this technique, here are a talk and a blog post for a brief introduction.
See also the blog posts for a detailed writeup:
- Deterministic Simulation: A New Era of Distributed System Testing (Part 1 of 2)
- Applying Deterministic Simulation: The RisingWave Story (Part 2 of 2)
Unit and e2e tests
You can run normal unit tests and end-to-end tests in deterministic simulation mode.
# run deterministic unit test
./risedev stest
# run deterministic end-to-end test
./risedev sslt -- './e2e_test/path/to/directory/**/*.slt'
When your program panics, the simulator will print the random seed of this run:
thread '<unnamed>' panicked at '...',
note: run with `MADSIM_TEST_SEED=1` environment variable to reproduce this error
Then you can reproduce the bug with the given seed:
# set the random seed to reproduce a run
MADSIM_TEST_SEED=1 RUST_LOG=info ./risedev sslt -- './e2e_test/path/to/directory/**/*.slt'
More advanced usages are listed below:
# run multiple times with different seeds to test reliability
# it's recommended to build in release mode for a fast run
MADSIM_TEST_NUM=100 ./risedev sslt --release -- './e2e_test/path/to/directory/**/*.slt'
# configure cluster nodes (by default: 2fe+3cn)
./risedev sslt -- --compute-nodes 2 './e2e_test/path/to/directory/**/*.slt'
# inject failures to test fault recovery
./risedev sslt -- --kill-meta --etcd-timeout-rate=0.01 './e2e_test/path/to/directory/**/*.slt'
# see more usages
./risedev sslt -- --help
Deterministic tests are included in CI as well. See the CI script for details.
Integration tests
src/tests/simulation contains some special integration tests that are designed to be run in deterministic simulation mode. In these tests, we have more fine-grained control over the cluster and the execution environment to test some complex cases that are hard to test in normal environments.
To run these tests:
./risedev sit-test <test_name>
Sometimes in CI you may see a backtrace, followed by an error message with a MADSIM_TEST_SEED:
161: madsim::sim::task::Executor::block_on
at /risingwave/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/task/mod.rs:238:13
162: madsim::sim::runtime::Runtime::block_on
at /risingwave/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/runtime/mod.rs:126:9
163: madsim::sim::runtime::builder::Builder::run::{{closure}}::{{closure}}::{{closure}}
at /risingwave/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/runtime/builder.rs:128:35
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
context: node=6 "compute-1", task=2237 (spawned at /risingwave/src/stream/src/task/stream_manager.rs:689:34)
note: run with `MADSIM_TEST_SEED=2` environment variable to reproduce this error
You may use that to reproduce it in your local environment. For example:
MADSIM_TEST_SEED=4 ./risedev sit-test test_backfill_with_upstream_and_snapshot_read
Backwards compatibility tests
This tests backwards compatibility between the earliest minor version and the latest minor version of RisingWave (e.g. 1.0.0 vs 1.1.0).
You can run it locally with:
./risedev backwards-compat-test
In CI, you can make sure the PR runs it by adding the label ci/run-backwards-compat-tests.
Debugging
Debug playground using vscode
To step through RisingWave locally with a debugger, you can use the launch.json and the tasks.json provided in vscode_suggestions. After adding these files to your local .vscode folder, you can debug and set breakpoints by launching Launch 'risingwave p' debug.
Observability
RiseDev supports several observability components.
Cluster Control
risectl is the tool for providing internal access to the RisingWave cluster. See
cargo run --bin risectl -- --help
... or
./risedev ctl --help
for more information.
Monitoring
Uncomment the grafana and prometheus lines in risedev.yml to enable the Grafana and Prometheus services.
Tracing
Compute nodes support streaming tracing. Tracing is not enabled by default. You need to use ./risedev configure to download the tracing components first. After that, you will need to uncomment the tempo service in risedev.yml and start a new dev cluster to allow the components to work.
Traces are visualized in Grafana. You may also want to uncomment the grafana service in risedev.yml to enable it.
Dashboard
You may use RisingWave Dashboard to see actors in the system. It will be started along with the meta node, and is available at http://127.0.0.1:5691/.
The development instructions for dashboard are available here.
Logging
The Rust components use tokio-tracing to handle both logging and tracing. The default log level is set as:
- Third-party libraries: warn
- Other libraries: debug
To configure log levels, launch RisingWave with the environment variable RUST_LOG set as described here.
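For example, a run with tweaked log levels might look like the following; the module targets are illustrative, and the filter uses tracing's standard EnvFilter syntax:
# Default to info, but enable debug logs for an illustrative crate target
RUST_LOG="info,risingwave_stream=debug" ./risedev d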
There are also some logs designated for debugging purposes, with target names starting with events::.
For example, by setting RUST_LOG=events::stream::message::chunk=trace, all chunk messages will be logged as they pass through the executors in the streaming engine. Search the codebase to find more of them.
Metrics
The contents of this document may be subject to frequent change. It covers what each metric measures, and what information we may derive from it.
Barrier Latency
Prerequisite: Checkpoint
This metric measures the duration from when a barrier is injected into all sources in the stream graph to when the barrier has flowed through all executors in the graph.
What can we understand from it?
Usually when examining barrier latency, we look at high barrier latency.
There are two contributing factors to it:
- Time taken to actually process the streaming messages.
- Buffer capacity for streaming messages.
Processing costs
When injecting a new barrier, there will usually be streaming messages in the stream graph (unless it’s the initial barrier). Since we keep total order for streaming messages, this means that all streaming messages currently in the stream graph have to be processed before the barrier can pass through. If barrier latency is high, it could mean a long time is taken to process these streaming messages. Concretely, here are some costs of processing streaming messages:
- CPU cost of evaluating expressions.
- I/O of remote exchange between fragments.
- Stateful executor cache misses (for instance hash-join and hash-agg). These result in extra costs to access state on S3.
Buffer capacities
Next, high barrier latency could also be caused by buffers in the graph. If some downstream buffer is congested, we will be unable to queue and continue processing upstream messages.
For instance, if the channel in the exchange executor is full, upstream messages cannot be sent through this channel. This means the upstream executor will be unable to continue processing new stream messages, until some space on the buffer is freed.
The various buffer sizes can currently be adjusted via options in the developer configuration file. For instance, options to configure buffer size of the exchange executor can be found here.
Another subtle cause is that a large buffer size can also worsen barrier latency. Suppose stream message processing is at its limit, and there's high latency as a result. Typically, backpressure kicks in and the source is throttled. If buffer sizes are too large, or if there are many buffers, backpressure will not be applied to the source immediately. During this delay, we will continue to see high barrier latency. A heuristic algorithm is on the way to deal with this: https://github.com/risingwavelabs/risingwave/issues/8654.
CPU Profiling Guide
Profiling on host
We recommend an easy-to-use profiler and flamegraph tool: https://github.com/koute/not-perf.git
Record samples:
nperf record -p `pidof compute-node` -o perf.data
Generate flamegraph:
nperf flamegraph --merge-threads perf.data > perf.svg
Profiling remote compute nodes
You can profile remote compute nodes from a local machine by simply typing the following command.
./risedev ctl profile cpu --sleep [seconds]
All compute nodes will be profiled for the given number of seconds, and the generated flame graphs will be transferred to your local machine under .risingwave/profiling/.
.
Note: To profile compute nodes remotely, please make sure all remote nodes have a public IP address accessible from your local machine (where you are running risedev).
CPU Profiling on OSX
Get the pid of the node you want to profile.
pgrep compute-node
Use cargo flamegraph
:
cargo install flamegraph
sudo flamegraph -o flamegraph.svg --pid [pid]
When you're satisfied, you can press Ctrl+C to stop the profiler.
open flamegraph.svg
Memory (Heap) Profiling Guide
Note that the content below is Linux-exclusive.
What is Heap Profile?
A heap profiler records the stack trace of the allocation of each live object. So it's possible that function A allocates something and then hands it over to struct B; in this case, the allocation will still be attributed to A.
Internals
RisingWave uses tikv-jemallocator on Linux, which is a Rust wrapper of jemalloc, as its memory allocator. On other platforms, RisingWave uses the default allocator.
Luckily, jemalloc provides built-in profiling support (official wiki). jemallocator exposes the feature via a cargo feature ‘profiling’. Here is a simple guide to profiling with jemallocator.
For RisingWave, feat: support heap profiling from risedev by fuyufjh · Pull Request #4871 added everything needed. Just follow the steps below.
Step 1 - Collect Memory Profiling Dump
Depending on the deployment, read the corresponding section for instructions.
1.1. Profile RisingWave (locally) with risedev
Run a local cluster on an EC2 instance with the additional environment variable RISEDEV_ENABLE_HEAP_PROFILE.
RISEDEV_ENABLE_HEAP_PROFILE=1 ./risedev d full
Here we use full instead of compose-3node-deploy because compose-3node-deploy uses Docker containers to run the RisingWave processes, which makes profiling and analysis more difficult.
Under the hood, risedev sets the environment variable MALLOC_CONF for the RisingWave processes. Here is the implementation.
By default, the profiler will output a profile result for every 4GB of memory allocation. After running a query and waiting for a while, lots of .heap files will be generated in the current folder:
...
compactor.266308.15.i15.heap
compactor.266308.16.i16.heap
compactor.266308.17.i17.heap
compactor.266308.18.i18.heap
...
compute-node.266187.116.i116.heap
compute-node.266187.117.i117.heap
compute-node.266187.118.i118.heap
compute-node.266187.119.i119.heap
...
1.2. Profile RisingWave in testing pipelines
Currently, some testing pipelines, such as longevity tests, have memory profiling enabled by default, while others, such as performance benchmarks, do not.
To enable heap profiling of compute nodes in benchmark pipelines, set environment variable when starting a job:
ENABLE_MEMORY_PROFILING=true
Under the hood, the pipeline script passes the value to kube-bench's parameter benchmark.risingwave.compute.memory_profiling.enable (code here), and then kube-bench sets the environment variable for the RisingWave Pods (code here).
Note that this is only for compute nodes. If you need to run profiling on other nodes, or need to tune the parameters of profiling, you may modify the parameters in risingwave-test’s env.override.toml manually and run the job with that branch. (Example)
1.3. Profile RisingWave in Kubernetes/EKS
If you run into an OOM issue in Kubernetes, you will need to enable memory profiling first and reproduce the problem.
To enable memory profiling, set the MALLOC_CONF environment variable on the Pods.
# Example: `statefulsets` for CN and Meta
kubectl edit statefulsets/benchmark-risingwave-compute-c
# Example: `deployments` for other nodes
kubectl edit deployments/benchmark-risingwave-connector-c
Add the MALLOC_CONF env var. Note that prof_prefix specifies the path and file name prefix of the dumps. By default, /risingwave/cache/ is mounted to a HostPath and will persist after Pod restarts, so we use it as the dump path here.
env:
- name: MALLOC_CONF
value: prof:true,lg_prof_interval:38,lg_prof_sample:19,prof_prefix:/risingwave/cache/cn
The suggested values of lg_prof_interval differ for different nodes. See the risedev code: compactor_service, compute_node_service.rs, meta_node_service.rs.
Afterwards, the memory dump should be output to the specified folder. Use kubectl cp to download it to your local machine.
1.4. Dump memory profile with risectl
You can manually dump a heap profile with risectl for a compute node with jemalloc profiling enabled (MALLOC_CONF=prof:true).
./risedev ctl profile heap --dir [dumped_file_dir]
The dumped files will be saved in the directory you specified.
Note: To profile compute nodes remotely, please make sure all remote nodes have a public IP address accessible from your local machine (where you are running risedev).
Step 2 - Analyze with jeprof
Note that each of the .heap files is a full snapshot instead of an increment. Hence, simply pick the latest file (or any historical snapshot).
jeprof is a utility provided by jemalloc to analyze heap dump files. It reads both the executable binary and the heap dump to get a full heap profiling.
Note that the heap profiler dump file must be analyzed together with exactly the same binary that it was generated from. If the memory dump is collected from Kubernetes, please refer to 2.2.
2.1. Use jeprof locally
jeprof is shipped with jemallocator and is compiled by cargo. Use it as follows:
# find jeprof binary
find . -name 'jeprof'
# set execution permission
chmod +x ./target/release/build/tikv-jemalloc-sys-22f0d47d5c562226/out/build/bin/jeprof
Faster jeprof (recommended)
On some platforms jeprof runs very slowly. The bottleneck is addr2line; if you want to speed it up from 30 minutes to 3 seconds, please use:
git clone https://github.com/gimli-rs/addr2line
cd addr2line
cargo b --examples -r
cp ./target/release/examples/addr2line <your-path>
2.2. Use jeprof in Docker images
jeprof is included in RisingWave images v1.0.0 or later. For earlier versions, please copy a jeprof binary into the container manually.
Find a Linux machine and use the docker command to start an environment with the specific RisingWave version. Here, -v $(pwd):/dumps mounts the current directory to the /dumps folder inside the container, so that you don't need to copy the files in and out.
docker run -it --rm --entrypoint /bin/bash -v $(pwd):/dumps ghcr.io/risingwavelabs/risingwave:latest
Generate a collapsed file.
jeprof --collapsed binary_file heap_file > heap_file.collapsed
For example:
jeprof --collapsed /risingwave/bin/risingwave jeprof.198272.123.i123.heap > jeprof.198272.123.i123.heap.collapsed
Step 3 - Visualize Flame Graph
We recommend analyzing the collapsed file with speedscope. Just drop the .collapsed file into it. Click Left Heavy in the top-left corner to merge shared calling stacks.
Alternative: Generate flame graph locally
Download and unarchive FlameGraph utility.
Run
./flamegraph.pl --color=mem --countname=bytes heap_file.collapsed > flamegraph.svg
Example:
./flamegraph.pl --color=mem --countname=bytes jeprof.198272.4741.i4741.collapsed > flamegraph.svg
By the way, steps 2 and 3 can be combined into one line with a pipe:
jeprof --collapsed target/release/risingwave compute-node.10404.2466.i2466.heap | ~/FlameGraph/flamegraph.pl --color=mem --countname=bytes > flamegraph.svg
Micro Benchmarks
We have micro benchmarks for various components such as storage, stream and batch executors.
Running Micro Benchmarks
You can run them by specifying their name. For instance, to run the json_parser micro benchmark:
cargo bench json_parser
Generating Flamegraph for Micro Benchmarks
Note: Flamegraph generation depends on perf. You will need a Linux box to run it.
- Install cargo-flamegraph:
  cargo install flamegraph
- Install perf. If on Ubuntu:
  sudo apt install linux-tools
  If using EC2, you may need this instead:
  sudo apt install linux-tools-aws
  On an EC2 instance you may also need to set the paranoid level to 1, to give the profiler the necessary permissions:
  sudo sh -c "echo 1 >/proc/sys/kernel/perf_event_paranoid"
- Run flamegraph + benchmark (change json_parser to whichever benchmark you want to run):
  cargo flamegraph --bench json_parser -- --bench
  Within a benchmark, there are also typically multiple benchmark groups. For instance, within the json_parser bench, there are json_parser and debezium_json_parser_(create/read/update/delete). To filter, you can just append a regex. For instance, to only bench ^json_parser:
  cargo flamegraph --bench json_parser -- --bench ^json_parser
You can take a look at Criterion Docs for more information.
Develop Connectors
This page describes the development workflow for connectors. For design docs, see the Source page.
RisingWave supports a lot of connectors (sources and sinks). However, developing connectors is tricky because it involves external systems:
- Before developing and testing, it's troublesome to set up the development environment.
- During testing, we need to seed the external system with test data (perhaps with some scripts).
- The test relies on the configuration of the setup, e.g., the test needs to know the port of your Kafka in order to connect to it.
- We need to do the setup for both CI and local development.
Our solution is: we resort to RiseDev, our all-in-one development tool, to help manage external systems and solve these problems.
Before going to the specific methods described in the sections below, the principles we should follow are:
- environment-independent: It should be easy to start the cluster and run tests on your own machine, on other developers' machines, and in CI.
  - Don't use hard-coded configurations (e.g., localhost:9092 for Kafka).
  - Don't write too much logic in ci/scripts. Let CI scripts be thin wrappers.
- self-contained tests: It should be straightforward to run one test case, without worrying about where the script to prepare the test is.
  - Don't put the setup logic, running logic and verification logic of a test in different places.
Reference: for the full explanation of the difficulties and the design of our solution, see here.
The following sections first walk you through the development workflow for existing connectors, and finally explain how to extend the development framework to support a new connector.
- Set up the development environment
- End-to-end tests
- Adding a new connector to the development framework
Set up the development environment
RiseDev supports starting external connector systems (e.g., Kafka, MySQL) just like how it starts the RisingWave cluster, and other standard systems used as part of the RisingWave Cluster (e.g., MinIO, etcd, Grafana).
You write the profile in risedev.yml (or risedev-profiles.user.yml), e.g., the following config includes Kafka and MySQL, which will be used to test sources.
my-cool-profile:
steps:
# RisingWave cluster
- use: minio
- use: sqlite
- use: meta-node
meta-backend: sqlite
- use: compute-node
- use: frontend
- use: compactor
# Connectors
- use: kafka
address: message_queue
port: 29092
- use: mysql
port: 3306
address: mysql
user: root
password: 123456
Then
# will start the cluster along with Kafka and MySQL for you
risedev d my-cool-profile
For all config options of supported systems, check the comments in the template section of risedev.yml.
Escape hatch: user-managed mode
user-managed is a special config. When set to true, you will need to start the system by yourself. You may wonder why bother to add it to the RiseDev profile if you start it by yourself. In this case, the config will still be loaded by RiseDev, which will be useful in tests. See the chapters below for more details.
The user-managed mode can be used as a workaround to start a system that is not yet supported by RiseDev, or is buggy.
It's also used to configure the CI profile. (In CI, all services are pre-started by ci/docker-compose.yml.)
Example of the config:
- use: kafka
user-managed: true
address: message_queue
port: 29092
End-to-end tests
The e2e tests are written in slt files. There are 2 main points to note:
- Use system ok to run bash commands to interact with external systems. Use this to prepare the test data and verify the results. The whole lifecycle of a test case should be written in the same slt file.
- Use control substitution on and then use environment variables to specify the config of the external systems, e.g., the port of Kafka.
Refer to the sqllogictest-rs documentation for the details of system and substitution.
Take Kafka as an example of how the tests are written:
When you use risedev d to start the external services, related environment variables for Kafka will be available when you run risedev slt:
RISEDEV_KAFKA_BOOTSTRAP_SERVERS="127.0.0.1:9092"
RISEDEV_KAFKA_WITH_OPTIONS_COMMON="connector='kafka',properties.bootstrap.server='127.0.0.1:9092'"
RPK_BROKERS="127.0.0.1:9092"
The slt test case looks like this:
control substitution on
# Note: you can also use envvars in `system` commands, but usually it's not necessary since the CLI tools can load envvars themselves.
system ok
rpk topic create my_source -p 4
# Prepared test topic above, and produce test data now
system ok
cat << EOF | rpk topic produce my_source -f "%p %v\n" -p 0
0 {"v1": 1, "v2": "a"}
1 {"v1": 2, "v2": "b"}
2 {"v1": 3, "v2": "c"}
EOF
# Create the source, connecting to the Kafka started by RiseDev
statement ok
create source s0 (v1 int, v2 varchar) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'my_source',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
See src/risedevtool/src/risedev_env.rs for the variables supported for each service.
Note again: You need to use risedev d to start the cluster, and then use risedev slt to run the tests. It doesn't work if you start the cluster by yourself without telling RiseDev, or if you use the raw sqllogictest binary directly.
How it works: risedev d will write env vars to .risingwave/config/risedev-env, and risedev slt will load env vars from this file.
Tips for writing system commands
Refer to the sqllogictest-rs documentation for the syntax.
For simple cases, you can directly write a bash command, e.g.,
system ok
mysql -e "
DROP DATABASE IF EXISTS testdb1; CREATE DATABASE testdb1;
USE testdb1;
CREATE TABLE tt1 (v1 int primary key, v2 timestamp);
INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00');
"
system ok
cat << EOF | rpk topic produce my_source -f "%p %v\n" -p 0
0 {"v1": 1, "v2": "a"}
1 {"v1": 2, "v2": "b"}
2 {"v1": 3, "v2": "c"}
EOF
For more complex cases, you can write a test script and invoke it in slt. Scripts can be written in any language you like, but kindly write a README.md to help other developers get started more easily.
- For ad-hoc scripts (only used for one test), it's better to put them next to the test file, e.g., e2e_test/source_inline/kafka/consumer_group.mjs, which is invoked by the consumer_group.slt next to it.
- For general scripts that can be used in many situations, put them in e2e_test/commands/. This directory will be loaded into PATH by risedev slt, so the scripts function as a kind of "built-in" commands. A common scenario is when a CLI tool does not accept envvars as arguments. In such cases, instead of manually specifying the arguments each time you invoke it in slt, you can create a wrapper to handle this implicitly, making it more concise. e2e_test/commands/mysql is a good demonstration; a minimal sketch is shown after this list.
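As a minimal sketch (not the actual content of e2e_test/commands/mysql; the script name, the wrapped CLI and the environment variable names are all hypothetical), such a wrapper could look like:
#!/usr/bin/env bash
# Hypothetical wrapper placed under e2e_test/commands/: forwards RiseDev-provided
# environment variables to a CLI that cannot read them by itself.
set -euo pipefail
exec some-cli --host "${RISEDEV_SOME_SERVICE_HOST:-127.0.0.1}" \
  --port "${RISEDEV_SOME_SERVICE_PORT:-1234}" "$@"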
Tips for debugging:
- Use echo to check whether the environment is correctly set.
  system ok
  echo $PGPORT
  ----
  placeholder
  Then running risedev slt will return a "result mismatch" error and show the output of the echo command, i.e., the value of PGPORT.
- Use risedev show-risedev-env to see the environment variables available for risedev slt, after you start the cluster with risedev d.
Adding a new connector to the development framework
Refer to #16449 (user-managed only MySQL) and #16514 (Docker based MySQL) as examples.
- Add a new service in the template section of risedev.yml, and add the corresponding config in src/risedevtool/src/service_config.rs.
- Implement the new service task, and add it to src/risedevtool/src/bin/risedev-dev.rs.
- Add the environment variables you want to use in the slt tests in src/risedevtool/src/risedev_env.rs.
- Write tests according to the style explained in the previous section.
Source
This page describes RisingWave’s Data Source API and the architecture behind it. This may help if you are interested in how data sources work, or if you want to implement a new Data Source.
For the workflow of developing connectors, see Develop Connectors.
Components
RisingWave’s data source covers four parts: connectors, enumerators, ConnectorSource and SourceExecutor.
Connectors
Connector serves as an interface to the upstream data pipeline, including message queues and file systems. In the current design, it can only have limited concurrency: one connector instance reads from one split of the upstream. For example, if the upstream is Kafka with three partitions, then there should be three connectors in RisingWave.
All connectors need to implement the following trait, which exposes two methods to the upper layer.
// src/connector/src/base.rs
pub trait SplitReader: Sized {
    type Properties;

    async fn new(
        properties: Self::Properties,
        state: ConnectorState,
        columns: Option<Vec<Column>>,
    ) -> Result<Self>;

    async fn next(&mut self) -> Result<Option<Vec<SourceMessage>>>;
}
- new: creates a new connector with some properties; this method should support restoring from a specific state (with partitions and offsets).
- next: returns a batch of new messages and their offsets in the split.
Enumerators
Enumerator periodically requests the upstream to discover changes in splits, and in most cases the number of splits only increases. The enumerator is a separate task that runs on the meta node. If the upstream splits change, the enumerator notifies the connector by means of a config change to update the subscription relationship.
All enumerators need to implement the following trait.
// src/connector/src/base.rs
pub trait SplitEnumerator: Sized {
    type Split: SplitMetaData + Send + Sync;
    type Properties;

    async fn new(properties: Self::Properties) -> Result<Self>;

    async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
}
- new: creates an enumerator with some properties.
- list_splits: requests the upstream and returns all partitions.
ConnectorSource
ConnectorSource unites all connectors via the SourceReader trait. Also, a parser is held here, which parses raw data into stream chunks according to the column description. A ConnectorSource can handle multiple splits by spawning a new thread for each split. If the source is assigned no split, it will start a dummy reader whose next method never returns, as a placeholder.
SourceExecutor
SourceExecutor is initialized with splits assigned by the enumerator to subscribe to. The channel of the data chunks and the channel of the barriers are combined at this level, and the SourceExecutor needs to prioritize and correctly handle the barriers.
How It Works
- When a source is defined, the meta service will register its schema and broadcast it to the compute nodes. A compute node extracts the properties from the frontend, builds the corresponding components, and stores them as a SourceDesc in source_manager, identified by table_id. Note that at this stage, the source instance is only built but not running.
- No SourceExecutor will be built until a subsequent materialized view is created. SourceExecutor fetches the specific source instance from source_manager, identified by table_id, holds a copy of it, and initializes the corresponding state store at this stage.
- When receiving a barrier, SourceExecutor will check whether it contains an assign_split mutation. If the partition assignment in the assign_split mutation is different from the current situation, the SourceExecutor needs to rebuild the ConnectorSource and other underlying services based on the information in the mutation, then start reading from the new split and offset.
- Whenever receiving a barrier, the state handler always takes a snapshot of the ConnectorSource and labels the snapshot with an epoch number. When an error occurs, SourceExecutor takes a specific state and applies it.
Continuous Integration
CI Labels Guide
- [ci/run-xxx ...]: Run additional steps in the PR workflow indicated by ci/run-xxx in your PR.
- ci/pr/run-selected + [ci/run-xxx ...]: Only run selected steps indicated by ci/run-xxx in your DRAFT PR.
- ci/main-cron/run-all: Run the full main-cron workflow for your PR.
- ci/main-cron/run-selected + [ci/run-xxx ...]: Run specific steps indicated by ci/run-xxx from the main-cron workflow in your PR. This can be used to verify that some main-cron fix works as expected.
- To reference [ci/run-xxx ...] labels, you may look at steps from pull-request.yml and main-cron.yml.
Example
https://github.com/risingwavelabs/risingwave/pull/17197
To run e2e-test and e2e-source-test for main-cron in your pull request:
- Add ci/run-e2e-test.
- Add ci/run-e2e-source-tests.
- Add ci/main-cron/run-selected to skip all other steps which were not selected with ci/run-xxx.
Main Cron Bisect Guide
- Create a new build via buildkite.
- Add the following environment variables:
  - GOOD_COMMIT: The good commit hash.
  - BAD_COMMIT: The bad commit hash.
  - BISECT_BRANCH: The branch name where the bisect will be performed.
  - CI_STEPS: The CI_STEPS to run during the bisect. Separate multiple steps with a comma.
    - You can check the labels for this in main-cron.yml, under the conditions for each step.
Example you can try on buildkite:
- Environment variables:
  GOOD_COMMIT=29791ddf16fdf2c2e83ad3a58215f434e610f89a
  BAD_COMMIT=7f36bf17c1d19a1e6b2cdb90491d3c08ae8b0004
  BISECT_BRANCH=kwannoel/test-bisect
  CI_STEPS="test-bisect,disable-build"
Architecture Design
Motivation
This document serves as one of the materials for newcomers to learn the high-level architecture and the functionalities of each component.
Architecture
There are currently 4 types of nodes in the cluster:
- Frontend: Frontend is a stateless proxy that accepts user queries through Postgres protocol. It is responsible for parsing, validation, optimization, and answering the results of each individual query.
- ComputeNode: ComputeNode is responsible for executing the optimized query plan.
- Compactor: Compactor is a stateless worker node responsible for executing the compaction tasks for our storage engine.
- MetaServer: The central metadata management service. It also acts as a failure detector that periodically sends heartbeats to frontends and compute-nodes in the cluster. There are multiple sub-components running in MetaServer:
- ClusterManager: Manages the cluster information, such as the address and status of nodes.
- StreamManager: Manages the stream graph of RisingWave.
- CatalogManager: Manages the table catalog in RisingWave. DDL goes through the catalog manager, and catalog updates will be propagated to all frontend nodes in an async manner.
- BarrierManager: Manages barrier injection and collection. Checkpoints are initiated by the barrier manager regularly.
- HummockManager: Manages the SST file manifest and meta-info of Hummock storage.
- CompactionManager: Manages the compaction status and task assignment of Hummock storage.
The topmost component is the Postgres client. It issues queries through TCP-based Postgres wire protocol.
The leftmost component is the streaming data source. Kafka is the most representative system for streaming sources. Alternatively, Redpanda, Apache Pulsar, AWS Kinesis, Google Pub/Sub are also widely-used. Streams from Kafka will be consumed and processed through the pipeline in the database.
The bottom-most component is AWS S3, or MinIO (an open-sourced s3-compatible system). We employed a disaggregated architecture in order to elastically scale the compute-nodes without migrating the storage.
Execution Mode
There are 2 execution modes in our system serving different analytics purposes.
Batch-Query Mode
The first is the batch-query mode. Users issue such a query via a SELECT statement and the system answers immediately. This is the most typical RDBMS use case.
Let’s begin with a simple SELECT and see how it is executed.
SELECT SUM(t.quantity) FROM t group by t.company;
The query will be sliced into multiple plan fragments, each being an independent scheduling unit and probably with different parallelism. For simplicity, parallelism is usually set to the number of CPU cores in the cluster. For example, if there are 3 compute-nodes in the cluster, each with 4 CPU cores, then the parallelism will be set to 12 by default.
Each parallel unit is called a task. Specifically, PlanFragment 2 will be distributed as 4 tasks to 4 CPU cores.
Behind the TableScan operator, there’s a storage engine called Hummock that stores the internal states, materialized views, and the tables. Note that only the materialized views and tables are queryable. The internal states are invisible to users.
To know more about Hummock, you can check out “An Overview of RisingWave State Store”.
Streaming Mode
The other execution mode is the streaming mode. Users build streaming pipelines via CREATE MATERIALIZED VIEW statement. For example:
CREATE MATERIALIZED VIEW mv1 AS SELECT SUM(t.quantity) as q FROM t group by t.company;
When the data source (Kafka, e.g.) propagates a bunch of records into the system, the materialized view will refresh automatically.
Assume that we have a sequence [(2, "AMERICA"), (3, "ASIA"), (4, "AMERICA"), (5, "ASIA")]
. After the sequence flows through the DAG, the MV will be updated to:
| A | B |
|---|---|
| 6 | AMERICA |
| 8 | ASIA |
When another sequence [(6, "EUROPE"), (7, "EUROPE")]
comes, the MV will soon become:
| A | B |
|---|---|
| 6 | AMERICA |
| 8 | ASIA |
| 13 | EUROPE |
mv1 can also act as other MVs' source. For example, mv2 and mv3 can reuse the processing results of mv1, thus deduplicating the computation.
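For instance, a downstream view reusing mv1 could be created like this (the query itself is just an illustration):
# Create a downstream materialized view on top of mv1 (illustrative query)
psql -h localhost -p 4566 -d dev -U root \
  -c "CREATE MATERIALIZED VIEW mv2 AS SELECT * FROM mv1 WHERE q > 10;"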
The durability of materialized views in RisingWave is built upon a snapshot-based mechanism. Every time a snapshot is triggered, the internal states of each operator will be flushed to S3. Upon failover, the operator recovers from the latest S3 checkpoint.
Since the streaming states can be extremely large, so large that they cannot (or can only ineffectively) be held in memory in their entirety, we have designed Hummock to be highly scalable. Compared to Flink's RocksDB-based state store, Hummock is cloud-native and provides super elasticity.
For more details of our streaming engine, please refer to “An Overview of RisingWave Streaming Engine”.
An Overview of the RisingWave Streaming Engine
Overview
RisingWave provides real-time analytics to serve users' needs. This is done by defining materialized views (MV). All materialized views will be automatically refreshed according to recent updates, such that querying materialized views will reflect real-time analytical results. Such refreshing is carried out by our RisingWave streaming engine.
The core design principles of the RisingWave streaming engine are summarized as follows.
- Actor model based execution engine. We create a set of actors such that each actor reacts to its own input message, including both data update and control signal. In this way we build a highly concurrent and efficient streaming engine.
- Shared storage for states. The backbone of the state storage is based on shared cloud object storage (currently AWS S3), which gives us computational elasticity, cheap and infinite storage capacity, and simplicity during configuration change.
- Everything is a table, everything is a state. We treat every object in our internal storage as both a logical table and an internal state. Therefore, they can be effectively managed by catalog, and be updated in a unified streaming engine with consistency guarantee.
In this document we give an overview of the RisingWave streaming engine.
Architecture
The overall architecture of RisingWave is depicted in the figure above. In brief, RisingWave streaming engine consists of three sets of nodes: frontend, compute nodes, and meta service. The frontend node consists of the serving layer, handling users’ SQL requests concurrently. Underlying is the processing layer. Each compute node hosts a collection of long-running actors for stream processing. All actors access a shared persistence layer of storage (currently AWS S3) as its state storage. The meta service maintains all meta-information and coordinates the whole cluster.
When receiving a create materialized view statement at the frontend, a materialized view and the corresponding streaming pipeline are built in the following steps.
- Building a stream plan. Here a stream plan is a logical plan which consists of logical operators encoding the dataflow. This is carried out by the streaming planner at the frontend.
- Fragmentation. The stream fragmenter at the meta service breaks the generated logical stream plan into stream fragments, and duplicates such fragments when necessary. Here a stream fragment holds partial nodes from the stream plan, and each fragment can be parallelized by building multiple actors for data parallelization.
- Scheduling plan fragments. The meta service distributes different fragments into different compute nodes and let all compute nodes build their local actors.
- Initializing the job at the backend. The meta service notifies all compute nodes to start serving streaming pipelines.
Actors, executors, and states
Actors
Actors are the minimal unit to be scheduled in the RisingWave streaming engine, such that there is no parallelism inside each actor. The typical structure of an actor is depicted on the right of the figure above. An actor consists of three parts.
- Merger (optional). Each merger merges the messages from different upstream actors into one channel, such that the executors can handle messages sequentially. The merger is also in charge of aligning barriers to support checkpoints (details described later).
- A chain of executors. Each executor is the basic unit of delta computation (details described later).
- Dispatcher (optional). Each dispatcher will send its received messages to different downstream actors according to certain strategies, e.g. hash shuffling or round-robin.
The execution of actors is carried out by tokio async runtime. After an actor starts running, it runs an infinite loop in which it continuously runs async functions to generate outputs, until it receives a stop message.
Messages between two local actors are transferred via channels. For two actors located on different compute nodes, messages are re-directed to an exchange service. The exchange service will continuously exchange messages with each other via RPC requests.
Executors
Executors are the basic computational units in the streaming engine. Each executor responds to its received messages and computes an output message atomically, i.e., the computation inside each executor will not be broken down.
The underlying algorithmic framework of the RisingWave streaming system is the traditional change propagation framework. Given a materialized view to be maintained, we build a set of executors where each executor corresponds to a relational operator (including base table). When any of the base tables receive an update, the streaming engine computes the changes to each of the materialized views by recursively computing the update from the leaf to the root. Each node receives an update from one of its children, computes the local update, and propagates the update to its parents. By guaranteeing the correctness of every single executor, we get a composable framework for maintaining arbitrary SQL queries.
Checkpoint, Consistency, and Fault tolerance
We use the term consistency to denote the model of the completeness and correctness of querying materialized views. We follow the consistency model introduced in Materialize. More specifically, the system assures that the query result is always a consistent snapshot of a certain timestamp t before the query issued a timestamp. Also, later queries always get consistent snapshots from a later timestamp. A consistent snapshot at t requires that all messages no later than t are reflected in the snapshot exactly once while all messages after t are not reflected.
Barrier based checkpoint
To guarantee consistency, RisingWave introduces a Chandy-Lamport style consistent snapshot algorithm as its checkpoint scheme.
This procedure guarantees that every state to be flushed into the storage is consistent (matching a certain barrier at the source). Therefore, when querying materialized views, consistency is naturally guaranteed when the batch engine reads a consistent snapshot (of views and tables) on the storage. We also call each barrier an epoch and sometimes use both terms interchangeably as data streams are cut into epochs. In other words, the write to the database is visible only after it has been committed to the storage via the checkpoint.
To improve efficiency, all dirty states on the same compute node are gathered into a shared buffer, and the compute node asynchronously flushes the whole shared buffer into a single SST file in the storage, so that the checkpoint procedure does not block stream processing.
See more detailed descriptions on Checkpoint.
Fault tolerance
When the streaming engine crashes, the system must globally roll back to a previous consistent snapshot. To achieve this, whenever the meta service detects the failure of a compute node or of an ongoing checkpoint procedure, it triggers a recovery process. After rebuilding the streaming pipeline, each executor resets its local state from a consistent snapshot on the storage and recovers its computation.
Checkpoint
Revisit: Consistency Model
Similar to other relational databases, RisingWave provides consistent snapshot reads on both tables and materialized views. Specifically,
- Consistency. Given a query Q and the latest event timestamp t, the system returns a result Q(t′) such that Q(t′) is consistent on a timestamp t′≤t , i.e. evaluating Q over a snapshot S(t′) from a previous timestamp t′≤t, where S(t′) is the set of all tuples presented before timestamp t′. That is, the system delivers a query result that is consistent with a previous timestamp t′.
- Monotonicity. For any two timestamps t1 and t2 such that t1<t2, assume the result for Q on t1 and t2 are consistent with snapshots on t1′ and t2′ respectively, then t1′<t2′. That is, later queries should not return results that correspond to earlier snapshots.
Note that RisingWave does not guarantee that a write must be visible to subsequent reads, a.k.a. read-after-write consistency. Users may use the FLUSH
command to make sure the changes have taken effect before reads.
Internally, incoming changes may take a while to propagate from sources to materialized views, and at least one barrier event is required to flush the changes. These two kinds of latency together determine the latency between a write and a read.
Streaming Checkpoint
The consistent checkpoints play 2 roles in our system.
- Fault-tolerance. To recover the cluster from an unexpected failure, every stateful streaming operator needs to recover its state from a consistent checkpoint.
- Consistent snapshot. The snapshot to be read is actually the latest completed checkpoint. As discussed in the previous section, this is required to guarantee that the data of tables & materialized views are consistent within one snapshot.
RisingWave performs checkpointing via the Chandy-Lamport algorithm. A special kind of message, the checkpoint barrier, is generated at the streaming sources and propagates across the streaming graph to the materialized views (or sinks).
In particular, RisingWave periodically (every barrier_interval_ms
) repeats the following procedure:
- The meta service initializes a barrier and broadcasts it to all source actors across the streaming engine.
- The barrier messages go through every streaming operator (actor) in the streaming graph.
  - For fan-out operators like `Dispatch`, the barrier messages are copied to every downstream.
  - For fan-in operators like `Merge` or `Join`, the barrier messages are collected and emitted only once they have been collected from all upstreams.
  - For other operators, the barrier messages flow through the operators and trigger a checkpoint operation on them, namely flushing their changes into storage.
- When all dirty states from a compute node are flushed to storage, the compute node sends a finish signal to the meta service.
- After receiving the finish signal from all compute nodes, the meta service tells the storage manager to commit the checkpoint and finishes the checkpoint procedure.
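The fan-in behavior above (collect from all upstreams, then emit) is the core of barrier alignment. Below is a minimal, hedged sketch of that bookkeeping; `BarrierAligner` is an illustrative name, not the real implementation.

```rust
// Illustrative bookkeeping for barrier alignment in a fan-in operator:
// a barrier is emitted downstream only once it has been collected from every upstream.
use std::collections::HashSet;

struct BarrierAligner {
    num_upstreams: usize,
    collected: HashSet<usize>, // upstream indices that have delivered the current barrier
}

impl BarrierAligner {
    fn new(num_upstreams: usize) -> Self {
        Self { num_upstreams, collected: HashSet::new() }
    }

    /// Returns true when the barrier has been collected from all upstreams,
    /// i.e. it can be emitted downstream and the local state can be flushed.
    fn on_barrier(&mut self, upstream_id: usize) -> bool {
        self.collected.insert(upstream_id);
        if self.collected.len() == self.num_upstreams {
            self.collected.clear();
            true
        } else {
            false
        }
    }
}
```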
Checkpoint on Storage
As mentioned before, during checkpointing, every operator writes the changes of this epoch into storage. For the storage layer, these data are still uncommitted, i.e. not persisted to the shared storage. However, the data still need to be visible to that operator locally.
A local shared buffer is introduced to stage these uncommitted write batches. Once the checkpoint barriers have passed through all actors, the storage manager can notify all compute nodes to 'commit' their buffered write batches into the shared storage.
Another benefit of shared buffer is that the write batches in a compute node can be compacted into a single SSTable file before uploading, which significantly reduces the number of SSTable files in Layer 0.
Aggregations
We will cover internal implementation of common aggregations in this document.
Frontend
TODO
Expression Framework
TODO
HashAggExecutor
Within the `HashAggExecutor`, there are 4 main components:
- AggCalls.
- AggState.
- AggGroups.
- Persisted State.
AggCalls are the aggregation calls for the query. For instance, `SUM(v1)` and `COUNT(v2)` have the AggCalls `SUM` and `COUNT` respectively.
AggState is the state we use to compute the result (output) of the aggregation call. Within each aggregation group, there is an AggState for each AggCall.
AggGroups are created per aggregation group. For instance, with `GROUP BY x1, x2`, there will be a group for each unique combination of `x1` and `x2`.
Whenever stream chunks come in, the executor will update the aggregation state for each group, per agg call.
On barrier, we will persist the in-memory states.
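The following is a highly simplified sketch of this flow for a single `SUM`/`COUNT` query; the struct and method names are illustrative and omit the real executor's chunk handling, state tables, and caches.

```rust
// A highly simplified sketch of the flow above for a SUM(v) and COUNT(v) query:
// group key -> per-group aggregation state, updated per chunk and persisted on barrier.
use std::collections::HashMap;

#[derive(Default)]
struct AggState {
    sum: i64,   // state for the SUM(v) AggCall
    count: i64, // state for the COUNT(v) AggCall
}

#[derive(Default)]
struct SimplifiedHashAgg {
    groups: HashMap<Vec<i64>, AggState>, // one AggGroup per unique group key
}

impl SimplifiedHashAgg {
    /// Apply one stream chunk, represented here as (group key, value) rows.
    fn apply_chunk(&mut self, rows: &[(Vec<i64>, i64)]) {
        for (group_key, value) in rows {
            let state = self.groups.entry(group_key.clone()).or_default();
            state.sum += *value;
            state.count += 1;
        }
    }

    /// On barrier: the in-memory states would be written to the state table(s).
    fn on_barrier(&self) {
        for (group_key, state) in &self.groups {
            println!("persist group {:?}: sum={}, count={}", group_key, state.sum, state.count);
        }
    }
}
```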
For `value` type aggregations, we will persist the state to the intermediate state table. This state table stores all value aggregations per group on a single row.
For `MaterializedInput` type aggregations, which require tracking input state (for example, non-append-only min/max), each aggregation has one state table (`AggStateStorage::MaterializedInput`). Within that state table, the input state is stored for each group.
Initialization of AggGroups
AggGroups are initialized when the corresponding aggregation groups are not found in the `AggGroupCache`. This could be either because the entry was evicted from the `AggGroupCache`, or because it is a new group key. It can take a while to initialize agg groups, hence we cache them in the `AggGroupCache`.
MView on Top of MView
Background
RisingWave supports creating a new materialized view (abbreviated as mview) based on the source and another mview, so users can split their data into multiple layers and use mviews’ chains to connect them.
In detail, we will support the creation of a materialized view whose source is one or more other mviews. Please note that there must not be a circular dependency among mviews.
create table t1 (v1 int, deleted boolean);
create materialized view mv1 as select * from t1 where deleted = false;
create materialized view mv2 as select sum(v1) as sum_v1 from mv1;
create materialized view mv3 as select count(v1) as count_v1 from mv1;
Design
Broadcast operator
In physical representation, we introduce a dispatcher operator type, Broadcast. The Broadcast dispatcher, as its name indicates, will dispatch every message to multiple downstreams. To simplify our design, we can assume that every MViewOperator has a `Broadcast` output, with zero or more downstreams.
Create new mview online
Assume that we already have a materialized view mv1, and we want to create a new materialized view mv2 based on mv1. This is equivalent to a configuration change to Broadcast dispatcher. Before the real change happens, we have to apply the snapshot of mv1 to mv2 first. Therefore, we introduce another operator named Chain.
The Chain operator has two inputs. The first one will be a batch query, denoted by the blue patterns in the figure below, which is a finite append-only stream (the snapshot of historical data in the base mview). The second one is its original input, an infinite stream, denoted by the red patterns.
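Conceptually, the Chain operator behaves like chaining a finite snapshot stream in front of the upstream stream. A minimal sketch, with plain iterators standing in for the actual streams, looks like this:

```rust
// An illustrative sketch of the Chain behavior with plain iterators standing in
// for the two inputs: drain the finite snapshot first, then continue with upstream.
fn chain_outputs<T>(
    snapshot: impl Iterator<Item = T>, // finite: the historical snapshot of the base mview
    upstream: impl Iterator<Item = T>, // unbounded in practice: buffered realtime updates
) -> impl Iterator<Item = T> {
    // Emit every snapshot row first (until its implicit EOF), then the upstream messages.
    snapshot.chain(upstream)
}
```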
The full process of creation is:
- The frontend parses the query and sends the plan to StreamManager.
- StreamManager creates the new actors.
- StreamManager chooses a change epoch e1, pins a snapshot of mv1 at e1, and sends a barrier with e1 and change info.
- The broadcast operator receives the barrier, creates a SnapshotStream of mv1 at e1, creates a Chain operator, and connects them all (these are only in-memory changes).
- The broadcast operator sends a normal barrier e1 to all downstreams, and continues.
- The Chain operator consumes all messages from the snapshot until it receives EOF, then consumes the buffered messages from upstream.
- StreamManager finds that mv2 has almost caught up with the progress of mv1, and the creation succeeds.
Drop mview online
Assume that we already have three materialized views mv1, mv2, and mv3. mv2 and mv3 are on top of mv1, so mv1 is not allowed to be dropped.
The full process of drop mv3 is:
- The frontend parses the query and sends the plan to StreamManager.
- StreamManager chooses a change epoch e1, and sends a barrier with e1 and change info.
- The broadcast operator sends a normal barrier e1 to all downstreams.
- The broadcast operator removes the dropped output from its outputs, and continues.
- StreamManager finds that mv3 has reached epoch e1, then drops the extra fragments physically.
Backfill
Backfill is used by various components of our system to merge historical data and realtime data stream.
There are many variants to it, and we will discuss them in the following sections.
Backfilling 101
Motivation
When creating a Materialized View on top of another one, we need to fetch all the historical data; only then can we start processing the realtime updates and applying them to the stream.
However, in the process of doing so, we need to either:
- Buffer all the updates.
- Block the upstream from sending updates (i.e. pause the entire stream graph).
For the first option, it is not feasible to buffer all the updates when the historical data takes a while to process; buffering all the updates can cause OOM.
The second option is also not feasible, as it blocks the entire stream graph whenever we create a new materialized view, until processing of the historical data is done.
So we need a way to merge historical data and realtime updates, without blocking the upstream. This can be done by Backfill.
How it works
Backfilling is the process of merging historical data and update stream into one.
Consider the following example.
We have the following table with 1M records:
CREATE TABLE t (
id INT PRIMARY KEY,
name VARCHAR
);
Then we create a materialized view from that table:
CREATE MATERIALIZED VIEW mv AS SELECT * FROM t;
In one epoch `B`, we read the historical data from `t` up to row `2`, from data in the previous epoch `A`:
op | id | name |
---|---|---|
+ | 1 | a |
+ | 2 | b |
Note that the `op` column is the operation type, and `+` means `insert`.
We use `insert` since all the historical data are just inserts to the downstream materialized view.
In that same epoch `B`, suppose there are some updates to the table `t` due to `DML` statements being run:
op | id | name |
---|---|---|
+ | 4 | d |
- | 1 | a |
.. | .. | .. |
- | 99 | zzz |
+ | 100 | zzzz |
The same updates will then be propagated to `mv` in epoch `B`.
Since we backfilled the historical data up to row `2`, we only need to apply the updates up to row `2`.
So downstream will just receive:
- The historical data up to row `2`:

  | op | id | name |
  |---|---|---|
  | + | 1 | a |
  | + | 2 | b |

- The realtime delta stream up to row `2`:

  | op | id | name |
  |---|---|---|
  | - | 1 | a |
So we didn't need to buffer all the updates until the historical data is completely processed. Instead, at each epoch, we just read some historical data and apply any relevant updates on them.
To ensure we always make progress, we keep track of the last row we backfilled to, and continue from after that row in the next epoch.
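A small sketch of this bookkeeping is shown below; `BackfillProgress` is a hypothetical helper that only captures the rule described above, namely that an update is forwarded iff the row it touches has already been backfilled.

```rust
// A hypothetical helper capturing the backfill progress rule described above.
#[derive(Default)]
struct BackfillProgress {
    // Largest primary key that has been read from the historical snapshot so far.
    backfilled_pos: Option<i64>,
}

impl BackfillProgress {
    /// Record that the snapshot read of this epoch ended at `pk`.
    fn advance(&mut self, pk: i64) {
        self.backfilled_pos = Some(pk);
    }

    /// Updates beyond the backfilled position are dropped here, because a later
    /// epoch's snapshot read will already contain their effects.
    fn should_forward(&self, update_pk: i64) -> bool {
        matches!(self.backfilled_pos, Some(pos) if update_pk <= pos)
    }
}
```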
In the following sections, we will delve into specific types of backfill.
References
RFC: Use Backfill To Let Mv On Mv Stream Again
NoShuffle Backfill
This section will mainly discuss the implementation of NoShuffle Backfill, as the concept is as described above.
This backfill executor is the precursor to arrangement backfill.
It is used to backfill RisingWave Materialized Views, Tables and Indexes.
NoShuffleBackfill
executor receives upstream updates via a NoShuffleDispatcher
.
For historical data, it uses batch scan to read snapshot data using the StorageTable
interface.
Using the NoShuffleDispatcher
means that the actors in the scan fragment
need to be scheduled to the same parallel unit
as the actors in the dispatcher.
This also means that the parallelism of NoShuffleBackfill
is coupled with
its upstream fragment.
When scaling, we can’t scale backfill independently as a result, only together with the upstream fragment.
Another issue with NoShuffleBackfill
is the way backfill state is stored.
Backfill executor stores a single latest state across all vnodes it has:
vnode | pk_offset |
---|---|
0 | 5 |
1 | 5 |
2 | 5 |
This means that if we scale in or out, the state may not be accurate, because the actual state, if partitioned per vnode, could be:
vnode | pk_offset |
---|---|
0 | 1 |
1 | 2 |
2 | 5 |
If we ran some scaling operations, and got:
vnode | pk_offset |
---|---|
3 | 0 |
1 | 5 |
2 | 5 |
It would be unclear which `pk_offset` to resume from.
In the next iteration of backfill, we have ArrangementBackfill
which solves these issues.
Arrangement Backfill
ArrangementBackfill
is the next iteration of NoShuffleBackfill
executor.
Similarly, it is used to backfill RisingWave Materialized Views, Tables and Indexes.
The main goal of ArrangementBackfill
is to scale its parallelism independently of the upstream fragment.
This is done with replication
.
Differences with NoShuffleBackfill
First, let’s discuss the key differences in components.
Side | NoShuffleBackfill | ArrangementBackfill |
---|---|---|
Upstream | NoShuffleDispatcher | HashDispatcher |
Historical | Scan on StorageTable | Scan on Replicated StateTable |
For the upstream part, it’s pretty straightforward.
We use a HashDispatcher
to dispatch updates to the backfill executor,
since ArrangementBackfill
can be on a different parallel unit than its upstream fragment.
For the historical part,
we use a Replicated StateTable
to read historical data, and replicate the shared buffer.
Arrangement Backfill Frontend
Arrangement Backfill is constructed in the following phases in the optimizer:
(1) LogicalScan -> (2) StreamTableScan -> (3) PbStreamScan (MergeNode, ..)
From 2 to 3, we will compute the output_indices
(A) from upstream to backfill,
and the output_indices
(B) from backfill to downstream.
(B) will always be a subset of (A). The full PK is needed for backfilling, but it is not always needed after that.
For example, consider the following queries.
create table t1(id bigint primary key, i bigint);
create materialized view mv1 as select id, i from t1 order by id, i;
create materialized view mv2 as select id from mv1;
mv1
has the following plan:
StreamMaterialize { columns: [id, i], stream_key: [id], pk_columns: [id, i], pk_conflict: NoCheck }
└─StreamTableScan { table: t1, columns: [id, i] }
mv2
has the following plan:
StreamMaterialize { columns: [id], stream_key: [id], pk_columns: [id], pk_conflict: NoCheck }
└─StreamTableScan { table: mv1, columns: [mv1.id], stream_scan_type: ArrangementBackfill, pk: [mv1.id], dist: UpstreamHashShard(mv1.id) }
(2 rows)
Notice how mv2
only needs the id
column from mv1
, and not the full pk
with i
.
Backfill logic
Overview
For `ArrangementBackfill`, we have 2 streams which we merge: the upstream and historical streams. Upstream will be given precedence, to make sure barriers can flow through the stream graph. Additionally, at every epoch we refresh the historical stream, because the upstream data gets checkpointed and our snapshot becomes stale.
We will poll from this stream in backfill to get upstream and historical data chunks for processing, as well as barriers to checkpoint to backfill state.
For each chunk (DataChunk / StreamChunk), we may also need to do some further processing based on their schemas.
Schemas
There are 3 schemas to consider when processing the backfill data:
- The state table schema of upstream.
- The output schema from upstream to arrangement backfill.
- The output schema from arrangement backfill to its downstream.
For chunks coming from upstream (whether historical or directly from the dispatcher), we will need to transform them from (2) to (3) before yielding the chunks downstream.
For chunks being replicated to the replicated state table, we need to transform them from (2) to (1), so they match the upstream state table schema. Otherwise, deserialization for these replicated records will fail, due to a schema mismatch.
For chunks being read from the replicated state table, we must transform them from (1) to (2), to ensure the historical side and the upstream side have a consistent schema.
Polling loop
If we poll a chunk from the historical side, we will yield it to the downstream, and update the primary key (pk) we have backfilled to in the backfill state.
If we poll a chunk from the upstream side, we will buffer it.
This is because we should only forward updates for historical data that we have already backfilled.
We can just do that at the end of the epoch, which is when we receive a barrier.
We will also replicate it by writing it to the ReplicatedStateTable
.
If we poll a barrier from the upstream side, we will need to flush the upstream chunk buffer. First, we transform the schema of the upstream chunk buffer from (2) to (3). Next, we flush the records whose pk is lower than or equal to the pk we have backfilled to. Finally, we build a new snapshot stream to read historical data in the next epoch.
Then the polling loop will continue.
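The skeleton below sketches one step of that loop under simplified assumptions (rows are plain `(pk, value)` pairs, schema transformation and replication are omitted); the event enum and function are illustrative, not the actual executor code.

```rust
// A schematic sketch of one polling-loop step; not the actual executor code.
enum BackfillEvent {
    HistoricalChunk(Vec<(i64, String)>),
    UpstreamChunk(Vec<(i64, String)>),
    Barrier,
}

/// Returns the rows to yield downstream for this event.
fn backfill_step(
    event: BackfillEvent,
    upstream_buffer: &mut Vec<(i64, String)>,
    backfilled_pos: &mut i64,
) -> Vec<(i64, String)> {
    match event {
        BackfillEvent::HistoricalChunk(chunk) => {
            // Yield historical rows downstream and advance the backfill position.
            if let Some((pk, _)) = chunk.last() {
                *backfilled_pos = *pk;
            }
            chunk
        }
        BackfillEvent::UpstreamChunk(chunk) => {
            // Buffer upstream updates (the real executor also replicates them) until the barrier.
            upstream_buffer.extend(chunk);
            Vec::new()
        }
        BackfillEvent::Barrier => {
            // Flush only buffered updates at or below the backfilled pk; the rest are dropped,
            // since the next epoch's snapshot read will already include their effects.
            let flushed: Vec<(i64, String)> = upstream_buffer
                .iter()
                .filter(|(pk, _)| *pk <= *backfilled_pos)
                .cloned()
                .collect();
            upstream_buffer.clear();
            flushed
        }
    }
}
```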
Replication
Previously, when doing snapshot reads to read historical data, the backfill executor was able to read from the shared buffer of the previous epoch, because it was scheduled to the same parallel unit as the upstream.
However, with ArrangementBackfill
,
we can’t rely on the shared buffer of upstream,
since it can be on a different parallel unit.
So we need to make sure for the previous epoch, we buffer its updates somewhere to replicate the shared buffer.
Then we can merge the shared buffer, with the current checkpointed data to get the historical data for that epoch.
To replicate the shared buffer, we simply create a `ReplicatedStateTable`. This will just store the `ReadVersions` but never upload them to the Object Store. Then the `StateTable`'s logic will take care of merging the shared buffer and the committed data in the Object Store for us.
Example: Read / Write Paths Replicated Chunk
Recall from the above section on schemas:
For chunks being replicated to the replicated state table, we need to transform them from (2) to (1), so they match the upstream state table schema.
For chunks being read from the replicated state table, we must transform them from (1) to (2), to ensure the historical side and the upstream side have a consistent schema.
Where (1) refers to the state table schema of upstream, and (2) refers to the output schema from upstream to arrangement backfill.
Now let’s consider an instance where (1) has the schema:
| id | name | age | drivers_license_id |
|---|---|---|---|

And (2) has the schema:

| drivers_license_id | name | id |
|---|---|---|
Consider if we have the following chunk being replicated to the `ReplicatedStateTable`:
drivers_license_id | name | id |
---|---|---|
1 | ‘Jack’ | 29 |
We will transform it to the schema of (1), and insert it into the `ReplicatedStateTable`:
id | name | age | drivers_license_id |
---|---|---|---|
29 | ‘Jack’ | NULL | 1 |
This will then be serialized into kv pairs and written to the state store.
Subsequently, when reading from the state store to get historical data, we deserialize the kv pairs, merging the shared buffer with the committed data in the Object Store.
Let’s say we got this back:
id | name | age | drivers_license_id |
---|---|---|---|
29 | ‘Jack’ | NULL | 1 |
30 | ‘Jill’ | 30 | 2 |
Then we will transform it to the schema of (2), and arrangement backfill will consume this historical data snapshot:
drivers_license_id | name | id |
---|---|---|
1 | ‘Jack’ | 29 |
2 | ‘Jill’ | 30 |
Initialization
Something to note is that for the first snapshot, upstream may not have finished committing the data of that epoch to S3. Additionally, we have not replicated any upstream records during that epoch, only in the subsequent ones. As such, we must wait for that first checkpoint to be committed before reading, or we risk missing the uncommitted data in our backfill.
This is supported internally inside `init_epoch` for the replicated state table.
upstream_table.init_epoch(first_epoch).await?;
Recovery
TODO
Scaling
TODO
Further improvements
- Make backfill vnode level within each partition: https://github.com/risingwavelabs/risingwave/issues/14905
Cdc Backfill
TODO
Source Backfill
TODO
An Overview of RisingWave State Store
Overview
In RisingWave, all streaming executors store their data into a state store. This KV state store is backed by a service called Hummock, a cloud-native LSM-Tree-based storage engine. Hummock provides a key-value API and stores all data on an S3-compatible service. However, it is not a general-purpose KV store, but a storage engine co-designed with the RisingWave streaming engine and optimized for streaming workloads.
Architecture
Reading this document requires prior knowledge of LSM-Tree-based KV storage engines, like RocksDB, LevelDB, etc.
Hummock consists of a manager service on the meta node, clients on worker nodes (including compute nodes, frontend nodes, and compactor nodes), and a shared storage to store files (SSTs). Every time a new write batch is produced, the Hummock client will upload those files to shared storage, and notify the Hummock manager of the new data. With compaction going on, new files will be added and unused files will be vacuumed. The Hummock manager will take care of the lifecycle of a file — is a file being used? can we delete a file? etc.
The streaming state store has distinguished workload characteristics.
- Every streaming executor will only read and write its own portion of data.
- Data (generally) won't be shared across nodes, so every worker node will only read and write its own data. Therefore, every Hummock API, like `get` or `scan`, only guarantees that writes on one node can be immediately read from the same node. In some cases, if we want to read data written from other nodes, we will need to wait for the epoch.
- Streaming data are committed in serial. Based on the barrier-based checkpoint algorithm, the states are persisted epoch by epoch. We can tailor the write path specifically for the epoch-based checkpoint workload.
This leads to the design of Hummock, the cloud-native KV-based streaming state store. We’ll explain concepts like “epoch” and “barrier” in the following chapters.
The Hummock User API
In this part, we will introduce how users can use Hummock as a KV store.
Hummock itself provides 3 simple APIs: `ingest_batch`, `get`, and `scan`. Hummock provides MVCC writes and reads on KV pairs. Every key stored in Hummock has an epoch (aka. timestamp). Developers should specify an epoch when calling Hummock APIs.
Hummock doesn't support writing a single key. To write data into Hummock, users should provide a sorted, unique list of keys and the corresponding operations (put value, delete) with an epoch, and call the `ingest_batch` API. Therefore, within one epoch, users can only have one operation for a key. For example,
[a => put 1, b => put 2] epoch = 1 is a valid write batch
[a => put 1, a => delete, b => put 2] epoch = 1 is an invalid write batch
[b => put 1, a => put 2] epoch = 1 is an invalid write batch
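As an illustration only, a client-side usage of these three APIs might look like the sketch below. The `KvStateStore` trait and its signatures are simplified placeholders, not the actual Hummock interface.

```rust
// An illustrative client-side view of the three APIs named above.
type Key = Vec<u8>;
type Value = Vec<u8>;

enum WriteOp {
    Put(Value),
    Delete,
}

trait KvStateStore {
    fn ingest_batch(&mut self, batch: Vec<(Key, WriteOp)>, epoch: u64);
    fn get(&self, key: &[u8], read_epoch: u64) -> Option<Value>;
    fn scan(&self, start: &[u8], end: &[u8], read_epoch: u64) -> Vec<(Key, Value)>;
}

fn write_then_read(store: &mut impl KvStateStore) {
    // Keys in a batch must be sorted and unique, with exactly one operation per key.
    store.ingest_batch(
        vec![
            (b"a".to_vec(), WriteOp::Put(b"1".to_vec())),
            (b"b".to_vec(), WriteOp::Put(b"2".to_vec())),
        ],
        1, // epoch
    );
    // Reads specify the epoch of the snapshot they want to observe.
    let _a = store.get(b"a", 1);
    let _range = store.scan(b"a", b"c", 1);
}
```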
For reads, we can call the `scan` and `get` APIs on the Hummock client. Developers need to specify a read epoch for read APIs. Hummock only guarantees that writes on one node can be immediately read from the same node. Let's take a look at the following example:
Node 1: write a => 1, b => 2 at epoch 1
Node 1: write a => 3, b => 4 at epoch 2
Node 2: write c => 5, d => 6 at epoch 2
After all operations have been done,
Read at epoch 2 on Node 1: a => 3, b => 4, (c => 5, d => 6 may be read)
Read at epoch 1 on Node 1: a => 1, b => 2
Read at epoch 2 on Node 2 with `wait_epoch 2`: a => 3, b => 4, c => 5, d => 6
Hummock Internals
In this part, we will discuss how data are stored and organized in Hummock internally. If you will develop Hummock, you should learn some basic concepts, like SST, key encoding, read / write path, consistency, from the following sections.
Storage Format
All key-value pairs are stored in block-based SSTables. Each user key is associated with an epoch. In SSTs, key-value pairs are sorted first by user key (lexicographical order), and then by epoch (largest to smallest).
For example, if users write two batches consecutively:
write a => 1, b => 2 at epoch 1
write a => delete, b => 3 at epoch 2
After compaction (w/ min watermark = 0), there will eventually be an SST with the following content:
(a, 2) => delete
(a, 1) => 1
(b, 2) => 3
(b, 1) => 2
The final written key (aka. full key) is encoded by appending the 8-byte epoch after the user key. When doing full key comparison in Hummock, we should always compare full keys using the `KeyComparator` to get the correct result.
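The comparator semantics described above (user key ascending, then epoch from largest to smallest) can be sketched as follows; this is an illustrative function, not the actual `KeyComparator`.

```rust
// Illustrative full-key ordering: compare user keys lexicographically first,
// then order epochs from largest to smallest so newer versions sort first.
use std::cmp::Ordering;

fn compare_full_key(
    (user_key_a, epoch_a): (&[u8], u64),
    (user_key_b, epoch_b): (&[u8], u64),
) -> Ordering {
    user_key_a
        .cmp(user_key_b)
        // Reversed epoch comparison: the larger epoch sorts earlier.
        .then_with(|| epoch_b.cmp(&epoch_a))
}
```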
Write Path
The Hummock client will batch writes and generate SSTs to sync to the underlying S3-compatible service. An SST consists of two files:
- `.data`: Data file composed of ~64KB blocks, each of which contains the actual key-value pairs.
- `.meta`: Meta file containing large metadata, including the min-max index and Bloom filter, as well as data block metadata.
After the SST is uploaded to an S3-compatible service, the Hummock client will let the Hummock manager know there’s a new table. The list of all SSTs along with some metadata forms a version. When the Hummock client adds new SSTs to the Hummock manager, a new version will be generated with the new set of SST files.
Read Path
To read from Hummock, we need a version (a consistent state of list of SSTs we can read) and epoch to generate a consistent read snapshot. To avoid RPC with the Hummock manager in every user read, the Hummock client will cache a most recent version locally. The local version will be updated when 1) the client initiates a write batch and 2) the background refresher triggers.
For every read operation (`scan`, `get`), we will first select SSTs that might contain the required keys.
For `scan`, we simply select by overlapping key range. For point get, we will filter SSTs further by Bloom filter. After that, we will compose a single `MergeIterator` over all SSTs. The `MergeIterator` will return all keys in range along with their epochs. Then, we will create a `UserIterator` over the `MergeIterator`, and for all user keys, the user iterator will pick the first full key whose epoch <= read epoch. Therefore, users can perform a snapshot read from Hummock based on the given epoch. The snapshot should be acquired beforehand and released afterwards.
Hummock implements the following iterators:
- `BlockIterator`: iterates a block of an SSTable.
- `SSTableIterator`: iterates an SSTable.
- `ConcatIterator`: iterates SSTables with non-overlapping key ranges.
- `MergeIterator`: iterates SSTables with overlapping key ranges.
- `UserIterator`: wraps internal iterators and outputs user key-value pairs with epoch <= read epoch.
Compaction
Currently, Hummock is using a compaction strategy similar to leveled-compaction in RocksDB. It will compact data using consistent hash (docs and implementation TBD), so that data on shared storage distribute in the same way as how stream executors use them.
Compaction is done on a special worker node called compactor node. The standalone compactor listens for compaction jobs from the meta node, compacts one or more SSTs into new ones, and reports completion to the meta node. (In Hummock in-memory mode, compactor will be running as a thread inside compute node.)
To support MVCC read without affecting compaction, we track the epoch low watermark in Hummock snapshots. A user key-value pair will be retained if (1) it is the latest, or (2) it belongs to an epoch above the low watermark.
Transaction Management with Hummock Manager
source code of Hummock manager on meta service
In this part, we discuss how Hummock coordinates between multiple compute nodes. We will introduce key concepts like “snapshot”, “version”, and give examples on how Hummock manages them.
Every operation on the LSM-tree yields a new version on the Hummock manager, e.g., adding new L0 SSTs and compactions. In streaming, each stream barrier is associated with an epoch. When the barrier flows across the system and is collected by the stream manager, we can start checkpointing this epoch. SSTs produced in a single checkpoint are associated with an uncommitted epoch. After all compute nodes flush shared buffers to shared storage, the Hummock manager considers the epoch committed. Therefore, apart from the list of files in the LSM, a version also contains the committed epoch number `max_committed_epoch` and the SSTs in uncommitted epochs. As a result, both an operation on the LSM and a streaming checkpoint will yield a new version in the Hummock manager.
Currently, there is only one checkpoint happening in the system at the same time. In the future, we might support more checkpoint optimizations including concurrent checkpointing.
As mentioned in Read Path, reads are performed on a version based on a given epoch. During the whole read process, data from the specified read epoch cannot be removed by compaction, which is guaranteed by pinning a snapshot; SSTs within a version cannot be vacuumed by compaction, which is guaranteed by pinning a version.
The SQL frontend will get the latest epoch from the meta service. Then, it will embed the epoch number into SQL plans, so that all compute nodes will read from that epoch. In theory, both the SQL frontend and the compute nodes should pin the snapshot, to handle the case where the frontend goes down while the compute nodes are still reading from Hummock (#622). However, to simplify the process, we currently only pin on the frontend side.
Hummock only guarantees that writes on one node can be immediately read from the same node. However, the worker nodes running batch queries might have a slightly outdated version when a batch query plan is received (due to the local version caching). Therefore, we have a `wait_epoch` interface to wait until the locally cached version contains the full data of one epoch.
When there is no reference to a version, all file deletions in this version can be actually applied. There is a background vacuum task dealing with the actual deletion.
Checkpointing in Streaming
Now we discuss how streaming executors and the streaming manager use Hummock as a state store.
From the perspective of the streaming executors, when they receive a barrier, they will be "in the new epoch". Data received in epoch 1 will be persisted (as write batches) with epoch 1. Receiving the barrier also causes the read and write epochs to be set to the new one.
Here we have two cases: Agg executors always persist and produce new write batches when receiving a barrier; Join executors (in the future when async flush gets implemented) will produce write batches within an epoch.
Streaming executors cannot control when data will be persisted; they can only write to Hummock's `shared buffer`. When a barrier flows across the system and is collected by the meta service, we can ensure that all executors have written their states of the previous epoch to the shared buffer, so we can initiate the checkpoint process on all worker nodes and upload SSTs to persistent remote storage.
For example, the barrier manager sends barrier epoch = 2. When the epoch 2 barrier is collected on meta service, we can ensure that data prior to epoch 2 have been fully flushed to the Hummock shared buffer. (Note that epoch number in streaming is generated by machine time + serial number, so we cannot simply use +1 -1 to determine the epoch of the previous / next barrier.) Assuming the previous barrier is of epoch 1, we can start checkpointing data from epoch 1 after barrier of epoch 2 has been collected.
The Hummock Shared Buffer
Introduction
Note: L0 and Shared buffer are totally different. They both exist independently.
The Hummock Shared Buffer serves 3 purposes:
- Batch writes at the worker-node level, so as to reduce the number of SSTs.
  - Currently, a single epoch might produce hundreds of SSTs, which makes the meta service hard to handle.
- Support async checkpoint.
  - The shared buffer will generate SSTs based on epoch, and provide a consistent view of an epoch by merging the snapshot of the storage SSTs and the immutable in-memory buffers.
- Support read-after-write (so-called async flush), so as to make executor logic simpler. Currently, if executors need to compute the "maximum" value, there are only two ways:
  - Produce a write batch (i.e. write directly to the object store), and read from the storage (like ExtremeState, i.e. the state of `MAX()`/`MIN()`).
  - Write to an in-memory flush buffer, and merge data from the flush buffer and storage (like TopN, HashJoin).
Part 1: Async Checkpoint
Previously, when an executor processed a barrier, it flushed its content to Hummock using the write_batch interface. But it turns out that we have so many executors that a single epoch might produce 200~300 write batches, which yields 200~300 SSTs. This number of SSTs is overwhelming for an LSM engine.
The first idea is to batch writes. For example, we can collect all write batches from a single epoch, and flush them as one SST to the storage engine. However, it is not as simple as it sounds: the downstream executor might rely on the upstream barrier to process data.
See the following example:
┌─────────────┐ ┌─────────────┐
│ │ │ │
│ HashAgg ├──────► XXXExecutor │
│ │ │ │
└─────────────┘ └─────────────┘
When HashAgg is processing ExtremeState, it needs to "produce a write batch, and read from the storage". Therefore, it cannot produce data for the current epoch until the data are fully checkpointed. However, we want to batch writes and checkpoint them at once, which is a contradiction. Therefore, the only way to solve this is to add a memtable and make the checkpoint asynchronous.
The async checkpoint logic will be implemented with StreamManager (on meta) and HummockLocalManager (on CN), and we will only have immutable memtables in this part.
Write Path
Assume there is a new barrier flowing across our system.
When each executor consumes a barrier, it will produce a write batch (which should complete immediately) and forward the barrier.
After all executors have completed processing the barrier, a checkpoint will be initiated, and a new SST of the previous epoch will be produced. As our system only has one barrier flowing at a time, we can simply checkpoint epoch 3 after the stream manager collects barrier 4.
Read Path
As executors each own their own keyspace, the read path doesn't need snapshot isolation. We can simply view all committed SSTs and memtables as a "HummockVersion".
When the barrier is being processed (but not yet checkpointed), the read set is simply all memtables on the current worker node plus all SSTs on shared storage.
When a checkpoint is being processed, things become a little more complex. The HummockVersion might contain both committed SSTs and SSTs being checkpointed. In this case, the read set is:
- All SSTs (including the uncommitted ones), but with the read epoch set to the previous checkpoint (in this case, < epoch 3, or simply <= epoch 2). Therefore, all epochs being checkpointed will be ignored from shared storage.
- All memtables of the checkpoint epoch (in this case, epoch 3).
These two parts combined provide a consistent view of the KV storage.
After the checkpoint has been finished (SSTs have been committed), we can either:
- Evict epoch 3 from Hummock memtable, and serve all data from shared storage.
- Retain latest N epochs in Hummock memtable, and serve all data where memtable epoch >= E, and shared storage epoch < E.
Having implemented async checkpoint, we can now batch writes and significantly reduce SST number.
Part 2: Write Anytime / Async Flush
As said in the introduction, previously all RisingWave streaming executors did either of the following to maintain operator states:
- merge-on-write: Produce a write batch, and read from the storage (like ExtremeState).
- merge-on-read: Write to in-memory flush buffer, and merge data from flush buffer and storage (like TopN, HashJoin).
We have established earlier that the merge-on-write way is not scalable, as it will produce too many SSTs. So we will explore the second way.
When executors use the second way, they always need to "merge data from the flush buffer and storage". This "merge iterator" has been implemented in various ways in different executors, which makes the ManagedState very hard to read.
Therefore, to standardize this, we support “async flush” in shared buffer, which means that streaming executors can write to the state store at any time, and the state store will provide “read after write” semantics within epoch.
Currently, all streaming executors will only read their own keys, since they are partitioned by state_table_id and vnode.
Therefore, we can leverage this property to provide a mutable memtable to each executor, and unify the “merge” logic across all “merge-on-read” executors.
A New Merge Iterator
Apart from the MergeIterator in Hummock, which merges SSTs from various levels in the LSMTree, we now need a “unified” merge iterator above the state store:
The MutableMemTable will store data in its in-memory representation (e.g., i32, i32). The special MergeIterator will merge encoded data from Hummock and in-memory-representation data from the MutableMemTable.
Therefore, we can unify all logic across TopN executor, HashJoin executor, etc.
Every executor has only one active MutableMemTable. When one epoch ends, the MutableMemTable is converted to an immutable memtable in Hummock, and everything else stays the same as in The Hummock Shared Buffer, Part 1 (Async Checkpoint).
Considerations
For all data a, b of the same type, we must ensure that:
in-memory representation of a < in-memory representation of b,
iff memcomparable(a) < memcomparable(b)
Storing State Using Relational Table
Row-based Encoding
RisingWave adopts a relational data model. Relational tables, including tables and materialized views, consist of a list of named, strong-typed columns. All streaming executors store their data into a KV state store, which is backed by a service called Hummock. There are two choices for saving a relational row into key-value pairs: cell-based format and row-based format. We choose the row-based format because internal states always read and write the whole row, and don't need to partially update some fields in a row. Row-based encoding has better performance than cell-based encoding, as it reduces the number of KV pairs to read and write.
We implement a relational table layer as the bridge between executors and KV state store, which provides the interfaces accessing KV data in relational semantics. As the executor state’s encoding is very similar to a row-based table, each kind of state is stored as a row-based relational table first. In short, one row is stored as a key-value pair. For example, encoding of some stateful executors in row-based format is as follows:
| state | key | value |
|---|---|---|
| mv | table_id \| sort key \| pk | materialized value |
| top n | table_id \| sort key \| pk | materialized value |
| join | table_id \| join_key \| pk | materialized value |
| agg | table_id \| group_key | agg_value |
Relational Table Layer
In this part, we will introduce how stateful executors interact with KV state store through the relational table layer.
The relational table layer consists of the State Table, Mem Table and Storage Table. The State Table and Mem Table are used in streaming mode, and the Storage Table is used in batch mode.
The State Table provides table operations through these APIs: `get_row`, `scan`, `insert_row`, `delete_row` and `update_row`, which are the read and write interfaces for streaming executors. The Mem Table is an in-memory buffer for caching table operations during one epoch. The Storage Table is read-only, and outputs only the partial columns that the upper level needs.
Write Path
To write into the KV state store, executors first perform operations on the State Table, and these operations are cached in the Mem Table. Once a barrier flows through an executor, the executor flushes the cached operations into the state store. At this moment, the State Table converts these operations into KV pairs and writes them to the state store with a specific epoch.
For example, if an executor performs `insert(a, b, c)` and `delete(d, e, f)` through the State Table APIs, the Mem Table first caches these two operations in memory. After receiving a new barrier, the State Table converts these two operations into KV operations in the row-based format, and writes them into the state store (Hummock).
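A minimal sketch of this write path is shown below, assuming an encoded pk as the key and an in-memory row as the value; the types are illustrative and ignore value encoding and conflict handling.

```rust
// A minimal sketch of the write path: operations are buffered per pk in a Mem Table
// and converted into a KV write batch when a barrier arrives.
use std::collections::BTreeMap;

enum RowOp {
    Insert(Vec<i64>),
    Delete(Vec<i64>),
}

#[derive(Default)]
struct SimpleMemTable {
    buffer: BTreeMap<Vec<u8>, RowOp>, // encoded pk -> pending operation for this epoch
}

impl SimpleMemTable {
    fn insert_row(&mut self, pk: Vec<u8>, row: Vec<i64>) {
        self.buffer.insert(pk, RowOp::Insert(row));
    }

    fn delete_row(&mut self, pk: Vec<u8>, old_row: Vec<i64>) {
        self.buffer.insert(pk, RowOp::Delete(old_row));
    }

    /// On barrier: turn the buffered operations into a KV write batch
    /// (Some(row) = put, None = delete) tagged with the given epoch.
    fn flush(&mut self, epoch: u64) -> Vec<(Vec<u8>, Option<Vec<i64>>)> {
        let _ = epoch; // the real write batch is ingested with this epoch
        std::mem::take(&mut self.buffer)
            .into_iter()
            .map(|(pk, op)| match op {
                RowOp::Insert(row) => (pk, Some(row)),
                RowOp::Delete(_) => (pk, None),
            })
            .collect()
    }
}
```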
Read Path
In streaming mode, executors should be able to read the latest written data, which means uncommitted data must be visible. The data in the Mem Table (memory) is fresher than that in the shared storage (state store). The State Table provides both point get and scan, reading from the state store by merging data from the Mem Table and the Storage Table.
Get
For example, let's assume that the first column is the pk of the relational table, and the following operations are performed:
insert [1, 11, 111]
insert [2, 22, 222]
delete [2, 22, 222]
insert [3, 33, 333]
commit
insert [3, 3333, 3333]
After the commit, a new record is inserted again. Then the `Get` results with the corresponding pk are:
Get(pk = 1): [1, 11, 111]
Get(pk = 2): None
Get(pk = 3): [3, 3333, 3333]
Scan
Scan on a relational table is implemented by `StateTableIter`, which is a merge iterator of `MemTableIter` and `StorageIter`. If a pk exists in both the KV state store (shared storage) and memory (MemTable), the result from `MemTableIter` is returned. For example, in the following figure, `StateTableIter` will generate `1->4->5->6` in order.
Example: HashAgg
In this doc, we will take HashAgg with extreme states (`max`, `min`) or value states (`sum`, `count`) as an example, and introduce a more detailed design for the internal table schema.
Table id
`table_id` is a globally unique id allocated by meta for each relational table object. Meta is responsible for traversing the Plan Tree and calculating the total number of Relational Tables needed. For example, the Hash Join Operator needs 2, one for the left table and one for the right table. The number of tables needed for Agg depends on the number of agg calls.
Value State (Sum, Count)
Query example:
select sum(v2), count(v3) from t group by v1
This query will need to initiate 2 Relational Tables. The schema is `table_id/group_key`.
Extreme State (Max, Min)
Query example:
select max(v2), min(v3) from t group by v1
This query will need to initiate 2 Relational Tables. If the upstream is not append-only, the schema becomes `table_id/group_key/sort_key/upstream_pk`.
The order of `sort_key` depends on the agg call kind. For example, if it's `max()`, `sort_key` will be ordered with `Ascending`; if it's `min()`, `sort_key` will be ordered with `Descending`.
The `upstream_pk` is also appended to ensure the uniqueness of the key.
This design allows the streaming executor not to read all the data from storage when the cache misses, but only a part of it. The streaming executor will try to write all streaming data to storage, because there may be `update` or `delete` operations in the stream, and it's impossible to always guarantee correct results without storing all the data.
If `t` is created with the append-only flag, the schema becomes `table_id/group_key`, which is the same as for Value State. This is because in the append-only mode, there are no `update` or `delete` operations, so the cache will never miss. Therefore, we only need to write one value to the storage.
Build RisingWave with Multiple Object Storage Backends
- Overview
- How RisingWave supports multiple object storage backends
- How to build RisingWave with multiple object store
Overview
As a cloud-neutral database, RisingWave supports running on different (object) storage backends. Currently, these storage products include
This doc first briefly introduces how RisingWave supports these storage products, then gives guidance on how to build RisingWave with these object stores quickly and easily through risedev.
How RisingWave supports multiple object storage backends
The first supported object storage was S3. Afterwards, RisingWave added support for other object storage in two ways: via S3-compatible mode or via OpenDAL.
S3 and other S3 compatible object store
If an object store declares that it is S3-compatible, it means that it can be accessed directly through the S3 APIs. As RisingWave has already implemented `S3ObjectStore`, we can reuse the S3 interfaces to access this kind of object storage.
Currently, for COS and Lyvecloud Storage, we use the S3-compatible mode. To use these two object storage products, you need to overwrite the S3 environment variables with the corresponding `access_key`, `secret_key`, `region` and `bucket_name`, and configure the `endpoint` as well.
OpenDAL object store
For those (object) storage products that are not compatible with S3 (or are compatible but have unstable interfaces), we use OpenDAL to access them. OpenDAL is the Open Data Access Layer for freely accessing data, and it supports several different storage backends. We implemented an `OpenDALObjectStore` to support the interface for accessing object stores in RisingWave.
All of these object stores are supported in risedev, you can use the risedev command to start RisingWave on these storage backends.
How to build RisingWave with multiple object store
COS & Lyvecloud Storage
To use COS or Lyvecloud Storage, you need to overwrite the default AWS `access_key`, `secret_key`, `region`, and configure the endpoint in the environment variables:
export AWS_REGION=your_region
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export RW_S3_ENDPOINT=your_endpoint
Then, in `risedev.yml`, set the bucket name and start RisingWave with risedev. You can then successfully run RisingWave on these two storage backends.
GCS
To use GCS, you need to enable OpenDAL in `risedev.yml`, and set `engine = gcs`, `bucket_name` and `root` as well. For authentication, you just need to configure credentials with `gcloud`, and the OpenDAL GCS backend will automatically find the token in the application to complete the verification.
Once these configurations are set, run `./risedev d gcs`, and then you can run RisingWave on GCS.
OSS
To use OSS, you need to enable OpenDAL in `risedev.yml`, and set `engine = oss`, `bucket_name` and `root` as well.
For authentication, set the identity information in the environment variable:
export OSS_ENDPOINT="endpoint"
export OSS_ACCESS_KEY_ID="oss_access_key"
export OSS_ACCESS_KEY_SECRET="oss_secret_key"
Once these configurations are set, run `./risedev d oss`, and then you can run RisingWave on OSS.
Azure Blob
To use Azure Blob, you need to enable OpenDAL in `risedev.yml`, and set `engine = azblob`, `bucket_name` and `root` as well. For Azure Blob Storage, `bucket_name` is actually the `container_name`.
For authentication, set the identity information in the environment variable:
export AZBLOB_ENDPOINT="endpoint"
export AZBLOB_ACCOUNT_NAME="your_account_name"
export AZBLOB_ACCOUNT_KEY="your_account_key"
Once these configurations are set, run `./risedev d azblob`, and then you can run RisingWave on Azure Blob Storage.
HDFS
HDFS requires a complete Hadoop environment and Java environment, which are very heavy. Thus, RisingWave does not enable the hdfs feature by default. To compile RisingWave with the hdfs backend, turn on this feature first, and enable hdfs for the risedev tools.
Run `./risedev configure`, and enable `[Component] Hummock: Hdfs Backend`.
After that, you need to enable OpenDAL in `risedev.yml`, and set `engine = hdfs`, `namenode` and `root` as well.
You can also use WebHDFS as a lightweight alternative to HDFS. HDFS is powered by its native Java client, so users need to set up the HDFS services correctly, while WebHDFS is accessed via an HTTP API and needs no extra setup. The way to start WebHDFS is basically the same as HDFS, but its default name_node is `127.0.0.1:9870`.
Once these configurations are set, run `./risedev d hdfs` or `./risedev d webhdfs`, and then you can run RisingWave on HDFS (or WebHDFS).
Meta Service
Background
RisingWave provides both real-time analytical queries and highly concurrent access to Materialized Views. Therefore, both the frontend and compute nodes are designed to be scalable, and they may or may not share the same set of host machines, depending on the cluster size and whether the user cares about resource isolation.
Meanwhile, components such as the metadata provider, scheduler, and monitoring are more suitable for a centralized design. For example, a typical on-premise deployment may look like below, where the dotted boxes represent the minimal unit of deployment (VM or container).
Meta Store
Metadata should be regarded as transactional data. In particular, we need guarantees like atomicity, consistency (e.g. read-after-write) and transaction support. Besides, it's not necessary to scale it out to multiple nodes.
We choose etcd for production deployments, which is a replicated key-value storage based on B-Tree and Raft protocol.
To fit into the key-value data model, metadata entries are serialized with Protobuf as the value and the key is the ID or version of them, with respect to the kind of metadata.
Types of Metadata
Catalog
Catalog is the metadata of relational tables in databases.
- Database & Schema: Namespace of database objects like in PostgreSQL.
- Table & Materialized Views: Definition of tables & materialized views along with the columns on them.
- Source: User-defined external data sources.
To execute a DDL statement like `CREATE` or `DROP TABLE`, the frontend sends an RPC to the meta node and waits for the updated catalog to take effect.
Storage
Hummock, an LSM-Tree-based storage engine, stores the mapping from version to the set of SSTable files in Meta Service. See more details in the overview of State Store.
Push on Updates
There are 2 choices on how to distribute information across multiple nodes.
- Push: When metadata changes, the meta node tells all nodes to update, and master node must wait for others to acknowledge before continuing.
- Pull: When data changes, the master node does nothing. Other nodes may not have the latest information, so they need to ask the master node every time.
Currently, for simplicity, we choose the push-style approach for all kinds of metadata. This is implemented as `NotificationService` on the meta service and `ObserverManager` on the frontend and compute nodes.
`ObserverManager` will register itself to the meta service on bootstrap and subscribe to the metadata it needs. Afterwards, once the metadata changes, the meta node streams the changes to it, expecting all subscribers to acknowledge.
Data Model and Encoding
Data Model
Source files:
common/src/types
RisingWave adopts a relational data model with extensive support for semi-structured data. Relational tables, including tables and materialized views, consist of a list of named, strong-typed columns.
Tables created by users have an implicit, auto-generated row-id column as their primary key; while for materialized views, the primary key is derived from queries. For example, the primary key of an aggregation (group-by) materialized view is the specified group keys.
`NULL` values mean missing or unknown fields. Currently, all columns are implicitly nullable.
Primitive data types:
- Booleans: `BOOLEAN`
- Integers: `SMALLINT` (16-bit), `INT` (32-bit), `BIGINT` (64-bit)
- Decimals: `NUMERIC`
- Floating-point numbers: `REAL`, `DOUBLE`
- Strings: `VARCHAR`
- Temporals: `DATE`, `TIMESTAMP`, `TIMESTAMP WITH TIME ZONE`, `TIME`, `INTERVAL`
Composite data types:
- `Struct`: A structure with a list of named, strong-typed fields.
- `List`: A variable-length list of values with the same data type.
In-Memory Encoding
Source files:
common/src/array
In-memory data is encoded in arrays for vectorized execution. For variable-length data like strings, generally we use another offset array to mark the start of encoded values in a byte buffer.
A Data Chunk consists of multiple columns and a visibility array, as shown in the left subgraph below. The visibility array marks each row as visible or not. This helps filter some rows while keeping the other data arrays unchanged.
A Stream Chunk consists of columns, a visibility array and an additional `ops` column, as shown in the right subgraph below. The `ops` column marks the operation of each row, which can be one of `Delete`, `Insert`, `UpdateDelete` and `UpdateInsert`.
On-Disk Encoding
Source files:
`utils/memcomparable`, `utils/value-encoding`
RisingWave stores user data in a shared key-value storage called 'Hummock'. Tables, materialized views and checkpoints of internal streaming operators are encoded into key-value entries. Every field of a row, a.k.a. a cell, is encoded as a key-value entry, except that `NULL` values are omitted.
Considering that ordering matters in some cases, e.g. the result set of an order-by query, fields of keys must preserve the order of the original values after being encoded into bytes. This is what `memcomparable` is used for. For example, integers must be encoded in big-endian and the sign bit must be flipped to preserve order. In contrast, the encoding of values does not need to preserve order.
Design: Local execution mode for online serving queries
Background
The query engine of RisingWave supports two types of queries: highly concurrent point queries and ad-hoc queries. The characteristics of these two different kinds of queries are summarized as follows:
| | Point Queries | Adhoc Queries |
|---|---|---|
| Latency | several ms | ms to minutes |
| QPS | 1000 ~ 100000 | 100 ~ 1000 |
| SQL | Simple | Arbitrary complex |
| Result Set | Small | Small, Medium, Large |
| Use Scenarios | Dashboard | Adhoc analysis |
Our distributed query processing engine is designed for complex ad-hoc queries and cannot meet the latency/QPS requirements of point queries, so in this article we introduce the local execution mode for point queries.
Design
Example 1: select a from t where b in (1, 2, 3, 4)
Let’s use the above SQL as an example:
The key changes from the distributed mode:
- The exchange executor will be executed directly by the local query execution, not by the distributed scheduler. This means that we no longer have async execution, monitoring, etc.
- The RPC is issued by the exchange executor directly, not by the scheduler.
Example 2: SELECT pk, t1.a, t1.fk, t2.b FROM t1, t2 WHERE t1.fk = t2.pk AND t1.pk = 114514
The following is the plan and execution of the above SQL in local mode:
As explained above, the lookup join/exchange phase will be executed directly on the frontend. The pushdown (filter/table scan, on both the build and probe sides) will be issued by the executors rather than the scheduler.
Optimization/Scheduling
The overall process will be quite similar to distributed processing, with a few differences:
- We only use the heuristic optimizer, and only a limited set of rules will be applied.
- The scheduler will not be involved, and the physical plan is executed in the current thread (coroutine) immediately.
Monitoring/Management
Local execution mode will not go through query management mentioned in batch query manager to reduce latency as much as possible.
How to switch between local/distributed execution modes?
As mentioned in the first paragraph, the main use case for the local execution mode is well determined (dashboard/reporting), so currently we just expose a session configuration (`query_mode`) to the user. In the future, we may let the optimizer determine it if required.
RPC execution in local mode
In the distributed mode, we have several steps to execute a computing task and fetch results:
There are some problems with the above process in local mode:
- We need at least two RPCs to fetch the task execution result, which increases query overhead.
- We have task lifecycle management APIs, which are unnecessary for local mode.
- We may need to add several new APIs for task monitoring/failure detection.
For the local mode we will add a new rpc API:
rpc Execute(ExecuteRequest) returns (ExecuteResponse)
message ExecuteRequest {
batch_plan.PlanFragment plan = 2;
uint64 epoch = 3;
}
message ExecuteResponse {
common.Status status = 1;
data.DataChunk record_batch = 2;
}
This is quite similar to the distributed execution APIs, but with some differences:
- The response is returned directly in the RPC call, without the need for another call.
- No task lifecycle management/monitoring is involved; if the task fails, we just remove it and return an error in the response directly.
Consistent Hash
Background
Scaling could occur for multiple reasons in RisingWave. For example, when the workload of some streaming operator is heavier than expected, we need to scale out. During scaling out or scaling in, physical resources will be allocated or freed accordingly. That is to say, new actors will be created on scaling out, while some old actors will be dropped on scaling in. As a result of actor changes, redistributing data (i.e. the states of the actors) is inevitably required. That yields a question: how to efficiently determine the distribution of data and minimize data movement on scaling?
On the other hand, we need to parallel the scan on tables or materialized views in batch query mode. Therefore, we need to partition the data in a way that could boost the performance most. So what is the most beneficial way of data partition for tables and materialized views?
In RisingWave, we adopt consistent-hash-based strategy to solve the two problems above. This document will elaborate on our design.
Design
Meta
Actor Scheduling
First, we need to introduce a little about how we schedule the actors. Each worker node in a RisingWave cluster has a number of parallel units. A parallel unit is the minimal scheduling unit in RisingWave, as well as the physical location of an actor. Each actor will be scheduled to exactly one parallel unit.
Data Distribution
Here comes the main part, where we will construct a mapping that determines data distribution.
For all data \( k \in U_k \), where \( U_k \) is an unbounded set, we apply a hash function \( v = H(k) \), where \( v \) falls into a limited range. The hash function \( H \) ensures that all data are hashed uniformly to that range. We call \( v \) a vnode, namely a virtual node, shown as the squares in the figure below.
Then we have vnode mapping, which ensures that vnodes are mapped evenly to parallel units in the cluster. In other words, the number of vnodes that are mapped to each parallel unit should be as close as possible. This is denoted by different colors in the figure above. As is depicted, we have 3 parallel units (shown as circles), each taking \( \frac{1}{3} \) of total vnodes. Vnode mapping is constructed and maintained by meta.
As long as the hash function \( H \) could ensure uniformity, the data distribution determined by this strategy would be even across physical resources. The evenness will be retained even if data in \( U_k \) are skewed to a certain range, say, most students scoring over 60 in a hundred-mark system.
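To make the hashing step concrete, here is a minimal Rust sketch of computing a vnode from a record's key. It assumes a fixed vnode count of 256 and the standard library hasher; the names `VNODE_COUNT` and `compute_vnode` are illustrative only, not RisingWave's actual API.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative assumption: a fixed number of vnodes.
const VNODE_COUNT: u64 = 256;

/// Hash the key of a record into a vnode in [0, VNODE_COUNT).
fn compute_vnode<K: Hash>(key: &K) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % VNODE_COUNT
}

fn main() {
    // Records with the same key always hash to the same vnode,
    // no matter how vnodes are later mapped to parallel units.
    println!("vnode = {}", compute_vnode(&("student_42", 61)));
}
```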
Data Redistribution
Since \( v = H(k) \), the way that data are mapped to vnodes will be invariant. Therefore, when scaling occurs, we only need to modify vnode mapping (the way that vnodes are mapped to parallel units), so as to redistribute the data.
Let’s take scaling out for example. Assume that we have one more parallel unit after scaling out, as is depicted as the orange circle in the figure below. Using the optimal strategy, we modify the vnode mapping in such a way that only \( \frac{1}{4} \) of the data have to be moved, as is shown in the figure below. The vnodes whose data are required to be moved are highlighted with bold border in the figure.
To minimize data movement when scaling occurs, we should be careful when we modify the vnode mapping. Below is an opposite example. Modifying vnode mapping like this will result in \( \frac{1}{2} \) of the data being moved.
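Below is a minimal Rust sketch of the rebalancing idea, assuming a plain `Vec<u32>` as the vnode-to-parallel-unit mapping. It only illustrates that the new unit should steal exactly its fair share of vnodes from units that own more than that share; it is not meta's actual rebalancing algorithm.

```rust
use std::collections::{HashMap, HashSet};

/// `mapping[vnode]` is the parallel unit that owns the vnode. When `new_unit` joins,
/// it takes over roughly 1 / unit_count of the vnodes, stolen only from units that
/// currently own more than the new fair share, so data movement stays minimal.
fn scale_out(mapping: &mut [u32], new_unit: u32) {
    let unit_count = mapping.iter().copied().collect::<HashSet<u32>>().len() + 1;
    let target = mapping.len() / unit_count; // fair share for the new unit

    // Current number of vnodes owned by each parallel unit.
    let mut load: HashMap<u32, usize> = HashMap::new();
    for &unit in mapping.iter() {
        *load.entry(unit).or_insert(0) += 1;
    }

    let mut moved = 0;
    for slot in mapping.iter_mut() {
        if moved == target {
            break;
        }
        let unit = *slot;
        if load[&unit] > target {
            *load.get_mut(&unit).unwrap() -= 1;
            *slot = new_unit; // reassign this vnode to the new parallel unit
            moved += 1;
        }
    }
}

fn main() {
    // 12 vnodes over parallel units 0, 1, 2; unit 3 joins and takes 1/4 of the vnodes.
    let mut mapping: Vec<u32> = (0u32..12).map(|v| v % 3).collect();
    scale_out(&mut mapping, 3);
    println!("{:?}", mapping); // only 3 of the 12 vnodes changed owner
}
```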
Streaming
We know that a fragment has several actors as its different parallelisms, and that upstream actors will send data to downstream actors via dispatcher. The figure below illustrates how actors distribute data based on consistent hash by example.
In the figure, we can see that one upstream actor dispatches data to three downstream actors. The downstream actors are scheduled on the parallel units mentioned in previous example respectively.
Based on our consistent hash design, the dispatcher is informed of the latest vnode mapping by the meta node. It then decides how to send data by the following steps:
- Compute the vnode of the data via the hash function \( H \). Let the vnode be \( v_k \).
- Look up the vnode mapping and find the parallel unit \( p_n \) that vnode \( v_k \) maps to.
- Send data to the downstream actor that is scheduled on parallel unit \( p_n \) (remember that one actor will be scheduled on exactly one parallel unit).
In this way, all actors’ data (i.e. actors’ states) will be distributed according to the vnode mapping constructed by meta.
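The three steps above can be sketched as follows in Rust. The `Dispatcher` struct and its fields are simplified illustrations for this document, not RisingWave's actual dispatcher types.

```rust
use std::collections::HashMap;

/// A simplified dispatcher that routes records to downstream actors via the vnode mapping.
struct Dispatcher {
    /// vnode -> parallel unit, constructed and maintained by meta.
    vnode_mapping: Vec<u32>,
    /// parallel unit -> the single downstream actor scheduled on it.
    actor_on_unit: HashMap<u32, u32>,
}

impl Dispatcher {
    /// Look up the parallel unit for a vnode and return the actor scheduled on it.
    /// (The vnode itself is computed from the record's key via the hash function H.)
    fn route(&self, vnode: usize) -> u32 {
        let parallel_unit = self.vnode_mapping[vnode];
        self.actor_on_unit[&parallel_unit]
    }
}

fn main() {
    let dispatcher = Dispatcher {
        vnode_mapping: vec![0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2],
        actor_on_unit: HashMap::from([(0, 100), (1, 101), (2, 102)]),
    };
    // A record hashed to vnode 7 maps to parallel unit 1, so it is sent to actor 101.
    println!("actor = {}", dispatcher.route(7));
}
```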
When scaling occurs, actors will be re-scheduled accordingly. By modifying the vnode mapping in meta and making streaming act on the new vnode mapping, we can minimize data movement in the following aspects:
- The data of existing actors will not be displaced too much.
- The block cache of a compute node will not be invalidated too much.
Batch
When we perform parallel batch read, we should partition the data for each parallelism in some way. Now that we have vnodes corresponding to disjoint sets of data, this naturally forms a data partition pattern: one vnode could be viewed as a minimal data partition unit, and we could aggregate several vnodes together to get a larger data partition.
In vnode mapping introduced above, one parallel unit will correspond to several vnodes, so we could view vnodes that are mapped to one parallel unit as one partition group. In the figure above, namely, a partition group is vnodes with the same color (or the data that are hashed to these vnodes).
This is better than range partitioning in that this approach is more stable when the primary key of a materialized view is distributed non-randomly, for example, monotonically increasing.
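As a rough illustration, the following Rust sketch derives batch scan partitions by grouping vnodes per parallel unit. The function name and types are assumptions for this example only, not RisingWave's actual scheduler code.

```rust
use std::collections::HashMap;

/// Group vnodes by the parallel unit they map to. Each group is one batch scan
/// partition: a scan task only reads keys whose vnode falls into its group.
fn partition_by_parallel_unit(vnode_mapping: &[u32]) -> HashMap<u32, Vec<usize>> {
    let mut partitions: HashMap<u32, Vec<usize>> = HashMap::new();
    for (vnode, &unit) in vnode_mapping.iter().enumerate() {
        partitions.entry(unit).or_default().push(vnode);
    }
    partitions
}

fn main() {
    let mapping: Vec<u32> = vec![0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2];
    // With 12 vnodes and 3 parallel units, each partition group owns 4 vnodes.
    println!("{:?}", partition_by_parallel_unit(&mapping));
}
```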
Storage
If we look into the read-write pattern of streaming actors, we’ll find that in most cases, actors only need to read the data written by themselves (i.e. the actor’s internal states). Namely, they read data with the same vnodes as they previously wrote.
Therefore, an intuitive way to place data in storage is to group data by vnode. In this way, when actors perform read operations, they touch as few SST blocks as possible and thus trigger less I/O.
We know that Hummock, our LSM-Tree-based storage engine, sorts key-value pairs by key order. Hence, in order to group data by vnode on the basis of Hummock, we encode the vnode into the storage key. The storage key will look like `table_id | vnode | ...`, where `table_id` denotes the state table, and `vnode` is computed via \( H \) on the key of the data.
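The following Rust sketch shows one way such a key prefix could be composed. The field widths and big-endian encoding are assumptions for illustration and do not reflect Hummock's exact key format.

```rust
/// Compose a storage key with a `table_id | vnode | ...` prefix so that rows of the
/// same table and vnode are adjacent in the sorted key space.
fn encode_storage_key(table_id: u32, vnode: u16, pk: &[u8]) -> Vec<u8> {
    let mut key = Vec::with_capacity(4 + 2 + pk.len());
    // Big-endian bytes keep lexicographic key order consistent with numeric order.
    key.extend_from_slice(&table_id.to_be_bytes());
    key.extend_from_slice(&vnode.to_be_bytes());
    key.extend_from_slice(pk);
    key
}

fn main() {
    // Two rows of table 1 in vnode 42 share a key prefix and therefore sort together.
    println!("{:?}", encode_storage_key(1, 42, b"row_a"));
    println!("{:?}", encode_storage_key(1, 42, b"row_b"));
}
```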
To illustrate this, let’s revisit the previous example. Executors of an operator will share the same logical state table, just as is shown in the figure below:
Now that we have 12 vnodes in total in the example, the data layout in storage will accordingly look like this:
Note that we only show the logical sequence and aggregation of data in this illustration. The actual data may be separated into different SSTs in Hummock.
Since the way that certain data are hashed to vnode is invariant, the encoding of the data will also be invariant. How we schedule the fragment (e.g. parallelism of the fragment) will not affect data encoding. In other words, storage will not care about vnode mapping, which is determined by meta and used only by streaming. This is actually a way of decoupling the storage layer from the compute layer.
Keys
This page documents the different keys in RisingWave.
Stream Key
The key which can identify records in the RisingWave stream.
For example, given the following stream chunk, where the stream key is `k1, k2`:
| op | k1 | k2 | v1 | v2 |
|----|----|----|----|----|
| - | 1 | 2 | 1 | 1 |
| + | 1 | 2 | 3 | 4 |
| + | 0 | 1 | 2 | 3 |
We can tell that the record corresponding to the key `(1, 2)` has been updated from `(1, 2, 1, 1)` to `(1, 2, 3, 4)`.
The record corresponding to the key `(0, 1)` has been inserted with `(0, 1, 2, 3)`.
It may not be the minimal set of columns required to identify a record; for instance, the group key could be part of the stream key, to specify the distribution of records.
Primary Key (Storage)
This discusses the internal primary key (pk) which we often see in streaming operators. It is different from the primary key in SQL.
A more appropriate name for this would be the storage primary key.
Besides uniquely identifying a record in storage, this key may also be used to provide ordering properties.
Let’s use the following query as an example:
```sql
create table t1(id bigint primary key, i bigint);
create materialized view mv1 as select id, i from t1 order by i, id;
```
`mv1` has the following plan:
```
StreamMaterialize {
    columns: [id, i],
    stream_key: [id],
    pk_columns: [i, id], -- notice the pk_columns
    pk_conflict: NoCheck
}
└─StreamTableScan { table: t1, columns: [id, i] }
```
You can see that the `pk_columns` are `[i, id]`, although the upstream SQL primary key is just `id`.
In the storage layer, key-value pairs are sorted by their keys.
Because the materialized view contains an `order by i, id`, the storage primary key is `[i, id]` to ensure the rows are ordered in storage. Importantly, `i` will be a prefix of the key.
Then, when iterating over the keys from storage, the records are returned in the correct order per partition.
When the update stream comes, we can just use `id` to identify the records that need to be updated.
We can get the whole record corresponding to the `id`, read the `i` from there, and then use it to update the materialized state accordingly.
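To make this update flow concrete, here is a minimal Rust sketch using in-memory maps in place of the state table. The secondary `id_to_i` map stands in for "getting the whole record corresponding to the id"; everything here is illustrative rather than RisingWave's actual operator code.

```rust
use std::collections::BTreeMap;

fn main() {
    // "Storage": rows sorted by the storage primary key (i, id); the value is the row (id, i).
    let mut storage: BTreeMap<(i64, i64), (i64, i64)> = BTreeMap::new();
    // Lookup from the stream key (id) to the current i, so the old storage key can be
    // reconstructed when an update keyed only by id arrives.
    let mut id_to_i: BTreeMap<i64, i64> = BTreeMap::new();

    // Initial insert: id = 7, i = 3.
    storage.insert((3, 7), (7, 3));
    id_to_i.insert(7, 3);

    // An update arrives for id = 7 with a new i = 1: recover the old i via the id,
    // remove the old (i, id) entry, and insert the new one.
    let (id, new_i): (i64, i64) = (7, 1);
    let old_i = id_to_i[&id];
    storage.remove(&(old_i, id));
    storage.insert((new_i, id), (id, new_i));
    id_to_i.insert(id, new_i);

    // Iterating storage in key order now yields rows ordered by (i, id).
    for (key, row) in &storage {
        println!("{:?} -> {:?}", key, row);
    }
}
```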