This article is part of a series about channels in .NET. It’s best to start with Part 1, but you can jump to any part using the links below. This series is my own translation, and it is a free translation rather than a literal one, so readers with good English may prefer the original. The translation may be freely reproduced, but please credit the source!

  • Part 1 — Getting started with channels
  • Part 2 — Advanced channels
  • Part 3 — Understanding back pressure

1. Review

In our previous article, we looked at some simple examples of how channels work and saw some nifty features, but for the most part they behave much like any other queue implementation.

So let’s dive into some more advanced topics.

All right, that sounds fancy, but most of it is pretty simple. Still, it’s worth exploring, because there is useful information to be had!

2. Read and write separation

You have probably heard of read/write separation in databases. Hmm, a lot of the concepts here are very similar. As the saying goes, great minds think alike!

If you’ve ever shared a queue between two classes, you know that both classes can read from and write to the queue, even when they shouldn’t. The following example shows a design in which true read/write separation is not possible:

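using System.Collections.Generic;

class MyProducer
{
    // Holds the whole queue, so nothing stops the producer from reading too.
    private readonly Queue<int> _queue;

    public MyProducer(Queue<int> queue) { _queue = queue; }
}

class MyConsumer
{
    // Holds the same queue, so nothing stops the consumer from writing too.
    private readonly Queue<int> _queue;

    public MyConsumer(Queue<int> queue) { _queue = queue; }
}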

So although the producer should only write to the queue and the consumer should only read from it, both classes hold the full queue and can perform every operation on it.
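To make the problem concrete, here is a minimal sketch of a consumer that writes to the shared queue. The method name `WriteByMistake` is hypothetical and exists only for illustration; the point is that the compiler accepts it without complaint.

```csharp
using System;
using System.Collections.Generic;

class MyConsumer
{
    private readonly Queue<int> _queue;

    public MyConsumer(Queue<int> queue) { _queue = queue; }

    // Hypothetical method: a consumer should only read,
    // yet enqueuing compiles because it holds the full Queue<int>.
    public void WriteByMistake(int value) { _queue.Enqueue(value); }
}

class Program
{
    static void Main()
    {
        var queue = new Queue<int>();
        var consumer = new MyConsumer(queue);

        consumer.WriteByMistake(42);

        Console.WriteLine(queue.Count); // prints 1
    }
}
```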

You can tell developers not to do that, but a wayward or bad-tempered developer may ignore you or never hear the rule at all, and then only code review can stop them.

But with channels, we don’t have to worry about such things.

using System.Threading.Channels;

class Program
{
    static void Main(string[] args)
    {
        var myChannel = Channel.CreateUnbounded<int>();
        // Hand each side only the half of the channel it needs.
        var producer = new MyProducer(myChannel.Writer);
        var consumer = new MyConsumer(myChannel.Reader);
    }
}

class MyProducer
{
    private readonly ChannelWriter<int> _channelWriter;

    public MyProducer(ChannelWriter<int> channelWriter) { _channelWriter = channelWriter; }
}

class MyConsumer
{
    private readonly ChannelReader<int> _channelReader;

    public MyConsumer(ChannelReader<int> channelReader) { _channelReader = channelReader; }
}

In this example, I added a Main method to show how to hand out the writer and the reader, which is very simple.

Here we can see that the producer is only passed a ChannelWriter, so it can only write.

The consumer is passed a ChannelReader, so it can only read.

Of course, this does not stop other developers from modifying the code to inject the root Channel object, or to pass in both the ChannelWriter and the ChannelReader, but it is at least much better than before.
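As a rough sketch of how the two halves might be used end to end, the version below gives each class one method. The names `ProduceAsync` and `ConsumeAsync` are assumptions made for this illustration and do not come from the original article.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

class MyProducer
{
    private readonly ChannelWriter<int> _channelWriter;

    public MyProducer(ChannelWriter<int> channelWriter) { _channelWriter = channelWriter; }

    // Hypothetical method: writes the integers 0..count-1 to the channel.
    // ChannelWriter<int> has no read methods, so a call such as
    // _channelWriter.ReadAsync() would not compile.
    public async Task ProduceAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await _channelWriter.WriteAsync(i);
        }
    }
}

class MyConsumer
{
    private readonly ChannelReader<int> _channelReader;

    public MyConsumer(ChannelReader<int> channelReader) { _channelReader = channelReader; }

    // Hypothetical method: reads exactly `count` items and returns them in order.
    public async Task<List<int>> ConsumeAsync(int count)
    {
        var items = new List<int>();
        for (int i = 0; i < count; i++)
        {
            items.Add(await _channelReader.ReadAsync());
        }
        return items;
    }
}

class Program
{
    static async Task Main()
    {
        var myChannel = Channel.CreateUnbounded<int>();
        var producer = new MyProducer(myChannel.Writer);
        var consumer = new MyConsumer(myChannel.Reader);

        var producing = producer.ProduceAsync(3);
        var items = await consumer.ConsumeAsync(3);
        await producing;

        Console.WriteLine(string.Join(", ", items)); // prints 0, 1, 2
    }
}
```

The separation is enforced by the types: each class can only call the members its half of the channel exposes.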

3. Close a channel

As we saw earlier, when we call ReadAsync() on a channel, it sits there waiting for a message. But what if no more messages are coming?

Maybe this is a one-time batch job and the batch is complete.

Typically, with other queues in .NET, we would have to pass around some kind of shared Boolean or a CancellationToken. With channels, it’s easier.

For example:

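using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        var myChannel = Channel.CreateUnbounded<int>();

        // Producer: write ten items as fast as possible, then mark the channel complete.
        _ = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                await myChannel.Writer.WriteAsync(i);
            }

            myChannel.Writer.Complete();
        });

        try
        {
            // Consumer: read slowly, one item per second.
            while (true)
            {
                var item = await myChannel.Reader.ReadAsync();
                Console.WriteLine(item);
                await Task.Delay(1000);
            }
        }
        catch (ChannelClosedException)
        {
            // Thrown by ReadAsync once the channel is complete and empty.
            Console.WriteLine("Channel was closed!");
        }
    }
}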

In the example above, the producer writes to the channel as quickly as possible and then completes it.

Our consumer delays one second between reads, so it reads at a much slower rate.

Note that we catch ChannelClosedException, which is thrown when you try to read from a closed channel after the final message has already been read.

With this code I also want to show that calling Complete() on a channel does not immediately close the channel and cut off everyone reading from it.

Instead, it notifies all readers that once the last message has been read, the channel is done.

This is important because it means it doesn’t matter whether Complete() is called while a reader is waiting for a new message, while the queue is empty, while the queue is full, and so on.

We can be sure that we will end up consuming all available messages.
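The behaviour described above can be checked with a small sketch. The helper name `DrainAfterCompleteAsync` is hypothetical; it completes the writer before anything has been read and then shows that the buffered items are still available.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    // Hypothetical helper: completes the writer first, then reads what was buffered.
    public static async Task<List<int>> DrainAfterCompleteAsync()
    {
        var channel = Channel.CreateUnbounded<int>();

        // Buffer three items, then complete the writer before any read happens.
        for (int i = 0; i < 3; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        channel.Writer.Complete();

        // Further writes are rejected once the writer is complete.
        bool accepted = channel.Writer.TryWrite(99);
        Console.WriteLine($"Write after Complete accepted: {accepted}"); // False

        // The items written before Complete() can still be read normally.
        var items = new List<int>();
        while (channel.Reader.TryRead(out var item))
        {
            items.Add(item);
        }

        // Reader.Completion finishes only when the channel is complete AND drained.
        await channel.Reader.Completion;
        return items;
    }

    static async Task Main()
    {
        var items = await DrainAfterCompleteAsync();
        Console.WriteLine(string.Join(", ", items)); // prints 0, 1, 2
    }
}
```

Awaiting `Reader.Completion` is one way for other code to learn that the channel is finished without reading from it.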

4. Use IAsyncEnumerable

Did anything in the code above bother you? Yes, the uncomfortable part is having to catch an exception!

“Isn’t there a better way?”

“Don’t worry, be patient: there will be bread, and there will be milk!”

The ReadAllAsync() method, which returns an IAsyncEnumerable, solves this problem. The code looks like this:

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        var myChannel = Channel.CreateUnbounded<int>();

        _ = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                await myChannel.Writer.WriteAsync(i);
            }

            myChannel.Writer.Complete();
        });

        // The loop ends on its own once the channel is complete and drained.
        await foreach (var item in myChannel.Reader.ReadAllAsync())
        {
            Console.WriteLine(item);
            await Task.Delay(1000);
        }
    }
}

Now the code reads much more cleanly.

Because we are using IAsyncEnumerable, we can still await each item as before, but we no longer need to catch the exception: when the channel is complete and drained, the enumeration simply ends and the loop exits gracefully.

Again, this eliminates some of the messy code you once had to write when dealing with queues. Where before you had to write some kind of infinite loop with a break clause, now it’s just a really neat loop!
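If `await foreach` is not available, for example on an older C# language version, a similar exception-free loop can be sketched with `WaitToReadAsync()` and `TryRead()`. This variant is not part of the original article, and the helper name `ReadWithTryReadAsync` is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    // Hypothetical helper: drains a reader without catching ChannelClosedException.
    public static async Task<List<int>> ReadWithTryReadAsync(ChannelReader<int> reader)
    {
        var items = new List<int>();

        // WaitToReadAsync returns false once the channel is complete and empty.
        while (await reader.WaitToReadAsync())
        {
            // TryRead returns false when nothing is buffered right now.
            while (reader.TryRead(out var item))
            {
                items.Add(item);
            }
        }

        return items;
    }

    static async Task Main()
    {
        var myChannel = Channel.CreateUnbounded<int>();

        _ = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                await myChannel.Writer.WriteAsync(i);
            }

            myChannel.Writer.Complete();
        });

        var items = await ReadWithTryReadAsync(myChannel.Reader);
        Console.WriteLine(string.Join(", ", items)); // prints 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
        Console.WriteLine("Channel was closed!");
    }
}
```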

5. What’s next

So far, we’ve been using “unbounded” channels. As you might have guessed, a bounded channel is also an option. But what is it? And how does the term “back pressure” relate to it? Check out the next part of this series to better understand back pressure.

6. Summary

I chose this topic so that we can learn and make progress together! Follow the author so you don’t get lost. I am Net Dust.