본문 바로가기

책 읽기/스프링 책 읽기

스프링 책 읽기(Spring in action) - 17. 스프링 메시징

* 이전 장에서 알아본, RMI. Hessian, Burlap, HTTP invoker등은, 애플리케이션 간의 동기식(synchoronous) 통신.

* 하지만 애플리케이션에는 동기식만 존재하지는 않음. 비동기식(asynchronous) 메시징 이용 => JMS(Java Message Service), AMQP(Adavanced Message Queuing Protocol)등을 사용할 수 있다


1. 비동기식 메시징에 대한 간단한 소개

* 동기식의 경우에는, 원격 메소드를 호출하면 클라이언트는 메소드의 완료까지 기다린다. 심지어 원격메소드가 아무것도 반환하지 않아도 서비스가 끝나지 않으면 클라이언트는 무한 대기 상태

클라이언트가 서비스가 완료될 때까지 기다려야 함

* 비동기식의 경우, 클라이언트는 서비스가 메시지를 처리하는 것을 기다리지 않아도 된다. 메시지를 전송하면, 해당 서비스는 언젠가 처리할 것이라는 가정하에 클라이언트는 이후 동작을 수행

서비스의 완료를 기다리지 않는 클라이언트


1.1 메시지 보내기

핵심은 간접성 - 어떤 애플리케이션이 정보를 전달 할 때, 직접적인 연결을 하지 않음. 대신 전송 측 애플리케이션은 수신 측 '대행 서비스'에 메시지를 보낸다.

- 메시지 중개자(message broker) : 메시지를 전송할 때, 메시지를 건내 받는 역할을 하는 것 

- 목적지(destination) : 큐(queue) 혹은 토픽(topic)이라는 두 종류의 목적지가 있음 => 큐 : 지점대지점(point-to-point) 모델 // 토픽 : 발행-구독(publish-subscribe)모델


지점-대-지점 모델

지점 대 지점 모델에서는, 각 메시지가 정확히 하나의 전송자와 수신자를 갖는다

전송자 - 큐 - 수신자

메시지를 요청하면, 큐에 넣어둠 -> 수신자가 나타나서 큐의 메시지를 요청하여 가져가는 방식

기본적으로 메시지는 단 하나의 수신자에 전달되지만, 그렇다고 수신자가 딱 1개만 있는 것은 아님 => 어떤 수신자일지에 대한 불확실성 => 리스너를 추가하여 확장성을 높인다

==> 송신자는 어떤 종류의 수신자에 전달 되는지 알 수 있다.


발행-구독 메시징 모델

메시지가 토픽에 전송되고, 구독자는 이를 기다린다 => 큐와 마찬가지로 여러 구독자가 기다림, 하지만 큐와는 다르게 모두 같은 메시지를 수신한다...

* 발행자는 구독자가 누구인지 전혀 알 수 없다(토픽만 알고 있음)


1.2 비동기 메시징의 장점

* 대기 없음 - 대기할 필요가 없기 때문에, 자유롭게 다른 작업을 수행한다. 클라이언트 성능이 극적으로 향상

* 메시지 기반과 결합도 제거 - 데이터 중심이기 때문에. 특정 메소드 시그니쳐에 얽매이지 않음. 누구든 수신자나 구독자가 될 수 이으며, 클라이언트는 구체적인 수신자나, 메시지에 관한 것을 알 필요가 없음

* 위치 독립성 - 동기식RPC의 경우, 보통 네트워크상의 위치를 알아야하는데 반해, 메시징 클라이언트는 누가 처리자인지 알 필요가 없음. 단지 메시지를 전송할 큐와 토픽의 위치만 알면 됨 => 서비스에 발생한 부하를 처리할 때, 같은 큐를 보는 서비스 인스턴스를 추가하기만 하면 됨

* 전달 보증 - 비동기적으로 전송하는 경우에는, 언젠가 메시지가 전달되기 때문에 클라이언트는 서비스의 상태에 관하여 안심해도 됨. 전송 당시  서비스가 이용불가여도, 메시지는 어딘가에 저장되어 있다.


2. JMS(Java Message Service)로 메시지 보내기

* JMS : 메시지 브로커를 사용하여 작업하기 위한 API를 정의하는 자바 표준

* 스프링은 JmsTemplate이라는 템플릿 기반 추상화를 통해 JMS를 지원한다. 메시지 구동형(message-driven) POJO 개념을 지원


2.1 스프링에서 메시지 브로커 셋업

ActiveMQ : 오픈소스 메시지  중개자, JMS를 이용한 비동기식  메시징에 적합


