# 微服务网关

Tcp为啥需要三次握手、四次挥手

  • 三次握手主要目的是保证连接的双工(发送和接受可同时),可靠更多的是通过重传机制保证的

# 三次握手

# 四次挥手

1、为什么需要等待2MSL?

  • MSL: Maximum Segment Lifetime ,30s-60s
  • 保证Tcp连接的成功关闭,如果最后client的ACK包没有被服务端接收到,那么服务端将处于LAST-ACK阶段,需要客户端重传ACK,所以需要主动关闭方等待哈
  • 保证这次连接的重复数据段从网络中消息,也就是说如果马上又有一个连接从客户端的这个端口发起,等待2msl保证上次的重传或者连接能成功清除,不会混淆到下一个连接中

2、为撒会出现大量的close_wait?

  • 首先close_wait一般出现在被动关闭方,因为服务端没有关闭连接
  • 并发请求太多
  • 被动关闭方没有及时释放端口资源导致,卡在close_wait,程序就结束了

# 使用抓包工具分析

# Tcp为啥需要流量控制

  • 由于通讯双方,网速不同,通讯方任一方发送过快都会导致对方消息处理不过来,所以需要把数据放到缓冲区中
  • 如果缓冲区满了,发送方还在发送,那接收方只能把数据包丢弃,因此需要控制发送速率
  • 我们缓冲区剩余大小称之为接受窗口,用变量win表示,如果win=0,则发送方停止发送

# Tcp为撒需要拥塞控制

  • 流量控制与拥塞控制是两个概念,拥塞控制是调节网络负载
  • 接受方网络资源繁忙,因未及时响应ACK导致发送方重传大量数据,这样将会导致网络更加拥堵
  • 拥塞控制是动态调整win大小,不只是依赖缓冲区大小确定窗口大小

# 为什么会出现粘/拆包

  • 应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包
  • 应用程序写入数据小于套接字缓冲区的大小,网卡将应用多次写入数据发送到网络上,这将发生粘包
  • 进行mss(最大报文长度)大小的Tcp分段,当Tcp报文长度-Tcp头部长度>mss的时候拆包

如何避免粘包/拆包,获取完成数据报文?

  • 使用带消息头的协议,头部写入包长度,然后读取指定长度
  • 设置定长消息
  • 设置消息边界
  • 更复杂的协议:json、protobuf

代码实现:

Unpack/pack.go

package unpack

import (
	"encoding/binary"
	"errors"
	"io"
)

const Msg_hander ="12345678"

// 编码
func Encode(bytesByffer io.Writer, content string)  error {
	// msg_handle + content_len + content
	// 8 + 4 + len
	if err := binary.Write(bytesByffer,binary.BigEndian,[]byte(Msg_hander)); err != nil {  // 使用二进制编码
		return err
	}

	clen := int32(len([]byte(content)))  // 转化为32位,4字节的长度
	if err := binary.Write(bytesByffer,binary.BigEndian,clen); err != nil {
		return err
	}

	if err := binary.Write(bytesByffer,binary.BigEndian,[]byte(content)); err != nil {
		return err
	}

	return nil
}

// 解码
func Decode(bytesBuffer io.Reader) (bodyBuf []byte, err error) {
	MagicBuf := make([]byte,len(Msg_hander))
	if _, err  = io.ReadFull(bytesBuffer,MagicBuf); err != nil {
		return nil ,err
	}
	if string(MagicBuf) != Msg_hander {
		return nil , errors.New("msg_hander error")
	}

	lengthBuf := make([]byte, 4)
	if _, err := io.ReadFull(bytesBuffer,lengthBuf); err != nil {
		return nil , errors.New("length error")
	}

	length := binary.BigEndian.Uint32(lengthBuf)
	bodyBuf = make([]byte, length)
	if _, err = io.ReadFull(bytesBuffer,bodyBuf);err != nil {
		return nil ,err
	}
	return bodyBuf, nil
}

client.go

func main() {
	// 1、 连接服务器
	conn, err := net.Dial("tcp", "0.0.0.0:9999")
	defer conn.Close()
	if err != nil {
		fmt.Println("connect failed")
		return
	}
	unpack.Encode(conn, "hello, world 000 !!!")
}

Server.go

func process(conn net.Conn) {
	defer conn.Close()
	for {
		buf, err := unpack.Decode(conn)
		if err != nil {
			fmt.Printf("read from connect failed , err: %v\n", err)
			break
		}
		str := string(buf)
		fmt.Printf("receive from client, data: %v\n",str)
	}
}

func main(){
	// 1、监听端口
	listern, err := net.Listen("tcp","0.0.0.0:9999")
	if err != nil {
		fmt.Printf("listen fail,err: %v\n",err)
	}

	// 2、建立套接字连接
	for {
		conn, err := listern.Accept()
		if err != nil {
			fmt.Printf("accept fail,err : %v\n", err)
			continue
		}
		// 3、协程处理监听服务
		go process(conn)
	}
}

# Tcp拥塞控制

  • 慢开始和拥塞避免
  • 快速重传和快速恢复

# Udp连接通信

Client.go

func main() {
	// 监听服务器
	conn, err :=  net.DialUDP("udp",nil, &net.UDPAddr{
		IP:   net.IPv4(127,0,0,1),
		Port: 9909,
	})

	if err != nil {
		fmt.Println("listen failed")
		return
	}
	for i := 0; i <100; i++ {
		_,err = conn.Write([]byte("hello server"))
		if err != nil {
			fmt.Printf("send data failed :%v", err)
			return
		}

		result := make([]byte,1024)
		n, remoteAddr, err := conn.ReadFromUDP(result)
		if err != nil {
			fmt.Printf("receive data failed %v",err)
			return
		}
		fmt.Printf("recevice data: %v, addr: %v  \n", string(result[:n]),remoteAddr)
	}
}

Server.go

func main() {
	// 监听服务器
	listen, err :=  net.ListenUDP("udp",&net.UDPAddr{
		IP:   net.IPv4(0,0,0,0),
		Port: 9909,
	})

	if err != nil {
		fmt.Println("listen failed")
		return
	}

	for {
		var data [1024]byte
		n, addr, err := listen.ReadFromUDP(data[:])
		if err != nil {
			fmt.Printf("read failed from addr: %s",addr)
			break
		}

		go func() {
			fmt.Printf("read failed from addr: %s ,data:%v, count:%v\n",addr, string(data[:n]),n)
			_, err := listen.WriteToUDP([]byte("received sucess"),addr)
			if err != nil {
				fmt.Printf("write failed : %v",err)
			}
		}()
	}
}

# Tcp连接通信

server端

package main

import (
	"fmt"
	"net"
)

func process(conn net.Conn) {
	defer conn.Close() // 如果服务端不主动关闭,会怎么样?
	for {
		var buf [128]byte
		n, err := conn.Read(buf[:])
		if err != nil {
			fmt.Printf("read from connect failed , err: %v\n", err)
			break
		}
		str := string(buf[:n])
		fmt.Printf("receive from client, data: %v\n",str)
	}
}

func main(){
	// 1、监听端口
	listern, err := net.Listen("tcp","0.0.0.0:9999")
	if err != nil {
		fmt.Printf("listen fail,err: %v\n",err)
	}

	// 2、建立套接字连接
	for {
		conn, err := listern.Accept()
		if err != nil {
			fmt.Printf("accept fail,err : %v\n", err)
			continue
		}
		// 3、协程处理监听服务
		go process(conn)
	}
}

client端

package main

import (
	"bufio"
	"fmt"
	"net"
	"os"
	"strings"
)

func main() {
	// 1、 连接服务器
	conn, err := net.Dial("tcp", "0.0.0.0:9999")
	defer conn.Close()  // 思考:如果不关闭回发生什么?如果客户端不主动关闭连接,该连接还是会保持着占用

	if err != nil {
		fmt.Println("connect failed")
		return
	}

	// 2、读取命令行输入
	inputReader := bufio.NewReader(os.Stdin)
	for {
		input, err := inputReader.ReadString('\n')
		if err != nil {
			fmt.Printf("read from console failed")
			break
		}
		// 4、读取Q停止
		trimmedInput := strings.TrimSpace(input)
		if trimmedInput == "Q" {
			break
		}
		// 5、回复消息
		_, err = docke.Write([]byte(trimmedInput))
		if err != nil {
			fmt.Printf("write failed, err : %s\v",err)
			break
		}
	}
}

注意:如果是服务端没有主动关闭连接的动作,那么将会使得client端处于“fin_wait"阶段、服务端处于“close_wait"阶段(服务端协程已经关闭)

# Http服务通信

server端

package main

import (
	"log"
	"net/http"
	"time"
)

var (
	Addr = ":1212"
)

func sayBye(w http.ResponseWriter, r *http.Request) {
	time.Sleep(1 * time.Second)
	w.Write([]byte("bye bye"))
}

func main() {
	// 创建路由器
	mux := http.NewServeMux()
	// 设置路由规则
	mux.HandleFunc("/bye",sayBye)

	// 创建服务器
	server := &http.Server{
		Addr: Addr,
		WriteTimeout: time.Second*3,
		Handler: mux,
	}

	// 监听端口并提供服务
	log.Println("Starting httpServer at ", Addr)
	log.Fatal(server.ListenAndServe())
}

client端

package main

import (
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"time"
)

func main() {
	// 创建连接池
	transport := &http.Transport{
		DialContext: (&net.Dialer{
			Timeout:       30*time.Second,  // 连接超时
			KeepAlive:     30*time.Second,  // 长连接超时时间
		}).DialContext,
		MaxIdleConns:100,    // 最大空闲连接
		IdleConnTimeout:90*time.Second,  //  空闲超时使劲啊
		TLSHandshakeTimeout:10*time.Second,  //  tls握手超时时间
		ExpectContinueTimeout:1*time.Second,  //  100-continue状态吗超时时间
	}
	//  创建客户端
	client := &http.Client{
		Timeout: time.Second * 30,
		Transport:transport,
	}

	// 请求数据
	resp, err := client.Get("http://127.0.0.1:1212/bye")
	if err !=  nil {
		panic(err)
	}
	defer resp.Body.Close()  // 不释放会独占内存
	// 读取数据
	bas, err := ioutil.ReadAll(resp.Body)
	if err !=  nil {
		panic(err)
	}
	fmt.Println(string(bas))
}

# 网络代理

  • 用户通过代理请求信息
  • 请求通过网络代理完成转发到达目标服务器
  • 目标服务器响应后通过网络代理回传给用户

网络转发: 客户端ip ——> 经过路由转发 ——> 目标服务器

  • 也就是说网络转发是路由器对报文的转发操作,中间可能对数据包进行修改

网络代理: 客户端ip(携带代理服务器的ip)——> 代理服务器 ——> 分发服务到后台服务

  • 用户不直接连接服务器,网络代理去连接,获取数据后返回给用户

# 正向代理

  • 是一种客户端的代理技术,帮助客户端访问无法访问的服务资源,可以隐藏用户的真实IP,比如浏览器web代理,VPN等

  • 实现一个web浏览器代理:

    • 代理接受客户端请求,复制原请求对象,并根据数据配置新请求各种参数

    • 把新请求发送到真实服务端,并接收到服务端返回

    • 代理服务器对响应做一些处理,然后返回给客户端

      用户请求——>代理服务器监听——>交给上游TCP连接——>回调方法——>拷贝请求数据——>请求下游服务Transport RoundTrip——>回写上游数据
      
  • 代码实现:

    package main
    
    import (
    	"fmt"
    	"io"
    	"net"
    	"net/http"
    	"strings"
    )
    
    type Pxy struct{}
    
    func (p *Pxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
    	fmt.Printf("Received request %s %s %s\n",req.Method,req.Host,req.RemoteAddr)
    	transport := http.DefaultTransport   // 数据连接池
    	// step1, 浅拷贝对象,然后就再新增属性数据,因为浅拷贝可以保存新增属性
    	outReq := new(http.Request)
    	*outReq = *req
    
    	if clienIP,_,err := net.SplitHostPort(req.RemoteAddr); err == nil {
    		if prior,ok := outReq.Header["X-Forwarded-For"];ok { // X-Forwarded-For 是一个代理Ip的链表
    			clienIP = strings.Join(prior,", ") + ", "+clienIP
    		}
    		outReq.Header.Set("X-Forwarded-For",clienIP)
    	}
    
    	// step2, 请求下游
    	res, err := transport.RoundTrip(outReq) // 获取到下游的响应数据
    	if err != nil {
    		rw.WriteHeader(http.StatusBadGateway)
    		return
    	}
    
    	// step3, 下游请求内容返回给上游
    	for key, value := range res.Header {
    		for _, v := range value {
    			rw.Header().Add(key,v)  // 头写入
    		}
    	}
    
    	//rw.Header().Add("xxx","sss")  // 可以给每个请求进行修改
    	rw.WriteHeader(res.StatusCode)
    	io.Copy(rw,res.Body)  // 数据写入
    	res.Body.Close()
    }
    
    func main() {
    	fmt.Println("Serve on :8080")
    	http.Handle("/",&Pxy{})  // Handle中的第二个参数必须实现ServeHTTP
    	http.ListenAndServe("0.0.0.0:8080",nil)
    }
    
  • 服务启动后,打开浏览器,设置网络代理,设置好web网页代理到127.0.0.1,端口为8080,然后去访问一个http网址,如:www.tianya.cn

