Skip to content

Pipeline Stages

This section documents all the available pipeline stages for processing documents.

Extraction Stage

Extracts content from various document formats.

ExtractionStep

Bases: PipelineStep

Source code in eve/steps/extraction/extract_step.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class ExtractionStep(PipelineStep):
    """Pipeline step that extracts text content from input documents.

    Single-document formats (html, xml, pdf, md) produce one output document
    each; jsonl inputs may expand into multiple output documents.
    """

    async def _html_extraction(self, document: Document) -> Document:
        # Delegates to HtmlExtractor; returns the document with content set.
        html_extractor = HtmlExtractor(document)
        return await html_extractor.extract_text()

    async def _pdf_extraction(self, document: Document, url: str) -> Document:
        # PDF extraction is performed by a remote service reachable at `url`.
        pdf_extractor = PdfExtractor(document, url)
        return await pdf_extractor.extract_text()

    async def _xml_extraction(self, document: Document) -> Document:
        xml_extractor = XmlExtractor(document)
        return await xml_extractor.extract_text()

    async def _md_extraction(self, document: Document) -> Document:
        md_extractor = MarkdownExtractor(document)
        return await md_extractor.extract_text()

    async def _jsonl_extraction(self, document: Document) -> List[Document]:
        # A JSONL file can contain many records, hence the list return.
        jsonl_extractor = JSONLExtractor(document)
        return await jsonl_extractor.extract_documents()

    async def execute(self, documents: List[Document]) -> List[Document]:
        """Execute text extraction on input documents.

        Args:
            documents: Document objects to extract text from.

        Returns:
            List of Document objects with extracted text. Documents that fail
            extraction or yield no usable content are dropped (and logged).
        """
        # Sets give O(1) membership tests for the per-document dispatch below.
        unique_formats = {document.file_format for document in documents}
        text_extraction_formats = {"html", "xml", "pdf", "md"}
        supported_formats = {"jsonl"}

        self.logger.info(
            f"Extracting text from {unique_formats} files. File count: {len(documents)}"
        )

        result = []
        for document in documents:
            try:
                # Markdown documents that already carry content were loaded
                # earlier in the pipeline; pass them through untouched.
                if document.content and document.file_format == "md":
                    result.append(document)
                    self.logger.debug(f"Skipping already-loaded document: {document.filename}")
                    continue

                if document.file_format in text_extraction_formats:
                    if document.file_format == "html":
                        document_with_text = await self._html_extraction(document)
                    elif document.file_format == "pdf":
                        url = self.config.get("url", None)
                        if not url:
                            # Fix: previously extraction was still attempted
                            # with url=None after logging; skip the document.
                            self.logger.error(
                                "No URL provided for PDF extraction service"
                            )
                            continue
                        document_with_text = await self._pdf_extraction(document, url)
                    elif document.file_format == "xml":
                        document_with_text = await self._xml_extraction(document)
                    else:  # "md" — the only remaining member of the set
                        document_with_text = await self._md_extraction(document)

                    # Keep only documents with more than one character of
                    # extracted content; log the drop instead of silently
                    # discarding.
                    if (
                        document_with_text
                        and hasattr(document_with_text, "content_length")
                        and document_with_text.content_length > 1
                    ):
                        result.append(document_with_text)
                        self.logger.info(
                            f"Successfully extracted {document_with_text.content_length} characters from {document_with_text.filename}"
                        )
                    else:
                        self.logger.warning(f"No text extracted from {document.filename}")
                elif document.file_format in supported_formats:
                    # JSONL inputs expand into multiple documents.
                    docs = await self._jsonl_extraction(document) or []
                    result.extend(docs)
                    self.logger.info(
                        f"Successfully extracted {len(docs)} documents from {document.filename}"
                    )
                else:
                    self.logger.warning(f"No text extracted from {document.filename}")
            except Exception as e:
                self.logger.error(
                    f"Failed to extract text from {document.filename}: {str(e)}"
                )
                continue
        return result

execute(documents) async

Execute text extraction on input files or documents.

Parameters:

Name Type Description Default
input_data

List of file paths or Document objects to extract text from

required

Returns:

Type Description
List[Document]

List of Document objects with extracted text

Source code in eve/steps/extraction/extract_step.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
async def execute(self, documents: List[Document]) -> List[Document]:
    """Execute text extraction on input files or documents.

    Args:
        documents: Document objects to extract text from

    Returns:
        List of Document objects with extracted text
    """
    # NOTE(review): `format` is never used afterwards and the
    # `unique_formats = set()` below is immediately overwritten — dead code.
    format = self.config.get(
        "format", None
    )  # write a wrapper to find out the extension
    if not format:
        unique_formats = set()

    unique_formats = {document.file_format for document in documents}
    text_extraction_formats = ["html", "xml", "pdf", "md"]
    supported_formats = ["jsonl"]

    self.logger.info(
        f"Extracting text from {unique_formats} files. File count: {len(documents)}"
    )

    result = []
    for document in documents:
        try:
            # Skip documents that already have content (pre-loaded in
            # create_batches). NOTE(review): comment mentioned JSONL but the
            # check matches "md" — confirm which formats are pre-loaded.
            if document.content and document.file_format == "md":
                result.append(document)
                self.logger.debug(f"Skipping already-loaded document: {document.filename}")
                continue

            if document.file_format in text_extraction_formats:
                if document.file_format == "html":
                    document_with_text = await self._html_extraction(document)
                elif document.file_format == "pdf":
                    # NOTE(review): when no URL is configured, the error is
                    # logged but extraction is still attempted with url=None.
                    url = self.config.get("url", None)
                    if not url:
                        self.logger.error(
                            "No URL provided for PDF extraction service"
                        )
                    document_with_text = await self._pdf_extraction(document, url)
                elif document.file_format == "xml":
                    document_with_text = await self._xml_extraction(document)
                elif document.file_format == "md":
                    document_with_text = await self._md_extraction(document)
                else:
                    self.logger.error(f"Unsupported format: {document.file_format}")
                    continue

                # Keep only documents with more than one character of
                # content; shorter results are silently dropped.
                if (
                    document_with_text
                    and hasattr(document_with_text, "content_length")
                    and document_with_text.content_length > 1
                ):
                    result.append(document_with_text)
                    self.logger.info(
                        f"Successfully extracted {document_with_text.content_length} characters from {document_with_text.filename}"
                    )
            elif document.file_format in supported_formats:
                # JSONL inputs may expand into multiple documents.
                docs = await self._jsonl_extraction(document)
                docs = docs or []
                result.extend(docs)
                self.logger.info(
                    f"Successfully extracted {len(docs)} documents from {document.filename}"
                )
            else:
                self.logger.warning(f"No text extracted from {document.filename}")
        except Exception as e:
            self.logger.error(
                f"Failed to extract text from {document.filename}: {str(e)}"
            )
            continue
    return result

Extractors

PDF Extractor

PdfExtractor

Source code in eve/steps/extraction/pdfs.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class PdfExtractor:
    """Extracts text from a PDF document by posting it to a remote Nougat service."""

    def __init__(self, document: Document, endpoint: str):
        """
        Args:
            document: Document whose file will be sent for extraction.
            endpoint: Base URL of the Nougat service; "/predict" is appended.
        """
        self.document = document
        self.endpoint = f"{endpoint}/predict"
        self.extraction = None

    async def _call_nougat(self, session: aiohttp.ClientSession) -> Optional[str]:
        """internal method to call the Nougat API."""
        try:
            file_content = await read_file(self.document.file_path, 'rb')
            if not file_content:
                # Fix: was `self.file_path`, an attribute that does not exist
                # on this class — the error path itself raised AttributeError.
                logger.error(f"Failed to read file: {self.document.file_path}")
                return None

            # Multipart upload of the raw PDF bytes.
            data = aiohttp.FormData()
            data.add_field('file', file_content, filename = self.document.filename, content_type = 'application/pdf')

            async with session.post(self.endpoint, data = data) as response:
                if response.status == 200:
                    return await response.text()
                else:
                    logger.error(f"Nougat API request for {self.document.file_path} failed with status {response.status}")
                    return None
        except Exception as e:
            logger.error(f"Failed to process {self.document.file_path}: {str(e)}")
            return None

    async def extract_text(self) -> Optional[Document]:
        """Extract text from a single PDF file.

        Returns:
            Document object with extracted text if successful, None otherwise
        """
        try:
            async with aiohttp.ClientSession() as session:
                content = await self._call_nougat(session)
                if not content:
                    logger.error(f"Failed to extract content from {self.document.file_path}")
                    return None
                self.document.content = content
                return self.document
        except Exception as e:
            logger.error(f"Error in PDF extraction for {self.document.file_path}: {str(e)}")
            return None

extract_text() async

Extract text from a single PDF file.

Returns:

Type Description
Optional[Document]

Document object with extracted text if successful, None otherwise

Source code in eve/steps/extraction/pdfs.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
async def extract_text(self) -> Optional[Document]:
    """Extract text from a single PDF file.

    Returns:
        Document object with extracted text if successful, None otherwise
    """
    try:
        # One HTTP session per extraction; _call_nougat posts the PDF to the
        # remote service and returns the response body (or None on failure).
        async with aiohttp.ClientSession() as session:
            content = await self._call_nougat(session)
            if not content:
                logger.error(f"Failed to extract content from {self.document.file_path}")
                return None
            # Attach the extracted text to the document and return it.
            self.document.content = content
            return self.document
    except Exception as e:
        logger.error(f"Error in PDF extraction for {self.document.file_path}: {str(e)}")
        return None

HTML Extractor

HtmlExtractor

Source code in eve/steps/extraction/htmls.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class HtmlExtractor:
    """Extracts readable text from a single HTML document."""

    def __init__(self, document: Document):
        self.document = document

    async def extract_text(self) -> Optional[Document]:
        """Extract text from a single HTML file.

        Returns:
            The document with its content populated on success, None on failure.
        """
        try:
            raw_html = await read_file(self.document.file_path, 'r')
            if not raw_html:
                logger.error(f"Failed to read file: {self.document.file_path}")
                return None

            # extract() is CPU-bound; run it on a worker thread so the event
            # loop stays responsive. Comments are dropped, tables are kept.
            self.document.content = await asyncio.to_thread(
                lambda: extract(raw_html, include_comments = False, include_tables = True)
            )
            return self.document
        except Exception as e:
            logger.error(f"Error processing HTML file {self.document.file_path}: {e}")
            return None

extract_text() async

Extract text from a single HTML file.

Returns:

Type Description
Optional[Document]

Document object with extracted text if successful, None otherwise

Source code in eve/steps/extraction/htmls.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
async def extract_text(self) -> Optional[Document]:
    """Extract text from a single HTML file.

    Returns:
        Document object with extracted text if successful, None otherwise
    """
    try:
        content = await read_file(self.document.file_path, 'r')
        if not content:
            logger.error(f"Failed to read file: {self.document.file_path}")
            return None

        def parse_html():
            # Boilerplate-stripping extraction: comments dropped, tables kept.
            return extract(content, include_comments = False, include_tables = True)

        # Parsing is CPU-bound; run it on a worker thread off the event loop.
        self.document.content = await asyncio.to_thread(parse_html)
        return self.document
    except Exception as e:
        logger.error(f"Error processing HTML file {self.document.file_path}: {e}")
        return None

XML Extractor

XmlExtractor

Source code in eve/steps/extraction/xmls.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class XmlExtractor:
    """Extracts all text content from a single XML document."""

    def __init__(self, document: Document):
        self.document = document

    @staticmethod
    def _parse_and_extract(content: str) -> str:
        """Parse an XML string and return its concatenated text content.

        Runs of three or more newlines are collapsed to a blank line and the
        result is stripped of surrounding whitespace.

        Raises:
            xml.etree.ElementTree.ParseError: if `content` is not well-formed.
        """
        root = ET.fromstring(content)
        # itertext() walks .text and .tail in document order without Python
        # recursion (the previous hand-rolled walk could hit the recursion
        # limit on deeply nested documents). It omits the root's own tail,
        # which the old walk included, so preserve that explicitly.
        parts = list(root.itertext())
        if root.tail:
            parts.append(root.tail)
        full_text = ''.join(parts)
        cleaned_text = re.sub(r'\n{3,}', '\n\n', full_text)
        return cleaned_text.strip()

    async def extract_text(self) -> Optional[Document]:
        """Extract text from a single XML file.

        Returns:
            Document object with extracted text if successful, None otherwise
        """
        try:
            content = await read_file(self.document.file_path, 'r')
            if not content:
                logger.error(f"Failed to read file: {self.document.file_path}")
                return None

            # Parsing is CPU-bound; keep it off the event loop.
            self.document.content = await asyncio.to_thread(
                self._parse_and_extract, content
            )
            return self.document
        except Exception as e:
            logger.error(f"Error processing XML file {self.document.file_path}: {e}")
            return None

extract_text() async

Extract text from a single XML file.

Returns:

Type Description
Optional[Document]

Document object with extracted text if successful, None otherwise

Source code in eve/steps/extraction/xmls.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
async def extract_text(self) -> Optional[Document]:
    """Extract text from a single XML file.

    Returns:
        Document object with extracted text if successful, None otherwise
    """
    try:
        content = await read_file(self.document.file_path, 'r')
        if not content:
            logger.error(f"Failed to read file: {self.document.file_path}")
            return None

        def parse_and_extract():
            # Raises ParseError on malformed XML; caught by the outer except.
            root = ET.fromstring(content)

            def extract_text_from_tree(element):
                # Depth-first walk gathering .text and .tail in document order.
                texts = []
                if element.text:
                    texts.append(element.text)
                for child in element:
                    texts.extend(extract_text_from_tree(child))
                if element.tail:
                    texts.append(element.tail)
                return texts

            extracted_texts = extract_text_from_tree(root)
            full_text = ''.join(extracted_texts)
            # Collapse runs of 3+ newlines down to a single blank line.
            cleaned_text = re.sub(r'\n{3,}', '\n\n', full_text)
            return cleaned_text.strip()

        # Parsing is CPU-bound; run it on a worker thread off the event loop.
        self.document.content = await asyncio.to_thread(parse_and_extract)
        return self.document
    except Exception as e:
        logger.error(f"Error processing XML file {self.document.file_path}: {e}")
        return None

Markdown Extractor

MarkdownExtractor

