Advanced Queues  

Posted by Mawahid

1. Create a user, tablespace and grant the privileges
First of all, we'll create a user named AQDEMO to own and use the message queues. In real life, you'll probably separate the queue owner from the message publishers and consumers. However, to keep this demo short, we'll use AQDEMO to publish and consume messages too. It's worth noting that a tablespace containing queue tables doesn't support Tablespace Point-In-Time Recovery (TSPITR). For that reason, and to ease the management of our AQ environment, we'll store the queues in a separate tablespace called AQ. Lastly, if you want to share queues across multiple databases, you should use a distributed environment and set the global_names parameter of all the databases to true.

create tablespace aq datafile '/u01/app/oracle/oradata/BLACK/aq01.dbf'  size 25M autoextend on maxsize 266M;
create user aqdemo identified by aqdemo default tablespace aq temporary tablespace temp;
grant connect, resource to aqdemo;
grant execute on dbms_aqadm to aqdemo;
grant execute on dbms_aq to aqdemo;
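
If you do plan to share queues across databases, the global_names setting mentioned above can be checked and changed as in the sketch below. It assumes a privileged connection and is not needed for the single-database demo in this post:

connect / as sysdba

-- display the current value of the parameter
show parameter global_names

-- enable global naming (repeat on every database that takes part)
alter system set global_names=true;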

2. Create a queue table, a multiple consumer queue and start it
To perform these steps, we'll connect as AQDEMO and will use:
dbms_aqadm.create_queue_table - to create the queue table
dbms_aqadm.create_queue - to create the queue
dbms_aqadm.start_queue - to start the queue
To keep it simple, we won't use a user-defined type for the message, just a RAW:

connect aqdemo/aqdemo

begin
 dbms_aqadm.create_queue_table(
    queue_table        =>'myqueue_table',
    queue_payload_type =>'RAW',
    storage_clause     =>'tablespace aq',
    multiple_consumers => true,
    comment            =>'AQ Demo',
    compatible         =>'11.1');
end;
/

begin
 dbms_aqadm.create_queue(
    queue_name        => 'myqueue',
    queue_table       => 'myqueue_table');
end;
/

begin
 dbms_aqadm.start_queue(
    queue_name => 'myqueue',
    enqueue    => true,
    dequeue    => true);
end;
/

Note:

If you plan to create a queue in another schema, you can prefix the queue table or queue name with the schema name like AQDEMO.MYQUEUE_TABLE.
If you want to mix cases, you can surround the names with double quotes, like "AQDEMO"."myqueue_table", but it's probably easier to stick to uppercase.


Once the queue table is created, several tables and views are available to store and query the associated data; here is an example with AQ$[QUEUE_TABLE]:

desc aq$myqueue_table
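
To confirm that the queue exists and has been started, you could also query USER_QUEUES while connected as AQDEMO. This is only a sketch, and the column formats are a suggestion:

col name            format a20
col queue_table     format a20
col enqueue_enabled format a15
col dequeue_enabled format a15

-- ENQUEUE_ENABLED and DEQUEUE_ENABLED should report YES once start_queue has run
select name, queue_table, enqueue_enabled, dequeue_enabled
from user_queues
where name='MYQUEUE';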


3. Create subscribers for the queue

In the case of Publish/Subscribe, you have to create subscribers to dequeue messages. In our example, we'll register 2 subscribers called SUBSCRIBER1 and SUBSCRIBER2 for that purpose:


connect aqdemo/aqdemo

declare
 subscriber  sys.aq$_agent;
begin
 subscriber := sys.aq$_agent(
                'subscriber1',
                'myqueue',
                null);
 dbms_aqadm.add_subscriber(
                queue_name    => 'myqueue',
                subscriber    => subscriber,
                delivery_mode => dbms_aqadm.buffered);
end;
/

declare
 subscriber  sys.aq$_agent;
begin
 subscriber := sys.aq$_agent(
               'subscriber2',
               'myqueue',
               null);
 dbms_aqadm.add_subscriber(
               queue_name    => 'myqueue',
               subscriber    => subscriber,
               delivery_mode => dbms_aqadm.buffered);
end;
/


You can check queue subscribers by querying the AQ$MYQUEUE_TABLE_S view (or AQ$_MYQUEUE_TABLE_S table) like below:

col name    format a12
col address format a40

select name, address
from aq$myqueue_table_s
where queue='MYQUEUE';

NAME         ADDRESS
------------ -------------------------
SUBSCRIBER1
SUBSCRIBER2
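
As a cross-check, the USER_QUEUE_SUBSCRIBERS dictionary view should list the same subscribers along with the delivery mode they were registered with. A minimal sketch, assuming the view and its DELIVERY_MODE column are available in your release:

col consumer_name format a15
col delivery_mode format a25

-- one row per subscriber of MYQUEUE owned by the connected user
select consumer_name, delivery_mode
from user_queue_subscribers
where queue_name='MYQUEUE';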


4. Create a procedure to enqueue buffered messages

The procedure below can be used to enqueue buffered messages. The options and properties used to enqueue the messages are the following:
· The message is a RAW(10), passed to the procedure as a hexadecimal string
· The message recipients are SUBSCRIBER1 and SUBSCRIBER2
· The message is a buffered message (dbms_aq.buffered)


connect aqdemo/aqdemo

create or replace procedure demo_enqueue(p_hexamsg varchar2) is
 enqueue_options     DBMS_AQ.enqueue_options_t;
 message_properties  DBMS_AQ.message_properties_t;
 recipients          DBMS_AQ.aq$_recipient_list_t;
 message_handle      RAW(16);
 message             RAW(10);
begin
 message := hextoraw(p_hexamsg);
 recipients(1) := sys.aq$_agent('SUBSCRIBER1', 'MYQUEUE', NULL);
 recipients(2) := sys.aq$_agent('SUBSCRIBER2', 'MYQUEUE', NULL);
 message_properties.recipient_list := recipients;
 enqueue_options.visibility := dbms_aq.immediate;
 enqueue_options.delivery_mode := dbms_aq.buffered;
 dbms_aq.enqueue(
     queue_name         => 'MYQUEUE',
     enqueue_options    => enqueue_options,
     message_properties => message_properties,
     payload            => message,
     msgid              => message_handle);
 commit;
end;
/

Once the procedure is created, you can easily enqueue messages from the AQDEMO user:

exec demo_enqueue('00000000000000000001');

5. Create a procedure to dequeue buffered messages

The procedure below dequeues the buffered messages and displays them with DBMS_OUTPUT.PUT_LINE; it takes the subscriber name as a parameter:

connect aqdemo/aqdemo

set serveroutput on

create or replace procedure demo_dequeue(p_subscriber varchar2)
is
 dequeue_options       dbms_aq.dequeue_options_t;
 message_properties    dbms_aq.message_properties_t;
 message_handle        RAW(16);
 message               RAW(10);
 no_messages           exception;
 pragma exception_init(no_messages, -25228);
begin
 dequeue_options.wait          := dbms_aq.no_wait;
 dequeue_options.consumer_name := p_subscriber;
 dequeue_options.navigation    := dbms_aq.first_message;
 dequeue_options.visibility    := dbms_aq.immediate;
 dequeue_options.delivery_mode := dbms_aq.buffered;
 -- dequeue until ORA-25228 (no more messages) ends the loop
 loop
    dbms_aq.dequeue(
        queue_name         => 'myqueue',
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle);
    -- the payload is a RAW; rawtohex renders it as a hexadecimal string
    dbms_output.put_line('Message: '|| rawtohex(message) );
    dequeue_options.navigation := dbms_aq.next_message;
 end loop;
exception
 when no_messages then
    dbms_output.put_line('No more messages');
    commit;
end;
/

You can dequeue messages with demo_dequeue like below:

set serveroutput on
exec demo_dequeue('SUBSCRIBER1');

Message: 00000000000000000001
No more messages
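
If you would rather block for a short while than return immediately, one variation is to set the wait option to a number of seconds instead of dbms_aq.no_wait. The anonymous block below is a sketch only; the 5-second timeout and the SUBSCRIBER1 consumer are arbitrary choices for illustration. ORA-25228 is then raised when the timeout expires without a message:

set serveroutput on

declare
 dequeue_options     dbms_aq.dequeue_options_t;
 message_properties  dbms_aq.message_properties_t;
 message_handle      RAW(16);
 message             RAW(10);
 no_messages         exception;
 pragma exception_init(no_messages, -25228);
begin
 -- wait up to 5 seconds for a message (assumed value, adjust as needed)
 dequeue_options.wait          := 5;
 dequeue_options.consumer_name := 'SUBSCRIBER1';
 dequeue_options.visibility    := dbms_aq.immediate;
 dequeue_options.delivery_mode := dbms_aq.buffered;
 dbms_aq.dequeue(
     queue_name         => 'myqueue',
     dequeue_options    => dequeue_options,
     message_properties => message_properties,
     payload            => message,
     msgid              => message_handle);
 dbms_output.put_line('Message: '|| rawtohex(message));
exception
 when no_messages then
    dbms_output.put_line('No message arrived within 5 seconds');
end;
/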

6. Enqueue & Dequeue messages

Here is a sequence of enqueue and dequeue of buffered messages:

exec demo_enqueue('00000000000000000002');
exec demo_enqueue('00000000000000000003');

set serveroutput on
exec demo_dequeue('SUBSCRIBER1');

Message: 00000000000000000002
Message: 00000000000000000003
No more messages

exec demo_enqueue('00000000000000000004');

exec demo_dequeue('SUBSCRIBER1');
Message: 00000000000000000004
No more messages

set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time    format a18
col total_dequeued_msg format 999,999

select queue_id,
     subscriber_name,
     subscriber_type,
     startup_time,
     total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
 and queue_name='MYQUEUE';

QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME       TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
 73821 SUBSCRIBER1     SUBSCRIBER      03-JAN-09 17:55:22                  0
 73821 SUBSCRIBER2     SUBSCRIBER      03-JAN-09 17:55:22                  0

7. Monitor queues and subscribers

You can query the V$BUFFERED_QUEUES and V$BUFFERED_SUBSCRIBERS fixed views to check the number of messages in the queues and how many messages have been dequeued:

               
connect / as sysdba

alter session set nls_date_format='DD-MON-YY HH24:MI:SS';

col queue_id     format 999999
col startup_time format a18
col num_msgs     format 999,999
col spill_msgs   format 999,999
set lines 100

select   queue_id
    , startup_time
    , sysdate
    , num_msgs
    , spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
 and queue_name='MYQUEUE';

QUEUE_ID STARTUP_TIME       SYSDATE            NUM_MSGS SPILL_MSGS
-------- ------------------ ------------------ -------- ----------
 74026 04-JAN-09 00:13:12 04-JAN-09 00:17:17        4          0

set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time    format a18
col total_dequeued_msg format 999,999

select queue_id
   , subscriber_name
   , subscriber_type
   , startup_time
   , total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
 and queue_name='MYQUEUE';

QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME       TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
 74026 SUBSCRIBER1     SUBSCRIBER      04-JAN-09 00:13:12                  4
 74026 SUBSCRIBER2     SUBSCRIBER      04-JAN-09 00:13:12                  0

You can dequeue the messages from the other subscriber:
exec demo_dequeue('SUBSCRIBER2');

Message: 00000000000000000001
Message: 00000000000000000002
Message: 00000000000000000003
Message: 00000000000000000004
No more messages

8. Illustrate how you can lose messages by crashing an instance
You can easily show that buffered messages can be lost. Here is a simple scenario to illustrate that point with a bounce of the Oracle instance:
connect aqdemo/aqdemo
exec demo_enqueue('00000000000000000005');

connect / as sysdba
startup force

connect aqdemo/aqdemo
exec demo_enqueue('00000000000000000006');

set serveroutput on
exec demo_dequeue('SUBSCRIBER1');

Message: 00000000000000000006
No more messages

exec demo_dequeue('SUBSCRIBER2');
Message: 00000000000000000006
No more messages

As you can see above, the message 00000000000000000005 has never been dequeued and is lost. The same scenario illustrates, as expected, that the queue and subscriber usage statistics are reset when an instance is bounced:
connect / as sysdba

alter session set nls_date_format='DD-MON-YY HH24:MI:SS';

col queue_id     format 999999
col startup_time format a18
col num_msgs     format 999,999
col spill_msgs   format 999,999
set lines 100

select queue_id
   , startup_time
   , sysdate
   , num_msgs
   , spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
 and queue_name='MYQUEUE';

QUEUE_ID STARTUP_TIME       SYSDATE            NUM_MSGS SPILL_MSGS
-------- ------------------ ------------------ -------- ----------
 74026 04-JAN-09 00:30:03 04-JAN-09 00:33:55        0          0

set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time    format a18
col total_dequeued_msg format 999,999

select queue_id
   , subscriber_name
   , subscriber_type
   , startup_time
   , total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
 and queue_name='MYQUEUE';

QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME       TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
 74026 SUBSCRIBER1     SUBSCRIBER      04-JAN-09 00:30:03                  1
 74026 SUBSCRIBER2     SUBSCRIBER      04-JAN-09 00:30:03                  1

9. Illustrate Flow Control and Spilled Messages
One of the benefits of using buffered messages is that they remain in memory and are not stored on disk. A direct consequence is that buffered messages are not logged and, as we've seen previously, can be lost. The devil being in the details, the memory allocated to the Streams Pool, which stores the messages, is not unlimited and competes with other memory pools. To prevent that memory from growing without limit, Oracle offers several mechanisms. One is Publisher Flow Control, which prevents messages from being enqueued when they are not dequeued fast enough. Another is the ability for messages to spill to disk in the queue table. Here is a simple test case that shows these features:
connect aqdemo/aqdemo

begin
 for i in 7..100000 loop
    demo_enqueue(lpad(to_char(i),20,'0'));
 end loop;
end;
/

begin
*
ERROR at line 1:
ORA-25307: Enqueue rate too high, flow control enabled
ORA-06512: at "SYS.DBMS_AQ", line 6
ORA-06512: at "SYS.DBMS_AQ", line 216
ORA-06512: at "AQDEMO.DEMO_ENQUEUE", line 14
ORA-06512: at line 3
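
A publisher does not have to fail with an unhandled error when this happens. The block below is a sketch of one possible way to trap ORA-25307 and stop enqueuing cleanly; the exception name flow_control is made up for this example, and a real publisher would more likely pause and retry than simply exit:

set serveroutput on

declare
 flow_control exception;
 pragma exception_init(flow_control, -25307);
begin
 for i in 7..100000 loop
    begin
       demo_enqueue(lpad(to_char(i),20,'0'));
    exception
       when flow_control then
          -- the queue refuses new messages until subscribers catch up
          dbms_output.put_line('Flow control reached at message '||i);
          exit;
    end;
 end loop;
end;
/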

If you monitor the content of the queues right after you've enqueued them, you'll see that messages are in the memory associated with the queue:
connect / as sysdba

select queue_id
   , startup_time
   , sysdate
   , num_msgs
   , spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
 and queue_name='MYQUEUE';

QUEUE_ID STARTUP_TIME       SYSDATE   NUM_MSGS SPILL_MSGS
-------- ------------------ --------- -------- ----------
 74026 04-JAN-09          04-JAN-09    4,021          0

select MSG_STATE
   , count(*)
from aqdemo.aq$myqueue_table
where queue='MYQUEUE'
group by MSG_STATE;

MSG_STATE COUNT(*)
--------- --------
IN MEMORY     8042

col bytes format 999,999,999
col name  format a30
set pages 1000

select name, bytes
from v$sgastat
where pool='streams pool'
order by name;

NAME                              BYTES
-------------------------- ------------
Sender info                       8,484
deqtree_kgqmctx                      80
fixed allocation callback           260
free memory                   5,702,748
image handles                   337,452
kgqbt_alloc_block                99,456
kgqmdm_fl_2                     241,288
kgqmsub                             144
kodpaih3 image                8,130,416
kwqbcqini:spilledovermsgs         1,968
kwqbdaspl:spilledovermsgs       172,896
kwqbsinfy:bms                   305,368
kwqbsinfy:bqg                       808
kwqbsinfy:mpr                 1,928,180
kwqbsinfy:sta                       208
msgtree_kgqmctx                      80
name_kgqmsub                         32
recov_kgqbtctx                    8,192
recov_kgqmctx                       616
recov_kgqmsub                       336
spilled:kwqbl                     2,316
spilled:kwqbm                     8,624
substree_kgqmctx                     80
time manager index                   80
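
To relate these allocations to the overall size of the Streams Pool, you could also look at V$SGA_DYNAMIC_COMPONENTS. A minimal sketch, with column formats chosen only for readability:

col component    format a20
col current_size format 999,999,999
col min_size     format 999,999,999
col max_size     format 999,999,999

-- sizes are reported in bytes
select component, current_size, min_size, max_size
from v$sga_dynamic_components
where component='streams pool';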

And you'll also see the publisher is "IN FLOW CONTROL":
select publisher_state
from v$buffered_publishers
where queue_schema='AQDEMO'
 and queue_name='MYQUEUE';

PUBLISHER_STATE
-----------------------------------------------------------
IN FLOW CONTROL: INSUFFICIENT MEMORY AND UNBROWSED MESSAGES

If you wait for a few minutes, you'll see messages start to spill to disk:
select queue_id
   , startup_time
   , sysdate
   , num_msgs
   , spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
 and queue_name='MYQUEUE';

QUEUE_ID STARTUP_TIME       SYSDATE   NUM_MSGS SPILL_MSGS
-------- ------------------ --------- -------- ----------
 74026 04-JAN-09          04-JAN-09    4,021      4,021

select MSG_STATE, count(*)
from aqdemo.aq$myqueue_table
where queue='MYQUEUE'
group by MSG_STATE;

MSG_STATE COUNT(*)
--------- --------
SPILLED      8042

You can dequeue the messages as you've done already:
connect aqdemo/aqdemo

begin
 for i in 1..5000 loop
    demo_dequeue('SUBSCRIBER1');
    demo_dequeue('SUBSCRIBER2');
 end loop;
end;
/

10. Clean up the environment
connect / as sysdba

drop procedure aqdemo.demo_enqueue;
drop procedure aqdemo.demo_dequeue;

declare
 subscriber       sys.aq$_agent;
begin
 subscriber := sys.aq$_agent('subscriber1',
                             'aqdemo.myqueue',
                             null);
 dbms_aqadm.remove_subscriber(
       queue_name => 'aqdemo.myqueue',
       subscriber =>  subscriber);
 subscriber := sys.aq$_agent('subscriber2',
                             'aqdemo.myqueue',
                              null);
 dbms_aqadm.remove_subscriber(
       queue_name => 'aqdemo.myqueue',
       subscriber =>  subscriber);
end;
/

begin
 dbms_aqadm.stop_queue(
    queue_name => 'aqdemo.myqueue',
    enqueue    => true,
    dequeue    => true);
end;
/

begin
 dbms_aqadm.drop_queue(
    queue_name => 'aqdemo.myqueue');
end;
/

begin
 dbms_aqadm.drop_queue_table(
    queue_table =>'aqdemo.myqueue_table');
end;
/

drop user aqdemo;

drop tablespace aq
   including contents and datafiles;

Related References:
Master Note for Troubleshooting Advanced Queuing and Oracle Streams Propagation Issues [ID 233099.1]
DBA Views:
DBA_QUEUE_SCHEDULES     -  Propagation Schedules
DBA_QUEUES              -  Queues in Database
DBA_QUEUE_TABLES        -  Queue Tables in Database
DBA_QUEUE_SUBSCRIBERS   -  Queue Subscribers in Database
QUEUE_PRIVILEGES        -  Queues for Which User Has Queue Privilege
AQ$Queue_Table_Name     -  Messages in Queue Table
AQ$Queue_Table_Name_S   -  Queue Subscribers
AQ$Queue_Table_Name_R   -  Queue Subscribers and Their Rules
V$BUFFERED_QUEUES       -  Buffered Queues in the Instance
V$BUFFERED_SUBSCRIBERS  -  Subscribers for All Buffered Queues in the Instance

For more information on AQ views:

http://download.oracle.com/docs/cd/B28359_01/server.111/b28420/aq_views.htm