ytake Hatena

Web Application Developer

Laravelで作る分析・分散処理アプリケーション その2

Kafka Consumer + Prestodb例

ytake.hateblo.jp

上記のエントリの続編です。

その1 ではApache Kafkaを組み合わせて、
データの分散やアプリケーション自体をスケールするアプローチを紹介しました。

今回は分散したサービスのデータの集約をKafkaとPrestoを組み合わせて、
ログ分析の基盤作りの例を紹介します。

アプリケーションのログをfluentd, elasticsearchで収集し、サービズ作りに活かすケースは多いと思います。

今回の例ではログにサービス固有の情報、物理的に異なるデータベースを集め、
ログデータをKPIなどに活かせる形にし、elasticseachに格納します。

あるところでデータ更新などが行われた場合でも、
Kafkaを軸にメッセージを受信することでelasticsearchのドキュメント更新なども簡単に行えます。

続きを読む

Laravelで作る分析・分散処理アプリケーション その1

先日のPHPカンファレンスPHPカンファレンス関西、buildersconでお話しした内容を元にして、
Laravel(PHP)を使って分析処理の簡単な実装や、
ミドルウェアを組み合わせた分散処理の実装を紹介します。

本ブログのサンプルアプリケーションは下記になりますので、
コードやミドルウェアなどを参照ください。

github.com

Laravelとkafka Connect、Elasticsearchの組み合わせ

Apache Kafkaを使ったスケーラブルなアプリケーションの入門編です。

レコード量が多い複雑なコンテンツのデータや検索要件、Like検索など、

RDBMSの不得意な分野などを対応することも多いかと思いますが、
RDBMSとElasticsearchを併用しKafkaで複雑さを吸収して、
アプリケーションをスケールさせるようにしてみましょう。

データベースのテーブル設計時に想定されるデータモデリングと、
サービスが成長することによってデータの複雑化と、検索の複雑さが増し、
ビジネス要件がより高度になっていきます。
これらを解消するために全文検索を導入するなどが考えられますが、
このサンプルではそういったデータストレージが異なる場合でも、
CQRS+ESライクに問題を解決するヒントになればと思います。

サンプルアプリケーションでは /fulltext 配下のurlが該当します。

Kafka Producerの実装

いわゆるMessage Queueのメッセージ送信を実装します。

LaravelのQueueを想像されるかもしれませんが、フレームワークのQueueではなく、
こういった処理は原則他の言語でも利用できるようにする必要がありますので、
フレームワークの知識をメッセージに混入させることなく実装するようにします。

Producerの実装自体は難しいものではありません。

<?php
declare(strict_types=1);

namespace App\Foundation\Producer;

use Psr\Log\LoggerInterface;
use RdKafka\Conf;
use RdKafka\Producer as KafkaProducer;
use RdKafka\Producer as RdkafkaProducer;
use RdKafka\ProducerTopic;

/**
 * Class Producer
 */
class Producer
{
    /** @var RdkafkaProducer */
    protected $producer;

    /** @var string */
    protected $topic = 'default';

    /** @var null|LoggerInterface  for optional logger */
    protected $logger;

    /** @var string */
    protected $brokers;

    /** @var array */
    protected $options;

    /**
     * Producer constructor.
     *
     * @param string $topic
     * @param string $brokers
     * @param array  $options
     */
    public function __construct(string $topic, string $brokers, array $options = [])
    {
        $this->topic = $topic;
        $this->brokers = $brokers;
        $this->options = $options;
    }

    /**
     * @param AbstractProduceDefinition $definition
     */
    public function produce(AbstractProduceDefinition $definition)
    {
        $kafkaTopic = $this->producerTopic();
        $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $definition->payload());
        if ($this->logger instanceof LoggerInterface) {
            $this->logger->info($definition->payload());
        }
        $this->producer->poll(0);
    }

    /**
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @return ProducerTopic
     */
    protected function producerTopic(): ProducerTopic
    {
        $this->producer = $this->producer();
        $this->producer->setLogLevel(LOG_DEBUG);
        $this->producer->addBrokers($this->brokers);

        return $this->producer->newTopic($this->topic);
    }

    /**
     * @return KafkaProducer
     */
    protected function producer(): KafkaProducer
    {
        $conf = new Conf();
        foreach ($this->options as $key => $item) {
            $conf->set($key, $item);
        }

        return new KafkaProducer($conf);
    }
}

Command 実装

ここで指すCommandとは、artisan commandのアプリケーションではなく、
CQRSのCommandとQuery、データの書き込みと読み込みを分離して実装します。

以下は登録処理のコントローラクラスです。

<?php
declare(strict_types=1);

namespace App\Http\Controllers\Fulltext;

use App\Events\SinkConnect;
use App\Http\Controllers\Controller;
use App\Http\Requests\FulltextRequest;
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Http\RedirectResponse;
use Illuminate\Routing\Redirector;

/**
 * Class RegisterAction
 */
final class RegisterAction extends Controller
{
    /** @var Dispatcher */
    private $dispatcher;

    /** @var Redirector */
    private $redirector;