Source code in eve/steps/extraction/markdown.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class MarkdownExtractor:
    """Loads the raw contents of a markdown file into a Document."""

    def __init__(self, document: Document):
        self.document = document

    async def extract_text(self) -> Optional[Document]:
        """Extract text from a single markdown file.

        Returns:
            Document object with extracted text if successful, None otherwise
        """
        try:
            content = await read_file(self.document.file_path, 'r')
            if not content:
                logger.error(f"Failed to read file: {self.document.file_path}")
                return None

            # Markdown is already plain text; used as-is, no transformation.
            self.document.content = content
            return self.document
        except Exception as e:
            # Fix: the message previously said "HTML file" — a copy-paste
            # slip from HtmlExtractor that made logs misleading.
            logger.error(f"Error processing markdown file {self.document.file_path}: {e}")
            return None

extract_text() async

Extract text from a single markdown file.

Returns:

Type Description
Optional[Document]

Document object with extracted text if successful, None otherwise

Source code in eve/steps/extraction/markdown.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
async def extract_text(self) -> Optional[Document]:
    """Extract text from a single markdown file.

    Returns:
        Document object with extracted text if successful, None otherwise
    """
    try:
        content = await read_file(self.document.file_path, 'r')
        if not content:
            logger.error(f"Failed to read file: {self.document.file_path}")
            return None

        # Markdown is already plain text; used as-is, no transformation.
        self.document.content = content
        return self.document
    except Exception as e:
        # NOTE(review): message says "HTML file" but this is the markdown
        # extractor — looks like a copy-paste slip.
        logger.error(f"Error processing HTML file {self.document.file_path}: {e}")
        return None

Deduplication Stage

Removes duplicate and near-duplicate documents.

DuplicationStep

Bases: PipelineStep

Source code in eve/steps/dedup/dedup_step.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class DuplicationStep(PipelineStep):
    """Pipeline step that removes duplicate documents, keeping one per group."""

    async def _exact_deduplication(self, documents: List[Document]) -> List[Document]:
        # Groups documents that are byte-for-byte identical (size + checksum).
        return await ExactDuplication(documents).find_duplicates()

    async def _lsh_deduplication(self, documents: List[Document]) -> List[Document]:
        # Groups near-duplicates via MinHash LSH; parameters come from config.
        lsh = LSH(
            documents,
            self.config.get("shingle_size", 3),
            self.config.get("num_perm", 128),
            self.config.get("threshold", 0.8),
        )
        return lsh.find_duplicates()

    async def execute(self, documents: List[Document]) -> List[Document]:
        """Execute deduplication on input documents.

        Args:
            documents: Document objects to deduplicate.

        Returns:
            List of Document objects with duplicates removed.
        """
        method = self.config.get("method", "exact")  # default to exact

        self.logger.info(f"Executing duplication step with method: {method} file count: {len(documents)}")

        if method == "exact":
            duplicate_groups = await self._exact_deduplication(documents)
        elif method == "lsh":
            duplicate_groups = await self._lsh_deduplication(documents)
        else:
            self.logger.error(f"Invalid deduplication method: {method}")
            raise ValueError(f"Invalid deduplication method: {method}")

        # The first document of each group survives; the rest are dropped.
        to_drop = set()
        duplicates_removed = 0
        for group in duplicate_groups:
            for dup in group[1:]:
                to_drop.add(dup)
                duplicates_removed += 1

        # Preserve the original input order among the survivors.
        result_documents = [doc for doc in documents if doc not in to_drop]

        self.logger.info(
            f"Deduplication complete: {len(result_documents)} files remaining, {duplicates_removed} duplicates removed"
        )
        return result_documents

execute(documents) async

Execute deduplication on input files or documents.

Parameters:

Name Type Description Default
input_data

List of file paths or Document objects to deduplicate

required

Returns:

Type Description
List[Document]

List of Document objects with duplicates removed

Source code in eve/steps/dedup/dedup_step.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
async def execute(self, documents: List[Document]) -> List[Document]:
    """Execute deduplication on input files or documents.

    Args:
        documents: Document objects to deduplicate

    Returns:
        List of Document objects with duplicates removed
    """
    method = self.config.get("method", "exact")  # default to exact

    self.logger.info(f"Executing duplication step with method: {method} file count: {len(documents)}")

    # Each strategy returns groups of documents judged to be duplicates
    # of one another.
    if method == "exact":
        duplicates = await self._exact_deduplication(documents)
    elif method == "lsh":
        duplicates = await self._lsh_deduplication(documents)
    else:
        self.logger.error(f"Invalid deduplication method: {method}")
        raise ValueError(f"Invalid deduplication method: {method}")

    # Remove duplicates from documents
    duplicate_docs = set()
    duplicates_removed = 0
    for group in duplicates:
        # Keep the first doc in each group, mark the rest as duplicates
        for doc in group[1:]:
            duplicate_docs.add(doc)
            duplicates_removed += 1

    # Filter out duplicates, keeping the first occurrence
    # (original input order is preserved among survivors)
    result_documents = []
    for doc in documents:
        if doc not in duplicate_docs:
            result_documents.append(doc)

    self.logger.info(
        f"Deduplication complete: {len(result_documents)} files remaining, {duplicates_removed} duplicates removed"
    )
    return result_documents

Deduplication Methods

Exact Duplicates

ExactDuplication

This class finds exact duplicates by:

  1. calculating file size as a cheap first filter to save computation.
  2. calculating a checksum and grouping the duplicates.
Source code in eve/steps/dedup/exact_duplicates.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class ExactDuplication:
    """Finds exact duplicate files in two passes.

    1. Bucket files by size — a cheap first filter to save computation.
    2. Compute SHA-256 checksums only for size-matched files and group
       identical ones.
    """

    def __init__(self, documents: List[Document]):
        self.documents = documents
        self.duplicates = []

        self._validate()

    def _validate(self):
        # Duplicate detection is meaningless with fewer than two files.
        if len(self.documents) < 2:
            raise ValueError("need at least 2 files for duplication")

    @staticmethod
    async def _calculate_sha256(file_path: Path) -> str:
        """Calculate the SHA-256 checksum of a file, streaming in chunks."""
        digest = hashlib.sha256()
        async for chunk in read_in_chunks(file_path, 'rb'):
            digest.update(chunk)
        return digest.hexdigest()

    @staticmethod
    async def _calculate_size(file_path: Path) -> int:
        """Return the file size in bytes (stat runs on a worker thread)."""
        return (await asyncio.to_thread(file_path.stat)).st_size

    async def find_duplicates(self) -> list[list[Document]]:
        """Find duplicate files based on size and SHA-256 checksum."""

        # stage 1: bucket documents by file size
        sizes = await asyncio.gather(
            *(self._calculate_size(doc.file_path) for doc in self.documents)
        )
        by_size = defaultdict(list)
        for doc, size in zip(self.documents, sizes):
            by_size[size].append(doc)

        # stage 2: checksum only the documents whose sizes collide
        candidates = [
            (doc, size)
            for size, docs in by_size.items()
            if len(docs) >= 2
            for doc in docs
        ]
        if not candidates:
            return []

        checksums = await asyncio.gather(
            *(self._calculate_sha256(doc.file_path) for doc, _ in candidates)
        )

        grouped = defaultdict(list)
        for (doc, size), checksum in zip(candidates, checksums):
            grouped[(size, checksum)].append(doc)

        # Only (size, checksum) keys shared by 2+ documents are duplicates.
        self.duplicates = {key: docs for key, docs in grouped.items() if len(docs) > 1}
        return list(self.duplicates.values())

find_duplicates() async

Find duplicate files based on size and SHA-256 checksum.

Source code in eve/steps/dedup/exact_duplicates.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
async def find_duplicates(self) -> list[list[Document]]:
    """Find duplicate files based on size and SHA-256 checksum."""

    # stage 1: group files by size
    # (sizes are gathered concurrently; order matches self.documents)
    size_tasks = [self._calculate_size(doc.file_path) for doc in self.documents]
    sizes = await asyncio.gather(*size_tasks)

    size_groups = defaultdict(list)
    for doc, size in zip(self.documents, sizes):
        size_groups[size].append(doc)

    # stage 2: calculate checksums for potential duplicates
    checksum_tasks = []
    file_info = []

    for size, docs in size_groups.items():
        if len(docs) >= 2:  # Only consider docs with matching sizes
            for doc in docs:
                checksum_tasks.append(self._calculate_sha256(doc.file_path))
                file_info.append((doc, size))

    # No size collisions means no possible exact duplicates.
    if not checksum_tasks:
        return []

    checksums = await asyncio.gather(*checksum_tasks)

    # Group by (size, checksum); each key with 2+ docs is a duplicate group.
    file_map = defaultdict(list)
    for (doc, size), checksum in zip(file_info, checksums):
        key = (size, checksum)
        file_map[key].append(doc)

    self.duplicates = {key: docs for key, docs in file_map.items() if len(docs) > 1}
    return list(self.duplicates.values())

MinHash LSH

Adjust NUM_PERM: Higher values increase accuracy but use more memory. Adjust THRESHOLD: Higher values find closer duplicates but may miss some. Adjust SHINGLE_SIZE: Larger shingles are more specific but increase computation.

LSH

Source code in eve/steps/dedup/minhash.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class LSH:
    """Near-duplicate detection using MinHash with locality-sensitive hashing."""

    def __init__(
        self,
        documents: List[Document],
        shingle_size: int = 3,
        num_perm: int = 128,
        threshold: float = 0.8,
    ):
        self.documents = documents
        self.shingle_size = shingle_size
        self.num_perm = num_perm
        self.threshold = threshold
        self.doc_hashes = {}   # document -> its MinHash signature
        self.duplicates = []

        self._validate()

    def _validate(self):
        # Duplicate detection needs at least two documents to compare.
        if len(self.documents) < 2:
            raise ValueError("need at least 2 files for duplication")

    def create_shingles(self, text: str) -> set[str]:
        """Create shingles (word n-grams) from text."""
        tokens = text.lower().split()
        return {" ".join(gram) for gram in ngrams(tokens, self.shingle_size)}

    def _do_lsh(self) -> Any:
        """Build the LSH index and populate the document -> MinHash map."""
        index = MinHashLSH(threshold=self.threshold, num_perm=self.num_perm)

        for doc in tqdm(self.documents, total=len(self.documents)):
            minhash = MinHash(num_perm=self.num_perm)
            for shingle in self.create_shingles(doc.content):
                minhash.update(shingle.encode("utf8"))

            # The LSH key is the stringified file path; the Document itself
            # is kept in self.doc_hashes.
            index.insert(str(doc.file_path), minhash)
            self.doc_hashes[doc] = minhash

        return index

    def find_duplicates(self) -> list[list[Document]]:
        """Find near-duplicate documents using LSH."""
        index = self._do_lsh()
        seen = set()

        for doc in self.documents:
            if doc in seen:
                continue

            matches = index.query(self.doc_hashes[doc])

            # Map LSH string keys back onto Document objects.
            others = [
                d for d in self.documents if str(d.file_path) in matches and d != doc
            ]

            if others:
                # Sort for a consistent group ordering before dedup-checking.
                group = sorted([doc, *others], key=lambda d: str(d.file_path))
                if group not in self.duplicates:
                    self.duplicates.append(group)
                seen.update(group)

        return self.duplicates

create_shingles(text)

Create shingles (word n-grams) from text.

Source code in eve/steps/dedup/minhash.py
38
39
40
41
def create_shingles(self, text: str) -> set[str]:
    """Create shingles (word n-grams) from text."""
    # Lowercase and whitespace-split, then join each n-gram back into a
    # single space-separated string; the set removes repeated shingles.
    words = text.lower().split()
    return {" ".join(gram) for gram in ngrams(words, self.shingle_size)}

find_duplicates()

Find near-duplicate documents using LSH.

Source code in eve/steps/dedup/minhash.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def find_duplicates(self) -> list[list[Document]]:
    """Find near-duplicate documents using LSH.

    Returns:
        Groups of near-duplicate Documents; each group is sorted by file
        path so repeated runs produce a stable ordering.
    """
    file_hashes = self._do_lsh()
    processed = set()

    for doc in self.documents:
        # Skip documents already assigned to an earlier duplicate group.
        if doc in processed:
            continue

        m = self.doc_hashes[doc]
        candidates = file_hashes.query(m)

        # LSH returns string keys (file paths); map them back to Documents.
        candidate_docs = [
            d for d in self.documents if str(d.file_path) in candidates and d != doc
        ]

        if candidate_docs:
            # Sort for consistent ordering (fixes PEP 8 E251 `key = lambda`).
            group = sorted([doc, *candidate_docs], key=lambda d: str(d.file_path))
            if group not in self.duplicates:
                self.duplicates.append(group)
            processed.update(group)

    return self.duplicates

Cleaning Stage

Cleans and improves document quality.

Comprehensive cleaning step that applies multiple data cleaning components (LaTeX correction is optional).

CleaningStep

Bases: PipelineStep

Comprehensive cleaning step that applies multiple data cleaning components.

This step processes extracted text through various cleaning components to:

  • Fix OCR-induced errors
  • Remove OCR duplicates
  • Apply Nougat corrections
  • Apply rule-based corrections
  • Remove Nougat artifacts
  • Correct LaTeX syntax errors (optional)
