spenibus.net
2015-07-22 13:58:01 GMT

Using xmlHttpRequest on a pseudo http stream

This is a post based on my answer to a Stack Overflow question (minus code): >[**Memory efficient message chunk processing using a XMLHttpRequest**](http://stackoverflow.com/q/31434575/3512867) >I have a XMLHttpRequest with a progress event handler that is requesting a chunked page which continuously sends adds message chunks. If I do not set a responseType, I can access the response property of the XMLHttpRequest in each progress event and handle the additional message chunk. The problem of this approach is that the browser must keep the entire response in memory, and eventually, the browser will crash due to this memory waste. >So, I tried a responseType of arraybuffer in the hope that I can slice the buffer to prevent the previous excessive memory waste. Unfortunately, the progress event handler is no longer capable of reading the response property of the XMLHttpRequest at this point. The event parameter of the progress event does not contain the buffer, either. Here is a short, self-contained example of my attempt at this (this is written for node.js): >`...some javascript code...` >The progress event handler has no access to the response I wanted. How can I handle the message chunks in the browser in a memory-efficient way? Please do not suggest a WebSocket. I do not wish to use one just to process a read-only stream of message chunks. My answer -- `XMLHttpRequest` doesn't seem really designed for this kind of usage. The obvious solution is polling, which is a popular use of `XMLHttpRequest` but I'm guessing you don't want to miss data from your stream that would slip between the calls. To my question [`Can the "real" data chunks be identified in some way or is it basically random data ?`](http://stackoverflow.com/questions/31434575/memory-efficient-message-chunk-processing-using-a-xmlhttprequest?noredirect=1#comment50976111_31434575), you answered [`With some effort, the chunks could be identified by adding an event-id of sorts to the server-side`](http://stackoverflow.com/questions/31434575/memory-efficient-message-chunk-processing-using-a-xmlhttprequest?noredirect=1#comment50982951_31434575) Based on this premise, I propose: The idea: cooperating concurrent listeners -- 1. Connect to the stream and set up the progress listener (referred to as `listenerA()`). 2. When a chunk arrives, process it and output it. Keep a reference to the ids of both the first and last chunk received by `listenerA()`. Count how many chunks `listenerA()` has received. 3. After `listenerA()` has received a certain amount of chunks, spawn another "thread" (connection + listener, `listenerB()`) doing the steps 1 and 2 in parallel to the first one but keep the processed data in a buffer instead of outputting it. 4. When `listenerA()` receives the chunk with the same id as the first chunk received by `listenerB()`, send a signal to `listenerB()`, drop the first connection and kill `listenerA()`. 5. When `listenerB()` receives the termination signal from the `listenerA()`, dump the buffer to the output and keep processing normally. 6. Have `listenerB()` spawn `listenerC()` on the same conditions as before. 7. Keep repeating with as many connections + listeners as necessary. By using two overlapping connections, you can prevent the possible loss of chunks that would result from dropping a single connection and then reconnecting. Notes -- * This assumes the data stream is the same for all connections and doesn't introduce some individualized settings. * Depending on the output rate of the stream and the connection delay, the buffer dump during the transition from one connection to another might be noticeable. * You could also measure the total response size rather than the chunks count to decide when to switch to a new connection. * It might be necessary to keep a complete list of chunks ids to compare against rather than just the first and last one because we can't guarantee the timing of the overlap. * The `responseType` of `XMLHttpRequest` must be set to its default value of `""` or "text", to return text. Other datatypes will not return a partial `response`. See <https://xhr.spec.whatwg.org/#the-response-attribute> Test server in node.js -- The following code is a node.js server that outputs a consistent stream of elements for testing purposes. You can open multiple connections to it, the output will be the same accross sessions, minus possible server lag. >http://localhost:5500/stream will return data where id is an incremented number >http://localhost:5500/streamRandom will return data where id is a random 40 characters long string. This is meant to test a scenario where the id can not be relied upon for ordering the data. var crypto = require('crypto'); // init + update nodeId var nodeId = 0; var nodeIdRand = '0000000000000000000000000000000000000000'; setInterval(function() { // regular id ++nodeId; //random id nodeIdRand = crypto.createHash('sha1').update(nodeId.toString()).digest('hex'); }, 1000); // create server (port 5500) var http = require('http'); http.createServer(function(req, res) { if(req.url === '/stream') { return serverStream(res); } else if(req.url === '/streamRandom') { return serverStream(res, true); } }).listen(5500); // serve nodeId function serverStream(res, rand) { // headers res.writeHead(200, { 'Content-Type' : 'text/plain', 'Access-Control-Allow-Origin' : '*', }); // remember last served id var last = null; // output interval setInterval(function() { // output on new node if(last != nodeId) { res.write('[node id="'+(rand ? nodeIdRand : nodeId)+'"]'); last = nodeId; } }, 250); } Proof of concept, using aforementioned node.js server code -- <!DOCTYPE html> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/> </head> <body> <button id="stop">stop</button> <div id="output"></div> <script> /* Listening to a never ending page load (http stream) without running out of memory by using concurrent overlapping connections to prevent loss of data, using only xmlHttpRequest, under the condition that the data can be identified. listen arguments url url of the http stream chunkMax number of chunks to receive before switching to new connection listen properties output a reference to a DOM element with id "output" queue an array filled with non-duplicate received chunks and metadata lastFetcherId an incrementing number used to assign an id to new fetchers fetchers an array listing all active fetchers listen methods fire internal use fire an event stop external use stop all connections fetch internal use starts a new connection fetchRun internal use initialize a new fetcher object Usage var myListen = new listen('http://localhost:5500/streamRandom', 20); will listen to url "http://localhost:5500/streamRandom" will switch connections every 20 chunks myListen.stop() will stop all connections in myListen */ function listen(url, chunkMax) { // main ref var that = this; // output element that.output = document.getElementById('output'); // main queue that.queue = []; // last fetcher id that.lastFetcherId = 0; // list of fetchers that.fetchers = []; //********************************************************* event dispatcher that.fire = function(name, data) { document.dispatchEvent(new CustomEvent(name, {'detail':data})); } //******************************************************** kill all fetchers that.stop = function() { that.fire('fetch-kill', -1); } //************************************************************** url fetcher that.fetch = function(fetchId, url, fetchRef) { //console.log('start fetcher #'+fetchId); var len = 0; var xhr = new XMLHttpRequest(); var cb_progress; var cb_kill; // progress listener xhr.addEventListener('progress', cb_progress = function(e) { // extract chunk data var chunkData = xhr.response.substr(len); // chunk id var chunkId = chunkData.match(/id="([a-z0-9]+)"/)[1]; // update response end point len = xhr.response.length; // signal end of chunk processing that.fire('chunk-ready', { 'fetchId' : fetchId, 'fetchRef' : fetchRef, 'chunkId' : chunkId, 'chunkData' : chunkData, }); }, false); // kill switch document.addEventListener('fetch-kill', cb_kill = function(e) { // kill this fetcher or all fetchers (-1) if(e.detail == fetchId || e.detail == -1) { //console.log('kill fetcher #'+fetchId); xhr.removeEventListener('progress', cb_progress); document.removeEventListener('fetch-kill', cb_kill); xhr.abort(); that.fetchers.shift(); // remove oldest fetcher from list xhr = null; delete xhr; } }, false); // go xhr.open('GET', url, true); xhr.responseType = 'text'; xhr.send(); }; //****************************************************** start a new fetcher that.fetchRun = function() { // new id var id = ++that.lastFetcherId; //console.log('create fetcher #'+id); // create fetcher with new id var fetchRef = { 'id' : id, // self id 'queue' : [], // internal queue 'chunksIds' : [], // retrieved ids, also used to count 'hasSuccessor' : false, // keep track of next fetcher spawn 'ignoreId' : null, // when set, ignore chunks until this id is received (this id included) }; that.fetchers.push(fetchRef); // run fetcher that.fetch(id, url, fetchRef); }; //************************************************ a fetcher returns a chunk document.addEventListener('chunk-ready', function(e) { // shorthand var f = e.detail; // ignore flag is not set, process chunk if(f.fetchRef.ignoreId == null) { // store chunk id f.fetchRef.chunksIds.push(f.chunkId); // create queue item var queueItem = {'id':f.chunkId, 'data':f.chunkData}; // chunk is received from oldest fetcher if(f.fetchId == that.fetchers[0].id) { // send to main queue that.queue.push(queueItem); // signal queue insertion that.fire('queue-new'); } // not oldest fetcher else { // use fetcher internal queue f.fetchRef.queue.push(queueItem); } } // ignore flag is set, current chunk id the one to ignore else if(f.fetchRef.ignoreId == f.chunkId) { // disable ignore flag f.fetchRef.ignoreId = null; } //******************** check chunks count for fetcher, threshold reached if(f.fetchRef.chunksIds.length >= chunkMax && !f.fetchRef.hasSuccessor) { // remember the spawn f.fetchRef.hasSuccessor = true; // spawn new fetcher that.fetchRun(); } /*********************************************************************** check if the first chunk of the second oldest fetcher exists in the oldest fetcher. If true, then they overlap and we can kill the oldest fetcher ***********************************************************************/ if( // is this the oldest fetcher ? f.fetchId == that.fetchers[0].id // is there a successor ? && that.fetchers[1] // has oldest fetcher received the first chunk of its successor ? && that.fetchers[0].chunksIds.indexOf( that.fetchers[1].chunksIds[0] ) > -1 ) { // get index of last chunk of the oldest fetcher within successor queue var lastChunkId = that.fetchers[0].chunksIds[that.fetchers[0].chunksIds.length-1] var lastChunkIndex = that.fetchers[1].chunksIds.indexOf(lastChunkId); // successor has not reached its parent last chunk if(lastChunkIndex < 0) { // discard whole queue that.fetchers[1].queue = []; that.fetchers[1].chunksIds = []; // set ignore id in successor to future discard duplicates that.fetchers[1].ignoreId = lastChunkId; } // there is overlap else { /** console.log('triming queue start: '+that.fetchers[1].queue.length +" "+(lastChunkIndex+1) +" "+(that.fetchers[1].queue.length-1) ); /**/ var trimStart = lastChunkIndex+1; var trimEnd = that.fetchers[1].queue.length-1; // trim queue that.fetchers[1].queue = that.fetchers[1].queue.splice(trimStart, trimEnd); that.fetchers[1].chunksIds = that.fetchers[1].chunksIds.splice(trimStart, trimEnd); //console.log('triming queue end: '+that.fetchers[1].queue.length); } // kill oldest fetcher that.fire('fetch-kill', that.fetchers[0].id); } }, false); //***************************************************** main queue processor document.addEventListener('queue-new', function(e) { // process chunks in queue while(that.queue.length > 0) { // get chunk and remove from queue var chunk = that.queue.shift(); // output item to document if(that.output) { that.output.innerHTML += "<br />"+chunk.data; } } }, false); //****************************************************** start first fetcher that.fetchRun(); }; // run var process = new listen('http://localhost:5500/streamRandom', 20); // bind global kill switch to button document.getElementById('stop').addEventListener('click', process.stop, false); </script> </body> </html> **We're done here**