This commit is contained in:
Conrad Ludgate
2024-10-01 16:11:54 +01:00
parent 973eb69cd3
commit 6588edd693

View File

@@ -22,7 +22,6 @@ use hyper1::Response;
use hyper1::StatusCode;
use hyper1::{HeaderMap, Request};
use pq_proto::StartupMessageParamsBuilder;
use serde::Serialize;
use tokio::time;
use tokio_postgres::error::DbError;
use tokio_postgres::error::ErrorPosition;
@@ -580,16 +579,16 @@ async fn handle_db_inner(
//
// Determine the destination and connection params
//
let headers = request.headers();
let (parts, body) = request.into_parts();
// Allow connection pooling only if explicitly requested
// or if we have decided that http pool is no longer opt-in
let allow_pool = !config.http_config.pool_options.opt_in
|| headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
|| parts.headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
let parsed_headers = HttpHeaders::try_parse(headers)?;
let parsed_headers = HttpHeaders::try_parse(&parts.headers)?;
let request_content_length = match request.body().size_hint().upper() {
let request_content_length = match body.size_hint().upper() {
Some(v) => v,
None => config.http_config.max_request_size_bytes + 1,
};
@@ -607,20 +606,19 @@ async fn handle_db_inner(
));
}
let mut arena = Arena::default();
let fetch_and_process_request = Box::pin(async {
let fetch_and_process_request = Box::pin(async move {
let mut arena = Arena::default();
let seed = SerdeArena {
arena: &mut arena,
_t: PhantomData::<Payload>,
};
let payload = parse_json_body_with_limit(
seed,
request.into_body(),
body,
config.http_config.max_request_size_bytes as usize,
)
.await?;
Ok::<Payload, SqlOverHttpError>(payload) // Adjust error type accordingly
Ok::<(Arena, Payload), SqlOverHttpError>((arena, payload)) // Adjust error type accordingly
});
let authenticate_and_connect = Box::pin(
@@ -664,7 +662,7 @@ async fn handle_db_inner(
.map_err(SqlOverHttpError::from),
);
let (payload, mut client) = match run_until_cancelled(
let ((mut arena, payload), mut client) = match run_until_cancelled(
// Run both operations in parallel
try_join(
pin!(fetch_and_process_request),
@@ -711,6 +709,13 @@ async fn handle_db_inner(
}
};
info!(
str_len = arena.str_arena.len(),
params = arena.params_arena.len(),
response = json_output.len(),
"data size"
);
let metrics = client.metrics();
let len = json_output.len();
@@ -822,10 +827,7 @@ impl QueryData {
// The query successfully completed.
Either::Left((Ok((status, results)), __not_yet_cancelled)) => {
discard.check_idle(status);
let json_output =
serde_json::to_string(&results).expect("json serialization should not fail");
Ok(json_output)
Ok(results)
}
// The query failed with an error
Either::Left((Err(e), __not_yet_cancelled)) => {
@@ -958,7 +960,8 @@ async fn query_batch(
queries: BatchQueryData,
parsed_headers: HttpHeaders,
) -> Result<String, SqlOverHttpError> {
let mut result_bytes = br#"{"results":["#.to_vec();
let mut comma = false;
let mut results = r#"{"results":["#.to_string();
let mut current_size = 0;
for stmt in queries.queries {
@@ -975,12 +978,11 @@ async fn query_batch(
match res {
// TODO: maybe we should check that the transaction bit is set here
Either::Left((Ok((_, values)), _cancelled)) => {
if !result_bytes.is_empty() {
result_bytes.push(b',');
if comma {
results.push(',');
}
values
.serialize(&mut serde_json::Serializer::new(&mut result_bytes))
.expect("serializing results to memory should always succeed");
results.push_str(&values);
comma = true;
}
Either::Left((Err(e), _cancelled)) => {
return Err(e);
@@ -991,12 +993,9 @@ async fn query_batch(
}
}
result_bytes.extend_from_slice(b"]}");
results.push_str("]}");
let json_output =
String::from_utf8(result_bytes).expect("JSON serialization should always be utf8");
Ok(json_output)
Ok(results)
}
async fn query_to_json<T: GenericClient>(
@@ -1006,7 +1005,7 @@ async fn query_to_json<T: GenericClient>(
data: QueryData,
current_size: &mut usize,
parsed_headers: HttpHeaders,
) -> Result<(ReadyForQueryStatus, impl Serialize), SqlOverHttpError> {
) -> Result<(ReadyForQueryStatus, String), SqlOverHttpError> {
info!("executing query");
let query_params = arena.params_arena[data.params.into_range()]
@@ -1084,12 +1083,13 @@ async fn query_to_json<T: GenericClient>(
// Resulting JSON format is based on the format of node-postgres result.
let results = json!({
"command": command_tag_name.to_string(),
"command": command_tag_name,
"rowCount": command_tag_count,
"rows": rows,
"fields": fields,
"rowAsArray": array_mode,
});
})
.to_string();
Ok((ready, results))
}