Code Example - Parallel Transporter

Teradata® Parallel Transporter Application Programming Interface Programmer Guide - 17.20

Product
Parallel Transporter
Release Number
17.20
Published
June 2022
Language
English (United States)
Last Update
2022-10-11
dita:mapPath
fag1645201363032.ditamap
dita:ditavalPath
obe1474387269547.ditaval
dita:id
B035-2516
Product Category
Teradata Tools and Utilities
#include <cstring>

#include "connection.h"
#include "schema.h"
#include "DMLGroup.h"

using namespace teradata::client::API;

// Status of the most recent driver call; the rest of the example compares it
// against TD_Error to decide whether a step succeeded.
int returnValue = 0;
// Filled in by Connection::GetErrorInfo() on the error paths; starts as NULL
// and is tested for NULL before being printed.
char* errorMsg = NULL;
// Filled in by Connection::GetErrorInfo() together with errorMsg; only
// printed when errorMsg is non-NULL.
TD_ErrorType errorType;

cout << "*** Stream Driver Example ***" << endl;

// Connection object through which every driver call in this example is made.
// It is allocated with new and released with delete at the end of the example.
Connection *conn = new Connection();
/**********************************************
* Set Operator Type and Trace/Log Levels
**********************************************/
// Select the Stream operator for this connection.
conn->AddAttribute(TD_SYSTEM_OPERATOR,TD_STREAM);
// Tracing is left disabled; the two commented-out calls show how a trace
// output file and a trace level would be supplied.
//conn->AddAttribute(TD_TRACE_OUTPUT,"stream.txt");
//conn->AddArrayAttribute(TD_TRACE_LEVEL,2,TD_OPER_ALL,TD_OFF,NULL);
/**********************************************
* Add Attributes
**********************************************/
// The logon values are placeholders (presumably to be replaced with real
// credentials and the TDP id of the target system).
conn->AddAttribute(TD_USER_NAME,"user");
conn->AddAttribute(TD_USER_PASSWORD,"password");
conn->AddAttribute(TD_TDP_ID,"database");
// Log, error and target tables; the target table name is the one the
// example's INSERT statement loads into.
conn->AddAttribute(TD_LOG_TABLE,"tdload_log");
conn->AddAttribute(TD_ERROR_TABLE,"tdload_e1");
conn->AddAttribute(TD_TARGET_TABLE,"tdload");
// Session limits and tenacity settings (units are defined by the API, not
// visible here).
conn->AddAttribute(TD_MAX_SESSIONS,4);
conn->AddAttribute(TD_MIN_SESSIONS,1);
conn->AddAttribute(TD_TENACITY_HOURS,2);
conn->AddAttribute(TD_TENACITY_SLEEP,3);
conn->AddAttribute(TD_ROBUST,"No");
/**********************************************
* Add Schema
*
* Declares the eight columns of the input record, one AddColumn() call per
* column (name, data type, size), then attaches the schema to the
* connection. The column names are referenced again by the DML group.
**********************************************/

Schema * schema = new Schema("input");
schema->AddColumn("Associate_Id",TD_INTEGER,4);
schema->AddColumn("Associate_Name",TD_CHAR,25);
schema->AddColumn("Salary",TD_FLOAT,8);
schema->AddColumn("DOJ",TD_DATE,4);
schema->AddColumn("Designation",TD_VARCHAR,25);
// The two extra arguments (5,2) are presumably precision and scale -- confirm
// against the API reference.
schema->AddColumn("Loan_Amount",TD_DECIMAL,4,5,2);
// NOTE(review): "Martial" looks like a misspelling of "Marital". The same
// spelling is used in the DML statement and its column lists, so any rename
// has to be applied to all of them together.
schema->AddColumn("Martial_Status",TD_CHAR,1);
schema->AddColumn("No_Of_Dependents",TD_BYTEINT,1);
conn->AddSchema(schema);
/**********************************************
* Add DMLGroups
*
* Builds one DML group holding a single INSERT into tdload whose
* :placeholders carry the schema's column names, and registers it with the
* connection.
**********************************************/
// Passed by address to AddDMLGroup(), which presumably stores the index
// assigned to the group; it is not read again in this example.
TD_Index dmlGroupIndex = 0;

