1- from cassandra . connection import Connection , ConnectionShutdown
1+ import threading
22
3+ from cassandra .connection import Connection , ConnectionShutdown
4+ import sys
35import asyncio
46import logging
57import os
@@ -88,9 +90,11 @@ def __init__(self, *args, **kwargs):
8890
8991 self ._connect_socket ()
9092 self ._socket .setblocking (0 )
91-
92- self ._write_queue = asyncio .Queue ()
93- self ._write_queue_lock = asyncio .Lock ()
93+ loop_args = dict ()
94+ if sys .version_info [0 ] == 3 and sys .version_info [1 ] < 10 :
95+ loop_args ['loop' ] = self ._loop
96+ self ._write_queue = asyncio .Queue (** loop_args )
97+ self ._write_queue_lock = asyncio .Lock (** loop_args )
9498
9599 # see initialize_reactor -- loop is running in a separate thread, so we
96100 # have to use a threadsafe call
@@ -108,8 +112,11 @@ def initialize_reactor(cls):
108112 if cls ._pid != os .getpid ():
109113 cls ._loop = None
110114 if cls ._loop is None :
111- cls ._loop = asyncio .new_event_loop ()
112- asyncio .set_event_loop (cls ._loop )
115+ try :
116+ cls ._loop = asyncio .get_running_loop ()
117+ except RuntimeError :
118+ cls ._loop = asyncio .new_event_loop ()
119+ asyncio .set_event_loop (cls ._loop )
113120
114121 if not cls ._loop_thread :
115122 # daemonize so the loop will be shut down on interpreter
@@ -162,7 +169,7 @@ def push(self, data):
162169 else :
163170 chunks = [data ]
164171
165- if self ._loop_thread . ident != get_ident ():
172+ if self ._loop_thread != threading . current_thread ():
166173 asyncio .run_coroutine_threadsafe (
167174 self ._push_msg (chunks ),
168175 loop = self ._loop
@@ -173,7 +180,7 @@ def push(self, data):
173180
174181 async def _push_msg (self , chunks ):
175182 # This lock ensures all chunks of a message are sequential in the Queue
176- with await self ._write_queue_lock :
183+ async with self ._write_queue_lock :
177184 for chunk in chunks :
178185 self ._write_queue .put_nowait (chunk )
179186
0 commit comments