How to see "who is editing this object" in Workshop?

I’m having user editing many objects fairly quickly (e.g. to generate report, editing text stored in objects, that eventually leads to a full document once concatenated).

How can I “lock” an object when a user is editing it ?

There are multiple approaches.

Idea 1 - Standard Action and Object editing

The most standard one is to use Action and object to “store state” of who is the “owner” of the resource and so use this user_rid as part of the action submission criteria to enforce that only the “owner” can actually edit this object (like a lock). The “set owner” action can still be called even if you are not an owner to avoid deadlock where a user logout without setting the owner back to “null” to release the lock.

In this solution, the persistence of the “owner” makes it more fit for “editing rights” that last some time (e.g. hours to days) like a “ticket”, or a “task”.

If the change of owner is very frequent, this might generate behaviors that don’t fit all workflows, like frequent refreshes of the objects edited, hence triggering refreshes of objects tables or other widgets in Workshop. That being said, “refresh on update” can be configured to only trigger for the displayed object.

A lot is taken care-of for you in this solution. Users visibility, editing permissions, actual locking of the object, etc.

Idea 2 - Compute module

Another possible solution is based on compute modules. The general idea is to use compute module as a temporary in-memory cache of “who is editing what. This won’t enforce or block users to edit at an Action level, but rather at a Workshop/UI level only. This is rather a way to surface information for the user, and on them to act upon it (e.g. reach out to the other person that edits it to coordinate).

The compute module can then have 2 main methods: get_editors to fetch the editors of a given resource rid, and claim_edit to register a user editing a given resource. Below are the prototype of those 2 mains functions. The full code is at the end of this post.

Note: To simplify how Workshop processes the result of those function, a “get_editors_simple” function is exposed, which only return a list of user_rids without a nested struct. Additional functions are available to simplify the interaction and debugging of the compute module, but strictly speaking only the 2 above main functions are required.
Note: The release doesn’t need to be “explicit”, as the compute module will “wipe out” any editor that didn’t recorded they were editing a resource in the last minute. This can be tweaked per use-case.

@function
def claim_edit(context, event: ClaimEditRequest) -> ClaimEditResponse:
    """
    Example:
        Input: {"resource_id": "document-123", "user_id": "alice@example.com"}
        Output: {
            "status": "claimed",
            "resource_id": "document-123",
            "user_id": "alice@example.com",
            "claim_timestamp": 1234567890.123,
            "current_editors": ["alice@example.com", "bob@example.com"] 
            # Note: user_rids are stored and not emails, illustration purposes only
        }
    """

@function
def get_editors(context, event: GetEditorsRequest) -> GetEditorsResponse:
    """
    Example:
        Input: {"resource_id": "document-123"}
        Output: {
            "resource_id": "document-123",
            "editors": [
                {
                    "user_id": "alice@example.com", # Note: same as above
                    "last_claim_timestamp": 1234567890.123,
                    "last_claim_datetime": "2024-01-01T12:00:00",
                    "seconds_since_claim": 5.2
                }
            ],
            "editor_count": 1
        }
    """

In compute module, those functions can then be registered and exposed by the compute module.

Example configuration to “submit a user editing a resource” from a button click (but could be any event trigger)

Example configuration to display a list of users editing the resource

Two ways to improve this example implementation:

  • Converting the list of user rid to a list of username/names can be done via a function (code below). It is important not to store those attributes (usernames/names) directly, as the visibility of a user to another user is not always guaranteed (it is possible for users to not be able to “see” each other via permissions). An improved version could take this into account (e.g. display this and that users are editing and 1 additional user you can’t see)
  • Making sure the “editing state” is refreshed often enough. Currently, the “editing state” is kept for 60sec before being flushed, which means that workshop needs to keep triggering the editing function regularly. There is no built-in mechanism in Workshop to perform this recurring “event sending”. One way to solve this is via a custom react widget or a Slate embed, which have the flexibility to send event at regular intervals. Using a trick (like on user editing a text area) would also be sufficient for a Workshop-only implementation.

