Qdrant Upload Stage

The Qdrant upload stage generates embeddings for documents and optionally uploads them to a Qdrant vector database for semantic search and retrieval.

Features

  • Dual Mode Operation: Upload to Qdrant database or store embeddings locally
  • Flexible Embedding Sources: Generate new embeddings or use existing ones from metadata
  • Automatic Deduplication: Skips documents already present in the collection
  • Batch Processing: Efficient batch upload with configurable size
  • Optimized Collection Setup: Automatic creation with HNSW indexing and quantization
  • Metadata Indexing: Creates indexes on key fields for efficient filtering
  • Retry Logic: Automatic retry on failures with configurable attempts

Operating Modes

Qdrant Mode (Default)

Uploads document embeddings to a Qdrant vector database:

- name: qdrant
  config:
    mode: "qdrant"
    vector_store:
      url: "http://localhost:6333"
      api_key: "your-api-key"  # Optional
      collection_name: "my_documents"
      batch_size: 100
      vector_size: 768
    embedder:
      url: "http://localhost:8000"
      model_name: "BAAI/bge-base-en-v1.5"
      timeout: 300
      api_key: "EMPTY"  # Optional

Local Mode

Generates embeddings and stores them in document metadata without uploading:

- name: qdrant
  config:
    mode: "local"
    batch_size: 10
    embedder:
      url: "http://localhost:8000"
      model_name: "BAAI/bge-base-en-v1.5"
      timeout: 300

Configuration Parameters

Mode Configuration

mode

  • Type: String
  • Default: "qdrant"
  • Options: "qdrant", "local"
  • Description: Operating mode: "qdrant" uploads embeddings to a Qdrant database; "local" stores them in document metadata

use_existing_embeddings

  • Type: Boolean
  • Default: false
  • Description: Use embeddings already stored in document.embedding field instead of generating new ones

upload_pipeline_metadata

  • Type: Boolean
  • Default: false
  • Description: Include pipeline processing metadata in the Qdrant payload

batch_size

  • Type: Integer
  • Default: 10 (local mode)
  • Description: Number of documents to process per batch (local mode only; Qdrant mode uses vector_store.batch_size)

Vector Store Configuration (Qdrant Mode)

vector_store.url

  • Type: String
  • Default: "http://localhost:6333"
  • Description: URL of the Qdrant instance

vector_store.api_key

  • Type: String
  • Default: None
  • Description: API key for Qdrant authentication (optional for local instances)

vector_store.collection_name

  • Type: String
  • Required: Yes (for Qdrant mode)
  • Description: Name of the target collection in Qdrant

vector_store.batch_size

  • Type: Integer
  • Required: Yes (for Qdrant mode)
  • Description: Number of documents to upload per batch

vector_store.vector_size

  • Type: Integer
  • Required: Yes (for Qdrant mode)
  • Description: Dimension of embedding vectors (must match model output)

Embedder Configuration

embedder.url

  • Type: String
  • Required: Yes (unless use_existing_embeddings: true)
  • Description: URL of the VLLM embedding server

embedder.model_name

  • Type: String
  • Required: Yes (unless use_existing_embeddings: true)
  • Description: Name of the embedding model

embedder.timeout

  • Type: Integer
  • Default: 300
  • Description: Request timeout in seconds

embedder.api_key

  • Type: String
  • Default: "EMPTY"
  • Description: API key for VLLM authentication (use "EMPTY" for local servers)

Collection Optimization

When creating a new collection, the step automatically applies optimized settings for production use. These settings balance search quality, speed, and resource usage for large-scale document collections.

Learn more about Qdrant Collections →

Vector Configuration

Distance Metric: COSINE

Cosine similarity measures the angle between vectors, making it ideal for text embeddings where the direction matters more than magnitude. This is the standard choice for semantic search applications.

[Read about Distance Metrics →](https://qdrant.tech/documentation/concepts/search/#metrics)

On-Disk Storage: Enabled

Stores vectors on disk rather than in RAM, significantly reducing memory requirements for large collections. While slightly slower than in-memory storage, this allows you to store millions of vectors affordably.

Shards: 8

Distributes data across 8 shards for parallel processing. More shards improve write throughput and allow better resource utilization, which is especially important for large datasets.

[Learn about Sharding →](https://qdrant.tech/documentation/guides/distributed_deployment/)

HNSW Index Parameters

HNSW (Hierarchical Navigable Small World) is the indexing algorithm that enables fast approximate nearest neighbor search.

Deep dive into HNSW →

m: 16

Number of bidirectional links created for each node. Higher values improve search quality but increase memory usage and indexing time. 16 is a balanced choice for most applications.

  • Lower (4-8): Less memory, faster indexing, slightly lower recall
  • Higher (32-64): Better recall, more memory, slower indexing

ef_construct: 128

Size of the dynamic candidate list during index construction. Higher values produce better index quality but take longer to build. 128 provides good quality without excessive build time.

  • Lower (64): Faster indexing, slightly lower search quality
  • Higher (256-512): Better search quality, slower indexing

full_scan_threshold: 10,000

When collection size is below this threshold, Qdrant uses exact (brute-force) search instead of the HNSW index. Exact search is faster for small collections.

max_indexing_threads: 2

Limits CPU cores used during indexing. Prevents indexing from consuming all available resources.

on_disk: Enabled

Stores the HNSW graph on disk to reduce RAM usage. Essential for collections with millions of vectors.

Quantization

Binary quantization compresses vectors from 32-bit floats to 1-bit representations, reducing memory by ~32x with minimal quality loss. This makes it possible to store much larger collections.

Learn about Quantization →

Type: Binary Quantization

Converts vector components to binary (0 or 1) for massive memory savings. The original vectors are still used for final re-ranking, so search quality remains high.

Benefits:

  • 32x memory reduction (32-bit float → 1-bit)
  • Faster distance calculations
  • More vectors fit in RAM for better performance
  • Negligible impact on search quality (typically <2% recall loss)
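
Because the original vectors are kept for re-ranking, rescoring behavior can be tuned at query time. As a hedged sketch following Qdrant's REST-style search `params` schema (adapt the dict to whichever client you use):

```python
def quantized_search_params(rescore=True, oversampling=2.0):
    """Build Qdrant REST-style search `params` controlling quantization.

    rescore=True re-ranks candidates with the original (non-quantized)
    vectors; oversampling fetches extra candidates before rescoring.
    """
    return {
        "quantization": {
            "ignore": False,  # use the quantized index for the first pass
            "rescore": rescore,
            "oversampling": oversampling,
        }
    }
```

Passing this as the search request's `params` trades a small amount of latency for recall close to unquantized search.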

always_ram: false

Allows quantized vectors to be stored on disk when needed, rather than always keeping them in RAM. This provides flexibility for very large collections.

Optimizer Settings

These settings control how Qdrant manages and optimizes data segments over time.

Learn about Storage Optimization →

indexing_threshold: 20,000

Build HNSW index when segment reaches this size. Smaller values create indexes sooner but may cause more frequent rebuilds.

memmap_threshold: 5,000

Use memory-mapped files for segments larger than this. Memory mapping allows efficient disk-based storage without loading everything into RAM.

max_segment_size: 5,000,000

Maximum vectors per segment. Larger segments are more memory-efficient but may slow down some operations.

max_optimization_threads: 2

CPU cores dedicated to background optimization tasks. Prevents optimization from impacting query performance.

Payload Indexes

Payload indexes enable fast filtering on metadata fields, similar to database indexes. Without these, filtering requires scanning all documents.

Learn about Payload Indexes →

The pipeline automatically creates indexes on common academic metadata fields:

Text Indexes (title, journal)

Enable full-text search and filtering on text fields. The word tokenizer splits text into searchable terms.

  • min_token_len: Minimum word length to index
  • max_token_len: Maximum word length to index
  • lowercase: Normalize to lowercase for case-insensitive search

Example filters:

  • Find papers with "neural" in title
  • Filter by journal name
  • Combine with vector search for semantic + keyword search
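
A hedged sketch of such a filter in Qdrant's REST-style filter format (keys follow the documented filter schema; the helper name is illustrative):

```python
def title_text_filter(term, journal=None):
    """Filter on the text-indexed `title` (and optionally `journal`) fields."""
    conditions = [{"key": "title", "match": {"text": term}}]
    if journal is not None:
        conditions.append({"key": "journal", "match": {"text": journal}})
    return {"must": conditions}
```

Attaching this filter to a vector search request yields combined semantic + keyword retrieval.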

Integer Indexes (year, n_citations)

Enable efficient range queries on numeric fields.

Example filters:

  • Papers published after 2020
  • Papers with >100 citations
  • Combine filters: papers from 2015-2023 with >50 citations
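
The combined range filter can be sketched the same way, using Qdrant's REST-style `range` conditions over the integer-indexed fields (helper name is illustrative):

```python
def citation_range_filter(year_from, year_to, min_citations):
    """Range filter over the integer-indexed `year` and `n_citations` fields."""
    return {
        "must": [
            {"key": "year", "range": {"gte": year_from, "lte": year_to}},
            {"key": "n_citations", "range": {"gt": min_citations}},
        ]
    }
```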

Performance Impact:

  • Indexes speed up filtering by 100-1000x
  • Small storage overhead (~10-20% of original data)
  • Slightly slower writes (indexes must be updated)

Stage Behavior

Metadata Handling

Document metadata is prepared for storage:

  • content: Document text content
  • metadata: All user metadata fields (unwrapped at root level)
  • pipeline_metadata: Processing metadata (if upload_pipeline_metadata: true)

Type conversions:

  • year field converted to integer
  • title field converted to string
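
A minimal sketch of the resulting payload for one chunk, mirroring the stage's `_prepare_metadata` logic (the sample metadata values are illustrative):

```python
def prepare_payload(content, metadata):
    """Build a payload: content plus metadata unwrapped at root level."""
    payload = {"content": content}
    meta = dict(metadata)
    if "year" in meta:
        try:
            meta["year"] = int(float(meta["year"]))  # e.g. "2021.0" -> 2021
        except (ValueError, TypeError):
            meta["year"] = None
    if "title" in meta:
        meta["title"] = str(meta["title"])
    payload.update(meta)
    return payload
```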

Error Handling

  • Batch uploads retry up to 3 times on failure
  • Failed batches are logged and skipped
  • Individual embedding failures are logged without stopping the pipeline
  • Scroll operations retry with exponential backoff

Usage Examples

Basic Upload with New Embeddings

- name: qdrant
  config:
    mode: "qdrant"
    vector_store:
      url: "http://localhost:6333"
      collection_name: "research_papers"
      batch_size: 100
      vector_size: 768
    embedder:
      url: "http://localhost:8000"
      model_name: "BAAI/bge-base-en-v1.5"

Upload with Existing Embeddings

If embeddings were generated in a previous step:

- name: qdrant
  config:
    mode: "qdrant"
    use_existing_embeddings: true
    upload_pipeline_metadata: true
    vector_store:
      url: "http://localhost:6333"
      api_key: "your-api-key"
      collection_name: "processed_docs"
      batch_size: 50
      vector_size: 1024

Local Embedding Generation

Store embeddings in document metadata without uploading:

- name: qdrant
  config:
    mode: "local"
    batch_size: 20
    embedder:
      url: "http://localhost:8000"
      model_name: "sentence-transformers/all-MiniLM-L6-v2"
      timeout: 600

VLLM Server Setup

The Qdrant step requires a VLLM server for embedding generation. Start the server:

cd server
python vllm.py

The server provides an OpenAI-compatible embeddings API at /v1/embeddings.
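
To sanity-check the server, you can call the endpoint directly. A minimal sketch using only the standard library (the URL and model name are the example values from the configs above, and the response shape assumes the standard OpenAI embeddings format):

```python
import json
import urllib.request

def build_embedding_request(texts, model="BAAI/bge-base-en-v1.5"):
    """JSON body for an OpenAI-compatible /v1/embeddings call."""
    return {"model": model, "input": texts}

def embed_texts(texts, url="http://localhost:8000", timeout=300):
    body = json.dumps(build_embedding_request(texts)).encode("utf-8")
    req = urllib.request.Request(
        f"{url}/v1/embeddings",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        data = json.load(resp)
    # Response shape: {"data": [{"index": 0, "embedding": [...]}, ...]}
    return [item["embedding"] for item in data["data"]]
```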

Complete Pipeline Example

pipeline:
  inputs:
    path: "processed_documents"

  stages:
    - name: chunking
      config:
        max_chunk_size: 512
        chunk_overlap: 50

    - name: metadata
      config:
        enabled_formats: ["pdf", "markdown"]
        enable_scholar_search: true

    - name: qdrant
      config:
        mode: "qdrant"
        upload_pipeline_metadata: true
        vector_store:
          url: "http://localhost:6333"
          collection_name: "academic_papers"
          batch_size: 100
          vector_size: 768
        embedder:
          url: "http://localhost:8000"
          model_name: "BAAI/bge-base-en-v1.5"
          timeout: 300

Code Reference

QdrantUploadStep

Bases: PipelineStep

Pipeline step for uploading chunked documents to Qdrant vector database or storing embeddings locally.

Supports two modes:

  • "qdrant": Upload embeddings to a Qdrant vector database
  • "local": Store embeddings in document metadata without uploading
Source code in eve/steps/qdrant/qdrant_step.py
class QdrantUploadStep(PipelineStep):
    """Pipeline step for uploading chunked documents to Qdrant vector database or storing embeddings locally.

    Supports two modes:

    - "qdrant": Upload embeddings to a Qdrant vector database
    - "local": Store embeddings in document metadata without uploading
    """

    def __init__(self, config: dict, name: str = "QdrantUpload"):
        """Initialize the Qdrant upload step.

        Args:
            config (dict): Configuration dictionary containing:

                - mode (str, optional): "qdrant" or "local". Defaults to "qdrant".
                - use_existing_embeddings (bool, optional): If True, use embeddings
                    from document.embedding field. Defaults to False.
                - upload_pipeline_metadata (bool, optional): If True, include
                    pipeline_metadata in Qdrant payload. Defaults to False.
                - vector_store (dict, required for "qdrant" mode):
                    - url (str): Qdrant instance URL.
                    - api_key (str, optional): API key for Qdrant authentication.
                    - collection_name (str): Target collection name.
                    - batch_size (int): Number of documents per batch.
                    - vector_size (int): Dimension of embedding vectors.
                - embedder (dict, required if use_existing_embeddings=False):
                    - url (str): URL of VLLM embedding server.
                    - model_name (str): Embedding model identifier.
                    - timeout (int, optional): Request timeout in seconds. Defaults to 300.
                    - api_key (str, optional): API key for VLLM. Defaults to "EMPTY".
                - batch_size (int, optional): Batch size for local mode. Defaults to 10.
            name (str, optional): Name for logging purposes. Defaults to "QdrantUpload".

        Raises:
            ValueError: If mode is not "qdrant" or "local".
        """
        super().__init__(config, name)

        # Determine mode: "qdrant" or "local"
        self.mode = config.get("mode", "qdrant").lower()

        if self.mode not in ["qdrant", "local"]:
            raise ValueError(f"Invalid mode '{self.mode}'. Must be 'qdrant' or 'local'")

        # Check if we should use existing embeddings from document.embedding field
        self.use_existing_embeddings = config.get("use_existing_embeddings", False)

        # Check if we should upload pipeline_metadata to Qdrant
        self.upload_pipeline_metadata = config.get("upload_pipeline_metadata", False)

        # Initialize VLLM embedder only if not using existing embeddings
        if not self.use_existing_embeddings:
            embedding_cfg = config["embedder"]
            self.embedder = VLLMEmbedder(
                url=embedding_cfg["url"],
                model_name=embedding_cfg["model_name"],
                timeout=embedding_cfg.get("timeout", 300),
                api_key=embedding_cfg.get("api_key", "EMPTY"),
            )
            self.logger.info(f"Initialized VLLM embedder at {embedding_cfg['url']}")
        else:
            self.embedder = None
            self.logger.info("Using existing embeddings from document metadata")

        self.logger.info(f"Mode: {self.mode}")

        # Initialize Qdrant-specific configuration only if mode is "qdrant"
        if self.mode == "qdrant":
            vector_store_cfg = config.get("vector_store", {})
            self.qdrant_url = vector_store_cfg.get("url", "http://localhost:6333")
            self.vector_store_api_key = vector_store_cfg.get("api_key")

            # Get collection configuration
            self.collection_name = vector_store_cfg["collection_name"]
            self.batch_size = vector_store_cfg["batch_size"]
            self.vector_size = vector_store_cfg["vector_size"]

            # Initialize Qdrant client
            self.client = QdrantClient(
                url=self.qdrant_url, api_key=self.vector_store_api_key
            )

            # Ensure collection exists
            self._ensure_collection()
            self.existing_ids = self._get_existing_ids()
        else:
            # Local mode: set batch size for processing
            self.batch_size = config.get("batch_size", 10)
            self.client = None

    def _ensure_collection(self) -> None:
        """Create Qdrant collection if it doesn't exist.

        Creates a new collection with optimized settings including HNSW indexing,
        binary quantization, and on-disk storage. Also creates payload indexes
        for efficient filtering.
        """
        if self.client.collection_exists(self.collection_name):
            self.logger.info(f"Collection '{self.collection_name}' already exists")
            return

        self.logger.info(f"Creating collection '{self.collection_name}'")

        # Create collection with optimized settings
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=models.VectorParams(
                size=self.vector_size,
                distance=models.Distance.COSINE,
                on_disk=True,
            ),
            shard_number=8,
            on_disk_payload=True,
            quantization_config=models.BinaryQuantization(
                binary=models.BinaryQuantizationConfig(always_ram=False)
            ),
        )

        # Update HNSW configuration
        self.client.update_collection(
            collection_name=self.collection_name,
            hnsw_config=models.HnswConfigDiff(
                m=16,
                ef_construct=128,
                full_scan_threshold=10_000,
                max_indexing_threads=2,
                on_disk=True,
            ),
        )

        # Update optimizer configuration
        self.client.update_collection(
            collection_name=self.collection_name,
            optimizers_config=models.OptimizersConfigDiff(
                indexing_threshold=20000,
                memmap_threshold=5000,
                deleted_threshold=0.2,
                vacuum_min_vector_number=1000,
                default_segment_number=2,
                max_segment_size=5_000_000,
                max_optimization_threads=2,
            ),
        )

        # Create payload indexes
        self._create_payload_indexes()

        self.logger.info("Collection created and optimized")

    def _create_payload_indexes(self) -> None:
        """Create indexes on payload fields for efficient filtering.

        Creates text indexes for 'title' and 'journal' fields, and integer
        indexes for 'year' and 'n_citations' fields to enable fast filtering
        and searching on these metadata fields.
        """
        # Text index for title
        self.client.create_payload_index(
            collection_name=self.collection_name,
            field_name="title",
            field_schema=models.TextIndexParams(
                type="text",
                tokenizer=models.TokenizerType.WORD,
                min_token_len=2,
                max_token_len=50,
                lowercase=True,
            ),
        )

        # Integer indexes
        for field in ["year", "n_citations"]:
            self.client.create_payload_index(
                collection_name=self.collection_name,
                field_name=field,
                field_schema="integer",
            )

        # Text index for journal
        self.client.create_payload_index(
            collection_name=self.collection_name,
            field_name="journal",
            field_schema=models.TextIndexParams(
                type="text",
                tokenizer=models.TokenizerType.WORD,
                min_token_len=1,
                max_token_len=50,
                lowercase=True,
            ),
        )

    @staticmethod
    def _string_to_uint(s: str) -> int:
        """Convert string to unsigned integer using SHA256 hash.

        Args:
            s (str): Input string to hash.

        Returns:
            int: Unsigned 64-bit integer derived from the hash.
        """
        hash_bytes = hashlib.sha256(s.encode("utf-8")).digest()
        return int.from_bytes(hash_bytes[:8], byteorder="big", signed=False)

    def _get_existing_ids(self) -> Set[int]:
        """Retrieve all existing point IDs from the collection with retry logic.

        Scrolls through the entire collection to fetch all point IDs. Implements
        retry logic with exponential backoff to handle temporary failures.

        Returns:
            Set[int]: Set of existing point IDs in the collection.

        Raises:
            Exception: If scroll request fails after max retries.
        """
        existing_ids = set()
        scroll_offset = None
        max_retries = 3
        retry_delay = 5

        while True:
            response = None
            for attempt in range(max_retries):
                try:
                    response = self.client.scroll(
                        collection_name=self.collection_name,
                        offset=scroll_offset,
                        limit=10000,
                        with_payload=False,
                        with_vectors=False,
                        timeout=3000,
                    )
                    break  # Success, exit retry loop
                except Exception as e:
                    if attempt < max_retries - 1:
                        self.logger.warning(f"Scroll request failed (attempt {attempt + 1}/{max_retries}): {e}")
                        self.logger.info(f"Retrying in {retry_delay}s...")
                        time.sleep(retry_delay)
                    else:
                        self.logger.error(f"Scroll request failed after {max_retries} attempts: {e}")
                        raise

            for point in response[0]:
                existing_ids.add(point.id)

            if response[1] is None:
                break
            scroll_offset = response[1]

        return existing_ids

    def _upload_batch(
        self, batch_ids: List[int], batch_chunks: List[str], batch_metadata: List[dict], batch_embeddings: List[List[float]] = None
    ) -> None:
        """Upload a batch of documents to Qdrant.

        Generates embeddings (if not provided) and uploads points to Qdrant.
        Implements retry logic with up to 3 attempts on failure.

        Args:
            batch_ids (List[int]): List of unique point IDs.
            batch_chunks (List[str]): List of text chunks to embed.
            batch_metadata (List[dict]): List of metadata dictionaries for each point.
            batch_embeddings (List[List[float]], optional): Pre-computed embeddings
                to use instead of generating new ones. Defaults to None.
        """
        # Generate embeddings if not provided
        if batch_embeddings is None:
            if self.use_existing_embeddings:
                self.logger.error("use_existing_embeddings=True but no embeddings provided")
                return

            try:
                batch_vectors = self.embedder.embed_documents(batch_chunks)
            except Exception as e:
                self.logger.error(f"Embedding error: {e}")
                return
        else:
            batch_vectors = batch_embeddings

        points = [
            PointStruct(id=id_val, vector=vec, payload=meta)
            for id_val, vec, meta in zip(batch_ids, batch_vectors, batch_metadata)
        ]

        for attempt in range(3):
            try:
                self.client.upload_points(
                    collection_name=self.collection_name,
                    points=points,
                    parallel=10,
                    max_retries=3,
                )
                return
            except Exception as e:
                self.logger.error(f"Error uploading batch (attempt {attempt + 1}): {e}")
                time.sleep(10)
                if attempt < 2:
                    self.logger.info("Retrying...")
                else:
                    self.logger.warning("Skipping batch after 3 failed attempts")

    def _prepare_metadata(self, doc: Document) -> dict:
        """Prepare metadata from Document object for Qdrant storage.

        Extracts and cleans metadata fields, performing type conversions and
        formatting for Qdrant compatibility. Includes document content, user
        metadata (unwrapped), and optionally pipeline metadata (wrapped).

        Args:
            doc (Document): Document object containing content and metadata.

        Returns:
            dict: Dictionary with cleaned metadata ready for Qdrant payload.
                Includes 'content' field, all metadata fields at root level,
                and optionally 'pipeline_metadata' as a nested dict.
        """
        payload = {}

        # Add content
        payload["content"] = doc.content

        # Add original metadata fields directly to root level (unwrapped)
        if doc.metadata:
            # Clean up metadata types
            metadata_copy = doc.metadata.copy()

            # Ensure year is an integer if present
            if "year" in metadata_copy:
                try:
                    metadata_copy["year"] = int(float(metadata_copy["year"]))
                except (ValueError, TypeError):
                    metadata_copy["year"] = None

            # Ensure title is a string if present
            if "title" in metadata_copy:
                metadata_copy["title"] = str(metadata_copy["title"])

            # Add all metadata fields directly to payload (unwrapped)
            payload.update(metadata_copy)

        # Add pipeline_metadata as wrapped dict if configured
        if self.upload_pipeline_metadata and doc.pipeline_metadata:
            payload["pipeline_metadata"] = doc.pipeline_metadata.copy()

        return payload

    async def execute(self, documents: List[Document]) -> List[Document]:
        """Execute the upload step.

        Routes to appropriate execution method based on configured mode
        (local or qdrant).

        Args:
            documents (List[Document]): List of Document objects to process.

        Returns:
            List[Document]: The same list of documents passed through for
                pipeline chaining.
        """
        if not documents:
            self.logger.warning("No documents to process")
            return documents

        self.logger.info(f"Processing {len(documents)} documents")

        if self.mode == "local":
            # Local mode: add embeddings to document metadata
            return await self._execute_local(documents)
        else:
            # Qdrant mode: upload to vector database
            return await self._execute_qdrant(documents)

    async def _execute_local(self, documents: List[Document]) -> List[Document]:
        """Execute local embedding storage.

        Generates embeddings for documents and stores them in the document.embedding
        field without uploading to Qdrant. Processes documents in configurable batches.

        Args:
            documents (List[Document]): List of Document objects to process.

        Returns:
            List[Document]: Documents with embeddings added to embedding field.
        """
        self.logger.info("Generating embeddings in local mode")

        # Process in batches
        for i in tqdm(
            range(0, len(documents), self.batch_size),
            desc="Generating embeddings",
        ):
            batch = documents[i : i + self.batch_size]
            batch_texts = [doc.content for doc in batch]

            try:
                # Generate embeddings
                batch_embeddings = self.embedder.embed_documents(batch_texts)

                # Add embeddings to document.embedding field
                for doc, embedding in zip(batch, batch_embeddings):
                    doc.embedding = embedding

            except Exception as e:
                self.logger.error(f"Error generating embeddings for batch: {e}")
                # Continue with next batch

        self.logger.info(f"Successfully generated embeddings for {len(documents)} documents")
        return documents

    async def _execute_qdrant(self, documents: List[Document]) -> List[Document]:
        """Execute Qdrant upload mode.

        Prepares document data, generates or extracts embeddings, filters out
        existing documents, and uploads to Qdrant in batches.

        Args:
            documents (List[Document]): List of Document objects to upload.

        Returns:
            List[Document]: The same list of documents passed through for
                pipeline chaining.
        """
        # Prepare data for upload
        ids = []
        chunks = []
        metadata = []
        embeddings = [] if self.use_existing_embeddings else None

        for doc in documents:
            # Create unique ID from filename and content hash
            doc_id = (
                f"{doc.filename}_{hashlib.md5(doc.content.encode()).hexdigest()[:8]}"
            )
            ids.append(doc_id)
            chunks.append(doc.content)
            metadata.append(self._prepare_metadata(doc))

            # Extract existing embeddings if configured
            if self.use_existing_embeddings:
                if doc.embedding is None:
                    self.logger.error(f"Document {doc.filename} missing embedding")
                    # Skip this document
                    ids.pop()
                    chunks.pop()
                    metadata.pop()
                    continue
                embeddings.append(doc.embedding)

        # Convert to uint IDs
        uint_ids = [self._string_to_uint(id_str) for id_str in ids]

        if self.use_existing_embeddings:
            to_process = list(zip(uint_ids, chunks, metadata, embeddings, ids))
        else:
            to_process = list(zip(uint_ids, chunks, metadata, ids))

        # Filter out existing IDs (identical in both modes)
        to_process = [item for item in to_process if item[0] not in self.existing_ids]

        skipped = len(uint_ids) - len(to_process)
        self.logger.info(f"Skipping {skipped} existing documents")
        self.logger.info(f"Uploading {len(to_process)} new vectors")

        # Upload in batches
        for i in tqdm(
            range(0, len(to_process), self.batch_size),
            desc=f"Uploading to {self.collection_name}",
        ):
            batch = to_process[i : i + self.batch_size]

            if self.use_existing_embeddings:
                batch_ids = [item[0] for item in batch]
                batch_chunks = [item[1] for item in batch]
                batch_metadata = [item[2] for item in batch]
                batch_embeddings = [item[3] for item in batch]
                self._upload_batch(batch_ids, batch_chunks, batch_metadata, batch_embeddings)
            else:
                batch_ids = [item[0] for item in batch]
                batch_chunks = [item[1] for item in batch]
                batch_metadata = [item[2] for item in batch]
                self._upload_batch(batch_ids, batch_chunks, batch_metadata)

        self.logger.info(f"Successfully uploaded {len(to_process)} documents")

        # Return documents for potential further processing
        return documents
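The `_string_to_uint` helper referenced above is not shown in this excerpt. A minimal sketch, assuming it hashes the string ID down to a fixed-width unsigned integer (Qdrant point IDs must be unsigned integers or UUIDs), could look like:

```python
import hashlib

def string_to_uint(id_str: str, bits: int = 64) -> int:
    """Map a string document ID to a deterministic unsigned integer.

    Hypothetical stand-in for the step's _string_to_uint helper: hashes
    the string with MD5 and truncates to `bits` bits so the result is a
    valid Qdrant point ID. The same input always maps to the same ID,
    which is what makes the existing-ID deduplication above work.
    """
    digest = hashlib.md5(id_str.encode()).hexdigest()
    return int(digest, 16) % (2**bits)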

__init__(config, name='QdrantUpload')

Initialize the Qdrant upload step.

Parameters:

- `config` (`dict`, required): Configuration dictionary containing:
    - `mode` (str, optional): `"qdrant"` or `"local"`. Defaults to `"qdrant"`.
    - `use_existing_embeddings` (bool, optional): If True, use embeddings from the `document.embedding` field. Defaults to False.
    - `upload_pipeline_metadata` (bool, optional): If True, include `pipeline_metadata` in the Qdrant payload. Defaults to False.
    - `vector_store` (dict, required for `"qdrant"` mode):
        - `url` (str): Qdrant instance URL.
        - `api_key` (str, optional): API key for Qdrant authentication.
        - `collection_name` (str): Target collection name.
        - `batch_size` (int): Number of documents per batch.
        - `vector_size` (int): Dimension of embedding vectors.
    - `embedder` (dict, required if `use_existing_embeddings=False`):
        - `url` (str): URL of the VLLM embedding server.
        - `model_name` (str): Embedding model identifier.
        - `timeout` (int, optional): Request timeout in seconds. Defaults to 300.
        - `api_key` (str, optional): API key for VLLM. Defaults to `"EMPTY"`.
    - `batch_size` (int, optional): Batch size for local mode. Defaults to 10.
- `name` (`str`, optional): Name for logging purposes. Defaults to `"QdrantUpload"`.

Raises:

- `ValueError`: If `mode` is not `"qdrant"` or `"local"`.

Source code in eve/steps/qdrant/qdrant_step.py
def __init__(self, config: dict, name: str = "QdrantUpload"):
    """Initialize the Qdrant upload step.

    Args:
        config (dict): Configuration dictionary containing:

            - mode (str, optional): "qdrant" or "local". Defaults to "qdrant".
            - use_existing_embeddings (bool, optional): If True, use embeddings
                from document.embedding field. Defaults to False.
            - upload_pipeline_metadata (bool, optional): If True, include
                pipeline_metadata in Qdrant payload. Defaults to False.
            - vector_store (dict, required for "qdrant" mode):
                - url (str): Qdrant instance URL.
                - api_key (str, optional): API key for Qdrant authentication.
                - collection_name (str): Target collection name.
                - batch_size (int): Number of documents per batch.
                - vector_size (int): Dimension of embedding vectors.
            - embedder (dict, required if use_existing_embeddings=False):
                - url (str): URL of VLLM embedding server.
                - model_name (str): Embedding model identifier.
                - timeout (int, optional): Request timeout in seconds. Defaults to 300.
                - api_key (str, optional): API key for VLLM. Defaults to "EMPTY".
            - batch_size (int, optional): Batch size for local mode. Defaults to 10.
        name (str, optional): Name for logging purposes. Defaults to "QdrantUpload".

    Raises:
        ValueError: If mode is not "qdrant" or "local".
    """
    super().__init__(config, name)

    # Determine mode: "qdrant" or "local"
    self.mode = config.get("mode", "qdrant").lower()

    if self.mode not in ["qdrant", "local"]:
        raise ValueError(f"Invalid mode '{self.mode}'. Must be 'qdrant' or 'local'")

    # Check if we should use existing embeddings from document.embedding field
    self.use_existing_embeddings = config.get("use_existing_embeddings", False)

    # Check if we should upload pipeline_metadata to Qdrant
    self.upload_pipeline_metadata = config.get("upload_pipeline_metadata", False)

    # Initialize VLLM embedder only if not using existing embeddings
    if not self.use_existing_embeddings:
        embedding_cfg = config["embedder"]
        self.embedder = VLLMEmbedder(
            url=embedding_cfg["url"],
            model_name=embedding_cfg["model_name"],
            timeout=embedding_cfg.get("timeout", 300),
            api_key=embedding_cfg.get("api_key", "EMPTY"),
        )
        self.logger.info(f"Initialized VLLM embedder at {embedding_cfg['url']}")
    else:
        self.embedder = None
        self.logger.info("Using existing embeddings from document metadata")

    self.logger.info(f"Mode: {self.mode}")

    # Initialize Qdrant-specific configuration only if mode is "qdrant"
    if self.mode == "qdrant":
        vector_store_cfg = config.get("vector_store", {})
        self.qdrant_url = vector_store_cfg.get("url", "http://localhost:6333")
        self.vector_store_api_key = vector_store_cfg.get("api_key")

        # Get collection configuration
        self.collection_name = vector_store_cfg["collection_name"]
        self.batch_size = vector_store_cfg["batch_size"]
        self.vector_size = vector_store_cfg["vector_size"]

        # Initialize Qdrant client
        self.client = QdrantClient(
            url=self.qdrant_url, api_key=self.vector_store_api_key
        )

        # Ensure collection exists
        self._ensure_collection()
        self.existing_ids = self._get_existing_ids()
    else:
        # Local mode: set batch size for processing
        self.batch_size = config.get("batch_size", 10)
        self.client = None

execute(documents) async

Execute the upload step.

Routes to appropriate execution method based on configured mode (local or qdrant).

Parameters:

- `documents` (`List[Document]`, required): List of Document objects to process.

Returns:

- `List[Document]`: The same list of documents passed through for pipeline chaining.

Source code in eve/steps/qdrant/qdrant_step.py
async def execute(self, documents: List[Document]) -> List[Document]:
    """Execute the upload step.

    Routes to appropriate execution method based on configured mode
    (local or qdrant).

    Args:
        documents (List[Document]): List of Document objects to process.

    Returns:
        List[Document]: The same list of documents passed through for
            pipeline chaining.
    """
    if not documents:
        self.logger.warning("No documents to process")
        return documents

    self.logger.info(f"Processing {len(documents)} documents")

    if self.mode == "local":
        # Local mode: add embeddings to document metadata
        return await self._execute_local(documents)
    else:
        # Qdrant mode: upload to vector database
        return await self._execute_qdrant(documents)

VLLMEmbedder

Client for VLLM embedding server.

This class provides a simple interface to interact with a VLLM server that exposes an OpenAI-compatible embeddings API endpoint.

Source code in eve/steps/qdrant/qdrant_step.py
class VLLMEmbedder:
    """Client for VLLM embedding server.

    This class provides a simple interface to interact with a VLLM server
    that exposes an OpenAI-compatible embeddings API endpoint.
    """

    def __init__(
        self, url: str, model_name: str, timeout: int = 300, api_key: str = "EMPTY"
    ):
        """Initialize VLLM embedder client.

        Args:
            url (str): Base URL of the VLLM server.
            model_name (str): Name of the embedding model to use.
            timeout (int, optional): Request timeout in seconds. Defaults to 300.
            api_key (str, optional): API key for authentication. Use "EMPTY" for
                local servers. Defaults to "EMPTY".
        """
        self.url = url.rstrip("/")
        self.model_name = model_name
        self.timeout = timeout
        self.api_key = api_key

        # Set up headers with API key
        headers = (
            {"Authorization": f"Bearer {api_key}"}
            if api_key and api_key != "EMPTY"
            else {}
        )
        self.client = httpx.Client(timeout=timeout, headers=headers)

    def embed_documents(self, texts: List[str]) -> List[List[float] | None]:
        """Generate embeddings for a list of texts.

        Sends requests to the VLLM server's /v1/embeddings endpoint to generate
        embeddings for each text. Failed requests return None for that text.

        Args:
            texts (List[str]): List of text strings to embed.

        Returns:
            List[List[float] | None]: List of embedding vectors. Each element is
                either a list of floats (the embedding) or None if embedding failed.
        """
        embeddings = []
        for text in texts:
            endpoint = f"{self.url}/v1/embeddings"

            payload = {"input": [text], "model": self.model_name, "encoding_format": "float"}

            try:
                response = self.client.post(endpoint, json=payload)
                response.raise_for_status()

                result = response.json()

                # Extract embedding (single document)
                embedding = result["data"][0]["embedding"]
                embeddings.append(embedding)

            except httpx.HTTPError as e:
                print(f"VLLM embedding request failed: {e}")
                embeddings.append(None)
            except KeyError as e:
                embeddings.append(None)
                print(f"Unexpected response format from VLLM server: {e}")
        return embeddings

    def __del__(self):
        """Clean up HTTP client."""
        if hasattr(self, "client"):
            self.client.close()

__del__()

Clean up HTTP client.

Source code in eve/steps/qdrant/qdrant_step.py
def __del__(self):
    """Clean up HTTP client."""
    if hasattr(self, "client"):
        self.client.close()

__init__(url, model_name, timeout=300, api_key='EMPTY')

Initialize VLLM embedder client.

Parameters:

Name Type Description Default
url str

Base URL of the VLLM server.

required
model_name str

Name of the embedding model to use.

required
timeout int

Request timeout in seconds. Defaults to 300.

300
api_key str

API key for authentication. Use "EMPTY" for local servers. Defaults to "EMPTY".

'EMPTY'
Source code in eve/steps/qdrant/qdrant_step.py
def __init__(
    self, url: str, model_name: str, timeout: int = 300, api_key: str = "EMPTY"
):
    """Initialize VLLM embedder client.

    Args:
        url (str): Base URL of the VLLM server.
        model_name (str): Name of the embedding model to use.
        timeout (int, optional): Request timeout in seconds. Defaults to 300.
        api_key (str, optional): API key for authentication. Use "EMPTY" for
            local servers. Defaults to "EMPTY".
    """
    self.url = url.rstrip("/")
    self.model_name = model_name
    self.timeout = timeout
    self.api_key = api_key

    # Set up headers with API key
    headers = (
        {"Authorization": f"Bearer {api_key}"}
        if api_key and api_key != "EMPTY"
        else {}
    )
    self.client = httpx.Client(timeout=timeout, headers=headers)

embed_documents(texts)

Generate embeddings for a list of texts.

Sends requests to the VLLM server's /v1/embeddings endpoint to generate embeddings for each text. Failed requests return None for that text.

Parameters:

Name Type Description Default
texts List[str]

List of text strings to embed.

required

Returns:

Type Description
List[List[float] | None]

List[List[float] | None]: List of embedding vectors. Each element is either a list of floats (the embedding) or None if embedding failed.

Source code in eve/steps/qdrant/qdrant_step.py
def embed_documents(self, texts: List[str]) -> List[List[float] | None]:
    """Generate embeddings for a list of texts.

    Sends requests to the VLLM server's /v1/embeddings endpoint to generate
    embeddings for each text. Failed requests return None for that text.

    Args:
        texts (List[str]): List of text strings to embed.

    Returns:
        List[List[float] | None]: List of embedding vectors. Each element is
            either a list of floats (the embedding) or None if embedding failed.
    """
    embeddings = []
    for text in texts:
        endpoint = f"{self.url}/v1/embeddings"

        payload = {"input": [text], "model": self.model_name, "encoding_format": "float"}

        try:
            response = self.client.post(endpoint, json=payload)
            response.raise_for_status()

            result = response.json()

            # Extract embedding (single document)
            embedding = result["data"][0]["embedding"]
            embeddings.append(embedding)

        except httpx.HTTPError as e:
            print(f"VLLM embedding request failed: {e}")
            embeddings.append(None)
        except KeyError as e:
            embeddings.append(None)
            print(f"Unexpected response format from VLLM server: {e}")
    return embeddings
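The request body assembled inside the loop follows the OpenAI embeddings schema that VLLM exposes. A sketch of the request and response shapes (the model name and vector values are illustrative):

```python
# Example request/response shapes for the /v1/embeddings endpoint,
# per the OpenAI-compatible API served by VLLM. Values are examples.
payload = {
    "input": ["Hello world"],
    "model": "BAAI/bge-base-en-v1.5",
    "encoding_format": "float",
}

# A successful response carries one entry per input under "data":
response = {
    "object": "list",
    "data": [{"object": "embedding", "index": 0, "embedding": [0.01, -0.02]}],
    "model": "BAAI/bge-base-en-v1.5",
}

# embed_documents extracts the vector for the single input it sent:
embedding = response["data"][0]["embedding"]
```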