Sample Export_To_Load Program for Returning the Actual Data Type - Parallel Transporter

Teradata® Parallel Transporter Application Programming Interface Programmer Guide - 17.20

Product
Parallel Transporter
Release Number
17.20
Published
June 2022
Language
English (United States)
Last Update
2022-10-11
dita:mapPath
fag1645201363032.ditamap
dita:ditavalPath
obe1474387269547.ditaval
dita:id
B035-2516
Product Category
Teradata Tools and Utilities
***********************************************************************
* Copyright 2006-2009, 2011 by Teradata Corporation.
* All Rights Reserved.
* TERADATA CONFIDENTIAL AND TRADE SECRET
*
* This copyrighted material is the Confidential, Unpublished
* Property of the Teradata Corporation.  This copyright notice and
* any other copyright notices included in machine readable
* copies must be reproduced on all authorized copies.
*
***********************************************************************/

#include "GenericDriver.h"
#include "OptionsManager.h"
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "string.h"
#include <vector>
#define MAX 10
using namespace teradata::client::API;

GenericDriver::GenericDriver(OptionsManager * om)
{
  inputStream = NULL;
  outputStream = NULL;
  inputFileName = NULL;
  outputFileName = NULL;
  m_om = om;
}

GenericDriver::~GenericDriver(void)
{
  if( inputFileName != NULL )
  {
    free(inputFileName);
  }
  if( outputFileName != NULL )
  {
    free(outputFileName);
  }
}

