RDD ved hjælp af Spark: byggestenen til Apache Spark



Denne blog om RDD ved hjælp af Spark giver dig en detaljeret og omfattende viden om RDD, som er den grundlæggende enhed i Spark & ​​hvor nyttig den er.

, Selve ordet er nok til at skabe en gnist i enhver Hadoop-ingeniørs sind. TIL n i hukommelsen behandlingsværktøj som er lynhurtig i klyngecomputering. Sammenlignet med MapReduce skaber datadeling i hukommelsen RDD'er 10-100x hurtigere end netværks- og diskdeling, og alt dette er muligt på grund af RDD'er (Resilient Distribuerede datasæt). De vigtigste punkter, vi fokuserer i dag i denne RDD ved hjælp af Spark-artiklen, er:

Brug for RDD'er?

Hvorfor har vi brug for RDD? -RDD ved hjælp af Spark





Verden udvikler sig med og Datalogi på grund af fremskridt i . Algoritmer baseret på Regression , , og som kører videre Distribueret Iterativ Comput ation mode, der inkluderer genbrug og deling af data mellem flere computerenheder.

Det traditionelle teknikker havde brug for en stabil mellemliggende og distribueret opbevaring som HDFS omfattende gentagne beregninger med datareplikationer og dataserialisering, hvilket gjorde processen meget langsommere. Det var aldrig let at finde en løsning.



Det er her RDD'er (Resilient Distribuerede datasæt) kommer til det store billede.

RDD s er nemme at bruge og ubesværet at oprette, da data importeres fra datakilder og falder ind i RDD'er. Yderligere anvendes operationerne til at behandle dem. De er en distribueret samling af hukommelse med tilladelser som Læs kun og vigtigst af alt er de Fejl tolerant .



Hvis nogen datadeling af RDD er faret vild , det kan regenereres ved at anvende det samme transformation operation på den mistede partition i afstamning , snarere end at behandle alle data fra bunden. Denne form for tilgang i realtidsscenarier kan få mirakler til at ske i situationer med datatab eller når et system er nede.

Hvad er RDD'er?

RDD eller ( Resilient Distribueret datasæt ) er en grundlæggende datastruktur i Spark. Begrebet Robust definerer den evne, der genererer data automatisk eller data rullende tilbage til oprindelige tilstand når der opstår en uventet katastrofe med sandsynlighed for datatab.

Dataene skrevet i RDD'er er opdelt og opbevares i flere eksekverbare noder . Hvis en udførende node mislykkes i løbetid, så får det øjeblikkeligt sikkerhedskopien fra næste eksekverbare node . Dette er grunden til, at RDD'er betragtes som en avanceret type datastrukturer sammenlignet med andre traditionelle datastrukturer. RDD'er kan gemme strukturerede, ustrukturerede og semistrukturerede data.

Lad os gå videre med vores RDD ved hjælp af Spark-blog og lære om de unikke funktioner i RDD'er, der giver den en fordel i forhold til andre typer datastrukturer.

Funktioner ved RDD

  • In-Memory (VÆDDER) Beregninger : Begrebet In-Memory-beregning fører databehandlingen til et hurtigere og effektivt stadium, hvor det samlede ydeevne af systemet er opgraderet.
  • L hans evaluering : Udtrykket Lazy evaluering siger transformationer anvendes til dataene i RDD, men output genereres ikke. I stedet er de anvendte transformationer logget.
  • Udholdenhed : De resulterende FUD'er er altid genanvendelig.
  • Grovkornede operationer : Brugeren kan anvende transformationer til alle elementer i datasæt igennem kort, filter eller gruppere efter operationer.
  • Fejl tolerant : Hvis der er tab af data, kan systemet rul tilbage til dens oprindelige tilstand ved hjælp af det loggede transformationer .
  • Uforanderlighed : Data defineret, hentet eller oprettet kan ikke være ændret når det er logget ind i systemet. Hvis du har brug for at få adgang til og ændre den eksisterende RDD, skal du oprette en ny RDD ved at anvende et sæt af Transformation fungerer til den aktuelle eller forudgående RDD.
  • Partitionering : Det er afgørende enhed af parallelisme i Spark RDD. Som standard er antallet af oprettede partitioner baseret på din datakilde. Du kan endda bestemme antallet af partitioner, du vil lave ved hjælp af brugerdefineret partition funktioner.

