Divide stats collector in sub-collectors.

In order to avoid timeouts in case of many clients,
it should be possible to configure which stats should
be collected.

Implements the following sub-collectors:
 * topics
 * channels
 * clients
This commit is contained in:
Thomas Nitsche
2015-12-21 18:13:03 +01:00
parent 37b0428070
commit 470c5a0ae2
6 changed files with 242 additions and 196 deletions

62
collector/nsqd.go Normal file
View File

@ -0,0 +1,62 @@
package collector
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
)
// StatsCollector defines an interface for collecting specific stats
// from a nsqd node.
type StatsCollector interface {
collect(s *stats, out chan<- prometheus.Metric)
}
// NsqdStats represents a Collector which collects all the configured stats
// from a nsqd node. Besides the configured stats it will also expose a
// metric for the total number of existing topics.
type NsqdStats struct {
nsqdURL string
collectors []StatsCollector
topicCount prometheus.Gauge
}
// NewNsqdStats creates a new stats collector which uses the given namespace
// and reads the stats from the given URL of the nsqd.
func NewNsqdStats(namespace, nsqdURL string) *NsqdStats {
return &NsqdStats{
nsqdURL: nsqdURL,
topicCount: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "topics_total",
Help: "The total number of topics",
}),
}
}
// Use configures a specific stats collector, so the stats could be
// exposed to the Prometheus system.
func (s *NsqdStats) Use(c StatsCollector) {
s.collectors = append(s.collectors, c)
}
// Collect collects all the registered stats metrics from the nsqd node.
func (s *NsqdStats) Collect(out chan<- prometheus.Metric) error {
stats, err := getNsqdStats(s.nsqdURL)
if err != nil {
return err
}
var wg sync.WaitGroup
wg.Add(len(s.collectors))
for _, coll := range s.collectors {
go func(coll StatsCollector) {
coll.collect(stats, out)
wg.Done()
}(coll)
}
s.topicCount.Set(float64(len(stats.Topics)))
wg.Wait()
return nil
}

View File

