mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-03 19:42:55 +00:00
Merge branch 'main' into rocksdb_pageserver
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1586,6 +1586,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"crc32c",
|
||||
"hex",
|
||||
"log",
|
||||
"rand 0.8.3",
|
||||
]
|
||||
|
||||
@@ -2554,6 +2555,7 @@ dependencies = [
|
||||
"pageserver",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
"postgres_ffi",
|
||||
"regex",
|
||||
"slog",
|
||||
"slog-async",
|
||||
|
||||
@@ -17,7 +17,7 @@ use serde_derive::{Deserialize, Serialize};
|
||||
|
||||
use pageserver::zenith_repo_dir;
|
||||
use pageserver::ZTimelineId;
|
||||
use walkeeper::xlog_utils;
|
||||
use postgres_ffi::xlog_utils;
|
||||
|
||||
//
|
||||
// This data structure represents deserialized zenith config, which should be
|
||||
|
||||
@@ -33,6 +33,7 @@ use crate::page_cache::RelTag;
|
||||
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
|
||||
use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
use postgres_ffi::xlog_utils::*;
|
||||
|
||||
// From pg_tablespace_d.h
|
||||
//
|
||||
@@ -344,59 +345,6 @@ fn restore_wal(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// FIXME: copied from xlog_utils.rs
|
||||
pub const XLOG_FNAME_LEN: usize = 24;
|
||||
pub type XLogRecPtr = u64;
|
||||
pub type XLogSegNo = u64;
|
||||
pub type TimeLineID = u32;
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogSegmentOffset(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> u32 {
|
||||
return (xlogptr as u32) & (wal_segsz_bytes as u32 - 1);
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLByteToSeg(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> XLogSegNo {
|
||||
return xlogptr / wal_segsz_bytes as u64;
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
|
||||
return format!(
|
||||
"{:>08X}{:>08X}{:>08X}",
|
||||
tli,
|
||||
logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
|
||||
logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
|
||||
);
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
|
||||
return (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo;
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
|
||||
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
|
||||
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
|
||||
let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
|
||||
return (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli);
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn IsXLogFileName(fname: &str) -> bool {
|
||||
return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn IsPartialXLogFileName(fname: &str) -> bool {
|
||||
if let Some(basefname) = fname.strip_suffix(".partial") {
|
||||
IsXLogFileName(basefname)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct FilePathError {
|
||||
msg: String,
|
||||
|
||||
@@ -30,6 +30,7 @@ use tokio::time::{sleep, Duration};
|
||||
use tokio_postgres::replication::{PgTimestamp, ReplicationStream};
|
||||
use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow};
|
||||
use tokio_stream::StreamExt;
|
||||
use postgres_ffi::xlog_utils::*;
|
||||
|
||||
//
|
||||
// We keep one WAL Receiver active per timeline.
|
||||
@@ -373,62 +374,6 @@ pub async fn identify_system(client: &tokio_postgres::Client) -> Result<Identify
|
||||
}
|
||||
}
|
||||
|
||||
pub const XLOG_FNAME_LEN: usize = 24;
|
||||
pub const XLOG_BLCKSZ: usize = 8192;
|
||||
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
|
||||
pub const XLOG_PAGE_MAGIC: u16 = 0xD109;
|
||||
pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
|
||||
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = XLP_REM_LEN_OFFS + 4 + 4;
|
||||
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = XLOG_SIZE_OF_XLOG_SHORT_PHD + 8 + 4 + 4;
|
||||
pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;
|
||||
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = XLOG_RECORD_CRC_OFFS + 4;
|
||||
pub type XLogRecPtr = u64;
|
||||
pub type TimeLineID = u32;
|
||||
pub type TimestampTz = u64;
|
||||
pub type XLogSegNo = u64;
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogSegmentOffset(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> u32 {
|
||||
return (xlogptr as u32) & (wal_segsz_bytes as u32 - 1);
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
|
||||
return (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo;
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLByteToSeg(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> XLogSegNo {
|
||||
return xlogptr / wal_segsz_bytes as u64;
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogSegNoOffsetToRecPtr(
|
||||
segno: XLogSegNo,
|
||||
offset: u32,
|
||||
wal_segsz_bytes: usize,
|
||||
) -> XLogRecPtr {
|
||||
return segno * (wal_segsz_bytes as u64) + (offset as u64);
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
|
||||
return format!(
|
||||
"{:>08X}{:>08X}{:>08X}",
|
||||
tli,
|
||||
logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
|
||||
logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
|
||||
);
|
||||
}
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
|
||||
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
|
||||
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
|
||||
let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
|
||||
return (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli);
|
||||
}
|
||||
|
||||
fn write_wal_file(
|
||||
startpos: XLogRecPtr,
|
||||
timeline: ZTimelineId,
|
||||
|
||||
@@ -14,6 +14,7 @@ byteorder = "1.4.3"
|
||||
anyhow = "1.0"
|
||||
crc32c = "0.6.0"
|
||||
hex = "0.4.3"
|
||||
log = "0.4.14"
|
||||
|
||||
[build-dependencies]
|
||||
bindgen = "0.57"
|
||||
|
||||
3
postgres_ffi/README
Normal file
3
postgres_ffi/README
Normal file
@@ -0,0 +1,3 @@
|
||||
This module contains utility functions for interacting with PostgreSQL
|
||||
file formats.
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
#![allow(non_snake_case)]
|
||||
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
|
||||
|
||||
pub mod xlog_utils;
|
||||
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
|
||||
// sizeof(ControlFileData)
|
||||
|
||||
@@ -1,3 +1,12 @@
|
||||
//
|
||||
// This file contains common utilities for dealing with PostgreSQL WAL files and
|
||||
// LSNs.
|
||||
//
|
||||
// Many of these functions have been copied from PostgreSQL, and rewritten in
|
||||
// Rust. That's why they don't follow the usual Rust naming conventions, they
|
||||
// have been named the same as the corresponding PostgreSQL functions instead.
|
||||
//
|
||||
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use crc32c::*;
|
||||
use log::*;
|
||||
@@ -184,6 +193,9 @@ fn find_end_of_wal_segment(
|
||||
last_valid_rec_pos as u32
|
||||
}
|
||||
|
||||
///
|
||||
/// Scan a directory that contains PostgreSQL WAL files, for the end of WAL.
|
||||
///
|
||||
pub fn find_end_of_wal(
|
||||
data_dir: &Path,
|
||||
wal_seg_size: usize,
|
||||
@@ -30,3 +30,4 @@ crc32c = "0.6.0"
|
||||
|
||||
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
|
||||
pageserver = { path = "../pageserver" }
|
||||
postgres_ffi = { path = "../postgres_ffi" }
|
||||
|
||||
@@ -4,7 +4,6 @@ use std::path::PathBuf;
|
||||
|
||||
mod pq_protocol;
|
||||
pub mod wal_service;
|
||||
pub mod xlog_utils;
|
||||
|
||||
use crate::pq_protocol::SystemId;
|
||||
|
||||
|
||||
@@ -29,9 +29,9 @@ use tokio::task;
|
||||
use tokio_postgres::{connect, Error, NoTls};
|
||||
|
||||
use crate::pq_protocol::*;
|
||||
use crate::xlog_utils::*;
|
||||
use crate::WalAcceptorConf;
|
||||
use pageserver::ZTimelineId;
|
||||
use postgres_ffi::xlog_utils::*;
|
||||
|
||||
type FullTransactionId = u64;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user