feat: table route cache (#462)

* feat: table route cache

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-11-11 18:54:56 +08:00
committed by GitHub
parent e30879f638
commit f375e18a76
27 changed files with 775 additions and 237 deletions

206
Cargo.lock generated
View File

@@ -276,6 +276,36 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-io"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8121296a9f05be7f34aa4196b1747243b3b62e048bb7906f644f3fbfc490cf7"
dependencies = [
"async-lock",
"autocfg",
"concurrent-queue",
"futures-lite",
"libc",
"log",
"parking",
"polling",
"slab",
"socket2",
"waker-fn",
"winapi",
]
[[package]]
name = "async-lock"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685"
dependencies = [
"event-listener",
"futures-lite",
]
[[package]]
name = "async-stream"
version = "0.3.3"
@@ -657,6 +687,12 @@ version = "3.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d"
[[package]]
name = "bytecount"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c"
[[package]]
name = "bytemuck"
version = "1.12.1"
@@ -698,6 +734,37 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c"
[[package]]
name = "camino"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88ad0e1e3e88dd237a156ab9f571021b8a158caa0ae44b1968a241efb5144c1e"
dependencies = [
"serde",
]
[[package]]
name = "cargo-platform"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbdb825da8a5df079a43676dbe042702f1707b1109f713a01420fbb4cc71fa27"
dependencies = [
"serde",
]
[[package]]
name = "cargo_metadata"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
dependencies = [
"camino",
"cargo-platform",
"semver",
"serde",
"serde_json",
]
[[package]]
name = "caseless"
version = "0.2.1"
@@ -1878,6 +1945,15 @@ dependencies = [
"libc",
]
[[package]]
name = "error-chain"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
dependencies = [
"version_check",
]
[[package]]
name = "error-code"
version = "2.3.1"
@@ -2015,6 +2091,7 @@ dependencies = [
"async-stream",
"async-trait",
"catalog",
"chrono",
"client",
"common-base",
"common-catalog",
@@ -2033,6 +2110,8 @@ dependencies = [
"futures",
"itertools",
"meta-client",
"meta-srv",
"moka",
"openmetrics-parser",
"prost 0.11.0",
"query",
@@ -2174,6 +2253,21 @@ version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68"
[[package]]
name = "futures-lite"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]]
name = "futures-macro"
version = "0.3.24"
@@ -2945,6 +3039,7 @@ dependencies = [
"futures",
"meta-srv",
"rand 0.8.5",
"serde",
"snafu",
"tokio",
"tokio-stream",
@@ -3084,6 +3179,32 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "moka"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b49a05f67020456541f4f29cbaa812016a266a86ec76f96d3873d459c68fe5e"
dependencies = [
"async-io",
"async-lock",
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
"futures-util",
"num_cpus",
"once_cell",
"parking_lot",
"quanta",
"rustc_version",
"scheduled-thread-pool",
"skeptic",
"smallvec",
"tagptr",
"thiserror",
"triomphe",
"uuid",
]
[[package]]
name = "multimap"
version = "0.8.3"
@@ -3558,6 +3679,12 @@ version = "6.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff7415e9ae3fff1225851df9e0d9e4e5479f947619774677a63572e55e80eff"
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]]
name = "parking_lot"
version = "0.12.1"
@@ -3919,6 +4046,20 @@ dependencies = [
"syn",
]
[[package]]
name = "polling"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab4609a838d88b73d8238967b60dd115cc08d38e2bbaf51ee1e4b695f89122e2"
dependencies = [
"autocfg",
"cfg-if",
"libc",
"log",
"wepoll-ffi",
"winapi",
]
[[package]]
name = "portable-atomic"
version = "0.3.15"
@@ -4126,6 +4267,17 @@ dependencies = [
"prost 0.11.0",
]
[[package]]
name = "pulldown-cmark"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d9cc634bc78768157b5cbfe988ffcd1dcba95cd2b2f03a88316c08c6d00ed63"
dependencies = [
"bitflags",
"memchr",
"unicase",
]
[[package]]
name = "quanta"
version = "0.10.1"
@@ -4865,6 +5017,15 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf"
dependencies = [
"parking_lot",
]
[[package]]
name = "schemars"
version = "0.8.11"
@@ -4976,6 +5137,9 @@ name = "semver"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4"
dependencies = [
"serde",
]
[[package]]
name = "serde"
@@ -5189,6 +5353,21 @@ version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "skeptic"
version = "0.13.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
dependencies = [
"bytecount",
"cargo_metadata",
"error-chain",
"glob",
"pulldown-cmark",
"tempfile",
"walkdir",
]
[[package]]
name = "sketches-ddsketch"
version = "0.2.0"
@@ -5615,6 +5794,12 @@ dependencies = [
"tokio",
]
[[package]]
name = "tagptr"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "tap"
version = "1.0.1"
@@ -6188,6 +6373,12 @@ dependencies = [
"tracing-log",
]
[[package]]
name = "triomphe"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db"
[[package]]
name = "try-lock"
version = "0.2.3"
@@ -6444,6 +6635,12 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8e76fae08f03f96e166d2dfda232190638c10e0383841252416f9cfe2ae60e6"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "walkdir"
version = "2.3.2"
@@ -6572,6 +6769,15 @@ dependencies = [
"webpki",
]
[[package]]
name = "wepoll-ffi"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb"
dependencies = [
"cc",
]
[[package]]
name = "which"
version = "4.3.0"

View File

@@ -69,6 +69,7 @@ message Table {
}
message Region {
// TODO(LFC): Maybe use message RegionNumber?
uint64 id = 1;
string name = 2;
Partition partition = 3;

View File

@@ -1,4 +1,5 @@
use std::fmt::Debug;
use std::sync::Arc;
use async_stream::stream;
use common_telemetry::info;
@@ -10,7 +11,7 @@ use crate::error::{Error, MetaSrvSnafu};
use crate::remote::{Kv, KvBackend, ValueIter};
#[derive(Debug)]
pub struct MetaKvBackend {
pub client: MetaClient,
pub client: Arc<MetaClient>,
}
/// Implement `KvBackend` trait for `MetaKvBackend` instead of opendal's `Accessor` since

View File

@@ -151,6 +151,7 @@ impl TableEngine for MockTableEngine {
table_id,
catalog_name,
schema_name,
vec![0],
)) as Arc<_>;
let mut tables = self.tables.write().await;

View File

