Files
tchivert 021991a5d9
build / build (push) Successful in 1m19s
add pid configuration for each process
2026-01-09 15:06:09 +01:00

500 lines
13 KiB
Go

package main
import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"strconv"
"sync"
"syscall"
"time"
"gopkg.in/yaml.v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/shirou/gopsutil/v3/process"
)
// ProcessConfig is one entry of the "processes" list: it selects which
// running process(es) the exporter reports on. readConfig rejects entries
// where both Name and Pid are unset.
//
// NOTE: readConfig builds these values by hand from a generic YAML decode,
// so the yaml tags below are not what it relies on.
type ProcessConfig struct {
	// Name is compared for exact equality with the process name returned by
	// gopsutil's Process.Name.
	Name string `yaml:"name"`
	// Pid selects a single process by its ID; 0 means "not set".
	Pid int `yaml:"pid"`
}
// Config is the parsed exporter configuration returned by readConfig.
type Config struct {
	// Processes lists the process selectors to monitor.
	Processes []ProcessConfig `yaml:"processes"`
	// Interval is how often main refreshes the metric cache in the
	// background. readConfig defaults it to 10s when the file omits it;
	// main starts no periodic refresh when it is <= 0.
	Interval time.Duration `yaml:"interval"`
}
// Exporter is registered with a Prometheus registry as a collector (it has
// Describe and Collect methods). Scrapes are answered from metric vectors
// that collectAndCacheMetrics fills in.
type Exporter struct {
	config         Config          // parsed configuration; only assigned in NewExporter
	metrics        *ProcessMetrics // the cached metric vectors that Collect emits
	lastCollection time.Time       // set by NewExporter and after each successful collection run
	mutex          sync.Mutex      // held by collectAndCacheMetrics for the whole refresh
	verbose        bool            // enables per-step logging
}
// ProcessMetrics holds every metric vector the exporter publishes. All of
// them carry the labels "process" and "pid" (see createGaugeVec and
// createCounterVec); the metric names are assigned in newProcessMetrics.
type ProcessMetrics struct {
	cpuUsage       *prometheus.GaugeVec   // process_cpu_usage_percent
	memoryUsage    *prometheus.GaugeVec   // process_memory_usage_bytes (set from RSS)
	memoryPercent  *prometheus.GaugeVec   // process_memory_percent
	uptime         *prometheus.GaugeVec   // process_uptime_seconds
	openFiles      *prometheus.GaugeVec   // process_open_files (file-descriptor count)
	threadCount    *prometheus.GaugeVec   // process_thread_count
	residentMemory *prometheus.GaugeVec   // process_resident_memory_bytes (also RSS)
	virtualMemory  *prometheus.GaugeVec   // process_virtual_memory_bytes (VMS)
	readBytes      *prometheus.CounterVec // process_read_bytes_total
	writeBytes     *prometheus.CounterVec // process_write_bytes_total
}
// NewExporter builds an Exporter for config with freshly created (empty)
// metric vectors. Its lastCollection timestamp starts at construction time;
// nothing is collected until collectAndCacheMetrics is called.
func NewExporter(config Config, verbose bool) *Exporter {
	exp := new(Exporter)
	exp.config = config
	exp.metrics = newProcessMetrics()
	exp.lastCollection = time.Now()
	exp.verbose = verbose
	return exp
}
// newProcessMetrics creates the exporter's full set of metric vectors: eight
// gauges for point-in-time readings and two counters for I/O byte totals.
func newProcessMetrics() *ProcessMetrics {
	pm := new(ProcessMetrics)

	// Point-in-time readings.
	pm.cpuUsage = createGaugeVec("process_cpu_usage_percent", "CPU usage percentage of the process")
	pm.memoryUsage = createGaugeVec("process_memory_usage_bytes", "Memory usage in bytes of the process")
	pm.memoryPercent = createGaugeVec("process_memory_percent", "Memory usage percentage of the process")
	pm.uptime = createGaugeVec("process_uptime_seconds", "Uptime of the process in seconds")
	pm.openFiles = createGaugeVec("process_open_files", "Number of open files by the process")
	pm.threadCount = createGaugeVec("process_thread_count", "Number of threads used by the process")
	pm.residentMemory = createGaugeVec("process_resident_memory_bytes", "Resident memory size in bytes")
	pm.virtualMemory = createGaugeVec("process_virtual_memory_bytes", "Virtual memory size in bytes")

	// Running totals.
	pm.readBytes = createCounterVec("process_read_bytes_total", "Total number of bytes read by the process")
	pm.writeBytes = createCounterVec("process_write_bytes_total", "Total number of bytes written by the process")

	return pm
}
// createGaugeVec returns an unregistered gauge vector called name, with the
// given help text, partitioned by the "process" and "pid" labels.
func createGaugeVec(name, help string) *prometheus.GaugeVec {
	opts := prometheus.GaugeOpts{Name: name, Help: help}
	labelNames := []string{"process", "pid"}
	return prometheus.NewGaugeVec(opts, labelNames)
}
// createCounterVec returns an unregistered counter vector called name, with
// the given help text, partitioned by the "process" and "pid" labels.
func createCounterVec(name, help string) *prometheus.CounterVec {
	opts := prometheus.CounterOpts{Name: name, Help: help}
	labelNames := []string{"process", "pid"}
	return prometheus.NewCounterVec(opts, labelNames)
}
// Describe is the prometheus.Collector hook that advertises the descriptors
// of every metric vector the exporter owns.
func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
	pm := e.metrics
	pm.describeAll(ch)
}
// describeAll sends the descriptor of each metric vector to ch, in a fixed
// order: the eight gauges first, then the two I/O counters.
func (pm *ProcessMetrics) describeAll(ch chan<- *prometheus.Desc) {
	vectors := []prometheus.Collector{
		pm.cpuUsage,
		pm.memoryUsage,
		pm.memoryPercent,
		pm.uptime,
		pm.openFiles,
		pm.threadCount,
		pm.residentMemory,
		pm.virtualMemory,
		pm.readBytes,
		pm.writeBytes,
	}
	for _, v := range vectors {
		v.Describe(ch)
	}
}
// Collect is the prometheus.Collector hook invoked on every scrape. It emits
// the values cached by the most recent collectAndCacheMetrics run.
//
// When no positive collection interval is configured, main starts no
// background ticker, so Collect refreshes the cache itself before emitting.
// Without that refresh, "on demand" mode would serve the startup snapshot
// forever.
//
// e.mutex is held while emitting. A scrape therefore never sees a cache that
// is half-way through being reset and repopulated, and the read of
// lastCollection does not race with the collector's write to it.
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
	if e.config.Interval <= 0 {
		// collectAndCacheMetrics acquires e.mutex itself, so refresh before
		// taking the lock here.
		e.collectAndCacheMetrics()
	}

	e.mutex.Lock()
	defer e.mutex.Unlock()

	if e.verbose {
		log.Printf("Serving metrics from cache (last collected: %s)", e.lastCollection.Format("15:04:05"))
	}
	e.metrics.collectAll(ch)
	if e.verbose {
		log.Printf("Metrics served successfully")
	}
}
// collectAll forwards the current samples of every metric vector to ch, in a
// fixed order: the eight gauges first, then the two I/O counters.
func (pm *ProcessMetrics) collectAll(ch chan<- prometheus.Metric) {
	vectors := []prometheus.Collector{
		pm.cpuUsage,
		pm.memoryUsage,
		pm.memoryPercent,
		pm.uptime,
		pm.openFiles,
		pm.threadCount,
		pm.residentMemory,
		pm.virtualMemory,
		pm.readBytes,
		pm.writeBytes,
	}
	for _, v := range vectors {
		v.Collect(ch)
	}
}
// collectAndCacheMetrics rebuilds the cached metric series from the live
// process table. main calls it once at startup and then on every tick of the
// configured interval. e.mutex is held for the whole run.
//
// Each configured entry selects processes as decided by matchProcessConfig:
//   - Pid != 0: only the process with that PID, labelled "<name>:<pid>",
//     where <name> is the configured Name or, if that is empty, the
//     process's own name;
//   - otherwise: every process whose name equals Name exactly, labelled Name.
func (e *Exporter) collectAndCacheMetrics() {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	if e.verbose {
		processNames := make([]string, 0, len(e.config.Processes))
		for _, proc := range e.config.Processes {
			if proc.Pid != 0 {
				processNames = append(processNames, fmt.Sprintf("%s (PID: %d)", proc.Name, proc.Pid))
			} else {
				processNames = append(processNames, proc.Name)
			}
		}
		log.Printf("Collecting metrics for processes: %v", processNames)
	}

	// Watchdog only. The gopsutil calls below take no context, so a slow scan
	// can be reported but not aborted. Stop runs on every return path
	// (including the error return), so the warning fires only when this run
	// really exceeds 30 seconds.
	watchdog := time.AfterFunc(30*time.Second, func() {
		log.Printf("Warning: Metric collection timed out after 30 seconds")
	})
	defer watchdog.Stop()

	// Start from empty vectors so processes that have exited disappear.
	e.metrics.resetAll()

	procs, err := process.Processes()
	if err != nil {
		log.Printf("Error getting processes: %v", err)
		return
	}

	// Counted per config entry rather than per label: a PID-only entry's
	// label depends on the live process name, so it cannot be predicted.
	foundCounts := make([]int, len(e.config.Processes))
	for _, proc := range procs {
		name, err := proc.Name()
		if err != nil {
			// Skip processes whose name cannot be read.
			continue
		}
		for i, procConfig := range e.config.Processes {
			label, ok := matchProcessConfig(procConfig, proc.Pid, name)
			if !ok {
				continue
			}
			foundCounts[i]++
			e.collectProcessMetrics(proc, label)
			if e.verbose {
				log.Printf("Collected metrics for %s (PID: %d)", label, proc.Pid)
			}
		}
	}

	if e.verbose {
		for i, procConfig := range e.config.Processes {
			procName := procConfig.Name
			if procConfig.Pid != 0 {
				procName = fmt.Sprintf("%s:%d", procConfig.Name, procConfig.Pid)
			}
			log.Printf("Found %d instances of %s", foundCounts[i], procName)
		}
	}

	e.lastCollection = time.Now()
	if e.verbose {
		log.Printf("Metrics collection completed and cached")
	}
}

