From 4a92799f24a6b470b3ba1bddebf228589fca5ba2 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Tue, 14 Mar 2023 13:10:00 +0200
Subject: [PATCH] Fix check for trailing garbage in basebackup import.

There was a warning for trailing garbage after the end of the tar
archive, but it didn't always work. The reason is that we created a
StreamReader over the original copyin stream, but performed the check
for garbage on the copyin stream directly. Any garbage bytes still
buffered inside the StreamReader were never caught by the warning.

I considered turning the warning into a fatal error, aborting the
import, but I wasn't sure if we handle aborting the import properly.
Do we clean up the timeline directory on error? If we don't, we should
make that more robust, but that's a different story.

Also, a valid tar archive normally ends with two 512-byte blocks of
zeros, and the tokio_tar crate stops at the first all-zeros block.
Read and check the second all-zeros block too, and error out if it's
missing or contains something unexpected.
---
 pageserver/src/page_service.rs     | 77 +++++++++++++++++++++---------
 pageserver/src/tenant.rs           |  7 +--
 test_runner/regress/test_import.py | 19 +++++++-
 3 files changed, 74 insertions(+), 29 deletions(-)

diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index dc9bf955f7..aad6099952 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -12,7 +12,7 @@
 use anyhow::Context;
 use bytes::Buf;
 use bytes::Bytes;
-use futures::{Stream, StreamExt};
+use futures::Stream;
 use pageserver_api::models::TenantState;
 use pageserver_api::models::{
     PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -31,6 +31,7 @@ use std::str;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
+use tokio_util::io::StreamReader;
 use tracing::*;
 use utils::id::ConnectionId;
 use utils::{
@@ -115,6 +116,49 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<Bytes>>
+/// Read the end of a tar archive.
+///
+/// A tar archive normally ends with two consecutive 512-byte all-zeros
+/// blocks, but tokio_tar stops at the first one. Read and verify the
+/// second block, then drain and warn about any trailing bytes after it.
+async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow::Result<()> {
+    use tokio::io::AsyncReadExt;
+    let mut buf = [0u8; 512];
+
+    // Read the all-zeros block, and verify it
+    let mut total_bytes = 0;
+    while total_bytes < 512 {
+        let nbytes = reader.read(&mut buf[total_bytes..]).await?;
+        total_bytes += nbytes;
+        if nbytes == 0 {
+            break;
+        }
+    }
+    if total_bytes < 512 {
+        anyhow::bail!("incomplete or invalid tar EOF marker");
+    }
+    if !buf.iter().all(|&x| x == 0) {
+        anyhow::bail!("invalid tar EOF marker");
+    }
+
+    // Drain any data after the EOF marker
+    let mut trailing_bytes = 0;
+    loop {
+        let nbytes = reader.read(&mut buf).await?;
+        trailing_bytes += nbytes;
+        if nbytes == 0 {
+            break;
+        }
+    }
+    if trailing_bytes > 0 {
+        warn!("ignored {trailing_bytes} unexpected bytes after the tar archive");
+    }
+    Ok(())
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 
 ///
@@ -422,19 +466,14 @@ impl PageServerHandler {
         pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
         pgb.flush().await?;
-        let mut copyin_stream = Box::pin(copyin_stream(pgb));
+        let copyin_reader = StreamReader::new(copyin_stream(pgb));
+        tokio::pin!(copyin_reader);
         timeline
-            .import_basebackup_from_tar(&mut copyin_stream, base_lsn, &ctx)
+            .import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx)
             .await?;
 
-        // Drain the rest of the Copy data
-        let mut bytes_after_tar = 0;
-        while let Some(bytes) = copyin_stream.next().await {
-            bytes_after_tar += bytes?.len();
-        }
-        if bytes_after_tar > 0 {
-            warn!("ignored {bytes_after_tar} unexpected bytes after the tar archive");
-        }
+        // Read the end of the tar archive.
+        read_tar_eof(copyin_reader).await?;
 
         // TODO check checksum
         // Meanwhile you can verify client-side by taking fullbackup
@@ -473,19 +512,13 @@ impl PageServerHandler {
         info!("importing wal");
         pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
         pgb.flush().await?;
-        let mut copyin_stream = Box::pin(copyin_stream(pgb));
-        let mut reader = tokio_util::io::StreamReader::new(&mut copyin_stream);
-        import_wal_from_tar(&timeline, &mut reader, start_lsn, end_lsn, &ctx).await?;
+        let copyin_reader = StreamReader::new(copyin_stream(pgb));
+        tokio::pin!(copyin_reader);
+        import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
         info!("wal import complete");
 
-        // Drain the rest of the Copy data
-        let mut bytes_after_tar = 0;
-        while let Some(bytes) = copyin_stream.next().await {
-            bytes_after_tar += bytes?.len();
-        }
-        if bytes_after_tar > 0 {
-            warn!("ignored {bytes_after_tar} unexpected bytes after the tar archive");
-        }
+        // Read the end of the tar archive.
+        read_tar_eof(copyin_reader).await?;
 
         // TODO Does it make sense to overshoot?
         if timeline.get_last_record_lsn() < end_lsn {
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 1fb312fe07..5f1e23b873 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -12,9 +12,7 @@
 //!
 use anyhow::{bail, Context};
-use bytes::Bytes;
 use futures::FutureExt;
-use futures::Stream;
 use pageserver_api::models::TimelineState;
 use remote_storage::DownloadError;
 use remote_storage::GenericRemoteStorage;
@@ -239,14 +237,13 @@ impl UninitializedTimeline<'_> {
     /// Prepares timeline data by loading it from the basebackup archive.
     pub async fn import_basebackup_from_tar(
         self,
-        copyin_stream: &mut (impl Stream<Item = io::Result<Bytes>> + Sync + Send + Unpin),
+        copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
         base_lsn: Lsn,
         ctx: &RequestContext,
     ) -> anyhow::Result<Arc<Timeline>> {
         let raw_timeline = self.raw_timeline()?;
 
-        let mut reader = tokio_util::io::StreamReader::new(copyin_stream);
-        import_datadir::import_basebackup_from_tar(raw_timeline, &mut reader, base_lsn, ctx)
+        import_datadir::import_basebackup_from_tar(raw_timeline, copyin_read, base_lsn, ctx)
             .await
             .context("Failed to import basebackup")?;
 
diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py
index 0388e24e98..1dc10fbf4f 100644
--- a/test_runner/regress/test_import.py
+++ b/test_runner/regress/test_import.py
@@ -61,6 +61,12 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
         cwd=unpacked_base,
     )
 
+    # Make a copy of base.tar and append some garbage to it.
+    base_plus_garbage_tar = os.path.join(basebackup_dir, "base-plus-garbage.tar")
+    shutil.copyfile(base_tar, base_plus_garbage_tar)
+    with open(base_plus_garbage_tar, "a") as f:
+        f.write("trailing garbage")
+
     # Get start_lsn and end_lsn
     with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
         manifest = json.load(f)
@@ -74,7 +80,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
     # Set up pageserver for import
     neon_env_builder.enable_local_fs_remote_storage()
     env = neon_env_builder.init_start()
-    env.pageserver.http_client().tenant_create(tenant)
+    client = env.pageserver.http_client()
+    client.tenant_create(tenant)
 
     env.pageserver.allowed_errors.extend(
         [
@@ -85,6 +92,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
             ".*InternalServerError.*Tenant .* not found.*",
             ".*InternalServerError.*Timeline .* not found.*",
             ".*InternalServerError.*Cannot delete timeline which has child timelines.*",
+            ".*ignored .* unexpected bytes after the tar archive.*",
         ]
     )
@@ -130,11 +138,18 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
     with pytest.raises(Exception):
         import_tar(corrupt_base_tar, wal_tar)
 
+    # A tar with trailing garbage is currently accepted. It prints a warning
+    # to the pageserver log, however. Check that.
+    import_tar(base_plus_garbage_tar, wal_tar)
+    assert env.pageserver.log_contains(
+        ".*WARN.*ignored .* unexpected bytes after the tar archive.*"
+    )
+    client.timeline_delete(tenant, timeline)
+
     # Importing correct backup works
     import_tar(base_tar, wal_tar)
 
     # Wait for data to land in s3
-    client = env.pageserver.http_client()
     wait_for_last_record_lsn(client, tenant, timeline, Lsn(end_lsn))
     wait_for_upload(client, tenant, timeline, Lsn(end_lsn))
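
For reference, a minimal sketch of how the new read_tar_eof helper behaves,
written as a unit test against in-memory readers (tokio implements AsyncRead
for byte slices, so a Vec<u8> can stand in for the copyin stream). This is
not part of the patch; the #[tokio::test] harness and the placement in a
tests module inside page_service.rs are assumptions:

#[cfg(test)]
mod tests {
    use super::read_tar_eof;

    #[tokio::test]
    async fn tar_eof_marker_handling() {
        // Happy path: exactly one more 512-byte all-zeros block remains
        // (tokio_tar already consumed the first one), with nothing after it.
        let eof = vec![0u8; 512];
        assert!(read_tar_eof(eof.as_slice()).await.is_ok());

        // Trailing garbage after the EOF marker is still accepted; it only
        // produces the "ignored ... unexpected bytes" warning.
        let mut with_garbage = vec![0u8; 512];
        with_garbage.extend_from_slice(b"trailing garbage");
        assert!(read_tar_eof(with_garbage.as_slice()).await.is_ok());

        // A truncated second block is a hard error...
        let short = vec![0u8; 511];
        assert!(read_tar_eof(short.as_slice()).await.is_err());

        // ...and so is a 512-byte block that isn't all zeros.
        let mut nonzero = vec![0u8; 512];
        nonzero[7] = 1;
        assert!(read_tar_eof(nonzero.as_slice()).await.is_err());
    }
}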