Skip to content

Use Cases

Practical examples and integration patterns for pg_semantic_cache in real-world applications.

LLM and AI Applications

RAG (Retrieval Augmented Generation) Caching

Cache expensive LLM API calls based on semantic similarity of user questions.

Problem: LLM API calls cost $0.02-$0.05 per request. Users ask similar questions differently.

Solution: Cache LLM responses with semantic matching.

import openai
import psycopg2
import json

class SemanticLLMCache:
    def __init__(self, db_conn_string):
        self.conn = psycopg2.connect(db_conn_string)
        self.openai_client = openai.OpenAI()

    def get_embedding(self, text):
        """Generate embedding for text"""
        response = self.openai_client.embeddings.create(
            model="text-embedding-ada-002",
            input=text
        )
        return response.data[0].embedding

    def ask_llm_cached(self, question, context="", similarity=0.93):
        """Ask LLM with caching"""
        # Generate embedding for question
        embedding = self.get_embedding(question)
        embedding_str = str(embedding)

        # Try cache first
        cur = self.conn.cursor()
        cur.execute("""
            SELECT found, result_data, similarity_score, age_seconds
            FROM semantic_cache.get_cached_result(%s, %s)
        """, (embedding_str, similarity))

        result = cur.fetchone()
        if result:  # Cache hit
            print(f"✓ Cache HIT (similarity: {result[2]:.4f}, age: {result[3]}s)")
            return json.loads(result[1])

        # Cache miss - call actual LLM
        print("✗ Cache MISS - calling OpenAI API")
        llm_response = self.openai_client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": context},
                {"role": "user", "content": question}
            ]
        )

        answer = llm_response.choices[0].message.content
        tokens = llm_response.usage.total_tokens

        # Cache the response
        result_data = json.dumps({
            "answer": answer,
            "tokens": tokens,
            "model": "gpt-4"
        })

        cur.execute("""
            SELECT semantic_cache.cache_query(
                %s, %s, %s::jsonb, 7200, ARRAY['llm', 'rag']
            )
        """, (question, embedding_str, result_data))
        self.conn.commit()

        return {"answer": answer, "tokens": tokens}

# Usage
cache = SemanticLLMCache("dbname=mydb user=postgres")

# These similar questions will hit the cache
cache.ask_llm_cached("What was our Q4 revenue?")
cache.ask_llm_cached("Show me Q4 revenue")  # Cache hit!
cache.ask_llm_cached("Q4 revenue please")   # Cache hit!

Savings: With 80% hit rate on 10K daily queries: $140/day or $51,100/year

Chatbot Response Caching

import { OpenAI } from 'openai';
import { Pool } from 'pg';

interface CachedResponse {
  answer: string;
  cached: boolean;
  similarity?: number;
}

class ChatbotCache {
  private openai: OpenAI;
  private pool: Pool;

  constructor(dbConfig: any) {
    this.openai = new OpenAI();
    this.pool = new Pool(dbConfig);
  }

  async getCachedResponse(
    userMessage: string,
    context: string[]
  ): Promise<CachedResponse> {
    // Generate embedding
    const embeddingResp = await this.openai.embeddings.create({
      model: 'text-embedding-ada-002',
      input: userMessage,
    });
    const embedding = embeddingResp.data[0].embedding;
    const embeddingStr = `[${embedding.join(',')}]`;

    // Check cache
    const cacheResult = await this.pool.query(
      'SELECT * FROM semantic_cache.get_cached_result($1, 0.92)',
      [embeddingStr]
    );

    if (cacheResult.rows.length > 0) {
      return {
        answer: cacheResult.rows[0].result_data.answer,
        cached: true,
        similarity: cacheResult.rows[0].similarity_score,
      };
    }

    // Call LLM
    const completion = await this.openai.chat.completions.create({
      model: 'gpt-3.5-turbo',
      messages: [
        { role: 'system', content: context.join('\n') },
        { role: 'user', content: userMessage },
      ],
    });

    const answer = completion.choices[0].message.content!;

    // Cache response
    await this.pool.query(
      `SELECT semantic_cache.cache_query($1, $2, $3::jsonb, 3600, ARRAY['chatbot'])`,
      [userMessage, embeddingStr, JSON.stringify({ answer })]
    );

    return { answer, cached: false };
  }
}