Source code in eve/steps/cleaning/cleaning_step.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
class CleaningStep(PipelineStep):
    """
    Comprehensive cleaning step that applies multiple data cleaning components.

    This step processes extracted text through various cleaning components to:

    - Fix OCR-induced errors
    - Remove OCR duplicates
    - Apply Nougat corrections
    - Apply rule-based corrections
    - Remove Nougat artifacts
    - Correct LaTeX syntax errors (optional)
    """

    def __init__(self, config: dict):
        """Initialize the cleaning step with configuration.

        Args:
            config: Configuration dictionary with component settings.

                Expected keys:

                - ocr_threshold: float (default 0.99) - OCR duplicate threshold
                - min_words: int (default 2) - Minimum words for processing
                - enable_latex_correction: bool (default False) - Enable LaTeX correction
                - openrouter_api_key: str (optional) - API key for LaTeX correction
                - openrouter_model: str (default "anthropic/claude-3-haiku") - Model for corrections
                - debug: bool (default False) - Enable debug output
        """
        super().__init__(config, name="CleaningStep")

        ocr_threshold = config.get("ocr_threshold", 0.99)
        min_words = config.get("min_words", 2)
        enable_latex = config.get("enable_latex_correction", False)
        openrouter_key = config.get("openrouter_api_key")
        openrouter_model = config.get("openrouter_model", "anthropic/claude-3-haiku")

        # Processors are applied to each document in this list order.
        self.processors = [
            OCRProcessor(debug=self.debug),
            DuplicateRemovalProcessor(threshold=ocr_threshold, min_words=min_words, debug=self.debug),
            NougatProcessor(debug=self.debug),
            RuleBasedProcessor(debug=self.debug),
        ]

        # LaTeX correction is opt-in; without an API key the LaTeX processor
        # can only detect (not correct) errors.
        if enable_latex:
            self.processors.append(
                LaTeXProcessor(
                    debug=self.debug,
                    api_key=openrouter_key,
                    model=openrouter_model
                )
            )

    async def execute(self, documents: List[Document]) -> List[Document]:
        """Execute the cleaning step on input data.

        Args:
            documents: List of Documents.

        Returns:
            List of cleaned Documents.
        """
        self.logger.info(f"Executing cleaning step on {len(documents)} documents")

        if not documents:
            self.logger.warning("No input data provided to cleaning step")
            return []

        result = []
        processed_count = 0
        failed_count = 0

        for document in documents:
            # Empty documents pass through untouched and count as failed.
            if document.is_empty():
                self.logger.warning(f"{document.filename} - Empty content, skipping cleaning")
                result.append(document)
                failed_count += 1
                continue

            try:
                processed_document = document
                original_length = document.content_length

                for processor in self.processors:
                    try:
                        processed_document = await processor.process(processed_document)

                        # A processor returning None is fatal for this chain:
                        # revert to the original document and stop processing.
                        if processed_document is None:
                            self.logger.error(f"{document.filename} - Processor {processor.__class__.__name__} returned None")
                            processed_document = document
                            break

                    except Exception as e:
                        # A failing processor is skipped; later processors still run.
                        self.logger.error(f"{document.filename} - Processor {processor.__class__.__name__} failed: {str(e)}")
                        continue

                if original_length > 0 and processed_document.content_length != original_length:
                    reduction_percent = ((original_length - processed_document.content_length) / original_length) * 100

                    # NOTE(review): a negative reduction (content grew) also logs
                    # "No significant changes" — confirm that is intended.
                    if reduction_percent > 0:
                        self.logger.info(f"{document.filename} - Cleaned: {reduction_percent:.2f}% text removed ({original_length} -> {processed_document.content_length} chars)")
                    else:
                        self.logger.info(f"{document.filename} - Cleaned: No significant changes")

                result.append(processed_document)
                processed_count += 1

            except Exception as e:
                # On any unexpected error, keep the original document.
                self.logger.error(f"{document.filename} - Cleaning failed: {str(e)}")
                result.append(document)
                failed_count += 1

        self.logger.info(f"Cleaning step completed: {processed_count} processed, {failed_count} failed")
        return result

    def _get_applicable_formats(self) -> List[str]:
        """Get list of formats that these cleaning components apply to.

        Returns:
            List of file formats that can be processed by cleaning components.
        """
        return [
            "md",
            "txt",
            "tex",
            "html",
            "xml",
        ]

    def get_component_info(self) -> dict:
        """Get information about enabled cleaning processors.

        Returns:
            Dictionary with processor information.
        """
        component_info = {
            "total_processors": len(self.processors),
            "processors": [processor.__class__.__name__ for processor in self.processors],
            "applicable_formats": self._get_applicable_formats(),
            "debug_enabled": self.debug
        }

        # The LaTeX processor is optional, so report its presence separately.
        latex_enabled = any(isinstance(proc, LaTeXProcessor) for proc in self.processors)
        component_info["latex_correction_enabled"] = latex_enabled

        return component_info

__init__(config)

Initialize the cleaning step with configuration.

Parameters:

Name Type Description Default
config dict

Configuration dictionary with component settings.

Expected keys:

  • ocr_threshold: float (default 0.99) - OCR duplicate threshold
  • min_words: int (default 2) - Minimum words for processing
  • enable_latex_correction: bool (default False) - Enable LaTeX correction
  • openrouter_api_key: str (optional) - API key for LaTeX correction
  • openrouter_model: str (default "anthropic/claude-3-haiku") - Model for corrections
  • debug: bool (default False) - Enable debug output
required
Source code in eve/steps/cleaning/cleaning_step.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def __init__(self, config: dict):
    """Set up the cleaning pipeline from *config*.

    Args:
        config: Component settings. Recognized keys:

            - ocr_threshold: float (default 0.99) - OCR duplicate threshold
            - min_words: int (default 2) - Minimum words for processing
            - enable_latex_correction: bool (default False) - Enable LaTeX correction
            - openrouter_api_key: str (optional) - API key for LaTeX correction
            - openrouter_model: str (default "anthropic/claude-3-haiku") - Model for corrections
            - debug: bool (default False) - Enable debug output
    """
    super().__init__(config, name="CleaningStep")

    threshold = config.get("ocr_threshold", 0.99)
    word_floor = config.get("min_words", 2)

    # Core processors, applied in this order to every document.
    self.processors = [
        OCRProcessor(debug=self.debug),
        DuplicateRemovalProcessor(threshold=threshold, min_words=word_floor, debug=self.debug),
        NougatProcessor(debug=self.debug),
        RuleBasedProcessor(debug=self.debug),
    ]

    # The LaTeX pass is opt-in since it may call out to an external model.
    if config.get("enable_latex_correction", False):
        self.processors.append(
            LaTeXProcessor(
                debug=self.debug,
                api_key=config.get("openrouter_api_key"),
                model=config.get("openrouter_model", "anthropic/claude-3-haiku"),
            )
        )

execute(documents) async

Execute the cleaning step on input data.

Parameters:

Name Type Description Default
documents List[Document]

List of Documents.

required

Returns:

Type Description
List[Document]

List of cleaned Documents.

Source code in eve/steps/cleaning/cleaning_step.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
async def execute(self, documents: List[Document]) -> List[Document]:
    """Run every configured processor over each input document.

    Args:
        documents: Documents to clean.

    Returns:
        The cleaned Documents; originals are kept for empty or failed items.
    """
    self.logger.info(f"Executing cleaning step on {len(documents)} documents")

    if not documents:
        self.logger.warning("No input data provided to cleaning step")
        return []

    cleaned = []
    ok = 0
    bad = 0

    for doc in documents:
        if doc.is_empty():
            # Nothing to clean; pass the document through unchanged.
            self.logger.warning(f"{doc.filename} - Empty content, skipping cleaning")
            cleaned.append(doc)
            bad += 1
            continue

        try:
            current = doc
            before = doc.content_length

            for proc in self.processors:
                try:
                    candidate = await proc.process(current)
                except Exception as e:
                    # A failing processor is skipped; the chain continues.
                    self.logger.error(f"{doc.filename} - Processor {proc.__class__.__name__} failed: {str(e)}")
                    continue

                if candidate is None:
                    # Losing the document is fatal for this chain: fall back
                    # to the original and stop processing it.
                    self.logger.error(f"{doc.filename} - Processor {proc.__class__.__name__} returned None")
                    current = doc
                    break

                current = candidate

            if before > 0 and current.content_length != before:
                shrink = ((before - current.content_length) / before) * 100

                if shrink > 0:
                    self.logger.info(f"{doc.filename} - Cleaned: {shrink:.2f}% text removed ({before} -> {current.content_length} chars)")
                else:
                    self.logger.info(f"{doc.filename} - Cleaned: No significant changes")

            cleaned.append(current)
            ok += 1

        except Exception as e:
            self.logger.error(f"{doc.filename} - Cleaning failed: {str(e)}")
            cleaned.append(doc)
            bad += 1

    self.logger.info(f"Cleaning step completed: {ok} processed, {bad} failed")
    return cleaned

get_component_info()

Get information about enabled cleaning processors.

Returns:

Type Description
dict

Dictionary with processor information.

Source code in eve/steps/cleaning/cleaning_step.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def get_component_info(self) -> dict:
    """Describe the configured cleaning processors.

    Returns:
        Dictionary with processor information.
    """
    info = {
        "total_processors": len(self.processors),
        "processors": [type(p).__name__ for p in self.processors],
        "applicable_formats": self._get_applicable_formats(),
        "debug_enabled": self.debug
    }

    # The LLM-backed LaTeX pass is optional, so report it separately.
    info["latex_correction_enabled"] = any(
        isinstance(p, LaTeXProcessor) for p in self.processors
    )

    return info

Cleaning Components

Processors

Consolidated text processing components for the cleaning pipeline.

This module combines all the individual cleaning components into a unified structure for better organization and maintainability.

DuplicateRemovalProcessor

Bases: TextProcessor

Processor for removing OCR-induced duplicate text segments.

Source code in eve/steps/cleaning/processors.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
class DuplicateRemovalProcessor(TextProcessor):
    """Processor for removing OCR-induced duplicate text segments."""

    def __init__(
        self, threshold: float = 0.99, min_words: int = 2, debug: bool = False
    ):
        """
        Initialize the duplicate removal processor.

        Args:
            threshold: Similarity threshold for duplicates (fraction of
                overlapping unique words, 0-1).
            min_words: Minimum words required for a unit to be processed.
            debug: Enable debug output.
        """
        super().__init__(debug=debug)
        self.threshold = threshold
        self.min_words = min_words

    def _is_similar(self, sent1: str, sent2: str) -> bool:
        """Check if two sentences are similar based on word overlap."""
        words1 = sent1.lower().split()
        words2 = sent2.lower().split()

        # Sentences below the word floor are never flagged as duplicates.
        if len(words1) < self.min_words:
            return False

        set1, set2 = set(words1), set(words2)
        overlap = len(set1 & set2)
        # Similar when the shared unique words cover at least `threshold`
        # of either sentence's unique-word set.
        return (
            overlap / len(set1) >= self.threshold
            or overlap / len(set2) >= self.threshold
        )

    def _remove_near_adjacent_duplicates(
        self, content: str, filename: str
    ) -> Tuple[str, List[str]]:
        """Remove near-adjacent duplicate sentences.

        Args:
            content: Text to scan, treated as newline-separated units.
            filename: Name used to label log messages.

        Returns:
            Tuple of (cleaned content, list of removed units).
        """
        sentences = content.split("\n")
        cleaned = []
        removed = []
        i = 0

        while i < len(sentences):
            current = sentences[i]
            if len(current.split()) < self.min_words:
                cleaned.append(current)
                i += 1
                continue

            # Look ahead past blank lines to the next non-empty unit.
            j = i + 1
            while j < len(sentences) and not sentences[j].strip():
                j += 1

            if j < len(sentences) and self._is_similar(current, sentences[j]):
                # BUGFIX: this log previously hardcoded "(unknown)" and left
                # the `filename` parameter unused; label it with the file.
                self.logger.info(
                    f"{filename} - Removing near-duplicate: {repr(sentences[j])}"
                )
                removed.append(sentences[j])
                # NOTE(review): scanning resumes at j, so `current` is dropped
                # and the later copy is re-examined; blank lines between the
                # pair are dropped too — confirm this is the intended policy.
                i = j
            else:
                cleaned.append(current)
                i += 1

        return "\n".join(cleaned), removed

    async def process(self, document: Document) -> Document:
        """Remove duplicate content from the document."""
        if self.debug:
            self.logger.info(
                f"Before duplicate removal ({document.filename}): {document.content[:200]}..."
            )

        if document.is_empty():
            self.logger.warning(
                f"{document.filename} - Empty content in duplicate removal"
            )
            return document

        try:
            cleaned_content, removed = self._remove_near_adjacent_duplicates(
                document.content, document.filename
            )

            # Percentage of characters removed, guarded against empty content.
            percent_removed = 0.0
            if document.content:
                percent_removed = (
                    (len(document.content) - len(cleaned_content))
                    / len(document.content)
                    * 100
                )

            document.update_content(cleaned_content)
            document.add_metadata("duplicates_removed", len(removed))
            document.add_metadata("duplicate_removal_percent", percent_removed)

            self.logger.info(
                f"{document.filename} - Duplicate removal: {len(removed)} segments, {percent_removed:.2f}% text removed"
            )

            if self.debug:
                self.logger.info(
                    f"After duplicate removal ({document.filename}): {document.content[:200]}..."
                )

            return document

        except Exception as e:
            # Best-effort: on failure, return the document unchanged.
            self.logger.error(
                f"{document.filename} - Duplicate removal failed: {str(e)}"
            )
            return document

__init__(threshold=0.99, min_words=2, debug=False)

Initialize the duplicate removal processor.

Parameters:

Name Type Description Default
threshold float

Similarity threshold for duplicates.

0.99
min_words int

Minimum words required for a unit to be processed.

2
debug bool

Enable debug output.

False
Source code in eve/steps/cleaning/processors.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def __init__(
    self, threshold: float = 0.99, min_words: int = 2, debug: bool = False
):
    """
    Set up duplicate removal.

    Args:
        threshold: Similarity threshold for flagging duplicates.
        min_words: Minimum words required for a unit to be processed.
        debug: Enable debug output.
    """
    super().__init__(debug=debug)
    self.min_words = min_words
    self.threshold = threshold

process(document) async

Remove duplicate content from the document.

Source code in eve/steps/cleaning/processors.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
async def process(self, document: Document) -> Document:
    """Strip near-adjacent duplicate lines from *document*, updating it in place."""
    if self.debug:
        self.logger.info(
            f"Before duplicate removal ({document.filename}): {document.content[:200]}..."
        )

    if document.is_empty():
        self.logger.warning(
            f"{document.filename} - Empty content in duplicate removal"
        )
        return document

    try:
        new_text, dropped = self._remove_near_adjacent_duplicates(
            document.content, document.filename
        )

        # Percentage of characters removed, guarded against empty content.
        pct = 0.0
        if document.content:
            pct = (len(document.content) - len(new_text)) / len(document.content) * 100

        document.update_content(new_text)
        document.add_metadata("duplicates_removed", len(dropped))
        document.add_metadata("duplicate_removal_percent", pct)

        self.logger.info(
            f"{document.filename} - Duplicate removal: {len(dropped)} segments, {pct:.2f}% text removed"
        )

        if self.debug:
            self.logger.info(
                f"After duplicate removal ({document.filename}): {document.content[:200]}..."
            )

        return document

    except Exception as e:
        # Best-effort: on failure, hand the document back unchanged.
        self.logger.error(
            f"{document.filename} - Duplicate removal failed: {str(e)}"
        )
        return document

LaTeXProcessor

Bases: TextProcessor

Processor for detecting and correcting LaTeX syntax errors.

