mybatis实现mysql流式查询的原理 背景 如果使用mybatis查询mysql,查询结果集非常大时,可能发生oom,我们可以使用mysql的流式查询,按需逐行查询每条数据,实现低内存占用。 下面分析其原理。
jdbc驱动如何支持流式查询的 jdbc驱动的基本原理 jdbc驱动与mysql之间建立tcp连接,将sql语句发送到mysql服务端后,并从mysql服务端读取数据,并封装成resultSet的形式返回。
resultSet 我们使用resultSet时,调用resultSet的next方法是否存在下一行数据,resultSet内部包装了ResultsetRows对象,他将请求委托给ResultsetRows对象来处理。resultSet本身只是一个包装类,本质还是被委托到ResultsetRows对象。
resultRows ResultsetRows 有两个实现类,ResultsetRowsStreaming和ResultsetRowsStatic。 ResultsetRowsStreaming发送完请求后,不会读取响应数据,他会等到调用next方法时才会从mysql读取。 ResultsetRowsStatic会使用while循环读取所有的数据存储到list中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 @Override public Resultset read (int maxRows, boolean streamResults, NativePacketPayload resultPacket, ColumnDefinition metadata, ProtocolEntityFactory<Resultset, NativePacketPayload> resultSetFactory) throws IOException { Resultset rs = null ; long columnCount = resultPacket.readInteger(IntegerDataType.INT_LENENC); if (columnCount > 0 ) { ColumnDefinition cdef = this, new ColumnDefinitionFactory (columnCount, metadata)); if (!this .protocol.getServerSession().isEOFDeprecated()) { this .protocol.skipPacket(); } ResultsetRows rows = null ; if (!streamResults) { TextRowFactory trf = new TextRowFactory (this .protocol, cdef, resultSetFactory.getResultSetConcurrency(), false ); ArrayList<ResultsetRow> rowList = new ArrayList <>(); ResultsetRow row = this, trf); while (row != null ) { if ((maxRows == -1 ) || (rowList.size() < maxRows)) { rowList.add(row); } row = this, trf); } rows = new ResultsetRowsStatic (rowList, cdef); } else { rows = new ResultsetRowsStreaming <>(this .protocol, cdef, false , resultSetFactory); this .protocol.setStreamingData(rows); } rs = resultSetFactory.createFromProtocolEntity(rows); } else { if (columnCount == NativePacketPayload.NULL_LENGTH) { String charEncoding = this .protocol.getPropertySet().getStringProperty(PropertyKey.characterEncoding).getValue(); String fileName = resultPacket.readString(StringSelfDataType.STRING_TERM, this .protocol.doesPlatformDbCharsetMatches() ? charEncoding : null ); resultPacket = this .protocol.sendFileToServer(fileName); } OkPacket ok = this .protocol.readServerStatusForResultSets(resultPacket, false ); rs = resultSetFactory.createFromProtocolEntity(ok); } return rs; }
其中有一个重要的判断参数是fetchSize == Integer.MIN_VALUE。 也就是说我们要设置statement的fetchSize为Integer.MIN_VALUE才能启用流式查询。
1 2 3 4 protected boolean createStreamingResultSet () { return ((this .query.getResultType() == Type.FORWARD_ONLY) && (this .resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY) && (this .query.getResultFetchSize() == Integer.MIN_VALUE)); }
mybatis如何处理流式查询的 mybatis查询mysql后会获取到resultSet对象,其会对resultSet中的数据进行逐行next()处理。 如果resultHandler为null,则创建一个默认的resultHandler,否则使用我们提供的resultHandler。 默认的resultHandler源码如下,他会将结果存储到list中作为结果返回,如果使用我们提供的resultHandler,则返回的是空集合。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private void handleResultSet (ResultSetWrapper rsw, ResultMap resultMap, List<Object> multipleResults, ResultMapping parentMapping) throws SQLException { try { if (parentMapping != null ) { handleRowValues(rsw, resultMap, null , RowBounds.DEFAULT, parentMapping); } else { if (resultHandler == null ) { DefaultResultHandler defaultResultHandler = new DefaultResultHandler (objectFactory); handleRowValues(rsw, resultMap, defaultResultHandler, rowBounds, null ); multipleResults.add(defaultResultHandler.getResultList()); } else { handleRowValues(rsw, resultMap, resultHandler, rowBounds, null ); } } } finally { closeResultSet(rsw.getResultSet()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class DefaultResultHandler implements ResultHandler <Object> { private final List<Object> list; public DefaultResultHandler () { list = new ArrayList <>(); } @SuppressWarnings("unchecked") public DefaultResultHandler (ObjectFactory objectFactory) { list = objectFactory.create(List.class); } @Override public void handleResult (ResultContext<?> context) { list.add(context.getResultObject()); } public List<Object> getResultList () { return list; } }
1 2 3 4 5 6 7 8 9 10 11 private void handleRowValuesForSimpleResultMap (ResultSetWrapper rsw, ResultMap resultMap, ResultHandler<?> resultHandler, RowBounds rowBounds, ResultMapping parentMapping) throws SQLException { DefaultResultContext<Object> resultContext = new DefaultResultContext <>(); ResultSet resultSet = rsw.getResultSet(); skipRows(resultSet, rowBounds); while (shouldProcessMoreRows(resultContext, rowBounds) && !resultSet.isClosed() && { ResultMap discriminatedResultMap = resolveDiscriminatedResultMap(resultSet, resultMap, null ); Object rowValue = getRowValue(rsw, discriminatedResultMap, null ); storeObject(resultHandler, resultContext, rowValue, parentMapping, resultSet); } }
1 2 3 4 5 interface CustomSettleInChannelRecordMapper { void query (@Param("request") SettleRequest request, ResultHandler<SettleRecord> handler) ; }
xml文件应该这样写,一定要写 fetchSize=”-2147483648”,不然到达jdbc那里返回的并不是流式的结果集。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 <select id ="query" fetchSize ="-2147483648" resultMap ="bastResultMap" > select id,currency,pay_amount,status,payment_status from settle_record <where > <if test ="request.reconCompleteTimeEnd != null" > and txn_complete_time < = #{request.reconCompleteTimeEnd} </if > <if test ="request.bizType != null" > and biz_type = #{request.bizType} </if > </where > </select >
总结 mybatis本身支持在mapper里面提供resultHandler用来流式处理每个结果。其实它内部本身就是创建一个默认的resultHandler将结果汇集后返回给我们的。
mybatis本身属于框架上层服务,他只负责从jdbc出取得resultSet,将resultSet每一行读取出来包装成流式的形式。 而resultSet本身是不是流式的,mybatis无法决定。 resultSet是真流式还是伪流式,需要判断一个重要的参数fetchSize, 要想实现真流式一定要将fetchSize设置为Integer.MIN_VALUE 底层才是真流式,否则只是mybatis包装的伪流式而已。