# 反向代理

  • 是一种服务端的代理技术,帮助服务器做负载均衡、缓存、提供安全校验等,可以隐藏服务器真实IP,比如LVS技术、nginx proxy_pass等,所以我们可以在这做:负载、安全校验等功能

    • 代理接收客户端请求,更改请求结构体信息
    • 通过一定的负载均衡算法获取下游服务地址
    • 把请求发送到下游服务器,并获取返回内容
    • 对返回内容做一些处理,返回给客户端

    简单的http代理服务实现:

    real_server/main.go: 真实后端服务

    package main
    
    import (
    	"fmt"
    	"io"
    	"log"
    	"net/http"
    	"os"
    	"os/signal"
    	"syscall"
    	"time"
    )
    
    type RealServer struct {
    	Addr string
    }
    
    func (r *RealServer) HelloHandler(w http.ResponseWriter, req *http.Request) {
    	//127.0.0.1:8008/abc?sdsdsa=11
    	//r.Addr=127.0.0.1:8008
    	//req.URL.Path=/abc
    	upath := fmt.Sprintf("http://%s%s\n",r.Addr,req.URL.Path)
    	realIp := fmt.Sprintf("RemoteAddr=%s,X-Forwarded-For=%v,X-Real-IP=%v\n",req.RemoteAddr,req.Header.Get("X-Forwarded-For"),req.Header.Get("X-Real-Ip"))
    	header := fmt.Sprintf("headers=%v\n",req.Header)
    	io.WriteString(w,upath)
    	io.WriteString(w,realIp)
    	io.WriteString(w,header)
    }
    
    func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) {
    	upath := "error handler"
    	w.WriteHeader(500)
    	io.WriteString(w, upath)
    }
    
    func (r *RealServer) TimeoutHandler(w http.ResponseWriter, req *http.Request) {
    	time.Sleep(6*time.Second)
    	upath := "timeout handler"
    	w.WriteHeader(200)
    	io.WriteString(w, upath)
    }
    
    func (r *RealServer) Run() {
    	log.Println("Starting httpServer at: ",r.Addr)
    	mux  := http.NewServeMux()
    	mux.HandleFunc("/",r.HelloHandler)
    	mux.HandleFunc("/base/error", r.ErrorHandler)
    	mux.HandleFunc("/test_http_string/test_http_string/aaa", r.TimeoutHandler)
    	server := &http.Server{
    		Addr:         r.Addr,
    		WriteTimeout: time.Second * 3,
    		Handler:      mux,
    	}
    	go func() {
    		log.Fatal(server.ListenAndServe())
    	}()
    }
    
    func main() {
    	rs1 := &RealServer{Addr: "127.0.0.1:2003"}
    	rs1.Run()
    
    	rs2 := &RealServer{Addr: "127.0.0.1:2004"}
    	rs2.Run()
    
    	quit := make(chan os.Signal)
    	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    	fmt.Println("Got signal:", <-quit)
    }
    

    reverse_proxy_base/main.go: 反向代理服务

    package main
    
    import (
    	"bufio"
    	"log"
    	"net/http"
    	"net/url"
    )
    
    var (
    	proxy_addr = "http://127.0.0.1:2003"
    	port = "2002"
    )
    
    func handler(w http.ResponseWriter,r *http.Request) {
    	// step1 解析代理地址,并更改请求体的协议和主体
    	proxy, err := url.Parse(proxy_addr)
    	r.URL.Scheme = proxy.Scheme // 协议
    	r.URL.Host = proxy.Host
    
    	// step2 请求下游服务
    	transport := http.DefaultTransport
    	resp, err := transport.RoundTrip(r)
    
    	if err != nil {
    		log.Println(err)
    		return
    	}
    
    	// step3 把下游请求内容返回给上游
    	for k, vv := range resp.Header {
    		for _, v := range vv {
    			w.Header().Add(k,v)
    		}
    	}
    
    	defer resp.Body.Close()
    	bufio.NewReader(resp.Body).WriteTo(w)  // 将拿到的下游响应写入
    }
    
    func main() {
    	http.HandleFunc("/",handler)
    	log.Println("Start serving on port: ",port)
    	err := http.ListenAndServe(":"+port,nil)
    	if err != nil {
    		log.Fatal(err)
    	}
    }
    

    上面实现了2002代理2003端口的服务

# HTTP代理

之前的是实现了简单的网络代理,其实我们可以在此基础上实现更为复杂的其他功能:

  • 错误回调、错误日志处理
  • 更改代理返回内容
  • 负载均衡
  • url重写
  • 限流、熔断、降级
  • 数据统计、权限验证等

我们以后会使用ReverseProxy来实现如上功能:

  • 4种负载轮询类型的实现和接口封装
  • 拓展中间件支持:限流、熔断、权限、统计等
# ReverseProxy结构
type ReverseProxy struct {
	// Director must be a function which modifies
	// the request into a new request to be sent
	// using Transport. Its response is then copied
	// back to the original client unmodified.
	// Director must not access the provided Request
	// after returning.
	Director func(*http.Request)  // 控制器,一个函数,可以对请求内容进行修改,如请求参数

	// The transport used to perform proxy requests.
	// If nil, http.DefaultTransport is used.
	Transport http.RoundTripper  // 连接池,默认是DefaultTransport

	// FlushInterval specifies the flush interval
	// to flush to the client while copying the
	// response body.
	// If zero, no periodic flushing is done.
	// A negative value means to flush immediately
	// after each write to the client.
	// The FlushInterval is ignored when ReverseProxy
	// recognizes a response as a streaming response;
	// for such responses, writes are flushed to the client
	// immediately.
	FlushInterval time.Duration  // 客户端的刷新问题

	// ErrorLog specifies an optional logger for errors
	// that occur when attempting to proxy the request.
	// If nil, logging is done via the log package's standard logger.
	ErrorLog *log.Logger  // 错误记录器

	// BufferPool optionally specifies a buffer pool to
	// get byte slices for use by io.CopyBuffer when
	// copying HTTP response bodies.
	BufferPool BufferPool  // 定义一个缓冲池,在复制http相应时使用,提高请求效率

	// ModifyResponse is an optional function that modifies the
	// Response from the backend. It is called if the backend
	// returns a response at all, with any HTTP status code.
	// If the backend is unreachable, the optional ErrorHandler is
	// called without any call to ModifyResponse.
	//
	// If ModifyResponse returns an error, ErrorHandler is called
	// with its error value. If ErrorHandler is nil, its default
	// implementation is used.
	ModifyResponse func(*http.Response) error  // 修改response的函数,修改响应内容,如果有error会触发下面的ErrorHandler

	// ErrorHandler is an optional function that handles errors
	// reaching the backend or errors from ModifyResponse.
	//
	// If nil, the default is to log the provided error and return
	// a 502 Status Bad Gateway response.
	ErrorHandler func(http.ResponseWriter, *http.Request, error)  // 错误处理回调函数,如果为nil,遇到错误返回502
}
# 重写URL

简单实用ReverseProxy来实现反向代理

package main

import (
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
)
var add = "127.0.0.1:2002"

func main() {
	// 需要代理的下游服务url
	rs1 := "http://127.0.0.1:2003/base"  // 实现了重写url的规则
	//127.0.0.1:2002/xxx
	//127.0.0.1:2003/base/xxx
	url1,err1 := url.Parse(rs1)  // 解析url
	if err1 != nil{
		log.Println(err1)
	}
	proxy := httputil.NewSingleHostReverseProxy(url1) // 传递个url进去后,实现了一个direct,然后注册到ReverseProxy并返回
	log.Println("Starting httpServer at: "+add)
	log.Fatal(http.ListenAndServe(add,proxy)) // proxy中实现了Handler接口中的ServeHTTP
}
# 更改内容

reverse_proxy_step/main.go:重写下游返回的响应体,2002代理2003的服务

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"regexp"
	"strings"
)

var addr = "127.0.0.1:2002"

func main() {
	//127.0.0.1:2002/xxx
	//127.0.0.1:2003/base/xxx
	rs1 := "http://127.0.0.1:2003/base"
	url1, err1 := url.Parse(rs1)
	if err1 != nil {
		log.Println(err1)
	}
	proxy := NewSingleHostReverseProxy(url1)
	log.Println("Starting httpserver at " + addr)
	log.Fatal(http.ListenAndServe(addr, proxy))
}

func NewSingleHostReverseProxy(target *url.URL) *httputil.ReverseProxy {
	//http://127.0.0.1:2002/dir?name=123
	//RayQuery: name=123
	//Scheme: http
	//Host: 127.0.0.1:2002
	targetQuery := target.RawQuery
	// 请求体参数修改
	director := func(req *http.Request) {
		//url_rewrite
		//127.0.0.1:2002/dir/abc ==> 127.0.0.1:2003/base/abc ??
		//127.0.0.1:2002/dir/abc ==> 127.0.0.1:2002/abc
		//127.0.0.1:2002/abc ==> 127.0.0.1:2003/base/abc
		re, _ := regexp.Compile("^/dir(.*)");
		req.URL.Path = re.ReplaceAllString(req.URL.Path, "$1")  // 正则掉所有dir后面的地址,放在新的path中

		req.URL.Scheme = target.Scheme
		req.URL.Host = target.Host

		//target.Path : /base
		//req.URL.Path : /dir
		req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
		if targetQuery == "" || req.URL.RawQuery == "" {
			req.URL.RawQuery = targetQuery + req.URL.RawQuery
		} else {
			req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
		}
		if _, ok := req.Header["User-Agent"]; !ok {
			req.Header.Set("User-Agent", "")
		}
	}

	// 定义返回响应的函数
	modifyFunc := func(res *http.Response) error {
		// 错误码时返回
		if res.StatusCode != 200 {
			return errors.New("error statusCode")
		}
		// 正常情况下直接修改返回体
		oldPayload, err := ioutil.ReadAll(res.Body)  // 拿到返回内容
		if err != nil {
			return err
		}
		newPayLoad := []byte("hello " + string(oldPayload))  // 追加新内容
		res.Body = ioutil.NopCloser(bytes.NewBuffer(newPayLoad))  // NopCloser用一个无操作的Close方法包装r返回一个ReadCloser接口。因为res.Body是一个io.ReaderClose
		res.ContentLength = int64(len(newPayLoad))  // 修改响应长度
		res.Header.Set("Content-Length", fmt.Sprint(len(newPayLoad)))
		return nil
	}

	// 如果modifyFunc有返回error,则触犯这个错误回调
	errorHandler := func(res http.ResponseWriter, req *http.Request, err error) {
		res.Write([]byte(err.Error()))
	}
	return &httputil.ReverseProxy{Director: director, ModifyResponse: modifyFunc, ErrorHandler: errorHandler}
}

func singleJoiningSlash(a, b string) string {
	// URL路径拼接
	aslash := strings.HasSuffix(a, "/")
	bslash := strings.HasPrefix(b, "/")
	switch {
	case aslash && bslash:
		return a + b[1:]
	case !aslash && !bslash:
		return a + "/" + b
	}
	return a + b
}

# 负载均衡策略

  • 随机负载

    随机挑选目标服务器IP

  • 轮询负载

    ABC三台服务器,ABCABC依次轮询

  • 加权负载

    给目标设置访问权重,按照权重轮询

  • 一致性hash负载

    请求固定URL访问指定IP

