2024-01-18 23:16:21 +01:00
|
|
|
const std = @import("std");
|
|
|
|
const assert = std.debug.assert;
|
|
|
|
const atomic = std.atomic;
|
|
|
|
const Futex = std.Thread.Futex;
|
|
|
|
|
2024-01-19 02:02:59 +01:00
|
|
|
const log = std.log.scoped(.queue);
|
|
|
|
|
2024-01-19 19:28:48 +01:00
|
|
|
/// A fixed-capacity, thread-safe FIFO queue holding up to `size` items of
/// type `T`.
///
/// `push` blocks while the queue is full and `pop` blocks while it is empty;
/// `tryPush` never waits for free space. All state is guarded by `mutex`.
/// A default-initialized value (`.{}`) is an empty queue.
pub fn Queue(
    comptime T: type,
    comptime size: usize,
) type {
    // A zero-capacity queue could never accept an item and would make the
    // index arithmetic below divide by zero.
    if (size == 0) @compileError("Queue size must be greater than zero");

    return struct {
        /// Backing storage. The item at logical index `i` lives in slot
        /// `i % size`.
        buf: [size]T = undefined,

        /// Logical index of the next item to pop, kept in `[0, 2 * size)`.
        read_index: usize = 0,
        /// Logical index of the next free slot, kept in `[0, 2 * size)`.
        ///
        /// Letting both indices range over twice the capacity is what makes
        /// "empty" (`write_index == read_index`) distinguishable from "full"
        /// (`write_index` exactly `size` ahead of `read_index`).
        write_index: usize = 0,

        /// Guards `buf`, `read_index` and `write_index`.
        mutex: std.Thread.Mutex = .{},
        /// Signalled after an item has been stored; `pop` waits on it.
        not_empty: std.Thread.Condition = .{},
        /// Signalled after an item has been removed; `push` waits on it.
        not_full: std.Thread.Condition = .{},

        const Self = @This();

        /// Remove and return the oldest item. Blocks until an item is
        /// available.
        pub fn pop(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();

            // Re-check in a loop: condition waits may wake spuriously, and
            // another consumer may have taken the item first.
            while (self.isEmpty()) self.not_empty.wait(&self.mutex);

            const item = self.buf[mask(self.read_index)];
            self.read_index = mask2(self.read_index + 1);

            // A slot was just freed, so let one blocked producer proceed.
            self.not_full.signal();
            return item;
        }

        /// Push an item into the queue. Blocks until the item has been put
        /// in the queue.
        pub fn push(self: *Self, item: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            // Same reasoning as in `pop`: always re-validate after waking.
            while (self.isFull()) self.not_full.wait(&self.mutex);

            self.enqueueLocked(item);
        }

        /// Push an item into the queue without waiting for free space.
        /// Returns `true` if the item was stored, or `false` if the queue
        /// was full, in which case the item has not been placed in the queue.
        pub fn tryPush(self: *Self, item: T) bool {
            self.mutex.lock();
            defer self.mutex.unlock();

            // The fullness check and the insertion happen under one lock so
            // no other producer can take the slot in between.
            if (self.isFull()) return false;

            self.enqueueLocked(item);
            return true;
        }

        /// Store `item` at the write position and wake one waiting consumer.
        /// The caller must hold `mutex` and have checked the queue is not full.
        fn enqueueLocked(self: *Self, item: T) void {
            self.buf[mask(self.write_index)] = item;
            self.write_index = mask2(self.write_index + 1);
            self.not_empty.signal();
        }

        /// Returns `true` if the ring buffer is empty and `false` otherwise.
        fn isEmpty(self: *const Self) bool {
            return self.write_index == self.read_index;
        }

        /// Returns `true` if the ring buffer is full and `false` otherwise.
        fn isFull(self: *const Self) bool {
            return mask2(self.write_index + size) == self.read_index;
        }

        /// Returns the number of items currently stored.
        fn len(self: *const Self) usize {
            // When the write index has wrapped past `2 * size` but the read
            // index has not, undo the wrap before subtracting.
            const wrap_offset: usize = if (self.write_index < self.read_index) 2 * size else 0;
            return self.write_index + wrap_offset - self.read_index;
        }

        /// Returns `index` modulo the capacity, i.e. a valid slot in `buf`.
        fn mask(index: usize) usize {
            return index % size;
        }

        /// Returns `index` modulo twice the capacity.
        fn mask2(index: usize) usize {
            return index % (2 * size);
        }
    };
}
|
|
|
|
|
|
|
|
// Round-trips a single value through a 16-slot queue on one thread: the value
// pushed must be the value popped.
test "Queue: simple push / pop" {
    const Bytes = Queue(u8, 16);
    var q = Bytes{};

    q.push(1);

    try std.testing.expectEqual(1, q.pop());
}
|