server.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package polling
import (
	"bytes"
	"fmt"
	"html/template"
	"io"
	"net/http"
	"sync"

	"github.com/googollee/go-engine.io/message"
	"github.com/googollee/go-engine.io/parser"
	"github.com/googollee/go-engine.io/transport"
)
  12. type state int
  13. const (
  14. stateUnknow state = iota
  15. stateNormal
  16. stateClosing
  17. stateClosed
  18. )
  19. type Polling struct {
  20. sendChan chan bool
  21. encoder *parser.PayloadEncoder
  22. callback transport.Callback
  23. getLocker *Locker
  24. postLocker *Locker
  25. state state
  26. stateLocker sync.Mutex
  27. }
  28. func NewServer(w http.ResponseWriter, r *http.Request, callback transport.Callback) (transport.Server, error) {
  29. newEncoder := parser.NewBinaryPayloadEncoder
  30. if r.URL.Query()["b64"] != nil {
  31. newEncoder = parser.NewStringPayloadEncoder
  32. }
  33. ret := &Polling{
  34. sendChan: MakeSendChan(),
  35. encoder: newEncoder(),
  36. callback: callback,
  37. getLocker: NewLocker(),
  38. postLocker: NewLocker(),
  39. state: stateNormal,
  40. }
  41. return ret, nil
  42. }
  43. func (p *Polling) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  44. switch r.Method {
  45. case "GET":
  46. p.get(w, r)
  47. case "POST":
  48. p.post(w, r)
  49. }
  50. }
  51. func (p *Polling) Close() error {
  52. if p.getState() != stateNormal {
  53. return nil
  54. }
  55. p.setState(stateClosing)
  56. close(p.sendChan)
  57. if p.getLocker.TryLock() {
  58. if p.postLocker.TryLock() {
  59. p.callback.OnClose(p)
  60. p.setState(stateClosed)
  61. p.postLocker.Unlock()
  62. }
  63. p.getLocker.Unlock()
  64. }
  65. return nil
  66. }
  67. func (p *Polling) NextWriter(msgType message.MessageType, packetType parser.PacketType) (io.WriteCloser, error) {
  68. if p.getState() != stateNormal {
  69. return nil, io.EOF
  70. }
  71. var ret io.WriteCloser
  72. var err error
  73. switch msgType {
  74. case message.MessageText:
  75. ret, err = p.encoder.NextString(packetType)
  76. case message.MessageBinary:
  77. ret, err = p.encoder.NextBinary(packetType)
  78. }
  79. if err != nil {
  80. return nil, err
  81. }
  82. return NewWriter(ret, p), nil
  83. }
  84. func (p *Polling) get(w http.ResponseWriter, r *http.Request) {
  85. if !p.getLocker.TryLock() {
  86. http.Error(w, "overlay get", http.StatusBadRequest)
  87. return
  88. }
  89. if p.getState() != stateNormal {
  90. http.Error(w, "closed", http.StatusBadRequest)
  91. return
  92. }
  93. defer func() {
  94. if p.getState() == stateClosing {
  95. if p.postLocker.TryLock() {
  96. p.setState(stateClosed)
  97. p.callback.OnClose(p)
  98. p.postLocker.Unlock()
  99. }
  100. }
  101. p.getLocker.Unlock()
  102. }()
  103. <-p.sendChan
  104. if j := r.URL.Query().Get("j"); j != "" {
  105. // JSONP Polling
  106. w.Header().Set("Content-Type", "text/javascript; charset=UTF-8")
  107. tmp := bytes.Buffer{}
  108. p.encoder.EncodeTo(&tmp)
  109. pl := template.JSEscapeString(tmp.String())
  110. w.Write([]byte("___eio[" + j + "](\""))
  111. w.Write([]byte(pl))
  112. w.Write([]byte("\");"))
  113. } else {
  114. // XHR Polling
  115. if p.encoder.IsString() {
  116. w.Header().Set("Content-Type", "text/plain; charset=UTF-8")
  117. } else {
  118. w.Header().Set("Content-Type", "application/octet-stream")
  119. }
  120. p.encoder.EncodeTo(w)
  121. }
  122. }
  123. func (p *Polling) post(w http.ResponseWriter, r *http.Request) {
  124. w.Header().Set("Content-Type", "text/html")
  125. if !p.postLocker.TryLock() {
  126. http.Error(w, "overlay post", http.StatusBadRequest)
  127. return
  128. }
  129. if p.getState() != stateNormal {
  130. http.Error(w, "closed", http.StatusBadRequest)
  131. return
  132. }
  133. defer func() {
  134. if p.getState() == stateClosing {
  135. if p.getLocker.TryLock() {
  136. p.setState(stateClosed)
  137. p.callback.OnClose(p)
  138. p.getLocker.Unlock()
  139. }
  140. }
  141. p.postLocker.Unlock()
  142. }()
  143. var decoder *parser.PayloadDecoder
  144. if j := r.URL.Query().Get("j"); j != "" {
  145. // JSONP Polling
  146. d := r.FormValue("d")
  147. decoder = parser.NewPayloadDecoder(bytes.NewBufferString(d))
  148. } else {
  149. // XHR Polling
  150. decoder = parser.NewPayloadDecoder(r.Body)
  151. }
  152. for {
  153. d, err := decoder.Next()
  154. if err == io.EOF {
  155. break
  156. }
  157. if err != nil {
  158. http.Error(w, err.Error(), http.StatusBadRequest)
  159. return
  160. }
  161. p.callback.OnPacket(d)
  162. d.Close()
  163. }
  164. w.Write([]byte("ok"))
  165. }
  166. func (p *Polling) setState(s state) {
  167. p.stateLocker.Lock()
  168. defer p.stateLocker.Unlock()
  169. p.state = s
  170. }
  171. func (p *Polling) getState() state {
  172. p.stateLocker.Lock()
  173. defer p.stateLocker.Unlock()
  174. return p.state
  175. }