tech_memo / Oracle / Oracle_Streams / stepByStep_memo


tech_memo/ORACLE/Oracle_Streams

概要

  • HRスキーマのEMPLOYEESテーブルに対するDMLを記録する設定手順

構築

0. サプリメンタルロギング

  • HRのEMPLOYEESテーブルに設定が必要かも?

1. Stream管理ユーザ作成

  • strmadminスキーマを管理ユーザとする
    sqlplus / as sysdba
    
    create user strmadmin identified by strmadmin default quota unlimited on users;
     
    grant dba to strmadmin;
    
    begin
    	dbms_streams_auth.grant_admin_privilege(
    		grantee => 'strmadmin',
    		grant_privileges => true);
    end;
    /
    
  • HRのEMPLOYEESテーブルへの権限を付与
    conn hr/hr
    
    grant all on hr.employees to strmadmin;

2. EMPLOYEESの変更を記録するテーブル作成

  • EMPLOYEESテーブルの全カラムに加えて、更新時刻、更新ユーザ名、DMLの種類を記録できるようにする。この表の所有者はHR。
    CREATE TABLE "HR"."EMPLOYEE_AUDIT"
       (    "EMPLOYEE_ID" NUMBER(6,0),
        "FIRST_NAME" VARCHAR2(20),
        "LAST_NAME" VARCHAR2(25),
        "EMAIL" VARCHAR2(25),
        "PHONE_NUMBER" VARCHAR2(20),
        "HIRE_DATE" DATE,
        "JOB_ID" VARCHAR2(10),
        "SALARY" NUMBER(8,2),
        "COMMISSION_PCT" NUMBER(2,2),
        "MANAGER_ID" NUMBER(6,0),
        "DEPARTMENT_ID" NUMBER(4,0),
           upd_date        date,
           user_name       varchar2(30),
           action          varchar2(30)
    );
    
  • strmadminユーザに上記テーブルの権限付与。
    grant all on hr.employee_audit to strmadmin;

3. 行LCRの中身を記録するテーブル作成(必須ではない)

  • captureプロセスがエンキューした行LCRの中身を見れるように作成
    conn strmadmin/strmadmin
    
    create table streams_monitor(
           date_and_time   timestamp(6)    default systimestamp,
           txt_msg clob
    );

4. キューとキュー表の作成

  • 下記プロシージャで作成
    begin
           dbms_streams_adm.set_up_queue(
                   queue_table => 'strmadmin.streams_queue_table',
                   queue_name => 'strmadmin.streams_queue');
    end;
    /

5. Captureプロセスの作成と、ルール作成

  • Captureプロセスが監視するテーブルとエンキュー先の指定
    begin
    DBMS_STREAMS_ADM.ADD_TABLE_RULES(
            table_name              =>  'hr.employees',
            streams_type    =>  'CAPTURE',
            streams_name    =>  'capture_emp',
            queue_name              =>  'strmadmin.streams_queue',
            include_dml             =>  true,
            include_ddl             =>  false,
            inclusion_rule   =>  true);
    end;
    /
    • inclusion_rule オプション
      適用ルール意味
      TRUEポジティブ・ルールセットルール判定がTRUEの場合、メッセージをエンキューする
      FALSEネガティブ・ルールセットルール判定がTRUEの場合、メッセージを破棄する

6. 行LCRに情報を追加

  • CaptureプロセスがREDOからLCRを取得して、Captured LCR(取得LCR)を作成する際に、DML発行ユーザ名を含めるようにする
    begin
           dbms_capture_adm.include_extra_attribute(
                   capture_name => 'capture_emp',
                   attribute_name => 'username',
                   include => true);
    end;
    /

7. 変更監視を開始するSCNの設定

  • 現在のSCNの値を設定。監視対象のDBとテーブルを一緒に指定する
    declare
           iscn    number;
    begin
           iscn := dbms_flashback.get_system_change_number();
           dbms_apply_adm.set_table_instantiation_scn(
                   source_object_name => 'hr.employees',
                   source_database_name => 'orcl',
                   instantiation_scn => iscn);
    end;
    /

