DBInputFormat til at overføre data fra SQL til NoSQL-database



Formålet med denne blog er at lære at overføre data fra SQL-databaser til HDFS, hvordan man overfører data fra SQL-databaser til NoSQL-databaser.

I denne blog vil vi undersøge mulighederne og mulighederne for en af ​​de vigtigste komponenter i Hadoop-teknologien, dvs. MapReduce.

I dag vedtager virksomheder Hadoop-rammen som deres første valg til datalagring på grund af dets evner til effektivt at håndtere store data. Men vi ved også, at dataene er alsidige og findes i forskellige strukturer og formater. For at kontrollere et så stort udvalg af data og dets forskellige formater bør der være en mekanisme til at rumme alle sorterne og alligevel producere et effektivt og konsistent resultat.





Den mest magtfulde komponent i Hadoop-rammen er MapReduce, som kan give kontrol over dataene og dens struktur bedre end dens andre kolleger. Selvom det kræver overhead af læringskurve og programmeringskompleksiteten, hvis du kan håndtere disse kompleksiteter, kan du helt sikkert håndtere enhver form for data med Hadoop.

MapReduce framework opdeler alle sine behandlingsopgaver i grundlæggende to faser: Map og Reduce.



Forberedelse af dine rådata til disse faser kræver forståelse af nogle grundlæggende klasser og grænseflader. Superklassen til denne oparbejdning er InputFormat.

Det InputFormat klasse er en af ​​kerneklasserne i Hadoop MapReduce API. Denne klasse er ansvarlig for at definere to hoved ting:

  • Opdeling af data
  • Optagelæser

Opdeling af data er et grundlæggende koncept i Hadoop MapReduce-rammen, der definerer både størrelsen på de enkelte kortopgaver og dens potentielle eksekveringsserver. Det Optag læseren er ansvarlig for faktisk læsning af poster fra inputfilen og indsendelse af dem (som nøgle / værdipar) til kortlæggeren.



Antallet af kortlæggere bestemmes ud fra antallet af opdelinger. Det er InputFormats opgave at oprette splittelserne. Det meste af tiden er splitstørrelsen ækvivalent med blokstørrelsen, men det er ikke altid, der oprettes splittelser baseret på HDFS-blokstørrelsen. Det afhænger helt af, hvordan metoden getSplits () til din InputFormat er blevet tilsidesat.

Der er en grundlæggende forskel mellem MR split og HDFS blok. En blok er et fysisk stykke data, mens en split kun er et logisk stykke, som en kortlægger læser. En split indeholder ikke inputdataene, den indeholder kun en reference eller adresse på dataene. En split har grundlæggende to ting: En længde i bytes og et sæt lagerplaceringer, som bare er strenge.

For at forstå dette bedre, lad os tage et eksempel: Behandling af data, der er gemt i din MySQL ved hjælp af MR. Da der ikke er noget koncept for blokke i dette tilfælde, teorien: 'splits oprettes altid baseret på HDFS-blokken',mislykkes. En mulighed er at oprette opdelinger baseret på rækkeområder i din MySQL-tabel (og dette er hvad DBInputFormat gør, et inputformat til læsning af data fra en relationsdatabase). Vi kan have k antal opdelinger bestående af n rækker.

Det er kun for InputFormats baseret på FileInputFormat (en InputFormat til håndtering af data, der er gemt i filer), at splittelserne oprettes baseret på den samlede størrelse i byte af inputfilerne. FileSystems blokstørrelse af inputfilerne behandles imidlertid som en øvre grænse for input-opdelinger. Hvis du har en fil, der er mindre end HDFS-blokstørrelsen, får du kun 1 mapper til den fil. Hvis du vil have en anden adfærd, kan du bruge mapred.min.split.size. Men det afhænger igen udelukkende af getSplits () på din InputFormat.

Vi har så mange allerede eksisterende inputformater tilgængelige under pakke org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

Standard er TextInputFormat.

På samme måde har vi så mange outputformater, som læser dataene fra reduceringsanordninger og gemmer dem i HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

hvad er en java bønne

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Standard er TextOutputFormat.

Når du er færdig med at læse denne blog, ville du have lært:

  • Sådan skriver du et kortreduceringsprogram
  • Om forskellige typer InputFormater tilgængelige i Mapreduce
  • Hvad er behovet for InputFormats
  • Sådan skriver du brugerdefinerede InputFormats
  • Sådan overføres data fra SQL-databaser til HDFS
  • Sådan overføres data fra SQL (her MySQL) databaser til NoSQL databaser (her Hbase)
  • Sådan overføres data fra en SQL-database til en anden tabel i SQL-databaser (Måske er det måske ikke så meget vigtigt, hvis vi gør dette i den samme SQL-database. Der er dog intet galt i at have kendskab til det samme. Man ved aldrig hvordan det kan komme i brug)

