通过eclipse方法来操作Hadoop集群上cassandra数据库(包括创建Keyspace对象以及往数据库写入数据)

(1)下载cassandra,我所用版本为apache-cassandra-2.0.13-bin.tar.gzhadoop版本为1.0.1),将其上传到hadoop集群,然后解压,tar -xzf apache-cassandra-2.0.13-bin.tar.gz; 并改名为 cassandra,放在目录/usr/下面,然后修改几个文件:

vim cassandra.yaml  按照下面的字段修改

data_file_directories:

    - /usr/cassandra/data

# commit log

commitlog_directory: /usr/cassandra/commitlog

# saved caches

saved_caches_directory: /usr/cassandra/saved_caches

 # multiple nodes!

    - class_name: org.apache.cassandra.locator.SimpleSeedProvider

      parameters:

          # seeds is actually a comma-delimited list of addresses.

          # Ex: "<ip1>,<ip2>,<ip3>"

          - seeds: "172.16.2.42,172.16.2.34,172.16.2.54,172.16.2.57"//集群上所有主机的ip地址

# Setting this to 0.0.0.0 is always wrong.

listen_address: 172.16.2.42      //对应主机的ip地址;其他主机要改为对应的IP地址

# For security reasons, you should not expose this port to the internet.  Firewall it if needed.

rpc_address: 0.0.0.0  //所有机器都为这个

 

auto_bootstrap: true

 

接着修改文件:

vim log4j-server.properties 

log4j.appender.R.File=/usr/cassandra/system.log

和:

vim cassandra-topology.properties 

172.16.2.42=DC1:RAC2

172.16.2.34=DC1:RAC2

172.16.2.54=DC1:RAC2

172.16.2.57=DC1:RAC2

 

配置好了,然后通过scp的方式拷贝到其他hadoop集群上的所有机器(注意:黄色背景字体处的IP地址必须修改与对应主机一样的IP地址)

 

(2)配置classpath,在hadoop集群上的所有机器都必须进行配置,如下:

 Vim /etc/profile 

 #set cassandra path

export CASSANDRA_HOME=/usr/cassandra

export PATH=${CASSANDRA_HOME}/bin:${PATH}

export CLASSPATH=.:$CASSANDRA_HOME/lib/*.jar:$CLASSPATH

 

(3)启动cassandra:输入下面命令即可

[hadoop@Masterpc ~]$ cassandra

启动成功的话,会出现下面所所示,表明成功!

[hadoop@Masterpc ~]$ jps

 3575 CassandraDaemon


(4)创建map/reduce工程,命名为CassandraPro,需要把cassandralib目录下的jar包拷贝到hadooplib目录下;然后创建class,命名为TestClient,其代码如下:

import java.io.UnsupportedEncodingException;  

import java.nio.ByteBuffer;  

import java.util.List;  

  

import org.apache.cassandra.thrift.Cassandra;  

import org.apache.cassandra.thrift.Column;  

import org.apache.cassandra.thrift.ColumnOrSuperColumn;  

import org.apache.cassandra.thrift.ColumnParent;  

import org.apache.cassandra.thrift.ColumnPath;  

import org.apache.cassandra.thrift.ConsistencyLevel;  

import org.apache.cassandra.thrift.InvalidRequestException;  

import org.apache.cassandra.thrift.NotFoundException;  

import org.apache.cassandra.thrift.SlicePredicate;  

import org.apache.cassandra.thrift.SliceRange;   

import org.apache.thrift.protocol.TBinaryProtocol;

import org.apache.cassandra.thrift.TimedOutException;  

import org.apache.cassandra.thrift.UnavailableException;  

import org.apache.thrift.TException;  

import org.apache.thrift.protocol.TProtocol;  

import org.apache.thrift.transport.TFramedTransport;  

import org.apache.thrift.transport.TSocket;  

import org.apache.thrift.transport.TTransport;  

  

public class TestClient  

{  

    public static void main(String[] args)  

        throws TException, InvalidRequestException,           

        UnavailableException, UnsupportedEncodingException,  

        NotFoundException, TimedOutException  

    {  

        //包装好的socket  

        //TTransport tr = new TFramedTransport(new TSocket("Masterpc.Hadoop",9160)); 

             //Masterpc.Hadoop : 172.16.2.42        

  // 也可以连接到集群上其他的主机,只需要连接主机的cassandra客户端开启了即可          

     TTransport tr = new TFramedTransport(new TSocket("172.16.2.42",9160)); //既可以用IP地址也可以用主机名  

        TProtocol proto = new TBinaryProtocol(tr);  

        Cassandra.Client client = new Cassandra.Client(proto);  

        tr.open();  

          

        if(!tr.isOpen())  

        {  

            System.out.println("failed to connect server!");  

            return;  

        }  

        long temp = System.currentTimeMillis();  

          

        client.set_keyspace("demo1");//使用DEMO keyspace  

        ColumnParent parent = new ColumnParent("student");//column family   

        /* 

         * 这里我们插入10000 条数据到Student 每条数据包括idname 

         */  

        String key_user_id = "a";  

        for(int i = 0;i < 10000;i++)  

        {  

            String k = key_user_id + i//key  

            long timestamp = System.currentTimeMillis();//时间戳  

              

            Column idColumn = new Column(toByteBuffer("id"));//column name  

            idColumn.setValue(toByteBuffer(i + ""));//column value  

            idColumn.setTimestamp(timestamp);  

            client.insert(  

                toByteBuffer(k),   

                parent,   

                idColumn,   

                ConsistencyLevel.ONE);  

              

            Column nameColumn = new Column(toByteBuffer("name"));  

            nameColumn.setValue(toByteBuffer("student" + i));  

            nameColumn.setTimestamp(timestamp);  

            client.insert(  

                toByteBuffer(k),   

                parent,   

                nameColumn,   

                ConsistencyLevel.ONE);  

        }  

      

        /* 

         * 读取某条数据的单个字段 

         */  

        ColumnPath path = new ColumnPath("student");//设置读取Student的数据  

        path.setColumn(toByteBuffer("id")); //读取id    

        String key3 = "a9999";//读取keya1的那条记录  

        System.out.println(toString(client.get(toByteBuffer(key3), path, ConsistencyLevel.ONE).column.value));  

        /* 

         * 读取整条数据 

         */  

        SlicePredicate predicate = new SlicePredicate();  

        SliceRange sliceRange = new SliceRange(toByteBuffer(""), toByteBuffer(""), false, 10);  

        predicate.setSlice_range(sliceRange);  

        List<ColumnOrSuperColumn> results =   

            client.get_slice(toByteBuffer(key3), parentpredicate, ConsistencyLevel.ONE);  

          

        for (ColumnOrSuperColumn result : results)  

        {  

            Column column = result.column;  

            System.out.println(toString(column.name) + " -> " + toString(column.value));  

        }  

        long temp2 = System.currentTimeMillis();  

        System.out.println("time: " + (temp2 - temp) + " ms");//输出耗费时间  

          

        tr.close();  

    }   

    /* 

     * String转换为bytebuffer,以便插入cassandra 

     */  

    public static ByteBuffer toByteBuffer(String value)   

        throws UnsupportedEncodingException  

    {  

        return ByteBuffer.wrap(value.getBytes("UTF-8"));  

    }  

    /* 

     * bytebuffer转换为String 

     */  

    public static String toString(ByteBuffer buffer)   

        throws UnsupportedEncodingException  

    {  

        byte[] bytes = new byte[buffer.remaining()];  

        buffer.get(bytes);  

        return new String(bytes"UTF-8");  

    }  

}  

