add ip addr to columnar (#1805)

This commit is contained in:
PSeitz
2023-01-19 12:36:06 +08:00
committed by GitHub
parent 9f42b6440a
commit f9abd256b7
11 changed files with 285 additions and 59 deletions

View File

@@ -6,7 +6,10 @@ use std::sync::Arc;
use common::BinarySerializable;
pub use dictionary_encoded::BytesColumn;
pub use serialize::{open_column_bytes, open_column_u64, serialize_column_u64};
pub use serialize::{
open_column_bytes, open_column_u128, open_column_u64, serialize_column_u128,
serialize_column_u64,
};
use crate::column_index::ColumnIndex;
use crate::column_values::ColumnValues;

View File

@@ -2,23 +2,43 @@ use std::io;
use std::io::Write;
use std::sync::Arc;
use common::{CountingWriter, OwnedBytes};
use common::OwnedBytes;
use sstable::Dictionary;
use crate::column::{BytesColumn, Column};
use crate::column_index::{serialize_column_index, SerializableColumnIndex};
use crate::column_values::serialize::serialize_column_values_u128;
use crate::column_values::{
serialize_column_values, ColumnValues, MonotonicallyMappableToU64, ALL_CODEC_TYPES,
serialize_column_values, ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64,
ALL_CODEC_TYPES,
};
/// Writes a u128-mappable column to `output`.
///
/// The column index is written first, then the values (each converted with
/// `to_u128`), and finally the byte length of the column index as a
/// little-endian integer footer.
///
/// `column_values` is a generator instead of a single iterator: it is invoked
/// every time the values serializer asks for a fresh pass over the values.
pub fn serialize_column_u128<
    F: Fn() -> I,
    I: Iterator<Item = T>,
    T: MonotonicallyMappableToU128,
>(
    column_index: SerializableColumnIndex<'_>,
    column_values: F,
    num_vals: u32,
    output: &mut impl Write,
) -> io::Result<()> {
    let index_len = serialize_column_index(column_index, output)?;
    let u128_values = || column_values().map(|value| value.to_u128());
    serialize_column_values_u128(u128_values, num_vals, output)?;
    // Footer: lets the reader split the index from the values.
    output.write_all(&index_len.to_le_bytes())
}
pub fn serialize_column_u64<T: MonotonicallyMappableToU64>(
column_index: SerializableColumnIndex<'_>,
column_values: &impl ColumnValues<T>,
output: &mut impl Write,
) -> io::Result<()> {
let mut counting_writer = CountingWriter::wrap(output);
serialize_column_index(column_index, &mut counting_writer)?;
let column_index_num_bytes = counting_writer.written_bytes() as u32;
let output = counting_writer.finish();
let column_index_num_bytes = serialize_column_index(column_index, output)?;
serialize_column_values(column_values, &ALL_CODEC_TYPES[..], output)?;
output.write_all(&column_index_num_bytes.to_le_bytes())?;
Ok(())
@@ -41,6 +61,25 @@ pub fn open_column_u64<T: MonotonicallyMappableToU64>(bytes: OwnedBytes) -> io::
})
}
/// Opens a u128-mappable column from its serialized form.
///
/// Expected layout: `[column index][column values][index length: u32 LE]`.
/// The trailing 4 bytes give the length of the column index, which is used to
/// split the remaining bytes into the index part and the values part.
///
/// # Errors
///
/// Returns an `InvalidData` error if `bytes` is too short to hold the footer
/// or if the footer announces an index longer than the remaining data.
/// Errors from opening the column index or the column values are propagated.
pub fn open_column_u128<T: MonotonicallyMappableToU128>(
    bytes: OwnedBytes,
) -> io::Result<Column<T>> {
    const FOOTER_LEN: usize = std::mem::size_of::<u32>();
    // Validate up front: splitting past the end of the buffer would panic.
    if bytes.len() < FOOTER_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "u128 column is too short to contain the column index length footer",
        ));
    }
    let (body, column_index_num_bytes_payload) = bytes.rsplit(FOOTER_LEN);
    let column_index_num_bytes = u32::from_le_bytes(
        column_index_num_bytes_payload
            .as_slice()
            .try_into()
            .expect("the footer split off above is exactly 4 bytes long"),
    ) as usize;
    if column_index_num_bytes > body.len() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "u128 column footer announces a column index larger than the column data",
        ));
    }
    let (column_index_data, column_values_data) = body.split(column_index_num_bytes);
    let column_index = crate::column_index::open_column_index(column_index_data)?;
    let column_values = crate::column_values::open_u128_mapped(column_values_data)?;
    Ok(Column {
        idx: column_index,
        values: column_values,
    })
}
pub fn open_column_bytes(data: OwnedBytes) -> io::Result<BytesColumn> {
let (body, dictionary_len_bytes) = data.rsplit(4);
let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap());

