WebSocket Channel Queues
High-performance Channel queues for thread-safe WebSocket management.
Last year I wrote a few articles about WebSockets (the important topics being closing WebSockets and Kestrel hosting). Those articles used the .NET BlockingCollection object to share data between threads, basically a publish/subscribe model. With just a few minor changes, the modern, high-performance Channel class simplifies and improves those examples.
As with the earlier articles, the new projects live in the WebSocketExample solution, in the same GitHub repository (also named WebSocketExample) used by my previous posts about WebSockets. Those projects now target .NET Core 3.1. The repository still contains the projects from the older articles based on the obsolete HttpListener class, but I have removed them from the solution.
The Client Application
The new project ChannelWSClient is almost identical to the BlockingCollection project from last year, WebSocketClient. Instead of declaring a BlockingCollection<string> variable, we declare a Channel<string> variable:
// private static BlockingCollection<string> KeystrokeQueue = new BlockingCollection<string>();
private static Channel<string> KeystrokeQueue;
This time I’m creating the instance in the startup method, since the new-up process is a bit longer and a bit more complex. Channels come in “bounded” and “unbounded” versions. A bounded channel has a fixed capacity: writers can call TryWrite, which in the default mode returns false immediately if the channel is full, or WriteAsync, which waits until the channel is available for writing again. The channel options (the FullMode setting) control how write requests are handled when the channel is full. However, we’re using an unbounded channel, meaning it can hold an unlimited amount of data. Most articles warn you that if channel readers are too slow, you risk memory exhaustion, because it is truly unbounded.
We also specify two options in the call to the channel factory: SingleReader and SingleWriter are both true. Various optimizations and assumptions can be applied inside the channel object when it knows concurrent reads and/or writes will never occur.
public static async Task StartAsync(Uri wsUri)
{
    Console.WriteLine($"Connecting to server {wsUri.ToString()}");

    // Unbounded channel; the options promise exactly one reader and one writer
    KeystrokeQueue = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = true
        });

    SocketLoopTokenSource = new CancellationTokenSource();
    // ...etc.
}
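For contrast with the unbounded channel, here is a minimal sketch of how a bounded channel behaves when it fills up. It is not part of the WebSocketExample projects; the class name BoundedChannelSketch and the capacity of 1 are purely illustrative assumptions, chosen so the channel fills after a single write.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo class, not from the WebSocketExample repository.
public static class BoundedChannelSketch
{
    public static async Task<(bool first, bool second, bool wasPending)> RunAsync()
    {
        var channel = Channel.CreateBounded<string>(
            new BoundedChannelOptions(capacity: 1)
            {
                FullMode = BoundedChannelFullMode.Wait, // the default full-mode
                SingleReader = true,
                SingleWriter = true
            });

        bool first = channel.Writer.TryWrite("a");  // true: the channel has room
        bool second = channel.Writer.TryWrite("b"); // false: the channel is full

        // WriteAsync does not fail when full; it stays pending until a slot frees up.
        ValueTask pendingWrite = channel.Writer.WriteAsync("c");
        bool wasPending = !pendingWrite.IsCompleted;

        await channel.Reader.ReadAsync(); // removes "a", which frees the slot
        await pendingWrite;               // the waiting write can now complete

        return (first, second, wasPending);
    }

    public static async Task Main()
    {
        var (first, second, wasPending) = await RunAsync();
        Console.WriteLine($"First TryWrite: {first}");             // True
        Console.WriteLine($"Second TryWrite: {second}");           // False
        Console.WriteLine($"WriteAsync was pending: {wasPending}"); // True
    }
}
```

The other FullMode values (DropNewest, DropOldest, DropWrite) discard data instead of making the writer wait, which is why the choice matters for anything that must not lose messages.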
The next change adds a user’s keystroke to the queue. The old code is shown commented out, with the replacement immediately following. Even though the method is named TryWrite, it is guaranteed to succeed when writing to an unbounded channel (as long as the channel has not been marked complete, which we never do here).
// public static void QueueKeystroke(string message)
//     => KeystrokeQueue.Add(message);
public static void QueueKeystroke(string message)
    => KeystrokeQueue.Writer.TryWrite(message);
The final change is in the KeystrokeTransmitLoopAsync method. Originally the main processing loop looked like this (excluding error handling):
while (!cancellationToken.IsCancellationRequested)
{
    // Poll the queue on a fixed interval
    await Task.Delay(Program.KEYSTROKE_TRANSMIT_INTERVAL_MS, cancellationToken);
    if (!cancellationToken.IsCancellationRequested && KeystrokeQueue.TryTake(out var message))
    {
        var msgbuf = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
        await Socket.SendAsync(msgbuf, WebSocketMessageType.Text, endOfMessage: true, CancellationToken.None);
    }
}
The channel-based implementation eliminates the delay. Instead we asynchronously wait for data to show up in the channel with a call to WaitToReadAsync. When that returns true we then call ReadAsync to retrieve the data. The new loop (again sans error handling, which hasn’t changed) looks like this:
while (!cancellationToken.IsCancellationRequested)
{
    // No polling: wait asynchronously until the channel has data
    while (await KeystrokeQueue.Reader.WaitToReadAsync(cancellationToken))
    {
        string message = await KeystrokeQueue.Reader.ReadAsync(cancellationToken);
        var msgbuf = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
        await Socket.SendAsync(msgbuf, WebSocketMessageType.Text, endOfMessage: true, CancellationToken.None);
    }
}
That’s all it takes. And believe it or not, even though that Task.Delay was only 100ms, this already feels more responsive. The changes in the Kestrel web server project are similarly simple.
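One variation worth a look before moving on to the server: on .NET Core 3.0 and later, ChannelReader<T> also exposes ReadAllAsync, which wraps the wait-then-read pattern in an async stream. The sketch below is an alternative I am not using in the repository; the ReadAllSketch class and DrainAsync method are hypothetical names, and the list stands in for the call to Socket.SendAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo class, not from the WebSocketExample repository.
public static class ReadAllSketch
{
    public static async Task<List<string>> DrainAsync(
        ChannelReader<string> reader, CancellationToken cancellationToken)
    {
        var received = new List<string>();

        // The loop ends when the writer completes the channel and it is empty;
        // cancelling the token ends it with an OperationCanceledException instead.
        await foreach (string message in reader.ReadAllAsync(cancellationToken))
        {
            received.Add(message); // a real loop would send over the socket here
        }

        return received;
    }

    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<string>();
        channel.Writer.TryWrite("a");
        channel.Writer.TryWrite("b");
        channel.Writer.TryComplete(); // lets the await foreach finish

        List<string> items = await DrainAsync(channel.Reader, CancellationToken.None);
        Console.WriteLine(string.Join(",", items)); // prints "a,b"
    }
}
```

Because the WebSocket loops never complete their channels, an await foreach like this would only exit through cancellation, so the error handling around it would still need to catch OperationCanceledException.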
The Kestrel Application
Once again, the new ChannelWSServer project is almost identical to the older BlockingCollection-based project, KestrelWebSocketServer. The ConnectedClient class is used to track individual clients. As we did earlier, we declare a Channel<string> variable to queue our data:
// public BlockingCollection<string> BroadcastQueue { get; } = new BlockingCollection<string>();
public Channel<string> BroadcastQueue { get; private set; }
This time we create the channel in the class constructor. Notice that in the server we’re setting SingleWriter to false. The server has two threads running: one which echoes user keystrokes, and a timer-based service which periodically emits a timestamp to all connected clients. This means two separate threads could try to write to the channel simultaneously, so we must disable the single-writer optimizations.
public ConnectedClient(int socketId, WebSocket socket, TaskCompletionSource<object> taskCompletion)
{
    SocketId = socketId;
    Socket = socket;
    TaskCompletion = taskCompletion;

    // Two threads may write (echo and timer broadcast), so SingleWriter must be false
    BroadcastQueue = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = false
        });
}
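To illustrate why SingleWriter is false here, the following is a small sketch (not from the repository) in which two concurrent producers write to one channel while a single reader drains it. The MultiWriterSketch class, the two Task.Run producers, and the 1,000-message count are hypothetical stand-ins for the echo loop and the timer service.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo class, not from the WebSocketExample repository.
public static class MultiWriterSketch
{
    public static async Task<int> RunAsync()
    {
        var channel = Channel.CreateUnbounded<string>(
            new UnboundedChannelOptions
            {
                SingleReader = true,
                SingleWriter = false // more than one thread may write at once
            });

        // Two producers writing concurrently from thread-pool threads.
        Task echo = Task.Run(() =>
        {
            for (int i = 0; i < 1000; i++) channel.Writer.TryWrite($"echo {i}");
        });
        Task timer = Task.Run(() =>
        {
            for (int i = 0; i < 1000; i++) channel.Writer.TryWrite($"tick {i}");
        });

        await Task.WhenAll(echo, timer);
        channel.Writer.TryComplete(); // only so this demo's read loop can end

        // Single reader: count everything that arrived.
        int count = 0;
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out _)) count++;
        }
        return count;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // prints 2000
    }
}
```

With SingleWriter left false, the channel makes no assumption about who is writing, so concurrent TryWrite calls like these are supported.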
We then have the exact same processing loop in BroadcastLoopAsync that we saw in the client code (again, with error handling omitted for brevity):
while (!cancellationToken.IsCancellationRequested)
{
    while (await BroadcastQueue.Reader.WaitToReadAsync(cancellationToken))
    {
        // Pass the token here too, matching the client loop
        string message = await BroadcastQueue.Reader.ReadAsync(cancellationToken);
        Console.WriteLine($"Socket {SocketId}: Sending from queue.");
        var msgbuf = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
        await Socket.SendAsync(msgbuf, WebSocketMessageType.Text, endOfMessage: true, CancellationToken.None);
    }
}
Over in the WebSocketMiddleware class, we have to change how the Broadcast method puts data into the queue. The commented-out lines are the old loop, followed by the channel-based version. Once again we use TryWrite because unbounded queues are guaranteed to be available for writes:
// foreach (var kvp in Clients)
//     kvp.Value.BroadcastQueue.Add(message);
foreach (var kvp in Clients)
    kvp.Value.BroadcastQueue.Writer.TryWrite(message);
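The fan-out above can be tried in isolation with a sketch like the one below. It assumes a much simplified client list: a ConcurrentDictionary keyed by socket ID that holds each client's channel directly rather than a ConnectedClient object. The BroadcastSketch class and the returned delivery count are hypothetical and exist only to make the behavior observable.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;

// Hypothetical demo class, not from the WebSocketExample repository.
public static class BroadcastSketch
{
    // Simplified stand-in for the middleware's client list: socket ID -> queue.
    public static readonly ConcurrentDictionary<int, Channel<string>> Clients =
        new ConcurrentDictionary<int, Channel<string>>();

    // Writes the message to every client's channel and reports how many accepted it.
    public static int Broadcast(string message)
    {
        int delivered = 0;
        foreach (var kvp in Clients)
        {
            // On an unbounded channel this only returns false if the channel was completed.
            if (kvp.Value.Writer.TryWrite(message)) delivered++;
        }
        return delivered;
    }

    public static void Main()
    {
        Clients[1] = Channel.CreateUnbounded<string>();
        Clients[2] = Channel.CreateUnbounded<string>();
        Console.WriteLine(Broadcast("hello")); // prints 2
    }
}
```

Each client gets its own copy of the message in its own queue, so one slow socket does not hold up delivery to the others.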
The final change is to alter the echo feature in SocketProcessingLoopAsync. The old code is commented out and the channel version follows it:
if (client.Socket.State == WebSocketState.Open)
{
    string message = Encoding.UTF8.GetString(buffer.Array, 0, receiveResult.Count);
    // client.BroadcastQueue.Add(message);
    client.BroadcastQueue.Writer.TryWrite(message);
}
It’s that easy.
Channel Completion
One Channel feature I’m not using here is “completion”. The channel writer can essentially close the channel, which signals to the reader that no more messages will ever be written. Because of the way WebSocket lifecycles work, it’s not useful here, but in other pub-sub scenarios it may be useful to allow the reader processes to release resources and end processing. The completion is an awaitable task on the Reader.Completion property, and the writer would call Writer.TryComplete.
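Since the WebSocket projects never complete their channels, here is a standalone sketch of what completion looks like from both sides. The CompletionSketch class is a hypothetical name and the messages are placeholders.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo class, not from the WebSocketExample repository.
public static class CompletionSketch
{
    public static async Task<(int read, bool lateWrite, bool completed)> RunAsync()
    {
        var channel = Channel.CreateUnbounded<string>();
        channel.Writer.TryWrite("one");
        channel.Writer.TryWrite("two");

        // The writer signals that nothing more will ever be written.
        channel.Writer.TryComplete();
        bool lateWrite = channel.Writer.TryWrite("three"); // false: channel is closed

        // Already-queued items are still delivered; WaitToReadAsync returns
        // false only once the channel is both completed and empty.
        int read = 0;
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out _)) read++;
        }

        // Completion finishes after the last item has been read.
        await channel.Reader.Completion;
        return (read, lateWrite, channel.Reader.Completion.IsCompleted);
    }

    public static async Task Main()
    {
        var (read, lateWrite, completed) = await RunAsync();
        Console.WriteLine($"{read} {lateWrite} {completed}"); // prints "2 False True"
    }
}
```

TryComplete also accepts an optional exception, in which case awaiting Reader.Completion rethrows it, which gives the reader a way to tell a normal shutdown from a failure.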
Conclusion
I actually started playing around with channels while trying to improve image-processing throughput on a Raspberry Pi-based security camera. It dawned on me that these would be ideal for managing the queues in a WebSocket application (which I’ll also need for my Pi project). Even though I’d already used the Channel class, I was still pretty amazed at how easy it was to migrate these projects.
There are other interesting usage patterns with the Channel class that are worth reading about. I found this article by Nicholas Portmann to be an especially easy read with a lot of interesting tips and commentary.