    /**
     * RegisterAction constructor.
     *
     * @param Dispatcher $dispatcher
     * @param Redirector $redirector
     */
    public function __construct(Dispatcher $dispatcher, Redirector $redirector)
    {
        $this->dispatcher = $dispatcher;
        $this->redirector = $redirector;
    }

    /**
     * @param FulltextRequest $request
     *
     * @return RedirectResponse
     */
    public function __invoke(FulltextRequest $request): RedirectResponse
    {
        // 登録処理後に実行されるevent
        $this->dispatcher->dispatch(
            new SinkConnect(strval($request->get('fulltext')))
        );

        return $this->redirector->route('fulltext.index');
    }
}

サンプルではデータ書き込み(RDBMS)は省略していますが、
上記のコードの __invoke メソッドに記述するだけです。

何か登録処理が行われたものとして、その後にEventを発動しています。
このコントローラクラスでは、フロントで送信された文章(ブログの記事など)を保存する、
という機能を提供していますが、Kafkaへの通知はアプリケーションの要件ではなく、
システム都合の処理になりますので、ここのクラスではなく、Event Handlerが処理を行う様になっています。

このEventクラスはシンプルなクラスです。

<?php
declare(strict_types=1);

namespace App\Events;

/**
 * Class SinkConnect
 */
final class SinkConnect
{
    /** @var string */
    private $note;

    /**
     * SinkConnect constructor.
     *
     * @param string $note
     */
    public function __construct(string $note)
    {
        $this->note = $note;
    }

    /**
     * @return string
     */
    public function note(): string
    {
        return $this->note;
    }
}

このEventを処理するHandlerクラスがKafkaのメッセージを送信します。

Event Handler

SinkConnectイベントに反応して処理を行うクラスを実装します。

Handlerクラスから最初に紹介したKafka Producerを利用できる様に次の様に実装しています。

<?php
declare(strict_types=1);

namespace App\DataAccess;

use App\Foundation\Producer\Producer;
use App\Foundation\Producer\AbstractProduceDefinition;

/**
 * Class AbstractProduce
 */
abstract class AbstractProduce
{
    /** @var Producer */
    protected $producer;

    /**
     * MessageProduceUsecase constructor.
     *
     * @param Producer $producer
     */
    public function __construct(Producer $producer)
    {
        $this->producer = $producer;
    }

    /**
     * @param AbstractProduceDefinition $analyze
     */
    public function run(AbstractProduceDefinition $analyze)
    {
        $this->producer->produce($analyze);
    }
}

後述する分析処理にもKafkaを利用するためこのクラスを継承して利用します。

<?php
declare(strict_types=1);

namespace App\DataAccess;

/**
 * Class RegisterProduce
 */
final class RegisterProduce extends AbstractProduce
{

}

kafkaのtopicを処理によって切り替えるため、クラスを別クラスとして切り出しています。
このクラスを利用するHandlerクラスは以下のようになります。

<?php
declare(strict_types=1);

namespace App\Listeners;

use Ramsey\Uuid\Uuid;
use App\Events\SinkConnect;
use App\DataAccess\RegisterProduce;
use App\Definition\FulltextDefinition;

/**
 * Class SinkConnectHandler
 */
final class SinkConnectHandler
{
    /** @var RegisterProduce */
    protected $producer;

    /**
     * SinkConnectHandler constructor.
     *
     * @param RegisterProduce $producer
     */
    public function __construct(RegisterProduce $producer)
    {
        $this->producer = $producer;
    }

    /**
     * @param SinkConnect $connect
     */
    public function handle(SinkConnect $connect)
    {
        $this->producer->run(
            new FulltextDefinition(Uuid::uuid4()->toString(), $connect->note())
        );
    }
}

送信が可能な状態になりましたが、接続情報がないため、
これをServiceProviderを使って設定値を外から渡します。

設定値はconfig/kafka.phpに配置します

<?php

return [
    'topics'   => [
       'fulltext.register' => [
            'topic'   => 'fulltext.register',
            'brokers' => '127.0.0.1',
            'options' => [
                'socket.blocking.max.ms'       => '1',
                'queue.buffering.max.ms'       => '1',
                'queue.buffering.max.messages' => '1000',
                'client.id'                    => 'testingClient',
            ],
        ],
    ]
];

この設定値を使い、RegisterProduceクラスに与えます

<?php

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     *
     * @return void
     */
    public function register()
    {
        $this->app->when(RegisterProduce::class)
            ->needs(Producer::class)
            ->give(function (Application $app) {
                $kafkaConfig = $app['config']->get('kafka');
                $topic = $kafkaConfig['topics']['fulltext.register'];
                $producer = new Producer($topic['topic'], $topic['brokers'], $topic['options']);
                $producer->setLogger($app['log']);

                return $producer;
            });
    }
}

これでEventが発動するとHandlerクラスが反応し、Kafkaへメッセージが送信されます。
格納されるKafkaのtopicは fulltext.register です。 ここで送信されるメッセージには、uuidと、フォームで入力された文字列となります。

Kafka Connectの設定

送信されたメッセージをElasticsearchに送信するためにKafka Connect Elasticsearchの設定を行います。

github.com

これはConfluentをインストールすると含まれますので、追加で入れる必要はありません。

サンプルアプリケーションにはConfluentやElasticsearchも含まれています

ここでは一台で動かすためStandaloneモードで起動させます。

connect-standalone.propertiesファイルを作成して、以下の内容を記述します。

bootstrap.servers=192.168.10.10:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

rest.port=8093

kafka connectを利用するにはconverterがいくつか種類があり、代表的なものはavroですが、
ここではjsonConverterを指定します。(avro利用例は公式を参照ください)

Elasticsearch Connector — Confluent Platform 3.3.0 documentation

このファイルを /etc/schema-registry/connect-standalone.properties として設置します。
*サンプルは設置済みです。

次にkafka connectで直接elasticsearchに接続して、インデックスにデータを追加する設定を記述します。
elasticsearch-connect.propertiesファイルを作成しkafkaのtopic情報などを記述します。

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

tasks.max=1
topics=fulltext.register
key.ignore=true
connection.url=http://192.168.10.10:9200

schema.ignore=true
type.name=kafka-connect

上記の内容で、elasticsearchにKafkaのtopicと同じく、fulltext.register index が作成されます。
documentの_typeは kafka-connect となります。

これを /etc/kafka-connect-elasticsearch/elasticsearch-connect.properties として設置します。

次にKafkaに上記のKafka Connectを登録します。

sudo connect-standalone -daemon /etc/schema-registry/connect-standalone.properties /etc/kafka-connect-elasticsearch/elasticsearch-connect.properties
sudo confluent load elasticsearch-sink

daemonでKafka Connect Elasticsearchを起動し、 ConfluentにConnectorをLoadして登録します。

この状態で、Laravelで実装した処理を実行するとElasticsearchのindexに挿入されていきます。

Query実装

CQRSのQuery、データの読み込みを実装します。

LaravelのElasticsearchパッケージなどでも簡単に操作ができます。
サンプルではElasticsearchのphpクライアントライブラリを利用して実装しています。

<?php
declare(strict_types=1);

namespace App\DataAccess;

use Acme\Blog\Entity\EntryCriteria;
use App\Foundation\Elasticsearch\ElasticseachClient;

/**
 * Class FulltextIndex
 */
class FulltextIndex implements EntryCriteria
{
    /** @var ElasticseachClient */
    protected $client;

    /** @var string */
    protected $index = 'fulltext.register';

    /**
     * FulltextIndex constructor.
     *
     * @param ElasticseachClient $client
     */
    public function __construct(ElasticseachClient $client)
    {
        $this->client = $client;
    }

    /**
     * @return array
     */
    public function all(): array
    {
        $result = $this->client->client()->search([
            "index"  => $this->index,
            'type'   => 'kafka-connect',
            "body"   => [
                "query" => [
                    "match_all" => new \stdClass(),
                ],
            ],
        ]);
        $map = [];
        if (count($result)) {
            foreach($result['hits']['hits'] as $hit) {
                $map[] = $hit['_source'];
            }
        }

        return $map;
    }

    public function queryBy(string $string)
    {
        // TODO: Implement queryBy() method.
    }
}

Elasticsearchに問い合わせた結果は、
src/Entry (ドメイン)配下でRepositoryなどを経由してControllerを介してhtml出力されます。
詳細な実装はサンプルコードを参照ください

<?php
declare(strict_types=1);

namespace App\Http\Controllers\Fulltext;

use Acme\Blog\Specification\ActiveEntrySpecification;
use Acme\Blog\Usecase\RetrieveEntryUsecase;
use App\Http\Controllers\Controller;
use App\Http\Responders\HtmlResponder;
use Illuminate\Http\Response;

/**
 * Class IndexAction
 */
final class IndexAction extends Controller
{
    /** @var ActiveEntrySpecification */
    private $specification;

    /** @var RetrieveEntryUsecase */
    private $usecase;

    /**
     * IndexAction constructor.
     *
     * @param ActiveEntrySpecification $specification
     * @param RetrieveEntryUsecase     $usecase
     */
    public function __construct(
        ActiveEntrySpecification $specification,
        RetrieveEntryUsecase $usecase
    ) {
        $this->specification = $specification;
        $this->usecase = $usecase;
    }

    /**
     * @param HtmlResponder $responder
     *
     * @return Response
     */
    public function __invoke(HtmlResponder $responder): Response
    {
        $responder->template('fulltext.index');

        return $responder->emit([
            'list' => $this->usecase->run($this->specification),
        ]);
    }
}

アプローチ

このサンプルではKafka Connectを使ってデータの分散を行い、
RDBMSとElasticsearchの責務を分割した例を紹介しました。

小さなアプリケーションではオーバスペックな実装ですが、
確実なデータと検索に特化したミドルウェアを組み合わせて、データ上のパフォーマンスと堅実さを提供することができます。
また分散することで、どちらかに障害が発生した場合でもアプリケーションの動作を担保したり、
障害復旧などにも活かすことができるかと思います。

