mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 11:00:38 +00:00
Compare commits
5 Commits
jcsp/issue
...
problame/w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e0b760ad82 | ||
|
|
0d55fd8b72 | ||
|
|
082c891afc | ||
|
|
5a8af46504 | ||
|
|
21deb81acb |
@@ -11,10 +11,7 @@ use std::sync::{Arc, Barrier};
|
||||
|
||||
use bytes::{Buf, Bytes};
|
||||
use pageserver::{
|
||||
config::PageServerConf,
|
||||
repository::Key,
|
||||
walrecord::NeonWalRecord,
|
||||
walredo::{PostgresRedoManager, WalRedoError},
|
||||
config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager,
|
||||
};
|
||||
use utils::{id::TenantId, lsn::Lsn};
|
||||
|
||||
@@ -152,7 +149,7 @@ impl Drop for JoinOnDrop {
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> Result<(), WalRedoError>
|
||||
fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> anyhow::Result<()>
|
||||
where
|
||||
I: IntoIterator<Item = Request>,
|
||||
{
|
||||
@@ -160,7 +157,7 @@ where
|
||||
input.into_iter().try_for_each(|req| {
|
||||
let page = req.execute(manager)?;
|
||||
assert_eq!(page.remaining(), 8192);
|
||||
Ok::<_, WalRedoError>(())
|
||||
anyhow::Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -473,7 +470,7 @@ struct Request {
|
||||
}
|
||||
|
||||
impl Request {
|
||||
fn execute(self, manager: &PostgresRedoManager) -> Result<Bytes, WalRedoError> {
|
||||
fn execute(self, manager: &PostgresRedoManager) -> anyhow::Result<Bytes> {
|
||||
use pageserver::walredo::WalRedoManager;
|
||||
|
||||
let Request {
|
||||
|
||||
@@ -136,9 +136,7 @@ impl From<PageReconstructError> for ApiError {
|
||||
PageReconstructError::AncestorStopping(_) => {
|
||||
ApiError::ResourceUnavailable(format!("{pre}").into())
|
||||
}
|
||||
PageReconstructError::WalRedo(pre) => {
|
||||
ApiError::InternalServerError(anyhow::Error::new(pre))
|
||||
}
|
||||
PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,11 +29,11 @@ use std::cmp::min;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::BTreeSet;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Debug;
|
||||
use std::fmt::Display;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::ops::Bound::Included;
|
||||
use std::process::Command;
|
||||
use std::process::Stdio;
|
||||
@@ -49,6 +49,7 @@ use self::config::AttachmentMode;
|
||||
use self::config::LocationConf;
|
||||
use self::config::TenantConf;
|
||||
use self::delete::DeleteTenantFlow;
|
||||
use self::metadata::LoadMetadataError;
|
||||
use self::metadata::TimelineMetadata;
|
||||
use self::mgr::TenantsMap;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
@@ -372,13 +373,6 @@ struct RemoteStartupData {
|
||||
remote_metadata: TimelineMetadata,
|
||||
}
|
||||
|
||||
struct TimelinePreload {
|
||||
timeline_id: TimelineId,
|
||||
remote_client: Option<RemoteTimelineClient>,
|
||||
index_part: Option<IndexPart>,
|
||||
metadata: TimelineMetadata,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum WaitToBecomeActiveError {
|
||||
WillNotBecomeActive {
|
||||
@@ -419,6 +413,11 @@ pub enum CreateTimelineError {
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
struct TenantDirectoryScan {
|
||||
sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
|
||||
timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
|
||||
}
|
||||
|
||||
enum CreateTimelineCause {
|
||||
Load,
|
||||
Delete,
|
||||
@@ -662,14 +661,41 @@ impl Tenant {
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
fn download_indices(
|
||||
&self,
|
||||
timeline_ids: HashSet<TimelineId>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
) -> JoinSet<Result<(TimelineId, RemoteTimelineClient, MaybeDeletedIndexPart), anyhow::Error>>
|
||||
{
|
||||
///
|
||||
/// Background task that downloads all data for a tenant and brings it to Active state.
|
||||
///
|
||||
/// No background tasks are started as part of this routine.
|
||||
///
|
||||
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
|
||||
if !tokio::fs::try_exists(&marker_file)
|
||||
.await
|
||||
.context("check for existence of marker file")?
|
||||
{
|
||||
anyhow::bail!(
|
||||
"implementation error: marker file should exist at beginning of this function"
|
||||
);
|
||||
}
|
||||
|
||||
// Get list of remote timelines
|
||||
// download index files for every tenant timeline
|
||||
info!("listing remote timelines");
|
||||
|
||||
let remote_storage = self
|
||||
.remote_storage
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
|
||||
|
||||
let remote_timeline_ids =
|
||||
remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
|
||||
|
||||
info!("found {} timelines", remote_timeline_ids.len());
|
||||
|
||||
// Download & parse index parts
|
||||
let mut part_downloads = JoinSet::new();
|
||||
for timeline_id in timeline_ids {
|
||||
for timeline_id in remote_timeline_ids {
|
||||
let client = RemoteTimelineClient::new(
|
||||
remote_storage.clone(),
|
||||
self.deletion_queue_client.clone(),
|
||||
@@ -698,56 +724,11 @@ impl Tenant {
|
||||
);
|
||||
}
|
||||
|
||||
part_downloads
|
||||
}
|
||||
|
||||
/// Special variant of preload_timelines that does not rely on remote storage
|
||||
async fn preload_timelines_local(
|
||||
self: &Arc<Self>,
|
||||
timeline_ids: &HashSet<TimelineId>,
|
||||
) -> anyhow::Result<Vec<TimelinePreload>> {
|
||||
let mut preload_map = HashMap::new();
|
||||
for timeline_id in timeline_ids {
|
||||
let metadata = load_metadata(self.conf, &self.tenant_id, timeline_id)?;
|
||||
preload_map.insert(
|
||||
*timeline_id,
|
||||
TimelinePreload {
|
||||
timeline_id: *timeline_id,
|
||||
remote_client: None,
|
||||
// TODO: synthesize an index_part and make it non-optional
|
||||
index_part: None,
|
||||
metadata,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Sort by ancestry
|
||||
Ok(
|
||||
tree_sort_timelines(preload_map, |p| p.metadata.ancestor_timeline())?
|
||||
.into_iter()
|
||||
.map(|i| i.1)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Do the remote I/O and sorting required to prepare a list of timelines
|
||||
/// with their IndexParts, ready for hydrating into `Timeline`
|
||||
async fn preload_timelines(
|
||||
self: &Arc<Self>,
|
||||
timeline_ids: HashSet<TimelineId>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
) -> anyhow::Result<Vec<TimelinePreload>> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let mut part_downloads = self.download_indices(timeline_ids, remote_storage);
|
||||
|
||||
let mut timelines_to_resume_deletions = vec![];
|
||||
|
||||
// We construct a map of every timeline's preload state, prior to sorting
|
||||
// it by ancestry at the end of the function
|
||||
let mut preload_map: HashMap<TimelineId, TimelinePreload> = HashMap::new();
|
||||
|
||||
// Wait for all the download tasks to complete & collect results.
|
||||
let mut remote_index_and_client = HashMap::new();
|
||||
let mut timeline_ancestors = HashMap::new();
|
||||
while let Some(result) = part_downloads.join_next().await {
|
||||
// NB: we already added timeline_id as context to the error
|
||||
let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
|
||||
@@ -755,16 +736,8 @@ impl Tenant {
|
||||
debug!("successfully downloaded index part for timeline {timeline_id}");
|
||||
match index_part {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => {
|
||||
let metadata = index_part.metadata.clone();
|
||||
preload_map.insert(
|
||||
timeline_id,
|
||||
TimelinePreload {
|
||||
timeline_id,
|
||||
remote_client: Some(client),
|
||||
index_part: Some(index_part),
|
||||
metadata,
|
||||
},
|
||||
);
|
||||
timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
|
||||
remote_index_and_client.insert(timeline_id, (index_part, client));
|
||||
}
|
||||
MaybeDeletedIndexPart::Deleted(index_part) => {
|
||||
info!(
|
||||
@@ -776,6 +749,35 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
// For every timeline, download the metadata file, scan the local directory,
|
||||
// and build a layer map that contains an entry for each remote and local
|
||||
// layer file.
|
||||
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
|
||||
for (timeline_id, remote_metadata) in sorted_timelines {
|
||||
let (index_part, remote_client) = remote_index_and_client
|
||||
.remove(&timeline_id)
|
||||
.expect("just put it in above");
|
||||
|
||||
// TODO again handle early failure
|
||||
self.load_remote_timeline(
|
||||
timeline_id,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
TimelineResources {
|
||||
remote_client: Some(remote_client),
|
||||
deletion_queue_client: self.deletion_queue_client.clone(),
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to load remote timeline {} for tenant {}",
|
||||
timeline_id, self.tenant_id
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
// Walk through deleted timelines, resume deletion
|
||||
for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
|
||||
remote_timeline_client
|
||||
@@ -796,81 +798,6 @@ impl Tenant {
|
||||
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
|
||||
}
|
||||
|
||||
// Sort by ancestry
|
||||
Ok(
|
||||
tree_sort_timelines(preload_map, |p| p.metadata.ancestor_timeline())?
|
||||
.into_iter()
|
||||
.map(|i| i.1)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
///
|
||||
/// Background task that downloads all data for a tenant and brings it to Active state.
|
||||
///
|
||||
/// No background tasks are started as part of this routine.
|
||||
///
|
||||
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
|
||||
if !tokio::fs::try_exists(&marker_file)
|
||||
.await
|
||||
.context("check for existence of marker file")?
|
||||
{
|
||||
anyhow::bail!(
|
||||
"implementation error: marker file should exist at beginning of this function"
|
||||
);
|
||||
}
|
||||
|
||||
// Get list of remote timelines
|
||||
info!("listing remote timelines");
|
||||
|
||||
let remote_storage = self
|
||||
.remote_storage
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
|
||||
|
||||
let remote_timeline_ids =
|
||||
remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
|
||||
|
||||
info!("found {} timelines", remote_timeline_ids.len());
|
||||
|
||||
// Download & parse index parts
|
||||
let sorted_timelines = self
|
||||
.preload_timelines(remote_timeline_ids, remote_storage)
|
||||
.await?;
|
||||
|
||||
// For every timeline, download the metadata file, scan the local directory,
|
||||
// and build a layer map that contains an entry for each remote and local
|
||||
// layer file.
|
||||
for timeline_preload in sorted_timelines {
|
||||
let TimelinePreload {
|
||||
timeline_id,
|
||||
remote_client,
|
||||
index_part,
|
||||
metadata: _,
|
||||
} = timeline_preload;
|
||||
|
||||
// TODO again handle early failure
|
||||
self.load_remote_timeline(
|
||||
timeline_id,
|
||||
index_part.unwrap(),
|
||||
TimelineResources {
|
||||
remote_client,
|
||||
deletion_queue_client: self.deletion_queue_client.clone(),
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to load remote timeline {} for tenant {}",
|
||||
timeline_id, self.tenant_id
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
std::fs::remove_file(&marker_file)
|
||||
.with_context(|| format!("unlink attach marker file {marker_file}"))?;
|
||||
crashsafe::fsync(marker_file.parent().expect("marker file has parent dir"))
|
||||
@@ -903,6 +830,7 @@ impl Tenant {
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
index_part: IndexPart,
|
||||
remote_metadata: TimelineMetadata,
|
||||
resources: TimelineResources,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -913,7 +841,7 @@ impl Tenant {
|
||||
.await
|
||||
.context("Failed to create new timeline directory")?;
|
||||
|
||||
let ancestor = if let Some(ancestor_id) = index_part.metadata.ancestor_timeline() {
|
||||
let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
|
||||
|| {
|
||||
@@ -931,7 +859,6 @@ impl Tenant {
|
||||
// cannot be older than the local one
|
||||
let local_metadata = None;
|
||||
|
||||
let remote_metadata = index_part.metadata.clone();
|
||||
self.timeline_init_and_sync(
|
||||
timeline_id,
|
||||
resources,
|
||||
@@ -1105,9 +1032,12 @@ impl Tenant {
|
||||
tenant
|
||||
}
|
||||
|
||||
async fn scan_timelines_dir(self: &Arc<Tenant>) -> anyhow::Result<HashSet<TimelineId>> {
|
||||
let mut timelines_to_load: HashSet<TimelineId> = HashSet::new();
|
||||
let mut timelines_to_resume_deletion: HashSet<TimelineId> = HashSet::new();
|
||||
fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
|
||||
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
|
||||
// Note timelines_to_resume_deletion needs to be separate because it can be not sortable
|
||||
// from the point of view of `tree_sort_timelines`, i.e. some parents can be missing because deletion
|
||||
// completed in non topological order (for example because parent has smaller number of layer files in it)
|
||||
let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
|
||||
|
||||
let timelines_dir = self.conf.timelines_path(&self.tenant_id);
|
||||
|
||||
@@ -1156,7 +1086,38 @@ impl Tenant {
|
||||
})?;
|
||||
|
||||
info!("Found deletion mark for timeline {}", timeline_id);
|
||||
timelines_to_resume_deletion.insert(timeline_id);
|
||||
|
||||
match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
|
||||
Ok(metadata) => {
|
||||
timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
|
||||
}
|
||||
Err(e) => match &e {
|
||||
LoadMetadataError::Read(r) => {
|
||||
if r.kind() != io::ErrorKind::NotFound {
|
||||
return Err(anyhow::anyhow!(e)).with_context(|| {
|
||||
format!("Failed to load metadata for timeline_id {timeline_id}")
|
||||
});
|
||||
}
|
||||
|
||||
// If metadata doesn't exist it means that we've crashed without
|
||||
// completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
|
||||
// So save timeline_id for later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
|
||||
// We can't do it here because the method is async so we'd need block_on
|
||||
// and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations
|
||||
// so that basically results in a cycle:
|
||||
// spawn_blocking
|
||||
// - block_on
|
||||
// - spawn_blocking
|
||||
// which can lead to running out of threads in the blocking pool.
|
||||
timelines_to_resume_deletion.push((timeline_id, None));
|
||||
}
|
||||
_ => {
|
||||
return Err(anyhow::anyhow!(e)).with_context(|| {
|
||||
format!("Failed to load metadata for timeline_id {timeline_id}")
|
||||
})
|
||||
}
|
||||
},
|
||||
}
|
||||
} else {
|
||||
if !timeline_dir.exists() {
|
||||
warn!("Timeline dir entry become invalid: {timeline_dir}");
|
||||
@@ -1194,7 +1155,9 @@ impl Tenant {
|
||||
|
||||
let file_name = entry.file_name();
|
||||
if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
|
||||
timelines_to_load.insert(timeline_id);
|
||||
let metadata = load_metadata(self.conf, &self.tenant_id, &timeline_id)
|
||||
.context("failed to load metadata")?;
|
||||
timelines_to_load.insert(timeline_id, metadata);
|
||||
} else {
|
||||
// A file or directory that doesn't look like a timeline ID
|
||||
warn!("unexpected file or directory in timelines directory: {file_name}");
|
||||
@@ -1202,18 +1165,14 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
for timeline_id in timelines_to_resume_deletion {
|
||||
if let Err(e) =
|
||||
DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id).await
|
||||
{
|
||||
warn!(
|
||||
"cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
|
||||
timeline_id, e
|
||||
);
|
||||
// Sort the array of timeline IDs into tree-order, so that parent comes before
|
||||
// all its children.
|
||||
tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
|
||||
TenantDirectoryScan {
|
||||
sorted_timelines_to_load: sorted_timelines,
|
||||
timelines_to_resume_deletion,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(timelines_to_load)
|
||||
})
|
||||
}
|
||||
|
||||
///
|
||||
@@ -1236,34 +1195,24 @@ impl Tenant {
|
||||
//
|
||||
// Scan the directory, peek into the metadata file of each timeline, and
|
||||
// collect a list of timelines and their ancestors.
|
||||
let span = info_span!("blocking");
|
||||
let cloned = Arc::clone(self);
|
||||
|
||||
let local_timelines = tokio::task::spawn(async move { cloned.scan_timelines_dir().await })
|
||||
.await
|
||||
.context("load spawn_blocking")
|
||||
.and_then(|res| res)?;
|
||||
let scan = tokio::task::spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
cloned.scan_and_sort_timelines_dir()
|
||||
})
|
||||
.await
|
||||
.context("load spawn_blocking")
|
||||
.and_then(|res| res)?;
|
||||
|
||||
let sorted_timelines = match &self.remote_storage {
|
||||
Some(remote_storage) => {
|
||||
self.preload_timelines(local_timelines, remote_storage)
|
||||
.await?
|
||||
}
|
||||
None => {
|
||||
// Deprecated mode, only used in dev.
|
||||
self.preload_timelines_local(&local_timelines).await?
|
||||
}
|
||||
};
|
||||
|
||||
for timeline_preload in sorted_timelines {
|
||||
let TimelinePreload {
|
||||
timeline_id,
|
||||
remote_client: _,
|
||||
index_part: _,
|
||||
metadata,
|
||||
} = timeline_preload;
|
||||
// FIXME original collect_timeline_files contained one more check:
|
||||
// 1. "Timeline has no ancestor and no layer files"
|
||||
|
||||
// Process loadable timelines first
|
||||
for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
|
||||
if let Err(e) = self
|
||||
.load_local_timeline(timeline_id, metadata, init_order, ctx, false)
|
||||
.load_local_timeline(timeline_id, local_metadata, init_order, ctx, false)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
@@ -1280,6 +1229,43 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
// Resume deletion ones with deleted_mark
|
||||
for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
|
||||
match maybe_local_metadata {
|
||||
None => {
|
||||
// See comment in `scan_and_sort_timelines_dir`.
|
||||
if let Err(e) =
|
||||
DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
|
||||
timeline_id, e
|
||||
);
|
||||
}
|
||||
}
|
||||
Some(local_metadata) => {
|
||||
if let Err(e) = self
|
||||
.load_local_timeline(timeline_id, local_metadata, init_order, ctx, true)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
LoadLocalTimelineError::Load(source) => {
|
||||
// We tried to load deleted timeline, this is a bug.
|
||||
return Err(anyhow::anyhow!(source).context(
|
||||
"This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}"
|
||||
));
|
||||
}
|
||||
LoadLocalTimelineError::ResumeDeletion(source) => {
|
||||
// Make sure resumed deletion won't fail loading for the entire tenant.
|
||||
error!("Failed to resume timeline deletion: {source:#}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!("Done");
|
||||
|
||||
Ok(())
|
||||
@@ -3462,11 +3448,8 @@ pub mod harness {
|
||||
|
||||
use crate::deletion_queue::mock::MockDeletionQueue;
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
repository::Key,
|
||||
tenant::Tenant,
|
||||
walrecord::NeonWalRecord,
|
||||
walredo::{WalRedoError, WalRedoManager},
|
||||
config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
|
||||
walredo::WalRedoManager,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
@@ -3639,7 +3622,7 @@ pub mod harness {
|
||||
base_img: Option<(Lsn, Bytes)>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
_pg_version: u32,
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let s = format!(
|
||||
"redo for {} to get to {}, with {} and {} records",
|
||||
key,
|
||||
|
||||
@@ -370,7 +370,7 @@ pub enum PageReconstructError {
|
||||
|
||||
/// An error happened replaying WAL records
|
||||
#[error(transparent)]
|
||||
WalRedo(#[from] crate::walredo::WalRedoError),
|
||||
WalRedo(anyhow::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for PageReconstructError {
|
||||
@@ -4327,7 +4327,7 @@ impl Timeline {
|
||||
let img = match self
|
||||
.walredo_mgr
|
||||
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
|
||||
.context("Failed to reconstruct a page image:")
|
||||
.context("Failed to reconstruct a page image")
|
||||
{
|
||||
Ok(img) => img,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
//! any WAL records, so that even if an attacker hijacks the Postgres
|
||||
//! process, he cannot escape out of it.
|
||||
//!
|
||||
use anyhow::Context;
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use nix::poll::*;
|
||||
@@ -89,7 +90,7 @@ pub trait WalRedoManager: Send + Sync {
|
||||
base_img: Option<(Lsn, Bytes)>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pg_version: u32,
|
||||
) -> Result<Bytes, WalRedoError>;
|
||||
) -> anyhow::Result<Bytes>;
|
||||
}
|
||||
|
||||
struct ProcessInput {
|
||||
@@ -100,6 +101,14 @@ struct ProcessInput {
|
||||
n_requests: usize,
|
||||
}
|
||||
|
||||
enum ApplyWalRecordsError {
|
||||
WithRequestNo {
|
||||
error: anyhow::Error,
|
||||
request_no: usize,
|
||||
},
|
||||
NoRequestNo(anyhow::Error),
|
||||
}
|
||||
|
||||
struct ProcessOutput {
|
||||
stdout: ChildStdout,
|
||||
pending_responses: VecDeque<Option<Bytes>>,
|
||||
@@ -140,20 +149,6 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
/// An error happened in WAL redo
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum WalRedoError {
|
||||
#[error(transparent)]
|
||||
IoError(#[from] std::io::Error),
|
||||
|
||||
#[error("cannot perform WAL redo now")]
|
||||
InvalidState,
|
||||
#[error("cannot perform WAL redo for this request")]
|
||||
InvalidRequest,
|
||||
#[error("cannot perform WAL redo for this record")]
|
||||
InvalidRecord,
|
||||
}
|
||||
|
||||
///
|
||||
/// Public interface of WAL redo manager
|
||||
///
|
||||
@@ -171,10 +166,9 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
base_img: Option<(Lsn, Bytes)>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pg_version: u32,
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
) -> anyhow::Result<Bytes> {
|
||||
if records.is_empty() {
|
||||
error!("invalid WAL redo request with no records");
|
||||
return Err(WalRedoError::InvalidRequest);
|
||||
anyhow::bail!("invalid WAL redo request with no records");
|
||||
}
|
||||
|
||||
let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
|
||||
@@ -186,7 +180,13 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
|
||||
if rec_neon != batch_neon {
|
||||
let result = if batch_neon {
|
||||
self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
|
||||
self.apply_batch_neon(
|
||||
key,
|
||||
lsn,
|
||||
img,
|
||||
&records[batch_start..i],
|
||||
(batch_start, records.len()),
|
||||
)
|
||||
} else {
|
||||
self.apply_batch_postgres(
|
||||
key,
|
||||
@@ -196,6 +196,7 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
&records[batch_start..i],
|
||||
self.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
(batch_start, records.len()),
|
||||
)
|
||||
};
|
||||
img = Some(result?);
|
||||
@@ -206,7 +207,13 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
}
|
||||
// last batch
|
||||
if batch_neon {
|
||||
self.apply_batch_neon(key, lsn, img, &records[batch_start..])
|
||||
self.apply_batch_neon(
|
||||
key,
|
||||
lsn,
|
||||
img,
|
||||
&records[batch_start..],
|
||||
(batch_start, records.len()),
|
||||
)
|
||||
} else {
|
||||
self.apply_batch_postgres(
|
||||
key,
|
||||
@@ -216,6 +223,7 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
&records[batch_start..],
|
||||
self.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
(batch_start, records.len()),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -260,8 +268,9 @@ impl PostgresRedoManager {
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
pg_version: u32,
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
diag: impl std::fmt::Debug,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
|
||||
const MAX_RETRY_ATTEMPTS: u32 = 1;
|
||||
let start_time = Instant::now();
|
||||
let mut n_attempts = 0u32;
|
||||
@@ -271,15 +280,15 @@ impl PostgresRedoManager {
|
||||
|
||||
// launch the WAL redo process on first use
|
||||
if proc.is_none() {
|
||||
self.launch(&mut proc, pg_version)?;
|
||||
self.launch(&mut proc, pg_version)
|
||||
.context("launch process")?;
|
||||
}
|
||||
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
|
||||
|
||||
// Relational WAL records are applied using wal-redo-postgres
|
||||
let buf_tag = BufferTag { rel, blknum };
|
||||
let result = self
|
||||
.apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout)
|
||||
.map_err(WalRedoError::IoError);
|
||||
let result =
|
||||
self.apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout);
|
||||
|
||||
let end_time = Instant::now();
|
||||
let duration = end_time.duration_since(lock_time);
|
||||
@@ -298,26 +307,35 @@ impl PostgresRedoManager {
|
||||
WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
|
||||
|
||||
debug!(
|
||||
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
|
||||
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {} diag={:?}",
|
||||
len,
|
||||
nbytes,
|
||||
duration.as_micros(),
|
||||
lsn
|
||||
lsn,
|
||||
diag,
|
||||
);
|
||||
|
||||
// If something went wrong, don't try to reuse the process. Kill it, and
|
||||
// next request will launch a new one.
|
||||
if let Err(e) = result.as_ref() {
|
||||
let (e, request_no) = match e {
|
||||
ApplyWalRecordsError::WithRequestNo { error, request_no } => {
|
||||
(error, Some(*request_no))
|
||||
}
|
||||
ApplyWalRecordsError::NoRequestNo(e) => (e, None),
|
||||
};
|
||||
error!(
|
||||
n_attempts,
|
||||
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}: {}",
|
||||
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {} n_attempts={} request_no={:?} {:?}: {:?}",
|
||||
records.len(),
|
||||
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
|
||||
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
|
||||
nbytes,
|
||||
base_img_lsn,
|
||||
lsn,
|
||||
utils::error::report_compact_sources(e),
|
||||
n_attempts,
|
||||
request_no,
|
||||
diag,
|
||||
e,
|
||||
);
|
||||
// self.stdin only holds stdin & stderr as_raw_fd().
|
||||
// Dropping it as part of take() doesn't close them.
|
||||
@@ -336,11 +354,14 @@ impl PostgresRedoManager {
|
||||
proc.child.kill_and_wait();
|
||||
}
|
||||
} else if n_attempts != 0 {
|
||||
info!(n_attempts, "retried walredo succeeded");
|
||||
info!(n_attempts, ?diag, "retried walredo succeeded");
|
||||
}
|
||||
n_attempts += 1;
|
||||
if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
|
||||
return result;
|
||||
return result.map_err(|e| match e {
|
||||
ApplyWalRecordsError::WithRequestNo { error, .. } => error,
|
||||
ApplyWalRecordsError::NoRequestNo(e) => e,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -354,7 +375,8 @@ impl PostgresRedoManager {
|
||||
lsn: Lsn,
|
||||
base_img: Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
diag: impl std::fmt::Debug,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
let mut page = BytesMut::new();
|
||||
@@ -363,8 +385,7 @@ impl PostgresRedoManager {
|
||||
page.extend_from_slice(&fpi[..]);
|
||||
} else {
|
||||
// All the current WAL record types that we can handle require a base image.
|
||||
error!("invalid neon WAL redo request with no base image");
|
||||
return Err(WalRedoError::InvalidRequest);
|
||||
anyhow::bail!("invalid neon WAL redo request with no base image");
|
||||
}
|
||||
|
||||
// Apply all the WAL records in the batch
|
||||
@@ -377,10 +398,11 @@ impl PostgresRedoManager {
|
||||
WAL_REDO_TIME.observe(duration.as_secs_f64());
|
||||
|
||||
debug!(
|
||||
"neon applied {} WAL records in {} ms to reconstruct page image at LSN {}",
|
||||
"neon applied {} WAL records in {} ms to reconstruct page image at LSN {} diag={:?}",
|
||||
records.len(),
|
||||
duration.as_micros(),
|
||||
lsn
|
||||
lsn,
|
||||
diag,
|
||||
);
|
||||
|
||||
Ok(page.freeze())
|
||||
@@ -392,14 +414,13 @@ impl PostgresRedoManager {
|
||||
page: &mut BytesMut,
|
||||
_record_lsn: Lsn,
|
||||
record: &NeonWalRecord,
|
||||
) -> Result<(), WalRedoError> {
|
||||
) -> anyhow::Result<()> {
|
||||
match record {
|
||||
NeonWalRecord::Postgres {
|
||||
will_init: _,
|
||||
rec: _,
|
||||
} => {
|
||||
error!("tried to pass postgres wal record to neon WAL redo");
|
||||
return Err(WalRedoError::InvalidRequest);
|
||||
anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
|
||||
}
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
@@ -407,7 +428,7 @@ impl PostgresRedoManager {
|
||||
flags,
|
||||
} => {
|
||||
// sanity check that this is modifying the correct relation
|
||||
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
|
||||
assert!(
|
||||
rel.forknum == VISIBILITYMAP_FORKNUM,
|
||||
"ClearVisibilityMapFlags record on unexpected rel {}",
|
||||
@@ -445,7 +466,7 @@ impl PostgresRedoManager {
|
||||
// same effects as the corresponding Postgres WAL redo function.
|
||||
NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
|
||||
let (slru_kind, segno, blknum) =
|
||||
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
key_to_slru_block(key).context("invalid record")?;
|
||||
assert_eq!(
|
||||
slru_kind,
|
||||
SlruKind::Clog,
|
||||
@@ -495,7 +516,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
NeonWalRecord::ClogSetAborted { xids } => {
|
||||
let (slru_kind, segno, blknum) =
|
||||
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
key_to_slru_block(key).context("invalid record")?;
|
||||
assert_eq!(
|
||||
slru_kind,
|
||||
SlruKind::Clog,
|
||||
@@ -526,7 +547,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
|
||||
let (slru_kind, segno, blknum) =
|
||||
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
key_to_slru_block(key).context("invalid record")?;
|
||||
assert_eq!(
|
||||
slru_kind,
|
||||
SlruKind::MultiXactOffsets,
|
||||
@@ -559,7 +580,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
NeonWalRecord::MultixactMembersCreate { moff, members } => {
|
||||
let (slru_kind, segno, blknum) =
|
||||
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
key_to_slru_block(key).context("invalid record")?;
|
||||
assert_eq!(
|
||||
slru_kind,
|
||||
SlruKind::MultiXactMembers,
|
||||
@@ -759,7 +780,7 @@ impl PostgresRedoManager {
|
||||
base_img: &Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
) -> Result<Bytes, ApplyWalRecordsError> {
|
||||
// Serialize all the messages to send the WAL redo process first.
|
||||
//
|
||||
// This could be problematic if there are millions of records to replay,
|
||||
@@ -782,10 +803,9 @@ impl PostgresRedoManager {
|
||||
{
|
||||
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
|
||||
} else {
|
||||
return Err(Error::new(
|
||||
ErrorKind::Other,
|
||||
"tried to pass neon wal record to postgres WAL redo",
|
||||
));
|
||||
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
|
||||
"tried to pass neon wal record to postgres WAL redo"
|
||||
)));
|
||||
}
|
||||
}
|
||||
build_get_page_msg(tag, &mut writebuf);
|
||||
@@ -807,7 +827,7 @@ impl PostgresRedoManager {
|
||||
writebuf: &[u8],
|
||||
mut input: MutexGuard<Option<ProcessInput>>,
|
||||
wal_redo_timeout: Duration,
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
) -> Result<Bytes, ApplyWalRecordsError> {
|
||||
let proc = input.as_mut().unwrap();
|
||||
let mut nwrite = 0usize;
|
||||
let stdout_fd = proc.stdout_fd;
|
||||
@@ -828,10 +848,13 @@ impl PostgresRedoManager {
|
||||
Err(nix::errno::Errno::EINTR) => continue,
|
||||
res => break res,
|
||||
}
|
||||
}?;
|
||||
}
|
||||
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
|
||||
|
||||
if n == 0 {
|
||||
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
|
||||
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
|
||||
"WAL redo timed out"
|
||||
)));
|
||||
}
|
||||
|
||||
// If we have some messages in stderr, forward them to the log.
|
||||
@@ -840,7 +863,9 @@ impl PostgresRedoManager {
|
||||
let mut errbuf: [u8; 16384] = [0; 16384];
|
||||
let mut stderr_guard = self.stderr.lock().unwrap();
|
||||
let stderr = stderr_guard.as_mut().unwrap();
|
||||
let len = stderr.read(&mut errbuf)?;
|
||||
let len = stderr
|
||||
.read(&mut errbuf)
|
||||
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
|
||||
|
||||
// The message might not be split correctly into lines here. But this is
|
||||
// good enough, the important thing is to get the message to the log.
|
||||
@@ -855,22 +880,23 @@ impl PostgresRedoManager {
|
||||
continue;
|
||||
}
|
||||
} else if err_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stderr unexpectedly",
|
||||
));
|
||||
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
|
||||
"WAL redo process closed its stderr unexpectedly"
|
||||
)));
|
||||
}
|
||||
|
||||
// If 'stdin' is writeable, do write.
|
||||
let in_revents = pollfds[0].revents().unwrap();
|
||||
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
|
||||
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
|
||||
nwrite += proc
|
||||
.stdin
|
||||
.write(&writebuf[nwrite..])
|
||||
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
|
||||
} else if in_revents.contains(PollFlags::POLLHUP) {
|
||||
// We still have more data to write, but the process closed the pipe.
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdin unexpectedly",
|
||||
));
|
||||
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
|
||||
"WAL redo process closed its stdin unexpectedly"
|
||||
)));
|
||||
}
|
||||
}
|
||||
let request_no = proc.n_requests;
|
||||
@@ -901,10 +927,10 @@ impl PostgresRedoManager {
|
||||
//
|
||||
// Cross-read this with the comment in apply_batch_postgres if result.is_err().
|
||||
// That's where we kill the child process.
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdout unexpectedly",
|
||||
));
|
||||
return Err(ApplyWalRecordsError::WithRequestNo {
|
||||
request_no,
|
||||
error: anyhow::anyhow!("WAL redo process closed its stdout unexpectedly"),
|
||||
});
|
||||
}
|
||||
let n_processed_responses = output.n_processed_responses;
|
||||
while n_processed_responses + output.pending_responses.len() <= request_no {
|
||||
@@ -920,10 +946,14 @@ impl PostgresRedoManager {
|
||||
Err(nix::errno::Errno::EINTR) => continue,
|
||||
res => break res,
|
||||
}
|
||||
}?;
|
||||
}
|
||||
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
|
||||
|
||||
if n == 0 {
|
||||
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
|
||||
return Err(ApplyWalRecordsError::WithRequestNo {
|
||||
request_no,
|
||||
error: anyhow::anyhow!("WAL redo timed out"),
|
||||
});
|
||||
}
|
||||
|
||||
// If we have some messages in stderr, forward them to the log.
|
||||
@@ -932,7 +962,12 @@ impl PostgresRedoManager {
|
||||
let mut errbuf: [u8; 16384] = [0; 16384];
|
||||
let mut stderr_guard = self.stderr.lock().unwrap();
|
||||
let stderr = stderr_guard.as_mut().unwrap();
|
||||
let len = stderr.read(&mut errbuf)?;
|
||||
let len = stderr.read(&mut errbuf).map_err(|e| {
|
||||
ApplyWalRecordsError::WithRequestNo {
|
||||
request_no,
|
||||
error: anyhow::anyhow!(e),
|
||||
}
|
||||
})?;
|
||||
|
||||
// The message might not be split correctly into lines here. But this is
|
||||
// good enough, the important thing is to get the message to the log.
|
||||
@@ -947,21 +982,26 @@ impl PostgresRedoManager {
|
||||
continue;
|
||||
}
|
||||
} else if err_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stderr unexpectedly",
|
||||
));
|
||||
return Err(ApplyWalRecordsError::WithRequestNo {
|
||||
request_no,
|
||||
error: anyhow::anyhow!("WAL redo process closed its stderr unexpectedly"),
|
||||
});
|
||||
}
|
||||
|
||||
// If we have some data in stdout, read it to the result buffer.
|
||||
let out_revents = pollfds[2].revents().unwrap();
|
||||
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
||||
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
|
||||
nresult += output.stdout.read(&mut resultbuf[nresult..]).map_err(|e| {
|
||||
ApplyWalRecordsError::WithRequestNo {
|
||||
request_no,
|
||||
error: anyhow::anyhow!(e),
|
||||
}
|
||||
})?;
|
||||
} else if out_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdout unexpectedly",
|
||||
));
|
||||
return Err(ApplyWalRecordsError::WithRequestNo {
|
||||
request_no,
|
||||
error: anyhow::anyhow!("WAL redo process closed its stdout unexpectedly"),
|
||||
});
|
||||
}
|
||||
}
|
||||
output
|
||||
|
||||
@@ -102,9 +102,9 @@ fn json_array_to_pg_array(value: &Value) -> Result<Option<String>, serde_json::E
|
||||
// convert to text with escaping
|
||||
Value::Bool(_) => serde_json::to_string(value).map(Some),
|
||||
Value::Number(_) => serde_json::to_string(value).map(Some),
|
||||
Value::Object(_) => serde_json::to_string(value).map(Some),
|
||||
|
||||
// here string needs to be escaped, as it is part of the array
|
||||
Value::Object(_) => json_array_to_pg_array(&Value::String(serde_json::to_string(value)?)),
|
||||
Value::String(_) => serde_json::to_string(value).map(Some),
|
||||
|
||||
// recurse into array
|
||||
@@ -613,7 +613,7 @@ fn _pg_array_parse(
|
||||
}
|
||||
}
|
||||
}
|
||||
'}' => {
|
||||
'}' if !quote => {
|
||||
level -= 1;
|
||||
if level == 0 {
|
||||
push_checked(&mut entry, &mut entries, elem_type)?;
|
||||
@@ -697,6 +697,14 @@ mod tests {
|
||||
"{{true,false},{NULL,42},{\"foo\",\"bar\\\"-\\\\\"}}".to_owned()
|
||||
)]
|
||||
);
|
||||
// array of objects
|
||||
let json = r#"[{"foo": 1},{"bar": 2}]"#;
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]).unwrap();
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some(r#"{"{\"foo\":1}","{\"bar\":2}"}"#.to_owned())]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -824,4 +832,23 @@ mod tests {
|
||||
json!([[[1, 2, 3], [4, 5, 6]]])
|
||||
);
|
||||
}
|
||||
#[test]
|
||||
fn test_pg_array_parse_json() {
|
||||
fn pt(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::JSONB).unwrap()
|
||||
}
|
||||
assert_eq!(pt(r#"{"{}"}"#), json!([{}]));
|
||||
assert_eq!(
|
||||
pt(r#"{"{\"foo\": 1, \"bar\": 2}"}"#),
|
||||
json!([{"foo": 1, "bar": 2}])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{"{\"foo\": 1}", "{\"bar\": 2}"}"#),
|
||||
json!([{"foo": 1}, {"bar": 2}])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{"{\"foo\": 1}", "{\"bar\": 2}"}}"#),
|
||||
json!([[{"foo": 1}, {"bar": 2}]])
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -188,7 +188,7 @@ def test_sql_over_http(static_proxy: NeonProxy):
|
||||
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
|
||||
verify=str(static_proxy.test_output_dir / "proxy.crt"),
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.status_code == 200, response.text
|
||||
return response.json()
|
||||
|
||||
rows = q("select 42 as answer")["rows"]
|
||||
@@ -206,6 +206,12 @@ def test_sql_over_http(static_proxy: NeonProxy):
|
||||
rows = q("select $1::json->'a' as answer", [{"a": {"b": 42}}])["rows"]
|
||||
assert rows == [{"answer": {"b": 42}}]
|
||||
|
||||
rows = q("select $1::jsonb[] as answer", [[{}]])["rows"]
|
||||
assert rows == [{"answer": [{}]}]
|
||||
|
||||
rows = q("select $1::jsonb[] as answer", [[{"foo": 1}, {"bar": 2}]])["rows"]
|
||||
assert rows == [{"answer": [{"foo": 1}, {"bar": 2}]}]
|
||||
|
||||
rows = q("select * from pg_class limit 1")["rows"]
|
||||
assert len(rows) == 1
|
||||
|
||||
|
||||
Reference in New Issue
Block a user