Tutorial de Apache Spark
    1. Introdução
    2. Informação adicional
    3. Shell & SparkContext
    4. Introdução ao Scala
    5. Abstrações de dados
    6. RDDs
    7. Shared vars
    8. DataFrame
    9. Operações de DataFrame
    10. Transformações
    11. Ações

  1. Introdução
  2. Todos os exemplos a seguir introduzem os conceitos básicos do Apache Spark. Baixe os arquivos spark-training.zip e urbs.zip para seguir este tutorial.


    O spark-training.zip é uma instalação auto-contida do Apache Spark (v-1.6.1). Baixe e execute Spark diretamente deste .zip. ;)

    spark-training.zip

    Dados dos ônibus de Curitiba fornecidos pela URBS.

    urbs.zip

  3. Informação adicional
  4. Referências adicionais.

  5. Shell & SparkContext
  6. Após baixar o spark-training.zip, descompacte-o e inicie o spark-shell. Descompacte também o urbs.zip dentro do diretório spark-training/spark.

    ramiro@atm:~/nobackup/$ wget -c http://www.inf.ufpr.br/erlfilho/tutorials/spark/spark-training.zip
    ramiro@atm:~/nobackup/$ unzip spark-training.zip
    ramiro@atm:~/nobackup/$ cd spark-training
    ramiro@atm:spark-training$ cd spark
    ramiro@atm:spark$ mv ~/Downloads/urbs.zip .
    ramiro@atm:spark$ unzip urbs.zip
    ramiro@atm:spark$ ./bin/spark-shell
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
          /_/
    
    Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala>

  7. Introdução ao Scala
  8. Antes de começarmos o tutorial vamos revisar alguns conceitos de Scala. Para eventuais dúvidas sobre assuntos não abordadas neste tutorial consulta o manual da linguagem.

    Scala faz inferência de tipos. Não é preciso declarar o tipo da variável. No exemplo abaixo é criado uma tupla com um inteiro e uma string. Usando getClass é possível verificar que Scala sabe exatamente os tipos de cada variável, sendo a tupla uma variável com um conjunto de tipos.

    scala> val tup1 = (0, "zero")
    scala> tup1.getClass
    res1: Class[_ <: (Int, String)] = class scala.Tuple2
    scala> val tup2 = (1, "um", 1.0)
    scala> tup2.getClass
    res2: Class[_ <: (Int, String, Double)] = class scala.Tuple3
    scala> val tup2 = (1, "um", 1.0)
    scala> tup2.getClass
    res2: Class[_ <: (Int, String, Double)] = class scala.Tuple3

    É possível acessar os valores de cada tupla utilizando a sintaxe sublinhado ("_").

    scala> println(tup1._1)
    0

    scala> println(tup1._2)
    zero

    scala> println(tup2._2)
    um

    scala> println(tup2._3)
    1.0

    Para definir uma função em Scala é preciso dizer o tipo dos parâmetros e o tipo do retorno. Em Scala é possível definir funções dentro de funções. Além disso, Scala suporta nomes de funções com os caracteres +, ++, ~, &,-, -- , \, /, : etc. Abaixo segue a função sum que realiza a some de dois inteiros.

    scala> def sum (x: Int, y: Int) : Int = { return x + y }
    sum: (x: Int, y: Int)Int

    scala> sum (1,2)
    res0: Int = 3

    Funções não necessariamente precisam ter um nome. Toda função sem nome é dita função anônima ou função lambda. Abaixo segue uma função lambda que soma dois inteiros.

    scala> (x: Int, y: Int) => x + y
    res1: (Int, Int) => Int = <function2<

    scala> res1(1,2)
    res2: Int = 3

    Para que serve uma função anônima (lambda)? ou, como invocar uma função lambda? Muitas funções do Spark tem como parâmetro outras funções. Portanto, funções lambdas são importantes para não precisar declarar formalmente uma função. Além de simplificar e economizar código.

    A função max apresentada abaixo retorna o maior valor dentre dois inteiros. Note que max recebe dois inteiros, não atribuindo a eles nenhuma variável. No corpo da função max há a declaração de uma função lambda, que recebe dois inteiros, e verifica o maior inteiro.

    scala> val max: (Int, Int) => Int = (m: Int, n: Int) => if(m > n) m else n
    max: (Int, Int) => Int = <function2<

    scala> max(3,4)
    res8: Int = 4

    a função max pode ser simplificada como abaixo.

    scala> def max (x: Int, y: Int) : Int = { if (x > y) x else y }

  9. Abstrações de dados
  10. Toda aplicação Spark consiste de um driver que executa uma função main escrita pelo usuário e executa também várias operações em paralelo no cluster. Cada uma destas operações é executada sobre uma coleção de dados, representada internamente no Spark por uma abstração de dados.

    RDD
    Um RDD (Resilient Distributed Dataset) é a primeira abstração fornecida pelo Spark para a manipulação de dados. É uma representação de um dado distribuído pelos nodos do cluster que pode ser operado em paralelo. RDDs podem ser criados a partir de arquivos no HDFS ou de coleções da linguagem Scala.
    Shared vars
    Uma Shared variable é a segunda abstração fornecida pelo Spark utilizada em operações paralelas. Quando Spark executa uma função em paralelo como um conjunto de Tarefa em diferentes nodos, Spark copia cada variável usada na função para cada Tarefa. As vezes é necessário compartilhar estas variáveis entre as Tarefas, ou entre as Tarefas e o programa driver. Spark suporta dois tipos de shared variables: broadcast variables, que são utilizadas para compartilhar valores entre todos os nodos. E accumulator, que são variáveis que somente agregam valores.
    DataFrame
    Um DataFrame é uma coleção de dados distribuída organizada em colunas. É equivalente a uma tabela em um banco de dados relacional. Os DataFrames podem ser construídos a partir de: Dados Estruturados, Tabelas do Hive, ou RDDs.
    Datasets
    Não é abordado neste tutorial.
    Um Dataset é uma interface experimental adicionada no Spark 1.6 que mescla os benefícios do RDD com Spark SQL. Um Dataset pode ser constuído a partir de objetos JVM e operados com transformações funcionais (map, flatMap, filter, etc).

  11. RDDs
  12. Existem duas maneiras de criar um RDD: (1) paralelizando uma coleção já existente dentro do programa, ou (2) referenciando dados em um sistema externo ao programa (HDFS, HBase, ou qualquer fonte de dado com um Hadoop InputFormat).

    RDDs a partir de uma coleção existente

    Para paralelizar uma coleção é preciso invocar o método parallelize do Spark Context sobre uma coleção existente.

    scala> val data = Array(1, 2, 3, 4, 5)
    scala> val distData = sc.parallelize(data)
    distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at :29

    Também é possível utilizar a função makeRDD para criar um RDD.

    scala> val data = Array(1, 2, 3, 4, 5)
    scala> val dataRDD = sc.makeRDD(data)
    dataRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at makeRDD at :29

    Não existe diferença entre makeRDD e parallelize. A função makeRDD invoca parallelize. Você pode verificar a invocação de parallelize diretamente na implementação do makeRDD

    Um parâmetro importante para a coleção paralela é o número de partições em qual o dado será dividido. Spark executará uma Tarefa para cada partição no cluster. Normalmente se divide em 2 à 4 partições por CPU do cluster. O Spark tenta configurar o número de partições de acordo com o seu cluster, mas você pode configurar esse número manualmente passando o número de partições como segundo argumento para parallelize().

    scala> val data = Array(1, 2, 3, 4, 5)
    scala> val distData = sc.parallelize(data, 10)

    Uma vez criado o RDD as operações executadas sobre ele são automaticamente realizadas em paralelo pelo Spark de maneira transparente. Por exemplo, é possível somar os valores de distData com:

    scala> distData.reduce((a, b) => a + b)

    RDDs a partir de dados externos

    Spark pode criar RDDs a partir de qualquer fonte de dados suportada pelo Hadoop, incluindo seu sistema de arquivos local, HDFS, Cassandra, HBase, Amazon S3, entre outros. Spark suporta qualquer formato suportado por Hadoop InputFormat.

    RDDs podem ser criados a partir de arquivos texto usando o método textFile do SparkContext. Este método pega a URI do arquivo (que pode ser um caminho relativo no sistema de arquivo local, ou um caminho completo de um sistema externo como hdfs:// e s3n://) e lê o conteúdo do arquivo como uma coleção de linhas..

    scala> val distFile = sc.textFile("README.md")
    distFile: RDD[String] = MappedRDD@1d4cee08

    Uma vez criado o RDD, distFile pode ser transformados com operações chamadas de dataset operations. Por exemplo, podemos somar todos os tamanhos das linhas usando as operações map e reduce.

    scala> distFile.map(s => s.length).reduce((a, b) => a + b)

    Observações: sobre ler arquivos com Spark.

    • Se estiver lendo um arquivo do sistema de arquivos local, este arquivo precisa estar acessível com o mesmo caminho em todos os nodos. Ou o mesmo arquivo é copiado para todos os nodos, ou o arquivo é compartilhado atrávez de um sistema de arquivos distribuído como NFS.
    • Todos os métodos para ler arquivos, como o textFile, suportam ler diretórios, arquivos comprimidos e expressões regulares (Unix-like). Por exemplo, você pode usar: textFile("/dir"), textFile("/dir/*.txt"), and textFile("/dir/*.gz").
    • O método textFile tem um parâmetro adicional para controlar o número de partições do arquivo. Por padrão, Spark cria uma partição para cada block do arquivo (64MB, tamanho padrão do HDFS), mas você pode configurar o Spark para usar um número maior de partições. Não é possível ter um número menor de blocos.

    Além dos arquivos texto, Spark suporta diversos outros formatos:

    • O método SparkContext.wholeTextFiles lê um diretório contendo vários arquivos texto e retorna um vetor com o par {nome-do-arquivo, conteúdo-do-arquivo}.
    • Para ler outros formatos é possível usar o método SparkContext.hadoopRDD. Este método lê a classe InputFormat de um dado jobConf

    Salvando arquivos

    Para salvar um RDD em arquivo pode-se usar o método saveAsObjectFile, que salva o arquivo usando serialização Java, ou saveAsTextFile que salva o arquivo em texto puro.

    scala> val file = sc.textFile("README.md")
    file: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[10] at textFile at :27

    scala> file.count
    res1: Long = 95

    scala> file.saveAsObjectFile("/tmp/readme_object")
    scala> file.saveAsTextFile("/tmp/readme_text")

    O método saveAsObjectFile salva os arquivos utilizando serialização Java. As partições são convertidas para um stream de bytes, onde este stream pode ser posteriormente revertido ao arquivo original.

    ramiro@atm:spark-training$ cat /tmp/readme_object/part-00000
    SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.
    BytesWritableC���kuPj���:���ur
    [Ljava.lang.String;��V��{Gxpt# Apache SparkttNSpark is a fast and general
    cluster computing system for Big Data. It providestKhigh-level APIs in Scala,
    Java, Python, and R, and an

    O método saveAsTextFile salva todas as partições em texto puro.

    ramiro@atm:spark-training$ cat /tmp/readme_text/part-00000
    Spark is a fast and general cluster computing system for Big Data. It provides
    high-level APIs in Scala, Java, Python, and R, and an optimized engine that
    supports general computation graphs for data analysis. It also supports a

    Observe que em ambos os métodos o arquivo é salvo em diversas partições (e.g., part-0000).

    ramiro@atm:spark-training$ ls /tmp/readme_object/
    _SUCCESS part-00000 part-00001

    ramiro@atm:spark-training$ ls /tmp/readme_text/
    _SUCCESS part-00000 part-00001

    Para recuperar os arquivos é possível ler todo o conteúdo do diretório utilizando textFile. Note que o número de linhas é o mesmo, pois os arquivos são os mesmos.

    scala> val f = sc.textFile("/tmp/readme_text/*")
    f: org.apache.spark.rdd.RDD[String] = /tmp/readme_text/ MapPartitionsRDD[10] at textFile at :27

    scala> f.count
    res28: Long = 95

    scala> val f = sc.objectFile("/tmp/readme_object/*")
    f: org.apache.spark.rdd.RDD[Nothing] = MapPartitionsRDD[7] at objectFile at :27

    scala> f.count
    res4: Long = 95

  13. Shared vars
  14. As funções Spark (e.g., map, reduce) são executadas em um cluster, onde cada nodo do cluster executa uma cópia da função com uma cópia de todas as variáveis do método. Estas variáveis são copiadas para cada nodo e não são atualizadas para o programa principal (i.e., driver).

    Implementar variáveis de leitura e escrita compartilhadas entre todos os nodos é ineficiente. Ao invés disto, Spark fornece duas maneiras limitadas de compartilhar variáveis. A primeira são as variáveis do tipo broadcast, a segunda são os accumulators.

    Broadcast variables

    Variáveis do tipo broadcast possibilitam o programador manter uma variável somente leitura em cache em cada um dos nodos, ao invés de enviar uma cópia do dado para cada um dos nodos. Essas variáveis podem ser usadas, por exemplo, para dar a cada nodo uma cópia de um dado entrada muito grande de maneira eficiente.

    Criar variáveis do tipo broadcast é somente útil quando diversas tarefas (executando em diferentes nodos) precisam do mesmo dado. A variável do tipo broadcast é uma interface para outra variável. O exemplo abaixo cria uma variável broadcast onde broadcastVar é uma interface para Array(1, 2, 3).

    scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
    broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

    scala> broadcastVar.value
    res0: Array[Int] = Array(1, 2, 3)

    Accumulators

    As variáveis do tipo accumulators são variáveis que só podem ser "somadas". São variáveis utilizadas para implementar contadores (como os dos MapReduce). Spark suporta nativamente accumulators numéricos, e tais variáveis são criadas utilizando a função SparkContext.accumulator(v). Tarefas executando em um cluster podem, então, usar += para somar valores no accumulator v. Entretanto, as Tarefas não podem ler este valor. Somente o programa driver pode ler v. O exemplo abaixo mostra uma variável do tipo accumulator sendo usada para contar os valores de um array.

    scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
    broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(7)

    scala> broadcastVar.value
    res5: Array[Int] = Array(1, 2, 3)

    scala> val accum = sc.accumulator(0, "My Accumulator")
    accum: org.apache.spark.Accumulator[Int] = 0

    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

    scala> accum.value
    res7: Int = 10

    Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map(). The below code fragment demonstrates this property:

    scala> val accum = sc.accumulator(0)
    scala> data.map { x => accum += x; f(x) }
    // Here, accum is still 0 because no actions have caused the map to be computed.

  15. DataFrame
  16. Para ler arquivos de outros formatos de arquivo é preciso chamar os métodos do sqlContext. Formatos como parquet e json podem ser lidos como no exemplo abaixo. Note que diferentemente do método textFile o valor retornado é um DataFrame e não um RDD.

    scala> import org.apache.spark.sql._
    scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    scala> val j = sqlContext.read.json("urbs/linhas.json")
    j: org.apache.spark.sql.DataFrame =
    [CATEGORIA_SERVICO: string, COD: string, NOME: string, SOMENTE_CARTAO: string]

  17. Operações de DataFrame
  18. DataFrames provêem uma languagem específica de domínio para manipulação de dados estruturados. Para uma lista completa das operações que podem ser executadas sobre um DataFrame, leia a documentação da API. Abaixo nós incluímos alguns exemplos básicos para a manipulação de DataFrames:

    scala> j.printSchema
    root
    |-- CATEGORIA_SERVICO: string (nullable = true)
    |-- COD: string (nullable = true)
    |-- NOME: string (nullable = true)
    |-- SOMENTE_CARTAO: string (nullable = true)

    Abaixo seguem exemplos de "consultas" utilizando DataFrame.

    scala> j.count
    res1: Long = 412

    Selectione os elementos da coluna COD e me mostre os três primeiros.

    scala> j.select("COD").show(3)
    +---+
    |COD|
    +---+
    |464|
    |226|
    |182|
    +---+
    only showing top 3 rows
    

    Selecione todos os COD distintos.

    scala> j.select("COD").distinct.show(2)
    +---+
    |COD|
    +---+
    |899|
    |331|
    +---+
    only showing top 2 rows

    Selecione todos as tuplas odenando-as por CATEGORIA_SERVICO.

    scala> j.orderBy("CATEGORIA_SERVICO").show(3)
    +-----------------+---+--------------+--------------+
    |CATEGORIA_SERVICO|COD|          NOME|SOMENTE_CARTAO|
    +-----------------+---+--------------+--------------+
    |      ALIMENTADOR|232|       ALIAN?A|             N|
    |      ALIMENTADOR|823|       AUGUSTA|             S|
    |      ALIMENTADOR|629|ALTO BOQUEIR?O|             N|
    +-----------------+---+--------------+--------------+
    only showing top 3 rows

    Selecione o número de ônibus que somente aceitam cartão.

    scala> j.filter( j("SOMENTE_CARTAO").contains("S") ).count
    res19: Long = 71

    Selecione as linhas onde o nome começa com "MA".

    scala> j.filter( j("NOME").startsWith("MA")).show(2)
    +-----------------+---+----------------+--------------+
    |CATEGORIA_SERVICO|COD|            NOME|SOMENTE_CARTAO|
    +-----------------+---+----------------+--------------+
    |      MADRUGUEIRO|189|  MAD. ABRANCHES|             N|
    |      MADRUGUEIRO|549|MAD. BAIRRO NOVO|             N|
    +-----------------+---+----------------+--------------+
    only showing top 2 rows

    Seleciona as linhas onde o nome da linha termina com "MA".

    scala> j.filter( j("NOME").endsWith("MA")).show
    +-----------------+---+---------+--------------+
    |CATEGORIA_SERVICO|COD|     NOME|SOMENTE_CARTAO|
    +-----------------+---+---------+--------------+
    |      ALIMENTADOR|639| FUTURAMA|             N|
    |      ALIMENTADOR|634|    PLUMA|             N|
    |     CONVENCIONAL|274|STA. GEMA|             S|
    +-----------------+---+---------+--------------+ 
    

    Utilizando for, imprima cada código COD.

    for (key <- j.select("COD")) { println(key) } 
    

    Para demais funções disponíveis digite j. e pressione TAB duas vezes.

  19. Transformações
  20. A tabela a seguir lista algumas das transformações mais comuns implementadas pelo Spark. Para mais detalhes recorra à API dos RDDs (Scala, Java, Python, R) e para a documentação das funções para pares RDD (Scala, Java) .

    Transformação Descrição Exemplo Resultado
    map(func) retorna um RDD depois de processar cada elemento com a função func sc.parallelize(Array(1, 2, 3, 4, 5)).map (x => x + 10) { 11, 12, 13, 14, 15 }
    filter(func) Retorna um RDD depois de selecionar os enementos onde func é verdadeira sc.parallelize(Array(1, 2, 3, 4, 5)).filter( x => x == 3) 3
    flatMap(func) Similar ao map, mas cada item pode ser mapeado para 0 ou mais items (func deve retornar uma sequência, ao invés de um item). sc.parallelize(List("this is line 1", "this is the second line"), 2).flatMap(_.split(" ")).foreach {println _} {this, is, the, second, line, this, is, line, 1}
    mapPartitions(func) Similar ao map, mas executa func separadamente em cada partição (bloco). A função func deve ser do tipo Iterator<T> => Iterator<U> quando invocada sobre um RDD do tipo T. sc.parallelize(Array((1,2), (1,2), (1,2), (3,4), (5,6), (7,8), (7,8)), 3).mapPartitions ( x => x.map( (t: (Int, Int)) => t._1 + t._2 ) ).foreach {println } {3, 3, 3, 7, 11, 15, 15}
    mapPartitionsWithIndex(func) Similar a mapPartitions, mas provê func com um valor inteiro representando o índice da partição. A função func deve ser do tipo Iterator<T> => Iterator<U> quando invocada sobre um RDD do tipo T. sc.parallelize(Array((1,2), (1,2), (1,2), (3,4), (5,6), (7,8), (7,8)), 3).mapPartitionsWithIndex ( (index: Int , x) => x.map( (t: (Int, Int)) => (index, t._1 + t._2 )) ).foreach {println } (0,3)
    (0,3)
    (1,3)
    (1,7)
    (2,11)
    (2,15)
    (2,15)
    sample(withReplacement, fraction,seed) Amostragem de uma fração de um dado, com ou sem substituição, usando um número aleatório. sc.parallelize(Array(1, 2, 3, 4, 5)).sample(false, 0.4).foreach { println _ } {3,4,2}
    union(otherDataset) Retorna uma coleção que contém a união de todos os elementos da fonte e do argumento. sc.parallelize(Array(1, 2, 3, 4, 5)).union( sc.parallelize(Array(6,7,8,9,10)) ).foreach( println _ ) {1,2,3,4,5,6,7,8,9,10}
    intersection(otherDataset) Retorna um RDD que contém uma interseção dos elementos da fonte e do argumento. sc.parallelize(Array(1, 2, 3, 4, 5)).intersection( sc.parallelize(Array(4,5,6,7,8)) ).foreach( println _ ) {5,4}
    distinct([numTasks])) Retorna uma coleção que contém os elementos distintos da fonte. sc.parallelize(Array((1,2), (1,2), (1,2), (3,4), (5,6), (7,8), (7,8))).distinct().foreach { println _ } {(1,2), (5,6), (7,8), (3,4)}
    groupByKey([numTasks]) Quando invocado sobre uma coleção de pares (k, v), retorna um par (k, Iterable<v>). sc.parallelize(Array((1,2), (1,2), (1,2), (3,4), (5,6), (7,8), (7,8))).groupByKey().foreach { println _ } (1,CompactBuffer(2, 2, 2))
    (3,CompactBuffer(4))
    (7,CompactBuffer(8, 8))
    (5,CompactBuffer(6))
    reduceByKey(func, [numTasks]) Quando invocado sobre uma coleção de pares (k, v), retorna uma coleção de pares (k, v) onde os valores de cada chave são agregados usando pela função func, que deve ser do tipo (v, v). sc.parallelize(Array((1,2), (1,2), (1,2), (3,4), (5,6), (7,8), (7,8))).reduceByKey( _ + _ ).foreach { println _ } (1,6)
    (3,4)
    (7,16)
    (5,6)
    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) Quando invocado sobre uma coleção de pares (k, v) a função seqOp é aplicada nos valores v de cada chave. Depois, função combOp é invocada para agregar os valores resultantes de seqOp. O zeroValue representa o valor zero (ponto inicial) para calcular o valor de retorno da função combOp. Esta função permite que o tipo do valor de retorno seja diferente do tipo valor de entrada. O tipo do valor de retorno é definido por zeroValue. sc.parallelize(Array((1,2), (1,2), (1,2), (3,4), (5,6), (7,8), (7,8)), 3 ).aggregateByKey(List[Int]())( (x,y) => x ++ List(y) , _ ++ _ ).foreach {println} (1,List(2, 2, 2))
    (3,List(4))
    (7,List(8, 8))
    (5,List(6))
    sortByKey([ascending], [numTasks]) Quando invocado sobre uma coleção de pares (k, v), onde k implementa Ordered, retorna uma coleção de pares (k, v) ordenados pela chaves. sc.parallelize(Array((1,2), (1,2), (1,2), (3,4), (5,6), (7,8), (7,8)), 1 ).sortByKey().foreach {println _} (1,2)
    (1,2)
    (1,2)
    (3,4)
    (5,6)
    (7,8)
    (7,8)
    join(otherDataset, [numTasks]) Quando invocado sobre uma coleção de pares do tipo (k, v) e (k, w), retorna uma coleção de pares (k, (v, w)). sc.parallelize(Array ((1, "um"), (2, "dois"), (3, "tres"))) .join( sc.parallelize(Array ((1, "one"), (2, "two"), (3, "three"))) ).foreach { println _ } (1,(um,one))
    (3,(tres,three))
    (2,(dois,two))
    cogroup(otherDataset, [numTasks]) Mesmo que groupWith Quando invocado sobre uma coleção de pares (k, v) e (k, w), retorna uma coleção do tipo (k, (Iterable<v>,Iterable<w>)). sc.parallelize(Array ((1, "um"), (2, "dois"), (3, "tres"))) .cogroup( sc.parallelize(Array ((1, "one"), (2, "two"), (3, "three"))) ).foreach { println _ } (2,(CompactBuffer(dois),CompactBuffer(two)))
    (1,(CompactBuffer(um),CompactBuffer(one)))
    (3,(CompactBuffer(tres),CompactBuffer(three)))
    cartesian(otherDataset) Quando invocado sobre uma coleção de dados do tipo T e U, retorna o conjunto de pares (T, U). sc.parallelize (1 to 2).cartesian( sc.parallelize(10 to 11) ).foreach {println _ } (1,10) (1,11)
    (2,10)
    (2,11)
    pipe(command, [envVars]) Concatena cada partição do RDD por meio de um comando shell e.g. um script Perl ou Bash. Elementos do RDD são escritos para a stdin do processo. ramiro$ cat script.sh
    #!/bin/bash
    while read line; do echo "(script, $line)"; done
    sc.parallelize (0 to 5).pipe ("/Users/Edson/Development/spark-training/usb/spark-training/spark/script.sh").foreach {println _ }
    (script, 3)
    (script, 4)
    (script, 5)
    (script, 0)
    (script, 1)
    (script, 2)
    coalesce(numPartitions) Diminui o número de partições de um RDD para numPartitions. scala> val d = sc.parallelize (1 to 100, 4)
    scala> d.partitions.size
    res29: Int = 4

    scala> d.coalesce (2).partitions.size
    res32: Int = 2
    4, 2
    repartition(numPartitions) Distribui novamente os dados de um RDD aleatóriamente para criar, ou mais, ou menos partições. scala> val d = sc.parallelize (1 to 100, 4)
    scala> d.partitions.size
    res29: Int = 4

    scala> scala> d.repartition(2).partitions.size
    res36: Int = 2
    4, 2

  21. Ações
  22. A tabela a seguir lista algumas das ações mais comuns implementadas pelo Spark. Para mais detalhes recorra à API dos RDDs (Scala, Java, Python, R) e para a documentação das funções para pares RDD. (Scala, Java) for details.

    Ações Descrição Exemplo Resultado
    reduce(func) Agrega os elementos de uma coleção usando a função func (que precisa de dois argumentos, retornando um). A função deve ser acumulativa ou associativa, para ser computada corretamente em paralelo. sc.parallelize(Array(1, 2, 3, 4, 5)).reduce( (a, b) => a+b ) 15
    collect() Retorna todos os elementos de uma coleção como um vetor dentro do programa driver. sc.parallelize(Array(1, 2, 3, 4, 5)).collect Array(1, 2, 3, 4, 5)
    count() Retorna o número de elementos dentro da coleção de dados. sc.parallelize(Array(1, 2, 3, 4, 5)).count 5
    first() Retorna o primeiro elemento da coleção de dados. sc.parallelize(Array(1, 2, 3, 4, 5)).first 1
    take(n) Return an array with the first n elements of the dataset. sc.parallelize(Array(1, 2, 3, 4, 5)).take(2) Array(1, 2)
    takeSample(withReplacement, num, [seed]) Retorna um vetor com uma amostra aleatória de num selecionando opcionalmente uma semente para os números aleatóreos. O parâmetro withReplacemente diz respeito a repetir os elementos na amostra ou não. sc.parallelize(Array(1, 2)).takeSample(true,4) Array(1, 1, 2, 1)
    takeOrdered(n, [ordering]) Retorna os primeiros n elementos de um RDD usando ou a ordem natural ou um comparador customizado. sc.parallelize(Array(1, 2, 3, 4, 5)).takeOrdered(3) Array(1, 2, 3)
    saveAsTextFile(path) Escreve todos os elementos da coleção de dados em um arquivo texto no sistema de arquivos local ou em qualquer sistema de arquivos suportado pelo Hadoop. sc.textFile("README.md").saveAsTextFile("README.md2") README.md2/_SUCCESS
    README.md2/part-00000
    README.md2/part-00001
    saveAsSequenceFile(path)
    (Java and Scala)
    Escreve todos os elementos da coleção de dados como um SequenceFile do Hadoop no sistem de arquivos local, ou em qualquer sistema de arquivos suportado pelo Hadoop. sc.parallelize(Array((1,2), (1,2), (1,2), (3,4), (5,6), (7,8), (7,8))).saveAsSequenceFile("SEQ") SEQ/_SUCCESS
    SEQ/part-00000
    SEQ/part-00001
    saveAsObjectFile(path)
    (Java and Scala)
    Escreve todos os elementos da coleção de dados em um formato simples usando serialização Java, que pode ser deserializada posteriormente usando SparkContext.objectFile(). sc.textFile("README.md").saveAsObjectFile("README.obj") README.obj/_SUCCESS
    README.obj/part-00000
    README.obj/part-00001
    countByKey() Somente disponível em RDDs do tipo (k, v) hashmap de pares (K, Int) com o número de ocorrência de cada chave. sc.parallelize(Array((1,2), (1,2), (1,2), (3,4), (5,6), (7,8), (7,8))).countByKey Map(1 -> 3, 3 -> 1, 7 -> 2, 5 -> 1)
    foreach(func) Executa uma função func em cada elemento da coleção de dados. Geralmente utilizada para atualizar contadores ou interagir com sistemas de arquivos externos.
    Observação: modificar accumulators fora de um foreach pode um comportamento indefinido.
    sc.parallelize(Array(1, 2, 3, 4, 5)).foreach { println _ } {1,2,3,4,5}