Laravelで作る分析・分散処理アプリケーション その2

Kafka Consumer + Prestodb例

ytake.hateblo.jp

上記のエントリの続編です。

その1 ではApache Kafkaを組み合わせて、
データの分散やアプリケーション自体をスケールするアプローチを紹介しました。

今回は分散したサービスのデータの集約をKafkaとPrestoを組み合わせて、
ログ分析の基盤作りの例を紹介します。

アプリケーションのログをfluentd, elasticsearchで収集し、サービズ作りに活かすケースは多いと思います。

今回の例ではログにサービス固有の情報、物理的に異なるデータベースを集め、
ログデータをKPIなどに活かせる形にし、elasticseachに格納します。

あるところでデータ更新などが行われた場合でも、
Kafkaを軸にメッセージを受信することでelasticsearchのドキュメント更新なども簡単に行えます。

Prestoについて

prestoの利用方法や設定については下記を参考、本エントリのサンプルリポジトリを参照ください。

ytake.hateblo.jp

ytake.hateblo.jp

github.com

今回は上記のエントリで紹介したものに加え、prestoからKafkaに接続します。

アプリケーションログをKafkaに送信

fluentdなどを使うのが一般的ですが、
今回の例ではLaravelで作られたアプリケーションにアクセスした場合に、 ユーザーがアクセスしたURLとユーザー名をKafkaへ通知します。

Log送信イベント

アプリケーションログ送信をEventとします。

<?php
declare(strict_types=1);

namespace App\Events;

/**
 * Class Loggable
 */
final class Loggable
{
    /** @var string */
    private $uri;

    /** @var string[] */
    private $names = [
        'presto',
        'kafka',
        'laravel',
        'elasticsearch',
    ];

    /**
     * Loggable constructor.
     *
     * @param string $uri
     */
    public function __construct(string $uri)
    {
        $this->uri = $uri;
    }

    /**
     * @return string
     */
    public function uri(): string
    {
        return $this->uri;
    }

    /**
     * @return string
     */
    public function name(): string
    {
        return $this->names[array_rand($this->names)];
    }
}

アクセスしたURLを送信するイベントですが、
例としてEvent Handler処理時にランダムでユーザー名を決定するようになっています。

ログデータの送信自体はKafka Producerが担当しますが、
これは前回紹介した Kafka Producer抽象クラスを継承したクラスです。
サービスプロバイダーで別の値を注入するために分割して利用します。

<?php
declare(strict_types=1);

namespace App\DataAccess;

/**
 * Class LogProduce
 */
final class LogProduce extends AbstractProduce
{

}

前回のエントリの設定値を混ぜると、 config/kafka.php は次のようになります。
*Consumer実装も含まれますので、こちらで紹介します。

<?php

return [
    'topics'   => [
        'analyze.action'    => [
            'topic'   => 'analyze.action',
            'brokers' => '127.0.0.1',
            'options' => [
                'socket.blocking.max.ms'       => '1',
                'queue.buffering.max.ms'       => '1',
                'queue.buffering.max.messages' => '1000',
                'client.id'                    => 'testingClient',
            ],
        ],
        'fulltext.register' => [
            'topic'   => 'fulltext.register',
            'brokers' => '127.0.0.1',
            'options' => [
                'socket.blocking.max.ms'       => '1',
                'queue.buffering.max.ms'       => '1',
                'queue.buffering.max.messages' => '1000',
                'client.id'                    => 'testingClient',
            ],
        ],
    ],
    'consumer' => [
        'brokers' => '127.0.0.1',
        'options' => [
            'heartbeat.interval.ms'              => '10000',
            'session.timeout.ms'                 => '30000',
            'topic.metadata.refresh.interval.ms' => '60000',
            'topic.metadata.refresh.sparse'      => 'true',
            'log.connection.close'               => 'false',
            'group.id'                           => 'testingConsumer',
        ],
    ],
];

サービスプロバイダも前回のものを混ぜると以下のようになります。
同じクラスをコンストラクタインジェクションで指定しますが、
具象クラスは違うクラスを与えます (Laravelの Contextual Bindingを利用)