Analytics and Reporting

Dashboard Query Caching

Cache expensive analytical queries that power dashboards.

-- Application caching wrapper for analytics
CREATE OR REPLACE FUNCTION app.get_sales_analytics(
    query_text TEXT,
    params JSONB
) RETURNS JSONB AS $$
DECLARE
    query_embedding TEXT;
    cached_result RECORD;
    actual_result JSONB;
    computation_time INTERVAL;
    start_time TIMESTAMPTZ;
BEGIN
    -- Generate deterministic embedding from query + params
    -- (In production, use actual embedding service)
    query_embedding := (
        SELECT array_agg(
            (hashtext((query_text || params::text)::text) + i)::float / 2147483647
        )::text
        FROM generate_series(1, 1536) i
    );

    -- Try cache
    SELECT * INTO cached_result
    FROM semantic_cache.get_cached_result(
        query_embedding,
        0.95,
        1800  -- Max 30 minutes old
    );

    IF cached_result.found IS NOT NULL THEN
        -- Cache hit
        RAISE NOTICE 'Cache HIT - saved query execution';
        RETURN cached_result.result_data;
    END IF;

    -- Cache miss - execute expensive query
    RAISE NOTICE 'Cache MISS - executing analytics query';
    start_time := clock_timestamp();

    -- Example: Complex analytics query
    SELECT jsonb_build_object(
        'total_revenue', SUM(amount),
        'order_count', COUNT(*),
        'avg_order_value', AVG(amount),
        'period', params->>'period'
    ) INTO actual_result
    FROM orders
    WHERE created_at >= (params->>'start_date')::timestamptz
      AND created_at <= (params->>'end_date')::timestamptz
      AND status = 'completed';

    computation_time := clock_timestamp() - start_time;
    RAISE NOTICE 'Query executed in %', computation_time;

    -- Cache result (longer TTL for analytics)
    PERFORM semantic_cache.cache_query(
        query_text,
        query_embedding,
        actual_result,
        7200,  -- 2 hours
        ARRAY['analytics', 'dashboard']
    );

    RETURN actual_result;
END;
$$ LANGUAGE plpgsql;

-- Usage
SELECT app.get_sales_analytics(
    'Total sales and order metrics',
    '{"period": "Q4", "start_date": "2024-10-01", "end_date": "2024-12-31"}'::jsonb
);

Time-Series Report Caching

-- Cache daily/weekly/monthly reports
CREATE OR REPLACE FUNCTION app.cached_time_series_report(
    report_type TEXT,  -- 'daily', 'weekly', 'monthly'
    metric_name TEXT
) RETURNS TABLE(period DATE, value NUMERIC) AS $$
DECLARE
    query_emb TEXT;
    cached RECORD;
    ttl_seconds INTEGER;
BEGIN
    -- Generate embedding (simplified)
    query_emb := (
        SELECT array_agg(random()::float4)::text
        FROM generate_series(1, 1536)
    );

    -- Adjust TTL based on granularity
    ttl_seconds := CASE report_type
        WHEN 'daily' THEN 3600      -- 1 hour
        WHEN 'weekly' THEN 14400    -- 4 hours
        WHEN 'monthly' THEN 86400   -- 24 hours
    END;

    -- Try cache
    SELECT * INTO cached FROM semantic_cache.get_cached_result(query_emb, 0.95);

    IF cached.found IS NOT NULL THEN
        -- Return cached data as table
        RETURN QUERY
        SELECT (item->>'period')::DATE, (item->>'value')::NUMERIC
        FROM jsonb_array_elements(cached.result_data->'data') item;
        RETURN;
    END IF;

    -- Execute and cache (simplified example)
    PERFORM semantic_cache.cache_query(
        format('Report: %s - %s', report_type, metric_name),
        query_emb,
        '{"data": []}'::jsonb,  -- Your actual query results
        ttl_seconds,
        ARRAY['reports', report_type]
    );

    RETURN QUERY SELECT NULL::DATE, NULL::NUMERIC WHERE FALSE;
END;
$$ LANGUAGE plpgsql;

External API Results

