mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
Fix check for trailing garbage in basebackup import.
There was a warning for trailing garbage after end-of-tar archive, but it didn't always work. The reason is that we created a StreamReader over the original copyin-stream, but performed the check for garbage on the copyin-stream. There could be some garbage bytes buffered in the StreamReader, which were not caught by the warning. I considered turning the the warning into a fatal error, aborting the import, but I wasn't sure if we handle aborting the import properly. Do we clean up the timeline directory on error? If we don't, we should make that more robust, but that's a different story. Also, normally a valid tar archive ends with two 512-byte blocks of zeros. The tokio_tar crate stops at the first all-zeros block. Read and check the second all-zeros block, and error out if it's not there, or contains something unexpected.
This commit is contained in:
committed by
Heikki Linnakangas
parent
5396273541
commit
4a92799f24
@@ -12,7 +12,7 @@
|
||||
use anyhow::Context;
|
||||
use bytes::Buf;
|
||||
use bytes::Bytes;
|
||||
use futures::{Stream, StreamExt};
|
||||
use futures::Stream;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
@@ -31,6 +31,7 @@ use std::str;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio_util::io::StreamReader;
|
||||
use tracing::*;
|
||||
use utils::id::ConnectionId;
|
||||
use utils::{
|
||||
@@ -115,6 +116,49 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<
|
||||
}
|
||||
}
|
||||
|
||||
/// Read the end of a tar archive.
|
||||
///
|
||||
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
|
||||
/// `tokio_tar` already read the first such block. Read the second all-zeros block,
|
||||
/// and check that there is no more data after the EOF marker.
|
||||
///
|
||||
/// XXX: Currently, any trailing data after the EOF marker prints a warning.
|
||||
/// Perhaps it should be a hard error?
|
||||
async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow::Result<()> {
|
||||
use tokio::io::AsyncReadExt;
|
||||
let mut buf = [0u8; 512];
|
||||
|
||||
// Read the all-zeros block, and verify it
|
||||
let mut total_bytes = 0;
|
||||
while total_bytes < 512 {
|
||||
let nbytes = reader.read(&mut buf[total_bytes..]).await?;
|
||||
total_bytes += nbytes;
|
||||
if nbytes == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if total_bytes < 512 {
|
||||
anyhow::bail!("incomplete or invalid tar EOF marker");
|
||||
}
|
||||
if !buf.iter().all(|&x| x == 0) {
|
||||
anyhow::bail!("invalid tar EOF marker");
|
||||
}
|
||||
|
||||
// Drain any data after the EOF marker
|
||||
let mut trailing_bytes = 0;
|
||||
loop {
|
||||
let nbytes = reader.read(&mut buf).await?;
|
||||
trailing_bytes += nbytes;
|
||||
if nbytes == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if trailing_bytes > 0 {
|
||||
warn!("ignored {trailing_bytes} unexpected bytes after the tar archive");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
///
|
||||
@@ -422,19 +466,14 @@ impl PageServerHandler {
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
pgb.flush().await?;
|
||||
|
||||
let mut copyin_stream = Box::pin(copyin_stream(pgb));
|
||||
let copyin_reader = StreamReader::new(copyin_stream(pgb));
|
||||
tokio::pin!(copyin_reader);
|
||||
timeline
|
||||
.import_basebackup_from_tar(&mut copyin_stream, base_lsn, &ctx)
|
||||
.import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx)
|
||||
.await?;
|
||||
|
||||
// Drain the rest of the Copy data
|
||||
let mut bytes_after_tar = 0;
|
||||
while let Some(bytes) = copyin_stream.next().await {
|
||||
bytes_after_tar += bytes?.len();
|
||||
}
|
||||
if bytes_after_tar > 0 {
|
||||
warn!("ignored {bytes_after_tar} unexpected bytes after the tar archive");
|
||||
}
|
||||
// Read the end of the tar archive.
|
||||
read_tar_eof(copyin_reader).await?;
|
||||
|
||||
// TODO check checksum
|
||||
// Meanwhile you can verify client-side by taking fullbackup
|
||||
@@ -473,19 +512,13 @@ impl PageServerHandler {
|
||||
info!("importing wal");
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
pgb.flush().await?;
|
||||
let mut copyin_stream = Box::pin(copyin_stream(pgb));
|
||||
let mut reader = tokio_util::io::StreamReader::new(&mut copyin_stream);
|
||||
import_wal_from_tar(&timeline, &mut reader, start_lsn, end_lsn, &ctx).await?;
|
||||
let copyin_reader = StreamReader::new(copyin_stream(pgb));
|
||||
tokio::pin!(copyin_reader);
|
||||
import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
|
||||
info!("wal import complete");
|
||||
|
||||
// Drain the rest of the Copy data
|
||||
let mut bytes_after_tar = 0;
|
||||
while let Some(bytes) = copyin_stream.next().await {
|
||||
bytes_after_tar += bytes?.len();
|
||||
}
|
||||
if bytes_after_tar > 0 {
|
||||
warn!("ignored {bytes_after_tar} unexpected bytes after the tar archive");
|
||||
}
|
||||
// Read the end of the tar archive.
|
||||
read_tar_eof(copyin_reader).await?;
|
||||
|
||||
// TODO Does it make sense to overshoot?
|
||||
if timeline.get_last_record_lsn() < end_lsn {
|
||||
|
||||
@@ -12,9 +12,7 @@
|
||||
//!
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use bytes::Bytes;
|
||||
use futures::FutureExt;
|
||||
use futures::Stream;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use remote_storage::DownloadError;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
@@ -239,14 +237,13 @@ impl UninitializedTimeline<'_> {
|
||||
/// Prepares timeline data by loading it from the basebackup archive.
|
||||
pub async fn import_basebackup_from_tar(
|
||||
self,
|
||||
copyin_stream: &mut (impl Stream<Item = io::Result<Bytes>> + Sync + Send + Unpin),
|
||||
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
|
||||
base_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let raw_timeline = self.raw_timeline()?;
|
||||
|
||||
let mut reader = tokio_util::io::StreamReader::new(copyin_stream);
|
||||
import_datadir::import_basebackup_from_tar(raw_timeline, &mut reader, base_lsn, ctx)
|
||||
import_datadir::import_basebackup_from_tar(raw_timeline, copyin_read, base_lsn, ctx)
|
||||
.await
|
||||
.context("Failed to import basebackup")?;
|
||||
|
||||
|
||||
@@ -61,6 +61,12 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
cwd=unpacked_base,
|
||||
)
|
||||
|
||||
# Make copy of base.tar and append some garbage to it.
|
||||
base_plus_garbage_tar = os.path.join(basebackup_dir, "base-plus-garbage.tar")
|
||||
shutil.copyfile(base_tar, base_plus_garbage_tar)
|
||||
with open(base_plus_garbage_tar, "a") as f:
|
||||
f.write("trailing garbage")
|
||||
|
||||
# Get start_lsn and end_lsn
|
||||
with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
|
||||
manifest = json.load(f)
|
||||
@@ -74,7 +80,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
# Set up pageserver for import
|
||||
neon_env_builder.enable_local_fs_remote_storage()
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.http_client().tenant_create(tenant)
|
||||
client = env.pageserver.http_client()
|
||||
client.tenant_create(tenant)
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
@@ -85,6 +92,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
".*InternalServerError.*Tenant .* not found.*",
|
||||
".*InternalServerError.*Timeline .* not found.*",
|
||||
".*InternalServerError.*Cannot delete timeline which has child timelines.*",
|
||||
".*ignored .* unexpected bytes after the tar archive.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -130,11 +138,18 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
with pytest.raises(Exception):
|
||||
import_tar(corrupt_base_tar, wal_tar)
|
||||
|
||||
# A tar with trailing garbage is currently accepted. It prints a warnings
|
||||
# to the pageserver log, however. Check that.
|
||||
import_tar(base_plus_garbage_tar, wal_tar)
|
||||
assert env.pageserver.log_contains(
|
||||
".*WARN.*ignored .* unexpected bytes after the tar archive.*"
|
||||
)
|
||||
client.timeline_delete(tenant, timeline)
|
||||
|
||||
# Importing correct backup works
|
||||
import_tar(base_tar, wal_tar)
|
||||
|
||||
# Wait for data to land in s3
|
||||
client = env.pageserver.http_client()
|
||||
wait_for_last_record_lsn(client, tenant, timeline, Lsn(end_lsn))
|
||||
wait_for_upload(client, tenant, timeline, Lsn(end_lsn))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user