request-server.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. package sftp
  2. import (
  3. "context"
  4. "errors"
  5. "io"
  6. "path"
  7. "path/filepath"
  8. "strconv"
  9. "sync"
  10. )
  11. var maxTxPacket uint32 = 1 << 15
  12. // Handlers contains the 4 SFTP server request handlers.
  13. type Handlers struct {
  14. FileGet FileReader
  15. FilePut FileWriter
  16. FileCmd FileCmder
  17. FileList FileLister
  18. }
  19. // RequestServer abstracts the sftp protocol with an http request-like protocol
  20. type RequestServer struct {
  21. Handlers Handlers
  22. *serverConn
  23. pktMgr *packetManager
  24. startDirectory string
  25. mu sync.RWMutex
  26. handleCount int
  27. openRequests map[string]*Request
  28. }
  29. // A RequestServerOption is a function which applies configuration to a RequestServer.
  30. type RequestServerOption func(*RequestServer)
  31. // WithRSAllocator enable the allocator.
  32. // After processing a packet we keep in memory the allocated slices
  33. // and we reuse them for new packets.
  34. // The allocator is experimental
  35. func WithRSAllocator() RequestServerOption {
  36. return func(rs *RequestServer) {
  37. alloc := newAllocator()
  38. rs.pktMgr.alloc = alloc
  39. rs.conn.alloc = alloc
  40. }
  41. }
  42. // WithStartDirectory sets a start directory to use as base for relative paths.
  43. // If unset the default is "/"
  44. func WithStartDirectory(startDirectory string) RequestServerOption {
  45. return func(rs *RequestServer) {
  46. rs.startDirectory = cleanPath(startDirectory)
  47. }
  48. }
  49. // NewRequestServer creates/allocates/returns new RequestServer.
  50. // Normally there will be one server per user-session.
  51. func NewRequestServer(rwc io.ReadWriteCloser, h Handlers, options ...RequestServerOption) *RequestServer {
  52. svrConn := &serverConn{
  53. conn: conn{
  54. Reader: rwc,
  55. WriteCloser: rwc,
  56. },
  57. }
  58. rs := &RequestServer{
  59. Handlers: h,
  60. serverConn: svrConn,
  61. pktMgr: newPktMgr(svrConn),
  62. startDirectory: "/",
  63. openRequests: make(map[string]*Request),
  64. }
  65. for _, o := range options {
  66. o(rs)
  67. }
  68. return rs
  69. }
  70. // New Open packet/Request
  71. func (rs *RequestServer) nextRequest(r *Request) string {
  72. rs.mu.Lock()
  73. defer rs.mu.Unlock()
  74. rs.handleCount++
  75. r.handle = strconv.Itoa(rs.handleCount)
  76. rs.openRequests[r.handle] = r
  77. return r.handle
  78. }
  79. // Returns Request from openRequests, bool is false if it is missing.
  80. //
  81. // The Requests in openRequests work essentially as open file descriptors that
  82. // you can do different things with. What you are doing with it are denoted by
  83. // the first packet of that type (read/write/etc).
  84. func (rs *RequestServer) getRequest(handle string) (*Request, bool) {
  85. rs.mu.RLock()
  86. defer rs.mu.RUnlock()
  87. r, ok := rs.openRequests[handle]
  88. return r, ok
  89. }
  90. // Close the Request and clear from openRequests map
  91. func (rs *RequestServer) closeRequest(handle string) error {
  92. rs.mu.Lock()
  93. defer rs.mu.Unlock()
  94. if r, ok := rs.openRequests[handle]; ok {
  95. delete(rs.openRequests, handle)
  96. return r.close()
  97. }
  98. return EBADF
  99. }
  100. // Close the read/write/closer to trigger exiting the main server loop
  101. func (rs *RequestServer) Close() error { return rs.conn.Close() }
  102. func (rs *RequestServer) serveLoop(pktChan chan<- orderedRequest) error {
  103. defer close(pktChan) // shuts down sftpServerWorkers
  104. var err error
  105. var pkt requestPacket
  106. var pktType uint8
  107. var pktBytes []byte
  108. for {
  109. pktType, pktBytes, err = rs.serverConn.recvPacket(rs.pktMgr.getNextOrderID())
  110. if err != nil {
  111. // we don't care about releasing allocated pages here, the server will quit and the allocator freed
  112. return err
  113. }
  114. pkt, err = makePacket(rxPacket{fxp(pktType), pktBytes})
  115. if err != nil {
  116. switch {
  117. case errors.Is(err, errUnknownExtendedPacket):
  118. // do nothing
  119. default:
  120. debug("makePacket err: %v", err)
  121. rs.conn.Close() // shuts down recvPacket
  122. return err
  123. }
  124. }
  125. pktChan <- rs.pktMgr.newOrderedRequest(pkt)
  126. }
  127. }
  128. // Serve requests for user session
  129. func (rs *RequestServer) Serve() error {
  130. defer func() {
  131. if rs.pktMgr.alloc != nil {
  132. rs.pktMgr.alloc.Free()
  133. }
  134. }()
  135. ctx, cancel := context.WithCancel(context.Background())
  136. defer cancel()
  137. var wg sync.WaitGroup
  138. runWorker := func(ch chan orderedRequest) {
  139. wg.Add(1)
  140. go func() {
  141. defer wg.Done()
  142. if err := rs.packetWorker(ctx, ch); err != nil {
  143. rs.conn.Close() // shuts down recvPacket
  144. }
  145. }()
  146. }
  147. pktChan := rs.pktMgr.workerChan(runWorker)
  148. err := rs.serveLoop(pktChan)
  149. wg.Wait() // wait for all workers to exit
  150. rs.mu.Lock()
  151. defer rs.mu.Unlock()
  152. // make sure all open requests are properly closed
  153. // (eg. possible on dropped connections, client crashes, etc.)
  154. for handle, req := range rs.openRequests {
  155. if err == io.EOF {
  156. err = io.ErrUnexpectedEOF
  157. }
  158. req.transferError(err)
  159. delete(rs.openRequests, handle)
  160. req.close()
  161. }
  162. return err
  163. }
  164. func (rs *RequestServer) packetWorker(ctx context.Context, pktChan chan orderedRequest) error {
  165. for pkt := range pktChan {
  166. orderID := pkt.orderID()
  167. if epkt, ok := pkt.requestPacket.(*sshFxpExtendedPacket); ok {
  168. if epkt.SpecificPacket != nil {
  169. pkt.requestPacket = epkt.SpecificPacket
  170. }
  171. }
  172. var rpkt responsePacket
  173. switch pkt := pkt.requestPacket.(type) {
  174. case *sshFxInitPacket:
  175. rpkt = &sshFxVersionPacket{Version: sftpProtocolVersion, Extensions: sftpExtensions}
  176. case *sshFxpClosePacket:
  177. handle := pkt.getHandle()
  178. rpkt = statusFromError(pkt.ID, rs.closeRequest(handle))
  179. case *sshFxpRealpathPacket:
  180. var realPath string
  181. var err error
  182. switch pather := rs.Handlers.FileList.(type) {
  183. case RealPathFileLister:
  184. realPath, err = pather.RealPath(pkt.getPath())
  185. case legacyRealPathFileLister:
  186. realPath = pather.RealPath(pkt.getPath())
  187. default:
  188. realPath = cleanPathWithBase(rs.startDirectory, pkt.getPath())
  189. }
  190. if err != nil {
  191. rpkt = statusFromError(pkt.ID, err)
  192. } else {
  193. rpkt = cleanPacketPath(pkt, realPath)
  194. }
  195. case *sshFxpOpendirPacket:
  196. request := requestFromPacket(ctx, pkt, rs.startDirectory)
  197. handle := rs.nextRequest(request)
  198. rpkt = request.opendir(rs.Handlers, pkt)
  199. if _, ok := rpkt.(*sshFxpHandlePacket); !ok {
  200. // if we return an error we have to remove the handle from the active ones
  201. rs.closeRequest(handle)
  202. }
  203. case *sshFxpOpenPacket:
  204. request := requestFromPacket(ctx, pkt, rs.startDirectory)
  205. handle := rs.nextRequest(request)
  206. rpkt = request.open(rs.Handlers, pkt)
  207. if _, ok := rpkt.(*sshFxpHandlePacket); !ok {
  208. // if we return an error we have to remove the handle from the active ones
  209. rs.closeRequest(handle)
  210. }
  211. case *sshFxpFstatPacket:
  212. handle := pkt.getHandle()
  213. request, ok := rs.getRequest(handle)
  214. if !ok {
  215. rpkt = statusFromError(pkt.ID, EBADF)
  216. } else {
  217. request = &Request{
  218. Method: "Stat",
  219. Filepath: cleanPathWithBase(rs.startDirectory, request.Filepath),
  220. }
  221. rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
  222. }
  223. case *sshFxpFsetstatPacket:
  224. handle := pkt.getHandle()
  225. request, ok := rs.getRequest(handle)
  226. if !ok {
  227. rpkt = statusFromError(pkt.ID, EBADF)
  228. } else {
  229. request = &Request{
  230. Method: "Setstat",
  231. Filepath: cleanPathWithBase(rs.startDirectory, request.Filepath),
  232. }
  233. rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
  234. }
  235. case *sshFxpExtendedPacketPosixRename:
  236. request := &Request{
  237. Method: "PosixRename",
  238. Filepath: cleanPathWithBase(rs.startDirectory, pkt.Oldpath),
  239. Target: cleanPathWithBase(rs.startDirectory, pkt.Newpath),
  240. }
  241. rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
  242. case *sshFxpExtendedPacketStatVFS:
  243. request := &Request{
  244. Method: "StatVFS",
  245. Filepath: cleanPathWithBase(rs.startDirectory, pkt.Path),
  246. }
  247. rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
  248. case hasHandle:
  249. handle := pkt.getHandle()
  250. request, ok := rs.getRequest(handle)
  251. if !ok {
  252. rpkt = statusFromError(pkt.id(), EBADF)
  253. } else {
  254. rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
  255. }
  256. case hasPath:
  257. request := requestFromPacket(ctx, pkt, rs.startDirectory)
  258. rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
  259. request.close()
  260. default:
  261. rpkt = statusFromError(pkt.id(), ErrSSHFxOpUnsupported)
  262. }
  263. rs.pktMgr.readyPacket(
  264. rs.pktMgr.newOrderedResponse(rpkt, orderID))
  265. }
  266. return nil
  267. }
  268. // clean and return name packet for file
  269. func cleanPacketPath(pkt *sshFxpRealpathPacket, realPath string) responsePacket {
  270. return &sshFxpNamePacket{
  271. ID: pkt.id(),
  272. NameAttrs: []*sshFxpNameAttr{
  273. {
  274. Name: realPath,
  275. LongName: realPath,
  276. Attrs: emptyFileStat,
  277. },
  278. },
  279. }
  280. }
  281. // Makes sure we have a clean POSIX (/) absolute path to work with
  282. func cleanPath(p string) string {
  283. return cleanPathWithBase("/", p)
  284. }
  285. func cleanPathWithBase(base, p string) string {
  286. p = filepath.ToSlash(filepath.Clean(p))
  287. if !path.IsAbs(p) {
  288. return path.Join(base, p)
  289. }
  290. return p
  291. }