DMLGroup * dmlGr = new DMLGroup();
dmlGr->AddStatement("INSERT INTO tdload( :Associate_Id, :Associate_Name, :Salary, :DOJ, :Designation, :Loan_Amount, :Martial_Status, :No_Of_Dependents);");
// Both calls take a column count (8) followed by that many column names and
// a terminating NULL.
dmlGr->AddSerializeOn(8, "Associate_Id", "Associate_Name", "Salary", "DOJ", "Designation", "Loan_Amount", "Martial_Status", "No_Of_Dependents", NULL);
dmlGr->AddUseList(8, "Associate_Id", "Associate_Name", "Salary", "DOJ", "Designation", "Loan_Amount", "Martial_Status", "No_Of_Dependents", NULL);
// NOTE(review): this status is overwritten by the Initiate() call that
// follows without being inspected, so a failure here is not reported.
returnValue = conn->AddDMLGroup(dmlGr,&dmlGroupIndex);
/**********************************************
* Initiate
*
* Starts the driver with the attributes, schema and DML group configured
* above. The resulting status is printed here and then tested against
* TD_Error to decide whether rows are sent.
**********************************************/
returnValue = conn->Initiate();

cout << "Driver Initiated with status " << returnValue << endl;

/**********************************************
* Acquisition
*
* Entered only when Initiate() reported success. Rows are read with the
* user-supplied GetRowData() and handed to the driver with PutRow(); when
* the input is exhausted EndAcquisition() is called. Each failure is
* reported directly beside the call that produced it:
*   - PutRow failure         -> "Error occured during Acquisition"
*   - EndAcquisition failure -> "Error occured during EndAcquisition"
*   - Initiate failure       -> "Error occured during Initiate"
**********************************************/
if ( returnValue < TD_Error )
{
	char rowBuffer[78]; /* must include the EOL byte */
	int loadStatus = 0;
	TD_Length bytes;
	int count = 0;		/* rows accepted by PutRow so far */

	while( loadStatus != -1 )
	{
		/* user function - reads in a row of data from a file,
			returns -1 when end of file reached */
		loadStatus = GetRowData(rowBuffer,78);

		if( loadStatus != -1 )
		{
			/* The first 2 bytes hold the row length of the indicator mode
				record. Copy them out rather than dereferencing a cast
				pointer: rowBuffer is a char array and is not guaranteed
				to be suitably aligned for an unsigned short. */
			unsigned short rowLength = 0;
			std::memcpy(&rowLength, rowBuffer, sizeof(rowLength));
			bytes = rowLength;

			/* the row data itself starts after the 2-byte length prefix */
			returnValue = conn->PutRow( rowBuffer + 2 , bytes);

			if ( returnValue >= TD_Error )
			{
				cout << "PutRow failed on row " << count+1;
				cout << " with status " << returnValue << endl;
				/* user function - close file; its return value ends the
					loop when it is -1 */
				loadStatus = GetRowData(NULL,0);
			}else{
				count++;
			}
		}
	}

	if ( returnValue < TD_Error )
	{
		cout << "Sent " << count << " rows" << endl;
		/**********************************************
		* End Acquisition
		**********************************************/

		returnValue = conn->EndAcquisition();

		cout << "Acquisition completed with status " << returnValue << endl;

		if ( returnValue >= TD_Error )
		{
			//Get Error Information
			cout << "Error occured during EndAcquisition" << endl;
			conn->GetErrorInfo(&errorMsg,&errorType);
			if ( errorMsg != NULL ){
				cout << errorMsg << endl;
				cout << "Type: " << errorType << endl;
			}else{
				cout << "No Error Info Available" << endl;
			}
		}

	}else{
		//Get Error Information (a PutRow call failed)
		cout << "Error occured during Acquisition" << endl;
		conn->GetErrorInfo(&errorMsg,&errorType);
		if ( errorMsg != NULL ){
			cout << errorMsg << endl;
			cout << "Type: " << errorType << endl;
		}else{
			cout << "No Error Info Available" << endl;
		}
	}

}else{
	//Get Error Information
	cout << "Error occured during Initiate" << endl;
	conn->GetErrorInfo(&errorMsg,&errorType);
	if ( errorMsg != NULL ){
		cout << errorMsg << endl;
		cout << "Type: " << errorType << endl;
	}else{
		cout << "No Error Info Available" << endl;
	}
}
/**********************************************
* Terminate
*
* Shuts the driver down and prints the resulting status. When that status
* is at or above TD_Error, the driver's error text and type are printed if
* any are available.
**********************************************/

returnValue = conn->Terminate();

cout << "Driver Terminated with status " << returnValue << endl;

if ( TD_Error <= returnValue )
{
	// Terminate failed: ask the driver for whatever detail it can provide.
	cout << "Error occured during Terminate" << endl;
	conn->GetErrorInfo(&errorMsg,&errorType);
	if ( errorMsg == NULL ){
		cout << "No Error Info Available" << endl;
	}else{
		cout << errorMsg << endl << "Type: " << errorType << endl;
	}
}
/**********************************************
* Clean Up
*
* Releases the three objects this example allocated with new: the DML
* group, the schema and the connection, in that order.
**********************************************/

cout << "Deleting Objects" << endl;
delete dmlGr;
delete schema;
delete conn;

cout << "*** Stream Complete ***" << endl;