@@ -2,6 +2,7 @@ import buffers from './testing/test-buffers'
22import BufferList from './testing/buffer-list'
33import { PgPacketStream } from './'
44import assert from 'assert'
5+ import { Readable } from 'stream'
56
67var authOkBuffer = buffers . authenticationOk ( )
78var paramStatusBuffer = buffers . parameterStatus ( 'client_encoding' , 'UTF8' )
@@ -136,23 +137,25 @@ var expectedTwoRowMessage = {
136137 } ]
137138}
138139
140+ const concat = ( stream : Readable ) : Promise < any [ ] > => {
141+ return new Promise ( ( resolve ) => {
142+ const results : any [ ] = [ ]
143+ stream . on ( 'data' , item => results . push ( item ) )
144+ stream . on ( 'end' , ( ) => resolve ( results ) )
145+ } )
146+ }
147+
139148var testForMessage = function ( buffer : Buffer , expectedMessage : any ) {
140149 it ( 'recieves and parses ' + expectedMessage . name , async ( ) => {
141150 const parser = new PgPacketStream ( ) ;
151+ parser . write ( buffer ) ;
152+ parser . end ( ) ;
153+ const [ lastMessage ] = await concat ( parser ) ;
142154
143- await new Promise ( ( resolve ) => {
144- let lastMessage : any = { }
145- parser . on ( 'message' , function ( msg ) {
146- lastMessage = msg
147- } )
148-
149- parser . write ( buffer ) ;
155+ for ( const key in expectedMessage ) {
156+ assert . deepEqual ( lastMessage [ key ] , expectedMessage [ key ] )
157+ }
150158
151- for ( const key in expectedMessage ) {
152- assert . deepEqual ( lastMessage [ key ] , expectedMessage [ key ] )
153- }
154- resolve ( ) ;
155- } )
156159 } )
157160}
158161
@@ -388,17 +391,14 @@ describe('PgPacketStream', function () {
388391 describe ( 'split buffer, single message parsing' , function ( ) {
389392 var fullBuffer = buffers . dataRow ( [ null , 'bang' , 'zug zug' , null , '!' ] )
390393
391- const parse = ( buffers : Buffer [ ] ) : Promise < any > => {
392- return new Promise ( ( resolve ) => {
393- const parser = new PgPacketStream ( ) ;
394- parser . once ( 'message' , ( msg ) => {
395- resolve ( msg )
396- } )
397- for ( const buffer of buffers ) {
398- parser . write ( buffer ) ;
399- }
400- parser . end ( )
401- } )
394+ const parse = async ( buffers : Buffer [ ] ) : Promise < any > => {
395+ const parser = new PgPacketStream ( ) ;
396+ for ( const buffer of buffers ) {
397+ parser . write ( buffer ) ;
398+ }
399+ parser . end ( )
400+ const [ msg ] = await concat ( parser )
401+ return msg ;
402402 }
403403
404404 it ( 'parses when full buffer comes in' , async function ( ) {
@@ -448,20 +448,12 @@ describe('PgPacketStream', function () {
448448 readyForQueryBuffer . copy ( fullBuffer , dataRowBuffer . length , 0 )
449449
450450 const parse = ( buffers : Buffer [ ] ) : Promise < any [ ] > => {
451- return new Promise ( ( resolve ) => {
452- const parser = new PgPacketStream ( ) ;
453- const results : any [ ] = [ ]
454- parser . on ( 'message' , ( msg ) => {
455- results . push ( msg )
456- if ( results . length === 2 ) {
457- resolve ( results )
458- }
459- } )
460- for ( const buffer of buffers ) {
461- parser . write ( buffer ) ;
462- }
463- parser . end ( )
464- } )
451+ const parser = new PgPacketStream ( ) ;
452+ for ( const buffer of buffers ) {
453+ parser . write ( buffer ) ;
454+ }
455+ parser . end ( )
456+ return concat ( parser )
465457 }
466458
467459 var verifyMessages = function ( messages : any [ ] ) {
0 commit comments