Message API
This page covers chat generation, streaming, retry, feedback, and testing endpoints.
API call order
- Authenticate first (`/signup` + `/verify` + `/login`).
- Create a conversation with `POST /conversations`.
- Get valid collection names via `GET /collections/public` (and/or private collection names you own).
- Call message or generate endpoints with a `conversation_id` and collection names.
- Optionally run retry, feedback, hallucination, and stats endpoints.
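The call order above can be sketched end to end as follows. This is a sketch under stated assumptions: `BASE_URL` is a hypothetical base URL, bearer-token auth is assumed (see the API index for actual shared setup), and the `items`/`name` keys used to read the collection listing are an assumed response shape, not a documented schema.

```python
import requests

BASE_URL = "https://api.example.org"  # hypothetical base URL


def message_url(base_url: str, conversation_id: str) -> str:
    """Build the create-message endpoint path for a conversation."""
    return f"{base_url}/conversations/{conversation_id}/messages"


def run_chat_flow(token: str, query: str) -> dict:
    """Walk the documented call order: list collections, create a
    conversation, then generate a message. Payload shapes are sketches."""
    headers = {"Authorization": f"Bearer {token}"}  # auth scheme assumed

    # 1. Fetch valid public collection names (response shape assumed).
    cols = requests.get(
        f"{BASE_URL}/collections/public",
        params={"page": 1, "limit": 20},
        headers=headers,
        timeout=30,
    ).json()
    names = [c["name"] for c in cols.get("items", [])]

    # 2. Create a conversation to hold the messages.
    conv = requests.post(f"{BASE_URL}/conversations", headers=headers, timeout=30).json()

    # 3. Generate an answer in that conversation.
    resp = requests.post(
        message_url(BASE_URL, conv["id"]),
        json={"query": query, "public_collections": names, "k": 5},
        headers=headers,
        timeout=120,
    )
    resp.raise_for_status()
    return resp.json()
```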
Shared request setup is documented once in the API index.
Collection name prerequisite
Before generation, fetch valid collection names from the Collection API:
- Public collections: `GET /collections/public?page=1&limit=20`
- Private collections you own: `GET /collections?page=1&limit=20`
See Collection API for endpoint details and examples.
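Both listings can feed one merged, de-duplicated name list before generation. The `items`/`name` response keys below are an assumption about the listing schema; adjust them to the actual Collection API response:

```python
def collection_names(*listing_responses: dict) -> list[str]:
    """Merge collection names from one or more listing responses,
    preserving order and dropping duplicates."""
    names: list[str] = []
    for listing in listing_responses:
        for item in listing.get("items", []):  # "items"/"name" keys assumed
            if item["name"] not in names:
                names.append(item["name"])
    return names
```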
Create message (non-streaming)
POST /conversations/{conversation_id}/messages
Create a new message in a conversation and generate an answer.
Validates conversation ownership, normalizes requested public collections, persists a placeholder Message, runs generation, updates the message with answer and retrieval metadata, and schedules rollup/trimming of history.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `request` | `GenerationRequest` | Generation parameters including query, collections, and model settings. | required |
| `conversation_id` | `str` | Target conversation identifier. | required |
| `background_tasks` | `BackgroundTasks` | Background task runner used to schedule rollups. | required |
| `requesting_user` | `User` | Authenticated user injected by dependency. | `Depends(get_current_user)` |

Returns:

| Type | Description |
|---|---|
| `CreateMessageResponse` | Message id, query, answer, documents, flags, and metadata. |

Raises:

| Type | Description |
|---|---|
| `HTTPException` | 404 if conversation is not found; 403 if ownership/collections invalid; 500 for server errors. |
Usage
```python
payload = {
    "query": "Summarize Sentinel-1 mission goals and practical applications.",
    "public_collections": ["qwen-512-filtered", "wikipedia-512"],
    "k": 5,
    "temperature": 0.1,
    "score_threshold": 0.6,
    "llm_type": "main",
    "filters": {"must": [], "should": None, "must_not": None, "min_should": None},
}
resp = requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/messages",
    json=payload,
    headers=headers,
    timeout=120,
)
resp.raise_for_status()
message = resp.json()
MESSAGE_ID = message["id"]
print(message["answer"])
```
Explanation
Runs retrieval + generation and stores the response in the conversation.
Notes
- Requires a valid `conversation_id`.
- Collection names should come from the collection endpoints.
Important params
- `query`: User prompt sent to the model.
- `score_threshold`: Retrieval similarity threshold, from 0.0 to 1.0.
- `k`: Number of retrieved documents, from 0 to 10.
- `filters`: Optional Qdrant-compatible filter object.
- `public_collections`: Collection names from the collection listing endpoints.
- `temperature`: Generation temperature, from 0.0 to 1.0.
- `llm_type`: Optional model selector (for example `main`, `fallback`, `satcom_small`, `satcom_large`, `ship`, `eve_v05`).
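Because `filters` is Qdrant-compatible, range conditions are a natural fit. The `year` payload key below is a hypothetical example field, not part of the documented schema:

```python
def year_filter(start: int, end: int) -> dict:
    """Sketch of a Qdrant-style filter keeping documents whose
    hypothetical `year` payload field lies in [start, end]."""
    return {
        "must": [{"key": "year", "range": {"gte": start, "lte": end}}],
        "should": None,
        "must_not": None,
        "min_should": None,
    }
```

For example, `"filters": year_filter(2018, 2024)` would replace the empty filter in the request payload above.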
Create message (SSE streaming)
POST /conversations/{conversation_id}/stream_messages
Create a new message and stream generation via Server-Sent Events (SSE).
Sets up a per-message stream bus and runs generation in a decoupled task. Yields SSE-formatted chunks including status updates, tokens, and final payloads.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `request` | `GenerationRequest` | Generation parameters including query, collections, and model settings. | required |
| `conversation_id` | `str` | Target conversation identifier. | required |
| `background_tasks` | `BackgroundTasks` | Background task runner used to schedule rollups. | required |
| `requesting_user` | `User` | Authenticated user injected by dependency. | `Depends(get_current_user)` |

Returns:

| Type | Description |
|---|---|
| `StreamingResponse` | SSE stream for the generation lifecycle. |

Raises:

| Type | Description |
|---|---|
| `HTTPException` | 404 if conversation is not found; 403 if ownership/collections invalid; 500 for server errors. |
Usage
```python
with requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/stream_messages",
    json={
        "query": "How is TROPOMI used to support policy making?",
        "score_threshold": 0.6,
        "temperature": 0.0645,
        "k": 10,
        "filters": {
            "should": None,
            "min_should": None,
            "must": [],
            "must_not": None,
        },
        "llm_type": "main",
        "public_collections": [
            "Wiley AI Gateway",
            "esa-data-qwen-1024",
            "Wikipedia EO",
            "wikipedia-512",
            "satcom-chunks-collection",
            "qwen-512-filtered",
        ],
    },
    headers={**headers, "Accept": "text/event-stream"},
    stream=True,
    timeout=120,
) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines(decode_unicode=True):
        if line:
            print(line)
```
Explanation
Streams generated output as server-sent events.
Notes
- Suitable for token-by-token UI updates.
- Payload fields are the same as for `POST /conversations/{conversation_id}/messages`.
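Each SSE line arrives as a `field: value` pair. A minimal client-side parser could look like the sketch below; treating `data:` payloads as JSON is an assumption about this server's event format, with a raw-text fallback:

```python
import json


def parse_sse_event(line: str):
    """Split one SSE line into (field, payload); data payloads are
    JSON-decoded when possible, otherwise returned as raw text."""
    if ":" not in line:
        return None  # blank or malformed line
    field, _, value = line.partition(":")
    value = value.lstrip()
    if field == "data":
        try:
            return ("data", json.loads(value))
        except json.JSONDecodeError:
            return ("data", value)
    return (field, value)
```

In the streaming loop above, `parse_sse_event(line)` would replace the bare `print(line)` when you need structured status, token, and final-payload events.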
Retry generation for one message
POST /conversations/{conversation_id}/messages/{message_id}/retry
Retry generation for an existing message.
Re-validates conversation ownership and message relationship, reuses the original request_input stored on the message, regenerates the answer, and updates message content, documents, and metadata.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `conversation_id` | `str` | Conversation identifier. | required |
| `message_id` | `str` | Message identifier to retry. | required |
| `background_tasks` | `BackgroundTasks` | Background task runner used to schedule rollups. | required |
| `requesting_user` | `User` | Authenticated user injected by dependency. | `Depends(get_current_user)` |

Returns:

| Type | Description |
|---|---|
| `dict` | Response payload mirroring `create_message` with updated answer and metadata. |

Raises:

| Type | Description |
|---|---|
| `HTTPException` | 404 if conversation/message not found; 403 if ownership invalid; 400 if message cannot be retried; 500 for server errors. |
Usage
```python
resp = requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/messages/{MESSAGE_ID}/retry",
    headers=headers,
    timeout=120,
)
resp.raise_for_status()
print(resp.json())
```
Explanation
Re-runs generation using the stored request input of that message.
Notes
- Useful when model/provider transient failures occur.
Update message feedback
PATCH /conversations/{conversation_id}/messages/{message_id}
Update message feedback and related annotations.
Supports updating fields such as feedback, feedback_reason, was_copied, and hallucination feedback metadata on the target message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `conversation_id` | `str` | Conversation identifier. | required |
| `message_id` | `str` | Message identifier to update. | required |
| `request` | `MessageUpdate` | Partial update payload for feedback fields. | required |
| `requesting_user` | `User` | Authenticated user injected by dependency. | `Depends(get_current_user)` |

Returns:

| Type | Description |
|---|---|
| `dict` | Success message upon update. |

Raises:

| Type | Description |
|---|---|
| `HTTPException` | 404 if conversation/message not found or mismatched; 403 if ownership invalid; 500 for server errors. |
Usage
```python
resp = requests.patch(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/messages/{MESSAGE_ID}",
    json={
        "feedback": "positive",
        "feedback_reason": "Sources are relevant and accurate",
        "was_copied": True,
    },
    headers=headers,
    timeout=30,
)
resp.raise_for_status()
print(resp.json())
```
Explanation
Stores user feedback and message-level annotations.
Notes
- Call after displaying a generated response.
Stop active generation
POST /conversations/{conversation_id}/stop
Signal cancellation for the active generation within a conversation.
Uses the cancel manager to locate the in-flight message/task and requests cooperative cancellation, also notifying downstream subscribers via the stream bus.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `conversation_id` | `str` | Conversation identifier to stop generation for. | required |
| `requesting_user` | `User` | Authenticated user injected by dependency. | `Depends(get_current_user)` |

Returns:

| Type | Description |
|---|---|
| `dict` | Status payload indicating stop state or absence of active generation. |

Raises:

| Type | Description |
|---|---|
| `HTTPException` | 404 if conversation is not found; 403 if ownership invalid; 500 for server errors. |
Usage
```python
resp = requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/stop",
    headers=headers,
    timeout=30,
)
resp.raise_for_status()
print(resp.json())
```
Explanation
Requests cancellation of an active generation stream in the conversation.
Notes
- Usually paired with streaming UIs.
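As a sketch of pairing stop with a streaming UI, one thread can consume the SSE stream while a timer (standing in here for a cancel button) fires the stop request. Only the endpoint path is documented behavior; the timer-driven trigger is an illustrative assumption:

```python
import threading

import requests


def stop_url(base_url: str, conversation_id: str) -> str:
    """Build the stop endpoint path for a conversation."""
    return f"{base_url}/conversations/{conversation_id}/stop"


def stop_after(base_url: str, conversation_id: str, headers: dict, delay_s: float = 5.0):
    """Schedule a cooperative-cancellation request after delay_s seconds;
    a real UI would call the stop endpoint from a cancel handler instead."""
    def _stop() -> None:
        requests.post(stop_url(base_url, conversation_id), headers=headers, timeout=30)

    timer = threading.Timer(delay_s, _stop)
    timer.start()
    return timer  # call timer.cancel() if generation finishes first
```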
Add source log for a message
POST /conversations/{conversation_id}/messages/{message_id}/source_logs
Append a source log entry to a message's metadata.
Stores user-attributed source inspection information such as id, url, title, and collection name, with a server-side timestamp.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `conversation_id` | `str` | Conversation identifier. | required |
| `message_id` | `str` | Message identifier. | required |
| `request` | `SourceLogsRequest` | Source log details to append. | required |
| `requesting_user` | `User` | Authenticated user injected by dependency. | `Depends(get_current_user)` |

Returns:

| Type | Description |
|---|---|
| `dict` | Confirmation message upon successful append. |

Raises:

| Type | Description |
|---|---|
| `HTTPException` | 404 if conversation/message not found or mismatched; 500 for server errors. |
Usage
```python
resp = requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/messages/{MESSAGE_ID}/source_logs",
    json={
        "source_id": "doc-001",
        "source_url": "https://example.org/eo-doc",
        "source_title": "EO Mission Documentation",
        "source_collection_name": "qwen-512-filtered",
    },
    headers=headers,
    timeout=30,
)
resp.raise_for_status()
print(resp.json())
```
Explanation
Adds source metadata associated with a generated answer.
Notes
- Use when you want explicit source tracking beyond default retrieval metadata.
Detect hallucination (non-streaming)
POST /conversations/{conversation_id}/messages/{message_id}/hallucination
Detect and persist hallucination analysis for a message.
Runs a multi-step pipeline (detect, optionally rewrite, retrieve, answer) and stores the result and latency breakdown on the message metadata.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `conversation_id` | `str` | Conversation identifier. | required |
| `message_id` | `str` | Message identifier to analyze. | required |
| `requesting_user` | `User` | Authenticated user injected by dependency. | `Depends(get_current_user)` |

Returns:

| Type | Description |
|---|---|
| `HallucinationDetectResponse` | Structured hallucination analysis with optional final answer. |

Raises:

| Type | Description |
|---|---|
| `HTTPException` | 404 if conversation/message not found or mismatched; 403 if ownership invalid; 500 for server errors. |
Usage
```python
resp = requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/messages/{MESSAGE_ID}/hallucination",
    headers=headers,
    timeout=120,
)
resp.raise_for_status()
print(resp.json())
```
Explanation
Runs hallucination detection and returns labels, reason, and related outputs.
Notes
- Requires an existing message ID.
Detect hallucination (SSE streaming)
POST /conversations/{conversation_id}/messages/{message_id}/stream-hallucination
Stream hallucination handling result as Server-Sent Events (SSE).
Streams structured events for detection, optional rewriting, retrieval, and answer generation steps.
- If label == 0 (factual), emits a final event with the reason.
- If label == 1 (hallucination), streams tokens for the final answer and then a final event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `conversation_id` | `str` | Conversation identifier. | required |
| `message_id` | `str` | Message identifier to analyze. | required |
| `requesting_user` | `User` | Authenticated user injected by dependency. | `Depends(get_current_user)` |

Returns:

| Type | Description |
|---|---|
| `StreamingResponse` | SSE events for the detection workflow. |

Raises:

| Type | Description |
|---|---|
| `HTTPException` | 404 if conversation/message not found or mismatched; 403 if access is forbidden; 500 for streaming errors. |
Usage
```python
with requests.post(
    f"{BASE_URL}/conversations/{CONVERSATION_ID}/messages/{MESSAGE_ID}/stream-hallucination",
    headers={**headers, "Accept": "text/event-stream"},
    stream=True,
    timeout=120,
) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines(decode_unicode=True):
        if line:
            print(line)
```
Explanation
Streams hallucination detection lifecycle events.
Notes
- Useful for progressive moderation/validation UX.
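Given the documented labels (0 = factual, 1 = hallucination), a client can branch on the final event. The `label` and `reason` keys below are assumptions about the event payload shape, not a documented schema:

```python
def classify_hallucination_event(event: dict) -> str:
    """Map a final hallucination event to a display string using the
    documented label semantics; event keys are assumed, not documented."""
    label = event.get("label")
    if label == 0:
        return f"factual: {event.get('reason', '')}"
    if label == 1:
        return "hallucination: corrected answer was streamed"
    return "unknown event"
```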
LLM-only generation
POST /generate-llm
Call EVE-Instruct (v5), the main model, with a single query. No RAG and no conversation context are used.
The request body contains only `query`; the response is the model reply.
Usage
```python
resp = requests.post(
    f"{BASE_URL}/generate-llm",
    json={"query": "What is Earth Observation?"},
    headers=headers,
    timeout=60,
)
resp.raise_for_status()
print(resp.json())
```
Explanation
Runs direct LLM generation (main model path) without retrieval or conversation persistence.
Notes
- Useful for baseline/debug scenarios.
One-off full generate
POST /generate
Run a one-off generation (testing only) and return the full answer and metadata.
Normalizes and validates the requested public collections against the allowed lists and ensures the user does not reference other users' collections. Merges the user's own collections with the public collections (excluding "Wiley AI Gateway"), extracts the year range from filters, then runs the full generation pipeline via `generate_answer` and returns the answer, documents, RAG flag, latencies, prompts, and retrieved docs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `request` | `GenerationRequest` | Generation parameters including query, collections, and model settings. | required |
| `requesting_user` | `User` | Authenticated user injected by dependency. | `Depends(get_current_user)` |

Returns:

| Type | Description |
|---|---|
| `dict` | Dictionary containing: `answer` (generated answer text), `documents` (document data extracted from retrieval results), `use_rag` (whether RAG was used for this generation), `latencies` (timing information for pipeline steps), `prompts` (prompt data from generation), `retrieved_docs` (raw retrieved documents from RAG). |

Raises:

| Type | Description |
|---|---|
| `HTTPException` | 403 if the request references collections owned by other users. |
| `HTTPException` | 500 for server errors during generation. |
Usage
```python
resp = requests.post(
    f"{BASE_URL}/generate",
    json={
        "query": "How is TROPOMI used to support policy making?",
        "score_threshold": 0.6,
        "temperature": 0.0645,
        "k": 10,
        "filters": {
            "should": None,
            "min_should": None,
            "must": [],
            "must_not": None,
        },
        "llm_type": "main",
        "public_collections": [
            "Wiley AI Gateway",
            "esa-data-qwen-1024",
            "Wikipedia EO",
            "wikipedia-512",
            "satcom-chunks-collection",
            "qwen-512-filtered",
        ],
    },
    headers=headers,
    timeout=120,
)
resp.raise_for_status()
print(resp.json())
```
Explanation
Runs full retrieval + generation pipeline without storing a conversation message.
Important params
- `query`: User prompt sent to the model.
- `score_threshold`: Retrieval similarity threshold, from 0.0 to 1.0.
- `k`: Number of retrieved documents, from 0 to 10.
- `filters`: Optional Qdrant-compatible filter object.
- `public_collections`: Collection names from the collection listing endpoints.
- `temperature`: Generation temperature, from 0.0 to 1.0 (lower is more deterministic).
- `llm_type`: Optional model selector (for example `main`, `fallback`, `satcom_small`, `satcom_large`, `ship`, `eve_v05`).
Retrieval-only
POST /retrieve
Run the entire retrieval pipeline and return all documents.
Runs the requery/rewrite step (same as generate_answer) to refine the query for retrieval, then executes the RAG retrieval pipeline using setup_rag_and_context and returns all retrieved documents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `request` | `GenerationRequest` | Generation parameters including query, collections, and model settings. | required |
| `requesting_user` | `User` | Authenticated user injected by dependency. | `Depends(get_current_user)` |

Returns:

| Type | Description |
|---|---|
| `dict` | Dictionary containing: `retrieved_docs` (all formatted documents from the retrieval pipeline), `latencies` (timing information, including rewrite and retrieval operations), `original_query` (the query as sent in the request), `requery` (the rewritten query used for retrieval, or the original if rewrite was skipped or failed). |
Usage
```python
resp = requests.post(
    f"{BASE_URL}/retrieve",
    json={
        "query": "How is TROPOMI used to support policy making?",
        "score_threshold": 0.6,
        "k": 10,
        "filters": {
            "should": None,
            "min_should": None,
            "must": [],
            "must_not": None,
        },
        "public_collections": [
            "Wiley AI Gateway",
            "esa-data-qwen-1024",
            "Wikipedia EO",
            "wikipedia-512",
            "satcom-chunks-collection",
            "qwen-512-filtered",
        ],
    },
    headers=headers,
    timeout=120,
)
resp.raise_for_status()
print(resp.json())
```
Explanation
Runs only retrieval and returns matched documents/metadata.
Important params
- `query`: User query string.
- `score_threshold`: Retrieval similarity threshold, from 0.0 to 1.0.
- `k`: Number of retrieved documents, from 0 to 10.
- `filters`: Optional Qdrant-compatible filter object.
- `public_collections`: Collection names from the collection listing endpoints.
User message stats
GET /conversations/messages/me/stats
Return counts and character totals for the current user's messages.
Aggregates across all messages belonging to conversations owned by the user.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `requesting_user` | `User` | Authenticated user injected by dependency. | `Depends(get_current_user)` |

Returns:

| Type | Description |
|---|---|
| `dict` | Aggregated stats including counts and character sums. |

Raises:

| Type | Description |
|---|---|
| `HTTPException` | 500 for server errors during aggregation. |
Usage
```python
resp = requests.get(
    f"{BASE_URL}/conversations/messages/me/stats",
    headers=headers,
    timeout=30,
)
resp.raise_for_status()
print(resp.json())
```
Explanation
Returns message-level usage statistics for the authenticated user.
Notes
- Requires authentication.
Average latency stats
GET /conversations/messages/average-latencies
Return average latencies aggregated across all messages.
Optionally filters the aggregation by a timestamp window.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `start_date` | `datetime \| None` | Optional start of the time window (inclusive). | `None` |
| `end_date` | `datetime \| None` | Optional end of the time window (inclusive). | `None` |

Returns:

| Type | Description |
|---|---|
| `dict` | Mapping of latency metric name to average value. |

Raises:

| Type | Description |
|---|---|
| `HTTPException` | 500 for server errors during aggregation. |
Usage
```python
resp = requests.get(
    f"{BASE_URL}/conversations/messages/average-latencies",
    params={"start_date": "2026-01-01T00:00:00Z", "end_date": "2026-12-31T23:59:59Z"},
    timeout=30,
)
resp.raise_for_status()
print(resp.json())
```
Explanation
Returns average pipeline latency metrics for the selected date range.
Notes
- This endpoint can feed performance-monitoring dashboards.
Full API reference
For exhaustive schema details, see the Swagger API documentation.