Function to resolve user rid to usernames

import { Users } from '@foundry/functions-api';
// To use in your function
const user = await Users.getUserByIdAsync('62bc33ae-319d-4d42-b28c-902b20f64e20');

See https://www.palantir.com/docs/foundry/functions/configure-notifications

Full code of the compute module

# - This file is the main entry point into your Compute Module
# - All functions defined in this file are callable using the function name as the 'queryType'
# - Each function must take 2 args, with the 2nd param being the actual "payload" of your function
# - Each function must return something serializable by `json.dumps`
#
# See the README page of this repo for more details

import logging
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List
from compute_modules.annotations import function
from compute_modules.logging import get_logger

logger = get_logger(__name__)
logger.setLevel(logging.INFO)

# =============================================================================
# WHO IS EDITING - State Management
# =============================================================================

# Global state to track who is editing what
# Structure: {resource_id: {user_id: last_claim_timestamp}}
_editing_state: Dict[str, Dict[str, float]] = {}
_state_lock = threading.Lock()

# Configuration
CLAIM_TIMEOUT_SECONDS = 60  # Claims older than this are considered stale
CLEANUP_INTERVAL_SECONDS = 30  # How often to run the cleanup task


def _cleanup_stale_claims():
    """
    Background task that periodically removes stale claims.
    This ensures memory doesn't grow unbounded and users are automatically
    removed if they haven't claimed editing for a while.
    """
    while True:
        try:
            time.sleep(CLEANUP_INTERVAL_SECONDS)
            current_time = time.time()
            cutoff_time = current_time - CLAIM_TIMEOUT_SECONDS

            with _state_lock:
                resources_to_remove = []

                for resource_id, editors in _editing_state.items():
                    # Remove stale user claims
                    stale_users = [
                        user_id
                        for user_id, claim_time in editors.items()
                        if claim_time < cutoff_time
                    ]

                    for user_id in stale_users:
                        del editors[user_id]
                        logger.debug(
                            f"Cleaned up stale claim: resource={resource_id}, "
                            f"user={user_id}"
                        )

                    # Mark empty resources for removal
                    if not editors:
                        resources_to_remove.append(resource_id)

                # Remove resources with no active editors
                for resource_id in resources_to_remove:
                    del _editing_state[resource_id]
                    logger.debug(f"Removed empty resource: {resource_id}")

                if resources_to_remove or any(stale_users for stale_users in []):
                    logger.info(
                        f"Cleanup completed: removed {len(resources_to_remove)} "
                        f"empty resources"
                    )

        except Exception as e:
            logger.error(f"Error in cleanup task: {e}", exc_info=True)


# Start the cleanup thread when the module loads
_cleanup_thread = threading.Thread(target=_cleanup_stale_claims, daemon=True)
_cleanup_thread.start()
logger.info("Background cleanup thread started")


# =============================================================================
# Data Classes for Request/Response Types
# =============================================================================


@dataclass
class ClaimEditRequest:
    """
    Request payload for claiming edit on a resource.
    """

    resource_id: str
    user_id: str


@dataclass
class ClaimEditResponse:
    """
    Response from claiming edit on a resource.
    """

    status: str
    resource_id: str
    user_id: str
    claim_timestamp: float
    claim_datetime: str
    current_editors: List[str]
    timeout_seconds: int


@dataclass
class GetEditorsRequest:
    """
    Request payload for querying who is currently editing a resource.
    """

    resource_id: str


@dataclass
class EditorInfo:
    """Information about a user currently editing a resource."""

    user_id: str
    last_claim_timestamp: float
    last_claim_datetime: str
    seconds_since_claim: float


@dataclass
class GetEditorsResponse:
    """
    Response containing list of current editors for a resource.
    """

    resource_id: str
    editors: List[EditorInfo]
    editor_count: int
    timeout_seconds: int
    query_timestamp: float
    query_datetime: str