次回は物理的に分散したデータベースをPrestoで集約させて、Kafka Consumer経由でElasticsearchに格納する実装例を紹介します。

PHPカンファレンス2017でApache Kafkaについて話しました

PHPカンファレンス2017

今年も参加してきました

f:id:ytakezawa:20171010005624j:plain

2017/10/08 PHPカンファレンス2017で発表に使ったスライドです

speakerdeck.com

巨大化してしまったアプリケーションを分解する場合や、
マイクロサービス化するにあたって、
こうしたメッセージミドルウェアを利用するのがアプリケーション開発の重要なポイントになっています。

コンポーネント化やDDDなどによる堅実なアプリケーションも大事な要素ではありますが、
最近では複数のデータベースを跨いでるアプリケーションや、
ユーザー向けのサービスで分析処理、
ビッグデータなどと連携するアプリケーションでKafkaを利用するケースが多くなっている傾向もあります。

PHPは多くのサービスを支えている言語です。
今後は堅実さと、より巨大なデータフローを支えるシーンが多くなると思います。

そんな場面に直面した時のヒントになればと思います。

デモがみたい!と何人かの方に言われましたので、
折角なのでLaravelとKafka Connect、Prestoなどを使ったサンプルを公開します!

f:id:ytakezawa:20171010005756j:plain

弁当、めっちゃ美味しかった・・!
実は毎年スピーカー弁当あるよ!と数年スピーカーとして参加しているのに初めて知りました・・

今年の登壇シリーズ

毎年各地のPHPカンファレンスに参加していますが、
毎年、「今年登壇するときはこれでやる!」と決めていたりしていまして、
今年は今やっているビッグデータ系をPHP側からアプローチするシリーズでした。
(福岡のカンファレンスだけはADRの話をしましたが)

そんなこんなで実は各地でやったセッション、全て見聞きすると繋がるシリーズになっています。

PHPカンファレンス関西、builderscon2017

speakerdeck.com

www.youtube.com

参加したセッション

session03: 型を意識したPHPアプリケーション開発

buildersconの時に他のセッションに参加していて途中参加だったため、最初から参加しました
とても丁寧な解説と、ライブコーディングで親切なセッションでした。
フレームワークを利用して開発していると、手軽さだけを重視してしまい、
結果的に巨大な処理になってしまい堅実さを軽視してしまいがちです。
複雑さに立ち向かうための型を理解する良いセッションでした。

session35: ChatWorkとPHPと私

弊社では自分がKafkaとHBaseなどを組み合わせて、
データ処理系のアーキテクチャを設計・開発しているところですが、
DataWorks Summit SanJose 2017の発表や、Scala移行などの話題があり、
気になっていたので参加しました。

アーキテクチャの話はありませんでしたが、実際にやった話が聞けたのでよかったです。
PHP7かHHVMか、という話もあり、
HHVMはSymfony4からは対応しなくなるなどがありますので、
Hackでも利用できるようにいくつか作ろうかなーと思いました。

session37: PHPで理解するニューラルネットワークを使った機械学習

php-ai/php-mlではなくて、自身でライブラリを作って学習させてみた、というセッション。
buildersconで "Googleが開発したニューラルネット専用LSITensor Processing Unit」" を聞いていたので、
GPU大丈夫なんだろうかと思いながら、
実践で使う場合はさすがにPHPではやりませんが...

単純に面白いなぁと思ったセッションでした。

来年は他にもいくつかPHPの大きなイベントがあるようなので楽しみです!

PHPでビッグデータを操作しよう!Presto編 2

ytake.hateblo.jp

*上記の続き

異なるデータベース、NoSQLなどを結合できるということが理解できたと思います。

それではPHPのアプリケーションから実際に利用してみましょう。

PHP Prestodb Client

PHPのPrestoクライアントライブラリは、古いものがありますが、
自分の用途に合わなかった(最新のPHP向けにしたかったなど)為、
新たに作り、公開しました。

*新しく作ったもの

github.com

*以前からある古いもの

github.com

prestoはpdoのようにネイティブのコネクタはなく、
REST APIによる操作が可能です。
以前からあるもののスタイルではなく、javaのクライアントに近い実装にしました。

早速クライアントを利用して、前回のクエリを発行します。

Install

PHP7.0以上にのみ対応していますので、7.0以上の環境で実行してください。

installはcomposerを利用してください。
下記のコマンドを実行するだけでOKです。

$ composer require ytake/php-presto-client

Usage

prestoに問い合わせるクライアントのインスタンス生成は次の通りです。

<?php

$query = "SELECT _key, _value, test_id, test_name, created_at 
FROM my_tests.testing.tests AS myttt 
INNER JOIN red_tests.test.string AS redttt ON redttt._key = myttt.test_name 
WHERE myttt.test_name = 'presto'";

$client = new \Ytake\PrestoClient\StatementClient(
    new \Ytake\PrestoClient\ClientSession('http://127.0.0.1:8080/', 'my_tests'),
    $query
);

