DBMS_PIPE Connecting the Sessions

Mustafa, 2023-08-18

Hi,

It has been a long time since I wrote something. It is summer and I was on a long vacation. Well, I mostly worked of course, but at least I got away a little bit.

In this post I would like to talk about PIPEs in Oracle Database. They are a way to communicate between sessions. As you know, in Oracle Database you cannot read another session's variable values. Let's say a session made some calculations and generated a result. To inform other sessions about this data, the vast majority of systems use database tables, and that is the correct way to do it because all data will be safe via redo logs. What if we need more speed and losing that data is not important? That is where PIPEs come in.

PIPEs allow sessions to communicate with each other without any transactional operation, which means no insert, no select, no redo and, as a result, more speed. A PIPE is simply a memory area (a variable, you may say) that you put some data into so that some other session can read that data and do some stuff. They work as FIFO (First In First Out): you put some messages into the queue and some other session, or even the same session, can get those messages in order.

How does it work:

- You prepare a message (a message may contain one or more items of different or same data types). During the preparation, your data is written into a small buffer in your session.
- When you send the message, the data you prepared is copied into the pipe memory area and your session buffer is purged.
- Some other session (or the same session) receives the message from the pipe memory area into its own session buffer, and the message is purged from the pipe.
- The receiver then unpacks the message, gets the data into variables and does some stuff.

Basic components of Pipes

Message: A message is a collection of message items. By "collection" I don't mean PL/SQL collections.
Your messages can contain different types and numbers of data. For example, a message can contain user information such as name, last name, login date, user id etc. You can put as many different "message items" in your messages as you need. You can think of a message as a "PL/SQL record".

Message Item: Every single entity in your message. You can put as many message items in a message as you want, or as many as your session buffer and pipe size can hold. When you receive a message, it may contain more than one item. For example, a received message might contain a stock name and a time, so you can send back the price of that stock at that time.

Session Buffer: Every message item is written into a buffer in your session. When you send the message, all items in the buffer are pushed into the PIPE and the session buffer is purged. If you don't send the message, the message items stay there as long as the session exists or until you send another message. The session buffer size is fixed at 4096 bytes (when I checked the size in an autonomous database, I saw that it is set to 32767, so autonomous db is always better 🙂).

PIPE Size: This is the size of the memory area of the pipe. If you keep sending messages to a pipe but do not receive them, eventually the memory area will be full and your "send message" call will wait until some space is available. You can set the size of the pipe while creating it. The default is 8192 bytes (and again, in the cloud autonomous database the default is 65536 bytes).
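To see the session buffer limit in practice, here is a minimal sketch that keeps adding items to the session buffer until Oracle refuses. It uses DBMS_PIPE.PACK_MESSAGE (the procedure that adds one item to the buffer) and DBMS_PIPE.RESET_BUFFER (which empties it); no pipe is needed because nothing is sent. The 500-character item size and the safety stop at 1000 items are my own arbitrary choices for illustration, and the number of items that fit will depend on your buffer size and character set, so I don't show an expected output.

```plsql
declare
  -- ORA-06558: buffer in dbms_pipe package is full
  e_buffer_full exception;
  pragma exception_init(e_buffer_full, -6558);
  v_items pls_integer := 0;
begin
  dbms_pipe.reset_buffer;                          -- start with an empty session buffer
  loop
    dbms_pipe.pack_message(rpad('x', 500, 'x'));   -- one 500-character VARCHAR2 item
    v_items := v_items + 1;
    exit when v_items >= 1000;                     -- safety stop (assumption: never reached)
  end loop;
  dbms_output.put_line('packed ' || v_items || ' items without filling the buffer');
  dbms_pipe.reset_buffer;
exception
  when e_buffer_full then
    dbms_output.put_line('buffer full after ' || v_items || ' items of 500 characters');
    dbms_pipe.reset_buffer;                        -- leave no residue in the session
end;
/
```

Run it with server output enabled. On a database with a 4096-byte buffer the count should be small; on an autonomous database it should be noticeably larger.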
Let's create a pipe:

    declare
      v_status number;
    begin
      v_status := dbms_pipe.create_pipe('MY_PIPE', maxpipesize=>32768);
      if v_status != 0 then
        raise_application_error(-20000, 'PIPE create error');
      end if;
    end;
    /

create_pipe is a function, and if the return value is 0 then it is all set: our pipe is ready. This is explicit pipe creation. We can set the PIPE size via the maxpipesize parameter; this is the memory area in the SGA which will store the sent messages.

Send a message:

    declare
      v_status NUMBER;
    BEGIN
      DBMS_PIPE.PACK_MESSAGE('hello pipe');  --1
      v_status := DBMS_PIPE.SEND_MESSAGE('MY_PIPE', timeout=>10, maxpipesize=>32767);  --2
      IF v_status != 0 THEN  --3
        raise_application_error(-20099, 'Message Send Error');
      END IF;
    END;
    /

There are 3 important parts in this code, which I marked with comments.

The first one is PACK_MESSAGE. This adds the "message item" that I mentioned above. Every message can contain one or more items. Oracle PIPEs allow 5 different data types in your messages: VARCHAR2, NUMBER, DATE, RAW and ROWID. PACK_MESSAGE is an overloaded procedure and can handle the data types you send. In the example above, we add the varchar2 value "hello pipe" to our message. Every time you call PACK_MESSAGE, the data is put into the "session buffer" (which is limited to 4096 bytes on prem and 32767 on autonomous database). So, if you try to add more data than the session buffer can hold, you will get the "ORA-06558: buffer in dbms_pipe package is full. No more items allowed" error.
At that point you must either reset the buffer via the dbms_pipe.reset_buffer procedure or send the message (even if it is not complete from your perspective). Sending the message clears the buffer as well.

The second part is sending the message with the SEND_MESSAGE function. The PIPE name alone is enough to send a message. This function returns the status of the send:

- 0: the message is sent.
- 1: timeout. The PIPE might be full. In that case your send will wait for a while; the default is 1000 days, which is a little bit long. I set it to 10 seconds in my example above, so if the message cannot be sent within 10 seconds, the call stops waiting and returns status 1.
- 3: interrupted. You should never face this one.

There is a third parameter, which is the pipe size! Why is there a pipe size in send_message? Because you don't have to create pipes explicitly. They can be created implicitly too. When you run the SEND_MESSAGE function and the pipe does not exist, it is created implicitly (automatically), and at that time we can specify the size of that pipe. Remember, pipes are gone after a database restart. When the database is started, instead of checking for existing pipes and creating one if it is missing, you can simply call send_message in your application flow and the pipe will be created.

The third part is checking the result of the send_message function. If the result is not 0, your message was not sent, but that is not all! Since your message was not sent, you have residue data in your session buffer. If you send another message, it will carry that residue data too. As a rule of thumb, you should always call DBMS_PIPE.RESET_BUFFER before packing messages.
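Putting the three parts and the rule of thumb together, a send could be sketched like this. It is only an illustration under my own assumptions: the pipe name MY_PIPE and the 10 second timeout come from the example above, while the choice to discard the message on timeout (instead of retrying) is just one possible policy.

```plsql
declare
  v_status integer;
begin
  dbms_pipe.reset_buffer;                       -- never start from residue data
  dbms_pipe.pack_message('hello pipe');
  v_status := dbms_pipe.send_message('MY_PIPE', timeout => 10);

  case v_status
    when 0 then
      dbms_output.put_line('message sent');     -- buffer was cleared by the send
    when 1 then
      -- timeout: the pipe is probably full, the items are still in the session buffer
      dbms_pipe.reset_buffer;                   -- assumption: we drop the message
      dbms_output.put_line('timeout, message discarded');
    else
      -- 3 (interrupted) or anything unexpected
      dbms_pipe.reset_buffer;
      raise_application_error(-20099, 'send_message returned status ' || v_status);
  end case;
end;
/
```

Resetting the buffer in every failure branch means the next message sent from this session cannot pick up leftover items.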
Here is another example with some more data:

    declare
      v_status number;
    begin
      dbms_pipe.reset_buffer;
      dbms_pipe.pack_message('hello pipe');
      dbms_pipe.pack_message(123);
      dbms_pipe.pack_message(sysdate);
      v_status := dbms_pipe.send_message('MY_PIPE', timeout=>10, maxpipesize=>32767);
      if v_status != 0 then
        raise_application_error(-20099, 'Message Send Error');
      end if;
    end;
    /

This message has 3 items in it and they have different data types (they could be the same data type too, of course).

How to receive a message?

When messages are written into a PIPE, any session can receive them (unless the pipe is private; private pipes can be read and written only by sessions of the db user that created them). When a message is received, it is purged from the PIPE, so no other session can get it. The steps are the reverse of sending: first receive the message, then unpack it:

    DECLARE
      v_status NUMBER;
      v_data   VARCHAR2(100);
    BEGIN
      dbms_pipe.reset_buffer;
      v_status := dbms_pipe.receive_message('MY_PIPE', TIMEOUT => 100);
      IF v_status = 0 THEN
        dbms_pipe.unpack_message(v_data);
        dbms_output.put_line(v_data);
      ELSIF v_status = 1 THEN
        dbms_output.put_line('timeout! no message');
      END IF;
    END;
    /

    hello pipe

    PL/SQL procedure successfully completed.
Always reset the buffer first, so you don't mix up your message with some residue data, then call the RECEIVE_MESSAGE function. That will receive a message from the PIPE into your session buffer. As you see, we have a "timeout" parameter, which is 1000 days by default. If the PIPE has no message and you call receive_message, it will wait until a message arrives or the timeout expires. In my example I set it to 100 seconds. If the timeout occurs, receive_message returns 1, which means no message arrived in the defined time range. 0 means a message is received, so let's read it.

The message is now received and copied into our session buffer. Now we can unpack that message using UNPACK_MESSAGE. Like pack_message, unpack_message is an overloaded procedure. You provide a variable as the parameter and the data will be written into it. Whichever message item was packed first will be unpacked first (FIFO). My example returned "hello pipe" because that was the first message I sent.

Let's run the same code a second time:

    DECLARE
      v_status NUMBER;
      v_data   VARCHAR2(100);
    BEGIN
      dbms_pipe.reset_buffer;
      v_status := dbms_pipe.receive_message('MY_PIPE', TIMEOUT => 100);
      IF v_status = 0 THEN
        dbms_pipe.unpack_message(v_data);
        dbms_output.put_line(v_data);
      ELSIF v_status = 1 THEN
        dbms_output.put_line('timeout! no message');
      END IF;
    END;
    /

    hello pipe

    PL/SQL procedure successfully completed.

Same result? This "hello pipe" is the first item of the second message, the one with three items. So what happened to 123 and the sysdate value? Well, we received that whole message but we didn't unpack those two items, and they are gone!
If you run the code again, it will just wait for 100 seconds to get a new message. Not cool! We shouldn't skip any of the packed message items.

There is one more important thing here. We must unpack each message item into a variable of the same data type. Even if a data conversion is available, it will raise an error. If the message item is a number, it must be unpacked into a number variable.

    declare
      v_status number;
    begin
      dbms_pipe.reset_buffer;
      dbms_pipe.pack_message('hello pipe');
      dbms_pipe.pack_message(123);
      dbms_pipe.pack_message(sysdate);
      v_status := dbms_pipe.send_message('MY_PIPE', timeout=>10, maxpipesize=>32767);
      if v_status != 0 then
        raise_application_error(-20099, 'Message Send Error');
      end if;
    end;
    /

    DECLARE
      v_status NUMBER;
      v_data   VARCHAR2(100);
    BEGIN
      dbms_pipe.reset_buffer;
      v_status := dbms_pipe.receive_message('MY_PIPE', TIMEOUT => 10);
      IF v_status = 0 THEN
        dbms_pipe.unpack_message(v_data);
        dbms_pipe.unpack_message(v_data);
        dbms_output.put_line(v_data);
      ELSIF v_status = 1 THEN
        dbms_output.put_line('timeout! no message');
      END IF;
    END;
    /

    Error report -
    ORA-06559: wrong datatype requested, 9, actual datatype is 6
    ORA-06512: at "SYS.DBMS_PIPE", line 80
    ORA-06512: at line 11
    06559. 00000 - "wrong datatype requested, %s, actual datatype is %s"
    *Cause: The sender put different datatype on the pipe than that being requested (package 'dbms_pipe'). The numbers are: 6 - number, 9 - char, 12 - date.
    *Action: Check that the sender and receiver agree on the number and types of items placed on the pipe.
I sent the same message again and tried to unpack it twice into the v_data variable. Since the second message item is a NUMBER, it raises "ORA-06559: wrong datatype requested, 9, actual datatype is 6". Looking at that message, we learn that the data types have code numbers: 9 is for VARCHAR2 and 6 is for NUMBER.

What happens if we don't know how many message items exist and what data types they have?
A simple loop with a new function will help us:

    declare
      v_status number;
    begin
      dbms_pipe.reset_buffer;
      dbms_pipe.pack_message('hello pipe');
      dbms_pipe.pack_message(123);
      dbms_pipe.pack_message(sysdate);
      v_status := dbms_pipe.send_message('MY_PIPE', timeout=>10, maxpipesize=>32767);
      if v_status != 0 then
        raise_application_error(-20099, 'Message Send Error');
      end if;
    end;
    /

    declare
      v_receive_result NUMBER;

      -- unpacks every item of the message currently in the session buffer
      procedure ip_unpack_message is
        v_varchar2  varchar2(4000);
        v_date      date;
        v_number    number;
        v_rowid     rowid;
        v_raw       raw(2000);
        v_next_item number;
      begin
        v_next_item := dbms_pipe.next_item_type;
        while v_next_item > 0 loop            -- 0 means no more items
          if v_next_item = 6 then             -- NUMBER
            dbms_pipe.UNPACK_MESSAGE(v_number);
            v_varchar2 := 'Number: ' || to_char(v_number);
          elsif v_next_item = 9 then          -- VARCHAR2
            dbms_pipe.UNPACK_MESSAGE(v_varchar2);
            v_varchar2 := 'Varchar2: ' || v_varchar2;
          elsif v_next_item = 11 then         -- ROWID
            dbms_pipe.UNPACK_MESSAGE_ROWID(v_rowid);
            v_varchar2 := 'Rowid: ' || rowidtochar(v_rowid);
          elsif v_next_item = 12 then         -- DATE
            dbms_pipe.UNPACK_MESSAGE(v_date);
            v_varchar2 := 'Date: ' || to_char(v_date, 'YYYY-MM-DD HH24:MI:SS');
          elsif v_next_item = 23 then         -- RAW
            dbms_pipe.UNPACK_MESSAGE_RAW(v_raw);
            v_varchar2 := 'Raw: ' || Rawtohex(v_raw);
          else
            v_varchar2 := 'Unknown Data Type: ' || to_char(v_next_item);
          end if;

          dbms_output.put_line(v_varchar2);
          v_next_item := dbms_pipe.next_item_type;
        end loop;
      end ip_unpack_message;
    begin
      dbms_pipe.reset_buffer;
      v_receive_result := dbms_pipe.receive_message('MY_PIPE');
      if v_receive_result = 0 then
        ip_unpack_message;
      elsif v_receive_result = 1 then
        dbms_output.put_line('timeout! no new message');
      else
        raise_application_error(-20000, 'error while reading the message');
      end if;
    end;
    /

    Varchar2: hello pipe
    Number: 123
    Date: 2023-08-18 12:34:38

    PL/SQL procedure successfully completed.
I send the same message first and then receive it. This time I call the local procedure "ip_unpack_message".
That procedure calls the "NEXT_ITEM_TYPE" function in dbms_pipe and loops over every item in the message. next_item_type returns the data type of the next message item. If it returns 0, there are no more items in the message.

So if you have some kind of "daemon" or background worker process to do some stuff, you can simply put the code above into an infinite loop and run it in a scheduler job:

    ... suppressed
    begin
      loop
        dbms_pipe.reset_buffer;
        v_receive_result := dbms_pipe.receive_message('MY_PIPE');
        if v_receive_result = 0 then
          ip_unpack_message;
        elsif v_receive_result = 1 then
          dbms_output.put_line('timeout! no new message');
        else
          raise_application_error(-20000, 'error while reading the message');
        end if;
      end loop;
    end;
    /

This scheduled job will run forever, and whenever a session puts a message into the PIPE, it will receive and process it.

You can remove a pipe via the REMOVE_PIPE function and purge a pipe with the PURGE procedure.

Please remember, PIPE operations are not transactional, so they are not protected by redo logs. This means that if the instance crashes, all data in the PIPE is lost, but they are quite fast and responsive. They are implemented in C, not PL/SQL. Whatever you can do with pipes you can also implement in SQL / PL/SQL, but it will require more work and consume more resources. Let's say you want to implement a module that runs OS commands that sessions request (never do such stuff).
You can create a table called "COMMAND_QUEUE", create a procedure that reads this table and, if new rows are found, runs the requested commands, and finally create a job which calls this procedure every 2 seconds (or 10 seconds or 1 minute etc). The downside of this technique is that even if there are no new commands in the table, your job will run every 2 seconds, query the table and check the rows. When you implement this with PIPEs, the receiver waits until a new message arrives, and that waiting costs practically nothing. Processing is also nearly instantaneous: when a session needs a command to run, it writes it into the pipe and the receiver process receives it immediately and processes it.

As I mentioned in the post, Autonomous database has more functionality in DBMS_PIPE, like singleton pipes, shelf life etc. ATP is on version 19.20 now, but on prem 19.20 does not have this functionality. When I checked 23c developer edition, I could confirm that it has these features. So, can't wait to use 23c.

edit: I just checked the 23c documentation and the functionalities I mentioned are not listed. Maybe 23c developer edition has them but the actual 23c will not, or the documentation might not be updated.

There is other stuff too: every message item has an overhead (based on the message item data type, for example 3 bytes for numbers, 6 bytes for varchar2 if the db is using a unicode charset etc), there are message send / receive buffers, and so on. Maybe I can post about them and run some tests.

I hope this helps you somewhere.

Wish you all healthy, beautiful days.
Really cool. I've seen you put three different messages into the same pipe. Can session 101 read (unpack) the first message ('hello pipe'), another session, say 202, read the second message (123) and session 303 read the third message (sysdate)? Or would session 101 "erase" all three messages? Alternatively, can the sender send the three messages into 3 different named pipes, within the same pl/sql block (nested pl/sqls)?
Hello Hemant, sorry for my late reply.

For the first question, well, that is a good question. Only the received message is erased from the pipe. The other messages stay intact in the pipe and can be read by another receiver process. If the pipe has 1500 messages and you have 3 receiver processes (sessions), they will receive the messages one by one, but they might not all get the same number of messages. Some messages may take more time to process, depending on the business logic etc.

For your second question, you can send messages into as many different pipes as you wish. Here is some sample code:

    declare
      xx number;
    begin
      xx := dbms_pipe.create_pipe('MY_PIPE1', private=> true, maxpipesize => 8192*4);
      xx := dbms_pipe.create_pipe('MY_PIPE2', private=> true, maxpipesize => 8192*4);
      xx := dbms_pipe.create_pipe('MY_PIPE3', private=> true, maxpipesize => 8192*4);
    end;
    /

    declare
      v_status number;
    begin
      dbms_pipe.reset_buffer;
      for i in 1..3 loop
        dbms_pipe.pack_message('hello pipe' || to_char(i));
        v_status := dbms_pipe.send_message('MY_PIPE' || to_char(i), timeout=>10, maxpipesize=>32767);
        -- check every send, not only the last one
        if v_status != 0 then
          raise_application_error(-20099, 'Message Send Error');
        end if;
      end loop;
    end;
    /

    --receiver
    declare
      v_varchar2       varchar2(4000);
      v_receive_result number;
    begin
      for i in 1..3 loop
        dbms_pipe.reset_buffer;
        v_receive_result := dbms_pipe.receive_message('MY_PIPE' || to_char(i), 0);
        if v_receive_result = 0 then
          dbms_pipe.UNPACK_MESSAGE(v_varchar2);
          dbms_output.put_line(v_varchar2);
        end if;
      end loop;
    end;
    /