API Reference

The pgEdge RAG Server provides a REST API for querying RAG pipelines.

Base URL

By default, the server listens on http://localhost:8080. All endpoints use the /v1 API version prefix.

API Discovery

The server implements RFC 8631 for API documentation discovery. All JSON responses include a Link header:

Link: </v1/openapi.json>; rel="service-desc"

This allows tools like restish to automatically discover and use the API schema.
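For example, the header is visible on any response, and a client can be pointed at the base URL to pick up the schema (the restish invocation below is illustrative; check the restish documentation for the exact syntax):

curl -i http://localhost:8080/v1/health
# HTTP/1.1 200 OK
# Link: </v1/openapi.json>; rel="service-desc"

# Illustrative: register the API with restish, which follows the Link header
restish api configure rag http://localhost:8080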

Endpoints

OpenAPI Specification

Get the OpenAPI v3 specification for the API.

GET /v1/openapi.json

Response

Returns an OpenAPI 3.0.3 specification document describing all API endpoints, request/response schemas, and error formats.

Status Code  Description
-----------  -----------
200          OpenAPI specification

Health Check

Check if the server is running and healthy.

GET /v1/health

Response

{
  "status": "healthy"
}

Status Code  Description
-----------  -----------
200          Server is healthy
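
A quick liveness check from the command line:

curl http://localhost:8080/v1/health
# {"status": "healthy"}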

List Pipelines

Get a list of all available RAG pipelines.

GET /v1/pipelines

Response

{
  "pipelines": [
    {
      "name": "my-docs",
      "description": "Search my documentation"
    },
    {
      "name": "knowledge-base",
      "description": "Corporate knowledge base"
    }
  ]
}

Status Code  Description
-----------  -----------
200          List of pipelines

Query Pipeline

Execute a RAG query against a specific pipeline.

POST /v1/pipelines/{name}

Path Parameters

Parameter  Description
---------  -----------
name       Pipeline name (from config)

Request Body

{
  "query": "How do I configure replication?",
  "stream": false,
  "top_n": 10,
  "filter": {
    "conditions": [
      {"column": "product", "operator": "=", "value": "pgEdge"},
      {"column": "version", "operator": "=", "value": "v5.0"}
    ],
    "logic": "AND"
  },
  "include_sources": true,
  "messages": [
    {"role": "user", "content": "What is pgEdge?"},
    {"role": "assistant", "content": "pgEdge is a distributed PostgreSQL platform..."}
  ]
}

Field            Type     Required  Description
---------------  -------  --------  -----------
query            string   Yes       The question to answer
stream           boolean  No        Enable streaming response (SSE)
top_n            integer  No        Override the default result limit
filter           object   No        Structured filter to apply to results
include_sources  boolean  No        Include source documents (default: false)
messages         array    No        Previous conversation history for context

The filter parameter accepts a structured filter object composed of conditions and an optional combining logic. This is useful when your data spans multiple products or versions and you want to restrict results to a subset. API filters must use this structured format for security: the server builds parameterized queries from it, which prevents SQL injection.

If the pipeline configuration also specifies a filter, both filters are combined using AND logic.

Filter examples:

Single condition:

{
  "conditions": [
    {"column": "product", "operator": "=", "value": "pgAdmin"}
  ]
}

Multiple conditions with AND:

{
  "conditions": [
    {"column": "product", "operator": "=", "value": "pgAdmin"},
    {"column": "version", "operator": ">=", "value": "v8.0"}
  ],
  "logic": "AND"
}

Multiple conditions with OR:

{
  "conditions": [
    {"column": "status", "operator": "=", "value": "published"},
    {"column": "status", "operator": "=", "value": "draft"}
  ],
  "logic": "OR"
}

Supported operators: =, !=, <, >, <=, >=, LIKE, ILIKE, IN, NOT IN, IS NULL, IS NOT NULL
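
The examples above only show scalar values. The exact value shapes for the list and null-check operators are not specified here, so the following is an assumption: IN and NOT IN plausibly take an array value, and IS NULL / IS NOT NULL plausibly omit the value field entirely:

{
  "conditions": [
    {"column": "product", "operator": "IN", "value": ["pgEdge", "pgAdmin"]},
    {"column": "archived_at", "operator": "IS NULL"}
  ],
  "logic": "AND"
}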

Message Object

Field    Type    Description
-------  ------  -----------
role     string  Message role: user or assistant
content  string  Message content

Non-Streaming Response

{
  "answer": "To configure replication, you need to...",
  "tokens_used": 1523
}

When include_sources: true:

{
  "answer": "To configure replication, you need to...",
  "sources": [
    {
      "id": "doc-123",
      "content": "Replication is configured by...",
      "score": 0.95
    },
    {
      "id": "doc-456",
      "content": "The replication settings include...",
      "score": 0.87
    }
  ],
  "tokens_used": 1523
}

Field        Type     Description
-----------  -------  -----------
answer       string   The generated answer
sources      array    Source documents (only if requested)
tokens_used  integer  Total tokens consumed by the request

Source Object

Field    Type    Description
-------  ------  -----------
id       string  Document identifier (if available)
content  string  Document text content
score    number  Relevance score (higher is better)

Streaming Response

When stream: true, the response uses Server-Sent Events (SSE).

Headers:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

Event Format:

Each event is a JSON object sent as an SSE data line:

data: {"type": "chunk", "content": "To configure "}

data: {"type": "chunk", "content": "replication, "}

data: {"type": "chunk", "content": "you need to..."}

data: {"type": "done"}

Event Types

Type   Description                    Fields
-----  -----------------------------  ------
chunk  Partial response content       content
done   Stream completed successfully  -
error  An error occurred              error
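
An error event carries its message in the error field; the message text below is illustrative:

data: {"type": "error", "error": "pipeline execution failed"}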

Error Responses

{
  "error": {
    "code": "PIPELINE_NOT_FOUND",
    "message": "pipeline not found: unknown-pipeline"
  }
}

Status Code  Error Code          Description
-----------  ------------------  -----------
400          INVALID_REQUEST     Invalid request body or query
404          PIPELINE_NOT_FOUND  Pipeline does not exist
405          METHOD_NOT_ALLOWED  Wrong HTTP method
500          EXECUTION_ERROR     Pipeline execution failed
500          INTERNAL_ERROR      Unexpected server error
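
A minimal client-side check against this error shape might look like the following (plain requests, no SDK assumed):

import requests

response = requests.post(
    "http://localhost:8080/v1/pipelines/my-docs",
    json={"query": "How do I get started?"},
)

if response.ok:
    print(response.json()["answer"])
else:
    # Error responses wrap details in an "error" object (see the table above).
    err = response.json()["error"]
    print(f"{response.status_code} {err['code']}: {err['message']}")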

Examples

cURL

List pipelines:

curl http://localhost:8080/v1/pipelines

Simple query:

curl -X POST http://localhost:8080/v1/pipelines/my-docs \
  -H "Content-Type: application/json" \
  -d '{"query": "How do I get started?"}'

Query with filter:

curl -X POST http://localhost:8080/v1/pipelines/my-docs \
  -H "Content-Type: application/json" \
  -d '{
    "query": "How do I configure backups?",
    "filter": {
      "conditions": [
        {"column": "product", "operator": "=", "value": "pgAdmin"},
        {"column": "version", "operator": "=", "value": "v9.0"}
      ],
      "logic": "AND"
    }
  }'

Streaming query:

curl -X POST http://localhost:8080/v1/pipelines/my-docs \
  -H "Content-Type: application/json" \
  -N \
  -d '{"query": "Explain the architecture", "stream": true}'

Python

Non-streaming:

import requests

response = requests.post(
    "http://localhost:8080/v1/pipelines/my-docs",
    json={"query": "How do I configure SSL?", "include_sources": True},
)

data = response.json()
print(data["answer"])

# The sources field is only present when include_sources is true in the request.
for source in data["sources"]:
    print(f"- {source['content'][:100]}... (score: {source['score']:.2f})")

Streaming:

import json

import requests

response = requests.post(
    "http://localhost:8080/v1/pipelines/my-docs",
    json={"query": "Explain the setup process", "stream": True},
    stream=True,
)

for line in response.iter_lines():
    if line and line.startswith(b"data: "):
        event = json.loads(line[6:])
        if event["type"] == "chunk":
            print(event["content"], end="", flush=True)
        elif event["type"] == "done":
            print()  # newline at end of stream

JavaScript

Non-streaming:

const response = await fetch("http://localhost:8080/v1/pipelines/my-docs", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ query: "How do I get started?" }),
});

const data = await response.json();
console.log(data.answer);

Streaming (EventSource only supports GET, so the SSE stream is read with fetch):

// Using fetch for SSE, since EventSource cannot send POST requests
const response = await fetch("http://localhost:8080/v1/pipelines/my-docs", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ query: "Explain the setup", stream: true }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // Stream-decode and buffer, since an event may be split across chunks.
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split("\n");
  buffer = lines.pop(); // keep any incomplete trailing line for the next read

  for (const line of lines) {
    if (line.startsWith("data: ")) {
      const event = JSON.parse(line.slice(6));
      if (event.type === "chunk") {
        process.stdout.write(event.content);
      }
    }
  }
}

Rate Limiting

The server does not implement rate limiting. If needed, use a reverse proxy (nginx, Caddy, etc.) or API gateway in front of the server.
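
As a sketch, nginx's limit_req module can cap per-client request rates in front of the server; the zone name, rate, and burst values below are illustrative:

# Illustrative nginx config: limit each client IP to 10 requests/second
limit_req_zone $binary_remote_addr zone=rag_api:10m rate=10r/s;

server {
    listen 80;

    location / {
        limit_req zone=rag_api burst=20 nodelay;
        proxy_pass http://localhost:8080;
    }
}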

Authentication

The server does not implement authentication. For production deployments, place the server behind an authenticating proxy or API gateway.
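
As a sketch, nginx can require HTTP Basic authentication before proxying; the realm and password-file path are illustrative, and buffering is disabled so streaming (SSE) responses pass through:

server {
    listen 80;

    location / {
        auth_basic "pgEdge RAG Server";             # illustrative realm
        auth_basic_user_file /etc/nginx/.htpasswd;  # create with the htpasswd tool
        proxy_pass http://localhost:8080;
        proxy_buffering off;  # do not buffer SSE streaming responses
    }
}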