<?php

    public function register()
    {
        $this->app->when(LogProduce::class)
            ->needs(Producer::class)
            ->give(function (Application $app) {
                $kafkaConfig = $app['config']->get('kafka');
                $topic = $kafkaConfig['topics']['analyze.action'];
                $producer = new Producer($topic['topic'], $topic['brokers'], $topic['options']);
                $producer->setLogger($app['log']);

                return $producer;
            });
        $this->app->when(RegisterProduce::class)
            ->needs(Producer::class)
            ->give(function (Application $app) {
                $kafkaConfig = $app['config']->get('kafka');
                $topic = $kafkaConfig['topics']['fulltext.register'];
                $producer = new Producer($topic['topic'], $topic['brokers'], $topic['options']);
                $producer->setLogger($app['log']);

                return $producer;
            });
    }

次に上記のEvent、LoggableクラスのHandlerクラスです

<?php
declare(strict_types=1);

namespace App\Listeners;

use App\DataAccess\LogProduce;
use App\Definition\AnalysisDefinition;
use App\Events\Loggable;
use Ramsey\Uuid\Uuid;

/**
 * Class LoggableHandler
 */
final class LoggableHandler
{
    /** @var LogProduce */
    protected $producer;

    /**
     * LoggableHandler constructor.
     *
     * @param LogProduce $producer
     */
    public function __construct(LogProduce $producer)
    {
        $this->producer = $producer;
    }

    /**
     * @param Loggable $loggable
     */
    public function handle(Loggable $loggable)
    {
        $this->producer->run(
            new AnalysisDefinition(Uuid::uuid4()->toString(), $loggable->uri(), $loggable->name())
        );
    }
}

これでLoggable イベントが発動するとKafkaに アクセスしたURLユーザー名識別のためのUUID
が送信されます。
この例でKafkaの analyze.action topicにメッセージが格納されます。

アプリケーションのURL全てを対象とするため、
サンプルコードではGlobal Middlewareとして利用しています。

<?php
declare(strict_types=1);

namespace App\Http\Middleware;

use Closure;
use App\Events\Loggable;
use Illuminate\Http\Request;
use Illuminate\Contracts\Events\Dispatcher;

/**
 * Class SendLogger
 */
final class SendLogger
{
    /** @var Dispatcher */
    private $dispatcher;

    /**
     * SendLogger constructor.
     *
     * @param Dispatcher $dispatcher
     */
    public function __construct(Dispatcher $dispatcher)
    {
        $this->dispatcher = $dispatcher;
    }

    /**
     * @param Request $request
     * @param Closure $next
     *
     * @return mixed
     */
    public function handle(Request $request, Closure $next)
    {
        $this->dispatcher->dispatch(new Loggable($request->getUri()));

        return $next($request);
    }
}

これでアクセス時にログデータとしてKafkaに通知されます。

Kafka Consumer メッセージ受信

次にKafka Consumerを実装します。

これもLaravelのQueueを利用すれば、アプリケーション内で完結することができますが、
アプリケーションをスケールさせる上では、フレームワークの知識をメッセージに含めることはできません。
このため前回引き続きフレームワークのQueueは利用しません。

ConsumerをLaravelに組み込むのは非常に簡単です。
次のように実装することができます。

<?php
declare(strict_types=1);

namespace App\Foundation\Consumer;

use RdKafka\Conf;
use RdKafka\Consumer as KafkaConsumer;
use RdKafka\Message;
use RdKafka\TopicConf;
use RdKafka\ConsumerTopic;

/**
 * Class Consumer
 */
class Consumer
{
    /** @var string */
    protected $topic;

    /** @var \RdKafka\Consumer */
    protected $consumer;

    /** @var int */
    protected $partition = 0;

    /** @var string */
    protected $brokers = '';

    /** @var array */
    protected $configure = [];

    /** @var int */
    protected $offset = RD_KAFKA_OFFSET_STORED;

    protected $callable;

    /**
     * Consumer constructor.
     *
     * @param string $brokers
     * @param array  $configure
     */
    public function __construct(string $brokers, array $configure = [])
    {
        $this->brokers = $brokers;
        $this->configure = $configure;
    }

    /**
     * @param string $topic
     */
    public function topic(string $topic)
    {
        $this->topic = $topic;
    }

