An interprocess communication module for the Linux kernel, developed as the final project for the Advanced Operating Systems and Virtualisation course @ La Sapienza
This inter-process messaging and synchronization subsystem is implemented as a loadable kernel module that manages two kinds of devices: the group root device and group devices.
To compile and run the module, it is sufficient to run:
$ make
to compile the module
$ make install
to install the udev rules
$ make insert
to insert the module into the kernel
$ make test
to compile the userspace library
Symmetrically, there are:
$ make uninstall
to delete the udev rules
$ make remove
to remove the module from the kernel
The group root device is created at module insertion time and is mapped to /dev/synch/group using a udev rule. It implements the open, ioctl and release operations:
struct file_operations ipc_group_root_ops = {
.open = ipc_group_root_open,
.unlocked_ioctl = ipc_group_root_ioctl,
.release = ipc_group_root_release
};
The ioctl accepts two ioctl numbers:
IPC_GROUP_INSTALL
to create a new group
IPC_GROUP_UNINSTALL
to destroy an existing group
Additionally, a second parameter must be provided: an integer value, cast to group_t, that identifies the group number to be created/destroyed.
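A minimal userspace sketch of driving the group root device. It assumes the caller passes the root device path and the numeric value of the request (IPC_GROUP_INSTALL or IPC_GROUP_UNINSTALL, defined in the module's headers and not reproduced here); root_group_ioctl is a hypothetical helper, not part of ipc_lib:

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <sys/ioctl.h>
#include <unistd.h>

/*
 * Hypothetical helper: open the group root device, issue one ioctl, close it.
 * `cmd` is assumed to be the module's IPC_GROUP_INSTALL or IPC_GROUP_UNINSTALL
 * value; `groupno` is the group identifier passed as the ioctl argument.
 * Returns the ioctl result, or -1 with errno set if open() or ioctl() fails.
 */
int root_group_ioctl(const char *root_path, unsigned long cmd, unsigned long groupno)
{
    assert(root_path != NULL);

    int fd = open(root_path, O_RDONLY);
    if (fd < 0)
        return -1;                      /* errno set by open() */

    int res = ioctl(fd, cmd, groupno);
    int saved_errno = errno;            /* keep ioctl's errno across close() */
    close(fd);
    errno = saved_errno;
    return res;
}
```

For instance, root_group_ioctl("/dev/synch/group", IPC_GROUP_INSTALL, 2) would request the creation of group 2, assuming the module header is included for the request number.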
Group devices are dynamically created by calling ioctl on the aforementioned device; each group is mapped to /dev/synch/group<i>, where i is the identifier of the group and must be in the range 1 to IPC_MAX_GROUPS included.
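The naming and range rule can be captured in a small helper. group_dev_path is a hypothetical name, and max_groups stands in for IPC_MAX_GROUPS, whose actual value is not given in this document:

```c
#include <assert.h>
#include <stdio.h>

/*
 * Hypothetical helper: build "/dev/synch/group<i>" for a group identifier.
 * max_groups stands in for IPC_MAX_GROUPS. Returns the path length, or -1 if
 * the identifier is out of range or the buffer is too small.
 */
int group_dev_path(char *buf, size_t len, int groupno, int max_groups)
{
    assert(buf != NULL);

    if (groupno < 1 || groupno > max_groups)
        return -1;                      /* valid ids are 1..IPC_MAX_GROUPS */

    int n = snprintf(buf, len, "/dev/synch/group%d", groupno);
    if (n < 0 || (size_t)n >= len)
        return -1;                      /* output would have been truncated */
    return n;
}
```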
Each group device implements the following operations:
struct file_operations ipc_group_ops = {
.open = ipc_group_open,
.write = ipc_group_write,
.read = ipc_group_read,
.release = ipc_group_release,
.unlocked_ioctl = ipc_group_ioctl,
.flush = ipc_group_flush
};
write
publishes a message
read
retrieves a published message
flush
makes all the delayed messages available
ioctl
accepts 5 different values as ioctl number:
SET_SEND_DELAY
sets the delay between the moment a message is published and the moment it is made available; a second parameter, interpreted as milliseconds, must be provided.
REVOKE_DELAYED_MESSAGES
destroys all messages that have been published with a delay greater than 0 and have not been made available yet
FLUSH_DELAYED_MESSAGES
alias for flush
SLEEP_ON_BARRIER
the calling process sleeps until another process issues an awake command on this group
AWAKE_BARRIER
wakes up all processes sleeping on this group
Additionally, the basic primitives can be invoked through the wrappers provided in ipc_lib:
void print_code(int err);
int install_group(group_t groupno, char* group_path, int lrn );
int uninstall_group(group_t groupno);
int open_group(group_t groupno);
int close_group(group_t groupno);
int send_msg(group_t groupno, char* payload, ssize_t payload_len);
int recv_msg(group_t groupno, char* payload, ssize_t payload_len);
int set_send_delay(group_t groupno, int delay);
int flush_delayed_messages(group_t groupno);
int revoke_delayed_messages(group_t groupno);
int sleep_on_barrier(group_t groupno);
int awake_barrier(group_t groupno);
This library simply encapsulates the device installation/uninstallation procedures and keeps track of opened and closed group files by maintaining two arrays: one for the group device paths and one for the group device file descriptors.
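A sketch of this kind of bookkeeping, assuming a fixed table size in place of IPC_MAX_GROUPS. track_group, tracked_open and tracked_close are hypothetical names and not the ipc_lib API; they only show the two parallel arrays (paths and descriptors) indexed by group number:

```c
#include <assert.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

#define MAX_TRACKED_GROUPS 16          /* assumption: stands in for IPC_MAX_GROUPS */

/* Index 0 is unused because group identifiers start at 1. */
static char tracked_path[MAX_TRACKED_GROUPS + 1][64];
static int  tracked_fd[MAX_TRACKED_GROUPS + 1];

static int tracked_valid(int g)
{
    return g >= 1 && g <= MAX_TRACKED_GROUPS;
}

/* Remember the device path of an installed group; its fd slot starts closed. */
int track_group(int g, const char *path)
{
    assert(path != NULL);
    if (!tracked_valid(g) || strlen(path) >= sizeof tracked_path[g])
        return -1;
    strcpy(tracked_path[g], path);
    tracked_fd[g] = -1;
    return 0;
}

/* Open the recorded path, or return the descriptor if it is already open. */
int tracked_open(int g)
{
    if (!tracked_valid(g) || tracked_path[g][0] == '\0')
        return -1;                     /* group was never tracked */
    if (tracked_fd[g] < 0)
        tracked_fd[g] = open(tracked_path[g], O_RDWR);
    return tracked_fd[g];
}

/* Close the descriptor and mark the slot as closed again. */
int tracked_close(int g)
{
    if (!tracked_valid(g) || tracked_path[g][0] == '\0' || tracked_fd[g] < 0)
        return -1;                     /* nothing to close */
    int res = close(tracked_fd[g]);
    tracked_fd[g] = -1;
    return res;
}
```

In a quick check, /dev/null can stand in for a group device path.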
This module revolves around three main data structures.
The first one belongs to the group root device. This device is installed on module insertion and removed upon module removal; its only purpose is to allow the creation and deletion of regular messaging/synchronization group devices. It has a custom ipc_group_root_dev struct to maintain some required fields:
typedef struct ipc_group_root_dev_t {
spinlock_t lock;
bool closing;
struct cdev cdev;
} ipc_group_root_dev;
This structure wraps the cdev, so that it can be retrieved from the file operations context using the container_of macro.
Additionally, it stores only two meaningful values:
- A spinlock_t lock used to serialize group installation/removal procedures
- A bool closing flag, set when module removal is triggered, which prevents other processes from opening this device
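The container_of trick can be reproduced in userspace with offsetof. The structures below are hypothetical, reduced stand-ins for struct cdev and ipc_group_root_dev, and the macro is a simplified version of the kernel one (without its type check):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified container_of(): subtract the member offset from the member address. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Hypothetical stand-ins for struct cdev and ipc_group_root_dev. */
struct fake_cdev {
    int dev;
};

struct fake_root_dev {
    int lock;                 /* placeholder for spinlock_t */
    bool closing;
    struct fake_cdev cdev;    /* embedded, not a pointer: this is what makes container_of work */
};

/* An open() handler only receives the embedded cdev (inode->i_cdev in the
 * kernel) and recovers the enclosing device structure from it. */
struct fake_root_dev *root_from_cdev(struct fake_cdev *c)
{
    assert(c != NULL);
    return container_of(c, struct fake_root_dev, cdev);
}
```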
The second structure belongs to the group device, the core of the module: this is the device to which messages are posted and synchronization requests are issued.
typedef struct ipc_group_dev_t {
struct list_head msg_list;
struct list_head delayed_msg_list;
int msg_count;
int delayed_msg_count;
spinlock_t lock;
spinlock_t delayed_lock;
int threads_count;
ktime_t delay;
struct wait_queue_head wait_queue;
int waiting_count;
int awaking_count;
bool closing;
struct cdev cdev;
} ipc_group_dev;
It contains three sets of variables:
- The general purpose ones:
  - An int threads_count to keep track of the number of threads that currently have this device open
  - A bool closing flag that, similarly to the group root, inhibits opening this device once it is set to true
  - The struct cdev cdev, which allows the retrieval of this structure using the container_of macro
- Those used by the messaging subsystem (there are two versions of most of them, one for the available messages and one for the delayed ones):
  - The head of a doubly linked list struct list_head msg_list (delayed_msg_list) to store available (delayed) messages
  - An int msg_count (delayed_msg_count) to count available (delayed) messages
  - A spinlock_t lock (delayed_lock) to synchronize list operations
  - A ktime_t delay holding the current send delay of the group
- And those used by the synchronization operations:
  - A struct wait_queue_head wait_queue to store waiting processes
  - An int waiting_count and an int awaking_count to keep count of the processes in one or the other state
The last structure represents the internal messages exchanged between processes:
typedef struct ipc_message_t {
struct list_head next;
char* payload;
ssize_t payload_len;
ipc_group_dev* group_dev;
struct hrtimer timer;
} ipc_message;
- struct list_head next is used to insert messages into the linked lists
- char* payload and ssize_t payload_len hold the message payload
- ipc_group_dev* group_dev is the group in which this message has been published (although not strictly required, it's handy to have this reference during some operations)
- struct hrtimer timer is the timer used to make delayed messages available
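The kernel lists behind msg_list and delayed_msg_list are intrusive: the struct list_head is embedded in the message, and container_of recovers the message from its link. A simplified userspace re-creation shows the FIFO behaviour the read path relies on. The helper names are hypothetical (they mirror list_add_tail, __list_del_entry and list_first_entry), and the payload is a fixed-size array rather than a separately allocated buffer:

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Circular doubly linked list node, as in the kernel's struct list_head. */
struct list_head {
    struct list_head *next, *prev;
};

#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

static void list_init(struct list_head *h) { h->next = h->prev = h; }
static int  list_is_empty(const struct list_head *h) { return h->next == h; }

/* Like list_add_tail(): insert n just before the head, i.e. at the tail. */
static void list_push_tail(struct list_head *n, struct list_head *h)
{
    n->prev = h->prev;
    n->next = h;
    h->prev->next = n;
    h->prev = n;
}

/* Like __list_del_entry(): unlink n from its list. */
static void list_unlink(struct list_head *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
}

/* Reduced message: the list linkage is embedded in the message itself. */
struct msg {
    struct list_head next;
    char payload[32];
};

/* Publish: append at the tail of the queue. */
void msg_enqueue(struct list_head *q, const char *text)
{
    struct msg *m = malloc(sizeof *m);
    assert(m != NULL);
    snprintf(m->payload, sizeof m->payload, "%s", text);
    list_push_tail(&m->next, q);
}

/* Read: take the oldest message (like list_first_entry()), copy it out, free it.
 * Returns 0 on success, -1 if the queue is empty. */
int msg_dequeue(struct list_head *q, char *out, size_t len)
{
    if (list_is_empty(q))
        return -1;
    struct msg *m = container_of(q->next, struct msg, next);
    list_unlink(&m->next);
    snprintf(out, len, "%s", m->payload);
    free(m);
    return 0;
}
```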
There are also some global variables holding the message and storage size limits, the current storage usage, and references to the installed devices:
int max_message_size = IPC_DEFAULT_MSG_SIZE;
int max_storage_size = IPC_DEFAULT_STORAGE_SIZE;
int curr_storage_size = 0;
ipc_group_root_dev* group_root_dev ;
ipc_group_dev* group_devs[IPC_MAX_GROUPS+1] = {0};
Additionally, max_message_size and max_storage_size are module parameters, so they can be customized at runtime:
module_param(max_message_size, int, 0660);
module_param(max_storage_size, int, 0660);
The subsystem is composed of three kinds of actions: module management, message passing and barrier synchronization.
The module_init() and module_exit() operations respectively perform the creation and removal of the group root device by calling ipc_group_root_install() and ipc_group_root_uninstall().
The group root installation consists of sequentially calling the functions below:
alloc_chrdev_region(&devno, 0, IPC_MAX_GROUPS+1, IPC_ROOT_DEV_NAME);
group_dev_class = class_create(THIS_MODULE, IPC_CLASS_NAME);
group_root_dev = kmalloc(sizeof(ipc_group_root_dev), GFP_USER);
// fields initialization to default value
cdev_init(&(group_root_dev -> cdev), &ipc_group_root_ops);
cdev_add(&(group_root_dev -> cdev), devno, 1);
group_root_device = device_create(group_dev_class, NULL, devno, NULL, IPC_ROOT_DEV_NAME);
These calls reserve a range of device numbers, create the device class, allocate space for the device, initialize both the custom fields and the cdev ones, register the cdev with the kernel using cdev_add, and finally create the device with device_create, which registers it in sysfs so that udev makes it appear under /dev.
As each of these operations may fail, the result of every call must be checked, for example:
group_root_device = device_create(group_dev_class, NULL, devno, NULL, IPC_ROOT_DEV_NAME);
// device_create returns an ERR_PTR-encoded pointer on failure, not a negative int
if (IS_ERR(group_root_device)) {
GR_ERROR( "Failed creating device\n");
res = PTR_ERR(group_root_device);
goto DEVICE_CREATE_FAIL;
} else {
GR_DEBUG( "Device creation success");
}
If everything executes correctly, 0 is returned. Otherwise, a failure halfway through could leave the system in an inconsistent state, so errors are handled in the following way:
- int res is set to the return value (or to a custom error)
- a goto jumps to the label that reverts the steps already done, in reverse order
DEVICE_CREATE_FAIL:
cdev_del(&(group_root_dev -> cdev));
CDEV_ADD_FAIL:
kfree(group_root_dev);
CDEV_ALLOC_FAIL:
class_destroy(group_dev_class);
CLASS_CREATE_FAIL:
unregister_chrdev_region(devno, IPC_MAX_GROUPS+1);
ALLOC_CHRDEV_REGION_FAIL:
return res;
The removal of the group root consists of 3 phases:
- First, the closing flag is set to true, ensuring that no other process can open this device (and consequently that no new group device can be created once this routine has started):
  spin_lock(&(group_root_dev -> lock));
  group_root_dev -> closing = true;
  spin_unlock(&(group_root_dev -> lock));
  Note that taking the lock is required in order to wait for the completion of concurrent group creation/deletion operations.
- Then all group devices are uninstalled:
  for (i = 1; i <= IPC_MAX_GROUPS; i++) {
      ipc_group_uninstall((group_t)i);
  }
- Finally, the usual routines to remove the device from the system and deallocate the used memory are issued:
  device_destroy(group_dev_class, devno);
  cdev_del(&(group_root_dev -> cdev));
  kfree(group_root_dev);
  class_destroy(group_dev_class);
  unregister_chrdev_region(devno, IPC_MAX_GROUPS+1);
  return SUCCESS;
Group installation is triggered by issuing ioctl on the group root device with IPC_GROUP_INSTALL as the ioctl number and group_t groupno as the argument.
Once ioctl is called, two things happen:
- The group root lock is taken with
  spin_lock(&(group_root_dev -> lock));
  to ensure that no other group creation or deletion operation is happening concurrently
- int ipc_group_install(group_t groupno) is invoked; it performs the same steps as the group root creation, with a few additional checks:
  - Check that groupno is valid (i.e. in the range 1 to IPC_MAX_GROUPS included)
  - Check that the group is not already installed
  - Allocate the required space for the device with kmalloc
  - Initialize the fields of ipc_group_dev used by this module
  - Initialize the cdev and register it with cdev_init and cdev_add
  - Create the device with device_create
  - Store a pointer to this device in the ipc_group_dev* group_devs[] array
The corresponding calls are:
group_dev = kmalloc(sizeof(ipc_group_dev), GFP_USER);
cdev_init(&(group_dev->cdev), &ipc_group_ops);
// fields initialization to default value
cdev_add(&(group_dev->cdev), group_devno, 1);
device_create(group_dev_class, NULL, group_devno, NULL, devname);
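The check-then-publish logic of installation and removal can be sketched in userspace as a registry protected by a lock. One deliberate difference, named plainly: kmalloc(..., GFP_USER) and device_create() may sleep, and sleeping while holding a spinlock is not allowed in the kernel, so a sleeping lock such as struct mutex is normally used around such calls; this sketch therefore uses a pthread mutex. group_install, group_uninstall, MAX_GROUPS and the error codes returned are assumptions for illustration:

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>

#define MAX_GROUPS 16                  /* assumption: stands in for IPC_MAX_GROUPS */

struct group {                         /* hypothetical stand-in for ipc_group_dev */
    int id;
};

static struct group *groups[MAX_GROUPS + 1];            /* index 0 unused */
static pthread_mutex_t registry_lock = PTHREAD_MUTEX_INITIALIZER;

/* Install: range check, "already installed" check, allocate, publish pointer. */
int group_install(int groupno)
{
    struct group *g;
    int res = 0;

    if (groupno < 1 || groupno > MAX_GROUPS)
        return -EINVAL;

    pthread_mutex_lock(&registry_lock);   /* serializes install/uninstall */
    if (groups[groupno] != NULL) {
        res = -EEXIST;
        goto out;
    }
    g = malloc(sizeof *g);                /* stands in for kmalloc, which may sleep */
    if (g == NULL) {
        res = -ENOMEM;
        goto out;
    }
    g->id = groupno;
    groups[groupno] = g;                  /* store only once fully initialized */
out:
    pthread_mutex_unlock(&registry_lock);
    return res;
}

/* Uninstall: detach the pointer under the lock, then free it. */
int group_uninstall(int groupno)
{
    struct group *g;

    if (groupno < 1 || groupno > MAX_GROUPS)
        return -EINVAL;

    pthread_mutex_lock(&registry_lock);
    g = groups[groupno];
    groups[groupno] = NULL;
    pthread_mutex_unlock(&registry_lock);

    if (g == NULL)
        return -ENODEV;                   /* group was not installed */
    free(g);
    return 0;
}
```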
The group removal reverts the group installation, with a few preliminary steps required to ensure that no other process is using the group:
- Removal is triggered by issuing ioctl with the IPC_GROUP_UNINSTALL ioctl number.
- The group root lock is taken with
  spin_lock(&(group_root_dev -> lock));
- ipc_group_uninstall(group_t groupno) is invoked, which:
  - performs a validity check on group_t groupno, as in the installation routine
  - sets the group closing flag to true, preventing other processes from opening this device from now on
    group_dev -> closing = true;
  - busy-waits until no process has the device open anymore; READ_ONCE forces the counter to be re-read on every iteration (a plain read may be hoisted out of the loop by the compiler), and cpu_relax() is the usual hint inside a spin loop
    while (READ_ONCE(group_dev -> threads_count) > 0)
        cpu_relax();
  - deletes all delayed and available messages
    _revoke_delayed_messages( group_dev );
    _delete_messages( group_dev );
  - removes the device from the system and deallocates the memory it used
    device_destroy(group_dev_class, group_devno);
    cdev_del(&(group_dev->cdev));
    kfree(group_dev);
  - sets the reference to this device in the ipc_group_dev* group_devs[] array to NULL
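The interplay between closing and threads_count can be sketched in userspace with C11 atomics. This is an assumption about how open() could be ordered, not the module's code: incrementing the counter before checking the flag (with sequentially consistent atomics) guarantees that either the opener sees closing and backs out, or the uninstalling thread sees the increment and waits. group_state, group_open, group_release and group_begin_teardown are hypothetical names:

```c
#include <assert.h>
#include <errno.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical reduction of the ipc_group_dev fields used during teardown. */
struct group_state {
    atomic_bool closing;
    atomic_int  threads_count;
};

void group_state_init(struct group_state *g)
{
    atomic_init(&g->closing, false);
    atomic_init(&g->threads_count, 0);
}

/* open(): count ourselves first, then check the flag (seq_cst ordering). */
int group_open(struct group_state *g)
{
    atomic_fetch_add(&g->threads_count, 1);
    if (atomic_load(&g->closing)) {
        atomic_fetch_sub(&g->threads_count, 1);   /* teardown started: back out */
        return -ENODEV;
    }
    return 0;
}

/* release(): drop our reference. */
void group_release(struct group_state *g)
{
    atomic_fetch_sub(&g->threads_count, 1);
}

/* uninstall: forbid new opens, then spin until every user has released. */
void group_begin_teardown(struct group_state *g)
{
    atomic_store(&g->closing, true);
    while (atomic_load(&g->threads_count) > 0)
        sched_yield();                            /* let the remaining users run */
}
```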
Message publishing is triggered by issuing a write on the group device. First, it checks that the message length lrn does not exceed the maximum allowed message size:
if (lrn > max_message_size) return -EFBIG;
Then it checks whether there is still space available on the device. To make this check robust against concurrent writers, it is implemented with a local read followed by a compare-and-swap:
do{
old_storage_size = curr_storage_size;
new_storage_size = old_storage_size + lrn + sizeof(ipc_message);
if (new_storage_size > max_storage_size) {
return -ENOSPC;
}
} while (__sync_bool_compare_and_swap(&curr_storage_size,
old_storage_size,
new_storage_size) == false );
In other words, it:
- reads the current space usage
- computes the new usage locally, failing if it would exceed the limit
- updates the global variable only if it has not changed in the meantime, retrying otherwise
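The same compare-and-swap reservation, together with the matching release performed when a message is read or revoked, can be exercised in userspace with the GCC __sync builtins used above. storage_reserve and storage_release are hypothetical names, the 100-byte limit is arbitrary, and each byte count stands for payload length plus sizeof(ipc_message):

```c
#include <assert.h>

/* Userspace mirrors of the module's globals; the 100-byte limit is arbitrary. */
static int curr_storage_size = 0;
static int max_storage_size  = 100;

/* Reserve `bytes` with a read / check / compare-and-swap loop.
 * Returns 0 on success, -1 if the limit would be exceeded (-ENOSPC in the module). */
int storage_reserve(int bytes)
{
    int old_size, new_size;

    assert(bytes >= 0);
    do {
        old_size = __atomic_load_n(&curr_storage_size, __ATOMIC_RELAXED);
        new_size = old_size + bytes;
        if (new_size > max_storage_size)
            return -1;
        /* succeeds only if no other thread changed the counter meanwhile */
    } while (!__sync_bool_compare_and_swap(&curr_storage_size, old_size, new_size));
    return 0;
}

/* Give the space back when a message is read or revoked. */
void storage_release(int bytes)
{
    __sync_sub_and_fetch(&curr_storage_size, bytes);
}
```

Because the limit check is repeated on every retry, two concurrent writers can never push the counter past max_storage_size: the loser of the race re-reads the updated value and re-evaluates the limit.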
Then it allocates space for the message and initializes the internal fields:
msg = kmalloc(sizeof(ipc_message), GFP_USER);
if (msg == NULL) {
res = -MEM_ALLOCATION_FAILED;
goto MSG_ALLOC_FAIL;
};
payload_buf = kmalloc(lrn, GFP_USER);
if (payload_buf == NULL) {
res = -MEM_ALLOCATION_FAILED;
goto MSG_PAYLOAD_ALLOC_FAIL;
};
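Then it copies the buffer from user space. copy_from_user returns the number of bytes that could not be copied, so any nonzero result means the user buffer was (at least partially) invalid:
if (copy_from_user(payload_buf, buf, lrn) != 0) {
res = -EFAULT;
goto MSG_COPY_FAIL; // hypothetical label: frees payload_buf and msg, releases the reserved storage
}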
Finally, it adds the message to the available or the delayed message queue through _enqueue_message(ipc_message* msg, ipc_group_dev* group_dev) or _enqueue_delayed_message(ipc_message* msg, ipc_group_dev* group_dev), depending on the current delay value of the group.
If the delay is 0, the message is immediately added by taking the lock and appending it to the list:
spin_lock( &(group_dev ->lock));
(group_dev -> msg_count )++ ;
list_add_tail ( &(msg -> next), &(group_dev -> msg_list));
spin_unlock( &(group_dev ->lock));
If the delay is greater than 0, the message must not be available at publish time, but only once the delay interval has elapsed.
This mechanism relies on high-resolution timers: in addition to the normal list insertion, a timer is initialized before taking the lock:
hrtimer_init(timer,CLOCK_MONOTONIC,HRTIMER_MODE_REL);
timer -> function = _publish_delayed_message_handler;
spin_lock( &(group_dev ->delayed_lock));
(group_dev -> delayed_msg_count )++ ;
list_add_tail ( &(msg -> next), &(group_dev -> delayed_msg_list));
spin_unlock( &(group_dev ->delayed_lock));
hrtimer_start(timer,group_dev -> delay,HRTIMER_MODE_REL);
The timer is started right after the lock is released. Its callback, _publish_delayed_message_handler, simply invokes _publish_delayed_message, which in turn moves the message from the delayed list to the available message list:
spin_lock( &(group_dev ->delayed_lock));
(group_dev -> delayed_msg_count )-- ;
__list_del_entry(&(msg->next));
spin_unlock( &(group_dev ->delayed_lock));
spin_lock( &(group_dev ->lock));
(group_dev -> msg_count )++ ;
list_add_tail ( &(msg -> next), &(group_dev -> msg_list));
spin_unlock( &(group_dev ->lock));
Retrieving a message is done by calling read on the group device file: regardless of how many bytes are requested, exactly one message is consumed each time.
To read a message, we take the available messages list lock and check whether there is at least one message; if so, we store a reference to it, unlink it from the list and release the lock:
spin_lock( &(group_dev ->lock));
if (group_dev -> msg_count == 0){
spin_unlock( &(group_dev ->lock));
DEBUG("no msg found");
return -NO_MESSAGES;
}
msg = list_first_entry(&(group_dev -> msg_list), ipc_message, next);
(group_dev -> msg_count )-- ;
__list_del_entry(&(msg->next));
spin_unlock( &(group_dev ->lock));
Then we copy the content of the message to user space, truncated to the size of the user buffer. copy_to_user returns the number of bytes that could not be copied, so the amount actually transferred is computed as:
to_copy = lrn > msg -> payload_len ? msg -> payload_len : lrn;
copied = to_copy - copy_to_user(buf, msg->payload, to_copy);
Finally, the message is freed and the space usage counter is updated:
payload_len = msg -> payload_len;
kfree(msg->payload);
kfree(msg);
__sync_sub_and_fetch( &curr_storage_size, payload_len + sizeof(ipc_message));
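Setting the message send delay is implemented in the ioctl handler of the group device and consists of changing a single variable. Since the argument is expressed in milliseconds, it is converted with ms_to_ktime() (ktime_set() expects seconds and nanoseconds instead):
group_dev -> delay = ms_to_ktime((u64)ioctl_param);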
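ktime_set(secs, nsecs) takes a seconds component and a nanoseconds component, so a delay given in milliseconds has to be split before being passed to it (ms_to_ktime() avoids the split by taking milliseconds directly). The arithmetic, as a userspace sketch with a hypothetical helper name:

```c
#include <assert.h>
#include <stddef.h>

#define MSEC_PER_SEC  1000L
#define NSEC_PER_MSEC 1000000L

/*
 * Hypothetical helper: split a non-negative delay in milliseconds into the
 * (seconds, nanoseconds) pair expected by ktime_set(secs, nsecs).
 */
void ms_to_sec_nsec(long ms, long *sec, long *nsec)
{
    assert(ms >= 0 && sec != NULL && nsec != NULL);
    *sec  = ms / MSEC_PER_SEC;
    *nsec = (ms % MSEC_PER_SEC) * NSEC_PER_MSEC;
}
```

For example, a SET_SEND_DELAY argument of 1500 corresponds to ktime_set(1, 500000000), whereas ktime_set(1500, 0) would mean 1500 seconds.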
Flushing messages requires making all delayed messages available. In principle it is enough to move them from one list to the other, but we also need to take care of timer expiration and concurrent execution, and we do not want to hold the lock for a long time.
To accomplish this, we create a temporary list head tmp_list:
INIT_LIST_HEAD(&tmp_list);
Then we take the lock, move all the messages from delayed_msg_list to the temporary list and release the lock (note that the lock must also be released on the early-return path):
spin_lock( &(group_dev ->delayed_lock));
if (group_dev -> delayed_msg_count == 0 ) {
spin_unlock( &(group_dev ->delayed_lock));
return SUCCESS;
}
list_splice_init( &(group_dev -> delayed_msg_list), &tmp_list );
spin_unlock( &(group_dev ->delayed_lock));
Now we can safely operate on the temporary list, but some timers may still be running or about to expire. For this reason we iterate over the list, stop every timer and count how many of them were actually cancelled. hrtimer_cancel() returns 1 if the timer was still active (and has now been cancelled) and 0 if it was not active (typically because its handler has already run and moved the message by itself):
canceled = 0;
list_for_each_entry_safe(tmp_msg, _tmp_msg, &tmp_list, next){
canceled += hrtimer_cancel(&(tmp_msg->timer));
}
__sync_sub_and_fetch(&(group_dev -> delayed_msg_count), canceled);
Finally, we take the lock and move all the remaining messages to the available messages list msg_list:
spin_lock( &(group_dev ->lock));
list_splice( &tmp_list, &(group_dev -> msg_list) );
group_dev -> msg_count += canceled ;
spin_unlock( &(group_dev ->lock));
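The flush pattern (detach everything under the delayed lock, work on the private chain without holding any lock, then attach it to the available list under the other lock) can be sketched in userspace with two mutex-protected queues. This is a simplified analogue with hypothetical names; timers are omitted, and a comment marks where the module cancels them:

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

struct node {
    struct node *next;
    int id;
};

struct queue {
    struct node *head, *tail;
    int count;
    pthread_mutex_t lock;
};

void queue_init(struct queue *q)
{
    q->head = q->tail = NULL;
    q->count = 0;
    pthread_mutex_init(&q->lock, NULL);
}

/* Append one node at the tail. */
void queue_push(struct queue *q, struct node *n)
{
    n->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->tail) q->tail->next = n; else q->head = n;
    q->tail = n;
    q->count++;
    pthread_mutex_unlock(&q->lock);
}

/* Move every delayed node to the available queue; returns how many moved.
 * The two locks are never held at the same time. */
int queue_flush(struct queue *delayed, struct queue *avail)
{
    struct node *h, *t;
    int n;

    pthread_mutex_lock(&delayed->lock);      /* detach the whole chain in O(1) */
    h = delayed->head;
    t = delayed->tail;
    n = delayed->count;
    delayed->head = delayed->tail = NULL;
    delayed->count = 0;
    pthread_mutex_unlock(&delayed->lock);

    if (n == 0)
        return 0;
    /* the module cancels the per-message timers here, outside both locks */

    pthread_mutex_lock(&avail->lock);        /* attach the chain at the tail */
    if (avail->tail) avail->tail->next = h; else avail->head = h;
    avail->tail = t;
    avail->count += n;
    pthread_mutex_unlock(&avail->lock);
    return n;
}
```

One design difference worth noting: this sketch appends flushed messages at the tail, like list_splice_tail(), whereas list_splice() inserts the spliced entries at the head of msg_list, so in the module flushed messages are read before the ones that were already available.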
Revoking works in the same way as flushing, except that, once we have our local list of messages to be revoked, we deallocate them instead of adding them to the available list:
list_for_each_entry_safe(tmp_msg, _tmp_msg, &tmp_list, next){
__list_del_entry(&(tmp_msg -> next) );
payload_len = tmp_msg -> payload_len;
kfree(tmp_msg -> payload);
kfree(tmp_msg);
__sync_sub_and_fetch( &curr_storage_size, payload_len + sizeof(ipc_message));
}
The synchronization subsystem is implemented on top of the wait queue mechanism. Each group stores two counters:
- int waiting_count for the sleeping processes
- int awaking_count for the sleeping processes that are about to be woken up
int pos;
int attempts=0;
pos = __sync_add_and_fetch( &(group_dev -> waiting_count), 1);
wait_event_interruptible(group_dev -> wait_queue,
(attempts++ >0) && (group_dev -> awaking_count > 0));
__sync_sub_and_fetch( &(group_dev -> awaking_count), 1);
When a process invokes the _sleep_on_barrier(ipc_group_dev* group_dev) routine, it simply:
- increases waiting_count
- invokes wait_event_interruptible on the condition (attempts++ > 0) && (group_dev -> awaking_count > 0), where (attempts++ > 0) makes the first evaluation fail; it is meant to make the process enter the sleeping phase at least once, so that it does not pass the barrier while other processes are being woken up
- decreases awaking_count
The wake-up procedure is implemented by _awake_barrier:
int to_awake;
to_awake = __sync_fetch_and_and( &(group_dev -> waiting_count), 0);
__sync_add_and_fetch( &(group_dev -> awaking_count), to_awake);
wake_up_nr(&(group_dev -> wait_queue), to_awake);
It consists of:
- fetching the number of sleeping processes and resetting it to 0
- increasing the counter of processes to be woken up by the same amount
- waking up that many processes
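For comparison, here is a userspace sketch of the same sleep/awake contract built with a different technique: a pthread mutex and condition variable plus a generation counter, instead of wait_event_interruptible with awaking_count. Each sleeper remembers the generation it entered in and only leaves once an awake has bumped it, so a process arriving during an awake waits for the next one instead of consuming a wake-up meant for earlier sleepers. All names are hypothetical:

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stddef.h>

/* Hypothetical barrier: mutex + condition variable + generation counter. */
struct barrier {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int waiting;              /* counterpart of waiting_count */
    unsigned long gen;        /* bumped by every awake */
};

void barrier_init(struct barrier *b)
{
    pthread_mutex_init(&b->lock, NULL);
    pthread_cond_init(&b->cond, NULL);
    b->waiting = 0;
    b->gen = 0;
}

/* Sleep until the first awake issued after this call. */
void barrier_sleep(struct barrier *b)
{
    pthread_mutex_lock(&b->lock);
    unsigned long my_gen = b->gen;
    b->waiting++;
    while (b->gen == my_gen)              /* also absorbs spurious wake-ups */
        pthread_cond_wait(&b->cond, &b->lock);
    pthread_mutex_unlock(&b->lock);
}

/* Wake every current sleeper; returns how many there were. */
int barrier_awake(struct barrier *b)
{
    pthread_mutex_lock(&b->lock);
    int woken = b->waiting;
    b->waiting = 0;
    b->gen++;
    pthread_cond_broadcast(&b->cond);
    pthread_mutex_unlock(&b->lock);
    return woken;
}

static void *sleeper(void *arg)
{
    barrier_sleep(arg);
    return NULL;
}

/* Demo: start n sleepers, wait until all are asleep, wake them, join them. */
int barrier_demo(int n)
{
    struct barrier b;
    pthread_t t[16];
    int i, ready, woken;

    if (n < 0 || n > 16)
        return -1;
    barrier_init(&b);
    for (i = 0; i < n; i++) {
        int rc = pthread_create(&t[i], NULL, sleeper, &b);
        assert(rc == 0);
        (void)rc;
    }

    do {                                  /* wait until all n threads are asleep */
        sched_yield();
        pthread_mutex_lock(&b.lock);
        ready = b.waiting;
        pthread_mutex_unlock(&b.lock);
    } while (ready < n);

    woken = barrier_awake(&b);
    for (i = 0; i < n; i++)
        pthread_join(t[i], NULL);

    pthread_cond_destroy(&b.cond);
    pthread_mutex_destroy(&b.lock);
    return woken;
}
```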
Testing was done with a C test program that installs and opens a target group and, while keeping it open (in order to avoid triggering flushes), spawns a thread that performs one of the sleep | awake | send | recv | flush | revoke | set_delay operations in a loop.
It also accepts 3 optional arguments:
- --repeat sets the number of loop iterations
- --delay sets the delay between successive loop iterations
- --group sets the group on which operations are performed
This allows combining the operations in various ways; for example, I created a bash script spawning 4 sender processes and 4 receiver processes with a parametric value for --repeat:
#! /bin/bash
REPEATS=${REPEATS:-100}
echo "Testing with 4 sender/receiver threads and $REPEATS repeats"
./test send "--repeat=$REPEATS" --delay=0 --group=2 &
./test send "--repeat=$REPEATS" --delay=0 --group=2 &
./test send "--repeat=$REPEATS" --delay=0 --group=2 &
./test send "--repeat=$REPEATS" --delay=0 --group=2 &
./test recv "--repeat=$REPEATS" --delay=0 --group=2 &
./test recv "--repeat=$REPEATS" --delay=0 --group=2 &
./test recv "--repeat=$REPEATS" --delay=0 --group=2 &
./test recv "--repeat=$REPEATS" --delay=0 --group=2 &
for job in `jobs -p`
do
wait $job
done
I tested the above script in the following settings:
- Ubuntu 20.04 under VirtualBox with 4 cores assigned
- The physical machine is a MacBook Pro with an i7-7700HQ (2.80 GHz base clock, 4 cores / 8 threads)
- 4 different values for the REPEATS variable: 10, 1000, 10k and 50k
I obtained the following results with 4 sender/receiver threads (times in seconds):
- 10 repeats each: 0.01 user, 0.00 system, 0.01 elapsed
- 1000 repeats each: 0.04 user, 0.40 system, 0.33 elapsed
- 10000 repeats each: 0.47 user, 2.83 system, 3.00 elapsed
- 50000 repeats each: 1.89 user, 18.30 system, 18.68 elapsed