From f5cc5028741b83433a859ef2c0cec416080fc2e6 Mon Sep 17 00:00:00 2001 From: nordicdyno Date: Fri, 10 Feb 2017 11:33:54 +0300 Subject: [PATCH] Refactoring for better support of Prometheus' logic 1) Breaking changes! Updated collection code for proper Prometheus support - Metric names of all subcollectors now have prefixes: nsq_topic_, nsq_channel_, nsq_client_ As a result, the metrics registration code now passes the Prometheus client's validation; it also fixes scrape warnings - "type" label in exported metrics removed as obsolete 2) Add reset of gauges before scrape (like in haproxy-exporter) 3) Refactor: subcollector logic is simplified, support for multiple collectors removed We always use the 'stats' collector, so this removes unnecessary flexibility and complexity. --- collector/collector.go | 10 +++-- collector/executor.go | 89 +++++++++++++++++++++----------------- collector/nsqd.go | 62 -------------------------- collector/stats_channel.go | 25 +++++++++-- collector/stats_client.go | 25 +++++++++-- collector/stats_topic.go | 24 ++++++++-- main.go | 55 ++++++++--------------- 7 files changed, 136 insertions(+), 154 deletions(-) delete mode 100644 collector/nsqd.go diff --git a/collector/collector.go b/collector/collector.go index e4f879f..6ed419b 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -2,7 +2,11 @@ package collector import "github.com/prometheus/client_golang/prometheus" -// Collector defines the interface for collecting all metrics for Prometheus. -type Collector interface { - Collect(out chan<- prometheus.Metric) error +// StatsCollector defines an interface for collecting specific stats +// from a nsqd exported stats data. 
+type StatsCollector interface { + set(s *stats) + collect(out chan<- prometheus.Metric) + describe(ch chan<- *prometheus.Desc) + reset() } diff --git a/collector/executor.go b/collector/executor.go index f2c38bc..648efb0 100644 --- a/collector/executor.go +++ b/collector/executor.go @@ -11,63 +11,74 @@ import ( // This type implements the prometheus.Collector interface and can be // registered in the metrics collection. // -// The executor takes the time needed by each registered collector and +// The executor takes the time needed for scraping nsqd stat endpoint and // provides an extra metric for this. This metric is labeled with the -// result ("success" or "error") and the collector. +// scrape result ("success" or "error"). type NsqExecutor struct { - collectors map[string]Collector + nsqdURL string + + collectors []StatsCollector summary *prometheus.SummaryVec + mutex sync.RWMutex } -// NewNsqExecutor creates a new executor for the NSQ metrics. -func NewNsqExecutor(namespace string) *NsqExecutor { +// NewNsqExecutor creates a new executor for collecting NSQ metrics. +func NewNsqExecutor(namespace, nsqdURL string) *NsqExecutor { + sum := prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: "exporter", + Name: "scrape_duration_seconds", + Help: "Duration of a scrape job of the NSQ exporter", + }, []string{"result"}) + prometheus.MustRegister(sum) return &NsqExecutor{ - collectors: make(map[string]Collector), - summary: prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: namespace, - Subsystem: "exporter", - Name: "scape_duration_seconds", - Help: "Duration of a scrape job of the NSQ exporter", - }, []string{"collector", "result"}), + nsqdURL: nsqdURL, + summary: sum, } } -// AddCollector adds a new collector for the metrics collection. -// Each collector needs a unique name which is used as a label -// for the executor metric. 
-func (e *NsqExecutor) AddCollector(name string, c Collector) { - e.collectors[name] = c +// Use configures a specific stats collector, so the stats could be +// exposed to the Prometheus system. +func (e *NsqExecutor) Use(c StatsCollector) { + e.mutex.Lock() + defer e.mutex.Unlock() + e.collectors = append(e.collectors, c) } // Describe implements the prometheus.Collector interface. -func (e *NsqExecutor) Describe(out chan<- *prometheus.Desc) { - e.summary.Describe(out) +func (e *NsqExecutor) Describe(ch chan<- *prometheus.Desc) { + for _, c := range e.collectors { + c.describe(ch) + } } // Collect implements the prometheus.Collector interface. func (e *NsqExecutor) Collect(out chan<- prometheus.Metric) { - var wg sync.WaitGroup - wg.Add(len(e.collectors)) - for name, coll := range e.collectors { - go func(name string, coll Collector) { - e.exec(name, coll, out) - wg.Done() - }(name, coll) - } - wg.Wait() -} - -func (e *NsqExecutor) exec(name string, coll Collector, out chan<- prometheus.Metric) { start := time.Now() - err := coll.Collect(out) - dur := time.Since(start) + e.mutex.Lock() + defer e.mutex.Unlock() - labels := prometheus.Labels{"collector": name} - if err != nil { - labels["result"] = "error" - } else { - labels["result"] = "success" + // reset state, because metrics can gone + for _, c := range e.collectors { + c.reset() } - e.summary.With(labels).Observe(dur.Seconds()) + stats, err := getNsqdStats(e.nsqdURL) + tScrape := time.Since(start).Seconds() + + result := "success" + if err != nil { + result = "error" + } + + e.summary.WithLabelValues(result).Observe(tScrape) + + if err == nil { + for _, c := range e.collectors { + c.set(stats) + } + for _, c := range e.collectors { + c.collect(out) + } + } } diff --git a/collector/nsqd.go b/collector/nsqd.go deleted file mode 100644 index 4c2facf..0000000 --- a/collector/nsqd.go +++ /dev/null @@ -1,62 +0,0 @@ -package collector - -import ( - "sync" - - "github.com/prometheus/client_golang/prometheus" -) - 
-// StatsCollector defines an interface for collecting specific stats -// from a nsqd node. -type StatsCollector interface { - collect(s *stats, out chan<- prometheus.Metric) -} - -// NsqdStats represents a Collector which collects all the configured stats -// from a nsqd node. Besides the configured stats it will also expose a -// metric for the total number of existing topics. -type NsqdStats struct { - nsqdURL string - collectors []StatsCollector - topicCount prometheus.Gauge -} - -// NewNsqdStats creates a new stats collector which uses the given namespace -// and reads the stats from the given URL of the nsqd. -func NewNsqdStats(namespace, nsqdURL string) *NsqdStats { - return &NsqdStats{ - nsqdURL: nsqdURL, - topicCount: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "topics_total", - Help: "The total number of topics", - }), - } -} - -// Use configures a specific stats collector, so the stats could be -// exposed to the Prometheus system. -func (s *NsqdStats) Use(c StatsCollector) { - s.collectors = append(s.collectors, c) -} - -// Collect collects all the registered stats metrics from the nsqd node. -func (s *NsqdStats) Collect(out chan<- prometheus.Metric) error { - stats, err := getNsqdStats(s.nsqdURL) - if err != nil { - return err - } - - var wg sync.WaitGroup - wg.Add(len(s.collectors)) - for _, coll := range s.collectors { - go func(coll StatsCollector) { - coll.collect(stats, out) - wg.Done() - }(coll) - } - - s.topicCount.Set(float64(len(stats.Topics))) - wg.Wait() - return nil -} diff --git a/collector/stats_channel.go b/collector/stats_channel.go index 98abc68..d9ab3d1 100644 --- a/collector/stats_channel.go +++ b/collector/stats_channel.go @@ -15,7 +15,8 @@ type channelStats []struct { // expose the channel metrics of a nsqd node to Prometheus. The // channel metrics are reported per topic. 
func ChannelStats(namespace string) StatsCollector { - labels := []string{"type", "topic", "channel", "paused"} + labels := []string{"topic", "channel", "paused"} + namespace += "_channel" return channelStats{ { @@ -101,11 +102,10 @@ func ChannelStats(namespace string) StatsCollector { } } -func (cs channelStats) collect(s *stats, out chan<- prometheus.Metric) { +func (cs channelStats) set(s *stats) { for _, topic := range s.Topics { for _, channel := range topic.Channels { labels := prometheus.Labels{ - "type": "channel", "topic": topic.Name, "channel": channel.Name, "paused": strconv.FormatBool(channel.Paused), @@ -113,8 +113,25 @@ func (cs channelStats) collect(s *stats, out chan<- prometheus.Metric) { for _, c := range cs { c.vec.With(labels).Set(c.val(channel)) - c.vec.Collect(out) } } } } + +func (cs channelStats) collect(out chan<- prometheus.Metric) { + for _, c := range cs { + c.vec.Collect(out) + } +} + +func (cs channelStats) describe(ch chan<- *prometheus.Desc) { + for _, c := range cs { + c.vec.Describe(ch) + } +} + +func (cs channelStats) reset() { + for _, c := range cs { + c.vec.Reset() + } +} diff --git a/collector/stats_client.go b/collector/stats_client.go index f700464..9b32c84 100644 --- a/collector/stats_client.go +++ b/collector/stats_client.go @@ -19,7 +19,8 @@ type clientStats []struct { // Prometheus collection process. So be sure the number of clients // is small enough when using this collector. 
func ClientStats(namespace string) StatsCollector { - labels := []string{"type", "topic", "channel", "deflate", "snappy", "tls", "client_id", "hostname", "version", "remote_address"} + labels := []string{"topic", "channel", "deflate", "snappy", "tls", "client_id", "hostname", "version", "remote_address"} + namespace += "_client" return clientStats{ { @@ -90,12 +91,11 @@ func ClientStats(namespace string) StatsCollector { } } -func (cs clientStats) collect(s *stats, out chan<- prometheus.Metric) { +func (cs clientStats) set(s *stats) { for _, topic := range s.Topics { for _, channel := range topic.Channels { for _, client := range channel.Clients { labels := prometheus.Labels{ - "type": "client", "topic": topic.Name, "channel": channel.Name, "deflate": strconv.FormatBool(client.Deflate), @@ -109,9 +109,26 @@ func (cs clientStats) collect(s *stats, out chan<- prometheus.Metric) { for _, c := range cs { c.vec.With(labels).Set(c.val(client)) - c.vec.Collect(out) } } } } } + +func (cs clientStats) collect(out chan<- prometheus.Metric) { + for _, c := range cs { + c.vec.Collect(out) + } +} + +func (cs clientStats) describe(ch chan<- *prometheus.Desc) { + for _, c := range cs { + c.vec.Describe(ch) + } +} + +func (cs clientStats) reset() { + for _, c := range cs { + c.vec.Reset() + } +} diff --git a/collector/stats_topic.go b/collector/stats_topic.go index 96280d8..f7062d1 100644 --- a/collector/stats_topic.go +++ b/collector/stats_topic.go @@ -14,7 +14,8 @@ type topicStats []struct { // TopicStats creates a new stats collector which is able to // expose the topic metrics of a nsqd node to Prometheus. 
func TopicStats(namespace string) StatsCollector { - labels := []string{"type", "topic", "paused"} + labels := []string{"topic", "paused"} + namespace += "_topic" return topicStats{ { @@ -68,17 +69,32 @@ func TopicStats(namespace string) StatsCollector { } } -func (ts topicStats) collect(s *stats, out chan<- prometheus.Metric) { +func (ts topicStats) set(s *stats) { for _, topic := range s.Topics { labels := prometheus.Labels{ - "type": "topic", "topic": topic.Name, "paused": strconv.FormatBool(topic.Paused), } for _, c := range ts { c.vec.With(labels).Set(c.val(topic)) - c.vec.Collect(out) } } } +func (ts topicStats) collect(out chan<- prometheus.Metric) { + for _, c := range ts { + c.vec.Collect(out) + } +} + +func (ts topicStats) describe(ch chan<- *prometheus.Desc) { + for _, c := range ts { + c.vec.Describe(ch) + } +} + +func (ts topicStats) reset() { + for _, c := range ts { + c.vec.Reset() + } +} diff --git a/main.go b/main.go index 78a77d0..57ea9d5 100644 --- a/main.go +++ b/main.go @@ -23,11 +23,6 @@ var ( enabledCollectors = flag.String("collect", "stats.topics,stats.channels", "Comma-separated list of collectors to use.") namespace = flag.String("namespace", "nsq", "Namespace for the NSQ metrics.") - collectorRegistry = map[string]func(names []string) (collector.Collector, error){ - "stats": createNsqdStats, - } - - // stats.* collectors statsRegistry = map[string]func(namespace string) collector.StatsCollector{ "topics": collector.TopicStats, "channels": collector.ChannelStats, @@ -65,47 +60,31 @@ func main() { } func createNsqExecutor() (*collector.NsqExecutor, error) { - collectors := make(map[string][]string) - for _, name := range strings.Split(*enabledCollectors, ",") { - name = strings.TrimSpace(name) - parts := strings.SplitN(name, ".", 2) - if len(parts) != 2 { - return nil, fmt.Errorf("invalid collector name: %s", name) - } - collectors[parts[0]] = append(collectors[parts[0]], parts[1]) - } - ex := collector.NewNsqExecutor(*namespace) - for 
collector, subcollectors := range collectors { - newCollector, has := collectorRegistry[collector] - if !has { - return nil, fmt.Errorf("invalid collector: %s", collector) - } - - c, err := newCollector(subcollectors) - if err != nil { - return nil, err - } - ex.AddCollector(collector, c) - } - return ex, nil -} - -func createNsqdStats(statsCollectors []string) (collector.Collector, error) { nsqdURL, err := normalizeURL(*nsqdURL) if err != nil { return nil, err } - stats := collector.NewNsqdStats(*namespace, nsqdURL) - for _, c := range statsCollectors { - newStatsCollector, has := statsRegistry[c] - if !has { - return nil, fmt.Errorf("unknown stats collector: %s", c) + ex := collector.NewNsqExecutor(*namespace, nsqdURL) + for _, param := range strings.Split(*enabledCollectors, ",") { + param = strings.TrimSpace(param) + parts := strings.SplitN(param, ".", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid collector name: %s", param) } - stats.Use(newStatsCollector(*namespace)) + if parts[0] != "stats" { + return nil, fmt.Errorf("invalid collector prefix: %s", parts[0]) + } + + name := parts[1] + c, has := statsRegistry[name] + if !has { + return nil, fmt.Errorf("unknown stats collector: %s", name) + } + ex.Use(c(*namespace)) } - return stats, nil + return ex, nil } func normalizeURL(ustr string) (string, error) {