feat: 增加负载均衡切换

This commit is contained in:
王一之 2023-10-20 11:31:18 +08:00
parent 20f46c4aef
commit 1aef0c9dc8
4 changed files with 204 additions and 81 deletions

View File

@ -8,111 +8,145 @@ import (
"github.com/codfrm/cago/pkg/logger" "github.com/codfrm/cago/pkg/logger"
"github.com/codfrm/dnspod-watch/pkg/pushcat" "github.com/codfrm/dnspod-watch/pkg/pushcat"
"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
dnspod "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/dnspod/v20210323" dnspod "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/dnspod/v20210323"
"go.uber.org/zap" "go.uber.org/zap"
) )
type record struct { type record struct {
w *watch w *watch
record *dnspod.RecordListItem record *dnspod.RecordListItem
isDisable bool loadBalance *dnspod.RecordListItem
domain, value string isDisable bool
logger *logger.CtxLogger domain *CheckDomain
logger *logger.CtxLogger
} }
func newRecord(w *watch, r *dnspod.RecordListItem, domain, value string) *record { func newRecord(w *watch, r *dnspod.RecordListItem, domain *CheckDomain) (*record, error) {
return &record{ ret := &record{
w: w, w: w,
record: r, record: r,
isDisable: false, isDisable: false,
domain: domain, value: value, domain: domain,
logger: logger.NewCtxLogger(logger.Default()).With( logger: logger.NewCtxLogger(logger.Default()).With(
zap.String("domain", domain), zap.String("value", value), zap.String("domain", domain.Domain), zap.String("value", domain.Value),
), ),
} }
if domain.LoadBalance != nil {
var err error
ret.loadBalance, err = w.queryRecord(domain.Domain, domain.Name,
domain.LoadBalance.Value, domain.LoadBalance.Line)
if err != nil {
return nil, err
}
}
return ret, nil
} }
// watch 每分钟检查ip是否可以访问, 无法访问自动暂停记录 // watch 每分钟检查ip是否可以访问, 无法访问自动暂停记录
func (r *record) watch(ctx context.Context) { func (r *record) watch(ctx context.Context) {
t := time.NewTicker(time.Minute) t := time.NewTicker(time.Second)
lastSwitch := time.Now() s := newRetry()
count := 0 loadBalance := newRetry()
for { for {
select { select {
case <-t.C: case <-t.C:
// 检查ip是否可以访问 duration, err := r.checkIP(ctx, r.domain.Value)
count++ if err != nil {
if err := r.checkIP(ctx, r.value); err != nil {
r.logger.Ctx(ctx).Error("check ip err", zap.Error(err)) r.logger.Ctx(ctx).Error("check ip err", zap.Error(err))
// 连续3次无法访问,暂停记录 } else {
if !r.isDisable && count > 3 { r.logger.Ctx(ctx).Info("check ip ok", zap.Duration("duration", duration))
count = 0 }
// 暂停记录 _ = s.check(err == nil, func() error {
request := dnspod.NewModifyRecordStatusRequest() // 增加负载均衡
request.SetContext(ctx) if r.domain.LoadBalance != nil {
request.Domain = common.StringPtr(r.domain) // 判断延迟是否超过200毫秒
request.RecordId = common.Uint64Ptr(*r.record.RecordId) loadBalance.check(duration > time.Millisecond*100, func() error {
request.Status = common.StringPtr("DISABLE") // 开启记录
_, err := r.w.client.ModifyRecordStatus(request) msg := fmt.Sprintf("开启负载均衡 域名: %s, 记录: %s",
msg := fmt.Sprintf("域名: %s, 记录: %s, ip无法访问,暂停记录", r.domain, r.value) r.domain.Domain, r.domain.LoadBalance.Value)
if err != nil { enableErr := r.w.enable(ctx, r.domain.Domain, *r.loadBalance.RecordId)
r.logger.Ctx(ctx).Error("modify record status err", zap.Error(err)) if enableErr != nil {
msg += "\n记录修改失败: " + err.Error() r.logger.Ctx(ctx).Error("modify record status err", zap.Error(enableErr))
} else { msg += "\n记录修改失败: " + enableErr.Error()
r.logger.Ctx(ctx).Info("modify record status success", } else {
zap.String("status", "DISABLE")) r.logger.Ctx(ctx).Info("modify record status success",
r.isDisable = true zap.String("status", "ENABLE"))
} r.isDisable = false
if err := pushcat.Send(ctx, "ip无法访问,暂停记录", msg); err != nil { }
r.logger.Ctx(ctx).Error("发送通知错误", if pushErr := pushcat.Send(ctx, "开启负载均衡", msg); pushErr != nil {
zap.Error(err), r.logger.Ctx(ctx).Error("发送通知错误",
zap.String("msg", msg)) zap.Error(pushErr),
} zap.String("msg", msg))
}
return enableErr
}, func() error {
// 开启记录
msg := fmt.Sprintf("关闭负载均衡 域名: %s, 记录: %s",
r.domain.Domain, r.domain.LoadBalance.Value)
disableErr := r.w.disable(ctx, r.domain.Domain, *r.loadBalance.RecordId)
if disableErr != nil {
r.logger.Ctx(ctx).Error("modify record status err", zap.Error(disableErr))
msg += "\n记录修改失败: " + disableErr.Error()
} else {
r.logger.Ctx(ctx).Info("modify record status success",
zap.String("status", "DISABLE"))
r.isDisable = false
}
if pushErr := pushcat.Send(ctx, "关闭负载均衡", msg); pushErr != nil {
r.logger.Ctx(ctx).Error("发送通知错误",
zap.Error(pushErr),
zap.String("msg", msg))
}
return disableErr
})
} }
} else if r.isDisable && count > 3 { // 开启记录
// 上次切换时间超过30分钟才能再次切换 msg := fmt.Sprintf("域名: %s, 记录: %s, ip可以访问,开启记录", r.domain.Domain, r.domain.Value)
if time.Since(lastSwitch) < time.Minute*30 { enableErr := r.w.enable(ctx, r.domain.Domain, *r.record.RecordId)
r.logger.Ctx(ctx).Info("ip可以访问,但是上次切换时间不足30分钟") if enableErr != nil {
continue r.logger.Ctx(ctx).Error("modify record status err", zap.Error(enableErr))
} msg += "\n记录修改失败: " + enableErr.Error()
lastSwitch = time.Now()
// 检查连续成功3次,开启记录
count = 0
request := dnspod.NewModifyRecordStatusRequest()
request.SetContext(ctx)
request.Domain = common.StringPtr(r.domain)
request.RecordId = common.Uint64Ptr(*r.record.RecordId)
request.Status = common.StringPtr("ENABLE")
_, err := r.w.client.ModifyRecordStatus(request)
msg := fmt.Sprintf("域名: %s, 记录: %s, ip可以访问,开启记录", r.domain, r.value)
if err != nil {
r.logger.Ctx(ctx).Error("modify record status err", zap.Error(err))
msg += "\n记录修改失败: " + err.Error()
} else { } else {
r.logger.Ctx(ctx).Info("modify record status success", r.logger.Ctx(ctx).Info("modify record status success",
zap.String("status", "ENABLE")) zap.String("status", "ENABLE"))
r.isDisable = false r.isDisable = false
} }
if err := pushcat.Send(ctx, "ip可以访问,开启记录", msg); err != nil { if pushErr := pushcat.Send(ctx, "ip可以访问,开启记录", msg); pushErr != nil {
r.logger.Ctx(ctx).Error("发送通知错误", r.logger.Ctx(ctx).Error("发送通知错误",
zap.Error(err), zap.Error(pushErr),
zap.String("msg", msg)) zap.String("msg", msg))
} }
} else { return enableErr
r.logger.Ctx(ctx).Info("ip is ok") }, func() error {
} // 暂停记录
msg := fmt.Sprintf("域名: %s, 记录: %s, ip无法访问,暂停记录", r.domain.Domain, r.domain.Value)
disableErr := r.w.disable(ctx, r.domain.Domain, *r.record.RecordId)
if disableErr != nil {
r.logger.Ctx(ctx).Error("modify record status err", zap.Error(disableErr))
msg += "\n记录修改失败: " + disableErr.Error()
} else {
r.logger.Ctx(ctx).Info("modify record status success",
zap.String("status", "DISABLE"))
r.isDisable = true
}
if pushErr := pushcat.Send(ctx, "ip无法访问,暂停记录", msg); pushErr != nil {
r.logger.Ctx(ctx).Error("发送通知错误",
zap.Error(pushErr),
zap.String("msg", msg))
}
return disableErr
})
case <-ctx.Done(): case <-ctx.Done():
t.Stop() t.Stop()
} }
} }
} }
func (r *record) checkIP(ctx context.Context, ip string) error { func (r *record) checkIP(ctx context.Context, ip string) (time.Duration, error) {
ts := time.Now()
con, err := net.DialTimeout("tcp", ip+":80", time.Second*10) con, err := net.DialTimeout("tcp", ip+":80", time.Second*10)
if err != nil { if err != nil {
return err return 0, err
} }
return con.Close() return time.Since(ts), con.Close()
} }