@dataclass
class GetEditorsSimpleRequest:
    """
    Request payload for querying who is currently editing a resource (simple version).
    """

    resource_id: str


@dataclass
class GetEditorsSimpleResponse:
    """
    Simple response containing just the list of current editor user IDs.
    """

    editors: List[str]


@dataclass
class ReleaseEditRequest:
    """
    Request payload for releasing an edit claim.
    """

    resource_id: str
    user_id: str


@dataclass
class ReleaseEditResponse:
    """
    Response from releasing an edit claim.
    """

    status: str
    resource_id: str
    user_id: str
    message: str


@dataclass
class GetAllActiveResourcesRequest:
    """
    Request payload for getting all active resources (empty).
    """

    pass


@dataclass
class ResourceInfo:
    """Information about a resource being edited."""

    resource_id: str
    editor_count: int
    editors: List[str]


@dataclass
class GetAllActiveResourcesResponse:
    """
    Response containing all active resources.
    """

    resources: List[ResourceInfo]
    total_resources: int
    timeout_seconds: int
    query_timestamp: float
    query_datetime: str


@dataclass
class GetHealthRequest:
    """
    Request payload for health check (empty).
    """

    pass


@dataclass
class HealthMetrics:
    """Health check metrics."""

    total_resources: int
    total_claims: int
    cleanup_thread_alive: bool


@dataclass
class HealthConfig:
    """Health check configuration."""

    claim_timeout_seconds: int
    cleanup_interval_seconds: int


@dataclass
class GetHealthResponse:
    """
    Response from health check endpoint.
    """

    status: str
    timestamp: float
    datetime: str
    metrics: HealthMetrics
    config: HealthConfig


# =============================================================================
# API Functions
# =============================================================================
@function
def claim_edit(context, event: ClaimEditRequest) -> ClaimEditResponse:
    """
    Claim that a user is currently editing a resource.

    This function should be called periodically (e.g., every 30 seconds) by the
    client to maintain the claim. If not called within CLAIM_TIMEOUT_SECONDS,
    the user will be automatically removed from the editors list.

    Args:
        context: Compute module context (contains job metadata)
        event: ClaimEditRequest with resource_id and user_id

    Returns:
        ClaimEditResponse with status and current editors for the resource

    Example:
        Input: {"resource_id": "document-123", "user_id": "alice@example.com"}
        Output: {
            "status": "claimed",
            "resource_id": "document-123",
            "user_id": "alice@example.com",
            "claim_timestamp": 1234567890.123,
            "current_editors": ["alice@example.com", "bob@example.com"]
        }
    """
    resource_id = event.resource_id
    user_id = event.user_id
    current_time = time.time()

    logger.info(f"Claim edit request: resource={resource_id}, user={user_id}")

    with _state_lock:
        # Initialize resource if it doesn't exist
        if resource_id not in _editing_state:
            _editing_state[resource_id] = {}

        # Update or create the claim
        _editing_state[resource_id][user_id] = current_time

        # Get all current editors (excluding stale ones)
        cutoff_time = current_time - CLAIM_TIMEOUT_SECONDS
        current_editors = [
            uid
            for uid, claim_time in _editing_state[resource_id].items()
            if claim_time >= cutoff_time
        ]

    logger.info(
        f"Claim successful: resource={resource_id}, user={user_id}, "
        f"total_editors={len(current_editors)}"
    )

    return ClaimEditResponse(
        status="claimed",
        resource_id=resource_id,
        user_id=user_id,
        claim_timestamp=current_time,
        claim_datetime=datetime.fromtimestamp(current_time).isoformat(),
        current_editors=sorted(current_editors),
        timeout_seconds=CLAIM_TIMEOUT_SECONDS,
    )


