Message API

This page covers chat generation, streaming, retry, feedback, and testing endpoints.

API call order

  1. Authenticate first (/signup + /verify + /login).
  2. Create conversation with POST /conversations.
  3. Get valid collection names via GET /collections/public (and/or private collection names you own).
  4. Call message or generate endpoints with conversation_id and collection names.
  5. Optionally run retry, feedback, hallucination, and stats endpoints.

Shared request setup (for example the BASE_URL and headers used in the snippets below) is documented once in the API index.
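
The sketch below walks through that order end to end, assuming signup and email verification have already been completed. The auth field names (access_token), the conversation-creation payload, and the conversation id field are assumptions here; check the Auth and Conversation API pages for the exact shapes.

import requests

# BASE_URL comes from the shared request setup (see the API index).
# 1. Log in (field names below are assumptions).
login = requests.post(
    f"{BASE_URL}/login",
    json={"email": "user@example.org", "password": "********"},
    timeout=30,
)
login.raise_for_status()
headers = {"Authorization": f"Bearer {login.json()['access_token']}"}

# 2. Create a conversation (the creation payload may require more fields).
conv = requests.post(f"{BASE_URL}/conversations", json={}, headers=headers, timeout=30)
conv.raise_for_status()
CONVERSATION_ID = conv.json()["id"]

# 3. Fetch valid public collection names.
cols = requests.get(
    f"{BASE_URL}/collections/public",
    params={"page": 1, "limit": 20},
    headers=headers,
    timeout=30,
)
cols.raise_for_status()

# 4. Generate an answer (see "Create message" below for the full payload).
msg = requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/messages",
    json={"query": "What is Earth Observation?", "public_collections": ["qwen-512-filtered"], "k": 5},
    headers=headers,
    timeout=120,
)
msg.raise_for_status()
print(msg.json()["answer"])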

Collection name prerequisite

Before generation, fetch valid collection names from the Collection API:

  • Public collections: GET /collections/public?page=1&limit=20
  • Private collections you own: GET /collections?page=1&limit=20

See Collection API for endpoint details and examples.
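
If you want to build public_collections programmatically, something like the following works. The response shape (an items wrapper and a name field per collection) is an assumption, so verify it against the Collection API.

resp = requests.get(
    f"{BASE_URL}/collections/public",
    params={"page": 1, "limit": 20},
    headers=headers,
    timeout=30,
)
resp.raise_for_status()
data = resp.json()
# Assumption: the listing is either a bare list or wrapped in an "items" key,
# and each collection exposes a "name" field; adapt to the actual schema.
items = data["items"] if isinstance(data, dict) else data
public_collection_names = [c["name"] for c in items]
print(public_collection_names)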

Create message (non-streaming)

POST /conversations/{conversation_id}/messages

Create a new message in a conversation and generate an answer.

Validates conversation ownership, normalizes requested public collections, persists a placeholder Message, runs generation, updates the message with answer and retrieval metadata, and schedules rollup/trimming of history.

Parameters:

  • request (GenerationRequest, required): Generation parameters including query, collections, and model settings.
  • conversation_id (str, required): Target conversation identifier.
  • background_tasks (BackgroundTasks, required): Background task runner used to schedule rollups.
  • requesting_user (User, default Depends(get_current_user)): Authenticated user injected by dependency.

Returns:

  • CreateMessageResponse: Message id, query, answer, documents, flags, and metadata.

Raises:

  • HTTPException: 404 if conversation is not found; 403 if ownership/collections invalid; 500 for server errors.

Usage

payload = {
    "query": "Summarize Sentinel-1 mission goals and practical applications.",
    "public_collections": ["qwen-512-filtered", "wikipedia-512"],
    "k": 5,
    "temperature": 0.1,
    "score_threshold": 0.6,
    "llm_type": "main",
    "filters": {"must": [], "should": None, "must_not": None, "min_should": None},
}

resp = requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/messages",
    json=payload,
    headers=headers,
    timeout=120,
)
resp.raise_for_status()
message = resp.json()
MESSAGE_ID = message["id"]
print(message["answer"])

Explanation

Runs retrieval + generation and stores the response in the conversation.

Notes

  • Requires a valid conversation_id.
  • Collection names should come from collection endpoints.

Important params

  • query: User prompt sent to the model.
  • score_threshold: Retrieval similarity threshold from 0.0 to 1.0.
  • k: Number of retrieved documents from 0 to 10.
  • filters: Optional Qdrant-compatible filter object.
  • public_collections: Collection names from collection listing endpoints.
  • temperature: Generation temperature from 0.0 to 1.0.
  • llm_type: Optional model selector (for example main, fallback, satcom_small, satcom_large, ship, eve_v05).

Create message (SSE streaming)

POST /conversations/{conversation_id}/stream_messages

Create a new message and stream generation via Server-Sent Events (SSE).

Sets up a per-message stream bus and runs generation in a decoupled task. Yields SSE-formatted chunks including status updates, tokens, and final payloads.

Parameters:

  • request (GenerationRequest, required): Generation parameters including query, collections, and model settings.
  • conversation_id (str, required): Target conversation identifier.
  • background_tasks (BackgroundTasks, required): Background task runner used to schedule rollups.
  • requesting_user (User, default Depends(get_current_user)): Authenticated user injected by dependency.

Returns:

  • StreamingResponse: SSE stream for the generation lifecycle.

Raises:

  • HTTPException: 404 if conversation is not found; 403 if ownership/collections invalid; 500 for server errors.

Usage

with requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/stream_messages",
    json={
        "query": "How is TROPOMI used to support policy making?",
        "score_threshold": 0.6,
        "temperature": 0.0645,
        "k": 10,
        "filters": {
            "should": None,
            "min_should": None,
            "must": [],
            "must_not": None
        },
        "llm_type": "main",
        "public_collections": [
            "Wiley AI Gateway",
            "esa-data-qwen-1024",
            "Wikipedia EO",
            "wikipedia-512",
            "satcom-chunks-collection",
            "qwen-512-filtered"
        ]
    },
    headers={**headers, "Accept": "text/event-stream"},
    stream=True,
    timeout=120,
) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines(decode_unicode=True):
        if line:
            print(line)

Explanation

Streams generated output as server-sent events.

Notes

  • Suitable for token-by-token UI updates; a minimal parsing sketch follows after these notes.
  • Payload fields are the same as POST /conversations/{conversation_id}/messages.
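
To consume structured events rather than raw lines, you can strip the SSE framing yourself. The helper below is a minimal sketch: it assumes each event body arrives on a data: line and is JSON-encoded, and it does not reproduce the exact event schema (status updates, tokens, final payload).

import json

def iter_sse_events(resp):
    # Yield parsed payloads from an SSE response; assumes JSON after "data: ".
    for line in resp.iter_lines(decode_unicode=True):
        if line and line.startswith("data:"):
            raw = line[len("data:"):].strip()
            try:
                yield json.loads(raw)
            except json.JSONDecodeError:
                yield {"raw": raw}  # keep non-JSON payloads visible

# Inside the `with requests.post(...) as resp:` block above:
# for event in iter_sse_events(resp):
#     print(event)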

Retry generation for one message

POST /conversations/{conversation_id}/messages/{message_id}/retry

Retry generation for an existing message.

Re-validates conversation ownership and message relationship, reuses the original request_input stored on the message, regenerates the answer, and updates message content, documents, and metadata.

Parameters:

  • conversation_id (str, required): Conversation identifier.
  • message_id (str, required): Message identifier to retry.
  • background_tasks (BackgroundTasks, required): Background task runner used to schedule rollups.
  • requesting_user (User, default Depends(get_current_user)): Authenticated user injected by dependency.

Returns:

  • dict: Response payload mirroring create_message with updated answer and metadata.

Raises:

  • HTTPException: 404 if conversation/message not found; 403 if ownership invalid; 400 if message cannot be retried; 500 for server errors.

Usage

resp = requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/messages/{MESSAGE_ID}/retry",
    headers=headers,
    timeout=120,
)
resp.raise_for_status()
print(resp.json())

Explanation

Re-runs generation using the stored request input of that message.

Notes

  • Useful when transient model or provider failures occur.

Update message feedback

PATCH /conversations/{conversation_id}/messages/{message_id}

Update message feedback and related annotations.

Supports updating fields such as feedback, feedback_reason, was_copied, and hallucination feedback metadata on the target message.

Parameters:

  • conversation_id (str, required): Conversation identifier.
  • message_id (str, required): Message identifier to update.
  • request (MessageUpdate, required): Partial update payload for feedback fields.
  • requesting_user (User, default Depends(get_current_user)): Authenticated user injected by dependency.

Returns:

  • dict: Success message upon update.

Raises:

  • HTTPException: 404 if conversation/message not found or mismatched; 403 if ownership invalid; 500 for server errors.

Usage

resp = requests.patch(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/messages/{MESSAGE_ID}",
    json={
        "feedback": "positive",
        "feedback_reason": "Sources are relevant and accurate",
        "was_copied": True,
    },
    headers=headers,
    timeout=30,
)
resp.raise_for_status()
print(resp.json())

Explanation

Stores user feedback and message-level annotations.

Notes

  • Call after displaying a generated response.

Stop active generation

POST /conversations/{conversation_id}/stop

Signal cancellation for the active generation within a conversation.

Uses the cancel manager to locate the in-flight message/task, requests cooperative cancellation, and notifies downstream subscribers via the stream bus.

Parameters:

  • conversation_id (str, required): Conversation identifier to stop generation for.
  • requesting_user (User, default Depends(get_current_user)): Authenticated user injected by dependency.

Returns:

  • dict: Status payload indicating stop state or absence of active generation.

Raises:

  • HTTPException: 404 if conversation is not found; 403 if ownership invalid; 500 for server errors.

Usage

resp = requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/stop",
    headers=headers,
    timeout=30,
)
resp.raise_for_status()
print(resp.json())

Explanation

Requests cancellation of an active generation stream in the conversation.

Notes

  • Usually paired with streaming UIs; a threaded usage sketch follows below.
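
One way to exercise this endpoint is to fire the stop call from a separate thread while a stream is in flight. A minimal sketch, assuming the stream_messages request shown earlier is running in the main thread:

import threading
import time

def stop_generation():
    # Give the stream a moment to start, then request cooperative cancellation.
    time.sleep(2)
    stop_resp = requests.post(
        f"{BASE_URL}/conversations/{CONVERSATION_ID}/stop",
        headers=headers,
        timeout=30,
    )
    stop_resp.raise_for_status()
    print("stop:", stop_resp.json())

threading.Thread(target=stop_generation, daemon=True).start()
# ...then iterate the SSE stream from stream_messages in the main thread;
# it should end early once the cancellation is picked up.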

Add source log for a message

POST /conversations/{conversation_id}/messages/{message_id}/source_logs

Append a source log entry to a message's metadata.

Stores user-attributed source inspection information such as id, url, title, and collection name, with a server-side timestamp.

Parameters:

  • conversation_id (str, required): Conversation identifier.
  • message_id (str, required): Message identifier.
  • request (SourceLogsRequest, required): Source log details to append.
  • requesting_user (User, default Depends(get_current_user)): Authenticated user injected by dependency.

Returns:

  • dict: Confirmation message upon successful append.

Raises:

  • HTTPException: 404 if conversation/message not found or mismatched; 500 for server errors.

Usage

resp = requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/messages/{MESSAGE_ID}/source_logs",
    json={
        "source_id": "doc-001",
        "source_url": "https://example.org/eo-doc",
        "source_title": "EO Mission Documentation",
        "source_collection_name": "qwen-512-filtered",
    },
    headers=headers,
    timeout=30,
)
resp.raise_for_status()
print(resp.json())

Explanation

Adds source metadata associated with a generated answer.

Notes

  • Use when you want explicit source tracking beyond default retrieval metadata.

Detect hallucination (non-streaming)

POST /conversations/{conversation_id}/messages/{message_id}/hallucination

Detect and persist hallucination analysis for a message.

Runs a multi-step pipeline (detect, optionally rewrite, retrieve, answer) and stores the result and latency breakdown on the message metadata.

Parameters:

  • conversation_id (str, required): Conversation identifier.
  • message_id (str, required): Message identifier to analyze.
  • requesting_user (User, default Depends(get_current_user)): Authenticated user injected by dependency.

Returns:

  • HallucinationDetectResponse: Structured hallucination analysis with optional final answer.

Raises:

  • HTTPException: 404 if conversation/message not found or mismatched; 403 if ownership invalid; 500 for server errors.

Usage

resp = requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/messages/{MESSAGE_ID}/hallucination",
    headers=headers,
    timeout=120,
)
resp.raise_for_status()
print(resp.json())

Explanation

Runs hallucination detection and returns labels, reason, and related outputs.

Notes

  • Requires an existing message ID.

Detect hallucination (SSE streaming)

POST /conversations/{conversation_id}/messages/{message_id}/stream-hallucination

Stream hallucination handling result as Server-Sent Events (SSE).

Streams structured events for detection, optional rewriting, retrieval, and answer generation steps.

  • If label == 0 (factual), emits a final event with the reason.
  • If label == 1 (hallucination), streams tokens for the final answer and then a final event.

Parameters:

  • conversation_id (str, required): Conversation identifier.
  • message_id (str, required): Message identifier to analyze.
  • requesting_user (User, default Depends(get_current_user)): Authenticated user injected by dependency.

Returns:

  • StreamingResponse: SSE events for the detection workflow.

Raises:

  • HTTPException: 404 if conversation/message not found or mismatched; 403 if access is forbidden; 500 for streaming errors.

Usage

with requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/messages/{MESSAGE_ID}/stream-hallucination",
    headers={**headers, "Accept": "text/event-stream"},
    stream=True,
    timeout=120,
) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines(decode_unicode=True):
        if line:
            print(line)

Explanation

Streams hallucination detection lifecycle events.

Notes

  • Useful for progressive moderation/validation UX.

LLM-only generation

POST /generate-llm

Call EVE-Instruct (v5), the main model, with a single query. No RAG and no conversation context are used.

The request body contains only query; the response contains the model reply only.

Usage

resp = requests.post(
    f"{BASE_URL}/generate-llm",
    json={"query": "What is Earth Observation?"},
    headers=headers,
    timeout=60,
)
resp.raise_for_status()
print(resp.json())

Explanation

Runs direct LLM generation (main model path) without retrieval or conversation persistence.

Notes

  • Useful for baseline/debug scenarios.

One-off full generate

POST /generate

Run a one-off generation (testing only) and return the full answer and metadata.

Normalizes and validates the requested public collections against the allowed lists and ensures the user does not reference other users' collections. Merges the user's own collections with the public collections (excluding "Wiley AI Gateway"), extracts a year range from filters, then runs the full generation pipeline via generate_answer and returns the answer, documents, RAG flag, latencies, prompts, and retrieved docs.

Parameters:

  • request (GenerationRequest, required): Generation parameters including query, collections, and model settings.
  • requesting_user (User, default Depends(get_current_user)): Authenticated user injected by dependency.

Returns:

  • dict: Dictionary containing:
      - answer: Generated answer text.
      - documents: Extracted document data from retrieval results.
      - use_rag: Whether RAG was used for this generation.
      - latencies: Timing information for pipeline steps.
      - prompts: Prompt data from generation.
      - retrieved_docs: Raw retrieved documents from RAG.

Raises:

  • HTTPException: 403 if the request references collections owned by other users.
  • HTTPException: 500 for server errors during generation.

Usage

resp = requests.post(
    f"{BASE_URL}/generate",
    json={
        "query": "How is TROPOMI used to support policy making?",
        "score_threshold": 0.6,
        "temperature": 0.0645,
        "k": 10,
        "filters": {
            "should": None,
            "min_should": None,
            "must": [],
            "must_not": None
        },
        "llm_type": "main",
        "public_collections": [
            "Wiley AI Gateway",
            "esa-data-qwen-1024",
            "Wikipedia EO",
            "wikipedia-512",
            "satcom-chunks-collection",
            "qwen-512-filtered"
        ]
    },
    headers=headers,
    timeout=120,
)
resp.raise_for_status()
print(resp.json())

Explanation

Runs full retrieval + generation pipeline without storing a conversation message.

Important params

  • query: User prompt sent to the model.
  • score_threshold: Retrieval similarity threshold from 0.0 to 1.0.
  • k: Number of retrieved documents from 0 to 10.
  • filters: Optional Qdrant-compatible filter object.
  • public_collections: Collection names from collection listing endpoints.
  • temperature: Generation temperature from 0.0 to 1.0 (lower is more deterministic).
  • llm_type: Optional model selector (for example main, fallback, satcom_small, satcom_large, ship, eve_v05).
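
Using the keys listed under Returns, the one-off result can be inspected like this:

data = resp.json()
print("RAG used:", data["use_rag"])
print("Answer:", data["answer"][:300])
print("Latencies:", data["latencies"])
print("Retrieved documents:", len(data.get("retrieved_docs") or []))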

Retrieval-only

POST /retrieve

Run the entire retrieval pipeline and return all documents.

Runs the requery/rewrite step (same as generate_answer) to refine the query for retrieval, then executes the RAG retrieval pipeline using setup_rag_and_context and returns all retrieved documents.

Parameters:

  • request (GenerationRequest, required): Generation parameters including query, collections, and model settings.
  • requesting_user (User, default Depends(get_current_user)): Authenticated user injected by dependency.

Returns:

  • dict: Dictionary containing:
      - retrieved_docs: All formatted documents from the retrieval pipeline.
      - latencies: Timing information (includes rewrite and retrieval operations).
      - original_query: The query as sent in the request.
      - requery: The rewritten query used for retrieval (or the original if the rewrite was skipped or failed).

Usage

resp = requests.post(
    f"{BASE_URL}/retrieve",
    json={
        "query": "How is TROPOMI used to support policy making?",
        "score_threshold": 0.6,
        "k": 10,
        "filters": {
            "should": None,
            "min_should": None,
            "must": [],
            "must_not": None
        },
        "public_collections": [
            "Wiley AI Gateway",
            "esa-data-qwen-1024",
            "Wikipedia EO",
            "wikipedia-512",
            "satcom-chunks-collection",
            "qwen-512-filtered"
        ]
    },
    headers=headers,
    timeout=120,
)
resp.raise_for_status()
print(resp.json())

Explanation

Runs only retrieval and returns matched documents/metadata.

Important params

  • query: User query string.
  • score_threshold: Retrieval similarity threshold from 0.0 to 1.0.
  • k: Number of retrieved documents from 0 to 10.
  • filters: Optional Qdrant-compatible filter object.
  • public_collections: Collection names from collection listing endpoints.
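
Using the keys listed under Returns, the retrieval result can be inspected like this (the per-document fields depend on the collection schema, so the first document is printed as-is):

data = resp.json()
print("Original query:", data["original_query"])
print("Rewritten query:", data["requery"])
print("Latencies:", data["latencies"])
docs = data.get("retrieved_docs") or []
print("Documents returned:", len(docs))
if docs:
    print(docs[0])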

User message stats

GET /conversations/messages/me/stats

Return counts and character totals for the current user's messages.

Aggregates across all messages belonging to conversations owned by the user.

Parameters:

  • requesting_user (User, default Depends(get_current_user)): Authenticated user injected by dependency.

Returns:

  • dict: Aggregated stats including counts and character sums.

Raises:

  • HTTPException: 500 for server errors during aggregation.

Usage

resp = requests.get(
    f"{BASE_URL}/conversations/messages/me/stats",
    headers=headers,
    timeout=30,
)
resp.raise_for_status()
print(resp.json())

Explanation

Returns message-level usage statistics for the authenticated user.

Notes

  • Requires authentication.

Average latency stats

GET /conversations/messages/average-latencies

Return average latencies aggregated across all messages.

Optionally filters the aggregation by a timestamp window.

Parameters:

  • start_date (datetime | None, default None): Optional start of the time window (inclusive).
  • end_date (datetime | None, default None): Optional end of the time window (inclusive).

Returns:

  • dict: Mapping of latency metric name to average value.

Raises:

  • HTTPException: 500 for server errors during aggregation.

Usage

resp = requests.get(
    f"{BASE_URL}/conversations/messages/average-latencies",
    params={"start_date": "2026-01-01T00:00:00Z", "end_date": "2026-12-31T23:59:59Z"},
    timeout=30,
)
resp.raise_for_status()
print(resp.json())

Explanation

Returns average pipeline latency metrics for the selected date range.

Notes

  • Endpoint can be used for performance monitoring dashboards.
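
Because the response is a flat mapping of metric name to average value, a monitoring script can rank the slowest pipeline steps directly (units are whatever the backend reports; numeric values are assumed):

averages = resp.json()
# Sort metrics from slowest to fastest average.
for metric, value in sorted(averages.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{metric}: {value:.3f}")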

Full API reference

For exhaustive schema details, see the Swagger API.