diff --git a/Cargo.lock b/Cargo.lock index 22e51f0b73..f21f03eafd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1628,6 +1628,8 @@ dependencies = [ "futures", "hyper", "log-store", + "meta-client", + "meta-srv", "metrics", "object-store", "query", diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 311f8867ac..f5908c9562 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -2,14 +2,13 @@ name = "benchmarks" version = "0.1.0" edition = "2021" - # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] arrow = "10" clap = { version = "4.0", features = ["derive"] } client = { path = "../src/client" } -itertools = "0.10.5" indicatif = "0.17.1" +itertools = "0.10.5" parquet = { version = "*" } tokio = { version = "1.21", features = ["full"] } diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 866e806664..18cba573b5 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -1,3 +1,4 @@ +node_id = 42 http_addr = '0.0.0.0:3000' rpc_addr = '0.0.0.0:3001' wal_dir = '/tmp/greptimedb/wal' @@ -13,3 +14,9 @@ postgres_runtime_size = 4 [storage] type = 'File' data_dir = '/tmp/greptimedb/data/' + +[meta_client_opts] +metasrv_addr = "1.1.1.1:3002" +timeout_millis = 3000 +connect_timeout_millis = 5000 +tcp_nodelay = true diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 4263c94de0..99e81a5767 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -4,6 +4,7 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::prelude::{Snafu, StatusCode}; use datafusion::error::DataFusionError; use datatypes::arrow; +use datatypes::schema::RawSchema; use snafu::{Backtrace, ErrorCompat}; #[derive(Debug, Snafu)] @@ -110,6 +111,19 @@ pub enum Error { source: table::error::Error, }, + #[snafu(display( + "Invalid table schema in catalog entry, table:{}, schema: {:?}, source: {}", + table_info, + schema, + source + ))] + InvalidTableSchema { + table_info: String, + schema: RawSchema, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + #[snafu(display("Failed to execute system catalog table scan, source: {}", source))] SystemCatalogTableScanExec { #[snafu(backtrace)] @@ -170,6 +184,7 @@ impl ErrorExt for Error { Error::MetaSrv { source, .. } => source.status_code(), Error::SystemCatalogTableScan { source } => source.status_code(), Error::SystemCatalogTableScanExec { source } => source.status_code(), + Error::InvalidTableSchema { source, .. } => source.status_code(), } } diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 92236e9244..b89f427b05 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -12,21 +12,21 @@ use common_catalog::{ SchemaKey, SchemaValue, TableKey, TableValue, }; use common_telemetry::{debug, info}; -use datatypes::schema::Schema; use futures::Stream; use futures_util::StreamExt; use snafu::{OptionExt, ResultExt}; use table::engine::{EngineContext, TableEngineRef}; use table::metadata::{TableId, TableVersion}; use table::requests::{CreateTableRequest, OpenTableRequest}; +use table::table::numbers::NumbersTable; use table::TableRef; use tokio::sync::Mutex; -use crate::error::Result; use crate::error::{ CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, SchemaNotFoundSnafu, TableExistsSnafu, }; +use crate::error::{InvalidTableSchemaSnafu, Result}; use crate::remote::{Kv, KvBackendRef}; use crate::{ handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, @@ -164,7 +164,7 @@ impl RemoteCatalogManager { /// Fetch catalogs/schemas/tables from remote catalog manager along with max table id allocated. async fn initiate_catalogs(&self) -> Result<(HashMap, TableId)> { let mut res = HashMap::new(); - let max_table_id = MIN_USER_TABLE_ID; + let max_table_id = MIN_USER_TABLE_ID - 1; // initiate default catalog and schema let default_catalog = self.initiate_default_catalog().await?; @@ -246,7 +246,7 @@ impl RemoteCatalogManager { async fn initiate_default_catalog(&self) -> Result { let default_catalog = self.new_catalog_provider(DEFAULT_CATALOG_NAME); let default_schema = self.new_schema_provider(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); - default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)?; + default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema.clone())?; let schema_key = SchemaKey { schema_name: DEFAULT_SCHEMA_NAME.to_string(), catalog_name: DEFAULT_CATALOG_NAME.to_string(), @@ -310,13 +310,22 @@ impl RemoteCatalogManager { })? { Some(table) => Ok(table), None => { + let schema = meta + .schema + .clone() + .try_into() + .context(InvalidTableSchemaSnafu { + table_info: format!("{}.{}.{}", catalog_name, schema_name, table_name,), + schema: meta.schema.clone(), + })?; let req = CreateTableRequest { id: *id, catalog_name: catalog_name.clone(), schema_name: schema_name.clone(), table_name: table_name.clone(), desc: None, - schema: Arc::new(Schema::new(meta.schema.column_schemas.clone())), + schema: Arc::new(schema), + region_numbers: meta.region_numbers.clone(), primary_key_indices: meta.primary_key_indices.clone(), create_if_not_exists: true, table_options: meta.options.clone(), @@ -352,6 +361,15 @@ impl CatalogManager for RemoteCatalogManager { let mut system_table_requests = self.system_table_requests.lock().await; handle_system_table_request(self, self.engine.clone(), &mut system_table_requests).await?; info!("All system table opened"); + + self.catalog(DEFAULT_CATALOG_NAME) + .unwrap() + .unwrap() + .schema(DEFAULT_SCHEMA_NAME) + .unwrap() + .unwrap() + .register_table("numbers".to_string(), Arc::new(NumbersTable::default())) + .unwrap(); Ok(()) } @@ -512,6 +530,7 @@ impl CatalogProvider for RemoteCatalogProvider { .context(InvalidCatalogValueSnafu)?, ) .await?; + let prev_schemas = schemas.load(); let mut new_schemas = HashMap::with_capacity(prev_schemas.len() + 1); new_schemas.clone_from(&prev_schemas); @@ -590,7 +609,7 @@ impl SchemaProvider for RemoteSchemaProvider { meta: table_info.meta.clone().into(), id: table_info.ident.table_id, node_id: self.node_id, - regions_ids: vec![], + regions_ids: table.table_info().meta.region_numbers.clone(), }; let backend = self.backend.clone(); let mutex = self.mutex.clone(); diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 5fd802d1ed..66dc2e1116 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -96,6 +96,7 @@ impl SystemCatalogTable { table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), desc: Some("System catalog table".to_string()), schema: schema.clone(), + region_numbers: vec![0], primary_key_indices: vec![ENTRY_TYPE_INDEX, KEY_INDEX, TIMESTAMP_INDEX], create_if_not_exists: true, table_options: HashMap::new(), diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index 40b9de8eb3..b54d15275c 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -114,6 +114,7 @@ mod tests { table_name: table_name.clone(), desc: None, schema: table_schema.clone(), + region_numbers: vec![0], primary_key_indices: vec![], create_if_not_exists: false, table_options: Default::default(), @@ -154,7 +155,7 @@ mod tests { .schema(DEFAULT_SCHEMA_NAME) .unwrap() .unwrap(); - assert_eq!(Vec::::new(), default_schema.table_names().unwrap()); + assert_eq!(vec!["numbers"], default_schema.table_names().unwrap()); // register a new table with an nonexistent catalog let catalog_name = DEFAULT_CATALOG_NAME.to_string(); @@ -173,6 +174,7 @@ mod tests { table_name: table_name.clone(), desc: None, schema: table_schema.clone(), + region_numbers: vec![0], primary_key_indices: vec![], create_if_not_exists: false, table_options: Default::default(), @@ -188,7 +190,14 @@ mod tests { table, }; assert_eq!(1, catalog_manager.register_table(reg_req).await.unwrap()); - assert_eq!(vec![table_name], default_schema.table_names().unwrap()); + assert_eq!( + HashSet::from([table_name, "numbers".to_string()]), + default_schema + .table_names() + .unwrap() + .into_iter() + .collect::>() + ); } #[tokio::test] @@ -225,6 +234,7 @@ mod tests { table_name: "".to_string(), desc: None, schema: Arc::new(Schema::new(vec![])), + region_numbers: vec![0], primary_key_indices: vec![], create_if_not_exists: false, table_options: Default::default(), diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 6fd3682a67..7c0919c81a 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -33,6 +33,8 @@ impl SubCommand { #[derive(Debug, Parser)] struct StartCommand { + #[clap(long)] + node_id: Option, #[clap(long)] http_addr: Option, #[clap(long)] @@ -41,6 +43,8 @@ struct StartCommand { mysql_addr: Option, #[clap(long)] postgres_addr: Option, + #[clap(long)] + metasrv_addr: Option, #[clap(short, long)] config_file: Option, } @@ -71,6 +75,9 @@ impl TryFrom for DatanodeOptions { DatanodeOptions::default() }; + if let Some(node_id) = cmd.node_id { + opts.node_id = node_id; + } if let Some(addr) = cmd.http_addr { opts.http_addr = addr; } @@ -83,7 +90,9 @@ impl TryFrom for DatanodeOptions { if let Some(addr) = cmd.postgres_addr { opts.postgres_addr = addr; } - + if let Some(addr) = cmd.metasrv_addr { + opts.meta_client_opts.metasrv_addr = addr; + } Ok(opts) } } @@ -97,10 +106,12 @@ mod tests { #[test] fn test_read_from_config_file() { let cmd = StartCommand { + node_id: None, http_addr: None, rpc_addr: None, mysql_addr: None, postgres_addr: None, + metasrv_addr: None, config_file: Some(format!( "{}/../../config/datanode.example.toml", std::env::current_dir().unwrap().as_path().to_str().unwrap() @@ -112,6 +123,13 @@ mod tests { assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal_dir); assert_eq!("0.0.0.0:3306".to_string(), options.mysql_addr); assert_eq!(4, options.mysql_runtime_size); + assert_eq!( + "1.1.1.1:3002".to_string(), + options.meta_client_opts.metasrv_addr + ); + assert_eq!(5000, options.meta_client_opts.connect_timeout_millis); + assert_eq!(3000, options.meta_client_opts.timeout_millis); + assert!(options.meta_client_opts.tcp_nodelay); assert_eq!("0.0.0.0:5432".to_string(), options.postgres_addr); assert_eq!(4, options.postgres_runtime_size); diff --git a/src/common/catalog/src/helper.rs b/src/common/catalog/src/helper.rs index dfbd5ef231..db8e9b5565 100644 --- a/src/common/catalog/src/helper.rs +++ b/src/common/catalog/src/helper.rs @@ -102,7 +102,7 @@ impl TableKey { pub struct TableValue { pub id: TableId, pub node_id: u64, - pub regions_ids: Vec, + pub regions_ids: Vec, pub meta: RawTableMeta, } @@ -278,6 +278,7 @@ mod tests { engine_options: Default::default(), value_indices: vec![2, 3], options: Default::default(), + region_numbers: vec![1], }; let value = TableValue { diff --git a/src/common/substrait/src/df_logical.rs b/src/common/substrait/src/df_logical.rs index 2e746cdc81..45adbb748c 100644 --- a/src/common/substrait/src/df_logical.rs +++ b/src/common/substrait/src/df_logical.rs @@ -377,6 +377,7 @@ mod test { table_name: table_name.to_string(), desc: None, schema: Arc::new(Schema::new(supported_types())), + region_numbers: vec![0], primary_key_indices: vec![], create_if_not_exists: true, table_options: Default::default(), diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 031e78a424..0f6b1ae5d6 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -29,6 +29,8 @@ datatypes = { path = "../datatypes" } futures = "0.3" hyper = { version = "0.14", features = ["full"] } log-store = { path = "../log-store" } +meta-client = { path = "../meta-client" } +meta-srv = { path = "../meta-srv", features = ["mock"] } metrics = "0.20" object-store = { path = "../object-store" } query = { path = "../query" } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 329db00ae2..5286879c3b 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -15,13 +15,17 @@ pub enum ObjectStoreConfig { impl Default for ObjectStoreConfig { fn default() -> Self { ObjectStoreConfig::File { - data_dir: "/tmp/greptimedb/data/".to_string(), + data_dir: format!( + "/tmp/greptimedb/data/{}", + common_time::util::current_time_millis() + ), } } } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct DatanodeOptions { + pub node_id: u64, pub http_addr: String, pub rpc_addr: String, pub rpc_runtime_size: usize, @@ -29,6 +33,7 @@ pub struct DatanodeOptions { pub mysql_runtime_size: usize, pub postgres_addr: String, pub postgres_runtime_size: usize, + pub meta_client_opts: MetaClientOpts, pub wal_dir: String, pub storage: ObjectStoreConfig, } @@ -36,6 +41,7 @@ pub struct DatanodeOptions { impl Default for DatanodeOptions { fn default() -> Self { Self { + node_id: 0, http_addr: "0.0.0.0:3000".to_string(), rpc_addr: "0.0.0.0:3001".to_string(), rpc_runtime_size: 8, @@ -43,7 +49,11 @@ impl Default for DatanodeOptions { mysql_runtime_size: 2, postgres_addr: "0.0.0.0:5432".to_string(), postgres_runtime_size: 2, - wal_dir: "/tmp/greptimedb/wal".to_string(), + meta_client_opts: MetaClientOpts::default(), + wal_dir: format!( + "/tmp/greptimedb/wal/{}", + common_time::util::current_time_millis() + ), storage: ObjectStoreConfig::default(), } } @@ -72,3 +82,23 @@ impl Datanode { self.services.start(&self.opts).await } } + +// Options for meta client in datanode instance. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct MetaClientOpts { + pub metasrv_addr: String, + pub timeout_millis: u64, + pub connect_timeout_millis: u64, + pub tcp_nodelay: bool, +} + +impl Default for MetaClientOpts { + fn default() -> Self { + Self { + metasrv_addr: "127.0.0.1:3002".to_string(), + timeout_millis: 3_000u64, + connect_timeout_millis: 5_000u64, + tcp_nodelay: true, + } + } +} diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 408a1cbd21..c2c4adacda 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -279,6 +279,12 @@ pub enum Error { table_name: String, source: catalog::error::Error, }, + + #[snafu(display("Failed to initialize meta client, source: {}", source))] + MetaClientInit { + #[snafu(backtrace)] + source: meta_client::error::Error, + }, } pub type Result = std::result::Result; @@ -346,6 +352,7 @@ impl ErrorExt for Error { | Error::CollectRecordBatches { source } => source.status_code(), Error::ArrowComputation { .. } => StatusCode::Unexpected, + Error::MetaClientInit { source, .. } => source.status_code(), } } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs new file mode 100644 index 0000000000..229742472f --- /dev/null +++ b/src/datanode/src/heartbeat.rs @@ -0,0 +1,97 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, Peer}; +use common_telemetry::{error, info, warn}; +use meta_client::client::{HeartbeatSender, MetaClient}; +use snafu::ResultExt; + +use crate::error::{MetaClientInitSnafu, Result}; + +#[derive(Debug, Clone, Default)] +pub struct HeartbeatTask { + node_id: u64, + server_addr: String, + started: Arc, + meta_client: MetaClient, + interval: u64, +} + +impl HeartbeatTask { + /// Create a new heartbeat task instance. + pub fn new(node_id: u64, server_addr: String, meta_client: MetaClient) -> Self { + Self { + node_id, + server_addr, + started: Arc::new(AtomicBool::new(false)), + meta_client, + interval: 5_000, // default interval is set to 5 secs + } + } + + pub async fn create_streams(meta_client: &MetaClient) -> Result { + let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?; + common_runtime::spawn_bg(async move { + while let Some(res) = match rx.message().await { + Ok(m) => m, + Err(e) => { + error!(e; "Error while reading heartbeat response"); + None + } + } { + Self::handle_response(res).await; + } + info!("Heartbeat handling loop exit.") + }); + Ok(tx) + } + + async fn handle_response(resp: HeartbeatResponse) { + info!("heartbeat response: {:?}", resp); + } + + /// Start heartbeat task, spawn background task. + pub async fn start(&self) -> Result<()> { + let started = self.started.clone(); + if started + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_err() + { + warn!("Heartbeat task started multiple times"); + return Ok(()); + } + let interval = self.interval; + let node_id = self.node_id; + let server_addr = self.server_addr.clone(); + let meta_client = self.meta_client.clone(); + + let mut tx = Self::create_streams(&meta_client).await?; + common_runtime::spawn_bg(async move { + while started.load(Ordering::Acquire) { + let req = HeartbeatRequest { + peer: Some(Peer { + id: node_id, + addr: server_addr.clone(), + }), + ..Default::default() + }; + if let Err(e) = tx.send(req).await { + error!("Failed to send heartbeat to metasrv, error: {:?}", e); + match Self::create_streams(&meta_client).await { + Ok(new_tx) => { + info!("Reconnected to metasrv"); + tx = new_tx; + } + Err(e) => { + error!(e;"Failed to reconnect to metasrv!"); + } + } + } + tokio::time::sleep(Duration::from_millis(interval)).await; + } + }); + + Ok(()) + } +} diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 4c88fdb5ed..9c8a0897e2 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -1,8 +1,12 @@ +use std::time::Duration; use std::{fs, path, sync::Arc}; +use catalog::remote::MetaKvBackend; use catalog::CatalogManagerRef; +use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_telemetry::logging::info; use log_store::fs::{config::LogConfig, log::LocalFileLogStore}; +use meta_client::client::{MetaClient, MetaClientBuilder}; use object_store::{services::fs::Builder, util, ObjectStore}; use query::query_engine::{QueryEngineFactory, QueryEngineRef}; use snafu::prelude::*; @@ -10,8 +14,9 @@ use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl}; use table_engine::config::EngineConfig as TableEngineConfig; use table_engine::engine::MitoEngine; -use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; -use crate::error::{self, NewCatalogSnafu, Result}; +use crate::datanode::{DatanodeOptions, MetaClientOpts, ObjectStoreConfig}; +use crate::error::{self, MetaClientInitSnafu, NewCatalogSnafu, Result}; +use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; use crate::server::grpc::plan::PhysicalPlanner; use crate::sql::SqlHandler; @@ -19,15 +24,18 @@ use crate::sql::SqlHandler; mod grpc; mod sql; -type DefaultEngine = MitoEngine>; +pub(crate) type DefaultEngine = MitoEngine>; // An abstraction to read/write services. pub struct Instance { - query_engine: QueryEngineRef, - sql_handler: SqlHandler, - catalog_manager: CatalogManagerRef, - physical_planner: PhysicalPlanner, - script_executor: ScriptExecutor, + pub(crate) query_engine: QueryEngineRef, + pub(crate) sql_handler: SqlHandler, + pub(crate) catalog_manager: CatalogManagerRef, + pub(crate) physical_planner: PhysicalPlanner, + pub(crate) script_executor: ScriptExecutor, + #[allow(unused)] + pub(crate) meta_client: MetaClient, + pub(crate) heartbeat_task: HeartbeatTask, } pub type InstanceRef = Arc; @@ -36,6 +44,7 @@ impl Instance { pub async fn new(opts: &DatanodeOptions) -> Result { let object_store = new_object_store(&opts.storage).await?; let log_store = create_local_file_log_store(opts).await?; + let meta_client = new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?; let table_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), @@ -46,22 +55,34 @@ impl Instance { ), object_store, )); - let catalog_manager = Arc::new( - catalog::local::LocalCatalogManager::try_new(table_engine.clone()) - .await - .context(NewCatalogSnafu)?, - ); + + // create remote catalog manager + let catalog_manager = Arc::new(catalog::remote::RemoteCatalogManager::new( + table_engine.clone(), + opts.node_id, + Arc::new(MetaKvBackend { + client: meta_client.clone(), + }), + )); + let factory = QueryEngineFactory::new(catalog_manager.clone()); let query_engine = factory.query_engine().clone(); let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?; + let heartbeat_task = HeartbeatTask::new( + opts.node_id, /*node id not set*/ + opts.rpc_addr.clone(), + meta_client.clone(), + ); Ok(Self { query_engine: query_engine.clone(), sql_handler: SqlHandler::new(table_engine, catalog_manager.clone()), catalog_manager, physical_planner: PhysicalPlanner::new(query_engine), script_executor, + meta_client, + heartbeat_task, }) } @@ -70,6 +91,7 @@ impl Instance { .start() .await .context(NewCatalogSnafu)?; + self.heartbeat_task.start().await?; Ok(()) } @@ -80,47 +102,9 @@ impl Instance { pub fn catalog_manager(&self) -> &CatalogManagerRef { &self.catalog_manager } - - // This method is used in other crate's testing codes, so move it out of "cfg(test)". - // TODO(LFC): Delete it when callers no longer need it. - pub async fn new_mock() -> Result { - use table_engine::table::test_util::new_test_object_store; - use table_engine::table::test_util::MockEngine; - use table_engine::table::test_util::MockMitoEngine; - - let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; - let mock_engine = Arc::new(MockMitoEngine::new( - TableEngineConfig::default(), - MockEngine::default(), - object_store, - )); - - let catalog_manager = Arc::new( - catalog::local::manager::LocalCatalogManager::try_new(mock_engine.clone()) - .await - .unwrap(), - ); - - let factory = QueryEngineFactory::new(catalog_manager.clone()); - let query_engine = factory.query_engine().clone(); - - let sql_handler = SqlHandler::new(mock_engine.clone(), catalog_manager.clone()); - let physical_planner = PhysicalPlanner::new(query_engine.clone()); - let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()) - .await - .unwrap(); - - Ok(Self { - query_engine, - sql_handler, - catalog_manager, - physical_planner, - script_executor, - }) - } } -async fn new_object_store(store_config: &ObjectStoreConfig) -> Result { +pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result { // TODO(dennis): supports other backend let data_dir = util::normalize_dir(match store_config { ObjectStoreConfig::File { data_dir } => data_dir, @@ -139,7 +123,38 @@ async fn new_object_store(store_config: &ObjectStoreConfig) -> Result Result { +/// Create metasrv client instance and spawn heartbeat loop. +async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Result { + let cluster_id = 0; // TODO(hl): read from config + let member_id = node_id; + + let config = ChannelConfig::new() + .timeout(Duration::from_millis(meta_config.timeout_millis)) + .connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis)) + .tcp_nodelay(meta_config.tcp_nodelay); + let channel_manager = ChannelManager::with_config(config); + let mut meta_client = MetaClientBuilder::new(cluster_id, member_id) + .enable_heartbeat() + .enable_router() + .enable_store() + .channel_manager(channel_manager) + .build(); + meta_client + .start(&[&meta_config.metasrv_addr]) + .await + .context(MetaClientInitSnafu)?; + + // required only when the heartbeat_client is enabled + meta_client + .ask_leader() + .await + .context(MetaClientInitSnafu)?; + Ok(meta_client) +} + +pub(crate) async fn create_local_file_log_store( + opts: &DatanodeOptions, +) -> Result { // create WAL directory fs::create_dir_all(path::Path::new(&opts.wal_dir)) .context(error::CreateDirSnafu { dir: &opts.wal_dir })?; diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 9181297887..bcf7097377 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -2,8 +2,10 @@ pub mod datanode; pub mod error; +mod heartbeat; pub mod instance; mod metric; +mod mock; mod script; pub mod server; mod sql; diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs new file mode 100644 index 0000000000..78ab01cb0b --- /dev/null +++ b/src/datanode/src/mock.rs @@ -0,0 +1,123 @@ +use std::sync::Arc; + +use catalog::remote::MetaKvBackend; +use meta_client::client::{MetaClient, MetaClientBuilder}; +use query::QueryEngineFactory; +use storage::config::EngineConfig as StorageEngineConfig; +use storage::EngineImpl; +use table_engine::config::EngineConfig as TableEngineConfig; + +use crate::datanode::DatanodeOptions; +use crate::error::Result; +use crate::heartbeat::HeartbeatTask; +use crate::instance::{create_local_file_log_store, new_object_store, DefaultEngine, Instance}; +use crate::script::ScriptExecutor; +use crate::server::grpc::plan::PhysicalPlanner; +use crate::sql::SqlHandler; + +impl Instance { + // This method is used in other crate's testing codes, so move it out of "cfg(test)". + // TODO(LFC): Delete it when callers no longer need it. + pub async fn new_mock() -> Result { + use table_engine::table::test_util::new_test_object_store; + use table_engine::table::test_util::MockEngine; + use table_engine::table::test_util::MockMitoEngine; + + let meta_client = mock_meta_client().await; + let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; + let mock_engine = Arc::new(MockMitoEngine::new( + TableEngineConfig::default(), + MockEngine::default(), + object_store, + )); + + let catalog_manager = Arc::new( + catalog::local::manager::LocalCatalogManager::try_new(mock_engine.clone()) + .await + .unwrap(), + ); + + let factory = QueryEngineFactory::new(catalog_manager.clone()); + let query_engine = factory.query_engine().clone(); + + let sql_handler = SqlHandler::new(mock_engine.clone(), catalog_manager.clone()); + let physical_planner = PhysicalPlanner::new(query_engine.clone()); + let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()) + .await + .unwrap(); + + let heartbeat_task = + HeartbeatTask::new(0, "127.0.0.1:3302".to_string(), meta_client.clone()); + Ok(Self { + query_engine, + sql_handler, + catalog_manager, + physical_planner, + script_executor, + meta_client, + heartbeat_task, + }) + } + + pub async fn with_mock_meta_client(opts: &DatanodeOptions) -> Result { + let object_store = new_object_store(&opts.storage).await?; + let log_store = create_local_file_log_store(opts).await?; + let meta_client = mock_meta_client().await; + let table_engine = Arc::new(DefaultEngine::new( + TableEngineConfig::default(), + EngineImpl::new( + StorageEngineConfig::default(), + Arc::new(log_store), + object_store.clone(), + ), + object_store, + )); + + // create remote catalog manager + let catalog_manager = Arc::new(catalog::remote::RemoteCatalogManager::new( + table_engine.clone(), + opts.node_id, + Arc::new(MetaKvBackend { + client: meta_client.clone(), + }), + )); + + let factory = QueryEngineFactory::new(catalog_manager.clone()); + let query_engine = factory.query_engine().clone(); + let script_executor = + ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?; + + let heartbeat_task = + HeartbeatTask::new(opts.node_id, opts.rpc_addr.clone(), meta_client.clone()); + Ok(Self { + query_engine: query_engine.clone(), + sql_handler: SqlHandler::new(table_engine, catalog_manager.clone()), + catalog_manager, + physical_planner: PhysicalPlanner::new(query_engine), + script_executor, + meta_client, + heartbeat_task, + }) + } +} + +async fn mock_meta_client() -> MetaClient { + let mock_info = meta_srv::mocks::mock_with_memstore().await; + let meta_srv::mocks::MockInfo { + server_addr, + channel_manager, + } = mock_info; + + let id = (1000u64, 2000u64); + let mut meta_client = MetaClientBuilder::new(id.0, id.1) + .enable_heartbeat() + .enable_router() + .enable_store() + .channel_manager(channel_manager) + .build(); + meta_client.start(&[&server_addr]).await.unwrap(); + // // required only when the heartbeat_client is enabled + meta_client.ask_leader().await.unwrap(); + + meta_client +} diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs index 4d52b39767..87e6e161a2 100644 --- a/src/datanode/src/server/grpc/ddl.rs +++ b/src/datanode/src/server/grpc/ddl.rs @@ -84,6 +84,14 @@ impl Instance { let schema_name = expr .schema_name .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()); + + let region_id = expr + .table_options + .get(&"region_id".to_string()) + .unwrap() + .parse::() + .unwrap(); + Ok(CreateTableRequest { id: table_id, catalog_name, @@ -91,6 +99,7 @@ impl Instance { table_name: expr.table_name, desc: expr.desc, schema, + region_numbers: vec![region_id], primary_key_indices, create_if_not_exists: expr.create_if_not_exists, table_options: expr.table_options, @@ -179,10 +188,11 @@ mod tests { use super::*; use crate::tests::test_util; - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_create_expr_to_request() { + common_telemetry::init_default_ut_logging(); let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("create_expr_to_request"); - let instance = Instance::new(&opts).await.unwrap(); + let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); let expr = testing_create_expr(); @@ -291,6 +301,9 @@ mod tests { default_constraint: None, }, ]; + let table_options = [("region_id".to_string(), "0".to_string())] + .into_iter() + .collect::>(); CreateExpr { catalog_name: None, schema_name: None, @@ -300,7 +313,7 @@ mod tests { time_index: "ts".to_string(), primary_keys: vec!["ts".to_string(), "host".to_string()], create_if_not_exists: true, - table_options: HashMap::new(), + table_options, } } diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index 6007d6db50..817f55ea44 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -168,6 +168,7 @@ pub fn build_create_table_request( create_if_not_exists: true, primary_key_indices, table_options: HashMap::new(), + region_numbers: vec![0], }); } diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index e2e880fa6e..fb24519bc2 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -155,6 +155,7 @@ impl SqlHandler { table_name, desc: None, schema, + region_numbers: vec![0], primary_key_indices: primary_keys, create_if_not_exists: stmt.if_not_exists, table_options: HashMap::new(), diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index 74e25658ba..cb19bd5291 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -24,7 +24,7 @@ async fn setup_grpc_server(name: &str, port: usize) -> (String, TestGuard, Arc (String, TestGuard, Arc (Column, Column, Column, Column) { ) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_insert_and_select() { + common_telemetry::init_default_ut_logging(); let (addr, _guard, grpc_server) = setup_grpc_server("insert_and_select", 3990).await; let grpc_client = Client::with_urls(vec![addr]); @@ -247,6 +248,6 @@ fn testing_create_expr() -> CreateExpr { time_index: "ts".to_string(), primary_keys: vec!["ts".to_string(), "host".to_string()], create_if_not_exists: true, - table_options: HashMap::new(), + table_options: HashMap::from([("region_id".to_string(), "0".to_string())]), } } diff --git a/src/datanode/src/tests/http_test.rs b/src/datanode/src/tests/http_test.rs index 58e52689bc..b84ec01197 100644 --- a/src/datanode/src/tests/http_test.rs +++ b/src/datanode/src/tests/http_test.rs @@ -14,7 +14,7 @@ use crate::tests::test_util; async fn make_test_app(name: &str) -> (Router, TestGuard) { let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name); - let instance = Arc::new(Instance::new(&opts).await.unwrap()); + let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); instance.start().await.unwrap(); test_util::create_test_table(&instance, ConcreteDataType::timestamp_millis_datatype()) .await @@ -23,7 +23,7 @@ async fn make_test_app(name: &str) -> (Router, TestGuard) { (http_server.make_app(), guard) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_sql_api() { common_telemetry::init_default_ut_logging(); let (app, _guard) = make_test_app("sql_api").await; @@ -83,7 +83,7 @@ async fn test_sql_api() { ); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_metrics_api() { common_telemetry::init_default_ut_logging(); common_telemetry::init_default_metrics_recorder(); @@ -104,7 +104,7 @@ async fn test_metrics_api() { assert!(body.contains("datanode_handle_sql_elapsed")); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_scripts_api() { common_telemetry::init_default_ut_logging(); let (app, _guard) = make_test_app("scripts_api").await; diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index a65e867780..4bbaf9d14d 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -1,20 +1,18 @@ use arrow::array::{Int64Array, UInt64Array}; use common_query::Output; use common_recordbatch::util; -use datafusion::arrow_print; -use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datatypes::arrow_array::StringArray; use datatypes::prelude::ConcreteDataType; use crate::instance::Instance; use crate::tests::test_util; -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_execute_insert() { common_telemetry::init_default_ut_logging(); let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("execute_insert"); - let instance = Instance::new(&opts).await.unwrap(); + let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); test_util::create_test_table(&instance, ConcreteDataType::timestamp_millis_datatype()) @@ -33,12 +31,12 @@ async fn test_execute_insert() { assert!(matches!(output, Output::AffectedRows(2))); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_execute_insert_query_with_i64_timestamp() { common_telemetry::init_default_ut_logging(); let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("insert_query_i64_timestamp"); - let instance = Instance::new(&opts).await.unwrap(); + let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); test_util::create_test_table(&instance, ConcreteDataType::int64_datatype()) @@ -72,10 +70,10 @@ async fn test_execute_insert_query_with_i64_timestamp() { } } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_execute_query() { let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("execute_query"); - let instance = Instance::new(&opts).await.unwrap(); + let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); let output = instance @@ -98,11 +96,11 @@ async fn test_execute_query() { } } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_execute_show_databases_tables() { let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("execute_show_databases_tables"); - let instance = Instance::new(&opts).await.unwrap(); + let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); let output = instance.execute_sql("show databases").await.unwrap(); @@ -188,12 +186,12 @@ async fn test_execute_show_databases_tables() { } } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] pub async fn test_execute_create() { common_telemetry::init_default_ut_logging(); let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("execute_create"); - let instance = Instance::new(&opts).await.unwrap(); + let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); let output = instance @@ -212,13 +210,13 @@ pub async fn test_execute_create() { assert!(matches!(output, Output::AffectedRows(1))); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] pub async fn test_create_table_illegal_timestamp_type() { common_telemetry::init_default_ut_logging(); let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("create_table_illegal_timestamp_type"); - let instance = Instance::new(&opts).await.unwrap(); + let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); let output = instance @@ -244,6 +242,8 @@ pub async fn test_create_table_illegal_timestamp_type() { #[tokio::test] async fn test_alter_table() { + use datafusion::arrow_print; + use datafusion_common::record_batch::RecordBatch as DfRecordBatch; // TODO(LFC) Use real Mito engine when we can alter its region schema, // and delete the `new_mock` method. let instance = Instance::new_mock().await.unwrap(); diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index ec8f9f6614..d54f15f8c1 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -72,6 +72,7 @@ pub async fn create_test_table(instance: &Instance, ts_type: ConcreteDataType) - create_if_not_exists: true, primary_key_indices: vec![3, 0], // "host" and "ts" are primary keys table_options: HashMap::new(), + region_numbers: vec![0], }, ) .await diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 7f5b08dc81..c367686402 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -21,9 +21,9 @@ datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datatypes = { path = "../datatypes" } +itertools = "0.10" openmetrics-parser = "0.4" prost = "0.11" -itertools = "0.10" query = { path = "../query" } serde = "1.0" servers = { path = "../servers" } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index d840222a59..ffc420d139 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -161,7 +161,10 @@ fn create_to_expr(create: CreateTable) -> Result { primary_keys: find_primary_keys(&create.constraints)?, create_if_not_exists: create.if_not_exists, // TODO(LFC): Fill in other table options. - table_options: HashMap::from([("engine".to_string(), create.engine)]), + table_options: HashMap::from([ + ("engine".to_string(), create.engine), + ("region_id".to_string(), "0".to_string()), + ]), ..Default::default() }; Ok(expr) @@ -550,12 +553,15 @@ mod tests { default_constraint: None, }, ]; + let mut table_options = HashMap::with_capacity(1); + table_options.insert("region_id".to_string(), "0".to_string()); CreateExpr { table_name: "demo".to_string(), column_defs, time_index: "ts".to_string(), primary_keys: vec!["ts".to_string(), "host".to_string()], create_if_not_exists: true, + table_options, ..Default::default() } } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 879e058ea1..39e8a379ac 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -328,7 +328,7 @@ mod test { use super::*; use crate::partitioning::range::RangePartitionRule; - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_dist_table_scan() { let table = Arc::new(new_dist_table().await); @@ -475,7 +475,7 @@ mod test { ..Default::default() }; - let instance = Arc::new(Instance::new(&opts).await.unwrap()); + let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); instance.start().await.unwrap(); let catalog_manager = instance.catalog_manager().clone(); @@ -498,7 +498,7 @@ mod test { ) } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_find_regions() { let table = new_dist_table().await; diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 10a5b34e58..0d1e3cbc19 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -11,8 +11,8 @@ use router::Client as RouterClient; use snafu::OptionExt; use store::Client as StoreClient; -use self::heartbeat::HeartbeatSender; -use self::heartbeat::HeartbeatStream; +pub use self::heartbeat::HeartbeatSender; +pub use self::heartbeat::HeartbeatStream; use crate::error; use crate::error::Result; use crate::rpc::BatchPutRequest; diff --git a/src/meta-client/src/rpc.rs b/src/meta-client/src/rpc.rs index 0e34f0057c..3fa5b1a605 100644 --- a/src/meta-client/src/rpc.rs +++ b/src/meta-client/src/rpc.rs @@ -1,6 +1,6 @@ mod router; mod store; -mod util; +pub mod util; use api::v1::meta::KeyValue as PbKeyValue; use api::v1::meta::Peer as PbPeer; diff --git a/src/script/src/table.rs b/src/script/src/table.rs index fec37e6667..19b58178df 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -47,6 +47,7 @@ impl ScriptsTable { desc: Some("Scripts table".to_string()), schema, // name and timestamp as primary key + region_numbers: vec![0], primary_key_indices: vec![0, 3], create_if_not_exists: true, table_options: HashMap::default(), diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index beb3ea9578..09dcd13ffa 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -270,7 +270,8 @@ impl MitoEngineInner { let table_id = request.id; // TODO(dennis): supports multi regions; - let region_number = 0; + assert_eq!(1, request.region_numbers.len()); + let region_number = request.region_numbers[0]; let region_id = region_id(table_id, region_number); let region_name = region_name(table_id, region_number); @@ -311,6 +312,7 @@ impl MitoEngineInner { .engine(MITO_ENGINE) .next_column_id(next_column_id) .primary_key_indices(request.primary_key_indices.clone()) + .region_numbers(vec![region_number]) .build() .context(error::BuildTableMetaSnafu { table_name })?; @@ -495,6 +497,7 @@ mod tests { create_if_not_exists: true, primary_key_indices: Vec::default(), table_options: HashMap::new(), + region_numbers: vec![0], }, ) .await @@ -753,6 +756,7 @@ mod tests { desc: None, primary_key_indices: Vec::default(), table_options: HashMap::new(), + region_numbers: vec![0], }; let created_table = table_engine.create_table(&ctx, request).await.unwrap(); @@ -776,6 +780,7 @@ mod tests { desc: None, primary_key_indices: Vec::default(), table_options: HashMap::new(), + region_numbers: vec![0], }; let result = table_engine.create_table(&ctx, request).await; diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index 2a6e239970..52ea98dc94 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -484,10 +484,10 @@ impl MitoTable { ) -> Result> { let manifest = TableManifest::new(&table_manifest_dir(table_name), object_store); - let table_info = Self::recover_table_info(table_name, &manifest) + let mut table_info = Self::recover_table_info(table_name, &manifest) .await? .context(TableInfoNotFoundSnafu { table_name })?; - + table_info.meta.region_numbers = vec![(region.id() & 0xFFFFFFFF) as u32]; Ok(MitoTable::new(table_info, region, manifest)) } diff --git a/src/table-engine/src/table/test_util.rs b/src/table-engine/src/table/test_util.rs index dd4a3ac3bb..00e6a08c97 100644 --- a/src/table-engine/src/table/test_util.rs +++ b/src/table-engine/src/table/test_util.rs @@ -103,6 +103,7 @@ pub async fn setup_test_engine_and_table() -> ( create_if_not_exists: true, primary_key_indices: Vec::default(), table_options: HashMap::new(), + region_numbers: vec![0], }, ) .await @@ -135,6 +136,7 @@ pub async fn setup_mock_engine_and_table( create_if_not_exists: true, primary_key_indices: Vec::default(), table_options: HashMap::new(), + region_numbers: vec![0], }, ) .await diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 9212f6265a..4a26c16d8b 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -55,6 +55,8 @@ pub struct TableMeta { pub value_indices: Vec, #[builder(default, setter(into))] pub engine: String, + #[builder(default, setter(into))] + pub region_numbers: Vec, pub next_column_id: ColumnId, /// Options for table engine. #[builder(default)] @@ -162,6 +164,7 @@ pub struct RawTableMeta { pub value_indices: Vec, pub engine: String, pub next_column_id: ColumnId, + pub region_numbers: Vec, pub engine_options: HashMap, pub options: HashMap, pub created_on: DateTime, @@ -175,6 +178,7 @@ impl From for RawTableMeta { value_indices: meta.value_indices, engine: meta.engine, next_column_id: meta.next_column_id, + region_numbers: meta.region_numbers, engine_options: meta.engine_options, options: meta.options, created_on: meta.created_on, @@ -191,6 +195,7 @@ impl TryFrom for TableMeta { primary_key_indices: raw.primary_key_indices, value_indices: raw.value_indices, engine: raw.engine, + region_numbers: vec![], next_column_id: raw.next_column_id, engine_options: raw.engine_options, options: raw.options, diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 115bf41ee3..449dca5d69 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -22,6 +22,7 @@ pub struct CreateTableRequest { pub table_name: String, pub desc: Option, pub schema: SchemaRef, + pub region_numbers: Vec, pub primary_key_indices: Vec, pub create_if_not_exists: bool, pub table_options: HashMap, diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index 119ad78b9c..cf4f4303a2 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -8,12 +8,12 @@ use common_recordbatch::{RecordBatch, RecordBatchStream}; use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datatypes::arrow::array::UInt32Array; use datatypes::data_type::ConcreteDataType; -use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; use futures::task::{Context, Poll}; use futures::Stream; use crate::error::Result; -use crate::metadata::TableInfoRef; +use crate::metadata::{TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType}; use crate::table::scan::SimpleTableScan; use crate::table::{Expr, Table}; @@ -31,7 +31,12 @@ impl Default for NumbersTable { false, )]; Self { - schema: Arc::new(Schema::new(column_schemas)), + schema: Arc::new( + SchemaBuilder::try_from_columns(column_schemas) + .unwrap() + .build() + .unwrap(), + ), } } } @@ -47,7 +52,26 @@ impl Table for NumbersTable { } fn table_info(&self) -> TableInfoRef { - unimplemented!() + Arc::new( + TableInfoBuilder::default() + .table_id(1) + .name("numbers") + .catalog_name("greptime") + .schema_name("public") + .table_version(0) + .table_type(TableType::Base) + .meta( + TableMetaBuilder::default() + .schema(self.schema.clone()) + .region_numbers(vec![0]) + .primary_key_indices(vec![0]) + .next_column_id(1) + .build() + .unwrap(), + ) + .build() + .unwrap(), + ) } async fn scan(