// matchProcessConfig reports whether the running process (pid, name) is
// selected by cfg and, if so, returns the value for its "process" label.
// A configured Pid takes precedence; in that case Name, if given, serves only
// as the label prefix.
func matchProcessConfig(cfg ProcessConfig, pid int32, name string) (string, bool) {
	if cfg.Pid != 0 {
		if int(pid) != cfg.Pid {
			return "", false
		}
		label := cfg.Name
		if label == "" {
			label = name
		}
		return label + ":" + strconv.Itoa(cfg.Pid), true
	}
	if cfg.Name != "" && cfg.Name == name {
		return cfg.Name, true
	}
	return "", false
}
// resetAll deletes every labelled series from all ten metric vectors. A
// process that has exited since the previous collection therefore stops
// being exported.
//
// The two I/O byte counters are cleared as well. Their series are then
// rebuilt from the process's current totals on each collection, and they
// disappear with their process just like the gauges do.
func (pm *ProcessMetrics) resetAll() {
	gauges := []*prometheus.GaugeVec{
		pm.cpuUsage,
		pm.memoryUsage,
		pm.memoryPercent,
		pm.uptime,
		pm.openFiles,
		pm.threadCount,
		pm.residentMemory,
		pm.virtualMemory,
	}
	for _, g := range gauges {
		g.Reset()
	}
	pm.readBytes.Reset()
	pm.writeBytes.Reset()
}
// collectProcessMetrics samples one process through gopsutil. It stores the
// readings under the labels {process=procName, pid=<proc.Pid>}.
//
// Every reading is best-effort. When a gopsutil call fails, that series is
// left unset (the error is logged only in verbose mode) and the remaining
// readings are still taken.
//
// collectAndCacheMetrics calls this while holding e.mutex.
func (e *Exporter) collectProcessMetrics(proc *process.Process, procName string) {
	pid := strconv.Itoa(int(proc.Pid))
	m := e.metrics

	// NOTE(review): gopsutil's CPUPercent appears to average CPU time over
	// the process's whole lifetime rather than the last interval — confirm
	// before treating this as "current" usage.
	if cpuPercent, err := proc.CPUPercent(); err == nil {
		m.cpuUsage.WithLabelValues(procName, pid).Set(cpuPercent)
		if e.verbose {
			log.Printf(" CPU%%: %.2f%%", cpuPercent)
		}
	} else if e.verbose {
		log.Printf(" CPU%%: error - %v", err)
	}

	if memInfo, err := proc.MemoryInfo(); err == nil {
		// memoryUsage and residentMemory intentionally both report RSS.
		m.memoryUsage.WithLabelValues(procName, pid).Set(float64(memInfo.RSS))
		m.residentMemory.WithLabelValues(procName, pid).Set(float64(memInfo.RSS))
		m.virtualMemory.WithLabelValues(procName, pid).Set(float64(memInfo.VMS))
		if e.verbose {
			log.Printf(" Memory: RSS=%d bytes, VMS=%d bytes", memInfo.RSS, memInfo.VMS)
		}
	} else if e.verbose {
		log.Printf(" Memory: error - %v", err)
	}

	if memPercent, err := proc.MemoryPercent(); err == nil {
		m.memoryPercent.WithLabelValues(procName, pid).Set(float64(memPercent))
		if e.verbose {
			log.Printf(" Memory%%: %.2f%%", memPercent)
		}
	} else if e.verbose {
		log.Printf(" Memory%%: error - %v", err)
	}

	if createTime, err := proc.CreateTime(); err == nil {
		// CreateTime is in milliseconds since the epoch; whole seconds suffice.
		uptimeSeconds := float64(time.Now().Unix() - int64(createTime/1000))
		m.uptime.WithLabelValues(procName, pid).Set(uptimeSeconds)
		if e.verbose {
			log.Printf(" Uptime: %.0f seconds", uptimeSeconds)
		}
	} else if e.verbose {
		log.Printf(" Uptime: error - %v", err)
	}

	if openFiles, err := proc.NumFDs(); err == nil {
		m.openFiles.WithLabelValues(procName, pid).Set(float64(openFiles))
		if e.verbose {
			log.Printf(" Open files: %d", openFiles)
		}
	} else if e.verbose {
		log.Printf(" Open files: error - %v", err)
	}

	if threadCount, err := proc.NumThreads(); err == nil {
		m.threadCount.WithLabelValues(procName, pid).Set(float64(threadCount))
		if e.verbose {
			log.Printf(" Threads: %d", threadCount)
		}
	} else if e.verbose {
		log.Printf(" Threads: error - %v", err)
	}

	if ioCounters, err := proc.IOCounters(); err == nil {
		// ReadBytes/WriteBytes are the process's running totals, not
		// per-interval deltas. A Prometheus counter cannot be Set, so drop
		// any existing series for this label set and Add the total to a
		// fresh one. Adding the total to the old series on every collection
		// would sum successive snapshots instead.
		m.readBytes.DeleteLabelValues(procName, pid)
		m.readBytes.WithLabelValues(procName, pid).Add(float64(ioCounters.ReadBytes))
		m.writeBytes.DeleteLabelValues(procName, pid)
		m.writeBytes.WithLabelValues(procName, pid).Add(float64(ioCounters.WriteBytes))
		if e.verbose {
			log.Printf(" I/O: Read=%d bytes, Write=%d bytes", ioCounters.ReadBytes, ioCounters.WriteBytes)
		}
	} else if e.verbose {
		log.Printf(" I/O: error - %v", err)
	}
}
// readConfig locates, parses and validates the exporter's YAML configuration.
//
// Lookup: see readConfigFile. When no candidate file exists, the returned
// error wraps os.ErrNotExist.
//
// Each item under "processes" is either a bare string (a process name) or a
// mapping with optional "name" (string) and "pid" (non-negative integer,
// 0 = unset) keys. At least one of name or pid must be set.
//
// "interval" is a Go duration string such as "15s". It defaults to 10s when
// omitted; "0s" disables periodic collection and negative values are
// rejected.
//
// On error the returned Config is the zero value.
func readConfig(filename string, verbose bool) (Config, error) {
	configData, err := readConfigFile(filename, verbose)
	if err != nil {
		return Config{}, err
	}

	// Decode into a loose shape first: a process entry may be a string or a
	// mapping, which a single typed struct cannot express.
	var raw struct {
		Processes []interface{} `yaml:"processes"`
		Interval  string        `yaml:"interval"`
	}
	if err := yaml.Unmarshal(configData, &raw); err != nil {
		return Config{}, fmt.Errorf("failed to parse YAML configuration: %w", err)
	}

	var config Config
	for i, entry := range raw.Processes {
		procConfig, err := parseProcessEntry(entry)
		if err != nil {
			return Config{}, fmt.Errorf("invalid process configuration at index %d: %w", i, err)
		}
		config.Processes = append(config.Processes, procConfig)
	}

	config.Interval = 10 * time.Second // default when "interval" is omitted
	if raw.Interval != "" {
		interval, err := time.ParseDuration(raw.Interval)
		if err != nil {
			return Config{}, fmt.Errorf("failed to parse interval duration: %w", err)
		}
		if interval < 0 {
			return Config{}, fmt.Errorf("config validation failed: interval must not be negative, got %s", interval)
		}
		config.Interval = interval
	}

	if verbose {
		log.Printf("Parsed YAML configuration")
	}

	if len(config.Processes) == 0 {
		return Config{}, fmt.Errorf("config validation failed: no processes specified in config")
	}
	for i, proc := range config.Processes {
		if proc.Name == "" && proc.Pid == 0 {
			return Config{}, fmt.Errorf("config validation failed: process at index %d must have either a name or a pid", i)
		}
	}

	if verbose {
		log.Printf("Loaded config with %d processes: %v", len(config.Processes), config.Processes)
	}
	return config, nil
}