/***************************************************/
/* testLoad                                        */
/*                                                 */
/* Tests transferring data from Export's GetBuffer */
/* to Load's PutBuffer. Uses the following steps:  */
/*                                                 */
/* 1. Setup and Initiate Load driver               */
/* 2. Query buffer layout from Load                */
/* 3. Pass Load buffer layout to Export driver     */
/* 4. Initiate Export driver                       */
/* 5. Transfer buffers from GetBuffer to PutBuffer */
/* 6. Call Load's EndAcquisition & ApplyRows       */
/* 7. Terminate Load and Export                    */
/***************************************************/
void GenericDriver::testExportToLoad()
{
  time_t now;
  time(&now);

  int ldStatus = 0;
  int expStatus = 0;
  int queryStatus = 0;
  char *errorMsg = NULL;
  TD_ErrorType errorType;
  RowCounts ldCount;     /* RowCounts struct defined in GetBuffer.h */

  char *dataptr;
  TD_Length datalen;
  char *msg = (char*)malloc(256);

  /**********************************************
  * Retrieve Options From Option File
  **********************************************/

  int numSessions           = m_om -> getNumSessions();
  char *tdp_id              = strdup(m_om -> getTdpId().c_str());
  char *user_name           = strdup(m_om -> getUserName().c_str());
  char *user_password       = strdup(m_om -> getUserPassword().c_str());
  char *target_table        = strdup(m_om -> getTargetTable().c_str());
  char *target_table_export = strdup(m_om -> getTargetTableExport().c_str());
  char *log_table           = strdup(m_om -> getLogTable().c_str());
  char *error_table_1       = strdup(m_om -> getErrorTable1().c_str());
  char *error_table_2       = strdup(m_om -> getErrorTable2().c_str());
  char *ins_statement       = (char*)malloc(1024);
  char *sel_statement       = (char*)malloc(1024);
  sprintf(ins_statement, "INSERT INTO %s;",target_table);
  //sprintf(ins_statement, "INSERT INTO %s( :col1, :col2);",target_table);
  sprintf(sel_statement, "SELECT * FROM %s;", target_table_export);

  cout << "######################### OUTPUT START #########################" \
  << endl;
  cout << "\nTest Description     :GenericDriver sample" << endl;
  cout << "Date                 :" << ctime(&now);

  Connection *ldConn = new Connection();   /* Load */
  Connection *expConn = new Connection();  /* Export */

  /**********************************************
  * Set Operator Type and Trace/Log Levels
  **********************************************/
  ldConn -> AddAttribute(TD_SYSTEM_OPERATOR, TD_LOAD);
  expConn -> AddAttribute(TD_SYSTEM_OPERATOR, TD_EXPORT);
  expConn -> AddAttribute(TD_TRACE_OUTPUT, "exportL.txt");
  expConn -> AddArrayAttribute(TD_TRACE_LEVEL, 2, TD_OPER_ALL,\
  TD_GENERAL, NULL);

  ldConn -> AddAttribute(TD_TRACE_OUTPUT, "load.txt");
  ldConn -> AddArrayAttribute(TD_TRACE_LEVEL, 2, TD_OPER_ALL,\
  TD_GENERAL, NULL);

  /**********************************************
  * Add Attributes
  **********************************************/
  ldConn -> AddAttribute(TD_TDP_ID, tdp_id);
  ldConn -> AddAttribute(TD_USER_NAME, user_name);
  ldConn -> AddAttribute(TD_USER_PASSWORD, user_password);
  ldConn -> AddAttribute(TD_TARGET_TABLE, target_table);
  ldConn -> AddAttribute(TD_LOG_TABLE, log_table);
  ldConn -> AddAttribute(TD_ERROR_TABLE_1, error_table_1);
  ldConn -> AddAttribute(TD_ERROR_TABLE_2, error_table_2);
  ldConn -> AddAttribute(TD_MAX_SESSIONS, numSessions);
  ldConn -> AddAttribute(TD_MIN_SESSIONS, numSessions);
  ldConn -> AddAttribute(TD_WILDCARDINSERT, "Yes");
  //ldConn -> AddAttribute(TD_CHARSET, "UTF8");
  //ldConn -> AddAttribute(TD_CHARSET, "UTF16");

  expConn -> AddAttribute(TD_TDP_ID, tdp_id);
  expConn -> AddAttribute(TD_USER_NAME, user_name);
  expConn -> AddAttribute(TD_USER_PASSWORD, user_password);
  expConn -> AddAttribute(TD_SELECT_STMT, sel_statement);
  expConn -> AddAttribute(TD_MAX_SESSIONS, numSessions);
  expConn -> AddAttribute(TD_MIN_SESSIONS, numSessions);
  expConn -> AddAttribute(TD_RETN_ACT_DATATYPE, "FOO");
  //expConn -> AddAttribute(TD_CHARSET, "UTF8");
  //expConn -> AddAttribute(TD_CHARSET, "UTF16");
  /**********************************************
  * Add Schema
  **********************************************/
  Schema* schema = NULL;

  cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
  cout << "       EXPORT OPERATOR CONNECTION INFO       " << endl;
  cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;

  cout << "Operator type      :EXPORT" << endl;
  cout << "TDP ID             :" << tdp_id  << endl;
  cout << "TDP username       :" << user_name << endl;
  cout << "TDP password       :<not displayable>" << endl;
  cout << "TargetTable        :" << target_table_export << endl;
  cout << "DML Statement      :" << sel_statement << endl;

  /**********************************************
  * Initiate Export Driver
  **********************************************/
  cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
  cout << "             INITIATE PHASE                " << endl;
  cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;

  expStatus = expConn -> Initiate();
  if ( expStatus >= TD_Error )
  {
    //Get Error Information
    cout << "Error occured during Export Initiate" << endl;
    expConn -> GetErrorInfo(&errorMsg, &errorType);

    if ( errorMsg != NULL )
    {
      cout << errorMsg << endl;
      cout << "Type: " << errorType << endl;
    }
    else
    {
      cout << "No Error Info Available" << endl;
    }
  }
  else
  {
    cout << "Export Driver initiated Successfully" << endl;
  /**********************************************
  * Add Schema for load
  **********************************************/
  cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
  cout << "              COLUMN INFO                    " << endl;
  cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;

  Schema* dynamicSchema;
  int returnValue = expConn -> GetSchema(&dynamicSchema);
  ldConn -> AddSchema(dynamicSchema);

  if ( returnValue < TD_Error && returnValue != TD_Unavailable )
  {
     int Count = dynamicSchema -> GetColumnCount();
     vector<Column*> column(MAX);
     cout << "Column Count:" << Count  << endl;
     for (vector<Column*>::size_type i=0; i < Count; i++)
     {
       column[i] = dynamicSchema -> GetColumn(i);
       char * Name = column[i] -> GetName();
       TD_DataType dataType = (TD_DataType)column[i] -> GetDataType();
       int precision = column[i] -> GetPrecision();
       int scale = column[i] -> GetScale();
       cout << endl;
       cout << "Column:" << i+1 << " Info" <<endl;
       cout << "Name:" << Name  << endl;
       cout << "DataType:" << dataType << endl;
       cout << "Precision:" << precision << endl;
       cout << "scale:" << scale << endl;
    }
  }
  else
  {
    printf(" GetSchema failed with status != %d", returnValue);
  }

  /**********************************************
  * Query Version Number
  **********************************************/
  queryStatus = expConn -> GetEvent(TD_Evt_Version, &dataptr, &datalen);

  cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
  cout << "              VERSION INFO                  " << endl;
  cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;

  if ( queryStatus < TD_Error && queryStatus != TD_Unavailable )
  {
    cout << "TPTAPI Version  :";
    cout << *((char**)dataptr) << endl;
    cout << "Operator Version:";
    cout << *((char**)(dataptr+sizeof(char**))) << endl;
  }
  else
  {
    printStatus("TD_Evt_Version event failed with status ", queryStatus);
  }
  /**********************************************
  * Add DMLGroups
  **********************************************/
  TD_Index dmlGroupIndex = 0;

  DMLGroup *dmlGr = new DMLGroup();
  dmlGr -> AddStatement(ins_statement);

  ldConn -> AddDMLGroup(dmlGr, &dmlGroupIndex);

  cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
  cout << "      LOAD OPERATOR CONNECTION INFO        " << endl;
  cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;

  cout << "Operator type     :LOAD" << endl;
  cout << "TDP ID            :" << tdp_id << endl;
  cout << "TDP username      :" << user_name << endl;
  cout << "TDP password      :<not displayable>" << endl;
  cout << "Target Table      :" << target_table << endl;
  cout << "DML Statement     :" << ins_statement << endl;

  /**********************************************
  * Initiate Load Driver
  **********************************************/
  cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
  cout << "               INITIATE PHASE               " << endl;
  cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;

  ldStatus = ldConn -> Initiate();
  if ( ldStatus < TD_Error )
  {
    cout << "Load Driver initiated Successfully" << endl;

  /**********************************************
  * Query Version Number
  **********************************************/
  queryStatus = ldConn -> GetEvent(TD_Evt_Version, &dataptr, &datalen);

  cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
  cout << "              VERSION INFO                  " << endl;
  cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;

  if ( queryStatus < TD_Error && queryStatus != TD_Unavailable )
  {
    cout << "TPTAPI Version  :";
    cout << *((char**)dataptr) << endl;
    cout << "Operator Version:";
    cout << *((char**)(dataptr+sizeof(char**))) << endl;
  }
  else
  {
     printStatus("TD_Evt_Version event failed with status ", queryStatus)
  }

  /**********************************************
  * Acquisition
  **********************************************/
  cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
  cout << "             ACQUISITION PHASE            " << endl;
  cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;

  char* dataBuffer;
  TD_Length dataLen = 0;
  int count = 0;
  int transferDone = 0;

  while(!transferDone)
  {
     /* Get a buffer from Export */
     expStatus = expConn -> GetRow(&dataBuffer, &dataLen);

     if ( expStatus >= TD_Error )
     {
       sprintf(msg,"GetBuffer failed on buffer %d with status ", count+1);
       printStatus(msg, expStatus);
       transferDone = 1;
     }
     else if ( expStatus == TD_END_Method )
     {
       cout << "End of Data Reached" << endl;
       transferDone = 1;
     }
     else
     {
       /* Pass Buffer to Load */
       ldStatus = ldConn -> PutRow(dataBuffer, dataLen);
       if ( ldStatus != TD_Success )
       {
         sprintf(msg,"PutBuffer failed on buffer %d with status ", count+1);
	     printStatus(msg, ldStatus);
	     transferDone = 1;
       }
       else
       {
         count++;
	     /* Get Load Row Counts */
	   }
     }
   }
   if ( ldStatus < TD_Error && expStatus < TD_Error )
   {

     /**********************************************
     * End Acquisition
     **********************************************/

     ldStatus = ldConn -> EndAcquisition();

     if ( ldStatus < TD_Error )
     {
       cout << "Acquisition completed Successfully" << endl;

	   /* Get Load Row Counts */

	   queryStatus = ldConn -> GetEvent(TD_Evt_RowCounts, &dataptr, \
       &datalen);

	   if ( queryStatus < TD_Error && queryStatus != TD_Unavailable )
       {
	     ldCount = *((RowCounts*)dataptr);
         cout << "Rows Received :";
         cout << ldCount.RowsReceived << endl;
         cout << "Rows Sent     :";
	     cout << ldCount.RowsSent << endl;
	     cout << "Rows Applied  :";
	     cout << ldCount.RowsApplied << endl;
	   }
       else
       {
         cout << "TD_Evt_RowCounts Info Not Available" << \
         queryStatus << endl;
	   }

	   /**********************************************
	   * Application
	   **********************************************/
       cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
       cout << "             APPLICATION PHASE             " << endl;
       cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;

	   ldStatus = ldConn -> ApplyRows();

	   if ( ldStatus < TD_Error )
	   {
        cout << "Rows Applied Successfully" << endl;
	     cout << "Transfer completed successfully" << endl;
	     cout << "Transferred " << count << " buffers" << endl;

	     /* Get Export Row Count */
	     queryStatus = expConn -> GetEvent(TD_Evt_ExportCount,\
         &dataptr, &datalen);

	     if ( queryStatus < TD_Error && queryStatus != TD_Unavailable )
         {
	       cout << "TD_Evt_ExportCount:";
	       cout << *((int*)dataptr) << endl;
	     }
         else
         {
	        printStatus("TD_Evt_ExportCount event failed with status", \
            queryStatus);
	     }

	     /* Get Load Apply Count */
	     queryStatus = ldConn -> GetEvent(TD_Evt_ApplyCount, &dataptr, \

         &datalen);

	     if ( queryStatus < TD_Error && queryStatus != TD_Unavailable )
         {
	       cout << "TD_Evt_ApplyCount :";
	       cout << *((int*)dataptr) << endl;
 	     }
         else
         {
	       printStatus("TD_Evt_ApplyCount event failed with status", \
           queryStatus);
	     }
	   }
       else
       {
         //Get Error Information

         cout << "Error occured during Application" << endl;
	     ldConn -> GetErrorInfo(&errorMsg, &errorType);

	     if ( errorMsg != NULL )
         {
	       cout << errorMsg << endl;
	       cout << "Type: " << errorType << endl;
	     }
         else
         {
	       cout << "No Error Info Available" << endl;
	     }
	   }
	 }
     else
     {
        //Get Error Information
	    cout << "Error occured during EndAcquisition" << endl;
	    ldConn -> GetErrorInfo(&errorMsg, &errorType);
	    if ( errorMsg != NULL )
        {
	      cout << errorMsg << endl;
	      cout << "Type: " << errorType << endl;
	    }
        else
        {
	      cout << "No Error Info Available" << endl;
	    }
      }
	}
    else
    {
      //Get Error Information
      if ( ldStatus >= TD_Error )
      {
        cout << "Load Error occured during Acquisition" << endl;
	    ldConn -> GetErrorInfo(&errorMsg, &errorType);
	    if ( errorMsg != NULL )
        {
	      cout << errorMsg << endl;
	      cout << "Type: " << errorType << endl;
	    }
        else
		{
	      cout << "No Error Info Available" << endl;
	    }
      }
      if ( expStatus >= TD_Error )
      {
	    cout << "Export Error occured during Acquisition" << endl;
	    expConn -> GetErrorInfo(&errorMsg, &errorType);
	    if ( errorMsg != NULL )
        {
	      cout << errorMsg << endl;
	      cout << "Type: " << errorType << endl;
	    }
        else
        {
	      cout << "No Error Info Available" << endl;
	    }
	  }
	}
 }
  else
  {
    //Get Error Information
    cout << "Error occured during Load Initiate" << endl;
    ldConn -> GetErrorInfo(&errorMsg, &errorType);
    if ( errorMsg != NULL )
    {
      cout << errorMsg << endl;
      cout << "Type: " << errorType << endl;
    }
    else
    {
      cout << "No Error Info Available" << endl;
    }
  }

    /* Terminate Drivers
    **********************************************/
    cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
    cout << "              TERMINATE PHASE                " << endl;
    cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;

    ldStatus = ldConn -> Terminate();
    expStatus = expConn -> Terminate();

    if ( ldStatus >= TD_Error || expStatus >= TD_Error )
    {
      /* Get Error Information */

      if ( ldStatus >= TD_Error )
      {
        cout << "Error occured during Load Terminate" << endl;
        ldConn -> GetErrorInfo(&errorMsg, &errorType);

        if ( errorMsg != NULL )
        {
          cout << errorMsg << endl;
          cout << "Type: " << errorType << endl;
        }
        else
        {
          cout << "No Error Info Available" << endl;
        }
      }
      else
      {
        cout << "Load Terminated Successfully" << endl;
      }
      if ( expStatus >= TD_Error )
      {
        cout << "Error occured during Export Terminate" << endl;
        expConn -> GetErrorInfo(&errorMsg, &errorType);
        if ( errorMsg != NULL )
        {
          cout << errorMsg << endl;
          cout << "Type: " << errorType << endl;
        }
        else
        {
          cout << "No Error Info Available" << endl;
        }
      }
      else
      {
        cout << "Export Terminated Successfully" << endl;
      }
    }
    /**********************************************
    * Clean Up
    **********************************************/

    cout << "Deleting objects" << endl;

    delete dmlGr;
    delete dynamicSchema;
    delete ldConn;
    delete expConn;

    free(user_name);
    free(user_password);
    free(target_table);
    free(target_table_export);
    free(tdp_id);
    free(log_table);
    free(error_table_1);
    free(error_table_2);
    free(ins_statement);
    free(sel_statement);
    free(msg);

   cout << "*** Export to Load Complete ***" << endl;
    cout <<"\n######################## OUTPUT END ####################" << endl;
 }
}

