본문 바로가기

Java/Vert.x

메시지 코덱(Message codecs)을 구현하여 이벤트 버스에서 사용자 메시지 주고 받기

테스트 환경 및 주요 아젠다

더보기

이 프로젝트의 개발 환경

  • 프로젝트 구현
    • Open JDK 12.0.1
    • Gradle : vertx-core 3.7.1

예제는 사용자 메시지 코덱을 새로 정의하고 이벤트 버스에서 사용 될 수 있도록 기본 코덱으로 지정합니다.

사용자 메시지 정의 테스트 환경 구축 사용자 메시지 코덱 정의 기본 코덱으로 등록
이벤트 버스에서 주고 받기 위한 사용자 메시지를 정의합니다. 두 개의 버티클을 배포합니다.
버티클은 서로 간에 이벤트 버스를 통해 메시지를 주고 받습니다.
사용자 메시지가 이벤트 버스에서 전달되기 위한 코덱을 정의합니다. 기본 코덱으로 등록하여 이벤트 버스 송신에서 별도 옵션 없이 사용합니다.

사용자 메시지 정의

이벤트 버스에서 주고 받기 위한 사용자 메시지를 정의합니다.

public class CustomEventbusMessage 
{
	public int message_type;
	public String message;
   
	public CustomEventbusMessage() 
	{
	
	}
    
	public CustomEventbusMessage(int _message_type, String _message) 
	{
		message_type = _message_type;
		message = _message;
	}
}
코드 비고
Line 3 메시지 유형을 저장 사용자 메시지를 범용 사용하기 위해서 메시지 유형을 저장하도록 하였습니다.
Line 4 메시지 내용을 저장 사용자 메시지가 실제로 담고 있는 내용입니다.

테스트 환경 구축

두 개의 버티클을 배포(Deployment)하고 이벤트 버스를 사용해서 서로 간에 메시지를 주고 받을 수 있는 환경을 구축합니다.

먼저 메시지를 전송하기 위한 버티클 Sender_Verticle을 구현합니다.

public class Sender_Verticle extends AbstractVerticle 
{
	// 테스트는 2초 간격으로 메시지를 송신
	private static final long TEST_PERIODIC = 2000L;
	// 메시지에 포함되는 내용
	private static final int MESSAGE_TYPE = 1234;
	private static final String MESSAGE = "now we know how message coder could registered";
	// 이벤트 버스 주소
	private static final String EVENTBUS_ADDRESS = "test.for.eventbus.message.codec";
	
	@Override
	public void start(Promise<Void> _future) throws Exception 
	{
		getVertx().setPeriodic(TEST_PERIODIC, ar -> {
			CustomEventbusMessage msg = new CustomEventbusMessage(
				MESSAGE_TYPE,
				MESSAGE
			);
            
			getVertx().eventBus().send(EVENTBUS_ADDRESS, msg);
		});
        
		_future.complete();
	}
}
코드 비고
Line 14:21 메시지 전송 버티클이 배포되면 2초 간격으로 실행되는 타이머를 생성합니다.
타이머가 실행되면 미리 정의한 CustomEventBusMessage를 이벤트 버스로 송신합니다.

이벤트 버스 메시지를 수신하는 버티클은 Receiver_Verticle입니다.

이벤트 버스 주소에 핸들러를 등록하고 핸들러가 호출되면 메시지 내용을 출력합니다.

public class Receiver_Verticle extends AbstractVerticle 
{
	// 이벤트 버스 주소
	private static final String EVENTBUS_ADDRESS = "test.for.eventbus.message.codec";
    
	@Override
	public void start(Promise<Void> _future) throws Exception 
	{
		getVertx().eventBus().consumer(EVENTBUS_ADDRESS, ar -> {
			CustomEventbusMessage msg = (CustomEventbusMessage)(Object) ar.body();
			System.out.println(String.format("some msg received, message_type(%d) message(%s)]", 
				msg.message_type,
				msg.message));
		});
        
		_future.complete();
	}
}
코드 비고
Line 9:14 메시지 수신 이벤트 버스에 핸들러를 등록합니다.
핸들러는 미리 정의된 주소로 수신되는 메시지가 발생하면 콜백됩니다.
CustomEventBusMessage를 요청의 body()로 받아서 필드 값을 출력합니다.

