スケーラビリティは、Teradata Parallel TransporterのUNION ALL機能を使用して実現できます。
次の例では、Teradata Parallel Transporter Exportオペレータを使用してデータをKafkaサーバーに書き込み、Teradata Parallel Transporter Loadオペレータを使用してそのデータをTeradata Databaseに読み取ります。Teradata Parallel Transporter Loadオペレータは3つのコピーのTeradata Parallel Transporter DataConnectorオペレータに関与します。Teradata Parallel Transporter DataConnectorオペレータの各コピーはTeradata Access Module for Kafkaのコピーを添付します。Teradata Kafka Access Moduleの各コピーは次のように、特定のパーティションからのメッセージを消費します。
- Teradata Access Module for Kafkaのコピー1は、"Partition: 0"からコンシュームします。
- Teradata Access Module for Kafkaのコピー2は、"Partition: 1"からコンシュームします。
- Teradata Access Module for Kafkaのコピー3は、"Partition: 2"からコンシュームします。
Teradata Access Module for Kafkaのすべてのコピーをコンシューマ グループとしてKafkaサーバーに宣言できるようにするには、次の例のインポート シナリオのように、初期化文字列で-X group.id=testjobオプションを使用します。
エクスポート シナリオの例
ジョブ変数ファイル
<>大括弧内のパラメータを適切に置き換え、<>大括弧を削除する必要があります。
/********************************************************/ /* TPT attributes - Common for all Samples */ /********************************************************/ TargetTdpId = '<Teradata Database ID>' ,TargetUserName = '<TargetUserName>' ,TargetUserPassword = '<TargetUserPassword>' ,TargetErrorList = [ '3706','3803','3807' ] ,DDLPrivateLogName = 'DDL_OPERATOR_LOG' /********************************************************/ /* TPT EXPORT Operator attributes */ /********************************************************/ ,ExportPrivateLogName = 'EXPORT_OPERATOR_LOG' ,SourceTdpId = '<Teradata Database ID>' ,SourceUserName = '<SourceUserName>' ,SourceUserPassword = '<SourceUserPassword>' /********************************************************/ /* TPT DataConnector Consumer Operator */ /********************************************************/ ,DCCFormat = 'Formatted' ,DCCPrivateLogName = 'FILE_WRITER_LOG' ,DCCFileName = 'KAFKAW001DT' ,DCCDirectoryPath = '.' ,DCCOpenMode = 'Write' /********************************************************/ /* APPLY STATEMENT parameters */ /********************************************************/ ,ExportInstances = 1 ,WriterInstances = 1
ジョブのセットアップ
DEFINE JOB CREATE_SOURCE_TABLE_IN_TERADATA
DESCRIPTION 'Create a source table in Teradata'
(
STEP CREATE_AND_POPULATE_SOURCE_TABLE
(
APPLY
('DROP TABLE KAFKAW001_src;'),
('CT KAFKAW001_src
(
Associate_Id INTEGER,
Associate_Name CHAR(25),
Salary FLOAT,
DOJ DATE,
Designation VARCHAR(25),
Loan_Amount DECIMAL(5,2),
Martial_Status CHAR(1),
No_Of_Dependents BYTEINT
);
'),
(' Ins KAFKAW001_src( 1,''Morgan Tremblay'',10.12,''99-03-20'',
''Software Engineer'',110.12,''y'',1);
Ins KAFKAW001_src( 2,''Chalmers Clayton'',11.12,''99-03-21'',
''Technical Architect'',111.12,''n'',2);
Ins KAFKAW001_src( 3,''Francis Cochran'',12.12,''99-03-22'',
''Software Engineer'',112.12,''n'',2);
Ins KAFKAW001_src( 4,''Hector Nielsen'',13.12,''99-03-23'',
''Software Engineer'',113.12,''y'',4);
Ins KAFKAW001_src( 5,''Sherlock Fisher'',14.12,''99-03-24'',
''HR Manager'',114.12,''n'',3);
Ins KAFKAW001_src( 6,''Orson Sanchez'',15.12,''99-03-25'',
''Software Engineer'',115.12,''y'',6);
Ins KAFKAW001_src( 7,''Gregory Benton'',16.12,''99-03-26'',
''Department Manager'',116.12,''y'',1);
Ins KAFKAW001_src( 8,''Bryant McCabe'',17.12,''99-03-27'',
''Software Engineer'',117.12,''y'',1);
Ins KAFKAW001_src( 9,''Delmar Halsey'',18.12,''99-03-28'',
''Facility Manager'',118.12,''n'',4);
Ins KAFKAW001_src(10,''Aldrich Jones'',19.12,''99-03-29'',
''Floor Manager'',119.12,''n'',5);
Ins KAFKAW001_src(11,''Unwin Russell'',20.12,''99-03-30'',
''Software Tester II'',120.12,''n'',6);
Ins KAFKAW001_src(12,''Gabriel Cooper'',21.12,''99-03-19'',
''Software Tester'',121.12,''n'',2);
Ins KAFKAW001_src(13,''Willis James'',22.12,''99-03-18'',
''Build Engineer'',122.12,''y'',2);
Ins KAFKAW001_src(14,''Phineas Campbell'',23.12,''99-03-17'',
''Software Engineer'',123.12,''y'',2);
Ins KAFKAW001_src(15,''Rupert Butler'',24.12,''99-03-16'',
''Design Engineer'',124.12,''n'',3);
Ins KAFKAW001_src(16,''Toby Ortiz'',25.12,''99-03-15'',
''Software Engineer'',125.12,''y'',4);
Ins KAFKAW001_src(17,''Hardy Peterson'',26.12,''99-03-14'',
''Software Engineer III'',126.12,''n'',2);
Ins KAFKAW001_src(18,''Zane Morgan'',27.12,''99-03-13'',
''Software Engineer'',127.12,''n'',3);
Ins KAFKAW001_src(19,''Igor Smith'',28.12,''99-03-12'',
''Software Engineer'',128.12,''y'',2);
Ins KAFKAW001_src(20,''Edric Davidson'',29.12,''99-03-11'',
''Software Engineer II'',129.12,''y'',4);
Ins KAFKAW001_src
Sel Associate_Id + 20, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
Ins KAFKAW001_src
Sel Associate_Id + 40, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
Ins KAFKAW001_src
Sel Associate_Id + 80, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
Ins KAFKAW001_src
Sel Associate_Id + 160, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
Ins KAFKAW001_src
Sel Associate_Id + 320, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
Ins KAFKAW001_src
Sel Associate_Id + 640, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
Ins KAFKAW001_src
Sel Associate_Id + 1280, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
')
TO OPERATOR ($DDL);
);
);
ジョブのエクスポート
<>大括弧内のパラメータを適切に置き換え、<>大括弧を削除する必要があります。
DEFINE JOB EXPORT_FROM_TERADATA
DESCRIPTION 'Export data from Teradata to Kafka Server'
(
STEP EXPORT_THE_DATA
(
APPLY TO OPERATOR ( $FILE_WRITER()
ATTR
(
AccessModuleName = 'libkafkaaxsmod.so',
AccessModuleInitStr = '-M P -T <TopicName> -B
<BrokerID> -P 0,1,2 -S y -TL 3 -BKSZ 1200
-X compression.codec=gzip'
)
)
SELECT * FROM OPERATOR ( $EXPORT
ATTR
(
SelectStmt = 'SELECT * FROM KAFKAW001_src;'
)
);
);
);
インポート シナリオの例
ジョブ変数ファイル
<>大括弧内のパラメータを適切に置き換え、<>大括弧を削除する必要があります。
/********************************************************/ /* TPT attributes - Common for all Samples */ /********************************************************/ TargetTdpId = '<Teradata Database ID>' ,TargetUserName = '<TargetUserName>' ,TargetUserPassword = '<TargetUserPassword>' ,TargetErrorList = [ '3706','3803','3807' ] ,DDLPrivateLogName = 'DDL_OPERATOR_LOG' /********************************************************/ /* TPT LOAD Operator attributes */ /********************************************************/ ,LoadPrivateLogName = 'LOAD_OPERATOR_LOG' ,LoadTargetTable = 'KAFKAW001' ,LoadLogTable = 'KAFKAW001_log' ,LoadErrorTable1 = 'KAFKAW001_e1' ,LoadErrorTable2 = 'KAFKAW001_e2' /********************************************************/ /* TPT DataConnector Producer Operator */ /********************************************************/ ,DCPFormat = 'Formatted' ,DCPFileName = 'KAFKAW001DT' ,DCPDirectoryPath = '.' ,DCPOpenMode = 'Read' /********************************************************/ /* APPLY STATEMENT parameters */ /********************************************************/ ,LoadInstances = 1 ,ReaderInstances = 1
ジョブのセットアップ
DEFINE JOB IMPORT_TO_TERADATA_SETUP
DESCRIPTION 'Import data to Teradata from Kafka Server'
(
STEP CLEANUP_CREATE_TABLE_STEP
(
APPLY
(' DROP TABLE KAFKAW001 '),
(' DROP TABLE KAFKAW001_e1 '),
(' DROP TABLE KAFKAW001_e2 '),
(' DROP TABLE KAFKAW001_log '),
('CT KAFKAW001
(
Associate_Id INTEGER,
Associate_Name CHAR(25),
Salary FLOAT,
DOJ DATE,
Designation VARCHAR(25),
Loan_Amount DECIMAL(5,2),
Martial_Status CHAR(1),
No_Of_Dependents BYTEINT
);
')
TO OPERATOR ($DDL);
);
);
インポート ジョブ
<>大括弧内のパラメータを適切に置き換え、<>大括弧を削除する必要があります。
DEFINE JOB IMPORT_TO_TERADATA
DESCRIPTION 'Import data to Teradata from Kafka Server'
(
STEP IMPORT_THE_DATA
(
APPLY $INSERT TO OPERATOR ($LOAD)
SELECT * FROM OPERATOR ($FILE_READER ()
ATTR
(
PrivateLogName = 'KAFKAW001P2_1',
AccessModuleName = 'libkafkaaxsmod.so',
AccessModuleInitStr = '-M C -T <TopicName> -B
<BrokerID> -P 0 -W 5 -S y -TL 3 -BKSZ 1200
-X compression.codec=gzip -X group.id=testjob'
)
)
UNION ALL
SELECT * FROM OPERATOR ($FILE_READER ()
ATTR
(
PrivateLogName = 'KAFKAW001P2_2',
AccessModuleName = 'libkafkaaxsmod.so',
AccessModuleInitStr = '-M C -T <TopicName> -B
<BrokerID> -P 1 -W 5 -S y -TL 3 -BKSZ 1200 -X
compression.codec=gzip -X group.id=testjob'
)
)
UNION ALL
SELECT * FROM OPERATOR ($FILE_READER ()
ATTR
(
PrivateLogName = 'KAFKAW001P2_3',
AccessModuleName = 'libkafkaaxsmod.so',
AccessModuleInitStr = '-M C -T <TopicName> -B
<BrokerID> -P 2 -W 5 -S y -TL 3 -BKSZ 1200 -X
compression.codec=gzip -X group.id=testjob'
)
);
);
);