Starting HiveServer2 Programmatically


HiveServer2 (HS2) is a server interface that enables remote clients to execute queries against Hive and retrieve the results (a more detailed intro here). The current implementation, based on Thrift RPC, is an improved version of HiveServer and supports multi-client concurrency and authentication. It is designed to provide better support for open API clients like JDBC and ODBC.

In this blog we will learn how to use HiveServer2 from Java. Don't be confused: Hive does not need to be installed on your machine. We are creating a standalone HiveServer2 that runs in embedded mode. So let's get started.

For this purpose I am using sbt. First create an sbt project with IntelliJ, then add the following dependencies to build.sbt:

// All dependencies needed to run an embedded HiveServer2 and talk to it over JDBC.
// NOTE(review): hadoop-common and hadoop-client are pinned to different Hadoop
// versions (2.7.3 vs 2.6.0) - confirm this is intentional.
libraryDependencies ++= Seq(
  // hive-exec is pulled in without any org.pentaho artifacts.
  ("org.apache.hive" % "hive-exec" % "1.2.1")
    .excludeAll(ExclusionRule(organization = "org.pentaho")),
  "org.apache.hadoop" % "hadoop-common" % "2.7.3",
  "org.apache.httpcomponents" % "httpclient" % "4.3.4",
  "org.apache.hadoop" % "hadoop-client" % "2.6.0",
  "org.apache.hive" % "hive-service" % "1.2.1",
  "org.apache.hive" % "hive-jdbc" % "1.2.1"
)

Now let the dependencies resolve. Then add a package named hive in the Java module and, inside it, a sub-package named server. Inside the server package add a Java class named HiveLocalEmbeddedServer2. Its public methods start and stop are used to start and stop the server, and getFreePort returns the port the server is listening on:

package hive.server;

import java.io.File;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.service.Service;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.server.HiveServer2;

/**
 * Utility that starts a local/embedded HiveServer2 for testing purposes.
 *
 * <p>All Hive state (scratch directory, warehouse, Derby metastore) is kept under
 * {@code /tmp/hive}; the warehouse and metastore paths get a random suffix so that
 * reruns do not collide with each other. The configuration is additionally adjusted
 * so that jobs are executed locally rather than in a child JVM or on an external
 * cluster.
 *
 * <p>Typical usage: call {@link #start()}, connect to {@code localhost} on the port
 * returned by {@link #getFreePort()}, and finally call {@link #stop()}.
 * Instances are not documented as thread-safe; use one instance from a single thread.
 */
public class HiveLocalEmbeddedServer2 {
  /** Root directory for the scratch dir, the warehouse and the embedded metastore. */
  private static final String SCRATCH_DIR = "/tmp/hive";
  private static final Log log = LogFactory.getLog(HiveLocalEmbeddedServer2.class);

  private HiveServer2 hiveServer;
  private HiveConf config;
  private int port;