/*************************** Utility Functions *******************************/

/***********************************************
* Function prints string version of status code
************************************************/
void GenericDriver::printStatus(char* msg, int code)
{
  cout << msg;

  switch(code)
  {
    case TD_Success:
	 cout << "TD_Success";
	 break;
    case TD_SYNC_Barrier:
	 cout << "TD_SYNC_Barrier";
	 break;
    case TD_SYNC_TELINFO:
	 cout << "TD_SYNC_TELINFO";
	 break;
    case TD_END_Method:
	 cout << "TD_END_Method";
	 break;
    case TD_Unavailable:
	 cout << "TD_Unavailable";
	 break;
    case TD_Error:
	 cout << "Error Code: TD_Error";
	 break;
    default:
     cout << "Error Code: " << code;
	 break;
  }
  cout << endl;
}

/***************************************
* Function sets input file name
***************************************/
void GenericDriver::setInput(char* fname)
{
  if( inputFileName != NULL )
  {
    delete inputFileName;
  }
  inputFileName = strdup(fname);
}

/***************************************
* Function sets output file name
***************************************/
void GenericDriver::setOutput(char* fname)
{
  if( outputFileName != NULL )
  {
    delete outputFileName;
  }
  outputFileName = strdup(fname);
}

/***************************************
* Function reads in a row of data
***************************************/
int GenericDriver::GetRowData(char* rowBuffer, size_t rowLength)
{
  unsigned short rlength;

  //Close File if End Signal Received
  if ( rowLength <= 0 )
  {
    if ( inputStream != NULL )
    {
      fclose(inputStream);
    }
    return -1;
  }

  //Open File if not already open
  if( inputStream == NULL )
  {
    inputStream = fopen(inputFileName, "rb");

    if( inputStream == NULL )
    {
      cout << "Error opening file " << inputFileName << endl;
      return -1;
    }
  }

  //Read  the lenght of each record..read first two bytes
  size_t recLen =fread(rowBuffer, 1, (size_t)2, inputStream);

  if ( recLen < 1 )
  {
    if ( feof(inputStream) )
    {
      cout << "End of file reached" << endl;
      if( inputStream != NULL )
      {
        fclose(inputStream);
      }
      return -1;
    }
    else
    {
      /* return some read error here and exit */
      cout << "read error" << endl;
      if( inputStream != NULL )
      {
        fclose(inputStream);
      }
      return -1;
    }
  }
  //read row of data with size rlength

  memcpy(&rlength, rowBuffer, sizeof(unsigned short));   /* DR124305 */
  size_t recoLen = fread(rowBuffer+2,1,(size_t)rlength+1,inputStream);

  return 0;
}

