diff --git a/Cargo.lock b/Cargo.lock index d93c7cd42e..f08097c95a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9203,9 +9203,9 @@ dependencies = [ [[package]] name = "pgwire" -version = "0.36.1" +version = "0.36.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d331bb0eef5bc83a221c0a85b1f205bccf094d4f72a26ae1d68a1b1c535123b7" +checksum = "70a2bcdcc4b20a88e0648778ecf00415bbd5b447742275439c22176835056f99" dependencies = [ "async-trait", "base64 0.22.1", diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index b5e6371785..42ab4d1cd0 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -87,7 +87,7 @@ operator.workspace = true otel-arrow-rust.workspace = true parking_lot.workspace = true pg_interval = "0.4" -pgwire = { version = "0.36.1", default-features = false, features = [ +pgwire = { version = "0.36.3", default-features = false, features = [ "server-api-ring", "pg-ext-types", ] } diff --git a/src/servers/src/postgres/handler.rs b/src/servers/src/postgres/handler.rs index ab0ea32b5e..ad242390ec 100644 --- a/src/servers/src/postgres/handler.rs +++ b/src/servers/src/postgres/handler.rs @@ -461,8 +461,18 @@ impl ExtendedQueryHandler for PostgresServerHandlerInner { // we will not support other show statements for extended query protocol at least for now. // because the return columns is not predictable at this stage _ => { - // fallback to NoData - Ok(DescribePortalResponse::new(vec![])) + // test if query caught by fixture + if let Some(mut resp) = + fixtures::process(&sql_plan.query, self.session.new_query_context()) + && let Response::Query(query_response) = resp.remove(0) + { + Ok(DescribePortalResponse::new( + (*query_response.row_schema()).clone(), + )) + } else { + // fallback to NoData + Ok(DescribePortalResponse::new(vec![])) + } } } } diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs index 323b4fb558..96a2105d44 100644 --- a/src/servers/src/postgres/types.rs +++ b/src/servers/src/postgres/types.rs @@ -1260,6 +1260,204 @@ pub(super) fn parameters_to_scalar_values( ScalarValue::Null } } + &Type::TIMESTAMP_ARRAY => { + let data = portal.parameter::>(idx, &client_type)?; + if let Some(data) = data { + if let Some(ConcreteDataType::List(list_type)) = &server_type { + match list_type.item_type() { + ConcreteDataType::Timestamp(unit) => match *unit { + TimestampType::Second(_) => { + let values = data + .into_iter() + .map(|ts| { + ScalarValue::TimestampSecond( + Some(ts.and_utc().timestamp()), + None, + ) + }) + .collect::>(); + ScalarValue::List(ScalarValue::new_list( + &values, + &ArrowDataType::Timestamp(TimeUnit::Second, None), + true, + )) + } + TimestampType::Millisecond(_) => { + let values = data + .into_iter() + .map(|ts| { + ScalarValue::TimestampMillisecond( + Some(ts.and_utc().timestamp_millis()), + None, + ) + }) + .collect::>(); + ScalarValue::List(ScalarValue::new_list( + &values, + &ArrowDataType::Timestamp(TimeUnit::Millisecond, None), + true, + )) + } + TimestampType::Microsecond(_) => { + let values = data + .into_iter() + .map(|ts| { + ScalarValue::TimestampMicrosecond( + Some(ts.and_utc().timestamp_micros()), + None, + ) + }) + .collect::>(); + ScalarValue::List(ScalarValue::new_list( + &values, + &ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + true, + )) + } + TimestampType::Nanosecond(_) => { + let values = data + .into_iter() + .filter_map(|ts| { + ts.and_utc().timestamp_nanos_opt().map(|nanos| { + ScalarValue::TimestampNanosecond(Some(nanos), None) + }) + }) + .collect::>(); + ScalarValue::List(ScalarValue::new_list( + &values, + &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + true, + )) + } + }, + _ => { + return Err(invalid_parameter_error( + "invalid_parameter_type", + Some(format!( + "Expected: {}, found: {}", + list_type.item_type(), + client_type + )), + )); + } + } + } else { + // Default to millisecond when no server type is specified + let values = data + .into_iter() + .map(|ts| { + ScalarValue::TimestampMillisecond( + Some(ts.and_utc().timestamp_millis()), + None, + ) + }) + .collect::>(); + ScalarValue::List(ScalarValue::new_list( + &values, + &ArrowDataType::Timestamp(TimeUnit::Millisecond, None), + true, + )) + } + } else { + ScalarValue::Null + } + } + &Type::TIMESTAMPTZ_ARRAY => { + let data = portal.parameter::>>(idx, &client_type)?; + if let Some(data) = data { + if let Some(ConcreteDataType::List(list_type)) = &server_type { + match list_type.item_type() { + ConcreteDataType::Timestamp(unit) => match *unit { + TimestampType::Second(_) => { + let values = data + .into_iter() + .map(|ts| { + ScalarValue::TimestampSecond(Some(ts.timestamp()), None) + }) + .collect::>(); + ScalarValue::List(ScalarValue::new_list( + &values, + &ArrowDataType::Timestamp(TimeUnit::Second, None), + true, + )) + } + TimestampType::Millisecond(_) => { + let values = data + .into_iter() + .map(|ts| { + ScalarValue::TimestampMillisecond( + Some(ts.timestamp_millis()), + None, + ) + }) + .collect::>(); + ScalarValue::List(ScalarValue::new_list( + &values, + &ArrowDataType::Timestamp(TimeUnit::Millisecond, None), + true, + )) + } + TimestampType::Microsecond(_) => { + let values = data + .into_iter() + .map(|ts| { + ScalarValue::TimestampMicrosecond( + Some(ts.timestamp_micros()), + None, + ) + }) + .collect::>(); + ScalarValue::List(ScalarValue::new_list( + &values, + &ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + true, + )) + } + TimestampType::Nanosecond(_) => { + let values = data + .into_iter() + .filter_map(|ts| { + ts.timestamp_nanos_opt().map(|nanos| { + ScalarValue::TimestampNanosecond(Some(nanos), None) + }) + }) + .collect::>(); + ScalarValue::List(ScalarValue::new_list( + &values, + &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + true, + )) + } + }, + _ => { + return Err(invalid_parameter_error( + "invalid_parameter_type", + Some(format!( + "Expected: {}, found: {}", + list_type.item_type(), + client_type + )), + )); + } + } + } else { + // Default to millisecond when no server type is specified + let values = data + .into_iter() + .map(|ts| { + ScalarValue::TimestampMillisecond(Some(ts.timestamp_millis()), None) + }) + .collect::>(); + ScalarValue::List(ScalarValue::new_list( + &values, + &ArrowDataType::Timestamp(TimeUnit::Millisecond, None), + true, + )) + } + } else { + ScalarValue::Null + } + } _ => Err(invalid_parameter_error( "unsupported_parameter_value", Some(format!("Found type: {}", client_type)),