@function
def get_editors(context, event: GetEditorsRequest) -> GetEditorsResponse:
    """
    Get the list of users currently editing a resource.

    Only returns users who have claimed editing within the last
    CLAIM_TIMEOUT_SECONDS seconds.

    Args:
        context: Compute module context (contains job metadata)
        event: GetEditorsRequest with resource_id

    Returns:
        GetEditorsResponse with resource_id and list of current editors with their claim info

    Example:
        Input: {"resource_id": "document-123"}
        Output: {
            "resource_id": "document-123",
            "editors": [
                {
                    "user_id": "alice@example.com",
                    "last_claim_timestamp": 1234567890.123,
                    "last_claim_datetime": "2024-01-01T12:00:00",
                    "seconds_since_claim": 5.2
                }
            ],
            "editor_count": 1
        }
    """
    resource_id = event.resource_id
    current_time = time.time()
    cutoff_time = current_time - CLAIM_TIMEOUT_SECONDS

    logger.info(f"Get editors request: resource={resource_id}")

    with _state_lock:
        editors_info: List[EditorInfo] = []

        if resource_id in _editing_state:
            for user_id, claim_time in _editing_state[resource_id].items():
                # Only include non-stale claims
                if claim_time >= cutoff_time:
                    seconds_since = current_time - claim_time
                    editors_info.append(
                        EditorInfo(
                            user_id=user_id,
                            last_claim_timestamp=claim_time,
                            last_claim_datetime=datetime.fromtimestamp(
                                claim_time
                            ).isoformat(),
                            seconds_since_claim=round(seconds_since, 2),
                        )
                    )

        # Sort by most recent claim first
        editors_info.sort(key=lambda x: x.last_claim_timestamp, reverse=True)

    logger.info(
        f"Get editors response: resource={resource_id}, "
        f"editor_count={len(editors_info)}"
    )

    return GetEditorsResponse(
        resource_id=resource_id,
        editors=editors_info,
        editor_count=len(editors_info),
        timeout_seconds=CLAIM_TIMEOUT_SECONDS,
        query_timestamp=current_time,
        query_datetime=datetime.fromtimestamp(current_time).isoformat(),
    )


@function
def get_editors_simple(
    context, event: GetEditorsSimpleRequest
) -> GetEditorsSimpleResponse:
    """
    Get a simple list of user IDs currently editing a resource.

    This is a simplified version of get_editors that returns only an array of strings,
    making it compatible with applications that don't support nested structures.

    Only returns users who have claimed editing within the last
    CLAIM_TIMEOUT_SECONDS seconds.

    Args:
        context: Compute module context (contains job metadata)
        event: GetEditorsSimpleRequest with resource_id

    Returns:
        GetEditorsSimpleResponse with just a list of editor user IDs

    Example:
        Input: {"resource_id": "document-123"}
        Output: {
            "editors": ["alice@example.com", "bob@example.com"]
        }
    """
    resource_id = event.resource_id
    current_time = time.time()
    cutoff_time = current_time - CLAIM_TIMEOUT_SECONDS

    logger.info(f"Get editors (simple) request: resource={resource_id}")

    with _state_lock:
        current_editors: List[str] = []
        if resource_id in _editing_state:
            for user_id, claim_time in _editing_state[resource_id].items():
                # Only include non-stale claims
                if claim_time >= cutoff_time:
                    current_editors.append(user_id)

        # Sort alphabetically for consistent ordering
        current_editors.sort()

    logger.info(
        f"Get editors (simple) response: resource={resource_id}, "
        f"editor_count={len(current_editors)}"
    )

    return GetEditorsSimpleResponse(editors=current_editors)