Third-Party API Response Caching

Cache responses from expensive external APIs (weather, geocoding, stock prices, etc.).

import requests
import psycopg2
from sentence_transformers import SentenceTransformer

class APICache:
    def __init__(self, db_conn_string):
        self.conn = psycopg2.connect(db_conn_string)
        self.encoder = SentenceTransformer('all-MiniLM-L6-v2')

    def fetch_with_cache(self, query, api_call_fn, ttl=3600):
        """
        Fetch from API with semantic caching

        Args:
            query: Natural language query (e.g., "weather in San Francisco")
            api_call_fn: Function to call API
            ttl: Cache TTL in seconds
        """
        # Generate embedding
        embedding = self.encoder.encode(query)
        embedding_str = str(embedding.tolist())

        # Check cache
        cur = self.conn.cursor()
        cur.execute("""
            SELECT found, result_data
            FROM semantic_cache.get_cached_result(%s, 0.90, %s)
        """, (embedding_str, ttl))

        result = cur.fetchone()
        if result:
            print(f"✓ Using cached API response")
            return result[1]

        # Call actual API
        print(f"✗ Calling external API")
        api_response = api_call_fn()

        # Cache response
        import json
        cur.execute("""
            SELECT semantic_cache.cache_query(
                %s, %s, %s::jsonb, %s, ARRAY['api', 'external']
            )
        """, (query, embedding_str, json.dumps(api_response), ttl))
        self.conn.commit()

        return api_response

# Usage examples

# Weather API
def get_weather(city):
    cache = APICache("dbname=mydb")
    return cache.fetch_with_cache(
        f"weather in {city}",
        lambda: requests.get(f"https://api.weather.com/{city}").json(),
        ttl=1800  # 30 minutes
    )

# Geocoding API
def geocode(address):
    cache = APICache("dbname=mydb")
    return cache.fetch_with_cache(
        f"geocode {address}",
        lambda: requests.get(f"https://api.geocode.com?q={address}").json(),
        ttl=86400  # 24 hours (addresses don't change)
    )

# Stock prices
def get_stock_price(symbol):
    cache = APICache("dbname=mydb")
    return cache.fetch_with_cache(
        f"stock price {symbol}",
        lambda: requests.get(f"https://api.stocks.com/{symbol}").json(),
        ttl=60  # 1 minute (real-time data)
    )

Database Query Optimization

Expensive Join Caching

Cache results from expensive multi-table joins.

-- Wrap expensive queries with semantic caching
CREATE OR REPLACE FUNCTION app.get_customer_summary(
    customer_identifier TEXT  -- email, name, or ID
) RETURNS JSONB AS $$
DECLARE
    query_emb TEXT;
    cached RECORD;
    result JSONB;
BEGIN
    -- Simple embedding generation (replace with actual service)
    query_emb := (
        SELECT array_agg((hashtext(customer_identifier || i::text)::float / 2147483647)::float4)::text
        FROM generate_series(1, 1536) i
    );

    -- Check cache
    SELECT * INTO cached
    FROM semantic_cache.get_cached_result(query_emb, 0.98, 300);

    IF cached.found IS NOT NULL THEN
        RETURN cached.result_data;
    END IF;

    -- Execute expensive query
    WITH customer_data AS (
        SELECT
            c.id,
            c.name,
            c.email,
            COUNT(DISTINCT o.id) as total_orders,
            SUM(o.amount) as lifetime_value,
            AVG(o.amount) as avg_order_value,
            MAX(o.created_at) as last_order_date
        FROM customers c
        LEFT JOIN orders o ON c.id = o.customer_id
        LEFT JOIN order_items oi ON o.id = oi.order_id
        LEFT JOIN products p ON oi.product_id = p.id
        WHERE c.email ILIKE '%' || customer_identifier || '%'
           OR c.name ILIKE '%' || customer_identifier || '%'
           OR c.id::text = customer_identifier
        GROUP BY c.id, c.name, c.email
    )
    SELECT jsonb_build_object(
        'customer', row_to_json(cd.*)
    ) INTO result
    FROM customer_data cd;

    -- Cache for 5 minutes
    PERFORM semantic_cache.cache_query(
        'Customer summary: ' || customer_identifier,
        query_emb,
        result,
        300,
        ARRAY['customer', 'summary']
    );

    RETURN result;
