Source file src/internal/poll/fd_mutex.go

     1  // Copyright 2013 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package poll
     6  
     7  import "sync/atomic"
     8  
// fdMutex is a specialized synchronization primitive that manages
// lifetime of an fd and serializes access to Read, Write and Close
// methods on FD.
type fdMutex struct {
	// state packs the closed flag, the read and write lock bits, the
	// reference count and both waiter counts into a single word (see
	// the mutex* constants). The methods in this file read it with
	// atomic.LoadUint64 and update it with atomic.CompareAndSwapUint64.
	state uint64
	// rsema is the semaphore that goroutines waiting for the read
	// lock block on.
	rsema uint32
	// wsema is the semaphore that goroutines waiting for the write
	// lock block on.
	wsema uint32
}
    17  
// fdMutex.state is organized as follows:
// 1 bit - whether FD is closed, if set all subsequent lock operations will fail.
// 1 bit - lock for read operations.
// 1 bit - lock for write operations.
// 20 bits - total number of references (read+write+misc).
// 20 bits - number of outstanding read waiters.
// 20 bits - number of outstanding write waiters.
//
// For each counter, the *Ref/*Wait constant is the value of one unit
// (the counter's lowest bit) and the *Mask constant selects all 20
// bits of that counter.
const (
	// Bit 0: the closed flag.
	mutexClosed  = 1 << 0
	// Bit 1: the read lock is held.
	mutexRLock   = 1 << 1
	// Bit 2: the write lock is held.
	mutexWLock   = 1 << 2
	// Bits 3-22: reference count; mutexRef is one reference.
	mutexRef     = 1 << 3
	mutexRefMask = (1<<20 - 1) << 3
	// Bits 23-42: read waiter count; mutexRWait is one waiter.
	mutexRWait   = 1 << 23
	mutexRMask   = (1<<20 - 1) << 23
	// Bits 43-62: write waiter count; mutexWWait is one waiter.
	mutexWWait   = 1 << 43
	mutexWMask   = (1<<20 - 1) << 43
)
    36  
// overflowMsg is the panic value used when adding one more reference
// or waiter would wrap a 20-bit counter in fdMutex.state back to zero.
// The quoted maximum, 1048575, is 1<<20 - 1.
const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)"
    38  
    39  // Read operations must do rwlock(true)/rwunlock(true).
    40  //
    41  // Write operations must do rwlock(false)/rwunlock(false).
    42  //
    43  // Misc operations must do incref/decref.
    44  // Misc operations include functions like setsockopt and setDeadline.
    45  // They need to use incref/decref to ensure that they operate on the
    46  // correct fd in presence of a concurrent close call (otherwise fd can
    47  // be closed under their feet).
    48  //
    49  // Close operations must do increfAndClose/decref.
    50  
    51  // incref adds a reference to mu.
    52  // It reports whether mu is available for reading or writing.
    53  func (mu *fdMutex) incref() bool {
    54  	for {
    55  		old := atomic.LoadUint64(&mu.state)
    56  		if old&mutexClosed != 0 {
    57  			return false
    58  		}
    59  		new := old + mutexRef
    60  		if new&mutexRefMask == 0 {
    61  			panic(overflowMsg)
    62  		}
    63  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
    64  			return true
    65  		}
    66  	}
    67  }
    68  
    69  // increfAndClose sets the state of mu to closed.
    70  // It returns false if the file was already closed.
    71  func (mu *fdMutex) increfAndClose() bool {
    72  	for {
    73  		old := atomic.LoadUint64(&mu.state)
    74  		if old&mutexClosed != 0 {
    75  			return false
    76  		}
    77  		// Mark as closed and acquire a reference.
    78  		new := (old | mutexClosed) + mutexRef
    79  		if new&mutexRefMask == 0 {
    80  			panic(overflowMsg)
    81  		}
    82  		// Remove all read and write waiters.
    83  		new &^= mutexRMask | mutexWMask
    84  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
    85  			// Wake all read and write waiters,
    86  			// they will observe closed flag after wakeup.
    87  			for old&mutexRMask != 0 {
    88  				old -= mutexRWait
    89  				runtime_Semrelease(&mu.rsema)
    90  			}
    91  			for old&mutexWMask != 0 {
    92  				old -= mutexWWait
    93  				runtime_Semrelease(&mu.wsema)
    94  			}
    95  			return true
    96  		}
    97  	}
    98  }
    99  
   100  // decref removes a reference from mu.
   101  // It reports whether there is no remaining reference.
   102  func (mu *fdMutex) decref() bool {
   103  	for {
   104  		old := atomic.LoadUint64(&mu.state)
   105  		if old&mutexRefMask == 0 {
   106  			panic("inconsistent poll.fdMutex")
   107  		}
   108  		new := old - mutexRef
   109  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   110  			return new&(mutexClosed|mutexRefMask) == mutexClosed
   111  		}
   112  	}
   113  }
   114  
   115  // lock adds a reference to mu and locks mu.
   116  // It reports whether mu is available for reading or writing.
   117  func (mu *fdMutex) rwlock(read bool) bool {
   118  	var mutexBit, mutexWait, mutexMask uint64
   119  	var mutexSema *uint32
   120  	if read {
   121  		mutexBit = mutexRLock
   122  		mutexWait = mutexRWait
   123  		mutexMask = mutexRMask
   124  		mutexSema = &mu.rsema
   125  	} else {
   126  		mutexBit = mutexWLock
   127  		mutexWait = mutexWWait
   128  		mutexMask = mutexWMask
   129  		mutexSema = &mu.wsema
   130  	}
   131  	for {
   132  		old := atomic.LoadUint64(&mu.state)
   133  		if old&mutexClosed != 0 {
   134  			return false
   135  		}
   136  		var new uint64
   137  		if old&mutexBit == 0 {
   138  			// Lock is free, acquire it.
   139  			new = (old | mutexBit) + mutexRef
   140  			if new&mutexRefMask == 0 {
   141  				panic(overflowMsg)
   142  			}
   143  		} else {
   144  			// Wait for lock.
   145  			new = old + mutexWait
   146  			if new&mutexMask == 0 {
   147  				panic(overflowMsg)
   148  			}
   149  		}
   150  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   151  			if old&mutexBit == 0 {
   152  				return true
   153  			}
   154  			runtime_Semacquire(mutexSema)
   155  			// The signaller has subtracted mutexWait.
   156  		}
   157  	}
   158  }
   159  
   160  // unlock removes a reference from mu and unlocks mu.
   161  // It reports whether there is no remaining reference.
   162  func (mu *fdMutex) rwunlock(read bool) bool {
   163  	var mutexBit, mutexWait, mutexMask uint64
   164  	var mutexSema *uint32
   165  	if read {
   166  		mutexBit = mutexRLock
   167  		mutexWait = mutexRWait
   168  		mutexMask = mutexRMask
   169  		mutexSema = &mu.rsema
   170  	} else {
   171  		mutexBit = mutexWLock
   172  		mutexWait = mutexWWait
   173  		mutexMask = mutexWMask
   174  		mutexSema = &mu.wsema
   175  	}
   176  	for {
   177  		old := atomic.LoadUint64(&mu.state)
   178  		if old&mutexBit == 0 || old&mutexRefMask == 0 {
   179  			panic("inconsistent poll.fdMutex")
   180  		}
   181  		// Drop lock, drop reference and wake read waiter if present.
   182  		new := (old &^ mutexBit) - mutexRef
   183  		if old&mutexMask != 0 {
   184  			new -= mutexWait
   185  		}
   186  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   187  			if old&mutexMask != 0 {
   188  				runtime_Semrelease(mutexSema)
   189  			}
   190  			return new&(mutexClosed|mutexRefMask) == mutexClosed
   191  		}
   192  	}
   193  }
   194  
