Merge remote-tracking branch 'origin/main' into zhongzc/repartition-procedure-scaffold

Zhenchi
2025-10-22 08:50:16 +00:00
255 changed files with 12209 additions and 2428 deletions

View File

@@ -108,6 +108,8 @@ pub const INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID: u32 = 36;
pub const INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID: u32 = 37;
/// id for information_schema.ssts_storage
pub const INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID: u32 = 38;
/// id for information_schema.ssts_index_meta
pub const INFORMATION_SCHEMA_SSTS_INDEX_META_TABLE_ID: u32 = 39;
// ----- End of information_schema tables -----

View File

@@ -11,15 +11,14 @@ workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-stat.workspace = true
config.workspace = true
humantime-serde.workspace = true
num_cpus.workspace = true
object-store.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
snafu.workspace = true
sysinfo.workspace = true
toml.workspace = true
[dev-dependencies]

View File

@@ -13,61 +13,22 @@
// limitations under the License.
use common_base::readable_size::ReadableSize;
use sysinfo::System;
/// Get the CPU core number of system, aware of cgroups.
pub fn get_cpus() -> usize {
// This function will check cgroups
num_cpus::get()
}
/// Get the total memory of the system.
/// If `cgroup_limits` is enabled, it will also check it.
pub fn get_sys_total_memory() -> Option<ReadableSize> {
if sysinfo::IS_SUPPORTED_SYSTEM {
let mut sys_info = System::new();
sys_info.refresh_memory();
let mut total_memory = sys_info.total_memory();
// Compare with cgroups memory limit, use smaller values
// This method is only implemented for Linux. It always returns None for all other systems.
if let Some(cgroup_limits) = sys_info.cgroup_limits() {
total_memory = total_memory.min(cgroup_limits.total_memory)
}
Some(ReadableSize(total_memory))
} else {
None
}
}
use common_stat::{get_total_cpu_millicores, get_total_memory_readable};
/// `ResourceSpec` holds the static resource specifications of a node,
/// such as CPU cores and memory capacity. These values are fixed
/// at startup and do not change dynamically during runtime.
#[derive(Debug, Clone, Copy)]
pub struct ResourceSpec {
pub cpus: usize,
pub cpus: i64,
pub memory: Option<ReadableSize>,
}
impl Default for ResourceSpec {
fn default() -> Self {
Self {
cpus: get_cpus(),
memory: get_sys_total_memory(),
cpus: get_total_cpu_millicores(),
memory: get_total_memory_readable(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_cpus() {
assert!(get_cpus() > 0);
}
#[test]
fn test_get_sys_total_memory() {
assert!(get_sys_total_memory().unwrap() > ReadableSize::mb(0));
}
}
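
A minimal usage sketch (not part of this change): the removed get_cpus / get_sys_total_memory helpers are superseded by the cgroup-aware functions from common-stat that ResourceSpec::default() now calls, matching the import shown above.

use common_base::readable_size::ReadableSize;
use common_stat::{get_total_cpu_millicores, get_total_memory_readable};

fn log_resource_spec() {
    // CPU is now reported in millicores (1500 = 1.5 cores), honoring cgroup limits.
    let cpus: i64 = get_total_cpu_millicores();
    // Memory falls back to the host total when no cgroup limit applies; None means
    // the platform is unsupported.
    let memory: Option<ReadableSize> = get_total_memory_readable();
    println!("cpus: {cpus} millicores, memory known: {}", memory.is_some());
}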

View File

@@ -37,6 +37,8 @@ const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";
/// Compact type: strict window (short name).
const COMPACT_TYPE_STRICT_WINDOW_SHORT: &str = "swcs";
const DEFAULT_COMPACTION_PARALLELISM: u32 = 1;
#[admin_fn(
name = FlushTableFunction,
display_name = flush_table,
@@ -95,7 +97,7 @@ pub(crate) async fn compact_table(
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let request = parse_compact_params(params, query_ctx)?;
let request = parse_compact_request(params, query_ctx)?;
info!("Compact table request: {:?}", request);
let affected_rows = table_mutation_handler
@@ -117,37 +119,46 @@ fn compact_signature() -> Signature {
/// - `[<table_name>]`: only the table name is provided, using the default compaction type: regular
/// - `[<table_name>, <type>]`: specify table name and compaction type. The compaction options will be default.
/// - `[<table_name>, <type>, <options>]`: provides both type and type-specific options.
fn parse_compact_params(
/// - For `twcs`, it accepts `parallelism=[N]` where N is an unsigned 32-bit number
/// - For `swcs`, it accepts two numeric parameters: `parallelism` and `window`.
fn parse_compact_request(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<CompactTableRequest> {
ensure!(
!params.is_empty(),
!params.is_empty() && params.len() <= 3,
InvalidFuncArgsSnafu {
err_msg: "Args cannot be empty",
err_msg: format!(
"The length of the args is not correct, expect 1-4, have: {}",
params.len()
),
}
);
let (table_name, compact_type) = match params {
let (table_name, compact_type, parallelism) = match params {
// 1. Only table name, strategy defaults to twcs and default parallelism.
[ValueRef::String(table_name)] => (
table_name,
compact_request::Options::Regular(Default::default()),
DEFAULT_COMPACTION_PARALLELISM,
),
// 2. Both table name and strategy are provided.
[
ValueRef::String(table_name),
ValueRef::String(compact_ty_str),
] => {
let compact_type = parse_compact_type(compact_ty_str, None)?;
(table_name, compact_type)
let (compact_type, parallelism) = parse_compact_options(compact_ty_str, None)?;
(table_name, compact_type, parallelism)
}
// 3. Table name, strategy and strategy specific options
[
ValueRef::String(table_name),
ValueRef::String(compact_ty_str),
ValueRef::String(options_str),
] => {
let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?;
(table_name, compact_type)
let (compact_type, parallelism) =
parse_compact_options(compact_ty_str, Some(options_str))?;
(table_name, compact_type, parallelism)
}
_ => {
return UnsupportedInputDataTypeSnafu {
@@ -167,35 +178,126 @@ fn parse_compact_params(
schema_name,
table_name,
compact_options: compact_type,
parallelism,
})
}
/// Parses compaction strategy type. For `strict_window` or `swcs` strict window compaction is chose,
/// Parses compaction strategy type. For `strict_window` or `swcs` strict window compaction is chosen,
/// otherwise choose regular (TWCS) compaction.
fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<compact_request::Options> {
fn parse_compact_options(
type_str: &str,
option: Option<&str>,
) -> Result<(compact_request::Options, u32)> {
if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW)
| type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW_SHORT)
{
let window_seconds = option
.map(|v| {
i64::from_str(v).map_err(|_| {
InvalidFuncArgsSnafu {
err_msg: format!(
"Compact window is expected to be a valid number, provided: {}",
v
),
}
.build()
})
})
.transpose()?
.unwrap_or(0);
let Some(option_str) = option else {
return Ok((
compact_request::Options::StrictWindow(StrictWindow { window_seconds: 0 }),
DEFAULT_COMPACTION_PARALLELISM,
));
};
Ok(compact_request::Options::StrictWindow(StrictWindow {
window_seconds,
}))
// For compatibility, accepts a single number as the window size.
if let Ok(window_seconds) = i64::from_str(option_str) {
return Ok((
compact_request::Options::StrictWindow(StrictWindow { window_seconds }),
DEFAULT_COMPACTION_PARALLELISM,
));
};
// Parse keyword arguments in the form `key1=value1,key2=value2`
let mut window_seconds = 0i64;
let mut parallelism = DEFAULT_COMPACTION_PARALLELISM;
let pairs: Vec<&str> = option_str.split(',').collect();
for pair in pairs {
let kv: Vec<&str> = pair.trim().split('=').collect();
if kv.len() != 2 {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid key-value pair: {}", pair.trim()),
}
.fail();
}
let key = kv[0].trim();
let value = kv[1].trim();
match key {
"window" | "window_seconds" => {
window_seconds = i64::from_str(value).map_err(|_| {
InvalidFuncArgsSnafu {
err_msg: format!("Invalid value for window: {}", value),
}
.build()
})?;
}
"parallelism" => {
parallelism = value.parse::<u32>().map_err(|_| {
InvalidFuncArgsSnafu {
err_msg: format!("Invalid value for parallelism: {}", value),
}
.build()
})?;
}
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!("Unknown parameter: {}", key),
}
.fail();
}
}
}
Ok((
compact_request::Options::StrictWindow(StrictWindow { window_seconds }),
parallelism,
))
} else {
Ok(compact_request::Options::Regular(Default::default()))
// TWCS strategy
let Some(option_str) = option else {
return Ok((
compact_request::Options::Regular(Default::default()),
DEFAULT_COMPACTION_PARALLELISM,
));
};
let mut parallelism = DEFAULT_COMPACTION_PARALLELISM;
let pairs: Vec<&str> = option_str.split(',').collect();
for pair in pairs {
let kv: Vec<&str> = pair.trim().split('=').collect();
if kv.len() != 2 {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid key-value pair: {}", pair.trim()),
}
.fail();
}
let key = kv[0].trim();
let value = kv[1].trim();
match key {
"parallelism" => {
parallelism = value.parse::<u32>().map_err(|_| {
InvalidFuncArgsSnafu {
err_msg: format!("Invalid value for parallelism: {}", value),
}
.build()
})?;
}
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!("Unknown parameter: {}", key),
}
.fail();
}
}
}
Ok((
compact_request::Options::Regular(Default::default()),
parallelism,
))
}
}
@@ -301,7 +403,7 @@ mod tests {
assert_eq!(
expected,
&parse_compact_params(&params, &QueryContext::arc()).unwrap()
&parse_compact_request(&params, &QueryContext::arc()).unwrap()
);
}
}
@@ -316,6 +418,7 @@ mod tests {
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
parallelism: 1,
},
),
(
@@ -325,6 +428,7 @@ mod tests {
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
parallelism: 1,
},
),
(
@@ -337,6 +441,7 @@ mod tests {
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
parallelism: 1,
},
),
(
@@ -346,6 +451,7 @@ mod tests {
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
parallelism: 1,
},
),
(
@@ -355,6 +461,7 @@ mod tests {
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }),
parallelism: 1,
},
),
(
@@ -366,15 +473,7 @@ mod tests {
compact_options: Options::StrictWindow(StrictWindow {
window_seconds: 3600,
}),
},
),
(
&["table", "regular", "abcd"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
parallelism: 1,
},
),
(
@@ -386,12 +485,82 @@ mod tests {
compact_options: Options::StrictWindow(StrictWindow {
window_seconds: 120,
}),
parallelism: 1,
},
),
// Test with parallelism parameter
(
&["table", "regular", "parallelism=4"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
parallelism: 4,
},
),
(
&["table", "strict_window", "window=3600,parallelism=2"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::StrictWindow(StrictWindow {
window_seconds: 3600,
}),
parallelism: 2,
},
),
(
&["table", "strict_window", "window=3600"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::StrictWindow(StrictWindow {
window_seconds: 3600,
}),
parallelism: 1,
},
),
(
&["table", "strict_window", "window_seconds=7200"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::StrictWindow(StrictWindow {
window_seconds: 7200,
}),
parallelism: 1,
},
),
(
&["table", "strict_window", "window=1800"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::StrictWindow(StrictWindow {
window_seconds: 1800,
}),
parallelism: 1,
},
),
(
&["table", "regular", "parallelism=8"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
parallelism: 8,
},
),
]);
assert!(
parse_compact_params(
parse_compact_request(
&["table", "strict_window", "abc"]
.into_iter()
.map(ValueRef::String)
@@ -402,7 +571,7 @@ mod tests {
);
assert!(
parse_compact_params(
parse_compact_request(
&["a.b.table", "strict_window", "abc"]
.into_iter()
.map(ValueRef::String)
@@ -411,5 +580,88 @@ mod tests {
)
.is_err()
);
// Test invalid parallelism
assert!(
parse_compact_request(
&["table", "regular", "options", "invalid"]
.into_iter()
.map(ValueRef::String)
.collect::<Vec<_>>(),
&QueryContext::arc(),
)
.is_err()
);
// Test too many parameters
assert!(
parse_compact_request(
&["table", "regular", "options", "4", "extra"]
.into_iter()
.map(ValueRef::String)
.collect::<Vec<_>>(),
&QueryContext::arc(),
)
.is_err()
);
// Test invalid keyword argument format
assert!(
parse_compact_request(
&["table", "strict_window", "window"]
.into_iter()
.map(ValueRef::String)
.collect::<Vec<_>>(),
&QueryContext::arc(),
)
.is_err()
);
// Test invalid keyword
assert!(
parse_compact_request(
&["table", "strict_window", "invalid_key=123"]
.into_iter()
.map(ValueRef::String)
.collect::<Vec<_>>(),
&QueryContext::arc(),
)
.is_err()
);
assert!(
parse_compact_request(
&["table", "regular", "abcd"]
.into_iter()
.map(ValueRef::String)
.collect::<Vec<_>>(),
&QueryContext::arc(),
)
.is_err()
);
// Test invalid window value
assert!(
parse_compact_request(
&["table", "strict_window", "window=abc"]
.into_iter()
.map(ValueRef::String)
.collect::<Vec<_>>(),
&QueryContext::arc(),
)
.is_err()
);
// Test invalid parallelism in options string
assert!(
parse_compact_request(
&["table", "strict_window", "parallelism=abc"]
.into_iter()
.map(ValueRef::String)
.collect::<Vec<_>>(),
&QueryContext::arc(),
)
.is_err()
);
}
}
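
The options string is passed as the third argument of the compact_table admin function (for example, something like ADMIN compact_table('my_table', 'swcs', 'window=3600,parallelism=2')). Below is a standalone, simplified sketch of the key=value parsing that parse_compact_options performs for the swcs strategy; the function name and String-based errors here are illustrative only.

fn parse_swcs_options(option_str: &str) -> Result<(i64, u32), String> {
    let mut window_seconds = 0i64;
    let mut parallelism = 1u32; // DEFAULT_COMPACTION_PARALLELISM
    for pair in option_str.split(',') {
        let mut kv = pair.trim().splitn(2, '=');
        match (kv.next(), kv.next()) {
            (Some(key), Some(value)) => match key.trim() {
                "window" | "window_seconds" => {
                    window_seconds = value
                        .trim()
                        .parse()
                        .map_err(|_| format!("Invalid value for window: {value}"))?;
                }
                "parallelism" => {
                    parallelism = value
                        .trim()
                        .parse()
                        .map_err(|_| format!("Invalid value for parallelism: {value}"))?;
                }
                other => return Err(format!("Unknown parameter: {other}")),
            },
            _ => return Err(format!("Invalid key-value pair: {}", pair.trim())),
        }
    }
    Ok((window_seconds, parallelism))
}

fn main() {
    // Mirrors the strict_window test case above.
    assert_eq!(
        parse_swcs_options("window=3600,parallelism=2"),
        Ok((3600, 2))
    );
}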

View File

@@ -15,7 +15,7 @@
use std::borrow::Cow;
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, StringArray};
use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, LargeStringArray, StringArray};
use arrow_schema::{DataType, Field};
use datafusion::logical_expr::{Signature, TypeSignature, Volatility};
use datafusion_common::{Result, ScalarValue};
@@ -63,7 +63,7 @@ impl VectorProduct {
}
let t = args.schema.field(0).data_type();
if !matches!(t, DataType::Utf8 | DataType::Binary) {
if !matches!(t, DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary) {
return Err(datafusion_common::DataFusionError::Internal(format!(
"unexpected input datatype {t} when creating `VEC_PRODUCT`"
)));
@@ -91,6 +91,13 @@ impl VectorProduct {
.map(|x| x.map(Cow::Owned))
.collect::<Result<Vec<_>>>()?
}
DataType::LargeUtf8 => {
let arr: &LargeStringArray = values[0].as_string();
arr.iter()
.filter_map(|x| x.map(|s| parse_veclit_from_strlit(s).map_err(Into::into)))
.map(|x: Result<Vec<f32>>| x.map(Cow::Owned))
.collect::<Result<Vec<_>>>()?
}
DataType::Binary => {
let arr: &BinaryArray = values[0].as_binary();
arr.iter()

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, StringArray};
use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, LargeStringArray, StringArray};
use arrow_schema::{DataType, Field};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{
@@ -63,7 +63,7 @@ impl VectorSum {
}
let t = args.schema.field(0).data_type();
if !matches!(t, DataType::Utf8 | DataType::Binary) {
if !matches!(t, DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary) {
return Err(datafusion_common::DataFusionError::Internal(format!(
"unexpected input datatype {t} when creating `VEC_SUM`"
)));
@@ -98,6 +98,21 @@ impl VectorSum {
*self.inner(vec_column.len()) += vec_column;
}
}
DataType::LargeUtf8 => {
let arr: &LargeStringArray = values[0].as_string();
for s in arr.iter() {
let Some(s) = s else {
if is_update {
self.has_null = true;
self.sum = None;
}
return Ok(());
};
let values = parse_veclit_from_strlit(s)?;
let vec_column = DVectorView::from_slice(&values, values.len());
*self.inner(vec_column.len()) += vec_column;
}
}
DataType::Binary => {
let arr: &BinaryArray = values[0].as_binary();
for b in arr.iter() {
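
Outside the diff: a self-contained sketch of the Utf8 / LargeUtf8 dispatch that both VEC_PRODUCT and VEC_SUM now perform, using arrow's AsArray with i32 vs i64 offsets. The helper name is illustrative.

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, AsArray, LargeStringArray, StringArray};
use arrow_schema::DataType;

fn first_string(values: &ArrayRef) -> Option<String> {
    match values.data_type() {
        // Utf8 uses i32 offsets ...
        DataType::Utf8 => {
            let arr: &StringArray = values.as_string::<i32>();
            (!arr.is_empty()).then(|| arr.value(0).to_string())
        }
        // ... while LargeUtf8 uses i64 offsets.
        DataType::LargeUtf8 => {
            let arr: &LargeStringArray = values.as_string::<i64>();
            (!arr.is_empty()).then(|| arr.value(0).to_string())
        }
        _ => None,
    }
}

fn main() {
    let large: ArrayRef = Arc::new(LargeStringArray::from(vec!["[1.0,2.0]"]));
    assert_eq!(first_string(&large).as_deref(), Some("[1.0,2.0]"));
}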

View File

@@ -1,123 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{Column, DeleteRequest as GrpcDeleteRequest};
use datatypes::prelude::ConcreteDataType;
use snafu::{ResultExt, ensure};
use table::requests::DeleteRequest;
use crate::error::{ColumnDataTypeSnafu, IllegalDeleteRequestSnafu, Result};
use crate::insert::add_values_to_builder;
pub fn to_table_delete_request(
catalog_name: &str,
schema_name: &str,
request: GrpcDeleteRequest,
) -> Result<DeleteRequest> {
let row_count = request.row_count as usize;
let mut key_column_values = HashMap::with_capacity(request.key_columns.len());
for Column {
column_name,
values,
null_mask,
datatype,
datatype_extension,
..
} in request.key_columns
{
let Some(values) = values else { continue };
let datatype: ConcreteDataType =
ColumnDataTypeWrapper::try_new(datatype, datatype_extension)
.context(ColumnDataTypeSnafu)?
.into();
let vector = add_values_to_builder(datatype, values, row_count, null_mask)?;
ensure!(
key_column_values
.insert(column_name.clone(), vector)
.is_none(),
IllegalDeleteRequestSnafu {
reason: format!("Duplicated column '{column_name}' in delete request.")
}
);
}
Ok(DeleteRequest {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: request.table_name,
key_column_values,
})
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::ColumnDataType;
use api::v1::column::Values;
use datatypes::prelude::{ScalarVector, VectorRef};
use datatypes::vectors::{Int32Vector, StringVector};
use super::*;
#[test]
fn test_to_table_delete_request() {
let grpc_request = GrpcDeleteRequest {
table_name: "foo".to_string(),
key_columns: vec![
Column {
column_name: "id".to_string(),
values: Some(Values {
i32_values: vec![1, 2, 3],
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
Column {
column_name: "name".to_string(),
values: Some(Values {
string_values: vec!["a".to_string(), "b".to_string(), "c".to_string()],
..Default::default()
}),
datatype: ColumnDataType::String as i32,
..Default::default()
},
],
row_count: 3,
};
let mut request =
to_table_delete_request("foo_catalog", "foo_schema", grpc_request).unwrap();
assert_eq!(request.catalog_name, "foo_catalog");
assert_eq!(request.schema_name, "foo_schema");
assert_eq!(request.table_name, "foo");
assert_eq!(
Arc::new(Int32Vector::from_slice(vec![1, 2, 3])) as VectorRef,
request.key_column_values.remove("id").unwrap()
);
assert_eq!(
Arc::new(StringVector::from_slice(&["a", "b", "c"])) as VectorRef,
request.key_column_values.remove("name").unwrap()
);
assert!(request.key_column_values.is_empty());
}
}

View File

@@ -25,13 +25,6 @@ use store_api::metadata::MetadataError;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Illegal delete request, reason: {reason}"))]
IllegalDeleteRequest {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Column datatype error"))]
ColumnDataType {
#[snafu(implicit)]
@@ -65,13 +58,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to create vector"))]
CreateVector {
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("Missing required field in protobuf, field: {}", field))]
MissingField {
field: String,
@@ -87,13 +73,6 @@ pub enum Error {
source: api::error::Error,
},
#[snafu(display("Unexpected values length, reason: {}", reason))]
UnexpectedValuesLength {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unknown location type: {}", location_type))]
UnknownLocationType {
location_type: i32,
@@ -189,18 +168,13 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::IllegalDeleteRequest { .. } => StatusCode::InvalidArguments,
Error::ColumnDataType { .. } => StatusCode::Internal,
Error::DuplicatedTimestampColumn { .. }
| Error::DuplicatedColumnName { .. }
| Error::MissingTimestampColumn { .. } => StatusCode::InvalidArguments,
Error::CreateVector { .. } => StatusCode::InvalidArguments,
Error::MissingField { .. } => StatusCode::InvalidArguments,
Error::InvalidColumnDef { source, .. } => source.status_code(),
Error::UnexpectedValuesLength { .. } | Error::UnknownLocationType { .. } => {
StatusCode::InvalidArguments
}
Error::UnknownLocationType { .. } => StatusCode::InvalidArguments,
Error::UnknownColumnDataType { .. } | Error::InvalidStringIndexColumnType { .. } => {
StatusCode::InvalidArguments

View File

@@ -1,80 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::helper;
use api::v1::column::Values;
use common_base::BitVec;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::VectorRef;
use snafu::{ResultExt, ensure};
use crate::error::{CreateVectorSnafu, Result, UnexpectedValuesLengthSnafu};
pub(crate) fn add_values_to_builder(
data_type: ConcreteDataType,
values: Values,
row_count: usize,
null_mask: Vec<u8>,
) -> Result<VectorRef> {
if null_mask.is_empty() {
Ok(helper::pb_values_to_vector_ref(&data_type, values))
} else {
let builder = &mut data_type.create_mutable_vector(row_count);
let values = helper::pb_values_to_values(&data_type, values);
let null_mask = BitVec::from_vec(null_mask);
ensure!(
null_mask.count_ones() + values.len() == row_count,
UnexpectedValuesLengthSnafu {
reason: "If null_mask is not empty, the sum of the number of nulls and the length of values must be equal to row_count."
}
);
let mut idx_of_values = 0;
for idx in 0..row_count {
match is_null(&null_mask, idx) {
Some(true) => builder.push_null(),
_ => {
builder
.try_push_value_ref(&values[idx_of_values].as_value_ref())
.context(CreateVectorSnafu)?;
idx_of_values += 1
}
}
}
Ok(builder.to_vector())
}
}
fn is_null(null_mask: &BitVec, idx: usize) -> Option<bool> {
null_mask.get(idx).as_deref().copied()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_is_null() {
let null_mask = BitVec::from_slice(&[0b0000_0001, 0b0000_1000]);
assert_eq!(Some(true), is_null(&null_mask, 0));
assert_eq!(Some(false), is_null(&null_mask, 1));
assert_eq!(Some(false), is_null(&null_mask, 10));
assert_eq!(Some(true), is_null(&null_mask, 11));
assert_eq!(Some(false), is_null(&null_mask, 12));
assert_eq!(None, is_null(&null_mask, 16));
assert_eq!(None, is_null(&null_mask, 99));
}
}

View File

@@ -13,9 +13,7 @@
// limitations under the License.
mod alter;
pub mod delete;
pub mod error;
pub mod insert;
pub mod util;
pub use alter::{alter_expr_to_request, create_table_schema};

View File

@@ -90,6 +90,7 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result<TokenStream2> {
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: #dim })) })
}
}
// TODO(sunng87): revisit all these implementations
Some(TypeExt::ListType(ext)) => {
let item_type = syn::Ident::new(&ext.datatype.to_string(), ident.span());
quote! {
@@ -108,6 +109,12 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result<TokenStream2> {
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::StructType(StructTypeExtension { fields: [#(#fields),*] })) })
}
}
Some(TypeExt::JsonNativeType(ext)) => {
let inner = syn::Ident::new(&ext.datatype.to_string(), ident.span());
quote! {
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::JsonNativeType(JsonNativeTypeExtension { datatype: #inner })) })
}
}
None => {
quote! { None }
}

View File

@@ -124,6 +124,9 @@ pub struct NodeInfo {
// The total memory of the node in bytes
#[serde(default)]
pub memory_bytes: u64,
// The hostname of the node
#[serde(default)]
pub hostname: String,
}
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
@@ -332,6 +335,7 @@ mod tests {
start_time_ms: 1,
cpus: 0,
memory_bytes: 0,
hostname: "test_hostname".to_string(),
};
let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
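
A standalone sketch (hypothetical NodeInfoLike struct, not the real NodeInfo) of the compatibility that #[serde(default)] provides: payloads written before the hostname field existed still deserialize, with hostname defaulting to an empty string.

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct NodeInfoLike {
    cpus: u32,
    memory_bytes: u64,
    #[serde(default)]
    hostname: String,
}

fn main() {
    // JSON produced before the `hostname` field existed still deserializes.
    let old = r#"{"cpus":0,"memory_bytes":0}"#;
    let info: NodeInfoLike = serde_json::from_str(old).unwrap();
    println!("{info:?}");
    assert_eq!(info.hostname, "");
}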

View File

@@ -131,6 +131,7 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
region_numbers: vec![],
options: TableOptions::try_from_iter(&expr.table_options).unwrap(),
created_on: DateTime::default(),
updated_on: DateTime::default(),
partition_key_indices: vec![],
column_ids: vec![],
},

View File

@@ -24,7 +24,7 @@ async fn test_heartbeat_mailbox() {
let mailbox = HeartbeatMailbox::new(tx);
let meta = MessageMeta::new_test(1, "test", "foo", "bar");
let reply = InstructionReply::OpenRegion(SimpleReply {
let reply = InstructionReply::OpenRegions(SimpleReply {
result: true,
error: None,
});

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use store_api::storage::{RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;
@@ -394,16 +394,33 @@ impl From<RegionId> for FlushRegions {
}
}
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
Single(T),
Multiple(Vec<T>),
}
fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
where
D: Deserializer<'de>,
T: Deserialize<'de>,
{
let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
Ok(match helper {
SingleOrMultiple::Single(x) => vec![x],
SingleOrMultiple::Multiple(xs) => xs,
})
}
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
/// Opens a region.
///
/// - Returns true if a specified region exists.
OpenRegion(OpenRegion),
/// Closes a region.
///
/// - Returns true if a specified region does not exist.
CloseRegion(RegionIdent),
/// Opens regions.
#[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")]
OpenRegions(Vec<OpenRegion>),
/// Closes regions.
#[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
CloseRegions(Vec<RegionIdent>),
/// Upgrades a region.
UpgradeRegion(UpgradeRegion),
/// Downgrades a region.
@@ -438,8 +455,10 @@ impl Display for UpgradeRegionReply {
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
OpenRegion(SimpleReply),
CloseRegion(SimpleReply),
#[serde(alias = "open_region")]
OpenRegions(SimpleReply),
#[serde(alias = "close_region")]
CloseRegions(SimpleReply),
UpgradeRegion(UpgradeRegionReply),
DowngradeRegion(DowngradeRegionReply),
FlushRegions(FlushRegionReply),
@@ -448,8 +467,8 @@ pub enum InstructionReply {
impl Display for InstructionReply {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
Self::DowngradeRegion(reply) => {
write!(f, "InstructionReply::DowngradeRegion({})", reply)
@@ -459,13 +478,30 @@ impl Display for InstructionReply {
}
}
#[cfg(any(test, feature = "testing"))]
impl InstructionReply {
pub fn expect_close_regions_reply(self) -> SimpleReply {
match self {
Self::CloseRegions(reply) => reply,
_ => panic!("Expected CloseRegions reply"),
}
}
pub fn expect_open_regions_reply(self) -> SimpleReply {
match self {
Self::OpenRegions(reply) => reply,
_ => panic!("Expected OpenRegions reply"),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_serialize_instruction() {
let open_region = Instruction::OpenRegion(OpenRegion::new(
let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
RegionIdent {
datanode_id: 2,
table_id: 1024,
@@ -476,30 +512,78 @@ mod tests {
HashMap::new(),
HashMap::new(),
false,
));
)]);
let serialized = serde_json::to_string(&open_region).unwrap();
assert_eq!(
r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
serialized
);
let close_region = Instruction::CloseRegion(RegionIdent {
let close_region = Instruction::CloseRegions(vec![RegionIdent {
datanode_id: 2,
table_id: 1024,
region_number: 1,
engine: "mito2".to_string(),
});
}]);
let serialized = serde_json::to_string(&close_region).unwrap();
assert_eq!(
r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
serialized
);
}
#[test]
fn test_deserialize_instruction() {
let open_region_instruction = r#"{"OpenRegion":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#;
let open_region_instruction: Instruction =
serde_json::from_str(open_region_instruction).unwrap();
let open_region = Instruction::OpenRegions(vec![OpenRegion::new(
RegionIdent {
datanode_id: 2,
table_id: 1024,
region_number: 1,
engine: "mito2".to_string(),
},
"test/foo",
HashMap::new(),
HashMap::new(),
false,
)]);
assert_eq!(open_region_instruction, open_region);
let close_region_instruction = r#"{"CloseRegion":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#;
let close_region_instruction: Instruction =
serde_json::from_str(close_region_instruction).unwrap();
let close_region = Instruction::CloseRegions(vec![RegionIdent {
datanode_id: 2,
table_id: 1024,
region_number: 1,
engine: "mito2".to_string(),
}]);
assert_eq!(close_region_instruction, close_region);
let close_region_instruction_reply =
r#"{"result":true,"error":null,"type":"close_region"}"#;
let close_region_instruction_reply: InstructionReply =
serde_json::from_str(close_region_instruction_reply).unwrap();
let close_region_reply = InstructionReply::CloseRegions(SimpleReply {
result: true,
error: None,
});
assert_eq!(close_region_instruction_reply, close_region_reply);
let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
let open_region_instruction_reply: InstructionReply =
serde_json::from_str(open_region_instruction_reply).unwrap();
let open_region_reply = InstructionReply::OpenRegions(SimpleReply {
result: true,
error: None,
});
assert_eq!(open_region_instruction_reply, open_region_reply);
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LegacyOpenRegion {
region_ident: RegionIdent,
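
A self-contained sketch of the SingleOrMultiple pattern introduced above, showing that both the legacy single-object form and the new list form deserialize into a Vec. The Message enum and its variant names here are illustrative, not the real Instruction type.

use serde::{Deserialize, Deserializer};

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    Single(T),
    Multiple(Vec<T>),
}

fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
where
    D: Deserializer<'de>,
    T: Deserialize<'de>,
{
    Ok(match SingleOrMultiple::<T>::deserialize(deserializer)? {
        SingleOrMultiple::Single(x) => vec![x],
        SingleOrMultiple::Multiple(xs) => xs,
    })
}

#[derive(Debug, Deserialize)]
enum Message {
    #[serde(deserialize_with = "single_or_multiple_from", alias = "Open")]
    Opens(Vec<u32>),
}

fn main() {
    // Legacy single form and current list form both decode to Opens(vec![..]).
    let legacy: Message = serde_json::from_str(r#"{"Open":1}"#).unwrap();
    let current: Message = serde_json::from_str(r#"{"Opens":[1,2]}"#).unwrap();
    println!("{legacy:?} {current:?}");
}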

View File

@@ -287,8 +287,13 @@ mod tests {
#[test]
fn test_deserialization_compatibility() {
let s = r#"{"version":1,"table_info":{"ident":{"table_id":8714,"version":0},"name":"go_gc_duration_seconds","desc":"Created on insertion","catalog_name":"e87lehzy63d4cloud_docs_test","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"instance","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"job","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"quantile","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"greptime_timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"greptime_value","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":3,"version":0},"primary_key_indices":[0,1,2],"value_indices":[],"engine":"mito","next_column_id":5,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"1970-01-01T00:00:00Z"},"table_type":"Base"}}"#;
let v = TableInfoValue::try_from_raw_value(s.as_bytes()).unwrap();
let old_fmt = r#"{"version":1,"table_info":{"ident":{"table_id":8714,"version":0},"name":"go_gc_duration_seconds","desc":"Created on insertion","catalog_name":"e87lehzy63d4cloud_docs_test","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"instance","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"job","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"quantile","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"greptime_timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"greptime_value","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":3,"version":0},"primary_key_indices":[0,1,2],"value_indices":[],"engine":"mito","next_column_id":5,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"1970-01-01T00:00:00Z"},"table_type":"Base"}}"#;
let new_fmt = r#"{"version":1,"table_info":{"ident":{"table_id":8714,"version":0},"name":"go_gc_duration_seconds","desc":"Created on insertion","catalog_name":"e87lehzy63d4cloud_docs_test","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"instance","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"job","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"quantile","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"greptime_timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"greptime_value","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":3,"version":0},"primary_key_indices":[0,1,2],"value_indices":[],"engine":"mito","next_column_id":5,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"1970-01-01T00:00:00Z"},"table_type":"Base"}}"#;
let v = TableInfoValue::try_from_raw_value(old_fmt.as_bytes()).unwrap();
let new_v = TableInfoValue::try_from_raw_value(new_fmt.as_bytes()).unwrap();
assert_eq!(v, new_v);
assert_eq!(v.table_info.meta.created_on, v.table_info.meta.updated_on);
assert!(v.table_info.meta.partition_key_indices.is_empty());
}
@@ -328,6 +333,7 @@ mod tests {
schema: RawSchema::from(&schema),
engine: "mito".to_string(),
created_on: chrono::DateTime::default(),
updated_on: chrono::DateTime::default(),
primary_key_indices: vec![0, 1],
next_column_id: 3,
value_indices: vec![2, 3],

View File

@@ -1503,6 +1503,7 @@ mod tests {
region_numbers: vec![0],
options: Default::default(),
created_on: Default::default(),
updated_on: Default::default(),
partition_key_indices: Default::default(),
column_ids: Default::default(),
};

View File

@@ -12,8 +12,7 @@ use api::v1::{
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use crate::error;
use crate::error::Result;
use crate::error::{self, Result, TooLargeDurationSnafu};
use crate::rpc::ddl::DdlTask;
// Create trigger
@@ -27,7 +26,11 @@ pub struct CreateTriggerTask {
pub labels: HashMap<String, String>,
pub annotations: HashMap<String, String>,
pub interval: Duration,
pub raw_interval_expr: String,
pub raw_interval_expr: Option<String>,
pub r#for: Option<Duration>,
pub for_raw_expr: Option<String>,
pub keep_firing_for: Option<Duration>,
pub keep_firing_for_raw_expr: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@@ -62,10 +65,20 @@ impl TryFrom<CreateTriggerTask> for PbCreateTriggerTask {
.map(PbNotifyChannel::from)
.collect();
let interval = task
.interval
.try_into()
.context(error::TooLargeDurationSnafu)?;
let interval = task.interval.try_into().context(TooLargeDurationSnafu)?;
let raw_interval_expr = task.raw_interval_expr.unwrap_or_default();
let r#for = task
.r#for
.map(|d| d.try_into().context(TooLargeDurationSnafu))
.transpose()?;
let for_raw_expr = task.for_raw_expr.unwrap_or_default();
let keep_firing_for = task
.keep_firing_for
.map(|d| d.try_into().context(TooLargeDurationSnafu))
.transpose()?;
let keep_firing_for_raw_expr = task.keep_firing_for_raw_expr.unwrap_or_default();
let expr = PbCreateTriggerExpr {
catalog_name: task.catalog_name,
@@ -76,7 +89,11 @@ impl TryFrom<CreateTriggerTask> for PbCreateTriggerTask {
labels: task.labels,
annotations: task.annotations,
interval: Some(interval),
raw_interval_expr: task.raw_interval_expr,
raw_interval_expr,
r#for,
for_raw_expr,
keep_firing_for,
keep_firing_for_raw_expr,
};
Ok(PbCreateTriggerTask {
@@ -102,6 +119,26 @@ impl TryFrom<PbCreateTriggerTask> for CreateTriggerTask {
let interval = expr.interval.context(error::MissingIntervalSnafu)?;
let interval = interval.try_into().context(error::NegativeDurationSnafu)?;
let r#for = expr
.r#for
.map(Duration::try_from)
.transpose()
.context(error::NegativeDurationSnafu)?;
let keep_firing_for = expr
.keep_firing_for
.map(Duration::try_from)
.transpose()
.context(error::NegativeDurationSnafu)?;
let raw_interval_expr =
(!expr.raw_interval_expr.is_empty()).then_some(expr.raw_interval_expr);
let for_raw_expr = (!expr.for_raw_expr.is_empty()).then_some(expr.for_raw_expr);
let keep_firing_for_raw_expr =
(!expr.keep_firing_for_raw_expr.is_empty()).then_some(expr.keep_firing_for_raw_expr);
let task = CreateTriggerTask {
catalog_name: expr.catalog_name,
trigger_name: expr.trigger_name,
@@ -111,7 +148,11 @@ impl TryFrom<PbCreateTriggerTask> for CreateTriggerTask {
labels: expr.labels,
annotations: expr.annotations,
interval,
raw_interval_expr: expr.raw_interval_expr,
raw_interval_expr,
r#for,
for_raw_expr,
keep_firing_for,
keep_firing_for_raw_expr,
};
Ok(task)
}
@@ -271,7 +312,11 @@ mod tests {
.into_iter()
.collect(),
interval: Duration::from_secs(60),
raw_interval_expr: "'1 minute'::INTERVAL".to_string(),
raw_interval_expr: Some("'1 minute'::INTERVAL".to_string()),
r#for: Duration::from_secs(300).into(),
for_raw_expr: Some("'5 minute'::INTERVAL".to_string()),
keep_firing_for: Duration::from_secs(600).into(),
keep_firing_for_raw_expr: Some("'10 minute'::INTERVAL".to_string()),
};
let pb_task: PbCreateTriggerTask = original.clone().try_into().unwrap();
@@ -306,6 +351,14 @@ mod tests {
assert_eq!(original.labels, round_tripped.labels);
assert_eq!(original.annotations, round_tripped.annotations);
assert_eq!(original.interval, round_tripped.interval);
assert_eq!(original.raw_interval_expr, round_tripped.raw_interval_expr);
assert_eq!(original.r#for, round_tripped.r#for);
assert_eq!(original.for_raw_expr, round_tripped.for_raw_expr);
assert_eq!(original.keep_firing_for, round_tripped.keep_firing_for);
assert_eq!(
original.keep_firing_for_raw_expr,
round_tripped.keep_firing_for_raw_expr
);
// Invalid, since create_trigger is None and it's required.
let invalid_task = PbCreateTriggerTask {
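
A reduced sketch (plain seconds instead of protobuf Duration, String errors) of the two conversion idioms used above: optional durations go through map + transpose, and empty protobuf strings are folded back into Option<String> with then_some.

use std::time::Duration;

fn to_pb_seconds(d: Option<Duration>) -> Result<Option<i64>, String> {
    d.map(|d| i64::try_from(d.as_secs()).map_err(|_| "duration too large".to_string()))
        .transpose()
}

fn from_pb_expr(raw: String) -> Option<String> {
    // The protobuf string cannot be absent here, so "" means "not set".
    (!raw.is_empty()).then_some(raw)
}

fn main() {
    assert_eq!(to_pb_seconds(Some(Duration::from_secs(300))), Ok(Some(300)));
    assert_eq!(to_pb_seconds(None), Ok(None));
    assert_eq!(from_pb_expr(String::new()), None);
}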

View File

@@ -27,4 +27,9 @@ snafu.workspace = true
tokio.workspace = true
[dev-dependencies]
criterion = "0.7.0"
tokio.workspace = true
[[bench]]
name = "iter_record_batch_rows"
harness = false

View File

@@ -0,0 +1,179 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::hint::black_box;
use std::sync::Arc;
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use datafusion::arrow::array::{Int32Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
use datafusion_common::arrow::array::{ArrayRef, RecordBatch, StringArray};
use datafusion_common::arrow::datatypes::Schema;
use datafusion_common::{ScalarValue, utils};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{
Int32Type, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType,
};
use datatypes::schema::SchemaRef;
fn prepare_record_batch(rows: usize) -> RecordBatch {
let schema = Schema::new(vec![
Field::new(
"ts",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("i", DataType::Int32, true),
Field::new("s", DataType::Utf8, true),
]);
let columns: Vec<ArrayRef> = vec![
Arc::new(TimestampMillisecondArray::from_iter_values(
(0..rows).map(|x| (1760313600000 + x) as i64),
)),
Arc::new(Int32Array::from_iter_values((0..rows).map(|x| x as i32))),
Arc::new(StringArray::from_iter((0..rows).map(|x| {
if x % 2 == 0 {
Some(format!("s_{x}"))
} else {
None
}
}))),
];
RecordBatch::try_new(Arc::new(schema), columns).unwrap()
}
fn iter_by_greptimedb_values(schema: SchemaRef, record_batch: RecordBatch) {
let record_batch =
common_recordbatch::RecordBatch::try_from_df_record_batch(schema, record_batch).unwrap();
for row in record_batch.rows() {
black_box(row);
}
}
fn iter_by_loop_rows_and_columns(record_batch: RecordBatch) {
for i in 0..record_batch.num_rows() {
for column in record_batch.columns() {
match column.data_type() {
DataType::Timestamp(time_unit, _) => {
let v = match time_unit {
TimeUnit::Second => {
let array = column.as_primitive::<TimestampSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = column.as_primitive::<TimestampMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = column.as_primitive::<TimestampMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = column.as_primitive::<TimestampNanosecondType>();
array.value(i)
}
};
black_box(v);
}
DataType::Int32 => {
let array = column.as_primitive::<Int32Type>();
let v = array.value(i);
black_box(v);
}
DataType::Utf8 => {
let array = column.as_string::<i32>();
let v = array.value(i);
black_box(v);
}
_ => unreachable!(),
}
}
}
}
fn iter_by_datafusion_scalar_values(record_batch: RecordBatch) {
let columns = record_batch.columns();
for i in 0..record_batch.num_rows() {
let row = utils::get_row_at_idx(columns, i).unwrap();
black_box(row);
}
}
fn iter_by_datafusion_scalar_values_with_buf(record_batch: RecordBatch) {
let columns = record_batch.columns();
let mut buf = vec![ScalarValue::Null; columns.len()];
for i in 0..record_batch.num_rows() {
utils::extract_row_at_idx_to_buf(columns, i, &mut buf).unwrap();
}
}
pub fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("iter_record_batch");
for rows in [1usize, 10, 100, 1_000, 10_000] {
group.bench_with_input(
BenchmarkId::new("by_greptimedb_values", rows),
&rows,
|b, rows| {
let record_batch = prepare_record_batch(*rows);
let schema =
Arc::new(datatypes::schema::Schema::try_from(record_batch.schema()).unwrap());
b.iter(|| {
iter_by_greptimedb_values(schema.clone(), record_batch.clone());
})
},
);
group.bench_with_input(
BenchmarkId::new("by_loop_rows_and_columns", rows),
&rows,
|b, rows| {
let record_batch = prepare_record_batch(*rows);
b.iter(|| {
iter_by_loop_rows_and_columns(record_batch.clone());
})
},
);
group.bench_with_input(
BenchmarkId::new("by_datafusion_scalar_values", rows),
&rows,
|b, rows| {
let record_batch = prepare_record_batch(*rows);
b.iter(|| {
iter_by_datafusion_scalar_values(record_batch.clone());
})
},
);
group.bench_with_input(
BenchmarkId::new("by_datafusion_scalar_values_with_buf", rows),
&rows,
|b, rows| {
let record_batch = prepare_record_batch(*rows);
b.iter(|| {
iter_by_datafusion_scalar_values_with_buf(record_batch.clone());
})
},
);
}
group.finish();
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View File

@@ -33,7 +33,7 @@ use datatypes::arrow::util::pretty;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::types::json_type_value_to_string;
use datatypes::types::{JsonFormat, jsonb_to_string};
use datatypes::vectors::{BinaryVector, StringVectorBuilder};
use error::Result;
use futures::task::{Context, Poll};
@@ -90,32 +90,34 @@ pub fn map_json_type_to_string(
) -> Result<RecordBatch> {
let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
if let ConcreteDataType::Json(j) = schema.data_type {
let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
let binary_vector = vector
.as_any()
.downcast_ref::<BinaryVector>()
.with_context(|| error::DowncastVectorSnafu {
from_type: schema.data_type.clone(),
to_type: ConcreteDataType::binary_datatype(),
})?;
for value in binary_vector.iter_data() {
let Some(value) = value else {
string_vector_builder.push(None);
continue;
};
let string_value =
json_type_value_to_string(value, &j.format).with_context(|_| {
error::CastVectorSnafu {
if let ConcreteDataType::Json(j) = &schema.data_type {
if matches!(&j.format, JsonFormat::Jsonb) {
let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
let binary_vector = vector
.as_any()
.downcast_ref::<BinaryVector>()
.with_context(|| error::DowncastVectorSnafu {
from_type: schema.data_type.clone(),
to_type: ConcreteDataType::binary_datatype(),
})?;
for value in binary_vector.iter_data() {
let Some(value) = value else {
string_vector_builder.push(None);
continue;
};
let string_value =
jsonb_to_string(value).with_context(|_| error::CastVectorSnafu {
from_type: schema.data_type.clone(),
to_type: ConcreteDataType::string_datatype(),
}
})?;
string_vector_builder.push(Some(string_value.as_str()));
}
})?;
string_vector_builder.push(Some(string_value.as_str()));
}
let string_vector = string_vector_builder.finish();
vectors.push(Arc::new(string_vector) as VectorRef);
let string_vector = string_vector_builder.finish();
vectors.push(Arc::new(string_vector) as VectorRef);
} else {
vectors.push(vector.clone());
}
} else {
vectors.push(vector.clone());
}

View File

@@ -16,9 +16,10 @@ use std::str::FromStr;
use common_time::Timestamp;
use common_time::timezone::Timezone;
use datatypes::json::JsonStructureSettings;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::types::{parse_string_to_json_type_value, parse_string_to_vector_type_value};
use datatypes::types::{JsonFormat, parse_string_to_jsonb, parse_string_to_vector_type_value};
use datatypes::value::{OrderedF32, OrderedF64, Value};
use snafu::{OptionExt, ResultExt, ensure};
pub use sqlparser::ast::{
@@ -210,7 +211,8 @@ pub fn sql_value_to_value(
| Value::Duration(_)
| Value::IntervalYearMonth(_)
| Value::IntervalDayTime(_)
| Value::IntervalMonthDayNano(_) => match unary_op {
| Value::IntervalMonthDayNano(_)
| Value::Json(_) => match unary_op {
UnaryOperator::Plus => {}
UnaryOperator::Minus => {
value = value
@@ -297,8 +299,21 @@ pub(crate) fn parse_string_to_value(
}
ConcreteDataType::Binary(_) => Ok(Value::Binary(s.as_bytes().into())),
ConcreteDataType::Json(j) => {
let v = parse_string_to_json_type_value(&s, &j.format).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
match &j.format {
JsonFormat::Jsonb => {
let v = parse_string_to_jsonb(&s).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
}
JsonFormat::Native(_inner) => {
// Always use the structured version at this level.
let serde_json_value =
serde_json::from_str(&s).context(DeserializeSnafu { json: s })?;
let json_structure_settings = JsonStructureSettings::Structured(None);
json_structure_settings
.encode(serde_json_value)
.context(DatatypeSnafu)
}
}
}
ConcreteDataType::Vector(d) => {
let v = parse_string_to_vector_type_value(&s, Some(d.dim)).context(DatatypeSnafu)?;

View File

@@ -5,9 +5,12 @@ edition.workspace = true
license.workspace = true
[dependencies]
common-base.workspace = true
lazy_static.workspace = true
nix.workspace = true
num_cpus.workspace = true
prometheus.workspace = true
sysinfo.workspace = true
[lints]
workspace = true

View File

@@ -23,9 +23,6 @@ use prometheus::core::{Collector, Desc};
use prometheus::proto::MetricFamily;
use prometheus::{IntGauge, Opts};
/// `MAX_VALUE` is used to indicate that the resource is unlimited.
pub const MAX_VALUE: i64 = -1;
const CGROUP_UNIFIED_MOUNTPOINT: &str = "/sys/fs/cgroup";
const MEMORY_MAX_FILE_CGROUP_V2: &str = "memory.max";
@@ -43,11 +40,11 @@ const MAX_VALUE_CGROUP_V2: &str = "max";
// For easier comparison, if the memory limit is larger than 1PB we consider it as unlimited.
const MAX_MEMORY_IN_BYTES: i64 = 1125899906842624; // 1PB
/// Get the limit of memory in bytes.
/// Get the limit of memory in bytes from cgroups filesystem.
///
/// - If the memory is unlimited, return `-1`.
/// - If the cgroup total memory is unset, return `None`.
/// - Return `None` if it fails to read the memory limit or not on linux.
pub fn get_memory_limit() -> Option<i64> {
pub fn get_memory_limit_from_cgroups() -> Option<i64> {
#[cfg(target_os = "linux")]
{
let memory_max_file = if is_cgroup_v2()? {
@@ -58,13 +55,13 @@ pub fn get_memory_limit() -> Option<i64> {
MEMORY_MAX_FILE_CGROUP_V1
};
// For cgroup v1, it will return a very large value(different from platform) if the memory is unlimited.
// For cgroup v1, it returns a very large value (which differs between platforms) if the memory limit is unset.
let memory_limit =
read_value_from_file(Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(memory_max_file))?;
// If memory limit exceeds 1PB(cgroup v1), consider it as unlimited.
// If memory limit exceeds 1PB(cgroup v1), consider it as unset.
if memory_limit > MAX_MEMORY_IN_BYTES {
return Some(MAX_VALUE);
return None;
}
Some(memory_limit)
}
@@ -73,10 +70,10 @@ pub fn get_memory_limit() -> Option<i64> {
None
}
/// Get the usage of memory in bytes.
/// Get the usage of memory in bytes from cgroups filesystem.
///
/// - Return `None` if it fails to read the memory usage or not on linux or cgroup is v1.
pub fn get_memory_usage() -> Option<i64> {
pub fn get_memory_usage_from_cgroups() -> Option<i64> {
#[cfg(target_os = "linux")]
{
if is_cgroup_v2()? {
@@ -93,11 +90,11 @@ pub fn get_memory_usage() -> Option<i64> {
None
}
/// Get the limit of cpu in millicores.
/// Get the limit of cpu in millicores from cgroups filesystem.
///
/// - If the cpu is unlimited, return `-1`.
/// - If the cpu limit is unset, return `None`.
/// - Return `None` if it fails to read the cpu limit or not on linux.
pub fn get_cpu_limit() -> Option<i64> {
pub fn get_cpu_limit_from_cgroups() -> Option<i64> {
#[cfg(target_os = "linux")]
if is_cgroup_v2()? {
// Read `/sys/fs/cgroup/cpu.max` to get the cpu limit.
@@ -108,10 +105,6 @@ pub fn get_cpu_limit() -> Option<i64> {
Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(CPU_QUOTA_FILE_CGROUP_V1),
)?;
if quota == MAX_VALUE {
return Some(MAX_VALUE);
}
let period = read_value_from_file(
Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(CPU_PERIOD_FILE_CGROUP_V1),
)?;
@@ -167,9 +160,9 @@ fn is_cgroup_v2() -> Option<bool> {
fn read_value_from_file<P: AsRef<Path>>(path: P) -> Option<i64> {
let content = read_to_string(&path).ok()?;
// If the content starts with "max", return `MAX_VALUE`.
// If the content starts with "max", return `None`.
if content.starts_with(MAX_VALUE_CGROUP_V2) {
return Some(MAX_VALUE);
return None;
}
content.trim().parse::<i64>().ok()
@@ -183,10 +176,10 @@ fn get_cgroup_v2_cpu_limit<P: AsRef<Path>>(path: P) -> Option<i64> {
return None;
}
// If the cpu is unlimited, it will be `-1`.
// If the cgroup cpu limit is unset, return `None`.
let quota = fields[0].trim();
if quota == MAX_VALUE_CGROUP_V2 {
return Some(MAX_VALUE);
return None;
}
let quota = quota.parse::<i64>().ok()?;
@@ -241,7 +234,7 @@ impl Collector for CgroupsMetricsCollector {
self.cpu_usage.set(cpu_usage);
}
if let Some(memory_usage) = get_memory_usage() {
if let Some(memory_usage) = get_memory_usage_from_cgroups() {
self.memory_usage.set(memory_usage);
}
@@ -263,8 +256,8 @@ mod tests {
100000
);
assert_eq!(
read_value_from_file(Path::new("testdata").join("memory.max.unlimited")).unwrap(),
MAX_VALUE
read_value_from_file(Path::new("testdata").join("memory.max.unlimited")),
None
);
assert_eq!(read_value_from_file(Path::new("non_existent_file")), None);
}
@@ -276,8 +269,8 @@ mod tests {
1500
);
assert_eq!(
get_cgroup_v2_cpu_limit(Path::new("testdata").join("cpu.max.unlimited")).unwrap(),
MAX_VALUE
get_cgroup_v2_cpu_limit(Path::new("testdata").join("cpu.max.unlimited")),
None
);
assert_eq!(
get_cgroup_v2_cpu_limit(Path::new("non_existent_file")),

View File

@@ -15,3 +15,64 @@
mod cgroups;
pub use cgroups::*;
use common_base::readable_size::ReadableSize;
use sysinfo::System;
/// Get the total CPU in millicores.
pub fn get_total_cpu_millicores() -> i64 {
// Get CPU limit from cgroups filesystem.
if let Some(cgroup_cpu_limit) = get_cpu_limit_from_cgroups() {
cgroup_cpu_limit
} else {
// Get total CPU cores from host system.
num_cpus::get() as i64 * 1000
}
}
/// Get the total memory in bytes.
pub fn get_total_memory_bytes() -> i64 {
// Get memory limit from cgroups filesystem.
if let Some(cgroup_memory_limit) = get_memory_limit_from_cgroups() {
cgroup_memory_limit
} else {
// Get total memory from host system.
if sysinfo::IS_SUPPORTED_SYSTEM {
let mut sys_info = System::new();
sys_info.refresh_memory();
sys_info.total_memory() as i64
} else {
// If the system is not supported, return -1.
-1
}
}
}
/// Get the total CPU cores. The result will be rounded to the nearest integer.
/// For example, if the total CPU is 1.5 cores (1500 millicores), the result will be 2.
pub fn get_total_cpu_cores() -> usize {
((get_total_cpu_millicores() as f64) / 1000.0).round() as usize
}
/// Get the total memory in readable size.
pub fn get_total_memory_readable() -> Option<ReadableSize> {
if get_total_memory_bytes() > 0 {
Some(ReadableSize(get_total_memory_bytes() as u64))
} else {
None
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_total_cpu_cores() {
assert!(get_total_cpu_cores() > 0);
}
#[test]
fn test_get_total_memory_readable() {
assert!(get_total_memory_readable().unwrap() > ReadableSize::mb(0));
}
}
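
A standalone sketch of how the totals above are consumed: millicores round to whole cores, and a non-positive byte count (including the -1 unsupported-system sentinel) maps to None, mirroring get_total_cpu_cores and get_total_memory_readable.

fn cores_from_millicores(millicores: i64) -> usize {
    ((millicores as f64) / 1000.0).round() as usize
}

fn memory_bytes_to_readable(bytes: i64) -> Option<u64> {
    // Non-positive totals (e.g. the -1 sentinel) yield None.
    (bytes > 0).then_some(bytes as u64)
}

fn main() {
    assert_eq!(cores_from_millicores(1500), 2); // 1.5 cores rounds to 2
    assert_eq!(memory_bytes_to_readable(-1), None);
}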

View File

@@ -25,6 +25,7 @@ use crate::config::kafka::common::{
};
use crate::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use crate::config::raft_engine::RaftEngineConfig;
use crate::error::{Error, UnsupportedWalProviderSnafu};
/// Wal configurations for metasrv.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
@@ -43,6 +44,7 @@ pub enum MetasrvWalConfig {
pub enum DatanodeWalConfig {
RaftEngine(RaftEngineConfig),
Kafka(DatanodeKafkaConfig),
Noop,
}
impl Default for DatanodeWalConfig {
@@ -51,11 +53,13 @@ impl Default for DatanodeWalConfig {
}
}
impl From<DatanodeWalConfig> for MetasrvWalConfig {
fn from(config: DatanodeWalConfig) -> Self {
impl TryFrom<DatanodeWalConfig> for MetasrvWalConfig {
type Error = Error;
fn try_from(config: DatanodeWalConfig) -> Result<Self, Self::Error> {
match config {
DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
DatanodeWalConfig::RaftEngine(_) => Ok(Self::RaftEngine),
DatanodeWalConfig::Kafka(config) => Ok(Self::Kafka(MetasrvKafkaConfig {
connection: config.connection,
kafka_topic: config.kafka_topic,
auto_create_topics: config.auto_create_topics,
@@ -67,7 +71,11 @@ impl From<DatanodeWalConfig> for MetasrvWalConfig {
flush_trigger_size: DEFAULT_FLUSH_TRIGGER_SIZE,
// This field won't be used in standalone mode
checkpoint_trigger_size: DEFAULT_CHECKPOINT_TRIGGER_SIZE,
}),
})),
DatanodeWalConfig::Noop => UnsupportedWalProviderSnafu {
provider: "noop".to_string(),
}
.fail(),
}
}
}

View File

@@ -92,6 +92,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported WAL provider: {}", provider))]
UnsupportedWalProvider {
provider: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;