// readConfigFile returns the contents of the first config file found for
// filename.
//
// A relative name is tried as given (relative to the working directory) and
// then under /etc. An absolute path is tried only as given, so it is never
// rewritten into an unrelated relative or /etc path.
//
// Errors other than "does not exist" stop the search. If nothing is found,
// the error wraps os.ErrNotExist and lists the paths tried.
func readConfigFile(filename string, verbose bool) ([]byte, error) {
	if filename == "" {
		return nil, fmt.Errorf("config file name is empty")
	}
	paths := []string{filename}
	if !filepath.IsAbs(filename) {
		paths = append(paths, filepath.Join("/etc", filename))
	}
	for _, path := range paths {
		data, err := os.ReadFile(path)
		if err == nil {
			if verbose {
				log.Printf("Using config file from: %s", path)
			}
			return data, nil
		}
		if !os.IsNotExist(err) {
			return nil, err
		}
	}
	return nil, fmt.Errorf("config file %q not found (searched %v): %w", filename, paths, os.ErrNotExist)
}

// parseProcessEntry converts one decoded item of the "processes" list into a
// ProcessConfig. Values of the wrong type are reported as errors instead of
// being silently ignored. A key that is present but null is treated as
// absent.
func parseProcessEntry(entry interface{}) (ProcessConfig, error) {
	switch p := entry.(type) {
	case string:
		return ProcessConfig{Name: p}, nil
	case map[interface{}]interface{}:
		var procConfig ProcessConfig
		if v, ok := p["name"]; ok && v != nil {
			name, isString := v.(string)
			if !isString {
				return ProcessConfig{}, fmt.Errorf("name must be a string, got %v (%T)", v, v)
			}
			procConfig.Name = name
		}
		if v, ok := p["pid"]; ok && v != nil {
			pid, err := pidFromYAML(v)
			if err != nil {
				return ProcessConfig{}, err
			}
			procConfig.Pid = pid
		}
		return procConfig, nil
	default:
		return ProcessConfig{}, fmt.Errorf("invalid process configuration format: %v", entry)
	}
}