커넥션 팩토리 생성

모든 경우에, 메시지 브로커를 통해 메시지를 전송하기 위해서는 JMS 커넥션 팩토리가 필요 => ActiveMQ를 위한 ActiveMQConnectionFactory가 제공 됨

<bean id="connectionFactory"
      class="org.apache.activemq.spring.ActiveMQConnectionFactory"
      p:brokerURL="tcp://localhost:61616"/>

* brokerURL : 메시지 중개자가, 위치한  곳을 커넥션 팩토리에 알려주기 위한 프로퍼티


ActiveMQ 메시지 목적지 선언

커넥션 팩토리를 설정한 후, 메시지가 전달될 목적지를 선언해야함(큐, 토픽)

#큐
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue" c:_="spitter.queue" />

#토픽
<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic" c:_="spitter.queue" />

2.2 스프링의 JMS 템플릿 사용

* 기존 JMS 코드의 경우 굉장히 지저분하다 => "hello world"를 전송하는데에 20줄 이상의 코드가 필요 => 여느 스프링과 마찬가지로 템플릿을 지원하여, 중복 코드를 제거할 수 있다.


JMS 템플릿 활용

JmsTemplae : 커넥션 생성, 세션  획득, 실제  메시지 전송과 수신 작업을 맡음. JMSException 또한 알아서 비검사형인 JmsException으로 던져줌

비검사형인 JmsException의 종류

#JmsTemplate을 사용하기 위한 빈 선언
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:_-ref="connectionFactory" />

* JmsTemplate은 메시지 중개자의 커넥션 획득 방법을 알아야하므로, connectionFactory를 프로퍼티에 선언해야 함


메시지 전송

게시글 추가 알림 기능(이메일)을 만들 때, 실제 누가 그것을 받아야하는지 알아야하고, 전송 시간도 고려해야한다면.. 애플리케이션의 인지 성능 저하를 일으킬 수 있다.

따라서 게시글 추가를 할 때, 메시지를 바로 보내서 시간을 소비하기 보다는, 작업을 큐에 쌓아놓고 작업하는 것이 합리적이다 => 메시지를 큐나 토픽에 넣는 것은, 실제 전송에 걸리는 시간과 비교하면 매우 미미

인터페이스 작성

// 비동기식 알림 전송을 지원하는, AlertService 인터페이스
public interface AlertService {
  void sendSpittleAlert(Spittle spittle);
}

=> 인터페이스를 구현할 때엔, 주입된 JmsOperation을 이용한다(JmsTemplate 구현 인터페이스)

public class AlertServiceImpl implements AlertService {
  
  privatre JmsOperations jmsOperations;
  
  @Autowired
  public AlertServiceImpl(JmsOperations jmsOperations) {
    this.jmsOperations = jmsOperations;
  }
  
  public void sendSpittleAlert(final Spittle spittle) {
    jmsOperations.send( // 메시지 전송
            "spittle.alert.queue", // 목적지 (큐)
            new MessageCreator() {
              public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(spittle) // 메시지 생성
              }
            }
    );
  }
}

sendSpittleAlert() 메소드에서, 메시지를 조립하고 전송만 한다(커넥션, 세션 X) 

JmsTemplate이, 전송자 대신 복잡한 일을 처리한다


기본 목적지 설정

send() 메소드에서는 목적지를 명시적으로 설정함. 하지만, 항상 같은 목적지로 지정하는 것은 최선이 아님 => 매번 명시적으로 지정하는 대신, 기본 목적지를 지정할 수 있다. 

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" 
    c:_-ref="connectionFactory" p:defaultDestinationName="spittle.alert.queue" />

=> 이런식으로 기본 목적지를 지정한다면.. 위의 send() 메소드는 더 간단해 질 수 있다.

 jmsOperations.send( // 메시지 전송
          new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
              return session.createObjectMessage(spittle) // 메시지 생성
            }
          }
);

==> 하지만 메시지 전송 시에, 변환기를  사용하면 더 간단해질 수 있다.


전송 시 메시지 변환

send()와 더불어, JmsTemplate은 convertAndSend()를 제공 => MessageCreator대신 빌트인 메시지 변환기 사용

public void sendSpittleAlert(Spittle spittle) {
  jmsOperations.convertAndSend(spittle); // 코드 단 한줄
}

=> 객체를 메시지로 변환하기 위해서는 MessageConverter의 구현체를  사용한다

// MessageConverter의 인터페이스
public interface MessageConverter {
  Message toMessage(Object object, Session session) throws JMSException, MessageConversionException;
  
