17.00 - 17.05 - Rテーブル演算子の使用例:K-Meansを使用したグループ化 - Advanced SQL Engine - Teradata Database

Teradata Vantage™ - SQL外部ルーチン プログラミング

Product
Advanced SQL Engine
Teradata Database
Release Number
17.00
17.05
Published
2020年6月
Content Type
プログラミング リファレンス
Publication ID
B035-1147-170K-JPN
Language
日本語 (日本)

この使用例では、アイリス データセットを使用して、K-Meansで種のグループ化を行ないます。表示される分析全体は1つのクエリーで実行できますが、ExecRの機能を強調表示するために、より小さなステップに分割されます。

次のタスクが実行されます。
  • アイリス データセットをテーブルに読み込みます。
  • トレーニングならびにテスト テーブルおよびラベルを作成します。knnを使用して種の予測を保存します。
  • 以前に計算した予測を使用して比率を計算します。

データセットの読み込み

この例では、INSERT SELECTクエリーを示します。ExecRクエリ―からの結果セットがiris_dataテーブルに挿入されます。この例では、Rのアイリス データセットの列に一致する列を持つテーブルを作成します。

tdr.TblWrite関数は、データ フレーム全体を出力ストリームに書き込むために使用されます。アイリス データセットは、Rで認識される組み込みのデータセットです。

create multiset table iris_data(sepal_length float, sepal_width float, petal_length float, petal_width float, species int);

insert iris_data
sel * from td_sysgpl.execr (
  ON (sel * from iris_data) dimension
  ON (sel 1) PARTITION BY 1 
  using
  keepLog(1)
  contract
  (
    'library(tdr);
     on_clause_input_stream <- 0;
     direction <- "R";

     incols <- tdr.GetColDef(on_clause_input_stream, direction);
     tdr.SetOutputColDef(on_clause_input_stream, incols);'
  )
  operator
  (
    'library(tdr);

     stream <- 0;
     options <- 0;
     write_direction <- "W";

     outHandle <- tdr.Open(write_direction, stream, options);
     tdr.TblWrite(outHandle, iris);

     tdr.Close(outHandle);'
  )
) as Rexp;

Iris_dataテーブルから選択して、読み込まれたアイリス データを表示します。

sel * from iris_data;

結果: SELECT文の部分的な結果を次に示します。

  Sepal.Length Sepal.Width Petal.Length Petal.Width Species
1          5.1         3.5          1.4         0.2       1
2          4.9         3.0          1.4         0.2       1
3          4.7         3.2          1.3         0.2       1
4          4.6         3.1          1.5         0.2       1
5          5.0         3.6          1.4         0.2       1
6          5.4         3.9          1.7         0.4       1
[…]

オペレータ コードは並行して実行されるため、オペレータ コードが取得する入力に対して意味を持つように、行を再配置する必要があります。ここでは、すべてのアイリス データの少数を使用してトレーニング セットを作成し、残りのテスト セットを使用します。

sel count(*) from iris_data;

*** Query completed. One row found. One column returned.
 *** Total elapsed time was 1 second.

   Count(*)
-----------
        150

以下のExecRクエリーは、各AMPのiris_dataテーブルの行数をカウントします。コントラクトは、出力の定義を提供します。出力は、整数として表わされる行数のカウントを持つ1つの列です。出力の各行は、1つのAMPの結果です。

各AMPで実行されるオペレータ コードは、テーブルの行のチャンクを一度に読み取り、tdr.TblReadを使用してデータ フレームに作成します。Rのnrow関数は、データ フレームの行数をカウントするために使用されます。

SELECT * FROM TD_SYSGPL.ExecR (
  ON (sel * from iris_data)
  USING
    keeplog(1)
    Contract
    (
      'library(tdr)
       streamno_out <- 0
       coltype_integer <-list(datatype="INTEGER_DT", bytesize="SIZEOF_INTEGER")
       coldef <- list(count = coltype_integer)
       tdr.SetOutputColDef(streamno_out, coldef)'
    )

    Operator
    (
     'library(tdr)
      streamno_in <- 0
      streamno_out <- 0

      handle_in <- tdr.Open("R", streamno_in, 0)
      handle_out <- tdr.Open("W", streamno_out, 0)

      rowsread<- 0
      buffSize <- as.integer(12*1024)
      someRows <- tdr.TblRead(handle_in, buffSize)

      while( nrow(someRows) > 0 )
      {
        rowsread<- rowsread + nrow(someRows)
        someRows <- tdr.TblRead(handle_in, buffSize)
      }

      tdr.Close(handle_in)
      dat_export <- data.frame(
        count = as.integer(rowsread)
      )

     tdr.TblWrite(handle_out, dat_export)
     tdr.Close(handle_out)'
    )
  ) as Rexp;

