Oracle8i Application Developer's Guide - Advanced Queuing Release 2 (8.1.6) Part Number A76938-01 |
|
Oracle Advanced Queuing by Example, 3 of 8
To enqueue a single message without any other parameters specify the queue name and the payload.
/* Enqueue to msg_queue: */ DECLARE enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN message := message_typ('NORMAL MESSAGE', 'enqueued to msg_queue first.'); dbms_aq.enqueue(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; /* Dequeue from msg_queue: */ DECLARE dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN DBMS_AQ.DEQUEUE(queue_name => 'msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); COMMIT; END;
#include <stdio.h> #include <string.h> #include <sqlca.h> #include <sql2oci.h> /* The header file generated by processing object type 'aq.Message_typ': */ #include "pceg.h" void sql_error(msg) char *msg; { EXEC SQL WHENEVER SQLERROR CONTINUE; printf("%s\n", msg); printf("\n% .800s \n", sqlca.sqlerrm.sqlerrmc); EXEC SQL ROLLBACK WORK RELEASE; exit(1); } main() { Message_typ *message = (Message_typ*)0; /* payload */ char user[60]="aq/AQ"; /* user logon password */ char subject[30]; /* components of the */ char txt[80]; /* payload type */ /* ENQUEUE and DEQUEUE to an OBJECT QUEUE */ /* Connect to database: */ EXEC SQL CONNECT :user; /* On an oracle error print the error number :*/ EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :"); /* Allocate memory for the host variable from the object cache : */ EXEC SQL ALLOCATE :message; /* ENQUEUE */ strcpy(subject, "NORMAL ENQUEUE"); strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC"); /* Initialize the components of message : */ EXEC SQL OBJECT SET subject, text OF :message TO :subject, :txt; /* Embedded PLSQL call to the AQ enqueue procedure : */ EXEC SQL EXECUTE DECLARE message_properties dbms_aq.message_properties_t; enqueue_options dbms_aq.enqueue_options_t; msgid RAW(16); BEGIN /* Bind the host variable 'message' to the payload: */ dbms_aq.enqueue(queue_name => 'msg_queue', message_properties => message_properties, enqueue_options => enqueue_options, payload => :message, msgid => msgid); END; END-EXEC; /* Commit work */ EXEC SQL COMMIT; printf("Enqueued Message \n");printf("Subject :%s\n",subject);
printf("Text :%s\n",txt);
/* Dequeue */ /* Embedded PLSQL call to the AQ dequeue procedure : */ EXEC SQL EXECUTE DECLARE message_properties dbms_aq.message_properties_t; dequeue_options dbms_aq.dequeue_options_t; msgid RAW(16); BEGIN /* Return the payload into the host variable 'message': */ dbms_aq.dequeue(queue_name => 'msg_queue', message_properties => message_properties, dequeue_options => dequeue_options, payload => :message, msgid => msgid); END; END-EXEC; /* Commit work :*/ EXEC SQL COMMIT; /* Extract the components of message: */ EXEC SQL OBJECT GET SUBJECT,TEXT FROM :message INTO :subject,:txt; printf("Dequeued Message \n"); printf("Subject :%s\n",subject); printf("Text :%s\n",txt); }
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> struct message { OCIString *subject; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_subject; OCIInd null_data; }; typedef struct null_message null_message; int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; message msg; null_message nmsg; message *mesg = &msg; null_message *nmesg = &nmsg; message *deqmesg = (message *)0; null_message *ndeqmesg = (null_message *)0; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit(&envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0); /* Obtain TDO of message_typ */ OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"), (CONST text *)"MESSAGE_TYP", strlen("MESSAGE_TYP"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo); /* Prepare the message payload */ mesg->subject = (OCIString *)0; mesg->data = (OCIString *)0; OCIStringAssignText(envhp, errhp, (CONST text *)"NORMAL MESSAGE", strlen("NORMAL MESSAGE"), &mesg->subject); OCIStringAssignText(envhp, errhp, (CONST text *)"OCI ENQUEUE", strlen("OCI ENQUEUE"), &mesg->data); nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL; /* Enqueue into the msg_queue */ OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0); OCITransCommit(svchp, errhp, (ub4) 0); /* Dequeue from the msg_queue */ OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0); printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); OCITransCommit(svchp, errhp, (ub4) 0); }
To enqueue and dequeue of object type messages follow the lettered steps below.
a. Create the SQL type for the Queue Payload
connect aquser/aquser create type ADDRESS as object (street VARCHAR (30), city VARCHAR(30)); create type PERSON as object (name VARCHAR (30), home ADDRESS);
b. Generate the java class that maps to the PERSON Adt and implements the CustomDatum interface (using Jpublisher tool)
jpub -user=aquser/aquser -sql=ADDRESS,PERSON -case=mixed -usertypes=oracle -methods=false This creates two classes - PERSON.java and ADDRESS.java corresponding to the PERSON and ADDRESS Adt types.
c. Create the queue table and queue with ADT payload
\d. Enqueue and dequeue messages containing object payloads
public static void AQObjectPayloadTest(AQSession aq_sess) throws AQException, SQLException, ClassNotFoundException { Connection db_conn = null; AQQueue queue = null; AQMessage message = null; AQObjectPayload payload = null; AQEnqueueOption eq_option = null; AQDequeueOption dq_option = null; PERSON pers = null; PERSON pers2= null; ADDRESS addr = null; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); queue = aq_sess.getQueue("aquser", "test_queue2"); /* Enable enqueue/dequeue on this queue */ queue.start(); /* Enqueue a message in test_queue2 */ message = queue.createMessage(); pers = new PERSON(); pers.setName("John"); addr = new ADDRESS(); addr.setStreet("500 Easy Street"); addr.setCity("San Francisco"); pers.setHome(addr); payload = message.getObjectPayload(); payload.setPayloadData(pers); eq_option = new AQEnqueueOption(); /* Enqueue a message into test_queue2 */ queue.enqueue(eq_option, message); db_conn.commit(); /* Dequeue a message from test_queue2 */ dq_option = new AQDequeueOption(); message = ((AQOracleQueue)queue).dequeue(dq_option, PERSON.getFactory()); payload = message.getObjectPayload(); pers2 = (PERSON) payload.getPayloadData(); System.out.println("Object data retrieved: [PERSON]"); System.out.println("Name: " + pers2.getName()); System.out.println("Address "); System.out.println("Street: " + pers2.getHome().getStreet()); System.out.println("City: " + pers2.getHome().getCity()); db_conn.commit(); }
To enqueue and dequeue of object type messages follow the lettered steps below.
a. Create the SQL type for the Queue Payload
connect aquser/aquser create type EMPLOYEE as object (empname VARCHAR (50), empno INTEGER);
b. Create a java class that maps to the EMPLOYEE Adt and implements the SQLData interface. This class can also be generated using JPublisher using the following syntax
jpub -user=aquser/aquser -sql=EMPLOYEE -case=mixed -usertypes=jdbc -methods=false import java.sql.*; import oracle.jdbc2.*; public class Employee implements SQLData { private String sql_type; public String empName; public int empNo; public Employee() {} public Employee (String sql_type, String empName, int empNo) { this.sql_type = sql_type; this.empName = empName; this.empNo = empNo; } ////// implements SQLData ////// public String getSQLTypeName() throws SQLException { return sql_type; } public void readSQL(SQLInput stream, String typeName) throws SQLException { sql_type = typeName; empName = stream.readString(); empNo = stream.readInt(); } public void writeSQL(SQLOutput stream) throws SQLException { stream.writeString(empName); stream.writeInt(empNo); } public String toString() { String ret_str = ""; ret_str += "[Employee]\n"; ret_str += "Name: " + empName + "\n"; ret_str += "Number: " + empNo + "\n"; return ret_str; } }
c. Create the queue table and queue with ADT payload
public static void createEmployeeObjQueue(AQSession aq_sess) throws AQException { AQQueueTableProperty qt_prop = null; AQQueueProperty q_prop = null; AQQueueTable q_table = null; AQQueue queue = null; /* Message payload type is aquser.EMPLOYEE */ qt_prop = new AQQueueTableProperty("AQUSER.EMPLOYEE"); qt_prop.setComment("queue-table1"); /* Create aQTable1 */ System.out.println("\nCreate QueueTable: [aqtable1]"); q_table = aq_sess.createQueueTable("aquser", "aqtable1", qt_prop); /* Create test_queue1 */ q_prop = new AQQueueProperty(); queue = q_table.createQueue("test_queue1", q_prop); /* Enable enqueue/dequeue on this queue */ queue.start(); }
d. Enqueue and dequeue messages containing object payloads
public static void AQObjectPayloadTest2(AQSession aq_sess) throws AQException, SQLException, ClassNotFoundException { Connection db_conn = null; AQQueue queue = null; AQMessage message = null; AQObjectPayload payload = null; AQEnqueueOption eq_option = null; AQDequeueOption dq_option = null; Employee emp = null; Employee emp2 = null; Hashtable map; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get the Queue object */ queue = aq_sess.getQueue("aquser", "test_queue1"); /* Register Employee class (corresponding to EMPLOYEE Adt) * in the connection type map */ try { map = (java.util.Hashtable)(((OracleConnection)db_conn).getTypeMap()); map.put("AQUSER.EMPLOYEE", Class.forName("Employee")); } catch(Exception ex) { System.out.println("Error registering type: " + ex); } /* Enqueue a message in test_queue1 */ message = queue.createMessage(); emp = new Employee("AQUSER.EMPLOYEE", "Mark", 1007); /* Set the object payload */ payload = message.getObjectPayload(); payload.setPayloadData(emp); /* Enqueue a message into test_queue1*/ eq_option = new AQEnqueueOption(); queue.enqueue(eq_option, message); db_conn.commit(); /* Dequeue a message from test_queue1 */ dq_option = new AQDequeueOption(); message = queue.dequeue(dq_option, Class.forName("Employee")); payload = message.getObjectPayload(); emp2 = (Employee) payload.getPayloadData(); System.out.println("\nObject data retrieved: [EMPLOYEE]"); System.out.println("Name : " + emp2.empName); System.out.println("EmpId : " + emp2.empNo); db_conn.commit(); }
DECLARE enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message RAW(4096); BEGIN message := HEXTORAW(RPAD('FF',4095,'FF')); DBMS_AQ.ENQUEUE(queue_name => 'raw_msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END; /* Dequeue from raw_msg_queue: */ /* Dequeue from raw_msg_queue: */ DECLARE dequeue_options DBMS_AQ.dequeue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message RAW(4096); BEGIN DBMS_AQ.DEQUEUE(queue_name => 'raw_msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END;
#include <stdio.h>
#include <string.h>
#include <sqlca.h>
#include <sql2oci.h>
void sql_error(msg)
char *msg;
{
EXEC SQL WHENEVER SQLERROR CONTINUE;
printf("%s\n", msg);
printf("\n% .800s \n", sqlca.sqlerrm.sqlerrmc);
EXEC SQL ROLLBACK WORK RELEASE;
exit(1);
}
main()
{
OCIEnv *oeh;
/* OCI Env handle */OCIError *err;
/* OCI Err handle */OCIRaw *message= (OCIRaw*)0;
/* payload */ub1 message_txt[100];
/* data for payload */char user[60]="aq/AQ";
/* user logon password */int status;
/* returns status of the OCI call */ /* Enqueue and dequeue to a RAW queue */ /* Connect to database: */EXEC SQL CONNECT :user;
/* On an oracle error print the error number: */EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");
/* Get the OCI Env handle: */
if (SQLEnvGet(SQL_SINGLE_RCTX, &oeh) != OCI_SUCCESS)
{
printf(" error in SQLEnvGet \n");
exit(1);
}
/* Get the OCI Error handle: */if (status = OCIHandleAlloc((dvoid *)oeh, (dvoid **)&err,
(ub4)OCI_HTYPE_ERROR, (ub4)0, (dvoid **)0))
{
printf(" error in OCIHandleAlloc %d \n", status);
exit(1);
}
/
* Enqueue */ /* The bytes to be put into the raw payload:*/strcpy(message_txt, "Enqueue to a Raw payload queue ");
/* Assign bytes to the OCIRaw pointer : Memory needs to be allocated explicitly to OCIRaw*: */if (status=OCIRawAssignBytes(oeh, err, message_txt, 100,
&message))
{
printf(" error in OCIRawAssignBytes %d \n", status);
exit(1);
}
/
* Embedded PLSQL call to the AQ enqueue procedure : */EXEC SQL EXECUTE
DECLARE
message_properties dbms_aq.message_properties_t;
enqueue_options dbms_aq.enqueue_options_t;
msgid RAW(16);
BEGIN
/* Bind the host variable message to the raw payload: */dbms_aq.enqueue(queue_name => 'raw_msg_queue',
message_properties => message_properties,
enqueue_options => enqueue_options,
payload => :message,
msgid => msgid);
END;
END-EXEC;
/* Commit work: */EXEC SQL COMMIT;
/
* Dequeue */ /* Embedded PLSQL call to the AQ dequeue procedure :*/
EXEC SQL EXECUTE
DECLARE
message_properties dbms_aq.message_properties_t;
dequeue_options dbms_aq.dequeue_options_t;
msgid RAW(16);
BEGIN
/
* Return the raw payload into the host variable 'message':*/dbms_aq.dequeue(queue_name => 'raw_msg_queue',
message_properties => message_properties,
dequeue_options => dequeue_options,
payload => :message,
msgid => msgid);
END;
END-EXEC;
/* Commit work: */
EXEC SQL COMMIT;
}
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; char msg_text[100]; OCIRaw *mesg = (OCIRaw *)0; OCIRaw *deqmesg = (OCIRaw *)0; OCIInd ind = 0; dvoid *indptr = (dvoid *)&ind; int i; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0); /* Obtain the TDO of the RAW data type */ OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQADM", strlen("AQADM"), (CONST text *)"RAW", strlen("RAW"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo); /* Prepare the message payload */ strcpy(msg_text, "Enqueue to a RAW queue"); OCIRawAssignBytes(envhp, errhp, msg_text, strlen(msg_text), &mesg); /* Enqueue the message into raw_msg_queue */ OCIAQEnq(svchp, errhp, (CONST text *)"raw_msg_queue", 0, 0, mesg_tdo, (dvoid **)&mesg, (dvoid **)&indptr, 0, 0); OCITransCommit(svchp, errhp, (ub4) 0); /* Dequeue the same message into C variable deqmesg */ OCIAQDeq(svchp, errhp, (CONST text *)"raw_msg_queue", 0, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&indptr, 0, 0); for (i = 0; i < OCIRawSize(envhp, deqmesg); i++) printf("%c", *(OCIRawPtr(envhp, deqmesg) + i)); OCITransCommit(svchp, errhp, (ub4) 0); }
public static void runTest(AQSession aq_sess) throws AQException { AQQueueTable q_table; AQQueue queue; AQMessage message; AQRawPayload raw_payload; AQEnqueueOption enq_option; String test_data = "new message"; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get a handle to queue table - aq_table4 in aqjava schema: */ q_table = aq_sess.getQueueTable ("aqjava", "aq_table4"); System.out.println("Successful getQueueTable"); /* Get a handle to a queue - aq_queue4 in aquser schema: */ queue = aq_sess.getQueue ("aqjava", "aq_queue4"); System.out.println("Successful getQueue"); /* Create a message to contain raw payload: */ message = queue.createMessage(); /* Get handle to the AQRawPayload object and populate it with raw data: */ b_array = test_data.getBytes(); raw_payload = message.getRawPayload(); raw_payload.setStream(b_array, b_array.length); /* Create a AQEnqueueOption object with default options: */ enq_option = new AQEnqueueOption(); /* Enqueue the message: */ queue.enqueue(enq_option, message); db_conn.commit(); }
public static void runTest(AQSession aq_sess) throws AQException { AQQueueTable q_table; AQQueue queue; AQMessage message; AQRawPayload raw_payload; AQEnqueueOption enq_option; String test_data = "new message"; AQDequeueOption deq_option; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get a handle to queue table - aq_table4 in aqjava schema: */ q_table = aq_sess.getQueueTable ("aqjava", "aq_table4"); System.out.println("Successful getQueueTable"); /* Get a handle to a queue - aq_queue4 in aquser schema: */ queue = aq_sess.getQueue ("aqjava", "aq_queue4"); System.out.println("Successful getQueue"); /* Create a message to contain raw payload: */ message = queue.createMessage(); /* Get handle to the AQRawPayload object and populate it with raw data: */ b_array = test_data.getBytes(); raw_payload = message.getRawPayload(); raw_payload.setStream(b_array, b_array.length); /* Create a AQEnqueueOption object with default options: */ enq_option = new AQEnqueueOption(); /* Enqueue the message: */ queue.enqueue(enq_option, message); System.out.println("Successful enqueue"); db_conn.commit(); /* Create a AQDequeueOption object with default options: */ deq_option = new AQDequeueOption(); /* Dequeue a message: */ message = queue.dequeue(deq_option); System.out.println("Successful dequeue"); /* Retrieve raw data from the message: */ raw_payload = message.getRawPayload(); b_array = raw_payload.getBytes(); db_conn.commit(); }
public static void runTest(AQSession aq_sess) throws AQException { AQQueueTable q_table; AQQueueTable q_table; AQQueue queue; AQMessage message; AQRawPayload raw_payload; AQEnqueueOption enq_option; String test_data = "new message"; AQDequeueOption deq_option; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get a handle to queue table - aq_table4 in aqjava schema: */ q_table = aq_sess.getQueueTable ("aqjava", "aq_table4"); System.out.println("Successful getQueueTable"); /* Get a handle to a queue - aq_queue4 in aquser schema: */ queue = aq_sess.getQueue ("aqjava", "aq_queue4"); System.out.println("Successful getQueue"); /* Create a message to contain raw payload: */ message = queue.createMessage(); /* Get handle to the AQRawPayload object and populate it with raw data: */ b_array = test_data.getBytes(); raw_payload = message.getRawPayload(); raw_payload.setStream(b_array, b_array.length); /* Create a AQEnqueueOption object with default options: */ enq_option = new AQEnqueueOption(); /* Enqueue the message: */ queue.enqueue(enq_option, message); System.out.println("Successful enqueue"); db_conn.commit(); /* Create a AQDequeueOption object with default options: */ deq_option = new AQDequeueOption(); /* Set dequeue mode to BROWSE: */ deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_BROWSE); /* Set wait time to 10 seconds: */ deq_option.setWaitTime(10); /* Dequeue a message: */ message = queue.dequeue(deq_option); /* Retrieve raw data from the message: */ raw_payload = message.getRawPayload(); b_array = raw_payload.getBytes(); String ret_value = new String(b_array); System.out.println("Dequeued message: " + ret_value); db_conn.commit(); }
When two messages are enqued with the same priority, the message which was enqued earlier will be dequeued first. However, if two messages are of different priorities, the message with the lower value (higher priority) will be dequeued first.
/* Enqueue two messages with priority 30 and 5: */ DECLARE enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN message := message_typ('PRIORITY MESSAGE', 'enqued at priority 30.'); message_properties.priority := 30; DBMS_AQ.ENQUEUE(queue_name => 'priority_msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); message := message_typ('PRIORITY MESSAGE', 'Enqueued at priority 5.'); message_properties.priority := 5; DBMS_AQ.ENQUEUE(queue_name => 'priority_msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); END; /* Dequeue from priority queue: */ DECLARE dequeue_options DBMS_AQ.dequeue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN DBMS_AQ.DEQUEUE(queue_name => 'priority_msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); COMMIT; DBMS_AQ.DEQUEUE(queue_name => 'priority_msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); COMMIT; END; /* On return, the second message with priority set to 5 will be retrieved before the message with priority set to 30 since priority takes precedence over enqueue time. */
public static void runTest(AQSession aq_sess) throws AQException { AQQueueTable q_table; AQQueue queue; AQMessage message; AQMessageProperty m_property; AQRawPayload raw_payload; AQEnqueueOption enq_option; String test_data; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get a handle to queue table - aq_table4 in aqjava schema: */ qtable = aq_sess.getQueueTable ("aqjava", "aq_table4"); System.out.println("Successful getQueueTable"); /* Get a handle to a queue - aq_queue4 in aqjava schema: */ queue = aq_sess.getQueue ("aqjava", "aq_queue4"); System.out.println("Successful getQueue"); /* Enqueue 5 messages with priorities with different priorities: */ for (int i = 0; i < 5; i++ ) { /* Create a message to contain raw payload: */ message = queue.createMessage(); test_data = "Small_message_" + (i+1); /* some test data */ /* Get a handle to the AQRawPayload object and populate it with raw data: */ b_array = test_data.getBytes(); raw_payload = message.getRawPayload(); raw_payload.setStream(b_array, b_array.length); /* Set message priority: */ m_property = message.getMessageProperty(); if( i < 2) m_property.setPriority(2); else m_property.setPriority(3); /* Create a AQEnqueueOption object with default options: */ enq_option = new AQEnqueueOption(); /* Enqueue the message: */ queue.enqueue(enq_option, message); System.out.println("Successful enqueue"); } db_conn.commit(); }
An application can preview messages in browse mode or locked mode without deleting the message. The message of interest can then be removed from the queue.
/* Enqueue 6 messages to msg_queue -- GREEN, GREEN, YELLOW, VIOLET, BLUE, RED */ DECLARE enqueue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN message := message_typ('GREEN', 'GREEN enqueued to msg_queue first.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); message := message_typ('GREEN', 'GREEN also enqueued to msg_queue second.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); message := message_typ('YELLOW', 'YELLOW enqueued to msg_queue third.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message handle: ' || message_handle); message := message_typ('VIOLET', 'VIOLET enqueued to msg_queue fourth.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); message := message_typ('BLUE', 'BLUE enqueued to msg_queue fifth.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); message := message_typ('RED', 'RED enqueued to msg_queue sixth.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END; /* Dequeue in BROWSE mode until RED is found, and remove RED from queue: */ DECLARE dequeue_options DBMS_AQ.dequeue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN dequeue_options.dequeue_mode := DBMS_AQ.BROWSE; LOOP DBMS_AQ.DEQUEUE(queue_name => 'msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); EXIT WHEN message.subject = 'RED'; END LOOP; dequeue_options.dequeue_mode := DBMS_AQ.REMOVE; dequeue_options.msgid := message_handle; DBMS_AQ.DEQUEUE(queue_name => 'msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); COMMIT; END; /* Dequeue in LOCKED mode until BLUE is found, and remove BLUE from queue: */ DECLARE dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN dequeue_options.dequeue_mode := dbms_aq.LOCKED; LOOP dbms_aq.dequeue(queue_name => 'msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); dbms_output.put_line ('Message: ' || message.subject || ' ... ' || message.text ); EXIT WHEN message.subject = 'BLUE'; END LOOP; dequeue_options.dequeue_mode := dbms_aq.REMOVE; dequeue_options.msgid := message_handle; dbms_aq.dequeue(queue_name => 'msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); COMMIT; END;
/* Enqueue message for delayed availability: */ DECLARE enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message aq.Message_typ; BEGIN message := Message_typ('DELAYED', 'This message is delayed one week.'); message_properties.delay := 7*24*60*60; message_properties.expiration := 2*7*24*60*60; dbms_aq.enqueue(queue_name => 'msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END;
#include <stdio.h>
#include <string.h>
#include <sqlca.h>
#include <sql2oci.h>
/* The header file generated by processing object type 'aq.Message_typ': */#include "pceg.h"
void sql_error(msg)
char *msg;
{
EXEC SQL WHENEVER SQLERROR CONTINUE;
printf("%s\n", msg);
printf("\n% .800s \n", sqlca.sqlerrm.sqlerrmc);
EXEC SQL ROLLBACK WORK RELEASE;
exit(1);
}
main()
{
OCIEnv *oeh;
/* OCI Env Handle */OCIError *err;
/* OCI Error Handle */Message_typ *message = (Message_typ*)0;
/* queue payload */OCIRaw *msgid = (OCIRaw*)0;
/* message id */ub1 msgmem[16]="";
/* memory for msgid */char user[60]="aq/AQ";
/* user login password */char subject[30];
/* components of */char txt[80];
/* Message_typ */char correlation1[30];
/* message correlation */char correlation2[30];
int status;
/* code returned by the OCI calls */
/
* Dequeue by correlation and msgid *//*
Connect to the database: */
EXEC SQL CONNECT :user;
EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");
/
* Allocate space in the object cache for the host variable: */EXEC SQL ALLOCATE :message;
/* Get the OCI Env handle: */if (SQLEnvGet(SQL_SINGLE_RCTX, &oeh) != OCI_SUCCESS)
{
printf(" error in SQLEnvGet \n");
exit(1);
}
/
* Get the OCI Error handle: */if (status = OCIHandleAlloc((dvoid *)oeh, (dvoid **)&err,
(ub4)OCI_HTYPE_ERROR, (ub4)0, (dvoid **)0))
{
printf(" error in OCIHandleAlloc %d \n", status);
exit(1);
}
/*
Assign memory for msgid: Memory needs to be allocated explicitly to OCIRaw*: */if (status=OCIRawAssignBytes(oeh, err, msgmem, 16, &msgid))
{
printf(" error in OCIRawAssignBytes %d \n", status);
exit(1);
}
/
* First enqueue */
strcpy(correlation1, "1st message");
strcpy(subject, "NORMAL ENQUEUE1");
strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC");
/
* Initialize the components of message: */EXEC SQL OJECT SET subject, text OF :message TO :subject, :txt;
/*
Embedded PLSQL call to the AQ enqueue procedure:*/
EXEC SQL EXECUTE
DECLARE
message_properties dbms_aq.message_properties_t;
enqueue_options dbms_aq.enqueue_options_t;
BEGIN
/
* Bind the host variable 'correlation1': to message correlation*/message_properties.correlation := :correlation1;
/
* Bind the host variable 'message' to payload and return message id into host variable 'msgid': */dbms_aq.enqueue(queue_name => 'msg_queue',
message_properties => message_properties,
enqueue_options => enqueue_options,
payload => :message,
msgid => :msgid);
END;
END-EXEC; /* Commit work: */ EXEC SQL COMMIT; printf("Enqueued Message \n"); printf("Subject :%s\n",subject); printf("Text :%s\n",txt); /* Second enqueue */ strcpy(correlation2, "2nd message"); strcpy(subject, "NORMAL ENQUEUE2"); strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC"); /* Initialize the components of message: */ EXEC SQL OBJECT SET subject, text OF :messsage TO :subject,:txt; /* Embedded PLSQL call to the AQ enqueue procedure: */ EXEC SQL EXECUTE DECLARE message_properties dbms_aq.message_properties_t; enqueue_options dbms_aq.enqueue_options_t; msgid RAW(16); BEGIN /* Bind the host variable 'correlation2': to message correlaiton */ message_properties.correlation := :correlation2; /* Bind the host variable 'message': to payload */ dbms_aq.enqueue(queue_name => 'msg_queue', message_properties => message_properties, enqueue_options => enqueue_options, payload => :message, msgid => msgid); END; END-EXEC; /* Commit work: */ EXEC SQL COMMIT; printf("Enqueued Message \n"); printf("Subject :%s\n",subject); printf("Text :%s\n",txt); /* First dequeue - by correlation */ EXEC SQL EXECUTE DECLARE message_properties dbms_aq.message_properties_t; dequeue_options dbms_aq.dequeue_options_t; msgid RAW(16); BEGIN /* Dequeue by correlation in host variable 'correlation2': */ dequeue_options.correlation := :correlation2; /* Return the payload into host variable 'message': */ dbms_aq.dequeue(queue_name => 'msg_queue', message_properties => message_properties, dequeue_options => dequeue_options, payload => :message, msgid => msgid); END; END-EXEC; /* Commit work : */ EXEC SQL COMMIT; /* Extract the values of the components of message: */ EXEC SQL OBJECT GET subject, text FROM :message INTO :subject,:txt; printf("Dequeued Message \n"); printf("Subject :%s\n",subject); printf("Text :%s\n",txt); /* SECOND DEQUEUE - by MSGID */ EXEC SQL EXECUTE DECLARE message_properties dbms_aq.message_properties_t; dequeue_options dbms_aq.dequeue_options_t; msgid RAW(16); BEGIN /* Dequeue by msgid in host variable 'msgid': */ dequeue_options.msgid := :msgid; /* Return the payload into host variable 'message': */ dbms_aq.dequeue(queue_name => 'msg_queue', message_properties => message_properties, dequeue_options => dequeue_options, payload => :message, msgid => msgid); END; END-EXEC; /* Commit work: */ EXEC SQL COMMIT; }
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> struct message { OCIString *subject; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_subject; OCIInd null_data; }; typedef struct null_message null_message; int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; message msg; null_message nmsg; message *mesg = &msg; null_message *nmesg = &nmsg; message *deqmesg = (message *)0; null_message *ndeqmesg = (null_message *)0; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0); /* Obtain TDO of message_typ */ OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"), (CONST text *)"MESSAGE_TYP", strlen("MESSAGE_TYP"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo); /* Prepare the message payload */ mesg->subject = (OCIString *)0; mesg->data = (OCIString *)0; OCIStringAssignText(envhp, errhp, (CONST text *)"NORMAL MESSAGE", strlen("NORMAL MESSAGE"), &mesg->subject); OCIStringAssignText(envhp, errhp, (CONST text *)"OCI ENQUEUE", strlen("OCI ENQUEUE"), &mesg->data); nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL; /* Enqueue into the msg_queue */ OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0); OCITransCommit(svchp, errhp, (ub4) 0); /* Dequeue from the msg_queue */ OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0); printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); OCITransCommit(svchp, errhp, (ub4) 0); }
/* Create subscriber list: */ DECLARE subscriber aq$_agent; /* Add subscribers RED and GREEN to the suscriber list: */ BEGIN subscriber := aq$_agent('RED', NULL, NULL); DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'msg_queue_multiple', subscriber => subscriber); subscriber := aq$_agent('GREEN', NULL, NULL); DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'msg_queue_multiple', subscriber => subscriber); END; DECLARE enqueue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; recipients DBMS_AQ.aq$_recipient_list_t; message_handle RAW(16); message aq.message_typ; /* Enqueue MESSAGE 1 for subscribers to the queue i.e. for RED and GREEN: */ BEGIN message := message_typ('MESSAGE 1', 'This message is queued for queue subscribers.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue_multiple', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); /* Enqueue MESSAGE 2 for specified recipients i.e. for RED and BLUE.*/ message := message_typ('MESSAGE 2', 'This message is queued for two recipients.'); recipients(1) := aq$_agent('RED', NULL, NULL); recipients(2) := aq$_agent('BLUE', NULL, NULL); message_properties.recipient_list := recipients; DBMS_AQ.ENQUEUE(queue_name => 'msg_queue_multiple', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END;
Note that RED is both a subscriber to the queue, as well as being a specified recipient of MESSAGE 2. By contrast, GREEN is only a subscriber to those messages in the queue (in this case, MESSAGE) for which no recipients have been specified. BLUE, while not a subscriber to the queue, is nevertheless specified to receive MESSAGE 2.
/* Dequeue messages from msg_queue_multiple: */ DECLARE dequeue_options DBMS_AQ.dequeue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message aq.message_typ; no_messages exception; pragma exception_init (no_messages, -25228); BEGIN dequeue_options.wait := DBMS_AQ.NO_WAIT; BEGIN /* Consumer BLUE will get MESSAGE 2: */ dequeue_options.consumer_name := 'BLUE'; dequeue_options.navigation := FIRST_MESSAGE; LOOP DBMS_AQ.DEQUEUE(queue_name => 'msg_queue_multiple', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); dequeue_options.navigation := NEXT_MESSAGE; END LOOP; EXCEPTION WHEN no_messages THEN DBMS_OUTPUT.PUT_LINE ('No more messages for BLUE'); COMMIT; END; BEGIN /* Consumer RED will get MESSAGE 1 and MESSAGE 2: */ dequeue_options.consumer_name := 'RED'; dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE LOOP DBMS_AQ.DEQUEUE(queue_name => 'msg_queue_multiple', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); dequeue_options.navigation := NEXT_MESSAGE; END LOOP; EXCEPTION WHEN no_messages THEN DBMS_OUTPUT.PUT_LINE ('No more messages for RED'); COMMIT; END; BEGIN /* Consumer GREEN will get MESSAGE 1: */ dequeue_options.consumer_name := 'GREEN'; dequeue_options.navigation := FIRST_MESSAGE; LOOP DBMS_AQ.DEQUEUE(queue_name => 'msg_queue_multiple', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); dequeue_options.navigation := NEXT_MESSAGE; END LOOP; EXCEPTION WHEN no_messages THEN DBMS_OUTPUT.PUT_LINE ('No more messages for GREEN'); COMMIT; END;
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> struct message { OCIString *subject; OCIString *data; }; typedef struct message message; struct null_message { OCIInd null_adt; OCIInd null_subject; OCIInd null_data; }; typedef struct null_message null_message; int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; message msg; null_message nmsg; message *mesg = &msg; null_message *nmesg = &nmsg; message *deqmesg = (message *)0; null_message *ndeqmesg = (null_message *)0; OCIAQMsgProperties *msgprop = (OCIAQMsgProperties *)0; OCIAQAgent *agents[2]; OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0; ub4 wait = OCI_DEQ_NO_WAIT; ub4 navigation = OCI_DEQ_FIRST_MSG; OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0); /* Obtain TDO of message_typ */ OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"), (CONST text *)"MESSAGE_TYP", strlen("MESSAGE_TYP"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo); /* Prepare the message payload */ mesg->subject = (OCIString *)0; mesg->data = (OCIString *)0; OCIStringAssignText(envhp, errhp, (CONST text *)"MESSAGE 1", strlen("MESSAGE 1"), &mesg->subject); OCIStringAssignText(envhp, errhp, (CONST text *)"mesg for queue subscribers", strlen("mesg for queue subscribers"), &mesg->data); nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL; /* Enqueue MESSAGE 1 for subscribers to the queue i.e. for RED and GREEN */ OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue_multiple", 0, 0, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0); /* Enqueue MESSAGE 2 for specified recipients i.e. for RED and BLUE */ /* prepare message payload */ OCIStringAssignText(envhp, errhp, (CONST text *)"MESSAGE 2", strlen("MESSAGE 2"), &mesg->subject); OCIStringAssignText(envhp, errhp, (CONST text *)"mesg for two recipients", strlen("mesg for two recipients"), &mesg->data); /* Allocate AQ message properties and agent descriptors */ OCIDescriptorAlloc(envhp, (dvoid **)&msgprop, OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0); OCIDescriptorAlloc(envhp, (dvoid **)&agents[0], OCI_DTYPE_AQAGENT, 0, (dvoid **)0); OCIDescriptorAlloc(envhp, (dvoid **)&agents[1], OCI_DTYPE_AQAGENT, 0, (dvoid **)0); /* Prepare the recipient list, RED and BLUE */ OCIAttrSet(agents[0], OCI_DTYPE_AQAGENT, "RED", strlen("RED"), OCI_ATTR_AGENT_NAME, errhp); OCIAttrSet(agents[1], OCI_DTYPE_AQAGENT, "BLUE", strlen("BLUE"), OCI_ATTR_AGENT_NAME, errhp); OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)agents, 2, OCI_ATTR_RECIPIENT_LIST, errhp); OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue_multiple", 0, msgprop, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0); OCITransCommit(svchp, errhp, (ub4) 0); /* Now dequeue the messages using different consumer names */ /* Allocate dequeue options descriptor to set the dequeue options */ OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0); /* Set wait parameter to NO_WAIT so that the dequeue returns immediately */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0, OCI_ATTR_WAIT, errhp); /* Set navigation to FIRST_MESSAGE so that the dequeue resets the position */ /* after a new consumer_name is set in the dequeue options */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0, OCI_ATTR_NAVIGATION, errhp); /* Dequeue from the msg_queue_multiple as consumer BLUE */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"BLUE", strlen("BLUE"), OCI_ATTR_CONSUMER_NAME, errhp); while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0) == OCI_SUCCESS) { printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); } OCITransCommit(svchp, errhp, (ub4) 0); /* Dequeue from the msg_queue_multiple as consumer RED */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"RED", strlen("RED"), OCI_ATTR_CONSUMER_NAME, errhp); while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0) == OCI_SUCCESS) { printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); } OCITransCommit(svchp, errhp, (ub4) 0); /* Dequeue from the msg_queue_multiple as consumer GREEN */ OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,(dvoid *)"GREEN",strlen("GREEN"), OCI_ATTR_CONSUMER_NAME, errhp); while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0) == OCI_SUCCESS) { printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject)); printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data)); } OCITransCommit(svchp, errhp, (ub4) 0); }
CONNECT aq/aq EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'aq.msggroup', queue_payload_type => 'aq.message_typ', message_grouping => DBMS_AQADM.TRANSACTIONAL); EXECUTE DBMS_AQADM.CREATE_QUEUE( queue_name => 'msggroup_queue', queue_table => 'aq.msggroup'); EXECUTE DBMS_AQADM.START_QUEUE( queue_name => 'msggroup_queue'); /* Enqueue three messages in each transaction */ DECLARE enqueue_options DBMS_AQ.enqueue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN /* Loop through three times, committing after every iteration */ FOR txnno in 1..3 LOOP /* Loop through three times, enqueuing each iteration */ FOR mesgno in 1..3 LOOP message := message_typ('GROUP#' || txnno, 'Message#' || mesgno || ' in group' || txnno); DBMS_AQ.ENQUEUE(queue_name => 'msggroup_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); END LOOP; /* Commit the transaction */ COMMIT; END LOOP; END; /* Now dequeue the messages as groups */ DECLARE dequeue_options DBMS_AQ.dequeue_options_t; message_properties DBMS_AQ.message_properties_t; message_handle RAW(16); message aq.message_typ; no_messages exception; end_of_group exception; PRAGMA EXCEPTION_INIT (no_messages, -25228); PRAGMA EXCEPTION_INIT (end_of_group, -25235); BEGIN dequeue_options.wait := DBMS_AQ.NO_WAIT; dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE; LOOP BEGIN DBMS_AQ.DEQUEUE(queue_name => 'msggroup_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); dequeue_options.navigation := DBMS_AQ.NEXT_MESSAGE; EXCEPTION WHEN end_of_group THEN DBMS_OUTPUT.PUT_LINE ('Finished processing a group of messages'); COMMIT; dequeue_options.navigation := DBMS_AQ.NEXT_TRANSACTION; END; END LOOP; EXCEPTION WHEN no_messages THEN DBMS_OUTPUT.PUT_LINE ('No more messages'); END;
/* Create the message payload object type with one or more LOB attributes. On
enqueue, set the LOB attribute to EMPTY_BLOB. After the enqueue completes,
before you commit your transaction. Select the LOB attribute from the
user_data
column of the queue table or queue table view. You can now
use the LOB interfaces (which are available through both OCI and PL/SQL) to
write the LOB data to the queue. On dequeue, the message payload
will contain the LOB locator. You can use this LOB locator after
the dequeue, but before you commit your transaction, to read the LOB data.
*/
/* Setup the accounts: */
connect system/manager
CREATE USER aqadm IDENTIFIED BY aqadm;
GRANT CONNECT, RESOURCE TO aqadm;
GRANT aq_administrator_role TO aqadm;
CREATE USER aq IDENTIFIED BY aq;
GRANT CONNECT, RESOURCE TO aq;
GRANT EXECUTE ON DBMS_AQ TO aq;
CREATE TYPE aq.message AS OBJECT(id NUMBER,
subject VARCHAR2(100),
data BLOB,
trailer NUMBER);
CREATE TABLESPACE aq_tbs DATAFILE 'aq.dbs' SIZE 2M REUSE;
/* create the queue table, queues and start the queue: */
CONNECT aqadm/aqadm
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table => 'aq.qt1',
queue_payload_type => 'aq.message');
EXECUTE DBMS_AQADM.CREATE_QUEUE(
queue_name => 'aq.queue1',
queue_table => 'aq.qt1');
EXECUTE DBMS_AQADM.START_QUEUE(queue_name => 'aq.queue1');
/* End set up: */
/* Enqueue of Large data types: */
CONNECT aq/aq
CREATE OR REPLACE PROCEDURE blobenqueue(msgno IN NUMBER) AS
enq_userdata aq.message;
enq_msgid RAW(16);
enqopt DBMS_AQ.enqueue_options_t;
msgprop DBMS_AQ.message_properties_t;
lob_loc BLOB;
buffer RAW(4096);
BEGIN
buffer := HEXTORAW(RPAD('FF', 4096, 'FF'));
enq_userdata := aq.message(msgno, 'Large Lob data', EMPTY_BLOB(), msgno);
DBMS_AQ.ENQUEUE('aq.queue1', enqopt, msgprop, enq_userdata, enq_msgid);
--select the lob locator for the queue table
SELECT t.user_data.data INTO lob_loc
FROM qt1 t
WHERE t.msgid = enq_msgid;
DBMS_LOB.WRITE(lob_loc, 2000, 1, buffer );
COMMIT;
END;
/* Dequeue lob data: */
CREATE OR REPLACE PROCEDURE blobdequeue AS
dequeue_options DBMS_AQ.dequeue_options_t;
message_properties DBMS_AQ.message_properties_t;
mid RAW(16);
pload aq.message;
lob_loc BLOB;
amount BINARY_INTEGER;
buffer RAW(4096);
BEGIN
DBMS_AQ.DEQUEUE('aq.queue1', dequeue_options, message_properties,
pload, mid);
lob_loc := pload.data;
-- read the lob data info buffer
amount := 2000;
DBMS_LOB.READ(lob_loc, amount, 1, buffer);
DBMS_OUTPUT.PUT_LINE('Amount of data read: '||amount);
COMMIT;
END;
/* Do the enqueues and dequeues: */
SET SERVEROUTPUT ON
BEGIN
FOR i IN 1..5 LOOP
blobenqueue(i);
END LOOP;
END;
BEGIN
FOR i IN 1..5 LOOP
blobdequeue();
END LOOP;
END;
1. Create the message type (ADT with CLOB and blob)
connect aquser/aquser create type LobMessage as object(id NUMBER, subject varchar2(100), data blob, cdata clob, trailer number);
2. Create the queue table and queue
connect aquser/aquser execute dbms_aqadm.create_queue_table( queue_table => 'qt_adt', queue_payload_type => 'LOBMESSAGE', comment => 'single-consumer, default sort ordering, ADT Message', compatible => '8.1.0' ); execute dbms_aqadm.create_queue( queue_name => 'q1_adt', queue_table => 'qt_adt' ); execute dbms_aqadm.start_queue(queue_name => 'q1_adt');
3. Run jpublisher to generate the java class that maps to the LobMessage
Oracle object type jpub -user=aquser/aquser -sql=LobMessage -case=mixed -methods=false
4. Enqueue and Dequeue Messages
public static void runTest(AQSession aq_sess) { Connection db_conn = null; AQEnqueueOption eq_option = null; AQDequeueOption dq_option = null; AQQueue queue1 = null; AQMessage adt_msg = null; AQMessage adt_msg2 = null; AQObjectPayload sPayload = null; AQObjectPayload sPayload2 = null; LobMessage sPayl = null; LobMessage sPayl2 = null; AQObjectPayload rPayload = null; LobMessage rPayl = null; byte[] smsgid; AQMessage rMessage = null; int i = 0; int j = 0; int id = 0; boolean more = false; byte[] b_array; char[] c_array; String mStr = null; BLOB b1 = null; CLOB c1 = null; BLOB b2 = null; CLOB c2 = null; BLOB b3 = null; CLOB c3 = null; int b_len = 0; int c_len = 0; OracleCallableStatement blob_stmt0= null; OracleCallableStatement clob_stmt0= null; OracleResultSet rset0 = null; OracleResultSet rset1 = null; OracleCallableStatement blob_stmt = null; OracleResultSet rset2 = null; OracleCallableStatement clob_stmt = null; OracleResultSet rset3 = null; try { db_conn = ((AQOracleSession)aq_sess).getDBConnection(); queue1 = aq_sess.getQueue("aquser", "q1_adt"); b_array = new byte[5000]; c_array = new char[5000]; for (i = 0; i < 5000; i++) { b_array[i] = 67; c_array[i] = 'c'; } sPayl = new LobMessage(); System.out.println("Enqueue Long messages"); eq_option = new AQEnqueueOption(); /* Enqueue messages with LOB attributes */ for ( i = 0; i < 10; i++) { adt_msg = queue1.createMessage(); sPayload = adt_msg.getObjectPayload(); /* Get Empty BLOB handle */ blob_stmt0 = (OracleCallableStatement)db_conn.prepareCall( "select empty_blob() from dual"); rset0 = (OracleResultSet) blob_stmt0.executeQuery (); try { if (rset0.next()) { b1 = (oracle.sql.BLOB)rset0.getBlob(1); } if (b1 == null) { System.out.println("select empty_blob() from dual failed"); } } catch (Exception ex) { System.out.println("Exception during select from dual " + ex); ex.printStackTrace(); } /* Get Empty CLOB handle */ clob_stmt0 = (OracleCallableStatement)db_conn.prepareCall( "select empty_clob() from dual"); rset1 = (OracleResultSet) clob_stmt0.executeQuery (); try { if (rset1.next()) { c1 = (oracle.sql.CLOB)rset1.getClob(1); } if (c1 == null) { System.out.println("select empty_clob() from dual failed"); } } catch (Exception ex) { System.out.println("Exception2 during select from dual " + ex); ex.printStackTrace(); } id = i+1; mStr = "Message #" + id; sPayl.setId(new BigDecimal(id)); sPayl.setTrailer(new BigDecimal(id)); sPayl.setSubject(mStr); sPayl.setData(b1); sPayl.setCdata(c1); /* Set Object Payload data */ sPayload.setPayloadData(sPayl); /* Enqueue the message */ queue1.enqueue(eq_option, adt_msg); System.out.println("Enqueued Message: " + id ); smsgid = adt_msg.getMessageId(); /* * Note: The message is initially enqueued with an EMPTY BLOB and CLOB * After enqueuing the message, we need to get the lob locators and * then populate the LOBs */ blob_stmt = (OracleCallableStatement)db_conn.prepareCall( "SELECT user_data FROM qt_adt where msgid = ?"); blob_stmt.setBytes(1,smsgid); rset2 = (OracleResultSet) blob_stmt.executeQuery (); try { if (rset2.next()) { /* Get message contents */ sPayl2 = (LobMessage)rset2.getCustomDatum(1, ((CustomDatumFactory)LobMessage.getFactory())); /* Get BLOB locator */ b2 = sPayl2.getData(); /* Popuate the BLOB */ if (b2 == null) { System.out.println("Blob select null"); } if ((i % 3) == 0) { b_len = b2.putBytes(1000,b_array); } else { b_len = b2.putBytes(1,b_array); } /* Get CLOB locator */ c2 = sPayl2.getCdata(); /* Populate the CLOB */ if (c2 == null) { System.out.println("Clob select null"); } if ((i % 4) == 0) { c_len = c2.putChars(2500,c_array); } else { c_len = c2.putChars(1,c_array); } } } catch (Exception ex) { System.out.println("Blob or Clob exception: " + ex); } } Thread.sleep(30000); // dequeue messages dq_option = new AQDequeueOption(); dq_option.setWaitTime(AQDequeueOption.WAIT_NONE); for (i = 0 ; i < 10 ; i++) { /* Dequeue the message */ adt_msg2 = ((AQOracleQueue)queue1).dequeue(dq_option, LobMessage.getFactory()); /* Get payload containing LOB data */ rPayload = adt_msg2.getObjectPayload(); rPayl = (LobMessage) rPayload.getPayloadData(); System.out.println("\n Message: #" + (i+1)); System.out.println(" Id: " + rPayl.getId()); System.out.println(" Subject: " + rPayl.getSubject()); /* Get BLOB data */ b3 = rPayl.getData(); System.out.println(" " + b3.length() + " bytes of data"); /* Get CLOB data */ c3 = rPayl.getCdata(); System.out.println(" " + c3.length() + " chars of data"); System.out.println(" Trailer: " + rPayl.getTrailer()); db_conn.commit(); } } catch (java.sql.SQLException sql_ex) { System.out.println("SQL Exception: " + sql_ex); sql_ex.printStackTrace(); } catch (Exception ex) { System.out.println("Exception-2: " + ex); ex.printStackTrace(); } }
|
![]() Copyright © 1996-2000, Oracle Corporation. All Rights Reserved. |
|