Go advanced concurrency patterns: part 4 (unlimited buffer channels)

2024-12-16 - Rob

Preface

It’s been almost 6 years since I added to this series, but I thought it was time to resume working on it.

Since “perfect is the opposite of done” I thought I would talk about a simple pattern that I haven’t covered yet.

A couple of years ago I worked on a toy library to see if the new-ish generics proposal would finally allow us to express some of the most common channel patterns in Go.

Turns out it did, so I stopped working on it and decided to wait for the community or the standard library to actually come up with something better.

I’m still not convinced by anything that has come out since, so here I am, sharing some of the lessons from my experiments so that you can write your own.

Disclaimer

Probably one of the most common requests that I get from new gophers is “how do I create a channel with unlimited buffer?”. The answer to that is “you don’t”.

The main reason for this is that unlimited channels make code a lot harder to debug: issues that would otherwise be symptomatic stay silent until the whole system comes down.

You can find more in this conversation.

That said, there are some rare cases in which such a feature would allow a system to absorb bursty traffic without blocking, or where quickly deallocating or reusing producers is much more efficient than just keeping a buffer of what they produced.

For those rare situations the following code can be used.

Solutions

The nicest API I could think of is in the form of:

func BufferChan[T any](in <-chan T) (out <-chan T)

This allows us to still deal with channels without introducing iterators or things that don’t work well with built-in statements like select, or that can’t easily be cancelled with a context.

The semantics would be:

  • guarantee that all values received on in will be emitted on out;
  • when in is closed and all potentially buffered values have been emitted, out will be closed.

If you are unfamiliar with the <-chan type declaration: in this case it means that this function guarantees to never send on in, and prevents its callers from ever sending on out (closing a channel is a send operation).
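
For example, the returned channel still composes with select and a context. Here is a minimal consumption sketch of my own (not from the original library), assuming one of the implementations below and the usual context/fmt imports:

func consume(ctx context.Context, in <-chan int) {
	out := BufferChan(in) // Any of the implementations below would work here.
	for {
		select {
		case v, ok := <-out:
			if !ok {
				return // "in" was closed and the buffer fully drained.
			}
			fmt.Println(v)
		case <-ctx.Done():
			return // Cancelled: stop without waiting for producers.
		}
	}
}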

When order doesn’t matter and the operation is short-lived

If producers are much faster than consumers, so that send operations outnumber receive operations, order rarely matters.

For all of those cases where operations are short-lived we can use this simple solution:

func BufShortLived[T any](in <-chan T) <-chan T {
	var buf []T
	out := make(chan T)
	go func() {
		defer close(out)    // Close once we are done.
		for v := range in { // Process all input.
			select {
			case out <- v: // Try to send.
			default: // If we fail,
				buf = append(buf, v) // add to the buffer.
			}
		}
		for _, v := range buf { // Input has been closed, emit buffered values.
			out <- v
		}
	}()
	return out
}

This buffers all values that consumers weren’t fast enough to receive, and emits them once the input channel has been closed.

Since the buffer is only ever drained and released once the input is closed, this solution might not be for you.

Please still consider it, as it is very simple to read and test.
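
To make the behavior concrete, here is a small usage sketch of my own (assuming fmt is imported): sends complete quickly because values are either forwarded or buffered, and whatever is left in the buffer is flushed after close.

in := make(chan int)
out := BufShortLived(in)
go func() {
	for i := 0; i < 5; i++ {
		in <- i // Completes quickly: the value is forwarded or buffered.
	}
	close(in)
}()
for v := range out {
	fmt.Println(v) // All five values arrive, but not necessarily in send order.
}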

When order does matter, and the operation lasts a while

I have yet to see a reasonable use case for ordered unlimited channels, but since some people, especially those coming from other language communities, insist this is a fundamental feature to have, we might as well talk about it.

I cannot stress enough how much harder this will make your life if you use this in production code, but this is a blogpost, not a cop, so you do you.

When reading this code consider that operations on nil channels block forever. A select case with a nil channel is, therefore, never selected.
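
If this trick is new to you, here is a tiny standalone sketch (mine, not from the post) showing it in action:

var ch chan int // A nil channel.
select {
case v := <-ch:
	fmt.Println("never reached", v) // A nil channel case can never proceed.
default:
	fmt.Println("the nil case is never selected")
}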

Here nout will be nil if and only if there are no buffered values to send; otherwise it will be equal to out. We use it to switch its select case on and off.

func BufLongLivedOrdered[T any](in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var (
			queue []T    // This is our buffer.
			next  T      // This is the next value to be sent.
			nout  chan T // This is a nil-able reference to "out".
		)
		for in != nil || nout != nil { // Until both "in" and "out" have been disabled.
			select {
			case v, ok := <-in: // If we receive
				if !ok { // and "in" has been closed,
					in = nil // disable this case.
					continue
				}
				if nout == nil { // If "out" is disabled,
					nout = out // enable it
					next = v   // and prepare the value to send.
				} else {
					queue = append(queue, v) // Otherwise values are already queued: append.
				}
			case nout <- next: // If we send a value
				if len(queue) == 0 { // and ran out of buffer,
					nout = nil // disable this case.
					continue
				}
				next = queue[0]   // Otherwise prepare the next value to send
				queue = queue[1:] // and consume the buffer.
			}
		}
	}()
	return out
}

What I like about this solution is that it doesn’t make use of mutexes or conditions to protect the buffer or wake up waiters, but solely relies on channel semantics.

Since only one goroutine is ever going to touch queue, next, and nout, we don’t need to guard them against potential race conditions.

Improving on it

One minor issue with the solution above is that we give equal priority to sending buffered values and receiving new values.

This might, in turn, cause buffers to grow more than necessary. To avoid this, we can add a preamble at the beginning of the for loop: a copy of the second select case, with a default so that it doesn’t block:

for in != nil || nout != nil { // Same as before.
	select { // Added block.
	case nout <- next:
		if len(queue) == 0 {
			nout = nil
			continue // This cannot be a break statement: we need to check the exit condition.
		}
		next = queue[0]
		queue = queue[1:]
		continue
	default:
		// If we get here, no consumer was ready or no value was buffered;
		// just fall through to the select below.
	}
	// Same select as above.
	select {

This will make the code above always try to send before it tries to both send and receive at once. In some rare cases this might actually reduce the buffer size.

Full code

If you just came here for the code, here it is:

func BufLongLived[T any](in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var (
			queue []T
			next  T
			nout  chan T
		)
		for in != nil || nout != nil {
			select {
			case nout <- next:
				if len(queue) == 0 {
					nout = nil
					continue
				}
				next = queue[0]
				queue = queue[1:]
				continue
			default:
			}
			select {
			case v, ok := <-in:
				if !ok {
					in = nil
					continue
				}
				if nout == nil {
					nout = out
					next = v
				} else {
					queue = append(queue, v)
				}
			case nout <- next:
				if len(queue) == 0 {
					nout = nil
					continue
				}
				next = queue[0]
				queue = queue[1:]
			}
		}
	}()
	return out
}
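
As a quick sanity check, here is a usage sketch of my own (not from the post) showing that values come out in send order and that producers don’t block on slow consumers:

in := make(chan int)
out := BufLongLived(in)
go func() {
	for i := 0; i < 100; i++ {
		in <- i // Picked up promptly by the operator goroutine, then buffered.
	}
	close(in)
}()
for v := range out {
	fmt.Println(v) // Prints 0 through 99, in order.
}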

Conclusions

In a few dozen lines of code we implemented a channel operator that gives us a new ordered, unlimited-buffer channel from an existing one.

I found this quite tricky to get right, and I still think that use cases for it are rare: in fact, in my career I have yet to find one. I think you’re better off trying to find a good size for your buffers, or reconsidering your architecture, before you resort to this.

That said, I hope it was useful!

Edit (20 Dec 2024): One minor difference

As someone on reddit pointed out, there is a minor but important difference between my implementation and a “real” infinite buffer channel: when a value is received from the source channel, it is not immediately made available on the out one.

This means that it is possible to observe a situation in which a sender has successfully sent a value on in, but a receiver that doesn’t wait (e.g. a select with a default case) fails to receive it. This would be impossible to reproduce with a “real” channel.

Example test to reproduce the issue:

// Real Go channel
ch := make(chan int, 1)

mu := sync.Mutex{}
mu.Lock()

go func() {
	ch <- 1
	mu.Unlock()
}()

mu.Lock()
// We now know for sure the value 1 is in the buffer
mu.Unlock()

select {
case <-ch:
	// This will always be selected.
default:
	panic("unreachable with real Go channels")
}

// A channel built with my solution
src := make(chan int)
ch := BufLongLived(src)

mu := sync.Mutex{}
mu.Lock()

go func() {
	src <- 1
	mu.Unlock()
}()

mu.Lock()
mu.Unlock()

// We don't know which branch we'll pick
select {
case <-ch:
default:
}

If your code relies on this channel guarantee, please consider using a real channel instead.

Edit (21 Dec 2024)

As my friend Roger Peppe rightfully pointed out, using a sliding window on a slice as a queue implementation might lead to some suboptimal performance, especially if several million values need to pass through this channel (causing more allocations than necessary). This is mostly due to the capacity at the beginning of the slice being lost once we advance.

I still insist that no one should rely on this implementation for anything that big for the aforementioned reasons, but I got this far, so might as well try to be precise.

I thus propose the following implementation, which takes a queue as an argument:

type Queue[T any] interface {
	Len() int

	// If your implementation is self-shrinking
	// these two methods are not necessary.
	Cap() int
	SetCap(v int)

	Dequeue() (t T)
	Enqueue(t T)
}

func BufLongLivedCustomQueue[T any](in <-chan T, q Queue[T]) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var (
			next T
			nout chan T
		)
		for in != nil || nout != nil {
			if q.Len() < q.Cap()/4 {
				// Shrink if the number of elements has reduced a lot.
				// You might want to rely on the underlying implementation to
				// resize instead of adding custom code here.
				q.SetCap(q.Len() * 2)
			}
			select {
			case nout <- next:
				if q.Len() == 0 {
					nout = nil
					continue
				}
				next = q.Dequeue()
				continue
			default:
			}
			select {
			case v, ok := <-in:
				if !ok {
					in = nil
					continue
				}
				if nout == nil {
					nout = out
					next = v
				} else {
					q.Enqueue(v)
				}
			case nout <- next:
				if q.Len() == 0 {
					nout = nil
					continue
				}
				next = q.Dequeue()
			}
		}
	}()
	return out
}
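
For completeness, here is one possible Queue implementation. This is my own sketch, not part of the original post: a slice-backed FIFO whose SetCap copies into a fresh slice, reclaiming the capacity that a plain sliding window would lose at the front.

type SliceQueue[T any] struct {
	items []T
}

func (q *SliceQueue[T]) Len() int { return len(q.items) }
func (q *SliceQueue[T]) Cap() int { return cap(q.items) }

func (q *SliceQueue[T]) SetCap(v int) {
	if v < len(q.items) {
		v = len(q.items) // Never drop queued values.
	}
	items := make([]T, len(q.items), v)
	copy(items, q.items)
	q.items = items
}

func (q *SliceQueue[T]) Enqueue(t T) { q.items = append(q.items, t) }

func (q *SliceQueue[T]) Dequeue() (t T) {
	// Callers check Len() first, as BufLongLivedCustomQueue does.
	t = q.items[0]
	q.items = q.items[1:]
	return t
}

It could then be used as BufLongLivedCustomQueue(in, &SliceQueue[int]{}).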

Again, many thanks to Roger, who not only proofread this post but also helped me figure out what was wrong with my benchmarks (copy-pasta is gonna kill me one day).

Happy hacking!