mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 17:02:56 +00:00
subzero integration WIP5
extract the response body from the local proxy response
This commit is contained in:
@@ -43,7 +43,7 @@ use crate::types::{DbName, RoleName};
|
||||
|
||||
use subzero_core::{
|
||||
api::{ApiRequest, ApiResponse, ContentType::*, SingleVal, ListVal, Payload},
|
||||
error::Error::{self as SubzeroCoreError, SingularityError, PutMatchingPkError, PermissionDenied, JsonDeserialize, NotFound, JwtTokenInvalid,},
|
||||
error::Error::{self as SubzeroCoreError, SingularityError, PutMatchingPkError, PermissionDenied, JsonDeserialize, NotFound, JwtTokenInvalid, InternalError},
|
||||
schema::DbSchema,
|
||||
formatter::{
|
||||
Param,
|
||||
@@ -1015,19 +1015,93 @@ async fn handle_rest_inner(
|
||||
// todo: map body to count egress
|
||||
let _metrics = client.metrics(ctx);
|
||||
|
||||
Ok(client
|
||||
let response = client
|
||||
.inner
|
||||
.inner
|
||||
.send_request(req)
|
||||
.await
|
||||
.map_err(LocalProxyConnError::from)
|
||||
.map_err(HttpConnError::from)?
|
||||
.map(|b| b.boxed()))
|
||||
.map_err(HttpConnError::from)?;
|
||||
|
||||
// Ok(Response::builder()
|
||||
// .status(StatusCode::OK)
|
||||
// .body(body)
|
||||
// .unwrap())
|
||||
// Capture the response body
|
||||
let response_body = response
|
||||
.collect()
|
||||
.await
|
||||
.map_err(ReadPayloadError::from)?
|
||||
.to_bytes();
|
||||
|
||||
// Parse the JSON response and extract the body content efficiently
|
||||
//let body_string = {
|
||||
let mut response_json: serde_json::Value = serde_json::from_slice(&response_body)
|
||||
.map_err(|e| RestError::SubzeroCore(JsonDeserialize { source: e }))?;
|
||||
|
||||
println!("Response from local proxy: {:?}", response_json);
|
||||
|
||||
// Extract the second query result (main query)
|
||||
let results = response_json["results"].as_array_mut()
|
||||
.ok_or_else(|| RestError::SubzeroCore(InternalError {
|
||||
message: "Missing 'results' array".to_string()
|
||||
}))?;
|
||||
|
||||
if results.len() < 2 {
|
||||
return Err(RestError::SubzeroCore(InternalError {
|
||||
message: "Expected at least 2 results".to_string()
|
||||
}));
|
||||
}
|
||||
|
||||
let second_result = &mut results[1];
|
||||
let rows = second_result["rows"].as_array_mut()
|
||||
.ok_or_else(|| RestError::SubzeroCore(InternalError {
|
||||
message: "Missing 'rows' array in second result".to_string()
|
||||
}))?;
|
||||
|
||||
if rows.is_empty() {
|
||||
return Err(RestError::SubzeroCore(InternalError {
|
||||
message: "No rows in second result".to_string()
|
||||
}));
|
||||
}
|
||||
|
||||
// Extract columns from the first (and only) row
|
||||
let row = &mut rows[0];
|
||||
|
||||
|
||||
// Extract the owned String directly from the JsonValue (without copying)
|
||||
let body_string = match row["body"].take() {
|
||||
JsonValue::String(s) => s,
|
||||
_ => {
|
||||
return Err(RestError::SubzeroCore(InternalError {
|
||||
message: "Missing 'body' field".to_string()
|
||||
}));
|
||||
}
|
||||
};
|
||||
let page_total = row["page_total"].as_str();
|
||||
let total_result_set = row["total_result_set"].as_str();
|
||||
let constraints_satisfied = row["constraints_satisfied"].as_bool()
|
||||
.unwrap_or(true);
|
||||
let response_headers = row["response_headers"].as_str();
|
||||
let response_status = row["response_status"].as_str();
|
||||
|
||||
println!("Extracted columns:");
|
||||
println!(" page_total: {:?}", page_total);
|
||||
println!(" total_result_set: {:?}", total_result_set);
|
||||
println!(" constraints_satisfied: {:?}", constraints_satisfied);
|
||||
println!(" response_headers: {:?}", response_headers);
|
||||
println!(" response_status: {:?}", response_status);
|
||||
println!(" body: {:?}", &body_string);
|
||||
|
||||
//body_string
|
||||
//};
|
||||
|
||||
// For now, return the body content as the response - Bytes::from(String) consumes without copying
|
||||
let response_body = Full::new(Bytes::from(body_string))
|
||||
.map_err(|never| match never {})
|
||||
.boxed();
|
||||
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("content-type", "application/json")
|
||||
.body(response_body)
|
||||
.unwrap())
|
||||
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user