现在我是这样子做的,A和和服务端S建立一个和UDP连接,然后B和服务器S也建立一个长连接,B通过rpc向S发消息,然后S发给A。这样子就达到了B向A发消息了。反之A给B发消息也是这样子。因为发出去的消息要经过S处理的,比如敏感词之类的。

首先是服务端的代码:

  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "net"
  6. "net/http"
  7. "net/rpc"
  8. "os"
  9. )
  10. type TCPPush struct {
  11. }
  12. type PushRequest struct {
  13. A string
  14. }
  15. type PushResponse struct {
  16. Data string
  17. }
  18. // 建立连接池,用于广播消息
  19. var conns = make(map[string]net.Conn)
  20. func main() {
  21. port := "9090"
  22. go Start(port)
  23. // 以下是创建rpc服务器
  24. rpc.Register(new(TCPPush)) // 注册rpc服务
  25. rpc.HandleHTTP() // 采用http协议作为rpc载体
  26. lis, err := net.Listen("tcp", "127.0.0.1:8095")
  27. if err != nil {
  28. log.Fatalln("fatal error: ", err)
  29. }
  30. fmt.Fprintf(os.Stdout, "%s", "start rpc connection!\n")
  31. http.Serve(lis, nil)
  32. }
  33. func (*TCPPush) PushData(req PushRequest, res *PushResponse) error {
  34. doPush(req)
  35. res.Data = "success!"
  36. return nil
  37. }
  38. func doPush(req PushRequest) {
  39. // 发送数据
  40. for _, conn := range conns {
  41. sendData := []byte(req.A)
  42. fmt.Println("remoteAddr:", conn, "sendData:", string(sendData))
  43. _, err := conn.Write(sendData)
  44. if err != nil {
  45. log.Printf("broad message to %s failed.\n", conn)
  46. }
  47. }
  48. }
  49. // 启动服务器
  50. func Start(port string) {
  51. host := ":" + port
  52. // 获取tcp地址
  53. tcpAddr, err := net.ResolveTCPAddr("tcp4", host)
  54. if err != nil {
  55. log.Printf("resolve tcp addr failed: %v\n", err)
  56. return
  57. }
  58. // 监听
  59. listener, err := net.ListenTCP("tcp", tcpAddr)
  60. if err != nil {
  61. log.Printf("listen tcp port failed: %v\n", err)
  62. return
  63. }
  64. // 消息通道
  65. messageChan := make(chan string, 10)
  66. // 广播消息
  67. go BroadMessages(&conns, messageChan)
  68. // 启动
  69. for {
  70. fmt.Printf("listening port %s ...\n", port)
  71. conn, err := listener.AcceptTCP()
  72. if err != nil {
  73. log.Printf("Accept failed:%v\n", err)
  74. continue
  75. }
  76. // 把每个客户端连接扔进连接池
  77. conns[conn.RemoteAddr().String()] = conn
  78. fmt.Println(conns)
  79. // 处理消息
  80. go Handler(conn, &conns, messageChan)
  81. }
  82. }
  83. // 向所有连接上的乡亲们发广播
  84. func BroadMessages(conns *map[string]net.Conn, messages chan string) {
  85. for {
  86. // 不断从通道里读取消息
  87. msg := <-messages
  88. fmt.Println(msg)
  89. // 向所有的乡亲们发消息
  90. for key, conn := range *conns {
  91. fmt.Println("connection is connected from ", key)
  92. _, err := conn.Write([]byte(msg))
  93. if err != nil {
  94. log.Printf("broad message to %s failed: %v\n", key, err)
  95. delete(*conns, key)
  96. }
  97. }
  98. }
  99. }
  100. // 处理客户端发到服务端的消息,将其扔到通道中
  101. func Handler(conn net.Conn, conns *map[string]net.Conn, messages chan string) {
  102. fmt.Println("connect from client ", conn.RemoteAddr().String())
  103. buf := make([]byte, 1024)
  104. for {
  105. length, err := conn.Read(buf)
  106. if err != nil {
  107. log.Printf("read client message failed:%v\n", err)
  108. delete(*conns, conn.RemoteAddr().String())
  109. conn.Close()
  110. break
  111. }
  112. // 把收到的消息写到通道中
  113. receiveStr := string(buf[0:length])
  114. messages <- receiveStr
  115. }
  116. }

然后是tcp客户端:

  1. package main
  2. import (
  3. "net"
  4. "log"
  5. "fmt"
  6. "os"
  7. )
  8. func main() {
  9. StartClient(os.Args[1])
  10. }
  11. func StartClient(tcpAddrStr string) {
  12. tcpAddr, err := net.ResolveTCPAddr("tcp4", tcpAddrStr)
  13. if err != nil {
  14. log.Printf("Resolve tcp addr failed: %v\n", err)
  15. return
  16. }
  17. // 向服务器拨号
  18. conn, err := net.DialTCP("tcp", nil, tcpAddr)
  19. if err != nil {
  20. log.Printf("Dial to server failed: %v\n", err)
  21. return
  22. }
  23. // 向服务器发消息
  24. go SendMsg(conn)
  25. // 接收来自服务器端的广播消息
  26. buf := make([]byte, 1024)
  27. for {
  28. length, err := conn.Read(buf)
  29. if err != nil {
  30. log.Printf("recv server msg failed: %v\n", err)
  31. conn.Close()
  32. os.Exit(0)
  33. break
  34. }
  35. fmt.Println(string(buf[0:length]))
  36. }
  37. }
  38. // 向服务器端发消息
  39. func SendMsg(conn net.Conn) {
  40. username := conn.LocalAddr().String()
  41. for {
  42. var input string
  43. // 接收输入消息,放到input变量中
  44. fmt.Scanln(&input)
  45. if input == "/q" || input == "/quit" {
  46. fmt.Println("ByeBye ...")
  47. conn.Close()
  48. os.Exit(0)
  49. }
  50. // 只处理有内容的消息
  51. if len(input) > 0 {
  52. msg := username + " say:" + input
  53. _, err := conn.Write([]byte(msg))
  54. if err != nil {
  55. conn.Close()
  56. break
  57. }
  58. }
  59. }
  60. }

rpc客户端:

  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "net/rpc"
  6. )
  7. // 算数运算请求结构体
  8. type PushRequestClient struct {
  9. A string
  10. }
  11. type PushResponseClient struct {
  12. Data string
  13. }
  14. func main() {
  15. conn, err := rpc.DialHTTP("tcp", "127.0.0.1:8095")
  16. if err != nil {
  17. log.Fatalln("dialing error: ", err)
  18. }
  19. req := PushRequestClient{"this data is from rpc client....\n"}
  20. var res PushResponseClient
  21. err = conn.Call("TCPPush.PushData", req, &res) // 推送消息,TCPPush和rpcServer里面的是一样的,并且PushData在rpcServer里面是可导出的
  22. if err != nil {
  23. log.Fatalln("push error: ", err)
  24. }
  25. fmt.Println(res.Data)
  26. }

首先运行服务器:

  1. go run server.go

再跑tcp客户端:

  1. go run tcp_client.go :9090

最后跑rpc客户端:

  1. go run rpc_client.go

这样子,在rpc发出去的东西,在tcp客户端就可以看到了。

分类: web

标签:   golang