# 随机负载

  • random.go

    type RandomBalance struct {
    	currIndex int
    	rss []string
    	// 观察主体
    }
    
    // 添加元素
    func (r *RandomBalance) Add(params ...string) error {
    	if len(params) == 0 {
    		return errors.New("param len 1 at least")
    	}
    	add := params[0]
    	r.rss=append(r.rss,add)
    	return nil
    }
    
    // 轮询取得
    func (r *RandomBalance) Next() string {
    	if len(r.rss) == 0{
    		return ""
    	}
    	r.currIndex = rand.Intn(len(r.rss))
    	return r.rss[r.currIndex]
    }
    
    func (r *RandomBalance)  Get( key string) (string, error){
    	return r.Next(),nil
    }
    
  • random_test.go

    func TestRandomBalance(t *testing.T) {
    	rb := &RandomBalance{}
    	rb.Add("127.0.0.1:2003") //0
    	rb.Add("127.0.0.1:2004") //1
    	rb.Add("127.0.0.1:2005") //2
    	rb.Add("127.0.0.1:2006") //3
    	rb.Add("127.0.0.1:2007") //4
    
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    }
    

# 轮询负载

  • round_robin.go

    type RoundRobinBalance struct {
    	curIndex int
    	rss      []string
    	//观察主体
    
    }
    
    func (r *RoundRobinBalance) Add(params ...string) error {
    	if len(params) == 0 {
    		return errors.New("param len 1 at least")
    	}
    	addr := params[0]
    	r.rss = append(r.rss, addr)
    	return nil
    }
    
    func (r *RoundRobinBalance) Next() string {
    	if len(r.rss) == 0 {
    		return ""
    	}
    	lens := len(r.rss) //5
    	if r.curIndex >= lens {
    		r.curIndex = 0
    	}
    	curAddr := r.rss[r.curIndex]
    	r.curIndex = (r.curIndex + 1) % lens
    	return curAddr
    }
    
  • Robin_test.go

    func Test_main(t *testing.T) {
    	rb := &RoundRobinBalance{}
    	rb.Add("127.0.0.1:2003") //0
    	rb.Add("127.0.0.1:2004") //1
    	rb.Add("127.0.0.1:2005") //2
    	rb.Add("127.0.0.1:2006") //3
    	rb.Add("127.0.0.1:2007") //4
    
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    	fmt.Println(rb.Next())
    }
    

# 加权轮询负载

  • 有三个概念:

    1、weight: 初始化时对节点约定的加权
    2、currentWeight: 节点临时权重,每轮都会变化
    3、effectWeight:节点有效权重,默认与weight相同
    4、totalWeight: 所有节点有效权重之和:sum(effectweight)
    
    忽略了有效权重,算法:
    1、currentWeight=currentWeight+effectWeight
    2、选中最大的currentWeight
    3、currentWeight=currentWeight-totalWeight
    
    请求次数 请求currentWeight 选中节点 请求后的currentWeight
    1 serverA=4,serverB=3,serverC=2 serverA serverA=-1,serverB=3,serverC=2
    2 serverA=-1,serverB=3,serverC=4 serverB serverA=3,serverB=0,serverC=6
    3 serverA=3,serverB=0,serverC=6 serverC serverA=7,serverB=3,serverC=-1

# 一致性hash负载

  • hash_robin.go

    package load_balance
    
    import (
    	"errors"
    	"hash/crc32"
    	"sort"
    	"strconv"
    	"sync"
    )
    
    type Hash func(data []byte) uint32
    
    type UInt32Slice []uint32
    
    func (s UInt32Slice) Len() int {
    	return len(s)
    }
    
    func (s UInt32Slice) Less(i, j int) bool {
    	return s[i] < s[j]
    }
    
    func (s UInt32Slice) Swap(i, j int) {
    	s[i], s[j] = s[j], s[i]
    }
    
    type ConsistentHashBanlance struct {
    	mux      sync.RWMutex
    	hash     Hash
    	replicas int               //复制因子
    	keys     UInt32Slice       //已排序的节点hash切片
    	hashMap  map[uint32]string //节点哈希和Key的map,键是hash值,值是节点key
    
    	//观察主体
    }
    
    func NewConsistentHashBanlance(replicas int, fn Hash) *ConsistentHashBanlance {
    	m := &ConsistentHashBanlance{
    		replicas: replicas,
    		hash:     fn,
    		hashMap:  make(map[uint32]string),
    	}
    	if m.hash == nil {
    		//最多32位,保证是一个2^32-1环
    		m.hash = crc32.ChecksumIEEE
    	}
    	return m
    }
    
    // 验证是否为空
    func (c *ConsistentHashBanlance) IsEmpty() bool {
    	return len(c.keys) == 0
    }
    
    // Add 方法用来添加缓存节点,参数为节点key,比如使用IP
    func (c *ConsistentHashBanlance) Add(params ...string) error {
    	if len(params) == 0 {
    		return errors.New("param len 1 at least")
    	}
    	addr := params[0]
    	c.mux.Lock()
    	defer c.mux.Unlock()
    	// 结合复制因子计算所有虚拟节点的hash值,并存入m.keys中,同时在m.hashMap中保存哈希值和key的映射
    	for i := 0; i < c.replicas; i++ {
    		hash := c.hash([]byte(strconv.Itoa(i) + addr))
    		c.keys = append(c.keys, hash)
    		c.hashMap[hash] = addr
    	}
    	// 对所有虚拟节点的哈希值进行排序,方便之后进行二分查找
    	sort.Sort(c.keys)
    	return nil
    }
    
    // Get 方法根据给定的对象获取最靠近它的那个节点
    func (c *ConsistentHashBanlance) Get(key string) (string, error) {
    	if c.IsEmpty() {
    		return "", errors.New("node is empty")
    	}
    	hash := c.hash([]byte(key))
    
    	// 通过二分查找获取最优节点,第一个"服务器hash"值大于"数据hash"值的就是最优"服务器节点"
    	idx := sort.Search(len(c.keys), func(i int) bool { return c.keys[i] >= hash })
    
    	// 如果查找结果 大于 服务器节点哈希数组的最大索引,表示此时该对象哈希值位于最后一个节点之后,那么放入第一个节点中
    	if idx == len(c.keys) {
    		idx = 0
    	}
    	c.mux.RLock()
    	defer c.mux.RUnlock()
    	return c.hashMap[c.keys[idx]], nil
    }
    
  • hash_test.go

    func TestNewConsistentHashBanlance(t *testing.T) {
    	rb := NewConsistentHashBanlance(10, nil)
    	rb.Add("127.0.0.1:2003") //0
    	rb.Add("127.0.0.1:2004") //1
    	rb.Add("127.0.0.1:2005") //2
    	rb.Add("127.0.0.1:2006") //3
    	rb.Add("127.0.0.1:2007") //4
    
    	//url hash
    	fmt.Println(rb.Get("http://127.0.0.1:2002/base/getinfo"))
    	fmt.Println(rb.Get("http://127.0.0.1:2002/base/error"))
    	fmt.Println(rb.Get("http://127.0.0.1:2002/base/getinfo"))
    	fmt.Println(rb.Get("http://127.0.0.1:2002/base/changepwd"))
    
    	//ip hash
    	fmt.Println(rb.Get("127.0.0.1"))
    	fmt.Println(rb.Get("192.168.0.1"))
    	fmt.Println(rb.Get("127.0.0.1"))
    }
    

# 负载工厂模式

  • factory.go

    package load_balance
    
    type LbType int
    
    const (
    	LbRandom LbType = iota
    	LbRoundRobin
    	LbWeightRoundRobin
    	LbConsistentHash
    )
    
    func LoadBanlanceFactory(lbType LbType) LoadBalance {
    	switch lbType {
    	case LbRandom:
    		return &RandomBalance{}
    	case LbConsistentHash:
    		return NewConsistentHashBanlance(10, nil)
    	case LbRoundRobin:
    		return &RoundRobinBalance{}
    	case LbWeightRoundRobin:
    		return &WeightRoundRobinBalance{}
    	default:
    		return &RandomBalance{}
    	}
    }
    
  • interface.go

    package load_balance
    
    type LoadBalance interface {
    	Add(...string) error
    	Get(string) (string, error)
    }
    
    
  • factory_test.go

    package load_balance
    
    import (
    	"bytes"
    	"io/ioutil"
    	"log"
    	"net"
    	"net/http"
    	"net/http/httputil"
    	"net/url"
    	"strconv"
    	"strings"
    	"testing"
    	"time"
    )
    
    var (
    	addr      = "127.0.0.1:2002"
    	transport = &http.Transport{
    		DialContext: (&net.Dialer{
    			Timeout:   30 * time.Second, //连接超时
    			KeepAlive: 30 * time.Second, //长连接超时时间
    		}).DialContext,
    		MaxIdleConns:          100,              //最大空闲连接
    		IdleConnTimeout:       90 * time.Second, //空闲超时时间
    		TLSHandshakeTimeout:   10 * time.Second, //tls握手超时时间
    		ExpectContinueTimeout: 1 * time.Second,  //100-continue状态码超时时间
    	}
    )
    
    func NewMultipleHostsReverseProxy(lb LoadBalance) *httputil.ReverseProxy {
    	//请求协调者
    	director := func(req *http.Request) {
    		nextAddr, err := lb.Get(req.URL.String())  // 通过URL的变化来负载分配,也可以根据RemoteIP来
    		if err != nil {
    			log.Fatal("get next addr fail")
    		}
    		target, err := url.Parse(nextAddr)
    		if err != nil {
    			log.Fatal(err)
    		}
    		targetQuery := target.RawQuery
    		req.URL.Scheme = target.Scheme
    		req.URL.Host = target.Host
    		req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
    		if targetQuery == "" || req.URL.RawQuery == "" {
    			req.URL.RawQuery = targetQuery + req.URL.RawQuery
    		} else {
    			req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
    		}
    		if _, ok := req.Header["User-Agent"]; !ok {
    			req.Header.Set("User-Agent", "user-agent")
    		}
    	}
    
    	//更改内容
    	modifyFunc := func(resp *http.Response) error {
    		//请求以下命令:curl 'http://127.0.0.1:2002/error'
    		if resp.StatusCode != 200 {
    			//获取内容
    			oldPayload, err := ioutil.ReadAll(resp.Body)
    			if err != nil {
    				return err
    			}
    			//追加内容
    			newPayload := []byte("StatusCode error:" + string(oldPayload))
    			resp.Body = ioutil.NopCloser(bytes.NewBuffer(newPayload))
    			resp.ContentLength = int64(len(newPayload))
    			resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(newPayload)), 10))
    		}
    		return nil
    	}
    
    	//错误回调 :关闭real_server时测试,错误回调
    	//范围:transport.RoundTrip发生的错误、以及ModifyResponse发生的错误
    	errFunc := func(w http.ResponseWriter, r *http.Request, err error) {
    		//todo 如果是权重的负载则调整临时权重
    		http.Error(w, "ErrorHandler error:"+err.Error(), 500)
    	}
    
    	return &httputil.ReverseProxy{Director: director, Transport: transport, ModifyResponse: modifyFunc, ErrorHandler: errFunc}
    }
    
    func singleJoiningSlash(a, b string) string {
    	aslash := strings.HasSuffix(a, "/")
    	bslash := strings.HasPrefix(b, "/")
    	switch {
    	case aslash && bslash:
    		return a + b[1:]
    	case !aslash && !bslash:
    		return a + "/" + b
    	}
    	return a + b
    }
    
    func TestFactory(t *testing.T) {
    	rb := LoadBanlanceFactory(LbConsistentHash)
    	if err := rb.Add("http://127.0.0.1:2003/base", "10"); err != nil {
    		log.Println(err)
    	}
    	if err := rb.Add("http://127.0.0.1:2004/base", "20"); err != nil {
    		log.Println(err)
    	}
    	proxy := NewMultipleHostsReverseProxy(rb)
    	log.Println("Starting httpserver at " + addr)
    	log.Fatal(http.ListenAndServe(addr, proxy))
    }
    

# 中间件支持

# 中间件的意义

  • 避免程序逻辑复杂
  • 提高复用,隔离业务,比如权限认证我们可以单独抽离出来,通过中间件隔离业务
  • 调用清晰,组合随意

可以把中间件的原理理解成“洋葱”,一层一层的进一层一层的出来。

  • 中间件代码实现
    • 中间件一般都封装在路由上,路由是URL请求分发的管理器
    • 中间件选型:
      • 基于链表构建中间件(缺点,实现复杂,调用不方便)
      • 使用数组构建中间件,控制灵活方便,推荐使用
    • 方法组装
      • 构建中间件URL路由
      • 构建URL的中间件方法数组
      • 使用use方法整合路由和方法数组