ClientSessionクラスに接続先と、第二引数にカタログ名を指定します。
カタログ名を指定すると、そのカタログ名を省略できるようになります。

ClientSessionクラスのインスタンスをStatementClientクラスに与えると、
Prestoに問い合わせる準備が整います。

一番簡単に利用できるのは、Ytake\PrestoClient\ResultsSession 経由で結果を取得する方法です。

<?php
$resultSession = new \Ytake\PrestoClient\ResultsSession($client);
$result = $resultSession->execute()->yieldResults();

/** @var \Ytake\PrestoClient\QueryResult $row */
foreach($result as $row) {
    foreach($row->yieldData() as $yieldRow) {
        if($yieldRow instanceof \Ytake\PrestoClient\FixData) {
            var_dump($yieldRow->offsetGet('_key'), $yieldRow['_key']);
        }
    }
}

yieldResultで様々なデータが返却されます。
Prestoはデータを取得するまでに複数回リクエストをおくる必要があり、
Queryの結果取得以外にも様々なデータを分割して返却する為です。

Prestoから返却されるQueryの結果は Ytake\PrestoClient\FixData で返却されますので、
これを取得します。
このクラスは ArrayAccess を実装していますので、
一般的な配列へのアクセスと同等にカラムの値を取得できます。

前回利用したクエリの結果は次のようになります。

class Ytake\PrestoClient\FixData#20 (5) {
  public $_key =>
  string(6) "presto"
  public $_value =>
  string(7) "awesome"
  public $test_id =>
  int(1)
  public $test_name =>
  string(6) "presto"
  public $created_at =>
  string(23) "2017-09-15 03:49:30.000"
}

接続にユーザーなどが必要であれば、ClientSessionクラスのメソッドが利用できます。
Prepared Statementは現在のバージョンではサポートされていない為、
利用できませんが(ユーザーに近いフロントから利用するものではない為)
サポートされていた以前のバージョンではPreparedStatementクラスがそのまま使えると思います。

利用方法のまとめ

簡単な利用方法をまとめると次のようになります。

<?php

require_once __DIR__ . '/vendor/autoload.php';

$query = "SELECT _key, _value, test_id, test_name, created_at 
FROM my_tests.testing.tests AS myttt 
INNER JOIN red_tests.test.string AS redttt ON redttt._key = myttt.test_name 
WHERE myttt.test_name = 'presto'";

$client = new \Ytake\PrestoClient\StatementClient(
    new \Ytake\PrestoClient\ClientSession('http://127.0.0.1:8080/', 'my_tests'),
    $query
);
$resultSession = new \Ytake\PrestoClient\ResultsSession($client);
// yield results instead of returning them. Recommended.
$result = $resultSession->execute()->yieldResults();

/** @var \Ytake\PrestoClient\QueryResult $row */
foreach($result as $row) {
    foreach($row->yieldData() as $yieldRow) {
        if($yieldRow instanceof \Ytake\PrestoClient\FixData) {
            var_dump($yieldRow->offsetGet('_key'), $yieldRow['_key']);
        }
    }
}

オブジェクト マッピング

PDOのFETCH_CLASSのように、任意のオブジェクトで変更することもできます。

PHP: PDOStatement::fetch - Manual

任意のオブジェクトで返却したい場合、カラムをクラスのプロパティとして作成します。
指定したカラム以外の値は返却されませんので、
以下のようにすると、プロパティに記述した2つのカラム以外は返却されません。
尚、プロパティはpublic, protected, privateのいずれでも値が挿入されますので、
アプリケーションの設計に合わせて利用してください。

<?php

class Testing
{
    /** @var string */
    private $_key;

    /** @var string */
    private $_value;

    /**
     * @return string
     */
    public function key()
    {
        return $this->_key;
    }

    /**
     * @return string
     */
    public function value(): string
    {
        return $this->_value;
    }
}

あとはyieldObjectメソッドにクラス名を指定して値を取得します。

<?php

$resultSession = new \Ytake\PrestoClient\ResultsSession($client);
$result = $resultSession->execute()->yieldResults();

/** @var \Ytake\PrestoClient\QueryResult $row */
foreach ($result as $row) {
    foreach ($row->yieldObject(Testing::class) as $item) {
        if ($item instanceof Testing) {
            var_dump($item);
        }
    }
}

これでPHPのアプリケーションからPrestoを操作することができるようになりました。
管理画面や、分析用途に活用してみてください。

PHPでビッグデータを操作しよう!Presto編 1

引き続きビッグデータ関連のミドルウェアを使った開発ばかりしてます。
もうすぐスーパーファミコンですね。

PHPを使ったビッグデータへのアプローチ方法などを話すことが多いですが、
今回は登壇時に例に挙げることも多いPrestoとPHPからの利用方法について紹介します。

speakerdeck.com

Prestoって何?

PrestoはFacebookで開発された分散SQLクエリエンジンで、
Webアプリケーションにとってはあまり馴染みがないかもしれませんが、
大規模なデータに対してインタラクティブに結果を返せるようになっていて、
Apache HiveやApache Impalaと同じようなものと考えるのが早いでしょう。

