ytake Hatena

Web Application Developer

PHPでビッグデータを操作しよう!Presto編 2

ytake.hateblo.jp

*上記の続き

異なるデータベース、NoSQLなどを結合できるということが理解できたと思います。

それではPHPのアプリケーションから実際に利用してみましょう。

PHP Prestodb Client

PHPのPrestoクライアントライブラリは、古いものがありますが、
自分の用途に合わなかった(最新のPHP向けにしたかったなど)為、
新たに作り、公開しました。

*新しく作ったもの

github.com

*以前からある古いもの

github.com

prestoはpdoのようにネイティブのコネクタはなく、
REST APIによる操作が可能です。
以前からあるもののスタイルではなく、javaのクライアントに近い実装にしました。

早速クライアントを利用して、前回のクエリを発行します。

Install

PHP7.0以上にのみ対応していますので、7.0以上の環境で実行してください。

installはcomposerを利用してください。
下記のコマンドを実行するだけでOKです。

$ composer require ytake/php-presto-client

Usage

prestoに問い合わせるクライアントのインスタンス生成は次の通りです。

<?php

$query = "SELECT _key, _value, test_id, test_name, created_at 
FROM my_tests.testing.tests AS myttt 
INNER JOIN red_tests.test.string AS redttt ON redttt._key = myttt.test_name 
WHERE myttt.test_name = 'presto'";

$client = new \Ytake\PrestoClient\StatementClient(
    new \Ytake\PrestoClient\ClientSession('http://127.0.0.1:8080/', 'my_tests'),
    $query
);

ClientSessionクラスに接続先と、第二引数にカタログ名を指定します。
カタログ名を指定すると、そのカタログ名を省略できるようになります。

ClientSessionクラスのインスタンスをStatementClientクラスに与えると、
Prestoに問い合わせる準備が整います。

一番簡単に利用できるのは、Ytake\PrestoClient\ResultsSession 経由で結果を取得する方法です。

<?php
$resultSession = new \Ytake\PrestoClient\ResultsSession($client);
$result = $resultSession->execute()->yieldResults();

/** @var \Ytake\PrestoClient\QueryResult $row */
foreach($result as $row) {
    foreach($row->yieldData() as $yieldRow) {
        if($yieldRow instanceof \Ytake\PrestoClient\FixData) {
            var_dump($yieldRow->offsetGet('_key'), $yieldRow['_key']);
        }
    }
}

yieldResultで様々なデータが返却されます。
Prestoはデータを取得するまでに複数回リクエストをおくる必要があり、
Queryの結果取得以外にも様々なデータを分割して返却する為です。

Prestoから返却されるQueryの結果は Ytake\PrestoClient\FixData で返却されますので、
これを取得します。
このクラスは ArrayAccess を実装していますので、
一般的な配列へのアクセスと同等にカラムの値を取得できます。

前回利用したクエリの結果は次のようになります。

class Ytake\PrestoClient\FixData#20 (5) {
  public $_key =>
  string(6) "presto"
  public $_value =>
  string(7) "awesome"
  public $test_id =>
  int(1)
  public $test_name =>
  string(6) "presto"
  public $created_at =>
  string(23) "2017-09-15 03:49:30.000"
}

接続にユーザーなどが必要であれば、ClientSessionクラスのメソッドが利用できます。
Prepared Statementは現在のバージョンではサポートされていない為、
利用できませんが(ユーザーに近いフロントから利用するものではない為)
サポートされていた以前のバージョンではPreparedStatementクラスがそのまま使えると思います。

利用方法のまとめ

簡単な利用方法をまとめると次のようになります。

<?php

require_once __DIR__ . '/vendor/autoload.php';

$query = "SELECT _key, _value, test_id, test_name, created_at 
FROM my_tests.testing.tests AS myttt 
INNER JOIN red_tests.test.string AS redttt ON redttt._key = myttt.test_name 
WHERE myttt.test_name = 'presto'";

$client = new \Ytake\PrestoClient\StatementClient(
    new \Ytake\PrestoClient\ClientSession('http://127.0.0.1:8080/', 'my_tests'),
    $query
);
$resultSession = new \Ytake\PrestoClient\ResultsSession($client);
// yield results instead of returning them. Recommended.
$result = $resultSession->execute()->yieldResults();

/** @var \Ytake\PrestoClient\QueryResult $row */
foreach($result as $row) {
    foreach($row->yieldData() as $yieldRow) {
        if($yieldRow instanceof \Ytake\PrestoClient\FixData) {
            var_dump($yieldRow->offsetGet('_key'), $yieldRow['_key']);
        }
    }
}

オブジェクト マッピング

PDOのFETCH_CLASSのように、任意のオブジェクトで変更することもできます。

PHP: PDOStatement::fetch - Manual