# 开发一个中间件

  • middleware/slice_router.go

    package middleware
    
    import (
    	"context"
    	"math"
    	"net/http"
    	"strings"
    )
    
    //目标定位是 tcp、http通用的中间件
    //知其然也知其所以然
    
    const abortIndex int8 = math.MaxInt8 / 2 //最多 63 个中间件
    
    type HandlerFunc func(*SliceRouterContext)
    
    // router 结构体
    type SliceRouter struct {
    	groups []*SliceGroup
    }
    
    // group 结构体
    type SliceGroup struct {
    	*SliceRouter
    	path     string
    	handlers []HandlerFunc
    }
    
    // router上下文
    type SliceRouterContext struct {
    	Rw  http.ResponseWriter
    	Req *http.Request
    	Ctx context.Context
    	*SliceGroup
    	index int8
    }
    
    func newSliceRouterContext(rw http.ResponseWriter, req *http.Request, r *SliceRouter) *SliceRouterContext {
    	newSliceGroup := &SliceGroup{}
    
    	//最长url前缀匹配
    	matchUrlLen := 0
    	for _, group := range r.groups {
    		//fmt.Println("req.RequestURI")
    		//fmt.Println(req.RequestURI)
    		if strings.HasPrefix(req.RequestURI, group.path) {
    			pathLen := len(group.path)
    			if pathLen > matchUrlLen {
    				matchUrlLen = pathLen
    				*newSliceGroup = *group //浅拷贝数组指针
    			}
    		}
    	}
    
    	c := &SliceRouterContext{Rw: rw, Req: req, SliceGroup: newSliceGroup, Ctx: req.Context()}
    	c.Reset()
    	return c
    }
    
    func (c *SliceRouterContext) Get(key interface{}) interface{} {
    	return c.Ctx.Value(key)
    }
    
    func (c *SliceRouterContext) Set(key, val interface{}) {
    	c.Ctx = context.WithValue(c.Ctx, key, val)
    }
    
    type SliceRouterHandler struct {
    	coreFunc func(*SliceRouterContext) http.Handler
    	router   *SliceRouter
    }
    
    func (w *SliceRouterHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
    	c := newSliceRouterContext(rw, req, w.router)
    	if w.coreFunc != nil {
    		c.handlers = append(c.handlers, func(c *SliceRouterContext) {
    			w.coreFunc(c).ServeHTTP(rw, req)
    		})
    	}
    	c.Reset()
    	c.Next()
    }
    
    func NewSliceRouterHandler(coreFunc func(*SliceRouterContext) http.Handler, router *SliceRouter) *SliceRouterHandler {
    	return &SliceRouterHandler{
    		coreFunc: coreFunc,
    		router:   router,
    	}
    }
    
    // 构造 router
    func NewSliceRouter() *SliceRouter {
    	return &SliceRouter{}
    }
    
    // 创建 Group
    func (g *SliceRouter) Group(path string) *SliceGroup {
    	return &SliceGroup{
    		SliceRouter: g,
    		path:        path,
    	}
    }
    
    // 构造回调方法
    func (g *SliceGroup) Use(middlewares ...HandlerFunc) *SliceGroup {
    	g.handlers = append(g.handlers, middlewares...)
    	existsFlag := false
    	for _, oldGroup := range g.SliceRouter.groups {
    		if oldGroup == g {
    			existsFlag = true
    		}
    	}
    	if !existsFlag {
    		g.SliceRouter.groups = append(g.SliceRouter.groups, g)
    	}
    	return g
    }
    
    // 从最先加入中间件开始回调
    func (c *SliceRouterContext) Next() {
    	c.index++
    	for c.index < int8(len(c.handlers)) {
    		//fmt.Println("c.index")
    		//fmt.Println(c.index)
    		c.handlers[c.index](c)
    		c.index++
    	}
    }
    
    // 跳出中间件方法
    func (c *SliceRouterContext) Abort() {
    	c.index = abortIndex
    }
    
    // 是否跳过了回调
    func (c *SliceRouterContext) IsAborted() bool {
    	return c.index >= abortIndex
    }
    
    // 重置回调
    func (c *SliceRouterContext) Reset() {
    	c.index = -1
    }
    
  • middleware/proxy.go

    package middleware
    
    import (
    	"bytes"
    	"compress/gzip"
    	"fmt"
    	"io/ioutil"
    	"math/rand"
    	"net"
    	"net/http"
    	"net/http/httputil"
    	"net/url"
    	"strconv"
    	"strings"
    	"time"
    )
    
    var transport = &http.Transport{
    	DialContext: (&net.Dialer{
    		Timeout:   30 * time.Second, //连接超时
    		KeepAlive: 30 * time.Second, //长连接超时时间
    	}).DialContext,
    	MaxIdleConns:          100,              //最大空闲连接
    	IdleConnTimeout:       90 * time.Second, //空闲超时时间
    	TLSHandshakeTimeout:   10 * time.Second, //tls握手超时时间
    	ExpectContinueTimeout: 1 * time.Second,  //100-continue超时时间
    }
    
    func NewMultipleHostsReverseProxy(c *SliceRouterContext, targets []*url.URL) *httputil.ReverseProxy {
    	//请求协调者
    	director := func(req *http.Request) {
    		targetIndex := rand.Intn(len(targets))
    		target := targets[targetIndex]
    		targetQuery := target.RawQuery
    
    		req.URL.Scheme = target.Scheme
    		req.URL.Host = target.Host
    		req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
    		//todo 当对域名(非内网)反向代理时需要设置此项, 当作后端反向代理时不需要
    		req.Host = target.Host
    		if targetQuery == "" || req.URL.RawQuery == "" {
    			req.URL.RawQuery = targetQuery + req.URL.RawQuery
    		} else {
    			req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
    		}
    		if _, ok := req.Header["User-Agent"]; !ok {
    			req.Header.Set("User-Agent", "user-agent")
    		}
    	}
    
    	//更改内容
    	modifyFunc := func(resp *http.Response) error {
    		//todo 部分章节功能补充2
    		//todo 兼容websocket
    		if strings.Contains(resp.Header.Get("Connection"), "Upgrade") {
    			return nil
    		}
    		var payload []byte
    		var readErr error
    
    		//todo 部分章节功能补充3
    		//todo 兼容gzip压缩
    		if strings.Contains(resp.Header.Get("Content-Encoding"), "gzip") {
    			gr, err := gzip.NewReader(resp.Body)
    			if err != nil {
    				return err
    			}
    			payload, readErr = ioutil.ReadAll(gr)
    			resp.Header.Del("Content-Encoding")
    		} else {
    			payload, readErr = ioutil.ReadAll(resp.Body)
    		}
    		if readErr != nil {
    			return readErr
    		}
    
    		//异常请求时设置StatusCode
    		if resp.StatusCode != 200 {
    			payload = []byte("StatusCode error:" + string(payload))
    		}
    
    		//todo 部分章节功能补充4
    		//todo 因为预读了数据所以内容重新回写
    		c.Set("status_code", resp.StatusCode)
    		c.Set("payload", payload)
    		resp.Body = ioutil.NopCloser(bytes.NewBuffer(payload))
    		resp.ContentLength = int64(len(payload))
    		resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(payload)), 10))
    		return nil
    	}
    
    	//错误回调 :关闭real_server时测试,错误回调
    	//范围:transport.RoundTrip发生的错误、以及ModifyResponse发生的错误
    	errFunc := func(w http.ResponseWriter, r *http.Request, err error) {
    		//todo record error log
    		fmt.Println(err)
    	}
    
    	return &httputil.ReverseProxy{Director: director, Transport: transport, ModifyResponse: modifyFunc, ErrorHandler: errFunc}
    }
    
    func singleJoiningSlash(a, b string) string {
    	aslash := strings.HasSuffix(a, "/")
    	bslash := strings.HasPrefix(b, "/")
    	switch {
    	case aslash && bslash:
    		return a + b[1:]
    	case !aslash && !bslash:
    		return a + "/" + b
    	}
    	return a + b
    }
    
  • middleware/rancelog.go

    package middleware
    
    import (
    	"log"
    )
    
    func TraceLogSliceMW() func(c *SliceRouterContext) {
    	return func(c *SliceRouterContext) {
    		log.Println("trace_in")
    		c.Next()
    		log.Println("trace_out")
    	}
    }
    
  • slice_test.go

    var addr = "127.0.0.1:2002"
    
    func TestSliceRouter(t *testing.T) {
    	reverseProxy := func(c *SliceRouterContext) http.Handler {
    		rs1 := "http://127.0.0.1:2003/base"
    		url1, err1 := url.Parse(rs1)
    		if err1 != nil {
    			log.Println(err1)
    		}
    
    		rs2 := "http://127.0.0.1:2004/base"
    		url2, err2 := url.Parse(rs2)
    		if err2 != nil {
    			log.Println(err2)
    		}
    
    		urls := []*url.URL{url1, url2}
    		return NewMultipleHostsReverseProxy(c,urls)
    	}
    	log.Println("Starting httpserver at " + addr)
    
    	//初始化方法数组路由器
    	sliceRouter := NewSliceRouter()
    
    	//中间件可充当业务逻辑代码
    	sliceRouter.Group("/base").Use(TraceLogSliceMW(), func(c *SliceRouterContext) {
    		c.Rw.Write([]byte("test func"))
    	})
    
    	//请求到反向代理
    	sliceRouter.Group("/").Use(TraceLogSliceMW(), func(c *SliceRouterContext) {
    		fmt.Println("reverseProxy")
    		reverseProxy(c).ServeHTTP(c.Rw, c.Req)
    	})
    	routerHandler := NewSliceRouterHandler(nil, sliceRouter)
    	log.Fatal(http.ListenAndServe(addr, routerHandler))
    }
    

# 限流

高并发系统的三大利器:缓存、降级、限流

  • 缓存:提升系统访问速度和增大处理容量,为相应业务增加缓存
  • 降级:当服务器压力剧增时,根据业务策略降级,以及释放服务资源保证业务正常
  • 限流:通过对并发限速,以达到拒绝服务、排队或等待、降级处理

# 限流的分类:

  • 漏桶限流:每次请求时计算桶流量,超过阀值则降级请求
  • 令牌桶限流:每次请求时从令牌桶取令牌,取不到则降级请求

我们通过采用漏桶限流的方式进行实现限流,使用time/rate限速器:

1、rate.NewLimter(limit,burst) limit表示每秒产生token数,burst最多存多少token
2、Allow 判断当前是否可以取到token
3、Wait阻塞等待直到取到token
4、Reserve返回等待时间,再去取token

原理解析
1、计算上次请求和当前请求的时间差
2、计算时间差内生成的token数+旧token数
3、如果token为负数,计算等待时间
4、token为正,则请求后token-1

代码实现

import (
	"context"
	"golang.org/x/time/rate"
	"log"
	"testing"
	"time"
)

func Test_RateLimiter(t *testing.T) {
	l := rate.NewLimiter(1, 6) // 每秒1个token,最多存放6个
	log.Println(l.Limit(), l.Burst())
	for i := 0; i < 10; i++ {
		//阻塞等待直到,取到一个token
		log.Println("before Wait")
		c, _ := context.WithTimeout(context.Background(), time.Second*2)
		if err := l.Wait(c); err != nil {
			log.Println("limiter wait err:" + err.Error())
		}
		log.Println("after Wait")

		//返回需要等待多久才有新的token,这样就可以等待指定时间执行任务
		r := l.Reserve()
		log.Println("reserve Delay:", r.Delay())

		//判断当前是否可以取到token
		a := l.Allow()
		log.Println("Allow:", a)
	}
}

# 网关集成限流功能

实现一个可以注册到中间件的限流器,代码如下:

func RateLimiter() func(c *middleware.SliceRouterContext) {
	l := rate.NewLimiter(1, 2)
	return func(c *middleware.SliceRouterContext) {
		if !l.Allow() {
			c.Rw.Write([]byte(fmt.Sprintf("rate limit:%v,%v", l.Limit(), l.Burst())))
			c.Abort()
			return
		}
		c.Next()
	}
}

写一个测试来实现代理限流

var addr = "127.0.0.1:2002"

// 限流方案
func TestRater(t *testing.T) {
	coreFunc := func(c *middleware.SliceRouterContext) http.Handler {  // 创建一个核心方法
		rs1 := "http://127.0.0.1:2003/base"
		url1, err1 := url.Parse(rs1)
		if err1 != nil {
			log.Println(err1)
		}

		rs2 := "http://127.0.0.1:2004/base"
		url2, err2 := url.Parse(rs2)
		if err2 != nil {
			log.Println(err2)
		}

		urls := []*url.URL{url1, url2}
		return middleware.NewMultipleHostsReverseProxy(c, urls)
	}
	log.Println("Starting httpserver at " + addr)

	sliceRouter := middleware.NewSliceRouter()  // 创建一个我们自己封装的路由
	sliceRouter.Group("/").Use(RateLimiter())
	routerHandler := middleware.NewSliceRouterHandler(coreFunc, sliceRouter) // 传入的是核心方法handler,和一个路由
	log.Fatal(http.ListenAndServe(addr, routerHandler))
}

