queue: fix thread safety
Fix thread safety with changes by leeward on IRC. leeward put this code on sourcehut here: https://hg.sr.ht/~nmichaels/threadsafeq
This commit is contained in:
parent
dff38f84d1
commit
cf71bb4e7c
1 changed files with 235 additions and 41 deletions
278
src/queue.zig
278
src/queue.zig
|
@ -1,7 +1,7 @@
|
||||||
const std = @import("std");
|
const std = @import("std");
|
||||||
const assert = std.debug.assert;
|
const assert = std.debug.assert;
|
||||||
const atomic = std.atomic;
|
const atomic = std.atomic;
|
||||||
const Futex = std.Thread.Futex;
|
const Condition = std.Thread.Condition;
|
||||||
|
|
||||||
const log = std.log.scoped(.queue);
|
const log = std.log.scoped(.queue);
|
||||||
|
|
||||||
|
@ -17,56 +17,56 @@ pub fn Queue(
|
||||||
write_index: usize = 0,
|
write_index: usize = 0,
|
||||||
|
|
||||||
mutex: std.Thread.Mutex = .{},
|
mutex: std.Thread.Mutex = .{},
|
||||||
// blocks when the buffer is full or empty
|
// blocks when the buffer is full
|
||||||
futex: atomic.Value(u32) = atomic.Value(u32).init(0),
|
not_full: Condition = .{},
|
||||||
|
// ...or empty
|
||||||
|
not_empty: Condition = .{},
|
||||||
|
|
||||||
const Self = @This();
|
const Self = @This();
|
||||||
|
|
||||||
/// pop an item from the queue. Blocks until an item is available
|
/// Pop an item from the queue. Blocks until an item is available.
|
||||||
pub fn pop(self: *Self) T {
|
pub fn pop(self: *Self) T {
|
||||||
self.mutex.lock();
|
self.mutex.lock();
|
||||||
defer self.mutex.unlock();
|
defer self.mutex.unlock();
|
||||||
if (self.isEmpty()) {
|
while (self.isEmptyLH()) {
|
||||||
// If we don't have any items, we unlock and wait
|
self.not_empty.wait(&self.mutex);
|
||||||
self.mutex.unlock();
|
|
||||||
Futex.wait(&self.futex, 0);
|
|
||||||
// regain our lock
|
|
||||||
self.mutex.lock();
|
|
||||||
}
|
}
|
||||||
if (self.isFull()) {
|
std.debug.assert(!self.isEmptyLH());
|
||||||
// If we are full, wake up the push
|
if (self.isFullLH()) {
|
||||||
Futex.wake(&self.futex, 1);
|
// If we are full, wake up a push that might be
|
||||||
}
|
// waiting here.
|
||||||
const i = self.read_index;
|
self.not_full.signal();
|
||||||
self.read_index += 1;
|
|
||||||
self.read_index = self.read_index % self.buf.len;
|
|
||||||
return self.buf[i];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// push an item into the queue. Blocks until the item has been put in
|
const result = self.buf[self.mask(self.read_index)];
|
||||||
/// the queue
|
self.read_index = self.mask2(self.read_index + 1);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Push an item into the queue. Blocks until an item has been
|
||||||
|
/// put in the queue.
|
||||||
pub fn push(self: *Self, item: T) void {
|
pub fn push(self: *Self, item: T) void {
|
||||||
self.mutex.lock();
|
self.mutex.lock();
|
||||||
defer self.mutex.unlock();
|
defer self.mutex.unlock();
|
||||||
if (self.isFull()) {
|
while (self.isFullLH()) {
|
||||||
self.mutex.unlock();
|
self.not_full.wait(&self.mutex);
|
||||||
Futex.wait(&self.futex, 0);
|
|
||||||
self.mutex.lock();
|
|
||||||
}
|
}
|
||||||
if (self.isEmpty()) {
|
if (self.isEmptyLH()) {
|
||||||
Futex.wake(&self.futex, 1);
|
// If we were empty, wake up a pop if it was waiting.
|
||||||
|
self.not_empty.signal();
|
||||||
}
|
}
|
||||||
const i = self.write_index;
|
std.debug.assert(!self.isFullLH());
|
||||||
self.write_index += 1;
|
|
||||||
self.write_index = self.write_index % self.buf.len;
|
self.buf[self.mask(self.write_index)] = item;
|
||||||
self.buf[i] = item;
|
self.write_index = self.mask2(self.write_index + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// push an item into the queue. Returns true when the item was
|
/// Push an item into the queue. Returns true when the item
|
||||||
/// successfully placed in the queue
|
/// was successfully placed in the queue, false if the queue
|
||||||
|
/// was full.
|
||||||
pub fn tryPush(self: *Self, item: T) bool {
|
pub fn tryPush(self: *Self, item: T) bool {
|
||||||
self.mutex.lock();
|
self.mutex.lock();
|
||||||
if (self.isFull()) {
|
if (self.isFullLH()) {
|
||||||
self.mutex.unlock();
|
self.mutex.unlock();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -75,10 +75,11 @@ pub fn Queue(
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// pop an item from the queue. Returns null when no item is available
|
/// Pop an item from the queue. Returns null when no item is
|
||||||
|
/// available.
|
||||||
pub fn tryPop(self: *Self) ?T {
|
pub fn tryPop(self: *Self) ?T {
|
||||||
self.mutex.lock();
|
self.mutex.lock();
|
||||||
if (self.isEmpty()) {
|
if (self.isEmptyLH()) {
|
||||||
self.mutex.unlock();
|
self.mutex.unlock();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -86,19 +87,33 @@ pub fn Queue(
|
||||||
return self.pop();
|
return self.pop();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns `true` if the ring buffer is empty and `false` otherwise.
|
fn isEmptyLH(self: Self) bool {
|
||||||
fn isEmpty(self: Self) bool {
|
|
||||||
return self.write_index == self.read_index;
|
return self.write_index == self.read_index;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns `true` if the ring buffer is full and `false` otherwise.
|
fn isFullLH(self: Self) bool {
|
||||||
fn isFull(self: Self) bool {
|
return self.mask2(self.write_index + self.buf.len) ==
|
||||||
return self.mask2(self.write_index + self.buf.len) == self.read_index;
|
self.read_index;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns `true` if the queue is empty and `false` otherwise.
|
||||||
|
pub fn isEmpty(self: *Self) bool {
|
||||||
|
self.mutex.lock();
|
||||||
|
defer self.mutex.unlock();
|
||||||
|
return self.isEmptyLH();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns `true` if the queue is full and `false` otherwise.
|
||||||
|
pub fn isFull(self: *Self) bool {
|
||||||
|
self.mutex.lock();
|
||||||
|
defer self.mutex.unlock();
|
||||||
|
return self.isFullLH();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the length
|
/// Returns the length
|
||||||
fn len(self: Self) usize {
|
fn len(self: Self) usize {
|
||||||
const wrap_offset = 2 * self.buf.len * @intFromBool(self.write_index < self.read_index);
|
const wrap_offset = 2 * self.buf.len *
|
||||||
|
@intFromBool(self.write_index < self.read_index);
|
||||||
const adjusted_write_index = self.write_index + wrap_offset;
|
const adjusted_write_index = self.write_index + wrap_offset;
|
||||||
return adjusted_write_index - self.read_index;
|
return adjusted_write_index - self.read_index;
|
||||||
}
|
}
|
||||||
|
@ -115,9 +130,188 @@ pub fn Queue(
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const testing = std.testing;
|
||||||
|
const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
|
||||||
test "Queue: simple push / pop" {
|
test "Queue: simple push / pop" {
|
||||||
var queue: Queue(u8, 16) = .{};
|
var queue: Queue(u8, 16) = .{};
|
||||||
queue.push(1);
|
queue.push(1);
|
||||||
|
queue.push(2);
|
||||||
const pop = queue.pop();
|
const pop = queue.pop();
|
||||||
try std.testing.expectEqual(1, pop);
|
try testing.expectEqual(1, pop);
|
||||||
|
try testing.expectEqual(2, queue.pop());
|
||||||
|
}
|
||||||
|
|
||||||
|
const Thread = std.Thread;
|
||||||
|
fn testPushPop(q: *Queue(u8, 2)) !void {
|
||||||
|
q.push(3);
|
||||||
|
try testing.expectEqual(2, q.pop());
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Fill, wait to push, pop once in another thread" {
|
||||||
|
var queue: Queue(u8, 2) = .{};
|
||||||
|
queue.push(1);
|
||||||
|
queue.push(2);
|
||||||
|
const t = try Thread.spawn(cfg, testPushPop, .{&queue});
|
||||||
|
try testing.expectEqual(false, queue.tryPush(3));
|
||||||
|
try testing.expectEqual(1, queue.pop());
|
||||||
|
t.join();
|
||||||
|
try testing.expectEqual(3, queue.pop());
|
||||||
|
try testing.expectEqual(null, queue.tryPop());
|
||||||
|
}
|
||||||
|
|
||||||
|
fn testPush(q: *Queue(u8, 2)) void {
|
||||||
|
q.push(0);
|
||||||
|
q.push(1);
|
||||||
|
q.push(2);
|
||||||
|
q.push(3);
|
||||||
|
q.push(4);
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Try to pop, fill from another thread" {
|
||||||
|
var queue: Queue(u8, 2) = .{};
|
||||||
|
const thread = try Thread.spawn(cfg, testPush, .{&queue});
|
||||||
|
for (0..5) |idx| {
|
||||||
|
try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop());
|
||||||
|
}
|
||||||
|
thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sleepyPop(q: *Queue(u8, 2)) !void {
|
||||||
|
// First we wait for the queue to be full.
|
||||||
|
while (!q.isFull())
|
||||||
|
try Thread.yield();
|
||||||
|
|
||||||
|
// Then we spuriously wake it up, because that's a thing that can
|
||||||
|
// happen.
|
||||||
|
q.not_full.signal();
|
||||||
|
q.not_empty.signal();
|
||||||
|
|
||||||
|
// Then give the other thread a good chance of waking up. It's not
|
||||||
|
// clear that yield guarantees the other thread will be scheduled,
|
||||||
|
// so we'll throw a sleep in here just to be sure. The queue is
|
||||||
|
// still full and the push in the other thread is still blocked
|
||||||
|
// waiting for space.
|
||||||
|
try Thread.yield();
|
||||||
|
std.time.sleep(std.time.ns_per_s);
|
||||||
|
// Finally, let that other thread go.
|
||||||
|
try std.testing.expectEqual(1, q.pop());
|
||||||
|
|
||||||
|
// This won't continue until the other thread has had a chance to
|
||||||
|
// put at least one item in the queue.
|
||||||
|
while (!q.isFull())
|
||||||
|
try Thread.yield();
|
||||||
|
// But we want to ensure that there's a second push waiting, so
|
||||||
|
// here's another sleep.
|
||||||
|
std.time.sleep(std.time.ns_per_s / 2);
|
||||||
|
|
||||||
|
// Another spurious wake...
|
||||||
|
q.not_full.signal();
|
||||||
|
q.not_empty.signal();
|
||||||
|
// And another chance for the other thread to see that it's
|
||||||
|
// spurious and go back to sleep.
|
||||||
|
try Thread.yield();
|
||||||
|
std.time.sleep(std.time.ns_per_s / 2);
|
||||||
|
|
||||||
|
// Pop that thing and we're done.
|
||||||
|
try std.testing.expectEqual(2, q.pop());
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Fill, block, fill, block" {
|
||||||
|
// Fill the queue, block while trying to write another item, have
|
||||||
|
// a background thread unblock us, then block while trying to
|
||||||
|
// write yet another thing. Have the background thread unblock
|
||||||
|
// that too (after some time) then drain the queue. This test
|
||||||
|
// fails if the while loop in `push` is turned into an `if`.
|
||||||
|
|
||||||
|
var queue: Queue(u8, 2) = .{};
|
||||||
|
const thread = try Thread.spawn(cfg, sleepyPop, .{&queue});
|
||||||
|
queue.push(1);
|
||||||
|
queue.push(2);
|
||||||
|
const now = std.time.milliTimestamp();
|
||||||
|
queue.push(3); // This one should block.
|
||||||
|
const then = std.time.milliTimestamp();
|
||||||
|
|
||||||
|
// Just to make sure the sleeps are yielding to this thread, make
|
||||||
|
// sure it took at least 900ms to do the push.
|
||||||
|
try std.testing.expect(then - now > 900);
|
||||||
|
|
||||||
|
// This should block again, waiting for the other thread.
|
||||||
|
queue.push(4);
|
||||||
|
|
||||||
|
// And once that push has gone through, the other thread's done.
|
||||||
|
thread.join();
|
||||||
|
try std.testing.expectEqual(3, queue.pop());
|
||||||
|
try std.testing.expectEqual(4, queue.pop());
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sleepyPush(q: *Queue(u8, 1)) !void {
|
||||||
|
// Try to ensure the other thread has already started trying to pop.
|
||||||
|
try Thread.yield();
|
||||||
|
std.time.sleep(std.time.ns_per_s / 2);
|
||||||
|
|
||||||
|
// Spurious wake
|
||||||
|
q.not_full.signal();
|
||||||
|
q.not_empty.signal();
|
||||||
|
|
||||||
|
try Thread.yield();
|
||||||
|
std.time.sleep(std.time.ns_per_s / 2);
|
||||||
|
|
||||||
|
// Stick something in the queue so it can be popped.
|
||||||
|
q.push(1);
|
||||||
|
// Ensure it's been popped.
|
||||||
|
while (!q.isEmpty())
|
||||||
|
try Thread.yield();
|
||||||
|
// Give the other thread time to block again.
|
||||||
|
try Thread.yield();
|
||||||
|
std.time.sleep(std.time.ns_per_s / 2);
|
||||||
|
|
||||||
|
// Spurious wake
|
||||||
|
q.not_full.signal();
|
||||||
|
q.not_empty.signal();
|
||||||
|
|
||||||
|
q.push(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Drain, block, drain, block" {
|
||||||
|
// This is like fill/block/fill/block, but on the pop end. This
|
||||||
|
// test should fail if the `while` loop in `pop` is turned into an
|
||||||
|
// `if`.
|
||||||
|
|
||||||
|
var queue: Queue(u8, 1) = .{};
|
||||||
|
const thread = try Thread.spawn(cfg, sleepyPush, .{&queue});
|
||||||
|
try std.testing.expectEqual(1, queue.pop());
|
||||||
|
try std.testing.expectEqual(2, queue.pop());
|
||||||
|
thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn readerThread(q: *Queue(u8, 1)) !void {
|
||||||
|
try testing.expectEqual(1, q.pop());
|
||||||
|
}
|
||||||
|
|
||||||
|
test "2 readers" {
|
||||||
|
// 2 threads read, one thread writes
|
||||||
|
var queue: Queue(u8, 1) = .{};
|
||||||
|
const t1 = try Thread.spawn(cfg, readerThread, .{&queue});
|
||||||
|
const t2 = try Thread.spawn(cfg, readerThread, .{&queue});
|
||||||
|
try Thread.yield();
|
||||||
|
std.time.sleep(std.time.ns_per_s / 2);
|
||||||
|
queue.push(1);
|
||||||
|
queue.push(1);
|
||||||
|
t1.join();
|
||||||
|
t2.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn writerThread(q: *Queue(u8, 1)) !void {
|
||||||
|
q.push(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
test "2 writers" {
|
||||||
|
var queue: Queue(u8, 1) = .{};
|
||||||
|
const t1 = try Thread.spawn(cfg, writerThread, .{&queue});
|
||||||
|
const t2 = try Thread.spawn(cfg, writerThread, .{&queue});
|
||||||
|
|
||||||
|
try testing.expectEqual(1, queue.pop());
|
||||||
|
try testing.expectEqual(1, queue.pop());
|
||||||
|
t1.join();
|
||||||
|
t2.join();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue