本文共 6865 字,大约阅读时间需要 22 分钟。
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven POM for a Spark 2.1.1 / Scala 2.11 job that writes into Hive.
     Spark dependencies are compiled in (scope "provided" is commented out),
     so the shaded jar bundles Spark — intended for spark-submit with a
     matching cluster version; uncomment the provided scopes for cluster use. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>emg</groupId>
<artifactId>emg.spark</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<build>
<!-- pluginManagement pins plugin versions; the concrete bindings below
     inherit these versions. -->
<pluginManagement>
<plugins>
<!-- Plugin for compiling Scala sources -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- Plugin for compiling Java sources -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<!-- Compile Scala before Java so mixed Scala/Java code resolves -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Shade plugin: builds a fat (uber) jar at package time -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<!-- Strip signature files from bundled jars; stale signatures in a
     fat jar cause "Invalid signature file digest" at runtime. -->
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Main-Class entry for the manifest: set to your own entry point -->
<mainClass>emg.branchs.EmgFilterDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
经自己测试:hive的metastore启动不了、只启动hiveServer2时,下面这种方式(SparkSession直连metastore)会一直报错,提示找不到hive的元数据库。
/**
 * Loads parquet app-log data and appends it to an hourly partition of a
 * Hive table, staging the rows through a temporary table first.
 *
 * Connects straight to the Hive metastore (thrift, port 9083), so the
 * metastore service must be running.
 *
 * @param args three positional arguments: input parquet path, partition
 *             date `dt`, and partition `hour`
 */
def main(args: Array[String]): Unit = {
  val Array(inpath, dt, hour) = args
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    //.setMaster("local[*]")
    .setMaster("spark://192.168.40.52:7077")
  val session = SparkSession.builder()
    .config(conf)
    // Hive metastore endpoint; default port 9083, see hive-site.xml
    .config("hive.metastore.uris", "thrift://192.168.40.51:9083")
    // Hive warehouse directory on HDFS
    .config("spark.sql.warehouse.dir", "hdfs://192.168.40.51:9000/user/hive/warehouse")
    // Talk to Hive directly through the metastore
    .enableHiveSupport()
    .getOrCreate()
  // fix: wrap the job in try/finally so the SparkSession is always stopped;
  // the original leaked the session on any failure. Also dropped the unused
  // local `import session.implicits._`.
  try {
    val df1 = session.read.parquet(inpath)
    //df1.write.saveAsTable(s"tmp.tmp_app_log_1")
    df1.createOrReplaceTempView("tmp_app_log_test")
    // Real SQL omitted in the original post
    val sql1 =
      s"""
         |select *
         |from tmp_app_log_test
       """.stripMargin
    val hive_table = "dwb2.fact_mbk_offline_log_mbk_app_action_event_v2_i_h"
    // Make sure the target partition exists before overwriting it
    val sql2 = s"alter table $hive_table add if not exists partition ( dt='$dt',hour='$hour')"
    session.sql(sql2)
    val tmp_table = s"""tmp.app_log_${dt}_${hour}"""
    val sql3 = s"""drop table IF EXISTS $tmp_table"""
    session.sql(sql3)
    val df2 = session.sql(sql1)
    // Stage the result in a temporary table first
    df2.write.saveAsTable(tmp_table)
    // Then copy from the temporary table into the partitioned table
    val sql4 =
      s"""INSERT OVERWRITE TABLE $hive_table
         |PARTITION( dt='$dt',hour='$hour')
         | select * from $tmp_table """.stripMargin
    session.sql(sql4)
    // Clean up the staging table
    val sql5 = s"""drop table IF EXISTS $tmp_table"""
    session.sql(sql5)
  } finally {
    session.close()
  }
}
经自己测试:hive的metastore启动不了、只启动hiveServer2时,用JDBC连接方式可以正常使用。
/**
 * Loads an HDFS output directory into a Hive table over a HiveServer2 JDBC
 * connection. Used because the Hive metastore would not start, so
 * SparkSession's direct Hive support is unavailable and only HiveServer2
 * (port 10000) is reachable.
 *
 * Requires the project-local `CurrentTime` helper for the date-based path
 * segments.
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    //.setMaster("local[*]")
    .setMaster("spark://192.168.40.**:7077")
  val session = SparkSession.builder()
    .config(conf)
    .getOrCreate()
  // NOTE(review): the JDBC URL format reportedly differs in newer Hive
  // versions — confirm against the Hive version in use.
  val url = "jdbc:hive2://192.168.40.**:10000/emg"
  val username = "root"
  val password = "123456"
  val driverName = "org.apache.hive.jdbc.HiveDriver"
  try {
    Class.forName(driverName)
  } catch {
    case e: ClassNotFoundException =>
      // fix: the original passed a (String, Throwable) tuple to println,
      // printing "(Missing Class,...)" instead of a readable message
      println(s"Missing Class: $driverName")
      e.printStackTrace()
  }
  val con: Connection = DriverManager.getConnection(url, username, password)
  val state = con.createStatement()
  // Path segments come from the project-local CurrentTime helper
  val paths = "/user/emg/cxb_out/" + CurrentTime.getMonthDate() + "/" + CurrentTime.getYesterday() + "/" + CurrentTime.getHourDate() + "/"
  // The metastore is unreachable, so the result is LOADed into the Hive
  // table through JDBC instead of saveAsTable
  val sql2 = "load data inpath '" + paths + "' into table result01"
  try {
    // fix: the original called state.execute(sql2) twice, issuing the
    // LOAD DATA statement a second time after the files were already moved
    state.execute(sql2)
    // fix: typo "hvie" -> "hive" in the success message
    println("===============================存入hive成功==========================")
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // fix: close the Statement as well as the Connection
    if (null != state) {
      state.close()
    }
    if (null != con) {
      con.close()
    }
  }
  session.close()
}
转载地址:http://iexei.baihongyu.com/