Accessing Hive ORC Tables with PXF

1. PXF Configuration Directory

[mxadmin@sdw3 conf]$ pwd
/usr/local/pxf-matrixdb4/conf
[mxadmin@sdw3 conf]$ ls conf/
pxf-env.sh  pxf-log4j.properties  pxf-profiles.xml  pxf-site.xml

2. PXF Environment Variables

[mxadmin@sdw3 conf]$ cat pxf-env.sh
#!/bin/bash
# Path to JAVA
JAVA_HOME=/usr/local/jdk1805

3. Hadoop Configuration Files

cd /data/bigdata/hadoop/etc/hadoop

3.1 core-site.xml

When MatrixDB is started by YMatrix, configure the proxyuser as the matrixdb user in Hadoop's core-site.xml.
PXF requires a proxy user to access data from HDFS and Hive. This configuration is mandatory; otherwise, data cannot be read.

[mxadmin@sdw3 hadoop]$ cat core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://sdw3:8020</value>
</property>
<property>
  <name>hadoop.proxyuser.mxadmin.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.mxadmin.groups</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.mxadmin.users</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/data/bigdata/hadoop/tmp</value>
</property>
</configuration>

4. Hive Server Configuration

4.1 pxf-site.xml

mkdir /usr/local/pxf-matrixdb4/conf/servers/hive

[mxadmin@sdw3 conf]$ pwd
/usr/local/pxf-matrixdb4/conf
[mxadmin@sdw3 conf]$ ls servers/hive/hive-site.xml   -- Obtain from Hive conf directory
servers/hive/hive-site.xml

[mxadmin@sdw3 conf]$ pwd
/usr/local/pxf-matrixdb4/conf
[mxadmin@sdw3 conf]$ cat servers/hive/pxf-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>pxf.service.kerberos.principal</name>
        <value>gpadmin/[email protected]</value>
        <description>Kerberos principal pxf service should use. _HOST is replaced automatically with hostnames FQDN</description>
    </property>
    <property>
        <name>pxf.service.kerberos.keytab</name>
        <value>${pxf.conf}/keytabs/pxf.service.keytab</value>
        <description>Kerberos path to keytab file owned by pxf service with permissions 0400</description>
    </property>
    <property>
        <name>pxf.service.user.impersonation</name>
        <value>${pxf.service.user.impersonation.enabled}</value>
        <description>End-user identity impersonation, set to true to enable, false to disable</description>
    </property>
    <!--
             <property>
        <name>pxf.service.user.name</name>
        <value>${user.name}</value>
        <description>

            Uncomment and set the proper value only if:

            - user impersonation is enabled and you want to use the specified
              user as a proxy on the unsecured Hadoop clusters. This is useful
              when a proxy user has already been configured on the Hadoop side,
              and you don't want to add gpadmin (the default) as a proxy user.

            - user impersonation is disabled and you want queries from all
              Greenplum users to appear on the Hadoop side as coming from the
              specified user.

        </description>
    </property>
    -->
    <!-- Warehouse base path for Hive -->
    <property>
        <name>pxf.fs.basePath</name>
        <value>/hive/warehouse</value> 
        <description>
            Sets the base path when constructing a file URI for read and write
            operations. This property MUST be configured for any server that
            accesses a file using a file:* profile.
        </description>
    </property>

    <property>
        <name>pxf.ppd.hive</name>
        <value>true</value>
        <description>Specifies whether Predicate Pushdown feature is enabled for Hive profiles.</description>
    </property>

</configuration>

4.2 pxf-profiles.xml

