commit 37b0428070d7f8133b19ac9bcd7130effe553886 Author: louis Date: Fri Dec 11 15:22:25 2015 +0100 initial commit diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..41a7bf9 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,12 @@ +FROM alpine:latest + +EXPOSE 9117 + +ENV GOPATH /go +ENV APPPATH $GOPATH/src/stash.lvint.de/it/nsq_exporter +COPY . $APPPATH +RUN apk add --update -t build-deps go git mercurial libc-dev gcc libgcc \ + && cd $APPPATH && go get -d && go build -o /nsq_exporter \ + && apk del --purge build-deps && rm -rf $GOPATH + +ENTRYPOINT ["/nsq_exporter"] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0f23042 --- /dev/null +++ b/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2015, LOVOO GmbH +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of LOVOO GmbH nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..7277720 --- /dev/null +++ b/README.md @@ -0,0 +1,26 @@ +# NSQ Exporter + +[![GoDoc](https://godoc.org/github.com/lovoo/nsq_exporter?status.svg)](https://godoc.org/github.com/lovoo/nsq_exporter) + +NSQ exporter for prometheus.io, written in go. + +## Usage + + docker run -d --name nsq_exporter -l nsqd:nsqd -p 9117:9117 lovoo/nsq_exporter:latest -nsq.addr=http://nsqd:4151 -collectors=nsqstats + +## Building + + go get -u github.com/lovoo/nsq_exporter + go install github.com/lovoo/nsq_exporter + +## TODO + +* collect all nsqd instances over nsqlookupd + +## Contributing + +1. Fork it! +2. Create your feature branch: `git checkout -b my-new-feature` +3. Commit your changes: `git commit -am 'Add some feature'` +4. Push to the branch: `git push origin my-new-feature` +5. Submit a pull request diff --git a/collector/collector.go b/collector/collector.go new file mode 100644 index 0000000..e4f879f --- /dev/null +++ b/collector/collector.go @@ -0,0 +1,8 @@ +package collector + +import "github.com/prometheus/client_golang/prometheus" + +// Collector defines the interface for collecting all metrics for Prometheus. +type Collector interface { + Collect(out chan<- prometheus.Metric) error +} diff --git a/collector/executor.go b/collector/executor.go new file mode 100644 index 0000000..f2c38bc --- /dev/null +++ b/collector/executor.go @@ -0,0 +1,73 @@ +package collector + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +// NsqExecutor collects all NSQ metrics from the registered collectors. +// This type implements the prometheus.Collector interface and can be +// registered in the metrics collection. +// +// The executor takes the time needed by each registered collector and +// provides an extra metric for this. This metric is labeled with the +// result ("success" or "error") and the collector. +type NsqExecutor struct { + collectors map[string]Collector + summary *prometheus.SummaryVec +} + +// NewNsqExecutor creates a new executor for the NSQ metrics. +func NewNsqExecutor(namespace string) *NsqExecutor { + return &NsqExecutor{ + collectors: make(map[string]Collector), + summary: prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: "exporter", + Name: "scape_duration_seconds", + Help: "Duration of a scrape job of the NSQ exporter", + }, []string{"collector", "result"}), + } +} + +// AddCollector adds a new collector for the metrics collection. +// Each collector needs a unique name which is used as a label +// for the executor metric. +func (e *NsqExecutor) AddCollector(name string, c Collector) { + e.collectors[name] = c +} + +// Describe implements the prometheus.Collector interface. +func (e *NsqExecutor) Describe(out chan<- *prometheus.Desc) { + e.summary.Describe(out) +} + +// Collect implements the prometheus.Collector interface. +func (e *NsqExecutor) Collect(out chan<- prometheus.Metric) { + var wg sync.WaitGroup + wg.Add(len(e.collectors)) + for name, coll := range e.collectors { + go func(name string, coll Collector) { + e.exec(name, coll, out) + wg.Done() + }(name, coll) + } + wg.Wait() +} + +func (e *NsqExecutor) exec(name string, coll Collector, out chan<- prometheus.Metric) { + start := time.Now() + err := coll.Collect(out) + dur := time.Since(start) + + labels := prometheus.Labels{"collector": name} + if err != nil { + labels["result"] = "error" + } else { + labels["result"] = "success" + } + + e.summary.With(labels).Observe(dur.Seconds()) +} diff --git a/collector/stats.go b/collector/stats.go new file mode 100644 index 0000000..7515831 --- /dev/null +++ b/collector/stats.go @@ -0,0 +1,105 @@ +package collector + +import ( + "encoding/json" + "net/http" + + "github.com/prometheus/client_golang/prometheus" +) + +type statsResponse struct { + StatusCode int `json:"status_code"` + StatusText string `json:"status_text"` + Data stats `json:"data"` +} + +type stats struct { + Version string `json:"version"` + Health string `json:"health"` + StartTime int64 `json:"start_time"` + Topics []*topic `json:"topics"` +} + +type statsCollector struct { + nsqUrl string + + topicCount prometheus.Gauge + channelCount *prometheus.GaugeVec + clientCount *prometheus.GaugeVec + + topics topicCollector + channels channelCollector + clients clientCollector +} + +// NewStatsCollector create a collector which collects all NSQ metrics +// from the /stats route of the NSQ host. +func NewStatsCollector(nsqUrl string) Collector { + const namespace = "nsq" + return &statsCollector{ + nsqUrl: nsqUrl, + + topicCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "topics_total", + Help: "The total number of topics", + }), + channelCount: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "channels_total", + Help: "The total number of channels", + }, []string{"topic"}), + clientCount: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "channels_total", + Help: "The total number of channels", + }, []string{"topic", "channel"}), + + topics: newTopicCollector(namespace), + channels: newChannelCollector(namespace), + clients: newClientCollector(namespace), + } +} + +func (c *statsCollector) Collect(out chan<- prometheus.Metric) error { + s, err := c.fetchStats() + if err != nil { + return err + } + + c.topicCount.Set(float64(len(s.Topics))) + + for _, topic := range s.Topics { + c.channelCount.With(prometheus.Labels{ + "topic": topic.Name, + }).Set(float64(len(topic.Channels))) + + c.topics.update(topic, out) + for _, channel := range topic.Channels { + c.clientCount.With(prometheus.Labels{ + "topic": topic.Name, + "channel": channel.Name, + }).Set(float64(len(channel.Clients))) + + c.channels.update(topic.Name, channel, out) + for _, client := range channel.Clients { + c.clients.update(topic.Name, channel.Name, client, out) + } + } + } + return nil +} + +func (c *statsCollector) fetchStats() (*stats, error) { + resp, err := http.Get(c.nsqUrl) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var s statsResponse + if err = json.NewDecoder(resp.Body).Decode(&s); err != nil { + return nil, err + } + return &s.Data, nil +} diff --git a/collector/stats_channel.go b/collector/stats_channel.go new file mode 100644 index 0000000..133af21 --- /dev/null +++ b/collector/stats_channel.go @@ -0,0 +1,103 @@ +package collector + +import ( + "strconv" + + "github.com/prometheus/client_golang/prometheus" +) + +// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go +type channel struct { + Name string `json:"channel_name"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + InFlightCount int `json:"in_flight_count"` + DeferredCount int `json:"deferred_count"` + MessageCount uint64 `json:"message_count"` + RequeueCount uint64 `json:"requeue_count"` + TimeoutCount uint64 `json:"timeout_count"` + Clients []*client `json:"clients"` + Paused bool `json:"paused"` +} + +type channelCollector []struct { + val func(*channel) float64 + vec *prometheus.GaugeVec +} + +func newChannelCollector(namespace string) channelCollector { + labels := []string{"type", "topic", "channel", "paused"} + + return channelCollector{ + { + val: func(c *channel) float64 { return float64(c.Depth) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "depth", + Help: "Queue depth", + }, labels), + }, + { + val: func(c *channel) float64 { return float64(c.BackendDepth) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "backend_depth", + Help: "Queue backend depth", + }, labels), + }, + { + val: func(c *channel) float64 { return float64(c.MessageCount) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "message_count", + Help: "Queue message count", + }, labels), + }, + { + val: func(c *channel) float64 { return float64(c.InFlightCount) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "in_flight_count", + Help: "In flight count", + }, labels), + }, + { + val: func(c *channel) float64 { return float64(c.DeferredCount) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "deferred_count", + Help: "Deferred count", + }, labels), + }, + { + val: func(c *channel) float64 { return float64(c.RequeueCount) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "requeue_count", + Help: "Requeue Count", + }, labels), + }, + { + val: func(c *channel) float64 { return float64(c.TimeoutCount) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "timeout_count", + Help: "Timeout count", + }, labels), + }, + } +} + +func (c channelCollector) update(topic string, ch *channel, out chan<- prometheus.Metric) { + labels := prometheus.Labels{ + "type": "channel", + "topic": topic, + "channel": ch.Name, + "paused": strconv.FormatBool(ch.Paused), + } + + for _, g := range c { + g.vec.With(labels).Set(g.val(ch)) + g.vec.Collect(out) + } +} diff --git a/collector/stats_client.go b/collector/stats_client.go new file mode 100644 index 0000000..090fa7e --- /dev/null +++ b/collector/stats_client.go @@ -0,0 +1,131 @@ +package collector + +import ( + "strconv" + + "github.com/prometheus/client_golang/prometheus" +) + +// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go +type client struct { + ID string `json:"client_id"` + Hostname string `json:"hostname"` + Version string `json:"version"` + RemoteAddress string `json:"remote_address"` + State int32 `json:"state"` + ReadyCount int64 `json:"ready_count"` + InFlightCount int64 `json:"in_flight_count"` + MessageCount uint64 `json:"message_count"` + FinishCount uint64 `json:"finish_count"` + RequeueCount uint64 `json:"requeue_count"` + ConnectTime int64 `json:"connect_ts"` + SampleRate int32 `json:"sample_rate"` + Deflate bool `json:"deflate"` + Snappy bool `json:"snappy"` + UserAgent string `json:"user_agent"` + Authed bool `json:"authed,omitempty"` + AuthIdentity string `json:"auth_identity,omitempty"` + AuthIdentityURL string `json:"auth_identity_url,omitempty"` + TLS bool `json:"tls"` + CipherSuite string `json:"tls_cipher_suite"` + TLSVersion string `json:"tls_version"` + TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` + TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` +} + +type clientCollector []struct { + val func(*client) float64 + vec *prometheus.GaugeVec +} + +func newClientCollector(namespace string) clientCollector { + labels := []string{"type", "topic", "channel", "deflate", "snappy", "tls", "client_id", "hostname", "version", "remote_address"} + + return clientCollector{ + { + // TODO: Give state a descriptive name instead of a number. + val: func(c *client) float64 { return float64(c.State) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "state", + Help: "State of client", + }, labels), + }, + { + val: func(c *client) float64 { return float64(c.FinishCount) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "finish_count", + Help: "Finish count", + }, labels), + }, + { + val: func(c *client) float64 { return float64(c.MessageCount) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "message_count", + Help: "Queue message count", + }, labels), + }, + { + val: func(c *client) float64 { return float64(c.ReadyCount) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "ready_count", + Help: "Ready count", + }, labels), + }, + { + val: func(c *client) float64 { return float64(c.InFlightCount) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "in_flight_count", + Help: "In flight count", + }, labels), + }, + { + val: func(c *client) float64 { return float64(c.RequeueCount) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "requeue_count", + Help: "Requeue count", + }, labels), + }, + { + val: func(c *client) float64 { return float64(c.ConnectTime) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "connect_ts", + Help: "Connect timestamp", + }, labels), + }, + { + val: func(c *client) float64 { return float64(c.SampleRate) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "sample_rate", + Help: "Sample Rate", + }, labels), + }, + } +} + +func (c clientCollector) update(topic, channel string, cl *client, out chan<- prometheus.Metric) { + labels := prometheus.Labels{ + "type": "client", + "topic": topic, + "channel": channel, + "deflate": strconv.FormatBool(cl.Deflate), + "snappy": strconv.FormatBool(cl.Snappy), + "tls": strconv.FormatBool(cl.TLS), + "client_id": cl.ID, + "hostname": cl.Hostname, + "version": cl.Version, + "remote_address": cl.RemoteAddress, + } + + for _, g := range c { + g.vec.With(labels).Set(g.val(cl)) + g.vec.Collect(out) + } +} diff --git a/collector/stats_topic.go b/collector/stats_topic.go new file mode 100644 index 0000000..3c10e51 --- /dev/null +++ b/collector/stats_topic.go @@ -0,0 +1,66 @@ +package collector + +import ( + "strconv" + + "github.com/prometheus/client_golang/prometheus" +) + +// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go +type topic struct { + Name string `json:"topic_name"` + Channels []*channel `json:"channels"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + MessageCount uint64 `json:"message_count"` + Paused bool `json:"paused"` +} + +type topicCollector []struct { + val func(*topic) float64 + vec *prometheus.GaugeVec +} + +func newTopicCollector(namespace string) topicCollector { + labels := []string{"type", "topic", "paused"} + + return topicCollector{ + { + val: func(t *topic) float64 { return float64(t.Depth) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "depth", + Help: "Queue depth", + }, labels), + }, + { + val: func(t *topic) float64 { return float64(t.BackendDepth) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "backend_depth", + Help: "Queue backend depth", + }, labels), + }, + { + val: func(t *topic) float64 { return float64(t.MessageCount) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "message_count", + Help: "Queue message count", + }, labels), + }, + } +} + +func (c topicCollector) update(t *topic, out chan<- prometheus.Metric) { + labels := prometheus.Labels{ + "type": "topic", + "topic": t.Name, + "paused": strconv.FormatBool(t.Paused), + } + + for _, g := range c { + g.vec.With(labels).Set(g.val(t)) + g.vec.Collect(out) + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..345d2cc --- /dev/null +++ b/main.go @@ -0,0 +1,100 @@ +package main + +import ( + "flag" + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/lovoo/nsq_exporter/collector" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/log" +) + +// Version of nsq_exporter. Set at build time. +const Version = "0.0.0.dev" + +var ( + listenAddress = flag.String("web.listen", ":9117", "Address on which to expose metrics and web interface.") + metricsPath = flag.String("web.path", "/metrics", "Path under which to expose metrics.") + nsqUrl = flag.String("nsq.addr", "http://localhost:4151/stats", "Address of the NSQ host.") + enabledCollectors = flag.String("collectors", "nsqstats", "Comma-separated list of collectors to use.") + namespace = flag.String("namespace", "nsq", "Namespace for the NSQ metrics.") + + collectorRegistry = map[string]func(name string, x *collector.NsqExecutor) error{ + "nsqstats": addStatsCollector, + } +) + +func main() { + flag.Parse() + + ex, err := createNsqExecutor() + if err != nil { + log.Fatalf("error creating nsq executor: %v", err) + } + prometheus.MustRegister(ex) + + handler := prometheus.Handler() + if *metricsPath == "" || *metricsPath == "/" { + http.Handle(*metricsPath, handler) + } else { + http.Handle(*metricsPath, handler) + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(` + NSQ Exporter + +

NSQ Exporter

+

Metrics

+ + `)) + }) + } + + err = http.ListenAndServe(*listenAddress, nil) + if err != nil { + log.Fatal(err) + } +} + +func createNsqExecutor() (*collector.NsqExecutor, error) { + ex := collector.NewNsqExecutor(*namespace) + for _, name := range strings.Split(*enabledCollectors, ",") { + name = strings.TrimSpace(name) + addCollector, has := collectorRegistry[name] + if !has { + return nil, fmt.Errorf("unknown collector: %s", name) + } + + if err := addCollector(name, ex); err != nil { + return nil, err + } + } + return ex, nil +} + +func addStatsCollector(name string, ex *collector.NsqExecutor) error { + u, err := url.Parse(normalizeURL(*nsqUrl)) + if err != nil { + return err + } + if u.Scheme == "" { + u.Scheme = "http" + } + if u.Path == "" { + u.Path = "/stats" + } + u.RawQuery = "format=json" + ex.AddCollector(name, collector.NewStatsCollector(u.String())) + return nil +} + +func normalizeURL(u string) string { + u = strings.ToLower(u) + if !strings.HasPrefix(u, "https://") && !strings.HasPrefix(u, "http://") { + return "http://" + u + } + return u +}