    /**
     * @param int $partition
     */
    public function partition(int $partition)
    {
        $this->partition = $partition;
    }

    /**
     * @param int $offset
     */
    public function offset(int $offset)
    {
        $this->offset = $offset;
    }

    /**
     * @param Consumable $callable
     *
     * @throws \Exception
     */
    public function handle(Consumable $callable)
    {
        $topic = $this->consumerTopic();
        $topic->consumeStart($this->partition, $this->offset);
        while (true) {
            $message = $topic->consume($this->partition, 120 * 10000);
            if ($message instanceof Message) {
                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        call_user_func($callable, $message);
                        $this->outputMessage($message);
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
                        throw new \RuntimeException("time out.");
                        break;
                    default:
                        break;
                }
            }
        }
    }

    /**
     * @param callable $callable
     */
    public function callbackMessage(callable $callable)
    {
        $this->callable = $callable;
    }

    /**
     * @param Message $message
     */
    protected function outputMessage(Message $message)
    {
        if ($this->callable) {
            call_user_func_array($this->callable, [$message]);
        }
    }

    /**
     * @return ConsumerTopic
     */
    protected function consumerTopic(): ConsumerTopic
    {
        $this->consumer = $this->consumer();
        $this->consumer->addBrokers($this->brokers);

        return $this->consumer->newTopic($this->topic, $this->topicConf());
    }

    /**
     * @return TopicConf
     */
    protected function topicConf(): TopicConf
    {
        $topicConf = new TopicConf();
        $topicConf->set('auto.commit.interval.ms', '100');
        $topicConf->set('offset.store.method', 'file');
        $topicConf->set('offset.store.path', sys_get_temp_dir());
        $topicConf->set('auto.offset.reset', 'smallest');

        return $topicConf;
    }

    /**
     * @return KafkaConsumer
     */
    protected function consumer(): KafkaConsumer
    {
        $conf = new Conf();
        foreach ($this->configure as $key => $item) {
            $conf->set($key, $item);
        }

        return new KafkaConsumer($conf);
    }
}

この例では接続情報を外から与え、handleメソッドに処理をしたいクラスを与えます。
handleメソッドが必要なクラスは、下記のインターフェースを実装したクラスとなります。

<?php
declare(strict_types=1);

namespace App\Foundation\Consumer;

use RdKafka\Message;

/**
 * Interface Consumable
 */
interface Consumable
{
    /**
     * @param Message $message
     *
     * @return mixed
     */
    public function __invoke(Message $message);
}

Kafkaからのメッセージを受信すると、受信メッセージが RdKafka\Message クラスとして利用できます。
Kafka Consumerをartisanコマンドとして実装します。

<?php
declare(strict_types=1);

namespace App\Console;

use App\Foundation\Consumer\Consumable;
use App\Foundation\Consumer\Consumer;
use Illuminate\Console\Command;
use RdKafka\Message;

/**
 * Class ConsumerCommand
 */
class ConsumerCommand extends Command
{
    /** @var string */
    protected $name = 'kafka:consumer';

    /** @var string */
    protected $description = '';

    /** @var Consumer */
    protected $consumer;

    /** @var string */
    protected $topic;

    /** @var Consumable */
    protected $consumable;

    /**
     * ConsumerCommand constructor.
     *
     * @param Consumer   $consumer
     * @param Consumable $consumable
     * @param string     $topic
     */
    public function __construct(
        Consumer $consumer,
        Consumable $consumable,
        string $topic = 'analyze.action'
    ) {
        parent::__construct();
        $this->consumer = $consumer;
        $this->consumable = $consumable;
        $this->topic = $topic;
    }

    public function handle()
    {
        $this->consumer->topic($this->topic);
        $this->consumer->callbackMessage(function (Message $message) {
            $this->info($message->payload);
        });
        $this->consumer->handle($this->consumable);
    }
}

メッセージ受信時にprestoから複数のデータベースを接続し、
elasticsearchに格納するように実装します。  

Prestoの設定 - Kafka Schema

先の実装でKafka Producerで、UUID, URL, 名前を送信するようにしました。
格納されるtopicと、topicに格納されるデータをPrestoで指定します。