@@ -118,6 +118,7 @@ pub struct TableGlobalValue {
pub id: TableId,
/// Id of datanode that created the global table info kv. only for debugging.
pub node_id: u64,
// TODO(LFC): Maybe remove it?
/// Allocation of region ids across all datanodes.
pub regions_id_map: HashMap<u64, Vec<u32>>,
/// Node id -> region ids

View File

@@ -14,7 +14,7 @@ pub struct HeartbeatTask {
node_id: u64,
server_addr: String,
running: Arc<AtomicBool>,
meta_client: MetaClient,
meta_client: Arc<MetaClient>,
interval: u64,
}
@@ -26,7 +26,7 @@ impl Drop for HeartbeatTask {
impl HeartbeatTask {
/// Create a new heartbeat task instance.
pub fn new(node_id: u64, server_addr: String, meta_client: MetaClient) -> Self {
pub fn new(node_id: u64, server_addr: String, meta_client: Arc<MetaClient>) -> Self {
Self {
node_id,
server_addr,

View File

@@ -34,7 +34,7 @@ pub struct Instance {
pub(crate) physical_planner: PhysicalPlanner,
pub(crate) script_executor: ScriptExecutor,
#[allow(unused)]
pub(crate) meta_client: Option<MetaClient>,
pub(crate) meta_client: Option<Arc<MetaClient>>,
pub(crate) heartbeat_task: Option<HeartbeatTask>,
}
@@ -48,7 +48,8 @@ impl Instance {
let meta_client = match opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
Some(new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?)
let meta_client = new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?;
Some(Arc::new(meta_client))
}
};

View File

@@ -2,6 +2,7 @@ use std::sync::Arc;
use catalog::remote::MetaKvBackend;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_srv::mocks::MockInfo;
use query::QueryEngineFactory;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
@@ -23,7 +24,8 @@ impl Instance {
use table_engine::table::test_util::MockEngine;
use table_engine::table::test_util::MockMitoEngine;
let meta_client = Some(mock_meta_client().await);
let mock_info = meta_srv::mocks::mock_with_memstore().await;
let meta_client = Some(Arc::new(mock_meta_client(mock_info, 0).await));
let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
let mock_engine = Arc::new(MockMitoEngine::new(
TableEngineConfig::default(),
@@ -63,9 +65,14 @@ impl Instance {
}
pub async fn with_mock_meta_client(opts: &DatanodeOptions) -> Result<Self> {
let mock_info = meta_srv::mocks::mock_with_memstore().await;
Self::with_mock_meta_server(opts, mock_info).await
}
pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let log_store = create_local_file_log_store(opts).await?;
let meta_client = mock_meta_client().await;
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id).await);
let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(
@@ -104,15 +111,14 @@ impl Instance {
}
}
async fn mock_meta_client() -> MetaClient {
let mock_info = meta_srv::mocks::mock_with_memstore().await;
let meta_srv::mocks::MockInfo {
async fn mock_meta_client(mock_info: MockInfo, node_id: u64) -> MetaClient {
let MockInfo {
server_addr,
channel_manager,
} = mock_info;
let id = (1000u64, 2000u64);
let mut meta_client = MetaClientBuilder::new(id.0, id.1)
let mut meta_client = MetaClientBuilder::new(id.0, node_id)
.enable_heartbeat()
.enable_router()
.enable_store()

View File

@@ -21,9 +21,12 @@ common-time = { path = "../common/time" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datanode = { path = "../datanode" }
datatypes = { path = "../datatypes" }
futures = "0.3"
itertools = "0.10"
meta-client = { path = "../meta-client" }
moka = { version = "0.9", features = ["future"] }
openmetrics-parser = "0.4"
prost = "0.11"
query = { path = "../query" }
@@ -36,7 +39,6 @@ sql = { path = "../sql" }
store-api = { path = "../store-api" }
table = { path = "../table" }
tokio = { version = "1.18", features = ["full"] }
meta-client = {path = "../meta-client"}
[dependencies.arrow]
package = "arrow2"
@@ -44,8 +46,9 @@ version = "0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
[dev-dependencies]
datanode = { path = "../datanode" }
chrono = "0.4"
futures = "0.3"
meta-srv = { path = "../meta-srv", features = ["mock"] }
tempdir = "0.3"
tonic = "0.8"
tower = "0.4"

View File

@@ -1,5 +1,5 @@
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::sync::Arc;
use catalog::error::{
@@ -10,30 +10,32 @@ use catalog::{
CatalogList, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef,
};
use common_catalog::{CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue};
use common_error::ext::BoxedError;
use futures::StreamExt;
use snafu::{OptionExt, ResultExt};
use meta_client::rpc::TableName;
use snafu::prelude::*;
use table::TableRef;
use tokio::sync::RwLock;
use crate::error::DatanodeNotAvailableSnafu;
use crate::mock::{DatanodeId, DatanodeInstance};
use crate::datanode::DatanodeClients;
use crate::partitioning::range::RangePartitionRule;
use crate::table::route::TableRoutes;
use crate::table::DistTable;
pub type DatanodeInstances = HashMap<DatanodeId, DatanodeInstance>;
pub struct FrontendCatalogManager {
backend: KvBackendRef,
datanode_instances: Arc<RwLock<DatanodeInstances>>,
table_routes: Arc<TableRoutes>,
datanode_clients: Arc<DatanodeClients>,
}
impl FrontendCatalogManager {
#[allow(dead_code)]
pub fn new(backend: KvBackendRef, datanode_instances: Arc<RwLock<DatanodeInstances>>) -> Self {
pub(crate) fn new(
backend: KvBackendRef,
table_routes: Arc<TableRoutes>,
datanode_clients: Arc<DatanodeClients>,
) -> Self {
Self {
backend,
datanode_instances,
table_routes,
datanode_clients,
}
}
}
@@ -79,7 +81,8 @@ impl CatalogList for FrontendCatalogManager {
Ok(Some(Arc::new(FrontendCatalogProvider {
catalog_name: name.to_string(),
backend: self.backend.clone(),
datanode_instances: self.datanode_instances.clone(),
table_routes: self.table_routes.clone(),
datanode_clients: self.datanode_clients.clone(),
})))
} else {
Ok(None)
@@ -90,7 +93,8 @@ impl CatalogList for FrontendCatalogManager {
pub struct FrontendCatalogProvider {
catalog_name: String,
backend: KvBackendRef,
datanode_instances: Arc<RwLock<DatanodeInstances>>,
table_routes: Arc<TableRoutes>,
datanode_clients: Arc<DatanodeClients>,
}
impl CatalogProvider for FrontendCatalogProvider {
@@ -136,7 +140,8 @@ impl CatalogProvider for FrontendCatalogProvider {
catalog_name: self.catalog_name.clone(),
schema_name: name.to_string(),
backend: self.backend.clone(),
datanode_instances: self.datanode_instances.clone(),
table_routes: self.table_routes.clone(),
datanode_clients: self.datanode_clients.clone(),
})))
} else {
Ok(None)
@@ -148,7 +153,8 @@ pub struct FrontendSchemaProvider {
catalog_name: String,
schema_name: String,
backend: KvBackendRef,
datanode_instances: Arc<RwLock<DatanodeInstances>>,
table_routes: Arc<TableRoutes>,
datanode_clients: Arc<DatanodeClients>,
}
impl SchemaProvider for FrontendSchemaProvider {
@@ -187,24 +193,20 @@ impl SchemaProvider for FrontendSchemaProvider {
table_name: name.to_string(),
};
let instances = self.datanode_instances.clone();
let backend = self.backend.clone();
let table_name = name.to_string();
let table_routes = self.table_routes.clone();
let datanode_clients = self.datanode_clients.clone();
let table_name = TableName::new(&self.catalog_name, &self.schema_name, name);
let result: Result<Option<TableRef>, catalog::error::Error> = std::thread::spawn(|| {
common_runtime::block_on_read(async move {
let mut datanode_instances = HashMap::new();
let res = match backend.get(table_global_key.to_string().as_bytes()).await? {
None => {
return Ok(None);
}
Some(r) => r,
};
let mut region_to_datanode_map = HashMap::new();
let val = TableGlobalValue::parse(String::from_utf8_lossy(&res.1))
.context(InvalidCatalogValueSnafu)?;
let node_id: DatanodeId = val.node_id;
// TODO(hl): We need to deserialize string to PartitionRule trait object
let partition_rule: Arc<RangePartitionRule> =
@@ -214,26 +216,8 @@ impl SchemaProvider for FrontendSchemaProvider {
},
)?);
for (node_id, region_ids) in val.regions_id_map {
for region_id in region_ids {
region_to_datanode_map.insert(region_id, node_id);
}
}
datanode_instances.insert(
node_id,
instances
.read()
.await
.get(&node_id)
.context(DatanodeNotAvailableSnafu { node_id })
.map_err(BoxedError::new)
.context(catalog::error::InternalSnafu)?
.clone(),
);
let table = Arc::new(DistTable {
table_name: table_name.clone(),
table_name,
schema: Arc::new(
val.meta
.schema
@@ -241,8 +225,8 @@ impl SchemaProvider for FrontendSchemaProvider {
.context(InvalidSchemaInCatalogSnafu)?,
),
partition_rule,
region_dist_map: region_to_datanode_map,
datanode_instances,
table_routes,
datanode_clients,
});
Ok(Some(table as _))
})

View File

@@ -0,0 +1,39 @@
use std::time::Duration;
use client::Client;
use common_grpc::channel_manager::ChannelManager;
use meta_client::rpc::Peer;
use moka::future::{Cache, CacheBuilder};
pub(crate) struct DatanodeClients {
channel_manager: ChannelManager,
clients: Cache<Peer, Client>,
}
impl DatanodeClients {
pub(crate) fn new() -> Self {
Self {
channel_manager: ChannelManager::new(),
clients: CacheBuilder::new(1024)
.time_to_live(Duration::from_secs(30 * 60))
.time_to_idle(Duration::from_secs(5 * 60))
.build(),
}
}
pub(crate) async fn get_client(&self, datanode: &Peer) -> Client {
self.clients
.get_with_by_ref(datanode, async move {
Client::with_manager_and_urls(
self.channel_manager.clone(),
vec![datanode.addr.clone()],
)
})
.await
}
#[cfg(test)]
pub(crate) async fn insert_client(&self, datanode: Peer, client: Client) {
self.clients.insert(datanode, client).await
}
}

View File

@@ -5,8 +5,6 @@ use common_query::logical_plan::Expr;
use datafusion_common::ScalarValue;
use store_api::storage::RegionId;
use crate::mock::DatanodeId;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
@@ -123,12 +121,6 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed to get Datanode instance: {:?}", datanode))]
DatanodeInstance {
datanode: DatanodeId,
backtrace: Backtrace,
},
#[snafu(display("Invaild InsertRequest, reason: {}", reason))]
InvalidInsertRequest {
reason: String,
@@ -184,8 +176,23 @@ pub enum Error {
source: common_catalog::error::Error,
},
#[snafu(display("Cannot find datanode by id: {}", node_id))]
DatanodeNotAvailable { node_id: u64, backtrace: Backtrace },
#[snafu(display("Failed to request Meta, source: {}", source))]
RequestMeta {
#[snafu(backtrace)]
source: meta_client::error::Error,
},
#[snafu(display("Failed to get cache, error: {}", err_msg))]
GetCache {
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to find table routes for table {}", table_name))]
FindTableRoutes {
table_name: String,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -216,7 +223,8 @@ impl ErrorExt for Error {
Error::ColumnDataType { .. }
| Error::FindDatanode { .. }
| Error::DatanodeInstance { .. } => StatusCode::Internal,
| Error::GetCache { .. }
| Error::FindTableRoutes { .. } => StatusCode::Internal,
Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } => {
StatusCode::Unexpected
@@ -229,7 +237,8 @@ impl ErrorExt for Error {
Error::JoinTask { .. } => StatusCode::Unexpected,
Error::Catalog { source, .. } => source.status_code(),
Error::ParseCatalogEntry { source, .. } => source.status_code(),
Error::DatanodeNotAvailable { .. } => StatusCode::StorageUnavailable,
Error::RequestMeta { source } => source.status_code(),
}
}

View File

@@ -1,5 +1,6 @@
use std::sync::Arc;
use datanode::datanode::Mode;
use serde::{Deserialize, Serialize};
use snafu::prelude::*;
@@ -22,6 +23,7 @@ pub struct FrontendOptions {
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub mode: Mode,
}
impl Default for FrontendOptions {
@@ -34,6 +36,7 @@ impl Default for FrontendOptions {
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
mode: Mode::Standalone,
}
}
}

View File

@@ -3,6 +3,8 @@ mod opentsdb;
mod prometheus;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{
@@ -10,11 +12,15 @@ use api::v1::{
InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
};
use async_trait::async_trait;
use catalog::remote::MetaKvBackend;
use client::admin::{admin_result_to_output, Admin};
use client::{Client, Database, Select};
use common_error::prelude::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_query::Output;
use datanode::datanode::{MetaClientOpts, Mode};
use datatypes::schema::ColumnSchema;
use meta_client::client::MetaClientBuilder;
use servers::error as server_error;
use servers::query_handler::{
GrpcAdminHandler, GrpcQueryHandler, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler,
@@ -27,8 +33,11 @@ use sql::statements::statement::Statement;
use sql::statements::{column_def_to_schema, table_idents_to_full_name};
use sql::{dialect::GenericDialect, parser::ParserContext};
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result};
use crate::frontend::FrontendOptions;
use crate::table::route::TableRoutes;
#[async_trait]
pub trait FrontendInstance:
@@ -48,6 +57,7 @@ pub trait FrontendInstance:
#[derive(Default)]
pub struct Instance {
client: Client,
catalog_manager: Option<FrontendCatalogManager>,
}
impl Instance {
@@ -71,6 +81,39 @@ impl FrontendInstance for Instance {
async fn start(&mut self, opts: &FrontendOptions) -> Result<()> {
let addr = opts.datanode_grpc_addr();
self.client.start(vec![addr]);
let meta_client = match opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
let meta_config = MetaClientOpts::default();
let channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(meta_config.timeout_millis))
.connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis))
.tcp_nodelay(meta_config.tcp_nodelay);
let channel_manager = ChannelManager::with_config(channel_config);
let meta_client = MetaClientBuilder::new(0, 0)
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
Some(Arc::new(meta_client))
}
};
self.catalog_manager = if let Some(meta_client) = meta_client {
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
let table_routes = Arc::new(TableRoutes::new(meta_client));
let datanode_clients = Arc::new(DatanodeClients::new());
let catalog_manager =
FrontendCatalogManager::new(meta_backend, table_routes, datanode_clients);
Some(catalog_manager)
} else {
None
};
Ok(())
}
}
@@ -78,7 +121,10 @@ impl FrontendInstance for Instance {
#[cfg(test)]
impl Instance {
pub fn with_client(client: Client) -> Self {
Self { client }
Self {
client,
catalog_manager: None,
}
}
}

View File

@@ -1,6 +1,7 @@
#![feature(assert_matches)]
mod catalog;
mod datanode;
pub mod error;
pub mod frontend;
pub mod grpc;

View File

@@ -4,7 +4,6 @@ use std::fmt::Formatter;
use std::sync::Arc;
use api::v1::InsertExpr;
use catalog::CatalogManagerRef;
use client::ObjectResult;
use client::{Database, Select};
use common_query::prelude::Expr;
@@ -15,15 +14,14 @@ use datafusion::logical_plan::{LogicalPlan as DfLogicPlan, LogicalPlanBuilder};
use datafusion_expr::Expr as DfExpr;
use datatypes::prelude::Value;
use datatypes::schema::SchemaRef;
use meta_client::rpc::TableName;
use query::plan::LogicalPlan;
use table::table::adapter::DfTableProviderAdapter;
pub(crate) type DatanodeId = u64;
use table::TableRef;
#[derive(Clone)]
pub struct DatanodeInstance {
pub(crate) datanode_id: DatanodeId,
catalog_manager: CatalogManagerRef,
table: TableRef,
db: Database,
}
@@ -34,30 +32,19 @@ impl std::fmt::Debug for DatanodeInstance {
}
impl DatanodeInstance {
#[allow(dead_code)]
pub(crate) fn new(
datanode_id: DatanodeId,
catalog_list: CatalogManagerRef,
db: Database,
) -> Self {
Self {
datanode_id,
catalog_manager: catalog_list,
db,
}
pub(crate) fn new(table: TableRef, db: Database) -> Self {
Self { table, db }
}
pub(crate) async fn grpc_insert(&self, request: InsertExpr) -> client::Result<ObjectResult> {
self.db.insert(request).await
}
#[allow(clippy::print_stdout)]
pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> RecordBatches {
let logical_plan = self.build_logical_plan(&plan);
// TODO(LFC): Directly pass in logical plan to GRPC interface when our substrait codec supports filter.
let sql = to_sql(logical_plan);
println!("executing sql \"{}\" in datanode {}", sql, self.datanode_id);
let result = self.db.select(Select::Sql(sql)).await.unwrap();
let output: Output = result.try_into().unwrap();
@@ -72,13 +59,10 @@ impl DatanodeInstance {
}
fn build_logical_plan(&self, table_scan: &TableScanPlan) -> LogicalPlan {
let catalog = self.catalog_manager.catalog("greptime").unwrap().unwrap();
let schema = catalog.schema("public").unwrap().unwrap();
let table = schema.table(&table_scan.table_name).unwrap().unwrap();
let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
let mut builder = LogicalPlanBuilder::scan_with_filters(
table_scan.table_name.clone(),
&table_scan.table_name.table_name,
table_provider,
table_scan.projection.clone(),
table_scan
@@ -99,7 +83,7 @@ impl DatanodeInstance {
#[derive(Debug)]
pub(crate) struct TableScanPlan {
pub table_name: String,
pub table_name: TableName,
pub projection: Option<Vec<usize>>,
pub filters: Vec<Expr>,
pub limit: Option<usize>,

View File

@@ -1,10 +1,12 @@
mod insert;
pub(crate) mod route;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use async_trait::async_trait;
use client::Database;
use common_query::error::Result as QueryResult;
use common_query::logical_plan::Expr;
use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef};
@@ -13,6 +15,7 @@ use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::Expr as DfExpr;
use datafusion::physical_plan::Partitioning;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use meta_client::rpc::{Peer, TableName};
use snafu::prelude::*;
use store_api::storage::RegionNumber;
use table::error::Error as TableError;
@@ -21,17 +24,20 @@ use table::requests::InsertRequest;
use table::Table;
use tokio::sync::RwLock;
use crate::datanode::DatanodeClients;
use crate::error::{self, Error, Result};
use crate::mock::{DatanodeId, DatanodeInstance, TableScanPlan};
use crate::mock::{DatanodeInstance, TableScanPlan};
use crate::partitioning::{Operator, PartitionExpr, PartitionRuleRef};
use crate::spliter::WriteSpliter;
use crate::table::route::TableRoutes;
#[derive(Clone)]
pub struct DistTable {
pub table_name: String,
pub schema: SchemaRef,
pub partition_rule: PartitionRuleRef<Error>,
pub region_dist_map: HashMap<RegionNumber, DatanodeId>,
pub datanode_instances: HashMap<DatanodeId, DatanodeInstance>,
pub(crate) table_name: TableName,
pub(crate) schema: SchemaRef,
pub(crate) partition_rule: PartitionRuleRef<Error>,
pub(crate) table_routes: Arc<TableRoutes>,
pub(crate) datanode_clients: Arc<DatanodeClients>,
}
#[async_trait]
@@ -65,31 +71,28 @@ impl Table for DistTable {
limit: Option<usize>,
) -> table::Result<PhysicalPlanRef> {
let regions = self.find_regions(filters).map_err(TableError::new)?;
let datanodes = self.find_datanodes(regions).map_err(TableError::new)?;
let partition_execs = datanodes
.iter()
.map(|(datanode, _regions)| {
let datanode_instance = self
.datanode_instances
.get(datanode)
.context(error::DatanodeInstanceSnafu {
datanode: *datanode,
})?
.clone();
// TODO(LFC): Pass in "regions" when Datanode supports multi regions for a table.
Ok(PartitionExec {
table_name: self.table_name.clone(),
datanode_instance,
projection: projection.clone(),
filters: filters.to_vec(),
limit,
batches: Arc::new(RwLock::new(None)),
})
})
.collect::<Result<Vec<PartitionExec>>>()
let datanodes = self
.find_datanodes(regions)
.await
.map_err(TableError::new)?;
let mut partition_execs = Vec::with_capacity(datanodes.len());
for (datanode, _regions) in datanodes.iter() {
let client = self.datanode_clients.get_client(datanode).await;
let db = Database::new(&self.table_name.schema_name, client);
let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db);
// TODO(LFC): Pass in "regions" when Datanode supports multi regions for a table.
partition_execs.push(PartitionExec {
table_name: self.table_name.clone(),
datanode_instance,
projection: projection.clone(),
filters: filters.to_vec(),
limit,
batches: Arc::new(RwLock::new(None)),
})
}
let dist_scan = DistTableScan {
schema: project_schema(self.schema(), projection),
partition_execs,
@@ -188,15 +191,24 @@ impl DistTable {
.collect::<HashSet<RegionNumber>>())
}
fn find_datanodes(
async fn find_datanodes(
&self,
regions: Vec<RegionNumber>,
) -> Result<HashMap<DatanodeId, Vec<RegionNumber>>> {
) -> Result<HashMap<Peer, Vec<RegionNumber>>> {
let route = self.table_routes.get_route(&self.table_name).await?;
let mut datanodes = HashMap::new();
for region in regions.iter() {
let datanode = *self
.region_dist_map
.get(region)
let datanode = route
.region_routes
.iter()
.find_map(|x| {
if x.region.id == *region as u64 {
x.leader_peer.clone()
} else {
None
}
})
.context(error::FindDatanodeSnafu { region: *region })?;
datanodes
.entry(datanode)
@@ -283,7 +295,7 @@ impl PhysicalPlan for DistTableScan {
#[derive(Debug)]
struct PartitionExec {
table_name: String,
table_name: TableName,
datanode_instance: DatanodeInstance,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
@@ -325,8 +337,10 @@ impl PartitionExec {
#[allow(clippy::print_stdout)]
#[cfg(test)]
mod test {
use api::v1::meta::{PutRequest, RequestHeader};
use catalog::RegisterTableRequest;
use client::Database;
use chrono::DateTime;
use common_catalog::{TableGlobalKey, TableGlobalValue};
use common_recordbatch::{util, RecordBatch};
use datafusion::arrow_print;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
@@ -336,8 +350,15 @@ mod test {
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::instance::Instance;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{Int32Vector, UInt32Vector};
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use datatypes::vectors::{Int32Vector, UInt32Vector, UInt64Vector};
use meta_client::client::MetaClientBuilder;
use meta_client::rpc::{CreateRequest, Partition};
use meta_srv::metasrv::MetaSrvOptions;
use meta_srv::mocks::MockInfo;
use meta_srv::service::store::kv::KvStoreRef;
use meta_srv::service::store::memory::MemStore;
use table::metadata::RawTableMeta;
use table::test_util::MemTable;
use table::TableRef;
use tempdir::TempDir;
@@ -359,21 +380,21 @@ mod test {
// should scan only region 1
// select a, row_id from numbers where a < 10
let projection = Some(vec![0, 1]);
let projection = Some(vec![1, 2]);
let filters = vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()];
exec_table_scan(table.clone(), projection, filters, None).await;
println!();
// should scan region 1 and 2
// select a, row_id from numbers where a < 15
let projection = Some(vec![0, 1]);
let projection = Some(vec![1, 2]);
let filters = vec![binary_expr(col("a"), Operator::Lt, lit(15)).into()];
exec_table_scan(table.clone(), projection, filters, None).await;
println!();
// should scan region 2 and 3
// select a, row_id from numbers where a < 40 and a >= 10
let projection = Some(vec![0, 1]);
let projection = Some(vec![1, 2]);
let filters = vec![and(
binary_expr(col("a"), Operator::Lt, lit(40)),
binary_expr(col("a"), Operator::GtEq, lit(10)),
@@ -384,7 +405,7 @@ mod test {
// should scan all regions
// select a, row_id from numbers where a < 1000 and row_id == 1
let projection = Some(vec![0, 1]);
let projection = Some(vec![1, 2]);
let filters = vec![and(
binary_expr(col("a"), Operator::Lt, lit(1000)),
binary_expr(col("row_id"), Operator::Eq, lit(1)),
@@ -424,10 +445,12 @@ mod test {
}
async fn new_dist_table() -> DistTable {
let schema = Arc::new(Schema::new(vec![
let column_schemas = vec![
ColumnSchema::new("ts", ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("row_id", ConcreteDataType::uint32_datatype(), true),
]));
];
let schema = Arc::new(Schema::new(column_schemas.clone()));
// PARTITION BY RANGE (a) (
// PARTITION r1 VALUES LESS THAN (10),
@@ -438,54 +461,171 @@ mod test {
let partition_rule = RangePartitionRule::new(
"a",
vec![10_i32.into(), 20_i32.into(), 50_i32.into()],
vec![1_u32, 2, 3, 4],
vec![0_u32, 1, 2, 3],
);
let table1 = new_memtable(schema.clone(), (0..5).collect::<Vec<i32>>());
let table2 = new_memtable(schema.clone(), (10..15).collect::<Vec<i32>>());
let table3 = new_memtable(schema.clone(), (30..35).collect::<Vec<i32>>());
let table4 = new_memtable(schema.clone(), (100..105).collect::<Vec<i32>>());
let kv_store: KvStoreRef = Arc::new(MemStore::default()) as _;
let meta_srv =
meta_srv::mocks::mock(MetaSrvOptions::default(), kv_store.clone(), None).await;
let instance1 = create_datanode_instance(1, table1).await;
let instance2 = create_datanode_instance(2, table2).await;
let instance3 = create_datanode_instance(3, table3).await;
let instance4 = create_datanode_instance(4, table4).await;
let mut datanode_instances = HashMap::new();
for datanode_id in 1..=4 {
datanode_instances.insert(
datanode_id,
create_datanode_instance(datanode_id, meta_srv.clone()).await,
);
}
let datanode_instances = HashMap::from([
(instance1.datanode_id, instance1),
(instance2.datanode_id, instance2),
(instance3.datanode_id, instance3),
(instance4.datanode_id, instance4),
]);
let MockInfo {
server_addr,
channel_manager,
} = meta_srv.clone();
let mut meta_client = MetaClientBuilder::new(1000, 0)
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
meta_client.start(&[&server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);
let table_name = TableName::new("greptime", "public", "dist_numbers");
let create_request = CreateRequest {
table_name: table_name.clone(),
partitions: vec![
Partition {
column_list: vec![b"a".to_vec()],
value_list: vec![b"10".to_vec()],
},
Partition {
column_list: vec![b"a".to_vec()],
value_list: vec![b"20".to_vec()],
},
Partition {
column_list: vec![b"a".to_vec()],
value_list: vec![b"50".to_vec()],
},
Partition {
column_list: vec![b"a".to_vec()],
value_list: vec![b"MAXVALUE".to_vec()],
},
],
};
let mut route_response = meta_client.create_route(create_request).await.unwrap();
let table_route = route_response.table_routes.remove(0);
println!("{}", serde_json::to_string_pretty(&table_route).unwrap());
let mut region_to_datanode_mapping = HashMap::new();
for region_route in table_route.region_routes.iter() {
let region_id = region_route.region.id as u32;
let datanode_id = region_route.leader_peer.as_ref().unwrap().id;
region_to_datanode_mapping.insert(region_id, datanode_id);
}
let table_global_key = TableGlobalKey {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
};
let table_global_value = TableGlobalValue {
id: table_route.table.id as u32,
node_id: table_route
.region_routes
.first()
.unwrap()
.leader_peer
.as_ref()
.unwrap()
.id,
regions_id_map: HashMap::new(),
meta: RawTableMeta {
schema: RawSchema {
column_schemas: column_schemas.clone(),
timestamp_index: Some(0),
version: 0,
},
primary_key_indices: vec![],
value_indices: vec![],
engine: "".to_string(),
next_column_id: column_schemas.len() as u32,
region_numbers: vec![],
engine_options: HashMap::new(),
options: HashMap::new(),
created_on: DateTime::default(),
},
partition_rules: serde_json::to_string(&partition_rule).unwrap(),
};
let _put_response = kv_store
.put(PutRequest {
header: Some(RequestHeader::new((1000, 0))),
key: table_global_key.to_string().as_bytes().to_vec(),
value: table_global_value.as_bytes().unwrap(),
prev_kv: true,
})
.await
.unwrap();
let datanode_clients = Arc::new(DatanodeClients::new());
let mut global_start_ts = 1;
let regional_numbers = vec![
(0, (0..5).collect::<Vec<i32>>()),
(1, (10..15).collect::<Vec<i32>>()),
(2, (30..35).collect::<Vec<i32>>()),
(3, (100..105).collect::<Vec<i32>>()),
];
for (region_id, numbers) in regional_numbers {
let datanode_id = *region_to_datanode_mapping.get(&region_id).unwrap();
let instance = datanode_instances.get(&datanode_id).unwrap().clone();
let start_ts = global_start_ts;
global_start_ts += numbers.len() as u64;
let table = new_memtable(schema.clone(), numbers, vec![region_id], start_ts);
register_datanode_table(instance.clone(), table).await;
let (addr, client) = crate::tests::create_datanode_client(instance).await;
datanode_clients
.insert_client(Peer::new(datanode_id, addr), client)
.await;
}
DistTable {
table_name: "dist_numbers".to_string(),
table_name,
schema,
partition_rule: Arc::new(partition_rule),
region_dist_map: HashMap::from([(1_u32, 1), (2_u32, 2), (3_u32, 3), (4_u32, 4)]),
datanode_instances,
table_routes: Arc::new(TableRoutes::new(meta_client)),
datanode_clients,
}
}
fn new_memtable(schema: SchemaRef, data: Vec<i32>) -> MemTable {
fn new_memtable(
schema: SchemaRef,
data: Vec<i32>,
regions: Vec<RegionNumber>,
start_ts: u64,
) -> MemTable {
let rows = data.len() as u32;
let columns: Vec<VectorRef> = vec![
// column "ts"
Arc::new(UInt64Vector::from_slice(
(start_ts..start_ts + rows as u64).collect::<Vec<u64>>(),
)),
// column "a"
Arc::new(Int32Vector::from_slice(data)),
// column "row_id"
Arc::new(UInt32Vector::from_slice((1..=rows).collect::<Vec<u32>>())),
];
let recordbatch = RecordBatch::new(schema, columns).unwrap();
MemTable::new("dist_numbers", recordbatch)
MemTable::new_with_region("dist_numbers", recordbatch, regions)
}
async fn create_datanode_instance(
datanode_id: DatanodeId,
table: MemTable,
) -> DatanodeInstance {
let wal_tmp_dir = TempDir::new_in("/tmp", "gt_wal_dist_table_test").unwrap();
let data_tmp_dir = TempDir::new_in("/tmp", "gt_data_dist_table_test").unwrap();
async fn create_datanode_instance(datanode_id: u64, meta_srv: MockInfo) -> Arc<Instance> {
let current = common_time::util::current_time_millis();
let wal_tmp_dir =
TempDir::new_in("/tmp", &format!("dist_table_test-wal-{}", current)).unwrap();
let data_tmp_dir =
TempDir::new_in("/tmp", &format!("dist_table_test-data-{}", current)).unwrap();
let opts = DatanodeOptions {
node_id: datanode_id,
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
@@ -493,28 +633,26 @@ mod test {
..Default::default()
};
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
let instance = Arc::new(
Instance::with_mock_meta_server(&opts, meta_srv)
.await
.unwrap(),
);
instance.start().await.unwrap();
let catalog_manager = instance.catalog_manager().clone();
let client = crate::tests::create_datanode_client(instance).await;
instance
}
let table_name = table.table_name().to_string();
catalog_manager
async fn register_datanode_table(instance: Arc<Instance>, table: MemTable) {
let catalog_manager = instance.catalog_manager().clone();
let _ = catalog_manager
.register_table(RegisterTableRequest {
catalog: "greptime".to_string(),
schema: "public".to_string(),
table_name: table_name.clone(),
table_name: table.table_name().to_string(),
table_id: 1234,
table: Arc::new(table),
})
.await
.unwrap();
DatanodeInstance::new(
datanode_id,
catalog_manager,
Database::new("greptime", client),
)
.await;
}
#[tokio::test(flavor = "multi_thread")]
@@ -531,31 +669,31 @@ mod test {
// test simple filter
test(
vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()], // a < 10
vec![1],
vec![0],
);
test(
vec![binary_expr(col("a"), Operator::LtEq, lit(10)).into()], // a <= 10
vec![1, 2],
vec![0, 1],
);
test(
vec![binary_expr(lit(20), Operator::Gt, col("a")).into()], // 20 > a
vec![1, 2],
vec![0, 1],
);
test(
vec![binary_expr(lit(20), Operator::GtEq, col("a")).into()], // 20 >= a
vec![1, 2, 3],
vec![0, 1, 2],
);
test(
vec![binary_expr(lit(45), Operator::Eq, col("a")).into()], // 45 == a
vec![3],
vec![2],
);
test(
vec![binary_expr(col("a"), Operator::NotEq, lit(45)).into()], // a != 45
vec![1, 2, 3, 4],
vec![0, 1, 2, 3],
);
test(
vec![binary_expr(col("a"), Operator::Gt, lit(50)).into()], // a > 50
vec![4],
vec![3],
);
// test multiple filters
@@ -564,21 +702,21 @@ mod test {
binary_expr(col("a"), Operator::Gt, lit(10)).into(),
binary_expr(col("a"), Operator::Gt, lit(50)).into(),
], // [a > 10, a > 50]
vec![4],
vec![3],
);
// test finding all regions when provided with not supported filters or not partition column
test(
vec![binary_expr(col("row_id"), Operator::LtEq, lit(123)).into()], // row_id <= 123
vec![1, 2, 3, 4],
vec![0, 1, 2, 3],
);
test(
vec![binary_expr(col("b"), Operator::Like, lit("foo%")).into()], // b LIKE 'foo%'
vec![1, 2, 3, 4],
vec![0, 1, 2, 3],
);
test(
vec![binary_expr(col("c"), Operator::Gt, lit(123)).into()], // c > 789
vec![1, 2, 3, 4],
vec![0, 1, 2, 3],
);
// test complex "AND" or "OR" filters
@@ -591,7 +729,7 @@ mod test {
),
)
.into()], // row_id < 1 OR (row_id < 1 AND a > 1)
vec![1, 2, 3, 4],
vec![0, 1, 2, 3],
);
test(
vec![or(
@@ -599,7 +737,7 @@ mod test {
binary_expr(col("a"), Operator::GtEq, lit(20)),
)
.into()], // a < 20 OR a >= 20
vec![1, 2, 3, 4],
vec![0, 1, 2, 3],
);
test(
vec![and(
@@ -607,7 +745,7 @@ mod test {
binary_expr(col("a"), Operator::Lt, lit(50)),
)
.into()], // a < 20 AND a < 50
vec![1, 2],
vec![0, 1],
);
// test failed to find regions by contradictory filters

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::codec;
@@ -7,7 +8,7 @@ use api::v1::insert_expr::Expr;
use api::v1::Column;
use api::v1::InsertExpr;
use api::v1::MutateResult;
use client::ObjectResult;
use client::{Database, ObjectResult};
use snafu::ensure;
use snafu::OptionExt;
use snafu::ResultExt;
@@ -17,26 +18,32 @@ use table::requests::InsertRequest;
use super::DistTable;
use crate::error;
use crate::error::Result;
use crate::mock::DatanodeInstance;
impl DistTable {
pub async fn dist_insert(
&self,
inserts: HashMap<RegionNumber, InsertRequest>,
) -> Result<ObjectResult> {
let mut joins = Vec::with_capacity(inserts.len());
let route = self.table_routes.get_route(&self.table_name).await?;
let mut joins = Vec::with_capacity(inserts.len());
for (region_id, insert) in inserts {
let db = self
.region_dist_map
.get(&region_id)
let datanode = route
.region_routes
.iter()
.find_map(|x| {
if x.region.id == region_id as u64 {
x.leader_peer.clone()
} else {
None
}
})
.context(error::FindDatanodeSnafu { region: region_id })?;
let instance = self
.datanode_instances
.get(db)
.context(error::DatanodeInstanceSnafu { datanode: *db })?;
let instance = instance.clone();
let client = self.datanode_clients.get_client(&datanode).await;
let db = Database::new(&self.table_name.schema_name, client);
let instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db);
// TODO(fys): a separate runtime should be used here.
let join = tokio::spawn(async move {

View File

@@ -0,0 +1,56 @@
use std::sync::Arc;
use std::time::Duration;
use meta_client::client::MetaClient;
use meta_client::rpc::{RouteRequest, TableName, TableRoute};
use moka::future::{Cache, CacheBuilder};
use snafu::{ensure, ResultExt};
use crate::error::{self, Result};
pub(crate) struct TableRoutes {
meta_client: Arc<MetaClient>,
cache: Cache<TableName, Arc<TableRoute>>,
}
impl TableRoutes {
pub(crate) fn new(meta_client: Arc<MetaClient>) -> Self {
Self {
meta_client,
cache: CacheBuilder::new(1024)
.time_to_live(Duration::from_secs(30 * 60))
.time_to_idle(Duration::from_secs(5 * 60))
.build(),
}
}
pub(crate) async fn get_route(&self, table_name: &TableName) -> Result<Arc<TableRoute>> {
self.cache
.try_get_with_by_ref(table_name, self.get_from_meta(table_name))
.await
.map_err(|e| {
error::GetCacheSnafu {
err_msg: format!("{:?}", e),
}
.build()
})
}
async fn get_from_meta(&self, table_name: &TableName) -> Result<Arc<TableRoute>> {
let mut resp = self
.meta_client
.route(RouteRequest {
table_names: vec![table_name.clone()],
})
.await
.context(error::RequestMetaSnafu)?;
ensure!(
!resp.table_routes.is_empty(),
error::FindTableRoutesSnafu {
table_name: table_name.to_string()
}
);
let route = resp.table_routes.swap_remove(0);
Ok(Arc::new(route))
}
}

View File

@@ -20,11 +20,13 @@ async fn create_datanode_instance() -> Arc<DatanodeInstance> {
pub(crate) async fn create_frontend_instance() -> Arc<Instance> {
let datanode_instance = create_datanode_instance().await;
let client = create_datanode_client(datanode_instance).await;
let (_, client) = create_datanode_client(datanode_instance).await;
Arc::new(Instance::with_client(client))
}
pub(crate) async fn create_datanode_client(datanode_instance: Arc<DatanodeInstance>) -> Client {
pub(crate) async fn create_datanode_client(
datanode_instance: Arc<DatanodeInstance>,
) -> (String, Client) {
let (client, server) = tokio::io::duplex(1024);
let runtime = Arc::new(
@@ -49,8 +51,8 @@ pub(crate) async fn create_datanode_client(datanode_instance: Arc<DatanodeInstan
// Move client to an option so we can _move_ the inner value
// on the first attempt to connect. All other attempts will fail.
let mut client = Some(client);
// "http://[::]:50051" is just a placeholder, does not actually connect to it,
let addr = "[::].50051";
// "0.0.0.0:3001" is just a placeholder, does not actually connect to it.
let addr = "0.0.0.0:3001";
let channel_manager = ChannelManager::new();
channel_manager
.reset_with_connector(
@@ -71,5 +73,8 @@ pub(crate) async fn create_datanode_client(datanode_instance: Arc<DatanodeInstan
}),
)
.unwrap();
Client::with_manager_and_urls(channel_manager, vec![addr])
(
addr.to_string(),
Client::with_manager_and_urls(channel_manager, vec![addr]),
)
}

View File

@@ -11,6 +11,7 @@ common-grpc = { path = "../common/grpc" }
common-telemetry = { path = "../common/telemetry" }
etcd-client = "0.10"
rand = "0.8"
serde = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.18", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }

View File

@@ -46,7 +46,7 @@ pub enum Error {
#[snafu(display("Failed create heartbeat stream to server"))]
CreateHeartbeatStream { backtrace: Backtrace },
#[snafu(display("Route info corruped: {}", err_msg))]
#[snafu(display("Route info corrupted: {}", err_msg))]
RouteInfoCorrupted {
err_msg: String,
backtrace: Backtrace,

View File

@@ -2,6 +2,8 @@ mod router;
mod store;
pub mod util;
use std::fmt::{Display, Formatter};
use api::v1::meta::KeyValue as PbKeyValue;
use api::v1::meta::Peer as PbPeer;
use api::v1::meta::ResponseHeader as PbResponseHeader;
@@ -13,6 +15,7 @@ pub use router::RouteRequest;
pub use router::RouteResponse;
pub use router::Table;
pub use router::TableRoute;
use serde::{Deserialize, Serialize};
pub use store::BatchPutRequest;
pub use store::BatchPutResponse;
pub use store::CompareAndPutRequest;
@@ -90,13 +93,23 @@ impl KeyValue {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct TableName {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
}
impl Display for TableName {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}.{}.{}",
self.catalog_name, self.schema_name, self.table_name
)
}
}
impl TableName {
pub fn new(
catalog_name: impl Into<String>,
@@ -131,7 +144,7 @@ impl From<PbTableName> for TableName {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct Peer {
pub id: u64,
pub addr: String,

View File

@@ -6,6 +6,7 @@ use api::v1::meta::Region as PbRegion;
use api::v1::meta::RouteRequest as PbRouteRequest;
use api::v1::meta::RouteResponse as PbRouteResponse;
use api::v1::meta::Table as PbTable;
use serde::{Deserialize, Serialize, Serializer};
use snafu::OptionExt;
use super::util;
@@ -96,25 +97,29 @@ impl TryFrom<PbRouteResponse> for RouteResponse {
err_msg: "table required",
})?
.try_into()?;
let region_routes = table_route
.region_routes
.into_iter()
.map(|region_route| {
let region = region_route.region.map(Into::into);
let leader_peer = get_peer(region_route.leader_peer_index);
let follower_peers = region_route
.follower_peer_indexes
.into_iter()
.filter_map(get_peer)
.collect::<Vec<_>>();
RegionRoute {
region,
leader_peer,
follower_peers,
}
})
.collect::<Vec<_>>();
let mut region_routes = Vec::with_capacity(table_route.region_routes.len());
for region_route in table_route.region_routes.into_iter() {
let region = region_route
.region
.context(error::RouteInfoCorruptedSnafu {
err_msg: "'region' not found",
})?
.into();
let leader_peer = get_peer(region_route.leader_peer_index);
let follower_peers = region_route
.follower_peer_indexes
.into_iter()
.filter_map(get_peer)
.collect::<Vec<_>>();
region_routes.push(RegionRoute {
region,
leader_peer,
follower_peers,
});
}
table_routes.push(TableRoute {
table,
@@ -126,13 +131,13 @@ impl TryFrom<PbRouteResponse> for RouteResponse {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TableRoute {
pub table: Table,
pub region_routes: Vec<RegionRoute>,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Table {
pub id: u64,
pub table_name: TableName,
@@ -157,14 +162,14 @@ impl TryFrom<PbTable> for Table {
}
}
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct RegionRoute {
pub region: Option<Region>,
pub region: Region,
pub leader_peer: Option<Peer>,
pub follower_peers: Vec<Peer>,
}
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct Region {
pub id: u64,
pub name: String,
@@ -183,12 +188,26 @@ impl From<PbRegion> for Region {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Partition {
#[serde(serialize_with = "as_utf8")]
pub column_list: Vec<Vec<u8>>,
#[serde(serialize_with = "as_utf8")]
pub value_list: Vec<Vec<u8>>,
}
fn as_utf8<S: Serializer>(val: &[Vec<u8>], serializer: S) -> std::result::Result<S::Ok, S::Error> {
serializer.serialize_str(
val.iter()
.map(|v| {
String::from_utf8(v.clone()).unwrap_or_else(|_| "<unknown-not-UTF8>".to_string())
})
.collect::<Vec<String>>()
.join(",")
.as_str(),
)
}
impl From<Partition> for PbPartition {
fn from(p: Partition) -> Self {
Self {
@@ -335,7 +354,7 @@ mod tests {
let mut region_routes = table_route.region_routes;
assert_eq!(1, region_routes.len());
let region_route = region_routes.remove(0);
let region = region_route.region.unwrap();
let region = region_route.region;
assert_eq!(1, region.id);
assert_eq!("region1", region.name);
let partition = region.partition.unwrap();

View File

@@ -14,6 +14,7 @@ use crate::service::store::etcd::EtcdStore;
use crate::service::store::kv::KvStoreRef;
use crate::service::store::memory::MemStore;
#[derive(Clone)]
pub struct MockInfo {
pub server_addr: String,
pub channel_manager: ChannelManager,

View File

@@ -19,7 +19,7 @@ impl Selector for LeaseBasedSelector {
async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result<Self::Output> {
// filter out the nodes out lease
let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs
time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs * 1000
};
let mut lease_kvs = lease::alive_datanodes(ns, &ctx.kv_store, lease_filter).await?;
// TODO(jiachun): At the moment we are just pushing the latest to the forefront,

View File

@@ -13,6 +13,7 @@ use datatypes::vectors::UInt32Vector;
use futures::task::{Context, Poll};
use futures::Stream;
use snafu::prelude::*;
use store_api::storage::RegionNumber;
use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu};
use crate::metadata::{
@@ -29,12 +30,21 @@ pub struct MemTable {
impl MemTable {
pub fn new(table_name: impl Into<String>, recordbatch: RecordBatch) -> Self {
Self::new_with_region(table_name, recordbatch, vec![0])
}
pub fn new_with_region(
table_name: impl Into<String>,
recordbatch: RecordBatch,
regions: Vec<RegionNumber>,
) -> Self {
Self::new_with_catalog(
table_name,
recordbatch,
0,
"greptime".to_string(),
"public".to_string(),
regions,
)
}
@@ -44,6 +54,7 @@ impl MemTable {
table_id: TableId,
catalog_name: String,
schema_name: String,
regions: Vec<RegionNumber>,
) -> Self {
let schema = recordbatch.schema.clone();
@@ -56,6 +67,7 @@ impl MemTable {
.engine_options(Default::default())
.options(Default::default())
.created_on(Default::default())
.region_numbers(regions)
.build()
.unwrap();