  /**
   * Configures and starts the embedded server on a free port, then blocks until a
   * session can be opened against it. Does nothing if the server is already running.
   *
   * @throws Exception if configuration or startup fails, or if the server does not
   *     accept a session within the startup timeout
   */
  public void start() throws Exception {
    log.info("Starting Hive Local/Embedded Server...");
    if (hiveServer == null) {
      config = configure();
      hiveServer = new HiveServer2();
      port = MetaStoreUtils.findFreePort();
      config.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, port);
      hiveServer.init(config);
      hiveServer.start();
      waitForStartup();
    }
  }

  /**
   * Returns the Thrift port selected by the last call to {@link #start()}.
   *
   * @return the port the server was configured with, or {@code 0} if it was never started
   */
  public int getFreePort() {
    log.info("Free Port Available is " + port);
    return port;
  }

  /**
   * Polls the server once per second, for up to one minute, until a session can be
   * opened.
   *
   * @throws TimeoutException if no session could be opened within the timeout
   * @throws Exception if the thread is interrupted while waiting or the probe session
   *     cannot be closed
   */
  private void waitForStartup() throws Exception {
    long timeout = TimeUnit.MINUTES.toMillis(1);
    long unitOfWait = TimeUnit.SECONDS.toMillis(1);

    CLIService hs2Client = getServiceClientInternal();
    for (long waited = 0; waited < timeout; waited += unitOfWait) {
      Thread.sleep(unitOfWait);

      SessionHandle sessionHandle;
      try {
        Map<String, String> sessionConf = new HashMap<String, String>();
        sessionHandle = hs2Client.openSession("foo", "bar", sessionConf);
      } catch (Exception e) {
        // Service not started yet - retry on the next tick.
        log.debug("HiveServer2 is not accepting sessions yet", e);
        continue;
      }

      // Only close a session that was actually opened. Closing from a finally block
      // would also run after a failed attempt (with a null handle), and anything
      // thrown there would escape the catch above and abort the retry loop.
      hs2Client.closeSession(sessionHandle);
      return;
    }
    throw new TimeoutException("Couldn't get a hold of HiveServer2...");
  }

  /**
   * Looks up the {@link CLIService} among the services registered on the server.
   *
   * @return the server's CLI service
   * @throws IllegalStateException if the server has no CLI service
   */
  private CLIService getServiceClientInternal() {
    for (Service service : hiveServer.getServices()) {
      if (service instanceof CLIService) {
        return (CLIService) service;
      }
    }
    throw new IllegalStateException("Cannot find CLIService");
  }

  /**
   * Builds the Hive configuration for a purely local, file-system based setup: local
   * file system, local job execution and an embedded Derby metastore under
   * {@link #SCRATCH_DIR}. These are the settings one would otherwise put in
   * {@code hive-site.xml}.
   *
   * @return a fresh {@link HiveConf} carrying the local settings
   * @throws Exception if the internal properties of the Hadoop configuration cannot
   *     be accessed reflectively
   */
  private HiveConf configure() throws Exception {
    log.info("Setting The Hive Conf Variables");
    String scratchDir = SCRATCH_DIR;

    File scratchDirFile = new File(scratchDir);

    Configuration cfg = new Configuration();
    HiveConf conf = new HiveConf(cfg, HiveConf.class);
    conf.addToRestrictList("columns.comments");
    conf.set("hive.scratch.dir.permission", "777");
    conf.setVar(ConfVars.SCRATCHDIRPERMISSION, "777");
    scratchDirFile.mkdirs();
    // also set the permissions manually since Hive doesn't do it...
    scratchDirFile.setWritable(true, false);

    // Random suffix keeps the warehouse/metastore of this run separate from earlier runs.
    int random = new Random().nextInt();

    conf.set("hive.metastore.warehouse.dir", scratchDir + "/warehouse" + random);
    conf.set("hive.metastore.metadb.dir", scratchDir + "/metastore_db" + random);
    conf.set("hive.exec.scratchdir", scratchDir);
    conf.set("fs.permissions.umask-mode", "022");
    conf.set("javax.jdo.option.ConnectionURL",
        "jdbc:derby:;databaseName=" + scratchDir + "/metastore_db" + random + ";create=true");
    conf.set("hive.metastore.local", "true");
    conf.set("hive.aux.jars.path", "");
    conf.set("hive.added.jars.path", "");
    conf.set("hive.added.files.path", "");
    conf.set("hive.added.archives.path", "");
    conf.set("fs.default.name", "file:///");
    // Hive expects the job tracker to be explicitly set to 'local'; otherwise it
    // attempts a remote execution.
    conf.set("mapred.job.tracker", "local");

    // Edit the backing Properties directly so that "mapreduce.framework.name" is
    // removed outright rather than overridden.
    Field field = Configuration.class.getDeclaredField("properties");
    field.setAccessible(true);
    Properties props = (Properties) field.get(conf);
    props.remove("mapreduce.framework.name");
    props.setProperty("fs.default.name", "file:///");

    return new HiveConf(conf);
  }

  /**
   * Stops the embedded server and discards its configuration. Does nothing if the
   * server is not running, so it is safe to call more than once.
   */
  public void stop() {
    if (hiveServer != null) {
      log.info("Stopping Hive Local/Embedded Server...");
      hiveServer.stop();
      hiveServer = null;
      config = null;
      log.info("Hive Local/Embedded Server Stopped SucessFully...");
    }
  }
}

