
proposal: join channels #60829

Closed
rhansen opened this issue Jun 15, 2023 · 8 comments

@rhansen

rhansen commented Jun 15, 2023

I would like to be able to connect (join, merge, pipe, mux, link) one or more input channels to a single output channel in a way that does not introduce unintentional buffering. For example, I would like to be able to write the following function:

// Join connects zero or more input channels to a single output channel:
//   - A value received from an open input channel in inChs is immediately sent
//     on the output channel outCh.
//   - If sending to the output channel would block, no input channel receive
//     occurs.
//   - Once no open input channels remain, the output channel is closed.
func Join[T any](outCh chan<- T, inChs ...<-chan T) {
	// Magic goes here.
}
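
For example (a hypothetical usage sketch, assuming Join existed; the channel names are made up), two producers could be fanned into a single unbuffered output channel:

import "fmt"

func example() {
	a := make(chan int)
	b := make(chan int)
	out := make(chan int) // unbuffered
	Join[int](out, a, b)  // explicit instantiation of the hypothetical Join above

	go func() { a <- 1; close(a) }()
	go func() { b <- 2; close(b) }()
	for v := range out { // out is closed once both a and b are closed
		fmt.Println(v)
	}
}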

It is possible to approximate the desired behavior using reflect.Select:

import "reflect"

func Join[T any](outCh chan<- T, inChs ...<-chan T) {
	cases := make([]reflect.SelectCase, 0, len(inChs))
	for _, ch := range inChs {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		})
	}
	go func() {
		defer close(outCh)
		for len(cases) > 0 {
			// Block until one of the remaining input channels is ready.
			n, v, ok := reflect.Select(cases)
			if !ok {
				// Input channel n was closed; stop selecting on it.
				cases = append(cases[:n], cases[n+1:]...)
				continue
			}
			// The received value sits in v until this send completes.
			outCh <- v.Interface().(T)
		}
	}()
}

However, the above does not satisfy the second bullet point:

  • If sending to the output channel would block, no input channel receive occurs.

This is because the local variable v acts like a channel buffer slot, effectively increasing the output channel's buffer size by one. This matters when send blocking is used for synchronization, as feedback for flow control (backpressure), for implicit acknowledgement, to limit concurrency, etc.
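
To make the extra slot visible, here is a hypothetical demonstration using the reflect.Select-based Join above (the channel names are made up):

import "fmt"

func demo() {
	inCh := make(chan int)
	outCh := make(chan int) // unbuffered: a send should wait for a receiver
	Join[int](outCh, inCh)

	inCh <- 1            // completes once the relay goroutine receives it,
	                     // even though nothing is receiving from outCh yet
	fmt.Println(<-outCh) // the value sat in the local variable v until now
	close(inCh)          // lets the relay goroutine exit and close outCh
}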

I'm not sure what the solution should look like. Brainstorming some ideas:

  • Add a new function to a standard package somewhere.
  • Overload append: append(outCh, inChs...)
  • Introduce a new "receive and send" operator:
    select {
    case v, ok := outCh <<- inCh1:
    	if ok {
    		// v was sent from inCh1 to outCh
    	} else {
    		// inCh1 is closed (no value was sent to outCh)
    	}
    case v := outCh <<- inCh2:
    	// Do something with v.
    case outCh <<- inCh3:
    }

It would also be nice to be able to transform an input value (perhaps to a different type) before it is sent to the output channel while preserving the "receive only happens if sent" behavior. I suspect that would open a can of worms, so it's probably best to leave that as a non-goal for this proposal. At least it would be good if the chosen solution for this proposal did not preclude such transformations as a future enhancement.

@gopherbot added this to the Proposal milestone Jun 15, 2023
@seankhliao
Member

see also #50324

@ianlancetaylor
Contributor

I'm not entirely sure that what you want can be implemented in a reasonable way. You want to be able to block a goroutine until both a channel is ready to receive and at least one of a set of channels is ready to send. But in the general case other goroutines are sending and receiving on those channels. Today we can simply put a goroutine to sleep until a channel is ready; then, if we lock the channel and it is still ready, we can proceed. To implement this scheme, we would also need to check the other side of the communication, and if that side is not ready, we would have to unlock the channel and wait. But now that channel is ready, so shouldn't we just wake up again? So I guess we would instead have to wait on the other channel(s) and hope that the first one is ready. It seems easy for a goroutine like that to starve.

You said what you want, but you didn't say why you wanted it. When does an application need a synchronization mechanism more powerful than your version of Join?

@rhansen
Author

rhansen commented Jun 17, 2023

You said what you want, but you didn't say why you wanted it.

In the past, I've wanted to connect an existing <-chan T to an existing chan<- T without any buffering between them. I don't remember the specifics, but IIRC the situation arose due to poor API design and was resolved by yak shaving a 3rd party library to change a function to take a <-chan T argument rather than return a chan<- T.

My motivation for this right now is to speed up and simplify a complex select loop with a mix of static and dynamic cases. Some benchmarks I wrote show about 2x overhead when using reflect.Select vs. a select statement, though the overhead is still small relative to the overall amount of work. More significant is the complexity and unreadability of the code required to manage the []reflect.SelectCase argument. If there were a way to bufferlessly join channels, I could replace the reflect.Select with a select statement and drastically simplify the code. I can't use the above Join function (from the OP) because the application is sensitive to the buffering it would introduce.
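
To give a flavor of the bookkeeping involved, here is a hypothetical, simplified sketch (not my actual code) of a loop that mixes one static case with a dynamic set of channels via reflect.Select:

import "reflect"

// One static case (done) plus a dynamic set of channels. Every addition or
// removal means rebuilding the slice and keeping the index arithmetic straight.
func loop(done <-chan struct{}, dynamic []<-chan int, handle func(int)) {
	cases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(done)}, // static case, index 0
	}
	for _, ch := range dynamic { // dynamic cases, indexes 1..len(dynamic)
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
	}
	for len(cases) > 1 {
		n, v, ok := reflect.Select(cases)
		switch {
		case n == 0:
			return // done was closed
		case !ok:
			cases = append(cases[:n], cases[n+1:]...) // a dynamic channel closed; drop its case
		default:
			handle(int(v.Int())) // mapping n back to a specific channel takes extra bookkeeping
		}
	}
}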

I'm not entirely sure that what you want can be implemented in a reasonable way.

I'm not very familiar with Go internals so please forgive my ignorance, but I think the join could be implemented purely in the channel send logic. There's no need for an intermediate goroutine that relays values from input to output.

Spitballing:

  • Modify the channel data structure to add a list of connected output channels. (This list is usually empty, sometimes it has a single entry, and very rarely does it have more than one entry.)
  • Join(outCh, inCh) would add outCh to inCh's list of connected output channels. (There's no need to touch outCh.)
  • The list of connected output channels can be thought of as edges in a DAG. Join panics if a cycle is detected.
  • Sending on a channel would look for a waiting receiver or empty buffer slot on any of the downstream channels in the DAG (including the channel itself).
  • If none is found, the goroutine enqueues itself on all of the downstream channels in the DAG and waits.
  • After the sender is woken up and the send completes, it dequeues itself from the other downstream channels in the DAG.

Would something like that be workable?

Some open issues:

  • How would joining work with buffered upstream channels? (I don't think we need to worry about buffered downstream channels. There shouldn't be a need to look upstream for a blocked sender when a downstream buffer slot is vacated—the blocked sender should already be in the downstream channel's wait queue.) Some options:
    • The join operation could reject buffered upstream channels (panic).
    • Backlinks could be added from downstream to upstream so that reads on the downstream channels can look upstream for a buffered value.
    • The sender wait queue could be extended to support either a goroutine or a buffered channel.
  • Should there be a way to unjoin channels? Or query the list of downstream next hop channels?
  • What should happen if the DAG changes while a goroutine is blocked?
  • Would this approach work for single-use joins (for something like v, ok := outCh <<- inCh)? Is that something we would want?

@ianlancetaylor
Contributor

My motivation for this right now is to speed up and simplify a complex select loop with a mix of static and dynamic cases.

Why is the exact buffering behavior that you describe important for this use case? It seems to me that you can't get that buffering behavior from reflect.Select. Thanks.

Thanks for the implementation suggestion. What is supposed to happen if c1 is joined to c2, and c2 is joined to c3, and somebody does a select writing to all three channels?

@rsc
Contributor

rsc commented Jul 12, 2023

This proposal has been added to the active column of the proposals project
and will now be reviewed at the weekly proposal review meetings.
— rsc for the proposal review group

@willfaught
Contributor

Would making the channel buffer sizes one smaller solve the problem?

Would checking the length of the send channel before trying to receive a value work?

@rsc
Contributor

rsc commented Jul 19, 2023

Based on the discussion above, this proposal seems like a likely decline.
— rsc for the proposal review group

@rsc
Contributor

rsc commented Jul 26, 2023

No change in consensus, so declined.
— rsc for the proposal review group

Status: Declined
6 participants