(5)启动需要连接主机(如:172.16.2.42)的cassandra数据库客户端

[hadoop@Masterpc ~]$ cassandra

启动成功的话,会出现下面所所示,表明成功!

[hadoop@Masterpc ~]$ jps

 3575 CassandraDaemon

 

 (6)通过节点的ip地址和端口连接到一个多节点群集中的任意一个节点:

cassandra-cli -host 172.16.2.42 -port 9160

 

然后创建一个 Keyspace对象

create keyspace demo1

with placement_strategy = ‘org.apache.cassandra.locator.SimpleStrategy‘

and strategy_options = [{replication_factor:1}];

 

接着创建一个列族,列族包括两个列:id  和 name

[default@unknown] use demo1;

[default@demo1] CREATE COLUMN FAMILY student

WITH comparator = UTF8Type

AND key_validation_class=UTF8Type

AND column_metadata = [

{column_name: id, validation_class:IntegerType}

{column_name: name, validation_class:UTF8Type}

];

 

(7)然后运行程序(注意:所连接的主机的cassandra客户端必须开启了),结果如下:

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/E:/HadoopWorkPlat/hadoop/lib/slf4j-log4j12-1.7.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/E:/HadoopWorkPlat/hadoop/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/E:/HadoopWorkPlat/hadoop/lib/slf4j-log4j12-1.4.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/C:/Users/wd/Desktop/hadoop/apache-cassandra-2.0.13-bin/apache-cassandra-2.0.13/lib/slf4j-log4j12-1.7.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

9999

id -> 9999

name -> student9999

time: 36163 ms

 

(8)查看cassandra数据库:

[default@demo1] list student;

Using default limit of 100

Using default cell limit of 100

-------------------

RowKey: a4

=> (name=id, value=52, timestamp=1427518050680)

=> (name=name, value=student4, timestamp=1427518050680)

-------------------

RowKey: a12

=> (name=id, value=12594, timestamp=1427518050715)

=> (name=name, value=student12, timestamp=1427518050715)

-------------------

RowKey: a54

=> (name=id, value=13620, timestamp=1427518050921)

=> (name=name, value=student54, timestamp=1427518050921)

-------------------

。。。。。。(省)


郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。