Add TLS support for nsqd stats connections.
This commit is contained in:
@ -1,6 +1,10 @@
|
|||||||
package collector
|
package collector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -19,11 +23,12 @@ type NsqExecutor struct {
|
|||||||
|
|
||||||
collectors []StatsCollector
|
collectors []StatsCollector
|
||||||
summary *prometheus.SummaryVec
|
summary *prometheus.SummaryVec
|
||||||
|
client *http.Client
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNsqExecutor creates a new executor for collecting NSQ metrics.
|
// NewNsqExecutor creates a new executor for collecting NSQ metrics.
|
||||||
func NewNsqExecutor(namespace, nsqdURL string) *NsqExecutor {
|
func NewNsqExecutor(namespace, nsqdURL, tlsCACert, tlsCert, tlsKey string) (*NsqExecutor, error) {
|
||||||
sum := prometheus.NewSummaryVec(prometheus.SummaryOpts{
|
sum := prometheus.NewSummaryVec(prometheus.SummaryOpts{
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Subsystem: "exporter",
|
Subsystem: "exporter",
|
||||||
@ -31,10 +36,33 @@ func NewNsqExecutor(namespace, nsqdURL string) *NsqExecutor {
|
|||||||
Help: "Duration of a scrape job of the NSQ exporter",
|
Help: "Duration of a scrape job of the NSQ exporter",
|
||||||
}, []string{"result"})
|
}, []string{"result"})
|
||||||
prometheus.MustRegister(sum)
|
prometheus.MustRegister(sum)
|
||||||
|
|
||||||
|
transport := &http.Transport{}
|
||||||
|
if tlsCert != "" && tlsKey != "" {
|
||||||
|
cert, err := tls.LoadX509KeyPair(tlsCert, tlsKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
caCertPool := x509.NewCertPool()
|
||||||
|
if tlsCACert != "" {
|
||||||
|
caCert, err := ioutil.ReadFile(tlsCACert)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
caCertPool.AppendCertsFromPEM(caCert)
|
||||||
|
}
|
||||||
|
tlsConfig := &tls.Config{
|
||||||
|
Certificates: []tls.Certificate{cert},
|
||||||
|
RootCAs: caCertPool,
|
||||||
|
}
|
||||||
|
tlsConfig.BuildNameToCertificate()
|
||||||
|
transport.TLSClientConfig = tlsConfig
|
||||||
|
}
|
||||||
return &NsqExecutor{
|
return &NsqExecutor{
|
||||||
nsqdURL: nsqdURL,
|
nsqdURL: nsqdURL,
|
||||||
summary: sum,
|
summary: sum,
|
||||||
}
|
client: &http.Client{Transport: transport},
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use configures a specific stats collector, so the stats could be
|
// Use configures a specific stats collector, so the stats could be
|
||||||
@ -63,7 +91,7 @@ func (e *NsqExecutor) Collect(out chan<- prometheus.Metric) {
|
|||||||
c.reset()
|
c.reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
stats, err := getNsqdStats(e.nsqdURL)
|
stats, err := getNsqdStats(e.client, e.nsqdURL)
|
||||||
tScrape := time.Since(start).Seconds()
|
tScrape := time.Since(start).Seconds()
|
||||||
|
|
||||||
result := "success"
|
result := "success"
|
||||||
|
@ -84,9 +84,8 @@ func getPercentile(t *topic, percentile int) float64 {
|
|||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func getNsqdStats(nsqdURL string) (*stats, error) {
|
func getNsqdStats(client *http.Client, nsqdURL string) (*stats, error) {
|
||||||
|
resp, err := client.Get(nsqdURL)
|
||||||
resp, err := http.Get(nsqdURL)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
9
main.go
9
main.go
@ -22,6 +22,9 @@ var (
|
|||||||
nsqdURL = flag.String("nsqd.addr", "http://localhost:4151/stats", "Address of the nsqd node.")
|
nsqdURL = flag.String("nsqd.addr", "http://localhost:4151/stats", "Address of the nsqd node.")
|
||||||
enabledCollectors = flag.String("collect", "stats.topics,stats.channels", "Comma-separated list of collectors to use.")
|
enabledCollectors = flag.String("collect", "stats.topics,stats.channels", "Comma-separated list of collectors to use.")
|
||||||
namespace = flag.String("namespace", "nsq", "Namespace for the NSQ metrics.")
|
namespace = flag.String("namespace", "nsq", "Namespace for the NSQ metrics.")
|
||||||
|
tlsCACert = flag.String("tls.ca_cert", "", "CA certificate file to be used for nsqd connections.")
|
||||||
|
tlsCert = flag.String("tls.cert", "", "TLS certificate file to be used for client connections to nsqd.")
|
||||||
|
tlsKey = flag.String("tls.key", "", "TLS key file to be used for TLS client connections to nsqd.")
|
||||||
|
|
||||||
statsRegistry = map[string]func(namespace string) collector.StatsCollector{
|
statsRegistry = map[string]func(namespace string) collector.StatsCollector{
|
||||||
"topics": collector.TopicStats,
|
"topics": collector.TopicStats,
|
||||||
@ -60,13 +63,15 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func createNsqExecutor() (*collector.NsqExecutor, error) {
|
func createNsqExecutor() (*collector.NsqExecutor, error) {
|
||||||
|
|
||||||
nsqdURL, err := normalizeURL(*nsqdURL)
|
nsqdURL, err := normalizeURL(*nsqdURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ex := collector.NewNsqExecutor(*namespace, nsqdURL)
|
ex, err := collector.NewNsqExecutor(*namespace, nsqdURL, *tlsCACert, *tlsCert, *tlsKey)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
for _, param := range strings.Split(*enabledCollectors, ",") {
|
for _, param := range strings.Split(*enabledCollectors, ",") {
|
||||||
param = strings.TrimSpace(param)
|
param = strings.TrimSpace(param)
|
||||||
parts := strings.SplitN(param, ".", 2)
|
parts := strings.SplitN(param, ".", 2)
|
||||||
|
Reference in New Issue
Block a user