To start working with Kite, download its binaries here, edit the settings file, and run the jar file kite-console-xx.jar on the cluster machines using a Java 8 JVM (xx refers to the Kite version number). Kite is a distributed system that runs on commodity hardware clusters. The Kite jar file should be executed on each machine separately; there is no need to provide a list of cluster machines ahead of time. When a new machine runs the Kite jar file, it is added to the cluster and automatically discovered by the other up-and-running machines, as long as they belong to the same network. When a running machine goes down, this is also automatically discovered by the other machines. As introduced in About Kite, Kite writes its disk-based artifacts to the Hadoop Distributed File System (HDFS). So, before starting Kite machines, it is necessary to have an up-and-running HDFS instance. Check here to configure an HDFS cluster. Note that all machines of the same Kite instance should share the same settings for the underlying HDFS, as introduced in Kite Settings File.
After starting, each Kite machine is ready to receive and execute MQL query language statements. In addition, the Kite jar file can be added as a dependency to Java projects to use Kite APIs in Java programs or in compatible programming languages. To gracefully stop a Kite machine, type quit or exit. The Examples section provides sample MQL statements and queries as well as a ready-made example of a streaming data source to start using Kite immediately.
When running the Kite jar file on each machine, the system administrator should provide a settings file. By default, Kite assumes a settings file named kite.settings located in the same folder as the jar file. If the settings file is located elsewhere or named differently, its path should be provided as a command line argument to the jar file.
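For illustration, launch commands might look like the following (the settings file path is hypothetical):

```
# default: kite.settings located next to the jar file
java -jar kite-console-xx.jar

# explicit settings file path as a command line argument
java -jar kite-console-xx.jar /path/to/my-kite.settings
```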
The settings file includes the mandatory HDFS settings, in addition to other optional settings that allow the system administrator to tune and control system performance and behavior. The Kite settings file is a properties file that includes the following parameters:
- hdfsHost: a mandatory URL of the master node of the HDFS cluster. No default value provided. Example: hdfs://cs-cluster-1.cs.ucr.edu:9000
- hdfsRootDirectory: an optional path to the root HDFS directory. Default value is "/". Example: /user/admin/
- hdfsUsername: a mandatory HDFS username for an authorized user with write permissions on the hdfsRootDirectory directory. No default value provided. Example: amr
- hdfsGroupname: an optional HDFS groupname to which the hdfsUsername belongs. No default value provided. Example: supergroup
- queryAnswerSize: an optional integer that determines the default answer size returned by Kite queries. Default value is 20. Example: 100
- queryTimeMinutes: an optional integer that determines the default search time horizon in the past, in minutes. A value of 180 means searching the last 180 minutes (3 hours) of data. This default is used whenever a query does not indicate a search time period; any individual query can override it. Default value is 180. Example: 10080
- memoryIndexCapacity: an optional integer that determines the default in-memory index capacity in terms of number of data records. Default value is 1000000.
- memoryIndexNumSegments: an optional integer that determines the default number of segments used to segment in-memory index structures. Default value is 5.
- logsDirectory: an optional path to a folder in which to create system logs. Default value is the directory of the Kite jar file.
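Putting these parameters together, a complete kite.settings file might look like the following (the hostname, paths, and username are illustrative, not required values):

```properties
# mandatory HDFS settings
hdfsHost=hdfs://cs-cluster-1.cs.ucr.edu:9000
hdfsUsername=amr

# optional settings (see the parameter list above for defaults)
hdfsRootDirectory=/user/admin/
hdfsGroupname=supergroup
queryAnswerSize=100
queryTimeMinutes=10080
memoryIndexCapacity=2000000
memoryIndexNumSegments=10
logsDirectory=/var/log/kite/
```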
MQL provides the statements CREATE STREAM, CREATE INDEX, DROP STREAM, DROP INDEX, and SELECT to create and drop streams and index structures and to query them. It also provides additional statements to manage and administrate the system assets: SHOW, UNSHOW, PAUSE, RESUME, ACTIVATE, DEACTIVATE, RESTART, and DESC. The usage of each statement is detailed below.
CREATE STREAM stream_name (att1:Type, att2:Type, att3:Type,... attn:Type)
FROM stream_source
FORMAT stream_format
Example:
CREATE STREAM stream1 (id:Long, mtime:Timestamp, keyword:String, location:GeoLocation, username:String)
FROM Network_TCP(128.63.28.36:2334)
FORMAT CSV(0,1,4,3,2)
Each stream is expected to have an attribute named "timestamp" of data type Timestamp that provides the Microblog timestamp used in all queries and indexing operations. If no "timestamp" attribute exists in the stream, Kite adds this attribute to the stream and assigns it the Microblog arrival time. However, this added attribute cannot be queried; only user-defined attributes can be queried.
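To illustrate how a clause like FORMAT CSV(0,1,4,3,2) could map CSV columns onto the declared attributes, here is a minimal, self-contained sketch. The mapping logic (the i-th declared attribute is read from the i-th listed column index) is an assumption inferred from the example above, not Kite's actual parser.

```java
import java.util.Arrays;

// Hypothetical illustration of FORMAT CSV(0,1,4,3,2): the i-th declared
// attribute of the stream is read from the given column of each CSV record.
public class CsvFormatDemo {
    // Returns attribute values in declaration order, picking the CSV
    // column index listed for each attribute.
    static String[] mapColumns(String csvLine, int[] columnIndices) {
        String[] columns = csvLine.split(",", -1);
        String[] values = new String[columnIndices.length];
        for (int i = 0; i < columnIndices.length; i++) {
            values[i] = columns[columnIndices[i]];
        }
        return values;
    }

    public static void main(String[] args) {
        // A CSV record laid out as: id, mtime, username, location, keyword
        String record = "17,2017-01-13T10:00:00,alice,40.7;-73.9,obama";
        int[] format = {0, 1, 4, 3, 2}; // as in FORMAT CSV(0,1,4,3,2)
        // Declared order: id, mtime, keyword, location, username
        System.out.println(Arrays.toString(mapColumns(record, format)));
    }
}
```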
CREATE INDEX HASH index_name ON stream_name(attribute_name) [OPTIONS index_capacity, num_index_segments]
CREATE INDEX SPATIAL spatial_partitioning_type index_name ON stream_name(attribute_name)
[OPTIONS index_capacity, num_index_segments, north, south, east, west, num_rows, num_cols]
Examples:
CREATE INDEX HASH index1 ON stream1(keyword)
CREATE INDEX HASH index1 ON stream1(keyword) OPTIONS 2000000,20
CREATE INDEX SPATIAL GRID index2 ON stream1(location)
CREATE INDEX SPATIAL GRID index2 ON stream1(location) OPTIONS 2000000,20,90,-90,180,-180,180,360
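As a rough illustration of what the GRID options north=90, south=-90, east=180, west=-180, num_rows=180, num_cols=360 imply, the following self-contained sketch maps a latitude/longitude point to a uniform grid cell. The arithmetic is a simplified assumption for illustration, not Kite's internal spatial partitioning code.

```java
// Hypothetical sketch of uniform grid partitioning over the bounds
// north=90, south=-90, east=180, west=-180 with 180 rows x 360 columns,
// i.e. one cell per degree of latitude and longitude.
public class GridPartitionDemo {
    final double north, south, east, west;
    final int numRows, numCols;

    GridPartitionDemo(double north, double south, double east, double west,
                      int numRows, int numCols) {
        this.north = north; this.south = south;
        this.east = east; this.west = west;
        this.numRows = numRows; this.numCols = numCols;
    }

    // Row index grows from north to south, column index from west to east.
    int[] cellOf(double lat, double lng) {
        int row = (int) ((north - lat) / (north - south) * numRows);
        int col = (int) ((lng - west) / (east - west) * numCols);
        // Clamp points lying exactly on the south/east boundary.
        row = Math.min(row, numRows - 1);
        col = Math.min(col, numCols - 1);
        return new int[]{row, col};
    }

    public static void main(String[] args) {
        GridPartitionDemo grid =
                new GridPartitionDemo(90, -90, 180, -180, 180, 360);
        int[] cell = grid.cellOf(50.0, -122.0);
        System.out.println(cell[0] + "," + cell[1]);
    }
}
```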
DROP INDEX index_name stream_name
Example:
DROP INDEX index1 stream1
DROP STREAM stream_name
Example:
DROP STREAM stream1
SELECT attribute_list FROM stream_name [WHERE condition] [TOPK k] [TIME time_interval]
Examples:
SELECT * FROM stream1
SELECT id, keyword FROM stream1 TOPK 17
SELECT id, keyword FROM stream1 WHERE keyword = obama
SELECT id, keyword FROM stream1 WHERE keyword = obama TOPK 70 TIME [13 Jan 2017, 15 Jan 2017]
SELECT id, keyword FROM stream1 WHERE (keyword = obama OR keyword=trump) AND location WITHIN [50,24,-122,-126] TOPK 50
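Conceptually, the TOPK and TIME clauses act as a time-interval filter followed by a most-recent-first size limit. The following self-contained sketch shows these semantics over an in-memory list; it is a simplification for illustration, not Kite's query executor.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical record holding a subset of stream1's attributes.
record Microblog(long id, long timestamp, String keyword) {}

public class SelectDemo {
    // Simplified SELECT ... WHERE keyword = kw TOPK k TIME [from, to]:
    // keep records matching the keyword inside the time interval,
    // order newest first, and return at most k of them.
    static List<Microblog> select(List<Microblog> data, String kw,
                                  int k, long from, long to) {
        List<Microblog> out = new ArrayList<>();
        for (Microblog m : data) {
            if (m.keyword().equals(kw)
                    && m.timestamp() >= from && m.timestamp() <= to) {
                out.add(m);
            }
        }
        out.sort(Comparator.comparingLong(Microblog::timestamp).reversed());
        return out.subList(0, Math.min(k, out.size()));
    }

    public static void main(String[] args) {
        List<Microblog> data = List.of(
                new Microblog(1, 100, "obama"),
                new Microblog(2, 200, "trump"),
                new Microblog(3, 300, "obama"),
                new Microblog(4, 400, "obama"));
        // The two most recent "obama" records within time [100, 350]
        System.out.println(select(data, "obama", 2, 100, 350));
    }
}
```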
SHOW stream_name
Example:
SHOW stream1
UNSHOW stream_name
Example:
UNSHOW stream1
PAUSE stream_name
Example:
PAUSE stream1
RESUME stream_name
Example:
RESUME stream1
ACTIVATE index_name stream_name
Example:
ACTIVATE index1 stream1
DEACTIVATE index_name stream_name
Example:
DEACTIVATE index1 stream1
RESTART stream_name
Example:
RESTART stream1
DESC [stream_name]
Examples:
DESC
DESC stream1
To use Kite APIs in Java programs, import edu.umn.cs.kite.*. In fact, all MQL statements are executed by translating them into the equivalent Java lines of code. In this tutorial, we describe how to launch a Kite machine and give the equivalent Java lines of code for each MQL statement.
Action | Java Code Snippets | Notes |
---|---|---|
Launch Kite Machine | `KiteLaunchTool kite = new KiteLaunchTool();` | |
Execute an MQL Statement | `String statement = "CREATE....";` | The parser returns a Boolean indicating a successful or failed parsing, a String error message in case of failed parsing, and a MetadataEntry in case of successful parsing. |
CREATE STREAM ... | `StreamFormatInfo format = new StreamFormatInfo("csv", attrIndecies);` | |
CREATE INDEX ... | `StreamDataset stream = new StreamDataset(...);` | loadDiskIndex is true when the index previously exists in the system, and false otherwise. |
DROP INDEX ... | `StreamDataset stream = KiteInstance.getStream(stream_name);` | |
DROP STREAM ... | `StreamDataset stream = KiteInstance.getStream(stream_name);` | |
SELECT ... | `StreamDataset stream = KiteInstance.getStream(stream_name);` | |
SHOW/UNSHOW ... | `StreamDataset stream = KiteInstance.getStream(stream_name);` | |
PAUSE/RESUME ... | `StreamDataset stream = KiteInstance.getStream(stream_name);` | |
ACTIVATE/DEACTIVATE ... | `StreamDataset stream = KiteInstance.getStream(stream_name);` | |
RESTART ... | `KiteInstance.restartStream(stream_name);` | |
DESC ... | `KiteInstance.descStream(stream_name);` | |