tech_memo/ORACLE/Oracle_Streams
-- Create the Streams administrator account. Must be run as SYSDBA
-- (from the shell the session is started with: sqlplus / as sysdba).
CONNECT / AS SYSDBA

-- DEFAULT must be followed by TABLESPACE <name>; the quota is granted on the
-- same USERS tablespace so strmadmin can create its tables and queue there.
CREATE USER strmadmin IDENTIFIED BY strmadmin
    DEFAULT TABLESPACE users
    QUOTA UNLIMITED ON users;

GRANT dba TO strmadmin;

-- Give strmadmin the Streams administration privileges directly
-- (grant_privileges => TRUE) rather than only generating a script.
BEGIN
    dbms_streams_auth.grant_admin_privilege(
        grantee          => 'strmadmin',
        grant_privileges => TRUE);
END;
/
-- Switch to the HR schema owner and let the Streams administrator
-- work with the source table.
CONNECT hr/hr

GRANT ALL ON hr.employees TO strmadmin;
-- Audit table: the same columns as listed for the source row in the memo,
-- plus three bookkeeping columns filled in by the DML handler
-- (upd_date, user_name, action).
CREATE TABLE hr.employee_audit (
    employee_id     NUMBER(6,0),
    first_name      VARCHAR2(20),
    last_name       VARCHAR2(25),
    email           VARCHAR2(25),
    phone_number    VARCHAR2(20),
    hire_date       DATE,
    job_id          VARCHAR2(10),
    salary          NUMBER(8,2),
    commission_pct  NUMBER(2,2),
    manager_id      NUMBER(6,0),
    department_id   NUMBER(4,0),
    upd_date        DATE,
    user_name       VARCHAR2(30),
    action          VARCHAR2(30)
);
-- The Streams administrator needs to write into the audit table.
GRANT ALL ON hr.employee_audit TO strmadmin;
-- Continue as the Streams administrator.
CONNECT strmadmin/strmadmin

-- Log table: one row per message written by the DML handler,
-- timestamped automatically on insert.
CREATE TABLE streams_monitor (
    date_and_time  TIMESTAMP(6) DEFAULT SYSTIMESTAMP,
    txt_msg        CLOB
);
-- Create the queue table and the queue used by both the capture
-- and the apply process configured below.
BEGIN
    DBMS_STREAMS_ADM.SET_UP_QUEUE(
        queue_table => 'strmadmin.streams_queue_table',
        queue_name  => 'strmadmin.streams_queue');
END;
/
-- Add capture rules for hr.employees to the capture process "capture_emp":
-- DML changes only (no DDL), enqueued into strmadmin.streams_queue.
BEGIN
    dbms_streams_adm.add_table_rules(
        table_name     => 'hr.employees',
        streams_type   => 'CAPTURE',
        streams_name   => 'capture_emp',
        queue_name     => 'strmadmin.streams_queue',
        include_dml    => TRUE,
        include_ddl    => FALSE,
        inclusion_rule => TRUE);
END;
/
inclusion_rule の値 | 適用ルール | 意味 |
TRUE | ポジティブ・ルールセット | ルール判定がTRUEの場合、メッセージをエンキューする |
FALSE | ネガティブ・ルールセット | ルール判定がTRUEの場合、メッセージを破棄する |
-- Ask the capture process to include the "username" extra attribute in each
-- captured LCR; the DML handler reads it back via get_extra_attribute.
BEGIN
    DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(
        capture_name   => 'capture_emp',
        attribute_name => 'username',
        include        => TRUE);
END;
/
-- Record the current system change number as the instantiation SCN of
-- hr.employees for source database "orcl".
BEGIN
    DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
        source_object_name   => 'hr.employees',
        source_database_name => 'orcl',
        instantiation_scn    => DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER());
END;
/
-- DML handler for hr.employees.
--
-- Receives a row LCR wrapped in ANYDATA, logs it to streams_monitor, then
-- rewrites it into an INSERT against EMPLOYEE_AUDIT carrying the affected row
-- plus UPD_DATE, USER_NAME and ACTION, and executes it.
--
-- Parameters:
--   in_any  the row LCR handed over by the apply process.
CREATE OR REPLACE PROCEDURE emp_dml_handler (in_any IN ANYDATA) IS
    lcr         SYS.LCR$_ROW_RECORD;
    rc          PLS_INTEGER;
    command     VARCHAR2(30);
    old_values  SYS.LCR$_ROW_LIST;
