R Table Operator Use Case: Grouping Using K Means | Teradata Vantage - R Table Operator Use Case: Grouping Using K Means - Advanced SQL Engine - Teradata Database

SQL External Routine Programming

Product
Advanced SQL Engine
Teradata Database
Release Number
17.10
Published
July 2021
Language
English (United States)
Last Update
2021-07-27
dita:mapPath
rin1593638965306.ditamap
dita:ditavalPath
rin1593638965306.ditaval
dita:id
B035-1147
lifecycle
previous
Product Category
Teradata Vantage™

This use case uses the iris dataset to group species together using K Means. The entire analysis shown can be done in one query, but it is broken into smaller steps to highlight ExecR's capabilities.

The following tasks are performed:
  • Load the iris dataset into a table.
  • Create the training and test tables and labels. Save the species predictions using knn.
  • Use the predictions previously computed to calculate a ratio.

Load the Dataset

This example shows an INSERT ... SELECT query. The result set from the ExecR query gets inserted into the iris_data table. The example creates a table that has columns that match the columns in the iris dataset in R.

The tdr.TblWrite function is used to write whole data frames to the output stream. The iris dataset is a built-in dataset recognized in R.

create multiset table iris_data(sepal_length float, sepal_width float, petal_length float, petal_width float, species int);

insert iris_data
sel * from td_sysgpl.execr (
  ON (sel * from iris_data) dimension
  ON (sel 1) PARTITION BY 1 
  using
  keepLog(1)
  contract
  (
    'library(tdr);
     on_clause_input_stream <- 0;
     direction <- "R";

     incols <- tdr.GetColDef(on_clause_input_stream, direction);
     tdr.SetOutputColDef(on_clause_input_stream, incols);'
  )
  operator
  (
    'library(tdr);

     stream <- 0;
     options <- 0;
     write_direction <- "W";

     outHandle <- tdr.Open(write_direction, stream, options);
     tdr.TblWrite(outHandle, iris);

     tdr.Close(outHandle);'
  )
) as Rexp;

Select from the iris_data table to show the iris data loaded:

sel * from iris_data;

Result: The following shows partial results from the SELECT statement.

  Sepal.Length Sepal.Width Petal.Length Petal.Width Species
1          5.1         3.5          1.4         0.2       1
2          4.9         3.0          1.4         0.2       1
3          4.7         3.2          1.3         0.2       1
4          4.6         3.1          1.5         0.2       1
5          5.0         3.6          1.4         0.2       1
6          5.4         3.9          1.7         0.4       1
[…]

Because the Operator code is executed in parallel, we need to redistribute the rows so that the Operator code makes sense for the input it gets. In our case, we want to use a fraction of all the iris data to create a training set and then use the rest for the test set.

sel count(*) from iris_data;

*** Query completed. One row found. One column returned.
 *** Total elapsed time was 1 second.

   Count(*)
-----------
        150

The ExecR query below counts the number of rows in the iris_data table on each AMP. The contract will give the definition for the output. The output is one column with the count of the number of rows expressed as an integer. Each row in the output is the result for one AMP.

The Operator code that is run on each AMP reads in the table's rows a chunk at a time and creates them into a data frame using tdr.TblRead. The nrow function in R is used to count the number of rows of the data frame.

SELECT * FROM TD_SYSGPL.ExecR (
  ON (sel * from iris_data)
  USING
    keeplog(1)
    Contract
    (
      'library(tdr)
       streamno_out <- 0
       coltype_integer <-list(datatype="INTEGER_DT", bytesize="SIZEOF_INTEGER")
       coldef <- list(count = coltype_integer)
       tdr.SetOutputColDef(streamno_out, coldef)'
    )

    Operator
    (
     'library(tdr)
      streamno_in <- 0
      streamno_out <- 0

      handle_in <- tdr.Open("R", streamno_in, 0)
      handle_out <- tdr.Open("W", streamno_out, 0)

      rowsread<- 0
      buffSize <- as.integer(12*1024)
      someRows <- tdr.TblRead(handle_in, buffSize)

      while( nrow(someRows) > 0 )
      {
        rowsread<- rowsread + nrow(someRows)
        someRows <- tdr.TblRead(handle_in, buffSize)
      }

      tdr.Close(handle_in)
      dat_export <- data.frame(
        count = as.integer(rowsread)
      )

     tdr.TblWrite(handle_out, dat_export)
     tdr.Close(handle_out)'
    )
  ) as Rexp;