Oprettelse af RDD ved hjælp af Spark

RDD'er kan oprettes i tre måder:

  1. Læsning af data fra paralleliserede samlinger
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Ansøger transformation på tidligere RDD'er
val ord = spark.sparkContext.parallelize (Seq ('Spark', 'er', 'a', 'meget', 'kraftfuld', 'sprog')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Læsning af data fra ekstern lagring eller filstier som HDFS eller HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operationer udført på RDD'er:

Der er hovedsageligt to typer operationer, der udføres på RDD'er, nemlig:

  • Transformationer
  • Handlinger

Transformationer : Det operationer vi anvender på FUD'er til filter, adgang og modificere dataene i den overordnede RDD for at generere en på hinanden følgende RDD Hedder transformation . Den nye RDD returnerer en markør til den tidligere RDD, der sikrer afhængigheden mellem dem.

Transformationer er Dovne evalueringer, med andre ord, de operationer, der anvendes på RDD, som du arbejder, vil blive logget, men ikke henrettet. Systemet kaster et resultat eller en undtagelse efter at have udløst Handling .

Vi kan dele transformationer i to typer som nedenfor:

  • Smalle transformationer
  • Brede transformationer

Smalle transformationer Vi anvender smalle transformationer på en enkelt partition af den overordnede RDD for at generere en ny RDD, da data, der kræves for at behandle RDD, er tilgængelige på en enkelt partition af forælder ASD . Eksemplerne for snævre transformationer er:

  • kort()
  • filter()
  • flatMap ()
  • skillevæg()
  • mapPartitions ()

Brede transformationer: Vi anvender den brede transformation på flere partitioner at generere en ny RDD. De data, der kræves for at behandle RDD, er tilgængelige på flere partitioner af forælder ASD . Eksemplerne for brede transformationer er:

  • reducere af ()
  • Union()

Handlinger : Handlinger instruerer Apache Spark om at anvende beregning og send resultatet eller en undtagelse tilbage til driver RDD. Få af handlingerne inkluderer:

  • indsamle()
  • tælle()
  • tage()
  • først()

Lad os praktisk talt anvende operationerne på RDD'er:

IPL (indisk Premier League) er en cricket-turnering, hvor den er på højeste niveau. Så lad os i dag få fat på IPL-datasættet og udføre vores RDD ved hjælp af Spark.

  • For det første, lad os downloade en CSV-matchningsdata for IPL. Efter downloadet begynder det at se ud som en EXCEL-fil med rækker og kolonner.

I det næste trin affyrer vi gnisten og indlæser matches.csv-filen fra dens placering, i mit tilfælde mincsvfilplacering er “/Bruger/edureka_566977/test/matches.csv”

Lad os nu starte med Transformation del først:

  • kort():

Vi bruger Korttransformation at anvende en specifik transformationsoperation på hvert element i en RDD. Her opretter vi en RDD ved navn CKfile, hvor vi gemmer vorescsvfil. Vi opretter en anden RDD, der kaldes stater til gem byoplysningerne .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val states = CKfile.map (_. split (',') (2)) states.collect (). foreach (println)

  • filter():

Filtertransformation, selve navnet beskriver dets anvendelse. Vi bruger denne transformationsoperation til at filtrere de selektive data ud af en given dataindsamling. Vi ansøger filterbetjening her for at få optegnelser over årets IPL-kampe 2017 og gem den i fil RDD.

php print_r til streng
val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Vi anvender flatMap er en transformationsoperation til hvert af elementerne i en RDD for at oprette en newRDD. Det ligner korttransformation. her gælder viFlatmaptil spytte ud kampene i Hyderabad by og gem dataene ifilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). indsamle ()

  • skillevæg():