Source code in eve/steps/cleaning/processors.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
class LaTeXProcessor(TextProcessor):
    """Processor for detecting and correcting LaTeX syntax errors."""

    def __init__(
        self,
        debug: bool = False,
        api_key: Optional[str] = None,
        model: str = "anthropic/claude-3-haiku",
    ):
        """Initialize the LaTeX processor.

        Args:
            debug: Enable debug output.
            api_key: OpenRouter API key. If None, will use OPENROUTER_API_KEY environment variable.
            model: OpenRouter model to use for corrections.
        """
        super().__init__(debug=debug)
        self.api_key = api_key or os.getenv("OPENROUTER_API_KEY")
        self.model = model
        # Mapping of formula-type name -> compiled regex (project helper).
        self.formula_patterns = get_latex_formula_patterns()

        # Without a key the processor can still detect errors, but cannot
        # request corrections from the LLM (see process()).
        if not self.api_key and debug:
            self.logger.warning(
                "No OPENROUTER_API_KEY found. LaTeX correction will only detect errors."
            )

    def _extract_latex_formulas(self, content: str) -> List[Tuple[str, str]]:
        """Extract LaTeX formulas from content with their types.

        Returns:
            List of (formula_type, formula_text) pairs; for "environment"
            matches the full \\begin...\\end block is reconstructed.
        """
        formulas = []

        for formula_type, pattern in self.formula_patterns.items():
            for match in pattern.finditer(content):
                if formula_type == "environment":
                    # group(1) = environment name, group(2) = its body.
                    env_name = match.group(1)
                    env_content = match.group(2).strip()
                    formulas.append(
                        (
                            formula_type,
                            f"\\begin{{{env_name}}}{env_content}\\end{{{env_name}}}",
                        )
                    )
                else:
                    formulas.append((formula_type, match.group(1).strip()))

        return formulas

    async def _check_formula_syntax(
        self, formula: str, formula_type: str
    ) -> Tuple[bool, str]:
        """Check if a LaTeX formula has valid syntax using pdflatex package.

        Wraps the formula in a minimal standalone document matching its
        delimiter style, compiles it in a temp dir, and returns
        (is_valid, message). Compilation runs in a thread executor so the
        event loop is not blocked.
        """
        try:
            # Create appropriate test content based on formula type
            if formula_type == "inline":
                test_content = f"\\documentclass{{article}}\\usepackage{{amsmath}}\\usepackage{{amssymb}}\\begin{{document}}${formula}$\\end{{document}}"
            elif formula_type == "display":
                test_content = f"\\documentclass{{article}}\\usepackage{{amsmath}}\\usepackage{{amssymb}}\\begin{{document}}$${formula}$$\\end{{document}}"
            elif formula_type == "bracket":
                test_content = f"\\documentclass{{article}}\\usepackage{{amsmath}}\\usepackage{{amssymb}}\\begin{{document}}\\({formula}\\)\\end{{document}}"
            elif formula_type == "square_bracket":
                test_content = f"\\documentclass{{article}}\\usepackage{{amsmath}}\\usepackage{{amssymb}}\\begin{{document}}\\[{formula}\\]\\end{{document}}"
            else:
                # Environments (tables, matrices, ...) get extra packages.
                test_content = f"\\documentclass{{article}}\\usepackage{{amsmath}}\\usepackage{{amssymb}}\\usepackage{{multirow}}\\usepackage{{bm}}\\begin{{document}}{formula}\\end{{document}}"

            def check_latex():
                """Run pdflatex compilation in a separate thread."""
                try:
                    with tempfile.TemporaryDirectory() as tmp_dir:
                        tex_file = os.path.join(tmp_dir, "test.tex")
                        with open(tex_file, "w", encoding="utf-8") as f:
                            f.write(test_content)

                        pdfl = PDFLaTeX.from_texfile(tex_file)
                        _, log, completed_process = pdfl.create_pdf(
                            keep_pdf_file=False, keep_log_file=False
                        )

                        if completed_process.returncode == 0:
                            return True, "Formula syntax is valid"
                        else:
                            # Parse log for error messages
                            log_content = (
                                log.decode("utf-8", errors="replace")
                                if log
                                else "No log available"
                            )
                            error_lines = log_content.split("\n")
                            error_msg = "Unknown error"
                            # TeX errors start with "! "; also grab the next
                            # line for extra context when it is non-empty.
                            for i, line in enumerate(error_lines):
                                if "! " in line:
                                    error_msg = line.strip()
                                    if (
                                        i + 1 < len(error_lines)
                                        and error_lines[i + 1].strip()
                                    ):
                                        error_msg += " " + error_lines[i + 1].strip()
                                    break
                            return False, error_msg
                except Exception as e:
                    return False, f"PDFLaTeX compilation failed: {str(e)}"

            # Run the blocking operation in a thread pool
            return await asyncio.get_event_loop().run_in_executor(None, check_latex)

        except Exception as e:
            return False, f"Syntax check failed: {str(e)}"

    def _replace_formula_in_content(
        self, content: str, original: str, corrected: str, formula_type: str
    ) -> str:
        """Replace original formula with corrected version in content.

        Rebuilds the delimiters that _extract_latex_formulas stripped and
        replaces only the first occurrence; falls back to a plain string
        replace if the regex substitution fails.
        """
        try:
            if formula_type == "inline":
                pattern = re.escape(f"${original}$")
                replacement = f"${corrected}$"
            elif formula_type == "display":
                pattern = re.escape(f"$${original}$$")
                replacement = f"$${corrected}$$"
            elif formula_type == "bracket":
                pattern = re.escape(f"\\({original}\\)")
                replacement = f"\\({corrected}\\)"
            elif formula_type == "square_bracket":
                pattern = re.escape(f"\\[{original}\\]")
                replacement = f"\\[{corrected}\\]"
            else:
                pattern = re.escape(original)
                replacement = corrected

            return re.sub(pattern, replacement, content, count=1)
        except Exception:
            return content.replace(original, corrected, 1)

    async def process(self, document: Document) -> Document:
        """Process document to detect and correct LaTeX syntax errors.

        For each extracted formula: validate via pdflatex; if invalid and an
        API key is configured, ask the LLM for a correction, re-validate it,
        and only splice it in when the corrected form compiles.
        """
        if self.debug:
            self.logger.info(
                f"Before LaTeX processing ({document.filename}): {document.content[:200]}..."
            )

        if document.is_empty():
            self.logger.warning(
                f"{document.filename} - Empty content in LaTeX processing"
            )
            return document

        try:
            formulas = self._extract_latex_formulas(document.content)

            if not formulas:
                self.logger.info(f"{document.filename} - No LaTeX formulas found")
                document.add_metadata("latex_processed", True)
                return document

            errors_found = 0
            corrections_made = 0
            modified_content = document.content

            for formula_type, formula in formulas:
                is_valid, error_message = await self._check_formula_syntax(
                    formula, formula_type
                )

                if not is_valid:
                    errors_found += 1
                    self.logger.warning(
                        f"{document.filename} - Invalid LaTeX formula: {formula[:10]}... Error: {error_message}"
                    )

                    # Corrections require an API key; otherwise detect-only.
                    if self.api_key:
                        prompt = get_latex_correction_prompt(
                            formula_type, error_message, formula, document.content
                        )
                        corrected_formula = await make_openrouter_request(
                            self.api_key, self.model, prompt
                        )

                        if corrected_formula and corrected_formula != formula:
                            # Re-validate the LLM's suggestion before using it.
                            is_corrected_valid, _ = await self._check_formula_syntax(
                                corrected_formula, formula_type
                            )

                            if is_corrected_valid:
                                modified_content = self._replace_formula_in_content(
                                    modified_content,
                                    formula,
                                    corrected_formula,
                                    formula_type,
                                )
                                corrections_made += 1
                                self.logger.info(
                                    f"{document.filename} - Corrected LaTeX formula: {formula[:30]}... -> {corrected_formula[:30]}..."
                                )
                            else:
                                self.logger.warning(
                                    f"{document.filename} - LLM correction still invalid: {corrected_formula[:50]}..."
                                )

            document.update_content(modified_content)
            document.add_metadata("latex_errors_found", errors_found)
            document.add_metadata("latex_corrections_made", corrections_made)
            document.add_metadata("latex_processed", True)

            if errors_found > 0:
                self.logger.info(
                    f"{document.filename} - LaTeX processing complete: {errors_found} errors found, {corrections_made} corrected"
                )
            else:
                self.logger.info(f"{document.filename} - All LaTeX formulas are valid")

            if self.debug:
                self.logger.info(
                    f"After LaTeX processing ({document.filename}): {errors_found} errors, {corrections_made} fixed"
                )

            return document

        except Exception as e:
            # Best-effort: on failure, return the document unchanged.
            self.logger.error(
                f"{document.filename} - LaTeX processing failed: {str(e)}"
            )
            return document

__init__(debug=False, api_key=None, model='anthropic/claude-3-haiku')

Initialize the LaTeX processor.

Parameters:

Name Type Description Default
debug bool

Enable debug output.

False
api_key Optional[str]

OpenRouter API key. If None, will use OPENROUTER_API_KEY environment variable.

None
model str

OpenRouter model to use for corrections.

'anthropic/claude-3-haiku'
Source code in eve/steps/cleaning/processors.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def __init__(
    self,
    debug: bool = False,
    api_key: Optional[str] = None,
    model: str = "anthropic/claude-3-haiku",
):
    """Set up the LaTeX processor and resolve OpenRouter credentials.

    Args:
        debug: Enable verbose debug logging.
        api_key: OpenRouter API key; when omitted, falls back to the
            OPENROUTER_API_KEY environment variable.
        model: OpenRouter model identifier used for formula corrections.
    """
    super().__init__(debug=debug)
    # An explicit key wins; otherwise consult the environment.
    self.api_key = api_key if api_key else os.environ.get("OPENROUTER_API_KEY")
    self.model = model
    self.formula_patterns = get_latex_formula_patterns()

    # Without a key the processor can detect errors but never fix them.
    if debug and not self.api_key:
        self.logger.warning(
            "No OPENROUTER_API_KEY found. LaTeX correction will only detect errors."
        )

process(document) async

Process document to detect and correct LaTeX syntax errors.

Source code in eve/steps/cleaning/processors.py
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
async def process(self, document: Document) -> Document:
    """Detect invalid LaTeX formulas in the document and correct them via LLM.

    Extracts formulas, validates each one, and — when an API key is
    configured — asks the OpenRouter model for a correction, which is only
    applied if it re-validates. Error/correction counts are recorded as
    document metadata. On failure the document is returned unmodified.

    Args:
        document: The document whose content is scanned for LaTeX formulas.

    Returns:
        The same document, with corrected content and latex_* metadata added.
    """
    if self.debug:
        self.logger.info(
            f"Before LaTeX processing ({document.filename}): {document.content[:200]}..."
        )

    if document.is_empty():
        self.logger.warning(
            f"{document.filename} - Empty content in LaTeX processing"
        )
        return document

    try:
        formulas = self._extract_latex_formulas(document.content)

        if not formulas:
            # Nothing to validate; still mark the document as processed.
            self.logger.info(f"{document.filename} - No LaTeX formulas found")
            document.add_metadata("latex_processed", True)
            return document

        errors_found = 0
        corrections_made = 0
        # Corrections are accumulated in this working copy and applied once.
        modified_content = document.content

        for formula_type, formula in formulas:
            is_valid, error_message = await self._check_formula_syntax(
                formula, formula_type
            )

            if not is_valid:
                errors_found += 1
                self.logger.warning(
                    f"{document.filename} - Invalid LaTeX formula: {formula[:10]}... Error: {error_message}"
                )

                # Only attempt an LLM correction when credentials exist.
                if self.api_key:
                    prompt = get_latex_correction_prompt(
                        formula_type, error_message, formula, document.content
                    )
                    corrected_formula = await make_openrouter_request(
                        self.api_key, self.model, prompt
                    )

                    # Ignore empty responses and no-op "corrections".
                    if corrected_formula and corrected_formula != formula:
                        # Re-validate before touching the content: a bad
                        # LLM answer must never replace the original.
                        is_corrected_valid, _ = await self._check_formula_syntax(
                            corrected_formula, formula_type
                        )

                        if is_corrected_valid:
                            modified_content = self._replace_formula_in_content(
                                modified_content,
                                formula,
                                corrected_formula,
                                formula_type,
                            )
                            corrections_made += 1
                            self.logger.info(
                                f"{document.filename} - Corrected LaTeX formula: {formula[:30]}... -> {corrected_formula[:30]}..."
                            )
                        else:
                            self.logger.warning(
                                f"{document.filename} - LLM correction still invalid: {corrected_formula[:50]}..."
                            )

        document.update_content(modified_content)
        document.add_metadata("latex_errors_found", errors_found)
        document.add_metadata("latex_corrections_made", corrections_made)
        document.add_metadata("latex_processed", True)

        if errors_found > 0:
            self.logger.info(
                f"{document.filename} - LaTeX processing complete: {errors_found} errors found, {corrections_made} corrected"
            )
        else:
            self.logger.info(f"{document.filename} - All LaTeX formulas are valid")

        if self.debug:
            self.logger.info(
                f"After LaTeX processing ({document.filename}): {errors_found} errors, {corrections_made} fixed"
            )

        return document

    except Exception as e:
        # Best-effort step: never let LaTeX cleanup kill the pipeline.
        self.logger.error(
            f"{document.filename} - LaTeX processing failed: {str(e)}"
        )
        return document

NougatProcessor

Bases: TextProcessor

Processor for fixing Nougat-related issues and artifacts.

Source code in eve/steps/cleaning/processors.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
class NougatProcessor(TextProcessor):
    """Processor for fixing Nougat-related issues and artifacts."""

    async def process(self, document: Document) -> Document:
        """Run the Nougat cleanup chain over the document content."""
        if self.debug:
            self.logger.info(
                f"Before Nougat processing ({document.filename}): {document.content[:200]}..."
            )

        if document.is_empty():
            self.logger.warning(
                f"{document.filename} - Empty content in Nougat processing"
            )
            return document

        try:
            # Nougat's own postprocessing, with Markdown fixes enabled.
            text = postprocess_single(document.content, markdown_fix=True)
            # Normalize LaTeX table backslashes, then strip Nougat artifacts.
            text = clean_doubled_backslashes(text)
            text = remove_nougat_artifacts(text)
            # Turn escaped newlines into real ones and drop wrapping quotes.
            text = text.replace("\\n", "\n").strip('"')

            document.update_content(text)
            document.add_metadata("nougat_processed", True)

            self.logger.info(f"{document.filename} - Nougat processing completed")

            if self.debug:
                self.logger.info(
                    f"After Nougat processing ({document.filename}): {document.content[:200]}..."
                )

            return document

        except Exception as e:
            self.logger.error(
                f"{document.filename} - Nougat processing failed: {str(e)}"
            )
            return document