# 熔断与降级

  • 熔断:熔断器是当依赖的服务已经出现故障时,为了保证自身服务的正常运行不再访问依赖服务,防止雪崩效应(保险丝)
  • 降级:当服务器压力剧增时,根据业务策略降级,以此释放服务资源保证业务正常

# 熔断器的三种状态

  • 关闭状态:服务正常,维护失败率统计,达到失败率阀值时,转为开启
  • 开启状态:服务异常,调用fallback函数,一段时间后,进入半开启状态
  • 半开启状态:尝试恢复服务,失败率高于阀值,进入开启状态,第一阀值,进入关闭状态(尝试走业务逻辑)

# hystrix-go

Hystrix-go类库,是熔断、降级、限流集成的类库,同时提供了监控面板;hystri-go的基本使用:

配套的Hystrix Dashboard用作展示服务状态,代码实现:

import (
	"errors"
	"github.com/afex/hystrix-go/hystrix"
	"log"
	"net/http"
	"testing"
	"time"
)

func Test_main(t *testing.T) {
	hystrixStreamHandler := hystrix.NewStreamHandler()
	hystrixStreamHandler.Start()
	go http.ListenAndServe(":8074", hystrixStreamHandler)

	hystrix.ConfigureCommand("aaa", hystrix.CommandConfig{
		Timeout:                1000, // 单次请求 超时时间
		MaxConcurrentRequests:  1,    // 最大并发量
		SleepWindow:            5000, // 熔断后多久去尝试服务是否可用
		RequestVolumeThreshold: 1,    // 验证熔断的 请求数量, 10秒内采样
		ErrorPercentThreshold:  1,    // 验证熔断的 错误百分比
	})

	for i := 0; i < 10000; i++ {
		//异步调用使用 hystrix.Go
		err := hystrix.Do("aaa", func() error {  // hystrix.Do是同步调用
			//test case 1 并发测试
			if i == 0 {
				return errors.New("service error")  // 第一次发生报错,触发熔断,之后如果并发量太多也会触发
			}
			//test case 2 超时测试
			//time.Sleep(2 * time.Second)
			log.Println("do services")
			return nil
		}, nil)
		if err != nil {
			log.Println("hystrix err:" + err.Error())
			time.Sleep(1 * time.Second)
			log.Println("sleep 1 second")
		}
	}
	time.Sleep(100 * time.Second)
}

# WebSocket

注意:反向代理服务器在客户端和服务端中间时,需要注意必须满足第一代理的角色

# WebSocket代理实战

webSocket实际服务器/客户端

package main

import (
	"flag"
	"github.com/gorilla/websocket"
	"html/template"
	"log"
	"net/http"
)

var addr = flag.String("addr", "localhost:2003", "http service address")

var upgrader = websocket.Upgrader{} // use default options

func echo(w http.ResponseWriter, r *http.Request) {
	c, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Print("upgrade:", err)
		return
	}
	defer c.Close()
	for {
		mt, message, err := c.ReadMessage()
		if err != nil {
			log.Println("read:", err)
			break
		}
		log.Printf("recv: %s", message)
		err = c.WriteMessage(mt, message)
		if err != nil {
			log.Println("write:", err)
			break
		}
	}
}

func home(w http.ResponseWriter, r *http.Request) {
	homeTemplate.Execute(w, "ws://"+r.Host+"/echo")
}

func main() {
	flag.Parse()
	log.SetFlags(0)
	http.HandleFunc("/echo", echo)
	http.HandleFunc("/", home)
	log.Println("Starting websocket server at " + *addr)
	log.Fatal(http.ListenAndServe(*addr, nil))
}

var homeTemplate = template.Must(template.New("").Parse(`
<!DOCTYPE html>
<head>
<meta charset="utf-8">
<script>  
window.addEventListener("load", function(evt) {
    var output = document.getElementById("output");
    var input = document.getElementById("input");
    var ws;
    var print = function(message) {
        var d = document.createElement("div");
        d.innerHTML = message;
        output.appendChild(d);
    };
    document.getElementById("open").onclick = function(evt) {
        if (ws) {
            return false;
        }
		var web_url=document.getElementById("web_url").value
        ws = new WebSocket(web_url);
        ws.onopen = function(evt) {
            print("OPEN");
        }
        ws.onclose = function(evt) {
            print("CLOSE");
            ws = null;
        }
        ws.onmessage = function(evt) {
            print("RESPONSE: " + evt.data);
        }
        ws.onerror = function(evt) {
            print("ERROR: " + evt.data);
        }
        return false;
    };
    document.getElementById("send").onclick = function(evt) {
        if (!ws) {
            return false;
        }
        print("SEND: " + input.value);
        ws.send(input.value);
        return false;
    };
    document.getElementById("close").onclick = function(evt) {
        if (!ws) {
            return false;
        }
        ws.close();
        return false;
    };
});
</script>
</head>
<body>
<table>
<tr><td valign="top" width="50%">
<p>Click "Open" to create a connection to the server, 
"Send" to send a message to the server and "Close" to close the connection. 
You can change the message and send multiple times.
<p>
<form>
<button id="open">Open</button>
<button id="close">Close</button>
<p><input id="web_url" type="text" value="{{.}}">
<p><input id="input" type="text" value="Hello world!">
<button id="send">Send</button>
</form>
</td><td valign="top" width="50%">
<div id="output"></div>
</td></tr></table>
</body>
</html>
`))

深入理解WebSocket的upgrade.Upgrade步骤:

1、获取Sec-Websocket-key

2、sha1生成Sec-Websocket-Accept

3、向客户端发送101status

设置反向代理

import (
	"learn-plus/proxy/load_balance"
	"learn-plus/proxy/middleware"
	"log"
	"net/http"
)

var (
	addr = "127.0.0.1:2002"
)

func main() {
	rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
	rb.Add("http://127.0.0.1:2003", "50")
	proxy := load_balance.NewLoadBalanceReverseProxy(&middleware.SliceRouterContext{}, rb)
	log.Println("Starting httpserver at " + addr)
	log.Fatal(http.ListenAndServe(addr, proxy))
}

# http2

# https和http2的关系

  • http2代表多路复用的传输协议
  • https代表http服务器使用了加密传输
  • 一个启用https的服务器不一定使用http2
  • 但是使用http2的服务器必须启用http2

# http2的设计目标

  • 大多数情况下的感知延迟要有实质上改进
  • 解决HTTP1.1中的“队首阻塞”问题
  • 并行操作无需与服务器建立多个连接

# http2基本概念

  • 流:流是连接中的虚拟信道,可以承载双向消息
  • 消息:是指逻辑上HTTP消息,比如请求、响应等
  • 帧:HTTP2.0 通信的最小单位,每个帧包含帧首部,至少也会识别出当前帧所属的流,承载着特定的数据

# https/http2下游服务实现

  • 实现下游服务

    package main
    
    
    import (
    	"fmt"
    	"golang.org/x/net/http2"
    	"io"
    	"learn-plus/https/testcrt"
    	"log"
    	"net/http"
    	"os"
    	"os/signal"
    	"syscall"
    	"time"
    )
    
    /*
    证书签名生成方式:
    //CA私钥
    openssl genrsa -out ca.key 2048
    //CA数据证书
    openssl req -x509 -new -nodes -key ca.key -subj "/CN=example1.com" -days 5000 -out ca.crt
    //服务器私钥(默认由CA签发)
    openssl genrsa -out server.key 2048
    //服务器证书签名请求:Certificate Sign Request,简称csr(example1.com代表你的域名)
    openssl req -new -key server.key -subj "/CN=example1.com" -out server.csr
    //上面2个文件生成服务器证书(days代表有效期)
    openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 5000
    */
    
    func main() {
    	rs1 := &RealServer{Addr: "127.0.0.1:3003"}
    	rs1.Run()
    	rs2 := &RealServer{Addr: "127.0.0.1:3004"}
    	rs2.Run()
    
    	//监听关闭信号
    	quit := make(chan os.Signal)
    	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    	<-quit
    }
    
    type RealServer struct {
    	Addr string
    }
    
    func (r *RealServer) Run() {
    	log.Println("Starting httpserver at " + r.Addr)
    	mux := http.NewServeMux()
    	mux.HandleFunc("/", r.HelloHandler)
    	mux.HandleFunc("/base/error", r.ErrorHandler)
    	server := &http.Server{
    		Addr:         r.Addr,
    		WriteTimeout: time.Second * 3,
    		Handler:      mux,
    	}
    	go func() {
    		http2.ConfigureServer(server, &http2.Server{})  // 设置http2的支持,以帧的形式输出
    		log.Fatal(server.ListenAndServeTLS(testcrt.Path("server.crt"), testcrt.Path("server.key")))
    	}()
    }
    
    func (r *RealServer) HelloHandler(w http.ResponseWriter, req *http.Request) {
    	upath := fmt.Sprintf("http://%s%s\n", r.Addr, req.URL.Path)
    	io.WriteString(w, upath)
    }
    
    func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) {
    	upath := "error handler"
    	w.WriteHeader(500)
    	io.WriteString(w, upath)
    }
    
  • testcrt/testdata.go

    // basepath is the root directory of this package.
    var basepath string
    
    func init() {
    	_, currentFile, _, _ := runtime.Caller(0)
    	basepath = filepath.Dir(currentFile)
    }
    
    // Path returns the absolute path the given relative file or directory path,
    // relative to the google.golang.org/grpc/testdata directory in the user's GOPATH.
    // If rel is already absolute, it is returned unmodified.
    func Path(rel string) string {
    	if filepath.IsAbs(rel) {
    		return rel
    	}
    	return filepath.Join(basepath, rel)
    }
    
  • 分析

    1、ListenAndServer调整ListenAndServeTLS即可支持http2
    2、http服务器设置http2.ConfigureServer支持http2
    	2.1、设置h2对应的handler回调
    	2.2、http.Server中设置了对h2的回调
    	2.3、h2回调方法负责从帧数据中转换数据为http请求
    

# Tcp代理

# 什么是负载均衡

  • 负载均衡(Load Balance)建立在现有网络结构之上,它提供了一种廉价有效透明的方法扩展网络设备和服务器的带宽、增加吞吐量、加强网络数据处理能力、提高网络的灵活性和可用性。负载均衡有两方面的含义:首先,大量的并发访问或数据流量分担到多台节点设备上分别处理,减少用户等待响应的时间;其次,单个重负载的运算分担到多台节点设备上做并行处理,每个节点设备处理结束后,将结果汇总,返回给用户,系统处理能力得到大幅度提高。

  • 简单来说就是:其一是将大量的并发处理转发给后端多个节点处理,减少工作响应时间;其二是将单个繁重的工作转发给后端多个节点处理,处理完再返回给负载均衡中心,再返回给用户。目前负载均衡技术大多数是用于提高诸如在Web服务器、FTP服务器和其它关键任务服务器上的Internet服务器程序的可用性和可伸缩性。

# 负载均衡分类

  • 二层负载均衡(mac):根据OSI模型分的二层负载,一般是用虚拟mac地址方式,外部对虚拟MAC地址请求,负载均衡接收后分配后端实际的MAC地址响应.
  • 三层负载均衡(ip):一般采用虚拟IP地址方式,外部对虚拟的ip地址请求,负载均衡接收后分配后端实际的IP地址响应. (即一个ip对一个ip的转发, 端口全放开)
  • 四层负载均衡(tcp):在三次负载均衡的基础上,即从第四层"传输层"开始, 使用"ip+port"接收请求,再转发到对应的机器。
  • 七层负载均衡(http):从第七层"应用层"开始, 根据虚拟的url或IP,主机名接收请求,再转向相应的处理服务器
我们运维中最常见的四层和七层负载均衡,这里重点说下这两种负载均衡。
1)四层的负载均衡就是基于IP+端口的负载均衡:在三层负载均衡的基础上,通过发布三层的IP地址(VIP),然后加四层的端口号,来决定哪些流量需要做负载均衡,对需要处理的流量进行NAT处理,转发至后台服务器,并记录下这个TCP或者UDP的流量是由哪台服务器处理的,后续这个连接的所有流量都同样转发到同一台服务器处理。
对应的负载均衡器称为四层交换机(L4 switch),主要分析IP层及TCP/UDP层,实现四层负载均衡。此种负载均衡器不理解应用协议(如HTTP/FTP/MySQL等等)。

