Milvus Dataset
Milvus Dataset is a Python library for managing and processing large-scale datasets. It is optimized for integration with the Milvus vector database, and it also works as a standalone dataset management tool. The library provides a simple interface for creating, writing, reading, and managing datasets, and is particularly suited to large-scale vector data.
Key Features
- Flexible Storage Support
  - Local storage support
  - Object storage support (S3/MinIO)
  - Easy migration between different storage types
- Rich Data Type Support
  - Basic data types (INT64, VARCHAR, etc.)
  - Vector data types (FLOAT_VECTOR)
  - JSON fields
  - Sparse vectors
  - Binary vectors
- Dataset Management
  - Training and test set split support
  - Dataset metadata management
  - Dataset statistics and analytics
  - Schema definition and validation
- Integration Capabilities
  - Import to Milvus database
  - Upload to Hugging Face Hub
  - Seamless pandas DataFrame integration
  - Built-in nearest neighbor computation
  - Built-in mock data generation
Installation
pip install milvus-dataset
Quick Start Guide
1. Basic Configuration
from milvus_dataset import ConfigManager, StorageType

# Option 1: local storage
ConfigManager().init_storage(
    root_path="./data/my-dataset",
    storage_type=StorageType.LOCAL,
)

# Option 2: object storage (S3/MinIO)
ConfigManager().init_storage(
    root_path="s3://bucket/path",
    storage_type=StorageType.S3,
    options={
        "aws_access_key_id": "your_key",
        "aws_secret_access_key": "your_secret",
        "endpoint_url": "your_endpoint",
    },
)
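Credentials written directly into source files are easy to leak. A minimal sketch, assuming the keys live in environment variables, builds the same `options` dictionary at runtime. The helper name `s3_options_from_env` and the `S3_ENDPOINT_URL` variable name are illustrative assumptions and not part of milvus-dataset; `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are the conventional AWS variable names.

```python
import os


def s3_options_from_env(env=None):
    """Build the `options` dict for S3/MinIO storage from environment variables.

    Hypothetical helper (not part of milvus-dataset). Raises KeyError naming
    the missing variables if any required one is absent or empty.
    """
    env = os.environ if env is None else env
    # Maps the option keys used in the example above to environment variable names.
    mapping = {
        "aws_access_key_id": "AWS_ACCESS_KEY_ID",
        "aws_secret_access_key": "AWS_SECRET_ACCESS_KEY",
        "endpoint_url": "S3_ENDPOINT_URL",  # assumed name, choose your own
    }
    missing = [var for var in mapping.values() if not env.get(var)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {key: env[var] for key, var in mapping.items()}
```

The result can then be passed as `options=s3_options_from_env()` in the `init_storage` call.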
2. Creating a Dataset
from pymilvus import CollectionSchema, DataType, FieldSchema
from milvus_dataset import load_dataset

# Define the schema: an integer primary key, a text field, and a 1024-dim vector
schema = CollectionSchema(
    fields=[
        FieldSchema("id", DataType.INT64, is_primary=True),
        FieldSchema("text", DataType.VARCHAR, max_length=65535),
        FieldSchema("embedding", DataType.FLOAT_VECTOR, dim=1024),
    ],
    description="Text vector dataset",
)

dataset = load_dataset("my-dataset", schema=schema)
3. Writing Data
import pandas as pd
import numpy as np

# Build 1000 rows whose columns match the schema fields
df = pd.DataFrame({
    "id": range(1000),
    "text": ["text_" + str(i) for i in range(1000)],
    "embedding": [np.random.rand(1024) for _ in range(1000)],
})

# Append the rows to the "train" split
with dataset["train"].get_writer(mode="append") as writer:
    writer.write(df)
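A wrong vector length or a missing column is cheaper to catch before writing. The following sketch checks plain-Python records against the field names and vector dimension used in the schema above. `validate_rows` is a hypothetical helper, not part of milvus-dataset, and it does not stand in for whatever validation the library performs itself.

```python
def validate_rows(rows, required_fields, vector_field, dim):
    """Return a list of (row_index, problem) tuples; an empty list means all rows pass.

    Hypothetical pre-write check, independent of milvus-dataset.
    """
    problems = []
    for i, row in enumerate(rows):
        missing = [f for f in required_fields if f not in row]
        if missing:
            problems.append((i, "missing fields: " + ", ".join(missing)))
            continue
        actual = len(row[vector_field])
        if actual != dim:
            problems.append((i, f"{vector_field} has length {actual}, expected {dim}"))
    return problems


rows = [
    {"id": 0, "text": "text_0", "embedding": [0.0] * 1024},
    {"id": 1, "text": "text_1", "embedding": [0.0] * 1000},  # wrong dimension
    {"id": 2, "embedding": [0.0] * 1024},                    # missing "text"
]
problems = validate_rows(rows, ["id", "text", "embedding"], "embedding", 1024)
for index, problem in problems:
    print(index, problem)
```

With a DataFrame, `df.to_dict("records")` produces records in this shape.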
4. Dataset Operations
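# StorageConfig is assumed to be importable from milvus_dataset alongside StorageType
from milvus_dataset import StorageConfig, StorageType

# Print a summary of the dataset
print(dataset.summary())

# Compute nearest neighbors for the vector field
dataset.compute_neighbors(
    vector_field_name="embedding",
    pk_field_name="id",
    top_k=100,
)

# Import the dataset into Milvus
dataset.to_milvus(
    milvus_config={
        "host": "localhost",
        "port": 19530,
    },
    milvus_storage=StorageConfig(
        root_path="s3://bucket/path",
        storage_type=StorageType.S3,
        options={
            "aws_access_key_id": "your_key",
            "aws_secret_access_key": "your_secret",
            "endpoint_url": "your_endpoint",
        },
    ),
)

# Upload the dataset to Hugging Face Hub
dataset.to_hf(repo_name="username/dataset-name")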
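To make the `compute_neighbors` call more concrete: the nearest neighbors of a query are the `top_k` dataset vectors closest to it. The sketch below finds them by brute force with squared Euclidean distance in pure Python. It illustrates the idea only. The metric, the function name `brute_force_neighbors`, and the algorithm are assumptions, since the source does not describe how the library computes neighbors internally.

```python
import heapq


def brute_force_neighbors(query, vectors, top_k):
    """Return the primary keys of the top_k vectors nearest to `query`.

    `vectors` maps primary key -> vector (a sequence of floats).
    Uses squared Euclidean distance; ties are broken by primary key.
    """
    def sq_dist(a, b):
        return sum((x - y) ** 2 for x, y in zip(a, b))

    # Score every vector, then keep only the top_k smallest distances.
    scored = ((sq_dist(query, vec), pk) for pk, vec in vectors.items())
    return [pk for _, pk in heapq.nsmallest(top_k, scored)]


train = {
    0: [0.0, 0.0],
    1: [1.0, 0.0],
    2: [0.0, 2.0],
    3: [5.0, 5.0],
}
nearest = brute_force_neighbors([0.9, 0.1], train, top_k=2)
print(nearest)  # [1, 0]
```

Brute force scans every vector for every query, so it is only practical as a reference on small data.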
Advanced Usage
Performance Optimization
- File Size Configuration

    with dataset["train"].get_writer(
        mode="append",
        target_file_size_mb=512,
        num_buffers=15,
        queue_size=30,
    ) as writer:
        writer.write(df)

- Batch Processing

    # process_batch is a placeholder for your own function
    for batch in dataset["train"].read(mode="batch", batch_size=1000):
        process_batch(batch)
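For a rough sense of what `target_file_size_mb=512` means with the Quick Start schema, the sketch below estimates how many rows fit in one file. It assumes 4 bytes per vector component (float32), counts only the raw uncompressed vector and primary-key bytes, and ignores the text field, encoding, and compression, so real files will differ. `rows_per_file` is an illustrative helper, not a library function.

```python
def rows_per_file(target_file_size_mb, dim, bytes_per_component=4, other_bytes_per_row=8):
    """Estimate rows per file from the raw, uncompressed row size.

    other_bytes_per_row defaults to 8, the size of a single INT64 primary key.
    """
    row_bytes = dim * bytes_per_component + other_bytes_per_row
    return (target_file_size_mb * 1024 * 1024) // row_bytes


estimate = rows_per_file(target_file_size_mb=512, dim=1024)
print(estimate)  # 130816
```

Under these assumptions a 512 MB file holds on the order of 130,000 rows of 1024-dimensional vectors, which gives a starting point for choosing a read `batch_size` relative to file size.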
Storage Migration
dataset.to_storage(StorageConfig(
    storage_type=StorageType.S3,
    root_path="s3://bucket/path",
    options={...}
))
Common Issues and Solutions
Contributing
We welcome contributions! Please feel free to submit a Pull Request.