@ -3,8 +3,6 @@ package collector
import (
"encoding/json"
"net/http"
"github.com/prometheus/client_golang/prometheus"
)
type statsResponse struct {
@ -20,86 +18,57 @@ type stats struct {
Topics []*topic `json:"topics"`
}
type statsCollector struct {
nsqUrl string
topicCount prometheus.Gauge
channelCount *prometheus.GaugeVec
clientCount *prometheus.GaugeVec
topics topicCollector
channels channelCollector
clients clientCollector
// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go
type topic struct {
Name string `json:"topic_name"`
Paused bool `json:"paused"`
Depth int64 `json:"depth"`
BackendDepth int64 `json:"backend_depth"`
MessageCount uint64 `json:"message_count"`
Channels []*channel `json:"channels"`
}
// NewStatsCollector create a collector which collects all NSQ metrics
// from the /stats route of the NSQ host.
func NewStatsCollector(nsqUrl string) Collector {
const namespace = "nsq"
return &statsCollector{
nsqUrl: nsqUrl,
topicCount: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "topics_total",
Help: "The total number of topics",
}),
channelCount: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "channels_total",
Help: "The total number of channels",
}, []string{"topic"}),
clientCount: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "channels_total",
Help: "The total number of channels",
}, []string{"topic", "channel"}),
topics: newTopicCollector(namespace),
channels: newChannelCollector(namespace),
clients: newClientCollector(namespace),
}
type channel struct {
Name string `json:"channel_name"`
Paused bool `json:"paused"`
Depth int64 `json:"depth"`
BackendDepth int64 `json:"backend_depth"`
MessageCount uint64 `json:"message_count"`
InFlightCount int `json:"in_flight_count"`
DeferredCount int `json:"deferred_count"`
RequeueCount uint64 `json:"requeue_count"`
TimeoutCount uint64 `json:"timeout_count"`
Clients []*client `json:"clients"`
}
func (c *statsCollector) Collect(out chan<- prometheus.Metric) error {
s, err := c.fetchStats()
if err != nil {
return err
}
c.topicCount.Set(float64(len(s.Topics)))
for _, topic := range s.Topics {
c.channelCount.With(prometheus.Labels{
"topic": topic.Name,
}).Set(float64(len(topic.Channels)))
c.topics.update(topic, out)
for _, channel := range topic.Channels {
c.clientCount.With(prometheus.Labels{
"topic": topic.Name,
"channel": channel.Name,
}).Set(float64(len(channel.Clients)))
c.channels.update(topic.Name, channel, out)
for _, client := range channel.Clients {
c.clients.update(topic.Name, channel.Name, client, out)
}
}
}
return nil
type client struct {
ID string `json:"client_id"`
Hostname string `json:"hostname"`
Version string `json:"version"`
RemoteAddress string `json:"remote_address"`
State int32 `json:"state"`
FinishCount uint64 `json:"finish_count"`
MessageCount uint64 `json:"message_count"`
ReadyCount int64 `json:"ready_count"`
InFlightCount int64 `json:"in_flight_count"`
RequeueCount uint64 `json:"requeue_count"`
ConnectTime int64 `json:"connect_ts"`
SampleRate int32 `json:"sample_rate"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
TLS bool `json:"tls"`
}
func (c *statsCollector) fetchStats() (*stats, error) {
resp, err := http.Get(c.nsqUrl)
func getNsqdStats(nsqdURL string) (*stats, error) {
resp, err := http.Get(nsqdURL)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var s statsResponse
if err = json.NewDecoder(resp.Body).Decode(&s); err != nil {
var sr statsResponse
if err = json.NewDecoder(resp.Body).Decode(&sr); err != nil {
return nil, err
}
return &s.Data, nil
return &sr.Data, nil
}

View File

@ -6,29 +6,26 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go
type channel struct {
Name string `json:"channel_name"`
Depth int64 `json:"depth"`
BackendDepth int64 `json:"backend_depth"`
InFlightCount int `json:"in_flight_count"`
DeferredCount int `json:"deferred_count"`
MessageCount uint64 `json:"message_count"`
RequeueCount uint64 `json:"requeue_count"`
TimeoutCount uint64 `json:"timeout_count"`
Clients []*client `json:"clients"`
Paused bool `json:"paused"`
}
type channelCollector []struct {
type channelsCollector []struct {
val func(*channel) float64
vec *prometheus.GaugeVec
}
func newChannelCollector(namespace string) channelCollector {
// ChannelsCollector creates a new stats collector which is able to
// expose the channel metrics of a nsqd node to Prometheus. The
// channel metrics are reported per topic.
func ChannelsCollector(namespace string) StatsCollector {
labels := []string{"type", "topic", "channel", "paused"}
return channelCollector{
return channelsCollector{
{
val: func(c *channel) float64 { return float64(len(c.Clients)) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "client_count",
Help: "Number of clients",
}, labels),
},
{
val: func(c *channel) float64 { return float64(c.Depth) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
@ -88,16 +85,20 @@ func newChannelCollector(namespace string) channelCollector {
}
}
func (c channelCollector) update(topic string, ch *channel, out chan<- prometheus.Metric) {
labels := prometheus.Labels{
"type": "channel",
"topic": topic,
"channel": ch.Name,
"paused": strconv.FormatBool(ch.Paused),
}
func (coll channelsCollector) collect(s *stats, out chan<- prometheus.Metric) {
for _, topic := range s.Topics {
for _, channel := range topic.Channels {
labels := prometheus.Labels{
"type": "channel",
"topic": topic.Name,
"channel": channel.Name,
"paused": strconv.FormatBool(channel.Paused),
}
for _, g := range c {
g.vec.With(labels).Set(g.val(ch))
g.vec.Collect(out)
for _, c := range coll {
c.vec.With(labels).Set(c.val(channel))
c.vec.Collect(out)
}
}
}
}

View File

@ -6,42 +6,22 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go
type client struct {
ID string `json:"client_id"`
Hostname string `json:"hostname"`
Version string `json:"version"`
RemoteAddress string `json:"remote_address"`
State int32 `json:"state"`
ReadyCount int64 `json:"ready_count"`
InFlightCount int64 `json:"in_flight_count"`
MessageCount uint64 `json:"message_count"`
FinishCount uint64 `json:"finish_count"`
RequeueCount uint64 `json:"requeue_count"`
ConnectTime int64 `json:"connect_ts"`
SampleRate int32 `json:"sample_rate"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
UserAgent string `json:"user_agent"`
Authed bool `json:"authed,omitempty"`
AuthIdentity string `json:"auth_identity,omitempty"`
AuthIdentityURL string `json:"auth_identity_url,omitempty"`
TLS bool `json:"tls"`
CipherSuite string `json:"tls_cipher_suite"`
TLSVersion string `json:"tls_version"`
TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"`
TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"`
}
type clientCollector []struct {
type clientsCollector []struct {
val func(*client) float64
vec *prometheus.GaugeVec
}
func newClientCollector(namespace string) clientCollector {
// ClientsCollector creates a new stats collector which is able to
// expose the client metrics of a nsqd node to Prometheus. The
// client metrics are reported per topic and per channel.
//
// If there are too many clients, it could cause a timeout of the
// Prometheus collection process. So be sure the number of clients
// is small enough when using this collector.
func ClientsCollector(namespace string) StatsCollector {
labels := []string{"type", "topic", "channel", "deflate", "snappy", "tls", "client_id", "hostname", "version", "remote_address"}
return clientCollector{
return clientsCollector{
{
// TODO: Give state a descriptive name instead of a number.
val: func(c *client) float64 { return float64(c.State) },
@ -110,22 +90,28 @@ func newClientCollector(namespace string) clientCollector {
}
}
func (c clientCollector) update(topic, channel string, cl *client, out chan<- prometheus.Metric) {
labels := prometheus.Labels{
"type": "client",
"topic": topic,
"channel": channel,
"deflate": strconv.FormatBool(cl.Deflate),
"snappy": strconv.FormatBool(cl.Snappy),
"tls": strconv.FormatBool(cl.TLS),
"client_id": cl.ID,
"hostname": cl.Hostname,
"version": cl.Version,
"remote_address": cl.RemoteAddress,
}
func (coll clientsCollector) collect(s *stats, out chan<- prometheus.Metric) {
for _, topic := range s.Topics {
for _, channel := range topic.Channels {
for _, client := range channel.Clients {
labels := prometheus.Labels{
"type": "client",
"topic": topic.Name,
"channel": channel.Name,
"deflate": strconv.FormatBool(client.Deflate),
"snappy": strconv.FormatBool(client.Snappy),
"tls": strconv.FormatBool(client.TLS),
"client_id": client.ID,
"hostname": client.Hostname,
"version": client.Version,
"remote_address": client.RemoteAddress,
}
for _, g := range c {
g.vec.With(labels).Set(g.val(cl))
g.vec.Collect(out)
for _, c := range coll {
c.vec.With(labels).Set(c.val(client))
c.vec.Collect(out)
}
}
}
}
}

View File

@ -6,25 +6,25 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
// see https://github.com/nsqio/nsq/blob/master/nsqd/stats.go
type topic struct {
Name string `json:"topic_name"`
Channels []*channel `json:"channels"`
Depth int64 `json:"depth"`
BackendDepth int64 `json:"backend_depth"`
MessageCount uint64 `json:"message_count"`
Paused bool `json:"paused"`
}
type topicCollector []struct {
type topicsCollector []struct {
val func(*topic) float64
vec *prometheus.GaugeVec
}
func newTopicCollector(namespace string) topicCollector {
// TopicsCollector creates a new stats collector which is able to
// expose the topic metrics of a nsqd node to Prometheus.
func TopicsCollector(namespace string) StatsCollector {
labels := []string{"type", "topic", "paused"}
return topicCollector{
return topicsCollector{
{
val: func(t *topic) float64 { return float64(len(t.Channels)) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "channel_count",
Help: "Number of channels",
}, labels),
},
{
val: func(t *topic) float64 { return float64(t.Depth) },
vec: prometheus.NewGaugeVec(prometheus.GaugeOpts{
@ -52,15 +52,17 @@ func newTopicCollector(namespace string) topicCollector {
}
}
func (c topicCollector) update(t *topic, out chan<- prometheus.Metric) {
labels := prometheus.Labels{
"type": "topic",
"topic": t.Name,
"paused": strconv.FormatBool(t.Paused),
}
func (coll topicsCollector) collect(s *stats, out chan<- prometheus.Metric) {
for _, topic := range s.Topics {
labels := prometheus.Labels{
"type": "topic",
"topic": topic.Name,
"paused": strconv.FormatBool(topic.Paused),
}
for _, g := range c {
g.vec.With(labels).Set(g.val(t))
g.vec.Collect(out)
for _, c := range coll {
c.vec.With(labels).Set(c.val(topic))
c.vec.Collect(out)
}
}
}