View File

@@ -1,7 +1,7 @@
use std::io;
use std::io::Write;
use common::OwnedBytes;
use common::{CountingWriter, OwnedBytes};
use crate::column_index::multivalued_index::{serialize_multivalued_index, MultivaluedIndex};
use crate::column_index::optional_index::serialize_optional_index;
@@ -29,19 +29,21 @@ impl<'a> SerializableColumnIndex<'a> {
/// Serializes `column_index` into `output`.
///
/// One cardinality code byte is written first. For the `Optional` and
/// `Multivalued` variants the matching index payload follows; `Full` writes
/// nothing more.
///
/// In the `io::Result<u32>` form, the returned value is the number of bytes
/// that went through the `CountingWriter` wrapped around `output`.
pub fn serialize_column_index(
column_index: SerializableColumnIndex,
output: &mut impl Write,
) -> io::Result<()> {
) -> io::Result<u32> {
// Count every byte written below so the total can be reported to the caller.
let mut output = CountingWriter::wrap(output);
let cardinality = column_index.get_cardinality().to_code();
output.write_all(&[cardinality])?;
match column_index {
// A full index carries no payload beyond the cardinality byte.
SerializableColumnIndex::Full => {}
SerializableColumnIndex::Optional(optional_index) => {
serialize_optional_index(&*optional_index, output)?
serialize_optional_index(&*optional_index, &mut output)?
}
SerializableColumnIndex::Multivalued(multivalued_index) => {
serialize_multivalued_index(multivalued_index, output)?
serialize_multivalued_index(multivalued_index, &mut output)?
}
}
Ok(())
// The byte count is narrowed to u32 with an `as` cast.
let column_index_num_bytes = output.written_bytes() as u32;
Ok(column_index_num_bytes)
}
pub fn open_column_index(mut bytes: OwnedBytes) -> io::Result<ColumnIndex<'static>> {

View File

@@ -28,7 +28,7 @@ mod compact_space;
mod line;
mod linear;
pub(crate) mod monotonic_mapping;
// mod monotonic_mapping_u128;
pub(crate) mod monotonic_mapping_u128;
mod column;
mod column_with_cardinality;
@@ -37,7 +37,7 @@ pub mod serialize;
pub use self::column::{monotonic_map_column, ColumnValues, IterColumn, VecColumn};
pub use self::monotonic_mapping::{MonotonicallyMappableToU64, StrictlyMonotonicFn};
// pub use self::monotonic_mapping_u128::MonotonicallyMappableToU128;
pub use self::monotonic_mapping_u128::MonotonicallyMappableToU128;
pub use self::serialize::{serialize_and_load, serialize_column_values, NormalizedHeader};
use crate::column_values::bitpacked::BitpackedCodec;
use crate::column_values::blockwise_linear::BlockwiseLinearCodec;
@@ -136,6 +136,19 @@ impl U128FastFieldCodecType {
// // Ok(Arc::new(monotonic_map_column(reader, inverted)))
// }
/// Opens the u128 column values stored in `bytes` and exposes them as `T`.
///
/// The `U128Header` is read from the front of `bytes`; the remaining bytes
/// are handed to the compact space decompressor. The resulting column is
/// wrapped with the inverse of the `T` -> internal mapping.
///
/// # Panics
///
/// Panics if the header's codec type is not `CompactSpace`.
pub fn open_u128_mapped<T: MonotonicallyMappableToU128>(
    mut bytes: OwnedBytes,
) -> io::Result<Arc<dyn ColumnValues<T>>> {
    let header = U128Header::deserialize(&mut bytes)?;
    assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace);
    let decompressor = CompactSpaceDecompressor::open(bytes)?;
    let to_internal = StrictlyMonotonicMappingToInternal::<T>::new();
    let to_external: StrictlyMonotonicMappingInverter<StrictlyMonotonicMappingToInternal<T>> =
        to_internal.into();
    let mapped_column = monotonic_map_column(decompressor, to_external);
    Ok(Arc::new(mapped_column))
}
/// Returns the correct codec reader wrapped in the `Arc` for the data.
pub fn open_u64_mapped<T: MonotonicallyMappableToU64>(
mut bytes: OwnedBytes,

View File

@@ -2,6 +2,7 @@ use std::marker::PhantomData;
use fastdivide::DividerU64;
use super::MonotonicallyMappableToU128;
use crate::RowId;
/// Monotonic maps a value to u64 value space.
@@ -80,21 +81,20 @@ impl<T> StrictlyMonotonicMappingToInternal<T> {
}
}
// TODO
// impl<External: MonotonicallyMappableToU128, T: MonotonicallyMappableToU128>
// StrictlyMonotonicFn<External, u128> for StrictlyMonotonicMappingToInternal<T>
// where T: MonotonicallyMappableToU128
// {
// #[inline(always)]
// fn mapping(&self, inp: External) -> u128 {
// External::to_u128(inp)
// }
/// u128 flavour of the mapping: converts between an `External` value and its
/// `u128` representation.
///
/// Both methods delegate to `External`'s own `to_u128` / `from_u128`; the
/// type parameter `T` is not used in the method bodies.
impl<External: MonotonicallyMappableToU128, T: MonotonicallyMappableToU128>
StrictlyMonotonicFn<External, u128> for StrictlyMonotonicMappingToInternal<T>
where T: MonotonicallyMappableToU128
{
/// Maps `inp` to its `u128` representation via `External::to_u128`.
#[inline(always)]
fn mapping(&self, inp: External) -> u128 {
External::to_u128(inp)
}
// #[inline(always)]
// fn inverse(&self, out: u128) -> External {
// External::from_u128(out)
// }
// }
/// Maps a `u128` back to an `External` value via `External::from_u128`.
#[inline(always)]
fn inverse(&self, out: u128) -> External {
External::from_u128(out)
}
}
impl<External: MonotonicallyMappableToU64, T: MonotonicallyMappableToU64>
StrictlyMonotonicFn<External, u64> for StrictlyMonotonicMappingToInternal<T>

View File

@@ -35,6 +35,7 @@ use super::{
monotonic_map_column, ColumnValues, FastFieldCodec, FastFieldCodecType,
MonotonicallyMappableToU64, U128FastFieldCodecType, VecColumn, ALL_CODEC_TYPES,
};
use crate::column_values::compact_space::CompactSpaceCompressor;
/// The normalized header gives some parameters after applying the following
/// normalization of the vector:
@@ -182,32 +183,23 @@ pub(crate) fn estimate<T: MonotonicallyMappableToU64>(
}
}
// TODO
/// Serializes u128 values with the compact space codec.
// pub fn serialize_u128_new<F: Fn() -> I, I: Iterator<Item = u128>>(
// value_index: ColumnIndex,
// iter_gen: F,
// num_vals: u32,
// output: &mut impl io::Write,
// ) -> io::Result<()> {
// let header = U128Header {
// num_vals,
// codec_type: U128FastFieldCodecType::CompactSpace,
// };
// header.serialize(output)?;
// let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals);
// compressor.compress_into(iter_gen(), output).unwrap();
/// Writes a `U128Header` (value count plus the `CompactSpace` codec type) to
/// `output`, followed by the compact-space-compressed values.
///
/// `iter_gen` is called twice: once to train the compressor and once to
/// compress the values, so it should yield the same values on each call
/// (NOTE(review): presumably also exactly `num_vals` of them — confirm).
pub fn serialize_column_values_u128<F: Fn() -> I, I: Iterator<Item = u128>>(
iter_gen: F,
num_vals: u32,
output: &mut impl io::Write,
) -> io::Result<()> {
let header = U128Header {
num_vals,
codec_type: U128FastFieldCodecType::CompactSpace,
};
header.serialize(output)?;
// let null_index_footer = ColumnFooter {
// cardinality: value_index.get_cardinality(),
// null_index_codec: NullIndexCodec::Full,
// null_index_byte_range: 0..0,
// };
// append_null_index_footer(output, null_index_footer)?;
// append_format_version(output)?;
// First pass over the values: train the compressor.
let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals);
// Second pass over the values: write them in compressed form.
compressor.compress_into(iter_gen(), output)?;
// Ok(())
// }
Ok(())
}
/// Serializes the column with the codec with the best estimate on the data.
pub fn serialize_column_values<T: MonotonicallyMappableToU64>(

View File

@@ -11,6 +11,7 @@ pub enum ColumnType {
Bytes,
Numerical(NumericalType),
Bool,
IpAddr,
}
impl ColumnType {
@@ -31,6 +32,10 @@ impl ColumnType {
column_type_category = ColumnTypeCategory::Bool;
numerical_type_code = 0u8;
}
ColumnType::IpAddr => {
column_type_category = ColumnTypeCategory::IpAddr;
numerical_type_code = 0u8;
}
}
place_bits::<0, 3>(column_type_category.to_code()) | place_bits::<3, 6>(numerical_type_code)
}
@@ -49,6 +54,12 @@ impl ColumnType {
}
Ok(ColumnType::Bool)
}
ColumnTypeCategory::IpAddr => {
if numerical_type_code != 0u8 {
return Err(InvalidData);
}
Ok(ColumnType::IpAddr)
}
ColumnTypeCategory::Str => {
if numerical_type_code != 0u8 {
return Err(InvalidData);
@@ -76,6 +87,7 @@ pub(crate) enum ColumnTypeCategory {
Bool = 0u8,
Str = 1u8,
Numerical = 2u8,
IpAddr = 3u8,
}
impl ColumnTypeCategory {
@@ -88,6 +100,7 @@ impl ColumnTypeCategory {
0u8 => Ok(Self::Bool),
1u8 => Ok(Self::Str),
2u8 => Ok(Self::Numerical),
3u8 => Ok(Self::IpAddr),
_ => Err(InvalidData),
}
}

View File

@@ -1,3 +1,5 @@
use std::net::Ipv6Addr;
use crate::dictionary::UnorderedId;
use crate::utils::{place_bits, pop_first_byte, select_bits};
use crate::value::NumericalValue;
@@ -25,12 +27,12 @@ struct ColumnOperationMetadata {
impl ColumnOperationMetadata {
fn to_code(self) -> u8 {
place_bits::<0, 4>(self.len) | place_bits::<4, 8>(self.op_type.to_code())
place_bits::<0, 6>(self.len) | place_bits::<6, 8>(self.op_type.to_code())
}
fn try_from_code(code: u8) -> Result<Self, InvalidData> {
let len = select_bits::<0, 4>(code);
let typ_code = select_bits::<4, 8>(code);
let len = select_bits::<0, 6>(code);
let typ_code = select_bits::<6, 8>(code);
let column_type = ColumnOperationType::try_from_code(typ_code)?;
Ok(ColumnOperationMetadata {
op_type: column_type,
@@ -142,9 +144,21 @@ impl SymbolValue for bool {
}
}
/// An `Ipv6Addr` symbol is stored as its 16 raw octets.
impl SymbolValue for Ipv6Addr {
    /// Copies the 16 octets to the start of `buffer` and returns the number
    /// of bytes written (always 16).
    fn serialize(self, buffer: &mut [u8]) -> u8 {
        let octets = self.octets();
        buffer[..16].copy_from_slice(&octets);
        16
    }

    /// Rebuilds the address from the first 16 bytes of `bytes`.
    fn deserialize(bytes: &[u8]) -> Self {
        let mut octets = [0u8; 16];
        octets.copy_from_slice(&bytes[..16]);
        Ipv6Addr::from(octets)
    }
}
/// Small fixed-capacity inline byte buffer with an explicit length.
///
/// NOTE(review): the capacity presumably has to fit the largest serialized
/// symbol (an `Ipv6Addr` writes 16 bytes) plus one extra byte — confirm
/// against the code that fills this buffer.
#[derive(Default)]
struct MiniBuffer {
pub bytes: [u8; 10],
pub bytes: [u8; 17],
// NOTE(review): presumably the number of bytes of `bytes` in use — confirm.
pub len: u8,
}

View File

@@ -4,6 +4,7 @@ mod serializer;
mod value_index;
use std::io;
use std::net::Ipv6Addr;
use column_operation::ColumnOperation;
use common::CountingWriter;
@@ -11,7 +12,9 @@ use serializer::ColumnarSerializer;
use stacker::{Addr, ArenaHashMap, MemoryArena};
use crate::column_index::SerializableColumnIndex;
use crate::column_values::{ColumnValues, MonotonicallyMappableToU64, VecColumn};
use crate::column_values::{
ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
};
use crate::columnar::column_type::{ColumnType, ColumnTypeCategory};
use crate::columnar::writer::column_writers::{
ColumnWriter, NumericalColumnWriter, StrColumnWriter,
@@ -30,6 +33,7 @@ struct SpareBuffers {
u64_values: Vec<u64>,
f64_values: Vec<f64>,
bool_values: Vec<bool>,
ip_addr_values: Vec<Ipv6Addr>,
}
/// Makes it possible to create a new columnar.
@@ -48,6 +52,7 @@ struct SpareBuffers {
pub struct ColumnarWriter {
numerical_field_hash_map: ArenaHashMap,
bool_field_hash_map: ArenaHashMap,
ip_addr_field_hash_map: ArenaHashMap,
bytes_field_hash_map: ArenaHashMap,
arena: MemoryArena,
// Dictionaries used to store dictionary-encoded values.
@@ -60,6 +65,7 @@ impl Default for ColumnarWriter {
ColumnarWriter {
numerical_field_hash_map: ArenaHashMap::new(10_000),
bool_field_hash_map: ArenaHashMap::new(10_000),
ip_addr_field_hash_map: ArenaHashMap::new(10_000),
bytes_field_hash_map: ArenaHashMap::new(10_000),
dictionaries: Vec::new(),
arena: MemoryArena::default(),
@@ -90,6 +96,22 @@ impl ColumnarWriter {
);
}
/// Records `ip_addr` for row `doc` in the IP address column `column_name`.
///
/// The column writer is created on first use of `column_name`.
///
/// # Panics
///
/// Panics if `column_name` contains a 0 byte.
pub fn record_ip_addr(&mut self, doc: RowId, column_name: &str, ip_addr: Ipv6Addr) {
    let key = column_name.as_bytes();
    assert!(!key.contains(&0u8), "key may not contain the 0 byte");
    let arena = &mut self.arena;
    self.ip_addr_field_hash_map
        .mutate_or_create(key, |existing: Option<ColumnWriter>| {
            let mut writer: ColumnWriter = existing.unwrap_or_default();
            writer.record(doc, ip_addr, arena);
            writer
        });
}
pub fn record_bool(&mut self, doc: RowId, column_name: &str, val: bool) {
assert!(
!column_name.as_bytes().contains(&0u8),
@@ -148,9 +170,16 @@ impl ColumnarWriter {
.iter()
.map(|(term, addr, _)| (term, ColumnTypeCategory::Bool, addr)),
);
field_columns.extend(
self.ip_addr_field_hash_map
.iter()
.map(|(term, addr, _)| (term, ColumnTypeCategory::IpAddr, addr)),
);
field_columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
for (column_name, bytes_or_numerical, addr) in field_columns {
match bytes_or_numerical {
ColumnTypeCategory::Bool => {
@@ -166,6 +195,19 @@ impl ColumnarWriter {
&mut column_serializer,
)?;
}
ColumnTypeCategory::IpAddr => {
let column_writer: ColumnWriter = self.ip_addr_field_hash_map.read(addr);
let cardinality = column_writer.get_cardinality(num_docs);
let mut column_serializer =
serializer.serialize_column(column_name, ColumnType::IpAddr);
serialize_ip_addr_column(
cardinality,
num_docs,
column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
buffers,
&mut column_serializer,
)?;
}
ColumnTypeCategory::Str => {
let str_column_writer: StrColumnWriter = self.bytes_field_hash_map.read(addr);
let dictionary_builder =
@@ -317,6 +359,76 @@ fn serialize_bool_column(
Ok(())
}
/// Serializes an IP address column to `wrt`.
///
/// Thin adapter over `serialize_column_u128` that picks the index builders
/// and the `Ipv6Addr` scratch vector out of the shared `buffers`.
fn serialize_ip_addr_column(
    cardinality: Cardinality,
    num_docs: RowId,
    column_operations_it: impl Iterator<Item = ColumnOperation<Ipv6Addr>>,
    buffers: &mut SpareBuffers,
    wrt: &mut impl io::Write,
) -> io::Result<()> {
    serialize_column_u128(
        column_operations_it,
        cardinality,
        num_docs,
        &mut buffers.value_index_builders,
        &mut buffers.ip_addr_values,
        wrt,
    )
}
/// Replays the recorded column operations and serializes the resulting
/// u128-mappable column to `wrt`.
///
/// `values` is cleared and then used as scratch space: the operations are
/// drained into it and into the index builder matching `cardinality`, after
/// which the column index and the values are written out together.
///
/// # Panics
///
/// Multivalued columns are not implemented yet and hit `todo!()`.
fn serialize_column_u128<
    T: Copy + std::fmt::Debug + Send + Sync + MonotonicallyMappableToU128 + PartialOrd,
>(
    op_iterator: impl Iterator<Item = ColumnOperation<T>>,
    cardinality: Cardinality,
    num_docs: RowId,
    value_index_builders: &mut PreallocatedIndexBuilders,
    values: &mut Vec<T>,
    mut wrt: impl io::Write,
) -> io::Result<()>
where
    for<'a> VecColumn<'a, T>: ColumnValues<T>,
{
    values.clear();
    // TODO: split index and values
    let column_index = match cardinality {
        Cardinality::Full => {
            consume_operation_iterator(
                op_iterator,
                value_index_builders.borrow_required_index_builder(),
                values,
            );
            SerializableColumnIndex::Full
        }
        Cardinality::Optional => {
            let builder = value_index_builders.borrow_optional_index_builder();
            consume_operation_iterator(op_iterator, builder, values);
            SerializableColumnIndex::Optional(Box::new(builder.finish(num_docs)))
        }
        Cardinality::Multivalued => {
            let builder = value_index_builders.borrow_multivalued_index_builder();
            consume_operation_iterator(op_iterator, builder, values);
            let _multivalued_index = builder.finish(num_docs);
            todo!();
            // SerializableColumnIndex::Multivalued(Box::new(multivalued_index))
        }
    };
    let num_vals = values.len() as u32;
    crate::column::serialize_column_u128(
        column_index,
        || values.iter().copied(),
        num_vals,
        &mut wrt,
    )
}
fn serialize_column<
T: Copy + Default + std::fmt::Debug + Send + Sync + MonotonicallyMappableToU64 + PartialOrd,
>(
@@ -349,7 +461,7 @@ where
Cardinality::Multivalued => {
let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder();
consume_operation_iterator(op_iterator, multivalued_index_builder, values);
let multivalued_index = multivalued_index_builder.finish(num_docs);
let _multivalued_index = multivalued_index_builder.finish(num_docs);
todo!();
// SerializableColumnIndex::Multivalued(Box::new(multivalued_index))
}

View File

@@ -1,5 +1,5 @@
use std::io;
use std::net::IpAddr;
use std::net::Ipv6Addr;
use common::file_slice::FileSlice;
use common::{HasLen, OwnedBytes};
@@ -14,7 +14,7 @@ pub enum DynamicColumn {
I64(Column<i64>),
U64(Column<u64>),
F64(Column<f64>),
IpAddr(Column<IpAddr>),
IpAddr(Column<Ipv6Addr>),
DateTime(Column<DateTime>),
Str(BytesColumn),
}
@@ -49,6 +49,12 @@ impl From<BytesColumn> for DynamicColumn {
}
}
impl From<Column<Ipv6Addr>> for DynamicColumn {
fn from(column: Column<Ipv6Addr>) -> Self {
DynamicColumn::IpAddr(column)
}
}
#[derive(Clone)]
pub struct DynamicColumnHandle {
pub(crate) file_slice: FileSlice,
@@ -81,6 +87,7 @@ impl DynamicColumnHandle {
}
},
ColumnType::Bool => crate::column::open_column_u64::<bool>(column_bytes)?.into(),
ColumnType::IpAddr => crate::column::open_column_u128::<Ipv6Addr>(column_bytes)?.into(),
};
Ok(dynamic_column)
}

View File

@@ -1,3 +1,6 @@
use std::net::{IpAddr, Ipv6Addr};
use crate::column_values::MonotonicallyMappableToU128;
use crate::columnar::ColumnType;
use crate::dynamic_column::{DynamicColumn, DynamicColumnHandle};
use crate::value::NumericalValue;
@@ -36,6 +39,34 @@ fn test_dataframe_writer_bool() {
assert_eq!(&vals, &[None, Some(false), None, Some(true), None,]);
}
/// Round-trips an optional IP address column: two of five rows carry a value,
/// the others must read back as `None`.
#[test]
fn test_dataframe_writer_ip_addr() {
    let first_ip = Ipv6Addr::from_u128(1001);
    let second_ip = Ipv6Addr::from_u128(1050);

    let mut dataframe_writer = ColumnarWriter::default();
    dataframe_writer.record_ip_addr(1, "ip_addr", first_ip);
    dataframe_writer.record_ip_addr(3, "ip_addr", second_ip);
    let mut buffer: Vec<u8> = Vec::new();
    dataframe_writer.serialize(5, &mut buffer).unwrap();

    let columnar = ColumnarReader::open(buffer).unwrap();
    assert_eq!(columnar.num_columns(), 1);
    let cols: Vec<DynamicColumnHandle> = columnar.read_columns("ip_addr").unwrap();
    assert_eq!(cols.len(), 1);
    // assert_eq!(cols[0].num_bytes(), 29);
    assert_eq!(cols[0].column_type(), ColumnType::IpAddr);

    let dyn_ip_col = cols[0].open().unwrap();
    let DynamicColumn::IpAddr(ip_col) = dyn_ip_col else { panic!(); };
    let vals: Vec<Option<Ipv6Addr>> = (0..5).map(|row_id| ip_col.first(row_id)).collect();
    assert_eq!(
        &vals,
        &[None, Some(first_ip), None, Some(second_ip), None]
    );
}
#[test]
fn test_dataframe_writer_numerical() {
let mut dataframe_writer = ColumnarWriter::default();