  Object fromMessage(Message message)
}

==> 다양한 구현체가 존재한다

==> convertAndSend()를 사용할 때, 기본적으로 SimpleMessageConverter를 JmsTemplate에서 사용한다.

==> 당연히, 빈 설정시에, 오버라이드 가능

# 컨버터 선언
<bean id="messageConverter"
      class="org.springframework.jms.support.converter.MappingJacksonMessageConverter" />
      
# 와이어링
<bean id="jmsTemplate"
      class="org.springframework.jms.core.JmsTemplate"
      c:_-ref="connectionFactory"
      p:defaultDestinationName="spittle.alert.queue"
      p:messageConverter-ref="messageConverter" />

메시지 소비

지금까지는 전송을 알아보았음 => JmsTemplate은 수신에도 사용가능. 심지어 더 쉬움 => 단순히 reveicve() 메소드만 호출

public Spittle receiveSpittleAlert() {
  try {
    ObjectMessage receivedMessage =
    (ObjectMessage) jmsOperations.receive(); // 메시지 수신
    return (Spittle) receivedMessage.getObject(); // 객체 얻기
  } catch (JMSException jmsException) {
    throw JmsUtils.convertJmsAccessException(jmsException); // 변환된 예외 던지기
  }
}

recieve() 메소드가 호출되면, 메시지 브로커로부터 조회를 한다. 도착한 메싲기가 없으면 대기

JmsTemplate을 통해, 큐나 토픽에서 메시지 수신

메시지는 객체 메시지로 전송된다는 것을 알기에 => getObject() 사용

* 더 명확하게 메시지를 처리하는 방법은, convertAndSend()를 사용하는 방법

public Spittle retrieveSpittleAlert() {
  return (Spittle) jmsOperations.receiveAndConvert(); // 와~ 또 단 한줄
}

==> 메시지를 ObjectMessage로 캐스팅할 필요도 없으며, JMSException을 사용할 필요도 없음 

==> 하지만, 단점은 recieve(), recieveAndConvert()는 동기식이라는 점... 비동기 메시지를 왜 동기로?.. 왜 알려줌?

==> 이 때, 메시지 드리븐 POJO의 편의성이 두드러진다


2.3 메시지 드리븐 POJO 작성

* EJB2 명세에서는 메시지-드리븐 빈(Message-Driven Bean, 이하 MDB)이 포함됨. 이는 메시지를 비동기식으로 처리하는 EJB이다. 따라서 목적지에 도착한 이벤트에 반응하여 메시지를 처리함 => 동기식과는 반대

* EJB3에서는 MDB가 좀 더 POJO 친화적으로 정리 됨.

* 스프링에선 유사하지만 독자적인 메시지 드리븐 빈 형태를 제공한다 => 메시지-드리븐 POJO(Message-Driven POJO, 이하 MDP) 


메시지 리스너 생성

* 기존 EJB의 경우, @MessageDriven을 적용하고, MessgaListner 인터페이스를 구현해야 했다.

==> 스프링의 경우 MDP를 지원하기 때문에, POJO를 구현한다

public classs SpittleAlertHandler {
  public void handleSpittleAlert(Spittle spittle) {
     // 알림 구현
} }

* 이후에는, 메시지 리스너에 관련된 설정만 해주면 됨


메시지 리스너 설정

메시지 수신 기능을 POJO에 부여하기 위해선, 스프링에 메시지 리스너로 설정을 하면 됨

# SpittleAlertHandler를 빈으로 설정 
<bean id="spittleHandler" class="com.habuma.spittr.alerts.SpittleAlertHandler" />

#메시지 리스너 설정
<jms:listener-container connection-factory="connectionFactory">
  <jms:listener destination="spitter.alert.queue" ref="spittleHandler" method="handleSpittleAlert" />
</jms:listener-container>

* 메시지 리스너 컨테이너에 포함된 메시지 리스너

* jms:listener로 지정하면, onMessage() 메소드는 기본적으로 호출

=> 메시지 리스너 컨테이너 : JMS 목적지를 감시하는 빈, 메시지가 도착하면 MDP(메시지 리스너)로 전달

'메시지 리스너 컨테이너'는 큐/토픽을 계속 읽다가 메시지가 오면 전달


2.4 메시지 기반의 RPC 활용

* 메시지 기반의 RPC(원격 프로시저 호출)를 지원하기 위해, 스프링은 JmsInvokerServiceExporter를 제공하며, 클라이언트가 서비스를 소비할 수 있도로 JmsInvokerProxyFactoryBean을 제공 -> 옵션은 서로 비슷하다


