Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions docs/sources/konflux.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Source: konflux

The ``konflux`` push source allows the loading of content from local JSON files
organized by advisory. This source is designed for use with Konflux-generated
advisory metadata and does not require network access or external API calls.
advisory metadata and queries a Pulp 3 server to resolve RPM content hrefs.

Supported content types:

Expand All @@ -28,6 +28,11 @@ Multiple advisories can be specified with a comma-separated list:

``konflux:/path/to/konflux/data?advisories=RHSA-2020:0509,RHSA-2020:0510``

Pulp 3 connection parameters (``pulp_url``, ``pulp_cert``, ``pulp_key``,
``pulp_domain``) are required and can be provided as URL query parameters,
keyword arguments to :meth:`~pushsource.Source.get`, or environment variables
(see :class:`~pushsource.KonfluxSource` for details).

The base directory should contain subdirectories named after each advisory ID.
Each advisory subdirectory must contain:

Expand Down Expand Up @@ -87,9 +92,9 @@ Unlike the `ErrataSource`, the `KonfluxSource`:

* Reads from local JSON files rather than querying the Errata API
* Does not require Koji integration
* Queries Pulp 3 by SHA256 to resolve RPM ``pulp_href`` for Pulp-to-Pulp syncing
* Does not currently support filtering by architecture (this use case may be supported in the future)
* Currently produces RPMs and advisories (additional content types such as modules and container images can be supported in the future)
* RPM push items have ``src=None`` (no local RPM files, only metadata)

Python API reference
--------------------
Expand Down
1 change: 1 addition & 0 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ frozenlist2
kobo
koji>=1.18
more-executors>=2.7.0
pubtools-pulplib>=2.44.0
pushcollector
pyasn1
python-dateutil
Expand Down
216 changes: 147 additions & 69 deletions requirements.txt

Large diffs are not rendered by default.

251 changes: 203 additions & 48 deletions src/pushsource/_impl/backend/konflux_source/konflux_source.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import os

import anyio
from more_executors import Executors

from ...source import Source
Expand All @@ -9,6 +11,8 @@

from .konflux_loader import KonfluxLoader

from pubtools.pulplib import Pulp3Client

LOG = logging.getLogger("pushsource.konflux")