PrestoにKafkaの情報を設定する場合は、
etc/kafka/table名.json ファイルを作成して例として以下の内容を記述します。

{
    "tableName": "action",
    "schemaName": "analyze",
    "topicName": "analyze.action",
    "message": {
        "dataFormat": "json",
        "fields": [
            {
                "name": "uuid",
                "mapping": "uuid",
                "type": "VARCHAR"
            },
            {
                "name": "uri",
                "mapping": "uri",
                "type": "VARCHAR"
            },
            {
                "name": "name",
                "mapping": "name",
                "type": "VARCHAR"
            }
        ]
    }
}

topic名は、prestoの各データベースへ接続する設定と同じく、. という形式で作成しています。
他に各メッセージの型を記述します。
詳細は公式を参照ください。

[https://prestodb.io/docs/current/connector/kafka.html:title]

catalogの指定はMySQLなどと同じです。
サンプルでは etc/catalog/kafka_tests.properties としています。

connector.name=kafka
kafka.nodes=127.0.0.1:9092
kafka.table-names=analyze.action
kafka.hide-internal-columns=false

この他の接続設定はサンプルコード等を参考にしてください。
作成後、prestoの再起動等を行なってください。

このサンプルでは、Redisの文字列型レコードと、MySQL、Kafkaを結合します。
複数のデータベースを結合する利用例としてはもちろんですが、
Redisのランキング機能とMySQLの情報を結合し、
人気順の何かしらのデータと、Kafkaに格納されたログを結合することで、
様々なコンテンツデータを組み合わせたログデータの作成、
またはアプリケーションの複雑な機能提供などが行えます。

PrestoをLaravelに組み込む

PrestoはPDOなどの拡張ではなく、HTTPの通信を使って操作しますので、
DatabaseManager、Eloquentを拡張するメリットはあまりなく、
フロントのアプリケーションから常にアドホックに利用するものではなく、
プリペアドステートメントもありません。(以前のバージョンではありましたが現行にはない様です)

PrestoについてのエントリでPHPからの利用方法を紹介していますので、
それをつかってLaravelから利用できる様にします。

<?php
declare(strict_types=1);

namespace App\Foundation\Presto;

use Ytake\PrestoClient\FixData;
use Ytake\PrestoClient\ClientSession;
use Ytake\PrestoClient\ResultsSession;
use Ytake\PrestoClient\StatementClient;

/**
 * Class PrestoClient
 */
class PrestoClient
{
    /** @var ClientSession */
    protected $session;

    /**
     * PrestoClient constructor.
     *
     * @param ClientSession $session
     */
    public function __construct(ClientSession $session)
    {
        $this->session = $session;
    }

    /**
     * @param string $query
     *
     * @return array
     */
    public function query(string $query): array
    {
        $result = [];
        $client = new StatementClient($this->session, $query);
        $resultSession = new ResultsSession($client);
        $yieldResult = $resultSession->execute()->yieldResults();
        /** @var \Ytake\PrestoClient\QueryResult $row */
        foreach ($yieldResult as $row) {
            foreach ($row->yieldData() as $yieldRow) {
                if ($yieldRow instanceof FixData) {
                    $result[] = $yieldRow;
                }
            }
        }

        return $result;
    }
}

config、サービスプロバイダは次の通りです。

config/presto.php

<?php

return [
    'connections' => [
        'presto_test' => [
            'host'    => 'http://127.0.0.1:8080/',
            'catalog' => 'default',
        ],
    ],
];

サービスプロバイダは次の通りです。

<?php

public function register() 
{
        $this->app->bind(ClientSession::class, function (Application $app) {
            $prestoConfig = $app['config']->get('presto');
            $connectionConfig = $prestoConfig['connections']['presto_test'];

            return new ClientSession($connectionConfig['host'], $connectionConfig['catalog']);
        });
}

Prestoに投げるQueryは次の通りです。

<?php
declare(strict_types=1);

namespace App\DataAccess;

use App\Foundation\Presto\PrestoClient;

/**
 * Class Analysis
 */
final class Analysis
{
    /** @var PrestoClient */
    protected $client;

    /**
     * AnalysisRepository constructor.
     *
     * @param PrestoClient $client
     */
    public function __construct(PrestoClient $client)
    {
        $this->client = $client;
    }

    /**
     * @param string $name
     * @return array
     */
    public function allBy(string $name): array
    {
        $query = "SELECT redttt._key, redttt._value, test_id, test_name, created_at, uri, uuid 
              FROM my_tests.testing.tests AS myttt 
              INNER JOIN red_tests.test.string AS redttt ON redttt._key = myttt.test_name 
              INNER JOIN kafka_tests.analyze.action AS kafkataa ON kafkataa.name = myttt.test_name
              WHERE myttt.test_name = '{$name}' LIMIT 1";
        return $this->client->query($query);
    }
}

メッセージ受信時にユーザー名からRedis, MySQLから該当情報を取得し、ログデータを整形します。
最後にこのログをelasticsearchに格納します。

<?php
declare(strict_types=1);

namespace App\DataAccess;

use App\Foundation\Consumer\Consumable;
use App\Foundation\Elasticsearch\ElasticseachClient;
use Cake\Chronos\Chronos;
use RdKafka\Message;
use Ytake\PrestoClient\FixData;

/**
 * Class LoggableConsume
 */
final class LoggableConsume implements Consumable
{
    /** @var ElasticseachClient */
    protected $client;

    /** @var string */
    protected $index = 'log.index';

    /** @var Analysis */
    private $analysis;

    /**
     * LoggableConsume constructor.
     *
     * @param Analysis           $analysis
     * @param ElasticseachClient $client
     */
    public function __construct(Analysis $analysis, ElasticseachClient $client)
    {
        $this->analysis = $analysis;
        $this->client = $client;
    }

    /**
     * @param Message $message
     *
     * @return void
     */
    public function __invoke(Message $message)
    {
        $decode = json_decode($message->payload);
        /** @var FixData[] $response */
        $response = $this->analysis->allBy($decode->name);
        if (count($response)) {
            $params = [
                'index' => $this->index,
                'type'  => 'logs',
                'body'  => [
                    '_key'       => $response[0]['_key'],
                    '_value'     => $response[0]['_value'],
                    'test_id'    => $response[0]['test_id'],
                    'test_name'  => $response[0]['test_name'],
                    'created_at' => Chronos::now()->toUnixString(),
                    'uri'        => $response[0]['uri'],
                    'uuid'       => $response[0]['uuid'],
                ],
            ];
            $this->client->client()->index($params);
        }
    }
}

上記のクラスがメッセージ受信時に作用する様に、 サービスプロバイダでartisanコマンドとして登録します。

<?php
declare(strict_types=1);

namespace App\Providers;

use App\Console\ConsumerCommand;
use App\Console\InitRedisCommand;
use App\DataAccess\LoggableConsume;
use App\Foundation\Consumer\Consumer;
use Illuminate\Foundation\Application;
use Illuminate\Support\ServiceProvider;

/**
 * Class ConsoleServiceProvider
 */
class ConsoleServiceProvider extends ServiceProvider
{
    /** @var bool */
    protected $defer = true;

    public function boot()
    {
        $this->app->bind('app.command.kafka.consumer', function (Application $app) {
            return new ConsumerCommand(
                $app->make(Consumer::class),
                $app->make(LoggableConsume::class),
                'analyze.action'
            );
        });
        $this->commands([
            'app.command.kafka.consumer',
        ]);
    }

    /**
     * @return array
     */
    public function provides()
    {
        return [
            'app.command.kafka.consumer',
        ];
    }
}

これで php artisan kafka:consumer で起動しますが、
Kafkaからのメッセージは継続的に流れてくるため、supervisorなどを利用してdaemonとして動作するようにしてください。

実行すると下記の様にメッセージを受信し、prestoを使って結合したデータがelasticsearchに格納されます。
サンプルアプリケーションでは、 /analysis にアクセスするとこの結果を確認できます。

全体の流れとしては下記の流れになります。

f:id:ytakezawa:20171027011201p:plain

多少大げさな例ですが前回のスケールさせる例と、
今回の分散したデータを集計するのにもKafkaを利用しました。
次回はこのログデータ成形に利用しているtopicを他の言語の処理も同時に走らせて、
リアルタイムに近い結果を高速に返すアプリケーション例を紹介します。