8. DMLハンドラ作成

  • Applyプロセスが実行するDMLハンドラプロシージャを作成
    create or replace PROCEDURE emp_dml_handler (in_any IN ANYDATA)
    IS
            lcr     SYS.LCR$_ROW_RECORD;
            rc      PLS_INTEGER;
            command varchar2(30);
           old_values sys.lcr$_row_list;
     BEGIN
            -- Access the LCR
            rc := in_any.GETOBJECT(lcr);
            command := lcr.get_command_type();
            insert into streams_monitor (txt_msg) values (command || dbms_streams.convert_lcr_to_xml(in_any));
    
            lcr.set_command_type('INSERT');
            lcr.set_object_name('EMPLOYEE_AUDIT');
    
           if command in ('DELETE', 'UPDATE') then
                   old_values := lcr.get_values('old');
                   lcr.set_values('new', old_values);
                   lcr.set_values('old', null);
           end if;
    
           lcr.add_column('new', 'UPD_DATE', ANYDATA.ConvertDate(SYSDATE));
           lcr.add_column('new', 'user_name', lcr.get_extra_attribute('USERNAME'));
           lcr.add_column('new', 'ACTION', ANYDATA.ConvertVarchar2(command));
           lcr.execute(true);
           commit;
    end;
    /

9. ApplyプロセスとDMLハンドラプロシージャの紐づけ

  • INSERT, UPDATE, DELETEをそれぞれで定義する
    begin 
    DBMS_APPLY_ADM.SET_DML_HANDLER(
            object_name             => 'hr.employees',
            object_type             => 'TABLE',
            operation_name  => 'INSERT',
            error_handler   => false,
            user_procedure  => 'strmadmin.emp_dml_handler',
            apply_database_link     => NULL,
            apply_name              => 'apply_emp');
    DBMS_APPLY_ADM.SET_DML_HANDLER(
            object_name             => 'hr.employees',
            object_type             => 'TABLE',
            operation_name  => 'UPDATE',
            error_handler   => false,
            user_procedure  => 'strmadmin.emp_dml_handler',
            apply_database_link     => NULL,
            apply_name              => 'apply_emp');
    DBMS_APPLY_ADM.SET_DML_HANDLER(
            object_name             => 'hr.employees',
            object_type             => 'TABLE',
            operation_name  => 'DELETE',
            error_handler   => false,
            user_procedure  => 'strmadmin.emp_dml_handler',
            apply_database_link     => NULL,
            apply_name              => 'apply_emp');
    end;
    /

10. Applyプロセスの作成とルール設定

  • 処理対象のテーブルと表などを設定
    declare
           emp_rule_name_dml       varchar2(30);
           emp_rule_name_ddl       varchar2(30);
    begin
    DBMS_STREAMS_ADM.ADD_TABLE_RULES(
            table_name              =>  'hr.employees',
            streams_type    =>  'apply',
            streams_name    =>  'apply_emp',
            queue_name              =>  'strmadmin.streams_queue',
            include_dml             =>  true,
            include_ddl             =>  false,
            source_database         =>  'orcl',
            dml_rule_name   =>  emp_rule_name_dml,
            ddl_rule_name   =>  emp_rule_name_ddl);
    • emp_rule_name_dmlおよび、emp_rule_name_ddlはOUTパラメータで、上記プロシージャ実行により値が設定される模様
  • 指定したルールを満たすメッセージが適用コンポーネントによって自動的にエンキューされるキューを設定 (するらしい)
    DBMS_APPLY_ADM.set_enqueue_destination(
           rule_name => emp_rule_name_dml,
           destination_queue_name => 'strmadmin.streams_queue');
    end;
    /

11. Applyプロセスパラメータ設定

  • 未解決エラーがあっても、Applyプロセスが動き続けるようパラメータ設定
    begin
    dbms_apply_adm.set_parameter(
           apply_name => 'apply_emp',
           parameter => 'disable_on_error',
           value => 'n');
    end;
    /

12. Applyプロセス開始

begin
dbms_apply_adm.start_apply(
       apply_name => 'apply_emp'
);
end;
/

13. Captureプロセス開始

begin
dbms_capture_adm.start_capture(
       capture_name => 'capture_emp'
);
end;
/

