HiveServer2 (HS2) is a server interface that enables remote clients to execute queries against Hive and retrieve the results. The current implementation, based on Thrift RPC, is an improved version of HiveServer that supports multi-client concurrency and authentication. It is designed to provide better support for open API clients like JDBC and ODBC.
In this blog we will learn how to use HiveServer2 with Java. Don't worry, there is no need to have Hive installed on your machine: we will create a standalone HiveServer2 that runs in embedded mode. So let's get started.
For this purpose I am using sbt. First, create an sbt project with IntelliJ, then add the following dependencies to build.sbt:
```scala
libraryDependencies += "org.apache.hive" % "hive-exec" % "1.2.1" excludeAll ExclusionRule(organization = "org.pentaho")
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.3"
libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.3.4"
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0"
libraryDependencies += "org.apache.hive" % "hive-service" % "1.2.1"
libraryDependencies += "org.apache.hive" % "hive-jdbc" % "1.2.1"
```
Now let the dependencies resolve. Add a package named hive in the java module, and inside it a sub-package server. Inside the server package, add a Java class named HiveLocalEmbeddedServer2. This class has two public methods, start and stop, which are used to start and stop the server:
```java
package hive.server;

import java.io.File;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.service.Service;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.server.HiveServer2;

/**
 * Utility starting a local/embedded HiveServer2 for testing purposes.
 * Uses sensible defaults to properly clean up between reruns.
 * Additionally it wrangles the Hive internals so that jobs execute
 * locally, rather than within a child JVM (which Hive calls "local") or externally.
 */
public class HiveLocalEmbeddedServer2 {

    private static final String SCRATCH_DIR = "/tmp/hive";
    private static final Log log = LogFactory.getLog(Hive.class);

    private HiveServer2 hiveServer;
    private HiveConf config;
    private int port;

    public void start() throws Exception {
        log.info("Starting Hive Local/Embedded Server...");
        if (hiveServer == null) {
            config = configure();
            hiveServer = new HiveServer2();
            port = MetaStoreUtils.findFreePort();
            config.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, port);
            hiveServer.init(config);
            hiveServer.start();
            waitForStartup();
        }
    }

    public int getFreePort() {
        log.info("Free port available is " + port);
        return port;
    }

    private void waitForStartup() throws Exception {
        long timeout = TimeUnit.MINUTES.toMillis(1);
        long unitOfWait = TimeUnit.SECONDS.toMillis(1);
        CLIService hs2Client = getServiceClientInternal();
        SessionHandle sessionHandle = null;
        for (int interval = 0; interval < timeout / unitOfWait; interval++) {
            Thread.sleep(unitOfWait);
            try {
                Map<String, String> sessionConf = new HashMap<String, String>();
                sessionHandle = hs2Client.openSession("foo", "bar", sessionConf);
                return;
            } catch (Exception e) {
                // service not started yet, keep polling
                continue;
            } finally {
                // only close the session if it was actually opened
                if (sessionHandle != null) {
                    hs2Client.closeSession(sessionHandle);
                }
            }
        }
        throw new TimeoutException("Couldn't get a hold of HiveServer2...");
    }

    private CLIService getServiceClientInternal() {
        for (Service service : hiveServer.getServices()) {
            if (service instanceof CLIService) {
                return (CLIService) service;
            }
        }
        throw new IllegalStateException("Cannot find CLIService");
    }

    private HiveConf configure() throws Exception {
        log.info("Setting the Hive conf variables");
        String scratchDir = SCRATCH_DIR;
        File scratchDirFile = new File(scratchDir);

        Configuration cfg = new Configuration();
        HiveConf conf = new HiveConf(cfg, HiveConf.class);
        conf.addToRestrictList("columns.comments");
        conf.set("hive.scratch.dir.permission", "777");
        conf.setVar(ConfVars.SCRATCHDIRPERMISSION, "777");

        scratchDirFile.mkdirs();
        // also set the permissions manually since Hive doesn't do it...
        scratchDirFile.setWritable(true, false);

        int random = new Random().nextInt();
        conf.set("hive.metastore.warehouse.dir", scratchDir + "/warehouse" + random);
        conf.set("hive.metastore.metadb.dir", scratchDir + "/metastore_db" + random);
        conf.set("hive.exec.scratchdir", scratchDir);
        conf.set("fs.permissions.umask-mode", "022");
        conf.set("javax.jdo.option.ConnectionURL",
                "jdbc:derby:;databaseName=" + scratchDir + "/metastore_db" + random + ";create=true");
        conf.set("hive.metastore.local", "true");
        conf.set("hive.aux.jars.path", "");
        conf.set("hive.added.jars.path", "");
        conf.set("hive.added.files.path", "");
        conf.set("hive.added.archives.path", "");
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");

        // Hadoop defaults mapred.job.tracker to 'local' if it is not defined; Hive however
        // expects it to be explicitly 'local' - otherwise it does a remote execution
        Field field = Configuration.class.getDeclaredField("properties");
        field.setAccessible(true);
        Properties props = (Properties) field.get(conf);
        props.remove("mapreduce.framework.name");
        props.setProperty("fs.default.name", "file:///");

        // intercept SessionState to clean the thread-local
        Field tss = SessionState.class.getDeclaredField("tss");
        tss.setAccessible(true);

        return new HiveConf(conf);
    }

    public void stop() {
        if (hiveServer != null) {
            log.info("Stopping Hive Local/Embedded Server...");
            hiveServer.stop();
            hiveServer = null;
            config = null;
            log.info("Hive Local/Embedded Server stopped successfully...");
        }
    }
}
```
The start method calls the private method configure to set all the properties HiveServer2 needs to run, such as the scratch dir, the Hive metastore settings, and so on. These are the same properties we would normally set in hive-site.xml. With these config variables we initialize HiveServer2 by calling its init method.
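For comparison, a couple of those properties expressed the usual way in hive-site.xml would look roughly like this (a sketch only; the paths shown are illustrative, since the embedded server sets them programmatically with a random suffix):

```xml
<configuration>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/tmp/hive/warehouse</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:derby:;databaseName=/tmp/hive/metastore_db;create=true</value>
  </property>
  <property>
    <name>hive.exec.scratchdir</name>
    <value>/tmp/hive</value>
  </property>
</configuration>
```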
One more thing: if you want to see the HiveServer2 logs, create a log4j.properties file in the resources folder and paste in the code below.
```properties
# Root logger option
log4j.rootLogger=INFO, stdout

# Redirect log messages to the console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
```
Now we will query this HiveServer2 instance using JDBC. Let's create a JDBC client class inside the scala folder:
```scala
package hive.client

import java.sql.{DriverManager, SQLException, Statement}

import scala.util.Try

import hive.server.HiveLocalEmbeddedServer2
import org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException

object JdbcClient {

  private val driverName: String = "org.apache.hive.jdbc.HiveDriver"

  /**
   * @param args
   * @throws SQLException
   */
  @throws[SQLException]
  def main(args: Array[String]) {
    if (Try(Class.forName(driverName)).isFailure) {
      throw new DatastoreDriverNotFoundException("driver not found")
    }

    val hiveEmbeddedServer2 = new HiveLocalEmbeddedServer2()
    hiveEmbeddedServer2.start()
    val port = hiveEmbeddedServer2.getFreePort

    val con = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "")
    val stmt: Statement = con.createStatement

    stmt.execute("drop table if exists emp")
    stmt.execute("create table emp(id int)")
    stmt.execute("insert into emp values(1)")
    stmt.execute("insert into emp values(2)")
    stmt.execute("insert into emp values(3)")

    val resultSet = stmt.executeQuery("select * from emp")

    import utils.Implicits._
    resultSet.toStream
      .foreach(result => println(s"result after fetching ids ${result.getInt("id")}"))

    hiveEmbeddedServer2.stop()
    System.exit(0)
  }
}
```
By the way, here is my Implicits object in the utils package, which I used to convert the result set into a stream:
```scala
package utils

import java.sql.ResultSet

object Implicits {

  implicit class ResultSetStream(resultSet: ResultSet) {

    // Wraps the ResultSet cursor in an Iterator and memoizes it as a Stream.
    // Note: hasNext advances the cursor, so each row must be consumed exactly
    // once, which the single lazy traversal via toStream guarantees here.
    def toStream: Stream[ResultSet] = {
      new Iterator[ResultSet] {
        def hasNext = resultSet.next()
        def next() = resultSet
      }.toStream
    }
  }
}
```
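As an aside, the same iterator-to-stream trick can be sketched in plain Java with java.util.stream. This helper is hypothetical (not part of the blog's code) and works for any Iterator, not just ResultSet rows:

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorToStream {

    // Wraps any Iterator in a lazy, ordered, sequential Stream -
    // the Java analogue of the Scala Iterator-to-Stream conversion above.
    static <T> Stream<T> toStream(Iterator<T> iterator) {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
                false);
    }

    public static void main(String[] args) {
        Iterator<Integer> ids = List.of(1, 2, 3).iterator();
        toStream(ids).forEach(id -> System.out.println("id " + id));
    }
}
```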
I hope this blog helps.