Source file test/typeparam/chans.go

     1  // run
     2  
     3  // Copyright 2021 The Go Authors. All rights reserved.
     4  // Use of this source code is governed by a BSD-style
     5  // license that can be found in the LICENSE file.
     6  
     7  // Package chans provides utility functions for working with channels.
     8  package main
     9  
    10  import (
    11  	"context"
    12  	"fmt"
    13  	"runtime"
    14  	"sort"
    15  	"sync"
    16  	"time"
    17  )
    18  
    19  // _Equal reports whether two slices are equal: the same length and all
    20  // elements equal. All floating point NaNs are considered equal.
    21  func _SliceEqual[Elem comparable](s1, s2 []Elem) bool {
    22  	if len(s1) != len(s2) {
    23  		return false
    24  	}
    25  	for i, v1 := range s1 {
    26  		v2 := s2[i]
    27  		if v1 != v2 {
    28  			isNaN := func(f Elem) bool { return f != f }
    29  			if !isNaN(v1) || !isNaN(v2) {
    30  				return false
    31  			}
    32  		}
    33  	}
    34  	return true
    35  }
    36  
    37  // _ReadAll reads from c until the channel is closed or the context is
    38  // canceled, returning all the values read.
    39  func _ReadAll[Elem any](ctx context.Context, c <-chan Elem) []Elem {
    40  	var r []Elem
    41  	for {
    42  		select {
    43  		case <-ctx.Done():
    44  			return r
    45  		case v, ok := <-c:
    46  			if !ok {
    47  				return r
    48  			}
    49  			r = append(r, v)
    50  		}
    51  	}
    52  }
    53  
    54  // _Merge merges two channels into a single channel.
    55  // This will leave a goroutine running until either both channels are closed
    56  // or the context is canceled, at which point the returned channel is closed.
    57  func _Merge[Elem any](ctx context.Context, c1, c2 <-chan Elem) <-chan Elem {
    58  	r := make(chan Elem)
    59  	go func(ctx context.Context, c1, c2 <-chan Elem, r chan<- Elem) {
    60  		defer close(r)
    61  		for c1 != nil || c2 != nil {
    62  			select {
    63  			case <-ctx.Done():
    64  				return
    65  			case v1, ok := <-c1:
    66  				if ok {
    67  					r <- v1
    68  				} else {
    69  					c1 = nil
    70  				}
    71  			case v2, ok := <-c2:
    72  				if ok {
    73  					r <- v2
    74  				} else {
    75  					c2 = nil
    76  				}
    77  			}
    78  		}
    79  	}(ctx, c1, c2, r)
    80  	return r
    81  }
    82  
    83  // _Filter calls f on each value read from c. If f returns true the value
    84  // is sent on the returned channel. This will leave a goroutine running
    85  // until c is closed or the context is canceled, at which point the
    86  // returned channel is closed.
    87  func _Filter[Elem any](ctx context.Context, c <-chan Elem, f func(Elem) bool) <-chan Elem {
    88  	r := make(chan Elem)
    89  	go func(ctx context.Context, c <-chan Elem, f func(Elem) bool, r chan<- Elem) {
    90  		defer close(r)
    91  		for {
    92  			select {
    93  			case <-ctx.Done():
    94  				return
    95  			case v, ok := <-c:
    96  				if !ok {
    97  					return
    98  				}
    99  				if f(v) {
   100  					r <- v
   101  				}
   102  			}
   103  		}
   104  	}(ctx, c, f, r)
   105  	return r
   106  }
   107  
   108  // _Sink returns a channel that discards all values sent to it.
   109  // This will leave a goroutine running until the context is canceled
   110  // or the returned channel is closed.
   111  func _Sink[Elem any](ctx context.Context) chan<- Elem {
   112  	r := make(chan Elem)
   113  	go func(ctx context.Context, r <-chan Elem) {
   114  		for {
   115  			select {
   116  			case <-ctx.Done():
   117  				return
   118  			case _, ok := <-r:
   119  				if !ok {
   120  					return
   121  				}
   122  			}
   123  		}
   124  	}(ctx, r)
   125  	return r
   126  }
   127  
   128  // An Exclusive is a value that may only be used by a single goroutine
   129  // at a time. This is implemented using channels rather than a mutex.
   130  type _Exclusive[Val any] struct {
   131  	c chan Val
   132  }
   133  
   134  // _MakeExclusive makes an initialized exclusive value.
   135  func _MakeExclusive[Val any](initial Val) *_Exclusive[Val] {
   136  	r := &_Exclusive[Val]{
   137  		c: make(chan Val, 1),
   138  	}
   139  	r.c <- initial
   140  	return r
   141  }
   142  
   143  // _Acquire acquires the exclusive value for private use.
   144  // It must be released using the Release method.
   145  func (e *_Exclusive[Val]) Acquire() Val {
   146  	return <-e.c
   147  }
   148  
   149  // TryAcquire attempts to acquire the value. The ok result reports whether
   150  // the value was acquired. If the value is acquired, it must be released
   151  // using the Release method.
   152  func (e *_Exclusive[Val]) TryAcquire() (v Val, ok bool) {
   153  	select {
   154  	case r := <-e.c:
   155  		return r, true
   156  	default:
   157  		return v, false
   158  	}
   159  }
   160  
   161  // Release updates and releases the value.
   162  // This method panics if the value has not been acquired.
   163  func (e *_Exclusive[Val]) Release(v Val) {
   164  	select {
   165  	case e.c <- v:
   166  	default:
   167  		panic("_Exclusive Release without Acquire")
   168  	}
   169  }
   170  
   171  // Ranger returns a Sender and a Receiver. The Receiver provides a
   172  // Next method to retrieve values. The Sender provides a Send method
   173  // to send values and a Close method to stop sending values. The Next
   174  // method indicates when the Sender has been closed, and the Send
   175  // method indicates when the Receiver has been freed.
   176  //
   177  // This is a convenient way to exit a goroutine sending values when
   178  // the receiver stops reading them.
   179  func _Ranger[Elem any]() (*_Sender[Elem], *_Receiver[Elem]) {
   180  	c := make(chan Elem)
   181  	d := make(chan struct{})
   182  	s := &_Sender[Elem]{
   183  		values: c,
   184  		done:   d,
   185  	}
   186  	r := &_Receiver[Elem]{
   187  		values: c,
   188  		done:   d,
   189  	}
   190  	runtime.SetFinalizer(r, (*_Receiver[Elem]).finalize)
   191  	return s, r
   192  }
   193  
   194  // A _Sender is used to send values to a Receiver.
   195  type _Sender[Elem any] struct {
   196  	values chan<- Elem
   197  	done   <-chan struct{}
   198  }
   199  
   200  // Send sends a value to the receiver. It reports whether the value was sent.
   201  // The value will not be sent if the context is closed or the receiver
   202  // is freed.
   203  func (s *_Sender[Elem]) Send(ctx context.Context, v Elem) bool {
   204  	select {
   205  	case <-ctx.Done():
   206  		return false
   207  	case s.values <- v:
   208  		return true
   209  	case <-s.done:
   210  		return false
   211  	}
   212  }
   213  
   214  // Close tells the receiver that no more values will arrive.
   215  // After Close is called, the _Sender may no longer be used.
   216  func (s *_Sender[Elem]) Close() {
   217  	close(s.values)
   218  }
   219  
   220  // A _Receiver receives values from a _Sender.
   221  type _Receiver[Elem any] struct {
   222  	values <-chan Elem
   223  	done   chan<- struct{}
   224  }
   225  
   226  // Next returns the next value from the channel. The bool result indicates
   227  // whether the value is valid.
   228  func (r *_Receiver[Elem]) Next(ctx context.Context) (v Elem, ok bool) {
   229  	select {
   230  	case <-ctx.Done():
   231  	case v, ok = <-r.values:
   232  	}
   233  	return v, ok
   234  }
   235  
   236  // finalize is a finalizer for the receiver.
   237  func (r *_Receiver[Elem]) finalize() {
   238  	close(r.done)
   239  }
   240  
   241  func TestReadAll() {
   242  	c := make(chan int)
   243  	go func() {
   244  		c <- 4
   245  		c <- 2
   246  		c <- 5
   247  		close(c)
   248  	}()
   249  	got := _ReadAll(context.Background(), c)
   250  	want := []int{4, 2, 5}
   251  	if !_SliceEqual(got, want) {
   252  		panic(fmt.Sprintf("_ReadAll returned %v, want %v", got, want))
   253  	}
   254  }
   255  
   256  func TestMerge() {
   257  	c1 := make(chan int)
   258  	c2 := make(chan int)
   259  	go func() {
   260  		c1 <- 1
   261  		c1 <- 3
   262  		c1 <- 5
   263  		close(c1)
   264  	}()
   265  	go func() {
   266  		c2 <- 2
   267  		c2 <- 4
   268  		c2 <- 6
   269  		close(c2)
   270  	}()
   271  	ctx := context.Background()
   272  	got := _ReadAll(ctx, _Merge(ctx, c1, c2))
   273  	sort.Ints(got)
   274  	want := []int{1, 2, 3, 4, 5, 6}
   275  	if !_SliceEqual(got, want) {
   276  		panic(fmt.Sprintf("_Merge returned %v, want %v", got, want))
   277  	}
   278  }
   279  
   280  func TestFilter() {
   281  	c := make(chan int)
   282  	go func() {
   283  		c <- 1
   284  		c <- 2
   285  		c <- 3
   286  		close(c)
   287  	}()
   288  	even := func(i int) bool { return i%2 == 0 }
   289  	ctx := context.Background()
   290  	got := _ReadAll(ctx, _Filter(ctx, c, even))
   291  	want := []int{2}
   292  	if !_SliceEqual(got, want) {
   293  		panic(fmt.Sprintf("_Filter returned %v, want %v", got, want))
   294  	}
   295  }
   296  
   297  func TestSink() {
   298  	c := _Sink[int](context.Background())
   299  	after := time.NewTimer(time.Minute)
   300  	defer after.Stop()
   301  	send := func(v int) {
   302  		select {
   303  		case c <- v:
   304  		case <-after.C:
   305  			panic("timed out sending to _Sink")
   306  		}
   307  	}
   308  	send(1)
   309  	send(2)
   310  	send(3)
   311  	close(c)
   312  }
   313  
   314  func TestExclusive() {
   315  	val := 0
   316  	ex := _MakeExclusive(&val)
   317  
   318  	var wg sync.WaitGroup
   319  	f := func() {
   320  		defer wg.Done()
   321  		for i := 0; i < 10; i++ {
   322  			p := ex.Acquire()
   323  			(*p)++
   324  			ex.Release(p)
   325  		}
   326  	}
   327  
   328  	wg.Add(2)
   329  	go f()
   330  	go f()
   331  
   332  	wg.Wait()
   333  	if val != 20 {
   334  		panic(fmt.Sprintf("after Acquire/Release loop got %d, want 20", val))
   335  	}
   336  }
   337  
   338  func TestExclusiveTry() {
   339  	s := ""
   340  	ex := _MakeExclusive(&s)
   341  	p, ok := ex.TryAcquire()
   342  	if !ok {
   343  		panic("TryAcquire failed")
   344  	}
   345  	*p = "a"
   346  
   347  	var wg sync.WaitGroup
   348  	wg.Add(1)
   349  	go func() {
   350  		defer wg.Done()
   351  		_, ok := ex.TryAcquire()
   352  		if ok {
   353  			panic(fmt.Sprintf("TryAcquire succeeded unexpectedly"))
   354  		}
   355  	}()
   356  	wg.Wait()
   357  
   358  	ex.Release(p)
   359  
   360  	p, ok = ex.TryAcquire()
   361  	if !ok {
   362  		panic(fmt.Sprintf("TryAcquire failed"))
   363  	}
   364  }
   365  
   366  func TestRanger() {
   367  	s, r := _Ranger[int]()
   368  
   369  	ctx := context.Background()
   370  	go func() {
   371  		// Receive one value then exit.
   372  		v, ok := r.Next(ctx)
   373  		if !ok {
   374  			panic(fmt.Sprintf("did not receive any values"))
   375  		} else if v != 1 {
   376  			panic(fmt.Sprintf("received %d, want 1", v))
   377  		}
   378  	}()
   379  
   380  	c1 := make(chan bool)
   381  	c2 := make(chan bool)
   382  	go func() {
   383  		defer close(c2)
   384  		if !s.Send(ctx, 1) {
   385  			panic(fmt.Sprintf("Send failed unexpectedly"))
   386  		}
   387  		close(c1)
   388  		if s.Send(ctx, 2) {
   389  			panic(fmt.Sprintf("Send succeeded unexpectedly"))
   390  		}
   391  	}()
   392  
   393  	<-c1
   394  
   395  	// Force a garbage collection to try to get the finalizers to run.
   396  	runtime.GC()
   397  
   398  	select {
   399  	case <-c2:
   400  	case <-time.After(time.Minute):
   401  		panic("_Ranger Send should have failed, but timed out")
   402  	}
   403  }
   404  
   405  func main() {
   406  	TestReadAll()
   407  	TestMerge()
   408  	TestFilter()
   409  	TestSink()
   410  	TestExclusive()
   411  	TestExclusiveTry()
   412  	TestRanger()
   413  }
   414  

View as plain text