
spark (4) spark-shell: reading and writing HDFS, Redis, and HBase


 

(1) Common questions beginners have about Spark http://aperise.iteye.com/blog/2302481
(2) Setting up a Spark development environment http://aperise.iteye.com/blog/2302535
(3) Installing a Spark Standalone cluster http://aperise.iteye.com/blog/2305905
(4) spark-shell: reading and writing HDFS, Redis, and HBase http://aperise.iteye.com/blog/2324253

spark-shell: reading and writing HDFS, HBase, and Redis

1. Entering the spark-shell environment

Spark runs in standalone mode, with HA (High Availability) provided through ZooKeeper; the Spark masters run on hadoop31 and hadoop33. When starting spark-shell, each worker running the job is given 4 GB of executor memory:
cd /home/hadoop/spark-1.6.0-bin-hadoop2.6/
bin/spark-shell --master spark://hadoop31:7077,hadoop33:7077 --executor-memory 4G

 

2. Reading and writing HDFS with spark-shell

    2.1 Reading files from HDFS

    spark-shell reads the directory /data/2*/2016*/* on the Hadoop HA cluster; the path uses glob (wildcard) matching.

 

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
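To verify that the glob pattern actually matches files, a quick sanity check is to run an action on the RDD; a minimal sketch using the same example path:

// count the matched lines and peek at a few records
val logs = sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
println(logs.count())
logs.take(5).foreach(println)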

   

 

    2.2 Writing RDD results to HDFS

    spark-shell saves data to HDFS. With the approach below, multiple result files appear under /hdfsfile, one per partition, named part-00001.snappy, part-00002.snappy, and so on.

 

 

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
  .map { line =>
    val fields = line.split(",")
    (fields(0), fields(1), fields(2), fields(3))
  }
  .saveAsTextFile("hdfs://hadoop-ha-cluster/hdfsfile")
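Each part file corresponds to one partition of the RDD; listing the output directory with the standard HDFS CLI shows them:

hdfs dfs -ls hdfs://hadoop-ha-cluster/hdfsfile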

     

    2.3 Writing RDD results to a single file on HDFS

    Sometimes you want to gather the results into a single file. Add a repartition(1) before saving, and only one result file, part-00000.snappy, is produced under /hdfsfile.

 

 

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
  .map { line =>
    val fields = line.split(",")
    (fields(0), fields(1), fields(2), fields(3))
  }
  .repartition(1)
  .saveAsTextFile("hdfs://hadoop-ha-cluster/hdfsfile")
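A note on this choice: repartition(1) performs a full shuffle. If the goal is only to merge the output into one file, coalesce(1) avoids the shuffle, at the cost of running the whole pipeline in a single task:

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
  .map { line =>
    val fields = line.split(",")
    (fields(0), fields(1), fields(2), fields(3))
  }
  .coalesce(1)  // merge to one partition without a shuffle
  .saveAsTextFile("hdfs://hadoop-ha-cluster/hdfsfile")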

 

 

3. Reading and writing HBase with spark-shell

    3.1 Loading the HBase jars in Spark

    Edit the Spark configuration file spark-env.sh so that Spark knows where to load the HBase-related jars, adding the following:

# load HBase-related jars
export SPARK_CLASSPATH=/home/hadoop/hbase-1.2.1/lib/*

# load Jedis-related jars
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/jedis-2.9.0.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/commons-pool2-2.4.2.jar
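Note that Spark 1.6 logs a deprecation warning for SPARK_CLASSPATH; an equivalent option is to pass the jars on the spark-shell command line (same jar paths as above, comma-separated; the HBase jars can be appended to the list the same way):

bin/spark-shell --master spark://hadoop31:7077,hadoop33:7077 --executor-memory 4G \
  --jars /home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/jedis-2.9.0.jar,/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/commons-pool2-2.4.2.jar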

 

    3.2 Creating a table in HBase

    First, create a table in HBase:

cd /home/hadoop/hbase-1.2.1/bin
./hbase shell
# drop the table if it already exists
disable 'hbase_test'
drop 'hbase_test'
# one column family, 7-day TTL, snappy compression, pre-split into 5 regions
create 'hbase_test', {NAME => 'columnfamily', TTL => '604800', COMPRESSION => 'SNAPPY'}, SPLITS => ['2','4','6','8']
quit

 

    3.3 Writing Spark-processed results into HBase from spark-shell

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
  .map { line =>
    val fields = line.split(",")
    // key -> (field 2, field 3), both summed by reduceByKey below
    (fields(0), (fields(2).toLong, fields(3).toLong))
  }
  .reduceByKey((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))
  .foreachPartition { iterator =>
    // one HBase connection per partition
    val tmpConf = HBaseConfiguration.create()
    tmpConf.set("hbase.zookeeper.quorum", "hadoop31,hadoop32,hadoop33,hadoop34,hadoop35")
    tmpConf.set("hbase.zookeeper.property.clientPort", "2181")
    val table = new HTable(tmpConf, "hbase_test")
    table.setWriteBufferSize(5 * 1024 * 1024)
    val putList = new java.util.ArrayList[Put]
    iterator.foreach { tuple =>
      // left-pad the key to 25 characters and reverse it, so rowkeys
      // spread evenly across the pre-split regions
      val fixedNumber = ("0" * (25 - tuple._1.length) + tuple._1).reverse
      val p = new Put(fixedNumber.getBytes())
      p.add("columnfamily".getBytes, "column1".getBytes, tuple._2._1.toString.getBytes)
      p.add("columnfamily".getBytes, "column2".getBytes, tuple._2._2.toString.getBytes)
      putList.add(p)
      // flush in batches of 1000 puts
      if (putList.size() == 1000) {
        table.put(putList)
        putList.clear()
      }
    }
    // flush the remaining puts and close the connection
    table.put(putList)
    table.close()
  }

     The first trick above is foreachPartition: with it, only one HBase connection needs to be established per partition, instead of creating connections over and over inside the partition, and because the connection is built inside the closure there are no serialization problems to worry about.

    The second trick is batching the HBase writes, committing 1,000 puts at a time, which improves write throughput.
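After the job finishes, a quick spot check from the HBase shell confirms the rows landed (LIMIT keeps the scan small):

./hbase shell
scan 'hbase_test', {LIMIT => 5}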

 

    3.4 Reading data from HBase with spark-shell and writing it out as HDFS files

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.HConstants

val tmpConf = HBaseConfiguration.create()
tmpConf.set("hbase.zookeeper.quorum", "hadoop31,hadoop32,hadoop33,hadoop34,hadoop35")
tmpConf.set("hbase.zookeeper.property.clientPort", "2181")
tmpConf.set(TableInputFormat.INPUT_TABLE, "hbase_test")
tmpConf.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, "120000")
val hBaseRDD = sc.newAPIHadoopRDD(tmpConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
val lineRdd = hBaseRDD.map { case (_, result) =>
  val cf = "columnfamily".getBytes
  // fall back to "0" when a column is missing from the row
  val col1 = if (result.containsColumn(cf, "column1".getBytes)) new String(result.getValue(cf, "column1".getBytes)) else "0"
  val col2 = if (result.containsColumn(cf, "column2".getBytes)) new String(result.getValue(cf, "column2".getBytes)) else "0"
  col1 + "," + col2
}
lineRdd.repartition(1).saveAsTextFile("hdfs://hadoop-ha-cluster/hbase2hdfs")

 

4. Reading and writing Redis with spark-shell

    4.1 Loading the Jedis jars in Spark

    Configure Spark so it knows where to load the Jedis jars: edit spark-env.sh and add the following (the same entries as in section 3.1):

# load HBase-related jars
export SPARK_CLASSPATH=/home/hadoop/hbase-1.2.1/lib/*

# load Jedis-related jars
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/jedis-2.9.0.jar
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/jedis/commons-pool2-2.4.2.jar

 

    4.2 Writing data to a Redis cluster from spark-shell via the Jedis API

import redis.clients.jedis.HostAndPort
import redis.clients.jedis.JedisCluster

sc.textFile("hdfs://hadoop-ha-cluster/auto_data.txt")
  .repartition(10)
  .map { line =>
    val fields = line.split(",")
    (fields(0), fields(1), fields(2))
  }
  .foreachPartition { iterator =>
    // one JedisCluster connection per partition
    val jedisClusterNodes = new java.util.HashSet[HostAndPort]
    jedisClusterNodes.add(new HostAndPort("192.168.173.21", 6379))
    jedisClusterNodes.add(new HostAndPort("192.168.173.22", 6380))
    jedisClusterNodes.add(new HostAndPort("192.168.173.23", 6381))
    jedisClusterNodes.add(new HostAndPort("192.168.173.24", 6379))
    jedisClusterNodes.add(new HostAndPort("192.168.173.25", 6380))
    jedisClusterNodes.add(new HostAndPort("192.168.173.26", 6381))
    val jc = new JedisCluster(jedisClusterNodes)
    iterator.foreach { t =>
      val key = "auto," + t._1 + "," + t._2
      val value = t._3
      jc.set(key, value)
    }
    jc.close()
  }

     In this approach, JedisCluster does not need to be serializable: it is constructed inside the foreachPartition closure, so each executor builds its own instance locally and no JedisCluster object is ever shipped over the network.

 

    4.3 Using data from the Redis cluster via the Jedis API inside RDD operations

import redis.clients.jedis.HostAndPort
import redis.clients.jedis.JedisCluster

object JedisClusterObject extends Serializable {
    val jedisClusterNodes = new java.util.HashSet[HostAndPort]
    jedisClusterNodes.add(new HostAndPort("192.168.173.21", 6379))
    jedisClusterNodes.add(new HostAndPort("192.168.173.22", 6380))
    jedisClusterNodes.add(new HostAndPort("192.168.173.23", 6381))
    jedisClusterNodes.add(new HostAndPort("192.168.173.24", 6379))
    jedisClusterNodes.add(new HostAndPort("192.168.173.25", 6380))
    jedisClusterNodes.add(new HostAndPort("192.168.173.26", 6381))
    val jc: JedisCluster = new JedisCluster(jedisClusterNodes)
}

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
  .map(line => line.split(",")(0))
  .map { id =>
    // look the key up in Redis; fall back to the raw value when absent
    val value = if (JedisClusterObject.jc.exists("auto," + id)) JedisClusterObject.jc.get("auto," + id) else id
    (id, value, value)
  }
  .saveAsTextFile("hdfs://hadoop-ha-cluster/jedistest")

     In this approach, JedisCluster must be held in a serializable singleton, because it is referenced inside an RDD transformation whose closure is serialized and shipped to every worker node; otherwise Spark fails with "Task not serializable: JedisCluster".
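An alternative that sidesteps the serialization question entirely is to build the connection inside mapPartitions, mirroring the foreachPartition pattern used for writes in 4.2; a minimal sketch reusing the same cluster nodes (listing two of them is enough for cluster discovery):

import redis.clients.jedis.HostAndPort
import redis.clients.jedis.JedisCluster

sc.textFile("hdfs://hadoop-ha-cluster/data/2*/2016*/*")
  .map(line => line.split(",")(0))
  .mapPartitions { iter =>
    // created on the executor, so nothing needs to be serialized
    val nodes = new java.util.HashSet[HostAndPort]
    nodes.add(new HostAndPort("192.168.173.21", 6379))
    nodes.add(new HostAndPort("192.168.173.22", 6380))
    val jc = new JedisCluster(nodes)
    iter.map { id =>
      val value = if (jc.exists("auto," + id)) jc.get("auto," + id) else id
      (id, value, value)
    }
  }
  .saveAsTextFile("hdfs://hadoop-ha-cluster/jedistest")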

 

5. Connecting to Spark from local development tools

    5.1 Connecting to a Hadoop HA environment from local IDEA

      Hadoop is installed in HA mode, and a Spark program developed locally cannot resolve the HDFS HA cluster nameservice, because the local program does not know which namenode hostnames and IPs stand behind it. The fix is to add the HA parameters to the Hadoop configuration through the SparkContext, so that Spark running in local mode knows the HA layout, as follows:

  val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName("HtSecApp UserEvent Processor")
    .getOrCreate()

  val sc = spark.sparkContext
  val hadoopConf = sc.hadoopConfiguration

  hadoopConf.set("dfs.nameservices", "mycluster")
  hadoopConf.set("dfs.client.failover.proxy.provider.mycluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
  hadoopConf.set("dfs.ha.namenodes.mycluster", "nn1,nn2")
  hadoopConf.set("dfs.namenode.rpc-address.mycluster.nn1", "192.168.77.38:9000")
  hadoopConf.set("dfs.namenode.rpc-address.mycluster.nn2", "192.168.77.39:9000")

    This avoids the error where the local client fails to resolve the nameservice name mycluster.

 

    5.2 Setting up a local Spark development environment

    See the earlier post in this series, spark (2) Setting up a Spark development environment (http://aperise.iteye.com/blog/2302535).
