allocator.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. /*
  2. * Copyright 2020 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package z
  17. import (
  18. "bytes"
  19. "fmt"
  20. "math"
  21. "math/bits"
  22. "math/rand"
  23. "strings"
  24. "sync"
  25. "sync/atomic"
  26. "time"
  27. "unsafe"
  28. "github.com/dustin/go-humanize"
  29. )
// Allocator amortizes the cost of small allocations by allocating memory in
// bigger chunks. Internally it uses z.Calloc to allocate memory. Once
// allocated, the memory is not moved, so it is safe to use the allocated bytes
// to unsafe cast them to Go struct pointers. Maintaining a freelist is slow.
// Instead, Allocator only allocates memory, with the idea that finally we
// would just release the entire Allocator.
type Allocator struct {
	sync.Mutex         // guards buffer growth (see Allocate / addBufferAt)
	compIdx uint64     // Stores bufIdx in 32 MSBs and posIdx in 32 LSBs.
	buffers [][]byte   // each buffer is generally double the size of the previous one
	Ref     uint64     // unique id of this allocator; key into the global allocs registry
	Tag     string     // human-readable label, reported by Allocators()
}
// allocs keeps references to all Allocators, so we can safely discard them later.
var allocsMu *sync.Mutex            // guards allocs; allocRef is advanced atomically
var allocRef uint64                 // counter from which each Allocator's Ref is drawn
var allocs map[uint64]*Allocator    // registry of live allocators, keyed by Ref
var calculatedLog2 []int            // lookup table: calculatedLog2[i] == floor(log2(i)) for i in 1..1024
  48. func init() {
  49. allocsMu = new(sync.Mutex)
  50. allocs = make(map[uint64]*Allocator)
  51. // Set up a unique Ref per process.
  52. rand.Seed(time.Now().UnixNano())
  53. allocRef = uint64(rand.Int63n(1<<16)) << 48
  54. calculatedLog2 = make([]int, 1025)
  55. for i := 1; i <= 1024; i++ {
  56. calculatedLog2[i] = int(math.Log2(float64(i)))
  57. }
  58. }
  59. // NewAllocator creates an allocator starting with the given size.
  60. func NewAllocator(sz int, tag string) *Allocator {
  61. ref := atomic.AddUint64(&allocRef, 1)
  62. // We should not allow a zero sized page because addBufferWithMinSize
  63. // will run into an infinite loop trying to double the pagesize.
  64. if sz < 512 {
  65. sz = 512
  66. }
  67. a := &Allocator{
  68. Ref: ref,
  69. buffers: make([][]byte, 64),
  70. Tag: tag,
  71. }
  72. l2 := uint64(log2(sz))
  73. if bits.OnesCount64(uint64(sz)) > 1 {
  74. l2 += 1
  75. }
  76. a.buffers[0] = Calloc(1<<l2, a.Tag)
  77. allocsMu.Lock()
  78. allocs[ref] = a
  79. allocsMu.Unlock()
  80. return a
  81. }
// Reset rewinds the allocator to the start of its first buffer without
// freeing any memory, so the buffers can be reused for new allocations.
func (a *Allocator) Reset() {
	atomic.StoreUint64(&a.compIdx, 0)
}
  85. func Allocators() string {
  86. allocsMu.Lock()
  87. tags := make(map[string]uint64)
  88. num := make(map[string]int)
  89. for _, ac := range allocs {
  90. tags[ac.Tag] += ac.Allocated()
  91. num[ac.Tag] += 1
  92. }
  93. var buf bytes.Buffer
  94. for tag, sz := range tags {
  95. fmt.Fprintf(&buf, "Tag: %s Num: %d Size: %s . ", tag, num[tag], humanize.IBytes(sz))
  96. }
  97. allocsMu.Unlock()
  98. return buf.String()
  99. }
  100. func (a *Allocator) String() string {
  101. var s strings.Builder
  102. s.WriteString(fmt.Sprintf("Allocator: %x\n", a.Ref))
  103. var cum int
  104. for i, b := range a.buffers {
  105. cum += len(b)
  106. if len(b) == 0 {
  107. break
  108. }
  109. s.WriteString(fmt.Sprintf("idx: %d len: %d cum: %d\n", i, len(b), cum))
  110. }
  111. pos := atomic.LoadUint64(&a.compIdx)
  112. bi, pi := parse(pos)
  113. s.WriteString(fmt.Sprintf("bi: %d pi: %d\n", bi, pi))
  114. s.WriteString(fmt.Sprintf("Size: %d\n", a.Size()))
  115. return s.String()
  116. }
  117. // AllocatorFrom would return the allocator corresponding to the ref.
  118. func AllocatorFrom(ref uint64) *Allocator {
  119. allocsMu.Lock()
  120. a := allocs[ref]
  121. allocsMu.Unlock()
  122. return a
  123. }
  124. func parse(pos uint64) (bufIdx, posIdx int) {
  125. return int(pos >> 32), int(pos & 0xFFFFFFFF)
  126. }
  127. // Size returns the size of the allocations so far.
  128. func (a *Allocator) Size() int {
  129. pos := atomic.LoadUint64(&a.compIdx)
  130. bi, pi := parse(pos)
  131. var sz int
  132. for i, b := range a.buffers {
  133. if i < bi {
  134. sz += len(b)
  135. continue
  136. }
  137. sz += pi
  138. return sz
  139. }
  140. panic("Size should not reach here")
  141. }
  142. func log2(sz int) int {
  143. if sz < len(calculatedLog2) {
  144. return calculatedLog2[sz]
  145. }
  146. pow := 10
  147. sz >>= 10
  148. for sz > 1 {
  149. sz >>= 1
  150. pow++
  151. }
  152. return pow
  153. }
  154. func (a *Allocator) Allocated() uint64 {
  155. var alloc int
  156. for _, b := range a.buffers {
  157. alloc += cap(b)
  158. }
  159. return uint64(alloc)
  160. }
  161. func (a *Allocator) TrimTo(max int) {
  162. var alloc int
  163. for i, b := range a.buffers {
  164. if len(b) == 0 {
  165. break
  166. }
  167. alloc += len(b)
  168. if alloc < max {
  169. continue
  170. }
  171. Free(b)
  172. a.buffers[i] = nil
  173. }
  174. }
  175. // Release would release the memory back. Remember to make this call to avoid memory leaks.
  176. func (a *Allocator) Release() {
  177. if a == nil {
  178. return
  179. }
  180. var alloc int
  181. for _, b := range a.buffers {
  182. if len(b) == 0 {
  183. break
  184. }
  185. alloc += len(b)
  186. Free(b)
  187. }
  188. allocsMu.Lock()
  189. delete(allocs, a.Ref)
  190. allocsMu.Unlock()
  191. }
// maxAlloc caps both a single allocation and the size of any one buffer (1 GiB).
const maxAlloc = 1 << 30

// MaxAlloc returns the largest size a single Allocate call will accept.
func (a *Allocator) MaxAlloc() int {
	return maxAlloc
}
  196. const nodeAlign = unsafe.Sizeof(uint64(0)) - 1
  197. func (a *Allocator) AllocateAligned(sz int) []byte {
  198. tsz := sz + int(nodeAlign)
  199. out := a.Allocate(tsz)
  200. // We are reusing allocators. In that case, it's important to zero out the memory allocated
  201. // here. We don't always zero it out (in Allocate), because other functions would be immediately
  202. // overwriting the allocated slices anyway (see Copy).
  203. ZeroOut(out, 0, len(out))
  204. addr := uintptr(unsafe.Pointer(&out[0]))
  205. aligned := (addr + nodeAlign) & ^nodeAlign
  206. start := int(aligned - addr)
  207. return out[start : start+sz]
  208. }
  209. func (a *Allocator) Copy(buf []byte) []byte {
  210. if a == nil {
  211. return append([]byte{}, buf...)
  212. }
  213. out := a.Allocate(len(buf))
  214. copy(out, buf)
  215. return out
  216. }
  217. func (a *Allocator) addBufferAt(bufIdx, minSz int) {
  218. for {
  219. if bufIdx >= len(a.buffers) {
  220. panic(fmt.Sprintf("Allocator can not allocate more than %d buffers", len(a.buffers)))
  221. }
  222. if len(a.buffers[bufIdx]) == 0 {
  223. break
  224. }
  225. if minSz <= len(a.buffers[bufIdx]) {
  226. // No need to do anything. We already have a buffer which can satisfy minSz.
  227. return
  228. }
  229. bufIdx++
  230. }
  231. assert(bufIdx > 0)
  232. // We need to allocate a new buffer.
  233. // Make pageSize double of the last allocation.
  234. pageSize := 2 * len(a.buffers[bufIdx-1])
  235. // Ensure pageSize is bigger than sz.
  236. for pageSize < minSz {
  237. pageSize *= 2
  238. }
  239. // If bigger than maxAlloc, trim to maxAlloc.
  240. if pageSize > maxAlloc {
  241. pageSize = maxAlloc
  242. }
  243. buf := Calloc(pageSize, a.Tag)
  244. assert(len(a.buffers[bufIdx]) == 0)
  245. a.buffers[bufIdx] = buf
  246. }
  247. func (a *Allocator) Allocate(sz int) []byte {
  248. if a == nil {
  249. return make([]byte, sz)
  250. }
  251. if sz > maxAlloc {
  252. panic(fmt.Sprintf("Unable to allocate more than %d\n", maxAlloc))
  253. }
  254. if sz == 0 {
  255. return nil
  256. }
  257. for {
  258. pos := atomic.AddUint64(&a.compIdx, uint64(sz))
  259. bufIdx, posIdx := parse(pos)
  260. buf := a.buffers[bufIdx]
  261. if posIdx > len(buf) {
  262. a.Lock()
  263. newPos := atomic.LoadUint64(&a.compIdx)
  264. newBufIdx, _ := parse(newPos)
  265. if newBufIdx != bufIdx {
  266. a.Unlock()
  267. continue
  268. }
  269. a.addBufferAt(bufIdx+1, sz)
  270. atomic.StoreUint64(&a.compIdx, uint64((bufIdx+1)<<32))
  271. a.Unlock()
  272. // We added a new buffer. Let's acquire slice the right way by going back to the top.
  273. continue
  274. }
  275. data := buf[posIdx-sz : posIdx]
  276. return data
  277. }
  278. }
// AllocatorPool keeps a bounded set of reusable Allocators and frees the ones
// that sit idle (see freeupAllocators).
type AllocatorPool struct {
	numGets int64            // count of Get calls; sampled periodically to detect idleness
	allocCh chan *Allocator  // buffered channel holding idle allocators awaiting reuse
	closer  *Closer          // signals and awaits shutdown of the background goroutine
}
  284. func NewAllocatorPool(sz int) *AllocatorPool {
  285. a := &AllocatorPool{
  286. allocCh: make(chan *Allocator, sz),
  287. closer: NewCloser(1),
  288. }
  289. go a.freeupAllocators()
  290. return a
  291. }
  292. func (p *AllocatorPool) Get(sz int, tag string) *Allocator {
  293. if p == nil {
  294. return NewAllocator(sz, tag)
  295. }
  296. atomic.AddInt64(&p.numGets, 1)
  297. select {
  298. case alloc := <-p.allocCh:
  299. alloc.Reset()
  300. alloc.Tag = tag
  301. return alloc
  302. default:
  303. return NewAllocator(sz, tag)
  304. }
  305. }
  306. func (p *AllocatorPool) Return(a *Allocator) {
  307. if a == nil {
  308. return
  309. }
  310. if p == nil {
  311. a.Release()
  312. return
  313. }
  314. a.TrimTo(400 << 20)
  315. select {
  316. case p.allocCh <- a:
  317. return
  318. default:
  319. a.Release()
  320. }
  321. }
// Release shuts the pool down: it signals the background goroutine to drain
// and free every pooled allocator, and waits for it to finish. Safe to call
// on a nil pool.
func (p *AllocatorPool) Release() {
	if p == nil {
		return
	}
	p.closer.SignalAndWait()
}
// freeupAllocators runs until the pool's closer is signalled. On every 2s
// tick during which no Get calls were observed, it releases one idle pooled
// allocator; on shutdown it drains and releases everything in the pool.
func (p *AllocatorPool) freeupAllocators() {
	defer p.closer.Done()

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	// Non-blocking: release a single allocator from the pool, if any is idle.
	releaseOne := func() bool {
		select {
		case alloc := <-p.allocCh:
			alloc.Release()
			return true
		default:
			return false
		}
	}

	var last int64
	for {
		select {
		case <-p.closer.HasBeenClosed():
			// NOTE(review): closing allocCh here can race with a concurrent
			// Return still sending on it (send on closed channel panics) —
			// confirm callers stop calling Return before pool Release.
			close(p.allocCh)
			for alloc := range p.allocCh {
				alloc.Release()
			}
			return
		case <-ticker.C:
			gets := atomic.LoadInt64(&p.numGets)
			if gets != last {
				// Some retrievals were made since the last time. So, let's avoid doing a release.
				last = gets
				continue
			}
			releaseOne()
		}
	}
}