Forudsætning:

  • Hadoop forudinstalleret
  • SQL forudinstalleret
  • Hbase forudinstalleret
  • Java grundlæggende forståelse
  • MapReducer viden
  • Hadoop framework grundlæggende viden

Lad os forstå den problemstilling, som vi skal løse her:

Vi har en medarbejdertabel i MySQL DB i vores relationsdatabase Edureka. Nu i henhold til forretningskravet er vi nødt til at skifte alle tilgængelige data i relationel DB til Hadoop-filsystem, dvs. HDFS, NoSQL DB kendt som Hbase.

hvordan man åbner aws cli

Vi har mange muligheder for at udføre denne opgave:

  • Sqoop
  • Flume
  • MapReduce

Nu vil du ikke installere og konfigurere noget andet værktøj til denne handling. Du har kun én mulighed, der er Hadoop's behandlingsramme MapReduce. MapReduce-rammen giver dig fuld kontrol over dataene, mens du overfører. Du kan manipulere kolonnerne og placere dem direkte på et af de to målplaceringer.

Bemærk:

  • Vi er nødt til at downloade og placere MySQL-stikket i Hadoop-klassestien for at hente tabeller fra MySQL-tabellen. For at gøre dette skal du downloade stikket com.mysql.jdbc_5.1.5.jar og holde det under biblioteket Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Downloads / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / del / hadoop / mapreduce / lib /
  • Sæt også alle Hbase-krukker under Hadoop-klassestien for at gøre dit MR-program adgang til Hbase. For at gøre dette skal du udføre følgende kommando :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / del / hadoop / mapreduce / lib /

De softwareversioner, som jeg har brugt til udførelsen af ​​denne opgave, er:

  • Hadooop-2.3.0
  • HBase 0,98,9-Hadoop2
  • Formørkelsesmåne

For at undgå programmet i ethvert kompatibilitetsproblem foreskriver jeg mine læsere at køre kommandoen med lignende miljø.

Brugerdefineret DBInputWritable:

pakke com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implementes Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) throw IOException {} public void readFields (ResultSet rs) kaster SQLException // Resultset objekt repræsenterer de data, der returneres fra en SQL-sætning {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) throw IOException { } offentlig ugyldig skrivning (PreparedStatement ps) kaster SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

Brugerdefineret DBOutput Skrivelig:

pakke com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implementes Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = navngiv dette.id = id dette.dept = dept} offentlige ugyldige readFields (DataInput in) kaster IOException {} offentlige ugyldige readFields (ResultSet rs) kaster SQLException {} offentlig ugyldig skrivning (DataOutput ud) kaster IOException {} offentlig tom skriv (PreparedStatement ps) kaster SQLException {ps.setString (1, navn) ps.setInt (2, id) ps.setString (3, afd.)}}

Indgangstabel:

Opret database edureka
Opret tabel emp (empid int ikke null, navn varchar (30), dept varchar (20), primær nøgle (empid))
indsæt i emp-værdier (1, 'abhay', 'udvikling'), (2, 'brundesh', 'test')
vælg * fra emp

Tilfælde 1: Overførsel fra MySQL til HDFS

pakke com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) throw Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // driverklasse' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // brugernavn' root ') // adgangskode Jobjob = nyt job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.class) FileOput. ny sti (args [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input tabel navn null, null, ny streng [] {'empid', 'name', 'dept'} / / tabelkolonner) Sti p = ny sti (args [0]) FileSystem fs = FileSystem.get (ny URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Dette stykke kode lader os forberede eller konfigurere inputformatet til at få adgang til vores kilde SQL DB.Parameteren inkluderer driverklassen, URL'en har adressen på SQL-databasen, dens brugernavn og adgangskoden.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver klasse 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // brugernavn 'root') //adgangskode

Dette stykke kode lader os videregive detaljerne i tabellerne i databasen og sætte det i jobobjektet. Parametrene inkluderer naturligvis jobforekomsten, den brugerdefinerede skrivbare klasse, der skal implementere DBWritable-grænseflade, kildetabellens navn, betingelse, hvis noget andet er nul, eventuelle sorteringsparametre, der er andet, henholdsvis listen over tabelkolonner.

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input tabelnavn null, null, ny streng [] {'empid', 'name', 'dept'} // tabelkolonner)

Kortlægger

pakke com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable public class Map udvider Mapper {.
beskyttet ugyldigt kort (LongWritable-nøgle, DBInputWritable-værdi, Context ctx) {prøv {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (ny tekst (navn + '' + id + '' + dept), id)
} fangst (IOException e) {e.printStackTrace ()} fangst (InterruptedException e) {e.printStackTrace ()}}}

Reducer: Identitetsreducer brugt

Kommando til at køre:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Output: MySQL-tabel overført til HDFS

hadoop dfs -ls / dbtohdfs / *

Tilfælde 2: Overførsel fra et bord i MySQL til et andet i MySQL

