1. Create a user, tablespace and grant the privileges
First of all, we'll create a user, named AQDEMO, to own and use the message queues. In real life, you'll probably separate the queue owner from message publishers and consumers. However, to keep that demo short, we'll use AQDEMO to publish and consume messages too. It's worth to notice that a tablespace that contains queue tables doesn't support Tablespace Point In Time Recovery (TSPITR). For that reason and to ease the management of our AQ environment, we'll store the queues in a separate tablespace called AQ. Lastly if you want to share queues across multiple databases, you should use a distributed environment and set the global_names parameter of all the databases to true.
create tablespace aq datafile '/u01/app/oracle/oradata/BLACK/aq01.dbf' size 25M autoextend on maxsize 266M;
create user aqdemo identified by aqdemo default tablespace aq temporary tablespace temp;
grant connect, resource to aqdemo;
grant execute on dbms_aqadm to aqdemo;
grant execute on dbms_aq to aqdemo;
2. Create a queue table, a multiple consumer queue and start it
To perform these steps, we'll connect as AQDEMO and will use:
dbms_aqadm.create_queue_table - to create the queue table
dbms_aqadm.create_queue - to create the queue
dbms_aqadm.start_queue - to start the queue
To keep it simple, we'll won't use a user defined type for the message but just a RAW:
connect aqdemo/aqdemo
begin
dbms_aqadm.create_queue_table(
queue_table =>'myqueue_table',
queue_payload_type =>'RAW',
storage_clause =>'tablespace aq',
multiple_consumers => true,
comment =>'AQ Demo',
compatible=>'11.1');
end;
/
begin
dbms_aqadm.create_queue(
queue_name => 'myqueue',
queue_table => 'myqueue_table');
end;
/
begin
dbms_aqadm.start_queue(
queue_name => 'myqueue',
enqueue => true,
dequeue => true);
end;
/
Note:
If you plan to create a queue in another schema, you can prefix the queue table or queue name with the schema name like AQDEMO.MYQUEUE_TABLE.
If you want to mix cases, you can surround the name with double-quotes, like "AQDEMO"."myqueue_table" but that's probably easier to use uppercase.
Once the queue table created, several tables and views are used to store and query the associated data; here is an example with AQ$[QUEUE_TABLE]:
desc aq$myqueue_table
3. Create subscribers for the queue
In the case of Publish/Subscribe, you have to create subscribers to dequeue messages. In our example, we'll register 2 subscribers called SUBSCRIBER1 and SUBSCRIBER2 for that purpose:
connect aqdemo/aqdemo
declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent(
'subscriber1',
'myqueue',
null);
dbms_aqadm.add_subscriber(
queue_name => 'myqueue',
subscriber => subscriber,
delivery_mode => dbms_aqadm.buffered);
end;
/
declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent(
'subscriber2',
'myqueue',
null);
dbms_aqadm.add_subscriber(
queue_name => 'myqueue',
subscriber => subscriber,
delivery_mode => dbms_aqadm.buffered);
end;
/
You can check queue subscribers by querying the AQ$MYQUEUE_TABLE_S view (or AQ$_MYQUEUE_TABLE_S table) like below:
col name format a12
col address format a40
select name, address
from aq$myqueue_table_s
where queue='MYQUEUE';
NAME ADDRESS
------------ -------------------------
SUBSCRIBER1
SUBSCRIBER2
4. Create a procedure to enqueue buffered messages
The procedure below can be used to enqueue buffered messages; The options and properties used to enqueue the messages are the following:
· The message is a RAW(10) passed from the procedure parameter as an hexadecimal string
· The list of message recipients are SUBSCRIBER1 and SUBSCRIBER2
· The message is a buffered message (dbms_aq.buffered)
connect aqdemo/aqdemo
create or replace procedure demo_enqueue(p_hexamsg varchar2) is
enqueue_options DBMS_AQ.enqueue_options_t;
message_properties DBMS_AQ.message_properties_t;
recipients DBMS_AQ.aq$_recipient_list_t;
message_handle RAW(16);
message RAW(10);
begin
message := hextoraw(p_hexamsg);
recipients(1) := sys.aq$_agent('SUBSCRIBER1', 'MYQUEUE', NULL);
recipients(2) := sys.aq$_agent('SUBSCRIBER2', 'MYQUEUE', NULL);
message_properties.recipient_list := recipients;
enqueue_options.visibility := dbms_aq.immediate;
enqueue_options.delivery_mode := dbms_aq.buffered;
dbms_aq.enqueue(
queue_name => 'MYQUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
commit;
end;
/
Once the procedure created, you can easily enqueue messages from the AQDEMO user:
SQL> exec demo_enqueue('00000000000000000001');
5. Create a procedure to dequeue buffered messages
The procedure below dequeues the buffered messages and display it with DBMS_OUTPUT.PUT_LINE; it takes the subscriber name as a parameter
connect aqdemo/aqdemo
set serveroutput on
create or replace procedure demo_dequeue(p_subscriber varchar2)
is
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message RAW(10);
no_messages exception;
pragma exception_init(no_messages, -25228);
begin
dequeue_options.wait := dbms_aq.no_wait;
dequeue_options.consumer_name := p_subscriber;
dequeue_options.navigation := dbms_aq.first_message;
dequeue_options.visibility := dbms_aq.immediate;
dequeue_options.delivery_mode := dbms_aq.buffered;
loop
begin
dbms_aq.dequeue(
queue_name => 'myqueue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
dbms_output.put_line('Message: '|| hextoraw(message) );
dequeue_options.navigation := dbms_aq.next_message;
end;
end loop;
exception
when no_messages then
dbms_output.put_line('No more messages');
commit;
end;
/
You can dequeue messages with demo_dequeue like below:
set serveroutput on
exec demo_dequeue('SUBSCRIBER1');
Message: 00000000000000000001
No more messages
6. Enqueue & Dequeue messages
Here is a sequence of enqueue and dequeue of buffered messages:
exec demo_enqueue('00000000000000000002');
exec demo_enqueue('00000000000000000003');
set serveroutput on
exec demo_dequeue('SUBSCRIBER1');
Message: 00000000000000000002
Message: 00000000000000000003
No more messages
exec demo_enqueue('00000000000000000004');
exec demo_dequeue('SUBSCRIBER1');
Message: 00000000000000000004
No more messages
set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time format a18
col total_dequeued_msg format 999,999
select queue_id,
subscriber_name,
subscriber_type,
startup_time,
total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
73821 SUBSCRIBER1 SUBSCRIBER 03-JAN-09 17:55:22 0
73821 SUBSCRIBER2 SUBSCRIBER 03-JAN-09 17:55:22 0
You can query the V$BUFFERED_QUEUE and V$BUFFERED_SUBSCRIBERS fixed views to check the number of messages in the queues and how many messages have been dequeued:
connect / as sysdba
alter session set nls_date_format='DD-MON-YY HH24:MI:SS';
col queue_id format 999999
col startup_time format a18
col num_msgs format 999,999
col spill_msgs format 999,999
set lines 100
select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ ------------------ -------- ----------
74026 04-JAN-09 00:13:12 04-JAN-09 00:17:17 4 0
set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time format a18
col total_dequeued_msg format 999,999
select queue_id
, subscriber_name
, subscriber_type
, startup_time
, total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
74026 SUBSCRIBER1 SUBSCRIBER 04-JAN-09 00:13:12 4
74026 SUBSCRIBER2 SUBSCRIBER 04-JAN-09 00:13:12 0
You can dequeue the message from the other subscriber:
exec demo_dequeue('SUBSCRIBER2');
Message: 00000000000000000001
Message: 00000000000000000002
Message: 00000000000000000003
Message: 00000000000000000004
No more messages
8. Illustrate how you can loose messages by crashing an instance
You can easily show that a buffered messages can be lost. Here is a simple scenario to illustrate that point with a bounce of the Oracle instance:
connect aqdemo/aqdemo
exec demo_enqueue('00000000000000000005');
connect / as sysdba
startup force
connect aqdemo/aqdemo
exec demo_enqueue('00000000000000000006');
set serveroutput on
exec demo_dequeue('SUBSCRIBER1');
Message: 00000000000000000006
No more messages
exec demo_dequeue('SUBSCRIBER2')
Message: 00000000000000000006
No more messages
As you can see above, the message 00000000000000000005 has never been dequeued but is lost; The same scenario illustrates, as expected, that statistics of queue and subscribers usage are reset when an instance is bounced:
connect / as sysdba
alter session set nls_date_format='DD-MON-YY HH24:MI:SS';
col queue_id format 999999
col startup_time format a18
col num_msgs format 999,999
col spill_msgs format 999,999
set lines 100
select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ ------------------ -------- ----------
74026 04-JAN-09 00:30:03 04-JAN-09 00:33:55 0 0
set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time format a18
col total_dequeued_msg format 999,999
select queue_id
, subscriber_name
, subscriber_type
, startup_time
, total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
74026 SUBSCRIBER1 SUBSCRIBER 04-JAN-09 00:30:03 1
74026 SUBSCRIBER2 SUBSCRIBER 04-JAN-09 00:30:03 1
9. Illustrate Flow Control and Spilled Messages
One of the benefit of using buffered messages is that they remain in memory and are not stored on disks. A direct consequence of that is that buffered messages are not logged and as we've seen previously can be lost. The devil being in the details, the memory allocated for the Streams Pool, that store the messages, is not unlimited and compete with other memory pools. To prevent that memory to increase without limit Oracle offers several mechanisms; One is Publisher Flow Control that prevents messages from being enqueued when the messages are not dequeued fast enough. Another one is the ability for messages to spill on disks in the queue table. Here is a simple test case shows these features :
connect aqdemo/aqdemo
begin
for i in 7..100000 loop
demo_enqueue(lpad(to_char(i),20,'0'));
end loop;
end;
/
begin
*
ERROR at line 1:
ORA-25307: Enqueue rate too high, flow control enabled
ORA-06512: at "SYS.DBMS_AQ", line 6
ORA-06512: at "SYS.DBMS_AQ", line 216
ORA-06512: at "AQDEMO.DEMO_ENQUEUE", line 14
ORA-06512: at line 3
If you monitor the content of the queues right after you've enqueued them, you'll see that messages are in the memory associated with the queue:
connect / as sysdba
select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ --------- -------- ----------
74026 04-JAN-09 04-JAN-09 4,021 0
select MSG_STATE
, count(*)
from aqdemo.aq$myqueue_table
where queue='MYQUEUE'
group by MSG_STATE;
MSG_STATE COUNT(*)
--------- --------
IN MEMORY 8042
col bytes format 999,999,999
col name format a30
set pages 1000
select name, bytes
from v$sgastat
where pool='streams pool'
order by name;
NAME BYTES
-------------------------- ------------
Sender info 8,484
deqtree_kgqmctx 80
fixed allocation callback 260
free memory 5,702,748
image handles 337,452
kgqbt_alloc_block 99,456
kgqmdm_fl_2 241,288
kgqmsub 144
kodpaih3 image 8,130,416
kwqbcqini:spilledovermsgs 1,968
kwqbdaspl:spilledovermsgs 172,896
kwqbsinfy:bms 305,368
kwqbsinfy:bqg 808
kwqbsinfy:mpr 1,928,180
kwqbsinfy:sta 208
msgtree_kgqmctx 80
name_kgqmsub 32
recov_kgqbtctx 8,192
recov_kgqmctx 616
recov_kgqmsub 336
spilled:kwqbl 2,316
spilled:kwqbm 8,624
substree_kgqmctx 80
time manager index 80
And you'll also see the publisher is "IN FLOW CONTROL":
select publisher_state
from v$buffered_publishers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
PUBLISHER_STATE
-----------------------------------------------------------
IN FLOW CONTROL: INSUFFICIENT MEMORY AND UNBROWSED MESSAGES
If you wait for a few minutes, you'll see messages start to spill on disks:
select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ --------- -------- ----------
74026 04-JAN-09 04-JAN-09 4,021 4,021
select MSG_STATE, count(*)
from aqdemo.aq$myqueue_table
where queue='MYQUEUE'
group by MSG_STATE;
MSG_STATE COUNT(*)
--------- --------
SPILLED 8042
You can dequeue the messages as you've done already
connect aqdemo/aqdemo
begin
for i in 1..5000 loop
demo_dequeue('SUBSCRIBER1');
demo_dequeue('SUBSCRIBER2');
end loop;
end;
/
10. Clean up the environment
connect / as sysdba
drop procedure aqdemo.demo_enqueue;
drop procedure aqdemo.demo_dequeue;
declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent('subscriber1',
'aqdemo.myqueue',
null);
dbms_aqadm.remove_subscriber(
queue_name => 'aqdemo.myqueue',
subscriber => subscriber);
subscriber := sys.aq$_agent('subscriber2',
'aqdemo.myqueue',
null);
dbms_aqadm.remove_subscriber(
queue_name => 'aqdemo.myqueue',
subscriber => subscriber);
end;
/
begin
dbms_aqadm.stop_queue(
queue_name => 'aqdemo.myqueue',
enqueue => true,
dequeue => true);
end;
/
begin
dbms_aqadm.drop_queue(
queue_name => 'aqdemo.myqueue');
end;
/
begin
dbms_aqadm.drop_queue_table(
queue_table =>'aqdemo.myqueue_table');
end;
/
drop user aqdemo;
drop tablespace aq
including contents and datafiles;
Related References:
Master Note for Troubleshooting Advanced Queuing and Oracle Streams Propagation Issues [ID 233099.1]
DBA Views:
DBA_QUEUE_SCHEDULES - Propagation Schedules
DBA_QUEUES - Queues in Database
DBA_QUEUE_TABLES - Queue Tables in Database
DBA_QUEUE_SUBSCRIBERS - Queue Subscribers in Database
QUEUE_PRIVILEGES - Queues for Which User Has Queue Privilege
AQ$Queue_Table_Name - Messages in Queue Table
AQ$Queue_Table_Name_S - Queue Subscribers
AQ$Queue_Table_Name_R - Queue Subscribers and Their Rules
V$BUFFERED_QUEUES - Buffered Queues in the Instance
V$BUFFERED_SUBSCRIBERS - Subscribers for All Buffered Queues in the Instance
For more information on AQ views
http://download.oracle.com/docs/cd/B28359_01/server.111/b28420/aq_views.htm