compute_ctl: database_schema should keep process::Child as part of returned value (#10273)

## Problem

/database_schema endpoint returns incomplete output from `pg_dump`

## Summary of changes

The Tokio process was not used properly. The returned stream does not
include `process::Child`, and the process is scheduled to be killed
immediately after the `get_database_schema` call when `cmd` goes out of
scope.

The solution in this PR is to return a special Stream implementation
that retains `process::Child`.
This commit is contained in:
Andrew Rudenko
2025-02-11 08:02:13 +01:00
committed by GitHub
parent 98883e4b30
commit 4ab18444ec
2 changed files with 31 additions and 2 deletions

View File

@@ -140,5 +140,34 @@ pub async fn get_database_schema(
warn!("pg_dump stderr: {}", line)
}
});
Ok(initial_stream.chain(stdout_reader.map(|res| res.map(|b| b.freeze()))))
#[allow(dead_code)]
struct SchemaStream<S> {
// We keep a reference to the child process to ensure it stays alive
// while the stream is being consumed. When SchemaStream is dropped,
// cmd will be dropped, which triggers kill_on_drop and terminates pg_dump
cmd: tokio::process::Child,
stream: S,
}
impl<S> Stream for SchemaStream<S>
where
S: Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin,
{
type Item = Result<bytes::Bytes, std::io::Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
Stream::poll_next(std::pin::Pin::new(&mut self.stream), cx)
}
}
let schema_stream = SchemaStream {
cmd,
stream: initial_stream.chain(stdout_reader.map(|res| res.map(|b| b.freeze()))),
};
Ok(schema_stream)
}

View File

@@ -82,7 +82,7 @@ def test_compute_catalog(neon_simple_env: NeonEnv):
ddl = client.database_schema(database=test_db["name"])
# Check that it looks like a valid PostgreSQL dump
assert "-- PostgreSQL database dump" in ddl
assert "-- PostgreSQL database dump complete" in ddl
# Check that it doesn't contain health_check and migration traces.
# They are only created in system `postgres` database, so by checking