sequence

This paper mainly studies Cheddar’s MessageSender

MessageSender

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSender.java

public interface MessageSender<T extends Message> { /** * Send a message * @param message Message to send * @throws MessageSendException */ void send(T message) throws MessageSendException; /** * Send a message, where the message is not visible to receivers for the specified delay duration * @param message Message to send * @param  delaySeconds Duration for which sent message is invisible to receivers * @throws MessageSendException */ void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException; }Copy the code

The MessageSender interface defines send and sendDelayedMessage methods

MessageSenderImpl

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageSenderImpl.java

public class MessageSenderImpl<T extends Message> implements MessageSender<T> { private final MessageQueue<T> messageQueue; public MessageSenderImpl(final MessageQueue<T> messageQueue) { this.messageQueue = messageQueue; } @Override public void send(final T message) throws MessageSendException { messageQueue.send(message); } @Override public void sendDelayedMessage(final T message, final int delaySeconds) throws MessageSendException { messageQueue.sendDelayedMessage(message, delaySeconds); }}Copy the code

MessageSenderImpl implements MessageSender interface, and its send method is delegated to messageQueue. Send. Its sendDelayedMessage method delegate to the messageQueue. SendDelayedMessage

MessageQueue

Cheddar/cheddar/cheddar-messaging/src/main/java/com/clicktravel/cheddar/infrastructure/messaging/MessageQueue.java

public interface MessageQueue<T extends Message> { /** * @return The queue name */ String getName(); /** * Send a message to this message queue * @param message Message to send * @throws MessageSendException */ void send(T message) throws MessageSendException; /** * Send a message to this message queue; the message is not visible to receivers for the specified delay duration * @param message Message to send * @param delaySeconds Duration for which sent message is invisible to receivers * @throws MessageSendException */ void sendDelayedMessage(T message, int delaySeconds) throws MessageSendException; /** * Receives any number of messages on this queue, but does not delete them. No order or priority of messages is * guaranteed. * @return List of received {@code Message}s * @throws MessageReceiveException */ List<T> receive() throws MessageReceiveException; /** * Receives any number of messages on this queue up to the maximum specified, but does not delete them. No order or * priority of messages is guaranteed. This call will spend up to the wait time given for a message to arrive in the * queue before returning. * @param waitTimeSeconds The duration (in seconds) for which the call will wait for a message to arrive in the * queue before returning. If a message is available, the call will return sooner. * @param maxMessages The maximum number of messages to return. Will never return more messages than this value but * may return fewer. Values can be from 1 to 10. * @return List of received {@code Message}s  * @throws MessageReceiveException */ List<T> receive(int waitTimeSeconds, int maxMessages) throws MessageReceiveException; /** * Deletes a message previously received from this queue. * @param typedMessage {@code Message} to delete * @throws MessageDeleteException */ void delete(T message) throws MessageDeleteException; }Copy the code

The MessageQueue interface defines getName, Send, sendDelayedMessage, receive, and Delete methods

InMemoryMessageQueue

Cheddar/cheddar/cheddar-integration-mocks/src/main/java/com/clicktravel/infrastructure/messaging/inmemory/InMemoryMessag eQueue.java

public class InMemoryMessageQueue<T extends Message> implements MessageQueue<T>, Resettable {

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final String name;
    private final InMemoryMessageQueuePoller inMemoryMessageQueuePoller;

    @SuppressWarnings("unchecked")
    public InMemoryMessageQueue(final String name, final InMemoryMessageQueuePoller inMemoryMessageQueuePoller,
            final InMemoryExchange<T>... inMemoryExchanges) {
        this.name = name;
        this.inMemoryMessageQueuePoller = inMemoryMessageQueuePoller;

        final List<String> exchangeNames = new ArrayList<>();
        for (final InMemoryExchange<T> inMemoryExchange : inMemoryExchanges) {
            inMemoryExchange.addSubscriber(this);
            exchangeNames.add(inMemoryExchange.getName());
        }
        logger.info("Using in memory message queue: " + name + " with subscriptions to these exchanges: ["
                + StringUtils.join(exchangeNames) + "]");
    }

    @Override
    public void send(final T message) {
        queue.add(message);
        inMemoryMessageQueuePoller.poll();
    }

