inproc_roundtrip.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package httptoo
  2. import (
  3. "context"
  4. "io"
  5. "net/http"
  6. "sync"
  7. "github.com/anacrolix/missinggo"
  8. )
  9. type responseWriter struct {
  10. mu sync.Mutex
  11. r http.Response
  12. headerWritten missinggo.Event
  13. bodyWriter io.WriteCloser
  14. bodyClosed missinggo.SynchronizedEvent
  15. }
  16. var _ interface {
  17. http.ResponseWriter
  18. // We're able to emulate this easily enough.
  19. http.CloseNotifier
  20. } = &responseWriter{}
  21. // Use Request.Context.Done instead.
  22. func (me *responseWriter) CloseNotify() <-chan bool {
  23. ret := make(chan bool, 1)
  24. go func() {
  25. <-me.bodyClosed.C()
  26. ret <- true
  27. }()
  28. return ret
  29. }
  30. func (me *responseWriter) Header() http.Header {
  31. if me.r.Header == nil {
  32. me.r.Header = make(http.Header)
  33. }
  34. return me.r.Header
  35. }
  36. func (me *responseWriter) Write(b []byte) (int, error) {
  37. me.mu.Lock()
  38. if !me.headerWritten.IsSet() {
  39. me.writeHeader(200)
  40. }
  41. me.mu.Unlock()
  42. return me.bodyWriter.Write(b)
  43. }
  44. func (me *responseWriter) WriteHeader(status int) {
  45. me.mu.Lock()
  46. me.writeHeader(status)
  47. me.mu.Unlock()
  48. }
  49. func (me *responseWriter) writeHeader(status int) {
  50. if me.headerWritten.IsSet() {
  51. return
  52. }
  53. me.r.StatusCode = status
  54. me.headerWritten.Set()
  55. }
  56. func (me *responseWriter) runHandler(h http.Handler, req *http.Request) {
  57. var pr *io.PipeReader
  58. pr, me.bodyWriter = io.Pipe()
  59. me.r.Body = struct {
  60. io.Reader
  61. io.Closer
  62. }{pr, eventCloser{pr, &me.bodyClosed}}
  63. // Shouldn't be writing to the response after the handler returns.
  64. defer me.bodyWriter.Close()
  65. // Send a 200 if nothing was written yet.
  66. defer me.WriteHeader(200)
  67. // Wrap the context in the given Request with one that closes when either
  68. // the handler returns, or the response body is closed.
  69. ctx, cancel := context.WithCancel(req.Context())
  70. defer cancel()
  71. go func() {
  72. <-me.bodyClosed.C()
  73. cancel()
  74. }()
  75. h.ServeHTTP(me, req.WithContext(ctx))
  76. }
  77. type eventCloser struct {
  78. c io.Closer
  79. closed *missinggo.SynchronizedEvent
  80. }
  81. func (me eventCloser) Close() (err error) {
  82. err = me.c.Close()
  83. me.closed.Set()
  84. return
  85. }
  86. func RoundTripHandler(req *http.Request, h http.Handler) (*http.Response, error) {
  87. rw := responseWriter{}
  88. go rw.runHandler(h, req)
  89. <-rw.headerWritten.LockedChan(&rw.mu)
  90. return &rw.r, nil
  91. }
  // InProcRoundTripper answers requests by invoking an http.Handler directly
  // in the current process. Its RoundTrip method passes each request to
  // Handler via RoundTripHandler.
  type InProcRoundTripper struct {
  	// Handler serves every request passed to RoundTrip.
  	Handler http.Handler
  }
  95. func (me *InProcRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  96. return RoundTripHandler(req, me.Handler)
  97. }