// pidFromYAML accepts the numeric forms a decoded pid may take (int, or
// float64 for values written like "1234.0"). It rejects negative,
// fractional and non-numeric values.
func pidFromYAML(v interface{}) (int, error) {
	switch n := v.(type) {
	case int:
		if n >= 0 {
			return n, nil
		}
	case float64:
		// Range-check before converting: float-to-int conversion of an
		// out-of-range value is implementation-defined in Go.
		if n >= 0 && n <= 1<<31-1 && n == float64(int(n)) {
			return int(n), nil
		}
	}
	return 0, fmt.Errorf("pid must be a non-negative integer, got %v (%T)", v, v)
}
// setupHTTPServer builds (but does not start) the HTTP server that serves
// handler on listenAddr.
//
// Timeouts:
//   - ReadHeaderTimeout and ReadTimeout bound how long a client may take to
//     send its request, so slow-header (Slowloris-style) connections cannot
//     pin server resources.
//   - IdleTimeout reclaims idle keep-alive connections.
//   - WriteTimeout is deliberately left unset so a slow /metrics scrape is
//     never truncated mid-response.
//
// verbose is currently unused; it is kept for signature compatibility.
func setupHTTPServer(listenAddr string, handler http.Handler, verbose bool) *http.Server {
	return &http.Server{
		Addr:              listenAddr,
		Handler:           handler,
		ReadHeaderTimeout: 10 * time.Second,
		ReadTimeout:       30 * time.Second,
		IdleTimeout:       120 * time.Second,
	}
}
// createLoggingHandler returns the handler the HTTP server should use.
// Without verbose it is the process-wide default mux itself. With verbose,
// each request's method, path and remote address are logged before the
// request is dispatched to that mux.
func createLoggingHandler(verbose bool) http.Handler {
	if verbose {
		logAndServe := func(w http.ResponseWriter, r *http.Request) {
			log.Printf("HTTP Request: %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr)
			http.DefaultServeMux.ServeHTTP(w, r)
		}
		return http.HandlerFunc(logAndServe)
	}
	return http.DefaultServeMux
}
func main() {
configFile := flag.String("c", "config.yml", "Path to configuration file")
listenAddr := flag.String("l", ":9064", "Address to listen on")
verbose := flag.Bool("v", false, "Enable verbose logging")
flag.Parse()
config, err := readConfig(*configFile, *verbose)
if err != nil {
log.Fatalf("Error reading config: %v", err)
}
reg := prometheus.NewRegistry()
exporter := NewExporter(config, *verbose)
exporter.collectAndCacheMetrics()
reg.MustRegister(exporter)
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})
handler := createLoggingHandler(*verbose)
// Set up periodic metrics collection if interval is specified
if config.Interval > 0 {
if *verbose {
log.Printf("Starting periodic metrics collection every %s", config.Interval)
}
go func() {
ticker := time.NewTicker(config.Interval)
defer ticker.Stop()
for range ticker.C {
exporter.collectAndCacheMetrics()
}
}()
} else {
if *verbose {
log.Printf("Metrics collection on demand only (no periodic collection)")
}
}
server := setupHTTPServer(*listenAddr, handler, *verbose)
shutdownChan := make(chan os.Signal, 1)
signal.Notify(shutdownChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
log.Printf("Starting process exporter on %s", *listenAddr)
if *verbose {
log.Printf("Verbose logging enabled")
}
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Server error: %v", err)
}
}()
<-shutdownChan
log.Printf("Shutdown signal received, shutting down gracefully...")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Printf("Server shutdown error: %v", err)
}
log.Printf("Server stopped gracefully")
}