*** Query completed. 4 rows found. One column returned.
 *** Total elapsed time was 1 second.

      count
-----------
         29
         36
         48
         37

This count shows the distribution of rows that each AMP (on a four AMP system) has for its input. If we were to create our model in the Operator now, we'd get four different models trained and tested on different subsets of the same data. We want to generate one model based on a single training set and test set from all of the data.

To keep this use case simple, we will get all the data to be processed onto one AMP. To do this, use the following ON clause combination:

ON (sel 1) PARTITION BY 1

ON (sel * from iris_data) DIMENSION

This will force the iris_data rows onto exactly one AMP. The ON clause specified with the DIMENSION option copies the table for every partition on which the table operator operates. In this case, since we are partitioning by a column with just one value, only one copy of the table will get created.

SELECT * FROM TD_SYSGPL.ExecR (
  ON (sel 1) PARTITION BY 1
  ON (sel * from iris_data) DIMENSION
  USING
    keeplog(1)
    Contract
    (
      'library(tdr)
       streamno_out <- 0
       coltype_integer <-list(datatype="INTEGER_DT", bytesize="SIZEOF_INTEGER")
       coldef <- list(count = coltype_integer)
       tdr.SetOutputColDef(streamno_out, coldef)'
    )

    Operator
    (
     'library(tdr)
      streamno_in <- 1
      streamno_out <- 0

      handle_in <- tdr.Open("R", streamno_in, 0)
      handle_out <- tdr.Open("W", streamno_out, 0)

      rowsread<- 0
      buffSize <- as.integer(12*1024)
      someRows <- tdr.TblRead(handle_in, buffSize)

      while( nrow(someRows) > 0 )
      {
        rowsread<- rowsread + nrow(someRows)
        someRows <- tdr.TblRead(handle_in, buffSize)
      }

      someRows <- tdr.TblRead(handle_in, buffSize)
      rowsread <- rowsread + nrow(someRows)

      tdr.Close(handle_in)
      dat_export <- data.frame(
        count = as.integer(rowsread)
      )

     tdr.TblWrite(handle_out, dat_export)
     tdr.Close(handle_out)'
    )
  ) as Rexp
;

 *** Query completed. One row found. One column returned.
 *** Total elapsed time was 1 second.

      count
-----------
        150

Note, because there is another ON clause, the input stream must be changed to reference the correct input. So, the value of streamno_in changed to 1 in the Operator to match with the iris_data input.

Create the Training and Test Tables and Labels and Save with knn

In this example we read in all of the data created in the previous example then split it into a training set and a test set. We save the correct Species labels for comparison with the predicted labels in a data frame. This example uses serialize to save a data frame of predictions and correct labels for comparison later. The serialized data is saved as a blob in the predictions table.

create table predictions (id int, model blob);

