Sending messages

Retrieve the implementation of the message broker interface and send a message to a specific destination:

$broker = message_broker_get();

$message_body = json_encode(array('data' => 'anyData'));
$broker->sendMessage($message_body, 'yourDestination');

Remarks: You are responsible for serializing and unserializing your message data when sending and receiving. We recommend json_encode and json_decode. You can also use the built-in serialize function and its counterpart unserialize, but only if you are certain that every client uses PHP.
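
As a minimal sketch of the round trip described above, the following uses only PHP's built-in functions. The payload keys are made up for illustration. Note that json_decode returns a stdClass object unless TRUE is passed as the second argument, and that unserialize should not be fed data from untrusted senders.

```php
<?php
// Hypothetical payload; the keys are illustrative only.
$data = array('user_id' => 42, 'action' => 'update');

// Sender side: turn the data into a JSON string before passing it to the broker.
$message_body = json_encode($data);

// Receiver side: pass TRUE to get an associative array back
// (without it, json_decode() returns a stdClass object).
$decoded = json_decode($message_body, TRUE);

// PHP-only alternative: serialize() and its counterpart unserialize().
$php_body = serialize($data);
$php_decoded = unserialize($php_body);
```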

The destination variable refers to the exchange name in AMQP terminology.

To specify a routing key, pass an array as the third parameter. This array can also hold further, implementation-dependent flags for the message:

$broker->sendMessage($message_body, 'yourDestination', array(
  'routing_key' => 'my.routing.key',
  'content_type' => 'application/json',
));
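
If you want to exercise your sending code without a running broker, a small stand-in object can record the calls. The sketch below is an assumption for illustration only: the class InMemoryBrokerStub is hypothetical and not part of the Message Broker module. It merely mirrors the three-argument sendMessage call shape used in the examples above.

```php
<?php
// Hypothetical test double, NOT part of the Message Broker module.
// It only mirrors the sendMessage($body, $destination, $options) call shape.
class InMemoryBrokerStub {
  // Every call to sendMessage() is recorded here for later inspection.
  public $sent = array();

  public function sendMessage($body, $destination, array $options = array()) {
    $this->sent[] = array(
      'body' => $body,
      'destination' => $destination,
      'options' => $options,
    );
  }
}

$broker = new InMemoryBrokerStub();
$broker->sendMessage(json_encode(array('data' => 'anyData')), 'yourDestination', array(
  'routing_key' => 'my.routing.key',
  'content_type' => 'application/json',
));
```

Afterwards, $broker->sent holds one entry whose 'options' element contains the routing key and content type that were passed in.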

Receiving messages

To receive messages, you have to write a so-called consumer: a user-defined callable that is invoked for every incoming message on a specific queue.

You register your consumers by implementing the hook "message_broker_consumers". Here is an example that binds the consumer "user-synchronisation" to the queue "user-updates":

function mymodule_message_broker_consumers($self_name) {
  $consumers = array();

  $consumers['user-synchronisation'] = array(
    'queue' => 'user-updates',
    'callback' => 'mymodule_handle_user_updates',
  );

  return $consumers;
}

function mymodule_handle_user_updates($message_body, $ack) {
  $messageData = json_decode($message_body);

  // Process message ...

  // and ack it
  $ack();
}

Make sure that you acknowledge every successfully processed message. Otherwise, the message will be re-sent to your (or another) consumer later on.
If you want to stop processing a message, for instance because a system it depends on is offline, simply throw an exception.
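
The acknowledge-or-throw behaviour described above can be sketched as follows. This is an illustration under stated assumptions, not the module's own code: the function name, the payload, and the third parameter (which exists only so the sketch can simulate an outage) are hypothetical. The check for getBody() is based on a comment on this page reporting that the first argument may be an object rather than a string. The $ack argument is treated as a callable, such as a closure, which can be invoked directly in PHP 5.3 and later.

```php
<?php
// Sketch of a consumer callback. The function name, the payload and the
// $backend_online parameter are hypothetical and only serve this example.
function example_handle_user_updates($message_body, $ack, $backend_online = TRUE) {
  // A comment on this page reports that the first argument can be an object
  // exposing getBody(); handle both cases defensively.
  if (is_object($message_body) && method_exists($message_body, 'getBody')) {
    $message_body = $message_body->getBody();
  }

  $data = json_decode($message_body, TRUE);

  if (!$backend_online) {
    // Throwing before $ack() leaves the message unacknowledged, so per the
    // text above it should be re-sent later.
    throw new RuntimeException('Dependent system is offline.');
  }

  // Process $data here ...

  // Acknowledge only after processing succeeded.
  $ack();
  return $data;
}

// Simulate the broker: $ack is a closure that records the acknowledgement.
$acked = FALSE;
$ack = function () use (&$acked) {
  $acked = TRUE;
};
$result = example_handle_user_updates('{"uid":7}', $ack);
```

Calling the function with FALSE as the third argument throws a RuntimeException before $ack is reached, so the message stays unacknowledged.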

See the page "How to write fault tolerant message consumers" for more details about this topic.

If you want to run your consumers now, use the Drush plugin that ships with the Message Broker AMQP module.

Comments

adityaj

mymodule_message_broker_consumers($self_name) is not binding the consumer. I get a notice that $self_name is an unknown variable, and the function registered with 'callback' => 'mymodule_handle_user_updates' is never called.

If one calls mymodule_handle_user_updates directly, it gives an error: $ack() is not a valid PHP function name, only a string can be used for the function name.

function mymodule_handle_user_updates($message_body, $ack) {
$messageData = json_decode($message_body);
// Process message ...
// and ack it
$ack();
}

pbattino

I think there is an error in the mymodule_handle_user_updates function: the variable $message_body is actually an object!
I tried $message_body->getBody() and I got a json string.