Errors when storing images and other unstructured data in HBase


#1

Below is the code that stores unstructured data through the Java API:
package aa.aa;

import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.generated.HyperbaseProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hyperbase.client.HyperbaseAdmin;
import org.apache.hadoop.hyperbase.metadata.HyperbaseMetadata;
import org.apache.hadoop.hyperbase.secondaryindex.IndexedColumn;
import org.apache.hadoop.hyperbase.secondaryindex.LOBIndex;

public class TestLOB4_3 {
  protected static HyperbaseAdmin admin = null;
  protected static Configuration conf = null;

  static {
    conf = HBaseConfiguration.create();
    // Change these to match the configuration of your cluster.
    conf.set("hbase.zookeeper.quorum", "idc248-118,idc248-119,idc248-120");
    conf.set("zookeeper.znode.parent", "/hyperbase1");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    //conf.set("hbase.client.keyvalue.maxsize", "102400000");
    try {
      admin = new HyperbaseAdmin(conf);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  public static void main(String[] args) throws Exception {
    testLOBGet();
  }

  public static void testLOBGet() throws Exception {
    byte[] row = Bytes.toBytes("rowkey01");
    byte[] tableName = Bytes
        .toBytes("SIMPLE_TEST_PUT_SCAN11" + System.nanoTime());
    byte[] indexName = Bytes.toBytes("IDX");
    byte[] family11 = Bytes.toBytes("f1");
    byte[] family22 = Bytes.toBytes("f2");
    String path = "C:/images/beauty.jpg";
    createTable(TableName.valueOf(tableName), family11, family22);
    addLOB(tableName, family11, indexName);
    byte[] value01 = getFileBytes(path);

    HTable htable = new HTable(conf, tableName);

    Put put = new Put(row);
    put.add(family11, Bytes.toBytes("q1"), value01);
    htable.put(put);
    htable.flushCommits();
    TimeUnit.SECONDS.sleep(1);

    Get get = new Get(row);
    Result rs = htable.get(get);
    CellScanner cs = rs.cellScanner();
    while(cs.advance()){
      assertTrue(Bytes.equals(CellUtil.cloneValue(cs.current()), value01));
      System.out.println(Bytes.toString(CellUtil.cloneValue(cs.current())));
    }
    htable.close();
    admin.deleteTable(TableName.valueOf(tableName));
  }

  public static byte[] getFileBytes(String path) throws IOException {
    File file = new File(path);
    byte[] b = new byte[(int) file.length()]; // size the buffer from the file length
    FileInputStream fis = new FileInputStream(file);
    int off = 0, n;
    // keep reading until the whole file has been copied into the buffer;
    // a single read() is not guaranteed to fill it
    while (off < b.length && (n = fis.read(b, off, b.length - off)) >= 0) {
      off += n;
    }
    fis.close();
    return b;
  }

  public static void createTable(TableName tableName, byte[]... families) throws Exception {
    HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
    for (byte[] family : families) {
      tableDescriptor.addFamily(new HColumnDescriptor(family));
    }
    // Note: for the object store, regions must be pre-split; each region should ideally hold no more than 500 GB of data.
    byte[][] splitKeys = new byte[10][];
    for (int i = 0; i < 10; i++) {
      splitKeys[i] =  Bytes.toBytes("rowkey" + i);
    }
    // create the table
    admin.createTable(tableDescriptor, null, splitKeys);
    HyperbaseMetadata metadata = admin.getTableMetadata(tableName);
    assertTrue(metadata != null);
    // check metadata
    assertTrue(metadata.getFulltextMetadata() == null);
    assertTrue(metadata.getGlobalIndexes().isEmpty());
    assertTrue(metadata.getLocalIndexes().isEmpty());
    assertTrue(metadata.getLobs().isEmpty());
    assertTrue(metadata.isTransactionTable() == false);
  }

  public static void addLOB(byte[] tableName, byte[] family, byte[] LOBFamily) throws
      IOException {
    HyperbaseProtos.SecondaryIndex.Builder LOBBuilder = HyperbaseProtos.SecondaryIndex
        .newBuilder();
    LOBBuilder.setClassName(LOBIndex.class.getName());
    LOBBuilder.setUpdate(true);
    LOBBuilder.setDcop(true);
    IndexedColumn column = new IndexedColumn(family, Bytes.toBytes("q1"));
    LOBBuilder.addColumns(column.toPb());
    admin.addLob(TableName.valueOf(tableName), new LOBIndex(LOBBuilder.build()), LOBFamily, false, 1);
  }

}

The code above runs without errors, but the following exception shows up in /var/log/hyperbase1/hbase-hbase-regionserver-idc248-118.log:
2018-06-12 12:03:17,816 WARN org.apache.hadoop.ipc.RpcServer: RpcServer.listener,port=60020: count of bytes read: 0
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at org.apache.hadoop.hbase.ipc.RpcServer.channelRead(RpcServer.java:2246)
at org.apache.hadoop.hbase.ipc.RpcServer$Connection.readAndProcess(RpcServer.java:1423)
at org.apache.hadoop.hbase.ipc.RpcServer$Listener.doRead(RpcServer.java:798)
at org.apache.hadoop.hbase.ipc.RpcServer$Listener$Reader.doRunLoop(RpcServer.java:589)
at org.apache.hadoop.hbase.ipc.RpcServer$Listener$Reader.run(RpcServer.java:564)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Could an expert help analyze where the cause lies?


#2

https://issues.apache.org/jira/browse/HBASE-11494
See https://issues.apache.org/jira/browse/HBASE-11494 above; this should not be a problem. If you want to be sure, read the image data back from HBase and compare it with the original.
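
For example, a rough sketch of such a round-trip check (my own illustration, not part of the original code: the verifyStoredImage name is hypothetical, and it assumes the method is added to the TestLOB4_3 class above so that conf and getFileBytes are in scope, plus a java.io.FileOutputStream import). It reads the cell back, compares it byte-for-byte with the file on disk, and writes the fetched bytes out so the image can be opened and inspected:

  public static void verifyStoredImage(byte[] tableName, String originalPath) throws Exception {
    HTable htable = new HTable(conf, tableName);
    Get get = new Get(Bytes.toBytes("rowkey01"));
    get.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
    Result rs = htable.get(get);
    // the bytes exactly as HBase stored them
    byte[] stored = rs.getValue(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
    byte[] original = getFileBytes(originalPath);
    System.out.println("image round-trips intact: " + Bytes.equals(stored, original));
    // write the fetched bytes to disk so the picture can be opened and checked visually
    FileOutputStream fos = new FileOutputStream(originalPath + ".from_hbase.jpg");
    fos.write(stored);
    fos.close();
    htable.close();
  }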


#3

Thanks for the helpful reply. The current problem is that I cannot see the HBase table created for storing the unstructured data; I checked with the list command in the hbase shell. What is also puzzling is that the code above did successfully create the table at one point, but when I ran the exact same code again later (without changing anything), it never worked again: the created table does not show up.


#5

If it is convenient, we suggest clicking the online consultation link on the right.


#6

The code above contains
admin.deleteTable(TableName.valueOf(tableName));
so the table is probably deleted as soon as the call finishes.
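
If you want the table to stay visible in list, a minimal change at the end of testLOBGet() is to guard or drop that last delete, for instance (a sketch only; the tableExists call assumes HyperbaseAdmin inherits it from the standard HBaseAdmin API):

    // keep the table around for inspection instead of deleting it right away
    boolean keepTableForInspection = true; // flip to false to restore the original cleanup
    if (!keepTableForInspection) {
      admin.deleteTable(TableName.valueOf(tableName));
    }
    // optional sanity check (assumes tableExists is inherited from HBaseAdmin)
    System.out.println("table still exists: " + admin.tableExists(TableName.valueOf(tableName)));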


#7

Thanks, that was exactly the problem; it is now resolved.