任意のオブジェクトで返却したい場合、カラムをクラスのプロパティとして作成します。
指定したカラム以外の値は返却されませんので、
以下のようにすると、プロパティに記述した2つのカラム以外は返却されません。
尚、プロパティはpublic, protected, privateのいずれでも値が挿入されますので、
アプリケーションの設計に合わせて利用してください。

<?php

class Testing
{
    /** @var string */
    private $_key;

    /** @var string */
    private $_value;

    /**
     * @return string
     */
    public function key()
    {
        return $this->_key;
    }

    /**
     * @return string
     */
    public function value(): string
    {
        return $this->_value;
    }
}

あとはyieldObjectメソッドにクラス名を指定して値を取得します。

<?php

$resultSession = new \Ytake\PrestoClient\ResultsSession($client);
$result = $resultSession->execute()->yieldResults();

/** @var \Ytake\PrestoClient\QueryResult $row */
foreach ($result as $row) {
    foreach ($row->yieldObject(Testing::class) as $item) {
        if ($item instanceof Testing) {
            var_dump($item);
        }
    }
}

これでPHPのアプリケーションからPrestoを操作することができるようになりました。
管理画面や、分析用途に活用してみてください。

PHPでビッグデータを操作しよう!Presto編 1

引き続きビッグデータ関連のミドルウェアを使った開発ばかりしてます。
もうすぐスーパーファミコンですね。

PHPを使ったビッグデータへのアプローチ方法などを話すことが多いですが、
今回は登壇時に例に挙げることも多いPrestoとPHPからの利用方法について紹介します。

speakerdeck.com

Prestoって何?

PrestoはFacebookで開発された分散SQLクエリエンジンで、
Webアプリケーションにとってはあまり馴染みがないかもしれませんが、
大規模なデータに対してインタラクティブに結果を返せるようになっていて、
Apache HiveやApache Impalaと同じようなものと考えるのが早いでしょう。

prestodb.io

ただWebアプリケーションエンジニアに嬉しいのは、
異なるデータベース(RDBMS)やデータストレージを結合して集計ができるという、
管理ツールや分散したデータベースを跨ぐ処理へのアプローチなど、活用できる場面が多くあります。
(Re:dashなどもあります)

更なる詳細については多くの記事や解説がありますので、そちらを参照してください。

tug.red

aws.amazon.com

blog.cloudera.co.jp

当然、利用ケースや場面によってはSparkやImpalaを利用した方が高速な場合もあり、
構成によっては、Kuduの方がよりフィットする場合もあると思います
(HDFSに格納されていてHBaseも使ってるぜ!みたいなケースだったり)

jp.cloudera.com

今回はPHPのアプリケーションで想定できるケースでどう使うか、
実際にどう組み込むかをやってみましょう。

異なるデータベースのjoinは、PostgreSQLのFDWやSQLServerのOPENQUERYなどもあります

F.31. postgres_fdw

Prestoをうごかす

現時点の最新は0.184ですので、マニュアルに沿ってインストールします。

$ wget https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.184/presto-server-0.184.tar.gz

解凍後に設置するだけで済みますが、
Prestoの設定や、接続したいデータベースの情報や記載するために
解凍したprestoのディレクトリに etc ディレクトリを作成します。

詳細はマニュアルに記載されていますので、ささっと初期設定をやってしまいます。

2.1. Deploying Presto — Presto 0.184 Documentation

launcherのrunを実行し、問題がなければstartなどで起動させます。

$ bin/launcher run

起動して、configで設定した discovery.uri にアクセスすると下記の画面が表示されます。

f:id:ytakezawa:20170915033255p:plain

接続先を追加する

次にMySQLに接続するように設定を追加します。

追加したetcディレクトリに、更に catalog ディレクトリを追加します。
各データベースへの接続情報はカタログに置いていきます。

MySQLのテーブル例

テーブルは下記のようなものを作ったとします。 (databaseはtestingとします)

