В нашей таблице HBase каждая строка имеет столбец с именем идентификатор обхода. Используя задание MapReduce, мы хотим обрабатывать в любой момент только строки из данного обхода. Чтобы выполнить задание более эффективно, мы снабдили наш объект сканирования фильтром, который (как мы надеялись) удалит все строки, кроме строк с заданным идентификатором обхода. Однако мы быстро обнаружили, что наши задания не обрабатывали правильное количество строк.
Я написал тестовый сопоставитель, чтобы просто подсчитать количество строк с правильным идентификатором обхода без каких-либо фильтров. Он перебрал все строки в таблице и подсчитал правильное ожидаемое количество строк (~ 15000). Когда мы взяли ту же работу, добавили фильтр к объекту сканирования, счет упал до ~ 3000. Не было никаких манипуляций с самой таблицей во время или между этими двумя заданиями.
Поскольку добавление фильтра сканирования привело к столь резкому изменению видимых строк, мы ожидаем, что просто неправильно построили фильтр.
В нашем задании MapReduce есть один картограф:
public static class RowCountMapper extends TableMapper<ImmutableBytesWritable, Put>{
public String crawlIdentifier;
// counters
private static enum CountRows {
ROWS_WITH_MATCHED_CRAWL_IDENTIFIER
}
@Override
public void setup(Context context){
Configuration configuration=context.getConfiguration();
crawlIdentifier=configuration.get(ConfigPropertyLib.CRAWL_IDENTIFIER_PROPERTY);
}
@Override
public void map(ImmutableBytesWritable legacykey, Result row, Context context){
String rowIdentifier=HBaseSchema.getValueFromRow(row, HBaseSchema.CRAWL_IDENTIFIER_COLUMN);
if (StringUtils.equals(crawlIdentifier, rowIdentifier)){
context.getCounter(CountRows.ROWS_WITH_MATCHED_CRAWL_IDENTIFIER).increment(1l);
}
}
}
Настройка фильтра такая:
String crawlIdentifier=configuration.get(ConfigPropertyLib.CRAWL_IDENTIFIER_PROPERTY);
if (StringUtils.isBlank(crawlIdentifier)){
throw new IllegalArgumentException("Crawl Identifier not set.");
}
// build an HBase scanner
Scan scan=new Scan();
SingleColumnValueFilter filter=new SingleColumnValueFilter(HBaseSchema.CRAWL_IDENTIFIER_COLUMN.getFamily(),
HBaseSchema.CRAWL_IDENTIFIER_COLUMN.getQualifier(),
CompareOp.EQUAL,
Bytes.toBytes(crawlIdentifier));
filter.setFilterIfMissing(true);
scan.setFilter(filter);
Мы используем неправильный фильтр или неправильно его настроили?
РЕДАКТИРОВАТЬ: мы рассматриваем ручное добавление всех семейств столбцов в соответствии с https://issues.apache.org/jira/browse/HBASE-2198, но я уверен, что сканирование включает все семейства по умолчанию.