Scheduler Implementation
This document explains the implementation details of the probe scheduler.
Overview
The ProbeScheduler is responsible for the following tasks:
- Loading probe configurations from the database.
- Managing probe execution timers.
- Coordinating parallel probe execution.
- Handling timeouts and errors.
- Performing graceful shutdown.
Scheduler Structure
The following code shows the ProbeScheduler struct:
type ProbeScheduler struct {
    datastore    *database.Datastore
    poolManager  *database.MonitoredConnectionPoolManager
    serverSecret string
    config       Config
    probes       map[string]probes.MetricsProbe
    shutdownChan chan struct{}
    ctx          context.Context
    cancel       context.CancelFunc
    wg           sync.WaitGroup
}
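Start writes into the probes map and Stop closes shutdownChan, so both must be non-nil before either method runs. A constructor along the following lines would set them up; this is a hypothetical sketch (NewProbeScheduler and the trimmed field set are assumptions, and the datastore and pool-manager fields are omitted to keep it self-contained):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Stand-in for probes.MetricsProbe; the real interface lives in the
// probes package.
type MetricsProbe interface{ GetName() string }

// Trimmed copy of ProbeScheduler for this sketch.
type ProbeScheduler struct {
	probes       map[string]MetricsProbe
	shutdownChan chan struct{}
	ctx          context.Context
	cancel       context.CancelFunc
	wg           sync.WaitGroup
}

// NewProbeScheduler allocates the map and channel up front: a nil map
// would panic on insert in Start, and closing a nil channel in Stop
// would panic as well.
func NewProbeScheduler() *ProbeScheduler {
	ctx, cancel := context.WithCancel(context.Background())
	return &ProbeScheduler{
		probes:       make(map[string]MetricsProbe),
		shutdownChan: make(chan struct{}),
		ctx:          ctx,
		cancel:       cancel,
	}
}

func main() {
	ps := NewProbeScheduler()
	fmt.Println(ps.probes != nil, ps.shutdownChan != nil) // prints "true true"
}
```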
Initialization
The scheduler starts by loading configurations and launching goroutines.
Start Sequence
The following code shows the start sequence:
func (ps *ProbeScheduler) Start(ctx context.Context) error {
    // 1. Load probe configurations from database
    conn, err := ps.datastore.GetConnection()
    if err != nil {
        return err
    }
    configs, err := probes.LoadProbeConfigs(ctx, conn)
    ps.datastore.ReturnConnection(conn)
    if err != nil {
        return err
    }
    // 2. Initialize probe instances
    for _, config := range configs {
        config := config // copy so &config does not alias the loop variable (pre-Go 1.22)
        probe := ps.createProbe(&config)
        if probe != nil {
            ps.probes[config.Name] = probe
        }
    }
    // 3. Start scheduling goroutines
    for _, probe := range ps.probes {
        ps.wg.Add(1)
        go ps.scheduleProbe(probe)
    }
    return nil
}
Probe Scheduling
Each probe runs in its own goroutine with an independent timer. The following code shows the scheduling loop:
func (ps *ProbeScheduler) scheduleProbe(probe probes.MetricsProbe) {
    defer ps.wg.Done()
    config := probe.GetConfig()
    interval := time.Duration(config.CollectionIntervalSeconds) * time.Second
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    // Execute immediately on startup
    ps.executeProbe(ps.ctx, probe)
    // Then execute on timer
    for {
        select {
        case <-ps.shutdownChan:
            return
        case <-ps.ctx.Done():
            return
        case <-ticker.C:
            ps.executeProbe(ps.ctx, probe)
        }
    }
}
Probe Execution
The scheduler executes each probe against all monitored connections in parallel.
Execution Overview
The following code shows how the scheduler executes a probe:
func (ps *ProbeScheduler) executeProbe(
    ctx context.Context, probe probes.MetricsProbe) {
    // 1. Get all monitored connections
    connections, err := ps.datastore.GetMonitoredConnections()
    if err != nil {
        log.Printf("Error getting connections: %v", err)
        return
    }
    // 2. Execute probe for each connection
    var wg sync.WaitGroup
    for _, conn := range connections {
        wg.Add(1)
        go func(c database.MonitoredConnection) {
            defer wg.Done()
            ps.executeProbeForConnection(ctx, probe, c)
        }(conn)
    }
    // 3. Wait for all to complete
    wg.Wait()
}
Per-Connection Execution
The following code shows how the scheduler executes a probe for a single connection:
func (ps *ProbeScheduler) executeProbeForConnection(
    ctx context.Context,
    probe probes.MetricsProbe,
    conn database.MonitoredConnection) {
    timeout := time.Duration(ps.config.GetMonitoredPoolMaxWaitSeconds()) * time.Second
    execCtx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    timestamp := time.Now()
    var allMetrics []map[string]interface{}
    if probe.IsDatabaseScoped() {
        allMetrics = ps.executeProbeForAllDatabases(execCtx, probe, conn)
    } else {
        allMetrics = ps.executeProbeForServerWide(execCtx, probe, conn)
    }
    if execCtx.Err() == context.DeadlineExceeded {
        log.Printf("Probe %s timed out for %s", probe.GetName(), conn.Name)
        return
    }
    if len(allMetrics) > 0 {
        ps.storeMetrics(ctx, probe, conn.ID, timestamp, allMetrics)
    }
}
Server-Scoped Execution
Server-scoped probes execute once per monitored connection. The following code shows the execution flow for server-wide probes:
func (ps *ProbeScheduler) executeProbeForServerWide(
    ctx context.Context,
    probe probes.MetricsProbe,
    conn database.MonitoredConnection,
) []map[string]interface{} {
    monitoredConn, err := ps.poolManager.GetConnection(ctx, conn, ps.serverSecret)
    if err != nil {
        log.Printf("Error getting connection: %v", err)
        return nil
    }
    defer ps.poolManager.ReturnConnection(conn.ID, monitoredConn)
    metrics, err := probe.Execute(ctx, monitoredConn)
    if err != nil {
        log.Printf("Error executing probe: %v", err)
        return nil
    }
    return metrics
}
Database-Scoped Execution
Database-scoped probes execute once for each database on a monitored server. The scheduler queries pg_database for the database list and then executes the probe against each database.
The following code shows the execution flow for database-scoped probes:
func (ps *ProbeScheduler) executeProbeForAllDatabases(
    ctx context.Context,
    probe probes.MetricsProbe,
    conn database.MonitoredConnection,
) []map[string]interface{} {
    var allMetrics []map[string]interface{}
    defaultConn, err := ps.poolManager.GetConnection(ctx, conn, ps.serverSecret)
    if err != nil {
        return allMetrics
    }
    databases, err := ps.getDatabaseList(ctx, defaultConn)
    if err != nil {
        ps.poolManager.ReturnConnection(conn.ID, defaultConn)
        return allMetrics
    }
    // Execute on the first database, reusing the already-open connection
    if len(databases) > 0 {
        metrics, err := probe.Execute(ctx, defaultConn)
        if err == nil {
            for i := range metrics {
                metrics[i]["_database_name"] = databases[0]
            }
            allMetrics = append(allMetrics, metrics...)
        }
    }
    ps.poolManager.ReturnConnection(conn.ID, defaultConn)
    // Execute on the remaining databases with per-database connections
    for i := 1; i < len(databases); i++ {
        dbName := databases[i]
        dbConn, err := ps.poolManager.GetConnectionForDatabase(ctx, conn, dbName, ps.serverSecret)
        if err != nil {
            continue
        }
        metrics, err := probe.Execute(ctx, dbConn)
        ps.poolManager.ReturnConnection(conn.ID, dbConn)
        if err == nil {
            for j := range metrics {
                metrics[j]["_database_name"] = dbName
            }
            allMetrics = append(allMetrics, metrics...)
        }
    }
    return allMetrics
}
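The getDatabaseList helper used above is not shown. A minimal sketch follows, assuming the connection exposes the standard database/sql query interface; the query text is also an assumption, reflecting the pg_database lookup described above with the usual filters for templates and databases that refuse connections:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// listDatabasesSQL is a plausible pg_database query; the real filter
// conditions may differ.
const listDatabasesSQL = `
SELECT datname FROM pg_database
WHERE datallowconn AND NOT datistemplate
ORDER BY datname`

// getDatabaseList returns the names of connectable databases on the
// monitored server.
func getDatabaseList(ctx context.Context, db *sql.DB) ([]string, error) {
	rows, err := db.QueryContext(ctx, listDatabasesSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var names []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, err
		}
		names = append(names, name)
	}
	return names, rows.Err()
}

func main() {
	// No live server here; just confirm the query constant is set.
	fmt.Println(listDatabasesSQL != "")
}
```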
Metrics Storage
The following code shows how the scheduler stores collected metrics:
func (ps *ProbeScheduler) storeMetrics(
    ctx context.Context,
    probe probes.MetricsProbe,
    connectionID int,
    timestamp time.Time,
    metrics []map[string]interface{}) {
    timeout := time.Duration(ps.config.GetDatastorePoolMaxWaitSeconds()) * time.Second
    storeCtx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    datastoreConn, err := ps.datastore.GetConnectionWithContext(storeCtx)
    if err != nil {
        log.Printf("Error getting datastore conn: %v", err)
        return
    }
    defer ps.datastore.ReturnConnection(datastoreConn)
    err = probe.Store(storeCtx, datastoreConn, connectionID, timestamp, metrics)
    if err != nil {
        log.Printf("Error storing metrics: %v", err)
    }
}
Shutdown Sequence
The following code shows the shutdown sequence:
func (ps *ProbeScheduler) Stop() {
    // 1. Cancel the context
    ps.cancel()
    // 2. Signal all goroutines to stop
    close(ps.shutdownChan)
    // 3. Wait for all probe goroutines to exit
    ps.wg.Wait()
    log.Println("Probe scheduler stopped")
}
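The shutdown mechanics can be exercised with a trimmed-down model. Only the channel and WaitGroup interplay matches the real scheduler; the miniScheduler type and its method names are hypothetical:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// miniScheduler models just the shutdown path: goroutines that block
// on shutdownChan or ctx, and a stop that unblocks them all.
type miniScheduler struct {
	shutdownChan chan struct{}
	ctx          context.Context
	cancel       context.CancelFunc
	wg           sync.WaitGroup
}

func newMiniScheduler() *miniScheduler {
	ctx, cancel := context.WithCancel(context.Background())
	return &miniScheduler{
		shutdownChan: make(chan struct{}),
		ctx:          ctx,
		cancel:       cancel,
	}
}

func (ms *miniScheduler) start(n int) {
	for i := 0; i < n; i++ {
		ms.wg.Add(1)
		go func() {
			defer ms.wg.Done()
			select { // same select shape as scheduleProbe
			case <-ms.shutdownChan:
			case <-ms.ctx.Done():
			}
		}()
	}
}

func (ms *miniScheduler) stop() {
	ms.cancel()
	close(ms.shutdownChan)
	ms.wg.Wait() // returns only after every goroutine has exited
}

func main() {
	ms := newMiniScheduler()
	ms.start(5)
	ms.stop()
	fmt.Println("stopped") // prints "stopped"
}
```

Because both the context and the channel are signalled before wg.Wait, a goroutine blocked on either case wakes up and exits, so Stop cannot hang on a healthy loop.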
Concurrency Control
The scheduler manages concurrency at several levels.
Per-Probe Goroutines
Each probe has one goroutine managing its schedule, giving a total of 34 goroutines for the standard probe set.
Per-Connection Goroutines
When a probe executes, the scheduler spawns one goroutine per connection. These goroutines are short-lived and last only for the duration of probe execution. With 10 monitored servers, the system can produce up to 340 concurrent goroutines during probe execution.
Connection Pool Limits
The datastore uses a global limit of 25 connections by default. Monitored servers use a per-server limit of 5 connections each by default.
Semaphores
Each monitored server has a semaphore implemented as a buffered channel. The semaphore limits concurrent probe executions per server.
The following code shows the semaphore implementation:
type MonitoredConnectionPoolManager struct {
    semaphores     map[int]chan struct{}
    maxConnections int
}

func (m *MonitoredConnectionPoolManager) acquireSlot(
    ctx context.Context, connectionID int) error {
    sem := m.getSemaphore(connectionID)
    select {
    case sem <- struct{}{}:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (m *MonitoredConnectionPoolManager) releaseSlot(connectionID int) {
    sem := m.getSemaphore(connectionID)
    <-sem
}
Timeout Handling
The scheduler applies timeouts at two stages.
Execution Timeout
Each probe execution has a timeout equal to monitored_pool_max_wait_seconds. If the timeout is exceeded, the execution is cancelled, the error is logged, and the probe retries on the next interval.
Storage Timeout
Metric storage has its own timeout equal to datastore_pool_max_wait_seconds. The system applies this timeout independently of the execution timeout.
Error Handling
The scheduler handles errors at several levels.
Connection Errors
The scheduler logs connection errors with the connection name and error message. The probe is skipped for that connection and retries on the next interval.
Execution Errors
The scheduler logs execution errors with the probe name, connection, and error message. Metrics are not collected for that interval.
Storage Errors
The scheduler logs storage errors with the probe name. Metrics are lost for that interval and the probe continues on the next interval.
Timeout Errors
The scheduler logs timeout errors with the timeout duration. The execution is cancelled and retries on the next interval.
Performance Characteristics
This section describes the scheduler performance profile.
Scheduling Overhead
The scheduling overhead is minimal because each probe goroutine sleeps on its timer between executions. The 34 goroutines, each with a small initial stack, consume approximately 2-4 MB of memory in total.
Execution Parallelism
Probes execute independently of each other. The scheduler processes connections in parallel, limited by connection pool sizes.
Memory Usage
Each probe execution uses between 100 KB and 10 MB, depending on the result set size. Memory peaks when many probes execute concurrently and is released once metrics are stored.
Monitoring the Scheduler
This section describes how to monitor the scheduler.
Check Probe Timing
Watch the logs for probe execution times:
2025/11/05 10:00:00 Probe pg_stat_activity on Server1 completed in 45.23ms
Check for Timeouts
Look for timeout messages in the logs:
2025/11/05 10:00:00 Probe pg_stat_activity timed out for Server1 (timeout: 60 seconds)
Check Probe Status
In the following example, the query shows the most recent collection time for each connection:
SELECT
    c.name,
    MAX(pa.collected_at) AS last_activity_collection,
    MAX(pd.collected_at) AS last_database_collection
FROM connections c
LEFT JOIN metrics.pg_stat_activity pa
    ON c.id = pa.connection_id
LEFT JOIN metrics.pg_stat_database pd
    ON c.id = pd.connection_id
WHERE c.is_monitored = TRUE
GROUP BY c.name
ORDER BY c.name;
Tuning the Scheduler
This section describes how to tune the scheduler for optimal performance.
Adjusting Collection Intervals
Balance data freshness against system load by adjusting collection intervals. In the following example, the commands adjust intervals for specific probes:
-- Reduce frequency for low-priority probes
UPDATE probes
SET collection_interval_seconds = 900
WHERE name = 'pg_stat_io';
-- Increase frequency for critical probes
UPDATE probes
SET collection_interval_seconds = 30
WHERE name = 'pg_stat_replication';
Adjusting Timeouts
Increase timeouts if the logs show timeout errors. In the following example, the configuration file settings increase both timeouts:
monitored_pool_max_wait_seconds = 120
datastore_pool_max_wait_seconds = 90
Adjusting Concurrency
Increase pool sizes for better parallelism. In the following example, the configuration file settings increase both pool sizes:
monitored_pool_max_connections = 10
datastore_pool_max_connections = 50
See Also
The following resources provide additional details.
- Probes covers the probe system internals.
- Architecture describes the overall system design.
- Testing and Development covers the development environment.