[mxadmin@sdw3 conf]$ cat conf/pxf-profiles.xml
    <profile>
        <name>HiveORC</name>
        <description>This profile is suitable only for Hive tables stored in ORC files and
            serialized with either the ColumnarSerDe or the LazyBinaryColumnarSerDe. It is much
            faster than the general purpose Hive profile. Supports GPDBWritable output format, as
            specified in FORMAT header parameter.
        </description>
        <plugins>
            <fragmenter>org.greenplum.pxf.plugins.hive.HiveInputFormatFragmenter</fragmenter>
            <accessor>org.greenplum.pxf.plugins.hive.HiveORCAccessor</accessor>
            <resolver>org.greenplum.pxf.plugins.hive.HiveORCSerdeResolver</resolver>
            <metadata>org.greenplum.pxf.plugins.hive.HiveMetadataFetcher</metadata>
            <outputFormat>org.greenplum.pxf.api.io.GPDBWritable</outputFormat>
        </plugins>
        <optionMappings>
            <mapping option="ppd" property="pxf.ppd.hive"/>
        </optionMappings>
    </profile>
    <!-- Add new profiles as needed -->
    <profile>
        <name>hive:HiveORC</name>
        <description>This profile is suitable only for Hive tables stored in ORC files and
            serialized with either the ColumnarSerDe or the LazyBinaryColumnarSerDe. It is much
            faster than the general purpose Hive profile. Supports GPDBWritable output format, as
            specified in FORMAT header parameter.
        </description>
        <plugins>
            <fragmenter>org.greenplum.pxf.plugins.hive.HiveInputFormatFragmenter</fragmenter>
            <accessor>org.greenplum.pxf.plugins.hive.HiveORCAccessor</accessor>
            <resolver>org.greenplum.pxf.plugins.hive.HiveORCSerdeResolver</resolver>
            <metadata>org.greenplum.pxf.plugins.hive.HiveMetadataFetcher</metadata>
            <outputFormat>org.greenplum.pxf.api.io.GPDBWritable</outputFormat>
        </plugins>
        <optionMappings>
            <mapping option="ppd" property="pxf.ppd.hive"/>
        </optionMappings>
    </profile>

5. Create Table in YMatrix

postgres=# create table t1(id int,a text);
psql: NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Greenplum Database data distribution key for this table.
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.

postgres=# insert into t1 select i,md5(i::text) as a from generate_series(1,100) as i;
INSERT 0 100

postgres=# copy t1 to '/home/mxadmin/t1.csv' with delimiter ',';
COPY 100

6. Create ORC Table in Hive

drop table t1_orc;
create table t1_orc(id int,a string )
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' 
WITH SERDEPROPERTIES ( 
  'field.delim'=',', 
  'serialization.format'='') 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' 
          OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat';

load data local inpath '/home/mxadmin/t1.csv' into table t1_orc; 
# Error:
FAILED: SemanticException Unable to load data to destination table. Error: The file that you are trying to load does not match the file format of the destination table.

# First create a textfile table, then load data indirectly into ORC table
CREATE TABLE t1_text(
 id int,
 a string
)row format delimited fields terminated by ','
stored as textfile;

# Load data into t1_text first
load data local inpath '/home/mxadmin/t1.csv' into table t1_text; 

# Insert from text table into ORC table
hive> insert into table t1_orc select * from t1_text;

6.1 Query Data in Hive

hive> select * from t1_orc limit 5;
OK
34  e369853df766fa44e1ed0ff613f563bd
15  9bf31c7ff062936a96d3c8bd1f8f2ff3
5   e4da3b7fbbce2345d7772b0674a318d5
55  b53b3a3d6ab90ce0268229151c9bde11
1   c4ca4238a0b923820dcc509a6f75849b
Time taken: 0.124 seconds, Fetched: 5 row(s)

# Insert duplicate data
insert into t1_orc select * from t1_orc;

7. Query from YMatrix via Hive FDW

7.1 Create Hive FDW Server

Before creating the server, run:

CREATE EXTENSION pxf_fdw;
drop SERVER hive_svr cascade;
CREATE SERVER hive_svr FOREIGN DATA WRAPPER hive_pxf_fdw OPTIONS ( config 'hive');

7.2 Create User Mapping for mxadmin

CREATE USER MAPPING FOR mxadmin SERVER hive_svr;

7.3 Create Foreign Table

postgres=# drop FOREIGN TABLE f_t1_orc;
postgres=# CREATE FOREIGN TABLE f_t1_orc (
id integer, 
a text
)  
SERVER hive_svr OPTIONS ( resource 'default.t1_orc', format 'HiveORC');

postgres=# \des+
                                                List of foreign servers
        Name        | Owner | Foreign-data wrapper | Access privileges | Type | Version |   FDW options   | Description 
--------------------+-------+----------------------+-------------------+------+---------+-----------------+-------------
 gp_exttable_server | mxadmin | gp_exttable_fdw      |                   |      |         |                 |
 hdfs_svr           | mxadmin | hdfs_pxf_fdw         |                   |      |         | (config 'hdfs') |
 hive_svr           | mxadmin | hive_pxf_fdw         |                   |      |         | (config 'hive') |
