Scalability can be achieved by using the UNION ALL feature of the Teradata Parallel Transporter.
- Copy 1 of Teradata Access Module for Kafka will consume from "Partition: 0".
- Copy 2 of Teradata Access Module for Kafka will consume from "Partition: 1".
- Copy3 of Teradata Access Module for Kafka will consume from "Partition: 2".
All of the copies of the Teradata Access Module for Kafka can be declared as a consumer group to the Kafka Server by using the -X group.id=testjob option in the initialization string, as shown in the example import scenario.
Example of Export Scenario
Job Variable File
The parameters inside <> brackets need to be replaced appropriately and <> brackets need to be removed.
/********************************************************/ /* TPT attributes - Common for all Samples */ /********************************************************/ TargetTdpId = '<Teradata Database ID>' ,TargetUserName = '<TargetUserName>' ,TargetUserPassword = '<TargetUserPassword>' ,TargetErrorList = [ '3706','3803','3807' ] ,DDLPrivateLogName = 'DDL_OPERATOR_LOG' /********************************************************/ /* TPT EXPORT Operator attributes */ /********************************************************/ ,ExportPrivateLogName = 'EXPORT_OPERATOR_LOG' ,SourceTdpId = '<Teradata Database ID>' ,SourceUserName = '<SourceUserName>' ,SourceUserPassword = '<SourceUserPassword>' /********************************************************/ /* TPT DataConnector Consumer Operator */ /********************************************************/ ,FileWriterFormat = 'Formatted' ,FileWriterPrivateLogName = 'FILE_WRITER_LOG' ,FileWriterOpenMode = 'Write' /********************************************************/ /* APPLY STATEMENT parameters */ /********************************************************/ ,ExportInstances = 1 ,WriterInstances = 1
Set Up Job
DEFINE JOB CREATE_SOURCE_TABLE_IN_TERADATA
DESCRIPTION 'Create a source table in Teradata'
(
STEP CREATE_AND_POPULATE_SOURCE_TABLE
(
APPLY
('DROP TABLE KAFKAW001_src;'),
('CT KAFKAW001_src
(
Associate_Id INTEGER,
Associate_Name CHAR(25),
Salary FLOAT,
DOJ DATE,
Designation VARCHAR(25),
Loan_Amount DECIMAL(5,2),
Martial_Status CHAR(1),
No_Of_Dependents BYTEINT
);
'),
(' Ins KAFKAW001_src( 1,''Morgan Tremblay'',10.12,''99-03-20'',
''Software Engineer'',110.12,''y'',1);
Ins KAFKAW001_src( 2,''Chalmers Clayton'',11.12,''99-03-21'',
''Technical Architect'',111.12,''n'',2);
Ins KAFKAW001_src( 3,''Francis Cochran'',12.12,''99-03-22'',
''Software Engineer'',112.12,''n'',2);
Ins KAFKAW001_src( 4,''Hector Nielsen'',13.12,''99-03-23'',
''Software Engineer'',113.12,''y'',4);
Ins KAFKAW001_src( 5,''Sherlock Fisher'',14.12,''99-03-24'',
''HR Manager'',114.12,''n'',3);
Ins KAFKAW001_src( 6,''Orson Sanchez'',15.12,''99-03-25'',
''Software Engineer'',115.12,''y'',6);
Ins KAFKAW001_src( 7,''Gregory Benton'',16.12,''99-03-26'',
''Department Manager'',116.12,''y'',1);
Ins KAFKAW001_src( 8,''Bryant McCabe'',17.12,''99-03-27'',
''Software Engineer'',117.12,''y'',1);
Ins KAFKAW001_src( 9,''Delmar Halsey'',18.12,''99-03-28'',
''Facility Manager'',118.12,''n'',4);
Ins KAFKAW001_src(10,''Aldrich Jones'',19.12,''99-03-29'',
''Floor Manager'',119.12,''n'',5);
Ins KAFKAW001_src(11,''Unwin Russell'',20.12,''99-03-30'',
''Software Tester II'',120.12,''n'',6);
Ins KAFKAW001_src(12,''Gabriel Cooper'',21.12,''99-03-19'',
''Software Tester'',121.12,''n'',2);
Ins KAFKAW001_src(13,''Willis James'',22.12,''99-03-18'',
''Build Engineer'',122.12,''y'',2);
Ins KAFKAW001_src(14,''Phineas Campbell'',23.12,''99-03-17'',
''Software Engineer'',123.12,''y'',2);
Ins KAFKAW001_src(15,''Rupert Butler'',24.12,''99-03-16'',
''Design Engineer'',124.12,''n'',3);
Ins KAFKAW001_src(16,''Toby Ortiz'',25.12,''99-03-15'',
''Software Engineer'',125.12,''y'',4);
Ins KAFKAW001_src(17,''Hardy Peterson'',26.12,''99-03-14'',
''Software Engineer III'',126.12,''n'',2);
Ins KAFKAW001_src(18,''Zane Morgan'',27.12,''99-03-13'',
''Software Engineer'',127.12,''n'',3);
Ins KAFKAW001_src(19,''Igor Smith'',28.12,''99-03-12'',
''Software Engineer'',128.12,''y'',2);
Ins KAFKAW001_src(20,''Edric Davidson'',29.12,''99-03-11'',
''Software Engineer II'',129.12,''y'',4);
Ins KAFKAW001_src
Sel Associate_Id + 20, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
Ins KAFKAW001_src
Sel Associate_Id + 40, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
Ins KAFKAW001_src
Sel Associate_Id + 80, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
Ins KAFKAW001_src
Sel Associate_Id + 160, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
Ins KAFKAW001_src
Sel Associate_Id + 320, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
Ins KAFKAW001_src
Sel Associate_Id + 640, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
Ins KAFKAW001_src
Sel Associate_Id + 1280, Associate_Name, Salary, DOJ, Designation,
Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
')
TO OPERATOR ($DDL);
);
);
Export Job
The parameters inside <> brackets need to be replaced appropriately and <> brackets need to be removed.
DEFINE JOB EXPORT_FROM_TERADATA
DESCRIPTION 'Export data from Teradata to Kafka Server'
(
STEP EXPORT_THE_DATA
(
APPLY TO OPERATOR ( $FILE_WRITER()
ATTR
(
AccessModuleName = 'libkafkaaxsmod.so',
AccessModuleInitStr = '-M P -T <TopicName> -B
<BrokerID> -P 0,1,2 -S y -TL 3 -BKSZ 1200
-X compression.codec=gzip'
)
)
SELECT * FROM OPERATOR ( $EXPORT
ATTR
(
SelectStmt = 'SELECT * FROM KAFKAW001_src;'
)
);
);
);
Example of Import Scenario
Job Variable File
The parameters inside <> brackets need to be replaced appropriately and <> brackets need to be removed.
/********************************************************/ /* TPT attributes - Common for all Samples */ /********************************************************/ TargetTdpId = '<Teradata Database ID>' ,TargetUserName = '<TargetUserName>' ,TargetUserPassword = '<TargetUserPassword>' ,TargetErrorList = [ '3706','3803','3807' ] ,DDLPrivateLogName = 'DDL_OPERATOR_LOG' /********************************************************/ /* TPT LOAD Operator attributes */ /********************************************************/ ,LoadPrivateLogName = 'LOAD_OPERATOR_LOG' ,LoadTargetTable = 'KAFKAW001' ,LoadLogTable = 'KAFKAW001_log' ,LoadErrorTable1 = 'KAFKAW001_e1' ,LoadErrorTable2 = 'KAFKAW001_e2' /********************************************************/ /* TPT DataConnector Producer Operator */ /********************************************************/ ,FileReaderFormat = 'Formatted' ,FileReaderOpenMode = 'Read' /********************************************************/ /* APPLY STATEMENT parameters */ /********************************************************/ ,LoadInstances = 1 ,ReaderInstances = 1
DEFINE JOB IMPORT_TO_TERADATA_SETUP
DESCRIPTION 'Import data to Teradata from Kafka Server'
(
STEP CLEANUP_CREATE_TABLE_STEP
(
APPLY
(' DROP TABLE KAFKAW001 '),
(' DROP TABLE KAFKAW001_e1 '),
(' DROP TABLE KAFKAW001_e2 '),
(' DROP TABLE KAFKAW001_log '),
('CT KAFKAW001
(
Associate_Id INTEGER,
Associate_Name CHAR(25),
Salary FLOAT,
DOJ DATE,
Designation VARCHAR(25),
Loan_Amount DECIMAL(5,2),
Martial_Status CHAR(1),
No_Of_Dependents BYTEINT
);
')
TO OPERATOR ($DDL);
);
);
Import Job
The parameters inside <> brackets need to be replaced appropriately and <> brackets need to be removed.
DEFINE JOB IMPORT_TO_TERADATA
DESCRIPTION 'Import data to Teradata from Kafka Server'
(
STEP IMPORT_THE_DATA
(
APPLY $INSERT TO OPERATOR ($LOAD)
SELECT * FROM OPERATOR ($FILE_READER ()
ATTR
(
PrivateLogName = 'KAFKAW001P2_1',
AccessModuleName = 'libkafkaaxsmod.so',
AccessModuleInitStr = '-M C -T <TopicName> -B
<BrokerID> -P 0 -W 5 -S y -TL 3 -BKSZ 1200
-X compression.codec=gzip -X group.id=testjob'
)
)
UNION ALL
SELECT * FROM OPERATOR ($FILE_READER ()
ATTR
(
PrivateLogName = 'KAFKAW001P2_2',
AccessModuleName = 'libkafkaaxsmod.so',
AccessModuleInitStr = '-M C -T <TopicName> -B
<BrokerID> -P 1 -W 5 -S y -TL 3 -BKSZ 1200 -X
compression.codec=gzip -X group.id=testjob'
)
)
UNION ALL
SELECT * FROM OPERATOR ($FILE_READER ()
ATTR
(
PrivateLogName = 'KAFKAW001P2_3',
AccessModuleName = 'libkafkaaxsmod.so',
AccessModuleInitStr = '-M C -T <TopicName> -B
<BrokerID> -P 2 -W 5 -S y -TL 3 -BKSZ 1200 -X
compression.codec=gzip -X group.id=testjob'
)
);
);
);