实现四层负载均衡的软件有:
F5:硬件负载均衡器,功能很好,但是成本很高。
lvs:重量级的四层负载软件
nginx:轻量级的四层负载软件,带缓存功能,正则表达式较灵活
haproxy:模拟四层转发,较灵活

2)七层的负载均衡就是基于虚拟的URL或主机IP的负载均衡:在四层负载均衡的基础上(没有四层是绝对不可能有七层的),再考虑应用层的特征,比如同一个Web服务器的负载均衡,除了根据VIP加80端口辨别是否需要处理的流量,还可根据七层的URL、浏览器类别、语言来决定是否要进行负载均衡。举个例子,如果你的Web服务器分成两组,一组是中文语言的,一组是英文语言的,那么七层负载均衡就可以当用户来访问你的域名时,自动辨别用户语言,然后选择对应的语言服务器组进行负载均衡处理。

对应的负载均衡器称为七层交换机(L7 switch),除了支持四层负载均衡以外,还有分析应用层的信息,如HTTP协议URI或Cookie信息,实现七层负载均衡。此种负载均衡器能理解应用协议。
实现七层负载均衡的软件有:
haproxy:天生负载均衡技能,全面支持七层代理,会话保持,标记,路径转移;
nginx:只在http协议和mail协议上功能比较好,性能与haproxy差不多;
apache:功能较差
Mysql proxy:功能尚可。

总的来说,一般是lvs做4层负载;nginx做7层负载(也能做4层负载, 通过stream模块);haproxy比较灵活,4层和7层负载均衡都能做

# 四层负载和七层负载的区别

  • 实际是路由转发和反向代理的区别
  • 转发客户端与服务器只会有一次三次握手,而代理有两次
  • NAT是作用于内核运行,代理是用户程序运行的
  • 代理的数据进入程序buffer中

# Tcp代理原理

  • 本质上还是七层反向代理,只是代理的内容是tcp协议包
    • 初始化tcp服务器
    • 创建上下游连接
    • 上下游连接数据交换

# Tcp代理实现

参考http.util.RecerseProxy实现,服务与代理逻辑分离:

  • 构建tcp服务器
  • 构建tcp代理
  • tcp服务器与tcp代理结合,实现基于负载均衡的代理

构建tcp服务器

package main

import (
	"context"
	"fmt"
	"learn-plus/proxy/tcp_proxy"
	"log"
	"net"
)

var (
	addr = ":7002"
)

type tcpHandler struct {
}

func (t *tcpHandler) ServeTCP(ctx context.Context, src net.Conn) {
	src.Write([]byte("tcpHandler\n"))
}

func main() {
	//tcp服务器测试
	log.Println("Starting tcpserver at " + addr)
	tcpServ := tcp_proxy.TcpServer{
		Addr:    addr,
		Handler: &tcpHandler{},
	}
	fmt.Println("Starting tcp_server at " + addr)
	tcpServ.ListenAndServe()

	//代理测试
	//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
	//rb.Add("127.0.0.1:6001", "40")
	//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
	//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
	//fmt.Println("Starting tcp_proxy at " + addr)
	//tcpServ.ListenAndServe()

	//redis服务器测试
	//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
	//rb.Add("127.0.0.1:6379", "40")
	//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
	//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
	//fmt.Println("Starting tcp_proxy at " + addr)
	//tcpServ.ListenAndServe()

	//http服务器测试:
	//缺点对请求的管控不足,比如我们用来做baidu代理,因为无法更改请求host,所以很轻易把我们拒绝
	//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
	//rb.Add("127.0.0.1:2003", "40")
	////rb.Add("www.baidu.com:80", "40")
	//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_tcp_middleware.TcpSliceRouterContext{}, rb)
	//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
	//fmt.Println("tcp_proxy start at:" + addr)
	//tcpServ.ListenAndServe()

	//websocket服务器测试:缺点对请求的管控不足
	//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
	//rb.Add("127.0.0.1:2003", "40")
	//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
	//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
	//fmt.Println("Starting tcp_proxy at " + addr)
	//tcpServ.ListenAndServe()

	//http2服务器测试:缺点对请求的管控不足
	//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
	//rb.Add("127.0.0.1:3003", "40")
	//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
	//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
	//fmt.Println("Starting tcp_proxy at " + addr)
	//tcpServ.ListenAndServe()

prox/tcp_server.go

package tcp_proxy

import (
	"context"
	"errors"
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

var (
	ErrServerClosed     = errors.New("tcp: Server closed")
	ErrAbortHandler     = errors.New("tcp: abort TCPHandler")
	ServerContextKey    = &contextKey{"tcp-server"}
	LocalAddrContextKey = &contextKey{"local-addr"}
)

type onceCloseListener struct {
	net.Listener
	once     sync.Once
	closeErr error
}

func (oc *onceCloseListener) Close() error {
	oc.once.Do(oc.close)
	return oc.closeErr
}

func (oc *onceCloseListener) close() {
	oc.closeErr = oc.Listener.Close()
}

type TCPHandler interface {
	ServeTCP(ctx context.Context, conn net.Conn)
}

type TcpServer struct {
	Addr    string
	Handler TCPHandler
	err     error
	BaseCtx context.Context

	WriteTimeout     time.Duration
	ReadTimeout      time.Duration
	KeepAliveTimeout time.Duration

	mu         sync.Mutex
	inShutdown int32
	doneChan   chan struct{}
	l          *onceCloseListener
}

func (s *TcpServer) shuttingDown() bool {
	return atomic.LoadInt32(&s.inShutdown) != 0
}

func (srv *TcpServer) ListenAndServe() error {
	if srv.shuttingDown() {
		return ErrServerClosed
	}
	if srv.doneChan == nil {
		srv.doneChan = make(chan struct{})
	}
	addr := srv.Addr
	if addr == "" {
		return errors.New("need addr")
	}
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	return srv.Serve(tcpKeepAliveListener{
		ln.(*net.TCPListener)})
}

func (srv *TcpServer) Close() error {
	atomic.StoreInt32(&srv.inShutdown, 1)
	close(srv.doneChan) //关闭channel
	srv.l.Close()       //执行listener关闭
	return nil
}

func (srv *TcpServer) Serve(l net.Listener) error {
	srv.l = &onceCloseListener{Listener: l}
	defer srv.l.Close() //执行listener关闭
	if srv.BaseCtx == nil {
		srv.BaseCtx = context.Background()
	}
	baseCtx := srv.BaseCtx
	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		rw, e := l.Accept()
		if e != nil {
			select {
			case <-srv.getDoneChan():
				return ErrServerClosed
			default:
			}
			fmt.Printf("accept fail, err: %v\n", e)
			continue
		}
		c := srv.newConn(rw)
		go c.serve(ctx)
	}
	return nil
}

func (srv *TcpServer) newConn(rwc net.Conn) *conn {
	c := &conn{
		server: srv,
		rwc:    rwc,
	}
	// 设置参数
	if d := c.server.ReadTimeout; d != 0 {
		c.rwc.SetReadDeadline(time.Now().Add(d))
	}
	if d := c.server.WriteTimeout; d != 0 {
		c.rwc.SetWriteDeadline(time.Now().Add(d))
	}
	if d := c.server.KeepAliveTimeout; d != 0 {
		if tcpConn, ok := c.rwc.(*net.TCPConn); ok {
			tcpConn.SetKeepAlive(true)
			tcpConn.SetKeepAlivePeriod(d)
		}
	}
	return c
}

func (s *TcpServer) getDoneChan() <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.doneChan == nil {
		s.doneChan = make(chan struct{})
	}
	return s.doneChan
}

func ListenAndServe(addr string, handler TCPHandler) error {
	server := &TcpServer{Addr: addr, Handler: handler, doneChan: make(chan struct{}),}
	return server.ListenAndServe()
}

proxy/conn.go

package tcp_proxy

import (
	"context"
	"fmt"
	"net"
	"runtime"
)

type tcpKeepAliveListener struct {
	*net.TCPListener
}

//todo 思考点:继承方法覆写方法时,只要使用非指针接口
func (ln tcpKeepAliveListener) Accept() (net.Conn, error) {
	tc, err := ln.AcceptTCP()
	if err != nil {
		return nil, err
	}
	return tc, nil
}

type contextKey struct {
	name string
}

func (k *contextKey) String() string {
	return "tcp_proxy context value " + k.name
}

type conn struct {
	server     *TcpServer
	cancelCtx  context.CancelFunc
	rwc        net.Conn
	remoteAddr string
}

func (c *conn) close() {
	c.rwc.Close()
}

func (c *conn) serve(ctx context.Context) {
	defer func() {
		if err := recover(); err != nil && err != ErrAbortHandler {
			const size = 64 << 10
			buf := make([]byte, size)
			buf = buf[:runtime.Stack(buf, false)]
			fmt.Printf("tcp: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
		}
		c.close()
	}()
	c.remoteAddr = c.rwc.RemoteAddr().String()
	ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
	if c.server.Handler == nil {
		panic("handler empty")
	}
	c.server.Handler.ServeTCP(ctx, c.rwc)
}

# Tcp反向代理实现

proxy_tcp.go

package main

import (
	"context"
	"fmt"
	"learn-plus/proxy/load_balance"
	"learn-plus/proxy/proxy"
	"learn-plus/proxy/tcp_middleware"
	"learn-plus/proxy/tcp_proxy"
	"net"
)

var (
	addr = ":2002"
)

type tcpHandler struct {
}

func (t *tcpHandler) ServeTCP(ctx context.Context, src net.Conn) {
	src.Write([]byte("tcpHandler\n"))
}

func main() {
	//tcp服务器测试
	//log.Println("Starting tcpserver at " + addr)
	//tcpServ := tcp_proxy.TcpServer{
	//	Addr:    addr,
	//	Handler: &tcpHandler{},
	//}
	//fmt.Println("Starting tcp_server at " + addr)
	//tcpServ.ListenAndServe()

	//代理测试
	rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
	rb.Add("127.0.0.1:7002", "40")  // 代理上游服务

	proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
	tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
	fmt.Println("Starting tcp_proxy at " + addr)
	tcpServ.ListenAndServe()

	//redis服务器测试
	//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
	//rb.Add("127.0.0.1:6379", "40")
	//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
	//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
	//fmt.Println("Starting tcp_proxy at " + addr)
	//tcpServ.ListenAndServe()

	//http服务器测试:
	//缺点对请求的管控不足,比如我们用来做baidu代理,因为无法更改请求host,所以很轻易把我们拒绝
	//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
	//rb.Add("127.0.0.1:2003", "40")
	////rb.Add("www.baidu.com:80", "40")
	//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_tcp_middleware.TcpSliceRouterContext{}, rb)
	//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
	//fmt.Println("tcp_proxy start at:" + addr)
	//tcpServ.ListenAndServe()

	//websocket服务器测试:缺点对请求的管控不足
	//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
	//rb.Add("127.0.0.1:2003", "40")
	//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
	//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
	//fmt.Println("Starting tcp_proxy at " + addr)
	//tcpServ.ListenAndServe()

	//http2服务器测试:缺点对请求的管控不足
	//rb := load_balance.LoadBanlanceFactory(load_balance.LbWeightRoundRobin)
	//rb.Add("127.0.0.1:3003", "40")
	//proxy := proxy.NewTcpLoadBalanceReverseProxy(&tcp_middleware.TcpSliceRouterContext{}, rb)
	//tcpServ := tcp_proxy.TcpServer{Addr: addr, Handler: proxy,}
	//fmt.Println("Starting tcp_proxy at " + addr)
	//tcpServ.ListenAndServe()
}

Proxy/tcp_reverse_proxy.go

package proxy

import (
	"context"
	"learn-plus/proxy/load_balance"
	"learn-plus/proxy/tcp_middleware"

	"io"
	"log"
	"net"
	"time"
)

func NewTcpLoadBalanceReverseProxy(c *tcp_middleware.TcpSliceRouterContext, lb load_balance.LoadBalance) *TcpReverseProxy {
	return func() *TcpReverseProxy {
		nextAddr, err := lb.Get("")
		if err != nil {
			log.Fatal("get next addr fail")
		}
		return &TcpReverseProxy{
			ctx:             c.Ctx,
			Addr:            nextAddr,
			KeepAlivePeriod: time.Second,
			DialTimeout:     time.Second,
		}
	}()
}

//TCP反向代理
type TcpReverseProxy struct {
	ctx                  context.Context //单次请求单独设置
	Addr                 string
	KeepAlivePeriod      time.Duration //设置
	DialTimeout          time.Duration //设置超时时间
	DialContext          func(ctx context.Context, network, address string) (net.Conn, error)
	OnDialError          func(src net.Conn, dstDialErr error)
	ProxyProtocolVersion int
}

func (dp *TcpReverseProxy) dialTimeout() time.Duration {
	if dp.DialTimeout > 0 {
		return dp.DialTimeout
	}
	return 10 * time.Second
}

var defaultDialer = new(net.Dialer)

func (dp *TcpReverseProxy) dialContext() func(ctx context.Context, network, address string) (net.Conn, error) {
	if dp.DialContext != nil {
		return dp.DialContext
	}
	return (&net.Dialer{
		Timeout:   dp.DialTimeout,     //连接超时
		KeepAlive: dp.KeepAlivePeriod, //设置连接的检测时长
	}).DialContext
}

func (dp *TcpReverseProxy) keepAlivePeriod() time.Duration {
	if dp.KeepAlivePeriod != 0 {
		return dp.KeepAlivePeriod
	}
	return time.Minute
}

//传入上游 conn,在这里完成下游连接与数据交换
func (dp *TcpReverseProxy) ServeTCP(ctx context.Context, src net.Conn) {
	//设置连接超时
	var cancel context.CancelFunc
	if dp.DialTimeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, dp.dialTimeout())
	}
	dst, err := dp.dialContext()(ctx, "tcp", dp.Addr)
	if cancel != nil {
		cancel()
	}
	if err != nil {
		dp.onDialError()(src, err)
		return
	}

	defer func() { go dst.Close() }() //记得退出下游连接

	//设置dst的 keepAlive 参数,在数据请求之前
	if ka := dp.keepAlivePeriod(); ka > 0 {
		if c, ok := dst.(*net.TCPConn); ok {
			c.SetKeepAlive(true)
			c.SetKeepAlivePeriod(ka)
		}
	}
	errc := make(chan error, 1)
	go dp.proxyCopy(errc, src, dst)
	go dp.proxyCopy(errc, dst, src)
	<-errc
}