CREATE TABLE `tests` (
  `test_id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `test_name` varchar(85) NOT NULL DEFAULT '',
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`test_id`),
  UNIQUE KEY `UIDX_TESTS_NAME` (`test_name`),
  KEY `IDX_TESTS` (`test_name`,`test_id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

データは適当に入れておきます

INSERT INTO tests(test_name) VALUE('presto');

mysqlのcatalogは次の形式で記述します。

connector.name=mysql # コネクタのドライバ指定になるので、mysqlのままです
connection-url=jdbc:mysql://example.net:3306 # 以下は利用環境に合わせて指定します
connection-user=root
connection-password=secret

接続時にファイル名をカタログ名として利用しますので、
わかりやすい名前にしておきましょう。

仮にここでは etc/catalog/my_tests.properties として進めます。

Redisの例

次にRedisです。

せっかくなので、Redisのdatabaseをデフォルトではなく10を指定したとして設定を記述します。
10を利用するときは次の通りです

$ redis-cli
127.0.0.1:6379> SELECT 10
OK
127.0.0.1:6379[10]>

*IPなどは環境に合わせてください

Redisのcatalogに記述する項目は、先ほどのMySQLとは少し違います

5.14. Redis Connector — Presto 0.184 Documentation

今回はRedisの一番の基本のStringを主として、次のように記述します。

connector.name=redis
redis.table-names=test.string
redis.nodes=127.0.0.1:6379
redis.database-index=10

仮にここでは etc/catalog/red_tests.properties として進めます。

ここでも適当にデータを入れておきます。

127.0.0.1:6379[10]> SET presto awesome
OK
127.0.0.1:6379[10]> GET presto
"awesome"

これで準備が整いました。 prestoから接続できるか確認してみます。

Presto Command Line Interface

最初にインストールしたUIからはなにもできません。
CLIをダウンロードします。

2.2. Command Line Interface — Presto 0.184 Documentation

presto で利用できるようにjarのファイル名を変更します

$ wget https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.184/presto-cli-0.184-executable.jar
$ mv presto-cli-0.184-executable.jar presto
$ chmod 755 presto

–serverにはprestoのconfigで設定した discovery.uri を指定します。

$ ./presto --server 127.0.0.1:8080
presto>

コンソールに入ったら現在時刻でも取得して動作確認しておきましょう。

presto> SELECT NOW();
               _col0
------------------------------------
 2017-09-15 04:29:51.689 Asia/Tokyo
(1 row)

Query 20170914_192951_00000_bv9h4, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]

ばっちりです

MySQL, Redisに接続する

事前に記述した接続先に接続してクエリを投げてみましょう *launcherのrestartは忘れずに

MySQL

事前に用意した接続先にクエリを投げます

<catalog名>.<データベース名>.<テーブル名>で指定します。

presto> SELECT * FROM my_tests.testing.tests;
 test_id | test_name |       created_at
---------+-----------+-------------------------
       1 | presto    | 2017-09-15 03:49:30.000
(1 row)

Query 20170914_194006_00007_ijqj3, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [1 rows, 0B] [1 rows/s, 0B/s]

prestoから結果が取得できているのが見えます。

エラーが出る場合はSCHEMASが見えるか、など確認しましょう。

presto> SHOW SCHEMAS FROM my_tests;
       Schema
--------------------
 information_schema
 testing
(2 rows)

Redis

MySQLと問い合わせ方法は同じです。

redis.table-names で指定した名前を使って、<カタログ名>.<namesで記述した名前>を指定します。

マニュアルにもある通り、Redisは内部的なカラムとしていくつかあります。

Column name Type Description
_key VARCHAR Redis key.
_value VARCHAR Redis value corresponding to the key.

*一部抜粋

今はkey, valueをSELECT 区に入れてみます。

presto> SELECT _key, _value FROM red_tests.test.string;
  _key  | _value
--------+---------
 presto | awesome
(1 row)

Query 20170914_194457_00012_ijqj3, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [1 rows, 7B] [12 rows/s, 90B/s]

先ほど挿入した値が取得できました。

MySQL Redis JOIN

ことなるデータストレージを結合して検索してみましょう。

presto> SELECT _key, _value, test_id, test_name, created_at FROM my_tests.testing.tests AS myttt
     -> INNER JOIN red_tests.test.string AS redttt ON redttt._key = myttt.test_name
     -> WHERE myttt.test_name = 'presto';

  _key  | _value  | test_id | test_name |       created_at
--------+---------+---------+-----------+-------------------------
 presto | awesome |       1 | presto    | 2017-09-15 03:49:30.000
(1 row)

Query 20170914_195255_00018_ijqj3, FINISHED, 1 node
Splits: 66 total, 66 done (100.00%)
0:00 [2 rows, 7B] [4 rows/s, 15B/s]

当然存在しない値を指定すると、0rowsとなります。

presto> SELECT _key, _value, test_id, test_name, created_at FROM my_tests.testing.tests AS myttt
     -> INNER JOIN red_tests.test.string AS redttt ON redttt._key = myttt.test_name
     -> WHERE myttt.test_name = 'ytake';

 _key | _value | test_id | test_name | created_at
------+--------+---------+-----------+------------
(0 rows)

Query 20170914_195337_00020_ijqj3, FINISHED, 1 node
Splits: 66 total, 66 done (100.00%)
0:00 [1 rows, 7B] [2 rows/s, 15B/s]

次回は、早速PHPから利用し、もう少しだけ複雑にしてみます。

Laravel5.5 API Resourcesを利用する その1

先日、Laravel5.5がリリースされました。
このバージョンは新しいLTSとなりますので、5.1からのアップグレードなどを検討してみましょう!

5.5で追加された仕組みの一つに、Eloquent: API Resourcesがあります。

laravel.com

以前からEloquentで取得したオブジェクトを直接レスポンスに与えると、
jsonで返却する機能がありました。

追加されたこの仕組みは、レスポンスとEloquentとの橋渡しをする機能となります。

