ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Publish/Subscribe 패턴
    Java 2022. 6. 18. 23:15

    PUB/SUB pattern이 뭘까?

     

     Publisher(전송자)가 특정 subscriber(수신자)에게 직접 메시지를 보내도록 프로그래밍하지 않고, subscriber에 대한 정보는 갖지 않은 채로, 발행된 메시지를 클래스로 분류하는 것이다.

     

    신문기사나 잡지, 유튜브의 구독과 같은 방법이라고 생각하면 쉽다. 발행자는 자신이 모르는, 알 필요 없는 구독자에게 메시지를 전파한다. 구독을 신청한 구독자는 신문 등을 받을 수 있다.

     

    PUB/SUB pattern의 장점

    느슨한 결합: 메시지 전달과정에서 message broker나 event bus 등을 사용하기에 구독자에 관한 정보는 publisher에서 신경쓰지 않아도 된다. 이 때문에 publisher와 subscriber는 낮은 결합도를 갖는다.

     

    확장성: 병렬 운영, 메시지 캐싱, 트리기반/네트워크 기반 라우팅 등을 통해 더 나은 확장성을 갖는다.

     

     

    PUB/SUB pattern구현

    Topic 객체는 메시지 큐 뿐만 아니라 publisher, subscriber에 대한 정보도 갖고 있다. 이를 Channel과 Mailman이 참조해서 메시지를 전달한다.

    ※클래스, 메서드 등에 사용된 어휘가 주관적이거나 작성된 로직이 pub/sub 개념과 다소 다를 수 있습니다😅

    https://github.com/ghchoi0427/SocketServer/tree/master/src/main/java/pubandsub

     

    GitHub - ghchoi0427/SocketServer: 소켓을 이용한 서버

    소켓을 이용한 서버. Contribute to ghchoi0427/SocketServer development by creating an account on GitHub.

    github.com

     

    • Topic
    package pubandsub;
    
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;
    
    public class Topic {
    
        public static Queue<String> messageQueue;
        private List<Socket> publishers;
        private List<Socket> subscribers;
        public static Topic topic;
    
        public static Topic getInstance() {
            if (topic == null) {
                topic = new Topic();
            }
            return topic;
        }
    
        public List<Socket> getSubscribers() {
            return subscribers;
        }
    
        public void addPublisher(Socket pub) {
            publishers.add(pub);
        }
    
        public void addSubscriber(Socket sub) {
            subscribers.add(sub);
        }
    
        public Topic() {
            messageQueue = new LinkedList<>();
            publishers = new ArrayList<>();
            subscribers = new ArrayList<>();
        }
    
        public Queue<String> getMessageQueue() {
            return messageQueue;
        }
    
        public void offerMessage(String message) {
            System.out.println("[TOPIC] offer message: " + message);
            messageQueue.offer(message);
        }
    
        public String pollMessage() {
            if (messageQueue.isEmpty()) {
                return null;
            }
            System.out.println("[TOPIC] poll message");
            return messageQueue.poll();
        }
    }

    구독자, 발행자, 메시지 큐를 갖고 있는 Topic 클래스를 생성했다.

     

    Socket을 담는 List로 구독자, 발행자를 저장한다.

     

    발행자는 메시지 큐에 offer, 구독자는 poll한다.

     

    Topic은 싱글톤 패턴으로 생성한다.

     

    • Channel
    package pubandsub;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.Socket;
    import java.nio.charset.StandardCharsets;
    
    public class Channel implements Runnable {
        Socket socket;
        byte[] bytes;
        InputStream is;
        Topic topic;
        String message = null;
    
        public Channel(Socket socket) {
            this.socket = socket;
            topic = Topic.getInstance();
            bytes = new byte[100];
        }
    
        @Override
        public void run() {
            while (true) {
                int readByteCount = 0;
                try {
                    is = socket.getInputStream();
                    readByteCount = is.read(bytes);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                message = new String(bytes, 0, readByteCount, StandardCharsets.UTF_8);
                topic.offerMessage(message);
    
                if (socket.isClosed()) {
                    break;
                }
            }
        }
    }

    Channel 클래스는 publisher들이 보낸 메시지를 Topic의 메시지 큐에 저장하는 Runnable 구현체이다.

     

    • Mailman
    package pubandsub;
    
    import java.io.IOException;
    import java.net.Socket;
    import java.nio.charset.StandardCharsets;
    
    public class Mailman implements Runnable {
        Topic topic;
        byte[] bytes;
    
        public Mailman() {
            topic = Topic.getInstance();
            bytes = new byte[100];
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (!topic.getMessageQueue().isEmpty()) {
                    try {
                        bytes = topic.pollMessage().getBytes(StandardCharsets.UTF_8);
    
                        for (Socket s : topic.getSubscribers()) {
                            s.getOutputStream().write(bytes);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    Mailman 클래스는 Topic의 메시지 큐에 쌓여있는 메시지를 순차적으로 모든 subscriber들에게 뿌려주는 runnable 구현체이다.

     

    간단하게 만들어보긴 했는데 정확하게 pub-sub의 개념과 맞는지는 모르겠다.

    여러 송신자의 메시지를 다중화한 다음에 구분없이 모든 수신자에게 보내주는 방식에 가깝다고 생각된다.

    이번에 구현된 코드에서는 publisher를 구분하지 않고 모든 발행자를 구독하게 된다.

     

     

     

     

     

Designed by Tistory.