initial commit

This commit is contained in:
louis
2015-12-11 15:22:25 +01:00
commit 37b0428070
10 changed files with 651 additions and 0 deletions

12
Dockerfile Normal file
View File

@ -0,0 +1,12 @@
FROM alpine:latest
EXPOSE 9117
ENV GOPATH /go
ENV APPPATH $GOPATH/src/stash.lvint.de/it/nsq_exporter
COPY . $APPPATH
RUN apk add --update -t build-deps go git mercurial libc-dev gcc libgcc \
&& cd $APPPATH && go get -d && go build -o /nsq_exporter \
&& apk del --purge build-deps && rm -rf $GOPATH
ENTRYPOINT ["/nsq_exporter"]

27
LICENSE Normal file
View File

@ -0,0 +1,27 @@
Copyright (c) 2015, LOVOO GmbH
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of LOVOO GmbH nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

26
README.md Normal file
View File

@ -0,0 +1,26 @@
# NSQ Exporter
[![GoDoc](https://godoc.org/github.com/lovoo/nsq_exporter?status.svg)](https://godoc.org/github.com/lovoo/nsq_exporter)
NSQ exporter for prometheus.io, written in go.
## Usage
docker run -d --name nsq_exporter -l nsqd:nsqd -p 9117:9117 lovoo/nsq_exporter:latest -nsq.addr=http://nsqd:4151 -collectors=nsqstats
## Building
go get -u github.com/lovoo/nsq_exporter
go install github.com/lovoo/nsq_exporter
## TODO
* collect all nsqd instances over nsqlookupd
## Contributing
1. Fork it!
2. Create your feature branch: `git checkout -b my-new-feature`
3. Commit your changes: `git commit -am 'Add some feature'`
4. Push to the branch: `git push origin my-new-feature`
5. Submit a pull request

8
collector/collector.go Normal file
View File

@ -0,0 +1,8 @@
package collector
import "github.com/prometheus/client_golang/prometheus"
// Collector defines the interface for collecting all metrics for Prometheus.
type Collector interface {
Collect(out chan<- prometheus.Metric) error
}

73
collector/executor.go Normal file
View File

@ -0,0 +1,73 @@
package collector
import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// NsqExecutor collects all NSQ metrics from the registered collectors.
// This type implements the prometheus.Collector interface and can be
// registered in the metrics collection.
//
// The executor takes the time needed by each registered collector and
// provides an extra metric for this. This metric is labeled with the
// result ("success" or "error") and the collector.
type NsqExecutor struct {
collectors map[string]Collector
summary *prometheus.SummaryVec
}
// NewNsqExecutor creates a new executor for the NSQ metrics.
func NewNsqExecutor(namespace string) *NsqExecutor {
return &NsqExecutor{
collectors: make(map[string]Collector),
summary: prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: "exporter",
Name: "scape_duration_seconds",
Help: "Duration of a scrape job of the NSQ exporter",
}, []string{"collector", "result"}),
}
}
// AddCollector adds a new collector for the metrics collection.
// Each collector needs a unique name which is used as a label
// for the executor metric.
func (e *NsqExecutor) AddCollector(name string, c Collector) {
e.collectors[name] = c
}
// Describe implements the prometheus.Collector interface.
func (e *NsqExecutor) Describe(out chan<- *prometheus.Desc) {
e.summary.Describe(out)
}
// Collect implements the prometheus.Collector interface.
func (e *NsqExecutor) Collect(out chan<- prometheus.Metric) {
var wg sync.WaitGroup
wg.Add(len(e.collectors))
for name, coll := range e.collectors {
go func(name string, coll Collector) {
e.exec(name, coll, out)
wg.Done()
}(name, coll)
}
wg.Wait()
}
func (e *NsqExecutor) exec(name string, coll Collector, out chan<- prometheus.Metric) {
start := time.Now()
err := coll.Collect(out)
dur := time.Since(start)
labels := prometheus.Labels{"collector": name}
if err != nil {
labels["result"] = "error"
} else {
labels["result"] = "success"
}
e.summary.With(labels).Observe(dur.Seconds())
}