Sender_VerticleReceiver_Verticle을 배포하기 위한 Launcher 버티클입니다.

자바 실행문으로 런처 클래스가 시작되면 두 개의 버티클을 배포합니다.

public class Launcher extends AbstractVerticle 
{
	@Override
	public void start(Promise<Void> _future) throws Exception 
	{
		getVertx().deployVerticle(Sender_Verticle.class.getName());
		getVertx().deployVerticle(Receiver_Verticle.class.getName());
        
		_future.complete();
	}
}

기본적인 테스트 환경은 이렇게 구축되지만 아직 제대로 동작하지 않습니다(아래와 같은 오류가 발생합니다).

이벤트 버스에서 주고 받는  CustomEventbusMessage 사용자 메시지에 대한 코덱을 등록하지 않았기 때문입니다.

SEVERE: Unhandled exception
java.lang.IllegalArgumentException: No message codec for type: class Codecs.CustomEventbusMessage
	at io.vertx.core.eventbus.impl.CodecManager.lookupCodec(CodecManager.java:101)
	at io.vertx.core.eventbus.impl.EventBusImpl.createMessage(EventBusImpl.java:232)
	at io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:102)
	at io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:97)
	at Verticle.Sender_Verticle.lambda$start$0(Sender_Verticle.java:26)
	at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:889)
	at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:860)
	at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:50)
	at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:274)
	at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:22)
	at io.vertx.core.impl.AbstractContext.emit(AbstractContext.java:53)
	at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:22)
	at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:883)
	at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:835)

사용자 메시지 코덱 정의

사용자 메시지 CustomEventbusMessage가 이벤트 버스에서 전달되기 위한 코덱을 정의합니다.

코덱은 MessageCodec 인터페이스를 구현해야 합니다.

public class CustomMessageCodec<S, R> implements MessageCodec<S, R> 
{
	...
}

두 개의 제네릭 타입 <S, R>은 송신 메시지 유형(S)와 수신 메시지 유형(R)입니다.

public class CustomMessageCodec<CustomEventbusMessage, CustomEventbusMessage> implements MessageCodec<CustomEventbusMessage, CustomEventbusMessage> 
{
	...
}
더보기

송수신 메시지의 유형이 동일하다면 같은 클래스를 입력합니다.

사용자 메시지 코덱 정의: encodeToWire

encodeToWire에서 이벤트 버스로 메시지가 송신 될 때의 인코딩(Encoding) 알고리즘을 구현합니다.

/**
 * Called by Vert.x when marshalling a message to the wire.
 *
 * @param buffer  the message should be written into this buffer
 * @param s  the message that is being sent
 */
void encodeToWire(Buffer buffer, S s);

첫 번째 인자 buffer는 인코딩 결과를 나타내는 버퍼입니다. 인자로는 빈 버퍼가 전달되며 인코딩 알고리즘을 구현하여 메시지를 버퍼에 작성해야 합니다.

이 때 인코딩이 필요한 메시지는 두 번째 인자 s로 전달됩니다.

@Override
public void encodeToWire(Buffer buffer, CustomEventbusMessage customEventbusMessage) 
{
	// 메시지 유형을 인코딩
	buffer.appendInt(customEventbusMessage.message_type);
	// 메시지 내용을 인코딩
	buffer.appendString(customEventbusMessage.message);
}

사용자 메시지 코덱 정의: decodeToWire

decodeToWire에서 이벤트 버스로 메시지가 수신 될 때의 디코딩(Decoding) 알고리즘을 구현합니다.

/**
 * Called by Vert.x when a message is decoded from the wire.
 *
 * @param pos  the position in the buffer where the message should be read from.
 * @param buffer  the buffer to read the message from
 * @return  the read message
 */
R decodeFromWire(int pos, Buffer buffer);

첫 번째 인자 pos는 버퍼의 시작 인덱스이며 두 번째 인자 buffer는 인코딩 결과를 포함하는 버퍼입니다.

리턴 값 R은 버퍼를 디코딩 한 결과로 얻을 수 있는 메시입니다.

