diff --git a/README.md b/README.md index 14acb50..d004a4e 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ These are the available config options for making requests. Only the url is requ timeout: 1000, // default is `15000` milliseconds // `responseType` indicates the type of data that the server will respond with - // options are: 'json', 'text', 'buffer' + // options are: 'json', 'text', 'buffer','stream' responseType: 'json', // default // `agent` define a custom agent to be used when performing http or https requests, diff --git a/lib/request.js b/lib/request.js index 6d108f6..78b0a4e 100644 --- a/lib/request.js +++ b/lib/request.js @@ -2,6 +2,7 @@ const http = require('http'); const https = require('https'); const { URL } = require('url'); const qs = require('querystring'); +const pump = require('pump'); const Response = require('./response'); const createError = require('./createError'); const enhanceError = require('./enhanceError'); @@ -41,12 +42,20 @@ async function request(options = {}) { options.headers || {} ); const contentType = headers['Content-Type']; + const isStream = + data && + data.readable !== false && + typeof data === 'object' && + typeof data.pipe === 'function' && + typeof data._read === 'function' && + typeof data._readableState === 'object' + if ( Object.prototype.toString.call(data) === '[object Object]' && contentType.startsWith('application/x-www-form-urlencoded') ) { data = qs.stringify(data); - } else if (Object.prototype.toString.call(data) !== '[object String]') { + } else if (Object.prototype.toString.call(data) !== '[object String]' && !isStream) { data = JSON.stringify(data); } @@ -67,12 +76,20 @@ async function request(options = {}) { const resHandler = (res) => { const statusCode = res.statusCode; + let response = new Response({ headers: res.headers, statusCode, statusMessage: res.statusMessage, }); + // 如果 responseType 为 stream 直接返回 + if (options.responseType === 'stream') { + response.data = res; + resolve(response); + return; + } + res.on('data', (chunk) => { response.addChunk(chunk); }); @@ -91,6 +108,13 @@ async function request(options = {}) { }); }; + const errHandler = (err) => { + if (req.aborted) { + return; + } + reject(enhanceError(err, options, null, req)); + } + let req; if (protocol === 'http:') { req = http.request(requestOptions, resHandler); @@ -101,24 +125,27 @@ async function request(options = {}) { } // Handle errors - req.on('error', (err) => { - if (req.aborted) { - return; - } - reject(enhanceError(err, options, null, req)); - }); + req.on('error', errHandler); // Handle request timeout if (timeout) { req.setTimeout(timeout, () => { req.abort(); + + if (isStream) { + data.destroy(); + } + reject(createError('timeout of ' + timeout + 'ms exceeded', options, 'ECONNABORTED', req)); }); } - if (data && ['GET', 'HEAD'].indexOf(method) === -1) { + if (isStream) { + return pump(data, req, (err) => err && errHandler(err)); + } else if (data && ['GET', 'HEAD'].indexOf(method) === -1) { req.write(data); } + req.end(); }); } diff --git a/package.json b/package.json index 9336f1e..b6175e9 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ }, "ava": { "files": [ - "test/**/*.js" + "test/**/*.test.js" ] }, "repository": { @@ -30,6 +30,7 @@ }, "homepage": "https://github.com/node-labx/lightning-request", "dependencies": { + "pump": "^3.0.0", "sync-rpc": "^1.3.6" } } diff --git a/test/file/tmp.txt b/test/file/tmp.txt new file mode 100644 index 0000000..c317603 --- /dev/null +++ b/test/file/tmp.txt @@ -0,0 +1 @@ +this is a text file for stream test \ No newline at end of file diff --git a/test/request-with-stream.test.js b/test/request-with-stream.test.js new file mode 100644 index 0000000..60df61b --- /dev/null +++ b/test/request-with-stream.test.js @@ -0,0 +1,64 @@ +const test = require('ava'); +const fs = require('fs'); +const path = require('path'); +const request = require('../index'); +const httpServer = require('./server/index'); + +let port; +let server; + +test.before('run server', async () => { + try { + const result = await httpServer(); + port = result.port; + server = result.server; + } catch (err) { + console.log('test before hook error: ', err); + } +}); + +test.after('close server', function() { + server && server.close(); +}); + +test('put request with stream', async (t) => { + const tmp = path.join(process.cwd(), 'test/file/tmp.txt'); + const stream = fs.createReadStream(tmp); + + try { + const result = await request({ + url: 'http://localhost:' + port + '/stream', + data: stream, + method: 'PUT', + }); + + t.is(result.data, fs.readFileSync(tmp, { encoding: 'utf-8'})); + } catch (err) { + console.log(err); + t.fail(err); + } +}); + +test.cb('get request with stream responseType', (t) => { + t.plan(1); + + request({ + url: 'http://localhost:' + port + '/stream', + responseType: 'stream', + }).then((result) => { + const chunks = []; + + result.data.on('data', (buf) => chunks.push(buf)); + result.data.on('end', () => { + const datas = Buffer.concat(chunks).toString(); + const tmp = path.join(process.cwd(), 'test/file/tmp.txt'); + + t.is(datas, fs.readFileSync(tmp, { encoding: 'utf-8'})); + t.end(); + }) + }).catch((err) => { + console.log(err); + t.fail(err); + t.end(); + }); +}) diff --git a/test/server/index.js b/test/server/index.js new file mode 100644 index 0000000..85f755c --- /dev/null +++ b/test/server/index.js @@ -0,0 +1,47 @@ +const http = require('http'); +const fs = require('fs'); +const path = require('path'); + +module.exports = () => { + return new Promise((resolve, reject) => { + const server = http.createServer(function(req, res) { + const chunks = []; + + req.on('data', (buf) => { + chunks.push(buf); + }); + + req.on('end', () => { + const content = Buffer.concat(chunks).toString(); + + if (req.url === '/stream') { + if (req.method === 'PUT') { + res.end(content); + } else if (req.method === 'GET') { + const tmp = path.join(process.cwd(), 'test/file/tmp.txt'); + res.end(fs.readFileSync(tmp, { encoding: 'utf-8' })) + } + + return; + } + + res.end('test server') + }); + }); + + server.on('error', reject); + server.on('close', () => { + // console.log('test server closeed'); + }) + + server.listen(function() { + const address = server.address(); + + // console.log('test server listen at ', address); + resolve({ + port: address.port, + server, + }); + }); + }) +}; \ No newline at end of file