105
collector/stats.go Normal file
View File

@ -0,0 +1,105 @@
package collector
import (
"encoding/json"
"net/http"
"github.com/prometheus/client_golang/prometheus"
)
type statsResponse struct {
StatusCode int `json:"status_code"`
StatusText string `json:"status_text"`
Data stats `json:"data"`
}
type stats struct {
Version string `json:"version"`
Health string `json:"health"`
StartTime int64 `json:"start_time"`
Topics []*topic `json:"topics"`
}
type statsCollector struct {
nsqUrl string
topicCount prometheus.Gauge
channelCount *prometheus.GaugeVec
clientCount *prometheus.GaugeVec
topics topicCollector
channels channelCollector
clients clientCollector
}
// NewStatsCollector create a collector which collects all NSQ metrics
// from the /stats route of the NSQ host.
func NewStatsCollector(nsqUrl string) Collector {
const namespace = "nsq"
return &statsCollector{
nsqUrl: nsqUrl,
topicCount: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "topics_total",
Help: "The total number of topics",
}),
channelCount: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "channels_total",
Help: "The total number of channels",
}, []string{"topic"}),
clientCount: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "channels_total",
Help: "The total number of channels",
}, []string{"topic", "channel"}),
topics: newTopicCollector(namespace),
channels: newChannelCollector(namespace),
clients: newClientCollector(namespace),
}
}
func (c *statsCollector) Collect(out chan<- prometheus.Metric) error {
s, err := c.fetchStats()
if err != nil {
return err
}
c.topicCount.Set(float64(len(s.Topics)))
for _, topic := range s.Topics {
c.channelCount.With(prometheus.Labels{
"topic": topic.Name,
}).Set(float64(len(topic.Channels)))
c.topics.update(topic, out)
for _, channel := range topic.Channels {
c.clientCount.With(prometheus.Labels{
"topic": topic.Name,
"channel": channel.Name,
}).Set(float64(len(channel.Clients)))
c.channels.update(topic.Name, channel, out)
for _, client := range channel.Clients {
c.clients.update(topic.Name, channel.Name, client, out)
}
}
}
return nil
}
func (c *statsCollector) fetchStats() (*stats, error) {
resp, err := http.Get(c.nsqUrl)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var s statsResponse
if err = json.NewDecoder(resp.Body).Decode(&s); err != nil {
return nil, err
}
return &s.Data, nil
}

103
collector/stats_channel.go Normal file
View File

@ -0,0 +1,103 @@
package collector
import (
"strconv"
"github.com/prometheus/client_golang/prometheus"
)
// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go
type channel struct {
Name string `json:"channel_name"`
Depth int64 `json:"depth"`
BackendDepth int64 `json:"backend_depth"`
InFlightCount int `json:"in_flight_count"`
DeferredCount int `json:"deferred_count"`
MessageCount uint64 `json:"message_count"`
RequeueCount uint64 `json:"requeue_count"`
TimeoutCount uint64 `json:"timeout_count"`
Clients []*client `json:"clients"`
Paused bool `json:"paused"`
}
type channelCollector []struct {
val func(*channel) float64
vec *prometheus.GaugeVec
}
func newChannelCollector(namespace string) channelCollector {
labels := []string{"type", "topic", "channel", "paused"}
return channelCollector{
{
val: func(c *channel) float64 { return float64(c.Depth) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "depth",
Help: "Queue depth",
}, labels),
},
{
val: func(c *channel) float64 { return float64(c.BackendDepth) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "backend_depth",
Help: "Queue backend depth",
}, labels),
},
{
val: func(c *channel) float64 { return float64(c.MessageCount) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "message_count",
Help: "Queue message count",
}, labels),
},
{
val: func(c *channel) float64 { return float64(c.InFlightCount) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "in_flight_count",
Help: "In flight count",
}, labels),
},
{
val: func(c *channel) float64 { return float64(c.DeferredCount) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "deferred_count",
Help: "Deferred count",
}, labels),
},
{
val: func(c *channel) float64 { return float64(c.RequeueCount) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "requeue_count",
Help: "Requeue Count",
}, labels),
},
{
val: func(c *channel) float64 { return float64(c.TimeoutCount) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "timeout_count",
Help: "Timeout count",
}, labels),
},
}
}
func (c channelCollector) update(topic string, ch *channel, out chan<- prometheus.Metric) {
labels := prometheus.Labels{
"type": "channel",
"topic": topic,
"channel": ch.Name,
"paused": strconv.FormatBool(ch.Paused),
}
for _, g := range c {
g.vec.With(labels).Set(g.val(ch))
g.vec.Collect(out)
}
}

