/***********************************************************************
* Sample Export_To_Load Program for Returning the Actual Data Type
***********************************************************************
* Copyright 2006-2009, 2011 by Teradata Corporation.
* All Rights Reserved.
* TERADATA CONFIDENTIAL AND TRADE SECRET
*
* This copyrighted material is the Confidential, Unpublished
* Property of the Teradata Corporation. This copyright notice and
* any other copyright notices included in machine readable
* copies must be reproduced on all authorized copies.
*
***********************************************************************/
#include "GenericDriver.h"
#include "OptionsManager.h"
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "string.h"
#include <vector>
#define MAX 10
using namespace teradata::client::API;
/* Creates a driver that reads its settings from `om`. Both file names
 * and both file streams start out unset (NULL). */
GenericDriver::GenericDriver(OptionsManager * om)
{
    m_om = om;
    inputFileName = outputFileName = NULL;
    inputStream = outputStream = NULL;
}
/* Releases the heap copies of the input and output file names. */
GenericDriver::~GenericDriver(void)
{
    /* free() is a no-op on NULL, so unset names need no guard. */
    free(inputFileName);
    free(outputFileName);
}
/***************************************************/
/* testLoad */
/* */
/* Tests transferring data from Export's GetBuffer */
/* to Load's PutBuffer. Uses the following steps: */
/* */
/* 1. Setup and Initiate Load driver */
/* 2. Query buffer layout from Load */
/* 3. Pass Load buffer layout to Export driver */
/* 4. Initiate Export driver */
/* 5. Transfer buffers from GetBuffer to PutBuffer */
/* 6. Call Load's EndAcquisition & ApplyRows */
/* 7. Terminate Load and Export */
/***************************************************/
void GenericDriver::testExportToLoad()
{
time_t now;
time(&now);
int ldStatus = 0;
int expStatus = 0;
int queryStatus = 0;
char *errorMsg = NULL;
TD_ErrorType errorType;
RowCounts ldCount; /* RowCounts struct defined in GetBuffer.h */
char *dataptr;
TD_Length datalen;
char *msg = (char*)malloc(256);
/**********************************************
* Retrieve Options From Option File
**********************************************/
int numSessions = m_om -> getNumSessions();
char *tdp_id = strdup(m_om -> getTdpId().c_str());
char *user_name = strdup(m_om -> getUserName().c_str());
char *user_password = strdup(m_om -> getUserPassword().c_str());
char *target_table = strdup(m_om -> getTargetTable().c_str());
char *target_table_export = strdup(m_om -> getTargetTableExport().c_str());
char *log_table = strdup(m_om -> getLogTable().c_str());
char *error_table_1 = strdup(m_om -> getErrorTable1().c_str());
char *error_table_2 = strdup(m_om -> getErrorTable2().c_str());
char *ins_statement = (char*)malloc(1024);
char *sel_statement = (char*)malloc(1024);
sprintf(ins_statement, "INSERT INTO %s;",target_table);
//sprintf(ins_statement, "INSERT INTO %s( :col1, :col2);",target_table);
sprintf(sel_statement, "SELECT * FROM %s;", target_table_export);
cout << "######################### OUTPUT START #########################" \
<< endl;
cout << "\nTest Description :GenericDriver sample" << endl;
cout << "Date :" << ctime(&now);
Connection *ldConn = new Connection(); /* Load */
Connection *expConn = new Connection(); /* Export */
/**********************************************
* Set Operator Type and Trace/Log Levels
**********************************************/
ldConn -> AddAttribute(TD_SYSTEM_OPERATOR, TD_LOAD);
expConn -> AddAttribute(TD_SYSTEM_OPERATOR, TD_EXPORT);
expConn -> AddAttribute(TD_TRACE_OUTPUT, "exportL.txt");
expConn -> AddArrayAttribute(TD_TRACE_LEVEL, 2, TD_OPER_ALL,\
TD_GENERAL, NULL);
ldConn -> AddAttribute(TD_TRACE_OUTPUT, "load.txt");
ldConn -> AddArrayAttribute(TD_TRACE_LEVEL, 2, TD_OPER_ALL,\
TD_GENERAL, NULL);
/**********************************************
* Add Attributes
**********************************************/
ldConn -> AddAttribute(TD_TDP_ID, tdp_id);
ldConn -> AddAttribute(TD_USER_NAME, user_name);
ldConn -> AddAttribute(TD_USER_PASSWORD, user_password);
ldConn -> AddAttribute(TD_TARGET_TABLE, target_table);
ldConn -> AddAttribute(TD_LOG_TABLE, log_table);
ldConn -> AddAttribute(TD_ERROR_TABLE_1, error_table_1);
ldConn -> AddAttribute(TD_ERROR_TABLE_2, error_table_2);
ldConn -> AddAttribute(TD_MAX_SESSIONS, numSessions);
ldConn -> AddAttribute(TD_MIN_SESSIONS, numSessions);
ldConn -> AddAttribute(TD_WILDCARDINSERT, "Yes");
//ldConn -> AddAttribute(TD_CHARSET, "UTF8");
//ldConn -> AddAttribute(TD_CHARSET, "UTF16");
expConn -> AddAttribute(TD_TDP_ID, tdp_id);
expConn -> AddAttribute(TD_USER_NAME, user_name);
expConn -> AddAttribute(TD_USER_PASSWORD, user_password);
expConn -> AddAttribute(TD_SELECT_STMT, sel_statement);
expConn -> AddAttribute(TD_MAX_SESSIONS, numSessions);
expConn -> AddAttribute(TD_MIN_SESSIONS, numSessions);
expConn -> AddAttribute(TD_RETN_ACT_DATATYPE, "FOO");
//expConn -> AddAttribute(TD_CHARSET, "UTF8");
//expConn -> AddAttribute(TD_CHARSET, "UTF16");
/**********************************************
* Add Schema
**********************************************/
Schema* schema = NULL;
cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
cout << " EXPORT OPERATOR CONNECTION INFO " << endl;
cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
cout << "Operator type :EXPORT" << endl;
cout << "TDP ID :" << tdp_id << endl;
cout << "TDP username :" << user_name << endl;
cout << "TDP password :<not displayable>" << endl;
cout << "TargetTable :" << target_table_export << endl;
cout << "DML Statement :" << sel_statement << endl;
/**********************************************
* Initiate Export Driver
**********************************************/
cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
cout << " INITIATE PHASE " << endl;
cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
expStatus = expConn -> Initiate();
if ( expStatus >= TD_Error )
{
//Get Error Information
cout << "Error occured during Export Initiate" << endl;
expConn -> GetErrorInfo(&errorMsg, &errorType);
if ( errorMsg != NULL )
{
cout << errorMsg << endl;
cout << "Type: " << errorType << endl;
}
else
{
cout << "No Error Info Available" << endl;
}
}
else
{
cout << "Export Driver initiated Successfully" << endl;
/**********************************************
* Add Schema for load
**********************************************/
cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
cout << " COLUMN INFO " << endl;
cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
Schema* dynamicSchema;
int returnValue = expConn -> GetSchema(&dynamicSchema);
ldConn -> AddSchema(dynamicSchema);
if ( returnValue < TD_Error && returnValue != TD_Unavailable )
{
int Count = dynamicSchema -> GetColumnCount();
vector<Column*> column(MAX);
cout << "Column Count:" << Count << endl;
for (vector<Column*>::size_type i=0; i < Count; i++)
{
column[i] = dynamicSchema -> GetColumn(i);
char * Name = column[i] -> GetName();
TD_DataType dataType = (TD_DataType)column[i] -> GetDataType();
int precision = column[i] -> GetPrecision();
int scale = column[i] -> GetScale();
cout << endl;
cout << "Column:" << i+1 << " Info" <<endl;
cout << "Name:" << Name << endl;
cout << "DataType:" << dataType << endl;
cout << "Precision:" << precision << endl;
cout << "scale:" << scale << endl;
}
}
else
{
printf(" GetSchema failed with status != %d", returnValue);
}
/**********************************************
* Query Version Number
**********************************************/
queryStatus = expConn -> GetEvent(TD_Evt_Version, &dataptr, &datalen);
cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
cout << " VERSION INFO " << endl;
cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
if ( queryStatus < TD_Error && queryStatus != TD_Unavailable )
{
cout << "TPTAPI Version :";
cout << *((char**)dataptr) << endl;
cout << "Operator Version:";
cout << *((char**)(dataptr+sizeof(char**))) << endl;
}
else
{
printStatus("TD_Evt_Version event failed with status ", queryStatus);
}
/**********************************************
* Add DMLGroups
**********************************************/
TD_Index dmlGroupIndex = 0;
DMLGroup *dmlGr = new DMLGroup();
dmlGr -> AddStatement(ins_statement);
ldConn -> AddDMLGroup(dmlGr, &dmlGroupIndex);
cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
cout << " LOAD OPERATOR CONNECTION INFO " << endl;
cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
cout << "Operator type :LOAD" << endl;
cout << "TDP ID :" << tdp_id << endl;
cout << "TDP username :" << user_name << endl;
cout << "TDP password :<not displayable>" << endl;
cout << "Target Table :" << target_table << endl;
cout << "DML Statement :" << ins_statement << endl;
/**********************************************
* Initiate Load Driver
**********************************************/
cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
cout << " INITIATE PHASE " << endl;
cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
ldStatus = ldConn -> Initiate();
if ( ldStatus < TD_Error )
{
cout << "Load Driver initiated Successfully" << endl;
/**********************************************
* Query Version Number
**********************************************/
queryStatus = ldConn -> GetEvent(TD_Evt_Version, &dataptr, &datalen);
cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
cout << " VERSION INFO " << endl;
cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
if ( queryStatus < TD_Error && queryStatus != TD_Unavailable )
{
cout << "TPTAPI Version :";
cout << *((char**)dataptr) << endl;
cout << "Operator Version:";
cout << *((char**)(dataptr+sizeof(char**))) << endl;
}
else
{
printStatus("TD_Evt_Version event failed with status ", queryStatus);
}
/**********************************************
* Acquisition
**********************************************/
cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
cout << " ACQUISITION PHASE " << endl;
cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
char* dataBuffer;
TD_Length dataLen = 0;
int count = 0;
int transferDone = 0;
while(!transferDone)
{
/* Get a buffer from Export */
expStatus = expConn -> GetRow(&dataBuffer, &dataLen);
if ( expStatus >= TD_Error )
{
sprintf(msg,"GetBuffer failed on buffer %d with status ", count+1);
printStatus(msg, expStatus);
transferDone = 1;
}
else if ( expStatus == TD_END_Method )
{
cout << "End of Data Reached" << endl;
transferDone = 1;
}
else
{
/* Pass Buffer to Load */
ldStatus = ldConn -> PutRow(dataBuffer, dataLen);
if ( ldStatus != TD_Success )
{
sprintf(msg,"PutBuffer failed on buffer %d with status ", count+1);
printStatus(msg, ldStatus);
transferDone = 1;
}
else
{
count++;
/* Get Load Row Counts */
}
}
}
if ( ldStatus < TD_Error && expStatus < TD_Error )
{
/**********************************************
* End Acquisition
**********************************************/
ldStatus = ldConn -> EndAcquisition();
if ( ldStatus < TD_Error )
{
cout << "Acquisition completed Successfully" << endl;
/* Get Load Row Counts */
queryStatus = ldConn -> GetEvent(TD_Evt_RowCounts, &dataptr, \
&datalen);
if ( queryStatus < TD_Error && queryStatus != TD_Unavailable )
{
ldCount = *((RowCounts*)dataptr);
cout << "Rows Received :";
cout << ldCount.RowsReceived << endl;
cout << "Rows Sent :";
cout << ldCount.RowsSent << endl;
cout << "Rows Applied :";
cout << ldCount.RowsApplied << endl;
}
else
{
cout << "TD_Evt_RowCounts Info Not Available" << \
queryStatus << endl;
}
/**********************************************
* Application
**********************************************/
cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
cout << " APPLICATION PHASE " << endl;
cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
ldStatus = ldConn -> ApplyRows();
if ( ldStatus < TD_Error )
{
cout << "Rows Applied Successfully" << endl;
cout << "Transfer completed successfully" << endl;
cout << "Transferred " << count << " buffers" << endl;
/* Get Export Row Count */
queryStatus = expConn -> GetEvent(TD_Evt_ExportCount,\
&dataptr, &datalen);
if ( queryStatus < TD_Error && queryStatus != TD_Unavailable )
{
cout << "TD_Evt_ExportCount:";
cout << *((int*)dataptr) << endl;
}
else
{
printStatus("TD_Evt_ExportCount event failed with status", \
queryStatus);
}
/* Get Load Apply Count */
queryStatus = ldConn -> GetEvent(TD_Evt_ApplyCount, &dataptr, \
&datalen);
if ( queryStatus < TD_Error && queryStatus != TD_Unavailable )
{
cout << "TD_Evt_ApplyCount :";
cout << *((int*)dataptr) << endl;
}
else
{
printStatus("TD_Evt_ApplyCount event failed with status", \
queryStatus);
}
}
else
{
//Get Error Information
cout << "Error occured during Application" << endl;
ldConn -> GetErrorInfo(&errorMsg, &errorType);
if ( errorMsg != NULL )
{
cout << errorMsg << endl;
cout << "Type: " << errorType << endl;
}
else
{
cout << "No Error Info Available" << endl;
}
}
}
else
{
//Get Error Information
cout << "Error occured during EndAcquisition" << endl;
ldConn -> GetErrorInfo(&errorMsg, &errorType);
if ( errorMsg != NULL )
{
cout << errorMsg << endl;
cout << "Type: " << errorType << endl;
}
else
{
cout << "No Error Info Available" << endl;
}
}
}
else
{
//Get Error Information
if ( ldStatus >= TD_Error )
{
cout << "Load Error occured during Acquisition" << endl;
ldConn -> GetErrorInfo(&errorMsg, &errorType);
if ( errorMsg != NULL )
{
cout << errorMsg << endl;
cout << "Type: " << errorType << endl;
}
else
{
cout << "No Error Info Available" << endl;
}
}
if ( expStatus >= TD_Error )
{
cout << "Export Error occured during Acquisition" << endl;
expConn -> GetErrorInfo(&errorMsg, &errorType);
if ( errorMsg != NULL )
{
cout << errorMsg << endl;
cout << "Type: " << errorType << endl;
}
else
{
cout << "No Error Info Available" << endl;
}
}
}
}
else
{
//Get Error Information
cout << "Error occured during Load Initiate" << endl;
ldConn -> GetErrorInfo(&errorMsg, &errorType);
if ( errorMsg != NULL )
{
cout << errorMsg << endl;
cout << "Type: " << errorType << endl;
}
else
{
cout << "No Error Info Available" << endl;
}
}
/* Terminate Drivers
**********************************************/
cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
cout << " TERMINATE PHASE " << endl;
cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
ldStatus = ldConn -> Terminate();
expStatus = expConn -> Terminate();
if ( ldStatus >= TD_Error || expStatus >= TD_Error )
{
/* Get Error Information */
if ( ldStatus >= TD_Error )
{
cout << "Error occured during Load Terminate" << endl;
ldConn -> GetErrorInfo(&errorMsg, &errorType);
if ( errorMsg != NULL )
{
cout << errorMsg << endl;
cout << "Type: " << errorType << endl;
}
else
{
cout << "No Error Info Available" << endl;
}
}
else
{
cout << "Load Terminated Successfully" << endl;
}
if ( expStatus >= TD_Error )
{
cout << "Error occured during Export Terminate" << endl;
expConn -> GetErrorInfo(&errorMsg, &errorType);
if ( errorMsg != NULL )
{
cout << errorMsg << endl;
cout << "Type: " << errorType << endl;
}
else
{
cout << "No Error Info Available" << endl;
}
}
else
{
cout << "Export Terminated Successfully" << endl;
}
}
/**********************************************
* Clean Up
**********************************************/
cout << "Deleting objects" << endl;
delete dmlGr;
delete dynamicSchema;
delete ldConn;
delete expConn;
free(user_name);
free(user_password);
free(target_table);
free(target_table_export);
free(tdp_id);
free(log_table);
free(error_table_1);
free(error_table_2);
free(ins_statement);
free(sel_statement);
free(msg);
cout << "*** Export to Load Complete ***" << endl;
cout <<"\n######################## OUTPUT END ####################" << endl;
}
}
/*************************** Utility Functions *******************************/
/***********************************************
* Function prints string version of status code
************************************************/
void GenericDriver::printStatus(char* msg, int code)
{
cout << msg;
switch(code)
{
case TD_Success:
cout << "TD_Success";
break;
case TD_SYNC_Barrier:
cout << "TD_SYNC_Barrier";
break;
case TD_SYNC_TELINFO:
cout << "TD_SYNC_TELINFO";
break;
case TD_END_Method:
cout << "TD_END_Method";
break;
case TD_Unavailable:
cout << "TD_Unavailable";
break;
case TD_Error:
cout << "Error Code: TD_Error";
break;
default:
cout << "Error Code: " << code;
break;
}
cout << endl;
}
/***************************************
* Function sets input file name
*
* Stores a private heap copy of `fname`
* and releases the previous copy. A NULL
* `fname` clears the stored name.
***************************************/
void GenericDriver::setInput(char* fname)
{
    /* Duplicate before releasing the old copy so that passing the
     * currently stored pointer back in stays valid. */
    char *replacement = ( fname != NULL ) ? strdup(fname) : NULL;
    /* The name comes from strdup(), so it must be released with free(). */
    free(inputFileName);
    inputFileName = replacement;
}
/***************************************
* Function sets output file name
*
* Stores a private heap copy of `fname`
* and releases the previous copy. A NULL
* `fname` clears the stored name.
***************************************/
void GenericDriver::setOutput(char* fname)
{
    /* Duplicate before releasing the old copy so that passing the
     * currently stored pointer back in stays valid. */
    char *replacement = ( fname != NULL ) ? strdup(fname) : NULL;
    /* The name comes from strdup(), so it must be released with free(). */
    free(outputFileName);
    outputFileName = replacement;
}
/***************************************
* Function reads in a row of data
***************************************/
int GenericDriver::GetRowData(char* rowBuffer, size_t rowLength)
{
unsigned short rlength;
//Close File if End Signal Received
if ( rowLength <= 0 )
{
if ( inputStream != NULL )
{
fclose(inputStream);
}
return -1;
}
//Open File if not already open
if( inputStream == NULL )
{
inputStream = fopen(inputFileName, "rb");
if( inputStream == NULL )
{
cout << "Error opening file " << inputFileName << endl;
return -1;
}
}
//Read the lenght of each record..read first two bytes
size_t recLen =fread(rowBuffer, 1, (size_t)2, inputStream);
if ( recLen < 1 )
{
if ( feof(inputStream) )
{
cout << "End of file reached" << endl;
if( inputStream != NULL )
{
fclose(inputStream);
}
return -1;
}
else
{
/* return some read error here and exit */
cout << "read error" << endl;
if( inputStream != NULL )
{
fclose(inputStream);
}
return -1;
}
}
//read row of data with size rlength
memcpy(&rlength, rowBuffer, sizeof(unsigned short)); /* DR124305 */
size_t recoLen = fread(rowBuffer+2,1,(size_t)rlength+1,inputStream);
return 0;
}
/*****************************************************
* Function writes out a row of data in indicator mode
******************************************************/
int GenericDriver::WriteRowData(char* data, TD_Length dataLen)
{
if( data != NULL )
{
//Open File if not already open
if( outputStream == NULL )
{
outputStream = fopen(outputFileName, "wb");
if( outputStream == NULL )
{
cout << "Error opening file " << outputFileName << endl;
return -1;
}
}
unsigned long i = 0;
unsigned short shortdataLen = (unsigned short)dataLen;
char* rowBuffer = (char*)malloc(dataLen+20);
memcpy(rowBuffer, &shortdataLen, 2);
for(i = 0; i < dataLen; i++)
{
rowBuffer[i+2] = data[i];
}
rowBuffer[i+2] = 0x0a;
//fprintf (stream, "%s\n",rowBuffer+1); // account for indicator byte
fwrite(rowBuffer, dataLen+3, 1, outputStream);
free(rowBuffer);
}
else
{
if( outputStream != NULL )
{
fclose(outputStream);
}
return -1;
}
return 0;
}
/*****************************************************
* Function writes out a buffer of data in indicator mode
******************************************************/
int GenericDriver::WriteBufferData(char* data, TD_Length dataLen)
{
if( data != NULL )
{
//Open File if not already open
if( outputStream == NULL )
{
outputStream = fopen(outputFileName, "wb");
if( outputStream == NULL )
{
cout << "Error opening file " << outputFileName << endl;
return -1;
}
}
fwrite(data, dataLen, 1, outputStream);
}
else
{
{
fclose(outputStream);
}
return -1;
}
return 0;
}