feat: datanode heartbeat (#377)

* chore: refactor dir for local catalog manager

* refactor: CatalogProvider returns Result

* refactor: SchemaProvider returns Result

* feat: add kv operations to remote catalog

* chore: refactor some code

* feat: impl catalog initialization

* feat: add register table and register system table function

* refactor: add table_info method for Table trait

* chore: add some tests

* chore: add register schema test

* chore: fix build issue after rebase onto develop

* refactor: mock to separate file

* build: failed to compile

* fix: use a container struct to bridge KvBackend and Accessor trait

* feat: upgrade opendal to 0.17

* test: add more tests

* chore: add catalog name and schema name to table info

* chore: add catalog name and schema name to table info

* chore: rebase onto develop

* refactor: common-catalog crate

* chore: refactor dir for local catalog manager

* refactor: CatalogProvider returns Result

* refactor: SchemaProvider returns Result

* feat: add kv operations to remote catalog

* chore: refactor some code

* feat: impl catalog initialization

* feat: add register table and register system table function

* refactor: add table_info method for Table trait

* chore: add some tests

* chore: add register schema test

* chore: fix build issue after rebase onto develop

* refactor: mock to separate file

* build: failed to compile

* fix: use a container struct to bridge KvBackend and Accessor trait

* feat: upgrade opendal to 0.17

* test: add more tests

* chore: add catalog name and schema name to table info

* chore: add catalog name and schema name to table info

* chore: rebase onto develop

* refactor: common-catalog crate

* refactor: remove remote catalog related files

* fix: compilation

* feat: add table version to TableKey

* feat: add node id to TableValue

* fix: some CR comments

* chore: change async fn create_expr_to_request to sync

* fix: add backtrace to errors

* fix: code style

* refactor: merge refactor/catalog-crate

* feat: table key with version

* feat: impl KvBackend for MetaClient

* fix: integrate metaclient

* fix: catalog use local table info as baseline

* fix: sync metsrv

* fix: wip

* fix: update remote catalog on register and deregister

* refactor: CatalogProvider

* refactor: CatalogManager

* fix: catalog key filtering

* fix: pass some test

* refactor: catalog iterating

* fix: CatalogManager::table also requires both catalog_name and schema_name

* chore: merge develop

* chore: merge catalog crate

* fix: adapt to recent meta-client api change

* feat: databode lease

* feat: remote catalog (#356)

* chore: refactor dir for local catalog manager

* refactor: CatalogProvider returns Result

* refactor: SchemaProvider returns Result

* feat: add kv operations to remote catalog

* chore: refactor some code

* feat: impl catalog initialization

* feat: add register table and register system table function

* refactor: add table_info method for Table trait

* chore: add some tests

* chore: add register schema test

* chore: fix build issue after rebase onto develop

* refactor: mock to separate file

* build: failed to compile

* fix: use a container struct to bridge KvBackend and Accessor trait

* feat: upgrade opendal to 0.17

* test: add more tests

* chore: add catalog name and schema name to table info

* chore: add catalog name and schema name to table info

* chore: rebase onto develop

* refactor: common-catalog crate

* chore: refactor dir for local catalog manager

* refactor: CatalogProvider returns Result

* refactor: SchemaProvider returns Result

* feat: add kv operations to remote catalog

* chore: refactor some code

* feat: impl catalog initialization

* feat: add register table and register system table function

* refactor: add table_info method for Table trait

* chore: add some tests

* chore: add register schema test

* chore: fix build issue after rebase onto develop

* refactor: mock to separate file

* build: failed to compile

* fix: use a container struct to bridge KvBackend and Accessor trait

* feat: upgrade opendal to 0.17

* test: add more tests

* chore: add catalog name and schema name to table info

* chore: add catalog name and schema name to table info

* chore: rebase onto develop

* refactor: common-catalog crate

* refactor: remove remote catalog related files

* fix: compilation

* feat: add table version to TableKey

* feat: add node id to TableValue

* fix: some CR comments

* chore: change async fn create_expr_to_request to sync

* fix: add backtrace to errors

* fix: code style

* refactor: merge refactor/catalog-crate

* feat: table key with version

* feat: impl KvBackend for MetaClient

* fix: integrate metaclient

* fix: catalog use local table info as baseline

* fix: sync metsrv

* fix: wip

* fix: update remote catalog on register and deregister

* refactor: CatalogProvider

* refactor: CatalogManager

* fix: catalog key filtering

* fix: pass some test

* refactor: catalog iterating

* fix: CatalogManager::table also requires both catalog_name and schema_name

* chore: merge develop

* chore: merge catalog crate

* fix: adapt to recent meta-client api change

* feat: datanode heartbeat (#355)

* feat: add heartbeat task to instance

* feat: add node_id datanode opts

* fix: use real node id in heartbeat and meta client

* feat: distribute table in frontend

* test: distribute read demo

* test: distribute read demo

* test: distribute read demo

* add write spliter

* fix: node id changed to u64

* feat: datanode uses remote catalog implementation

* dist insert integrate table

* feat: specify region ids on creating table (#359)

* fix: compiling issues

* feat: datanode lease (#354)

* Some glue code about dist_insert

* fix: correctly wrap string value with quotes

* feat: create route

* feat: frontend catalog (#362)

* feat: integrate catalog to frontend

* feat: preserve partition rule on create

* fix: print tables on start

* chore: log in create route

* test: distribute read demo

* feat: support metasrv addr command line options

* feat: optimize DataNodeInstance creation (#368)

* chore: remove unnecessary changes

* chore: revert changes to src/api

* chore: revert changes to src/datanode/src/server.rs

* chore: remove opendal backend

* chore: optimize imports

* chore: revert changes to instance and region ids

* refactor: MetaKvBackend range

* fix: remove some wrap

* refactor: initiation of catalog

* feat: add region id to create table request and add heartbeat task to datanode instance

* fix: fix auto reconnect for heartbeat task

* chore: change TableValue::region_numbers to vec<u32>.

* fix: some tests

* fix: avoid concurrently start Heartbeat task by compare_exchange

* fix: some cr comments

* fix: fix unit tests

Co-authored-by: jiachun <jiachun_fjc@163.com>
Co-authored-by: luofucong <luofucong@greptime.com>
Co-authored-by: fys <1113014250@qq.com>
Co-authored-by: Jiachun Feng <jiachun_feng@proton.me>
This commit is contained in:
Lei, Huang
2022-11-07 17:10:43 +08:00
committed by GitHub
parent 172c9a1e21
commit 2137587091
36 changed files with 517 additions and 107 deletions

2
Cargo.lock generated
View File

@@ -1628,6 +1628,8 @@ dependencies = [
"futures",
"hyper",
"log-store",
"meta-client",
"meta-srv",
"metrics",
"object-store",
"query",

View File

@@ -2,14 +2,13 @@
name = "benchmarks"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arrow = "10"
clap = { version = "4.0", features = ["derive"] }
client = { path = "../src/client" }
itertools = "0.10.5"
indicatif = "0.17.1"
itertools = "0.10.5"
parquet = { version = "*" }
tokio = { version = "1.21", features = ["full"] }

View File

@@ -1,3 +1,4 @@
node_id = 42
http_addr = '0.0.0.0:3000'
rpc_addr = '0.0.0.0:3001'
wal_dir = '/tmp/greptimedb/wal'
@@ -13,3 +14,9 @@ postgres_runtime_size = 4
[storage]
type = 'File'
data_dir = '/tmp/greptimedb/data/'
[meta_client_opts]
metasrv_addr = "1.1.1.1:3002"
timeout_millis = 3000
connect_timeout_millis = 5000
tcp_nodelay = true

View File

@@ -4,6 +4,7 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::prelude::{Snafu, StatusCode};
use datafusion::error::DataFusionError;
use datatypes::arrow;
use datatypes::schema::RawSchema;
use snafu::{Backtrace, ErrorCompat};
#[derive(Debug, Snafu)]
@@ -110,6 +111,19 @@ pub enum Error {
source: table::error::Error,
},
#[snafu(display(
"Invalid table schema in catalog entry, table:{}, schema: {:?}, source: {}",
table_info,
schema,
source
))]
InvalidTableSchema {
table_info: String,
schema: RawSchema,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Failed to execute system catalog table scan, source: {}", source))]
SystemCatalogTableScanExec {
#[snafu(backtrace)]
@@ -170,6 +184,7 @@ impl ErrorExt for Error {
Error::MetaSrv { source, .. } => source.status_code(),
Error::SystemCatalogTableScan { source } => source.status_code(),
Error::SystemCatalogTableScanExec { source } => source.status_code(),
Error::InvalidTableSchema { source, .. } => source.status_code(),
}
}

View File

@@ -12,21 +12,21 @@ use common_catalog::{
SchemaKey, SchemaValue, TableKey, TableValue,
};
use common_telemetry::{debug, info};
use datatypes::schema::Schema;
use futures::Stream;
use futures_util::StreamExt;
use snafu::{OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineRef};
use table::metadata::{TableId, TableVersion};
use table::requests::{CreateTableRequest, OpenTableRequest};
use table::table::numbers::NumbersTable;
use table::TableRef;
use tokio::sync::Mutex;
use crate::error::Result;
use crate::error::{
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu,
SchemaNotFoundSnafu, TableExistsSnafu,
};
use crate::error::{InvalidTableSchemaSnafu, Result};
use crate::remote::{Kv, KvBackendRef};
use crate::{
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
@@ -164,7 +164,7 @@ impl RemoteCatalogManager {
/// Fetch catalogs/schemas/tables from remote catalog manager along with max table id allocated.
async fn initiate_catalogs(&self) -> Result<(HashMap<String, CatalogProviderRef>, TableId)> {
let mut res = HashMap::new();
let max_table_id = MIN_USER_TABLE_ID;
let max_table_id = MIN_USER_TABLE_ID - 1;
// initiate default catalog and schema
let default_catalog = self.initiate_default_catalog().await?;
@@ -246,7 +246,7 @@ impl RemoteCatalogManager {
async fn initiate_default_catalog(&self) -> Result<CatalogProviderRef> {
let default_catalog = self.new_catalog_provider(DEFAULT_CATALOG_NAME);
let default_schema = self.new_schema_provider(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)?;
default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema.clone())?;
let schema_key = SchemaKey {
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
@@ -310,13 +310,22 @@ impl RemoteCatalogManager {
})? {
Some(table) => Ok(table),
None => {
let schema = meta
.schema
.clone()
.try_into()
.context(InvalidTableSchemaSnafu {
table_info: format!("{}.{}.{}", catalog_name, schema_name, table_name,),
schema: meta.schema.clone(),
})?;
let req = CreateTableRequest {
id: *id,
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
desc: None,
schema: Arc::new(Schema::new(meta.schema.column_schemas.clone())),
schema: Arc::new(schema),
region_numbers: meta.region_numbers.clone(),
primary_key_indices: meta.primary_key_indices.clone(),
create_if_not_exists: true,
table_options: meta.options.clone(),
@@ -352,6 +361,15 @@ impl CatalogManager for RemoteCatalogManager {
let mut system_table_requests = self.system_table_requests.lock().await;
handle_system_table_request(self, self.engine.clone(), &mut system_table_requests).await?;
info!("All system table opened");
self.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.unwrap()
.unwrap()
.register_table("numbers".to_string(), Arc::new(NumbersTable::default()))
.unwrap();
Ok(())
}
@@ -512,6 +530,7 @@ impl CatalogProvider for RemoteCatalogProvider {
.context(InvalidCatalogValueSnafu)?,
)
.await?;
let prev_schemas = schemas.load();
let mut new_schemas = HashMap::with_capacity(prev_schemas.len() + 1);
new_schemas.clone_from(&prev_schemas);
@@ -590,7 +609,7 @@ impl SchemaProvider for RemoteSchemaProvider {
meta: table_info.meta.clone().into(),
id: table_info.ident.table_id,
node_id: self.node_id,
regions_ids: vec![],
regions_ids: table.table_info().meta.region_numbers.clone(),
};
let backend = self.backend.clone();
let mutex = self.mutex.clone();

View File

@@ -96,6 +96,7 @@ impl SystemCatalogTable {
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
desc: Some("System catalog table".to_string()),
schema: schema.clone(),
region_numbers: vec![0],
primary_key_indices: vec![ENTRY_TYPE_INDEX, KEY_INDEX, TIMESTAMP_INDEX],
create_if_not_exists: true,
table_options: HashMap::new(),

View File

@@ -114,6 +114,7 @@ mod tests {
table_name: table_name.clone(),
desc: None,
schema: table_schema.clone(),
region_numbers: vec![0],
primary_key_indices: vec![],
create_if_not_exists: false,
table_options: Default::default(),
@@ -154,7 +155,7 @@ mod tests {
.schema(DEFAULT_SCHEMA_NAME)
.unwrap()
.unwrap();
assert_eq!(Vec::<String>::new(), default_schema.table_names().unwrap());
assert_eq!(vec!["numbers"], default_schema.table_names().unwrap());
// register a new table with an nonexistent catalog
let catalog_name = DEFAULT_CATALOG_NAME.to_string();
@@ -173,6 +174,7 @@ mod tests {
table_name: table_name.clone(),
desc: None,
schema: table_schema.clone(),
region_numbers: vec![0],
primary_key_indices: vec![],
create_if_not_exists: false,
table_options: Default::default(),
@@ -188,7 +190,14 @@ mod tests {
table,
};
assert_eq!(1, catalog_manager.register_table(reg_req).await.unwrap());
assert_eq!(vec![table_name], default_schema.table_names().unwrap());
assert_eq!(
HashSet::from([table_name, "numbers".to_string()]),
default_schema
.table_names()
.unwrap()
.into_iter()
.collect::<HashSet<_>>()
);
}
#[tokio::test]
@@ -225,6 +234,7 @@ mod tests {
table_name: "".to_string(),
desc: None,
schema: Arc::new(Schema::new(vec![])),
region_numbers: vec![0],
primary_key_indices: vec![],
create_if_not_exists: false,
table_options: Default::default(),

View File

@@ -33,6 +33,8 @@ impl SubCommand {
#[derive(Debug, Parser)]
struct StartCommand {
#[clap(long)]
node_id: Option<u64>,
#[clap(long)]
http_addr: Option<String>,
#[clap(long)]
@@ -41,6 +43,8 @@ struct StartCommand {
mysql_addr: Option<String>,
#[clap(long)]
postgres_addr: Option<String>,
#[clap(long)]
metasrv_addr: Option<String>,
#[clap(short, long)]
config_file: Option<String>,
}
@@ -71,6 +75,9 @@ impl TryFrom<StartCommand> for DatanodeOptions {
DatanodeOptions::default()
};
if let Some(node_id) = cmd.node_id {
opts.node_id = node_id;
}
if let Some(addr) = cmd.http_addr {
opts.http_addr = addr;
}
@@ -83,7 +90,9 @@ impl TryFrom<StartCommand> for DatanodeOptions {
if let Some(addr) = cmd.postgres_addr {
opts.postgres_addr = addr;
}
if let Some(addr) = cmd.metasrv_addr {
opts.meta_client_opts.metasrv_addr = addr;
}
Ok(opts)
}
}
@@ -97,10 +106,12 @@ mod tests {
#[test]
fn test_read_from_config_file() {
let cmd = StartCommand {
node_id: None,
http_addr: None,
rpc_addr: None,
mysql_addr: None,
postgres_addr: None,
metasrv_addr: None,
config_file: Some(format!(
"{}/../../config/datanode.example.toml",
std::env::current_dir().unwrap().as_path().to_str().unwrap()
@@ -112,6 +123,13 @@ mod tests {
assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal_dir);
assert_eq!("0.0.0.0:3306".to_string(), options.mysql_addr);
assert_eq!(4, options.mysql_runtime_size);
assert_eq!(
"1.1.1.1:3002".to_string(),
options.meta_client_opts.metasrv_addr
);
assert_eq!(5000, options.meta_client_opts.connect_timeout_millis);
assert_eq!(3000, options.meta_client_opts.timeout_millis);
assert!(options.meta_client_opts.tcp_nodelay);
assert_eq!("0.0.0.0:5432".to_string(), options.postgres_addr);
assert_eq!(4, options.postgres_runtime_size);

View File

@@ -102,7 +102,7 @@ impl TableKey {
pub struct TableValue {
pub id: TableId,
pub node_id: u64,
pub regions_ids: Vec<u64>,
pub regions_ids: Vec<u32>,
pub meta: RawTableMeta,
}
@@ -278,6 +278,7 @@ mod tests {
engine_options: Default::default(),
value_indices: vec![2, 3],
options: Default::default(),
region_numbers: vec![1],
};
let value = TableValue {

View File

@@ -377,6 +377,7 @@ mod test {
table_name: table_name.to_string(),
desc: None,
schema: Arc::new(Schema::new(supported_types())),
region_numbers: vec![0],
primary_key_indices: vec![],
create_if_not_exists: true,
table_options: Default::default(),

View File

@@ -29,6 +29,8 @@ datatypes = { path = "../datatypes" }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
log-store = { path = "../log-store" }
meta-client = { path = "../meta-client" }
meta-srv = { path = "../meta-srv", features = ["mock"] }
metrics = "0.20"
object-store = { path = "../object-store" }
query = { path = "../query" }

View File

@@ -15,13 +15,17 @@ pub enum ObjectStoreConfig {
impl Default for ObjectStoreConfig {
fn default() -> Self {
ObjectStoreConfig::File {
data_dir: "/tmp/greptimedb/data/".to_string(),
data_dir: format!(
"/tmp/greptimedb/data/{}",
common_time::util::current_time_millis()
),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DatanodeOptions {
pub node_id: u64,
pub http_addr: String,
pub rpc_addr: String,
pub rpc_runtime_size: usize,
@@ -29,6 +33,7 @@ pub struct DatanodeOptions {
pub mysql_runtime_size: usize,
pub postgres_addr: String,
pub postgres_runtime_size: usize,
pub meta_client_opts: MetaClientOpts,
pub wal_dir: String,
pub storage: ObjectStoreConfig,
}
@@ -36,6 +41,7 @@ pub struct DatanodeOptions {
impl Default for DatanodeOptions {
fn default() -> Self {
Self {
node_id: 0,
http_addr: "0.0.0.0:3000".to_string(),
rpc_addr: "0.0.0.0:3001".to_string(),
rpc_runtime_size: 8,
@@ -43,7 +49,11 @@ impl Default for DatanodeOptions {
mysql_runtime_size: 2,
postgres_addr: "0.0.0.0:5432".to_string(),
postgres_runtime_size: 2,
wal_dir: "/tmp/greptimedb/wal".to_string(),
meta_client_opts: MetaClientOpts::default(),
wal_dir: format!(
"/tmp/greptimedb/wal/{}",
common_time::util::current_time_millis()
),
storage: ObjectStoreConfig::default(),
}
}
@@ -72,3 +82,23 @@ impl Datanode {
self.services.start(&self.opts).await
}
}
// Options for meta client in datanode instance.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MetaClientOpts {
pub metasrv_addr: String,
pub timeout_millis: u64,
pub connect_timeout_millis: u64,
pub tcp_nodelay: bool,
}
impl Default for MetaClientOpts {
fn default() -> Self {
Self {
metasrv_addr: "127.0.0.1:3002".to_string(),
timeout_millis: 3_000u64,
connect_timeout_millis: 5_000u64,
tcp_nodelay: true,
}
}
}

View File

@@ -279,6 +279,12 @@ pub enum Error {
table_name: String,
source: catalog::error::Error,
},
#[snafu(display("Failed to initialize meta client, source: {}", source))]
MetaClientInit {
#[snafu(backtrace)]
source: meta_client::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -346,6 +352,7 @@ impl ErrorExt for Error {
| Error::CollectRecordBatches { source } => source.status_code(),
Error::ArrowComputation { .. } => StatusCode::Unexpected,
Error::MetaClientInit { source, .. } => source.status_code(),
}
}

View File

@@ -0,0 +1,97 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, Peer};
use common_telemetry::{error, info, warn};
use meta_client::client::{HeartbeatSender, MetaClient};
use snafu::ResultExt;
use crate::error::{MetaClientInitSnafu, Result};
#[derive(Debug, Clone, Default)]
pub struct HeartbeatTask {
node_id: u64,
server_addr: String,
started: Arc<AtomicBool>,
meta_client: MetaClient,
interval: u64,
}
impl HeartbeatTask {
/// Create a new heartbeat task instance.
pub fn new(node_id: u64, server_addr: String, meta_client: MetaClient) -> Self {
Self {
node_id,
server_addr,
started: Arc::new(AtomicBool::new(false)),
meta_client,
interval: 5_000, // default interval is set to 5 secs
}
}
pub async fn create_streams(meta_client: &MetaClient) -> Result<HeartbeatSender> {
let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
common_runtime::spawn_bg(async move {
while let Some(res) = match rx.message().await {
Ok(m) => m,
Err(e) => {
error!(e; "Error while reading heartbeat response");
None
}
} {
Self::handle_response(res).await;
}
info!("Heartbeat handling loop exit.")
});
Ok(tx)
}
async fn handle_response(resp: HeartbeatResponse) {
info!("heartbeat response: {:?}", resp);
}
/// Start heartbeat task, spawn background task.
pub async fn start(&self) -> Result<()> {
let started = self.started.clone();
if started
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
warn!("Heartbeat task started multiple times");
return Ok(());
}
let interval = self.interval;
let node_id = self.node_id;
let server_addr = self.server_addr.clone();
let meta_client = self.meta_client.clone();
let mut tx = Self::create_streams(&meta_client).await?;
common_runtime::spawn_bg(async move {
while started.load(Ordering::Acquire) {
let req = HeartbeatRequest {
peer: Some(Peer {
id: node_id,
addr: server_addr.clone(),
}),
..Default::default()
};
if let Err(e) = tx.send(req).await {
error!("Failed to send heartbeat to metasrv, error: {:?}", e);
match Self::create_streams(&meta_client).await {
Ok(new_tx) => {
info!("Reconnected to metasrv");
tx = new_tx;
}
Err(e) => {
error!(e;"Failed to reconnect to metasrv!");
}
}
}
tokio::time::sleep(Duration::from_millis(interval)).await;
}
});
Ok(())
}
}

View File

@@ -1,8 +1,12 @@
use std::time::Duration;
use std::{fs, path, sync::Arc};
use catalog::remote::MetaKvBackend;
use catalog::CatalogManagerRef;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::logging::info;
use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
use meta_client::client::{MetaClient, MetaClientBuilder};
use object_store::{services::fs::Builder, util, ObjectStore};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use snafu::prelude::*;
@@ -10,8 +14,9 @@ use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl};
use table_engine::config::EngineConfig as TableEngineConfig;
use table_engine::engine::MitoEngine;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{self, NewCatalogSnafu, Result};
use crate::datanode::{DatanodeOptions, MetaClientOpts, ObjectStoreConfig};
use crate::error::{self, MetaClientInitSnafu, NewCatalogSnafu, Result};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
use crate::server::grpc::plan::PhysicalPlanner;
use crate::sql::SqlHandler;
@@ -19,15 +24,18 @@ use crate::sql::SqlHandler;
mod grpc;
mod sql;
type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
pub(crate) type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
// An abstraction to read/write services.
pub struct Instance {
query_engine: QueryEngineRef,
sql_handler: SqlHandler,
catalog_manager: CatalogManagerRef,
physical_planner: PhysicalPlanner,
script_executor: ScriptExecutor,
pub(crate) query_engine: QueryEngineRef,
pub(crate) sql_handler: SqlHandler,
pub(crate) catalog_manager: CatalogManagerRef,
pub(crate) physical_planner: PhysicalPlanner,
pub(crate) script_executor: ScriptExecutor,
#[allow(unused)]
pub(crate) meta_client: MetaClient,
pub(crate) heartbeat_task: HeartbeatTask,
}
pub type InstanceRef = Arc<Instance>;
@@ -36,6 +44,7 @@ impl Instance {
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let log_store = create_local_file_log_store(opts).await?;
let meta_client = new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?;
let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
@@ -46,22 +55,34 @@ impl Instance {
),
object_store,
));
let catalog_manager = Arc::new(
catalog::local::LocalCatalogManager::try_new(table_engine.clone())
.await
.context(NewCatalogSnafu)?,
);
// create remote catalog manager
let catalog_manager = Arc::new(catalog::remote::RemoteCatalogManager::new(
table_engine.clone(),
opts.node_id,
Arc::new(MetaKvBackend {
client: meta_client.clone(),
}),
));
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine().clone();
let script_executor =
ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?;
let heartbeat_task = HeartbeatTask::new(
opts.node_id, /*node id not set*/
opts.rpc_addr.clone(),
meta_client.clone(),
);
Ok(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(table_engine, catalog_manager.clone()),
catalog_manager,
physical_planner: PhysicalPlanner::new(query_engine),
script_executor,
meta_client,
heartbeat_task,
})
}
@@ -70,6 +91,7 @@ impl Instance {
.start()
.await
.context(NewCatalogSnafu)?;
self.heartbeat_task.start().await?;
Ok(())
}
@@ -80,47 +102,9 @@ impl Instance {
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
// This method is used in other crate's testing codes, so move it out of "cfg(test)".
// TODO(LFC): Delete it when callers no longer need it.
pub async fn new_mock() -> Result<Self> {
use table_engine::table::test_util::new_test_object_store;
use table_engine::table::test_util::MockEngine;
use table_engine::table::test_util::MockMitoEngine;
let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
let mock_engine = Arc::new(MockMitoEngine::new(
TableEngineConfig::default(),
MockEngine::default(),
object_store,
));
let catalog_manager = Arc::new(
catalog::local::manager::LocalCatalogManager::try_new(mock_engine.clone())
.await
.unwrap(),
);
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine().clone();
let sql_handler = SqlHandler::new(mock_engine.clone(), catalog_manager.clone());
let physical_planner = PhysicalPlanner::new(query_engine.clone());
let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone())
.await
.unwrap();
Ok(Self {
query_engine,
sql_handler,
catalog_manager,
physical_planner,
script_executor,
})
}
}
async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
// TODO(dennis): supports other backend
let data_dir = util::normalize_dir(match store_config {
ObjectStoreConfig::File { data_dir } => data_dir,
@@ -139,7 +123,38 @@ async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStor
Ok(ObjectStore::new(accessor))
}
async fn create_local_file_log_store(opts: &DatanodeOptions) -> Result<LocalFileLogStore> {
/// Create metasrv client instance and spawn heartbeat loop.
async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Result<MetaClient> {
let cluster_id = 0; // TODO(hl): read from config
let member_id = node_id;
let config = ChannelConfig::new()
.timeout(Duration::from_millis(meta_config.timeout_millis))
.connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis))
.tcp_nodelay(meta_config.tcp_nodelay);
let channel_manager = ChannelManager::with_config(config);
let mut meta_client = MetaClientBuilder::new(cluster_id, member_id)
.enable_heartbeat()
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
meta_client
.start(&[&meta_config.metasrv_addr])
.await
.context(MetaClientInitSnafu)?;
// required only when the heartbeat_client is enabled
meta_client
.ask_leader()
.await
.context(MetaClientInitSnafu)?;
Ok(meta_client)
}
pub(crate) async fn create_local_file_log_store(
opts: &DatanodeOptions,
) -> Result<LocalFileLogStore> {
// create WAL directory
fs::create_dir_all(path::Path::new(&opts.wal_dir))
.context(error::CreateDirSnafu { dir: &opts.wal_dir })?;

View File

@@ -2,8 +2,10 @@
pub mod datanode;
pub mod error;
mod heartbeat;
pub mod instance;
mod metric;
mod mock;
mod script;
pub mod server;
mod sql;

123
src/datanode/src/mock.rs Normal file
View File

@@ -0,0 +1,123 @@
use std::sync::Arc;
use catalog::remote::MetaKvBackend;
use meta_client::client::{MetaClient, MetaClientBuilder};
use query::QueryEngineFactory;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table_engine::config::EngineConfig as TableEngineConfig;
use crate::datanode::DatanodeOptions;
use crate::error::Result;
use crate::heartbeat::HeartbeatTask;
use crate::instance::{create_local_file_log_store, new_object_store, DefaultEngine, Instance};
use crate::script::ScriptExecutor;
use crate::server::grpc::plan::PhysicalPlanner;
use crate::sql::SqlHandler;
impl Instance {
// This method is used in other crate's testing codes, so move it out of "cfg(test)".
// TODO(LFC): Delete it when callers no longer need it.
pub async fn new_mock() -> Result<Self> {
use table_engine::table::test_util::new_test_object_store;
use table_engine::table::test_util::MockEngine;
use table_engine::table::test_util::MockMitoEngine;
let meta_client = mock_meta_client().await;
let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
let mock_engine = Arc::new(MockMitoEngine::new(
TableEngineConfig::default(),
MockEngine::default(),
object_store,
));
let catalog_manager = Arc::new(
catalog::local::manager::LocalCatalogManager::try_new(mock_engine.clone())
.await
.unwrap(),
);
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine().clone();
let sql_handler = SqlHandler::new(mock_engine.clone(), catalog_manager.clone());
let physical_planner = PhysicalPlanner::new(query_engine.clone());
let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone())
.await
.unwrap();
let heartbeat_task =
HeartbeatTask::new(0, "127.0.0.1:3302".to_string(), meta_client.clone());
Ok(Self {
query_engine,
sql_handler,
catalog_manager,
physical_planner,
script_executor,
meta_client,
heartbeat_task,
})
}
pub async fn with_mock_meta_client(opts: &DatanodeOptions) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let log_store = create_local_file_log_store(opts).await?;
let meta_client = mock_meta_client().await;
let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(log_store),
object_store.clone(),
),
object_store,
));
// create remote catalog manager
let catalog_manager = Arc::new(catalog::remote::RemoteCatalogManager::new(
table_engine.clone(),
opts.node_id,
Arc::new(MetaKvBackend {
client: meta_client.clone(),
}),
));
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine().clone();
let script_executor =
ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?;
let heartbeat_task =
HeartbeatTask::new(opts.node_id, opts.rpc_addr.clone(), meta_client.clone());
Ok(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(table_engine, catalog_manager.clone()),
catalog_manager,
physical_planner: PhysicalPlanner::new(query_engine),
script_executor,
meta_client,
heartbeat_task,
})
}
}
async fn mock_meta_client() -> MetaClient {
let mock_info = meta_srv::mocks::mock_with_memstore().await;
let meta_srv::mocks::MockInfo {
server_addr,
channel_manager,
} = mock_info;
let id = (1000u64, 2000u64);
let mut meta_client = MetaClientBuilder::new(id.0, id.1)
.enable_heartbeat()
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
meta_client.start(&[&server_addr]).await.unwrap();
// // required only when the heartbeat_client is enabled
meta_client.ask_leader().await.unwrap();
meta_client
}

View File

@@ -84,6 +84,14 @@ impl Instance {
let schema_name = expr
.schema_name
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let region_id = expr
.table_options
.get(&"region_id".to_string())
.unwrap()
.parse::<u32>()
.unwrap();
Ok(CreateTableRequest {
id: table_id,
catalog_name,
@@ -91,6 +99,7 @@ impl Instance {
table_name: expr.table_name,
desc: expr.desc,
schema,
region_numbers: vec![region_id],
primary_key_indices,
create_if_not_exists: expr.create_if_not_exists,
table_options: expr.table_options,
@@ -179,10 +188,11 @@ mod tests {
use super::*;
use crate::tests::test_util;
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_create_expr_to_request() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("create_expr_to_request");
let instance = Instance::new(&opts).await.unwrap();
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let expr = testing_create_expr();
@@ -291,6 +301,9 @@ mod tests {
default_constraint: None,
},
];
let table_options = [("region_id".to_string(), "0".to_string())]
.into_iter()
.collect::<HashMap<_, _>>();
CreateExpr {
catalog_name: None,
schema_name: None,
@@ -300,7 +313,7 @@ mod tests {
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options: HashMap::new(),
table_options,
}
}

View File

@@ -168,6 +168,7 @@ pub fn build_create_table_request(
create_if_not_exists: true,
primary_key_indices,
table_options: HashMap::new(),
region_numbers: vec![0],
});
}

View File

@@ -155,6 +155,7 @@ impl SqlHandler {
table_name,
desc: None,
schema,
region_numbers: vec![0],
primary_key_indices: primary_keys,
create_if_not_exists: stmt.if_not_exists,
table_options: HashMap::new(),

View File

@@ -24,7 +24,7 @@ async fn setup_grpc_server(name: &str, port: usize) -> (String, TestGuard, Arc<G
let (mut opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name);
let addr = format!("127.0.0.1:{}", port);
opts.rpc_addr = addr.clone();
let instance = Arc::new(Instance::new(&opts).await.unwrap());
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
instance.start().await.unwrap();
let addr_cloned = addr.clone();
@@ -50,7 +50,7 @@ async fn setup_grpc_server(name: &str, port: usize) -> (String, TestGuard, Arc<G
(addr, guard, grpc_server)
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_auto_create_table() {
let (addr, _guard, grpc_server) = setup_grpc_server("auto_create_table", 3991).await;
@@ -116,8 +116,9 @@ fn expect_data() -> (Column, Column, Column, Column) {
)
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_insert_and_select() {
common_telemetry::init_default_ut_logging();
let (addr, _guard, grpc_server) = setup_grpc_server("insert_and_select", 3990).await;
let grpc_client = Client::with_urls(vec![addr]);
@@ -247,6 +248,6 @@ fn testing_create_expr() -> CreateExpr {
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options: HashMap::new(),
table_options: HashMap::from([("region_id".to_string(), "0".to_string())]),
}
}

View File

@@ -14,7 +14,7 @@ use crate::tests::test_util;
async fn make_test_app(name: &str) -> (Router, TestGuard) {
let (opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name);
let instance = Arc::new(Instance::new(&opts).await.unwrap());
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
instance.start().await.unwrap();
test_util::create_test_table(&instance, ConcreteDataType::timestamp_millis_datatype())
.await
@@ -23,7 +23,7 @@ async fn make_test_app(name: &str) -> (Router, TestGuard) {
(http_server.make_app(), guard)
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_sql_api() {
common_telemetry::init_default_ut_logging();
let (app, _guard) = make_test_app("sql_api").await;
@@ -83,7 +83,7 @@ async fn test_sql_api() {
);
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_metrics_api() {
common_telemetry::init_default_ut_logging();
common_telemetry::init_default_metrics_recorder();
@@ -104,7 +104,7 @@ async fn test_metrics_api() {
assert!(body.contains("datanode_handle_sql_elapsed"));
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_scripts_api() {
common_telemetry::init_default_ut_logging();
let (app, _guard) = make_test_app("scripts_api").await;

View File

@@ -1,20 +1,18 @@
use arrow::array::{Int64Array, UInt64Array};
use common_query::Output;
use common_recordbatch::util;
use datafusion::arrow_print;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow_array::StringArray;
use datatypes::prelude::ConcreteDataType;
use crate::instance::Instance;
use crate::tests::test_util;
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("execute_insert");
let instance = Instance::new(&opts).await.unwrap();
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
test_util::create_test_table(&instance, ConcreteDataType::timestamp_millis_datatype())
@@ -33,12 +31,12 @@ async fn test_execute_insert() {
assert!(matches!(output, Output::AffectedRows(2)));
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert_query_with_i64_timestamp() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("insert_query_i64_timestamp");
let instance = Instance::new(&opts).await.unwrap();
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
test_util::create_test_table(&instance, ConcreteDataType::int64_datatype())
@@ -72,10 +70,10 @@ async fn test_execute_insert_query_with_i64_timestamp() {
}
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_query() {
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("execute_query");
let instance = Instance::new(&opts).await.unwrap();
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let output = instance
@@ -98,11 +96,11 @@ async fn test_execute_query() {
}
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_show_databases_tables() {
let (opts, _guard) =
test_util::create_tmp_dir_and_datanode_opts("execute_show_databases_tables");
let instance = Instance::new(&opts).await.unwrap();
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let output = instance.execute_sql("show databases").await.unwrap();
@@ -188,12 +186,12 @@ async fn test_execute_show_databases_tables() {
}
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
pub async fn test_execute_create() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) = test_util::create_tmp_dir_and_datanode_opts("execute_create");
let instance = Instance::new(&opts).await.unwrap();
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let output = instance
@@ -212,13 +210,13 @@ pub async fn test_execute_create() {
assert!(matches!(output, Output::AffectedRows(1)));
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
pub async fn test_create_table_illegal_timestamp_type() {
common_telemetry::init_default_ut_logging();
let (opts, _guard) =
test_util::create_tmp_dir_and_datanode_opts("create_table_illegal_timestamp_type");
let instance = Instance::new(&opts).await.unwrap();
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
let output = instance
@@ -244,6 +242,8 @@ pub async fn test_create_table_illegal_timestamp_type() {
#[tokio::test]
async fn test_alter_table() {
use datafusion::arrow_print;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
// TODO(LFC) Use real Mito engine when we can alter its region schema,
// and delete the `new_mock` method.
let instance = Instance::new_mock().await.unwrap();

View File

@@ -72,6 +72,7 @@ pub async fn create_test_table(instance: &Instance, ts_type: ConcreteDataType) -
create_if_not_exists: true,
primary_key_indices: vec![3, 0], // "host" and "ts" are primary keys
table_options: HashMap::new(),
region_numbers: vec![0],
},
)
.await

View File

@@ -21,9 +21,9 @@ datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch =
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datatypes = { path = "../datatypes" }
itertools = "0.10"
openmetrics-parser = "0.4"
prost = "0.11"
itertools = "0.10"
query = { path = "../query" }
serde = "1.0"
servers = { path = "../servers" }

View File

@@ -161,7 +161,10 @@ fn create_to_expr(create: CreateTable) -> Result<CreateExpr> {
primary_keys: find_primary_keys(&create.constraints)?,
create_if_not_exists: create.if_not_exists,
// TODO(LFC): Fill in other table options.
table_options: HashMap::from([("engine".to_string(), create.engine)]),
table_options: HashMap::from([
("engine".to_string(), create.engine),
("region_id".to_string(), "0".to_string()),
]),
..Default::default()
};
Ok(expr)
@@ -550,12 +553,15 @@ mod tests {
default_constraint: None,
},
];
let mut table_options = HashMap::with_capacity(1);
table_options.insert("region_id".to_string(), "0".to_string());
CreateExpr {
table_name: "demo".to_string(),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options,
..Default::default()
}
}

View File

@@ -328,7 +328,7 @@ mod test {
use super::*;
use crate::partitioning::range::RangePartitionRule;
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_dist_table_scan() {
let table = Arc::new(new_dist_table().await);
@@ -475,7 +475,7 @@ mod test {
..Default::default()
};
let instance = Arc::new(Instance::new(&opts).await.unwrap());
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
instance.start().await.unwrap();
let catalog_manager = instance.catalog_manager().clone();
@@ -498,7 +498,7 @@ mod test {
)
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_find_regions() {
let table = new_dist_table().await;

View File

@@ -11,8 +11,8 @@ use router::Client as RouterClient;
use snafu::OptionExt;
use store::Client as StoreClient;
use self::heartbeat::HeartbeatSender;
use self::heartbeat::HeartbeatStream;
pub use self::heartbeat::HeartbeatSender;
pub use self::heartbeat::HeartbeatStream;
use crate::error;
use crate::error::Result;
use crate::rpc::BatchPutRequest;

View File

@@ -1,6 +1,6 @@
mod router;
mod store;
mod util;
pub mod util;
use api::v1::meta::KeyValue as PbKeyValue;
use api::v1::meta::Peer as PbPeer;

View File

@@ -47,6 +47,7 @@ impl ScriptsTable {
desc: Some("Scripts table".to_string()),
schema,
// name and timestamp as primary key
region_numbers: vec![0],
primary_key_indices: vec![0, 3],
create_if_not_exists: true,
table_options: HashMap::default(),

View File

@@ -270,7 +270,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let table_id = request.id;
// TODO(dennis): supports multi regions;
let region_number = 0;
assert_eq!(1, request.region_numbers.len());
let region_number = request.region_numbers[0];
let region_id = region_id(table_id, region_number);
let region_name = region_name(table_id, region_number);
@@ -311,6 +312,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.engine(MITO_ENGINE)
.next_column_id(next_column_id)
.primary_key_indices(request.primary_key_indices.clone())
.region_numbers(vec![region_number])
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
@@ -495,6 +497,7 @@ mod tests {
create_if_not_exists: true,
primary_key_indices: Vec::default(),
table_options: HashMap::new(),
region_numbers: vec![0],
},
)
.await
@@ -753,6 +756,7 @@ mod tests {
desc: None,
primary_key_indices: Vec::default(),
table_options: HashMap::new(),
region_numbers: vec![0],
};
let created_table = table_engine.create_table(&ctx, request).await.unwrap();
@@ -776,6 +780,7 @@ mod tests {
desc: None,
primary_key_indices: Vec::default(),
table_options: HashMap::new(),
region_numbers: vec![0],
};
let result = table_engine.create_table(&ctx, request).await;

View File

@@ -484,10 +484,10 @@ impl<R: Region> MitoTable<R> {
) -> Result<MitoTable<R>> {
let manifest = TableManifest::new(&table_manifest_dir(table_name), object_store);
let table_info = Self::recover_table_info(table_name, &manifest)
let mut table_info = Self::recover_table_info(table_name, &manifest)
.await?
.context(TableInfoNotFoundSnafu { table_name })?;
table_info.meta.region_numbers = vec![(region.id() & 0xFFFFFFFF) as u32];
Ok(MitoTable::new(table_info, region, manifest))
}

View File

@@ -103,6 +103,7 @@ pub async fn setup_test_engine_and_table() -> (
create_if_not_exists: true,
primary_key_indices: Vec::default(),
table_options: HashMap::new(),
region_numbers: vec![0],
},
)
.await
@@ -135,6 +136,7 @@ pub async fn setup_mock_engine_and_table(
create_if_not_exists: true,
primary_key_indices: Vec::default(),
table_options: HashMap::new(),
region_numbers: vec![0],
},
)
.await

View File

@@ -55,6 +55,8 @@ pub struct TableMeta {
pub value_indices: Vec<usize>,
#[builder(default, setter(into))]
pub engine: String,
#[builder(default, setter(into))]
pub region_numbers: Vec<u32>,
pub next_column_id: ColumnId,
/// Options for table engine.
#[builder(default)]
@@ -162,6 +164,7 @@ pub struct RawTableMeta {
pub value_indices: Vec<usize>,
pub engine: String,
pub next_column_id: ColumnId,
pub region_numbers: Vec<u32>,
pub engine_options: HashMap<String, String>,
pub options: HashMap<String, String>,
pub created_on: DateTime<Utc>,
@@ -175,6 +178,7 @@ impl From<TableMeta> for RawTableMeta {
value_indices: meta.value_indices,
engine: meta.engine,
next_column_id: meta.next_column_id,
region_numbers: meta.region_numbers,
engine_options: meta.engine_options,
options: meta.options,
created_on: meta.created_on,
@@ -191,6 +195,7 @@ impl TryFrom<RawTableMeta> for TableMeta {
primary_key_indices: raw.primary_key_indices,
value_indices: raw.value_indices,
engine: raw.engine,
region_numbers: vec![],
next_column_id: raw.next_column_id,
engine_options: raw.engine_options,
options: raw.options,

View File

@@ -22,6 +22,7 @@ pub struct CreateTableRequest {
pub table_name: String,
pub desc: Option<String>,
pub schema: SchemaRef,
pub region_numbers: Vec<u32>,
pub primary_key_indices: Vec<usize>,
pub create_if_not_exists: bool,
pub table_options: HashMap<String, String>,

View File

@@ -8,12 +8,12 @@ use common_recordbatch::{RecordBatch, RecordBatchStream};
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::array::UInt32Array;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use futures::task::{Context, Poll};
use futures::Stream;
use crate::error::Result;
use crate::metadata::TableInfoRef;
use crate::metadata::{TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType};
use crate::table::scan::SimpleTableScan;
use crate::table::{Expr, Table};
@@ -31,7 +31,12 @@ impl Default for NumbersTable {
false,
)];
Self {
schema: Arc::new(Schema::new(column_schemas)),
schema: Arc::new(
SchemaBuilder::try_from_columns(column_schemas)
.unwrap()
.build()
.unwrap(),
),
}
}
}
@@ -47,7 +52,26 @@ impl Table for NumbersTable {
}
fn table_info(&self) -> TableInfoRef {
unimplemented!()
Arc::new(
TableInfoBuilder::default()
.table_id(1)
.name("numbers")
.catalog_name("greptime")
.schema_name("public")
.table_version(0)
.table_type(TableType::Base)
.meta(
TableMetaBuilder::default()
.schema(self.schema.clone())
.region_numbers(vec![0])
.primary_key_indices(vec![0])
.next_column_id(1)
.build()
.unwrap(),
)
.build()
.unwrap(),
)
}
async fn scan(