113 lines
2.7 KiB
Go
113 lines
2.7 KiB
Go
package collector
|
|
|
|
import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"net/http"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)
|
|
|
|
// NsqExecutor collects all NSQ metrics from the registered collectors.
// This type implements the prometheus.Collector interface and can be
// registered in the metrics collection.
//
// The executor measures the time needed to scrape the nsqd stats endpoint
// and exposes it as an extra summary metric. That metric is labeled with
// the scrape result ("success" or "error").
type NsqExecutor struct {
	// nsqdURL is the address passed to the stats fetcher on every scrape.
	nsqdURL string

	// collectors holds the stats collectors added through Use.
	collectors []StatsCollector
	// summary records the scrape duration, partitioned by the "result" label.
	summary *prometheus.SummaryVec
	// client is the HTTP client used to query nsqd; it carries the TLS
	// configuration assembled in NewNsqExecutor.
	client *http.Client
	// mutex is held by Use and Collect while they touch collectors.
	mutex sync.RWMutex
}
|
|
|
|
// NewNsqExecutor creates a new executor for collecting NSQ metrics.
|
|
func NewNsqExecutor(namespace, nsqdURL, tlsCACert, tlsCert, tlsKey string) (*NsqExecutor, error) {
|
|
sum := prometheus.NewSummaryVec(prometheus.SummaryOpts{
|
|
Namespace: namespace,
|
|
Subsystem: "exporter",
|
|
Name: "scrape_duration_seconds",
|
|
Help: "Duration of a scrape job of the NSQ exporter",
|
|
}, []string{"result"})
|
|
prometheus.MustRegister(sum)
|
|
|
|
transport := &http.Transport{}
|
|
if tlsCert != "" && tlsKey != "" {
|
|
cert, err := tls.LoadX509KeyPair(tlsCert, tlsKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
caCertPool := x509.NewCertPool()
|
|
if tlsCACert != "" {
|
|
caCert, err := ioutil.ReadFile(tlsCACert)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
caCertPool.AppendCertsFromPEM(caCert)
|
|
}
|
|
tlsConfig := &tls.Config{
|
|
Certificates: []tls.Certificate{cert},
|
|
RootCAs: caCertPool,
|
|
}
|
|
tlsConfig.BuildNameToCertificate()
|
|
transport.TLSClientConfig = tlsConfig
|
|
}
|
|
return &NsqExecutor{
|
|
nsqdURL: nsqdURL,
|
|
summary: sum,
|
|
client: &http.Client{Transport: transport},
|
|
}, nil
|
|
}
|
|
|
|
// Use configures a specific stats collector, so the stats could be
|
|
// exposed to the Prometheus system.
|
|
func (e *NsqExecutor) Use(c StatsCollector) {
|
|
e.mutex.Lock()
|
|
defer e.mutex.Unlock()
|
|
e.collectors = append(e.collectors, c)
|
|
}
|
|
|
|
// Describe implements the prometheus.Collector interface.
|
|
func (e *NsqExecutor) Describe(ch chan<- *prometheus.Desc) {
|
|
for _, c := range e.collectors {
|
|
c.describe(ch)
|
|
}
|
|
}
|
|
|
|
// Collect implements the prometheus.Collector interface.
|
|
func (e *NsqExecutor) Collect(out chan<- prometheus.Metric) {
|
|
start := time.Now()
|
|
e.mutex.Lock()
|
|
defer e.mutex.Unlock()
|
|
|
|
// reset state, because metrics can gone
|
|
for _, c := range e.collectors {
|
|
c.reset()
|
|
}
|
|
|
|
stats, err := getNsqdStats(e.client, e.nsqdURL)
|
|
tScrape := time.Since(start).Seconds()
|
|
|
|
result := "success"
|
|
if err != nil {
|
|
result = "error"
|
|
}
|
|
|
|
e.summary.WithLabelValues(result).Observe(tScrape)
|
|
|
|
if err == nil {
|
|
for _, c := range e.collectors {
|
|
c.set(stats)
|
|
}
|
|
for _, c := range e.collectors {
|
|
c.collect(out)
|
|
}
|
|
}
|
|
}
|