process(document) async

Process Nougat-specific issues in the document.

Source code in eve/steps/cleaning/processors.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
async def process(self, document: Document) -> Document:
    """Apply Nougat-specific cleanup to the document and return it."""
    if self.debug:
        self.logger.info(
            f"Before Nougat processing ({document.filename}): {document.content[:200]}..."
        )

    if document.is_empty():
        self.logger.warning(
            f"{document.filename} - Empty content in Nougat processing"
        )
        return document

    try:
        content = postprocess_single(document.content, markdown_fix=True)

        # LaTeX-table normalization followed by artifact removal.
        for transform in (clean_doubled_backslashes, remove_nougat_artifacts):
            content = transform(content)

        # Unescape literal "\n" sequences, then shed surrounding quotes.
        content = content.replace("\\n", "\n")
        content = content.strip('"')

        document.update_content(content)
        document.add_metadata("nougat_processed", True)

        self.logger.info(f"{document.filename} - Nougat processing completed")

        if self.debug:
            self.logger.info(
                f"After Nougat processing ({document.filename}): {document.content[:200]}..."
            )

        return document

    except Exception as e:
        self.logger.error(
            f"{document.filename} - Nougat processing failed: {str(e)}"
        )
        return document

OCRProcessor

Bases: TextProcessor

Processor for fixing OCR-induced text issues.

Source code in eve/steps/cleaning/processors.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class OCRProcessor(TextProcessor):
    """Processor for fixing OCR-induced text issues."""

    async def process(self, document: Document) -> Document:
        """Repair OCR artifacts (digit/letter spacing) in the document."""
        if self.debug:
            self.logger.info(
                f"Before OCR processing ({document.filename}): {document.content[:200]}..."
            )

        if document.is_empty():
            self.logger.warning(
                f"{document.filename} - Empty content in OCR processing"
            )
            return document

        try:
            # The only OCR fix currently applied: digit/letter spacing.
            repaired = fix_ocr_digit_letter_spacing(document.content)

            document.update_content(repaired)
            document.add_metadata("ocr_processed", True)

            self.logger.info(f"{document.filename} - OCR processing completed")

            if self.debug:
                self.logger.info(
                    f"After OCR processing ({document.filename}): {document.content[:200]}..."
                )

            return document

        except Exception as e:
            self.logger.error(f"{document.filename} - OCR processing failed: {str(e)}")
            return document

process(document) async

Fix OCR issues in the document content.

Source code in eve/steps/cleaning/processors.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
async def process(self, document: Document) -> Document:
    """Fix OCR artifacts in the document content and return the document."""
    if self.debug:
        self.logger.info(
            f"Before OCR processing ({document.filename}): {document.content[:200]}..."
        )

    if document.is_empty():
        self.logger.warning(
            f"{document.filename} - Empty content in OCR processing"
        )
        return document

    try:
        # Single pass: repair digit/letter spacing damage from OCR.
        document.update_content(fix_ocr_digit_letter_spacing(document.content))
        document.add_metadata("ocr_processed", True)

        self.logger.info(f"{document.filename} - OCR processing completed")

        if self.debug:
            self.logger.info(
                f"After OCR processing ({document.filename}): {document.content[:200]}..."
            )

        return document

    except Exception as e:
        self.logger.error(f"{document.filename} - OCR processing failed: {str(e)}")
        return document

RuleBasedProcessor

Bases: TextProcessor

Processor for applying rule-based text corrections.

Source code in eve/steps/cleaning/processors.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
class RuleBasedProcessor(TextProcessor):
    """Processor for applying rule-based text corrections."""

    async def process(self, document: Document) -> Document:
        """Apply the rule-based cleanup chain to the document."""
        if self.debug:
            self.logger.info(
                f"Before rule-based processing ({document.filename}): {document.content[:200]}..."
            )

        if document.is_empty():
            self.logger.warning(
                f"{document.filename} - Empty content in rule-based processing"
            )
            return document

        try:
            # Drop noise lines, collapse newline runs, trim outer whitespace.
            text = remove_single_symbol_lines(document.content)
            text = normalize_excessive_newlines(text)
            text = text.strip()

            document.update_content(text)
            document.add_metadata("rule_based_processed", True)

            self.logger.info(f"{document.filename} - Rule-based processing completed")

            if self.debug:
                self.logger.info(
                    f"After rule-based processing ({document.filename}): {document.content[:200]}..."
                )

            return document

        except Exception as e:
            self.logger.error(
                f"{document.filename} - Rule-based processing failed: {str(e)}"
            )
            return document

process(document) async

Apply rule-based corrections to the document.

Source code in eve/steps/cleaning/processors.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
async def process(self, document: Document) -> Document:
    """Run the rule-based corrections and return the cleaned document."""
    if self.debug:
        self.logger.info(
            f"Before rule-based processing ({document.filename}): {document.content[:200]}..."
        )

    if document.is_empty():
        self.logger.warning(
            f"{document.filename} - Empty content in rule-based processing"
        )
        return document

    try:
        # Pipeline: strip single-symbol lines, then squash newline runs.
        content = normalize_excessive_newlines(
            remove_single_symbol_lines(document.content)
        )
        # Finally trim leading/trailing whitespace.
        content = content.strip()

        document.update_content(content)
        document.add_metadata("rule_based_processed", True)

        self.logger.info(f"{document.filename} - Rule-based processing completed")

        if self.debug:
            self.logger.info(
                f"After rule-based processing ({document.filename}): {document.content[:200]}..."
            )

        return document

    except Exception as e:
        self.logger.error(
            f"{document.filename} - Rule-based processing failed: {str(e)}"
        )
        return document

TextProcessor

Bases: ABC

Abstract base class for text processing components.

Source code in eve/steps/cleaning/processors.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class TextProcessor(ABC):
    """Abstract base class for text processing components."""

    def __init__(self, debug: bool = False):
        """Create the processor.

        Args:
            debug: Enable verbose debug logging.
        """
        # Per-subclass logger name keeps log lines attributable.
        self.logger = get_logger(self.__class__.__name__)
        self.debug = debug

    @abstractmethod
    async def process(self, document: Document) -> Document:
        """Transform a document and return the cleaned result.

        Args:
            document: The document to process.

        Returns:
            The processed document.
        """
        ...

__init__(debug=False)

Initialize the text processor.

Parameters:

Name Type Description Default
debug bool

Enable debug output.

False
Source code in eve/steps/cleaning/processors.py
35
36
37
38
39
40
41
42
def __init__(self, debug: bool = False):
    """Create the processor.

    Args:
        debug: Enable verbose debug logging.
    """
    # Per-subclass logger name keeps log lines attributable.
    self.logger = get_logger(self.__class__.__name__)
    self.debug = debug

process(document) abstractmethod async

Process a document and return the cleaned result.

Parameters:

Name Type Description Default
document Document

The document to process.

required

Returns:

Type Description
Document

Processed document.

Source code in eve/steps/cleaning/processors.py
44
45
46
47
48
49
50
51
52
53
54
@abstractmethod
async def process(self, document: Document) -> Document:
    """Transform a document and return the cleaned result.

    Args:
        document: The document to process.

    Returns:
        The processed document.
    """
    ...

Nougat Helpers

Copyright (c) Meta Platforms, Inc. and affiliates.

This source code is licensed under the MIT license found in the LICENSE file in the root directory of this source tree.

Script from here - https://github.com/facebookresearch/nougat/blob/main/nougat/postprocessing.py

get_slices(lines, clean_lines)

Get slices of potentially hallucinated reference sections.

Source code in eve/steps/cleaning/nougat_helpers.py
103
104
105
106
107
108
109
110
111
112
def get_slices(lines: List[str], clean_lines: List[str]) -> List[slice]:
    """Locate spans of potentially hallucinated reference sections.

    Scans for lines starting (case-insensitively) with '## references' and
    extends each span until the next '##' heading or end of input.
    `clean_lines` is accepted for interface compatibility but unused here.
    """
    found = []
    idx = 0
    total = len(lines)
    while idx < total:
        if lines[idx].strip().lower().startswith('## references'):
            end = idx + 1
            # Consume everything until the next section heading.
            while end < total and not lines[end].strip().startswith('##'):
                end += 1
            found.append(slice(idx, end))
        idx += 1
    return found

markdown_compatible(s)

Make text compatible with Markdown formatting.

This function makes various text formatting adjustments to make it compatible with Markdown.

Parameters:

Name Type Description Default
s str

The input text to be made Markdown-compatible.

required

Returns:

Name Type Description
str str

The Markdown-compatible text.

Source code in eve/steps/cleaning/nougat_helpers.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def markdown_compatible(s: str) -> str:
    """
    Make text compatible with Markdown formatting.

    Rewrites equation numbers into LaTeX \\tag form, un-escapes periods,
    reflows LaTeX table environments onto their own lines, and pads inline
    math so it is separated from adjacent characters.

    Args:
        s (str): The input text to be made Markdown-compatible.

    Returns:
        str: The Markdown-compatible text.
    """
    # Equation number before, after, or split around display math.
    s = re.sub(
        r"^\(([\d.]+[a-zA-Z]?)\) \\\[(.+?)\\\]$", r"\[\2 \\tag{\1}\]", s, flags=re.M
    )
    s = re.sub(
        r"^\\\[(.+?)\\\] \(([\d.]+[a-zA-Z]?)\)$", r"\[\1 \\tag{\2}\]", s, flags=re.M
    )
    s = re.sub(
        r"^\\\[(.+?)\\\] \(([\d.]+[a-zA-Z]?)\) (\\\[.+?\\\])$",
        r"\[\1 \\tag{\2}\] \3",
        s,
        flags=re.M,
    )
    # Un-escape backslash-escaped periods (order matters: the first rule
    # already rewrites some of the later patterns, as in the original).
    for escaped, plain in (
        (r"\. ", ". "),
        (r"\.}", ".}"),
        (r"\. }", ". }"),
        (r"\.\]", ".]"),
        (r"\. ]", ". ]"),
    ):
        s = s.replace(escaped, plain)
    # Put table environments on their own lines.
    s = re.sub(
        r"\\begin\{table\}\s*\\begin\{tabular\}(.*?)\\end\{tabular\}\s*\\end\{table\}",
        r"\n\\begin{table}\n\\begin{tabular}\1\\end{tabular}\n\\end{table}\n",
        s,
        flags=re.DOTALL,
    )
    # Separate inline math from adjacent non-space characters.
    s = re.sub(r"([^\s])\$([^\$]*)\$", r"\1 $\2$", s)
    s = re.sub(r"\$([^\$]*)\$([^\s])", r"$\1$ \2", s)

    return s

postprocess(generation, markdown_fix=True)

Postprocess generated text or a list of generated texts.

This function can be used to perform postprocessing on generated text, such as fixing Markdown formatting.

Parameters:

Name Type Description Default
generation Union[str, List[str]]

The generated text or a list of generated texts.

required
markdown_fix bool

Whether to perform Markdown formatting fixes. Default is True.

True

Returns:

Type Description
Union[str, List[str]]

Union[str, List[str]]: The postprocessed text or list of postprocessed texts.

Source code in eve/steps/cleaning/nougat_helpers.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def postprocess(
    generation: Union[str, List[str]], markdown_fix: bool = True
) -> Union[str, List[str]]:
    """
    Postprocess generated text or a list of generated texts.

    Dispatches to :func:`postprocess_single` for a single string, or maps it
    over a list — optionally in parallel when the NOUGAT_MULTIPROCESSING
    environment variable is set to a worker count.

    Args:
        generation (Union[str, List[str]]): The generated text or a list of generated texts.
        markdown_fix (bool, optional): Whether to perform Markdown formatting fixes. Default is True.

    Returns:
        Union[str, List[str]]: The postprocessed text or list of postprocessed texts.
    """
    # isinstance (not `type(x) == list`) also accepts list subclasses.
    if isinstance(generation, list):
        # Read the env var once; truthiness check matches the original
        # behavior (empty string disables multiprocessing).
        workers = os.environ.get("NOUGAT_MULTIPROCESSING")
        if workers:
            with Pool(int(workers)) as p:
                return p.map(
                    partial(postprocess_single, markdown_fix=markdown_fix), generation
                )
        return [postprocess_single(s, markdown_fix=markdown_fix) for s in generation]
    return postprocess_single(generation, markdown_fix=markdown_fix)

postprocess_single(generation, markdown_fix=True)

Postprocess a single generated text.

Parameters:

Name Type Description Default
generation str

The generated text to be postprocessed.

required
markdown_fix bool

Whether to perform Markdown formatting fixes. Default is True.

True

Returns:

Name Type Description
str str

The postprocessed text.

