Add support for RPM Errata (updateinfo.xml) (#37125)

Resolves https://github.com/go-gitea/gitea/issues/37124

This PR adds support for RPM Errata (security advisories, bugfixes, and
enhancements) to Gitea's built-in RPM registry.

---------

Signed-off-by: Rohan Guliani <rohansguliani@google.com>
Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
This commit is contained in:
Rohan Guliani
2026-04-07 12:39:53 -04:00
committed by GitHub
parent 290edc1614
commit 1b200dc3da
5 changed files with 623 additions and 18 deletions
+333 -9
View File
@@ -12,12 +12,14 @@ import (
"io"
"net/http"
"net/http/httptest"
"slices"
"strings"
"testing"
"code.gitea.io/gitea/models/packages"
"code.gitea.io/gitea/models/unittest"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/json"
rpm_module "code.gitea.io/gitea/modules/packages/rpm"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/tests"
@@ -74,6 +76,15 @@ Mu0UFYgZ/bYnuvn/vz4wtCz8qMwsHUvP0PX3tbYFUctAPdrY6tiiDtcCddDECahx7SuVNP5dpmb5
content, err := io.ReadAll(zr)
assert.NoError(t, err)
decodeGzipXML := func(t testing.TB, resp *httptest.ResponseRecorder, v any) {
t.Helper()
zr, err := gzip.NewReader(resp.Body)
assert.NoError(t, err)
assert.NoError(t, xml.NewDecoder(zr).Decode(v))
}
rootURL := fmt.Sprintf("/api/packages/%s/rpm", user.Name)
for _, group := range []string{"", "el9", "el9/stable"} {
@@ -247,15 +258,6 @@ gpgkey=%sapi/packages/%s/rpm/repository.key`,
assert.Contains(t, resp.Body.String(), "-----BEGIN PGP SIGNATURE-----")
})
decodeGzipXML := func(t testing.TB, resp *httptest.ResponseRecorder, v any) {
t.Helper()
zr, err := gzip.NewReader(resp.Body)
assert.NoError(t, err)
assert.NoError(t, xml.NewDecoder(zr).Decode(v))
}
t.Run("primary.xml.gz", func(t *testing.T) {
defer tests.PrintCurrentTest(t)()
@@ -420,6 +422,328 @@ gpgkey=%sapi/packages/%s/rpm/repository.key`,
})
})
t.Run("Errata", func(t *testing.T) {
defer tests.PrintCurrentTest(t)()
type updateInfo struct {
XMLName xml.Name `xml:"updates"`
Xmlns string `xml:"xmlns,attr"`
Updates []*rpm_module.Update `xml:"update"`
}
errataURL := fmt.Sprintf("%s/package/%s/%s/errata", groupURL, packageName, packageVersion)
advisory := rpm_module.Update{
From: "security@example.com",
Status: "stable",
Type: "security",
Version: "1.0",
ID: "CVE-2023-1234",
Title: "Test Security Update",
Severity: "Important",
Description: "This is a test security update.",
References: []*rpm_module.Reference{
{
Href: "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-1234",
ID: "CVE-2023-1234",
Title: "CVE-2023-1234",
Type: "cve",
},
},
PkgList: []*rpm_module.Collection{
{
Short: "el9",
Packages: []*rpm_module.UpdatePackage{
{
Arch: packageArchitecture,
Name: packageName,
Release: "1",
Src: "gitea-test-1.0.2-1.src.rpm",
Version: "1.0.2",
Filename: fmt.Sprintf("%s-%s.%s.rpm", packageName, packageVersion, packageArchitecture),
},
},
},
},
}
t.Run("Success", func(t *testing.T) {
defer tests.PrintCurrentTest(t)()
updates := []*rpm_module.Update{&advisory}
body, err := json.Marshal(updates)
assert.NoError(t, err)
req := NewRequestWithBody(t, "POST", errataURL, bytes.NewReader(body)).
AddBasicAuth(user.Name)
MakeRequest(t, req, http.StatusOK)
url := groupURL + "/repodata"
// Check repomd.xml contains updateinfo
req = NewRequest(t, "GET", url+"/repomd.xml")
resp := MakeRequest(t, req, http.StatusOK)
type testRepoData struct {
Type string `xml:"type,attr"`
}
var repomd struct {
Data []*testRepoData `xml:"data"`
}
err = xml.NewDecoder(resp.Body).Decode(&repomd)
require.NoError(t, err)
found := slices.IndexFunc(repomd.Data, func(s *testRepoData) bool {
return s.Type == "updateinfo"
}) >= 0
assert.True(t, found, "updateinfo not found in repomd.xml")
// Now check updateinfo.xml.gz
req = NewRequest(t, "GET", url+"/updateinfo.xml.gz")
resp = MakeRequest(t, req, http.StatusOK)
var result updateInfo
decodeGzipXML(t, resp, &result)
assert.Equal(t, "http://linux.duke.edu/metadata/updateinfo", result.Xmlns)
assert.Len(t, result.Updates, 1)
assert.Equal(t, "CVE-2023-1234", result.Updates[0].ID)
assert.NotEmpty(t, result.Updates[0].Issued.Date)
assert.NotEmpty(t, result.Updates[0].Updated.Date)
})
t.Run("InvalidJSON", func(t *testing.T) {
defer tests.PrintCurrentTest(t)()
req := NewRequestWithBody(t, "POST", errataURL, strings.NewReader("invalid json")).
AddBasicAuth(user.Name)
MakeRequest(t, req, http.StatusBadRequest)
})
t.Run("NullElementsInJSON", func(t *testing.T) {
defer tests.PrintCurrentTest(t)()
// Send a payload with null inside arrays
payload := `[
{
"id": "CVE-2023-5678",
"from": "security@example.com",
"status": "stable",
"type": "security",
"version": "1.0",
"title": "Test Null Elements",
"severity": "Important",
"description": "Test null elements",
"pkg_list": [
null,
{
"short": "el9",
"packages": [
null,
{
"arch": "x86_64",
"name": "gitea",
"release": "1",
"src": "gitea-1.0.0-1.src.rpm",
"version": "1.0.0",
"filename": "gitea-1.0.0-1.x86_64.rpm"
}
]
}
]
}
]`
req := NewRequestWithBody(t, "POST", errataURL, strings.NewReader(payload)).
AddBasicAuth(user.Name)
MakeRequest(t, req, http.StatusOK)
// Verify it was stored correctly (skipping nulls)
url := groupURL + "/repodata"
req = NewRequest(t, "GET", url+"/updateinfo.xml.gz")
resp := MakeRequest(t, req, http.StatusOK)
var result updateInfo
decodeGzipXML(t, resp, &result)
// We need to find the new advisory CVE-2023-5678
var newAdvisory *rpm_module.Update
for _, u := range result.Updates {
if u.ID == "CVE-2023-5678" {
newAdvisory = u
break
}
}
assert.NotNil(t, newAdvisory)
assert.Len(t, newAdvisory.PkgList, 1)
assert.Equal(t, "el9", newAdvisory.PkgList[0].Short)
assert.Len(t, newAdvisory.PkgList[0].Packages, 1)
assert.Equal(t, "gitea", newAdvisory.PkgList[0].Packages[0].Name)
})
t.Run("PackageNotFound", func(t *testing.T) {
defer tests.PrintCurrentTest(t)()
badURL := fmt.Sprintf("%s/package/%s/non-existent-version/errata", groupURL, packageName)
updates := []*rpm_module.Update{&advisory}
body, err := json.Marshal(updates)
assert.NoError(t, err)
req := NewRequestWithBody(t, "POST", badURL, bytes.NewReader(body)).
AddBasicAuth(user.Name)
MakeRequest(t, req, http.StatusNotFound)
})
t.Run("MergeAdvisories", func(t *testing.T) {
defer tests.PrintCurrentTest(t)()
// Upload a second advisory with the same ID but a different package
advisory2 := advisory
advisory2.PkgList = []*rpm_module.Collection{
{
Short: "el9",
Packages: []*rpm_module.UpdatePackage{
{
Arch: packageArchitecture,
Name: "another-package",
Release: "1",
Src: "another-package-1.0.0-1.src.rpm",
Version: "1.0.0",
Filename: "another-package-1.0.0-1.x86_64.rpm",
},
},
},
}
updates := []*rpm_module.Update{&advisory2}
body, err := json.Marshal(updates)
assert.NoError(t, err)
req := NewRequestWithBody(t, "POST", errataURL, bytes.NewReader(body)).
AddBasicAuth(user.Name)
MakeRequest(t, req, http.StatusOK)
// Check updateinfo.xml.gz again
url := groupURL + "/repodata"
req = NewRequest(t, "GET", url+"/updateinfo.xml.gz")
resp := MakeRequest(t, req, http.StatusOK)
var result updateInfo
decodeGzipXML(t, resp, &result)
var targetUpdate *rpm_module.Update
for _, u := range result.Updates {
if u.ID == "CVE-2023-1234" {
targetUpdate = u
break
}
}
assert.NotNil(t, targetUpdate)
// Verify that package lists are merged into the same collection
assert.Len(t, targetUpdate.PkgList, 1)
assert.Len(t, targetUpdate.PkgList[0].Packages, 2)
assert.Equal(t, "another-package", result.Updates[0].PkgList[0].Packages[1].Name)
})
t.Run("NewCollection", func(t *testing.T) {
defer tests.PrintCurrentTest(t)()
// Upload a third advisory with the same ID but a different collection
advisory3 := advisory
advisory3.PkgList = []*rpm_module.Collection{
{
Short: "el8",
Packages: []*rpm_module.UpdatePackage{
{
Arch: packageArchitecture,
Name: packageName,
Release: "1",
Src: "gitea-test-1.0.2-1.src.rpm",
Version: "1.0.2",
Filename: fmt.Sprintf("%s-%s.%s.rpm", packageName, packageVersion, packageArchitecture),
},
},
},
}
updates := []*rpm_module.Update{&advisory3}
body, err := json.Marshal(updates)
assert.NoError(t, err)
req := NewRequestWithBody(t, "POST", errataURL, bytes.NewReader(body)).
AddBasicAuth(user.Name)
MakeRequest(t, req, http.StatusOK)
// Check updateinfo.xml.gz again
url := groupURL + "/repodata"
req = NewRequest(t, "GET", url+"/updateinfo.xml.gz")
resp := MakeRequest(t, req, http.StatusOK)
var result updateInfo
decodeGzipXML(t, resp, &result)
var targetUpdate *rpm_module.Update
for _, u := range result.Updates {
if u.ID == "CVE-2023-1234" {
targetUpdate = u
break
}
}
assert.NotNil(t, targetUpdate)
// Verify that we now have 2 collections
assert.Len(t, targetUpdate.PkgList, 2)
// We need to be careful with order, map iteration is random
// Let's check both exist
shorts := []string{targetUpdate.PkgList[0].Short, targetUpdate.PkgList[1].Short}
assert.Contains(t, shorts, "el9")
assert.Contains(t, shorts, "el8")
})
t.Run("Idempotency", func(t *testing.T) {
defer tests.PrintCurrentTest(t)()
updates := []*rpm_module.Update{&advisory}
body, err := json.Marshal(updates)
assert.NoError(t, err)
// Post twice
req := NewRequestWithBody(t, "POST", errataURL, bytes.NewReader(body)).
AddBasicAuth(user.Name)
MakeRequest(t, req, http.StatusOK)
req = NewRequestWithBody(t, "POST", errataURL, bytes.NewReader(body)).
AddBasicAuth(user.Name)
MakeRequest(t, req, http.StatusOK)
// Check updateinfo.xml.gz
url := groupURL + "/repodata"
req = NewRequest(t, "GET", url+"/updateinfo.xml.gz")
resp := MakeRequest(t, req, http.StatusOK)
var result updateInfo
decodeGzipXML(t, resp, &result)
var targetUpdate *rpm_module.Update
for _, u := range result.Updates {
if u.ID == "CVE-2023-1234" {
targetUpdate = u
break
}
}
assert.NotNil(t, targetUpdate)
assert.Len(t, targetUpdate.PkgList, 2)
var el9Coll *rpm_module.Collection
for _, coll := range targetUpdate.PkgList {
if coll.Short == "el9" {
el9Coll = coll
break
}
}
assert.NotNil(t, el9Coll)
assert.Len(t, el9Coll.Packages, 2)
})
})
t.Run("Delete", func(t *testing.T) {
defer tests.PrintCurrentTest(t)()