スケーラビリティは、Teradata Parallel TransporterのUNION ALL機能を使用して実現できます。
以下の例では、Teradata Parallel Transporter Exportオペレータを使用してKafkaサーバーにデータを書き込み、Teradata Parallel Transporter Loadオペレータを使用してそのデータを読み取ってデータベースに戻します。Teradata Parallel Transporter Loadオペレータは、Teradata Parallel Transporter DataConnectorオペレータのコピーを3つ呼び出します。Teradata Parallel Transporter DataConnectorオペレータの各コピーは、Teradata Access Module for Kafkaのコピーを接続します。Teradata Kafka Access Moduleの各コピーは、次のように、特定のパーティションからのメッセージを消費します。
- Teradata Access Module for Kafkaのコピー1は、"Partition: 0"からコンシュームします。
- Teradata Access Module for Kafkaのコピー2は、"Partition: 1"からコンシュームします。
- Teradata Access Module for Kafkaのコピー3は、"Partition: 2"からコンシュームします。
Teradata Access Module for Kafkaのすべてのコピーをコンシューマ グループとしてKafkaサーバーに宣言できるようにするには、次の例のインポート シナリオのように、初期化文字列で-X group.id=testjobオプションを使用します。
エクスポート シナリオの例
ジョブ変数ファイル
<>大括弧内のパラメータを適切に置き換え、<>大括弧を削除する必要があります。
/********************************************************/ /* TPT attributes - Common for all Samples */ /********************************************************/ TargetTdpId = '<Teradata Database ID>' ,TargetUserName = '<TargetUserName>' ,TargetUserPassword = '<TargetUserPassword>' ,TargetErrorList = [ '3706','3803','3807' ] ,DDLPrivateLogName = 'DDL_OPERATOR_LOG' /********************************************************/ /* TPT EXPORT Operator attributes */ /********************************************************/ ,ExportPrivateLogName = 'EXPORT_OPERATOR_LOG' ,SourceTdpId = '<Teradata Database ID>' ,SourceUserName = '<SourceUserName>' ,SourceUserPassword = '<SourceUserPassword>' /********************************************************/ /* TPT DataConnector Consumer Operator */ /********************************************************/ ,FileWriterFormat = 'Formatted' ,FileWriterPrivateLogName = 'FILE_WRITER_LOG' ,FileWriterOpenMode = 'Write' /********************************************************/ /* APPLY STATEMENT parameters */ /********************************************************/ ,ExportInstances = 1 ,WriterInstances = 1
ジョブのセットアップ
DEFINE JOB CREATE_SOURCE_TABLE_IN_TERADATA DESCRIPTION 'Create a source table in Teradata' ( STEP CREATE_AND_POPULATE_SOURCE_TABLE ( APPLY ('DROP TABLE KAFKAW001_src;'), ('CT KAFKAW001_src ( Associate_Id INTEGER, Associate_Name CHAR(25), Salary FLOAT, DOJ DATE, Designation VARCHAR(25), Loan_Amount DECIMAL(5,2), Martial_Status CHAR(1), No_Of_Dependents BYTEINT ); '), (' Ins KAFKAW001_src( 1,''Morgan Tremblay'',10.12,''99-03-20'', ''Software Engineer'',110.12,''y'',1); Ins KAFKAW001_src( 2,''Chalmers Clayton'',11.12,''99-03-21'', ''Technical Architect'',111.12,''n'',2); Ins KAFKAW001_src( 3,''Francis Cochran'',12.12,''99-03-22'', ''Software Engineer'',112.12,''n'',2); Ins KAFKAW001_src( 4,''Hector Nielsen'',13.12,''99-03-23'', ''Software Engineer'',113.12,''y'',4); Ins KAFKAW001_src( 5,''Sherlock Fisher'',14.12,''99-03-24'', ''HR Manager'',114.12,''n'',3); Ins KAFKAW001_src( 6,''Orson Sanchez'',15.12,''99-03-25'', ''Software Engineer'',115.12,''y'',6); Ins KAFKAW001_src( 7,''Gregory Benton'',16.12,''99-03-26'', ''Department Manager'',116.12,''y'',1); Ins KAFKAW001_src( 8,''Bryant McCabe'',17.12,''99-03-27'', ''Software Engineer'',117.12,''y'',1); Ins KAFKAW001_src( 9,''Delmar Halsey'',18.12,''99-03-28'', ''Facility Manager'',118.12,''n'',4); Ins KAFKAW001_src(10,''Aldrich Jones'',19.12,''99-03-29'', ''Floor Manager'',119.12,''n'',5); Ins KAFKAW001_src(11,''Unwin Russell'',20.12,''99-03-30'', ''Software Tester II'',120.12,''n'',6); Ins KAFKAW001_src(12,''Gabriel Cooper'',21.12,''99-03-19'', ''Software Tester'',121.12,''n'',2); Ins KAFKAW001_src(13,''Willis James'',22.12,''99-03-18'', ''Build Engineer'',122.12,''y'',2); Ins KAFKAW001_src(14,''Phineas Campbell'',23.12,''99-03-17'', ''Software Engineer'',123.12,''y'',2); Ins KAFKAW001_src(15,''Rupert Butler'',24.12,''99-03-16'', ''Design Engineer'',124.12,''n'',3); Ins KAFKAW001_src(16,''Toby Ortiz'',25.12,''99-03-15'', ''Software Engineer'',125.12,''y'',4); Ins KAFKAW001_src(17,''Hardy Peterson'',26.12,''99-03-14'', ''Software Engineer III'',126.12,''n'',2); Ins KAFKAW001_src(18,''Zane Morgan'',27.12,''99-03-13'', ''Software Engineer'',127.12,''n'',3); Ins KAFKAW001_src(19,''Igor Smith'',28.12,''99-03-12'', ''Software Engineer'',128.12,''y'',2); Ins KAFKAW001_src(20,''Edric Davidson'',29.12,''99-03-11'', ''Software Engineer II'',129.12,''y'',4); Ins KAFKAW001_src Sel Associate_Id + 20, Associate_Name, Salary, DOJ, Designation, Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src; Ins KAFKAW001_src Sel Associate_Id + 40, Associate_Name, Salary, DOJ, Designation, Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src; Ins KAFKAW001_src Sel Associate_Id + 80, Associate_Name, Salary, DOJ, Designation, Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src; Ins KAFKAW001_src Sel Associate_Id + 160, Associate_Name, Salary, DOJ, Designation, Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src; Ins KAFKAW001_src Sel Associate_Id + 320, Associate_Name, Salary, DOJ, Designation, Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src; Ins KAFKAW001_src Sel Associate_Id + 640, Associate_Name, Salary, DOJ, Designation, Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src; Ins KAFKAW001_src Sel Associate_Id + 1280, Associate_Name, Salary, DOJ, Designation, Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src; ') TO OPERATOR ($DDL); ); );
ジョブのエクスポート
<>大括弧内のパラメータを適切に置き換え、<>大括弧を削除する必要があります。
DEFINE JOB EXPORT_FROM_TERADATA DESCRIPTION 'Export data from Teradata to Kafka Server' ( STEP EXPORT_THE_DATA ( APPLY TO OPERATOR ( $FILE_WRITER() ATTR ( AccessModuleName = 'libkafkaaxsmod.so', AccessModuleInitStr = '-M P -T <TopicName> -B <BrokerID> -P 0,1,2 -S y -TL 3 -BKSZ 1200 -X compression.codec=gzip' ) ) SELECT * FROM OPERATOR ( $EXPORT ATTR ( SelectStmt = 'SELECT * FROM KAFKAW001_src;' ) ); ); );
インポート シナリオの例
ジョブ変数ファイル
<>大括弧内のパラメータを適切に置き換え、<>大括弧を削除する必要があります。
/********************************************************/ /* TPT attributes - Common for all Samples */ /********************************************************/ TargetTdpId = '<Teradata Database ID>' ,TargetUserName = '<TargetUserName>' ,TargetUserPassword = '<TargetUserPassword>' ,TargetErrorList = [ '3706','3803','3807' ] ,DDLPrivateLogName = 'DDL_OPERATOR_LOG' /********************************************************/ /* TPT LOAD Operator attributes */ /********************************************************/ ,LoadPrivateLogName = 'LOAD_OPERATOR_LOG' ,LoadTargetTable = 'KAFKAW001' ,LoadLogTable = 'KAFKAW001_log' ,LoadErrorTable1 = 'KAFKAW001_e1' ,LoadErrorTable2 = 'KAFKAW001_e2' /********************************************************/ /* TPT DataConnector Producer Operator */ /********************************************************/ ,FileReaderFormat = 'Formatted' ,FileReaderOpenMode = 'Read' /********************************************************/ /* APPLY STATEMENT parameters */ /********************************************************/ ,LoadInstances = 1 ,ReaderInstances = 1
ジョブのセットアップ
DEFINE JOB IMPORT_TO_TERADATA_SETUP DESCRIPTION 'Import data to Teradata from Kafka Server' ( STEP CLEANUP_CREATE_TABLE_STEP ( APPLY (' DROP TABLE KAFKAW001 '), (' DROP TABLE KAFKAW001_e1 '), (' DROP TABLE KAFKAW001_e2 '), (' DROP TABLE KAFKAW001_log '), ('CT KAFKAW001 ( Associate_Id INTEGER, Associate_Name CHAR(25), Salary FLOAT, DOJ DATE, Designation VARCHAR(25), Loan_Amount DECIMAL(5,2), Martial_Status CHAR(1), No_Of_Dependents BYTEINT ); ') TO OPERATOR ($DDL); ); );
インポート ジョブ
<>大括弧内のパラメータを適切に置き換え、<>大括弧を削除する必要があります。
DEFINE JOB IMPORT_TO_TERADATA DESCRIPTION 'Import data to Teradata from Kafka Server' ( STEP IMPORT_THE_DATA ( APPLY $INSERT TO OPERATOR ($LOAD) SELECT * FROM OPERATOR ($FILE_READER () ATTR ( PrivateLogName = 'KAFKAW001P2_1', AccessModuleName = 'libkafkaaxsmod.so', AccessModuleInitStr = '-M C -T <TopicName> -B <BrokerID> -P 0 -W 5 -S y -TL 3 -BKSZ 1200 -X compression.codec=gzip -X group.id=testjob' ) ) UNION ALL SELECT * FROM OPERATOR ($FILE_READER () ATTR ( PrivateLogName = 'KAFKAW001P2_2', AccessModuleName = 'libkafkaaxsmod.so', AccessModuleInitStr = '-M C -T <TopicName> -B <BrokerID> -P 1 -W 5 -S y -TL 3 -BKSZ 1200 -X compression.codec=gzip -X group.id=testjob' ) ) UNION ALL SELECT * FROM OPERATOR ($FILE_READER () ATTR ( PrivateLogName = 'KAFKAW001P2_3', AccessModuleName = 'libkafkaaxsmod.so', AccessModuleInitStr = '-M C -T <TopicName> -B <BrokerID> -P 2 -W 5 -S y -TL 3 -BKSZ 1200 -X compression.codec=gzip -X group.id=testjob' ) ); ); );