feat: create distributed table in Frontend (#475)

* feat: create distributed table in Frontend

* fix: some table creation issues (#482)

Co-authored-by: luofucong <luofucong@greptime.com>
Co-authored-by: Lei, Huang <6406592+v0y4g3r@users.noreply.github.com>
Author: LFC
Date: 2022-11-14 15:49:25 +08:00
Committed by: GitHub
Parent: ef12bb7f24
Commit: e7b4a00ef0
25 changed files with 1211 additions and 267 deletions
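
For illustration, the end-to-end flow this commit enables: a CREATE TABLE with range partitions, issued against a Frontend running in distributed mode, is routed through Meta and then created on every leader datanode. The statement below is adapted from the tests added in this commit:

CREATE TABLE greptime.public.dist_numbers (
    ts BIGINT,
    a INT,
    row_id INT,
    TIME INDEX (ts),
)
PARTITION BY RANGE COLUMNS (a) (
    PARTITION r0 VALUES LESS THAN (10),
    PARTITION r1 VALUES LESS THAN (20),
    PARTITION r2 VALUES LESS THAN (50),
    PARTITION r3 VALUES LESS THAN (MAXVALUE),
)
ENGINE=mito;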

View File

@@ -171,13 +171,6 @@ pub enum Error {
source: meta_client::error::Error,
},
#[snafu(display("Failed to deserialize partition rule from string: {:?}", data))]
DeserializePartitionRule {
data: String,
source: serde_json::error::Error,
backtrace: Backtrace,
},
#[snafu(display("Invalid table schema in catalog, source: {:?}", source))]
InvalidSchemaInCatalog {
#[snafu(backtrace)]
@@ -226,7 +219,6 @@ impl ErrorExt for Error {
Error::SystemCatalogTableScan { source } => source.status_code(),
Error::SystemCatalogTableScanExec { source } => source.status_code(),
Error::InvalidTableSchema { source, .. } => source.status_code(),
Error::DeserializePartitionRule { .. } => StatusCode::Unexpected,
Error::InvalidSchemaInCatalog { .. } => StatusCode::Unexpected,
Error::Internal { source, .. } => source.status_code(),
}

View File

@@ -128,8 +128,11 @@ impl Client {
.context(error::IllegalGrpcClientStateSnafu {
err_msg: "No available peer found",
})?;
let mut client = self.make_client(peer)?;
let result = client.batch(req).await.context(error::TonicStatusSnafu)?;
let mut client = self.make_client(&peer)?;
let result = client
.batch(req)
.await
.context(error::TonicStatusSnafu { addr: peer })?;
Ok(result.into_inner())
}

View File

@@ -25,8 +25,9 @@ pub enum Error {
#[snafu(display("Missing result header"))]
MissingHeader,
#[snafu(display("Tonic internal error, source: {}", source))]
#[snafu(display("Tonic internal error, addr: {}, source: {}", addr, source))]
TonicStatus {
addr: String,
source: tonic::Status,
backtrace: Backtrace,
},

View File

@@ -84,9 +84,9 @@ impl TryFrom<StartCommand> for DatanodeOptions {
// Running mode is only set to Distributed when
// both metasrv addr and node id are set in
// command line options
opts.meta_client_opts.metasrv_addr = meta_addr;
opts.meta_client_opts.metasrv_addr = meta_addr.clone();
opts.node_id = node_id;
opts.mode = Mode::Distributed;
opts.mode = Mode::Distributed(vec![meta_addr]);
}
(None, None) => {
opts.mode = Mode::Standalone;
@@ -110,6 +110,8 @@ impl TryFrom<StartCommand> for DatanodeOptions {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use datanode::datanode::ObjectStoreConfig;
use frontend::frontend::Mode;
@@ -162,18 +164,16 @@ mod tests {
.mode
);
assert_eq!(
Mode::Distributed,
DatanodeOptions::try_from(StartCommand {
node_id: Some(42),
rpc_addr: None,
mysql_addr: None,
metasrv_addr: Some("127.0.0.1:3002".to_string()),
config_file: None
})
.unwrap()
.mode
);
let mode = DatanodeOptions::try_from(StartCommand {
node_id: Some(42),
rpc_addr: None,
mysql_addr: None,
metasrv_addr: Some("127.0.0.1:3002".to_string()),
config_file: None,
})
.unwrap()
.mode;
assert_matches!(mode, Mode::Distributed(_));
assert!(DatanodeOptions::try_from(StartCommand {
node_id: None,

View File

@@ -1,5 +1,5 @@
use clap::Parser;
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::frontend::{Frontend, FrontendOptions, Mode};
use frontend::grpc::GrpcOptions;
use frontend::influxdb::InfluxdbOptions;
use frontend::instance::Instance;
@@ -52,6 +52,8 @@ pub struct StartCommand {
config_file: Option<String>,
#[clap(short, long)]
influxdb_enable: Option<bool>,
#[clap(long)]
metasrv_addr: Option<String>,
}
impl StartCommand {
@@ -107,6 +109,15 @@ impl TryFrom<StartCommand> for FrontendOptions {
if let Some(enable) = cmd.influxdb_enable {
opts.influxdb_options = Some(InfluxdbOptions { enable });
}
if let Some(metasrv_addr) = cmd.metasrv_addr {
opts.mode = Mode::Distributed(
metasrv_addr
.split(',')
.into_iter()
.map(|x| x.trim().to_string())
.collect::<Vec<String>>(),
);
}
Ok(opts)
}
}
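For example (a hypothetical invocation, assuming clap's default kebab-case flag naming), a Frontend could be pointed at two Meta servers like this; each comma-separated address is trimmed and collected into Mode::Distributed:

frontend --metasrv-addr 127.0.0.1:3002,127.0.0.1:3003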
@@ -125,6 +136,7 @@ mod tests {
opentsdb_addr: Some("127.0.0.1:4321".to_string()),
influxdb_enable: Some(false),
config_file: None,
metasrv_addr: None,
};
let opts: FrontendOptions = command.try_into().unwrap();

View File

@@ -1,3 +1,5 @@
#![feature(assert_matches)]
pub mod datanode;
pub mod error;
pub mod frontend;

View File

@@ -121,10 +121,8 @@ pub struct TableGlobalValue {
// TODO(LFC): Maybe remove it?
/// Allocation of region ids across all datanodes.
pub regions_id_map: HashMap<u64, Vec<u32>>,
/// Node id -> region ids
// TODO(LFC): Too much for assembling the table schema that DistTable needs, find another way.
pub meta: RawTableMeta,
/// Partition rules for table
pub partition_rules: String,
}
/// Table regional info that varies between datanodes, so it contains a `node_id` field.
@@ -332,7 +330,6 @@ mod tests {
node_id: 0,
regions_id_map: HashMap::from([(0, vec![1, 2, 3])]),
meta,
partition_rules: "{}".to_string(),
};
let serialized = serde_json::to_string(&value).unwrap();
let deserialized = TableGlobalValue::parse(&serialized).unwrap();

View File

@@ -51,7 +51,7 @@ impl Instance {
let meta_client = match opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
Mode::Distributed(_) => {
let meta_client = new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?;
Some(Arc::new(meta_client))
}
@@ -83,7 +83,7 @@ impl Instance {
)
}
Mode::Distributed => {
Mode::Distributed(_) => {
let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new(
table_engine.clone(),
opts.node_id,
@@ -102,7 +102,7 @@ impl Instance {
let heartbeat_task = match opts.mode {
Mode::Standalone => None,
Mode::Distributed => Some(HeartbeatTask::new(
Mode::Distributed(_) => Some(HeartbeatTask::new(
opts.node_id, /*node id not set*/
opts.rpc_addr.clone(),
meta_client.as_ref().unwrap().clone(),

View File

@@ -8,6 +8,7 @@ api = { path = "../api" }
async-stream = "0.3"
async-trait = "0.1"
catalog = { path = "../catalog" }
chrono = "0.4"
client = { path = "../client" }
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
@@ -49,7 +50,6 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc
[dev-dependencies]
datanode = { path = "../datanode" }
chrono = "0.4"
futures = "0.3"
meta-srv = { path = "../meta-srv", features = ["mock"] }
tempdir = "0.3"

View File

@@ -2,9 +2,7 @@ use std::any::Any;
use std::collections::HashSet;
use std::sync::Arc;
use catalog::error::{
DeserializePartitionRuleSnafu, InvalidCatalogValueSnafu, InvalidSchemaInCatalogSnafu,
};
use catalog::error::{InvalidCatalogValueSnafu, InvalidSchemaInCatalogSnafu};
use catalog::remote::{Kv, KvBackendRef};
use catalog::{
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSchemaRequest,
@@ -17,7 +15,6 @@ use snafu::prelude::*;
use table::TableRef;
use crate::datanode::DatanodeClients;
use crate::partitioning::range::RangePartitionRule;
use crate::table::route::TableRoutes;
use crate::table::DistTable;
@@ -40,6 +37,10 @@ impl FrontendCatalogManager {
datanode_clients,
}
}
pub(crate) fn backend(&self) -> KvBackendRef {
self.backend.clone()
}
}
// FIXME(hl): Frontend only needs a CatalogList, should replace with trait upcasting
@@ -249,14 +250,6 @@ impl SchemaProvider for FrontendSchemaProvider {
let val = TableGlobalValue::parse(String::from_utf8_lossy(&res.1))
.context(InvalidCatalogValueSnafu)?;
// TODO(hl): We need to deserialize string to PartitionRule trait object
let partition_rule: Arc<RangePartitionRule> =
Arc::new(serde_json::from_str(&val.partition_rules).context(
DeserializePartitionRuleSnafu {
data: &val.partition_rules,
},
)?);
let table = Arc::new(DistTable {
table_name,
schema: Arc::new(
@@ -265,7 +258,6 @@ impl SchemaProvider for FrontendSchemaProvider {
.try_into()
.context(InvalidSchemaInCatalogSnafu)?,
),
partition_rule,
table_routes,
datanode_clients,
});

View File

@@ -164,18 +164,24 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed access catalog: {}", source))]
#[snafu(display("General catalog error: {}", source))]
Catalog {
#[snafu(backtrace)]
source: catalog::error::Error,
},
#[snafu(display("Failed to parse catalog entry: {}", source))]
ParseCatalogEntry {
#[snafu(display("Failed to serialize or deserialize catalog entry: {}", source))]
CatalogEntrySerde {
#[snafu(backtrace)]
source: common_catalog::error::Error,
},
#[snafu(display("Failed to start Meta client, source: {}", source))]
StartMetaClient {
#[snafu(backtrace)]
source: meta_client::error::Error,
},
#[snafu(display("Failed to request Meta, source: {}", source))]
RequestMeta {
#[snafu(backtrace)]
@@ -280,6 +286,63 @@ pub enum Error {
#[snafu(backtrace)]
source: sql::error::Error,
},
#[snafu(display("Failed to find region routes for table {}", table_name))]
FindRegionRoutes {
table_name: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to serialize value to json, source: {}", source))]
SerializeJson {
source: serde_json::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to deserialize value from json, source: {}", source))]
DeserializeJson {
source: serde_json::Error,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to find leader peer for region {} in table {}",
region,
table_name
))]
FindLeaderPeer {
region: u64,
table_name: String,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to find partition info for region {} in table {}",
region,
table_name
))]
FindRegionPartition {
region: u64,
table_name: String,
backtrace: Backtrace,
},
#[snafu(display(
"Illegal table routes data for table {}, error message: {}",
table_name,
err_msg
))]
IllegalTableRoutesData {
table_name: String,
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Invalid admin result, source: {}", source))]
InvalidAdminResult {
#[snafu(backtrace)]
source: client::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -287,8 +350,7 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ConnectDatanode { .. }
| Error::ParseAddr { .. }
Error::ParseAddr { .. }
| Error::InvalidSql { .. }
| Error::FindRegion { .. }
| Error::FindRegions { .. }
@@ -311,12 +373,20 @@ impl ErrorExt for Error {
Error::ConvertColumnDefaultConstraint { source, .. }
| Error::ConvertScalarValue { source, .. } => source.status_code(),
Error::RequestDatanode { source } => source.status_code(),
Error::ConnectDatanode { source, .. }
| Error::RequestDatanode { source }
| Error::InvalidAdminResult { source } => source.status_code(),
Error::ColumnDataType { .. }
| Error::FindDatanode { .. }
| Error::GetCache { .. }
| Error::FindTableRoutes { .. } => StatusCode::Internal,
| Error::FindTableRoutes { .. }
| Error::SerializeJson { .. }
| Error::DeserializeJson { .. }
| Error::FindRegionRoutes { .. }
| Error::FindLeaderPeer { .. }
| Error::FindRegionPartition { .. }
| Error::IllegalTableRoutesData { .. } => StatusCode::Internal,
Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } => {
StatusCode::Unexpected
@@ -328,9 +398,11 @@ impl ErrorExt for Error {
Error::JoinTask { .. } => StatusCode::Unexpected,
Error::Catalog { source, .. } => source.status_code(),
Error::ParseCatalogEntry { source, .. } => source.status_code(),
Error::CatalogEntrySerde { source, .. } => source.status_code(),
Error::RequestMeta { source } => source.status_code(),
Error::StartMetaClient { source } | Error::RequestMeta { source } => {
source.status_code()
}
Error::BumpTableId { source, .. } => source.status_code(),
Error::SchemaNotFound { .. } => StatusCode::InvalidArguments,
Error::CatalogNotFound { .. } => StatusCode::InvalidArguments,

View File

@@ -85,5 +85,6 @@ where
#[serde(rename_all = "lowercase")]
pub enum Mode {
Standalone,
Distributed,
// with the Meta servers' addresses
Distributed(Vec<String>),
}
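A minimal sketch of the serialized forms, assuming serde's default externally tagged enum representation (nothing in this commit asserts these exact strings):

// serde_json::to_string(&Mode::Standalone)? == r#""standalone""#
// serde_json::to_string(&Mode::Distributed(vec!["127.0.0.1:3002".into()]))?
//     == r#"{"distributed":["127.0.0.1:3002"]}"#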

View File

@@ -1,3 +1,4 @@
pub(crate) mod distributed;
mod influxdb;
mod opentsdb;
mod prometheus;
@@ -26,6 +27,7 @@ use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_query::Output;
use common_telemetry::{debug, error, info};
use datatypes::schema::ColumnSchema;
use distributed::DistInstance;
use meta_client::client::MetaClientBuilder;
use meta_client::MetaClientOpts;
use servers::error as server_error;
@@ -85,6 +87,8 @@ pub struct Instance {
// Standalone and Distributed, then the code behind it doesn't need to use so
// many match statements.
mode: Mode,
// TODO(LFC): Refactor consideration: can we split Frontend into DistInstance and EmbedInstance?
dist_instance: Option<DistInstance>,
}
impl Default for Instance {
@@ -94,19 +98,29 @@ impl Default for Instance {
catalog_manager: None,
table_id_provider: None,
mode: Mode::Standalone,
dist_instance: None,
}
}
}
impl Instance {
pub async fn try_new(opts: &FrontendOptions) -> Result<Self> {
let mut instance = Instance::default();
let mut instance = Instance {
mode: opts.mode.clone(),
..Default::default()
};
let addr = opts.datanode_grpc_addr();
instance.client.start(vec![addr]);
let meta_client = match opts.mode {
instance.dist_instance = match &opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
Mode::Distributed(metasrv_addr) => {
info!(
"Creating Frontend instance in distributed mode with Meta server addr {:?}",
metasrv_addr
);
let meta_config = MetaClientOpts::default();
let channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(meta_config.timeout_millis))
@@ -115,26 +129,36 @@ impl Instance {
let channel_manager = ChannelManager::with_config(channel_config);
let meta_client = MetaClientBuilder::new(0, 0)
let mut meta_client = MetaClientBuilder::new(0, 0)
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
Some(Arc::new(meta_client))
}
};
meta_client
.start(metasrv_addr)
.await
.context(error::StartMetaClientSnafu)?;
let meta_client = Arc::new(meta_client);
instance.catalog_manager = if let Some(meta_client) = meta_client {
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
let table_routes = Arc::new(TableRoutes::new(meta_client));
let datanode_clients = Arc::new(DatanodeClients::new());
let catalog_manager =
FrontendCatalogManager::new(meta_backend, table_routes, datanode_clients);
Some(Arc::new(catalog_manager))
} else {
None
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
let datanode_clients = Arc::new(DatanodeClients::new());
let catalog_manager = FrontendCatalogManager::new(
meta_backend,
table_routes,
datanode_clients.clone(),
);
instance.catalog_manager = Some(Arc::new(catalog_manager.clone()));
Some(DistInstance::new(
meta_client,
catalog_manager,
datanode_clients,
))
}
};
Ok(instance)
}
@@ -162,17 +186,14 @@ impl Instance {
}
/// Convert `CreateTable` statement to `CreateExpr` gRPC request.
async fn create_to_expr(&self, create: CreateTable) -> Result<CreateExpr> {
fn create_to_expr(
table_id: Option<u32>,
region_ids: Vec<u32>,
create: &CreateTable,
) -> Result<CreateExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name).context(error::ParseSqlSnafu)?;
let table_id = match &self.table_id_provider {
Some(provider) => Some(provider.next_table_id().await.context(BumpTableIdSnafu)?),
None => None,
};
// FIXME(hl): Region id should be generated from metasrv
let region_ids = vec![0];
let time_index = find_time_index(&create.constraints)?;
let expr = CreateExpr {
catalog_name: Some(catalog_name),
@@ -184,7 +205,7 @@ impl Instance {
primary_keys: find_primary_keys(&create.constraints)?,
create_if_not_exists: create.if_not_exists,
// TODO(LFC): Fill in other table options.
table_options: HashMap::from([("engine".to_string(), create.engine)]),
table_options: HashMap::from([("engine".to_string(), create.engine.clone())]),
table_id,
region_ids,
};
@@ -478,6 +499,7 @@ impl Instance {
catalog_manager: Some(catalog),
table_id_provider: None,
mode: Mode::Standalone,
dist_instance: None,
}
}
}
@@ -526,7 +548,7 @@ impl SqlQueryHandler for Instance {
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
Mode::Distributed => {
Mode::Distributed(_) => {
let affected = self
.sql_dist_insert(insert)
.await
@@ -538,15 +560,32 @@ impl SqlQueryHandler for Instance {
}
},
Statement::CreateTable(create) => {
let expr = self
.create_to_expr(create)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
self.handle_create_table(expr)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
if let Some(dist_instance) = &self.dist_instance {
dist_instance
.create_table(&create)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
} else {
let table_id = match &self.table_id_provider {
Some(provider) => Some(
provider
.next_table_id()
.await
.context(BumpTableIdSnafu)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?,
),
None => None,
};
let expr = Self::create_to_expr(table_id, vec![0], &create)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
self.handle_create_table(expr)
.await
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })
}
}
Statement::ShowDatabases(_) | Statement::ShowTables(_) => self

View File

@@ -0,0 +1,329 @@
use std::collections::HashMap;
use std::sync::Arc;
use chrono::DateTime;
use client::admin::{admin_result_to_output, Admin};
use common_catalog::{TableGlobalKey, TableGlobalValue};
use common_query::Output;
use common_telemetry::debug;
use datatypes::schema::RawSchema;
use meta_client::client::MetaClient;
use meta_client::rpc::{
CreateRequest as MetaCreateRequest, Partition as MetaPartition, RouteResponse, TableName,
TableRoute,
};
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::create::CreateTable;
use sql::statements::{
column_def_to_schema, sql_data_type_to_concrete_data_type, sql_value_to_value,
table_idents_to_full_name,
};
use sqlparser::ast::ColumnDef;
use sqlparser::ast::Value as SqlValue;
use table::metadata::RawTableMeta;
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{self, Result};
use crate::instance::{find_primary_keys, find_time_index, Instance};
use crate::partitioning::{PartitionBound, PartitionDef};
#[derive(Clone)]
pub(crate) struct DistInstance {
meta_client: Arc<MetaClient>,
catalog_manager: FrontendCatalogManager,
datanode_clients: Arc<DatanodeClients>,
}
impl DistInstance {
pub(crate) fn new(
meta_client: Arc<MetaClient>,
catalog_manager: FrontendCatalogManager,
datanode_clients: Arc<DatanodeClients>,
) -> Self {
Self {
meta_client,
catalog_manager,
datanode_clients,
}
}
pub(crate) async fn create_table(&self, create_table: &CreateTable) -> Result<Output> {
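// Creating a distributed table takes three steps:
// 1. ask Meta to allocate the table route (regions assigned to datanodes),
// 2. persist the table's global metadata into the catalog backend,
// 3. send a create-table gRPC call to every leader datanode, each carrying
//    only the regions that datanode leads.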
let response = self.create_table_in_meta(create_table).await?;
let table_routes = response.table_routes;
ensure!(
table_routes.len() == 1,
error::FindTableRoutesSnafu {
table_name: create_table.name.to_string()
}
);
let table_route = table_routes.first().unwrap();
let region_routes = &table_route.region_routes;
ensure!(
!region_routes.is_empty(),
error::FindRegionRoutesSnafu {
table_name: create_table.name.to_string()
}
);
self.put_table_global_meta(create_table, table_route)
.await?;
for datanode in table_route.find_leaders() {
let client = self.datanode_clients.get_client(&datanode).await;
let client = Admin::new("greptime", client);
let regions = table_route.find_leader_regions(&datanode);
let create_expr = Instance::create_to_expr(
Some(table_route.table.id as u32),
regions.clone(),
create_table,
)?;
debug!(
"Creating table {:?} on Datanode {:?} with regions {:?}",
create_table, datanode, regions,
);
client
.create(create_expr)
.await
.and_then(admin_result_to_output)
.context(error::InvalidAdminResultSnafu)?;
}
Ok(Output::AffectedRows(region_routes.len()))
}
async fn create_table_in_meta(&self, create_table: &CreateTable) -> Result<RouteResponse> {
let (catalog, schema, table) =
table_idents_to_full_name(&create_table.name).context(error::ParseSqlSnafu)?;
let table_name = TableName::new(catalog, schema, table);
let partitions = parse_partitions(create_table)?;
let request = MetaCreateRequest {
table_name,
partitions,
};
self.meta_client
.create_route(request)
.await
.context(error::RequestMetaSnafu)
}
// TODO(LFC): Maybe move this to FrontendCatalogManager's "register_table" method?
async fn put_table_global_meta(
&self,
create_table: &CreateTable,
table_route: &TableRoute,
) -> Result<()> {
let table_name = &table_route.table.table_name;
let key = TableGlobalKey {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
};
let value = create_table_global_value(create_table, table_route)?
.as_bytes()
.context(error::CatalogEntrySerdeSnafu)?;
self.catalog_manager
.backend()
.set(key.to_string().as_bytes(), &value)
.await
.context(error::CatalogSnafu)
}
}
fn create_table_global_value(
create_table: &CreateTable,
table_route: &TableRoute,
) -> Result<TableGlobalValue> {
let table_name = &table_route.table.table_name;
let region_routes = &table_route.region_routes;
let node_id = region_routes[0]
.leader_peer
.as_ref()
.context(error::FindLeaderPeerSnafu {
region: region_routes[0].region.id,
table_name: table_name.to_string(),
})?
.id;
let mut column_schemas = Vec::with_capacity(create_table.columns.len());
let time_index = find_time_index(&create_table.constraints)?;
for column in create_table.columns.iter() {
column_schemas.push(
column_def_to_schema(column, column.name.value == time_index)
.context(error::ParseSqlSnafu)?,
);
}
let timestamp_index = column_schemas.iter().enumerate().find_map(|(i, c)| {
if c.name == time_index {
Some(i)
} else {
None
}
});
let raw_schema = RawSchema {
column_schemas: column_schemas.clone(),
timestamp_index,
version: 0,
};
let primary_key_indices = find_primary_keys(&create_table.constraints)?
.iter()
.map(|k| {
column_schemas
.iter()
.enumerate()
.find_map(|(i, c)| if &c.name == k { Some(i) } else { None })
.unwrap() // unwrap is safe because primary key's column name must have been defined
})
.collect::<Vec<usize>>();
let meta = RawTableMeta {
schema: raw_schema,
primary_key_indices,
value_indices: vec![],
engine: create_table.engine.clone(),
next_column_id: column_schemas.len() as u32,
region_numbers: vec![],
engine_options: HashMap::new(),
options: HashMap::new(),
created_on: DateTime::default(),
};
Ok(TableGlobalValue {
id: table_route.table.id as u32,
node_id,
regions_id_map: HashMap::new(),
meta,
})
}
fn parse_partitions(create_table: &CreateTable) -> Result<Vec<MetaPartition>> {
// If partitions are not defined by the user, use the timestamp column (which must
// exist) as the partition column, and create only one partition.
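// For example, a hypothetical "CREATE TABLE t (ts BIGINT, a INT, TIME INDEX (ts))"
// with no PARTITION BY clause yields a single partition on column "ts",
// bounded only by the implicit MAXVALUE.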
let partition_columns = find_partition_columns(create_table)?;
let partition_entries = find_partition_entries(create_table, &partition_columns)?;
partition_entries
.into_iter()
.map(|x| PartitionDef::new(partition_columns.clone(), x).try_into())
.collect::<Result<Vec<MetaPartition>>>()
}
fn find_partition_entries(
create_table: &CreateTable,
partition_columns: &[String],
) -> Result<Vec<Vec<PartitionBound>>> {
let entries = if let Some(partitions) = &create_table.partitions {
let column_defs = partition_columns
.iter()
.map(|pc| {
create_table
.columns
.iter()
.find(|c| &c.name.value == pc)
// unwrap is safe here because we have checked that partition columns are defined
.unwrap()
})
.collect::<Vec<&ColumnDef>>();
let mut column_name_and_type = Vec::with_capacity(column_defs.len());
for column in column_defs {
let column_name = &column.name.value;
let data_type = sql_data_type_to_concrete_data_type(&column.data_type)
.context(error::ParseSqlSnafu)?;
column_name_and_type.push((column_name, data_type));
}
let mut entries = Vec::with_capacity(partitions.entries.len());
for e in partitions.entries.iter() {
let mut values = Vec::with_capacity(e.value_list.len());
for (i, v) in e.value_list.iter().enumerate() {
// indexing is safe here because we have checked that "value_list" and "column_list" match in size
let (column_name, data_type) = &column_name_and_type[i];
let v = match v {
SqlValue::Number(n, _) if n == "MAXVALUE" => PartitionBound::MaxValue,
_ => PartitionBound::Value(
sql_value_to_value(column_name, data_type, v)
.context(error::ParseSqlSnafu)?,
),
};
values.push(v);
}
entries.push(values);
}
entries
} else {
vec![vec![PartitionBound::MaxValue]]
};
Ok(entries)
}
fn find_partition_columns(create_table: &CreateTable) -> Result<Vec<String>> {
let columns = if let Some(partitions) = &create_table.partitions {
partitions
.column_list
.iter()
.map(|x| x.value.clone())
.collect::<Vec<String>>()
} else {
vec![find_time_index(&create_table.constraints)?]
};
Ok(columns)
}
#[cfg(test)]
mod test {
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use sqlparser::dialect::GenericDialect;
use super::*;
#[test]
fn test_parse_partitions() {
let cases = [
(
r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
PARTITION BY RANGE COLUMNS (b) (
PARTITION r0 VALUES LESS THAN ('hz'),
PARTITION r1 VALUES LESS THAN ('sh'),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
)
ENGINE=mito",
r#"[{"column_list":"b","value_list":"{\"Value\":{\"String\":\"hz\"}}"},{"column_list":"b","value_list":"{\"Value\":{\"String\":\"sh\"}}"},{"column_list":"b","value_list":"\"MaxValue\""}]"#,
),
(
r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
PARTITION BY RANGE COLUMNS (b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 10),
PARTITION r1 VALUES LESS THAN ('sh', 20),
PARTITION r2 VALUES LESS THAN (MAXVALUE, MAXVALUE),
)
ENGINE=mito",
r#"[{"column_list":"b,a","value_list":"{\"Value\":{\"String\":\"hz\"}},{\"Value\":{\"Int32\":10}}"},{"column_list":"b,a","value_list":"{\"Value\":{\"String\":\"sh\"}},{\"Value\":{\"Int32\":20}}"},{"column_list":"b,a","value_list":"\"MaxValue\",\"MaxValue\""}]"#,
),
];
for (sql, expected) in cases {
let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
match &result[0] {
Statement::CreateTable(c) => {
let partitions = parse_partitions(c).unwrap();
let json = serde_json::to_string(&partitions).unwrap();
assert_eq!(json, expected);
}
_ => unreachable!(),
}
}
}
}

View File

@@ -24,7 +24,7 @@ impl InfluxdbLineProtocolHandler for Instance {
query: &request.lines,
})?;
}
Mode::Distributed => {
Mode::Distributed(_) => {
self.dist_insert(request.try_into()?)
.await
.map_err(BoxedError::new)

View File

@@ -23,7 +23,7 @@ impl OpentsdbProtocolHandler for Instance {
data_point: format!("{:?}", data_point),
})?;
}
Mode::Distributed => {
Mode::Distributed(_) => {
self.dist_insert(vec![data_point.as_insert_request()])
.await
.map_err(BoxedError::new)

View File

@@ -107,7 +107,7 @@ impl PrometheusProtocolHandler for Instance {
msg: "failed to write prometheus remote request",
})?;
}
Mode::Distributed => {
Mode::Distributed(_) => {
let inserts = prometheus::write_request_to_insert_reqs(request)?;
self.dist_insert(inserts)

View File

@@ -1,18 +1,26 @@
mod columns;
pub(crate) mod columns;
pub(crate) mod range;
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
pub use datafusion_expr::Operator;
use datatypes::prelude::Value;
use meta_client::rpc::Partition as MetaPartition;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use crate::error::{self, Error};
pub(crate) type PartitionRuleRef<E> = Arc<dyn PartitionRule<Error = E>>;
pub trait PartitionRule: Sync + Send {
type Error: Debug;
fn as_any(&self) -> &dyn Any;
fn partition_columns(&self) -> Vec<String>;
// TODO(LFC): Unify `find_region` and `find_regions` methods when distributed read and write features are both merged into develop.
@@ -23,14 +31,92 @@ pub trait PartitionRule: Sync + Send {
}
/// The right bound (exclusive) of a partition range.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum PartitionBound {
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub(crate) enum PartitionBound {
Value(Value),
// FIXME(LFC): no allow, for clippy temporarily
#[allow(dead_code)]
MaxValue,
}
#[derive(Debug)]
pub(crate) struct PartitionDef {
partition_columns: Vec<String>,
partition_bounds: Vec<PartitionBound>,
}
impl PartitionDef {
pub(crate) fn new(
partition_columns: Vec<String>,
partition_bounds: Vec<PartitionBound>,
) -> Self {
Self {
partition_columns,
partition_bounds,
}
}
pub(crate) fn partition_columns(&self) -> &Vec<String> {
&self.partition_columns
}
pub(crate) fn partition_bounds(&self) -> &Vec<PartitionBound> {
&self.partition_bounds
}
}
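// In the wire format (MetaPartition), column names travel as raw UTF-8 bytes and
// each PartitionBound as JSON-encoded bytes; the two conversions below implement
// the two directions of that encoding.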
impl TryFrom<MetaPartition> for PartitionDef {
type Error = Error;
fn try_from(partition: MetaPartition) -> Result<Self, Self::Error> {
let MetaPartition {
column_list,
value_list,
} = partition;
let partition_columns = column_list
.into_iter()
.map(|x| String::from_utf8_lossy(&x).to_string())
.collect::<Vec<String>>();
let partition_bounds = value_list
.into_iter()
.map(|x| serde_json::from_str(&String::from_utf8_lossy(&x)))
.collect::<Result<Vec<PartitionBound>, serde_json::Error>>()
.context(error::DeserializeJsonSnafu)?;
Ok(PartitionDef {
partition_columns,
partition_bounds,
})
}
}
impl TryFrom<PartitionDef> for MetaPartition {
type Error = Error;
fn try_from(partition: PartitionDef) -> Result<Self, Self::Error> {
let PartitionDef {
partition_columns: columns,
partition_bounds: bounds,
} = partition;
let column_list = columns
.into_iter()
.map(|x| x.into_bytes())
.collect::<Vec<Vec<u8>>>();
let value_list = bounds
.into_iter()
.map(|x| serde_json::to_string(&x).map(|s| s.into_bytes()))
.collect::<Result<Vec<Vec<u8>>, serde_json::Error>>()
.context(error::SerializeJsonSnafu)?;
Ok(MetaPartition {
column_list,
value_list,
})
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct PartitionExpr {
column: String,
@@ -56,6 +142,44 @@ impl PartitionExpr {
mod tests {
use super::*;
#[test]
fn test_partition_def() {
// PartitionDef -> MetaPartition
let def = PartitionDef {
partition_columns: vec!["a".to_string(), "b".to_string()],
partition_bounds: vec![
PartitionBound::MaxValue,
PartitionBound::Value(1_i32.into()),
],
};
let partition: MetaPartition = def.try_into().unwrap();
assert_eq!(
r#"{"column_list":"a,b","value_list":"\"MaxValue\",{\"Value\":{\"Int32\":1}}"}"#,
serde_json::to_string(&partition).unwrap(),
);
// MetaPartition -> PartitionDef
let partition = MetaPartition {
column_list: vec![b"a".to_vec(), b"b".to_vec()],
value_list: vec![
b"\"MaxValue\"".to_vec(),
b"{\"Value\":{\"Int32\":1}}".to_vec(),
],
};
let def: PartitionDef = partition.try_into().unwrap();
assert_eq!(
def.partition_columns,
vec!["a".to_string(), "b".to_string()]
);
assert_eq!(
def.partition_bounds,
vec![
PartitionBound::MaxValue,
PartitionBound::Value(1_i32.into())
]
);
}
#[test]
fn test_partition_bound() {
let b1 = PartitionBound::Value(1_i32.into());

View File

@@ -1,3 +1,5 @@
use std::any::Any;
use datafusion_expr::Operator;
use datatypes::value::Value;
use snafu::ensure;
@@ -30,7 +32,7 @@ use crate::partitioning::{PartitionBound, PartitionExpr, PartitionRule};
///
/// Please refer to MySQL's ["RANGE COLUMNS Partitioning"](https://dev.mysql.com/doc/refman/8.0/en/partitioning-columns-range.html)
/// document for more details.
struct RangeColumnsPartitionRule {
pub struct RangeColumnsPartitionRule {
column_list: Vec<String>,
value_lists: Vec<Vec<PartitionBound>>,
regions: Vec<RegionNumber>,
@@ -61,9 +63,7 @@ struct RangeColumnsPartitionRule {
impl RangeColumnsPartitionRule {
// It's assured that input arguments are valid because they are checked in the SQL parsing stage.
// So we can skip validating them.
// FIXME(LFC): no allow, for clippy temporarily
#[allow(dead_code)]
fn new(
pub(crate) fn new(
column_list: Vec<String>,
value_lists: Vec<Vec<PartitionBound>>,
regions: Vec<RegionNumber>,
@@ -108,11 +108,30 @@ impl RangeColumnsPartitionRule {
first_column_regions,
}
}
#[cfg(test)]
pub(crate) fn column_list(&self) -> &Vec<String> {
&self.column_list
}
#[cfg(test)]
pub(crate) fn value_lists(&self) -> &Vec<Vec<PartitionBound>> {
&self.value_lists
}
#[cfg(test)]
pub(crate) fn regions(&self) -> &Vec<RegionNumber> {
&self.regions
}
}
impl PartitionRule for RangeColumnsPartitionRule {
type Error = Error;
fn as_any(&self) -> &dyn Any {
self
}
fn partition_columns(&self) -> Vec<String> {
self.column_list.clone()
}

View File

@@ -1,3 +1,5 @@
use std::any::Any;
use datatypes::prelude::*;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
@@ -54,8 +56,6 @@ pub struct RangePartitionRule {
}
impl RangePartitionRule {
// FIXME(LFC): no allow, for clippy temporarily
#[allow(dead_code)]
pub(crate) fn new(
column_name: impl Into<String>,
bounds: Vec<Value>,
@@ -68,24 +68,44 @@ impl RangePartitionRule {
}
}
fn column_name(&self) -> &String {
pub(crate) fn column_name(&self) -> &String {
&self.column_name
}
fn all_regions(&self) -> &Vec<RegionNumber> {
pub(crate) fn all_regions(&self) -> &Vec<RegionNumber> {
&self.regions
}
#[cfg(test)]
pub(crate) fn bounds(&self) -> &Vec<Value> {
&self.bounds
}
}
impl PartitionRule for RangePartitionRule {
type Error = Error;
fn as_any(&self) -> &dyn Any {
self
}
fn partition_columns(&self) -> Vec<String> {
vec![self.column_name().to_string()]
}
fn find_region(&self, _values: &[Value]) -> Result<RegionNumber, Self::Error> {
unimplemented!()
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Self::Error> {
debug_assert_eq!(
values.len(),
1,
"RangePartitionRule can only handle one partition value, actual {}",
values.len()
);
let value = &values[0];
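// `bounds` holds the ascending, exclusive upper bounds of all regions except the last.
// Ok(i): the value equals bounds[i]; bounds are exclusive, so it belongs to the next
// region, regions[i + 1].
// Err(i): the insertion point, i.e. the first bound greater than the value, so the
// value falls into regions[i].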
Ok(match self.bounds.binary_search(value) {
Ok(i) => self.regions[i + 1],
Err(i) => self.regions[i],
})
}
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Self::Error> {

View File

@@ -158,6 +158,7 @@ fn partition_insert_request(
#[cfg(test)]
mod tests {
use std::any::Any;
use std::{collections::HashMap, result::Result, sync::Arc};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -422,6 +423,10 @@ mod tests {
impl PartitionRule for MockPartitionRule {
type Error = Error;
fn as_any(&self) -> &dyn Any {
self
}
fn partition_columns(&self) -> Vec<String> {
vec!["id".to_string()]
}

View File

@@ -13,6 +13,7 @@ use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::Expr as DfExpr;
use datafusion::physical_plan::Partitioning;
use datatypes::prelude::Value;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use meta_client::rpc::{Peer, TableName};
use snafu::prelude::*;
@@ -26,7 +27,11 @@ use tokio::sync::RwLock;
use crate::datanode::DatanodeClients;
use crate::error::{self, Error, Result};
use crate::mock::{DatanodeInstance, TableScanPlan};
use crate::partitioning::{Operator, PartitionExpr, PartitionRuleRef};
use crate::partitioning::columns::RangeColumnsPartitionRule;
use crate::partitioning::range::RangePartitionRule;
use crate::partitioning::{
Operator, PartitionBound, PartitionDef, PartitionExpr, PartitionRuleRef,
};
use crate::spliter::WriteSpliter;
use crate::table::route::TableRoutes;
pub mod insert;
@@ -35,7 +40,6 @@ pub mod insert;
pub struct DistTable {
pub(crate) table_name: TableName,
pub(crate) schema: SchemaRef,
pub(crate) partition_rule: PartitionRuleRef<Error>,
pub(crate) table_routes: Arc<TableRoutes>,
pub(crate) datanode_clients: Arc<DatanodeClients>,
}
@@ -55,7 +59,9 @@ impl Table for DistTable {
}
async fn insert(&self, request: InsertRequest) -> table::Result<usize> {
let spliter = WriteSpliter::with_patition_rule(self.partition_rule.clone());
let partition_rule = self.find_partition_rule().await.map_err(TableError::new)?;
let spliter = WriteSpliter::with_patition_rule(partition_rule);
let inserts = spliter.split(request).map_err(TableError::new)?;
let result = match self.dist_insert(inserts).await.map_err(TableError::new)? {
client::ObjectResult::Select(_) => unreachable!(),
@@ -70,7 +76,11 @@ impl Table for DistTable {
filters: &[Expr],
limit: Option<usize>,
) -> table::Result<PhysicalPlanRef> {
let regions = self.find_regions(filters).map_err(TableError::new)?;
let partition_rule = self.find_partition_rule().await.map_err(TableError::new)?;
let regions = self
.find_regions(partition_rule, filters)
.map_err(TableError::new)?;
let datanodes = self
.find_datanodes(regions)
.await
@@ -107,11 +117,15 @@ impl Table for DistTable {
impl DistTable {
// TODO(LFC): Finding regions now seems less efficient, should be further looked into.
fn find_regions(&self, filters: &[Expr]) -> Result<Vec<RegionNumber>> {
fn find_regions(
&self,
partition_rule: PartitionRuleRef<Error>,
filters: &[Expr],
) -> Result<Vec<RegionNumber>> {
let regions = if let Some((first, rest)) = filters.split_first() {
let mut target = self.find_regions0(first)?;
let mut target = self.find_regions0(partition_rule.clone(), first)?;
for filter in rest {
let regions = self.find_regions0(filter)?;
let regions = self.find_regions0(partition_rule.clone(), filter)?;
// When all filters are provided as a collection, it often implicitly states that
// "all filters must be satisfied". So we join all the results here.
@@ -124,7 +138,7 @@ impl DistTable {
}
target.into_iter().collect::<Vec<_>>()
} else {
self.partition_rule.find_regions(&[])?
partition_rule.find_regions(&[])?
};
ensure!(
!regions.is_empty(),
@@ -139,7 +153,11 @@ impl DistTable {
// - BETWEEN and IN (maybe more)
// - expr with arithmetic like "a + 1 < 10" (should have been optimized in the logical plan?)
// - expressions that are neither comparisons nor "AND"/"OR" operations, for example "a LIKE x"
fn find_regions0(&self, filter: &Expr) -> Result<HashSet<RegionNumber>> {
fn find_regions0(
&self,
partition_rule: PartitionRuleRef<Error>,
filter: &Expr,
) -> Result<HashSet<RegionNumber>> {
let expr = filter.df_expr();
match expr {
DfExpr::BinaryExpr { left, op, right } if is_compare_op(op) => {
@@ -155,8 +173,7 @@ impl DistTable {
.clone()
.try_into()
.with_context(|_| error::ConvertScalarValueSnafu { value: sv.clone() })?;
return Ok(self
.partition_rule
return Ok(partition_rule
.find_regions(&[PartitionExpr::new(column, op, value)])?
.into_iter()
.collect::<HashSet<RegionNumber>>());
@@ -165,8 +182,10 @@ impl DistTable {
DfExpr::BinaryExpr { left, op, right }
if matches!(op, Operator::And | Operator::Or) =>
{
let left_regions = self.find_regions0(&(*left.clone()).into())?;
let right_regions = self.find_regions0(&(*right.clone()).into())?;
let left_regions =
self.find_regions0(partition_rule.clone(), &(*left.clone()).into())?;
let right_regions =
self.find_regions0(partition_rule.clone(), &(*right.clone()).into())?;
let regions = match op {
Operator::And => left_regions
.intersection(&right_regions)
@@ -184,8 +203,7 @@ impl DistTable {
}
// Returns all regions for not supported partition expr as a safety hatch.
Ok(self
.partition_rule
Ok(partition_rule
.find_regions(&[])?
.into_iter()
.collect::<HashSet<RegionNumber>>())
@@ -217,6 +235,85 @@ impl DistTable {
}
Ok(datanodes)
}
async fn find_partition_rule(&self) -> Result<PartitionRuleRef<Error>> {
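// Reconstruct the partition rule from the table's region routes: every region
// carries its serialized PartitionDef; sort the regions by partition bound, then
// build a single-column or multi-column range rule accordingly.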
let route = self.table_routes.get_route(&self.table_name).await?;
ensure!(
!route.region_routes.is_empty(),
error::FindRegionRoutesSnafu {
table_name: self.table_name.to_string()
}
);
let mut partitions = Vec::with_capacity(route.region_routes.len());
for r in route.region_routes.iter() {
let partition =
r.region
.partition
.clone()
.context(error::FindRegionPartitionSnafu {
region: r.region.id,
table_name: self.table_name.to_string(),
})?;
let partition_def: PartitionDef = partition.try_into()?;
partitions.push((r.region.id, partition_def));
}
partitions.sort_by(|a, b| a.1.partition_bounds().cmp(b.1.partition_bounds()));
ensure!(
partitions
.windows(2)
.all(|w| w[0].1.partition_columns() == w[1].1.partition_columns()),
error::IllegalTableRoutesDataSnafu {
table_name: self.table_name.to_string(),
err_msg: "partition columns of all regions are not the same"
}
);
let partition_columns = partitions[0].1.partition_columns();
ensure!(
!partition_columns.is_empty(),
error::IllegalTableRoutesDataSnafu {
table_name: self.table_name.to_string(),
err_msg: "no partition columns found"
}
);
let regions = partitions
.iter()
.map(|x| x.0 as u32)
.collect::<Vec<RegionNumber>>();
// TODO(LFC): Serializing and deserializing partition rule is ugly, must find a much more elegant way.
let partition_rule: PartitionRuleRef<Error> = match partition_columns.len() {
1 => {
// Omit the last "MAXVALUE".
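// (A range rule over n regions keeps only n - 1 ascending bounds; the dropped
// MAXVALUE is the implicit upper bound of the last region.)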
let bounds = partitions
.iter()
.filter_map(|(_, p)| match &p.partition_bounds()[0] {
PartitionBound::Value(v) => Some(v.clone()),
PartitionBound::MaxValue => None,
})
.collect::<Vec<Value>>();
Arc::new(RangePartitionRule::new(
partition_columns[0].clone(),
bounds,
regions,
)) as _
}
_ => {
let bounds = partitions
.iter()
.map(|x| x.1.partition_bounds().clone())
.collect::<Vec<Vec<PartitionBound>>>();
Arc::new(RangeColumnsPartitionRule::new(
partition_columns.clone(),
bounds,
regions,
)) as _
}
};
Ok(partition_rule)
}
}
fn project_schema(table_schema: SchemaRef, projection: &Option<Vec<usize>>) -> SchemaRef {
@@ -337,11 +434,11 @@ impl PartitionExec {
#[allow(clippy::print_stdout)]
#[cfg(test)]
mod test {
use api::v1::meta::{PutRequest, RequestHeader};
use catalog::RegisterTableRequest;
use chrono::DateTime;
use common_catalog::{TableGlobalKey, TableGlobalValue};
use common_recordbatch::{util, RecordBatch};
use api::v1::codec::InsertBatch;
use api::v1::column::SemanticType;
use api::v1::{column, insert_expr, Column, ColumnDataType};
use catalog::remote::MetaKvBackend;
use common_recordbatch::util;
use datafusion::arrow_print;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datafusion_expr::expr_fn::col;
@@ -349,24 +446,214 @@ mod test {
use datafusion_expr::lit;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::instance::Instance;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use datatypes::vectors::{Int32Vector, UInt32Vector, UInt64Vector};
use meta_client::client::MetaClientBuilder;
use meta_client::rpc::{CreateRequest, Partition};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::rpc::router::RegionRoute;
use meta_client::rpc::{Region, Table, TableRoute};
use meta_srv::metasrv::MetaSrvOptions;
use meta_srv::mocks::MockInfo;
use meta_srv::service::store::kv::KvStoreRef;
use meta_srv::service::store::memory::MemStore;
use table::metadata::RawTableMeta;
use table::test_util::MemTable;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use sqlparser::dialect::GenericDialect;
use table::TableRef;
use tempdir::TempDir;
use super::*;
use crate::catalog::FrontendCatalogManager;
use crate::instance::distributed::DistInstance;
use crate::partitioning::range::RangePartitionRule;
#[tokio::test(flavor = "multi_thread")]
async fn test_find_partition_rule() {
let table_name = TableName::new("greptime", "public", "foo");
let column_schemas = vec![
ColumnSchema::new("ts", ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
];
let schema = Arc::new(Schema::new(column_schemas.clone()));
let table_routes = Arc::new(TableRoutes::new(Arc::new(MetaClient::default())));
let table = DistTable {
table_name: table_name.clone(),
schema,
table_routes: table_routes.clone(),
datanode_clients: Arc::new(DatanodeClients::new()),
};
let table_route = TableRoute {
table: Table {
id: 1,
table_name: table_name.clone(),
table_schema: vec![],
},
region_routes: vec![
RegionRoute {
region: Region {
id: 3,
name: "r1".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string()],
vec![PartitionBound::Value(10_i32.into())],
)
.try_into()
.unwrap(),
),
attrs: HashMap::new(),
},
leader_peer: None,
follower_peers: vec![],
},
RegionRoute {
region: Region {
id: 2,
name: "r2".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string()],
vec![PartitionBound::Value(50_i32.into())],
)
.try_into()
.unwrap(),
),
attrs: HashMap::new(),
},
leader_peer: None,
follower_peers: vec![],
},
RegionRoute {
region: Region {
id: 1,
name: "r3".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string()],
vec![PartitionBound::MaxValue],
)
.try_into()
.unwrap(),
),
attrs: HashMap::new(),
},
leader_peer: None,
follower_peers: vec![],
},
],
};
table_routes
.insert_table_route(table_name.clone(), Arc::new(table_route))
.await;
let partition_rule = table.find_partition_rule().await.unwrap();
let range_rule = partition_rule
.as_any()
.downcast_ref::<RangePartitionRule>()
.unwrap();
assert_eq!(range_rule.column_name(), "a");
assert_eq!(range_rule.all_regions(), &vec![3, 2, 1]);
assert_eq!(range_rule.bounds(), &vec![10_i32.into(), 50_i32.into()]);
let table_route = TableRoute {
table: Table {
id: 1,
table_name: table_name.clone(),
table_schema: vec![],
},
region_routes: vec![
RegionRoute {
region: Region {
id: 1,
name: "r1".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string(), "b".to_string()],
vec![
PartitionBound::Value(10_i32.into()),
PartitionBound::Value("hz".into()),
],
)
.try_into()
.unwrap(),
),
attrs: HashMap::new(),
},
leader_peer: None,
follower_peers: vec![],
},
RegionRoute {
region: Region {
id: 2,
name: "r2".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string(), "b".to_string()],
vec![
PartitionBound::Value(50_i32.into()),
PartitionBound::Value("sh".into()),
],
)
.try_into()
.unwrap(),
),
attrs: HashMap::new(),
},
leader_peer: None,
follower_peers: vec![],
},
RegionRoute {
region: Region {
id: 3,
name: "r3".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string(), "b".to_string()],
vec![PartitionBound::MaxValue, PartitionBound::MaxValue],
)
.try_into()
.unwrap(),
),
attrs: HashMap::new(),
},
leader_peer: None,
follower_peers: vec![],
},
],
};
table_routes
.insert_table_route(table_name.clone(), Arc::new(table_route))
.await;
let partition_rule = table.find_partition_rule().await.unwrap();
let range_columns_rule = partition_rule
.as_any()
.downcast_ref::<RangeColumnsPartitionRule>()
.unwrap();
assert_eq!(range_columns_rule.column_list(), &vec!["a", "b"]);
assert_eq!(
range_columns_rule.value_lists(),
&vec![
vec![
PartitionBound::Value(10_i32.into()),
PartitionBound::Value("hz".into()),
],
vec![
PartitionBound::Value(50_i32.into()),
PartitionBound::Value("sh".into()),
],
vec![PartitionBound::MaxValue, PartitionBound::MaxValue]
]
);
assert_eq!(range_columns_rule.regions(), &vec![1, 2, 3]);
}
#[tokio::test(flavor = "multi_thread")]
// FIXME(LFC): Remove ignore when auto create table upon insertion is ready.
#[ignore]
async fn test_dist_table_scan() {
common_telemetry::init_default_ut_logging();
let table = Arc::new(new_dist_table().await);
@@ -452,28 +739,21 @@ mod test {
];
let schema = Arc::new(Schema::new(column_schemas.clone()));
// PARTITION BY RANGE (a) (
// PARTITION r1 VALUES LESS THAN (10),
// PARTITION r2 VALUES LESS THAN (20),
// PARTITION r3 VALUES LESS THAN (50),
// PARTITION r4 VALUES LESS THAN (MAXVALUE),
// )
let partition_rule = RangePartitionRule::new(
"a",
vec![10_i32.into(), 20_i32.into(), 50_i32.into()],
vec![0_u32, 1, 2, 3],
);
let kv_store: KvStoreRef = Arc::new(MemStore::default()) as _;
let meta_srv =
meta_srv::mocks::mock(MetaSrvOptions::default(), kv_store.clone(), None).await;
let datanode_clients = Arc::new(DatanodeClients::new());
let mut datanode_instances = HashMap::new();
for datanode_id in 1..=4 {
datanode_instances.insert(
datanode_id,
create_datanode_instance(datanode_id, meta_srv.clone()).await,
);
let dn_instance = create_datanode_instance(datanode_id, meta_srv.clone()).await;
datanode_instances.insert(datanode_id, dn_instance.clone());
let (addr, client) = crate::tests::create_datanode_client(dn_instance).await;
datanode_clients
.insert_client(Peer::new(datanode_id, addr), client)
.await;
}
let MockInfo {
@@ -489,29 +769,47 @@ mod test {
let meta_client = Arc::new(meta_client);
let table_name = TableName::new("greptime", "public", "dist_numbers");
let create_request = CreateRequest {
table_name: table_name.clone(),
partitions: vec![
Partition {
column_list: vec![b"a".to_vec()],
value_list: vec![b"10".to_vec()],
},
Partition {
column_list: vec![b"a".to_vec()],
value_list: vec![b"20".to_vec()],
},
Partition {
column_list: vec![b"a".to_vec()],
value_list: vec![b"50".to_vec()],
},
Partition {
column_list: vec![b"a".to_vec()],
value_list: vec![b"MAXVALUE".to_vec()],
},
],
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
let catalog_manager = FrontendCatalogManager::new(
meta_backend,
table_routes.clone(),
datanode_clients.clone(),
);
let dist_instance = DistInstance::new(
meta_client.clone(),
catalog_manager,
datanode_clients.clone(),
);
let sql = "
CREATE TABLE greptime.public.dist_numbers (
ts BIGINT,
a INT,
row_id INT,
TIME INDEX (ts),
)
PARTITION BY RANGE COLUMNS (a) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (20),
PARTITION r2 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)
ENGINE=mito";
let create_table = match ParserContext::create_with_dialect(sql, &GenericDialect {})
.unwrap()
.pop()
.unwrap()
{
Statement::CreateTable(c) => c,
_ => unreachable!(),
};
let mut route_response = meta_client.create_route(create_request).await.unwrap();
let table_route = route_response.table_routes.remove(0);
let _result = dist_instance.create_table(&create_table).await.unwrap();
let table_route = table_routes.get_route(&table_name).await.unwrap();
println!("{}", serde_json::to_string_pretty(&table_route).unwrap());
let mut region_to_datanode_mapping = HashMap::new();
@@ -521,50 +819,6 @@ mod test {
region_to_datanode_mapping.insert(region_id, datanode_id);
}
let table_global_key = TableGlobalKey {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
};
let table_global_value = TableGlobalValue {
id: table_route.table.id as u32,
node_id: table_route
.region_routes
.first()
.unwrap()
.leader_peer
.as_ref()
.unwrap()
.id,
regions_id_map: HashMap::new(),
meta: RawTableMeta {
schema: RawSchema {
column_schemas: column_schemas.clone(),
timestamp_index: Some(0),
version: 0,
},
primary_key_indices: vec![],
value_indices: vec![],
engine: "".to_string(),
next_column_id: column_schemas.len() as u32,
region_numbers: vec![],
engine_options: HashMap::new(),
options: HashMap::new(),
created_on: DateTime::default(),
},
partition_rules: serde_json::to_string(&partition_rule).unwrap(),
};
let _put_response = kv_store
.put(PutRequest {
header: Some(RequestHeader::new((1000, 0))),
key: table_global_key.to_string().as_bytes().to_vec(),
value: table_global_value.as_bytes().unwrap(),
prev_kv: true,
})
.await
.unwrap();
let datanode_clients = Arc::new(DatanodeClients::new());
let mut global_start_ts = 1;
let regional_numbers = vec![
(0, (0..5).collect::<Vec<i32>>()),
@@ -577,45 +831,70 @@ mod test {
let instance = datanode_instances.get(&datanode_id).unwrap().clone();
let start_ts = global_start_ts;
global_start_ts += numbers.len() as u64;
global_start_ts += numbers.len() as i64;
let table = new_memtable(schema.clone(), numbers, vec![region_id], start_ts);
register_datanode_table(instance.clone(), table).await;
let (addr, client) = crate::tests::create_datanode_client(instance).await;
datanode_clients
.insert_client(Peer::new(datanode_id, addr), client)
.await;
insert_testing_data(&table_name, instance.clone(), numbers, start_ts).await;
}
DistTable {
table_name,
schema,
partition_rule: Arc::new(partition_rule),
table_routes: Arc::new(TableRoutes::new(meta_client)),
table_routes,
datanode_clients,
}
}
fn new_memtable(
schema: SchemaRef,
async fn insert_testing_data(
table_name: &TableName,
dn_instance: Arc<Instance>,
data: Vec<i32>,
regions: Vec<RegionNumber>,
start_ts: u64,
) -> MemTable {
start_ts: i64,
) {
let rows = data.len() as u32;
let columns: Vec<VectorRef> = vec![
// column "ts"
Arc::new(UInt64Vector::from_slice(
(start_ts..start_ts + rows as u64).collect::<Vec<u64>>(),
)),
// column "a"
Arc::new(Int32Vector::from_slice(data)),
// column "row_id"
Arc::new(UInt32Vector::from_slice((1..=rows).collect::<Vec<u32>>())),
];
let recordbatch = RecordBatch::new(schema, columns).unwrap();
MemTable::new_with_region("dist_numbers", recordbatch, regions)
let values = vec![InsertBatch {
columns: vec![
Column {
column_name: "ts".to_string(),
values: Some(column::Values {
i64_values: (start_ts..start_ts + rows as i64).collect::<Vec<i64>>(),
..Default::default()
}),
datatype: ColumnDataType::Int64 as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
},
Column {
column_name: "a".to_string(),
values: Some(column::Values {
i32_values: data,
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
Column {
column_name: "row_id".to_string(),
values: Some(column::Values {
i32_values: (1..=rows as i32).collect::<Vec<i32>>(),
..Default::default()
}),
datatype: ColumnDataType::Int32 as i32,
..Default::default()
},
],
row_count: rows,
}
.into()];
let values = insert_expr::Values { values };
dn_instance
.execute_grpc_insert(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
values,
)
.await
.unwrap();
}
async fn create_datanode_instance(datanode_id: u64, meta_srv: MockInfo) -> Arc<Instance> {
@@ -642,25 +921,36 @@ mod test {
instance
}
async fn register_datanode_table(instance: Arc<Instance>, table: MemTable) {
let catalog_manager = instance.catalog_manager().clone();
let _ = catalog_manager
.register_table(RegisterTableRequest {
catalog: "greptime".to_string(),
schema: "public".to_string(),
table_name: table.table_name().to_string(),
table_id: 1234,
table: Arc::new(table),
})
.await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_find_regions() {
let table = new_dist_table().await;
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"a",
ConcreteDataType::int32_datatype(),
true,
)]));
let table = DistTable {
table_name: TableName::new("greptime", "public", "foo"),
schema,
table_routes: Arc::new(TableRoutes::new(Arc::new(MetaClient::default()))),
datanode_clients: Arc::new(DatanodeClients::new()),
};
// PARTITION BY RANGE (a) (
// PARTITION r1 VALUES LESS THAN (10),
// PARTITION r2 VALUES LESS THAN (20),
// PARTITION r3 VALUES LESS THAN (50),
// PARTITION r4 VALUES LESS THAN (MAXVALUE),
// )
let partition_rule: PartitionRuleRef<Error> = Arc::new(RangePartitionRule::new(
"a",
vec![10_i32.into(), 20_i32.into(), 50_i32.into()],
vec![0_u32, 1, 2, 3],
)) as _;
let test = |filters: Vec<Expr>, expect_regions: Vec<RegionNumber>| {
let mut regions = table.find_regions(filters.as_slice()).unwrap();
let mut regions = table
.find_regions(partition_rule.clone(), filters.as_slice())
.unwrap();
regions.sort();
assert_eq!(regions, expect_regions);
@@ -750,6 +1040,7 @@ mod test {
// test failed to find regions by contradictory filters
let regions = table.find_regions(
partition_rule,
vec![and(
binary_expr(col("a"), Operator::Lt, lit(20)),
binary_expr(col("a"), Operator::GtEq, lit(20)),

View File

@@ -53,4 +53,13 @@ impl TableRoutes {
let route = resp.table_routes.swap_remove(0);
Ok(Arc::new(route))
}
#[cfg(test)]
pub(crate) async fn insert_table_route(
&self,
table_name: TableName,
table_route: Arc<TableRoute>,
) {
self.cache.insert(table_name, table_route).await
}
}

View File

@@ -1,4 +1,4 @@
mod router;
pub mod router;
mod store;
pub mod util;

View File

@@ -137,10 +137,35 @@ pub struct TableRoute {
pub region_routes: Vec<RegionRoute>,
}
impl TableRoute {
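/// Returns the leader peer of every region in this table's routes.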
pub fn find_leaders(&self) -> Vec<Peer> {
self.region_routes
.iter()
.flat_map(|x| &x.leader_peer)
.cloned()
.collect::<Vec<Peer>>()
}
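/// Returns the ids of the regions whose leader is the given datanode.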
pub fn find_leader_regions(&self, datanode: &Peer) -> Vec<u32> {
self.region_routes
.iter()
.filter_map(|x| {
if let Some(peer) = &x.leader_peer {
if peer == datanode {
return Some(x.region.id as u32);
}
}
None
})
.collect::<Vec<u32>>()
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Table {
pub id: u64,
pub table_name: TableName,
#[serde(serialize_with = "as_utf8")]
pub table_schema: Vec<u8>,
}
@@ -190,13 +215,24 @@ impl From<PbRegion> for Region {
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Partition {
#[serde(serialize_with = "as_utf8")]
#[serde(serialize_with = "as_utf8_vec")]
pub column_list: Vec<Vec<u8>>,
#[serde(serialize_with = "as_utf8")]
#[serde(serialize_with = "as_utf8_vec")]
pub value_list: Vec<Vec<u8>>,
}
fn as_utf8<S: Serializer>(val: &[Vec<u8>], serializer: S) -> std::result::Result<S::Ok, S::Error> {
fn as_utf8<S: Serializer>(val: &[u8], serializer: S) -> std::result::Result<S::Ok, S::Error> {
serializer.serialize_str(
String::from_utf8(val.to_vec())
.unwrap_or_else(|_| "<unknown-not-UTF8>".to_string())
.as_str(),
)
}
fn as_utf8_vec<S: Serializer>(
val: &[Vec<u8>],
serializer: S,
) -> std::result::Result<S::Ok, S::Error> {
serializer.serialize_str(
val.iter()
.map(|v| {