The start method calls the private configure method to set all the properties that HiveServer2 needs in order to run, such as the scratch directory, the Hive metastore type, etc. These are the same properties that we would otherwise set in hive-site.xml. With this configuration we initialize HiveServer2 by calling its init method.

One more thing: if you want to see the HiveServer2 logs, create a log4j.properties file in the resources folder and paste in the configuration below:

# Root logger: log everything at INFO and above to the "stdout" appender.
log4j.rootLogger=INFO,stdout


# Console appender: writes log messages to System.out.
# (A stray "debug" RollingFileAppender definition was removed here: it had no
# File configured and was not attached to any logger.)
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Pattern: timestamp, level, short logger name, line number, message.
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

Now we will query this HiveServer2 using JDBC.

So let's create a JDBC client class inside the scala folder:

package hive.client

import java.sql.{DriverManager, SQLException, Statement}

import scala.util.Try

import hive.server.HiveLocalEmbeddedServer2
import org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException

/**
 * Demo client: starts an embedded HiveServer2, creates and fills a small table
 * over JDBC, prints the rows it reads back and shuts the server down again.
 */
object JdbcClient {
  private val driverName: String = "org.apache.hive.jdbc.HiveDriver"

  /**
   * Runs the demo end to end.
   *
   * The JDBC statement and connection are closed and the embedded server is
   * stopped even when one of the queries fails.
   *
   * @param args command-line arguments (unused)
   * @throws SQLException if connecting to the server or running a statement fails
   */
  @throws[SQLException]
  def main(args: Array[String]): Unit = {

    if (Try(Class.forName(driverName)).isFailure) {
      throw new DatastoreDriverNotFoundException("driver not found")
    }

    val hiveEmbeddedServer2 = new HiveLocalEmbeddedServer2()
    try {
      hiveEmbeddedServer2.start()
      val port = hiveEmbeddedServer2.getFreePort
      val con = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "")
      try {
        val stmt: Statement = con.createStatement
        try {
          stmt.execute("drop table if exists emp")
          stmt.execute("create table emp(id int)")
          stmt.execute("insert into emp values(1)")
          stmt.execute("insert into emp values(2)")
          stmt.execute("insert into emp values(3)")

          val resultSet = stmt.executeQuery("select * from emp")

          import utils.Implicits._

          resultSet.toStream
            .foreach(result => println(s"result after fetching ids ${ result.getInt("id") }"))
        } finally {
          // Closing the statement also releases its current result set.
          stmt.close()
        }
      } finally {
        con.close()
      }
    } finally {
      // stop() is a no-op when the server never came up.
      hiveEmbeddedServer2.stop()
    }
    System.exit(0)
  }
}

By the way, here is the Implicits object from the utils package that I used to convert the result set into a stream:

package utils

import java.sql.ResultSet

/** Implicit enrichments for JDBC types. */
object Implicits {

  /**
   * Adds `toStream` to a [[java.sql.ResultSet]].
   *
   * @param resultSet the result set to traverse; its cursor is advanced as the
   *                  stream is consumed
   */
  implicit class ResultSetStream(resultSet: ResultSet) {

    /**
     * Exposes the rows of the result set as a lazy stream.
     *
     * Every element is the same underlying `ResultSet`, positioned on the row
     * that was current when the element was produced, so read the columns of
     * an element before forcing the next one.
     *
     * The cursor is advanced inside `takeWhile`, once per produced element,
     * so that `hasNext` itself stays free of side effects: an iterator whose
     * `hasNext` calls `ResultSet.next()` skips a row on every extra `hasNext`.
     *
     * @return a stream that ends when `ResultSet.next()` returns false
     */
    def toStream: Stream[ResultSet] =
      Iterator.continually(resultSet).takeWhile(_.next()).toStream
  }

}

Screenshot from 2017-04-30 21:19:56

I hope this blog helps.

KNOLDUS-advt-sticker

Advertisements
This entry was posted in Scala. Bookmark the permalink.

One Response to Starting HiveServer2 Programmatically

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s