Source code in eve/steps/cleaning/nougat_helpers.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def postprocess_single(generation: str, markdown_fix: bool = True) -> str:
    """
    Postprocess a single generated text.

    Applies a long, order-dependent chain of heuristics: demoting suspiciously
    long headings, removing hallucinated titles/references/footnotes,
    reformatting inline lists, fixing page-boundary punctuation, dropping
    degenerate LaTeX tables, and finally (optionally) making the result
    Markdown-compatible.

    Args:
        generation (str): The generated text to be postprocessed.
        markdown_fix (bool, optional): Whether to perform Markdown formatting fixes. Default is True.

    Returns:
        str: The postprocessed text.
    """
    # Headings whose text runs 100+ chars are almost certainly body text:
    # strip the leading '#'s and keep the content.
    generation = re.sub(
        r"(?:\n|^)#+ \d*\W? ?(.{100,})", r"\n\1", generation
    )
    generation = generation.strip()
    generation = generation.replace("\n* [leftmargin=*]\n", "\n")
    # Remove headings that consist only of (roman/arabic) numerals.
    generation = re.sub(
        r"^#+ (?:\.?(?:\d|[ixv])+)*\s*(?:$|\n\s*)", "", generation, flags=re.M
    )
    lines = generation.split("\n")
    # A heading as the very last line of a page is likely hallucinated.
    if (
        lines[-1].startswith("#")
        and lines[-1].lstrip("#").startswith(" ")
        and len(lines) > 1
    ):
        print("INFO: likely hallucinated title at the end of the page: " + lines[-1])
        generation = "\n".join(lines[:-1])
    generation = truncate_repetitions(generation)
    generation = remove_hallucinated_references(generation)
    # Reference entries degenerated into runs of initials ("A. B. C. ...").
    generation = re.sub(
        r"^\* \[\d+\](\s?[A-W]\.+\s?){10,}.*$", "", generation, flags=re.M
    )
    generation = re.sub(r"^(\* \[\d+\])\[\](.*)$", r"\1\2", generation, flags=re.M)
    # Drop a stray single character at the start/end of the text.
    generation = re.sub(r"(^\w\n\n|\n\n\w$)", "", generation)
    # Rewrite underscore-delimited subscripts into math mode, e.g. _x__i_ .
    generation = re.sub(
        r"([\s.,()])_([a-zA-Z0-9])__([a-zA-Z0-9]){1,3}_([\s.,:()])",
        r"\1\(\2_{\3}\)\4",
        generation,
    )
    generation = re.sub(
        r"([\s.,\d])_([a-zA-Z0-9])_([\s.,\d;])", r"\1\(\2\)\3", generation
    )
    generation = re.sub(
        r"(\nFootnote .*?:) (?:footnotetext|thanks):\W*(.*(?:\n\n|$))",
        r"\1 \2",
        generation,
    )
    generation = re.sub(r"\[FOOTNOTE:.+?\](.*?)\[ENDFOOTNOTE\]", "", generation)
    # Reflow bullet lists that were flattened onto a single line. Iterating
    # in reverse keeps earlier match spans valid as the text is rewritten.
    for match in reversed(
        list(
            re.finditer(
                r"(?:^)(-|\*)?(?!-|\*) ?((?:\d|[ixv])+ )?.+? (-|\*) (((?:\d|[ixv])+)\.(\d|[ixv]) )?.*(?:$)",
                generation,
                flags=re.I | re.M,
            )
        )
    ):
        start, stop = match.span()
        delim = match.group(3) + " "
        splits = match.group(0).split(delim)
        replacement = ""
        if match.group(1) is not None:
            splits = splits[1:]
            delim1 = match.group(1) + " "
        else:
            # NOTE(review): this `continue` skips every match without a
            # leading bullet, making the delim1 assignment above dead code
            # — confirm this early-exit is intentional.
            delim1 = ""
            continue
        pre, post = generation[:start], generation[stop:]
        for i, item in enumerate(splits):
            level = 0
            potential_numeral, _, rest = item.strip().partition(" ")
            if not rest:
                continue
            # Numerals like "1.2" set the nesting depth via their dot count.
            if re.match(
                r"^[\dixv]+((?:\.[\dixv])?)+$", potential_numeral, flags=re.I | re.M
            ):
                level = potential_numeral.count(".")

            replacement += (
                ("\n" if i > 0 else "")
                + ("\t" * level)
                + (delim if i > 0 or start == 0 else delim1)
                + item.strip()
            )
        if post == "":
            post = "\n"
        generation = pre + replacement + post

    # Page-boundary fixes: decide how this page should join the next one.
    if generation.endswith((".", "}")):
        generation += "\n\n"
    # NOTE(review): re.match anchors at the start of the string, so this
    # branch only fires on a single-character generation — confirm whether
    # re.search (end-of-text check) was intended.
    if re.match(r"[A-Z0-9,;:]$", generation):
        generation += " "
    elif generation.startswith(("#", "**", "\\begin")):
        generation = "\n\n" + generation
    elif generation.split("\n")[-1].startswith(("#", "Figure", "Table")):
        generation = generation + "\n\n"
    else:
        try:
            # If the page ends on a complete English word, assume the
            # sentence continues on the next page and add a space.
            last_word = generation.split(" ")[-1]
            if last_word in words.words():
                generation += " "
        except LookupError:
            # NLTK corpus missing: add the space optimistically, then fetch
            # the corpus for subsequent calls.
            generation += " "
            import nltk

            nltk.download("words")
    # Drop absurdly large single-line tables (model degeneration).
    for l in generation.split("\n"):
        if (
            l.count("\\begin{tabular}") > 15
            or l.count("\\multicolumn") > 60
            or l.count("&") > 400
        ):
            generation = generation.replace(l, "")
    generation = generation.replace(
        "\\begin{table} \\begin{tabular}", "\\begin{table}\n\\begin{tabular}"
    )
    generation = generation.replace(
        "\\end{tabular} \\end{table}", "\\end{tabular}\n\\end{table}"
    )
    generation = generation.replace("\\end{table} Tab", "\\end{table}\nTab")
    generation = re.sub(r"(^.+)\\begin{tab", r"\1\n\\begin{tab", generation, flags=re.M)

    # Remove empty/degenerate table environments.
    generation = generation.replace(
        r"\begin{tabular}{l l}  & \\ \end{tabular}", ""
    ).replace("\\begin{tabular}{}\n\n\\end{tabular}", "")
    generation = generation.replace("\\begin{array}[]{", "\\begin{array}{")
    generation = re.sub(
        r"\\begin{tabular}{([clr ]){2,}}\s*[& ]*\s*(\\\\)? \\end{tabular}",
        "",
        generation,
    )
    # Known repeated-signature hallucination.
    generation = re.sub(r"(\*\*S\. A\. B\.\*\*\n+){2,}", "", generation)
    generation = re.sub(r"^#+( [\[\d\w])?$", "", generation, flags=re.M)
    generation = re.sub(r"^\.\s*$", "", generation, flags=re.M)
    generation = re.sub(r"\n{3,}", "\n\n", generation)
    if markdown_fix:
        return markdown_compatible(generation)
    else:
        return generation

remove_hallucinated_references(text)

Remove hallucinated or missing references from the text.

This function identifies and removes references that are marked as missing or hallucinated from the input text.

Parameters:

Name Type Description Default
text str

The input text containing references.

required

Returns:

Name Type Description
str str

The text with hallucinated references removed.

Source code in eve/steps/cleaning/nougat_helpers.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def remove_hallucinated_references(text: str) -> str:
    """
    Remove hallucinated or missing references from the text.

    Identifies reference sections that look hallucinated and replaces each
    one with a [MISSING_PAGE_POST] marker, then folds a now-orphaned
    "## References" heading into the marker itself.

    Args:
        text (str): The input text containing references.

    Returns:
        str: The text with hallucinated references removed.
    """
    lines = text.split("\n")
    if not lines:
        return ""
    clean_lines = remove_numbers(lines)
    slices = get_slices(lines, clean_lines)
    # Collect the raw text of every suspect section first, then substitute
    # in reverse order so earlier replacements cannot shift later spans.
    # (Fix: the original reused the name `to_delete` as both the list and
    # its loop variable, shadowing the collection it iterated.)
    sections = []
    for sli in slices:
        sections.append(remove_slice_from_lines(lines, clean_lines, sli))
    for section in reversed(sections):
        text = text.replace(section, "\n\n[MISSING_PAGE_POST]\n\n")
    text = re.sub(
        r"## References\n+\[MISSING_PAGE_POST(:\d+)?\]",
        "\n\n[MISSING_PAGE_POST\\1]",
        text,
    )
    return text

remove_numbers(lines)

Remove number patterns from lines.

Source code in eve/steps/cleaning/nougat_helpers.py
 93
 94
 95
 96
 97
 98
 99
100
def remove_numbers(lines: List[str]) -> List[str]:
    """Strip bracketed citation markers and enumeration numbers from lines."""

    def _scrub(line: str) -> str:
        # Drop "[12]"-style citation markers, then "3."-style enumerations.
        line = re.sub(r'\[\d+\]', '', line)
        return re.sub(r'\d+\.', '', line).strip()

    return [_scrub(line) for line in lines]

remove_slice_from_lines(lines, clean_lines, sli)

Remove slice from lines and return the removed text.

Source code in eve/steps/cleaning/nougat_helpers.py
115
116
117
118
def remove_slice_from_lines(lines: List[str], clean_lines: List[str], sli: slice) -> str:
    """Return the text span covered by `sli`, rejoined with newlines.

    `clean_lines` is accepted for interface compatibility but unused here.
    """
    return "\n".join(lines[sli])

truncate_repetitions(generation, score_cutoff=0.5, min_len=30)

Truncate repetitions in the given generation.

This function identifies and truncates repetitive content in the text.

Source code in eve/steps/cleaning/nougat_helpers.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def truncate_repetitions(generation: str, score_cutoff: float = 0.5, min_len: int = 30):
    """
    Truncate repetitions in the given generation.

    Splits on '.' and drops any later sentence that is too similar (per
    `ratio`) to an earlier one; short sentences are never compared. Any
    failure falls back to returning the input untouched.
    """
    try:
        parts = generation.split(".")
        if len(parts) < 3:
            return generation

        drop = set()
        for idx, first in enumerate(parts):
            a = first.strip()
            if len(a) < min_len:
                continue
            for jdx in range(idx + 1, len(parts)):
                b = parts[jdx].strip()
                if len(b) < min_len:
                    continue
                # Later near-duplicate sentences get marked for removal.
                if ratio(a, b) > score_cutoff:
                    drop.add(jdx)

        return ".".join(p for idx, p in enumerate(parts) if idx not in drop)
    except Exception:
        # Best-effort heuristic: never fail the caller.
        return generation

PII Removal Stage

Removes personally identifiable information from documents.

PiiStep

Bases: PipelineStep

Source code in eve/steps/pii/pii_step.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class PiiStep(PipelineStep):
    """Pipeline step that removes PII from document content via a remote litserve API."""

    async def _remove_pii(
        self,
        document: Document,
        entities: Optional[List[str]] = None,
        threshold: float = 0.35,
        return_analysis: bool = False,
        url: Optional[str] = None,
    ) -> Document:
        """Make a call to the litserve API and remove PII (async with aiohttp).

        Args:
            document: Document whose content is anonymized in place.
            entities: Presidio entity types to redact; defaults to
                ["PERSON", "EMAIL_ADDRESS"] when None.
            threshold: Minimum detection score forwarded as ``score_threshold``.
            return_analysis: Whether the service should include analysis details.
            url: Full endpoint URL of the PII service.

        Returns:
            The document with ``content`` replaced by the anonymized text, or
            unchanged on any client error (best-effort anonymization).
        """
        if not url:
            self.logger.error("No URL provided for PII service")
            return document

        if entities is None:
            entities = ["PERSON", "EMAIL_ADDRESS"]

        payload = {
            "text": document.content,
            "entities": entities,
            "score_threshold": threshold,
            "return_analysis": return_analysis,
        }

        try:
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=120)) as session:
                async with session.post(
                    url,
                    json=payload,
                    headers={"Content-Type": "application/json"},
                ) as response:
                    response.raise_for_status()
                    result = await response.json()
                    self.logger.debug(f"PII API result: {result}")
                    # Fall back to the original content if the service omits the field.
                    document.content = result.get("anonymized_text", document.content)
                    return document

        except aiohttp.ClientError as e:
            self.logger.error(f"PII API request failed: {e}")
            return document

    async def remove_pii(
        self,
        document: Document,
        entities: Optional[List[str]] = None,
        threshold: float = 0.35,
        return_analysis: bool = False,
        url: Optional[str] = None,
    ) -> Document:
        """Public wrapper around :meth:`_remove_pii` (kept for API compatibility)."""
        return await self._remove_pii(document, entities, threshold, return_analysis, url)

    async def execute(self, documents: List[Document]) -> List[Document]:
        """Anonymize all documents concurrently against ``{config url}/predict``.

        Documents whose task raised, or whose resulting content length is <= 1,
        are dropped from the output.
        """
        base_url = self.config.get("url")
        if not base_url:
            self.logger.error("No URL provided for PII service")
            return []

        url = f"{base_url}/predict"

        # Run all requests concurrently; exceptions are returned, not raised.
        tasks = [self.remove_pii(document, url=url) for document in documents]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        final = []
        for doc in results:
            if isinstance(doc, Exception):
                # Previously these were silently discarded (a truthy Exception
                # passed `if doc` but failed the content_length check) — log them.
                self.logger.error(f"PII removal task failed: {doc}")
                continue
            if doc and getattr(doc, "content_length", 0) > 1:
                final.append(doc)
                self.logger.info(f"Successfully anonymized {doc.filename}")

        return final

Metadata Extraction Stage

Extracts structured metadata from documents.

Metadata extraction step for the EVE pipeline.

MetadataStep

Bases: PipelineStep

Metadata extraction step that extracts metadata from PDF and HTML documents.

