Python과 Redis를 활용하여 Pub/Sub 모델 구현

Updated:

Python과 Redis를 활용하여 Pub/Sub을 구현하였다. Redis는 고성능 In-memory의 key-value 저장소이다. 램에 데이터를 저장하기 때문에 빠른 응답 시간을 제공한다. 이런 특징으로 인하여 Redis는 캐싱 서버에 널리 사용되고 있다. Redis는 메시지 브로커로서의 기능도 제공하고 있어서, 채널을 통해 메시지를 publish하고 subscribe 할 수 있다. Python으로 Redis가 제공하는 메시지 브로커 기능을 사용하여 Pub/Sub 모델을 구현해보았다.

1. 소스 코드 다운로드

아래 git repo를 클론해서 코드를 받아오자. 샘플 코드는 아래 git repo에 모두 담겨있다.

$ git clone https://github.com/devinlife/python-redis-pubsub.git

동기 방식과 비동기 방식의 코드를 각각 Docker compose로 작성하여 손쉽게 데모를 구동할 수 있다. 동기 방식은 synchronous에서 비동기 방식은 asynchronous 폴더에서 코드를 확인할 수 있다.

2. 동기 방식(Synchronous)

아래 명령어로 docker compose를 빌드하고 실행해본다. 샘플 코드에서는 메시지를 게속 보내서 이를 subscriber가 이를 수신하는 로그를 볼 수 있다.

$ cd synchronous
$ docker compose up --build

발행자 코드(publisher.py)는 아래와같다. publish 메소드가 채널에 메시지를 보내는 부분이다.

while True:
    message = f"send mesagge : {time.time()}"
    print(message, flush=True)
    r.publish('my-sync-channel', message)
    time.sleep(3)

구독자 코드(subscriber.py)는 아래와같다. subscriber 메소드로 수신할 채널을 설정한다.

p.subscribe("my-sync-channel")

while True:
    message = p.get_message()
    if message:
        print(f"received messeage : {message}", flush=True)
    time.sleep(0.01)

3. 비동기 방식(Asynchronous)

아래 명령어로 docker compose를 빌드하고 실행해본다. 샘플 코드에서는 메시지를 게속 보내서 이를 subscriber가 이를 수신하는 로그를 볼 수 있다.

$ cd asynchronous
$ docker compose up --build

발행자 코드(publisher.py)는 아래와같다. publish 메소드가 채널에 메시지를 보내는 부분이다. 비동기 동작을 위해서 asyncio.run으로 이벤트 루프를 생성한다. async_pub 함수는 이벤트 루프에 추가되어 비동기 동작을 수행한다.

async def async_pub():
    async with redis.Redis(host='redis', port=6379) as r:
        try:
            while True:
                message = f"send mesagge : {time.time()}"
                await r.publish("my-async-channel", message)
                await asyncio.sleep(3)
        except redis.RedisError as error:
            print(f"Redis error occurred: {error}")

if __name__ == "__main__":
    asyncio.run(async_pub())

구독자 코드(subscriber.py)는 아래와같다. subscriber 메소드로 수신할 채널을 설정한다. 발행자 코드와 동일하게 asyncio.run으로 이벤트 루프를 생성한다. get_message 메소드로 메시지를 수신한다.

async def reader(channel: redis.client.PubSub):
    while True:
        message = await channel.get_message(ignore_subscribe_messages=True)
        if message is not None:
            print(f"(Reader) Message Received: {message}", flush=True)
            if message["data"].decode() == STOPWORD:
                print("(Reader) STOP")
                break

async def main():
    r = redis.from_url("redis://redis:6379")
    async with r.pubsub() as pubsub:
        await pubsub.subscribe("my-async-channel")
        await asyncio.create_task(reader(pubsub))

if __name__ == "__main__":
    asyncio.run(main())

4. 결론

Python과 Redis를 활용하여 Pub/Sub 모델을 간단히 구현하였다. 동기 방식과 비동기 방식의 모두 구현하였으니 원하는 방식의 코드를 참고할 수 있다. 전체 코드는 git repo를 참조하고 POC 수준의 코드 작성을 하였으니 요구 사항에 맞춰서 코드를 추가 구현하여야 한다.

Leave a comment