Expand All @@ -30,6 +34,12 @@ def __init__(
self,
url,
advisories,
pulp_url=None,
Comment thread
crungehottman marked this conversation as resolved.
pulp_cert=None,
pulp_key=None,
pulp_user=None,
pulp_password=None,
pulp_domain=None,
threads=4,
timeout=60 * 60,
):
Expand All @@ -46,18 +56,95 @@ def __init__(
Advisory ID(s) to process. Can be a single string or list.
Multiple IDs can be comma-separated.

pulp_url (str):
URL of hosted Pulp server
(e.g., https://packages.redhat.com/).
If omitted, uses ``PUSHSOURCE_KONFLUX_PULP_URL``
environment variable.

pulp_cert (str):
Path to TLS client certificate for Pulp authentication.
If omitted, uses ``PUSHSOURCE_KONFLUX_PULP_CERT``
environment variable.

pulp_key (str):
Path to TLS client key for Pulp authentication.
If omitted, uses ``PUSHSOURCE_KONFLUX_PULP_KEY``
environment variable.

pulp_user (str):
Username for Pulp basic authentication.
If omitted, uses ``PUSHSOURCE_KONFLUX_PULP_USER``
environment variable. Use with ``pulp_password``
as an alternative to certificate authentication.

pulp_password (str):
Password for Pulp basic authentication.
If omitted, uses ``PUSHSOURCE_KONFLUX_PULP_PASSWORD``
environment variable.

pulp_domain (str):
Pulp domain name (e.g., "konflux-myteam-tenant").
If omitted, uses ``PUSHSOURCE_KONFLUX_PULP_DOMAIN``
environment variable.

threads (int):
Number of threads for concurrent processing.

timeout (int):
Timeout in seconds for operations.
"""
self._base_dir = url
self._advisories = list_argument(advisories)
self._threads = threads
self._timeout = timeout

self._loader = KonfluxLoader(url)

self._advisories = list_argument(advisories)

# Resolve Pulp params from env vars if not provided
pulp_url = pulp_url or os.environ.get("PUSHSOURCE_KONFLUX_PULP_URL")
pulp_cert = pulp_cert or os.environ.get("PUSHSOURCE_KONFLUX_PULP_CERT")
pulp_key = pulp_key or os.environ.get("PUSHSOURCE_KONFLUX_PULP_KEY")
pulp_user = pulp_user or os.environ.get("PUSHSOURCE_KONFLUX_PULP_USER")
pulp_password = pulp_password or os.environ.get(
"PUSHSOURCE_KONFLUX_PULP_PASSWORD"
)
pulp_domain = pulp_domain or os.environ.get("PUSHSOURCE_KONFLUX_PULP_DOMAIN")

# Validate required params
if not pulp_url:
raise RuntimeError("Required parameter not provided: pulp_url")
if not pulp_domain:
raise RuntimeError("Required parameter not provided: pulp_domain")

# Validate that at least one auth method is provided
has_cert_auth = pulp_cert and pulp_key
has_basic_auth = pulp_user and pulp_password
if not has_cert_auth and not has_basic_auth:
raise RuntimeError(
"Pulp authentication not configured. Provide either "
"pulp_cert/pulp_key for mTLS or pulp_user/pulp_password "
"for basic authentication."
)

LOG.info(
"Initializing Pulp3 client: url=%s, domain=%s, auth=%s",
pulp_url,
pulp_domain,
"cert" if has_cert_auth else "basic",
)

# Build Pulp3Client kwargs
self._pulp_client_kwargs = {
"url": pulp_url,
"domain": pulp_domain,
}
if has_cert_auth:
self._pulp_client_kwargs["cert"] = (pulp_cert, pulp_key)
elif has_basic_auth:
self._pulp_client_kwargs["auth"] = (pulp_user, pulp_password)

self._executor = Executors.thread_pool(
name="pushsource-konflux", max_workers=threads
).with_cancel_on_shutdown()
Expand Down Expand Up @@ -152,71 +239,139 @@ def _create_rpm_items(self, data):
Since we don't use koji, we construct RPM push items directly
from the information in advisory_cdn_filelist.json.

Collects all RPM SHA256 checksums and resolves their pulp_hrefs
in a single batch query, then builds the push items.

Parameters:
data (KonfluxAdvisoryData):
Advisory data containing metadata and filelist

Returns:
list[RpmPushItem]: List of RPM push items
"""
items = []
# First pass: collect all RPM info and SHA256 checksums
rpm_entries = []
all_sha256s = []

for build_nvr, build_data in data.filelist.items():
if "rpms" in build_data:
checksums = build_data.get("checksums", {})
sig_key = build_data.get("sig_key")

for rpm_filename, destinations in build_data["rpms"].items():
# Construct RPM push item
item = self._create_rpm_item(
filename=rpm_filename,
build_nvr=build_nvr,
destinations=destinations,
checksums=checksums,
signing_key=sig_key,
origin=data.advisory_id,
if "rpms" not in build_data:
continue
checksums = build_data.get("checksums", {})
sig_key = build_data.get("sig_key")

for rpm_filename, destinations in build_data["rpms"].items():
md5sum = checksums.get("md5", {}).get(rpm_filename)
sha256sum = checksums.get("sha256", {}).get(rpm_filename)

if not sha256sum:
raise RuntimeError(
"No SHA256 checksum found for RPM %s in advisory %s"
% (rpm_filename, data.advisory_id)
)
items.append(item)

all_sha256s.append(sha256sum)
rpm_entries.append(
{
"filename": rpm_filename,
"build_nvr": build_nvr,
"destinations": destinations,
"md5sum": md5sum,
"sha256sum": sha256sum,
"signing_key": sig_key,
}
)

# Batch-resolve all SHA256 checksums to pulp_hrefs
href_map = anyio.run(self._resolve_rpm_hrefs, all_sha256s)

# Second pass: build push items using resolved hrefs
items = []
for entry in rpm_entries:
sha256sum = entry["sha256sum"]
pulp_href = href_map.get(sha256sum)
if not pulp_href:
raise RuntimeError(
"RPM %s (sha256=%s) not found in Pulp"
% (entry["filename"], sha256sum)
)

items.append(
RpmPushItem(
name=entry["filename"],
state="PENDING",
src=pulp_href,
dest=sorted(entry["destinations"]),
md5sum=entry["md5sum"],
sha256sum=sha256sum,
origin=data.advisory_id,
build=entry["build_nvr"],
signing_key=entry["signing_key"],
)
)

return items

def _create_rpm_item(
self, filename, build_nvr, destinations, checksums, signing_key, origin
):
"""Create a single RpmPushItem from filelist data.
# Pulp's content search API enforces a filter complexity limit,
# rejecting queries with more than 7 OR clauses with:
# {"q": ["Filter expression exceeds allowed complexity."]}
_PULP_BATCH_SIZE = 7

async def _resolve_rpm_hrefs(self, sha256sums, batch_size=_PULP_BATCH_SIZE):
"""Resolve SHA256 checksums to pulp_hrefs in batched queries.

Uses one Pulp3Client instance and queries checksums in batches
to stay within Pulp's filter complexity limits.

Parameters:
filename (str):
RPM filename
build_nvr (str):
Build NVR
destinations (list[str]):
List of repository destinations
checksums (dict):
Dict with 'md5' and 'sha256' checksum mappings
signing_key (str):
Signing key ID
origin (str):
Advisory ID
sha256sums (list[str]):
List of SHA256 checksums to resolve

batch_size (int):
Maximum number of checksums per query (default: 7)

Returns:
RpmPushItem: RPM push item
dict: Mapping of sha256 -> pulp_href

Raises:
RuntimeError: If any RPM is not found or has no pulp_href
"""
# Extract checksums for this specific RPM
md5sum = checksums.get("md5", {}).get(filename)
sha256sum = checksums.get("sha256", {}).get(filename)

return RpmPushItem(
name=filename,
state="PENDING",
src=None, # RPMs are stored in artifact storage
dest=sorted(destinations),
md5sum=md5sum,
sha256sum=sha256sum,
origin=origin,
build=build_nvr,
signing_key=signing_key,
)
href_map = {}

async with Pulp3Client(**self._pulp_client_kwargs) as client:
for i in range(0, len(sha256sums), batch_size):
batch = sha256sums[i : i + batch_size]
query = client.build_query_sha256(batch)

results = await client.search_content(
query=query,
fields=["pulp_href", "name", "sha256"],
limit=len(batch),
)

for result in results:
sha256 = result.get("sha256")
pulp_href = result.get("pulp_href")
if not pulp_href:
raise RuntimeError(
"RPM with SHA256 %s found but has no pulp_href: %s"
% (sha256, result)
)
href_map[sha256] = pulp_href
LOG.debug(
"Found RPM in Pulp: sha256=%s, pulp_href=%s, name=%s",
sha256,
pulp_href,
result.get("name"),
)

LOG.info(
"Resolved %d/%d RPMs in Pulp (batch %d)",
len(href_map),
len(sha256sums),
(i // batch_size) + 1,
)

return href_map


# Register the backend
Expand Down
18 changes: 18 additions & 0 deletions tests/konflux/data/RHBA-2020:1234/advisory_cdn_filelist.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"bash-5.0.17-1.el8": {
"checksums": {
"md5": {
"bash-5.0.17-1.el8.x86_64.rpm": "aabbccdd00112233aabbccdd00112233"
},
"sha256": {
"bash-5.0.17-1.el8.x86_64.rpm": "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
}
},
"rpms": {
"bash-5.0.17-1.el8.x86_64.rpm": [
"rhel-8-for-x86_64-baseos-rpms__8"
]
},
"sig_key": "fd431d51"
}
}
Loading
Loading