END;
$$ LANGUAGE plpgsql;

-- Usage - these similar queries hit cache:
SELECT app.get_customer_summary('[email protected]');
SELECT app.get_customer_summary('[email protected]');     -- Exact match
SELECT app.get_customer_summary('John Doe');              -- By name
SELECT app.get_customer_summary('john');                  -- Partial match

Scheduled Maintenance

Automatic Cache Cleanup

-- Create maintenance function
CREATE OR REPLACE FUNCTION semantic_cache.scheduled_maintenance()
RETURNS TABLE(operation TEXT, affected_rows BIGINT, duration INTERVAL) AS $$
DECLARE
    start_time TIMESTAMPTZ;
    evicted BIGINT;
BEGIN
    -- 1. Evict expired entries
    start_time := clock_timestamp();
    evicted := semantic_cache.evict_expired();
    RETURN QUERY SELECT
        'evict_expired'::TEXT,
        evicted,
        clock_timestamp() - start_time;

    -- 2. Auto-evict based on policy
    start_time := clock_timestamp();
    evicted := semantic_cache.auto_evict();
    RETURN QUERY SELECT
        'auto_evict'::TEXT,
        evicted,
        clock_timestamp() - start_time;

    -- 3. Analyze tables
    start_time := clock_timestamp();
    EXECUTE 'ANALYZE semantic_cache.cache_entries';
    EXECUTE 'ANALYZE semantic_cache.cache_metadata';
    RETURN QUERY SELECT
        'analyze_tables'::TEXT,
        0::BIGINT,
        clock_timestamp() - start_time;
END;
$$ LANGUAGE plpgsql;

-- Schedule with pg_cron (if available)
-- Run every hour
SELECT cron.schedule(
    'cache-maintenance',
    '0 * * * *',
    'SELECT semantic_cache.scheduled_maintenance()'
);

-- Or run manually
SELECT * FROM semantic_cache.scheduled_maintenance();

Cache Warming

Pre-populate cache with common queries.

-- Warm cache with popular queries
CREATE OR REPLACE FUNCTION app.warm_cache()
RETURNS INTEGER AS $$
DECLARE
    warmed_count INTEGER := 0;
BEGIN
    -- Example: Pre-cache common dashboard queries
    PERFORM semantic_cache.cache_query(
        'Total sales this month',
        (SELECT array_agg(random()::float4)::text FROM generate_series(1, 1536)),
        (SELECT jsonb_build_object('total', SUM(amount)) FROM orders
         WHERE created_at >= DATE_TRUNC('month', NOW())),
        3600,
        ARRAY['dashboard', 'warmed']
    );
    warmed_count := warmed_count + 1;

    -- Add more common queries...

    RETURN warmed_count;
END;
$$ LANGUAGE plpgsql;

-- Run on application startup or schedule daily
SELECT app.warm_cache();

Multi-Language Support

Caching Across Languages

Cache queries regardless of language using embeddings.

from sentence_transformers import SentenceTransformer
import psycopg2

class MultilingualCache:
    def __init__(self, db_conn_string):
        self.conn = psycopg2.connect(db_conn_string)
        # Use multilingual model
        self.encoder = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2')

    def cached_query(self, query_text, language):
        """Cache works across languages!"""
        embedding = self.encoder.encode(query_text)
        embedding_str = str(embedding.tolist())

        # Check cache (works for all languages)
        cur = self.conn.cursor()
        cur.execute("""
            SELECT * FROM semantic_cache.get_cached_result(%s, 0.90)
        """, (embedding_str,))

        result = cur.fetchone()
        if result:
            return result[1]

        # Execute query and cache
        # ... your query logic ...

# These queries in different languages can hit the same cache entry!
cache = MultilingualCache("dbname=mydb")

cache.cached_query("What is the total revenue?", "en")
cache.cached_query("¿Cuál es el ingreso total?", "es")      # Cache hit!
cache.cached_query("Quel est le revenu total?", "fr")       # Cache hit!
cache.cached_query("Qual é a receita total?", "pt")         # Cache hit!

Next Steps