    @Override
    public void sendDelayedMessage(final T message, final int delaySeconds) {
        send(message); // delay not supported
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public List<T> receive(final int waitTimeSeconds, final int maxMessages) {
        return receive();
    }

    @Override
    public List<T> receive() {
        final T message = queue.peek();
        final List<T> messages = new ArrayList<T>(1);
        if (message != null) {
            messages.add(message);
        }
        return messages;
    }

    @Override
    public void delete(final T message) {
        queue.remove(message);
    }

    @Override
    public String toString() {
        return "InMemoryMessageQueue [name=" + name + ", queue=" + queue + "]";
    }

    @Override
    public void reset() {
        queue.clear();
    }
}
Copy the code

InMemoryMessageQueue has realized the MessageQueue, Resettable interface, which defines the ConcurrentLinkedQueue and InMemoryMessageQueuePoller two attributes; The send method would be to add a message queue, and then execute inMemoryMessageQueuePoller. The poll (); The sendDelayedMessage method is not currently supported

SqsMessageQueue

Cheddar/cheddar/cheddar-integration-aws/src/main/java/com/clicktravel/infrastructure/messaging/aws/sqs/SqsMessageQueue.j ava

public abstract class SqsMessageQueue<T extends Message> implements MessageQueue<T> { private final SqsQueueResource sqsQueueResource; public SqsMessageQueue(final SqsQueueResource sqsQueueResource) { this.sqsQueueResource = sqsQueueResource; } protected abstract String toSqsMessageBody(final T message); protected abstract T toMessage(final com.amazonaws.services.sqs.model.Message sqsMessage); @Override public String getName() { return sqsQueueResource.getQueueName(); } @Override public void send(final T message) throws MessageSendException { try { sqsQueueResource.sendMessage(toSqsMessageBody(message)); } catch (final AmazonClientException e) { throw new MessageSendException("Unable to send message on SQS queue:[" + sqsQueueResource.getQueueName() + "]", e); } } @Override public void sendDelayedMessage(final T message, final int delaySeconds) throws MessageSendException { try { sqsQueueResource.sendDelayedMessage(toSqsMessageBody(message), delaySeconds); } catch (final AmazonClientException e) { throw new MessageSendException("Unable to send message on SQS queue:[" + sqsQueueResource.getQueueName() + "]", e); } } @Override public List<T> receive() throws MessageReceiveException { try { return toMessages(sqsQueueResource.receiveMessages()); } catch (final AmazonClientException e) { throw new MessageReceiveException("Unable to receive messages on SQS queue:[" + sqsQueueResource.getQueueName() + "]", e); } } @Override public List<T> receive(final int waitTimeSeconds, final int maxMessages) throws MessageReceiveException { try { return toMessages(sqsQueueResource.receiveMessages(waitTimeSeconds, maxMessages)); } catch (final AmazonClientException e) { throw new MessageReceiveException("Unable to receive messages on SQS queue:[" + sqsQueueResource.getQueueName() + "]", e); } } private List<T> toMessages(final List<com.amazonaws.services.sqs.model.Message> sqsMessages) { final ArrayList<T> messages = new ArrayList<>(); for (final com.amazonaws.services.sqs.model.Message sqsMessage : sqsMessages) { messages.add(toMessage(sqsMessage)); } return messages; } @Override public void delete(final T message) throws MessageDeleteException { try { sqsQueueResource.deleteMessage(message.getReceiptHandle()); } catch (final AmazonClientException e) { throw new MessageDeleteException("Unable to delete message on SQS queue:[" + sqsQueueResource.getQueueName() + "]", e); } } public SqsQueueResource getSqsQueue() { return sqsQueueResource; }}Copy the code

SqsMessageQueue is an abstract class, declare to realize MessageQueue interface, its entrusted the send method sqsQueueResource. SendMessage; The entrusted to the sqsQueueResource sendDelayedMessage method. SendDelayedMessage

summary

The MessageSender interface of Cheddar defines send and sendDelayedMessage methods. MessageSenderImpl implements MessageSender interface, and its send method is delegated to messageQueue. Send. Its sendDelayedMessage method delegate to the messageQueue. SendDelayedMessage; InMemoryMessageQueue and SqsMessageQueue provide two implementations. The inMemory implementation does not support the sendDelayedMessage method.

doc

  • Cheddar