compute_ctl: catalog API endpoints (#7575)

## Problem

There are two cloud's features that require extra compute endpoints.

1. We are running pg_dump to get DB schemas. Currently, we are using a
special service for this. But it would be great to execute pg_dump in an
isolated environment. And we already have such an environment, it's our
compute! And likely enough pg_dump already exists there too! (see
https://github.com/neondatabase/cloud/issues/11644#issuecomment-2084617832)
2. We need to have a way to get databases and roles from compute after
time travel (see https://github.com/neondatabase/cloud/issues/12109)

## Summary of changes

It adds two API endpoints to compute_ctl HTTP API that target both of
the aforementioned cases.

---------

Co-authored-by: Tristan Partin <tristan@neon.tech>
This commit is contained in:
Andrew Rudenko
2024-05-16 12:04:16 +02:00
committed by GitHub
parent 03c6039707
commit 923cf91aa4
11 changed files with 352 additions and 1 deletions

View File

@@ -0,0 +1,116 @@
use compute_api::{
responses::CatalogObjects,
spec::{Database, Role},
};
use futures::Stream;
use postgres::{Client, NoTls};
use std::{path::Path, process::Stdio, result::Result, sync::Arc};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
task,
};
use tokio_stream::{self as stream, StreamExt};
use tokio_util::codec::{BytesCodec, FramedRead};
use tracing::warn;
use crate::{
compute::ComputeNode,
pg_helpers::{get_existing_dbs, get_existing_roles},
};
pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<CatalogObjects> {
let connstr = compute.connstr.clone();
task::spawn_blocking(move || {
let mut client = Client::connect(connstr.as_str(), NoTls)?;
let roles: Vec<Role>;
{
let mut xact = client.transaction()?;
roles = get_existing_roles(&mut xact)?;
}
let databases: Vec<Database> = get_existing_dbs(&mut client)?.values().cloned().collect();
Ok(CatalogObjects { roles, databases })
})
.await?
}
#[derive(Debug, thiserror::Error)]
pub enum SchemaDumpError {
#[error("Database does not exist.")]
DatabaseDoesNotExist,
#[error("Failed to execute pg_dump.")]
IO(#[from] std::io::Error),
}
// It uses the pg_dump utility to dump the schema of the specified database.
// The output is streamed back to the caller and supposed to be streamed via HTTP.
//
// Before return the result with the output, it checks that pg_dump produced any output.
// If not, it tries to parse the stderr output to determine if the database does not exist
// and special error is returned.
//
// To make sure that the process is killed when the caller drops the stream, we use tokio kill_on_drop feature.
pub async fn get_database_schema(
compute: &Arc<ComputeNode>,
dbname: &str,
) -> Result<impl Stream<Item = Result<bytes::Bytes, std::io::Error>>, SchemaDumpError> {
let pgbin = &compute.pgbin;
let basepath = Path::new(pgbin).parent().unwrap();
let pgdump = basepath.join("pg_dump");
let mut connstr = compute.connstr.clone();
connstr.set_path(dbname);
let mut cmd = Command::new(pgdump)
.arg("--schema-only")
.arg(connstr.as_str())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()?;
let stdout = cmd.stdout.take().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::Other, "Failed to capture stdout.")
})?;
let stderr = cmd.stderr.take().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::Other, "Failed to capture stderr.")
})?;
let mut stdout_reader = FramedRead::new(stdout, BytesCodec::new());
let stderr_reader = BufReader::new(stderr);
let first_chunk = match stdout_reader.next().await {
Some(Ok(bytes)) if !bytes.is_empty() => bytes,
Some(Err(e)) => {
return Err(SchemaDumpError::IO(e));
}
_ => {
let mut lines = stderr_reader.lines();
if let Some(line) = lines.next_line().await? {
if line.contains(&format!("FATAL: database \"{}\" does not exist", dbname)) {
return Err(SchemaDumpError::DatabaseDoesNotExist);
}
warn!("pg_dump stderr: {}", line)
}
tokio::spawn(async move {
while let Ok(Some(line)) = lines.next_line().await {
warn!("pg_dump stderr: {}", line)
}
});
return Err(SchemaDumpError::IO(std::io::Error::new(
std::io::ErrorKind::Other,
"failed to start pg_dump",
)));
}
};
let initial_stream = stream::once(Ok(first_chunk.freeze()));
// Consume stderr and log warnings
tokio::spawn(async move {
let mut lines = stderr_reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
warn!("pg_dump stderr: {}", line)
}
});
Ok(initial_stream.chain(stdout_reader.map(|res| res.map(|b| b.freeze()))))
}

