Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ doc = false
[dependencies]
pyo3 = { version = "0.25.1", features = ["extension-module"] }
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
restate-sdk-shared-core = { git = "https://github.com/restatedev/sdk-shared-core.git", rev = "f6f6e4830226161a441dd7f1063e255cdc4052c1", features = ["request_identity", "sha2_random_seed"] }
restate-sdk-shared-core = { git = "https://github.com/restatedev/sdk-shared-core.git", rev = "5127f0291bff456a515f2b8d572c4090e8ff450e", features = ["request_identity", "sha2_random_seed"] }
36 changes: 21 additions & 15 deletions python/restate/server_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,16 +747,19 @@ def run(
type_hint = signature.return_annotation
serde = serde.with_maybe_type(type_hint)

handle = self.vm.sys_run(name)
run = self.vm.sys_run(name)
handle = run.handle
update_restate_context_is_replaying(self.vm)

if args is not None:
noargs_action = typing.cast(RunAction[T], functools.partial(action, *args))
else:
noargs_action = action
self.run_coros_to_execute[handle] = lambda: self.create_run_coroutine(
handle, noargs_action, serde, max_attempts, max_retry_duration, None, None, None
)
if not run.replayed:
# Schedule the run closure only if the run wasn't replayed.
self.run_coros_to_execute[handle] = lambda: self.create_run_coroutine(
handle, noargs_action, serde, max_attempts, max_retry_duration, None, None, None
)
return self.create_future(handle, serde) # type: ignore

def run_typed(
Expand All @@ -779,20 +782,23 @@ def run_typed(
# use core type as it is more specific. E.g. Optional[T] -> T
options.type_hint = core_type
options.serde = typing.cast(DefaultSerde, options.serde).with_maybe_type(options.type_hint)
handle = self.vm.sys_run(name)
run = self.vm.sys_run(name)
handle = run.handle
update_restate_context_is_replaying(self.vm)

func = typing.cast(RunAction[T], functools.partial(action, *args, **kwargs))
self.run_coros_to_execute[handle] = lambda: self.create_run_coroutine(
handle,
func,
options.serde,
options.max_attempts,
options.max_duration,
options.initial_retry_interval,
options.max_retry_interval,
options.retry_interval_factor,
)
if not run.replayed:
# Schedule the run closure only if the run wasn't replayed.
self.run_coros_to_execute[handle] = lambda: self.create_run_coroutine(
handle,
func,
options.serde,
options.max_attempts,
options.max_duration,
options.initial_retry_interval,
options.max_retry_interval,
options.retry_interval_factor,
)
return self.create_future(handle, options.serde)

def sleep(self, delta: timedelta, name: Optional[str] = None) -> RestateDurableSleepFuture:
Expand Down
19 changes: 17 additions & 2 deletions python/restate/vm.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
PyDoProgressExecuteRun,
PyDoProgressCancelSignalReceived,
PyUnresolvedFuture,
PyRun,
CANCEL_NOTIFICATION_HANDLE,
) # pylint: disable=import-error,no-name-in-module,line-too-long

Expand Down Expand Up @@ -130,6 +131,19 @@ class DoProgressCancelSignalReceived:
"""


@dataclass(frozen=True)
class Run:
"""
Represents the result of registering a run.

Holds the run notification handle and whether the run was replayed,
in which case the run closure must not be scheduled for execution.
"""

replayed: bool
handle: int


DO_PROGRESS_ANY_COMPLETED = DoProgressAnyCompleted()
DO_PROGRESS_WAIT_EXTERNAL_PROGRESS = DoProgressWaitExternalProgress()
DO_PROGRESS_CANCEL_SIGNAL_RECEIVED = DoProgressCancelSignalReceived()
Expand Down Expand Up @@ -395,11 +409,12 @@ def sys_send(
py_headers = [PyHeader(key=h[0], value=h[1]) for h in headers] if headers else None
return self.vm.sys_send(service, handler, parameter, key, delay, idempotency_key, py_headers)

def sys_run(self, name: str) -> int:
def sys_run(self, name: str) -> Run:
"""
Register a run
"""
return self.vm.sys_run(name)
run: PyRun = self.vm.sys_run(name)
return Run(replayed=run.replayed, handle=run.handle)

def sys_awakeable(self) -> typing.Tuple[str, int]:
"""
Expand Down
39 changes: 29 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyNone, PyString};
use restate_sdk_shared_core::fmt::{set_error_formatter, ErrorFormatter};
use restate_sdk_shared_core::{
AwaitResponse, CallHandle, CoreVM, Error, Header, IdentityVerifier, Input, NonEmptyValue,
NotificationHandle, ResponseHead, RetryPolicy, RunExitResult, TakeOutputResult, Target,
TerminalFailure, UnresolvedFuture, VMOptions, Value, CANCEL_NOTIFICATION_HANDLE, VM,
AwaitResponse, AwakeableHandle, CallHandle, CoreVM, Error, Header, IdentityVerifier, Input,
NonEmptyValue, NotificationHandle, OnMaxAttempts, ResponseHead, RetryPolicy, RunExitResult,
RunHandle, TakeOutputResult, Target, TerminalFailure, UnresolvedFuture, VMOptions, Value,
CANCEL_NOTIFICATION_HANDLE, VM,
};
use std::fmt;
use std::time::{Duration, SystemTime};
Expand Down Expand Up @@ -175,6 +176,7 @@ impl From<PyExponentialRetryConfig> for RetryPolicy {
.max_interval
.map(Duration::from_millis)
.or_else(|| Some(Duration::from_secs(10))),
on_max_attempts: OnMaxAttempts::FailAsTerminal,
}
} else {
// Let's use retry policy infinite here, which will give back control to the invocation retry policy
Expand Down Expand Up @@ -317,6 +319,23 @@ impl From<CallHandle> for PyCallHandle {
}
}

