diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index b77294ce02..9256ba7d69 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -38,6 +38,9 @@ struct StartCommand { grpc_addr: Option, #[clap(long)] mysql_addr: Option, + #[cfg(feature = "postgres")] + #[clap(long)] + postgres_addr: Option, #[clap(short, long)] config_file: Option, } @@ -69,6 +72,10 @@ impl TryFrom for FrontendOptions { if let Some(addr) = cmd.mysql_addr { opts.mysql_addr = Some(addr); } + #[cfg(feature = "postgres")] + if let Some(addr) = cmd.postgres_addr { + opts.postgres_addr = Some(addr); + } Ok(opts) } } @@ -83,15 +90,24 @@ mod tests { http_addr: Some("127.0.0.1:1234".to_string()), grpc_addr: None, mysql_addr: Some("127.0.0.1:5678".to_string()), + #[cfg(feature = "postgres")] + postgres_addr: Some("127.0.0.1:5432".to_string()), config_file: None, }; let opts: FrontendOptions = command.try_into().unwrap(); assert_eq!(opts.http_addr, Some("127.0.0.1:1234".to_string())); assert_eq!(opts.mysql_addr, Some("127.0.0.1:5678".to_string())); + #[cfg(feature = "postgres")] + assert_eq!(opts.postgres_addr, Some("127.0.0.1:5432".to_string())); let default_opts = FrontendOptions::default(); assert_eq!(opts.grpc_addr, default_opts.grpc_addr); assert_eq!(opts.mysql_runtime_size, default_opts.mysql_runtime_size); + #[cfg(feature = "postgres")] + assert_eq!( + opts.postgres_runtime_size, + default_opts.postgres_runtime_size + ); } } diff --git a/src/datatypes/src/arrow_array.rs b/src/datatypes/src/arrow_array.rs index f784c9574b..8107754ddc 100644 --- a/src/datatypes/src/arrow_array.rs +++ b/src/datatypes/src/arrow_array.rs @@ -80,7 +80,9 @@ pub fn arrow_array_get(array: &dyn Array, idx: usize) -> Result { mod test { use arrow::array::Int64Array as ArrowI64Array; use arrow::array::*; - use common_time::timestamp::TimeUnit; + use arrow::buffer::Buffer; + use arrow::datatypes::{DataType, TimeUnit as ArrowTimeUnit}; + use common_time::timestamp::{TimeUnit, Timestamp}; use super::*; use crate::prelude::Vector; @@ -142,5 +144,25 @@ mod test { value, Value::Timestamp(Timestamp::new(2, TimeUnit::Millisecond)) ); + + let array4 = PrimitiveArray::::from_data( + DataType::Timestamp(ArrowTimeUnit::Millisecond, None), + Buffer::from_slice(&vec![1, 2, 3, 4]), + None, + ); + assert_eq!( + Value::Timestamp(Timestamp::new(1, TimeUnit::Millisecond)), + arrow_array_get(&array4, 0).unwrap() + ); + + let array4 = PrimitiveArray::::from_data( + DataType::Timestamp(ArrowTimeUnit::Nanosecond, None), + Buffer::from_slice(&vec![1, 2, 3, 4]), + None, + ); + assert_eq!( + Value::Timestamp(Timestamp::new(1, TimeUnit::Nanosecond)), + arrow_array_get(&array4, 0).unwrap() + ); } } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 3fea2cf4f2..b8a3341bb4 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -37,3 +37,7 @@ futures = "0.3" tempdir = "0.3" tonic = "0.8" tower = "0.4" + +[features] +default = ["postgres"] +postgres = ["servers/postgres"] diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 4464062566..6f55cc1e9d 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -13,6 +13,10 @@ pub struct FrontendOptions { pub grpc_addr: Option, pub mysql_addr: Option, pub mysql_runtime_size: u32, + #[cfg(feature = "postgres")] + pub postgres_addr: Option, + #[cfg(feature = "postgres")] + pub postgres_runtime_size: u32, } impl Default for FrontendOptions { @@ -22,6 +26,10 @@ impl Default for FrontendOptions { grpc_addr: Some("0.0.0.0:4001".to_string()), mysql_addr: Some("0.0.0.0:4002".to_string()), mysql_runtime_size: 2, + #[cfg(feature = "postgres")] + postgres_addr: Some("0.0.0.0:4003".to_string()), + #[cfg(feature = "postgres")] + postgres_runtime_size: 2, } } } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 19d10a792a..12f6d07766 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -5,6 +5,8 @@ use common_runtime::Builder as RuntimeBuilder; use servers::grpc::GrpcServer; use servers::http::HttpServer; use servers::mysql::server::MysqlServer; +#[cfg(feature = "postgres")] +use servers::postgres::PostgresServer; use servers::server::Server; use snafu::ResultExt; use tokio::try_join; @@ -55,10 +57,32 @@ impl Services { None }; + #[cfg(feature = "postgres")] + let postgres_server_and_addr = if let Some(pg_addr) = &opts.postgres_addr { + let pg_addr = parse_addr(pg_addr)?; + + let pg_io_runtime = Arc::new( + RuntimeBuilder::default() + .worker_threads(opts.postgres_runtime_size as usize) + .thread_name("pg-io-handlers") + .build() + .context(error::RuntimeResourceSnafu)?, + ); + + let pg_server = + Box::new(PostgresServer::new(instance.clone(), pg_io_runtime)) as Box; + + Some((pg_server, pg_addr)) + } else { + None + }; + try_join!( start_server(http_server_and_addr), start_server(grpc_server_and_addr), - start_server(mysql_server_and_addr) + start_server(mysql_server_and_addr), + #[cfg(feature = "postgres")] + start_server(postgres_server_and_addr), ) .context(error::StartServerSnafu)?; Ok(())