(3 rows)

postgres=# \deu+
       List of user mappings
  Server  |  User name  | FDW options
----------+-------------+-------------
 hdfs_svr | mxadmin     |
 hive_svr | mxadmin     |
(2 rows)

postgres=# \det+
                                                 List of foreign tables
 Schema |   Table   |  Server  |                                FDW options                                | Description 
--------+-----------+----------+---------------------------------------------------------------------------+-------------
 public | f_t1_orc  | hive_svr | (resource 'default.t1_orc', format 'HiveORC')                             | 
 public | f_t1_text | hdfs_svr | (resource '/hive/warehouse/t1_text/t1.csv', format 'text', delimiter ',') 

# Query data
postgres=# select * from f_t1_orc limit 10;

7.4 Error Encountered

psql: ERROR:  remote component error (500) from '127.0.0.1:5888':  Type  Exception Report   Message  hdfs:HiveORC is not defined in pxf-profiles.xml   Description  The server encountered an unexpected condition that prevented it from fulfilling the request.   Exception   org.greenplum.pxf.service.profile.ProfileConfException: hdfs:HiveORC is not defined in pxf-profiles.xml (libchurl.c:963)

7.5 Solution

cd /usr/local/pxf-matrixdb4/conf
cp pxf-profiles-default.xml pxf-profiles.xml
cp pxf-profiles.xml ./conf/

7.6 Synchronize Configuration to Other Nodes

cd /usr/local/pxf-matrixdb4
scp -r conf sdw4:$PWD
scp -r conf sdw5:$PWD
scp -r conf sdw6:$PWD
scp -r conf sdw7:$PWD

7.7 Restart PXF Service

[mxadmin@sdw3 pxf-matrixdb3]$ pxf cluster restart
Restarting PXF on 4 segment hosts...
PXF restarted successfully on 4 out of 4 hosts
[mxadmin@sdw3 pxf-matrixdb3]$ pxf restart

7.8 Hive Site Configuration

cd /data/bigdata/hive/conf
[mxadmin@sdw3 conf]$ vim hive-site.xml
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://sdw3:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
<property>
    <name>hive.server2.enable.impersonation</name>
    <value>true</value>
    <description>Set this property to enable impersonation in Hive Server 2</description>
</property>
<property>
    <name>hive.server2.enable.doAs</name>
    <value>false</value>
    <description>Set this property to enable impersonation in Hive Server 2</description>
</property>
<property>
    <name>hive.execution.engine</name>
    <value>mr</value>
    <description>Chooses execution engine. Options are: mr(default), tez, or spark</description>
</property>
<property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
    <description>Modify schema instead of reporting error</description>
</property>
<property>
    <name>datanucleus.autoCreateTables</name>
    <value>True</value>
</property>

Copy this hive-site.xml to the PXF configuration directory, synchronize across all nodes, and restart the PXF service.

Issue 1

postgres=# SELECT * FROM f_t1_orc LIMIT 10;
psql: ERROR:  remote component error (500) from '127.0.0.1:5888':  Type  Exception Report   Message  configure a valid value for 'pxf.fs.basePath' property for this server to access the filesystem   Description  The server encountered an unexpected condition that prevented it from fulfilling the request.   Exception   java.lang.IllegalArgumentException: configure a valid value for 'pxf.fs.basePath' property for this server to access the filesystem (libchurl.c:963)

Solution:

Place pxf-site.xml under /usr/local/pxf-matrixdb4/conf/servers/hive/
[mxadmin@sdw3 hive]$ ls
hive-site.xml  pxf-site.xml
[mxadmin@sdw3 hive]$ pwd
/usr/local/pxf-matrixdb4/conf/servers/hive

--- SPLIT ---