#[pyclass]
pub struct PyRun {
#[pyo3(get)]
replayed: bool,
#[pyo3(get)]
handle: PyNotificationHandle,
}

impl From<RunHandle> for PyRun {
fn from(value: RunHandle) -> Self {
PyRun {
replayed: value.replayed,
handle: value.handle.into(),
}
}
}

// Errors and Exceptions

#[derive(Debug)]
Expand Down Expand Up @@ -615,7 +634,7 @@ impl PyVM {
self_
.vm
.sys_awakeable()
.map(|(id, handle)| (id, handle.into()))
.map(|AwakeableHandle { id, handle }| (id, handle.into()))
.map_err(Into::into)
}

Expand Down Expand Up @@ -699,11 +718,9 @@ impl PyVM {
.map_err(Into::into)
}

/// Returns the associated `PyNotificationHandle`.
fn sys_run(
mut self_: PyRefMut<'_, Self>,
name: String,
) -> Result<PyNotificationHandle, PyVMError> {
/// Returns the associated `PyRun`, holding the run notification handle and
/// whether the run was replayed.
fn sys_run(mut self_: PyRefMut<'_, Self>, name: String) -> Result<PyRun, PyVMError> {
self_.vm.sys_run(name).map(Into::into).map_err(Into::into)
}

Expand Down Expand Up @@ -780,6 +797,7 @@ impl PyVM {
interval: delay_override_ms.map(Duration::from_millis),
max_attempts: max_retry_attempts_override,
max_duration: max_retry_duration_override_ms.map(Duration::from_millis),
on_max_attempts: OnMaxAttempts::FailAsTerminal,
}
} else {
RetryPolicy::Infinite
Expand Down Expand Up @@ -838,7 +856,7 @@ impl PyVM {
}

fn is_replaying(self_: PyRef<'_, Self>) -> bool {
self_.vm.is_replaying()
self_.vm.state().is_replaying()
}
}

Expand Down Expand Up @@ -937,6 +955,7 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyDoProgressCancelSignalReceived>()?;
m.add_class::<PyUnresolvedFuture>()?;
m.add_class::<PyCallHandle>()?;
m.add_class::<PyRun>()?;

m.add("VMException", m.py().get_type::<VMException>())?;
m.add(
Expand Down
Loading