Con esta nueva serie de tutoriales vamos a explorar e implementar con diferentes lenguajes de programación la funcionalidad de un poderoso tipo de programas: los servicios. Un servicio es un programa que espera la interacción de otros programas para realizar una tarea en segundo plano con un contexto independiente.
Empezaremos comunicando dos servicios utilizando colas de mensajes POSIX. Como cada servicio utiliza su propio contexto podemos implementar cada servicio con un lenguaje de programación diferente. Para nuestro ejemplo veremos su implementación en C++, Python y Rust.
IPC con colas de mensajes POSIX
IPC y memoria compartida
Vamos a crear una cola para almacenar las peticiones y otra para las respuestas. Con esta estructura podríamos tener multiples trabajadores (Servicio B) por un solo servidor (Servicio A) y distribuir todo el trabajo a través de múltiples procesos diferentes.
Podemos empezar con el módulo que gestiona las colas de mensajes. Su primer cometido es inicializar la cola con los permisos que le corresponden a cada tipo de servicio. Un host tiene que poder escribir en la cola que contiene las peticiones y leer en la que tiene las respuestas, por el contrario un wroker tiene que leer de la cola de peticiones y escribir en la cola de respuestas.
Ambas colas se abren con el flag O_CREAT para crear la cola en el sistema en caso de que no exista, con el flag O_WRONLY la marcamos para que se pueda escribir en ella y con el flag O_RDONLY para que se pueda escribir.
Vamos a reutilizar el mismo manager tanto en el host como en el worker, solo tendremos que intercambiar los nombres de las colas en el worker cuando iniciemos su gestor de colas.
mq_handler.py
from posix_ipc import MessageQueue, O_RDONLY, O_WRONLY, O_CREAT
from pcube.common.enums import EExitCode
from pcube.common.logger import log
class MQHandler:
MAX_MESSAGE_SIZE = 512
def __init__(self):
self.mq_request = None
self.mq_response = None
def connect(self, mq_request_name: str, mq_response_name: str) -> EExitCode:
exit_code = EExitCode.SUCCESS
self.mq_request = MessageQueue(mq_request_name,
O_CREAT | O_WRONLY,
max_message_size=MQHandler.MAX_MESSAGE_SIZE,
max_messages=1,
read = False,
write = True)
if self.mq_request is None:
log(f"Error opening with mq_request")
exit_code = EExitCode.FAIL
self.mq_response = MessageQueue(mq_response_name,
O_CREAT | O_RDONLY,
max_message_size=MQHandler.MAX_MESSAGE_SIZE,
max_messages=1,
read = True,
write = False)
if self.mq_response is None:
log(f"Error opening with mq_response")
exit_code = EExitCode.FAIL
return exit_code
def disconnect(self, unlink: bool) -> EExitCode:
exit_code = EExitCode.SUCCESS
if self.mq_request:
self.mq_request.close()
if unlink:
self.mq_request.unlink()
self.mq_request = None
if self.mq_response:
self.mq_response.close()
if unlink:
self.mq_response.unlink()
self.mq_response = None
return exit_code
def send_wait(self, message: str) -> EExitCode:
try:
log(f"Sending message '{message}'")
self.mq_request.send(message=message, timeout=None)
return EExitCode.SUCCESS
except KeyboardInterrupt as ex:
log(f"Safe KeyboardInterrupt")
return EExitCode.FAIL
except Exception as ex:
log(f"Error mq_send {ex}")
return EExitCode.FAIL
def receive_wait(self) -> tuple[str, EExitCode]:
try:
message, _ = self.mq_response.receive(timeout=None)
decoded_message = message.decode()
log(f"Received message '{decoded_message}'")
return decoded_message, EExitCode.SUCCESS
except KeyboardInterrupt as ex:
log(f"Safe KeyboardInterrupt")
return f"{ex}", EExitCode.FAIL
except Exception as ex:
log(f"Error mq_receive {ex}")
return f"{ex}", EExitCode.FAILversiones cpp y rust: Github: T-Services
El servicio conectará a la cola en su método run y desconectará en su finalización.
service.py – start/stop_listener
def start_listener(self) -> bool:
self._listening = True
exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker)
if exit_code == EExitCode.SUCCESS:
log(f"Service start listening : host({self._config.is_host})")
return True
return False
def stop_listener(self):
self._listening = False
log("Service stop listening")
self._mq_handler.disconnect(self._config.is_host)El método run comienza connectando a ambas colas mediante el gestor de colas, y si es un servicio host enviaremos la primera petición quedando indefinidamente a la espera de su respuesta.
host/service.py – run
def run(self) -> EExitCode:
exit_code = EExitCode.SUCCESS
if self.start_listener():
self._mq_handler.send_wait("task-1")
while self._listening:
message, status = self._mq_handler.receive_wait()
if status == EExitCode.SUCCESS:
self.stop_listener()
else:
exit_code = EExitCode.FAIL
self.stop_listener()
else:
log("Unable to init listener")
exit_code = EExitCode.FAIL
return exit_codeEl worker estará esperando indefinidamente en su listener una nueva tarea, cuando la reciba enviará una respuesta al host mediante la cola de respuesta.
worker/service.py – run
def run(self) -> EExitCode:
exit_code = EExitCode.SUCCESS
if self.start_listener():
while self._listening:
message, status = self._mq_handler.receive_wait()
if status == EExitCode.SUCCESS:
self._mq_handler.send_wait(f"{message} processed")
self.stop_listener()
else:
exit_code = EExitCode.FAIL
self.stop_listener()
else:
log("Unable to init listener")
exit_code = EExitCode.FAIL
return exit_codeFinalmente el código completo de ambos tipos de servicio:
host/service.py
from pcube.common.logger import log
from pcube.common.enums import EExitCode
from pcube.common.mq_handler import MQHandler
from pcube.common.service_config import ServiceConfig
class Service:
def __init__(self, config: ServiceConfig):
self._config: ServiceConfig = config
self._listening = False
self._mq_handler = MQHandler()
def start_listener(self) -> bool:
self._listening = True
exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker)
if exit_code == EExitCode.SUCCESS:
log(f"Service start listening : host({self._config.is_host})")
return True
return False
def stop_listener(self):
self._listening = False
log("Service stop listening")
self._mq_handler.disconnect(self._config.is_host)
def run(self) -> EExitCode:
exit_code = EExitCode.SUCCESS
if self.start_listener():
self._mq_handler.send_wait("task-1")
while self._listening:
message, status = self._mq_handler.receive_wait()
if status == EExitCode.SUCCESS:
self.stop_listener()
else:
exit_code = EExitCode.FAIL
self.stop_listener()
else:
log("Unable to init listener")
exit_code = EExitCode.FAIL
return exit_codehost/worker.py
from pcube.common.logger import log
from pcube.common.enums import EExitCode
from pcube.common.mq_handler import MQHandler
from pcube.common.service_config import ServiceConfig
class Service:
def __init__(self, config: ServiceConfig):
self._config: ServiceConfig = config
self._listening = False
self._mq_handler = MQHandler()
def start_listener(self) -> bool:
self._listening = True
exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker)
if exit_code == EExitCode.SUCCESS:
log(f"Service start listening : host({self._config.is_host})")
return True
return False
def stop_listener(self):
self._listening = False
log("Service stop listening")
self._mq_handler.disconnect(self._config.is_host)
def run(self) -> EExitCode:
exit_code = EExitCode.SUCCESS
if self.start_listener():
while self._listening:
message, status = self._mq_handler.receive_wait()
if status == EExitCode.SUCCESS:
self._mq_handler.send_wait(f"{message} processed")
self.stop_listener()
else:
exit_code = EExitCode.FAIL
self.stop_listener()
else:
log("Unable to init listener")
exit_code = EExitCode.FAIL
return exit_codeVersion cpp y rust: Github: T-Services
Al iniciar un host y un worker podemos ver la interacción que se produce entre ambos antes de su finalización.
Conclusion
Con este código hemos terminado la primera parte de un servicio, la comunicación con otros servicios. Hemos utilizado una cola de mensajes pero otras primitivas como pipes, semáforos,.. pueden ser igual de válidas para este propósito.
Ahora que tenemos implementada la comunicación básica nos podemos poner a añadir más funciones interesantes como el uso de memoria compartida, pero lo veremos en el próximo tutorial.
Tutorial files
Te puede interesar:
Ayudanos con este blog!
En el último año hemos estado dedicando cada vez más tiempo a la creación de tutoriales, en su mayoria sobre desarrollo de videojuegos. Si crees que estos posts te han ayudado de alguna manera o incluso inspirado, por favor considera ayudarnos a mantener este blog con alguna de estas opciones. Gracias por hacerlo posible!









