mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-20 15:00:40 +00:00
refactor: update proto and rename incorrect region_id fields (#1670)
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -3949,7 +3949,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=f43972af48f8be1cbb1d27640a8aba8b30955028#f43972af48f8be1cbb1d27640a8aba8b30955028"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45#ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45"
|
||||
dependencies = [
|
||||
"prost",
|
||||
"serde",
|
||||
|
||||
@@ -364,7 +364,7 @@ fn create_table_expr() -> CreateTableExpr {
|
||||
primary_keys: vec!["VendorID".to_string()],
|
||||
create_if_not_exists: false,
|
||||
table_options: Default::default(),
|
||||
region_ids: vec![0],
|
||||
region_numbers: vec![0],
|
||||
table_id: None,
|
||||
engine: "mito".to_string(),
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
|
||||
common-error = { path = "../common/error" }
|
||||
common-time = { path = "../common/time" }
|
||||
datatypes = { path = "../datatypes" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f43972af48f8be1cbb1d27640a8aba8b30955028" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45" }
|
||||
prost.workspace = true
|
||||
snafu = { version = "0.7", features = ["backtraces"] }
|
||||
tonic.workspace = true
|
||||
|
||||
@@ -63,7 +63,7 @@ async fn run() {
|
||||
create_if_not_exists: false,
|
||||
table_options: Default::default(),
|
||||
table_id: Some(TableId { id: 1024 }),
|
||||
region_ids: vec![0],
|
||||
region_numbers: vec![0],
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
};
|
||||
|
||||
|
||||
@@ -163,10 +163,10 @@ pub fn create_expr_to_request(
|
||||
Some(expr.desc)
|
||||
};
|
||||
|
||||
let region_ids = if expr.region_ids.is_empty() {
|
||||
let region_numbers = if expr.region_numbers.is_empty() {
|
||||
vec![0]
|
||||
} else {
|
||||
expr.region_ids
|
||||
expr.region_numbers
|
||||
};
|
||||
|
||||
let table_options =
|
||||
@@ -178,7 +178,7 @@ pub fn create_expr_to_request(
|
||||
table_name: expr.table_name,
|
||||
desc,
|
||||
schema,
|
||||
region_numbers: region_ids,
|
||||
region_numbers,
|
||||
primary_key_indices,
|
||||
create_if_not_exists: expr.create_if_not_exists,
|
||||
table_options,
|
||||
@@ -209,6 +209,7 @@ mod tests {
|
||||
default_constraint: vec![],
|
||||
}),
|
||||
is_key: false,
|
||||
location: None,
|
||||
}],
|
||||
})),
|
||||
};
|
||||
|
||||
@@ -68,6 +68,7 @@ pub fn find_new_columns(schema: &SchemaRef, columns: &[Column]) -> Result<Option
|
||||
columns_to_add.push(AddColumn {
|
||||
column_def,
|
||||
is_key: *semantic_type == TAG_SEMANTIC_TYPE,
|
||||
location: None,
|
||||
});
|
||||
new_columns.insert(column_name.to_string());
|
||||
}
|
||||
@@ -257,7 +258,7 @@ pub fn build_create_expr_from_insertion(
|
||||
create_if_not_exists: true,
|
||||
table_options: Default::default(),
|
||||
table_id: table_id.map(|id| api::v1::TableId { id }),
|
||||
region_ids: vec![0], // TODO:(hl): region id should be allocated by frontend
|
||||
region_numbers: vec![0], // TODO:(hl): region number should be allocated by frontend
|
||||
engine: engine.to_string(),
|
||||
};
|
||||
|
||||
|
||||
@@ -372,6 +372,7 @@ mod test {
|
||||
default_constraint: vec![],
|
||||
}),
|
||||
is_key: true,
|
||||
location: None,
|
||||
}],
|
||||
})),
|
||||
})),
|
||||
|
||||
@@ -97,7 +97,7 @@ impl Instance {
|
||||
catalog_name: expr.catalog_name,
|
||||
schema_name: expr.schema_name,
|
||||
table_name,
|
||||
region_number: expr.region_id,
|
||||
region_number: expr.region_number,
|
||||
wait: None,
|
||||
};
|
||||
self.sql_handler()
|
||||
@@ -239,7 +239,7 @@ mod tests {
|
||||
table_id: Some(TableId {
|
||||
id: MIN_USER_TABLE_ID,
|
||||
}),
|
||||
region_ids: vec![0],
|
||||
region_numbers: vec![0],
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -232,7 +232,7 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
create_if_not_exists: request.create_if_not_exists,
|
||||
table_options: (&request.table_options).into(),
|
||||
table_id: None, // Should and will be assigned by Meta.
|
||||
region_ids: vec![0],
|
||||
region_numbers: vec![0],
|
||||
engine: request.engine,
|
||||
};
|
||||
|
||||
|
||||
@@ -115,7 +115,7 @@ pub(crate) async fn create_external_expr(
|
||||
create_if_not_exists: create.if_not_exists,
|
||||
table_options: options,
|
||||
table_id: None,
|
||||
region_ids: vec![],
|
||||
region_numbers: vec![],
|
||||
engine: create.engine.to_string(),
|
||||
};
|
||||
Ok(expr)
|
||||
@@ -144,7 +144,7 @@ pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Resul
|
||||
create_if_not_exists: create.if_not_exists,
|
||||
table_options,
|
||||
table_id: None,
|
||||
region_ids: vec![],
|
||||
region_numbers: vec![],
|
||||
engine: create.engine.to_string(),
|
||||
};
|
||||
Ok(expr)
|
||||
@@ -299,6 +299,7 @@ pub(crate) fn to_alter_expr(
|
||||
.context(ExternalSnafu)?,
|
||||
),
|
||||
is_key: false,
|
||||
location: None,
|
||||
}],
|
||||
}),
|
||||
AlterTableOperation::DropColumn { name } => Kind::DropColumns(DropColumns {
|
||||
|
||||
@@ -54,7 +54,8 @@ use sql::ast::{Ident, Value as SqlValue};
|
||||
use sql::statements::create::{PartitionEntry, Partitions};
|
||||
use sql::statements::statement::Statement;
|
||||
use sql::statements::{self, sql_value_to_value};
|
||||
use table::engine::TableReference;
|
||||
use store_api::storage::RegionNumber;
|
||||
use table::engine::{self, TableReference};
|
||||
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
|
||||
use table::requests::TableOptions;
|
||||
use table::table::AlterContext;
|
||||
@@ -189,11 +190,11 @@ impl DistInstance {
|
||||
|
||||
let regions = table_route.find_leader_regions(&datanode);
|
||||
let mut create_expr_for_region = create_table.clone();
|
||||
create_expr_for_region.region_ids = regions;
|
||||
create_expr_for_region.region_numbers = regions;
|
||||
|
||||
debug!(
|
||||
"Creating table {:?} on Datanode {:?} with regions {:?}",
|
||||
create_table, datanode, create_expr_for_region.region_ids,
|
||||
create_table, datanode, create_expr_for_region.region_numbers,
|
||||
);
|
||||
|
||||
let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE_IN_DATANODE);
|
||||
@@ -288,7 +289,11 @@ impl DistInstance {
|
||||
Ok(Output::AffectedRows(1))
|
||||
}
|
||||
|
||||
async fn flush_table(&self, table_name: TableName, region_id: Option<u32>) -> Result<Output> {
|
||||
async fn flush_table(
|
||||
&self,
|
||||
table_name: TableName,
|
||||
region_number: Option<RegionNumber>,
|
||||
) -> Result<Output> {
|
||||
let _ = self
|
||||
.catalog_manager
|
||||
.table(
|
||||
@@ -314,13 +319,13 @@ impl DistInstance {
|
||||
catalog_name: table_name.catalog_name.clone(),
|
||||
schema_name: table_name.schema_name.clone(),
|
||||
table_name: table_name.table_name.clone(),
|
||||
region_id,
|
||||
region_number,
|
||||
};
|
||||
|
||||
for table_route in &route_response.table_routes {
|
||||
let should_send_rpc = table_route.region_routes.iter().any(|route| {
|
||||
if let Some(region_id) = region_id {
|
||||
region_id == route.region.id as u32
|
||||
if let Some(n) = region_number {
|
||||
n == engine::region_number(route.region.id)
|
||||
} else {
|
||||
true
|
||||
}
|
||||
@@ -671,7 +676,7 @@ impl GrpcQueryHandler for DistInstance {
|
||||
DdlExpr::FlushTable(expr) => {
|
||||
let table_name =
|
||||
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
|
||||
self.flush_table(table_name, expr.region_id).await
|
||||
self.flush_table(table_name, expr.region_number).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,7 +136,7 @@ async fn handle_create(
|
||||
return Ok(RouteResponse {
|
||||
header: Some(ResponseHeader::failed(
|
||||
cluster_id,
|
||||
Error::no_active_datanodes(),
|
||||
Error::not_enough_active_datanodes(0),
|
||||
)),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
@@ -46,7 +46,7 @@ pub async fn flush(
|
||||
// if table name is not present, flush all tables inside schema
|
||||
let table_name = params.get("table").cloned().unwrap_or_default();
|
||||
|
||||
let region_id: Option<u32> = params
|
||||
let region_number: Option<u32> = params
|
||||
.get("region")
|
||||
.map(|v| v.parse())
|
||||
.transpose()
|
||||
@@ -58,7 +58,7 @@ pub async fn flush(
|
||||
catalog_name: catalog_name.clone(),
|
||||
schema_name: schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
region_id,
|
||||
region_number,
|
||||
})),
|
||||
});
|
||||
|
||||
|
||||
@@ -172,18 +172,26 @@ pub type TableEngineProcedureRef = Arc<dyn TableEngineProcedure>;
|
||||
|
||||
/// Generate region name in the form of "{TABLE_ID}_{REGION_NUMBER}"
|
||||
#[inline]
|
||||
pub fn region_name(table_id: TableId, n: u32) -> String {
|
||||
format!("{table_id}_{n:010}")
|
||||
pub fn region_name(table_id: TableId, region_number: RegionNumber) -> String {
|
||||
format!("{table_id}_{region_number:010}")
|
||||
}
|
||||
|
||||
/// Construct a [RegionId] from specific `table_id` and `region_number`.
|
||||
#[inline]
|
||||
pub fn region_id(table_id: TableId, n: u32) -> RegionId {
|
||||
(u64::from(table_id) << 32) | u64::from(n)
|
||||
pub fn region_id(table_id: TableId, region_number: RegionNumber) -> RegionId {
|
||||
(u64::from(table_id) << 32) | u64::from(region_number)
|
||||
}
|
||||
|
||||
/// Retrieve the table id from specific `region_id`.
|
||||
#[inline]
|
||||
pub fn table_id(region_id: RegionId) -> TableId {
|
||||
(region_id >> 32) as u32
|
||||
(region_id >> 32) as RegionNumber
|
||||
}
|
||||
|
||||
/// Retrieve the region_number from specific `region_id`.
|
||||
#[inline]
|
||||
pub fn region_number(region_id: RegionId) -> RegionNumber {
|
||||
region_id as RegionNumber
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
||||
@@ -34,6 +34,7 @@ mod test {
|
||||
use query::parser::QueryLanguageParser;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use session::context::QueryContext;
|
||||
use store_api::storage::RegionNumber;
|
||||
use tests::{has_parquet_file, test_region_dir};
|
||||
|
||||
use crate::tests;
|
||||
@@ -115,6 +116,7 @@ mod test {
|
||||
default_constraint: vec![],
|
||||
}),
|
||||
is_key: false,
|
||||
location: None,
|
||||
}],
|
||||
})),
|
||||
})),
|
||||
@@ -406,14 +408,14 @@ CREATE TABLE {table_name} (
|
||||
catalog_name: &str,
|
||||
schema_name: &str,
|
||||
table_name: &str,
|
||||
region_id: Option<u32>,
|
||||
region_number: Option<RegionNumber>,
|
||||
) {
|
||||
let request = Request::Ddl(DdlRequest {
|
||||
expr: Some(DdlExpr::FlushTable(FlushTableExpr {
|
||||
catalog_name: catalog_name.to_string(),
|
||||
schema_name: schema_name.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
region_id,
|
||||
region_number,
|
||||
})),
|
||||
});
|
||||
|
||||
|
||||
@@ -196,6 +196,7 @@ pub async fn test_insert_and_select(store_type: StorageType) {
|
||||
add_columns: vec![AddColumn {
|
||||
column_def: Some(add_column),
|
||||
is_key: false,
|
||||
location: None,
|
||||
}],
|
||||
});
|
||||
let expr = AlterExpr {
|
||||
@@ -308,7 +309,7 @@ fn testing_create_expr() -> CreateTableExpr {
|
||||
table_id: Some(TableId {
|
||||
id: MIN_USER_TABLE_ID,
|
||||
}),
|
||||
region_ids: vec![0],
|
||||
region_numbers: vec![0],
|
||||
engine: MITO_ENGINE.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user