0. In-Memory 기반 Message Broker 문제점
Spring에서 제공하는 STOMP를 활용해 내장된 Simple Message Broker를 통해 채팅 서버 구현이 가능하지만, Spring 서버의 내부 메모리에서 동작하게 되면 아래와 같은 문제가 발생할 수 있다.
- 서버거 다운되거나 재시작하면 Message Broker에 있는 데이터들이 유실될 수 있다.
- 다수의 서버일 경우 서버 간 채팅방을 공유할 수 없어 다른 서버 간에 있는 사용자와의 채팅이 불가능하다.
이러한 문제를 해결하기 위해 외부 Message Broker를 사용할 수 있다.
대표적으로 Apache Kafka, Redis, RabbitMQ 등이 있다.
1. Redis
Redis는 STOMP 프로토콜을 지원하지 않지만, Redis가 제공하는 PUB/SUB 기능을 통해 Message Broker로 사용할 수 있다.
Redis의 PUB/SUB은 메시지 지속성이 없다. 메시지를 전송한 후 해당 메시지가 삭제되기 때문에 실시간 데이터 처리에는 매우 적합하지만, 메시지가 저장되지 않는다는 점을 반드시 인지하고 있어야 한다.
또한 수신자가 메시지를 받는 것을 보장하지 않는다는 단점이 있다.
(STOMP 프로토콜을 지원하는 RabbitMQ와 같은 Message Broker를 사용하면 메시지 전달 보장, SSL 지원 등 더 고도화가 가능하다.)
우리 프로젝트는 채팅방의 생명주기가 짧고, 메시지의 유실이 서비스 전체에 큰 문제가 되지 않기 때문에 가장 간단하게 구현이 가능한 Redis를 메시지 브로커로 사용했다.
2. Spring Data Redis로 Redis Pub/Sub 적용

