# Qdrant Upload Stage
The Qdrant upload stage generates embeddings for documents and optionally uploads them to a Qdrant vector database for semantic search and retrieval.
## Features
- Dual Mode Operation: Upload to Qdrant database or store embeddings locally
- Flexible Embedding Sources: Generate new embeddings or use existing ones from metadata
- Automatic Deduplication: Skips documents already present in the collection
- Batch Processing: Efficient batch upload with configurable size
- Optimized Collection Setup: Automatic creation with HNSW indexing and quantization
- Metadata Indexing: Creates indexes on key fields for efficient filtering
- Retry Logic: Automatic retry on failures with configurable attempts
## Operating Modes

### Qdrant Mode (Default)

Uploads document embeddings to a Qdrant vector database:

```yaml
- name: qdrant
  config:
    mode: "qdrant"
    vector_store:
      url: "http://localhost:6333"
      api_key: "your-api-key"  # Optional
      collection_name: "my_documents"
      batch_size: 100
      vector_size: 768
    embedder:
      url: "http://localhost:8000"
      model_name: "BAAI/bge-base-en-v1.5"
      timeout: 300
      api_key: "EMPTY"  # Optional
```
### Local Mode

Generates embeddings and stores them in document metadata without uploading:

```yaml
- name: qdrant
  config:
    mode: "local"
    batch_size: 10
    embedder:
      url: "http://localhost:8000"
      model_name: "BAAI/bge-base-en-v1.5"
      timeout: 300
```
## Configuration Parameters

### Mode Configuration

#### `mode`

- Type: String
- Default: `"qdrant"`
- Options: `"qdrant"`, `"local"`
- Description: Operating mode - upload to a Qdrant database or store embeddings locally
#### `use_existing_embeddings`

- Type: Boolean
- Default: `false`
- Description: Use embeddings already stored in the `document.embedding` field instead of generating new ones

#### `upload_pipeline_metadata`

- Type: Boolean
- Default: `false`
- Description: Include pipeline processing metadata in the Qdrant payload
#### `batch_size`

- Type: Integer
- Default: `10` (local mode)
- Description: Number of documents to process in each batch (local mode only)
### Vector Store Configuration (Qdrant Mode)

#### `vector_store.url`

- Type: String
- Default: `"http://localhost:6333"`
- Description: URL of the Qdrant instance

#### `vector_store.api_key`

- Type: String
- Default: None
- Description: API key for Qdrant authentication (optional for local instances)

#### `vector_store.collection_name`

- Type: String
- Required: Yes (for Qdrant mode)
- Description: Name of the target collection in Qdrant

#### `vector_store.batch_size`

- Type: Integer
- Required: Yes (for Qdrant mode)
- Description: Number of documents to upload per batch

#### `vector_store.vector_size`

- Type: Integer
- Required: Yes (for Qdrant mode)
- Description: Dimension of the embedding vectors (must match the model's output)
### Embedder Configuration

#### `embedder.url`

- Type: String
- Required: Yes (unless `use_existing_embeddings: true`)
- Description: URL of the VLLM embedding server

#### `embedder.model_name`

- Type: String
- Required: Yes (unless `use_existing_embeddings: true`)
- Description: Name of the embedding model

#### `embedder.timeout`

- Type: Integer
- Default: `300`
- Description: Request timeout in seconds

#### `embedder.api_key`

- Type: String
- Default: `"EMPTY"`
- Description: API key for VLLM authentication (use `"EMPTY"` for local servers)
## Collection Optimization
When creating a new collection, the step automatically applies optimized settings for production use. These settings balance search quality, speed, and resource usage for large-scale document collections.
Learn more about Qdrant Collections →
### Vector Configuration
#### Distance Metric: COSINE
Cosine similarity measures the angle between vectors, making it ideal for text embeddings where the direction matters more than magnitude. This is the standard choice for semantic search applications.
[Read about Distance Metrics →](https://qdrant.tech/documentation/concepts/search/#metrics)
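As a quick illustration (a self-contained sketch, not part of the pipeline), cosine similarity depends only on direction: scaling a vector leaves its similarity to other vectors unchanged.

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity: the angle between two vectors, ignoring magnitude."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

a = [0.1, 0.7, 0.2]
b = [0.2, 1.4, 0.4]  # same direction as a, twice the magnitude
c = [0.9, 0.1, 0.3]  # different direction

print(cosine_similarity(a, b))  # ~1.0: identical direction despite different magnitude
print(cosine_similarity(a, c))  # noticeably lower: different direction
```

This is why cosine is a better fit for text embeddings than raw Euclidean distance, where vector magnitude would dominate.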
#### On-Disk Storage: Enabled
Stores vectors on disk rather than in RAM, significantly reducing memory requirements for large collections. While slightly slower than in-memory storage, this allows you to store millions of vectors affordably.
#### Shards: 8
Distributes data across 8 shards for parallel processing. More shards improve write throughput and allow better resource utilization, especially important for large datasets.
[Learn about Sharding →](https://qdrant.tech/documentation/guides/distributed_deployment/)
### HNSW Index Parameters
HNSW (Hierarchical Navigable Small World) is the indexing algorithm that enables fast approximate nearest neighbor search.
Deep dive into HNSW →
#### m: 16
Number of bidirectional links created for each node. Higher values improve search quality but increase memory usage and indexing time. 16 is a balanced choice for most applications.
- Lower (4-8): Less memory, faster indexing, slightly lower recall
- Higher (32-64): Better recall, more memory, slower indexing
#### ef_construct: 128
Size of the dynamic candidate list during index construction. Higher values produce better index quality but take longer to build. 128 provides good quality without excessive build time.
- Lower (64): Faster indexing, slightly lower search quality
- Higher (256-512): Better search quality, slower indexing
#### full_scan_threshold: 10,000
When collection size is below this threshold, Qdrant uses exact (brute-force) search instead of the HNSW index. Exact search is faster for small collections.
#### max_indexing_threads: 2
Limits CPU cores used during indexing. Prevents indexing from consuming all available resources.
#### on_disk: Enabled
Stores the HNSW graph on disk to reduce RAM usage. Essential for collections with millions of vectors.
### Quantization
Binary quantization compresses vectors from 32-bit floats to 1-bit representations, reducing memory by ~32x with minimal quality loss. This makes it possible to store much larger collections.
Learn about Quantization →
#### Type: Binary Quantization
Converts vector components to binary (0 or 1) for massive memory savings. The original vectors are still used for final re-ranking, so search quality remains high.
Benefits:
- 32x memory reduction (32-bit float → 1-bit)
- Faster distance calculations
- More vectors fit in RAM for better performance
- Negligible impact on search quality (typically <2% recall loss)
#### always_ram: false
Allows quantized vectors to be stored on disk when needed, rather than always keeping them in RAM. This provides flexibility for very large collections.
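The arithmetic behind the 32x figure is easy to check directly (a quick sketch assuming 768-dimensional float32 embeddings, as in the configuration examples above):

```python
# Memory footprint per vector, before and after binary quantization.
dims = 768
float32_bytes = dims * 4   # 4 bytes per float32 component
binary_bytes = dims // 8   # 1 bit per component, packed into bytes

print(float32_bytes)                   # 3072 bytes per original vector
print(binary_bytes)                    # 96 bytes per quantized vector
print(float32_bytes // binary_bytes)   # 32x reduction

# Scaled up to 10 million vectors:
n = 10_000_000
print(f"{n * float32_bytes / 2**30:.1f} GiB original")   # ~28.6 GiB
print(f"{n * binary_bytes / 2**30:.1f} GiB quantized")   # ~0.9 GiB
```

Note that Qdrant still keeps the original vectors (on disk here) for re-ranking, so the quantized copy is an addition, not a replacement.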
### Optimizer Settings
These settings control how Qdrant manages and optimizes data segments over time.
Learn about Storage Optimization →
#### indexing_threshold: 20,000
Build HNSW index when segment reaches this size. Smaller values create indexes sooner but may cause more frequent rebuilds.
#### memmap_threshold: 5,000
Use memory-mapped files for segments larger than this. Memory mapping allows efficient disk-based storage without loading everything into RAM.
#### max_segment_size: 5,000,000
Maximum vectors per segment. Larger segments are more memory-efficient but may slow down some operations.
#### max_optimization_threads: 2
CPU cores dedicated to background optimization tasks. Prevents optimization from impacting query performance.
### Payload Indexes
Payload indexes enable fast filtering on metadata fields, similar to database indexes. Without these, filtering requires scanning all documents.
Learn about Payload Indexes →
The pipeline automatically creates indexes on common academic metadata fields:
#### Text Indexes (title, journal)
Enable full-text search and filtering on text fields. The word tokenizer splits text into searchable terms.
- min_token_len: Minimum word length to index
- max_token_len: Maximum word length to index
- lowercase: Normalize to lowercase for case-insensitive search
Example filters:
- Find papers with "neural" in title
- Filter by journal name
- Combine with vector search for semantic + keyword search
#### Integer Indexes (year, n_citations)
Enable efficient range queries on numeric fields.
Example filters:
- Papers published after 2020
- Papers with >100 citations
- Combine filters: papers from 2015-2023 with >50 citations
Performance Impact:
- Indexes speed up filtering by 100-1000x
- Small storage overhead (~10-20% of original data)
- Slightly slower writes (indexes must be updated)
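The example filters above map onto Qdrant's JSON filter syntax. Below is a sketch of a combined filter as a plain dictionary, suitable for the `filter` field of a search request; the field names match the indexes this stage creates, but exact match semantics may vary with your Qdrant version.

```python
import json

# Combined filter: "neural" in title, published 2015-2023, more than 50 citations.
query_filter = {
    "must": [
        {"key": "title", "match": {"text": "neural"}},         # full-text match (text index)
        {"key": "year", "range": {"gte": 2015, "lte": 2023}},  # range query (integer index)
        {"key": "n_citations", "range": {"gt": 50}},           # range query (integer index)
    ]
}
print(json.dumps(query_filter, indent=2))
```

Passing such a filter alongside a query vector gives the combined semantic + metadata search described above.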
## Stage Behavior

Document metadata is prepared for storage:

- `content`: Document text content
- `metadata`: All user metadata fields (unwrapped at the root level)
- `pipeline_metadata`: Processing metadata (if `upload_pipeline_metadata: true`)

Type conversions:

- `year` field converted to integer
- `title` field converted to string
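Point IDs are derived deterministically from each document's filename and content, which is what makes deduplication work: the same chunk always maps to the same unsigned 64-bit ID. A sketch of the ID scheme this step uses:

```python
import hashlib

def string_to_uint(s: str) -> int:
    """Map a string ID to an unsigned 64-bit integer via SHA-256."""
    digest = hashlib.sha256(s.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], byteorder="big", signed=False)

filename = "paper.pdf"
content = "Document chunk text..."

# String ID: filename plus a short MD5 fingerprint of the content.
doc_id = f"{filename}_{hashlib.md5(content.encode()).hexdigest()[:8]}"
point_id = string_to_uint(doc_id)

# The same filename + content always yields the same point ID,
# which is how already-uploaded documents are detected and skipped.
assert point_id == string_to_uint(doc_id)
assert 0 <= point_id < 2**64
```

Changing either the filename or the chunk content produces a different ID, so edited documents are treated as new points rather than overwriting old ones.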
## Error Handling
- Batch uploads retry up to 3 times on failure
- Failed batches are logged and skipped
- Individual embedding failures are logged without stopping the pipeline
- Scroll operations retry with a fixed delay between attempts
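The retry behavior can be factored into a small helper (a generic sketch; the actual step inlines this logic with its own delays and logging):

```python
import time

def with_retries(fn, max_retries=3, delay=5.0, backoff=1.0):
    """Call fn(), retrying on failure.

    backoff=1.0 gives a fixed delay between attempts;
    backoff=2.0 would double the delay each time (exponential backoff).
    """
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the error
            time.sleep(delay)
            delay *= backoff

calls = {"n": 0}

def flaky():
    """Fails twice, then succeeds - simulates a transient network error."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(with_retries(flaky, max_retries=3, delay=0.0))  # "ok" on the third attempt
```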
## Usage Examples

### Basic Upload with New Embeddings

```yaml
- name: qdrant
  config:
    mode: "qdrant"
    vector_store:
      url: "http://localhost:6333"
      collection_name: "research_papers"
      batch_size: 100
      vector_size: 768
    embedder:
      url: "http://localhost:8000"
      model_name: "BAAI/bge-base-en-v1.5"
```
### Upload with Existing Embeddings

If embeddings were generated in a previous step:

```yaml
- name: qdrant
  config:
    mode: "qdrant"
    use_existing_embeddings: true
    upload_pipeline_metadata: true
    vector_store:
      url: "http://localhost:6333"
      api_key: "your-api-key"
      collection_name: "processed_docs"
      batch_size: 50
      vector_size: 1024
```
### Local Embedding Generation

Store embeddings in document metadata without uploading:

```yaml
- name: qdrant
  config:
    mode: "local"
    batch_size: 20
    embedder:
      url: "http://localhost:8000"
      model_name: "sentence-transformers/all-MiniLM-L6-v2"
      timeout: 600
```
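In local mode the stage walks documents in `batch_size` slices and attaches one embedding per document. The batching pattern looks like this (a sketch; `embed_documents` here is a stand-in placeholder, not the real VLLM client):

```python
batch_size = 20
documents = [f"chunk {i}" for i in range(45)]

def embed_documents(texts):
    # Placeholder embedder: a real one returns one vector per input text.
    return [[0.0] * 384 for _ in texts]

embeddings = []
for i in range(0, len(documents), batch_size):
    batch = documents[i : i + batch_size]          # batches of 20, 20, 5
    embeddings.extend(embed_documents(batch))

print(len(embeddings))  # 45: exactly one embedding per document
```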
## VLLM Server Setup

The Qdrant step requires a VLLM server for embedding generation. Start the server:

```shell
cd server
python vllm.py
```

The server provides an OpenAI-compatible embeddings API at `/v1/embeddings`.
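For reference, the request body the step sends follows the OpenAI embeddings schema. A sketch (the URL and model name assume the configuration examples above, and the response handling in the comment assumes the standard OpenAI response shape):

```python
import json

# Request body for POST <embedder.url>/v1/embeddings
request_body = {
    "model": "BAAI/bge-base-en-v1.5",
    "input": ["First chunk of text.", "Second chunk of text."],
}

# With e.g. the `requests` library:
#   resp = requests.post("http://localhost:8000/v1/embeddings",
#                        json=request_body,
#                        headers={"Authorization": "Bearer EMPTY"},
#                        timeout=300)
#   vectors = [item["embedding"] for item in resp.json()["data"]]

print(json.dumps(request_body))
```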
## Complete Pipeline Example

```yaml
pipeline:
  inputs:
    path: "processed_documents"
  stages:
    - name: chunking
      config:
        max_chunk_size: 512
        chunk_overlap: 50
    - name: metadata
      config:
        enabled_formats: ["pdf", "markdown"]
        enable_scholar_search: true
    - name: qdrant
      config:
        mode: "qdrant"
        upload_pipeline_metadata: true
        vector_store:
          url: "http://localhost:6333"
          collection_name: "academic_papers"
          batch_size: 100
          vector_size: 768
        embedder:
          url: "http://localhost:8000"
          model_name: "BAAI/bge-base-en-v1.5"
          timeout: 300
```
## Code Reference

### QdrantUploadStep

Bases: `PipelineStep`

Pipeline step for uploading chunked documents to a Qdrant vector database or storing embeddings locally.

Supports two modes:

- `"qdrant"`: Upload embeddings to a Qdrant vector database
- `"local"`: Store embeddings in document metadata without uploading

Source code in `eve/steps/qdrant/qdrant_step.py`:
```python
class QdrantUploadStep(PipelineStep):
    """Pipeline step for uploading chunked documents to Qdrant vector database or storing embeddings locally.

    Supports two modes:
    - "qdrant": Upload embeddings to a Qdrant vector database
    - "local": Store embeddings in document metadata without uploading
    """

    def __init__(self, config: dict, name: str = "QdrantUpload"):
        """Initialize the Qdrant upload step.

        Args:
            config (dict): Configuration dictionary containing:
                - mode (str, optional): "qdrant" or "local". Defaults to "qdrant".
                - use_existing_embeddings (bool, optional): If True, use embeddings
                  from document.embedding field. Defaults to False.
                - upload_pipeline_metadata (bool, optional): If True, include
                  pipeline_metadata in Qdrant payload. Defaults to False.
                - vector_store (dict, required for "qdrant" mode):
                    - url (str): Qdrant instance URL.
                    - api_key (str, optional): API key for Qdrant authentication.
                    - collection_name (str): Target collection name.
                    - batch_size (int): Number of documents per batch.
                    - vector_size (int): Dimension of embedding vectors.
                - embedder (dict, required if use_existing_embeddings=False):
                    - url (str): URL of VLLM embedding server.
                    - model_name (str): Embedding model identifier.
                    - timeout (int, optional): Request timeout in seconds. Defaults to 300.
                    - api_key (str, optional): API key for VLLM. Defaults to "EMPTY".
                - batch_size (int, optional): Batch size for local mode. Defaults to 10.
            name (str, optional): Name for logging purposes. Defaults to "QdrantUpload".

        Raises:
            ValueError: If mode is not "qdrant" or "local".
        """
        super().__init__(config, name)
        # Determine mode: "qdrant" or "local"
        self.mode = config.get("mode", "qdrant").lower()
        if self.mode not in ["qdrant", "local"]:
            raise ValueError(f"Invalid mode '{self.mode}'. Must be 'qdrant' or 'local'")
        # Check if we should use existing embeddings from document.embedding field
        self.use_existing_embeddings = config.get("use_existing_embeddings", False)
        # Check if we should upload pipeline_metadata to Qdrant
        self.upload_pipeline_metadata = config.get("upload_pipeline_metadata", False)
        # Initialize VLLM embedder only if not using existing embeddings
        if not self.use_existing_embeddings:
            embedding_cfg = config["embedder"]
            self.embedder = VLLMEmbedder(
                url=embedding_cfg["url"],
                model_name=embedding_cfg["model_name"],
                timeout=embedding_cfg.get("timeout", 300),
                api_key=embedding_cfg.get("api_key", "EMPTY"),
            )
            self.logger.info(f"Initialized VLLM embedder at {embedding_cfg['url']}")
        else:
            self.embedder = None
            self.logger.info("Using existing embeddings from document metadata")
        self.logger.info(f"Mode: {self.mode}")
        # Initialize Qdrant-specific configuration only if mode is "qdrant"
        if self.mode == "qdrant":
            vector_store_cfg = config.get("vector_store", {})
            self.qdrant_url = vector_store_cfg.get("url", "http://localhost:6333")
            self.vector_store_api_key = vector_store_cfg.get("api_key")
            # Get collection configuration
            self.collection_name = vector_store_cfg["collection_name"]
            self.batch_size = vector_store_cfg["batch_size"]
            self.vector_size = vector_store_cfg["vector_size"]
            # Initialize Qdrant client
            self.client = QdrantClient(
                url=self.qdrant_url, api_key=self.vector_store_api_key
            )
            # Ensure collection exists
            self._ensure_collection()
            self.existing_ids = self._get_existing_ids()
        else:
            # Local mode: set batch size for processing
            self.batch_size = config.get("batch_size", 10)
            self.client = None

    def _ensure_collection(self) -> None:
        """Create Qdrant collection if it doesn't exist.

        Creates a new collection with optimized settings including HNSW indexing,
        binary quantization, and on-disk storage. Also creates payload indexes
        for efficient filtering.
        """
        if self.client.collection_exists(self.collection_name):
            self.logger.info(f"Collection '{self.collection_name}' already exists")
            return
        self.logger.info(f"Creating collection '{self.collection_name}'")
        # Create collection with optimized settings
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=models.VectorParams(
                size=self.vector_size,
                distance=models.Distance.COSINE,
                on_disk=True,
            ),
            shard_number=8,
            on_disk_payload=True,
            quantization_config=models.BinaryQuantization(
                binary=models.BinaryQuantizationConfig(always_ram=False)
            ),
        )
        # Update HNSW configuration
        self.client.update_collection(
            collection_name=self.collection_name,
            hnsw_config=models.HnswConfigDiff(
                m=16,
                ef_construct=128,
                full_scan_threshold=10_000,
                max_indexing_threads=2,
                on_disk=True,
            ),
        )
        # Update optimizer configuration
        self.client.update_collection(
            collection_name=self.collection_name,
            optimizers_config=models.OptimizersConfigDiff(
                indexing_threshold=20000,
                memmap_threshold=5000,
                deleted_threshold=0.2,
                vacuum_min_vector_number=1000,
                default_segment_number=2,
                max_segment_size=5_000_000,
                max_optimization_threads=2,
            ),
        )
        # Create payload indexes
        self._create_payload_indexes()
        self.logger.info("Collection created and optimized")

    def _create_payload_indexes(self) -> None:
        """Create indexes on payload fields for efficient filtering.

        Creates text indexes for 'title' and 'journal' fields, and integer
        indexes for 'year' and 'n_citations' fields to enable fast filtering
        and searching on these metadata fields.
        """
        # Text index for title
        self.client.create_payload_index(
            collection_name=self.collection_name,
            field_name="title",
            field_schema=models.TextIndexParams(
                type="text",
                tokenizer=models.TokenizerType.WORD,
                min_token_len=2,
                max_token_len=50,
                lowercase=True,
            ),
        )
        # Integer indexes
        for field in ["year", "n_citations"]:
            self.client.create_payload_index(
                collection_name=self.collection_name,
                field_name=field,
                field_schema="integer",
            )
        # Text index for journal
        self.client.create_payload_index(
            collection_name=self.collection_name,
            field_name="journal",
            field_schema=models.TextIndexParams(
                type="text",
                tokenizer=models.TokenizerType.WORD,
                min_token_len=1,
                max_token_len=50,
                lowercase=True,
            ),
        )

    @staticmethod
    def _string_to_uint(s: str) -> int:
        """Convert string to unsigned integer using SHA256 hash.

        Args:
            s (str): Input string to hash.

        Returns:
            int: Unsigned 64-bit integer derived from the hash.
        """
        hash_bytes = hashlib.sha256(s.encode("utf-8")).digest()
        return int.from_bytes(hash_bytes[:8], byteorder="big", signed=False)

    def _get_existing_ids(self) -> Set[int]:
        """Retrieve all existing point IDs from the collection with retry logic.

        Scrolls through the entire collection to fetch all point IDs. Implements
        retry logic with a fixed delay to handle temporary failures.

        Returns:
            Set[int]: Set of existing point IDs in the collection.

        Raises:
            Exception: If scroll request fails after max retries.
        """
        existing_ids = set()
        scroll_offset = None
        max_retries = 3
        retry_delay = 5
        while True:
            response = None
            for attempt in range(max_retries):
                try:
                    response = self.client.scroll(
                        collection_name=self.collection_name,
                        offset=scroll_offset,
                        limit=10000,
                        with_payload=False,
                        with_vectors=False,
                        timeout=3000,
                    )
                    break  # Success, exit retry loop
                except Exception as e:
                    if attempt < max_retries - 1:
                        self.logger.warning(
                            f"Scroll request failed (attempt {attempt + 1}/{max_retries}): {e}"
                        )
                        self.logger.info(f"Retrying in {retry_delay}s...")
                        time.sleep(retry_delay)
                    else:
                        self.logger.error(
                            f"Scroll request failed after {max_retries} attempts: {e}"
                        )
                        raise
            for point in response[0]:
                existing_ids.add(point.id)
            if response[1] is None:
                break
            scroll_offset = response[1]
        return existing_ids

    def _upload_batch(
        self,
        batch_ids: List[int],
        batch_chunks: List[str],
        batch_metadata: List[dict],
        batch_embeddings: List[List[float]] = None,
    ) -> None:
        """Upload a batch of documents to Qdrant.

        Generates embeddings (if not provided) and uploads points to Qdrant.
        Implements retry logic with up to 3 attempts on failure.

        Args:
            batch_ids (List[int]): List of unique point IDs.
            batch_chunks (List[str]): List of text chunks to embed.
            batch_metadata (List[dict]): List of metadata dictionaries for each point.
            batch_embeddings (List[List[float]], optional): Pre-computed embeddings
                to use instead of generating new ones. Defaults to None.
        """
        # Generate embeddings if not provided
        if batch_embeddings is None:
            if self.use_existing_embeddings:
                self.logger.error("use_existing_embeddings=True but no embeddings provided")
                return
            try:
                batch_vectors = self.embedder.embed_documents(batch_chunks)
            except Exception as e:
                self.logger.error(f"Embedding error: {e}")
                return
        else:
            batch_vectors = batch_embeddings
        points = [
            PointStruct(id=id_val, vector=vec, payload=meta)
            for id_val, vec, meta in zip(batch_ids, batch_vectors, batch_metadata)
        ]
        for attempt in range(3):
            try:
                self.client.upload_points(
                    collection_name=self.collection_name,
                    points=points,
                    parallel=10,
                    max_retries=3,
                )
                return
            except Exception as e:
                self.logger.error(f"Error uploading batch (attempt {attempt + 1}): {e}")
                time.sleep(10)
                if attempt < 2:
                    self.logger.info("Retrying...")
                else:
                    self.logger.warning("Skipping batch after 3 failed attempts")

    def _prepare_metadata(self, doc: Document) -> dict:
        """Prepare metadata from Document object for Qdrant storage.

        Extracts and cleans metadata fields, performing type conversions and
        formatting for Qdrant compatibility. Includes document content, user
        metadata (unwrapped), and optionally pipeline metadata (wrapped).

        Args:
            doc (Document): Document object containing content and metadata.

        Returns:
            dict: Dictionary with cleaned metadata ready for Qdrant payload.
                Includes 'content' field, all metadata fields at root level,
                and optionally 'pipeline_metadata' as a nested dict.
        """
        payload = {}
        # Add content
        payload["content"] = doc.content
        # Add original metadata fields directly to root level (unwrapped)
        if doc.metadata:
            # Clean up metadata types
            metadata_copy = doc.metadata.copy()
            # Ensure year is an integer if present
            if "year" in metadata_copy:
                try:
                    metadata_copy["year"] = int(float(metadata_copy["year"]))
                except (ValueError, TypeError):
                    metadata_copy["year"] = None
            # Ensure title is a string if present
            if "title" in metadata_copy:
                metadata_copy["title"] = str(metadata_copy["title"])
            # Add all metadata fields directly to payload (unwrapped)
            payload.update(metadata_copy)
        # Add pipeline_metadata as wrapped dict if configured
        if self.upload_pipeline_metadata and doc.pipeline_metadata:
            payload["pipeline_metadata"] = doc.pipeline_metadata.copy()
        return payload

    async def execute(self, documents: List[Document]) -> List[Document]:
        """Execute the upload step.

        Routes to appropriate execution method based on configured mode
        (local or qdrant).

        Args:
            documents (List[Document]): List of Document objects to process.

        Returns:
            List[Document]: The same list of documents passed through for
                pipeline chaining.
        """
        if not documents:
            self.logger.warning("No documents to process")
            return documents
        self.logger.info(f"Processing {len(documents)} documents")
        if self.mode == "local":
            # Local mode: add embeddings to document metadata
            return await self._execute_local(documents)
        else:
            # Qdrant mode: upload to vector database
            return await self._execute_qdrant(documents)

    async def _execute_local(self, documents: List[Document]) -> List[Document]:
        """Execute local embedding storage.

        Generates embeddings for documents and stores them in the document.embedding
        field without uploading to Qdrant. Processes documents in configurable batches.

        Args:
            documents (List[Document]): List of Document objects to process.

        Returns:
            List[Document]: Documents with embeddings added to embedding field.
        """
        self.logger.info("Generating embeddings in local mode")
        # Process in batches
        for i in tqdm(
            range(0, len(documents), self.batch_size),
            desc="Generating embeddings",
        ):
            batch = documents[i : i + self.batch_size]
            batch_texts = [doc.content for doc in batch]
            try:
                # Generate embeddings
                batch_embeddings = self.embedder.embed_documents(batch_texts)
                # Add embeddings to document.embedding field
                for doc, embedding in zip(batch, batch_embeddings):
                    doc.embedding = embedding
            except Exception as e:
                self.logger.error(f"Error generating embeddings for batch: {e}")
                # Continue with next batch
        self.logger.info(f"Successfully generated embeddings for {len(documents)} documents")
        return documents

    async def _execute_qdrant(self, documents: List[Document]) -> List[Document]:
        """Execute Qdrant upload mode.

        Prepares document data, generates or extracts embeddings, filters out
        existing documents, and uploads to Qdrant in batches.

        Args:
            documents (List[Document]): List of Document objects to upload.

        Returns:
            List[Document]: The same list of documents passed through for
                pipeline chaining.
        """
        # Prepare data for upload
        ids = []
        chunks = []
        metadata = []
        embeddings = [] if self.use_existing_embeddings else None
        for doc in documents:
            # Create unique ID based on file path and content hash
            doc_id = (
                f"{doc.filename}_{hashlib.md5(doc.content.encode()).hexdigest()[:8]}"
            )
            ids.append(doc_id)
            chunks.append(doc.content)
            metadata.append(self._prepare_metadata(doc))
            # Extract existing embeddings if configured
            if self.use_existing_embeddings:
                if doc.embedding is None:
                    self.logger.error(f"Document {doc.filename} missing embedding")
                    # Skip this document
                    ids.pop()
                    chunks.pop()
                    metadata.pop()
                    continue
                embeddings.append(doc.embedding)
        # Convert to uint IDs
        uint_ids = [self._string_to_uint(id_str) for id_str in ids]
        if self.use_existing_embeddings:
            to_process = list(zip(uint_ids, chunks, metadata, embeddings, ids))
        else:
            to_process = list(zip(uint_ids, chunks, metadata, ids))
        # Filter out documents whose IDs already exist in the collection
        to_process = [item for item in to_process if item[0] not in self.existing_ids]
        skipped = len(uint_ids) - len(to_process)
        self.logger.info(f"Skipping {skipped} existing documents")
        self.logger.info(f"Uploading {len(to_process)} new vectors")
        # Upload in batches
        for i in tqdm(
            range(0, len(to_process), self.batch_size),
            desc=f"Uploading to {self.collection_name}",
        ):
            batch = to_process[i : i + self.batch_size]
            batch_ids = [item[0] for item in batch]
            batch_chunks = [item[1] for item in batch]
            batch_metadata = [item[2] for item in batch]
            if self.use_existing_embeddings:
                batch_embeddings = [item[3] for item in batch]
                self._upload_batch(batch_ids, batch_chunks, batch_metadata, batch_embeddings)
            else:
                self._upload_batch(batch_ids, batch_chunks, batch_metadata)
        self.logger.info(f"Successfully uploaded {len(to_process)} documents")
        # Return documents for potential further processing
        return documents
```
#### `__init__(config, name='QdrantUpload')`

Initialize the Qdrant upload step.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `config` | `dict` | Configuration dictionary; see the keys documented under Configuration Parameters above. | required |
| `name` | `str` | Name for logging purposes. | `'QdrantUpload'` |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If mode is not `"qdrant"` or `"local"`. |
#### `execute(documents)` (async)

Execute the upload step. Routes to the appropriate execution method based on the configured mode (local or qdrant).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `documents` | `List[Document]` | List of Document objects to process. | required |

Returns:

| Type | Description |
|------|-------------|
| `List[Document]` | The same list of documents, passed through for pipeline chaining. |
Source code in eve/steps/qdrant/qdrant_step.py
async def execute(self, documents: List[Document]) -> List[Document]:
"""Execute the upload step.
Routes to appropriate execution method based on configured mode
(local or qdrant).
Args:
documents (List[Document]): List of Document objects to process.
Returns:
List[Document]: The same list of documents passed through for
pipeline chaining.
"""
if not documents:
self.logger.warning("No documents to process")
return documents
self.logger.info(f"Processing {len(documents)} documents")
if self.mode == "local":
# Local mode: add embeddings to document metadata
return await self._execute_local(documents)
else:
# Qdrant mode: upload to vector database
return await self._execute_qdrant(documents)
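The pass-through contract above means `execute()` returns the very list it receives, so stages can be chained. A minimal stub (hypothetical, with plain strings standing in for `Document` objects) illustrates the contract:

```python
import asyncio
from typing import List

class StubUploadStage:
    """Hypothetical stand-in for the Qdrant step, illustrating the contract."""
    async def execute(self, documents: List[str]) -> List[str]:
        if not documents:
            return documents  # nothing to process; still passed through
        # ... embedding / upload work would happen here ...
        return documents

docs = ["doc-a", "doc-b"]
result = asyncio.run(StubUploadStage().execute(docs))
```

Downstream stages receive the same objects, so any metadata (or embeddings, in local mode) attached in place remains visible to them.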
VLLMEmbedder
Client for VLLM embedding server.
This class provides a simple interface to interact with a VLLM server
that exposes an OpenAI-compatible embeddings API endpoint.
Source code in eve/steps/qdrant/qdrant_step.py
class VLLMEmbedder:
"""Client for VLLM embedding server.
This class provides a simple interface to interact with a VLLM server
that exposes an OpenAI-compatible embeddings API endpoint.
"""
def __init__(
self, url: str, model_name: str, timeout: int = 300, api_key: str = "EMPTY"
):
"""Initialize VLLM embedder client.
Args:
url (str): Base URL of the VLLM server.
model_name (str): Name of the embedding model to use.
timeout (int, optional): Request timeout in seconds. Defaults to 300.
api_key (str, optional): API key for authentication. Use "EMPTY" for
local servers. Defaults to "EMPTY".
"""
self.url = url.rstrip("/")
self.model_name = model_name
self.timeout = timeout
self.api_key = api_key
# Set up headers with API key
headers = (
{"Authorization": f"Bearer {api_key}"}
if api_key and api_key != "EMPTY"
else {}
)
self.client = httpx.Client(timeout=timeout, headers=headers)
def embed_documents(self, texts: List[str]) -> List[List[float] | None]:
"""Generate embeddings for a list of texts.
Sends requests to the VLLM server's /v1/embeddings endpoint to generate
embeddings for each text. Failed requests return None for that text.
Args:
texts (List[str]): List of text strings to embed.
Returns:
List[List[float] | None]: List of embedding vectors. Each element is
either a list of floats (the embedding) or None if embedding failed.
"""
embeddings = []
for text in texts:
endpoint = f"{self.url}/v1/embeddings"
payload = {"input": [text], "model": self.model_name, "encoding_format": "float"}
try:
response = self.client.post(endpoint, json=payload)
response.raise_for_status()
result = response.json()
# Extract embedding (single document)
embedding = result["data"][0]["embedding"]
embeddings.append(embedding)
except httpx.HTTPError as e:
print(f"VLLM embedding request failed: {e}")
embeddings.append(None)
except KeyError as e:
embeddings.append(None)
print(f"Unexpected response format from VLLM server: {e}")
return embeddings
def __del__(self):
"""Clean up HTTP client."""
if hasattr(self, "client"):
self.client.close()
__del__()
Clean up HTTP client.
Source code in eve/steps/qdrant/qdrant_step.py
def __del__(self):
    """Clean up HTTP client."""
    if hasattr(self, "client"):
        self.client.close()
__init__(url, model_name, timeout=300, api_key='EMPTY')
Initialize VLLM embedder client.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| url | str | Base URL of the VLLM server. | required |
| model_name | str | Name of the embedding model to use. | required |
| timeout | int | Request timeout in seconds. Defaults to 300. | 300 |
| api_key | str | API key for authentication. Use "EMPTY" for local servers. Defaults to "EMPTY". | 'EMPTY' |
Source code in eve/steps/qdrant/qdrant_step.py
def __init__(
    self, url: str, model_name: str, timeout: int = 300, api_key: str = "EMPTY"
):
    """Initialize VLLM embedder client.

    Args:
        url (str): Base URL of the VLLM server.
        model_name (str): Name of the embedding model to use.
        timeout (int, optional): Request timeout in seconds. Defaults to 300.
        api_key (str, optional): API key for authentication. Use "EMPTY" for
            local servers. Defaults to "EMPTY".
    """
    self.url = url.rstrip("/")
    self.model_name = model_name
    self.timeout = timeout
    self.api_key = api_key
    # Set up headers with API key
    headers = (
        {"Authorization": f"Bearer {api_key}"}
        if api_key and api_key != "EMPTY"
        else {}
    )
    self.client = httpx.Client(timeout=timeout, headers=headers)
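The header rule in `__init__` can be isolated as a small helper (`auth_headers` is hypothetical, for illustration): a Bearer token is sent only for real keys, while `"EMPTY"`, the convention for local unauthenticated servers, produces no Authorization header at all.

```python
# Hypothetical helper mirroring the header rule in __init__ above.
def auth_headers(api_key: str) -> dict:
    if api_key and api_key != "EMPTY":
        return {"Authorization": f"Bearer {api_key}"}
    return {}
```

An empty string behaves like `"EMPTY"` here, since both fail the truthiness-or-sentinel check.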
embed_documents(texts)
Generate embeddings for a list of texts.
Sends requests to the VLLM server's /v1/embeddings endpoint to generate
embeddings for each text. Failed requests return None for that text.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| texts | List[str] | List of text strings to embed. | required |

Returns:

| Type | Description |
| --- | --- |
| List[List[float] \| None] | List of embedding vectors. Each element is either a list of floats (the embedding) or None if embedding failed. |
Source code in eve/steps/qdrant/qdrant_step.py
def embed_documents(self, texts: List[str]) -> List[List[float] | None]:
    """Generate embeddings for a list of texts.

    Sends requests to the VLLM server's /v1/embeddings endpoint to generate
    embeddings for each text. Failed requests return None for that text.

    Args:
        texts (List[str]): List of text strings to embed.

    Returns:
        List[List[float] | None]: List of embedding vectors. Each element is
        either a list of floats (the embedding) or None if embedding failed.
    """
    embeddings = []
    for text in texts:
        endpoint = f"{self.url}/v1/embeddings"
        payload = {"input": [text], "model": self.model_name, "encoding_format": "float"}
        try:
            response = self.client.post(endpoint, json=payload)
            response.raise_for_status()
            result = response.json()
            # Extract embedding (single document)
            embedding = result["data"][0]["embedding"]
            embeddings.append(embedding)
        except httpx.HTTPError as e:
            print(f"VLLM embedding request failed: {e}")
            embeddings.append(None)
        except KeyError as e:
            embeddings.append(None)
            print(f"Unexpected response format from VLLM server: {e}")
    return embeddings
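Because `embed_documents()` returns `None` for any text whose request failed, callers should pair results with their inputs and drop the failures before uploading. A hypothetical helper (`pair_successful` is not part of the step's API) sketches that pattern:

```python
from typing import List, Optional, Tuple

def pair_successful(
    texts: List[str], embeddings: List[Optional[List[float]]]
) -> List[Tuple[str, List[float]]]:
    # Keep only (text, embedding) pairs whose request succeeded.
    return [(t, e) for t, e in zip(texts, embeddings) if e is not None]

pairs = pair_successful(["a", "b", "c"], [[0.1], None, [0.2]])
```

Zipping against the original texts preserves the alignment that the positional `None` placeholders exist to guarantee.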