Oracle8i Application Developer's Guide - Advanced Queuing Release 2 (8.1.6) Part Number A76938-01 |
|
Creating Applications Using JMS , 8 of 8
This feature enables applications to communicate with each other without having to be connected to the same database.
AQ allows a remote subscriber, that is a subscriber at another database, to subscribe to a topic. When a message published to the topic meets the criterion of the remote subscriber, AQ will automatically propagate the message to the queue/topic at the remote database specified for the remote subscriber.
The snapshot (job_queue
) background process performs propagation. Propagation is performed using database links and Net8
There are two ways to implement remote subscribers:
createRemoteSubscriber
method can be used to create a remote subscriber to/on the topic. The remote subscriber is specified as an instance of the class AQjmsAgent
.
AQjmsAgent
has a name and an address. The address consists of a queue/topic and the database link (dblink) to the database of the subscriber.
There are two kinds of remote subscribers:
The remote subscriber is a topic. This occurs when no name is specified for the remote subscriber in the AQjmsAgent
object and the address is a topic. The message satisfying the subscriber's subscription is propagated to the remote topic. The propagated message is now available to all the subscriptions of the remote topic that it satisfies.
Specify a specific remote recipient for the message. The remote subscription can be for a particular consumer at the remote database. If the name of the remote recipeint is specified (in the AQjmsAgent
object), then the message satisfying the subscription is propagated to the remote database for that recipient only. The recipient at the remote database uses the TopicReceiver
interface to retrieve its messages. The remote subscription can also be for a point-to-point queue
Assume the order entry application and Western region shipping application are on different databases, db1
and db2
. Further assume that there is a dblink dblink_oe_ws
from database db1
, the order entry database, to the western shipping database db2. The WS_bookedorders_topic
at db2 is a remote subscriber to the OE_bookedorders_topic
in db1.
Assume the order entry application and Western region shipping application are on different databases, db1 and db2. Further assume that there is a dblink dblink_oe_ws
from the local order entry database db1
to the western shipping database db2
. The agent "Priority" at WS_bookedorders_topic
in db2
is a remote subscriber to the OE_bookedorders_topic
in db1. Messages propagated to the WS_bookedorders_topic
are for "Priority" only.
public void remote_subscriber(TopicSession jms_session) { Topic topic; ObjectMessage obj_message; AQjmsAgent remote_sub; try { /* get a handle to the OE_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("OE", "OE_bookedorders_topic"); /* create the remote subscriber, name unspecified and address * the topic WS_booked_orders_topic at db2 */ remote_sub = new AQjmsAgent(null, "WS.WS_bookedorders_topic@dblink_oe_ ws"); /* subscribe for western region orders */ ((AQjmsSession)jms_session).createRemoteSubscriber(topic, remote_sub, "Region = 'Western' "); } catch (JMSException ex) { System.out.println("Exception :" + ex); } catch (java.sql.SQLException ex1) {System.out.println("SQL Exception :" + ex1); } }
Database db2 - shipping database: The WS_booked_orders_topic
has two subscribers, one for priority shipping and the other normal. The messages from the Order Entry database are propagated to the Shipping database and delivered to the correct subscriber. Priority orders have a message priority of 1.
public void get_priority_messages(TopicSession jms_session) { Topic topic; TopicSubscriber tsubs; ObjectMessage obj_message; BolCustomer customer; BolOrder booked_order; try { /* get a handle to the OE_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("WS", "WS_bookedorders_topic"); /* Create local subscriber - for priority messages */ tsubs = jms_session.createDurableSubscriber(topic, "PRIORITY", " JMSPriority = 1 ", false); obj_message = (ObjectMessage) tsubs.receive(); booked_order = (BolOrder)obj_message.getObject(); customer = booked_order.getCustomer(); System.out.println("Priority Order: for customer " + customer.getName()); jms_session.commit(); } catch (JMSException ex) { System.out.println("Exception :" + ex); } } public void get_normal_messages(TopicSession jms_session) { Topic topic; TopicSubscriber tsubs; ObjectMessage obj_message; BolCustomer customer; BolOrder booked_order; try { /* get a handle to the OE_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("WS", "WS_bookedorders_topic"); /* Create local subscriber - for priority messages */ tsubs = jms_session.createDurableSubscriber(topic, "PRIORITY", " JMSPriority > 1 ", false); obj_message = (ObjectMessage) tsubs.receive(); booked_order = (BolOrder)obj_message.getObject(); customer = booked_order.getCustomer(); System.out.println("Normal Order: for customer " + customer.getName()); jms_session.commit(); } catch (JMSException ex) { System.out.println("Exception :" + ex); } } public void remote_subscriber1(TopicSession jms_session) { Topic topic; ObjectMessage obj_message; AQjmsAgent remote_sub; try { /* get a handle to the OE_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("OE", "OE_bookedorders_topic"); /* create the remote subscriber, name "Priority" and address * the topic WS_booked_orders_topic at db2 */ remote_sub = new AQjmsAgent("Priority", "WS.WS_bookedorders_topic@dblink_ oe_ws"); /* subscribe for western region orders */ ((AQjmsSession)jms_session).createRemoteSubscriber(topic, remote_sub, "Region = 'Western' "); } catch (JMSException ex) { System.out.println("Exception :" + ex); } catch (java.sql.SQLException ex1) {System.out.println("SQL Exception :" + ex1); } } Remote database: database db2 - Western Shipping database. /* get messages for subscriber priority */ public void get_priority_messages1(TopicSession jms_session) { Topic topic; TopicReceiver trecs; ObjectMessage obj_message; BolCustomer customer; BolOrder booked_order; try { /* get a handle to the OE_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("WS", "WS_bookedorders_topic"); /* create a local receiver "Priority" for the remote subscription * to WS_bookedorders_topic */ trecs = ((AQjmsSession)jms_session).createTopicReceiver(topic, "Priority", null); obj_message = (ObjectMessage) trecs.receive(); booked_order = (BolOrder)obj_message.getObject(); customer = booked_order.getCustomer(); System.out.println("Priority Order: for customer " + customer.getName()); jms_session.commit(); } catch (JMSException ex) { System.out.println("Exception :" + ex); } }
Propagation must be scheduled via the schedule_propagation
method for every topic from which messages are propagated to target destination databases.
A schedule indicates the time frame during which messages can be propagated from the source topic. This time frame may depend on a number of factors such as network traffic, load at source database, load at destination database, and so on. The schedule therefore has to be tailored for the specific source and destination. When a schedule is created, a job is automatically submitted to the job_queue
facility to handle propagation.
The administrative calls for propagation scheduling provide great flexibility for managing the schedules (see "Schedule a Queue Propagation", Chapter 12, "Creating Applications Using JMS"). The duration or propagation window parameter of a schedule specifies the time frame during which propagation has to take place. If the duration is unspecified then the time frame is an infinite single window. If a window has to be repeated periodically then a finite duration is specified along with a next_time
function that defines the periodic interval between successive windows.
The latency parameter for a schedule is relevant only when a queue does not have any messages to be propagated. This parameter specifies the time interval within which a queue has to be rechecked for messages. Note that if the latency parameter is to be enforced, then the job_queue_interval
parameter for the job_queue_processes
should be less than or equal to the latency parameter. The propagation schedules defined for a queue can be changed or dropped at anytime during the life of the queue. In addition there are calls for temporarily disabling a schedule (instead of dropping the schedule) and enabling a disabled schedule. A schedule is active when messages are being propagated in that schedule. All the administrative calls can be made irrespective of whether the schedule is active or not. If a schedule is active then it will take a few seconds for the calls to be executed.
Job queue processes must be started for propagation to take place. At least 2 job queue processes must be started. The dblinks to the destination database must also be valid. The source and destination topics of the propagation must be of the same message type. The remote topic must be enabled for enqueue. The user of the dblink must also have enqueue privileges to the remote topic.
public void schedule_propagation(TopicSession jms_session) { Topic topic; try { /* get a handle to the OE_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("WS", "WS_bookedorders_topic"); /* Schedule propagation immediately with duration of 5 minutes and latency 20 sec */ ((AQjmsDestination)topic).schedulePropagation(jms_session, "dba", null, new Double(5*60), null, new Double(20)); }catch (JMSException ex) {System.out.println("Exception: " + ex);} } Propagation schedule parameters can also be altered. /* alter duration to 10 minutes and latency to zero */ public void alter_propagation(TopicSession jms_session) { Topic topic; try { /* get a handle to the OE_bookedorders_topic */ topic = ((AQjmsSession)jms_session).getTopic("WS", "WS_bookedorders_topic"); /* Schedule propagation immediately with duration of 5 minutes and latency 20 sec */ ((AQjmsDestination)topic).alterPropagationSchedule(jms_session, "dba", new Double(10*60), null, new Double(0)); }catch (JMSException ex) {System.out.println("Exception: " + ex);} }
Detailed information about the schedules can be obtained from the catalog views defined for propagation. Information about active schedules -- such as the name of the background process handling that schedule, the SID (session, serial number) for the session handling the propagation and the Oracle instance handling a schedule (relevant if OPS is being used) -- can be obtained from the catalog views. The same catalog views also provide information about the previous successful execution of a schedule (last successful propagation of message) and the next execution of the schedule.
For each schedule detailed propagation statistics are maintained. This includes the total number of messages propagated in a schedule, total number of bytes propagated in a schedule, maximum number of messages propagated in a window, maximum number of bytes propagated in a window, average number of messages propagated in a window, average size of propagated messages and the average time to propagated a message. These statistics have been designed to provide useful information to the queue administrators for tuning the schedules such that maximum efficiency can be achieved.
Propagation has built-in support for handling failures and reporting errors. For example, if the database link specified is invalid, or the remote database is unavailable, or the remote topic/queue is not enabled for enqueuing, then the appropriate error message is reported. Propagation uses an exponential backoff scheme for retrying propagation from a schedule that encountered a failure. If a schedule continuously encounters failures, the first retry happens after 30 seconds, the second after 60 seconds, the third after 120 seconds and so forth. If the retry time is beyond the expiration time of the current window then the next retry is attempted at the start time of the next window. A maximum of 16 retry attempts are made after which the schedule is automatically disabled. When a schedule is disabled automatically due to failures, the relevant information is written into the alert log. At anytime it is possible to check if there were failures encountered by a schedule and if so how many successive failure were encountered, the error message indicating the cause for the failure and the time at which the last failure was encountered. By examining this information, an administrator can fix the failure and enable the schedule. During a retry if propagation is successful then the number of failures is reset to 0. Propagation has support built in for OPS and is completely transparent to the user and the administrator. The job that handles propagation is submitted to the same instance as the owner of the queue table in which the source topic resides. If at anytime there is a failure at an instance and the queue table that stores the topic is migrated to a different instance, the propagation job is also automatically migrated to the new instance. This will minimize the 'pinging' between instances and thus offer better performance. Propagation has been designed to handle any number of concurrent schedules.
Note that the number of job_queue_processes
is limited to a maximum of 36 and some of these may be used to handle non-propagation related jobs.Hence, propagation has built is support for multi-tasking and load balancing. The propagation algorithms are designed such that multiple schedules can be handled by a single snapshot (job_queue
) process. The propagation load on a job_queue
processes can be skewed based on the arrival rate of messages in the different source topics. If one process is overburdened with several active schedules while another is less loaded with many passive schedules, propagation automatically re-distributes the schedules among the processes such that they are loaded uniformly.
In the BooksOnLine example, the OE_bookedorders_topic
is busy since messages in it are propagated to different shipping sites. The following example code illustrates the calls supported by enhanced propagation scheduling for error checking and schedule monitoring.
CONNECT OE/OE; /* get averages select avg_time, avg_number, avg_size from user_queue_schedules; /* get totals select total_time, total_number, total_bytes from user_queue_schedules; /* get maximums for a window select max_number, max_bytes from user_queue_schedules; /* get current status information of schedule select process_name, session_id, instance, schedule_disabled from user_queue_schedules; /* get information about last and next execution select last_run_date, last_run_time, next_run_date, next_run_time from user_queue_schedules; /* get last error information if any select failures, last_error_msg, last_error_date, last_error_time from user_queue_schedules;
When a system errors such as a network failure occurs, AQ will continue to attempt to propagate messages using an exponential back-off algorithm. In some situations that indicate application errors AQ will mark messages as UNDELIVERABLE
if there is an error in propagating the message.
Examples of such errors are when the remote queue/topic does not exist or when there is a type mismatch between the source queue/topic and the remote queue/topic.In such situations users must query the DBA_SCHEDULES
view to determine the last error that occurred during propagation to a particular destination.The trace files in the $ORACLE_HOME
/log directory can provide additional information about the error.
|
![]() Copyright © 1996-2000, Oracle Corporation. All Rights Reserved. |
|