func (dp *TcpReverseProxy) onDialError() func(src net.Conn, dstDialErr error) {
	if dp.OnDialError != nil {
		return dp.OnDialError
	}
	return func(src net.Conn, dstDialErr error) {
		log.Printf("tcpproxy: for incoming conn %v, error dialing %q: %v", src.RemoteAddr().String(), dp.Addr, dstDialErr)
		src.Close()
	}
}

func (dp *TcpReverseProxy) proxyCopy(errc chan<- error, dst, src net.Conn) {
	_, err := io.Copy(dst, src)
	errc <- err
}

# thrift安装介绍

安装:https://thrift.apache.org/docs/install/

构建thrift测试sever和client

  • 首先编写 thrift_gen.thrift
  • 运行IDL生成命令 thrift --gen go thrift_gen.thrift
  • 使用生成的IDL单独构建 server 与 client 即可

介绍:

  • Facebook的开源项目,后来进入apache孵化
  • thrift也是支持跨语言的一个rpc框架,有自己的一套IDL
  • thrift的网络协议建立在tcp的基础上

# gRPC透明代理

介绍:gRPC是谷歌出品的一个高性能、开源和通用的RPC框架,是基于http/2标准设计,支持普通rpc也支持双向流式传递数据,相对于thrift连接可以多路复用,可传递header头数据

(关于grpc方面的功能演示,统一使用非加密方式),首先确保grpc安装,步骤如下:

官网:https://github.com/grpc/grpc-go

# 安装步骤

# go mod安装方式
  • 开启 go mod export GO111MODULE=on
  • 开启代理 go mod export GOPROXY=https://goproxy.io
  • 安装grpc go get -u google.golang.org/grpc
  • 安装proto go get -u github.com/golang/protobuf/proto
  • 安装protoc-gen-go go get -u github.com/golang/protobuf/protoc-gen-go
# 遇到命令不存在 command not found: protoc
缺少protobuf预先安装导致的问题。 protoc-gen-go相当于只是protobuf的一个语言支持。

安装protobuf参照资料
mac 下   brew install protobuf
windows 下 https://blog.csdn.net/qq_41185868/article/details/82882206
linux 下 https://blog.csdn.net/sszzyzzy/article/details/89946075
# 遇到 I/O Timeout Errors
$ go get -u google.golang.org/grpc
package google.golang.org/grpc: unrecognized import path "google.golang.org/grpc" (https fetch: Get https://google.golang.org/grpc?go-get=1: dial tcp 216.239.37.1:443: i/o timeout)
  • 要么翻墙解决问题
  • 要么设置 export GOPROXY=https://goproxy.io
# 遇到 permission denied
go get -u google.golang.org/grpc
go get github.com/golang/protobuf/protoc-gen-go: open /usr/local/go/bin/protoc-gen-go: permission denied
  • 要么给目录增加读写权限 chmod -R 777 /usr/local/go/
  • 要么设置重新设置一下GOROOT地址指向非系统目录 比如 mac 下面 vim ~/.bashrc
#export GOROOT=/usr/local/go
#export GOBIN=/usr/local/go/bin
#export LGOBIN=/usr/local/go/bin
export GOROOT=/Users/niuyufu/go_1.12
export GOBIN=/Users/niuyufu/go_1.12/bin
export LGOBIN=/Users/niuyufu/go_1.12/bin

# 构建grpc测试server与client

  • 首先编写 echo.proto
  • 运行IDL生成命令,(如:遇到命令不存在 command not found: protoc 请参照上文帮助)
protoc -I . --go_out=plugins=grpc:proto ./echo.proto
  • 使用生成的IDL单独构建 server 与 client 即可

# 构建grpc-gateway 测试服务端让服务器支持http

# 安装参考
https://github.com/grpc-ecosystem/grpc-gateway
  • 开启 go mod export GO111MODULE=on
  • 开启代理 go mod export GOPROXY=https://goproxy.io
  • 执行安装命令
go install github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
go install  github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger
go install github.com/golang/protobuf/protoc-gen-go

# 构建grpc-gateway 测试服务端

  • 编写 echo-gateway.proto
  • 运行IDL生成命令
protoc -I/usr/local/include -I. -I$GOPATH/src -I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis --go_out=plugins=grpc:proto echo-gateway.proto
  • 删除 proto/echo.pb.go 防止结构体冲突 rm proto/echo.pb.go
  • 运行gateway生成命令
protoc -I/usr/local/include -I. -I$GOPATH/src \
  -I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis --grpc-gateway_out=logtostderr=true:proto echo-gateway.proto
  • 使用生成的IDL单独构建 server
  • 使用浏览器测试 server
curl 'http://127.0.0.1:8081/v1/example/echo' -d '{"message":"11222222"}'

# 代码实现

  • grpc/echo.proto

    syntax = "proto3";
    
    package echo;
    
    option go_package = ".;proto";  // 输出文件
    
    // EchoRequest is the request for echo.
    message EchoRequest {
        string message = 1;
    }
    
    // EchoResponse is the response for echo.
    message EchoResponse {
        string message = 1;
    }
    
    // Echo is the echo service.
    service Echo {
        // UnaryEcho is unary echo.
        rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
        // ServerStreamingEcho is server side streaming.
        rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
        // ClientStreamingEcho is client side streaming.
        rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
        // BidirectionalStreamingEcho is bidi streaming.
        rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
    }
    

    在grpc文件目录下创建proto文件夹,执行:protoc -I . --go_out=plugins=grpc:proto ./echo.proto命令,会输出一个满足实现proto文件的go文件,我们的客户端和服务端都是根据这个桥梁来进行通信

  • grpc/server/main.go

    package main
    
    import (
    	"context"
    	"flag"
    	"fmt"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/metadata"
    	"io"
    	pb "learn-plus/grpc/proto"
    	"log"
    	"net"
    )
    
    var port = flag.Int("port", 50055, "the port to serve on")
    
    const (
    	streamingCount  = 10
    )
    
    type server struct{}
    
    func (s *server) ServerStreamingEcho(in *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {
    	fmt.Printf("--- ServerStreamingEcho ---\n")
    	fmt.Printf("request received: %v\n", in)
    	// Read requests and send responses.
    	for i := 0; i < streamingCount; i++ {
    		fmt.Printf("echo message %v\n", in.Message)
    		err := stream.Send(&pb.EchoResponse{Message: in.Message})
    		if err != nil {
    			return err
    		}
    	}
    	return nil
    }
    
    func (s *server) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error {
    	fmt.Printf("--- ClientStreamingEcho ---\n")
    	// Read requests and send responses.
    	var message string
    	for {
    		in, err := stream.Recv()
    		if err == io.EOF {
    			fmt.Printf("echo last received message\n")
    			return stream.SendAndClose(&pb.EchoResponse{Message: message})
    		}
    		message = in.Message
    		fmt.Printf("request received: %v, building echo\n", in)
    		if err != nil {
    			return err
    		}
    	}
    }
    
    func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
    	fmt.Printf("--- BidirectionalStreamingEcho ---\n")
    	// Read requests and send responses.
    	for {
    		in, err := stream.Recv()
    		if err == io.EOF {
    			return nil
    		}
    		if err != nil {
    			return err
    		}
    		fmt.Printf("request received %v, sending echo\n", in)
    		if err := stream.Send(&pb.EchoResponse{Message: in.Message}); err != nil {
    			return err
    		}
    	}
    }
    
    func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) {
    	fmt.Printf("--- UnaryEcho ---\n")
    	md, ok := metadata.FromIncomingContext(ctx)
    	if !ok {
    		log.Println("miss metadata from context")
    	}
    	fmt.Println("md",md)
    	fmt.Printf("request received: %v, sending echo\n", in)
    	return &pb.EchoResponse{Message: in.Message}, nil
    }
    
    func main() {
    	flag.Parse()
    	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
    	if err != nil {
    		log.Fatalf("failed to listen: %v", err)
    	}
    	fmt.Printf("server listening at %v\n", lis.Addr())
    	s := grpc.NewServer()
    	pb.RegisterEchoServer(s, &server{})
    	s.Serve(lis)
    }
    
  • grace/client/main.go

    package main
    
    import (
    	"context"
    	"flag"
    	"fmt"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/metadata"
    	"io"
    	pb "learn-plus/grpc/proto"
    	"log"
    	"sync"
    	"time"
    )
    
    var addr = flag.String("addr", "localhost:50055", "the address to connect to")
    
    const (
    	timestampFormat = time.StampNano // "Jan _2 15:04:05.000"
    	streamingCount  = 10
    	AccessToken="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1ODk2OTExMTQsImlzcyI6ImFwcF9pZF9iIn0.qb2A_WsDP_-jfQBxJk6L57gTnAzZs-SPLMSS_UO6Gkc"
    )
    
    func unaryCallWithMetadata(c pb.EchoClient, message string) {
    	fmt.Printf("--- unary ---\n")
    
    	// Create metadata and context.
    	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
    	md.Append("authorization", "Bearer "+AccessToken)
    
    	ctx := metadata.NewOutgoingContext(context.Background(), md)
    	r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: message})
    	if err != nil {
    		log.Fatalf("failed to call UnaryEcho: %v", err)
    	}
    	fmt.Printf("response:%v\n", r.Message)
    }
    
    func serverStreamingWithMetadata(c pb.EchoClient, message string) {
    	fmt.Printf("--- server streaming ---\n")
    
    	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
    	md.Append("authorization", "Bearer "+AccessToken)
    	ctx := metadata.NewOutgoingContext(context.Background(), md)
    
    	stream, err := c.ServerStreamingEcho(ctx, &pb.EchoRequest{Message: message})
    	if err != nil {
    		log.Fatalf("failed to call ServerStreamingEcho: %v", err)
    	}
    
    	// Read all the responses.
    	var rpcStatus error
    	fmt.Printf("response:\n")
    	for {
    		r, err := stream.Recv()
    		if err != nil {
    			rpcStatus = err
    			break
    		}
    		fmt.Printf(" - %s\n", r.Message)
    	}
    	if rpcStatus != io.EOF {
    		log.Fatalf("failed to finish server streaming: %v", rpcStatus)
    	}
    }
    
    func clientStreamWithMetadata(c pb.EchoClient, message string) {
    	fmt.Printf("--- client streaming ---\n")
    	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
    	md.Append("authorization", "Bearer "+AccessToken)
    	ctx := metadata.NewOutgoingContext(context.Background(), md)
    	stream, err := c.ClientStreamingEcho(ctx)
    	if err != nil {
    		log.Fatalf("failed to call ClientStreamingEcho: %v\n", err)
    	}
    
    	// Send all requests to the server.
    	for i := 0; i < streamingCount; i++ {
    		if err := stream.Send(&pb.EchoRequest{Message: message}); err != nil {
    			log.Fatalf("failed to send streaming: %v\n", err)
    		}
    	}
    
    	// Read the response.
    	r, err := stream.CloseAndRecv()
    	if err != nil {
    		log.Fatalf("failed to CloseAndRecv: %v\n", err)
    	}
    	fmt.Printf("response:%v\n", r.Message)
    }
    
    func bidirectionalWithMetadata(c pb.EchoClient, message string) {
    	fmt.Printf("--- bidirectional ---\n")
    	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
    	md.Append("authorization", "Bearer "+AccessToken)
    	ctx := metadata.NewOutgoingContext(context.Background(), md)
    	stream, err := c.BidirectionalStreamingEcho(ctx)
    	if err != nil {
    		log.Fatalf("failed to call BidirectionalStreamingEcho: %v\n", err)
    	}
    
    	go func() {
    		// Send all requests to the server.
    		for i := 0; i < streamingCount; i++ {
    			if err := stream.Send(&pb.EchoRequest{Message: message}); err != nil {
    				log.Fatalf("failed to send streaming: %v\n", err)
    			}
    		}
    		stream.CloseSend()
    	}()
    
    	// Read all the responses.
    	var rpcStatus error
    	fmt.Printf("response:\n")
    	for {
    		r, err := stream.Recv()
    		if err != nil {
    			rpcStatus = err
    			break
    		}
    		fmt.Printf(" - %s\n", r.Message)
    	}
    	if rpcStatus != io.EOF {
    		log.Fatalf("failed to finish server streaming: %v", rpcStatus)
    	}
    }
    
    const message = "this is examples/metadata"
    
    func main() {
    	flag.Parse()
    	wg := sync.WaitGroup{}
    	for i := 0; i < 1; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			conn, err := grpc.Dial(*addr, grpc.WithInsecure())
    			if err != nil {
    				log.Fatalf("did not connect: %v", err)
    			}
    			defer conn.Close()
    
    			c := pb.NewEchoClient(conn)
    
    			//调用一元方法
    			//for i := 0; i < 100; i++ {
    			unaryCallWithMetadata(c, message)
    			time.Sleep(400 * time.Millisecond)
    			//}
    			//
    			//服务端流式
    			serverStreamingWithMetadata(c, message)
    			time.Sleep(1 * time.Second)
    
    			//客户端流式
    			clientStreamWithMetadata(c, message)
    			time.Sleep(1 * time.Second)
    
    			//双向流式
    			bidirectionalWithMetadata(c, message)
    		}()
    	}
    	wg.Wait()
    	time.Sleep(1 * time.Second)
    }
    

