Oracle8i Application Developer's Guide - Advanced Queuing Release 2 (8.1.6) Part Number A76938-01 |
|
A Sample Application Using AQ, 5 of 6
A message can be dequeued from a queue using one of two dequeue methods: a correlation identifier or a message identifier.
A correlation identifier is a user defined message property (of VARCHAR2
datatype) while a message identifier is a system-assigned value (of RAW
datatype). Multiple messages with the same correlation identifier can be present in a queue while only one message with a given message identifier can be present. A dequeue call with a correlation identifier will directly remove a message of specific interest rather than using a combination of locked and remove mode to first examine the content and then remove the message. Hence, the correlation identifier usually contains the most useful attribute of a payload. If there are multiple messages with the same correlation identifier, the ordering (enqueue order) between messages may not be preserved on dequeue calls. The correlation identifier cannot be changed between successive dequeue calls without specifying the first message navigation option.
Note that dequeueing a message with either of the two dequeue methods will not preserve the message grouping property (see "Message Grouping" and "Message Navigation in Dequeue" for further information).
In the following scenario of the BooksOnLine
example, rush orders received by the East shipping site are processed first. This is achieved by dequeueing the message using the correlation identifier which has been defined to contain the order type (rush/normal). For an illustration of dequeueing using a message identifier please refer to the get_northamerican_orders
procedure discussed in the example under "Modes of Dequeuing".
CONNECT boladm/boladm; /* Create procedures to enqueue into single-consumer queues: */ create or replace procedure get_rushtitles(consumer in varchar2) as deq_cust_data BOLADM.customer_typ; deq_book_data BOLADM.book_typ; deq_item_data BOLADM.orderitem_typ; deq_msgid RAW(16); dopt dbms_aq.dequeue_options_t; mprop dbms_aq.message_properties_t; deq_order_data BOLADM.order_typ; qname varchar2(30); no_messages exception; pragma exception_init (no_messages, -25228); new_orders BOOLEAN := TRUE; begin dopt.consumer_name := consumer; dopt.wait := 1; dopt.correlation := 'RUSH'; IF (consumer = 'West_Shipping') THEN qname := 'WS.WS_bookedorders_que'; ELSIF (consumer = 'East_Shipping') THEN qname := 'ES.ES_bookedorders_que'; ELSE qname := 'OS.OS_bookedorders_que'; END IF; WHILE (new_orders) LOOP BEGIN dbms_aq.dequeue( queue_name => qname, dequeue_options => dopt, message_properties => mprop, payload => deq_order_data, msgid => deq_msgid); commit; deq_item_data := deq_order_data.items(1); deq_book_data := deq_item_data.item; dbms_output.put_line(' rushorder book_title: ' || deq_book_data.title || ' quantity: ' || deq_item_data.quantity); EXCEPTION WHEN no_messages THEN dbms_output.put_line (' ---- NO MORE RUSH TITLES ---- '); new_orders := FALSE; END; END LOOP; end; / CONNECT EXECUTE on get_rushtitles to ES; /* Dequeue the orders: */ CONNECT ES/ES; /* Dequeue all rush order titles for East_Shipping: */ EXECUTE BOLADM.get_rushtitles('East_Shipping');
set oraaq1 = OraDatabase.CreateAQ("WS.WS_backorders_que") set oraaq2 = OraDatabase.CreateAQ("ES.ES_backorders_que") set oraaq3 = OraDatabase.CreateAQ("CBADM.deferbilling_que") Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ") Set OraBackOrder = OraDatabase.CreateOraObject("BOLADM.order_typ") Private Sub Requeue_backorder Dim q as oraobject If sale_region = WEST then q = oraaq1 else if sale_region = EAST then q = oraaq2 else q = oraaq3 end if OraMsg.delay = 7*60*60*24 OraMsg = OraBackOrder 'OraOrder contains the order details Msgid = q.enqueue End Sub
public static void getRushTitles(Connection db_conn, String consumer) { AQSession aq_sess; Order deq_order; byte[] deq_msgid; AQDequeueOption deq_option; AQMessageProperty msg_prop; AQQueue bookedorders_q; AQMessage message; AQObjectPayload obj_payload; boolean new_orders = true; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); deq_option = new AQDequeueOption(); deq_option.setConsumerName(consumer); deq_option.setWaitTime(1); deq_option.setCorrelation("RUSH"); if(consumer.equals("West_Shipping")) { bookedorders_q = aq_sess.getQueue("WS", "WS_bookedorders_que"); } else if(consumer.equals("East_Shipping")) { bookedorders_q = aq_sess.getQueue("ES", "ES_bookedorders_que"); } else { bookedorders_q = aq_sess.getQueue("OS", "OS_bookedorders_que"); } while(new_orders) { try { /* Dequeue the message */ message = bookedorders_q.dequeue(deq_option, Order.getFactory()); obj_payload = message.getObjectPayload(); deq_order = (Order)(obj_payload.getPayloadData()); System.out.println("Order number " + deq_order.getOrderno() + " is a rush order"); } catch (AQException aqex) { new_orders = false; System.out.println("No more rush titles"); System.out.println("Exception-1: " + aqex); } } } catch (Exception ex) { System.out.println("Exception-2: " + ex); } }
A consumer can dequeue a message from a multi-consumer normal queue by supplying the name that was used in the AQ$_AGENT
type of the DBMS_AQADM
.ADD_SUBSCRIBER
procedure or the recipient list of the message properties (see "Add a Subscriber" or Enqueue a Message [Specify Message Properties]).
consumer_name
field of the dequeue_options_t
record.
OCISetAttr
procedure to specify a text string as the OCI_ATTR_CONSUMER_NAME
of an OCI_DTYPE_AQDEQ_OPTIONS
descriptor.
There can be multiple processes or operating system threads that use the same consumer_name
to dequeue concurrently from a queue. In that case AQ will provide the first unlocked message that is at the head of the queue and is intended for the consumer. Unless the message ID of a specific message is specified during dequeue, the consumers can dequeue messages that are in the READY
state.
A message is considered PROCESSED
only when all intended consumers have successfully dequeued the message. A message is considered EXPIRED
if one or more consumers did not dequeue the message before the EXPIRATION
time. When a message has expired, it is moved to an exception queue.
The exception queue must also be a multi-consumer queue. Expired messages from multi-consumer queues cannot be dequeued the intended recipients of the message. However, they can be dequeued in the REMOVE
mode exactly once by specifying a NULL
consumer name in the dequeue options. Hence, from a dequeue perspective, multi-consumer exception queues behave like single-consumer queues because each expired message can be dequeued only once using a NULL
consumer name. Note that expired messages can be dequeued only by specifying a message ID if the multi-consumer exception queue was created in a queue table without the compatible parameter or with the compatible parameter set to '8.0'.
In release 8.0.x when two or more processes/threads that are using different consumer_names
are dequeuing from a queue, only one process/thread can dequeue a given message in the LOCKED
or REMOVE
mode at any time. What this means is that other consumers that need to the dequeue the same message will have to wait until the consumer that has locked the message commits or aborts the transaction and releases the lock on the message. However, while release 8.0.x did not support concurrency among different consumers for the same message., with release 8.1.6 all consumers can access the same message concurrently. The result is that two processes/threads that are using different consumer_name to dequeue the same message do not block each other. AQ achieves this improvement by decoupling the task of dequeuing a message and the process of removing the message from the queue. In release 8.1.6 only the queue monitor removes messages from multi-consumer queues. This allows dequeuers to complete the dequeue operation by not locking the message in the queue table. Since the queue monitor performs the task of removing messages that have been processed by all consumers from multi-consumer queues approximately once every minute, users may see a delay when the messages have been completely processed and when they are physically removed from the queue.
Consumers of a message in multi-consumer queues (either by virtue of being a subscriber to the queue or because the consumer was a recipient in the enqueuer's recipient list) can be local or remote.
NULL
NAME
and a NULL
ADDRESS
and PROTOCOL
field in the AQ$_AGENT
type (see "Agent" in Chapter 2, "Basic Components").
ADDRESS
field refers to a queue in the same database. In this case the consumer will dequeue the message from a different queue in the same database. These addresses will be of the form [schema]
.queue_name
where queue_name
(optionally qualified by the schema name) is the target queue. If the schema is not specified, the schema of the current user executing the ADD_SUBSCRIBER
procedure or the enqueue is used (see "Add a Subscriber", or "Enqueue a Message" in Chapter 11, "Operational Interface: Basic Operations"). Use the DBMS_AQADM
.SCHEDULE_PROPAGATION
command with a NULL
destination (which is the default) to schedule propagation to such remote consumers (see "Schedule a Queue Propagation" in Chapter 9, "Administrative Interface").
ADDRESS
field refers to a queue in a different database. In this case the database must be reachable using database links and the PROTOCOL
must be either NULL
or 0. These addresses will be of the form [schema]
.queue_name@dblink
. If the schema is not specified, the schema of the current user executing the ADD_SUBSCRIBER
procedure or the enqueue is used. If the database link is not a fully qualified name (does not have a domain name specified) the default domain as specified by the db_domain
init
.ora
parameter will be used. Use the DBMS_AQADM
.SCHEDULE_PROPAGATION
procedure with the database link as the destination to schedule the propagation. AQ does not support the use of synonyms to refer to queues or database links.
ADDRESS
field refers to a destination that can be reached by a third party protocol. You will need to refer to the documentation of the third party software to determine how to specify the ADDRESS
and the PROTOCOL
database link, and on how to schedule propagation.
When a consumer is remote, a message will be marked as PROCESSED
in the source queue immediately after the message has been propagated even though the consumer may not have dequeued the message at the remote queue. Similarly, when a propagated message expires at the remote queue, the message is moved to the DEFAULT
exception queue of the remote queue's queue table, and not to the exception queue of the local queue. As can be seen in both cases, AQ does not currently propagate the exceptions to the source queue. You can use the MSGID
and the ORIGINAL_MSGID
columns in the queue table view (AQ$<queue_table>
) to chain the propagated messages. When a message with message ID m1 is propagated to a remote queue, m1 is stored in the ORIGINAL_MSGID
column of the remote queue.
The DELAY
, EXPIRATION
and PRIORITY
parameters apply identically to both local and remote consumers. AQ accounts for any delay in propagation by adjusting the DELAY
and EXPIRATION
parameters accordingly. For example, if the EXPIRATION
is set to one hour, and the message is propagated after 15 minutes, the expiration at the remote queue will be set to 45 minutes.
Since the database handles message propagation, OO4O does not differentiate between remote and local recipients. The same sequence of calls/steps are required to dequeue a message, for local and remote recipients.
You have several options for selecting a message from a queue. You can select the "first message". Alternatively, once you have selected a message and established its position in the queue (for example, as the fourth message), you can then retrieve the "next message".
These selections work in a slightly different way if the queue is enabled for transactional grouping.
Note that the transaction grouping property is negated if a dequeue is performed in one of the following ways: dequeue by specifying a correlation identifier, dequeue by specifying a message identifier, or dequeueing some of the messages of a transaction and committing (see "Dequeue Methods").
If in navigating through the queue, the program reaches the end of the queue while using the "next message" or "next transaction" option, and you have specified a non-zero wait time, then the navigating position is automatically changed to the beginning of the queue.
The following scenario in the BooksOnLine
example continues the message grouping example already discussed with regard to enqueuing (see "Dequeue Methods").
The get_orders
() procedure dequeues orders from the OE_neworders_que
. Recall that each transaction refers to an order and each message corresponds to an individual book in the order. The get_orders
() procedure loops through the messages to dequeue the book orders. It resets the position to the beginning of the queue using the first message option before the first dequeues. It then uses the next message navigation option to retrieve the next book (message) of an order (transaction). If it gets an error message indicating all message in the current group/transaction have been fetched, it changes the navigation option to next transaction and get the first book of the next order. It then changes the navigation option back to next message for fetching subsequent messages in the same transaction. This is repeated until all orders (transactions) have been fetched.
CONNECT boladm/boladm; create or replace procedure get_new_orders as deq_cust_data BOLADM.customer_typ; deq_book_data BOLADM.book_typ; deq_item_data BOLADM.orderitem_typ; deq_msgid RAW(16); dopt dbms_aq.dequeue_options_t; mprop dbms_aq.message_properties_t; deq_order_data BOLADM.order_typ; qname VARCHAR2(30); no_messages exception; end_of_group exception; pragma exception_init (no_messages, -25228); pragma exception_init (end_of_group, -25235); new_orders BOOLEAN := TRUE; BEGIN dopt.wait := 1; dopt.navigation := DBMS_AQ.FIRST_MESSAGE; qname := 'OE.OE_neworders_que'; WHILE (new_orders) LOOP BEGIN LOOP BEGIN dbms_aq.dequeue( queue_name => qname, dequeue_options => dopt, message_properties => mprop, payload => deq_order_data, msgid => deq_msgid); deq_item_data := deq_order_data.items(1); deq_book_data := deq_item_data.item; deq_cust_data := deq_order_data.customer; IF (deq_cust_data IS NOT NULL) THEN dbms_output.put_line(' **** NEXT ORDER **** '); dbms_output.put_line('order_num: ' || deq_order_data.orderno); dbms_output.put_line('ship_state: ' || deq_cust_data.state); END IF; dbms_output.put_line(' ---- next book ---- '); dbms_output.put_line(' book_title: ' || deq_book_data.title || ' quantity: ' || deq_item_data.quantity); EXCEPTION WHEN end_of_group THEN dbms_output.put_line ('*** END OF ORDER ***'); commit; dopt.navigation := DBMS_AQ.NEXT_TRANSACTION; END; END LOOP; EXCEPTION WHEN no_messages THEN dbms_output.put_line (' ---- NO MORE NEW ORDERS ---- '); new_orders := FALSE; END; END LOOP; END; / CONNECT EXECUTE ON get_new_orders to OE; /* Dequeue the orders: */ CONNECT OE/OE; EXECUTE BOLADM.get_new_orders;
Dim OraSession as object Dim OraDatabase as object Dim OraAq as object Dim OraMsg as Object Dim OraOrder,OraItemList,OraItem,OraBook,OraCustomer as Object Dim Msgid as String Set OraSession = CreateObject("OracleInProcServer.XOraSession") Set OraDatabase = OraSession.DbOpenDatabase("", "boladm/boladm", 0&) set oraaq = OraDatabase.CreateAQ("OE.OE_neworders_que") Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ") OraAq.wait = 1 OraAq.Navigation = ORAAQ_DQ_FIRST_MESSAGE private sub get_new_orders Dim MsgIsDequeued as Boolean On Error goto ErrHandler MsgIsDequeued = TRUE msgid = q.Dequeue if MsgIsDequeued then set OraOrder = OraMsg OraItemList = OraOrder("items") OraItem = OraItemList(1) OraBook = OraItem("item") OraCustomer = OraOrder("customer") ' Populate the textboxes with the values if( OraCustomer ) then if OraAq.Navigation <> ORAAQ_DQ_NEXT_MESSAGE then MsgBox " ********* NEXT ORDER *******" end if txt_book_orderno = OraOrder("orderno") txt_book_shipstate = OraCustomer("state") End if OraAq.Navigation = ORAAQ_DQ_NEXT_MESSAGE txt_book_title = OraBook("title") txt_book_qty = OraItem("quantity") Else MsgBox " ********* END OF ORDER *******" End if ErrHandler : 'Handle error case, like no message etc If OraDatabase.LastServerErr = 25228 then OraAq.Navigation = ORAAQ_DQ_NEXT_TRANSACTION MsgIsDequeued = FALSE Resume Next End If 'Process other errors end sub
No example is provided with this release.
A dequeue request can either view a message or delete a message (see "Dequeue a Message" in Chapter 11, "Operational Interface: Basic Operations").
If a message is browsed it remains available for further processing. Similarly if a message is locked it remains available for further processing once the lock on it is released by performing a transaction commit or rollback. Once a message is deleted using either of the remove modes, it is no longer available for dequeue requests.
When a message is dequeued using REMOVE_NODATA
mode, the payload of the message is not retrieved. This mode can be useful when the user has already examined the message payload, possibly by means of a previous BROWSE
dequeue. In this way, you can avoid the overhead of payload retrieval which can be substantial for large payloads
A message is retained in the queue table after it has been removed only if a retention time is specified for a queue. Messages cannot be retained in exception queues (refer to the section on exceptions for further information). Removing a message with no data is generally used if the payload is known (from a previous browse/locked mode dequeue call), or the message will not be used.
Note that after a message has been browsed there is no guarantee that the message can be dequeued again since a dequeue call from a concurrent user might have removed the message. To prevent a viewed message from being dequeued by a concurrent user, you should view the message in the locked mode.
You need to take special care while using the browse mode for other reasons as well. The dequeue position is automatically changed to the beginning of the queue if a non-zero wait time is specified and the navigating position reaches the end of the queue. Hence repeating a dequeue call in the browse mode with the "next message" navigation option and a non-zero wait time can dequeue the same message over and over again. We recommend that you use a non-zero wait time for the first dequeue call on a queue in a session, and then use a zero wait time with the next message navigation option for subsequent dequeue calls. If a dequeue call gets an "end of queue" error message, the dequeue position can be explicitly set by the dequeue call to the beginning of the queue using the "first message" navigation option, following which the messages in the queue can be browsed again.
In the following scenario from the BooksOnLine
example, international orders destined to Mexico and Canada are to be processed separately due to trade policies and carrier discounts. Hence, a message is viewed in the locked mode (so no other concurrent user removes the message) and the customer country (message payload) is checked. If the customer country is Mexico or Canada the message be deleted from the queue using the remove with no data (since the payload is already known) mode. Otherwise, the lock on the message is released by the commit call. Note that the remove dequeue call uses the message identifier obtained from the locked mode dequeue call. The shipping_bookedorder_deq
(refer to the example code for the description of this procedure) call illustrates the use of the browse mode.
CONNECT boladm/boladm; create or replace procedure get_northamerican_orders as deq_cust_data BOLADM.customer_typ; deq_book_data BOLADM.book_typ; deq_item_data BOLADM.orderitem_typ; deq_msgid RAW(16); dopt dbms_aq.dequeue_options_t; mprop dbms_aq.message_properties_t; deq_order_data BOLADM.order_typ; deq_order_nodata BOLADM.order_typ; qname VARCHAR2(30); no_messages exception; pragma exception_init (no_messages, -25228); new_orders BOOLEAN := TRUE; begin dopt.consumer_name := consumer; dopt.wait := DBMS_AQ.NO_WAIT; dopt.navigation := dbms_aq.FIRST_MESSAGE; dopt.dequeue_mode := DBMS_AQ.LOCKED; qname := 'OS.OS_bookedorders_que'; WHILE (new_orders) LOOP BEGIN dbms_aq.dequeue( queue_name => qname, dequeue_options => dopt, message_properties => mprop, payload => deq_order_data, msgid => deq_msgid); deq_item_data := deq_order_data.items(1); deq_book_data := deq_item_data.item; deq_cust_data := deq_order_data.customer; IF (deq_cust_data.country = 'Canada' OR deq_cust_data.country = 'Mexico' ) THEN dopt.dequeue_mode := dbms_aq.REMOVE_NODATA; dopt.msgid := deq_msgid; dbms_aq.dequeue( queue_name => qname, dequeue_options => dopt, message_properties => mprop, payload => deq_order_nodata, msgid => deq_msgid); commit; dbms_output.put_line(' **** next booked order **** '); dbms_output.put_line('order_no: ' || deq_order_data.orderno || ' book_title: ' || deq_book_data.title || ' quantity: ' || deq_item_data.quantity); dbms_output.put_line('ship_state: ' || deq_cust_data.state || ' ship_country: ' || deq_cust_data.country || ' ship_order_type: ' || deq_order_data.ordertype); END IF; commit; dopt.dequeue_mode := DBMS_AQ.LOCKED; dopt.msgid := NULL; dopt.navigation := dbms_aq.NEXT_MESSAGE; EXCEPTION WHEN no_messages THEN dbms_output.put_line (' ---- NO MORE BOOKED ORDERS ---- '); new_orders := FALSE; END; END LOOP; end; / CONNECT EXECUTE on get_northamerican_orders to OS; CONNECT ES/ES; /* Browse all booked orders for East_Shipping: */ EXECUTE BOLADM.shipping_bookedorder_deq('East_Shipping', DBMS_AQ.BROWSE); CONNECT OS/OS; /* Dequeue all international North American orders for Overseas_Shipping: */ EXECUTE BOLADM.get_northamerican_orders;
OO4O supports all the mdoes of dequeuing described above. Possible values include:
Dim OraSession as object Dim OraDatabase as object Dim OraAq as object Dim OraMsg as Object Dim OraOrder,OraItemList,OraItem,OraBook,OraCustomer as Object Dim Msgid as String Set OraSession = CreateObject("OracleInProcServer.XOraSession") Set OraDatabase = OraSession.DbOpenDatabase("", "boladm/boladm", 0&) set oraaq = OraDatabase.CreateAQ("OE.OE_neworders_que") OraAq.DequeueMode = ORAAQ_DQ_BROWSE
public static void get_northamerican_orders(Connection db_conn) { AQSession aq_sess; Order deq_order; Customer deq_cust; String cust_country; byte[] deq_msgid; AQDequeueOption deq_option; AQMessageProperty msg_prop; AQQueue bookedorders_q; AQMessage message; AQObjectPayload obj_payload; boolean new_orders = true; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); deq_option = new AQDequeueOption(); deq_option.setConsumerName("Overseas_Shipping"); deq_option.setWaitTime(AQDequeueOption.WAIT_NONE); deq_option.setNavigationMode(AQDequeueOption.NAVIGATION_FIRST_MESSAGE); deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_LOCKED); bookedorders_q = aq_sess.getQueue("OS", "OS_bookedorders_que"); while(new_orders) { try { /* Dequeue the message - browse with lock */ message = bookedorders_q.dequeue(deq_option, Order.getFactory()); obj_payload = message.getObjectPayload(); deq_msgid = message.getMessageId(); deq_order = (Order)(obj_payload.getPayloadData()); deq_cust = deq_order.getCustomer(); cust_country = deq_cust.getCountry(); if(cust_country.equals("Canada") || cust_country.equals("Mexico")) { deq_option.setDequeueMode( AQDequeueOption.DEQUEUE_REMOVE_NODATA); deq_option.setMessageId(deq_msgid); /* Delete the message */ bookedorders_q.dequeue(deq_option, Order.getFactory()); System.out.println("---- next booked order ------"); System.out.println("Order no: " + deq_order.getOrderno()); System.out.println("Ship state: " + deq_cust.getState()); System.out.println("Ship country: " + deq_cust.getCountry()); System.out.println("Order type: " + deq_order.getOrdertype()); } db_conn.commit(); deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_LOCKED); deq_option.setMessageId(null); deq_option.setNavigationMode( AQDequeueOption.NAVIGATION_NEXT_MESSAGE); } catch (AQException aqex) { new_orders = false; System.out.println("--- No more booked orders ----"); System.out.println("Exception-1: " + aqex); } } } catch (Exception ex) { System.out.println("Exception-2: " + ex); } }
One of the most important features of AQ is that it allows applications to block on one or more queues waiting for the arrival of either a newly enqueued message or for a message that becomes ready. You can use the DEQUEUE
operation to wait for arrival of a message in a queue (see "Dequeue a Message") or the LISTEN
operation to wait for the arrival of a message in more than one queue (see "Listen to One (Many) Queue(s)" in Chapter 11, "Operational Interface: Basic Operations").
When the blocking DEQUEUE
call returns, it returns the message properties and the message payload. By contrast, when the blocking LISTEN
call returns, it discloses only the name of the queue in which a message has arrived. A subsequent DEQUEUE
operation is needed to dequeue the message.
Applications can optionally specify a timeout of zero or more seconds to indicate the time that AQ must wait for the arrival of a message. The default is to wait forever until a message arrives in the queue. This optimization is important in two ways. It removes the burden of continually polling for messages from the application. And it saves CPU and network resource because the application remains blocked until a new message is enqueued or becomes READY
after its DELAY
time. In release 8.1.5 applications can also perform a blocking dequeue on exception queues to wait for arrival of EXPIRED
messages.
A process or thread that is blocked on a dequeue is either woken up directly by the enqueuer if the new message has no DELAY
or is woken up by the queue monitor process when the DELAY
or EXPIRATION
time has passed. Applications can not only wait for the arrival of a message in the queue that an enqueuer enqueues a message, but also on a remote queue, provided that propagation has been schedule to the remote queue using DBMS_AQADM
.SCHEDULE_PROPAGATION
. In this case the AQ propagator will wake-up the blocked dequeuer after a message has been propagated.
In the BooksOnLine example, the get_rushtitles
procedure discussed under dequeue methods specifies a wait time of 1 second in the dequeue_options
argument for the dequeue call. Wait time can be specified in different ways as illustrated in the code below.
DBMS_AQ
.NO_WAIT
, a wait time of 0 seconds is implemented. The dequeue call in this case will return immediately even if there are no messages in the queue.
DBMS_AQ
.FOREVER
, the dequeue call is blocked without a timeout until a message is available in the queue.
/* dopt is a variable of type dbms_aq.dequeue_options_t. Set the dequeue wait time to 10 seconds: */ dopt.wait := 10; /* Set the dequeue wait time to 0 seconds: */ dopt.wait := DBMS_AQ.NO_WAIT; /* Set the dequeue wait time to infinite (forever): */ dopt.wait := DBMS_AQ.FOREVER;
OO4O supports asynchronous dequeuing of messages. First, the monitor is started for a particular queue. When messages that fulfil the user criteria are dequeued, the user's callback object is notified. For more details, refer to the MonitorStart method of the OraAQ Object.
AQDequeueOption deq-opt;
deq-opt = new AQDequeueOption ();
This feature allows OCI clients to receive notifications when there is a message in a queue of interest. The client can use it to monitor multiple subscriptions. The client does not have to be connected to the database to receive notifications regarding its subscriptions.
You use the OCI function, OCISubcriptionRegister
, to register interest in messages in a queue (see "Register for Notification" in Chapter 11, "Operational Interface: Basic Operations").
The client can specify a callback function which is invoked for every new message that is enqueued. For non-persistent queues, the message is delivered to the client as part of the notification. For persistent queues, only the message properties are delivered as part of the notification. Consequently, in the case of persistent queues, the client has to make an explicit dequeue to access the contents of the message.
In the BooksOnLine
application, a customer can request Fed-ex shipping (priority 1), Priority air shipping (priority 2). or Regular ground shipping (priority 3).
The shipping application then ships the orders according to the user's request. It is of interest to BooksOnLine
to find out how many requests of each shipping type come in each day. The application uses asynchronous notification facility for this purpose. It registers for notification on the WS
.WS_bookedorders_que
. When it is notified of new message in the queue, it updates the count for the appropriate shipping type depending on the priority of the message.
Refer to the Visual Basic online help, "Monitoring Messages".
This feature is not supported by the Java API.
This example illustrates the use of OCIRegister. At the shipping site, an OCI client program keeps track of how many orders were made for each of the shipping types, FEDEX, AIR and GROUND. The priority field of the message enables us to determine the type of shipping desired.
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> #ifdef WIN32COMMON #define sleep(x) Sleep(1000*(x)) #endif static text *username = (text *) "WS"; static text *password = (text *) "WS"; static OCIEnv *envhp; static OCIServer *srvhp; static OCIError *errhp; static OCISvcCtx *svchp; static void checkerr(/*_ OCIError *errhp, sword status _*/); struct ship_data { ub4 fedex; ub4 air; ub4 ground; }; typedef struct ship_data ship_data; int main(/*_ int argc, char *argv[] _*/); /* Notify callback: */ ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode) dvoid *ctx; OCISubscription *subscrhp; dvoid *pay; ub4 payl; dvoid *desc; ub4 mode; { text *subname; ub4 size; ship_data *ship_stats = (ship_data *)ctx; text *queue; text *consumer; OCIRaw *msgid; ub4 priority; OCIAQMsgProperties *msgprop; OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION, (dvoid *)&subname, &size, OCI_ATTR_SUBSCR_NAME, errhp); /* Extract the attributes from the AQ descriptor. Queue name: */ OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size, OCI_ATTR_QUEUE_NAME, errhp); /* Consumer name: */ OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &size, OCI_ATTR_CONSUMER_NAME, errhp); /* Msgid: */ OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgid, &size, OCI_ATTR_NFY_MSGID, errhp); /* Message properties: */ OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size, OCI_ATTR_MSG_PROP, errhp); /* Get priority from message properties: */ checkerr(errhp, OCIAttrGet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)&priority, 0, OCI_ATTR_PRIORITY, errhp)); switch (priority) { case 1: ship_stats->fedex++; break; case 2 : ship_stats->air++; break; case 3: ship_stats->ground++; break; default: printf(" Error priority %d", priority); } } int main(argc, argv) int argc; char *argv[]; { OCISession *authp = (OCISession *) 0; OCISubscription *subscrhp[8]; ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ; ship_data ctx = {0,0,0}; ub4 sleep_time = 0; printf("Initializing OCI Process\n"); /* Initialize OCI environment with OCI_EVENTS flag set: */ (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *, size_t)) 0, (dvoid * (*)(dvoid *, dvoid *, size_t))0, (void (*)(dvoid *, dvoid *)) 0 ); printf("Initialization successful\n"); printf("Initializing OCI Env\n"); (void) OCIEnvInit( (OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, (dvoid **) 0 ); printf("Initialization successful\n"); checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, OCI_HTYPE_ ERROR, (size_t) 0, (dvoid **) 0)); checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, OCI_HTYPE_ SERVER, (size_t) 0, (dvoid **) 0)); checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, OCI_HTYPE_ SVCCTX, (size_t) 0, (dvoid **) 0)); printf("connecting to server\n"); checkerr(errhp, OCIServerAttach( srvhp, errhp, (text *)"inst1_alias", strlen("inst1_alias"), (ub4) OCI_DEFAULT)); printf("connect successful\n"); /* Set attribute server context in the service context: */ checkerr(errhp, OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp)); checkerr(errhp, OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0)); /* Set username and password in the session handle: */ checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, (dvoid *) username, (ub4) strlen((char *)username), (ub4) OCI_ATTR_USERNAME, errhp)); checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, (dvoid *) password, (ub4) strlen((char *)password), (ub4) OCI_ATTR_PASSWORD, errhp)); /* Begin session: */ checkerr(errhp, OCISessionBegin ( svchp, errhp, authp, OCI_CRED_RDBMS, (ub4) OCI_DEFAULT)); (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *) authp, (ub4) 0, (ub4) OCI_ATTR_SESSION, errhp); /* Register for notification: */ printf("allocating subscription handle\n"); subscrhp[0] = (OCISubscription *)0; (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0); printf("setting subscription name\n"); (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) "WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS", (ub4) strlen("WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS"), (ub4) OCI_ATTR_SUBSCR_NAME, errhp); printf("setting subscription callback\n"); (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) notifyCB, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *)&ctx, (ub4)sizeof(ctx), (ub4) OCI_ATTR_SUBSCR_CTX, errhp); printf("setting subscription namespace\n"); (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) &namespace, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); printf("Registering \n"); checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 1, errhp, OCI_DEFAULT)); sleep_time = (ub4)atoi(argv[1]); printf ("waiting for %d s", sleep_time); sleep(sleep_time); printf("Exiting"); exit(0); } void checkerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; sb4 errcode = 0; switch (status) { case OCI_SUCCESS: break; case OCI_SUCCESS_WITH_INFO: (void) printf("Error - OCI_SUCCESS_WITH_INFO\n"); break; case OCI_NEED_DATA: (void) printf("Error - OCI_NEED_DATA\n"); break; case OCI_NO_DATA: (void) printf("Error - OCI_NODATA\n"); break; case OCI_ERROR: (void) OCIErrorGet((dvoid *)errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), OCI_HTYPE_ERROR); (void) printf("Error - %.*s\n", 512, errbuf); break; case OCI_INVALID_HANDLE: (void) printf("Error - OCI_INVALID_HANDLE\n"); break; case OCI_STILL_EXECUTING: (void) printf("Error - OCI_STILL_EXECUTE\n"); break; case OCI_CONTINUE: (void) printf("Error - OCI_CONTINUE\n"); break; default: break; } }
If the transaction dequeuing the message from a queue fails, it is regarded as an unsuccessful attempt to remove the message. AQ records the number of failed attempts to remove the message in the message history. Applications can query the retry_count column of the queue table view to find out the number of unsuccessful attempts on a message. In addition, AQ also allows the application to specify, at the queue level, the maximum number of retries for messages in the queue. If the number of failed attempts to remove a message exceed this number, the message is moved to the exception queue and is no longer available to applications.
Retry Delay
If the transaction receiving a message aborted, this could be because of a 'bad' condition. AQ allows users to 'hide' the bad message for a pre-specified interval. A retry_delay can be specified along with maximum retries. This means that a message which has had a failed attempt will be visible in the queue for dequeue after 'retry_delay' interval. Until then it will be in the 'WAITING' state. The AQ background process, the time manager enforces the retry delay property. The default value for maximum retries is 5 and that for retry delay is 0. Note that maximum retries and retry delay are not available with 8.0 compatible multi-consumer queues.
/* Create a package that enqueue with delay set to one day: /* CONNECT BOLADM/BOLADM > /* queue has max retries = 4 and retry delay = 12 hours */ execute dbms_aqadm.alter_queue(queue_name = 'WS.WS_BOOKED_ORDERS_QUE', max_retr ies = 4, retry_delay = 3600*12); > /* processes the next order available in the booked_order_queue */ CREATE OR REPLACE PROCEDURE process_next_order() AS dqqopt dbms_aq.dequeue_options_t; msgprop dbms_aq.message_properties_t; deq_msgid RAW(16); book BOLADM.book_typ; item BOLADM.orderitem_typ; BOLADM.order_typ order; BEGIN > dqqopt.dequeue_option := DBMS_AQ.FIRST_MESSAGE; dbms_aq.dequeue('WS.WS_BOOKED_ORDERS_QUEUE', dqqopt, msgprop, order, deq_msgid ); > /* for simplicity, assume order has a single item */ item = order.items(1); book = the_orders.item; > /* assume search_inventory searches inventory for the book */ /* if we don't find the book in the warehouse, abort transaction */ IF (search_inventory(book) != TRUE) rollback; ELSE process_order(order); END IF; > END; /
Use the dbexecutesql interface from the database for this functionality.
public static void setup_queue(Connection db_conn) { AQSession aq_sess; AQQueue bookedorders_q; AQQueueProperty q_prop; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); bookedorders_q = aq_sess.getQueue("WS", "WS_bookedorders_que"); /* Alter queue - set max retries = 4 and retry delay = 12 hours */ q_prop = new AQQueueProperty(); q_prop.setMaxRetries(4); q_prop.setRetryInterval(3600*12); // specified in seconds bookedorders_q.alterQueue(q_prop); } catch (Exception ex) { System.out.println("Exception: " + ex); } } public static void process_next_order(Connection db_conn) { AQSession aq_sess; Order deq_order; OrderItem order_item; Book book; AQDequeueOption deq_option; AQMessageProperty msg_prop; AQQueue bookedorders_q; AQMessage message; AQObjectPayload obj_payload; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); deq_option = new AQDequeueOption(); deq_option.setNavigationMode(AQDequeueOption.NAVIGATION_FIRST_MESSAGE); bookedorders_q = aq_sess.getQueue("WS", "WS_bookedorders_que"); /* Dequeue the message */ message = bookedorders_q.dequeue(deq_option, Order.getFactory()); obj_payload = message.getObjectPayload(); deq_order = (Order)(obj_payload.getPayloadData()); /* for simplicity, assume order has a single item */ order_item = deq_order.getItems().getElement(0); book = order_item.getItem(); /* assume search_inventory searches inventory for the book * if we don't find the book in the warehouse, abort transaction */ if(search_inventory(book) != true) db_conn.rollback(); else process_order(deq_order); } catch (AQException aqex) { System.out.println("Exception-1: " + aqex); } catch (Exception ex) { System.out.println("Exception-2: " + ex); } }
AQ provides four integrated mechanisms to support exception handling in applications: EXCEPTION_QUEUES
, EXPIRATION
, MAX_RETRIES
and RETRY_DELAY
.
An exception_queue
is a repository for all expired or unserviceable messages. Applications cannot directly enqueue into exception queues. Also, a multi-consumer exception queue cannot have subscribers associated with it. However, an application that intends to handle these expired or unserviceable messages can dequeue from the exception queue. The exception queue created for messages intended for a multi-consumer queue must itself be a multi-consumer queue. Like any other queue, the exception queue must be enabled for dequeue using the DBMS_AQADM
.START_QUEUE
procedure. You will get an Oracle error if you try to enable an exception queue for enqueue.
When a message has expired, it is moved to an exception queue. The exception queue for a message in multi-consumer queue must also be a multi-consumer queue. Expired messages from multi-consumer queues cannot be dequeued by the intended recipients of the message. However, they can be dequeued in the REMOVE
mode exactly once by specifying a NULL
consumer name in the dequeue options. Hence, from a dequeue perspective multi-consumer exception queues behave like single-consumer queues because each expired message can be dequeued only once using a NULL
consumer name. Messages can also be dequeued from the exception queue by specifying the message ID. Note that expired messages can be dequeued only by specifying a message ID if the multi-consumer exception queue was created in a queue table without the compatible parameter or with the compatible parameter set to '8.0'.
The exception queue is a message property that can be specified during enqueue time (see "Enqueue a Message [Specify Message Properties]" in Chapter 11, "Operational Interface: Basic Operations"). In PL/SQL users can use the exception_queue
attribute of the DBMS_AQ
.MESSAGE_PROPERTIES_T
record to specify the exception queue. In OCI users can use the OCISetAttr
procedure to set the OCI_ATTR_EXCEPTION_QUEUE
attribute of the OCIAQMsgProperties
descriptor.
If an exception queue is not specified, the default exception queue is used. If the queue is created in a queue table, say QTAB
, the default exception queue will be called AQ$_QTAB_E. The default exception queue is automatically created when the queue table is created. Messages are moved to the exception queues by AQ under the following conditions.
DBMS_AQ
.NEVER
, which means the messages will not expire.
For messages intended for multiple recipients, each message keeps a separate retry count for each recipient. The message is moved to the exception queue only when retry counts for all recipients of the message have exceeded the specified retry limit. The default retry limit is 5 for single consumer queues and 8.1-compatible multi-consumer queues. No retry limit is not supported for 8.0- compatible multi-consumer queues.
DBMS_AQ
.DEQUEUE
. If the dequeue procedure succeeds but the PL/SQL procedure raises an exception, AQ will attempt to increment the RETRY_COUNT
of the message returned by the dequeue procedure.
Messages intended for 8.1-compatible multi-consumer queues cannot be dequeued by the intended recipients once the messages have been moved to an exception queue. These messages should instead be dequeued in the REMOVE
or BROWSE
mode exactly once by specifying a NULL
consumer name in the dequeue options. The messages can also be dequeued by their message IDs.
Messages intended for single consumer queues, or for 8.0-compatible multi-consumer queues, can only be dequeued by their message IDs once the messages have been moved to an exception queue.
Users can associate a RETRY_DELAY
with a queue. The default value for this parameter is 0 which means that the message will be available for dequeue immediately after the RETRY_COUNT
is incremented. Otherwise the message will be unavailable for RETRY_DELAY
seconds. After RETRY_DELAY
seconds the queue monitor will mark the message as READY
.
In the BooksOnLine
application, the business rule for each shipping region is that an order will be placed in a back order queue if the order cannot be filled immediately. The back order application will try to fill the order once a day. If the order cannot be filled within 5 days, it is placed in an exception queue for special processing. You can implement this process by making use of the retry and exception handling features in AQ.
The example below shows how you can create a queue with specific maximum retry and retry delay interval.
/* Example for creating a back order queue in Western Region which allows a maximum of 5 retries and 1 day delay between each retry. */ CONNECT BOLADM/BOLADM BEGIN dbms_aqadm.create_queue ( queue_name => 'WS.WS_backorders_que', queue_table => 'WS.WS_orders_mqtab', max_retries => 5, retry_delay => 60*60*24); END; / /* Create an exception queue for the back order queue for Western Region. */ CONNECT BOLADM/BOLADM BEGIN dbms_aqadm.create_queue ( queue_name => 'WS.WS_backorders_excpt_que', queue_table => 'WS.WS_orders_mqtab', queue_type => DBMS_AQADM.EXCEPTION_QUEUE); end; / /* Enqueue a message to WS_backorders_que and specify WS_backorders_excpt_que as the exception queue for the message: */ CONNECT BOLADM/BOLADM CREATE OR REPLACE PROCEDURE enqueue_WS_unfilled_order(backorder order_typ) AS back_order_queue_name varchar2(62); enqopt dbms_aq.enqueue_options_t; msgprop dbms_aq.message_properties_t; enq_msgid raw(16); BEGIN /* Set back order queue name for this message: */ back_order_queue_name := 'WS.WS_backorders_que'; /* Set exception queue name for this message: */ msgprop.exception_queue := 'WS.WS_backorders_excpt_que'; dbms_aq.enqueue(back_order_queue_name, enqopt, msgprop, backorder, enq_msgid); END; /
The exception queue is a message property that can be provided at the time of enqueuing a message. If this property is not set, the default exception queue of the queue will be used for any error conditions.
set oraaq = OraDatabase.CreateAQ("CBADM.deferbilling_que") Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ") Set OraOrder = OraDatabase.CreateOraObject("BOLADM.order_typ") OraMsg = OraOrder OraMsg.delay = 15*60*60*24 OraMsg.ExceptionQueue = "WS.WS_backorders_que" 'Fill up the order values OraMsg = OraOrder 'OraOrder contains the order details Msgid = OraAq.enqueue
public static void createBackOrderQueues(Connection db_conn) { AQSession aq_sess; AQQueue backorders_q; AQQueue backorders_excp_q; AQQueueProperty q_prop; AQQueueProperty q_prop2; AQQueueTable mq_table; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); mq_table = aq_sess.getQueueTable("WS", "WS_orders_mqtab"); /* Create a back order queue in Western Region which allows a maximum of 5 retries and 1 day delay between each retry. */ q_prop = new AQQueueProperty(); q_prop.setMaxRetries(5); q_prop.setRetryInterval(60*24*24); backorders_q = aq_sess.createQueue(mq_table, "WS_backorders_que", q_prop); backorders_q.start(true, true); /* Create an exception queue for the back order queue for Western Region. */ q_prop2 = new AQQueueProperty(); q_prop2.setQueueType(AQQueueProperty.EXCEPTION_QUEUE); backorders_excp_q = aq_sess.createQueue(mq_table, "WS_backorders_excpt_que", q_prop2); } catch (Exception ex) { System.out.println("Exception " + ex); } } /* Enqueue a message to WS_backorders_que and specify WS_backorders_excpt_que as the exception queue for the message: */ public static void enqueue_WS_unfilled_order(Connection db_conn, Order back_order) { AQSession aq_sess; AQQueue back_order_q; AQEnqueueOption enq_option; AQMessageProperty m_property; AQMessage message; AQObjectPayload obj_payload; byte[] enq_msg_id; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); back_order_q = aq_sess.getQueue("WS", "WS_backorders_que"); message = back_order_q.createMessage(); /* Set exception queue name for this message: */ m_property = message.getMessageProperty(); m_property.setExceptionQueue("WS.WS_backorders_excpt_que"); obj_payload = message.getObjectPayload(); obj_payload.setPayloadData(back_order); enq_option = new AQEnqueueOption(); /* Enqueue the message */ enq_msg_id = back_order_q.enqueue(enq_option, message); db_conn.commit(); } catch (Exception ex) { System.out.println("Exception: " + ex); } }
Messages may be routed to various recipients based on message properties or message content. Users define a rule-based subscription for a given queue to specify interest in receiving messages that meet particular conditions.
Rules are boolean expressions that evaluate to TRUE
or FALSE
. Similar in syntax to the WHERE
clause of a SQL query, rules are expressed in terms of the attributes that represent message properties or message content. These subscriber rules are evaluated against incoming messages and those rules that match are used to determine message recipients. This feature thus supports the notions of content-based subscriptions and content-based routing of messages.
For the BooksOnLine
application, we illustrate how rule-based subscriptions are used to implement a publish/subscribe paradigm utilizing content-based subscription and content-based routing of messages. The interaction between the Order Entry application and each of the Shipping Applications is modeled as follows;
Each shipping application subscribes to the OE booked orders queue. The following rule-based subscriptions are defined by the Order Entry user to handle the routing of booked orders from the Order Entry application to each of the Shipping applications.
CONNECT OE/OE;
Western Region Shipping defines an agent called 'West_Shipping
' with the WS
booked orders queue as the agent address (destination queue to which messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on order region and ordertype
attributes.
/* Add a rule-based subscriber for West Shipping - West Shipping handles Western region US orders, Rush Western region orders are handled by East Shipping: */ DECLARE subscriber aq$_agent; BEGIN subscriber := aq$_agent('West_Shipping', 'WS.WS_bookedorders_que', null); dbms_aqadm.add_subscriber( queue_name => 'OE.OE_bookedorders_que', subscriber => subscriber, rule => 'tab.user_data.orderregion = ''WESTERN'' AND tab.user_data.ordertype != ''RUSH'''); END; /
Eastern Region Shipping defines an agent called East_Shipping
with the ES
booked orders queue as the agent address (the destination queue to which messages must be delivered). This agent subscribes to the OE
booked orders queue using a rule specified on orderregion
, ordertype
and customer attributes.
/* Add a rule-based subscriber for East Shipping - East shipping handles all Eastern region orders, East shipping also handles all US rush orders: */ DECLARE subscriber aq$_agent; BEGIN subscriber := aq$_agent('East_Shipping', 'ES.ES_bookedorders_que', null); dbms_aqadm.add_subscriber( queue_name => 'OE.OE_bookedorders_que', subscriber => subscriber, rule => 'tab.user_data.orderregion = ''EASTERN'' OR (tab.user_data.ordertype = ''RUSH'' AND tab.user_data.customer.country = ''USA'') '); END; /
Overseas Shipping defines an agent called Overseas_Shipping
with the OS
booked orders queue as the agent address (destination queue to which messages must be delivered). This agent subscribes to the OE
booked orders queue using a rule specified on orderregion
attribute.
/* Add a rule-based subscriber for Overseas Shipping Intl Shipping handles all non-US orders: */ DECLARE subscriber aq$_agent; BEGIN subscriber := aq$_agent('Overseas_Shipping', 'OS.OS_bookedorders_que', null); dbms_aqadm.add_subscriber( queue_name => 'OE.OE_bookedorders_que', subscriber => subscriber, rule => 'tab.user_data.orderregion = ''INTERNATIONAL'''); END; /
This functionality is currently not available.
public static void addRuleBasedSubscribers(Connection db_conn) { AQSession aq_sess; AQQueue bookedorders_q; String rule; AQAgent agt1, agt2, agt3; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); bookedorders_q = aq_sess.getQueue("OE", "OE_booked_orders_que"); /* Add a rule-based subscriber for West Shipping - West Shipping handles Western region US orders, Rush Western region orders are handled by East Shipping: */ agt1 = new AQAgent("West_Shipping", "WS.WS_bookedorders_que"); rule = "tab.user_data.orderregion = 'WESTERN' AND " + "tab.user_data.ordertype != 'RUSH'"; bookedorders_q.addSubscriber(agt1, rule); /* Add a rule-based subscriber for East Shipping - East shipping handles all Eastern region orders, East shipping also handles all US rush orders: */ agt2 = new AQAgent("East_Shipping", "ES.ES_bookedorders_que"); rule = "tab.user_data.orderregion = 'EASTERN' OR " + "(tab.user_data.ordertype = 'RUSH' AND " + "tab.user_data.customer.country = 'USA')"; bookedorders_q.addSubscriber(agt2, rule); /* Add a rule-based subscriber for Overseas Shipping Intl Shipping handles all non-US orders: */ agt3 = new AQAgent("Overseas_Shipping", "OS.OS_bookedorders_que"); rule = "tab.user_data.orderregion = 'INTERNATIONAL'"; bookedorders_q.addSubscriber(agt3, rule); } catch (Exception ex) { System.out.println("Exception: " + ex); } }
In Oracle8i release 8.1.6, AQ has the capability to monitor multiple queues for messages with a single call, listen
. An application can use listen
to wait for messages for multiple subscriptions. It can also be used by gateway applications to monitor multiple queues. If the listen
call returns successfully, a dequeue must be used to retrieve the message (see Listen to One (Many) Queue(s) in Chapter 11, "Operational Interface: Basic Operations").
Without the listen
call, an application which sought to dequeue from a set of queues would have to continuously poll the queues to determine if there were a message. Alternatively, you could design your application to have a separate dequeue process for each queue. However, if there are long periods with no traffic in any of the queues, these approaches will create an unacceptable overhead. The listen
call is well suited for such applications.
Note that when there are messages for multiple agents in the agent list, listen
returns with the first agent for whom there is a message. In that sense listen
is not 'fair' in monitoring the queues. The application designer must keep this in mind when using the call. To prevent one agent from 'starving' other agents for messages, the application could change the order of the agents in the agent list.
In the customer service component of the BooksOnLine
example, messages from different databases arrive in the customer service queues, indicating the state of the message. The customer service application monitors the queues and whenever there is a message about a customer order, it updates the order status in the order_status_table
. The application uses the listen
call to monitor the different queues. Whenever there is a message in any of the queues, it dequeues the message and updates the order status accordingly.
CODE (in tkaqdocd.sql) /* Update the status of the order in the order status table: */ CREATE OR REPLACE PROCEDURE update_status( new_status IN VARCHAR2, order_msg IN BOLADM.ORDER_TYP) IS old_status VARCHAR2(30); dummy NUMBER; BEGIN BEGIN /* Query old status from the table: */ SELECT st.status INTO old_status FROM order_status_table st WHERE st.customer_order.orderno = order_msg.orderno; /* Status can be 'BOOKED_ORDER', 'SHIPPED_ORDER', 'BACK_ORDER' and 'BILLED_ORDER': */ IF new_status = 'SHIPPED_ORDER' THEN IF old_status = 'BILLED_ORDER' THEN return; /* message about a previous state */ END IF; ELSIF new_status = 'BACK_ORDER' THEN IF old_status = 'SHIPPED_ORDER' OR old_status = 'BILLED_ORDER' THEN return; /* message about a previous state */ END IF; END IF; /* Update the order status: */ UPDATE order_status_table st SET st.customer_order = order_msg, st.status = new_status; COMMIT; EXCEPTION WHEN OTHERS THEN /* change to no data found */ /* First update for the order: */ INSERT INTO order_status_table(customer_order, status) VALUES (order_msg, new_status); COMMIT; END; END; / /* Dequeues message from 'QUEUE' for 'CONSUMER': */ CREATE OR REPLACE PROCEDURE DEQUEUE_MESSAGE( queue IN VARCHAR2, consumer IN VARCHAR2, message OUT BOLADM.order_typ) IS dopt dbms_aq.dequeue_options_t; mprop dbms_aq.message_properties_t; deq_msgid RAW(16); BEGIN dopt.dequeue_mode := dbms_aq.REMOVE; dopt.navigation := dbms_aq.FIRST_MESSAGE; dopt.consumer_name := consumer; dbms_aq.dequeue( queue_name => queue, dequeue_options => dopt, message_properties => mprop, payload => message, msgid => deq_msgid); commit; END; / /* Monitor the queues in the customer service databse for 'time' seconds: */ CREATE OR REPLACE PROCEDURE MONITOR_STATUS_QUEUE(time IN NUMBER) IS agent_w_message aq$_agent; agent_list dbms_aq.agent_list_t; wait_time INTEGER := 120; no_message EXCEPTION; pragma EXCEPTION_INIT(no_message, -25254); order_msg boladm.order_typ; new_status VARCHAR2(30); monitor BOOLEAN := TRUE; begin_time NUMBER; end_time NUMBER; BEGIN begin_time := dbms_utility.get_time; WHILE (monitor) LOOP BEGIN /* Construct the waiters list: */ agent_list(1) := aq$_agent('BILLED_ORDER', 'CS_billedorders_que', NULL); agent_list(1) := aq$_agent('SHIPPED_ORDER', 'CS_shippedorders_que', NULL); agent_list(2) := aq$_agent('BACK_ORDER', 'CS_backorders_que', NULL); agent_list(3) := aq$_agent('Booked_ORDER', 'CS_bookedorders_que', NULL); /* Wait for order status messages: */ dbms_aq.listen(agent_list, wait_time, agent_w_message); dbms_output.put_line('Agent' || agent_w_message.name || ' Address '|| agent_w_message.address); /* Dequeue the message from the queue: */ dequeue_message(agent_w_message.address, agent_w_message.name, order_msg); /* Update the status of the order depending on the type of the message, * the name of the agent contains the new state: */ update_status(agent_w_message.name, order_msg); /* Exit if we have been working long enough: */ end_time := dbms_utility.get_time; IF (end_time - begin_time > time) THEN EXIT; END IF; EXCEPTION WHEN no_message THEN dbms_output.put_line('No messages in the past 2 minutes'); end_time := dbms_utility.get_time; /* Exit if we have done enough work: */ IF (end_time - begin_time > time) THEN EXIT; END IF; END; END LOOP; END; /
Feature not currently available.
Feature not supported in Java.
|
![]() Copyright © 1996-2000, Oracle Corporation. All Rights Reserved. |
|