# 微服务网关
Tcp为什么需要三次握手、四次挥手
- 三次握手的主要目的是保证连接的双工(发送和接收可同时进行),可靠性更多是通过超时重传机制保证的
# 三次握手
# 四次挥手
1、为什么需要等待2MSL?
- MSL: Maximum Segment Lifetime ,30s-60s
- 保证Tcp连接成功关闭:如果最后客户端的ACK包没有被服务端接收到,服务端会停留在LAST-ACK状态并重发FIN,需要客户端重传ACK,所以主动关闭方需要等待
- 保证本次连接的重复数据段从网络中消失:如果马上又从客户端的同一端口发起新连接,等待2MSL可以保证上次连接的重传报文都已失效,不会混入下一个连接中
2、为什么会出现大量的close_wait?
- close_wait一般出现在被动关闭方,原因是收到对方FIN并回复ACK后,自己一直没有调用close关闭连接
- 并发请求太多
- 被动关闭方没有及时关闭连接、释放资源,连接就一直卡在close_wait,直到程序结束
# 使用抓包工具分析
# Tcp为什么需要流量控制
- 通讯双方网速不同,任一方发送过快都会导致对方处理不过来,所以需要先把数据放到接收缓冲区中
- 如果缓冲区满了,发送方还在发送,接收方只能把数据包丢弃,因此需要控制发送速率
- 缓冲区剩余大小称为接收窗口,用变量win表示,如果win=0,则发送方停止发送
# Tcp为什么需要拥塞控制
- 流量控制与拥塞控制是两个概念,拥塞控制是调节网络负载
- 接收方网络资源繁忙,因未及时响应ACK导致发送方重传大量数据,这样会使网络更加拥堵
- 拥塞控制是动态调整win大小,不只是依赖缓冲区大小确定窗口大小
# 为什么会出现粘/拆包
- 应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包
- 应用程序写入数据小于套接字缓冲区的大小,网卡将应用多次写入数据发送到网络上,这将发生粘包
- 进行mss(最大报文长度)大小的Tcp分段,当Tcp报文长度-Tcp头部长度>mss的时候拆包
如何避免粘包/拆包,获取完成数据报文?
- 使用带消息头的协议,头部写入包长度,然后读取指定长度
- 设置定长消息
- 设置消息边界
- 更复杂的协议:json、protobuf
代码实现:
Unpack/pack.go
package unpack
import (
"encoding/binary"
"errors"
"io"
)
const Msg_hander ="12345678"
// 编码
func Encode(bytesByffer io.Writer, content string) error {
// msg_handle + content_len + content
// 8 + 4 + len
if err := binary.Write(bytesByffer,binary.BigEndian,[]byte(Msg_hander)); err != nil { // 使用二进制编码
return err
}
clen := int32(len([]byte(content))) // 转化为32位,4字节的长度
if err := binary.Write(bytesByffer,binary.BigEndian,clen); err != nil {
return err
}
if err := binary.Write(bytesByffer,binary.BigEndian,[]byte(content)); err != nil {
return err
}
return nil
}
// 解码
func Decode(bytesBuffer io.Reader) (bodyBuf []byte, err error) {
MagicBuf := make([]byte,len(Msg_hander))
if _, err = io.ReadFull(bytesBuffer,MagicBuf); err != nil {
return nil ,err
}
if string(MagicBuf) != Msg_hander {
return nil , errors.New("msg_hander error")
}
lengthBuf := make([]byte, 4)
if _, err := io.ReadFull(bytesBuffer,lengthBuf); err != nil {
return nil , errors.New("length error")
}
length := binary.BigEndian.Uint32(lengthBuf)
bodyBuf = make([]byte, length)
if _, err = io.ReadFull(bytesBuffer,bodyBuf);err != nil {
return nil ,err
}
return bodyBuf, nil
}
client.go
func main() {
// 1、 连接服务器
conn, err := net.Dial("tcp", "0.0.0.0:9999")
if err != nil {
fmt.Println("connect failed")
return
}
defer conn.Close() // 先判断err再defer,避免conn为nil时Close发生panic
unpack.Encode(conn, "hello, world 000 !!!")
}
Server.go
func process(conn net.Conn) {
defer conn.Close()
for {
buf, err := unpack.Decode(conn)
if err != nil {
fmt.Printf("read from connect failed , err: %v\n", err)
break
}
str := string(buf)
fmt.Printf("receive from client, data: %v\n",str)
}
}
func main(){
// 1、监听端口
listern, err := net.Listen("tcp","0.0.0.0:9999")
if err != nil {
fmt.Printf("listen fail,err: %v\n",err)
return
}
// 2、建立套接字连接
for {
conn, err := listern.Accept()
if err != nil {
fmt.Printf("accept fail,err : %v\n", err)
continue
}
// 3、协程处理监听服务
go process(conn)
}
}
# Tcp拥塞控制
- 慢开始和拥塞避免
- 快速重传和快速恢复
# Udp连接通信
Client.go
func main() {
// 连接UDP服务器
conn, err := net.DialUDP("udp",nil, &net.UDPAddr{
IP: net.IPv4(127,0,0,1),
Port: 9909,
})
if err != nil {
fmt.Println("listen failed")
return
}
for i := 0; i <100; i++ {
_,err = conn.Write([]byte("hello server"))
if err != nil {
fmt.Printf("send data failed :%v", err)
return
}
result := make([]byte,1024)
n, remoteAddr, err := conn.ReadFromUDP(result)
if err != nil {
fmt.Printf("receive data failed %v",err)
return
}
fmt.Printf("recevice data: %v, addr: %v \n", string(result[:n]),remoteAddr)
}
}
Server.go
func main() {
// 监听服务器
listen, err := net.ListenUDP("udp",&net.UDPAddr{
IP: net.IPv4(0,0,0,0),
Port: 9909,
})
if err != nil {
fmt.Println("listen failed")
return
}
for {
var data [1024]byte
n, addr, err := listen.ReadFromUDP(data[:])
if err != nil {
fmt.Printf("read failed from addr: %s",addr)
break
}
go func() {
fmt.Printf("read failed from addr: %s ,data:%v, count:%v\n",addr, string(data[:n]),n)
_, err := listen.WriteToUDP([]byte("received sucess"),addr)
if err != nil {
fmt.Printf("write failed : %v",err)
}
}()
}
}
# Tcp连接通信
server端
package main
import (
"fmt"
"net"
)
func process(conn net.Conn) {
defer conn.Close() // 如果服务端不主动关闭,会怎么样?
for {
var buf [128]byte
n, err := conn.Read(buf[:])
if err != nil {
fmt.Printf("read from connect failed , err: %v\n", err)
break
}
str := string(buf[:n])
fmt.Printf("receive from client, data: %v\n",str)
}
}
func main(){
// 1、监听端口
listern, err := net.Listen("tcp","0.0.0.0:9999")
if err != nil {
fmt.Printf("listen fail,err: %v\n",err)
return
}
// 2、建立套接字连接
for {
conn, err := listern.Accept()
if err != nil {
fmt.Printf("accept fail,err : %v\n", err)
continue
}
// 3、协程处理监听服务
go process(conn)
}
}
client端
package main
import (
"bufio"
"fmt"
"net"
"os"
"strings"
)
func main() {
// 1、 连接服务器
conn, err := net.Dial("tcp", "0.0.0.0:9999")
if err != nil {
fmt.Println("connect failed")
return
}
defer conn.Close() // 思考:如果客户端不主动关闭连接,该连接会一直保持并占用资源
// 2、读取命令行输入
inputReader := bufio.NewReader(os.Stdin)
for {
input, err := inputReader.ReadString('\n')
if err != nil {
fmt.Printf("read from console failed")
break
}
// 3、输入Q则停止
trimmedInput := strings.TrimSpace(input)
if trimmedInput == "Q" {
break
}
// 4、发送消息
_, err = conn.Write([]byte(trimmedInput))
if err != nil {
fmt.Printf("write failed, err : %s\n",err)
break
}
}
}
注意:如果服务端没有主动关闭连接的动作,那么client端将处于"FIN_WAIT_2"状态、服务端处于"CLOSE_WAIT"状态(此时服务端处理该连接的协程已经退出)
# Http服务通信
server端
package main
import (
"log"
"net/http"
"time"
)
var (
Addr = ":1212"
)
func sayBye(w http.ResponseWriter, r *http.Request) {
time.Sleep(1 * time.Second)
w.Write([]byte("bye bye"))
}
func main() {
// 创建路由器
mux := http.NewServeMux()
// 设置路由规则
mux.HandleFunc("/bye",sayBye)
// 创建服务器
server := &http.Server{
Addr: Addr,
WriteTimeout: time.Second*3,
Handler: mux,
}
// 监听端口并提供服务
log.Println("Starting httpServer at ", Addr)
log.Fatal(server.ListenAndServe())
}
client端
package main
import (
"fmt"
"io/ioutil"
"net"
"net/http"
"time"
)
func main() {
// 创建连接池
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: 30*time.Second, // 连接超时
KeepAlive: 30*time.Second, // 长连接超时时间
}).DialContext,
MaxIdleConns:100, // 最大空闲连接
IdleConnTimeout:90*time.Second, // 空闲超时时间
TLSHandshakeTimeout:10*time.Second, // tls握手超时时间
ExpectContinueTimeout:1*time.Second, // 100-continue状态码超时时间
}
// 创建客户端
client := &http.Client{
Timeout: time.Second * 30,
Transport:transport,
}
// 请求数据
resp, err := client.Get("http://127.0.0.1:1212/bye")
if err != nil {
panic(err)
}
defer resp.Body.Close() // 不释放会独占内存
// 读取数据
bas, err := ioutil.ReadAll(resp.Body)
if err != nil {
panic(err)
}
fmt.Println(string(bas))
}
# 网络代理
- 用户通过代理请求信息
- 请求通过网络代理完成转发到达目标服务器
- 目标服务器响应后通过网络代理回传给用户
网络转发: 客户端ip ——> 经过路由转发 ——> 目标服务器
- 也就是说网络转发是路由器对报文的转发操作,中间可能对数据包进行修改
网络代理: 客户端(配置了代理服务器的ip)——> 代理服务器 ——> 转发请求到后端服务
- 用户不直接连接服务器,网络代理去连接,获取数据后返回给用户
# 正向代理
是一种客户端的代理技术,帮助客户端访问无法访问的服务资源,可以隐藏用户的真实IP,比如浏览器web代理,VPN等
实现一个web浏览器代理:
代理接受客户端请求,复制原请求对象,并根据数据配置新请求各种参数
把新请求发送到真实服务端,并接收到服务端返回
代理服务器对响应做一些处理,然后返回给客户端
用户请求——>代理服务器监听——>交给上游TCP连接——>回调方法——>拷贝请求数据——>请求下游服务Transport RoundTrip——>回写上游数据
代码实现:
package main

import (
    "fmt"
    "io"
    "net"
    "net/http"
    "strings"
)

type Pxy struct{}

func (p *Pxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
    fmt.Printf("Received request %s %s %s\n", req.Method, req.Host, req.RemoteAddr)
    transport := http.DefaultTransport // 数据连接池
    // step1, 浅拷贝对象,然后再新增属性数据,因为浅拷贝可以保存新增属性
    outReq := new(http.Request)
    *outReq = *req
    if clienIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
        if prior, ok := outReq.Header["X-Forwarded-For"]; ok { // X-Forwarded-For 是一个代理IP的链表
            clienIP = strings.Join(prior, ", ") + ", " + clienIP
        }
        outReq.Header.Set("X-Forwarded-For", clienIP)
    }
    // step2, 请求下游
    res, err := transport.RoundTrip(outReq) // 获取到下游的响应数据
    if err != nil {
        rw.WriteHeader(http.StatusBadGateway)
        return
    }
    // step3, 下游响应内容返回给上游
    for key, value := range res.Header {
        for _, v := range value {
            rw.Header().Add(key, v) // 头写入
        }
    }
    //rw.Header().Add("xxx","sss") // 可以给每个请求进行修改
    rw.WriteHeader(res.StatusCode)
    io.Copy(rw, res.Body) // 数据写入
    res.Body.Close()
}

func main() {
    fmt.Println("Serve on :8080")
    http.Handle("/", &Pxy{}) // Handle中的第二个参数必须实现ServeHTTP
    http.ListenAndServe("0.0.0.0:8080", nil)
}
服务启动后,打开浏览器,设置网络代理,设置好web网页代理到127.0.0.1,端口为8080,然后去访问一个http网址,如:www.tianya.cn
# 反向代理
是一种服务端的代理技术,帮助服务器做负载均衡、缓存、提供安全校验等,可以隐藏服务器真实IP,比如LVS技术、nginx proxy_pass等,所以我们可以在这做:负载、安全校验等功能
- 代理接收客户端请求,更改请求结构体信息
- 通过一定的负载均衡算法获取下游服务地址
- 把请求发送到下游服务器,并获取返回内容
- 对返回内容做一些处理,返回给客户端
简单的http代理服务实现:
real_server/main.go: 真实后端服务
package main

import (
    "fmt"
    "io"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
)

type RealServer struct {
    Addr string
}

func (r *RealServer) HelloHandler(w http.ResponseWriter, req *http.Request) {
    //127.0.0.1:8008/abc?sdsdsa=11
    //r.Addr=127.0.0.1:8008
    //req.URL.Path=/abc
    upath := fmt.Sprintf("http://%s%s\n", r.Addr, req.URL.Path)
    realIp := fmt.Sprintf("RemoteAddr=%s,X-Forwarded-For=%v,X-Real-IP=%v\n", req.RemoteAddr, req.Header.Get("X-Forwarded-For"), req.Header.Get("X-Real-Ip"))
    header := fmt.Sprintf("headers=%v\n", req.Header)
    io.WriteString(w, upath)
    io.WriteString(w, realIp)
    io.WriteString(w, header)
}

func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) {
    upath := "error handler"
    w.WriteHeader(500)
    io.WriteString(w, upath)
}

func (r *RealServer) TimeoutHandler(w http.ResponseWriter, req *http.Request) {
    time.Sleep(6 * time.Second)
    upath := "timeout handler"
    w.WriteHeader(200)
    io.WriteString(w, upath)
}

func (r *RealServer) Run() {
    log.Println("Starting httpServer at: ", r.Addr)
    mux := http.NewServeMux()
    mux.HandleFunc("/", r.HelloHandler)
    mux.HandleFunc("/base/error", r.ErrorHandler)
    mux.HandleFunc("/test_http_string/test_http_string/aaa", r.TimeoutHandler)
    server := &http.Server{
        Addr:         r.Addr,
        WriteTimeout: time.Second * 3,
        Handler:      mux,
    }
    go func() {
        log.Fatal(server.ListenAndServe())
    }()
}

func main() {
    rs1 := &RealServer{Addr: "127.0.0.1:2003"}
    rs1.Run()
    rs2 := &RealServer{Addr: "127.0.0.1:2004"}
    rs2.Run()
    quit := make(chan os.Signal)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    fmt.Println("Got signal:", <-quit)
}
reverse_proxy_base/main.go: 反向代理服务
package main

import (
    "bufio"
    "log"
    "net/http"
    "net/url"
)

var (
    proxy_addr = "http://127.0.0.1:2003"
    port       = "2002"
)

func handler(w http.ResponseWriter, r *http.Request) {
    // step1 解析代理地址,并更改请求的协议和主机
    proxy, err := url.Parse(proxy_addr)
    if err != nil {
        log.Println(err)
        return
    }
    r.URL.Scheme = proxy.Scheme // 协议
    r.URL.Host = proxy.Host
    // step2 请求下游服务
    transport := http.DefaultTransport
    resp, err := transport.RoundTrip(r)
    if err != nil {
        log.Println(err)
        return
    }
    // step3 把下游响应内容返回给上游
    for k, vv := range resp.Header {
        for _, v := range vv {
            w.Header().Add(k, v)
        }
    }
    defer resp.Body.Close()
    bufio.NewReader(resp.Body).WriteTo(w) // 将拿到的下游响应写入
}

func main() {
    http.HandleFunc("/", handler)
    log.Println("Start serving on port: ", port)
    err := http.ListenAndServe(":"+port, nil)
    if err != nil {
        log.Fatal(err)
    }
}
上面实现了2002代理2003端口的服务
# HTTP代理
之前的是实现了简单的网络代理,其实我们可以在此基础上实现更为复杂的其他功能:
- 错误回调、错误日志处理
- 更改代理返回内容
- 负载均衡
- url重写
- 限流、熔断、降级
- 数据统计、权限验证等
我们以后会使用ReverseProxy来实现如上功能:
- 4种负载轮询类型的实现和接口封装
- 拓展中间件支持:限流、熔断、权限、统计等
# ReverseProxy结构
type ReverseProxy struct {
// Director must be a function which modifies
// the request into a new request to be sent
// using Transport. Its response is then copied
// back to the original client unmodified.
// Director must not access the provided Request
// after returning.
Director func(*http.Request) // 控制器,一个函数,可以对请求内容进行修改,如请求参数
// The transport used to perform proxy requests.
// If nil, http.DefaultTransport is used.
Transport http.RoundTripper // 连接池,默认是DefaultTransport
// FlushInterval specifies the flush interval
// to flush to the client while copying the
// response body.
// If zero, no periodic flushing is done.
// A negative value means to flush immediately
// after each write to the client.
// The FlushInterval is ignored when ReverseProxy
// recognizes a response as a streaming response;
// for such responses, writes are flushed to the client
// immediately.
FlushInterval time.Duration // 向客户端刷新(flush)响应数据的时间间隔
// ErrorLog specifies an optional logger for errors
// that occur when attempting to proxy the request.
// If nil, logging is done via the log package's standard logger.
ErrorLog *log.Logger // 错误记录器
// BufferPool optionally specifies a buffer pool to
// get byte slices for use by io.CopyBuffer when
// copying HTTP response bodies.
BufferPool BufferPool // 定义一个缓冲池,在复制http响应时使用,提高请求效率
// ModifyResponse is an optional function that modifies the
// Response from the backend. It is called if the backend
// returns a response at all, with any HTTP status code.
// If the backend is unreachable, the optional ErrorHandler is
// called without any call to ModifyResponse.
//
// If ModifyResponse returns an error, ErrorHandler is called
// with its error value. If ErrorHandler is nil, its default
// implementation is used.
ModifyResponse func(*http.Response) error // 修改response的函数,修改响应内容,如果有error会触发下面的ErrorHandler
// ErrorHandler is an optional function that handles errors
// reaching the backend or errors from ModifyResponse.
//
// If nil, the default is to log the provided error and return
// a 502 Status Bad Gateway response.
ErrorHandler func(http.ResponseWriter, *http.Request, error) // 错误处理回调函数,如果为nil,遇到错误返回502
}
# 重写URL
简单使用ReverseProxy来实现反向代理
package main
import (
"log"
"net/http"
"net/http/httputil"
"net/url"
)
var add = "127.0.0.1:2002"
func main() {
// 需要代理的下游服务url
rs1 := "http://127.0.0.1:2003/base" // 实现了重写url的规则
//127.0.0.1:2002/xxx
//127.0.0.1:2003/base/xxx
url1,err1 := url.Parse(rs1) // 解析url
if err1 != nil{
log.Println(err1)
}
proxy := httputil.NewSingleHostReverseProxy(url1) // 传入url后,内部实现了一个director,并注册到ReverseProxy返回
log.Println("Starting httpServer at: "+add)
log.Fatal(http.ListenAndServe(add,proxy)) // proxy中实现了Handler接口中的ServeHTTP
}
# 更改内容
reverse_proxy_step/main.go:重写下游返回的响应体,2002代理2003的服务
package main
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/http/httputil"
"net/url"
"regexp"
"strings"
)
var addr = "127.0.0.1:2002"
func main() {
//127.0.0.1:2002/xxx
//127.0.0.1:2003/base/xxx
rs1 := "http://127.0.0.1:2003/base"
url1, err1 := url.Parse(rs1)
if err1 != nil {
log.Println(err1)
}
proxy := NewSingleHostReverseProxy(url1)
log.Println("Starting httpserver at " + addr)
log.Fatal(http.ListenAndServe(addr, proxy))
}
func NewSingleHostReverseProxy(target *url.URL) *httputil.ReverseProxy {
//http://127.0.0.1:2002/dir?name=123
//RawQuery: name=123
//Scheme: http
//Host: 127.0.0.1:2002
targetQuery := target.RawQuery
// 请求体参数修改
director := func(req *http.Request) {
//url_rewrite
//127.0.0.1:2002/dir/abc ==> 127.0.0.1:2003/base/abc ??
//127.0.0.1:2002/dir/abc ==> 127.0.0.1:2002/abc
//127.0.0.1:2002/abc ==> 127.0.0.1:2003/base/abc
re, _ := regexp.Compile("^/dir(.*)");
req.URL.Path = re.ReplaceAllString(req.URL.Path, "$1") // 用正则去掉/dir前缀,剩余部分作为新的path
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
//target.Path : /base
//req.URL.Path : /dir
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
if _, ok := req.Header["User-Agent"]; !ok {
req.Header.Set("User-Agent", "")
}
}
// 定义返回响应的函数
modifyFunc := func(res *http.Response) error {
// 错误码时返回
if res.StatusCode != 200 {
return errors.New("error statusCode")
}
// 正常情况下直接修改返回体
oldPayload, err := ioutil.ReadAll(res.Body) // 拿到返回内容
if err != nil {
return err
}
newPayLoad := []byte("hello " + string(oldPayload)) // 追加新内容
res.Body = ioutil.NopCloser(bytes.NewBuffer(newPayLoad)) // NopCloser用一个无操作的Close方法包装并返回ReadCloser,因为res.Body是一个io.ReadCloser
res.ContentLength = int64(len(newPayLoad)) // 修改响应长度
res.Header.Set("Content-Length", fmt.Sprint(len(newPayLoad)))
return nil
}
// 如果modifyFunc返回error,则触发这个错误回调
errorHandler := func(res http.ResponseWriter, req *http.Request, err error) {
res.Write([]byte(err.Error()))
}
return &httputil.ReverseProxy{Director: director, ModifyResponse: modifyFunc, ErrorHandler: errorHandler}
}
func singleJoiningSlash(a, b string) string {
// URL路径拼接
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
# 负载均衡策略
随机负载
随机挑选目标服务器IP
轮询负载
ABC三台服务器,ABCABC依次轮询
加权负载
给目标设置访问权重,按照权重轮询
一致性hash负载
请求固定URL访问指定IP
# 随机负载
random.go
package load_balance

import (
    "errors"
    "math/rand"
)

type RandomBalance struct {
    currIndex int
    rss       []string // 观察主体
}

// 添加元素
func (r *RandomBalance) Add(params ...string) error {
    if len(params) == 0 {
        return errors.New("param len 1 at least")
    }
    add := params[0]
    r.rss = append(r.rss, add)
    return nil
}

// 随机取得一个地址
func (r *RandomBalance) Next() string {
    if len(r.rss) == 0 {
        return ""
    }
    r.currIndex = rand.Intn(len(r.rss))
    return r.rss[r.currIndex]
}

func (r *RandomBalance) Get(key string) (string, error) {
    return r.Next(), nil
}
random_test.go
package load_balance

import (
    "fmt"
    "testing"
)

func TestRandomBalance(t *testing.T) {
    rb := &RandomBalance{}
    rb.Add("127.0.0.1:2003") //0
    rb.Add("127.0.0.1:2004") //1
    rb.Add("127.0.0.1:2005") //2
    rb.Add("127.0.0.1:2006") //3
    rb.Add("127.0.0.1:2007") //4
    for i := 0; i < 9; i++ {
        fmt.Println(rb.Next())
    }
}
# 轮询负载
round_robin.go
package load_balance

import "errors"

type RoundRobinBalance struct {
    curIndex int
    rss      []string //观察主体
}

func (r *RoundRobinBalance) Add(params ...string) error {
    if len(params) == 0 {
        return errors.New("param len 1 at least")
    }
    addr := params[0]
    r.rss = append(r.rss, addr)
    return nil
}

func (r *RoundRobinBalance) Next() string {
    if len(r.rss) == 0 {
        return ""
    }
    lens := len(r.rss) //5
    if r.curIndex >= lens {
        r.curIndex = 0
    }
    curAddr := r.rss[r.curIndex]
    r.curIndex = (r.curIndex + 1) % lens
    return curAddr
}

// Get 为满足后文 LoadBalance 接口补充的方法,直接返回下一个地址
func (r *RoundRobinBalance) Get(key string) (string, error) {
    return r.Next(), nil
}
Robin_test.go
package load_balance

import (
    "fmt"
    "testing"
)

func Test_main(t *testing.T) {
    rb := &RoundRobinBalance{}
    rb.Add("127.0.0.1:2003") //0
    rb.Add("127.0.0.1:2004") //1
    rb.Add("127.0.0.1:2005") //2
    rb.Add("127.0.0.1:2006") //3
    rb.Add("127.0.0.1:2007") //4
    for i := 0; i < 9; i++ {
        fmt.Println(rb.Next())
    }
}
# 加权轮询负载
有四个概念:
1、weight:初始化时对节点约定的权重
2、currentWeight:节点临时权重,每轮都会变化
3、effectWeight:节点有效权重,默认与weight相同
4、totalWeight:所有节点有效权重之和 sum(effectWeight)

忽略有效权重的动态调整后,算法可以简化为:
1、每轮对每个节点执行 currentWeight = currentWeight + effectWeight
2、选中 currentWeight 最大的节点
3、被选中的节点执行 currentWeight = currentWeight - totalWeight
以 serverA、serverB、serverC 的权重分别为 4、3、2(totalWeight=9)为例:

| 请求次数 | 本轮累加后的currentWeight | 选中节点 | 选中后的currentWeight |
| --- | --- | --- | --- |
| 1 | serverA=4, serverB=3, serverC=2 | serverA | serverA=-5, serverB=3, serverC=2 |
| 2 | serverA=-1, serverB=6, serverC=4 | serverB | serverA=-1, serverB=-3, serverC=4 |
| 3 | serverA=3, serverB=0, serverC=6 | serverC | serverA=3, serverB=0, serverC=-3 |
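后文工厂模式的 factory.go 中引用了 WeightRoundRobinBalance,但笔记里没有给出它的实现。下面按照上面的算法补充一个最小实现示意(结构体名、字段名为笔者自拟,Add/Next/Get 接口与其他负载器保持一致,仅供参考):
package load_balance

import (
    "errors"
    "strconv"
)

type WeightNode struct {
    addr            string
    weight          int // 初始化时约定的权重
    currentWeight   int // 节点临时权重,每轮选择后变化
    effectiveWeight int // 有效权重,默认等于weight
}

type WeightRoundRobinBalance struct {
    rss []*WeightNode
}

// Add 第一个参数为地址,第二个参数为权重
func (r *WeightRoundRobinBalance) Add(params ...string) error {
    if len(params) != 2 {
        return errors.New("param len need 2")
    }
    parInt, err := strconv.ParseInt(params[1], 10, 64)
    if err != nil {
        return err
    }
    node := &WeightNode{addr: params[0], weight: int(parInt)}
    node.effectiveWeight = node.weight
    r.rss = append(r.rss, node)
    return nil
}

func (r *WeightRoundRobinBalance) Next() string {
    var best *WeightNode
    total := 0
    for _, w := range r.rss {
        // step1: 每个节点 currentWeight += effectiveWeight,并累加totalWeight
        total += w.effectiveWeight
        w.currentWeight += w.effectiveWeight
        // step2: 选中currentWeight最大的节点
        if best == nil || w.currentWeight > best.currentWeight {
            best = w
        }
    }
    if best == nil {
        return ""
    }
    // step3: 被选中的节点 currentWeight -= totalWeight
    best.currentWeight -= total
    return best.addr
}

func (r *WeightRoundRobinBalance) Get(key string) (string, error) {
    return r.Next(), nil
}
其中 effectiveWeight 字段预留出来,可以配合后文错误回调中的 todo(节点出错时调整权重)做动态降权。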
# 一致性hash负载
hash_robin.go
package load_balance

import (
    "errors"
    "hash/crc32"
    "sort"
    "strconv"
    "sync"
)

type Hash func(data []byte) uint32

type UInt32Slice []uint32

func (s UInt32Slice) Len() int { return len(s) }

func (s UInt32Slice) Less(i, j int) bool { return s[i] < s[j] }

func (s UInt32Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

type ConsistentHashBanlance struct {
    mux      sync.RWMutex
    hash     Hash
    replicas int               //复制因子
    keys     UInt32Slice       //已排序的节点hash切片
    hashMap  map[uint32]string //节点哈希和Key的map,键是hash值,值是节点key
    //观察主体
}

func NewConsistentHashBanlance(replicas int, fn Hash) *ConsistentHashBanlance {
    m := &ConsistentHashBanlance{
        replicas: replicas,
        hash:     fn,
        hashMap:  make(map[uint32]string),
    }
    if m.hash == nil {
        //最多32位,保证是一个2^32-1环
        m.hash = crc32.ChecksumIEEE
    }
    return m
}

// 验证是否为空
func (c *ConsistentHashBanlance) IsEmpty() bool {
    return len(c.keys) == 0
}

// Add 方法用来添加缓存节点,参数为节点key,比如使用IP
func (c *ConsistentHashBanlance) Add(params ...string) error {
    if len(params) == 0 {
        return errors.New("param len 1 at least")
    }
    addr := params[0]
    c.mux.Lock()
    defer c.mux.Unlock()
    // 结合复制因子计算所有虚拟节点的hash值,并存入m.keys中,同时在m.hashMap中保存哈希值和key的映射
    for i := 0; i < c.replicas; i++ {
        hash := c.hash([]byte(strconv.Itoa(i) + addr))
        c.keys = append(c.keys, hash)
        c.hashMap[hash] = addr
    }
    // 对所有虚拟节点的哈希值进行排序,方便之后进行二分查找
    sort.Sort(c.keys)
    return nil
}

// Get 方法根据给定的对象获取最靠近它的那个节点
func (c *ConsistentHashBanlance) Get(key string) (string, error) {
    if c.IsEmpty() {
        return "", errors.New("node is empty")
    }
    hash := c.hash([]byte(key))
    // 通过二分查找获取最优节点,第一个"服务器hash"值大于"数据hash"值的就是最优"服务器节点"
    idx := sort.Search(len(c.keys), func(i int) bool { return c.keys[i] >= hash })
    // 如果查找结果 大于 服务器节点哈希数组的最大索引,表示此时该对象哈希值位于最后一个节点之后,那么放入第一个节点中
    if idx == len(c.keys) {
        idx = 0
    }
    c.mux.RLock()
    defer c.mux.RUnlock()
    return c.hashMap[c.keys[idx]], nil
}
hash_test.go
package load_balance

import (
    "fmt"
    "testing"
)

func TestNewConsistentHashBanlance(t *testing.T) {
    rb := NewConsistentHashBanlance(10, nil)
    rb.Add("127.0.0.1:2003") //0
    rb.Add("127.0.0.1:2004") //1
    rb.Add("127.0.0.1:2005") //2
    rb.Add("127.0.0.1:2006") //3
    rb.Add("127.0.0.1:2007") //4
    //url hash
    fmt.Println(rb.Get("http://127.0.0.1:2002/base/getinfo"))
    fmt.Println(rb.Get("http://127.0.0.1:2002/base/error"))
    fmt.Println(rb.Get("http://127.0.0.1:2002/base/getinfo"))
    fmt.Println(rb.Get("http://127.0.0.1:2002/base/changepwd"))
    //ip hash
    fmt.Println(rb.Get("127.0.0.1"))
    fmt.Println(rb.Get("192.168.0.1"))
    fmt.Println(rb.Get("127.0.0.1"))
}
# 负载工厂模式
factory.go
package load_balance

type LbType int

const (
    LbRandom LbType = iota
    LbRoundRobin
    LbWeightRoundRobin
    LbConsistentHash
)

func LoadBanlanceFactory(lbType LbType) LoadBalance {
    switch lbType {
    case LbRandom:
        return &RandomBalance{}
    case LbConsistentHash:
        return NewConsistentHashBanlance(10, nil)
    case LbRoundRobin:
        return &RoundRobinBalance{}
    case LbWeightRoundRobin:
        return &WeightRoundRobinBalance{}
    default:
        return &RandomBalance{}
    }
}
interface.go
package load_balance

type LoadBalance interface {
    Add(...string) error
    Get(string) (string, error)
}
factory_test.go
package load_balance import ( "bytes" "io/ioutil" "log" "net" "net/http" "net/http/httputil" "net/url" "strconv" "strings" "testing" "time" ) var ( addr = "127.0.0.1:2002" transport = &http.Transport{ DialContext: (&net.Dialer{ Timeout: 30 * time.Second, //连接超时 KeepAlive: 30 * time.Second, //长连接超时时间 }).DialContext, MaxIdleConns: 100, //最大空闲连接 IdleConnTimeout: 90 * time.Second, //空闲超时时间 TLSHandshakeTimeout: 10 * time.Second, //tls握手超时时间 ExpectContinueTimeout: 1 * time.Second, //100-continue状态码超时时间 } ) func NewMultipleHostsReverseProxy(lb LoadBalance) *httputil.ReverseProxy { //请求协调者 director := func(req *http.Request) { nextAddr, err := lb.Get(req.URL.String()) // 通过URL的变化来负载分配,也可以根据RemoteIP来 if err != nil { log.Fatal("get next addr fail") } target, err := url.Parse(nextAddr) if err != nil { log.Fatal(err) } targetQuery := target.RawQuery req.URL.Scheme = target.Scheme req.URL.Host = target.Host req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path) if targetQuery == "" || req.URL.RawQuery == "" { req.URL.RawQuery = targetQuery + req.URL.RawQuery } else { req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery } if _, ok := req.Header["User-Agent"]; !ok { req.Header.Set("User-Agent", "user-agent") } } //更改内容 modifyFunc := func(resp *http.Response) error { //请求以下命令:curl 'http://127.0.0.1:2002/error' if resp.StatusCode != 200 { //获取内容 oldPayload, err := ioutil.ReadAll(resp.Body) if err != nil { return err } //追加内容 newPayload := []byte("StatusCode error:" + string(oldPayload)) resp.Body = ioutil.NopCloser(bytes.NewBuffer(newPayload)) resp.ContentLength = int64(len(newPayload)) resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(newPayload)), 10)) } return nil } //错误回调 :关闭real_server时测试,错误回调 //范围:transport.RoundTrip发生的错误、以及ModifyResponse发生的错误 errFunc := func(w http.ResponseWriter, r *http.Request, err error) { //todo 如果是权重的负载则调整临时权重 http.Error(w, "ErrorHandler error:"+err.Error(), 500) } return &httputil.ReverseProxy{Director: director, Transport: transport, ModifyResponse: modifyFunc, ErrorHandler: errFunc} } func singleJoiningSlash(a, b string) string { aslash := strings.HasSuffix(a, "/") bslash := strings.HasPrefix(b, "/") switch { case aslash && bslash: return a + b[1:] case !aslash && !bslash: return a + "/" + b } return a + b } func TestFactory(t *testing.T) { rb := LoadBanlanceFactory(LbConsistentHash) if err := rb.Add("http://127.0.0.1:2003/base", "10"); err != nil { log.Println(err) } if err := rb.Add("http://127.0.0.1:2004/base", "20"); err != nil { log.Println(err) } proxy := NewMultipleHostsReverseProxy(rb) log.Println("Starting httpserver at " + addr) log.Fatal(http.ListenAndServe(addr, proxy)) }
# 中间件支持
# 中间件的意义
- 避免程序逻辑复杂
- 提高复用,隔离业务,比如权限认证我们可以单独抽离出来,通过中间件隔离业务
- 调用清晰,组合随意
可以把中间件的原理理解成"洋葱":请求一层一层地进去,响应再一层一层地出来。
- 中间件代码实现
- 中间件一般都封装在路由上,路由是URL请求分发的管理器
- 中间件选型:
- 基于链表构建中间件(缺点,实现复杂,调用不方便)
- 使用数组构建中间件,控制灵活方便,推荐使用
- 方法组装
- 构建中间件URL路由
- 构建URL的中间件方法数组
- 使用use方法整合路由和方法数组
# 开发一个中间件
middleware/slice_router.go
package middleware import ( "context" "math" "net/http" "strings" ) //目标定位是 tcp、http通用的中间件 //知其然也知其所以然 const abortIndex int8 = math.MaxInt8 / 2 //最多 63 个中间件 type HandlerFunc func(*SliceRouterContext) // router 结构体 type SliceRouter struct { groups []*SliceGroup } // group 结构体 type SliceGroup struct { *SliceRouter path string handlers []HandlerFunc } // router上下文 type SliceRouterContext struct { Rw http.ResponseWriter Req *http.Request Ctx context.Context *SliceGroup index int8 } func newSliceRouterContext(rw http.ResponseWriter, req *http.Request, r *SliceRouter) *SliceRouterContext { newSliceGroup := &SliceGroup{} //最长url前缀匹配 matchUrlLen := 0 for _, group := range r.groups { //fmt.Println("req.RequestURI") //fmt.Println(req.RequestURI) if strings.HasPrefix(req.RequestURI, group.path) { pathLen := len(group.path) if pathLen > matchUrlLen { matchUrlLen = pathLen *newSliceGroup = *group //浅拷贝数组指针 } } } c := &SliceRouterContext{Rw: rw, Req: req, SliceGroup: newSliceGroup, Ctx: req.Context()} c.Reset() return c } func (c *SliceRouterContext) Get(key interface{}) interface{} { return c.Ctx.Value(key) } func (c *SliceRouterContext) Set(key, val interface{}) { c.Ctx = context.WithValue(c.Ctx, key, val) } type SliceRouterHandler struct { coreFunc func(*SliceRouterContext) http.Handler router *SliceRouter } func (w *SliceRouterHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { c := newSliceRouterContext(rw, req, w.router) if w.coreFunc != nil { c.handlers = append(c.handlers, func(c *SliceRouterContext) { w.coreFunc(c).ServeHTTP(rw, req) }) } c.Reset() c.Next() } func NewSliceRouterHandler(coreFunc func(*SliceRouterContext) http.Handler, router *SliceRouter) *SliceRouterHandler { return &SliceRouterHandler{ coreFunc: coreFunc, router: router, } } // 构造 router func NewSliceRouter() *SliceRouter { return &SliceRouter{} } // 创建 Group func (g *SliceRouter) Group(path string) *SliceGroup { return &SliceGroup{ SliceRouter: g, path: path, } } // 构造回调方法 func (g *SliceGroup) Use(middlewares ...HandlerFunc) *SliceGroup { g.handlers = append(g.handlers, middlewares...) existsFlag := false for _, oldGroup := range g.SliceRouter.groups { if oldGroup == g { existsFlag = true } } if !existsFlag { g.SliceRouter.groups = append(g.SliceRouter.groups, g) } return g } // 从最先加入中间件开始回调 func (c *SliceRouterContext) Next() { c.index++ for c.index < int8(len(c.handlers)) { //fmt.Println("c.index") //fmt.Println(c.index) c.handlers[c.index](c) c.index++ } } // 跳出中间件方法 func (c *SliceRouterContext) Abort() { c.index = abortIndex } // 是否跳过了回调 func (c *SliceRouterContext) IsAborted() bool { return c.index >= abortIndex } // 重置回调 func (c *SliceRouterContext) Reset() { c.index = -1 }
middleware/proxy.go
package middleware import ( "bytes" "compress/gzip" "fmt" "io/ioutil" "math/rand" "net" "net/http" "net/http/httputil" "net/url" "strconv" "strings" "time" ) var transport = &http.Transport{ DialContext: (&net.Dialer{ Timeout: 30 * time.Second, //连接超时 KeepAlive: 30 * time.Second, //长连接超时时间 }).DialContext, MaxIdleConns: 100, //最大空闲连接 IdleConnTimeout: 90 * time.Second, //空闲超时时间 TLSHandshakeTimeout: 10 * time.Second, //tls握手超时时间 ExpectContinueTimeout: 1 * time.Second, //100-continue超时时间 } func NewMultipleHostsReverseProxy(c *SliceRouterContext, targets []*url.URL) *httputil.ReverseProxy { //请求协调者 director := func(req *http.Request) { targetIndex := rand.Intn(len(targets)) target := targets[targetIndex] targetQuery := target.RawQuery req.URL.Scheme = target.Scheme req.URL.Host = target.Host req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path) //todo 当对域名(非内网)反向代理时需要设置此项, 当作后端反向代理时不需要 req.Host = target.Host if targetQuery == "" || req.URL.RawQuery == "" { req.URL.RawQuery = targetQuery + req.URL.RawQuery } else { req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery } if _, ok := req.Header["User-Agent"]; !ok { req.Header.Set("User-Agent", "user-agent") } } //更改内容 modifyFunc := func(resp *http.Response) error { //todo 部分章节功能补充2 //todo 兼容websocket if strings.Contains(resp.Header.Get("Connection"), "Upgrade") { return nil } var payload []byte var readErr error //todo 部分章节功能补充3 //todo 兼容gzip压缩 if strings.Contains(resp.Header.Get("Content-Encoding"), "gzip") { gr, err := gzip.NewReader(resp.Body) if err != nil { return err } payload, readErr = ioutil.ReadAll(gr) resp.Header.Del("Content-Encoding") } else { payload, readErr = ioutil.ReadAll(resp.Body) } if readErr != nil { return readErr } //异常请求时设置StatusCode if resp.StatusCode != 200 { payload = []byte("StatusCode error:" + string(payload)) } //todo 部分章节功能补充4 //todo 因为预读了数据所以内容重新回写 c.Set("status_code", resp.StatusCode) c.Set("payload", payload) resp.Body = ioutil.NopCloser(bytes.NewBuffer(payload)) resp.ContentLength = int64(len(payload)) resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(payload)), 10)) return nil } //错误回调 :关闭real_server时测试,错误回调 //范围:transport.RoundTrip发生的错误、以及ModifyResponse发生的错误 errFunc := func(w http.ResponseWriter, r *http.Request, err error) { //todo record error log fmt.Println(err) } return &httputil.ReverseProxy{Director: director, Transport: transport, ModifyResponse: modifyFunc, ErrorHandler: errFunc} } func singleJoiningSlash(a, b string) string { aslash := strings.HasSuffix(a, "/") bslash := strings.HasPrefix(b, "/") switch { case aslash && bslash: return a + b[1:] case !aslash && !bslash: return a + "/" + b } return a + b }
middleware/rancelog.go
package middleware

import (
    "log"
)

func TraceLogSliceMW() func(c *SliceRouterContext) {
    return func(c *SliceRouterContext) {
        log.Println("trace_in")
        c.Next()
        log.Println("trace_out")
    }
}
slice_test.go
var addr = "127.0.0.1:2002" func TestSliceRouter(t *testing.T) { reverseProxy := func(c *SliceRouterContext) http.Handler { rs1 := "http://127.0.0.1:2003/base" url1, err1 := url.Parse(rs1) if err1 != nil { log.Println(err1) } rs2 := "http://127.0.0.1:2004/base" url2, err2 := url.Parse(rs2) if err2 != nil { log.Println(err2) } urls := []*url.URL{url1, url2} return NewMultipleHostsReverseProxy(c,urls) } log.Println("Starting httpserver at " + addr) //初始化方法数组路由器 sliceRouter := NewSliceRouter() //中间件可充当业务逻辑代码 sliceRouter.Group("/base").Use(TraceLogSliceMW(), func(c *SliceRouterContext) { c.Rw.Write([]byte("test func")) }) //请求到反向代理 sliceRouter.Group("/").Use(TraceLogSliceMW(), func(c *SliceRouterContext) { fmt.Println("reverseProxy") reverseProxy(c).ServeHTTP(c.Rw, c.Req) }) routerHandler := NewSliceRouterHandler(nil, sliceRouter) log.Fatal(http.ListenAndServe(addr, routerHandler)) }
# 限流
高并发系统的三大利器:缓存、降级、限流
- 缓存:提升系统访问速度和增大处理容量,为相应业务增加缓存
- 降级:当服务器压力剧增时,根据业务策略降级,以及释放服务资源保证业务正常
- 限流:通过对并发限速,以达到拒绝服务、排队或等待、降级处理
# 限流的分类:
- 漏桶限流:每次请求时计算桶流量,超过阀值则降级请求
- 令牌桶限流:每次请求时从令牌桶取令牌,取不到则降级请求
下面我们使用 golang.org/x/time/rate 限速器来实现限流,它基于令牌桶算法:
1、rate.NewLimiter(limit,burst) limit表示每秒产生token数,burst表示桶中最多存放多少token
2、Allow 判断当前是否可以取到token
3、Wait阻塞等待直到取到token
4、Reserve返回等待时间,再去取token
原理解析
1、计算上次请求和当前请求的时间差
2、计算时间差内生成的token数+旧token数
3、如果token为负数,计算等待时间
4、token为正,则请求后token-1
代码实现
import (
"context"
"golang.org/x/time/rate"
"log"
"testing"
"time"
)
func Test_RateLimiter(t *testing.T) {
l := rate.NewLimiter(1, 6) // 每秒1个token,最多存放6个
log.Println(l.Limit(), l.Burst())
for i := 0; i < 10; i++ {
//阻塞等待直到,取到一个token
log.Println("before Wait")
c, _ := context.WithTimeout(context.Background(), time.Second*2)
if err := l.Wait(c); err != nil {
log.Println("limiter wait err:" + err.Error())
}
log.Println("after Wait")
//返回需要等待多久才有新的token,这样就可以等待指定时间执行任务
r := l.Reserve()
log.Println("reserve Delay:", r.Delay())
//判断当前是否可以取到token
a := l.Allow()
log.Println("Allow:", a)
}
}
# 网关集成限流功能
实现一个可以注册到中间件的限流器,代码如下:
func RateLimiter() func(c *middleware.SliceRouterContext) {
l := rate.NewLimiter(1, 2)
return func(c *middleware.SliceRouterContext) {
if !l.Allow() {
c.Rw.Write([]byte(fmt.Sprintf("rate limit:%v,%v", l.Limit(), l.Burst())))
c.Abort()
return
}
c.Next()
}
}
写一个测试来实现代理限流
var addr = "127.0.0.1:2002"
// 限流方案
func TestRater(t *testing.T) {
coreFunc := func(c *middleware.SliceRouterContext) http.Handler { // 创建一个核心方法
rs1 := "http://127.0.0.1:2003/base"
url1, err1 := url.Parse(rs1)
if err1 != nil {
log.Println(err1)
}
rs2 := "http://127.0.0.1:2004/base"
url2, err2 := url.Parse(rs2)
if err2 != nil {
log.Println(err2)
}
urls := []*url.URL{url1, url2}
return middleware.NewMultipleHostsReverseProxy(c, urls)
}
log.Println("Starting httpserver at " + addr)
sliceRouter := middleware.NewSliceRouter() // 创建一个我们自己封装的路由
sliceRouter.Group("/").Use(RateLimiter())
routerHandler := middleware.NewSliceRouterHandler(coreFunc, sliceRouter) // 传入的是核心方法handler,和一个路由
log.Fatal(http.ListenAndServe(addr, routerHandler))
}
# 熔断与降级
- 熔断:熔断器是当依赖的服务已经出现故障时,为了保证自身服务的正常运行不再访问依赖服务,防止雪崩效应(保险丝)
- 降级:当服务器压力剧增时,根据业务策略降级,以此释放服务资源保证业务正常
# 熔断器的三种状态
- 关闭状态:服务正常,维护失败率统计,达到失败率阀值时,转为开启
- 开启状态:服务异常,调用fallback函数,一段时间后,进入半开启状态
- 半开启状态:尝试恢复服务,失败率高于阀值则重新进入开启状态,低于阀值则进入关闭状态(尝试走业务逻辑)
# hystrix-go
hystrix-go 是一个集成了熔断、降级、限流能力的类库,同时提供了监控面板;hystrix-go的基本使用:
配套的Hystrix Dashboard用作展示服务状态,代码实现:
import (
"errors"
"github.com/afex/hystrix-go/hystrix"
"log"
"net/http"
"testing"
"time"
)
func Test_main(t *testing.T) {
hystrixStreamHandler := hystrix.NewStreamHandler()
hystrixStreamHandler.Start()
go http.ListenAndServe(":8074", hystrixStreamHandler)
hystrix.ConfigureCommand("aaa", hystrix.CommandConfig{
Timeout: 1000, // 单次请求 超时时间
MaxConcurrentRequests: 1, // 最大并发量
SleepWindow: 5000, // 熔断后多久去尝试服务是否可用
RequestVolumeThreshold: 1, // 验证熔断的 请求数量, 10秒内采样
ErrorPercentThreshold: 1, // 验证熔断的 错误百分比
})
for i := 0; i < 10000; i++ {
//异步调用使用 hystrix.Go
err := hystrix.Do("aaa", func() error { // hystrix.Do是同步调用
//test case 1 并发测试
if i == 0 {
return errors.New("service error") // 第一次发生报错,触发熔断,之后如果并发量太多也会触发
}
//test case 2 超时测试
//time.Sleep(2 * time.Second)
log.Println("do services")
return nil
}, nil)
if err != nil {
log.Println("hystrix err:" + err.Error())
time.Sleep(1 * time.Second)
log.Println("sleep 1 second")
}
}
time.Sleep(100 * time.Second)
}
# WebSocket
注意:反向代理服务器在客户端和服务端中间时,需要注意必须满足第一代理的角色
# WebSocket代理实战
webSocket实际服务器/客户端
package main
import (
"flag"
"github.com/gorilla/websocket"
"html/template"
"log"
"net/http"
)
var addr = flag.String("addr", "localhost:2003", "http service address")
var upgrader = websocket.Upgrader{} // use default options
func echo(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
defer c.Close()
for {
mt, message, err := c.ReadMessage()
if err != nil {
log.Println("read:", err)
break
}
log.Printf("recv: %s", message)
err = c.WriteMessage(mt, message)
if err != nil {
log.Println("write:", err)
break
}
}
}
func home(w http.ResponseWriter, r *http.Request) {
homeTemplate.Execute(w, "ws://"+r.Host+"/echo")
}
func main() {
flag.Parse()
log.SetFlags(0)
http.HandleFunc("/echo", echo)
http.HandleFunc("/", home)
log.Println("Starting websocket server at " + *addr)
log.Fatal(http.ListenAndServe(*addr, nil))
}
var homeTemplate = template.Must(template.New("").Parse(`
<!DOCTYPE html>
<head>
<meta charset="utf-8">
<script>
window.addEventListener("load", function(evt) {
var output = document.getElementById("output");
var input = document.getElementById("input");
var ws;
var print = function(message) {
var d = document.createElement("div");
d.innerHTML = message;
output.appendChild(d);
};
document.getElementById("open").onclick = function(evt) {
if (ws) {
return false;
}
var web_url=document.getElementById("web_url").value
ws = new WebSocket(web_url);
ws.onopen = function(evt) {
print("OPEN");
}
ws.onclose = function(evt) {
print("CLOSE");
ws = null;
}
ws.onmessage = function(evt) {
print("RESPONSE: " + evt.data);
}
ws.onerror = function(evt) {
print("ERROR: " + evt.data);
}
return false;
};
document.getElementById("send").onclick = function(evt) {
if (!ws) {
return false;
}
print("SEND: " + input.value);
ws.send(input.value);
return false;
};
document.getElementById("close").onclick = function(evt) {
if (!ws) {
return false;
}
ws.close();
return false;
};
});
</script>
</head>
<body>
<table>
<tr><td valign="top" width="50%">
<p>Click "Open" to create a connection to the server,
"Send" to send a message to the server and "Close" to close the connection.
You can change the message and send multiple times.
<p>
<form>
<button id="open">Open</button>
<button id="close">Close</button>
<p><input id="web_url" type="text" value="{{.}}">
<p><input id="input" type="text" value="Hello world!">
<button id="send">Send</button>
</form>
</td><td valign="top" width="50%">
<div id="output"></div>
</td></tr></table>
</body>
</html>
`))
深入理解WebSocket的upgrader.Upgrade步骤:
1、获取请求头中的Sec-WebSocket-Key
2、用SHA-1加Base64计算生成Sec-WebSocket-Accept
3、向客户端返回101 Switching Protocols状态码,完成协议升级
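为帮助理解上面第1、2步,下面补充一个计算 Sec-WebSocket-Accept 的最小示例(computeAcceptKey 为自拟函数名;其中的GUID是RFC 6455规定的固定魔数):
package main

import (
    "crypto/sha1"
    "encoding/base64"
    "fmt"
)

// RFC 6455 规定的固定GUID
const websocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// computeAcceptKey: 把客户端的 Sec-WebSocket-Key 与GUID拼接后做SHA-1,再Base64编码
func computeAcceptKey(secWebsocketKey string) string {
    h := sha1.New()
    h.Write([]byte(secWebsocketKey + websocketGUID))
    return base64.StdEncoding.EncodeToString(h.Sum(nil))
}

func main() {
    // RFC 6455 中的示例key,期望输出 s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
    fmt.Println(computeAcceptKey("dGhlIHNhbXBsZSBub25jZQ=="))
}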
设置反向代理
import (
"learn-plus/proxy/load_balance"
"learn-plus/proxy/middleware"
"log"
"net/http"
)
var (
addr = "127.0.0.1:2002"
)
func main() {
rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
rb.Add("http://127.0.0.1:2003", "50")
proxy := load_balance.NewLoadBalanceReverseProxy(&middleware.SliceRouterContext{}, rb)
log.Println("Starting httpserver at " + addr)
log.Fatal(http.ListenAndServe(addr, proxy))
}
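上面用到的 load_balance.NewLoadBalanceReverseProxy 在笔记中没有给出实现,下面是按照这里的调用方式和前文 NewMultipleHostsReverseProxy 的思路推测的一个最小示意(仅为假设性草图,import 路径按上文推断,与真实实现细节可能有出入):
package load_balance

import (
    "log"
    "net/http"
    "net/http/httputil"
    "net/url"

    "learn-plus/proxy/middleware"
)

// 最小示意:每次请求先通过负载均衡器取一个下游地址,再交给标准ReverseProxy转发
func NewLoadBalanceReverseProxy(c *middleware.SliceRouterContext, lb LoadBalance) *httputil.ReverseProxy {
    director := func(req *http.Request) {
        nextAddr, err := lb.Get(req.RemoteAddr) // 这里按来源地址取节点,也可以换成req.URL.String()
        if err != nil {
            log.Println("get next addr fail:", err)
            return
        }
        target, err := url.Parse(nextAddr)
        if err != nil {
            log.Println(err)
            return
        }
        req.URL.Scheme = target.Scheme
        req.URL.Host = target.Host
        if _, ok := req.Header["User-Agent"]; !ok {
            req.Header.Set("User-Agent", "user-agent")
        }
    }
    // Go 1.12+ 的 httputil.ReverseProxy 自带对WebSocket Upgrade请求的支持
    return &httputil.ReverseProxy{Director: director}
}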
# http2
# https和http2的关系
- http2代表多路复用的传输协议
- https代表http服务器使用了加密传输
- 一个启用https的服务器不一定使用http2
- 但是使用http2的服务器一般必须启用https(浏览器只支持基于TLS的http2)
# http2的设计目标
- 大多数情况下的感知延迟要有实质上改进
- 解决HTTP1.1中的“队首阻塞”问题
- 并行操作无需与服务器建立多个连接
# http2基本概念
- 流:流是连接中的虚拟信道,可以承载双向消息
- 消息:是指逻辑上HTTP消息,比如请求、响应等
- 帧:HTTP2.0 通信的最小单位,每个帧包含帧首部,至少也会识别出当前帧所属的流,承载着特定的数据
# https/http2下游服务实现
实现下游服务
package main import ( "fmt" "golang.org/x/net/http2" "io" "learn-plus/https/testcrt" "log" "net/http" "os" "os/signal" "syscall" "time" ) /* 证书签名生成方式: //CA私钥 openssl genrsa -out ca.key 2048 //CA数据证书 openssl req -x509 -new -nodes -key ca.key -subj "/CN=example1.com" -days 5000 -out ca.crt //服务器私钥(默认由CA签发) openssl genrsa -out server.key 2048 //服务器证书签名请求:Certificate Sign Request,简称csr(example1.com代表你的域名) openssl req -new -key server.key -subj "/CN=example1.com" -out server.csr //上面2个文件生成服务器证书(days代表有效期) openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 5000 */ func main() { rs1 := &RealServer{Addr: "127.0.0.1:3003"} rs1.Run() rs2 := &RealServer{Addr: "127.0.0.1:3004"} rs2.Run() //监听关闭信号 quit := make(chan os.Signal) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit } type RealServer struct { Addr string } func (r *RealServer) Run() { log.Println("Starting httpserver at " + r.Addr) mux := http.NewServeMux() mux.HandleFunc("/", r.HelloHandler) mux.HandleFunc("/base/error", r.ErrorHandler) server := &http.Server{ Addr: r.Addr, WriteTimeout: time.Second * 3, Handler: mux, } go func() { http2.ConfigureServer(server, &http2.Server{}) // 设置http2的支持,以帧的形式输出 log.Fatal(server.ListenAndServeTLS(testcrt.Path("server.crt"), testcrt.Path("server.key"))) }() } func (r *RealServer) HelloHandler(w http.ResponseWriter, req *http.Request) { upath := fmt.Sprintf("http://%s%s\n", r.Addr, req.URL.Path) io.WriteString(w, upath) } func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) { upath := "error handler" w.WriteHeader(500) io.WriteString(w, upath) }
testcrt/testdata.go
package testcrt

import (
    "path/filepath"
    "runtime"
)

// basepath is the root directory of this package.
var basepath string

func init() {
    _, currentFile, _, _ := runtime.Caller(0)
    basepath = filepath.Dir(currentFile)
}

// Path returns the absolute path the given relative file or directory path,
// relative to the google.golang.org/grpc/testdata directory in the user's GOPATH.
// If rel is already absolute, it is returned unmodified.
func Path(rel string) string {
    if filepath.IsAbs(rel) {
        return rel
    }
    return filepath.Join(basepath, rel)
}
分析
1、ListenAndServe调整为ListenAndServeTLS即可支持https
2、http服务器设置http2.ConfigureServer支持http2
2.1、设置h2对应的handler回调
2.2、http.Server中设置了对h2的回调
2.3、h2回调方法负责把帧数据转换为http请求
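上面只给出了 http2 服务端,下面补充一个用于验证的客户端最小示例(假设服务端跑在上文的 127.0.0.1:3003;由于是自签证书,这里用 InsecureSkipVerify 跳过校验,仅限本地测试):
package main

import (
    "crypto/tls"
    "fmt"
    "io/ioutil"
    "net/http"

    "golang.org/x/net/http2"
)

func main() {
    // 使用 http2.Transport 强制走 http2
    client := &http.Client{
        Transport: &http2.Transport{
            TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // 仅用于本地自签证书测试
        },
    }
    resp, err := client.Get("https://127.0.0.1:3003/")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        panic(err)
    }
    fmt.Println("proto:", resp.Proto) // 期望输出 HTTP/2.0
    fmt.Println(string(body))
}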
# Tcp代理
# 什么是负载均衡
负载均衡(Load Balance)建立在现有网络结构之上,它提供了一种廉价有效透明的方法扩展网络设备和服务器的带宽、增加吞吐量、加强网络数据处理能力、提高网络的灵活性和可用性。负载均衡有两方面的含义:首先,大量的并发访问或数据流量分担到多台节点设备上分别处理,减少用户等待响应的时间;其次,单个重负载的运算分担到多台节点设备上做并行处理,每个节点设备处理结束后,将结果汇总,返回给用户,系统处理能力得到大幅度提高。
简单来说就是:其一是将大量的并发处理转发给后端多个节点处理,减少工作响应时间;其二是将单个繁重的工作转发给后端多个节点处理,处理完再返回给负载均衡中心,再返回给用户。目前负载均衡技术大多数是用于提高诸如在Web服务器、FTP服务器和其它关键任务服务器上的Internet服务器程序的可用性和可伸缩性。
# 负载均衡分类
- 二层负载均衡(mac):根据OSI模型分的二层负载,一般是用虚拟mac地址方式,外部对虚拟MAC地址请求,负载均衡接收后分配后端实际的MAC地址响应.
- 三层负载均衡(ip):一般采用虚拟IP地址方式,外部对虚拟的ip地址请求,负载均衡接收后分配后端实际的IP地址响应. (即一个ip对一个ip的转发, 端口全放开)
- 四层负载均衡(tcp):在三层负载均衡的基础上,即从第四层"传输层"开始, 使用"ip+port"接收请求,再转发到对应的机器。
- 七层负载均衡(http):从第七层"应用层"开始, 根据虚拟的url或IP,主机名接收请求,再转向相应的处理服务器
我们运维中最常见的四层和七层负载均衡,这里重点说下这两种负载均衡。
1)四层的负载均衡就是基于IP+端口的负载均衡:在三层负载均衡的基础上,通过发布三层的IP地址(VIP),然后加四层的端口号,来决定哪些流量需要做负载均衡,对需要处理的流量进行NAT处理,转发至后台服务器,并记录下这个TCP或者UDP的流量是由哪台服务器处理的,后续这个连接的所有流量都同样转发到同一台服务器处理。
对应的负载均衡器称为四层交换机(L4 switch),主要分析IP层及TCP/UDP层,实现四层负载均衡。此种负载均衡器不理解应用协议(如HTTP/FTP/MySQL等等)。
实现四层负载均衡的软件有:
F5:硬件负载均衡器,功能很好,但是成本很高。
lvs:重量级的四层负载软件
nginx:轻量级的四层负载软件,带缓存功能,正则表达式较灵活
haproxy:模拟四层转发,较灵活
2)七层的负载均衡就是基于虚拟的URL或主机IP的负载均衡:在四层负载均衡的基础上(没有四层是绝对不可能有七层的),再考虑应用层的特征,比如同一个Web服务器的负载均衡,除了根据VIP加80端口辨别是否需要处理的流量,还可根据七层的URL、浏览器类别、语言来决定是否要进行负载均衡。举个例子,如果你的Web服务器分成两组,一组是中文语言的,一组是英文语言的,那么七层负载均衡就可以当用户来访问你的域名时,自动辨别用户语言,然后选择对应的语言服务器组进行负载均衡处理。
对应的负载均衡器称为七层交换机(L7 switch),除了支持四层负载均衡以外,还有分析应用层的信息,如HTTP协议URI或Cookie信息,实现七层负载均衡。此种负载均衡器能理解应用协议。
实现七层负载均衡的软件有:
haproxy:天生负载均衡技能,全面支持七层代理,会话保持,标记,路径转移;
nginx:只在http协议和mail协议上功能比较好,性能与haproxy差不多;
apache:功能较差
Mysql proxy:功能尚可。
总的来说,一般是lvs做4层负载;nginx做7层负载(也能做4层负载, 通过stream模块);haproxy比较灵活,4层和7层负载均衡都能做
# 四层负载和七层负载的区别
- 实际是路由转发和反向代理的区别
- 转发模式下客户端与服务器之间只有一次三次握手,而代理模式会有两次(客户端与代理、代理与服务器各一次)
- NAT是作用于内核运行,代理是用户程序运行的
- 代理的数据进入程序buffer中
# Tcp代理原理
- 本质上与前面的七层反向代理类似,只是代理的内容是tcp协议数据包
- 初始化tcp服务器
- 创建上下游连接
- 上下游连接数据交换
# Tcp代理实现
参考 net/http/httputil 中 ReverseProxy 的实现思路,将服务与代理逻辑分离:
- 构建tcp服务器
- 构建tcp代理
- tcp服务器与tcp代理结合,实现基于负载均衡的代理
构建tcp服务器
package main
import (
"context"
"fmt"
"learn-plus/proxy/tcp_proxy"
"log"
"net"
)
var (
addr = ":7002"
)
type tcpHandler struct {
}
func (t *tcpHandler) ServeTCP(ctx context.Context, src net.Conn) {
src.Write([]byte("tcpHandler\n"))
}
func main() {
//tcp服务器测试
log.Println("Starting tcpserver at " + addr)
tcpServ := tcp_proxy.TcpServer{
Addr: addr,
Handler: &tcpHandler{},
}
fmt.Println("Starting tcp_server at " + addr)
tcpServ.ListenAndServe()
//代理测试
//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
//rb.Add("127.0.0.1:6001", "40")
//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
//fmt.Println("Starting tcp_proxy at " + addr)
//tcpServ.ListenAndServe()
//redis服务器测试
//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
//rb.Add("127.0.0.1:6379", "40")
//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
//fmt.Println("Starting tcp_proxy at " + addr)
//tcpServ.ListenAndServe()
//http服务器测试:
//缺点对请求的管控不足,比如我们用来做baidu代理,因为无法更改请求host,所以很轻易把我们拒绝
//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
//rb.Add("127.0.0.1:2003", "40")
////rb.Add("www.baidu.com:80", "40")
//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_tcp_middleware.TcpSliceRouterContext{}, rb)
//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
//fmt.Println("tcp_proxy start at:" + addr)
//tcpServ.ListenAndServe()
//websocket服务器测试:缺点对请求的管控不足
//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
//rb.Add("127.0.0.1:2003", "40")
//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
//fmt.Println("Starting tcp_proxy at " + addr)
//tcpServ.ListenAndServe()
//http2服务器测试:缺点对请求的管控不足
//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
//rb.Add("127.0.0.1:3003", "40")
//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
//fmt.Println("Starting tcp_proxy at " + addr)
//tcpServ.ListenAndServe()
}
proxy/tcp_server.go
package tcp_proxy
import (
"context"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
)
var (
ErrServerClosed = errors.New("tcp: Server closed")
ErrAbortHandler = errors.New("tcp: abort TCPHandler")
ServerContextKey = &contextKey{"tcp-server"}
LocalAddrContextKey = &contextKey{"local-addr"}
)
type onceCloseListener struct {
net.Listener
once sync.Once
closeErr error
}
func (oc *onceCloseListener) Close() error {
oc.once.Do(oc.close)
return oc.closeErr
}
func (oc *onceCloseListener) close() {
oc.closeErr = oc.Listener.Close()
}
type TCPHandler interface {
ServeTCP(ctx context.Context, conn net.Conn)
}
type TcpServer struct {
Addr string
Handler TCPHandler
err error
BaseCtx context.Context
WriteTimeout time.Duration
ReadTimeout time.Duration
KeepAliveTimeout time.Duration
mu sync.Mutex
inShutdown int32
doneChan chan struct{}
l *onceCloseListener
}
func (s *TcpServer) shuttingDown() bool {
return atomic.LoadInt32(&s.inShutdown) != 0
}
func (srv *TcpServer) ListenAndServe() error {
if srv.shuttingDown() {
return ErrServerClosed
}
if srv.doneChan == nil {
srv.doneChan = make(chan struct{})
}
addr := srv.Addr
if addr == "" {
return errors.New("need addr")
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(tcpKeepAliveListener{
ln.(*net.TCPListener)})
}
func (srv *TcpServer) Close() error {
atomic.StoreInt32(&srv.inShutdown, 1)
close(srv.doneChan) //关闭channel
srv.l.Close() //执行listener关闭
return nil
}
func (srv *TcpServer) Serve(l net.Listener) error {
srv.l = &onceCloseListener{Listener: l}
defer srv.l.Close() //执行listener关闭
if srv.BaseCtx == nil {
srv.BaseCtx = context.Background()
}
baseCtx := srv.BaseCtx
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, e := l.Accept()
if e != nil {
select {
case <-srv.getDoneChan():
return ErrServerClosed
default:
}
fmt.Printf("accept fail, err: %v\n", e)
continue
}
c := srv.newConn(rw)
go c.serve(ctx)
}
return nil
}
func (srv *TcpServer) newConn(rwc net.Conn) *conn {
c := &conn{
server: srv,
rwc: rwc,
}
// 设置参数
if d := c.server.ReadTimeout; d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
}
if d := c.server.WriteTimeout; d != 0 {
c.rwc.SetWriteDeadline(time.Now().Add(d))
}
if d := c.server.KeepAliveTimeout; d != 0 {
if tcpConn, ok := c.rwc.(*net.TCPConn); ok {
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(d)
}
}
return c
}
func (s *TcpServer) getDoneChan() <-chan struct{} {
s.mu.Lock()
defer s.mu.Unlock()
if s.doneChan == nil {
s.doneChan = make(chan struct{})
}
return s.doneChan
}
func ListenAndServe(addr string, handler TCPHandler) error {
server := &TcpServer{Addr: addr, Handler: handler, doneChan: make(chan struct{}),}
return server.ListenAndServe()
}
proxy/conn.go
package tcp_proxy
import (
"context"
"fmt"
"net"
"runtime"
)
type tcpKeepAliveListener struct {
*net.TCPListener
}
//todo 思考点:内嵌类型覆写方法时,使用非指针接收者即可
func (ln tcpKeepAliveListener) Accept() (net.Conn, error) {
tc, err := ln.AcceptTCP()
if err != nil {
return nil, err
}
return tc, nil
}
type contextKey struct {
name string
}
func (k *contextKey) String() string {
return "tcp_proxy context value " + k.name
}
type conn struct {
server *TcpServer
cancelCtx context.CancelFunc
rwc net.Conn
remoteAddr string
}
func (c *conn) close() {
c.rwc.Close()
}
func (c *conn) serve(ctx context.Context) {
defer func() {
if err := recover(); err != nil && err != ErrAbortHandler {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
fmt.Printf("tcp: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
}
c.close()
}()
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
if c.server.Handler == nil {
panic("handler empty")
}
c.server.Handler.ServeTCP(ctx, c.rwc)
}
# Tcp反向代理实现
proxy_tcp.go
package main
import (
"context"
"fmt"
"learn-plus/proxy/load_balance"
"learn-plus/proxy/proxy"
"learn-plus/proxy/tcp_middleware"
"learn-plus/proxy/tcp_proxy"
"net"
)
var (
addr = ":2002"
)
type tcpHandler struct {
}
func (t *tcpHandler) ServeTCP(ctx context.Context, src net.Conn) {
src.Write([]byte("tcpHandler\n"))
}
func main() {
//tcp服务器测试
//log.Println("Starting tcpserver at " + addr)
//tcpServ := tcp_proxy.TcpServer{
// Addr: addr,
// Handler: &tcpHandler{},
//}
//fmt.Println("Starting tcp_server at " + addr)
//tcpServ.ListenAndServe()
//代理测试
rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
rb.Add("127.0.0.1:7002", "40") // 代理上游服务
proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
fmt.Println("Starting tcp_proxy at " + addr)
tcpServ.ListenAndServe()
//redis服务器测试
//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
//rb.Add("127.0.0.1:6379", "40")
//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
//fmt.Println("Starting tcp_proxy at " + addr)
//tcpServ.ListenAndServe()
//http服务器测试:
//缺点对请求的管控不足,比如我们用来做baidu代理,因为无法更改请求host,所以很轻易把我们拒绝
//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
//rb.Add("127.0.0.1:2003", "40")
////rb.Add("www.baidu.com:80", "40")
//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_tcp_middleware.TcpSliceRouterContext{}, rb)
//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
//fmt.Println("tcp_proxy start at:" + addr)
//tcpServ.ListenAndServe()
//websocket服务器测试:缺点对请求的管控不足
//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
//rb.Add("127.0.0.1:2003", "40")
//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
//fmt.Println("Starting tcp_proxy at " + addr)
//tcpServ.ListenAndServe()
//http2服务器测试:缺点对请求的管控不足
//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
//rb.Add("127.0.0.1:3003", "40")
//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
//fmt.Println("Starting tcp_proxy at " + addr)
//tcpServ.ListenAndServe()
}
Proxy/tcp_reverse_proxy.go
package proxy
import (
"context"
"learn-plus/proxy/load_balance"
"learn-plus/proxy/tcp_middleware"
"io"
"log"
"net"
"time"
)
func NewTcpLoadBalanceReverseProxy(c *tcp_middleware.TcpSliceRouterContext, lb load_balance.LoadBalance) *TcpReverseProxy {
return func() *TcpReverseProxy {
nextAddr, err := lb.Get("")
if err != nil {
log.Fatal("get next addr fail")
}
return &TcpReverseProxy{
ctx: c.Ctx,
Addr: nextAddr,
KeepAlivePeriod: time.Second,
DialTimeout: time.Second,
}
}()
}
//TCP反向代理
type TcpReverseProxy struct {
ctx context.Context //单次请求单独设置
Addr string
KeepAlivePeriod time.Duration //设置
DialTimeout time.Duration //设置超时时间
DialContext func(ctx context.Context, network, address string) (net.Conn, error)
OnDialError func(src net.Conn, dstDialErr error)
ProxyProtocolVersion int
}
func (dp *TcpReverseProxy) dialTimeout() time.Duration {
if dp.DialTimeout > 0 {
return dp.DialTimeout
}
return 10 * time.Second
}
var defaultDialer = new(net.Dialer)
func (dp *TcpReverseProxy) dialContext() func(ctx context.Context, network, address string) (net.Conn, error) {
if dp.DialContext != nil {
return dp.DialContext
}
return (&net.Dialer{
Timeout: dp.DialTimeout, //连接超时
KeepAlive: dp.KeepAlivePeriod, //设置连接的检测时长
}).DialContext
}
func (dp *TcpReverseProxy) keepAlivePeriod() time.Duration {
if dp.KeepAlivePeriod != 0 {
return dp.KeepAlivePeriod
}
return time.Minute
}
//传入上游 conn,在这里完成下游连接与数据交换
func (dp *TcpReverseProxy) ServeTCP(ctx context.Context, src net.Conn) {
//设置连接超时
var cancel context.CancelFunc
if dp.DialTimeout >= 0 {
ctx, cancel = context.WithTimeout(ctx, dp.dialTimeout())
}
dst, err := dp.dialContext()(ctx, "tcp", dp.Addr)
if cancel != nil {
cancel()
}
if err != nil {
dp.onDialError()(src, err)
return
}
defer func() { go dst.Close() }() //记得退出下游连接
//设置dst的 keepAlive 参数,在数据请求之前
if ka := dp.keepAlivePeriod(); ka > 0 {
if c, ok := dst.(*net.TCPConn); ok {
c.SetKeepAlive(true)
c.SetKeepAlivePeriod(ka)
}
}
errc := make(chan error, 1)
go dp.proxyCopy(errc, src, dst)
go dp.proxyCopy(errc, dst, src)
<-errc
}
func (dp *TcpReverseProxy) onDialError() func(src net.Conn, dstDialErr error) {
if dp.OnDialError != nil {
return dp.OnDialError
}
return func(src net.Conn, dstDialErr error) {
log.Printf("tcpproxy: for incoming conn %v, error dialing %q: %v", src.RemoteAddr().String(), dp.Addr, dstDialErr)
src.Close()
}
}
func (dp *TcpReverseProxy) proxyCopy(errc chan<- error, dst, src net.Conn) {
_, err := io.Copy(dst, src)
errc <- err
}
# thrift安装介绍
安装:https://thrift.apache.org/docs/install/
构建thrift测试sever和client
- 首先编写 thrift_gen.thrift(一个最小示例见下文)
- 运行IDL生成命令
thrift --gen go thrift_gen.thrift
- 使用生成的IDL单独构建 server 与 client 即可
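笔记中没有给出 thrift_gen.thrift 的内容,下面是一个最小示例,仅示意IDL写法(service名、方法名均为笔者自拟):
namespace go thrift_gen

// 一个最简单的echo服务,thrift --gen go 后会生成对应的Go桩代码
service EchoService {
    string Echo(1: string message)
}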
介绍:
- Facebook的开源项目,后来进入apache孵化
- thrift也是支持跨语言的一个rpc框架,有自己的一套IDL
- thrift的网络协议建立在tcp的基础上
# gRPC透明代理
介绍:gRPC是谷歌出品的一个高性能、开源和通用的RPC框架,是基于http/2标准设计,支持普通rpc也支持双向流式传递数据,相对于thrift连接可以多路复用,可传递header头数据
(关于grpc方面的功能演示,统一使用非加密方式),首先确保grpc安装,步骤如下:
官网:https://github.com/grpc/grpc-go
# 安装步骤
# go mod安装方式
- 开启 go mod
export GO111MODULE=on
- 开启代理 go mod
export GOPROXY=https://goproxy.io
- 安装grpc
go get -u google.golang.org/grpc
- 安装proto
go get -u github.com/golang/protobuf/proto
- 安装protoc-gen-go
go get -u github.com/golang/protobuf/protoc-gen-go
# 遇到命令不存在 command not found: protoc
缺少protobuf预先安装导致的问题。 protoc-gen-go相当于只是protobuf的一个语言支持。
安装protobuf参照资料
mac 下 brew install protobuf
windows 下 https://blog.csdn.net/qq_41185868/article/details/82882206
linux 下 https://blog.csdn.net/sszzyzzy/article/details/89946075
# 遇到 I/O Timeout Errors
$ go get -u google.golang.org/grpc
package google.golang.org/grpc: unrecognized import path "google.golang.org/grpc" (https fetch: Get https://google.golang.org/grpc?go-get=1: dial tcp 216.239.37.1:443: i/o timeout)
- 要么翻墙解决问题
- 要么设置
export GOPROXY=https://goproxy.io
# 遇到 permission denied
go get -u google.golang.org/grpc
go get github.com/golang/protobuf/protoc-gen-go: open /usr/local/go/bin/protoc-gen-go: permission denied
- 要么给目录增加读写权限
chmod -R 777 /usr/local/go/
- 要么设置重新设置一下GOROOT地址指向非系统目录 比如 mac 下面 vim ~/.bashrc
#export GOROOT=/usr/local/go
#export GOBIN=/usr/local/go/bin
#export LGOBIN=/usr/local/go/bin
export GOROOT=/Users/niuyufu/go_1.12
export GOBIN=/Users/niuyufu/go_1.12/bin
export LGOBIN=/Users/niuyufu/go_1.12/bin
# 构建grpc测试server与client
- 首先编写
echo.proto
- 运行IDL生成命令,(如:遇到命令不存在 command not found: protoc 请参照上文帮助)
protoc -I . --go_out=plugins=grpc:proto ./echo.proto
- 使用生成的IDL单独构建 server 与 client 即可
# 构建grpc-gateway 测试服务端让服务器支持http
# 安装参考
https://github.com/grpc-ecosystem/grpc-gateway
- 开启 go mod
export GO111MODULE=on
- 开启代理 go mod
export GOPROXY=https://goproxy.io
- 执行安装命令
go install github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
go install github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
go install github.com/golang/protobuf/protoc-gen-go
# 构建grpc-gateway 测试服务端
- 编写 echo-gateway.proto(一个最小示例见本节末尾)
- 运行IDL生成命令
protoc -I/usr/local/include -I. -I$GOPATH/src -I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis --go_out=plugins=grpc:proto echo-gateway.proto
- 删除 proto/echo.pb.go 防止结构体冲突
rm proto/echo.pb.go
- 运行gateway生成命令
protoc -I/usr/local/include -I. -I$GOPATH/src \
-I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis --grpc-gateway_out=logtostderr=true:proto echo-gateway.proto
- 使用生成的IDL单独构建 server
- 使用浏览器测试 server
curl 'http://127.0.0.1:8081/v1/example/echo' -d '{"message":"11222222"}'
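笔记中没有给出 echo-gateway.proto 的内容,下面是按照上面 curl 测试路径推测的一个最小示例(在 echo.proto 的基础上增加 google.api.http 注解,把 UnaryEcho 映射为 HTTP POST /v1/example/echo;仅为示意):
syntax = "proto3";
package echo;
option go_package = ".;proto";

import "google/api/annotations.proto";

message EchoRequest {
  string message = 1;
}

message EchoResponse {
  string message = 1;
}

service Echo {
  // grpc-gateway 会把该方法映射为 HTTP POST /v1/example/echo
  rpc UnaryEcho(EchoRequest) returns (EchoResponse) {
    option (google.api.http) = {
      post: "/v1/example/echo"
      body: "*"
    };
  }
}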
# 代码实现
grpc/echo.proto
syntax = "proto3"; package echo; option go_package = ".;proto"; // 输出文件 // EchoRequest is the request for echo. message EchoRequest { string message = 1; } // EchoResponse is the response for echo. message EchoResponse { string message = 1; } // Echo is the echo service. service Echo { // UnaryEcho is unary echo. rpc UnaryEcho(EchoRequest) returns (EchoResponse) {} // ServerStreamingEcho is server side streaming. rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {} // ClientStreamingEcho is client side streaming. rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {} // BidirectionalStreamingEcho is bidi streaming. rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {} }
在grpc目录下创建proto文件夹,执行 protoc -I . --go_out=plugins=grpc:proto ./echo.proto 命令,会生成实现该proto定义的go文件,客户端和服务端都以它为桥梁进行通信
grpc/server/main.go
package main

import (
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"

	pb "learn-plus/grpc/proto"
)

var port = flag.Int("port", 50055, "the port to serve on")

const (
	streamingCount = 10
)

type server struct{}

func (s *server) ServerStreamingEcho(in *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {
	fmt.Printf("--- ServerStreamingEcho ---\n")
	fmt.Printf("request received: %v\n", in)
	// Send streamingCount responses for the single request.
	for i := 0; i < streamingCount; i++ {
		fmt.Printf("echo message %v\n", in.Message)
		if err := stream.Send(&pb.EchoResponse{Message: in.Message}); err != nil {
			return err
		}
	}
	return nil
}

func (s *server) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error {
	fmt.Printf("--- ClientStreamingEcho ---\n")
	// Read all requests, then send a single response.
	var message string
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			fmt.Printf("echo last received message\n")
			return stream.SendAndClose(&pb.EchoResponse{Message: message})
		}
		if err != nil { // check the error before touching in
			return err
		}
		message = in.Message
		fmt.Printf("request received: %v, building echo\n", in)
	}
}

func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
	fmt.Printf("--- BidirectionalStreamingEcho ---\n")
	// Echo every request back until the client closes the stream.
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		fmt.Printf("request received %v, sending echo\n", in)
		if err := stream.Send(&pb.EchoResponse{Message: in.Message}); err != nil {
			return err
		}
	}
}

func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) {
	fmt.Printf("--- UnaryEcho ---\n")
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		log.Println("miss metadata from context")
	}
	fmt.Println("md", md)
	fmt.Printf("request received: %v, sending echo\n", in)
	return &pb.EchoResponse{Message: in.Message}, nil
}

func main() {
	flag.Parse()
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	fmt.Printf("server listening at %v\n", lis.Addr())
	s := grpc.NewServer()
	pb.RegisterEchoServer(s, &server{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
grpc/client/main.go
package main

import (
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"

	pb "learn-plus/grpc/proto"
)

var addr = flag.String("addr", "localhost:50055", "the address to connect to")

const (
	timestampFormat = time.StampNano // "Jan _2 15:04:05.000000000"
	streamingCount  = 10
	AccessToken     = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1ODk2OTExMTQsImlzcyI6ImFwcF9pZF9iIn0.qb2A_WsDP_-jfQBxJk6L57gTnAzZs-SPLMSS_UO6Gkc"
)

func unaryCallWithMetadata(c pb.EchoClient, message string) {
	fmt.Printf("--- unary ---\n")
	// Create metadata and attach it to the outgoing context.
	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
	md.Append("authorization", "Bearer "+AccessToken)
	ctx := metadata.NewOutgoingContext(context.Background(), md)
	r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: message})
	if err != nil {
		log.Fatalf("failed to call UnaryEcho: %v", err)
	}
	fmt.Printf("response:%v\n", r.Message)
}

func serverStreamingWithMetadata(c pb.EchoClient, message string) {
	fmt.Printf("--- server streaming ---\n")
	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
	md.Append("authorization", "Bearer "+AccessToken)
	ctx := metadata.NewOutgoingContext(context.Background(), md)
	stream, err := c.ServerStreamingEcho(ctx, &pb.EchoRequest{Message: message})
	if err != nil {
		log.Fatalf("failed to call ServerStreamingEcho: %v", err)
	}
	// Read all the responses.
	var rpcStatus error
	fmt.Printf("response:\n")
	for {
		r, err := stream.Recv()
		if err != nil {
			rpcStatus = err
			break
		}
		fmt.Printf(" - %s\n", r.Message)
	}
	if rpcStatus != io.EOF {
		log.Fatalf("failed to finish server streaming: %v", rpcStatus)
	}
}

func clientStreamWithMetadata(c pb.EchoClient, message string) {
	fmt.Printf("--- client streaming ---\n")
	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
	md.Append("authorization", "Bearer "+AccessToken)
	ctx := metadata.NewOutgoingContext(context.Background(), md)
	stream, err := c.ClientStreamingEcho(ctx)
	if err != nil {
		log.Fatalf("failed to call ClientStreamingEcho: %v\n", err)
	}
	// Send all requests to the server.
	for i := 0; i < streamingCount; i++ {
		if err := stream.Send(&pb.EchoRequest{Message: message}); err != nil {
			log.Fatalf("failed to send streaming: %v\n", err)
		}
	}
	// Read the single response.
	r, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("failed to CloseAndRecv: %v\n", err)
	}
	fmt.Printf("response:%v\n", r.Message)
}

func bidirectionalWithMetadata(c pb.EchoClient, message string) {
	fmt.Printf("--- bidirectional ---\n")
	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
	md.Append("authorization", "Bearer "+AccessToken)
	ctx := metadata.NewOutgoingContext(context.Background(), md)
	stream, err := c.BidirectionalStreamingEcho(ctx)
	if err != nil {
		log.Fatalf("failed to call BidirectionalStreamingEcho: %v\n", err)
	}
	go func() {
		// Send all requests to the server, then close the send side.
		for i := 0; i < streamingCount; i++ {
			if err := stream.Send(&pb.EchoRequest{Message: message}); err != nil {
				log.Fatalf("failed to send streaming: %v\n", err)
			}
		}
		stream.CloseSend()
	}()
	// Read all the responses.
	var rpcStatus error
	fmt.Printf("response:\n")
	for {
		r, err := stream.Recv()
		if err != nil {
			rpcStatus = err
			break
		}
		fmt.Printf(" - %s\n", r.Message)
	}
	if rpcStatus != io.EOF {
		log.Fatalf("failed to finish bidirectional streaming: %v", rpcStatus)
	}
}

const message = "this is examples/metadata"

func main() {
	flag.Parse()
	wg := sync.WaitGroup{}
	for i := 0; i < 1; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			conn, err := grpc.Dial(*addr, grpc.WithInsecure())
			if err != nil {
				log.Fatalf("did not connect: %v", err)
			}
			defer conn.Close()
			c := pb.NewEchoClient(conn)
			// unary call
			unaryCallWithMetadata(c, message)
			time.Sleep(400 * time.Millisecond)
			// server-side streaming
			serverStreamingWithMetadata(c, message)
			time.Sleep(1 * time.Second)
			// client-side streaming
			clientStreamWithMetadata(c, message)
			time.Sleep(1 * time.Second)
			// bidirectional streaming
			bidirectionalWithMetadata(c, message)
		}()
	}
	wg.Wait()
	time.Sleep(1 * time.Second)
}
# Implementing a gRPC reverse proxy
gRPC reverse-proxy and transparent-proxy setup: https://github.com/e421083458/gateway_demo/tree/master/demo/proxy/grpc_reverse_proxy
# Gateway service discovery
Service discovery means using a registry to record service information so that other services can look up registered services quickly. It comes in two flavors:
- Client-side service discovery (a discovery module inside the client; see the sketch after this list)
- Server-side service discovery (a central registry / discovery service)
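To make the client-side flavor concrete, here is a minimal sketch (the Registry interface, package name and helper are hypothetical, for illustration only): the client asks the registry for the current address list and picks an instance itself, instead of relying on a central proxy.
package discovery

import (
	"fmt"
	"math/rand"
	"net/http"
)

// Registry is a hypothetical registry client (it could be backed by zookeeper, etcd, etc.).
type Registry interface {
	// Lookup returns the currently registered addresses for a service.
	Lookup(service string) ([]string, error)
}

// CallWithClientSideDiscovery resolves the service itself and then dials the
// chosen instance directly -- no central load-balancing proxy is involved.
func CallWithClientSideDiscovery(r Registry, service, path string) (*http.Response, error) {
	addrs, err := r.Lookup(service)
	if err != nil || len(addrs) == 0 {
		return nil, fmt.Errorf("no instance of %s available: %v", service, err)
	}
	addr := addrs[rand.Intn(len(addrs))] // trivial client-side load balancing
	return http.Get("http://" + addr + path)
}
With server-side discovery, by contrast, the client would only know the address of the registry/gateway and leave instance selection to it.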
# Introduction to zookeeper
zookeeper is a distributed coordination service (in effect a small distributed database) and a Hadoop sub-project. It maintains data as a tree of nodes supporting create/read/update/delete, and it offers a watch/notification mechanism: by registering watches you receive the corresponding change events.
# zookeeper core features: node types
- Persistent nodes: kept on the server until explicitly deleted
- Ephemeral nodes: cleaned up automatically when the session that created them expires
- Sequential nodes: the server appends an auto-assigned sequence number on creation (the Go example after this list shows how each type is created)
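With the github.com/samuel/go-zookeeper/zk client used later in this document, the node type is chosen via the flags argument of Create; the sketch below (paths and payloads are illustrative) creates one node of each type.
package main

import (
	"fmt"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, 5*time.Second)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	acl := zk.WorldACL(zk.PermAll)

	// Persistent node: flags = 0, stays until deleted explicitly.
	if _, err := conn.Create("/demo_persistent", []byte("p"), 0, acl); err != nil {
		fmt.Println("create persistent err:", err)
	}

	// Ephemeral node: removed automatically when this session ends.
	if _, err := conn.Create("/demo_ephemeral", []byte("e"), zk.FlagEphemeral, acl); err != nil {
		fmt.Println("create ephemeral err:", err)
	}

	// Sequential node: the server appends a sequence number, e.g. /demo_seq-0000000003.
	path, err := conn.Create("/demo_seq-", []byte("s"), zk.FlagSequence, acl)
	fmt.Println("created sequential node:", path, err)
}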
# Installing zookeeper
Install following the official docs: https://zookeeper.apache.org/doc/r3.6.0/zookeeperStarted.html#sc_Download
Unpack the archive:
> tar -zxvf zookeeper-3.4.10.tar.gz   // unpack
> cd zookeeper-3.4.10/conf            // go into the conf directory
> mv zoo_sample.cfg zoo.cfg           // rename the sample config
> vi zoo.cfg                          // edit the config and set your own dataDir
Edit the configuration file:
tickTime:   the basic time unit zookeeper works with, in milliseconds
dataDir:    where in-memory data snapshots are stored; also used for the transaction log if no separate log dir is configured
clientPort: the port that listens for client (CLI) connections
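For reference, a minimal zoo.cfg only needs these three settings (the dataDir path below is illustrative; point it at a directory of your own):
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181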
Basic CRUD from the zookeeper CLI:
[zk: 127.0.0.1:2181(CONNECTED) 2] ls /
[zookeeper]
[zk: 127.0.0.1:2181(CONNECTED) 3] create /zk_test my_data      // create
Created /zk_test
[zk: 127.0.0.1:2181(CONNECTED) 4] ls /
[zookeeper, zk_test]
[zk: 127.0.0.1:2181(CONNECTED) 5] get /zk_test                 // read
my_data
[zk: 127.0.0.1:2181(CONNECTED) 6] set /zk_test junk            // update
[zk: 127.0.0.1:2181(CONNECTED) 8] delete /zk_test              // delete
Starting and stopping:
> cd zookeeper-3.4.10/bin   // go into the bin directory
> ./zkServer.sh start       // start the server
> ./zkServer.sh stop        // stop it; a CLI that is still connected will start reporting errors
# Operating zookeeper from Go
Code:
package main

import (
	"fmt"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

var (
	host = []string{"127.0.0.1:2181"}
)

func main() {
	conn, _, err := zk.Connect(host, 5*time.Second)
	if err != nil {
		panic(err)
	}
	// Create
	if _, err := conn.Create("/test_tree2", []byte("tree_content"), 0, zk.WorldACL(zk.PermAll)); err != nil { // the acl controls permissions
		fmt.Println("create err", err)
	}
	// Read
	nodeValue, dStat, err := conn.Get("/test_tree2")
	if err != nil {
		fmt.Println("get err", err)
		return
	}
	fmt.Println("nodeValue", string(nodeValue))
	// Update
	if _, err := conn.Set("/test_tree2", []byte("new_content"), dStat.Version); err != nil {
		fmt.Println("update err", err)
	}
	// Delete
	_, dStat, _ = conn.Get("/test_tree2")
	if err := conn.Delete("/test_tree2", dStat.Version); err != nil {
		fmt.Println("Delete err", err)
		//return
	}
	// Check existence
	hasNode, _, err := conn.Exists("/test_tree2")
	if err != nil {
		fmt.Println("Exists err", err)
		//return
	}
	fmt.Println("node Exist", hasNode)
	// Create again
	if _, err := conn.Create("/test_tree2", []byte("tree_content"), 0, zk.WorldACL(zk.PermAll)); err != nil {
		fmt.Println("create err", err)
	}
	// Create a child node
	if _, err := conn.Create("/test_tree2/subnode", []byte("node_content"), 0, zk.WorldACL(zk.PermAll)); err != nil {
		fmt.Println("create err", err)
	}
	// List child nodes
	childNodes, _, err := conn.Children("/test_tree2")
	if err != nil {
		fmt.Println("Children err", err)
	}
	fmt.Println("childNodes", childNodes)
}
# Watching child nodes in zookeeper
The watcher side, code:
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/e421083458/gateway_demo/proxy/zookeeper"
)

var addr = "127.0.0.1:2002"

func main() {
	// Connect and fetch the zk node list
	zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})
	zkManager.GetConnect()
	defer zkManager.Close()

	// Fetch the server list once up front
	zlist, err := zkManager.GetServerListByPath("/real_server")
	fmt.Println("server node:")
	fmt.Println(zlist)
	if err != nil {
		log.Println(err)
	}

	// Watch node-list changes dynamically -- pairs with the register demo
	//chanList, chanErr := zkManager.WatchServerListByPath("/real_server")
	//go func() {
	//	for {
	//		select {
	//		case changeErr := <-chanErr:
	//			fmt.Println("changeErr")
	//			fmt.Println(changeErr)
	//		case changedList := <-chanList:
	//			fmt.Println("watch node changed")
	//			fmt.Println(changedList)
	//		}
	//	}
	//}()

	// Read node data -- pairs with the write demo
	zc, _, err := zkManager.GetPathData("/rs_server_conf")
	if err != nil {
		log.Println(err)
	}
	fmt.Println("get node data:")
	fmt.Println(string(zc))

	// Watch node-data changes dynamically
	dataChan, dataErrChan := zkManager.WatchPathData("/rs_server_conf")
	go func() {
		for {
			select {
			case changeErr := <-dataErrChan:
				fmt.Println("changeErr")
				fmt.Println(changeErr)
			case changedData := <-dataChan:
				fmt.Println("WatchGetData changed")
				fmt.Println(string(changedData))
			}
		}
	}()

	// Wait for a shutdown signal
	quit := make(chan os.Signal)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
}
The writer side, which updates the node data after service registration:
package main

import (
	"fmt"
	"time"

	"github.com/e421083458/gateway_demo/proxy/zookeeper"
)

func main() {
	zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})
	zkManager.GetConnect()
	defer zkManager.Close()
	i := 0
	for {
		conf := fmt.Sprintf("{name:" + fmt.Sprint(i) + "}")
		zkManager.SetPathData("/rs_server_conf", []byte(conf), int32(i))
		time.Sleep(5 * time.Second)
		i++
	}
}
# How gateway service discovery works
The load balancer inside the gateway watches zookeeper for changes, and each service creates an ephemeral node in zookeeper when it starts:
- downstream machines create an ephemeral node at startup; the node name and content are the service address
- the load-balancing configuration LoadBalanceConf is built with the observer pattern
- LoadBalanceConf is then integrated with the load balancer itself
Combined with the following code, which registers a service into zookeeper when it starts, the gateway can watch for service changes dynamically:
package main
import (
"fmt"
"github.com/e421083458/gateway_demo/proxy/zookeeper"
"io"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
rs1 := &RealServer{Addr: "127.0.0.1:2003"}
rs1.Run()
time.Sleep(2 * time.Second)
rs2 := &RealServer{Addr: "127.0.0.1:2004"}
rs2.Run()
time.Sleep(2 * time.Second)
// wait for a shutdown signal
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}
type RealServer struct {
Addr string
}
func (r *RealServer) Run() {
log.Println("Starting httpserver at " + r.Addr)
mux := http.NewServeMux()
mux.HandleFunc("/", r.HelloHandler)
mux.HandleFunc("/base/error", r.ErrorHandler)
server := &http.Server{
Addr: r.Addr,
WriteTimeout: time.Second * 3,
Handler: mux,
}
go func() {
// register this node in zookeeper
zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})
err := zkManager.GetConnect()
if err != nil {
fmt.Printf(" connect zk error: %s ", err)
}
defer zkManager.Close()
err = zkManager.RegistServerPath("/real_server", r.Addr)
if err != nil {
fmt.Printf(" regist node error: %s ", err)
}
zlist, err := zkManager.GetServerListByPath("/real_server")
fmt.Println(zlist)
log.Fatal(server.ListenAndServe())
}()
}
func (r *RealServer) HelloHandler(w http.ResponseWriter, req *http.Request) {
upath := fmt.Sprintf("http://%s%s\n", r.Addr, req.URL.Path)
io.WriteString(w, upath)
}
func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) {
upath := "error handler"
w.WriteHeader(500)
io.WriteString(w, upath)
}
The code above shows how a service registers itself in zookeeper at startup. Next we look at how the gateway pulls the service list back out of zookeeper to drive load balancing, using the observer pattern.
Implementation idea: the load-balancing side uses the observer pattern and updates its configuration by watching zk for dynamic changes.
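A minimal sketch of that observer pattern (the Observer/LoadBalanceConf shapes here are illustrative, not the exact types from the gateway_demo repo): the conf is the subject, a load balancer attaches itself as an observer, and every zk change pushes the new server list to all observers.
package loadbalance

import "sync"

// Observer is anything that wants to be told when the server list changes,
// e.g. a load balancer that must rebuild its node weights or hash ring.
type Observer interface {
	Update(servers []string)
}

// LoadBalanceConf is the subject: it holds the current server list coming
// from zookeeper and notifies observers whenever that list changes.
type LoadBalanceConf struct {
	mu        sync.RWMutex
	servers   []string
	observers []Observer
}

// Attach registers an observer that will receive future updates.
func (c *LoadBalanceConf) Attach(o Observer) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.observers = append(c.observers, o)
}

// OnZkChange would be called from the zk watch goroutine
// (e.g. when WatchServerListByPath delivers a new list).
func (c *LoadBalanceConf) OnZkChange(servers []string) {
	c.mu.Lock()
	c.servers = servers
	observers := append([]Observer(nil), c.observers...)
	c.mu.Unlock()
	for _, o := range observers {
		o.Update(servers)
	}
}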
# Client-side service discovery
- Downstream machines do not need to do anything at startup
- The load-balancing configuration LoadBalanceConf is built with the observer pattern
- The load-balancing configuration probes the health of downstream nodes at a fixed interval (see the sketch below)
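A minimal sketch of such a fixed-interval health check, under the assumption that "healthy" simply means the node still accepts TCP connections (package name, function names and the 2-second timeout are illustrative, not from the demo repo):
package loadbalance

import (
	"net"
	"time"
)

// checkHealth reports whether addr currently accepts TCP connections;
// this stands in for whatever real health probe the gateway uses.
func checkHealth(addr string) bool {
	conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

// WatchHealth probes every node at a fixed interval and hands the healthy
// subset to onChange (e.g. LoadBalanceConf.OnZkChange from the sketch above).
func WatchHealth(nodes []string, interval time.Duration, onChange func([]string)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		healthy := make([]string, 0, len(nodes))
		for _, n := range nodes {
			if checkHealth(n) {
				healthy = append(healthy, n)
			}
		}
		onChange(healthy)
	}
}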