From 6588edd693480a81c623c18cfac2de5ad50f17a6 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 1 Oct 2024 16:11:54 +0100 Subject: [PATCH] log --- proxy/src/serverless/sql_over_http.rs | 58 +++++++++++++-------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index af9e170847..f86a11a8df 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -22,7 +22,6 @@ use hyper1::Response; use hyper1::StatusCode; use hyper1::{HeaderMap, Request}; use pq_proto::StartupMessageParamsBuilder; -use serde::Serialize; use tokio::time; use tokio_postgres::error::DbError; use tokio_postgres::error::ErrorPosition; @@ -580,16 +579,16 @@ async fn handle_db_inner( // // Determine the destination and connection params // - let headers = request.headers(); + let (parts, body) = request.into_parts(); // Allow connection pooling only if explicitly requested // or if we have decided that http pool is no longer opt-in let allow_pool = !config.http_config.pool_options.opt_in - || headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE); + || parts.headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE); - let parsed_headers = HttpHeaders::try_parse(headers)?; + let parsed_headers = HttpHeaders::try_parse(&parts.headers)?; - let request_content_length = match request.body().size_hint().upper() { + let request_content_length = match body.size_hint().upper() { Some(v) => v, None => config.http_config.max_request_size_bytes + 1, }; @@ -607,20 +606,19 @@ async fn handle_db_inner( )); } - let mut arena = Arena::default(); - - let fetch_and_process_request = Box::pin(async { + let fetch_and_process_request = Box::pin(async move { + let mut arena = Arena::default(); let seed = SerdeArena { arena: &mut arena, _t: PhantomData::, }; let payload = parse_json_body_with_limit( seed, - request.into_body(), + body, config.http_config.max_request_size_bytes as usize, ) .await?; - Ok::(payload) // Adjust error type accordingly + Ok::<(Arena, Payload), SqlOverHttpError>((arena, payload)) // Adjust error type accordingly }); let authenticate_and_connect = Box::pin( @@ -664,7 +662,7 @@ async fn handle_db_inner( .map_err(SqlOverHttpError::from), ); - let (payload, mut client) = match run_until_cancelled( + let ((mut arena, payload), mut client) = match run_until_cancelled( // Run both operations in parallel try_join( pin!(fetch_and_process_request), @@ -711,6 +709,13 @@ async fn handle_db_inner( } }; + info!( + str_len = arena.str_arena.len(), + params = arena.params_arena.len(), + response = json_output.len(), + "data size" + ); + let metrics = client.metrics(); let len = json_output.len(); @@ -822,10 +827,7 @@ impl QueryData { // The query successfully completed. Either::Left((Ok((status, results)), __not_yet_cancelled)) => { discard.check_idle(status); - - let json_output = - serde_json::to_string(&results).expect("json serialization should not fail"); - Ok(json_output) + Ok(results) } // The query failed with an error Either::Left((Err(e), __not_yet_cancelled)) => { @@ -958,7 +960,8 @@ async fn query_batch( queries: BatchQueryData, parsed_headers: HttpHeaders, ) -> Result { - let mut result_bytes = br#"{"results":["#.to_vec(); + let mut comma = false; + let mut results = r#"{"results":["#.to_string(); let mut current_size = 0; for stmt in queries.queries { @@ -975,12 +978,11 @@ async fn query_batch( match res { // TODO: maybe we should check that the transaction bit is set here Either::Left((Ok((_, values)), _cancelled)) => { - if !result_bytes.is_empty() { - result_bytes.push(b','); + if comma { + results.push(','); } - values - .serialize(&mut serde_json::Serializer::new(&mut result_bytes)) - .expect("serializing results to memory should always succeed"); + results.push_str(&values); + comma = true; } Either::Left((Err(e), _cancelled)) => { return Err(e); @@ -991,12 +993,9 @@ async fn query_batch( } } - result_bytes.extend_from_slice(b"]}"); + results.push_str("]}"); - let json_output = - String::from_utf8(result_bytes).expect("JSON serialization should always be utf8"); - - Ok(json_output) + Ok(results) } async fn query_to_json( @@ -1006,7 +1005,7 @@ async fn query_to_json( data: QueryData, current_size: &mut usize, parsed_headers: HttpHeaders, -) -> Result<(ReadyForQueryStatus, impl Serialize), SqlOverHttpError> { +) -> Result<(ReadyForQueryStatus, String), SqlOverHttpError> { info!("executing query"); let query_params = arena.params_arena[data.params.into_range()] @@ -1084,12 +1083,13 @@ async fn query_to_json( // Resulting JSON format is based on the format of node-postgres result. let results = json!({ - "command": command_tag_name.to_string(), + "command": command_tag_name, "rowCount": command_tag_count, "rows": rows, "fields": fields, "rowAsArray": array_mode, - }); + }) + .to_string(); Ok((ready, results)) }