Parallel For (~100 LOC)

After watching this talk (Dennis Gustafsson – Parallelizing the physics solver – BSC 2025): https://www.youtube.com/watch?v=Kvsvd67XUKw, I decided to give up on job systems for now and try a simple parallel for instead. Roast the code if you want, as I’m sure it has issues:

package parallel

import "core:fmt"
import "core:sync"
import "core:thread"
import "core:time"

pn :: fmt.println
pfn :: fmt.printfln

Instance :: struct {
	userdata:     rawptr,
	callback:     proc(),
	id, count:    int,
	thread_count: int,
	running:      int,
	sema:         sync.Sema,
	finished:     bool,
}

init :: proc(self: ^Instance, thread_count: int) {
	self.thread_count = thread_count
}

loop :: proc(self: ^Instance, count: int, data: rawptr, callback: proc()) {
	self.userdata = data
	self.callback = callback
	self.count = count
	self.id = 0
	self.running = self.thread_count
	sync.post(&self.sema, self.thread_count)
	do_work(self)
	join(self)
}

@(private)
join :: proc(self: ^Instance) {
	count := 0
	for {
		count += 1
		running := sync.atomic_load_explicit(&self.running, .Relaxed)
		if running <= 0 do break
	}
	// pn("waited for: ", count)
}

finish :: proc(self: ^Instance) {
	self.finished = true
	sync.post(&self.sema, self.thread_count)
}

@(private)
do_work :: proc(self: ^Instance) {
	for {
		id := sync.atomic_add_explicit(&self.id, 1, .Relaxed)
		(id < self.count) or_break
		// pfn("thread_id: %v, id: %v", context.user_index, id)
		context.user_index = id
		context.user_ptr = self.userdata
		self.callback()
	}
}

pull :: proc($T: typeid) -> (int, ^T) {
	ptr := transmute(^T)context.user_ptr
	return context.user_index, ptr
}

thread_proc :: proc() {
	self := transmute(^Instance)context.user_ptr
	for {
		sync.wait(&self.sema)
		if self.finished do return
		do_work(self)
		sync.atomic_sub_explicit(&self.running, 1, .Release)
	}
}

@(private)
example :: proc() {
	THREAD_COUNT :: 4
	instance: Instance;p := &instance
	init(p, THREAD_COUNT)
	context.user_ptr = p
	threads: [THREAD_COUNT]^thread.Thread
	for _, i in threads {
		context.user_index = i + 1
		threads[i] = thread.create_and_start(thread_proc, context)
	}
	SIZE :: THREAD_COUNT * 4
	Ints :: [SIZE]int
	data: Ints;d := &data
	work :: proc() {time.sleep(50 * time.Microsecond)}
	loop(p, SIZE, d, proc() {
		id, d := pull(Ints)
		d[id] = id + 1
		work()
	})
	pn(data)
	data[0] *= -20
	loop(p, SIZE, d, proc() {
		id, d := pull(Ints)
		d[id] -= 2 * id
		work()
	})
	pn(data)
	finish(p)
	for t in threads {
		thread.destroy(t)
	}
}

main :: proc() {
	example()
}

The idea is good, although to be practical I’d say it needs a tad more work.

Hot-looping is kinda bad?

Your implementation of join() hot-loops over the atomic variable to check that all of the threads have stopped. This wastefully burns cycles waiting for the threads to finish instead of letting the operating system schedule other, more useful work on that core.

Hot-looping is often faster for your program, since it tends to notice the change sooner, but it’s not great for the system as a whole. I can suggest using either thread.join_multiple(), which does the generally correct thing by waiting for each thread to stop separately, or sync.Wait_Group, which does the same but blocks on a single counter until the whole group has finished rather than joining each thread separately.
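For reference, a minimal sketch of the Wait_Group approach (the worker body and all names outside core:sync / core:thread are made up for illustration):

```odin
package example

import "core:sync"
import "core:thread"

worker :: proc(data: rawptr) {
	wg := (^sync.Wait_Group)(data)
	// ... the actual work would go here ...
	sync.wait_group_done(wg) // decrement the counter; wakes the waiter when it hits zero
}

main :: proc() {
	wg: sync.Wait_Group
	sync.wait_group_add(&wg, 4) // one count per worker thread

	threads: [4]^thread.Thread
	for _, i in threads {
		threads[i] = thread.create_and_start_with_data(&wg, worker)
	}

	sync.wait_group_wait(&wg) // sleeps instead of spinning until the counter reaches zero

	for t in threads {
		thread.destroy(t)
	}
}
```

The waiter sleeps in the kernel rather than spinning, so the core is free for other work while the group finishes.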

Also, you seem to be double-freeing the threads. What’s going on? loop() calls thread.destroy(), and there’s also a thread.destroy() outside of loop()?

The wait group is also useful when you want to re-use the thread group for later operations: just reset the wait group’s counter on every call to loop() and you’re done. This can probably help you decouple starting the threads from waiting for them to finish.


The main thread here also participates in the work, and do_work() only returns once the final work item has been taken, so I assumed hot-looping would be better than putting the thread to sleep: it shouldn’t loop many times, and it’s unpredictable when a thread resumes after sleeping (the time argument is only the minimum amount of time it sleeps, I think). Is it still that bad?

On the other point about double freeing, I’m not sure I understand. Does loop() call thread.destroy() somehow?

Thanks for taking a look at the code though, I appreciate it. It’s been a while since I posted this, so I’m having trouble remembering exactly why I did things this way.

Sorry for the necropost.
Concurrency is by nature unpredictable: your code may end up running on a single CPU core, so stalling the only available core in a hot waiting loop might not be the best option. Additionally, the operating system has no idea that this thread is “waiting”; as far as the OS is concerned, the thread is running normally.
But there is a solution for exactly this kind of situation: package sync - pkg.odin-lang.org
It is designed to tell the OS that this thread wants to sleep until some other thread wakes it.
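The linked primitive is presumably sync.Cond (the next post reaches for one). A hedged sketch of the pattern, with made-up state and proc names, using the same guard/cond_wait calls the quoted core:sync source uses below:

```odin
package example

import "core:sync"

// Hypothetical shared state guarded by a mutex.
Counter :: struct {
	mutex:   sync.Mutex,
	cond:    sync.Cond,
	pending: int,
}

// Waiter side: the thread sleeps in the kernel until signalled,
// rechecking the predicate in a loop to survive spurious wakeups.
wait_until_done :: proc(c: ^Counter) {
	sync.guard(&c.mutex)
	for c.pending != 0 {
		sync.cond_wait(&c.cond, &c.mutex)
	}
}

// Worker side: decrement the counter and wake the waiter.
mark_done :: proc(c: ^Counter) {
	sync.guard(&c.mutex)
	c.pending -= 1
	sync.cond_signal(&c.cond)
}
```

While asleep in cond_wait the thread consumes no CPU, and the OS knows it is blocked rather than “running”.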


If I ever make a version 2, I’ll probably use a Cond instead of hot-looping. Another solution could be to have the other threads go to sleep once (count - 1) work items have been taken, so that the main thread is guaranteed to take the last one. Then there would be no need to hot-loop or sleep. Or so I think, but I might have missed something.

EDIT: I just made some quick changes which implements the second solution. This now uses user_ptr to grab the instance.

EDIT2: Turns out I did miss something.

Since the main thread starts the last work item once all other work has merely been taken, not completed, it is possible for the main thread to finish the last item and continue out of _for() before the other threads have finished their work, potentially causing data races.

I wasn’t able to confirm this by testing, but it should exist in theory.

I’ve updated the code to use Wait_Group like @flysand7 suggested, which internally uses a Cond as @Andrew7898 suggested. Thanks, you guys. I should’ve just done this from the start.

EDIT3: The final update, hopefully. Made some changes to allow for using both styles of waiting interchangeably for different workloads.

package parallel

import "core:fmt"
import "core:sync"
import "core:thread"
import "core:time"

Instance :: struct {
	userdata:     rawptr,
	callback:     proc(),
	id, count:    int,
	thread_count: int,
	sema:         sync.Sema,
	wait_group:   sync.Wait_Group,
	finished:     bool,
	lockfree:     bool,
}

// 'for_*' procs and thread_proc access this through context.user_ptr
create_instance :: proc(thread_count: int) -> Instance {
	return {thread_count = thread_count}
}

@(private)
_for :: proc(self: ^Instance, count: int, data: rawptr, callback: proc(), lockfree: bool) {
	self.userdata = data
	self.callback = callback
	self.count = count
	self.id = 0
	self.lockfree = lockfree
	self.wait_group.counter = self.thread_count
	sync.post(&self.sema, self.thread_count)
	do_work(self)
}

// uses hot looping, better for small work (count agnostic)
// ^Instance should be set to context.user_ptr
for_lockfree :: proc(count: int, data: rawptr, work: proc()) {
	self := (^Instance)(context.user_ptr)
	_for(self, count, data, work, true)
	for sync.atomic_load_explicit(&self.wait_group.counter, .Relaxed) != 0 {
		// waiting
	}
}

// uses cond variable, better for large work (count agnostic)
// ^Instance should be set to context.user_ptr
for_locking :: proc(count: int, data: rawptr, work: proc()) {
	self := (^Instance)(context.user_ptr)
	_for(self, count, data, work, false)
	sync.wait(&self.wait_group)
}

// call before destroying threads
finish :: proc(self: ^Instance) {
	self.finished = true
	sync.post(&self.sema, self.thread_count)
}

@(private)
do_work :: proc(self: ^Instance) {
	for {
		id := sync.atomic_add_explicit(&self.id, 1, .Relaxed)
		(id < self.count) or_break
		context.user_index = id
		context.user_ptr = self.userdata
		self.callback()
	}
}

// use inside of work proc to get work id and data
pull :: proc($T: typeid) -> (int, ^T) {
	ptr := (^T)(context.user_ptr)
	return context.user_index, ptr
}

// create and start thread with this proc
// ^Instance should be set to context.user_ptr
thread_proc :: proc() {
	self := (^Instance)(context.user_ptr)
	for {
		sync.wait(&self.sema)
		if self.finished do return
		do_work(self)
		work_done(self)
	}
}

@(private)
work_done :: proc(self: ^Instance) {
	wg := &self.wait_group
	if self.lockfree {
		sync.atomic_sub_explicit(&wg.counter, 1, .Release)
	} else {
		sync.wait_group_done(wg)
	}
}

@(private)
example :: proc() {
	THREAD_COUNT :: 4

	instance := create_instance(THREAD_COUNT)
	context.user_ptr = &instance

	threads: [THREAD_COUNT]^thread.Thread
	for _, i in threads {
		context.user_index = i + 1
		threads[i] = thread.create_and_start(thread_proc, context)
	}

	SIZE :: THREAD_COUNT * 4
	data: [SIZE]int

	for_lockfree(SIZE, &data, proc() {
		id, data := pull([SIZE]int)
		data[id] = id + 1
		time.sleep(10 * time.Microsecond)
	})
	fmt.println(data)

	data[SIZE - 1] *= 2

	for_locking(SIZE, &data, proc() {
		id, data := pull([SIZE]int)
		data[id] -= 2 * (id + 1)
		time.sleep(10 * time.Millisecond)
	})
	fmt.println(data)

	finish(&instance)
	for t in threads {
		thread.destroy(t)
	}
}

main :: proc() {
	example()
}

I looked at the source for Wait_Group and found this:

// extended.odin
wait_group_wait :: proc "contextless" (wg: ^Wait_Group) {
	guard(&wg.mutex)

	if wg.counter != 0 {
		cond_wait(&wg.cond, &wg.mutex)
		if wg.counter != 0 {
			panic_contextless("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait")
		}
	}
}

// primitives.odin
cond_wait :: proc "contextless" (c: ^Cond, m: ^Mutex) {
	_cond_wait(c, m)
}

// primitives_windows.odin
_cond_wait :: proc "contextless" (c: ^Cond, m: ^Mutex) {
	_ = win32.SleepConditionVariableSRW(&c.impl.cond, &m.impl.srwlock, win32.INFINITE, 0)
}

Then I checked the documentation for SleepConditionVariableSRW and I found this in the remarks:

Condition variables are subject to spurious wakeups (those not associated with an explicit wake) and stolen wakeups (another thread manages to run before the woken thread). Therefore, you should recheck a predicate (typically in a while loop) after a sleep operation returns.

Should I be worried about this? From what I can tell, Odin’s implementation of Wait_Group doesn’t do the extra safety check.
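The standard fix for spurious and stolen wakeups is to recheck the predicate in a loop rather than once. Applied to the quoted wait_group_wait, it would look roughly like this (a sketch, not the actual core:sync source):

```odin
wait_group_wait_rechecked :: proc "contextless" (wg: ^Wait_Group) {
	guard(&wg.mutex)

	// Recheck in a loop: a spurious or stolen wakeup just re-tests
	// the counter and goes back to sleep instead of returning early.
	for wg.counter != 0 {
		cond_wait(&wg.cond, &wg.mutex)
	}
}
```

The mutex is reacquired before each test of wg.counter, so the predicate check itself is race-free.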

EDIT: I added a makeshift solution to my previous post above. Not sure if it’s correct.

EDIT2: I checked the source on GitHub and the code there seems correct. I guess I was using an older version of the compiler. Never mind this post then; there should be no issue.

Funny how Odin code always looks cleaner than other modern languages (sometimes even OCaml).

It would be much more painful to do this raw threading stuff in other languages (without supporting libraries).