便利な機能ではありますが、
データベースとの接続、データ返却を行うEloquentと、Httpのレスポンスが結び付き合うのは、
利便性という以外では責務が多すぎるため、
議論の的になることもあります。

と、マニュアルだけでは確かにそう見える機能ですが、
RESTでおなじみのLevel 3 - Hypermedia Controlsに対応することができます。

martinfowler.com

Eloquentを利用すれば、Resourceクラスがjsonに変換する際に、
meta情報などを付け加えることができます。
このレスポンスはjsonapiと呼ばれるフォーマットに近しい形で出力されます。

JSON API — A specification for building APIs in JSON

実際のアプリケーションではEloquent以外のものを利用することも多くあると思います。

本エントリでは、Eloquent以外のものでこのResourceを使って、
HALを適用します。

Resourceクラスは、 \Illuminate\Http\Resources\Json\Resource を利用することで、
さまざまなフォーマットに沿ったjsonを定義することができます。

このResourceクラスには、型宣言がありませんが、toArrayメソッドを持つものであれば、
変換に利用することができます。

シンプルな配列を利用する場合は、次のようになるでしょう

<?php

    $embedded = new \Illuminate\Http\Resources\Json\Resource(
        new class implements \Illuminate\Contracts\Support\Arrayable
        {
            public function toArray()
            {
                return [
                    [
                        'message' => 'hello',
                        'type'    => 'example',
                        '_links'  => [
                            'self' => 'http://example.com/1',
                            'hoge' => 'http://example.com/hoge/1',
                        ],
                    ],
                    [
                        'message' => 'hello',
                        'type'    => 'example',
                        '_links'  => [
                            'self' => 'http://example.com/2',
                            'hoge' => 'http://example.com/hoge/2',
                        ],
                    ],
                ];
            }
        }
    );

このリソース一つ一つにナビゲート可能なリンクがあるものとして、記述しています。

ResourceCollectionクラスを利用することもありますが、
Collectionクラスに依存するため、今回は通常の配列のみを利用します。

このままレスポンスとして利用することも可能ですが、
このリソースを他のリソースで利用し、埋め込み情報として利用します。

<?php

    $resource = new \Illuminate\Http\Resources\Json\Resource(
        new class($embedded) implements \Illuminate\Contracts\Support\Arrayable
        {
            /** @var \Illuminate\Http\Resources\Json\Resource */
            protected $resource;

            /**
             *  constructor.
             *
             * @param \Illuminate\Http\Resources\Json\Resource $resource
             */
            public function __construct(\Illuminate\Http\Resources\Json\Resource $resource)
            {
                $this->resource = $resource;
            }

            public function toArray()
            {
                return [
                    'title'     => 'illuminate/http resource',
                    '_links'    => [
                        'self' => 'http://google.com',
                    ],
                    '_embedded' => $this->resource->jsonSerialize(),
                ];
            }
        }
    );
    $resource::$wrap = null;

最初に記述したリソースを埋め込み情報として利用するリソースクラスを作成します。
デフォルトでdataの配列となりますが、静的プロパティの$wrapに任意の名称を与えることができますので、
ここでは利用しないためnullとします。

埋め込み情報は _embedded として利用します

最後にcontent-typeを 'application/hal+json' としてレスポンスを返却するようにします。

<?php

$response = $resource->response()->header('content-type', 'application/hal+json');
return $response;

レスポンスとしては次のように返却されます。

f:id:ytakezawa:20170908035400p:plain

親子関係がはっきりしたデータ構造や実装であれば、
比較的ライトに実装することができると思います。

LaravelでEntityなどを表現して、データマッパーライクな実装をしている場合は、
以前のエントリを参考にアノテーションを使う実装の方が、
フィットするかと思います。

qiita.com

ytake.hateblo.jp1

2016-2017買ってよかった書籍

最近はphpよりもSparkやKafkaといったミドルウェアを使った開発や、設計ばかりやってます。

そんな中で良い書籍がありましたので、せっかくなのでいくつか紹介します。

ビッグデータ

スケーラブルリアルタイムデータ分析入門

スケーラブルリアルタイムデータ分析入門 ―ラムダアーキテクチャによるビッグデータ処理

スケーラブルリアルタイムデータ分析入門 ―ラムダアーキテクチャによるビッグデータ処理

ラムダアーキテクチャについて詳しく書かれているのはもちろんですが、
どういう様に分析処理を作っていくべきか、説明や実装例を使って解説されているので、
アーキテクチャのイメージがつきやすい書籍です。
分析処理や、Hadoopなどを扱う場合は持っておいて損はしないと思います。

書籍のリポジトリ

github.com

参考

www.oreilly.com

www.oreilly.com

ビッグデータ分析・活用のためのSQLレシピ

ビッグデータ分析・活用のためのSQLレシピ

