Oracle8i Application Developer's Guide - Advanced Queuing Release 2 (8.1.6) Part Number A76938-01 |
|
Oracle Advanced Queuing by Example, 7 of 8
/* * The program uses the XA interface to enqueue 100 messages and then * dequeue them. * Login: aq/aq * Requires: AQ_USER_ROLE to be granted to aq * a RAW queue called "aqsqueue" to be created in aqs schema * (above steps can be performed by running aqaq.sql) * Message Format: Msgno: [0-1000] HELLO, WORLD! * Author: schandra@us.oracle.com */ #ifndef OCI_ORACLE #include <oci.h> #endif #include <xa.h> /* XA open string */ char xaoinfo[] = "oracle_xa+ACC=P/AQ/AQ+SESTM=30+Objects=T"; /* template for generating XA XIDs */ XID xidtempl = { 0x1e0a0a1e, 12, 8, "GTRID001BQual001" }; /* Pointer to Oracle XA function table */ extern struct xa_switch_t xaosw; /* Oracle XA switch */ static struct xa_switch_t *xafunc = &xaosw; /* dummy stubs for ax_reg and ax_unreg */ int ax_reg(rmid, xid, flags) int rmid; XID *xid; long flags; { xid->formatID = -1; return 0; } int ax_unreg(rmid, flags) int rmid; long flags; { return 0; } /* generate an XID */ void xidgen(xid, serialno) XID *xid; int serialno; { char seq [11]; sprintf(seq, "%d", serialno); memcpy((void *)xid, (void *)&xidtempl, sizeof(XID)); strncpy((&xid->data[5]), seq, 3); } /* check if XA operation succeeded */ #define checkXAerr(action, funcname) \ if ((action) != XA_OK) \ { \ printf("%s failed!\n", funcname); \ exit(-1); \ } else /* check if OCI operation succeeded */ static void checkOCIerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; if (status == OCI_SUCCESS) return; if (status == OCI_ERROR) { OCIErrorGet((dvoid *) errhp, 1, (text *)0, &errcode, errbuf, (ub4)sizeof(errbuf), OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); } else printf("Error - %d\n", status); exit (-1); } void main(argc, argv) int argc; char **argv; { int msgno = 0; /* message being enqueued */ OCIEnv *envhp; /* OCI environment handle */ OCIError *errhp; /* OCI Error handle */ OCISvcCtx *svchp; /* OCI Service handle */ char message[128]; /* message buffer */ ub4 mesglen; /* length of message */ OCIRaw 
*rawmesg = (OCIRaw *)0; /* message in OCI RAW format */ OCIInd ind = 0; /* OCI null indicator */ dvoid *indptr = (dvoid *)&ind; /* null indicator pointer */ OCIType *mesg_tdo = (OCIType *) 0; /* TDO for RAW datatype */ XID xid; /* XA's global transaction id */ ub4 i; /* array index */ checkXAerr(xafunc->xa_open_entry(xaoinfo, 1, TMNOFLAGS), "xaoopen"); svchp = xaoSvcCtx((text *)0); /* get service handle from XA */ envhp = xaoEnv((text *)0); /* get enviornment handle from XA */ if (!svchp || !envhp) { printf("Unable to obtain OCI Handles from XA!\n"); exit (-1); } OCIHandleAlloc((dvoid *)envhp, (dvoid **)&errhp, OCI_HTYPE_ERROR, 0, (dvoid **)0); /* allocate error handle */ /* enqueue 1000 messages, 1 message per XA transaction */ for (msgno = 0; msgno < 1000; msgno++) { sprintf((const char *)message, "Msgno: %d, Hello, World!", msgno); mesglen = (ub4)strlen((const char *)message); xidgen(&xid, msgno); /* generate an XA xid */ checkXAerr(xafunc->xa_start_entry(&xid, 1, TMNOFLAGS), "xaostart"); checkOCIerr(errhp, OCIRawAssignBytes(envhp, errhp, (ub1 *)message, mesglen, &rawmesg)); if (!mesg_tdo) /* get Type descriptor (TDO) for RAW type */ checkOCIerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQADM", strlen("AQADM"), (CONST text *)"RAW", strlen("RAW"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo)); checkOCIerr(errhp, OCIAQEnq(svchp, errhp, (CONST text *)"aqsqueue", 0, 0, mesg_tdo, (dvoid **)&rawmesg, &indptr, 0, 0)); checkXAerr(xafunc->xa_end_entry(&xid, 1, TMSUCCESS), "xaoend"); checkXAerr(xafunc->xa_commit_entry(&xid, 1, TMONEPHASE), "xaocommit"); printf("%s Enqueued\n", message); } /* dequeue 1000 messages within one XA transaction */ xidgen(&xid, msgno); /* generate an XA xid */ checkXAerr(xafunc->xa_start_entry(&xid, 1, TMNOFLAGS), "xaostart"); for (msgno = 0; msgno < 1000; msgno++) { checkOCIerr(errhp, OCIAQDeq(svchp, errhp, (CONST text *)"aqsqueue", 0, 0, mesg_tdo, (dvoid **)&rawmesg, &indptr, 0, 0)); if (ind) printf("Null Raw 
Message"); else for (i = 0; i < OCIRawSize(envhp, rawmesg); i++) printf("%c", *(OCIRawPtr(envhp, rawmesg) + i)); printf("\n"); } checkXAerr(xafunc->xa_end_entry(&xid, 1, TMSUCCESS), "xaoend"); checkXAerr(xafunc->xa_commit_entry(&xid, 1, TMONEPHASE), "xaocommit"); }
|
Copyright © 1996-2000, Oracle Corporation. All Rights Reserved.
|