Python과 Redis를 활용하여 Pub/Sub 모델 구현
Updated:
Python과 Redis를 활용하여 Pub/Sub을 구현하였다. Redis는 고성능 In-memory의 key-value 저장소이다. 램에 데이터를 저장하기 때문에 빠른 응답 시간을 제공한다. 이런 특징으로 인하여 Redis는 캐싱 서버에 널리 사용되고 있다. Redis는 메시지 브로커로서의 기능도 제공하고 있어서, 채널을 통해 메시지를 publish하고 subscribe 할 수 있다. Python으로 Redis가 제공하는 메시지 브로커 기능을 사용하여 Pub/Sub 모델을 구현해보았다.
1. 소스 코드 다운로드
아래 git repo를 클론해서 코드를 받아오자. 샘플 코드는 아래 git repo에 모두 담겨있다.
$ git clone https://github.com/devinlife/python-redis-pubsub.git
동기 방식과 비동기 방식의 코드를 각각 Docker compose로 작성하여 손쉽게 데모를 구동할 수 있다. 동기 방식은 synchronous에서 비동기 방식은 asynchronous 폴더에서 코드를 확인할 수 있다.
2. 동기 방식(Synchronous)
아래 명령어로 docker compose를 빌드하고 실행해본다. 샘플 코드에서는 메시지를 게속 보내서 이를 subscriber가 이를 수신하는 로그를 볼 수 있다.
$ cd synchronous
$ docker compose up --build
발행자 코드(publisher.py)는 아래와같다. publish 메소드가 채널에 메시지를 보내는 부분이다.
while True:
message = f"send mesagge : {time.time()}"
print(message, flush=True)
r.publish('my-sync-channel', message)
time.sleep(3)
구독자 코드(subscriber.py)는 아래와같다. subscriber 메소드로 수신할 채널을 설정한다.
p.subscribe("my-sync-channel")
while True:
message = p.get_message()
if message:
print(f"received messeage : {message}", flush=True)
time.sleep(0.01)
3. 비동기 방식(Asynchronous)
아래 명령어로 docker compose를 빌드하고 실행해본다. 샘플 코드에서는 메시지를 게속 보내서 이를 subscriber가 이를 수신하는 로그를 볼 수 있다.
$ cd asynchronous
$ docker compose up --build
발행자 코드(publisher.py)는 아래와같다. publish 메소드가 채널에 메시지를 보내는 부분이다. 비동기 동작을 위해서 asyncio.run으로 이벤트 루프를 생성한다. async_pub 함수는 이벤트 루프에 추가되어 비동기 동작을 수행한다.
async def async_pub():
async with redis.Redis(host='redis', port=6379) as r:
try:
while True:
message = f"send mesagge : {time.time()}"
await r.publish("my-async-channel", message)
await asyncio.sleep(3)
except redis.RedisError as error:
print(f"Redis error occurred: {error}")
if __name__ == "__main__":
asyncio.run(async_pub())
구독자 코드(subscriber.py)는 아래와같다. subscriber 메소드로 수신할 채널을 설정한다. 발행자 코드와 동일하게 asyncio.run으로 이벤트 루프를 생성한다. get_message 메소드로 메시지를 수신한다.
async def reader(channel: redis.client.PubSub):
while True:
message = await channel.get_message(ignore_subscribe_messages=True)
if message is not None:
print(f"(Reader) Message Received: {message}", flush=True)
if message["data"].decode() == STOPWORD:
print("(Reader) STOP")
break
async def main():
r = redis.from_url("redis://redis:6379")
async with r.pubsub() as pubsub:
await pubsub.subscribe("my-async-channel")
await asyncio.create_task(reader(pubsub))
if __name__ == "__main__":
asyncio.run(main())
4. 결론
Python과 Redis를 활용하여 Pub/Sub 모델을 간단히 구현하였다. 동기 방식과 비동기 방식의 모두 구현하였으니 원하는 방식의 코드를 참고할 수 있다. 전체 코드는 git repo를 참조하고 POC 수준의 코드 작성을 하였으니 요구 사항에 맞춰서 코드를 추가 구현하여야 한다.
Leave a comment