BEGIN
    -- Access the LCR
    rc := in_any.GETOBJECT(lcr);

    -- Remember the original operation before the LCR is rewritten.
    command := lcr.get_command_type();

    -- Trace: original command type followed by the LCR rendered as XML.
    INSERT INTO streams_monitor (txt_msg)
    VALUES (command || dbms_streams.convert_lcr_to_xml(in_any));

    -- Redirect the change: every operation becomes an INSERT into the audit table.
    lcr.set_command_type('INSERT');
    lcr.set_object_name('EMPLOYEE_AUDIT');

    -- An INSERT LCR carries only new values, so for DELETE/UPDATE the old row
    -- image is moved into the new-value list and the old-value list is cleared.
    IF command IN ('DELETE', 'UPDATE') THEN
        old_values := lcr.get_values('old');
        lcr.set_values('new', old_values);
        lcr.set_values('old', NULL);
    END IF;

    -- Bookkeeping columns. Names are given in upper case, matching the
    -- dictionary names of the unquoted columns of EMPLOYEE_AUDIT.
    lcr.add_column('new', 'UPD_DATE',  ANYDATA.ConvertDate(SYSDATE));
    lcr.add_column('new', 'USER_NAME', lcr.get_extra_attribute('USERNAME'));
    lcr.add_column('new', 'ACTION',    ANYDATA.ConvertVarchar2(command));

    lcr.execute(TRUE);

    -- No COMMIT here on purpose: the handler runs inside the apply process's
    -- transaction, which the apply process commits itself. Committing from the
    -- handler would split that transaction.
END emp_dml_handler;
/
-- Register strmadmin.emp_dml_handler as the DML handler (not an error handler)
-- of apply process "apply_emp" for INSERT, UPDATE and DELETE on hr.employees.
DECLARE
    -- Registers the handler for a single operation type.
    PROCEDURE attach_handler (p_operation IN VARCHAR2) IS
    BEGIN
        dbms_apply_adm.set_dml_handler(
            object_name         => 'hr.employees',
            object_type         => 'TABLE',
            operation_name      => p_operation,
            error_handler       => FALSE,
            user_procedure      => 'strmadmin.emp_dml_handler',
            apply_database_link => NULL,
            apply_name          => 'apply_emp');
    END attach_handler;
BEGIN
    attach_handler('INSERT');
    attach_handler('UPDATE');
    attach_handler('DELETE');
END;
/
-- Add apply rules for hr.employees (DML only, source database "orcl") to the
-- apply process "apply_emp", then set an enqueue destination for the DML rule.
DECLARE
    l_dml_rule  VARCHAR2(30);
    l_ddl_rule  VARCHAR2(30);
BEGIN
    dbms_streams_adm.add_table_rules(
        table_name      => 'hr.employees',
        streams_type    => 'apply',
        streams_name    => 'apply_emp',
        queue_name      => 'strmadmin.streams_queue',
        include_dml     => TRUE,
        include_ddl     => FALSE,
        source_database => 'orcl',
        dml_rule_name   => l_dml_rule,
        ddl_rule_name   => l_ddl_rule);

    -- The rule name handed back through dml_rule_name identifies the rule
    -- whose messages are enqueued into strmadmin.streams_queue.
    dbms_apply_adm.set_enqueue_destination(
        rule_name              => l_dml_rule,
        destination_queue_name => 'strmadmin.streams_queue');
END;
/
-- Set the apply parameter disable_on_error to 'n' for "apply_emp".
BEGIN
    DBMS_APPLY_ADM.SET_PARAMETER(
        apply_name => 'apply_emp',
        parameter  => 'disable_on_error',
        value      => 'n');
END;
/
-- Start the apply process.
BEGIN
    DBMS_APPLY_ADM.START_APPLY(apply_name => 'apply_emp');
END;
/
-- Start the capture process.
BEGIN
    DBMS_CAPTURE_ADM.START_CAPTURE(capture_name => 'capture_emp');