ビッグデータ分析・活用のためのSQLレシピ

前述のスケーラブルリアルタイムデータ分析入門とセットで持っておくと良いかもしれません。
当然ながら載っているSQLそのまま利用することはできませんが、
ミドルウェアによるQueryの違いや、後半の章などはビッグデータ系に携わるのであれば持っておいて良いと思います。
あと読みやすいです。

実際にリアルタイムなストリーミング処理ではSpark SQLとpipelineDBとかを使うことになると思いますので、
読んで実際に試してみてください。

参考

Spark SQL and DataFrames - Spark 2.2.0 Documentation

www.pipelinedb.com

詳解 Apache Spark

詳解 Apache Spark

詳解 Apache Spark

SparkのDocumentを見ながらやっていましたが、やはり日本語の書籍が欲しくて購入。
前述の2冊を読んでいると、さらにグッとあーーなるほど、という繋がりが見えて楽しいです。
MLlibやGraphXも記述がありますので、全体像を掴むにはバッチリでした。

Sparkによる実践データ解析

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Spark1.2ベースで多少古いですが、アルゴリズムや処理のポイントが記述されているので、
phperの自分にもとてもよくわかる書籍で、
Sparkを実践的に利用していく前に読んでおくと、アーキテクチャなどを考える際にヒントとなります。

分析処理系をやる場合は、ビッグデータ系の知識がなければ難しいので、
あげた書籍はかなりおすすめです

洋書

Spark in Action

詳解 Apache Sparkのさらに突っ込んだ内容で、H2Oを組み合わせたレコメンドの例や、 しかも初心者でも置いてけぼりにされない様に小難しくなりすぎない内容だったり、
ステップアップにはとても良いと思います。

Spark in Action

Spark in Action

Akka in Action

This very good Akka guidebook と言われるくらいの本
phperの自分がscalaを理解、Akkaを理解するために購入して現時点でも読みながら触っている最中です。
買ってよかった・・・

Akka in Action

Akka in Action

実践ドメイン駆動設計の著者が書いたAkkaの実践的な本も一緒に書いましたが、
こちらはまだ全然読めておらず、そしてもっとAkkaを理解せねば。。

Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka

Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka

他にもまだ良い本何冊か書いまして、またの機会に

PHP with Apache Kafka

Apache Foundation

ビッグデータ系の処理向けにApache Kafkaを利用し始めました。

これまでもMessage Queueなどにzmq、Redis(PubSub)、ActiveMQ/RabbitMQなどを利用はしていましたが、

スケールのしやすさや、運用面や機能など今後フル活用できそうなためKafkaに、と。
HadoopやHbase、Cassandraといったミドルウェアを扱う機会も増えていているため、
親和性なども当然あります

Kafkaどうなのよ

保存機能

Kafkaでは指定した期間、メッセージを保存する機能が用意されています。
これまでもその機能を持つMessage Queueはありましたが、
配信後にアプリケーション側から再配信させるといったことができます。
デフォルトで2週間ほど保持することができるので、
障害発生時に、再度処理を実行させることも、配信したメッセージの内容を取得することも容易です。

ただ、たしかに最近のPHP、特にLaravelなどにみられる非同期処理を溜め込むためのQueueレベルであれば、
かなりのオーバースペックとなります。
ビッグデータではデファクトスタンダードと言っても良いくらいのラムダアーキテクチャの上で動くアプリケーションであれば、
集計処理やリアルタイム処理など全てが異なるミドルウェアで実行されますので、
速度層などでは重宝します。

Sparkと一緒に利用することでストリーム処理がより強力になるのも魅力的です
(そのために最近scalaをやり始めました)

実績もあるZookeeperを利用したスケールのしやすさなどは現時点ではKafka一択でした。

Kafkaから直接CassandraやHbaseに値を挿入することもできます

www.confluent.io

Partition

Kafkaは簡単にクラスタを構築することができるのはもちろんですが、
このパーティションを利用してアプリケーションレベルで並列や分散処理が設計できます。

ごく一般的なPubSubやQueueではChannel(KafkaではTopicと呼びます)を利用して、
Round-Robinや、同一のQueueを複数のSubscriberが同時に処理するなどがあげられます。

Kafkaでは上記はもちろんですが、アプリケーションで自由に決めることができ、
場合によっては木の枝の様にどんどん細分化して効率的に処理を行うこともできます。

f:id:ytakezawa:20170521001922p:plain

これはSubscriber側だけではなく、PublishするProducer側からも指定することができます。
一つのTopicに対してPartitionで分割し、特定のサーバからはpartition0, あるサーバからはpartition1に対して、
Publishするなどが可能で、
複数のConsumerはこれらをまとめて処理することもProducer同様に、
アプリケーションに合わせてConsumerを異なるサーバで動かすなどができます。

Chatの様なリアルタイム性が重視されるアプリケーションにおいても、
TopicとPartitionがいきてくるのではないかと思います。

