16.10 - Scalability and Consumer Groups - Access Module

Teradata Tools and Utilities Access Module Reference

prodname: Access Module
vrm_release: 16.10
created_date: July 2017
category: Programming Reference
featnum: B035-2425-077K

Scalability can be achieved by using the UNION ALL feature of the Teradata Parallel Transporter.

The following examples use the Teradata Parallel Transporter Export operator to write data to the Kafka Server, and the Teradata Parallel Transporter Load operator to read the data back into the Teradata Database. The Load operator invokes three copies of the Teradata Parallel Transporter DataConnector operator, and each copy attaches its own instance of the Teradata Access Module for Kafka. Each instance of the access module consumes messages from a particular partition, as follows:
  • Copy 1 of Teradata Access Module for Kafka will consume from "Partition: 0".
  • Copy 2 of Teradata Access Module for Kafka will consume from "Partition: 1".
  • Copy 3 of Teradata Access Module for Kafka will consume from "Partition: 2".

All copies of the Teradata Access Module for Kafka can be declared as a single consumer group to the Kafka Server by specifying the -X group.id=testjob option in the initialization string, as shown in the example import scenario.
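As a rough illustration (plain Python, not the Kafka client API), the effect of declaring the three access-module copies under one group.id is that the group coordinator assigns each member a disjoint subset of the topic's partitions, so no message is consumed twice:

```python
# Minimal sketch of how a three-member consumer group ends up with one
# partition each when the topic has three partitions (0, 1, 2).
# This only simulates an even round-robin assignment; the real assignment
# is performed by the Kafka broker/group coordinator, not by client code.

def assign_partitions(members, partitions):
    """Distribute partitions across group members as evenly as possible."""
    assignment = {m: [] for m in members}
    for i, p in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(p)
    return assignment

# Three DataConnector/access-module copies joined as group.id=testjob
members = ["copy1", "copy2", "copy3"]
assignment = assign_partitions(members, [0, 1, 2])
print(assignment)  # → {'copy1': [0], 'copy2': [1], 'copy3': [2]}
```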

Example of Export Scenario

Job Variable File

Replace the parameters inside the <> brackets with appropriate values, and remove the <> brackets.

/********************************************************/
/* TPT attributes - Common for all Samples              */
/********************************************************/
TargetTdpId = '<Teradata Database ID>'
,TargetUserName = '<TargetUserName>'
,TargetUserPassword = '<TargetUserPassword>'
,TargetErrorList = [ '3706','3803','3807' ]
,DDLPrivateLogName = 'DDL_OPERATOR_LOG'

/********************************************************/
/* TPT EXPORT Operator attributes                       */
/********************************************************/

,ExportPrivateLogName = 'EXPORT_OPERATOR_LOG'
,SourceTdpId = '<Teradata Database ID>'
,SourceUserName = '<SourceUserName>'
,SourceUserPassword = '<SourceUserPassword>'

/********************************************************/
/* TPT DataConnector Consumer Operator                  */
/********************************************************/

,DCCFormat = 'Formatted'
,DCCPrivateLogName = 'FILE_WRITER_LOG'
,DCCFileName = 'KAFKAW001DT'
,DCCDirectoryPath = '.'
,DCCOpenMode = 'Write'

/********************************************************/
/* APPLY STATEMENT parameters                           */
/********************************************************/
,ExportInstances = 1
,WriterInstances = 1

Set Up Job

DEFINE JOB CREATE_SOURCE_TABLE_IN_TERADATA
DESCRIPTION 'Create a source table in Teradata'
(
    STEP CREATE_AND_POPULATE_SOURCE_TABLE
    (
        APPLY
        ('DROP TABLE KAFKAW001_src;'),
        ('CT KAFKAW001_src
          (
              Associate_Id INTEGER,
              Associate_Name CHAR(25),
              Salary FLOAT,
              DOJ DATE,
              Designation VARCHAR(25),
              Loan_Amount DECIMAL(5,2),
              Martial_Status CHAR(1),
              No_Of_Dependents BYTEINT
          );
        '),
        (' Ins KAFKAW001_src( 1,''Morgan Tremblay'',10.12,''99-03-20'',
                              ''Software Engineer'',110.12,''y'',1);
           Ins KAFKAW001_src( 2,''Chalmers Clayton'',11.12,''99-03-21'',
                              ''Technical Architect'',111.12,''n'',2);
           Ins KAFKAW001_src( 3,''Francis Cochran'',12.12,''99-03-22'',
                              ''Software Engineer'',112.12,''n'',2);
           Ins KAFKAW001_src( 4,''Hector Nielsen'',13.12,''99-03-23'',
                              ''Software Engineer'',113.12,''y'',4);
           Ins KAFKAW001_src( 5,''Sherlock Fisher'',14.12,''99-03-24'',
                              ''HR Manager'',114.12,''n'',3);
           Ins KAFKAW001_src( 6,''Orson Sanchez'',15.12,''99-03-25'',
                              ''Software Engineer'',115.12,''y'',6);
           Ins KAFKAW001_src( 7,''Gregory Benton'',16.12,''99-03-26'',
                              ''Department Manager'',116.12,''y'',1);
           Ins KAFKAW001_src( 8,''Bryant McCabe'',17.12,''99-03-27'',
                              ''Software Engineer'',117.12,''y'',1);
           Ins KAFKAW001_src( 9,''Delmar Halsey'',18.12,''99-03-28'',
                              ''Facility Manager'',118.12,''n'',4);
           Ins KAFKAW001_src(10,''Aldrich Jones'',19.12,''99-03-29'',
                              ''Floor Manager'',119.12,''n'',5);
           Ins KAFKAW001_src(11,''Unwin Russell'',20.12,''99-03-30'',
                              ''Software Tester II'',120.12,''n'',6);
           Ins KAFKAW001_src(12,''Gabriel Cooper'',21.12,''99-03-19'',
                              ''Software Tester'',121.12,''n'',2);
           Ins KAFKAW001_src(13,''Willis James'',22.12,''99-03-18'',
                              ''Build Engineer'',122.12,''y'',2);
           Ins KAFKAW001_src(14,''Phineas Campbell'',23.12,''99-03-17'',
                              ''Software Engineer'',123.12,''y'',2);
           Ins KAFKAW001_src(15,''Rupert Butler'',24.12,''99-03-16'',
                              ''Design Engineer'',124.12,''n'',3);
           Ins KAFKAW001_src(16,''Toby Ortiz'',25.12,''99-03-15'',
                              ''Software Engineer'',125.12,''y'',4);
           Ins KAFKAW001_src(17,''Hardy Peterson'',26.12,''99-03-14'',
                              ''Software Engineer III'',126.12,''n'',2);
           Ins KAFKAW001_src(18,''Zane Morgan'',27.12,''99-03-13'',
                              ''Software Engineer'',127.12,''n'',3);
           Ins KAFKAW001_src(19,''Igor Smith'',28.12,''99-03-12'',
                              ''Software Engineer'',128.12,''y'',2);
           Ins KAFKAW001_src(20,''Edric Davidson'',29.12,''99-03-11'',
                              ''Software Engineer II'',129.12,''y'',4);
           Ins KAFKAW001_src
               Sel Associate_Id + 20, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
           Ins KAFKAW001_src
               Sel Associate_Id + 40, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
           Ins KAFKAW001_src
               Sel Associate_Id + 80, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
           Ins KAFKAW001_src
               Sel Associate_Id + 160, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
           Ins KAFKAW001_src
               Sel Associate_Id + 320, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
           Ins KAFKAW001_src
               Sel Associate_Id + 640, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
           Ins KAFKAW001_src
               Sel Associate_Id + 1280, Associate_Name, Salary, DOJ, Designation,
               Loan_Amount, Martial_Status, No_Of_Dependents from KAFKAW001_src;
        ')
         TO OPERATOR ($DDL);
    );

);
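The setup job seeds KAFKAW001_src with 20 rows, then doubles the table seven times with INSERT ... SELECT statements that shift Associate_Id by 20, 40, 80, 160, 320, 640, and 1280. A quick Python check of the resulting row count:

```python
# The setup job inserts 20 seed rows, then each INSERT ... SELECT copies the
# whole table back into itself with Associate_Id shifted, doubling the count.
rows = 20
for offset in (20, 40, 80, 160, 320, 640, 1280):
    # each offset equals the current row count, so Associate_Id stays
    # contiguous: 1..rows before the insert, 1..2*rows after it
    assert offset == rows
    rows *= 2
print(rows)  # → 2560 rows in KAFKAW001_src
```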

Export Job

Replace the parameters inside the <> brackets with appropriate values, and remove the <> brackets.

DEFINE JOB EXPORT_FROM_TERADATA
DESCRIPTION 'Export data from Teradata to Kafka Server'
(
    STEP EXPORT_THE_DATA
    (
        APPLY TO OPERATOR ( $FILE_WRITER()
            ATTR
            (
                AccessModuleName = 'libkafkaaxsmod.so',
                AccessModuleInitStr = '-M P -T <TopicName> -B
                  <BrokerID> -P 0,1,2 -S y -TL 3 -BKSZ 1200
                  -X compression.codec=gzip'
            )
        )
        SELECT * FROM OPERATOR ( $EXPORT
            ATTR
            (
                SelectStmt = 'SELECT * FROM KAFKAW001_src;'
            )
        );
    );
);

Example of Import Scenario

Job Variable File

Replace the parameters inside the <> brackets with appropriate values, and remove the <> brackets.

/********************************************************/
/* TPT attributes - Common for all Samples              */
/********************************************************/
TargetTdpId = '<Teradata Database ID>'
,TargetUserName = '<TargetUserName>'
,TargetUserPassword = '<TargetUserPassword>'
,TargetErrorList = [ '3706','3803','3807' ]
,DDLPrivateLogName = 'DDL_OPERATOR_LOG'

/********************************************************/
/* TPT LOAD Operator attributes                         */
/********************************************************/

,LoadPrivateLogName = 'LOAD_OPERATOR_LOG'
,LoadTargetTable = 'KAFKAW001'
,LoadLogTable = 'KAFKAW001_log'
,LoadErrorTable1 = 'KAFKAW001_e1'
,LoadErrorTable2 = 'KAFKAW001_e2'

/********************************************************/
/* TPT DataConnector Producer Operator                  */
/********************************************************/

,DCPFormat = 'Formatted'
,DCPFileName = 'KAFKAW001DT'
,DCPDirectoryPath = '.'
,DCPOpenMode = 'Read'

/********************************************************/
/* APPLY STATEMENT parameters                           */
/********************************************************/
,LoadInstances = 1
,ReaderInstances = 1

Set Up Job

DEFINE JOB IMPORT_TO_TERADATA_SETUP
DESCRIPTION 'Import data to Teradata from Kafka Server'
(
    STEP CLEANUP_CREATE_TABLE_STEP
    (
        APPLY
        (' DROP TABLE KAFKAW001 '),
        (' DROP TABLE KAFKAW001_e1 '),
        (' DROP TABLE KAFKAW001_e2 '),
        (' DROP TABLE KAFKAW001_log '),
        ('CT KAFKAW001
          (
              Associate_Id INTEGER,
              Associate_Name CHAR(25),
              Salary FLOAT,
              DOJ DATE,
              Designation VARCHAR(25),
              Loan_Amount DECIMAL(5,2),
              Martial_Status CHAR(1),
              No_Of_Dependents BYTEINT
          );
        ')
        TO OPERATOR ($DDL);
    );
);

Import Job

Replace the parameters inside the <> brackets with appropriate values, and remove the <> brackets.

DEFINE JOB IMPORT_TO_TERADATA
DESCRIPTION 'Import data to Teradata from Kafka Server'
(
    STEP IMPORT_THE_DATA
    (
        APPLY $INSERT TO OPERATOR ($LOAD)
        SELECT * FROM OPERATOR ($FILE_READER ()
            ATTR
            (
                PrivateLogName = 'KAFKAW001P2_1',
                AccessModuleName = 'libkafkaaxsmod.so',
                AccessModuleInitStr = '-M C -T <TopicName> -B
                  <BrokerID> -P 0 -W 5 -S y -TL 3 -BKSZ 1200
                  -X compression.codec=gzip -X group.id=testjob'
            )
        )
        UNION ALL
        SELECT * FROM OPERATOR ($FILE_READER ()
            ATTR
            (
               PrivateLogName = 'KAFKAW001P2_2',
               AccessModuleName = 'libkafkaaxsmod.so',
               AccessModuleInitStr = '-M C -T <TopicName> -B
                 <BrokerID> -P 1 -W 5 -S y -TL 3 -BKSZ 1200 -X
                 compression.codec=gzip -X group.id=testjob'
            )
        )
        UNION ALL
        SELECT * FROM OPERATOR ($FILE_READER ()
            ATTR
            (
               PrivateLogName = 'KAFKAW001P2_3',
               AccessModuleName = 'libkafkaaxsmod.so',
               AccessModuleInitStr = '-M C -T <TopicName> -B
                 <BrokerID> -P 2 -W 5 -S y -TL 3 -BKSZ 1200 -X
                 compression.codec=gzip -X group.id=testjob'
            )
        );
    );
);
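The job scripts above are launched with the Teradata Parallel Transporter tbuild command, passing the job script with -f and the job variable file with -v. A sketch, assuming the scripts and variable files are saved under the hypothetical names below; the commands are echoed so the snippet runs anywhere, so drop the echo on a system where Teradata Parallel Transporter is installed:

```shell
# Hypothetical file names: export_job.txt / import_job.txt hold the DEFINE JOB
# scripts, jobvars_export.txt / jobvars_import.txt hold the job variable files.
# Run the export job first so the Kafka topic is populated, then the import job.
echo tbuild -f export_job.txt -v jobvars_export.txt
echo tbuild -f import_job.txt -v jobvars_import.txt
```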