// Implemented in runtime package.
//
// These declarations have no bodies in this file. rwlock calls
// runtime_Semacquire to wait after registering itself as a waiter;
// rwunlock and increfAndClose call runtime_Semrelease once for each
// waiter they remove from the state word.
func runtime_Semacquire(sema *uint32)
func runtime_Semrelease(sema *uint32)
   198  
   199  // incref adds a reference to fd.
   200  // It returns an error when fd cannot be used.
   201  func (fd *FD) incref() error {
   202  	if !fd.fdmu.incref() {
   203  		return errClosing(fd.isFile)
   204  	}
   205  	return nil
   206  }
   207  
   208  // decref removes a reference from fd.
   209  // It also closes fd when the state of fd is set to closed and there
   210  // is no remaining reference.
   211  func (fd *FD) decref() error {
   212  	if fd.fdmu.decref() {
   213  		return fd.destroy()
   214  	}
   215  	return nil
   216  }
   217  
   218  // readLock adds a reference to fd and locks fd for reading.
   219  // It returns an error when fd cannot be used for reading.
   220  func (fd *FD) readLock() error {
   221  	if !fd.fdmu.rwlock(true) {
   222  		return errClosing(fd.isFile)
   223  	}
   224  	return nil
   225  }
   226  
   227  // readUnlock removes a reference from fd and unlocks fd for reading.
   228  // It also closes fd when the state of fd is set to closed and there
   229  // is no remaining reference.
   230  func (fd *FD) readUnlock() {
   231  	if fd.fdmu.rwunlock(true) {
   232  		fd.destroy()
   233  	}
   234  }
   235  
   236  // writeLock adds a reference to fd and locks fd for writing.
   237  // It returns an error when fd cannot be used for writing.
   238  func (fd *FD) writeLock() error {
   239  	if !fd.fdmu.rwlock(false) {
   240  		return errClosing(fd.isFile)
   241  	}
   242  	return nil
   243  }
   244  
   245  // writeUnlock removes a reference from fd and unlocks fd for writing.
   246  // It also closes fd when the state of fd is set to closed and there
   247  // is no remaining reference.
   248  func (fd *FD) writeUnlock() {
   249  	if fd.fdmu.rwunlock(false) {
   250  		fd.destroy()
   251  	}
   252  }
   253  

View as plain text