Kafka Install

インストール方法はいろんなサイトに記載されていますので省きます。

ただし、アプリケーション側から接続ができない、などがあれば、
confg/server.properties の中身を確認しましょう。

必要に応じて、 advertised.listeners=PLAINTEXT://ipアドレス指定:9092 などデフォルトのままではなく、
正しい情報になっているかどうかを確認します。

Zookeeperの利用が必須になりますので、Kafkaとは別に起動が必要です。
こちらもあわせてZookpeerのアドレスが正しいかどうかを確認してください。

# root directory for all kafka znodes.
zookeeper.connect=ipアドレス指定:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

*Zookeeperのクラスタの構築方法は他のサイトを参照ください

PHPから利用する

Kafkaを細かく利用していくのであれば、rdkafkaがおすすめです。
libkafkaをラップしたphpエクステンションで、
かなり自由に設定を変更できます。

arnaud-lb/php-rdkafka

設定についはアプリケーションの規模や構成によって異なりますので、
公式のドキュイメントかconfluentなどのドキュメントを読み込むのをおすすめします。

Kafka Consumers — Confluent Platform 3.2.1 documentation

Producerは queue.buffering.max.messages などをアプリケーションに合わせて変更してください。
処理速度が早すぎる場合は、指定した件数以上は処理されなくなってしまいます。

Consumerは heartbeat.interval.ms, session.timeout.ms を適切な値に設定する必要があります。
(正しく設定していない場合は切断されるなどが発生します 切断時はsupervisorでプロセス再起動)
heartbeat.interval.msは session.timeoutの1/3くらいに設定すると良いかもしれません。

簡単にProducer/Consumerの動作確認、partition分割の動作確認などを行えるサンプルを用意していますので、
難しければ参考にしてみてください

github.com

RdKafka\Conf クラスを利用して、上記の設定値を反映させることができます。

<?php
declare(strict_types=1);

namespace Ytake\KafkaConsole\Foundation;

use RdKafka\Conf;
use RdKafka\Producer;
use RdKafka\Consumer;

/**
 * Class Configure
 */
final class Configure
{
    // 省略
    /**
     * @return Producer
     */
    public function producer(): Producer
    {
        $conf = new Conf();
        foreach ($this->producerConfigure as $key => $item) {
            $conf->set($key, $item);
        }
        return new Producer($conf);
    }
    /**
     * @return Consumer
     */
    public function consumer(): Consumer
    {
        $conf = new Conf();
        foreach ($this->consumerConfigure as $key => $item) {
            $conf->set($key, $item);
        }
        return new Consumer($conf);
    }
    // 
}

Produce自体は非常にシンプルです

<?php

    /**
     * @param AbstractProduceDefinition $definition
     */
    public function produce(AbstractProduceDefinition $definition)
    {
        $kafkaTopic = $this->producerTopic();
        $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $definition->payload());
        if ($this->logger instanceof LoggerInterface) {
            $this->logger->info($definition->payload());
        }
        $this->producer->poll(0);
    }
    /**
     * @return ProducerTopic
     */
    protected function producerTopic(): ProducerTopic
    {
        $configure = $this->repository->find($this->topic);
        $this->producer = $configure->producer();
        $this->producer->setLogLevel(LOG_DEBUG);
        $this->producer->addBrokers($configure->brokers());
        return $this->producer->newTopic($this->topic);
    }

RD_KAFKA_PARTITION_UA で、Producer側から自動でpartitionを選択しpublishします。
複数のpartitionを意図した様に操作する場合は
RdKafka\ProducerTopic, RdKafka\Producer クラスのproduceメソッドの引数で意図したものに指定してください。

Consumerは下記の様になります

<?php

    /**
     * @param Consumable $callable
     *
     * @throws \Exception
     */
    public function handle(Consumable $callable)
    {
        $topic = $this->consumerTopic();
        $topic->consumeStart($this->partition, $this->offset);
        while (true) {
            $message = $topic->consume($this->partition, 120 * 10000);
            if ($message instanceof Message) {
                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        call_user_func($callable, $message);
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
                        throw new \Exception("time out.");
                        break;
                    default:
                        break;
                }
            }
        }
    }

サンプルのアプリケーションでは、
producerは $ php kafka-console kafka:produce message-topic hello
consumerは $ php kafka-console kafka:consume message-topic でそれぞれ簡単に動作確認ができます。

consumerを複数立ち上げて、produceを実行すると複数のconsumerが反応します。
またpartitionを指定してconsumerを起動すると、特定のconusmerのみ反応することが確認できると思います。

複雑なアプリケーションや、ビッグデータを扱うアプリケーションなどでは、
一つのアプリケーションだけで解決するには、困難が付きまといます。
マイクロサービスアーキテクチャなどはこうしたミドルウェアを用いたEvent Sourcingなども重要になってきます。
これらのミドルウェアを適切に使い、アプリケーション作りに役立てていきましょう。

