Mercurial > hg > Members > anatofuz > MoarVM
view src/core/threads.c @ 40:9b496a0c430a
merge
author | anatofuz |
---|---|
date | Tue, 27 Nov 2018 11:25:43 +0900 (2018-11-27) |
parents | 2cf249471370 |
children |
line wrap: on
line source
#include "moar.h" #include <platform/threads.h> /* Temporary structure for passing data to thread start. */ typedef struct { MVMThreadContext *tc; MVMObject *thread_obj; } ThreadStart; /* Creates a new thread handle with the MVMThread representation. Does not * actually start execution of the thread, but does give it its unique ID. */ MVMObject * MVM_thread_new(MVMThreadContext *tc, MVMObject *invokee, MVMint64 app_lifetime) { MVMThread *thread; MVMThreadContext *child_tc; unsigned int interval_id; interval_id = MVM_telemetry_interval_start(tc, "spawning a new thread off of me"); /* Create the Thread object and stash code to run and lifetime. */ MVMROOT(tc, invokee, { thread = (MVMThread *)MVM_repr_alloc_init(tc, tc->instance->Thread); }); thread->body.stage = MVM_thread_stage_unstarted; MVM_ASSIGN_REF(tc, &(thread->common.header), thread->body.invokee, invokee); thread->body.app_lifetime = app_lifetime; /* Try to create the new threadcontext. Can throw if libuv can't * create a loop for it for some reason (i.e. too many open files) */ MVMROOT(tc, thread, { child_tc = MVM_tc_create(tc, tc->instance); }); /* Set up the new threadcontext a little. */ child_tc->thread_obj = thread; child_tc->thread_id = 1 + MVM_incr(&tc->instance->next_user_thread_id); /* Add one, since MVM_incr returns original. */ thread->body.tc = child_tc; MVM_telemetry_interval_stop(child_tc, interval_id, "i'm the newly spawned thread."); /* Also make a copy of the thread ID in the thread object itself, so it * is available once the thread dies and its ThreadContext is gone. */ thread->body.thread_id = child_tc->thread_id; return (MVMObject *)thread; } /* This callback is passed to the interpreter code. It takes care of making * the initial invocation of the thread code. */ static void thread_initial_invoke(MVMThreadContext *tc, void *data) { /* The passed data is simply the code object to invoke. */ ThreadStart *ts = (ThreadStart *)data; MVMThread *thread = (MVMThread *)ts->thread_obj; MVMObject *invokee = thread->body.invokee; thread->body.invokee = NULL; /* Create initial frame, which sets up all of the interpreter state also. */ invokee = MVM_frame_find_invokee(tc, invokee, NULL); STABLE(invokee)->invoke(tc, invokee, MVM_callsite_get_common(tc, MVM_CALLSITE_ID_NULL_ARGS), NULL); /* This frame should be marked as the thread entry frame, so that any * return from it will cause us to drop out of the interpreter and end * the thread. */ tc->thread_entry_frame = tc->cur_frame; } /* This callback handles starting execution of a thread. */ static void start_thread(void *data) { ThreadStart *ts = (ThreadStart *)data; MVMThreadContext *tc = ts->tc; /* wait for the GC to finish if it's not finished stealing us. */ MVM_gc_mark_thread_unblocked(tc); tc->thread_obj->body.stage = MVM_thread_stage_started; /* Stash thread ID. */ tc->thread_obj->body.native_thread_id = MVM_platform_thread_id(); /* Create a spesh log for this thread, unless it's just going to run C * code (and thus it's a VM internal worker). */ if (REPR(tc->thread_obj->body.invokee)->ID != MVM_REPR_ID_MVMCFunction) MVM_spesh_log_initialize_thread(tc, 0); MVM_debugserver_notify_thread_creation(tc); /* Enter the interpreter, to run code. */ MVM_interp_run(tc, thread_initial_invoke, ts); MVM_debugserver_notify_thread_destruction(tc); /* Pop the temp root stack's ts->thread_obj, if it's still there (if we * cleared the temp root stack on exception at some point, it'll already be * gone). */ if (tc->num_temproots != 0) MVM_gc_root_temp_pop_n(tc, tc->num_temproots); MVM_free(ts); /* Mark as exited, so the GC will know to clear our stuff. */ tc->thread_obj->body.stage = MVM_thread_stage_exited; /* Mark ourselves as blocked, so that another thread will take care * of GC-ing our objects and cleaning up our thread context. */ MVM_gc_mark_thread_blocked(tc); /* Exit the thread, now it's completed. */ MVM_platform_thread_exit(NULL); } /* Begins execution of a thread. */ void MVM_thread_run(MVMThreadContext *tc, MVMObject *thread_obj) { MVMThread *child = (MVMThread *)thread_obj; int status, added; ThreadStart *ts; if (REPR(child)->ID == MVM_REPR_ID_MVMThread && IS_CONCRETE(thread_obj)) { MVMThreadContext *child_tc = child->body.tc; /* Mark thread as GC blocked until the thread actually starts. */ MVM_gc_mark_thread_blocked(child_tc); /* Create thread state, to pass to the thread start callback. */ ts = MVM_malloc(sizeof(ThreadStart)); ts->tc = child_tc; /* Push to starting threads list. We may need to retry this if we are * asked to join a GC run at this point (since the GC would already * have taken a snapshot of the thread list, so it's not safe to add * another at this point). */ added = 0; while (!added) { uv_mutex_lock(&tc->instance->mutex_threads); if (MVM_load(&tc->gc_status) == MVMGCStatus_NONE) { /* Insert into list. */ MVM_ASSIGN_REF(tc, &(child->common.header), child->body.next, tc->instance->threads); tc->instance->threads = child; /* Store the thread object in the thread start information and * keep it alive by putting it in the *child* tc's temp roots. */ ts->thread_obj = thread_obj; MVM_gc_root_temp_push(child_tc, (MVMCollectable **)&ts->thread_obj); /* Move thread to starting stage. */ child->body.stage = MVM_thread_stage_starting; /* Mark us done and unlock the mutex; any GC run will now have * a consistent view of the thread list and can safely run. */ added = 1; uv_mutex_unlock(&tc->instance->mutex_threads); } else { /* Another thread decided we'll GC now. Release mutex, and * do the GC, making sure thread_obj and child are marked. */ uv_mutex_unlock(&tc->instance->mutex_threads); MVMROOT2(tc, thread_obj, child, { GC_SYNC_POINT(tc); }); } } /* Do the actual thread creation. */ status = uv_thread_create(&child->body.thread, start_thread, ts); if (status < 0) MVM_panic(MVM_exitcode_compunit, "Could not spawn thread: errorcode %d", status); } else { MVM_exception_throw_adhoc(tc, "Thread handle passed to run must have representation MVMThread"); } } /* Waits for a thread to finish. */ static int try_join(MVMThreadContext *tc, MVMThread *thread) { /* Join the thread, marking ourselves as unable to GC while we wait. */ int status; MVM_gc_root_temp_push(tc, (MVMCollectable **)&thread); MVM_gc_mark_thread_blocked(tc); if (thread->body.stage < MVM_thread_stage_exited) { status = uv_thread_join(&thread->body.thread); } else { /* the target already ended */ status = 0; } MVM_gc_mark_thread_unblocked(tc); MVM_gc_root_temp_pop(tc); /* After a thread has been joined, we trigger a GC run to clean up after * it. This avoids problems where a program spawns threads and joins them * in a loop gobbling a load of memory and other resources because we do * not ever trigger a GC run to clean up the thread. */ MVM_gc_enter_from_allocator(tc); return status; } void MVM_thread_join(MVMThreadContext *tc, MVMObject *thread_obj) { if (REPR(thread_obj)->ID == MVM_REPR_ID_MVMThread && IS_CONCRETE(thread_obj)) { int status = try_join(tc, (MVMThread *)thread_obj); if (status < 0) MVM_panic(MVM_exitcode_compunit, "Could not join thread: errorcode %d", status); } else { MVM_exception_throw_adhoc(tc, "Thread handle passed to join must have representation MVMThread"); } } /* Gets the (VM-level) ID of a thread. */ MVMint64 MVM_thread_id(MVMThreadContext *tc, MVMObject *thread_obj) { if (REPR(thread_obj)->ID == MVM_REPR_ID_MVMThread && IS_CONCRETE(thread_obj)) return ((MVMThread *)thread_obj)->body.thread_id; else MVM_exception_throw_adhoc(tc, "Thread handle passed to threadid must have representation MVMThread"); } /* Gets the native OS ID of a thread. If it's not yet available because * the thread was not yet started, this will return 0. */ MVMint64 MVM_thread_native_id(MVMThreadContext *tc, MVMObject *thread_obj) { if (REPR(thread_obj)->ID == MVM_REPR_ID_MVMThread && IS_CONCRETE(thread_obj)) return ((MVMThread *)thread_obj)->body.native_thread_id; else MVM_exception_throw_adhoc(tc, "Thread handle passed to threadnativeid must have representation MVMThread"); } /* Yields control to another thread. */ void MVM_thread_yield(MVMThreadContext *tc) { MVM_telemetry_timestamp(tc, "thread yielding"); MVM_platform_thread_yield(); } /* Gets the object representing the current thread. */ MVMObject * MVM_thread_current(MVMThreadContext *tc) { return (MVMObject *)tc->thread_obj; } /* Gets the number of locks held by a thread. */ MVMint64 MVM_thread_lock_count(MVMThreadContext *tc, MVMObject *thread_obj) { if (REPR(thread_obj)->ID == MVM_REPR_ID_MVMThread && IS_CONCRETE(thread_obj)) { MVMThreadContext *thread_tc = ((MVMThread *)thread_obj)->body.tc; return thread_tc ? thread_tc->num_locks : 0; } else { MVM_exception_throw_adhoc(tc, "Thread handle used with threadlockcount must have representation MVMThread"); } } void MVM_thread_cleanup_threads_list(MVMThreadContext *tc, MVMThread **head) { /* Assumed to be the only thread accessing the list. * must set next on every item. */ MVMThread *new_list = NULL, *this = *head, *next; *head = NULL; while (this) { next = this->body.next; switch (this->body.stage) { case MVM_thread_stage_starting: case MVM_thread_stage_waiting: case MVM_thread_stage_started: case MVM_thread_stage_exited: case MVM_thread_stage_clearing_nursery: /* push it to the new starting list */ this->body.next = new_list; new_list = this; break; case MVM_thread_stage_destroyed: /* don't put in a list */ this->body.next = NULL; break; default: MVM_panic(MVM_exitcode_threads, "Thread in unknown stage: %"MVM_PRSz"\n", this->body.stage); } this = next; } *head = new_list; } /* Goes through all non-app-lifetime threads and joins them. */ void MVM_thread_join_foreground(MVMThreadContext *tc) { MVMint64 work = 1; while (work) { MVMThread *cur_thread = tc->instance->threads; work = 0; while (cur_thread) { if (cur_thread->body.tc != tc->instance->main_thread) { if (!cur_thread->body.app_lifetime) { if (MVM_load(&cur_thread->body.stage) < MVM_thread_stage_exited) { /* Join may trigger GC and invalidate cur_thread, so we * just set work to 1 and do another trip around the main * loop. */ try_join(tc, cur_thread); work = 1; break; } } } cur_thread = cur_thread->body.next; } } }