131
collector/stats_client.go Normal file
View File

@ -0,0 +1,131 @@
package collector
import (
"strconv"
"github.com/prometheus/client_golang/prometheus"
)
// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go
type client struct {
ID string `json:"client_id"`
Hostname string `json:"hostname"`
Version string `json:"version"`
RemoteAddress string `json:"remote_address"`
State int32 `json:"state"`
ReadyCount int64 `json:"ready_count"`
InFlightCount int64 `json:"in_flight_count"`
MessageCount uint64 `json:"message_count"`
FinishCount uint64 `json:"finish_count"`
RequeueCount uint64 `json:"requeue_count"`
ConnectTime int64 `json:"connect_ts"`
SampleRate int32 `json:"sample_rate"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
UserAgent string `json:"user_agent"`
Authed bool `json:"authed,omitempty"`
AuthIdentity string `json:"auth_identity,omitempty"`
AuthIdentityURL string `json:"auth_identity_url,omitempty"`
TLS bool `json:"tls"`
CipherSuite string `json:"tls_cipher_suite"`
TLSVersion string `json:"tls_version"`
TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"`
TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"`
}
type clientCollector []struct {
val func(*client) float64
vec *prometheus.GaugeVec
}
func newClientCollector(namespace string) clientCollector {
labels := []string{"type", "topic", "channel", "deflate", "snappy", "tls", "client_id", "hostname", "version", "remote_address"}
return clientCollector{
{
// TODO: Give state a descriptive name instead of a number.
val: func(c *client) float64 { return float64(c.State) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "state",
Help: "State of client",
}, labels),
},
{
val: func(c *client) float64 { return float64(c.FinishCount) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "finish_count",
Help: "Finish count",
}, labels),
},
{
val: func(c *client) float64 { return float64(c.MessageCount) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "message_count",
Help: "Queue message count",
}, labels),
},
{
val: func(c *client) float64 { return float64(c.ReadyCount) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "ready_count",
Help: "Ready count",
}, labels),
},
{
val: func(c *client) float64 { return float64(c.InFlightCount) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "in_flight_count",
Help: "In flight count",
}, labels),
},
{
val: func(c *client) float64 { return float64(c.RequeueCount) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "requeue_count",
Help: "Requeue count",
}, labels),
},
{
val: func(c *client) float64 { return float64(c.ConnectTime) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "connect_ts",
Help: "Connect timestamp",
}, labels),
},
{
val: func(c *client) float64 { return float64(c.SampleRate) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "sample_rate",
Help: "Sample Rate",
}, labels),
},
}
}
func (c clientCollector) update(topic, channel string, cl *client, out chan<- prometheus.Metric) {
labels := prometheus.Labels{
"type": "client",
"topic": topic,
"channel": channel,
"deflate": strconv.FormatBool(cl.Deflate),
"snappy": strconv.FormatBool(cl.Snappy),
"tls": strconv.FormatBool(cl.TLS),
"client_id": cl.ID,
"hostname": cl.Hostname,
"version": cl.Version,
"remote_address": cl.RemoteAddress,
}
for _, g := range c {
g.vec.With(labels).Set(g.val(cl))
g.vec.Collect(out)
}
}

66
collector/stats_topic.go Normal file
View File

