Skip to content

Commit 5734a33

Browse files
sanityclaude
andauthored
fix: cache contract state locally before forwarding client-initiated PUT (#2011)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent 608e719 commit 5734a33

File tree

4 files changed

+90
-16
lines changed

4 files changed

+90
-16
lines changed

crates/core/src/operations/put.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,22 +1096,48 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
10961096
return Ok(());
10971097
}
10981098

1099-
// At least one peer found - forward to network
1099+
// At least one peer found - cache locally first, then forward to network
11001100
let target_peer = target.unwrap();
11011101

11021102
tracing::debug!(
11031103
tx = %id,
11041104
%key,
11051105
target_peer = %target_peer.peer,
11061106
target_location = ?target_peer.location,
1107-
"Forwarding PUT to target peer"
1107+
"Caching state locally before forwarding PUT to target peer"
1108+
);
1109+
1110+
// Cache the contract state locally before forwarding
1111+
// This ensures the publishing node has immediate access to the new state
1112+
let updated_value = put_contract(
1113+
op_manager,
1114+
key,
1115+
value.clone(),
1116+
related_contracts.clone(),
1117+
&contract,
1118+
)
1119+
.await
1120+
.map_err(|e| {
1121+
tracing::error!(
1122+
tx = %id,
1123+
%key,
1124+
error = %e,
1125+
"Failed to cache state locally before forwarding PUT"
1126+
);
1127+
e
1128+
})?;
1129+
1130+
tracing::debug!(
1131+
tx = %id,
1132+
%key,
1133+
"Local cache updated, now forwarding PUT to target peer"
11081134
);
11091135

11101136
put_op.state = Some(PutState::AwaitingResponse {
11111137
key,
11121138
upstream: None,
11131139
contract: contract.clone(),
1114-
state: value.clone(),
1140+
state: updated_value.clone(),
11151141
subscribe,
11161142
});
11171143

@@ -1121,7 +1147,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
11211147
sender: own_location,
11221148
contract,
11231149
related_contracts,
1124-
value,
1150+
value: updated_value,
11251151
htl,
11261152
target: target_peer,
11271153
};

crates/core/src/operations/update.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1011,7 +1011,36 @@ pub(crate) async fn request_update(
10111011
};
10121012

10131013
// Normal case: we found a remote target
1014+
// Apply the update locally first to ensure the initiating peer has the updated state
10141015
let id = update_op.id;
1016+
1017+
tracing::debug!(
1018+
tx = %id,
1019+
%key,
1020+
target_peer = %target.peer,
1021+
"Applying UPDATE locally before forwarding to target peer"
1022+
);
1023+
1024+
// Apply update locally - this ensures the initiating peer serves the updated state
1025+
// even if the remote UPDATE times out or fails
1026+
let updated_value = update_contract(op_manager, key, value.clone(), related_contracts.clone())
1027+
.await
1028+
.map_err(|e| {
1029+
tracing::error!(
1030+
tx = %id,
1031+
%key,
1032+
error = %e,
1033+
"Failed to apply update locally before forwarding UPDATE"
1034+
);
1035+
e
1036+
})?;
1037+
1038+
tracing::debug!(
1039+
tx = %id,
1040+
%key,
1041+
"Local update complete, now forwarding UPDATE to target peer"
1042+
);
1043+
10151044
if let Some(stats) = &mut update_op.stats {
10161045
stats.target = Some(target.clone());
10171046
}
@@ -1026,7 +1055,7 @@ pub(crate) async fn request_update(
10261055
sender,
10271056
related_contracts,
10281057
target,
1029-
value,
1058+
value: updated_value, // Send the updated value, not the original
10301059
};
10311060

10321061
let op = UpdateOp {

crates/core/tests/connectivity.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,17 @@ async fn test_gateway_reconnection() -> TestResult {
226226
recv_contract.as_ref().expect("Contract should exist").key(),
227227
contract_key
228228
);
229+
if recv_state != wrapped_state {
230+
eprintln!("State mismatch!");
231+
eprintln!(
232+
"Expected state: {:?}",
233+
String::from_utf8_lossy(wrapped_state.as_ref())
234+
);
235+
eprintln!(
236+
"Received state: {:?}",
237+
String::from_utf8_lossy(recv_state.as_ref())
238+
);
239+
}
229240
assert_eq!(recv_state, wrapped_state);
230241
tracing::info!("Initial GET successful");
231242
}

tests/test-contract-integration/src/lib.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ impl ContractInterface for Contract {
8686
data: Vec<UpdateData<'static>>,
8787
) -> Result<UpdateModification<'static>, ContractError> {
8888
// Deserialize the current state
89-
let mut todo_list: TodoList = match serde_json::from_slice(state.as_ref()) {
89+
let original_state_bytes = state.as_ref();
90+
let mut todo_list: TodoList = match serde_json::from_slice(original_state_bytes) {
9091
Ok(list) => list,
9192
Err(e) => return Err(ContractError::Deser(e.to_string())),
9293
};
@@ -161,16 +162,23 @@ impl ContractInterface for Contract {
161162
}
162163
}
163164

164-
// Increment the state version
165-
todo_list.version += 1;
166-
167-
// Serialize the new state
168-
let new_state = match serde_json::to_vec(&todo_list) {
169-
Ok(bytes) => State::from(bytes),
170-
Err(e) => return Err(ContractError::Other(e.to_string())),
171-
};
172-
173-
Ok(UpdateModification::valid(new_state))
165+
// Check if the state actually changed by comparing serialized forms
166+
let new_state_bytes =
167+
serde_json::to_vec(&todo_list).map_err(|e| ContractError::Other(e.to_string()))?;
168+
let state_changed = original_state_bytes != new_state_bytes.as_slice();
169+
170+
// Only increment version if the state actually changed
171+
// This prevents double-incrementing when the same state is merged at multiple peers
172+
if state_changed {
173+
todo_list.version += 1;
174+
// Re-serialize with incremented version
175+
let new_state =
176+
serde_json::to_vec(&todo_list).map_err(|e| ContractError::Other(e.to_string()))?;
177+
Ok(UpdateModification::valid(State::from(new_state)))
178+
} else {
179+
// Reuse already serialized bytes since state didn't change
180+
Ok(UpdateModification::valid(State::from(new_state_bytes)))
181+
}
174182
}
175183

176184
fn summarize_state(

0 commit comments

Comments
 (0)