forwards.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package models
  15. import (
  16. "context"
  17. "fmt"
  18. "math/rand"
  19. "time"
  20. "yunion.io/x/jsonutils"
  21. "yunion.io/x/pkg/gotypes"
  22. "yunion.io/x/sqlchemy"
  23. cloudproxy_api "yunion.io/x/onecloud/pkg/apis/cloudproxy"
  24. "yunion.io/x/onecloud/pkg/cloudcommon/db"
  25. "yunion.io/x/onecloud/pkg/cloudcommon/validators"
  26. "yunion.io/x/onecloud/pkg/httperrors"
  27. "yunion.io/x/onecloud/pkg/mcclient"
  28. "yunion.io/x/onecloud/pkg/util/stringutils2"
  29. )
  30. type SForward struct {
  31. db.SVirtualResourceBase
  32. ProxyEndpointId string `width:"36" charset:"ascii" nullable:"false" list:"user" update:"user" create:"required"`
  33. ProxyAgentId string `width:"36" charset:"ascii" nullable:"true" list:"user" update:"user" create:"optional"`
  34. Type string `width:"16" charset:"ascii" nullable:"false" list:"user" create:"required"`
  35. RemoteAddr string `width:"16" charset:"ascii" nullable:"false" list:"user" create:"required"`
  36. RemotePort int `width:"16" charset:"ascii" nullable:"false" list:"user" create:"required"`
  37. BindPortReq int `width:"16" charset:"ascii" nullable:"false" list:"user" update:"user" create:"optional"`
  38. Opaque string `width:"36" charset:"ascii" nullable:"true" list:"user" update:"user" create:"optional"`
  39. BindPort int `width:"16" charset:"ascii" nullable:"false" list:"user" update:"user" create:"optional"`
  40. LastSeen time.Time `nullable:"true" get:"user" list:"user"`
  41. LastSeenTimeout int `width:"16" charset:"ascii" nullable:"false" list:"user" update:"user" create:"optional" default:"117"`
  42. }
  43. type SForwardManager struct {
  44. db.SVirtualResourceBaseManager
  45. }
  46. var ForwardManager *SForwardManager
  47. func init() {
  48. ForwardManager = &SForwardManager{
  49. SVirtualResourceBaseManager: db.NewVirtualResourceBaseManager(
  50. SForward{},
  51. "forwards_tbl",
  52. "forward",
  53. "forwards",
  54. ),
  55. }
  56. ForwardManager.SetVirtualObject(ForwardManager)
  57. }
  58. func (man *SForwardManager) validateLocalSetPort(ctx context.Context, data *jsonutils.JSONDict, agentId string, portReq int) (*jsonutils.JSONDict, error) {
  59. var (
  60. fwds []SForward
  61. q = man.Query().
  62. Equals("proxy_agent_id", agentId).
  63. Equals("bind_port_req", portReq)
  64. )
  65. if err := db.FetchModelObjects(man, q, &fwds); err != nil {
  66. return nil, httperrors.NewServerError("query forwards by agent failed: %v", err)
  67. }
  68. if len(fwds) > 0 {
  69. return nil, httperrors.NewConflictError("port %d on agent %s was already occupied",
  70. portReq, agentId)
  71. }
  72. data.Set("bind_port", jsonutils.NewInt(int64(portReq)))
  73. return data, nil
  74. }
  75. func (man *SForwardManager) validateRemoteSetPort(ctx context.Context, data *jsonutils.JSONDict, epId string, portReq int) (*jsonutils.JSONDict, error) {
  76. var (
  77. fwds []SForward
  78. q = man.Query().
  79. Equals("proxy_endpoint_id", epId).
  80. Equals("bind_port_req", portReq)
  81. )
  82. if err := db.FetchModelObjects(man, q, &fwds); err != nil {
  83. return nil, httperrors.NewServerError("query forwards by endpoint id failed: %v", err)
  84. }
  85. if len(fwds) > 0 {
  86. return nil, httperrors.NewConflictError("port %d on proxy endpoint %s was already occupied",
  87. portReq, epId)
  88. }
  89. data.Set("bind_port", jsonutils.NewInt(int64(portReq)))
  90. return data, nil
  91. }
  92. func (man *SForwardManager) validateLocalSelectAgent(ctx context.Context, data *jsonutils.JSONDict, portReq int) (*jsonutils.JSONDict, error) {
  93. agents, err := ProxyAgentManager.allAgents(ctx)
  94. if err != nil {
  95. return nil, httperrors.NewGeneralError(err)
  96. }
  97. agentsNum := len(agents)
  98. if agentsNum == 0 {
  99. return nil, httperrors.NewResourceNotFoundError("empty proxy agents set")
  100. }
  101. s := rand.Intn(agentsNum)
  102. for i := s; ; {
  103. agent := &agents[i]
  104. var err error
  105. data, err = man.validateLocalSetPort(ctx, data, agent.Id, portReq)
  106. if err == nil {
  107. data.Set("proxy_agent_id", jsonutils.NewString(agent.Id))
  108. return data, nil
  109. }
  110. i += 1
  111. if i == agentsNum {
  112. i = 0
  113. }
  114. if i == s {
  115. break
  116. }
  117. }
  118. return nil, httperrors.NewResourceNotFoundError("no proxy agent accepts request for port %d", portReq)
  119. }
  120. func (man *SForwardManager) validateRemoteSelectAgent(ctx context.Context, data *jsonutils.JSONDict) (*jsonutils.JSONDict, error) {
  121. agents, err := ProxyAgentManager.allAgents(ctx)
  122. if err != nil {
  123. return nil, httperrors.NewGeneralError(err)
  124. }
  125. agentsNum := len(agents)
  126. if agentsNum == 0 {
  127. return nil, httperrors.NewResourceNotFoundError("empty proxy agents set")
  128. }
  129. i := rand.Intn(agentsNum)
  130. agent := agents[i]
  131. data.Set("proxy_agent_id", jsonutils.NewString(agent.Id))
  132. return data, nil
  133. }
  134. func (man *SForwardManager) validatePortReq(
  135. ctx context.Context,
  136. typ string, portReq int, agentId, epId string,
  137. data *jsonutils.JSONDict,
  138. ) (*jsonutils.JSONDict, error) {
  139. validateOne := func(portReq int) (*jsonutils.JSONDict, error) {
  140. var err error
  141. switch typ {
  142. case cloudproxy_api.FORWARD_TYPE_LOCAL:
  143. if agentId == "" {
  144. data, err = man.validateLocalSelectAgent(ctx, data, portReq)
  145. } else {
  146. data, err = man.validateLocalSetPort(ctx, data, agentId, portReq)
  147. }
  148. case cloudproxy_api.FORWARD_TYPE_REMOTE:
  149. data, err = man.validateRemoteSetPort(ctx, data, epId, portReq)
  150. }
  151. return data, err
  152. }
  153. if typ == cloudproxy_api.FORWARD_TYPE_REMOTE && agentId == "" {
  154. var err error
  155. data, err = man.validateRemoteSelectAgent(ctx, data)
  156. if err != nil {
  157. return nil, httperrors.NewResourceNotFoundError("select proxy agent: %v", err)
  158. }
  159. }
  160. var err error
  161. if portReq <= 0 {
  162. portTotal := cloudproxy_api.BindPortMax - cloudproxy_api.BindPortMin + 1
  163. portReqStart := rand.Intn(portTotal)
  164. for portInc := portReqStart; ; {
  165. data, err = validateOne(cloudproxy_api.BindPortMin + portInc)
  166. if err == nil {
  167. break
  168. }
  169. portInc += 1
  170. if portInc == portTotal {
  171. portInc = 0
  172. }
  173. if portInc == portReqStart {
  174. return nil, httperrors.NewOutOfResourceError("no available port for bind")
  175. }
  176. }
  177. } else {
  178. data, err = validateOne(portReq)
  179. }
  180. return data, err
  181. }
  182. func (man *SForwardManager) PerformCreateFromServer(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input *cloudproxy_api.ForwardCreateFromServerInput) (jsonutils.JSONObject, error) {
  183. data := jsonutils.Marshal(input).(*jsonutils.JSONDict)
  184. typeV := validators.NewStringChoicesValidator("type", cloudproxy_api.FORWARD_TYPES)
  185. portReqV := validators.NewRangeValidator("bind_port_req", cloudproxy_api.BindPortMin, cloudproxy_api.BindPortMax)
  186. remotePortV := validators.NewPortValidator("remote_port")
  187. {
  188. for _, v := range []validators.IValidator{
  189. typeV,
  190. portReqV.Optional(true),
  191. remotePortV,
  192. validators.NewNonNegativeValidator("last_seen_timeout").Optional(true),
  193. } {
  194. if err := v.Validate(ctx, data); err != nil {
  195. return nil, err
  196. }
  197. }
  198. }
  199. serverId := input.ServerId
  200. if serverId == "" {
  201. return nil, httperrors.NewBadRequestError("server_id is required")
  202. }
  203. serverInfo, err := getServerInfo(ctx, userCred, serverId)
  204. if err != nil {
  205. return nil, err
  206. }
  207. nic := serverInfo.GetNic()
  208. if nic == nil {
  209. return nil, httperrors.NewBadRequestError("cannot find network interface for this server")
  210. }
  211. proxymatch := ProxyMatchManager.findMatch(ctx, nic.NetworkId, nic.VpcId)
  212. if proxymatch == nil {
  213. return nil, httperrors.NewBadRequestError("cannot find an endpoint for this server")
  214. }
  215. data.Set("opaque", jsonutils.NewString(serverInfo.Server.Id))
  216. data.Set("remote_addr", jsonutils.NewString(nic.IpAddr))
  217. data.Set("proxy_endpoint_id", jsonutils.NewString(proxymatch.ProxyEndpointId))
  218. typ := typeV.Value
  219. agentId := ""
  220. epId := proxymatch.ProxyEndpointId
  221. if data.Contains("bind_port_req") {
  222. portReq := int(portReqV.Value)
  223. data, err = man.validatePortReq(ctx, typ, portReq, agentId, epId, data)
  224. } else {
  225. data, err = man.validatePortReq(ctx, typ, -1, agentId, epId, data)
  226. }
  227. forwardObj, err := db.NewModelObject(man)
  228. if err != nil {
  229. return nil, httperrors.NewGeneralError(err)
  230. }
  231. forward := forwardObj.(*SForward)
  232. if err := data.Unmarshal(forward); err != nil {
  233. return nil, httperrors.NewServerError("unmarshal create params: %v", err)
  234. }
  235. forward.Name = fmt.Sprintf("%s-%s-%d", serverInfo.Server.Name, typ, forward.RemotePort)
  236. forward.DomainId = userCred.GetProjectDomainId()
  237. forward.ProjectId = userCred.GetProjectId()
  238. if err := man.TableSpec().Insert(ctx, forward); err != nil {
  239. return nil, httperrors.NewServerError("database insertion error: %v", err)
  240. }
  241. return db.GetItemDetails(man, forward, ctx, userCred)
  242. }
  243. func (man *SForwardManager) ValidateCreateData(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data *jsonutils.JSONDict) (*jsonutils.JSONDict, error) {
  244. endpointV := validators.NewModelIdOrNameValidator("proxy_endpoint", ProxyEndpointManager.Keyword(), ownerId)
  245. agentV := validators.NewModelIdOrNameValidator("proxy_agent", ProxyAgentManager.Keyword(), ownerId)
  246. typeV := validators.NewStringChoicesValidator("type", cloudproxy_api.FORWARD_TYPES)
  247. portReqV := validators.NewRangeValidator("bind_port_req", cloudproxy_api.BindPortMin, cloudproxy_api.BindPortMax)
  248. for _, v := range []validators.IValidator{
  249. endpointV,
  250. agentV.Optional(true),
  251. typeV,
  252. validators.NewIPv4AddrValidator("remote_addr"),
  253. validators.NewPortValidator("remote_port"),
  254. portReqV.Optional(true),
  255. validators.NewNonNegativeValidator("last_seen_timeout").Optional(true),
  256. } {
  257. if err := v.Validate(ctx, data); err != nil {
  258. return nil, err
  259. }
  260. }
  261. typ := typeV.Value
  262. epId := endpointV.Model.GetId()
  263. var agentId string
  264. if agentV.Model != nil {
  265. agentId = agentV.Model.GetId()
  266. }
  267. var err error
  268. if data.Contains("bind_port_req") {
  269. portReq := int(portReqV.Value)
  270. data, err = man.validatePortReq(ctx, typ, portReq, agentId, epId, data)
  271. } else {
  272. data, err = man.validatePortReq(ctx, typ, -1, agentId, epId, data)
  273. }
  274. return data, err
  275. }
  276. func (fwd *SForward) ValidateUpdateData(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data *jsonutils.JSONDict) (*jsonutils.JSONDict, error) {
  277. endpointV := validators.NewModelIdOrNameValidator("proxy_endpoint", ProxyEndpointManager.Keyword(), userCred)
  278. agentV := validators.NewModelIdOrNameValidator("proxy_agent", ProxyAgentManager.Keyword(), userCred)
  279. portReqV := validators.NewRangeValidator("bind_port_req", cloudproxy_api.BindPortMin, cloudproxy_api.BindPortMax)
  280. for _, v := range []validators.IValidator{
  281. endpointV,
  282. agentV.Optional(true),
  283. validators.NewIPv4AddrValidator("remote_addr"),
  284. validators.NewPortValidator("remote_port"),
  285. portReqV,
  286. validators.NewNonNegativeValidator("last_seen_timeout"),
  287. } {
  288. v.Optional(true)
  289. if err := v.Validate(ctx, data); err != nil {
  290. return nil, err
  291. }
  292. }
  293. portReq := int(portReqV.Value)
  294. if portReq != fwd.BindPortReq {
  295. var err error
  296. var agentId string
  297. if agentV.Model == nil {
  298. agentId = fwd.ProxyAgentId
  299. } else {
  300. agentId = agentV.Model.GetId()
  301. }
  302. switch typ := fwd.Type; typ {
  303. case cloudproxy_api.FORWARD_TYPE_LOCAL:
  304. data, err = ForwardManager.validateLocalSetPort(ctx, data, agentId, portReq)
  305. case cloudproxy_api.FORWARD_TYPE_REMOTE:
  306. data, err = ForwardManager.validateRemoteSetPort(ctx, data, agentId, portReq)
  307. }
  308. if err != nil {
  309. return nil, err
  310. }
  311. }
  312. return data, nil
  313. }
  314. func (man *SForwardManager) ListItemFilter(
  315. ctx context.Context,
  316. q *sqlchemy.SQuery,
  317. userCred mcclient.TokenCredential,
  318. input cloudproxy_api.ForwardListInput,
  319. ) (*sqlchemy.SQuery, error) {
  320. filters := [][2]string{
  321. [2]string{"type", input.Type},
  322. [2]string{"remote_addr", input.RemoteAddr},
  323. [2]string{"proxy_endpoint_id", input.ProxyEndpointId},
  324. [2]string{"proxy_agent_id", input.ProxyAgentId},
  325. [2]string{"opaque", input.Opaque},
  326. }
  327. for _, filter := range filters {
  328. if v := filter[1]; v != "" {
  329. q = q.Equals(filter[0], v)
  330. }
  331. }
  332. intFilters := []struct {
  333. name string
  334. val *int
  335. }{
  336. {"remote_port", input.RemotePort},
  337. {"bind_port_req", input.BindPortReq},
  338. }
  339. for _, filter := range intFilters {
  340. if v := filter.val; v != nil {
  341. q = q.Equals(filter.name, *v)
  342. }
  343. }
  344. return q, nil
  345. }
  346. func (man *SForwardManager) FetchCustomizeColumns(
  347. ctx context.Context,
  348. userCred mcclient.TokenCredential,
  349. query jsonutils.JSONObject,
  350. objs []interface{},
  351. fields stringutils2.SSortedStrings,
  352. isList bool,
  353. ) []*jsonutils.JSONDict {
  354. fwds := gotypes.ConvertSliceElemType(objs, (**SForward)(nil)).([]*SForward)
  355. paMap := map[string]*SProxyAgent{}
  356. peMap := map[string]*SProxyEndpoint{}
  357. {
  358. var paIds []string
  359. var peIds []string
  360. {
  361. paIdMap := map[string]string{}
  362. peIdMap := map[string]string{}
  363. for _, fwd := range fwds {
  364. paIdMap[fwd.ProxyAgentId] = ""
  365. peIdMap[fwd.ProxyEndpointId] = ""
  366. }
  367. for id := range paIdMap {
  368. if id != "" {
  369. paIds = append(paIds, id)
  370. }
  371. }
  372. for id := range peIdMap {
  373. if id != "" {
  374. peIds = append(peIds, id)
  375. }
  376. }
  377. }
  378. var pas []SProxyAgent
  379. var pes []SProxyEndpoint
  380. {
  381. paQ := ProxyAgentManager.Query().In("id", paIds)
  382. if err := db.FetchModelObjects(ProxyAgentManager, paQ, &pas); err != nil {
  383. return nil
  384. }
  385. peQ := ProxyEndpointManager.Query().In("id", peIds)
  386. if err := db.FetchModelObjects(ProxyEndpointManager, peQ, &pes); err != nil {
  387. return nil
  388. }
  389. }
  390. for i := range pas {
  391. pa := &pas[i]
  392. paMap[pa.Id] = pa
  393. }
  394. for i := range pes {
  395. pe := &pes[i]
  396. peMap[pe.Id] = pe
  397. }
  398. }
  399. r := make([]*jsonutils.JSONDict, len(objs))
  400. for i, fwd := range fwds {
  401. d := jsonutils.NewDict()
  402. pa, paOK := paMap[fwd.ProxyAgentId]
  403. pe, peOK := peMap[fwd.ProxyEndpointId]
  404. if paOK || peOK {
  405. if paOK {
  406. d.Set("proxy_agent", jsonutils.NewString(pa.Name))
  407. }
  408. if peOK {
  409. d.Set("proxy_endpoint", jsonutils.NewString(pe.Name))
  410. }
  411. switch fwd.Type {
  412. case cloudproxy_api.FORWARD_TYPE_LOCAL:
  413. if paOK {
  414. d.Set("bind_addr", jsonutils.NewString(pa.AdvertiseAddr))
  415. }
  416. case cloudproxy_api.FORWARD_TYPE_REMOTE:
  417. if peOK {
  418. d.Set("bind_addr", jsonutils.NewString(pe.IntranetIpAddr))
  419. }
  420. }
  421. r[i] = d
  422. }
  423. }
  424. return r
  425. }
  426. func (fwd *SForward) PerformHeartbeat(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input *cloudproxy_api.ForwardHeartbeatInput) (jsonutils.JSONObject, error) {
  427. if _, err := db.Update(fwd, func() error {
  428. fwd.LastSeen = time.Now()
  429. return nil
  430. }); err != nil {
  431. return nil, err
  432. }
  433. return nil, nil
  434. }
  435. func (fwd *SForward) GetDetailsLastseen(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject) (jsonutils.JSONObject, error) {
  436. ret := jsonutils.NewDict()
  437. var lastSeen string
  438. if fwd.LastSeen.IsZero() {
  439. lastSeen = ""
  440. } else {
  441. lastSeen = fwd.LastSeen.Format("2006-01-02T15:04:05.000000Z")
  442. }
  443. ret.Add(jsonutils.NewString(lastSeen), "last_seen")
  444. return ret, nil
  445. }