*** Query completed. 4 rows found. One column returned.
 *** Total elapsed time was 1 second.

      count
-----------
         29
         36
         48
         37

このカウントは、(4つのAMPシステム上の)各AMPが入力に対して持つ行の分散を示します。オペレータでモデルを作成するとしたら、同一データの異なるサブセットでトレーニングしテストした種類の異なる4つのモデルを取得できます。すべてのデータから1つのトレーニング セットとテスト セットを基にしたモデルを生成します。

この使用例の簡略化のために、すべてのデータを1つのAMPで処理されるようにします。これを行なうには、次のON句の組み合わせを使用します:

ON (sel 1) PARTITION BY 1

ON (sel * from iris_data) DIMENSION

これにより、iris_data行が1つのAMPに正確に強制実行されます。DIMENSIONオプションで指定されたON句は、テーブル演算子が動作するすべてのパーティション テーブルをコピーします。この場合、1つの値を持つ列によってパーティション分割されるため、テーブルのコピーは1つだけ作成されます。

SELECT * FROM TD_SYSGPL.ExecR (
  ON (sel 1) PARTITION BY 1
  ON (sel * from iris_data) DIMENSION
  USING
    keeplog(1)
    Contract
    (
      'library(tdr)
       streamno_out <- 0
       coltype_integer <-list(datatype="INTEGER_DT", bytesize="SIZEOF_INTEGER")
       coldef <- list(count = coltype_integer)
       tdr.SetOutputColDef(streamno_out, coldef)'
    )

    Operator
    (
     'library(tdr)
      streamno_in <- 1
      streamno_out <- 0

      handle_in <- tdr.Open("R", streamno_in, 0)
      handle_out <- tdr.Open("W", streamno_out, 0)

      rowsread<- 0
      buffSize <- as.integer(12*1024)
      someRows <- tdr.TblRead(handle_in, buffSize)

      while( nrow(someRows) > 0 )
      {
        rowsread<- rowsread + nrow(someRows)
        someRows <- tdr.TblRead(handle_in, buffSize)
      }

      someRows <- tdr.TblRead(handle_in, buffSize)
      rowsread <- rowsread + nrow(someRows)

      tdr.Close(handle_in)
      dat_export <- data.frame(
        count = as.integer(rowsread)
      )

     tdr.TblWrite(handle_out, dat_export)
     tdr.Close(handle_out)'
    )
  ) as Rexp
;

 *** Query completed. One row found. One column returned.
 *** Total elapsed time was 1 second.

      count
-----------
        150

注:別のON句があるため、入力ストリームは正しい入力を参照するように変更する必要があります。したがって、streamno_inの値は、iris_data入力と一致するようにオペレータの1に変更されます。

トレーニングとテスト テーブルとラベルを作成し、knnで保存します。

この例では、前の例で作成したすべてのデータを読み、トレーニング セットとテスト セットに分割します。データ フレーム内の予測ラベルと比較するために正しい種のラベルを保存します。この例では、シリアライズを使用して予測のデータ フレームを保存し、後で比較するためにラベルを改訂します。シリアライズされたデータは、予測テーブルにblobとして保存されます。

create table predictions (id int, model blob);

