appsrv.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package appsrv
  15. import (
  16. "bufio"
  17. "bytes"
  18. "context"
  19. "crypto/sha1"
  20. "crypto/tls"
  21. "encoding/base64"
  22. "fmt"
  23. "io"
  24. olog "log"
  25. "math/rand"
  26. "net"
  27. "net/http"
  28. "net/url"
  29. "os"
  30. "strings"
  31. "sync"
  32. "syscall"
  33. "time"
  34. "yunion.io/x/jsonutils"
  35. "yunion.io/x/log"
  36. "yunion.io/x/pkg/appctx"
  37. "yunion.io/x/pkg/errors"
  38. "yunion.io/x/pkg/trace"
  39. "yunion.io/x/pkg/util/httputils"
  40. "yunion.io/x/pkg/util/signalutils"
  41. "yunion.io/x/pkg/utils"
  42. "yunion.io/x/onecloud/pkg/httperrors"
  43. "yunion.io/x/onecloud/pkg/proxy"
  44. "yunion.io/x/onecloud/pkg/util/ctx"
  45. )
  46. type Application struct {
  47. name string
  48. context context.Context
  49. session *SWorkerManager
  50. readSession *SWorkerManager
  51. systemSession *SWorkerManager
  52. roots map[string]*RadixNode
  53. rootLock *sync.RWMutex
  54. connMax int
  55. idleTimeout time.Duration
  56. readTimeout time.Duration
  57. readHeaderTimeout time.Duration
  58. writeTimeout time.Duration
  59. processTimeout time.Duration
  60. defHandlerInfo SHandlerInfo
  61. cors *Cors
  62. middlewares []MiddlewareFunc
  63. hostId string
  64. isExiting bool
  65. idleConnsClosed chan struct{}
  66. httpServer *http.Server
  67. slaveHttpServer *http.Server
  68. exception func(method, path string, body jsonutils.JSONObject, err error)
  69. isTLS bool
  70. enableProfiling bool
  71. allowTLS1x bool
  72. }
  73. const (
  74. DEFAULT_BACKLOG = 1024
  75. DEFAULT_IDLE_TIMEOUT = 10 * time.Second
  76. DEFAULT_READ_TIMEOUT = 0
  77. DEFAULT_READ_HEADER_TIMEOUT = 10 * time.Second
  78. DEFAULT_WRITE_TIMEOUT = 0
  79. // set default process timeout to 60 seconds
  80. DEFAULT_PROCESS_TIMEOUT = 60 * time.Second
  81. )
  82. var quitHandlerRegisted bool
  83. func NewApplication(name string, connMax int, queueSize int, db bool) *Application {
  84. app := Application{name: name,
  85. context: ctx.CtxWithTime(),
  86. connMax: connMax,
  87. session: NewWorkerManager("HttpRequestWorkerManager", connMax, connMax*queueSize, db),
  88. readSession: NewWorkerManager("HttpGetRequestWorkerManager", connMax, connMax*queueSize, db),
  89. systemSession: NewWorkerManager("InternalHttpRequestWorkerManager", 1, queueSize, false),
  90. roots: make(map[string]*RadixNode),
  91. rootLock: &sync.RWMutex{},
  92. idleTimeout: DEFAULT_IDLE_TIMEOUT,
  93. readTimeout: DEFAULT_READ_TIMEOUT,
  94. readHeaderTimeout: DEFAULT_READ_HEADER_TIMEOUT,
  95. writeTimeout: DEFAULT_WRITE_TIMEOUT,
  96. processTimeout: DEFAULT_PROCESS_TIMEOUT,
  97. }
  98. app.SetContext(appctx.APP_CONTEXT_KEY_APP, &app)
  99. app.SetContext(appctx.APP_CONTEXT_KEY_APPNAME, app.name)
  100. hm := sha1.New()
  101. hm.Write([]byte(name))
  102. hostname, _ := os.Hostname()
  103. hm.Write([]byte(hostname))
  104. outIp := utils.GetOutboundIP()
  105. hm.Write([]byte(outIp.String()))
  106. hostId := base64.URLEncoding.EncodeToString(hm.Sum(nil))
  107. log.Infof("App hostId: %s (%s,%s,%s)", hostId, name, hostname, outIp.String())
  108. app.hostId = hostId
  109. app.SetContext(appctx.APP_CONTEXT_KEY_HOST_ID, hostId)
  110. // initialize random seed
  111. rand.Seed(time.Now().UnixNano())
  112. return &app
  113. }
  114. func (app *Application) OnException(exception func(method, path string, body jsonutils.JSONObject, err error)) *Application {
  115. app.exception = exception
  116. return app
  117. }
  118. func (app *Application) SetDefaultTimeout(to time.Duration) *Application {
  119. log.Infof("adjust application default timeout to %f seconds", to.Seconds())
  120. app.processTimeout = to
  121. return app
  122. }
  123. func (app *Application) AllowTLS1x() *Application {
  124. log.Infof("Allow TLS1.0&1.1")
  125. app.allowTLS1x = true
  126. return app
  127. }
  128. func SplitPath(path string) []string {
  129. ret := make([]string, 0)
  130. for _, seg := range strings.Split(path, "/") {
  131. seg = strings.Trim(seg, " \t\r\n")
  132. if len(seg) > 0 {
  133. ret = append(ret, seg)
  134. }
  135. }
  136. return ret
  137. }
  138. func (app *Application) GetName() string {
  139. return app.name
  140. }
  141. func (app *Application) getRoot(method string) *RadixNode {
  142. app.rootLock.RLock()
  143. if v, ok := app.roots[method]; ok {
  144. app.rootLock.RUnlock()
  145. return v
  146. }
  147. app.rootLock.RUnlock()
  148. v := NewRadix()
  149. app.rootLock.Lock()
  150. app.roots[method] = v
  151. app.rootLock.Unlock()
  152. return v
  153. }
  154. func (app *Application) AddReverseProxyHandler(prefix string, ef *proxy.SEndpointFactory, m proxy.RequestManipulator) {
  155. app.AddReverseProxyHandlerWithCallbackConfig(prefix, ef, m,
  156. func(method string, hi *SHandlerInfo) *SHandlerInfo {
  157. return hi
  158. },
  159. )
  160. }
  161. func (app *Application) AddReverseProxyHandlerWithCallbackConfig(prefix string, ef *proxy.SEndpointFactory, m proxy.RequestManipulator, confCb func(string, *SHandlerInfo) *SHandlerInfo) {
  162. handler := proxy.NewHTTPReverseProxy(ef, m).ServeHTTP
  163. for _, method := range []string{"GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"} {
  164. hi := &SHandlerInfo{}
  165. hi = confCb(method, hi)
  166. if hi != nil {
  167. hi.SetMethod(method)
  168. hi.SetPath(prefix)
  169. hi.SetHandler(handler)
  170. app.AddHandler3(hi)
  171. }
  172. }
  173. }
  174. func (app *Application) AddHandler(method string, prefix string,
  175. handler func(context.Context, http.ResponseWriter, *http.Request)) *SHandlerInfo {
  176. return app.AddHandler2(method, prefix, handler, nil, "", nil)
  177. }
  178. func (app *Application) AddHandler2(method string, prefix string,
  179. handler func(context.Context, http.ResponseWriter, *http.Request),
  180. metadata map[string]interface{}, name string, tags map[string]string) *SHandlerInfo {
  181. segs := SplitPath(prefix)
  182. hi := newHandlerInfo(method, segs, handler, metadata, name, tags)
  183. return app.AddHandler3(hi)
  184. }
  185. func (app *Application) AddHandler3(hi *SHandlerInfo) *SHandlerInfo {
  186. e := app.getRoot(hi.method).Add(hi.path, hi)
  187. if e != nil {
  188. log.Fatalf("Fail to register %s %s: %s", hi.method, hi.path, e)
  189. }
  190. return hi
  191. }
  192. type loggingResponseWriter struct {
  193. http.ResponseWriter
  194. status int
  195. data []byte
  196. }
  197. func (lrw *loggingResponseWriter) Flush() {
  198. if fw, ok := lrw.ResponseWriter.(http.Flusher); ok {
  199. fw.Flush()
  200. }
  201. }
  202. func (lrw *loggingResponseWriter) Write(data []byte) (int, error) {
  203. lrw.data = data
  204. return lrw.ResponseWriter.Write(data)
  205. }
  206. func (lrw *loggingResponseWriter) Hijack() (rwc net.Conn, buf *bufio.ReadWriter, err error) {
  207. if f, ok := lrw.ResponseWriter.(http.Hijacker); ok {
  208. return f.Hijack()
  209. }
  210. return nil, nil, fmt.Errorf("not a hijacker")
  211. }
  212. func (lrw *loggingResponseWriter) WriteHeader(code int) {
  213. if code < 100 || code >= 600 {
  214. log.Errorf("Invalud status code %d, set code to 598", code)
  215. code = 598
  216. }
  217. lrw.status = code
  218. lrw.ResponseWriter.WriteHeader(code)
  219. }
  220. func genRequestId(w http.ResponseWriter, r *http.Request) string {
  221. rid := r.Header.Get("X-Request-Id")
  222. if len(rid) == 0 {
  223. rid = utils.GenRequestId(3)
  224. } else {
  225. rid = fmt.Sprintf("%s-%s", rid, utils.GenRequestId(3))
  226. }
  227. w.Header().Set("X-Request-Id", rid)
  228. return rid
  229. }
  230. func (app *Application) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  231. rid := genRequestId(w, r)
  232. w.Header().Set("X-Request-Host-Id", app.hostId)
  233. lrw := &loggingResponseWriter{ResponseWriter: w, status: http.StatusOK, data: []byte{}}
  234. start := time.Now()
  235. hi, params := app.defaultHandle(lrw, r, rid)
  236. if hi == nil {
  237. hi = &app.defHandlerInfo
  238. }
  239. var counter *handlerRequestCounter
  240. if lrw.status < 400 {
  241. counter = &hi.counter2XX
  242. } else if lrw.status < 500 {
  243. counter = &hi.counter4XX
  244. } else {
  245. counter = &hi.counter5XX
  246. }
  247. duration := float64(time.Since(start).Nanoseconds()) / 1000000
  248. counter.hit += 1
  249. counter.duration += duration
  250. skipLog := false
  251. if params != nil {
  252. if params.SkipLog {
  253. skipLog = true
  254. }
  255. } else if hi.skipLog {
  256. skipLog = true
  257. }
  258. peerServiceName := r.Header.Get("X-Yunion-Peer-Service-Name")
  259. var remote string
  260. if len(peerServiceName) > 0 {
  261. remote = fmt.Sprintf("%s:%s", r.RemoteAddr, peerServiceName)
  262. } else {
  263. remote = r.RemoteAddr
  264. }
  265. if !skipLog {
  266. log.Infof("%s %d %s %s %s (%s) %.2fms", app.hostId, lrw.status, rid, r.Method, r.URL, remote, duration)
  267. }
  268. if lrw.status >= 500 && app.exception != nil {
  269. url := fmt.Sprintf("%d %s (%s) %.2fms", lrw.status, r.URL.String(), remote, duration)
  270. app.exception(r.Method, url, params.Body, errors.Errorf("%s", string(lrw.data)))
  271. }
  272. }
  273. func (app *Application) handleCORS(w http.ResponseWriter, r *http.Request) bool {
  274. if app.cors == nil {
  275. return false
  276. }
  277. if r.Method == "OPTIONS" && r.Header.Get("Access-Control-Request-Method") != "" {
  278. app.cors.handlePreflight(w, r)
  279. return true
  280. } else {
  281. app.cors.handleActualRequest(w, r)
  282. return false
  283. }
  284. }
  285. type appTask struct {
  286. ctx context.Context
  287. hand *SHandlerInfo
  288. rid string
  289. params map[string]string
  290. appParams *SAppParams
  291. app *Application
  292. fw responseWriterChannel
  293. r *http.Request
  294. segs []string
  295. to time.Duration
  296. cancel context.CancelFunc
  297. }
  298. func (t *appTask) Run() {
  299. if t.ctx.Err() == nil {
  300. t.ctx = context.WithValue(t.ctx, appctx.APP_CONTEXT_KEY_REQUEST_ID, t.rid)
  301. t.ctx = context.WithValue(t.ctx, appctx.APP_CONTEXT_KEY_CUR_ROOT, t.hand.path)
  302. t.ctx = context.WithValue(t.ctx, appctx.APP_CONTEXT_KEY_CUR_PATH, t.segs[len(t.hand.path):])
  303. t.ctx = context.WithValue(t.ctx, appctx.APP_CONTEXT_KEY_PARAMS, t.params)
  304. t.ctx = context.WithValue(t.ctx, appctx.APP_CONTEXT_KEY_START_TIME, time.Now().UTC())
  305. if t.hand.metadata != nil {
  306. t.ctx = context.WithValue(t.ctx, appctx.APP_CONTEXT_KEY_METADATA, t.hand.metadata)
  307. }
  308. t.ctx = context.WithValue(t.ctx, APP_CONTEXT_KEY_APP_PARAMS, t.appParams)
  309. func() {
  310. span := trace.StartServerTrace(&t.fw, t.r, t.appParams.Name, t.app.GetName(), t.hand.GetTags())
  311. defer func() {
  312. if !t.appParams.SkipTrace {
  313. span.EndTrace()
  314. }
  315. }()
  316. t.ctx = context.WithValue(t.ctx, appctx.APP_CONTEXT_KEY_TRACE, span)
  317. t.hand.handler(t.ctx, &t.fw, t.r)
  318. }()
  319. } // otherwise, the task has been timeout
  320. t.fw.closeChannels()
  321. }
  322. func (t *appTask) Dump() string {
  323. return fmt.Sprintf("%s %s", t.r.Method, t.r.URL.String())
  324. }
  325. func (app *Application) defaultHandle(w http.ResponseWriter, r *http.Request, rid string) (*SHandlerInfo, *SAppParams) {
  326. segs := SplitPath(r.URL.EscapedPath())
  327. for i := range segs {
  328. if p, err := url.PathUnescape(segs[i]); err == nil {
  329. segs[i] = p
  330. }
  331. }
  332. params := make(map[string]string)
  333. w.Header().Set("Server", "Yunion AppServer/Go/2018.4")
  334. w.Header().Set("X-Frame-Options", "SAMEORIGIN")
  335. w.Header().Set("X-XSS-Protection", "1; mode=block")
  336. w.Header().Set("X-Content-Type-Options", "nosniff")
  337. if app.isTLS {
  338. w.Header().Set("Strict-Transport-Security", "max-age=31536000; includeSubDomains")
  339. }
  340. isCors := app.handleCORS(w, r)
  341. handler := app.getRoot(r.Method).Match(segs, params)
  342. if handler != nil {
  343. // log.Print("Found handler", params)
  344. hand, ok := handler.(*SHandlerInfo)
  345. if ok {
  346. task := &appTask{
  347. ctx: app.context,
  348. hand: hand,
  349. rid: rid,
  350. params: params,
  351. app: app,
  352. fw: newResponseWriterChannel(w),
  353. r: r,
  354. segs: segs,
  355. to: hand.fetchProcessTimeout(r),
  356. cancel: nil,
  357. }
  358. currentWorker := make(chan *SWorker, 1) // make it a buffered channel
  359. if task.to == 0 {
  360. task.to = app.processTimeout
  361. }
  362. if task.to > 0 {
  363. task.ctx, task.cancel = context.WithTimeout(task.ctx, task.to)
  364. }
  365. if task.cancel != nil {
  366. defer task.cancel()
  367. }
  368. task.ctx = appctx.WithRequestLang(task.ctx, r)
  369. session := hand.fetchWorkerManager(r)
  370. if session == nil {
  371. if r.Method == "GET" || r.Method == "HEAD" {
  372. session = app.readSession
  373. } else {
  374. session = app.session
  375. }
  376. }
  377. task.appParams = hand.GetAppParams(params, segs)
  378. task.appParams.Request = r
  379. task.appParams.Response = w
  380. if r.Body != nil && r.ContentLength > 0 && getContentType(r) == ContentTypeJson {
  381. data, _ := io.ReadAll(r.Body)
  382. task.appParams.Body, _ = jsonutils.Parse(data)
  383. r.Body = io.NopCloser(bytes.NewBuffer(data))
  384. }
  385. inqueue := session.Run(
  386. task,
  387. currentWorker,
  388. func(err error) {
  389. httperrors.InternalServerError(task.ctx, &task.fw, "Internal server error: %s", err)
  390. task.fw.closeChannels()
  391. },
  392. )
  393. if !inqueue {
  394. httperrors.TooManyRequestsError(task.ctx, w, "Request queue is full")
  395. } else {
  396. runErr := task.fw.wait(task.ctx, currentWorker)
  397. if runErr != nil {
  398. switch je := runErr.(type) {
  399. case *httputils.JSONClientError:
  400. httperrors.GeneralServerError(task.ctx, w, je)
  401. default:
  402. httperrors.InternalServerError(task.ctx, w, "Internal server error")
  403. }
  404. }
  405. }
  406. task.fw.closeChannels()
  407. return hand, task.appParams
  408. } else {
  409. ctx := appctx.WithRequestLang(context.TODO(), r)
  410. httperrors.InternalServerError(ctx, w, "Invalid handler %s", r.URL)
  411. }
  412. } else if !isCors {
  413. ctx := appctx.WithRequestLang(context.TODO(), r)
  414. httperrors.NotFoundError(ctx, w, "Handler %s not found", "/"+strings.Join(segs, "/"))
  415. }
  416. return nil, nil
  417. }
  418. func (app *Application) AddDefaultHandler(method string, prefix string, handler func(context.Context, http.ResponseWriter, *http.Request), name string) {
  419. segs := SplitPath(prefix)
  420. hi := newHandlerInfo(method, segs, handler, nil, name, nil)
  421. hi.SetSkipLog(true).SetWorkerManager(app.systemSession)
  422. app.AddHandler3(hi)
  423. }
  424. func (app *Application) addDefaultHandlers() {
  425. app.AddDefaultHandler("GET", "/version", WhitelistFilter(VersionHandler), "version")
  426. app.AddDefaultHandler("GET", "/stats", WhitelistFilter(StatisticHandler), "stats")
  427. app.AddDefaultHandler("POST", "/ping", WhitelistFilter(PingHandler), "ping")
  428. app.AddDefaultHandler("GET", "/ping", WhitelistFilter(PingHandler), "ping")
  429. app.AddDefaultHandler("GET", "/worker_stats", WhitelistFilter(WorkerStatsHandler), "worker_stats")
  430. app.AddDefaultHandler("GET", "/process_stats", WhitelistFilter(ProcessStatsHandler), "process_stats")
  431. }
  432. func timeoutHandle(h http.Handler) http.HandlerFunc {
  433. return func(w http.ResponseWriter, r *http.Request) {
  434. if r.Method == "POST" && strings.HasSuffix(r.URL.Path, "upload") {
  435. // 上传文件接口默认不超时
  436. h.ServeHTTP(w, r)
  437. } else {
  438. // 服务器超时时间默认设置为10秒.
  439. http.TimeoutHandler(h, DEFAULT_IDLE_TIMEOUT, "").ServeHTTP(w, r)
  440. }
  441. }
  442. }
  443. func (app *Application) initServer(addr string) *http.Server {
  444. return InitHTTPServer(app, addr)
  445. }
  446. func InitHTTPServer(app *Application, addr string) *http.Server {
  447. /* db := AppContextDB(app.context)
  448. if db != nil {
  449. db.SetMaxIdleConns(app.connMax + 1)
  450. db.SetMaxOpenConns(app.connMax + 1)
  451. }
  452. */
  453. cipherSuites := []uint16{}
  454. for _, suite := range tls.CipherSuites() {
  455. if !strings.HasSuffix(suite.Name, "_SHA") {
  456. cipherSuites = append(cipherSuites, suite.ID)
  457. }
  458. }
  459. minTLSVer := uint16(tls.VersionTLS12)
  460. if app.allowTLS1x {
  461. minTLSVer = tls.VersionTLS10
  462. }
  463. tlsConf := &tls.Config{
  464. CipherSuites: cipherSuites,
  465. MinVersion: minTLSVer,
  466. }
  467. s := &http.Server{
  468. Addr: addr,
  469. Handler: app,
  470. IdleTimeout: app.idleTimeout,
  471. ReadTimeout: app.readTimeout,
  472. ReadHeaderTimeout: app.readHeaderTimeout,
  473. WriteTimeout: app.writeTimeout,
  474. MaxHeaderBytes: 1 << 20,
  475. // fix aliyun elb healt check tls error
  476. // issue like: https://github.com/megaease/easegress/issues/481
  477. ErrorLog: olog.New(io.Discard, "", olog.LstdFlags),
  478. TLSConfig: tlsConf,
  479. }
  480. return s
  481. }
  482. func (app *Application) registerCleanShutdown(s *http.Server, onStop func()) {
  483. if quitHandlerRegisted {
  484. log.Warningf("Application quit handler registed, duplicated!!!")
  485. return
  486. } else {
  487. quitHandlerRegisted = true
  488. }
  489. app.idleConnsClosed = make(chan struct{})
  490. signalutils.SetDumpStackSignal()
  491. quitSignals := []os.Signal{syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM}
  492. signalutils.RegisterSignal(func() {
  493. if app.isExiting {
  494. log.Infof("Quit signal received!!! clean up in progress, be patient...")
  495. return
  496. }
  497. app.isExiting = true
  498. log.Infof("Quit signal received!!! do cleanup...")
  499. if err := s.Shutdown(context.Background()); err != nil {
  500. // Error from closing listeners, or context timeout:
  501. log.Errorf("HTTP server Shutdown: %v", err)
  502. }
  503. if onStop != nil {
  504. func() {
  505. defer func() {
  506. if r := recover(); r != nil {
  507. log.Errorf("app exiting error: %s", r)
  508. }
  509. }()
  510. onStop()
  511. }()
  512. }
  513. close(app.idleConnsClosed)
  514. }, quitSignals...)
  515. signalutils.StartTrap()
  516. }
  517. func (app *Application) waitCleanShutdown() {
  518. <-app.idleConnsClosed
  519. log.Infof("Service stopped.")
  520. }
  521. func (app *Application) ListenAndServe(addr string) {
  522. app.ListenAndServeWithCleanup(addr, nil)
  523. }
  524. func (app *Application) ListenAndServeTLS(addr string, certFile, keyFile string) {
  525. app.ListenAndServeTLSWithCleanup(addr, certFile, keyFile, nil)
  526. }
  527. func (app *Application) ListenAndServeWithCleanup(addr string, onStop func()) {
  528. app.ListenAndServeTLSWithCleanup(addr, "", "", onStop)
  529. }
  530. func (app *Application) ListenAndServeTLSWithCleanup(addr string, certFile, keyFile string, onStop func()) {
  531. app.ListenAndServeTLSWithCleanup2(addr, certFile, keyFile, onStop, true)
  532. }
  533. func (app *Application) ListenAndServeWithoutCleanup(addr, certFile, keyFile string) {
  534. app.ListenAndServeTLSWithCleanup2(addr, certFile, keyFile, nil, false)
  535. }
  536. func (app *Application) ListenAndServeTLSWithCleanup2(addr string, certFile, keyFile string, onStop func(), isMaster bool) {
  537. app.isTLS = true
  538. httpSrv := app.initServer(addr)
  539. if isMaster {
  540. app.addDefaultHandlers()
  541. if app.enableProfiling {
  542. addPProfHandler("", app)
  543. }
  544. app.httpServer = httpSrv
  545. app.registerCleanShutdown(app.httpServer, onStop)
  546. } else {
  547. app.slaveHttpServer = httpSrv
  548. }
  549. app.listenAndServeInternal(httpSrv, certFile, keyFile)
  550. if isMaster {
  551. app.waitCleanShutdown()
  552. }
  553. }
  554. func (app *Application) Stop(ctx context.Context) error {
  555. if app.httpServer != nil {
  556. return app.httpServer.Shutdown(ctx)
  557. }
  558. return nil
  559. }
  560. func (app *Application) listenAndServeInternal(s *http.Server, certFile, keyFile string) {
  561. var err error
  562. if len(certFile) == 0 && len(keyFile) == 0 {
  563. err = s.ListenAndServe()
  564. } else {
  565. err = s.ListenAndServeTLS(certFile, keyFile)
  566. }
  567. if err != nil && err != http.ErrServerClosed {
  568. log.Fatalf("ListAndServer fail: %s (cert=%s key=%s)", err, certFile, keyFile)
  569. }
  570. }
  571. type TContentType string
  572. const (
  573. ContentTypeJson = TContentType("Json")
  574. ContentTypeForm = TContentType("Form")
  575. ContentTypeOctetStream = TContentType("OctetStream")
  576. ContentTypeUnknown = TContentType("Unknown")
  577. )
  578. func getContentType(r *http.Request) TContentType {
  579. contType := func() string {
  580. for _, k := range []string{"Content-Type", "content-type"} {
  581. contentType := r.Header.Get(k)
  582. if len(contentType) > 0 {
  583. return strings.ToLower(contentType)
  584. }
  585. }
  586. return ""
  587. }()
  588. for k, v := range map[string]TContentType{
  589. "application/json": ContentTypeJson,
  590. "application/x-www-form-urlencoded": ContentTypeForm,
  591. "application/octet-stream": ContentTypeOctetStream,
  592. } {
  593. if strings.HasPrefix(contType, k) {
  594. return v
  595. }
  596. }
  597. return ContentTypeUnknown
  598. }
  599. func FetchEnv(ctx context.Context, w http.ResponseWriter, r *http.Request) (params map[string]string, query jsonutils.JSONObject, body jsonutils.JSONObject) {
  600. var err error
  601. params = appctx.AppContextParams(ctx)
  602. query, err = jsonutils.ParseQueryString(r.URL.RawQuery)
  603. if err != nil {
  604. log.Errorf("Parse query string %s failed: %v", r.URL.RawQuery, err)
  605. }
  606. if r.Method == "PUT" || r.Method == "POST" || r.Method == "DELETE" || r.Method == "PATCH" {
  607. switch getContentType(r) {
  608. case ContentTypeJson:
  609. if r.ContentLength > 0 {
  610. body, err = FetchJSON(r)
  611. if err != nil {
  612. log.Warningf("Fail to decode JSON request body: %v", err)
  613. }
  614. }
  615. case ContentTypeForm:
  616. err := r.ParseForm()
  617. if err != nil {
  618. log.Warningf("ParseForm %s error: %v", r.URL.String(), err)
  619. }
  620. query, err = jsonutils.ParseQueryString(r.PostForm.Encode())
  621. if err != nil {
  622. log.Warningf("Parse query string %s failed: %v", r.PostForm.Encode(), err)
  623. }
  624. case ContentTypeOctetStream:
  625. default:
  626. log.Warningf("%s invalid contentType with header %v", r.URL.String(), r.Header)
  627. }
  628. }
  629. return params, query, body
  630. }
  631. func (app *Application) GetContext() context.Context {
  632. return app.context
  633. }
  634. func (app *Application) EnableProfiling() {
  635. app.enableProfiling = true
  636. }