mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-06 09:50:38 +00:00
Compare commits
4 Commits
rustls
...
embedded_w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c2431ca28b | ||
|
|
aac2d1e04c | ||
|
|
033ea537e2 | ||
|
|
a2841c750d |
@@ -1,12 +1,11 @@
|
|||||||
|
use crate::ZTimelineId;
|
||||||
use log::*;
|
use log::*;
|
||||||
|
use postgres_ffi::FilePathError;
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use std::fmt;
|
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use tar::Builder;
|
use tar::Builder;
|
||||||
use walkdir::WalkDir;
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
use crate::ZTimelineId;
|
|
||||||
|
|
||||||
pub fn send_snapshot_tarball(
|
pub fn send_snapshot_tarball(
|
||||||
write: &mut dyn Write,
|
write: &mut dyn Write,
|
||||||
timelineid: ZTimelineId,
|
timelineid: ZTimelineId,
|
||||||
@@ -85,45 +84,7 @@ pub fn send_snapshot_tarball(
|
|||||||
// <oid>.<segment number>
|
// <oid>.<segment number>
|
||||||
// <oid>_<fork name>.<segment number>
|
// <oid>_<fork name>.<segment number>
|
||||||
|
|
||||||
#[derive(Debug)]
|
fn parse_filename(fname: &str) -> Result<(u32, u8, u32), FilePathError> {
|
||||||
struct FilePathError {
|
|
||||||
msg: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FilePathError {
|
|
||||||
fn new(msg: &str) -> FilePathError {
|
|
||||||
FilePathError {
|
|
||||||
msg: msg.to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<core::num::ParseIntError> for FilePathError {
|
|
||||||
fn from(e: core::num::ParseIntError) -> Self {
|
|
||||||
return FilePathError {
|
|
||||||
msg: format!("invalid filename: {}", e),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for FilePathError {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(f, "invalid filename")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn forkname_to_forknum(forkname: Option<&str>) -> Result<u32, FilePathError> {
|
|
||||||
match forkname {
|
|
||||||
// "main" is not in filenames, it's implicit if the fork name is not present
|
|
||||||
None => Ok(0),
|
|
||||||
Some("fsm") => Ok(1),
|
|
||||||
Some("vm") => Ok(2),
|
|
||||||
Some("init") => Ok(3),
|
|
||||||
Some(_) => Err(FilePathError::new("invalid forkname")),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_filename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
|
|
||||||
let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?$").unwrap();
|
let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?$").unwrap();
|
||||||
|
|
||||||
let caps = re
|
let caps = re
|
||||||
@@ -134,7 +95,7 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
|
|||||||
let relnode = u32::from_str_radix(relnode_str, 10)?;
|
let relnode = u32::from_str_radix(relnode_str, 10)?;
|
||||||
|
|
||||||
let forkname = caps.name("forkname").map(|f| f.as_str());
|
let forkname = caps.name("forkname").map(|f| f.as_str());
|
||||||
let forknum = forkname_to_forknum(forkname)?;
|
let forknum = postgres_ffi::forkname_to_forknum(forkname)?;
|
||||||
|
|
||||||
let segno_match = caps.name("segno");
|
let segno_match = caps.name("segno");
|
||||||
let segno = if segno_match.is_none() {
|
let segno = if segno_match.is_none() {
|
||||||
|
|||||||
@@ -202,6 +202,7 @@ pub struct CacheEntryContent {
|
|||||||
|
|
||||||
const PAGE_IMAGE_FLAG: u8 = 1u8;
|
const PAGE_IMAGE_FLAG: u8 = 1u8;
|
||||||
const UNUSED_VERSION_FLAG: u8 = 2u8;
|
const UNUSED_VERSION_FLAG: u8 = 2u8;
|
||||||
|
const TRUNCATED_FLAG: u8 = 4u8;
|
||||||
|
|
||||||
impl CacheEntryContent {
|
impl CacheEntryContent {
|
||||||
pub fn pack(&self, buf: &mut BytesMut) {
|
pub fn pack(&self, buf: &mut BytesMut) {
|
||||||
@@ -210,7 +211,11 @@ impl CacheEntryContent {
|
|||||||
buf.put_u16(image.len() as u16);
|
buf.put_u16(image.len() as u16);
|
||||||
buf.put_slice(&image[..]);
|
buf.put_slice(&image[..]);
|
||||||
} else if let Some(rec) = &self.wal_record {
|
} else if let Some(rec) = &self.wal_record {
|
||||||
buf.put_u8(0);
|
if rec.truncate {
|
||||||
|
buf.put_u8(TRUNCATED_FLAG);
|
||||||
|
} else {
|
||||||
|
buf.put_u8(0);
|
||||||
|
}
|
||||||
rec.pack(buf);
|
rec.pack(buf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -790,7 +795,7 @@ impl PageCache {
|
|||||||
minkey.lsn = Lsn(0); // first version
|
minkey.lsn = Lsn(0); // first version
|
||||||
|
|
||||||
// reconstruct most recent page version
|
// reconstruct most recent page version
|
||||||
if (v[0] & PAGE_IMAGE_FLAG) == 0 {
|
if (v[0] & (TRUNCATED_FLAG | PAGE_IMAGE_FLAG)) == 0 {
|
||||||
trace!("Reconstruct most recent page {:?}", key);
|
trace!("Reconstruct most recent page {:?}", key);
|
||||||
// force reconstruction of most recent page version
|
// force reconstruction of most recent page version
|
||||||
let (base_img, records) =
|
let (base_img, records) =
|
||||||
@@ -818,7 +823,7 @@ impl PageCache {
|
|||||||
let key = CacheKey::unpack(&mut buf);
|
let key = CacheKey::unpack(&mut buf);
|
||||||
if key.tag == maxkey.tag {
|
if key.tag == maxkey.tag {
|
||||||
let v = iter.value().unwrap();
|
let v = iter.value().unwrap();
|
||||||
if (v[0] & PAGE_IMAGE_FLAG) == 0 {
|
if (v[0] & (TRUNCATED_FLAG | PAGE_IMAGE_FLAG)) == 0 {
|
||||||
trace!("Reconstruct horizon page {:?}", key);
|
trace!("Reconstruct horizon page {:?}", key);
|
||||||
let (base_img, records) =
|
let (base_img, records) =
|
||||||
self.collect_records_for_apply(key.tag, key.lsn);
|
self.collect_records_for_apply(key.tag, key.lsn);
|
||||||
|
|||||||
@@ -12,10 +12,8 @@
|
|||||||
|
|
||||||
use log::*;
|
use log::*;
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use std::fmt;
|
|
||||||
|
|
||||||
use std::cmp::max;
|
use std::cmp::max;
|
||||||
use std::error::Error;
|
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
@@ -35,6 +33,7 @@ use crate::PageServerConf;
|
|||||||
use crate::ZTimelineId;
|
use crate::ZTimelineId;
|
||||||
use postgres_ffi::pg_constants;
|
use postgres_ffi::pg_constants;
|
||||||
use postgres_ffi::xlog_utils::*;
|
use postgres_ffi::xlog_utils::*;
|
||||||
|
use postgres_ffi::FilePathError;
|
||||||
use zenith_utils::lsn::Lsn;
|
use zenith_utils::lsn::Lsn;
|
||||||
|
|
||||||
// From pg_tablespace_d.h
|
// From pg_tablespace_d.h
|
||||||
@@ -210,8 +209,7 @@ fn restore_relfile(
|
|||||||
let mut file = File::open(path)?;
|
let mut file = File::open(path)?;
|
||||||
let mut buf: [u8; 8192] = [0u8; 8192];
|
let mut buf: [u8; 8192] = [0u8; 8192];
|
||||||
|
|
||||||
// FIXME: use constants (BLCKSZ)
|
let mut blknum: u32 = segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
|
||||||
let mut blknum: u32 = segno * (1024 * 1024 * 1024 / 8192);
|
|
||||||
loop {
|
loop {
|
||||||
let r = file.read_exact(&mut buf);
|
let r = file.read_exact(&mut buf);
|
||||||
match r {
|
match r {
|
||||||
@@ -221,7 +219,7 @@ fn restore_relfile(
|
|||||||
spcnode: spcoid,
|
spcnode: spcoid,
|
||||||
dbnode: dboid,
|
dbnode: dboid,
|
||||||
relnode,
|
relnode,
|
||||||
forknum: forknum as u8,
|
forknum,
|
||||||
},
|
},
|
||||||
blknum,
|
blknum,
|
||||||
};
|
};
|
||||||
@@ -257,7 +255,7 @@ fn restore_nonrelfile(
|
|||||||
pcache: &PageCache,
|
pcache: &PageCache,
|
||||||
_timeline: ZTimelineId,
|
_timeline: ZTimelineId,
|
||||||
snapshot: &str,
|
snapshot: &str,
|
||||||
forknum: u32,
|
forknum: u8,
|
||||||
path: &Path,
|
path: &Path,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let lsn = Lsn::from_hex(snapshot)?;
|
let lsn = Lsn::from_hex(snapshot)?;
|
||||||
@@ -268,7 +266,6 @@ fn restore_nonrelfile(
|
|||||||
let mut buf: [u8; 8192] = [0u8; 8192];
|
let mut buf: [u8; 8192] = [0u8; 8192];
|
||||||
let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?;
|
let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?;
|
||||||
|
|
||||||
// FIXME: use constants (BLCKSZ)
|
|
||||||
let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
|
let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||||
loop {
|
loop {
|
||||||
let r = file.read_exact(&mut buf);
|
let r = file.read_exact(&mut buf);
|
||||||
@@ -279,7 +276,7 @@ fn restore_nonrelfile(
|
|||||||
spcnode: 0,
|
spcnode: 0,
|
||||||
dbnode: 0,
|
dbnode: 0,
|
||||||
relnode: 0,
|
relnode: 0,
|
||||||
forknum: forknum as u8,
|
forknum,
|
||||||
},
|
},
|
||||||
blknum,
|
blknum,
|
||||||
};
|
};
|
||||||
@@ -411,55 +408,12 @@ fn restore_wal(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
struct FilePathError {
|
|
||||||
msg: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Error for FilePathError {
|
|
||||||
fn description(&self) -> &str {
|
|
||||||
&self.msg
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl FilePathError {
|
|
||||||
fn new(msg: &str) -> FilePathError {
|
|
||||||
FilePathError {
|
|
||||||
msg: msg.to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<core::num::ParseIntError> for FilePathError {
|
|
||||||
fn from(e: core::num::ParseIntError) -> Self {
|
|
||||||
return FilePathError {
|
|
||||||
msg: format!("invalid filename: {}", e),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for FilePathError {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(f, "invalid filename")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn forkname_to_forknum(forkname: Option<&str>) -> Result<u32, FilePathError> {
|
|
||||||
match forkname {
|
|
||||||
// "main" is not in filenames, it's implicit if the fork name is not present
|
|
||||||
None => Ok(0),
|
|
||||||
Some("fsm") => Ok(1),
|
|
||||||
Some("vm") => Ok(2),
|
|
||||||
Some("init") => Ok(3),
|
|
||||||
Some(_) => Err(FilePathError::new("invalid forkname")),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct ParsedBaseImageFileName {
|
struct ParsedBaseImageFileName {
|
||||||
pub spcnode: u32,
|
pub spcnode: u32,
|
||||||
pub dbnode: u32,
|
pub dbnode: u32,
|
||||||
pub relnode: u32,
|
pub relnode: u32,
|
||||||
pub forknum: u32,
|
pub forknum: u8,
|
||||||
pub segno: u32,
|
pub segno: u32,
|
||||||
|
|
||||||
pub lsn: u64,
|
pub lsn: u64,
|
||||||
@@ -471,7 +425,7 @@ struct ParsedBaseImageFileName {
|
|||||||
// <oid>.<segment number>
|
// <oid>.<segment number>
|
||||||
// <oid>_<fork name>.<segment number>
|
// <oid>_<fork name>.<segment number>
|
||||||
|
|
||||||
fn parse_relfilename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
|
fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> {
|
||||||
let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?$").unwrap();
|
let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?$").unwrap();
|
||||||
|
|
||||||
let caps = re
|
let caps = re
|
||||||
@@ -482,7 +436,7 @@ fn parse_relfilename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
|
|||||||
let relnode = u32::from_str_radix(relnode_str, 10)?;
|
let relnode = u32::from_str_radix(relnode_str, 10)?;
|
||||||
|
|
||||||
let forkname = caps.name("forkname").map(|f| f.as_str());
|
let forkname = caps.name("forkname").map(|f| f.as_str());
|
||||||
let forknum = forkname_to_forknum(forkname)?;
|
let forknum = postgres_ffi::forkname_to_forknum(forkname)?;
|
||||||
|
|
||||||
let segno_match = caps.name("segno");
|
let segno_match = caps.name("segno");
|
||||||
let segno = if segno_match.is_none() {
|
let segno = if segno_match.is_none() {
|
||||||
|
|||||||
@@ -133,50 +133,12 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
|
|||||||
const DEFAULTTABLESPACE_OID: u32 = 1663;
|
const DEFAULTTABLESPACE_OID: u32 = 1663;
|
||||||
const GLOBALTABLESPACE_OID: u32 = 1664;
|
const GLOBALTABLESPACE_OID: u32 = 1664;
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct FilePathError {
|
|
||||||
msg: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FilePathError {
|
|
||||||
fn new(msg: &str) -> FilePathError {
|
|
||||||
FilePathError {
|
|
||||||
msg: msg.to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<core::num::ParseIntError> for FilePathError {
|
|
||||||
fn from(e: core::num::ParseIntError) -> Self {
|
|
||||||
return FilePathError {
|
|
||||||
msg: format!("invalid filename: {}", e),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for FilePathError {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(f, "invalid filename")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn forkname_to_forknum(forkname: Option<&str>) -> Result<u32, FilePathError> {
|
|
||||||
match forkname {
|
|
||||||
// "main" is not in filenames, it's implicit if the fork name is not present
|
|
||||||
None => Ok(0),
|
|
||||||
Some("fsm") => Ok(1),
|
|
||||||
Some("vm") => Ok(2),
|
|
||||||
Some("init") => Ok(3),
|
|
||||||
Some(_) => Err(FilePathError::new("invalid forkname")),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct ParsedBaseImageFileName {
|
struct ParsedBaseImageFileName {
|
||||||
pub spcnode: u32,
|
pub spcnode: u32,
|
||||||
pub dbnode: u32,
|
pub dbnode: u32,
|
||||||
pub relnode: u32,
|
pub relnode: u32,
|
||||||
pub forknum: u32,
|
pub forknum: u8,
|
||||||
pub segno: u32,
|
pub segno: u32,
|
||||||
|
|
||||||
pub lsn: u64,
|
pub lsn: u64,
|
||||||
@@ -188,7 +150,7 @@ struct ParsedBaseImageFileName {
|
|||||||
// <oid>.<segment number>
|
// <oid>.<segment number>
|
||||||
// <oid>_<fork name>.<segment number>
|
// <oid>_<fork name>.<segment number>
|
||||||
|
|
||||||
fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
|
fn parse_filename(fname: &str) -> Result<(u32, u8, u32, u64), FilePathError> {
|
||||||
let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?_(?P<lsnhi>[[:xdigit:]]{8})(?P<lsnlo>[[:xdigit:]]{8})$").unwrap();
|
let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?_(?P<lsnhi>[[:xdigit:]]{8})(?P<lsnlo>[[:xdigit:]]{8})$").unwrap();
|
||||||
|
|
||||||
let caps = re
|
let caps = re
|
||||||
@@ -294,8 +256,7 @@ async fn slurp_base_file(
|
|||||||
|
|
||||||
let mut bytes = BytesMut::from(data.as_slice()).freeze();
|
let mut bytes = BytesMut::from(data.as_slice()).freeze();
|
||||||
|
|
||||||
// FIXME: use constants (BLCKSZ)
|
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
|
||||||
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
|
|
||||||
|
|
||||||
let pcache = page_cache::get_pagecache(conf, sys_id);
|
let pcache = page_cache::get_pagecache(conf, sys_id);
|
||||||
|
|
||||||
@@ -305,7 +266,7 @@ async fn slurp_base_file(
|
|||||||
spcnode: parsed.spcnode,
|
spcnode: parsed.spcnode,
|
||||||
dbnode: parsed.dbnode,
|
dbnode: parsed.dbnode,
|
||||||
relnode: parsed.relnode,
|
relnode: parsed.relnode,
|
||||||
forknum: parsed.forknum as u8,
|
forknum: parsed.forknum,
|
||||||
},
|
},
|
||||||
blknum,
|
blknum,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -307,10 +307,9 @@ pub struct DecodedWALRecord {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub type Oid = u32;
|
pub type Oid = u32;
|
||||||
|
pub type TransactionId = u32;
|
||||||
pub type BlockNumber = u32;
|
pub type BlockNumber = u32;
|
||||||
|
pub type OffsetNumber = u16;
|
||||||
pub const MAIN_FORKNUM: u8 = 0;
|
|
||||||
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
|
|
||||||
|
|
||||||
#[repr(C)]
|
#[repr(C)]
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
@@ -366,6 +365,82 @@ impl XlCreateDatabase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct XlHeapInsert {
|
||||||
|
pub offnum: OffsetNumber,
|
||||||
|
pub flags: u8,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl XlHeapInsert {
|
||||||
|
pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
|
||||||
|
XlHeapInsert {
|
||||||
|
offnum: buf.get_u16_le(),
|
||||||
|
flags: buf.get_u8(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct XlHeapMultiInsert {
|
||||||
|
pub flags: u8,
|
||||||
|
pub ntuples: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl XlHeapMultiInsert {
|
||||||
|
pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
|
||||||
|
XlHeapMultiInsert {
|
||||||
|
flags: buf.get_u8(),
|
||||||
|
ntuples: buf.get_u16_le(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct XlHeapDelete {
|
||||||
|
pub xmax: TransactionId,
|
||||||
|
pub offnum: OffsetNumber,
|
||||||
|
pub infobits_set: u8,
|
||||||
|
pub flags: u8,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl XlHeapDelete {
|
||||||
|
pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
|
||||||
|
XlHeapDelete {
|
||||||
|
xmax: buf.get_u32_le(),
|
||||||
|
offnum: buf.get_u16_le(),
|
||||||
|
infobits_set: buf.get_u8(),
|
||||||
|
flags: buf.get_u8(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct XlHeapUpdate {
|
||||||
|
pub old_xmax: TransactionId,
|
||||||
|
pub old_offnum: OffsetNumber,
|
||||||
|
pub old_infobits_set: u8,
|
||||||
|
pub flags: u8,
|
||||||
|
pub new_xmax: TransactionId,
|
||||||
|
pub new_offnum: OffsetNumber,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl XlHeapUpdate {
|
||||||
|
pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
|
||||||
|
XlHeapUpdate {
|
||||||
|
old_xmax: buf.get_u32_le(),
|
||||||
|
old_offnum: buf.get_u16_le(),
|
||||||
|
old_infobits_set: buf.get_u8(),
|
||||||
|
flags: buf.get_u8(),
|
||||||
|
new_xmax: buf.get_u32_le(),
|
||||||
|
new_offnum: buf.get_u16_le(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// Routines to decode a WAL record and figure out which blocks are modified
|
// Routines to decode a WAL record and figure out which blocks are modified
|
||||||
//
|
//
|
||||||
@@ -617,7 +692,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
|||||||
//5. Handle special CLOG and XACT records
|
//5. Handle special CLOG and XACT records
|
||||||
if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
|
if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
|
||||||
let mut blk = DecodedBkpBlock::new();
|
let mut blk = DecodedBkpBlock::new();
|
||||||
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
|
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||||
blk.blkno = buf.get_i32_le() as u32;
|
blk.blkno = buf.get_i32_le() as u32;
|
||||||
blk.will_init = true;
|
blk.will_init = true;
|
||||||
trace!("RM_CLOG_ID updates block {}", blk.blkno);
|
trace!("RM_CLOG_ID updates block {}", blk.blkno);
|
||||||
@@ -626,7 +701,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
|||||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||||
let mut blk = DecodedBkpBlock::new();
|
let mut blk = DecodedBkpBlock::new();
|
||||||
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
|
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||||
trace!(
|
trace!(
|
||||||
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||||
@@ -659,7 +734,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
|||||||
if prev_blkno != blkno {
|
if prev_blkno != blkno {
|
||||||
prev_blkno = blkno;
|
prev_blkno = blkno;
|
||||||
let mut blk = DecodedBkpBlock::new();
|
let mut blk = DecodedBkpBlock::new();
|
||||||
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
|
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||||
blk.blkno = blkno;
|
blk.blkno = blkno;
|
||||||
blocks.push(blk);
|
blocks.push(blk);
|
||||||
}
|
}
|
||||||
@@ -694,7 +769,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
|||||||
}
|
}
|
||||||
} else if info == pg_constants::XLOG_XACT_ABORT {
|
} else if info == pg_constants::XLOG_XACT_ABORT {
|
||||||
let mut blk = DecodedBkpBlock::new();
|
let mut blk = DecodedBkpBlock::new();
|
||||||
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
|
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||||
trace!(
|
trace!(
|
||||||
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||||
@@ -726,7 +801,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
|||||||
if prev_blkno != blkno {
|
if prev_blkno != blkno {
|
||||||
prev_blkno = blkno;
|
prev_blkno = blkno;
|
||||||
let mut blk = DecodedBkpBlock::new();
|
let mut blk = DecodedBkpBlock::new();
|
||||||
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
|
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||||
blk.blkno = blkno;
|
blk.blkno = blkno;
|
||||||
blocks.push(blk);
|
blocks.push(blk);
|
||||||
}
|
}
|
||||||
@@ -782,6 +857,79 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
|||||||
} else {
|
} else {
|
||||||
trace!("XLOG_TBLSPC_DROP is not handled yet");
|
trace!("XLOG_TBLSPC_DROP is not handled yet");
|
||||||
}
|
}
|
||||||
|
} else if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||||
|
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||||
|
let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
|
||||||
|
if info == pg_constants::XLOG_HEAP_INSERT {
|
||||||
|
let xlrec = XlHeapInsert::decode(&mut buf);
|
||||||
|
if (xlrec.flags
|
||||||
|
& (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED
|
||||||
|
| pg_constants::XLH_INSERT_ALL_FROZEN_SET))
|
||||||
|
!= 0
|
||||||
|
{
|
||||||
|
let mut blk = DecodedBkpBlock::new();
|
||||||
|
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
|
||||||
|
blk.blkno = blkno;
|
||||||
|
blk.rnode_spcnode = blocks[0].rnode_spcnode;
|
||||||
|
blk.rnode_dbnode = blocks[0].rnode_dbnode;
|
||||||
|
blk.rnode_relnode = blocks[0].rnode_relnode;
|
||||||
|
blocks.push(blk);
|
||||||
|
}
|
||||||
|
} else if info == pg_constants::XLOG_HEAP_DELETE {
|
||||||
|
let xlrec = XlHeapDelete::decode(&mut buf);
|
||||||
|
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
|
||||||
|
let mut blk = DecodedBkpBlock::new();
|
||||||
|
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
|
||||||
|
blk.blkno = blkno;
|
||||||
|
blk.rnode_spcnode = blocks[0].rnode_spcnode;
|
||||||
|
blk.rnode_dbnode = blocks[0].rnode_dbnode;
|
||||||
|
blk.rnode_relnode = blocks[0].rnode_relnode;
|
||||||
|
blocks.push(blk);
|
||||||
|
}
|
||||||
|
} else if info == pg_constants::XLOG_HEAP_UPDATE
|
||||||
|
|| info == pg_constants::XLOG_HEAP_HOT_UPDATE
|
||||||
|
{
|
||||||
|
let xlrec = XlHeapUpdate::decode(&mut buf);
|
||||||
|
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
|
||||||
|
let mut blk = DecodedBkpBlock::new();
|
||||||
|
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
|
||||||
|
blk.blkno = blkno;
|
||||||
|
blk.rnode_spcnode = blocks[0].rnode_spcnode;
|
||||||
|
blk.rnode_dbnode = blocks[0].rnode_dbnode;
|
||||||
|
blk.rnode_relnode = blocks[0].rnode_relnode;
|
||||||
|
blocks.push(blk);
|
||||||
|
}
|
||||||
|
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0
|
||||||
|
&& blocks.len() > 1
|
||||||
|
{
|
||||||
|
let mut blk = DecodedBkpBlock::new();
|
||||||
|
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
|
||||||
|
blk.blkno = blocks[1].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
|
||||||
|
blk.rnode_spcnode = blocks[1].rnode_spcnode;
|
||||||
|
blk.rnode_dbnode = blocks[1].rnode_dbnode;
|
||||||
|
blk.rnode_relnode = blocks[1].rnode_relnode;
|
||||||
|
blocks.push(blk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if xlogrec.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||||
|
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||||
|
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
|
||||||
|
let xlrec = XlHeapMultiInsert::decode(&mut buf);
|
||||||
|
if (xlrec.flags
|
||||||
|
& (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED
|
||||||
|
| pg_constants::XLH_INSERT_ALL_FROZEN_SET))
|
||||||
|
!= 0
|
||||||
|
{
|
||||||
|
let mut blk = DecodedBkpBlock::new();
|
||||||
|
let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
|
||||||
|
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
|
||||||
|
blk.blkno = blkno;
|
||||||
|
blk.rnode_spcnode = blocks[0].rnode_spcnode;
|
||||||
|
blk.rnode_dbnode = blocks[0].rnode_dbnode;
|
||||||
|
blk.rnode_relnode = blocks[0].rnode_relnode;
|
||||||
|
blocks.push(blk);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
DecodedWALRecord {
|
DecodedWALRecord {
|
||||||
|
|||||||
@@ -246,13 +246,13 @@ fn walreceiver_main(
|
|||||||
== pg_constants::XLOG_SMGR_TRUNCATE
|
== pg_constants::XLOG_SMGR_TRUNCATE
|
||||||
{
|
{
|
||||||
let truncate = XlSmgrTruncate::decode(&decoded);
|
let truncate = XlSmgrTruncate::decode(&decoded);
|
||||||
if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 {
|
if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
|
||||||
let tag = BufferTag {
|
let tag = BufferTag {
|
||||||
rel: RelTag {
|
rel: RelTag {
|
||||||
spcnode: truncate.rnode.spcnode,
|
spcnode: truncate.rnode.spcnode,
|
||||||
dbnode: truncate.rnode.dbnode,
|
dbnode: truncate.rnode.dbnode,
|
||||||
relnode: truncate.rnode.relnode,
|
relnode: truncate.rnode.relnode,
|
||||||
forknum: MAIN_FORKNUM,
|
forknum: pg_constants::MAIN_FORKNUM,
|
||||||
},
|
},
|
||||||
blknum: truncate.blkno,
|
blknum: truncate.blkno,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -226,7 +226,7 @@ impl WalRedoManagerInternal {
|
|||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
let apply_result: Result<Bytes, Error>;
|
let apply_result: Result<Bytes, Error>;
|
||||||
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
|
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM {
|
||||||
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
|
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
|
||||||
let mut page = BytesMut::new();
|
let mut page = BytesMut::new();
|
||||||
if let Some(fpi) = base_img {
|
if let Some(fpi) = base_img {
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ pub mod pg_constants;
|
|||||||
pub mod xlog_utils;
|
pub mod xlog_utils;
|
||||||
|
|
||||||
use bytes::{Buf, Bytes, BytesMut};
|
use bytes::{Buf, Bytes, BytesMut};
|
||||||
|
use std::error::Error;
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
// sizeof(ControlFileData)
|
// sizeof(ControlFileData)
|
||||||
const SIZEOF_CONTROLDATA: usize = std::mem::size_of::<ControlFileData>();
|
const SIZEOF_CONTROLDATA: usize = std::mem::size_of::<ControlFileData>();
|
||||||
@@ -49,6 +51,50 @@ pub fn decode_pg_control(mut buf: Bytes) -> Result<ControlFileData, anyhow::Erro
|
|||||||
Ok(controlfile)
|
Ok(controlfile)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct FilePathError {
|
||||||
|
msg: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Error for FilePathError {
|
||||||
|
fn description(&self) -> &str {
|
||||||
|
&self.msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FilePathError {
|
||||||
|
pub fn new(msg: &str) -> FilePathError {
|
||||||
|
FilePathError {
|
||||||
|
msg: msg.to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<core::num::ParseIntError> for FilePathError {
|
||||||
|
fn from(e: core::num::ParseIntError) -> Self {
|
||||||
|
return FilePathError {
|
||||||
|
msg: format!("invalid filename: {}", e),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for FilePathError {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f, "invalid filename")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn forkname_to_forknum(forkname: Option<&str>) -> Result<u8, FilePathError> {
|
||||||
|
match forkname {
|
||||||
|
// "main" is not in filenames, it's implicit if the fork name is not present
|
||||||
|
None => Ok(pg_constants::MAIN_FORKNUM),
|
||||||
|
Some("fsm") => Ok(pg_constants::FSM_FORKNUM),
|
||||||
|
Some("vm") => Ok(pg_constants::VISIBILITYMAP_FORKNUM),
|
||||||
|
Some("init") => Ok(pg_constants::INIT_FORKNUM),
|
||||||
|
Some(_) => Err(FilePathError::new("invalid forkname")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes {
|
pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes {
|
||||||
let b: [u8; SIZEOF_CONTROLDATA];
|
let b: [u8; SIZEOF_CONTROLDATA];
|
||||||
|
|
||||||
|
|||||||
@@ -2,13 +2,17 @@
|
|||||||
//
|
//
|
||||||
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
|
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
|
||||||
pub const GLOBALTABLESPACE_OID: u32 = 1664;
|
pub const GLOBALTABLESPACE_OID: u32 = 1664;
|
||||||
//Special values for non-rel files' tags
|
|
||||||
//TODO maybe use enum?
|
//TODO maybe use enum?
|
||||||
pub const PG_CONTROLFILE_FORKNUM: u32 = 42;
|
pub const MAIN_FORKNUM: u8 = 0;
|
||||||
pub const PG_FILENODEMAP_FORKNUM: u32 = 43;
|
pub const FSM_FORKNUM: u8 = 1;
|
||||||
pub const PG_XACT_FORKNUM: u32 = 44;
|
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
|
||||||
pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45;
|
pub const INIT_FORKNUM: u8 = 3;
|
||||||
pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46;
|
//Special values for non-rel files' tags
|
||||||
|
pub const PG_CONTROLFILE_FORKNUM: u8 = 42;
|
||||||
|
pub const PG_FILENODEMAP_FORKNUM: u8 = 43;
|
||||||
|
pub const PG_XACT_FORKNUM: u8 = 44;
|
||||||
|
pub const PG_MXACT_OFFSETS_FORKNUM: u8 = 45;
|
||||||
|
pub const PG_MXACT_MEMBERS_FORKNUM: u8 = 46;
|
||||||
|
|
||||||
//
|
//
|
||||||
// constants from clog.h
|
// constants from clog.h
|
||||||
@@ -18,6 +22,13 @@ pub const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE;
|
|||||||
pub const CLOG_BITS_PER_XACT: u8 = 2;
|
pub const CLOG_BITS_PER_XACT: u8 = 2;
|
||||||
pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;
|
pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;
|
||||||
|
|
||||||
|
//
|
||||||
|
// Constants from visbilitymap.h
|
||||||
|
//
|
||||||
|
pub const SIZE_OF_PAGE_HEADER: u16 = 24;
|
||||||
|
pub const BITS_PER_HEAPBLOCK: u16 = 2;
|
||||||
|
pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK;
|
||||||
|
|
||||||
pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01;
|
pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01;
|
||||||
pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02;
|
pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02;
|
||||||
pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03;
|
pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03;
|
||||||
@@ -53,13 +64,32 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
|
|||||||
// From pg_control.h and rmgrlist.h
|
// From pg_control.h and rmgrlist.h
|
||||||
pub const XLOG_SWITCH: u8 = 0x40;
|
pub const XLOG_SWITCH: u8 = 0x40;
|
||||||
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
|
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
|
||||||
|
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
|
||||||
|
|
||||||
|
// From heapam_xlog.h
|
||||||
|
pub const XLOG_HEAP_INSERT: u8 = 0x00;
|
||||||
|
pub const XLOG_HEAP_DELETE: u8 = 0x10;
|
||||||
|
pub const XLOG_HEAP_UPDATE: u8 = 0x20;
|
||||||
|
pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40;
|
||||||
|
pub const XLOG_HEAP2_VISIBLE: u8 = 0x40;
|
||||||
|
pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50;
|
||||||
|
pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8;
|
||||||
|
pub const XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
|
||||||
|
pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
|
||||||
|
pub const XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED: u8 = (1 << 1) as u8;
|
||||||
|
pub const XLH_DELETE_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
|
||||||
|
|
||||||
pub const RM_XLOG_ID: u8 = 0;
|
pub const RM_XLOG_ID: u8 = 0;
|
||||||
pub const RM_XACT_ID: u8 = 1;
|
pub const RM_XACT_ID: u8 = 1;
|
||||||
pub const RM_SMGR_ID: u8 = 2;
|
pub const RM_SMGR_ID: u8 = 2;
|
||||||
pub const RM_CLOG_ID: u8 = 3;
|
pub const RM_CLOG_ID: u8 = 3;
|
||||||
pub const RM_DBASE_ID: u8 = 4;
|
pub const RM_DBASE_ID: u8 = 4;
|
||||||
pub const RM_TBLSPC_ID: u8 = 5;
|
pub const RM_TBLSPC_ID: u8 = 5;
|
||||||
// pub const RM_MULTIXACT_ID:u8 = 6;
|
pub const RM_MULTIXACT_ID: u8 = 6;
|
||||||
|
pub const RM_RELMAP_ID: u8 = 7;
|
||||||
|
pub const RM_STANDBY_ID: u8 = 8;
|
||||||
|
pub const RM_HEAP2_ID: u8 = 9;
|
||||||
|
pub const RM_HEAP_ID: u8 = 10;
|
||||||
|
|
||||||
// from xlogreader.h
|
// from xlogreader.h
|
||||||
pub const XLR_INFO_MASK: u8 = 0x0F;
|
pub const XLR_INFO_MASK: u8 = 0x0F;
|
||||||
|
|||||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: c2409039d6...87080ddc02
@@ -194,14 +194,14 @@ fn parse_hex_str(s: &str) -> Result<u64> {
|
|||||||
|
|
||||||
impl Serializer for NodeId {
|
impl Serializer for NodeId {
|
||||||
fn pack(&self, buf: &mut BytesMut) {
|
fn pack(&self, buf: &mut BytesMut) {
|
||||||
buf.put_u128_le(self.uuid);
|
buf.put_u64_le(self.term);
|
||||||
buf.put_u64(self.term); // use big endian to provide compatibility with memcmp
|
buf.put_u128(self.uuid); // use big endian to provide compatibility with memcmp
|
||||||
}
|
}
|
||||||
|
|
||||||
fn unpack(buf: &mut BytesMut) -> NodeId {
|
fn unpack(buf: &mut BytesMut) -> NodeId {
|
||||||
NodeId {
|
NodeId {
|
||||||
uuid: buf.get_u128_le(),
|
term: buf.get_u64_le(),
|
||||||
term: buf.get_u64(), // use big endian to provide compatibility with memcmp
|
uuid: buf.get_u128(), // use big endian to provide compatibility with memcmp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user