diff --git a/Cargo.lock b/Cargo.lock index 658cd4529c..4c16c66dff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,6 +276,36 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-io" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8121296a9f05be7f34aa4196b1747243b3b62e048bb7906f644f3fbfc490cf7" +dependencies = [ + "async-lock", + "autocfg", + "concurrent-queue", + "futures-lite", + "libc", + "log", + "parking", + "polling", + "slab", + "socket2", + "waker-fn", + "winapi", +] + +[[package]] +name = "async-lock" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685" +dependencies = [ + "event-listener", + "futures-lite", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -657,6 +687,12 @@ version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d" +[[package]] +name = "bytecount" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "bytemuck" version = "1.12.1" @@ -698,6 +734,37 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" +[[package]] +name = "camino" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ad0e1e3e88dd237a156ab9f571021b8a158caa0ae44b1968a241efb5144c1e" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbdb825da8a5df079a43676dbe042702f1707b1109f713a01420fbb4cc71fa27" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "caseless" version = "0.2.1" @@ -1878,6 +1945,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "error-code" version = "2.3.1" @@ -2015,6 +2091,7 @@ dependencies = [ "async-stream", "async-trait", "catalog", + "chrono", "client", "common-base", "common-catalog", @@ -2033,6 +2110,8 @@ dependencies = [ "futures", "itertools", "meta-client", + "meta-srv", + "moka", "openmetrics-parser", "prost 0.11.0", "query", @@ -2174,6 +2253,21 @@ version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68" +[[package]] +name = "futures-lite" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.24" @@ -2945,6 +3039,7 @@ dependencies = [ "futures", "meta-srv", "rand 0.8.5", + "serde", "snafu", "tokio", "tokio-stream", @@ -3084,6 +3179,32 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "moka" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b49a05f67020456541f4f29cbaa812016a266a86ec76f96d3873d459c68fe5e" +dependencies = [ + "async-io", + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "futures-util", + "num_cpus", + "once_cell", + "parking_lot", + "quanta", + "rustc_version", + "scheduled-thread-pool", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "multimap" version = "0.8.3" @@ -3558,6 +3679,12 @@ version = "6.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff" +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + [[package]] name = "parking_lot" version = "0.12.1" @@ -3919,6 +4046,20 @@ dependencies = [ "syn", ] +[[package]] +name = "polling" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab4609a838d88b73d8238967b60dd115cc08d38e2bbaf51ee1e4b695f89122e2" +dependencies = [ + "autocfg", + "cfg-if", + "libc", + "log", + "wepoll-ffi", + "winapi", +] + [[package]] name = "portable-atomic" version = "0.3.15" @@ -4126,6 +4267,17 @@ dependencies = [ "prost 0.11.0", ] +[[package]] +name = "pulldown-cmark" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d9cc634bc78768157b5cbfe988ffcd1dcba95cd2b2f03a88316c08c6d00ed63" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + [[package]] name = "quanta" version = "0.10.1" @@ -4865,6 +5017,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf" +dependencies = [ + "parking_lot", +] + [[package]] name = "schemars" version = "0.8.11" @@ -4976,6 +5137,9 @@ name = "semver" version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4" +dependencies = [ + "serde", +] [[package]] name = "serde" @@ -5189,6 +5353,21 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "sketches-ddsketch" version = "0.2.0" @@ -5615,6 +5794,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -6188,6 +6373,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "triomphe" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" + [[package]] name = "try-lock" version = "0.2.3" @@ -6444,6 +6635,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8e76fae08f03f96e166d2dfda232190638c10e0383841252416f9cfe2ae60e6" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "walkdir" version = "2.3.2" @@ -6572,6 +6769,15 @@ dependencies = [ "webpki", ] +[[package]] +name = "wepoll-ffi" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb" +dependencies = [ + "cc", +] + [[package]] name = "which" version = "4.3.0" diff --git a/src/api/greptime/v1/meta/route.proto b/src/api/greptime/v1/meta/route.proto index 61a636b26e..2c5a43b45d 100644 --- a/src/api/greptime/v1/meta/route.proto +++ b/src/api/greptime/v1/meta/route.proto @@ -69,6 +69,7 @@ message Table { } message Region { + // TODO(LFC): Maybe use message RegionNumber? uint64 id = 1; string name = 2; Partition partition = 3; diff --git a/src/catalog/src/remote/client.rs b/src/catalog/src/remote/client.rs index 4099ccf03e..b072add0bc 100644 --- a/src/catalog/src/remote/client.rs +++ b/src/catalog/src/remote/client.rs @@ -1,4 +1,5 @@ use std::fmt::Debug; +use std::sync::Arc; use async_stream::stream; use common_telemetry::info; @@ -10,7 +11,7 @@ use crate::error::{Error, MetaSrvSnafu}; use crate::remote::{Kv, KvBackend, ValueIter}; #[derive(Debug)] pub struct MetaKvBackend { - pub client: MetaClient, + pub client: Arc, } /// Implement `KvBackend` trait for `MetaKvBackend` instead of opendal's `Accessor` since diff --git a/src/catalog/tests/mock.rs b/src/catalog/tests/mock.rs index f2aad50619..f41e1da533 100644 --- a/src/catalog/tests/mock.rs +++ b/src/catalog/tests/mock.rs @@ -151,6 +151,7 @@ impl TableEngine for MockTableEngine { table_id, catalog_name, schema_name, + vec![0], )) as Arc<_>; let mut tables = self.tables.write().await; diff --git a/src/common/catalog/src/helper.rs b/src/common/catalog/src/helper.rs index 52f64c4453..722bda9b02 100644 --- a/src/common/catalog/src/helper.rs +++ b/src/common/catalog/src/helper.rs @@ -118,6 +118,7 @@ pub struct TableGlobalValue { pub id: TableId, /// Id of datanode that created the global table info kv. only for debugging. pub node_id: u64, + // TODO(LFC): Maybe remove it? /// Allocation of region ids across all datanodes. pub regions_id_map: HashMap>, /// Node id -> region ids diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index ba9f6e8366..17a2a140a4 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -14,7 +14,7 @@ pub struct HeartbeatTask { node_id: u64, server_addr: String, running: Arc, - meta_client: MetaClient, + meta_client: Arc, interval: u64, } @@ -26,7 +26,7 @@ impl Drop for HeartbeatTask { impl HeartbeatTask { /// Create a new heartbeat task instance. - pub fn new(node_id: u64, server_addr: String, meta_client: MetaClient) -> Self { + pub fn new(node_id: u64, server_addr: String, meta_client: Arc) -> Self { Self { node_id, server_addr, diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index bba80ac98a..139415e403 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -34,7 +34,7 @@ pub struct Instance { pub(crate) physical_planner: PhysicalPlanner, pub(crate) script_executor: ScriptExecutor, #[allow(unused)] - pub(crate) meta_client: Option, + pub(crate) meta_client: Option>, pub(crate) heartbeat_task: Option, } @@ -48,7 +48,8 @@ impl Instance { let meta_client = match opts.mode { Mode::Standalone => None, Mode::Distributed => { - Some(new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?) + let meta_client = new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?; + Some(Arc::new(meta_client)) } }; diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index d06a8e9fe5..7b2eb6ee36 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use catalog::remote::MetaKvBackend; use meta_client::client::{MetaClient, MetaClientBuilder}; +use meta_srv::mocks::MockInfo; use query::QueryEngineFactory; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; @@ -23,7 +24,8 @@ impl Instance { use table_engine::table::test_util::MockEngine; use table_engine::table::test_util::MockMitoEngine; - let meta_client = Some(mock_meta_client().await); + let mock_info = meta_srv::mocks::mock_with_memstore().await; + let meta_client = Some(Arc::new(mock_meta_client(mock_info, 0).await)); let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; let mock_engine = Arc::new(MockMitoEngine::new( TableEngineConfig::default(), @@ -63,9 +65,14 @@ impl Instance { } pub async fn with_mock_meta_client(opts: &DatanodeOptions) -> Result { + let mock_info = meta_srv::mocks::mock_with_memstore().await; + Self::with_mock_meta_server(opts, mock_info).await + } + + pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result { let object_store = new_object_store(&opts.storage).await?; let log_store = create_local_file_log_store(opts).await?; - let meta_client = mock_meta_client().await; + let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id).await); let table_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), EngineImpl::new( @@ -104,15 +111,14 @@ impl Instance { } } -async fn mock_meta_client() -> MetaClient { - let mock_info = meta_srv::mocks::mock_with_memstore().await; - let meta_srv::mocks::MockInfo { +async fn mock_meta_client(mock_info: MockInfo, node_id: u64) -> MetaClient { + let MockInfo { server_addr, channel_manager, } = mock_info; let id = (1000u64, 2000u64); - let mut meta_client = MetaClientBuilder::new(id.0, id.1) + let mut meta_client = MetaClientBuilder::new(id.0, node_id) .enable_heartbeat() .enable_router() .enable_store() diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 2e9edf4bdb..401adf4454 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -21,9 +21,12 @@ common-time = { path = "../common/time" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } +datanode = { path = "../datanode" } datatypes = { path = "../datatypes" } futures = "0.3" itertools = "0.10" +meta-client = { path = "../meta-client" } +moka = { version = "0.9", features = ["future"] } openmetrics-parser = "0.4" prost = "0.11" query = { path = "../query" } @@ -36,7 +39,6 @@ sql = { path = "../sql" } store-api = { path = "../store-api" } table = { path = "../table" } tokio = { version = "1.18", features = ["full"] } -meta-client = {path = "../meta-client"} [dependencies.arrow] package = "arrow2" @@ -44,8 +46,9 @@ version = "0.10" features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] [dev-dependencies] -datanode = { path = "../datanode" } +chrono = "0.4" futures = "0.3" +meta-srv = { path = "../meta-srv", features = ["mock"] } tempdir = "0.3" tonic = "0.8" tower = "0.4" diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index af1b0da4d2..c58c7fdf68 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -1,5 +1,5 @@ use std::any::Any; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::sync::Arc; use catalog::error::{ @@ -10,30 +10,32 @@ use catalog::{ CatalogList, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef, }; use common_catalog::{CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue}; -use common_error::ext::BoxedError; use futures::StreamExt; -use snafu::{OptionExt, ResultExt}; +use meta_client::rpc::TableName; +use snafu::prelude::*; use table::TableRef; -use tokio::sync::RwLock; -use crate::error::DatanodeNotAvailableSnafu; -use crate::mock::{DatanodeId, DatanodeInstance}; +use crate::datanode::DatanodeClients; use crate::partitioning::range::RangePartitionRule; +use crate::table::route::TableRoutes; use crate::table::DistTable; -pub type DatanodeInstances = HashMap; - pub struct FrontendCatalogManager { backend: KvBackendRef, - datanode_instances: Arc>, + table_routes: Arc, + datanode_clients: Arc, } impl FrontendCatalogManager { - #[allow(dead_code)] - pub fn new(backend: KvBackendRef, datanode_instances: Arc>) -> Self { + pub(crate) fn new( + backend: KvBackendRef, + table_routes: Arc, + datanode_clients: Arc, + ) -> Self { Self { backend, - datanode_instances, + table_routes, + datanode_clients, } } } @@ -79,7 +81,8 @@ impl CatalogList for FrontendCatalogManager { Ok(Some(Arc::new(FrontendCatalogProvider { catalog_name: name.to_string(), backend: self.backend.clone(), - datanode_instances: self.datanode_instances.clone(), + table_routes: self.table_routes.clone(), + datanode_clients: self.datanode_clients.clone(), }))) } else { Ok(None) @@ -90,7 +93,8 @@ impl CatalogList for FrontendCatalogManager { pub struct FrontendCatalogProvider { catalog_name: String, backend: KvBackendRef, - datanode_instances: Arc>, + table_routes: Arc, + datanode_clients: Arc, } impl CatalogProvider for FrontendCatalogProvider { @@ -136,7 +140,8 @@ impl CatalogProvider for FrontendCatalogProvider { catalog_name: self.catalog_name.clone(), schema_name: name.to_string(), backend: self.backend.clone(), - datanode_instances: self.datanode_instances.clone(), + table_routes: self.table_routes.clone(), + datanode_clients: self.datanode_clients.clone(), }))) } else { Ok(None) @@ -148,7 +153,8 @@ pub struct FrontendSchemaProvider { catalog_name: String, schema_name: String, backend: KvBackendRef, - datanode_instances: Arc>, + table_routes: Arc, + datanode_clients: Arc, } impl SchemaProvider for FrontendSchemaProvider { @@ -187,24 +193,20 @@ impl SchemaProvider for FrontendSchemaProvider { table_name: name.to_string(), }; - let instances = self.datanode_instances.clone(); let backend = self.backend.clone(); - let table_name = name.to_string(); + let table_routes = self.table_routes.clone(); + let datanode_clients = self.datanode_clients.clone(); + let table_name = TableName::new(&self.catalog_name, &self.schema_name, name); let result: Result, catalog::error::Error> = std::thread::spawn(|| { common_runtime::block_on_read(async move { - let mut datanode_instances = HashMap::new(); let res = match backend.get(table_global_key.to_string().as_bytes()).await? { None => { return Ok(None); } Some(r) => r, }; - - let mut region_to_datanode_map = HashMap::new(); - let val = TableGlobalValue::parse(String::from_utf8_lossy(&res.1)) .context(InvalidCatalogValueSnafu)?; - let node_id: DatanodeId = val.node_id; // TODO(hl): We need to deserialize string to PartitionRule trait object let partition_rule: Arc = @@ -214,26 +216,8 @@ impl SchemaProvider for FrontendSchemaProvider { }, )?); - for (node_id, region_ids) in val.regions_id_map { - for region_id in region_ids { - region_to_datanode_map.insert(region_id, node_id); - } - } - - datanode_instances.insert( - node_id, - instances - .read() - .await - .get(&node_id) - .context(DatanodeNotAvailableSnafu { node_id }) - .map_err(BoxedError::new) - .context(catalog::error::InternalSnafu)? - .clone(), - ); - let table = Arc::new(DistTable { - table_name: table_name.clone(), + table_name, schema: Arc::new( val.meta .schema @@ -241,8 +225,8 @@ impl SchemaProvider for FrontendSchemaProvider { .context(InvalidSchemaInCatalogSnafu)?, ), partition_rule, - region_dist_map: region_to_datanode_map, - datanode_instances, + table_routes, + datanode_clients, }); Ok(Some(table as _)) }) diff --git a/src/frontend/src/datanode.rs b/src/frontend/src/datanode.rs new file mode 100644 index 0000000000..9884e70731 --- /dev/null +++ b/src/frontend/src/datanode.rs @@ -0,0 +1,39 @@ +use std::time::Duration; + +use client::Client; +use common_grpc::channel_manager::ChannelManager; +use meta_client::rpc::Peer; +use moka::future::{Cache, CacheBuilder}; + +pub(crate) struct DatanodeClients { + channel_manager: ChannelManager, + clients: Cache, +} + +impl DatanodeClients { + pub(crate) fn new() -> Self { + Self { + channel_manager: ChannelManager::new(), + clients: CacheBuilder::new(1024) + .time_to_live(Duration::from_secs(30 * 60)) + .time_to_idle(Duration::from_secs(5 * 60)) + .build(), + } + } + + pub(crate) async fn get_client(&self, datanode: &Peer) -> Client { + self.clients + .get_with_by_ref(datanode, async move { + Client::with_manager_and_urls( + self.channel_manager.clone(), + vec![datanode.addr.clone()], + ) + }) + .await + } + + #[cfg(test)] + pub(crate) async fn insert_client(&self, datanode: Peer, client: Client) { + self.clients.insert(datanode, client).await + } +} diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 08cf3c96b7..996e5bb435 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -5,8 +5,6 @@ use common_query::logical_plan::Expr; use datafusion_common::ScalarValue; use store_api::storage::RegionId; -use crate::mock::DatanodeId; - #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { @@ -123,12 +121,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to get Datanode instance: {:?}", datanode))] - DatanodeInstance { - datanode: DatanodeId, - backtrace: Backtrace, - }, - #[snafu(display("Invaild InsertRequest, reason: {}", reason))] InvalidInsertRequest { reason: String, @@ -184,8 +176,23 @@ pub enum Error { source: common_catalog::error::Error, }, - #[snafu(display("Cannot find datanode by id: {}", node_id))] - DatanodeNotAvailable { node_id: u64, backtrace: Backtrace }, + #[snafu(display("Failed to request Meta, source: {}", source))] + RequestMeta { + #[snafu(backtrace)] + source: meta_client::error::Error, + }, + + #[snafu(display("Failed to get cache, error: {}", err_msg))] + GetCache { + err_msg: String, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to find table routes for table {}", table_name))] + FindTableRoutes { + table_name: String, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -216,7 +223,8 @@ impl ErrorExt for Error { Error::ColumnDataType { .. } | Error::FindDatanode { .. } - | Error::DatanodeInstance { .. } => StatusCode::Internal, + | Error::GetCache { .. } + | Error::FindTableRoutes { .. } => StatusCode::Internal, Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } => { StatusCode::Unexpected @@ -229,7 +237,8 @@ impl ErrorExt for Error { Error::JoinTask { .. } => StatusCode::Unexpected, Error::Catalog { source, .. } => source.status_code(), Error::ParseCatalogEntry { source, .. } => source.status_code(), - Error::DatanodeNotAvailable { .. } => StatusCode::StorageUnavailable, + + Error::RequestMeta { source } => source.status_code(), } } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 306f92a1a6..3f7666bbb3 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use datanode::datanode::Mode; use serde::{Deserialize, Serialize}; use snafu::prelude::*; @@ -22,6 +23,7 @@ pub struct FrontendOptions { pub opentsdb_options: Option, pub influxdb_options: Option, pub prometheus_options: Option, + pub mode: Mode, } impl Default for FrontendOptions { @@ -34,6 +36,7 @@ impl Default for FrontendOptions { opentsdb_options: Some(OpentsdbOptions::default()), influxdb_options: Some(InfluxdbOptions::default()), prometheus_options: Some(PrometheusOptions::default()), + mode: Mode::Standalone, } } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index efeaa10f07..203f5c0ca4 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -3,6 +3,8 @@ mod opentsdb; mod prometheus; use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; use api::helper::ColumnDataTypeWrapper; use api::v1::{ @@ -10,11 +12,15 @@ use api::v1::{ InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult, }; use async_trait::async_trait; +use catalog::remote::MetaKvBackend; use client::admin::{admin_result_to_output, Admin}; use client::{Client, Database, Select}; use common_error::prelude::BoxedError; +use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_query::Output; +use datanode::datanode::{MetaClientOpts, Mode}; use datatypes::schema::ColumnSchema; +use meta_client::client::MetaClientBuilder; use servers::error as server_error; use servers::query_handler::{ GrpcAdminHandler, GrpcQueryHandler, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, @@ -27,8 +33,11 @@ use sql::statements::statement::Statement; use sql::statements::{column_def_to_schema, table_idents_to_full_name}; use sql::{dialect::GenericDialect, parser::ParserContext}; +use crate::catalog::FrontendCatalogManager; +use crate::datanode::DatanodeClients; use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result}; use crate::frontend::FrontendOptions; +use crate::table::route::TableRoutes; #[async_trait] pub trait FrontendInstance: @@ -48,6 +57,7 @@ pub trait FrontendInstance: #[derive(Default)] pub struct Instance { client: Client, + catalog_manager: Option, } impl Instance { @@ -71,6 +81,39 @@ impl FrontendInstance for Instance { async fn start(&mut self, opts: &FrontendOptions) -> Result<()> { let addr = opts.datanode_grpc_addr(); self.client.start(vec![addr]); + + let meta_client = match opts.mode { + Mode::Standalone => None, + Mode::Distributed => { + let meta_config = MetaClientOpts::default(); + let channel_config = ChannelConfig::new() + .timeout(Duration::from_millis(meta_config.timeout_millis)) + .connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis)) + .tcp_nodelay(meta_config.tcp_nodelay); + + let channel_manager = ChannelManager::with_config(channel_config); + + let meta_client = MetaClientBuilder::new(0, 0) + .enable_router() + .enable_store() + .channel_manager(channel_manager) + .build(); + Some(Arc::new(meta_client)) + } + }; + + self.catalog_manager = if let Some(meta_client) = meta_client { + let meta_backend = Arc::new(MetaKvBackend { + client: meta_client.clone(), + }); + let table_routes = Arc::new(TableRoutes::new(meta_client)); + let datanode_clients = Arc::new(DatanodeClients::new()); + let catalog_manager = + FrontendCatalogManager::new(meta_backend, table_routes, datanode_clients); + Some(catalog_manager) + } else { + None + }; Ok(()) } } @@ -78,7 +121,10 @@ impl FrontendInstance for Instance { #[cfg(test)] impl Instance { pub fn with_client(client: Client) -> Self { - Self { client } + Self { + client, + catalog_manager: None, + } } } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 8da90c52e7..4f13c321e0 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -1,6 +1,7 @@ #![feature(assert_matches)] mod catalog; +mod datanode; pub mod error; pub mod frontend; pub mod grpc; diff --git a/src/frontend/src/mock.rs b/src/frontend/src/mock.rs index 5932aa8ed3..a719bdc8a9 100644 --- a/src/frontend/src/mock.rs +++ b/src/frontend/src/mock.rs @@ -4,7 +4,6 @@ use std::fmt::Formatter; use std::sync::Arc; use api::v1::InsertExpr; -use catalog::CatalogManagerRef; use client::ObjectResult; use client::{Database, Select}; use common_query::prelude::Expr; @@ -15,15 +14,14 @@ use datafusion::logical_plan::{LogicalPlan as DfLogicPlan, LogicalPlanBuilder}; use datafusion_expr::Expr as DfExpr; use datatypes::prelude::Value; use datatypes::schema::SchemaRef; +use meta_client::rpc::TableName; use query::plan::LogicalPlan; use table::table::adapter::DfTableProviderAdapter; - -pub(crate) type DatanodeId = u64; +use table::TableRef; #[derive(Clone)] pub struct DatanodeInstance { - pub(crate) datanode_id: DatanodeId, - catalog_manager: CatalogManagerRef, + table: TableRef, db: Database, } @@ -34,30 +32,19 @@ impl std::fmt::Debug for DatanodeInstance { } impl DatanodeInstance { - #[allow(dead_code)] - pub(crate) fn new( - datanode_id: DatanodeId, - catalog_list: CatalogManagerRef, - db: Database, - ) -> Self { - Self { - datanode_id, - catalog_manager: catalog_list, - db, - } + pub(crate) fn new(table: TableRef, db: Database) -> Self { + Self { table, db } } pub(crate) async fn grpc_insert(&self, request: InsertExpr) -> client::Result { self.db.insert(request).await } - #[allow(clippy::print_stdout)] pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> RecordBatches { let logical_plan = self.build_logical_plan(&plan); // TODO(LFC): Directly pass in logical plan to GRPC interface when our substrait codec supports filter. let sql = to_sql(logical_plan); - println!("executing sql \"{}\" in datanode {}", sql, self.datanode_id); let result = self.db.select(Select::Sql(sql)).await.unwrap(); let output: Output = result.try_into().unwrap(); @@ -72,13 +59,10 @@ impl DatanodeInstance { } fn build_logical_plan(&self, table_scan: &TableScanPlan) -> LogicalPlan { - let catalog = self.catalog_manager.catalog("greptime").unwrap().unwrap(); - let schema = catalog.schema("public").unwrap().unwrap(); - let table = schema.table(&table_scan.table_name).unwrap().unwrap(); - let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone())); + let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone())); let mut builder = LogicalPlanBuilder::scan_with_filters( - table_scan.table_name.clone(), + &table_scan.table_name.table_name, table_provider, table_scan.projection.clone(), table_scan @@ -99,7 +83,7 @@ impl DatanodeInstance { #[derive(Debug)] pub(crate) struct TableScanPlan { - pub table_name: String, + pub table_name: TableName, pub projection: Option>, pub filters: Vec, pub limit: Option, diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index cc759b05a9..65a2a83b84 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -1,10 +1,12 @@ mod insert; +pub(crate) mod route; use std::any::Any; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use async_trait::async_trait; +use client::Database; use common_query::error::Result as QueryResult; use common_query::logical_plan::Expr; use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef}; @@ -13,6 +15,7 @@ use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::logical_plan::Expr as DfExpr; use datafusion::physical_plan::Partitioning; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use meta_client::rpc::{Peer, TableName}; use snafu::prelude::*; use store_api::storage::RegionNumber; use table::error::Error as TableError; @@ -21,17 +24,20 @@ use table::requests::InsertRequest; use table::Table; use tokio::sync::RwLock; +use crate::datanode::DatanodeClients; use crate::error::{self, Error, Result}; -use crate::mock::{DatanodeId, DatanodeInstance, TableScanPlan}; +use crate::mock::{DatanodeInstance, TableScanPlan}; use crate::partitioning::{Operator, PartitionExpr, PartitionRuleRef}; use crate::spliter::WriteSpliter; +use crate::table::route::TableRoutes; +#[derive(Clone)] pub struct DistTable { - pub table_name: String, - pub schema: SchemaRef, - pub partition_rule: PartitionRuleRef, - pub region_dist_map: HashMap, - pub datanode_instances: HashMap, + pub(crate) table_name: TableName, + pub(crate) schema: SchemaRef, + pub(crate) partition_rule: PartitionRuleRef, + pub(crate) table_routes: Arc, + pub(crate) datanode_clients: Arc, } #[async_trait] @@ -65,31 +71,28 @@ impl Table for DistTable { limit: Option, ) -> table::Result { let regions = self.find_regions(filters).map_err(TableError::new)?; - let datanodes = self.find_datanodes(regions).map_err(TableError::new)?; - - let partition_execs = datanodes - .iter() - .map(|(datanode, _regions)| { - let datanode_instance = self - .datanode_instances - .get(datanode) - .context(error::DatanodeInstanceSnafu { - datanode: *datanode, - })? - .clone(); - // TODO(LFC): Pass in "regions" when Datanode supports multi regions for a table. - Ok(PartitionExec { - table_name: self.table_name.clone(), - datanode_instance, - projection: projection.clone(), - filters: filters.to_vec(), - limit, - batches: Arc::new(RwLock::new(None)), - }) - }) - .collect::>>() + let datanodes = self + .find_datanodes(regions) + .await .map_err(TableError::new)?; + let mut partition_execs = Vec::with_capacity(datanodes.len()); + for (datanode, _regions) in datanodes.iter() { + let client = self.datanode_clients.get_client(datanode).await; + let db = Database::new(&self.table_name.schema_name, client); + let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db); + + // TODO(LFC): Pass in "regions" when Datanode supports multi regions for a table. + partition_execs.push(PartitionExec { + table_name: self.table_name.clone(), + datanode_instance, + projection: projection.clone(), + filters: filters.to_vec(), + limit, + batches: Arc::new(RwLock::new(None)), + }) + } + let dist_scan = DistTableScan { schema: project_schema(self.schema(), projection), partition_execs, @@ -188,15 +191,24 @@ impl DistTable { .collect::>()) } - fn find_datanodes( + async fn find_datanodes( &self, regions: Vec, - ) -> Result>> { + ) -> Result>> { + let route = self.table_routes.get_route(&self.table_name).await?; + let mut datanodes = HashMap::new(); for region in regions.iter() { - let datanode = *self - .region_dist_map - .get(region) + let datanode = route + .region_routes + .iter() + .find_map(|x| { + if x.region.id == *region as u64 { + x.leader_peer.clone() + } else { + None + } + }) .context(error::FindDatanodeSnafu { region: *region })?; datanodes .entry(datanode) @@ -283,7 +295,7 @@ impl PhysicalPlan for DistTableScan { #[derive(Debug)] struct PartitionExec { - table_name: String, + table_name: TableName, datanode_instance: DatanodeInstance, projection: Option>, filters: Vec, @@ -325,8 +337,10 @@ impl PartitionExec { #[allow(clippy::print_stdout)] #[cfg(test)] mod test { + use api::v1::meta::{PutRequest, RequestHeader}; use catalog::RegisterTableRequest; - use client::Database; + use chrono::DateTime; + use common_catalog::{TableGlobalKey, TableGlobalValue}; use common_recordbatch::{util, RecordBatch}; use datafusion::arrow_print; use datafusion_common::record_batch::RecordBatch as DfRecordBatch; @@ -336,8 +350,15 @@ mod test { use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; use datanode::instance::Instance; use datatypes::prelude::{ConcreteDataType, VectorRef}; - use datatypes::schema::{ColumnSchema, Schema}; - use datatypes::vectors::{Int32Vector, UInt32Vector}; + use datatypes::schema::{ColumnSchema, RawSchema, Schema}; + use datatypes::vectors::{Int32Vector, UInt32Vector, UInt64Vector}; + use meta_client::client::MetaClientBuilder; + use meta_client::rpc::{CreateRequest, Partition}; + use meta_srv::metasrv::MetaSrvOptions; + use meta_srv::mocks::MockInfo; + use meta_srv::service::store::kv::KvStoreRef; + use meta_srv::service::store::memory::MemStore; + use table::metadata::RawTableMeta; use table::test_util::MemTable; use table::TableRef; use tempdir::TempDir; @@ -359,21 +380,21 @@ mod test { // should scan only region 1 // select a, row_id from numbers where a < 10 - let projection = Some(vec![0, 1]); + let projection = Some(vec![1, 2]); let filters = vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()]; exec_table_scan(table.clone(), projection, filters, None).await; println!(); // should scan region 1 and 2 // select a, row_id from numbers where a < 15 - let projection = Some(vec![0, 1]); + let projection = Some(vec![1, 2]); let filters = vec![binary_expr(col("a"), Operator::Lt, lit(15)).into()]; exec_table_scan(table.clone(), projection, filters, None).await; println!(); // should scan region 2 and 3 // select a, row_id from numbers where a < 40 and a >= 10 - let projection = Some(vec![0, 1]); + let projection = Some(vec![1, 2]); let filters = vec![and( binary_expr(col("a"), Operator::Lt, lit(40)), binary_expr(col("a"), Operator::GtEq, lit(10)), @@ -384,7 +405,7 @@ mod test { // should scan all regions // select a, row_id from numbers where a < 1000 and row_id == 1 - let projection = Some(vec![0, 1]); + let projection = Some(vec![1, 2]); let filters = vec![and( binary_expr(col("a"), Operator::Lt, lit(1000)), binary_expr(col("row_id"), Operator::Eq, lit(1)), @@ -424,10 +445,12 @@ mod test { } async fn new_dist_table() -> DistTable { - let schema = Arc::new(Schema::new(vec![ + let column_schemas = vec![ + ColumnSchema::new("ts", ConcreteDataType::uint64_datatype(), false), ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), ColumnSchema::new("row_id", ConcreteDataType::uint32_datatype(), true), - ])); + ]; + let schema = Arc::new(Schema::new(column_schemas.clone())); // PARTITION BY RANGE (a) ( // PARTITION r1 VALUES LESS THAN (10), @@ -438,54 +461,171 @@ mod test { let partition_rule = RangePartitionRule::new( "a", vec![10_i32.into(), 20_i32.into(), 50_i32.into()], - vec![1_u32, 2, 3, 4], + vec![0_u32, 1, 2, 3], ); - let table1 = new_memtable(schema.clone(), (0..5).collect::>()); - let table2 = new_memtable(schema.clone(), (10..15).collect::>()); - let table3 = new_memtable(schema.clone(), (30..35).collect::>()); - let table4 = new_memtable(schema.clone(), (100..105).collect::>()); + let kv_store: KvStoreRef = Arc::new(MemStore::default()) as _; + let meta_srv = + meta_srv::mocks::mock(MetaSrvOptions::default(), kv_store.clone(), None).await; - let instance1 = create_datanode_instance(1, table1).await; - let instance2 = create_datanode_instance(2, table2).await; - let instance3 = create_datanode_instance(3, table3).await; - let instance4 = create_datanode_instance(4, table4).await; + let mut datanode_instances = HashMap::new(); + for datanode_id in 1..=4 { + datanode_instances.insert( + datanode_id, + create_datanode_instance(datanode_id, meta_srv.clone()).await, + ); + } - let datanode_instances = HashMap::from([ - (instance1.datanode_id, instance1), - (instance2.datanode_id, instance2), - (instance3.datanode_id, instance3), - (instance4.datanode_id, instance4), - ]); + let MockInfo { + server_addr, + channel_manager, + } = meta_srv.clone(); + let mut meta_client = MetaClientBuilder::new(1000, 0) + .enable_router() + .enable_store() + .channel_manager(channel_manager) + .build(); + meta_client.start(&[&server_addr]).await.unwrap(); + let meta_client = Arc::new(meta_client); + + let table_name = TableName::new("greptime", "public", "dist_numbers"); + let create_request = CreateRequest { + table_name: table_name.clone(), + partitions: vec![ + Partition { + column_list: vec![b"a".to_vec()], + value_list: vec![b"10".to_vec()], + }, + Partition { + column_list: vec![b"a".to_vec()], + value_list: vec![b"20".to_vec()], + }, + Partition { + column_list: vec![b"a".to_vec()], + value_list: vec![b"50".to_vec()], + }, + Partition { + column_list: vec![b"a".to_vec()], + value_list: vec![b"MAXVALUE".to_vec()], + }, + ], + }; + let mut route_response = meta_client.create_route(create_request).await.unwrap(); + let table_route = route_response.table_routes.remove(0); + println!("{}", serde_json::to_string_pretty(&table_route).unwrap()); + + let mut region_to_datanode_mapping = HashMap::new(); + for region_route in table_route.region_routes.iter() { + let region_id = region_route.region.id as u32; + let datanode_id = region_route.leader_peer.as_ref().unwrap().id; + region_to_datanode_mapping.insert(region_id, datanode_id); + } + + let table_global_key = TableGlobalKey { + catalog_name: table_name.catalog_name.clone(), + schema_name: table_name.schema_name.clone(), + table_name: table_name.table_name.clone(), + }; + let table_global_value = TableGlobalValue { + id: table_route.table.id as u32, + node_id: table_route + .region_routes + .first() + .unwrap() + .leader_peer + .as_ref() + .unwrap() + .id, + regions_id_map: HashMap::new(), + meta: RawTableMeta { + schema: RawSchema { + column_schemas: column_schemas.clone(), + timestamp_index: Some(0), + version: 0, + }, + primary_key_indices: vec![], + value_indices: vec![], + engine: "".to_string(), + next_column_id: column_schemas.len() as u32, + region_numbers: vec![], + engine_options: HashMap::new(), + options: HashMap::new(), + created_on: DateTime::default(), + }, + partition_rules: serde_json::to_string(&partition_rule).unwrap(), + }; + let _put_response = kv_store + .put(PutRequest { + header: Some(RequestHeader::new((1000, 0))), + key: table_global_key.to_string().as_bytes().to_vec(), + value: table_global_value.as_bytes().unwrap(), + prev_kv: true, + }) + .await + .unwrap(); + + let datanode_clients = Arc::new(DatanodeClients::new()); + let mut global_start_ts = 1; + let regional_numbers = vec![ + (0, (0..5).collect::>()), + (1, (10..15).collect::>()), + (2, (30..35).collect::>()), + (3, (100..105).collect::>()), + ]; + for (region_id, numbers) in regional_numbers { + let datanode_id = *region_to_datanode_mapping.get(®ion_id).unwrap(); + let instance = datanode_instances.get(&datanode_id).unwrap().clone(); + + let start_ts = global_start_ts; + global_start_ts += numbers.len() as u64; + + let table = new_memtable(schema.clone(), numbers, vec![region_id], start_ts); + register_datanode_table(instance.clone(), table).await; + + let (addr, client) = crate::tests::create_datanode_client(instance).await; + datanode_clients + .insert_client(Peer::new(datanode_id, addr), client) + .await; + } DistTable { - table_name: "dist_numbers".to_string(), + table_name, schema, partition_rule: Arc::new(partition_rule), - region_dist_map: HashMap::from([(1_u32, 1), (2_u32, 2), (3_u32, 3), (4_u32, 4)]), - datanode_instances, + table_routes: Arc::new(TableRoutes::new(meta_client)), + datanode_clients, } } - fn new_memtable(schema: SchemaRef, data: Vec) -> MemTable { + fn new_memtable( + schema: SchemaRef, + data: Vec, + regions: Vec, + start_ts: u64, + ) -> MemTable { let rows = data.len() as u32; let columns: Vec = vec![ + // column "ts" + Arc::new(UInt64Vector::from_slice( + (start_ts..start_ts + rows as u64).collect::>(), + )), // column "a" Arc::new(Int32Vector::from_slice(data)), // column "row_id" Arc::new(UInt32Vector::from_slice((1..=rows).collect::>())), ]; let recordbatch = RecordBatch::new(schema, columns).unwrap(); - MemTable::new("dist_numbers", recordbatch) + MemTable::new_with_region("dist_numbers", recordbatch, regions) } - async fn create_datanode_instance( - datanode_id: DatanodeId, - table: MemTable, - ) -> DatanodeInstance { - let wal_tmp_dir = TempDir::new_in("/tmp", "gt_wal_dist_table_test").unwrap(); - let data_tmp_dir = TempDir::new_in("/tmp", "gt_data_dist_table_test").unwrap(); + async fn create_datanode_instance(datanode_id: u64, meta_srv: MockInfo) -> Arc { + let current = common_time::util::current_time_millis(); + let wal_tmp_dir = + TempDir::new_in("/tmp", &format!("dist_table_test-wal-{}", current)).unwrap(); + let data_tmp_dir = + TempDir::new_in("/tmp", &format!("dist_table_test-data-{}", current)).unwrap(); let opts = DatanodeOptions { + node_id: datanode_id, wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), storage: ObjectStoreConfig::File { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), @@ -493,28 +633,26 @@ mod test { ..Default::default() }; - let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); + let instance = Arc::new( + Instance::with_mock_meta_server(&opts, meta_srv) + .await + .unwrap(), + ); instance.start().await.unwrap(); - let catalog_manager = instance.catalog_manager().clone(); - let client = crate::tests::create_datanode_client(instance).await; + instance + } - let table_name = table.table_name().to_string(); - catalog_manager + async fn register_datanode_table(instance: Arc, table: MemTable) { + let catalog_manager = instance.catalog_manager().clone(); + let _ = catalog_manager .register_table(RegisterTableRequest { catalog: "greptime".to_string(), schema: "public".to_string(), - table_name: table_name.clone(), + table_name: table.table_name().to_string(), table_id: 1234, table: Arc::new(table), }) - .await - .unwrap(); - - DatanodeInstance::new( - datanode_id, - catalog_manager, - Database::new("greptime", client), - ) + .await; } #[tokio::test(flavor = "multi_thread")] @@ -531,31 +669,31 @@ mod test { // test simple filter test( vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()], // a < 10 - vec![1], + vec![0], ); test( vec![binary_expr(col("a"), Operator::LtEq, lit(10)).into()], // a <= 10 - vec![1, 2], + vec![0, 1], ); test( vec![binary_expr(lit(20), Operator::Gt, col("a")).into()], // 20 > a - vec![1, 2], + vec![0, 1], ); test( vec![binary_expr(lit(20), Operator::GtEq, col("a")).into()], // 20 >= a - vec![1, 2, 3], + vec![0, 1, 2], ); test( vec![binary_expr(lit(45), Operator::Eq, col("a")).into()], // 45 == a - vec![3], + vec![2], ); test( vec![binary_expr(col("a"), Operator::NotEq, lit(45)).into()], // a != 45 - vec![1, 2, 3, 4], + vec![0, 1, 2, 3], ); test( vec![binary_expr(col("a"), Operator::Gt, lit(50)).into()], // a > 50 - vec![4], + vec![3], ); // test multiple filters @@ -564,21 +702,21 @@ mod test { binary_expr(col("a"), Operator::Gt, lit(10)).into(), binary_expr(col("a"), Operator::Gt, lit(50)).into(), ], // [a > 10, a > 50] - vec![4], + vec![3], ); // test finding all regions when provided with not supported filters or not partition column test( vec![binary_expr(col("row_id"), Operator::LtEq, lit(123)).into()], // row_id <= 123 - vec![1, 2, 3, 4], + vec![0, 1, 2, 3], ); test( vec![binary_expr(col("b"), Operator::Like, lit("foo%")).into()], // b LIKE 'foo%' - vec![1, 2, 3, 4], + vec![0, 1, 2, 3], ); test( vec![binary_expr(col("c"), Operator::Gt, lit(123)).into()], // c > 789 - vec![1, 2, 3, 4], + vec![0, 1, 2, 3], ); // test complex "AND" or "OR" filters @@ -591,7 +729,7 @@ mod test { ), ) .into()], // row_id < 1 OR (row_id < 1 AND a > 1) - vec![1, 2, 3, 4], + vec![0, 1, 2, 3], ); test( vec![or( @@ -599,7 +737,7 @@ mod test { binary_expr(col("a"), Operator::GtEq, lit(20)), ) .into()], // a < 20 OR a >= 20 - vec![1, 2, 3, 4], + vec![0, 1, 2, 3], ); test( vec![and( @@ -607,7 +745,7 @@ mod test { binary_expr(col("a"), Operator::Lt, lit(50)), ) .into()], // a < 20 AND a < 50 - vec![1, 2], + vec![0, 1], ); // test failed to find regions by contradictory filters diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs index fb1e637d81..4bde389066 100644 --- a/src/frontend/src/table/insert.rs +++ b/src/frontend/src/table/insert.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::codec; @@ -7,7 +8,7 @@ use api::v1::insert_expr::Expr; use api::v1::Column; use api::v1::InsertExpr; use api::v1::MutateResult; -use client::ObjectResult; +use client::{Database, ObjectResult}; use snafu::ensure; use snafu::OptionExt; use snafu::ResultExt; @@ -17,26 +18,32 @@ use table::requests::InsertRequest; use super::DistTable; use crate::error; use crate::error::Result; +use crate::mock::DatanodeInstance; impl DistTable { pub async fn dist_insert( &self, inserts: HashMap, ) -> Result { - let mut joins = Vec::with_capacity(inserts.len()); + let route = self.table_routes.get_route(&self.table_name).await?; + let mut joins = Vec::with_capacity(inserts.len()); for (region_id, insert) in inserts { - let db = self - .region_dist_map - .get(®ion_id) + let datanode = route + .region_routes + .iter() + .find_map(|x| { + if x.region.id == region_id as u64 { + x.leader_peer.clone() + } else { + None + } + }) .context(error::FindDatanodeSnafu { region: region_id })?; - let instance = self - .datanode_instances - .get(db) - .context(error::DatanodeInstanceSnafu { datanode: *db })?; - - let instance = instance.clone(); + let client = self.datanode_clients.get_client(&datanode).await; + let db = Database::new(&self.table_name.schema_name, client); + let instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db); // TODO(fys): a separate runtime should be used here. let join = tokio::spawn(async move { diff --git a/src/frontend/src/table/route.rs b/src/frontend/src/table/route.rs new file mode 100644 index 0000000000..f058b3cb9a --- /dev/null +++ b/src/frontend/src/table/route.rs @@ -0,0 +1,56 @@ +use std::sync::Arc; +use std::time::Duration; + +use meta_client::client::MetaClient; +use meta_client::rpc::{RouteRequest, TableName, TableRoute}; +use moka::future::{Cache, CacheBuilder}; +use snafu::{ensure, ResultExt}; + +use crate::error::{self, Result}; + +pub(crate) struct TableRoutes { + meta_client: Arc, + cache: Cache>, +} + +impl TableRoutes { + pub(crate) fn new(meta_client: Arc) -> Self { + Self { + meta_client, + cache: CacheBuilder::new(1024) + .time_to_live(Duration::from_secs(30 * 60)) + .time_to_idle(Duration::from_secs(5 * 60)) + .build(), + } + } + + pub(crate) async fn get_route(&self, table_name: &TableName) -> Result> { + self.cache + .try_get_with_by_ref(table_name, self.get_from_meta(table_name)) + .await + .map_err(|e| { + error::GetCacheSnafu { + err_msg: format!("{:?}", e), + } + .build() + }) + } + + async fn get_from_meta(&self, table_name: &TableName) -> Result> { + let mut resp = self + .meta_client + .route(RouteRequest { + table_names: vec![table_name.clone()], + }) + .await + .context(error::RequestMetaSnafu)?; + ensure!( + !resp.table_routes.is_empty(), + error::FindTableRoutesSnafu { + table_name: table_name.to_string() + } + ); + let route = resp.table_routes.swap_remove(0); + Ok(Arc::new(route)) + } +} diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index b4a408109e..76a823fb22 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -20,11 +20,13 @@ async fn create_datanode_instance() -> Arc { pub(crate) async fn create_frontend_instance() -> Arc { let datanode_instance = create_datanode_instance().await; - let client = create_datanode_client(datanode_instance).await; + let (_, client) = create_datanode_client(datanode_instance).await; Arc::new(Instance::with_client(client)) } -pub(crate) async fn create_datanode_client(datanode_instance: Arc) -> Client { +pub(crate) async fn create_datanode_client( + datanode_instance: Arc, +) -> (String, Client) { let (client, server) = tokio::io::duplex(1024); let runtime = Arc::new( @@ -49,8 +51,8 @@ pub(crate) async fn create_datanode_client(datanode_instance: Arc) -> std::fmt::Result { + write!( + f, + "{}.{}.{}", + self.catalog_name, self.schema_name, self.table_name + ) + } +} + impl TableName { pub fn new( catalog_name: impl Into, @@ -131,7 +144,7 @@ impl From for TableName { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)] pub struct Peer { pub id: u64, pub addr: String, diff --git a/src/meta-client/src/rpc/router.rs b/src/meta-client/src/rpc/router.rs index 6366e7d5a3..381199a453 100644 --- a/src/meta-client/src/rpc/router.rs +++ b/src/meta-client/src/rpc/router.rs @@ -6,6 +6,7 @@ use api::v1::meta::Region as PbRegion; use api::v1::meta::RouteRequest as PbRouteRequest; use api::v1::meta::RouteResponse as PbRouteResponse; use api::v1::meta::Table as PbTable; +use serde::{Deserialize, Serialize, Serializer}; use snafu::OptionExt; use super::util; @@ -96,25 +97,29 @@ impl TryFrom for RouteResponse { err_msg: "table required", })? .try_into()?; - let region_routes = table_route - .region_routes - .into_iter() - .map(|region_route| { - let region = region_route.region.map(Into::into); - let leader_peer = get_peer(region_route.leader_peer_index); - let follower_peers = region_route - .follower_peer_indexes - .into_iter() - .filter_map(get_peer) - .collect::>(); - RegionRoute { - region, - leader_peer, - follower_peers, - } - }) - .collect::>(); + let mut region_routes = Vec::with_capacity(table_route.region_routes.len()); + for region_route in table_route.region_routes.into_iter() { + let region = region_route + .region + .context(error::RouteInfoCorruptedSnafu { + err_msg: "'region' not found", + })? + .into(); + + let leader_peer = get_peer(region_route.leader_peer_index); + let follower_peers = region_route + .follower_peer_indexes + .into_iter() + .filter_map(get_peer) + .collect::>(); + + region_routes.push(RegionRoute { + region, + leader_peer, + follower_peers, + }); + } table_routes.push(TableRoute { table, @@ -126,13 +131,13 @@ impl TryFrom for RouteResponse { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct TableRoute { pub table: Table, pub region_routes: Vec, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Table { pub id: u64, pub table_name: TableName, @@ -157,14 +162,14 @@ impl TryFrom for Table { } } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, Deserialize, Serialize)] pub struct RegionRoute { - pub region: Option, + pub region: Region, pub leader_peer: Option, pub follower_peers: Vec, } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, Deserialize, Serialize)] pub struct Region { pub id: u64, pub name: String, @@ -183,12 +188,26 @@ impl From for Region { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Partition { + #[serde(serialize_with = "as_utf8")] pub column_list: Vec>, + #[serde(serialize_with = "as_utf8")] pub value_list: Vec>, } +fn as_utf8(val: &[Vec], serializer: S) -> std::result::Result { + serializer.serialize_str( + val.iter() + .map(|v| { + String::from_utf8(v.clone()).unwrap_or_else(|_| "".to_string()) + }) + .collect::>() + .join(",") + .as_str(), + ) +} + impl From for PbPartition { fn from(p: Partition) -> Self { Self { @@ -335,7 +354,7 @@ mod tests { let mut region_routes = table_route.region_routes; assert_eq!(1, region_routes.len()); let region_route = region_routes.remove(0); - let region = region_route.region.unwrap(); + let region = region_route.region; assert_eq!(1, region.id); assert_eq!("region1", region.name); let partition = region.partition.unwrap(); diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index 95101250e7..a90dd01a21 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -14,6 +14,7 @@ use crate::service::store::etcd::EtcdStore; use crate::service::store::kv::KvStoreRef; use crate::service::store::memory::MemStore; +#[derive(Clone)] pub struct MockInfo { pub server_addr: String, pub channel_manager: ChannelManager, diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs index c59e2586f8..d584c692d9 100644 --- a/src/meta-srv/src/selector/lease_based.rs +++ b/src/meta-srv/src/selector/lease_based.rs @@ -19,7 +19,7 @@ impl Selector for LeaseBasedSelector { async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result { // filter out the nodes out lease let lease_filter = |_: &LeaseKey, v: &LeaseValue| { - time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs + time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs * 1000 }; let mut lease_kvs = lease::alive_datanodes(ns, &ctx.kv_store, lease_filter).await?; // TODO(jiachun): At the moment we are just pushing the latest to the forefront, diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index c7e90e0197..fda215d94c 100644 --- a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -13,6 +13,7 @@ use datatypes::vectors::UInt32Vector; use futures::task::{Context, Poll}; use futures::Stream; use snafu::prelude::*; +use store_api::storage::RegionNumber; use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu}; use crate::metadata::{ @@ -29,12 +30,21 @@ pub struct MemTable { impl MemTable { pub fn new(table_name: impl Into, recordbatch: RecordBatch) -> Self { + Self::new_with_region(table_name, recordbatch, vec![0]) + } + + pub fn new_with_region( + table_name: impl Into, + recordbatch: RecordBatch, + regions: Vec, + ) -> Self { Self::new_with_catalog( table_name, recordbatch, 0, "greptime".to_string(), "public".to_string(), + regions, ) } @@ -44,6 +54,7 @@ impl MemTable { table_id: TableId, catalog_name: String, schema_name: String, + regions: Vec, ) -> Self { let schema = recordbatch.schema.clone(); @@ -56,6 +67,7 @@ impl MemTable { .engine_options(Default::default()) .options(Default::default()) .created_on(Default::default()) + .region_numbers(regions) .build() .unwrap();