prestodb.io

ただWebアプリケーションエンジニアに嬉しいのは、
異なるデータベース(RDBMS)やデータストレージを結合して集計ができるという、
管理ツールや分散したデータベースを跨ぐ処理へのアプローチなど、活用できる場面が多くあります。
(Re:dashなどもあります)

更なる詳細については多くの記事や解説がありますので、そちらを参照してください。

tug.red

aws.amazon.com

blog.cloudera.co.jp

当然、利用ケースや場面によってはSparkやImpalaを利用した方が高速な場合もあり、
構成によっては、Kuduの方がよりフィットする場合もあると思います
(HDFSに格納されていてHBaseも使ってるぜ!みたいなケースだったり)

jp.cloudera.com

今回はPHPのアプリケーションで想定できるケースでどう使うか、
実際にどう組み込むかをやってみましょう。

異なるデータベースのjoinは、PostgreSQLのFDWやSQLServerのOPENQUERYなどもあります

F.31. postgres_fdw

Prestoをうごかす

現時点の最新は0.184ですので、マニュアルに沿ってインストールします。

$ wget https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.184/presto-server-0.184.tar.gz

解凍後に設置するだけで済みますが、
Prestoの設定や、接続したいデータベースの情報や記載するために
解凍したprestoのディレクトリに etc ディレクトリを作成します。

詳細はマニュアルに記載されていますので、ささっと初期設定をやってしまいます。

2.1. Deploying Presto — Presto 0.184 Documentation

launcherのrunを実行し、問題がなければstartなどで起動させます。

$ bin/launcher run

起動して、configで設定した discovery.uri にアクセスすると下記の画面が表示されます。

f:id:ytakezawa:20170915033255p:plain

接続先を追加する

次にMySQLに接続するように設定を追加します。

追加したetcディレクトリに、更に catalog ディレクトリを追加します。
各データベースへの接続情報はカタログに置いていきます。

MySQLのテーブル例

テーブルは下記のようなものを作ったとします。 (databaseはtestingとします)

CREATE TABLE `tests` (
  `test_id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `test_name` varchar(85) NOT NULL DEFAULT '',
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`test_id`),
  UNIQUE KEY `UIDX_TESTS_NAME` (`test_name`),
  KEY `IDX_TESTS` (`test_name`,`test_id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

データは適当に入れておきます

INSERT INTO tests(test_name) VALUE('presto');

mysqlのcatalogは次の形式で記述します。

connector.name=mysql # コネクタのドライバ指定になるので、mysqlのままです
connection-url=jdbc:mysql://example.net:3306 # 以下は利用環境に合わせて指定します
connection-user=root
connection-password=secret

接続時にファイル名をカタログ名として利用しますので、
わかりやすい名前にしておきましょう。

仮にここでは etc/catalog/my_tests.properties として進めます。

Redisの例

次にRedisです。

せっかくなので、Redisのdatabaseをデフォルトではなく10を指定したとして設定を記述します。
10を利用するときは次の通りです

$ redis-cli
127.0.0.1:6379> SELECT 10
OK
127.0.0.1:6379[10]>

*IPなどは環境に合わせてください

Redisのcatalogに記述する項目は、先ほどのMySQLとは少し違います

5.14. Redis Connector — Presto 0.184 Documentation

今回はRedisの一番の基本のStringを主として、次のように記述します。

connector.name=redis
redis.table-names=test.string
redis.nodes=127.0.0.1:6379
redis.database-index=10

仮にここでは etc/catalog/red_tests.properties として進めます。

ここでも適当にデータを入れておきます。

127.0.0.1:6379[10]> SET presto awesome
OK
127.0.0.1:6379[10]> GET presto
"awesome"

これで準備が整いました。 prestoから接続できるか確認してみます。

Presto Command Line Interface

最初にインストールしたUIからはなにもできません。
CLIをダウンロードします。

2.2. Command Line Interface — Presto 0.184 Documentation

presto で利用できるようにjarのファイル名を変更します

$ wget https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.184/presto-cli-0.184-executable.jar
$ mv presto-cli-0.184-executable.jar presto
$ chmod 755 presto

–serverにはprestoのconfigで設定した discovery.uri を指定します。

$ ./presto --server 127.0.0.1:8080
presto>

コンソールに入ったら現在時刻でも取得して動作確認しておきましょう。

presto> SELECT NOW();
               _col0
------------------------------------
 2017-09-15 04:29:51.689 Asia/Tokyo
(1 row)

Query 20170914_192951_00000_bv9h4, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]

ばっちりです

MySQL, Redisに接続する

事前に記述した接続先に接続してクエリを投げてみましょう *launcherのrestartは忘れずに

MySQL

事前に用意した接続先にクエリを投げます

<catalog名>.<データベース名>.<テーブル名>で指定します。

presto> SELECT * FROM my_tests.testing.tests;
 test_id | test_name |       created_at
---------+-----------+-------------------------
       1 | presto    | 2017-09-15 03:49:30.000
