Nightfall - Injetando dependências no Spark (Parte 1)

O início

Começamos a utilizar o Spark no Elo7 para processar métricas em tempo real, enviando eventos assíncronos do nosso site para serem consumidos em um sistema de agregação. Essa foi uma forma de remover o acoplamento entre as métricas e o negócio.

Mas o que é Spark?

Spark é uma plataforma para computação distribuída, que extende o modelo de MapReduce. É uma ferramenta de propósito geral e projetada para alta performance, incluindo queries iterativas e processamento em batch e streaming.

Por que Spark?

Precisávamos de uma ferramenta para análise em tempo real, e tínhamos preferência por opções com recursos de machine learning (que pensamos em utilizar futuramente). Optamos pelo Spark, que além de atender esses requisitos, possui integração nativa com o Apache Kafka e o Amazon Kinesis, que eram ferramentas cogitadas para Streams de mensagem.

Após a produção dos eventos, precisamos criar os consumidores para processá-los, que são nossos jobs do Spark. Nossos jobs possuem várias tasks onde cada uma processa um tipo de evento de uma forma específica. O nosso código no início do desenvolvimento dos jobs era parecido com:

  • Job
public class SparkJobExample {

    public static void main(String[] args) {
        MyJobConfiguration config = new MyJobConfiguration(args);
        JavaStreamingContext context = null;

        try {
            if (config.getSparkCheckpoint().isPresent()) {
                context = JavaStreamingContext.getOrCreate(
                        config.getSparkCheckpoint().get(),
                        (Function0<JavaStreamingContext>) () -> MyJob.createContext(config));
            } else {
                context = MyJob.createContext(config);
            }
            context.start();
            context.awaitTermination();
        } catch (Exception e) {
            throw e;
        } finally {
            if(context != null) {
                context.stop(true, true);
            }
        }
    }

    [...]

Para mais detalhes veja aqui

Dessa forma, tínhamos uma injeção de dependência para as configurações do banco e do job. Caso precisássemos fazer injeção em outras classes seria necessário fazer algo assim:

    @SuppressWarnings("unchecked")
    private static JavaStreamingContext createContext(MyJobConfiguration config) {
        [...]
        MyFilter myFilter = new MyFilter(config);

        Arrays.asList(
                new MyTask(config),
                new OtherTask(config, myFilter)
        ).stream().forEach(processor -> processor.process(stream));

        checkpoint.ifPresent(context::checkpoint);

        return context;
    }
}

Ou seja, para cada nova classe que nossa task (a classe que efetivamente executa o processamento dos eventos) utilize, precisamos instanciá-la no Job. Um pouco ruim não acham?

Simplificando as coisas

Para resolver nosso problema, criamos um projeto chamado Nightfall, que utiliza o Netflix Governator e o Google Guava para prover o contexto do Spark, injeção de dependência e configuração. Com o Nightfall, simplificamos o código dos nossos novos jobs, utilizando inversão de controle e injeção de dependências. Um exemplo do código:

@KafkaSimple
public class KafkaSimpleTest {

    public static void main(String[] args) {
        NightfallApplication.run(KafkaSimpleTest.class, args);
    }
}

Muito mais simples, não? O código acima provê um job SparkStream que utiliza o Simple API do Kafka.

Criando um Stream

Agora que já sabemos o que nos motivou a criar o Nightfall, vejamos como utilizá-lo para facilitar nossa vida :D

Digamos que temos um produtor de evento que envia um payload como o json abaixo:

{
    "type": "OrderStarted",
    "date": "2016-11-07T14:22:08.592-03:00",
    "payload": {
        "orderCreatedAt": "2016-11-02T18:00:50.798-03:00",
        "orderId": 1,
        "deviceFamily": "DESKTOP",
        "sellerName": "Vendedor 1",
        "buyerId": 3,
        "sellerId": 3,
        "products": [{
            "itemId": 1,
            "quantity": 3,
            "price": 500.00,
            "itemTitle": "Product 1"
        }],
        "access": "WEB_BROWSER",
        "totalPrice": 1500.00,
        "browser": "CHROME"
    }
}

Abaixo, o código para consumir esse evento como um Stream:

@KafkaSimple
public class OrderJob {

    public static void main(String[] args) {
        NightfallApplication.run(OrderJob.class, args);
    }
}

Após a criação do nosso job, precisamos criar a task para processar a mensagem. Nossa task é uma classe Java simples, anotada com @Task (anotação fornecida pelo NightFall). Ao inicializar o NightfallApplication, é realizado um classpath scan para encontrar todas as classes que contem essa anotação. Utilizarei a implementação que usa DataPoint (DataPoint é um contrato criado para padronizar a estrutura das mensagens, representando o evento a ser processado, possuindo uma data, tipo e um payload):

@Task
public class HelloWorldTask implements StreamTaskProcessor<DataPoint<String>> {
    private static final String ORDER_STARTED = "OrderStarted";