oprettelse af output-tabel i MySQL

Opret tabel medarbejder1 (navn varchar (20), id int, dept varchar (20))

pakke com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat importerer org.apache.hadoop.mapreduce.lib.db.DBOutputFormat importerer org.apache.hadoop.io.Text importerer org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable offentlig klasse Mainonetable_to_other_table {public static void main (String [] args) throw Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // brugernavn' root ') // adgangskode Jobjob = nyt job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input tabel navn null, null, ny String [] {'empid ',' name ',' dept '} // tabelkolonner) DBOutputFormat.setOutput (job,' medarbejder1 ', // output-tabelnavn nyt String [] {' name ',' id ',' dept '} // table kolonner) System.exit (job.waitForCompletion (true)? 0: 1)}}

Dette stykke kode giver os mulighed for at konfigurere navnet på outputtabellen i SQL DB. Parametrene er henholdsvis jobforekomst, outputtabellnavn og outputkolonnenavne.

DBOutputFormat.setOutput (job, 'medarbejder1', // navn på outputtabel nyt String [] {'name', 'id', 'dept'} // tabelkolonner)

Mapper: Samme som sag 1

Reducer:

pakke com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable offentlig klasse Reducere udvider Reducer {beskyttet ugyldig reduktion (Tekstnøgle, Iterable værdier, Context ctx) {int sum = 0 Strenglinje [] = key.toString (). Split ('') prøv {ctx.write (ny DBOutputWritable (linje [0] .toString (), Integer.parseInt (linje [1] .toString ()), linje [2] .toString ()), NullWritable.get ())} fangst (IOException e) {e.printStackTrace ()} fangst (InterruptedException e) {e.printStackTrace ()}}}

Kommando til at køre:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Output: Overførte data fra EMP-tabel i MySQL til en anden tabelmedarbejder1 i MySQL

Tilfælde 3: Overførsel fra tabel i MySQL til NoSQL (Hbase) tabel

Oprettelse af Hbase-tabel for at modtage output fra SQL-tabellen:

Opret 'medarbejder', 'officiel_info'

Førerklasse:

pakke Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) throw Exception {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // brugernavn 'root') // adgangskode Jobjob = nyt job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('medarbejder', Reducer.klasse, job) job.setInputFormatCatformatClass. klasse) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input tabelnavn null, null, ny streng [] {'empid', 'name', 'dept'} // tabelkolonner) System.exit (job.waitForCompletion (sand)? 0: 1)}}

Dette stykke kode lader dig konfigurere outputnøgleklassen, som i tilfælde af hbase er ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Her sender vi navnet på hbase-tabellen og reduceringsanordningen til at handle på bordet.

TableMapReduceUtil.initTableReducerJob ('medarbejder', Reducer.klasse, job)

Mapper:

pakke Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Kort udvider Mapper {privat IntWritable en = ny IntWritable (1) beskyttet ugyldigt kort (LongWritable id, DBInputWritable værdi, kontekst kontekst) {prøv {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + ' '+ dept))} fangst (IOException e) {e.printStackTrace ()} fangst (InterruptedException e) {e.printStackTrace ()}}}

I dette stykke kode tager vi værdier fra getters i klassen DBinputwritable og sender dem derefter ind
ImmutableBytesSkrivelig, så de når reduktionsenheden i bytewriatble-form, som Hbase forstår.

String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + '' + dept ))

Reducer:

pakke Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes importerer org.apache.hadoop.io.Text public class Reduce udvider TableReducer {public void reduce (ImmutableBytesWritable key, Iterable values, Context context) kaster IOException, InterruptedException {String [] årsag = null // Loop værdier for (Tekst val: værdier) {årsag = val.toString (). split ('')} // Put til HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info') ), Bytes.toBytes ('navn'), Bytes.toBytes (årsag [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (årsag [1 ])) context.write (nøgle, sæt)}}

Dette stykke kode lader os bestemme den nøjagtige række og den kolonne, hvor vi vil gemme værdier fra reduceringsenheden. Her gemmer vi hver empid i separat række, da vi lavede empid som radenøgle, hvilket ville være unikt. I hver række gemmer vi de officielle oplysninger om medarbejderne under kolonnefamilien 'official_info' under henholdsvis kolonnerne 'navn' og 'afdeling'.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (årsag [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (årsag [1])) context.write (key, put)

Overførte data i Hbase:

scan medarbejder

Som vi ser, var vi i stand til at fuldføre opgaven med at migrere vores forretningsdata fra en relationel SQL DB til en NoSQL DB med succes.

I den næste blog lærer vi, hvordan man skriver og udfører koder til andre input- og outputformater.

Bliv ved med at sende dine kommentarer, spørgsmål eller feedback. Jeg vil meget gerne høre fra dig.

Har du et spørgsmål til os? Nævn det i kommentarfeltet, og vi vender tilbage til dig.

Relaterede indlæg: