HBase – WAL / memstore / Log Split

Posted: November 2, 2013 in Uncategorized

HDFS is the Hadoop file system, in which random reads and writes are not possible; data can only be appended once a file has been written to HDFS. HBase is an HDFS-based database that makes random reads and writes possible.

HBase is based on the concepts described in Google's BigTable white paper.

Each HBase table is hosted and managed by a set of servers that fall into three categories:

  1. One active Master server
  2. One or more backup master servers
  3. Many region servers

Region servers actually store the HBase table data. Because a table can be very large, its data is distributed across many region servers, and each chunk of data is called a region. Each region server can host one or many regions. Since the table data is stored on region servers, loss of the master server does not lead to loss of data.

When a client requests a change, that request is routed to a region server right away by default. However, a client can programmatically cache changes on the client side and flush them to the region servers in a batch by turning autoflush off. With autoflush off, the changes are cached until flushCommits() is invoked or the buffer is full; the buffer size can be set programmatically or configured with the parameter “hbase.client.write.buffer”.
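
As a rough illustration, here is a minimal sketch of client-side write buffering using the older HTable client API; the table name, column family, row keys, and buffer size are assumptions made for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Assumed 8 MB client-side buffer; controlled by hbase.client.write.buffer.
    conf.setLong("hbase.client.write.buffer", 8 * 1024 * 1024);

    HTable table = new HTable(conf, "users"); // a 'users' table with an 'info' family is assumed
    try {
      table.setAutoFlush(false); // cache puts on the client side instead of sending each one
      for (int i = 0; i < 1000; i++) {
        Put put = new Put(Bytes.toBytes("row-" + i));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("user-" + i));
        table.put(put); // buffered; sent automatically once the write buffer fills up
      }
      table.flushCommits(); // flush whatever is still buffered to the region servers
    } finally {
      table.close();
    }
  }
}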

Row keys are stored sorted, which makes it easy to determine which region is serving a given key. A put/delete change request targets a specific row.

First, the client locates the address of the region server hosting the -ROOT- region from the ZooKeeper quorum. From the root region server, the client finds the location of the region server hosting the .META. region. From the meta region server, the client finally locates the actual region server that serves the requested region. This is a three-step process, so the region location is cached to avoid this expensive series of operations. If the cached location is invalid (for example, we get some unknown region exception), it is time to re-locate the region and update the cache.

After the request is received by the correct region server, the edit is not written directly into an HFile, because the data in an HFile must be sorted by row key and data cannot be inserted into an existing HFile at an arbitrary position. One alternative would be to write each batch of data to a new file, but that would produce a large number of small files, which is not scalable and makes later reads and merges difficult.

To overcome this problem, data is first stored in the memstore, where it is kept sorted just as it would be in an HFile. When the memstore accumulates enough data, the entire sorted set is written to a new HFile in HDFS. Completing one large write is efficient and takes advantage of HDFS's strengths.

The memstore is volatile memory, so a region server crash could result in data loss. To guard against this, HBase has the concept of a write-ahead log (WAL). Before the data is written to the memstore, each edit request is written to the WAL file, which acts as the recovery source in case of a crash. By default the WAL is on, but it can be turned off, since the WAL layer comes with the cost of writing to disk. After a crash, data can be recovered from the WAL.

The data in the WAL is written differently than in an HFile. There is one active WAL file per region server; since one region server serves many regions, the WAL file contains the edit requests of many regions. As a WAL grows, it is eventually closed and a new active WAL file is created to accept additional edits. This is called “rolling” the WAL file. Once a WAL file is rolled, no additional changes are made to the old file. By default the WAL is rolled when its size reaches 95% of its configured block size. The rolling behaviour can be controlled using the properties below:

hbase.regionserver.logroll.multiplier -> the multiplier applied to the WAL block size to decide when to roll (0.95 by default).
hbase.regionserver.hlog.blocksize -> the block size of the WAL (HLog) file.
hbase.regionserver.logroll.period -> the roll-over period; the WAL is rolled after this interval even if it is not full.
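
As an illustration, these properties could be set in hbase-site.xml; the values below are purely illustrative, not tuning advice:

<property>
  <name>hbase.regionserver.logroll.multiplier</name>
  <value>0.95</value> <!-- roll the WAL at 95% of the configured block size -->
</property>
<property>
  <name>hbase.regionserver.logroll.period</name>
  <value>3600000</value> <!-- also roll after this many milliseconds, even if not full -->
</property>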

Constraining WAL file size facilitates efficient file replay if a recovery is required. This is especially important during replay of a region’s WAL file because while a file is being replayed, the corresponding region is not available.

Assuming the default HBase root of “/hbase”, all the WAL files for a region server instance are stored under the same root folder, which is as follows:

/hbase/.logs/<host>,<port>,<startcode>

For example:

/hbase/.logs/srv.example.com,60020,1254173957298

WAL log files are named as follows:

/hbase/.logs/<host>,<port>,<startcode>/<host>%2C<port>%2C<startcode>.<timestamp>

For example:

/hbase/.logs/srv.example.com,60020,1254173957298/srv.example.com%2C60020%2C1254173957298.1254173957495

Log splitting

Since there is only a single WAL file per region server, and that server writes to multiple regions, the edits in the WAL file must be grouped by region so that they can be replayed easily if a specific region goes down. This process of grouping the edits is called log splitting.

Distributed Log Splitting

In the case of a cluster crash, all the region servers would otherwise wait for the HMaster to complete the splitting and assign the edit files to the specific region servers. Newer versions of HBase also make use of the region servers in this situation to speed up the processing.

The HMaster has a split log manager, which places all the WAL files to be split under the splitlog ZooKeeper node (/hbase/splitlog). Once the split log manager has registered these WAL files under the splitlog znode, it monitors them and waits for them to be completed.

On each region server there is a split log worker daemon thread, which monitors the splitlog znode, picks up any unclaimed task from there, and processes it. It updates the task state appropriately after processing.

This feature is controlled by the hbase.master.distributed.log.splitting configuration property. When the HMaster starts, if this property is not set to false, it creates an instance of the split log manager, which in turn creates a monitor thread. The monitor thread does the following:

  1. Checks whether any dead split log workers are queued up.
  2. Checks whether there are any unassigned tasks; if so, notifies each split log worker to rescan.
  3. Checks whether any assigned tasks have expired; if so, reassigns them. There is no harm in assigning a task multiple times.

Based on the state of a task whose data has changed, the split log manager does one of the following:

  1. Resubmit the task if it is unassigned
  2. Heartbeat the task if it is assigned
  3. Resubmit or fail* the task if it is resigned
  4. Resubmit or fail* the task if it is completed with errors
  5. Resubmit or fail* the task if it could not complete due to errors
  6. Delete the task if it is successfully completed or failed

HBase commands

Posted: November 2, 2013 in Uncategorized

HBase is a column-oriented database. It stores data in tables but is schemaless. It differs from a traditional RDBMS in that it stores data using the concept of column families.

A set of columns that are mostly used together is grouped and stored together; these columns belong to the same column family.

HBase is not an RDBMS: there are no secondary indexes, no multi-row transactions, and no SQL interface.

Start the HBase shell from the bin directory:

$ hbase shell

This opens the HBase command-line shell.

> create 'users', 'info'

Here users is the table name and info is the column family.

HBase does not require any information about the type of the data at table creation time. That is why HBase is called a schemaless database.

> list

Lists the currently available tables.

> describe ‘users’

Provides all details about the table users.

> count 'users'

Returns the number of rows in the table users.

> scan 'users'

Returns all rows of the table; for each column it returns only the latest version of the value.

> get 'test','rowkey1'

Returns the row with key rowkey1; the result can include multiple columns and cells.

> get 'test','rowkey1','colfam1:q1'

Returns the latest value of the column colfam1:q1 for rowkey1.

hbase(main):018:0>  get 'test','rowkey1',{COLUMN => 'colfam1:q1', VERSIONS => 3}
COLUMN                            CELL
colfam1:q1                       timestamp=1382984220422, value=v3
colfam1:q1                       timestamp=1379370165085, value=v2
colfam1:q1                       timestamp=1379370165082, value=v1

Returns the last 3 versions of the column colfam1:q1.
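
For reference, a hedged sketch of how such versioned values could have been written in the first place, assuming the table was created with a column family that keeps at least three versions:

> create 'test', {NAME => 'colfam1', VERSIONS => 3}
> put 'test', 'rowkey1', 'colfam1:q1', 'v1'
> put 'test', 'rowkey1', 'colfam1:q1', 'v2'
> put 'test', 'rowkey1', 'colfam1:q1', 'v3'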

Before altering any table, disable it first; execute all the alter commands, and then enable the table again before using it.

hbase> alter 't1', 'delete' => 'f1'

Deletes the column family f1 from table t1.
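
Putting this together with the disable/enable advice above, a hedged end-to-end sketch; table t1 and family f1 are only illustrative:

> disable 't1'
> alter 't1', 'delete' => 'f1'
> enable 't1'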

While dropping the table, first disable the table and then drop it.

> disable 'users'

> drop 'users'

The shell describes your table as a map with table name and list of column families.

(Rowkey, columnkey, timestamp) -> value

Rows are stored sorted by row key. One column can have many versions.

Hive DDL

Posted: November 2, 2013 in Uncategorized

Hive provides a SQL-like interface to query files stored in HDFS. HiveQL is the query language used for this.

It provides simple SQL-like queries and does not require low-level coding as in Hadoop Java MapReduce programming. The learning curve for a SQL developer using HiveQL is short.

Hive creates databases, tables, and views to organize data. It also supports the concepts of partitioning and bucketing.
The actual location used to store Hive tables can be configured with the hive.metastore.warehouse.dir property.

Similarities and differences between Hive and SQL
Data types – most of the data types are supported:
Simple
Complex

Different operations in Hive
DDL
DML
Run mode (local / MapReduce mode)

Create table

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name (ssn STRING [COMMENT col_comment], name STRING, age INT) [COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment])]
[ [ROW FORMAT row_format] [STORED AS file_format]
| STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)] ]
[LOCATION hdfs_path]
[TBLPROPERTIES (property_name = property_value, ...)]
[AS select_statement]

ROW FORMAT

DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char] [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
| SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, ...)]

Below are the file formats supported in Hive:
SEQUENCEFILE
TEXTFILE
RCFILE
ORC
INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname

Use the DELIMITED clause to read delimited files. You can enable escaping of the delimiter characters with the ESCAPED BY clause (e.g. ESCAPED BY '\'); escaping is needed if you want to work with data that can contain these delimiter characters. Use the SERDE clause to create a table with a custom SerDe.

Use STORED BY to create a non-native table.
Use STORED AS TEXTFILE if the data needs to be stored as plain text files.
Use STORED AS SEQUENCEFILE if the data needs to be compressed.

EXTERNAL
An EXTERNAL table points to any HDFS location for its storage, rather than being stored in a folder specified by the configuration property hive.metastore.warehouse.dir
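
Putting several of the clauses above together, a hedged example of a complete statement; the table name, columns, comments, and location are illustrative only:

CREATE EXTERNAL TABLE IF NOT EXISTS employees (
  ssn STRING COMMENT 'social security number',
  name STRING,
  age INT
)
COMMENT 'employee master data'
PARTITIONED BY (country STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/hive/external/employees';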

DESCRIBE
To describe any table and get its details.
DESCRIBE EXTENDED table_name;

DROP TABLE [IF EXISTS] table_name

ALTER TABLE
ALTER TABLE table_name RENAME TO new_table_name
ALTER TABLE table_name SET TBLPROPERTIES table_properties
ALTER TABLE table_name SET TBLPROPERTIES ('comment' = new_comment);
ALTER TABLE table_name SET SERDE serde_class_name [WITH SERDEPROPERTIES serde_properties]
ALTER TABLE table_name SET SERDEPROPERTIES serde_properties

ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION partition_spec [LOCATION 'location1'] partition_spec [LOCATION 'location2'] ...
ALTER TABLE table_name PARTITION partition_spec RENAME TO PARTITION partition_spec;
ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec, PARTITION partition_spec,...

For tables that are protected by NO_DROP CASCADE, you can use the predicate IGNORE PROTECTION to drop a specified partition, as in the example below.
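
A hedged example; the table and partition are hypothetical:

ALTER TABLE page_view DROP IF EXISTS PARTITION (dt='2013-11-02') IGNORE PROTECTION;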

ALTER TABLE table_name [PARTITION partitionSpec] SET FILEFORMAT file_format
ALTER TABLE table_name [PARTITION partitionSpec] SET LOCATION "new location"

Alter Table/Partition Protections

ALTER TABLE table_name [PARTITION partition_spec] ENABLE|DISABLE NO_DROP;
ALTER TABLE table_name [PARTITION partition_spec] ENABLE|DISABLE OFFLINE;

Protection on data can be set at either the table or partition level. Enabling NO_DROP prevents a table or partition from being dropped. Enabling OFFLINE prevents the data in a table or partition from being queried, but the metadata can still be accessed. Note, if any partition in a table has NO_DROP enabled, the table cannot be dropped either.

Change Column Name/Type/Position/Comment

ALTER TABLE table_name CHANGE [COLUMN] col_old_name col_new_name column_type [COMMENT col_comment] [FIRST|AFTER column_name]

This command will allow users to change a column’s name, data type, comment, or position, or an arbitrary combination of them. The column change command will only modify Hive’s metadata, and will NOT touch data. Users should make sure the actual data layout conforms with the metadata definition.
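
For example, a hedged one-liner; the table and column names are hypothetical:

ALTER TABLE employees CHANGE COLUMN ssn social_security_no STRING COMMENT 'renamed from ssn' FIRST;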

Add/Replace Columns

ALTER TABLE table_name ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...)

ADD COLUMNS lets you add new columns to the end of the existing columns but before the partition columns. REPLACE COLUMNS removes all existing columns and adds the new set of columns. Note that this does not delete underlying data, it just changes the schema.

CTAS (Create table as select )

Inserting data into Hive tables using queries: data from one table can be inserted into another table with a Hive query.

INSERT OVERWRITE TABLE tablename [PARTITION (partcol1=val1, ...) [IF NOT EXISTS]] select_statement FROM from_statement;
INSERT INTO TABLE tablename [PARTITION (partcol1=val1, ...)] select_statement FROM from_statement;

OVERWRITE will overwrite any existing data.
The target table of a CTAS statement cannot be an external table or a partitioned table.
Create Table Like
CREATE TABLE ... LIKE copies only the table definition, not the data.
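
Hedged examples of both forms; the table names are hypothetical:

CREATE TABLE employees_backup AS SELECT * FROM employees;
CREATE TABLE employees_copy LIKE employees;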

The CLUSTERED BY and SORTED BY creation commands do not affect how data is inserted into a table – only how it is read. This means that users must be careful to insert data correctly by specifying the number of reducers to be equal to the number of buckets, and using CLUSTER BY and SORT BY commands in their query.
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
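
Following that guidance, a hedged sketch of populating a bucketed table by setting the number of reducers equal to the number of buckets; the table names are hypothetical:

set mapred.reduce.tasks = 32;
INSERT OVERWRITE TABLE page_view_bucketed
SELECT viewTime, userid FROM page_view_staging CLUSTER BY userid;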

LOAD TABLE

Loading files into tables. Hive also supports multi-table inserts: users can perform multiple queries on the same input data with a single HiveQL query, and Hive optimizes these queries to share the scan of the input data, thus increasing their throughput.
Hive does not do any processing or transformation while loading a file; it simply copies or moves the data from its source location to the location configured for the Hive table.

LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename

The filepath can be relative, absolute, or a full URI. It can be a single file, which is moved into the Hive table, or a directory, in which case all of its files are moved into the table.

LOCAL: the load command searches for the files in the local file system, copies them to the target file system, and then loads them into the table.
LOCAL not specified: the files must already be in HDFS and are moved into the table.
If the scheme and authority are not specified, Hive takes them from the Hadoop configuration variable fs.default.name, which specifies the NameNode URI.

OVERWRITE
The contents of the target table or partition are deleted and replaced with the files referred to by filepath. Otherwise the files are appended to the table.
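
For example, a hedged sketch that replaces the contents of a table with a file already present in HDFS; the path and table name are hypothetical:

LOAD DATA INPATH '/user/data/logs/2013-11-02.log' OVERWRITE INTO TABLE logs;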

Customize input output format
Compression

The default storage file format in Hive is TextFile, so when a compressed file is loaded into a Hive table it is stored as a TextFile. The disadvantage of this is that Hadoop cannot split such a compressed text file into chunks and run multiple parallel tasks over it for data processing.
The recommended practice is to load the compressed data into a table stored as TextFile and then insert that data into another table stored as a SequenceFile. A SequenceFile can be split by Hadoop and distributed across map tasks, whereas a GZIP file cannot be.

CREATE TABLE raw (line STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n';

CREATE TABLE raw_sequence (line STRING)
STORED AS SEQUENCEFILE;

LOAD DATA LOCAL INPATH '/tmp/weblogs/20090603-access.log.z' INTO TABLE raw;

SET hive.exec.compress.output=true;
SET io.seqfile.compression.type=BLOCK; -- NONE/RECORD/BLOCK
INSERT OVERWRITE TABLE raw_sequence SELECT * FROM raw;

VIEW
CREATE VIEW [IF NOT EXISTS] view_name [(column_name [COMMENT column_comment], ...) ]
[COMMENT view_comment]
[TBLPROPERTIES (property_name = property_value, ...)]
AS SELECT ...

By default a view inherits its column definitions from the underlying query, except for the column comments, which can be specified explicitly in the view definition.
A view is created only if the underlying tables exist and the view name does not already exist; use the IF NOT EXISTS keyword to avoid exceptions. A view may contain ORDER BY and LIMIT clauses in its defining SELECT statement.
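
A hedged example of a view definition; the view name and underlying table are hypothetical:

CREATE VIEW IF NOT EXISTS adult_employees
COMMENT 'employees aged 18 or over'
AS SELECT name, age FROM employees WHERE age >= 18 ORDER BY age LIMIT 100;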

DROP VIEW [IF EXISTS] view_name

The drop statement removes the metadata of the view; the underlying data is not affected. However, dependent views become invalid, and no warning is given when the view they depend on is dropped.

ALTER VIEW view_name SET TBLPROPERTIES table_properties

A view can be altered to add user-defined metadata.

ALTER VIEW view_name AS select_statement

This is the same as redefining the view. The view must already exist, and if the view has partitions it cannot be replaced by ALTER VIEW ... AS SELECT.

INDEX
CREATE INDEX index_name
ON TABLE base_table_name (col_name, ...)
AS index_type
[WITH DEFERRED REBUILD]
[IDXPROPERTIES (property_name=property_value, ...)]
[IN TABLE index_table_name]
[
   [ ROW FORMAT ...] STORED AS ...
   | STORED BY ...
]
[LOCATION hdfs_path]
[TBLPROPERTIES (...)]
[COMMENT "index comment"]

The CREATE INDEX statement creates an index on the specified columns of the table.
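
For illustration, a hedged example using the compact index handler; the index name, table, and column are hypothetical. WITH DEFERRED REBUILD means the index is populated only when an ALTER INDEX ... REBUILD is issued:

CREATE INDEX users_age_idx ON TABLE users (age) AS 'COMPACT' WITH DEFERRED REBUILD;
ALTER INDEX users_age_idx ON users REBUILD;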

DROP INDEX [IF EXISTS] index_name ON table_name

The drop statement removes the index and also deletes the index table.

SQL operations
SELECT / filters
GROUP BY
JOIN
UNION

UDF (user-defined functions)
Small scripting functions as well as Java class methods can be called from a Hive query.
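
For instance, a hedged sketch of registering and calling a Java UDF from the Hive shell; the jar path, class name, and function name are hypothetical:

ADD JAR /tmp/my-udfs.jar;
CREATE TEMPORARY FUNCTION my_lower AS 'com.example.hive.udf.Lower';
SELECT my_lower(name) FROM employees;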

Hello world!

Posted: February 19, 2012 in Uncategorized

These articles are written by me to track the work I am doing and my learning.