JMS 기반의 서비스 익스포팅

* JmsInvokerServiceExporter를 사용하기 위한 AlertServiceImpl

@Component("alertService")
public class AlertServiceImpl implements AlertService {
  private JavaMailSender mailSender;
  private String alertEmailAddress;
  
  public AlertServiceImpl(JavaMailSender mailSender, String alertEmailAddress) {
  this.mailSender = mailSender;
  this.alertEmailAddress = alertEmailAddress;
  }
  
  public void sendSpittleAlert(final Spittle spittle) {
    SimpleMailMessage message = new SimpleMailMessage();
    String spitterName = spittle.getSpitter().getFullName();
    message.setFrom("noreply@spitter.com");
    message.setTo(alertEmailAddress);
    message.setSubject("New spittle from " + spitterName);
    message.setText(spitterName + " says: " + spittle.getText());
    mailSender.send(message);
  }
}

* 일반적인 POJO => Jms 메시지를 처리하는데 사용되는 내용이 없음

* alertService라는 ID로 빈 등록이 되며, JmsInvokerServiceExporter 설정으로 빈을 참조한다

// AlertSerivce 인터페이스 
public interface AlertService {
  void sendSpittleAlert(Spittle spittle);
}
# AlertService 등록
<bean id="alertServiceExporter" class="org.springframework.jms.remoting.JmsInvokerServiceExporter"
    p:service-ref="alertService" p:serviceInterface="com.habuma.spittr.alerts.AlertService" />

* 익스포터의 프로퍼티는 서비스가 JMS를 통해 전달되는 세부 사항을 기술하지 않지만, 익스포터는 JMS 리스너로서의 자격이 주어짐

#메시지 리스너 설정
<jms:listener-container connection-factory="connectionFactory">
  <jms:listener destination="spitter.alert.queue" ref="alertServiceExporter"/>
</jms:listener-container>

JMS 기반의 서비스 소비

JMS 기반의 알림 서비스가 준비되고, jms.alert.queue라는 이름의 큐에 메시지가 도착하기를 기다려야함. 클라이언트 측에서는 JmsInvokerProxyFactoryBean을 사용하여 액세스한다.

* 15장의 다른 리모팅 팩토리 빈과 유사함 => 다른점은, RMI나 HTTP대신 JMS라는 것..

<bean id="alertService" 
  class="org.springframework.jms.remoting.JmsInvokerProxyFactoryBean" 
  p:connectionFactory-ref="connectionFactory" 
  p:queueName="spittle.alert.queue" 
  propp:serviceInterface="com.habuma.spittr.alerts.AlertService" 
/>

** JMS가 유일한 메시징 방법은 아님 => AMQP(Advanced Message Queueing Protocol)가 존재


3. AMQP를 이용한 메시징

* JMS에 비해 몇 가지의 장점이 존재 

  •  단순히 API규격을 제공하는 JMS에 비해, 메시징을 위한 와이어 수준의 프로토콜 정의
  •  유연하고 투명한 메시징 모델, 지점대지점, 발행-구독 모델 이외에도, 더 다양한 방식이 있음

* 스프링 AMQP는 스프링 애플리케이션인 AMQP 스타일의 메시징을 할 수 있게 하는 프레임워크 확장


3.1 AMQP에 대한 간략한 소개

* AMQP의 생산자는 큐에 직접 발행(게시)하지 않음, 대신에 생산자와 큐 사이에 메시지 전달을 위한 새로운 방법 제공

메시지 브로커는,  메시지 라우팅을 처리하는 교환에 의해 메시지 큐를 분리

* 메시지 생성자는, 교환을 위해 메시지를 발행 => 교환은 하나 이상의 큐에 적용, 메시지는 큐로 전달 => 소비자는 메시지를 가져와 처리

* 교환 알고리즘에 따라, 메시지의 라우팅 키 및 인자를 고려 => 교환 및 큐 사이의 바인딩 인수와 라우팅 키 비교 => 알고리즘이 만족 되면 메시지는 큐에 라우팅

* AMQP 교환의 네가지 표준 유형

  • 다이렉트 - 라우팅 키가 바인딩을 위하 라우팅 키 대상으로 다이렉트 매칭 될 경우
  • 토픽 - 라우팅 키가 바인딩을 위한 라우팅 키 대상으로 와일드카드 매칭 될 경우
  • 헤더 - 인자의 테이블 내 값과 헤더가 인자의 바이딩 테이블과 일치하는 경우, x-match라는 특별 헤더 사용
  • 팬아웃 - 인자의 테이블 내 헤더, 라우팅 키에 상관 없이 메시지는 모든 교환될 큐에 연결

