feat(inverted_index.create): add read/write for external intermediate files (#2942)

* feat(inverted_index.create): add read/write for external intermediate files

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: MAGIC_CODEC_V1 -> CODEC_V1_MAGIC

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: polish comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: fix typos intermedia -> intermediate

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: typos

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: futures_code -> asynchronous_codec

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: bump bytes to 1.5

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2023-12-18 17:44:48 +08:00
committed by GitHub
parent 9af9c0229a
commit 029ff2f1e3
20 changed files with 389 additions and 10 deletions

15
Cargo.lock generated
View File

@@ -633,6 +633,19 @@ dependencies = [
"syn 2.0.39",
]
[[package]]
name = "asynchronous-codec"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a860072022177f903e59730004fb5dc13db9275b79bb2aef7ba8ce831956c233"
dependencies = [
"bytes",
"futures-sink",
"futures-util",
"memchr",
"pin-project-lite",
]
[[package]]
name = "atoi"
version = "1.0.0"
@@ -3961,7 +3974,9 @@ name = "index"
version = "0.4.4"
dependencies = [
"async-trait",
"asynchronous-codec",
"bytemuck",
"bytes",
"common-base",
"common-error",
"common-macro",

View File

@@ -75,6 +75,7 @@ base64 = "0.21"
bigdecimal = "0.4.2"
bitflags = "2.4.1"
bytemuck = "1.12"
bytes = { version = "1.5", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }

View File

@@ -7,7 +7,7 @@ license.workspace = true
[dependencies]
anymap = "1.0.0-beta.2"
bitvec = "1.0"
bytes = { version = "1.1", features = ["serde"] }
bytes.workspace = true
common-error.workspace = true
common-macro.workspace = true
paste = "1.0"

View File

@@ -16,7 +16,7 @@ async-compression = { version = "0.3", features = [
"tokio",
] }
async-trait.workspace = true
bytes = "1.1"
bytes.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-runtime.workspace = true

View File

@@ -13,7 +13,7 @@ async-recursion = "1.0"
async-stream.workspace = true
async-trait.workspace = true
base64.workspace = true
bytes = "1.4"
bytes.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-grpc-expr.workspace = true

View File

@@ -7,7 +7,7 @@ license.workspace = true
[dependencies]
async-recursion = "1.0"
async-trait.workspace = true
bytes = "1.1"
bytes.workspace = true
catalog.workspace = true
common-catalog.workspace = true
common-error.workspace = true

View File

@@ -12,7 +12,7 @@ async-stream.workspace = true
async-trait.workspace = true
axum = "0.6"
axum-macros = "0.3"
bytes = "1.1"
bytes.workspace = true
catalog.workspace = true
client.workspace = true
common-base.workspace = true

View File

@@ -6,7 +6,9 @@ license.workspace = true
[dependencies]
async-trait.workspace = true
asynchronous-codec = "0.7.0"
bytemuck.workspace = true
bytes.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod create;
pub mod error;
pub mod format;
pub mod search;

View File

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod sort;

View File

@@ -0,0 +1,24 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::BitVec;
use futures::Stream;
use crate::inverted_index::error::Result;
use crate::inverted_index::Bytes;
mod intermediate_rw;
/// A stream of sorted values along with their associated bitmap.
///
/// Every item is a `Result` carrying a `(Bytes, BitVec)` pair. The stream is
/// boxed as a trait object and is required to be `Send + Unpin`.
pub type SortedStream = Box<dyn Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin>;

View File

@@ -0,0 +1,141 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod codec_v1;
use std::collections::BTreeMap;
use asynchronous_codec::{FramedRead, FramedWrite};
use common_base::BitVec;
use futures::{stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt};
use snafu::ResultExt;
use crate::inverted_index::create::sort::SortedStream;
use crate::inverted_index::error::{
CloseSnafu, FlushSnafu, ReadSnafu, Result, UnknownIntermediateCodecMagicSnafu, WriteSnafu,
};
use crate::inverted_index::Bytes;
/// `IntermediateWriter` serializes and writes intermediate data to the wrapped `writer`
pub struct IntermediateWriter<W> {
    writer: W,
}

impl<W: AsyncWrite + Unpin> IntermediateWriter<W> {
    /// Creates a new `IntermediateWriter` wrapping an `AsyncWrite`
    pub fn new(writer: W) -> IntermediateWriter<W> {
        IntermediateWriter { writer }
    }

    /// Serializes and writes all provided values to the wrapped writer.
    ///
    /// The codec magic is written first, followed by one encoded frame per map
    /// entry in the map's iteration order. The writer is consumed; on success it
    /// has been flushed and closed.
    ///
    /// # Errors
    ///
    /// Returns an error if writing the magic fails or if encoding/forwarding a
    /// frame fails. When forwarding fails, the writer is flushed and closed before
    /// returning; should that cleanup fail as well, the cleanup error is returned.
    pub async fn write_all(mut self, values: BTreeMap<Bytes, BitVec>) -> Result<()> {
        let (codec_magic, encoder) = (codec_v1::CODEC_V1_MAGIC, codec_v1::IntermediateCodecV1);
        self.writer
            .write_all(codec_magic)
            .await
            .context(WriteSnafu)?;

        let value_stream = stream::iter(values.into_iter().map(Ok));
        let frame_write = FramedWrite::new(&mut self.writer, encoder);

        // `forward()` flushes and closes the sink (and through it the wrapped
        // writer) once the stream is exhausted, so flushing/closing again on the
        // success path would operate on an already-closed writer.
        if let Err(e) = value_stream.forward(frame_write).await {
            // Forwarding stopped early and did not close the sink; release the
            // writer explicitly before surfacing the failure.
            self.writer.flush().await.context(FlushSnafu)?;
            self.writer.close().await.context(CloseSnafu)?;
            return Err(e);
        }

        Ok(())
    }
}
/// Reads intermediate serialized data from an `AsyncRead` source and converts it to a [`SortedStream`]
pub struct IntermediateReader<R> {
    reader: R,
}

impl<R: AsyncRead + Unpin + Send + 'static> IntermediateReader<R> {
    /// Creates a new `IntermediateReader` wrapping an `AsyncRead`
    pub fn new(reader: R) -> IntermediateReader<R> {
        Self { reader }
    }

    /// Reads the magic header, determines the codec, and returns a stream of deserialized values.
    ///
    /// # Errors
    ///
    /// Fails if the 4-byte magic cannot be read in full, or if the magic does not
    /// match any known codec.
    pub async fn into_stream(mut self) -> Result<SortedStream> {
        let mut magic = [0u8; 4];
        self.reader
            .read_exact(&mut magic)
            .await
            .context(ReadSnafu)?;

        // Only the V1 codec exists so far; anything else is rejected up front.
        if &magic != codec_v1::CODEC_V1_MAGIC {
            return UnknownIntermediateCodecMagicSnafu { magic }.fail();
        }

        let framed = FramedRead::new(self.reader, codec_v1::IntermediateCodecV1);
        Ok(Box::new(framed))
    }
}
#[cfg(test)]
mod tests {
    use futures::io::Cursor;

    use super::*;
    use crate::inverted_index::error::Error;

    /// Runs `values` through an `IntermediateWriter` and returns the bytes it produced.
    async fn write_to_vec(values: BTreeMap<Bytes, BitVec>) -> Vec<u8> {
        let mut buf = vec![];
        IntermediateWriter::new(&mut buf)
            .write_all(values)
            .await
            .unwrap();
        buf
    }

    #[tokio::test]
    async fn test_intermediate_read_write_basic() {
        let entries = [
            (Bytes::from("a"), BitVec::from_slice(&[0b10101010])),
            (Bytes::from("b"), BitVec::from_slice(&[0b01010101])),
        ];
        let encoded = write_to_vec(BTreeMap::from_iter(entries.clone())).await;

        let mut stream = IntermediateReader::new(Cursor::new(encoded))
            .into_stream()
            .await
            .unwrap();

        // Items come back in key order, exactly as written.
        for expected in entries {
            let actual = stream.next().await.unwrap().unwrap();
            assert_eq!(actual, expected);
        }
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_intermediate_read_write_empty() {
        let encoded = write_to_vec(BTreeMap::new()).await;

        let mut stream = IntermediateReader::new(Cursor::new(encoded))
            .into_stream()
            .await
            .unwrap();

        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_intermediate_read_with_invalid_magic() {
        let reader = IntermediateReader::new(Cursor::new(b"invalid".to_vec()));
        let outcome = reader.into_stream().await;

        assert!(matches!(
            outcome,
            Err(Error::UnknownIntermediateCodecMagic { .. })
        ))
    }
}

View File

@@ -0,0 +1,166 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io;
use asynchronous_codec::{BytesMut, Decoder, Encoder};
use bytes::{Buf, BufMut};
use common_base::BitVec;
use snafu::{location, Location};
use crate::inverted_index::error::{Error, Result};
use crate::inverted_index::Bytes;
/// Size in bytes of a `u64`; used as the width of each length prefix in an encoded item.
const U64_LENGTH: usize = std::mem::size_of::<u64>();
/// Magic bytes for this intermediate codec version
pub const CODEC_V1_MAGIC: &[u8; 4] = b"im01";
/// Codec for serializing and deserializing intermediate data for external sorting.
///
/// Binary format serialization. The item is laid out as follows:
/// ```text
/// [value len][value][bitmap len][bitmap]
/// [8]        [?]    [8]         [?]
/// ```
///
/// Both length fields are `u64` values in little-endian byte order and give the
/// number of bytes in the field that immediately follows them.
pub struct IntermediateCodecV1;
/// [`FramedWrite`] requires the [`Encoder`] trait to be implemented.
impl Encoder for IntermediateCodecV1 {
    type Item<'a> = (Bytes, BitVec);
    type Error = Error;

    /// Appends one `[value len][value][bitmap len][bitmap]` frame for `item` to `dst`.
    fn encode(&mut self, item: (Bytes, BitVec), dst: &mut BytesMut) -> Result<()> {
        let (value, bitmap) = item;
        let bitmap = bitmap.into_vec();

        // Grow the buffer once for the whole frame: two length prefixes plus both payloads.
        let frame_len = U64_LENGTH * 2 + value.len() + bitmap.len();
        dst.reserve(frame_len);

        dst.put_u64_le(value.len() as u64);
        dst.put_slice(&value);
        dst.put_u64_le(bitmap.len() as u64);
        dst.put_slice(&bitmap);

        Ok(())
    }
}
/// [`FramedRead`] requires the [`Decoder`] trait to be implemented.
impl Decoder for IntermediateCodecV1 {
    type Item = (Bytes, BitVec);
    type Error = Error;

    /// Decodes the `src` into `(Bytes, BitVec)`. Returns `None` if
    /// the `src` does not contain enough data for a complete item.
    ///
    /// Only after successful decoding, the `src` is advanced. Otherwise,
    /// it is left untouched to wait for filling more data and retrying.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
        // Frame layout:
        // [value len][value][bitmap len][bitmap]
        // [8]        [?]    [8]         [?]
        let remaining: &[u8] = &src[..];

        // value len
        let Some(len_prefix) = remaining.get(..U64_LENGTH) else {
            return Ok(None);
        };
        let value_len = u64::from_le_bytes(len_prefix.try_into().unwrap()) as usize;
        let remaining = &remaining[U64_LENGTH..];

        // value
        let Some(value_bytes) = remaining.get(..value_len) else {
            return Ok(None);
        };
        let remaining = &remaining[value_len..];

        // bitmap len
        let Some(len_prefix) = remaining.get(..U64_LENGTH) else {
            return Ok(None);
        };
        let bitmap_len = u64::from_le_bytes(len_prefix.try_into().unwrap()) as usize;
        let remaining = &remaining[U64_LENGTH..];

        // bitmap
        let Some(bitmap_bytes) = remaining.get(..bitmap_len) else {
            return Ok(None);
        };

        // Copy the payloads out before consuming the frame from `src`.
        let item = (value_bytes.to_vec(), BitVec::from_slice(bitmap_bytes));

        let consumed = U64_LENGTH * 2 + value_len + bitmap_len;
        src.advance(consumed);

        Ok(Some(item))
    }
}
/// Required for [`Encoder`] and [`Decoder`] implementations.
impl From<io::Error> for Error {
fn from(error: io::Error) -> Self {
Error::CommonIoError {
error,
location: location!(),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an item from a value and the raw bytes backing its bitmap.
    fn sample(value: &[u8], bitmap: &[u8]) -> (Bytes, BitVec) {
        (value.to_vec(), BitVec::from_slice(bitmap))
    }

    #[test]
    fn test_intermediate_codec_basic() {
        let mut codec = IntermediateCodecV1;
        let mut buf = BytesMut::new();
        let hello = sample(b"hello", &[0b10101010]);
        let world = sample(b"world", &[0b01010101]);

        // A single frame round-trips and leaves nothing behind.
        codec.encode(hello.clone(), &mut buf).unwrap();
        assert_eq!(codec.decode(&mut buf).unwrap(), Some(hello.clone()));
        assert_eq!(codec.decode(&mut buf).unwrap(), None);

        // Two frames written back to back decode in the same order.
        for item in [&hello, &world] {
            codec.encode(item.clone(), &mut buf).unwrap();
        }
        for expected in [hello, world] {
            assert_eq!(codec.decode(&mut buf).unwrap(), Some(expected));
        }
        assert_eq!(codec.decode(&mut buf).unwrap(), None);
        assert!(buf.is_empty());
    }

    #[test]
    fn test_intermediate_codec_empty_item() {
        let mut codec = IntermediateCodecV1;
        let mut buf = BytesMut::new();
        let empty = sample(b"", &[]);

        codec.encode(empty.clone(), &mut buf).unwrap();
        assert_eq!(codec.decode(&mut buf).unwrap(), Some(empty));
        assert_eq!(codec.decode(&mut buf).unwrap(), None);
        assert!(buf.is_empty());
    }

    #[test]
    fn test_intermediate_codec_partial() {
        let mut codec = IntermediateCodecV1;
        let mut encoded = BytesMut::new();
        let item = sample(b"hello", &[0b10101010]);
        codec.encode(item.clone(), &mut encoded).unwrap();

        // Hand the decoder the value-length prefix plus only part of the value.
        let mut incoming = encoded.split_to(U64_LENGTH + 3);
        assert_eq!(codec.decode(&mut incoming).unwrap(), None); // not enough data

        // Once the rest of the frame arrives, decoding succeeds.
        incoming.extend_from_slice(&encoded[..]);
        assert_eq!(codec.decode(&mut incoming).unwrap(), Some(item));
        assert_eq!(codec.decode(&mut incoming).unwrap(), None);
        assert!(incoming.is_empty());
    }
}

View File

@@ -150,6 +150,16 @@ pub enum Error {
error: fst::Error,
location: Location,
},
#[snafu(display("Failed to perform IO operation"))]
CommonIoError {
#[snafu(source)]
error: IoError,
location: Location,
},
#[snafu(display("Unknown intermediate codec magic: {magic:?}"))]
UnknownIntermediateCodecMagic { magic: [u8; 4], location: Location },
}
impl ErrorExt for Error {
@@ -168,6 +178,8 @@ impl ErrorExt for Error {
| DecodeProto { .. }
| DecodeFst { .. }
| KeysApplierUnexpectedPredicates { .. }
| CommonIoError { .. }
| UnknownIntermediateCodecMagic { .. }
| FstCompile { .. } => StatusCode::Unexpected,
ParseRegex { .. }

View File

@@ -13,5 +13,7 @@
// limitations under the License.
#![feature(iter_partition_in_place)]
// TODO(zhongzc): remove once further code is added
#![allow(dead_code)]
pub mod inverted_index;

View File

@@ -13,7 +13,7 @@ protobuf-build = { version = "0.15", default-features = false, features = [
async-stream.workspace = true
async-trait.workspace = true
byteorder = "1.4"
bytes = "1.1"
bytes.workspace = true
common-base.workspace = true
common-config.workspace = true
common-error.workspace = true

View File

@@ -17,7 +17,7 @@ async-channel = "1.9"
async-compat = "0.2"
async-stream.workspace = true
async-trait = "0.1"
bytes = "1.4"
bytes.workspace = true
chrono.workspace = true
common-base.workspace = true
common-catalog.workspace = true

View File

@@ -6,7 +6,7 @@ license.workspace = true
[dependencies]
async-trait = "0.1"
bytes = "1.4"
bytes.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-runtime.workspace = true

View File

@@ -19,7 +19,7 @@ auth.workspace = true
axum = { version = "0.6", features = ["headers"] }
axum-macros = "0.3.8"
base64.workspace = true
bytes = "1.2"
bytes.workspace = true
catalog.workspace = true
chrono.workspace = true
common-base.workspace = true

View File

@@ -8,7 +8,7 @@ license.workspace = true
api.workspace = true
aquamarine.workspace = true
async-trait.workspace = true
bytes = "1.1"
bytes.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true