結果

  • 1行UPDATEしたときの、streams_monitorの中身
    UPDATE<ROW_LCR xmlns="http://xmlns.oracle.com/streams/schemas/lcr" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.oracle.com/streams/schemas/lcr  http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
     <source_database_name>ORCL</source_database_name>
     <command_type>UPDATE</command_type>
     <object_owner>HR</object_owner>
     <object_name>EMPLOYEES</object_name>
     <transaction_id>4.27.1301</transaction_id>
     <scn>1162078</scn>
     <old_values>
       <old_value>
         <column_name>EMPLOYEE_ID</column_name>
         <data>
           <number>1</number>
         </data>
       </old_value>
       <old_value>
         <column_name>FIRST_NAME</column_name>
         <data>
           <varchar2>YAMADA</varchar2>
         </data>
       </old_value>
       <old_value>
         <column_name>LAST_NAME</column_name>
         <data>
           <varchar2>TARO</varchar2>
         </data>
       </old_value>
       <old_value>
         <column_name>EMAIL</column_name>
         <data>
           <varchar2>aaa@aa.co.jp</varchar2>
         </data>
       </old_value>
       <old_value>
         <column_name>PHONE_NUMBER</column_name>
         <data>
           <varchar2>090111111</varchar2>
         </data>
       </old_value>
       <old_value>
         <column_name>HIRE_DATE</column_name>
         <data>
           <date>
             <value> 2017/03/05 17:37:28</value>
             <format>SYYYY/MM/DD HH24:MI:SS</format>
           </date>
         </data>
       </old_value>
       <old_value>
         <column_name>JOB_ID</column_name>
         <data>
           <varchar2>JOB_ID_111</varchar2>
         </data>
       </old_value>
       <old_value>
         <column_name>SALARY</column_name>
         <data>
           <number>10000</number>
         </data>
       </old_value>
       <old_value>
         <column_name>COMMISSION_PCT</column_name>
         <data>
           <number>.2</number>
         </data>
       </old_value>
       <old_value>
         <column_name>MANAGER_ID</column_name>
         <data>
           <number>10</number>
         </data>
       </old_value>
       <old_value>
         <column_name>DEPARTMENT_ID</column_name>
         <data>
           <number>120</number>
         </data>
       </old_value>
     </old_values>
     <new_values>
       <new_value>
         <column_name>FIRST_NAME</column_name>
         <data>
           <varchar2>FIRST</varchar2>
         </data>
       </new_value>
     </new_values>
     <extra_attribute_values>
       <extra_attribute_value>
         <attribute_name>USERNAME</attribute_name>
         <attribute_value>
           <varchar2>HR</varchar2>
         </attribute_value>
       </extra_attribute_value>
     </extra_attribute_values>
    </ROW_LCR>

削除

  • apply, capture停止
    begin
    dbms_apply_adm.stop_apply(
           apply_name => 'apply_emp'
    );
    end;
    /
    
    begin
    dbms_capture_adm.stop_capture(
           capture_name => 'capture_emp'
    );
    end;
    /
  • dml_handler解除
    begin
    DBMS_APPLY_ADM.SET_DML_HANDLER(
           object_name             => 'hr.employees',
           object_type             => 'TABLE',
           operation_name  => 'INSERT',
           error_handler   => false,
           user_procedure  => null,
           apply_database_link     => NULL,
           apply_name              => 'apply_emp');
    end;
    /
    
    begin
    DBMS_APPLY_ADM.SET_DML_HANDLER(
           object_name             => 'hr.employees',
           object_type             => 'TABLE',
           operation_name  => 'UPDATE',
           error_handler   => false,
           user_procedure  => null,
           apply_database_link     => NULL,
           apply_name              => 'apply_emp');
    end;
    /
    
    begin
    DBMS_APPLY_ADM.SET_DML_HANDLER(
           object_name             => 'hr.employees',
           object_type             => 'TABLE',
           operation_name  => 'DELETE',
           error_handler   => false,
           user_procedure  => null,
           apply_database_link     => NULL,
           apply_name              => 'apply_emp');
    end;
    /
  • エラーQUEUEの中身削除
    begin
    DBMS_APPLY_ADM.DELETE_ALL_ERRORS(apply_name=>'apply_emp');
    end;
    /
  • apply, caputure削除
    begin
    DBMS_APPLY_ADM.DROP_APPLY(
        apply_name => 'apply_emp',
        drop_unused_rule_sets => true);
    end;
    /
    
    begin
    DBMS_CAPTURE_ADM.DROP_CAPTURE(
        capture_name => 'capture_emp',
        drop_unused_rule_sets => true);
    end;
    /