Event-based Scheduler Jobs and Oracle Streams Advanced Queuing: A Powerful Combination

Problem:

An external or internal application wants to send inbound data to your Oracle database, such as a purchase order or a receivable invoice. You want to consume this data in real time.

Assumptions:

The version of your Oracle database is 11gR2 or above. The only “tool” you have is the Oracle database itself, i.e. you do not have any middleware (such as Oracle SOA Suite) installed. The external or internal application can connect to your Oracle database.

Solution:

Use Oracle Streams Advanced Queuing (AQ) for messaging. Use an event-based scheduler job for real time data consumption. Both are standard features of the Oracle database.

Implementation Steps:

The following is a fully functional example of the solution, starting from creating a new database user and ending with consuming the message in the database.

Step 1:

Log in as a user with DBA privileges and execute the following to create a new database user. Alternatively, you may skip this step and use an existing user in your database, but make sure that user has the privileges listed next.

CREATE USER john IDENTIFIED BY john DEFAULT TABLESPACE users;
ALTER USER john QUOTA UNLIMITED ON users;

Grant John the privileges to connect to the database, create types, tables and stored procedures:

GRANT CREATE SESSION TO john;
GRANT CREATE TYPE TO john;
GRANT CREATE TABLE TO john;
GRANT CREATE PROCEDURE TO john;

To create DBMS Scheduler jobs, John needs the CREATE JOB privilege:

GRANT CREATE JOB TO john;

To create Advanced Queues, enqueue and dequeue, John needs to use DBMS_AQADM and DBMS_AQ:

GRANT EXECUTE ON DBMS_AQADM TO john;
GRANT EXECUTE ON DBMS_AQ TO john;
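
One way to confirm that the grants took effect is to connect as John and check the data dictionary. This is a quick sanity check I am adding as a sketch, not a required step. SESSION_PRIVS and USER_TAB_PRIVS are standard dictionary views; the privilege and package names are simply the ones granted above.

```sql
-- Run as JOHN: system privileges active in the current session.
SELECT privilege
  FROM session_privs
 WHERE privilege IN ('CREATE SESSION', 'CREATE TYPE', 'CREATE TABLE',
                     'CREATE PROCEDURE', 'CREATE JOB')
 ORDER BY privilege;

-- Run as JOHN: object privileges involving JOHN on the two AQ packages.
SELECT table_name, privilege
  FROM user_tab_privs
 WHERE table_name IN ('DBMS_AQADM', 'DBMS_AQ');
```

If a row is missing from either result, re-run the corresponding GRANT as the DBA user before continuing.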

Step 2:

Log in as John and create the object type of the message you are going to receive from the external application. This is also the type of the payload in the queue. I’m using a simple object type for demonstration purposes; the object type may be as complex as you need it to be. The payload type may also be a LOB, in which case you do not need to create an object type. Demonstrating a LOB payload type is not part of this article.

CREATE OR REPLACE TYPE msg_ot AS OBJECT
(
   msg VARCHAR2(4000)
);

For demo purposes, I create a table to save the payload of the received message:

CREATE TABLE messages (msg XMLTYPE, date_created TIMESTAMP);

Step 3:

Create the Oracle AQ queue table and queue that will be used for messaging between the external application and the Oracle database. The queue table has to be created with “multiple_consumers => TRUE” if the queue is going to be used by an event-based job; otherwise you get ORA-27373.

DECLARE
   l_q_name              CONSTANT VARCHAR2 (30) := 'MY_Q';
   l_payload_type_name   CONSTANT VARCHAR2 (30) := 'MSG_OT';
BEGIN
   DBMS_AQADM.create_queue_table (queue_table          => l_q_name,
                                  queue_payload_type   => l_payload_type_name,
                                  multiple_consumers   => TRUE,
                                  COMMENT              => 'My queue');
   DBMS_AQADM.create_queue (queue_name       => l_q_name,
                            queue_table      => l_q_name,
                            retention_time   => 86400, /*24 hours in seconds*/
                            COMMENT          => 'My queue');
   DBMS_AQADM.start_queue (queue_name => l_q_name);

   COMMIT;
END;
/
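
To check that the queue exists and is started, you can query the USER_QUEUES dictionary view as John. This is a small verification sketch I am adding, not part of the original steps. I would expect it to list MY_Q as a normal queue, together with the exception queue that Oracle creates automatically for the queue table.

```sql
-- Run as JOHN: queues that live in the MY_Q queue table.
-- ENQUEUE_ENABLED and DEQUEUE_ENABLED show whether start_queue took effect;
-- RETENTION should reflect the retention_time passed to create_queue.
SELECT name,
       queue_table,
       queue_type,
       enqueue_enabled,
       dequeue_enabled,
       retention
  FROM user_queues
 WHERE queue_table = 'MY_Q';
```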

Step 4:

Create the PL/SQL code that will accept the message and consume it. For demonstration purposes, I just convert the message into an XML instance and insert it into a table.

CREATE OR REPLACE PACKAGE my_pkg
AS
   PROCEDURE process_q_msg (p_msg_o IN msg_ot);
END my_pkg;
/
CREATE OR REPLACE PACKAGE BODY my_pkg
AS
   PROCEDURE process_q_msg (p_msg_o IN msg_ot)
   IS
   BEGIN
      INSERT INTO messages (msg, date_created) 
      VALUES (XMLTYPE(p_msg_o.msg), SYSTIMESTAMP);
   END process_q_msg;
END my_pkg;
/
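
Before wiring the procedure to the queue, it can help to call it directly and confirm that it compiles and inserts a row. The following is a minimal sketch and not part of the original walkthrough. The <Test> document is an arbitrary placeholder payload I made up, and the ROLLBACK discards the test row so that the messages table is still empty for the test later on.

```sql
BEGIN
   -- Call the consumer directly with a hand-built payload (placeholder XML).
   my_pkg.process_q_msg (msg_ot ('<Test>hello</Test>'));

   -- The procedure does not commit, so this discards the test row.
   -- Note: it also rolls back any other uncommitted work in this session.
   ROLLBACK;
END;
/
```

If the payload is not well-formed XML, the XMLTYPE constructor raises an error and nothing is inserted. That is worth keeping in mind, because the job will fail in the same way for such messages.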

Step 5:

Create an event-based job that dequeues, in real time, the messages enqueued into the queue created in step 3. The job also calls the procedure my_pkg.process_q_msg created in step 4 and passes the message payload to it.

BEGIN
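   -- Program that wraps the stored procedure; it takes one argument.
   DBMS_SCHEDULER.create_program (program_name          => 'my_program_p',
                                  program_action        => 'my_pkg.process_q_msg',
                                  program_type          => 'stored_procedure',
                                  number_of_arguments   => 1,
                                  enabled               => FALSE);

   -- Bind argument 1 to the message of the event that started the job.
   DBMS_SCHEDULER.define_metadata_argument ('my_program_p', 'event_message', 1);
   DBMS_SCHEDULER.enable ('my_program_p');

   -- Event-based job: runs the program when a message arrives on john.my_q.
   DBMS_SCHEDULER.create_job (job_name       => 'my_job_j',
                              program_name   => 'my_program_p',
                              queue_spec     => 'john.my_q',
                              enabled        => FALSE);
   -- Start an instance per event, even while another instance is running.
   DBMS_SCHEDULER.set_attribute ('my_job_j', 'parallel_instances', TRUE);
   DBMS_SCHEDULER.enable ('my_job_j');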
END;
/
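
To confirm that the job is enabled and attached to the queue, you can query the scheduler and AQ dictionary views as John. This is a verification sketch I am adding, based on the standard USER_SCHEDULER_JOBS and USER_QUEUE_SUBSCRIBERS views. My assumption is that the Scheduler registers its own subscriber on MY_Q when the job is created, so the second query should return at least one row.

```sql
-- Run as JOHN: the job should be enabled and point at the MY_Q event queue.
SELECT job_name,
       enabled,
       state,
       event_queue_name,
       event_queue_agent
  FROM user_scheduler_jobs
 WHERE job_name = 'MY_JOB_J';

-- Run as JOHN: subscribers currently registered on the queue.
SELECT queue_name, consumer_name
  FROM user_queue_subscribers
 WHERE queue_name = 'MY_Q';
```

If the second query returns no rows, an enqueue into MY_Q has no recipient yet, so check the job before testing.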

Test:

Now, we are going to assume the identity of an external application and enqueue a couple of messages into the MY_Q queue to test the process. Note that the event-based scheduler job has to be created first, before putting any message into the queue, otherwise you get this error: ORA-24033: no recipients for message.

DECLARE
   l_userdata             msg_ot;
   l_q_name      CONSTANT VARCHAR2 (30) := 'MY_Q';
   l_enqueue_options      DBMS_AQ.ENQUEUE_OPTIONS_T;
   l_message_properties   DBMS_AQ.MESSAGE_PROPERTIES_T;
   l_message_id           RAW (16);
BEGIN
   l_userdata := msg_ot ('
        <PurchaseOrder>
          <Reference>PO1</Reference>
          <LineItems>
            <LineItem ItemNumber="1">
              <Description>A Night to Remember</Description>
              <Part Id="715515009058" UnitPrice="39.95" Quantity="2"/>
            </LineItem>
          </LineItems>
        </PurchaseOrder>      
      ');
   DBMS_AQ.enqueue (queue_name           => l_q_name,
                    enqueue_options      => l_enqueue_options,
                    message_properties   => l_message_properties,
                    payload              => l_userdata,
                    msgid                => l_message_id);

   l_userdata := msg_ot ('
        <PurchaseOrder>
          <Reference>PO2</Reference>
          <LineItems>
            <LineItem ItemNumber="1">
              <Description>The Unbearable Lightness Of Being</Description>
              <Part Id="37429140222" UnitPrice="29.95" Quantity="2"/>
            </LineItem>
          </LineItems>
        </PurchaseOrder>      
      ');
   DBMS_AQ.enqueue (queue_name           => l_q_name,
                    enqueue_options      => l_enqueue_options,
                    message_properties   => l_message_properties,
                    payload              => l_userdata,
                    msgid                => l_message_id);
   COMMIT;
END;
/

When you execute the above code and query the AQ$MY_Q view of the queue table, you’ll see that the two messages are there in a PROCESSED state, which means they have already been picked up by the event-based job and processed by my_pkg.process_q_msg.

john@DB11202> SELECT msg_id, msg_state FROM aq$my_q;

MSG_ID                           MSG_STATE
-------------------------------- ----------------
E5F3EDD4A23C4928B46E7FBD8EAF2910 PROCESSED
A2B715B3FC014C2B80519EE7DA520A73 PROCESSED

2 rows selected.

You can also see that the payload was saved in the messages table.

john@DB11202> COLUMN po_reference format A15
john@DB11202> SELECT EXTRACT (msg, '//PurchaseOrder/Reference/text()')
  2         AS po_reference,
  3             date_created
  4  FROM messages;

PO_REFERENCE    DATE_CREATED
--------------- ------------------------------------------------------
PO2             21-OCT-12 07.31.03.168000 PM
PO1             21-OCT-12 07.31.03.152000 PM

2 rows selected.

Beautiful!
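
If the messages are not processed, or the messages table stays empty, the scheduler's run log is the first place I would look. The query below is a troubleshooting sketch and not part of the original walkthrough. It assumes that job logging is enabled for the job, and it uses LIKE on the job name in case the individual runs are logged under a slightly different name.

```sql
-- Run as JOHN: most recent runs of the event-based job, newest first.
-- STATUS shows whether a run succeeded; ERROR# and ADDITIONAL_INFO
-- carry the error raised by my_pkg.process_q_msg, if any.
SELECT log_date,
       job_name,
       status,
       error#,
       additional_info
  FROM user_scheduler_job_run_details
 WHERE job_name LIKE 'MY_JOB_J%'
 ORDER BY log_date DESC;
```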


Filed in Oracle on 23 Oct 12

