Часто возникает задача разработки системы, включающей в себя несколько независимых модулей. Эти модули могут быть расположены на разных компьютерах и выполнять трудоёмкие задачи. Для реализации взаимодействия между ними используются брокеры сообщений (например, rabbitmq). В этой статье мы напишем простейшего самодельного брокера на основе обычных сокетов.Создадим приложение состоящее из двух пользовательских скриптов, осуществляющих коммуникацию друг с другом через сокеты.
Первым делом следует прочесть официальную документацию Python по созданию Socket сервера. Наиболее интересен раздел 21.21.4.3. Asynchronous Mixins. Обратите внимание, что используется 3-я версия языка Python.
Создайте файл codex_queue.py и заполните его заготовкой из статьи.
# codex_queue.py
import threading
import socketserver
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request.recv(1024)
print(data)
Каждое приложение будет иметь собственную очередь сообщений, в которую ей можно будет отправить данные. По мере необходимости приложение берёт сообщения из этой очереди и обрабатывает их.
# продолжение файла codex_queue.py
class Queue:
def __init__(self, ip, port):
self.server = ThreadedTCPServer((ip, port), ThreadedTCPRequestHandler)
self.server.queue = self
self.server_thread = threading.Thread(target=self.server.serve_forever)
self.server_thread.daemon = True
self.messages = []
В конструкторе __init__ происходит инициализация сокет-сервера для работы в отдельном потоке.
self.messages – массив сообщений. Та самая очередь.
self.server.queue = self
Данный код записывает указатель на объект очереди в сокет-сервер, чтобы к нему можно было обращаться из другого потока.
Допишем в класс вспомогательные функции для запуска и остановки сервера, а также для работы с очередью.
# продолжение файла codex_queue.py
def start_server(self):
self.server_thread.start()
print("Server loop running in thread:", self.server_thread.name)
def stop_server(self):
self.server.shutdown()
self.server.server_close()
def add(self, message):
self.messages.append(message)
def view(self):
return self.messages
def get(self):
return self.messages.pop()
def exists(self):
return len(self.messages)
Класс очереди готов. Скачать его полную версию можно по ссылке.
Теперь приступим к написанию сервера, который будет работать с этой очередью. Создадим файл server.py.
# файл server.py
import socket
import time
from codex_queue import Queue
class Server:
def __init__(self, ip, port):
self.queue = Queue(ip, port)
def start_server(self):
self.queue.start_server()
def stop_server(self):
self.queue.stop_server()
Дополнительно понадобится функция loop, которая будет в цикле проверять есть ли данные в очереди, забирать оттуда один элемент и передавать на обработку в метод handle.
def loop(self):
while True:
time.sleep(1)
while self.queue.exists():
self.handle(self.queue.get())
def handle(self, message):
""" Prototype """
pass
Готовый файл вы можете скачать по ссылке.
Теперь можно создать произвольное количество сервисов, наследуя их от класса Server и переопределяя метод handle. Мы остановимся для примера на двух скриптах – Alice и Bob. Боб будет извлекать URL сайтов из сообщения в JSON формате, делать HTTP запрос по полученному адресу и отправлять Алисе коды HTTP ответов. Алиса же просто будет выводить их на экран.
Создадим файл alice.py.
from server import Server
class Alice(Server):
def handle(self, message):
try:
print("Got: {}".format(message))
except Exception as e:
print("Error: {}".format(e))
if __name__ == "__main__":
print("Alice started.")
app = Alice("localhost", 8889)
app.start_server()
app.loop()
app.stop_server()
Запустите этот скрипт командой python alice.py. Откройте отдельную консоль и наберите там команду nc localhost 8889, введите сообщение и нажмите enter. Класс Alice поймает сообщение и выведет его на экран. Есть вы пользователь windows – установите программу putty.
Перейдем ко второму классу. Создайте файл bob.py.
import json
import requests
from server import Server
class Bob(Server):
def handle(self, message):
try:
print("Got: {}".format(message))
url = json.loads(str(message, 'ascii'))["url"]
response = requests.get(url)
except Exception as e:
print("Error: {}".format(e))
else:
result = {}
result['status_code'] = response.status_code
self.send("localhost", 8889, json.dumps(result))
if __name__ == "__main__":
print("Bob started.")
getter = Bob("localhost", 8887)
getter.start_server()
getter.loop()
getter.stop_server()
Можно видеть, что данный класс почти не отличается от предыдущего за исключением порта 8887 и функции обработки.
Осталось только написать функцию отправки сообщения на сокет. Допишем её в класс Server.
def send(self, ip, port, message):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ip, port))
try:
sock.sendall(bytes(message, 'ascii'))
finally:
sock.close()
Все файлы вы можете скачать по ссылке.
Запустите оба скрипта командами python bob.py и python alice.py. Затем в отдельной консоли введите команду nc localhost 8887 и отправьте сообщение.
{"url": "http://ifmo.su"}
На основе этого кода можно создавать произвольное количество программ, расположенных на разных машинах и общающихся друг с другом посредством очередей сообщений.