Alle data, vi skriver i en RDD, er opdelt i et bestemt antal partitioner. Vi bruger denne transformation til at finde antal partitioner dataene er faktisk opdelt i.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Vi betragter MapPatitions som et alternativ til Map () ogfor hver() sammen. Vi bruger mapPartitions her for at finde antal rækker vi har i vores fil RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • reducere af ():

Vi brugerReducer ved() på Nøgle-værdipar . Vi brugte denne transformation på vorescsvfil for at finde afspilleren med kampens højeste mand .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • Union():

Navnet forklarer det hele, Vi bruger union transformation er at klub to RDD'er sammen . Her opretter vi to RDD'er, nemlig fil og fil2. fil RDD indeholder registreringer af 2017 IPL-matches og fil2 RDD indeholder 2016 IPL match record.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Lad os starte med Handling del, hvor vi viser faktisk output:

hvordan man skriver i java
  • indsamle():

Collect er den handling, vi bruger til vis indholdet i RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println)

  • tælle():

Tælleer en handling, som vi bruger til at tælle antal poster til stede i RDD.Hervi bruger denne operation til at tælle det samlede antal poster i vores matches.csv-fil.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.count ()

  • tage():

Take er en handling, der ligner indsamling, men den eneste forskel er, at den kan udskrive enhver selektivt antal rækker som pr. brugeranmodning. Her anvender vi følgende kode for at udskrive top ti førende rapporter.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. tage (10) .foreach (println)

  • først():

Første () er en handling, der ligner indsamle () og tage ()detbruges til at udskrive den øverste rapport s output Her bruger vi den første () operation til at finde maksimalt antal kampe spillet i en bestemt by og vi får Mumbai som output.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') val states = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

For at gøre vores proces til vores læring af RDD ved hjælp af Spark, endnu mere interessant, er jeg kommet med en interessant brugssag.

RDD ved hjælp af Spark: Pokemon Use Case

  • For det første, Lad os downloade en Pokemon.csv-fil og indlæse den til gnistskallen, som vi gjorde til Matches.csv-filen.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemoner er faktisk tilgængelige i et stort udvalg. Lad os finde et par varianter.

  • Fjernelse af skema fra Pokemon.csv-filen

Vi har muligvis ikke brug for Skema af Pokemon.csv-filen. Derfor fjerner vi det.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Finde antallet af skillevægge vores pokemon.csv distribueres i.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Vand Pokemon

At finde antal vandpokemon

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Fire Pokemon

At finde antal Fire Pokemon

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Vi kan også registrere befolkning af en anden type pokemon ved hjælp af tællefunktionen
WaterRDD.count () FireRDD.count ()

  • Da jeg kan lide spillet af defensiv strategi lad os finde pokemon med maksimalt forsvar.
val defenceList = NoHeader.map {x => x.split (',')}. kort {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Vi ved det maksimale forsvarsstyrke værdi men vi ved ikke, hvilken pokemon det er. så lad os finde ud af, hvad der er det pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. kort {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Bestilling [Dobbelt] .omvendt.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Lad os nu ordne pokemon med mindst forsvar
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Lad os nu se Pokemon med en mindre defensiv strategi.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPeader .kort {x => x.split (',')}. kort {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Bestilling [Dobbelt ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Så med dette kommer vi til en ende af denne RDD ved hjælp af Spark-artiklen. Jeg håber, at vi udløste lidt lys over din viden om RDD'er, deres funktioner og de forskellige typer operationer, der kan udføres på dem.

Denne artikel er baseret på er designet til at forberede dig til Cloudera Hadoop og Spark Developer Certification Exam (CCA175). Du får en indgående viden om Apache Spark og Spark Ecosystem, som inkluderer Spark RDD, Spark SQL, Spark MLlib og Spark Streaming. Du får omfattende viden om Scala-programmeringssprog, HDFS, Sqoop, Flume, Spark GraphX ​​og Messaging System såsom Kafka.