Compare commits

...

16 Commits

Author SHA1 Message Date
discord9
77e340270e Downgrade rust-toolchain.toml
DO NOT MERGE
2025-04-29 17:52:43 +08:00
Weny Xu
06e8d46ba9 feat: implement batch region opening in metric engine (#6017)
feat: implement batch open metric regions
2025-04-29 09:05:27 +00:00
zyy17
89661c0626 ci: fix the bugs of release-dev-builder-images and add update-dev-builder-image-tag (#6009)
* fix: the dev-builder release job is not triggered by merged event

* ci: add update-dev-builder-image-tag
2025-04-29 06:25:15 +00:00
Weny Xu
a3ae2d7b52 feat: flush leader region before downgrading (#5995)
* feat: flush leader region before downgrading

* test: add unit tests

* chore: apply suggestions from CR
2025-04-29 03:28:00 +00:00
Ruihang Xia
789f585a7f fix: disable recursion limit in prost (#6010)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-04-28 17:21:49 +00:00
jeremyhi
133f404547 fix: sanitize_connection_string (#6012) 2025-04-28 13:56:26 +00:00
discord9
bdd44fd7ec chore: only retry when retry-able in flow (#5987)
* chore: only retry when retry-able

* chore: revert dbg change

* refactor: per review

* fix: check for available frontend first

* docs: more explain&longer timeout&feat: more retry at every level&try send select 1

* fix: use `sql` method for "SELECT 1"

* fix: also put recover flows in spawned task and a dead loop

* test: update transient error in flow rebuild test

* chore: sleep after sqlness sleep

* chore: add a warning

* chore: wait even more time after reboot
2025-04-28 09:49:49 +00:00
Weny Xu
13ac4d5048 fix: only consider the datanode that reports the failure (#6004)
* fix: only consider the datanode that reports the failure

* chore: fix clippy
2025-04-28 06:08:02 +00:00
dennis zhuang
c6448a6ccc feat: remove own greatest fn (#5994) 2025-04-28 05:27:34 +00:00
Yingwen
86aae6733d fix: prune primary key with multiple columns may use default value as statistics (#5996)
* test: incorrect test result when filtering pk with multiple columns

* fix: prune non first tag correctly

Distinguish no column and no stats and only use default value when no
column

* test: update test result

* refactor: rename test file

* test: add test for null filter

* fix: use StatValues for null counts

* test: drop table

* test: fix unstable flow test
2025-04-28 04:53:30 +00:00
liyang
ed1ce8438f ci: update dev-builder image version to 2025-04-15-1a517ec8-202504280… (#6003)
ci: update dev-builder image version to 2025-04-15-1a517ec8-20250428023155
2025-04-28 03:34:31 +00:00
fys
4b921b8425 chore: make txn_helper pub (#6002)
chore: make txn_helper from pub(crate) to pub
2025-04-28 02:52:39 +00:00
Lei, HUANG
1a517ec8ac fix: check if memtable is empty by stats (#5989)
fix/checking-memtable-empty-and-stats:
 - **Refactor timestamp updates**: Simplified timestamp range updates in `PartitionTreeMemtable` and `TimeSeriesMemtable` by replacing `update_timestamp_range` with `fetch_max` and `fetch_min` methods for `max_timestamp` and `min_timestamp`.
   - Affected files: `partition_tree.rs`, `time_series.rs`

 - **Remove unused code**: Deleted the `update_timestamp_range` method from `WriteMetrics` and removed unnecessary imports.
   - Affected file: `stats.rs`

 - **Optimize memtable filtering**: Streamlined the check for empty memtables in `ScanRegion` by directly using `time_range`.
   - Affected file: `scan_region.rs`
2025-04-28 01:57:17 +00:00
discord9
21044c7339 feat: uddsketch_merge udaf (#5992) 2025-04-27 12:43:21 +00:00
Ning Sun
8e1ec2a201 chore: update nix for new toolchain (#5991) 2025-04-27 11:40:44 +00:00
Weny Xu
5ed0a095b6 feat: introduce RegionStatAwareSelector trait (#5990)
* feat: introduce `RegionStatAwareSelector`

* feat: exclude all failed datanodes

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR
2025-04-27 11:22:39 +00:00
66 changed files with 1864 additions and 617 deletions

37
.github/scripts/update-dev-builder-version.sh vendored Executable file
View File

@@ -0,0 +1,37 @@
#!/bin/bash
DEV_BUILDER_IMAGE_TAG=$1
update_dev_builder_version() {
if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
echo "Error: Should specify the dev-builder image tag"
exit 1
fi
# Configure Git configs.
git config --global user.email greptimedb-ci@greptime.com
git config --global user.name greptimedb-ci
# Checkout a new branch.
BRANCH_NAME="ci/update-dev-builder-$(date +%Y%m%d%H%M%S)"
git checkout -b $BRANCH_NAME
# Update the dev-builder image tag in the Makefile.
gsed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile
# Commit the changes.
git add Makefile
git commit -m "ci: update dev-builder image tag"
git push origin $BRANCH_NAME
# Create a Pull Request.
gh pr create \
--title "ci: update dev-builder image tag" \
--body "This PR updates the dev-builder image tag" \
--base main \
--head $BRANCH_NAME \
--reviewer zyy17 \
--reviewer daviderli614
}
update_dev_builder_version

View File

@@ -24,11 +24,19 @@ on:
description: Release dev-builder-android image
required: false
default: false
update_dev_builder_image_tag:
type: boolean
description: Update the DEV_BUILDER_IMAGE_TAG in Makefile and create a PR
required: false
default: false
jobs:
release-dev-builder-images:
name: Release dev builder images
if: ${{ inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }} # Only manually trigger this job.
# The jobs are triggered by the following events:
# 1. Manually triggered workflow_dispatch event
# 2. Push event when the PR that modifies the `rust-toolchain.toml` or `docker/dev-builder/**` is merged to main
if: ${{ github.event_name == 'push' || inputs.release_dev_builder_ubuntu_image || inputs.release_dev_builder_centos_image || inputs.release_dev_builder_android_image }}
runs-on: ubuntu-latest
outputs:
version: ${{ steps.set-version.outputs.version }}
@@ -57,9 +65,9 @@ jobs:
version: ${{ env.VERSION }}
dockerhub-image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
dockerhub-image-registry-token: ${{ secrets.DOCKERHUB_TOKEN }}
build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image }}
build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image }}
build-dev-builder-android: ${{ inputs.release_dev_builder_android_image }}
build-dev-builder-ubuntu: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
build-dev-builder-centos: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
build-dev-builder-android: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
release-dev-builder-images-ecr:
name: Release dev builder images to AWS ECR
@@ -85,7 +93,7 @@ jobs:
- name: Push dev-builder-ubuntu image
shell: bash
if: ${{ inputs.release_dev_builder_ubuntu_image }}
if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -106,7 +114,7 @@ jobs:
- name: Push dev-builder-centos image
shell: bash
if: ${{ inputs.release_dev_builder_centos_image }}
if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -127,7 +135,7 @@ jobs:
- name: Push dev-builder-android image
shell: bash
if: ${{ inputs.release_dev_builder_android_image }}
if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -162,7 +170,7 @@ jobs:
- name: Push dev-builder-ubuntu image
shell: bash
if: ${{ inputs.release_dev_builder_ubuntu_image }}
if: ${{ inputs.release_dev_builder_ubuntu_image || github.event_name == 'push' }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -176,7 +184,7 @@ jobs:
- name: Push dev-builder-centos image
shell: bash
if: ${{ inputs.release_dev_builder_centos_image }}
if: ${{ inputs.release_dev_builder_centos_image || github.event_name == 'push' }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -190,7 +198,7 @@ jobs:
- name: Push dev-builder-android image
shell: bash
if: ${{ inputs.release_dev_builder_android_image }}
if: ${{ inputs.release_dev_builder_android_image || github.event_name == 'push' }}
env:
IMAGE_VERSION: ${{ needs.release-dev-builder-images.outputs.version }}
IMAGE_NAMESPACE: ${{ vars.IMAGE_NAMESPACE }}
@@ -201,3 +209,24 @@ jobs:
quay.io/skopeo/stable:latest \
copy -a docker://docker.io/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION \
docker://$ACR_IMAGE_REGISTRY/$IMAGE_NAMESPACE/dev-builder-android:$IMAGE_VERSION
update-dev-builder-image-tag:
name: Update dev-builder image tag
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
if: ${{ github.event_name == 'push' || inputs.update_dev_builder_image_tag }}
needs: [
release-dev-builder-images
]
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Update dev-builder image tag
shell: bash
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
./.github/scripts/update-dev-builder-version.sh ${{ needs.release-dev-builder-images.outputs.version }}

View File

@@ -163,7 +163,7 @@ paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.5.1", features = ["ser"] }
prost = "0.13"
prost = { version = "0.13", features = ["no-recursion-limit"] }
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"
ratelimit = "0.10"

View File

@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2024-12-25-a71b93dd-20250305072908
DEV_BUILDER_IMAGE_TAG ?= 2025-04-15-1a517ec8-20250428023155
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu

18
flake.lock generated
View File

@@ -8,11 +8,11 @@
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1742452566,
"narHash": "sha256-sVuLDQ2UIWfXUBbctzrZrXM2X05YjX08K7XHMztt36E=",
"lastModified": 1745735608,
"narHash": "sha256-L0jzm815XBFfF2wCFmR+M1CF+beIEFj6SxlqVKF59Ec=",
"owner": "nix-community",
"repo": "fenix",
"rev": "7d9ba794daf5e8cc7ee728859bc688d8e26d5f06",
"rev": "c39a78eba6ed2a022cc3218db90d485077101496",
"type": "github"
},
"original": {
@@ -41,11 +41,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1743576891,
"narHash": "sha256-vXiKURtntURybE6FMNFAVpRPr8+e8KoLPrYs9TGuAKc=",
"lastModified": 1745487689,
"narHash": "sha256-FQoi3R0NjQeBAsEOo49b5tbDPcJSMWc3QhhaIi9eddw=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "44a69ed688786e98a101f02b712c313f1ade37ab",
"rev": "5630cf13cceac06cefe9fc607e8dfa8fb342dde3",
"type": "github"
},
"original": {
@@ -65,11 +65,11 @@
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1742296961,
"narHash": "sha256-gCpvEQOrugHWLimD1wTFOJHagnSEP6VYBDspq96Idu0=",
"lastModified": 1745694049,
"narHash": "sha256-fxvRYH/tS7hGQeg9zCVh5RBcSWT+JGJet7RA8Ss+rC0=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "15d87419f1a123d8f888d608129c3ce3ff8f13d4",
"rev": "d8887c0758bbd2d5f752d5bd405d4491e90e7ed6",
"type": "github"
},
"original": {

View File

@@ -21,7 +21,7 @@
lib = nixpkgs.lib;
rustToolchain = fenix.packages.${system}.fromToolchainName {
name = (lib.importTOML ./rust-toolchain.toml).toolchain.channel;
sha256 = "sha256-i0Sh/ZFFsHlZ3oFZFc24qdk6Cd8Do8OPU4HJQsrKOeM=";
sha256 = "sha256-arzEYlWLGGYeOhECHpBxQd2joZ4rPKV3qLNnZ+eql6A=";
};
in
{

View File

@@ -1,2 +1,2 @@
[toolchain]
channel = "nightly-2025-04-15"
channel = "nightly-2024-12-25"

View File

@@ -36,8 +36,8 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatchStreamWrapper;
use common_telemetry::error;
use common_telemetry::tracing_context::W3cTrace;
use common_telemetry::{error, warn};
use futures::future;
use futures_util::{Stream, StreamExt, TryStreamExt};
use prost::Message;
@@ -192,6 +192,36 @@ impl Database {
from_grpc_response(response)
}
/// Retry if connection fails, max_retries is the max number of retries, so the total wait time
/// is `max_retries * GRPC_CONN_TIMEOUT`
pub async fn handle_with_retry(&self, request: Request, max_retries: u32) -> Result<u32> {
let mut client = make_database_client(&self.client)?.inner;
let mut retries = 0;
let request = self.to_rpc_request(request);
loop {
let raw_response = client.handle(request.clone()).await;
match (raw_response, retries < max_retries) {
(Ok(resp), _) => return from_grpc_response(resp.into_inner()),
(Err(err), true) => {
// determine if the error is retryable
if is_grpc_retryable(&err) {
// retry
retries += 1;
warn!("Retrying {} times with error = {:?}", retries, err);
continue;
}
}
(Err(err), false) => {
error!(
"Failed to send request to grpc handle after {} retries, error = {:?}",
retries, err
);
return Err(err.into());
}
}
}
}
#[inline]
fn to_rpc_request(&self, request: Request) -> GreptimeRequest {
GreptimeRequest {
@@ -368,6 +398,11 @@ impl Database {
}
}
/// by grpc standard, only `Unavailable` is retryable, see: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md#status-codes-and-their-use-in-grpc
pub fn is_grpc_retryable(err: &tonic::Status) -> bool {
matches!(err.code(), tonic::Code::Unavailable)
}
#[derive(Default, Debug, Clone)]
struct FlightContext {
auth_header: Option<AuthHeader>,

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::time::Duration;
use async_trait::async_trait;
@@ -131,7 +132,7 @@ impl SubCommand {
}
}
#[derive(Debug, Default, Parser)]
#[derive(Default, Parser)]
pub struct StartCommand {
/// The address to bind the gRPC server.
#[clap(long, alias = "bind-addr")]
@@ -171,6 +172,27 @@ pub struct StartCommand {
backend: Option<BackendImpl>,
}
impl fmt::Debug for StartCommand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StartCommand")
.field("rpc_bind_addr", &self.rpc_bind_addr)
.field("rpc_server_addr", &self.rpc_server_addr)
.field("store_addrs", &self.sanitize_store_addrs())
.field("config_file", &self.config_file)
.field("selector", &self.selector)
.field("use_memory_store", &self.use_memory_store)
.field("enable_region_failover", &self.enable_region_failover)
.field("http_addr", &self.http_addr)
.field("http_timeout", &self.http_timeout)
.field("env_prefix", &self.env_prefix)
.field("data_home", &self.data_home)
.field("store_key_prefix", &self.store_key_prefix)
.field("max_txn_ops", &self.max_txn_ops)
.field("backend", &self.backend)
.finish()
}
}
impl StartCommand {
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
let mut opts = MetasrvOptions::load_layered_options(
@@ -184,6 +206,15 @@ impl StartCommand {
Ok(opts)
}
fn sanitize_store_addrs(&self) -> Option<Vec<String>> {
self.store_addrs.as_ref().map(|addrs| {
addrs
.iter()
.map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
.collect()
})
}
// The precedence order is: cli > config file > environment variables > default values.
fn merge_with_cli_options(
&self,

View File

@@ -19,4 +19,4 @@ mod uddsketch_state;
pub use geo_path::{GeoPathAccumulator, GEO_PATH_NAME};
pub(crate) use hll::HllStateType;
pub use hll::{HllState, HLL_MERGE_NAME, HLL_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_STATE_NAME};
pub use uddsketch_state::{UddSketchState, UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME};

View File

@@ -31,23 +31,28 @@ use datafusion::physical_plan::expressions::Literal;
use datafusion::prelude::create_udaf;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::{DataType, Float64Type};
use serde::{Deserialize, Serialize};
use uddsketch::{SketchHashKey, UDDSketch};
pub const UDDSKETCH_STATE_NAME: &str = "uddsketch_state";
#[derive(Debug)]
pub const UDDSKETCH_MERGE_NAME: &str = "uddsketch_merge";
#[derive(Debug, Serialize, Deserialize)]
pub struct UddSketchState {
uddsketch: UDDSketch,
error_rate: f64,
}
impl UddSketchState {
pub fn new(bucket_size: u64, error_rate: f64) -> Self {
Self {
uddsketch: UDDSketch::new(bucket_size, error_rate),
error_rate,
}
}
pub fn udf_impl() -> AggregateUDF {
pub fn state_udf_impl() -> AggregateUDF {
create_udaf(
UDDSKETCH_STATE_NAME,
vec![DataType::Int64, DataType::Float64, DataType::Float64],
@@ -61,18 +66,55 @@ impl UddSketchState {
)
}
/// Create a UDF for the `uddsketch_merge` function.
///
/// `uddsketch_merge` accepts bucket size, error rate, and a binary column of states generated by `uddsketch_state`
/// and merges them into a single state.
///
/// The bucket size and error rate must be the same as the original state.
pub fn merge_udf_impl() -> AggregateUDF {
create_udaf(
UDDSKETCH_MERGE_NAME,
vec![DataType::Int64, DataType::Float64, DataType::Binary],
Arc::new(DataType::Binary),
Volatility::Immutable,
Arc::new(|args| {
let (bucket_size, error_rate) = downcast_accumulator_args(args)?;
Ok(Box::new(UddSketchState::new(bucket_size, error_rate)))
}),
Arc::new(vec![DataType::Binary]),
)
}
fn update(&mut self, value: f64) {
self.uddsketch.add_value(value);
}
fn merge(&mut self, raw: &[u8]) {
if let Ok(uddsketch) = bincode::deserialize::<UDDSketch>(raw) {
if uddsketch.count() != 0 {
self.uddsketch.merge_sketch(&uddsketch);
fn merge(&mut self, raw: &[u8]) -> DfResult<()> {
if let Ok(uddsketch) = bincode::deserialize::<Self>(raw) {
if uddsketch.uddsketch.count() != 0 {
if self.uddsketch.max_allowed_buckets() != uddsketch.uddsketch.max_allowed_buckets()
|| (self.error_rate - uddsketch.error_rate).abs() >= 1e-9
{
return Err(DataFusionError::Plan(format!(
"Merging UDDSketch with different parameters: arguments={:?} vs actual input={:?}",
(
self.uddsketch.max_allowed_buckets(),
self.error_rate
),
(uddsketch.uddsketch.max_allowed_buckets(), uddsketch.error_rate)
)));
}
self.uddsketch.merge_sketch(&uddsketch.uddsketch);
}
} else {
trace!("Warning: Failed to deserialize UDDSketch from {:?}", raw);
return Err(DataFusionError::Plan(
"Failed to deserialize UDDSketch from binary".to_string(),
));
}
Ok(())
}
}
@@ -113,9 +155,21 @@ fn downcast_accumulator_args(args: AccumulatorArgs) -> DfResult<(u64, f64)> {
impl DfAccumulator for UddSketchState {
fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> {
let array = &values[2]; // the third column is data value
let f64_array = as_primitive_array::<Float64Type>(array)?;
for v in f64_array.iter().flatten() {
self.update(v);
match array.data_type() {
DataType::Float64 => {
let f64_array = as_primitive_array::<Float64Type>(array)?;
for v in f64_array.iter().flatten() {
self.update(v);
}
}
// meaning instantiate as `uddsketch_merge`
DataType::Binary => self.merge_batch(&[array.clone()])?,
_ => {
return not_impl_err!(
"UDDSketch functions do not support data type: {}",
array.data_type()
)
}
}
Ok(())
@@ -123,7 +177,7 @@ impl DfAccumulator for UddSketchState {
fn evaluate(&mut self) -> DfResult<ScalarValue> {
Ok(ScalarValue::Binary(Some(
bincode::serialize(&self.uddsketch).map_err(|e| {
bincode::serialize(&self).map_err(|e| {
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
})?,
)))
@@ -150,7 +204,7 @@ impl DfAccumulator for UddSketchState {
fn state(&mut self) -> DfResult<Vec<ScalarValue>> {
Ok(vec![ScalarValue::Binary(Some(
bincode::serialize(&self.uddsketch).map_err(|e| {
bincode::serialize(&self).map_err(|e| {
DataFusionError::Internal(format!("Failed to serialize UDDSketch: {}", e))
})?,
))])
@@ -160,7 +214,7 @@ impl DfAccumulator for UddSketchState {
let array = &states[0];
let binary_array = as_binary_array(array)?;
for v in binary_array.iter().flatten() {
self.merge(v);
self.merge(v)?;
}
Ok(())
@@ -182,8 +236,8 @@ mod tests {
let result = state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
assert_eq!(deserialized.count(), 3);
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
assert_eq!(deserialized.uddsketch.count(), 3);
} else {
panic!("Expected binary scalar value");
}
@@ -201,13 +255,15 @@ mod tests {
// Create new state and merge the serialized data
let mut new_state = UddSketchState::new(10, 0.01);
if let ScalarValue::Binary(Some(bytes)) = &serialized {
new_state.merge(bytes);
new_state.merge(bytes).unwrap();
// Verify the merged state matches original by comparing deserialized values
let original_sketch: UDDSketch = bincode::deserialize(bytes).unwrap();
let original_sketch: UddSketchState = bincode::deserialize(bytes).unwrap();
let original_sketch = original_sketch.uddsketch;
let new_result = new_state.evaluate().unwrap();
if let ScalarValue::Binary(Some(new_bytes)) = new_result {
let new_sketch: UDDSketch = bincode::deserialize(&new_bytes).unwrap();
let new_sketch: UddSketchState = bincode::deserialize(&new_bytes).unwrap();
let new_sketch = new_sketch.uddsketch;
assert_eq!(original_sketch.count(), new_sketch.count());
assert_eq!(original_sketch.sum(), new_sketch.sum());
assert_eq!(original_sketch.mean(), new_sketch.mean());
@@ -244,7 +300,8 @@ mod tests {
let result = state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
let deserialized = deserialized.uddsketch;
assert_eq!(deserialized.count(), 3);
} else {
panic!("Expected binary scalar value");
@@ -273,7 +330,8 @@ mod tests {
let result = merged_state.evaluate().unwrap();
if let ScalarValue::Binary(Some(bytes)) = result {
let deserialized: UDDSketch = bincode::deserialize(&bytes).unwrap();
let deserialized: UddSketchState = bincode::deserialize(&bytes).unwrap();
let deserialized = deserialized.uddsketch;
assert_eq!(deserialized.count(), 2);
} else {
panic!("Expected binary scalar value");

View File

@@ -13,10 +13,8 @@
// limitations under the License.
use std::sync::Arc;
mod greatest;
mod to_unixtime;
use greatest::GreatestFunction;
use to_unixtime::ToUnixtimeFunction;
use crate::function_registry::FunctionRegistry;
@@ -26,6 +24,5 @@ pub(crate) struct TimestampFunction;
impl TimestampFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(ToUnixtimeFunction));
registry.register(Arc::new(GreatestFunction));
}
}

View File

@@ -1,328 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self};
use common_query::error::{
self, ArrowComputeSnafu, InvalidFuncArgsSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use datafusion::arrow::compute::kernels::cmp::gt;
use datatypes::arrow::array::AsArray;
use datatypes::arrow::compute::cast;
use datatypes::arrow::compute::kernels::zip;
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Date32Type, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::TimestampType;
use datatypes::vectors::{Helper, VectorRef};
use snafu::{ensure, ResultExt};
use crate::function::{Function, FunctionContext};
#[derive(Clone, Debug, Default)]
pub struct GreatestFunction;
const NAME: &str = "greatest";
macro_rules! gt_time_types {
($ty: ident, $columns:expr) => {{
let column1 = $columns[0].to_arrow_array();
let column2 = $columns[1].to_arrow_array();
let column1 = column1.as_primitive::<$ty>();
let column2 = column2.as_primitive::<$ty>();
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
let result = zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)
}};
}
impl Function for GreatestFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
ensure!(
input_types.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
input_types.len()
)
}
);
match &input_types[0] {
ConcreteDataType::String(_) => Ok(ConcreteDataType::timestamp_millisecond_datatype()),
ConcreteDataType::Date(_) => Ok(ConcreteDataType::date_datatype()),
ConcreteDataType::Timestamp(ts_type) => Ok(ConcreteDataType::Timestamp(*ts_type)),
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: input_types,
}
.fail(),
}
}
fn signature(&self) -> Signature {
Signature::uniform(
2,
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_second_datatype(),
],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
match columns[0].data_type() {
ConcreteDataType::String(_) => {
let column1 = cast(
&columns[0].to_arrow_array(),
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
)
.context(ArrowComputeSnafu)?;
let column1 = column1.as_primitive::<TimestampMillisecondType>();
let column2 = cast(
&columns[1].to_arrow_array(),
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
)
.context(ArrowComputeSnafu)?;
let column2 = column2.as_primitive::<TimestampMillisecondType>();
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
let result =
zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
Ok(Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)?)
}
ConcreteDataType::Date(_) => gt_time_types!(Date32Type, columns),
ConcreteDataType::Timestamp(ts_type) => match ts_type {
TimestampType::Second(_) => gt_time_types!(TimestampSecondType, columns),
TimestampType::Millisecond(_) => {
gt_time_types!(TimestampMillisecondType, columns)
}
TimestampType::Microsecond(_) => {
gt_time_types!(TimestampMicrosecondType, columns)
}
TimestampType::Nanosecond(_) => {
gt_time_types!(TimestampNanosecondType, columns)
}
},
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail(),
}
}
}
impl fmt::Display for GreatestFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "GREATEST")
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_time::timestamp::TimeUnit;
use common_time::{Date, Timestamp};
use datatypes::types::{
DateType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType,
};
use datatypes::value::Value;
use datatypes::vectors::{
DateVector, StringVector, TimestampMicrosecondVector, TimestampMillisecondVector,
TimestampNanosecondVector, TimestampSecondVector, Vector,
};
use paste::paste;
use super::*;
#[test]
fn test_greatest_takes_string_vector() {
let function = GreatestFunction;
assert_eq!(
function
.return_type(&[
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype()
])
.unwrap(),
ConcreteDataType::timestamp_millisecond_datatype()
);
let columns = vec![
Arc::new(StringVector::from(vec![
"1970-01-01".to_string(),
"2012-12-23".to_string(),
])) as _,
Arc::new(StringVector::from(vec![
"2001-02-01".to_string(),
"1999-01-01".to_string(),
])) as _,
];
let result = function
.eval(&FunctionContext::default(), &columns)
.unwrap();
let result = result
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Timestamp(Timestamp::from_str("2001-02-01 00:00:00", None).unwrap())
);
assert_eq!(
result.get(1),
Value::Timestamp(Timestamp::from_str("2012-12-23 00:00:00", None).unwrap())
);
}
#[test]
fn test_greatest_takes_date_vector() {
let function = GreatestFunction;
assert_eq!(
function
.return_type(&[
ConcreteDataType::date_datatype(),
ConcreteDataType::date_datatype()
])
.unwrap(),
ConcreteDataType::Date(DateType)
);
let columns = vec![
Arc::new(DateVector::from_slice(vec![-1, 2])) as _,
Arc::new(DateVector::from_slice(vec![0, 1])) as _,
];
let result = function
.eval(&FunctionContext::default(), &columns)
.unwrap();
let result = result.as_any().downcast_ref::<DateVector>().unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Date(Date::from_str_utc("1970-01-01").unwrap())
);
assert_eq!(
result.get(1),
Value::Date(Date::from_str_utc("1970-01-03").unwrap())
);
}
#[test]
fn test_greatest_takes_datetime_vector() {
let function = GreatestFunction;
assert_eq!(
function
.return_type(&[
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_millisecond_datatype()
])
.unwrap(),
ConcreteDataType::timestamp_millisecond_datatype()
);
let columns = vec![
Arc::new(TimestampMillisecondVector::from_slice(vec![-1, 2])) as _,
Arc::new(TimestampMillisecondVector::from_slice(vec![0, 1])) as _,
];
let result = function
.eval(&FunctionContext::default(), &columns)
.unwrap();
let result = result
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Timestamp(Timestamp::from_str("1970-01-01 00:00:00", None).unwrap())
);
assert_eq!(
result.get(1),
Value::Timestamp(Timestamp::from_str("1970-01-01 00:00:00.002", None).unwrap())
);
}
macro_rules! test_timestamp {
($type: expr,$unit: ident) => {
paste! {
#[test]
fn [<test_greatest_takes_ $unit:lower _vector>]() {
let function = GreatestFunction;
assert_eq!(
function.return_type(&[$type, $type]).unwrap(),
ConcreteDataType::Timestamp(TimestampType::$unit([<Timestamp $unit Type>]))
);
let columns = vec![
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![-1, 2])) as _,
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![0, 1])) as _,
];
let result = function.eval(&FunctionContext::default(), &columns).unwrap();
let result = result.as_any().downcast_ref::<[<Timestamp $unit Vector>]>().unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Timestamp(Timestamp::new(0, TimeUnit::$unit))
);
assert_eq!(
result.get(1),
Value::Timestamp(Timestamp::new(2, TimeUnit::$unit))
);
}
}
}
}
test_timestamp!(
ConcreteDataType::timestamp_nanosecond_datatype(),
Nanosecond
);
test_timestamp!(
ConcreteDataType::timestamp_microsecond_datatype(),
Microsecond
);
test_timestamp!(
ConcreteDataType::timestamp_millisecond_datatype(),
Millisecond
);
test_timestamp!(ConcreteDataType::timestamp_second_datatype(), Second);
}

View File

@@ -217,7 +217,9 @@ pub enum Instruction {
/// Invalidates batch cache.
InvalidateCaches(Vec<CacheIdent>),
/// Flushes regions.
FlushRegion(FlushRegions),
FlushRegions(FlushRegions),
/// Flushes a single region.
FlushRegion(RegionId),
}
/// The reply of [UpgradeRegion].
@@ -248,6 +250,7 @@ pub enum InstructionReply {
CloseRegion(SimpleReply),
UpgradeRegion(UpgradeRegionReply),
DowngradeRegion(DowngradeRegionReply),
FlushRegion(SimpleReply),
}
impl Display for InstructionReply {
@@ -259,6 +262,7 @@ impl Display for InstructionReply {
Self::DowngradeRegion(reply) => {
write!(f, "InstructionReply::DowngradeRegion({})", reply)
}
Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
}
}
}

View File

@@ -112,7 +112,7 @@ pub mod test_utils;
mod tombstone;
pub mod topic_name;
pub mod topic_region;
pub(crate) mod txn_helper;
pub mod txn_helper;
pub mod view_info;
use std::collections::{BTreeMap, HashMap, HashSet};

View File

@@ -25,7 +25,7 @@ pub struct TxnOpGetResponseSet(Vec<KeyValue>);
impl TxnOpGetResponseSet {
/// Returns a filter to consume a [KeyValue] where the key equals `key`.
pub(crate) fn filter(key: Vec<u8>) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>> {
pub fn filter(key: Vec<u8>) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>> {
move |set| {
let pos = set.0.iter().position(|kv| kv.key == key);
match pos {
@@ -36,7 +36,7 @@ impl TxnOpGetResponseSet {
}
/// Returns a decoder to decode bytes to `DeserializedValueWithBytes<T>`.
pub(crate) fn decode_with<F, T>(
pub fn decode_with<F, T>(
mut f: F,
) -> impl FnMut(&mut TxnOpGetResponseSet) -> Result<Option<DeserializedValueWithBytes<T>>>
where

View File

@@ -35,7 +35,7 @@ pub mod memory;
pub mod rds;
pub mod test;
pub mod txn;
pub mod util;
pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
#[async_trait]

View File

@@ -0,0 +1,85 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Removes sensitive information like passwords from connection strings.
///
/// This function sanitizes connection strings by removing credentials:
/// - For URL format (mysql://user:password@host:port/db): Removes everything before '@'
/// - For parameter format (host=localhost password=secret): Removes the password parameter
/// - For URL format without credentials (mysql://host:port/db): Removes the protocol prefix
///
/// # Arguments
///
/// * `conn_str` - The connection string to sanitize
///
/// # Returns
///
/// A sanitized version of the connection string with sensitive information removed
pub fn sanitize_connection_string(conn_str: &str) -> String {
// Case 1: URL format with credentials (mysql://user:password@host:port/db)
// Extract everything after the '@' symbol
if let Some(at_pos) = conn_str.find('@') {
return conn_str[at_pos + 1..].to_string();
}
// Case 2: Parameter format with password (host=localhost password=secret dbname=mydb)
// Filter out any parameter that starts with "password="
if conn_str.contains("password=") {
return conn_str
.split_whitespace()
.filter(|param| !param.starts_with("password="))
.collect::<Vec<_>>()
.join(" ");
}
// Case 3: URL format without credentials (mysql://host:port/db)
// Extract everything after the protocol prefix
if let Some(host_part) = conn_str.split("://").nth(1) {
return host_part.to_string();
}
// Case 4: Already sanitized or unknown format
// Return as is
conn_str.to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sanitize_connection_string() {
// Test URL format with username/password
let conn_str = "mysql://user:password123@localhost:3306/db";
assert_eq!(sanitize_connection_string(conn_str), "localhost:3306/db");
// Test URL format without credentials
let conn_str = "mysql://localhost:3306/db";
assert_eq!(sanitize_connection_string(conn_str), "localhost:3306/db");
// Test parameter format with password
let conn_str = "host=localhost port=5432 user=postgres password=secret dbname=mydb";
assert_eq!(
sanitize_connection_string(conn_str),
"host=localhost port=5432 user=postgres dbname=mydb"
);
// Test parameter format without password
let conn_str = "host=localhost port=5432 user=postgres dbname=mydb";
assert_eq!(
sanitize_connection_string(conn_str),
"host=localhost port=5432 user=postgres dbname=mydb"
);
}
}

View File

@@ -39,6 +39,7 @@ pub struct RegionHeartbeatResponseHandler {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
}
/// Handler of the instruction.
@@ -50,6 +51,7 @@ pub struct HandlerContext {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
}
impl HandlerContext {
@@ -63,6 +65,7 @@ impl HandlerContext {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
}
}
}
@@ -74,6 +77,7 @@ impl RegionHeartbeatResponseHandler {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
}
}
@@ -95,8 +99,11 @@ impl RegionHeartbeatResponseHandler {
handler_context.handle_upgrade_region_instruction(upgrade_region)
})),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
Instruction::FlushRegion(flush_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_regions)
Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_regions_instruction(flush_regions)
})),
Instruction::FlushRegion(flush_region) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_region)
})),
}
}
@@ -111,6 +118,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
| Some((_, Instruction::CloseRegion { .. }))
| Some((_, Instruction::DowngradeRegion { .. }))
| Some((_, Instruction::UpgradeRegion { .. }))
| Some((_, Instruction::FlushRegion { .. }))
)
}
@@ -124,12 +132,14 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
let region_server = self.region_server.clone();
let catchup_tasks = self.catchup_tasks.clone();
let downgrade_tasks = self.downgrade_tasks.clone();
let flush_tasks = self.flush_tasks.clone();
let handler = Self::build_handler(instruction)?;
let _handle = common_runtime::spawn_global(async move {
let reply = handler(HandlerContext {
region_server,
catchup_tasks,
downgrade_tasks,
flush_tasks,
})
.await;

View File

@@ -12,16 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{FlushRegions, InstructionReply};
use common_meta::instruction::{FlushRegions, InstructionReply, SimpleReply};
use common_telemetry::warn;
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::error;
use crate::heartbeat::handler::HandlerContext;
impl HandlerContext {
pub(crate) fn handle_flush_region_instruction(
pub(crate) fn handle_flush_regions_instruction(
self,
flush_regions: FlushRegions,
) -> BoxFuture<'static, Option<InstructionReply>> {
@@ -49,6 +50,59 @@ impl HandlerContext {
None
})
}
pub(crate) fn handle_flush_region_instruction(
self,
region_id: RegionId,
) -> BoxFuture<'static, Option<InstructionReply>> {
Box::pin(async move {
let Some(writable) = self.region_server.is_region_leader(region_id) else {
return Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some("Region is not leader".to_string()),
}));
};
if !writable {
return Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some("Region is not writable".to_string()),
}));
}
let region_server_moved = self.region_server.clone();
let register_result = self
.flush_tasks
.try_register(
region_id,
Box::pin(async move {
let request = RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
});
region_server_moved
.handle_request(region_id, request)
.await?;
Ok(())
}),
)
.await;
if register_result.is_busy() {
warn!("Another flush task is running for the region: {region_id}");
}
let mut watcher = register_result.into_watcher();
let result = self.flush_tasks.wait_until_finish(&mut watcher).await;
match result {
Ok(()) => Some(InstructionReply::FlushRegion(SimpleReply {
result: true,
error: None,
})),
Err(err) => Some(InstructionReply::FlushRegion(SimpleReply {
result: false,
error: Some(format!("{err:?}")),
})),
}
})
}
}
#[cfg(test)]
@@ -84,7 +138,7 @@ mod tests {
let reply = handler_context
.clone()
.handle_flush_region_instruction(FlushRegions {
.handle_flush_regions_instruction(FlushRegions {
region_ids: region_ids.clone(),
})
.await;
@@ -94,7 +148,7 @@ mod tests {
flushed_region_ids.write().unwrap().clear();
let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
let reply = handler_context
.handle_flush_region_instruction(FlushRegions {
.handle_flush_regions_instruction(FlushRegions {
region_ids: not_found_region_ids.clone(),
})
.await;

View File

@@ -144,6 +144,11 @@ impl<T: Send + Sync + Clone + 'static> TaskTracker<T> {
}
}
/// Waits for a [RegisterResult] and returns a [WaitResult].
pub(crate) async fn wait_until_finish(&self, watcher: &mut TaskWatcher<T>) -> Result<T> {
wait(watcher).await
}
/// Tries to register a new async task, returns [RegisterResult::Busy] if previous task is running.
pub(crate) async fn try_register(
&self,

View File

@@ -37,11 +37,12 @@ use tokio::sync::{Mutex, RwLock};
use crate::adapter::{CreateFlowArgs, StreamingEngine};
use crate::batching_mode::engine::BatchingEngine;
use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
use crate::engine::FlowEngine;
use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu,
InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, SyncCheckTaskSnafu,
UnexpectedSnafu,
InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu,
SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
@@ -81,6 +82,11 @@ impl FlowDualEngine {
}
}
/// Determine if the engine is in distributed mode
pub fn is_distributed(&self) -> bool {
self.streaming_engine.node_id.is_some()
}
pub fn streaming_engine(&self) -> Arc<StreamingEngine> {
self.streaming_engine.clone()
}
@@ -89,6 +95,39 @@ impl FlowDualEngine {
self.batching_engine.clone()
}
/// In distributed mode, scan periodically(1s) until available frontend is found, or timeout,
/// in standalone mode, return immediately
/// notice here if any frontend appear in cluster info this function will return immediately
async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
if !self.is_distributed() {
return Ok(());
}
let frontend_client = self.batching_engine().frontend_client.clone();
let sleep_duration = std::time::Duration::from_millis(1_000);
let now = std::time::Instant::now();
loop {
let frontend_list = frontend_client.scan_for_frontend().await?;
if !frontend_list.is_empty() {
let fe_list = frontend_list
.iter()
.map(|(_, info)| &info.peer.addr)
.collect::<Vec<_>>();
info!("Available frontend found: {:?}", fe_list);
return Ok(());
}
let elapsed = now.elapsed();
tokio::time::sleep(sleep_duration).await;
info!("Waiting for available frontend, elapsed={:?}", elapsed);
if elapsed >= timeout {
return NoAvailableFrontendSnafu {
timeout,
context: "No available frontend found in cluster info",
}
.fail();
}
}
}
/// Try to sync with check task, this is only used in drop flow&flush flow, so a flow id is required
///
/// the need to sync is to make sure flush flow actually get called
@@ -338,18 +377,36 @@ struct ConsistentCheckTask {
impl ConsistentCheckTask {
async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
// first do recover flows
engine.check_flow_consistent(true, false).await?;
let inner = engine.clone();
let engine = engine.clone();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (trigger_tx, mut trigger_rx) =
tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
let handle = common_runtime::spawn_global(async move {
// first check if available frontend is found
if let Err(err) = engine
.wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT)
.await
{
warn!("No frontend is available yet:\n {err:?}");
}
// then do recover flows, if failed, always retry
let mut recover_retry = 0;
while let Err(err) = engine.check_flow_consistent(true, false).await {
recover_retry += 1;
error!(
"Failed to recover flows:\n {err:?}, retry {} in {}s",
recover_retry,
MIN_REFRESH_DURATION.as_secs()
);
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
// then do check flows, with configurable allow_create and allow_drop
let (mut allow_create, mut allow_drop) = (false, false);
let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
loop {
if let Err(err) = inner.check_flow_consistent(allow_create, allow_drop).await {
if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await {
error!(err; "Failed to check flow consistent");
}
if let Some(done) = ret_signal.take() {
@@ -534,7 +591,12 @@ impl FlowEngine for FlowDualEngine {
match flow_type {
Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
None => Ok(0),
None => {
warn!(
"Currently flow={flow_id} doesn't exist in flownode, ignore flush_flow request"
);
Ok(0)
}
}
}

View File

@@ -31,10 +31,19 @@ pub const DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(
pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
/// The minimum duration between two queries execution by batching mode task
const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
pub const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
/// Grpc connection timeout
const GRPC_CONN_TIMEOUT: Duration = Duration::from_secs(5);
/// Grpc max retry number
const GRPC_MAX_RETRIES: u32 = 3;
/// Flow wait for available frontend timeout,
/// if failed to find available frontend after FRONTEND_SCAN_TIMEOUT elapsed, return error
/// which should prevent flownode from starting
pub const FRONTEND_SCAN_TIMEOUT: Duration = Duration::from_secs(30);
/// Frontend activity timeout
/// if frontend is down(not sending heartbeat) for more than FRONTEND_ACTIVITY_TIMEOUT, it will be removed from the list that flownode use to connect
pub const FRONTEND_ACTIVITY_TIMEOUT: Duration = Duration::from_secs(60);

View File

@@ -49,7 +49,8 @@ use crate::{CreateFlowArgs, Error, FlowId, TableName};
pub struct BatchingEngine {
tasks: RwLock<BTreeMap<FlowId, BatchingTask>>,
shutdown_txs: RwLock<BTreeMap<FlowId, oneshot::Sender<()>>>,
frontend_client: Arc<FrontendClient>,
/// frontend client for insert request
pub(crate) frontend_client: Arc<FrontendClient>,
flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,

View File

@@ -15,6 +15,7 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::sync::{Arc, Weak};
use std::time::SystemTime;
use api::v1::greptime_request::Request;
use api::v1::CreateTableExpr;
@@ -26,15 +27,17 @@ use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::warn;
use itertools::Itertools;
use meta_client::client::MetaClient;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, GRPC_CONN_TIMEOUT, GRPC_MAX_RETRIES,
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
GRPC_MAX_RETRIES,
};
use crate::error::{ExternalSnafu, InvalidRequestSnafu, UnexpectedSnafu};
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
use crate::Error;
/// Just like [`GrpcQueryHandler`] but use BoxedError
@@ -127,10 +130,24 @@ impl DatabaseWithPeer {
fn new(database: Database, peer: Peer) -> Self {
Self { database, peer }
}
/// Try sending a "SELECT 1" to the database
async fn try_select_one(&self) -> Result<(), Error> {
// notice here use `sql` for `SELECT 1` return 1 row
let _ = self
.database
.sql("SELECT 1")
.await
.with_context(|_| InvalidRequestSnafu {
context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer),
})?;
Ok(())
}
}
impl FrontendClient {
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
/// scan for available frontend from metadata
pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
let Self::Distributed { meta_client, .. } = self else {
return Ok(vec![]);
};
@@ -160,8 +177,8 @@ impl FrontendClient {
Ok(res)
}
/// Get the database with max `last_activity_ts`
async fn get_last_active_frontend(
/// Get the database with maximum `last_activity_ts`& is able to process query
async fn get_latest_active_frontend(
&self,
catalog: &str,
schema: &str,
@@ -177,22 +194,50 @@ impl FrontendClient {
.fail();
};
let frontends = self.scan_for_frontend().await?;
let mut peer = None;
let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
interval.tick().await;
for retry in 0..GRPC_MAX_RETRIES {
let frontends = self.scan_for_frontend().await?;
let now_in_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
if let Some((_, val)) = frontends.iter().max_by_key(|(_, val)| val.last_activity_ts) {
peer = Some(val.peer.clone());
// found node with maximum last_activity_ts
for (_, node_info) in frontends
.iter()
.sorted_by_key(|(_, node_info)| node_info.last_activity_ts)
.rev()
// filter out frontend that have been down for more than 1 min
.filter(|(_, node_info)| {
node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
> now_in_ms
})
{
let addr = &node_info.peer.addr;
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
let database = Database::new(catalog, schema, client);
let db = DatabaseWithPeer::new(database, node_info.peer.clone());
match db.try_select_one().await {
Ok(_) => return Ok(db),
Err(e) => {
warn!(
"Failed to connect to frontend {} on retry={}: \n{e:?}",
addr, retry
);
}
}
}
// no available frontend
// sleep and retry
interval.tick().await;
}
let Some(peer) = peer else {
UnexpectedSnafu {
reason: format!("No frontend available: {:?}", frontends),
}
.fail()?
};
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![peer.addr.clone()]);
let database = Database::new(catalog, schema, client);
Ok(DatabaseWithPeer::new(database, peer))
NoAvailableFrontendSnafu {
timeout: GRPC_CONN_TIMEOUT,
context: "No available frontend found that is able to process query",
}
.fail()
}
pub async fn create(
@@ -222,38 +267,18 @@ impl FrontendClient {
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed { .. } => {
let db = self.get_last_active_frontend(catalog, schema).await?;
let db = self.get_latest_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),
});
let mut retry = 0;
loop {
let ret = db.database.handle(req.clone()).await.with_context(|_| {
InvalidRequestSnafu {
context: format!("Failed to handle request: {:?}", req),
}
});
if let Err(err) = ret {
if retry < GRPC_MAX_RETRIES {
retry += 1;
warn!(
"Failed to send request to grpc handle at Peer={:?}, retry = {}, error = {:?}",
db.peer, retry, err
);
continue;
} else {
common_telemetry::error!(
"Failed to send request to grpc handle at Peer={:?} after {} retries, error = {:?}",
db.peer, retry, err
);
return Err(err);
}
}
return ret;
}
db.database
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
.await
.with_context(|_| InvalidRequestSnafu {
context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
})
}
FrontendClient::Standalone { database_client } => {
let ctx = QueryContextBuilder::default()

View File

@@ -61,6 +61,16 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"No available frontend found after timeout: {timeout:?}, context: {context}"
))]
NoAvailableFrontend {
timeout: std::time::Duration,
context: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error"))]
External {
source: BoxedError,
@@ -296,7 +306,8 @@ impl ErrorExt for Error {
Self::Eval { .. }
| Self::JoinTask { .. }
| Self::Datafusion { .. }
| Self::InsertIntoFlow { .. } => StatusCode::Internal,
| Self::InsertIntoFlow { .. }
| Self::NoAvailableFrontend { .. } => StatusCode::Internal,
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. }
| Self::TableNotFoundMeta { .. }

View File

@@ -172,6 +172,8 @@ impl FlownodeServer {
}
/// Start the background task for streaming computation.
///
/// Should be called only after heartbeat is establish, hence can get cluster info
async fn start_workers(&self) -> Result<(), Error> {
let manager_ref = self.inner.flow_service.dual_engine.clone();
let handle = manager_ref

View File

@@ -14,7 +14,7 @@
pub mod builder;
use std::fmt::Display;
use std::fmt::{self, Display};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
@@ -48,6 +48,7 @@ use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::http::HttpOptions;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::broadcast::error::RecvError;
@@ -65,7 +66,7 @@ use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{Selector, SelectorType};
use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{become_follower, become_leader, StateRef};
@@ -96,7 +97,7 @@ pub enum BackendImpl {
MysqlStore,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
/// The address the server listens on.
@@ -166,6 +167,47 @@ pub struct MetasrvOptions {
pub node_max_idle_time: Duration,
}
impl fmt::Debug for MetasrvOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug_struct = f.debug_struct("MetasrvOptions");
debug_struct
.field("bind_addr", &self.bind_addr)
.field("server_addr", &self.server_addr)
.field("store_addrs", &self.sanitize_store_addrs())
.field("selector", &self.selector)
.field("use_memory_store", &self.use_memory_store)
.field("enable_region_failover", &self.enable_region_failover)
.field(
"allow_region_failover_on_local_wal",
&self.allow_region_failover_on_local_wal,
)
.field("http", &self.http)
.field("logging", &self.logging)
.field("procedure", &self.procedure)
.field("failure_detector", &self.failure_detector)
.field("datanode", &self.datanode)
.field("enable_telemetry", &self.enable_telemetry)
.field("data_home", &self.data_home)
.field("wal", &self.wal)
.field("export_metrics", &self.export_metrics)
.field("store_key_prefix", &self.store_key_prefix)
.field("max_txn_ops", &self.max_txn_ops)
.field("flush_stats_factor", &self.flush_stats_factor)
.field("tracing", &self.tracing)
.field("backend", &self.backend);
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
debug_struct.field("meta_table_name", &self.meta_table_name);
#[cfg(feature = "pg_kvbackend")]
debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
debug_struct
.field("node_max_idle_time", &self.node_max_idle_time)
.finish()
}
}
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
impl Default for MetasrvOptions {
@@ -249,6 +291,13 @@ impl MetasrvOptions {
common_telemetry::debug!("detect local IP is not supported on Android");
}
}
fn sanitize_store_addrs(&self) -> Vec<String> {
self.store_addrs
.iter()
.map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
.collect()
}
}
pub struct MetasrvInfo {
@@ -338,6 +387,8 @@ pub struct SelectorContext {
}
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
pub type RegionStatAwareSelectorRef =
Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;
pub struct MetaStateHandler {

View File

@@ -40,7 +40,7 @@ use common_meta::state_store::KvStateStore;
use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use common_telemetry::warn;
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use crate::cache_invalidator::MetasrvCacheInvalidator;
@@ -54,16 +54,16 @@ use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, Regi
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
use crate::lease::MetaPeerLookupService;
use crate::metasrv::{
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectTarget, SelectorContext, SelectorRef,
FLOW_ID_SEQ, TABLE_ID_SEQ,
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, RegionStatAwareSelectorRef, SelectTarget,
SelectorContext, SelectorRef, FLOW_ID_SEQ, TABLE_ID_SEQ,
};
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
use crate::procedure::wal_prune::Context as WalPruneContext;
use crate::region::supervisor::{
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorTicker,
DEFAULT_TICK_INTERVAL,
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
RegionSupervisorTicker, DEFAULT_TICK_INTERVAL,
};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
@@ -320,13 +320,24 @@ impl MetasrvBuilder {
),
));
region_migration_manager.try_start()?;
let region_supervisor_selector = plugins
.as_ref()
.and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
let supervisor_selector = match region_supervisor_selector {
Some(selector) => {
info!("Using region stat aware selector");
RegionSupervisorSelector::RegionStatAwareSelector(selector)
}
None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
};
let region_failover_handler = if options.enable_region_failover {
let region_supervisor = RegionSupervisor::new(
rx,
options.failure_detector,
selector_ctx.clone(),
selector.clone(),
supervisor_selector,
region_migration_manager.clone(),
maintenance_mode_manager.clone(),
peer_lookup_service.clone(),

View File

@@ -14,6 +14,7 @@
pub(crate) mod close_downgraded_region;
pub(crate) mod downgrade_leader_region;
pub(crate) mod flush_leader_region;
pub(crate) mod manager;
pub(crate) mod migration_abort;
pub(crate) mod migration_end;
@@ -111,6 +112,8 @@ impl PersistentContext {
pub struct Metrics {
/// Elapsed time of downgrading region and upgrading region.
operations_elapsed: Duration,
/// Elapsed time of flushing leader region.
flush_leader_region_elapsed: Duration,
/// Elapsed time of downgrading leader region.
downgrade_leader_region_elapsed: Duration,
/// Elapsed time of open candidate region.
@@ -121,10 +124,15 @@ pub struct Metrics {
impl Display for Metrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let total = self.flush_leader_region_elapsed
+ self.downgrade_leader_region_elapsed
+ self.open_candidate_region_elapsed
+ self.upgrade_candidate_region_elapsed;
write!(
f,
"operations_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
self.operations_elapsed,
"total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
total,
self.flush_leader_region_elapsed,
self.downgrade_leader_region_elapsed,
self.open_candidate_region_elapsed,
self.upgrade_candidate_region_elapsed
@@ -138,6 +146,11 @@ impl Metrics {
self.operations_elapsed += elapsed;
}
/// Updates the elapsed time of flushing leader region.
pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
self.flush_leader_region_elapsed += elapsed;
}
/// Updates the elapsed time of downgrading leader region.
pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
self.downgrade_leader_region_elapsed += elapsed;
@@ -156,10 +169,18 @@ impl Metrics {
impl Drop for Metrics {
fn drop(&mut self) {
if !self.operations_elapsed.is_zero() {
let total = self.flush_leader_region_elapsed
+ self.downgrade_leader_region_elapsed
+ self.open_candidate_region_elapsed
+ self.upgrade_candidate_region_elapsed;
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
.with_label_values(&["total"])
.observe(total.as_secs_f64());
if !self.flush_leader_region_elapsed.is_zero() {
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
.with_label_values(&["operations"])
.observe(self.operations_elapsed.as_secs_f64());
.with_label_values(&["flush_leader_region"])
.observe(self.flush_leader_region_elapsed.as_secs_f64());
}
if !self.downgrade_leader_region_elapsed.is_zero() {
@@ -320,6 +341,13 @@ impl Context {
.update_operations_elapsed(instant.elapsed());
}
/// Updates the elapsed time of flushing leader region.
pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
self.volatile_ctx
.metrics
.update_flush_leader_region_elapsed(instant.elapsed());
}
/// Updates the elapsed time of downgrading leader region.
pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
self.volatile_ctx
@@ -700,7 +728,8 @@ mod tests {
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::test_util::*;
use crate::procedure::test_util::{
new_downgrade_region_reply, new_open_region_reply, new_upgrade_region_reply,
new_downgrade_region_reply, new_flush_region_reply, new_open_region_reply,
new_upgrade_region_reply,
};
use crate::service::mailbox::Channel;
@@ -1208,6 +1237,15 @@ mod tests {
to_peer_id,
Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
)),
Assertion::simple(assert_flush_leader_region, assert_no_persist),
),
// Flush Leader Region
Step::next(
"Should be the flush leader region",
Some(mock_datanode_reply(
from_peer_id,
Arc::new(|id| Ok(new_flush_region_reply(id, true, None))),
)),
Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
),
// UpdateMetadata::Downgrade

View File

@@ -170,7 +170,7 @@ impl DowngradeLeaderRegion {
if error.is_some() {
return error::RetryLaterSnafu {
reason: format!(
"Failed to downgrade the region {} on Datanode {:?}, error: {:?}, elapsed: {:?}",
"Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
region_id, leader, error, now.elapsed()
),
}
@@ -179,13 +179,14 @@ impl DowngradeLeaderRegion {
if !exists {
warn!(
"Trying to downgrade the region {} on Datanode {}, but region doesn't exist!, elapsed: {:?}",
"Trying to downgrade the region {} on datanode {:?}, but region doesn't exist!, elapsed: {:?}",
region_id, leader, now.elapsed()
);
} else {
info!(
"Region {} leader is downgraded, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
"Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
region_id,
leader,
last_entry_id,
metadata_last_entry_id,
now.elapsed()

View File

@@ -0,0 +1,285 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_procedure::Status;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::Instant;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
/// Flushes the leader region before downgrading it.
///
/// This can minimize the time window where the region is not writable.
#[derive(Debug, Serialize, Deserialize)]
pub struct PreFlushRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for PreFlushRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
let timer = Instant::now();
self.flush_region(ctx).await?;
ctx.update_flush_leader_region_elapsed(timer);
// We intentionally don't update `operations_elapsed` here to prevent
// the `next_operation_timeout` from being reduced by the flush operation.
// This ensures sufficient time for subsequent critical operations.
Ok((
Box::new(UpdateMetadata::Downgrade),
Status::executing(false),
))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl PreFlushRegion {
/// Builds flush leader region instruction.
fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
Instruction::FlushRegion(region_id)
}
/// Tries to flush a leader region.
///
/// Ignore:
/// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
/// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
/// - Failed to flush region on the Datanode.
///
/// Abort:
/// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
/// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
/// - [ExceededDeadline](error::Error::ExceededDeadline)
/// - Invalid JSON.
async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Flush leader region",
})?;
let flush_instruction = self.build_flush_leader_region_instruction(ctx);
let region_id = ctx.persistent_ctx.region_id;
let leader = &ctx.persistent_ctx.from_peer;
let msg = MailboxMessage::json_message(
&format!("Flush leader region: {}", region_id),
&format!("Metasrv@{}", ctx.server_addr()),
&format!("Datanode-{}@{}", leader.id, leader.addr),
common_time::util::current_time_millis(),
&flush_instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: flush_instruction.to_string(),
})?;
let ch = Channel::Datanode(leader.id);
let now = Instant::now();
let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
match result {
Ok(receiver) => match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received flush leader region reply: {:?}, region: {}, elapsed: {:?}",
reply,
region_id,
now.elapsed()
);
let InstructionReply::FlushRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect flush region reply",
}
.fail();
};
if error.is_some() {
warn!(
"Failed to flush leader region {} on datanode {:?}, error: {:?}. Skip flush operation.",
region_id, leader, error
);
} else if result {
info!(
"The flush leader region {} on datanode {:?} is successful, elapsed: {:?}",
region_id,
leader,
now.elapsed()
);
}
Ok(())
}
Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
operation: "Flush leader region",
}
.fail(),
Err(err) => Err(err),
},
Err(Error::PusherNotFound { .. }) => {
warn!(
"Failed to flush leader region({}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
region_id,
leader
);
Ok(())
}
Err(err) => Err(err),
}
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use store_api::storage::RegionId;
use super::*;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_flush_region_reply, send_mock_reply,
};
fn new_persistent_context() -> PersistentContext {
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
}
#[tokio::test]
async fn test_datanode_is_unreachable() {
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
// Should be ok, if leader region is unreachable. it will skip flush operation.
state.flush_region(&mut ctx).await.unwrap();
}
#[tokio::test]
async fn test_unexpected_instruction_reply() {
common_telemetry::init_default_ut_logging();
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
// Sends an incorrect reply.
send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
let err = state.flush_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::UnexpectedInstructionReply { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_instruction_exceeded_deadline() {
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
// Sends an timeout error.
send_mock_reply(mailbox, rx, |id| {
Err(error::MailboxTimeoutSnafu { id }.build())
});
let err = state.flush_region(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::ExceededDeadline { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_flush_region_failed() {
common_telemetry::init_default_ut_logging();
let state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
send_mock_reply(mailbox, rx, |id| {
Ok(new_flush_region_reply(
id,
false,
Some("test mocked".to_string()),
))
});
// Should be ok, if flush leader region failed. it will skip flush operation.
state.flush_region(&mut ctx).await.unwrap();
}
#[tokio::test]
async fn test_next_update_metadata_downgrade_state() {
common_telemetry::init_default_ut_logging();
let mut state = PreFlushRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
send_mock_reply(mailbox, rx, |id| Ok(new_flush_region_reply(id, true, None)));
let (next, _) = state.next(&mut ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Downgrade);
}
}

View File

@@ -28,7 +28,7 @@ use tokio::time::Instant;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
@@ -47,10 +47,7 @@ impl State for OpenCandidateRegion {
self.open_candidate_region(ctx, instruction).await?;
ctx.update_open_candidate_region_elapsed(now);
Ok((
Box::new(UpdateMetadata::Downgrade),
Status::executing(false),
))
Ok((Box::new(PreFlushRegion), Status::executing(false)))
}
fn as_any(&self) -> &dyn Any {
@@ -399,7 +396,7 @@ mod tests {
}
#[tokio::test]
async fn test_next_update_metadata_downgrade_state() {
async fn test_next_flush_leader_region_state() {
let mut state = Box::new(OpenCandidateRegion);
// from_peer: 1
// to_peer: 2
@@ -445,8 +442,7 @@ mod tests {
(to_peer_id, region_id)
);
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Downgrade);
let flush_leader_region = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
assert_matches!(flush_leader_region, PreFlushRegion);
}
}

View File

@@ -44,6 +44,7 @@ use crate::error::{self, Error, Result};
use crate::metasrv::MetasrvInfo;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::manager::RegionMigrationProcedureTracker;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
@@ -415,6 +416,11 @@ pub(crate) fn assert_open_candidate_region(next: &dyn State) {
let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
}
/// Asserts the [State] should be [FlushLeaderRegion].
pub(crate) fn assert_flush_leader_region(next: &dyn State) {
let _ = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
}
/// Asserts the [State] should be [UpdateMetadata::Downgrade].
pub(crate) fn assert_update_metadata_downgrade(next: &dyn State) {
let state = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

View File

@@ -101,6 +101,24 @@ pub fn new_open_region_reply(id: u64, result: bool, error: Option<String>) -> Ma
}
}
/// Generates a [InstructionReply::FlushRegion] reply.
pub fn new_flush_region_reply(id: u64, result: bool, error: Option<String>) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::FlushRegion(SimpleReply {
result,
error,
}))
.unwrap(),
)),
}
}
/// Generates a [InstructionReply::CloseRegion] reply.
pub fn new_close_region_reply(id: u64) -> MailboxMessage {
MailboxMessage {

View File

@@ -181,7 +181,7 @@ impl WalPruneProcedure {
let peer_and_instructions = peer_region_ids_map
.into_iter()
.map(|(peer, region_ids)| {
let flush_instruction = Instruction::FlushRegion(FlushRegions { region_ids });
let flush_instruction = Instruction::FlushRegions(FlushRegions { region_ids });
(peer.clone(), flush_instruction)
})
.collect();
@@ -536,7 +536,7 @@ mod tests {
let msg = resp.mailbox_message.unwrap();
let flush_instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
let mut flush_requested_region_ids = match flush_instruction {
Instruction::FlushRegion(FlushRegions { region_ids, .. }) => region_ids,
Instruction::FlushRegions(FlushRegions { region_ids, .. }) => region_ids,
_ => unreachable!(),
};
let sorted_region_ids = region_ids

View File

@@ -22,20 +22,20 @@ use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::PeerLookupServiceRef;
use common_meta::peer::{Peer, PeerLookupServiceRef};
use common_meta::DatanodeId;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{LeaderPeerChanged, MigrationRunning, TableRouteNotFound};
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{interval, MissedTickBehavior};
use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::region_migration::{
RegionMigrationProcedureTask, DEFAULT_REGION_MIGRATION_TIMEOUT,
@@ -203,6 +203,12 @@ pub type RegionSupervisorRef = Arc<RegionSupervisor>;
/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// Selector for region supervisor.
pub enum RegionSupervisorSelector {
NaiveSelector(SelectorRef),
RegionStatAwareSelector(RegionStatAwareSelectorRef),
}
/// The [`RegionSupervisor`] is used to detect Region failures
/// and initiate Region failover upon detection, ensuring uninterrupted region service.
pub struct RegionSupervisor {
@@ -215,7 +221,7 @@ pub struct RegionSupervisor {
/// The context of [`SelectorRef`]
selector_context: SelectorContext,
/// Candidate node selector.
selector: SelectorRef,
selector: RegionSupervisorSelector,
/// Region migration manager.
region_migration_manager: RegionMigrationManagerRef,
/// The maintenance mode manager.
@@ -288,7 +294,7 @@ impl RegionSupervisor {
event_receiver: Receiver<Event>,
options: PhiAccrualFailureDetectorOptions,
selector_context: SelectorContext,
selector: SelectorRef,
selector: RegionSupervisorSelector,
region_migration_manager: RegionMigrationManagerRef,
maintenance_mode_manager: MaintenanceModeManagerRef,
peer_lookup: PeerLookupServiceRef,
@@ -362,6 +368,7 @@ impl RegionSupervisor {
}
}
// Extracts regions that are migrating(failover), which means they are already being triggered failover.
let migrating_regions = regions
.extract_if(.., |(_, region_id)| {
self.region_migration_manager.tracker().contains(*region_id)
@@ -374,10 +381,43 @@ impl RegionSupervisor {
);
}
warn!("Detects region failures: {:?}", regions);
if regions.is_empty() {
// If all detected regions are failover or migrating, just return.
return;
}
let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
HashMap::with_capacity(regions.len());
for (datanode_id, region_id) in regions {
if let Err(err) = self.do_failover(datanode_id, region_id).await {
error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
grouped_regions
.entry(datanode_id)
.or_default()
.push(region_id);
}
for (datanode_id, regions) in grouped_regions {
warn!(
"Detects region failures on datanode: {}, regions: {:?}",
datanode_id, regions
);
// We can't use `grouped_regions.keys().cloned().collect::<Vec<_>>()` here
// because there may be false positives in failure detection on the datanode.
// So we only consider the datanode that reports the failure.
let failed_datanodes = [datanode_id];
match self
.generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
.await
{
Ok(tasks) => {
for (task, count) in tasks {
let region_id = task.region_id;
let datanode_id = task.from_peer.id;
if let Err(err) = self.do_failover(task, count).await {
error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id);
}
}
}
Err(err) => error!(err; "Failed to generate failover tasks"),
}
}
}
@@ -389,49 +429,107 @@ impl RegionSupervisor {
.context(error::MaintenanceModeManagerSnafu)
}
async fn do_failover(&mut self, datanode_id: DatanodeId, region_id: RegionId) -> Result<()> {
let count = *self
.failover_counts
.entry((datanode_id, region_id))
.and_modify(|count| *count += 1)
.or_insert(1);
async fn select_peers(
&self,
from_peer_id: DatanodeId,
regions: &[RegionId],
failure_datanodes: &[DatanodeId],
) -> Result<Vec<(RegionId, Peer)>> {
let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
match &self.selector {
RegionSupervisorSelector::NaiveSelector(selector) => {
let opt = SelectorOptions {
min_required_items: regions.len(),
allow_duplication: true,
exclude_peer_ids,
};
let peers = selector.select(&self.selector_context, opt).await?;
ensure!(
peers.len() == regions.len(),
error::NoEnoughAvailableNodeSnafu {
required: regions.len(),
available: peers.len(),
select_target: SelectTarget::Datanode,
}
);
let region_peers = regions
.iter()
.zip(peers)
.map(|(region_id, peer)| (*region_id, peer))
.collect::<Vec<_>>();
Ok(region_peers)
}
RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
let peers = selector
.select(
&self.selector_context,
from_peer_id,
regions,
exclude_peer_ids,
)
.await?;
ensure!(
peers.len() == regions.len(),
error::NoEnoughAvailableNodeSnafu {
required: regions.len(),
available: peers.len(),
select_target: SelectTarget::Datanode,
}
);
Ok(peers)
}
}
}
async fn generate_failover_tasks(
&mut self,
from_peer_id: DatanodeId,
regions: &[RegionId],
failed_datanodes: &[DatanodeId],
) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
let mut tasks = Vec::with_capacity(regions.len());
let from_peer = self
.peer_lookup
.datanode(datanode_id)
.datanode(from_peer_id)
.await
.context(error::LookupPeerSnafu {
peer_id: datanode_id,
peer_id: from_peer_id,
})?
.context(error::PeerUnavailableSnafu {
peer_id: datanode_id,
peer_id: from_peer_id,
})?;
let mut peers = self
.selector
.select(
&self.selector_context,
SelectorOptions {
min_required_items: 1,
allow_duplication: false,
exclude_peer_ids: HashSet::from([from_peer.id]),
},
)
let region_peers = self
.select_peers(from_peer_id, regions, failed_datanodes)
.await?;
let to_peer = peers.remove(0);
if to_peer.id == from_peer.id {
warn!(
"Skip failover for region: {region_id}, from_peer: {from_peer}, trying to failover to the same peer."
);
return Ok(());
for (region_id, peer) in region_peers {
let count = *self
.failover_counts
.entry((from_peer_id, region_id))
.and_modify(|count| *count += 1)
.or_insert(1);
let task = RegionMigrationProcedureTask {
region_id,
from_peer: from_peer.clone(),
to_peer: peer,
timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
};
tasks.push((task, count));
}
Ok(tasks)
}
async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> {
let from_peer_id = task.from_peer.id;
let region_id = task.region_id;
info!(
"Failover for region: {region_id}, from_peer: {from_peer}, to_peer: {to_peer}, tries: {count}"
"Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}",
task.region_id, task.from_peer, task.to_peer, task.timeout, count
);
let task = RegionMigrationProcedureTask {
region_id,
from_peer,
to_peer,
timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
};
if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
return match err {
@@ -439,25 +537,25 @@ impl RegionSupervisor {
MigrationRunning { .. } => {
info!(
"Another region migration is running, skip failover for region: {}, datanode: {}",
region_id, datanode_id
region_id, from_peer_id
);
Ok(())
}
TableRouteNotFound { .. } => {
self.deregister_failure_detectors(vec![(datanode_id, region_id)])
self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
.await;
info!(
"Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}",
region_id, datanode_id
region_id, from_peer_id
);
Ok(())
}
LeaderPeerChanged { .. } => {
self.deregister_failure_detectors(vec![(datanode_id, region_id)])
self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
.await;
info!(
"Region's leader peer changed, removed failover detector for region: {}, datanode: {}",
region_id, datanode_id
region_id, from_peer_id
);
Ok(())
}
@@ -521,6 +619,7 @@ pub(crate) mod tests {
use tokio::sync::oneshot;
use tokio::time::sleep;
use super::RegionSupervisorSelector;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::region::supervisor::{
@@ -548,7 +647,7 @@ pub(crate) mod tests {
rx,
Default::default(),
selector_context,
selector,
RegionSupervisorSelector::NaiveSelector(selector),
region_migration_manager,
maintenance_mode_manager,
peer_lookup,

View File

@@ -23,6 +23,7 @@ pub mod weighted_choose;
use std::collections::HashSet;
use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
use strum::AsRefStr;
use crate::error;
@@ -36,6 +37,24 @@ pub trait Selector: Send + Sync {
async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output>;
}
/// A selector that aware of region statistics
///
/// It selects the best destination peer for a list of regions.
/// The selection is based on the region statistics, such as the region leader's write throughput.
#[async_trait::async_trait]
pub trait RegionStatAwareSelector: Send + Sync {
type Context;
type Output;
async fn select(
&self,
ctx: &Self::Context,
from_peer_id: u64,
region_ids: &[RegionId],
exclude_peer_ids: HashSet<u64>,
) -> Result<Self::Output>;
}
#[derive(Debug)]
pub struct SelectorOptions {
/// Minimum number of selected results.

View File

@@ -42,11 +42,11 @@ pub(crate) use state::MetricEngineState;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
SyncManifestResponse,
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{BatchRegionDdlRequest, RegionRequest};
use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use crate::config::EngineConfig;
@@ -131,6 +131,17 @@ impl RegionEngine for MetricEngine {
METRIC_ENGINE_NAME
}
async fn handle_batch_open_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<BatchResponses, BoxedError> {
self.inner
.handle_batch_open_requests(parallelism, requests)
.await
.map_err(BoxedError::new)
}
async fn handle_batch_ddl_requests(
&self,
batch_request: BatchRegionDdlRequest,

View File

@@ -14,24 +14,80 @@
//! Open a metric region.
use std::collections::HashSet;
use common_telemetry::info;
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{BatchResponses, RegionEngine};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::engine::create::region_options_for_metadata_region;
use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
use crate::engine::MetricEngineInner;
use crate::error::{OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result};
use crate::error::{
BatchOpenMitoRegionSnafu, OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result,
};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils;
impl MetricEngineInner {
pub async fn handle_batch_open_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<BatchResponses> {
// We need to open metadata region and data region for each request.
let mut all_requests = Vec::with_capacity(requests.len() * 2);
let mut physical_region_ids = Vec::with_capacity(requests.len());
let mut data_region_ids = HashSet::with_capacity(requests.len());
for (region_id, request) in requests {
if !request.is_physical_table() {
continue;
}
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);
let (open_metadata_region_request, open_data_region_request) =
self.transform_open_physical_region_request(request);
all_requests.push((metadata_region_id, open_metadata_region_request));
all_requests.push((data_region_id, open_data_region_request));
physical_region_ids.push((region_id, physical_region_options));
data_region_ids.insert(data_region_id);
}
let results = self
.mito
.handle_batch_open_requests(parallelism, all_requests)
.await
.context(BatchOpenMitoRegionSnafu {})?
.into_iter()
.filter(|(region_id, _)| data_region_ids.contains(region_id))
.collect::<Vec<_>>();
for (physical_region_id, physical_region_options) in physical_region_ids {
let primary_key_encoding = self
.mito
.get_primary_key_encoding(physical_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: physical_region_id,
})?;
self.recover_states(
physical_region_id,
primary_key_encoding,
physical_region_options,
)
.await?;
}
Ok(results)
}
/// Open a metric region.
///
/// Only open requests to a physical region matter. Those to logical regions are
@@ -69,12 +125,15 @@ impl MetricEngineInner {
}
}
/// Invokes mito engine to open physical regions (data and metadata).
async fn open_physical_region(
/// Transform the open request to open metadata region and data region.
///
/// Returns:
/// - The open request for metadata region.
/// - The open request for data region.
fn transform_open_physical_region_request(
&self,
region_id: RegionId,
request: RegionOpenRequest,
) -> Result<AffectedRows> {
) -> (RegionOpenRequest, RegionOpenRequest) {
let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
@@ -98,8 +157,19 @@ impl MetricEngineInner {
skip_wal_replay: request.skip_wal_replay,
};
(open_metadata_region_request, open_data_region_request)
}
/// Invokes mito engine to open physical regions (data and metadata).
async fn open_physical_region(
&self,
region_id: RegionId,
request: RegionOpenRequest,
) -> Result<AffectedRows> {
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);
let (open_metadata_region_request, open_data_region_request) =
self.transform_open_physical_region_request(request);
self.mito
.handle_request(

View File

@@ -42,6 +42,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to batch open mito region"))]
BatchOpenMitoRegion {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to close mito region, region id: {}", region_id))]
CloseMitoRegion {
region_id: RegionId,
@@ -337,7 +344,8 @@ impl ErrorExt for Error {
| MitoCatchupOperation { source, .. }
| MitoFlushOperation { source, .. }
| MitoDeleteOperation { source, .. }
| MitoSyncOperation { source, .. } => source.status_code(),
| MitoSyncOperation { source, .. }
| BatchOpenMitoRegion { source, .. } => source.status_code(),
EncodePrimaryKey { source, .. } => source.status_code(),

View File

@@ -302,7 +302,10 @@ impl PartitionTreeMemtable {
fn update_stats(&self, metrics: &WriteMetrics) {
// Only let the tracker tracks value bytes.
self.alloc_tracker.on_allocation(metrics.value_bytes);
metrics.update_timestamp_range(&self.max_timestamp, &self.min_timestamp);
self.max_timestamp
.fetch_max(metrics.max_ts, Ordering::SeqCst);
self.min_timestamp
.fetch_min(metrics.min_ts, Ordering::SeqCst);
}
}

View File

@@ -14,8 +14,6 @@
//! Internal metrics of the memtable.
use std::sync::atomic::{AtomicI64, Ordering};
/// Metrics of writing memtables.
pub(crate) struct WriteMetrics {
/// Size allocated by keys.
@@ -28,51 +26,6 @@ pub(crate) struct WriteMetrics {
pub(crate) max_ts: i64,
}
impl WriteMetrics {
/// Update the min/max timestamp range according to current write metric.
pub(crate) fn update_timestamp_range(&self, prev_max_ts: &AtomicI64, prev_min_ts: &AtomicI64) {
loop {
let current_min = prev_min_ts.load(Ordering::Relaxed);
if self.min_ts >= current_min {
break;
}
let Err(updated) = prev_min_ts.compare_exchange(
current_min,
self.min_ts,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == self.min_ts {
break;
}
}
loop {
let current_max = prev_max_ts.load(Ordering::Relaxed);
if self.max_ts <= current_max {
break;
}
let Err(updated) = prev_max_ts.compare_exchange(
current_max,
self.max_ts,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == self.max_ts {
break;
}
}
}
}
impl Default for WriteMetrics {
fn default() -> Self {
Self {

View File

@@ -147,7 +147,8 @@ impl TimeSeriesMemtable {
fn update_stats(&self, stats: WriteMetrics) {
self.alloc_tracker
.on_allocation(stats.key_bytes + stats.value_bytes);
stats.update_timestamp_range(&self.max_timestamp, &self.min_timestamp);
self.max_timestamp.fetch_max(stats.max_ts, Ordering::SeqCst);
self.min_timestamp.fetch_min(stats.min_ts, Ordering::SeqCst);
}
fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {

View File

@@ -322,13 +322,10 @@ impl ScanRegion {
let memtables: Vec<_> = memtables
.into_iter()
.filter(|mem| {
if mem.is_empty() {
// check if memtable is empty by reading stats.
let Some((start, end)) = mem.stats().time_range() else {
return false;
}
let stats = mem.stats();
// Safety: the memtable is not empty.
let (start, end) = stats.time_range().unwrap();
};
// The time range of the memtable is inclusive.
let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
memtable_range.intersects(&time_range)

View File

@@ -134,6 +134,7 @@ impl WriteFormat {
/// Helper for reading the SST format.
pub struct ReadFormat {
/// The metadata stored in the SST.
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
@@ -305,17 +306,23 @@ impl ReadFormat {
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
) -> StatValues {
let Some(column) = self.metadata.column_by_id(column_id) else {
// No such column in the SST.
return StatValues::NoColumn;
};
match column.semantic_type {
SemanticType::Tag => self.tag_values(row_groups, column, true),
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_values(row_groups, column, *index, true)
// Safety: `field_id_to_index` is initialized by the semantic type.
let index = self.field_id_to_index.get(&column_id).unwrap();
let stats = Self::column_values(row_groups, column, *index, true);
StatValues::from_stats_opt(stats)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
Self::column_values(row_groups, column, index, true)
let stats = Self::column_values(row_groups, column, index, true);
StatValues::from_stats_opt(stats)
}
}
}
@@ -325,17 +332,23 @@ impl ReadFormat {
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
) -> StatValues {
let Some(column) = self.metadata.column_by_id(column_id) else {
// No such column in the SST.
return StatValues::NoColumn;
};
match column.semantic_type {
SemanticType::Tag => self.tag_values(row_groups, column, false),
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_values(row_groups, column, *index, false)
// Safety: `field_id_to_index` is initialized by the semantic type.
let index = self.field_id_to_index.get(&column_id).unwrap();
let stats = Self::column_values(row_groups, column, *index, false);
StatValues::from_stats_opt(stats)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
Self::column_values(row_groups, column, index, false)
let stats = Self::column_values(row_groups, column, index, false);
StatValues::from_stats_opt(stats)
}
}
}
@@ -345,17 +358,23 @@ impl ReadFormat {
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
) -> StatValues {
let Some(column) = self.metadata.column_by_id(column_id) else {
// No such column in the SST.
return StatValues::NoColumn;
};
match column.semantic_type {
SemanticType::Tag => None,
SemanticType::Tag => StatValues::NoStats,
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_null_counts(row_groups, *index)
// Safety: `field_id_to_index` is initialized by the semantic type.
let index = self.field_id_to_index.get(&column_id).unwrap();
let stats = Self::column_null_counts(row_groups, *index);
StatValues::from_stats_opt(stats)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
Self::column_null_counts(row_groups, index)
let stats = Self::column_null_counts(row_groups, index);
StatValues::from_stats_opt(stats)
}
}
}
@@ -390,8 +409,7 @@ impl ReadFormat {
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
is_min: bool,
) -> Option<ArrayRef> {
let primary_key_encoding = self.metadata.primary_key_encoding;
) -> StatValues {
let is_first_tag = self
.metadata
.primary_key
@@ -400,9 +418,28 @@ impl ReadFormat {
.unwrap_or(false);
if !is_first_tag {
// Only the min-max of the first tag is available in the primary key.
return None;
return StatValues::NoStats;
}
StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
}
/// Returns min/max values of the first tag.
/// Returns None if the tag does not have statistics.
fn first_tag_values(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
is_min: bool,
) -> Option<ArrayRef> {
debug_assert!(self
.metadata
.primary_key
.first()
.map(|id| *id == column.column_id)
.unwrap_or(false));
let primary_key_encoding = self.metadata.primary_key_encoding;
let converter = build_primary_key_codec_with_fields(
primary_key_encoding,
[(
@@ -452,6 +489,7 @@ impl ReadFormat {
}
/// Returns min/max values of specific non-tag columns.
/// Returns None if the column does not have statistics.
fn column_values(
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
@@ -544,6 +582,29 @@ impl ReadFormat {
}
}
/// Values of column statistics of the SST.
///
/// It also distinguishes the case that a column is not found and
/// the column exists but has no statistics.
pub enum StatValues {
/// Values of each row group.
Values(ArrayRef),
/// No such column.
NoColumn,
/// Column exists but has no statistics.
NoStats,
}
impl StatValues {
/// Creates a new `StatValues` instance from optional statistics.
pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
match stats {
Some(stats) => StatValues::Values(stats),
None => StatValues::NoStats,
}
}
}
#[cfg(test)]
impl ReadFormat {
/// Creates a helper with existing `metadata` and all columns.

View File

@@ -25,7 +25,7 @@ use parquet::file::metadata::RowGroupMetaData;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::format::{ReadFormat, StatValues};
/// Statistics for pruning row groups.
pub(crate) struct RowGroupPruningStats<'a, T> {
@@ -100,16 +100,18 @@ impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_,
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let column_id = self.column_id_to_prune(&column.name)?;
match self.read_format.min_values(self.row_groups, column_id) {
Some(values) => Some(values),
None => self.compat_default_value(&column.name),
StatValues::Values(values) => Some(values),
StatValues::NoColumn => self.compat_default_value(&column.name),
StatValues::NoStats => None,
}
}
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
let column_id = self.column_id_to_prune(&column.name)?;
match self.read_format.max_values(self.row_groups, column_id) {
Some(values) => Some(values),
None => self.compat_default_value(&column.name),
StatValues::Values(values) => Some(values),
StatValues::NoColumn => self.compat_default_value(&column.name),
StatValues::NoStats => None,
}
}
@@ -118,10 +120,12 @@ impl<T: Borrow<RowGroupMetaData>> PruningStatistics for RowGroupPruningStats<'_,
}
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let Some(column_id) = self.column_id_to_prune(&column.name) else {
return self.compat_null_count(&column.name);
};
self.read_format.null_counts(self.row_groups, column_id)
let column_id = self.column_id_to_prune(&column.name)?;
match self.read_format.null_counts(self.row_groups, column_id) {
StatValues::Values(values) => Some(values),
StatValues::NoColumn => self.compat_null_count(&column.name),
StatValues::NoStats => None,
}
}
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {

View File

@@ -20,7 +20,7 @@ use arrow_schema::DataType;
use catalog::table_source::DfTableSourceProvider;
use common_function::aggr::{
GeoPathAccumulator, HllState, UddSketchState, GEO_PATH_NAME, HLL_MERGE_NAME, HLL_NAME,
UDDSKETCH_STATE_NAME,
UDDSKETCH_MERGE_NAME, UDDSKETCH_STATE_NAME,
};
use common_function::scalars::udf::create_udf;
use common_query::logical_plan::create_aggregate_function;
@@ -165,7 +165,9 @@ impl ContextProvider for DfContextProviderAdapter {
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
if name == UDDSKETCH_STATE_NAME {
return Some(Arc::new(UddSketchState::udf_impl()));
return Some(Arc::new(UddSketchState::state_udf_impl()));
} else if name == UDDSKETCH_MERGE_NAME {
return Some(Arc::new(UddSketchState::merge_udf_impl()));
} else if name == HLL_NAME {
return Some(Arc::new(HllState::state_udf_impl()));
} else if name == HLL_MERGE_NAME {

View File

@@ -128,7 +128,8 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
session_state
.register_udf(udf)
.context(RegisterUdfSnafu { name: func.name() })?;
let _ = session_state.register_udaf(Arc::new(UddSketchState::udf_impl()));
let _ = session_state.register_udaf(Arc::new(UddSketchState::state_udf_impl()));
let _ = session_state.register_udaf(Arc::new(UddSketchState::merge_udf_impl()));
let _ = session_state.register_udaf(Arc::new(HllState::state_udf_impl()));
let _ = session_state.register_udaf(Arc::new(HllState::merge_udf_impl()));
let _ = session_state.register_udaf(Arc::new(GeoPathAccumulator::udf_impl()));

View File

@@ -52,7 +52,41 @@ select uddsketch_calc(0.95, uddsketch_state(128, 0.01, `value`)) from test_uddsk
| 100.49456770856492 |
+----------------------------------------------------------------------------------------------+
CREATE TABLE grouped_uddsketch (
`state` BINARY,
id_group INT PRIMARY KEY,
`ts` timestamp time index default now()
);
Affected Rows: 0
INSERT INTO grouped_uddsketch (`state`, id_group) SELECT uddsketch_state(128, 0.01, `value`), `id`/5*5 as id_group FROM test_uddsketch GROUP BY id_group;
Affected Rows: 3
SELECT uddsketch_calc(0.1, uddsketch_merge(128, 0.01, `state`)) FROM grouped_uddsketch;
+------------------------------------------------------------------------------------------------+
| uddsketch_calc(Float64(0.1),uddsketch_merge(Int64(128),Float64(0.01),grouped_uddsketch.state)) |
+------------------------------------------------------------------------------------------------+
| 19.886670240866184 |
+------------------------------------------------------------------------------------------------+
-- should fail
SELECT uddsketch_calc(0.1, uddsketch_merge(128, 0.1, `state`)) FROM grouped_uddsketch;
Error: 3001(EngineExecuteQuery), Error during planning: Merging UDDSketch with different parameters: arguments=(128, 0.1) vs actual input=(128, 0.01)
-- should fail
SELECT uddsketch_calc(0.1, uddsketch_merge(64, 0.01, `state`)) FROM grouped_uddsketch;
Error: 3001(EngineExecuteQuery), Error during planning: Merging UDDSketch with different parameters: arguments=(64, 0.01) vs actual input=(128, 0.01)
drop table test_uddsketch;
Affected Rows: 0
drop table grouped_uddsketch;
Affected Rows: 0

View File

@@ -24,4 +24,21 @@ select uddsketch_calc(0.75, uddsketch_state(128, 0.01, `value`)) from test_uddsk
select uddsketch_calc(0.95, uddsketch_state(128, 0.01, `value`)) from test_uddsketch;
CREATE TABLE grouped_uddsketch (
`state` BINARY,
id_group INT PRIMARY KEY,
`ts` timestamp time index default now()
);
INSERT INTO grouped_uddsketch (`state`, id_group) SELECT uddsketch_state(128, 0.01, `value`), `id`/5*5 as id_group FROM test_uddsketch GROUP BY id_group;
SELECT uddsketch_calc(0.1, uddsketch_merge(128, 0.01, `state`)) FROM grouped_uddsketch;
-- should fail
SELECT uddsketch_calc(0.1, uddsketch_merge(128, 0.1, `state`)) FROM grouped_uddsketch;
-- should fail
SELECT uddsketch_calc(0.1, uddsketch_merge(64, 0.01, `state`)) FROM grouped_uddsketch;
drop table test_uddsketch;
drop table grouped_uddsketch;

View File

@@ -31,6 +31,15 @@ FROM
Affected Rows: 0
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
distinct_basic
VALUES

View File

@@ -23,6 +23,9 @@ FROM
distinct_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
distinct_basic
VALUES

View File

@@ -44,6 +44,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
@@ -101,6 +110,16 @@ GROUP BY
Affected Rows: 0
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
@@ -118,6 +137,15 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
SHOW CREATE FLOW test_numbers_basic;
+--------------------+---------------------------------------------------------------------------------------+

View File

@@ -20,6 +20,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
ADMIN FLUSH_FLOW('test_numbers_basic');
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
SHOW CREATE FLOW test_numbers_basic;
@@ -44,10 +47,16 @@ FROM
numbers_input_basic
GROUP BY
ts;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
SHOW CREATE FLOW test_numbers_basic;
SHOW CREATE TABLE out_num_cnt_basic;

View File

@@ -62,6 +62,15 @@ SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES
@@ -206,6 +215,15 @@ SHOW CREATE TABLE out_basic;
+-----------+---------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -306,6 +324,15 @@ ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
distinct_basic
VALUES
@@ -1665,6 +1692,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES

View File

@@ -24,6 +24,9 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
SHOW CREATE TABLE out_num_cnt_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES
@@ -91,6 +94,9 @@ FROM
SHOW CREATE TABLE out_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
@@ -130,6 +136,9 @@ SHOW CREATE TABLE out_distinct_basic;
ADMIN FLUSH_FLOW('test_distinct_basic');
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
distinct_basic
VALUES
@@ -788,6 +797,9 @@ SHOW CREATE TABLE out_num_cnt_basic;
ADMIN FLUSH_FLOW('test_numbers_basic');
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES

View File

@@ -730,10 +730,21 @@ SELECT key FROM api_stats;
+-----+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 5s
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
Affected Rows: 1
-- wait more time so flownode have time to recover flows
-- SQLNESS SLEEP 5s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('api_stats_flow');

View File

@@ -399,8 +399,13 @@ ADMIN FLUSH_FLOW('api_stats_flow');
SELECT key FROM api_stats;
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 5s
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
-- wait more time so flownode have time to recover flows
-- SQLNESS SLEEP 5s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('api_stats_flow');

View File

@@ -50,6 +50,7 @@ ADMIN FLUSH_FLOW('calc_access_log_10s');
+-----------------------------------------+
-- query should return 3 rows
-- SQLNESS SORT_RESULT 3 1
SELECT "url", time_window FROM access_log_10s
ORDER BY
time_window;
@@ -63,6 +64,7 @@ ORDER BY
+------------+---------------------+
-- use hll_count to query the approximate data in access_log_10s
-- SQLNESS SORT_RESULT 3 1
SELECT "url", time_window, hll_count(state) FROM access_log_10s
ORDER BY
time_window;
@@ -76,6 +78,7 @@ ORDER BY
+------------+---------------------+---------------------------------+
-- further, we can aggregate 10 seconds of data to every minute, by using hll_merge to merge 10 seconds of hyperloglog state
-- SQLNESS SORT_RESULT 3 1
SELECT
"url",
date_bin('1 minute'::INTERVAL, time_window) AS time_window_1m,
@@ -91,8 +94,8 @@ ORDER BY
+------------+---------------------+------------+
| url | time_window_1m | uv_per_min |
+------------+---------------------+------------+
| /not_found | 2025-03-04T00:00:00 | 1 |
| /dashboard | 2025-03-04T00:00:00 | 3 |
| /not_found | 2025-03-04T00:00:00 | 1 |
+------------+---------------------+------------+
DROP FLOW calc_access_log_10s;
@@ -201,6 +204,13 @@ CREATE TABLE percentile_5s (
Affected Rows: 0
CREATE TABLE percentile_10s (
"percentile_state" BINARY,
time_window timestamp(0) time index
);
Affected Rows: 0
CREATE FLOW calc_percentile_5s SINK TO percentile_5s
AS
SELECT
@@ -213,6 +223,18 @@ GROUP BY
Affected Rows: 0
CREATE FLOW calc_percentile_10s SINK TO percentile_10s
AS
SELECT
uddsketch_merge(128, 0.01, percentile_state),
date_bin('10 seconds'::INTERVAL, time_window) AS time_window
FROM
percentile_5s
GROUP BY
date_bin('10 seconds'::INTERVAL, time_window);
Affected Rows: 0
INSERT INTO percentile_base ("id", "value", ts) VALUES
(1, 10.0, 1),
(2, 20.0, 2),
@@ -236,6 +258,15 @@ ADMIN FLUSH_FLOW('calc_percentile_5s');
| FLOW_FLUSHED |
+----------------------------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_percentile_10s');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('calc_percentile_10s') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SELECT
time_window,
uddsketch_calc(0.99, percentile_state) AS p99
@@ -252,14 +283,37 @@ ORDER BY
| 1970-01-01T00:00:10 | |
+---------------------+--------------------+
SELECT
time_window,
uddsketch_calc(0.99, percentile_state) AS p99
FROM
percentile_10s
ORDER BY
time_window;
+---------------------+--------------------+
| time_window | p99 |
+---------------------+--------------------+
| 1970-01-01T00:00:00 | 59.745049810145126 |
| 1970-01-01T00:00:10 | |
+---------------------+--------------------+
DROP FLOW calc_percentile_5s;
Affected Rows: 0
DROP FLOW calc_percentile_10s;
Affected Rows: 0
DROP TABLE percentile_5s;
Affected Rows: 0
DROP TABLE percentile_10s;
Affected Rows: 0
DROP TABLE percentile_base;
Affected Rows: 0

View File

@@ -36,16 +36,19 @@ INSERT INTO access_log VALUES
ADMIN FLUSH_FLOW('calc_access_log_10s');
-- query should return 3 rows
-- SQLNESS SORT_RESULT 3 1
SELECT "url", time_window FROM access_log_10s
ORDER BY
time_window;
-- use hll_count to query the approximate data in access_log_10s
-- SQLNESS SORT_RESULT 3 1
SELECT "url", time_window, hll_count(state) FROM access_log_10s
ORDER BY
time_window;
-- further, we can aggregate 10 seconds of data to every minute, by using hll_merge to merge 10 seconds of hyperloglog state
-- SQLNESS SORT_RESULT 3 1
SELECT
"url",
date_bin('1 minute'::INTERVAL, time_window) AS time_window_1m,
@@ -123,6 +126,11 @@ CREATE TABLE percentile_5s (
time_window timestamp(0) time index
);
CREATE TABLE percentile_10s (
"percentile_state" BINARY,
time_window timestamp(0) time index
);
CREATE FLOW calc_percentile_5s SINK TO percentile_5s
AS
SELECT
@@ -133,6 +141,16 @@ FROM
GROUP BY
time_window;
CREATE FLOW calc_percentile_10s SINK TO percentile_10s
AS
SELECT
uddsketch_merge(128, 0.01, percentile_state),
date_bin('10 seconds'::INTERVAL, time_window) AS time_window
FROM
percentile_5s
GROUP BY
date_bin('10 seconds'::INTERVAL, time_window);
INSERT INTO percentile_base ("id", "value", ts) VALUES
(1, 10.0, 1),
(2, 20.0, 2),
@@ -148,6 +166,9 @@ INSERT INTO percentile_base ("id", "value", ts) VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_percentile_5s');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_percentile_10s');
SELECT
time_window,
uddsketch_calc(0.99, percentile_state) AS p99
@@ -156,6 +177,16 @@ FROM
ORDER BY
time_window;
SELECT
time_window,
uddsketch_calc(0.99, percentile_state) AS p99
FROM
percentile_10s
ORDER BY
time_window;
DROP FLOW calc_percentile_5s;
DROP FLOW calc_percentile_10s;
DROP TABLE percentile_5s;
DROP TABLE percentile_10s;
DROP TABLE percentile_base;

View File

@@ -263,6 +263,15 @@ SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORM
-- makesure after recover should be the same
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+---------------------+---------------+-------------------------------------------------------------+------------------------------------+

View File

@@ -108,6 +108,9 @@ SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORM
-- makesure after recover should be the same
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

View File

@@ -9,7 +9,7 @@ select GREATEST('1999-01-30', '2023-03-01');
+-------------------------------------------------+
| greatest(Utf8("1999-01-30"),Utf8("2023-03-01")) |
+-------------------------------------------------+
| 2023-03-01T00:00:00 |
| 2023-03-01 |
+-------------------------------------------------+
select GREATEST('2000-02-11'::Date, '2020-12-30'::Date);

View File

@@ -0,0 +1,158 @@
CREATE TABLE IF NOT EXISTS `test_multi_pk_filter` ( `namespace` STRING NULL, `env` STRING NULL DEFAULT 'NULL', `flag` INT NULL, `total` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), PRIMARY KEY (`namespace`, `env`, `flag`) ) ENGINE=mito;
Affected Rows: 0
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 1, 5289, '2023-05-15 10:00:00');
Affected Rows: 1
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 0, 421, '2023-05-15 10:05:00');
Affected Rows: 1
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'dev', 1, 356, '2023-05-15 10:10:00');
Affected Rows: 1
ADMIN FLUSH_TABLE('test_multi_pk_filter');
+-------------------------------------------+
| ADMIN FLUSH_TABLE('test_multi_pk_filter') |
+-------------------------------------------+
| 0 |
+-------------------------------------------+
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'dev', 1, 412, '2023-05-15 10:15:00');
Affected Rows: 1
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'dev', 1, 298, '2023-05-15 10:20:00');
Affected Rows: 1
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 1, 5289, '2023-05-15 10:25:00');
Affected Rows: 1
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 1, 5874, '2023-05-15 10:30:00');
Affected Rows: 1
ADMIN FLUSH_TABLE('test_multi_pk_filter');
+-------------------------------------------+
| ADMIN FLUSH_TABLE('test_multi_pk_filter') |
+-------------------------------------------+
| 0 |
+-------------------------------------------+
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 1, 6132, '2023-05-15 10:35:00');
Affected Rows: 1
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'testing', 1, 1287, '2023-05-15 10:40:00');
Affected Rows: 1
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'testing', 1, 1432, '2023-05-15 10:45:00');
Affected Rows: 1
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'testing', 1, 1056, '2023-05-15 10:50:00');
Affected Rows: 1
SELECT greptime_timestamp, namespace, env, total FROM test_multi_pk_filter WHERE
greptime_timestamp BETWEEN '2023-05-15 10:00:00' AND '2023-05-15 11:00:00' AND flag = 1 AND namespace = 'thermostat_v2'
ORDER BY greptime_timestamp;
+---------------------+---------------+------------+-------+
| greptime_timestamp | namespace | env | total |
+---------------------+---------------+------------+-------+
| 2023-05-15T10:00:00 | thermostat_v2 | production | 5289 |
| 2023-05-15T10:10:00 | thermostat_v2 | dev | 356 |
| 2023-05-15T10:15:00 | thermostat_v2 | dev | 412 |
| 2023-05-15T10:20:00 | thermostat_v2 | dev | 298 |
| 2023-05-15T10:25:00 | thermostat_v2 | production | 5289 |
| 2023-05-15T10:30:00 | thermostat_v2 | production | 5874 |
| 2023-05-15T10:35:00 | thermostat_v2 | production | 6132 |
| 2023-05-15T10:40:00 | thermostat_v2 | testing | 1287 |
| 2023-05-15T10:45:00 | thermostat_v2 | testing | 1432 |
| 2023-05-15T10:50:00 | thermostat_v2 | testing | 1056 |
+---------------------+---------------+------------+-------+
SELECT greptime_timestamp, namespace, env, total FROM test_multi_pk_filter WHERE
greptime_timestamp BETWEEN '2023-05-15 10:00:00' AND '2023-05-15 11:00:00' AND flag = 1 AND namespace = 'thermostat_v2' AND env='dev'
ORDER BY greptime_timestamp;
+---------------------+---------------+-----+-------+
| greptime_timestamp | namespace | env | total |
+---------------------+---------------+-----+-------+
| 2023-05-15T10:10:00 | thermostat_v2 | dev | 356 |
| 2023-05-15T10:15:00 | thermostat_v2 | dev | 412 |
| 2023-05-15T10:20:00 | thermostat_v2 | dev | 298 |
+---------------------+---------------+-----+-------+
DROP TABLE test_multi_pk_filter;
Affected Rows: 0
CREATE TABLE IF NOT EXISTS `test_multi_pk_null` ( `namespace` STRING NULL, `env` STRING NULL DEFAULT 'NULL', `total` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), PRIMARY KEY (`namespace`, `env`) ) ENGINE=mito;
Affected Rows: 0
INSERT INTO test_multi_pk_null
(namespace, env, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 5289, '2023-05-15 10:00:00');
Affected Rows: 1
INSERT INTO test_multi_pk_null
(namespace, env, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 421, '2023-05-15 10:05:00');
Affected Rows: 1
ADMIN FLUSH_TABLE('test_multi_pk_null');
+-----------------------------------------+
| ADMIN FLUSH_TABLE('test_multi_pk_null') |
+-----------------------------------------+
| 0 |
+-----------------------------------------+
SELECT * FROM test_multi_pk_null WHERE env IS NOT NULL;
+---------------+------------+-------+---------------------+
| namespace | env | total | greptime_timestamp |
+---------------+------------+-------+---------------------+
| thermostat_v2 | production | 5289 | 2023-05-15T10:00:00 |
| thermostat_v2 | production | 421 | 2023-05-15T10:05:00 |
+---------------+------------+-------+---------------------+
DROP TABLE test_multi_pk_null;
Affected Rows: 0

View File

@@ -0,0 +1,66 @@
CREATE TABLE IF NOT EXISTS `test_multi_pk_filter` ( `namespace` STRING NULL, `env` STRING NULL DEFAULT 'NULL', `flag` INT NULL, `total` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), PRIMARY KEY (`namespace`, `env`, `flag`) ) ENGINE=mito;
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 1, 5289, '2023-05-15 10:00:00');
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 0, 421, '2023-05-15 10:05:00');
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'dev', 1, 356, '2023-05-15 10:10:00');
ADMIN FLUSH_TABLE('test_multi_pk_filter');
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'dev', 1, 412, '2023-05-15 10:15:00');
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'dev', 1, 298, '2023-05-15 10:20:00');
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 1, 5289, '2023-05-15 10:25:00');
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 1, 5874, '2023-05-15 10:30:00');
ADMIN FLUSH_TABLE('test_multi_pk_filter');
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 1, 6132, '2023-05-15 10:35:00');
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'testing', 1, 1287, '2023-05-15 10:40:00');
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'testing', 1, 1432, '2023-05-15 10:45:00');
INSERT INTO test_multi_pk_filter
(namespace, env, flag, total, greptime_timestamp)
VALUES ('thermostat_v2', 'testing', 1, 1056, '2023-05-15 10:50:00');
SELECT greptime_timestamp, namespace, env, total FROM test_multi_pk_filter WHERE
greptime_timestamp BETWEEN '2023-05-15 10:00:00' AND '2023-05-15 11:00:00' AND flag = 1 AND namespace = 'thermostat_v2'
ORDER BY greptime_timestamp;
SELECT greptime_timestamp, namespace, env, total FROM test_multi_pk_filter WHERE
greptime_timestamp BETWEEN '2023-05-15 10:00:00' AND '2023-05-15 11:00:00' AND flag = 1 AND namespace = 'thermostat_v2' AND env='dev'
ORDER BY greptime_timestamp;
DROP TABLE test_multi_pk_filter;
CREATE TABLE IF NOT EXISTS `test_multi_pk_null` ( `namespace` STRING NULL, `env` STRING NULL DEFAULT 'NULL', `total` BIGINT NULL, `greptime_timestamp` TIMESTAMP(9) NOT NULL, TIME INDEX (`greptime_timestamp`), PRIMARY KEY (`namespace`, `env`) ) ENGINE=mito;
INSERT INTO test_multi_pk_null
(namespace, env, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 5289, '2023-05-15 10:00:00');
INSERT INTO test_multi_pk_null
(namespace, env, total, greptime_timestamp)
VALUES ('thermostat_v2', 'production', 421, '2023-05-15 10:05:00');
ADMIN FLUSH_TABLE('test_multi_pk_null');
SELECT * FROM test_multi_pk_null WHERE env IS NOT NULL;
DROP TABLE test_multi_pk_null;