A classe tAMQP representa um objeto de comunicação do tipo AMQP (Advanced Message Queuing Protocol) da versão 0.9.1, que se comunica com servidores RabbitMQ. A instancia de uma classe permite a comunicação, envio e recebimento de mensagens através de um servidor AMQP, sendo possível o desenvolvimento de diversos tipos de aplicações, realizando transações ou comunicações padronizadas, de forma assíncrona, indiferente da arquitetura de cada uma delas.
Essa classe foi desenvolvida com base na existente biblioteca RabbitMQ.Client desenvolvida para C# .Net, sendo inclusive possível ser usado como referencia a documentação presente em https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html, assim como montar o ambiente de um servidor AMQP em https://www.rabbitmq.com/#getstarted.
|
#define AMQP_AUTODELETE .T.
#define AMQP_EXCLUSIVE .T.
#define AMQP_DURABLE .T.
#define AMQP_PERSISTENT .T.
#define AMQP_AUTOACK .T.
#define fixed_channel_id 1
User Function _sender
Local oSender := tAmqp():New("localhost",5672,"guest","guest",fixed_channel_id)
oSender:QueueDeclare("test_queue", .F.,.F.,AMQP_AUTODELETE )
oSender:BasicPublish("","test_queue", AMQP_PERSISTENT, "hello world!")
u___Receiver()
Freeobj(oSender)
Return
User Function _Receiver
Local oRecv := tAmqp():New("localhost",5672,"guest","guest",fixed_channel_id)
local var
oRecv:QueueDeclare("test_queue", .F.,.F.,AMQP_AUTODELETE)
oRecv:BasicConsume("test_queue", AMQP_AUTOACK, )
var := oRecv:Body
Freeobj(oRecv)
Return
|
17.3.0.x
Cria um objeto tAMQP com um determinado AMQP Server.
tAmqp():New( < uParam1 >, [ uParam2 ], [ uParam3 ], [ uParam4 ], [ uParam5 ] ) |
Nome | Tipo | Descrição | Obrigatório | Referência |
|---|---|---|---|---|
uParam1 | caractere | Endereço do AMQP Server | X | |
uParam2 | numérico | Porta do AMQP Server | ||
uParam3 | caractere | Usuário para logar na fila do AMQP Server | ||
uParam4 | caractere | Senha para logar na fila do AMQP Server | ||
uParam5 | caractere | Canal da comunicação com o AMQP Server |
Nome | Tipo | Descrição |
|---|---|---|
oObj | objeto | Nova instância da classe tAmqp |
Local oClient := tAmqp():New("localhost",5672,"guest","guest",1)
|
Indica qual canal esta sendo usado para a fila atual
Tipo | Valor Padrão | Somente Leitura |
|---|---|---|
numérico | N/A | N |
Conteudo da mensagem recebida após uma solicitação ao server AMQP via BasicConsume()
Tipo | Valor Padrão | Somente Leitura |
|---|---|---|
caractere | "" | N |
Indica qual o timeout atual que esta sendo usado para a comunicação com o AMQP
Tipo | Valor Padrão | Somente Leitura |
|---|---|---|
numérico | 5 | N |
Cria uma nova fila no AMQP Server.
QueueDeclare( [ cFila ], [ bisDurable ], [ bisExclusive ], [ bisAutodelete ] ) |
Nome | Tipo | Descrição | Obrigatório | Referência |
|---|---|---|---|---|
cFila | caractere | Indica o nome da fila onde será criada . | ||
bisDurable | lógico | Propriedade da fila bisDurable (TODO DOC). | ||
bisExclusive | lógico | Propriedade da fila bisExclusive (TODO DOC). | ||
bisAutodelete | lógico | Propriedade da fila bisAutodelete (TODO DOC). |
oSender:QueueDeclare(cFila,bisDurable,bisExclusive,bisAutodelete ) |
Cria uma nova exchange no AMQP Server.
QueueDeclare( [ cexchange ], [ ctype ], [ bisExclusive ], [ bisAutodelete ] ) |
Nome | Tipo | Descrição | Obrigatório | Referência |
|---|---|---|---|---|
cexchange | caractere | Indica o nome da exchange | X | |
ctype | carectere | Indica o tipo da fila (fanout, direct, topic) | X | |
bpassive | lógico | Indica bpassive(TODO DOC). | ||
| bburable | lógico | Indica bburable (TODO DOC). | ||
| bauto_delete | lógico | Indica binternal(TODO DOC). | ||
binternal | lógico | Indica binternal(TODO DOC). |
oSender:QueueDeclare(cFila,bisDurable,bisExclusive,bisAutodelete ) |
Resgata uma mensagem no AMQP Server.
BasicConsume( < cFila >, < bAck >, < bWaitingEvent > ) |
Nome | Tipo | Descrição | Obrigatório | Referência |
|---|---|---|---|---|
cFila | caractere | Indica o nome da fila onde será resgatada . | X | |
bAck | lógico | Indica se o consumo irá ser autoack. | X | |
bWaitingEvent | lógico | Ignora o timeout e se fica aguardando por uma nova mensagem na fila. | X |
oRecv:BasicConsume("test_queue", bAck, bWaitingEvent)
|
Liga uma fila a uma exchange para que as mensagens fluam (sujeitas a vários critérios) da exchange (origem) para a fila (destino).
QueueBind( < cExchange >, < cQueue >, < croutingkey > ) |
Nome | Tipo | Descrição | Obrigatório | Referência |
|---|---|---|---|---|
cExchange | caractere | Indica o nome da exchange. | X | |
cQueue | caractere | Indica o nome da fila | X | |
croutingkey | caractere | Indica o routingkey (TODO). |
oRecv:BasicConsume("test_queue", bAck, bWaitingEvent) |
Envia uma mensagem para o AMQP Server.
BasicPublish( < cExchange >, < cFila >, [ nPERSISTENT ], [ cMsg ], [ correlationID ], [ ReplyTo ] ) |
Nome | Tipo | Descrição | Obrigatório | Referência |
|---|---|---|---|---|
cExchange | caractere | Indica o nome da exchage onde será enviada a mensagem . | X | |
cFila | caractere | Indica o nome da fila onde será enviada a mensagem . | X | |
nPERSISTENT | lógico | Indica a requisição será persistente. | ||
cMsg | caractere | Informa a mensagem a ser postada | ||
correlationID | caractere | Id de correlação | ||
ReplyTo | caractere | Fila para resposta dessa mensagem |
oRecv:BasicPublish("test_exchange", "test_queue", AMQP_PERSISTENT, "Hello World!" )
|
Seta para a conexão atual parametros QoS(quality of service)
BasicQos( < nprefetchSize >, < nprefetchCount >, < bglobal > ) |
Nome | Tipo | Descrição | Obrigatório | Referência |
|---|---|---|---|---|
nprefetchSize | numérico | Indica cprefetchSize (TODO) | X | |
nprefetchCount | numérico | Indica cprefetchCount (TODO) | X | |
bglobal | lógico | Indica bglobal (TODO) | X |
oRecv:BasicQos(cprefetchSize, cprefetchCount, bglobal) |
Indica para a fila que voce recebeu e processou a mensagem com sucesso (acknowledge)
BasicAck( < ctag>, < cmultiple>) |
Nome | Tipo | Descrição | Obrigatório | Referência |
|---|---|---|---|---|
ctag | caractere | Indica ctag (TODO) | X | |
cmultiple | caractere | Indica cmultiple (TODO) | X |
oRecv:BasicQos(cprefetchSize, cprefetchCount, bglobal) |
Indica qual a informação de correlação da mensagem recebida
CorrelationID() |
oRecv:CorrelationID() |
Indica quantas mensagens tem na fila prontas para serem recebidas
oRecv:Received() |
oRecv:Received() |
Descreve o erro da ultima operação realizada
Error() |
oRecv:Error() |
Retorna qual o nome da fila atual que esta sendo usado
QueueName() |
Informa qual a deve ser a fila solicitada para a resposta da mensagem recebida
ReplyTo() |
oRecv:ReplyTo() |
Retorna o código de erro da ultima operação realizada
Status() |
oRecv:Status() |
Informa qual tag foi associada a uma fila, se houver
Tag() |
oRecv:Tag() |