    @Override
    public void process(JavaDStream<DataPoint<String>> dataPointsStream) {
        dataPointsStream
                .filter(dataPoint -> DataPointValidator.isValidForType(dataPoint, ORDER_STARTED))
                .foreachRDD(rdd -> {
                    if (!rdd.isEmpty()) {
                        rdd.foreachPartition(partition -> partition.forEachRemaining(this::log));
                    }
                });
    }

}

Esse é um exemplo de task que processaria apenas os eventos do tipo OrderStarted.

Já sabemos porque o NightFall foi criado e como ele funciona, então vejamos mais alguns exemplos. Primeiramente veremos um exemplo de Stream.

  • Siga as intruções do Quick Start Kafka para:
    1. Instalação e startup. OBS: utilizar a versão 0.8.2 do Kafka.
    2. Crie um tópico no Kafka.
    3. Enviar mensagens para o tópico criado.

Uma vez que temos um tópico, podemos criar um stream para consumir as mensagens. Para isso, podemos utilizar o próprio projeto do Nightfall para adicionar a task que criamos acima, adicionando-a no sub-módulo examples do Nightfall. Além de adicionar o job e a task, precisaremos configurar o arquivo nightfall.properties:

# Kafka Consumer
kafka.brokers=localhost:9092
kafka.topics=${NOME_DO_TÓPICO_CRIADO}

# Kafka Offset
kafka.offset.persistent=false
kafka.cassandra.hosts=cassandra
kafka.cassandra.keyspace=kafka
kafka.cassandra.auto.migration=false
kafka.cassandra.user=
kafka.cassandra.password=
kafka.cassandra.datacenter=

# Stream Configurations
stream.batch.interval.ms=20000
stream.provider.converter=com.elo7.nightfall.di.providers.spark.stream.DataPointStreamContextConverter
stream.checkpoint.directory=/Users/developer/dev/tmp/examples/HelloWorldJob

# Monitoring config
reporter.statsd.host=localhost
reporter.statsd.port=8125
reporter.statsd.prefix=dev.spark
reporter.enabled=false
reporter.class=com.elo7.nightfall.di.providers.reporter.jmx.JMXReporterFactory

Após a configuração do arquivo localizado em examples/src/main/resources podemos executar o job através do comando:

./gradlew 'jobs/example':run -PmainClass="${JOB_PACKAGE}.OrderJob"

Após o job ser iniciado podemos enviar um evento do tipo OrderStarted (como no exemplo mais acima). Será impresso o json nos logs; caso enviemos um outro tipo de evento, ele não será exibido.

Criando um Batch

Agora podemos criar nosso job, task e configurações para processar em Batch ao invés de Stream. Para isso, precisaremos criar o seguinte job:

@FileRDD
public class BatchOrderJob {

    public static void main(String[] args) {
        NightfallApplication.run(OrderJob.class, args);
    }
}

Precisamos criar a task também:

@Task
public class BatchHelloWorldTask implements BatchTaskProcessor<DataPoint<String>> {
    private static final String ORDER_STARTED = "OrderStarted";

    @Override
    public void process(JavaRDD<DataPoint<String>> dataPointsStream) {
        dataPointsStream
                .filter(dataPoint -> DataPointValidator.isValidForType(dataPoint, ORDER_STARTED))
                .foreachPartition(
                    rdd.foreachPartition(partition -> partition.forEachRemaining(this::log))
                );
    }

}

Não podemos esquecer de criar as seguintes configurações:

# Batch Configuration
# File Configuration
file.s3.access.key=
file.s3.secret.key=
file.source=/tmp/nightfall

# Batch
batch.history.enabled=false

# Batch - Job History
batch.cassandra.hosts=cassandrar
batch.cassandra.port=9042
batch.cassandra.user=
batch.cassandra.password=
batch.cassandra.keyspace=kafka
batch.cassandra.datacenter=
batch.history.ttl.days=7

Como passamos na configuração file.source um arquivo local, precisaremos criar o arquivo compactado contendo os eventos que serão processados pelo Batch. O arquivo que vamos utilizar é um txt (zipado) com os eventos, localizado no caminho especificado. Para executar o Batch executamos o seguinte comando:

./gradlew 'jobs/example':run -PmainClass="${JOB_PACKAGE}.BatchOrderJob"

Podemos ver a impressão dos eventos que são do tipo OrderStarted no log da aplicação :)

É hora da revisão

Nesse post vimos o que nos motivou a criar o Nightfall, as configurações básicas para conseguir criar um Stream e um Batch. Aqui vocês podem acessar o repositório contendo todos os códigos mostrados nesse post ;D Por hoje é só, pessoal, mas iremos fazer uma série de posts para explicar mais usos do Nigthfall. Gostou? Se tiver algo para acrescentar/sugerir/dúvida, deixe nos comentários e aguardem os próximos posts.