-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathactor.c
115 lines (102 loc) · 2.41 KB
/
actor.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#include "actor.h"
#include <stdlib.h>
#include <pthread.h>
#ifdef __cplusplus
extern "C" {
#endif
/*
 * The stage published by ActorStage_init(); read by ActorStage_finish()
 * and Actor_act().
 * NOTE(review): __shared__ is not defined in this file (presumably a
 * macro from actor.h) -- confirm what storage/visibility it implies.
 */
static __shared__ ActorStage *global_stage;
/*
 * Number of entries in global_stage->sched, as stored by
 * ActorStage_init(). Stays 0 until ActorStage_init() has run.
 */
static __shared__ unsigned global_max_cores = 0;
/*
 * Build the global actor stage: one Scheduler per core, each created with
 * `threads_per_core` as its argument.
 *
 * max_cores        - number of schedulers to create; values <= 0 are
 *                    clamped to 1 so the scheduler table is never empty.
 * threads_per_core - forwarded to Scheduler_new(); negative values are
 *                    clamped to 1.
 *                    NOTE(review): 0 is forwarded unchanged, as before --
 *                    confirm whether Scheduler_new() accepts it.
 *
 * Returns 0 on success, -1 if memory for the stage could not be allocated
 * (in that case the global stage is left untouched).
 */
int ActorStage_init(int max_cores, int threads_per_core)
{
    ActorStage *stage;
    int i;

    /* A table of zero schedulers would make every dispatch index invalid. */
    if (max_cores <= 0) {
        max_cores = 1;
    }
    if (threads_per_core < 0) {
        threads_per_core = 1;
    }

    /* Casts are kept because this file may also be compiled as C++. */
    stage = (ActorStage *) calloc(1, sizeof(*stage));
    if (stage == NULL) {
        return -1;
    }

    /* calloc(n, size) checks the n * size multiplication for overflow. */
    stage->sched = (Scheduler **) calloc((size_t) max_cores,
                                         sizeof(*stage->sched));
    if (stage->sched == NULL) {
        free(stage);
        return -1;
    }

    for (i = 0; i < max_cores; i++) {
        stage->sched[i] = Scheduler_new(threads_per_core);
    }

    /* Publish only once the stage is fully constructed. */
    global_stage = stage;
    global_max_cores = (unsigned) max_cores;
    return 0;
}
/*
 * Tear down the global actor stage: delete every scheduler, then release
 * the scheduler table and the stage itself.
 *
 * Safe to call when no stage exists (before ActorStage_init() or a second
 * time in a row); it then does nothing.
 *
 * Always returns 0.
 */
int ActorStage_finish(void)
{
    ActorStage *stage = global_stage;
    unsigned count = global_max_cores;
    unsigned i;

    if (stage == NULL) {
        return 0;
    }

    /*
     * Clear the globals before freeing so no stale pointer survives and a
     * repeated call cannot double-free.
     */
    global_stage = NULL;
    global_max_cores = 0;

    if (stage->sched != NULL) {
        for (i = 0; i < count; i++) {
            Scheduler_delete(stage->sched[i]);
            stage->sched[i] = NULL;
        }
        free(stage->sched);
        stage->sched = NULL;
    }
    free(stage);
    return 0;
}
/*
 * Placeholder: currently ignores both arguments and reports success.
 *
 * Always returns 0.
 */
int ActorStage_wait(enum actor_stage_wait wait, int val)
{
    /* Not implemented yet; mark the parameters as deliberately unused. */
    (void) wait;
    (void) val;
    return 0;
}
/*
 * Allocate an actor, give it a fresh mailbox, bind it to `o` and `api`,
 * and run the api's finit hook on it.
 *
 * o   - stored as the actor's `self`; Actor_finalize() later passes it to
 *       JSON_free().
 * api - callback table; its finit member is invoked before returning.
 *
 * Returns the new actor, or NULL if `api` is NULL or the allocation
 * failed. On a NULL return `o` has not been stored or freed.
 */
Actor *Actor_new(JSON o, const struct actor_api *api)
{
    Actor *a;

    if (api == NULL) {
        return NULL;
    }

    /* Cast is kept because this file may also be compiled as C++. */
    a = (Actor *) calloc(1, sizeof(*a));
    if (a == NULL) {
        return NULL;
    }

    a->api = api;
    a->mailbox = Queue_new(1);
    a->self = o;
    /* Run the hook last so it sees a fully populated actor. */
    api->finit(a);
    return a;
}
/*
 * Destroy an actor: run the api's fexit hook, free the JSON object stored
 * in `self`, delete the mailbox, and release the actor itself.
 *
 * Passing NULL is a no-op, mirroring free(NULL).
 */
void Actor_finalize(Actor *a)
{
    if (a == NULL) {
        return;
    }

    /* The hook runs first, while self and the mailbox are still valid. */
    a->api->fexit(a);
    JSON_free((JSON)a->self);
    Queue_delete(a->mailbox);
    free(a);
}
/*
 * Task entry point for an actor: take at most one message from the
 * actor's mailbox, hand it to the api's act callback, then free it.
 *
 * Returns 1 when the mailbox is empty; otherwise returns whatever the
 * act callback returned.
 */
int actor_task_run(Task *t)
{
    Actor *actor = (Actor *) TASK_DATA(t);
    Queue *mailbox = actor->mailbox;

    /* Nothing queued: report 1 without touching the queue. */
    if (Queue_isEmpty(mailbox)) {
        return 1;
    }

    ThreadContext *ctx = Queue_getContext(mailbox);
    JSON message = (JSON) Queue_deq(ctx, mailbox);
    int result = actor->api->act(actor, message);

    /* The message is released here, after the callback has returned. */
    JSON_free(message);
    return result;
}
/*
 * Destructor hook for actor tasks. Intentionally does nothing: it neither
 * frees nor modifies the task or the actor it refers to.
 */
void actor_task_destruct(Task *t)
{
    (void) t; /* deliberately unused */
}
/*
 * Callback table attached to every task created by Actor_act().
 * NOTE(review): the initializer is positional; it presumably matches a
 * (run, destruct) member order in struct task_api -- confirm against the
 * declaration, which is not in this file.
 */
struct task_api actor_task_api = {
actor_task_run,      /* processes one mailbox message per invocation */
actor_task_destruct  /* no-op */
};
/*
 * sched_id is used for task dispatching. It simulates round-robin
 * dispatch and may be shared by all threads without synchronization;
 * that is accepted on purpose.
 */
static __shared__ int sched_id = 0;

/*
 * Schedule actor `a` for execution: wrap it in a new task bound to
 * actor_task_api and enqueue that task on the next scheduler in
 * round-robin order.
 *
 * The next index is computed with a modulo so every scheduler is used for
 * any core count, not only powers of two. If no stage is initialized the
 * call does nothing.
 */
void Actor_act(Actor *a)
{
    ActorStage *stage = global_stage;
    unsigned cores = global_max_cores;
    unsigned next;

    /* Without a stage there is no scheduler to dispatch to. */
    if (stage == NULL || cores == 0) {
        return;
    }

    next = ((unsigned) sched_id + 1u) % cores;
    sched_id = (int) next;

    /*
     * Index with the local copy: sched_id is updated without
     * synchronization, so re-reading it could observe another thread's
     * store.
     */
    Scheduler_enqueue(stage->sched[next],
                      Task_new((void *) a, &actor_task_api));
}
/*
 * Deliver `message` to actor `a` by appending it to the actor's mailbox.
 * The queue context is obtained from the mailbox itself.
 */
void Actor_send(Actor *a, JSON message)
{
    ThreadContext *context = Queue_getContext(a->mailbox);

    Queue_enq(context, a->mailbox, (Data)message);
}
#ifdef __cplusplus
}
#endif