-
Notifications
You must be signed in to change notification settings - Fork 155
feat: Python UDFs: per-session inlining toggle and strict refusal setting #1546
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f6be2fb
14178db
eb3c194
7dd4252
a24daf6
74405d8
e8413dd
d00c619
f7c8c6c
949af91
050c7f2
0fc78b7
8827726
bcc4755
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -232,16 +232,39 @@ fn strip_wire_header<'a>( | |
| #[derive(Debug)] | ||
| pub struct PythonLogicalCodec { | ||
| inner: Arc<dyn LogicalExtensionCodec>, | ||
| python_udf_inlining: bool, | ||
| } | ||
|
|
||
| impl PythonLogicalCodec { | ||
| pub fn new(inner: Arc<dyn LogicalExtensionCodec>) -> Self { | ||
| Self { inner } | ||
| Self { | ||
| inner, | ||
| python_udf_inlining: true, | ||
| } | ||
| } | ||
|
|
||
| pub fn inner(&self) -> &Arc<dyn LogicalExtensionCodec> { | ||
| &self.inner | ||
| } | ||
|
|
||
| /// Toggle inline encoding of Python UDFs. See | ||
| /// `SessionContext.with_python_udf_inlining` (Python) for full | ||
| /// behavior and use cases. | ||
| /// | ||
| /// Security scope: strict mode (`false`) narrows only the codec | ||
| /// layer — it stops `Expr::from_bytes` from invoking | ||
| /// `cloudpickle.loads` on the inline `DFPY*` payload. It does | ||
| /// **not** make `pickle.loads(untrusted_bytes)` safe; treat every | ||
| /// `pickle.loads` on untrusted input as unsafe regardless of this | ||
| /// setting. | ||
| pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self { | ||
| self.python_udf_inlining = enabled; | ||
| self | ||
| } | ||
|
|
||
| pub fn python_udf_inlining(&self) -> bool { | ||
| self.python_udf_inlining | ||
| } | ||
| } | ||
|
|
||
| impl Default for PythonLogicalCodec { | ||
|
|
@@ -301,48 +324,104 @@ impl LogicalExtensionCodec for PythonLogicalCodec { | |
| } | ||
|
|
||
| fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> { | ||
| if try_encode_python_scalar_udf(node, buf)? { | ||
| if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? { | ||
| return Ok(()); | ||
| } | ||
| self.inner.try_encode_udf(node, buf) | ||
| } | ||
|
|
||
| fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> { | ||
| if let Some(udf) = try_decode_python_scalar_udf(buf)? { | ||
| return Ok(udf); | ||
| if self.python_udf_inlining { | ||
| if let Some(udf) = try_decode_python_scalar_udf(buf)? { | ||
| return Ok(udf); | ||
| } | ||
| } else { | ||
| refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?; | ||
| } | ||
| self.inner.try_decode_udf(name, buf) | ||
| } | ||
|
|
||
| fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> { | ||
| if try_encode_python_udaf(node, buf)? { | ||
| if self.python_udf_inlining && try_encode_python_udaf(node, buf)? { | ||
| return Ok(()); | ||
| } | ||
| self.inner.try_encode_udaf(node, buf) | ||
| } | ||
|
|
||
| fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> { | ||
| if let Some(udaf) = try_decode_python_udaf(buf)? { | ||
| return Ok(udaf); | ||
| if self.python_udf_inlining { | ||
| if let Some(udaf) = try_decode_python_udaf(buf)? { | ||
| return Ok(udaf); | ||
| } | ||
| } else { | ||
| refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?; | ||
| } | ||
| self.inner.try_decode_udaf(name, buf) | ||
| } | ||
|
|
||
| fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> { | ||
| if try_encode_python_udwf(node, buf)? { | ||
| if self.python_udf_inlining && try_encode_python_udwf(node, buf)? { | ||
| return Ok(()); | ||
| } | ||
| self.inner.try_encode_udwf(node, buf) | ||
| } | ||
|
|
||
| fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> { | ||
| if let Some(udwf) = try_decode_python_udwf(buf)? { | ||
| return Ok(udwf); | ||
| if self.python_udf_inlining { | ||
| if let Some(udwf) = try_decode_python_udwf(buf)? { | ||
| return Ok(udwf); | ||
| } | ||
| } else { | ||
| refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?; | ||
| } | ||
| self.inner.try_decode_udwf(name, buf) | ||
| } | ||
| } | ||
|
|
||
| /// Strict-mode gate: if `buf` is a well-framed inline payload for | ||
| /// `family`, return the strict-refusal error; otherwise return | ||
| /// `Ok(())` so the caller can delegate to its `inner` codec. | ||
| /// | ||
| /// Routing through [`read_framed_payload`] (rather than a bare | ||
| /// `starts_with` probe) means malformed inline bytes — wrong | ||
| /// wire-format version, mismatched Python version, truncated header — | ||
| /// surface *their* diagnostic instead of the strict-mode message. | ||
| /// The strict message implies sender intent ("inlining is disabled"), | ||
| /// so it should fire only when the bytes really would have decoded. | ||
| /// | ||
| /// Fast path: short-circuit on the family-magic prefix before | ||
| /// acquiring the GIL. Plans with many non-Python UDFs would otherwise | ||
| /// pay a GIL acquisition per decode call just to confirm "not a | ||
| /// Python UDF". `read_framed_payload` itself rejects buffers that | ||
| /// don't start with `family`, so this is purely an optimization. | ||
| fn refuse_if_inline(buf: &[u8], family: &[u8], kind: &str, name: &str) -> Result<()> { | ||
| if !buf.starts_with(family) { | ||
| return Ok(()); | ||
| } | ||
| Python::attach(|py| match read_framed_payload(py, buf, family, kind)? { | ||
| Some(_) => Err(refuse_inline_payload(kind, name)), | ||
| None => Ok(()), | ||
| }) | ||
| } | ||
|
|
||
| /// Build the error returned by a strict codec when it receives an | ||
| /// inline Python-UDF payload it has been told not to deserialize. | ||
| fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusionError { | ||
| // `Execution`, not `Plan`: this is a wire-format decode refusal at | ||
| // codec time, not a planner-stage failure. Downstream error | ||
| // classification keys off the variant — surfacing this as a planner | ||
| // error would mis-route it into "fix your SQL" buckets. | ||
| datafusion::error::DataFusionError::Execution(format!( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice if there was a page on this so we could include a url with even more context. I think your descriptions in the previous two prs are good but I suspect someone will stumble upon this with very little context as a general user and thinking about how to mitigate that. Can be wrapped in next PR or even a follow on for further nits/clarifiaction. |
||
| "Refusing to deserialize inline Python {kind} '{name}': Python UDF \ | ||
| inlining is disabled on this session. Two remediations: \ | ||
| (1) ask the sender to re-encode with inlining disabled so '{name}' \ | ||
| travels by name, and register '{name}' on this receiver; or \ | ||
| (2) enable inlining on this receiver (accepts the cloudpickle \ | ||
| execution risk on inbound payloads). Receivers cannot re-encode \ | ||
| bytes they did not produce." | ||
| )) | ||
| } | ||
|
|
||
| /// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`] parked | ||
| /// on the same `SessionContext`. Carries the Python-aware encoding | ||
| /// hooks for physical-layer types (`ExecutionPlan`, `PhysicalExpr`) | ||
|
|
@@ -358,16 +437,33 @@ impl LogicalExtensionCodec for PythonLogicalCodec { | |
| #[derive(Debug)] | ||
| pub struct PythonPhysicalCodec { | ||
| inner: Arc<dyn PhysicalExtensionCodec>, | ||
| python_udf_inlining: bool, | ||
| } | ||
|
|
||
| impl PythonPhysicalCodec { | ||
| pub fn new(inner: Arc<dyn PhysicalExtensionCodec>) -> Self { | ||
| Self { inner } | ||
| Self { | ||
| inner, | ||
| python_udf_inlining: true, | ||
| } | ||
| } | ||
|
|
||
| pub fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> { | ||
| &self.inner | ||
| } | ||
|
|
||
| /// Toggle inline encoding of Python UDFs on this physical codec. | ||
| /// | ||
| /// Mirrors [`PythonLogicalCodec::with_python_udf_inlining`]; see | ||
| /// that method for the full security and portability discussion. | ||
| pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self { | ||
| self.python_udf_inlining = enabled; | ||
| self | ||
| } | ||
|
|
||
| pub fn python_udf_inlining(&self) -> bool { | ||
| self.python_udf_inlining | ||
| } | ||
| } | ||
|
|
||
| impl Default for PythonPhysicalCodec { | ||
|
|
@@ -391,15 +487,19 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec { | |
| } | ||
|
|
||
| fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> { | ||
| if try_encode_python_scalar_udf(node, buf)? { | ||
| if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? { | ||
| return Ok(()); | ||
| } | ||
| self.inner.try_encode_udf(node, buf) | ||
| } | ||
|
|
||
| fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> { | ||
| if let Some(udf) = try_decode_python_scalar_udf(buf)? { | ||
| return Ok(udf); | ||
| if self.python_udf_inlining { | ||
| if let Some(udf) = try_decode_python_scalar_udf(buf)? { | ||
| return Ok(udf); | ||
| } | ||
| } else { | ||
| refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?; | ||
| } | ||
| self.inner.try_decode_udf(name, buf) | ||
| } | ||
|
|
@@ -417,29 +517,37 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec { | |
| } | ||
|
|
||
| fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> { | ||
| if try_encode_python_udaf(node, buf)? { | ||
| if self.python_udf_inlining && try_encode_python_udaf(node, buf)? { | ||
| return Ok(()); | ||
| } | ||
| self.inner.try_encode_udaf(node, buf) | ||
| } | ||
|
|
||
| fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> { | ||
| if let Some(udaf) = try_decode_python_udaf(buf)? { | ||
| return Ok(udaf); | ||
| if self.python_udf_inlining { | ||
| if let Some(udaf) = try_decode_python_udaf(buf)? { | ||
| return Ok(udaf); | ||
| } | ||
| } else { | ||
| refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?; | ||
| } | ||
| self.inner.try_decode_udaf(name, buf) | ||
| } | ||
|
|
||
| fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> { | ||
| if try_encode_python_udwf(node, buf)? { | ||
| if self.python_udf_inlining && try_encode_python_udwf(node, buf)? { | ||
| return Ok(()); | ||
| } | ||
| self.inner.try_encode_udwf(node, buf) | ||
| } | ||
|
|
||
| fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> { | ||
| if let Some(udwf) = try_decode_python_udwf(buf)? { | ||
| return Ok(udwf); | ||
| if self.python_udf_inlining { | ||
| if let Some(udwf) = try_decode_python_udwf(buf)? { | ||
| return Ok(udwf); | ||
| } | ||
| } else { | ||
| refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?; | ||
| } | ||
| self.inner.try_decode_udwf(name, buf) | ||
| } | ||
|
|
@@ -476,6 +584,9 @@ pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec<u8>) | |
| /// the caller to delegate to its `inner` codec (and eventually the | ||
| /// `FunctionRegistry`). | ||
| pub(crate) fn try_decode_python_scalar_udf(buf: &[u8]) -> Result<Option<Arc<ScalarUDF>>> { | ||
| if !buf.starts_with(PY_SCALAR_UDF_FAMILY) { | ||
| return Ok(None); | ||
| } | ||
| Python::attach(|py| -> Result<Option<Arc<ScalarUDF>>> { | ||
| let Some(payload) = read_framed_payload(py, buf, PY_SCALAR_UDF_FAMILY, "scalar UDF")? | ||
| else { | ||
|
|
@@ -732,6 +843,9 @@ pub(crate) fn try_encode_python_udwf(node: &WindowUDF, buf: &mut Vec<u8>) -> Res | |
| } | ||
|
|
||
| pub(crate) fn try_decode_python_udwf(buf: &[u8]) -> Result<Option<Arc<WindowUDF>>> { | ||
| if !buf.starts_with(PY_WINDOW_UDF_FAMILY) { | ||
| return Ok(None); | ||
| } | ||
| Python::attach(|py| -> Result<Option<Arc<WindowUDF>>> { | ||
| let Some(payload) = read_framed_payload(py, buf, PY_WINDOW_UDF_FAMILY, "window UDF")? | ||
| else { | ||
|
|
@@ -814,6 +928,9 @@ pub(crate) fn try_encode_python_udaf(node: &AggregateUDF, buf: &mut Vec<u8>) -> | |
| } | ||
|
|
||
| pub(crate) fn try_decode_python_udaf(buf: &[u8]) -> Result<Option<Arc<AggregateUDF>>> { | ||
| if !buf.starts_with(PY_AGG_UDF_FAMILY) { | ||
| return Ok(None); | ||
| } | ||
| Python::attach(|py| -> Result<Option<Arc<AggregateUDF>>> { | ||
| let Some(payload) = read_framed_payload(py, buf, PY_AGG_UDF_FAMILY, "aggregate UDF")? | ||
| else { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1769,3 +1769,48 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext: | |
| new = SessionContext.__new__(SessionContext) | ||
| new.ctx = new_internal | ||
| return new | ||
|
|
||
| def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext: | ||
| """Control whether Python UDFs are embedded in serialized expressions. | ||
|
|
||
| When ``enabled=True`` (the default), serialized expressions carry | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Enabled isn't an optional argument with a default to true here. So either this is misaligned or it's saying that the class has things enabled by default which isn't clear in the docstring. |
||
| the Python code for any scalar, aggregate, or window UDFs they | ||
| reference. The receiver rebuilds the UDFs from those bytes and | ||
| does not need to register them first. | ||
|
|
||
| When ``enabled=False``, serialized expressions store only the | ||
| UDF names. This has two uses: | ||
|
|
||
| * **Cross-language portability.** The bytes can be decoded by a | ||
| non-Python receiver, which must already have UDFs registered | ||
| under matching names. | ||
| * **Safer deserialization.** :meth:`Expr.from_bytes` will refuse | ||
| to rebuild Python UDFs rather than call ``cloudpickle.loads`` | ||
| on untrusted input. | ||
|
|
||
| The setting affects :meth:`Expr.to_bytes` and | ||
| :meth:`Expr.from_bytes` whenever this session is passed as the | ||
| ``ctx`` argument. :func:`pickle.dumps` and :func:`pickle.loads` | ||
| do not pass a context, so to apply the setting through pickle, | ||
| register this session with | ||
| :func:`datafusion.ipc.set_sender_ctx` on the sender and | ||
| :func:`datafusion.ipc.set_worker_ctx` on the receiver. | ||
|
|
||
| .. warning:: Security | ||
| This setting narrows only :meth:`Expr.from_bytes`. Calling | ||
| :func:`pickle.loads` on untrusted bytes remains unsafe | ||
| regardless of the toggle. | ||
|
|
||
| Returns a new :class:`SessionContext` with the toggle applied; | ||
| the original session is unchanged. | ||
|
|
||
| Examples: | ||
| >>> from datafusion import SessionContext | ||
| >>> strict = SessionContext().with_python_udf_inlining(enabled=False) | ||
| >>> isinstance(strict, SessionContext) | ||
| True | ||
|
Comment on lines
+1808
to
+1811
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't really demonstrate this functionality |
||
| """ | ||
| new_internal = self.ctx.with_python_udf_inlining(enabled) | ||
| new = SessionContext.__new__(SessionContext) | ||
| new.ctx = new_internal | ||
| return new | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: throughout this UDF seems to be used in two contexts. UDF as short hand for scalar UDF and UDF as short hand for the broader set of all flavors of user functions udf,udaf,udwf. Everytime I've been reviewing these I kind of forget that and have to revisit things to make sure I'm doing the right mapping in my head. Maybe that's expected and standard. No change request more of just an FYI in case there are thoughts on how to resolve this overloading for potentially making it easier to bring on future people.