File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 11from .hopper import HopperPipe , HopperPipeType
2+ from .reader import PipeReader , JsonReader
23
34__all__ = [
45 "HopperPipe" ,
56 "HopperPipeType" ,
7+ "PipeReader" ,
8+ "JsonReader" ,
69]
710
Original file line number Diff line number Diff line change 1+ import json
2+
3+ class PipeReader :
4+ def __init__ (self , pipe ):
5+ self ._PIPE = pipe
6+
7+ def read (self , len = 1024 ):
8+ return self ._PIPE .read (len )
9+
10+ class JsonReader (PipeReader ):
11+ def __init__ (self , pipe , read_validator = None ):
12+ super ().__init__ (pipe )
13+
14+ self .read_validator = read_validator if read_validator else self .default_read_validator
15+ self .tail = ""
16+
17+ @staticmethod
18+ def default_read_validator (_ ):
19+ return True
20+
21+ def _try_decode_json (self , s ):
22+ """ decode and validate the first json object from a string """
23+ decoder = json .JSONDecoder ()
24+ idx = 0
25+ while idx < len (s ):
26+ slice = s [idx :].lstrip ()
27+ if not slice :
28+ break
29+ offset = len (s [idx :]) - len (slice )
30+ try :
31+ obj , end = decoder .raw_decode (slice )
32+ if self .read_validator (obj ):
33+ idx += offset + end
34+ return obj , s [idx :].rstrip ()
35+ else :
36+ idx += 1
37+ except json .JSONDecodeError as e :
38+ idx += 1
39+ return None , s
40+
41+ def read (self ):
42+ buffer = self .tail
43+ chunk_size = 1024
44+ max_buffer_size = 1024 * 1024
45+
46+ while len (buffer ) < max_buffer_size :
47+ try :
48+ chunk = self ._PIPE .read (chunk_size )
49+ if not chunk :
50+ break
51+
52+ buffer += chunk .decode ("utf-8" )
53+
54+ obj , tail = self ._try_decode_json (buffer )
55+
56+ if obj :
57+ self .tail = tail
58+ return obj
59+
60+ except BlockingIOError :
61+ continue
62+
63+ return None
You can’t perform that action at this time.
0 commit comments