@ -0,0 +1,66 @@
package collector
import (
"strconv"
"github.com/prometheus/client_golang/prometheus"
)
// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go
type topic struct {
Name string `json:"topic_name"`
Channels []*channel `json:"channels"`
Depth int64 `json:"depth"`
BackendDepth int64 `json:"backend_depth"`
MessageCount uint64 `json:"message_count"`
Paused bool `json:"paused"`
}
type topicCollector []struct {
val func(*topic) float64
vec *prometheus.GaugeVec
}
func newTopicCollector(namespace string) topicCollector {
labels := []string{"type", "topic", "paused"}
return topicCollector{
{
val: func(t *topic) float64 { return float64(t.Depth) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "depth",
Help: "Queue depth",
}, labels),
},
{
val: func(t *topic) float64 { return float64(t.BackendDepth) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "backend_depth",
Help: "Queue backend depth",
}, labels),
},
{
val: func(t *topic) float64 { return float64(t.MessageCount) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "message_count",
Help: "Queue message count",
}, labels),
},
}
}
func (c topicCollector) update(t *topic, out chan<- prometheus.Metric) {
labels := prometheus.Labels{
"type": "topic",
"topic": t.Name,
"paused": strconv.FormatBool(t.Paused),
}
for _, g := range c {
g.vec.With(labels).Set(g.val(t))
g.vec.Collect(out)
}
}

100
main.go Normal file
View File

@ -0,0 +1,100 @@
package main
import (
"flag"
"fmt"
"net/http"
"net/url"
"strings"
"github.com/lovoo/nsq_exporter/collector"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
)
// Version of nsq_exporter. Set at build time.
const Version = "0.0.0.dev"
var (
listenAddress = flag.String("web.listen", ":9117", "Address on which to expose metrics and web interface.")
metricsPath = flag.String("web.path", "/metrics", "Path under which to expose metrics.")
nsqUrl = flag.String("nsq.addr", "http://localhost:4151/stats", "Address of the NSQ host.")
enabledCollectors = flag.String("collectors", "nsqstats", "Comma-separated list of collectors to use.")
namespace = flag.String("namespace", "nsq", "Namespace for the NSQ metrics.")
collectorRegistry = map[string]func(name string, x *collector.NsqExecutor) error{
"nsqstats": addStatsCollector,
}
)
func main() {
flag.Parse()
ex, err := createNsqExecutor()
if err != nil {
log.Fatalf("error creating nsq executor: %v", err)
}
prometheus.MustRegister(ex)
handler := prometheus.Handler()
if *metricsPath == "" || *metricsPath == "/" {
http.Handle(*metricsPath, handler)
} else {
http.Handle(*metricsPath, handler)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`<html>
<head><title>NSQ Exporter</title></head>
<body>
<h1>NSQ Exporter</h1>
<p><a href="` + *metricsPath + `">Metrics</a></p>
</body>
</html>`))
})
}
err = http.ListenAndServe(*listenAddress, nil)
if err != nil {
log.Fatal(err)
}
}
func createNsqExecutor() (*collector.NsqExecutor, error) {
ex := collector.NewNsqExecutor(*namespace)
for _, name := range strings.Split(*enabledCollectors, ",") {
name = strings.TrimSpace(name)
addCollector, has := collectorRegistry[name]
if !has {
return nil, fmt.Errorf("unknown collector: %s", name)
}
if err := addCollector(name, ex); err != nil {
return nil, err
}
}
return ex, nil
}
func addStatsCollector(name string, ex *collector.NsqExecutor) error {
u, err := url.Parse(normalizeURL(*nsqUrl))
if err != nil {
return err
}
if u.Scheme == "" {
u.Scheme = "http"
}
if u.Path == "" {
u.Path = "/stats"
}
u.RawQuery = "format=json"
ex.AddCollector(name, collector.NewStatsCollector(u.String()))
return nil
}
func normalizeURL(u string) string {
u = strings.ToLower(u)
if !strings.HasPrefix(u, "https://") && !strings.HasPrefix(u, "http://") {
return "http://" + u
}
return u
}