1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| <?php
include (__DIR__ .'/JS-amqp-include.php');
$reuse_connection = true;
$reuse_channel = true;
$cycle = 1000;
$input = "test";
$mypid = getmypid();
// $msg_body = implode(' ', array_slice($argv, 1));
echo "PID: $mypid\n";
$co = new AMQPConnection(array('host' => HOST, 'port' => PORT, 'vhost' => VHOST, 'login' => USER, 'password' => PASS));
$co->connect();
$ch = new AMQPChannel($co);
// !!!Do not set name for exchange!!!
$ex = new AMQPExchange($ch);
$q_rpc = new AMQPQueue($ch);
$q_rpc->setName(Q_RPC);
function process_message(AMQPEnvelope $msg, $q) {
global $ex;
global $mypid;
$re = "SERVER_PID:$mypid:{$msg->getBody()}";
$ex->publish($re, $msg->getReplyTo(), AMQP_NOPARAM, array('correlation_id' => $msg->getCorrelationId()));
// Manual ack if not using AMQP_AUTOACK
//$q->ack($msg->getDeliveryTag());
//echo "REQ:{$msg->getBody()}:ReplyTo:{$msg->getReplyTo()}:Reply:$re\n";
echo '.';
}
$q_rpc->consume('process_message', AMQP_AUTOACK);
?>
|