/*****************************************************
* Function writes out a row of data in indicator mode
******************************************************/

int GenericDriver::WriteRowData(char* data, TD_Length dataLen)
{
  if( data != NULL )
  {
    //Open File if not already open
    if( outputStream == NULL )
    {
      outputStream = fopen(outputFileName, "wb");

      if( outputStream == NULL )
      {
	    cout << "Error opening file " << outputFileName << endl;
	    return -1;
      }
    }

    unsigned long i = 0;
    unsigned short shortdataLen = (unsigned short)dataLen;
    char* rowBuffer = (char*)malloc(dataLen+20);

    memcpy(rowBuffer, &shortdataLen, 2);

    for(i = 0; i < dataLen; i++)
    {
      rowBuffer[i+2] = data[i];

    }
    rowBuffer[i+2] = 0x0a;
    //fprintf (stream, "%s\n",rowBuffer+1); // account for indicator byte
    fwrite(rowBuffer, dataLen+3, 1, outputStream);

    free(rowBuffer);
  }
  else
  {
    if( outputStream != NULL )
    {
      fclose(outputStream);
    }
    return -1;
  }
  return 0;
}

/*****************************************************
* Function writes out a buffer of data in indicator mode
******************************************************/
int GenericDriver::WriteBufferData(char* data, TD_Length dataLen)
{
  if( data != NULL )
  {
    //Open File if not already open
    if( outputStream == NULL )
    {
      outputStream = fopen(outputFileName, "wb");

      if( outputStream == NULL )
      {
        cout << "Error opening file " << outputFileName << endl;
        return -1;
      }
    }
    fwrite(data, dataLen, 1, outputStream);
  }
  else
  {

    {
      fclose(outputStream);
    }
    return -1;
  }
  return 0;
}