Source code in eve/steps/metadata/metadata_step.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class MetadataStep(PipelineStep):
    """
    Metadata extraction step that extracts metadata from PDF and HTML documents.
    """

    def __init__(self, config: dict):
        """
        Initialize the metadata extraction step.

        Args:
            config: Configuration dictionary with options:

                - enabled_formats: List of file formats to process (pdf, html, txt, md)
                - fallback_to_filename: Use filename as title fallback
                - debug: Enable debug logging
                - export_metadata: Whether to export metadata to JSON file
                - metadata_destination: Directory to save metadata file
                - metadata_filename: Name of the metadata JSON file
                  Note: Text formats (txt, md) automatically enable this feature
        """
        super().__init__(config)

        self.enabled_formats = set(config.get("enabled_formats", ["pdf", "html", "txt", "md"]))
        self.fallback_to_filename = config.get("fallback_to_filename", True)
        self.export_metadata = config.get("export_metadata", True)
        self.metadata_destination = Path(config.get("metadata_destination", "./output"))
        self.metadata_filename = config.get("metadata_filename", "metadata.jsonl")

        # Only pdf/html have dedicated extractors; txt/md take the filename-fallback path.
        self.extractors = {
            "pdf": PdfMetadataExtractor(debug=self.debug),
            "html": HtmlMetadataExtractor(debug=self.debug)
        }

    async def _extract_metadata_for_document(self, document: Document) -> Document:
        """
        Extract metadata for a single document using appropriate extractor.

        Args:
            document: Document to extract metadata from

        Returns:
            Document with metadata added to the metadata field:
            - extracted_metadata: Full metadata dictionary (when an extractor succeeded)
            - title: Filename-derived fallback title (when extraction produced nothing)
            - extraction_error: Error message (if extraction failed)
        """
        if document.file_format not in self.enabled_formats:
            self.logger.debug(f"Skipping metadata extraction for unsupported format: {document.file_format}")
            return document

        metadata = None

        try:
            if document.file_format in ["txt", "md"]:
                # Text formats have no extractor: just make sure content is loaded
                # so downstream steps (and the export) see a real content_length.
                if not document.content.strip() and document.file_path.exists():
                    try:
                        with open(document.file_path, 'r', encoding='utf-8') as f:
                            document.update_content(f.read())
                        self.logger.info(f"Loaded content for text file {document.filename}: {len(document.content)} characters")
                    except Exception as e:
                        self.logger.error(f"Failed to load content for {document.filename}: {str(e)}")
                        return document

            elif document.file_format in self.extractors:
                extractor = self.extractors[document.file_format]
                metadata = await extractor.extract_metadata(document)
            else:
                self.logger.warning(f"No extractor available for format: {document.file_format}")

            if metadata:
                document.add_metadata("extracted_metadata", metadata)

                self.logger.info(f"Successfully extracted metadata from {document.filename}")
                if self.debug:
                    self.logger.debug(f"Extracted metadata keys: {list(metadata.keys())}")
            else:
                self.logger.warning(f"No metadata extracted from {document.filename}")

                if self.fallback_to_filename:
                    # Derive a human-readable title from the file stem.
                    title = document.file_path.stem.replace("_", " ").replace("-", " ")
                    document.add_metadata("title", title)
                    self.logger.info(f"Using filename as title fallback: {title}")

        except Exception as e:
            self.logger.error(f"Failed to extract metadata from {document.filename}: {str(e)}")

            if self.fallback_to_filename:
                title = document.file_path.stem.replace("_", " ").replace("-", " ")
                document.add_metadata("title", title)
            document.add_metadata("extraction_error", str(e))

        return document

    async def execute(self, documents: List[Document]) -> List[Document]:
        """
        Execute metadata extraction on input documents.

        Args:
            documents: List of Document objects to extract metadata from

        Returns:
            List of Document objects with metadata added (unsupported-format
            documents are passed through unchanged)
        """
        if not documents:
            self.logger.warning("No input documents provided to metadata step")
            return []

        # Partition so unsupported formats pass through untouched.
        supported_documents = [
            doc for doc in documents
            if doc.file_format in self.enabled_formats
        ]

        unsupported_documents = [
            doc for doc in documents
            if doc.file_format not in self.enabled_formats
        ]

        if unsupported_documents:
            self.logger.info(f"Skipping {len(unsupported_documents)} documents with unsupported formats")

        if not supported_documents:
            self.logger.warning("No documents with supported formats found")
            return documents

        self.logger.info(f"Extracting metadata from {len(supported_documents)} documents")

        tasks = [
            self._extract_metadata_for_document(document)
            for document in supported_documents
        ]

        processed_supported = await asyncio.gather(*tasks, return_exceptions=True)

        result_supported = []
        for i, result in enumerate(processed_supported):
            if isinstance(result, Exception):
                # Keep the original document so nothing is lost on failure.
                self.logger.error(f"Exception processing {supported_documents[i].filename}: {result}")
                result_supported.append(supported_documents[i])
            else:
                result_supported.append(result)

        final_result = result_supported + unsupported_documents

        successful_count = sum(1 for doc in result_supported if doc.get_metadata("extracted_metadata"))
        self.logger.info(f"Successfully extracted metadata from {successful_count}/{len(supported_documents)} supported documents")

        if self.export_metadata:
            await self._export_metadata_to_json(final_result)

        return final_result

    async def _export_metadata_to_json(self, documents: List[Document]) -> None:
        """
        Export extracted metadata to a JSONL file (one JSON object per line).

        The output file is opened once and appended to, and the summary log
        lines are emitted once per call (previously the file was re-opened and
        the summary logged once per document).

        Args:
            documents: List of processed documents with metadata
        """
        if not self.metadata_destination.exists():
            self.logger.info(f"Creating metadata destination directory: {self.metadata_destination}")
            self.metadata_destination.mkdir(parents=True, exist_ok=True)

        metadata_file = self.metadata_destination / self.metadata_filename

        # Build all records first, then write them in a single file session.
        records = []
        for document in documents:
            doc_metadata = {
                "filename": document.filename,
                "file_path": str(document.file_path),
                "file_format": document.file_format,
                "content_length": document.content_length,
                "has_extracted_metadata": bool(document.get_metadata("extracted_metadata"))
            }
            if document.metadata:
                for key, value in document.metadata.items():
                    doc_metadata[key] = value
            records.append(doc_metadata)

        try:
            with open(metadata_file, 'a', encoding='utf-8') as f:
                for doc_metadata in records:
                    json.dump(doc_metadata, f, ensure_ascii=False, default=str)
                    f.write('\n')

            self.logger.info(f"Exported metadata to: {metadata_file}")
            self.logger.info(f"Metadata exported for {len(documents)} documents ({sum(1 for doc in documents if doc.get_metadata('extracted_metadata'))} with extracted metadata)")

        except Exception as e:
            self.logger.error(f"Failed to export metadata to {metadata_file}: {str(e)}")

__init__(config)

Initialize the metadata extraction step.

Parameters:

Name Type Description Default
config dict

Configuration dictionary with options:

  • enabled_formats: List of file formats to process (pdf, html, txt, md)
  • fallback_to_filename: Use filename as title fallback
  • debug: Enable debug logging
  • export_metadata: Whether to export metadata to JSON file
  • metadata_destination: Directory to save metadata file
  • metadata_filename: Name of the metadata JSON file Note: Text formats (txt, md) automatically enable this feature
required
Source code in eve/steps/metadata/metadata_step.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def __init__(self, config: dict):
    """
    Initialize the metadata extraction step.

    Args:
        config: Configuration dictionary with options:

            - enabled_formats: List of file formats to process (pdf, html, txt, md)
            - fallback_to_filename: Use filename as title fallback
            - debug: Enable debug logging
            - export_metadata: Whether to export metadata to JSON file
            - metadata_destination: Directory to save metadata file
            - metadata_filename: Name of the metadata JSON file
              Note: Text formats (txt, md) automatically enable this feature
    """
    super().__init__(config)

    # Missing options fall back to permissive defaults (all formats, export on).
    self.enabled_formats = set(config.get("enabled_formats", ["pdf", "html", "txt", "md"]))
    self.fallback_to_filename = config.get("fallback_to_filename", True)
    self.export_metadata = config.get("export_metadata", True)
    self.metadata_destination = Path(config.get("metadata_destination", "./output"))
    self.metadata_filename = config.get("metadata_filename", "metadata.jsonl")

    # Only pdf/html get dedicated extractors; txt/md have none and use the
    # filename-based title fallback instead.
    self.extractors = {
        "pdf": PdfMetadataExtractor(debug=self.debug),
        "html": HtmlMetadataExtractor(debug=self.debug)
    }

execute(documents) async

Execute metadata extraction on input documents.

Parameters:

Name Type Description Default
documents List[Document]

List of Document objects to extract metadata from

required

Returns:

Type Description
List[Document]

List of Document objects with metadata added

Source code in eve/steps/metadata/metadata_step.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
async def execute(self, documents: List[Document]) -> List[Document]:
    """
    Execute metadata extraction on input documents.

    Args:
        documents: List of Document objects to extract metadata from

    Returns:
        List of Document objects with metadata added; documents with
        unsupported formats are passed through unchanged
    """
    if not documents:
        self.logger.warning("No input documents provided to metadata step")
        return []

    # Partition by format so unsupported documents pass through untouched.
    supported_documents = [
        doc for doc in documents 
        if doc.file_format in self.enabled_formats
    ]

    unsupported_documents = [
        doc for doc in documents 
        if doc.file_format not in self.enabled_formats
    ]

    if unsupported_documents:
        self.logger.info(f"Skipping {len(unsupported_documents)} documents with unsupported formats")

    if not supported_documents:
        self.logger.warning("No documents with supported formats found")
        return documents

    self.logger.info(f"Extracting metadata from {len(supported_documents)} documents")

    # Extract concurrently; return_exceptions keeps one failure from
    # cancelling the rest.
    tasks = [
        self._extract_metadata_for_document(document) 
        for document in supported_documents
    ]

    processed_supported = await asyncio.gather(*tasks, return_exceptions=True)

    result_supported = []
    for i, result in enumerate(processed_supported):
        if isinstance(result, Exception):
            # Keep the original document so nothing is dropped on failure.
            self.logger.error(f"Exception processing {supported_documents[i].filename}: {result}")
            result_supported.append(supported_documents[i])
        else:
            result_supported.append(result)

    final_result = result_supported + unsupported_documents

    successful_count = sum(1 for doc in result_supported if doc.get_metadata("extracted_metadata"))
    self.logger.info(f"Successfully extracted metadata from {successful_count}/{len(supported_documents)} supported documents")

    if self.export_metadata:
        await self._export_metadata_to_json(final_result)

    return final_result

Metadata Extractors

HTML Metadata Extractor

HtmlMetadataExtractor

Metadata extractor for HTML files and web pages.

Source code in eve/steps/metadata/extractors/html_extractor.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class HtmlMetadataExtractor():
    """
    Metadata extractor for HTML files and web pages.
    """

    def __init__(self, debug: bool = False):
        """
        Initialize the HTML metadata extractor.

        The HTML extractor relies on regex patterns defined in eve.common.regex_patterns
        for parsing HTML content efficiently without requiring a full HTML parser.

        Args:
            debug: Enable debug logging for detailed extraction information
        """
        self.debug = debug
        self.logger = get_logger(self.__class__.__name__)

    def _clean_title(self, title: str) -> Optional[str]:
        """
        Clean and normalize a title string.

        Args:
            title: Raw title string from extracted metadata

        Returns:
            Cleaned title string, or None if the title is invalid
            (non-string, shorter than 3 characters, or purely numeric)
        """
        if not title or not isinstance(title, str):
            return None

        # Remove leading/trailing whitespace
        cleaned = title.strip()

        # Convert newlines and carriage returns to spaces
        cleaned = cleaned.replace('\n', ' ').replace('\r', ' ')

        # Collapse multiple spaces into single spaces (spaces only, not tabs)
        while '  ' in cleaned:
            cleaned = cleaned.replace('  ', ' ')

        # Filter out invalid titles
        if len(cleaned) < 3 or cleaned.isdigit():
            return None

        return cleaned

    def _extract_content_with_tags(self, document: Document) -> Document:
        """
        Reload the raw HTML (tags included) from disk into document.content.

        The earlier extraction step strips tags from the content, so the
        <title> element must be recovered from the original file.

        Returns:
            The same document with its content replaced by the raw HTML.
        """
        with open(document.file_path, 'r', encoding='utf-8') as file:
            html_content = file.read()

        document.content = html_content
        return document

    def _extract_title_from_html(self, html_content: str) -> Optional[str]:
        """
        Extract title from HTML <title> tag using regex patterns.

        Args:
            html_content: Raw HTML content as string

        Returns:
            Cleaned title string from <title> tag, or None if not found/invalid
        """
        # Use regex pattern to extract title content
        title = extract_html_title(html_content)

        if title:
            # Apply standard title cleaning (whitespace, length validation, etc.)
            cleaned_title = self._clean_title(title)

            if cleaned_title:
                self.logger.debug(f"Extracted title from HTML: {cleaned_title}")
                return cleaned_title

        return None

    async def extract_metadata(self, document: Document) -> Optional[Dict[str, Any]]:
        """
        Extract metadata from an HTML document.

        Args:
            document: HTML document to extract metadata from

        Returns:
            Dictionary containing extracted metadata with fields:
            - title: Cleaned <title> text, or None when missing/invalid
            - content_length: Length of the raw HTML content
        """
        metadata = {}

        # Re-read the raw file: the previous pipeline step removed the tags.
        document = self._extract_content_with_tags(document)

        metadata['title'] = self._extract_title_from_html(document.content)
        metadata['content_length'] = len(document.content)

        return metadata

__init__(debug=False)

Initialize the HTML metadata extractor.

The HTML extractor relies on regex patterns defined in eve.common.regex_patterns for parsing HTML content efficiently without requiring a full HTML parser.

Parameters:

Name Type Description Default
debug bool

Enable debug logging for detailed extraction information

False
Source code in eve/steps/metadata/extractors/html_extractor.py
13
14
15
16
17
18
19
20
21
22
23
24
def __init__(self, debug: bool = False):
    """
    Initialize the HTML metadata extractor.

    The HTML extractor relies on regex patterns defined in eve.common.regex_patterns
    for parsing HTML content efficiently without requiring a full HTML parser.

    Args:
        debug: Enable debug logging for detailed extraction information
    """
    self.debug = debug
    # Logger is named after the concrete class for clearer log attribution.
    self.logger = get_logger(self.__class__.__name__)

extract_metadata(document) async

Extract metadata from an HTML document using multi-source approach.

Parameters:

Name Type Description Default
document Document

HTML document to extract metadata from

required

Returns:

Type Description
Optional[Dict[str, Any]]

Dictionary containing extracted metadata with fields:

Optional[Dict[str, Any]]
  • title: Page title (from various sources, with title_source indicator)
Optional[Dict[str, Any]]
  • title_source: Source of title ('html_tag', 'meta_tag', 'filename')
Optional[Dict[str, Any]]
  • url: Source URL if available
Optional[Dict[str, Any]]
  • domain: Domain name from URL
Optional[Dict[str, Any]]
  • scheme: URL scheme (http/https)
Optional[Dict[str, Any]]
  • content_length: Length of HTML content
Optional[Dict[str, Any]]
  • has_content: Boolean indicating content exists
Optional[Dict[str, Any]]
  • extraction_methods: List containing 'html_parsing'
Optional[Dict[str, Any]]

Returns None if document format is invalid

Source code in eve/steps/metadata/extractors/html_extractor.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
async def extract_metadata(self, document: Document) -> Optional[Dict[str, Any]]:
    """
    Extract metadata from an HTML document.

    Args:
        document: HTML document to extract metadata from

    Returns:
        Dictionary containing extracted metadata with fields:
        - title: Cleaned <title> text, or None when missing/invalid
        - content_length: Length of the raw HTML content
    """

    metadata = {}

    # Re-read the raw file from disk: the earlier extraction step strips the
    # HTML tags, so the <title> element must be recovered from the original.
    document = self._extract_content_with_tags(document) # do this because extraction from previous step removes tag

    extracted_title = self._extract_title_from_html(document.content)
    metadata['title'] = extracted_title
    metadata['content_length'] = len(document.content)

    return metadata

PDF Metadata Extractor

PDF metadata extraction is a two-stage process:

  1. Extract content using MonkeyOCR.
  2. Use Crossref to extract metadata from the content.