insert predictions
sel * from td_sysgpl.execr (
  ON (sel 1) PARTITION BY 1
  ON (sel * from iris_data) DIMENSION
  using
  keepLog(1)
  contract
  (
    'library(tdr);

     on_clause_output_stream <- 0;

     integer <- list(datatype="INTEGER_DT", bytesize="SIZEOF_INTEGER");
     binary_lob <- tdr.Blob(1024);

     coldefs <- list( key = integer, model = binary_lob );
     tdr.SetOutputColDef(on_clause_output_stream, coldefs);'
  )
  operator
  (
    'library(tdr);
     library(class);

     streamin <- 1;
     streamout <- 0;
     read_direction <- "R";
     write_direction <- "W";

     options <- 0;
     inHandle <- tdr.Open(read_direction, streamin, options);
     outHandle <- tdr.Open(write_direction, streamout, options);

     ### Read Data ###

     buffSize <- as.integer(16*1024)
     data <- tdr.TblRead(inHandle, buffSize);

     ### Process ###

     set.seed(2017);
     ind <- sample(2, nrow(data), replace = TRUE, prob = c(0.67, 0.33));
     training <- data[ind == 1, 1:4];
     test <- data[ind == 2, 1:4];

     train_labels <- data[ind == 1, 5];
     test_labels <- data[ind == 2, 5];

     prediction <- knn(train = training, test = test, cl = train_labels, k=3);
     result <- data.frame(prediction, test_labels);
     
     ### Write Data ###

     saved <- serialize(result, NULL);
     locator <- tdr.LobCol2Loc(streamout, 1);
     tdr.LobAppend( locator, saved);

     tdr.SetAttributeByNdx(outHandle, streamout, list(value = 1L, nullindicator=0), NULL);

     tdr.Write(outHandle);

     tdr.Close(inHandle);
     tdr.Close(outHandle);'
  )
) as Rexp;

テーブルを読み取るために割り当てられたバッファ サイズは入力のサイズよりもはるかに大きく、tdr.TblReadの呼び出しは一度のみ必要であることにご注意ください。一般的には、前例のようにnrowを使用して、入力をデータ フレームに読み込む必要があります。

入力がデータ フレームに読み取られると、サンプルを使用してベクトルのラベルを作成し、これを使用してトレーニングおよびテスト データのデータ フレームをインデックス化します。トレーニング セットには3分の2を使用し、テストのための3分の1を使います。次に、Rのクラス ライブラリによって提供されるknn関数を使用して、予測を生成します。予測とtest_labelsをデータ フレームに入れてシリアライズします。最後に、予測テーブルに書き込まれます。

予測を使用して比率を計算する

この例では、保存した予測データ フレームを読み取って、予測されたラベルと実際のラベルを比較します。このシナリオは、後で使用するデータ フレームに再利用またはその他の中間結果の計算モデルを保存する場合に便利です。

sel count(*) from iris_data;

sel * from predictions;

sel * from td_sysgpl.execr (
  ON (sel 1) PARTITION BY 1
  ON (sel * from predictions) DIMENSION
  using
  keepLog(1)
  contract
  (
    'library(tdr);

     on_clause_output_stream <- 0;

     integer <- list(datatype="INTEGER_DT", bytesize="SIZEOF_INTEGER");
     real <- list(datatype="REAL_DT", bytesize="SIZEOF_REAL");

     coldefs <- list( modelno = integer, right_ratio = real );
     tdr.SetOutputColDef(on_clause_output_stream, coldefs);'
  )
  operator
  (
    'library(tdr);

     streamin <- 1;
     streamout <- 0;
     read_direction <- "R";
     write_direction <- "W";

     options <- 0;
     inHandle <- tdr.Open(read_direction, streamin, options);
     outHandle <- tdr.Open(write_direction, streamout, options);

     ### Read Saved Prediction ###

     tdr.Read(inHandle);
     index <- tdr.GetAttributeByNdx(inHandle, 0L, NULL);
     locator <- tdr.GetAttributeByNdx(inHandle, 1L, NULL);

     inLob <- tdr.LobOpen_CL(locator, 0, 0);
     model <- tdr.LobRead(inLob$contextID, inLob$LOBlen);

     raw <- unlist(model$buffer);
     prediction <- unserialize(raw);
     tdr.Close(inHandle);

     ### Process Ratio ###

     total <- nrow(prediction);
     correct <- Filter( function(x){ x },
                        prediction[[1]] == prediction[[2]] );

     result <- length(correct) / total;

     ### Write Data ###

     tdr.SetAttributeByNdx(outHandle, 0L, list(value = index$value, nullindicator = 0), NULL);
     tdr.SetAttributeByNdx(outHandle, 1L, list(value = result, nullindicator = 0), NULL);

     tdr.Write(outHandle);
     tdr.Close(outHandle);'
  )
) as Rexp;

関連情報

テーブル演算子の詳細については、以下を参照してください。
  • <Teradata Vantage™ - SQLデータ操作言語、B035-1146>のSELECTテーブル演算を参照してください。
  • <Teradata Vantage™ - SQL演算子およびユーザー定義関数、B035-1210>の「テーブル演算子」