(1 row)

Query 20170914_194006_00007_ijqj3, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [1 rows, 0B] [1 rows/s, 0B/s]

prestoから結果が取得できているのが見えます。

エラーが出る場合はSCHEMASが見えるか、など確認しましょう。

presto> SHOW SCHEMAS FROM my_tests;
       Schema
--------------------
 information_schema
 testing
(2 rows)

Redis

MySQLと問い合わせ方法は同じです。

redis.table-names で指定した名前を使って、<カタログ名>.<namesで記述した名前>を指定します。

マニュアルにもある通り、Redisは内部的なカラムとしていくつかあります。

Column name Type Description
_key VARCHAR Redis key.
_value VARCHAR Redis value corresponding to the key.

*一部抜粋

今はkey, valueをSELECT 区に入れてみます。

presto> SELECT _key, _value FROM red_tests.test.string;
  _key  | _value
--------+---------
 presto | awesome
(1 row)

Query 20170914_194457_00012_ijqj3, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [1 rows, 7B] [12 rows/s, 90B/s]

先ほど挿入した値が取得できました。

MySQL Redis JOIN

ことなるデータストレージを結合して検索してみましょう。

presto> SELECT _key, _value, test_id, test_name, created_at FROM my_tests.testing.tests AS myttt
     -> INNER JOIN red_tests.test.string AS redttt ON redttt._key = myttt.test_name
     -> WHERE myttt.test_name = 'presto';

  _key  | _value  | test_id | test_name |       created_at
--------+---------+---------+-----------+-------------------------
 presto | awesome |       1 | presto    | 2017-09-15 03:49:30.000
(1 row)

Query 20170914_195255_00018_ijqj3, FINISHED, 1 node
Splits: 66 total, 66 done (100.00%)
0:00 [2 rows, 7B] [4 rows/s, 15B/s]

当然存在しない値を指定すると、0rowsとなります。

presto> SELECT _key, _value, test_id, test_name, created_at FROM my_tests.testing.tests AS myttt
     -> INNER JOIN red_tests.test.string AS redttt ON redttt._key = myttt.test_name
     -> WHERE myttt.test_name = 'ytake';

 _key | _value | test_id | test_name | created_at
------+--------+---------+-----------+------------
(0 rows)

Query 20170914_195337_00020_ijqj3, FINISHED, 1 node
Splits: 66 total, 66 done (100.00%)
0:00 [1 rows, 7B] [2 rows/s, 15B/s]

次回は、早速PHPから利用し、もう少しだけ複雑にしてみます。

Laravel5.5 API Resourcesを利用する その1

先日、Laravel5.5がリリースされました。
このバージョンは新しいLTSとなりますので、5.1からのアップグレードなどを検討してみましょう!

5.5で追加された仕組みの一つに、Eloquent: API Resourcesがあります。

laravel.com

以前からEloquentで取得したオブジェクトを直接レスポンスに与えると、
jsonで返却する機能がありました。

追加されたこの仕組みは、レスポンスとEloquentとの橋渡しをする機能となります。

便利な機能ではありますが、
データベースとの接続、データ返却を行うEloquentと、Httpのレスポンスが結び付き合うのは、
利便性という以外では責務が多すぎるため、
議論の的になることもあります。

と、マニュアルだけでは確かにそう見える機能ですが、
RESTでおなじみのLevel 3 - Hypermedia Controlsに対応することができます。

martinfowler.com

Eloquentを利用すれば、Resourceクラスがjsonに変換する際に、
meta情報などを付け加えることができます。
このレスポンスはjsonapiと呼ばれるフォーマットに近しい形で出力されます。

JSON API — A specification for building APIs in JSON

実際のアプリケーションではEloquent以外のものを利用することも多くあると思います。

本エントリでは、Eloquent以外のものでこのResourceを使って、
HALを適用します。

Resourceクラスは、 \Illuminate\Http\Resources\Json\Resource を利用することで、
さまざまなフォーマットに沿ったjsonを定義することができます。

このResourceクラスには、型宣言がありませんが、toArrayメソッドを持つものであれば、
変換に利用することができます。

シンプルな配列を利用する場合は、次のようになるでしょう

<?php

    $embedded = new \Illuminate\Http\Resources\Json\Resource(
        new class implements \Illuminate\Contracts\Support\Arrayable
        {
            public function toArray()
            {
                return [
                    [
                        'message' => 'hello',
                        'type'    => 'example',
                        '_links'  => [
                            'self' => 'http://example.com/1',
                            'hoge' => 'http://example.com/hoge/1',
                        ],
                    ],
                    [
                        'message' => 'hello',
                        'type'    => 'example',
                        '_links'  => [
                            'self' => 'http://example.com/2',
                            'hoge' => 'http://example.com/hoge/2',
                        ],
                    ],
                ];
            }
        }
    );

このリソース一つ一つにナビゲート可能なリンクがあるものとして、記述しています。

ResourceCollectionクラスを利用することもありますが、
Collectionクラスに依存するため、今回は通常の配列のみを利用します。

