mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-04 22:30:37 +00:00
refactor: remove table ident (#2368)
* refactor: 1. remove TableIdent, use TableId directly 2. use the latest greptime-proto 3. independently invalidate table id cache and table name cache * rebase * fix: resolve PR comments * fix: resolve PR comments
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -4103,7 +4103,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2#1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=81495b166b2c8909f05b3fcaa09eb299bb43a995#81495b166b2c8909f05b3fcaa09eb299bb43a995"
|
||||
dependencies = [
|
||||
"prost",
|
||||
"serde",
|
||||
|
||||
@@ -78,7 +78,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
|
||||
derive_builder = "0.12"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "81495b166b2c8909f05b3fcaa09eb299bb43a995" }
|
||||
humantime-serde = "1.1"
|
||||
itertools = "0.10"
|
||||
lazy_static = "1.4"
|
||||
|
||||
@@ -105,7 +105,6 @@ async fn write_data(
|
||||
let (columns, row_count) = convert_record_batch(record_batch);
|
||||
let request = InsertRequest {
|
||||
table_name: TABLE_NAME.to_string(),
|
||||
region_number: 0,
|
||||
columns,
|
||||
row_count,
|
||||
};
|
||||
@@ -189,7 +188,7 @@ fn build_values(column: &ArrayRef) -> (Values, ColumnDataType) {
|
||||
let values = array.values();
|
||||
(
|
||||
Values {
|
||||
ts_microsecond_values: values.to_vec(),
|
||||
timestamp_microsecond_values: values.to_vec(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnDataType::TimestampMicrosecond,
|
||||
@@ -389,7 +388,6 @@ fn create_table_expr() -> CreateTableExpr {
|
||||
primary_keys: vec!["VendorID".to_string()],
|
||||
create_if_not_exists: false,
|
||||
table_options: Default::default(),
|
||||
region_numbers: vec![0],
|
||||
table_id: None,
|
||||
engine: "mito".to_string(),
|
||||
}
|
||||
|
||||
@@ -214,19 +214,19 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values
|
||||
..Default::default()
|
||||
},
|
||||
ColumnDataType::TimestampSecond => Values {
|
||||
ts_second_values: Vec::with_capacity(capacity),
|
||||
timestamp_second_values: Vec::with_capacity(capacity),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnDataType::TimestampMillisecond => Values {
|
||||
ts_millisecond_values: Vec::with_capacity(capacity),
|
||||
timestamp_millisecond_values: Vec::with_capacity(capacity),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnDataType::TimestampMicrosecond => Values {
|
||||
ts_microsecond_values: Vec::with_capacity(capacity),
|
||||
timestamp_microsecond_values: Vec::with_capacity(capacity),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnDataType::TimestampNanosecond => Values {
|
||||
ts_nanosecond_values: Vec::with_capacity(capacity),
|
||||
timestamp_nanosecond_values: Vec::with_capacity(capacity),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnDataType::TimeSecond => Values {
|
||||
@@ -286,10 +286,10 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) {
|
||||
Value::Date(val) => values.date_values.push(val.val()),
|
||||
Value::DateTime(val) => values.datetime_values.push(val.val()),
|
||||
Value::Timestamp(val) => match val.unit() {
|
||||
TimeUnit::Second => values.ts_second_values.push(val.value()),
|
||||
TimeUnit::Millisecond => values.ts_millisecond_values.push(val.value()),
|
||||
TimeUnit::Microsecond => values.ts_microsecond_values.push(val.value()),
|
||||
TimeUnit::Nanosecond => values.ts_nanosecond_values.push(val.value()),
|
||||
TimeUnit::Second => values.timestamp_second_values.push(val.value()),
|
||||
TimeUnit::Millisecond => values.timestamp_millisecond_values.push(val.value()),
|
||||
TimeUnit::Microsecond => values.timestamp_microsecond_values.push(val.value()),
|
||||
TimeUnit::Nanosecond => values.timestamp_nanosecond_values.push(val.value()),
|
||||
},
|
||||
Value::Time(val) => match val.unit() {
|
||||
TimeUnit::Second => values.time_second_values.push(val.value()),
|
||||
@@ -375,10 +375,16 @@ pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef {
|
||||
ValueData::StringValue(string) => ValueRef::String(string.as_str()),
|
||||
ValueData::DateValue(d) => ValueRef::Date(Date::from(*d)),
|
||||
ValueData::DatetimeValue(d) => ValueRef::DateTime(DateTime::new(*d)),
|
||||
ValueData::TsSecondValue(t) => ValueRef::Timestamp(Timestamp::new_second(*t)),
|
||||
ValueData::TsMillisecondValue(t) => ValueRef::Timestamp(Timestamp::new_millisecond(*t)),
|
||||
ValueData::TsMicrosecondValue(t) => ValueRef::Timestamp(Timestamp::new_microsecond(*t)),
|
||||
ValueData::TsNanosecondValue(t) => ValueRef::Timestamp(Timestamp::new_nanosecond(*t)),
|
||||
ValueData::TimestampSecondValue(t) => ValueRef::Timestamp(Timestamp::new_second(*t)),
|
||||
ValueData::TimestampMillisecondValue(t) => {
|
||||
ValueRef::Timestamp(Timestamp::new_millisecond(*t))
|
||||
}
|
||||
ValueData::TimestampMicrosecondValue(t) => {
|
||||
ValueRef::Timestamp(Timestamp::new_microsecond(*t))
|
||||
}
|
||||
ValueData::TimestampNanosecondValue(t) => {
|
||||
ValueRef::Timestamp(Timestamp::new_nanosecond(*t))
|
||||
}
|
||||
ValueData::TimeSecondValue(t) => ValueRef::Time(Time::new_second(*t)),
|
||||
ValueData::TimeMillisecondValue(t) => ValueRef::Time(Time::new_millisecond(*t)),
|
||||
ValueData::TimeMicrosecondValue(t) => ValueRef::Time(Time::new_microsecond(*t)),
|
||||
@@ -418,17 +424,17 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) ->
|
||||
ConcreteDataType::Date(_) => Arc::new(DateVector::from_vec(values.date_values)),
|
||||
ConcreteDataType::DateTime(_) => Arc::new(DateTimeVector::from_vec(values.datetime_values)),
|
||||
ConcreteDataType::Timestamp(unit) => match unit {
|
||||
TimestampType::Second(_) => {
|
||||
Arc::new(TimestampSecondVector::from_vec(values.ts_second_values))
|
||||
}
|
||||
TimestampType::Second(_) => Arc::new(TimestampSecondVector::from_vec(
|
||||
values.timestamp_second_values,
|
||||
)),
|
||||
TimestampType::Millisecond(_) => Arc::new(TimestampMillisecondVector::from_vec(
|
||||
values.ts_millisecond_values,
|
||||
values.timestamp_millisecond_values,
|
||||
)),
|
||||
TimestampType::Microsecond(_) => Arc::new(TimestampMicrosecondVector::from_vec(
|
||||
values.ts_microsecond_values,
|
||||
values.timestamp_microsecond_values,
|
||||
)),
|
||||
TimestampType::Nanosecond(_) => Arc::new(TimestampNanosecondVector::from_vec(
|
||||
values.ts_nanosecond_values,
|
||||
values.timestamp_nanosecond_values,
|
||||
)),
|
||||
},
|
||||
ConcreteDataType::Time(unit) => match unit {
|
||||
@@ -550,22 +556,22 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec<
|
||||
.map(|v| Value::Date(v.into()))
|
||||
.collect(),
|
||||
ConcreteDataType::Timestamp(TimestampType::Second(_)) => values
|
||||
.ts_second_values
|
||||
.timestamp_second_values
|
||||
.into_iter()
|
||||
.map(|v| Value::Timestamp(Timestamp::new_second(v)))
|
||||
.collect(),
|
||||
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => values
|
||||
.ts_millisecond_values
|
||||
.timestamp_millisecond_values
|
||||
.into_iter()
|
||||
.map(|v| Value::Timestamp(Timestamp::new_millisecond(v)))
|
||||
.collect(),
|
||||
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => values
|
||||
.ts_microsecond_values
|
||||
.timestamp_microsecond_values
|
||||
.into_iter()
|
||||
.map(|v| Value::Timestamp(Timestamp::new_microsecond(v)))
|
||||
.collect(),
|
||||
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => values
|
||||
.ts_nanosecond_values
|
||||
.timestamp_nanosecond_values
|
||||
.into_iter()
|
||||
.map(|v| Value::Timestamp(Timestamp::new_nanosecond(v)))
|
||||
.collect(),
|
||||
@@ -682,16 +688,16 @@ pub fn to_proto_value(value: Value) -> Option<v1::Value> {
|
||||
},
|
||||
Value::Timestamp(v) => match v.unit() {
|
||||
TimeUnit::Second => v1::Value {
|
||||
value_data: Some(ValueData::TsSecondValue(v.value())),
|
||||
value_data: Some(ValueData::TimestampSecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Millisecond => v1::Value {
|
||||
value_data: Some(ValueData::TsMillisecondValue(v.value())),
|
||||
value_data: Some(ValueData::TimestampMillisecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Microsecond => v1::Value {
|
||||
value_data: Some(ValueData::TsMicrosecondValue(v.value())),
|
||||
value_data: Some(ValueData::TimestampMicrosecondValue(v.value())),
|
||||
},
|
||||
TimeUnit::Nanosecond => v1::Value {
|
||||
value_data: Some(ValueData::TsNanosecondValue(v.value())),
|
||||
value_data: Some(ValueData::TimestampNanosecondValue(v.value())),
|
||||
},
|
||||
},
|
||||
Value::Time(v) => match v.unit() {
|
||||
@@ -747,10 +753,10 @@ pub fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
|
||||
ValueData::StringValue(_) => ColumnDataType::String,
|
||||
ValueData::DateValue(_) => ColumnDataType::Date,
|
||||
ValueData::DatetimeValue(_) => ColumnDataType::Datetime,
|
||||
ValueData::TsSecondValue(_) => ColumnDataType::TimestampSecond,
|
||||
ValueData::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond,
|
||||
ValueData::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond,
|
||||
ValueData::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond,
|
||||
ValueData::TimestampSecondValue(_) => ColumnDataType::TimestampSecond,
|
||||
ValueData::TimestampMillisecondValue(_) => ColumnDataType::TimestampMillisecond,
|
||||
ValueData::TimestampMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond,
|
||||
ValueData::TimestampNanosecondValue(_) => ColumnDataType::TimestampNanosecond,
|
||||
ValueData::TimeSecondValue(_) => ColumnDataType::TimeSecond,
|
||||
ValueData::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond,
|
||||
ValueData::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond,
|
||||
@@ -837,10 +843,10 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
|
||||
Value::Date(v) => Some(ValueData::DateValue(v.val())),
|
||||
Value::DateTime(v) => Some(ValueData::DatetimeValue(v.val())),
|
||||
Value::Timestamp(v) => Some(match v.unit() {
|
||||
TimeUnit::Second => ValueData::TsSecondValue(v.value()),
|
||||
TimeUnit::Millisecond => ValueData::TsMillisecondValue(v.value()),
|
||||
TimeUnit::Microsecond => ValueData::TsMicrosecondValue(v.value()),
|
||||
TimeUnit::Nanosecond => ValueData::TsNanosecondValue(v.value()),
|
||||
TimeUnit::Second => ValueData::TimestampSecondValue(v.value()),
|
||||
TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(v.value()),
|
||||
TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(v.value()),
|
||||
TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(v.value()),
|
||||
}),
|
||||
Value::Time(v) => Some(match v.unit() {
|
||||
TimeUnit::Second => ValueData::TimeSecondValue(v.value()),
|
||||
@@ -943,7 +949,7 @@ mod tests {
|
||||
assert_eq!(2, values.capacity());
|
||||
|
||||
let values = values_with_capacity(ColumnDataType::TimestampMillisecond, 2);
|
||||
let values = values.ts_millisecond_values;
|
||||
let values = values.timestamp_millisecond_values;
|
||||
assert_eq!(2, values.capacity());
|
||||
|
||||
let values = values_with_capacity(ColumnDataType::TimeMillisecond, 2);
|
||||
@@ -1162,28 +1168,28 @@ mod tests {
|
||||
push_vals(&mut column, 3, vector);
|
||||
assert_eq!(
|
||||
vec![1, 2, 3],
|
||||
column.values.as_ref().unwrap().ts_nanosecond_values
|
||||
column.values.as_ref().unwrap().timestamp_nanosecond_values
|
||||
);
|
||||
|
||||
let vector = Arc::new(TimestampMillisecondVector::from_vec(vec![4, 5, 6]));
|
||||
push_vals(&mut column, 3, vector);
|
||||
assert_eq!(
|
||||
vec![4, 5, 6],
|
||||
column.values.as_ref().unwrap().ts_millisecond_values
|
||||
column.values.as_ref().unwrap().timestamp_millisecond_values
|
||||
);
|
||||
|
||||
let vector = Arc::new(TimestampMicrosecondVector::from_vec(vec![7, 8, 9]));
|
||||
push_vals(&mut column, 3, vector);
|
||||
assert_eq!(
|
||||
vec![7, 8, 9],
|
||||
column.values.as_ref().unwrap().ts_microsecond_values
|
||||
column.values.as_ref().unwrap().timestamp_microsecond_values
|
||||
);
|
||||
|
||||
let vector = Arc::new(TimestampSecondVector::from_vec(vec![10, 11, 12]));
|
||||
push_vals(&mut column, 3, vector);
|
||||
assert_eq!(
|
||||
vec![10, 11, 12],
|
||||
column.values.as_ref().unwrap().ts_second_values
|
||||
column.values.as_ref().unwrap().timestamp_second_values
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1312,7 +1318,7 @@ mod tests {
|
||||
let actual = pb_values_to_values(
|
||||
&ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType)),
|
||||
Values {
|
||||
ts_second_values: vec![1_i64, 2_i64, 3_i64],
|
||||
timestamp_second_values: vec![1_i64, 2_i64, 3_i64],
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
@@ -1327,7 +1333,7 @@ mod tests {
|
||||
let actual = pb_values_to_values(
|
||||
&ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType)),
|
||||
Values {
|
||||
ts_millisecond_values: vec![1_i64, 2_i64, 3_i64],
|
||||
timestamp_millisecond_values: vec![1_i64, 2_i64, 3_i64],
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
|
||||
@@ -20,6 +20,7 @@ use common_error::status_code::StatusCode;
|
||||
use datafusion::error::DataFusionError;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use snafu::{Location, Snafu};
|
||||
use table::metadata::TableId;
|
||||
use tokio::task::JoinError;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
@@ -140,9 +141,9 @@ pub enum Error {
|
||||
#[snafu(display("Operation {} not supported", op))]
|
||||
NotSupported { op: String, location: Location },
|
||||
|
||||
#[snafu(display("Failed to open table, table info: {}, source: {}", table_info, source))]
|
||||
#[snafu(display("Failed to open table {table_id}, source: {source}, at {location}"))]
|
||||
OpenTable {
|
||||
table_info: String,
|
||||
table_id: TableId,
|
||||
location: Location,
|
||||
source: table::error::Error,
|
||||
},
|
||||
|
||||
@@ -66,7 +66,6 @@ async fn run() {
|
||||
create_if_not_exists: false,
|
||||
table_options: Default::default(),
|
||||
table_id: Some(TableId { id: 1024 }),
|
||||
region_numbers: vec![0],
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
};
|
||||
|
||||
|
||||
@@ -131,7 +131,7 @@ fn to_insert_request(records: Vec<WeatherRecord>) -> InsertRequest {
|
||||
Column {
|
||||
column_name: "ts".to_owned(),
|
||||
values: Some(column::Values {
|
||||
ts_millisecond_values: timestamp_millis,
|
||||
timestamp_millisecond_values: timestamp_millis,
|
||||
..Default::default()
|
||||
}),
|
||||
semantic_type: SemanticType::Timestamp as i32,
|
||||
@@ -177,6 +177,5 @@ fn to_insert_request(records: Vec<WeatherRecord>) -> InsertRequest {
|
||||
table_name: "weather_demo".to_owned(),
|
||||
columns,
|
||||
row_count: rows as u32,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,18 +18,15 @@ use api::v1::{
|
||||
column_def, AddColumnLocation as Location, AlterExpr, CreateTableExpr, DropColumns,
|
||||
RenameTable, SemanticType,
|
||||
};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_query::AddColumnLocation;
|
||||
use datatypes::schema::{ColumnSchema, RawSchema};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
use table::requests::{
|
||||
AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest, TableOptions,
|
||||
};
|
||||
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest};
|
||||
|
||||
use crate::error::{
|
||||
ColumnNotFoundSnafu, InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu,
|
||||
Result, UnknownLocationTypeSnafu, UnrecognizedTableOptionSnafu,
|
||||
InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, Result,
|
||||
UnknownLocationTypeSnafu,
|
||||
};
|
||||
|
||||
const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
|
||||
@@ -121,65 +118,6 @@ pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) ->
|
||||
Ok(RawSchema::new(column_schemas))
|
||||
}
|
||||
|
||||
pub fn create_expr_to_request(
|
||||
table_id: TableId,
|
||||
expr: CreateTableExpr,
|
||||
require_time_index: bool,
|
||||
) -> Result<CreateTableRequest> {
|
||||
let schema = create_table_schema(&expr, require_time_index)?;
|
||||
let primary_key_indices = expr
|
||||
.primary_keys
|
||||
.iter()
|
||||
.map(|key| {
|
||||
// We do a linear search here.
|
||||
schema
|
||||
.column_schemas
|
||||
.iter()
|
||||
.position(|column_schema| column_schema.name == *key)
|
||||
.context(ColumnNotFoundSnafu {
|
||||
column_name: key,
|
||||
table_name: &expr.table_name,
|
||||
})
|
||||
})
|
||||
.collect::<Result<Vec<usize>>>()?;
|
||||
|
||||
let mut catalog_name = expr.catalog_name;
|
||||
if catalog_name.is_empty() {
|
||||
catalog_name = DEFAULT_CATALOG_NAME.to_string();
|
||||
}
|
||||
let mut schema_name = expr.schema_name;
|
||||
if schema_name.is_empty() {
|
||||
schema_name = DEFAULT_SCHEMA_NAME.to_string();
|
||||
}
|
||||
let desc = if expr.desc.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(expr.desc)
|
||||
};
|
||||
|
||||
let region_numbers = if expr.region_numbers.is_empty() {
|
||||
vec![0]
|
||||
} else {
|
||||
expr.region_numbers
|
||||
};
|
||||
|
||||
let table_options =
|
||||
TableOptions::try_from(&expr.table_options).context(UnrecognizedTableOptionSnafu)?;
|
||||
Ok(CreateTableRequest {
|
||||
id: table_id,
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name: expr.table_name,
|
||||
desc,
|
||||
schema,
|
||||
region_numbers,
|
||||
primary_key_indices,
|
||||
create_if_not_exists: expr.create_if_not_exists,
|
||||
table_options,
|
||||
engine: expr.engine,
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation>> {
|
||||
match location {
|
||||
Some(Location {
|
||||
|
||||
@@ -79,7 +79,6 @@ mod tests {
|
||||
fn test_to_table_delete_request() {
|
||||
let grpc_request = GrpcDeleteRequest {
|
||||
table_name: "foo".to_string(),
|
||||
region_number: 0,
|
||||
key_columns: vec![
|
||||
Column {
|
||||
column_name: "id".to_string(),
|
||||
|
||||
@@ -21,12 +21,6 @@ use snafu::{Location, Snafu};
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum Error {
|
||||
#[snafu(display("Column `{}` not found in table `{}`", column_name, table_name))]
|
||||
ColumnNotFound {
|
||||
column_name: String,
|
||||
table_name: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Illegal delete request, reason: {reason}"))]
|
||||
IllegalDeleteRequest { reason: String, location: Location },
|
||||
|
||||
@@ -75,18 +69,9 @@ pub enum Error {
|
||||
source: api::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Unrecognized table option: {}", source))]
|
||||
UnrecognizedTableOption {
|
||||
location: Location,
|
||||
source: table::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Unexpected values length, reason: {}", reason))]
|
||||
UnexpectedValuesLength { reason: String, location: Location },
|
||||
|
||||
#[snafu(display("The column name already exists, column: {}", column))]
|
||||
ColumnAlreadyExists { column: String, location: Location },
|
||||
|
||||
#[snafu(display("Unknown location type: {}", location_type))]
|
||||
UnknownLocationType {
|
||||
location_type: i32,
|
||||
@@ -99,8 +84,6 @@ pub type Result<T> = std::result::Result<T, Error>;
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
|
||||
|
||||
Error::IllegalDeleteRequest { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Error::ColumnDataType { .. } => StatusCode::Internal,
|
||||
@@ -111,10 +94,9 @@ impl ErrorExt for Error {
|
||||
Error::CreateVector { .. } => StatusCode::InvalidArguments,
|
||||
Error::MissingField { .. } => StatusCode::InvalidArguments,
|
||||
Error::InvalidColumnDef { source, .. } => source.status_code(),
|
||||
Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
|
||||
Error::UnexpectedValuesLength { .. }
|
||||
| Error::ColumnAlreadyExists { .. }
|
||||
| Error::UnknownLocationType { .. } => StatusCode::InvalidArguments,
|
||||
Error::UnexpectedValuesLength { .. } | Error::UnknownLocationType { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,12 +12,9 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::helper;
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
use api::v1::column::Values;
|
||||
use api::v1::{AddColumns, Column, CreateTableExpr, InsertRequest as GrpcInsertRequest};
|
||||
use api::v1::{AddColumns, Column, CreateTableExpr};
|
||||
use common_base::BitVec;
|
||||
use datatypes::data_type::{ConcreteDataType, DataType};
|
||||
use datatypes::prelude::VectorRef;
|
||||
@@ -25,12 +22,8 @@ use datatypes::schema::SchemaRef;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use table::engine::TableReference;
|
||||
use table::metadata::TableId;
|
||||
use table::requests::InsertRequest;
|
||||
|
||||
use crate::error::{
|
||||
ColumnAlreadyExistsSnafu, ColumnDataTypeSnafu, CreateVectorSnafu, Result,
|
||||
UnexpectedValuesLengthSnafu,
|
||||
};
|
||||
use crate::error::{CreateVectorSnafu, Result, UnexpectedValuesLengthSnafu};
|
||||
use crate::util;
|
||||
use crate::util::ColumnExpr;
|
||||
|
||||
@@ -59,47 +52,6 @@ pub fn build_create_expr_from_insertion(
|
||||
)
|
||||
}
|
||||
|
||||
pub fn to_table_insert_request(
|
||||
catalog_name: &str,
|
||||
schema_name: &str,
|
||||
request: GrpcInsertRequest,
|
||||
) -> Result<InsertRequest> {
|
||||
let table_name = &request.table_name;
|
||||
let row_count = request.row_count as usize;
|
||||
|
||||
let mut columns_values = HashMap::with_capacity(request.columns.len());
|
||||
for Column {
|
||||
column_name,
|
||||
values,
|
||||
null_mask,
|
||||
datatype,
|
||||
..
|
||||
} in request.columns
|
||||
{
|
||||
let Some(values) = values else { continue };
|
||||
|
||||
let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(datatype)
|
||||
.context(ColumnDataTypeSnafu)?
|
||||
.into();
|
||||
let vector = add_values_to_builder(datatype, values, row_count, null_mask)?;
|
||||
|
||||
ensure!(
|
||||
columns_values.insert(column_name.clone(), vector).is_none(),
|
||||
ColumnAlreadyExistsSnafu {
|
||||
column: column_name
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
Ok(InsertRequest {
|
||||
catalog_name: catalog_name.to_string(),
|
||||
schema_name: schema_name.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
columns_values,
|
||||
region_number: request.region_number,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn add_values_to_builder(
|
||||
data_type: ConcreteDataType,
|
||||
values: Values,
|
||||
@@ -150,10 +102,9 @@ mod tests {
|
||||
use common_base::BitVec;
|
||||
use common_catalog::consts::MITO_ENGINE;
|
||||
use common_time::interval::IntervalUnit;
|
||||
use common_time::timestamp::{TimeUnit, Timestamp};
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, SchemaBuilder};
|
||||
use datatypes::value::Value;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use super::*;
|
||||
@@ -356,38 +307,6 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_to_table_insert_request() {
|
||||
let (columns, row_count) = mock_insert_batch();
|
||||
let request = GrpcInsertRequest {
|
||||
table_name: "demo".to_string(),
|
||||
columns,
|
||||
row_count,
|
||||
region_number: 0,
|
||||
};
|
||||
let insert_req = to_table_insert_request("greptime", "public", request).unwrap();
|
||||
|
||||
assert_eq!("greptime", insert_req.catalog_name);
|
||||
assert_eq!("public", insert_req.schema_name);
|
||||
assert_eq!("demo", insert_req.table_name);
|
||||
|
||||
let host = insert_req.columns_values.get("host").unwrap();
|
||||
assert_eq!(Value::String("host1".into()), host.get(0));
|
||||
assert_eq!(Value::String("host2".into()), host.get(1));
|
||||
|
||||
let cpu = insert_req.columns_values.get("cpu").unwrap();
|
||||
assert_eq!(Value::Float64(0.31.into()), cpu.get(0));
|
||||
assert_eq!(Value::Null, cpu.get(1));
|
||||
|
||||
let memory = insert_req.columns_values.get("memory").unwrap();
|
||||
assert_eq!(Value::Null, memory.get(0));
|
||||
assert_eq!(Value::Float64(0.1.into()), memory.get(1));
|
||||
|
||||
let ts = insert_req.columns_values.get("ts").unwrap();
|
||||
assert_eq!(Value::Timestamp(Timestamp::new_millisecond(100)), ts.get(0));
|
||||
assert_eq!(Value::Timestamp(Timestamp::new_millisecond(101)), ts.get(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_null() {
|
||||
let null_mask = BitVec::from_slice(&[0b0000_0001, 0b0000_1000]);
|
||||
@@ -476,7 +395,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let ts_vals = Values {
|
||||
ts_millisecond_values: vec![100, 101],
|
||||
timestamp_millisecond_values: vec![100, 101],
|
||||
..Default::default()
|
||||
};
|
||||
let ts_column = Column {
|
||||
|
||||
@@ -18,5 +18,5 @@ pub mod error;
|
||||
pub mod insert;
|
||||
pub mod util;
|
||||
|
||||
pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema};
|
||||
pub use alter::{alter_expr_to_request, create_table_schema};
|
||||
pub use insert::{build_create_expr_from_insertion, find_new_columns};
|
||||
|
||||
@@ -139,8 +139,6 @@ pub fn build_create_table_expr(
|
||||
create_if_not_exists: true,
|
||||
table_options: Default::default(),
|
||||
table_id: table_id.map(|id| api::v1::TableId { id }),
|
||||
// TODO(hl): region number should be allocated by frontend
|
||||
region_numbers: vec![0],
|
||||
engine: engine.to_string(),
|
||||
};
|
||||
|
||||
|
||||
@@ -150,25 +150,25 @@ pub fn values(arrays: &[VectorRef]) -> Result<Values> {
|
||||
(
|
||||
ConcreteDataType::Timestamp(TimestampType::Second(_)),
|
||||
TimestampSecondVector,
|
||||
ts_second_values,
|
||||
timestamp_second_values,
|
||||
|x| { x.into_native() }
|
||||
),
|
||||
(
|
||||
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)),
|
||||
TimestampMillisecondVector,
|
||||
ts_millisecond_values,
|
||||
timestamp_millisecond_values,
|
||||
|x| { x.into_native() }
|
||||
),
|
||||
(
|
||||
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)),
|
||||
TimestampMicrosecondVector,
|
||||
ts_microsecond_values,
|
||||
timestamp_microsecond_values,
|
||||
|x| { x.into_native() }
|
||||
),
|
||||
(
|
||||
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)),
|
||||
TimestampNanosecondVector,
|
||||
ts_nanosecond_values,
|
||||
timestamp_nanosecond_values,
|
||||
|x| { x.into_native() }
|
||||
),
|
||||
(
|
||||
|
||||
@@ -62,7 +62,7 @@ impl LinesWriter {
|
||||
// It is safe to use unwrap here, because values has been initialized in mut_column()
|
||||
let values = column.values.as_mut().unwrap();
|
||||
values
|
||||
.ts_millisecond_values
|
||||
.timestamp_millisecond_values
|
||||
.push(to_ms_ts(value.1, value.0));
|
||||
self.null_masks[idx].push(false);
|
||||
Ok(())
|
||||
@@ -360,7 +360,7 @@ mod tests {
|
||||
assert_eq!(SemanticType::Timestamp as i32, column.semantic_type);
|
||||
assert_eq!(
|
||||
vec![101011000, 102011001, 103011002],
|
||||
column.values.as_ref().unwrap().ts_millisecond_values
|
||||
column.values.as_ref().unwrap().timestamp_millisecond_values
|
||||
);
|
||||
verify_null_mask(&column.null_mask, vec![false, false, false]);
|
||||
|
||||
|
||||
@@ -14,11 +14,13 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::ident::TableIdent;
|
||||
use crate::table_name::TableName;
|
||||
|
||||
/// Places context of invalidating cache. e.g., span id, trace id etc.
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Default)]
|
||||
pub struct Context {
|
||||
pub subject: Option<String>,
|
||||
}
|
||||
@@ -26,7 +28,9 @@ pub struct Context {
|
||||
#[async_trait::async_trait]
|
||||
pub trait CacheInvalidator: Send + Sync {
|
||||
// Invalidates table cache
|
||||
async fn invalidate_table(&self, ctx: &Context, table_ident: TableIdent) -> Result<()>;
|
||||
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> Result<()>;
|
||||
|
||||
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> Result<()>;
|
||||
}
|
||||
|
||||
pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
|
||||
@@ -35,7 +39,11 @@ pub struct DummyCacheInvalidator;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl CacheInvalidator for DummyCacheInvalidator {
|
||||
async fn invalidate_table(&self, _ctx: &Context, _table_ident: TableIdent) -> Result<()> {
|
||||
async fn invalidate_table_id(&self, _ctx: &Context, _table_id: TableId) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn invalidate_table_name(&self, _ctx: &Context, _table_name: TableName) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,7 +42,6 @@ use crate::ddl::DdlContext;
|
||||
use crate::error::{
|
||||
self, ConvertAlterTableRequestSnafu, InvalidProtoMsgSnafu, Result, TableRouteNotFoundSnafu,
|
||||
};
|
||||
use crate::ident::TableIdent;
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
@@ -308,34 +307,25 @@ impl AlterTableProcedure {
|
||||
|
||||
/// Broadcasts the invalidating table cache instructions.
|
||||
async fn on_broadcast(&mut self) -> Result<Status> {
|
||||
let table_ref = self.data.table_ref();
|
||||
|
||||
let table_ident = TableIdent {
|
||||
catalog: table_ref.catalog.to_string(),
|
||||
schema: table_ref.schema.to_string(),
|
||||
table: table_ref.table.to_string(),
|
||||
table_id: self.data.table_id(),
|
||||
engine: self.data.table_info().meta.engine.to_string(),
|
||||
};
|
||||
|
||||
self.context
|
||||
.cache_invalidator
|
||||
.invalidate_table(
|
||||
&Context {
|
||||
subject: Some("Invalidate table cache by alter table procedure".to_string()),
|
||||
},
|
||||
table_ident,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let alter_kind = self.alter_kind()?;
|
||||
if matches!(alter_kind, Kind::RenameTable { .. }) {
|
||||
Ok(Status::Done)
|
||||
let cache_invalidator = &self.context.cache_invalidator;
|
||||
|
||||
let status = if matches!(alter_kind, Kind::RenameTable { .. }) {
|
||||
cache_invalidator
|
||||
.invalidate_table_name(&Context::default(), self.data.table_ref().into())
|
||||
.await?;
|
||||
|
||||
Status::Done
|
||||
} else {
|
||||
cache_invalidator
|
||||
.invalidate_table_id(&Context::default(), self.data.table_id())
|
||||
.await?;
|
||||
|
||||
self.data.state = AlterTableState::SubmitAlterRegionRequests;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
Status::executing(true)
|
||||
};
|
||||
Ok(status)
|
||||
}
|
||||
|
||||
fn lock_key_inner(&self) -> Vec<String> {
|
||||
|
||||
@@ -36,14 +36,12 @@ use crate::cache_invalidator::Context;
|
||||
use crate::ddl::utils::handle_operate_region_error;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{self, Result};
|
||||
use crate::ident::TableIdent;
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
use crate::metrics;
|
||||
use crate::rpc::ddl::DropTableTask;
|
||||
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
|
||||
use crate::table_name::TableName;
|
||||
|
||||
pub struct DropTableProcedure {
|
||||
pub context: DdlContext,
|
||||
@@ -118,25 +116,17 @@ impl DropTableProcedure {
|
||||
|
||||
/// Broadcasts invalidate table cache instruction.
|
||||
async fn on_broadcast(&mut self) -> Result<Status> {
|
||||
let table_name = self.data.table_name();
|
||||
let engine = &self.data.table_info().meta.engine;
|
||||
|
||||
let table_ident = TableIdent {
|
||||
catalog: table_name.catalog_name,
|
||||
schema: table_name.schema_name,
|
||||
table: table_name.table_name,
|
||||
table_id: self.data.task.table_id,
|
||||
engine: engine.to_string(),
|
||||
let ctx = Context {
|
||||
subject: Some("Invalidate table cache by dropping table".to_string()),
|
||||
};
|
||||
|
||||
self.context
|
||||
.cache_invalidator
|
||||
.invalidate_table(
|
||||
&Context {
|
||||
subject: Some("Invalidate Table Cache by dropping table procedure".to_string()),
|
||||
},
|
||||
table_ident,
|
||||
)
|
||||
let cache_invalidator = &self.context.cache_invalidator;
|
||||
cache_invalidator
|
||||
.invalidate_table_id(&ctx, self.data.table_id())
|
||||
.await?;
|
||||
|
||||
cache_invalidator
|
||||
.invalidate_table_name(&ctx, self.data.table_ref().into())
|
||||
.await?;
|
||||
|
||||
self.data.state = DropTableState::DatanodeDropRegions;
|
||||
@@ -261,10 +251,6 @@ impl DropTableData {
|
||||
self.task.table_ref()
|
||||
}
|
||||
|
||||
fn table_name(&self) -> TableName {
|
||||
self.task.table_name()
|
||||
}
|
||||
|
||||
fn region_routes(&self) -> &Vec<RegionRoute> {
|
||||
&self.table_route_value.region_routes
|
||||
}
|
||||
|
||||
@@ -1,78 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::{Display, Formatter};
|
||||
|
||||
use api::v1::meta::{TableIdent as RawTableIdent, TableName};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::OptionExt;
|
||||
use table::engine::TableReference;
|
||||
|
||||
use crate::error::{Error, InvalidProtoMsgSnafu};
|
||||
|
||||
#[derive(Eq, Hash, PartialEq, Clone, Debug, Default, Serialize, Deserialize)]
|
||||
pub struct TableIdent {
|
||||
pub catalog: String,
|
||||
pub schema: String,
|
||||
pub table: String,
|
||||
pub table_id: u32,
|
||||
pub engine: String,
|
||||
}
|
||||
|
||||
impl TableIdent {
|
||||
pub fn table_ref(&self) -> TableReference {
|
||||
TableReference::full(&self.catalog, &self.schema, &self.table)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for TableIdent {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Table(id={}, name='{}.{}.{}', engine='{}')",
|
||||
self.table_id, self.catalog, self.schema, self.table, self.engine,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<RawTableIdent> for TableIdent {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: RawTableIdent) -> Result<Self, Self::Error> {
|
||||
let table_name = value.table_name.context(InvalidProtoMsgSnafu {
|
||||
err_msg: "'table_name' is missing in TableIdent",
|
||||
})?;
|
||||
Ok(Self {
|
||||
catalog: table_name.catalog_name,
|
||||
schema: table_name.schema_name,
|
||||
table: table_name.table_name,
|
||||
table_id: value.table_id,
|
||||
engine: value.engine,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TableIdent> for RawTableIdent {
|
||||
fn from(table_ident: TableIdent) -> Self {
|
||||
Self {
|
||||
table_id: table_ident.table_id,
|
||||
engine: table_ident.engine,
|
||||
table_name: Some(TableName {
|
||||
catalog_name: table_ident.catalog,
|
||||
schema_name: table_ident.schema,
|
||||
table_name: table_ident.table,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -15,22 +15,25 @@
|
||||
use std::fmt::{Display, Formatter};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
use strum::Display;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::ident::TableIdent;
|
||||
use crate::table_name::TableName;
|
||||
use crate::{ClusterId, DatanodeId};
|
||||
|
||||
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct RegionIdent {
|
||||
pub cluster_id: ClusterId,
|
||||
pub datanode_id: DatanodeId,
|
||||
pub table_ident: TableIdent,
|
||||
pub region_number: u32,
|
||||
pub table_id: TableId,
|
||||
pub region_number: RegionNumber,
|
||||
pub engine: String,
|
||||
}
|
||||
|
||||
impl RegionIdent {
|
||||
pub fn get_region_id(&self) -> RegionId {
|
||||
RegionId::new(self.table_ident.table_id, self.region_number)
|
||||
RegionId::new(self.table_id, self.region_number)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,25 +41,12 @@ impl Display for RegionIdent {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"RegionIdent(datanode_id='{}.{}', table_id='{}', table_name='{}.{}.{}', table_engine='{}', region_no='{}')",
|
||||
self.cluster_id,
|
||||
self.datanode_id,
|
||||
self.table_ident.table_id,
|
||||
self.table_ident.catalog,
|
||||
self.table_ident.schema,
|
||||
self.table_ident.table,
|
||||
self.table_ident.engine,
|
||||
self.region_number
|
||||
"RegionIdent(datanode_id='{}.{}', table_id={}, region_number={}, engine = {})",
|
||||
self.cluster_id, self.datanode_id, self.table_id, self.region_number, self.engine
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RegionIdent> for TableIdent {
|
||||
fn from(region_ident: RegionIdent) -> Self {
|
||||
region_ident.table_ident
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
|
||||
pub struct SimpleReply {
|
||||
pub result: bool,
|
||||
@@ -69,22 +59,12 @@ impl Display for SimpleReply {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Display)]
|
||||
pub enum Instruction {
|
||||
OpenRegion(RegionIdent),
|
||||
CloseRegion(RegionIdent),
|
||||
InvalidateTableCache(TableIdent),
|
||||
}
|
||||
|
||||
impl Display for Instruction {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::OpenRegion(region) => write!(f, "Instruction::OpenRegion({})", region),
|
||||
Self::CloseRegion(region) => write!(f, "Instruction::CloseRegion({})", region),
|
||||
Self::InvalidateTableCache(table) => write!(f, "Instruction::Invalidate({})", table),
|
||||
}
|
||||
}
|
||||
InvalidateTableIdCache(TableId),
|
||||
InvalidateTableNameCache(TableName),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
|
||||
@@ -116,40 +96,30 @@ mod tests {
|
||||
let open_region = Instruction::OpenRegion(RegionIdent {
|
||||
cluster_id: 1,
|
||||
datanode_id: 2,
|
||||
table_ident: TableIdent {
|
||||
catalog: "foo".to_string(),
|
||||
schema: "bar".to_string(),
|
||||
table: "hi".to_string(),
|
||||
table_id: 1024,
|
||||
engine: "mito".to_string(),
|
||||
},
|
||||
table_id: 1024,
|
||||
region_number: 1,
|
||||
engine: "mito2".to_string(),
|
||||
});
|
||||
|
||||
let serialized = serde_json::to_string(&open_region).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
r#"{"type":"open_region","cluster_id":1,"datanode_id":2,"table_ident":{"catalog":"foo","schema":"bar","table":"hi","table_id":1024,"engine":"mito"},"region_number":1}"#,
|
||||
r#"{"OpenRegion":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
|
||||
serialized
|
||||
);
|
||||
|
||||
let close_region = Instruction::CloseRegion(RegionIdent {
|
||||
cluster_id: 1,
|
||||
datanode_id: 2,
|
||||
table_ident: TableIdent {
|
||||
catalog: "foo".to_string(),
|
||||
schema: "bar".to_string(),
|
||||
table: "hi".to_string(),
|
||||
table_id: 1024,
|
||||
engine: "mito".to_string(),
|
||||
},
|
||||
table_id: 1024,
|
||||
region_number: 1,
|
||||
engine: "mito2".to_string(),
|
||||
});
|
||||
|
||||
let serialized = serde_json::to_string(&close_region).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
r#"{"type":"close_region","cluster_id":1,"datanode_id":2,"table_ident":{"catalog":"foo","schema":"bar","table":"hi","table_id":1024,"engine":"mito"},"region_number":1}"#,
|
||||
r#"{"CloseRegion":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
|
||||
serialized
|
||||
);
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ pub mod ddl;
|
||||
pub mod ddl_manager;
|
||||
pub mod error;
|
||||
pub mod heartbeat;
|
||||
pub mod ident;
|
||||
pub mod instruction;
|
||||
pub mod key;
|
||||
pub mod kv_backend;
|
||||
|
||||
@@ -76,3 +76,9 @@ impl From<PbTableName> for TableName {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TableReference<'_>> for TableName {
|
||||
fn from(table_ref: TableReference) -> Self {
|
||||
Self::new(table_ref.catalog, table_ref.schema, table_ref.table)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ use servers::define_into_tonic_status;
|
||||
use snafu::{Location, Snafu};
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
use table::error::Error as TableError;
|
||||
use table::metadata::TableId;
|
||||
|
||||
/// Business error of datanode.
|
||||
#[derive(Debug, Snafu)]
|
||||
@@ -54,9 +55,9 @@ pub enum Error {
|
||||
source: TableError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to get table: {}, source: {}", table_name, source))]
|
||||
#[snafu(display("Failed to get table {table_id}, source: {source}, at {location}"))]
|
||||
GetTable {
|
||||
table_name: String,
|
||||
table_id: TableId,
|
||||
location: Location,
|
||||
source: TableError,
|
||||
},
|
||||
|
||||
@@ -46,7 +46,7 @@ impl RegionHeartbeatResponseHandler {
|
||||
Instruction::OpenRegion(region_ident) => {
|
||||
let region_id = Self::region_ident_to_region_id(®ion_ident);
|
||||
let open_region_req = RegionRequest::Open(RegionOpenRequest {
|
||||
engine: region_ident.table_ident.engine,
|
||||
engine: region_ident.engine,
|
||||
region_dir: "".to_string(),
|
||||
options: HashMap::new(),
|
||||
});
|
||||
@@ -57,15 +57,14 @@ impl RegionHeartbeatResponseHandler {
|
||||
let close_region_req = RegionRequest::Close(RegionCloseRequest {});
|
||||
Ok((region_id, close_region_req))
|
||||
}
|
||||
Instruction::InvalidateTableCache(_) => InvalidHeartbeatResponseSnafu.fail(),
|
||||
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
|
||||
InvalidHeartbeatResponseSnafu.fail()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
|
||||
RegionId::new(
|
||||
region_ident.table_ident.table_id,
|
||||
region_ident.region_number,
|
||||
)
|
||||
RegionId::new(region_ident.table_id, region_ident.region_number)
|
||||
}
|
||||
|
||||
fn reply_template_from_instruction(instruction: &Instruction) -> InstructionReply {
|
||||
@@ -78,7 +77,7 @@ impl RegionHeartbeatResponseHandler {
|
||||
result: false,
|
||||
error: None,
|
||||
}),
|
||||
Instruction::InvalidateTableCache(_) => {
|
||||
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
|
||||
InstructionReply::InvalidateTableCache(SimpleReply {
|
||||
result: false,
|
||||
error: None,
|
||||
|
||||
@@ -254,7 +254,11 @@ impl RegionServerInner {
|
||||
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
|
||||
// TODO(ruihang): add metrics and set trace id
|
||||
|
||||
let QueryRequest { region_id, plan } = request;
|
||||
let QueryRequest {
|
||||
header: _,
|
||||
region_id,
|
||||
plan,
|
||||
} = request;
|
||||
let region_id = RegionId::from_u64(region_id);
|
||||
|
||||
// build dummy catalog list
|
||||
|
||||
@@ -23,7 +23,6 @@ use common_meta::heartbeat::handler::{
|
||||
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
|
||||
};
|
||||
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::instruction::{Instruction, InstructionReply, RegionIdent};
|
||||
use common_query::prelude::ScalarUdf;
|
||||
use common_query::Output;
|
||||
@@ -81,31 +80,21 @@ async fn handle_instruction(
|
||||
|
||||
fn close_region_instruction() -> Instruction {
|
||||
Instruction::CloseRegion(RegionIdent {
|
||||
table_ident: TableIdent {
|
||||
catalog: "greptime".to_string(),
|
||||
schema: "public".to_string(),
|
||||
table: "demo".to_string(),
|
||||
table_id: 1024,
|
||||
engine: "mito".to_string(),
|
||||
},
|
||||
table_id: 1024,
|
||||
region_number: 0,
|
||||
cluster_id: 1,
|
||||
datanode_id: 2,
|
||||
engine: "mito2".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
fn open_region_instruction() -> Instruction {
|
||||
Instruction::OpenRegion(RegionIdent {
|
||||
table_ident: TableIdent {
|
||||
catalog: "greptime".to_string(),
|
||||
schema: "public".to_string(),
|
||||
table: "demo".to_string(),
|
||||
table_id: 1024,
|
||||
engine: "mito".to_string(),
|
||||
},
|
||||
table_id: 1024,
|
||||
region_number: 0,
|
||||
cluster_id: 1,
|
||||
datanode_id: 2,
|
||||
engine: "mito2".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -28,7 +28,6 @@ use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::{CacheInvalidator, Context};
|
||||
use common_meta::datanode_manager::DatanodeManagerRef;
|
||||
use common_meta::error::Result as MetaResult;
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::key::catalog_name::CatalogNameKey;
|
||||
use common_meta::key::schema_name::SchemaNameKey;
|
||||
use common_meta::key::table_info::TableInfoKey;
|
||||
@@ -36,10 +35,12 @@ use common_meta::key::table_name::TableNameKey;
|
||||
use common_meta::key::table_route::TableRouteKey;
|
||||
use common_meta::key::{TableMetaKey, TableMetadataManager, TableMetadataManagerRef};
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use common_meta::table_name::TableName;
|
||||
use common_telemetry::debug;
|
||||
use futures_util::TryStreamExt;
|
||||
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
|
||||
use snafu::prelude::*;
|
||||
use table::metadata::TableId;
|
||||
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
|
||||
use table::TableRef;
|
||||
|
||||
@@ -65,13 +66,8 @@ pub struct FrontendCatalogManager {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl CacheInvalidator for FrontendCatalogManager {
|
||||
async fn invalidate_table(&self, _ctx: &Context, table_ident: TableIdent) -> MetaResult<()> {
|
||||
let table_id = table_ident.table_id;
|
||||
let key = TableNameKey::new(
|
||||
&table_ident.catalog,
|
||||
&table_ident.schema,
|
||||
&table_ident.table,
|
||||
);
|
||||
async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> MetaResult<()> {
|
||||
let key: TableNameKey = (&table_name).into();
|
||||
self.backend_cache_invalidator
|
||||
.invalidate_key(&key.as_raw_key())
|
||||
.await;
|
||||
@@ -80,6 +76,10 @@ impl CacheInvalidator for FrontendCatalogManager {
|
||||
String::from_utf8_lossy(&key.as_raw_key())
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> MetaResult<()> {
|
||||
let key = TableInfoKey::new(table_id);
|
||||
self.backend_cache_invalidator
|
||||
.invalidate_key(&key.as_raw_key())
|
||||
|
||||
@@ -119,7 +119,6 @@ pub(crate) async fn create_external_expr(
|
||||
create_if_not_exists: create.if_not_exists,
|
||||
table_options: options,
|
||||
table_id: None,
|
||||
region_numbers: vec![],
|
||||
engine: create.engine.to_string(),
|
||||
};
|
||||
Ok(expr)
|
||||
@@ -151,7 +150,6 @@ pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Resul
|
||||
create_if_not_exists: create.if_not_exists,
|
||||
table_options,
|
||||
table_id: None,
|
||||
region_numbers: vec![],
|
||||
engine: create.engine.to_string(),
|
||||
};
|
||||
Ok(expr)
|
||||
|
||||
@@ -18,13 +18,15 @@ use common_meta::error::Result as MetaResult;
|
||||
use common_meta::heartbeat::handler::{
|
||||
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
|
||||
};
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
|
||||
use common_meta::key::table_info::TableInfoKey;
|
||||
use common_meta::key::table_name::TableNameKey;
|
||||
use common_meta::key::table_route::TableRouteKey;
|
||||
use common_meta::key::TableMetaKey;
|
||||
use common_meta::table_name::TableName;
|
||||
use common_telemetry::error;
|
||||
use futures::future::Either;
|
||||
use table::metadata::TableId;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct InvalidateTableCacheHandler {
|
||||
@@ -36,22 +38,31 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
|
||||
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
|
||||
matches!(
|
||||
ctx.incoming_message.as_ref(),
|
||||
Some((_, Instruction::InvalidateTableCache { .. }))
|
||||
Some((_, Instruction::InvalidateTableIdCache { .. }))
|
||||
| Some((_, Instruction::InvalidateTableNameCache { .. }))
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
|
||||
// TODO(weny): considers introducing a macro
|
||||
let Some((meta, Instruction::InvalidateTableCache(table_ident))) =
|
||||
ctx.incoming_message.take()
|
||||
else {
|
||||
unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'");
|
||||
};
|
||||
|
||||
let mailbox = ctx.mailbox.clone();
|
||||
let self_ref = self.clone();
|
||||
|
||||
let (meta, invalidator) = match ctx.incoming_message.take() {
|
||||
Some((meta, Instruction::InvalidateTableIdCache(table_id))) => (
|
||||
meta,
|
||||
Either::Left(async move { self_ref.invalidate_table_id_cache(table_id).await }),
|
||||
),
|
||||
Some((meta, Instruction::InvalidateTableNameCache(table_name))) => (
|
||||
meta,
|
||||
Either::Right(
|
||||
async move { self_ref.invalidate_table_name_cache(table_name).await },
|
||||
),
|
||||
),
|
||||
_ => unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"),
|
||||
};
|
||||
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
self_ref.invalidate_table_cache(table_ident).await;
|
||||
invalidator.await;
|
||||
|
||||
if let Err(e) = mailbox
|
||||
.send((
|
||||
@@ -78,25 +89,19 @@ impl InvalidateTableCacheHandler {
|
||||
}
|
||||
}
|
||||
|
||||
async fn invalidate_table_cache(&self, table_ident: TableIdent) {
|
||||
let table_id = table_ident.table_id;
|
||||
async fn invalidate_table_id_cache(&self, table_id: TableId) {
|
||||
self.backend_cache_invalidator
|
||||
.invalidate_key(&TableInfoKey::new(table_id).as_raw_key())
|
||||
.await;
|
||||
|
||||
self.backend_cache_invalidator
|
||||
.invalidate_key(
|
||||
&TableNameKey::new(
|
||||
&table_ident.catalog,
|
||||
&table_ident.schema,
|
||||
&table_ident.table,
|
||||
)
|
||||
.as_raw_key(),
|
||||
)
|
||||
.await;
|
||||
|
||||
self.backend_cache_invalidator
|
||||
.invalidate_key(&TableRouteKey { table_id }.as_raw_key())
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn invalidate_table_name_cache(&self, table_name: TableName) {
|
||||
self.backend_cache_invalidator
|
||||
.invalidate_key(&TableNameKey::from(&table_name).as_raw_key())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@ use common_meta::heartbeat::handler::{
|
||||
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
|
||||
};
|
||||
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
|
||||
use common_meta::key::table_info::TableInfoKey;
|
||||
use common_meta::key::TableMetaKey;
|
||||
@@ -75,13 +74,7 @@ async fn test_invalidate_table_cache_handler() {
|
||||
handle_instruction(
|
||||
executor.clone(),
|
||||
mailbox.clone(),
|
||||
Instruction::InvalidateTableCache(TableIdent {
|
||||
catalog: "test".to_string(),
|
||||
schema: "greptime".to_string(),
|
||||
table: "foo_table".to_string(),
|
||||
table_id,
|
||||
engine: "mito".to_string(),
|
||||
}),
|
||||
Instruction::InvalidateTableIdCache(table_id),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -97,18 +90,7 @@ async fn test_invalidate_table_cache_handler() {
|
||||
.contains_key(&table_info_key.as_raw_key()));
|
||||
|
||||
// removes a invalid key
|
||||
handle_instruction(
|
||||
executor,
|
||||
mailbox,
|
||||
Instruction::InvalidateTableCache(TableIdent {
|
||||
catalog: "test".to_string(),
|
||||
schema: "greptime".to_string(),
|
||||
table: "not_found".to_string(),
|
||||
table_id: 0,
|
||||
engine: "mito".to_string(),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
handle_instruction(executor, mailbox, Instruction::InvalidateTableIdCache(0)).await;
|
||||
|
||||
let (_, reply) = rx.recv().await.unwrap();
|
||||
assert_matches!(
|
||||
|
||||
@@ -114,18 +114,26 @@ fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> {
|
||||
(String, StringValue, string_values),
|
||||
(Date, DateValue, date_values),
|
||||
(Datetime, DatetimeValue, datetime_values),
|
||||
(TimestampSecond, TsSecondValue, ts_second_values),
|
||||
(
|
||||
TimestampSecond,
|
||||
TimestampSecondValue,
|
||||
timestamp_second_values
|
||||
),
|
||||
(
|
||||
TimestampMillisecond,
|
||||
TsMillisecondValue,
|
||||
ts_millisecond_values
|
||||
TimestampMillisecondValue,
|
||||
timestamp_millisecond_values
|
||||
),
|
||||
(
|
||||
TimestampMicrosecond,
|
||||
TsMicrosecondValue,
|
||||
ts_microsecond_values
|
||||
TimestampMicrosecondValue,
|
||||
timestamp_microsecond_values
|
||||
),
|
||||
(
|
||||
TimestampNanosecond,
|
||||
TimestampNanosecondValue,
|
||||
timestamp_nanosecond_values
|
||||
),
|
||||
(TimestampNanosecond, TsNanosecondValue, ts_nanosecond_values),
|
||||
(TimeSecond, TimeSecondValue, time_second_values),
|
||||
(
|
||||
TimeMillisecond,
|
||||
|
||||
@@ -35,6 +35,5 @@ fn request_column_to_row(request: DeleteRequest) -> Result<RowDeleteRequest> {
|
||||
Ok(RowDeleteRequest {
|
||||
table_name: request.table_name,
|
||||
rows: Some(rows),
|
||||
region_number: 0, // FIXME(zhongzc): deprecated field
|
||||
})
|
||||
}
|
||||
|
||||
@@ -35,6 +35,5 @@ fn request_column_to_row(request: InsertRequest) -> Result<RowInsertRequest> {
|
||||
Ok(RowInsertRequest {
|
||||
table_name: request.table_name,
|
||||
rows: Some(rows),
|
||||
region_number: 0, // FIXME(zhongzc): deprecated field
|
||||
})
|
||||
}
|
||||
|
||||
@@ -190,7 +190,6 @@ mod python {
|
||||
create_if_not_exists: request.create_if_not_exists,
|
||||
table_options: (&request.table_options).into(),
|
||||
table_id: None, // Should and will be assigned by Meta.
|
||||
region_numbers: vec![0],
|
||||
engine: request.engine,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_catalog::format_full_table_name;
|
||||
use common_meta::cache_invalidator::Context;
|
||||
use common_meta::ddl::ExecutorContext;
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
|
||||
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
|
||||
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
|
||||
@@ -114,7 +113,6 @@ impl StatementExecutor {
|
||||
info!("Successfully created table '{table_name}' with table id {table_id}");
|
||||
|
||||
table_info.ident.table_id = table_id;
|
||||
let engine = table_info.meta.engine.to_string();
|
||||
|
||||
let table_info = Arc::new(table_info.try_into().context(error::CreateTableInfoSnafu)?);
|
||||
create_table.table_id = Some(api::v1::TableId { id: table_id });
|
||||
@@ -123,16 +121,7 @@ impl StatementExecutor {
|
||||
|
||||
// Invalidates local cache ASAP.
|
||||
self.cache_invalidator
|
||||
.invalidate_table(
|
||||
&Context::default(),
|
||||
TableIdent {
|
||||
catalog: table_name.catalog_name.to_string(),
|
||||
schema: table_name.schema_name.to_string(),
|
||||
table: table_name.table_name.to_string(),
|
||||
table_id,
|
||||
engine,
|
||||
},
|
||||
)
|
||||
.invalidate_table_id(&Context::default(), table_id)
|
||||
.await
|
||||
.context(error::InvalidateTableCacheSnafu)?;
|
||||
|
||||
@@ -153,21 +142,11 @@ impl StatementExecutor {
|
||||
table_name: table_name.to_string(),
|
||||
})?;
|
||||
let table_id = table.table_info().table_id();
|
||||
let engine = table.table_info().meta.engine.to_string();
|
||||
self.drop_table_procedure(&table_name, table_id).await?;
|
||||
|
||||
// Invalidates local cache ASAP.
|
||||
self.cache_invalidator
|
||||
.invalidate_table(
|
||||
&Context::default(),
|
||||
TableIdent {
|
||||
catalog: table_name.catalog_name.to_string(),
|
||||
schema: table_name.schema_name.to_string(),
|
||||
table: table_name.table_name.to_string(),
|
||||
table_id,
|
||||
engine,
|
||||
},
|
||||
)
|
||||
.invalidate_table_id(&Context::default(), table_id)
|
||||
.await
|
||||
.context(error::InvalidateTableCacheSnafu)?;
|
||||
|
||||
@@ -256,7 +235,6 @@ impl StatementExecutor {
|
||||
})?;
|
||||
|
||||
let table_id = table.table_info().ident.table_id;
|
||||
let engine = table.table_info().meta.engine.to_string();
|
||||
self.verify_alter(table_id, table.table_info(), expr.clone())?;
|
||||
|
||||
info!(
|
||||
@@ -276,16 +254,7 @@ impl StatementExecutor {
|
||||
|
||||
// Invalidates local cache ASAP.
|
||||
self.cache_invalidator
|
||||
.invalidate_table(
|
||||
&Context::default(),
|
||||
TableIdent {
|
||||
catalog: catalog_name.to_string(),
|
||||
schema: schema_name.to_string(),
|
||||
table: table_name.to_string(),
|
||||
table_id,
|
||||
engine,
|
||||
},
|
||||
)
|
||||
.invalidate_table_id(&Context::default(), table_id)
|
||||
.await
|
||||
.context(error::InvalidateTableCacheSnafu)?;
|
||||
|
||||
|
||||
@@ -13,12 +13,14 @@
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::{CacheInvalidator, Context};
|
||||
use common_meta::error::{self as meta_error, Result as MetaResult};
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::instruction::Instruction;
|
||||
use common_meta::table_name::TableName;
|
||||
use snafu::ResultExt;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::metasrv::MetasrvInfo;
|
||||
use crate::service::mailbox::{BroadcastChannel, MailboxRef};
|
||||
@@ -37,10 +39,8 @@ impl MetasrvCacheInvalidator {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl CacheInvalidator for MetasrvCacheInvalidator {
|
||||
async fn invalidate_table(&self, ctx: &Context, table_ident: TableIdent) -> MetaResult<()> {
|
||||
let instruction = Instruction::InvalidateTableCache(table_ident);
|
||||
impl MetasrvCacheInvalidator {
|
||||
async fn broadcast(&self, ctx: &Context, instruction: Instruction) -> MetaResult<()> {
|
||||
let subject = &ctx
|
||||
.subject
|
||||
.clone()
|
||||
@@ -62,3 +62,16 @@ impl CacheInvalidator for MetasrvCacheInvalidator {
|
||||
.context(meta_error::ExternalSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl CacheInvalidator for MetasrvCacheInvalidator {
|
||||
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
|
||||
let instruction = Instruction::InvalidateTableIdCache(table_id);
|
||||
self.broadcast(ctx, instruction).await
|
||||
}
|
||||
|
||||
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
|
||||
let instruction = Instruction::InvalidateTableNameCache(table_name);
|
||||
self.broadcast(ctx, instruction).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ use common_meta::peer::Peer;
|
||||
use common_runtime::JoinError;
|
||||
use servers::define_into_tonic_status;
|
||||
use snafu::{Location, Snafu};
|
||||
use table::metadata::TableId;
|
||||
use tokio::sync::mpsc::error::SendError;
|
||||
use tonic::codegen::http;
|
||||
|
||||
@@ -246,9 +247,9 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Table route not found: {}", table_name))]
|
||||
#[snafu(display("Failed to find table route for {table_id}, at {location}"))]
|
||||
TableRouteNotFound {
|
||||
table_name: String,
|
||||
table_id: TableId,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
|
||||
@@ -18,8 +18,7 @@ use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, Role};
|
||||
use async_trait::async_trait;
|
||||
use common_catalog::consts::MITO_ENGINE;
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_catalog::consts::default_engine;
|
||||
use common_meta::RegionIdent;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
@@ -86,13 +85,10 @@ impl HeartbeatHandler for RegionFailureHandler {
|
||||
RegionIdent {
|
||||
cluster_id: stat.cluster_id,
|
||||
datanode_id: stat.id,
|
||||
table_ident: TableIdent {
|
||||
table_id: region_id.table_id(),
|
||||
// TODO(#1583): Use the actual table engine.
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
table_id: region_id.table_id(),
|
||||
region_number: region_id.region_number(),
|
||||
// TODO(LFC): Use the actual table engine (maybe retrieve from heartbeat).
|
||||
engine: default_engine().to_string(),
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
|
||||
@@ -247,8 +247,6 @@ impl FailureDetectorContainer {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_catalog::consts::MITO_ENGINE;
|
||||
use common_meta::ident::TableIdent;
|
||||
use rand::Rng;
|
||||
|
||||
use super::*;
|
||||
@@ -258,16 +256,11 @@ mod tests {
|
||||
fn test_default_failure_detector_container() {
|
||||
let container = FailureDetectorContainer(DashMap::new());
|
||||
let ident = RegionIdent {
|
||||
table_ident: TableIdent {
|
||||
catalog: "a".to_string(),
|
||||
schema: "b".to_string(),
|
||||
table: "c".to_string(),
|
||||
table_id: 1,
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
},
|
||||
table_id: 1,
|
||||
cluster_id: 3,
|
||||
datanode_id: 2,
|
||||
region_number: 1,
|
||||
engine: "mito2".to_string(),
|
||||
};
|
||||
let _ = container.get_failure_detector(ident.clone());
|
||||
assert!(container.0.contains_key(&ident));
|
||||
@@ -287,16 +280,11 @@ mod tests {
|
||||
let container = FailureDetectorContainer(DashMap::new());
|
||||
|
||||
let ident = RegionIdent {
|
||||
table_ident: TableIdent {
|
||||
catalog: "a".to_string(),
|
||||
schema: "b".to_string(),
|
||||
table: "c".to_string(),
|
||||
table_id: 1,
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
},
|
||||
table_id: 1,
|
||||
cluster_id: 3,
|
||||
datanode_id: 2,
|
||||
region_number: 1,
|
||||
engine: "mito2".to_string(),
|
||||
};
|
||||
let _ = container.get_failure_detector(ident.clone());
|
||||
|
||||
@@ -328,16 +316,11 @@ mod tests {
|
||||
region_idents: region_ids
|
||||
.iter()
|
||||
.map(|®ion_number| RegionIdent {
|
||||
table_ident: TableIdent {
|
||||
catalog: "a".to_string(),
|
||||
schema: "b".to_string(),
|
||||
table: "c".to_string(),
|
||||
table_id: 0,
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
},
|
||||
table_id: 0,
|
||||
cluster_id: 1,
|
||||
datanode_id,
|
||||
region_number,
|
||||
engine: "mito2".to_string(),
|
||||
})
|
||||
.collect(),
|
||||
heartbeat_time: start + i * 1000 + rng.gen_range(0..100),
|
||||
|
||||
@@ -70,7 +70,6 @@ impl HeartbeatHandler for RegionLeaseHandler {
|
||||
mod test {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_meta::RegionIdent;
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
@@ -131,11 +130,9 @@ mod test {
|
||||
.register_inactive_region(&RegionIdent {
|
||||
cluster_id: 1,
|
||||
datanode_id: 1,
|
||||
table_ident: TableIdent {
|
||||
table_id: 1,
|
||||
..Default::default()
|
||||
},
|
||||
table_id: 1,
|
||||
region_number: 1,
|
||||
engine: "mito2".to_string(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -143,11 +140,9 @@ mod test {
|
||||
.register_inactive_region(&RegionIdent {
|
||||
cluster_id: 1,
|
||||
datanode_id: 1,
|
||||
table_ident: TableIdent {
|
||||
table_id: 1,
|
||||
..Default::default()
|
||||
},
|
||||
table_id: 1,
|
||||
region_number: 3,
|
||||
engine: "mito2".to_string(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -21,12 +21,8 @@ use crate::lock::Key;
|
||||
|
||||
pub(crate) fn table_metadata_lock_key(region: &RegionIdent) -> Key {
|
||||
format!(
|
||||
"table_metadata_lock_({}-{}.{}.{}-{})",
|
||||
region.cluster_id,
|
||||
region.table_ident.catalog,
|
||||
region.table_ident.schema,
|
||||
region.table_ident.table,
|
||||
region.table_ident.table_id,
|
||||
"table_metadata_lock_({}-{})",
|
||||
region.cluster_id, region.table_id,
|
||||
)
|
||||
.into_bytes()
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ use common_telemetry::{debug, error, info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::http::HttpOptions;
|
||||
use snafu::ResultExt;
|
||||
use table::metadata::TableId;
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
use crate::cluster::MetaPeerClientRef;
|
||||
@@ -170,9 +171,7 @@ pub struct SelectorContext {
|
||||
pub datanode_lease_secs: u64,
|
||||
pub kv_store: KvStoreRef,
|
||||
pub meta_peer_client: MetaPeerClientRef,
|
||||
pub catalog: Option<String>,
|
||||
pub schema: Option<String>,
|
||||
pub table: Option<String>,
|
||||
pub table_id: Option<TableId>,
|
||||
}
|
||||
|
||||
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
|
||||
|
||||
@@ -171,9 +171,7 @@ impl MetaSrvBuilder {
|
||||
server_addr: options.server_addr.clone(),
|
||||
kv_store: kv_store.clone(),
|
||||
meta_peer_client: meta_peer_client.clone(),
|
||||
catalog: None,
|
||||
schema: None,
|
||||
table: None,
|
||||
table_id: None,
|
||||
};
|
||||
let ddl_manager = build_ddl_manager(
|
||||
&options,
|
||||
@@ -195,9 +193,7 @@ impl MetaSrvBuilder {
|
||||
datanode_lease_secs: options.datanode_lease_secs,
|
||||
kv_store: kv_store.clone(),
|
||||
meta_peer_client: meta_peer_client.clone(),
|
||||
catalog: None,
|
||||
schema: None,
|
||||
table: None,
|
||||
table_id: None,
|
||||
};
|
||||
let region_failover_manager = Arc::new(RegionFailoverManager::new(
|
||||
options.region_lease_secs,
|
||||
|
||||
@@ -25,7 +25,6 @@ use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::key::datanode_table::DatanodeTableKey;
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_meta::{ClusterId, RegionIdent};
|
||||
@@ -41,6 +40,7 @@ use failover_start::RegionFailoverStart;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionNumber;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::error::{Error, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
|
||||
use crate::lock::DistLockRef;
|
||||
@@ -54,7 +54,7 @@ const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
#[derive(PartialEq, Eq, Hash, Clone)]
|
||||
pub(crate) struct RegionFailoverKey {
|
||||
pub(crate) cluster_id: ClusterId,
|
||||
pub(crate) table_ident: TableIdent,
|
||||
pub(crate) table_id: TableId,
|
||||
pub(crate) region_number: RegionNumber,
|
||||
}
|
||||
|
||||
@@ -62,7 +62,7 @@ impl From<RegionIdent> for RegionFailoverKey {
|
||||
fn from(region_ident: RegionIdent) -> Self {
|
||||
Self {
|
||||
cluster_id: region_ident.cluster_id,
|
||||
table_ident: region_ident.table_ident,
|
||||
table_id: region_ident.table_id,
|
||||
region_number: region_ident.region_number,
|
||||
}
|
||||
}
|
||||
@@ -206,18 +206,17 @@ impl RegionFailoverManager {
|
||||
}
|
||||
|
||||
async fn table_exists(&self, failed_region: &RegionIdent) -> Result<bool> {
|
||||
let table_ident = &failed_region.table_ident;
|
||||
Ok(self
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get_region_distribution(table_ident.table_id)
|
||||
.get_region_distribution(failed_region.table_id)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?
|
||||
.is_some())
|
||||
}
|
||||
|
||||
async fn failed_region_exists(&self, failed_region: &RegionIdent) -> Result<bool> {
|
||||
let table_id = failed_region.table_ident.table_id;
|
||||
let table_id = failed_region.table_id;
|
||||
let datanode_id = failed_region.datanode_id;
|
||||
|
||||
let value = self
|
||||
@@ -372,13 +371,11 @@ impl Procedure for RegionFailoverProcedure {
|
||||
|
||||
fn lock_key(&self) -> LockKey {
|
||||
let region_ident = &self.node.failed_region;
|
||||
let table_key = common_catalog::format_full_table_name(
|
||||
®ion_ident.table_ident.catalog,
|
||||
®ion_ident.table_ident.schema,
|
||||
®ion_ident.table_ident.table,
|
||||
let region_key = format!(
|
||||
"{}/region-{}",
|
||||
region_ident.table_id, region_ident.region_number
|
||||
);
|
||||
let region_key = format!("{}/region-{}", table_key, region_ident.region_number);
|
||||
LockKey::new(vec![table_key, region_key])
|
||||
LockKey::single(region_key)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -389,8 +386,6 @@ mod tests {
|
||||
|
||||
use api::v1::meta::mailbox_message::Payload;
|
||||
use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_meta::sequence::Sequence;
|
||||
@@ -457,13 +452,8 @@ mod tests {
|
||||
cluster_id: 0,
|
||||
region_number,
|
||||
datanode_id: failed_datanode,
|
||||
table_ident: TableIdent {
|
||||
table_id: 1,
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table: "my_table".to_string(),
|
||||
},
|
||||
table_id: 1,
|
||||
engine: "mito2".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -493,6 +483,7 @@ mod tests {
|
||||
// Safety: all required fields set at initialization
|
||||
.unwrap();
|
||||
|
||||
let table_id = 1;
|
||||
let table = "my_table";
|
||||
let table_metadata_manager = Arc::new(TableMetadataManager::new(
|
||||
KvBackendAdapter::wrap(kv_store.clone()),
|
||||
@@ -543,9 +534,7 @@ mod tests {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
kv_store: kv_store.clone(),
|
||||
meta_peer_client,
|
||||
catalog: Some(DEFAULT_CATALOG_NAME.to_string()),
|
||||
schema: Some(DEFAULT_SCHEMA_NAME.to_string()),
|
||||
table: Some(table.to_string()),
|
||||
table_id: Some(table_id),
|
||||
};
|
||||
|
||||
TestingEnv {
|
||||
@@ -665,7 +654,7 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
procedure.dump().unwrap(),
|
||||
r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_ident":{"catalog":"greptime","schema":"public","table":"my_table","table_id":1,"engine":"mito"},"region_number":1},"state":{"region_failover_state":"RegionFailoverEnd"}}"#
|
||||
r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverEnd"}}"#
|
||||
);
|
||||
|
||||
// Verifies that the failed region (region 1) is moved from failed datanode (datanode 1) to the candidate datanode.
|
||||
@@ -705,12 +694,12 @@ mod tests {
|
||||
let s = procedure.dump().unwrap();
|
||||
assert_eq!(
|
||||
s,
|
||||
r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_ident":{"catalog":"greptime","schema":"public","table":"my_table","table_id":1,"engine":"mito"},"region_number":1},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"#
|
||||
r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"#
|
||||
);
|
||||
let n: Node = serde_json::from_str(&s).unwrap();
|
||||
assert_eq!(
|
||||
format!("{n:?}"),
|
||||
r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_ident: TableIdent { catalog: "greptime", schema: "public", table: "my_table", table_id: 1, engine: "mito" }, region_number: 1 }, state: RegionFailoverStart { failover_candidate: None } }"#
|
||||
r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1, engine: "mito2" }, state: RegionFailoverStart { failover_candidate: None } }"#
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::RegionIdent;
|
||||
use common_telemetry::info;
|
||||
@@ -48,15 +47,7 @@ impl RegionFailoverStart {
|
||||
}
|
||||
|
||||
let mut selector_ctx = ctx.selector_ctx.clone();
|
||||
let TableIdent {
|
||||
catalog,
|
||||
schema,
|
||||
table,
|
||||
..
|
||||
} = &failed_region.table_ident;
|
||||
selector_ctx.catalog = Some(catalog.to_string());
|
||||
selector_ctx.schema = Some(schema.to_string());
|
||||
selector_ctx.table = Some(table.to_string());
|
||||
selector_ctx.table_id = Some(failed_region.table_id);
|
||||
|
||||
let cluster_id = failed_region.cluster_id;
|
||||
let candidates = ctx
|
||||
|
||||
@@ -14,12 +14,12 @@
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use async_trait::async_trait;
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::instruction::Instruction;
|
||||
use common_meta::RegionIdent;
|
||||
use common_telemetry::info;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use super::failover_end::RegionFailoverEnd;
|
||||
use super::{RegionFailoverContext, State};
|
||||
@@ -33,9 +33,9 @@ impl InvalidateCache {
|
||||
async fn broadcast_invalidate_table_cache_messages(
|
||||
&self,
|
||||
ctx: &RegionFailoverContext,
|
||||
table_ident: &TableIdent,
|
||||
table_id: TableId,
|
||||
) -> Result<()> {
|
||||
let instruction = Instruction::InvalidateTableCache(table_ident.clone());
|
||||
let instruction = Instruction::InvalidateTableIdCache(table_id);
|
||||
|
||||
let msg = &MailboxMessage::json_message(
|
||||
"Invalidate Table Cache",
|
||||
@@ -62,12 +62,12 @@ impl State for InvalidateCache {
|
||||
ctx: &RegionFailoverContext,
|
||||
failed_region: &RegionIdent,
|
||||
) -> Result<Box<dyn State>> {
|
||||
let table_ident = TableIdent::from(failed_region.clone());
|
||||
let table_id = failed_region.table_id;
|
||||
info!(
|
||||
"Broadcast invalidate table({}) cache message to frontend",
|
||||
table_ident
|
||||
table_id
|
||||
);
|
||||
self.broadcast_invalidate_table_cache_messages(ctx, &table_ident)
|
||||
self.broadcast_invalidate_table_cache_messages(ctx, table_id)
|
||||
.await?;
|
||||
|
||||
Ok(Box::new(RegionFailoverEnd))
|
||||
@@ -108,7 +108,7 @@ mod tests {
|
||||
let _ = heartbeat_receivers.insert(frontend_id, rx);
|
||||
}
|
||||
|
||||
let table_ident: TableIdent = failed_region.clone().into();
|
||||
let table_id = failed_region.table_id;
|
||||
|
||||
// lexicographical order
|
||||
// frontend-4,5,6,7
|
||||
@@ -132,8 +132,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
received.payload,
|
||||
Some(Payload::Json(
|
||||
serde_json::to_string(&Instruction::InvalidateTableCache(table_ident.clone()))
|
||||
.unwrap(),
|
||||
serde_json::to_string(&Instruction::InvalidateTableIdCache(table_id)).unwrap(),
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use super::invalidate_cache::InvalidateCache;
|
||||
use super::{RegionFailoverContext, State};
|
||||
use crate::error::{self, Result, RetryLaterSnafu};
|
||||
use crate::error::{self, Result, RetryLaterSnafu, TableRouteNotFoundSnafu};
|
||||
use crate::lock::keys::table_metadata_lock_key;
|
||||
use crate::lock::Opts;
|
||||
|
||||
@@ -57,8 +57,8 @@ impl UpdateRegionMetadata {
|
||||
ctx: &RegionFailoverContext,
|
||||
failed_region: &RegionIdent,
|
||||
) -> Result<()> {
|
||||
let table_id = failed_region.table_ident.table_id;
|
||||
let engine = failed_region.table_ident.engine.as_str();
|
||||
let table_id = failed_region.table_id;
|
||||
let engine = &failed_region.engine;
|
||||
|
||||
let table_route_value = ctx
|
||||
.table_metadata_manager
|
||||
@@ -66,9 +66,7 @@ impl UpdateRegionMetadata {
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)?
|
||||
.with_context(|| error::TableRouteNotFoundSnafu {
|
||||
table_name: failed_region.table_ident.table_ref().to_string(),
|
||||
})?;
|
||||
.context(TableRouteNotFoundSnafu { table_id })?;
|
||||
|
||||
let mut new_region_routes = table_route_value.region_routes.clone();
|
||||
|
||||
@@ -185,7 +183,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let table_id = failed_region.table_ident.table_id;
|
||||
let table_id = failed_region.table_id;
|
||||
|
||||
env.context
|
||||
.table_metadata_manager
|
||||
@@ -316,7 +314,7 @@ mod tests {
|
||||
let failed_region_1 = env.failed_region(1).await;
|
||||
let failed_region_2 = env.failed_region(2).await;
|
||||
|
||||
let table_id = failed_region_1.table_ident.table_id;
|
||||
let table_id = failed_region_1.table_id;
|
||||
|
||||
let _ = futures::future::join_all(vec![
|
||||
tokio::spawn(async move {
|
||||
|
||||
@@ -80,7 +80,6 @@ fn create_table_task() -> CreateTableTask {
|
||||
create_if_not_exists: false,
|
||||
table_options: HashMap::new(),
|
||||
table_id: None,
|
||||
region_numbers: vec![1, 2, 3],
|
||||
engine: MITO2_ENGINE.to_string(),
|
||||
};
|
||||
|
||||
|
||||
@@ -13,11 +13,11 @@
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::meta::Peer;
|
||||
use common_meta::key::table_name::TableNameKey;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_meta::rpc::router::find_leaders;
|
||||
use common_telemetry::warn;
|
||||
use snafu::ResultExt;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::keys::{LeaseKey, LeaseValue, StatKey};
|
||||
@@ -32,32 +32,21 @@ pub struct LoadBasedSelector;
|
||||
|
||||
async fn get_leader_peer_ids(
|
||||
table_metadata_manager: &TableMetadataManager,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
table: &str,
|
||||
table_id: TableId,
|
||||
) -> Result<Vec<u64>> {
|
||||
let table_name = table_metadata_manager
|
||||
.table_name_manager()
|
||||
.get(TableNameKey::new(catalog, schema, table))
|
||||
table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)?;
|
||||
|
||||
Ok(if let Some(table_name) = table_name {
|
||||
table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get(table_name.table_id())
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)?
|
||||
.map(|route| {
|
||||
.context(error::TableMetadataManagerSnafu)
|
||||
.map(|route| {
|
||||
route.map_or_else(Vec::new, |route| {
|
||||
find_leaders(&route.region_routes)
|
||||
.into_iter()
|
||||
.map(|peer| peer.id)
|
||||
.collect()
|
||||
})
|
||||
.unwrap_or_default()
|
||||
} else {
|
||||
Vec::new()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -76,13 +65,11 @@ impl Selector for LoadBasedSelector {
|
||||
let stat_keys: Vec<StatKey> = lease_kvs.keys().map(|k| k.into()).collect();
|
||||
let stat_kvs = ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?;
|
||||
|
||||
let leader_peer_ids = if let (Some(catalog), Some(schema), Some(table)) =
|
||||
(&ctx.catalog, &ctx.schema, &ctx.table)
|
||||
{
|
||||
let leader_peer_ids = if let Some(table_id) = ctx.table_id {
|
||||
let table_metadata_manager =
|
||||
TableMetadataManager::new(KvBackendAdapter::wrap(ctx.kv_store.clone()));
|
||||
|
||||
get_leader_peer_ids(&table_metadata_manager, catalog, schema, table).await?
|
||||
get_leader_peer_ids(&table_metadata_manager, table_id).await?
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
@@ -22,7 +22,7 @@ use tonic::codegen::http;
|
||||
|
||||
use super::HttpHandler;
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
use crate::error::{Result, TableNotFoundSnafu, TableRouteNotFoundSnafu};
|
||||
|
||||
pub struct RouteHandler {
|
||||
pub table_metadata_manager: TableMetadataManagerRef,
|
||||
@@ -53,15 +53,16 @@ impl HttpHandler for RouteHandler {
|
||||
.get(key)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)?
|
||||
.map(|x| x.table_id());
|
||||
.map(|x| x.table_id())
|
||||
.context(TableNotFoundSnafu { name: table_name })?;
|
||||
|
||||
let table_route_value = self
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get(table_id.context(error::TableNotFoundSnafu { name: table_name })?)
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)?
|
||||
.context(error::TableRouteNotFoundSnafu { table_name })?;
|
||||
.context(TableRouteNotFoundSnafu { table_id })?;
|
||||
http::Response::builder()
|
||||
.status(http::StatusCode::OK)
|
||||
.body(serde_json::to_string(&table_route_value).unwrap())
|
||||
|
||||
@@ -19,7 +19,7 @@ use common_meta::rpc::router::{Table, TableRoute};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::error::{self, Result, TableMetadataManagerSnafu};
|
||||
use crate::error::{self, Result, TableMetadataManagerSnafu, TableRouteNotFoundSnafu};
|
||||
use crate::metasrv::Context;
|
||||
|
||||
pub(crate) async fn fetch_table(
|
||||
@@ -32,9 +32,7 @@ pub(crate) async fn fetch_table(
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
|
||||
if let Some(table_info) = table_info {
|
||||
let table_route = table_route.with_context(|| error::TableRouteNotFoundSnafu {
|
||||
table_name: table_info.table_ref().to_string(),
|
||||
})?;
|
||||
let table_route = table_route.context(TableRouteNotFoundSnafu { table_id })?;
|
||||
|
||||
let table = Table {
|
||||
id: table_id as u64,
|
||||
|
||||
@@ -75,9 +75,7 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
kv_store: kv_store.clone(),
|
||||
meta_peer_client,
|
||||
catalog: None,
|
||||
schema: None,
|
||||
table: None,
|
||||
table_id: None,
|
||||
};
|
||||
|
||||
Arc::new(RegionFailoverManager::new(
|
||||
|
||||
@@ -147,7 +147,7 @@ fn build_rows_for_tags(
|
||||
value_data: Some(ValueData::F64Value((value_start + idx) as f64)),
|
||||
},
|
||||
api::v1::Value {
|
||||
value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
|
||||
value_data: Some(ValueData::TimestampMillisecondValue(ts as i64 * 1000)),
|
||||
},
|
||||
api::v1::Value {
|
||||
value_data: Some(ValueData::StringValue(tag1.to_string())),
|
||||
|
||||
@@ -43,7 +43,7 @@ fn build_rows_multi_tags_fields(
|
||||
});
|
||||
}
|
||||
values.push(api::v1::Value {
|
||||
value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
|
||||
value_data: Some(ValueData::TimestampMillisecondValue(ts as i64 * 1000)),
|
||||
});
|
||||
|
||||
api::v1::Row { values }
|
||||
|
||||
@@ -795,7 +795,7 @@ mod tests {
|
||||
value_data: Some(ValueData::I64Value(k1)),
|
||||
},
|
||||
api::v1::Value {
|
||||
value_data: Some(ValueData::TsMillisecondValue(i as i64)),
|
||||
value_data: Some(ValueData::TimestampMillisecondValue(i as i64)),
|
||||
},
|
||||
api::v1::Value {
|
||||
value_data: Some(ValueData::I64Value(i as i64)),
|
||||
|
||||
@@ -307,7 +307,7 @@ pub(crate) fn i64_value(data: i64) -> v1::Value {
|
||||
#[cfg(test)]
|
||||
pub(crate) fn ts_ms_value(data: i64) -> v1::Value {
|
||||
v1::Value {
|
||||
value_data: Some(ValueData::TsMillisecondValue(data)),
|
||||
value_data: Some(ValueData::TimestampMillisecondValue(data)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -467,7 +467,7 @@ pub fn build_rows(start: usize, end: usize) -> Vec<Row> {
|
||||
value_data: Some(ValueData::F64Value(i as f64)),
|
||||
},
|
||||
api::v1::Value {
|
||||
value_data: Some(ValueData::TsMillisecondValue(i as i64 * 1000)),
|
||||
value_data: Some(ValueData::TimestampMillisecondValue(i as i64 * 1000)),
|
||||
},
|
||||
],
|
||||
})
|
||||
@@ -519,7 +519,7 @@ pub fn build_rows_for_key(key: &str, start: usize, end: usize, value_start: usiz
|
||||
value_data: Some(ValueData::F64Value((value_start + idx) as f64)),
|
||||
},
|
||||
api::v1::Value {
|
||||
value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
|
||||
value_data: Some(ValueData::TimestampMillisecondValue(ts as i64 * 1000)),
|
||||
},
|
||||
],
|
||||
})
|
||||
@@ -535,7 +535,7 @@ pub fn build_delete_rows_for_key(key: &str, start: usize, end: usize) -> Vec<Row
|
||||
value_data: Some(ValueData::StringValue(key.to_string())),
|
||||
},
|
||||
api::v1::Value {
|
||||
value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
|
||||
value_data: Some(ValueData::TimestampMillisecondValue(ts as i64 * 1000)),
|
||||
},
|
||||
],
|
||||
})
|
||||
|
||||
@@ -203,7 +203,7 @@ mod tests {
|
||||
value_data: Some(value::ValueData::StringValue(str_col.to_string())),
|
||||
},
|
||||
Value {
|
||||
value_data: Some(value::ValueData::TsMillisecondValue(*int_col)),
|
||||
value_data: Some(value::ValueData::TimestampMillisecondValue(*int_col)),
|
||||
},
|
||||
];
|
||||
Row { values }
|
||||
|
||||
@@ -162,6 +162,7 @@ impl MergeScanExec {
|
||||
|
||||
for region_id in regions {
|
||||
let request = QueryRequest {
|
||||
header: None,
|
||||
region_id: region_id.into(),
|
||||
plan: substrait_plan.clone(),
|
||||
};
|
||||
|
||||
@@ -12,19 +12,13 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{
|
||||
ColumnDataType, InsertRequest as GrpcInsertRequest, InsertRequests, RowInsertRequests,
|
||||
};
|
||||
use common_grpc::writer::{LinesWriter, Precision};
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::Timestamp;
|
||||
use api::v1::{ColumnDataType, RowInsertRequests};
|
||||
use common_grpc::writer::Precision;
|
||||
use influxdb_line_protocol::{parse_lines, FieldValue};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu, TimePrecisionSnafu};
|
||||
use crate::error::{Error, InfluxdbLineProtocolSnafu};
|
||||
use crate::row_writer::{self, MultiTableData};
|
||||
|
||||
pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
|
||||
@@ -36,105 +30,6 @@ pub struct InfluxdbRequest {
|
||||
pub lines: String,
|
||||
}
|
||||
|
||||
type TableName = String;
|
||||
|
||||
impl TryFrom<InfluxdbRequest> for InsertRequests {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: InfluxdbRequest) -> Result<Self, Self::Error> {
|
||||
let mut writers: HashMap<TableName, LinesWriter> = HashMap::new();
|
||||
let lines = parse_lines(&value.lines)
|
||||
.collect::<influxdb_line_protocol::Result<Vec<_>>>()
|
||||
.context(InfluxdbLineProtocolSnafu)?;
|
||||
let line_len = lines.len();
|
||||
|
||||
for line in lines {
|
||||
let table_name = line.series.measurement;
|
||||
let writer = writers
|
||||
.entry(table_name.to_string())
|
||||
.or_insert_with(|| LinesWriter::with_lines(line_len));
|
||||
|
||||
let tags = line.series.tag_set;
|
||||
if let Some(tags) = tags {
|
||||
for (k, v) in tags {
|
||||
writer
|
||||
.write_tag(k.as_str(), v.as_str())
|
||||
.context(InfluxdbLinesWriteSnafu)?;
|
||||
}
|
||||
}
|
||||
|
||||
let fields = line.field_set;
|
||||
for (k, v) in fields {
|
||||
let column_name = k.as_str();
|
||||
match v {
|
||||
FieldValue::I64(value) => {
|
||||
writer
|
||||
.write_i64(column_name, value)
|
||||
.context(InfluxdbLinesWriteSnafu)?;
|
||||
}
|
||||
FieldValue::U64(value) => {
|
||||
writer
|
||||
.write_u64(column_name, value)
|
||||
.context(InfluxdbLinesWriteSnafu)?;
|
||||
}
|
||||
FieldValue::F64(value) => {
|
||||
writer
|
||||
.write_f64(column_name, value)
|
||||
.context(InfluxdbLinesWriteSnafu)?;
|
||||
}
|
||||
FieldValue::String(value) => {
|
||||
writer
|
||||
.write_string(column_name, value.as_str())
|
||||
.context(InfluxdbLinesWriteSnafu)?;
|
||||
}
|
||||
FieldValue::Boolean(value) => {
|
||||
writer
|
||||
.write_bool(column_name, value)
|
||||
.context(InfluxdbLinesWriteSnafu)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(timestamp) = line.timestamp {
|
||||
let precision = unwrap_or_default_precision(value.precision);
|
||||
writer
|
||||
.write_ts(INFLUXDB_TIMESTAMP_COLUMN_NAME, (timestamp, precision))
|
||||
.context(InfluxdbLinesWriteSnafu)?;
|
||||
} else {
|
||||
let precision = unwrap_or_default_precision(value.precision);
|
||||
let timestamp = Timestamp::current_millis();
|
||||
let unit: TimeUnit = precision.try_into().context(InfluxdbLinesWriteSnafu)?;
|
||||
let timestamp = timestamp
|
||||
.convert_to(unit)
|
||||
.with_context(|| TimePrecisionSnafu {
|
||||
name: precision.to_string(),
|
||||
})?;
|
||||
writer
|
||||
.write_ts(
|
||||
INFLUXDB_TIMESTAMP_COLUMN_NAME,
|
||||
(timestamp.into(), precision),
|
||||
)
|
||||
.context(InfluxdbLinesWriteSnafu)?;
|
||||
}
|
||||
writer.commit();
|
||||
}
|
||||
|
||||
let inserts = writers
|
||||
.into_iter()
|
||||
.map(|(table_name, writer)| {
|
||||
let (columns, row_count) = writer.finish();
|
||||
GrpcInsertRequest {
|
||||
table_name,
|
||||
region_number: 0,
|
||||
columns,
|
||||
row_count,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
Ok(InsertRequests { inserts })
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<InfluxdbRequest> for RowInsertRequests {
|
||||
type Error = Error;
|
||||
|
||||
@@ -206,162 +101,12 @@ fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::column::Values;
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{Column, ColumnDataType, Rows, SemanticType};
|
||||
use common_base::BitVec;
|
||||
use api::v1::{ColumnDataType, Rows, SemanticType};
|
||||
|
||||
use super::*;
|
||||
use crate::influxdb::InfluxdbRequest;
|
||||
|
||||
#[test]
|
||||
fn test_convert_influxdb_lines() {
|
||||
let lines = r"
|
||||
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
|
||||
monitor1,host=host2 memory=1027 1663840496400340001
|
||||
monitor2,host=host3 cpu=66.5 1663840496100023102
|
||||
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
|
||||
|
||||
let influxdb_req = InfluxdbRequest {
|
||||
precision: None,
|
||||
lines: lines.to_string(),
|
||||
};
|
||||
|
||||
let requests: InsertRequests = influxdb_req.try_into().unwrap();
|
||||
assert_eq!(2, requests.inserts.len());
|
||||
|
||||
for request in requests.inserts {
|
||||
match &request.table_name[..] {
|
||||
"monitor1" => assert_monitor_1(&request.columns),
|
||||
"monitor2" => assert_monitor_2(&request.columns),
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn assert_monitor_1(columns: &[Column]) {
|
||||
assert_eq!(4, columns.len());
|
||||
verify_column(
|
||||
&columns[0],
|
||||
"host",
|
||||
ColumnDataType::String,
|
||||
SemanticType::Tag,
|
||||
Vec::new(),
|
||||
Values {
|
||||
string_values: vec!["host1".to_string(), "host2".to_string()],
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
|
||||
verify_column(
|
||||
&columns[1],
|
||||
"cpu",
|
||||
ColumnDataType::Float64,
|
||||
SemanticType::Field,
|
||||
vec![false, true],
|
||||
Values {
|
||||
f64_values: vec![66.6],
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
|
||||
verify_column(
|
||||
&columns[2],
|
||||
"memory",
|
||||
ColumnDataType::Float64,
|
||||
SemanticType::Field,
|
||||
Vec::new(),
|
||||
Values {
|
||||
f64_values: vec![1024.0, 1027.0],
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
|
||||
verify_column(
|
||||
&columns[3],
|
||||
"ts",
|
||||
ColumnDataType::TimestampMillisecond,
|
||||
SemanticType::Timestamp,
|
||||
Vec::new(),
|
||||
Values {
|
||||
ts_millisecond_values: vec![1663840496100, 1663840496400],
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
fn assert_monitor_2(columns: &[Column]) {
|
||||
assert_eq!(4, columns.len());
|
||||
verify_column(
|
||||
&columns[0],
|
||||
"host",
|
||||
ColumnDataType::String,
|
||||
SemanticType::Tag,
|
||||
Vec::new(),
|
||||
Values {
|
||||
string_values: vec!["host3".to_string(), "host4".to_string()],
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
|
||||
verify_column(
|
||||
&columns[1],
|
||||
"cpu",
|
||||
ColumnDataType::Float64,
|
||||
SemanticType::Field,
|
||||
Vec::new(),
|
||||
Values {
|
||||
f64_values: vec![66.5, 66.3],
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
|
||||
verify_column(
|
||||
&columns[2],
|
||||
"ts",
|
||||
ColumnDataType::TimestampMillisecond,
|
||||
SemanticType::Timestamp,
|
||||
Vec::new(),
|
||||
Values {
|
||||
ts_millisecond_values: vec![1663840496100, 1663840496400],
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
|
||||
verify_column(
|
||||
&columns[3],
|
||||
"memory",
|
||||
ColumnDataType::Float64,
|
||||
SemanticType::Field,
|
||||
vec![true, false],
|
||||
Values {
|
||||
f64_values: vec![1029.0],
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
fn verify_column(
|
||||
column: &Column,
|
||||
name: &str,
|
||||
datatype: ColumnDataType,
|
||||
semantic_type: SemanticType,
|
||||
null_mask: Vec<bool>,
|
||||
vals: Values,
|
||||
) {
|
||||
assert_eq!(name, column.column_name);
|
||||
assert_eq!(datatype as i32, column.datatype);
|
||||
assert_eq!(semantic_type as i32, column.semantic_type);
|
||||
verify_null_mask(&column.null_mask, null_mask);
|
||||
assert_eq!(Some(vals), column.values);
|
||||
}
|
||||
|
||||
fn verify_null_mask(data: &[u8], expected: Vec<bool>) {
|
||||
let bitvec = BitVec::from_slice(data);
|
||||
for (idx, b) in expected.iter().enumerate() {
|
||||
assert_eq!(b, bitvec.get(idx).unwrap())
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
fn test_convert_influxdb_lines_to_rows() {
|
||||
let lines = r"
|
||||
@@ -553,7 +298,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
|
||||
|
||||
fn extract_ts_millis_value(value: &ValueData) -> i64 {
|
||||
match value {
|
||||
ValueData::TsMillisecondValue(v) => *v,
|
||||
ValueData::TimestampMillisecondValue(v) => *v,
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,7 +129,7 @@ impl DataPoint {
|
||||
let ts_column = Column {
|
||||
column_name: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(),
|
||||
values: Some(column::Values {
|
||||
ts_millisecond_values: vec![self.ts_millis],
|
||||
timestamp_millisecond_values: vec![self.ts_millis],
|
||||
..Default::default()
|
||||
}),
|
||||
semantic_type: SemanticType::Timestamp as i32,
|
||||
@@ -165,7 +165,6 @@ impl DataPoint {
|
||||
|
||||
GrpcInsertRequest {
|
||||
table_name: self.metric.clone(),
|
||||
region_number: 0,
|
||||
columns,
|
||||
row_count: 1,
|
||||
}
|
||||
@@ -268,7 +267,11 @@ mod test {
|
||||
|
||||
assert_eq!(columns[0].column_name, OPENTSDB_TIMESTAMP_COLUMN_NAME);
|
||||
assert_eq!(
|
||||
columns[0].values.as_ref().unwrap().ts_millisecond_values,
|
||||
columns[0]
|
||||
.values
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.timestamp_millisecond_values,
|
||||
vec![1000]
|
||||
);
|
||||
|
||||
|
||||
@@ -176,7 +176,6 @@ fn encode_gauge(
|
||||
let (columns, row_count) = lines.finish();
|
||||
Ok(InsertRequest {
|
||||
table_name: normalize_otlp_name(name),
|
||||
region_number: 0,
|
||||
columns,
|
||||
row_count,
|
||||
})
|
||||
@@ -208,7 +207,6 @@ fn encode_sum(
|
||||
let (columns, row_count) = lines.finish();
|
||||
Ok(InsertRequest {
|
||||
table_name: normalize_otlp_name(name),
|
||||
region_number: 0,
|
||||
columns,
|
||||
row_count,
|
||||
})
|
||||
@@ -251,7 +249,6 @@ fn encode_histogram(name: &str, hist: &Histogram) -> Result<InsertRequest> {
|
||||
let (columns, row_count) = lines.finish();
|
||||
Ok(InsertRequest {
|
||||
table_name: normalize_otlp_name(name),
|
||||
region_number: 0,
|
||||
columns,
|
||||
row_count,
|
||||
})
|
||||
@@ -310,7 +307,6 @@ fn encode_exponential_histogram(name: &str, hist: &ExponentialHistogram) -> Resu
|
||||
let (columns, row_count) = lines.finish();
|
||||
Ok(InsertRequest {
|
||||
table_name: normalize_otlp_name(name),
|
||||
region_number: 0,
|
||||
columns,
|
||||
row_count,
|
||||
})
|
||||
@@ -351,7 +347,6 @@ fn encode_summary(
|
||||
let (columns, row_count) = lines.finish();
|
||||
Ok(InsertRequest {
|
||||
table_name: normalize_otlp_name(name),
|
||||
region_number: 0,
|
||||
columns,
|
||||
row_count,
|
||||
})
|
||||
|
||||
@@ -15,13 +15,12 @@
|
||||
//! prometheus protocol supportings
|
||||
//! handles prometheus remote_write, remote_read logic
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::collections::BTreeMap;
|
||||
use std::hash::{Hash, Hasher};
|
||||
|
||||
use api::prom_store::remote::label_matcher::Type as MatcherType;
|
||||
use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
|
||||
use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests, RowInsertRequests};
|
||||
use common_grpc::writer::{LinesWriter, Precision};
|
||||
use api::v1::RowInsertRequests;
|
||||
use common_recordbatch::{RecordBatch, RecordBatches};
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use datafusion::prelude::{col, lit, regexp_match, Expr};
|
||||
@@ -356,71 +355,6 @@ pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRe
|
||||
Ok(multi_table_data.into_row_insert_requests())
|
||||
}
|
||||
|
||||
pub fn to_grpc_insert_requests(request: WriteRequest) -> Result<(InsertRequests, usize)> {
|
||||
let mut writers: HashMap<String, LinesWriter> = HashMap::new();
|
||||
for timeseries in &request.timeseries {
|
||||
let table_name = timeseries
|
||||
.labels
|
||||
.iter()
|
||||
.find(|label| {
|
||||
// The metric name is a special label
|
||||
label.name == METRIC_NAME_LABEL
|
||||
})
|
||||
.context(error::InvalidPromRemoteRequestSnafu {
|
||||
msg: "missing '__name__' label in timeseries",
|
||||
})?
|
||||
.value
|
||||
.clone();
|
||||
|
||||
let writer = writers
|
||||
.entry(table_name)
|
||||
.or_insert_with(|| LinesWriter::with_lines(16));
|
||||
// For each sample
|
||||
for sample in ×eries.samples {
|
||||
// Insert labels first.
|
||||
for label in ×eries.labels {
|
||||
// The metric name is a special label
|
||||
if label.name == METRIC_NAME_LABEL {
|
||||
continue;
|
||||
}
|
||||
|
||||
writer
|
||||
.write_tag(&label.name, &label.value)
|
||||
.context(error::PromSeriesWriteSnafu)?;
|
||||
}
|
||||
// Insert sample timestamp.
|
||||
writer
|
||||
.write_ts(
|
||||
TIMESTAMP_COLUMN_NAME,
|
||||
(sample.timestamp, Precision::Millisecond),
|
||||
)
|
||||
.context(error::PromSeriesWriteSnafu)?;
|
||||
// Insert sample value.
|
||||
writer
|
||||
.write_f64(FIELD_COLUMN_NAME, sample.value)
|
||||
.context(error::PromSeriesWriteSnafu)?;
|
||||
|
||||
writer.commit();
|
||||
}
|
||||
}
|
||||
|
||||
let mut sample_counts = 0;
|
||||
let inserts = writers
|
||||
.into_iter()
|
||||
.map(|(table_name, writer)| {
|
||||
let (columns, row_count) = writer.finish();
|
||||
sample_counts += row_count as usize;
|
||||
GrpcInsertRequest {
|
||||
table_name,
|
||||
region_number: 0,
|
||||
columns,
|
||||
row_count,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
Ok((InsertRequests { inserts }, sample_counts))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
|
||||
let mut decoder = Decoder::new();
|
||||
@@ -655,7 +589,9 @@ mod tests {
|
||||
value_data: Some(api::v1::value::ValueData::F64Value(value)),
|
||||
},
|
||||
api::v1::Value {
|
||||
value_data: Some(api::v1::value::ValueData::TsMillisecondValue(timestamp)),
|
||||
value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
|
||||
timestamp,
|
||||
)),
|
||||
},
|
||||
],
|
||||
}
|
||||
@@ -674,7 +610,9 @@ mod tests {
|
||||
value_data: Some(api::v1::value::ValueData::F64Value(value)),
|
||||
},
|
||||
api::v1::Value {
|
||||
value_data: Some(api::v1::value::ValueData::TsMillisecondValue(timestamp)),
|
||||
value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
|
||||
timestamp,
|
||||
)),
|
||||
},
|
||||
],
|
||||
}
|
||||
@@ -756,115 +694,6 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_write_request_to_insert_exprs() {
|
||||
let write_request = WriteRequest {
|
||||
timeseries: mock_timeseries(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut exprs = to_grpc_insert_requests(write_request).unwrap().0.inserts;
|
||||
exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
|
||||
assert_eq!(3, exprs.len());
|
||||
assert_eq!("metric1", exprs[0].table_name);
|
||||
assert_eq!("metric2", exprs[1].table_name);
|
||||
assert_eq!("metric3", exprs[2].table_name);
|
||||
|
||||
let expr = exprs.get_mut(0).unwrap();
|
||||
expr.columns
|
||||
.sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name));
|
||||
|
||||
let columns = &expr.columns;
|
||||
let row_count = expr.row_count;
|
||||
|
||||
assert_eq!(2, row_count);
|
||||
assert_eq!(columns.len(), 3);
|
||||
|
||||
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
|
||||
assert_eq!(
|
||||
columns[0].values.as_ref().unwrap().ts_millisecond_values,
|
||||
vec![1000, 2000]
|
||||
);
|
||||
|
||||
assert_eq!(columns[1].column_name, FIELD_COLUMN_NAME);
|
||||
assert_eq!(
|
||||
columns[1].values.as_ref().unwrap().f64_values,
|
||||
vec![1.0, 2.0]
|
||||
);
|
||||
|
||||
assert_eq!(columns[2].column_name, "job");
|
||||
assert_eq!(
|
||||
columns[2].values.as_ref().unwrap().string_values,
|
||||
vec!["spark", "spark"]
|
||||
);
|
||||
|
||||
let expr = exprs.get_mut(1).unwrap();
|
||||
expr.columns
|
||||
.sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name));
|
||||
|
||||
let columns = &expr.columns;
|
||||
let row_count = expr.row_count;
|
||||
|
||||
assert_eq!(2, row_count);
|
||||
assert_eq!(columns.len(), 4);
|
||||
|
||||
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
|
||||
assert_eq!(
|
||||
columns[0].values.as_ref().unwrap().ts_millisecond_values,
|
||||
vec![1000, 2000]
|
||||
);
|
||||
|
||||
assert_eq!(columns[1].column_name, FIELD_COLUMN_NAME);
|
||||
assert_eq!(
|
||||
columns[1].values.as_ref().unwrap().f64_values,
|
||||
vec![3.0, 4.0]
|
||||
);
|
||||
|
||||
assert_eq!(columns[2].column_name, "idc");
|
||||
assert_eq!(
|
||||
columns[2].values.as_ref().unwrap().string_values,
|
||||
vec!["z001", "z001"]
|
||||
);
|
||||
assert_eq!(columns[3].column_name, "instance");
|
||||
assert_eq!(
|
||||
columns[3].values.as_ref().unwrap().string_values,
|
||||
vec!["test_host1", "test_host1"]
|
||||
);
|
||||
|
||||
let expr = exprs.get_mut(2).unwrap();
|
||||
expr.columns
|
||||
.sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name));
|
||||
|
||||
let columns = &expr.columns;
|
||||
let row_count = expr.row_count;
|
||||
|
||||
assert_eq!(3, row_count);
|
||||
assert_eq!(columns.len(), 4);
|
||||
|
||||
assert_eq!(columns[0].column_name, "app");
|
||||
assert_eq!(
|
||||
columns[0].values.as_ref().unwrap().string_values,
|
||||
vec!["biz", "biz", "biz"]
|
||||
);
|
||||
assert_eq!(columns[1].column_name, TIMESTAMP_COLUMN_NAME);
|
||||
assert_eq!(
|
||||
columns[1].values.as_ref().unwrap().ts_millisecond_values,
|
||||
vec![1000, 2000, 3000]
|
||||
);
|
||||
|
||||
assert_eq!(columns[2].column_name, FIELD_COLUMN_NAME);
|
||||
assert_eq!(
|
||||
columns[2].values.as_ref().unwrap().f64_values,
|
||||
vec![5.0, 6.0, 7.0]
|
||||
);
|
||||
|
||||
assert_eq!(columns[3].column_name, "idc");
|
||||
assert_eq!(
|
||||
columns[3].values.as_ref().unwrap().string_values,
|
||||
vec!["z002", "z002", "z002"]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_recordbatches_to_timeseries() {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
|
||||
@@ -108,7 +108,6 @@ impl<'a> MultiTableData<'a> {
|
||||
RowInsertRequest {
|
||||
table_name: table_name.to_string(),
|
||||
rows: Some(Rows { schema, rows }),
|
||||
..Default::default()
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
@@ -227,14 +226,14 @@ pub fn write_ts_precision<'a>(
|
||||
datatype: ColumnDataType::TimestampMillisecond as i32,
|
||||
semantic_type: SemanticType::Timestamp as i32,
|
||||
});
|
||||
one_row.push(ValueData::TsMillisecondValue(ts).into())
|
||||
one_row.push(ValueData::TimestampMillisecondValue(ts).into())
|
||||
} else {
|
||||
check_schema(
|
||||
ColumnDataType::TimestampMillisecond,
|
||||
SemanticType::Timestamp,
|
||||
&schema[*index],
|
||||
)?;
|
||||
one_row[*index].value_data = Some(ValueData::TsMillisecondValue(ts));
|
||||
one_row[*index].value_data = Some(ValueData::TimestampMillisecondValue(ts));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::InsertRequests;
|
||||
use api::v1::RowInsertRequests;
|
||||
use async_trait::async_trait;
|
||||
use auth::tests::{DatabaseAuthInfo, MockUserProvider};
|
||||
use axum::{http, Router};
|
||||
@@ -54,7 +54,7 @@ impl GrpcQueryHandler for DummyInstance {
|
||||
#[async_trait]
|
||||
impl InfluxdbLineProtocolHandler for DummyInstance {
|
||||
async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<()> {
|
||||
let requests: InsertRequests = request.try_into()?;
|
||||
let requests: RowInsertRequests = request.try_into()?;
|
||||
for expr in requests.inserts {
|
||||
let _ = self
|
||||
.tx
|
||||
|
||||
@@ -54,10 +54,8 @@ impl GrpcQueryInterceptor for NoopInterceptor {
|
||||
match req {
|
||||
Request::Inserts(insert) => {
|
||||
ensure!(
|
||||
insert.inserts.iter().all(|x| x.region_number == 0),
|
||||
NotSupportedSnafu {
|
||||
feat: "region not 0"
|
||||
}
|
||||
insert.inserts.iter().all(|x| x.row_count > 0),
|
||||
NotSupportedSnafu { feat: "" }
|
||||
)
|
||||
}
|
||||
_ => {
|
||||
@@ -74,10 +72,7 @@ fn test_grpc_interceptor() {
|
||||
let ctx = QueryContext::arc();
|
||||
|
||||
let req = Request::Inserts(InsertRequests {
|
||||
inserts: vec![InsertRequest {
|
||||
region_number: 1,
|
||||
..Default::default()
|
||||
}],
|
||||
inserts: vec![InsertRequest::default()],
|
||||
});
|
||||
|
||||
let fail = GrpcQueryInterceptor::pre_execute(&di, &req, ctx.clone());
|
||||
|
||||
@@ -448,6 +448,8 @@ pub struct TableInfo {
|
||||
/// Id and version of the table.
|
||||
#[builder(default, setter(into))]
|
||||
pub ident: TableIdent,
|
||||
|
||||
// TODO(LFC): Remove the catalog, schema and table names from TableInfo.
|
||||
/// Name of the table.
|
||||
#[builder(setter(into))]
|
||||
pub name: String,
|
||||
|
||||
Reference in New Issue
Block a user