From 8b44f5b479df49b2f6ad412fa6aeb3a69c104eb4 Mon Sep 17 00:00:00 2001 From: Ruslan Talpa Date: Tue, 24 Jun 2025 17:03:42 +0300 Subject: [PATCH] subzero integration WIP5 extract the response body from the local proxy response --- proxy/src/serverless/rest.rs | 90 ++++++++++++++++++++++++++++++++---- 1 file changed, 82 insertions(+), 8 deletions(-) diff --git a/proxy/src/serverless/rest.rs b/proxy/src/serverless/rest.rs index f453e44ae9..9a26a84b72 100644 --- a/proxy/src/serverless/rest.rs +++ b/proxy/src/serverless/rest.rs @@ -43,7 +43,7 @@ use crate::types::{DbName, RoleName}; use subzero_core::{ api::{ApiRequest, ApiResponse, ContentType::*, SingleVal, ListVal, Payload}, - error::Error::{self as SubzeroCoreError, SingularityError, PutMatchingPkError, PermissionDenied, JsonDeserialize, NotFound, JwtTokenInvalid,}, + error::Error::{self as SubzeroCoreError, SingularityError, PutMatchingPkError, PermissionDenied, JsonDeserialize, NotFound, JwtTokenInvalid, InternalError}, schema::DbSchema, formatter::{ Param, @@ -1015,19 +1015,93 @@ async fn handle_rest_inner( // todo: map body to count egress let _metrics = client.metrics(ctx); - Ok(client + let response = client .inner .inner .send_request(req) .await .map_err(LocalProxyConnError::from) - .map_err(HttpConnError::from)? - .map(|b| b.boxed())) + .map_err(HttpConnError::from)?; - // Ok(Response::builder() - // .status(StatusCode::OK) - // .body(body) - // .unwrap()) + // Capture the response body + let response_body = response + .collect() + .await + .map_err(ReadPayloadError::from)? + .to_bytes(); + + // Parse the JSON response and extract the body content efficiently + //let body_string = { + let mut response_json: serde_json::Value = serde_json::from_slice(&response_body) + .map_err(|e| RestError::SubzeroCore(JsonDeserialize { source: e }))?; + + println!("Response from local proxy: {:?}", response_json); + + // Extract the second query result (main query) + let results = response_json["results"].as_array_mut() + .ok_or_else(|| RestError::SubzeroCore(InternalError { + message: "Missing 'results' array".to_string() + }))?; + + if results.len() < 2 { + return Err(RestError::SubzeroCore(InternalError { + message: "Expected at least 2 results".to_string() + })); + } + + let second_result = &mut results[1]; + let rows = second_result["rows"].as_array_mut() + .ok_or_else(|| RestError::SubzeroCore(InternalError { + message: "Missing 'rows' array in second result".to_string() + }))?; + + if rows.is_empty() { + return Err(RestError::SubzeroCore(InternalError { + message: "No rows in second result".to_string() + })); + } + + // Extract columns from the first (and only) row + let row = &mut rows[0]; + + + // Extract the owned String directly from the JsonValue (without copying) + let body_string = match row["body"].take() { + JsonValue::String(s) => s, + _ => { + return Err(RestError::SubzeroCore(InternalError { + message: "Missing 'body' field".to_string() + })); + } + }; + let page_total = row["page_total"].as_str(); + let total_result_set = row["total_result_set"].as_str(); + let constraints_satisfied = row["constraints_satisfied"].as_bool() + .unwrap_or(true); + let response_headers = row["response_headers"].as_str(); + let response_status = row["response_status"].as_str(); + + println!("Extracted columns:"); + println!(" page_total: {:?}", page_total); + println!(" total_result_set: {:?}", total_result_set); + println!(" constraints_satisfied: {:?}", constraints_satisfied); + println!(" response_headers: {:?}", response_headers); + println!(" response_status: {:?}", response_status); + println!(" body: {:?}", &body_string); + + //body_string + //}; + + // For now, return the body content as the response - Bytes::from(String) consumes without copying + let response_body = Full::new(Bytes::from(body_string)) + .map_err(|never| match never {}) + .boxed(); + + Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "application/json") + .body(response_body) + .unwrap()) }