7.9 pxf-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>pxf.service.kerberos.principal</name>
        <value>gpadmin/[email protected]</value>
        <description>Kerberos principal pxf service should use. _HOST is replaced automatically with hostnames FQDN</description>
    </property>
    <property>
        <name>pxf.service.kerberos.keytab</name>
        <value>${pxf.conf}/keytabs/pxf.service.keytab</value>
        <description>Kerberos path to keytab file owned by pxf service with permissions 0400</description>
    </property>
    <property>
        <name>pxf.service.user.impersonation</name>
        <value>${pxf.service.user.impersonation.enabled}</value>
        <description>End-user identity impersonation, set to true to enable, false to disable</description>
    </property>
    <!--
    <property>
        <name>pxf.service.user.name</name>
        <value>${user.name}</value>
        <description>

            Uncomment and set the proper value only if:

            - user impersonation is enabled and you want to use the specified
              user as a proxy on the unsecured Hadoop clusters. This is useful
              when a proxy user has already been configured on the Hadoop side,
              and you don't want to add gpadmin (the default) as a proxy user.

            - user impersonation is disabled and you want queries from all
              Greenplum users to appear on the Hadoop side as coming from the
              specified user.

        </description>
    </property>
    -->

    <property>
        <name>pxf.fs.basePath</name>
        <value>/hive/warehouse</value>
        <description>
            Sets the base path when constructing a file URI for read and write
            operations. This property MUST be configured for any server that
            accesses a file using a file:* profile.
        </description>
    </property>

    <property>
        <name>pxf.ppd.hive</name>
        <value>true</value>
        <description>Specifies whether Predicate Pushdown feature is enabled for Hive profiles.</description>
    </property>

</configuration>

8. Start Hive Meta Server

[mxadmin@sdw3 hive]$ ./bin/hive --service metastore
2021-07-31 17:11:28: Starting Hive Metastore Server
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/bigdata/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/bigdata/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Exception in thread "org.apache.hadoop.hive.common.JvmPauseMonitor$Monitor@73c5c0f1" java.lang.IllegalAccessError: tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.hive.common.JvmPauseMonitor$Monitor
    at org.apache.hadoop.hive.common.JvmPauseMonitor$Monitor.run(JvmPauseMonitor.java:176)
    at java.lang.Thread.run(Thread.java:748)
[mxadmin@sdw3 hive]$ ./bin/hiveserver2
which: no hbase in (/data/kafka/bin:/data/gitrepo/go/bin:/usr/bin:/data/bigdata/hive/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/usr/local/jdk1805/bin:/usr/local/jdk1805/jre/bin:/data/bigdata/hadoop/bin:/data/bigdata/hadoop/sbin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/usr/local/jdk1805/bin:/usr/local/jdk1805/jre/bin:::/home/mxadmin/.local/bin:/home/mxadmin/bin)
2021-07-31 17:12:47: Starting HiveServer2
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/bigdata/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/bigdata/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Exception in thread "org.apache.hadoop.hive.common.JvmPauseMonitor$Monitor@2149594a" java.lang.IllegalAccessError: tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.hive.common.JvmPauseMonitor$Monitor
    at org.apache.hadoop.hive.common.JvmPauseMonitor$Monitor.run(JvmPauseMonitor.java:176)
    at java.lang.Thread.run(Thread.java:748)

Problem 1

SELECT * FROM f_sdp_electric_vehicles_data_rdb_orc LIMIT 1;
psql: ERROR:  remote component error (500) from '127.0.0.1:5888':  Type  Exception Report   Message  javax.servlet.ServletException: com.google.common.util.concurrent.ExecutionError: java.lang.OutOfMemoryError: GC overhead limit exceeded   Description  The server encountered an unexpected condition that prevented it from fulfilling the request.   Exception   javax.servlet.ServletException: javax.servlet.ServletException: com.google.common.util.concurrent.ExecutionError: java.lang.OutOfMemoryError: GC overhead limit exceeded (libchurl.c:963)

Problem 2

SELECT * FROM f_sdp_electric_vehicles_data_rdb_orc LIMIT 1;
psql: ERROR:  remote component error (500) from '127.0.0.1:5888':  Type  Exception Report   Message  javax.servlet.ServletException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: mxadmin is not allowed to impersonate hive   Description  The server encountered an unexpected condition that prevented it from fulfilling the request.   Exception   javax.servlet.ServletException: javax.servlet.ServletException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: mxadmin is not allowed to impersonate hive (libchurl.c:963)

Solution

<property>
  <name>hadoop.proxyuser.mxadmin.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.mxadmin.groups</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.mxadmin.users</name>
  <value>*</value>
</property>