I have users editing many objects fairly quickly (e.g. to generate a report: they edit text stored in objects, which is eventually concatenated into a full document).
How can I “lock” an object while a user is editing it?
There are multiple approaches.
The most standard one is to use an Action and a property on the object to “store state”: the object records who the “owner” of the resource is, and that user_rid is used in the Action’s submission criteria to enforce that only the “owner” can edit the object (like a lock). The “set owner” Action can still be called by users who are not the owner, to avoid a deadlock where a user logs out without setting the owner back to “null” to release the lock.
Because the “owner” is persisted, this solution is better suited to “editing rights” that last some time (e.g. hours to days), like a “ticket” or a “task”.
If the owner changes very frequently, this might generate behaviors that don’t fit all workflows, such as frequent refreshes of the edited objects, which in turn trigger refreshes of object tables or other widgets in Workshop. That being said, “refresh on update” can be configured to only trigger for the displayed object.
A lot is taken care of for you in this solution: user visibility, editing permissions, actual locking of the object, etc.
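To make the owner-based lock more concrete, here is a minimal sketch in plain Python. It only models the logic; it is not Foundry code. The `Document` class, the `owner_rid` property, and the `set_owner`, `can_edit` and `edit_text` helpers are hypothetical names standing in for your object type, the “set owner” Action, and the submission criteria of the “edit” Action.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Document:
    """Hypothetical stand-in for an object with an 'owner' property."""
    text: str
    owner_rid: Optional[str] = None  # None means nobody holds the lock


def set_owner(doc: Document, new_owner_rid: Optional[str]) -> None:
    """Models the "set owner" Action: anyone may call it, so a stale
    lock left by a logged-out user can always be taken over or released."""
    doc.owner_rid = new_owner_rid


def can_edit(doc: Document, user_rid: str) -> bool:
    """Models the submission criteria of the "edit" Action."""
    return doc.owner_rid == user_rid


def edit_text(doc: Document, user_rid: str, new_text: str) -> None:
    """Models the "edit" Action: rejected unless the caller is the owner."""
    if not can_edit(doc, user_rid):
        raise PermissionError(f"{user_rid} does not own this object")
    doc.text = new_text
```

In this model, a user first calls `set_owner(doc, their_rid)` and then `edit_text(...)`; any other user calling `edit_text` gets a `PermissionError` until they take ownership themselves.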
Another possible solution is based on compute modules. The general idea is to use a compute module as a temporary in-memory cache of “who is editing what”. This won’t block users from editing at the Action level; it works at the Workshop/UI level only. It is a way to surface information to users, and it is on them to act upon it (e.g. reach out to the other person editing the resource to coordinate).
The compute module then has 2 main functions: get_editors, to fetch the editors of a given resource rid, and claim_edit, to register a user as editing a given resource. Below are the prototypes of those 2 main functions. The full code is at the end of this post.
Note: To simplify how Workshop processes the results of those functions, a “get_editors_simple” function is exposed, which only returns a list of user_rids without a nested struct. Additional functions are available to simplify interaction with and debugging of the compute module, but strictly speaking only the 2 main functions above are required.
Note: The release doesn’t need to be “explicit”, as the compute module will “wipe out” any editor that hasn’t recorded that they were editing a resource in the last minute. This can be tweaked per use case.
@function
def claim_edit(context, event: ClaimEditRequest) -> ClaimEditResponse:
    """
    Example:
        Input: {"resource_id": "document-123", "user_id": "alice@example.com"}
        Output: {
            "status": "claimed",
            "resource_id": "document-123",
            "user_id": "alice@example.com",
            "claim_timestamp": 1234567890.123,
            "current_editors": ["alice@example.com", "bob@example.com"]
            # Note: user_rids are stored and not emails, illustration purposes only
        }
    """


@function
def get_editors(context, event: GetEditorsRequest) -> GetEditorsResponse:
    """
    Example:
        Input: {"resource_id": "document-123"}
        Output: {
            "resource_id": "document-123",
            "editors": [
                {
                    "user_id": "alice@example.com",  # Note: same as above
                    "last_claim_timestamp": 1234567890.123,
                    "last_claim_datetime": "2024-01-01T12:00:00",
                    "seconds_since_claim": 5.2
                }
            ],
            "editor_count": 1
        }
    """
These functions can then be registered and exposed by the compute module.
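To illustrate the expiry behavior described in the notes above (claims silently disappear if they are not renewed), here is a minimal, standalone sketch of the same idea. It is not the compute module itself: `EditorRegistry` and its `clock` parameter are hypothetical names, and the clock is injectable only so the timeout can be demonstrated without waiting.

```python
import time
from typing import Callable, Dict, List


class EditorRegistry:
    """In-memory "who is editing what" cache with time-based expiry."""

    def __init__(
        self,
        timeout_seconds: float = 60.0,
        clock: Callable[[], float] = time.time,
    ) -> None:
        self._timeout = timeout_seconds
        self._clock = clock
        # Structure: {resource_id: {user_id: last_claim_timestamp}}
        self._claims: Dict[str, Dict[str, float]] = {}

    def claim(self, resource_id: str, user_id: str) -> List[str]:
        """Record (or renew) a claim and return the active editors."""
        self._claims.setdefault(resource_id, {})[user_id] = self._clock()
        return self.editors(resource_id)

    def editors(self, resource_id: str) -> List[str]:
        """Return users whose last claim is newer than the timeout."""
        cutoff = self._clock() - self._timeout
        claims = self._claims.get(resource_id, {})
        return sorted(uid for uid, ts in claims.items() if ts >= cutoff)


# Usage with a fake clock: alice claims at t=0, bob at t=30.
now = [0.0]
registry = EditorRegistry(timeout_seconds=60, clock=lambda: now[0])
registry.claim("document-123", "alice")
now[0] = 30.0
registry.claim("document-123", "bob")
now[0] = 70.0  # alice's claim is 70s old (stale), bob's is 40s old
active = registry.editors("document-123")
```

At the end of this usage example only bob is still listed, because alice never renewed her claim. This is why the client is expected to call claim_edit periodically while the user keeps editing.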
Example configuration to “submit a user editing a resource” from a button click (but could be any event trigger)
Example configuration to display a list of users editing the resource
Two ways to improve this example implementation:
this and that users are editing and 1 additional user you can’t see)
Function to resolve user rid to usernames
import { Users } from '@foundry/functions-api';
// To use in your function
const user = await Users.getUserByIdAsync('62bc33ae-319d-4d42-b28c-902b20f64e20');
See https://www.palantir.com/docs/foundry/functions/configure-notifications
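As a rough illustration of the kind of message mentioned above (“this and that users are editing and 1 additional user you can’t see”), here is a small Python sketch. It assumes you already have a mapping from user rid to username for the users the viewer is allowed to see; the `describe_editors` helper and its `visible_names` argument are hypothetical and not part of the compute module.

```python
from typing import Dict, List


def describe_editors(editor_rids: List[str], visible_names: Dict[str, str]) -> str:
    """Build a display string from editor rids.

    Rids missing from `visible_names` are counted as users the viewer
    cannot see, rather than being displayed.
    """
    if not editor_rids:
        return "Nobody is editing"

    names = sorted(visible_names[rid] for rid in editor_rids if rid in visible_names)
    hidden = len(editor_rids) - len(names)

    if len(names) > 1:
        shown = ", ".join(names[:-1]) + " and " + names[-1]
    elif names:
        shown = names[0]
    else:
        shown = ""

    hidden_part = ""
    if hidden:
        plural = "s" if hidden != 1 else ""
        hidden_part = f"{hidden} additional user{plural} you can't see"

    subject = " and ".join(part for part in (shown, hidden_part) if part)
    verb = "is" if len(editor_rids) == 1 else "are"
    return f"{subject} {verb} editing"


message = describe_editors(
    ["rid-1", "rid-2", "rid-3"],
    {"rid-1": "alice", "rid-2": "bob"},  # rid-3 is not visible to the viewer
)
```

The list of rids would come from get_editors_simple, and the name mapping from whatever user-resolution function you use.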
Full code of the compute module
# - This file is the main entry point into your Compute Module
# - All functions defined in this file are callable using the function name as the 'queryType'
# - Each function must take 2 args, with the 2nd param being the actual "payload" of your function
# - Each function must return something serializable by `json.dumps`
#
# See the README page of this repo for more details
import logging
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List
from compute_modules.annotations import function
from compute_modules.logging import get_logger
logger = get_logger(__name__)
logger.setLevel(logging.INFO)
# =============================================================================
# WHO IS EDITING - State Management
# =============================================================================
# Global state to track who is editing what
# Structure: {resource_id: {user_id: last_claim_timestamp}}
_editing_state: Dict[str, Dict[str, float]] = {}
_state_lock = threading.Lock()
# Configuration
CLAIM_TIMEOUT_SECONDS = 60 # Claims older than this are considered stale
CLEANUP_INTERVAL_SECONDS = 30 # How often to run the cleanup task
def _cleanup_stale_claims():
    """
    Background task that periodically removes stale claims.

    This ensures memory doesn't grow unbounded and users are automatically
    removed if they haven't claimed editing for a while.
    """
    while True:
        try:
            time.sleep(CLEANUP_INTERVAL_SECONDS)
            current_time = time.time()
            cutoff_time = current_time - CLAIM_TIMEOUT_SECONDS
            removed_claims = 0  # Number of stale claims removed in this pass
            with _state_lock:
                resources_to_remove = []
                for resource_id, editors in _editing_state.items():
                    # Remove stale user claims
                    stale_users = [
                        user_id
                        for user_id, claim_time in editors.items()
                        if claim_time < cutoff_time
                    ]
                    for user_id in stale_users:
                        del editors[user_id]
                        removed_claims += 1
                        logger.debug(
                            f"Cleaned up stale claim: resource={resource_id}, "
                            f"user={user_id}"
                        )
                    # Mark empty resources for removal
                    if not editors:
                        resources_to_remove.append(resource_id)
                # Remove resources with no active editors
                for resource_id in resources_to_remove:
                    del _editing_state[resource_id]
                    logger.debug(f"Removed empty resource: {resource_id}")
            # Log a summary only when something was actually cleaned up
            if resources_to_remove or removed_claims:
                logger.info(
                    f"Cleanup completed: removed {removed_claims} stale claims "
                    f"and {len(resources_to_remove)} empty resources"
                )
        except Exception as e:
            logger.error(f"Error in cleanup task: {e}", exc_info=True)


# Start the cleanup thread when the module loads
_cleanup_thread = threading.Thread(target=_cleanup_stale_claims, daemon=True)
_cleanup_thread.start()
logger.info("Background cleanup thread started")
# =============================================================================
# Data Classes for Request/Response Types
# =============================================================================
@dataclass
class ClaimEditRequest:
    """
    Request payload for claiming edit on a resource.
    """
    resource_id: str
    user_id: str


@dataclass
class ClaimEditResponse:
    """
    Response from claiming edit on a resource.
    """
    status: str
    resource_id: str
    user_id: str
    claim_timestamp: float
    claim_datetime: str
    current_editors: List[str]
    timeout_seconds: int


@dataclass
class GetEditorsRequest:
    """
    Request payload for querying who is currently editing a resource.
    """
    resource_id: str


@dataclass
class EditorInfo:
    """Information about a user currently editing a resource."""
    user_id: str
    last_claim_timestamp: float
    last_claim_datetime: str
    seconds_since_claim: float


@dataclass
class GetEditorsResponse:
    """
    Response containing list of current editors for a resource.
    """
    resource_id: str
    editors: List[EditorInfo]
    editor_count: int
    timeout_seconds: int
    query_timestamp: float
    query_datetime: str


@dataclass
class GetEditorsSimpleRequest:
    """
    Request payload for querying who is currently editing a resource (simple version).
    """
    resource_id: str


@dataclass
class GetEditorsSimpleResponse:
    """
    Simple response containing just the list of current editor user IDs.
    """
    editors: List[str]


@dataclass
class ReleaseEditRequest:
    """
    Request payload for releasing an edit claim.
    """
    resource_id: str
    user_id: str


@dataclass
class ReleaseEditResponse:
    """
    Response from releasing an edit claim.
    """
    status: str
    resource_id: str
    user_id: str
    message: str


@dataclass
class GetAllActiveResourcesRequest:
    """
    Request payload for getting all active resources (empty).
    """
    pass


@dataclass
class ResourceInfo:
    """Information about a resource being edited."""
    resource_id: str
    editor_count: int
    editors: List[str]


@dataclass
class GetAllActiveResourcesResponse:
    """
    Response containing all active resources.
    """
    resources: List[ResourceInfo]
    total_resources: int
    timeout_seconds: int
    query_timestamp: float
    query_datetime: str


@dataclass
class GetHealthRequest:
    """
    Request payload for health check (empty).
    """
    pass


@dataclass
class HealthMetrics:
    """Health check metrics."""
    total_resources: int
    total_claims: int
    cleanup_thread_alive: bool


@dataclass
class HealthConfig:
    """Health check configuration."""
    claim_timeout_seconds: int
    cleanup_interval_seconds: int


@dataclass
class GetHealthResponse:
    """
    Response from health check endpoint.
    """
    status: str
    timestamp: float
    datetime: str
    metrics: HealthMetrics
    config: HealthConfig

# =============================================================================
# API Functions
# =============================================================================
@function
def claim_edit(context, event: ClaimEditRequest) -> ClaimEditResponse:
    """
    Claim that a user is currently editing a resource.

    This function should be called periodically (e.g., every 30 seconds) by the
    client to maintain the claim. If not called within CLAIM_TIMEOUT_SECONDS,
    the user will be automatically removed from the editors list.

    Args:
        context: Compute module context (contains job metadata)
        event: ClaimEditRequest with resource_id and user_id

    Returns:
        ClaimEditResponse with status and current editors for the resource

    Example:
        Input: {"resource_id": "document-123", "user_id": "alice@example.com"}
        Output: {
            "status": "claimed",
            "resource_id": "document-123",
            "user_id": "alice@example.com",
            "claim_timestamp": 1234567890.123,
            "current_editors": ["alice@example.com", "bob@example.com"]
        }
    """
    resource_id = event.resource_id
    user_id = event.user_id
    current_time = time.time()
    logger.info(f"Claim edit request: resource={resource_id}, user={user_id}")
    with _state_lock:
        # Initialize resource if it doesn't exist
        if resource_id not in _editing_state:
            _editing_state[resource_id] = {}
        # Update or create the claim
        _editing_state[resource_id][user_id] = current_time
        # Get all current editors (excluding stale ones)
        cutoff_time = current_time - CLAIM_TIMEOUT_SECONDS
        current_editors = [
            uid
            for uid, claim_time in _editing_state[resource_id].items()
            if claim_time >= cutoff_time
        ]
    logger.info(
        f"Claim successful: resource={resource_id}, user={user_id}, "
        f"total_editors={len(current_editors)}"
    )
    return ClaimEditResponse(
        status="claimed",
        resource_id=resource_id,
        user_id=user_id,
        claim_timestamp=current_time,
        claim_datetime=datetime.fromtimestamp(current_time).isoformat(),
        current_editors=sorted(current_editors),
        timeout_seconds=CLAIM_TIMEOUT_SECONDS,
    )

@function
def get_editors(context, event: GetEditorsRequest) -> GetEditorsResponse:
    """
    Get the list of users currently editing a resource.

    Only returns users who have claimed editing within the last
    CLAIM_TIMEOUT_SECONDS seconds.

    Args:
        context: Compute module context (contains job metadata)
        event: GetEditorsRequest with resource_id

    Returns:
        GetEditorsResponse with resource_id and list of current editors with their claim info

    Example:
        Input: {"resource_id": "document-123"}
        Output: {
            "resource_id": "document-123",
            "editors": [
                {
                    "user_id": "alice@example.com",
                    "last_claim_timestamp": 1234567890.123,
                    "last_claim_datetime": "2024-01-01T12:00:00",
                    "seconds_since_claim": 5.2
                }
            ],
            "editor_count": 1
        }
    """
    resource_id = event.resource_id
    current_time = time.time()
    cutoff_time = current_time - CLAIM_TIMEOUT_SECONDS
    logger.info(f"Get editors request: resource={resource_id}")
    with _state_lock:
        editors_info: List[EditorInfo] = []
        if resource_id in _editing_state:
            for user_id, claim_time in _editing_state[resource_id].items():
                # Only include non-stale claims
                if claim_time >= cutoff_time:
                    seconds_since = current_time - claim_time
                    editors_info.append(
                        EditorInfo(
                            user_id=user_id,
                            last_claim_timestamp=claim_time,
                            last_claim_datetime=datetime.fromtimestamp(
                                claim_time
                            ).isoformat(),
                            seconds_since_claim=round(seconds_since, 2),
                        )
                    )
    # Sort by most recent claim first
    editors_info.sort(key=lambda x: x.last_claim_timestamp, reverse=True)
    logger.info(
        f"Get editors response: resource={resource_id}, "
        f"editor_count={len(editors_info)}"
    )
    return GetEditorsResponse(
        resource_id=resource_id,
        editors=editors_info,
        editor_count=len(editors_info),
        timeout_seconds=CLAIM_TIMEOUT_SECONDS,
        query_timestamp=current_time,
        query_datetime=datetime.fromtimestamp(current_time).isoformat(),
    )

@function
def get_editors_simple(
    context, event: GetEditorsSimpleRequest
) -> GetEditorsSimpleResponse:
    """
    Get a simple list of user IDs currently editing a resource.

    This is a simplified version of get_editors that returns only an array of strings,
    making it compatible with applications that don't support nested structures.
    Only returns users who have claimed editing within the last
    CLAIM_TIMEOUT_SECONDS seconds.

    Args:
        context: Compute module context (contains job metadata)
        event: GetEditorsSimpleRequest with resource_id

    Returns:
        GetEditorsSimpleResponse with just a list of editor user IDs

    Example:
        Input: {"resource_id": "document-123"}
        Output: {
            "editors": ["alice@example.com", "bob@example.com"]
        }
    """
    resource_id = event.resource_id
    current_time = time.time()
    cutoff_time = current_time - CLAIM_TIMEOUT_SECONDS
    logger.info(f"Get editors (simple) request: resource={resource_id}")
    with _state_lock:
        current_editors: List[str] = []
        if resource_id in _editing_state:
            for user_id, claim_time in _editing_state[resource_id].items():
                # Only include non-stale claims
                if claim_time >= cutoff_time:
                    current_editors.append(user_id)
    # Sort alphabetically for consistent ordering
    current_editors.sort()
    logger.info(
        f"Get editors (simple) response: resource={resource_id}, "
        f"editor_count={len(current_editors)}"
    )
    return GetEditorsSimpleResponse(editors=current_editors)

@function
def get_all_active_resources(
    context, event: GetAllActiveResourcesRequest
) -> GetAllActiveResourcesResponse:
    """
    Get a list of all resources that currently have active editors.

    This is useful for monitoring and debugging purposes.

    Args:
        context: Compute module context
        event: GetAllActiveResourcesRequest (empty)

    Returns:
        GetAllActiveResourcesResponse with list of all active resources and their editor counts

    Example:
        Output: {
            "resources": [
                {
                    "resource_id": "document-123",
                    "editor_count": 2,
                    "editors": ["alice@example.com", "bob@example.com"]
                }
            ],
            "total_resources": 1
        }
    """
    current_time = time.time()
    cutoff_time = current_time - CLAIM_TIMEOUT_SECONDS
    logger.info("Get all active resources request")
    with _state_lock:
        active_resources: List[ResourceInfo] = []
        for resource_id, editors in _editing_state.items():
            # Filter out stale claims
            active_editors = [
                user_id
                for user_id, claim_time in editors.items()
                if claim_time >= cutoff_time
            ]
            if active_editors:
                active_resources.append(
                    ResourceInfo(
                        resource_id=resource_id,
                        editor_count=len(active_editors),
                        editors=sorted(active_editors),
                    )
                )
    # Sort by editor count (most active first)
    active_resources.sort(key=lambda x: x.editor_count, reverse=True)
    logger.info(f"Active resources response: total={len(active_resources)}")
    return GetAllActiveResourcesResponse(
        resources=active_resources,
        total_resources=len(active_resources),
        timeout_seconds=CLAIM_TIMEOUT_SECONDS,
        query_timestamp=current_time,
        query_datetime=datetime.fromtimestamp(current_time).isoformat(),
    )

@function
def release_edit(context, event: ReleaseEditRequest) -> ReleaseEditResponse:
    """
    Explicitly release a user's claim on editing a resource.

    This is optional - claims will automatically timeout. However, explicitly
    releasing when a user closes a resource provides better UX.

    Args:
        context: Compute module context
        event: ReleaseEditRequest with resource_id and user_id

    Returns:
        ReleaseEditResponse with status

    Example:
        Input: {"resource_id": "document-123", "user_id": "alice@example.com"}
        Output: {
            "status": "released",
            "resource_id": "document-123",
            "user_id": "alice@example.com"
        }
    """
    resource_id = event.resource_id
    user_id = event.user_id
    logger.info(f"Release edit request: resource={resource_id}, user={user_id}")
    with _state_lock:
        if resource_id in _editing_state:
            if user_id in _editing_state[resource_id]:
                del _editing_state[resource_id][user_id]
                logger.info(f"Released claim: resource={resource_id}, user={user_id}")
                # Clean up empty resource entries
                if not _editing_state[resource_id]:
                    del _editing_state[resource_id]
                    logger.debug(f"Removed empty resource: {resource_id}")
                return ReleaseEditResponse(
                    status="released",
                    resource_id=resource_id,
                    user_id=user_id,
                    message="Claim released successfully",
                )
    logger.warning(
        f"Release failed - claim not found: resource={resource_id}, user={user_id}"
    )
    return ReleaseEditResponse(
        status="not_found",
        resource_id=resource_id,
        user_id=user_id,
        message="No active claim found for this user and resource",
    )

@function
def get_health(context, event: GetHealthRequest) -> GetHealthResponse:
    """
    Health check endpoint to verify the service is running properly.

    Returns:
        GetHealthResponse with health status and metrics
    """
    current_time = time.time()
    with _state_lock:
        total_resources = len(_editing_state)
        total_claims = sum(len(editors) for editors in _editing_state.values())
    return GetHealthResponse(
        status="healthy",
        timestamp=current_time,
        datetime=datetime.fromtimestamp(current_time).isoformat(),
        metrics=HealthMetrics(
            total_resources=total_resources,
            total_claims=total_claims,
            cleanup_thread_alive=_cleanup_thread.is_alive(),
        ),
        config=HealthConfig(
            claim_timeout_seconds=CLAIM_TIMEOUT_SECONDS,
            cleanup_interval_seconds=CLEANUP_INTERVAL_SECONDS,
        ),
    )

# =============================================================================
# Legacy example function (kept for reference)
# =============================================================================
@dataclass
class AddPayload:
    x: int
    y: int


@function
def add(context, event: AddPayload) -> int:
    logger.info("Received add request")
    return event.x + event.y

See https://www.palantir.com/docs/foundry/compute-modules/get-started