You've already forked prometheus-process-exporter
500 lines
13 KiB
Go
500 lines
13 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"strconv"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"gopkg.in/yaml.v2"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/shirou/gopsutil/v3/process"
|
|
)
|
|
|
|
// ProcessConfig identifies a single process to monitor, either by
// executable name or by PID. A zero Pid means "match by name only";
// an empty Name with a non-zero Pid means "match by PID only".
type ProcessConfig struct {
	Name string `yaml:"name"`
	Pid  int    `yaml:"pid"`
}
|
|
|
|
// Config is the exporter's runtime configuration: the set of processes
// to monitor and how often to refresh the cached metrics. Note that
// readConfig parses Interval from a string field itself (yaml.v2 does
// not decode time.Duration natively).
type Config struct {
	Processes []ProcessConfig `yaml:"processes"`
	Interval  time.Duration   `yaml:"interval"`
}
|
|
|
|
// Exporter implements prometheus.Collector by serving a cached snapshot
// of process metrics; collectAndCacheMetrics refreshes the snapshot,
// either on a ticker or once at startup.
type Exporter struct {
	config         Config
	metrics        *ProcessMetrics
	lastCollection time.Time  // when the cache was last refreshed
	mutex          sync.Mutex // guards metrics contents and lastCollection
	verbose        bool       // enables per-step logging
}
|
|
|
|
// ProcessMetrics bundles every exported metric vector. All vectors share
// the same label set: {"process", "pid"}.
type ProcessMetrics struct {
	cpuUsage       *prometheus.GaugeVec
	memoryUsage    *prometheus.GaugeVec
	memoryPercent  *prometheus.GaugeVec
	uptime         *prometheus.GaugeVec
	openFiles      *prometheus.GaugeVec
	threadCount    *prometheus.GaugeVec
	residentMemory *prometheus.GaugeVec
	virtualMemory  *prometheus.GaugeVec
	// Cumulative I/O totals; populated from gopsutil IOCounters each cycle.
	readBytes  *prometheus.CounterVec
	writeBytes *prometheus.CounterVec
}
|
|
|
|
func NewExporter(config Config, verbose bool) *Exporter {
|
|
return &Exporter{
|
|
config: config,
|
|
metrics: newProcessMetrics(),
|
|
lastCollection: time.Now(),
|
|
verbose: verbose,
|
|
}
|
|
}
|
|
|
|
func newProcessMetrics() *ProcessMetrics {
|
|
return &ProcessMetrics{
|
|
cpuUsage: createGaugeVec("process_cpu_usage_percent", "CPU usage percentage of the process"),
|
|
memoryUsage: createGaugeVec("process_memory_usage_bytes", "Memory usage in bytes of the process"),
|
|
memoryPercent: createGaugeVec("process_memory_percent", "Memory usage percentage of the process"),
|
|
uptime: createGaugeVec("process_uptime_seconds", "Uptime of the process in seconds"),
|
|
openFiles: createGaugeVec("process_open_files", "Number of open files by the process"),
|
|
threadCount: createGaugeVec("process_thread_count", "Number of threads used by the process"),
|
|
residentMemory: createGaugeVec("process_resident_memory_bytes", "Resident memory size in bytes"),
|
|
virtualMemory: createGaugeVec("process_virtual_memory_bytes", "Virtual memory size in bytes"),
|
|
readBytes: createCounterVec("process_read_bytes_total", "Total number of bytes read by the process"),
|
|
writeBytes: createCounterVec("process_write_bytes_total", "Total number of bytes written by the process"),
|
|
}
|
|
}
|
|
|
|
func createGaugeVec(name, help string) *prometheus.GaugeVec {
|
|
return prometheus.NewGaugeVec(
|
|
prometheus.GaugeOpts{
|
|
Name: name,
|
|
Help: help,
|
|
},
|
|
[]string{"process", "pid"},
|
|
)
|
|
}
|
|
|
|
func createCounterVec(name, help string) *prometheus.CounterVec {
|
|
return prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Name: name,
|
|
Help: help,
|
|
},
|
|
[]string{"process", "pid"},
|
|
)
|
|
}
|
|
|
|
// Describe implements prometheus.Collector by forwarding to every
// underlying metric vector.
func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
	e.metrics.describeAll(ch)
}
|
|
|
|
func (pm *ProcessMetrics) describeAll(ch chan<- *prometheus.Desc) {
|
|
pm.cpuUsage.Describe(ch)
|
|
pm.memoryUsage.Describe(ch)
|
|
pm.memoryPercent.Describe(ch)
|
|
pm.uptime.Describe(ch)
|
|
pm.openFiles.Describe(ch)
|
|
pm.threadCount.Describe(ch)
|
|
pm.residentMemory.Describe(ch)
|
|
pm.virtualMemory.Describe(ch)
|
|
pm.readBytes.Describe(ch)
|
|
pm.writeBytes.Describe(ch)
|
|
}
|
|
|
|
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
|
|
if e.verbose {
|
|
log.Printf("Serving metrics from cache (last collected: %s)", e.lastCollection.Format("15:04:05"))
|
|
}
|
|
|
|
e.metrics.collectAll(ch)
|
|
|
|
if e.verbose {
|
|
log.Printf("Metrics served successfully")
|
|
}
|
|
}
|
|
|
|
func (pm *ProcessMetrics) collectAll(ch chan<- prometheus.Metric) {
|
|
pm.cpuUsage.Collect(ch)
|
|
pm.memoryUsage.Collect(ch)
|
|
pm.memoryPercent.Collect(ch)
|
|
pm.uptime.Collect(ch)
|
|
pm.openFiles.Collect(ch)
|
|
pm.threadCount.Collect(ch)
|
|
pm.residentMemory.Collect(ch)
|
|
pm.virtualMemory.Collect(ch)
|
|
pm.readBytes.Collect(ch)
|
|
pm.writeBytes.Collect(ch)
|
|
}
|
|
|
|
func (e *Exporter) collectAndCacheMetrics() {
|
|
e.mutex.Lock()
|
|
defer e.mutex.Unlock()
|
|
|
|
if e.verbose {
|
|
processNames := []string{}
|
|
for _, proc := range e.config.Processes {
|
|
if proc.Pid != 0 {
|
|
processNames = append(processNames, fmt.Sprintf("%s (PID: %d)", proc.Name, proc.Pid))
|
|
} else {
|
|
processNames = append(processNames, proc.Name)
|
|
}
|
|
}
|
|
log.Printf("Collecting metrics for processes: %v", processNames)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
done := make(chan struct{})
|
|
|
|
go func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Printf("Warning: Metric collection timed out after 30 seconds")
|
|
return
|
|
case <-done:
|
|
return
|
|
}
|
|
}()
|
|
|
|
e.metrics.resetAll()
|
|
|
|
procs, err := process.Processes()
|
|
if err != nil {
|
|
log.Printf("Error getting processes: %v", err)
|
|
return
|
|
}
|
|
|
|
foundCounts := make(map[string]int)
|
|
|
|
for _, proc := range procs {
|
|
name, err := proc.Name()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// Check if this process matches any of our configured processes
|
|
for _, procConfig := range e.config.Processes {
|
|
// Match by name
|
|
if procConfig.Name != "" && procConfig.Name == name {
|
|
foundCounts[procConfig.Name]++
|
|
e.collectProcessMetrics(proc, procConfig.Name)
|
|
if e.verbose {
|
|
log.Printf("Collected metrics for %s (PID: %d)", procConfig.Name, proc.Pid)
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Match by PID
|
|
if procConfig.Pid != 0 && int(proc.Pid) == procConfig.Pid {
|
|
// For PID-based monitoring, use the process name as the label
|
|
foundCounts[fmt.Sprintf("%s:%d", name, procConfig.Pid)]++
|
|
e.collectProcessMetrics(proc, fmt.Sprintf("%s:%d", name, procConfig.Pid))
|
|
if e.verbose {
|
|
log.Printf("Collected metrics for %s (PID: %d)", name, proc.Pid)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, procConfig := range e.config.Processes {
|
|
var procName string
|
|
if procConfig.Pid != 0 {
|
|
procName = fmt.Sprintf("%s:%d", procConfig.Name, procConfig.Pid)
|
|
} else {
|
|
procName = procConfig.Name
|
|
}
|
|
if e.verbose {
|
|
log.Printf("Found %d instances of %s", foundCounts[procName], procName)
|
|
}
|
|
}
|
|
|
|
close(done)
|
|
e.lastCollection = time.Now()
|
|
if e.verbose {
|
|
log.Printf("Metrics collection completed and cached")
|
|
}
|
|
}
|
|
|
|
func (pm *ProcessMetrics) resetAll() {
|
|
pm.cpuUsage.Reset()
|
|
pm.memoryUsage.Reset()
|
|
pm.memoryPercent.Reset()
|
|
pm.uptime.Reset()
|
|
pm.openFiles.Reset()
|
|
pm.threadCount.Reset()
|
|
pm.residentMemory.Reset()
|
|
pm.virtualMemory.Reset()
|
|
}
|
|
|
|
// collectProcessMetrics samples one process and writes every metric for
// it under the (procName, pid) label pair. Each probe is independent: a
// failure in one (e.g. permission denied reading IO counters) is logged
// in verbose mode and does not stop the others.
func (e *Exporter) collectProcessMetrics(proc *process.Process, procName string) {
	pid := strconv.Itoa(int(proc.Pid))

	if cpuPercent, err := proc.CPUPercent(); err == nil {
		e.metrics.cpuUsage.WithLabelValues(procName, pid).Set(cpuPercent)
		if e.verbose {
			log.Printf("  CPU%%: %.2f%%", cpuPercent)
		}
	} else if e.verbose {
		log.Printf("  CPU%%: error - %v", err)
	}

	if memInfo, err := proc.MemoryInfo(); err == nil {
		// memoryUsage and residentMemory intentionally publish the same
		// RSS value under two metric names.
		e.metrics.memoryUsage.WithLabelValues(procName, pid).Set(float64(memInfo.RSS))
		e.metrics.residentMemory.WithLabelValues(procName, pid).Set(float64(memInfo.RSS))
		e.metrics.virtualMemory.WithLabelValues(procName, pid).Set(float64(memInfo.VMS))
		if e.verbose {
			log.Printf("  Memory: RSS=%d bytes, VMS=%d bytes", memInfo.RSS, memInfo.VMS)
		}
	} else if e.verbose {
		log.Printf("  Memory: error - %v", err)
	}

	if memPercent, err := proc.MemoryPercent(); err == nil {
		e.metrics.memoryPercent.WithLabelValues(procName, pid).Set(float64(memPercent))
		if e.verbose {
			log.Printf("  Memory%%: %.2f%%", memPercent)
		}
	} else if e.verbose {
		log.Printf("  Memory%%: error - %v", err)
	}

	if createTime, err := proc.CreateTime(); err == nil {
		// CreateTime is in milliseconds since the epoch; convert to a
		// whole-second uptime.
		uptimeSeconds := float64(time.Now().Unix() - int64(createTime/1000))
		e.metrics.uptime.WithLabelValues(procName, pid).Set(uptimeSeconds)
		if e.verbose {
			log.Printf("  Uptime: %.0f seconds", uptimeSeconds)
		}
	} else if e.verbose {
		log.Printf("  Uptime: error - %v", err)
	}

	if openFiles, err := proc.NumFDs(); err == nil {
		e.metrics.openFiles.WithLabelValues(procName, pid).Set(float64(openFiles))
		if e.verbose {
			log.Printf("  Open files: %d", openFiles)
		}
	} else if e.verbose {
		log.Printf("  Open files: error - %v", err)
	}

	if threadCount, err := proc.NumThreads(); err == nil {
		e.metrics.threadCount.WithLabelValues(procName, pid).Set(float64(threadCount))
		if e.verbose {
			log.Printf("  Threads: %d", threadCount)
		}
	} else if e.verbose {
		log.Printf("  Threads: error - %v", err)
	}

	if ioCounters, err := proc.IOCounters(); err == nil {
		// NOTE(review): ReadBytes/WriteBytes appear to be cumulative
		// per-process totals, so these Add() calls assume the counter
		// vectors were Reset() at the start of this cycle — otherwise the
		// totals compound across cycles. Verify against resetAll.
		e.metrics.readBytes.WithLabelValues(procName, pid).Add(float64(ioCounters.ReadBytes))
		e.metrics.writeBytes.WithLabelValues(procName, pid).Add(float64(ioCounters.WriteBytes))
		if e.verbose {
			log.Printf("  I/O: Read=%d bytes, Write=%d bytes", ioCounters.ReadBytes, ioCounters.WriteBytes)
		}
	} else if e.verbose {
		log.Printf("  I/O: error - %v", err)
	}
}
|
|
|
|
func readConfig(filename string, verbose bool) (Config, error) {
|
|
var config Config
|
|
|
|
paths := []string{
|
|
filename,
|
|
filepath.Join(".", filename),
|
|
filepath.Join("/etc", filename),
|
|
}
|
|
|
|
var configData []byte
|
|
var err error
|
|
|
|
for _, path := range paths {
|
|
configData, err = os.ReadFile(path)
|
|
if err == nil {
|
|
if verbose {
|
|
log.Printf("Using config file from: %s", path)
|
|
}
|
|
break
|
|
}
|
|
if !os.IsNotExist(err) {
|
|
return config, err
|
|
}
|
|
}
|
|
|
|
if configData == nil {
|
|
return config, os.ErrNotExist
|
|
}
|
|
|
|
// Try to parse as the new format first
|
|
var yamlConfig struct {
|
|
Processes []interface{} `yaml:"processes"`
|
|
Interval string `yaml:"interval"`
|
|
}
|
|
err = yaml.Unmarshal(configData, &yamlConfig)
|
|
if err != nil {
|
|
return config, fmt.Errorf("failed to parse YAML configuration: %v", err)
|
|
}
|
|
|
|
// Convert to our ProcessConfig format, handling both string and struct formats
|
|
for _, proc := range yamlConfig.Processes {
|
|
switch p := proc.(type) {
|
|
case string:
|
|
// Old format: just process name
|
|
config.Processes = append(config.Processes, ProcessConfig{
|
|
Name: p,
|
|
Pid: 0,
|
|
})
|
|
case map[interface{}]interface{}:
|
|
// New format: struct with name and/or pid
|
|
procConfig := ProcessConfig{}
|
|
if name, ok := p["name"].(string); ok {
|
|
procConfig.Name = name
|
|
}
|
|
if pid, ok := p["pid"].(int); ok {
|
|
procConfig.Pid = pid
|
|
} else if pid, ok := p["pid"].(float64); ok {
|
|
procConfig.Pid = int(pid)
|
|
}
|
|
config.Processes = append(config.Processes, procConfig)
|
|
default:
|
|
return config, fmt.Errorf("invalid process configuration format: %v", proc)
|
|
}
|
|
}
|
|
|
|
// Parse interval if specified in config
|
|
if yamlConfig.Interval != "" {
|
|
interval, err := time.ParseDuration(yamlConfig.Interval)
|
|
if err != nil {
|
|
return config, fmt.Errorf("failed to parse interval duration: %v", err)
|
|
}
|
|
config.Interval = interval
|
|
} else {
|
|
// Default to 10 seconds if not specified
|
|
config.Interval = 10 * time.Second
|
|
}
|
|
|
|
if verbose {
|
|
log.Printf("Parsed YAML configuration")
|
|
}
|
|
|
|
if len(config.Processes) == 0 {
|
|
return config, fmt.Errorf("config validation failed: no processes specified in config")
|
|
}
|
|
|
|
// Validate that each process has either a name or a pid
|
|
for i, proc := range config.Processes {
|
|
if proc.Name == "" && proc.Pid == 0 {
|
|
return config, fmt.Errorf("config validation failed: process at index %d must have either a name or a pid", i)
|
|
}
|
|
}
|
|
|
|
if verbose {
|
|
log.Printf("Loaded config with %d unique processes: %v", len(config.Processes), config.Processes)
|
|
}
|
|
|
|
return config, nil
|
|
}
|
|
|
|
// setupHTTPServer builds the HTTP server for the metrics endpoint. The
// verbose parameter is currently unused here (request logging is handled
// by createLoggingHandler) but kept for signature stability.
func setupHTTPServer(listenAddr string, handler http.Handler, verbose bool) *http.Server {
	return &http.Server{
		Addr:    listenAddr,
		Handler: handler,
		// ROBUSTNESS: the original server had no timeouts at all, letting
		// slow or idle clients pin connections indefinitely (slow-loris).
		// Scrape requests are tiny, so bounding header reads is safe.
		ReadHeaderTimeout: 10 * time.Second,
	}
}
|
|
|
|
func createLoggingHandler(verbose bool) http.Handler {
|
|
if !verbose {
|
|
return http.DefaultServeMux
|
|
}
|
|
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
log.Printf("HTTP Request: %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr)
|
|
http.DefaultServeMux.ServeHTTP(w, r)
|
|
})
|
|
}
|
|
|
|
// main wires up the exporter: parse flags, load config, prime the metric
// cache, start the optional periodic collector and the HTTP server, then
// block until SIGINT/SIGTERM and shut down gracefully.
func main() {
	configFile := flag.String("c", "config.yml", "Path to configuration file")
	listenAddr := flag.String("l", ":9064", "Address to listen on")
	verbose := flag.Bool("v", false, "Enable verbose logging")
	flag.Parse()

	config, err := readConfig(*configFile, *verbose)
	if err != nil {
		log.Fatalf("Error reading config: %v", err)
	}

	// Use a private registry so only this exporter's metrics are served
	// (no default Go/process collectors).
	reg := prometheus.NewRegistry()
	exporter := NewExporter(config, *verbose)
	// Prime the cache so the very first scrape already has data.
	exporter.collectAndCacheMetrics()
	reg.MustRegister(exporter)
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))

	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK"))
	})

	handler := createLoggingHandler(*verbose)

	// Set up periodic metrics collection if interval is specified.
	if config.Interval > 0 {
		if *verbose {
			log.Printf("Starting periodic metrics collection every %s", config.Interval)
		}
		// NOTE: this goroutine has no stop signal; it runs until process
		// exit, which is acceptable since shutdown ends the process.
		go func() {
			ticker := time.NewTicker(config.Interval)
			defer ticker.Stop()

			for range ticker.C {
				exporter.collectAndCacheMetrics()
			}
		}()
	} else {
		if *verbose {
			log.Printf("Metrics collection on demand only (no periodic collection)")
		}
	}

	server := setupHTTPServer(*listenAddr, handler, *verbose)

	// Translate SIGINT/SIGTERM into a graceful shutdown.
	shutdownChan := make(chan os.Signal, 1)
	signal.Notify(shutdownChan, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		log.Printf("Starting process exporter on %s", *listenAddr)
		if *verbose {
			log.Printf("Verbose logging enabled")
		}
		// ErrServerClosed is the expected result of Shutdown, not a failure.
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("Server error: %v", err)
		}
	}()

	<-shutdownChan
	log.Printf("Shutdown signal received, shutting down gracefully...")

	// Give in-flight scrapes up to 30 seconds to finish.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if err := server.Shutdown(ctx); err != nil {
		log.Printf("Server shutdown error: %v", err)
	}

	log.Printf("Server stopped gracefully")
}
|