다중 서버 환경에의 채팅 플로우
- Publisher가 채팅 메시지를 전송한다.
- 로드밸런서가 Server 1으로 요청을 전달한다.
- Server 1에서 RedisPublisher로 채팅방에 대한 Topic에 메시지를 발행하고, 메시지를 저장한다.
- 채팅방에 대한 Topic을 구독 중인 다른 서버(Server 1, Server 2, Server 3)의 RedisSubscriber에서 메시지를 받으면 내부 메시지 브로커로 Subscriber들에게 메시지를 발행한다.
2.1. 의존성 추가
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
}
2.2. Redis 설정
@Configuration
public class RedisConfig {
private static final String TOPIC_NAME = "chatting";
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedisTemplate<String, Object> redisTemplate() { (1)
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
return redisTemplate;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, (2)
MessageListenerAdapter listenerAdapter,
ChannelTopic channelTopic) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, channelTopic);
return container;
}
@Bean
public RedisConnectionFactory redisConnectionFactory() { (3)
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
redisStandaloneConfiguration.setHostName(host);
redisStandaloneConfiguration.setPort(port);
redisStandaloneConfiguration.setPassword(password);
return new LettuceConnectionFactory(redisStandaloneConfiguration);
}
@Bean
public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber) { (4)
return new MessageListenerAdapter(subscriber, "onMessage");
}
@Bean
public ChannelTopic channelTopic() { (5)
return new ChannelTopic(TOPIC_NAME);
}
}
(1) Redis 서버와 상호작용하기 위한 RedisTemplate 관련 설정
Redis 서버에는 바이트 코드만 저장되므로 key와 value에 Serializer를 설정해준다.
JSON 형식으로 메시지를 교환하기 위해 ValueSerializer를 Jackson2JsonRedisSerializer로 설정해준다.
(2) 발행된 메시지 처리를 위한 리스너 설정
RedisMessageListenerContainer: Redis Channel(Topic)로부터 메시지를 받고, 주입된 리스너들에게 비동기적으로 전송
(3) Netty(비동기 이벤트 기반 고성능 네트워크 프레임워크) 기반의 Redis 클라이언트 LettuceConnectionFactory 빈 등록
(4) RedisMessageListenerContainer로부터 메시지를 전달받고, 실제 메시지를 처리하는 비즈니스 로직이 담긴 Subscriber 빈 추가
(5) Topic 공유를 위해 Channel Topic을 빈으로 등록
2.3. RedisMessageListener
@RequiredArgsConstructor
@Component
public class RedisMessageListener {
private static final Map<Long, ChannelTopic> TOPICS = new HashMap<>();
private final RedisMessageListenerContainer redisMessageListenerContainer;
private final RedisSubscriber redisSubscriber;
public void enterChattingRoom(Long chattingRoomId) { (1)
ChannelTopic topic = getTopic(chattingRoomId);
if (topic == null) {
topic = new ChannelTopic(String.valueOf(chattingRoomId));
redisMessageListenerContainer.addMessageListener(redisSubscriber, topic);
TOPICS.put(chattingRoomId, topic);
}
}
public ChannelTopic getTopic(Long chattingRoomId) { (2)
return TOPICS.get(chattingRoomId);
}
}
(1) 채팅방 아이디로 Topic을 가져오고, 없으면 새로운 Topic을 발행해 저장한 후 RedisSubscriber에 해당 Topic을 등록한다.
(2) 채팅방 아이디로 Topic을 가져온다.
2.4. RedisPublisher
@RequiredArgsConstructor
@Service
public class RedisPublisher {
private final RedisTemplate<String, String> redisTemplate;
private final JsonParser jsonParser;
public void publish(ChannelTopic topic, ChattingRequest chattingRequest) {
redisTemplate.convertAndSend(topic.getTopic(), jsonParser.toJson(chattingRequest)); (1)
}
}
(1) Topic에 메시지를 발행한다.
2.5. RedisSubscriber
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {
private final JsonParser jsonParser;
private final RedisTemplate redisTemplate;
private final SimpMessageSendingOperations messagingTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
ChattingRequest chattingRequest = jsonParser.toChattingRequest((String) redisTemplate.getStringSerializer().deserialize(message.getBody()));
messagingTemplate.convertAndSend("/subscription/chattings/rooms/" + chattingRequest.getChattingRoomId(), chattingRequest); (1)
}
}
(1) 구독 중인 채널에 메시지가 발행되면 내부 브로커로 메시지를 전달한다.
2.6. ChattingRepository
@RequiredArgsConstructor
@Repository
public class ChattingRoomRepository {
private static final String CHATTING_ROOM = "CHATTING_ROOM";
private static final String DELIMITER = ":";
private final RedisTemplate<String, Object> redisTemplate;
private HashOperations<String, String, ChattingRoom> opsHashChattingRoom;
private HashOperations<String, String, Chatting> opsHashChatting;
@PostConstruct
private void init() {
opsHashChattingRoom = redisTemplate.opsForHash();
opsHashChatting = redisTemplate.opsForHash();
}
public void createChattingRoom(String name) { (1)
ChattingRoom chattingRoom = ChattingRoom.from(name);
opsHashChattingRoom.put(CHATTING_ROOM, chattingRoom.getId().toString(), chattingRoom);
}
public List<ChattingRoom> findAllRoom() {
return opsHashChattingRoom.values(CHATTING_ROOM);
}
public ChattingRoom findById(String chattingRoomId) {
return opsHashChattingRoom.get(CHATTING_ROOM, chattingRoomId);
}
public void saveChatting(String chattingRoomId, Chatting chatting) {
opsHashChatting.put(CHATTING_ROOM + DELIMITER + chattingRoomId, chatting.getId(), chatting);
}
public List<Chatting> findChattingByChattingRoomId(String chattingRoomId) { (2)
return opsHashChatting.values(CHATTING_ROOM + DELIMITER + chattingRoomId);
}
}
(1) CHATTING_ROOM:{채팅방 아이디}를 키 값으로 갖는 해시에 채팅 아이디와 Chatting을 추가한다.
(2) CHATTING_ROOM:{채팅방 아이디}를 키 값으로 갖는 해시의 value를 모두 반환한다.
2.7. ChattingService
| ChattingService.java
@RequiredArgsConstructor
@Transactional
@Service
public class ChattingService {
private final RedisPublisher redisPublisher;
private final RedisMessageListener redisMessageListener;
private final ChattingRoomRepository chattingRoomRepository;
public void send(Message message) {
redisPublisher.publish(redisMessageListener.getTopic(message.getChattingRoomId()), message); (1)
chattingRoomRepository.saveChatting(message.getChattingRoomId(), message.toChatting()); (2)
}
public void createChattingRoom(ChattingRoomCreateRequest chattingRoomCreateRequest) {
chattingRoomRepository.createChattingRoom(chattingRoomCreateRequest.getName());
}
public void enterChattingRoom(String chattingRoomId) {
redisMessageListener.enterChattingRoom(chattingRoomId); (3)
}
public List<ChattingRoom> getChattingRooms() {
return chattingRoomRepository.findAllRoom();
}
public ChattingRoom getChattingRoom(String chattingRoomId) { (4)
return chattingRoomRepository.findById(chattingRoomId);
}
public List<Message> getChattings(String chattingRoomId) { (5)
return chattingRoomRepository.findChattingByChattingRoomId(chattingRoomId)
.stream()
.map(chatting -> Message.builder()
.chattingRoomId(chatting.getId())
.senderId(chatting.getSenderId())
.content(chatting.getContent())
.build())
.collect(Collectors.toList());
}
}
(1) 클라이언트가 메시지를 전송하면 채팅방 아이디로 Topic을 가져와 해당 Topic에 Message를 발행한다.
(2) Redis에 Message를 저장한다.
(3) 채팅방에 입장하면 MessageListener에 채팅방 아이디로 Topic을 등록하고, 채팅방에 속한 사용자 및 상대팀 정보를 반환한다.
(4) 사용자가 속한 채팅방 목록을 반환한다.
(5) 채팅방에 입장하면 채팅 목록을 반환한다.
2.8. ChattingController & ChattingRoomController
| ChattingController.java
@RequiredArgsConstructor
@Controller
public class ChattingController {
private final ChattingService chattingService;
@MessageMapping("/chattings/rooms/messages")
public void send(Message message) {
chattingService.send(message);
}
}
| ChattingRoomController.java
@RequiredArgsConstructor
@RestController
@RequestMapping("/chattings/rooms")
public class ChattingRoomController {
private final ChattingService chattingService;
@PostMapping
public void createChattingRoom(@RequestBody ChattingRoomCreateRequest chattingRoomCreateRequest) {
chattingService.createChattingRoom(chattingRoomCreateRequest);
}
@PostMapping("{chattingRoomId}")
public void enterChattingRoom(@PathVariable String chattingRoomId) {
chattingService.enterChattingRoom(chattingRoomId);
}
@GetMapping
public List<ChattingRoom> getChattingRooms() {
return chattingService.getChattingRooms();
}
@GetMapping("/detail/{chattingRoomId}")
public ChattingRoom getChattingRoom(@PathVariable String chattingRoomId) {
return chattingService.getChattingRoom(chattingRoomId);
}
@GetMapping("/{chattingRoomId}")
public List<Message> getChattings(@PathVariable String chattingRoomId) {
return chattingService.getChattings(chattingRoomId);
}
}
2.9. DTO
| Message.java
@Getter
@Builder
@AllArgsConstructor(access = AccessLevel.PROTECTED)
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Message {
private String chattingRoomId;
private Long senderId;
private String content;
public Chatting toChatting() {
return Chatting.of(senderId, content);
}
}
| ChattingRoomCreateRequest.java
@Getter
@Builder
@AllArgsConstructor(access = AccessLevel.PROTECTED)
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class ChattingRoomCreateRequest {
private String name;
}
3. 소스 코드
https://github.com/jeongyuneo/spring-labs/tree/main/spring-websocket-redis
4. To Do
- 채팅 목록 최신 메시지 상단 배치
- 채팅방 입장 시 모든 메시지 반환 → 특정 개수만큼 반환 및 무한 스크롤 처리
- 채팅 알림 기능 구현
References
[WebSocket] Spring Boot + STOMP + Redis Pub/Sub 이용한 채팅 서버 구현
'study > Spring' 카테고리의 다른 글
| [Spring] Redis Sorted Set을 이용한 검색 랭킹 조회 기능 개선 (1) | 2023.12.10 |
|---|---|
| [Spring Boot] 이메일 인증 with NCP & Redis (0) | 2023.09.25 |
| [Spring Boot] 전화번호 인증 with NCP SMS API & Redis (0) | 2023.08.03 |
| [Spring Boot] AWS S3 이미지 업로드 (0) | 2023.08.03 |
| [Spring WebSocket] Spring WebSocket STOMP 적용 (3) | 2023.06.19 |