insert predictions
sel * from td_sysgpl.execr (
  ON (sel 1) PARTITION BY 1
  ON (sel * from iris_data) DIMENSION
  using
  keepLog(1)
  contract
  (
    'library(tdr);

     on_clause_output_stream <- 0;

     integer <- list(datatype="INTEGER_DT", bytesize="SIZEOF_INTEGER");
     binary_lob <- tdr.Blob(1024);

     coldefs <- list( key = integer, model = binary_lob );
     tdr.SetOutputColDef(on_clause_output_stream, coldefs);'
  )
  operator
  (
    'library(tdr);
     library(class);

     streamin <- 1;
     streamout <- 0;
     read_direction <- "R";
     write_direction <- "W";

     options <- 0;
     inHandle <- tdr.Open(read_direction, streamin, options);
     outHandle <- tdr.Open(write_direction, streamout, options);

     ### Read Data ###

     buffSize <- as.integer(16*1024)
     data <- tdr.TblRead(inHandle, buffSize);

     ### Process ###

     set.seed(2017);
     ind <- sample(2, nrow(data), replace = TRUE, prob = c(0.67, 0.33));
     training <- data[ind == 1, 1:4];
     test <- data[ind == 2, 1:4];

     train_labels <- data[ind == 1, 5];
     test_labels <- data[ind == 2, 5];

     prediction <- knn(train = training, test = test, cl = train_labels, k=3);
     result <- data.frame(prediction, test_labels);
     
     ### Write Data ###

     saved <- serialize(result, NULL);
     locator <- tdr.LobCol2Loc(streamout, 1);
     tdr.LobAppend( locator, saved);

     tdr.SetAttributeByNdx(outHandle, streamout, list(value = 1L, nullindicator=0), NULL);

     tdr.Write(outHandle);

     tdr.Close(inHandle);
     tdr.Close(outHandle);'
  )
) as Rexp;

Note, that the buffer size allocated to read the table is much larger than the size of the input, so we only need to call tdr.TblRead once. In general, the input should be read into data frames using nrow as in the previous examples.

Once the input is read into the data frame we create a vector of labels using sample, which are used to index the data frame for training and test data. We use two-thirds for the training set and one-third for the test. We then use the knn function provided by the class library in R to generate the predictions. The prediction and the test_labels are put into a data frame and serialized. Finally, they are written to the predictions table.

Use the Predictions to Calculate a Ratio

In this example, we read the predictions data frame that we saved and use it to compare predicted labels with the actual labels. This scenario is useful for saving computed models for reuse or other intermediate results into data frames to use later.

sel count(*) from iris_data;

sel * from predictions;

sel * from td_sysgpl.execr (
  ON (sel 1) PARTITION BY 1
  ON (sel * from predictions) DIMENSION
  using
  keepLog(1)
  contract
  (
    'library(tdr);

     on_clause_output_stream <- 0;

     integer <- list(datatype="INTEGER_DT", bytesize="SIZEOF_INTEGER");
     real <- list(datatype="REAL_DT", bytesize="SIZEOF_REAL");

     coldefs <- list( modelno = integer, right_ratio = real );
     tdr.SetOutputColDef(on_clause_output_stream, coldefs);'
  )
  operator
  (
    'library(tdr);

     streamin <- 1;
     streamout <- 0;
     read_direction <- "R";
     write_direction <- "W";

     options <- 0;
     inHandle <- tdr.Open(read_direction, streamin, options);
     outHandle <- tdr.Open(write_direction, streamout, options);

     ### Read Saved Prediction ###

     tdr.Read(inHandle);
     index <- tdr.GetAttributeByNdx(inHandle, 0L, NULL);
     locator <- tdr.GetAttributeByNdx(inHandle, 1L, NULL);

     inLob <- tdr.LobOpen_CL(locator, 0, 0);
     model <- tdr.LobRead(inLob$contextID, inLob$LOBlen);

     raw <- unlist(model$buffer);
     prediction <- unserialize(raw);
     tdr.Close(inHandle);

     ### Process Ratio ###

     total <- nrow(prediction);
     correct <- Filter( function(x){ x },
                        prediction[[1]] == prediction[[2]] );

     result <- length(correct) / total;

     ### Write Data ###

     tdr.SetAttributeByNdx(outHandle, 0L, list(value = index$value, nullindicator = 0), NULL);
     tdr.SetAttributeByNdx(outHandle, 1L, list(value = result, nullindicator = 0), NULL);

     tdr.Write(outHandle);
     tdr.Close(outHandle);'
  )
) as Rexp;

Related Information

For more information about table operators, see the following:
  • Teradata Vantage™ - SQL Data Manipulation Language, B035-1146. See information about the SELECT table operator.
  • Teradata Vantage™ - SQL Operators and User-Defined Functions, B035-1210.