mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-18 14:00:39 +00:00
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -2000,6 +2000,7 @@ dependencies = [
|
||||
"catalog",
|
||||
"client",
|
||||
"common-base",
|
||||
"common-catalog",
|
||||
"common-error",
|
||||
"common-grpc",
|
||||
"common-query",
|
||||
@@ -2014,10 +2015,12 @@ dependencies = [
|
||||
"datatypes",
|
||||
"futures",
|
||||
"itertools",
|
||||
"meta-client",
|
||||
"openmetrics-parser",
|
||||
"prost 0.11.0",
|
||||
"query",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"servers",
|
||||
"snafu",
|
||||
"sql",
|
||||
|
||||
@@ -9,6 +9,6 @@ message InsertBatch {
|
||||
uint32 row_count = 2;
|
||||
}
|
||||
|
||||
message RegionId {
|
||||
uint64 id = 1;
|
||||
message RegionNumber {
|
||||
uint32 id = 1;
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
pub use prost::DecodeError;
|
||||
use prost::Message;
|
||||
|
||||
use crate::v1::codec::{InsertBatch, PhysicalPlanNode, RegionId, SelectResult};
|
||||
use crate::v1::codec::{InsertBatch, PhysicalPlanNode, RegionNumber, SelectResult};
|
||||
|
||||
macro_rules! impl_convert_with_bytes {
|
||||
($data_type: ty) => {
|
||||
@@ -24,7 +24,7 @@ macro_rules! impl_convert_with_bytes {
|
||||
impl_convert_with_bytes!(InsertBatch);
|
||||
impl_convert_with_bytes!(SelectResult);
|
||||
impl_convert_with_bytes!(PhysicalPlanNode);
|
||||
impl_convert_with_bytes!(RegionId);
|
||||
impl_convert_with_bytes!(RegionNumber);
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
@@ -130,10 +130,10 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_convert_region_id() {
|
||||
let region_id = RegionId { id: 12 };
|
||||
let region_id = RegionNumber { id: 12 };
|
||||
|
||||
let bytes: Vec<u8> = region_id.into();
|
||||
let region_id: RegionId = bytes.deref().try_into().unwrap();
|
||||
let region_id: RegionNumber = bytes.deref().try_into().unwrap();
|
||||
|
||||
assert_eq!(12, region_id.id);
|
||||
}
|
||||
|
||||
@@ -155,6 +155,25 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Failed to parse table id from metasrv, data: {:?}", data))]
|
||||
ParseTableId { data: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Failed to deserialize partition rule from string: {:?}", data))]
|
||||
DeserializePartitionRule {
|
||||
data: String,
|
||||
source: serde_json::error::Error,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid table schema in catalog, source: {:?}", source))]
|
||||
InvalidSchemaInCatalog {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Catalog internal error: {}", source))]
|
||||
Internal {
|
||||
#[snafu(backtrace)]
|
||||
source: BoxedError,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -194,6 +213,9 @@ impl ErrorExt for Error {
|
||||
Error::BumpTableId { .. } | Error::ParseTableId { .. } => {
|
||||
StatusCode::StorageUnavailable
|
||||
}
|
||||
Error::DeserializePartitionRule { .. } => StatusCode::Unexpected,
|
||||
Error::InvalidSchemaInCatalog { .. } => StatusCode::Unexpected,
|
||||
Error::Internal { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,5 +3,5 @@ pub mod memory;
|
||||
|
||||
pub use manager::LocalCatalogManager;
|
||||
pub use memory::{
|
||||
new_memory_catalog_list, MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider,
|
||||
new_memory_catalog_list, MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider,
|
||||
};
|
||||
|
||||
@@ -25,7 +25,7 @@ use crate::error::{
|
||||
SchemaNotFoundSnafu, SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu, TableExistsSnafu,
|
||||
TableNotFoundSnafu,
|
||||
};
|
||||
use crate::local::memory::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use crate::system::{
|
||||
decode_system_catalog, Entry, SystemCatalogTable, TableEntry, ENTRY_TYPE_INDEX, KEY_INDEX,
|
||||
VALUE_INDEX,
|
||||
@@ -40,7 +40,7 @@ use crate::{
|
||||
/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
|
||||
pub struct LocalCatalogManager {
|
||||
system: Arc<SystemCatalog>,
|
||||
catalogs: Arc<MemoryCatalogList>,
|
||||
catalogs: Arc<MemoryCatalogManager>,
|
||||
engine: TableEngineRef,
|
||||
next_table_id: AtomicU32,
|
||||
init_lock: Mutex<bool>,
|
||||
|
||||
@@ -1,23 +1,94 @@
|
||||
use std::any::Any;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
|
||||
use common_catalog::consts::MIN_USER_TABLE_ID;
|
||||
use snafu::OptionExt;
|
||||
use table::metadata::TableId;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{Result, TableExistsSnafu};
|
||||
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
|
||||
use crate::schema::SchemaProvider;
|
||||
use crate::{CatalogList, CatalogProvider, CatalogProviderRef, SchemaProviderRef};
|
||||
use crate::{
|
||||
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, RegisterSystemTableRequest,
|
||||
RegisterTableRequest, SchemaProviderRef,
|
||||
};
|
||||
|
||||
/// Simple in-memory list of catalogs
|
||||
#[derive(Default)]
|
||||
pub struct MemoryCatalogList {
|
||||
pub struct MemoryCatalogManager {
|
||||
/// Collection of catalogs containing schemas and ultimately Tables
|
||||
pub catalogs: RwLock<HashMap<String, CatalogProviderRef>>,
|
||||
pub table_id: AtomicU32,
|
||||
}
|
||||
|
||||
impl MemoryCatalogList {
|
||||
impl Default for MemoryCatalogManager {
|
||||
fn default() -> Self {
|
||||
let manager = Self {
|
||||
table_id: AtomicU32::new(MIN_USER_TABLE_ID),
|
||||
catalogs: Default::default(),
|
||||
};
|
||||
let default_catalog = Arc::new(MemoryCatalogProvider::new());
|
||||
manager
|
||||
.register_catalog("greptime".to_string(), default_catalog.clone())
|
||||
.unwrap();
|
||||
default_catalog
|
||||
.register_schema("public".to_string(), Arc::new(MemorySchemaProvider::new()))
|
||||
.unwrap();
|
||||
manager
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl CatalogManager for MemoryCatalogManager {
|
||||
async fn start(&self) -> Result<()> {
|
||||
self.table_id.store(MIN_USER_TABLE_ID, Ordering::Relaxed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn next_table_id(&self) -> Result<TableId> {
|
||||
Ok(self.table_id.fetch_add(1, Ordering::Relaxed))
|
||||
}
|
||||
|
||||
async fn register_table(&self, request: RegisterTableRequest) -> Result<usize> {
|
||||
let catalogs = self.catalogs.write().unwrap();
|
||||
let catalog = catalogs
|
||||
.get(&request.catalog)
|
||||
.context(CatalogNotFoundSnafu {
|
||||
catalog_name: &request.catalog,
|
||||
})?
|
||||
.clone();
|
||||
let schema = catalog
|
||||
.schema(&request.schema)?
|
||||
.with_context(|| SchemaNotFoundSnafu {
|
||||
schema_info: format!("{}.{}", &request.catalog, &request.schema),
|
||||
})?;
|
||||
schema
|
||||
.register_table(request.table_name, request.table)
|
||||
.map(|v| if v.is_some() { 0 } else { 1 })
|
||||
}
|
||||
|
||||
async fn register_system_table(&self, _request: RegisterSystemTableRequest) -> Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn table(&self, catalog: &str, schema: &str, table_name: &str) -> Result<Option<TableRef>> {
|
||||
let c = self.catalogs.read().unwrap();
|
||||
let catalog = if let Some(c) = c.get(catalog) {
|
||||
c.clone()
|
||||
} else {
|
||||
return Ok(None);
|
||||
};
|
||||
match catalog.schema(schema)? {
|
||||
None => Ok(None),
|
||||
Some(s) => s.table(table_name),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MemoryCatalogManager {
|
||||
/// Registers a catalog and return `None` if no catalog with the same name was already
|
||||
/// registered, or `Some` with the previously registered catalog.
|
||||
pub fn register_catalog_if_absent(
|
||||
@@ -37,7 +108,7 @@ impl MemoryCatalogList {
|
||||
}
|
||||
}
|
||||
|
||||
impl CatalogList for MemoryCatalogList {
|
||||
impl CatalogList for MemoryCatalogManager {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
@@ -162,8 +233,8 @@ impl SchemaProvider for MemorySchemaProvider {
|
||||
}
|
||||
|
||||
/// Create a memory catalog list contains a numbers table for test
|
||||
pub fn new_memory_catalog_list() -> Result<Arc<MemoryCatalogList>> {
|
||||
Ok(Arc::new(MemoryCatalogList::default()))
|
||||
pub fn new_memory_catalog_list() -> Result<Arc<MemoryCatalogManager>> {
|
||||
Ok(Arc::new(MemoryCatalogManager::default()))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -178,23 +249,11 @@ mod tests {
|
||||
#[test]
|
||||
fn test_new_memory_catalog_list() {
|
||||
let catalog_list = new_memory_catalog_list().unwrap();
|
||||
let default_catalog = catalog_list.catalog(DEFAULT_CATALOG_NAME).unwrap().unwrap();
|
||||
|
||||
assert!(catalog_list
|
||||
.catalog(DEFAULT_CATALOG_NAME)
|
||||
.unwrap()
|
||||
.is_none());
|
||||
let default_catalog = Arc::new(MemoryCatalogProvider::default());
|
||||
catalog_list
|
||||
.register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog.clone())
|
||||
.unwrap();
|
||||
|
||||
assert!(default_catalog
|
||||
let default_schema = default_catalog
|
||||
.schema(DEFAULT_SCHEMA_NAME)
|
||||
.unwrap()
|
||||
.is_none());
|
||||
let default_schema = Arc::new(MemorySchemaProvider::default());
|
||||
default_catalog
|
||||
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema.clone())
|
||||
.unwrap();
|
||||
|
||||
default_schema
|
||||
@@ -203,7 +262,6 @@ mod tests {
|
||||
|
||||
let table = default_schema.table("numbers").unwrap();
|
||||
assert!(table.is_some());
|
||||
|
||||
assert!(default_schema.table("not_exists").unwrap().is_none());
|
||||
}
|
||||
|
||||
@@ -229,7 +287,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
pub fn test_register_if_absent() {
|
||||
let list = MemoryCatalogList::default();
|
||||
let list = MemoryCatalogManager::default();
|
||||
assert!(list
|
||||
.register_catalog_if_absent(
|
||||
"test_catalog".to_string(),
|
||||
@@ -241,6 +299,8 @@ mod tests {
|
||||
Arc::new(MemoryCatalogProvider::new()),
|
||||
)
|
||||
.unwrap();
|
||||
list.as_any().downcast_ref::<MemoryCatalogList>().unwrap();
|
||||
list.as_any()
|
||||
.downcast_ref::<MemoryCatalogManager>()
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -312,6 +312,7 @@ fn build_schema_for_tables() -> Schema {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_query::physical_plan::RuntimeEnv;
|
||||
use datatypes::arrow::array::Utf8Array;
|
||||
use datatypes::arrow::datatypes::DataType;
|
||||
@@ -319,32 +320,30 @@ mod tests {
|
||||
use table::table::numbers::NumbersTable;
|
||||
|
||||
use super::*;
|
||||
use crate::local::memory::{
|
||||
new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider,
|
||||
};
|
||||
use crate::local::memory::new_memory_catalog_list;
|
||||
use crate::CatalogList;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tables() {
|
||||
let catalog_list = new_memory_catalog_list().unwrap();
|
||||
let catalog_provider = Arc::new(MemoryCatalogProvider::default());
|
||||
let schema = Arc::new(MemorySchemaProvider::new());
|
||||
let schema = catalog_list
|
||||
.catalog(DEFAULT_CATALOG_NAME)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.schema(DEFAULT_SCHEMA_NAME)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
schema
|
||||
.register_table("test_table".to_string(), Arc::new(NumbersTable::default()))
|
||||
.unwrap();
|
||||
catalog_provider
|
||||
.register_schema("test_schema".to_string(), schema)
|
||||
.unwrap();
|
||||
catalog_list
|
||||
.register_catalog("test_catalog".to_string(), catalog_provider)
|
||||
.unwrap();
|
||||
let tables = Tables::new(catalog_list, "test_engine".to_string());
|
||||
|
||||
let tables = Tables::new(catalog_list, "test_engine".to_string());
|
||||
let tables_stream = tables.scan(&None, &[], None).await.unwrap();
|
||||
let mut tables_stream = tables_stream
|
||||
.execute(0, Arc::new(RuntimeEnv::default()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
if let Some(t) = tables_stream.next().await {
|
||||
let batch = t.unwrap().df_recordbatch;
|
||||
assert_eq!(1, batch.num_rows());
|
||||
@@ -354,7 +353,7 @@ mod tests {
|
||||
assert_eq!(&DataType::Utf8, batch.column(2).data_type());
|
||||
assert_eq!(&DataType::Utf8, batch.column(3).data_type());
|
||||
assert_eq!(
|
||||
"test_catalog",
|
||||
"greptime",
|
||||
batch
|
||||
.column(0)
|
||||
.as_any()
|
||||
@@ -364,7 +363,7 @@ mod tests {
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
"test_schema",
|
||||
"public",
|
||||
batch
|
||||
.column(1)
|
||||
.as_any()
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::ops::Deref;
|
||||
|
||||
use api::v1::codec::RegionId;
|
||||
use api::v1::codec::RegionNumber;
|
||||
use api::v1::{
|
||||
admin_expr, codec::InsertBatch, insert_expr, object_expr, select_expr, AdminExpr, AdminResult,
|
||||
ObjectExpr, ObjectResult, SelectExpr,
|
||||
@@ -209,13 +209,13 @@ impl GrpcQueryHandler for Instance {
|
||||
})?;
|
||||
|
||||
// TODO(fys): _region_id is for later use.
|
||||
let _region_id: Option<RegionId> = insert_expr
|
||||
let _region_id: Option<RegionNumber> = insert_expr
|
||||
.options
|
||||
.get("region_id")
|
||||
.map(|id| {
|
||||
id.deref()
|
||||
.try_into()
|
||||
.context(servers::error::DecodeRegionIdSnafu)
|
||||
.context(servers::error::DecodeRegionNumberSnafu)
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ common-error = { path = "../common/error" }
|
||||
common-grpc = { path = "../common/grpc" }
|
||||
common-query = { path = "../common/query" }
|
||||
common-recordbatch = { path = "../common/recordbatch" }
|
||||
common-catalog = { path = "../common/catalog" }
|
||||
common-runtime = { path = "../common/runtime" }
|
||||
common-telemetry = { path = "../common/telemetry" }
|
||||
common-time = { path = "../common/time" }
|
||||
@@ -21,11 +22,13 @@ datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch =
|
||||
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
|
||||
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
|
||||
datatypes = { path = "../datatypes" }
|
||||
futures = "0.3"
|
||||
itertools = "0.10"
|
||||
openmetrics-parser = "0.4"
|
||||
prost = "0.11"
|
||||
query = { path = "../query" }
|
||||
serde = "1.0"
|
||||
serde_json = "1.0"
|
||||
sqlparser = "0.15"
|
||||
servers = { path = "../servers" }
|
||||
snafu = { version = "0.7", features = ["backtraces"] }
|
||||
@@ -33,6 +36,7 @@ sql = { path = "../sql" }
|
||||
store-api = { path = "../store-api" }
|
||||
table = { path = "../table" }
|
||||
tokio = { version = "1.18", features = ["full"] }
|
||||
meta-client = {path = "../meta-client"}
|
||||
|
||||
[dependencies.arrow]
|
||||
package = "arrow2"
|
||||
|
||||
270
src/frontend/src/catalog.rs
Normal file
270
src/frontend/src/catalog.rs
Normal file
@@ -0,0 +1,270 @@
|
||||
use std::any::Any;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::error::{
|
||||
DeserializePartitionRuleSnafu, InvalidCatalogValueSnafu, InvalidSchemaInCatalogSnafu,
|
||||
};
|
||||
use catalog::remote::{Kv, KvBackendRef};
|
||||
use catalog::{
|
||||
CatalogList, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef,
|
||||
};
|
||||
use common_catalog::{CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue};
|
||||
use common_error::ext::BoxedError;
|
||||
use futures::StreamExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::TableRef;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::error::DatanodeNotAvailableSnafu;
|
||||
use crate::mock::{DatanodeId, DatanodeInstance};
|
||||
use crate::partitioning::range::RangePartitionRule;
|
||||
use crate::table::DistTable;
|
||||
|
||||
pub type DatanodeInstances = HashMap<DatanodeId, DatanodeInstance>;
|
||||
|
||||
pub struct FrontendCatalogManager {
|
||||
backend: KvBackendRef,
|
||||
datanode_instances: Arc<RwLock<DatanodeInstances>>,
|
||||
}
|
||||
|
||||
impl FrontendCatalogManager {
|
||||
#[allow(dead_code)]
|
||||
pub fn new(backend: KvBackendRef, datanode_instances: Arc<RwLock<DatanodeInstances>>) -> Self {
|
||||
Self {
|
||||
backend,
|
||||
datanode_instances,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CatalogList for FrontendCatalogManager {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn register_catalog(
|
||||
&self,
|
||||
_name: String,
|
||||
_catalog: CatalogProviderRef,
|
||||
) -> catalog::error::Result<Option<CatalogProviderRef>> {
|
||||
unimplemented!("Frontend catalog list does not support register catalog")
|
||||
}
|
||||
|
||||
fn catalog_names(&self) -> catalog::error::Result<Vec<String>> {
|
||||
let backend = self.backend.clone();
|
||||
let res = std::thread::spawn(|| {
|
||||
common_runtime::block_on_read(async move {
|
||||
let key = common_catalog::build_catalog_prefix();
|
||||
let mut iter = backend.range(key.as_bytes());
|
||||
let mut res = HashSet::new();
|
||||
|
||||
while let Some(r) = iter.next().await {
|
||||
let Kv(k, _) = r?;
|
||||
let key = CatalogKey::parse(String::from_utf8_lossy(&k))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
res.insert(key.catalog_name);
|
||||
}
|
||||
Ok(res.into_iter().collect())
|
||||
})
|
||||
})
|
||||
.join()
|
||||
.unwrap();
|
||||
res
|
||||
}
|
||||
|
||||
fn catalog(&self, name: &str) -> catalog::error::Result<Option<CatalogProviderRef>> {
|
||||
let all_catalogs = self.catalog_names()?;
|
||||
if all_catalogs.contains(&name.to_string()) {
|
||||
Ok(Some(Arc::new(FrontendCatalogProvider {
|
||||
catalog_name: name.to_string(),
|
||||
backend: self.backend.clone(),
|
||||
datanode_instances: self.datanode_instances.clone(),
|
||||
})))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FrontendCatalogProvider {
|
||||
catalog_name: String,
|
||||
backend: KvBackendRef,
|
||||
datanode_instances: Arc<RwLock<DatanodeInstances>>,
|
||||
}
|
||||
|
||||
impl CatalogProvider for FrontendCatalogProvider {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn schema_names(&self) -> catalog::error::Result<Vec<String>> {
|
||||
let backend = self.backend.clone();
|
||||
let catalog_name = self.catalog_name.clone();
|
||||
let res = std::thread::spawn(|| {
|
||||
common_runtime::block_on_read(async move {
|
||||
let key = common_catalog::build_schema_prefix(&catalog_name);
|
||||
let mut iter = backend.range(key.as_bytes());
|
||||
let mut res = HashSet::new();
|
||||
|
||||
while let Some(r) = iter.next().await {
|
||||
let Kv(k, _) = r?;
|
||||
let key = SchemaKey::parse(String::from_utf8_lossy(&k))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
res.insert(key.schema_name);
|
||||
}
|
||||
Ok(res.into_iter().collect())
|
||||
})
|
||||
})
|
||||
.join()
|
||||
.unwrap();
|
||||
res
|
||||
}
|
||||
|
||||
fn register_schema(
|
||||
&self,
|
||||
_name: String,
|
||||
_schema: SchemaProviderRef,
|
||||
) -> catalog::error::Result<Option<SchemaProviderRef>> {
|
||||
unimplemented!("Frontend catalog provider does not support register schema")
|
||||
}
|
||||
|
||||
fn schema(&self, name: &str) -> catalog::error::Result<Option<SchemaProviderRef>> {
|
||||
let all_schemas = self.schema_names()?;
|
||||
if all_schemas.contains(&name.to_string()) {
|
||||
Ok(Some(Arc::new(FrontendSchemaProvider {
|
||||
catalog_name: self.catalog_name.clone(),
|
||||
schema_name: name.to_string(),
|
||||
backend: self.backend.clone(),
|
||||
datanode_instances: self.datanode_instances.clone(),
|
||||
})))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FrontendSchemaProvider {
|
||||
catalog_name: String,
|
||||
schema_name: String,
|
||||
backend: KvBackendRef,
|
||||
datanode_instances: Arc<RwLock<DatanodeInstances>>,
|
||||
}
|
||||
|
||||
impl SchemaProvider for FrontendSchemaProvider {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn table_names(&self) -> catalog::error::Result<Vec<String>> {
|
||||
let backend = self.backend.clone();
|
||||
let catalog_name = self.catalog_name.clone();
|
||||
let schema_name = self.schema_name.clone();
|
||||
|
||||
std::thread::spawn(|| {
|
||||
common_runtime::block_on_read(async move {
|
||||
let key = common_catalog::build_table_global_prefix(catalog_name, schema_name);
|
||||
let mut iter = backend.range(key.as_bytes());
|
||||
let mut res = HashSet::new();
|
||||
|
||||
while let Some(r) = iter.next().await {
|
||||
let Kv(k, _) = r?;
|
||||
let key = TableGlobalKey::parse(String::from_utf8_lossy(&k))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
res.insert(key.table_name);
|
||||
}
|
||||
Ok(res.into_iter().collect())
|
||||
})
|
||||
})
|
||||
.join()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn table(&self, name: &str) -> catalog::error::Result<Option<TableRef>> {
|
||||
let table_global_key = TableGlobalKey {
|
||||
catalog_name: self.catalog_name.clone(),
|
||||
schema_name: self.schema_name.clone(),
|
||||
table_name: name.to_string(),
|
||||
};
|
||||
|
||||
let instances = self.datanode_instances.clone();
|
||||
let backend = self.backend.clone();
|
||||
let table_name = name.to_string();
|
||||
let result: Result<Option<TableRef>, catalog::error::Error> = std::thread::spawn(|| {
|
||||
common_runtime::block_on_read(async move {
|
||||
let mut datanode_instances = HashMap::new();
|
||||
let res = match backend.get(table_global_key.to_string().as_bytes()).await? {
|
||||
None => {
|
||||
return Ok(None);
|
||||
}
|
||||
Some(r) => r,
|
||||
};
|
||||
|
||||
let mut region_to_datanode_map = HashMap::new();
|
||||
|
||||
let val = TableGlobalValue::parse(String::from_utf8_lossy(&res.1))
|
||||
.context(InvalidCatalogValueSnafu)?;
|
||||
let node_id: DatanodeId = val.node_id;
|
||||
|
||||
// TODO(hl): We need to deserialize string to PartitionRule trait object
|
||||
let partition_rule: Arc<RangePartitionRule> =
|
||||
Arc::new(serde_json::from_str(&val.partition_rules).context(
|
||||
DeserializePartitionRuleSnafu {
|
||||
data: &val.partition_rules,
|
||||
},
|
||||
)?);
|
||||
|
||||
for (node_id, region_ids) in val.regions_id_map {
|
||||
for region_id in region_ids {
|
||||
region_to_datanode_map.insert(region_id, node_id);
|
||||
}
|
||||
}
|
||||
|
||||
datanode_instances.insert(
|
||||
node_id,
|
||||
instances
|
||||
.read()
|
||||
.await
|
||||
.get(&node_id)
|
||||
.context(DatanodeNotAvailableSnafu { node_id })
|
||||
.map_err(BoxedError::new)
|
||||
.context(catalog::error::InternalSnafu)?
|
||||
.clone(),
|
||||
);
|
||||
|
||||
let table = Arc::new(DistTable {
|
||||
table_name: table_name.clone(),
|
||||
schema: Arc::new(
|
||||
val.meta
|
||||
.schema
|
||||
.try_into()
|
||||
.context(InvalidSchemaInCatalogSnafu)?,
|
||||
),
|
||||
partition_rule,
|
||||
region_dist_map: region_to_datanode_map,
|
||||
datanode_instances,
|
||||
});
|
||||
Ok(Some(table as _))
|
||||
})
|
||||
})
|
||||
.join()
|
||||
.unwrap();
|
||||
result
|
||||
}
|
||||
|
||||
fn register_table(
|
||||
&self,
|
||||
_name: String,
|
||||
_table: TableRef,
|
||||
) -> catalog::error::Result<Option<TableRef>> {
|
||||
unimplemented!("Frontend schema provider does not support register table")
|
||||
}
|
||||
|
||||
fn deregister_table(&self, _name: &str) -> catalog::error::Result<Option<TableRef>> {
|
||||
unimplemented!("Frontend schema provider does not support deregister table")
|
||||
}
|
||||
|
||||
fn table_exist(&self, name: &str) -> catalog::error::Result<bool> {
|
||||
Ok(self.table_names()?.contains(&name.to_string()))
|
||||
}
|
||||
}
|
||||
@@ -142,12 +142,6 @@ pub enum Error {
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to access catalog, source: {}", source))]
|
||||
Catalog {
|
||||
#[snafu(backtrace)]
|
||||
source: catalog::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Table not found: {}", table_name))]
|
||||
TableNotFound {
|
||||
table_name: String,
|
||||
@@ -177,6 +171,21 @@ pub enum Error {
|
||||
source: common_runtime::JoinError,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed access catalog: {}", source))]
|
||||
Catalog {
|
||||
#[snafu(backtrace)]
|
||||
source: catalog::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse catalog entry: {}", source))]
|
||||
ParseCatalogEntry {
|
||||
#[snafu(backtrace)]
|
||||
source: common_catalog::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Cannot find datanode by id: {}", node_id))]
|
||||
DatanodeNotAvailable { node_id: u64, backtrace: Backtrace },
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -205,8 +214,6 @@ impl ErrorExt for Error {
|
||||
|
||||
Error::RequestDatanode { source } => source.status_code(),
|
||||
|
||||
Error::Catalog { source } => source.status_code(),
|
||||
|
||||
Error::ColumnDataType { .. }
|
||||
| Error::FindDatanode { .. }
|
||||
| Error::DatanodeInstance { .. } => StatusCode::Internal,
|
||||
@@ -220,6 +227,9 @@ impl ErrorExt for Error {
|
||||
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
|
||||
|
||||
Error::JoinTask { .. } => StatusCode::Unexpected,
|
||||
Error::Catalog { source, .. } => source.status_code(),
|
||||
Error::ParseCatalogEntry { source, .. } => source.status_code(),
|
||||
Error::DatanodeNotAvailable { .. } => StatusCode::StorageUnavailable,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
#![feature(assert_matches)]
|
||||
|
||||
mod catalog;
|
||||
pub mod error;
|
||||
pub mod frontend;
|
||||
pub mod grpc;
|
||||
|
||||
@@ -21,7 +21,7 @@ use table::table::adapter::DfTableProviderAdapter;
|
||||
pub(crate) type DatanodeId = u64;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct DatanodeInstance {
|
||||
pub struct DatanodeInstance {
|
||||
pub(crate) datanode_id: DatanodeId,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
db: Database,
|
||||
@@ -37,12 +37,12 @@ impl DatanodeInstance {
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn new(
|
||||
datanode_id: DatanodeId,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
catalog_list: CatalogManagerRef,
|
||||
db: Database,
|
||||
) -> Self {
|
||||
Self {
|
||||
datanode_id,
|
||||
catalog_manager,
|
||||
catalog_manager: catalog_list,
|
||||
db,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::sync::Arc;
|
||||
|
||||
pub use datafusion_expr::Operator;
|
||||
use datatypes::prelude::Value;
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::RegionNumber;
|
||||
|
||||
pub(crate) type PartitionRuleRef<E> = Arc<dyn PartitionRule<Error = E>>;
|
||||
|
||||
@@ -17,9 +17,9 @@ pub trait PartitionRule: Sync + Send {
|
||||
|
||||
// TODO(LFC): Unify `find_region` and `find_regions` methods when distributed read and write features are both merged into develop.
|
||||
// Or find better names since one is mainly for writes and the other is for reads.
|
||||
fn find_region(&self, values: &[Value]) -> Result<RegionId, Self::Error>;
|
||||
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Self::Error>;
|
||||
|
||||
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionId>, Self::Error>;
|
||||
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Self::Error>;
|
||||
}
|
||||
|
||||
/// The right bound(exclusive) of partition range.
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
use datafusion_expr::Operator;
|
||||
use datatypes::value::Value;
|
||||
use snafu::ensure;
|
||||
use store_api::storage::RegionNumber;
|
||||
|
||||
use crate::error::{self, Error};
|
||||
use crate::partitioning::{PartitionBound, PartitionExpr, PartitionRule, RegionId};
|
||||
use crate::partitioning::{PartitionBound, PartitionExpr, PartitionRule};
|
||||
|
||||
/// A [RangeColumnsPartitionRule] is very similar to [RangePartitionRule] except that it allows
|
||||
/// partitioning by multiple columns.
|
||||
@@ -32,7 +33,7 @@ use crate::partitioning::{PartitionBound, PartitionExpr, PartitionRule, RegionId
|
||||
struct RangeColumnsPartitionRule {
|
||||
column_list: Vec<String>,
|
||||
value_lists: Vec<Vec<PartitionBound>>,
|
||||
regions: Vec<RegionId>,
|
||||
regions: Vec<RegionNumber>,
|
||||
|
||||
// TODO(LFC): Implement finding regions by all partitioning columns, not by the first one only.
|
||||
// Singled out the first partitioning column's bounds for finding regions by range.
|
||||
@@ -54,7 +55,7 @@ struct RangeColumnsPartitionRule {
|
||||
//
|
||||
// The following two fields are acted as caches, so we don't need to recalculate them every time.
|
||||
first_column_bounds: Vec<PartitionBound>,
|
||||
first_column_regions: Vec<Vec<RegionId>>,
|
||||
first_column_regions: Vec<Vec<RegionNumber>>,
|
||||
}
|
||||
|
||||
impl RangeColumnsPartitionRule {
|
||||
@@ -65,7 +66,7 @@ impl RangeColumnsPartitionRule {
|
||||
fn new(
|
||||
column_list: Vec<String>,
|
||||
value_lists: Vec<Vec<PartitionBound>>,
|
||||
regions: Vec<RegionId>,
|
||||
regions: Vec<RegionNumber>,
|
||||
) -> Self {
|
||||
// An example range columns partition rule to calculate the first column bounds and regions:
|
||||
// SQL:
|
||||
@@ -87,7 +88,7 @@ impl RangeColumnsPartitionRule {
|
||||
|
||||
let mut distinct_bounds = Vec::<PartitionBound>::new();
|
||||
distinct_bounds.push(first_column_bounds[0].clone());
|
||||
let mut first_column_regions = Vec::<Vec<RegionId>>::new();
|
||||
let mut first_column_regions = Vec::<Vec<RegionNumber>>::new();
|
||||
first_column_regions.push(vec![regions[0]]);
|
||||
|
||||
for i in 1..first_column_bounds.len() {
|
||||
@@ -116,7 +117,7 @@ impl PartitionRule for RangeColumnsPartitionRule {
|
||||
self.column_list.clone()
|
||||
}
|
||||
|
||||
fn find_region(&self, values: &[Value]) -> Result<RegionId, Self::Error> {
|
||||
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Self::Error> {
|
||||
ensure!(
|
||||
values.len() == self.column_list.len(),
|
||||
error::RegionKeysSizeSnafu {
|
||||
@@ -137,7 +138,7 @@ impl PartitionRule for RangeColumnsPartitionRule {
|
||||
})
|
||||
}
|
||||
|
||||
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionId>, Self::Error> {
|
||||
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Self::Error> {
|
||||
let regions = if exprs.iter().all(|x| self.column_list.contains(&x.column)) {
|
||||
let PartitionExpr {
|
||||
column: _,
|
||||
@@ -173,7 +174,7 @@ impl PartitionRule for RangeColumnsPartitionRule {
|
||||
.iter()
|
||||
.flatten()
|
||||
.cloned()
|
||||
.collect::<Vec<RegionId>>()
|
||||
.collect::<Vec<RegionNumber>>()
|
||||
} else {
|
||||
self.regions.clone()
|
||||
};
|
||||
@@ -224,7 +225,7 @@ mod tests {
|
||||
vec![1, 2, 3, 4, 5, 6],
|
||||
);
|
||||
|
||||
let test = |op: Operator, value: &str, expected_regions: Vec<u64>| {
|
||||
let test = |op: Operator, value: &str, expected_regions: Vec<RegionNumber>| {
|
||||
let exprs = vec![
|
||||
// Intentionally fix column b's partition expr to "b < 1". If we support finding
|
||||
// regions by both columns("a" and "b") in the future, some test cases should fail.
|
||||
@@ -242,7 +243,7 @@ mod tests {
|
||||
let regions = rule.find_regions(&exprs).unwrap();
|
||||
assert_eq!(
|
||||
regions,
|
||||
expected_regions.into_iter().collect::<Vec<RegionId>>()
|
||||
expected_regions.into_iter().collect::<Vec<RegionNumber>>()
|
||||
);
|
||||
};
|
||||
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
use datatypes::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::OptionExt;
|
||||
use store_api::storage::RegionNumber;
|
||||
|
||||
use crate::error::{self, Error};
|
||||
use crate::partitioning::{Operator, PartitionExpr, PartitionRule, RegionId};
|
||||
use crate::partitioning::{Operator, PartitionExpr, PartitionRule};
|
||||
|
||||
/// [RangePartitionRule] manages the distribution of partitions partitioning by some column's value
|
||||
/// range. It's generated from create table request, using MySQL's syntax:
|
||||
@@ -41,13 +43,14 @@ use crate::partitioning::{Operator, PartitionExpr, PartitionRule, RegionId};
|
||||
///
|
||||
// TODO(LFC): Further clarify "partition" and "region".
|
||||
// Could be creating an extra layer between partition and region.
|
||||
pub(crate) struct RangePartitionRule {
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct RangePartitionRule {
|
||||
column_name: String,
|
||||
// Does not store the last "MAXVALUE" bound; because in this way our binary search in finding
|
||||
// partitions are easier (besides, it's hard to represent "MAXVALUE" in our `Value`).
|
||||
// Then the length of `bounds` is one less than `regions`.
|
||||
bounds: Vec<Value>,
|
||||
regions: Vec<RegionId>,
|
||||
regions: Vec<RegionNumber>,
|
||||
}
|
||||
|
||||
impl RangePartitionRule {
|
||||
@@ -56,7 +59,7 @@ impl RangePartitionRule {
|
||||
pub(crate) fn new(
|
||||
column_name: impl Into<String>,
|
||||
bounds: Vec<Value>,
|
||||
regions: Vec<RegionId>,
|
||||
regions: Vec<RegionNumber>,
|
||||
) -> Self {
|
||||
Self {
|
||||
column_name: column_name.into(),
|
||||
@@ -69,7 +72,7 @@ impl RangePartitionRule {
|
||||
&self.column_name
|
||||
}
|
||||
|
||||
fn all_regions(&self) -> &Vec<RegionId> {
|
||||
fn all_regions(&self) -> &Vec<RegionNumber> {
|
||||
&self.regions
|
||||
}
|
||||
}
|
||||
@@ -81,11 +84,11 @@ impl PartitionRule for RangePartitionRule {
|
||||
vec![self.column_name().to_string()]
|
||||
}
|
||||
|
||||
fn find_region(&self, _values: &[Value]) -> Result<RegionId, Self::Error> {
|
||||
fn find_region(&self, _values: &[Value]) -> Result<RegionNumber, Self::Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionId>, Self::Error> {
|
||||
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Self::Error> {
|
||||
if exprs.is_empty() {
|
||||
return Ok(self.regions.clone());
|
||||
}
|
||||
@@ -152,18 +155,19 @@ mod test {
|
||||
regions: vec![1, 2, 3, 4],
|
||||
};
|
||||
|
||||
let test = |column: &str, op: Operator, value: &str, expected_regions: Vec<u64>| {
|
||||
let expr = PartitionExpr {
|
||||
column: column.to_string(),
|
||||
op,
|
||||
value: value.into(),
|
||||
let test =
|
||||
|column: &str, op: Operator, value: &str, expected_regions: Vec<RegionNumber>| {
|
||||
let expr = PartitionExpr {
|
||||
column: column.to_string(),
|
||||
op,
|
||||
value: value.into(),
|
||||
};
|
||||
let regions = rule.find_regions(&[expr]).unwrap();
|
||||
assert_eq!(
|
||||
regions,
|
||||
expected_regions.into_iter().collect::<Vec<RegionNumber>>()
|
||||
);
|
||||
};
|
||||
let regions = rule.find_regions(&[expr]).unwrap();
|
||||
assert_eq!(
|
||||
regions,
|
||||
expected_regions.into_iter().collect::<Vec<RegionId>>()
|
||||
);
|
||||
};
|
||||
|
||||
test("a", Operator::NotEq, "hz", vec![1, 2, 3, 4]);
|
||||
test("a", Operator::NotEq, "what", vec![1, 2, 3, 4]);
|
||||
|
||||
@@ -5,7 +5,7 @@ use datatypes::vectors::VectorBuilder;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use snafu::ensure;
|
||||
use snafu::OptionExt;
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::RegionNumber;
|
||||
use table::requests::InsertRequest;
|
||||
|
||||
use crate::error::Error;
|
||||
@@ -15,7 +15,7 @@ use crate::error::InvalidInsertRequestSnafu;
|
||||
use crate::error::Result;
|
||||
use crate::partitioning::PartitionRuleRef;
|
||||
|
||||
pub type DistInsertRequest = HashMap<RegionId, InsertRequest>;
|
||||
pub type DistInsertRequest = HashMap<RegionNumber, InsertRequest>;
|
||||
|
||||
pub struct WriteSpliter {
|
||||
partition_rule: PartitionRuleRef<Error>,
|
||||
@@ -41,11 +41,11 @@ impl WriteSpliter {
|
||||
fn split_partitioning_values(
|
||||
&self,
|
||||
values: &[VectorRef],
|
||||
) -> Result<HashMap<RegionId, Vec<usize>>> {
|
||||
) -> Result<HashMap<RegionNumber, Vec<usize>>> {
|
||||
if values.is_empty() {
|
||||
return Ok(HashMap::default());
|
||||
}
|
||||
let mut region_map: HashMap<RegionId, Vec<usize>> = HashMap::new();
|
||||
let mut region_map: HashMap<RegionNumber, Vec<usize>> = HashMap::new();
|
||||
let row_count = values[0].len();
|
||||
for idx in 0..row_count {
|
||||
let region_id = match self
|
||||
@@ -113,9 +113,9 @@ fn partition_values(partition_columns: &[VectorRef], idx: usize) -> Vec<Value> {
|
||||
|
||||
fn partition_insert_request(
|
||||
insert: &InsertRequest,
|
||||
region_map: HashMap<RegionId, Vec<usize>>,
|
||||
region_map: HashMap<RegionNumber, Vec<usize>>,
|
||||
) -> DistInsertRequest {
|
||||
let mut dist_insert: HashMap<RegionId, HashMap<&str, VectorBuilder>> =
|
||||
let mut dist_insert: HashMap<RegionNumber, HashMap<&str, VectorBuilder>> =
|
||||
HashMap::with_capacity(region_map.len());
|
||||
|
||||
let column_count = insert.columns_values.len();
|
||||
@@ -162,10 +162,12 @@ mod tests {
|
||||
value::Value,
|
||||
vectors::VectorBuilder,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::storage::RegionNumber;
|
||||
use table::requests::InsertRequest;
|
||||
|
||||
use super::{
|
||||
check_req, find_partitioning_values, partition_insert_request, partition_values, RegionId,
|
||||
check_req, find_partitioning_values, partition_insert_request, partition_values,
|
||||
WriteSpliter,
|
||||
};
|
||||
use crate::{
|
||||
@@ -242,13 +244,13 @@ mod tests {
|
||||
#[test]
|
||||
fn test_partition_insert_request() {
|
||||
let insert = mock_insert_request();
|
||||
let mut region_map: HashMap<RegionId, Vec<usize>> = HashMap::with_capacity(2);
|
||||
let mut region_map: HashMap<RegionNumber, Vec<usize>> = HashMap::with_capacity(2);
|
||||
region_map.insert(1, vec![2, 0]);
|
||||
region_map.insert(2, vec![1]);
|
||||
|
||||
let dist_insert = partition_insert_request(&insert, region_map);
|
||||
|
||||
let r1_insert = dist_insert.get(&1_u64).unwrap();
|
||||
let r1_insert = dist_insert.get(&1_u32).unwrap();
|
||||
assert_eq!("demo", r1_insert.table_name);
|
||||
let expected: Value = 3_i16.into();
|
||||
assert_eq!(expected, r1_insert.columns_values.get("id").unwrap().get(0));
|
||||
@@ -283,7 +285,7 @@ mod tests {
|
||||
.get(1)
|
||||
);
|
||||
|
||||
let r2_insert = dist_insert.get(&2_u64).unwrap();
|
||||
let r2_insert = dist_insert.get(&2_u32).unwrap();
|
||||
assert_eq!("demo", r2_insert.table_name);
|
||||
let expected: Value = 2_i16.into();
|
||||
assert_eq!(expected, r2_insert.columns_values.get("id").unwrap().get(0));
|
||||
@@ -401,6 +403,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct MockPartitionRule;
|
||||
|
||||
// PARTITION BY LIST COLUMNS(id) (
|
||||
@@ -414,7 +417,7 @@ mod tests {
|
||||
vec!["id".to_string()]
|
||||
}
|
||||
|
||||
fn find_region(&self, values: &[Value]) -> Result<RegionId, Self::Error> {
|
||||
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Self::Error> {
|
||||
let val = values.get(0).unwrap().to_owned();
|
||||
let id_1: Value = 1_i16.into();
|
||||
let id_2: Value = 2_i16.into();
|
||||
@@ -428,7 +431,7 @@ mod tests {
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
fn find_regions(&self, _: &[PartitionExpr]) -> Result<Vec<RegionId>, Self::Error> {
|
||||
fn find_regions(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ use datafusion::logical_plan::Expr as DfExpr;
|
||||
use datafusion::physical_plan::Partitioning;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use snafu::prelude::*;
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::RegionNumber;
|
||||
use table::error::Error as TableError;
|
||||
use table::metadata::{FilterPushDownType, TableInfoRef};
|
||||
use table::requests::InsertRequest;
|
||||
@@ -26,12 +26,12 @@ use crate::mock::{DatanodeId, DatanodeInstance, TableScanPlan};
|
||||
use crate::partitioning::{Operator, PartitionExpr, PartitionRuleRef};
|
||||
use crate::spliter::WriteSpliter;
|
||||
|
||||
struct DistTable {
|
||||
table_name: String,
|
||||
schema: SchemaRef,
|
||||
partition_rule: PartitionRuleRef<Error>,
|
||||
region_dist_map: HashMap<RegionId, DatanodeId>,
|
||||
datanode_instances: HashMap<DatanodeId, DatanodeInstance>,
|
||||
pub struct DistTable {
|
||||
pub table_name: String,
|
||||
pub schema: SchemaRef,
|
||||
pub partition_rule: PartitionRuleRef<Error>,
|
||||
pub region_dist_map: HashMap<RegionNumber, DatanodeId>,
|
||||
pub datanode_instances: HashMap<DatanodeId, DatanodeInstance>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -104,7 +104,7 @@ impl Table for DistTable {
|
||||
|
||||
impl DistTable {
|
||||
// TODO(LFC): Finding regions now seems less efficient, should be further looked into.
|
||||
fn find_regions(&self, filters: &[Expr]) -> Result<Vec<RegionId>> {
|
||||
fn find_regions(&self, filters: &[Expr]) -> Result<Vec<RegionNumber>> {
|
||||
let regions = if let Some((first, rest)) = filters.split_first() {
|
||||
let mut target = self.find_regions0(first)?;
|
||||
for filter in rest {
|
||||
@@ -119,7 +119,7 @@ impl DistTable {
|
||||
break;
|
||||
}
|
||||
}
|
||||
target.into_iter().collect::<Vec<RegionId>>()
|
||||
target.into_iter().collect::<Vec<_>>()
|
||||
} else {
|
||||
self.partition_rule.find_regions(&[])?
|
||||
};
|
||||
@@ -136,7 +136,7 @@ impl DistTable {
|
||||
// - BETWEEN and IN (maybe more)
|
||||
// - expr with arithmetic like "a + 1 < 10" (should have been optimized in logic plan?)
|
||||
// - not comparison or neither "AND" nor "OR" operations, for example, "a LIKE x"
|
||||
fn find_regions0(&self, filter: &Expr) -> Result<HashSet<RegionId>> {
|
||||
fn find_regions0(&self, filter: &Expr) -> Result<HashSet<RegionNumber>> {
|
||||
let expr = filter.df_expr();
|
||||
match expr {
|
||||
DfExpr::BinaryExpr { left, op, right } if is_compare_op(op) => {
|
||||
@@ -156,7 +156,7 @@ impl DistTable {
|
||||
.partition_rule
|
||||
.find_regions(&[PartitionExpr::new(column, op, value)])?
|
||||
.into_iter()
|
||||
.collect::<HashSet<RegionId>>());
|
||||
.collect::<HashSet<RegionNumber>>());
|
||||
}
|
||||
}
|
||||
DfExpr::BinaryExpr { left, op, right }
|
||||
@@ -168,11 +168,11 @@ impl DistTable {
|
||||
Operator::And => left_regions
|
||||
.intersection(&right_regions)
|
||||
.cloned()
|
||||
.collect::<HashSet<RegionId>>(),
|
||||
.collect::<HashSet<RegionNumber>>(),
|
||||
Operator::Or => left_regions
|
||||
.union(&right_regions)
|
||||
.cloned()
|
||||
.collect::<HashSet<RegionId>>(),
|
||||
.collect::<HashSet<RegionNumber>>(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
return Ok(regions);
|
||||
@@ -185,10 +185,13 @@ impl DistTable {
|
||||
.partition_rule
|
||||
.find_regions(&[])?
|
||||
.into_iter()
|
||||
.collect::<HashSet<RegionId>>())
|
||||
.collect::<HashSet<RegionNumber>>())
|
||||
}
|
||||
|
||||
fn find_datanodes(&self, regions: Vec<RegionId>) -> Result<HashMap<DatanodeId, Vec<RegionId>>> {
|
||||
fn find_datanodes(
|
||||
&self,
|
||||
regions: Vec<RegionNumber>,
|
||||
) -> Result<HashMap<DatanodeId, Vec<RegionNumber>>> {
|
||||
let mut datanodes = HashMap::new();
|
||||
for region in regions.iter() {
|
||||
let datanode = *self
|
||||
@@ -344,6 +347,7 @@ mod test {
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_dist_table_scan() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let table = Arc::new(new_dist_table().await);
|
||||
|
||||
// should scan all regions
|
||||
@@ -434,7 +438,7 @@ mod test {
|
||||
let partition_rule = RangePartitionRule::new(
|
||||
"a",
|
||||
vec![10_i32.into(), 20_i32.into(), 50_i32.into()],
|
||||
vec![1_u64, 2, 3, 4],
|
||||
vec![1_u32, 2, 3, 4],
|
||||
);
|
||||
|
||||
let table1 = new_memtable(schema.clone(), (0..5).collect::<Vec<i32>>());
|
||||
@@ -458,7 +462,7 @@ mod test {
|
||||
table_name: "dist_numbers".to_string(),
|
||||
schema,
|
||||
partition_rule: Arc::new(partition_rule),
|
||||
region_dist_map: HashMap::from([(1_u64, 1), (2_u64, 2), (3_u64, 3), (4_u64, 4)]),
|
||||
region_dist_map: HashMap::from([(1_u32, 1), (2_u32, 2), (3_u32, 3), (4_u32, 4)]),
|
||||
datanode_instances,
|
||||
}
|
||||
}
|
||||
@@ -491,20 +495,21 @@ mod test {
|
||||
|
||||
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
|
||||
instance.start().await.unwrap();
|
||||
|
||||
let catalog_manager = instance.catalog_manager().clone();
|
||||
let client = crate::tests::create_datanode_client(instance).await;
|
||||
|
||||
let table_name = table.table_name().to_string();
|
||||
catalog_manager
|
||||
.register_table(RegisterTableRequest {
|
||||
catalog: "greptime".to_string(),
|
||||
schema: "public".to_string(),
|
||||
table_name: table.table_name().to_string(),
|
||||
table_name: table_name.clone(),
|
||||
table_id: 1234,
|
||||
table: Arc::new(table),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let client = crate::tests::create_datanode_client(instance).await;
|
||||
DatanodeInstance::new(
|
||||
datanode_id,
|
||||
catalog_manager,
|
||||
@@ -516,7 +521,7 @@ mod test {
|
||||
async fn test_find_regions() {
|
||||
let table = new_dist_table().await;
|
||||
|
||||
let test = |filters: Vec<Expr>, expect_regions: Vec<u64>| {
|
||||
let test = |filters: Vec<Expr>, expect_regions: Vec<RegionNumber>| {
|
||||
let mut regions = table.find_regions(filters.as_slice()).unwrap();
|
||||
regions.sort();
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use client::ObjectResult;
|
||||
use snafu::ensure;
|
||||
use snafu::OptionExt;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::RegionNumber;
|
||||
use table::requests::InsertRequest;
|
||||
|
||||
use super::DistTable;
|
||||
@@ -21,7 +21,7 @@ use crate::error::Result;
|
||||
impl DistTable {
|
||||
pub async fn dist_insert(
|
||||
&self,
|
||||
inserts: HashMap<RegionId, InsertRequest>,
|
||||
inserts: HashMap<RegionNumber, InsertRequest>,
|
||||
) -> Result<ObjectResult> {
|
||||
let mut joins = Vec::with_capacity(inserts.len());
|
||||
|
||||
@@ -66,7 +66,7 @@ impl DistTable {
|
||||
}
|
||||
}
|
||||
|
||||
fn to_insert_expr(region_id: RegionId, insert: InsertRequest) -> Result<InsertExpr> {
|
||||
fn to_insert_expr(region_id: RegionNumber, insert: InsertRequest) -> Result<InsertExpr> {
|
||||
let mut row_count = None;
|
||||
|
||||
let columns = insert
|
||||
@@ -109,7 +109,7 @@ fn to_insert_expr(region_id: RegionId, insert: InsertRequest) -> Result<InsertEx
|
||||
options.insert(
|
||||
// TODO(fys): Temporarily hard code here
|
||||
"region_id".to_string(),
|
||||
codec::RegionId { id: region_id }.into(),
|
||||
codec::RegionNumber { id: region_id }.into(),
|
||||
);
|
||||
|
||||
Ok(InsertExpr {
|
||||
@@ -196,7 +196,7 @@ mod tests {
|
||||
}
|
||||
|
||||
let bytes = insert_expr.options.get("region_id").unwrap();
|
||||
let region_id: codec::RegionId = bytes.deref().try_into().unwrap();
|
||||
let region_id: codec::RegionNumber = bytes.deref().try_into().unwrap();
|
||||
assert_eq!(12, region_id.id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_query::Output;
|
||||
@@ -18,7 +18,7 @@ use table::test_util::MemTable;
|
||||
pub fn create_query_engine() -> Arc<dyn QueryEngine> {
|
||||
let schema_provider = Arc::new(MemorySchemaProvider::new());
|
||||
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
|
||||
let catalog_list = Arc::new(MemoryCatalogList::default());
|
||||
let catalog_list = Arc::new(MemoryCatalogManager::default());
|
||||
|
||||
let mut column_schemas = vec![];
|
||||
let mut columns = vec![];
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::fmt::Debug;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_function::scalars::aggregate::AggregateFunctionMeta;
|
||||
@@ -242,7 +242,7 @@ fn new_query_engine_factory(table: MemTable) -> QueryEngineFactory {
|
||||
|
||||
let schema_provider = Arc::new(MemorySchemaProvider::new());
|
||||
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
|
||||
let catalog_list = Arc::new(MemoryCatalogList::default());
|
||||
let catalog_list = Arc::new(MemoryCatalogManager::default());
|
||||
|
||||
schema_provider.register_table(table_name, table).unwrap();
|
||||
catalog_provider
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
mod function;
|
||||
use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_query::Output;
|
||||
@@ -114,7 +114,7 @@ fn create_correctness_engine() -> Arc<dyn QueryEngine> {
|
||||
// create engine
|
||||
let schema_provider = Arc::new(MemorySchemaProvider::new());
|
||||
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
|
||||
let catalog_list = Arc::new(MemoryCatalogList::default());
|
||||
let catalog_list = Arc::new(MemoryCatalogManager::default());
|
||||
|
||||
let mut column_schemas = vec![];
|
||||
let mut columns = vec![];
|
||||
|
||||
@@ -3,7 +3,7 @@ mod pow;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::UInt32Array;
|
||||
use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_query::prelude::{create_udf, make_scalar_function, Volatility};
|
||||
@@ -150,7 +150,7 @@ async fn test_udf() -> Result<()> {
|
||||
fn create_query_engine() -> Arc<dyn QueryEngine> {
|
||||
let schema_provider = Arc::new(MemorySchemaProvider::new());
|
||||
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
|
||||
let catalog_list = Arc::new(MemoryCatalogList::default());
|
||||
let catalog_list = Arc::new(MemoryCatalogManager::default());
|
||||
|
||||
// create table with primitives, and all columns' length are even
|
||||
let mut column_schemas = vec![];
|
||||
|
||||
@@ -154,7 +154,7 @@ pub enum Error {
|
||||
InvalidPromRemoteReadQueryResult { msg: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Failed to decode region id, source: {}", source))]
|
||||
DecodeRegionId { source: api::DecodeError },
|
||||
DecodeRegionNumber { source: api::DecodeError },
|
||||
|
||||
#[snafu(display("Failed to build gRPC reflection service, source: {}", source))]
|
||||
GrpcReflectionService {
|
||||
@@ -196,7 +196,7 @@ impl ErrorExt for Error {
|
||||
| DecodePromRemoteRequest { .. }
|
||||
| DecompressPromRemoteRequest { .. }
|
||||
| InvalidPromRemoteRequest { .. }
|
||||
| DecodeRegionId { .. }
|
||||
| DecodeRegionNumber { .. }
|
||||
| TimePrecision { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
InfluxdbLinesWrite { source, .. } => source.status_code(),
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use catalog::local::{MemoryCatalogList, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
|
||||
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_query::Output;
|
||||
@@ -70,7 +70,7 @@ fn create_testing_sql_query_handler(table: MemTable) -> SqlQueryHandlerRef {
|
||||
|
||||
let schema_provider = Arc::new(MemorySchemaProvider::new());
|
||||
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
|
||||
let catalog_list = Arc::new(MemoryCatalogList::default());
|
||||
let catalog_list = Arc::new(MemoryCatalogManager::default());
|
||||
schema_provider.register_table(table_name, table).unwrap();
|
||||
catalog_provider
|
||||
.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider)
|
||||
|
||||
@@ -10,6 +10,8 @@ pub type ColumnFamilyId = u32;
|
||||
/// Id of the region.
|
||||
pub type RegionId = u64;
|
||||
|
||||
pub type RegionNumber = u32;
|
||||
|
||||
/// A [ColumnDescriptor] contains information to create a column.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)]
|
||||
#[builder(pattern = "owned", build_fn(validate = "Self::validate"))]
|
||||
|
||||
Reference in New Issue
Block a user