51
internal/watch/switch.go Normal file
View File

@ -0,0 +1,51 @@
package watch
import "time"
type retry struct {
lastStatus bool
currentStatus bool
count int
lastTime time.Time
}
func newRetry() *retry {
return &retry{
count: 0,
//lastTime: time.Now(),
}
}
func (r *retry) check(check bool, ok func() error, bad func() error) error {
if check {
if r.lastStatus {
r.count += 1
} else {
r.count = 0
}
r.lastStatus = true
if !r.currentStatus && r.count > 3 {
if time.Since(r.lastTime) < time.Minute*60 {
return nil
}
if err := ok(); err == nil {
r.currentStatus = true
r.lastTime = time.Now()
}
}
} else {
if !r.lastStatus {
r.count += 1
} else {
r.count = 0
}
r.lastStatus = false
if r.currentStatus && r.count > 3 {
if err := bad(); err == nil {
r.currentStatus = false
r.lastTime = time.Now()
}
}
}
return nil
}

View File

@ -1,15 +1,15 @@
package watch package watch
import ( import (
"context"
"errors" "errors"
"github.com/codfrm/cago/pkg/logger" "github.com/codfrm/cago/pkg/logger"
"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common" "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
dnspod "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/dnspod/v20210323" dnspod "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/dnspod/v20210323"
"go.uber.org/zap" "go.uber.org/zap"
) )
func (w *watch) queryRecord(domain, name, value string) (*dnspod.RecordListItem, error) { func (w *watch) queryRecord(domain, name, value, line string) (*dnspod.RecordListItem, error) {
// 实例化一个请求对象,每个接口都会对应一个request对象 // 实例化一个请求对象,每个接口都会对应一个request对象
request := dnspod.NewDescribeRecordListRequest() request := dnspod.NewDescribeRecordListRequest()
@ -22,10 +22,40 @@ func (w *watch) queryRecord(domain, name, value string) (*dnspod.RecordListItem,
} }
for _, v := range response.Response.RecordList { for _, v := range response.Response.RecordList {
if *v.Name == name && *v.Value == value { if *v.Name == name && *v.Value == value {
if line != "" && *v.Line != line {
continue
}
logger.Default().Info("record found", zap.Any("record", v)) logger.Default().Info("record found", zap.Any("record", v))
return v, nil return v, nil
} }
} }
return nil, errors.New("record not found") return nil, errors.New("record not found")
}
func (w *watch) enable(ctx context.Context, domain string, recordId uint64) error {
// 开启记录
request := dnspod.NewModifyRecordStatusRequest()
request.SetContext(ctx)
request.Domain = common.StringPtr(domain)
request.RecordId = common.Uint64Ptr(recordId)
request.Status = common.StringPtr("ENABLE")
_, err := w.client.ModifyRecordStatus(request)
if err != nil {
return err
}
return nil
}
func (w *watch) disable(ctx context.Context, domain string, recordId uint64) error {
// 开启记录
request := dnspod.NewModifyRecordStatusRequest()
request.SetContext(ctx)
request.Domain = common.StringPtr(domain)
request.RecordId = common.Uint64Ptr(recordId)
request.Status = common.StringPtr("DISABLE")
_, err := w.client.ModifyRecordStatus(request)
if err != nil {
return err
}
return nil
} }

View File

@ -12,10 +12,17 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
type LoadBalance struct {
Value string // 记录值
Line string // 线路
}
type CheckDomain struct { type CheckDomain struct {
Domain string // 域名 Domain string // 域名
Name string // 记录名 Name string // 记录名
Value []string // 记录值 Value string // 记录值
Line string // 线路
LoadBalance *LoadBalance `yaml:"loadBalance"` // 负载均衡
} }
type Config struct { type Config struct {
@ -57,14 +64,15 @@ func (w *watch) Start(ctx context.Context, cfg *configs.Config) error {
return err return err
} }
for _, c := range w.config.CheckDomain { for _, c := range w.config.CheckDomain {
for _, v := range c.Value { r, err := w.queryRecord(c.Domain, c.Name, c.Value, c.Line)
r, err := w.queryRecord(c.Domain, c.Name, v) if err != nil {
if err != nil { return err
return err
}
record := newRecord(w, r, c.Domain, v)
go record.watch(ctx)
} }
record, err := newRecord(w, r, c)
if err != nil {
return err
}
go record.watch(ctx)
} }
return nil return nil
} }