Laravel5.4 Data MapperライクなDatabaseアプローチ

Laravelで使われているilluminate/databaseはPDOを利用して実装されています。
つまりPDOでできることは全て利用可能です。

Laravelの標準の機能では、データベースのレコードはCollectionクラスで、
stdClassまたは、配列でカラムと値が共に返却されます。

Data Mapperライクに任意のオブジェクトで返却する様にするには、
Illuminate\Database\Events\StatementPrepared をlistenする必要がありますが、
Database処理にEventが依存してしまうため(eventヘルパーを使ってもクラスに依存していることになります)、
fetchModeを変更できるメソッドが欲しくなります(laravel5.3まではありましたが変更されました)

5.4で利用したい場合は次の様な拡張で簡単に追加することができます。

<?php
declare(strict_types=1);

namespace App\Foundation;

use PDOStatement;

trait QueryPrepared
{
    /** @var string */
    protected $fetchStyleClass = '';

    /**
     * @param string $class
     *
     * @return QueryPrepared
     */
    public function fetchClass(string $class): self
    {
        $this->fetchStyleClass = $class;

        return $this;
    }

    /**
     * @param PDOStatement $statement
     *
     * @return PDOStatement
     */
    protected function prepared(PDOStatement $statement): PDOStatement
    {
        $statement->setFetchMode($this->fetchMode);
        if (!empty($this->fetchStyleClass)) {
            $statement->setFetchMode(\PDO::FETCH_CLASS, $this->fetchStyleClass);
        }

        return $statement;
    }
}

5.4ではquery発行の直前で Illuminate\Database\Connection クラスの preparedメソッドが実行され、
このメソッドで前述のeventが発火されますので、この動作を変更します。

この例ではeventを利用せずに、fetch styleを変更します。

次にConnectionクラスを継承したクラスを作成し、トレイトを使って変更します。
(当然トレイトではなく、通常の継承でも構いません)

<?php
declare(strict_types=1);

namespace App\Foundation;

/**
 * Class SqliteConnection
 */
final class SqliteConnection extends \Illuminate\Database\SQLiteConnection
{
    use QueryPrepared;
}

この例ではsqliteだけを変更します。

次にService Containerを使って、実行クラスを変更します。

<?php
declare(strict_types=1);

namespace App\Providers;

use App\Foundation\SqliteConnection;
use Illuminate\Support\ServiceProvider;
use Illuminate\Database\DatabaseManager;
use Illuminate\Database\Connectors\SQLiteConnector;

/**
 * Class AppServiceProvider
 */
final class AppServiceProvider extends ServiceProvider
{
    /**
     *
     */
    public function register()
    {
        /** @var DatabaseManager $dbManager */
        $dbManager = $this->app['db'];
        $dbManager->extend('sqlite', function (array $config, $name) {
            $connectior = new SQLiteConnector;
            return new SqliteConnection($connectior->connect($config));
        });
    }
}

これでsqlite利用時は拡張したクラスが利用される様になります。
(SqliteConnectionクラスの引数は必要に応じて指定してください)

データベースで返却されるカラムなどに応じてFETCH_CLASSで指定するクラスを作成します。

次の例は、返却されるカラムが二つだけのシンプルなものです。

<?php
declare(strict_types=1);

namespace App\Mapper;

/**
 * Class UserMapper
 */
class UserMapper
{
    /** @var string */
    private $id;

    /** @var string */
    private $name;

    /**
     * @return string
     */
    public function getId(): string
    {
        return $this->id;
    }

    /**
     * @return string
     */
    public function getName(): string
    {
        return $this->name;
    }
}

最後に利用したい処理に記述します。

<?php

use App\Mapper\UserMapper;
use App\Foundation\SqliteConnection;
use Illuminate\Database\DatabaseManager;

public function __invoke(DatabaseManager $databaseManager)
{
    /** @var SQLiteConnection $connection */
    $connection = $databaseManager->connection();
    $connection->fetchClass(UserMapper::class);
    $generator = $connection->table('users')->cursor();

    /** @var UserMapper $item */
    foreach ($generator as $item) {
        dump($item);
    }
}

cursorはLaravel5.3から追加されたGeneratorで返却するメソッドです。
返却されるオブジェクトが指定したものになっていることが確認できます。

UserMapper {#107 ▼
  -id: "1"
  -name: "onamae"
}

Laravel-Aspect update MessageDriven, QueryLog (1.7)

リリースノート作ればいいんですが、便利機能をいくつか追加しましたので、 使い所などを踏まえて追加機能の紹介です。

今回からLaravel5.2, 5.1は1.6、5.3以上は1.7となりました。
5.5リリース後、以前のバージョン対応は停止する予定です。

What