@function
def get_all_active_resources(
    context, event: GetAllActiveResourcesRequest
) -> GetAllActiveResourcesResponse:
    """
    Get a list of all resources that currently have active editors.

    This is useful for monitoring and debugging purposes.

    Args:
        context: Compute module context
        event: GetAllActiveResourcesRequest (empty)

    Returns:
        GetAllActiveResourcesResponse with list of all active resources and their editor counts

    Example:
        Output: {
            "resources": [
                {
                    "resource_id": "document-123",
                    "editor_count": 2,
                    "editors": ["alice@example.com", "bob@example.com"]
                }
            ],
            "total_resources": 1
        }
    """
    current_time = time.time()
    cutoff_time = current_time - CLAIM_TIMEOUT_SECONDS
    logger.info("Get all active resources request")

    with _state_lock:
        active_resources: List[ResourceInfo] = []

        for resource_id, editors in _editing_state.items():
            # Filter out stale claims
            active_editors = [
                user_id
                for user_id, claim_time in editors.items()
                if claim_time >= cutoff_time
            ]

            if active_editors:
                active_resources.append(
                    ResourceInfo(
                        resource_id=resource_id,
                        editor_count=len(active_editors),
                        editors=sorted(active_editors),
                    )
                )

        # Sort by editor count (most active first)
        active_resources.sort(key=lambda x: x.editor_count, reverse=True)

    logger.info(f"Active resources response: total={len(active_resources)}")

    return GetAllActiveResourcesResponse(
        resources=active_resources,
        total_resources=len(active_resources),
        timeout_seconds=CLAIM_TIMEOUT_SECONDS,
        query_timestamp=current_time,
        query_datetime=datetime.fromtimestamp(current_time).isoformat(),
    )


@function
def release_edit(context, event: ReleaseEditRequest) -> ReleaseEditResponse:
    """
    Explicitly release a user's claim on editing a resource.

    This is optional - claims will automatically timeout. However, explicitly
    releasing when a user closes a resource provides better UX.

    Args:
        context: Compute module context
        event: ReleaseEditRequest with resource_id and user_id

    Returns:
        ReleaseEditResponse with status

    Example:
        Input: {"resource_id": "document-123", "user_id": "alice@example.com"}
        Output: {
            "status": "released",
            "resource_id": "document-123",
            "user_id": "alice@example.com"
        }
    """
    resource_id = event.resource_id
    user_id = event.user_id

    logger.info(f"Release edit request: resource={resource_id}, user={user_id}")

    with _state_lock:
        if resource_id in _editing_state:
            if user_id in _editing_state[resource_id]:
                del _editing_state[resource_id][user_id]
                logger.info(f"Released claim: resource={resource_id}, user={user_id}")

                # Clean up empty resource entries
                if not _editing_state[resource_id]:
                    del _editing_state[resource_id]
                    logger.debug(f"Removed empty resource: {resource_id}")

                return ReleaseEditResponse(
                    status="released",
                    resource_id=resource_id,
                    user_id=user_id,
                    message="Claim released successfully",
                )

        logger.warning(
            f"Release failed - claim not found: resource={resource_id}, user={user_id}"
        )
        return ReleaseEditResponse(
            status="not_found",
            resource_id=resource_id,
            user_id=user_id,
            message="No active claim found for this user and resource",
        )


@function
def get_health(context, event: GetHealthRequest) -> GetHealthResponse:
    """
    Health check endpoint to verify the service is running properly.

    Returns:
        GetHealthResponse with health status and metrics
    """
    current_time = time.time()

    with _state_lock:
        total_resources = len(_editing_state)
        total_claims = sum(len(editors) for editors in _editing_state.values())

    return GetHealthResponse(
        status="healthy",
        timestamp=current_time,
        datetime=datetime.fromtimestamp(current_time).isoformat(),
        metrics=HealthMetrics(
            total_resources=total_resources,
            total_claims=total_claims,
            cleanup_thread_alive=_cleanup_thread.is_alive(),
        ),
        config=HealthConfig(
            claim_timeout_seconds=CLAIM_TIMEOUT_SECONDS,
            cleanup_interval_seconds=CLEANUP_INTERVAL_SECONDS,
        ),
    )


# =============================================================================
# Legacy example function (kept for reference)
# =============================================================================


@dataclass
class AddPayload:
    x: int
    y: int


@function
def add(context, event: AddPayload) -> int:
    logger.info("Received add request")
    return event.x + event.y

See https://www.palantir.com/docs/foundry/compute-modules/get-started

3 Likes

This topic was automatically closed 91 days after the last reply. New replies are no longer allowed.