Часто возникает задача разработки системы, включающей в себя несколько независимых модулей. Эти модули могут быть расположены на разных компьютерах и выполнять трудоёмкие задачи. Для реализации взаимодействия между ними используются брокеры сообщений (например, rabbitmq). В этой статье мы напишем простейшего самодельного брокера на основе обычных сокетов.Создадим приложение состоящее из двух пользовательских скриптов, осуществляющих коммуникацию друг с другом через сокеты.
Первым делом следует прочесть официальную документацию Python по созданию Socket сервера. Наиболее интересен раздел 21.21.4.3. Asynchronous Mixins. Обратите внимание, что используется 3-я версия языка Python.
Создайте файл codex_queue.py и заполните его заготовкой из статьи.
# codex_queue.py
import threading
import socketserver
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    pass
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data = self.request.recv(1024)
        print(data)
Каждое приложение будет иметь собственную очередь сообщений, в которую ей можно будет отправить данные. По мере необходимости приложение берёт сообщения из этой очереди и обрабатывает их.
 
    
            
    # продолжение файла codex_queue.py
class Queue:
    def __init__(self, ip, port):
        self.server = ThreadedTCPServer((ip, port), ThreadedTCPRequestHandler)
        self.server.queue = self
        self.server_thread = threading.Thread(target=self.server.serve_forever)
        self.server_thread.daemon = True
        self.messages = []
В конструкторе __init__ происходит инициализация сокет-сервера для работы в отдельном потоке.
self.messages – массив сообщений. Та самая очередь.
self.server.queue = self
Данный код записывает указатель на объект очереди в сокет-сервер, чтобы к нему можно было обращаться из другого потока.
Допишем в класс вспомогательные функции для запуска и остановки сервера, а также для работы с очередью.
# продолжение файла codex_queue.py
def start_server(self):
    self.server_thread.start()
    print("Server loop running in thread:", self.server_thread.name)
def stop_server(self):
    self.server.shutdown()
    self.server.server_close()
def add(self, message):
    self.messages.append(message)
def view(self):
    return self.messages
def get(self):
    return self.messages.pop()
def exists(self):
    return len(self.messages)
Класс очереди готов. Скачать его полную версию можно по ссылке.
Теперь приступим к написанию сервера, который будет работать с этой очередью. Создадим файл server.py.
# файл server.py
import socket
import time
from codex_queue import Queue
class Server:
    def __init__(self, ip, port):
        self.queue = Queue(ip, port)
    def start_server(self):
        self.queue.start_server()
    def stop_server(self):
        self.queue.stop_server()
Дополнительно понадобится функция loop, которая будет в цикле проверять есть ли данные в очереди, забирать оттуда один элемент и передавать на обработку в метод handle.
def loop(self):
    while True:
        time.sleep(1)
        while self.queue.exists():
            self.handle(self.queue.get())
def handle(self, message):
    """    Prototype    """
    pass
Готовый файл вы можете скачать по ссылке.
Теперь можно создать произвольное количество сервисов, наследуя их от класса Server и переопределяя метод handle. Мы остановимся для примера на двух скриптах – Alice и Bob. Боб будет извлекать URL сайтов из сообщения в JSON формате, делать HTTP запрос по полученному адресу и отправлять Алисе коды HTTP ответов. Алиса же просто будет выводить их на экран.
Создадим файл alice.py.
from server import Server
class Alice(Server):
    def handle(self, message):
        try:
            print("Got: {}".format(message))
        except Exception as e:
            print("Error: {}".format(e))
if __name__ == "__main__":
    print("Alice started.")
    app = Alice("localhost", 8889)
    app.start_server()
    app.loop()
    app.stop_server()
Запустите этот скрипт командой python alice.py. Откройте отдельную консоль и наберите там команду nc localhost 8889, введите сообщение и нажмите enter. Класс Alice поймает сообщение и выведет его на экран. Есть вы пользователь windows – установите программу putty.
Перейдем ко второму классу. Создайте файл bob.py.
import json
import requests
from server import Server
class Bob(Server):
    def handle(self, message):
        try:
            print("Got: {}".format(message))
            url = json.loads(str(message, 'ascii'))["url"]
            response = requests.get(url)
        except Exception as e:
            print("Error: {}".format(e))
        else:
            result = {}
            result['status_code'] = response.status_code
            self.send("localhost", 8889, json.dumps(result))
if __name__ == "__main__":
    print("Bob started.")
    getter = Bob("localhost", 8887)
    getter.start_server()
    getter.loop()
    getter.stop_server()
Можно видеть, что данный класс почти не отличается от предыдущего за исключением порта 8887 и функции обработки.
Осталось только написать функцию отправки сообщения на сокет. Допишем её в класс Server.
def send(self, ip, port, message):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((ip, port))
    try:
        sock.sendall(bytes(message, 'ascii'))
    finally:
        sock.close()
Все файлы вы можете скачать по ссылке.
Запустите оба скрипта командами python bob.py и python alice.py. Затем в отдельной консоли введите команду nc localhost 8887 и отправьте сообщение.
{"url": "http://ifmo.su"}
 
    
            
    На основе этого кода можно создавать произвольное количество программ, расположенных на разных машинах и общающихся друг с другом посредством очередей сообщений.