このままレスポンスとして利用することも可能ですが、
このリソースを他のリソースで利用し、埋め込み情報として利用します。

<?php

    $resource = new \Illuminate\Http\Resources\Json\Resource(
        new class($embedded) implements \Illuminate\Contracts\Support\Arrayable
        {
            /** @var \Illuminate\Http\Resources\Json\Resource */
            protected $resource;

            /**
             *  constructor.
             *
             * @param \Illuminate\Http\Resources\Json\Resource $resource
             */
            public function __construct(\Illuminate\Http\Resources\Json\Resource $resource)
            {
                $this->resource = $resource;
            }

            public function toArray()
            {
                return [
                    'title'     => 'illuminate/http resource',
                    '_links'    => [
                        'self' => 'http://google.com',
                    ],
                    '_embedded' => $this->resource->jsonSerialize(),
                ];
            }
        }
    );
    $resource::$wrap = null;

最初に記述したリソースを埋め込み情報として利用するリソースクラスを作成します。
デフォルトでdataの配列となりますが、静的プロパティの$wrapに任意の名称を与えることができますので、
ここでは利用しないためnullとします。

埋め込み情報は _embedded として利用します

最後にcontent-typeを 'application/hal+json' としてレスポンスを返却するようにします。

<?php

$response = $resource->response()->header('content-type', 'application/hal+json');
return $response;

レスポンスとしては次のように返却されます。

f:id:ytakezawa:20170908035400p:plain

親子関係がはっきりしたデータ構造や実装であれば、
比較的ライトに実装することができると思います。

LaravelでEntityなどを表現して、データマッパーライクな実装をしている場合は、
以前のエントリを参考にアノテーションを使う実装の方が、
フィットするかと思います。

qiita.com

ytake.hateblo.jp1

2016-2017買ってよかった書籍

最近はphpよりもSparkやKafkaといったミドルウェアを使った開発や、設計ばかりやってます。

そんな中で良い書籍がありましたので、せっかくなのでいくつか紹介します。

ビッグデータ

スケーラブルリアルタイムデータ分析入門

スケーラブルリアルタイムデータ分析入門 ―ラムダアーキテクチャによるビッグデータ処理

スケーラブルリアルタイムデータ分析入門 ―ラムダアーキテクチャによるビッグデータ処理

ラムダアーキテクチャについて詳しく書かれているのはもちろんですが、
どういう様に分析処理を作っていくべきか、説明や実装例を使って解説されているので、
アーキテクチャのイメージがつきやすい書籍です。
分析処理や、Hadoopなどを扱う場合は持っておいて損はしないと思います。

書籍のリポジトリ

github.com

参考

www.oreilly.com

www.oreilly.com

ビッグデータ分析・活用のためのSQLレシピ

ビッグデータ分析・活用のためのSQLレシピ

ビッグデータ分析・活用のためのSQLレシピ

前述のスケーラブルリアルタイムデータ分析入門とセットで持っておくと良いかもしれません。
当然ながら載っているSQLそのまま利用することはできませんが、
ミドルウェアによるQueryの違いや、後半の章などはビッグデータ系に携わるのであれば持っておいて良いと思います。
あと読みやすいです。

実際にリアルタイムなストリーミング処理ではSpark SQLとpipelineDBとかを使うことになると思いますので、
読んで実際に試してみてください。

参考

Spark SQL and DataFrames - Spark 2.2.0 Documentation

www.pipelinedb.com

詳解 Apache Spark

詳解 Apache Spark

詳解 Apache Spark

SparkのDocumentを見ながらやっていましたが、やはり日本語の書籍が欲しくて購入。
前述の2冊を読んでいると、さらにグッとあーーなるほど、という繋がりが見えて楽しいです。
MLlibやGraphXも記述がありますので、全体像を掴むにはバッチリでした。

Sparkによる実践データ解析

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Spark1.2ベースで多少古いですが、アルゴリズムや処理のポイントが記述されているので、
phperの自分にもとてもよくわかる書籍で、
Sparkを実践的に利用していく前に読んでおくと、アーキテクチャなどを考える際にヒントとなります。

分析処理系をやる場合は、ビッグデータ系の知識がなければ難しいので、
あげた書籍はかなりおすすめです

洋書

Spark in Action

詳解 Apache Sparkのさらに突っ込んだ内容で、H2Oを組み合わせたレコメンドの例や、 しかも初心者でも置いてけぼりにされない様に小難しくなりすぎない内容だったり、
ステップアップにはとても良いと思います。

Spark in Action

Spark in Action

Akka in Action

This very good Akka guidebook と言われるくらいの本
phperの自分がscalaを理解、Akkaを理解するために購入して現時点でも読みながら触っている最中です。
買ってよかった・・・

Akka in Action

Akka in Action

実践ドメイン駆動設計の著者が書いたAkkaの実践的な本も一緒に書いましたが、
こちらはまだ全然読めておらず、そしてもっとAkkaを理解せねば。。

Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka

Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka

他にもまだ良い本何冊か書いまして、またの機会に