2013/09/11

[azure][blob][hdinsight][php]WindowsAzureの各種ストレージを外部から利用したサンプル

ちょっとWindows Azureを触る機会があったので、いろいろと試してみました。
基本的にはMSのドキュメントが結構丁寧に記載されており困る事はないのですが、 ローカルや外部サーバーから自動で動かしたかったのでレガシーに書いてみました。

流れとしてはローカルのファイルをBLOBにアップして、HDInsghitで処理した結果をSQLServerに登録するといった感じです。

BLOBのデータアクセスはAzureのSDKを利用します。
※HTTP_Request2やNet_URL2を利用しているのでPEARからインストールが必要です。

HDInsightはcurlを利用してAPI経由でアクセスします。
APIの使い方はTempltonの仕様を確認してください。
・HDInsight クラスター上でリモート ジョブをプログラムから実行
・Templeton

SQLServer周りの設定は下記の記事を参考にさせて頂きました。
http://codezine.jp/article/detail/5736
http://dupont-kedama.blogspot.jp/2012/12/sqlsrv.html

サンプルは過去2年分の日経平均のデータをHDInsighntに読み込ませ、それをダウンロードしてきてSQLServerに登録する何の意味もないコードです。
※ソースはgithubにも上げています。
createBlobService($connectionString);

 //-■■■■■-ローカルのデータを取り込んでBLOBファイルにアップロード-■■■■■-
    $blobRestProxy->createBlockBlob(BLOB_CONTAINER, 'sample/nikkei_avg255.csv', file_get_contents("data/nikkei_avg255.csv"));


    //-■■■■■-HDInsightにデータ登録-■■■■■-
 $output_path="sample/output/";

 $ch = curl_init(HD_ENDPOINT);

 // 返り値を文字列として受け取る
 curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE);
 curl_setopt($ch, CURLOPT_USERPWD, HD_USERID.":".HD_USERPW);
 //CA証明書の検証をしない
 curl_setopt($ch,CURLOPT_SSL_VERIFYPEER,false);
 curl_setopt($ch, CURLOPT_VERBOSE, true);
 // POSTするデータをセット
 curl_setopt($ch,CURLOPT_HTTPHEADER, array('Content-type: application/x-www-form-urlencoded'));
 curl_setopt($ch,CURLOPT_POST ,1);

 //テーブル削除
 $HiveQL="DROP TABLE NIKKEI255_RAW";
 curl_setopt($ch,CURLOPT_POSTFIELDS, "user.name=".HD_USERID."&execute=".$HiveQL.";&statusdir=".$output_path);
 $result = curl_exec($ch);
 $rs=json_decode($result);
 checkJob($rs->id);

 //テーブル作成およびデータ投入
 $HiveQL ="CREATE EXTERNAL TABLE NIKKEI255_RAW (";
 $HiveQL.="C_DATE string,STA_PRICE float,MAX_PRICE float,MIN_PRICE float,END_PRICE float) ";
 $HiveQL.="row format delimited  fields terminated by ','  lines terminated by '\\n'  stored as textfile  location ";
 $HiveQL.="'asv://".BLOB_CONTAINER."@".BLOB_ACCOUNTNAME.".blob.core.windows.net/sample/'";
 curl_setopt($ch,CURLOPT_POSTFIELDS, "user.name=".HD_USERID."&execute=".$HiveQL.";&statusdir=".$output_path);
 $result = curl_exec($ch);
 $rs=json_decode($result);
 checkJob($rs->id);

 //不要データ以外をファイルに書き込み
 $HiveQL ="INSERT OVERWRITE LOCAL DIRECTORY '/user/username/sample/result/' SELECT * FROM NIKKEI255_RAW ";
 $HiveQL.="WHERE END_PRICE is not null ";
 curl_setopt($ch,CURLOPT_POSTFIELDS, "user.name=".HD_USERID."&execute=".$HiveQL.";&statusdir=".$output_path);
 $result = curl_exec($ch);
 $rs=json_decode($result);
 checkJob($rs->id);

 curl_close($ch);

    //-■■■■■-HDInsightが登録したBLOBデータを読み込み-■■■■■-
    $blob_list = $blobRestProxy->listBlobs(BLOB_CONTAINER);
    $blobs = $blob_list->getBlobs();

    $calc_data_raw="";
    $calc_data=array();
    foreach($blobs as $blob){
     if(strstr($blob->getName(),'user/username/sample/result/') ){
         $calc_data_raw=str_replace("\001",",",file_get_contents($blob->getUrl()));
         $calc_data=explode("\n", $calc_data_raw);
     }
    }

 //-■■■■■-SQLServer接続-■■■■■-
 $conn = new PDO ("sqlsrv:server = tcp:".SQLSRV_HOST.",1433; Database = ".SQLSRV_DATABASE, SQLSRV_USER, SQLSRV_PW);
 $conn->setAttribute( PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION );
 //既存レコード初期化
 $stmt = $conn->prepare("DELETE FROM NIKKEI255");
 $stmt->execute();
 //データ追加
 $sql="INSERT INTO NIKKEI255(C_DATE,STA_PRICE,MAX_PRICE,MIN_PRICE,END_PRICE)VALUES";
 $sql_add="";
 $row_cnt=0;
 foreach ($calc_data as $recs) {
  $rec=explode(",",$recs);
  if(count($rec)!=5){
   continue;
  }
  if($sql_add!=""){
   $sql_add.=",";
  }
  $sql_add.="('".$rec[0]."',".$rec[1].",".$rec[2].",".$rec[3].",".$rec[4].")";
  $row_cnt++;

  if($row_cnt==1000){
   $stmt = $conn->prepare($sql.$sql_add);
   $stmt->execute();
   $sql_add="";
   $row_cnt=0;
  }
 }

 $stmt = $conn->prepare($sql.$sql_add);
 $stmt->execute();

 exit;

 //-■■■■■-ジョブ完了チェック(再帰)-■■■■■-
 function checkJob($job_id){

  $ch = curl_init(HD_ENDPOINT_JOB.$job_id."?user.name=".HD_USERID);

  // 返り値を文字列として受け取る
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE);
  curl_setopt($ch, CURLOPT_USERPWD, HD_USERID.":".HD_USERPW);
  //CA証明書の検証をしない
  curl_setopt($ch,CURLOPT_SSL_VERIFYPEER,false);
  curl_setopt($ch, CURLOPT_VERBOSE, true);
  $result = curl_exec($ch);
  $rs=json_decode($result);

  if($rs->completed!="done"){
   sleep(1);
   checkJob($job_id);
  }
 }
?>


Hiveジョブを実行した後は、処理の完了チェックが必要となります。
またHDInsight(Hadoop)のデータをSQLServerに登録する場合、一般的にはsqoopコマンドを利用するかと思います。
リモートデスクトップでクラスタ上にログインすればsqoopコマンドが利用可能なのですが、API等は用意されておらず自前で作成する必要があるため、サンプルでは一度ファイルをダウンロードしてSQLServerに登録する流れになっています。
※簡単に出来る方法が見つかりませんでした

ちょっと個人で利用するのは高いので、また触る機会があったら頑張ってみようかなー

0 件のコメント:

コメントを投稿