file.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. //go:build windows
  2. // +build windows
  3. package winio
  4. import (
  5. "errors"
  6. "io"
  7. "runtime"
  8. "sync"
  9. "sync/atomic"
  10. "syscall"
  11. "time"
  12. "golang.org/x/sys/windows"
  13. )
  14. //sys cancelIoEx(file windows.Handle, o *windows.Overlapped) (err error) = CancelIoEx
  15. //sys createIoCompletionPort(file windows.Handle, port windows.Handle, key uintptr, threadCount uint32) (newport windows.Handle, err error) = CreateIoCompletionPort
  16. //sys getQueuedCompletionStatus(port windows.Handle, bytes *uint32, key *uintptr, o **ioOperation, timeout uint32) (err error) = GetQueuedCompletionStatus
  17. //sys setFileCompletionNotificationModes(h windows.Handle, flags uint8) (err error) = SetFileCompletionNotificationModes
  18. //sys wsaGetOverlappedResult(h windows.Handle, o *windows.Overlapped, bytes *uint32, wait bool, flags *uint32) (err error) = ws2_32.WSAGetOverlappedResult
  19. var (
  20. ErrFileClosed = errors.New("file has already been closed")
  21. ErrTimeout = &timeoutError{}
  22. )
  23. type timeoutError struct{}
  24. func (*timeoutError) Error() string { return "i/o timeout" }
  25. func (*timeoutError) Timeout() bool { return true }
  26. func (*timeoutError) Temporary() bool { return true }
  27. type timeoutChan chan struct{}
  28. var ioInitOnce sync.Once
  29. var ioCompletionPort windows.Handle
  30. // ioResult contains the result of an asynchronous IO operation.
  31. type ioResult struct {
  32. bytes uint32
  33. err error
  34. }
  35. // ioOperation represents an outstanding asynchronous Win32 IO.
  36. type ioOperation struct {
  37. o windows.Overlapped
  38. ch chan ioResult
  39. }
  40. func initIO() {
  41. h, err := createIoCompletionPort(windows.InvalidHandle, 0, 0, 0xffffffff)
  42. if err != nil {
  43. panic(err)
  44. }
  45. ioCompletionPort = h
  46. go ioCompletionProcessor(h)
  47. }
  48. // win32File implements Reader, Writer, and Closer on a Win32 handle without blocking in a syscall.
  49. // It takes ownership of this handle and will close it if it is garbage collected.
  50. type win32File struct {
  51. handle windows.Handle
  52. wg sync.WaitGroup
  53. wgLock sync.RWMutex
  54. closing atomic.Bool
  55. socket bool
  56. readDeadline deadlineHandler
  57. writeDeadline deadlineHandler
  58. }
  59. type deadlineHandler struct {
  60. setLock sync.Mutex
  61. channel timeoutChan
  62. channelLock sync.RWMutex
  63. timer *time.Timer
  64. timedout atomic.Bool
  65. }
  66. // makeWin32File makes a new win32File from an existing file handle.
  67. func makeWin32File(h windows.Handle) (*win32File, error) {
  68. f := &win32File{handle: h}
  69. ioInitOnce.Do(initIO)
  70. _, err := createIoCompletionPort(h, ioCompletionPort, 0, 0xffffffff)
  71. if err != nil {
  72. return nil, err
  73. }
  74. err = setFileCompletionNotificationModes(h, windows.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS|windows.FILE_SKIP_SET_EVENT_ON_HANDLE)
  75. if err != nil {
  76. return nil, err
  77. }
  78. f.readDeadline.channel = make(timeoutChan)
  79. f.writeDeadline.channel = make(timeoutChan)
  80. return f, nil
  81. }
  82. // Deprecated: use NewOpenFile instead.
  83. func MakeOpenFile(h syscall.Handle) (io.ReadWriteCloser, error) {
  84. return NewOpenFile(windows.Handle(h))
  85. }
  86. func NewOpenFile(h windows.Handle) (io.ReadWriteCloser, error) {
  87. // If we return the result of makeWin32File directly, it can result in an
  88. // interface-wrapped nil, rather than a nil interface value.
  89. f, err := makeWin32File(h)
  90. if err != nil {
  91. return nil, err
  92. }
  93. return f, nil
  94. }
  95. // closeHandle closes the resources associated with a Win32 handle.
  96. func (f *win32File) closeHandle() {
  97. f.wgLock.Lock()
  98. // Atomically set that we are closing, releasing the resources only once.
  99. if !f.closing.Swap(true) {
  100. f.wgLock.Unlock()
  101. // cancel all IO and wait for it to complete
  102. _ = cancelIoEx(f.handle, nil)
  103. f.wg.Wait()
  104. // at this point, no new IO can start
  105. windows.Close(f.handle)
  106. f.handle = 0
  107. } else {
  108. f.wgLock.Unlock()
  109. }
  110. }
  111. // Close closes a win32File.
  112. func (f *win32File) Close() error {
  113. f.closeHandle()
  114. return nil
  115. }
  116. // IsClosed checks if the file has been closed.
  117. func (f *win32File) IsClosed() bool {
  118. return f.closing.Load()
  119. }
  120. // prepareIO prepares for a new IO operation.
  121. // The caller must call f.wg.Done() when the IO is finished, prior to Close() returning.
  122. func (f *win32File) prepareIO() (*ioOperation, error) {
  123. f.wgLock.RLock()
  124. if f.closing.Load() {
  125. f.wgLock.RUnlock()
  126. return nil, ErrFileClosed
  127. }
  128. f.wg.Add(1)
  129. f.wgLock.RUnlock()
  130. c := &ioOperation{}
  131. c.ch = make(chan ioResult)
  132. return c, nil
  133. }
  134. // ioCompletionProcessor processes completed async IOs forever.
  135. func ioCompletionProcessor(h windows.Handle) {
  136. for {
  137. var bytes uint32
  138. var key uintptr
  139. var op *ioOperation
  140. err := getQueuedCompletionStatus(h, &bytes, &key, &op, windows.INFINITE)
  141. if op == nil {
  142. panic(err)
  143. }
  144. op.ch <- ioResult{bytes, err}
  145. }
  146. }
  147. // todo: helsaawy - create an asyncIO version that takes a context
  148. // asyncIO processes the return value from ReadFile or WriteFile, blocking until
  149. // the operation has actually completed.
  150. func (f *win32File) asyncIO(c *ioOperation, d *deadlineHandler, bytes uint32, err error) (int, error) {
  151. if err != windows.ERROR_IO_PENDING { //nolint:errorlint // err is Errno
  152. return int(bytes), err
  153. }
  154. if f.closing.Load() {
  155. _ = cancelIoEx(f.handle, &c.o)
  156. }
  157. var timeout timeoutChan
  158. if d != nil {
  159. d.channelLock.Lock()
  160. timeout = d.channel
  161. d.channelLock.Unlock()
  162. }
  163. var r ioResult
  164. select {
  165. case r = <-c.ch:
  166. err = r.err
  167. if err == windows.ERROR_OPERATION_ABORTED { //nolint:errorlint // err is Errno
  168. if f.closing.Load() {
  169. err = ErrFileClosed
  170. }
  171. } else if err != nil && f.socket {
  172. // err is from Win32. Query the overlapped structure to get the winsock error.
  173. var bytes, flags uint32
  174. err = wsaGetOverlappedResult(f.handle, &c.o, &bytes, false, &flags)
  175. }
  176. case <-timeout:
  177. _ = cancelIoEx(f.handle, &c.o)
  178. r = <-c.ch
  179. err = r.err
  180. if err == windows.ERROR_OPERATION_ABORTED { //nolint:errorlint // err is Errno
  181. err = ErrTimeout
  182. }
  183. }
  184. // runtime.KeepAlive is needed, as c is passed via native
  185. // code to ioCompletionProcessor, c must remain alive
  186. // until the channel read is complete.
  187. // todo: (de)allocate *ioOperation via win32 heap functions, instead of needing to KeepAlive?
  188. runtime.KeepAlive(c)
  189. return int(r.bytes), err
  190. }
  191. // Read reads from a file handle.
  192. func (f *win32File) Read(b []byte) (int, error) {
  193. c, err := f.prepareIO()
  194. if err != nil {
  195. return 0, err
  196. }
  197. defer f.wg.Done()
  198. if f.readDeadline.timedout.Load() {
  199. return 0, ErrTimeout
  200. }
  201. var bytes uint32
  202. err = windows.ReadFile(f.handle, b, &bytes, &c.o)
  203. n, err := f.asyncIO(c, &f.readDeadline, bytes, err)
  204. runtime.KeepAlive(b)
  205. // Handle EOF conditions.
  206. if err == nil && n == 0 && len(b) != 0 {
  207. return 0, io.EOF
  208. } else if err == windows.ERROR_BROKEN_PIPE { //nolint:errorlint // err is Errno
  209. return 0, io.EOF
  210. }
  211. return n, err
  212. }
  213. // Write writes to a file handle.
  214. func (f *win32File) Write(b []byte) (int, error) {
  215. c, err := f.prepareIO()
  216. if err != nil {
  217. return 0, err
  218. }
  219. defer f.wg.Done()
  220. if f.writeDeadline.timedout.Load() {
  221. return 0, ErrTimeout
  222. }
  223. var bytes uint32
  224. err = windows.WriteFile(f.handle, b, &bytes, &c.o)
  225. n, err := f.asyncIO(c, &f.writeDeadline, bytes, err)
  226. runtime.KeepAlive(b)
  227. return n, err
  228. }
  229. func (f *win32File) SetReadDeadline(deadline time.Time) error {
  230. return f.readDeadline.set(deadline)
  231. }
  232. func (f *win32File) SetWriteDeadline(deadline time.Time) error {
  233. return f.writeDeadline.set(deadline)
  234. }
  235. func (f *win32File) Flush() error {
  236. return windows.FlushFileBuffers(f.handle)
  237. }
  238. func (f *win32File) Fd() uintptr {
  239. return uintptr(f.handle)
  240. }
  241. func (d *deadlineHandler) set(deadline time.Time) error {
  242. d.setLock.Lock()
  243. defer d.setLock.Unlock()
  244. if d.timer != nil {
  245. if !d.timer.Stop() {
  246. <-d.channel
  247. }
  248. d.timer = nil
  249. }
  250. d.timedout.Store(false)
  251. select {
  252. case <-d.channel:
  253. d.channelLock.Lock()
  254. d.channel = make(chan struct{})
  255. d.channelLock.Unlock()
  256. default:
  257. }
  258. if deadline.IsZero() {
  259. return nil
  260. }
  261. timeoutIO := func() {
  262. d.timedout.Store(true)
  263. close(d.channel)
  264. }
  265. now := time.Now()
  266. duration := deadline.Sub(now)
  267. if deadline.After(now) {
  268. // Deadline is in the future, set a timer to wait
  269. d.timer = time.AfterFunc(duration, timeoutIO)
  270. } else {
  271. // Deadline is in the past. Cancel all pending IO now.
  272. timeoutIO()
  273. }
  274. return nil
  275. }