前言
針對數據庫的查詢操作,使用批量方式自然是要快速不少,本文則介紹關於批量查詢的API實現。本文共實現兩類API,一類是串行的分組并發,一類是并行的分組并發。
- 利用Lists.partition分組
- CompletableFuture.supplyAsync多線程并發
串行的分組并發
/**
 * Queries entities whose {@code key} attribute matches any of the given values, running the
 * partitioned lookups one after another on the session returned by {@code currentSession()}.
 *
 * A null {@code key} is rejected through {@code Objects.requireNonNull}. A null or empty
 * {@code value} array logs a warning and yields {@code Collections.emptyList()}. Otherwise the
 * values are split with {@code Lists.partition} into chunks of size {@code SPLIT}, every chunk
 * is handed to {@code queryByKeyValues}, and the rows are accumulated into a single list.
 * Before returning, the row count, a rows-per-second figure and the elapsed milliseconds are
 * logged at debug level.
 *
 * NOTE(review): the snippet is incomplete as published. The text between {@code value.length}
 * and {@code MAX_PARTITION_DATA_COUNT} is missing (the condition of that {@code if}, its
 * branch, and the declaration of {@code SPLIT}); it looks as if everything between a '<' and
 * a '>' was stripped as markup, which would also explain the missing generic type arguments.
 * The string literals use typographic quotes, the subtraction uses an en dash and the varargs
 * marker is a single ellipsis character, so the line does not compile as shown. Restore it
 * from the original source before use.
 *
 * NOTE(review): {@code result.size() * 1000 / cost} is an integer division by {@code cost},
 * which is 0 whenever the call finishes within the same millisecond; that would throw
 * ArithmeticException even though the query itself succeeded. Confirm and guard.
 *
 * @param key   attribute name to filter on; must not be null
 * @param value candidate values for the attribute; null or empty returns an empty list
 * @return all rows collected from the per-chunk queries
 */
public List queryInCondition(String key, Object… value) { long current = System.currentTimeMillis(); Objects.requireNonNull(key, “Filter key cant be null.”); if (value == null || value.length == 0) { LOG.warn(“Return empty data, cause query by key:{},but value is empty”, key); return Collections.emptyList(); } List result = Lists.newArrayListWithCapacity(value.length); if (value.length MAX_PARTITION_DATA_COUNT ? MAX_PARTITION_DATA_COUNT : value.length / MIN_PARTITION; List valuePartition = Lists.partition(Lists.newArrayList(value), SPLIT); for (List values : valuePartition) { result.addAll(queryByKeyValues(currentSession(), key, values)); } } final long cost = System.currentTimeMillis() – current; LOG.debug(“QIC with key:{},Data Count:{}, Speed:{}/s, Time Cost:{} ms”, key, result.size(), result.size() * 1000 / cost, cost); return result; }
并行的分組并發
/**
 * Variant of the batched key/value lookup in which each group of values is queried inside a
 * lambda that opens its own session.
 *
 * A null {@code key} is rejected through {@code Objects.requireNonNull}. A null or empty
 * {@code value} array logs a warning and yields {@code Collections.emptyList()}. The visible
 * lambda body starts from an empty list, opens a session with
 * {@code sessionFactory.openSession()}, runs {@code queryByKeyValues} for its group of values,
 * and closes the session in a {@code finally} block if it is still open. Before returning, the
 * row count, a rows-per-second figure and the elapsed milliseconds are logged at debug level.
 *
 * NOTE(review): the snippet is incomplete as published. The text between the second
 * {@code value.length} and the lambda body is missing: the condition and branch of that
 * {@code if}, and the call that receives the lambda and assigns {@code result} (presumably
 * ConcurrencyUtil.groupInvoke with some executor; confirm against the original source).
 * It looks as if everything between a '<' and a '>' was stripped as markup. The string
 * literals use typographic quotes, the subtraction uses an en dash and the varargs marker is
 * a single ellipsis character, so the line does not compile as shown.
 *
 * NOTE(review): {@code result.size() * 1000 / cost} is an integer division by {@code cost},
 * which is 0 whenever the call finishes within the same millisecond; that would throw
 * ArithmeticException. Confirm and guard.
 *
 * @param key   attribute name to filter on; must not be null
 * @param value candidate values for the attribute; null or empty returns an empty list
 * @return the rows held in {@code result} (its assignment is in the missing text)
 */