# gRPC实现反向代理

设置gRPC反向代理和透明服务:https://github.com/e421083458/gateway_demo/tree/master/demo/proxy/grpc_reverse_proxy

# 网关服务发现

服务发现是指用注册中心来记录服务信息,以便其他服务快速查找已注册服务,服务发现分类:

  • 客户端服务发现(服务发现模块)
  • 服务端服务发现(注册中心)

# zookeeper介绍

是一个分布式数据库(程序协调服务),Hadoop子项目;是一个树状方式维护节点的数据增删改查;监听通知机制,通过监听可以获取相应的消息事件

# zookeeper核心功能补充

  • 持久节点:一直保存在服务器上
  • 临时节点:会话失效,节点自动清理
  • 顺序节点:节点创建,自动分配序列号

# zookeeper安装

  • 参考官方文档安装:https://zookeeper.apache.org/doc/r3.6.0/zookeeperStarted.html#sc_Download

  • 解压文件

    > tar -zxvf zookeeper-3.4.10.tar.gz //解压
    > cd zookeeper-3.4.10/conf //切换到配置目录下
    > mv zoo_sample.cfg zoo.cfg //更改默认配置文件名称
    > vi zoo.cfg //编辑配置文件,自定义dataDir
    
  • 修改配置文件:

    tickTime: zookeeper中使用的基本时间单位, 毫秒
    dataDir: 内存数据快照的保存目录;如果没有自定义Log也使用该目录
    clientPort: 监听Cli连接的端口号
    
  • zookeepr使用增删改查

    [zk: 127.0.0.1:2181(CONNECTED) 2] ls /
    [zookeeper]
    [zk: 127.0.0.1:2181(CONNECTED) 3] create /zk_test my_data   // 增
    Created /zk_test
    [zk: 127.0.0.1:2181(CONNECTED) 4] ls /
    [zookeeper, zk_test]
    [zk: 127.0.0.1:2181(CONNECTED) 5] get /zk_test   // 查
    my_data
    [zk: 127.0.0.1:2181(CONNECTED) 6] set /zk_test junk // 该
    [zk: 127.0.0.1:2181(CONNECTED) 8] delete /zk_test  // 删
    
  • 运行于关闭:

    > cd zookeeper-3.4.10/bin //切换到 bin目录
    > ./zkServer.sh start //启动
    
    
    > ./zkServer.sh stop //停止后,如果CLi没有关闭,将报错
    

# Go操作zookeeper

  • 代码实现

    package main
    
    import (
    	"fmt"
    	"github.com/samuel/go-zookeeper/zk"
    	"time"
    )
    
    var (
    	host = []string{"127.0.0.1:2181"}
    )
    
    func main() {
    	conn, _, err := zk.Connect(host, 5*time.Second)
    	if err != nil {
    		panic(err)
    	}
    
    	//增
    	if _, err := conn.Create("/test_tree2", []byte("tree_content"),
    		0, zk.WorldACL(zk.PermAll)); err != nil {  // acl是权限
    		fmt.Println("create err", err)
    	}
    
    	//查
    	nodeValue, dStat, err := conn.Get("/test_tree2")
    	if err != nil {
    		fmt.Println("get err", err)
    		return
    	}
    	fmt.Println("nodeValue", string(nodeValue))
    
    	//改
    	if _, err := conn.Set("/test_tree2", []byte("new_content"),
    		dStat.Version); err != nil {
    		fmt.Println("update err", err)
    	}
    
    	//删除
    	_, dStat, _ = conn.Get("/test_tree2")
    	if err := conn.Delete("/test_tree2", dStat.Version); err != nil {
    		fmt.Println("Delete err", err)
    		//return
    	}
    
    	//验证存在
    	hasNode, _, err := conn.Exists("/test_tree2")
    	if err != nil {
    		fmt.Println("Exists err", err)
    		//return
    	}
    	fmt.Println("node Exist", hasNode)
    
    	//增加
    	if _, err := conn.Create("/test_tree2", []byte("tree_content"),
    		0, zk.WorldACL(zk.PermAll)); err != nil {
    		fmt.Println("create err", err)
    	}
    
    	//设置子节点
    	if _, err := conn.Create("/test_tree2/subnode", []byte("node_content"),
    		0, zk.WorldACL(zk.PermAll)); err != nil {
    		fmt.Println("create err", err)
    	}
    
    	//获取子节点列表
    	childNodes, _, err := conn.Children("/test_tree2")
    	if err != nil {
    		fmt.Println("Children err", err)
    	}
    	fmt.Println("childNodes", childNodes)
    }
    

# zookeeper监听子节点

  • 监听部分,代码实现:

    package main
    
    import (
    	"fmt"
    	"github.com/e421083458/gateway_demo/proxy/zookeeper"
    	"log"
    	"os"
    	"os/signal"
    	"syscall"
    )
    
    var addr = "127.0.0.1:2002"
    
    func main() {
    	//获取zk节点列表
    	zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})
    	zkManager.GetConnect()
    	defer zkManager.Close()
    
    	// 第一次获取一下
    	zlist, err := zkManager.GetServerListByPath("/real_server")
    	fmt.Println("server node:")
    	fmt.Println(zlist)
    	if err != nil {
    		log.Println(err)
    	}
    
    	//动态监听节点变化  -- 配套register
    	//chanList, chanErr := zkManager.WatchServerListByPath("/real_server")
    	//go func() {
    	//	for {
    	//		select {
    	//		case changeErr := <-chanErr:
    	//			fmt.Println("changeErr")
    	//			fmt.Println(changeErr)
    	//		case changedList := <-chanList:
    	//			fmt.Println("watch node changed")
    	//			fmt.Println(changedList)
    	//		}
    	//	}
    	//}()
    
    	//获取节点内容 -- 配套write
    	zc, _, err := zkManager.GetPathData("/rs_server_conf")
    	if err != nil {
    		log.Println(err)
    	}
    	fmt.Println("get node data:")
    	fmt.Println(string(zc))
    
    	//动态监听节点内容
    	dataChan, dataErrChan := zkManager.WatchPathData("/rs_server_conf")
    	go func() {
    		for {
    			select {
    			case changeErr := <-dataErrChan:
    				fmt.Println("changeErr")
    				fmt.Println(changeErr)
    			case changedData := <-dataChan:
    				fmt.Println("WatchGetData changed")
    				fmt.Println(string(changedData))
    			}
    		}
    	}()
    
    	//关闭信号监听
    	quit := make(chan os.Signal)
    	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    	<-quit
    }
    
  • 服务注册后,数据变化部分

    package main
    
    import (
    	"fmt"
    	"github.com/e421083458/gateway_demo/proxy/zookeeper"
    	"time"
    )
    
    func main() {
    	zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})
    	zkManager.GetConnect()
    	defer zkManager.Close()
    	i := 0
    
    	for {
    		conf := fmt.Sprintf("{name:" + fmt.Sprint(i) + "}")
    		zkManager.SetPathData("/rs_server_conf", []byte(conf), int32(i))
    		time.Sleep(5 * time.Second)
    		i++
    	}
    }
    

# 网关服务发现原理

  • 网关中的负载均衡监听zookeeper变化,服务启动时在zookeeper中创建临时节点

  • 下游机器启动时创建临时节点,节点名和内容为服务地址

  • 以观察者模式构建负载配置LoadBalanConf

  • 负载均衡配置LoadBalanceConf与负载均衡器整合

结合服务启动时,将服务注册到zookeeper中的代码,这样可以通过watch实现动态监听服务变化

package main

import (
	"fmt"
	"github.com/e421083458/gateway_demo/proxy/zookeeper"
	"io"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	rs1 := &RealServer{Addr: "127.0.0.1:2003"}
	rs1.Run()
	time.Sleep(2 * time.Second)
	rs2 := &RealServer{Addr: "127.0.0.1:2004"}
	rs2.Run()
	time.Sleep(2 * time.Second)

	//监听关闭信号
	quit := make(chan os.Signal)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
}

type RealServer struct {
	Addr string
}

func (r *RealServer) Run() {
	log.Println("Starting httpserver at " + r.Addr)
	mux := http.NewServeMux()
	mux.HandleFunc("/", r.HelloHandler)
	mux.HandleFunc("/base/error", r.ErrorHandler)
	server := &http.Server{
		Addr:         r.Addr,
		WriteTimeout: time.Second * 3,
		Handler:      mux,
	}
	go func() {
		//注册zk节点
		zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})
		err := zkManager.GetConnect()
		if err != nil {
			fmt.Printf(" connect zk error: %s ", err)
		}
		defer zkManager.Close()
		err = zkManager.RegistServerPath("/real_server", r.Addr)
		if err != nil {
			fmt.Printf(" regist node error: %s ", err)
		}
		zlist, err := zkManager.GetServerListByPath("/real_server")
		fmt.Println(zlist)
		log.Fatal(server.ListenAndServe())
	}()
}

func (r *RealServer) HelloHandler(w http.ResponseWriter, req *http.Request) {
	upath := fmt.Sprintf("http://%s%s\n", r.Addr, req.URL.Path)
	io.WriteString(w, upath)
}

func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) {
	upath := "error handler"
	w.WriteHeader(500)
	io.WriteString(w, upath)
}

上面实现了怎么将服务启动时注册到zookeeper中,下面我们来实现怎么从zookeeper中拿到服务,来做负载均衡,采用观察者模式。

实现思路:负载均衡策略使用观察者模式,通过监听zk的动态变化,来更新配置

# 客户端服务发现

  • 下游机器启动时无需进行任何操作
  • 以观察者模式构建负载均衡配置LoadBalanceConf
  • 负载均衡配置固定时间频率检测下游节点健康状况