From 060a6ea190453f252c1d3475ca36f787bb50b95b Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 8 Jun 2026 10:50:12 +0200 Subject: [PATCH] Bump shared core --- Cargo.lock | 2 +- Cargo.toml | 2 +- python/restate/server_context.py | 36 +++++++++++++++++------------ python/restate/vm.py | 19 ++++++++++++++-- src/lib.rs | 39 ++++++++++++++++++++++++-------- 5 files changed, 69 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ff60afb..eda9ea2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -437,7 +437,7 @@ dependencies = [ [[package]] name = "restate-sdk-shared-core" version = "0.10.0" -source = "git+https://github.com/restatedev/sdk-shared-core.git?rev=f6f6e4830226161a441dd7f1063e255cdc4052c1#f6f6e4830226161a441dd7f1063e255cdc4052c1" +source = "git+https://github.com/restatedev/sdk-shared-core.git?rev=5127f0291bff456a515f2b8d572c4090e8ff450e#5127f0291bff456a515f2b8d572c4090e8ff450e" dependencies = [ "base64", "bs58", diff --git a/Cargo.toml b/Cargo.toml index 518532f..5e6e675 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,4 +14,4 @@ doc = false [dependencies] pyo3 = { version = "0.25.1", features = ["extension-module"] } tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } -restate-sdk-shared-core = { git = "https://github.com/restatedev/sdk-shared-core.git", rev = "f6f6e4830226161a441dd7f1063e255cdc4052c1", features = ["request_identity", "sha2_random_seed"] } +restate-sdk-shared-core = { git = "https://github.com/restatedev/sdk-shared-core.git", rev = "5127f0291bff456a515f2b8d572c4090e8ff450e", features = ["request_identity", "sha2_random_seed"] } diff --git a/python/restate/server_context.py b/python/restate/server_context.py index 56e8443..a45962a 100644 --- a/python/restate/server_context.py +++ b/python/restate/server_context.py @@ -747,16 +747,19 @@ def run( type_hint = signature.return_annotation serde = serde.with_maybe_type(type_hint) - handle = self.vm.sys_run(name) + run = self.vm.sys_run(name) + handle = run.handle update_restate_context_is_replaying(self.vm) if args is not None: noargs_action = typing.cast(RunAction[T], functools.partial(action, *args)) else: noargs_action = action - self.run_coros_to_execute[handle] = lambda: self.create_run_coroutine( - handle, noargs_action, serde, max_attempts, max_retry_duration, None, None, None - ) + if not run.replayed: + # Schedule the run closure only if the run wasn't replayed. + self.run_coros_to_execute[handle] = lambda: self.create_run_coroutine( + handle, noargs_action, serde, max_attempts, max_retry_duration, None, None, None + ) return self.create_future(handle, serde) # type: ignore def run_typed( @@ -779,20 +782,23 @@ def run_typed( # use core type as it is more specific. E.g. Optional[T] -> T options.type_hint = core_type options.serde = typing.cast(DefaultSerde, options.serde).with_maybe_type(options.type_hint) - handle = self.vm.sys_run(name) + run = self.vm.sys_run(name) + handle = run.handle update_restate_context_is_replaying(self.vm) func = typing.cast(RunAction[T], functools.partial(action, *args, **kwargs)) - self.run_coros_to_execute[handle] = lambda: self.create_run_coroutine( - handle, - func, - options.serde, - options.max_attempts, - options.max_duration, - options.initial_retry_interval, - options.max_retry_interval, - options.retry_interval_factor, - ) + if not run.replayed: + # Schedule the run closure only if the run wasn't replayed. + self.run_coros_to_execute[handle] = lambda: self.create_run_coroutine( + handle, + func, + options.serde, + options.max_attempts, + options.max_duration, + options.initial_retry_interval, + options.max_retry_interval, + options.retry_interval_factor, + ) return self.create_future(handle, options.serde) def sleep(self, delta: timedelta, name: Optional[str] = None) -> RestateDurableSleepFuture: diff --git a/python/restate/vm.py b/python/restate/vm.py index 58d5369..d3345f4 100644 --- a/python/restate/vm.py +++ b/python/restate/vm.py @@ -34,6 +34,7 @@ PyDoProgressExecuteRun, PyDoProgressCancelSignalReceived, PyUnresolvedFuture, + PyRun, CANCEL_NOTIFICATION_HANDLE, ) # pylint: disable=import-error,no-name-in-module,line-too-long @@ -130,6 +131,19 @@ class DoProgressCancelSignalReceived: """ +@dataclass(frozen=True) +class Run: + """ + Represents the result of registering a run. + + Holds the run notification handle and whether the run was replayed, + in which case the run closure must not be scheduled for execution. + """ + + replayed: bool + handle: int + + DO_PROGRESS_ANY_COMPLETED = DoProgressAnyCompleted() DO_PROGRESS_WAIT_EXTERNAL_PROGRESS = DoProgressWaitExternalProgress() DO_PROGRESS_CANCEL_SIGNAL_RECEIVED = DoProgressCancelSignalReceived() @@ -395,11 +409,12 @@ def sys_send( py_headers = [PyHeader(key=h[0], value=h[1]) for h in headers] if headers else None return self.vm.sys_send(service, handler, parameter, key, delay, idempotency_key, py_headers) - def sys_run(self, name: str) -> int: + def sys_run(self, name: str) -> Run: """ Register a run """ - return self.vm.sys_run(name) + run: PyRun = self.vm.sys_run(name) + return Run(replayed=run.replayed, handle=run.handle) def sys_awakeable(self) -> typing.Tuple[str, int]: """ diff --git a/src/lib.rs b/src/lib.rs index f84bb34..83a9440 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,9 +3,10 @@ use pyo3::prelude::*; use pyo3::types::{PyBytes, PyNone, PyString}; use restate_sdk_shared_core::fmt::{set_error_formatter, ErrorFormatter}; use restate_sdk_shared_core::{ - AwaitResponse, CallHandle, CoreVM, Error, Header, IdentityVerifier, Input, NonEmptyValue, - NotificationHandle, ResponseHead, RetryPolicy, RunExitResult, TakeOutputResult, Target, - TerminalFailure, UnresolvedFuture, VMOptions, Value, CANCEL_NOTIFICATION_HANDLE, VM, + AwaitResponse, AwakeableHandle, CallHandle, CoreVM, Error, Header, IdentityVerifier, Input, + NonEmptyValue, NotificationHandle, OnMaxAttempts, ResponseHead, RetryPolicy, RunExitResult, + RunHandle, TakeOutputResult, Target, TerminalFailure, UnresolvedFuture, VMOptions, Value, + CANCEL_NOTIFICATION_HANDLE, VM, }; use std::fmt; use std::time::{Duration, SystemTime}; @@ -175,6 +176,7 @@ impl From for RetryPolicy { .max_interval .map(Duration::from_millis) .or_else(|| Some(Duration::from_secs(10))), + on_max_attempts: OnMaxAttempts::FailAsTerminal, } } else { // Let's use retry policy infinite here, which will give back control to the invocation retry policy @@ -317,6 +319,23 @@ impl From for PyCallHandle { } } +#[pyclass] +pub struct PyRun { + #[pyo3(get)] + replayed: bool, + #[pyo3(get)] + handle: PyNotificationHandle, +} + +impl From for PyRun { + fn from(value: RunHandle) -> Self { + PyRun { + replayed: value.replayed, + handle: value.handle.into(), + } + } +} + // Errors and Exceptions #[derive(Debug)] @@ -615,7 +634,7 @@ impl PyVM { self_ .vm .sys_awakeable() - .map(|(id, handle)| (id, handle.into())) + .map(|AwakeableHandle { id, handle }| (id, handle.into())) .map_err(Into::into) } @@ -699,11 +718,9 @@ impl PyVM { .map_err(Into::into) } - /// Returns the associated `PyNotificationHandle`. - fn sys_run( - mut self_: PyRefMut<'_, Self>, - name: String, - ) -> Result { + /// Returns the associated `PyRun`, holding the run notification handle and + /// whether the run was replayed. + fn sys_run(mut self_: PyRefMut<'_, Self>, name: String) -> Result { self_.vm.sys_run(name).map(Into::into).map_err(Into::into) } @@ -780,6 +797,7 @@ impl PyVM { interval: delay_override_ms.map(Duration::from_millis), max_attempts: max_retry_attempts_override, max_duration: max_retry_duration_override_ms.map(Duration::from_millis), + on_max_attempts: OnMaxAttempts::FailAsTerminal, } } else { RetryPolicy::Infinite @@ -838,7 +856,7 @@ impl PyVM { } fn is_replaying(self_: PyRef<'_, Self>) -> bool { - self_.vm.is_replaying() + self_.vm.state().is_replaying() } } @@ -937,6 +955,7 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add("VMException", m.py().get_type::())?; m.add(