diff --git a/collector/nsqd.go b/collector/nsqd.go new file mode 100644 index 0000000..4c2facf --- /dev/null +++ b/collector/nsqd.go @@ -0,0 +1,62 @@ +package collector + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +// StatsCollector defines an interface for collecting specific stats +// from a nsqd node. +type StatsCollector interface { + collect(s *stats, out chan<- prometheus.Metric) +} + +// NsqdStats represents a Collector which collects all the configured stats +// from a nsqd node. Besides the configured stats it will also expose a +// metric for the total number of existing topics. +type NsqdStats struct { + nsqdURL string + collectors []StatsCollector + topicCount prometheus.Gauge +} + +// NewNsqdStats creates a new stats collector which uses the given namespace +// and reads the stats from the given URL of the nsqd. +func NewNsqdStats(namespace, nsqdURL string) *NsqdStats { + return &NsqdStats{ + nsqdURL: nsqdURL, + topicCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "topics_total", + Help: "The total number of topics", + }), + } +} + +// Use configures a specific stats collector, so the stats could be +// exposed to the Prometheus system. +func (s *NsqdStats) Use(c StatsCollector) { + s.collectors = append(s.collectors, c) +} + +// Collect collects all the registered stats metrics from the nsqd node. +func (s *NsqdStats) Collect(out chan<- prometheus.Metric) error { + stats, err := getNsqdStats(s.nsqdURL) + if err != nil { + return err + } + + var wg sync.WaitGroup + wg.Add(len(s.collectors)) + for _, coll := range s.collectors { + go func(coll StatsCollector) { + coll.collect(stats, out) + wg.Done() + }(coll) + } + + s.topicCount.Set(float64(len(stats.Topics))) + wg.Wait() + return nil +} diff --git a/collector/stats.go b/collector/stats.go index 7515831..575f32e 100644 --- a/collector/stats.go +++ b/collector/stats.go @@ -3,8 +3,6 @@ package collector import ( "encoding/json" "net/http" - - "github.com/prometheus/client_golang/prometheus" ) type statsResponse struct { @@ -20,86 +18,57 @@ type stats struct { Topics []*topic `json:"topics"` } -type statsCollector struct { - nsqUrl string - - topicCount prometheus.Gauge - channelCount *prometheus.GaugeVec - clientCount *prometheus.GaugeVec - - topics topicCollector - channels channelCollector - clients clientCollector +// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go +type topic struct { + Name string `json:"topic_name"` + Paused bool `json:"paused"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + MessageCount uint64 `json:"message_count"` + Channels []*channel `json:"channels"` } -// NewStatsCollector create a collector which collects all NSQ metrics -// from the /stats route of the NSQ host. -func NewStatsCollector(nsqUrl string) Collector { - const namespace = "nsq" - return &statsCollector{ - nsqUrl: nsqUrl, - - topicCount: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "topics_total", - Help: "The total number of topics", - }), - channelCount: prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "channels_total", - Help: "The total number of channels", - }, []string{"topic"}), - clientCount: prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "channels_total", - Help: "The total number of channels", - }, []string{"topic", "channel"}), - - topics: newTopicCollector(namespace), - channels: newChannelCollector(namespace), - clients: newClientCollector(namespace), - } +type channel struct { + Name string `json:"channel_name"` + Paused bool `json:"paused"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + MessageCount uint64 `json:"message_count"` + InFlightCount int `json:"in_flight_count"` + DeferredCount int `json:"deferred_count"` + RequeueCount uint64 `json:"requeue_count"` + TimeoutCount uint64 `json:"timeout_count"` + Clients []*client `json:"clients"` } -func (c *statsCollector) Collect(out chan<- prometheus.Metric) error { - s, err := c.fetchStats() - if err != nil { - return err - } - - c.topicCount.Set(float64(len(s.Topics))) - - for _, topic := range s.Topics { - c.channelCount.With(prometheus.Labels{ - "topic": topic.Name, - }).Set(float64(len(topic.Channels))) - - c.topics.update(topic, out) - for _, channel := range topic.Channels { - c.clientCount.With(prometheus.Labels{ - "topic": topic.Name, - "channel": channel.Name, - }).Set(float64(len(channel.Clients))) - - c.channels.update(topic.Name, channel, out) - for _, client := range channel.Clients { - c.clients.update(topic.Name, channel.Name, client, out) - } - } - } - return nil +type client struct { + ID string `json:"client_id"` + Hostname string `json:"hostname"` + Version string `json:"version"` + RemoteAddress string `json:"remote_address"` + State int32 `json:"state"` + FinishCount uint64 `json:"finish_count"` + MessageCount uint64 `json:"message_count"` + ReadyCount int64 `json:"ready_count"` + InFlightCount int64 `json:"in_flight_count"` + RequeueCount uint64 `json:"requeue_count"` + ConnectTime int64 `json:"connect_ts"` + SampleRate int32 `json:"sample_rate"` + Deflate bool `json:"deflate"` + Snappy bool `json:"snappy"` + TLS bool `json:"tls"` } -func (c *statsCollector) fetchStats() (*stats, error) { - resp, err := http.Get(c.nsqUrl) +func getNsqdStats(nsqdURL string) (*stats, error) { + resp, err := http.Get(nsqdURL) if err != nil { return nil, err } defer resp.Body.Close() - var s statsResponse - if err = json.NewDecoder(resp.Body).Decode(&s); err != nil { + var sr statsResponse + if err = json.NewDecoder(resp.Body).Decode(&sr); err != nil { return nil, err } - return &s.Data, nil + return &sr.Data, nil } diff --git a/collector/stats_channel.go b/collector/stats_channel.go index 133af21..c1483be 100644 --- a/collector/stats_channel.go +++ b/collector/stats_channel.go @@ -6,29 +6,26 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go -type channel struct { - Name string `json:"channel_name"` - Depth int64 `json:"depth"` - BackendDepth int64 `json:"backend_depth"` - InFlightCount int `json:"in_flight_count"` - DeferredCount int `json:"deferred_count"` - MessageCount uint64 `json:"message_count"` - RequeueCount uint64 `json:"requeue_count"` - TimeoutCount uint64 `json:"timeout_count"` - Clients []*client `json:"clients"` - Paused bool `json:"paused"` -} - -type channelCollector []struct { +type channelsCollector []struct { val func(*channel) float64 vec *prometheus.GaugeVec } -func newChannelCollector(namespace string) channelCollector { +// ChannelsCollector creates a new stats collector which is able to +// expose the channel metrics of a nsqd node to Prometheus. The +// channel metrics are reported per topic. +func ChannelsCollector(namespace string) StatsCollector { labels := []string{"type", "topic", "channel", "paused"} - return channelCollector{ + return channelsCollector{ + { + val: func(c *channel) float64 { return float64(len(c.Clients)) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "client_count", + Help: "Number of clients", + }, labels), + }, { val: func(c *channel) float64 { return float64(c.Depth) }, vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ @@ -88,16 +85,20 @@ func newChannelCollector(namespace string) channelCollector { } } -func (c channelCollector) update(topic string, ch *channel, out chan<- prometheus.Metric) { - labels := prometheus.Labels{ - "type": "channel", - "topic": topic, - "channel": ch.Name, - "paused": strconv.FormatBool(ch.Paused), - } +func (coll channelsCollector) collect(s *stats, out chan<- prometheus.Metric) { + for _, topic := range s.Topics { + for _, channel := range topic.Channels { + labels := prometheus.Labels{ + "type": "channel", + "topic": topic.Name, + "channel": channel.Name, + "paused": strconv.FormatBool(channel.Paused), + } - for _, g := range c { - g.vec.With(labels).Set(g.val(ch)) - g.vec.Collect(out) + for _, c := range coll { + c.vec.With(labels).Set(c.val(channel)) + c.vec.Collect(out) + } + } } } diff --git a/collector/stats_client.go b/collector/stats_client.go index 090fa7e..22f8e08 100644 --- a/collector/stats_client.go +++ b/collector/stats_client.go @@ -6,42 +6,22 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go -type client struct { - ID string `json:"client_id"` - Hostname string `json:"hostname"` - Version string `json:"version"` - RemoteAddress string `json:"remote_address"` - State int32 `json:"state"` - ReadyCount int64 `json:"ready_count"` - InFlightCount int64 `json:"in_flight_count"` - MessageCount uint64 `json:"message_count"` - FinishCount uint64 `json:"finish_count"` - RequeueCount uint64 `json:"requeue_count"` - ConnectTime int64 `json:"connect_ts"` - SampleRate int32 `json:"sample_rate"` - Deflate bool `json:"deflate"` - Snappy bool `json:"snappy"` - UserAgent string `json:"user_agent"` - Authed bool `json:"authed,omitempty"` - AuthIdentity string `json:"auth_identity,omitempty"` - AuthIdentityURL string `json:"auth_identity_url,omitempty"` - TLS bool `json:"tls"` - CipherSuite string `json:"tls_cipher_suite"` - TLSVersion string `json:"tls_version"` - TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` - TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` -} - -type clientCollector []struct { +type clientsCollector []struct { val func(*client) float64 vec *prometheus.GaugeVec } -func newClientCollector(namespace string) clientCollector { +// ClientsCollector creates a new stats collector which is able to +// expose the client metrics of a nsqd node to Prometheus. The +// client metrics are reported per topic and per channel. +// +// If there are too many clients, it could cause a timeout of the +// Prometheus collection process. So be sure the number of clients +// is small enough when using this collector. +func ClientsCollector(namespace string) StatsCollector { labels := []string{"type", "topic", "channel", "deflate", "snappy", "tls", "client_id", "hostname", "version", "remote_address"} - return clientCollector{ + return clientsCollector{ { // TODO: Give state a descriptive name instead of a number. val: func(c *client) float64 { return float64(c.State) }, @@ -110,22 +90,28 @@ func newClientCollector(namespace string) clientCollector { } } -func (c clientCollector) update(topic, channel string, cl *client, out chan<- prometheus.Metric) { - labels := prometheus.Labels{ - "type": "client", - "topic": topic, - "channel": channel, - "deflate": strconv.FormatBool(cl.Deflate), - "snappy": strconv.FormatBool(cl.Snappy), - "tls": strconv.FormatBool(cl.TLS), - "client_id": cl.ID, - "hostname": cl.Hostname, - "version": cl.Version, - "remote_address": cl.RemoteAddress, - } +func (coll clientsCollector) collect(s *stats, out chan<- prometheus.Metric) { + for _, topic := range s.Topics { + for _, channel := range topic.Channels { + for _, client := range channel.Clients { + labels := prometheus.Labels{ + "type": "client", + "topic": topic.Name, + "channel": channel.Name, + "deflate": strconv.FormatBool(client.Deflate), + "snappy": strconv.FormatBool(client.Snappy), + "tls": strconv.FormatBool(client.TLS), + "client_id": client.ID, + "hostname": client.Hostname, + "version": client.Version, + "remote_address": client.RemoteAddress, + } - for _, g := range c { - g.vec.With(labels).Set(g.val(cl)) - g.vec.Collect(out) + for _, c := range coll { + c.vec.With(labels).Set(c.val(client)) + c.vec.Collect(out) + } + } + } } } diff --git a/collector/stats_topic.go b/collector/stats_topic.go index 3c10e51..1e94ad4 100644 --- a/collector/stats_topic.go +++ b/collector/stats_topic.go @@ -6,25 +6,25 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go -type topic struct { - Name string `json:"topic_name"` - Channels []*channel `json:"channels"` - Depth int64 `json:"depth"` - BackendDepth int64 `json:"backend_depth"` - MessageCount uint64 `json:"message_count"` - Paused bool `json:"paused"` -} - -type topicCollector []struct { +type topicsCollector []struct { val func(*topic) float64 vec *prometheus.GaugeVec } -func newTopicCollector(namespace string) topicCollector { +// TopicsCollector creates a new stats collector which is able to +// expose the topic metrics of a nsqd node to Prometheus. +func TopicsCollector(namespace string) StatsCollector { labels := []string{"type", "topic", "paused"} - return topicCollector{ + return topicsCollector{ + { + val: func(t *topic) float64 { return float64(len(t.Channels)) }, + vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Name: "channel_count", + Help: "Number of channels", + }, labels), + }, { val: func(t *topic) float64 { return float64(t.Depth) }, vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{ @@ -52,15 +52,17 @@ func newTopicCollector(namespace string) topicCollector { } } -func (c topicCollector) update(t *topic, out chan<- prometheus.Metric) { - labels := prometheus.Labels{ - "type": "topic", - "topic": t.Name, - "paused": strconv.FormatBool(t.Paused), - } +func (coll topicsCollector) collect(s *stats, out chan<- prometheus.Metric) { + for _, topic := range s.Topics { + labels := prometheus.Labels{ + "type": "topic", + "topic": topic.Name, + "paused": strconv.FormatBool(topic.Paused), + } - for _, g := range c { - g.vec.With(labels).Set(g.val(t)) - g.vec.Collect(out) + for _, c := range coll { + c.vec.With(labels).Set(c.val(topic)) + c.vec.Collect(out) + } } } diff --git a/main.go b/main.go index 345d2cc..d713849 100644 --- a/main.go +++ b/main.go @@ -3,14 +3,14 @@ package main import ( "flag" "fmt" + "log" "net/http" "net/url" "strings" - "github.com/lovoo/nsq_exporter/collector" + "github.com/tsne/nsq_exporter/collector" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/log" ) // Version of nsq_exporter. Set at build time. @@ -19,12 +19,19 @@ const Version = "0.0.0.dev" var ( listenAddress = flag.String("web.listen", ":9117", "Address on which to expose metrics and web interface.") metricsPath = flag.String("web.path", "/metrics", "Path under which to expose metrics.") - nsqUrl = flag.String("nsq.addr", "http://localhost:4151/stats", "Address of the NSQ host.") - enabledCollectors = flag.String("collectors", "nsqstats", "Comma-separated list of collectors to use.") + nsqdURL = flag.String("nsqd.addr", "http://localhost:4151/stats", "Address of the nsqd node.") + enabledCollectors = flag.String("collect", "stats.topics,stats.channels", "Comma-separated list of collectors to use.") namespace = flag.String("namespace", "nsq", "Namespace for the NSQ metrics.") - collectorRegistry = map[string]func(name string, x *collector.NsqExecutor) error{ - "nsqstats": addStatsCollector, + collectorRegistry = map[string]func(names []string) (collector.Collector, error){ + "stats": createNsqdStats, + } + + // stats.* collectors + statsRegistry = map[string]func(namespace string) collector.StatsCollector{ + "topics": collector.TopicsCollector, + "channels": collector.ChannelsCollector, + "clients": collector.ClientsCollector, } ) @@ -37,11 +44,8 @@ func main() { } prometheus.MustRegister(ex) - handler := prometheus.Handler() - if *metricsPath == "" || *metricsPath == "/" { - http.Handle(*metricsPath, handler) - } else { - http.Handle(*metricsPath, handler) + http.Handle(*metricsPath, prometheus.Handler()) + if *metricsPath != "" && *metricsPath != "/" { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte(`