END;
/
UPDATE<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd"> <source_database_name>ORCL</source_database_name> <command_type>UPDATE</command_type> <object_owner>HR</object_owner> <object_name>EMPLOYEES</object_name> <transaction_id>4.27.1301</transaction_id> <scn>1162078</scn> <old_values> <old_value> <column_name>EMPLOYEE_ID</column_name> <data> <number>1</number> </data> </old_value> <old_value> <column_name>FIRST_NAME</column_name> <data> <varchar2>YAMADA</varchar2> </data> </old_value> <old_value> <column_name>LAST_NAME</column_name> <data> <varchar2>TARO</varchar2> </data> </old_value> <old_value> <column_name>EMAIL</column_name> <data> <varchar2>aaa@aa.co.jp</varchar2> </data> </old_value> <old_value> <column_name>PHONE_NUMBER</column_name> <data> <varchar2>090111111</varchar2> </data> </old_value> <old_value> <column_name>HIRE_DATE</column_name> <data> <date> <value> 2017/03/05 17:37:28</value> <format>SYYYY/MM/DD HH24:MI:SS</format> </date> </data> </old_value> <old_value> <column_name>JOB_ID</column_name> <data> <varchar2>JOB_ID_111</varchar2> </data> </old_value> <old_value> <column_name>SALARY</column_name> <data> <number>10000</number> </data> </old_value> <old_value> <column_name>COMMISSION_PCT</column_name> <data> <number>.2</number> </data> </old_value> <old_value> <column_name>MANAGER_ID</column_name> <data> <number>10</number> </data> </old_value> <old_value> <column_name>DEPARTMENT_ID</column_name> <data> <number>120</number> </data> </old_value> </old_values> <new_values> <new_value> <column_name>FIRST_NAME</column_name> <data> <varchar2>FIRST</varchar2> </data> </new_value> </new_values> <extra_attribute_values> <extra_attribute_value> <attribute_name>USERNAME</attribute_name> <attribute_value> <varchar2>HR</varchar2> </attribute_value> </extra_attribute_value> 
</extra_attribute_values> </ROW_LCR>
-- Teardown, step 1: stop the apply process, then the capture process.
BEGIN
    DBMS_APPLY_ADM.STOP_APPLY(apply_name => 'apply_emp');
END;
/

BEGIN
    DBMS_CAPTURE_ADM.STOP_CAPTURE(capture_name => 'capture_emp');
END;
/
-- Teardown, step 2: call SET_DML_HANDLER with user_procedure => NULL for each
-- operation on hr.employees (per the DBMS_APPLY_ADM documentation a NULL
-- user_procedure unsets the handler). Each operation runs in its own block,
-- so a failure on one does not prevent the others from being attempted.
BEGIN
    dbms_apply_adm.set_dml_handler(
        object_name         => 'hr.employees',
        object_type         => 'TABLE',
        operation_name      => 'INSERT',
        error_handler       => FALSE,
        user_procedure      => NULL,
        apply_database_link => NULL,
        apply_name          => 'apply_emp');
END;
/

BEGIN
    dbms_apply_adm.set_dml_handler(
        object_name         => 'hr.employees',
        object_type         => 'TABLE',
        operation_name      => 'UPDATE',
        error_handler       => FALSE,
        user_procedure      => NULL,
        apply_database_link => NULL,
        apply_name          => 'apply_emp');
END;
/

BEGIN
    dbms_apply_adm.set_dml_handler(
        object_name         => 'hr.employees',
        object_type         => 'TABLE',
        operation_name      => 'DELETE',
        error_handler       => FALSE,
        user_procedure      => NULL,
        apply_database_link => NULL,
        apply_name          => 'apply_emp');
END;
/
-- Teardown, step 3: delete all error transactions recorded for "apply_emp".
BEGIN
    dbms_apply_adm.delete_all_errors(
        apply_name => 'apply_emp');
END;
/
-- Teardown, step 4: drop the apply and capture processes, asking for rule sets
-- that are no longer used to be dropped as well.
BEGIN
    dbms_apply_adm.drop_apply(
        apply_name            => 'apply_emp',
        drop_unused_rule_sets => TRUE);
END;
/

BEGIN
    dbms_capture_adm.drop_capture(
        capture_name          => 'capture_emp',
        drop_unused_rule_sets => TRUE);
END;
/