スケーラビリティおよびコンシューマ グループ - Access Module

Teradata® Tools and Utilities Access Moduleリファレンス

Product
Access Module
Release Number
17.10
Published
2021年10月
Language
日本語
Last Update
2021-11-16
dita:mapPath
ja-JP/uur1608578381725.ditamap
dita:ditavalPath
ja-JP/obe1474387269547.ditaval
dita:id
B035-2425
Product Category
Teradata Tools and Utilities

スケーラビリティは、Teradata Parallel TransporterのUNION ALL機能を使用して実現できます。

以下の例では、Teradata Parallel Transporter Exportオペレータを使用してKafkaサーバーにデータを書き込み、Teradata Parallel Transporter Loadオペレータを使用してそのデータを読み取ってデータベースに戻します。Teradata Parallel Transporter Loadオペレータは、Teradata Parallel Transporter DataConnectorオペレータのコピーを3つ呼び出します。Teradata Parallel Transporter DataConnectorオペレータの各コピーは、Teradata Access Module for Kafkaのコピーを接続します。Teradata Kafka Access Moduleの各コピーは、次のように、特定のパーティションからのメッセージを消費します。
  • Teradata Access Module for Kafkaのコピー1は、"Partition: 0"からコンシュームします。
  • Teradata Access Module for Kafkaのコピー2は、"Partition: 1"からコンシュームします。
  • Teradata Access Module for Kafkaのコピー3は、"Partition: 2"からコンシュームします。

Teradata Access Module for Kafkaのすべてのコピーをコンシューマ グループとしてKafkaサーバーに宣言できるようにするには、次の例のインポート シナリオのように、初期化文字列で-X group.id=testjobオプションを使用します。

エクスポート シナリオの例

ジョブ変数ファイル

<>大括弧内のパラメータを適切に置き換え、<>大括弧を削除する必要があります。

/********************************************************/
/* TPT attributes - Common for all Samples              */
/********************************************************/
TargetTdpId = '<Teradata Database ID>'
,TargetUserName = '<TargetUserName>'
,TargetUserPassword = '<TargetUserPassword>'
,TargetErrorList = [ '3706','3803','3807' ]
,DDLPrivateLogName = 'DDL_OPERATOR_LOG'

/********************************************************/
/* TPT EXPORT Operator attributes                       */
/********************************************************/

,ExportPrivateLogName = 'EXPORT_OPERATOR_LOG'
,SourceTdpId = '<Teradata Database ID>'
,SourceUserName = '<SourceUserName>'
,SourceUserPassword = '<SourceUserPassword>'

/********************************************************/
/* TPT DataConnector Consumer Operator                  */
/********************************************************/

,FileWriterFormat = 'Formatted'
,FileWriterPrivateLogName = 'FILE_WRITER_LOG'
,FileWriterOpenMode = 'Write'

/********************************************************/
/* APPLY STATEMENT parameters */
/********************************************************/
,ExportInstances = 1
,WriterInstances = 1

ジョブのセットアップ

