mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 15:22:56 +00:00
refactor: load metadata using offical impl (#7302)
* refactor: load metadata using offical impl Signed-off-by: discord9 <discord9@163.com> * pcr Signed-off-by: discord9 <discord9@163.com> --------- Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
@@ -12,17 +12,22 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::result::Result as StdResult;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::FutureExt;
|
||||
use futures::future::BoxFuture;
|
||||
use object_store::ObjectStore;
|
||||
use parquet::file::FOOTER_SIZE;
|
||||
use parquet::arrow::async_reader::MetadataFetch;
|
||||
use parquet::errors::{ParquetError, Result as ParquetResult};
|
||||
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
|
||||
use snafu::ResultExt;
|
||||
use snafu::{IntoError as _, ResultExt};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
|
||||
/// The estimated size of the footer and metadata need to read from the end of parquet file.
|
||||
const DEFAULT_PREFETCH_SIZE: u64 = 64 * 1024;
|
||||
|
||||
/// Load the metadata of parquet file in an async way.
|
||||
pub(crate) struct MetadataLoader<'a> {
|
||||
// An object store that supports async read
|
||||
object_store: ObjectStore,
|
||||
@@ -46,111 +51,7 @@ impl<'a> MetadataLoader<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Async load the metadata of parquet file.
|
||||
///
|
||||
/// Read [DEFAULT_PREFETCH_SIZE] from the end of parquet file at first, if File Metadata is in the
|
||||
/// read range, decode it and return [ParquetMetaData], otherwise, read again to get the rest of the metadata.
|
||||
///
|
||||
/// Parquet File Format:
|
||||
/// ```text
|
||||
/// ┌───────────────────────────────────┐
|
||||
/// |4-byte magic number "PAR1" |
|
||||
/// |───────────────────────────────────|
|
||||
/// |Column 1 Chunk 1 + Column Metadata |
|
||||
/// |Column 2 Chunk 1 + Column Metadata |
|
||||
/// |... |
|
||||
/// |Column N Chunk M + Column Metadata |
|
||||
/// |───────────────────────────────────|
|
||||
/// |File Metadata |
|
||||
/// |───────────────────────────────────|
|
||||
/// |4-byte length of file metadata |
|
||||
/// |4-byte magic number "PAR1" |
|
||||
/// └───────────────────────────────────┘
|
||||
/// ```
|
||||
///
|
||||
/// Refer to https://github.com/apache/arrow-rs/blob/093a10e46203be1a0e94ae117854701bf58d4c79/parquet/src/arrow/async_reader/metadata.rs#L55-L106
|
||||
pub async fn load(&self) -> Result<ParquetMetaData> {
|
||||
let object_store = &self.object_store;
|
||||
let path = self.file_path;
|
||||
let file_size = self.get_file_size().await?;
|
||||
|
||||
if file_size < FOOTER_SIZE as u64 {
|
||||
return error::InvalidParquetSnafu {
|
||||
file: path,
|
||||
reason: "file size is smaller than footer size",
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
// Prefetch bytes for metadata from the end and process the footer
|
||||
let buffer_start = file_size.saturating_sub(DEFAULT_PREFETCH_SIZE);
|
||||
let buffer = object_store
|
||||
.read_with(path)
|
||||
.range(buffer_start..file_size)
|
||||
.await
|
||||
.context(error::OpenDalSnafu)?
|
||||
.to_vec();
|
||||
let buffer_len = buffer.len();
|
||||
|
||||
let mut footer = [0; 8];
|
||||
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
|
||||
|
||||
let footer_tail = ParquetMetaDataReader::decode_footer_tail(&footer).map_err(|e| {
|
||||
error::InvalidParquetSnafu {
|
||||
file: path,
|
||||
reason: format!("failed to decode footer, {e}"),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
let metadata_len = footer_tail.metadata_length() as u64;
|
||||
|
||||
if file_size - (FOOTER_SIZE as u64) < metadata_len {
|
||||
return error::InvalidParquetSnafu {
|
||||
file: path,
|
||||
reason: format!(
|
||||
"the sum of Metadata length {} and Footer size {} is larger than file size {}",
|
||||
metadata_len, FOOTER_SIZE, file_size
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
|
||||
// The whole metadata is in the first read
|
||||
let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
|
||||
let metadata = ParquetMetaDataReader::decode_metadata(
|
||||
&buffer[metadata_start..buffer_len - FOOTER_SIZE],
|
||||
)
|
||||
.map_err(|e| {
|
||||
error::InvalidParquetSnafu {
|
||||
file: path,
|
||||
reason: format!("failed to decode metadata, {e}"),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
Ok(metadata)
|
||||
} else {
|
||||
// The metadata is out of buffer, need to make a second read
|
||||
let metadata_start = file_size - metadata_len - FOOTER_SIZE as u64;
|
||||
let data = object_store
|
||||
.read_with(path)
|
||||
.range(metadata_start..(file_size - FOOTER_SIZE as u64))
|
||||
.await
|
||||
.context(error::OpenDalSnafu)?
|
||||
.to_vec();
|
||||
|
||||
let metadata = ParquetMetaDataReader::decode_metadata(&data).map_err(|e| {
|
||||
error::InvalidParquetSnafu {
|
||||
file: path,
|
||||
reason: format!("failed to decode metadata, {e}"),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
Ok(metadata)
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the size of parquet file.
|
||||
/// Get the size of parquet file. If file_size is 0, stat the object store to get the size.
|
||||
async fn get_file_size(&self) -> Result<u64> {
|
||||
let file_size = match self.file_size {
|
||||
0 => self
|
||||
@@ -163,4 +64,55 @@ impl<'a> MetadataLoader<'a> {
|
||||
};
|
||||
Ok(file_size)
|
||||
}
|
||||
|
||||
pub async fn load(&self) -> Result<ParquetMetaData> {
|
||||
let path = self.file_path;
|
||||
let file_size = self.get_file_size().await?;
|
||||
let reader =
|
||||
ParquetMetaDataReader::new().with_prefetch_hint(Some(DEFAULT_PREFETCH_SIZE as usize));
|
||||
|
||||
let fetch = ObjectStoreFetch {
|
||||
object_store: &self.object_store,
|
||||
file_path: self.file_path,
|
||||
};
|
||||
|
||||
reader
|
||||
.load_and_finish(fetch, file_size)
|
||||
.await
|
||||
.map_err(|e| match unbox_external_error(e) {
|
||||
Ok(os_err) => error::OpenDalSnafu {}.into_error(os_err),
|
||||
Err(parquet_err) => error::ReadParquetSnafu { path }.into_error(parquet_err),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Unpack ParquetError to get object_store::Error if possible.
|
||||
fn unbox_external_error(e: ParquetError) -> StdResult<object_store::Error, ParquetError> {
|
||||
match e {
|
||||
ParquetError::External(boxed_err) => match boxed_err.downcast::<object_store::Error>() {
|
||||
Ok(os_err) => Ok(*os_err),
|
||||
Err(parquet_error) => Err(ParquetError::External(parquet_error)),
|
||||
},
|
||||
other => Err(other),
|
||||
}
|
||||
}
|
||||
|
||||
struct ObjectStoreFetch<'a> {
|
||||
object_store: &'a ObjectStore,
|
||||
file_path: &'a str,
|
||||
}
|
||||
|
||||
impl MetadataFetch for ObjectStoreFetch<'_> {
|
||||
fn fetch(&mut self, range: std::ops::Range<u64>) -> BoxFuture<'_, ParquetResult<Bytes>> {
|
||||
async move {
|
||||
let data = self
|
||||
.object_store
|
||||
.read_with(self.file_path)
|
||||
.range(range)
|
||||
.await
|
||||
.map_err(|e| ParquetError::External(Box::new(e)))?;
|
||||
Ok(data.to_bytes())
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user