public List queryInConditionHighSpeed(String key, Object… value) { long current = System.currentTimeMillis(); Objects.requireNonNull(key, “Filter key cant be null.”); if (value == null || value.length == 0) { LOG.warn(“Return empty data, cause query by key:{},but value is empty”, key); return Collections.emptyList(); } if (value.length { List resultList = Collections.emptyList(); Session session = null; try { session = sessionFactory.openSession(); resultList = queryByKeyValues(session, key, values); } finally { if (session != null && session.isOpen()) session.close(); } return resultList; }); final long cost = System.currentTimeMillis() – current; LOG.debug(“QICH with key:{},Data Count:{}, Speed:{}, Time Cost:{} ms”, key, result.size(), result.size() * 1000 / cost, cost); return result; }
輔助hibernate過濾查詢方法
private List queryByKeyValues(Session session, String key, List values) { CriteriaBuilder criteriaBuilder = session.getCriteriaBuilder(); CriteriaQuery query = criteriaBuilder.createQuery(getEntityClass()); Root root = query.from(getEntityClass()); query.select(root); CriteriaBuilder.In in = criteriaBuilder.in(root.get(key)); values.forEach(in::value); query.where(in); final Query queryExe = session.createQuery(query); LOG.trace(“Hibernate execute SQL:{}”, queryExe.getQueryString()); List resultList = queryExe.getResultList(); return resultList; }
ConcurrencyUtil輔助類
自定義并發輔助類
import com.google.common.collect.Lists;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.List;import java.util.Objects;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.function.Function;import static java.util.stream.Collectors.toList;public class ConcurrencyUtil { private static final Logger LOG = LoggerFactory.getLogger(ConcurrencyUtil.class); private static final int MAX_PARTITION_SIZE = 2000; /** * 提供大數(shù)據(jù)進(jìn)行分組并發(fā)處理能力 * * @param executorService 并發(fā)執(zhí)行線程池 * @param data 待處理數(shù)據(jù) * @param function 針對(duì)分組后的每組數(shù)據(jù)的處理邏輯 * @param 數(shù)據(jù)類型 * @param 返回?cái)?shù)據(jù)類型 * @return */ public static List groupInvoke(ExecutorService executorService, List data, Function function) { int threadCount = getPoolSize(executorService); int i = (data.size() + threadCount) / threadCount; List partition = Lists.partition(data, i > MAX_PARTITION_SIZE ? 
MAX_PARTITION_SIZE : i); final List<CompletableFuture> futures = partition.stream().map(p -> CompletableFuture.supplyAsync(() -> function.apply(p) , executorService)).collect(toList()); List result = futures.stream().map(p -> { try { return p.get(5, TimeUnit.MINUTES); } catch (Exception e) { LOG.error(“Concurrency groupInvoke error.”, e); } return null; }).filter(Objects::nonNull).flatMap(List::stream).filter(Objects::nonNull).collect(toList()); return result; } public static List invoke(ExecutorService executorService, List data, Function function) { List futures = Lists.newArrayListWithCapacity(data.size()); for (T datum : data) { final Future future = executorService.submit(() -> function.apply(datum)); futures.add(future); } List result = Lists.newArrayListWithCapacity(data.size()); for (Future future : futures) { try { result.add(future.get(5, TimeUnit.MINUTES)); } catch (Exception e) { LOG.error(“Concurrency invoke error.”, e); } } return result.stream().filter(Objects::nonNull).collect(toList()); } private static int getPoolSize(ExecutorService executorService) { int threadCount = 10; if (executorService instanceof ThreadPoolExecutor) { threadCount = ((ThreadPoolExecutor) executorService).getCorePoolSize(); } return threadCount <= 0 ? 10 : threadCount; }}