Services design: IPC

In this tutorial series we are going to explore, and implement in several programming languages, a powerful type of program: the service. A service is a program that runs in the background with its own independent context and waits for other programs to interact with it before performing a task.

We can start by making two services communicate through POSIX message queues. Because each service runs in its own context, each one can be implemented in a different programming language. In our examples we will look at implementations in C++, Python and Rust.

Service IPC: POSIX message queue

We are going to set up one queue to store the requests and another to store the responses. With this structure we can have multiple workers (Service B) for one host (Service A) and parallelize the work across multiple processes.
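
The two-queue layout can be tried out without any IPC at all. The following is only a minimal sketch: it uses threads and the standard library queue.Queue as in-process stand-ins for separate processes and POSIX message queues, and the names worker and run_host are hypothetical helpers that do not appear in the tutorial code.

```python
import queue
import threading


def worker(name: str, requests: queue.Queue, responses: queue.Queue) -> None:
    """Consume requests until a None sentinel arrives, answering each one."""
    while True:
        task = requests.get()
        if task is None:  # sentinel: no more work for this worker
            break
        responses.put(f"{task} processed by {name}")


def run_host(tasks: list[str], worker_count: int = 2) -> list[str]:
    """Send every task through the requests queue and collect all responses."""
    requests: queue.Queue = queue.Queue()   # host writes, workers read
    responses: queue.Queue = queue.Queue()  # workers write, host reads
    threads = [
        threading.Thread(target=worker, args=(f"worker-{i}", requests, responses))
        for i in range(worker_count)
    ]
    for thread in threads:
        thread.start()
    for task in tasks:
        requests.put(task)
    # One response is expected per task; arrival order is not guaranteed.
    results = [responses.get() for _ in tasks]
    for _ in threads:
        requests.put(None)  # one sentinel per worker
    for thread in threads:
        thread.join()
    return results
```

Calling run_host(["task-1", "task-2", "task-3"]) returns one response per task. Which worker handles which task depends on scheduling, which is exactly the property that lets the work spread across several workers.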

We start with the module that manages the message queues. Its first job is to open each queue with the correct access mode for each type of service. A host must be able to write to the requests queue and read from the responses queue; by contrast, a worker must read from the requests queue and write to the responses queue.

Both queues need the O_CREAT flag, which creates the queue in the system if it does not exist yet, plus the O_WRONLY flag on the queue we write to and the O_RDONLY flag on the queue we read from.
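
As a small illustration of how these flags combine, here is a sketch that builds them from the constants of the standard os module (the same POSIX values, used here only so the snippet runs without extra packages). The helper open_flags is a hypothetical name and is not part of mq_handler.py.

```python
import os


def open_flags(writes: bool) -> int:
    """Return the flags for one end of a queue: create if missing, plus one access mode."""
    access = os.O_WRONLY if writes else os.O_RDONLY
    return os.O_CREAT | access


# From the host's point of view:
request_flags = open_flags(writes=True)    # the host writes requests
response_flags = open_flags(writes=False)  # the host reads responses
```

Note that O_RDONLY is usually 0, so it cannot be detected with a plain bitwise AND; the access mode has to be compared after masking out the other bits.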

We can reuse the same code for host and worker: the worker just swaps the names of the request and response queues in the connect function call.
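
The swap itself can be sketched as a tiny function. This is an assumption about how the names could be selected, not the tutorial's actual configuration code: queue_names and the queue names used in the usage note are hypothetical.

```python
def queue_names(is_host: bool, requests_name: str, responses_name: str) -> tuple[str, str]:
    """Return (queue to write to, queue to read from) for the given role."""
    if is_host:
        # The host writes requests and reads responses.
        return requests_name, responses_name
    # The worker does the opposite: it writes responses and reads requests.
    return responses_name, requests_name
```

For example, queue_names(False, "/mq_requests", "/mq_responses") gives the worker "/mq_responses" as the queue to write to and "/mq_requests" as the queue to read from, and the resulting pair can be passed to connect in that order. POSIX message queue names start with a slash.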

mq_handler.py

from posix_ipc import MessageQueue, O_RDONLY, O_WRONLY, O_CREAT

from pcube.common.enums import EExitCode
from pcube.common.logger import log

class MQHandler:
    MAX_MESSAGE_SIZE = 512

    def __init__(self):
        self.mq_request = None
        self.mq_response = None

    def connect(self, mq_request_name: str, mq_response_name: str) -> EExitCode:
        # posix_ipc reports failures by raising an exception; the constructor
        # never returns None, so each open is wrapped in try/except instead.
        try:
            # Queue this service writes to.
            self.mq_request = MessageQueue(mq_request_name,
                                           O_CREAT | O_WRONLY,
                                           max_message_size=MQHandler.MAX_MESSAGE_SIZE,
                                           max_messages=1,
                                           read=False,
                                           write=True)
        except Exception as ex:
            log(f"Error opening mq_request: {ex}")
            return EExitCode.FAIL

        try:
            # Queue this service reads from.
            self.mq_response = MessageQueue(mq_response_name,
                                            O_CREAT | O_RDONLY,
                                            max_message_size=MQHandler.MAX_MESSAGE_SIZE,
                                            max_messages=1,
                                            read=True,
                                            write=False)
        except Exception as ex:
            log(f"Error opening mq_response: {ex}")
            return EExitCode.FAIL
        return EExitCode.SUCCESS

    def disconnect(self, unlink: bool) -> EExitCode: 
        exit_code = EExitCode.SUCCESS
        if self.mq_request:
            self.mq_request.close()
            if unlink:
                self.mq_request.unlink()
            self.mq_request = None
        if self.mq_response:
            self.mq_response.close()
            if unlink:
                self.mq_response.unlink()
            self.mq_response = None
        return exit_code
    
    def send_wait(self, message: str) -> EExitCode:
        try:
            log(f"Sending message '{message}'")
            self.mq_request.send(message=message, timeout=None)
            return EExitCode.SUCCESS
        except KeyboardInterrupt as ex:
            log(f"Safe KeyboardInterrupt")
            return EExitCode.FAIL
        except Exception as ex:
            log(f"Error mq_send {ex}")
            return EExitCode.FAIL

    def receive_wait(self) -> tuple[str, EExitCode]:
        try:
            message, _ = self.mq_response.receive(timeout=None)
            decoded_message = message.decode()
            log(f"Received message '{decoded_message}'")
            return decoded_message, EExitCode.SUCCESS
        except KeyboardInterrupt as ex:
            log(f"Safe KeyboardInterrupt")
            return f"{ex}", EExitCode.FAIL
        except Exception as ex:
            log(f"Error mq_receive {ex}")
            return f"{ex}", EExitCode.FAIL

C++ and Rust versions: GitHub: T-Services
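
One detail worth keeping in mind with MAX_MESSAGE_SIZE: the limit of a POSIX message queue applies to the size of the message in bytes, and a message larger than the queue's maximum is rejected on send. A minimal sketch of a pre-check follows; fits_in_queue is a hypothetical helper, not something MQHandler provides.

```python
MAX_MESSAGE_SIZE = 512  # same value as MQHandler.MAX_MESSAGE_SIZE


def fits_in_queue(message: str, limit: int = MAX_MESSAGE_SIZE) -> bool:
    """Check the encoded size, since the limit counts bytes, not characters."""
    return len(message.encode()) <= limit
```

A string of 257 "é" characters, for instance, is shorter than 512 characters but takes 514 bytes in UTF-8, so it would not fit.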

The service connects to the queues in its run method and disconnects from them when it finishes.
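
One possible way to guarantee that the disconnect always happens, even if something fails in between, is a context manager. This is only a sketch of the idea and not how the tutorial's service is written: RecordingHandler is a hypothetical stand-in for MQHandler that just records calls, and connected is a hypothetical helper.

```python
from contextlib import contextmanager


class RecordingHandler:
    """Hypothetical stand-in for MQHandler that only records the calls it receives."""

    def __init__(self):
        self.calls = []

    def connect(self, request_name, response_name):
        self.calls.append(("connect", request_name, response_name))

    def disconnect(self, unlink):
        self.calls.append(("disconnect", unlink))


@contextmanager
def connected(handler, request_name, response_name, unlink):
    """Connect on entry and always disconnect on exit, even after an exception."""
    handler.connect(request_name, response_name)
    try:
        yield handler
    finally:
        handler.disconnect(unlink)
```

With this shape, the body of a with connected(...) block plays the role of the listening loop, and the queues are released whichever way the block exits.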

service.py – start/stop_listener

    def start_listener(self) -> bool:
        self._listening = True
        exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker)
        if exit_code == EExitCode.SUCCESS:
            log(f"Service start listening : host({self._config.is_host})")
            return True
        return False

    def stop_listener(self):
        self._listening = False
        log("Service stop listening")
        self._mq_handler.disconnect(self._config.is_host)

The run function of the service starts by connecting to both queues through the mq_handler. If the service is a host, it sends the request for the first task and waits indefinitely for the response.

host/service.py – run

    def run(self) -> EExitCode:
        exit_code = EExitCode.SUCCESS
        if self.start_listener():

            self._mq_handler.send_wait("task-1")
            while self._listening:
                message, status = self._mq_handler.receive_wait()
                if status == EExitCode.SUCCESS:
                    self.stop_listener()
                else:
                    exit_code = EExitCode.FAIL
                    self.stop_listener()
        else:
            log("Unable to init listener")
            exit_code = EExitCode.FAIL
        return exit_code

The worker waits indefinitely for a request in its listener; when it receives one, it sends the response to the host through the responses queue.

worker/service.py – run

    def run(self) -> EExitCode:
        exit_code = EExitCode.SUCCESS
        if self.start_listener():

            while self._listening:
                message, status = self._mq_handler.receive_wait()
                if status == EExitCode.SUCCESS:
                    self._mq_handler.send_wait(f"{message} processed")
                    self.stop_listener()
                else:
                    exit_code = EExitCode.FAIL
                    self.stop_listener()
        else:
            log("Unable to init listener")
            exit_code = EExitCode.FAIL
        return exit_code

Finally the full code for both service types:

host/service.py

from pcube.common.logger import log
from pcube.common.enums import EExitCode
from pcube.common.mq_handler import MQHandler
from pcube.common.service_config import ServiceConfig

class Service:
    def __init__(self, config: ServiceConfig):
        self._config: ServiceConfig = config
        self._listening = False
        self._mq_handler = MQHandler()

    def start_listener(self) -> bool:
        self._listening = True
        exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker)
        if exit_code == EExitCode.SUCCESS:
            log(f"Service start listening : host({self._config.is_host})")
            return True
        return False

    def stop_listener(self):
        self._listening = False
        log("Service stop listening")
        self._mq_handler.disconnect(self._config.is_host)

    def run(self) -> EExitCode:
        exit_code = EExitCode.SUCCESS
        if self.start_listener():

            self._mq_handler.send_wait("task-1")
            while self._listening:
                message, status = self._mq_handler.receive_wait()
                if status == EExitCode.SUCCESS:
                    self.stop_listener()
                else:
                    exit_code = EExitCode.FAIL
                    self.stop_listener()
        else:
            log("Unable to init listener")
            exit_code = EExitCode.FAIL
        return exit_code

worker/service.py

from pcube.common.logger import log
from pcube.common.enums import EExitCode
from pcube.common.mq_handler import MQHandler
from pcube.common.service_config import ServiceConfig

class Service:
    def __init__(self, config: ServiceConfig):
        self._config: ServiceConfig = config
        self._listening = False
        self._mq_handler = MQHandler()

    def start_listener(self) -> bool:
        self._listening = True
        exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker)
        if exit_code == EExitCode.SUCCESS:
            log(f"Service start listening : host({self._config.is_host})")
            return True
        return False

    def stop_listener(self):
        self._listening = False
        log("Service stop listening")
        self._mq_handler.disconnect(self._config.is_host)

    def run(self) -> EExitCode:
        exit_code = EExitCode.SUCCESS
        if self.start_listener():

            while self._listening:
                message, status = self._mq_handler.receive_wait()
                if status == EExitCode.SUCCESS:
                    self._mq_handler.send_wait(f"{message} processed")
                    self.stop_listener()
                else:
                    exit_code = EExitCode.FAIL
                    self.stop_listener()
        else:
            log("Unable to init listener")
            exit_code = EExitCode.FAIL
        return exit_code

C++ and Rust versions: GitHub: T-Services

We can run a host and a worker to see both services interacting through POSIX message queues.

[Image: service_posix_mq_com]

Conclusion

With this code we have finished the first part of a service program: the communication with other services. We have used a message queue, but other primitives such as pipes or semaphores are also valid for this purpose.
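
To give an idea of what the pipe alternative might look like, here is a minimal sketch of the same request/response round trip over two anonymous pipes. It runs the worker in a thread of the same process to stay short, and it assumes a single short, non-empty message; pipe_worker and pipe_round_trip are hypothetical names.

```python
import os
import threading


def pipe_worker(request_fd: int, response_fd: int) -> None:
    """Read one request from a pipe and write the answer to the other pipe."""
    task = os.read(request_fd, 512).decode()
    os.write(response_fd, f"{task} processed".encode())


def pipe_round_trip(task: str) -> str:
    """Send one task through a request pipe and return the worker's response."""
    req_read, req_write = os.pipe()    # host writes, worker reads
    resp_read, resp_write = os.pipe()  # worker writes, host reads
    thread = threading.Thread(target=pipe_worker, args=(req_read, resp_write))
    thread.start()
    try:
        os.write(req_write, task.encode())
        response = os.read(resp_read, 512).decode()
        thread.join()
    finally:
        for fd in (req_read, req_write, resp_read, resp_write):
            os.close(fd)
    return response
```

Unlike a message queue, a pipe is a plain byte stream with no message boundaries, so a real service built on pipes would need its own framing (for example a length prefix or a delimiter) once more than one message is in flight.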

Now that we have basic communication in place, we can start adding more interesting features such as a shared memory handler; that will be covered in the next part of the tutorial.

Tutorial files