=> 다양한 표준 유형을 통해 정의를 하지만, 실제 개발에 큰 영향을 미치지 않는다

=> 단순히, 생산자는 라우팅 키를 교환하기 위해 개발, 소비자는 큐에서 검색


3.2 AMQP 메시징을 위한 스프링 설정하기

AMQP 커넥션 팩토리를 설정 => RabbitMQ(AMQP를 지원하는 오픈소스 메시지 브로커) 커넥션 팩토리

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/rabbit"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/rabbit 
   http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd">
...
</beans:beans>

기본적으로 커넥션 팩토리를 RabbitMQ 서버의 5762 포트를 사용한 수신 대기를 가정하며, 사용자 명과 암호는 guest

# RabbitMQ 커넥션 팩토리 오버라이드
<connection-factory id="connectionFactory"
    host="${rabbitmq.host}"
    port="${rabbitmq.port}"
    username="${rabbitmq.username}"
    password="${rabbitmq.password}" />

* 플레이스 홀더를 사용하여, 외부 프로퍼티로 관리


큐의 선언, 교환, 바인딩

JMS와 달리, 큐와 토픽의 라우팅 동작은 스펙에 의해 정의

스프링 AMQP의 rabbit네임 스페이스

rabbit 네임스페이스는 큐 생성 교환 바인딩을 위한, 여러 요소를 포함

위 요소들은 <admin> 요소와 같이 사용

# 큐 생성 교환, 바인딩을 자동으로 수행하는 RabbitMQ 관리 컴포넌트 생성
<admin connection-factory="connectionFactory"/> 
<queue id="spittleAlertQueue" name="spittle.alerts/>

 * 기본적인 다이렉트 교환은 이름이 없이 이루어짐 => 모든 큐는 큐명과 동일한 이름을 가지는 라우팅 키로 교환 한정

** 하지만 더 확실하게, 라우팅을 하기 위해선 하나 이상의 교환을 선언하고, 큐와 바인딩 시켜야함

<admin connection-factory="connectionFactory" /> 
<queue name="spittle.alert.queue.1"> 
<queue name="spittle.alert.queue.2"> 
<queue name="spittle.alert.queue.3"> 

<fanout-exchange name="spittle.fanout"> 
  <bindings> 
    <binding queue="spittle.alert.queue.1"/> 
    <binding queue="spittle.alert.queue.2"/>
    <binding queue="spittle.alert.queue.3" />
  </bindings> 
</fanout- exchange>

=> queue1 ,2, 3을 바인딩하고, fanout 교환 생성

** 이 외에도 RabbitMQ는 라우팅 설정을 위한 다양한 방법을 사용 (너무 많아서 책으로는 못 다룬다고 적힘)


3.3 RabbitTemplate으로 메시지 전송

- RabbitMQ 커넥션 팩토리는, RabbitMQ와 연결하기 위해 필요  => RabbitMQ로 메시지 전종 => connectionFactory 빈을 AlertServiceImpl 클래스로 주입 => Connection 생성 => Channel 생성 => Channel 사용

=> RabbitMQ또한, 쓸모없는 코드들이 많고, 제거해야할 코드가 많음 => 당연히 Template지원(똑같은 레파토리..)

===> 송수신에 필요한 쓸모없는 코드들을 제거한 RabbitTemplate

<template id="rabbitTemplate" connection-factory="connectionFactory" />

 

==> RabbitTemplate을 사용하는 AlertServiceImpl 

public class AlertServiceImpl implements AlertService {
  private RabbitTemplate rabbit;
  
  @Autowired
  public AlertServiceImpl(RabbitTemplate rabbit) {
    this.rabbit = rabbit;
  }
  
  public void sendSpittleAlert(Spittle spittle) {
    rabbit.convertAndSend("spittle.alert.exchange",
                          "spittle.alerts",
                          spittle);
  } 
}

* convertAndSend() => 파라미터 : 교환명, 라우팅 키, 전송 객체 => 필요없는 코드인, 라우팅방식, 큐 전달방식 등은 사용하지 않음

 => 필요할 경우에는, 교환명과 라우팅 키 모두 생략 가능 => convertAndSend(spittle) 가능 => teplate 요소에서 정의해줄 경우

<template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spittle.alert.exchange" routing-key="spittle.alerts" />

==> convertAndSend() 메소드가 아닌 일반 send() 메소드가 있지만, 메시지 변환기를 사용할 수 있는 converAndSend()메소드를 책에서는 추천함


3.4 AMQP 메시지 수신하기