From 0853025ee2db0ac1ee22a5982528fa9e11588a0c Mon Sep 17 00:00:00 2001 From: "suguo.yao" Date: Mon, 13 Sep 2021 20:52:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0consul=E5=8F=8Agrpc=E7=BB=84?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 + consul/consul.go | 185 +++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 20 +++++ grpc/grpc.go | 49 +++++++++++++ 4 files changed, 256 insertions(+) create mode 100644 consul/consul.go create mode 100644 grpc/grpc.go diff --git a/README.md b/README.md index 381a7e2..4c3c98c 100644 --- a/README.md +++ b/README.md @@ -14,3 +14,5 @@ > 支持连接池dbresolver * Sqlite > 支持多级目录创建 +* Grpc +* Consul diff --git a/consul/consul.go b/consul/consul.go new file mode 100644 index 0000000..7d0513a --- /dev/null +++ b/consul/consul.go @@ -0,0 +1,185 @@ +package consul + +import ( + "context" + "errors" + "fmt" + "math/rand" + "time" + + consulapi "github.com/hashicorp/consul/api" + "google.golang.org/grpc" +) + +var ( + _client *consulapi.Client + conf *Config = &Config{ + Address: "127.0.0.1:8500", + } +) + +type Config struct { + Address string +} + +//Init 初始化consul连接 +func Init(config *Config) error { + if config == nil { + return errors.New("config 空实例") + } + conf = config + return nil +} + +func New() (*consulapi.Client, error) { + if _client != nil { + return _client, nil + } + + // 创建连接consul服务配置 + config := consulapi.DefaultConfig() + config.Address = conf.Address + client, err := consulapi.NewClient(config) + if err != nil { + return nil, err + } + _client = client + return client, nil +} + +//Register 注册服务到consul +func Register(name string, addr string, port int, tags ...string) error { + client, err := New() + if err != nil { + return err + } + if client == nil { + return errors.New("consul 实例空") + } + + // 创建注册到consul的服务到 + registration := new(consulapi.AgentServiceRegistration) + registration.ID = fmt.Sprintf("%s-%s:%d", name, addr, port) + registration.Name = name + registration.Port = port + registration.Tags = tags + registration.Address = addr + + // 增加consul健康检查回调函数 + check := new(consulapi.AgentServiceCheck) + check.GRPC = fmt.Sprintf("%s:%d", registration.Address, registration.Port) + check.Timeout = "5s" + check.Interval = "5s" + check.DeregisterCriticalServiceAfter = "30s" // 故障检查失败30s后 consul自动将注册服务删除 + registration.Check = check + + // 注册服务到consul + if err := client.Agent().ServiceRegister(registration); err != nil { + return err + } + return nil +} + +//DeRegister 取消consul注册的服务 +func DeRegister(name string, addr string, port int) error { + client, err := New() + if err != nil { + return err + } + if client == nil { + return errors.New("consul 实例空") + } + client.Agent().ServiceDeregister(fmt.Sprintf("%s-%s:%d", name, addr, port)) + return nil +} + +//FindNode 查找节点 +func FindNode(servicename string) (*consulapi.AgentService, error) { + client, err := New() + if err != nil { + return nil, err + } + + if client == nil { + return nil, errors.New("consul 实例空") + } + services, _, err := client.Health().Service(servicename, "", true, nil) + if err != nil { + return nil, err + } + l := len(services) + if l == 0 { + return nil, nil + } + if l == 1 { + return services[0].Service, nil + } + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + return services[r.Intn(l)%l].Service, nil +} + +//FindServer 从consul中发现服务 +func FindServer(servicename string) (*grpc.ClientConn, error) { + node, err := FindNode(servicename) + if err != nil { + return nil, err + } + if node == nil { + return nil, errors.New("微服务" + servicename + "不可用,稍后再试!") + } + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + conn, err := grpc.DialContext(ctx, fmt.Sprintf("%s:%d", node.Address, node.Port), grpc.WithBlock(), grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) + if err != nil { + return nil, err + } + return conn, nil +} + +//CheckHeath 健康检查 +func CheckHeath(serviceid string) error { + client, err := New() + if err != nil { + return err + } + if client == nil { + return errors.New("consul 实例空") + } + // 健康检查 + // a, b, _ := client.Agent().AgentHealthServiceByID(serviceid) + return nil +} + +//KVPut test +func KVPut(key string, values *[]byte, flags uint64) (*consulapi.WriteMeta, error) { + client, err := New() + if err != nil { + return nil, err + } + if client == nil { + return nil, errors.New("consul 实例空") + } + + return client.KV().Put(&consulapi.KVPair{Key: key, Flags: flags, Value: *values}, nil) +} + +//KVGet 获取值 +func KVGet(key string, flags uint64) (*[]byte, error) { + client, err := New() + if err != nil { + return nil, err + } + if client == nil { + return nil, errors.New("consul 实例空") + } + + // KV get值 + data, _, _ := client.KV().Get(key, nil) + if data != nil { + return &data.Value, nil + } + + return nil, nil +} diff --git a/go.mod b/go.mod index 228f439..da74bc9 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,8 @@ module myschools.me/suguo/snippet go 1.17 require ( + github.com/hashicorp/consul/api v1.10.1 + google.golang.org/grpc v1.40.0 gorm.io/driver/mysql v1.1.2 gorm.io/driver/sqlite v1.1.4 gorm.io/gorm v1.21.14 @@ -10,8 +12,26 @@ require ( ) require ( + github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect + github.com/fatih/color v1.9.0 // indirect github.com/go-sql-driver/mysql v1.6.0 // indirect + github.com/golang/protobuf v1.4.3 // indirect + github.com/hashicorp/go-cleanhttp v0.5.1 // indirect + github.com/hashicorp/go-hclog v0.12.0 // indirect + github.com/hashicorp/go-immutable-radix v1.0.0 // indirect + github.com/hashicorp/go-rootcerts v1.0.2 // indirect + github.com/hashicorp/golang-lru v0.5.0 // indirect + github.com/hashicorp/serf v0.9.5 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.2 // indirect + github.com/mattn/go-colorable v0.1.6 // indirect + github.com/mattn/go-isatty v0.0.12 // indirect github.com/mattn/go-sqlite3 v1.14.5 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mitchellh/mapstructure v1.1.2 // indirect + golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect + golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd // indirect + golang.org/x/text v0.3.2 // indirect + google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect + google.golang.org/protobuf v1.25.0 // indirect ) diff --git a/grpc/grpc.go b/grpc/grpc.go new file mode 100644 index 0000000..55cf5b4 --- /dev/null +++ b/grpc/grpc.go @@ -0,0 +1,49 @@ +package grpc + +import ( + "fmt" + "log" + "net" + + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +type GrpcService interface { + Register(srv *grpc.Server) +} + +type Config struct { + Address string + Port int + AppName string +} + +var rpc *grpc.Server +var conf Config + +func Init(config Config) *grpc.Server { + if rpc == nil { + rpc = grpc.NewServer(grpc.MaxRecvMsgSize(1024*1024), grpc.MaxSendMsgSize(1024*1024)) + } + conf = config + return rpc +} + +func Start() { + //注册反射 用于grpcurl调试 + reflection.Register(rpc) + + // grpc服务启动 + go func() { + log.Printf("starting grpc service on %s:%d", conf.Address, conf.Port) + lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", conf.Address, conf.Port)) + if err != nil { + log.Fatal("fail to open port: ", err) + } + err = rpc.Serve(lis) + if err != nil { + log.Fatal("fail to open microservice: ", err) + } + }() +}