Files
nsq_exporter/collector/executor.go
2017-11-29 08:27:55 -08:00

113 lines
2.7 KiB
Go

package collector
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net/http"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// NsqExecutor collects all NSQ metrics from the registered collectors.
// This type implements the prometheus.Collector interface and can be
// registered in the metrics collection.
//
// The executor takes the time needed for scraping nsqd stat endpoint and
// provides an extra metric for this. This metric is labeled with the
// scrape result ("success" or "error").
type NsqExecutor struct {
nsqdURL string
collectors []StatsCollector
summary *prometheus.SummaryVec
client *http.Client
mutex sync.RWMutex
}
// NewNsqExecutor creates a new executor for collecting NSQ metrics.
func NewNsqExecutor(namespace, nsqdURL, tlsCACert, tlsCert, tlsKey string) (*NsqExecutor, error) {
sum := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: "exporter",
Name: "scrape_duration_seconds",
Help: "Duration of a scrape job of the NSQ exporter",
}, []string{"result"})
prometheus.MustRegister(sum)
transport := &http.Transport{}
if tlsCert != "" && tlsKey != "" {
cert, err := tls.LoadX509KeyPair(tlsCert, tlsKey)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
if tlsCACert != "" {
caCert, err := ioutil.ReadFile(tlsCACert)
if err != nil {
return nil, err
}
caCertPool.AppendCertsFromPEM(caCert)
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
tlsConfig.BuildNameToCertificate()
transport.TLSClientConfig = tlsConfig
}
return &NsqExecutor{
nsqdURL: nsqdURL,
summary: sum,
client: &http.Client{Transport: transport},
}, nil
}
// Use configures a specific stats collector, so the stats could be
// exposed to the Prometheus system.
func (e *NsqExecutor) Use(c StatsCollector) {
e.mutex.Lock()
defer e.mutex.Unlock()
e.collectors = append(e.collectors, c)
}
// Describe implements the prometheus.Collector interface.
func (e *NsqExecutor) Describe(ch chan<- *prometheus.Desc) {
for _, c := range e.collectors {
c.describe(ch)
}
}
// Collect implements the prometheus.Collector interface.
func (e *NsqExecutor) Collect(out chan<- prometheus.Metric) {
start := time.Now()
e.mutex.Lock()
defer e.mutex.Unlock()
// reset state, because metrics can gone
for _, c := range e.collectors {
c.reset()
}
stats, err := getNsqdStats(e.client, e.nsqdURL)
tScrape := time.Since(start).Seconds()
result := "success"
if err != nil {
result = "error"
}
e.summary.WithLabelValues(result).Observe(tScrape)
if err == nil {
for _, c := range e.collectors {
c.set(stats)
}
for _, c := range e.collectors {
c.collect(out)
}
}
}