View File

@@ -5,17 +5,21 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use crate::catalog::SchemaDumpError;
use crate::catalog::{get_database_schema, get_dbs_and_roles};
use crate::compute::forward_termination_signal;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
use anyhow::Result;
use hyper::header::CONTENT_TYPE;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use tokio::task;
use tracing::{error, info, warn};
use tracing_utils::http::OtelName;
use utils::http::request::must_get_query_param;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
@@ -133,6 +137,34 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
(&Method::GET, "/dbs_and_roles") => {
info!("serving /dbs_and_roles GET request",);
match get_dbs_and_roles(compute).await {
Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
Err(_) => {
render_json_error("can't get dbs and roles", StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
(&Method::GET, "/database_schema") => {
let database = match must_get_query_param(&req, "database") {
Err(e) => return e.into_response(),
Ok(database) => database,
};
info!("serving /database_schema GET request with database: {database}",);
match get_database_schema(compute, &database).await {
Ok(res) => render_plain(Body::wrap_stream(res)),
Err(SchemaDumpError::DatabaseDoesNotExist) => {
render_json_error("database does not exist", StatusCode::NOT_FOUND)
}
Err(e) => {
error!("can't get schema dump: {}", e);
render_json_error("can't get schema dump", StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
@@ -303,10 +335,25 @@ fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
};
Response::builder()
.status(status)
.header(CONTENT_TYPE, "application/json")
.body(Body::from(serde_json::to_string(&error).unwrap()))
.unwrap()
}
fn render_json(body: Body) -> Response<Body> {
Response::builder()
.header(CONTENT_TYPE, "application/json")
.body(body)
.unwrap()
}
fn render_plain(body: Body) -> Response<Body> {
Response::builder()
.header(CONTENT_TYPE, "text/plain")
.body(body)
.unwrap()
}
async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (String, StatusCode)> {
{
let mut state = compute.state.lock().unwrap();

View File

@@ -68,6 +68,51 @@ paths:
schema:
$ref: "#/components/schemas/Info"
/dbs_and_roles:
get:
tags:
- Info
summary: Get databases and roles in the catalog.
description: ""
operationId: getDbsAndRoles
responses:
200:
description: Compute schema objects
content:
application/json:
schema:
$ref: "#/components/schemas/DbsAndRoles"
/database_schema:
get:
tags:
- Info
summary: Get schema dump
parameters:
- name: database
in: query
description: Database name to dump.
required: true
schema:
type: string
example: "postgres"
description: Get schema dump in SQL format.
operationId: getDatabaseSchema
responses:
200:
description: Schema dump
content:
text/plain:
schema:
type: string
description: Schema dump in SQL format.
404:
description: Non existing database.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/check_writability:
post:
tags:
@@ -229,6 +274,73 @@ components:
num_cpus:
type: integer
DbsAndRoles:
type: object
description: Databases and Roles
required:
- roles
- databases
properties:
roles:
type: array
items:
$ref: "#/components/schemas/Role"
databases:
type: array
items:
$ref: "#/components/schemas/Database"
Database:
type: object
description: Database
required:
- name
- owner
- restrict_conn
- invalid
properties:
name:
type: string
owner:
type: string
options:
type: array
items:
$ref: "#/components/schemas/GenericOption"
restrict_conn:
type: boolean
invalid:
type: boolean
Role:
type: object
description: Role
required:
- name
properties:
name:
type: string
encrypted_password:
type: string
options:
type: array
items:
$ref: "#/components/schemas/GenericOption"
GenericOption:
type: object
description: Schema Generic option
required:
- name
- vartype
properties:
name:
type: string
value:
type: string
vartype:
type: string
ComputeState:
type: object
required:

View File

@@ -8,6 +8,7 @@ pub mod configurator;
pub mod http;
#[macro_use]
pub mod logger;
pub mod catalog;
pub mod compute;
pub mod extension_server;
pub mod monitor;