diff --git a/Cargo.lock b/Cargo.lock index e55db3a4df..829b4ee1f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5092,6 +5092,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73d946ec7d256b04dfadc4e6a3292324e6f417124750fc5c0950f981b703a0f1" dependencies = [ "bytes", + "chrono", "fallible-iterator", "postgres-protocol", ] @@ -6671,6 +6672,7 @@ dependencies = [ "base64 0.13.1", "bytes", "catalog", + "chrono", "common-base", "common-catalog", "common-error", @@ -6698,6 +6700,7 @@ dependencies = [ "opensrv-mysql", "pgwire", "pin-project", + "postgres-types", "prost 0.11.6", "query", "rand 0.8.5", diff --git a/src/common/time/src/date.rs b/src/common/time/src/date.rs index e6057c81b0..fff9f412db 100644 --- a/src/common/time/src/date.rs +++ b/src/common/time/src/date.rs @@ -71,6 +71,10 @@ impl Date { pub fn val(&self) -> i32 { self.0 } + + pub fn to_chrono_date(&self) -> Option { + NaiveDate::from_num_days_from_ce_opt(UNIX_EPOCH_FROM_CE + self.0) + } } #[cfg(test)] diff --git a/src/common/time/src/datetime.rs b/src/common/time/src/datetime.rs index fba96b233a..32bdf96206 100644 --- a/src/common/time/src/datetime.rs +++ b/src/common/time/src/datetime.rs @@ -23,7 +23,7 @@ use crate::error::{Error, ParseDateStrSnafu, Result}; const DATETIME_FORMAT: &str = "%F %T"; -/// [DateTime] represents the **seconds elapsed since "1970-01-01 00:00:00 UTC" (UNIX Epoch)**. +/// [DateTime] represents the **seconds elapsed since "1970-01-01 00:00:00 UTC" (UNIX Epoch)**. #[derive( Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize, )] @@ -69,6 +69,10 @@ impl DateTime { pub fn val(&self) -> i64 { self.0 } + + pub fn to_chrono_datetime(&self) -> Option { + NaiveDateTime::from_timestamp_millis(self.0) + } } #[cfg(test)] diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index 132af898d3..e34f35e6ef 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -114,8 +114,15 @@ impl Timestamp { /// Format timestamp to ISO8601 string. If the timestamp exceeds what chrono timestamp can /// represent, this function simply print the timestamp unit and value in plain string. pub fn to_iso8601_string(&self) -> String { - let nano_factor = TimeUnit::Second.factor() / TimeUnit::Nanosecond.factor(); + if let LocalResult::Single(datetime) = self.to_chrono_datetime() { + format!("{}", datetime.format("%Y-%m-%d %H:%M:%S%.f%z")) + } else { + format!("[Timestamp{}: {}]", self.unit, self.value) + } + } + pub fn to_chrono_datetime(&self) -> LocalResult> { + let nano_factor = TimeUnit::Second.factor() / TimeUnit::Nanosecond.factor(); let (mut secs, mut nsecs) = self.split(); if nsecs < 0 { @@ -123,11 +130,7 @@ impl Timestamp { nsecs += nano_factor; } - if let LocalResult::Single(datetime) = Utc.timestamp_opt(secs, nsecs as u32) { - format!("{}", datetime.format("%Y-%m-%d %H:%M:%S%.f%z")) - } else { - format!("[Timestamp{}: {}]", self.unit, self.value) - } + Utc.timestamp_opt(secs, nsecs as u32) } } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index c0498ae3bd..b0dd12fa0d 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -14,6 +14,7 @@ axum-macros = "0.3" base64 = "0.13" bytes = "1.2" catalog = { path = "../catalog" } +chrono.workspace = true common-base = { path = "../common/base" } common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } @@ -40,6 +41,7 @@ openmetrics-parser = "0.4" opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "b44c9d1360da297b305abf33aecfa94888e1554c" } pgwire = "0.10" pin-project = "1.0" +postgres-types = { version = "0.2", features = ["with-chrono-0_4"] } prost.workspace = true query = { path = "../query" } rand = "0.8" diff --git a/src/servers/src/postgres/handler.rs b/src/servers/src/postgres/handler.rs index 4d64c156cf..8a991c00e5 100644 --- a/src/servers/src/postgres/handler.rs +++ b/src/servers/src/postgres/handler.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use chrono::LocalResult; use common_query::Output; use common_recordbatch::error::Result as RecordBatchResult; use common_recordbatch::RecordBatch; @@ -171,9 +172,37 @@ fn encode_text_value(value: &Value, builder: &mut DataRowEncoder) -> PgWireResul Value::Float64(v) => builder.encode_text_format_field(Some(&v.0)), Value::String(v) => builder.encode_text_format_field(Some(&v.as_utf8())), Value::Binary(v) => builder.encode_text_format_field(Some(&hex::encode(v.deref()))), - Value::Date(v) => builder.encode_text_format_field(Some(&v.to_string())), - Value::DateTime(v) => builder.encode_text_format_field(Some(&v.to_string())), - Value::Timestamp(v) => builder.encode_text_format_field(Some(&v.to_iso8601_string())), + Value::Date(v) => { + if let Some(date) = v.to_chrono_date() { + builder.encode_text_format_field(Some(&date.format("%Y-%m-%d").to_string())) + } else { + Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Failed to convert date to postgres type {v:?}",), + }))) + } + } + Value::DateTime(v) => { + if let Some(datetime) = v.to_chrono_datetime() { + builder.encode_text_format_field(Some( + &datetime.format("%Y-%m-%d %H:%M:%S%.6f").to_string(), + )) + } else { + Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Failed to convert date to postgres type {v:?}",), + }))) + } + } + Value::Timestamp(v) => { + if let LocalResult::Single(datetime) = v.to_chrono_datetime() { + builder.encode_text_format_field(Some( + &datetime.format("%Y-%m-%d %H:%M:%S%.6f").to_string(), + )) + } else { + Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Failed to convert date to postgres type {v:?}",), + }))) + } + } Value::List(_) => Err(PgWireError::ApiError(Box::new(Error::Internal { err_msg: format!( "cannot write value {:?} in postgres protocol: unimplemented", @@ -203,9 +232,24 @@ fn encode_binary_value( Value::Float64(v) => builder.encode_binary_format_field(&v.0, datatype), Value::String(v) => builder.encode_binary_format_field(&v.as_utf8(), datatype), Value::Binary(v) => builder.encode_binary_format_field(&v.deref(), datatype), - // TODO(sunng87): correct date/time types encoding - Value::Date(v) => builder.encode_binary_format_field(&v.to_string(), datatype), - Value::DateTime(v) => builder.encode_binary_format_field(&v.to_string(), datatype), + Value::Date(v) => { + if let Some(date) = v.to_chrono_date() { + builder.encode_binary_format_field(&date, datatype) + } else { + Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Failed to convert date to postgres type {v:?}",), + }))) + } + } + Value::DateTime(v) => { + if let Some(datetime) = v.to_chrono_datetime() { + builder.encode_binary_format_field(&datetime, datatype) + } else { + Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Failed to convert datetime to postgres type {v:?}",), + }))) + } + } Value::Timestamp(v) => { // convert timestamp to SystemTime if let Some(ts) = v.convert_to(TimeUnit::Microsecond) { @@ -213,7 +257,7 @@ fn encode_binary_value( builder.encode_binary_format_field(&sys_time, datatype) } else { Err(PgWireError::ApiError(Box::new(Error::Internal { - err_msg: format!("Failed to conver timestamp to postgres type {v:?}",), + err_msg: format!("Failed to convert timestamp to postgres type {v:?}",), }))) } }