From 8bb41b20dd07a0c21cd70d9d28147bc87802697a Mon Sep 17 00:00:00 2001 From: Fabrice Bellard Date: Mon, 14 Apr 2025 19:13:57 +0200 Subject: [PATCH] enabled os.Worker on Windows (bnoordhuis) --- quickjs-libc.c | 296 +++++++++++++++++++++++++++++++------------------ 1 file changed, 187 insertions(+), 109 deletions(-) diff --git a/quickjs-libc.c b/quickjs-libc.c index a9fff6a..0788d8c 100644 --- a/quickjs-libc.c +++ b/quickjs-libc.c @@ -64,10 +64,8 @@ typedef sig_t sighandler_t; #endif -#if !defined(_WIN32) -/* enable the os.Worker API. IT relies on POSIX threads */ +/* enable the os.Worker API. It relies on POSIX threads */ #define USE_WORKER -#endif #ifdef USE_WORKER #include @@ -114,14 +112,22 @@ typedef struct { size_t sab_tab_len; } JSWorkerMessage; +typedef struct JSWaker { +#ifdef _WIN32 + HANDLE handle; +#else + int read_fd; + int write_fd; +#endif +} JSWaker; + typedef struct { int ref_count; #ifdef USE_WORKER pthread_mutex_t mutex; #endif struct list_head msg_queue; /* list of JSWorkerMessage.link */ - int read_fd; - int write_fd; + JSWaker waker; } JSWorkerMessagePipe; typedef struct { @@ -2138,82 +2144,81 @@ static void call_handler(JSContext *ctx, JSValueConst func) JS_FreeValue(ctx, ret); } -#if defined(_WIN32) +#ifdef USE_WORKER -static int js_os_poll(JSContext *ctx) +#ifdef _WIN32 + +static int js_waker_init(JSWaker *w) { - JSRuntime *rt = JS_GetRuntime(ctx); - JSThreadState *ts = JS_GetRuntimeOpaque(rt); - int min_delay, console_fd; - int64_t cur_time, delay; - JSOSRWHandler *rh; - struct list_head *el; + w->handle = CreateEvent(NULL, TRUE, FALSE, NULL); + return w->handle ? 0 : -1; +} - /* XXX: handle signals if useful */ +static void js_waker_signal(JSWaker *w) +{ + SetEvent(w->handle); +} - if (list_empty(&ts->os_rw_handlers) && list_empty(&ts->os_timers)) - return -1; /* no more events */ +static void js_waker_clear(JSWaker *w) +{ + ResetEvent(w->handle); +} - /* XXX: only timers and basic console input are supported */ - if (!list_empty(&ts->os_timers)) { - cur_time = get_time_ms(); - min_delay = 10000; - list_for_each(el, &ts->os_timers) { - JSOSTimer *th = list_entry(el, JSOSTimer, link); - delay = th->timeout - cur_time; - if (delay <= 0) { - JSValue func; - /* the timer expired */ - func = th->func; - th->func = JS_UNDEFINED; - free_timer(rt, th); - call_handler(ctx, func); - JS_FreeValue(ctx, func); - return 0; - } else if (delay < min_delay) { - min_delay = delay; - } - } - } else { - min_delay = -1; - } +static void js_waker_close(JSWaker *w) +{ + CloseHandle(w->handle); + w->handle = INVALID_HANDLE_VALUE; +} - console_fd = -1; - list_for_each(el, &ts->os_rw_handlers) { - rh = list_entry(el, JSOSRWHandler, link); - if (rh->fd == 0 && !JS_IsNull(rh->rw_func[0])) { - console_fd = rh->fd; - break; - } - } +#else // !_WIN32 - if (console_fd >= 0) { - DWORD ti, ret; - HANDLE handle; - if (min_delay == -1) - ti = INFINITE; - else - ti = min_delay; - handle = (HANDLE)_get_osfhandle(console_fd); - ret = WaitForSingleObject(handle, ti); - if (ret == WAIT_OBJECT_0) { - list_for_each(el, &ts->os_rw_handlers) { - rh = list_entry(el, JSOSRWHandler, link); - if (rh->fd == console_fd && !JS_IsNull(rh->rw_func[0])) { - call_handler(ctx, rh->rw_func[0]); - /* must stop because the list may have been modified */ - break; - } - } - } - } else { - Sleep(min_delay); - } +static int js_waker_init(JSWaker *w) +{ + int fds[2]; + + if (pipe(fds) < 0) + return -1; + w->read_fd = fds[0]; + w->write_fd = fds[1]; return 0; } -#else -#ifdef USE_WORKER +static void js_waker_signal(JSWaker *w) +{ + int ret; + + for(;;) { + ret = write(w->write_fd, "", 1); + if (ret == 1) + break; + if (ret < 0 && (errno != EAGAIN || errno != EINTR)) + break; + } +} + +static void js_waker_clear(JSWaker *w) +{ + uint8_t buf[16]; + int ret; + + for(;;) { + ret = read(w->read_fd, buf, sizeof(buf)); + if (ret >= 0) + break; + if (errno != EAGAIN && errno != EINTR) + break; + } +} + +static void js_waker_close(JSWaker *w) +{ + close(w->read_fd); + close(w->write_fd); + w->read_fd = -1; + w->write_fd = -1; +} + +#endif // _WIN32 static void js_free_message(JSWorkerMessage *msg); @@ -2235,17 +2240,8 @@ static int handle_posted_message(JSRuntime *rt, JSContext *ctx, /* remove the message from the queue */ list_del(&msg->link); - if (list_empty(&ps->msg_queue)) { - uint8_t buf[16]; - int ret; - for(;;) { - ret = read(ps->read_fd, buf, sizeof(buf)); - if (ret >= 0) - break; - if (errno != EAGAIN && errno != EINTR) - break; - } - } + if (list_empty(&ps->msg_queue)) + js_waker_clear(&ps->waker); pthread_mutex_unlock(&ps->mutex); @@ -2288,7 +2284,104 @@ static int handle_posted_message(JSRuntime *rt, JSContext *ctx, { return 0; } -#endif +#endif /* !USE_WORKER */ + +#if defined(_WIN32) + +static int js_os_poll(JSContext *ctx) +{ + JSRuntime *rt = JS_GetRuntime(ctx); + JSThreadState *ts = JS_GetRuntimeOpaque(rt); + int min_delay, count; + int64_t cur_time, delay; + JSOSRWHandler *rh; + struct list_head *el; + HANDLE handles[MAXIMUM_WAIT_OBJECTS]; // 64 + + /* XXX: handle signals if useful */ + + if (list_empty(&ts->os_rw_handlers) && list_empty(&ts->os_timers) && + list_empty(&ts->port_list)) { + return -1; /* no more events */ + } + + if (!list_empty(&ts->os_timers)) { + cur_time = get_time_ms(); + min_delay = 10000; + list_for_each(el, &ts->os_timers) { + JSOSTimer *th = list_entry(el, JSOSTimer, link); + delay = th->timeout - cur_time; + if (delay <= 0) { + JSValue func; + /* the timer expired */ + func = th->func; + th->func = JS_UNDEFINED; + free_timer(rt, th); + call_handler(ctx, func); + JS_FreeValue(ctx, func); + return 0; + } else if (delay < min_delay) { + min_delay = delay; + } + } + } else { + min_delay = -1; + } + + count = 0; + list_for_each(el, &ts->os_rw_handlers) { + rh = list_entry(el, JSOSRWHandler, link); + if (rh->fd == 0 && !JS_IsNull(rh->rw_func[0])) { + handles[count++] = (HANDLE)_get_osfhandle(rh->fd); // stdin + if (count == (int)countof(handles)) + break; + } + } + + list_for_each(el, &ts->port_list) { + JSWorkerMessageHandler *port = list_entry(el, JSWorkerMessageHandler, link); + if (JS_IsNull(port->on_message_func)) + continue; + handles[count++] = port->recv_pipe->waker.handle; + if (count == (int)countof(handles)) + break; + } + + if (count > 0) { + DWORD ret, timeout = INFINITE; + if (min_delay != -1) + timeout = min_delay; + ret = WaitForMultipleObjects(count, handles, FALSE, timeout); + + if (ret < count) { + list_for_each(el, &ts->os_rw_handlers) { + rh = list_entry(el, JSOSRWHandler, link); + if (rh->fd == 0 && !JS_IsNull(rh->rw_func[0])) { + call_handler(ctx, rh->rw_func[0]); + /* must stop because the list may have been modified */ + goto done; + } + } + + list_for_each(el, &ts->port_list) { + JSWorkerMessageHandler *port = list_entry(el, JSWorkerMessageHandler, link); + if (!JS_IsNull(port->on_message_func)) { + JSWorkerMessagePipe *ps = port->recv_pipe; + if (ps->waker.handle == handles[ret]) { + if (handle_posted_message(rt, ctx, port)) + goto done; + } + } + } + } + } else { + Sleep(min_delay); + } + done: + return 0; +} + +#else static int js_os_poll(JSContext *ctx) { @@ -2364,8 +2457,8 @@ static int js_os_poll(JSContext *ctx) JSWorkerMessageHandler *port = list_entry(el, JSWorkerMessageHandler, link); if (!JS_IsNull(port->on_message_func)) { JSWorkerMessagePipe *ps = port->recv_pipe; - fd_max = max_int(fd_max, ps->read_fd); - FD_SET(ps->read_fd, &rfds); + fd_max = max_int(fd_max, ps->waker.read_fd); + FD_SET(ps->waker.read_fd, &rfds); } } @@ -2391,14 +2484,14 @@ static int js_os_poll(JSContext *ctx) JSWorkerMessageHandler *port = list_entry(el, JSWorkerMessageHandler, link); if (!JS_IsNull(port->on_message_func)) { JSWorkerMessagePipe *ps = port->recv_pipe; - if (FD_ISSET(ps->read_fd, &rfds)) { + if (FD_ISSET(ps->waker.read_fd, &rfds)) { if (handle_posted_message(rt, ctx, port)) goto done; } } } } - done: + done: return 0; } #endif /* !_WIN32 */ @@ -3269,22 +3362,17 @@ static void js_sab_dup(void *opaque, void *ptr) static JSWorkerMessagePipe *js_new_message_pipe(void) { JSWorkerMessagePipe *ps; - int pipe_fds[2]; - - if (pipe(pipe_fds) < 0) - return NULL; ps = malloc(sizeof(*ps)); - if (!ps) { - close(pipe_fds[0]); - close(pipe_fds[1]); + if (!ps) + return NULL; + if (js_waker_init(&ps->waker)) { + free(ps); return NULL; } ps->ref_count = 1; init_list_head(&ps->msg_queue); pthread_mutex_init(&ps->mutex, NULL); - ps->read_fd = pipe_fds[0]; - ps->write_fd = pipe_fds[1]; return ps; } @@ -3323,8 +3411,7 @@ static void js_free_message_pipe(JSWorkerMessagePipe *ps) js_free_message(msg); } pthread_mutex_destroy(&ps->mutex); - close(ps->read_fd); - close(ps->write_fd); + js_waker_close(&ps->waker); free(ps); } } @@ -3572,17 +3659,8 @@ static JSValue js_worker_postMessage(JSContext *ctx, JSValueConst this_val, ps = worker->send_pipe; pthread_mutex_lock(&ps->mutex); /* indicate that data is present */ - if (list_empty(&ps->msg_queue)) { - uint8_t ch = '\0'; - int ret; - for(;;) { - ret = write(ps->write_fd, &ch, 1); - if (ret == 1) - break; - if (ret < 0 && (errno != EAGAIN || errno != EINTR)) - break; - } - } + if (list_empty(&ps->msg_queue)) + js_waker_signal(&ps->waker); list_add_tail(&msg->link, &ps->msg_queue); pthread_mutex_unlock(&ps->mutex); return JS_UNDEFINED;