Refactoring for better Prometheus' logic support
1) Breaking changes! Updated collection code for proper Prometheus support - Now metrics names of all subcollectors have prefixes: nsq_topic_, nsq_channel_, nsq_client_ As a result metrics registration code now passing Prometheus' client validation, also it fixes scrape warnings - "type" label in exported metrics removed as obsolete 2) Add reset of gauges before scrape (like in haproxy-exporter) 3) Refactor: subcollectors logic is simplified, multiple collectors support removed We always use 'stats' collector so it removes unnecessary flexibility and complexity.
This commit is contained in:
@ -2,7 +2,11 @@ package collector
|
|||||||
|
|
||||||
import "github.com/prometheus/client_golang/prometheus"
|
import "github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
// Collector defines the interface for collecting all metrics for Prometheus.
|
// StatsCollector defines an interface for collecting specific stats
|
||||||
type Collector interface {
|
// from a nsqd exported stats data.
|
||||||
Collect(out chan<- prometheus.Metric) error
|
type StatsCollector interface {
|
||||||
|
set(s *stats)
|
||||||
|
collect(out chan<- prometheus.Metric)
|
||||||
|
describe(ch chan<- *prometheus.Desc)
|
||||||
|
reset()
|
||||||
}
|
}
|
||||||
|
@ -11,63 +11,74 @@ import (
|
|||||||
// This type implements the prometheus.Collector interface and can be
|
// This type implements the prometheus.Collector interface and can be
|
||||||
// registered in the metrics collection.
|
// registered in the metrics collection.
|
||||||
//
|
//
|
||||||
// The executor takes the time needed by each registered collector and
|
// The executor takes the time needed for scraping nsqd stat endpoint and
|
||||||
// provides an extra metric for this. This metric is labeled with the
|
// provides an extra metric for this. This metric is labeled with the
|
||||||
// result ("success" or "error") and the collector.
|
// scrape result ("success" or "error").
|
||||||
type NsqExecutor struct {
|
type NsqExecutor struct {
|
||||||
collectors map[string]Collector
|
nsqdURL string
|
||||||
|
|
||||||
|
collectors []StatsCollector
|
||||||
summary *prometheus.SummaryVec
|
summary *prometheus.SummaryVec
|
||||||
|
mutex sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNsqExecutor creates a new executor for the NSQ metrics.
|
// NewNsqExecutor creates a new executor for collecting NSQ metrics.
|
||||||
func NewNsqExecutor(namespace string) *NsqExecutor {
|
func NewNsqExecutor(namespace, nsqdURL string) *NsqExecutor {
|
||||||
|
sum := prometheus.NewSummaryVec(prometheus.SummaryOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: "exporter",
|
||||||
|
Name: "scrape_duration_seconds",
|
||||||
|
Help: "Duration of a scrape job of the NSQ exporter",
|
||||||
|
}, []string{"result"})
|
||||||
|
prometheus.MustRegister(sum)
|
||||||
return &NsqExecutor{
|
return &NsqExecutor{
|
||||||
collectors: make(map[string]Collector),
|
nsqdURL: nsqdURL,
|
||||||
summary: prometheus.NewSummaryVec(prometheus.SummaryOpts{
|
summary: sum,
|
||||||
Namespace: namespace,
|
|
||||||
Subsystem: "exporter",
|
|
||||||
Name: "scape_duration_seconds",
|
|
||||||
Help: "Duration of a scrape job of the NSQ exporter",
|
|
||||||
}, []string{"collector", "result"}),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddCollector adds a new collector for the metrics collection.
|
// Use configures a specific stats collector, so the stats could be
|
||||||
// Each collector needs a unique name which is used as a label
|
// exposed to the Prometheus system.
|
||||||
// for the executor metric.
|
func (e *NsqExecutor) Use(c StatsCollector) {
|
||||||
func (e *NsqExecutor) AddCollector(name string, c Collector) {
|
e.mutex.Lock()
|
||||||
e.collectors[name] = c
|
defer e.mutex.Unlock()
|
||||||
|
e.collectors = append(e.collectors, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Describe implements the prometheus.Collector interface.
|
// Describe implements the prometheus.Collector interface.
|
||||||
func (e *NsqExecutor) Describe(out chan<- *prometheus.Desc) {
|
func (e *NsqExecutor) Describe(ch chan<- *prometheus.Desc) {
|
||||||
e.summary.Describe(out)
|
for _, c := range e.collectors {
|
||||||
|
c.describe(ch)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collect implements the prometheus.Collector interface.
|
// Collect implements the prometheus.Collector interface.
|
||||||
func (e *NsqExecutor) Collect(out chan<- prometheus.Metric) {
|
func (e *NsqExecutor) Collect(out chan<- prometheus.Metric) {
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(len(e.collectors))
|
|
||||||
for name, coll := range e.collectors {
|
|
||||||
go func(name string, coll Collector) {
|
|
||||||
e.exec(name, coll, out)
|
|
||||||
wg.Done()
|
|
||||||
}(name, coll)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *NsqExecutor) exec(name string, coll Collector, out chan<- prometheus.Metric) {
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err := coll.Collect(out)
|
e.mutex.Lock()
|
||||||
dur := time.Since(start)
|
defer e.mutex.Unlock()
|
||||||
|
|
||||||
labels := prometheus.Labels{"collector": name}
|
// reset state, because metrics can gone
|
||||||
if err != nil {
|
for _, c := range e.collectors {
|
||||||
labels["result"] = "error"
|
c.reset()
|
||||||
} else {
|
|
||||||
labels["result"] = "success"
|
|
||||||
}
|
}
|
||||||
|
|
||||||
e.summary.With(labels).Observe(dur.Seconds())
|
stats, err := getNsqdStats(e.nsqdURL)
|
||||||
|
tScrape := time.Since(start).Seconds()
|
||||||
|
|
||||||
|
result := "success"
|
||||||
|
if err != nil {
|
||||||
|
result = "error"
|
||||||
|
}
|
||||||
|
|
||||||
|
e.summary.WithLabelValues(result).Observe(tScrape)
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
for _, c := range e.collectors {
|
||||||
|
c.set(stats)
|
||||||
|
}
|
||||||
|
for _, c := range e.collectors {
|
||||||
|
c.collect(out)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,62 +0,0 @@
|
|||||||
package collector
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
)
|
|
||||||
|
|
||||||
// StatsCollector defines an interface for collecting specific stats
|
|
||||||
// from a nsqd node.
|
|
||||||
type StatsCollector interface {
|
|
||||||
collect(s *stats, out chan<- prometheus.Metric)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NsqdStats represents a Collector which collects all the configured stats
|
|
||||||
// from a nsqd node. Besides the configured stats it will also expose a
|
|
||||||
// metric for the total number of existing topics.
|
|
||||||
type NsqdStats struct {
|
|
||||||
nsqdURL string
|
|
||||||
collectors []StatsCollector
|
|
||||||
topicCount prometheus.Gauge
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewNsqdStats creates a new stats collector which uses the given namespace
|
|
||||||
// and reads the stats from the given URL of the nsqd.
|
|
||||||
func NewNsqdStats(namespace, nsqdURL string) *NsqdStats {
|
|
||||||
return &NsqdStats{
|
|
||||||
nsqdURL: nsqdURL,
|
|
||||||
topicCount: prometheus.NewGauge(prometheus.GaugeOpts{
|
|
||||||
Namespace: namespace,
|
|
||||||
Name: "topics_total",
|
|
||||||
Help: "The total number of topics",
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use configures a specific stats collector, so the stats could be
|
|
||||||
// exposed to the Prometheus system.
|
|
||||||
func (s *NsqdStats) Use(c StatsCollector) {
|
|
||||||
s.collectors = append(s.collectors, c)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect collects all the registered stats metrics from the nsqd node.
|
|
||||||
func (s *NsqdStats) Collect(out chan<- prometheus.Metric) error {
|
|
||||||
stats, err := getNsqdStats(s.nsqdURL)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(len(s.collectors))
|
|
||||||
for _, coll := range s.collectors {
|
|
||||||
go func(coll StatsCollector) {
|
|
||||||
coll.collect(stats, out)
|
|
||||||
wg.Done()
|
|
||||||
}(coll)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.topicCount.Set(float64(len(stats.Topics)))
|
|
||||||
wg.Wait()
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -15,7 +15,8 @@ type channelStats []struct {
|
|||||||
// expose the channel metrics of a nsqd node to Prometheus. The
|
// expose the channel metrics of a nsqd node to Prometheus. The
|
||||||
// channel metrics are reported per topic.
|
// channel metrics are reported per topic.
|
||||||
func ChannelStats(namespace string) StatsCollector {
|
func ChannelStats(namespace string) StatsCollector {
|
||||||
labels := []string{"type", "topic", "channel", "paused"}
|
labels := []string{"topic", "channel", "paused"}
|
||||||
|
namespace += "_channel"
|
||||||
|
|
||||||
return channelStats{
|
return channelStats{
|
||||||
{
|
{
|
||||||
@ -101,11 +102,10 @@ func ChannelStats(namespace string) StatsCollector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs channelStats) collect(s *stats, out chan<- prometheus.Metric) {
|
func (cs channelStats) set(s *stats) {
|
||||||
for _, topic := range s.Topics {
|
for _, topic := range s.Topics {
|
||||||
for _, channel := range topic.Channels {
|
for _, channel := range topic.Channels {
|
||||||
labels := prometheus.Labels{
|
labels := prometheus.Labels{
|
||||||
"type": "channel",
|
|
||||||
"topic": topic.Name,
|
"topic": topic.Name,
|
||||||
"channel": channel.Name,
|
"channel": channel.Name,
|
||||||
"paused": strconv.FormatBool(channel.Paused),
|
"paused": strconv.FormatBool(channel.Paused),
|
||||||
@ -113,8 +113,25 @@ func (cs channelStats) collect(s *stats, out chan<- prometheus.Metric) {
|
|||||||
|
|
||||||
for _, c := range cs {
|
for _, c := range cs {
|
||||||
c.vec.With(labels).Set(c.val(channel))
|
c.vec.With(labels).Set(c.val(channel))
|
||||||
c.vec.Collect(out)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cs channelStats) collect(out chan<- prometheus.Metric) {
|
||||||
|
for _, c := range cs {
|
||||||
|
c.vec.Collect(out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs channelStats) describe(ch chan<- *prometheus.Desc) {
|
||||||
|
for _, c := range cs {
|
||||||
|
c.vec.Describe(ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs channelStats) reset() {
|
||||||
|
for _, c := range cs {
|
||||||
|
c.vec.Reset()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -19,7 +19,8 @@ type clientStats []struct {
|
|||||||
// Prometheus collection process. So be sure the number of clients
|
// Prometheus collection process. So be sure the number of clients
|
||||||
// is small enough when using this collector.
|
// is small enough when using this collector.
|
||||||
func ClientStats(namespace string) StatsCollector {
|
func ClientStats(namespace string) StatsCollector {
|
||||||
labels := []string{"type", "topic", "channel", "deflate", "snappy", "tls", "client_id", "hostname", "version", "remote_address"}
|
labels := []string{"topic", "channel", "deflate", "snappy", "tls", "client_id", "hostname", "version", "remote_address"}
|
||||||
|
namespace += "_client"
|
||||||
|
|
||||||
return clientStats{
|
return clientStats{
|
||||||
{
|
{
|
||||||
@ -90,12 +91,11 @@ func ClientStats(namespace string) StatsCollector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs clientStats) collect(s *stats, out chan<- prometheus.Metric) {
|
func (cs clientStats) set(s *stats) {
|
||||||
for _, topic := range s.Topics {
|
for _, topic := range s.Topics {
|
||||||
for _, channel := range topic.Channels {
|
for _, channel := range topic.Channels {
|
||||||
for _, client := range channel.Clients {
|
for _, client := range channel.Clients {
|
||||||
labels := prometheus.Labels{
|
labels := prometheus.Labels{
|
||||||
"type": "client",
|
|
||||||
"topic": topic.Name,
|
"topic": topic.Name,
|
||||||
"channel": channel.Name,
|
"channel": channel.Name,
|
||||||
"deflate": strconv.FormatBool(client.Deflate),
|
"deflate": strconv.FormatBool(client.Deflate),
|
||||||
@ -109,9 +109,26 @@ func (cs clientStats) collect(s *stats, out chan<- prometheus.Metric) {
|
|||||||
|
|
||||||
for _, c := range cs {
|
for _, c := range cs {
|
||||||
c.vec.With(labels).Set(c.val(client))
|
c.vec.With(labels).Set(c.val(client))
|
||||||
c.vec.Collect(out)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cs clientStats) collect(out chan<- prometheus.Metric) {
|
||||||
|
for _, c := range cs {
|
||||||
|
c.vec.Collect(out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs clientStats) describe(ch chan<- *prometheus.Desc) {
|
||||||
|
for _, c := range cs {
|
||||||
|
c.vec.Describe(ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs clientStats) reset() {
|
||||||
|
for _, c := range cs {
|
||||||
|
c.vec.Reset()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -14,7 +14,8 @@ type topicStats []struct {
|
|||||||
// TopicStats creates a new stats collector which is able to
|
// TopicStats creates a new stats collector which is able to
|
||||||
// expose the topic metrics of a nsqd node to Prometheus.
|
// expose the topic metrics of a nsqd node to Prometheus.
|
||||||
func TopicStats(namespace string) StatsCollector {
|
func TopicStats(namespace string) StatsCollector {
|
||||||
labels := []string{"type", "topic", "paused"}
|
labels := []string{"topic", "paused"}
|
||||||
|
namespace += "_topic"
|
||||||
|
|
||||||
return topicStats{
|
return topicStats{
|
||||||
{
|
{
|
||||||
@ -68,17 +69,32 @@ func TopicStats(namespace string) StatsCollector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts topicStats) collect(s *stats, out chan<- prometheus.Metric) {
|
func (ts topicStats) set(s *stats) {
|
||||||
for _, topic := range s.Topics {
|
for _, topic := range s.Topics {
|
||||||
labels := prometheus.Labels{
|
labels := prometheus.Labels{
|
||||||
"type": "topic",
|
|
||||||
"topic": topic.Name,
|
"topic": topic.Name,
|
||||||
"paused": strconv.FormatBool(topic.Paused),
|
"paused": strconv.FormatBool(topic.Paused),
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, c := range ts {
|
for _, c := range ts {
|
||||||
c.vec.With(labels).Set(c.val(topic))
|
c.vec.With(labels).Set(c.val(topic))
|
||||||
c.vec.Collect(out)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
func (ts topicStats) collect(out chan<- prometheus.Metric) {
|
||||||
|
for _, c := range ts {
|
||||||
|
c.vec.Collect(out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts topicStats) describe(ch chan<- *prometheus.Desc) {
|
||||||
|
for _, c := range ts {
|
||||||
|
c.vec.Describe(ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts topicStats) reset() {
|
||||||
|
for _, c := range ts {
|
||||||
|
c.vec.Reset()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
55
main.go
55
main.go
@ -23,11 +23,6 @@ var (
|
|||||||
enabledCollectors = flag.String("collect", "stats.topics,stats.channels", "Comma-separated list of collectors to use.")
|
enabledCollectors = flag.String("collect", "stats.topics,stats.channels", "Comma-separated list of collectors to use.")
|
||||||
namespace = flag.String("namespace", "nsq", "Namespace for the NSQ metrics.")
|
namespace = flag.String("namespace", "nsq", "Namespace for the NSQ metrics.")
|
||||||
|
|
||||||
collectorRegistry = map[string]func(names []string) (collector.Collector, error){
|
|
||||||
"stats": createNsqdStats,
|
|
||||||
}
|
|
||||||
|
|
||||||
// stats.* collectors
|
|
||||||
statsRegistry = map[string]func(namespace string) collector.StatsCollector{
|
statsRegistry = map[string]func(namespace string) collector.StatsCollector{
|
||||||
"topics": collector.TopicStats,
|
"topics": collector.TopicStats,
|
||||||
"channels": collector.ChannelStats,
|
"channels": collector.ChannelStats,
|
||||||
@ -65,47 +60,31 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func createNsqExecutor() (*collector.NsqExecutor, error) {
|
func createNsqExecutor() (*collector.NsqExecutor, error) {
|
||||||
collectors := make(map[string][]string)
|
|
||||||
for _, name := range strings.Split(*enabledCollectors, ",") {
|
|
||||||
name = strings.TrimSpace(name)
|
|
||||||
parts := strings.SplitN(name, ".", 2)
|
|
||||||
if len(parts) != 2 {
|
|
||||||
return nil, fmt.Errorf("invalid collector name: %s", name)
|
|
||||||
}
|
|
||||||
collectors[parts[0]] = append(collectors[parts[0]], parts[1])
|
|
||||||
}
|
|
||||||
|
|
||||||
ex := collector.NewNsqExecutor(*namespace)
|
|
||||||
for collector, subcollectors := range collectors {
|
|
||||||
newCollector, has := collectorRegistry[collector]
|
|
||||||
if !has {
|
|
||||||
return nil, fmt.Errorf("invalid collector: %s", collector)
|
|
||||||
}
|
|
||||||
|
|
||||||
c, err := newCollector(subcollectors)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ex.AddCollector(collector, c)
|
|
||||||
}
|
|
||||||
return ex, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func createNsqdStats(statsCollectors []string) (collector.Collector, error) {
|
|
||||||
nsqdURL, err := normalizeURL(*nsqdURL)
|
nsqdURL, err := normalizeURL(*nsqdURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
stats := collector.NewNsqdStats(*namespace, nsqdURL)
|
ex := collector.NewNsqExecutor(*namespace, nsqdURL)
|
||||||
for _, c := range statsCollectors {
|
for _, param := range strings.Split(*enabledCollectors, ",") {
|
||||||
newStatsCollector, has := statsRegistry[c]
|
param = strings.TrimSpace(param)
|
||||||
if !has {
|
parts := strings.SplitN(param, ".", 2)
|
||||||
return nil, fmt.Errorf("unknown stats collector: %s", c)
|
if len(parts) != 2 {
|
||||||
|
return nil, fmt.Errorf("invalid collector name: %s", param)
|
||||||
}
|
}
|
||||||
stats.Use(newStatsCollector(*namespace))
|
if parts[0] != "stats" {
|
||||||
|
return nil, fmt.Errorf("invalid collector prefix: %s", parts[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
name := parts[1]
|
||||||
|
c, has := statsRegistry[name]
|
||||||
|
if !has {
|
||||||
|
return nil, fmt.Errorf("unknown stats collector: %s", name)
|
||||||
|
}
|
||||||
|
ex.Use(c(*namespace))
|
||||||
}
|
}
|
||||||
return stats, nil
|
return ex, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func normalizeURL(ustr string) (string, error) {
|
func normalizeURL(ustr string) (string, error) {
|
||||||
|
Reference in New Issue
Block a user