そもそもこのライブラリは、ビジネス要件をサポートするシステム要件を分離し、
AOPを用いてアプリケーションをサポートするものです。
Cache削除追加更新, Transactionをはじめ、Log出力やリトライ処理など
どれもビジネス要件を解決する実装コードに含めてしまうとどうしても複雑になりがちです。

それらはビジネス要件以外のシステム要件なども多く入ってしまうと、
大規模アプリケーションでは理解が難しくなっていくため、そんな要件をサポートするためのものです。

ユースケースによるアスペクト指向ソフトウェア開発 (Object Oriented Selectionシリーズ)

ユースケースによるアスペクト指向ソフトウェア開発 (Object Oriented Selectionシリーズ)

そんなサポートの機能によく使うものを追加しました

Message Driven

これは特定のメソッドの処理をQueueにpushする機能です。
illuminate/busを利用してQueueに挿入しますが、dispatchヘルパーと用途的には同じです。

Queueを利用するため、serializeできないClosureが含まれている場合はエラーになります。

Usage

<?php

declare(strict_types=1);

namespace App\Foundation;

use Ytake\LaravelAspect\Annotation\Loggable;
use Ytake\LaravelAspect\Annotation\LazyQueue;
use Ytake\LaravelAspect\Annotation\MessageDriven;

class MessageDispatcher
{
    /**
     * @Loggable
     * @MessageDriven(
     *     @LazyQueue(10)
     * )
     *
     * @param string $message
     */
    public function queued(string $message)
    {
        $this->hello($message);
    }

    /**
     * @Loggable
     * @param string $message
     *
     * @return string
     */
    public function hello(string $message): string
    {   
        return $message;
    }
}

MessageDrivenアノテーションの第一引数でQueueの振る舞いが変更されます。
LazyQueueをアノテーションで指定すると、databaseやredisなどのドライバーを利用するQueueとして作用し、
EagerQueueを指定すると、syncドライバを利用して遅延なしで実行されます。

上記の例の様に他のアノテーションと組み合わせることもできます。
この場合はQueueに queued メソッドの処理が登録されます。
helloメソッドが実行されるときに引数と実行時間をlogに出力します。

Query Log

Laravelを使っている方はdatabaseのクエリログ取得にEventを利用したり、 Debugbarといったものを使って表示させているかと思います。

Eventを毎回ServiceProviderに記述して必要な処理のところだけLogを出力して・・、
となるとなかなか手間がかかります。

Debugbarは便利ではありますが、商用環境にこのライブラリをインストールするわけにもいかず。。

*大幅にパフォーマンス劣化、実行しているものが見えるため場合によっては脆弱性に繋がるかもしれません

ですが商用環境でSlowQueryを解決するためにコードに記述することも多いと思いますが、
どうしてもアプリケーション本来の要件とは違うコードが記述されるため、処理が複雑になっていくこともあると思います。

そんな問題をAOPで解決する機能です。

Usage

EloquentでもDBファサードやConstruct Injectionでも構いませんが下記の様に、
特定の処理を実行するメソッドに対して @QueryLogアノテーションを記述します。

<?php

use Ytake\LaravelAspect\Annotation\QueryLog;
use Illuminate\Database\ConnectionResolverInterface;

/**
 * Class AspectQueryLog
 */
class AspectQueryLog
{
    /** @var ConnectionResolverInterface */
    protected $db;

    /**
     * @param ConnectionResolverInterface $db
     */
    public function __construct(ConnectionResolverInterface $db)
    {
        $this->db = $db;
    }

    /**
     * @QueryLog
     */
    public function multipleDatabaseAppendRecord()
    {
        $this->db->connection()->statement('CREATE TABLE tests (test varchar(255) NOT NULL)');
        $this->db->connection('testing_second')->statement('CREATE TABLE tests (test varchar(255) NOT NULL)');
        $this->db->connection()->table("tests")->insert(['test' => 'testing']);
        $this->db->connection('testing_second')->table("tests")->insert(['test' => 'testing second']);
    }
}

上記の例では multipleDatabaseAppendRecord メソッドが実行されると、
このメソッド内でコールされたSQLがログファイルに出力されます

Transactional拡張

今までTransactionalアノテーションは一つのメソッドに対して一つしか指定できなかったため、
複数のデータベースに対してTransactionを指示したい場合は、下記の様にする必要がありました。

<?php

namespace App\Domain\Service;

use Ytake\LaravelAspect\Annotation\Transactional;

class ProductActivation
{
    /**
     * @Transactional("db1")
     */
    public function activate()
    {
        $this->activateCampaign();
        // 処理
    }

    /**
     * @Transactional("db2")
     */
    public function activateCampaign()
    {

    }
}

これを1メソッドに対して同時に複数指定することができる様になりました。

<?php

namespace App\Domain\Service;

use Ytake\LaravelAspect\Annotation\Transactional;

class ProductActivation
{
    /**
     * @Transactional({"db1", "db2"})
     */
    public function activate()
    {
        // 処理
    }
}