@Override
public CustomEventbusMessage decodeFromWire(int pos, Buffer buffer) 
{
	CustomEventbusMessage msg = new CustomEventbusMessage();
	
	// 메시지 유형을 디코딩
	int curr = pos;
	msg.message_type = buffer.getInt(curr);
	// 메시지 내용을 디코딩
	curr += 4;
	msg.message = buffer.getString(curr, buffer.length());
    
	return msg;
}

사용자 메시지 코덱 정의: transform

tranform은 메시지가 이벤트 버스를 통해 로컬 송신되는 경우에 대한 처리입니다. 

더보기

로컬 송신은 메시지가 동일한 머신에서 송수신되는 경우입니다. 

네트워크를 통해 전송되지 않고 메모리에서 즉시 처리되기 때문에 인코딩 또는 디코딩의 작업이 필요 없습니다.

이 예제는 단일 호스트로 동작하기 때문에 이 함수가 실행됩니다.

송수신 과정에서 메시지의 유형이 변경된다면 이곳에서 처리합니다.

/**
 * If a message is sent <i>locally</i> across the event bus, this method is called to transform the message from
 * the sent type S to the received type R
 *
 * @param s  the sent message
 * @return  the transformed message
 */
R transform(S s);

예제에서는 별도 처리할 것이 없기 때문에 인스턴스를 그대로 리턴합니다.

@Override
public CustomEventbusMessage transform(CustomEventbusMessage customEventbusMessage) 
{
	return customEventbusMessage;
}

사용자 메시지 코덱 정의: name

name은 코덱의 이름을 정의합니다. 코덱의 이름은 애플리케이션에 등록되는 코덱과 중복되어서는 안됩니다.

/**
 * The codec name. Each codec must have a unique name. This is used to identify a codec when sending a message and
 * for unregistering codecs.
 *
 * @return the name
 */
String name();

예제에서는 사용자 메시지에 대한 클래스 전체 경로를 리턴합니다.

사용자 메시지 코덱 정의: systemCodeID

systemCodeID는 코덱의 식별자입니다.

/**
 * Used to identify system codecs. Should always return -1 for a user codec.
 *
 * @return -1 for a user codec.
 */
byte systemCodecID();

사용자가 정의한 코덱은 항상 -1을 리턴합니다.

@Override
public byte systemCodecID() 
{
	return -1;
}

기본 코덱으로 등록 및 테스트

앞서 정의한 CustomeMessageCodec을 예제 애플리케이션에서 기본 코덱으로 등록합니다.

런처 Launcher 버티클의 코드를 다음과 같이 수정합니다.

@Override
public void start(Promise<Void> _future) throws Exception 
{
	CustomMessageCodec codec = new CustomMessageCodec();
	getVertx().eventBus().registerDefaultCodec(CustomEventbusMessage.class, codec);
	
	getVertx().deployVerticle(Sender_Verticle.class.getName());
	getVertx().deployVerticle(Receiver_Verticle.class.getName());
    
	_future.complete();
}
코드 비고
Line 4:5 기본 코덱 등록 CustomeMessageCodec 사용자 코덱을 이벤트 버스에서 기본 코덱으로 등록합니다.

기본 코덱을 등록하면 이제 프로그램이 정상 동작합니다.

3월 02, 2022 7:32:40 오후 io.vertx.core.impl.launcher.commands.VertxIsolatedDeployer
INFO: Succeeded in deploying verticle
some msg received, message_type(1234) sender(Sender_Verticle) message(now we know how message coder could registered)]
some msg received, message_type(1234) sender(Sender_Verticle) message(now we know how message coder could registered)]
some msg received, message_type(1234) sender(Sender_Verticle) message(now we know how message coder could registered)]

정리 및 복습

  • 이벤트 버스를 사용하여 사용자가 정의한 메시지를 송수신 할 수 있습니다.
  • 사용자가 정의한 메시지는 커스텀 클래스입니다.
  • 시스템은 커스텀 클래스를 이벤트 버스로 주고 받기 위해 버퍼로 변환합니다.
  • 커스텀 클래스 객체를 버퍼와 상호 변환하기 위한 방법을 정의하기 위해서 메시지 코덱을 사용합니다.
  • 사용자 정의 메시지를 주고 받기 위해 메시지 코덱을 작성하고 기본 코덱으로 등록합니다.