11# -*- coding: utf-8 -*-
22import math
33import enum
4+ import threading
45
56from ._brotlicffi import ffi , lib
67
@@ -249,6 +250,7 @@ def __init__(self,
249250 quality = lib .BROTLI_DEFAULT_QUALITY ,
250251 lgwin = lib .BROTLI_DEFAULT_WINDOW ,
251252 lgblock = 0 ):
253+ self .lock = threading .RLock ()
252254 enc = lib .BrotliEncoderCreateInstance (
253255 ffi .NULL , ffi .NULL , ffi .NULL
254256 )
@@ -271,28 +273,34 @@ def _compress(self, data, operation):
271273 because almost all of the code uses the exact same setup. It wouldn't
272274 have to, but it doesn't hurt at all.
273275 """
274- # The 'algorithm' for working out how big to make this buffer is from
275- # the Brotli source code, brotlimodule.cc.
276- original_output_size = int (
277- math .ceil (len (data ) + (len (data ) >> 2 ) + 10240 )
278- )
279- available_out = ffi .new ("size_t *" )
280- available_out [0 ] = original_output_size
281- output_buffer = ffi .new ("uint8_t []" , available_out [0 ])
282- ptr_to_output_buffer = ffi .new ("uint8_t **" , output_buffer )
283- input_size = ffi .new ("size_t *" , len (data ))
284- input_buffer = ffi .new ("uint8_t []" , data )
285- ptr_to_input_buffer = ffi .new ("uint8_t **" , input_buffer )
286-
287- rc = lib .BrotliEncoderCompressStream (
288- self ._encoder ,
289- operation ,
290- input_size ,
291- ptr_to_input_buffer ,
292- available_out ,
293- ptr_to_output_buffer ,
294- ffi .NULL
295- )
276+ if not self .lock .acquire (blocking = False ):
277+ raise error (
278+ "Concurrently sharing Compressor objects is not allowed" )
279+ try :
280+ # The 'algorithm' for working out how big to make this buffer is
281+ # from the Brotli source code, brotlimodule.cc.
282+ original_output_size = int (
283+ math .ceil (len (data ) + (len (data ) >> 2 ) + 10240 )
284+ )
285+ available_out = ffi .new ("size_t *" )
286+ available_out [0 ] = original_output_size
287+ output_buffer = ffi .new ("uint8_t []" , available_out [0 ])
288+ ptr_to_output_buffer = ffi .new ("uint8_t **" , output_buffer )
289+ input_size = ffi .new ("size_t *" , len (data ))
290+ input_buffer = ffi .new ("uint8_t []" , data )
291+ ptr_to_input_buffer = ffi .new ("uint8_t **" , input_buffer )
292+
293+ rc = lib .BrotliEncoderCompressStream (
294+ self ._encoder ,
295+ operation ,
296+ input_size ,
297+ ptr_to_input_buffer ,
298+ available_out ,
299+ ptr_to_output_buffer ,
300+ ffi .NULL
301+ )
302+ finally :
303+ self .lock .release ()
296304 if rc != lib .BROTLI_TRUE : # pragma: no cover
297305 raise error ("Error encountered compressing data." )
298306
@@ -320,11 +328,17 @@ def flush(self):
320328 will not destroy the compressor. It can be used, for example, to ensure
321329 that given chunks of content will decompress immediately.
322330 """
323- chunks = [self ._compress (b'' , lib .BROTLI_OPERATION_FLUSH )]
324-
325- while lib .BrotliEncoderHasMoreOutput (self ._encoder ) == lib .BROTLI_TRUE :
326- chunks .append (self ._compress (b'' , lib .BROTLI_OPERATION_FLUSH ))
327-
331+ if not self .lock .acquire (blocking = False ):
332+ raise error (
333+ "Concurrently sharing Compressor objects is not allowed" )
334+ try :
335+ chunks = [self ._compress (b'' , lib .BROTLI_OPERATION_FLUSH )]
336+
337+ while ((lib .BrotliEncoderHasMoreOutput (self ._encoder ) ==
338+ lib .BROTLI_TRUE )):
339+ chunks .append (self ._compress (b'' , lib .BROTLI_OPERATION_FLUSH ))
340+ finally :
341+ self .lock .release ()
328342 return b'' .join (chunks )
329343
330344 def finish (self ):
@@ -333,10 +347,16 @@ def finish(self):
333347 transition the compressor to a completed state. The compressor cannot
334348 be used again after this point, and must be replaced.
335349 """
336- chunks = []
337- while lib .BrotliEncoderIsFinished (self ._encoder ) == lib .BROTLI_FALSE :
338- chunks .append (self ._compress (b'' , lib .BROTLI_OPERATION_FINISH ))
339-
350+ if not self .lock .acquire (blocking = False ):
351+ raise error (
352+ "Concurrently sharing Compressor objects is not allowed" )
353+ try :
354+ chunks = []
355+ while ((lib .BrotliEncoderIsFinished (self ._encoder ) ==
356+ lib .BROTLI_FALSE )):
357+ chunks .append (self ._compress (b'' , lib .BROTLI_OPERATION_FINISH ))
358+ finally :
359+ self .lock .release ()
340360 return b'' .join (chunks )
341361
342362
@@ -362,6 +382,7 @@ class Decompressor(object):
362382 _unconsumed_data = None
363383
364384 def __init__ (self , dictionary = b'' ):
385+ self .lock = threading .Lock ()
365386 dec = lib .BrotliDecoderCreateInstance (ffi .NULL , ffi .NULL , ffi .NULL )
366387 self ._decoder = ffi .gc (dec , lib .BrotliDecoderDestroyInstance )
367388 self ._unconsumed_data = b''
@@ -409,6 +430,16 @@ def decompress(self, data, output_buffer_limit=None):
409430 :type output_buffer_limit: ``int`` or ``None``
410431 :returns: A bytestring containing the decompressed data.
411432 """
433+ if not self .lock .acquire (blocking = False ):
434+ raise error (
435+ "Concurrently sharing Decompressor instances is not allowed" )
436+ try :
437+ chunks = self ._decompress (data , output_buffer_limit )
438+ finally :
439+ self .lock .release ()
440+ return b'' .join (chunks )
441+
442+ def _decompress (self , data , output_buffer_limit ):
412443 if self ._unconsumed_data and data :
413444 raise error (
414445 "brotli: decoder process called with data when "
@@ -486,8 +517,7 @@ def decompress(self, data, output_buffer_limit=None):
486517 else :
487518 # It's cool if we need more output, we just loop again.
488519 assert rc == lib .BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT
489-
490- return b'' .join (chunks )
520+ return chunks
491521
492522 process = decompress
493523
@@ -527,7 +557,15 @@ def is_finished(self):
527557 Returns ``True`` if the decompression stream
528558 is complete, ``False`` otherwise
529559 """
530- return lib .BrotliDecoderIsFinished (self ._decoder ) == lib .BROTLI_TRUE
560+ if not self .lock .acquire (blocking = False ):
561+ raise error (
562+ "Concurrently sharing Decompressor instances is not allowed" )
563+ try :
564+ ret = (
565+ lib .BrotliDecoderIsFinished (self ._decoder ) == lib .BROTLI_TRUE )
566+ finally :
567+ self .lock .release ()
568+ return ret
531569
532570 def can_accept_more_data (self ):
533571 """
@@ -550,8 +588,16 @@ def can_accept_more_data(self):
550588 more compressed data.
551589 :rtype: ``bool``
552590 """
553- if len (self ._unconsumed_data ) > 0 :
554- return False
555- if lib .BrotliDecoderHasMoreOutput (self ._decoder ) == lib .BROTLI_TRUE :
556- return False
557- return True
591+ if not self .lock .acquire (blocking = False ):
592+ raise error (
593+ "Concurrently sharing Decompressor instances is not allowed" )
594+ try :
595+ ret = True
596+ if len (self ._unconsumed_data ) > 0 :
597+ ret = False
598+ if ((lib .BrotliDecoderHasMoreOutput (self ._decoder ) ==
599+ lib .BROTLI_TRUE )):
600+ ret = False
601+ finally :
602+ self .lock .release ()
603+ return ret
0 commit comments