DEFINE JOB CREATE_SOURCE_TABLE_IN_TERADATA
DESCRIPTION 'Create a source table in Teradata'
(
    STEP CREATE_AND_POPULATE_SOURCE_TABLE
    (
        APPLY
        ('DROP TABLE KAFKAW001_src;'),
        ('CT KAFKAW001_src
          (
              Associate_Id INTEGER,
              Associate_Name CHAR(25),
              Salary FLOAT,
              DOJ DATE,
              Designation VARCHAR(25),
              Loan_Amount DECIMAL(5,2),
              Martial_Status CHAR(1),
              No_Of_Dependents BYTEINT
          );
        '),
        (' Ins KAFKAW001_src( 1,''Morgan Tremblay'',10.12,''99-03-20'',
                              ''Software Engineer'',110.12,''y'',1);
           Ins KAFKAW001_src( 2,''Chalmers Clayton'',11.12,''99-03-21'',
                              ''Technical Architect'',111.12,''n'',2);
           Ins KAFKAW001_src( 3,''Francis Cochran'',12.12,''99-03-22'',
                              ''Software Engineer'',112.12,''n'',2);

           Ins KAFKAW001_src( 4,''Hector Nielsen'',13.12,''99-03-23'',
                              ''Software Engineer'',113.12,''y'',4);
           Ins KAFKAW001_src( 5,''Sherlock Fisher'',14.12,''99-03-24'',
                              ''HR Manager'',114.12,''n'',3);
           Ins KAFKAW001_src( 6,''Orson Sanchez'',15.12,''99-03-25'',
                              ''Software Engineer'',115.12,''y'',6);
           Ins KAFKAW001_src( 7,''Gregory Benton'',16.12,''99-03-26'',
                              ''Department Manager'',116.12,''y'',1);
           Ins KAFKAW001_src( 8,''Bryant McCabe'',17.12,''99-03-27'',
                              ''Software Engineer'',117.12,''y'',1);
           Ins KAFKAW001_src( 9,''Delmar Halsey'',18.12,''99-03-28'',
                              ''Facility Manager'',118.12,''n'',4);
           Ins KAFKAW001_src(10,''Aldrich Jones'',19.12,''99-03-29'',
                              ''Floor Manager'',119.12,''n'',5);
           Ins KAFKAW001_src(11,''Unwin Russell'',20.12,''99-03-30'',
                              ''Software Tester II'',120.12,''n'',6);
           Ins KAFKAW001_src(12,''Gabriel Cooper'',21.12,''99-03-19'',
                              ''Software Tester'',121.12,''n'',2);
           Ins KAFKAW001_src(13,''Willis James'',22.12,''99-03-18'',

                              ''Build Engineer'',122.12,''y'',2);
           Ins KAFKAW001_src(14,''Phineas Campbell'',23.12,''99-03-17'',
                              ''Software Engineer'',123.12,''y'',2);
           Ins KAFKAW001_src(15,''Rupert Butler'',24.12,''99-03-16'',
                              ''Design Engineer'',124.12,''n'',3);
           Ins KAFKAW001_src(16,''Toby Ortiz'',25.12,''99-03-15'',
                              ''Software Engineer'',125.12,''y'',4);
           Ins KAFKAW001_src(17,''Hardy Peterson'',26.12,''99-03-14'',
                              ''Software Engineer III'',126.12,''n'',2);
           Ins KAFKAW001_src(18,''Zane Morgan'',27.12,''99-03-13'',
                              ''Software Engineer'',127.12,''n'',3);
           Ins KAFKAW001_src(19,''Igor Smith'',28.12,''99-03-12'',
                              ''Software Engineer'',128.12,''y'',2);
           Ins KAFKAW001_src(20,''Edric Davidson'',29.12,''99-03-11'',
                              ''Software Engineer II'',129.12,''y'',4);
           Ins KAFKAW001_src
               Sel Associate_Id + 20, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
           Ins KAFKAW001_src
               Sel Associate_Id + 40, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
           Ins KAFKAW001_src
               Sel Associate_Id + 80, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;

           Ins KAFKAW001_src
               Sel Associate_Id + 160, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
           Ins KAFKAW001_src
               Sel Associate_Id + 320, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
           Ins KAFKAW001_src
               Sel Associate_Id + 640, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
           Ins KAFKAW001_src
               Sel Associate_Id + 1280, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
        ')
         TO OPERATOR ($DDL);
    );

);

ジョブのエクスポート

<>大括弧内のパラメータを適切に置き換え、<>大括弧を削除する必要があります。

DEFINE JOB EXPORT_FROM_TERADATA
DESCRIPTION 'Export data from Teradata to Kafka Server'
(
    STEP EXPORT_THE_DATA
    (
        APPLY TO OPERATOR ( $FILE_WRITER()
            ATTR
            (
                AccessModuleName = 'libkafkaaxsmod.so',
                AccessModuleInitStr = '-M P -T <TopicName> -B
                  <BrokerID> -P 0,1,2 -S y -TL 3 -BKSZ 1200
                  -X compression.codec=gzip'
            )
        )
        SELECT * FROM OPERATOR ( $EXPORT
            ATTR
            (
                SelectStmt = 'SELECT * FROM KAFKAW001_src;'
            )
        );
    );
);

インポート シナリオの例

ジョブ変数ファイル

<>大括弧内のパラメータを適切に置き換え、<>大括弧を削除する必要があります。

/********************************************************/
/* TPT attributes - Common for all Samples              */
/********************************************************/
TargetTdpId = '<Teradata Database ID>'
,TargetUserName = '<TargetUserName>'
,TargetUserPassword = '<TargetUserPassword>'
,TargetErrorList = [ '3706','3803','3807' ]
,DDLPrivateLogName = 'DDL_OPERATOR_LOG'

/********************************************************/
/* TPT LOAD Operator attributes                         */
/********************************************************/

,LoadPrivateLogName = 'LOAD_OPERATOR_LOG'
,LoadTargetTable = 'KAFKAW001'
,LoadLogTable = 'KAFKAW001_log'
,LoadErrorTable1 = 'KAFKAW001_e1'
,LoadErrorTable2 = 'KAFKAW001_e2'

/********************************************************/
/* TPT DataConnector Producer Operator                  */
/********************************************************/

,FileReaderFormat = 'Formatted'
,FileReaderOpenMode = 'Read'

/********************************************************/
/* APPLY STATEMENT parameters                           */
/********************************************************/
,LoadInstances = 1
,ReaderInstances = 1
ジョブのセットアップ
DEFINE JOB IMPORT_TO_TERADATA_SETUP
DESCRIPTION 'Import data to Teradata from Kafka Server'
(
    STEP CLEANUP_CREATE_TABLE_STEP
    (
        APPLY
        (' DROP TABLE KAFKAW001 '),
        (' DROP TABLE KAFKAW001_e1 '),
        (' DROP TABLE KAFKAW001_e2 '),
        (' DROP TABLE KAFKAW001_log '),
        ('CT KAFKAW001
          (
              Associate_Id INTEGER,
              Associate_Name CHAR(25),
              Salary FLOAT,
              DOJ DATE,
              Designation VARCHAR(25),
              Loan_Amount DECIMAL(5,2),
              Martial_Status CHAR(1),
              No_Of_Dependents BYTEINT
          );
        ')
        TO OPERATOR ($DDL);
    );
);

インポート ジョブ

<>大括弧内のパラメータを適切に置き換え、<>大括弧を削除する必要があります。

DEFINE JOB IMPORT_TO_TERADATA
DESCRIPTION 'Import data to Teradata from Kafka Server'
(

    STEP IMPORT_THE_DATA
    (
        APPLY $INSERT TO OPERATOR ($LOAD)
        SELECT * FROM OPERATOR ($FILE_READER ()
            ATTR
            (
                PrivateLogName = 'KAFKAW001P2_1',
                AccessModuleName = 'libkafkaaxsmod.so',
                AccessModuleInitStr = '-M C -T <TopicName> -B
                  <BrokerID> -P 0 -W 5 -S y -TL 3 -BKSZ 1200
                  -X compression.codec=gzip -X group.id=testjob'
            )
        )
        UNION ALL
        SELECT * FROM OPERATOR ($FILE_READER ()
            ATTR
            (
               PrivateLogName = 'KAFKAW001P2_2',
               AccessModuleName = 'libkafkaaxsmod.so',
               AccessModuleInitStr = '-M C -T <TopicName> -B
                 <BrokerID> -P 1 -W 5 -S y -TL 3 -BKSZ 1200 -X
                 compression.codec=gzip -X group.id=testjob'
            )
        )
        UNION ALL
        SELECT * FROM OPERATOR ($FILE_READER ()
            ATTR
            (
               PrivateLogName = 'KAFKAW001P2_3',
               AccessModuleName = 'libkafkaaxsmod.so',

               AccessModuleInitStr = '-M C -T <TopicName> -B
                 <BrokerID> -P 2 -W 5 -S y -TL 3 -BKSZ 1200 -X
                 compression.codec=gzip -X group.id=testjob'
            )
        );
    );
);