增加consul及grpc组件

This commit is contained in:
suguo.yao 2021-09-13 20:52:49 +08:00
parent c03c0f06ab
commit 0853025ee2
4 changed files with 256 additions and 0 deletions

View File

@ -14,3 +14,5 @@
> 支持连接池dbresolver
* Sqlite
> 支持多级目录创建
* Grpc
* Consul

185
consul/consul.go Normal file
View File

@ -0,0 +1,185 @@
package consul
import (
"context"
"errors"
"fmt"
"math/rand"
"time"
consulapi "github.com/hashicorp/consul/api"
"google.golang.org/grpc"
)
var (
_client *consulapi.Client
conf *Config = &Config{
Address: "127.0.0.1:8500",
}
)
type Config struct {
Address string
}
//Init 初始化consul连接
func Init(config *Config) error {
if config == nil {
return errors.New("config 空实例")
}
conf = config
return nil
}
func New() (*consulapi.Client, error) {
if _client != nil {
return _client, nil
}
// 创建连接consul服务配置
config := consulapi.DefaultConfig()
config.Address = conf.Address
client, err := consulapi.NewClient(config)
if err != nil {
return nil, err
}
_client = client
return client, nil
}
//Register 注册服务到consul
func Register(name string, addr string, port int, tags ...string) error {
client, err := New()
if err != nil {
return err
}
if client == nil {
return errors.New("consul 实例空")
}
// 创建注册到consul的服务到
registration := new(consulapi.AgentServiceRegistration)
registration.ID = fmt.Sprintf("%s-%s:%d", name, addr, port)
registration.Name = name
registration.Port = port
registration.Tags = tags
registration.Address = addr
// 增加consul健康检查回调函数
check := new(consulapi.AgentServiceCheck)
check.GRPC = fmt.Sprintf("%s:%d", registration.Address, registration.Port)
check.Timeout = "5s"
check.Interval = "5s"
check.DeregisterCriticalServiceAfter = "30s" // 故障检查失败30s后 consul自动将注册服务删除
registration.Check = check
// 注册服务到consul
if err := client.Agent().ServiceRegister(registration); err != nil {
return err
}
return nil
}
//DeRegister 取消consul注册的服务
func DeRegister(name string, addr string, port int) error {
client, err := New()
if err != nil {
return err
}
if client == nil {
return errors.New("consul 实例空")
}
client.Agent().ServiceDeregister(fmt.Sprintf("%s-%s:%d", name, addr, port))
return nil
}
//FindNode 查找节点
func FindNode(servicename string) (*consulapi.AgentService, error) {
client, err := New()
if err != nil {
return nil, err
}
if client == nil {
return nil, errors.New("consul 实例空")
}
services, _, err := client.Health().Service(servicename, "", true, nil)
if err != nil {
return nil, err
}
l := len(services)
if l == 0 {
return nil, nil
}
if l == 1 {
return services[0].Service, nil
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
return services[r.Intn(l)%l].Service, nil
}
//FindServer 从consul中发现服务
func FindServer(servicename string) (*grpc.ClientConn, error) {
node, err := FindNode(servicename)
if err != nil {
return nil, err
}
if node == nil {
return nil, errors.New("微服务" + servicename + "不可用,稍后再试!")
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, fmt.Sprintf("%s:%d", node.Address, node.Port), grpc.WithBlock(), grpc.WithInsecure(), grpc.WithBalancerName("round_robin"))
if err != nil {
return nil, err
}
return conn, nil
}
//CheckHeath 健康检查
func CheckHeath(serviceid string) error {
client, err := New()
if err != nil {
return err
}
if client == nil {
return errors.New("consul 实例空")
}
// 健康检查
// a, b, _ := client.Agent().AgentHealthServiceByID(serviceid)
return nil
}
//KVPut test
func KVPut(key string, values *[]byte, flags uint64) (*consulapi.WriteMeta, error) {
client, err := New()
if err != nil {
return nil, err
}
if client == nil {
return nil, errors.New("consul 实例空")
}
return client.KV().Put(&consulapi.KVPair{Key: key, Flags: flags, Value: *values}, nil)
}
//KVGet 获取值
func KVGet(key string, flags uint64) (*[]byte, error) {
client, err := New()
if err != nil {
return nil, err
}
if client == nil {
return nil, errors.New("consul 实例空")
}
// KV get值
data, _, _ := client.KV().Get(key, nil)
if data != nil {
return &data.Value, nil
}
return nil, nil
}

20
go.mod
View File

@ -3,6 +3,8 @@ module myschools.me/suguo/snippet
go 1.17
require (
github.com/hashicorp/consul/api v1.10.1
google.golang.org/grpc v1.40.0
gorm.io/driver/mysql v1.1.2
gorm.io/driver/sqlite v1.1.4
gorm.io/gorm v1.21.14
@ -10,8 +12,26 @@ require (
)
require (
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/fatih/color v1.9.0 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/golang/protobuf v1.4.3 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-hclog v0.12.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/hashicorp/serf v0.9.5 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.2 // indirect
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/mattn/go-sqlite3 v1.14.5 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.1.2 // indirect
golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd // indirect
golang.org/x/text v0.3.2 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/protobuf v1.25.0 // indirect
)

49
grpc/grpc.go Normal file
View File

@ -0,0 +1,49 @@
package grpc
import (
"fmt"
"log"
"net"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
type GrpcService interface {
Register(srv *grpc.Server)
}
type Config struct {
Address string
Port int
AppName string
}
var rpc *grpc.Server
var conf Config
func Init(config Config) *grpc.Server {
if rpc == nil {
rpc = grpc.NewServer(grpc.MaxRecvMsgSize(1024*1024), grpc.MaxSendMsgSize(1024*1024))
}
conf = config
return rpc
}
func Start() {
//注册反射 用于grpcurl调试
reflection.Register(rpc)
// grpc服务启动
go func() {
log.Printf("starting grpc service on %s:%d", conf.Address, conf.Port)
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", conf.Address, conf.Port))
if err != nil {
log.Fatal("fail to open port: ", err)
}
err = rpc.Serve(lis)
if err != nil {
log.Fatal("fail to open microservice: ", err)
}
}()
}