PdfMetadataExtractor

Source code in eve/steps/metadata/extractors/pdf_extractor.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class PdfMetadataExtractor():
    """Extract bibliographic metadata for OCR'd PDFs via the Crossref API.

    Operates over OCR output directories: each sub_dir is expected to contain
    ``<sub_dir>.md`` (full text) and ``<sub_dir>_content_list.json``
    (structured content with heading levels).
    """

    def __init__(self, debug: bool = False):
        """
        Initialize the PDF metadata extractor.

        Args:
            debug: Enable debug logging for detailed extraction information
        """
        self.debug = debug
        self.logger = get_logger(self.__class__.__name__)

    @staticmethod
    def _safe_str(value):
        """Coerce a Crossref field to a string.

        Lists yield their first element (or None when empty), dicts are
        JSON-encoded, and None/""/[] become None.
        """
        if isinstance(value, list):
            return value[0] if value else None
        if isinstance(value, dict):
            return json.dumps(value)
        return str(value) if value not in (None, "", []) else None

    @staticmethod
    def _extract_identifier(main_dir, sub_dir):
        """Scan the extracted markdown for a DOI, arXiv id, or ISBN.

        Returns:
            Tuple (id_type, identifier); ("NA", None) when nothing is found
            or the markdown file cannot be read.
        """
        md_path = f"{main_dir}/{sub_dir}/{sub_dir}.md"
        try:
            with open(md_path, 'r', encoding="utf-8", errors="ignore") as f:
                content = f.read()
        except Exception:
            return "NA", None

        # Strongest identifier first: DOI, then arXiv, then ISBN.
        for pattern in doi_regexp:
            match = re.findall(pattern, content, re.I)
            if match:
                return "doi", match[0]

        for pattern in arxiv_regexp:
            match = re.findall(pattern, content, re.I)
            if match:
                return "arxiv", match[0]

        for pattern in isbn_regexp:
            match = re.findall(pattern, content, re.I)
            if match:
                return "isbn", match[0]

        return "NA", None

    @staticmethod
    def _extract_title(main_dir, sub_dir):
        """Pick the longest level-1 heading from the content list as the title.

        Returns "NA" when the file is unreadable or has no level-1 headings.
        """
        json_path = os.path.join(main_dir, sub_dir, f"{sub_dir}_content_list.json")
        try:
            with open(json_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            candidates = [row["text"] for row in data if row.get("text_level", 0) == 1]
            return max(candidates, key=len) if candidates else "NA"
        except Exception:
            return "NA"

    @staticmethod
    async def _fetch_json(client, url):
        """GET *url* with the given httpx client; return parsed JSON or None on any error."""
        try:
            resp = await client.get(url, timeout=50)
            resp.raise_for_status()
            return resp.json()
        except Exception:
            return None

    async def _fetch_crossref_by_doi(self, client, doi):
        """Look up a work by DOI and map the Crossref record to a flat dict (or None)."""
        data = await self._fetch_json(client, f"https://api.crossref.org/works/{doi}")
        if not data:
            return None
        item = data.get("message", {})

        return {
            "title": self._safe_str(item.get("title")),
            "authors": ", ".join(
                f"{a.get('given', '')} {a.get('family', '')}".strip()
                for a in item.get("author", [])
                if isinstance(a, dict)
            ) or None,
            "year": self._safe_str(item.get("issued", {}).get("date-parts", [[None]])[0]),
            "publisher": self._safe_str(item.get("publisher")),
            "journal": self._safe_str(item.get("container-title")),
            "pub_url": self._safe_str(item.get("URL")),
            "doi": self._safe_str(item.get("DOI")),
            "citation_count": item.get("is-referenced-by-count")
        }

    async def _fetch_crossref_by_title(self, client, title):
        """Bibliographic search by title; return the best (first) match as a flat dict, or None."""
        q = re.sub(r"[\s]+", "+", title)
        data = await self._fetch_json(client, f"https://api.crossref.org/works?query.bibliographic={q}&rows=1")
        if not data:
            return None
        items = data.get("message", {}).get("items", [])
        if not items:
            return None
        item = items[0]

        return {
            "title": self._safe_str(item.get("title")),
            "authors": ", ".join(
                f"{a.get('given', '')} {a.get('family', '')}".strip()
                for a in item.get("author", [])
                if isinstance(a, dict)
            ) or None,
            "year": self._safe_str(item.get("issued", {}).get("date-parts", [[None]])[0]),
            "publisher": self._safe_str(item.get("publisher")),
            "journal": self._safe_str(item.get("container-title")),
            "pub_url": self._safe_str(item.get("URL")),
            "doi": self._safe_str(item.get("DOI")),
        }

    async def fetch_doi_from_arxiv(self, client, arxiv_id):
        """Resolve an arXiv id to a DOI via a Crossref free-text query, or None."""
        data = await self._fetch_json(client, f"https://api.crossref.org/works?query={arxiv_id}")
        if not data:
            return None
        items = data.get("message", {}).get("items", [])
        return self._safe_str(items[0].get("DOI")) if items else None

    async def _extract_metadata(self, sub_dir, main_dir, client, sem):
        """Extract metadata for one sub_dir: identifier lookup first, title search as fallback."""
        async with sem:
            id_type, identifier = self._extract_identifier(main_dir, sub_dir)
            meta = None

            if id_type == "doi":
                meta = await self._fetch_crossref_by_doi(client, identifier)
            elif id_type == "arxiv":
                # BUG FIX: was `self._fetch_doi_from_arxiv`, which does not
                # exist (the method is `fetch_doi_from_arxiv`) and raised
                # AttributeError for every arXiv-identified document.
                doi = await self.fetch_doi_from_arxiv(client, identifier)
                if doi:
                    meta = await self._fetch_crossref_by_doi(client, doi)

            title = self._extract_title(main_dir, sub_dir)
            if not meta and title != "NA":
                meta = await self._fetch_crossref_by_title(client, title)

            return {
                "sub_dir": sub_dir,
                "id_type": id_type,
                "identifier": identifier,
                "title_extracted": title,
                "meta": meta,
            }

    async def extract_metadata(self, document: Document) -> Optional[Dict[str, Any]]:
        """
        Extract metadata from a PDF document using crossref.

        Args:
            document: PDF document to extract metadata from

        Returns:
            Dictionary mapping each processed sub_dir to its Crossref
            metadata dict (title, authors, year, journal, doi, ...),
            or None when there are no subdirectories to process.
        """
        # NOTE(review): `main_dir` and `MAX_CONCURRENT` are not defined in this
        # class — they appear to be module-level globals; the `document`
        # parameter is unused here. Confirm against the module.
        metadata = {}
        done = set()

        subdirs = [d for d in os.listdir(main_dir)
                if os.path.isdir(os.path.join(main_dir, d)) and d not in done]

        if not subdirs:
            self.logger.info("All subdirectories already processed.")
            return None

        sem = asyncio.Semaphore(MAX_CONCURRENT)
        async with httpx.AsyncClient() as client:
            tasks = [self._extract_metadata(d, main_dir, client, sem) for d in subdirs]

            for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
                try:
                    r = await coro
                    metadata[r.get("sub_dir")] = r.get("meta")
                except Exception as e:
                    self.logger.error(f"Failed on {coro}: {e}")

        return metadata

__init__(debug=False)

Initialize the PDF metadata extractor.

Parameters:

Name Type Description Default
debug bool

Enable debug logging for detailed extraction information

False
Source code in eve/steps/metadata/extractors/pdf_extractor.py
25
26
27
28
29
30
31
32
33
def __init__(self, debug: bool = False):
    """Create a PDF metadata extractor.

    Args:
        debug: Enable debug logging for detailed extraction information
    """
    # Logger is keyed on the concrete class so subclasses log under their own name.
    self.logger = get_logger(self.__class__.__name__)
    self.debug = debug

extract_metadata(document) async

Extract metadata from a PDF document using crossref.

Parameters:

Name Type Description Default
document Document

PDF document to extract metadata from

required

Returns:

Type Description
Optional[Dict[str, Any]]

Dictionary containing extracted metadata with fields:

  • title: Document title
  • authors: List of author names
  • year: Publication year
  • journal: Publication venue
  • doi: Digital Object Identifier
  • identifier: Document identifier (DOI, arXiv, etc.)

Returns None if document format is invalid

Source code in eve/steps/metadata/extractors/pdf_extractor.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
async def extract_metadata(self, document: Document) -> Optional[Dict[str, Any]]:
    """
    Collect crossref metadata for every unprocessed subdirectory of ``main_dir``.

    NOTE(review): the ``document`` argument is never read by this body, and
    ``main_dir`` / ``MAX_CONCURRENT`` are not defined in this method — they
    are presumably module-level globals; confirm against the module.

    Args:
        document: Unused here; presumably kept for interface compatibility
            with other extractors — TODO confirm callers.

    Returns:
        Mapping of subdirectory name -> the "meta" dict produced by
        ``self._extract_metadata`` for that subdirectory, or None when
        there are no unprocessed subdirectories.
    """

    metadata = {}
    done = set()  # always empty here, so no subdirectory is ever filtered out

    subdirs = [d for d in os.listdir(main_dir)
            if os.path.isdir(os.path.join(main_dir, d)) and d not in done]

    if not subdirs:
        print("All subdirectories already processed.")
        return

    # Bound the number of concurrent lookups issued over the shared client.
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    async with httpx.AsyncClient() as client:
        tasks = [self._extract_metadata(d, main_dir, client, sem) for d in subdirs]

        # as_completed yields awaitables in completion order; tqdm shows progress.
        for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
            try:
                r = await coro
                metadata[r.get("sub_dir")] = r.get("meta")
            except Exception as e:
                # NOTE(review): ``coro`` is the as_completed wrapper, not the
                # original task, so this message cannot name the failing subdir.
                print(f"Failed on {coro}: {e}")

    return metadata

Export Stage

Saves processed documents to output formats.

ExportStep

Bases: PipelineStep

Source code in eve/steps/export/export_step.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class ExportStep(PipelineStep):
    """Pipeline step that persists processed documents to disk.

    Supported formats (via ``config["format"]``): "jsonl" (one JSON object per
    line), "dummy" (no-op pass-through); anything else falls through to
    ``export_md``. When ``config["resume"]`` is true, each written document is
    recorded in a CheckpointManager so a restarted run can skip it.
    """

    def __init__(self, config: dict, name: str = "ExportStep"):
        """Initialize the export step.

        Args:
            config: Configuration containing:

                - output_dir: Output directory path
                - format: Output format (jsonl, md, etc.)
                - resume: Whether to enable resume functionality (default: False)
            name: Name for logging purposes
        """
        super().__init__(config, name)

        # Initialize checkpoint manager if resume is enabled
        self.resume = config.get("resume", False)
        output_dir = Path(config.get("output_dir", "./output"))

        if self.resume:
            self.checkpoint = CheckpointManager(output_dir, resume=True)
            stats = self.checkpoint.get_stats()
            self.logger.info(f"Resume mode enabled: {stats['processed_count']} documents already processed")
        else:
            self.checkpoint = None

    def _resolve_output_dir(self) -> Path:
        """Return the configured output directory, creating it if missing."""
        output_dir = Path(self.config.get("output_dir", "./output"))
        if not output_dir.exists():
            self.logger.info(f"{output_dir} does not exist. creating...")
            output_dir.mkdir(parents=True, exist_ok=True)
        return output_dir

    async def _append_documents(
        self, documents: List[Document], log_each_file: bool
    ) -> List[Document]:
        """Shared writer: append each document as one JSON line to
        <stem>.<format> and record it in the checkpoint when resume is on."""
        output_dir = self._resolve_output_dir()
        suffix = self.config.get("format", "jsonl")
        result = []
        for document in documents:
            output_file = output_dir / f"{Path(document.filename).stem}.{suffix}"
            async with aiofiles.open(output_file, "a+", encoding="utf-8") as f:
                await f.write(json.dumps(document.__dict__()))
                await f.write("\n")
            if log_each_file:
                self.logger.info(f"Saved file: {output_file}")

            # Mark as processed in checkpoint
            if self.checkpoint:
                self.checkpoint.mark_processed(document)

            result.append(document)
        return result

    async def export_jsonl(self, documents: List[Document]) -> List[Document]:
        """Write documents as JSON lines; returns the documents written."""
        return await self._append_documents(documents, log_each_file=False)

    async def export_md(self, documents: List[Document]) -> List[Document]:
        """Write documents in the configured format, logging each saved file.

        NOTE(review): this was a verbatim copy of export_jsonl — it emits JSON
        lines, not Markdown. Behavior is preserved here; confirm the intended
        Markdown output before changing it.
        """
        return await self._append_documents(documents, log_each_file=True)

    async def dummy_export(self, documents: List[Document]) -> List[Document]:
        """No-op export for dry runs; returns the input unchanged."""
        return documents

    async def execute(self, documents: List[Document]) -> List[Document]:
        """Dispatch to the exporter selected by config["format"].

        Note: Documents are already filtered in create_batches() when resume is
        enabled, so no re-filtering happens here (avoids memory overhead).
        """
        output_format = self.config.get("format", "jsonl")  # don't shadow builtin `format`
        if output_format == "jsonl":
            return await self.export_jsonl(documents)
        if output_format == "dummy":
            return await self.dummy_export(documents)
        return await self.export_md(documents)

__init__(config, name='ExportStep')

Initialize the export step.

Parameters:

Name Type Description Default
config dict

Configuration containing:

  • output_dir: Output directory path
  • format: Output format (jsonl, md, etc.)
  • resume: Whether to enable resume functionality (default: False)
required
name str

Name for logging purposes

'ExportStep'
Source code in eve/steps/export/export_step.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def __init__(self, config: dict, name: str = "ExportStep"):
    """Set up the export step.

    Args:
        config: Configuration containing:

            - output_dir: Output directory path
            - format: Output format (jsonl, md, etc.)
            - resume: Whether to enable resume functionality (default: False)
        name: Name for logging purposes
    """
    super().__init__(config, name)

    output_dir = Path(config.get("output_dir", "./output"))
    self.resume = config.get("resume", False)

    # Per-document progress tracking is only set up when resuming is requested.
    if not self.resume:
        self.checkpoint = None
    else:
        self.checkpoint = CheckpointManager(output_dir, resume=True)
        stats = self.checkpoint.get_stats()
        self.logger.info(f"Resume mode enabled: {stats['processed_count']} documents already processed")